1use crate::runtime::events::{StreamEvent, WorkspaceEvent};
2use crate::{Error, Result};
3use agtrace_engine::{AgentSession, assemble_sessions};
4use agtrace_index::Database;
5use agtrace_providers::ProviderAdapter;
6use agtrace_types::AgentEvent;
7use notify::{Event, EventKind, PollWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::mpsc::{Receiver, Sender, channel};
11use std::sync::{Arc, Mutex};
12use std::thread::JoinHandle;
13use std::time::Duration;
14
15struct StreamContext {
16 provider: Arc<ProviderAdapter>,
17 file_states: HashMap<PathBuf, usize>,
18 all_events: Vec<AgentEvent>,
19 sessions: Vec<AgentSession>,
21}
22
23impl StreamContext {
24 fn new(provider: Arc<ProviderAdapter>) -> Self {
25 Self {
26 provider,
27 file_states: HashMap::new(),
28 all_events: Vec::new(),
29 sessions: Vec::new(),
30 }
31 }
32
33 fn load_all_events(&mut self, session_files: &[PathBuf]) -> Result<Vec<AgentEvent>> {
34 let mut events = Vec::new();
35
36 for path in session_files {
37 let file_events = Self::load_file(path, &self.provider)?;
38 self.file_states.insert(path.clone(), file_events.len());
39 events.extend(file_events);
40 }
41
42 events.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
43
44 self.all_events = events.clone();
45 self.sessions = assemble_sessions(&self.all_events);
46
47 Ok(events)
48 }
49
50 fn handle_change(&mut self, path: &Path) -> Result<Vec<AgentEvent>> {
51 let all_file_events = Self::load_file(path, &self.provider)?;
52 let last_count = *self.file_states.get(path).unwrap_or(&0);
53
54 if all_file_events.len() < last_count {
55 self.file_states
56 .insert(path.to_path_buf(), all_file_events.len());
57 self.all_events = all_file_events.clone();
58 self.sessions = assemble_sessions(&self.all_events);
59 return Ok(all_file_events);
60 }
61
62 let new_events: Vec<AgentEvent> = all_file_events.into_iter().skip(last_count).collect();
63
64 self.file_states
65 .insert(path.to_path_buf(), last_count + new_events.len());
66
67 self.all_events.extend(new_events.clone());
68 self.all_events
69 .sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
70 self.sessions = assemble_sessions(&self.all_events);
71
72 Ok(new_events)
73 }
74
75 fn load_file(path: &Path, provider: &Arc<ProviderAdapter>) -> Result<Vec<AgentEvent>> {
76 Ok(provider.parser.parse_file(path)?)
77 }
78}
79
80pub struct SessionStreamer {
81 _watcher: PollWatcher,
82 _handle: JoinHandle<()>,
83 rx: Receiver<WorkspaceEvent>,
84}
85
86impl SessionStreamer {
87 pub fn receiver(&self) -> &Receiver<WorkspaceEvent> {
88 &self.rx
89 }
90
91 pub fn attach(
92 session_id: String,
93 db: Arc<Mutex<Database>>,
94 provider: Arc<ProviderAdapter>,
95 ) -> Result<Self> {
96 let session_files = {
97 let db_lock = db.lock().unwrap();
98 let files = db_lock.get_session_files(&session_id)?;
99 if files.is_empty() {
100 return Err(Error::InvalidOperation(format!(
101 "Session not found: {}",
102 session_id
103 )));
104 }
105 files
106 .into_iter()
107 .map(|f| PathBuf::from(f.path))
108 .collect::<Vec<_>>()
109 };
110
111 Self::start_core(session_id, session_files, provider)
112 }
113
114 pub fn attach_from_filesystem(
117 session_id: String,
118 log_root: PathBuf,
119 provider: Arc<ProviderAdapter>,
120 ) -> Result<Self> {
121 let session_files = find_session_files(&log_root, &session_id, &provider)?;
122
123 if session_files.is_empty() {
124 return Err(Error::InvalidOperation(format!(
125 "No files found for session: {}",
126 session_id
127 )));
128 }
129
130 Self::start_core(session_id, session_files, provider)
131 }
132
133 fn start_core(
134 session_id: String,
135 session_files: Vec<PathBuf>,
136 provider: Arc<ProviderAdapter>,
137 ) -> Result<Self> {
138 let (tx_out, rx_out) = channel();
139 let (tx_fs, rx_fs) = channel();
140
141 let watch_dir = session_files
142 .first()
143 .and_then(|p| p.parent())
144 .ok_or_else(|| Error::InvalidOperation("Cannot determine watch directory".to_string()))?
145 .to_path_buf();
146
147 let config = notify::Config::default().with_poll_interval(Duration::from_millis(100));
148
149 let mut watcher = PollWatcher::new(
150 move |res: std::result::Result<Event, notify::Error>| {
151 if let Ok(event) = res {
152 let _ = tx_fs.send(event);
153 }
154 },
155 config,
156 )
157 .map_err(|e| Error::InvalidOperation(format!("Failed to create file watcher: {}", e)))?;
158
159 watcher
160 .watch(&watch_dir, RecursiveMode::Recursive)
161 .map_err(|e| Error::InvalidOperation(format!("Failed to watch directory: {}", e)))?;
162
163 let tx_attached = tx_out.clone();
164 let first_file = session_files.first().cloned().unwrap();
165 let _ = tx_attached.send(WorkspaceEvent::Stream(StreamEvent::Attached {
166 session_id: session_id.clone(),
167 path: first_file.clone(),
168 }));
169
170 let mut context = StreamContext::new(provider);
171
172 if let Ok(events) = context.load_all_events(&session_files)
173 && !events.is_empty()
174 {
175 let _ = tx_out.send(WorkspaceEvent::Stream(StreamEvent::Events {
176 events: events.clone(),
177 sessions: context.sessions.clone(),
178 }));
179 }
180
181 let tx_worker = tx_out.clone();
182 let handle = std::thread::Builder::new()
183 .name("session-streamer".to_string())
184 .spawn(move || {
185 loop {
186 match rx_fs.recv() {
187 Ok(event) => {
188 if let Err(e) =
189 handle_fs_event(&event, &session_files, &mut context, &tx_worker)
190 {
191 let _ = tx_worker
192 .send(WorkspaceEvent::Error(format!("Stream error: {}", e)));
193 }
194 }
195 Err(_) => {
196 let _ =
197 tx_worker.send(WorkspaceEvent::Stream(StreamEvent::Disconnected {
198 reason: "Stream ended".to_string(),
199 }));
200 break;
201 }
202 }
203 }
204 })?;
205
206 Ok(Self {
207 _watcher: watcher,
208 _handle: handle,
209 rx: rx_out,
210 })
211 }
212}
213
214fn handle_fs_event(
215 event: &Event,
216 session_files: &[PathBuf],
217 context: &mut StreamContext,
218 tx: &Sender<WorkspaceEvent>,
219) -> Result<()> {
220 if let EventKind::Modify(_) = event.kind {
221 for path in &event.paths {
222 if session_files.contains(path)
223 && let Ok(new_events) = context.handle_change(path)
224 && !new_events.is_empty()
225 {
226 let _ = tx.send(WorkspaceEvent::Stream(StreamEvent::Events {
227 events: new_events,
228 sessions: context.sessions.clone(),
229 }));
230 }
231 }
232 }
233 Ok(())
234}
235
236fn find_session_files(
237 log_root: &Path,
238 session_id: &str,
239 provider: &Arc<ProviderAdapter>,
240) -> Result<Vec<PathBuf>> {
241 use std::fs;
242
243 let mut session_files = Vec::new();
244
245 fn visit_dir(
246 dir: &Path,
247 session_id: &str,
248 provider: &Arc<ProviderAdapter>,
249 files: &mut Vec<PathBuf>,
250 ) -> Result<()> {
251 if !dir.is_dir() {
252 return Ok(());
253 }
254
255 for entry in fs::read_dir(dir)? {
256 let entry = entry?;
257 let path = entry.path();
258
259 if path.is_dir() {
260 visit_dir(&path, session_id, provider, files)?;
261 } else if provider.discovery.probe(&path).is_match()
262 && let Ok(id) = provider.discovery.extract_session_id(&path)
263 && id == session_id
264 {
265 files.push(path);
266 }
267 }
268
269 Ok(())
270 }
271
272 visit_dir(log_root, session_id, provider, &mut session_files)?;
273 Ok(session_files)
274}