Skip to main content

ambient_fs_client/
client.rs

1use ambient_fs_core::{awareness::FileAwareness, FileEvent};
2use serde::{Deserialize, Serialize};
3use serde_json::{json, Value as JsonValue};
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use thiserror::Error;
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::sync::{mpsc, oneshot, Mutex};
11use tokio::task::JoinHandle;
12
13// Platform-specific stream types: Unix socket on unix, TCP on windows.
14// Both OwnedWriteHalf variants implement AsyncWrite, so the rest of the
15// code is identical across platforms.
16#[cfg(unix)]
17use tokio::net::unix::OwnedWriteHalf;
18#[cfg(unix)]
19use tokio::net::UnixStream;
20
21#[cfg(windows)]
22use tokio::net::tcp::OwnedWriteHalf;
23#[cfg(windows)]
24use tokio::net::TcpStream;
25
26/// Default socket path for ambient-fsd (Unix)
27#[cfg(unix)]
28pub const DEFAULT_SOCKET_PATH: &str = "/tmp/ambient-fs.sock";
29
30/// Default TCP address for ambient-fsd (Windows)
31#[cfg(windows)]
32pub const DEFAULT_ADDR: &str = "127.0.0.1:9851";
33
34/// Default notification channel buffer size
35const DEFAULT_NOTIFICATION_BUFFER: usize = 256;
36
/// A generic notification pushed by the server (JSON-RPC notification: has method, no id).
///
/// This is the raw wire type. For typed parsing, see [`Notification`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ClientNotification {
    /// Notification method name as sent by the daemon (e.g. `"event"`).
    pub method: String,
    /// Raw notification payload; defaults to JSON `null` when the message
    /// carries no `params` field.
    #[serde(default)]
    pub params: JsonValue,
}
46
/// Typed notification variants received from daemon (pushed to subscribers).
///
/// Use [`AmbientFsClient::recv_notification`] for typed access, or
/// [`AmbientFsClient::take_notification_stream`] for raw [`ClientNotification`] access.
///
/// The enum is internally tagged on the `method` field: the value of `method`
/// selects the variant and the sibling `params` object is decoded into the
/// variant's `params` field.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "method")]
pub enum Notification {
    /// Raw file event from the watcher
    #[serde(rename = "event")]
    Event { params: FileEvent },
    /// File awareness state changed
    #[serde(rename = "awareness_changed")]
    AwarenessChanged {
        params: AwarenessChangedParams,
    },
    /// File analysis completed
    #[serde(rename = "analysis_complete")]
    AnalysisComplete {
        params: AnalysisCompleteParams,
    },
    /// Tree structure changed (patch)
    #[serde(rename = "tree_patch")]
    TreePatch { params: TreePatchParams },
}
71
/// Params for awareness_changed notification
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AwarenessChangedParams {
    /// Project the file belongs to.
    pub project_id: String,
    /// Path of the file whose awareness changed.
    // NOTE(review): presumably relative to the project root — confirm against the daemon.
    pub file_path: String,
    /// The new awareness state for the file.
    pub awareness: FileAwareness,
}
79
/// Params for analysis_complete notification
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AnalysisCompleteParams {
    /// Project the analysed file belongs to.
    pub project_id: String,
    /// Path of the analysed file.
    pub file_path: String,
    /// Line count reported by the analysis.
    pub line_count: u32,
    /// TODO count reported by the analysis.
    pub todo_count: u32,
}
88
/// Params for tree_patch notification
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TreePatchParams {
    /// Project whose tree changed.
    pub project_id: String,
    /// The patch itself, kept as untyped JSON. Because of `flatten`, every
    /// field of the params object other than `project_id` is collected here.
    #[serde(flatten)]
    pub patch: serde_json::Value,
}
96
/// Client for connecting to ambient-fsd daemon.
///
/// Internally splits the connection stream (a Unix socket on unix, a TCP
/// stream on windows) into read/write halves. A background task
/// reads incoming messages and routes them: responses (has `id`) go to the
/// matching pending request via oneshot, notifications (has `method`, no `id`)
/// go to an mpsc channel.
pub struct AmbientFsClient {
    /// Endpoint the client was created with (socket path on unix, TCP address on windows).
    socket_path: PathBuf,
    /// Write half of the stream; serialized requests are written here.
    writer: OwnedWriteHalf,
    /// In-flight requests keyed by request id; each sender is completed by the reader task.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Result<JsonValue>>>>>,
    /// Receiver for server-pushed notifications; `None` once it has been taken.
    notification_rx: Option<mpsc::Receiver<ClientNotification>>,
    /// Handle of the background reader task; aborted when the client is dropped.
    reader_handle: JoinHandle<()>,
    /// Id handed to the next request (starts at 1).
    next_id: AtomicU64,
}
111
112impl std::fmt::Debug for AmbientFsClient {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("AmbientFsClient")
115            .field("socket_path", &self.socket_path)
116            .field("next_id", &self.next_id)
117            .finish_non_exhaustive()
118    }
119}
120
/// Stops the background reader task when the client goes away.
impl Drop for AmbientFsClient {
    fn drop(&mut self) {
        // The reader task is spawned detached; abort it explicitly so it does
        // not outlive the client.
        self.reader_handle.abort();
    }
}
126
/// JSON-RPC request envelope
///
/// Serialized as a single line and written to the daemon by `send_request`.
#[derive(Debug, Serialize)]
struct JsonRpcRequest<'a, T> {
    /// Protocol version string (`send_request` always sets `"2.0"`).
    jsonrpc: &'static str,
    /// Name of the remote method to invoke.
    method: &'a str,
    /// Method parameters; any serializable value.
    params: T,
    /// Request id used to match the response to this request.
    id: u64,
}
135
/// JSON-RPC response envelope (used in tests for parsing validation)
#[cfg(test)]
#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
    /// Protocol version string; parsed but never read.
    #[allow(dead_code)]
    jsonrpc: String,
    /// Either the `result` or the `error` member, flattened from the envelope.
    #[serde(flatten)]
    payload: ResponsePayload,
}
145
/// Test-only response body: untagged, so the variant is chosen by which of
/// `result` / `error` is present in the message.
#[cfg(test)]
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum ResponsePayload {
    Success { result: JsonValue },
    Error { error: JsonRpcError },
}
153
/// Test-only JSON-RPC error object.
#[cfg(test)]
#[derive(Debug, Deserialize)]
struct JsonRpcError {
    /// Numeric error code; parsed but never read.
    #[allow(dead_code)]
    code: i32,
    /// Human-readable error message.
    message: String,
}
161
/// Event filter for queries
///
/// Every field is optional; `Default` yields a filter with all fields `None`.
/// No field is skipped during serialization, so `None` is sent as JSON `null`.
#[derive(Debug, Clone, Serialize, Default)]
pub struct EventFilter {
    /// Project id to filter on.
    pub project_id: Option<String>,
    /// Lower time bound as a unix timestamp.
    // NOTE(review): seconds vs. milliseconds is not visible here — confirm against the daemon.
    pub since: Option<i64>, // unix timestamp
    /// Event source to filter on (the tests use `"ai_agent"`).
    pub source: Option<String>,
    /// Limit value passed to the daemon; presumably caps the number of events returned.
    pub limit: Option<usize>,
}
170
/// Errors from the client
#[derive(Debug, Error)]
pub enum ClientError {
    /// Connecting to the daemon or writing a request failed at the I/O level.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// A request could not be serialized to JSON.
    #[error("JSON serialization error: {0}")]
    JsonSerialize(#[from] serde_json::Error),

    /// The daemon answered with a JSON-RPC error; carries its `message`.
    /// Also used when a pushed notification cannot be decoded into a
    /// typed [`Notification`].
    #[error("daemon returned error: {0}")]
    DaemonError(String),

    /// The response had neither `result` nor `error`, or its `result` could
    /// not be decoded into the expected type.
    #[error("invalid response from daemon")]
    InvalidResponse,

    /// Returned by `recv_notification` once the notification receiver has
    /// been taken out of the client.
    #[error("daemon not connected")]
    NotConnected,

    /// The connection ended before a response to the request arrived.
    #[error("request failed: connection closed")]
    ConnectionClosed,
}
192
193pub type Result<T> = std::result::Result<T, ClientError>;
194
195impl AmbientFsClient {
196    /// Connect to the daemon at the default endpoint.
197    ///
198    /// On Unix, connects to `/tmp/ambient-fs.sock` (Unix socket).
199    /// On Windows, connects to `127.0.0.1:9851` (TCP).
200    pub async fn connect_local() -> Result<Self> {
201        #[cfg(unix)]
202        {
203            Self::connect(DEFAULT_SOCKET_PATH).await
204        }
205        #[cfg(windows)]
206        {
207            Self::connect(DEFAULT_ADDR).await
208        }
209    }
210
211    /// Connect to the daemon at the specified endpoint.
212    ///
213    /// On Unix, this is a socket path (e.g. `/tmp/ambient-fs.sock`).
214    /// On Windows, this is a TCP address (e.g. `127.0.0.1:9851`).
215    pub async fn connect(endpoint: impl Into<PathBuf>) -> Result<Self> {
216        let endpoint = endpoint.into();
217        #[cfg(unix)]
218        let stream = UnixStream::connect(&endpoint).await?;
219        #[cfg(windows)]
220        let stream = {
221            let addr = endpoint.to_string_lossy().into_owned();
222            TcpStream::connect(&addr).await?
223        };
224        Ok(Self::from_stream(stream, endpoint, DEFAULT_NOTIFICATION_BUFFER))
225    }
226
227    /// Build a client from a pre-connected UnixStream (unix only).
228    #[cfg(unix)]
229    pub(crate) fn from_stream(
230        stream: UnixStream,
231        socket_path: PathBuf,
232        notification_buffer: usize,
233    ) -> Self {
234        let (read_half, write_half) = stream.into_split();
235        Self::from_halves(read_half, write_half, socket_path, notification_buffer)
236    }
237
238    /// Build a client from a pre-connected TcpStream (windows only).
239    #[cfg(windows)]
240    pub(crate) fn from_stream(
241        stream: TcpStream,
242        socket_path: PathBuf,
243        notification_buffer: usize,
244    ) -> Self {
245        let (read_half, write_half) = stream.into_split();
246        Self::from_halves(read_half, write_half, socket_path, notification_buffer)
247    }
248
249    /// Internal: wire up the reader task and channels from pre-split halves.
250    fn from_halves<R: tokio::io::AsyncRead + Unpin + Send + 'static>(
251        read_half: R,
252        write_half: OwnedWriteHalf,
253        socket_path: PathBuf,
254        notification_buffer: usize,
255    ) -> Self {
256        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Result<JsonValue>>>>> =
257            Arc::new(Mutex::new(HashMap::new()));
258        let (notification_tx, notification_rx) = mpsc::channel(notification_buffer);
259
260        let reader_pending = pending.clone();
261        let reader_handle = tokio::spawn(async move {
262            let mut reader = BufReader::new(read_half);
263            let mut line = String::new();
264            loop {
265                line.clear();
266                match reader.read_line(&mut line).await {
267                    Ok(0) => break, // EOF
268                    Ok(_) => {
269                        let trimmed = line.trim();
270                        if trimmed.is_empty() {
271                            continue;
272                        }
273                        match serde_json::from_str::<JsonValue>(trimmed) {
274                            Ok(msg) => {
275                                if let Some(id) = msg.get("id").and_then(|v| v.as_u64()) {
276                                    // Response to a pending request
277                                    let mut map = reader_pending.lock().await;
278                                    if let Some(tx) = map.remove(&id) {
279                                        let result = if let Some(err) = msg.get("error") {
280                                            let message = err
281                                                .get("message")
282                                                .and_then(|m| m.as_str())
283                                                .unwrap_or("unknown error")
284                                                .to_string();
285                                            Err(ClientError::DaemonError(message))
286                                        } else if let Some(result) = msg.get("result") {
287                                            Ok(result.clone())
288                                        } else {
289                                            Err(ClientError::InvalidResponse)
290                                        };
291                                        let _ = tx.send(result);
292                                    }
293                                } else if msg.get("method").is_some() {
294                                    // Server-pushed notification (no id)
295                                    let method = msg["method"]
296                                        .as_str()
297                                        .unwrap_or("")
298                                        .to_string();
299                                    let params = msg
300                                        .get("params")
301                                        .cloned()
302                                        .unwrap_or(JsonValue::Null);
303                                    let notif = ClientNotification { method, params };
304                                    match notification_tx.try_send(notif) {
305                                        Ok(()) => {}
306                                        Err(mpsc::error::TrySendError::Full(_)) => {
307                                            tracing::warn!(
308                                                "notification channel full, dropping"
309                                            );
310                                        }
311                                        Err(mpsc::error::TrySendError::Closed(_)) => {
312                                            break;
313                                        }
314                                    }
315                                } else {
316                                    tracing::warn!(
317                                        "unknown message from daemon: {}",
318                                        trimmed
319                                    );
320                                }
321                            }
322                            Err(e) => {
323                                tracing::warn!("failed to parse daemon message: {}", e);
324                            }
325                        }
326                    }
327                    Err(e) => {
328                        tracing::warn!("reader error: {}", e);
329                        break;
330                    }
331                }
332            }
333            // Connection closed - fail all pending requests
334            let mut map = reader_pending.lock().await;
335            for (_, tx) in map.drain() {
336                let _ = tx.send(Err(ClientError::ConnectionClosed));
337            }
338        });
339
340        Self {
341            socket_path,
342            writer: write_half,
343            pending,
344            notification_rx: Some(notification_rx),
345            reader_handle,
346            next_id: AtomicU64::new(1),
347        }
348    }
349
350    /// Take ownership of the raw notification receiver.
351    ///
352    /// Returns the mpsc::Receiver for server-pushed notifications as generic
353    /// [`ClientNotification`] values. Can only be called once; subsequent
354    /// calls return None.
355    ///
356    /// Note: after calling this, [`recv_notification`](Self::recv_notification)
357    /// will return `Err(NotConnected)` since the receiver has been moved out.
358    pub fn take_notification_stream(&mut self) -> Option<mpsc::Receiver<ClientNotification>> {
359        self.notification_rx.take()
360    }
361
362    /// Receive a typed notification from the daemon (blocking).
363    ///
364    /// After subscribing to projects, call this in a loop to receive pushed
365    /// notifications. Returns `Ok(None)` if the connection closed.
366    ///
367    /// This is mutually exclusive with [`take_notification_stream`](Self::take_notification_stream).
368    /// If the notification stream has been taken, returns `Err(NotConnected)`.
369    pub async fn recv_notification(&mut self) -> Result<Option<Notification>> {
370        let rx = self.notification_rx.as_mut().ok_or(ClientError::NotConnected)?;
371        match rx.recv().await {
372            Some(raw) => {
373                let value = json!({
374                    "method": raw.method,
375                    "params": raw.params,
376                });
377                let notification: Notification = serde_json::from_value(value)
378                    .map_err(|e| ClientError::DaemonError(
379                        format!("invalid notification: {}", e),
380                    ))?;
381                Ok(Some(notification))
382            }
383            None => Ok(None),
384        }
385    }
386
387    /// Check if the client's reader task is still running (connection alive).
388    pub fn is_connected(&self) -> bool {
389        !self.reader_handle.is_finished()
390    }
391
392    /// Watch a directory path for events
393    pub async fn watch(&mut self, path: &str) -> Result<()> {
394        let params = json!({ "path": path });
395        self.send_request("watch", &params).await?;
396        Ok(())
397    }
398
399    /// Query events with optional filter
400    pub async fn events(&mut self, filter: EventFilter) -> Result<Vec<FileEvent>> {
401        let response = self.send_request("events", &filter).await?;
402        serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
403    }
404
405    /// Subscribe to events for a project
406    pub async fn subscribe(&mut self, project_id: &str) -> Result<()> {
407        let params = json!({ "project_id": project_id });
408        self.send_request("subscribe", &params).await?;
409        Ok(())
410    }
411
412    // ===== protocol methods matching server =====
413
414    /// Watch a project directory and get its project_id
415    pub async fn watch_project(&mut self, path: &str) -> Result<String> {
416        let params = json!({ "path": path });
417        let response = self.send_request("watch_project", &params).await?;
418        serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
419    }
420
421    /// Unwatch a project by ID
422    pub async fn unwatch_project(&mut self, project_id: &str) -> Result<()> {
423        let params = json!({ "project_id": project_id });
424        self.send_request("unwatch_project", &params).await?;
425        Ok(())
426    }
427
428    /// Unsubscribe from project notifications
429    pub async fn unsubscribe(&mut self, project_id: &str) -> Result<()> {
430        let params = json!({ "project_id": project_id });
431        self.send_request("unsubscribe", &params).await?;
432        Ok(())
433    }
434
435    /// Query events with filter (renamed from events)
436    pub async fn query_events(&mut self, filter: EventFilter) -> Result<Vec<FileEvent>> {
437        let response = self.send_request("query_events", &filter).await?;
438        serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
439    }
440
441    /// Query awareness for a file in a project
442    pub async fn query_awareness(
443        &mut self,
444        project_id: &str,
445        path: &str,
446    ) -> Result<FileAwareness> {
447        let params = json!({
448            "project_id": project_id,
449            "path": path,
450        });
451        let response = self.send_request("query_awareness", &params).await?;
452        serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
453    }
454
455    /// Attribute a file change to a specific source
456    pub async fn attribute(
457        &mut self,
458        project_id: &str,
459        file_path: &str,
460        source: &str,
461        source_id: Option<&str>,
462    ) -> Result<()> {
463        let mut params = json!({
464            "project_id": project_id,
465            "file_path": file_path,
466            "source": source,
467        });
468        if let Some(sid) = source_id {
469            params["source_id"] = json!(sid);
470        }
471        self.send_request("attribute", &params).await?;
472        Ok(())
473    }
474
475    /// Query active agents (returns generic JSON since AgentInfo not defined yet)
476    pub async fn query_agents(&mut self) -> Result<Vec<serde_json::Value>> {
477        let empty = json!({});
478        let response = self.send_request("query_agents", &empty).await?;
479        serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
480    }
481
482    /// Send a JSON-RPC request and get the response.
483    ///
484    /// Registers a oneshot channel in the pending map, writes the request,
485    /// and awaits the response routed by the background reader task.
486    async fn send_request<T: Serialize>(
487        &mut self,
488        method: &str,
489        params: &T,
490    ) -> Result<JsonValue> {
491        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
492
493        let request = JsonRpcRequest {
494            jsonrpc: "2.0",
495            method,
496            params,
497            id,
498        };
499
500        let mut request_json = serde_json::to_string(&request)?;
501        request_json.push('\n');
502        tracing::debug!("sending request: {}", request_json.trim());
503
504        // Register pending response before writing
505        let (tx, rx) = oneshot::channel();
506        {
507            let mut map = self.pending.lock().await;
508            map.insert(id, tx);
509        }
510
511        // Write request - clean up pending on failure
512        if let Err(e) = self.writer.write_all(request_json.as_bytes()).await {
513            self.pending.lock().await.remove(&id);
514            return Err(ClientError::Io(e));
515        }
516
517        // Wait for response from reader task
518        rx.await.map_err(|_| ClientError::ConnectionClosed)?
519    }
520}
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525    use ambient_fs_core::{EventType, Source};
526
527    // ===== unix-only tests (use UnixStream::pair for mock) =====
528
529    #[cfg(unix)]
530    mod unix_stream_tests {
531        use super::*;
532
533        fn mock_client() -> (AmbientFsClient, UnixStream) {
534            let (client_stream, server_stream) = UnixStream::pair().unwrap();
535            let client = AmbientFsClient::from_stream(
536                client_stream,
537                PathBuf::from("/tmp/test.sock"),
538                256,
539            );
540            (client, server_stream)
541        }
542
543        #[tokio::test]
544        async fn reader_routes_response_by_id() {
545            let (client, mut server) = mock_client();
546
547            let (tx10, rx10) = oneshot::channel();
548            let (tx20, rx20) = oneshot::channel();
549            {
550                let mut map = client.pending.lock().await;
551                map.insert(10, tx10);
552                map.insert(20, tx20);
553            }
554
555            let r20 = format!("{}\n", json!({"jsonrpc":"2.0","result":"twenty","id":20}));
556            let r10 = format!("{}\n", json!({"jsonrpc":"2.0","result":"ten","id":10}));
557            server.write_all(r20.as_bytes()).await.unwrap();
558            server.write_all(r10.as_bytes()).await.unwrap();
559
560            let result20 = rx20.await.unwrap().unwrap();
561            let result10 = rx10.await.unwrap().unwrap();
562            assert_eq!(result20, json!("twenty"));
563            assert_eq!(result10, json!("ten"));
564        }
565
566        #[tokio::test]
567        async fn reader_routes_error_response() {
568            let (client, mut server) = mock_client();
569
570            let (tx, rx) = oneshot::channel();
571            {
572                client.pending.lock().await.insert(1, tx);
573            }
574
575            let resp = format!(
576                "{}\n",
577                json!({"jsonrpc":"2.0","error":{"code":-32000,"message":"not found"},"id":1})
578            );
579            server.write_all(resp.as_bytes()).await.unwrap();
580
581            let result = rx.await.unwrap();
582            assert!(matches!(result, Err(ClientError::DaemonError(msg)) if msg == "not found"));
583        }
584
585        #[tokio::test]
586        async fn reader_routes_notification_to_channel() {
587            let (mut client, mut server) = mock_client();
588            let mut rx = client.take_notification_stream().unwrap();
589
590            let notif = format!(
591                "{}\n",
592                json!({"jsonrpc":"2.0","method":"event","params":{"path":"src/lib.rs"}})
593            );
594            server.write_all(notif.as_bytes()).await.unwrap();
595
596            let received = rx.recv().await.unwrap();
597            assert_eq!(received.method, "event");
598            assert_eq!(received.params["path"], "src/lib.rs");
599        }
600
601        #[tokio::test]
602        async fn take_notification_stream_returns_none_on_second_call() {
603            let (mut client, _server) = mock_client();
604            assert!(client.take_notification_stream().is_some());
605            assert!(client.take_notification_stream().is_none());
606        }
607
608        #[tokio::test]
609        async fn send_request_receives_response() {
610            let (mut client, server) = mock_client();
611
612            tokio::spawn(async move {
613                let (read_half, mut write_half) = server.into_split();
614                let mut reader = BufReader::new(read_half);
615                let mut line = String::new();
616                reader.read_line(&mut line).await.unwrap();
617                let req: JsonValue = serde_json::from_str(&line).unwrap();
618                let id = req["id"].as_u64().unwrap();
619                assert_eq!(req["method"], "watch");
620
621                let resp = format!("{}\n", json!({"jsonrpc":"2.0","result":"ok","id":id}));
622                write_half.write_all(resp.as_bytes()).await.unwrap();
623            });
624
625            client.watch("/test/path").await.unwrap();
626        }
627
628        #[tokio::test]
629        async fn send_request_receives_error_response() {
630            let (mut client, server) = mock_client();
631
632            tokio::spawn(async move {
633                let (read_half, mut write_half) = server.into_split();
634                let mut reader = BufReader::new(read_half);
635                let mut line = String::new();
636                reader.read_line(&mut line).await.unwrap();
637                let req: JsonValue = serde_json::from_str(&line).unwrap();
638                let id = req["id"].as_u64().unwrap();
639
640                let resp = format!(
641                    "{}\n",
642                    json!({"jsonrpc":"2.0","error":{"code":-32000,"message":"project not found"},"id":id})
643                );
644                write_half.write_all(resp.as_bytes()).await.unwrap();
645            });
646
647            let result = client.watch_project("/nonexistent").await;
648            assert!(matches!(result, Err(ClientError::DaemonError(msg)) if msg == "project not found"));
649        }
650
651        #[tokio::test]
652        async fn connection_closed_fails_pending_requests() {
653            let (mut client, server) = mock_client();
654            drop(server);
655
656            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
657
658            let result = client.watch("/test").await;
659            assert!(matches!(
660                result,
661                Err(ClientError::ConnectionClosed) | Err(ClientError::Io(_))
662            ));
663        }
664
665        #[tokio::test]
666        async fn notification_channel_closed_after_drop() {
667            let (mut client, mut server) = mock_client();
668            let mut rx = client.take_notification_stream().unwrap();
669
670            let notif = format!("{}\n", json!({"jsonrpc":"2.0","method":"ping","params":{}}));
671            server.write_all(notif.as_bytes()).await.unwrap();
672            let _ = rx.recv().await.unwrap();
673
674            drop(client);
675            drop(server);
676
677            assert!(rx.recv().await.is_none());
678        }
679
680        #[tokio::test]
681        async fn request_ids_increment() {
682            let (mut client, server) = mock_client();
683
684            tokio::spawn(async move {
685                let (read_half, mut write_half) = server.into_split();
686                let mut reader = BufReader::new(read_half);
687                for _ in 0..2 {
688                    let mut line = String::new();
689                    reader.read_line(&mut line).await.unwrap();
690                    let req: JsonValue = serde_json::from_str(&line).unwrap();
691                    let id = req["id"].as_u64().unwrap();
692                    let resp = format!("{}\n", json!({"jsonrpc":"2.0","result":"ok","id":id}));
693                    write_half.write_all(resp.as_bytes()).await.unwrap();
694                }
695            });
696
697            client.watch("/path1").await.unwrap();
698            client.watch("/path2").await.unwrap();
699            assert_eq!(client.next_id.load(Ordering::Relaxed), 3);
700        }
701
702        #[tokio::test]
703        async fn client_stores_socket_path() {
704            let (client, _server) = mock_client();
705            assert_eq!(client.socket_path, PathBuf::from("/tmp/test.sock"));
706        }
707
708        #[tokio::test]
709        async fn is_connected_reflects_reader_state() {
710            let (client, server) = mock_client();
711            assert!(client.is_connected());
712
713            drop(server);
714            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
715            assert!(!client.is_connected());
716        }
717
718        #[tokio::test]
719        async fn interleaved_notifications_and_responses() {
720            let (mut client, server) = mock_client();
721            let mut rx = client.take_notification_stream().unwrap();
722
723            tokio::spawn(async move {
724                let (read_half, mut write_half) = server.into_split();
725                let mut reader = BufReader::new(read_half);
726
727                let mut line = String::new();
728                reader.read_line(&mut line).await.unwrap();
729                let req: JsonValue = serde_json::from_str(&line).unwrap();
730                let id = req["id"].as_u64().unwrap();
731
732                let notif = format!(
733                    "{}\n",
734                    json!({"jsonrpc":"2.0","method":"event","params":{"type":"created"}})
735                );
736                write_half.write_all(notif.as_bytes()).await.unwrap();
737
738                let resp = format!("{}\n", json!({"jsonrpc":"2.0","result":"ok","id":id}));
739                write_half.write_all(resp.as_bytes()).await.unwrap();
740            });
741
742            client.watch("/test").await.unwrap();
743
744            let notif = rx.recv().await.unwrap();
745            assert_eq!(notif.method, "event");
746            assert_eq!(notif.params["type"], "created");
747        }
748
749        #[tokio::test]
750        async fn recv_notification_returns_typed() {
751            let (mut client, mut server) = mock_client();
752
753            let notif = format!(
754                "{}\n",
755                json!({
756                    "jsonrpc": "2.0",
757                    "method": "analysis_complete",
758                    "params": {
759                        "project_id": "proj-1",
760                        "file_path": "src/main.rs",
761                        "line_count": 42,
762                        "todo_count": 3
763                    }
764                })
765            );
766            server.write_all(notif.as_bytes()).await.unwrap();
767
768            let typed = client.recv_notification().await.unwrap().unwrap();
769            match typed {
770                Notification::AnalysisComplete { params } => {
771                    assert_eq!(params.project_id, "proj-1");
772                    assert_eq!(params.line_count, 42);
773                }
774                _ => panic!("expected AnalysisComplete"),
775            }
776        }
777
778        #[tokio::test]
779        async fn recv_notification_fails_after_take() {
780            let (mut client, _server) = mock_client();
781            let _rx = client.take_notification_stream().unwrap();
782
783            let result = client.recv_notification().await;
784            assert!(matches!(result, Err(ClientError::NotConnected)));
785        }
786    }
787
788    // ===== platform-independent serialization/parsing tests =====
789
790    #[tokio::test]
791    async fn notification_serde_roundtrip() {
792        let notif = ClientNotification {
793            method: "event".to_string(),
794            params: json!({"project_id": "proj-1", "path": "src/main.rs"}),
795        };
796        let serialized = serde_json::to_string(&notif).unwrap();
797        let parsed: ClientNotification = serde_json::from_str(&serialized).unwrap();
798        assert_eq!(parsed, notif);
799    }
800
801    #[tokio::test]
802    async fn notification_deserialize_without_params() {
803        let raw = r#"{"method":"ping"}"#;
804        let notif: ClientNotification = serde_json::from_str(raw).unwrap();
805        assert_eq!(notif.method, "ping");
806        assert_eq!(notif.params, JsonValue::Null);
807    }
808
809    #[tokio::test]
810    async fn events_with_filter_sends_params() {
811        let filter = EventFilter {
812            project_id: Some("my-project".to_string()),
813            since: Some(1708100000),
814            source: Some("ai_agent".to_string()),
815            limit: Some(100),
816        };
817        let json = serde_json::to_string(&filter).unwrap();
818        assert!(json.contains("my-project"));
819        assert!(json.contains("ai_agent"));
820    }
821
822    #[tokio::test]
823    async fn subscribe_sends_project_id() {
824        let params = json!({ "project_id": "my-project" });
825        let json = serde_json::to_string(&params).unwrap();
826        assert!(json.contains("my-project"));
827    }
828
829    #[tokio::test]
830    async fn events_parses_daemon_response() {
831        let event_json = r#"{
832            "jsonrpc":"2.0",
833            "result":[{
834                "timestamp":"2024-02-16T10:32:00Z",
835                "event_type":"created",
836                "file_path":"src/main.rs",
837                "project_id":"my-project",
838                "source":"ai_agent",
839                "source_id":"chat_42",
840                "machine_id":"machine-1",
841                "content_hash":"abc123"
842            }],
843            "id":1
844        }"#;
845
846        let response: JsonRpcResponse = serde_json::from_str(event_json).unwrap();
847        match response.payload {
848            ResponsePayload::Success { result } => {
849                let events: Vec<FileEvent> = serde_json::from_value(result).unwrap();
850                assert_eq!(events.len(), 1);
851                assert_eq!(events[0].file_path, "src/main.rs");
852                assert_eq!(events[0].source, Source::AiAgent);
853            }
854            ResponsePayload::Error { .. } => panic!("expected success"),
855        }
856    }
857
858    #[tokio::test]
859    async fn daemon_error_is_propagated() {
860        let error_json = r#"{
861            "jsonrpc":"2.0",
862            "error":{"code":-32000,"message":"project not found"},
863            "id":1
864        }"#;
865
866        let response: JsonRpcResponse = serde_json::from_str(error_json).unwrap();
867        match response.payload {
868            ResponsePayload::Error { error } => {
869                assert_eq!(error.code, -32000);
870                assert_eq!(error.message, "project not found");
871            }
872            ResponsePayload::Success { .. } => panic!("expected error"),
873        }
874    }
875
876    #[tokio::test]
877    async fn event_filter_default_is_empty() {
878        let filter = EventFilter::default();
879        assert!(filter.project_id.is_none());
880        assert!(filter.since.is_none());
881        assert!(filter.source.is_none());
882        assert!(filter.limit.is_none());
883    }
884
885    #[tokio::test]
886    async fn jsonrpc_request_serialization() {
887        let request = JsonRpcRequest {
888            jsonrpc: "2.0",
889            method: "watch",
890            params: json!({ "path": "/home/user/project" }),
891            id: 1,
892        };
893
894        let json = serde_json::to_string(&request).unwrap();
895        assert!(json.contains(r#""jsonrpc":"2.0""#));
896        assert!(json.contains(r#""method":"watch""#));
897        assert!(json.contains(r#""id":1"#));
898        assert!(json.contains(r#""path""#));
899    }
900
901    #[tokio::test]
902    async fn multiple_events_parsed_correctly() {
903        let events_json = r#"{
904            "jsonrpc":"2.0",
905            "result":[
906                {
907                    "timestamp":"2024-02-16T10:32:00Z",
908                    "event_type":"created",
909                    "file_path":"src/main.rs",
910                    "project_id":"my-project",
911                    "source":"user",
912                    "machine_id":"m1"
913                },
914                {
915                    "timestamp":"2024-02-16T10:33:00Z",
916                    "event_type":"modified",
917                    "file_path":"src/lib.rs",
918                    "project_id":"my-project",
919                    "source":"ai_agent",
920                    "source_id":"chat_42",
921                    "machine_id":"m1"
922                }
923            ],
924            "id":1
925        }"#;
926
927        let response: JsonRpcResponse = serde_json::from_str(events_json).unwrap();
928        match response.payload {
929            ResponsePayload::Success { result } => {
930                let events: Vec<FileEvent> = serde_json::from_value(result).unwrap();
931                assert_eq!(events.len(), 2);
932                assert_eq!(events[0].event_type, EventType::Created);
933                assert_eq!(events[1].event_type, EventType::Modified);
934            }
935            ResponsePayload::Error { .. } => panic!("expected success"),
936        }
937    }
938
939    #[test]
940    fn client_error_display() {
941        let err = ClientError::NotConnected;
942        assert_eq!(err.to_string(), "daemon not connected");
943
944        let err = ClientError::DaemonError("something broke".to_string());
945        assert_eq!(err.to_string(), "daemon returned error: something broke");
946
947        let err = ClientError::ConnectionClosed;
948        assert_eq!(err.to_string(), "request failed: connection closed");
949    }
950
951    #[tokio::test]
952    async fn attribute_request_serialization() {
953        let params = json!({
954            "project_id": "my-project",
955            "file_path": "src/auth.rs",
956            "source": "ai_agent",
957            "source_id": "chat-42"
958        });
959        let json = serde_json::to_string(&params).unwrap();
960        assert!(json.contains("my-project"));
961        assert!(json.contains("src/auth.rs"));
962        assert!(json.contains("ai_agent"));
963        assert!(json.contains("chat-42"));
964    }
965
966    #[tokio::test]
967    async fn attribute_request_without_source_id() {
968        let params = json!({
969            "project_id": "my-project",
970            "file_path": "src/auth.rs",
971            "source": "user"
972        });
973        let json = serde_json::to_string(&params).unwrap();
974        assert!(json.contains("user"));
975        assert!(!json.contains("source_id"));
976    }
977
978    #[tokio::test]
979    async fn watch_project_response_parsing() {
980        let response_json = r#"{
981            "jsonrpc":"2.0",
982            "result":"proj-abc-123",
983            "id":1
984        }"#;
985
986        let response: JsonRpcResponse = serde_json::from_str(response_json).unwrap();
987        match response.payload {
988            ResponsePayload::Success { result } => {
989                let project_id: String = serde_json::from_value(result).unwrap();
990                assert_eq!(project_id, "proj-abc-123");
991            }
992            ResponsePayload::Error { .. } => panic!("expected success"),
993        }
994    }
995
996    #[tokio::test]
997    async fn query_awareness_response_parsing() {
998        let awareness_json = r#"{
999            "jsonrpc":"2.0",
1000            "result":{
1001                "file_path":"src/main.rs",
1002                "project_id":"proj-123",
1003                "last_modified":"2024-02-16T10:32:00Z",
1004                "change_frequency":"hot",
1005                "modified_by":"ai_agent",
1006                "todo_count":0,
1007                "chat_references":0,
1008                "lint_hints":0,
1009                "line_count":100
1010            },
1011            "id":1
1012        }"#;
1013
1014        let response: JsonRpcResponse = serde_json::from_str(awareness_json).unwrap();
1015        match response.payload {
1016            ResponsePayload::Success { result } => {
1017                let awareness: FileAwareness = serde_json::from_value(result).unwrap();
1018                assert_eq!(awareness.file_path, "src/main.rs");
1019                assert_eq!(awareness.modified_by, ambient_fs_core::Source::AiAgent);
1020            }
1021            ResponsePayload::Error { .. } => panic!("expected success"),
1022        }
1023    }
1024
1025    #[tokio::test]
1026    async fn query_agents_response_parsing() {
1027        let agents_json = r#"{
1028            "jsonrpc":"2.0",
1029            "result":[
1030                {"id":"agent-1","name":"claude","status":"active"},
1031                {"id":"agent-2","name":"cursor","status":"idle"}
1032            ],
1033            "id":1
1034        }"#;
1035
1036        let response: JsonRpcResponse = serde_json::from_str(agents_json).unwrap();
1037        match response.payload {
1038            ResponsePayload::Success { result } => {
1039                let agents: Vec<serde_json::Value> = serde_json::from_value(result).unwrap();
1040                assert_eq!(agents.len(), 2);
1041                assert_eq!(agents[0]["name"], "claude");
1042            }
1043            ResponsePayload::Error { .. } => panic!("expected success"),
1044        }
1045    }
1046}