agtrace_runtime/runtime/
streamer.rs

1use crate::runtime::events::{StreamEvent, WorkspaceEvent};
2use crate::{Error, Result};
3use agtrace_engine::{AgentSession, assemble_session};
4use agtrace_index::Database;
5use agtrace_providers::ProviderAdapter;
6use agtrace_types::AgentEvent;
7use notify::{Event, EventKind, PollWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc::{Receiver, Sender, channel};
11use std::sync::{Arc, Mutex};
12use std::thread::JoinHandle;
13use std::time::Duration;
14
15struct StreamContext {
16    provider: Arc<ProviderAdapter>,
17    file_states: HashMap<PathBuf, usize>,
18    all_events: Vec<AgentEvent>,
19    session: Option<AgentSession>,
20}
21
22impl StreamContext {
23    fn new(provider: Arc<ProviderAdapter>) -> Self {
24        Self {
25            provider,
26            file_states: HashMap::new(),
27            all_events: Vec::new(),
28            session: None,
29        }
30    }
31
32    fn load_all_events(&mut self, session_files: &[PathBuf]) -> Result<Vec<AgentEvent>> {
33        let mut events = Vec::new();
34
35        for path in session_files {
36            let file_events = Self::load_file(path, &self.provider)?;
37            self.file_states.insert(path.clone(), file_events.len());
38            events.extend(file_events);
39        }
40
41        events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
42
43        self.all_events = events.clone();
44        self.session = assemble_session(&self.all_events);
45
46        Ok(events)
47    }
48
49    fn handle_change(&mut self, path: &Path) -> Result<Vec<AgentEvent>> {
50        let all_file_events = Self::load_file(path, &self.provider)?;
51        let last_count = *self.file_states.get(path).unwrap_or(&0);
52
53        if all_file_events.len() < last_count {
54            self.file_states
55                .insert(path.to_path_buf(), all_file_events.len());
56            self.all_events = all_file_events.clone();
57            self.session = assemble_session(&self.all_events);
58            return Ok(all_file_events);
59        }
60
61        let new_events: Vec<AgentEvent> = all_file_events.into_iter().skip(last_count).collect();
62
63        self.file_states
64            .insert(path.to_path_buf(), last_count + new_events.len());
65
66        self.all_events.extend(new_events.clone());
67        self.all_events
68            .sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
69        self.session = assemble_session(&self.all_events);
70
71        Ok(new_events)
72    }
73
74    fn load_file(path: &Path, provider: &Arc<ProviderAdapter>) -> Result<Vec<AgentEvent>> {
75        Ok(provider.parser.parse_file(path)?)
76    }
77}
78
79pub struct SessionStreamer {
80    _watcher: PollWatcher,
81    _handle: JoinHandle<()>,
82    rx: Receiver<WorkspaceEvent>,
83}
84
85impl SessionStreamer {
86    pub fn receiver(&self) -> &Receiver<WorkspaceEvent> {
87        &self.rx
88    }
89
90    pub fn attach(
91        session_id: String,
92        db: Arc<Mutex<Database>>,
93        provider: Arc<ProviderAdapter>,
94    ) -> Result<Self> {
95        let session_files = {
96            let db_lock = db.lock().unwrap();
97            let files = db_lock.get_session_files(&session_id)?;
98            if files.is_empty() {
99                return Err(Error::InvalidOperation(format!(
100                    "Session not found: {}",
101                    session_id
102                )));
103            }
104            files
105                .into_iter()
106                .map(|f| PathBuf::from(f.path))
107                .collect::<Vec<_>>()
108        };
109
110        Self::start_core(session_id, session_files, provider)
111    }
112
113    /// Attach to a session by scanning the filesystem for session files
114    /// This is used when the session is not yet indexed in the database
115    pub fn attach_from_filesystem(
116        session_id: String,
117        log_root: PathBuf,
118        provider: Arc<ProviderAdapter>,
119    ) -> Result<Self> {
120        let session_files = find_session_files(&log_root, &session_id, &provider)?;
121
122        if session_files.is_empty() {
123            return Err(Error::InvalidOperation(format!(
124                "No files found for session: {}",
125                session_id
126            )));
127        }
128
129        Self::start_core(session_id, session_files, provider)
130    }
131
132    fn start_core(
133        session_id: String,
134        session_files: Vec<PathBuf>,
135        provider: Arc<ProviderAdapter>,
136    ) -> Result<Self> {
137        let (tx_out, rx_out) = channel();
138        let (tx_fs, rx_fs) = channel();
139
140        let watch_dir = session_files
141            .first()
142            .and_then(|p| p.parent())
143            .ok_or_else(|| Error::InvalidOperation("Cannot determine watch directory".to_string()))?
144            .to_path_buf();
145
146        let config = notify::Config::default().with_poll_interval(Duration::from_millis(500));
147
148        let mut watcher = PollWatcher::new(
149            move |res: std::result::Result<Event, notify::Error>| {
150                if let Ok(event) = res {
151                    let _ = tx_fs.send(event);
152                }
153            },
154            config,
155        )
156        .map_err(|e| Error::InvalidOperation(format!("Failed to create file watcher: {}", e)))?;
157
158        watcher
159            .watch(&watch_dir, RecursiveMode::Recursive)
160            .map_err(|e| Error::InvalidOperation(format!("Failed to watch directory: {}", e)))?;
161
162        let tx_attached = tx_out.clone();
163        let first_file = session_files.first().cloned().unwrap();
164        let _ = tx_attached.send(WorkspaceEvent::Stream(StreamEvent::Attached {
165            session_id: session_id.clone(),
166            path: first_file.clone(),
167        }));
168
169        let mut context = StreamContext::new(provider);
170
171        if let Ok(events) = context.load_all_events(&session_files)
172            && !events.is_empty()
173        {
174            let _ = tx_out.send(WorkspaceEvent::Stream(StreamEvent::Events {
175                events: events.clone(),
176                session: context.session.clone(),
177            }));
178        }
179
180        let tx_worker = tx_out.clone();
181        let handle = std::thread::Builder::new()
182            .name("session-streamer".to_string())
183            .spawn(move || {
184                loop {
185                    match rx_fs.recv() {
186                        Ok(event) => {
187                            if let Err(e) =
188                                handle_fs_event(&event, &session_files, &mut context, &tx_worker)
189                            {
190                                let _ = tx_worker
191                                    .send(WorkspaceEvent::Error(format!("Stream error: {}", e)));
192                            }
193                        }
194                        Err(_) => {
195                            let _ =
196                                tx_worker.send(WorkspaceEvent::Stream(StreamEvent::Disconnected {
197                                    reason: "Stream ended".to_string(),
198                                }));
199                            break;
200                        }
201                    }
202                }
203            })?;
204
205        Ok(Self {
206            _watcher: watcher,
207            _handle: handle,
208            rx: rx_out,
209        })
210    }
211}
212
213fn handle_fs_event(
214    event: &Event,
215    session_files: &[PathBuf],
216    context: &mut StreamContext,
217    tx: &Sender<WorkspaceEvent>,
218) -> Result<()> {
219    if let EventKind::Modify(_) = event.kind {
220        for path in &event.paths {
221            if session_files.contains(path)
222                && let Ok(new_events) = context.handle_change(path)
223                && !new_events.is_empty()
224            {
225                let _ = tx.send(WorkspaceEvent::Stream(StreamEvent::Events {
226                    events: new_events,
227                    session: context.session.clone(),
228                }));
229            }
230        }
231    }
232    Ok(())
233}
234
235fn find_session_files(
236    log_root: &Path,
237    session_id: &str,
238    provider: &Arc<ProviderAdapter>,
239) -> Result<Vec<PathBuf>> {
240    use std::fs;
241
242    let mut session_files = Vec::new();
243
244    fn visit_dir(
245        dir: &Path,
246        session_id: &str,
247        provider: &Arc<ProviderAdapter>,
248        files: &mut Vec<PathBuf>,
249    ) -> Result<()> {
250        if !dir.is_dir() {
251            return Ok(());
252        }
253
254        for entry in fs::read_dir(dir)? {
255            let entry = entry?;
256            let path = entry.path();
257
258            if path.is_dir() {
259                visit_dir(&path, session_id, provider, files)?;
260            } else if provider.discovery.probe(&path).is_match()
261                && let Ok(id) = provider.discovery.extract_session_id(&path)
262                && id == session_id
263            {
264                files.push(path);
265            }
266        }
267
268        Ok(())
269    }
270
271    visit_dir(log_root, session_id, provider, &mut session_files)?;
272    Ok(session_files)
273}