agtrace_runtime/runtime/
streamer.rs

1use crate::runtime::events::{StreamEvent, WorkspaceEvent};
2use agtrace_engine::{AgentSession, assemble_session};
3use agtrace_index::Database;
4use agtrace_providers::ProviderAdapter;
5use agtrace_types::AgentEvent;
6use anyhow::Result;
7use notify::{Event, EventKind, PollWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc::{Receiver, Sender, channel};
11use std::sync::{Arc, Mutex};
12use std::thread::JoinHandle;
13use std::time::Duration;
14
15struct StreamContext {
16    provider: Arc<ProviderAdapter>,
17    file_states: HashMap<PathBuf, usize>,
18    all_events: Vec<AgentEvent>,
19    session: Option<AgentSession>,
20}
21
22impl StreamContext {
23    fn new(provider: Arc<ProviderAdapter>) -> Self {
24        Self {
25            provider,
26            file_states: HashMap::new(),
27            all_events: Vec::new(),
28            session: None,
29        }
30    }
31
32    fn load_all_events(&mut self, session_files: &[PathBuf]) -> Result<Vec<AgentEvent>> {
33        let mut events = Vec::new();
34
35        for path in session_files {
36            let file_events = Self::load_file(path, &self.provider)?;
37            self.file_states.insert(path.clone(), file_events.len());
38            events.extend(file_events);
39        }
40
41        events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
42
43        self.all_events = events.clone();
44        self.session = assemble_session(&self.all_events);
45
46        Ok(events)
47    }
48
49    fn handle_change(&mut self, path: &Path) -> Result<Vec<AgentEvent>> {
50        let all_file_events = Self::load_file(path, &self.provider)?;
51        let last_count = *self.file_states.get(path).unwrap_or(&0);
52
53        if all_file_events.len() < last_count {
54            self.file_states
55                .insert(path.to_path_buf(), all_file_events.len());
56            self.all_events = all_file_events.clone();
57            self.session = assemble_session(&self.all_events);
58            return Ok(all_file_events);
59        }
60
61        let new_events: Vec<AgentEvent> = all_file_events.into_iter().skip(last_count).collect();
62
63        self.file_states
64            .insert(path.to_path_buf(), last_count + new_events.len());
65
66        self.all_events.extend(new_events.clone());
67        self.all_events
68            .sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
69        self.session = assemble_session(&self.all_events);
70
71        Ok(new_events)
72    }
73
74    fn load_file(path: &Path, provider: &Arc<ProviderAdapter>) -> Result<Vec<AgentEvent>> {
75        provider.parser.parse_file(path)
76    }
77}
78
79pub struct SessionStreamer {
80    _watcher: PollWatcher,
81    _handle: JoinHandle<()>,
82    rx: Receiver<WorkspaceEvent>,
83}
84
85impl SessionStreamer {
86    pub fn receiver(&self) -> &Receiver<WorkspaceEvent> {
87        &self.rx
88    }
89
90    pub fn attach(
91        session_id: String,
92        db: Arc<Mutex<Database>>,
93        provider: Arc<ProviderAdapter>,
94    ) -> Result<Self> {
95        let session_files = {
96            let db_lock = db.lock().unwrap();
97            let files = db_lock.get_session_files(&session_id)?;
98            if files.is_empty() {
99                anyhow::bail!("Session not found: {}", session_id);
100            }
101            files
102                .into_iter()
103                .map(|f| PathBuf::from(f.path))
104                .collect::<Vec<_>>()
105        };
106
107        Self::start_core(session_id, session_files, provider)
108    }
109
110    /// Attach to a session by scanning the filesystem for session files
111    /// This is used when the session is not yet indexed in the database
112    pub fn attach_from_filesystem(
113        session_id: String,
114        log_root: PathBuf,
115        provider: Arc<ProviderAdapter>,
116    ) -> Result<Self> {
117        let session_files = find_session_files(&log_root, &session_id, &provider)?;
118
119        if session_files.is_empty() {
120            anyhow::bail!("No files found for session: {}", session_id);
121        }
122
123        Self::start_core(session_id, session_files, provider)
124    }
125
126    fn start_core(
127        session_id: String,
128        session_files: Vec<PathBuf>,
129        provider: Arc<ProviderAdapter>,
130    ) -> Result<Self> {
131        let (tx_out, rx_out) = channel();
132        let (tx_fs, rx_fs) = channel();
133
134        let watch_dir = session_files
135            .first()
136            .and_then(|p| p.parent())
137            .ok_or_else(|| anyhow::anyhow!("Cannot determine watch directory"))?
138            .to_path_buf();
139
140        let config = notify::Config::default().with_poll_interval(Duration::from_millis(500));
141
142        let mut watcher = PollWatcher::new(
143            move |res: Result<Event, _>| {
144                if let Ok(event) = res {
145                    let _ = tx_fs.send(event);
146                }
147            },
148            config,
149        )?;
150
151        watcher.watch(&watch_dir, RecursiveMode::Recursive)?;
152
153        let tx_attached = tx_out.clone();
154        let first_file = session_files.first().cloned().unwrap();
155        let _ = tx_attached.send(WorkspaceEvent::Stream(StreamEvent::Attached {
156            session_id: session_id.clone(),
157            path: first_file.clone(),
158        }));
159
160        let mut context = StreamContext::new(provider);
161
162        if let Ok(events) = context.load_all_events(&session_files)
163            && !events.is_empty()
164        {
165            let _ = tx_out.send(WorkspaceEvent::Stream(StreamEvent::Events {
166                events: events.clone(),
167                session: context.session.clone(),
168            }));
169        }
170
171        let tx_worker = tx_out.clone();
172        let handle = std::thread::Builder::new()
173            .name("session-streamer".to_string())
174            .spawn(move || {
175                loop {
176                    match rx_fs.recv() {
177                        Ok(event) => {
178                            if let Err(e) =
179                                handle_fs_event(&event, &session_files, &mut context, &tx_worker)
180                            {
181                                let _ = tx_worker
182                                    .send(WorkspaceEvent::Error(format!("Stream error: {}", e)));
183                            }
184                        }
185                        Err(_) => {
186                            let _ =
187                                tx_worker.send(WorkspaceEvent::Stream(StreamEvent::Disconnected {
188                                    reason: "Stream ended".to_string(),
189                                }));
190                            break;
191                        }
192                    }
193                }
194            })?;
195
196        Ok(Self {
197            _watcher: watcher,
198            _handle: handle,
199            rx: rx_out,
200        })
201    }
202}
203
204fn handle_fs_event(
205    event: &Event,
206    session_files: &[PathBuf],
207    context: &mut StreamContext,
208    tx: &Sender<WorkspaceEvent>,
209) -> Result<()> {
210    if let EventKind::Modify(_) = event.kind {
211        for path in &event.paths {
212            if session_files.contains(path)
213                && let Ok(new_events) = context.handle_change(path)
214                && !new_events.is_empty()
215            {
216                let _ = tx.send(WorkspaceEvent::Stream(StreamEvent::Events {
217                    events: new_events,
218                    session: context.session.clone(),
219                }));
220            }
221        }
222    }
223    Ok(())
224}
225
226fn find_session_files(
227    log_root: &Path,
228    session_id: &str,
229    provider: &Arc<ProviderAdapter>,
230) -> Result<Vec<PathBuf>> {
231    use std::fs;
232
233    let mut session_files = Vec::new();
234
235    fn visit_dir(
236        dir: &Path,
237        session_id: &str,
238        provider: &Arc<ProviderAdapter>,
239        files: &mut Vec<PathBuf>,
240    ) -> Result<()> {
241        if !dir.is_dir() {
242            return Ok(());
243        }
244
245        for entry in fs::read_dir(dir)? {
246            let entry = entry?;
247            let path = entry.path();
248
249            if path.is_dir() {
250                visit_dir(&path, session_id, provider, files)?;
251            } else if provider.discovery.probe(&path).is_match()
252                && let Ok(id) = provider.discovery.extract_session_id(&path)
253                && id == session_id
254            {
255                files.push(path);
256            }
257        }
258
259        Ok(())
260    }
261
262    visit_dir(log_root, session_id, provider, &mut session_files)?;
263    Ok(session_files)
264}