Skip to main content

ralph_adapters/
pi_stream.rs

1//! Pi stream event types for parsing `--mode json` NDJSON output.
2//!
3//! When invoked with `--mode json`, pi emits newline-delimited JSON events.
4//! This module provides typed Rust structures for deserializing and processing
5//! these events, plus a dispatch function for mapping them to `StreamHandler` calls.
6//!
7//! Only events that Ralph needs are modeled as typed variants. All other event
8//! types are captured by `#[serde(other)]` and silently ignored, providing
9//! forward compatibility with new pi event types.
10
11use crate::stream_handler::StreamHandler;
12use serde::{Deserialize, Serialize};
13
14/// Events from pi's `--mode json` NDJSON output.
15///
16/// Only the events Ralph needs are modeled. All other event types
17/// (session, agent_start, turn_start, message_start, message_end,
18/// tool_execution_update, etc.) are captured by the `Other` variant.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20#[serde(tag = "type", rename_all = "snake_case")]
21pub enum PiStreamEvent {
22    /// Streaming text/thinking deltas and errors from assistant.
23    MessageUpdate {
24        #[serde(rename = "assistantMessageEvent")]
25        assistant_message_event: PiAssistantEvent,
26    },
27
28    /// Tool begins execution.
29    ToolExecutionStart {
30        #[serde(rename = "toolCallId")]
31        tool_call_id: String,
32        #[serde(rename = "toolName")]
33        tool_name: String,
34        args: serde_json::Value,
35    },
36
37    /// Tool completes execution.
38    ToolExecutionEnd {
39        #[serde(rename = "toolCallId")]
40        tool_call_id: String,
41        #[serde(rename = "toolName")]
42        tool_name: String,
43        result: PiToolResult,
44        #[serde(rename = "isError")]
45        is_error: bool,
46    },
47
48    /// Turn completes — contains per-turn usage/cost.
49    TurnEnd { message: Option<PiTurnMessage> },
50
51    /// All other events (session, agent_start, turn_start, message_start,
52    /// message_end, tool_execution_update, etc.)
53    #[serde(other)]
54    Other,
55}
56
57/// Assistant message event within a message_update.
58///
59/// Only text_delta, thinking_delta, and error are actionable.
60/// All other sub-types are captured by `Other`.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62#[serde(tag = "type", rename_all = "snake_case")]
63pub enum PiAssistantEvent {
64    /// Text content delta.
65    TextDelta { delta: String },
66    /// Extended thinking delta.
67    ThinkingDelta { delta: String },
68    /// Error during message generation.
69    Error { reason: String },
70    /// All other sub-types (text_start, text_end, thinking_start, thinking_end,
71    /// toolcall_start, toolcall_delta, toolcall_end, done)
72    #[serde(other)]
73    Other,
74}
75
76/// Tool execution result.
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct PiToolResult {
79    pub content: Vec<PiContentBlock>,
80}
81
82/// Content block within a tool result.
83#[derive(Debug, Clone, Serialize, Deserialize)]
84#[serde(tag = "type", rename_all = "snake_case")]
85pub enum PiContentBlock {
86    Text {
87        text: String,
88    },
89    #[serde(other)]
90    Other,
91}
92
93/// Message in turn_end — contains usage data.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct PiTurnMessage {
96    #[serde(rename = "stopReason")]
97    pub stop_reason: Option<String>,
98    pub provider: Option<String>,
99    pub model: Option<String>,
100    pub usage: Option<PiUsage>,
101}
102
103/// Token usage statistics from pi.
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct PiUsage {
106    pub input: u64,
107    pub output: u64,
108    #[serde(rename = "cacheRead")]
109    pub cache_read: u64,
110    #[serde(rename = "cacheWrite")]
111    pub cache_write: u64,
112    pub cost: Option<PiCost>,
113}
114
115/// Cost breakdown from pi.
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct PiCost {
118    pub total: f64,
119}
120
121/// Parses NDJSON lines from pi's stream output.
122pub struct PiStreamParser;
123
124impl PiStreamParser {
125    /// Parse a single line of NDJSON output.
126    ///
127    /// Returns `None` for empty lines or malformed JSON (logged at debug level).
128    pub fn parse_line(line: &str) -> Option<PiStreamEvent> {
129        let trimmed = line.trim();
130        if trimmed.is_empty() {
131            return None;
132        }
133
134        match serde_json::from_str::<PiStreamEvent>(trimmed) {
135            Ok(event) => Some(event),
136            Err(e) => {
137                tracing::debug!(
138                    "Skipping malformed pi JSON: {} (error: {})",
139                    truncate(trimmed, 100),
140                    e
141                );
142                None
143            }
144        }
145    }
146}
147
/// State accumulated across events for the session summary.
///
/// All counters start at zero and both identity fields start as `None`;
/// [`PiSessionState::new`] and [`Default::default`] produce the same value.
#[derive(Debug, Clone, Default)]
pub struct PiSessionState {
    /// Sum of the per-turn `cost.total` values seen so far.
    pub total_cost_usd: f64,
    /// Number of `turn_end` events seen so far.
    pub num_turns: u32,
    /// Most recent non-empty provider name reported by a `turn_end` event.
    pub stream_provider: Option<String>,
    /// Most recent non-empty model name reported by a `turn_end` event.
    pub stream_model: Option<String>,
    /// Accumulated input tokens across all turns.
    pub input_tokens: u64,
    /// Accumulated output tokens across all turns.
    pub output_tokens: u64,
    /// Accumulated cache-read tokens across all turns.
    pub cache_read_tokens: u64,
    /// Accumulated cache-write tokens across all turns.
    pub cache_write_tokens: u64,
}

