Skip to main content

toolpath_gemini/
watcher.rs

//! Polling-based watcher for Gemini CLI conversation files.
//!
//! Gemini rewrites the chat JSON on every turn rather than appending,
//! so there's no byte-offset tail to follow. Instead, each `poll()`:
//!
//! 1. Re-parses the main chat file and every sibling sub-agent file.
//! 2. Diffs `messages[]` length against the last-seen per-file state,
//!    emitting `WatcherEvent::Turn` for any new messages.
//! 3. Checks `toolCalls[].status` transitions on previously-emitted
//!    messages and emits `WatcherEvent::TurnUpdated` when a status
//!    flips (e.g. `pending` → `success`).
//! 4. Detects new sub-agent chat files and emits
//!    `WatcherEvent::Progress { kind: "subagent_started" }`; when a
//!    sub-agent's `summary` appears, emits
//!    `WatcherEvent::Progress { kind: "subagent_complete" }`.
//!
//! The watcher is deliberately polling-based in v1 — consumers drive
//! the cadence. A push-based `notify`-powered async variant can layer
//! on top later.
20
21use std::collections::HashMap;
22
23use crate::GeminiConvo;
24use crate::error::Result;
25use crate::provider::{to_turn, to_view};
26use crate::types::{ChatFile, GeminiMessage};
27use toolpath_convo::WatcherEvent;
28
/// Bookkeeping the watcher keeps for a single chat file between polls.
///
/// Holds just enough to diff the next re-parse of the file against
/// what has already been reported.
#[derive(Default, Clone, Debug)]
struct FileState {
    /// Length of the file's `messages` list as of the previous poll;
    /// messages below this index have already been emitted as `Turn`s.
    seen_messages: usize,
    /// Last observed tool-call statuses:
    /// `message.id` → (`tool_call.id` → `status`).
    tool_statuses: HashMap<String, HashMap<String, String>>,
    /// Set once `subagent_complete` has been emitted for this file.
    summary_emitted: bool,
    /// Set the first time a poll processes this file.
    existed: bool,
}
39
40/// Watches a Gemini conversation for new turns and tool-call updates.
41///
42/// # Example
43///
44/// ```rust,no_run
45/// use toolpath_gemini::{GeminiConvo, ConversationWatcher};
46/// use toolpath_convo::WatcherEvent;
47///
48/// let manager = GeminiConvo::new();
49/// let mut watcher = ConversationWatcher::new(
50///     manager,
51///     "/path/to/project".to_string(),
52///     "session-uuid".to_string(),
53/// );
54/// let events = watcher.poll().unwrap();
55/// for event in events {
56///     match event {
57///         WatcherEvent::Turn(t) => println!("new: {}", t.text),
58///         WatcherEvent::TurnUpdated(t) => println!("updated: {}", t.id),
59///         WatcherEvent::Progress { kind, data } => println!("{}: {}", kind, data),
60///     }
61/// }
62/// ```
63#[derive(Debug)]
64pub struct ConversationWatcher {
65    manager: GeminiConvo,
66    project: String,
67    session_uuid: String,
68    /// Per-chat-file state, keyed by file stem.
69    state: HashMap<String, FileState>,
70    /// Total number of turns (across all files) we've emitted.
71    seen_total: usize,
72}
73
74impl ConversationWatcher {
75    pub fn new(manager: GeminiConvo, project: String, session_uuid: String) -> Self {
76        Self {
77            manager,
78            project,
79            session_uuid,
80            state: HashMap::new(),
81            seen_total: 0,
82        }
83    }
84
85    pub fn project(&self) -> &str {
86        &self.project
87    }
88
89    pub fn session_uuid(&self) -> &str {
90        &self.session_uuid
91    }
92
93    /// Total turns emitted so far across all chat files in this session.
94    pub fn seen_count(&self) -> usize {
95        self.seen_total
96    }
97
98    /// Reset state so the next poll returns all messages from scratch.
99    pub fn reset(&mut self) {
100        self.state.clear();
101        self.seen_total = 0;
102    }
103
104    /// Poll for new events.
105    ///
106    /// On the first call, emits one `Turn` event per existing message,
107    /// plus one `subagent_started`/`subagent_complete` per pre-existing
108    /// sub-agent file. Subsequent calls emit only new messages, status
109    /// updates, and new sub-agent files.
110    pub fn poll(&mut self) -> Result<Vec<WatcherEvent>> {
111        let mut events: Vec<WatcherEvent> = Vec::new();
112
113        let resolver = self.manager.resolver();
114        let chat_stems = resolver.list_chat_files(&self.project, &self.session_uuid)?;
115
116        // If the session dir doesn't exist yet, emit nothing.
117        if chat_stems.is_empty() {
118            return Ok(events);
119        }
120
121        let io = self.manager.io();
122
123        // Determine main vs sub-agent up-front so we can emit stable
124        // sub-agent Progress events.
125        let mut chats: Vec<(String, ChatFile)> = Vec::with_capacity(chat_stems.len());
126        for stem in &chat_stems {
127            match io.read_chat(&self.project, &self.session_uuid, stem) {
128                Ok(chat) => chats.push((stem.clone(), chat)),
129                Err(e) => {
130                    eprintln!("Warning: failed to read chat {}: {}", stem, e);
131                }
132            }
133        }
134
135        let main_idx = chats
136            .iter()
137            .position(|(_, c)| c.kind.as_deref() != Some("subagent"))
138            .unwrap_or(0);
139
140        // Process the main chat first so its `Turn`s land before the
141        // sub-agents' (matching the batch-load order).
142        let mut order: Vec<usize> = (0..chats.len()).collect();
143        if main_idx != 0 {
144            order.remove(main_idx);
145            order.insert(0, main_idx);
146        }
147
148        for idx in order {
149            let (stem, chat) = &chats[idx];
150            let is_subagent = chat.kind.as_deref() == Some("subagent");
151            let state = self.state.entry(stem.clone()).or_default();
152            let first_time = !state.existed;
153            state.existed = true;
154
155            // Emit subagent_started when we see a sub-agent file for
156            // the first time.
157            if is_subagent && first_time {
158                events.push(WatcherEvent::Progress {
159                    kind: "subagent_started".into(),
160                    data: serde_json::json!({
161                        "session_id": chat.session_id,
162                        "chat_name": stem,
163                    }),
164                });
165            }
166
167            // Diff message count for new-turn events.
168            for (i, msg) in chat.messages.iter().enumerate() {
169                if i < state.seen_messages {
170                    continue;
171                }
172                events.push(WatcherEvent::Turn(Box::new(to_turn(msg))));
173                state
174                    .tool_statuses
175                    .insert(msg.id.clone(), snapshot_statuses(msg));
176                self.seen_total += 1;
177            }
178
179            // Detect status transitions on messages we've already emitted.
180            let limit = state.seen_messages.min(chat.messages.len());
181            for msg in chat.messages.iter().take(limit) {
182                let current = snapshot_statuses(msg);
183                let prev = state
184                    .tool_statuses
185                    .get(&msg.id)
186                    .cloned()
187                    .unwrap_or_default();
188                if current != prev {
189                    events.push(WatcherEvent::TurnUpdated(Box::new(to_turn(msg))));
190                    state.tool_statuses.insert(msg.id.clone(), current);
191                }
192            }
193
194            state.seen_messages = chat.messages.len();
195
196            // Sub-agent completion is signalled by `summary` appearing.
197            if is_subagent && !state.summary_emitted && chat.summary.is_some() {
198                events.push(WatcherEvent::Progress {
199                    kind: "subagent_complete".into(),
200                    data: serde_json::json!({
201                        "session_id": chat.session_id,
202                        "chat_name": stem,
203                        "summary": chat.summary,
204                    }),
205                });
206                state.summary_emitted = true;
207            }
208        }
209
210        Ok(events)
211    }
212
213    /// Re-read the full session and return a fresh `ConversationView`
214    /// along with the new events emitted since the last poll.
215    ///
216    /// Useful when consumers need both the delta and the full state.
217    pub fn poll_with_view(
218        &mut self,
219    ) -> Result<(toolpath_convo::ConversationView, Vec<WatcherEvent>)> {
220        let events = self.poll()?;
221        let convo = self
222            .manager
223            .read_conversation(&self.project, &self.session_uuid)?;
224        Ok((to_view(&convo), events))
225    }
226}
227
228fn snapshot_statuses(msg: &GeminiMessage) -> HashMap<String, String> {
229    msg.tool_calls()
230        .iter()
231        .map(|t| (t.id.clone(), t.status.clone()))
232        .collect()
233}
234
235// ── toolpath_convo::ConversationWatcher impl ────────────────────────
236
237impl toolpath_convo::ConversationWatcher for ConversationWatcher {
238    fn poll(&mut self) -> toolpath_convo::Result<Vec<WatcherEvent>> {
239        ConversationWatcher::poll(self)
240            .map_err(|e| toolpath_convo::ConvoError::Provider(e.to_string()))
241    }
242
243    fn seen_count(&self) -> usize {
244        ConversationWatcher::seen_count(self)
245    }
246}
247
248// ── Tests ────────────────────────────────────────────────────────────
249
#[cfg(test)]
mod tests {
    use super::*;
    use crate::PathResolver;
    use std::fs;
    use tempfile::TempDir;
    use toolpath_convo::{Role, WatcherEvent};

