1use std::collections::HashSet;
12
13use crate::stream_handler::{SessionResult, StreamHandler};
14use serde_json::Value;
15
16#[derive(Debug, Clone, PartialEq)]
18pub struct CopilotToolRequest {
19 pub tool_call_id: String,
20 pub name: String,
21 pub arguments: Value,
22}
23
24#[derive(Debug, Clone, PartialEq)]
26pub struct CopilotAssistantMessage {
27 pub message_id: Option<String>,
28 pub content: Value,
29 pub tool_requests: Vec<CopilotToolRequest>,
30}
31
32#[derive(Debug, Clone, PartialEq)]
34pub struct CopilotAssistantMessageDelta {
35 pub message_id: Option<String>,
36 pub delta_content: String,
37}
38
39#[derive(Debug, Clone, PartialEq)]
41pub struct CopilotAssistantReasoning {
42 pub reasoning_id: Option<String>,
43 pub content: String,
44}
45
46#[derive(Debug, Clone, PartialEq)]
48pub struct CopilotAssistantReasoningDelta {
49 pub reasoning_id: Option<String>,
50 pub delta_content: String,
51}
52
53#[derive(Debug, Clone, PartialEq)]
55pub struct CopilotTurnBoundary {
56 pub turn_id: Option<String>,
57}
58
59#[derive(Debug, Clone, PartialEq)]
61pub struct CopilotToolExecutionStart {
62 pub tool_call_id: String,
63 pub tool_name: String,
64 pub arguments: Value,
65}
66
67#[derive(Debug, Clone, PartialEq)]
69pub struct CopilotToolExecutionPartialResult {
70 pub tool_call_id: String,
71 pub partial_output: String,
72}
73
74#[derive(Debug, Clone, PartialEq)]
76pub struct CopilotToolExecutionOutput {
77 pub content: Value,
78 pub detailed_content: Option<String>,
79}
80
81#[derive(Debug, Clone, PartialEq)]
83pub struct CopilotToolExecutionError {
84 pub message: String,
85 pub code: Option<String>,
86}
87
88#[derive(Debug, Clone, PartialEq)]
90pub struct CopilotToolExecutionComplete {
91 pub tool_call_id: String,
92 pub success: bool,
93 pub result: Option<CopilotToolExecutionOutput>,
94 pub error: Option<CopilotToolExecutionError>,
95}
96
97impl CopilotToolExecutionComplete {
98 fn output_text(&self) -> Option<String> {
99 self.result.as_ref().and_then(|result| {
100 result
101 .detailed_content
102 .clone()
103 .or_else(|| extract_content_text(&result.content))
104 })
105 }
106
107 fn error_text(&self) -> Option<String> {
108 self.error
109 .as_ref()
110 .map(|error| error.message.clone())
111 .or_else(|| self.output_text())
112 }
113}
114
115#[derive(Debug, Clone, PartialEq)]
117pub struct CopilotResult {
118 pub exit_code: Option<i32>,
119 pub session_duration_ms: Option<u64>,
120 pub total_api_duration_ms: Option<u64>,
121}
122
123#[derive(Debug, Clone, PartialEq)]
125pub enum CopilotStreamEvent {
126 AssistantMessage { data: CopilotAssistantMessage },
128 AssistantMessageDelta { data: CopilotAssistantMessageDelta },
130 AssistantReasoning { data: CopilotAssistantReasoning },
132 AssistantReasoningDelta {
134 data: CopilotAssistantReasoningDelta,
135 },
136 AssistantTurnStart { data: CopilotTurnBoundary },
138 AssistantTurnEnd { data: CopilotTurnBoundary },
140 ToolExecutionStart { data: CopilotToolExecutionStart },
142 ToolExecutionPartialResult {
144 data: CopilotToolExecutionPartialResult,
145 },
146 ToolExecutionComplete { data: CopilotToolExecutionComplete },
148 Result { data: CopilotResult },
150 Other,
152}
153
154#[cfg(test)]
155#[derive(Debug, Clone, PartialEq, Eq)]
156pub(crate) struct CopilotLiveChunk {
157 pub text: String,
158 pub append_newline: bool,
159}
160
161#[derive(Debug, Default, Clone)]
162pub(crate) struct CopilotStreamState {
163 streamed_message_ids: HashSet<String>,
164 completed_turns: u32,
165}
166
167impl CopilotStreamState {
168 pub(crate) fn new() -> Self {
169 Self::default()
170 }
171}
172
173pub struct CopilotStreamParser;
175
176impl CopilotStreamParser {
177 pub fn parse_line(line: &str) -> Option<CopilotStreamEvent> {
181 let trimmed = line.trim();
182 if trimmed.is_empty() {
183 return None;
184 }
185
186 let value = match serde_json::from_str::<Value>(trimmed) {
187 Ok(value) => value,
188 Err(e) => {
189 tracing::debug!(
190 "Skipping malformed JSON line: {} (error: {})",
191 truncate(trimmed, 100),
192 e
193 );
194 return None;
195 }
196 };
197
198 match value.get("type").and_then(Value::as_str) {
199 Some("assistant.message") => Some(CopilotStreamEvent::AssistantMessage {
200 data: parse_assistant_message(&value),
201 }),
202 Some("assistant.message_delta") => Some(CopilotStreamEvent::AssistantMessageDelta {
203 data: CopilotAssistantMessageDelta {
204 message_id: data_str(&value, "messageId"),
205 delta_content: data_str(&value, "deltaContent").unwrap_or_default(),
206 },
207 }),
208 Some("assistant.reasoning") => Some(CopilotStreamEvent::AssistantReasoning {
209 data: CopilotAssistantReasoning {
210 reasoning_id: data_str(&value, "reasoningId"),
211 content: data_str(&value, "content").unwrap_or_default(),
212 },
213 }),
214 Some("assistant.reasoning_delta") => {
215 Some(CopilotStreamEvent::AssistantReasoningDelta {
216 data: CopilotAssistantReasoningDelta {
217 reasoning_id: data_str(&value, "reasoningId"),
218 delta_content: data_str(&value, "deltaContent").unwrap_or_default(),
219 },
220 })
221 }
222 Some("assistant.turn_start") => Some(CopilotStreamEvent::AssistantTurnStart {
223 data: CopilotTurnBoundary {
224 turn_id: data_str(&value, "turnId"),
225 },
226 }),
227 Some("assistant.turn_end") => Some(CopilotStreamEvent::AssistantTurnEnd {
228 data: CopilotTurnBoundary {
229 turn_id: data_str(&value, "turnId"),
230 },
231 }),
232 Some("tool.execution_start") => parse_tool_execution_start(&value)
233 .map(|data| CopilotStreamEvent::ToolExecutionStart { data })
234 .or(Some(CopilotStreamEvent::Other)),
235 Some("tool.execution_partial_result") => parse_tool_execution_partial_result(&value)
236 .map(|data| CopilotStreamEvent::ToolExecutionPartialResult { data })
237 .or(Some(CopilotStreamEvent::Other)),
238 Some("tool.execution_complete") => parse_tool_execution_complete(&value)
239 .map(|data| CopilotStreamEvent::ToolExecutionComplete { data })
240 .or(Some(CopilotStreamEvent::Other)),
241 Some("result") => Some(CopilotStreamEvent::Result {
242 data: parse_result(&value),
243 }),
244 Some(_) => Some(CopilotStreamEvent::Other),
245 None => None,
246 }
247 }
248
249 pub fn extract_text(line: &str) -> Option<String> {
251 match Self::parse_line(line)? {
252 CopilotStreamEvent::AssistantMessage { data } => extract_content_text(&data.content),
253 CopilotStreamEvent::AssistantMessageDelta { .. }
254 | CopilotStreamEvent::AssistantReasoning { .. }
255 | CopilotStreamEvent::AssistantReasoningDelta { .. }
256 | CopilotStreamEvent::AssistantTurnStart { .. }
257 | CopilotStreamEvent::AssistantTurnEnd { .. }
258 | CopilotStreamEvent::ToolExecutionStart { .. }
259 | CopilotStreamEvent::ToolExecutionPartialResult { .. }
260 | CopilotStreamEvent::ToolExecutionComplete { .. }
261 | CopilotStreamEvent::Result { .. }
262 | CopilotStreamEvent::Other => None,
263 }
264 }
265
266 #[cfg(test)]
270 pub(crate) fn extract_live_chunk(
271 line: &str,
272 state: &mut CopilotStreamState,
273 ) -> Option<CopilotLiveChunk> {
274 match Self::parse_line(line)? {
275 CopilotStreamEvent::AssistantMessageDelta { data } => {
276 if let Some(message_id) = data.message_id {
277 state.streamed_message_ids.insert(message_id);
278 }
279
280 if data.delta_content.is_empty() {
281 None
282 } else {
283 Some(CopilotLiveChunk {
284 text: data.delta_content,
285 append_newline: false,
286 })
287 }
288 }
289 CopilotStreamEvent::AssistantMessage { data } => {
290 if should_suppress_full_message(data.message_id.as_deref(), state) {
291 return Some(CopilotLiveChunk {
292 text: String::new(),
293 append_newline: true,
294 });
295 }
296
297 extract_content_text(&data.content).map(|text| CopilotLiveChunk {
298 text,
299 append_newline: true,
300 })
301 }
302 CopilotStreamEvent::AssistantReasoning { .. }
303 | CopilotStreamEvent::AssistantReasoningDelta { .. }
304 | CopilotStreamEvent::AssistantTurnStart { .. }
305 | CopilotStreamEvent::AssistantTurnEnd { .. }
306 | CopilotStreamEvent::ToolExecutionStart { .. }
307 | CopilotStreamEvent::ToolExecutionPartialResult { .. }
308 | CopilotStreamEvent::ToolExecutionComplete { .. }
309 | CopilotStreamEvent::Result { .. }
310 | CopilotStreamEvent::Other => None,
311 }
312 }
313
314 pub fn extract_all_text(raw_output: &str) -> String {
316 let mut extracted = String::new();
317
318 for line in raw_output.lines() {
319 let Some(text) = Self::extract_text(line) else {
320 continue;
321 };
322 Self::append_text_chunk(&mut extracted, &text);
323 }
324
325 extracted
326 }
327
328 pub fn append_text_chunk(output: &mut String, chunk: &str) {
330 output.push_str(chunk);
331 if !chunk.ends_with('\n') {
332 output.push('\n');
333 }
334 }
335}
336
337pub(crate) fn dispatch_copilot_stream_event<H: StreamHandler>(
338 event: CopilotStreamEvent,
339 handler: &mut H,
340 extracted_text: &mut String,
341 state: &mut CopilotStreamState,
342) -> Option<SessionResult> {
343 match event {
344 CopilotStreamEvent::AssistantMessageDelta { data } => {
345 if let Some(message_id) = data.message_id {
346 state.streamed_message_ids.insert(message_id);
347 }
348
349 if !data.delta_content.is_empty() {
350 handler.on_text(&data.delta_content);
351 }
352 None
353 }
354 CopilotStreamEvent::AssistantMessage { data } => {
355 let message_text = extract_content_text(&data.content);
356
357 if should_suppress_full_message(data.message_id.as_deref(), state) {
358 handler.on_text("\n");
359 } else if let Some(text) = message_text.as_deref() {
360 handler.on_text(text);
361 }
362
363 if let Some(text) = message_text {
364 CopilotStreamParser::append_text_chunk(extracted_text, &text);
365 }
366 None
367 }
368 CopilotStreamEvent::AssistantReasoning { .. }
369 | CopilotStreamEvent::AssistantReasoningDelta { .. }
370 | CopilotStreamEvent::AssistantTurnStart { .. }
371 | CopilotStreamEvent::ToolExecutionPartialResult { .. }
372 | CopilotStreamEvent::Other => None,
373 CopilotStreamEvent::AssistantTurnEnd { .. } => {
374 state.completed_turns += 1;
375 None
376 }
377 CopilotStreamEvent::ToolExecutionStart { data } => {
378 handler.on_tool_call(&data.tool_name, &data.tool_call_id, &data.arguments);
379 None
380 }
381 CopilotStreamEvent::ToolExecutionComplete { data } => {
382 if data.success {
383 handler.on_tool_result(&data.tool_call_id, &data.output_text().unwrap_or_default());
384 } else {
385 handler.on_error(
386 &data
387 .error_text()
388 .unwrap_or_else(|| format!("Tool execution failed: {}", data.tool_call_id)),
389 );
390 }
391 None
392 }
393 CopilotStreamEvent::Result { data } => {
394 let exit_code = data.exit_code.unwrap_or_default();
395 let session_result = SessionResult {
396 duration_ms: data
397 .session_duration_ms
398 .or(data.total_api_duration_ms)
399 .unwrap_or_default(),
400 total_cost_usd: 0.0,
401 num_turns: state.completed_turns,
402 is_error: exit_code != 0,
403 ..Default::default()
404 };
405 if session_result.is_error {
406 handler.on_error(&format!("Session ended with exit code {exit_code}"));
407 }
408 handler.on_complete(&session_result);
409 Some(session_result)
410 }
411 }
412}
413
414fn parse_assistant_message(value: &Value) -> CopilotAssistantMessage {
415 let tool_requests = value
416 .get("data")
417 .and_then(|data| data.get("toolRequests"))
418 .and_then(Value::as_array)
419 .map(|items| {
420 items
421 .iter()
422 .filter_map(parse_tool_request)
423 .collect::<Vec<_>>()
424 })
425 .unwrap_or_default();
426
427 CopilotAssistantMessage {
428 message_id: data_str(value, "messageId"),
429 content: data_value(value, "content").cloned().unwrap_or(Value::Null),
430 tool_requests,
431 }
432}
433
434fn parse_tool_request(value: &Value) -> Option<CopilotToolRequest> {
435 Some(CopilotToolRequest {
436 tool_call_id: value.get("toolCallId").and_then(Value::as_str)?.to_string(),
437 name: value.get("name").and_then(Value::as_str)?.to_string(),
438 arguments: value.get("arguments").cloned().unwrap_or(Value::Null),
439 })
440}
441
442fn parse_tool_execution_start(value: &Value) -> Option<CopilotToolExecutionStart> {
443 Some(CopilotToolExecutionStart {
444 tool_call_id: data_str(value, "toolCallId")?,
445 tool_name: data_str(value, "toolName")?,
446 arguments: data_value(value, "arguments")
447 .cloned()
448 .unwrap_or(Value::Null),
449 })
450}
451
452fn parse_tool_execution_partial_result(value: &Value) -> Option<CopilotToolExecutionPartialResult> {
453 Some(CopilotToolExecutionPartialResult {
454 tool_call_id: data_str(value, "toolCallId")?,
455 partial_output: data_str(value, "partialOutput").unwrap_or_default(),
456 })
457}
458
459fn parse_tool_execution_complete(value: &Value) -> Option<CopilotToolExecutionComplete> {
460 Some(CopilotToolExecutionComplete {
461 tool_call_id: data_str(value, "toolCallId")?,
462 success: data_bool(value, "success").unwrap_or(false),
463 result: data_value(value, "result").map(parse_tool_execution_output),
464 error: data_value(value, "error").and_then(parse_tool_execution_error),
465 })
466}
467
468fn parse_tool_execution_output(value: &Value) -> CopilotToolExecutionOutput {
469 CopilotToolExecutionOutput {
470 content: value.get("content").cloned().unwrap_or(Value::Null),
471 detailed_content: value
472 .get("detailedContent")
473 .and_then(Value::as_str)
474 .map(ToOwned::to_owned),
475 }
476}
477
478fn parse_tool_execution_error(value: &Value) -> Option<CopilotToolExecutionError> {
479 Some(CopilotToolExecutionError {
480 message: value.get("message").and_then(Value::as_str)?.to_string(),
481 code: value
482 .get("code")
483 .and_then(Value::as_str)
484 .map(ToOwned::to_owned),
485 })
486}
487
488fn parse_result(value: &Value) -> CopilotResult {
489 let usage = value.get("usage");
490 CopilotResult {
491 exit_code: value
492 .get("exitCode")
493 .and_then(Value::as_i64)
494 .and_then(|code| i32::try_from(code).ok()),
495 session_duration_ms: usage
496 .and_then(|usage| usage.get("sessionDurationMs"))
497 .and_then(Value::as_u64),
498 total_api_duration_ms: usage
499 .and_then(|usage| usage.get("totalApiDurationMs"))
500 .and_then(Value::as_u64),
501 }
502}
503
504fn data_value<'a>(value: &'a Value, key: &str) -> Option<&'a Value> {
505 value.get("data").and_then(|data| data.get(key))
506}
507
508fn data_str(value: &Value, key: &str) -> Option<String> {
509 data_value(value, key)
510 .and_then(Value::as_str)
511 .map(ToOwned::to_owned)
512}
513
514fn data_bool(value: &Value, key: &str) -> Option<bool> {
515 data_value(value, key).and_then(Value::as_bool)
516}
517
518fn should_suppress_full_message(message_id: Option<&str>, state: &CopilotStreamState) -> bool {
519 message_id.is_some_and(|message_id| state.streamed_message_ids.contains(message_id))
520}
521
522fn extract_content_text(value: &Value) -> Option<String> {
523 match value {
524 Value::String(text) => Some(text.clone()),
525 Value::Array(items) => {
526 let mut combined = String::new();
527 for item in items {
528 let text = match item {
529 Value::String(text) => Some(text.clone()),
530 Value::Object(map) => map
531 .get("text")
532 .and_then(Value::as_str)
533 .map(ToOwned::to_owned),
534 _ => None,
535 };
536 if let Some(text) = text {
537 combined.push_str(&text);
538 }
539 }
540
541 if combined.is_empty() {
542 None
543 } else {
544 Some(combined)
545 }
546 }
547 Value::Object(map) => map
548 .get("text")
549 .and_then(Value::as_str)
550 .map(ToOwned::to_owned),
551 _ => None,
552 }
553}
554
555fn truncate(s: &str, max_len: usize) -> String {
556 if s.len() <= max_len {
557 s.to_string()
558 } else {
559 let boundary = s
560 .char_indices()
561 .take_while(|(i, _)| *i < max_len)
562 .last()
563 .map(|(i, c)| i + c.len_utf8())
564 .unwrap_or(0);
565 format!("{}...", &s[..boundary])
566 }
567}
568
569#[cfg(test)]
570mod tests {
571 use super::{
572 CopilotLiveChunk, CopilotStreamEvent, CopilotStreamParser, CopilotStreamState,
573 dispatch_copilot_stream_event,
574 };
575 use crate::stream_handler::{SessionResult, StreamHandler};
576 use serde_json::{Value, json};
577
578 #[derive(Default)]
579 struct RecordingHandler {
580 texts: Vec<String>,
581 tool_calls: Vec<(String, String, serde_json::Value)>,
582 tool_results: Vec<(String, String)>,
583 errors: Vec<String>,
584 completions: Vec<SessionResult>,
585 }
586
587 impl StreamHandler for RecordingHandler {
588 fn on_text(&mut self, text: &str) {
589 self.texts.push(text.to_string());
590 }
591
592 fn on_tool_call(&mut self, name: &str, id: &str, input: &serde_json::Value) {
593 self.tool_calls
594 .push((name.to_string(), id.to_string(), input.clone()));
595 }
596
597 fn on_tool_result(&mut self, id: &str, output: &str) {
598 self.tool_results.push((id.to_string(), output.to_string()));
599 }
600
601 fn on_error(&mut self, error: &str) {
602 self.errors.push(error.to_string());
603 }
604
605 fn on_complete(&mut self, result: &SessionResult) {
606 self.completions.push(result.clone());
607 }
608 }
609
610 #[test]
611 fn test_parse_assistant_message_content() {
612 let line = r#"{"type":"assistant.message","data":{"messageId":"msg-1","content":"hello world","toolRequests":[]}}"#;
613 let event = CopilotStreamParser::parse_line(line).unwrap();
614
615 match event {
616 CopilotStreamEvent::AssistantMessage { data } => {
617 assert_eq!(data.message_id.as_deref(), Some("msg-1"));
618 assert_eq!(data.content, Value::String("hello world".to_string()));
619 assert!(data.tool_requests.is_empty());
620 }
621 _ => panic!("Expected AssistantMessage event"),
622 }
623 }
624
625 #[test]
626 fn test_parse_assistant_message_delta() {
627 let line = r#"{"type":"assistant.message_delta","data":{"messageId":"msg-1","deltaContent":"hello"}}"#;
628 let event = CopilotStreamParser::parse_line(line).unwrap();
629
630 match event {
631 CopilotStreamEvent::AssistantMessageDelta { data } => {
632 assert_eq!(data.message_id.as_deref(), Some("msg-1"));
633 assert_eq!(data.delta_content, "hello");
634 }
635 _ => panic!("Expected AssistantMessageDelta event"),
636 }
637 }
638
639 #[test]
640 fn test_parse_assistant_message_with_tool_requests() {
641 let line = r#"{"type":"assistant.message","data":{"messageId":"msg-1","content":"Let me inspect that.","toolRequests":[{"toolCallId":"tool-1","name":"bash","arguments":{"command":"echo hi"},"type":"function"}]}}"#;
642 let event = CopilotStreamParser::parse_line(line).unwrap();
643
644 match event {
645 CopilotStreamEvent::AssistantMessage { data } => {
646 assert_eq!(data.message_id.as_deref(), Some("msg-1"));
647 assert_eq!(
648 data.content,
649 Value::String("Let me inspect that.".to_string())
650 );
651 assert_eq!(data.tool_requests.len(), 1);
652 assert_eq!(data.tool_requests[0].tool_call_id, "tool-1");
653 assert_eq!(data.tool_requests[0].name, "bash");
654 assert_eq!(
655 data.tool_requests[0].arguments,
656 json!({"command": "echo hi"})
657 );
658 }
659 _ => panic!("Expected AssistantMessage event"),
660 }
661 }
662
663 #[test]
664 fn test_parse_assistant_reasoning_delta() {
665 let line = r#"{"type":"assistant.reasoning_delta","data":{"reasoningId":"reason-1","deltaContent":"Thinking..."}}"#;
666 let event = CopilotStreamParser::parse_line(line).unwrap();
667
668 match event {
669 CopilotStreamEvent::AssistantReasoningDelta { data } => {
670 assert_eq!(data.reasoning_id.as_deref(), Some("reason-1"));
671 assert_eq!(data.delta_content, "Thinking...");
672 }
673 _ => panic!("Expected AssistantReasoningDelta event"),
674 }
675 }
676
677 #[test]
678 fn test_parse_tool_execution_start() {
679 let line = r#"{"type":"tool.execution_start","data":{"toolCallId":"tool-1","toolName":"bash","arguments":{"command":"echo hi"}}}"#;
680 let event = CopilotStreamParser::parse_line(line).unwrap();
681
682 match event {
683 CopilotStreamEvent::ToolExecutionStart { data } => {
684 assert_eq!(data.tool_call_id, "tool-1");
685 assert_eq!(data.tool_name, "bash");
686 assert_eq!(data.arguments, json!({"command": "echo hi"}));
687 }
688 _ => panic!("Expected ToolExecutionStart event"),
689 }
690 }
691
692 #[test]
693 fn test_parse_tool_execution_complete_success() {
694 let line = r#"{"type":"tool.execution_complete","data":{"toolCallId":"tool-1","success":true,"result":{"content":"hi\n","detailedContent":"hi\n"}}}"#;
695 let event = CopilotStreamParser::parse_line(line).unwrap();
696
697 match event {
698 CopilotStreamEvent::ToolExecutionComplete { data } => {
699 assert_eq!(data.tool_call_id, "tool-1");
700 assert!(data.success);
701 assert_eq!(
702 data.result.and_then(|result| result.detailed_content),
703 Some("hi\n".to_string())
704 );
705 }
706 _ => panic!("Expected ToolExecutionComplete event"),
707 }
708 }
709
710 #[test]
711 fn test_parse_result_event() {
712 let line = r#"{"type":"result","exitCode":0,"usage":{"totalApiDurationMs":12,"sessionDurationMs":34}}"#;
713 let event = CopilotStreamParser::parse_line(line).unwrap();
714
715 match event {
716 CopilotStreamEvent::Result { data } => {
717 assert_eq!(data.exit_code, Some(0));
718 assert_eq!(data.total_api_duration_ms, Some(12));
719 assert_eq!(data.session_duration_ms, Some(34));
720 }
721 _ => panic!("Expected Result event"),
722 }
723 }
724
725 #[test]
726 fn test_extract_text_ignores_non_assistant_lines() {
727 let line = r#"{"type":"assistant.turn_start","data":{"turnId":"0"}}"#;
728 assert_eq!(CopilotStreamParser::extract_text(line), None);
729 }
730
731 #[test]
732 fn test_extract_live_chunk_streams_deltas_without_duplication() {
733 let mut state = CopilotStreamState::new();
734 let delta = r#"{"type":"assistant.message_delta","data":{"messageId":"msg-1","deltaContent":"Hello"}}"#;
735 let message =
736 r#"{"type":"assistant.message","data":{"messageId":"msg-1","content":"Hello"}}"#;
737
738 assert_eq!(
739 CopilotStreamParser::extract_live_chunk(delta, &mut state),
740 Some(CopilotLiveChunk {
741 text: "Hello".to_string(),
742 append_newline: false,
743 })
744 );
745 assert_eq!(
746 CopilotStreamParser::extract_live_chunk(message, &mut state),
747 Some(CopilotLiveChunk {
748 text: String::new(),
749 append_newline: true,
750 })
751 );
752 }
753
754 #[test]
755 fn test_extract_all_text_aggregates_text_from_jsonl() {
756 let raw = concat!(
757 "{\"type\":\"assistant.turn_start\",\"data\":{\"turnId\":\"0\"}}\n",
758 "{\"type\":\"assistant.message_delta\",\"data\":{\"messageId\":\"msg-1\",\"deltaContent\":\"ignored\"}}\n",
759 "{\"type\":\"assistant.message\",\"data\":{\"content\":\"First line\"}}\n",
760 "{\"type\":\"assistant.message\",\"data\":{\"content\":\"LOOP_COMPLETE\"}}\n",
761 "{\"type\":\"result\",\"exitCode\":0}\n"
762 );
763
764 assert_eq!(
765 CopilotStreamParser::extract_all_text(raw),
766 "First line\nLOOP_COMPLETE\n"
767 );
768 }
769
770 #[test]
771 fn test_sdk_events_outside_supported_subset_parse_as_other() {
772 let intent = r#"{"type":"assistant.intent","data":{"intent":"Reviewing parser changes"}}"#;
773 let idle = r#"{"type":"session.idle","data":{"backgroundTasks":{}}}"#;
774
775 assert_eq!(
776 CopilotStreamParser::parse_line(intent),
777 Some(CopilotStreamEvent::Other)
778 );
779 assert_eq!(
780 CopilotStreamParser::parse_line(idle),
781 Some(CopilotStreamEvent::Other)
782 );
783 }
784
785 #[test]
786 fn test_dispatch_tool_execution_events_routes_handler_callbacks() {
787 let mut handler = RecordingHandler::default();
788 let mut extracted = String::new();
789 let mut state = CopilotStreamState::new();
790
791 let start = CopilotStreamParser::parse_line(
792 r#"{"type":"tool.execution_start","data":{"toolCallId":"tool-1","toolName":"bash","arguments":{"command":"echo hi"}}}"#,
793 )
794 .unwrap();
795 dispatch_copilot_stream_event(start, &mut handler, &mut extracted, &mut state);
796
797 let complete = CopilotStreamParser::parse_line(
798 r#"{"type":"tool.execution_complete","data":{"toolCallId":"tool-1","success":true,"result":{"content":"hi\n","detailedContent":"hi\n"}}}"#,
799 )
800 .unwrap();
801 dispatch_copilot_stream_event(complete, &mut handler, &mut extracted, &mut state);
802
803 assert_eq!(
804 handler.tool_calls,
805 vec![(
806 "bash".to_string(),
807 "tool-1".to_string(),
808 json!({"command": "echo hi"}),
809 )]
810 );
811 assert_eq!(
812 handler.tool_results,
813 vec![("tool-1".to_string(), "hi\n".to_string())]
814 );
815 assert!(handler.errors.is_empty());
816 assert!(extracted.is_empty());
817 }
818
819 #[test]
820 fn test_dispatch_suppressed_full_message_still_records_extracted_text() {
821 let mut handler = RecordingHandler::default();
822 let mut extracted = String::new();
823 let mut state = CopilotStreamState::new();
824 state.streamed_message_ids.insert("msg-1".to_string());
825
826 let message = CopilotStreamParser::parse_line(
827 r#"{"type":"assistant.message","data":{"messageId":"msg-1","content":"Checking parser"}}"#,
828 )
829 .unwrap();
830 dispatch_copilot_stream_event(message, &mut handler, &mut extracted, &mut state);
831
832 assert_eq!(handler.texts, vec!["\n".to_string()]);
833 assert_eq!(extracted, "Checking parser\n");
834 }
835
836 #[test]
837 fn test_dispatch_tool_execution_complete_error_routes_handler_error() {
838 let mut handler = RecordingHandler::default();
839 let mut extracted = String::new();
840 let mut state = CopilotStreamState::new();
841
842 let complete = CopilotStreamParser::parse_line(
843 r#"{"type":"tool.execution_complete","data":{"toolCallId":"tool-1","success":false,"error":{"message":"rg: unrecognized file type: rs","code":"failure"}}}"#,
844 )
845 .unwrap();
846 dispatch_copilot_stream_event(complete, &mut handler, &mut extracted, &mut state);
847
848 assert!(handler.tool_results.is_empty());
849 assert_eq!(
850 handler.errors,
851 vec!["rg: unrecognized file type: rs".to_string()]
852 );
853 assert!(extracted.is_empty());
854 }
855
856 #[test]
857 fn test_dispatch_result_routes_completion() {
858 let mut handler = RecordingHandler::default();
859 let mut extracted = String::new();
860 let mut state = CopilotStreamState::new();
861
862 let turn_end = CopilotStreamParser::parse_line(
863 r#"{"type":"assistant.turn_end","data":{"turnId":"0"}}"#,
864 )
865 .unwrap();
866 dispatch_copilot_stream_event(turn_end, &mut handler, &mut extracted, &mut state);
867
868 let result = CopilotStreamParser::parse_line(
869 r#"{"type":"result","exitCode":0,"usage":{"sessionDurationMs":34}}"#,
870 )
871 .unwrap();
872 let session_result =
873 dispatch_copilot_stream_event(result, &mut handler, &mut extracted, &mut state)
874 .expect("session result");
875
876 assert_eq!(session_result.duration_ms, 34);
877 assert_eq!(session_result.num_turns, 1);
878 assert!(!session_result.is_error);
879 assert_eq!(handler.completions.len(), 1);
880 assert_eq!(handler.completions[0].duration_ms, 34);
881 assert_eq!(handler.completions[0].num_turns, 1);
882 }
883}