impl PiSessionState {
    /// Creates an empty session state (all counters zero, no identity).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
184
185/// Dispatch a pi stream event to the `StreamHandler`.
186///
187/// Accumulates cost/turn data in `state` for the final `on_complete()` call.
188/// Appends text content to `extracted_text` for LOOP_COMPLETE detection.
189pub fn dispatch_pi_stream_event<H: StreamHandler>(
190    event: PiStreamEvent,
191    handler: &mut H,
192    extracted_text: &mut String,
193    state: &mut PiSessionState,
194    verbose: bool,
195) {
196    match event {
197        PiStreamEvent::MessageUpdate {
198            assistant_message_event,
199        } => match assistant_message_event {
200            PiAssistantEvent::TextDelta { delta } => {
201                handler.on_text(&delta);
202                extracted_text.push_str(&delta);
203            }
204            PiAssistantEvent::ThinkingDelta { delta } => {
205                if verbose {
206                    handler.on_text(&delta);
207                }
208            }
209            PiAssistantEvent::Error { reason } => {
210                handler.on_error(&reason);
211            }
212            PiAssistantEvent::Other => {}
213        },
214        PiStreamEvent::ToolExecutionStart {
215            tool_name,
216            tool_call_id,
217            args,
218        } => {
219            handler.on_tool_call(&tool_name, &tool_call_id, &args);
220        }
221        PiStreamEvent::ToolExecutionEnd {
222            tool_call_id,
223            result,
224            is_error,
225            ..
226        } => {
227            let output = result
228                .content
229                .iter()
230                .filter_map(|b| match b {
231                    PiContentBlock::Text { text } => Some(text.as_str()),
232                    PiContentBlock::Other => None,
233                })
234                .collect::<Vec<_>>()
235                .join("\n");
236            if is_error {
237                handler.on_error(&output);
238            } else {
239                handler.on_tool_result(&tool_call_id, &output);
240            }
241        }
242        PiStreamEvent::TurnEnd { message } => {
243            state.num_turns += 1;
244            if let Some(msg) = &message {
245                if let Some(provider) = &msg.provider
246                    && !provider.is_empty()
247                {
248                    state.stream_provider = Some(provider.clone());
249                }
250                if let Some(model) = &msg.model
251                    && !model.is_empty()
252                {
253                    state.stream_model = Some(model.clone());
254                }
255                if let Some(usage) = &msg.usage {
256                    if let Some(cost) = &usage.cost {
257                        state.total_cost_usd += cost.total;
258                    }
259                    state.input_tokens += usage.input;
260                    state.output_tokens += usage.output;
261                    state.cache_read_tokens += usage.cache_read;
262                    state.cache_write_tokens += usage.cache_write;
263                }
264            }
265        }
266        PiStreamEvent::Other => {}
267    }
268}
269
/// Truncates a string to at most `max_len` bytes, adding "..." if truncated.
///
/// Strings whose byte length is `<= max_len` are returned unchanged. Longer
/// strings are cut at the largest UTF-8 character boundary that does not
/// exceed `max_len`, so the kept prefix never splits a character and never
/// grows past `max_len` bytes; `"..."` is then appended.
fn truncate(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // `max_len < s.len()` here, so every probed index is in range, and index 0
    // is always a char boundary, which guarantees the search finds a cut point.
    let boundary = (0..=max_len)
        .rev()
        .find(|&idx| s.is_char_boundary(idx))
        .unwrap_or(0);

    format!("{}...", &s[..boundary])
}
284
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SessionResult;
    use serde_json::json;

