Skip to main content

codetether_agent/tui/app/
session_events.rs

1use crate::session::{Session, SessionEvent};
2use crate::tui::app::smart_switch::maybe_schedule_smart_switch_retry;
3use crate::tui::app::smart_switch::smart_switch_max_retries;
4use crate::tui::app::state::App;
5use crate::tui::app::text::truncate_preview;
6use crate::tui::app::worker_bridge::{handle_processing_started, handle_processing_stopped};
7use crate::tui::chat::message::{ChatMessage, MessageType};
8use crate::tui::worker_bridge::TuiWorkerBridge;
9
10pub async fn handle_session_event(
11    app: &mut App,
12    session: &mut Session,
13    worker_bridge: &Option<TuiWorkerBridge>,
14    evt: SessionEvent,
15) {
16    // Update watchdog timestamp on every session event
17    app.state.main_last_event_at = Some(std::time::Instant::now());
18    // Auto-follow latest output on every event (matches legacy TUI behavior).
19    // Without this, scroll stays pinned to wherever the user last left it and
20    // new streaming text / tool activity is rendered off-screen.
21    app.state.scroll_to_bottom();
22
23    match evt {
24        SessionEvent::Thinking => {
25            handle_processing_started(app, worker_bridge).await;
26            if app.state.processing_started_at.is_none() {
27                app.state.begin_request_timing();
28            }
29            app.state.status = "Thinking…".to_string();
30        }
31        SessionEvent::ToolCallStart { name, arguments } => {
32            handle_processing_started(app, worker_bridge).await;
33            if app.state.processing_started_at.is_none() {
34                app.state.begin_request_timing();
35            }
36            // Flush any in-flight streaming text into a real assistant message
37            // before showing the tool call. Otherwise the streamed reply is
38            // discarded when `streaming_text` is cleared on Done/TextComplete.
39            if !app.state.streaming_text.is_empty() {
40                let text = std::mem::take(&mut app.state.streaming_text);
41                app.state
42                    .messages
43                    .push(ChatMessage::new(MessageType::Assistant, text));
44            }
45            app.state.reset_tool_preview_scroll();
46            app.state.status = format!("Running tool: {name}");
47            app.state.messages.push(ChatMessage::new(
48                MessageType::ToolCall {
49                    name: name.clone(),
50                    arguments: arguments.clone(),
51                },
52                format!("{name}: {}", truncate_preview(&arguments, 240)),
53            ));
54            app.state.scroll_to_bottom();
55        }
56        SessionEvent::ToolCallComplete {
57            name,
58            output,
59            success,
60            duration_ms,
61        } => {
62            app.state.reset_tool_preview_scroll();
63            app.state.messages.push(ChatMessage::new(
64                MessageType::ToolResult {
65                    name: name.clone(),
66                    output: output.clone(),
67                    success,
68                    duration_ms: Some(duration_ms),
69                },
70                format!("{name}: {}", truncate_preview(&output, 600)),
71            ));
72            app.state.last_tool_name = Some(name.clone());
73            app.state.last_tool_latency_ms = Some(duration_ms);
74            app.state.last_tool_success = Some(success);
75            app.state.status = format!("Tool finished: {name}");
76            app.state.scroll_to_bottom();
77        }
78        SessionEvent::TextChunk(chunk) => {
79            app.state.scroll_to_bottom();
80            app.state.note_text_token();
81            app.state.streaming_text =
82                if chunk.len() > crate::tui::constants::MAX_STREAMING_TEXT_BYTES {
83                    let mut t = crate::util::truncate_bytes_safe(
84                        &chunk,
85                        crate::tui::constants::MAX_STREAMING_TEXT_BYTES,
86                    )
87                    .to_string();
88                    t.push_str(" …[truncated]");
89                    t
90                } else {
91                    chunk
92                };
93        }
94        SessionEvent::TextComplete(text) => {
95            app.state.note_text_token();
96            app.state.streaming_text.clear();
97            app.state
98                .messages
99                .push(ChatMessage::new(MessageType::Assistant, text));
100            app.state.status = "Assistant replied".to_string();
101            app.state.scroll_to_bottom();
102        }
103        SessionEvent::ThinkingComplete(text) => {
104            if !text.is_empty() {
105                app.state.reset_tool_preview_scroll();
106                app.state.messages.push(ChatMessage::new(
107                    MessageType::Thinking(text.clone()),
108                    truncate_preview(&text, 600),
109                ));
110                app.state.scroll_to_bottom();
111            }
112        }
113        SessionEvent::UsageReport {
114            model,
115            prompt_tokens,
116            completion_tokens,
117            duration_ms,
118        } => {
119            app.state.last_completion_model = Some(model.clone());
120            app.state.last_completion_latency_ms = Some(duration_ms);
121            app.state.last_completion_prompt_tokens = Some(prompt_tokens);
122            app.state.last_completion_output_tokens = Some(completion_tokens);
123            app.state.status = format!(
124                "Completed with model {model} • {} in / {} out • {} ms",
125                prompt_tokens, completion_tokens, duration_ms
126            );
127        }
128        SessionEvent::SessionSync(updated) => {
129            *session = *updated;
130            session.attach_global_bus_if_missing();
131            app.state.session_id = Some(session.id.clone());
132        }
133        SessionEvent::Done => {
134            handle_processing_stopped(app, worker_bridge).await;
135            app.state.streaming_text.clear();
136            app.state.complete_request_timing();
137            app.state.status = "Ready".to_string();
138        }
139        SessionEvent::Error(err) => {
140            handle_processing_stopped(app, worker_bridge).await;
141            app.state.streaming_text.clear();
142            app.state.complete_request_timing();
143
144            // Attempt smart switch retry on retryable provider errors
145            let current_model = session.metadata.model.as_deref();
146            let current_provider = current_model.and_then(|m| m.split('/').next());
147            let prompt = app.state.main_inflight_prompt.clone().unwrap_or_default();
148
149            if let Some(pending) = maybe_schedule_smart_switch_retry(
150                &err,
151                current_model,
152                current_provider,
153                &app.state.available_models,
154                &prompt,
155                app.state.smart_switch_retry_count,
156                &app.state.smart_switch_attempted_models,
157            ) {
158                app.state.smart_switch_retry_count += 1;
159                app.state
160                    .smart_switch_attempted_models
161                    .push(current_model.unwrap_or("unknown").to_string());
162                app.state
163                    .smart_switch_attempted_models
164                    .push(pending.target_model.clone());
165                app.state.status = format!(
166                    "Smart switch retry {}/{} → {}",
167                    app.state.smart_switch_retry_count,
168                    smart_switch_max_retries(),
169                    pending.target_model,
170                );
171                app.state.pending_smart_switch_retry = Some(pending);
172            } else {
173                // No retry possible — reset smart switch state
174                app.state.smart_switch_retry_count = 0;
175                app.state.smart_switch_attempted_models.clear();
176                app.state.pending_smart_switch_retry = None;
177            }
178
179            app.state
180                .messages
181                .push(ChatMessage::new(MessageType::Error, err.clone()));
182            app.state.status = "Error".to_string();
183            app.state.scroll_to_bottom();
184        }
185        // New non-exhaustive variants (TokenEstimate, TokenUsage,
186        // RlmProgress, RlmComplete, Compaction*, ContextTruncated) are
187        // consumed by dedicated SessionBus subscribers, not this legacy
188        // mpsc handler. Intentionally ignored here.
189        _ => {}
190    }
191}
192
193#[cfg(test)]
194mod tests {
195    use super::*;
196    use crate::session::Session;
197    use crate::tui::chat::message::MessageType;
198
199    #[tokio::test]
200    async fn text_chunk_replaces_streaming_preview_with_latest_cumulative_text() {
201        let mut app = App::default();
202        let mut session = Session::new().await.expect("session should create");
203
204        handle_session_event(
205            &mut app,
206            &mut session,
207            &None,
208            SessionEvent::TextChunk("hel".to_string()),
209        )
210        .await;
211        assert_eq!(app.state.streaming_text, "hel");
212
213        handle_session_event(
214            &mut app,
215            &mut session,
216            &None,
217            SessionEvent::TextChunk("hello".to_string()),
218        )
219        .await;
220        assert_eq!(app.state.streaming_text, "hello");
221    }
222
223    #[tokio::test]
224    async fn tool_completion_records_duration_for_chat_and_latency_view() {
225        let mut app = App::default();
226        let mut session = Session::new().await.expect("session should create");
227
228        handle_session_event(
229            &mut app,
230            &mut session,
231            &None,
232            SessionEvent::ToolCallComplete {
233                name: "read".to_string(),
234                output: "src/main.rs".to_string(),
235                success: true,
236                duration_ms: 42,
237            },
238        )
239        .await;
240
241        let Some(message) = app.state.messages.last() else {
242            panic!("expected a tool result message");
243        };
244        match &message.message_type {
245            MessageType::ToolResult {
246                name,
247                success,
248                duration_ms,
249                ..
250            } => {
251                assert_eq!(name, "read");
252                assert!(*success);
253                assert_eq!(*duration_ms, Some(42));
254            }
255            other => panic!("expected tool result message, got {other:?}"),
256        }
257        assert_eq!(app.state.last_tool_name.as_deref(), Some("read"));
258        assert_eq!(app.state.last_tool_latency_ms, Some(42));
259        assert_eq!(app.state.last_tool_success, Some(true));
260    }
261
262    #[tokio::test]
263    async fn usage_report_updates_latency_snapshot() {
264        let mut app = App::default();
265        let mut session = Session::new().await.expect("session should create");
266
267        handle_session_event(
268            &mut app,
269            &mut session,
270            &None,
271            SessionEvent::UsageReport {
272                model: "openai/gpt-5.4".to_string(),
273                prompt_tokens: 120,
274                completion_tokens: 64,
275                duration_ms: 1_250,
276            },
277        )
278        .await;
279
280        assert_eq!(
281            app.state.last_completion_model.as_deref(),
282            Some("openai/gpt-5.4")
283        );
284        assert_eq!(app.state.last_completion_latency_ms, Some(1_250));
285        assert_eq!(app.state.last_completion_prompt_tokens, Some(120));
286        assert_eq!(app.state.last_completion_output_tokens, Some(64));
287    }
288
289    #[tokio::test]
290    async fn text_events_record_request_ttft_and_last_token() {
291        let mut app = App::default();
292        let mut session = Session::new().await.expect("session should create");
293        app.state.processing_started_at =
294            Some(std::time::Instant::now() - std::time::Duration::from_millis(15));
295
296        handle_session_event(
297            &mut app,
298            &mut session,
299            &None,
300            SessionEvent::TextChunk("hello".to_string()),
301        )
302        .await;
303
304        let first = app
305            .state
306            .current_request_first_token_ms
307            .expect("expected ttft after first chunk");
308        assert_eq!(app.state.current_request_last_token_ms, Some(first));
309
310        app.state.processing_started_at =
311            Some(std::time::Instant::now() - std::time::Duration::from_millis(30));
312        handle_session_event(
313            &mut app,
314            &mut session,
315            &None,
316            SessionEvent::TextComplete("hello".to_string()),
317        )
318        .await;
319
320        assert_eq!(app.state.current_request_first_token_ms, Some(first));
321        assert!(
322            app.state
323                .current_request_last_token_ms
324                .expect("expected last token timing")
325                >= first
326        );
327    }
328
329    #[tokio::test]
330    async fn done_promotes_request_timing_snapshot() {
331        let mut app = App::default();
332        let mut session = Session::new().await.expect("session should create");
333        app.state.processing_started_at = Some(std::time::Instant::now());
334        app.state.current_request_first_token_ms = Some(120);
335        app.state.current_request_last_token_ms = Some(980);
336
337        handle_session_event(&mut app, &mut session, &None, SessionEvent::Done).await;
338
339        assert_eq!(app.state.last_request_first_token_ms, Some(120));
340        assert_eq!(app.state.last_request_last_token_ms, Some(980));
341        assert!(app.state.processing_started_at.is_none());
342        assert!(app.state.current_request_first_token_ms.is_none());
343        assert!(app.state.current_request_last_token_ms.is_none());
344    }
345}