agtrace_sdk/
watch.rs

1use std::pin::Pin;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4
5use futures::stream::Stream;
6
7use crate::error::Result;
8
9// Re-export event types for convenient use in examples/client code
10pub use agtrace_runtime::{DiscoveryEvent, StreamEvent, WorkspaceEvent};
11
12pub struct WatchBuilder {
13    inner: Arc<agtrace_runtime::AgTrace>,
14    providers: Vec<String>,
15}
16
17impl WatchBuilder {
18    pub(crate) fn new(inner: Arc<agtrace_runtime::AgTrace>) -> Self {
19        Self {
20            inner,
21            providers: vec![],
22        }
23    }
24
25    pub fn provider(mut self, name: &str) -> Self {
26        self.providers.push(name.to_string());
27        self
28    }
29
30    pub fn all_providers(mut self) -> Self {
31        self.providers.clear();
32        self
33    }
34
35    pub fn start(self) -> Result<LiveStream> {
36        let monitor = self
37            .inner
38            .workspace_monitor()
39            .map_err(crate::error::Error::Runtime)?
40            .start_background_scan()
41            .map_err(crate::error::Error::Runtime)?;
42
43        // Create async channel for stream implementation
44        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
45
46        // Spawn background task to bridge blocking receiver to async sender
47        // The monitor is moved into the task
48        tokio::task::spawn_blocking(move || {
49            while let Ok(event) = monitor.receiver().recv() {
50                if tx.send(event).is_err() {
51                    break; // Receiver dropped
52                }
53            }
54        });
55
56        Ok(LiveStream { receiver: rx })
57    }
58}
59
60pub struct LiveStream {
61    receiver: tokio::sync::mpsc::UnboundedReceiver<WorkspaceEvent>,
62}
63
64impl LiveStream {
65    /// Poll for the next event (non-blocking).
66    ///
67    /// Returns `None` if no event is available immediately.
68    pub fn try_next(&mut self) -> Option<WorkspaceEvent> {
69        self.receiver.try_recv().ok()
70    }
71}
72
73impl Stream for LiveStream {
74    type Item = WorkspaceEvent;
75
76    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77        self.receiver.poll_recv(cx)
78    }
79}