Skip to main content

synwire_daemon/
ipc.rs

1//! JSON-RPC 2.0 based IPC protocol for the synwire daemon.
2//!
3//! The daemon communicates with MCP server proxies over a Unix domain socket
4//! using newline-delimited JSON (NDJSON). Each line is a complete JSON-RPC 2.0
5//! request or response object.
6//!
7//! # Supported methods
8//!
9//! | Method string        | Variant              | Description                          |
10//! |----------------------|----------------------|--------------------------------------|
11//! | `index`              | [`IpcMethod::Index`] | Trigger indexing for a worktree      |
12//! | `search`             | [`IpcMethod::Search`]| Semantic vector search               |
13//! | `graph_query`        | [`IpcMethod::GraphQuery`] | Code graph query              |
14//! | `graph_search`       | [`IpcMethod::GraphSearch`]| Code graph search             |
15//! | `community_search`   | [`IpcMethod::CommunitySearch`] | Community detection search |
16//! | `hybrid_search`      | [`IpcMethod::HybridSearch`] | Hybrid vector + BM25 search |
17//! | `clone_repo`         | [`IpcMethod::CloneRepo`] | Clone a repository              |
18//! | `xref_query`         | [`IpcMethod::XrefQuery`] | Cross-reference query            |
19//! | `index_status`       | [`IpcMethod::IndexStatus`] | Check indexing status          |
20//!
21//! # Wire format
22//!
23//! Each message is a single UTF-8 JSON object terminated by `\n`. The daemon
24//! reads one line at a time, deserialises it as an [`IpcRequest`], dispatches
25//! the method, and writes back an [`IpcResponse`] followed by `\n`.
26
27#![forbid(unsafe_code)]
28
29use serde::{Deserialize, Serialize};
30use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
31
32/// JSON-RPC 2.0 version string.
33const JSONRPC_VERSION: &str = "2.0";
34
35// ── Error codes (JSON-RPC 2.0 standard range) ──────────────────────────────
36
37/// Error code: the requested method does not exist.
38pub const ERROR_METHOD_NOT_FOUND: i32 = -32601;
39
40/// Error code: invalid method parameters.
41pub const ERROR_INVALID_PARAMS: i32 = -32602;
42
43/// Error code: an internal error occurred while processing the request.
44pub const ERROR_INTERNAL: i32 = -32603;
45
46/// Error code: the request could not be parsed as valid JSON-RPC.
47pub const ERROR_PARSE: i32 = -32700;
48
49// ── Request ─────────────────────────────────────────────────────────────────
50
51/// A JSON-RPC 2.0 request received from a client over the UDS.
52#[derive(Debug, Deserialize)]
53pub struct IpcRequest {
54    /// Must be `"2.0"`.
55    pub jsonrpc: String,
56
57    /// Caller-assigned request identifier, echoed back in the response.
58    pub id: serde_json::Value,
59
60    /// The method name to invoke (e.g. `"search"`, `"index"`).
61    pub method: String,
62
63    /// Method-specific parameters. Defaults to `null` if omitted.
64    #[serde(default)]
65    pub params: serde_json::Value,
66}
67
68// ── Response ────────────────────────────────────────────────────────────────
69
70/// A JSON-RPC 2.0 response sent back to the client.
71#[derive(Debug, Serialize)]
72pub struct IpcResponse {
73    /// Always `"2.0"`.
74    pub jsonrpc: &'static str,
75
76    /// The request identifier from the corresponding [`IpcRequest`].
77    pub id: serde_json::Value,
78
79    /// The successful result payload, if any.
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub result: Option<serde_json::Value>,
82
83    /// The error payload, present only when the request failed.
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub error: Option<IpcError>,
86}
87
88impl IpcResponse {
89    /// Construct a successful response with the given result payload.
90    ///
91    /// # Examples
92    ///
93    /// ```
94    /// # use serde_json::json;
95    /// # use synwire_daemon::ipc::IpcResponse;
96    /// let resp = IpcResponse::success(json!(1), json!({"hits": 42}));
97    /// assert!(resp.error.is_none());
98    /// assert_eq!(resp.result, Some(json!({"hits": 42})));
99    /// ```
100    pub const fn success(id: serde_json::Value, result: serde_json::Value) -> Self {
101        Self {
102            jsonrpc: JSONRPC_VERSION,
103            id,
104            result: Some(result),
105            error: None,
106        }
107    }
108
109    /// Construct an error response.
110    ///
111    /// # Examples
112    ///
113    /// ```
114    /// # use serde_json::json;
115    /// # use synwire_daemon::ipc::{IpcResponse, ERROR_METHOD_NOT_FOUND};
116    /// let resp = IpcResponse::error(json!(1), ERROR_METHOD_NOT_FOUND, "unknown method");
117    /// assert!(resp.result.is_none());
118    /// assert_eq!(resp.error.as_ref().map(|e| e.code), Some(ERROR_METHOD_NOT_FOUND));
119    /// ```
120    pub fn error(id: serde_json::Value, code: i32, message: impl Into<String>) -> Self {
121        Self {
122            jsonrpc: JSONRPC_VERSION,
123            id,
124            result: None,
125            error: Some(IpcError {
126                code,
127                message: message.into(),
128            }),
129        }
130    }
131}
132
133// ── Error ───────────────────────────────────────────────────────────────────
134
135/// A JSON-RPC 2.0 error object.
136#[derive(Debug, Serialize)]
137pub struct IpcError {
138    /// A numeric error code. Standard codes are defined as `ERROR_*` constants.
139    pub code: i32,
140
141    /// A short human-readable description of the error.
142    pub message: String,
143}
144
145// ── Method enum ─────────────────────────────────────────────────────────────
146
147/// The set of IPC methods the daemon understands.
148///
149/// Parsed from the `method` field of an [`IpcRequest`]. Unrecognised method
150/// strings are captured in the [`Unknown`](IpcMethod::Unknown) variant so
151/// callers can return a structured `method not found` error rather than
152/// silently dropping the request.
153#[derive(Debug, Clone, PartialEq, Eq)]
154#[non_exhaustive]
155pub enum IpcMethod {
156    /// Trigger indexing for a worktree.
157    Index,
158    /// Semantic (vector) search.
159    Search,
160    /// Code graph query.
161    GraphQuery,
162    /// Code graph search.
163    GraphSearch,
164    /// Community detection search.
165    CommunitySearch,
166    /// Hybrid vector + BM25 search.
167    HybridSearch,
168    /// Clone a repository.
169    CloneRepo,
170    /// Cross-reference query.
171    XrefQuery,
172    /// Check indexing status for a worktree.
173    IndexStatus,
174    /// An unrecognised method string.
175    Unknown(String),
176}
177
178impl IpcMethod {
179    /// Parse a method string into the corresponding variant.
180    ///
181    /// # Examples
182    ///
183    /// ```
184    /// # use synwire_daemon::ipc::IpcMethod;
185    /// assert_eq!(IpcMethod::from_method_str("code.search_semantic"), IpcMethod::Search);
186    /// assert_eq!(
187    ///     IpcMethod::from_method_str("nonexistent"),
188    ///     IpcMethod::Unknown("nonexistent".to_owned()),
189    /// );
190    /// ```
191    pub fn from_method_str(s: &str) -> Self {
192        match s {
193            "index.build" => Self::Index,
194            "code.search_semantic" => Self::Search,
195            "graph_query" => Self::GraphQuery,
196            "graph_search" => Self::GraphSearch,
197            "code.search_by_community" => Self::CommunitySearch,
198            "code.search_hybrid" => Self::HybridSearch,
199            "clone_repo" => Self::CloneRepo,
200            "xref_query" => Self::XrefQuery,
201            "index.status" => Self::IndexStatus,
202            other => Self::Unknown(other.to_owned()),
203        }
204    }
205
206    /// Return the canonical wire-format string for this method.
207    ///
208    /// For [`Unknown`](IpcMethod::Unknown) variants this returns the original
209    /// unrecognised string.
210    pub fn as_str(&self) -> &str {
211        match self {
212            Self::Index => "index.build",
213            Self::Search => "code.search_semantic",
214            Self::GraphQuery => "graph_query",
215            Self::GraphSearch => "graph_search",
216            Self::CommunitySearch => "code.search_by_community",
217            Self::HybridSearch => "code.search_hybrid",
218            Self::CloneRepo => "clone_repo",
219            Self::XrefQuery => "xref_query",
220            Self::IndexStatus => "index.status",
221            Self::Unknown(s) => s,
222        }
223    }
224}
225
226impl std::fmt::Display for IpcMethod {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        f.write_str(self.as_str())
229    }
230}
231
232impl std::str::FromStr for IpcMethod {
233    type Err = std::convert::Infallible;
234
235    /// Parse a method string. This never fails — unrecognised strings become
236    /// [`IpcMethod::Unknown`].
237    fn from_str(s: &str) -> Result<Self, Self::Err> {
238        Ok(Self::from_method_str(s))
239    }
240}
241
242// ── Frame I/O ───────────────────────────────────────────────────────────────
243
244/// Maximum size of a single JSON-RPC line (1 MiB).
245///
246/// Requests larger than this are rejected to prevent unbounded memory
247/// allocation from a misbehaving client.
248const MAX_LINE_BYTES: usize = 1024 * 1024;
249
250/// Read a single JSON-RPC request from a newline-delimited stream.
251///
252/// Returns `Ok(None)` on clean EOF (the client closed the connection).
253/// Returns `Err` with an [`IpcError`] if the line cannot be parsed.
254///
255/// The reader should be a `BufReader` wrapping the socket's read half.
256pub async fn read_request<R>(reader: &mut R) -> Result<Option<IpcRequest>, IpcError>
257where
258    R: tokio::io::AsyncBufRead + Unpin,
259{
260    let mut line = String::new();
261
262    // AsyncBufReadExt::read_line returns 0 on EOF.
263    let n = reader.read_line(&mut line).await.map_err(|e| IpcError {
264        code: ERROR_PARSE,
265        message: format!("I/O error reading request line: {e}"),
266    })?;
267
268    if n == 0 {
269        return Ok(None);
270    }
271
272    if n > MAX_LINE_BYTES {
273        return Err(IpcError {
274            code: ERROR_PARSE,
275            message: format!("request line exceeds maximum size ({MAX_LINE_BYTES} bytes)"),
276        });
277    }
278
279    let request: IpcRequest = serde_json::from_str(line.trim()).map_err(|e| IpcError {
280        code: ERROR_PARSE,
281        message: format!("invalid JSON-RPC request: {e}"),
282    })?;
283
284    if request.jsonrpc != JSONRPC_VERSION {
285        return Err(IpcError {
286            code: ERROR_PARSE,
287            message: format!(
288                "unsupported JSON-RPC version {:?}, expected {:?}",
289                request.jsonrpc, JSONRPC_VERSION
290            ),
291        });
292    }
293
294    Ok(Some(request))
295}
296
297/// Write a JSON-RPC response as a single newline-terminated JSON line.
298///
299/// The writer should be the socket's write half (or a `BufWriter` around it).
300pub async fn write_response<W>(writer: &mut W, response: &IpcResponse) -> std::io::Result<()>
301where
302    W: tokio::io::AsyncWrite + Unpin,
303{
304    let mut payload = serde_json::to_vec(response).map_err(|e| {
305        std::io::Error::new(
306            std::io::ErrorKind::InvalidData,
307            format!("failed to serialise IPC response: {e}"),
308        )
309    })?;
310    payload.push(b'\n');
311
312    writer.write_all(&payload).await?;
313    writer.flush().await?;
314
315    Ok(())
316}
317
318#[cfg(test)]
319#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
320mod tests {
321    use super::*;
322    use serde_json::json;
323
324    #[test]
325    fn method_roundtrip() {
326        let methods = [
327            "index.build",
328            "code.search_semantic",
329            "graph_query",
330            "graph_search",
331            "code.search_by_community",
332            "code.search_hybrid",
333            "clone_repo",
334            "xref_query",
335            "index.status",
336        ];
337
338        for name in methods {
339            let m = IpcMethod::from_method_str(name);
340            assert_eq!(m.as_str(), name);
341            assert_ne!(m, IpcMethod::Unknown(name.to_owned()));
342        }
343    }
344
345    #[test]
346    fn method_unknown() {
347        let m = IpcMethod::from_method_str("does_not_exist");
348        assert_eq!(m, IpcMethod::Unknown("does_not_exist".to_owned()));
349        assert_eq!(m.as_str(), "does_not_exist");
350    }
351
352    #[test]
353    fn method_from_str_trait() {
354        let m: IpcMethod = "code.search_semantic"
355            .parse()
356            .unwrap_or(IpcMethod::Unknown(String::new()));
357        assert_eq!(m, IpcMethod::Search);
358    }
359
360    #[test]
361    fn method_display() {
362        assert_eq!(IpcMethod::HybridSearch.to_string(), "code.search_hybrid");
363        assert_eq!(IpcMethod::Unknown("foo".to_owned()).to_string(), "foo");
364    }
365
366    #[test]
367    fn response_success_serialisation() {
368        let resp = IpcResponse::success(json!(42), json!({"status": "ok"}));
369        let serialised = serde_json::to_value(&resp).unwrap_or_default();
370        assert_eq!(serialised["jsonrpc"], "2.0");
371        assert_eq!(serialised["id"], 42);
372        assert_eq!(serialised["result"]["status"], "ok");
373        // error field should be absent (skip_serializing_if)
374        assert!(serialised.get("error").is_none());
375    }
376
377    #[test]
378    fn response_error_serialisation() {
379        let resp = IpcResponse::error(json!("abc"), ERROR_METHOD_NOT_FOUND, "not found");
380        let serialised = serde_json::to_value(&resp).unwrap_or_default();
381        assert_eq!(serialised["jsonrpc"], "2.0");
382        assert_eq!(serialised["id"], "abc");
383        assert!(serialised.get("result").is_none());
384        assert_eq!(serialised["error"]["code"], ERROR_METHOD_NOT_FOUND);
385        assert_eq!(serialised["error"]["message"], "not found");
386    }
387
388    #[tokio::test]
389    async fn read_request_valid() {
390        let input = r#"{"jsonrpc":"2.0","id":1,"method":"search","params":{"q":"hello"}}"#;
391        let input_with_newline = format!("{input}\n");
392        let mut cursor = std::io::Cursor::new(input_with_newline.into_bytes());
393        let mut reader = tokio::io::BufReader::new(&mut cursor);
394
395        let req = read_request(&mut reader).await;
396        assert!(req.is_ok());
397        let req = req.unwrap_or(None);
398        assert!(req.is_some());
399        let req = req.unwrap_or_else(|| IpcRequest {
400            jsonrpc: String::new(),
401            id: json!(null),
402            method: String::new(),
403            params: json!(null),
404        });
405        assert_eq!(req.method, "search");
406        assert_eq!(req.id, json!(1));
407        assert_eq!(req.params["q"], "hello");
408    }
409
410    #[tokio::test]
411    async fn read_request_eof() {
412        let mut cursor = std::io::Cursor::new(Vec::<u8>::new());
413        let mut reader = tokio::io::BufReader::new(&mut cursor);
414
415        let result = read_request(&mut reader).await;
416        assert!(result.is_ok());
417        assert!(
418            result
419                .unwrap_or(Some(IpcRequest {
420                    jsonrpc: String::new(),
421                    id: json!(null),
422                    method: String::new(),
423                    params: json!(null),
424                }))
425                .is_none()
426        );
427    }
428
429    #[tokio::test]
430    async fn read_request_invalid_json() {
431        let input = b"not json at all\n";
432        let mut cursor = std::io::Cursor::new(input.to_vec());
433        let mut reader = tokio::io::BufReader::new(&mut cursor);
434
435        let result = read_request(&mut reader).await;
436        assert!(result.is_err());
437        let err = result.unwrap_err();
438        assert_eq!(err.code, ERROR_PARSE);
439    }
440
441    #[tokio::test]
442    async fn read_request_wrong_version() {
443        let input = r#"{"jsonrpc":"1.0","id":1,"method":"search"}"#;
444        let input_with_newline = format!("{input}\n");
445        let mut cursor = std::io::Cursor::new(input_with_newline.into_bytes());
446        let mut reader = tokio::io::BufReader::new(&mut cursor);
447
448        let result = read_request(&mut reader).await;
449        assert!(result.is_err());
450        let err = result.unwrap_err();
451        assert_eq!(err.code, ERROR_PARSE);
452        assert!(err.message.contains("unsupported JSON-RPC version"));
453    }
454
455    #[tokio::test]
456    async fn write_response_roundtrip() {
457        let resp = IpcResponse::success(json!(99), json!(["a", "b"]));
458        let mut buf = Vec::new();
459
460        let write_result = write_response(&mut buf, &resp).await;
461        assert!(write_result.is_ok());
462
463        // The output should be valid JSON followed by a newline.
464        let output = String::from_utf8(buf).unwrap_or_default();
465        assert!(output.ends_with('\n'));
466
467        let parsed: serde_json::Value = serde_json::from_str(output.trim()).unwrap_or_default();
468        assert_eq!(parsed["id"], 99);
469        assert_eq!(parsed["result"], json!(["a", "b"]));
470    }
471
472    #[tokio::test]
473    async fn read_request_default_params() {
474        let input = r#"{"jsonrpc":"2.0","id":1,"method":"index.status"}"#;
475        let input_with_newline = format!("{input}\n");
476        let mut cursor = std::io::Cursor::new(input_with_newline.into_bytes());
477        let mut reader = tokio::io::BufReader::new(&mut cursor);
478
479        let req = read_request(&mut reader).await;
480        assert!(req.is_ok());
481        let req = req.unwrap_or(None);
482        assert!(req.is_some());
483        let req = req.unwrap_or_else(|| IpcRequest {
484            jsonrpc: String::new(),
485            id: json!(null),
486            method: String::new(),
487            params: json!(null),
488        });
489        assert_eq!(req.method, "index.status");
490        assert!(req.params.is_null());
491    }
492}