Skip to main content

zeph_tui/app/
events.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use tokio::sync::mpsc;
5
6use crate::event::{AgentEvent, AppEvent};
7
8use super::{App, ChatMessage, ConfirmState, ElicitationState, MessageRole, debug};
9
10impl App {
11    /// Dispatch a top-level [`AppEvent`] to the appropriate handler.
12    ///
13    /// Called once per event in the main [`crate::run_tui`] loop.
14    pub fn handle_event(&mut self, event: AppEvent) {
15        match event {
16            AppEvent::Key(key) => self.handle_key(key),
17            AppEvent::Tick => {
18                self.throbber_state.calc_next();
19            }
20            AppEvent::Resize(_, _) => {
21                self.sessions.current_mut().render_cache.clear();
22            }
23            AppEvent::Agent(agent_event) => self.handle_agent_event(agent_event),
24            AppEvent::Paste(text) => self.handle_paste(&text),
25        }
26    }
27
28    /// Await the next [`AgentEvent`] from the agent channel.
29    ///
30    /// Returns `None` when all senders have been dropped (agent exited).
31    /// Called from the `select!` block in [`crate::run_tui`].
32    pub fn poll_agent_event(&mut self) -> impl Future<Output = Option<AgentEvent>> + use<'_> {
33        self.agent_event_rx.recv()
34    }
35
36    /// Non-blocking poll for a pending [`AgentEvent`].
37    ///
38    /// Used to drain the channel after a first event has been received,
39    /// coalescing multiple events into a single render frame.
40    ///
41    /// # Errors
42    ///
43    /// Returns `TryRecvError::Empty` if no events are pending, or
44    /// `TryRecvError::Disconnected` if the sender has been dropped.
45    pub fn try_recv_agent_event(&mut self) -> Result<AgentEvent, mpsc::error::TryRecvError> {
46        self.agent_event_rx.try_recv()
47    }
48
49    /// Handle an [`AgentEvent`] and update widget state accordingly.
50    ///
51    /// This is the main state-transition function for agent-driven updates:
52    /// appending streaming chunks, recording tool events, displaying confirm
53    /// dialogs, and wiring late-bound channels (cancel signal, metrics).
54    #[allow(clippy::too_many_lines)] // large match over all agent event variants
55    pub fn handle_agent_event(&mut self, event: AgentEvent) {
56        match event {
57            AgentEvent::Chunk(text) => {
58                self.sessions.current_mut().status_label = None;
59                if let Some(last) = self.sessions.current_mut().messages.last_mut()
60                    && last.role == MessageRole::Assistant
61                    && last.streaming
62                {
63                    last.content.push_str(&text);
64                } else {
65                    self.sessions
66                        .current_mut()
67                        .messages
68                        .push(ChatMessage::new(MessageRole::Assistant, text).streaming());
69                    self.trim_messages();
70                }
71                // No explicit cache invalidation needed: the cache key includes
72                // content_hash, so new chunk content causes a natural cache miss.
73                self.auto_scroll();
74            }
75            AgentEvent::FullMessage(text) => {
76                self.sessions.current_mut().status_label = None;
77                if !text.starts_with("[tool output") {
78                    self.sessions
79                        .current_mut()
80                        .messages
81                        .push(ChatMessage::new(MessageRole::Assistant, text));
82                    self.trim_messages();
83                }
84                self.auto_scroll();
85            }
86            AgentEvent::Flush => {
87                if let Some(last) = self.sessions.current_mut().messages.last_mut()
88                    && last.streaming
89                {
90                    last.streaming = false;
91                    let last_idx = self.sessions.current().messages.len().saturating_sub(1);
92                    self.sessions
93                        .current_mut()
94                        .render_cache
95                        .invalidate(last_idx);
96                }
97            }
98            AgentEvent::Typing => {
99                self.pending_count = self.pending_count.saturating_sub(1);
100                self.sessions.current_mut().status_label = Some("thinking...".to_owned());
101            }
102            AgentEvent::Status(text) => {
103                self.sessions.current_mut().status_label =
104                    if text.is_empty() { None } else { Some(text) };
105                self.auto_scroll();
106            }
107            AgentEvent::ToolStart {
108                tool_name,
109                command,
110                tool_call_id,
111            } => {
112                self.sessions.current_mut().status_label = None;
113                self.sessions.current_mut().messages.push(
114                    ChatMessage::new(MessageRole::Tool, format!("$ {command}\n"))
115                        .streaming()
116                        .with_tool(tool_name)
117                        .with_tool_call_id(tool_call_id),
118                );
119                self.trim_messages();
120                self.auto_scroll();
121            }
122            AgentEvent::ToolOutputChunk {
123                chunk,
124                tool_call_id,
125                ..
126            } => {
127                let pos = if tool_call_id.is_empty() {
128                    // Shell tool chunks arrive without a tool_call_id; fall back to the last
129                    // streaming Tool message (there is at most one active at a time).
130                    self.sessions
131                        .current()
132                        .messages
133                        .iter()
134                        .rposition(|m| m.role == MessageRole::Tool && m.streaming)
135                } else {
136                    let found =
137                        self.sessions.current().messages.iter().rposition(|m| {
138                            m.tool_call_id.as_deref() == Some(tool_call_id.as_str())
139                        });
140                    if found.is_none() {
141                        tracing::warn!(
142                            %tool_call_id,
143                            "ToolOutputChunk: no message with matching tool_call_id — dropping chunk"
144                        );
145                    }
146                    found
147                };
148                if let Some(pos) = pos {
149                    self.sessions.current_mut().messages[pos]
150                        .content
151                        .push_str(&chunk);
152                    self.sessions.current_mut().render_cache.invalidate(pos);
153                }
154                self.auto_scroll();
155            }
156            AgentEvent::ToolOutput {
157                tool_name,
158                output,
159                diff,
160                filter_stats,
161                kept_lines,
162                success,
163                tool_call_id,
164                ..
165            } => {
166                self.handle_tool_output_event(
167                    tool_name,
168                    output,
169                    diff,
170                    filter_stats,
171                    kept_lines,
172                    success,
173                    tool_call_id,
174                );
175            }
176            AgentEvent::ConfirmRequest {
177                prompt,
178                response_tx,
179            } => {
180                self.confirm_state = Some(ConfirmState {
181                    prompt,
182                    response_tx: Some(response_tx),
183                });
184            }
185            AgentEvent::ElicitationRequest {
186                request,
187                response_tx,
188            } => {
189                let dialog = crate::widgets::elicitation::ElicitationDialogState::new(request);
190                self.elicitation_state = Some(ElicitationState {
191                    dialog,
192                    response_tx: Some(response_tx),
193                });
194            }
195            AgentEvent::QueueCount(count) => {
196                self.queued_count = count;
197                self.pending_count = count;
198            }
199            AgentEvent::DiffReady { diff, tool_call_id } => {
200                self.handle_diff_ready(diff, &tool_call_id);
201            }
202            AgentEvent::CommandResult { output, .. } => {
203                self.command_palette = None;
204                self.sessions
205                    .current_mut()
206                    .messages
207                    .push(ChatMessage::new(MessageRole::System, output));
208                self.trim_messages();
209                self.auto_scroll();
210            }
211            AgentEvent::SetCancelSignal(signal) => {
212                self.set_cancel_signal(signal);
213            }
214            AgentEvent::SetMetricsRx(rx) => {
215                self.set_metrics_rx(rx);
216            }
217            AgentEvent::ForegroundSubagentStarted { id, name } => {
218                self.sessions.current_mut().status_label =
219                    Some(format!("Sub-agent '{name}' running..."));
220                self.set_view_target(super::AgentViewTarget::SubAgent { id, name });
221            }
222            AgentEvent::ForegroundSubagentCompleted { id, name, success } => {
223                // Only switch back to Main if we are still viewing this subagent.
224                // If the user manually navigated away, respect that choice.
225                if self.sessions.current().view_target.subagent_id() == Some(id.as_str()) {
226                    self.set_view_target(super::AgentViewTarget::Main);
227                }
228                let label = if success {
229                    format!("Sub-agent '{name}' completed")
230                } else {
231                    format!("Sub-agent '{name}' failed")
232                };
233                self.sessions.current_mut().status_label = Some(label.clone());
234                self.sessions
235                    .current_mut()
236                    .messages
237                    .push(ChatMessage::new(MessageRole::System, label));
238                self.trim_messages();
239                self.auto_scroll();
240            }
241            AgentEvent::ContextEstimate(tokens) => {
242                self.context_token_estimate = tokens;
243            }
244            AgentEvent::FleetSnapshot(snapshot) => {
245                self.fleet_snapshot = snapshot;
246            }
247        }
248    }
249
250    fn handle_diff_ready(&mut self, diff: zeph_core::DiffData, tool_call_id: &str) {
251        if let Some(msg) = self
252            .sessions
253            .current_mut()
254            .messages
255            .iter_mut()
256            .rev()
257            .find(|m| {
258                m.role == MessageRole::Tool && m.tool_call_id.as_deref() == Some(tool_call_id)
259            })
260        {
261            msg.diff_data = Some(diff);
262        }
263    }
264
265    #[allow(clippy::too_many_arguments)]
266    fn handle_tool_output_event(
267        &mut self,
268        tool_name: zeph_common::ToolName,
269        output: String,
270        diff: Option<zeph_core::DiffData>,
271        filter_stats: Option<String>,
272        kept_lines: Option<Vec<usize>>,
273        success: bool,
274        tool_call_id: String,
275    ) {
276        debug!(
277            %tool_name,
278            has_diff = diff.is_some(),
279            has_filter_stats = filter_stats.is_some(),
280            output_len = output.len(),
281            "TUI ToolOutput event received"
282        );
283        // Try id-based lookup first; fall back to streaming-flag lookup for
284        // cases where ToolStart was not emitted (legacy path, empty tool_call_id).
285        let pos = if tool_call_id.is_empty() {
286            self.sessions
287                .current()
288                .messages
289                .iter()
290                .rposition(|m| m.role == MessageRole::Tool && m.streaming)
291        } else {
292            let found = self
293                .sessions
294                .current()
295                .messages
296                .iter()
297                .rposition(|m| {
298                    m.role == MessageRole::Tool
299                        && m.streaming
300                        && m.tool_call_id.as_deref() == Some(tool_call_id.as_str())
301                })
302                .or_else(|| {
303                    self.sessions
304                        .current()
305                        .messages
306                        .iter()
307                        .rposition(|m| m.role == MessageRole::Tool && m.streaming)
308                });
309            if found.is_none() {
310                tracing::warn!(
311                    tool_call_id = %tool_call_id,
312                    "ToolOutput: no streaming Tool message found — skipping finalization"
313                );
314            }
315            found
316        };
317
318        if let Some(pos) = pos {
319            // Finalize existing streaming tool message (shell or native path with ToolStart).
320            // Replace content after the header line ("$ cmd\n") with the canonical body_display
321            // from ToolOutputEvent. Streaming chunks (Path B) may already occupy that space;
322            // appending would duplicate the output. Truncating to the header and re-writing
323            // body_display produces exactly one copy regardless of whether chunks arrived.
324            debug!("finalizing existing streaming Tool message");
325            let header_end = self.sessions.current_mut().messages[pos]
326                .content
327                .find('\n')
328                .map_or(0, |i| i + 1);
329            self.sessions.current_mut().messages[pos]
330                .content
331                .truncate(header_end);
332            self.sessions.current_mut().messages[pos]
333                .content
334                .push_str(&output);
335            self.sessions.current_mut().messages[pos].streaming = false;
336            self.sessions.current_mut().messages[pos].diff_data = diff;
337            self.sessions.current_mut().messages[pos].filter_stats = filter_stats;
338            self.sessions.current_mut().messages[pos].kept_lines = kept_lines;
339            self.sessions.current_mut().messages[pos].success = Some(success);
340            self.sessions.current_mut().render_cache.invalidate(pos);
341        } else if diff.is_some() || filter_stats.is_some() || kept_lines.is_some() {
342            // No prior ToolStart: create the message now (legacy fallback).
343            debug!("creating new Tool message with diff (no prior ToolStart)");
344            let mut msg = ChatMessage::new(MessageRole::Tool, output)
345                .with_tool(tool_name)
346                .with_tool_call_id(tool_call_id);
347            msg.diff_data = diff;
348            msg.filter_stats = filter_stats;
349            msg.kept_lines = kept_lines;
350            msg.success = Some(success);
351            self.sessions.current_mut().messages.push(msg);
352            self.trim_messages();
353        } else if let Some(msg) = self
354            .sessions
355            .current_mut()
356            .messages
357            .iter_mut()
358            .rev()
359            .find(|m| m.role == MessageRole::Tool)
360        {
361            msg.filter_stats = filter_stats;
362        }
363        self.auto_scroll();
364    }
365
366    #[must_use]
367    pub fn confirm_state(&self) -> Option<&ConfirmState> {
368        self.confirm_state.as_ref()
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use tokio::sync::mpsc;
375
376    use crate::app::{AgentViewTarget, App};
377    use crate::event::AgentEvent;
378    use crate::types::{ChatMessage, MessageRole};
379    use zeph_core::DiffData;
380
381    fn make_app() -> App {
382        let (user_tx, agent_rx) = {
383            let (utx, _urx) = mpsc::channel(8);
384            let (_atx, arx) = mpsc::channel(8);
385            (utx, arx)
386        };
387        let mut app = App::new(user_tx, agent_rx);
388        app.sessions.current_mut().messages.clear();
389        app
390    }
391
392    /// Push a streaming Tool message with a specific `tool_call_id` directly onto the session.
393    fn push_tool_msg(app: &mut App, id: &str) {
394        let msg = ChatMessage::new(MessageRole::Tool, format!("$ cmd_{id}\n"))
395            .streaming()
396            .with_tool_call_id(id.to_owned());
397        app.sessions.current_mut().messages.push(msg);
398    }
399
400    fn tool_msg(id: &str) -> ChatMessage {
401        ChatMessage::new(MessageRole::Tool, "$ cmd\n".to_owned())
402            .with_tool("bash".into())
403            .with_tool_call_id(id.to_owned())
404    }
405
406    fn diff() -> DiffData {
407        DiffData {
408            file_path: "a.rs".into(),
409            old_content: "old".into(),
410            new_content: "new".into(),
411        }
412    }
413
414    #[test]
415    fn tool_output_chunk_routes_by_id_out_of_order() {
416        let mut app = make_app();
417        push_tool_msg(&mut app, "a");
418        push_tool_msg(&mut app, "b");
419        push_tool_msg(&mut app, "c");
420
421        // Deliver chunks out of order: c, a, b, a, c
422        for (id, chunk) in [
423            ("c", "c1"),
424            ("a", "a1"),
425            ("b", "b1"),
426            ("a", "a2"),
427            ("c", "c2"),
428        ] {
429            app.handle_agent_event(AgentEvent::ToolOutputChunk {
430                tool_name: "bash".into(),
431                command: String::new(),
432                chunk: chunk.to_owned(),
433                tool_call_id: id.to_owned(),
434            });
435        }
436
437        let msgs = app.messages();
438        assert_eq!(msgs.len(), 3);
439        // Message order: a=0, b=1, c=2
440        assert_eq!(msgs[0].content, "$ cmd_a\na1a2");
441        assert_eq!(msgs[1].content, "$ cmd_b\nb1");
442        assert_eq!(msgs[2].content, "$ cmd_c\nc1c2");
443    }
444
445    #[test]
446    fn tool_output_chunk_with_unknown_id_is_dropped() {
447        let mut app = make_app();
448        push_tool_msg(&mut app, "known");
449
450        // Chunk for an id that has no matching message — must be silently dropped.
451        app.handle_agent_event(AgentEvent::ToolOutputChunk {
452            tool_name: "bash".into(),
453            command: String::new(),
454            chunk: "should-not-appear".to_owned(),
455            tool_call_id: "unknown-xyz".to_owned(),
456        });
457
458        // The known message must be unchanged.
459        assert_eq!(app.messages().len(), 1);
460        assert_eq!(app.messages()[0].content, "$ cmd_known\n");
461    }
462
463    #[test]
464    fn tool_output_finalizes_correct_message_by_id() {
465        let mut app = make_app();
466        push_tool_msg(&mut app, "t1");
467        push_tool_msg(&mut app, "t2");
468
469        // Finalize t1 with ToolOutput.
470        app.handle_agent_event(AgentEvent::ToolOutput {
471            tool_name: "bash".into(),
472            command: "$ cmd_t1\n".into(),
473            output: "final-output-t1".to_owned(),
474            success: true,
475            diff: None,
476            filter_stats: None,
477            kept_lines: None,
478            tool_call_id: "t1".to_owned(),
479        });
480
481        let msgs = app.messages();
482        assert_eq!(msgs.len(), 2);
483        // t1 must be finalized (not streaming) with the canonical output.
484        assert!(!msgs[0].streaming);
485        assert!(msgs[0].content.contains("final-output-t1"));
486        // t2 must still be streaming and unchanged.
487        assert!(msgs[1].streaming);
488        assert_eq!(msgs[1].content, "$ cmd_t2\n");
489    }
490
491    #[test]
492    fn diff_ready_attaches_to_matching_id() {
493        let mut app = make_app();
494        app.sessions.current_mut().messages.push(tool_msg("call-1"));
495        app.sessions.current_mut().messages.push(tool_msg("call-2"));
496
497        app.handle_agent_event(AgentEvent::DiffReady {
498            diff: diff(),
499            tool_call_id: "call-2".into(),
500        });
501
502        assert!(app.sessions.current().messages[0].diff_data.is_none());
503        assert!(app.sessions.current().messages[1].diff_data.is_some());
504    }
505
506    #[test]
507    fn diff_ready_mismatched_id_does_not_attach() {
508        let mut app = make_app();
509        app.sessions.current_mut().messages.push(tool_msg("call-1"));
510
511        app.handle_agent_event(AgentEvent::DiffReady {
512            diff: diff(),
513            tool_call_id: "call-99".into(),
514        });
515
516        assert!(app.sessions.current().messages[0].diff_data.is_none());
517    }
518
519    #[test]
520    fn diff_ready_empty_id_does_not_attach() {
521        let mut app = make_app();
522        app.sessions.current_mut().messages.push(tool_msg("call-1"));
523
524        app.handle_agent_event(AgentEvent::DiffReady {
525            diff: diff(),
526            tool_call_id: String::new(),
527        });
528
529        assert!(app.sessions.current().messages[0].diff_data.is_none());
530    }
531
532    #[test]
533    fn diff_ready_two_concurrent_attach_to_correct_messages() {
534        let mut app = make_app();
535        app.sessions.current_mut().messages.push(tool_msg("call-A"));
536        app.sessions.current_mut().messages.push(tool_msg("call-B"));
537        app.sessions.current_mut().messages.push(tool_msg("call-C"));
538
539        let diff_a = DiffData {
540            file_path: "a.rs".into(),
541            old_content: "old_a".into(),
542            new_content: "new_a".into(),
543        };
544        let diff_b = DiffData {
545            file_path: "b.rs".into(),
546            old_content: "old_b".into(),
547            new_content: "new_b".into(),
548        };
549
550        // Deliver out of order: B first, then A
551        app.handle_agent_event(AgentEvent::DiffReady {
552            diff: diff_b,
553            tool_call_id: "call-B".into(),
554        });
555        app.handle_agent_event(AgentEvent::DiffReady {
556            diff: diff_a,
557            tool_call_id: "call-A".into(),
558        });
559
560        let msgs = &app.sessions.current().messages;
561        assert_eq!(
562            msgs[0].diff_data.as_ref().map(|d| d.file_path.as_str()),
563            Some("a.rs"),
564            "call-A diff must attach to message 0"
565        );
566        assert_eq!(
567            msgs[1].diff_data.as_ref().map(|d| d.file_path.as_str()),
568            Some("b.rs"),
569            "call-B diff must attach to message 1"
570        );
571        assert!(
572            msgs[2].diff_data.is_none(),
573            "call-C must remain without diff"
574        );
575    }
576
577    #[test]
578    fn foreground_subagent_started_switches_view_to_subagent() {
579        let mut app = make_app();
580        assert!(app.sessions.current().view_target.is_main());
581
582        app.handle_agent_event(AgentEvent::ForegroundSubagentStarted {
583            id: "sa-001".into(),
584            name: "planner".into(),
585        });
586
587        assert_eq!(
588            app.sessions.current().view_target.subagent_id(),
589            Some("sa-001"),
590            "view must switch to the started subagent"
591        );
592        assert_eq!(
593            app.sessions.current().status_label.as_deref(),
594            Some("Sub-agent 'planner' running...")
595        );
596    }
597
598    #[test]
599    fn foreground_subagent_completed_switches_back_when_viewing_subagent() {
600        let mut app = make_app();
601
602        app.handle_agent_event(AgentEvent::ForegroundSubagentStarted {
603            id: "sa-002".into(),
604            name: "coder".into(),
605        });
606        assert_eq!(
607            app.sessions.current().view_target.subagent_id(),
608            Some("sa-002")
609        );
610
611        app.handle_agent_event(AgentEvent::ForegroundSubagentCompleted {
612            id: "sa-002".into(),
613            name: "coder".into(),
614            success: true,
615        });
616
617        assert!(
618            app.sessions.current().view_target.is_main(),
619            "view must return to Main after completion"
620        );
621        assert_eq!(
622            app.sessions.current().status_label.as_deref(),
623            Some("Sub-agent 'coder' completed")
624        );
625        let system_msg = app
626            .sessions
627            .current()
628            .messages
629            .iter()
630            .find(|m| m.role == MessageRole::System);
631        assert!(
632            system_msg.is_some(),
633            "completion system message must be pushed"
634        );
635    }
636
637    #[test]
638    fn foreground_subagent_completed_respects_manual_navigation() {
639        let mut app = make_app();
640
641        app.handle_agent_event(AgentEvent::ForegroundSubagentStarted {
642            id: "sa-003".into(),
643            name: "researcher".into(),
644        });
645
646        // Simulate user manually navigating away to a different subagent.
647        app.set_view_target(AgentViewTarget::SubAgent {
648            id: "sa-other".into(),
649            name: "other".into(),
650        });
651
652        app.handle_agent_event(AgentEvent::ForegroundSubagentCompleted {
653            id: "sa-003".into(),
654            name: "researcher".into(),
655            success: false,
656        });
657
658        // View must NOT switch to Main because user is viewing a different subagent.
659        assert_eq!(
660            app.sessions.current().view_target.subagent_id(),
661            Some("sa-other"),
662            "user's manual navigation must be preserved"
663        );
664    }
665
666    #[test]
667    fn foreground_subagent_failed_shows_failed_label() {
668        let mut app = make_app();
669
670        app.handle_agent_event(AgentEvent::ForegroundSubagentStarted {
671            id: "sa-004".into(),
672            name: "builder".into(),
673        });
674        app.handle_agent_event(AgentEvent::ForegroundSubagentCompleted {
675            id: "sa-004".into(),
676            name: "builder".into(),
677            success: false,
678        });
679
680        assert_eq!(
681            app.sessions.current().status_label.as_deref(),
682            Some("Sub-agent 'builder' failed")
683        );
684    }
685
686    // Fast-completing subagents cause a Started then immediately Completed event.
687    // This results in a brief flash before returning to Main, which is acceptable.
688    #[test]
689    fn foreground_subagent_fast_complete_ends_on_main() {
690        let mut app = make_app();
691
692        app.handle_agent_event(AgentEvent::ForegroundSubagentStarted {
693            id: "sa-fast".into(),
694            name: "quick".into(),
695        });
696        app.handle_agent_event(AgentEvent::ForegroundSubagentCompleted {
697            id: "sa-fast".into(),
698            name: "quick".into(),
699            success: true,
700        });
701
702        assert!(app.sessions.current().view_target.is_main());
703    }
704
705    #[test]
706    fn context_estimate_updates_cached_value() {
707        let mut app = make_app();
708        assert_eq!(
709            app.context_token_estimate(),
710            0,
711            "initial estimate must be 0"
712        );
713
714        app.handle_agent_event(AgentEvent::ContextEstimate(14_200));
715        assert_eq!(app.context_token_estimate(), 14_200);
716
717        app.handle_agent_event(AgentEvent::ContextEstimate(512));
718        assert_eq!(
719            app.context_token_estimate(),
720            512,
721            "estimate must update on each event"
722        );
723    }
724}