Skip to main content

ralph_workflow/json_parser/codex/
event_handlers.rs

1use crate::common::truncate_text;
2use crate::json_parser::codex::event_interpretation::{
3    compute_reasoning_incremental_delta, interpret_item_completed_type,
4    interpret_item_started_type, ItemCompletedInterpretation, ItemStartedInterpretation,
5};
6use crate::json_parser::delta_display::{
7    sanitize_for_display, DeltaRenderer, TextDeltaRenderer, ThinkingDeltaRenderer,
8};
9use crate::json_parser::terminal::TerminalMode;
10use crate::json_parser::types::{format_tool_input, CodexItem, CodexUsage, ContentType};
11use crate::logger::{CHECK, CROSS};
12
13#[cfg(any(test, debug_assertions))]
14use std::io::Write;
15pub struct EventHandlerContext<'a> {
16    pub colors: &'a crate::logger::Colors,
17    pub verbosity: crate::config::Verbosity,
18    pub display_name: &'a str,
19    pub streaming_session:
20        &'a std::rc::Rc<std::cell::RefCell<crate::json_parser::streaming_state::StreamingSession>>,
21    pub reasoning_accumulator:
22        &'a std::rc::Rc<std::cell::RefCell<crate::json_parser::types::DeltaAccumulator>>,
23    pub terminal_mode: crate::json_parser::terminal::TerminalMode,
24    pub show_streaming_metrics: bool,
25    pub last_rendered_content: &'a std::cell::RefCell<std::collections::HashMap<String, String>>,
26}
27
28impl<'a> EventHandlerContext<'a> {
29    pub fn with_session_mut<R>(
30        &self,
31        f: impl FnOnce(&mut crate::json_parser::streaming_state::StreamingSession) -> R,
32    ) -> R {
33        f(&mut self.streaming_session.borrow_mut())
34    }
35    pub fn with_reasoning_accumulator_mut<R>(
36        &self,
37        f: impl FnOnce(&mut crate::json_parser::types::DeltaAccumulator) -> R,
38    ) -> R {
39        f(&mut self.reasoning_accumulator.borrow_mut())
40    }
41    pub fn with_last_rendered_content_mut<R>(
42        &self,
43        f: impl FnOnce(&mut std::collections::HashMap<String, String>) -> R,
44    ) -> R {
45        f(&mut self.last_rendered_content.borrow_mut())
46    }
47}
48pub fn handle_item_started(
49    ctx: &EventHandlerContext<'_>,
50    item: Option<&CodexItem>,
51) -> Option<String> {
52    item.and_then(
53        |item| match interpret_item_started_type(item.item_type.as_deref()) {
54            Some(ItemStartedInterpretation::CommandExecution) => {
55                let output = handle_command_execution_started(ctx, item.command.clone());
56                (!output.is_empty()).then_some(output)
57            }
58            Some(ItemStartedInterpretation::AgentMessage) => {
59                Some(handle_agent_message_started(ctx, item.text.as_ref()))
60            }
61            Some(ItemStartedInterpretation::Reasoning) => {
62                Some(handle_reasoning_started(ctx, item.text.as_ref()))
63            }
64            Some(ItemStartedInterpretation::FileRead) => {
65                let output = handle_file_io_started(ctx, item.path.clone(), "file_read");
66                (!output.is_empty()).then_some(output)
67            }
68            Some(ItemStartedInterpretation::FileWrite) => {
69                let output = handle_file_io_started(ctx, item.path.clone(), "file_write");
70                (!output.is_empty()).then_some(output)
71            }
72            Some(ItemStartedInterpretation::McpTool) => {
73                let output =
74                    handle_mcp_tool_started(ctx, item.tool.as_ref(), item.arguments.as_ref());
75                (!output.is_empty()).then_some(output)
76            }
77            Some(ItemStartedInterpretation::WebSearch) => {
78                let output = handle_web_search_started(ctx, item.query.as_ref());
79                (!output.is_empty()).then_some(output)
80            }
81            Some(ItemStartedInterpretation::PlanUpdate) => {
82                let output = handle_plan_update_started(ctx);
83                (!output.is_empty()).then_some(output)
84            }
85            Some(ItemStartedInterpretation::Unknown(item_type)) => {
86                let output = handle_unknown_item_started(ctx, Some(item_type), item.path.clone());
87                (!output.is_empty()).then_some(output)
88            }
89            None => None,
90        },
91    )
92}
93pub fn handle_item_completed(
94    ctx: &EventHandlerContext<'_>,
95    item: Option<&CodexItem>,
96) -> Option<String> {
97    item.and_then(
98        |item| match interpret_item_completed_type(item.item_type.as_deref()) {
99            Some(ItemCompletedInterpretation::AgentMessage) => {
100                Some(handle_agent_message_completed(ctx, item.text.as_ref()))
101            }
102            Some(ItemCompletedInterpretation::Reasoning) => {
103                Some(handle_reasoning_completed(ctx, item.text.as_ref()))
104            }
105            Some(ItemCompletedInterpretation::CommandExecution) => {
106                let output = handle_command_execution_completed(ctx);
107                (!output.is_empty()).then_some(output)
108            }
109            Some(ItemCompletedInterpretation::FileWrite) => {
110                let output = handle_file_write_completed(ctx, item.path.clone());
111                (!output.is_empty()).then_some(output)
112            }
113            Some(ItemCompletedInterpretation::FileRead) => {
114                let output = handle_file_read_completed(ctx, item.path.clone());
115                (!output.is_empty()).then_some(output)
116            }
117            Some(ItemCompletedInterpretation::McpTool) => {
118                let output = handle_mcp_tool_completed(ctx, item.tool.clone());
119                (!output.is_empty()).then_some(output)
120            }
121            Some(ItemCompletedInterpretation::WebSearch) => {
122                let output = handle_web_search_completed(ctx);
123                (!output.is_empty()).then_some(output)
124            }
125            Some(ItemCompletedInterpretation::PlanUpdate) => {
126                let output = handle_plan_update_completed(ctx, item.plan.as_ref());
127                (!output.is_empty()).then_some(output)
128            }
129            _ => None,
130        },
131    )
132}
133pub fn handle_thread_started(ctx: &EventHandlerContext<'_>, thread_id: Option<String>) -> String {
134    let tid = thread_id.unwrap_or_else(|| "unknown".to_string());
135
136    // Thread start indicates a new logical stream in Codex; reset any append-only tracking
137    // so subsequent deltas start fresh.
138    ctx.last_rendered_content.borrow_mut().clear();
139
140    ctx.streaming_session
141        .borrow_mut()
142        .set_current_message_id(Some(tid.clone()));
143    format!(
144        "{}[{}]{} {}Thread started{} {}({:.8}...){}\n",
145        ctx.colors.dim(),
146        ctx.display_name,
147        ctx.colors.reset(),
148        ctx.colors.cyan(),
149        ctx.colors.reset(),
150        ctx.colors.dim(),
151        tid,
152        ctx.colors.reset()
153    )
154}
155
156pub fn handle_turn_started(ctx: &EventHandlerContext<'_>, turn_id: String) -> String {
157    ctx.streaming_session.borrow_mut().on_message_start();
158    let mut acc = ctx.reasoning_accumulator.borrow_mut();
159    let placeholder = crate::json_parser::types::DeltaAccumulator::new();
160    let old = std::mem::replace(&mut *acc, placeholder);
161    let new = old.clear();
162    *acc = new;
163
164    // Each Codex turn is a new logical stream. Clear append-only renderer state so the
165    // first delta of the new turn re-emits the prefix/label instead of computing a suffix
166    // against the previous turn's content.
167    ctx.last_rendered_content.borrow_mut().clear();
168
169    ctx.streaming_session
170        .borrow_mut()
171        .set_current_message_id(Some(turn_id));
172    format!(
173        "{}[{}]{} {}Turn started{}\n",
174        ctx.colors.dim(),
175        ctx.display_name,
176        ctx.colors.reset(),
177        ctx.colors.blue(),
178        ctx.colors.reset()
179    )
180}
181
182pub fn handle_turn_completed(ctx: &EventHandlerContext<'_>, usage: Option<CodexUsage>) -> String {
183    let was_in_block = ctx.streaming_session.borrow_mut().on_message_stop();
184    let (input, output) = usage.map_or((0, 0), |u| {
185        (u.input_tokens.unwrap_or(0), u.output_tokens.unwrap_or(0))
186    });
187    let completion = if was_in_block {
188        TextDeltaRenderer::render_completion(ctx.terminal_mode)
189    } else {
190        String::new()
191    };
192    format!(
193        "{}{}[{}]{} {}{} Turn completed{} {}(in:{} out:{}){}\n",
194        completion,
195        ctx.colors.dim(),
196        ctx.display_name,
197        ctx.colors.reset(),
198        ctx.colors.green(),
199        CHECK,
200        ctx.colors.reset(),
201        ctx.colors.dim(),
202        input,
203        output,
204        ctx.colors.reset()
205    )
206}
207
208pub fn handle_turn_failed(ctx: &EventHandlerContext<'_>, error: Option<String>) -> String {
209    let was_in_block = ctx.streaming_session.borrow_mut().on_message_stop();
210    let completion = if was_in_block {
211        TextDeltaRenderer::render_completion(ctx.terminal_mode)
212    } else {
213        String::new()
214    };
215    let err = error.unwrap_or_else(|| "unknown error".to_string());
216    format!(
217        "{}{}[{}]{} {}{} Turn failed:{} {}\n",
218        completion,
219        ctx.colors.dim(),
220        ctx.display_name,
221        ctx.colors.reset(),
222        ctx.colors.red(),
223        CROSS,
224        ctx.colors.reset(),
225        err
226    )
227}
228
229pub fn handle_command_execution_started(
230    ctx: &EventHandlerContext<'_>,
231    command: Option<String>,
232) -> String {
233    let cmd = command.unwrap_or_default();
234    let limit = ctx.verbosity.truncate_limit("command");
235    let preview = truncate_text(&cmd, limit);
236    format!(
237        "{}[{}]{} {}Exec{}: {}{}{}\n",
238        ctx.colors.dim(),
239        ctx.display_name,
240        ctx.colors.reset(),
241        ctx.colors.magenta(),
242        ctx.colors.reset(),
243        ctx.colors.white(),
244        preview,
245        ctx.colors.reset()
246    )
247}
248
249pub fn handle_file_io_started(
250    ctx: &EventHandlerContext<'_>,
251    path: Option<String>,
252    action: &str,
253) -> String {
254    let path = path.unwrap_or_default();
255    format!(
256        "{}[{}]{} {}{}:{} {}\n",
257        ctx.colors.dim(),
258        ctx.display_name,
259        ctx.colors.reset(),
260        ctx.colors.yellow(),
261        action,
262        ctx.colors.reset(),
263        path
264    )
265}
266
267fn format_mcp_tool_input_preview(ctx: &EventHandlerContext<'_>, preview: &str) -> String {
268    match ctx.terminal_mode {
269        TerminalMode::Full | TerminalMode::Basic => format!(
270            "{}[{}]{} {}  \u{2514}\u{2500} {}{}{}\n",
271            ctx.colors.dim(),
272            ctx.display_name,
273            ctx.colors.reset(),
274            ctx.colors.dim(),
275            ctx.colors.reset(),
276            preview,
277            ctx.colors.reset()
278        ),
279        TerminalMode::None => format!("[{}]   \u{2514}\u{2500} {}\n", ctx.display_name, preview),
280    }
281}
282
283fn maybe_format_mcp_tool_input(
284    ctx: &EventHandlerContext<'_>,
285    arguments: Option<&serde_json::Value>,
286) -> String {
287    arguments
288        .filter(|_| ctx.verbosity.show_tool_input())
289        .map_or_else(String::new, |args| {
290            let args_str = format_tool_input(args);
291            let limit = ctx.verbosity.truncate_limit("tool_input");
292            let preview = truncate_text(&args_str, limit);
293            if preview.is_empty() {
294                String::new()
295            } else {
296                format_mcp_tool_input_preview(ctx, &preview)
297            }
298        })
299}
300
301pub fn handle_mcp_tool_started(
302    ctx: &EventHandlerContext<'_>,
303    tool_name: Option<&String>,
304    arguments: Option<&serde_json::Value>,
305) -> String {
306    let default = String::from("unknown");
307    let tool_name = tool_name.unwrap_or(&default);
308
309    let base = match ctx.terminal_mode {
310        TerminalMode::Full | TerminalMode::Basic => format!(
311            "{}[{}]{} {}MCP Tool{}: {}{}{}\n",
312            ctx.colors.dim(),
313            ctx.display_name,
314            ctx.colors.reset(),
315            ctx.colors.magenta(),
316            ctx.colors.reset(),
317            ctx.colors.bold(),
318            tool_name,
319            ctx.colors.reset()
320        ),
321        TerminalMode::None => format!("[{}] MCP Tool: {}\n", ctx.display_name, tool_name),
322    };
323
324    let tool_input = maybe_format_mcp_tool_input(ctx, arguments);
325    format!("{base}{tool_input}")
326}
327
328pub fn handle_web_search_started(ctx: &EventHandlerContext<'_>, query: Option<&String>) -> String {
329    let default = String::new();
330    let query = query.unwrap_or(&default);
331    let limit = ctx.verbosity.truncate_limit("command");
332    let preview = truncate_text(query, limit);
333    format!(
334        "{}[{}]{} {}Search{}: {}{}{}\n",
335        ctx.colors.dim(),
336        ctx.display_name,
337        ctx.colors.reset(),
338        ctx.colors.cyan(),
339        ctx.colors.reset(),
340        ctx.colors.white(),
341        preview,
342        ctx.colors.reset()
343    )
344}
345
346pub fn handle_plan_update_started(ctx: &EventHandlerContext<'_>) -> String {
347    format!(
348        "{}[{}]{} {}Updating plan...{}\n",
349        ctx.colors.dim(),
350        ctx.display_name,
351        ctx.colors.reset(),
352        ctx.colors.blue(),
353        ctx.colors.reset()
354    )
355}
356
357pub fn handle_unknown_item_started(
358    ctx: &EventHandlerContext<'_>,
359    item_type: Option<String>,
360    path: Option<String>,
361) -> String {
362    if ctx.verbosity.is_verbose() {
363        if let Some(t) = item_type {
364            return format!(
365                "{}[{}]{} {}{}:{} {}\n",
366                ctx.colors.dim(),
367                ctx.display_name,
368                ctx.colors.reset(),
369                ctx.colors.dim(),
370                t,
371                ctx.colors.reset(),
372                path.unwrap_or_default()
373            );
374        }
375    }
376    String::new()
377}
378
379pub fn handle_command_execution_completed(ctx: &EventHandlerContext<'_>) -> String {
380    match ctx.terminal_mode {
381        TerminalMode::Full | TerminalMode::Basic => format!(
382            "{}[{}]{} {}{} Command done{}\n",
383            ctx.colors.dim(),
384            ctx.display_name,
385            ctx.colors.reset(),
386            ctx.colors.green(),
387            CHECK,
388            ctx.colors.reset()
389        ),
390        TerminalMode::None => format!("[{}] Command done\n", ctx.display_name),
391    }
392}
393
394pub fn handle_file_write_completed(ctx: &EventHandlerContext<'_>, path: Option<String>) -> String {
395    let path = path.unwrap_or_else(|| "unknown".to_string());
396    match ctx.terminal_mode {
397        TerminalMode::Full | TerminalMode::Basic => format!(
398            "{}[{}]{} {}File{}: {}\n",
399            ctx.colors.dim(),
400            ctx.display_name,
401            ctx.colors.reset(),
402            ctx.colors.yellow(),
403            ctx.colors.reset(),
404            path
405        ),
406        TerminalMode::None => format!("[{}] File: {}\n", ctx.display_name, path),
407    }
408}
409
410pub fn handle_file_read_completed(ctx: &EventHandlerContext<'_>, path: Option<String>) -> String {
411    if ctx.verbosity.is_verbose() {
412        let path = path.unwrap_or_else(|| "unknown".to_string());
413        format!(
414            "{}[{}]{} {}{} Read:{} {}\n",
415            ctx.colors.dim(),
416            ctx.display_name,
417            ctx.colors.reset(),
418            ctx.colors.green(),
419            CHECK,
420            ctx.colors.reset(),
421            path
422        )
423    } else {
424        String::new()
425    }
426}
427
428pub fn handle_mcp_tool_completed(
429    ctx: &EventHandlerContext<'_>,
430    tool_name: Option<String>,
431) -> String {
432    let tool_name = tool_name.unwrap_or_else(|| "tool".to_string());
433    match ctx.terminal_mode {
434        TerminalMode::Full | TerminalMode::Basic => format!(
435            "{}[{}]{} {}{} MCP:{} {} done\n",
436            ctx.colors.dim(),
437            ctx.display_name,
438            ctx.colors.reset(),
439            ctx.colors.green(),
440            CHECK,
441            ctx.colors.reset(),
442            tool_name
443        ),
444        TerminalMode::None => format!("[{}] MCP: {} done\n", ctx.display_name, tool_name),
445    }
446}
447
448pub fn handle_web_search_completed(ctx: &EventHandlerContext<'_>) -> String {
449    match ctx.terminal_mode {
450        TerminalMode::Full | TerminalMode::Basic => format!(
451            "{}[{}]{} {}{} Search completed{}\n",
452            ctx.colors.dim(),
453            ctx.display_name,
454            ctx.colors.reset(),
455            ctx.colors.green(),
456            CHECK,
457            ctx.colors.reset()
458        ),
459        TerminalMode::None => format!("[{}] Search completed\n", ctx.display_name),
460    }
461}
462
463pub fn handle_plan_update_completed(
464    ctx: &EventHandlerContext<'_>,
465    plan: Option<&String>,
466) -> String {
467    if ctx.verbosity.is_verbose() {
468        let limit = ctx.verbosity.truncate_limit("text");
469        plan.map_or_else(
470            || match ctx.terminal_mode {
471                TerminalMode::Full | TerminalMode::Basic => format!(
472                    "{}[{}]{} {}{} Plan updated{}\n",
473                    ctx.colors.dim(),
474                    ctx.display_name,
475                    ctx.colors.reset(),
476                    ctx.colors.green(),
477                    CHECK,
478                    ctx.colors.reset()
479                ),
480                TerminalMode::None => format!("[{}] Plan updated\n", ctx.display_name),
481            },
482            |plan| {
483                let preview = truncate_text(plan, limit);
484                match ctx.terminal_mode {
485                    TerminalMode::Full | TerminalMode::Basic => format!(
486                        "{}[{}]{} {}Plan:{} {}\n",
487                        ctx.colors.dim(),
488                        ctx.display_name,
489                        ctx.colors.reset(),
490                        ctx.colors.blue(),
491                        ctx.colors.reset(),
492                        preview
493                    ),
494                    TerminalMode::None => format!("[{}] Plan: {}\n", ctx.display_name, preview),
495                }
496            },
497        )
498    } else {
499        String::new()
500    }
501}
502
503pub fn handle_error(
504    ctx: &EventHandlerContext<'_>,
505    message: Option<String>,
506    error: Option<String>,
507) -> String {
508    let err = message
509        .or(error)
510        .unwrap_or_else(|| "unknown error".to_string());
511    format!(
512        "{}[{}]{} {}{} Error:{} {}\n",
513        ctx.colors.dim(),
514        ctx.display_name,
515        ctx.colors.reset(),
516        ctx.colors.red(),
517        CROSS,
518        ctx.colors.reset(),
519        err
520    )
521}
522
523include!("event_handlers_agent_message.rs");