// agtrace_runtime/runtime/streamer.rs

1use crate::runtime::events::{StreamEvent, WorkspaceEvent};
2use crate::{Error, Result};
3use agtrace_engine::{AgentSession, assemble_sessions};
4use agtrace_index::Database;
5use agtrace_providers::ProviderAdapter;
6use agtrace_types::AgentEvent;
7use notify::{Event, EventKind, PollWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc::{Receiver, Sender, channel};
11use std::sync::{Arc, Mutex};
12use std::thread::JoinHandle;
13use std::time::Duration;
14
15struct StreamContext {
16    provider: Arc<ProviderAdapter>,
17    /// Events per file, preserving file-internal order
18    file_events: HashMap<PathBuf, Vec<AgentEvent>>,
19    /// Assembled sessions (main + child streams)
20    sessions: Vec<AgentSession>,
21}
22
23impl StreamContext {
24    fn new(provider: Arc<ProviderAdapter>) -> Self {
25        Self {
26            provider,
27            file_events: HashMap::new(),
28            sessions: Vec::new(),
29        }
30    }
31
32    fn load_all_events(&mut self, session_files: &[PathBuf]) -> Result<Vec<AgentEvent>> {
33        for path in session_files {
34            let events = Self::load_file(path, &self.provider)?;
35            self.file_events.insert(path.clone(), events);
36        }
37
38        let all_events = self.merge_all_events();
39        self.sessions = assemble_sessions(&all_events);
40
41        Ok(all_events)
42    }
43
44    fn handle_change(&mut self, path: &Path) -> Result<Vec<AgentEvent>> {
45        let all_file_events = Self::load_file(path, &self.provider)?;
46        let last_count = self.file_events.get(path).map(|e| e.len()).unwrap_or(0);
47
48        // Determine new events for the return value
49        let new_events: Vec<AgentEvent> = if all_file_events.len() >= last_count {
50            all_file_events.iter().skip(last_count).cloned().collect()
51        } else {
52            // File shrunk (e.g., log rotation) - treat all events as new
53            all_file_events.clone()
54        };
55
56        // Replace the entire file's events to preserve correct ordering
57        // This fixes the bug where extend + sort would break event ordering
58        // for events with identical timestamps (e.g., ToolCall before ToolResult)
59        self.file_events.insert(path.to_path_buf(), all_file_events);
60
61        // Rebuild all_events from all files and reassemble sessions
62        let all_events = self.merge_all_events();
63        self.sessions = assemble_sessions(&all_events);
64
65        Ok(new_events)
66    }
67
68    /// Merge events from all files, sorting by timestamp while preserving
69    /// file-internal order for events with identical timestamps
70    fn merge_all_events(&self) -> Vec<AgentEvent> {
71        let mut all_events: Vec<AgentEvent> =
72            self.file_events.values().flatten().cloned().collect();
73        // Stable sort preserves file-internal order for same-timestamp events
74        all_events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
75        all_events
76    }
77
78    fn load_file(path: &Path, provider: &Arc<ProviderAdapter>) -> Result<Vec<AgentEvent>> {
79        Ok(provider.parser.parse_file(path)?)
80    }
81}
82
83pub struct SessionStreamer {
84    _watcher: PollWatcher,
85    _handle: JoinHandle<()>,
86    rx: Receiver<WorkspaceEvent>,
87}
88
89impl SessionStreamer {
90    pub fn receiver(&self) -> &Receiver<WorkspaceEvent> {
91        &self.rx
92    }
93
94    pub fn attach(
95        session_id: String,
96        db: Arc<Mutex<Database>>,
97        provider: Arc<ProviderAdapter>,
98    ) -> Result<Self> {
99        let session_files = {
100            let db_lock = db.lock().unwrap();
101            let files = db_lock.get_session_files(&session_id)?;
102            if files.is_empty() {
103                return Err(Error::InvalidOperation(format!(
104                    "Session not found: {}",
105                    session_id
106                )));
107            }
108            files
109                .into_iter()
110                .map(|f| PathBuf::from(f.path))
111                .collect::<Vec<_>>()
112        };
113
114        Self::start_core(session_id, session_files, provider)
115    }
116
117    /// Attach to a session by scanning the filesystem for session files
118    /// This is used when the session is not yet indexed in the database
119    pub fn attach_from_filesystem(
120        session_id: String,
121        log_root: PathBuf,
122        provider: Arc<ProviderAdapter>,
123    ) -> Result<Self> {
124        let session_files = find_session_files(&log_root, &session_id, &provider)?;
125
126        if session_files.is_empty() {
127            return Err(Error::InvalidOperation(format!(
128                "No files found for session: {}",
129                session_id
130            )));
131        }
132
133        Self::start_core(session_id, session_files, provider)
134    }
135
136    fn start_core(
137        session_id: String,
138        session_files: Vec<PathBuf>,
139        provider: Arc<ProviderAdapter>,
140    ) -> Result<Self> {
141        let (tx_out, rx_out) = channel();
142        let (tx_fs, rx_fs) = channel();
143
144        let watch_dir = session_files
145            .first()
146            .and_then(|p| p.parent())
147            .ok_or_else(|| Error::InvalidOperation("Cannot determine watch directory".to_string()))?
148            .to_path_buf();
149
150        let config = notify::Config::default().with_poll_interval(Duration::from_millis(100));
151
152        let mut watcher = PollWatcher::new(
153            move |res: std::result::Result<Event, notify::Error>| {
154                if let Ok(event) = res {
155                    let _ = tx_fs.send(event);
156                }
157            },
158            config,
159        )
160        .map_err(|e| Error::InvalidOperation(format!("Failed to create file watcher: {}", e)))?;
161
162        watcher
163            .watch(&watch_dir, RecursiveMode::Recursive)
164            .map_err(|e| Error::InvalidOperation(format!("Failed to watch directory: {}", e)))?;
165
166        let tx_attached = tx_out.clone();
167        let first_file = session_files.first().cloned().unwrap();
168        let _ = tx_attached.send(WorkspaceEvent::Stream(StreamEvent::Attached {
169            session_id: session_id.clone(),
170            path: first_file.clone(),
171        }));
172
173        let mut context = StreamContext::new(provider);
174
175        if let Ok(events) = context.load_all_events(&session_files)
176            && !events.is_empty()
177        {
178            let _ = tx_out.send(WorkspaceEvent::Stream(StreamEvent::Events {
179                events: events.clone(),
180                sessions: context.sessions.clone(),
181            }));
182        }
183
184        let tx_worker = tx_out.clone();
185        let handle = std::thread::Builder::new()
186            .name("session-streamer".to_string())
187            .spawn(move || {
188                loop {
189                    match rx_fs.recv() {
190                        Ok(event) => {
191                            if let Err(e) =
192                                handle_fs_event(&event, &session_files, &mut context, &tx_worker)
193                            {
194                                let _ = tx_worker
195                                    .send(WorkspaceEvent::Error(format!("Stream error: {}", e)));
196                            }
197                        }
198                        Err(_) => {
199                            let _ =
200                                tx_worker.send(WorkspaceEvent::Stream(StreamEvent::Disconnected {
201                                    reason: "Stream ended".to_string(),
202                                }));
203                            break;
204                        }
205                    }
206                }
207            })?;
208
209        Ok(Self {
210            _watcher: watcher,
211            _handle: handle,
212            rx: rx_out,
213        })
214    }
215}
216
217fn handle_fs_event(
218    event: &Event,
219    session_files: &[PathBuf],
220    context: &mut StreamContext,
221    tx: &Sender<WorkspaceEvent>,
222) -> Result<()> {
223    if let EventKind::Modify(_) = event.kind {
224        for path in &event.paths {
225            if session_files.contains(path)
226                && let Ok(new_events) = context.handle_change(path)
227                && !new_events.is_empty()
228            {
229                let _ = tx.send(WorkspaceEvent::Stream(StreamEvent::Events {
230                    events: new_events,
231                    sessions: context.sessions.clone(),
232                }));
233            }
234        }
235    }
236    Ok(())
237}
238
239fn find_session_files(
240    log_root: &Path,
241    session_id: &str,
242    provider: &Arc<ProviderAdapter>,
243) -> Result<Vec<PathBuf>> {
244    use std::fs;
245
246    let mut session_files = Vec::new();
247
248    fn visit_dir(
249        dir: &Path,
250        session_id: &str,
251        provider: &Arc<ProviderAdapter>,
252        files: &mut Vec<PathBuf>,
253    ) -> Result<()> {
254        if !dir.is_dir() {
255            return Ok(());
256        }
257
258        for entry in fs::read_dir(dir)? {
259            let entry = entry?;
260            let path = entry.path();
261
262            if path.is_dir() {
263                visit_dir(&path, session_id, provider, files)?;
264            } else if provider.discovery.probe(&path).is_match()
265                && let Ok(id) = provider.discovery.extract_session_id(&path)
266                && id == session_id
267            {
268                files.push(path);
269            }
270        }
271
272        Ok(())
273    }
274
275    visit_dir(log_root, session_id, provider, &mut session_files)?;
276    Ok(session_files)
277}