1use std::collections::HashMap;
22
23use crate::GeminiConvo;
24use crate::error::Result;
25use crate::provider::{to_turn, to_view};
26use crate::types::{ChatFile, GeminiMessage};
27use toolpath_convo::WatcherEvent;
28
/// Per-chat-file bookkeeping the watcher keeps between polls.
#[derive(Clone, Debug, Default)]
struct FileState {
    /// Number of messages in the file as of the previous poll.
    seen_messages: usize,
    /// Last recorded tool-call statuses, keyed by message id and then by
    /// tool-call id.
    tool_statuses: HashMap<String, HashMap<String, String>>,
    /// Set once the `subagent_complete` progress event has been emitted.
    summary_emitted: bool,
    /// Set once the file has been observed by a poll.
    existed: bool,
}
39
40#[derive(Debug)]
64pub struct ConversationWatcher {
65 manager: GeminiConvo,
66 project: String,
67 session_uuid: String,
68 state: HashMap<String, FileState>,
70 seen_total: usize,
72}
73
74impl ConversationWatcher {
75 pub fn new(manager: GeminiConvo, project: String, session_uuid: String) -> Self {
76 Self {
77 manager,
78 project,
79 session_uuid,
80 state: HashMap::new(),
81 seen_total: 0,
82 }
83 }
84
85 pub fn project(&self) -> &str {
86 &self.project
87 }
88
89 pub fn session_uuid(&self) -> &str {
90 &self.session_uuid
91 }
92
93 pub fn seen_count(&self) -> usize {
95 self.seen_total
96 }
97
98 pub fn reset(&mut self) {
100 self.state.clear();
101 self.seen_total = 0;
102 }
103
104 pub fn poll(&mut self) -> Result<Vec<WatcherEvent>> {
111 let mut events: Vec<WatcherEvent> = Vec::new();
112
113 let resolver = self.manager.resolver();
114 let chat_stems = resolver.list_chat_files(&self.project, &self.session_uuid)?;
115
116 if chat_stems.is_empty() {
118 return Ok(events);
119 }
120
121 let io = self.manager.io();
122
123 let mut chats: Vec<(String, ChatFile)> = Vec::with_capacity(chat_stems.len());
126 for stem in &chat_stems {
127 match io.read_chat(&self.project, &self.session_uuid, stem) {
128 Ok(chat) => chats.push((stem.clone(), chat)),
129 Err(e) => {
130 eprintln!("Warning: failed to read chat {}: {}", stem, e);
131 }
132 }
133 }
134
135 let main_idx = chats
136 .iter()
137 .position(|(_, c)| c.kind.as_deref() != Some("subagent"))
138 .unwrap_or(0);
139
140 let mut order: Vec<usize> = (0..chats.len()).collect();
143 if main_idx != 0 {
144 order.remove(main_idx);
145 order.insert(0, main_idx);
146 }
147
148 for idx in order {
149 let (stem, chat) = &chats[idx];
150 let is_subagent = chat.kind.as_deref() == Some("subagent");
151 let state = self.state.entry(stem.clone()).or_default();
152 let first_time = !state.existed;
153 state.existed = true;
154
155 if is_subagent && first_time {
158 events.push(WatcherEvent::Progress {
159 kind: "subagent_started".into(),
160 data: serde_json::json!({
161 "session_id": chat.session_id,
162 "chat_name": stem,
163 }),
164 });
165 }
166
167 for (i, msg) in chat.messages.iter().enumerate() {
169 if i < state.seen_messages {
170 continue;
171 }
172 events.push(WatcherEvent::Turn(Box::new(to_turn(msg))));
173 state
174 .tool_statuses
175 .insert(msg.id.clone(), snapshot_statuses(msg));
176 self.seen_total += 1;
177 }
178
179 let limit = state.seen_messages.min(chat.messages.len());
181 for msg in chat.messages.iter().take(limit) {
182 let current = snapshot_statuses(msg);
183 let prev = state
184 .tool_statuses
185 .get(&msg.id)
186 .cloned()
187 .unwrap_or_default();
188 if current != prev {
189 events.push(WatcherEvent::TurnUpdated(Box::new(to_turn(msg))));
190 state.tool_statuses.insert(msg.id.clone(), current);
191 }
192 }
193
194 state.seen_messages = chat.messages.len();
195
196 if is_subagent && !state.summary_emitted && chat.summary.is_some() {
198 events.push(WatcherEvent::Progress {
199 kind: "subagent_complete".into(),
200 data: serde_json::json!({
201 "session_id": chat.session_id,
202 "chat_name": stem,
203 "summary": chat.summary,
204 }),
205 });
206 state.summary_emitted = true;
207 }
208 }
209
210 Ok(events)
211 }
212
213 pub fn poll_with_view(
218 &mut self,
219 ) -> Result<(toolpath_convo::ConversationView, Vec<WatcherEvent>)> {
220 let events = self.poll()?;
221 let convo = self
222 .manager
223 .read_conversation(&self.project, &self.session_uuid)?;
224 Ok((to_view(&convo), events))
225 }
226}
227
228fn snapshot_statuses(msg: &GeminiMessage) -> HashMap<String, String> {
229 msg.tool_calls()
230 .iter()
231 .map(|t| (t.id.clone(), t.status.clone()))
232 .collect()
233}
234
235impl toolpath_convo::ConversationWatcher for ConversationWatcher {
238 fn poll(&mut self) -> toolpath_convo::Result<Vec<WatcherEvent>> {
239 ConversationWatcher::poll(self)
240 .map_err(|e| toolpath_convo::ConvoError::Provider(e.to_string()))
241 }
242
243 fn seen_count(&self) -> usize {
244 ConversationWatcher::seen_count(self)
245 }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use crate::PathResolver;
    use std::fs;
    use tempfile::TempDir;
    use toolpath_convo::{Role, WatcherEvent};

    /// Project path registered in the fixture's `projects.json`.
    const PROJECT: &str = "/abs/myrepo";
    /// Name of the session directory created by `setup`.
    const SESSION: &str = "session-uuid";

    /// Main chat with no messages.
    const EMPTY_MAIN: &str = r#"{"sessionId":"m","projectHash":"","messages":[]}"#;

    /// Main chat holding a single user message `m1`.
    const ONE_USER_MESSAGE: &str = r#"{"sessionId":"s","projectHash":"","messages":[
 {"id":"m1","timestamp":"ts","type":"user","content":[{"text":"hi"}]}
]}"#;

    /// Main chat holding user message `m1` followed by assistant message `m2`.
    const USER_THEN_GEMINI: &str = r#"{"sessionId":"s","projectHash":"","messages":[
 {"id":"m1","timestamp":"ts","type":"user","content":[{"text":"hi"}]},
 {"id":"m2","timestamp":"ts","type":"gemini","content":"hello","model":"g"}
]}"#;

    /// Builds a temporary `.gemini` tree with one project and one empty
    /// session directory; returns the temp-dir guard, a manager pointed at
    /// the tree, and the session directory.
    fn setup() -> (TempDir, GeminiConvo, std::path::PathBuf) {
        let temp = TempDir::new().unwrap();
        let gemini = temp.path().join(".gemini");
        let session_dir = gemini.join("tmp/myrepo/chats/session-uuid");
        fs::create_dir_all(&session_dir).unwrap();
        fs::write(
            gemini.join("projects.json"),
            r#"{"projects":{"/abs/myrepo":"myrepo"}}"#,
        )
        .unwrap();
        let resolver = PathResolver::new().with_gemini_dir(&gemini);
        let mgr = GeminiConvo::with_resolver(resolver);
        (temp, mgr, session_dir)
    }

    /// Writes `body` to the chat file `file_name` inside `dir`.
    fn write_chat(dir: &std::path::Path, file_name: &str, body: &str) {
        fs::write(dir.join(file_name), body).unwrap();
    }

    /// Writes `body` as the session's `main.json`.
    fn write_main(dir: &std::path::Path, body: &str) {
        write_chat(dir, "main.json", body);
    }

    /// Watcher over the fixture project for the given session name.
    fn watch(mgr: GeminiConvo, session: &str) -> ConversationWatcher {
        ConversationWatcher::new(mgr, PROJECT.into(), session.into())
    }

    /// The `kind` of every progress event, in emission order.
    fn progress_kinds(events: &[WatcherEvent]) -> Vec<&str> {
        events
            .iter()
            .filter_map(|event| match event {
                WatcherEvent::Progress { kind, .. } => Some(kind.as_str()),
                _ => None,
            })
            .collect()
    }

    /// How many progress events of kind `wanted` are in `events`.
    fn count_progress(events: &[WatcherEvent], wanted: &str) -> usize {
        progress_kinds(events)
            .into_iter()
            .filter(|kind| *kind == wanted)
            .count()
    }

    #[test]
    fn test_poll_empty_session() {
        let (_t, mgr, _dir) = setup();
        let mut w = watch(mgr, "missing");
        let events = w.poll().unwrap();
        assert!(events.is_empty());
        assert_eq!(w.seen_count(), 0);
    }

    #[test]
    fn test_poll_first_call_returns_all() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, USER_THEN_GEMINI);
        let mut w = watch(mgr, SESSION);
        let events = w.poll().unwrap();
        assert_eq!(events.len(), 2);
        assert!(matches!(events[0], WatcherEvent::Turn(_)));
        assert!(matches!(events[1], WatcherEvent::Turn(_)));
        assert_eq!(w.seen_count(), 2);
    }

    #[test]
    fn test_poll_second_call_returns_empty_when_idle() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = watch(mgr, SESSION);
        let _ = w.poll().unwrap();
        let events = w.poll().unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_poll_detects_new_messages() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = watch(mgr, SESSION);
        let first = w.poll().unwrap();
        assert_eq!(first.len(), 1);

        write_main(&dir, USER_THEN_GEMINI);
        let second = w.poll().unwrap();
        assert_eq!(second.len(), 1);
        match &second[0] {
            WatcherEvent::Turn(t) => assert_eq!(t.text, "hello"),
            other => panic!("expected Turn, got {:?}", std::mem::discriminant(other)),
        }
    }

    #[test]
    fn test_poll_detects_status_transition() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
 {"id":"m1","timestamp":"ts","type":"gemini","content":"","toolCalls":[
 {"id":"t1","name":"read_file","args":{},"status":"pending","timestamp":"ts"}
 ]}
]}"#,
        );
        let mut w = watch(mgr, SESSION);
        let first = w.poll().unwrap();
        assert_eq!(first.len(), 1);
        assert!(matches!(first[0], WatcherEvent::Turn(_)));

        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
 {"id":"m1","timestamp":"ts","type":"gemini","content":"","toolCalls":[
 {"id":"t1","name":"read_file","args":{},"status":"success","timestamp":"ts","result":[{"functionResponse":{"id":"t1","name":"read_file","response":{"output":"ok"}}}]}
 ]}
]}"#,
        );
        let second = w.poll().unwrap();
        assert_eq!(second.len(), 1);
        match &second[0] {
            WatcherEvent::TurnUpdated(t) => {
                assert_eq!(t.id, "m1");
                assert_eq!(t.tool_uses[0].result.as_ref().unwrap().content, "ok");
            }
            other => panic!(
                "expected TurnUpdated, got {:?}",
                std::mem::discriminant(other)
            ),
        }
    }

    #[test]
    fn test_poll_emits_subagent_started_and_complete() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, EMPTY_MAIN);
        let mut w = watch(mgr, SESSION);
        let e1 = w.poll().unwrap();
        assert!(e1.is_empty());

        write_chat(
            &dir,
            "sub.json",
            r#"{"sessionId":"subby","projectHash":"","kind":"subagent","messages":[
 {"id":"sx","timestamp":"ts","type":"user","content":[{"text":"go"}]}
]}"#,
        );
        let e2 = w.poll().unwrap();
        let kinds = progress_kinds(&e2);
        assert!(kinds.contains(&"subagent_started"));
        assert!(!kinds.contains(&"subagent_complete"));

        write_chat(
            &dir,
            "sub.json",
            r#"{"sessionId":"subby","projectHash":"","kind":"subagent","summary":"done","messages":[
 {"id":"sx","timestamp":"ts","type":"user","content":[{"text":"go"}]}
]}"#,
        );
        let e3 = w.poll().unwrap();
        let kinds = progress_kinds(&e3);
        assert!(kinds.contains(&"subagent_complete"));
    }

    #[test]
    fn test_poll_preserves_role() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
 {"id":"u","timestamp":"ts","type":"user","content":[{"text":"hi"}]},
 {"id":"a","timestamp":"ts","type":"gemini","content":"hey"}
]}"#,
        );
        let mut w = watch(mgr, SESSION);
        let events = w.poll().unwrap();
        let roles: Vec<&Role> = events
            .iter()
            .filter_map(|e| e.as_turn().map(|t| &t.role))
            .collect();
        assert_eq!(roles, vec![&Role::User, &Role::Assistant]);
    }

    #[test]
    fn test_reset_re_emits_all() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = watch(mgr, SESSION);
        let _ = w.poll().unwrap();
        w.reset();
        let re = w.poll().unwrap();
        assert_eq!(re.len(), 1);
    }

    #[test]
    fn test_poll_with_view_returns_full_conversation() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = watch(mgr, SESSION);
        let (view, events) = w.poll_with_view().unwrap();
        assert_eq!(view.turns.len(), 1);
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_trait_impl() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = watch(mgr, SESSION);
        let events = toolpath_convo::ConversationWatcher::poll(&mut w).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(toolpath_convo::ConversationWatcher::seen_count(&w), 1);
    }

    #[test]
    fn test_project_and_session_accessors() {
        let (_t, mgr, _dir) = setup();
        let w = watch(mgr, SESSION);
        assert_eq!(w.project(), "/abs/myrepo");
        assert_eq!(w.session_uuid(), "session-uuid");
    }

    #[test]
    fn test_poll_detects_status_transition_to_cancelled() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
 {"id":"m1","timestamp":"ts","type":"gemini","content":"","toolCalls":[
 {"id":"t1","name":"run_shell_command","args":{},"status":"pending","timestamp":"ts"}
 ]}
]}"#,
        );
        let mut w = watch(mgr, SESSION);
        let _ = w.poll().unwrap();

        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
 {"id":"m1","timestamp":"ts","type":"gemini","content":"","toolCalls":[
 {"id":"t1","name":"run_shell_command","args":{},"status":"cancelled","timestamp":"ts"}
 ]}
]}"#,
        );
        let events = w.poll().unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(events[0], WatcherEvent::TurnUpdated(_)));
    }

    #[test]
    fn test_poll_no_event_when_status_unchanged() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
 {"id":"m1","timestamp":"ts","type":"gemini","content":"done","toolCalls":[
 {"id":"t1","name":"read_file","args":{},"status":"success","timestamp":"ts"}
 ]}
]}"#,
        );
        let mut w = watch(mgr, SESSION);
        let _ = w.poll().unwrap();

        let events = w.poll().unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_subagent_added_after_non_empty_main() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"m","projectHash":"","messages":[
 {"id":"u1","timestamp":"ts","type":"user","content":[{"text":"hi"}]}
]}"#,
        );
        let mut w = watch(mgr, SESSION);
        let events = w.poll().unwrap();
        assert_eq!(events.len(), 1);
        assert!(progress_kinds(&events).is_empty());

        write_chat(
            &dir,
            "helper.json",
            r#"{"sessionId":"helper","projectHash":"","kind":"subagent","messages":[
 {"id":"h1","timestamp":"ts","type":"user","content":[{"text":"search"}]}
]}"#,
        );
        let events = w.poll().unwrap();
        let has_started = progress_kinds(&events).contains(&"subagent_started");
        let has_turn = events.iter().any(|e| matches!(e, WatcherEvent::Turn(_)));
        assert!(
            has_started,
            "expected subagent_started, got {:?}",
            events.len()
        );
        assert!(has_turn, "expected the sub-agent's first turn");
    }

    #[test]
    fn test_subagent_complete_emitted_separately_from_started() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, EMPTY_MAIN);
        let mut w = watch(mgr, SESSION);
        let _ = w.poll().unwrap();

        write_chat(
            &dir,
            "sub.json",
            r#"{"sessionId":"s","projectHash":"","kind":"subagent","messages":[]}"#,
        );
        let e1 = w.poll().unwrap();
        assert_eq!(count_progress(&e1, "subagent_started"), 1);
        assert_eq!(count_progress(&e1, "subagent_complete"), 0);

        write_chat(
            &dir,
            "sub.json",
            r#"{"sessionId":"s","projectHash":"","kind":"subagent","summary":"done","messages":[]}"#,
        );
        let e2 = w.poll().unwrap();
        assert_eq!(count_progress(&e2, "subagent_started"), 0);
        assert_eq!(count_progress(&e2, "subagent_complete"), 1);
    }

    #[test]
    fn test_poll_preserves_unknown_role() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
 {"id":"m1","timestamp":"ts","type":"plan","content":"planning..."}
]}"#,
        );
        let mut w = watch(mgr, SESSION);
        let events = w.poll().unwrap();
        assert_eq!(events.len(), 1);
        match &events[0] {
            WatcherEvent::Turn(t) => {
                assert!(matches!(t.role, Role::Other(ref s) if s == "plan"));
            }
            other => panic!("expected Turn, got {:?}", std::mem::discriminant(other)),
        }
    }
}