1use std::collections::{BTreeMap, BTreeSet};
2
/// A piece of plain text content.
///
/// `Default` yields an empty `text`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct TextContent {
    /// The text payload.
    pub text: String,
}
7
/// A piece of "thinking" content.
///
/// `Default` yields an empty `thinking` string.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ThinkingContent {
    /// The thinking payload.
    pub thinking: String,
}
12
/// A tool invocation reported by an agent.
///
/// `Default` yields a call whose fields are all empty strings.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ToolCall {
    /// Identifier of the call; empty when the source event carries none.
    pub id: String,
    /// Name of the invoked tool.
    pub name: String,
    /// Tool arguments as text (the parsers in this file store JSON text or an empty string).
    pub arguments: String,
}
19
/// The outcome of a tool invocation.
///
/// `Default` yields an empty, non-error result.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ToolResult {
    /// Identifier of the tool call this result belongs to; empty when unknown.
    pub tool_call_id: String,
    /// Textual output of the tool.
    pub content: String,
    /// Whether the tool reported a failure.
    pub is_error: bool,
}
26
27#[derive(Clone, Debug, PartialEq, Eq)]
28pub struct JsonEvent {
29 pub event_type: String,
30 pub text: String,
31 pub thinking: String,
32 pub tool_call: Option<ToolCall>,
33 pub tool_result: Option<ToolResult>,
34}
35
36#[derive(Clone, Debug, PartialEq)]
37pub struct ParsedJsonOutput {
38 pub schema_name: String,
39 pub events: Vec<JsonEvent>,
40 pub final_text: String,
41 pub session_id: String,
42 pub error: String,
43 pub usage: BTreeMap<String, i64>,
44 pub cost_usd: f64,
45 pub duration_ms: i64,
46 pub unknown_json_lines: Vec<String>,
47}
48
49fn new_output(schema_name: &str) -> ParsedJsonOutput {
50 ParsedJsonOutput {
51 schema_name: schema_name.into(),
52 events: Vec::new(),
53 final_text: String::new(),
54 session_id: String::new(),
55 error: String::new(),
56 usage: BTreeMap::new(),
57 cost_usd: 0.0,
58 duration_ms: 0,
59 unknown_json_lines: Vec::new(),
60 }
61}
62
63fn parser_state(
64 result: &ParsedJsonOutput,
65) -> (
66 usize,
67 String,
68 String,
69 String,
70 BTreeMap<String, i64>,
71 i64,
72 u64,
73) {
74 (
75 result.events.len(),
76 result.final_text.clone(),
77 result.error.clone(),
78 result.session_id.clone(),
79 result.usage.clone(),
80 result.duration_ms,
81 result.cost_usd.to_bits(),
82 )
83}
84
85fn parse_json_line(line: &str) -> Option<serde_json::Value> {
86 let trimmed = line.trim();
87 if trimmed.is_empty() {
88 return None;
89 }
90 let value: serde_json::Value = serde_json::from_str(trimmed).ok()?;
91 if value.is_object() {
92 Some(value)
93 } else {
94 None
95 }
96}
97
98fn apply_opencode_obj(result: &mut ParsedJsonOutput, obj: &serde_json::Value) {
99 if let Some(text) = obj.get("response").and_then(|value| value.as_str()) {
100 result.final_text = text.to_string();
101 result.events.push(JsonEvent {
102 event_type: "text".into(),
103 text: text.into(),
104 thinking: String::new(),
105 tool_call: None,
106 tool_result: None,
107 });
108 } else if let Some(err) = obj.get("error").and_then(|value| value.as_str()) {
109 result.error = err.to_string();
110 result.events.push(JsonEvent {
111 event_type: "error".into(),
112 text: err.into(),
113 thinking: String::new(),
114 tool_call: None,
115 tool_result: None,
116 });
117 } else if obj.get("type").and_then(|value| value.as_str()) == Some("step_start") {
118 result.session_id = obj
119 .get("sessionID")
120 .and_then(|value| value.as_str())
121 .unwrap_or(&result.session_id)
122 .to_string();
123 } else if obj.get("type").and_then(|value| value.as_str()) == Some("text") {
124 if let Some(part) = obj.get("part").and_then(|value| value.as_object()) {
125 if let Some(text) = part.get("text").and_then(|value| value.as_str()) {
126 if !text.is_empty() {
127 result.final_text = text.to_string();
128 result.events.push(JsonEvent {
129 event_type: "text".into(),
130 text: text.into(),
131 thinking: String::new(),
132 tool_call: None,
133 tool_result: None,
134 });
135 }
136 }
137 }
138 } else if obj.get("type").and_then(|value| value.as_str()) == Some("tool_use") {
139 if let Some(part) = obj.get("part").and_then(|value| value.as_object()) {
140 let tool_name = part
141 .get("tool")
142 .and_then(|value| value.as_str())
143 .unwrap_or("")
144 .to_string();
145 let call_id = part
146 .get("callID")
147 .and_then(|value| value.as_str())
148 .unwrap_or("")
149 .to_string();
150 let state = part
151 .get("state")
152 .and_then(|value| value.as_object())
153 .cloned()
154 .unwrap_or_default();
155 let tool_input = state
156 .get("input")
157 .cloned()
158 .unwrap_or(serde_json::Value::Null);
159 let tool_output = state
160 .get("output")
161 .and_then(|value| value.as_str())
162 .unwrap_or("")
163 .to_string();
164 let is_error = state
165 .get("status")
166 .and_then(|value| value.as_str())
167 .map(|value| value.eq_ignore_ascii_case("error"))
168 .unwrap_or(false);
169 result.events.push(JsonEvent {
170 event_type: "tool_use".into(),
171 text: String::new(),
172 thinking: String::new(),
173 tool_call: Some(ToolCall {
174 id: call_id.clone(),
175 name: tool_name,
176 arguments: serde_json::to_string(&tool_input).unwrap_or_default(),
177 }),
178 tool_result: None,
179 });
180 result.events.push(JsonEvent {
181 event_type: "tool_result".into(),
182 text: String::new(),
183 thinking: String::new(),
184 tool_call: None,
185 tool_result: Some(ToolResult {
186 tool_call_id: call_id,
187 content: tool_output,
188 is_error,
189 }),
190 });
191 }
192 } else if obj.get("type").and_then(|value| value.as_str()) == Some("step_finish") {
193 if let Some(part) = obj.get("part").and_then(|value| value.as_object()) {
194 if let Some(tokens) = part.get("tokens").and_then(|value| value.as_object()) {
195 let mut usage = BTreeMap::new();
196 for key in ["total", "input", "output", "reasoning"] {
197 if let Some(value) = tokens.get(key).and_then(|value| value.as_i64()) {
198 usage.insert(key.to_string(), value);
199 }
200 }
201 if let Some(cache) = tokens.get("cache").and_then(|value| value.as_object()) {
202 for key in ["write", "read"] {
203 if let Some(value) = cache.get(key).and_then(|value| value.as_i64()) {
204 usage.insert(format!("cache_{key}"), value);
205 }
206 }
207 }
208 if !usage.is_empty() {
209 result.usage = usage;
210 }
211 }
212 if let Some(cost) = part.get("cost").and_then(|value| value.as_f64()) {
213 result.cost_usd = cost;
214 }
215 }
216 }
217}
218
219fn apply_claude_obj(result: &mut ParsedJsonOutput, obj: &serde_json::Value) {
220 let msg_type = obj
221 .get("type")
222 .and_then(|value| value.as_str())
223 .unwrap_or("");
224 match msg_type {
225 "system" => {
226 let subtype = obj
227 .get("subtype")
228 .and_then(|value| value.as_str())
229 .unwrap_or("");
230 if subtype == "init" {
231 result.session_id = obj
232 .get("session_id")
233 .and_then(|value| value.as_str())
234 .unwrap_or("")
235 .to_string();
236 } else if subtype == "api_retry" {
237 result.events.push(JsonEvent {
238 event_type: "system_retry".into(),
239 text: String::new(),
240 thinking: String::new(),
241 tool_call: None,
242 tool_result: None,
243 });
244 }
245 }
246 "assistant" => {
247 if let Some(message) = obj.get("message").and_then(|value| value.as_object()) {
248 if let Some(content) = message.get("content").and_then(|value| value.as_array()) {
249 let texts: Vec<String> = content
250 .iter()
251 .filter(|block| {
252 block.get("type").and_then(|value| value.as_str()) == Some("text")
253 })
254 .filter_map(|block| block.get("text").and_then(|value| value.as_str()))
255 .map(|text| text.to_string())
256 .collect();
257 if !texts.is_empty() {
258 result.final_text = texts.join("\n");
259 result.events.push(JsonEvent {
260 event_type: "assistant".into(),
261 text: result.final_text.clone(),
262 thinking: String::new(),
263 tool_call: None,
264 tool_result: None,
265 });
266 }
267 }
268 if let Some(usage) = message.get("usage").and_then(|value| value.as_object()) {
269 result.usage = usage
270 .iter()
271 .filter_map(|(key, value)| value.as_i64().map(|count| (key.clone(), count)))
272 .collect();
273 }
274 }
275 }
276 "user" => {
277 if let Some(message) = obj.get("message").and_then(|value| value.as_object()) {
278 if let Some(content) = message.get("content").and_then(|value| value.as_array()) {
279 for block in content {
280 if block.get("type").and_then(|value| value.as_str()) == Some("tool_result")
281 {
282 result.events.push(JsonEvent {
283 event_type: "tool_result".into(),
284 text: String::new(),
285 thinking: String::new(),
286 tool_call: None,
287 tool_result: Some(ToolResult {
288 tool_call_id: block
289 .get("tool_use_id")
290 .and_then(|value| value.as_str())
291 .unwrap_or("")
292 .to_string(),
293 content: block
294 .get("content")
295 .and_then(|value| value.as_str())
296 .unwrap_or("")
297 .to_string(),
298 is_error: block
299 .get("is_error")
300 .and_then(|value| value.as_bool())
301 .unwrap_or(false),
302 }),
303 });
304 }
305 }
306 }
307 }
308 }
309 "stream_event" => {
310 if let Some(event) = obj.get("event").and_then(|value| value.as_object()) {
311 let event_type = event
312 .get("type")
313 .and_then(|value| value.as_str())
314 .unwrap_or("");
315 if event_type == "content_block_delta" {
316 if let Some(delta) = event.get("delta").and_then(|value| value.as_object()) {
317 let delta_type = delta
318 .get("type")
319 .and_then(|value| value.as_str())
320 .unwrap_or("");
321 match delta_type {
322 "text_delta" => result.events.push(JsonEvent {
323 event_type: "text_delta".into(),
324 text: delta
325 .get("text")
326 .and_then(|value| value.as_str())
327 .unwrap_or("")
328 .to_string(),
329 thinking: String::new(),
330 tool_call: None,
331 tool_result: None,
332 }),
333 "thinking_delta" => result.events.push(JsonEvent {
334 event_type: "thinking_delta".into(),
335 text: String::new(),
336 thinking: delta
337 .get("thinking")
338 .and_then(|value| value.as_str())
339 .unwrap_or("")
340 .to_string(),
341 tool_call: None,
342 tool_result: None,
343 }),
344 "input_json_delta" => result.events.push(JsonEvent {
345 event_type: "tool_input_delta".into(),
346 text: delta
347 .get("partial_json")
348 .and_then(|value| value.as_str())
349 .unwrap_or("")
350 .to_string(),
351 thinking: String::new(),
352 tool_call: None,
353 tool_result: None,
354 }),
355 _ => {}
356 }
357 }
358 } else if event_type == "content_block_start" {
359 if let Some(content_block) = event
360 .get("content_block")
361 .and_then(|value| value.as_object())
362 {
363 let block_type = content_block
364 .get("type")
365 .and_then(|value| value.as_str())
366 .unwrap_or("");
367 if block_type == "thinking" {
368 result.events.push(JsonEvent {
369 event_type: "thinking_start".into(),
370 text: String::new(),
371 thinking: String::new(),
372 tool_call: None,
373 tool_result: None,
374 });
375 } else if block_type == "tool_use" {
376 result.events.push(JsonEvent {
377 event_type: "tool_use_start".into(),
378 text: String::new(),
379 thinking: String::new(),
380 tool_call: Some(ToolCall {
381 id: content_block
382 .get("id")
383 .and_then(|value| value.as_str())
384 .unwrap_or("")
385 .to_string(),
386 name: content_block
387 .get("name")
388 .and_then(|value| value.as_str())
389 .unwrap_or("")
390 .to_string(),
391 arguments: String::new(),
392 }),
393 tool_result: None,
394 });
395 }
396 }
397 }
398 }
399 }
400 "tool_use" => {
401 let tool_input = obj
402 .get("tool_input")
403 .cloned()
404 .unwrap_or(serde_json::Value::Null);
405 result.events.push(JsonEvent {
406 event_type: "tool_use".into(),
407 text: String::new(),
408 thinking: String::new(),
409 tool_call: Some(ToolCall {
410 id: String::new(),
411 name: obj
412 .get("tool_name")
413 .and_then(|value| value.as_str())
414 .unwrap_or("")
415 .to_string(),
416 arguments: serde_json::to_string(&tool_input).unwrap_or_default(),
417 }),
418 tool_result: None,
419 });
420 }
421 "tool_result" => {
422 result.events.push(JsonEvent {
423 event_type: "tool_result".into(),
424 text: String::new(),
425 thinking: String::new(),
426 tool_call: None,
427 tool_result: Some(ToolResult {
428 tool_call_id: obj
429 .get("tool_use_id")
430 .and_then(|value| value.as_str())
431 .unwrap_or("")
432 .to_string(),
433 content: obj
434 .get("content")
435 .and_then(|value| value.as_str())
436 .unwrap_or("")
437 .to_string(),
438 is_error: obj
439 .get("is_error")
440 .and_then(|value| value.as_bool())
441 .unwrap_or(false),
442 }),
443 });
444 }
445 "result" => {
446 let subtype = obj
447 .get("subtype")
448 .and_then(|value| value.as_str())
449 .unwrap_or("");
450 if subtype == "success" {
451 result.final_text = obj
452 .get("result")
453 .and_then(|value| value.as_str())
454 .unwrap_or(&result.final_text)
455 .to_string();
456 result.cost_usd = obj
457 .get("cost_usd")
458 .and_then(|value| value.as_f64())
459 .unwrap_or(0.0);
460 result.duration_ms = obj
461 .get("duration_ms")
462 .and_then(|value| value.as_i64())
463 .unwrap_or(0);
464 if let Some(usage) = obj.get("usage").and_then(|value| value.as_object()) {
465 result.usage = usage
466 .iter()
467 .filter_map(|(key, value)| value.as_i64().map(|count| (key.clone(), count)))
468 .collect();
469 }
470 result.events.push(JsonEvent {
471 event_type: "result".into(),
472 text: result.final_text.clone(),
473 thinking: String::new(),
474 tool_call: None,
475 tool_result: None,
476 });
477 } else if subtype == "error" {
478 result.error = obj
479 .get("error")
480 .and_then(|value| value.as_str())
481 .unwrap_or("")
482 .to_string();
483 result.events.push(JsonEvent {
484 event_type: "error".into(),
485 text: result.error.clone(),
486 thinking: String::new(),
487 tool_call: None,
488 tool_result: None,
489 });
490 }
491 }
492 _ => {}
493 }
494}
495
496fn apply_kimi_obj(result: &mut ParsedJsonOutput, obj: &serde_json::Value) {
497 let passthrough_events = [
498 "TurnBegin",
499 "StepBegin",
500 "StepInterrupted",
501 "TurnEnd",
502 "StatusUpdate",
503 "HookTriggered",
504 "HookResolved",
505 "ApprovalRequest",
506 "SubagentEvent",
507 "ToolCallRequest",
508 ];
509 let wire_type = obj
510 .get("type")
511 .and_then(|value| value.as_str())
512 .unwrap_or("");
513 if passthrough_events.contains(&wire_type) {
514 result.events.push(JsonEvent {
515 event_type: wire_type.to_ascii_lowercase(),
516 text: String::new(),
517 thinking: String::new(),
518 tool_call: None,
519 tool_result: None,
520 });
521 return;
522 }
523
524 let role = obj
525 .get("role")
526 .and_then(|value| value.as_str())
527 .unwrap_or("");
528 if role == "assistant" {
529 if let Some(text) = obj.get("content").and_then(|value| value.as_str()) {
530 result.final_text = text.to_string();
531 result.events.push(JsonEvent {
532 event_type: "assistant".into(),
533 text: text.to_string(),
534 thinking: String::new(),
535 tool_call: None,
536 tool_result: None,
537 });
538 } else if let Some(parts) = obj.get("content").and_then(|value| value.as_array()) {
539 let mut texts = Vec::new();
540 for part in parts {
541 let part_type = part
542 .get("type")
543 .and_then(|value| value.as_str())
544 .unwrap_or("");
545 if part_type == "text" {
546 if let Some(text) = part.get("text").and_then(|value| value.as_str()) {
547 texts.push(text.to_string());
548 }
549 } else if part_type == "think" {
550 result.events.push(JsonEvent {
551 event_type: "thinking".into(),
552 text: String::new(),
553 thinking: part
554 .get("think")
555 .and_then(|value| value.as_str())
556 .unwrap_or("")
557 .to_string(),
558 tool_call: None,
559 tool_result: None,
560 });
561 }
562 }
563 if !texts.is_empty() {
564 result.final_text = texts.join("\n");
565 result.events.push(JsonEvent {
566 event_type: "assistant".into(),
567 text: result.final_text.clone(),
568 thinking: String::new(),
569 tool_call: None,
570 tool_result: None,
571 });
572 }
573 }
574 if let Some(tool_calls) = obj.get("tool_calls").and_then(|value| value.as_array()) {
575 for tool_call in tool_calls {
576 let function = tool_call
577 .get("function")
578 .and_then(|value| value.as_object());
579 result.events.push(JsonEvent {
580 event_type: "tool_call".into(),
581 text: String::new(),
582 thinking: String::new(),
583 tool_call: Some(ToolCall {
584 id: tool_call
585 .get("id")
586 .and_then(|value| value.as_str())
587 .unwrap_or("")
588 .to_string(),
589 name: function
590 .and_then(|f| f.get("name"))
591 .and_then(|value| value.as_str())
592 .unwrap_or("")
593 .to_string(),
594 arguments: function
595 .and_then(|f| f.get("arguments"))
596 .and_then(|value| value.as_str())
597 .unwrap_or("")
598 .to_string(),
599 }),
600 tool_result: None,
601 });
602 }
603 }
604 } else if role == "tool" {
605 let mut texts = Vec::new();
606 if let Some(parts) = obj.get("content").and_then(|value| value.as_array()) {
607 for part in parts {
608 if part.get("type").and_then(|value| value.as_str()) == Some("text") {
609 if let Some(text) = part.get("text").and_then(|value| value.as_str()) {
610 if !text.starts_with("<system>") {
611 texts.push(text.to_string());
612 }
613 }
614 }
615 }
616 }
617 result.events.push(JsonEvent {
618 event_type: "tool_result".into(),
619 text: String::new(),
620 thinking: String::new(),
621 tool_call: None,
622 tool_result: Some(ToolResult {
623 tool_call_id: obj
624 .get("tool_call_id")
625 .and_then(|value| value.as_str())
626 .unwrap_or("")
627 .to_string(),
628 content: texts.join("\n"),
629 is_error: false,
630 }),
631 });
632 }
633}
634
635fn message_text(message: &serde_json::Value) -> String {
636 if let Some(text) = message.get("content").and_then(|value| value.as_str()) {
637 return text.to_string();
638 }
639 let Some(content) = message.get("content").and_then(|value| value.as_array()) else {
640 return String::new();
641 };
642 content
643 .iter()
644 .filter(|block| block.get("type").and_then(|value| value.as_str()) == Some("text"))
645 .filter_map(|block| block.get("text").and_then(|value| value.as_str()))
646 .map(str::to_string)
647 .collect::<Vec<_>>()
648 .join("\n")
649}
650
/// Strips leading and trailing newline characters (`'\n'` only) from `text`.
///
/// Other whitespace, including `'\r'` and spaces, is preserved.
fn normalize_cursor_text(text: &str) -> String {
    let without_leading = text.trim_start_matches('\n');
    let without_surrounding = without_leading.trim_end_matches('\n');
    without_surrounding.to_owned()
}
654
655fn apply_cursor_agent_obj(result: &mut ParsedJsonOutput, obj: &serde_json::Value) {
656 match obj
657 .get("type")
658 .and_then(|value| value.as_str())
659 .unwrap_or("")
660 {
661 "system" => {
662 if obj.get("subtype").and_then(|value| value.as_str()) == Some("init") {
663 result.session_id = obj
664 .get("session_id")
665 .and_then(|value| value.as_str())
666 .unwrap_or("")
667 .to_string();
668 }
669 }
670 "assistant" => {
671 let text =
672 normalize_cursor_text(&obj.get("message").map(message_text).unwrap_or_default());
673 if !text.is_empty() {
674 result.final_text = text.clone();
675 result.events.push(JsonEvent {
676 event_type: "assistant".into(),
677 text,
678 thinking: String::new(),
679 tool_call: None,
680 tool_result: None,
681 });
682 }
683 }
684 "result" => {
685 if let Some(session_id) = obj.get("session_id").and_then(|value| value.as_str()) {
686 result.session_id = session_id.to_string();
687 }
688 if let Some(duration) = obj.get("duration_ms").and_then(|value| value.as_i64()) {
689 result.duration_ms = duration;
690 }
691 if let Some(usage) = obj.get("usage").and_then(|value| value.as_object()) {
692 result.usage = usage
693 .iter()
694 .filter_map(|(key, value)| value.as_i64().map(|number| (key.clone(), number)))
695 .collect();
696 }
697 let is_error = obj
698 .get("is_error")
699 .and_then(|value| value.as_bool())
700 .unwrap_or(false);
701 let subtype = obj
702 .get("subtype")
703 .and_then(|value| value.as_str())
704 .unwrap_or("");
705 if subtype == "success" && !is_error {
706 let text = normalize_cursor_text(
707 obj.get("result")
708 .and_then(|value| value.as_str())
709 .unwrap_or(&result.final_text),
710 );
711 result.final_text = text.clone();
712 if !text.is_empty() {
713 result.events.push(JsonEvent {
714 event_type: "result".into(),
715 text,
716 thinking: String::new(),
717 tool_call: None,
718 tool_result: None,
719 });
720 }
721 } else {
722 let text = obj
723 .get("error")
724 .or_else(|| obj.get("result"))
725 .and_then(|value| value.as_str())
726 .unwrap_or("")
727 .to_string();
728 result.error = text.clone();
729 result.events.push(JsonEvent {
730 event_type: "error".into(),
731 text,
732 thinking: String::new(),
733 tool_call: None,
734 tool_result: None,
735 });
736 }
737 }
738 _ => {}
739 }
740}
741
742fn apply_codex_obj(result: &mut ParsedJsonOutput, obj: &serde_json::Value) -> bool {
743 match obj
744 .get("type")
745 .and_then(|value| value.as_str())
746 .unwrap_or("")
747 {
748 "thread.started" => {
749 result.session_id = obj
750 .get("thread_id")
751 .and_then(|value| value.as_str())
752 .unwrap_or("")
753 .to_string();
754 true
755 }
756 "turn.started" => true,
757 "turn.completed" => {
758 if let Some(usage) = obj.get("usage").and_then(|value| value.as_object()) {
759 result.usage = usage
760 .iter()
761 .filter_map(|(key, value)| value.as_i64().map(|count| (key.clone(), count)))
762 .collect();
763 }
764 true
765 }
766 "error" => {
767 let text = obj
768 .get("message")
769 .or_else(|| obj.get("error"))
770 .map(codex_error_event_text)
771 .unwrap_or_default();
772 record_codex_error(result, text)
773 }
774 "turn.failed" => {
775 let text = obj
776 .get("error")
777 .map(codex_error_event_text)
778 .unwrap_or_default();
779 record_codex_error(result, text)
780 }
781 "item.started" | "item.completed" => {
782 let Some(item) = obj.get("item").and_then(|value| value.as_object()) else {
783 return false;
784 };
785 let item_type = item
786 .get("type")
787 .and_then(|value| value.as_str())
788 .unwrap_or("");
789 if item_type == "agent_message"
790 && obj.get("type").and_then(|value| value.as_str()) == Some("item.completed")
791 {
792 let text = item
793 .get("text")
794 .and_then(|value| value.as_str())
795 .unwrap_or("")
796 .to_string();
797 result.final_text = text.clone();
798 result.events.push(JsonEvent {
799 event_type: "assistant".into(),
800 text,
801 thinking: String::new(),
802 tool_call: None,
803 tool_result: None,
804 });
805 true
806 } else if item_type == "command_execution" {
807 let call_id = item
808 .get("id")
809 .and_then(|value| value.as_str())
810 .unwrap_or("")
811 .to_string();
812 let command = item
813 .get("command")
814 .and_then(|value| value.as_str())
815 .unwrap_or("")
816 .to_string();
817 if obj.get("type").and_then(|value| value.as_str()) == Some("item.started") {
818 result.events.push(JsonEvent {
819 event_type: "tool_use_start".into(),
820 text: String::new(),
821 thinking: String::new(),
822 tool_call: Some(ToolCall {
823 id: call_id,
824 name: "command_execution".into(),
825 arguments: serde_json::json!({ "command": command }).to_string(),
826 }),
827 tool_result: None,
828 });
829 true
830 } else {
831 let status = item
832 .get("status")
833 .and_then(|value| value.as_str())
834 .unwrap_or("");
835 let exit_code = item.get("exit_code").and_then(|value| value.as_i64());
836 result.events.push(JsonEvent {
837 event_type: "tool_result".into(),
838 text: String::new(),
839 thinking: String::new(),
840 tool_call: None,
841 tool_result: Some(ToolResult {
842 tool_call_id: call_id,
843 content: item
844 .get("aggregated_output")
845 .and_then(|value| value.as_str())
846 .unwrap_or("")
847 .to_string(),
848 is_error: exit_code.is_some_and(|code| code != 0)
849 || (!status.is_empty() && status != "completed"),
850 }),
851 });
852 true
853 }
854 } else {
855 false
856 }
857 }
858 _ => false,
859 }
860}
861
862fn record_codex_error(result: &mut ParsedJsonOutput, text: String) -> bool {
863 if text.is_empty() {
864 return false;
865 }
866 if result.error == text {
867 return true;
868 }
869 result.error = text.clone();
870 result.events.push(JsonEvent {
871 event_type: "error".into(),
872 text,
873 thinking: String::new(),
874 tool_call: None,
875 tool_result: None,
876 });
877 true
878}
879
880fn codex_error_event_text(value: &serde_json::Value) -> String {
881 if let Some(obj) = value.as_object() {
882 if let Some(message) = obj.get("message") {
883 let nested = codex_error_event_text(message);
884 if !nested.is_empty() {
885 return nested;
886 }
887 }
888 return format_codex_error_payload(value);
889 }
890
891 let Some(text) = value.as_str().map(str::trim) else {
892 return String::new();
893 };
894 if text.is_empty() {
895 return String::new();
896 }
897 let Ok(decoded) = serde_json::from_str::<serde_json::Value>(text) else {
898 return text.to_string();
899 };
900 if decoded.as_object().is_some() {
901 let formatted = format_codex_error_payload(&decoded);
902 if !formatted.is_empty() {
903 return formatted;
904 }
905 }
906 text.to_string()
907}
908
909fn format_codex_error_payload(payload: &serde_json::Value) -> String {
910 let error = payload.get("error");
911 let error_obj = error.and_then(|value| value.as_object());
912 let message = error_obj
913 .and_then(|obj| obj.get("message"))
914 .or_else(|| payload.get("message"))
915 .or(error)
916 .and_then(|value| value.as_str())
917 .unwrap_or("");
918 if message.is_empty() {
919 return String::new();
920 }
921
922 let status = payload.get("status").and_then(|value| value.as_i64());
923 let error_type = error_obj
924 .and_then(|obj| obj.get("type"))
925 .or_else(|| payload.get("type"))
926 .and_then(|value| value.as_str())
927 .unwrap_or("");
928 let mut prefix = String::new();
929 if !error_type.is_empty() && error_type != "error" {
930 prefix.push_str(error_type);
931 }
932 if let Some(status) = status {
933 if prefix.is_empty() {
934 prefix = format!("HTTP {status}");
935 } else {
936 prefix = format!("{prefix} ({status})");
937 }
938 }
939 if prefix.is_empty() {
940 message.to_string()
941 } else {
942 format!("{prefix}: {message}")
943 }
944}
945
946fn apply_gemini_stats(
947 result: &mut ParsedJsonOutput,
948 stats: &serde_json::Map<String, serde_json::Value>,
949) {
950 let usage: BTreeMap<String, i64> = stats
951 .iter()
952 .filter_map(|(key, value)| value.as_i64().map(|count| (key.clone(), count)))
953 .collect();
954 if !usage.is_empty() {
955 result.usage = usage;
956 }
957 if let Some(duration_ms) = stats.get("duration_ms").and_then(|value| value.as_i64()) {
958 result.duration_ms = duration_ms;
959 }
960}
961
962fn apply_gemini_obj(result: &mut ParsedJsonOutput, obj: &serde_json::Value) -> bool {
963 if let Some(session_id) = obj.get("session_id").and_then(|value| value.as_str()) {
964 if !session_id.is_empty() {
965 result.session_id = session_id.to_string();
966 }
967 }
968
969 if let Some(response) = obj.get("response").and_then(|value| value.as_str()) {
970 result.final_text = response.to_string();
971 if !response.is_empty() {
972 result.events.push(JsonEvent {
973 event_type: "assistant".into(),
974 text: response.into(),
975 thinking: String::new(),
976 tool_call: None,
977 tool_result: None,
978 });
979 }
980 if let Some(stats) = obj.get("stats").and_then(|value| value.as_object()) {
981 apply_gemini_stats(result, stats);
982 }
983 return true;
984 }
985
986 match obj
987 .get("type")
988 .and_then(|value| value.as_str())
989 .unwrap_or("")
990 {
991 "init" => true,
992 "message" => {
993 let role = obj
994 .get("role")
995 .and_then(|value| value.as_str())
996 .unwrap_or("");
997 if role == "assistant" {
998 let text = obj
999 .get("content")
1000 .and_then(|value| value.as_str())
1001 .unwrap_or("");
1002 result.final_text.push_str(text);
1003 if !text.is_empty() {
1004 result.events.push(JsonEvent {
1005 event_type: if obj
1006 .get("delta")
1007 .and_then(|value| value.as_bool())
1008 .unwrap_or(false)
1009 {
1010 "text_delta".into()
1011 } else {
1012 "assistant".into()
1013 },
1014 text: text.into(),
1015 thinking: String::new(),
1016 tool_call: None,
1017 tool_result: None,
1018 });
1019 }
1020 true
1021 } else {
1022 role == "user"
1023 }
1024 }
1025 "result" => {
1026 if let Some(stats) = obj.get("stats").and_then(|value| value.as_object()) {
1027 apply_gemini_stats(result, stats);
1028 }
1029 let status = obj
1030 .get("status")
1031 .and_then(|value| value.as_str())
1032 .unwrap_or("");
1033 if !status.is_empty() && status != "success" {
1034 result.error = obj
1035 .get("error")
1036 .and_then(|value| value.as_str())
1037 .unwrap_or(status)
1038 .to_string();
1039 result.events.push(JsonEvent {
1040 event_type: "error".into(),
1041 text: result.error.clone(),
1042 thinking: String::new(),
1043 tool_call: None,
1044 tool_result: None,
1045 });
1046 }
1047 true
1048 }
1049 _ => false,
1050 }
1051}
1052
1053pub fn parse_opencode_json(raw: &str) -> ParsedJsonOutput {
1054 let mut result = new_output("opencode");
1055 for line in raw.lines() {
1056 if let Some(obj) = parse_json_line(line) {
1057 let before = parser_state(&result);
1058 apply_opencode_obj(&mut result, &obj);
1059 let after = parser_state(&result);
1060 if before == after {
1061 result.unknown_json_lines.push(line.trim().to_string());
1062 }
1063 }
1064 }
1065 result
1066}
1067
1068pub fn parse_claude_code_json(raw: &str) -> ParsedJsonOutput {
1069 let mut result = new_output("claude-code");
1070 for line in raw.lines() {
1071 if let Some(obj) = parse_json_line(line) {
1072 let before = parser_state(&result);
1073 apply_claude_obj(&mut result, &obj);
1074 let after = parser_state(&result);
1075 if before == after {
1076 result.unknown_json_lines.push(line.trim().to_string());
1077 }
1078 }
1079 }
1080 result
1081}
1082
1083pub fn parse_kimi_json(raw: &str) -> ParsedJsonOutput {
1084 let mut result = new_output("kimi");
1085 for line in raw.lines() {
1086 if let Some(obj) = parse_json_line(line) {
1087 let before = parser_state(&result);
1088 apply_kimi_obj(&mut result, &obj);
1089 let after = parser_state(&result);
1090 if before == after {
1091 result.unknown_json_lines.push(line.trim().to_string());
1092 }
1093 }
1094 }
1095 result
1096}
1097
1098pub fn parse_cursor_agent_json(raw: &str) -> ParsedJsonOutput {
1099 let mut result = new_output("cursor-agent");
1100 for line in raw.lines() {
1101 if let Some(obj) = parse_json_line(line) {
1102 let before = parser_state(&result);
1103 apply_cursor_agent_obj(&mut result, &obj);
1104 let after = parser_state(&result);
1105 if before == after {
1106 result.unknown_json_lines.push(line.trim().to_string());
1107 }
1108 }
1109 }
1110 result
1111}
1112
1113pub fn parse_codex_json(raw: &str) -> ParsedJsonOutput {
1114 let mut result = new_output("codex");
1115 for line in raw.lines() {
1116 if let Some(obj) = parse_json_line(line) {
1117 if !apply_codex_obj(&mut result, &obj) {
1118 result.unknown_json_lines.push(line.trim().to_string());
1119 }
1120 }
1121 }
1122 result
1123}
1124
1125pub fn parse_gemini_json(raw: &str) -> ParsedJsonOutput {
1126 let mut result = new_output("gemini");
1127 for line in raw.lines() {
1128 if let Some(obj) = parse_json_line(line) {
1129 if !apply_gemini_obj(&mut result, &obj) {
1130 result.unknown_json_lines.push(line.trim().to_string());
1131 }
1132 }
1133 }
1134 result
1135}
1136
1137pub fn parse_json_output(raw: &str, schema: &str) -> ParsedJsonOutput {
1138 match schema {
1139 "opencode" => parse_opencode_json(raw),
1140 "claude-code" => parse_claude_code_json(raw),
1141 "kimi" => parse_kimi_json(raw),
1142 "cursor-agent" => parse_cursor_agent_json(raw),
1143 "codex" => parse_codex_json(raw),
1144 "gemini" => parse_gemini_json(raw),
1145 _ => ParsedJsonOutput {
1146 schema_name: schema.into(),
1147 events: Vec::new(),
1148 final_text: String::new(),
1149 session_id: String::new(),
1150 error: format!("unknown schema: {schema}"),
1151 usage: BTreeMap::new(),
1152 cost_usd: 0.0,
1153 duration_ms: 0,
1154 unknown_json_lines: Vec::new(),
1155 },
1156 }
1157}
1158
/// Returns the first `max_chars` characters of `text` when it is longer than that.
///
/// Returns `None` when `text` has `max_chars` characters or fewer, i.e. when
/// no truncation is needed. The cut always lands on a character boundary.
fn truncate_to_char_limit(text: &str, max_chars: usize) -> Option<String> {
    // The byte offset of the first character past the limit is where to cut.
    let (cut, _) = text.char_indices().nth(max_chars)?;
    Some(text[..cut].to_owned())
}
1164
1165fn summarize_text(text: &str, max_lines: usize, max_chars: usize) -> String {
1166 let lines: Vec<&str> = text.trim().lines().collect();
1167 if lines.is_empty() {
1168 return String::new();
1169 }
1170 let mut clipped = lines
1171 .into_iter()
1172 .take(max_lines)
1173 .collect::<Vec<_>>()
1174 .join("\n");
1175 let mut truncated = text.trim().lines().count() > max_lines;
1176 if let Some(safe_clipped) = truncate_to_char_limit(&clipped, max_chars) {
1177 clipped = safe_clipped;
1178 clipped = clipped.trim_end().to_string();
1179 truncated = true;
1180 }
1181 if truncated {
1182 clipped.push_str(" …");
1183 }
1184 clipped
1185}
1186
1187fn parse_tool_arguments(arguments: &str) -> Option<serde_json::Map<String, serde_json::Value>> {
1188 let value: serde_json::Value = serde_json::from_str(arguments).ok()?;
1189 value.as_object().cloned()
1190}
1191
1192fn bash_command_preview(tool_call: &ToolCall) -> Option<String> {
1193 let args = parse_tool_arguments(&tool_call.arguments)?;
1194 for key in ["command", "cmd", "bash_command", "script"] {
1195 if let Some(value) = args.get(key).and_then(|value| value.as_str()) {
1196 let mut preview = value.trim().to_string();
1197 if preview.is_empty() {
1198 continue;
1199 }
1200 if let Some(safe_preview) = truncate_to_char_limit(&preview, 400) {
1201 preview = safe_preview.trim_end().to_string() + " …";
1202 }
1203 return Some(preview);
1204 }
1205 }
1206 None
1207}
1208
1209fn tool_preview(tool_name: &str, text: &str) -> String {
1210 match tool_name.to_ascii_lowercase().as_str() {
1211 "read" | "write" | "edit" | "multiedit" | "read_file" | "write_file" | "edit_file" => {
1212 String::new()
1213 }
1214 _ => summarize_text(text, 8, 400),
1215 }
1216}
1217
/// Decides whether output should be styled for a terminal.
///
/// A non-empty `force_color` always enables styling and takes precedence over
/// `no_color`. Otherwise a non-empty `no_color` disables it. When neither is
/// set (or both are empty) the detected `tty` value is returned unchanged.
pub fn resolve_human_tty(tty: bool, force_color: Option<&str>, no_color: Option<&str>) -> bool {
    let is_set = |value: Option<&str>| matches!(value, Some(text) if !text.is_empty());

    match (is_set(force_color), is_set(no_color)) {
        (true, _) => true,
        (false, true) => false,
        (false, false) => tty,
    }
}
1227
/// Wraps `text` in the ANSI SGR sequence for `code`, followed by a reset.
///
/// When `tty` is `false` the text is returned without any escape sequences.
fn style(text: &str, code: &str, tty: bool) -> String {
    if !tty {
        return text.to_owned();
    }
    format!("\x1b[{code}m{text}\x1b[0m")
}
1235
1236pub struct FormattedRenderer {
1237 show_thinking: bool,
1238 tty: bool,
1239 seen_final_texts: BTreeSet<String>,
1240 tool_calls_by_id: BTreeMap<String, ToolCall>,
1241 pending_tool_call: Option<ToolCall>,
1242 streamed_assistant_buffer: String,
1243 plain_text_tool_work: bool,
1244}
1245
1246impl FormattedRenderer {
1247 pub fn new(show_thinking: bool, tty: bool) -> Self {
1248 Self {
1249 show_thinking,
1250 tty,
1251 seen_final_texts: BTreeSet::new(),
1252 tool_calls_by_id: BTreeMap::new(),
1253 pending_tool_call: None,
1254 streamed_assistant_buffer: String::new(),
1255 plain_text_tool_work: false,
1256 }
1257 }
1258
1259 pub fn render_output(&mut self, output: &ParsedJsonOutput) -> String {
1260 output
1261 .events
1262 .iter()
1263 .filter_map(|event| self.render_event(event))
1264 .collect::<Vec<_>>()
1265 .join("\n")
1266 }
1267
1268 pub fn render_event(&mut self, event: &JsonEvent) -> Option<String> {
1269 match event.event_type.as_str() {
1270 "text_delta" if !event.text.is_empty() => {
1271 self.streamed_assistant_buffer.push_str(&event.text);
1272 Some(self.render_message("assistant", &event.text))
1273 }
1274 "text" | "assistant" if !event.text.is_empty() => {
1275 if !self.streamed_assistant_buffer.is_empty()
1276 && event.text == self.streamed_assistant_buffer
1277 {
1278 self.seen_final_texts.insert(event.text.clone());
1279 self.streamed_assistant_buffer.clear();
1280 None
1281 } else {
1282 self.streamed_assistant_buffer.clear();
1283 Some(self.render_message("assistant", &event.text))
1284 }
1285 }
1286 "result" if !event.text.is_empty() => {
1287 if !self.streamed_assistant_buffer.is_empty()
1288 && event.text == self.streamed_assistant_buffer
1289 {
1290 self.seen_final_texts.insert(event.text.clone());
1291 self.streamed_assistant_buffer.clear();
1292 None
1293 } else if self.seen_final_texts.contains(&event.text) {
1294 None
1295 } else {
1296 self.streamed_assistant_buffer.clear();
1297 Some(self.render_message("success", &event.text))
1298 }
1299 }
1300 "thinking" | "thinking_delta" if !event.thinking.is_empty() && self.show_thinking => {
1301 Some(self.render_message("thinking", &event.thinking))
1302 }
1303 "tool_use" | "tool_use_start" | "tool_call" => {
1304 if let Some(tool_call) = &event.tool_call {
1305 self.streamed_assistant_buffer.clear();
1306 if !tool_call.id.is_empty() {
1307 self.tool_calls_by_id
1308 .insert(tool_call.id.clone(), tool_call.clone());
1309 }
1310 self.pending_tool_call = Some(tool_call.clone());
1311 self.plain_text_tool_work = true;
1312 Some(self.render_tool_start(tool_call))
1313 } else {
1314 None
1315 }
1316 }
1317 "tool_input_delta" if !event.text.is_empty() => {
1318 if let Some(tool_call) = &mut self.pending_tool_call {
1319 tool_call.arguments.push_str(&event.text);
1320 if !tool_call.id.is_empty() {
1321 self.tool_calls_by_id
1322 .insert(tool_call.id.clone(), tool_call.clone());
1323 }
1324 }
1325 None
1326 }
1327 "tool_result" => event.tool_result.as_ref().map(|tool_result| {
1328 self.streamed_assistant_buffer.clear();
1329 self.render_tool_result(tool_result)
1330 }),
1331 "error" if !event.text.is_empty() => {
1332 self.streamed_assistant_buffer.clear();
1333 Some(self.render_message("error", &event.text))
1334 }
1335 _ => None,
1336 }
1337 }
1338
1339 fn render_message(&mut self, kind: &str, text: &str) -> String {
1340 if matches!(kind, "assistant" | "success") {
1341 self.seen_final_texts.insert(text.to_string());
1342 }
1343 let prefix = match kind {
1344 "assistant" => renderer_prefix(
1345 "💬",
1346 "[assistant]",
1347 "96",
1348 self.tty,
1349 self.plain_text_tool_work,
1350 ),
1351 "thinking" => renderer_prefix(
1352 "🧠",
1353 "[thinking]",
1354 "2;35",
1355 self.tty,
1356 self.plain_text_tool_work,
1357 ),
1358 "success" => renderer_prefix("✅", "[ok]", "92", self.tty, self.plain_text_tool_work),
1359 _ => renderer_prefix("❌", "[error]", "91", self.tty, self.plain_text_tool_work),
1360 };
1361 with_prefix(&prefix, text)
1362 }
1363
1364 fn render_tool_start(&self, tool_call: &ToolCall) -> String {
1365 let prefix = prefix("🛠️", "[tool:start]", "94", self.tty);
1366 let mut detail = tool_call.name.clone();
1367 if let Some(preview) = bash_command_preview(tool_call) {
1368 detail.push_str(": ");
1369 detail.push_str(&preview);
1370 }
1371 with_prefix(&prefix, &detail)
1372 }
1373
1374 fn render_tool_result(&self, tool_result: &ToolResult) -> String {
1375 let prefix = prefix("📎", "[tool:result]", "36", self.tty);
1376 let tool_call = self
1377 .tool_calls_by_id
1378 .get(&tool_result.tool_call_id)
1379 .or(self.pending_tool_call.as_ref());
1380 let tool_name = tool_call
1381 .map(|tool_call| tool_call.name.clone())
1382 .unwrap_or_else(|| "tool".into());
1383 let mut summary = format!(
1384 "{} ({})",
1385 tool_name,
1386 if tool_result.is_error { "error" } else { "ok" }
1387 );
1388 if let Some(tool_call) = tool_call {
1389 if let Some(preview) = bash_command_preview(tool_call) {
1390 summary.push_str(": ");
1391 summary.push_str(&preview);
1392 }
1393 }
1394 let preview = tool_preview(&tool_name, &tool_result.content);
1395 if !preview.is_empty() {
1396 summary.push('\n');
1397 summary.push_str(&preview);
1398 }
1399 with_prefix(&prefix, &summary)
1400 }
1401}
1402
1403fn prefix(emoji: &str, plain: &str, color_code: &str, tty: bool) -> String {
1404 if tty {
1405 style(emoji, color_code, true)
1406 } else {
1407 plain.to_string()
1408 }
1409}
1410
1411fn renderer_prefix(
1412 emoji: &str,
1413 plain: &str,
1414 color_code: &str,
1415 tty: bool,
1416 plain_text_tool_work: bool,
1417) -> String {
1418 if tty {
1419 return style(emoji, color_code, true);
1420 }
1421 if plain_text_tool_work && matches!(plain, "[assistant]" | "[thinking]" | "[ok]" | "[error]") {
1422 return plain.to_string();
1423 }
1424 plain.to_string()
1425}
1426
/// Puts `prefix` in front of every line of `text`.
///
/// Non-empty lines become `"<prefix> <line>"`; empty lines become just the
/// prefix. Lines are re-joined with `\n` and a trailing newline in `text` is
/// not preserved. An empty `text` has no lines and therefore yields an empty
/// string.
fn with_prefix(prefix: &str, text: &str) -> String {
    let mut out = String::new();
    for (position, line) in text.lines().enumerate() {
        if position > 0 {
            out.push('\n');
        }
        out.push_str(prefix);
        if !line.is_empty() {
            out.push(' ');
            out.push_str(line);
        }
    }
    out
}
1439
1440pub struct StructuredStreamProcessor {
1441 schema: String,
1442 renderer: FormattedRenderer,
1443 output: ParsedJsonOutput,
1444 buffer: String,
1445 unknown_json_lines: Vec<String>,
1446}
1447
1448impl StructuredStreamProcessor {
1449 pub fn new(schema: &str, renderer: FormattedRenderer) -> Self {
1450 Self {
1451 schema: schema.into(),
1452 renderer,
1453 output: new_output(schema),
1454 buffer: String::new(),
1455 unknown_json_lines: Vec::new(),
1456 }
1457 }
1458
1459 pub fn output(&self) -> &ParsedJsonOutput {
1460 &self.output
1461 }
1462
1463 pub fn feed(&mut self, chunk: &str) -> String {
1464 self.buffer.push_str(chunk);
1465 let mut rendered = Vec::new();
1466 while let Some(index) = self.buffer.find('\n') {
1467 let line = self.buffer[..index].to_string();
1468 self.buffer = self.buffer[index + 1..].to_string();
1469 if let Some(obj) = parse_json_line(&line) {
1470 let before = parser_state(&self.output);
1471 let event_count = self.output.events.len();
1472 let recognized = self.apply(&obj);
1473 let after = parser_state(&self.output);
1474 if before == after && !recognized {
1475 self.unknown_json_lines.push(line.trim().to_string());
1476 }
1477 for event in &self.output.events[event_count..] {
1478 if let Some(text) = self.renderer.render_event(event) {
1479 rendered.push(text);
1480 }
1481 }
1482 }
1483 }
1484 rendered.join("\n")
1485 }
1486
1487 pub fn finish(&mut self) -> String {
1488 if self.buffer.trim().is_empty() {
1489 return String::new();
1490 }
1491 let line = std::mem::take(&mut self.buffer);
1492 if let Some(obj) = parse_json_line(&line) {
1493 let before = parser_state(&self.output);
1494 let event_count = self.output.events.len();
1495 let recognized = self.apply(&obj);
1496 let after = parser_state(&self.output);
1497 if before == after && !recognized {
1498 self.unknown_json_lines.push(line.trim().to_string());
1499 }
1500 return self.output.events[event_count..]
1501 .iter()
1502 .filter_map(|event| self.renderer.render_event(event))
1503 .collect::<Vec<_>>()
1504 .join("\n");
1505 }
1506 String::new()
1507 }
1508
1509 pub fn take_unknown_json_lines(&mut self) -> Vec<String> {
1510 std::mem::take(&mut self.unknown_json_lines)
1511 }
1512
1513 fn apply(&mut self, obj: &serde_json::Value) -> bool {
1514 match self.schema.as_str() {
1515 "opencode" => {
1516 apply_opencode_obj(&mut self.output, obj);
1517 false
1518 }
1519 "claude-code" => {
1520 apply_claude_obj(&mut self.output, obj);
1521 false
1522 }
1523 "kimi" => {
1524 apply_kimi_obj(&mut self.output, obj);
1525 false
1526 }
1527 "cursor-agent" => {
1528 apply_cursor_agent_obj(&mut self.output, obj);
1529 false
1530 }
1531 "codex" => apply_codex_obj(&mut self.output, obj),
1532 "gemini" => apply_gemini_obj(&mut self.output, obj),
1533 _ => false,
1534 }
1535 }
1536}
1537
1538pub fn render_parsed(output: &ParsedJsonOutput, show_thinking: bool, tty: bool) -> String {
1539 let mut renderer = FormattedRenderer::new(show_thinking, tty);
1540 let rendered = renderer.render_output(output);
1541 if rendered.is_empty() {
1542 output.final_text.clone()
1543 } else {
1544 rendered
1545 }
1546}