1use crate::stream_handler::StreamHandler;
12use serde::{Deserialize, Serialize};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
20#[serde(tag = "type", rename_all = "snake_case")]
21pub enum PiStreamEvent {
22 MessageUpdate {
24 #[serde(rename = "assistantMessageEvent")]
25 assistant_message_event: PiAssistantEvent,
26 },
27
28 ToolExecutionStart {
30 #[serde(rename = "toolCallId")]
31 tool_call_id: String,
32 #[serde(rename = "toolName")]
33 tool_name: String,
34 args: serde_json::Value,
35 },
36
37 ToolExecutionEnd {
39 #[serde(rename = "toolCallId")]
40 tool_call_id: String,
41 #[serde(rename = "toolName")]
42 tool_name: String,
43 result: PiToolResult,
44 #[serde(rename = "isError")]
45 is_error: bool,
46 },
47
48 TurnEnd { message: Option<PiTurnMessage> },
50
51 #[serde(other)]
54 Other,
55}
56
/// Assistant-side event nested inside a `message_update`.
///
/// The variant is chosen by the nested JSON `type` field (snake_case).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PiAssistantEvent {
    /// `text_delta`: a fragment of assistant text.
    TextDelta { delta: String },
    /// `thinking_delta`: a fragment of thinking output.
    ThinkingDelta { delta: String },
    /// `error`: carries a `reason` string.
    Error { reason: String },
    /// Catch-all for any other nested `type` value.
    #[serde(other)]
    Other,
}
75
/// Result payload attached to a `tool_execution_end` event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PiToolResult {
    /// Content blocks returned by the tool, in stream order.
    pub content: Vec<PiContentBlock>,
}
81
/// A single block inside a tool result, selected by its JSON `type` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum PiContentBlock {
    /// `text`: a block of plain text.
    Text {
        text: String,
    },
    /// Catch-all for any block type other than `text`.
    #[serde(other)]
    Other,
}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct PiTurnMessage {
96 #[serde(rename = "stopReason")]
97 pub stop_reason: Option<String>,
98 pub provider: Option<String>,
99 pub model: Option<String>,
100 pub usage: Option<PiUsage>,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct PiUsage {
106 pub input: u64,
107 pub output: u64,
108 #[serde(rename = "cacheRead")]
109 pub cache_read: u64,
110 #[serde(rename = "cacheWrite")]
111 pub cache_write: u64,
112 pub cost: Option<PiCost>,
113}
114
/// Cost information nested inside [`PiUsage`].
///
/// Only the `total` key is modeled; other keys in the JSON object are ignored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PiCost {
    /// Total cost for the turn.
    pub total: f64,
}
120
121pub struct PiStreamParser;
123
124impl PiStreamParser {
125 pub fn parse_line(line: &str) -> Option<PiStreamEvent> {
129 let trimmed = line.trim();
130 if trimmed.is_empty() {
131 return None;
132 }
133
134 match serde_json::from_str::<PiStreamEvent>(trimmed) {
135 Ok(event) => Some(event),
136 Err(e) => {
137 tracing::debug!(
138 "Skipping malformed pi JSON: {} (error: {})",
139 truncate(trimmed, 100),
140 e
141 );
142 None
143 }
144 }
145 }
146}
147
/// Running totals collected while a pi event stream is being dispatched.
///
/// A fresh state (via [`PiSessionState::new`] or `Default`) has every counter
/// at zero and no provider/model recorded.
#[derive(Debug, Clone, Default)]
pub struct PiSessionState {
    /// Accumulated `cost.total` over all turns seen so far.
    pub total_cost_usd: f64,
    /// Number of `turn_end` events seen so far.
    pub num_turns: u32,
    /// Most recent non-empty provider name reported by the stream.
    pub stream_provider: Option<String>,
    /// Most recent non-empty model name reported by the stream.
    pub stream_model: Option<String>,
    /// Accumulated input token count.
    pub input_tokens: u64,
    /// Accumulated output token count.
    pub output_tokens: u64,
    /// Accumulated cache-read token count.
    pub cache_read_tokens: u64,
    /// Accumulated cache-write token count.
    pub cache_write_tokens: u64,
}

