use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use base64::Engine as _;
use parking_lot::Mutex;
use serde_json::{json, Value};
use tokio::sync::{broadcast, Notify};
use tracing::{debug, warn};
use uuid::Uuid;
use crate::backends::dispatch::{dispatch_post_turn, dispatch_tool_call, gate_pre_turn};
use crate::backends::anthropic::api::SharedClient;
use crate::backends::anthropic::wire::{
Block, BlockDelta, ImageSource, Message, MessagesRequest, Role, StopReason, StreamEvent,
ThinkingConfig, ToolDef, WireUsage, DEFAULT_MAX_TOKENS,
};
use crate::backends::gemini::tools::FINISH_TOOL_NAME;
use crate::backends::stream_timeout::{idle_timeout_ms, next_with_idle_timeout, NextChunk};
use crate::content::{Content, Part as ApiPart};
use crate::error::{Error, Result};
use crate::hooks::{HookRunner, SessionContext};
use crate::tools::ToolRunner;
use crate::types::{
Step, StepStatus, StreamChunk, SystemInstructions, ThinkingLevel, ToolCall, ToolResult,
UsageMetadata,
};
/// Upper bound on model-call rounds inside one `run_turn`; when the round counter
/// exceeds it the turn is force-ended with a warning instead of calling the model again.
const MAX_TOOL_ROUNDS: u32 = 16;
/// Upper bound on how many times a single round re-issues its request after the
/// stream ends with a `PauseTurn` stop reason.
const MAX_PAUSE_RESUMES: u32 = 8;
/// Static settings for a conversation loop; `build_request` turns these plus the
/// message history into a `MessagesRequest`.
#[derive(Clone)]
pub(crate) struct LoopConfig {
/// Model identifier copied into every request (also handed to compaction).
pub model: String,
/// Already-rendered system prompt text, if any.
pub system: Option<String>,
/// Thinking level; when set, `build_request` enables thinking with the matching budget.
pub thinking: Option<ThinkingLevel>,
/// Sampling temperature; `build_request` omits it whenever thinking is enabled.
pub temperature: Option<f32>,
/// Requested output-token cap; `build_request` raises it to at least
/// `budget + 1024` when thinking is enabled.
pub max_tokens: u32,
/// Tool definitions sent with every request.
pub tool_declarations: Vec<ToolDef>,
/// Passed to `compaction::should_compact` together with the turn's prompt token count.
pub compaction_threshold: Option<u32>,
}
impl LoopConfig {
    /// Assembles a [`LoopConfig`] from caller-level options.
    ///
    /// `system` is rendered to plain text through `render_system`, and a missing
    /// `max_tokens` falls back to `DEFAULT_MAX_TOKENS`. Every other argument is
    /// stored unchanged.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` return type
    /// is kept as part of the existing interface.
    pub fn from_system(
        model: String,
        system: Option<&SystemInstructions>,
        thinking: Option<ThinkingLevel>,
        temperature: Option<f32>,
        max_tokens: Option<u32>,
        tool_declarations: Vec<ToolDef>,
        compaction_threshold: Option<u32>,
    ) -> Result<Self> {
        let rendered_system = system.map(render_system);
        let effective_max_tokens = max_tokens.unwrap_or(DEFAULT_MAX_TOKENS);
        let config = Self {
            model,
            system: rendered_system,
            thinking,
            temperature,
            max_tokens: effective_max_tokens,
            tool_declarations,
            compaction_threshold,
        };
        Ok(config)
    }
}
pub(crate) use crate::backends::render_system;
/// Mutable state shared between the turn loop and whoever observes or controls it.
pub(crate) struct LoopState {
/// Conversation history in wire format; `run_turn` appends user, assistant and
/// tool-result messages to it and rebuilds each request from it.
pub history: Mutex<Vec<Message>>,
/// `false` while `run_turn` is executing, set back to `true` on every exit path.
pub idle: Arc<AtomicBool>,
/// Notified (`notify_waiters`) each time `idle` is set back to `true`.
pub idle_notify: Arc<Notify>,
/// Cancellation flag: cleared when a turn starts and polled by `run_turn` before
/// each model call, after each stream event, and before tool dispatch.
pub cancel: Arc<AtomicBool>,
/// Broadcast channel on which steps are published.
pub steps: broadcast::Sender<Step>,
/// Next step index to hand out; see `alloc_step_index`.
pub next_step_index: AtomicU32,
/// Usage for the current/most recent turn: reset at turn start, then merged per round.
pub last_turn_usage: Mutex<Option<UsageMetadata>>,
/// The `output` argument of the most recent finish-tool call in the current turn;
/// cleared at turn start.
pub last_structured_output: Mutex<Option<Value>>,
}
impl LoopState {
    /// Builds a fresh state that publishes steps on `steps`.
    ///
    /// The state starts idle, not cancelled, with an empty history, a step
    /// counter of zero, and no recorded usage or structured output.
    pub fn new(steps: broadcast::Sender<Step>) -> Self {
        let idle = Arc::new(AtomicBool::new(true));
        let cancel = Arc::new(AtomicBool::new(false));
        let idle_notify = Arc::new(Notify::new());
        Self {
            history: Mutex::new(Vec::new()),
            idle,
            idle_notify,
            cancel,
            steps,
            next_step_index: AtomicU32::new(0),
            last_turn_usage: Mutex::new(None),
            last_structured_output: Mutex::new(None),
        }
    }

