1use crate::runtime::events::{StreamEvent, WorkspaceEvent};
2use crate::{Error, Result};
3use agtrace_engine::{AgentSession, assemble_session};
4use agtrace_index::Database;
5use agtrace_providers::ProviderAdapter;
6use agtrace_types::AgentEvent;
7use notify::{Event, EventKind, PollWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc::{Receiver, Sender, channel};
11use std::sync::{Arc, Mutex};
12use std::thread::JoinHandle;
13use std::time::Duration;
14
15struct StreamContext {
16 provider: Arc<ProviderAdapter>,
17 file_states: HashMap<PathBuf, usize>,
18 all_events: Vec<AgentEvent>,
19 session: Option<AgentSession>,
20}
21
22impl StreamContext {
23 fn new(provider: Arc<ProviderAdapter>) -> Self {
24 Self {
25 provider,
26 file_states: HashMap::new(),
27 all_events: Vec::new(),
28 session: None,
29 }
30 }
31
32 fn load_all_events(&mut self, session_files: &[PathBuf]) -> Result<Vec<AgentEvent>> {
33 let mut events = Vec::new();
34
35 for path in session_files {
36 let file_events = Self::load_file(path, &self.provider)?;
37 self.file_states.insert(path.clone(), file_events.len());
38 events.extend(file_events);
39 }
40
41 events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
42
43 self.all_events = events.clone();
44 self.session = assemble_session(&self.all_events);
45
46 Ok(events)
47 }
48
49 fn handle_change(&mut self, path: &Path) -> Result<Vec<AgentEvent>> {
50 let all_file_events = Self::load_file(path, &self.provider)?;
51 let last_count = *self.file_states.get(path).unwrap_or(&0);
52
53 if all_file_events.len() < last_count {
54 self.file_states
55 .insert(path.to_path_buf(), all_file_events.len());
56 self.all_events = all_file_events.clone();
57 self.session = assemble_session(&self.all_events);
58 return Ok(all_file_events);
59 }
60
61 let new_events: Vec<AgentEvent> = all_file_events.into_iter().skip(last_count).collect();
62
63 self.file_states
64 .insert(path.to_path_buf(), last_count + new_events.len());
65
66 self.all_events.extend(new_events.clone());
67 self.all_events
68 .sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
69 self.session = assemble_session(&self.all_events);
70
71 Ok(new_events)
72 }
73
74 fn load_file(path: &Path, provider: &Arc<ProviderAdapter>) -> Result<Vec<AgentEvent>> {
75 Ok(provider.parser.parse_file(path)?)
76 }
77}
78
79pub struct SessionStreamer {
80 _watcher: PollWatcher,
81 _handle: JoinHandle<()>,
82 rx: Receiver<WorkspaceEvent>,
83}
84
85impl SessionStreamer {
86 pub fn receiver(&self) -> &Receiver<WorkspaceEvent> {
87 &self.rx
88 }
89
90 pub fn attach(
91 session_id: String,
92 db: Arc<Mutex<Database>>,
93 provider: Arc<ProviderAdapter>,
94 ) -> Result<Self> {
95 let session_files = {
96 let db_lock = db.lock().unwrap();
97 let files = db_lock.get_session_files(&session_id)?;
98 if files.is_empty() {
99 return Err(Error::InvalidOperation(format!(
100 "Session not found: {}",
101 session_id
102 )));
103 }
104 files
105 .into_iter()
106 .map(|f| PathBuf::from(f.path))
107 .collect::<Vec<_>>()
108 };
109
110 Self::start_core(session_id, session_files, provider)
111 }
112
113 pub fn attach_from_filesystem(
116 session_id: String,
117 log_root: PathBuf,
118 provider: Arc<ProviderAdapter>,
119 ) -> Result<Self> {
120 let session_files = find_session_files(&log_root, &session_id, &provider)?;
121
122 if session_files.is_empty() {
123 return Err(Error::InvalidOperation(format!(
124 "No files found for session: {}",
125 session_id
126 )));
127 }
128
129 Self::start_core(session_id, session_files, provider)
130 }
131
132 fn start_core(
133 session_id: String,
134 session_files: Vec<PathBuf>,
135 provider: Arc<ProviderAdapter>,
136 ) -> Result<Self> {
137 let (tx_out, rx_out) = channel();
138 let (tx_fs, rx_fs) = channel();
139
140 let watch_dir = session_files
141 .first()
142 .and_then(|p| p.parent())
143 .ok_or_else(|| Error::InvalidOperation("Cannot determine watch directory".to_string()))?
144 .to_path_buf();
145
146 let config = notify::Config::default().with_poll_interval(Duration::from_millis(500));
147
148 let mut watcher = PollWatcher::new(
149 move |res: std::result::Result<Event, notify::Error>| {
150 if let Ok(event) = res {
151 let _ = tx_fs.send(event);
152 }
153 },
154 config,
155 )
156 .map_err(|e| Error::InvalidOperation(format!("Failed to create file watcher: {}", e)))?;
157
158 watcher
159 .watch(&watch_dir, RecursiveMode::Recursive)
160 .map_err(|e| Error::InvalidOperation(format!("Failed to watch directory: {}", e)))?;
161
162 let tx_attached = tx_out.clone();
163 let first_file = session_files.first().cloned().unwrap();
164 let _ = tx_attached.send(WorkspaceEvent::Stream(StreamEvent::Attached {
165 session_id: session_id.clone(),
166 path: first_file.clone(),
167 }));
168
169 let mut context = StreamContext::new(provider);
170
171 if let Ok(events) = context.load_all_events(&session_files)
172 && !events.is_empty()
173 {
174 let _ = tx_out.send(WorkspaceEvent::Stream(StreamEvent::Events {
175 events: events.clone(),
176 session: context.session.clone(),
177 }));
178 }
179
180 let tx_worker = tx_out.clone();
181 let handle = std::thread::Builder::new()
182 .name("session-streamer".to_string())
183 .spawn(move || {
184 loop {
185 match rx_fs.recv() {
186 Ok(event) => {
187 if let Err(e) =
188 handle_fs_event(&event, &session_files, &mut context, &tx_worker)
189 {
190 let _ = tx_worker
191 .send(WorkspaceEvent::Error(format!("Stream error: {}", e)));
192 }
193 }
194 Err(_) => {
195 let _ =
196 tx_worker.send(WorkspaceEvent::Stream(StreamEvent::Disconnected {
197 reason: "Stream ended".to_string(),
198 }));
199 break;
200 }
201 }
202 }
203 })?;
204
205 Ok(Self {
206 _watcher: watcher,
207 _handle: handle,
208 rx: rx_out,
209 })
210 }
211}
212
213fn handle_fs_event(
214 event: &Event,
215 session_files: &[PathBuf],
216 context: &mut StreamContext,
217 tx: &Sender<WorkspaceEvent>,
218) -> Result<()> {
219 if let EventKind::Modify(_) = event.kind {
220 for path in &event.paths {
221 if session_files.contains(path)
222 && let Ok(new_events) = context.handle_change(path)
223 && !new_events.is_empty()
224 {
225 let _ = tx.send(WorkspaceEvent::Stream(StreamEvent::Events {
226 events: new_events,
227 session: context.session.clone(),
228 }));
229 }
230 }
231 }
232 Ok(())
233}
234
235fn find_session_files(
236 log_root: &Path,
237 session_id: &str,
238 provider: &Arc<ProviderAdapter>,
239) -> Result<Vec<PathBuf>> {
240 use std::fs;
241
242 let mut session_files = Vec::new();
243
244 fn visit_dir(
245 dir: &Path,
246 session_id: &str,
247 provider: &Arc<ProviderAdapter>,
248 files: &mut Vec<PathBuf>,
249 ) -> Result<()> {
250 if !dir.is_dir() {
251 return Ok(());
252 }
253
254 for entry in fs::read_dir(dir)? {
255 let entry = entry?;
256 let path = entry.path();
257
258 if path.is_dir() {
259 visit_dir(&path, session_id, provider, files)?;
260 } else if provider.discovery.probe(&path).is_match()
261 && let Ok(id) = provider.discovery.extract_session_id(&path)
262 && id == session_id
263 {
264 files.push(path);
265 }
266 }
267
268 Ok(())
269 }
270
271 visit_dir(log_root, session_id, provider, &mut session_files)?;
272 Ok(session_files)
273}