impl PiSessionState {
    /// Creates an empty state: zero cost, zero turns, zero tokens, and no
    /// provider or model.
    pub fn new() -> Self {
        Self::default()
    }
}
184
185pub fn dispatch_pi_stream_event<H: StreamHandler>(
190 event: PiStreamEvent,
191 handler: &mut H,
192 extracted_text: &mut String,
193 state: &mut PiSessionState,
194 verbose: bool,
195) {
196 match event {
197 PiStreamEvent::MessageUpdate {
198 assistant_message_event,
199 } => match assistant_message_event {
200 PiAssistantEvent::TextDelta { delta } => {
201 handler.on_text(&delta);
202 extracted_text.push_str(&delta);
203 }
204 PiAssistantEvent::ThinkingDelta { delta } => {
205 if verbose {
206 handler.on_text(&delta);
207 }
208 }
209 PiAssistantEvent::Error { reason } => {
210 handler.on_error(&reason);
211 }
212 PiAssistantEvent::Other => {}
213 },
214 PiStreamEvent::ToolExecutionStart {
215 tool_name,
216 tool_call_id,
217 args,
218 } => {
219 handler.on_tool_call(&tool_name, &tool_call_id, &args);
220 }
221 PiStreamEvent::ToolExecutionEnd {
222 tool_call_id,
223 result,
224 is_error,
225 ..
226 } => {
227 let output = result
228 .content
229 .iter()
230 .filter_map(|b| match b {
231 PiContentBlock::Text { text } => Some(text.as_str()),
232 PiContentBlock::Other => None,
233 })
234 .collect::<Vec<_>>()
235 .join("\n");
236 if is_error {
237 handler.on_error(&output);
238 } else {
239 handler.on_tool_result(&tool_call_id, &output);
240 }
241 }
242 PiStreamEvent::TurnEnd { message } => {
243 state.num_turns += 1;
244 if let Some(msg) = &message {
245 if let Some(provider) = &msg.provider
246 && !provider.is_empty()
247 {
248 state.stream_provider = Some(provider.clone());
249 }
250 if let Some(model) = &msg.model
251 && !model.is_empty()
252 {
253 state.stream_model = Some(model.clone());
254 }
255 if let Some(usage) = &msg.usage {
256 if let Some(cost) = &usage.cost {
257 state.total_cost_usd += cost.total;
258 }
259 state.input_tokens += usage.input;
260 state.output_tokens += usage.output;
261 state.cache_read_tokens += usage.cache_read;
262 state.cache_write_tokens += usage.cache_write;
263 }
264 }
265 }
266 PiStreamEvent::Other => {}
267 }
268}
269
/// Shortens `s` to at most `max_len` bytes for logging, appending `"..."`
/// when anything was cut.
///
/// Strings that already fit are returned unchanged. Otherwise the cut point
/// is the largest UTF-8 character boundary that does not exceed `max_len`, so
/// the kept prefix never goes over the limit and never splits a character.
fn truncate(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // `max_len < s.len()` here, so walking backwards from `max_len` always
    // lands on a valid boundary (index 0 is one in the worst case).
    let mut boundary = max_len;
    while !s.is_char_boundary(boundary) {
        boundary -= 1;
    }
    format!("{}...", &s[..boundary])
}
284
// Unit tests for line parsing (`PiStreamParser::parse_line`) and for event
// dispatch (`dispatch_pi_stream_event`).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SessionResult;
    use serde_json::json;

    // ---- Parsing: message_update events ----

    // A text_delta nested in message_update yields TextDelta; the extra
    // `contentIndex` key is ignored.
    #[test]
    fn test_parse_text_delta() {
        let json = r#"{"type":"message_update","assistantMessageEvent":{"type":"text_delta","contentIndex":0,"delta":"Hello world"}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();

        match event {
            PiStreamEvent::MessageUpdate {
                assistant_message_event: PiAssistantEvent::TextDelta { delta },
            } => {
                assert_eq!(delta, "Hello world");
            }
            _ => panic!("Expected MessageUpdate with TextDelta, got {:?}", event),
        }
    }

    // A thinking_delta nested in message_update yields ThinkingDelta.
    #[test]
    fn test_parse_thinking_delta() {
        let json = r#"{"type":"message_update","assistantMessageEvent":{"type":"thinking_delta","contentIndex":0,"delta":"Let me think..."}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();

        match event {
            PiStreamEvent::MessageUpdate {
                assistant_message_event: PiAssistantEvent::ThinkingDelta { delta },
            } => {
                assert_eq!(delta, "Let me think...");
            }
            _ => panic!("Expected MessageUpdate with ThinkingDelta, got {:?}", event),
        }
    }

    // A nested error event exposes its `reason`.
    #[test]
    fn test_parse_error_event() {
        let json = r#"{"type":"message_update","assistantMessageEvent":{"type":"error","reason":"aborted"}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();

        match event {
            PiStreamEvent::MessageUpdate {
                assistant_message_event: PiAssistantEvent::Error { reason },
            } => {
                assert_eq!(reason, "aborted");
            }
            _ => panic!("Expected MessageUpdate with Error, got {:?}", event),
        }
    }

    // ---- Parsing: tool execution events ----

    // camelCase keys (toolCallId, toolName) map onto the snake_case fields.
    #[test]
    fn test_parse_tool_execution_start() {
        let json = r#"{"type":"tool_execution_start","toolCallId":"toolu_123","toolName":"bash","args":{"command":"echo hello"}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();

        match event {
            PiStreamEvent::ToolExecutionStart {
                tool_call_id,
                tool_name,
                args,
            } => {
                assert_eq!(tool_call_id, "toolu_123");
                assert_eq!(tool_name, "bash");
                assert_eq!(args["command"], "echo hello");
            }
            _ => panic!("Expected ToolExecutionStart, got {:?}", event),
        }
    }

    // A successful tool_execution_end carries its text content block.
    #[test]
    fn test_parse_tool_execution_end() {
        let json = r#"{"type":"tool_execution_end","toolCallId":"toolu_123","toolName":"bash","result":{"content":[{"type":"text","text":"hello\n"}]},"isError":false}"#;
        let event = PiStreamParser::parse_line(json).unwrap();

        match event {
            PiStreamEvent::ToolExecutionEnd {
                tool_call_id,
                tool_name,
                result,
                is_error,
            } => {
                assert_eq!(tool_call_id, "toolu_123");
                assert_eq!(tool_name, "bash");
                assert!(!is_error);
                assert_eq!(result.content.len(), 1);
                match &result.content[0] {
                    PiContentBlock::Text { text } => assert_eq!(text, "hello\n"),
                    PiContentBlock::Other => panic!("Expected Text content block"),
                }
            }
            _ => panic!("Expected ToolExecutionEnd, got {:?}", event),
        }
    }

    // `isError: true` is surfaced as `is_error`.
    #[test]
    fn test_parse_tool_execution_end_error() {
        let json = r#"{"type":"tool_execution_end","toolCallId":"toolu_456","toolName":"Read","result":{"content":[{"type":"text","text":"file not found"}]},"isError":true}"#;
        let event = PiStreamParser::parse_line(json).unwrap();

        match event {
            PiStreamEvent::ToolExecutionEnd { is_error, .. } => {
                assert!(is_error);
            }
            _ => panic!("Expected ToolExecutionEnd, got {:?}", event),
        }
    }

    // ---- Parsing: turn_end events ----

    // Usage counters and the total cost are read; unmodeled keys such as
    // `totalTokens` and `toolResults` are ignored.
    #[test]
    fn test_parse_turn_end_with_usage() {
        let json = r#"{"type":"turn_end","message":{"role":"assistant","content":[],"usage":{"input":1,"output":14,"cacheRead":8932,"cacheWrite":70,"totalTokens":9017,"cost":{"input":0.000005,"output":0.00035,"cacheRead":0.00447,"cacheWrite":0.00044,"total":0.00526}},"stopReason":"stop"},"toolResults":[]}"#;
        let event = PiStreamParser::parse_line(json).unwrap();

        match event {
            PiStreamEvent::TurnEnd { message } => {
                let msg = message.unwrap();
                assert_eq!(msg.stop_reason, Some("stop".to_string()));
                let usage = msg.usage.unwrap();
                assert_eq!(usage.input, 1);
                assert_eq!(usage.output, 14);
                assert_eq!(usage.cache_read, 8932);
                let cost = usage.cost.unwrap();
                assert!((cost.total - 0.00526).abs() < 1e-10);
            }
            _ => panic!("Expected TurnEnd, got {:?}", event),
        }
    }

    // A message without a `usage` key parses with `usage == None`.
    #[test]
    fn test_parse_turn_end_without_usage() {
        let json = r#"{"type":"turn_end","message":{"role":"assistant","content":[],"stopReason":"stop"}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();

        match event {
            PiStreamEvent::TurnEnd { message } => {
                let msg = message.unwrap();
                assert!(msg.usage.is_none());
            }
            _ => panic!("Expected TurnEnd, got {:?}", event),
        }
    }

    // ---- Parsing: unknown, empty and malformed input ----

    // Event types that have no dedicated variant all land in `Other`.
    #[test]
    fn test_parse_unknown_event_type() {
        let json = r#"{"type":"session","version":3,"id":"uuid","timestamp":"2026-02-05T02:39:26.125Z","cwd":"/tmp"}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        assert!(matches!(event, PiStreamEvent::Other));

        let json = r#"{"type":"agent_start"}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        assert!(matches!(event, PiStreamEvent::Other));

        let json = r#"{"type":"turn_start"}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        assert!(matches!(event, PiStreamEvent::Other));

        let json = r#"{"type":"message_start","message":{"role":"user","content":[]}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        assert!(matches!(event, PiStreamEvent::Other));

        let json = r#"{"type":"message_end","message":{"role":"assistant","content":[]}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        assert!(matches!(event, PiStreamEvent::Other));
    }

    // Unknown nested assistant event types land in `PiAssistantEvent::Other`.
    #[test]
    fn test_parse_unknown_assistant_event_type() {
        let json = r#"{"type":"message_update","assistantMessageEvent":{"type":"toolcall_start","contentIndex":0}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        match event {
            PiStreamEvent::MessageUpdate {
                assistant_message_event: PiAssistantEvent::Other,
            } => {}
            _ => panic!("Expected MessageUpdate with Other assistant event"),
        }

        let json =
            r#"{"type":"message_update","assistantMessageEvent":{"type":"done","reason":"stop"}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        match event {
            PiStreamEvent::MessageUpdate {
                assistant_message_event: PiAssistantEvent::Other,
            } => {}
            _ => panic!("Expected MessageUpdate with Other assistant event"),
        }
    }

    // Blank and whitespace-only lines produce no event.
    #[test]
    fn test_parse_empty_line() {
        assert!(PiStreamParser::parse_line("").is_none());
        assert!(PiStreamParser::parse_line(" ").is_none());
        assert!(PiStreamParser::parse_line("\n").is_none());
    }

    // Input that is not valid JSON produces no event rather than a panic.
    #[test]
    fn test_parse_malformed_json() {
        assert!(PiStreamParser::parse_line("{not valid json}").is_none());
        assert!(PiStreamParser::parse_line("plain text").is_none());
    }

    // tool_execution_update has no dedicated variant, so it parses as `Other`.
    #[test]
    fn test_parse_tool_execution_update_is_other() {
        let json = r#"{"type":"tool_execution_update","toolCallId":"toolu_123","toolName":"bash","args":{"command":"echo hello"},"partialResult":{"content":[{"type":"text","text":"hello\n"}]}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        assert!(matches!(event, PiStreamEvent::Other));
    }

    // ---- Dispatch tests ----

    // Test double that records every StreamHandler callback it receives so
    // the tests can assert on what was dispatched.
    #[derive(Default)]
    struct RecordingHandler {
        texts: Vec<String>,
        tool_calls: Vec<(String, String, serde_json::Value)>,
        tool_results: Vec<(String, String)>,
        errors: Vec<String>,
        completions: Vec<SessionResult>,
    }

    impl StreamHandler for RecordingHandler {
        fn on_text(&mut self, text: &str) {
            self.texts.push(text.to_string());
        }
        fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
            self.tool_calls
                .push((name.to_string(), id.to_string(), input.clone()));
        }
        fn on_tool_result(&mut self, id: &str, output: &str) {
            self.tool_results.push((id.to_string(), output.to_string()));
        }
        fn on_error(&mut self, error: &str) {
            self.errors.push(error.to_string());
        }
        fn on_complete(&mut self, result: &SessionResult) {
            self.completions.push(result.clone());
        }
    }

    // Text deltas reach the handler and are appended to the extracted text.
    #[test]
    fn test_dispatch_text_delta() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::TextDelta {
                delta: "Hello".to_string(),
            },
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);

        assert_eq!(handler.texts, vec!["Hello"]);
        assert_eq!(extracted, "Hello");
    }

    // With verbose on, thinking deltas reach the handler but are not added
    // to the extracted text.
    #[test]
    fn test_dispatch_thinking_delta_verbose() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::ThinkingDelta {
                delta: "thinking...".to_string(),
            },
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, true);
        assert_eq!(handler.texts, vec!["thinking..."]);
        assert!(extracted.is_empty());
    }

    // With verbose off, thinking deltas are dropped entirely.
    #[test]
    fn test_dispatch_thinking_delta_not_verbose() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::ThinkingDelta {
                delta: "thinking...".to_string(),
            },
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);
        assert!(handler.texts.is_empty());
        assert!(extracted.is_empty());
    }

    // Assistant error events are forwarded to `on_error` with their reason.
    #[test]
    fn test_dispatch_error() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::Error {
                reason: "aborted".to_string(),
            },
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);
        assert_eq!(handler.errors, vec!["aborted"]);
    }

    // Tool starts are forwarded as (name, id, args).
    #[test]
    fn test_dispatch_tool_execution_start() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::ToolExecutionStart {
            tool_call_id: "toolu_123".to_string(),
            tool_name: "bash".to_string(),
            args: json!({"command": "echo hello"}),
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);

        assert_eq!(handler.tool_calls.len(), 1);
        assert_eq!(handler.tool_calls[0].0, "bash");
        assert_eq!(handler.tool_calls[0].1, "toolu_123");
        assert_eq!(handler.tool_calls[0].2["command"], "echo hello");
    }

    // A successful tool end goes to `on_tool_result` and raises no error.
    #[test]
    fn test_dispatch_tool_execution_end_success() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::ToolExecutionEnd {
            tool_call_id: "toolu_123".to_string(),
            tool_name: "bash".to_string(),
            result: PiToolResult {
                content: vec![PiContentBlock::Text {
                    text: "hello\n".to_string(),
                }],
            },
            is_error: false,
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);

        assert_eq!(handler.tool_results.len(), 1);
        assert_eq!(handler.tool_results[0].0, "toolu_123");
        assert_eq!(handler.tool_results[0].1, "hello\n");
        assert!(handler.errors.is_empty());
    }

    // A failed tool end goes to `on_error` only; no tool result is recorded.
    #[test]
    fn test_dispatch_tool_execution_end_error() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::ToolExecutionEnd {
            tool_call_id: "toolu_456".to_string(),
            tool_name: "Read".to_string(),
            result: PiToolResult {
                content: vec![PiContentBlock::Text {
                    text: "file not found".to_string(),
                }],
            },
            is_error: true,
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);

        assert!(handler.tool_results.is_empty());
        assert_eq!(handler.errors, vec!["file not found"]);
    }

    // Cost and turn count accumulate across several turn_end events.
    #[test]
    fn test_dispatch_turn_end_accumulates_cost() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        for cost in [0.05, 0.03, 0.01] {
            let event = PiStreamEvent::TurnEnd {
                message: Some(PiTurnMessage {
                    stop_reason: Some("stop".to_string()),
                    provider: None,
                    model: None,
                    usage: Some(PiUsage {
                        input: 100,
                        output: 50,
                        cache_read: 0,
                        cache_write: 0,
                        cost: Some(PiCost { total: cost }),
                    }),
                }),
            };
            dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);
        }

        assert_eq!(state.num_turns, 3);
        assert!((state.total_cost_usd - 0.09).abs() < 1e-10);
    }

    // A turn without usage still counts as a turn but adds no cost.
    #[test]
    fn test_dispatch_turn_end_missing_usage() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::TurnEnd {
            message: Some(PiTurnMessage {
                stop_reason: Some("stop".to_string()),
                provider: None,
                model: None,
                usage: None,
            }),
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);

        assert_eq!(state.num_turns, 1);
        assert!((state.total_cost_usd - 0.0).abs() < f64::EPSILON);
    }

    // A turn without any message still counts as a turn but adds no cost.
    #[test]
    fn test_dispatch_turn_end_missing_message() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::TurnEnd { message: None };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);

        assert_eq!(state.num_turns, 1);
        assert!((state.total_cost_usd - 0.0).abs() < f64::EPSILON);
    }

    // `Other` events trigger no callbacks and leave all state untouched.
    #[test]
    fn test_dispatch_other_is_noop() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        dispatch_pi_stream_event(
            PiStreamEvent::Other,
            &mut handler,
            &mut extracted,
            &mut state,
            false,
        );

        assert!(handler.texts.is_empty());
        assert!(handler.tool_calls.is_empty());
        assert!(handler.tool_results.is_empty());
        assert!(handler.errors.is_empty());
        assert!(handler.completions.is_empty());
        assert!(extracted.is_empty());
        assert_eq!(state.num_turns, 0);
    }

    // An unknown assistant event inside message_update triggers no callbacks.
    #[test]
    fn test_dispatch_assistant_other_is_noop() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::MessageUpdate {
            assistant_message_event: PiAssistantEvent::Other,
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);

        assert!(handler.texts.is_empty());
        assert!(handler.errors.is_empty());
    }

    // ---- Parsing with fuller, realistic-looking payloads ----

    // A full session header line parses as `Other`.
    #[test]
    fn test_parse_real_session_event() {
        let json = r#"{"type":"session","version":3,"id":"550e8400-e29b-41d4-a716-446655440000","timestamp":"2026-02-05T02:39:26.125Z","cwd":"/home/user/project"}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        assert!(matches!(event, PiStreamEvent::Other));
    }

    // A tool start with a long tool-call id parses field by field.
    #[test]
    fn test_parse_real_tool_execution_start() {
        let json = r#"{"type":"tool_execution_start","toolCallId":"toolu_01BKzy4E5YAeFLdgwFKtNRqv","toolName":"bash","args":{"command":"echo hello"}}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        match event {
            PiStreamEvent::ToolExecutionStart {
                tool_call_id,
                tool_name,
                args,
            } => {
                assert_eq!(tool_call_id, "toolu_01BKzy4E5YAeFLdgwFKtNRqv");
                assert_eq!(tool_name, "bash");
                assert_eq!(args["command"], "echo hello");
            }
            _ => panic!("Expected ToolExecutionStart"),
        }
    }

    // A turn_end with provider, model, usage and extra unmodeled keys parses
    // into the modeled fields.
    #[test]
    fn test_parse_real_turn_end() {
        let json = r#"{"type":"turn_end","message":{"role":"assistant","content":[{"type":"text","text":"Done."}],"api":"anthropic-messages","provider":"anthropic","model":"claude-opus-4-5","usage":{"input":1,"output":14,"cacheRead":8932,"cacheWrite":70,"totalTokens":9017,"cost":{"input":0.000005,"output":0.00035,"cacheRead":0.00447,"cacheWrite":0.00044,"total":0.00526}},"stopReason":"stop","timestamp":1770259166907},"toolResults":[]}"#;
        let event = PiStreamParser::parse_line(json).unwrap();
        match event {
            PiStreamEvent::TurnEnd { message } => {
                let msg = message.unwrap();
                assert_eq!(msg.stop_reason, Some("stop".to_string()));
                assert_eq!(msg.provider, Some("anthropic".to_string()));
                assert_eq!(msg.model, Some("claude-opus-4-5".to_string()));
                let usage = msg.usage.unwrap();
                let cost = usage.cost.unwrap();
                assert!((cost.total - 0.00526).abs() < 1e-10);
            }
            _ => panic!("Expected TurnEnd"),
        }
    }

    // ---- Further dispatch coverage ----

    // Provider and model from turn_end are stored on the session state.
    #[test]
    fn test_dispatch_turn_end_captures_stream_identity() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::TurnEnd {
            message: Some(PiTurnMessage {
                stop_reason: Some("stop".to_string()),
                provider: Some("anthropic".to_string()),
                model: Some("claude-sonnet-4".to_string()),
                usage: None,
            }),
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);

        assert_eq!(state.stream_provider, Some("anthropic".to_string()));
        assert_eq!(state.stream_model, Some("claude-sonnet-4".to_string()));
    }

    // Multiple text blocks in one tool result are joined with a newline.
    #[test]
    fn test_tool_result_multiple_content_blocks() {
        let mut handler = RecordingHandler::default();
        let mut extracted = String::new();
        let mut state = PiSessionState::new();

        let event = PiStreamEvent::ToolExecutionEnd {
            tool_call_id: "toolu_789".to_string(),
            tool_name: "Read".to_string(),
            result: PiToolResult {
                content: vec![
                    PiContentBlock::Text {
                        text: "line 1".to_string(),
                    },
                    PiContentBlock::Text {
                        text: "line 2".to_string(),
                    },
                ],
            },
            is_error: false,
        };

        dispatch_pi_stream_event(event, &mut handler, &mut extracted, &mut state, false);

        assert_eq!(handler.tool_results[0].1, "line 1\nline 2");
    }
}