use serde_json::Value;
use crate::{
normalize_process_event, ByteRange, ParsedLine, ProcessEvent, RunEvent, SessionInfo,
SuggestedEdit, ToolCallEnd, ToolCallStart, UsageInfo,
};
/// Converts a raw [`ProcessEvent`] into the [`RunEvent`]s it gives rise to.
///
/// This is a thin adapter: all of the work is delegated to
/// `normalize_process_event`, with [`parse_bob_line`] supplied as the
/// line parser.
pub fn normalize_bob_event(event: ProcessEvent) -> Vec<RunEvent> {
normalize_process_event(event, parse_bob_line)
}
/// Parses a single line of Bob's output into a [`ParsedLine`].
///
/// Handling, in order:
/// - a blank (whitespace-only) line yields an empty `ParsedLine`;
/// - a line that is not valid JSON is passed through verbatim (untrimmed) as text;
/// - valid JSON that is not an object yields an empty `ParsedLine`;
/// - a JSON object is dispatched on its `"type"` field (`message`, `tool_use`,
///   `tool_result`, `init`, `result`); any other object is probed for
///   suggested edits.
pub fn parse_bob_line(line: &str) -> ParsedLine {
    let trimmed = line.trim();
    if trimmed.is_empty() {
        return ParsedLine::default();
    }
    let Ok(payload) = serde_json::from_str::<Value>(trimmed) else {
        // Not JSON at all: surface the original, untrimmed line as plain text.
        return text_line(line.to_owned());
    };
    // NOTE(review): valid JSON that is not an object (a bare number, a quoted
    // string, an array, ...) is dropped here, whereas non-JSON lines are surfaced
    // as text above — confirm this asymmetry is intended.
    let Some(record) = payload.as_object() else {
        return ParsedLine::default();
    };
    match record.get("type").and_then(Value::as_str) {
        Some("message") => parse_message_record(record),
        Some("tool_use") => parse_tool_use_record(record),
        Some("tool_result") => parse_tool_result_record(record),
        Some("init") => parse_init_record(record),
        Some("result") => parse_result_record(record),
        _ => parse_edit_record(record),
    }
}

/// Builds a `ParsedLine` that carries nothing but text.
fn text_line(text: String) -> ParsedLine {
    ParsedLine {
        text: Some(text),
        ..ParsedLine::default()
    }
}

/// Handles `message` records: only assistant messages with non-empty string
/// `content` produce text; everything else is ignored.
fn parse_message_record(record: &serde_json::Map<String, Value>) -> ParsedLine {
    if record.get("role").and_then(Value::as_str) != Some("assistant") {
        return ParsedLine::default();
    }
    pick_string(record, "content").map_or_else(ParsedLine::default, text_line)
}

/// Handles `tool_use` records: `attempt_completion` is turned into text taken
/// from `parameters.result`; every other tool becomes a tool-start marker.
fn parse_tool_use_record(record: &serde_json::Map<String, Value>) -> ParsedLine {
    // A missing or empty tool name falls back to the generic label "tool".
    let name = pick_string(record, "tool_name").unwrap_or_else(|| "tool".to_owned());
    let parameters = record.get("parameters");
    if name == "attempt_completion" {
        // The completion's result string is reported as text rather than as a
        // tool call; an absent, non-string or empty result yields nothing.
        return parameters
            .and_then(Value::as_object)
            .and_then(|params| params.get("result"))
            .and_then(Value::as_str)
            .filter(|result| !result.is_empty())
            .map_or_else(ParsedLine::default, |result| text_line(result.to_owned()));
    }
    ParsedLine {
        tool_start: Some(ToolCallStart {
            tool_call_id: pick_string(record, "tool_id").unwrap_or_default(),
            name,
            input: parameters.map(value_to_display_string),
        }),
        ..ParsedLine::default()
    }
}

/// Handles `tool_result` records: the call counts as successful unless
/// `status` is exactly `"error"`; an empty rendered output is reported as `None`.
fn parse_tool_result_record(record: &serde_json::Map<String, Value>) -> ParsedLine {
    let failed = record.get("status").and_then(Value::as_str) == Some("error");
    let output = record
        .get("output")
        .map(value_to_display_string)
        .filter(|rendered| !rendered.is_empty());
    ParsedLine {
        tool_end: Some(ToolCallEnd {
            tool_call_id: pick_string(record, "tool_id").unwrap_or_default(),
            ok: !failed,
            output,
        }),
        ..ParsedLine::default()
    }
}