    /// Project path registered in the fixture's `projects.json`.
    const PROJECT: &str = "/abs/myrepo";
    /// Session directory created by `setup()`.
    const SESSION: &str = "session-uuid";

    /// Main chat containing a single user message `m1`.
    const ONE_USER_MESSAGE: &str = r#"{"sessionId":"s","projectHash":"","messages":[
  {"id":"m1","timestamp":"ts","type":"user","content":[{"text":"hi"}]}
]}"#;

    /// Main chat containing user message `m1` followed by a reply `m2`.
    const USER_AND_REPLY: &str = r#"{"sessionId":"s","projectHash":"","messages":[
  {"id":"m1","timestamp":"ts","type":"user","content":[{"text":"hi"}]},
  {"id":"m2","timestamp":"ts","type":"gemini","content":"hello","model":"g"}
]}"#;

    /// Build a throwaway `.gemini` tree with one empty session directory
    /// and return the temp-dir guard, a manager rooted at it, and the
    /// session directory path.
    fn setup() -> (TempDir, GeminiConvo, std::path::PathBuf) {
        let temp = TempDir::new().unwrap();
        let gemini = temp.path().join(".gemini");
        let session_dir = gemini.join("tmp/myrepo/chats/session-uuid");
        fs::create_dir_all(&session_dir).unwrap();
        fs::write(
            gemini.join("projects.json"),
            r#"{"projects":{"/abs/myrepo":"myrepo"}}"#,
        )
        .unwrap();
        let resolver = PathResolver::new().with_gemini_dir(&gemini);
        let mgr = GeminiConvo::with_resolver(resolver);
        (temp, mgr, session_dir)
    }

    /// Overwrite the session's main chat file with `body`.
    fn write_main(dir: &std::path::Path, body: &str) {
        fs::write(dir.join("main.json"), body).unwrap();
    }

    /// Watcher pointed at the fixture project and session.
    fn new_watcher(mgr: GeminiConvo) -> ConversationWatcher {
        ConversationWatcher::new(mgr, PROJECT.into(), SESSION.into())
    }

    /// Kinds of all `Progress` events in `events`, in order.
    fn progress_kinds(events: &[WatcherEvent]) -> Vec<&str> {
        events
            .iter()
            .filter_map(|e| match e {
                WatcherEvent::Progress { kind, .. } => Some(kind.as_str()),
                _ => None,
            })
            .collect()
    }

    /// Number of `Progress` events in `events` whose kind is `wanted`.
    fn progress_count(events: &[WatcherEvent], wanted: &str) -> usize {
        events
            .iter()
            .filter(|e| matches!(e, WatcherEvent::Progress { kind, .. } if kind == wanted))
            .count()
    }

    #[test]
    fn test_poll_empty_session() {
        let (_t, mgr, _dir) = setup();
        // Point at a session that has no directory on disk.
        let mut w = ConversationWatcher::new(mgr, PROJECT.into(), "missing".into());
        let events = w.poll().unwrap();
        assert!(events.is_empty());
        assert_eq!(w.seen_count(), 0);
    }

    #[test]
    fn test_poll_first_call_returns_all() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, USER_AND_REPLY);
        let mut w = new_watcher(mgr);
        let events = w.poll().unwrap();
        assert_eq!(events.len(), 2);
        assert!(matches!(events[0], WatcherEvent::Turn(_)));
        assert!(matches!(events[1], WatcherEvent::Turn(_)));
        assert_eq!(w.seen_count(), 2);
    }

    #[test]
    fn test_poll_second_call_returns_empty_when_idle() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = new_watcher(mgr);
        let _ = w.poll().unwrap();
        let events = w.poll().unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn test_poll_detects_new_messages() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = new_watcher(mgr);
        let first = w.poll().unwrap();
        assert_eq!(first.len(), 1);

        // Rewrite the file with one extra message appended.
        write_main(&dir, USER_AND_REPLY);
        let second = w.poll().unwrap();
        assert_eq!(second.len(), 1);
        match &second[0] {
            WatcherEvent::Turn(t) => assert_eq!(t.text, "hello"),
            other => panic!("expected Turn, got {:?}", std::mem::discriminant(other)),
        }
    }

    #[test]
    fn test_poll_detects_status_transition() {
        let (_t, mgr, dir) = setup();
        // First state: tool call pending.
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
  {"id":"m1","timestamp":"ts","type":"gemini","content":"","toolCalls":[
    {"id":"t1","name":"read_file","args":{},"status":"pending","timestamp":"ts"}
  ]}
]}"#,
        );
        let mut w = new_watcher(mgr);
        let first = w.poll().unwrap();
        assert_eq!(first.len(), 1);
        assert!(matches!(first[0], WatcherEvent::Turn(_)));

        // Second state: same message, tool call now successful.
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
  {"id":"m1","timestamp":"ts","type":"gemini","content":"","toolCalls":[
    {"id":"t1","name":"read_file","args":{},"status":"success","timestamp":"ts","result":[{"functionResponse":{"id":"t1","name":"read_file","response":{"output":"ok"}}}]}
  ]}
]}"#,
        );
        let second = w.poll().unwrap();
        assert_eq!(second.len(), 1);
        match &second[0] {
            WatcherEvent::TurnUpdated(t) => {
                assert_eq!(t.id, "m1");
                assert_eq!(t.tool_uses[0].result.as_ref().unwrap().content, "ok");
            }
            other => panic!(
                "expected TurnUpdated, got {:?}",
                std::mem::discriminant(other)
            ),
        }
    }

    #[test]
    fn test_poll_emits_subagent_started_and_complete() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, r#"{"sessionId":"m","projectHash":"","messages":[]}"#);
        // First poll — main only, no events (no messages).
        let mut w = new_watcher(mgr);
        let e1 = w.poll().unwrap();
        assert!(e1.is_empty());

        // Add a sub-agent without summary.
        fs::write(
            dir.join("sub.json"),
            r#"{"sessionId":"subby","projectHash":"","kind":"subagent","messages":[
  {"id":"sx","timestamp":"ts","type":"user","content":[{"text":"go"}]}
]}"#,
        )
        .unwrap();
        let e2 = w.poll().unwrap();
        let kinds = progress_kinds(&e2);
        assert!(kinds.contains(&"subagent_started"));
        assert!(!kinds.contains(&"subagent_complete"));

        // Now write a summary.
        fs::write(
            dir.join("sub.json"),
            r#"{"sessionId":"subby","projectHash":"","kind":"subagent","summary":"done","messages":[
  {"id":"sx","timestamp":"ts","type":"user","content":[{"text":"go"}]}
]}"#,
        )
        .unwrap();
        let e3 = w.poll().unwrap();
        let kinds = progress_kinds(&e3);
        assert!(kinds.contains(&"subagent_complete"));
    }

    #[test]
    fn test_poll_preserves_role() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
  {"id":"u","timestamp":"ts","type":"user","content":[{"text":"hi"}]},
  {"id":"a","timestamp":"ts","type":"gemini","content":"hey"}
]}"#,
        );
        let mut w = new_watcher(mgr);
        let events = w.poll().unwrap();
        let roles: Vec<&Role> = events
            .iter()
            .filter_map(|e| e.as_turn().map(|t| &t.role))
            .collect();
        assert_eq!(roles, vec![&Role::User, &Role::Assistant]);
    }

    #[test]
    fn test_reset_re_emits_all() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = new_watcher(mgr);
        let _ = w.poll().unwrap();
        w.reset();
        let re = w.poll().unwrap();
        assert_eq!(re.len(), 1);
    }

    #[test]
    fn test_poll_with_view_returns_full_conversation() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = new_watcher(mgr);
        let (view, events) = w.poll_with_view().unwrap();
        assert_eq!(view.turns.len(), 1);
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn test_trait_impl() {
        let (_t, mgr, dir) = setup();
        write_main(&dir, ONE_USER_MESSAGE);
        let mut w = new_watcher(mgr);
        // Call through the trait explicitly rather than the inherent methods.
        let events = toolpath_convo::ConversationWatcher::poll(&mut w).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(toolpath_convo::ConversationWatcher::seen_count(&w), 1);
    }

    // ── Accessors ────────────────────────────────────────────────────

    #[test]
    fn test_project_and_session_accessors() {
        let (_t, mgr, _dir) = setup();
        let w = new_watcher(mgr);
        assert_eq!(w.project(), "/abs/myrepo");
        assert_eq!(w.session_uuid(), "session-uuid");
    }

    // ── Additional status transitions ───────────────────────────────

    #[test]
    fn test_poll_detects_status_transition_to_cancelled() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
  {"id":"m1","timestamp":"ts","type":"gemini","content":"","toolCalls":[
    {"id":"t1","name":"run_shell_command","args":{},"status":"pending","timestamp":"ts"}
  ]}
]}"#,
        );
        let mut w = new_watcher(mgr);
        let _ = w.poll().unwrap();

        // Status flips to cancelled — must still produce TurnUpdated.
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
  {"id":"m1","timestamp":"ts","type":"gemini","content":"","toolCalls":[
    {"id":"t1","name":"run_shell_command","args":{},"status":"cancelled","timestamp":"ts"}
  ]}
]}"#,
        );
        let events = w.poll().unwrap();
        assert_eq!(events.len(), 1);
        assert!(matches!(events[0], WatcherEvent::TurnUpdated(_)));
    }

    #[test]
    fn test_poll_no_event_when_status_unchanged() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
  {"id":"m1","timestamp":"ts","type":"gemini","content":"done","toolCalls":[
    {"id":"t1","name":"read_file","args":{},"status":"success","timestamp":"ts"}
  ]}
]}"#,
        );
        let mut w = new_watcher(mgr);
        let _ = w.poll().unwrap();

        // Same file, same content — should produce nothing.
        let events = w.poll().unwrap();
        assert!(events.is_empty());
    }

    // ── Sub-agent timing edge cases ─────────────────────────────────

    #[test]
    fn test_subagent_added_after_non_empty_main() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"m","projectHash":"","messages":[
  {"id":"u1","timestamp":"ts","type":"user","content":[{"text":"hi"}]}
]}"#,
        );
        let mut w = new_watcher(mgr);
        let before = w.poll().unwrap();
        assert_eq!(before.len(), 1);
        // No progress events yet — no sub-agent file exists.
        assert!(
            before
                .iter()
                .all(|e| !matches!(e, WatcherEvent::Progress { .. }))
        );

        // Now a sub-agent appears.
        fs::write(
            dir.join("helper.json"),
            r#"{"sessionId":"helper","projectHash":"","kind":"subagent","messages":[
  {"id":"h1","timestamp":"ts","type":"user","content":[{"text":"search"}]}
]}"#,
        )
        .unwrap();
        let after = w.poll().unwrap();
        let has_started = progress_count(&after, "subagent_started") > 0;
        let has_turn = after.iter().any(|e| matches!(e, WatcherEvent::Turn(_)));
        assert!(
            has_started,
            "expected subagent_started, got {:?}",
            after.len()
        );
        assert!(has_turn, "expected the sub-agent's first turn");
    }

    #[test]
    fn test_subagent_complete_emitted_separately_from_started() {
        // When a sub-agent file is created without `summary` and then
        // `summary` is added, the two Progress events should fire in
        // separate polls.
        let (_t, mgr, dir) = setup();
        write_main(&dir, r#"{"sessionId":"m","projectHash":"","messages":[]}"#);
        let mut w = new_watcher(mgr);
        let _ = w.poll().unwrap();

        fs::write(
            dir.join("sub.json"),
            r#"{"sessionId":"s","projectHash":"","kind":"subagent","messages":[]}"#,
        )
        .unwrap();
        let e1 = w.poll().unwrap();
        assert_eq!(progress_count(&e1, "subagent_started"), 1);
        assert_eq!(progress_count(&e1, "subagent_complete"), 0);

        // Add a summary.
        fs::write(
            dir.join("sub.json"),
            r#"{"sessionId":"s","projectHash":"","kind":"subagent","summary":"done","messages":[]}"#,
        )
        .unwrap();
        let e2 = w.poll().unwrap();
        // Complete fires, started does NOT fire again (we've seen the file).
        assert_eq!(progress_count(&e2, "subagent_started"), 0);
        assert_eq!(progress_count(&e2, "subagent_complete"), 1);
    }

    // ── Unknown role survives watcher path ─────────────────────────

    #[test]
    fn test_poll_preserves_unknown_role() {
        let (_t, mgr, dir) = setup();
        write_main(
            &dir,
            r#"{"sessionId":"s","projectHash":"","messages":[
  {"id":"m1","timestamp":"ts","type":"plan","content":"planning..."}
]}"#,
        );
        let mut w = new_watcher(mgr);
        let events = w.poll().unwrap();
        assert_eq!(events.len(), 1);
        match &events[0] {
            WatcherEvent::Turn(t) => {
                // `plan` maps to Role::Other("plan")
                assert!(matches!(t.role, Role::Other(ref s) if s == "plan"));
            }
            other => panic!("expected Turn, got {:?}", std::mem::discriminant(other)),
        }
    }
}