Skip to main content

ralph_workflow/json_parser/codex/
event_handlers.rs

1use crate::common::truncate_text;
2use crate::json_parser::codex::event_interpretation::{
3    compute_reasoning_incremental_delta, interpret_item_completed_type,
4    interpret_item_started_type, ItemCompletedInterpretation, ItemStartedInterpretation,
5};
6use crate::json_parser::delta_display::{
7    sanitize_for_display, DeltaRenderer, TextDeltaRenderer, ThinkingDeltaRenderer,
8};
9use crate::json_parser::terminal::TerminalMode;
10use crate::json_parser::types::{
11    format_dim_continuation_line, format_token_counts, format_tokens_suffix, format_tool_input,
12    CodexItem, CodexUsage, ContentType,
13};
14use crate::logger::{CHECK, CROSS};
15
16#[cfg(any(test, debug_assertions))]
17use std::io::Write;
18pub struct EventHandlerContext<'a> {
19    pub colors: &'a crate::logger::Colors,
20    pub verbosity: crate::config::Verbosity,
21    pub display_name: &'a str,
22    pub streaming_session:
23        &'a std::rc::Rc<std::cell::RefCell<crate::json_parser::streaming_state::StreamingSession>>,
24    pub reasoning_accumulator:
25        &'a std::rc::Rc<std::cell::RefCell<crate::json_parser::types::DeltaAccumulator>>,
26    pub terminal_mode: crate::json_parser::terminal::TerminalMode,
27    pub show_streaming_metrics: bool,
28    pub last_rendered_content: &'a std::cell::RefCell<std::collections::HashMap<String, String>>,
29}
30
31impl<'a> EventHandlerContext<'a> {
32    pub fn with_session_mut<R>(
33        &self,
34        f: impl FnOnce(&mut crate::json_parser::streaming_state::StreamingSession) -> R,
35    ) -> R {
36        f(&mut self.streaming_session.borrow_mut())
37    }
38    pub fn with_reasoning_accumulator_mut<R>(
39        &self,
40        f: impl FnOnce(&mut crate::json_parser::types::DeltaAccumulator) -> R,
41    ) -> R {
42        f(&mut self.reasoning_accumulator.borrow_mut())
43    }
44    pub fn with_last_rendered_content_mut<R>(
45        &self,
46        f: impl FnOnce(&mut std::collections::HashMap<String, String>) -> R,
47    ) -> R {
48        f(&mut self.last_rendered_content.borrow_mut())
49    }
50}
51pub fn handle_item_started(
52    ctx: &EventHandlerContext<'_>,
53    item: Option<&CodexItem>,
54) -> Option<String> {
55    item.and_then(
56        |item| match interpret_item_started_type(item.item_type.as_deref()) {
57            Some(ItemStartedInterpretation::CommandExecution) => {
58                let output = handle_command_execution_started(ctx, item.command.clone());
59                (!output.is_empty()).then_some(output)
60            }
61            Some(ItemStartedInterpretation::AgentMessage) => {
62                Some(handle_agent_message_started(ctx, item.text.as_ref()))
63            }
64            Some(ItemStartedInterpretation::Reasoning) => {
65                Some(handle_reasoning_started(ctx, item.text.as_ref()))
66            }
67            Some(ItemStartedInterpretation::FileRead) => {
68                let output = handle_file_io_started(ctx, item.path.clone(), "file_read");
69                (!output.is_empty()).then_some(output)
70            }
71            Some(ItemStartedInterpretation::FileWrite) => {
72                let output = handle_file_io_started(ctx, item.path.clone(), "file_write");
73                (!output.is_empty()).then_some(output)
74            }
75            Some(ItemStartedInterpretation::McpTool) => {
76                let output =
77                    handle_mcp_tool_started(ctx, item.tool.as_ref(), item.arguments.as_ref());
78                (!output.is_empty()).then_some(output)
79            }
80            Some(ItemStartedInterpretation::WebSearch) => {
81                let output = handle_web_search_started(ctx, item.query.as_ref());
82                (!output.is_empty()).then_some(output)
83            }
84            Some(ItemStartedInterpretation::PlanUpdate) => {
85                let output = handle_plan_update_started(ctx);
86                (!output.is_empty()).then_some(output)
87            }
88            Some(ItemStartedInterpretation::Unknown(item_type)) => {
89                let output = handle_unknown_item_started(ctx, Some(item_type), item.path.clone());
90                (!output.is_empty()).then_some(output)
91            }
92            None => None,
93        },
94    )
95}
96pub fn handle_item_completed(
97    ctx: &EventHandlerContext<'_>,
98    item: Option<&CodexItem>,
99) -> Option<String> {
100    item.and_then(
101        |item| match interpret_item_completed_type(item.item_type.as_deref()) {
102            Some(ItemCompletedInterpretation::AgentMessage) => {
103                Some(handle_agent_message_completed(ctx, item.text.as_ref()))
104            }
105            Some(ItemCompletedInterpretation::Reasoning) => {
106                Some(handle_reasoning_completed(ctx, item.text.as_ref()))
107            }
108            Some(ItemCompletedInterpretation::CommandExecution) => {
109                let output = handle_command_execution_completed(ctx);
110                (!output.is_empty()).then_some(output)
111            }
112            Some(ItemCompletedInterpretation::FileWrite) => {
113                let output = handle_file_write_completed(ctx, item.path.clone());
114                (!output.is_empty()).then_some(output)
115            }
116            Some(ItemCompletedInterpretation::FileRead) => {
117                let output = handle_file_read_completed(ctx, item.path.clone());
118                (!output.is_empty()).then_some(output)
119            }
120            Some(ItemCompletedInterpretation::McpTool) => {
121                let output = handle_mcp_tool_completed(ctx, item.tool.clone());
122                (!output.is_empty()).then_some(output)
123            }
124            Some(ItemCompletedInterpretation::WebSearch) => {
125                let output = handle_web_search_completed(ctx);
126                (!output.is_empty()).then_some(output)
127            }
128            Some(ItemCompletedInterpretation::PlanUpdate) => {
129                let output = handle_plan_update_completed(ctx, item.plan.as_ref());
130                (!output.is_empty()).then_some(output)
131            }
132            _ => None,
133        },
134    )
135}
136pub fn handle_thread_started(ctx: &EventHandlerContext<'_>, thread_id: Option<String>) -> String {
137    let tid = thread_id.unwrap_or_else(|| "unknown".to_string());
138
139    // Thread start indicates a new logical stream in Codex; reset any append-only tracking
140    // so subsequent deltas start fresh.
141    ctx.last_rendered_content.borrow_mut().clear();
142
143    ctx.streaming_session
144        .borrow_mut()
145        .set_current_message_id(Some(tid.clone()));
146    let hash_display = crate::json_parser::types::format_short_hash(&tid);
147    let hash_suffix = if hash_display.is_empty() {
148        String::new()
149    } else {
150        format!(
151            " {}{}{}",
152            ctx.colors.dim(),
153            hash_display,
154            ctx.colors.reset()
155        )
156    };
157    format!(
158        "{}[{}]{} {}Thread started{}{}\n",
159        ctx.colors.dim(),
160        ctx.display_name,
161        ctx.colors.reset(),
162        ctx.colors.cyan(),
163        ctx.colors.reset(),
164        hash_suffix
165    )
166}
167
168pub fn handle_turn_started(ctx: &EventHandlerContext<'_>, turn_id: String) -> String {
169    ctx.streaming_session.borrow_mut().on_message_start();
170    let mut acc = ctx.reasoning_accumulator.borrow_mut();
171    let placeholder = crate::json_parser::types::DeltaAccumulator::new();
172    let old = std::mem::replace(&mut *acc, placeholder);
173    let new = old.clear();
174    *acc = new;
175
176    // Each Codex turn is a new logical stream. Clear append-only renderer state so the
177    // first delta of the new turn re-emits the prefix/label instead of computing a suffix
178    // against the previous turn's content.
179    ctx.last_rendered_content.borrow_mut().clear();
180
181    ctx.streaming_session
182        .borrow_mut()
183        .set_current_message_id(Some(turn_id));
184    format!(
185        "{}[{}]{} {}Turn started{}\n",
186        ctx.colors.dim(),
187        ctx.display_name,
188        ctx.colors.reset(),
189        ctx.colors.blue(),
190        ctx.colors.reset()
191    )
192}
193
194pub fn handle_turn_completed(ctx: &EventHandlerContext<'_>, usage: Option<CodexUsage>) -> String {
195    let was_in_block = ctx.streaming_session.borrow_mut().on_message_stop();
196    let (input, output) = usage.map_or((0, 0), |u| {
197        (u.input_tokens.unwrap_or(0), u.output_tokens.unwrap_or(0))
198    });
199    let completion = if was_in_block {
200        TextDeltaRenderer::render_completion(ctx.terminal_mode)
201    } else {
202        String::new()
203    };
204    let tokens_str = format_token_counts(input, output, 0, 0);
205    let tokens_suffix = format_tokens_suffix(&tokens_str);
206    format!(
207        "{}{}[{}]{} {}{} Turn completed{}{}{}{}\n",
208        completion,
209        ctx.colors.dim(),
210        ctx.display_name,
211        ctx.colors.reset(),
212        ctx.colors.green(),
213        CHECK,
214        ctx.colors.reset(),
215        ctx.colors.dim(),
216        tokens_suffix,
217        ctx.colors.reset()
218    )
219}
220
221pub fn handle_turn_failed(ctx: &EventHandlerContext<'_>, error: Option<String>) -> String {
222    let was_in_block = ctx.streaming_session.borrow_mut().on_message_stop();
223    let completion = if was_in_block {
224        TextDeltaRenderer::render_completion(ctx.terminal_mode)
225    } else {
226        String::new()
227    };
228    let err = error.unwrap_or_else(|| "unknown error".to_string());
229    format!(
230        "{}{}[{}]{} {}{} Turn failed:{} {}\n",
231        completion,
232        ctx.colors.dim(),
233        ctx.display_name,
234        ctx.colors.reset(),
235        ctx.colors.red(),
236        CROSS,
237        ctx.colors.reset(),
238        err
239    )
240}
241
242pub fn handle_command_execution_started(
243    ctx: &EventHandlerContext<'_>,
244    command: Option<String>,
245) -> String {
246    let cmd = command.unwrap_or_default();
247    let limit = ctx.verbosity.truncate_limit("command");
248    let preview = truncate_text(&cmd, limit);
249    format!(
250        "{}[{}]{} {}Exec{}: {}{}{}\n",
251        ctx.colors.dim(),
252        ctx.display_name,
253        ctx.colors.reset(),
254        ctx.colors.magenta(),
255        ctx.colors.reset(),
256        ctx.colors.white(),
257        preview,
258        ctx.colors.reset()
259    )
260}
261
262pub fn handle_file_io_started(
263    ctx: &EventHandlerContext<'_>,
264    path: Option<String>,
265    action: &str,
266) -> String {
267    let path = path.unwrap_or_default();
268    format!(
269        "{}[{}]{} {}{}:{} {}\n",
270        ctx.colors.dim(),
271        ctx.display_name,
272        ctx.colors.reset(),
273        ctx.colors.yellow(),
274        action,
275        ctx.colors.reset(),
276        path
277    )
278}
279
280fn maybe_format_mcp_tool_input(
281    ctx: &EventHandlerContext<'_>,
282    arguments: Option<&serde_json::Value>,
283) -> String {
284    arguments
285        .filter(|_| ctx.verbosity.show_tool_input())
286        .map_or_else(String::new, |args| {
287            let args_str = format_tool_input(args);
288            let limit = ctx.verbosity.truncate_limit("tool_input");
289            let preview = truncate_text(&args_str, limit);
290            if preview.is_empty() {
291                String::new()
292            } else {
293                // TerminalMode::None must not emit ANSI codes even when colors are enabled.
294                match ctx.terminal_mode {
295                    TerminalMode::None => {
296                        format!("[{}]   \u{2514}\u{2500} {}\n", ctx.display_name, preview)
297                    }
298                    _ => format_dim_continuation_line(&preview, ctx.display_name, *ctx.colors),
299                }
300            }
301        })
302}
303
304pub fn handle_mcp_tool_started(
305    ctx: &EventHandlerContext<'_>,
306    tool_name: Option<&String>,
307    arguments: Option<&serde_json::Value>,
308) -> String {
309    let default = String::from("unknown");
310    let tool_name = tool_name.unwrap_or(&default);
311
312    let base = match ctx.terminal_mode {
313        TerminalMode::Full | TerminalMode::Basic => format!(
314            "{}[{}]{} {}MCP Tool{}: {}{}{}\n",
315            ctx.colors.dim(),
316            ctx.display_name,
317            ctx.colors.reset(),
318            ctx.colors.magenta(),
319            ctx.colors.reset(),
320            ctx.colors.bold(),
321            tool_name,
322            ctx.colors.reset()
323        ),
324        TerminalMode::None => format!("[{}] MCP Tool: {}\n", ctx.display_name, tool_name),
325    };
326
327    let tool_input = maybe_format_mcp_tool_input(ctx, arguments);
328    format!("{base}{tool_input}")
329}
330
331pub fn handle_web_search_started(ctx: &EventHandlerContext<'_>, query: Option<&String>) -> String {
332    let default = String::new();
333    let query = query.unwrap_or(&default);
334    let limit = ctx.verbosity.truncate_limit("command");
335    let preview = truncate_text(query, limit);
336    format!(
337        "{}[{}]{} {}Search{}: {}{}{}\n",
338        ctx.colors.dim(),
339        ctx.display_name,
340        ctx.colors.reset(),
341        ctx.colors.cyan(),
342        ctx.colors.reset(),
343        ctx.colors.white(),
344        preview,
345        ctx.colors.reset()
346    )
347}
348
349pub fn handle_plan_update_started(ctx: &EventHandlerContext<'_>) -> String {
350    format!(
351        "{}[{}]{} {}Updating plan...{}\n",
352        ctx.colors.dim(),
353        ctx.display_name,
354        ctx.colors.reset(),
355        ctx.colors.blue(),
356        ctx.colors.reset()
357    )
358}
359
360pub fn handle_unknown_item_started(
361    ctx: &EventHandlerContext<'_>,
362    item_type: Option<String>,
363    path: Option<String>,
364) -> String {
365    if ctx.verbosity.is_verbose() {
366        if let Some(t) = item_type {
367            return format!(
368                "{}[{}]{} {}{}:{} {}\n",
369                ctx.colors.dim(),
370                ctx.display_name,
371                ctx.colors.reset(),
372                ctx.colors.dim(),
373                t,
374                ctx.colors.reset(),
375                path.unwrap_or_default()
376            );
377        }
378    }
379    String::new()
380}
381
382pub fn handle_command_execution_completed(ctx: &EventHandlerContext<'_>) -> String {
383    match ctx.terminal_mode {
384        TerminalMode::Full | TerminalMode::Basic => format!(
385            "{}[{}]{} {}{} Command done{}\n",
386            ctx.colors.dim(),
387            ctx.display_name,
388            ctx.colors.reset(),
389            ctx.colors.green(),
390            CHECK,
391            ctx.colors.reset()
392        ),
393        TerminalMode::None => format!("[{}] Command done\n", ctx.display_name),
394    }
395}
396
397pub fn handle_file_write_completed(ctx: &EventHandlerContext<'_>, path: Option<String>) -> String {
398    let path = path.unwrap_or_else(|| "unknown".to_string());
399    match ctx.terminal_mode {
400        TerminalMode::Full | TerminalMode::Basic => format!(
401            "{}[{}]{} {}File{}: {}\n",
402            ctx.colors.dim(),
403            ctx.display_name,
404            ctx.colors.reset(),
405            ctx.colors.yellow(),
406            ctx.colors.reset(),
407            path
408        ),
409        TerminalMode::None => format!("[{}] File: {}\n", ctx.display_name, path),
410    }
411}
412
413pub fn handle_file_read_completed(ctx: &EventHandlerContext<'_>, path: Option<String>) -> String {
414    if ctx.verbosity.is_verbose() {
415        let path = path.unwrap_or_else(|| "unknown".to_string());
416        format!(
417            "{}[{}]{} {}{} Read:{} {}\n",
418            ctx.colors.dim(),
419            ctx.display_name,
420            ctx.colors.reset(),
421            ctx.colors.green(),
422            CHECK,
423            ctx.colors.reset(),
424            path
425        )
426    } else {
427        String::new()
428    }
429}
430
431pub fn handle_mcp_tool_completed(
432    ctx: &EventHandlerContext<'_>,
433    tool_name: Option<String>,
434) -> String {
435    let tool_name = tool_name.unwrap_or_else(|| "tool".to_string());
436    match ctx.terminal_mode {
437        TerminalMode::Full | TerminalMode::Basic => format!(
438            "{}[{}]{} {}{} MCP:{} {} done\n",
439            ctx.colors.dim(),
440            ctx.display_name,
441            ctx.colors.reset(),
442            ctx.colors.green(),
443            CHECK,
444            ctx.colors.reset(),
445            tool_name
446        ),
447        TerminalMode::None => format!("[{}] MCP: {} done\n", ctx.display_name, tool_name),
448    }
449}
450
451pub fn handle_web_search_completed(ctx: &EventHandlerContext<'_>) -> String {
452    match ctx.terminal_mode {
453        TerminalMode::Full | TerminalMode::Basic => format!(
454            "{}[{}]{} {}{} Search completed{}\n",
455            ctx.colors.dim(),
456            ctx.display_name,
457            ctx.colors.reset(),
458            ctx.colors.green(),
459            CHECK,
460            ctx.colors.reset()
461        ),
462        TerminalMode::None => format!("[{}] Search completed\n", ctx.display_name),
463    }
464}
465
466pub fn handle_plan_update_completed(
467    ctx: &EventHandlerContext<'_>,
468    plan: Option<&String>,
469) -> String {
470    if ctx.verbosity.is_verbose() {
471        let limit = ctx.verbosity.truncate_limit("text");
472        plan.map_or_else(
473            || match ctx.terminal_mode {
474                TerminalMode::Full | TerminalMode::Basic => format!(
475                    "{}[{}]{} {}{} Plan updated{}\n",
476                    ctx.colors.dim(),
477                    ctx.display_name,
478                    ctx.colors.reset(),
479                    ctx.colors.green(),
480                    CHECK,
481                    ctx.colors.reset()
482                ),
483                TerminalMode::None => format!("[{}] Plan updated\n", ctx.display_name),
484            },
485            |plan| {
486                let preview = truncate_text(plan, limit);
487                match ctx.terminal_mode {
488                    TerminalMode::Full | TerminalMode::Basic => format!(
489                        "{}[{}]{} {}Plan:{} {}\n",
490                        ctx.colors.dim(),
491                        ctx.display_name,
492                        ctx.colors.reset(),
493                        ctx.colors.blue(),
494                        ctx.colors.reset(),
495                        preview
496                    ),
497                    TerminalMode::None => format!("[{}] Plan: {}\n", ctx.display_name, preview),
498                }
499            },
500        )
501    } else {
502        String::new()
503    }
504}
505
506pub fn handle_error(
507    ctx: &EventHandlerContext<'_>,
508    message: Option<String>,
509    error: Option<String>,
510) -> String {
511    let err = message
512        .or(error)
513        .unwrap_or_else(|| "unknown error".to_string());
514    format!(
515        "{}[{}]{} {}{} Error:{} {}\n",
516        ctx.colors.dim(),
517        ctx.display_name,
518        ctx.colors.reset(),
519        ctx.colors.red(),
520        CROSS,
521        ctx.colors.reset(),
522        err
523    )
524}
525
526include!("event_handlers_agent_message.rs");