/// Handles `init` records: captures the session id and model when present.
fn parse_init_record(record: &serde_json::Map<String, Value>) -> ParsedLine {
    ParsedLine {
        session: Some(SessionInfo {
            session_id: pick_string(record, "session_id"),
            model: pick_string(record, "model"),
        }),
        ..ParsedLine::default()
    }
}

/// Handles `result` records: reports usage only when `stats.total_tokens`
/// is present as an unsigned integer.
fn parse_result_record(record: &serde_json::Map<String, Value>) -> ParsedLine {
    let usage = record
        .get("stats")
        .and_then(Value::as_object)
        .and_then(|stats| stats.get("total_tokens"))
        .and_then(Value::as_u64)
        .map(|total| UsageInfo {
            total_tokens: Some(total),
            ..UsageInfo::default()
        });
    ParsedLine {
        usage,
        ..ParsedLine::default()
    }
}

/// Handles objects with no recognised `type`: collects suggested edits and,
/// when there are any, adds an activity summary with the edit count.
fn parse_edit_record(record: &serde_json::Map<String, Value>) -> ParsedLine {
    let edits = parse_suggested_edits(record);
    if edits.is_empty() {
        return ParsedLine::default();
    }
    let count = edits.len();
    let plural = if count == 1 { "" } else { "s" };
    ParsedLine {
        edits,
        activity: Some(format!("{count} suggested edit{plural}")),
        ..ParsedLine::default()
    }
}
/// Stateful line parser that separates `<thinking>…</thinking>` spans from
/// ordinary text, remembering between lines whether such a span is still open.
#[derive(Debug, Default)]
pub struct BobStreamParser {
// True while an opening `<thinking>` tag has been seen without its closing tag.
in_thinking: bool,
}
impl BobStreamParser {
    /// Parses one line with [`parse_bob_line`] and then splits any resulting
    /// text into visible text and thinking content.
    ///
    /// Lines that produce no text are returned untouched. Thinking content
    /// extracted here is placed in front of any thinking content the parsed
    /// line already carried.
    pub fn parse_line(&mut self, line: &str) -> ParsedLine {
        let mut parsed = parse_bob_line(line);
        let Some(content) = parsed.text.take() else {
            return parsed;
        };
        let (visible, routed) = self.route_thinking(&content);
        parsed.text = visible;
        parsed.thinking = match (routed, parsed.thinking.take()) {
            (Some(mut routed), Some(existing)) => {
                routed.push_str(&existing);
                Some(routed)
            }
            (routed, existing) => routed.or(existing),
        };
        parsed
    }

