agtrace_runtime/runtime/
streamer.rs1use crate::runtime::events::{StreamEvent, WorkspaceEvent};
2use crate::{Error, Result};
3use agtrace_engine::{AgentSession, assemble_sessions};
4use agtrace_index::Database;
5use agtrace_providers::ProviderAdapter;
6use agtrace_types::AgentEvent;
7use notify::{Event, EventKind, PollWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc::{Receiver, Sender, channel};
11use std::sync::{Arc, Mutex};
12use std::thread::JoinHandle;
13use std::time::Duration;
14
15struct StreamContext {
16 provider: Arc<ProviderAdapter>,
17 file_events: HashMap<PathBuf, Vec<AgentEvent>>,
19 sessions: Vec<AgentSession>,
21}
22
23impl StreamContext {
24 fn new(provider: Arc<ProviderAdapter>) -> Self {
25 Self {
26 provider,
27 file_events: HashMap::new(),
28 sessions: Vec::new(),
29 }
30 }
31
32 fn load_all_events(&mut self, session_files: &[PathBuf]) -> Result<Vec<AgentEvent>> {
33 for path in session_files {
34 let events = Self::load_file(path, &self.provider)?;
35 self.file_events.insert(path.clone(), events);
36 }
37
38 let all_events = self.merge_all_events();
39 self.sessions = assemble_sessions(&all_events);
40
41 Ok(all_events)
42 }
43
44 fn handle_change(&mut self, path: &Path) -> Result<Vec<AgentEvent>> {
45 let all_file_events = Self::load_file(path, &self.provider)?;
46 let last_count = self.file_events.get(path).map(|e| e.len()).unwrap_or(0);
47
48 let new_events: Vec<AgentEvent> = if all_file_events.len() >= last_count {
50 all_file_events.iter().skip(last_count).cloned().collect()
51 } else {
52 all_file_events.clone()
54 };
55
56 self.file_events.insert(path.to_path_buf(), all_file_events);
60
61 let all_events = self.merge_all_events();
63 self.sessions = assemble_sessions(&all_events);
64
65 Ok(new_events)
66 }
67
68 fn merge_all_events(&self) -> Vec<AgentEvent> {
71 let mut all_events: Vec<AgentEvent> =
72 self.file_events.values().flatten().cloned().collect();
73 all_events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
75 all_events
76 }
77
78 fn load_file(path: &Path, provider: &Arc<ProviderAdapter>) -> Result<Vec<AgentEvent>> {
79 Ok(provider.parser.parse_file(path)?)
80 }
81}
82
83pub struct SessionStreamer {
84 _watcher: PollWatcher,
85 _handle: JoinHandle<()>,
86 rx: Receiver<WorkspaceEvent>,
87}
88
89impl SessionStreamer {
90 pub fn receiver(&self) -> &Receiver<WorkspaceEvent> {
91 &self.rx
92 }
93
94 pub fn attach(
95 session_id: String,
96 db: Arc<Mutex<Database>>,
97 provider: Arc<ProviderAdapter>,
98 ) -> Result<Self> {
99 let session_files = {
100 let db_lock = db.lock().unwrap();
101 let files = db_lock.get_session_files(&session_id)?;
102 if files.is_empty() {
103 return Err(Error::InvalidOperation(format!(
104 "Session not found: {}",
105 session_id
106 )));
107 }
108 files
109 .into_iter()
110 .map(|f| PathBuf::from(f.path))
111 .collect::<Vec<_>>()
112 };
113
114 Self::start_core(session_id, session_files, provider)
115 }
116
117 pub fn attach_from_filesystem(
120 session_id: String,
121 log_root: PathBuf,
122 provider: Arc<ProviderAdapter>,
123 ) -> Result<Self> {
124 let session_files = find_session_files(&log_root, &session_id, &provider)?;
125
126 if session_files.is_empty() {
127 return Err(Error::InvalidOperation(format!(
128 "No files found for session: {}",
129 session_id
130 )));
131 }
132
133 Self::start_core(session_id, session_files, provider)
134 }
135
136 fn start_core(
137 session_id: String,
138 session_files: Vec<PathBuf>,
139 provider: Arc<ProviderAdapter>,
140 ) -> Result<Self> {
141 let (tx_out, rx_out) = channel();
142 let (tx_fs, rx_fs) = channel();
143
144 let watch_dir = session_files
145 .first()
146 .and_then(|p| p.parent())
147 .ok_or_else(|| Error::InvalidOperation("Cannot determine watch directory".to_string()))?
148 .to_path_buf();
149
150 let config = notify::Config::default().with_poll_interval(Duration::from_millis(100));
151
152 let mut watcher = PollWatcher::new(
153 move |res: std::result::Result<Event, notify::Error>| {
154 if let Ok(event) = res {
155 let _ = tx_fs.send(event);
156 }
157 },
158 config,
159 )
160 .map_err(|e| Error::InvalidOperation(format!("Failed to create file watcher: {}", e)))?;
161
162 watcher
163 .watch(&watch_dir, RecursiveMode::Recursive)
164 .map_err(|e| Error::InvalidOperation(format!("Failed to watch directory: {}", e)))?;
165
166 let tx_attached = tx_out.clone();
167 let first_file = session_files.first().cloned().unwrap();
168 let _ = tx_attached.send(WorkspaceEvent::Stream(StreamEvent::Attached {
169 session_id: session_id.clone(),
170 path: first_file.clone(),
171 }));
172
173 let mut context = StreamContext::new(provider);
174
175 if let Ok(events) = context.load_all_events(&session_files)
176 && !events.is_empty()
177 {
178 let _ = tx_out.send(WorkspaceEvent::Stream(StreamEvent::Events {
179 events: events.clone(),
180 sessions: context.sessions.clone(),
181 }));
182 }
183
184 let tx_worker = tx_out.clone();
185 let handle = std::thread::Builder::new()
186 .name("session-streamer".to_string())
187 .spawn(move || {
188 loop {
189 match rx_fs.recv() {
190 Ok(event) => {
191 if let Err(e) =
192 handle_fs_event(&event, &session_files, &mut context, &tx_worker)
193 {
194 let _ = tx_worker
195 .send(WorkspaceEvent::Error(format!("Stream error: {}", e)));
196 }
197 }
198 Err(_) => {
199 let _ =
200 tx_worker.send(WorkspaceEvent::Stream(StreamEvent::Disconnected {
201 reason: "Stream ended".to_string(),
202 }));
203 break;
204 }
205 }
206 }
207 })?;
208
209 Ok(Self {
210 _watcher: watcher,
211 _handle: handle,
212 rx: rx_out,
213 })
214 }
215}
216
217fn handle_fs_event(
218 event: &Event,
219 session_files: &[PathBuf],
220 context: &mut StreamContext,
221 tx: &Sender<WorkspaceEvent>,
222) -> Result<()> {
223 if let EventKind::Modify(_) = event.kind {
224 for path in &event.paths {
225 if session_files.contains(path)
226 && let Ok(new_events) = context.handle_change(path)
227 && !new_events.is_empty()
228 {
229 let _ = tx.send(WorkspaceEvent::Stream(StreamEvent::Events {
230 events: new_events,
231 sessions: context.sessions.clone(),
232 }));
233 }
234 }
235 }
236 Ok(())
237}
238
239fn find_session_files(
240 log_root: &Path,
241 session_id: &str,
242 provider: &Arc<ProviderAdapter>,
243) -> Result<Vec<PathBuf>> {
244 use std::fs;
245
246 let mut session_files = Vec::new();
247
248 fn visit_dir(
249 dir: &Path,
250 session_id: &str,
251 provider: &Arc<ProviderAdapter>,
252 files: &mut Vec<PathBuf>,
253 ) -> Result<()> {
254 if !dir.is_dir() {
255 return Ok(());
256 }
257
258 for entry in fs::read_dir(dir)? {
259 let entry = entry?;
260 let path = entry.path();
261
262 if path.is_dir() {
263 visit_dir(&path, session_id, provider, files)?;
264 } else if provider.discovery.probe(&path).is_match()
265 && let Ok(id) = provider.discovery.extract_session_id(&path)
266 && id == session_id
267 {
268 files.push(path);
269 }
270 }
271
272 Ok(())
273 }
274
275 visit_dir(log_root, session_id, provider, &mut session_files)?;
276 Ok(session_files)
277}