use std::sync::{Arc, Mutex};
use std::time::Instant;
use ratatui::style::{Color, Style};
use ratatui::text::{Line, Span};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tracing::{debug, warn};
use ralph_proto::json_rpc::RpcEvent;
use crate::state::{TaskCounts, TuiState, WaveInfo};
use crate::state_mutations::{apply_loop_completed, apply_task_active, apply_task_close};
use crate::text_renderer::{sanitize_tui_inline_text, text_to_lines, truncate};
use ralph_adapters::tool_preview::{format_tool_result, format_tool_summary};
pub async fn run_rpc_event_reader<R>(
reader: R,
state: Arc<Mutex<TuiState>>,
mut cancel_rx: tokio::sync::watch::Receiver<bool>,
) where
R: AsyncRead + Unpin,
{
let mut lines = BufReader::new(reader).lines();
let mut received_any_event = false;
let mut text_accumulator = TextAccumulator::new();
loop {
tokio::select! {
biased;
_ = cancel_rx.changed() => {
if *cancel_rx.borrow() {
debug!("RPC event reader cancelled");
break;
}
}
result = lines.next_line() => {
match result {
Ok(Some(line)) => {
let line = line.trim();
if line.is_empty() {
continue;
}
match serde_json::from_str::<RpcEvent>(line) {
Ok(event) => {
received_any_event = true;
apply_rpc_event(&event, &state, &mut text_accumulator);
}
Err(e) => {
debug!(error = %e, line = %line, "Failed to parse RPC event");
}
}
}
Ok(None) => {
debug!("RPC event reader reached EOF");
if let Ok(mut s) = state.lock() {
if !received_any_event {
warn!("Subprocess exited without sending any RPC events");
s.subprocess_error = Some(
"Subprocess exited before starting. Check .ralph/diagnostics/logs/ for details.".to_string()
);
s.start_new_iteration();
if let Some(handle) = s.latest_iteration_lines_handle()
&& let Ok(mut lines) = handle.lock()
{
lines.push(Line::from(vec![
Span::styled(
"\u{26A0} ",
ratatui::style::Style::default()
.fg(ratatui::style::Color::Red)
.add_modifier(ratatui::style::Modifier::BOLD),
),
Span::raw("Subprocess exited before starting the orchestration loop."),
]));
lines.push(Line::raw(""));
lines.push(Line::raw("Possible causes:"));
lines.push(Line::raw(" - Loop lock held by another process (stale .ralph/loop.lock)"));
lines.push(Line::raw(" - Worktree creation failed (branch name collision)"));
lines.push(Line::raw(" - Configuration error in hat/config files"));
lines.push(Line::raw(""));
lines.push(Line::raw("Check logs: .ralph/diagnostics/logs/"));
}
}
s.loop_completed = true;
s.finish_latest_iteration();
}
break;
}
Err(e) => {
warn!(error = %e, "Error reading from subprocess stdout");
break;
}
}
}
}
}
}
struct TextAccumulator {
blocks: Vec<ContentBlock>,
current_text: String,
}
enum ContentBlock {
Text(String),
NonText(Line<'static>),
}
fn format_error_line(code: &str, message: &str) -> Line<'static> {
let clean = sanitize_tui_inline_text(message);
if code == "EXECUTION_ERROR" {
Line::from(vec![
Span::styled(" ↳ ", Style::default().fg(Color::DarkGray)),
Span::styled(
format!("\u{2717} Error: {}", clean),
Style::default().fg(Color::Red),
),
])
} else {
Line::from(vec![
Span::styled(
format!("\u{26A0} [{code}] "),
Style::default().fg(Color::Yellow),
),
Span::raw(clean),
])
}
}
impl TextAccumulator {
fn new() -> Self {
Self {
blocks: Vec::new(),
current_text: String::new(),
}
}
fn push_text(&mut self, delta: &str, lines_handle: &Arc<Mutex<Vec<Line<'static>>>>) {
self.current_text.push_str(delta);
self.rebuild_lines(lines_handle);
}
fn push_non_text(
&mut self,
line: Line<'static>,
lines_handle: &Arc<Mutex<Vec<Line<'static>>>>,
) {
if !self.current_text.is_empty() {
self.blocks
.push(ContentBlock::Text(std::mem::take(&mut self.current_text)));
}
self.blocks.push(ContentBlock::NonText(line));
self.rebuild_lines(lines_handle);
}
fn reset(&mut self) {
self.blocks.clear();
self.current_text.clear();
}
fn rebuild_lines(&self, lines_handle: &Arc<Mutex<Vec<Line<'static>>>>) {
let mut all_lines = Vec::new();
for block in &self.blocks {
match block {
ContentBlock::Text(t) => all_lines.extend(text_to_lines(t)),
ContentBlock::NonText(l) => all_lines.push(l.clone()),
}
}
if !self.current_text.is_empty() {
all_lines.extend(text_to_lines(&self.current_text));
}
if let Ok(mut buf) = lines_handle.lock() {
*buf = all_lines;
}
}
}
fn apply_rpc_event(event: &RpcEvent, state: &Arc<Mutex<TuiState>>, acc: &mut TextAccumulator) {
let Ok(mut s) = state.lock() else {
return;
};
match event {
RpcEvent::LoopStarted {
max_iterations,
backend,
..
} => {
s.loop_started = Some(Instant::now());
s.max_iterations = *max_iterations;
s.pending_backend = Some(backend.clone());
}
RpcEvent::IterationStart {
iteration,
max_iterations,
hat_display,
backend,
..
} => {
s.max_iterations = *max_iterations;
s.pending_backend = Some(backend.clone());
acc.reset();
s.start_new_iteration_with_metadata(Some(hat_display.clone()), Some(backend.clone()));
s.iteration = *iteration;
s.iteration_started = Some(Instant::now());
s.last_event = Some("iteration_start".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::IterationEnd {
loop_complete_triggered,
..
} => {
s.prev_iteration = s.iteration;
s.finish_latest_iteration();
if *loop_complete_triggered {
s.final_loop_elapsed = s.loop_started.map(|start| start.elapsed());
}
s.last_event = Some("iteration_end".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::TextDelta { delta, .. } => {
if let Some(handle) = s.latest_iteration_lines_handle() {
acc.push_text(delta, &handle);
}
s.last_event = Some("text_delta".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::ToolCallStart {
tool_name, input, ..
} => {
let mut spans = vec![Span::styled(
format!("\u{2699} [{}]", tool_name),
Style::default().fg(Color::Blue),
)];
if !tool_name.contains(' ')
&& let Some(summary) = format_tool_summary(tool_name, input)
{
let summary = sanitize_tui_inline_text(&summary);
spans.push(Span::styled(
format!(" {}", summary),
Style::default().fg(Color::DarkGray),
));
}
let line = Line::from(spans);
if let Some(handle) = s.latest_iteration_lines_handle() {
acc.push_non_text(line, &handle);
}
s.last_event = Some("tool_call_start".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::ToolCallEnd {
output, is_error, ..
} => {
let display = format_tool_result(output);
if display.is_empty() {
s.last_event = Some("tool_call_end".to_string());
s.last_event_at = Some(Instant::now());
return;
}
let clean = sanitize_tui_inline_text(&display);
let truncated = truncate(&clean, 200);
let line = if *is_error {
Line::from(vec![
Span::styled(" ↳ ", Style::default().fg(Color::DarkGray)),
Span::styled("✗ ", Style::default().fg(Color::Red)),
Span::styled(truncated, Style::default().fg(Color::Red)),
])
} else {
Line::from(vec![
Span::styled(" ↳ ", Style::default().fg(Color::DarkGray)),
Span::styled("✓ ", Style::default().fg(Color::Green)),
Span::styled(truncated, Style::default().fg(Color::DarkGray)),
])
};
if let Some(handle) = s.latest_iteration_lines_handle() {
acc.push_non_text(line, &handle);
}
s.last_event = Some("tool_call_end".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::Error { code, message, .. } => {
let line = format_error_line(code, message);
if let Some(handle) = s.latest_iteration_lines_handle() {
acc.push_non_text(line, &handle);
}
s.last_event = Some("error".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::HatChanged {
to_hat,
to_hat_display,
..
} => {
use ralph_proto::HatId;
s.pending_hat = Some((HatId::new(to_hat), to_hat_display.clone()));
s.last_event = Some("hat_changed".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::TaskStatusChanged {
task_id,
to_status,
title,
..
} => {
match to_status.as_str() {
"running" | "in_progress" => {
apply_task_active(&mut s, task_id, title, to_status);
}
"closed" | "done" | "completed" => {
apply_task_close(&mut s, task_id);
}
_ => {}
}
s.last_event = Some("task_status_changed".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::TaskCountsUpdated {
total,
open,
closed,
ready,
} => {
s.set_task_counts(TaskCounts::new(*total, *open, *closed, *ready));
s.last_event = Some("task_counts_updated".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::GuidanceAck { .. } => {
s.last_event = Some("guidance_ack".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::LoopTerminated {
total_iterations, ..
} => {
s.iteration = *total_iterations;
apply_loop_completed(&mut s);
s.last_event = Some("loop_terminated".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::Response { .. } => {
s.last_event = Some("response".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::OrchestrationEvent { topic, .. } => {
s.last_event = Some(topic.clone());
s.last_event_at = Some(Instant::now());
}
RpcEvent::WaveStarted {
hat_name,
worker_count,
timeout_secs,
} => {
s.wave_active = Some(WaveInfo::new(hat_name.clone(), *worker_count));
s.wave_active_iteration_idx = Some(s.iterations.len().saturating_sub(1));
let line = Line::from(vec![
Span::styled("── WAVE: ", Style::default().fg(Color::Magenta)),
Span::styled(
format!(
"{} | {} workers | timeout {}s",
hat_name, worker_count, timeout_secs
),
Style::default().fg(Color::Magenta),
),
Span::styled(
" ──────────────────────",
Style::default().fg(Color::Magenta),
),
]);
if let Some(handle) = s.latest_iteration_lines_handle() {
acc.push_non_text(line, &handle);
}
s.last_event = Some("wave_started".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::WaveWorkerDone {
index,
total,
duration_ms,
success,
payload_preview,
} => {
if let Some(ref mut wave) = s.wave_active {
wave.completed += 1;
}
let secs = *duration_ms / 1000;
let (icon, color) = if *success {
("\u{2713}", Color::Green)
} else {
("\u{2717}", Color::Red)
};
let status_word = if *success { "done" } else { "failed" };
let preview = truncate(payload_preview, 60);
let line = Line::from(vec![
Span::styled(format!(" {} ", icon), Style::default().fg(color)),
Span::raw(format!(
"Worker {}/{} {} ({}s) — {}",
index + 1,
total,
status_word,
secs,
preview
)),
]);
if let Some(handle) = s.latest_iteration_lines_handle() {
acc.push_non_text(line, &handle);
}
s.last_event = Some("wave_worker_done".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::WaveWorkerTextDelta {
worker_index,
delta,
} => {
tracing::debug!(
worker = worker_index,
delta_len = delta.len(),
wave_active = s.wave_active.is_some(),
"RPC: received WaveWorkerTextDelta"
);
if let Some(ref wave) = s.wave_active
&& let Some(buffer) = wave.worker_buffers.get(*worker_index as usize)
{
let new_lines = text_to_lines(delta);
let handle = buffer.lines_handle();
if let Ok(mut lines) = handle.lock() {
tracing::debug!(
worker = worker_index,
new_lines = new_lines.len(),
total_lines = lines.len() + new_lines.len(),
"RPC: appending lines to wave worker buffer"
);
lines.extend(new_lines);
}
} else {
tracing::warn!(
worker = worker_index,
wave_active = s.wave_active.is_some(),
"RPC: WaveWorkerTextDelta dropped — no wave_active or buffer"
);
}
s.last_event = Some("wave_worker_text_delta".to_string());
s.last_event_at = Some(Instant::now());
}
RpcEvent::WaveCompleted {
succeeded,
failed,
duration_ms,
} => {
let wave_iter_idx = s.wave_active_iteration_idx.take();
if let Some(wave) = s.wave_active.take() {
let target_idx = wave_iter_idx.unwrap_or(s.iterations.len().saturating_sub(1));
if let Some(buf) = s.iterations.get_mut(target_idx) {
buf.wave_info = Some(wave);
}
}
let secs = *duration_ms / 1000;
let color = if *failed > 0 {
Color::Yellow
} else {
Color::Green
};
let line = Line::from(Span::styled(
format!(
"── Wave complete: {} succeeded, {} failed ({}s) ──────────────────────",
succeeded, failed, secs
),
Style::default().fg(color),
));
if let Some(handle) = s.latest_iteration_lines_handle() {
acc.push_non_text(line, &handle);
}
s.last_event = Some("wave_completed".to_string());
s.last_event_at = Some(Instant::now());
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use ralph_proto::json_rpc::{RpcEvent, TerminationReason};
use serde_json::json;
fn make_state() -> Arc<Mutex<TuiState>> {
Arc::new(Mutex::new(TuiState::new()))
}
fn make_acc() -> TextAccumulator {
TextAccumulator::new()
}
#[test]
fn test_loop_started_sets_timer() {
let state = make_state();
let mut acc = make_acc();
{
let mut s = state.lock().unwrap();
s.loop_started = None;
}
let event = RpcEvent::LoopStarted {
prompt: "test".to_string(),
max_iterations: Some(10),
backend: "claude".to_string(),
started_at: 0,
};
apply_rpc_event(&event, &state, &mut acc);
let s = state.lock().unwrap();
assert!(s.loop_started.is_some());
assert_eq!(s.max_iterations, Some(10));
}
#[test]
fn test_iteration_start_creates_buffer() {
let state = make_state();
let mut acc = make_acc();
let event = RpcEvent::IterationStart {
iteration: 1,
max_iterations: Some(10),
hat: "builder".to_string(),
hat_display: "🔨Builder".to_string(),
backend: "claude".to_string(),
started_at: 0,
};
apply_rpc_event(&event, &state, &mut acc);
let s = state.lock().unwrap();
assert_eq!(s.total_iterations(), 1);
assert_eq!(s.iteration, 1);
}
#[test]
fn test_text_delta_appends_content() {
let state = make_state();
let mut acc = make_acc();
let start_event = RpcEvent::IterationStart {
iteration: 1,
max_iterations: None,
hat: "builder".to_string(),
hat_display: "🔨Builder".to_string(),
backend: "claude".to_string(),
started_at: 0,
};
apply_rpc_event(&start_event, &state, &mut acc);
let text_event = RpcEvent::TextDelta {
iteration: 1,
delta: "Hello world".to_string(),
};
apply_rpc_event(&text_event, &state, &mut acc);
let s = state.lock().unwrap();
let lines = s.iterations[0].lines.lock().unwrap();
assert!(!lines.is_empty());
}
#[test]
fn test_tool_call_start_adds_header() {
let state = make_state();
let mut acc = make_acc();
let start_event = RpcEvent::IterationStart {
iteration: 1,
max_iterations: None,
hat: "builder".to_string(),
hat_display: "🔨Builder".to_string(),
backend: "claude".to_string(),
started_at: 0,
};
apply_rpc_event(&start_event, &state, &mut acc);
let tool_event = RpcEvent::ToolCallStart {
iteration: 1,
tool_name: "Bash".to_string(),
tool_call_id: "tool_1".to_string(),
input: json!({"command": "ls -la"}),
};
apply_rpc_event(&tool_event, &state, &mut acc);
let s = state.lock().unwrap();
let lines = s.iterations[0].lines.lock().unwrap();
assert_eq!(lines.len(), 1);
}
#[test]
fn test_tool_call_start_sanitizes_multiline_summary() {
let state = make_state();
let mut acc = make_acc();
apply_rpc_event(
&RpcEvent::IterationStart {
iteration: 1,
max_iterations: None,
hat: "builder".to_string(),
hat_display: "🔨Builder".to_string(),
backend: "claude".to_string(),
started_at: 0,
},
&state,
&mut acc,
);
apply_rpc_event(
&RpcEvent::ToolCallStart {
iteration: 1,
tool_name: "Bash".to_string(),
tool_call_id: "tool_1".to_string(),
input: json!({"command": "printf 'first line'\nprintf 'second line'"}),
},
&state,
&mut acc,
);
let s = state.lock().unwrap();
let lines = s.iterations[0].lines.lock().unwrap();
let rendered = lines[0].to_string();
assert!(!rendered.contains('\n'));
assert!(rendered.contains("printf 'first line' printf 'second line'"));
}
#[test]
fn test_tool_call_end_sanitizes_multiline_result_preview() {
let state = make_state();
let mut acc = make_acc();
apply_rpc_event(
&RpcEvent::IterationStart {
iteration: 1,
max_iterations: None,
hat: "builder".to_string(),
hat_display: "🔨Builder".to_string(),
backend: "pi".to_string(),
started_at: 0,
},
&state,
&mut acc,
);
apply_rpc_event(
&RpcEvent::ToolCallEnd {
iteration: 1,
tool_call_id: "tool_1".to_string(),
output: "# Demo README\nSecond line of context.".to_string(),
is_error: false,
duration_ms: 12,
},
&state,
&mut acc,
);
let s = state.lock().unwrap();
let lines = s.iterations[0].lines.lock().unwrap();
let rendered = lines[0].to_string();
assert_eq!(rendered, " ↳ ✓ # Demo README • Second line of context.");
}
#[test]
fn test_execution_error_preserves_order_with_following_text() {
let state = make_state();
let mut acc = make_acc();
apply_rpc_event(
&RpcEvent::IterationStart {
iteration: 1,
max_iterations: None,
hat: "builder".to_string(),
hat_display: "🔨Builder".to_string(),
backend: "pi".to_string(),
started_at: 0,
},
&state,
&mut acc,
);
apply_rpc_event(
&RpcEvent::TextDelta {
iteration: 1,
delta: "Before failure. ".to_string(),
},
&state,
&mut acc,
);
apply_rpc_event(
&RpcEvent::Error {
iteration: 1,
code: "EXECUTION_ERROR".to_string(),
message: "permission denied: /root/summary.md".to_string(),
recoverable: true,
},
&state,
&mut acc,
);
apply_rpc_event(
&RpcEvent::TextDelta {
iteration: 1,
delta: "Recovered after failure.".to_string(),
},
&state,
&mut acc,
);
let s = state.lock().unwrap();
let lines = s.iterations[0].lines.lock().unwrap();
let rendered: Vec<String> = lines.iter().map(|line| line.to_string()).collect();
let before_idx = rendered
.iter()
.position(|line| line.contains("Before failure."))
.expect("before text should remain visible");
let error_idx = rendered
.iter()
.position(|line| line.contains("Error: permission denied"))
.expect("error line should remain visible");
let after_idx = rendered
.iter()
.position(|line| line.contains("Recovered after failure."))
.expect("after text should remain visible");
assert!(
before_idx < error_idx,
"error should stay after preceding text: {rendered:?}"
);
assert!(
error_idx < after_idx,
"error should stay before following text: {rendered:?}"
);
}
#[test]
fn test_execution_error_uses_legacy_style_red_error_line() {
let line = format_error_line("EXECUTION_ERROR", "permission denied");
assert_eq!(line.to_string(), " ↳ ✗ Error: permission denied");
assert_eq!(line.spans[1].style.fg, Some(Color::Red));
}
#[test]
fn test_loop_terminated_marks_complete() {
let state = make_state();
let mut acc = make_acc();
let event = RpcEvent::LoopTerminated {
reason: TerminationReason::Completed,
total_iterations: 5,
duration_ms: 10000,
total_cost_usd: 0.25,
terminated_at: 0,
};
apply_rpc_event(&event, &state, &mut acc);
let s = state.lock().unwrap();
assert!(s.loop_completed);
assert_eq!(s.iteration, 5);
}
#[test]
fn test_task_counts_updated() {
let state = make_state();
let mut acc = make_acc();
let event = RpcEvent::TaskCountsUpdated {
total: 10,
open: 3,
closed: 7,
ready: 2,
};
apply_rpc_event(&event, &state, &mut acc);
let s = state.lock().unwrap();
assert_eq!(s.task_counts.total, 10);
assert_eq!(s.task_counts.open, 3);
assert_eq!(s.task_counts.closed, 7);
assert_eq!(s.task_counts.ready, 2);
}
#[test]
fn test_small_text_deltas_form_flowing_paragraph_not_one_per_line() {
let state = make_state();
let mut acc = make_acc();
apply_rpc_event(
&RpcEvent::IterationStart {
iteration: 1,
max_iterations: None,
hat: "scoper".to_string(),
hat_display: "🔎 Scoper".to_string(),
backend: "pi".to_string(),
started_at: 0,
},
&state,
&mut acc,
);
let deltas = vec![
"Rust",
" is a systems",
" programming language",
" that runs",
" blazingly fast,",
" prevents segfaults,",
" and guarantees",
" thread safety.",
];
for delta in deltas {
apply_rpc_event(
&RpcEvent::TextDelta {
iteration: 1,
delta: delta.to_string(),
},
&state,
&mut acc,
);
}
let s = state.lock().unwrap();
let lines = s.iterations[0].lines.lock().unwrap();
assert!(
lines.len() <= 3,
"Small text deltas without newlines should form a flowing paragraph, \
not one line per delta. Expected <= 3 lines but got {} lines: {:?}",
lines.len(),
lines.iter().map(|l| l.to_string()).collect::<Vec<_>>()
);
let full_text: String = lines.iter().map(|l| l.to_string()).collect();
assert!(
full_text.contains("Rust is a systems programming language"),
"Text should flow as a paragraph. Got: {:?}",
full_text
);
}
#[test]
fn test_text_deltas_frozen_by_tool_call_preserve_order() {
let state = make_state();
let mut acc = make_acc();
apply_rpc_event(
&RpcEvent::IterationStart {
iteration: 1,
max_iterations: None,
hat: "builder".to_string(),
hat_display: "🔨 Builder".to_string(),
backend: "pi".to_string(),
started_at: 0,
},
&state,
&mut acc,
);
for delta in ["I'll ", "review ", "the code."] {
apply_rpc_event(
&RpcEvent::TextDelta {
iteration: 1,
delta: delta.to_string(),
},
&state,
&mut acc,
);
}
apply_rpc_event(
&RpcEvent::ToolCallStart {
iteration: 1,
tool_name: "Read".to_string(),
tool_call_id: "t1".to_string(),
input: json!({"file_path": "src/main.rs"}),
},
&state,
&mut acc,
);
for delta in ["Now ", "checking."] {
apply_rpc_event(
&RpcEvent::TextDelta {
iteration: 1,
delta: delta.to_string(),
},
&state,
&mut acc,
);
}
let s = state.lock().unwrap();
let lines = s.iterations[0].lines.lock().unwrap();
let line_strs: Vec<String> = lines.iter().map(|l| l.to_string()).collect();
let text1_idx = line_strs.iter().position(|l| l.contains("review the code"));
let tool_idx = line_strs.iter().position(|l| l.contains("Read"));
let text2_idx = line_strs.iter().position(|l| l.contains("checking"));
assert!(
text1_idx.is_some(),
"text1 should be present: {:?}",
line_strs
);
assert!(
tool_idx.is_some(),
"tool should be present: {:?}",
line_strs
);
assert!(
text2_idx.is_some(),
"text2 should be present: {:?}",
line_strs
);
assert!(
text1_idx.unwrap() < tool_idx.unwrap(),
"text1 should come before tool: {:?}",
line_strs
);
assert!(
tool_idx.unwrap() < text2_idx.unwrap(),
"tool should come before text2: {:?}",
line_strs
);
}
#[test]
fn test_format_tool_summary() {
assert_eq!(
format_tool_summary("Read", &json!({"path": "/foo/bar.rs"})),
Some("/foo/bar.rs".to_string())
);
assert_eq!(
format_tool_summary("Edit", &json!({"file_path": "/foo/bar.rs"})),
Some("/foo/bar.rs".to_string())
);
assert_eq!(
format_tool_summary("Bash", &json!({"command": "ls"})),
Some("ls".to_string())
);
assert_eq!(format_tool_summary("Unknown", &json!({})), None);
assert_eq!(
format_tool_summary("read", &json!({"path": "/foo/bar.rs"})),
Some("/foo/bar.rs".to_string())
);
assert_eq!(
format_tool_summary("shell", &json!({"command": "cargo test"})),
Some("cargo test".to_string())
);
assert_eq!(
format_tool_summary("ls", &json!({"path": "/src"})),
Some("/src".to_string())
);
assert_eq!(
format_tool_summary("grep", &json!({"pattern": "TODO"})),
Some("TODO".to_string())
);
}
#[test]
fn test_truncate() {
use crate::text_renderer::truncate;
assert_eq!(truncate("hello", 10), "hello");
assert_eq!(truncate("hello world", 5), "hello...");
}
#[tokio::test]
async fn test_eof_without_events_sets_subprocess_error() {
let empty_input: &[u8] = b"";
let state = make_state();
let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
run_rpc_event_reader(empty_input, state.clone(), cancel_rx).await;
let s = state.lock().unwrap();
assert!(
s.subprocess_error.is_some(),
"should set subprocess_error on EOF without events"
);
assert!(s.loop_completed, "should mark loop as completed");
}
#[tokio::test]
async fn test_eof_without_events_creates_error_iteration() {
let empty_input: &[u8] = b"";
let state = make_state();
let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
run_rpc_event_reader(empty_input, state.clone(), cancel_rx).await;
let s = state.lock().unwrap();
assert_eq!(s.total_iterations(), 1, "should create one error iteration");
let lines = s.iterations[0].lines.lock().unwrap();
let text: String = lines
.iter()
.flat_map(|l| l.spans.iter().map(|s| s.content.to_string()))
.collect();
assert!(
text.contains("Subprocess exited"),
"error iteration should contain error message, got: {}",
text
);
}
#[tokio::test]
async fn test_eof_after_loop_started_does_not_set_subprocess_error() {
let event = RpcEvent::LoopStarted {
prompt: "test".to_string(),
max_iterations: Some(10),
backend: "claude".to_string(),
started_at: 0,
};
let line = format!("{}\n", serde_json::to_string(&event).unwrap());
let state = make_state();
let (_cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
run_rpc_event_reader(line.as_bytes(), state.clone(), cancel_rx).await;
let s = state.lock().unwrap();
assert!(
s.subprocess_error.is_none(),
"should NOT set subprocess_error when events were received"
);
}
}