    /// Splits `content` into `(visible text, thinking text)`.
    ///
    /// Everything between `<thinking>` and `</thinking>` goes to the second
    /// element, everything else to the first; the tags themselves are removed.
    /// Whether a span is open carries over between calls via `in_thinking`.
    /// An empty result on either side is reported as `None`.
    ///
    /// Each call searches only its own `content`, so a tag that is split
    /// across two calls is not recognised as a tag.
    fn route_thinking(&mut self, content: &str) -> (Option<String>, Option<String>) {
        const OPEN: &str = "<thinking>";
        const CLOSE: &str = "</thinking>";
        let mut visible = String::new();
        let mut hidden = String::new();
        let mut remaining = content;
        loop {
            // Depending on the current state, look for the tag that would flip
            // it and pick the buffer that receives text seen until then.
            let (tag, sink) = if self.in_thinking {
                (CLOSE, &mut hidden)
            } else {
                (OPEN, &mut visible)
            };
            let Some((before, after)) = remaining.split_once(tag) else {
                // No further tag: the rest belongs to the current state.
                sink.push_str(remaining);
                break;
            };
            sink.push_str(before);
            self.in_thinking = !self.in_thinking;
            remaining = after;
        }
        let non_empty = |buffer: String| if buffer.is_empty() { None } else { Some(buffer) };
        (non_empty(visible), non_empty(hidden))
    }
}
/// Renders a JSON value for display.
///
/// A JSON string is returned as its raw contents (no surrounding quotes);
/// any other value is rendered through its `to_string` form.
fn value_to_display_string(value: &Value) -> String {
    if let Value::String(text) = value {
        text.clone()
    } else {
        value.to_string()
    }
}
/// Returns the string stored under `key`, treating an empty string the same
/// as a missing or non-string value (`None`).
fn pick_string(record: &serde_json::Map<String, Value>, key: &str) -> Option<String> {
    record
        .get(key)
        .and_then(Value::as_str)
        .filter(|text| !text.is_empty())
        .map(str::to_owned)
}
/// Returns the string stored under `key`, including the empty string;
/// `None` only when the key is missing or holds a non-string value.
fn pick_string_value(record: &serde_json::Map<String, Value>, key: &str) -> Option<String> {
    record.get(key).and_then(Value::as_str).map(str::to_owned)
}
fn parse_suggested_edits(record: &serde_json::Map<String, Value>) -> Vec<SuggestedEdit> {
let mut edits = Vec::new();
if let Some(direct) = parse_suggested_edit(record) {
edits.push(direct);
}
for key in ["edits", "suggestedEdits", "suggestions"] {
let Some(Value::Array(items)) = record.get(key) else {
continue;
};
for item in items {
if let Some(obj) = item.as_object() {
if let Some(parsed) = parse_suggested_edit(obj) {
edits.push(parsed);
}
}
}
}
edits
}
/// Builds a [`SuggestedEdit`] from one JSON object, or returns `None` when the
/// object does not describe a complete, well-formed edit.
///
/// Required parts (within each group the first matching key wins):
/// - file path: a non-empty string under `filePath`, `path` or `file`;
/// - range: unsigned integers `start` and `end`, read from the nested `range`
///   object when one is present, otherwise from the record itself;
/// - replacement: a string (possibly empty) under `replacement`,
///   `replaceWith`, `insert` or `newText`.
///
/// The optional title is the first non-empty string under `title`, `summary`
/// or `description`.
///
/// A range whose `start` exceeds its `end` is rejected: the values come from
/// an external process and an inverted range cannot be applied meaningfully.
fn parse_suggested_edit(record: &serde_json::Map<String, Value>) -> Option<SuggestedEdit> {
    let file_path = ["filePath", "path", "file"]
        .iter()
        .find_map(|key| pick_string(record, key))?;

    // Offsets live either in a nested `range` object or directly on the record.
    let range_source = record
        .get("range")
        .and_then(Value::as_object)
        .unwrap_or(record);
    let start = range_source.get("start").and_then(Value::as_u64)?;
    let end = range_source.get("end").and_then(Value::as_u64)?;
    if start > end {
        return None;
    }

    // An empty replacement is meaningful (a deletion), so empty strings are
    // accepted here, unlike for the path and title.
    let replacement = ["replacement", "replaceWith", "insert", "newText"]
        .iter()
        .find_map(|key| pick_string_value(record, key))?;
    let title = ["title", "summary", "description"]
        .iter()
        .find_map(|key| pick_string(record, key));

    Some(SuggestedEdit {
        file_path,
        range: ByteRange { start, end },
        replacement,
        title,
    })
}
// Unit tests for the Bob line parser, the stateful thinking-tag router and
// the event normalizer defined above.
#[cfg(test)]
mod tests {
use super::*;
// A whitespace-only line produces an empty ParsedLine.
#[test]
fn blank_line_yields_nothing() {
assert!(parse_bob_line(" ").is_empty());
}
// A line that is not JSON is surfaced unchanged as text.
#[test]
fn non_json_passes_through_as_text() {
let parsed = parse_bob_line("hello world");
assert_eq!(parsed.text.as_deref(), Some("hello world"));
assert!(parsed.edits.is_empty());
}
// An assistant message's content becomes text, with no activity attached.
#[test]
fn assistant_message_becomes_text() {
let parsed =
parse_bob_line(r#"{"type":"message","role":"assistant","content":"hi there"}"#);
assert_eq!(parsed.text.as_deref(), Some("hi there"));
assert!(parsed.activity.is_none());
}
// Messages with a non-assistant role are ignored entirely.
#[test]
fn user_message_is_skipped() {
let parsed = parse_bob_line(r#"{"type":"message","role":"user","content":"my prompt"}"#);
assert!(parsed.is_empty());
}
// The extra "delta" flag does not change how assistant content is handled.
#[test]
fn assistant_delta_chunk_becomes_text() {
let parsed = parse_bob_line(
r#"{"type":"message","role":"assistant","content":"chunk","delta":true}"#,
);
assert_eq!(parsed.text.as_deref(), Some("chunk"));
}
// An untyped object with path, start/end and replacement at the top level
// parses as one edit and gets a singular activity summary.
#[test]
fn flat_suggested_edit_parses() {
let line = r#"{"filePath":"notes/a.md","start":3,"end":7,"replacement":"X","title":"fix"}"#;
let parsed = parse_bob_line(line);
assert_eq!(parsed.edits.len(), 1);
let edit = &parsed.edits[0];
assert_eq!(edit.file_path, "notes/a.md");
assert_eq!(edit.range, ByteRange { start: 3, end: 7 });
assert_eq!(edit.replacement, "X");
assert_eq!(edit.title.as_deref(), Some("fix"));
assert_eq!(parsed.activity.as_deref(), Some("1 suggested edit"));
}
// Edits listed in an "edits" array with nested "range" objects parse too;
// an empty replacement string is kept, and the summary is pluralized.
#[test]
fn nested_range_and_array_edits_parse() {
let line = r#"{"edits":[{"path":"a.md","range":{"start":0,"end":1},"newText":""},
{"file":"b.md","range":{"start":2,"end":4},"insert":"yo"}]}"#;
let parsed = parse_bob_line(line);
assert_eq!(parsed.edits.len(), 2);
assert_eq!(parsed.edits[0].replacement, ""); assert_eq!(parsed.edits[1].replacement, "yo");
assert_eq!(parsed.activity.as_deref(), Some("2 suggested edits"));
}
// A tool_use record becomes a tool start whose input is the parameters
// object rendered as compact JSON.
#[test]
fn tool_use_becomes_tool_start() {
let parsed = parse_bob_line(
r#"{"type":"tool_use","tool_id":"tool-1","tool_name":"execute_command","parameters":{"command":"ls"}}"#,
);
let start = parsed.tool_start.expect("tool_start");
assert_eq!(start.tool_call_id, "tool-1");
assert_eq!(start.name, "execute_command");
assert_eq!(start.input.as_deref(), Some(r#"{"command":"ls"}"#));
assert!(parsed.activity.is_none());
}
// Tools such as apply_diff are reported as ordinary tool starts.
#[test]
fn edit_tools_surface_as_tool_start() {
let start = parse_bob_line(
r#"{"type":"tool_use","tool_id":"t9","tool_name":"apply_diff","parameters":{"path":"a.md"}}"#,
)
.tool_start
.expect("tool_start");
assert_eq!(start.name, "apply_diff");
}
// A tool_result record becomes a tool end; status "error" clears the ok flag.
#[test]
fn tool_result_becomes_tool_end() {
let ok = parse_bob_line(
r#"{"type":"tool_result","tool_id":"tool-1","status":"success","output":"done"}"#,
)
.tool_end
.expect("tool_end");
assert_eq!(ok.tool_call_id, "tool-1");
assert!(ok.ok);
assert_eq!(ok.output.as_deref(), Some("done"));
let err = parse_bob_line(
r#"{"type":"tool_result","tool_id":"tool-2","status":"error","output":"boom"}"#,
)
.tool_end
.expect("tool_end");
assert!(!err.ok);
}
// init records yield session info; result records yield usage only when
// stats.total_tokens is present.
#[test]
fn init_yields_session_and_result_yields_usage() {
let init = parse_bob_line(r#"{"type":"init","session_id":"s1","model":"premium"}"#);
let session = init.session.expect("session");
assert_eq!(session.session_id.as_deref(), Some("s1"));
assert_eq!(session.model.as_deref(), Some("premium"));
assert!(init.text.is_none() && init.tool_start.is_none());
let result = parse_bob_line(
r#"{"type":"result","status":"success","stats":{"total_tokens":1280,"session_costs":3,"tool_calls":2}}"#,
);
let usage = result.usage.expect("usage");
assert_eq!(usage.total_tokens, Some(1280));
assert_eq!(usage.input_tokens, None);
assert_eq!(usage.output_tokens, None);
assert!(parse_bob_line(r#"{"type":"result","status":"success","stats":{"tool_calls":2}}"#)
.is_empty());
}
// An edit that lacks "end" is dropped rather than partially reported.
#[test]
fn incomplete_edit_is_ignored() {
let parsed = parse_bob_line(r#"{"filePath":"a.md","start":3,"replacement":"X"}"#);
assert!(parsed.edits.is_empty());
}
// A stdout line carrying an assistant message normalizes to one Text event.
#[test]
fn normalize_stdout_text_event() {
let events = normalize_bob_event(ProcessEvent::Stdout {
run_id: "r1".to_owned(),
line: r#"{"type":"message","role":"assistant","content":"hi"}"#.to_owned(),
});
assert_eq!(events.len(), 1);
assert!(matches!(
&events[0],
RunEvent::Text { run_id, delta } if run_id == "r1" && delta == "hi"
));
}
// tool_use / tool_result stdout lines normalize to ToolStart / ToolEnd events.
#[test]
fn normalize_bob_tool_events() {
let start = normalize_bob_event(ProcessEvent::Stdout {
run_id: "r1".to_owned(),
line: r#"{"type":"tool_use","tool_id":"t1","tool_name":"write_file"}"#.to_owned(),
});
assert!(matches!(
start.as_slice(),
[RunEvent::ToolStart { tool_call_id, name, .. }]
if tool_call_id == "t1" && name == "write_file"
));
let end = normalize_bob_event(ProcessEvent::Stdout {
run_id: "r1".to_owned(),
line: r#"{"type":"tool_result","tool_id":"t1","status":"success"}"#.to_owned(),
});
assert!(matches!(
end.as_slice(),
[RunEvent::ToolEnd { tool_call_id, ok, .. }] if tool_call_id == "t1" && *ok
));
}
// attempt_completion is reported as text from parameters.result, not as a
// tool start.
#[test]
fn attempt_completion_becomes_answer_text() {
let parsed = parse_bob_line(
r#"{"type":"tool_use","tool_id":"tool-2","tool_name":"attempt_completion","parameters":{"result":"The answer is 42."}}"#,
);
assert_eq!(parsed.text.as_deref(), Some("The answer is 42."));
assert!(parsed.tool_start.is_none());
}
// The stateful parser keeps routing content to `thinking` across several
// lines until the closing tag arrives, then switches back to text.
#[test]
fn bob_stream_parser_routes_thinking_across_deltas() {
let mut parser = BobStreamParser::default();
let msg = |content: &str| {
serde_json::json!({ "type": "message", "role": "assistant", "content": content, "delta": true })
.to_string()
};
let open = parser.parse_line(&msg("<thinking>\n"));
assert_eq!(open.thinking.as_deref(), Some("\n"));
assert!(open.text.is_none());
let mid = parser.parse_line(&msg("the user wants X"));
assert_eq!(mid.thinking.as_deref(), Some("the user wants X"));
assert!(mid.text.is_none());
let close = parser.parse_line(&msg("</thinking>Hello!"));
assert!(close.thinking.is_none());
assert_eq!(close.text.as_deref(), Some("Hello!"));
let after = parser.parse_line(&msg(" more"));
assert_eq!(after.text.as_deref(), Some(" more"));
assert!(after.thinking.is_none());
}
// Replays a whole session-shaped sequence of lines through one parser:
// init, user message, thinking deltas, a tool call and its result, the
// final attempt_completion answer and a result record without token stats.
// (Per the test name, the lines are modeled on a real capture.)
#[test]
fn grounded_against_real_bob_capture() {
let mut parser = BobStreamParser::default();
let session = parser
.parse_line(r#"{"type":"init","session_id":"s","model":"premium"}"#)
.session
.expect("session");
assert_eq!(session.session_id.as_deref(), Some("s"));
assert_eq!(session.model.as_deref(), Some("premium"));
assert!(parser
.parse_line(r#"{"type":"message","role":"user","content":"list files"}"#)
.is_empty());
assert_eq!(
parser
.parse_line(
r#"{"type":"message","role":"assistant","content":"<thinking>\n","delta":true}"#
)
.thinking
.as_deref(),
Some("\n")
);
let _ = parser.parse_line(
r#"{"type":"message","role":"assistant","content":"</thinking>\n","delta":true}"#,
);
let start = parser
.parse_line(r#"{"type":"tool_use","tool_name":"list_files","tool_id":"tool-1","parameters":{"dir_path":"/x/docs"}}"#)
.tool_start
.expect("tool_start");
assert_eq!(start.tool_call_id, "tool-1");
assert_eq!(start.name, "list_files");
assert_eq!(start.input.as_deref(), Some(r#"{"dir_path":"/x/docs"}"#));
let end = parser
.parse_line(r#"{"type":"tool_result","tool_id":"tool-1","status":"success","output":"Listed 11 item(s)."}"#)
.tool_end
.expect("tool_end");
assert!(end.ok);
assert_eq!(end.output.as_deref(), Some("Listed 11 item(s)."));
let answer = parser.parse_line(
r#"{"type":"tool_use","tool_id":"tool-2","tool_name":"attempt_completion","parameters":{"result":"The docs directory contains 10 files."}}"#,
);
assert_eq!(answer.text.as_deref(), Some("The docs directory contains 10 files."));
assert!(answer.tool_start.is_none());
assert!(parser
.parse_line(r#"{"type":"result","status":"success","stats":{"tool_calls":2}}"#)
.is_empty());
}
// Started and Exited process events map to the matching run events.
#[test]
fn normalize_passes_through_lifecycle_events() {
assert!(matches!(
normalize_bob_event(ProcessEvent::Started { run_id: "r".into() }).as_slice(),
[RunEvent::Started { .. }]
));
assert!(matches!(
normalize_bob_event(ProcessEvent::Exited {
run_id: "r".into(),
exit_code: Some(0),
cancelled: false
})
.as_slice(),
[RunEvent::Exited { exit_code: Some(0), cancelled: false, .. }]
));
}
}