agtrace_runtime/runtime/
streamer.rs

1use crate::runtime::events::{StreamEvent, WorkspaceEvent};
2use crate::{Error, Result};
3use agtrace_engine::{AgentSession, assemble_sessions};
4use agtrace_index::Database;
5use agtrace_providers::ProviderAdapter;
6use agtrace_types::AgentEvent;
7use notify::{Event, EventKind, PollWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc::{Receiver, Sender, channel};
11use std::sync::{Arc, Mutex};
12use std::thread::JoinHandle;
13use std::time::Duration;
14
15struct StreamContext {
16    provider: Arc<ProviderAdapter>,
17    file_states: HashMap<PathBuf, usize>,
18    all_events: Vec<AgentEvent>,
19    /// Assembled sessions (main + child streams)
20    sessions: Vec<AgentSession>,
21}
22
23impl StreamContext {
24    fn new(provider: Arc<ProviderAdapter>) -> Self {
25        Self {
26            provider,
27            file_states: HashMap::new(),
28            all_events: Vec::new(),
29            sessions: Vec::new(),
30        }
31    }
32
33    fn load_all_events(&mut self, session_files: &[PathBuf]) -> Result<Vec<AgentEvent>> {
34        let mut events = Vec::new();
35
36        for path in session_files {
37            let file_events = Self::load_file(path, &self.provider)?;
38            self.file_states.insert(path.clone(), file_events.len());
39            events.extend(file_events);
40        }
41
42        events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
43
44        self.all_events = events.clone();
45        self.sessions = assemble_sessions(&self.all_events);
46
47        Ok(events)
48    }
49
50    fn handle_change(&mut self, path: &Path) -> Result<Vec<AgentEvent>> {
51        let all_file_events = Self::load_file(path, &self.provider)?;
52        let last_count = *self.file_states.get(path).unwrap_or(&0);
53
54        if all_file_events.len() < last_count {
55            self.file_states
56                .insert(path.to_path_buf(), all_file_events.len());
57            self.all_events = all_file_events.clone();
58            self.sessions = assemble_sessions(&self.all_events);
59            return Ok(all_file_events);
60        }
61
62        let new_events: Vec<AgentEvent> = all_file_events.into_iter().skip(last_count).collect();
63
64        self.file_states
65            .insert(path.to_path_buf(), last_count + new_events.len());
66
67        self.all_events.extend(new_events.clone());
68        self.all_events
69            .sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
70        self.sessions = assemble_sessions(&self.all_events);
71
72        Ok(new_events)
73    }
74
75    fn load_file(path: &Path, provider: &Arc<ProviderAdapter>) -> Result<Vec<AgentEvent>> {
76        Ok(provider.parser.parse_file(path)?)
77    }
78}
79
80pub struct SessionStreamer {
81    _watcher: PollWatcher,
82    _handle: JoinHandle<()>,
83    rx: Receiver<WorkspaceEvent>,
84}
85
86impl SessionStreamer {
87    pub fn receiver(&self) -> &Receiver<WorkspaceEvent> {
88        &self.rx
89    }
90
91    pub fn attach(
92        session_id: String,
93        db: Arc<Mutex<Database>>,
94        provider: Arc<ProviderAdapter>,
95    ) -> Result<Self> {
96        let session_files = {
97            let db_lock = db.lock().unwrap();
98            let files = db_lock.get_session_files(&session_id)?;
99            if files.is_empty() {
100                return Err(Error::InvalidOperation(format!(
101                    "Session not found: {}",
102                    session_id
103                )));
104            }
105            files
106                .into_iter()
107                .map(|f| PathBuf::from(f.path))
108                .collect::<Vec<_>>()
109        };
110
111        Self::start_core(session_id, session_files, provider)
112    }
113
114    /// Attach to a session by scanning the filesystem for session files
115    /// This is used when the session is not yet indexed in the database
116    pub fn attach_from_filesystem(
117        session_id: String,
118        log_root: PathBuf,
119        provider: Arc<ProviderAdapter>,
120    ) -> Result<Self> {
121        let session_files = find_session_files(&log_root, &session_id, &provider)?;
122
123        if session_files.is_empty() {
124            return Err(Error::InvalidOperation(format!(
125                "No files found for session: {}",
126                session_id
127            )));
128        }
129
130        Self::start_core(session_id, session_files, provider)
131    }
132
133    fn start_core(
134        session_id: String,
135        session_files: Vec<PathBuf>,
136        provider: Arc<ProviderAdapter>,
137    ) -> Result<Self> {
138        let (tx_out, rx_out) = channel();
139        let (tx_fs, rx_fs) = channel();
140
141        let watch_dir = session_files
142            .first()
143            .and_then(|p| p.parent())
144            .ok_or_else(|| Error::InvalidOperation("Cannot determine watch directory".to_string()))?
145            .to_path_buf();
146
147        let config = notify::Config::default().with_poll_interval(Duration::from_millis(100));
148
149        let mut watcher = PollWatcher::new(
150            move |res: std::result::Result<Event, notify::Error>| {
151                if let Ok(event) = res {
152                    let _ = tx_fs.send(event);
153                }
154            },
155            config,
156        )
157        .map_err(|e| Error::InvalidOperation(format!("Failed to create file watcher: {}", e)))?;
158
159        watcher
160            .watch(&watch_dir, RecursiveMode::Recursive)
161            .map_err(|e| Error::InvalidOperation(format!("Failed to watch directory: {}", e)))?;
162
163        let tx_attached = tx_out.clone();
164        let first_file = session_files.first().cloned().unwrap();
165        let _ = tx_attached.send(WorkspaceEvent::Stream(StreamEvent::Attached {
166            session_id: session_id.clone(),
167            path: first_file.clone(),
168        }));
169
170        let mut context = StreamContext::new(provider);
171
172        if let Ok(events) = context.load_all_events(&session_files)
173            && !events.is_empty()
174        {
175            let _ = tx_out.send(WorkspaceEvent::Stream(StreamEvent::Events {
176                events: events.clone(),
177                sessions: context.sessions.clone(),
178            }));
179        }
180
181        let tx_worker = tx_out.clone();
182        let handle = std::thread::Builder::new()
183            .name("session-streamer".to_string())
184            .spawn(move || {
185                loop {
186                    match rx_fs.recv() {
187                        Ok(event) => {
188                            if let Err(e) =
189                                handle_fs_event(&event, &session_files, &mut context, &tx_worker)
190                            {
191                                let _ = tx_worker
192                                    .send(WorkspaceEvent::Error(format!("Stream error: {}", e)));
193                            }
194                        }
195                        Err(_) => {
196                            let _ =
197                                tx_worker.send(WorkspaceEvent::Stream(StreamEvent::Disconnected {
198                                    reason: "Stream ended".to_string(),
199                                }));
200                            break;
201                        }
202                    }
203                }
204            })?;
205
206        Ok(Self {
207            _watcher: watcher,
208            _handle: handle,
209            rx: rx_out,
210        })
211    }
212}
213
214fn handle_fs_event(
215    event: &Event,
216    session_files: &[PathBuf],
217    context: &mut StreamContext,
218    tx: &Sender<WorkspaceEvent>,
219) -> Result<()> {
220    if let EventKind::Modify(_) = event.kind {
221        for path in &event.paths {
222            if session_files.contains(path)
223                && let Ok(new_events) = context.handle_change(path)
224                && !new_events.is_empty()
225            {
226                let _ = tx.send(WorkspaceEvent::Stream(StreamEvent::Events {
227                    events: new_events,
228                    sessions: context.sessions.clone(),
229                }));
230            }
231        }
232    }
233    Ok(())
234}
235
236fn find_session_files(
237    log_root: &Path,
238    session_id: &str,
239    provider: &Arc<ProviderAdapter>,
240) -> Result<Vec<PathBuf>> {
241    use std::fs;
242
243    let mut session_files = Vec::new();
244
245    fn visit_dir(
246        dir: &Path,
247        session_id: &str,
248        provider: &Arc<ProviderAdapter>,
249        files: &mut Vec<PathBuf>,
250    ) -> Result<()> {
251        if !dir.is_dir() {
252            return Ok(());
253        }
254
255        for entry in fs::read_dir(dir)? {
256            let entry = entry?;
257            let path = entry.path();
258
259            if path.is_dir() {
260                visit_dir(&path, session_id, provider, files)?;
261            } else if provider.discovery.probe(&path).is_match()
262                && let Ok(id) = provider.discovery.extract_session_id(&path)
263                && id == session_id
264            {
265                files.push(path);
266            }
267        }
268
269        Ok(())
270    }
271
272    visit_dir(log_root, session_id, provider, &mut session_files)?;
273    Ok(session_files)
274}