1use ambient_fs_core::{awareness::FileAwareness, FileEvent};
2use serde::{Deserialize, Serialize};
3use serde_json::{json, Value as JsonValue};
4use std::collections::HashMap;
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Arc;
8use thiserror::Error;
9use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
10use tokio::sync::{mpsc, oneshot, Mutex};
11use tokio::task::JoinHandle;
12
13#[cfg(unix)]
17use tokio::net::unix::OwnedWriteHalf;
18#[cfg(unix)]
19use tokio::net::UnixStream;
20
21#[cfg(windows)]
22use tokio::net::tcp::OwnedWriteHalf;
23#[cfg(windows)]
24use tokio::net::TcpStream;
25
/// Unix domain socket path that `AmbientFsClient::connect_local` connects to on Unix.
#[cfg(unix)]
pub const DEFAULT_SOCKET_PATH: &str = "/tmp/ambient-fs.sock";

/// TCP address that `AmbientFsClient::connect_local` connects to on Windows.
#[cfg(windows)]
pub const DEFAULT_ADDR: &str = "127.0.0.1:9851";

/// Capacity of the notification channel created by `AmbientFsClient::connect`.
const DEFAULT_NOTIFICATION_BUFFER: usize = 256;
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
41pub struct ClientNotification {
42 pub method: String,
43 #[serde(default)]
44 pub params: JsonValue,
45}
46
47#[derive(Clone, Debug, Serialize, Deserialize)]
52#[serde(tag = "method")]
53pub enum Notification {
54 #[serde(rename = "event")]
56 Event { params: FileEvent },
57 #[serde(rename = "awareness_changed")]
59 AwarenessChanged {
60 params: AwarenessChangedParams,
61 },
62 #[serde(rename = "analysis_complete")]
64 AnalysisComplete {
65 params: AnalysisCompleteParams,
66 },
67 #[serde(rename = "tree_patch")]
69 TreePatch { params: TreePatchParams },
70}
71
72#[derive(Clone, Debug, Serialize, Deserialize)]
74pub struct AwarenessChangedParams {
75 pub project_id: String,
76 pub file_path: String,
77 pub awareness: FileAwareness,
78}
79
80#[derive(Clone, Debug, Serialize, Deserialize)]
82pub struct AnalysisCompleteParams {
83 pub project_id: String,
84 pub file_path: String,
85 pub line_count: u32,
86 pub todo_count: u32,
87}
88
89#[derive(Clone, Debug, Serialize, Deserialize)]
91pub struct TreePatchParams {
92 pub project_id: String,
93 #[serde(flatten)]
94 pub patch: serde_json::Value,
95}
96
97pub struct AmbientFsClient {
104 socket_path: PathBuf,
105 writer: OwnedWriteHalf,
106 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Result<JsonValue>>>>>,
107 notification_rx: Option<mpsc::Receiver<ClientNotification>>,
108 reader_handle: JoinHandle<()>,
109 next_id: AtomicU64,
110}
111
112impl std::fmt::Debug for AmbientFsClient {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("AmbientFsClient")
115 .field("socket_path", &self.socket_path)
116 .field("next_id", &self.next_id)
117 .finish_non_exhaustive()
118 }
119}
120
121impl Drop for AmbientFsClient {
122 fn drop(&mut self) {
123 self.reader_handle.abort();
124 }
125}
126
127#[derive(Debug, Serialize)]
129struct JsonRpcRequest<'a, T> {
130 jsonrpc: &'static str,
131 method: &'a str,
132 params: T,
133 id: u64,
134}
135
/// Test-only model of a JSON-RPC response envelope.
#[cfg(test)]
#[derive(Deserialize, Debug)]
struct JsonRpcResponse {
    /// Protocol version string; parsed but never inspected.
    #[allow(dead_code)]
    jsonrpc: String,
    /// Either the `result` or the `error` member, flattened into the envelope.
    #[serde(flatten)]
    payload: ResponsePayload,
}
145
/// Test-only body of a response: a success value or an error object.
///
/// Untagged, so the variants are tried in declaration order and the first
/// one whose member is present wins.
#[cfg(test)]
#[derive(Deserialize, Debug)]
#[serde(untagged)]
enum ResponsePayload {
    /// The response carried a `result` member.
    Success { result: JsonValue },
    /// The response carried an `error` member.
    Error { error: JsonRpcError },
}
153
/// Test-only model of the JSON-RPC `error` object.
#[cfg(test)]
#[derive(Deserialize, Debug)]
struct JsonRpcError {
    /// Numeric error code.
    #[allow(dead_code)]
    code: i32,
    /// Human-readable error message.
    message: String,
}
161
162#[derive(Debug, Clone, Serialize, Default)]
164pub struct EventFilter {
165 pub project_id: Option<String>,
166 pub since: Option<i64>, pub source: Option<String>,
168 pub limit: Option<usize>,
169}
170
171#[derive(Debug, Error)]
173pub enum ClientError {
174 #[error("IO error: {0}")]
175 Io(#[from] std::io::Error),
176
177 #[error("JSON serialization error: {0}")]
178 JsonSerialize(#[from] serde_json::Error),
179
180 #[error("daemon returned error: {0}")]
181 DaemonError(String),
182
183 #[error("invalid response from daemon")]
184 InvalidResponse,
185
186 #[error("daemon not connected")]
187 NotConnected,
188
189 #[error("request failed: connection closed")]
190 ConnectionClosed,
191}
192
193pub type Result<T> = std::result::Result<T, ClientError>;
194
195impl AmbientFsClient {
196 pub async fn connect_local() -> Result<Self> {
201 #[cfg(unix)]
202 {
203 Self::connect(DEFAULT_SOCKET_PATH).await
204 }
205 #[cfg(windows)]
206 {
207 Self::connect(DEFAULT_ADDR).await
208 }
209 }
210
211 pub async fn connect(endpoint: impl Into<PathBuf>) -> Result<Self> {
216 let endpoint = endpoint.into();
217 #[cfg(unix)]
218 let stream = UnixStream::connect(&endpoint).await?;
219 #[cfg(windows)]
220 let stream = {
221 let addr = endpoint.to_string_lossy().into_owned();
222 TcpStream::connect(&addr).await?
223 };
224 Ok(Self::from_stream(stream, endpoint, DEFAULT_NOTIFICATION_BUFFER))
225 }
226
227 #[cfg(unix)]
229 pub(crate) fn from_stream(
230 stream: UnixStream,
231 socket_path: PathBuf,
232 notification_buffer: usize,
233 ) -> Self {
234 let (read_half, write_half) = stream.into_split();
235 Self::from_halves(read_half, write_half, socket_path, notification_buffer)
236 }
237
238 #[cfg(windows)]
240 pub(crate) fn from_stream(
241 stream: TcpStream,
242 socket_path: PathBuf,
243 notification_buffer: usize,
244 ) -> Self {
245 let (read_half, write_half) = stream.into_split();
246 Self::from_halves(read_half, write_half, socket_path, notification_buffer)
247 }
248
249 fn from_halves<R: tokio::io::AsyncRead + Unpin + Send + 'static>(
251 read_half: R,
252 write_half: OwnedWriteHalf,
253 socket_path: PathBuf,
254 notification_buffer: usize,
255 ) -> Self {
256 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<Result<JsonValue>>>>> =
257 Arc::new(Mutex::new(HashMap::new()));
258 let (notification_tx, notification_rx) = mpsc::channel(notification_buffer);
259
260 let reader_pending = pending.clone();
261 let reader_handle = tokio::spawn(async move {
262 let mut reader = BufReader::new(read_half);
263 let mut line = String::new();
264 loop {
265 line.clear();
266 match reader.read_line(&mut line).await {
267 Ok(0) => break, Ok(_) => {
269 let trimmed = line.trim();
270 if trimmed.is_empty() {
271 continue;
272 }
273 match serde_json::from_str::<JsonValue>(trimmed) {
274 Ok(msg) => {
275 if let Some(id) = msg.get("id").and_then(|v| v.as_u64()) {
276 let mut map = reader_pending.lock().await;
278 if let Some(tx) = map.remove(&id) {
279 let result = if let Some(err) = msg.get("error") {
280 let message = err
281 .get("message")
282 .and_then(|m| m.as_str())
283 .unwrap_or("unknown error")
284 .to_string();
285 Err(ClientError::DaemonError(message))
286 } else if let Some(result) = msg.get("result") {
287 Ok(result.clone())
288 } else {
289 Err(ClientError::InvalidResponse)
290 };
291 let _ = tx.send(result);
292 }
293 } else if msg.get("method").is_some() {
294 let method = msg["method"]
296 .as_str()
297 .unwrap_or("")
298 .to_string();
299 let params = msg
300 .get("params")
301 .cloned()
302 .unwrap_or(JsonValue::Null);
303 let notif = ClientNotification { method, params };
304 match notification_tx.try_send(notif) {
305 Ok(()) => {}
306 Err(mpsc::error::TrySendError::Full(_)) => {
307 tracing::warn!(
308 "notification channel full, dropping"
309 );
310 }
311 Err(mpsc::error::TrySendError::Closed(_)) => {
312 break;
313 }
314 }
315 } else {
316 tracing::warn!(
317 "unknown message from daemon: {}",
318 trimmed
319 );
320 }
321 }
322 Err(e) => {
323 tracing::warn!("failed to parse daemon message: {}", e);
324 }
325 }
326 }
327 Err(e) => {
328 tracing::warn!("reader error: {}", e);
329 break;
330 }
331 }
332 }
333 let mut map = reader_pending.lock().await;
335 for (_, tx) in map.drain() {
336 let _ = tx.send(Err(ClientError::ConnectionClosed));
337 }
338 });
339
340 Self {
341 socket_path,
342 writer: write_half,
343 pending,
344 notification_rx: Some(notification_rx),
345 reader_handle,
346 next_id: AtomicU64::new(1),
347 }
348 }
349
350 pub fn take_notification_stream(&mut self) -> Option<mpsc::Receiver<ClientNotification>> {
359 self.notification_rx.take()
360 }
361
362 pub async fn recv_notification(&mut self) -> Result<Option<Notification>> {
370 let rx = self.notification_rx.as_mut().ok_or(ClientError::NotConnected)?;
371 match rx.recv().await {
372 Some(raw) => {
373 let value = json!({
374 "method": raw.method,
375 "params": raw.params,
376 });
377 let notification: Notification = serde_json::from_value(value)
378 .map_err(|e| ClientError::DaemonError(
379 format!("invalid notification: {}", e),
380 ))?;
381 Ok(Some(notification))
382 }
383 None => Ok(None),
384 }
385 }
386
387 pub fn is_connected(&self) -> bool {
389 !self.reader_handle.is_finished()
390 }
391
392 pub async fn watch(&mut self, path: &str) -> Result<()> {
394 let params = json!({ "path": path });
395 self.send_request("watch", ¶ms).await?;
396 Ok(())
397 }
398
399 pub async fn events(&mut self, filter: EventFilter) -> Result<Vec<FileEvent>> {
401 let response = self.send_request("events", &filter).await?;
402 serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
403 }
404
405 pub async fn subscribe(&mut self, project_id: &str) -> Result<()> {
407 let params = json!({ "project_id": project_id });
408 self.send_request("subscribe", ¶ms).await?;
409 Ok(())
410 }
411
412 pub async fn watch_project(&mut self, path: &str) -> Result<String> {
416 let params = json!({ "path": path });
417 let response = self.send_request("watch_project", ¶ms).await?;
418 serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
419 }
420
421 pub async fn unwatch_project(&mut self, project_id: &str) -> Result<()> {
423 let params = json!({ "project_id": project_id });
424 self.send_request("unwatch_project", ¶ms).await?;
425 Ok(())
426 }
427
428 pub async fn unsubscribe(&mut self, project_id: &str) -> Result<()> {
430 let params = json!({ "project_id": project_id });
431 self.send_request("unsubscribe", ¶ms).await?;
432 Ok(())
433 }
434
435 pub async fn query_events(&mut self, filter: EventFilter) -> Result<Vec<FileEvent>> {
437 let response = self.send_request("query_events", &filter).await?;
438 serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
439 }
440
441 pub async fn query_awareness(
443 &mut self,
444 project_id: &str,
445 path: &str,
446 ) -> Result<FileAwareness> {
447 let params = json!({
448 "project_id": project_id,
449 "path": path,
450 });
451 let response = self.send_request("query_awareness", ¶ms).await?;
452 serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
453 }
454
455 pub async fn attribute(
457 &mut self,
458 project_id: &str,
459 file_path: &str,
460 source: &str,
461 source_id: Option<&str>,
462 ) -> Result<()> {
463 let mut params = json!({
464 "project_id": project_id,
465 "file_path": file_path,
466 "source": source,
467 });
468 if let Some(sid) = source_id {
469 params["source_id"] = json!(sid);
470 }
471 self.send_request("attribute", ¶ms).await?;
472 Ok(())
473 }
474
475 pub async fn query_agents(&mut self) -> Result<Vec<serde_json::Value>> {
477 let empty = json!({});
478 let response = self.send_request("query_agents", &empty).await?;
479 serde_json::from_value(response).map_err(|_| ClientError::InvalidResponse)
480 }
481
482 async fn send_request<T: Serialize>(
487 &mut self,
488 method: &str,
489 params: &T,
490 ) -> Result<JsonValue> {
491 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
492
493 let request = JsonRpcRequest {
494 jsonrpc: "2.0",
495 method,
496 params,
497 id,
498 };
499
500 let mut request_json = serde_json::to_string(&request)?;
501 request_json.push('\n');
502 tracing::debug!("sending request: {}", request_json.trim());
503
504 let (tx, rx) = oneshot::channel();
506 {
507 let mut map = self.pending.lock().await;
508 map.insert(id, tx);
509 }
510
511 if let Err(e) = self.writer.write_all(request_json.as_bytes()).await {
513 self.pending.lock().await.remove(&id);
514 return Err(ClientError::Io(e));
515 }
516
517 rx.await.map_err(|_| ClientError::ConnectionClosed)?
519 }
520}
521
#[cfg(test)]
mod tests {
    use super::*;
    use ambient_fs_core::{EventType, Source};

    /// Tests that drive a real client over an in-process Unix socket pair.
    #[cfg(unix)]
    mod unix_stream_tests {
        use super::*;

        /// Returns a client plus the "daemon" end of a connected socket pair.
        fn mock_client() -> (AmbientFsClient, UnixStream) {
            let (client_stream, server_stream) = UnixStream::pair().unwrap();
            let client = AmbientFsClient::from_stream(
                client_stream,
                PathBuf::from("/tmp/test.sock"),
                256,
            );
            (client, server_stream)
        }

        #[tokio::test]
        async fn reader_routes_response_by_id() {
            let (client, mut server) = mock_client();

            let (tx10, rx10) = oneshot::channel();
            let (tx20, rx20) = oneshot::channel();
            {
                let mut map = client.pending.lock().await;
                map.insert(10, tx10);
                map.insert(20, tx20);
            }

            // Answer out of order to prove routing is by id, not by arrival.
            let r20 = format!("{}\n", json!({"jsonrpc":"2.0","result":"twenty","id":20}));
            let r10 = format!("{}\n", json!({"jsonrpc":"2.0","result":"ten","id":10}));
            server.write_all(r20.as_bytes()).await.unwrap();
            server.write_all(r10.as_bytes()).await.unwrap();

            let result20 = rx20.await.unwrap().unwrap();
            let result10 = rx10.await.unwrap().unwrap();
            assert_eq!(result20, json!("twenty"));
            assert_eq!(result10, json!("ten"));
        }

        #[tokio::test]
        async fn reader_routes_error_response() {
            let (client, mut server) = mock_client();

            let (tx, rx) = oneshot::channel();
            {
                client.pending.lock().await.insert(1, tx);
            }

            let resp = format!(
                "{}\n",
                json!({"jsonrpc":"2.0","error":{"code":-32000,"message":"not found"},"id":1})
            );
            server.write_all(resp.as_bytes()).await.unwrap();

            let result = rx.await.unwrap();
            assert!(matches!(result, Err(ClientError::DaemonError(msg)) if msg == "not found"));
        }

        #[tokio::test]
        async fn reader_routes_notification_to_channel() {
            let (mut client, mut server) = mock_client();
            let mut rx = client.take_notification_stream().unwrap();

            let notif = format!(
                "{}\n",
                json!({"jsonrpc":"2.0","method":"event","params":{"path":"src/lib.rs"}})
            );
            server.write_all(notif.as_bytes()).await.unwrap();

            let received = rx.recv().await.unwrap();
            assert_eq!(received.method, "event");
            assert_eq!(received.params["path"], "src/lib.rs");
        }

        #[tokio::test]
        async fn take_notification_stream_returns_none_on_second_call() {
            let (mut client, _server) = mock_client();
            assert!(client.take_notification_stream().is_some());
            assert!(client.take_notification_stream().is_none());
        }

        #[tokio::test]
        async fn send_request_receives_response() {
            let (mut client, server) = mock_client();

            tokio::spawn(async move {
                let (read_half, mut write_half) = server.into_split();
                let mut reader = BufReader::new(read_half);
                let mut line = String::new();
                reader.read_line(&mut line).await.unwrap();
                let req: JsonValue = serde_json::from_str(&line).unwrap();
                let id = req["id"].as_u64().unwrap();
                assert_eq!(req["method"], "watch");

                let resp = format!("{}\n", json!({"jsonrpc":"2.0","result":"ok","id":id}));
                write_half.write_all(resp.as_bytes()).await.unwrap();
            });

            client.watch("/test/path").await.unwrap();
        }

        #[tokio::test]
        async fn send_request_receives_error_response() {
            let (mut client, server) = mock_client();

            tokio::spawn(async move {
                let (read_half, mut write_half) = server.into_split();
                let mut reader = BufReader::new(read_half);
                let mut line = String::new();
                reader.read_line(&mut line).await.unwrap();
                let req: JsonValue = serde_json::from_str(&line).unwrap();
                let id = req["id"].as_u64().unwrap();

                let resp = format!(
                    "{}\n",
                    json!({"jsonrpc":"2.0","error":{"code":-32000,"message":"project not found"},"id":id})
                );
                write_half.write_all(resp.as_bytes()).await.unwrap();
            });

            let result = client.watch_project("/nonexistent").await;
            assert!(matches!(result, Err(ClientError::DaemonError(msg)) if msg == "project not found"));
        }

        #[tokio::test]
        async fn connection_closed_fails_pending_requests() {
            let (mut client, server) = mock_client();
            drop(server);

            // Give the reader task time to observe EOF.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;

            let result = client.watch("/test").await;
            assert!(matches!(
                result,
                Err(ClientError::ConnectionClosed) | Err(ClientError::Io(_))
            ));
        }

        #[tokio::test]
        async fn notification_channel_closed_after_drop() {
            let (mut client, mut server) = mock_client();
            let mut rx = client.take_notification_stream().unwrap();

            let notif = format!("{}\n", json!({"jsonrpc":"2.0","method":"ping","params":{}}));
            server.write_all(notif.as_bytes()).await.unwrap();
            let _ = rx.recv().await.unwrap();

            drop(client);
            drop(server);

            assert!(rx.recv().await.is_none());
        }

        #[tokio::test]
        async fn request_ids_increment() {
            let (mut client, server) = mock_client();

            tokio::spawn(async move {
                let (read_half, mut write_half) = server.into_split();
                let mut reader = BufReader::new(read_half);
                for _ in 0..2 {
                    let mut line = String::new();
                    reader.read_line(&mut line).await.unwrap();
                    let req: JsonValue = serde_json::from_str(&line).unwrap();
                    let id = req["id"].as_u64().unwrap();
                    let resp = format!("{}\n", json!({"jsonrpc":"2.0","result":"ok","id":id}));
                    write_half.write_all(resp.as_bytes()).await.unwrap();
                }
            });

            client.watch("/path1").await.unwrap();
            client.watch("/path2").await.unwrap();
            assert_eq!(client.next_id.load(Ordering::Relaxed), 3);
        }

        #[tokio::test]
        async fn client_stores_socket_path() {
            let (client, _server) = mock_client();
            assert_eq!(client.socket_path, PathBuf::from("/tmp/test.sock"));
        }

        #[tokio::test]
        async fn is_connected_reflects_reader_state() {
            let (client, server) = mock_client();
            assert!(client.is_connected());

            drop(server);
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            assert!(!client.is_connected());
        }

        #[tokio::test]
        async fn interleaved_notifications_and_responses() {
            let (mut client, server) = mock_client();
            let mut rx = client.take_notification_stream().unwrap();

            tokio::spawn(async move {
                let (read_half, mut write_half) = server.into_split();
                let mut reader = BufReader::new(read_half);

                let mut line = String::new();
                reader.read_line(&mut line).await.unwrap();
                let req: JsonValue = serde_json::from_str(&line).unwrap();
                let id = req["id"].as_u64().unwrap();

                // A notification slips in before the response.
                let notif = format!(
                    "{}\n",
                    json!({"jsonrpc":"2.0","method":"event","params":{"type":"created"}})
                );
                write_half.write_all(notif.as_bytes()).await.unwrap();

                let resp = format!("{}\n", json!({"jsonrpc":"2.0","result":"ok","id":id}));
                write_half.write_all(resp.as_bytes()).await.unwrap();
            });

            client.watch("/test").await.unwrap();

            let notif = rx.recv().await.unwrap();
            assert_eq!(notif.method, "event");
            assert_eq!(notif.params["type"], "created");
        }

        #[tokio::test]
        async fn recv_notification_returns_typed() {
            let (mut client, mut server) = mock_client();

            let notif = format!(
                "{}\n",
                json!({
                    "jsonrpc": "2.0",
                    "method": "analysis_complete",
                    "params": {
                        "project_id": "proj-1",
                        "file_path": "src/main.rs",
                        "line_count": 42,
                        "todo_count": 3
                    }
                })
            );
            server.write_all(notif.as_bytes()).await.unwrap();

            let typed = client.recv_notification().await.unwrap().unwrap();
            match typed {
                Notification::AnalysisComplete { params } => {
                    assert_eq!(params.project_id, "proj-1");
                    assert_eq!(params.line_count, 42);
                }
                _ => panic!("expected AnalysisComplete"),
            }
        }

        #[tokio::test]
        async fn recv_notification_fails_after_take() {
            let (mut client, _server) = mock_client();
            let _rx = client.take_notification_stream().unwrap();

            let result = client.recv_notification().await;
            assert!(matches!(result, Err(ClientError::NotConnected)));
        }
    }

    // The remaining tests only exercise (de)serialization and never await,
    // so they run as plain `#[test]`s without a Tokio runtime.

    #[test]
    fn notification_serde_roundtrip() {
        let notif = ClientNotification {
            method: "event".to_string(),
            params: json!({"project_id": "proj-1", "path": "src/main.rs"}),
        };
        let serialized = serde_json::to_string(&notif).unwrap();
        let parsed: ClientNotification = serde_json::from_str(&serialized).unwrap();
        assert_eq!(parsed, notif);
    }

    #[test]
    fn notification_deserialize_without_params() {
        let raw = r#"{"method":"ping"}"#;
        let notif: ClientNotification = serde_json::from_str(raw).unwrap();
        assert_eq!(notif.method, "ping");
        assert_eq!(notif.params, JsonValue::Null);
    }

    #[test]
    fn events_with_filter_sends_params() {
        let filter = EventFilter {
            project_id: Some("my-project".to_string()),
            since: Some(1708100000),
            source: Some("ai_agent".to_string()),
            limit: Some(100),
        };
        let json = serde_json::to_string(&filter).unwrap();
        assert!(json.contains("my-project"));
        assert!(json.contains("ai_agent"));
    }

    #[test]
    fn subscribe_sends_project_id() {
        let params = json!({ "project_id": "my-project" });
        let json = serde_json::to_string(&params).unwrap();
        assert!(json.contains("my-project"));
    }

    #[test]
    fn events_parses_daemon_response() {
        let event_json = r#"{
            "jsonrpc":"2.0",
            "result":[{
                "timestamp":"2024-02-16T10:32:00Z",
                "event_type":"created",
                "file_path":"src/main.rs",
                "project_id":"my-project",
                "source":"ai_agent",
                "source_id":"chat_42",
                "machine_id":"machine-1",
                "content_hash":"abc123"
            }],
            "id":1
        }"#;

        let response: JsonRpcResponse = serde_json::from_str(event_json).unwrap();
        match response.payload {
            ResponsePayload::Success { result } => {
                let events: Vec<FileEvent> = serde_json::from_value(result).unwrap();
                assert_eq!(events.len(), 1);
                assert_eq!(events[0].file_path, "src/main.rs");
                assert_eq!(events[0].source, Source::AiAgent);
            }
            ResponsePayload::Error { .. } => panic!("expected success"),
        }
    }

    #[test]
    fn daemon_error_is_propagated() {
        let error_json = r#"{
            "jsonrpc":"2.0",
            "error":{"code":-32000,"message":"project not found"},
            "id":1
        }"#;

        let response: JsonRpcResponse = serde_json::from_str(error_json).unwrap();
        match response.payload {
            ResponsePayload::Error { error } => {
                assert_eq!(error.code, -32000);
                assert_eq!(error.message, "project not found");
            }
            ResponsePayload::Success { .. } => panic!("expected error"),
        }
    }

    #[test]
    fn event_filter_default_is_empty() {
        let filter = EventFilter::default();
        assert!(filter.project_id.is_none());
        assert!(filter.since.is_none());
        assert!(filter.source.is_none());
        assert!(filter.limit.is_none());
    }

    #[test]
    fn jsonrpc_request_serialization() {
        let request = JsonRpcRequest {
            jsonrpc: "2.0",
            method: "watch",
            params: json!({ "path": "/home/user/project" }),
            id: 1,
        };

        let json = serde_json::to_string(&request).unwrap();
        assert!(json.contains(r#""jsonrpc":"2.0""#));
        assert!(json.contains(r#""method":"watch""#));
        assert!(json.contains(r#""id":1"#));
        assert!(json.contains(r#""path""#));
    }

    #[test]
    fn multiple_events_parsed_correctly() {
        let events_json = r#"{
            "jsonrpc":"2.0",
            "result":[
                {
                    "timestamp":"2024-02-16T10:32:00Z",
                    "event_type":"created",
                    "file_path":"src/main.rs",
                    "project_id":"my-project",
                    "source":"user",
                    "machine_id":"m1"
                },
                {
                    "timestamp":"2024-02-16T10:33:00Z",
                    "event_type":"modified",
                    "file_path":"src/lib.rs",
                    "project_id":"my-project",
                    "source":"ai_agent",
                    "source_id":"chat_42",
                    "machine_id":"m1"
                }
            ],
            "id":1
        }"#;

        let response: JsonRpcResponse = serde_json::from_str(events_json).unwrap();
        match response.payload {
            ResponsePayload::Success { result } => {
                let events: Vec<FileEvent> = serde_json::from_value(result).unwrap();
                assert_eq!(events.len(), 2);
                assert_eq!(events[0].event_type, EventType::Created);
                assert_eq!(events[1].event_type, EventType::Modified);
            }
            ResponsePayload::Error { .. } => panic!("expected success"),
        }
    }

    #[test]
    fn client_error_display() {
        let err = ClientError::NotConnected;
        assert_eq!(err.to_string(), "daemon not connected");

        let err = ClientError::DaemonError("something broke".to_string());
        assert_eq!(err.to_string(), "daemon returned error: something broke");

        let err = ClientError::ConnectionClosed;
        assert_eq!(err.to_string(), "request failed: connection closed");
    }

    #[test]
    fn attribute_request_serialization() {
        let params = json!({
            "project_id": "my-project",
            "file_path": "src/auth.rs",
            "source": "ai_agent",
            "source_id": "chat-42"
        });
        let json = serde_json::to_string(&params).unwrap();
        assert!(json.contains("my-project"));
        assert!(json.contains("src/auth.rs"));
        assert!(json.contains("ai_agent"));
        assert!(json.contains("chat-42"));
    }

    #[test]
    fn attribute_request_without_source_id() {
        let params = json!({
            "project_id": "my-project",
            "file_path": "src/auth.rs",
            "source": "user"
        });
        let json = serde_json::to_string(&params).unwrap();
        assert!(json.contains("user"));
        assert!(!json.contains("source_id"));
    }

    #[test]
    fn watch_project_response_parsing() {
        let response_json = r#"{
            "jsonrpc":"2.0",
            "result":"proj-abc-123",
            "id":1
        }"#;

        let response: JsonRpcResponse = serde_json::from_str(response_json).unwrap();
        match response.payload {
            ResponsePayload::Success { result } => {
                let project_id: String = serde_json::from_value(result).unwrap();
                assert_eq!(project_id, "proj-abc-123");
            }
            ResponsePayload::Error { .. } => panic!("expected success"),
        }
    }

    #[test]
    fn query_awareness_response_parsing() {
        let awareness_json = r#"{
            "jsonrpc":"2.0",
            "result":{
                "file_path":"src/main.rs",
                "project_id":"proj-123",
                "last_modified":"2024-02-16T10:32:00Z",
                "change_frequency":"hot",
                "modified_by":"ai_agent",
                "todo_count":0,
                "chat_references":0,
                "lint_hints":0,
                "line_count":100
            },
            "id":1
        }"#;

        let response: JsonRpcResponse = serde_json::from_str(awareness_json).unwrap();
        match response.payload {
            ResponsePayload::Success { result } => {
                let awareness: FileAwareness = serde_json::from_value(result).unwrap();
                assert_eq!(awareness.file_path, "src/main.rs");
                assert_eq!(awareness.modified_by, ambient_fs_core::Source::AiAgent);
            }
            ResponsePayload::Error { .. } => panic!("expected success"),
        }
    }

    #[test]
    fn query_agents_response_parsing() {
        let agents_json = r#"{
            "jsonrpc":"2.0",
            "result":[
                {"id":"agent-1","name":"claude","status":"active"},
                {"id":"agent-2","name":"cursor","status":"idle"}
            ],
            "id":1
        }"#;

        let response: JsonRpcResponse = serde_json::from_str(agents_json).unwrap();
        match response.payload {
            ResponsePayload::Success { result } => {
                let agents: Vec<serde_json::Value> = serde_json::from_value(result).unwrap();
                assert_eq!(agents.len(), 2);
                assert_eq!(agents[0]["name"], "claude");
            }
            ResponsePayload::Error { .. } => panic!("expected success"),
        }
    }
}