1use std::pin::Pin;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4
5use futures::stream::Stream;
6
7use crate::error::Result;
8
9pub use agtrace_runtime::{DiscoveryEvent, StreamEvent, WorkspaceEvent};
11
12pub struct WatchBuilder {
13 inner: Arc<agtrace_runtime::AgTrace>,
14 providers: Vec<String>,
15}
16
17impl WatchBuilder {
18 pub(crate) fn new(inner: Arc<agtrace_runtime::AgTrace>) -> Self {
19 Self {
20 inner,
21 providers: vec![],
22 }
23 }
24
25 pub fn provider(mut self, name: &str) -> Self {
26 self.providers.push(name.to_string());
27 self
28 }
29
30 pub fn all_providers(mut self) -> Self {
31 self.providers.clear();
32 self
33 }
34
35 pub fn start(self) -> Result<LiveStream> {
36 let monitor = self
37 .inner
38 .workspace_monitor()
39 .map_err(crate::error::Error::Runtime)?
40 .start_background_scan()
41 .map_err(crate::error::Error::Runtime)?;
42
43 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
45
46 tokio::task::spawn_blocking(move || {
49 while let Ok(event) = monitor.receiver().recv() {
50 if tx.send(event).is_err() {
51 break; }
53 }
54 });
55
56 Ok(LiveStream { receiver: rx })
57 }
58}
59
60pub struct LiveStream {
61 receiver: tokio::sync::mpsc::UnboundedReceiver<WorkspaceEvent>,
62}
63
64impl LiveStream {
65 pub fn try_next(&mut self) -> Option<WorkspaceEvent> {
69 self.receiver.try_recv().ok()
70 }
71}
72
73impl Stream for LiveStream {
74 type Item = WorkspaceEvent;
75
76 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77 self.receiver.poll_recv(cx)
78 }
79}