    // =========================================================================
    // Shared fixtures
    // =========================================================================

    /// Recording handler that captures every callback for later assertions.
    #[derive(Default)]
    struct RecordingHandler {
        texts: Vec<String>,
        tool_calls: Vec<(String, String, serde_json::Value)>,
        tool_results: Vec<(String, String)>,
        errors: Vec<String>,
        completions: Vec<SessionResult>,
    }

    impl StreamHandler for RecordingHandler {
        fn on_text(&mut self, text: &str) {
            self.texts.push(text.to_string());
        }
        fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
            self.tool_calls
                .push((name.to_string(), id.to_string(), input.clone()));
        }
        fn on_tool_result(&mut self, id: &str, output: &str) {
            self.tool_results.push((id.to_string(), output.to_string()));
        }
        fn on_error(&mut self, error: &str) {
            self.errors.push(error.to_string());
        }
        fn on_complete(&mut self, result: &SessionResult) {
            self.completions.push(result.clone());
        }
    }

    /// Parses `line`, panicking with the offending input if it is rejected.
    fn parse(line: &str) -> PiStreamEvent {
        PiStreamParser::parse_line(line)
            .unwrap_or_else(|| panic!("expected line to parse: {line}"))
    }

    /// Dispatches one event against a fresh handler, text buffer and state.
    fn dispatch_one(
        event: PiStreamEvent,
        verbose: bool,
    ) -> (RecordingHandler, String, PiSessionState) {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();
        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, verbose);
        (handler, extracted, state)
    }

    /// Builds a text content block.
    fn text_block(text: &str) -> PiContentBlock {
        PiContentBlock::Text {
            text: text.to_string(),
        }
    }

    /// Builds a `tool_execution_end` event.
    fn tool_end(
        id: &str,
        name: &str,
        content: Vec<PiContentBlock>,
        is_error: bool,
    ) -> PiStreamEvent {
        PiStreamEvent::ToolExecutionEnd {
            tool_call_id: id.to_string(),
            tool_name: name.to_string(),
            result: PiToolResult { content },
            is_error,
        }
    }

    /// Builds a usage record with an optional total cost.
    fn usage(input: u64, output: u64, cache_read: u64, cache_write: u64, cost: Option<f64>) -> PiUsage {
        PiUsage {
            input,
            output,
            cache_read,
            cache_write,
            cost: cost.map(|total| PiCost { total }),
        }
    }

    /// Builds a `turn_end` event with stop reason "stop".
    fn turn_end(provider: Option<&str>, model: Option<&str>, usage: Option<PiUsage>) -> PiStreamEvent {
        PiStreamEvent::TurnEnd {
            message: Some(PiTurnMessage {
                stop_reason: Some("stop".to_string()),
                provider: provider.map(str::to_string),
                model: model.map(str::to_string),
                usage,
            }),
        }
    }

    // =========================================================================
    // PiStreamParser::parse_line tests
    // =========================================================================

    #[test]
    fn test_parse_text_delta() {
        let json = r#"{"type":"message_update","assistantMessageEvent":{"type":"text_delta","contentIndex":0,"delta":"Hello world"}}"#;

        match parse(json) {
            PiStreamEvent::MessageUpdate {
                assistant_message_event: PiAssistantEvent::TextDelta { delta },
            } => assert_eq!(delta, "Hello world"),
            other => panic!("Expected MessageUpdate with TextDelta, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_thinking_delta() {
        let json = r#"{"type":"message_update","assistantMessageEvent":{"type":"thinking_delta","contentIndex":0,"delta":"Let me think..."}}"#;

        match parse(json) {
            PiStreamEvent::MessageUpdate {
                assistant_message_event: PiAssistantEvent::ThinkingDelta { delta },
            } => assert_eq!(delta, "Let me think..."),
            other => panic!("Expected MessageUpdate with ThinkingDelta, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_error_event() {
        let json = r#"{"type":"message_update","assistantMessageEvent":{"type":"error","reason":"aborted"}}"#;

        match parse(json) {
            PiStreamEvent::MessageUpdate {
                assistant_message_event: PiAssistantEvent::Error { reason },
            } => assert_eq!(reason, "aborted"),
            other => panic!("Expected MessageUpdate with Error, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_tool_execution_start() {
        let json = r#"{"type":"tool_execution_start","toolCallId":"toolu_123","toolName":"bash","args":{"command":"echo hello"}}"#;

        match parse(json) {
            PiStreamEvent::ToolExecutionStart {
                tool_call_id,
                tool_name,
                args,
            } => {
                assert_eq!(tool_call_id, "toolu_123");
                assert_eq!(tool_name, "bash");
                assert_eq!(args["command"], "echo hello");
            }
            other => panic!("Expected ToolExecutionStart, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_tool_execution_end() {
        let json = r#"{"type":"tool_execution_end","toolCallId":"toolu_123","toolName":"bash","result":{"content":[{"type":"text","text":"hello\n"}]},"isError":false}"#;

        match parse(json) {
            PiStreamEvent::ToolExecutionEnd {
                tool_call_id,
                tool_name,
                result,
                is_error,
            } => {
                assert_eq!(tool_call_id, "toolu_123");
                assert_eq!(tool_name, "bash");
                assert!(!is_error);
                assert_eq!(result.content.len(), 1);
                match &result.content[0] {
                    PiContentBlock::Text { text } => assert_eq!(text, "hello\n"),
                    PiContentBlock::Other => panic!("Expected Text content block"),
                }
            }
            other => panic!("Expected ToolExecutionEnd, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_tool_execution_end_error() {
        let json = r#"{"type":"tool_execution_end","toolCallId":"toolu_456","toolName":"Read","result":{"content":[{"type":"text","text":"file not found"}]},"isError":true}"#;

        match parse(json) {
            PiStreamEvent::ToolExecutionEnd { is_error, .. } => assert!(is_error),
            other => panic!("Expected ToolExecutionEnd, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_non_text_content_block() {
        // Unknown content block types must not make the whole event unparseable.
        let json = r#"{"type":"tool_execution_end","toolCallId":"toolu_789","toolName":"read","result":{"content":[{"type":"image","data":"abc","mimeType":"image/png"},{"type":"text","text":"ok"}]},"isError":false}"#;

        match parse(json) {
            PiStreamEvent::ToolExecutionEnd { result, .. } => {
                assert_eq!(result.content.len(), 2);
                assert!(matches!(result.content[0], PiContentBlock::Other));
                assert!(matches!(&result.content[1], PiContentBlock::Text { text } if text == "ok"));
            }
            other => panic!("Expected ToolExecutionEnd, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_turn_end_with_usage() {
        let json = r#"{"type":"turn_end","message":{"role":"assistant","content":[],"usage":{"input":1,"output":14,"cacheRead":8932,"cacheWrite":70,"totalTokens":9017,"cost":{"input":0.000005,"output":0.00035,"cacheRead":0.00447,"cacheWrite":0.00044,"total":0.00526}},"stopReason":"stop"},"toolResults":[]}"#;

        match parse(json) {
            PiStreamEvent::TurnEnd { message } => {
                let msg = message.unwrap();
                assert_eq!(msg.stop_reason, Some("stop".to_string()));
                let usage = msg.usage.unwrap();
                assert_eq!(usage.input, 1);
                assert_eq!(usage.output, 14);
                assert_eq!(usage.cache_read, 8932);
                assert_eq!(usage.cache_write, 70);
                let cost = usage.cost.unwrap();
                assert!((cost.total - 0.00526).abs() < 1e-10);
            }
            other => panic!("Expected TurnEnd, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_turn_end_without_usage() {
        let json = r#"{"type":"turn_end","message":{"role":"assistant","content":[],"stopReason":"stop"}}"#;

        match parse(json) {
            PiStreamEvent::TurnEnd { message } => {
                let msg = message.unwrap();
                assert!(msg.usage.is_none());
            }
            other => panic!("Expected TurnEnd, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_unknown_event_type() {
        // session, agent_start, turn_start, etc. should all parse as Other
        let lines = [
            r#"{"type":"session","version":3,"id":"uuid","timestamp":"2026-02-05T02:39:26.125Z","cwd":"/tmp"}"#,
            r#"{"type":"agent_start"}"#,
            r#"{"type":"turn_start"}"#,
            r#"{"type":"message_start","message":{"role":"user","content":[]}}"#,
            r#"{"type":"message_end","message":{"role":"assistant","content":[]}}"#,
        ];

        for line in lines {
            assert!(
                matches!(parse(line), PiStreamEvent::Other),
                "expected Other for {line}"
            );
        }
    }

    #[test]
    fn test_parse_unknown_assistant_event_type() {
        // toolcall_start, toolcall_delta, toolcall_end, text_start, text_end, done
        let lines = [
            r#"{"type":"message_update","assistantMessageEvent":{"type":"toolcall_start","contentIndex":0}}"#,
            r#"{"type":"message_update","assistantMessageEvent":{"type":"done","reason":"stop"}}"#,
        ];

        for line in lines {
            match parse(line) {
                PiStreamEvent::MessageUpdate {
                    assistant_message_event: PiAssistantEvent::Other,
                } => {}
                _ => panic!("Expected MessageUpdate with Other assistant event"),
            }
        }
    }

    #[test]
    fn test_parse_empty_line() {
        assert!(PiStreamParser::parse_line("").is_none());
        assert!(PiStreamParser::parse_line("   ").is_none());
        assert!(PiStreamParser::parse_line("\n").is_none());
    }

    #[test]
    fn test_parse_malformed_json() {
        assert!(PiStreamParser::parse_line("{not valid json}").is_none());
        assert!(PiStreamParser::parse_line("plain text").is_none());
    }

    #[test]
    fn test_parse_tool_execution_update_is_other() {
        let json = r#"{"type":"tool_execution_update","toolCallId":"toolu_123","toolName":"bash","args":{"command":"echo hello"},"partialResult":{"content":[{"type":"text","text":"hello\n"}]}}"#;
        assert!(matches!(parse(json), PiStreamEvent::Other));
    }

    // =========================================================================
    // dispatch_pi_stream_event tests
    // =========================================================================

    #[test]
    fn test_dispatch_text_delta() {
        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::TextDelta {
                delta: "Hello".to_string(),
            },
        };

        let (handler, extracted, _state) = dispatch_one(event, false);

        assert_eq!(handler.texts, vec!["Hello"]);
        assert_eq!(extracted, "Hello");
    }

    #[test]
    fn test_dispatch_thinking_delta_verbose() {
        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::ThinkingDelta {
                delta: "thinking...".to_string(),
            },
        };

        let (handler, extracted, _state) = dispatch_one(event, true);

        assert_eq!(handler.texts, vec!["thinking..."]);
        // Thinking should NOT go into extracted_text (not part of output)
        assert!(extracted.is_empty());
    }

    #[test]
    fn test_dispatch_thinking_delta_not_verbose() {
        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::ThinkingDelta {
                delta: "thinking...".to_string(),
            },
        };

        let (handler, extracted, _state) = dispatch_one(event, false);

        assert!(handler.texts.is_empty());
        assert!(extracted.is_empty());
    }

    #[test]
    fn test_dispatch_error() {
        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::Error {
                reason: "aborted".to_string(),
            },
        };

        let (handler, _extracted, _state) = dispatch_one(event, false);

        assert_eq!(handler.errors, vec!["aborted"]);
    }

    #[test]
    fn test_dispatch_tool_execution_start() {
        let event = PiStreamEvent::ToolExecutionStart {
            tool_call_id: "toolu_123".to_string(),
            tool_name: "bash".to_string(),
            args: json!({"command": "echo hello"}),
        };

        let (handler, _extracted, _state) = dispatch_one(event, false);

        assert_eq!(handler.tool_calls.len(), 1);
        assert_eq!(handler.tool_calls[0].0, "bash");
        assert_eq!(handler.tool_calls[0].1, "toolu_123");
        assert_eq!(handler.tool_calls[0].2["command"], "echo hello");
    }

    #[test]
    fn test_dispatch_tool_execution_end_success() {
        let event = tool_end("toolu_123", "bash", vec![text_block("hello\n")], false);

        let (handler, _extracted, _state) = dispatch_one(event, false);

        assert_eq!(handler.tool_results.len(), 1);
        assert_eq!(handler.tool_results[0].0, "toolu_123");
        assert_eq!(handler.tool_results[0].1, "hello\n");
        assert!(handler.errors.is_empty());
    }

    #[test]
    fn test_dispatch_tool_execution_end_error() {
        let event = tool_end("toolu_456", "Read", vec![text_block("file not found")], true);

        let (handler, _extracted, _state) = dispatch_one(event, false);

        assert!(handler.tool_results.is_empty());
        assert_eq!(handler.errors, vec!["file not found"]);
    }

    #[test]
    fn test_dispatch_turn_end_accumulates_cost() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        // Three turns with different costs
        for cost in [0.05, 0.03, 0.01] {
            let event = turn_end(None, None, Some(usage(100, 50, 0, 0, Some(cost))));
            dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);
        }

        assert_eq!(state.num_turns, 3);
        assert!((state.total_cost_usd - 0.09).abs() < 1e-10);
    }

    #[test]
    fn test_dispatch_turn_end_accumulates_tokens() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        // Two identical turns without any cost information.
        for _ in 0..2 {
            let event = turn_end(None, None, Some(usage(100, 50, 7, 3, None)));
            dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);
        }

        assert_eq!(state.num_turns, 2);
        assert_eq!(state.input_tokens, 200);
        assert_eq!(state.output_tokens, 100);
        assert_eq!(state.cache_read_tokens, 14);
        assert_eq!(state.cache_write_tokens, 6);
        assert!(state.total_cost_usd.abs() < f64::EPSILON);
    }

    #[test]
    fn test_dispatch_turn_end_missing_usage() {
        let (_handler, _extracted, state) = dispatch_one(turn_end(None, None, None), false);

        assert_eq!(state.num_turns, 1);
        assert!((state.total_cost_usd - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_dispatch_turn_end_missing_message() {
        let event = PiStreamEvent::TurnEnd { message: None };

        let (_handler, _extracted, state) = dispatch_one(event, false);

        assert_eq!(state.num_turns, 1);
        assert!((state.total_cost_usd - 0.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_dispatch_other_is_noop() {
        let (handler, extracted, state) = dispatch_one(PiStreamEvent::Other, false);

        assert!(handler.texts.is_empty());
        assert!(handler.tool_calls.is_empty());
        assert!(handler.tool_results.is_empty());
        assert!(handler.errors.is_empty());
        assert!(handler.completions.is_empty());
        assert!(extracted.is_empty());
        assert_eq!(state.num_turns, 0);
    }

    #[test]
    fn test_dispatch_assistant_other_is_noop() {
        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::Other,
        };

        let (handler, _extracted, _state) = dispatch_one(event, false);

        assert!(handler.texts.is_empty());
        assert!(handler.errors.is_empty());
    }

    // =========================================================================
    // Real NDJSON line tests (from research samples)
    // =========================================================================

    #[test]
    fn test_parse_real_session_event() {
        let json = r#"{"type":"session","version":3,"id":"550e8400-e29b-41d4-a716-446655440000","timestamp":"2026-02-05T02:39:26.125Z","cwd":"/home/user/project"}"#;
        assert!(matches!(parse(json), PiStreamEvent::Other));
    }

    #[test]
    fn test_parse_real_tool_execution_start() {
        let json = r#"{"type":"tool_execution_start","toolCallId":"toolu_01BKzy4E5YAeFLdgwFKtNRqv","toolName":"bash","args":{"command":"echo hello"}}"#;

        match parse(json) {
            PiStreamEvent::ToolExecutionStart {
                tool_call_id,
                tool_name,
                args,
            } => {
                assert_eq!(tool_call_id, "toolu_01BKzy4E5YAeFLdgwFKtNRqv");
                assert_eq!(tool_name, "bash");
                assert_eq!(args["command"], "echo hello");
            }
            _ => panic!("Expected ToolExecutionStart"),
        }
    }

    #[test]
    fn test_parse_real_turn_end() {
        let json = r#"{"type":"turn_end","message":{"role":"assistant","content":[{"type":"text","text":"Done."}],"api":"anthropic-messages","provider":"anthropic","model":"claude-opus-4-5","usage":{"input":1,"output":14,"cacheRead":8932,"cacheWrite":70,"totalTokens":9017,"cost":{"input":0.000005,"output":0.00035,"cacheRead":0.00447,"cacheWrite":0.00044,"total":0.00526}},"stopReason":"stop","timestamp":1770259166907},"toolResults":[]}"#;

        match parse(json) {
            PiStreamEvent::TurnEnd { message } => {
                let msg = message.unwrap();
                assert_eq!(msg.stop_reason, Some("stop".to_string()));
                assert_eq!(msg.provider, Some("anthropic".to_string()));
                assert_eq!(msg.model, Some("claude-opus-4-5".to_string()));
                let usage = msg.usage.unwrap();
                let cost = usage.cost.unwrap();
                assert!((cost.total - 0.00526).abs() < 1e-10);
            }
            _ => panic!("Expected TurnEnd"),
        }
    }

    #[test]
    fn test_dispatch_turn_end_captures_stream_identity() {
        let event = turn_end(Some("anthropic"), Some("claude-sonnet-4"), None);

        let (_handler, _extracted, state) = dispatch_one(event, false);

        assert_eq!(state.stream_provider, Some("anthropic".to_string()));
        assert_eq!(state.stream_model, Some("claude-sonnet-4".to_string()));
    }

    #[test]
    fn test_dispatch_turn_end_ignores_empty_stream_identity() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        // A later turn reporting empty strings must not erase the known identity.
        for (provider, model) in [("anthropic", "claude-sonnet-4"), ("", "")] {
            let event = turn_end(Some(provider), Some(model), None);
            dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);
        }

        assert_eq!(state.num_turns, 2);
        assert_eq!(state.stream_provider, Some("anthropic".to_string()));
        assert_eq!(state.stream_model, Some("claude-sonnet-4".to_string()));
    }

    #[test]
    fn test_tool_result_multiple_content_blocks() {
        let event = tool_end(
            "toolu_789",
            "Read",
            vec![text_block("line 1"), text_block("line 2")],
            false,
        );

        let (handler, _extracted, _state) = dispatch_one(event, false);

        assert_eq!(handler.tool_results[0].1, "line 1\nline 2");
    }

    #[test]
    fn test_tool_result_skips_non_text_blocks() {
        let event = tool_end(
            "toolu_790",
            "Read",
            vec![text_block("a"), PiContentBlock::Other, text_block("b")],
            false,
        );

        let (handler, _extracted, _state) = dispatch_one(event, false);

        assert_eq!(handler.tool_results.len(), 1);
        assert_eq!(handler.tool_results[0].1, "a\nb");
    }

    // =========================================================================
    // truncate tests
    // =========================================================================

    #[test]
    fn test_truncate_short_input_unchanged() {
        assert_eq!(truncate("", 10), "");
        assert_eq!(truncate("hello", 10), "hello");
        assert_eq!(truncate("hello", 5), "hello");
    }

    #[test]
    fn test_truncate_long_input_gets_ellipsis() {
        assert_eq!(truncate("hello world", 5), "hello...");
        assert_eq!(truncate("abc", 0), "...");
    }

    #[test]
    fn test_truncate_multibyte_does_not_panic() {
        // The cut lands exactly after the two-byte 'é'.
        assert_eq!(truncate("héllo", 3), "hé...");
    }
}
861        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);
862
863        assert_eq!(handler.tool_results[0].1, "line 1\nline 2");
864    }
865}