    /// Hands out the current step index and advances the counter by one.
    fn alloc_step_index(&self) -> u32 {
        let counter = &self.next_step_index;
        counter.fetch_add(1, Ordering::Relaxed)
    }

    /// Publishes `step` on the broadcast channel.
    ///
    /// The send result is deliberately discarded, so a failed send is not
    /// treated as an error.
    fn emit(&self, step: Step) {
        drop(self.steps.send(step));
    }
}
pub(crate) fn to_wire_user_content(content: Content) -> Result<Message> {
let mut blocks: Vec<Block> = Vec::with_capacity(content.parts.len().max(1));
for p in content.parts {
match p {
ApiPart::Text(t) => blocks.push(Block::Text { text: t }),
ApiPart::Media(m) => blocks.push(Block::Image {
source: ImageSource {
source_type: "base64".to_string(),
media_type: m.mime_type,
data: base64::engine::general_purpose::STANDARD.encode(m.data.as_ref()),
},
}),
}
}
if blocks.is_empty() {
return Err(Error::config("empty content"));
}
Ok(Message {
role: Role::User,
content: blocks,
})
}
/// Everything `run_turn` needs to execute one turn, bundled so it can be cloned
/// and moved into the turn future.
#[derive(Clone)]
pub(crate) struct TurnDeps {
/// API client used to open the message stream (and handed to compaction).
pub client: SharedClient,
/// Request settings for this loop.
pub config: LoopConfig,
/// Shared loop state (history, flags, step channel, usage).
pub state: Arc<LoopState>,
/// Optional tool executor forwarded to `dispatch_tool_call`.
pub tool_runner: Option<Arc<ToolRunner>>,
/// Optional hook runner forwarded to the pre-turn gate, tool dispatch and post-turn dispatch.
pub hook_runner: Option<Arc<HookRunner>>,
/// Optional session context; each turn derives a child context from it
/// (or uses the default context when absent).
pub session_ctx: Option<SessionContext>,
}
/// Accumulator for one streamed `tool_use` content block.
#[derive(Default)]
struct ToolUseAccum {
/// Tool-use id taken from the block-start event.
id: String,
/// Tool name taken from the block-start event.
name: String,
/// Concatenation of the `InputJsonDelta` fragments received for this block;
/// parsed only after the stream ends.
args_json: String,
}
/// Accumulator for one streamed thinking content block.
#[derive(Default)]
struct ThinkingAccum {
/// Concatenation of the `ThinkingDelta` fragments received for this block.
thinking: String,
/// Value of the most recent `SignatureDelta` for this block (each delta replaces
/// the previous value); blocks whose signature stays empty are not persisted.
signature: String,
}
pub(crate) async fn run_turn(deps: TurnDeps, user: Message, prompt: Content) -> Result<()> {
deps.state.idle.store(false, Ordering::Release);
deps.state.cancel.store(false, Ordering::Release);
let turn_ctx = deps
.session_ctx
.as_ref()
.map(|s| s.child())
.unwrap_or_default();
if let Some(denied) = gate_pre_turn(deps.hook_runner.as_ref(), &turn_ctx, &prompt).await {
emit_error(&deps.state, denied.clone());
deps.state.idle.store(true, Ordering::Release);
deps.state.idle_notify.notify_waiters();
return Err(Error::other(denied));
}
{
let mut hist = deps.state.history.lock();
hist.push(user);
}
*deps.state.last_turn_usage.lock() = Some(UsageMetadata::default());
*deps.state.last_structured_output.lock() = None;
let mut rounds = 0u32;
let mut last_text = String::new();
let mut last_stop: Option<StopReason> = None;
let mut finished_turn = false;
let trajectory_id = Uuid::new_v4().to_string();
loop {
rounds += 1;
if rounds > MAX_TOOL_ROUNDS {
warn!(rounds, "exceeded MAX_TOOL_ROUNDS; forcing turn end");
break;
}
if deps.state.cancel.load(Ordering::Acquire) {
debug!("turn cancelled before model call");
break;
}
let step_index = deps.state.alloc_step_index();
let mut accumulated_text = String::new();
let mut thinking_blocks: BTreeMap<u32, ThinkingAccum> = BTreeMap::new();
let mut tool_blocks: BTreeMap<u32, ToolUseAccum> = BTreeMap::new();
let mut stop_reason: Option<StopReason> = None;
let mut round_usage = WireUsage::default();
let mut pause_resumes = 0u32;
let paused = 'request: loop {
let request = build_request(&deps.config, &deps.state.history.lock());
let mut stream = match deps.client.stream_messages(&request).await {
Ok(s) => s,
Err(e) => {
emit_error(&deps.state, e.to_string());
deps.state.idle.store(true, Ordering::Release);
deps.state.idle_notify.notify_waiters();
return Err(e);
}
};
let idle_ms = idle_timeout_ms();
loop {
let ev_res = match next_with_idle_timeout(&mut stream, idle_ms).await {
NextChunk::Item(item) => item,
NextChunk::End => break,
NextChunk::IdleTimeout => {
let e = Error::other(format!(
"model stream stalled — no data for {}s",
idle_ms / 1000
));
emit_error(&deps.state, e.to_string());
deps.state.idle.store(true, Ordering::Release);
deps.state.idle_notify.notify_waiters();
return Err(e);
}
};
if deps.state.cancel.load(Ordering::Acquire) {
break;
}
let ev = match ev_res {
Ok(e) => e,
Err(e) => {
emit_error(&deps.state, e.to_string());
deps.state.idle.store(true, Ordering::Release);
deps.state.idle_notify.notify_waiters();
return Err(e);
}
};
match ev {
StreamEvent::MessageStart { message } => {
if let Some(u) = message.usage {
accumulate_wire_usage(&mut round_usage, &u);
}
}
StreamEvent::ContentBlockStart {
index,
content_block,
} => {
match content_block {
Block::ToolUse { id, name, .. } => {
tool_blocks.insert(
index,
ToolUseAccum {
id,
name,
args_json: String::new(),
},
);
}
Block::Text { text } => {
if !text.is_empty() {
accumulated_text.push_str(&text);
deps.state.emit(Step::text_delta(
&trajectory_id,
step_index,
&text,
));
}
}
_ => {}
}
}
StreamEvent::ContentBlockDelta { index, delta } => match delta {
BlockDelta::TextDelta { text } => {
if !text.is_empty() {
accumulated_text.push_str(&text);
deps.state
.emit(Step::text_delta(&trajectory_id, step_index, &text));
}
}
BlockDelta::ThinkingDelta { thinking } => {
if !thinking.is_empty() {
thinking_blocks
.entry(index)
.or_default()
.thinking
.push_str(&thinking);
deps.state.emit(Step::thought_delta(
&trajectory_id,
step_index,
&thinking,
));
}
}
BlockDelta::SignatureDelta { signature } => {
thinking_blocks.entry(index).or_default().signature = signature;
}
BlockDelta::InputJsonDelta { partial_json } => {
if let Some(acc) = tool_blocks.get_mut(&index) {
acc.args_json.push_str(&partial_json);
}
}
_ => {}
},
StreamEvent::ContentBlockStop { .. } => {}
StreamEvent::MessageDelta { delta, usage } => {
if let Some(r) = delta.stop_reason {
stop_reason = Some(r);
}
if let Some(u) = usage {
accumulate_wire_usage(&mut round_usage, &u);
}
}
StreamEvent::MessageStop => {}
StreamEvent::Error { error } => {
let msg = format!("anthropic stream error [{}]: {}", error.kind, error.message);
emit_error(&deps.state, msg.clone());
deps.state.idle.store(true, Ordering::Release);
deps.state.idle_notify.notify_waiters();
return Err(Error::other(msg));
}
StreamEvent::Ping | StreamEvent::Unknown => {}
}
}
if matches!(stop_reason, Some(StopReason::PauseTurn))
&& !deps.state.cancel.load(Ordering::Acquire)
&& pause_resumes < MAX_PAUSE_RESUMES
{
pause_resumes += 1;
debug!(pause_resumes, "anthropic pause_turn; resuming");
stop_reason = None;
continue 'request;
}
break 'request matches!(stop_reason, Some(StopReason::PauseTurn));
};
let mut pending_calls: Vec<(String, String, Value, Option<String>)> = Vec::new();
for (_idx, acc) in tool_blocks {
let (args, parse_error) = resolve_tool_args(&acc.name, &acc.args_json);
pending_calls.push((acc.id, acc.name, args, parse_error));
}
let mut assistant_blocks: Vec<Block> = Vec::new();
for (_idx, acc) in std::mem::take(&mut thinking_blocks) {
if !acc.thinking.is_empty() && !acc.signature.is_empty() {
assistant_blocks.push(Block::Thinking {
thinking: acc.thinking,
signature: Some(acc.signature),
});
}
}
if !accumulated_text.is_empty() {
assistant_blocks.push(Block::Text {
text: accumulated_text.clone(),
});
}
for (id, name, args, _parse_error) in &pending_calls {
assistant_blocks.push(Block::ToolUse {
id: id.clone(),
name: name.clone(),
input: args.clone(),
});
}
if !assistant_blocks.is_empty() {
deps.state.history.lock().push(Message {
role: Role::Assistant,
content: assistant_blocks,
});
}
let usage: UsageMetadata = round_usage.into();
if usage != UsageMetadata::default() {
let mut slot = deps.state.last_turn_usage.lock();
match slot.as_mut() {
Some(acc) => acc.merge_round(&usage),
None => *slot = Some(usage),
}
}
last_text = accumulated_text;
last_stop = stop_reason;
if pending_calls.is_empty() || paused {
break;
}
if deps.state.cancel.load(Ordering::Acquire) {
debug!("turn cancelled before tool dispatch");
break;
}
let mut result_blocks: Vec<Block> = Vec::with_capacity(pending_calls.len());
let mut saw_finish = false;
for (id, name, args, parse_error) in pending_calls {
if let Some(msg) = parse_error {
let post_result = ToolResult {
name: name.clone(),
id: Some(id.clone()),
result: Some(json!({ "error": msg.clone() })),
error: Some(msg.clone()),
};
deps.state
.emit_chunk_step(StreamChunk::ToolResult(post_result));
result_blocks.push(Block::ToolResult {
tool_use_id: id,
content: tool_result_content(&json!({ "error": msg })),
is_error: Some(true),
});
continue;
}
if name == FINISH_TOOL_NAME {
if let Some(out) = args.get("output").cloned() {
*deps.state.last_structured_output.lock() = Some(out);
}
saw_finish = true;
result_blocks.push(Block::ToolResult {
tool_use_id: id,
content: tool_result_content(&json!({ "ok": true })),
is_error: None,
});
continue;
}
let tool_call = ToolCall {
name: name.clone(),
args: args.clone(),
id: Some(id.clone()),
canonical_path: extract_canonical_path(&args),
};
deps.state
.emit_chunk_step(StreamChunk::ToolCall(tool_call.clone()));
let post_result = dispatch_tool_call(
deps.tool_runner.as_ref(),
deps.hook_runner.as_ref(),
&turn_ctx,
&tool_call,
)
.await;
let result_value = post_result.result.clone().unwrap_or(Value::Null);
let is_error = post_result.error.is_some();
deps.state
.emit_chunk_step(StreamChunk::ToolResult(post_result.clone()));
result_blocks.push(Block::ToolResult {
tool_use_id: id,
content: tool_result_content(&result_value),
is_error: is_error.then_some(true),
});
}
deps.state.history.lock().push(Message {
role: Role::User,
content: result_blocks,
});
if saw_finish {
finished_turn = true;
break;
}
}
let usage = deps.state.last_turn_usage.lock().clone().unwrap_or_default();
let usage_opt = if usage == UsageMetadata::default() {
None
} else {
Some(usage.clone())
};
let (status, error_msg): (StepStatus, &str) = match last_stop {
Some(StopReason::Refusal) => (StepStatus::Error, "stopped by refusal"),
Some(StopReason::MaxTokens) => (StepStatus::Done, "stopped at max tokens"),
Some(StopReason::PauseTurn) => (StepStatus::Done, "paused (resume cap reached)"),
_ => (StepStatus::Done, ""),
};
let structured = deps.state.last_structured_output.lock().clone();
let terminal = Step::turn_complete(
trajectory_id,
deps.state.alloc_step_index(),
status,
last_text.as_str(),
error_msg,
finished_turn,
structured,
usage_opt,
);
deps.state.emit(terminal);
dispatch_post_turn(deps.hook_runner.as_ref(), &turn_ctx, &last_text).await;
let used = usage.prompt_token_count;
if crate::backends::anthropic::compaction::should_compact(used, deps.config.compaction_threshold)
{
debug!(
used,
threshold = ?deps.config.compaction_threshold,
"compaction triggered"
);
crate::backends::anthropic::compaction::try_compact(
&deps.state.history,
&deps.client,
&deps.config.model,
)
.await;
}
deps.state.idle.store(true, Ordering::Release);
deps.state.idle_notify.notify_waiters();
debug!(?last_stop, rounds, "turn complete");
Ok(())
}
fn resolve_tool_args(name: &str, args_json: &str) -> (Value, Option<String>) {
if args_json.trim().is_empty() {
return (json!({}), None);
}
match serde_json::from_str(args_json) {
Ok(v) => (v, None),
Err(e) => {
let msg = format!("malformed tool arguments for '{name}': {e} (got: {args_json})");
warn!(error = %e, name = %name, "tool_use args not valid JSON; surfacing tool error");
(json!({}), Some(msg))
}
}
}
/// Produces the string-valued content for a `tool_result` block.
///
/// A JSON string is passed through unchanged; any other JSON value is replaced
/// by a JSON string holding its serialized text.
fn tool_result_content(v: &Value) -> Value {
    if v.is_string() {
        return v.clone();
    }
    Value::String(v.to_string())
}
/// Builds a streaming `MessagesRequest` from `config` and the current `history`.
///
/// When a thinking level is configured, thinking is enabled with the matching
/// token budget, `max_tokens` is raised to at least `budget + 1024`, and the
/// temperature is omitted. Without thinking, `max_tokens` and `temperature` come
/// straight from `config`. `tool_choice` is never set.
pub(crate) fn build_request(config: &LoopConfig, history: &[Message]) -> MessagesRequest {
    let budget = config.thinking.map(thinking_level_to_budget);

    let max_tokens = match budget {
        Some(tokens) => config.max_tokens.max(tokens + 1024),
        None => config.max_tokens,
    };
    let temperature = match budget {
        Some(_) => None,
        None => config.temperature,
    };
    let thinking = budget.map(|tokens| ThinkingConfig::enabled(tokens));

    MessagesRequest {
        model: config.model.clone(),
        max_tokens,
        system: config.system.clone(),
        messages: history.to_vec(),
        tools: config.tool_declarations.clone(),
        tool_choice: None,
        stream: true,
        temperature,
        thinking,
    }
}
/// Maps a [`ThinkingLevel`] to its thinking token budget:
/// Minimal = 1024, Low = 2048, Medium = 8192, High = 16384.
fn thinking_level_to_budget(level: ThinkingLevel) -> u32 {
    const KIB: u32 = 1024;
    let multiplier = match level {
        ThinkingLevel::Minimal => 1,
        ThinkingLevel::Low => 2,
        ThinkingLevel::Medium => 8,
        ThinkingLevel::High => 16,
    };
    multiplier * KIB
}
/// Folds a usage report from one stream event into the running usage for the round.
///
/// Every counter follows "latest reported value wins": a `Some` in `other`
/// replaces the stored value, a `None` leaves it untouched. Values are never
/// summed.
fn accumulate_wire_usage(acc: &mut WireUsage, other: &WireUsage) {
    acc.input_tokens = other.input_tokens.or(acc.input_tokens);
    acc.output_tokens = other.output_tokens.or(acc.output_tokens);
    acc.cache_read_input_tokens = other
        .cache_read_input_tokens
        .or(acc.cache_read_input_tokens);
    acc.cache_creation_input_tokens = other
        .cache_creation_input_tokens
        .or(acc.cache_creation_input_tokens);
}
/// Derives a canonical filesystem path from a tool call's string `"path"` argument.
///
/// Returns `None` when `args` has no string `"path"` entry. Otherwise the path is
/// canonicalized directly; if that fails, the parent directory is canonicalized
/// instead (an empty parent is treated as `"."`) and the final component is
/// re-appended. Returns `None` when the path has no parent or no file name, or
/// when the parent cannot be canonicalized either.
fn extract_canonical_path(args: &Value) -> Option<String> {
    use std::path::Path;

    let raw = args.get("path")?.as_str()?;
    let target = Path::new(raw);

    match dunce::canonicalize(target) {
        Ok(resolved) => Some(resolved.display().to_string()),
        Err(_) => {
            let dir = target.parent()?;
            let leaf = target.file_name()?;
            // A bare file name has an empty parent; resolve it against the
            // current directory.
            let dir = if dir.as_os_str().is_empty() {
                Path::new(".")
            } else {
                dir
            };
            let resolved_dir = dunce::canonicalize(dir).ok()?;
            Some(resolved_dir.join(leaf).display().to_string())
        }
    }
}
/// Publishes a `turn_error` step carrying `message`, using a freshly allocated
/// step index.
fn emit_error(state: &LoopState, message: String) {
    let index = state.alloc_step_index();
    let step = Step::turn_error(index, message);
    state.emit(step);
}
impl LoopState {
    /// Publishes a step for a stream chunk.
    ///
    /// Only `StreamChunk::ToolCall` produces a step (an `Active` tool-call step
    /// with a freshly allocated index). Every other chunk variant, including the
    /// `ToolResult` chunks that `run_turn` passes in, is dropped without emitting
    /// anything or consuming a step index.
    fn emit_chunk_step(&self, chunk: StreamChunk) {
        match chunk {
            StreamChunk::ToolCall(call) => {
                let index = self.alloc_step_index();
                self.emit(Step::tool_call(index, call, StepStatus::Active));
            }
            _ => {}
        }
    }
}
// Unit tests for the pure helpers in this file (argument parsing, tool-result
// wire encoding, request building, usage accumulation) and for the assistant
// block assembly rules.
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{CustomSystemInstructions, SystemInstructions};
// Custom system instructions render to their text verbatim.
#[test]
fn render_system_custom() {
let s = SystemInstructions::Custom(CustomSystemInstructions {
text: "be terse".into(),
});
assert_eq!(render_system(&s), "be terse");
}
// Well-formed JSON arguments parse with no error.
#[test]
fn resolve_tool_args_valid_json_parses() {
let (args, err) = resolve_tool_args("view_file", r#"{"path":"main.rs"}"#);
assert!(err.is_none());
assert_eq!(args["path"], "main.rs");
}
// Empty or whitespace-only argument text is a valid no-argument call, not an error.
#[test]
fn resolve_tool_args_empty_is_valid_no_arg_call() {
let (args, err) = resolve_tool_args("list_subdomains", "");
assert!(err.is_none(), "empty args must NOT be treated as malformed");
assert_eq!(args, json!({}));
let (args2, err2) = resolve_tool_args("list_subdomains", " ");
assert!(err2.is_none());
assert_eq!(args2, json!({}));
}
// Truncated JSON yields an error message naming the tool, with `{}` as the args.
#[test]
fn resolve_tool_args_malformed_surfaces_error_not_empty() {
let (args, err) = resolve_tool_args("edit_file", r#"{"path":"a.rs","content":"#);
assert!(err.is_some(), "malformed non-empty args must surface an error");
let msg = err.unwrap();
assert!(msg.contains("malformed tool arguments for 'edit_file'"));
assert_eq!(args, json!({}));
}
// Non-string results are serialized into a JSON string that round-trips;
// string results pass through unchanged.
#[test]
fn tool_result_content_objects_become_json_strings() {
let obj = json!({"contents": "fn main() {}", "lines": 1});
let wire = tool_result_content(&obj);
assert!(wire.is_string(), "object must serialize to a string, got {wire}");
let back: Value = serde_json::from_str(wire.as_str().unwrap()).unwrap();
assert_eq!(back, obj);
assert!(tool_result_content(&json!({"ok": true})).is_string());
assert!(tool_result_content(&json!({"error": "boom"})).is_string());
assert!(tool_result_content(&json!(["a", "b"])).is_string());
let s = json!("plain text result");
assert_eq!(tool_result_content(&s), json!("plain text result"));
}
// With thinking enabled and a configured max_tokens below the High budget,
// the request's max_tokens must end up above the thinking budget and the
// temperature must be dropped.
#[test]
fn build_request_clamps_thinking_below_max_tokens() {
let config = LoopConfig {
model: "claude-haiku-4-5-20251001".into(),
system: None,
thinking: Some(ThinkingLevel::High),
temperature: Some(0.7),
max_tokens: 8192, tool_declarations: Vec::new(),
compaction_threshold: None,
};
let req = build_request(&config, &[Message::user_text("hi")]);
let thinking = req.thinking.expect("thinking enabled");
assert!(
req.max_tokens > thinking.budget_tokens,
"max_tokens ({}) must exceed budget ({})",
req.max_tokens,
thinking.budget_tokens
);
assert!(req.temperature.is_none());
}
// Without thinking, temperature, max_tokens and system pass through unchanged.
#[test]
fn build_request_no_thinking_keeps_temperature() {
let config = LoopConfig {
model: "claude-haiku-4-5-20251001".into(),
system: Some("sys".into()),
thinking: None,
temperature: Some(0.3),
max_tokens: 4096,
tool_declarations: Vec::new(),
compaction_threshold: None,
};
let req = build_request(&config, &[Message::user_text("hi")]);
assert!(req.thinking.is_none());
assert_eq!(req.temperature, Some(0.3));
assert_eq!(req.max_tokens, 4096);
assert_eq!(req.system.as_deref(), Some("sys"));
}
// A later output_tokens report replaces the earlier one rather than being added to it;
// fields absent from the later report keep their earlier values.
#[test]
fn usage_does_not_double_count_message_start_output_placeholder() {
let mut round = WireUsage::default();
accumulate_wire_usage(
&mut round,
&WireUsage {
input_tokens: Some(12),
output_tokens: Some(1), cache_read_input_tokens: Some(8),
cache_creation_input_tokens: None,
},
);
accumulate_wire_usage(
&mut round,
&WireUsage {
input_tokens: None,
output_tokens: Some(33),
cache_read_input_tokens: None,
cache_creation_input_tokens: None,
},
);
assert_eq!(round.input_tokens, Some(12), "input from message_start");
assert_eq!(
round.cache_read_input_tokens,
Some(8),
"cache_read from message_start"
);
assert_eq!(
round.output_tokens,
Some(33),
"output_tokens must be the cumulative message_delta value (33), \
not message_start placeholder (1) + 33 = 34"
);
let neutral: UsageMetadata = round.into();
assert_eq!(neutral.candidates_token_count, Some(33));
assert_eq!(neutral.prompt_token_count, Some(12));
assert_eq!(neutral.total_token_count, Some(45)); }
// Assembly order is thinking, text, tool_use, and a signed thinking block
// serializes with its signature. NOTE(review): this test re-implements the
// assembly loop from `run_turn` inline rather than calling shared code, so it
// can drift from the production logic.
#[test]
fn assistant_turn_preserves_signed_thinking_block_before_tool_use() {
let mut thinking_blocks: BTreeMap<u32, ThinkingAccum> = BTreeMap::new();
thinking_blocks.insert(
0,
ThinkingAccum {
thinking: "Let me reason about this.".into(),
signature: "sig_abc".into(),
},
);
let accumulated_text = "I'll read it.".to_string();
let pending_calls: Vec<(String, String, Value, Option<String>)> =
vec![("toolu_1".into(), "view_file".into(), json!({"path": "a.rs"}), None)];
let mut assistant_blocks: Vec<Block> = Vec::new();
for (_idx, acc) in std::mem::take(&mut thinking_blocks) {
if !acc.thinking.is_empty() && !acc.signature.is_empty() {
assistant_blocks.push(Block::Thinking {
thinking: acc.thinking,
signature: Some(acc.signature),
});
}
}
if !accumulated_text.is_empty() {
assistant_blocks.push(Block::Text {
text: accumulated_text.clone(),
});
}
for (id, name, args, _e) in &pending_calls {
assistant_blocks.push(Block::ToolUse {
id: id.clone(),
name: name.clone(),
input: args.clone(),
});
}
match &assistant_blocks[0] {
Block::Thinking { thinking, signature } => {
assert_eq!(thinking, "Let me reason about this.");
assert_eq!(signature.as_deref(), Some("sig_abc"));
}
other => panic!("expected leading Thinking block, got {other:?}"),
}
assert!(matches!(assistant_blocks[1], Block::Text { .. }));
assert!(matches!(assistant_blocks[2], Block::ToolUse { .. }));
let wire = serde_json::to_value(&assistant_blocks[0]).unwrap();
assert_eq!(wire["type"], "thinking");
assert_eq!(wire["thinking"], "Let me reason about this.");
assert_eq!(wire["signature"], "sig_abc");
}
// A thinking block whose signature is empty is filtered out of the assistant
// message. NOTE(review): this also exercises an inline copy of the filter
// rather than the production code path.
#[test]
fn unsigned_thinking_block_is_dropped() {
let mut thinking_blocks: BTreeMap<u32, ThinkingAccum> = BTreeMap::new();
thinking_blocks.insert(
0,
ThinkingAccum {
thinking: "partial reasoning".into(),
signature: String::new(), },
);
let mut assistant_blocks: Vec<Block> = Vec::new();
for (_idx, acc) in std::mem::take(&mut thinking_blocks) {
if !acc.thinking.is_empty() && !acc.signature.is_empty() {
assistant_blocks.push(Block::Thinking {
thinking: acc.thinking,
signature: Some(acc.signature),
});
}
}
assert!(
assistant_blocks.is_empty(),
"unsigned thinking must not be persisted"
);
}
// Across several reports the last output_tokens value wins; input_tokens from
// the first report is retained.
#[test]
fn usage_takes_latest_cumulative_output_across_multiple_deltas() {
let mut round = WireUsage::default();
accumulate_wire_usage(
&mut round,
&WireUsage {
input_tokens: Some(20),
output_tokens: Some(2),
..Default::default()
},
);
accumulate_wire_usage(
&mut round,
&WireUsage {
output_tokens: Some(10),
..Default::default()
},
);
accumulate_wire_usage(
&mut round,
&WireUsage {
output_tokens: Some(25),
..Default::default()
},
);
assert_eq!(
round.output_tokens,
Some(25),
"cumulative deltas: final reported output is the LAST value (25), not 2+10+25"
);
assert_eq!(round.input_tokens, Some(20));
}
}