use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
#[cfg(unix)]
use libc;
use tokio::io::AsyncBufReadExt as _;
use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use crate::agent::context::ConversationContext;
use crate::agent::guard::{AgentGuard, GuardVerdict, StopReason};
use crate::agent::journal::Journal;
use crate::api::models::{ChatRequest, Message};
use crate::api::provider::{ApiCallError, OpenAiCompatibleProvider};
use crate::api::streaming::{self, StreamEvent};
use crate::config::Config;
use crate::mcp::manager::McpManager;
use crate::tools::registry;
use crate::tools::tool_index::ToolIndex;
use crate::trust::TrustLevel;
use super::AgentEvent;
use super::worker::apply_worker_instruction;
/// Timeout, in seconds, applied to each `chat_stream` call that opens a streaming
/// response (three minutes).
const STREAM_REQUEST_TIMEOUT_SECS: u64 = 3 * 60;
/// How many times a timed-out or retryable failed request is re-issued before giving up.
const STREAM_REQUEST_MAX_RETRIES: u32 = 8;
/// Chooses how long to wait before retrying an LLM request.
///
/// If the caller supplies a `retry_after` hint, that hint wins and is clamped into
/// `[100ms, 60s]`. Otherwise the wait grows exponentially with `attempt`: nominally
/// 1s, 2s, 4s, … up to 32s from attempt 6 onward. Each value gets ±25% random jitter
/// and is kept between 100ms and the 60s ceiling.
pub(super) fn compute_backoff(attempt: u32, retry_after: Option<Duration>) -> Duration {
    use rand::RngExt;
    const BACKOFF_FLOOR: Duration = Duration::from_millis(100);
    const BACKOFF_CEILING: Duration = Duration::from_secs(60);

    match retry_after {
        Some(hint) => hint.clamp(BACKOFF_FLOOR, BACKOFF_CEILING),
        None => {
            // Attempts 0 and 1 both use shift 0; the shift stops growing at 5 (32s).
            let shift = attempt.saturating_sub(1).min(5);
            let nominal_ms = (1000u64 << shift) as f64;
            let jitter: f64 = rand::rng().random_range(-0.25..=0.25);
            let millis = (nominal_ms * (1.0 + jitter)).max(100.0) as u64;
            Duration::from_millis(millis).min(BACKOFF_CEILING)
        }
    }
}
pub(super) fn classify_retry(err: &ApiCallError) -> Option<&'static str> {
if !err.is_retryable() {
return None;
}
Some(if err.is_rate_limit() {
"Rate limited"
} else {
match err {
ApiCallError::Status { status, .. } => match status {
500 => "Upstream 500",
502 => "Bad gateway",
503 => "Service unavailable",
504 => "Gateway timeout",
408 | 425 => "Request timeout",
_ => "Transient error",
},
ApiCallError::Network(_) => "Network error",
ApiCallError::Decode(_) => "Transient error",
}
})
}
/// What the agent loop should do after `apply_guard_verdict` has handled a verdict.
pub(super) enum GuardOutcome {
    /// Keep iterating. Any steering message the verdict called for has already been
    /// pushed to the conversation.
    Continue,
    /// Stop the loop for the given reason. A `GuardStop` event has already been sent.
    Stop(StopReason),
}
/// Assembles the tool schemas offered to the model for this turn.
///
/// Returns `None` when the model cannot call tools or when the resulting list is
/// empty. The list holds:
/// 1. the built-in tools permitted at `trust_level`;
/// 2. the MCP manager's eager tools.
///
/// In deferred mode, the list also gets the tool-search tool and the schemas
/// `tool_index.auto_select(user_msg, 5)` returns. Deferred mode applies when the
/// manager is in deferred mode or the tool index exceeds `MCP_EAGER_THRESHOLD`
/// entries. Selected schemas whose `function.name` is already present are skipped.
pub(super) fn build_tool_defs(
    trust_level: TrustLevel,
    has_skills: bool,
    has_rag: bool,
    model_supports_tools: bool,
    mcp_manager: &Arc<McpManager>,
    tool_index: &Arc<ToolIndex>,
    user_msg: &str,
) -> Option<Arc<Vec<serde_json::Value>>> {
    if !model_supports_tools {
        return None;
    }

    let mut tool_list = registry::trusted_tool_definitions(trust_level, has_skills, has_rag);
    tool_list.extend(mcp_manager.eager_tool_definitions());

    let deferred_mode = mcp_manager.is_deferred_mode()
        || tool_index.entry_count() > crate::mcp::manager::MCP_EAGER_THRESHOLD;
    if deferred_mode {
        tool_list.push(crate::tools::tool_search::definition());

        let selected = tool_index.auto_select(user_msg, 5);
        let selected_count = selected.len();
        for candidate in selected {
            let duplicate = {
                let candidate_name = candidate["function"]["name"].as_str().unwrap_or("");
                tool_list
                    .iter()
                    .any(|existing| existing["function"]["name"].as_str() == Some(candidate_name))
            };
            if !duplicate {
                tool_list.push(candidate);
            }
        }
        if selected_count > 0 {
            tracing::info!(
                tools = selected_count,
                "Auto-selected deferred tools from user prompt"
            );
        }
    }

    (!tool_list.is_empty()).then(|| Arc::new(tool_list))
}
/// Builds the streaming `ChatRequest` for the next model call.
///
/// For models whose profile reports `supports_reasoning`:
/// - temperature is omitted;
/// - a configured thinking budget takes precedence over the configured reasoning
///   effort, and only one of the two is sent.
///
/// For other models, `config.temperature` is used and neither reasoning knob is sent.
/// `tool_choice` is `"auto"` exactly when tool definitions are supplied.
pub(super) fn build_chat_request(
    client: &OpenAiCompatibleProvider,
    config: &Config,
    context: &mut ConversationContext,
    tool_defs: Option<Arc<Vec<serde_json::Value>>>,
) -> ChatRequest {
    let reasoning_model =
        crate::api::model_profile::profile_for(&client.model).supports_reasoning;

    let temperature = if reasoning_model { None } else { config.temperature };
    let (thinking_budget_tokens, reasoning_effort) =
        match (reasoning_model, config.thinking_budget_tokens) {
            (false, _) => (None, None),
            (true, Some(budget)) => (Some(budget), None),
            (true, None) => (None, config.reasoning_effort.clone()),
        };
    let tool_choice = tool_defs.is_some().then(|| "auto".to_string());

    ChatRequest {
        model: client.model.clone(),
        messages: context.build_messages(),
        tool_choice,
        tools: tool_defs,
        max_tokens: client.max_tokens,
        stream: true,
        temperature,
        thinking_budget_tokens,
        reasoning_effort,
    }
}
/// Applies pending swarm-worker instructions before the next agent iteration.
///
/// While `*paused` is true, this waits for a `Resume`, a closed instruction channel,
/// or cancellation. Other instructions received meanwhile are handed to
/// `apply_worker_instruction`. A `Resume`:
/// - clears `*paused`;
/// - sends `SwarmWorkerResumed`;
/// - pushes a `[SYSTEM]` user message telling the model to continue.
///
/// It then drains already-queued instructions without waiting. A `Pause` sets
/// `*paused`, so the next call blocks, and sends `SwarmWorkerPaused`. Anything else
/// goes to `apply_worker_instruction`.
///
/// Returns `Some(StopReason::Cancelled)` if cancellation fired or the channel closed
/// while paused; in that case a `GuardStop` event is sent and `*paused` is cleared
/// first. Otherwise returns `None`.
pub(super) async fn drain_worker_instructions(
    rx: &mut mpsc::UnboundedReceiver<crate::agent::swarm::knowledge::WorkerInstruction>,
    paused: &mut bool,
    context: &mut ConversationContext,
    cancel: &CancellationToken,
    event_tx: &mpsc::UnboundedSender<AgentEvent>,
) -> Option<StopReason> {
    use crate::agent::swarm::knowledge::WorkerInstruction;
    // Phase 1: block for as long as the worker is paused.
    while *paused {
        tokio::select! {
            _ = cancel.cancelled() => {
                let _ = event_tx.send(AgentEvent::GuardStop("Cancelled by user.".to_string()));
                *paused = false;
                return Some(StopReason::Cancelled);
            }
            instr = rx.recv() => {
                match instr {
                    Some(WorkerInstruction::Resume) => {
                        *paused = false;
                        // NOTE(review): agent_id is left empty here; presumably a
                        // downstream consumer fills in the worker id — confirm.
                        let _ = event_tx.send(AgentEvent::SwarmWorkerResumed {
                            agent_id: String::new(),
                        });
                        // Tell the model explicitly that it may carry on.
                        context.push(Message {
                            role: "user".to_string(),
                            content: Some(crate::api::Content::text(
                                "[SYSTEM] You have been resumed. Continue your work.",
                            )),
                            reasoning_content: None,
                            tool_calls: None,
                            tool_call_id: None,
                        });
                    }
                    // A repeated `Pause` while already paused also lands here.
                    Some(other) => apply_worker_instruction(other, context),
                    // All senders are gone, so no `Resume` can ever arrive.
                    None => {
                        let _ = event_tx.send(AgentEvent::GuardStop(
                            "Worker instruction channel closed.".to_string(),
                        ));
                        *paused = false;
                        return Some(StopReason::Cancelled);
                    }
                }
            }
        }
    }
    // Phase 2: non-blocking drain. `try_recv` fails both when the queue is empty
    // and when the channel is closed; either way the drain simply ends.
    while let Ok(instr) = rx.try_recv() {
        match instr {
            WorkerInstruction::Pause => {
                *paused = true;
                let _ = event_tx.send(AgentEvent::SwarmWorkerPaused {
                    agent_id: String::new(),
                });
            }
            // NOTE(review): a `Resume` queued behind a `Pause` in this same drain is
            // forwarded to `apply_worker_instruction` and does not clear `*paused`;
            // confirm that helper handles `Resume` as intended.
            other => apply_worker_instruction(other, context),
        }
    }
    None
}
pub(super) fn apply_guard_verdict(
verdict: GuardVerdict,
guard: &mut AgentGuard,
context: &mut ConversationContext,
event_tx: &mpsc::UnboundedSender<AgentEvent>,
) -> GuardOutcome {
match verdict {
GuardVerdict::Proceed => GuardOutcome::Continue,
GuardVerdict::ApproachingTimeout { remaining } => {
context.push(Message {
role: "user".to_string(),
content: Some(crate::api::Content::text(format!(
"[SYSTEM] You have approximately {}s remaining before timeout. \
Wrap up your current work: summarize what you've accomplished \
and what remains to be done. Focus on completing the most \
critical remaining step if possible.",
remaining.as_secs()
))),
reasoning_content: None,
tool_calls: None,
tool_call_id: None,
});
GuardOutcome::Continue
}
GuardVerdict::IterationLimit { count } => {
let externally_extended = guard.budget().map(|b| b.get() > count).unwrap_or(false);
if externally_extended {
guard.reset();
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!("Iteration budget extended — continuing from {count}"),
});
GuardOutcome::Continue
} else {
let msg = format!("Iteration limit reached ({count}). Stopping.");
let _ = event_tx.send(AgentEvent::GuardStop(msg));
GuardOutcome::Stop(StopReason::IterationLimit { count })
}
}
GuardVerdict::CircuitBreakerRecovery { failures } => {
let _ = event_tx.send(AgentEvent::PhaseChange {
label: format!(
"Recovering from {failures} consecutive tool failures — re-diagnosing..."
),
});
context.push(Message {
role: "user".to_string(),
content: Some(crate::api::Content::text(format!(
"[SYSTEM] You've had {failures} consecutive tool failures. \
Stop and diagnose what's going wrong before continuing. \
Re-read the relevant files, examine the actual error messages, \
and try a completely different approach."
))),
reasoning_content: None,
tool_calls: None,
tool_call_id: None,
});
GuardOutcome::Continue
}
GuardVerdict::ApproachingIterationLimit { remaining } => {
context.push(Message {
role: "user".to_string(),
content: Some(crate::api::Content::text(format!(
"[SYSTEM] You have approximately {remaining} iterations remaining. \
Prioritize completing the most critical remaining steps. \
Skip non-essential work and focus on finishing the core task."
))),
reasoning_content: None,
tool_calls: None,
tool_call_id: None,
});
let _ = event_tx.send(AgentEvent::SwarmWorkerApproaching {
agent_id: String::new(),
task_preview: String::new(),
remaining,
});
GuardOutcome::Continue
}
GuardVerdict::CircuitBreaker { failures } => {
let msg = format!("Circuit breaker: {failures} consecutive tool failures.");
let _ = event_tx.send(AgentEvent::GuardStop(msg));
GuardOutcome::Stop(StopReason::CircuitBreaker { failures })
}
GuardVerdict::TaskTimeout { elapsed } => {
let msg = format!("Task timeout after {}s.", elapsed.as_secs());
let _ = event_tx.send(AgentEvent::GuardStop(msg));
GuardOutcome::Stop(StopReason::TaskTimeout {
elapsed_secs: elapsed.as_secs(),
})
}
}
}
pub(super) fn inject_discovered_schemas(
result_str: &str,
tool_defs: &mut Option<Arc<Vec<serde_json::Value>>>,
) {
let Ok(parsed) = serde_json::from_str::<serde_json::Value>(result_str) else {
return;
};
let Some(tools_arr) = parsed.get("tools").and_then(|t| t.as_array()) else {
return;
};
let Some(defs_arc) = tool_defs.as_mut() else {
return;
};
let defs = Arc::make_mut(defs_arc);
for tool_entry in tools_arr {
if let Some(schema) = tool_entry.get("schema") {
let tool_name = schema["function"]["name"].as_str().unwrap_or("");
if !defs
.iter()
.any(|d| d["function"]["name"].as_str() == Some(tool_name))
{
defs.push(schema.clone());
tracing::debug!(tool = tool_name, "Injected deferred tool schema");
}
}
}
}
/// Tears down per-task resources and then reports completion.
///
/// Steps, in order:
/// 1. Shut down the MCP manager, only when `owns_mcp` is true.
/// 2. Record completion in the journal.
/// 3. Send `AgentEvent::Done` with the conversation and optional stop reason.
///
/// Because `Done` is sent last, it is only observed after cleanup has finished.
pub(super) async fn finish_with_done(
    context: ConversationContext,
    journal: &Journal,
    mcp_manager: &Arc<McpManager>,
    owns_mcp: bool,
    event_tx: &mpsc::UnboundedSender<AgentEvent>,
    stop_reason: Option<StopReason>,
) {
    // A manager this task does not own is left running.
    if owns_mcp {
        mcp_manager.shutdown_all().await;
    }
    journal.write_completion().await;

    let done = AgentEvent::Done {
        context,
        stop_reason,
    };
    // A send error (no receiver left) is deliberately ignored.
    let _ = event_tx.send(done);
}
/// Everything collected from one streamed model turn. This may be a partial turn
/// accepted after the stream went idle.
pub(super) struct StreamIterationOutput {
    /// Concatenated assistant text tokens.
    pub full_content: String,
    /// Concatenated reasoning text, if the stream carried any.
    pub reasoning_content: String,
    /// Completed tool calls as `(id, name, arguments)`. `arguments` is the raw
    /// accumulated argument text (presumably JSON; it is not validated here).
    pub tool_calls: Vec<(String, String, String)>,
    /// Prompt tokens reported by the stream's `Done` event (0 if none arrived).
    pub turn_prompt_tokens: u32,
    /// Completion tokens reported by the stream's `Done` event (0 if none arrived).
    pub turn_completion_tokens: u32,
    /// Cached prompt tokens reported by the stream's `Done` event (0 if none arrived).
    pub turn_cached_tokens: u32,
    /// Wall-clock milliseconds from the first request attempt until the output was
    /// assembled, including any retries.
    pub api_elapsed_ms: u64,
}
/// Outcome of `run_stream_iteration`.
pub(super) enum StreamIterationResult {
    /// The turn produced usable output.
    Ok(StreamIterationOutput),
    /// The turn was aborted by cancellation or an unrecoverable error. Before
    /// returning this, `run_stream_iteration` has already sent a `GuardStop` or
    /// `Error` event; the caller presumably ends the agent loop.
    Exit,
}
/// Opens a streaming chat response, retrying transient failures.
///
/// Each attempt is bounded by `STREAM_REQUEST_TIMEOUT_SECS`. Two kinds of failure
/// are retried, up to `STREAM_REQUEST_MAX_RETRIES` times:
/// - timeouts;
/// - errors that `classify_retry` considers retryable.
///
/// Delays come from `compute_backoff`, which honours `ApiCallError::retry_after`
/// when present. Cancellation is observed both while a request is in flight and
/// while sleeping between attempts. Non-retryable and exhausted errors are also
/// reported to telemetry.
///
/// Returns `None` after sending a `GuardStop` event (cancelled) or an `Error` event.
async fn call_chat_stream_with_retry(
    client: &OpenAiCompatibleProvider,
    request: &ChatRequest,
    config: &Config,
    cancel: &CancellationToken,
    event_tx: &mpsc::UnboundedSender<AgentEvent>,
) -> Option<reqwest::Response> {
    let max_retries = STREAM_REQUEST_MAX_RETRIES;
    let mut attempt = 0u32;
    loop {
        let attempt_result = tokio::select! {
            _ = cancel.cancelled() => {
                let _ = event_tx.send(AgentEvent::GuardStop("Cancelled by user.".to_string()));
                return None;
            }
            r = timeout(
                Duration::from_secs(STREAM_REQUEST_TIMEOUT_SECS),
                client.chat_stream(request),
            ) => r,
        };
        let wait = match attempt_result {
            Ok(Ok(resp)) => return Some(resp),
            // The request did not complete in time: always retried.
            Err(_elapsed) => {
                attempt += 1;
                if attempt > max_retries {
                    let _ = event_tx.send(AgentEvent::Error(
                        "LLM request timed out after all retries".to_string(),
                    ));
                    return None;
                }
                let wait = compute_backoff(attempt, None);
                emit_retry_event(event_tx, attempt, max_retries, "Request timed out", wait);
                wait
            }
            Ok(Err(api_err)) => {
                let Some(reason) = classify_retry(&api_err) else {
                    crate::telemetry::error(
                        "api_error",
                        serde_json::json!({
                            "error": api_err.to_string(),
                            "transient": false,
                            "model": &config.model,
                        }),
                    );
                    let _ = event_tx.send(AgentEvent::Error(format!("API error: {api_err}")));
                    return None;
                };
                attempt += 1;
                if attempt > max_retries {
                    crate::telemetry::error(
                        "api_error_exhausted",
                        serde_json::json!({
                            "error": api_err.to_string(),
                            "retries": max_retries,
                            "model": &config.model,
                        }),
                    );
                    let _ = event_tx.send(AgentEvent::Error(format!(
                        "API error after {max_retries} retries: {api_err}"
                    )));
                    return None;
                }
                let wait = compute_backoff(attempt, api_err.retry_after());
                emit_retry_event(event_tx, attempt, max_retries, reason, wait);
                wait
            }
        };
        // Back off, but let cancellation cut the wait short: `compute_backoff` can
        // ask for up to 60s.
        tokio::select! {
            _ = cancel.cancelled() => {
                let _ = event_tx.send(AgentEvent::GuardStop("Cancelled by user.".to_string()));
                return None;
            }
            _ = tokio::time::sleep(wait) => {}
        }
    }
}
/// Logs an upcoming retry at `warn` level and tells the UI which attempt is coming
/// and when.
///
/// The message reads like `"Rate limited — retrying in 1.5s (attempt 1/8)..."`.
fn emit_retry_event(
    event_tx: &mpsc::UnboundedSender<AgentEvent>,
    attempt: u32,
    max: u32,
    reason: &str,
    wait: Duration,
) {
    let message = format!(
        "{reason} — retrying in {:.1}s (attempt {attempt}/{max})...",
        wait.as_secs_f32()
    );
    tracing::warn!("{message}");
    let _ = event_tx.send(AgentEvent::StreamRetry {
        attempt,
        max,
        message,
    });
}
/// Runs one streamed model turn and collects its text, reasoning, tool calls and usage.
///
/// The initial request goes through `call_chat_stream_with_retry`. While streaming,
/// text tokens are forwarded as `AgentEvent::Token`.
///
/// If no stream event arrives for `config.stream_idle_timeout_secs`, the stream is
/// abandoned:
/// - Partial text longer than 200 bytes, with no tool call in flight, is accepted
///   as-is.
/// - Otherwise everything collected so far is discarded and the request is
///   re-issued after a capped exponential pause.
/// - After `config.stream_max_retries` such retries, any non-empty partial text
///   without tool calls in flight is accepted; otherwise the turn fails.
///
/// A stream error, or a stream that closes without `Done`, ends the turn normally.
/// A stream error also drops any partially received tool calls.
///
/// Tool calls are returned in ascending stream-index order.
///
/// Returns `Exit` after sending a `GuardStop` event (cancelled) or an `Error` event
/// when the turn cannot be completed.
pub(super) async fn run_stream_iteration(
    client: &OpenAiCompatibleProvider,
    request: &ChatRequest,
    config: &Config,
    cancel: &CancellationToken,
    event_tx: &mpsc::UnboundedSender<AgentEvent>,
) -> StreamIterationResult {
    // Idle partial text longer than this many bytes is good enough to keep.
    const PARTIAL_TEXT_ACCEPT_BYTES: usize = 200;
    // Ceiling for the pause between idle-stream retries.
    const IDLE_RETRY_BACKOFF_CAP_SECS: u64 = 30;

    let api_start = std::time::Instant::now();
    let Some(initial_response) =
        call_chat_stream_with_retry(client, request, config, cancel, event_tx).await
    else {
        return StreamIterationResult::Exit;
    };
    let stream_idle_secs = config.stream_idle_timeout_secs;
    let stream_max_retries = config.stream_max_retries;
    let mut full_content = String::with_capacity(2048);
    let mut reasoning_content = String::new();
    // Tool calls arrive as a start event `(id, name)` plus argument fragments, both
    // keyed by the tool-call index.
    let mut current_tool_args: HashMap<u32, String> = HashMap::new();
    let mut current_tool_meta: HashMap<u32, (String, String)> = HashMap::new();
    let mut turn_prompt_tokens: u32 = 0;
    let mut turn_completion_tokens: u32 = 0;
    let mut turn_cached_tokens: u32 = 0;
    let mut stream_attempt = 0u32;
    let mut next_response: Option<reqwest::Response> = Some(initial_response);
    'stream_retry: loop {
        let current_response = next_response
            .take()
            .expect("response always set before loop");
        let (stream_tx, mut stream_rx) = mpsc::unbounded_channel();
        let stream_handle = tokio::spawn(streaming::process_stream(current_response, stream_tx));
        let idle_dur = Duration::from_secs(stream_idle_secs);
        // Plain `break` means "stream went idle"; `break 'stream_retry` ends the turn.
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    stream_handle.abort();
                    let _ = event_tx.send(AgentEvent::GuardStop("Cancelled by user.".to_string()));
                    return StreamIterationResult::Exit;
                }
                event = tokio::time::timeout(idle_dur, stream_rx.recv()) => {
                    match event {
                        Err(_) => {
                            stream_handle.abort();
                            break;
                        }
                        Ok(Some(StreamEvent::Token(token))) => {
                            full_content.push_str(&token);
                            let _ = event_tx.send(AgentEvent::Token(token));
                        }
                        Ok(Some(StreamEvent::Reasoning(text))) => {
                            reasoning_content.push_str(&text);
                        }
                        Ok(Some(StreamEvent::ToolCallStart { index, id, name })) => {
                            current_tool_meta.insert(index, (id, name));
                            current_tool_args.entry(index).or_default();
                        }
                        Ok(Some(StreamEvent::ToolCallArgs { index, args_chunk })) => {
                            current_tool_args.entry(index).or_default().push_str(&args_chunk);
                        }
                        Ok(Some(StreamEvent::Done { prompt_tokens, completion_tokens, cached_tokens })) => {
                            turn_prompt_tokens = prompt_tokens;
                            turn_completion_tokens = completion_tokens;
                            turn_cached_tokens = cached_tokens;
                            break 'stream_retry;
                        }
                        Ok(Some(StreamEvent::Error(msg))) => {
                            tracing::warn!(error = %msg, "Stream error during token processing");
                            current_tool_meta.clear();
                            current_tool_args.clear();
                            break 'stream_retry;
                        }
                        Ok(None) => break 'stream_retry,
                    }
                }
            }
        }
        // The stream went idle: decide whether to keep the partial turn or retry.
        let elapsed = api_start.elapsed().as_secs();
        let _ = event_tx.send(AgentEvent::StreamWaiting {
            elapsed_secs: elapsed,
        });
        let has_text = full_content.len() > PARTIAL_TEXT_ACCEPT_BYTES;
        let has_tool_calls_in_progress = !current_tool_meta.is_empty();
        if has_text && !has_tool_calls_in_progress {
            tracing::info!(
                content_len = full_content.len(),
                "Stream idle but accepting partial text response (no tool calls in progress)"
            );
            break 'stream_retry;
        }
        stream_attempt += 1;
        if stream_attempt > stream_max_retries {
            if !full_content.is_empty() && !has_tool_calls_in_progress {
                tracing::warn!("Stream retries exhausted but partial text available — accepting");
                break 'stream_retry;
            }
            let _ = event_tx.send(AgentEvent::Error(format!(
                "Stream unresponsive after {}s × {} retries. Server may be overloaded.",
                stream_idle_secs, stream_max_retries
            )));
            return StreamIterationResult::Exit;
        }
        let _ = event_tx.send(AgentEvent::StreamRetry {
            attempt: stream_attempt,
            max: stream_max_retries,
            message: format!("Stream idle — retrying ({stream_attempt}/{stream_max_retries})..."),
        });
        // The retry restarts the turn from scratch.
        full_content.clear();
        reasoning_content.clear();
        current_tool_args.clear();
        current_tool_meta.clear();
        // `saturating_pow` keeps a large configured retry count from overflowing 2^n.
        let backoff = Duration::from_secs(
            2u64.saturating_pow(stream_attempt)
                .min(IDLE_RETRY_BACKOFF_CAP_SECS),
        );
        tokio::select! {
            _ = cancel.cancelled() => {
                let _ = event_tx.send(AgentEvent::GuardStop("Cancelled by user.".to_string()));
                return StreamIterationResult::Exit;
            }
            _ = tokio::time::sleep(backoff) => {}
        }
        let retry_result = tokio::select! {
            _ = cancel.cancelled() => {
                let _ = event_tx.send(AgentEvent::GuardStop("Cancelled by user.".to_string()));
                return StreamIterationResult::Exit;
            }
            r = timeout(Duration::from_secs(STREAM_REQUEST_TIMEOUT_SECS), client.chat_stream(request)) => r,
        };
        match retry_result {
            Ok(Ok(resp)) => {
                next_response = Some(resp);
            }
            Ok(Err(e)) => {
                let _ = event_tx.send(AgentEvent::Error(format!("Retry failed: {e}")));
                return StreamIterationResult::Exit;
            }
            Err(_) => {
                let _ = event_tx.send(AgentEvent::Error("Retry connection timed out.".to_string()));
                return StreamIterationResult::Exit;
            }
        }
    }
    let tool_calls = assemble_tool_calls(current_tool_args, current_tool_meta);
    StreamIterationResult::Ok(StreamIterationOutput {
        full_content,
        reasoning_content,
        tool_calls,
        turn_prompt_tokens,
        turn_completion_tokens,
        turn_cached_tokens,
        api_elapsed_ms: api_start.elapsed().as_millis() as u64,
    })
}

/// Pairs each accumulated argument buffer with the `(id, name)` from its start event.
///
/// Returns `(id, name, arguments)` tuples sorted by tool-call index. Argument
/// buffers with no start event are dropped.
fn assemble_tool_calls(
    args_by_index: HashMap<u32, String>,
    mut meta_by_index: HashMap<u32, (String, String)>,
) -> Vec<(String, String, String)> {
    let mut indexed: Vec<(u32, (String, String, String))> = args_by_index
        .into_iter()
        .filter_map(|(index, arguments)| {
            let (id, name) = meta_by_index.remove(&index)?;
            Some((index, (id, name, arguments)))
        })
        .collect();
    // HashMap iteration order is arbitrary. The stream index reflects the order in
    // which the model emitted its calls, so restore that order.
    indexed.sort_unstable_by_key(|(index, _)| *index);
    indexed.into_iter().map(|(_, call)| call).collect()
}
#[allow(clippy::too_many_arguments)]
pub(super) async fn run_cli_fast_path(
cli_binary: String,
cli_args: Vec<String>,
cli_yolo_args: Vec<String>,
cli_model_env: Option<String>,
cli_skip_model: bool,
cli_yolo_env: Vec<String>,
yolo: bool,
user_msg: String,
working_dir: String,
client: OpenAiCompatibleProvider,
cancel: CancellationToken,
event_tx: mpsc::UnboundedSender<AgentEvent>,
mut context: ConversationContext,
journal: Journal,
mcp_manager: Arc<McpManager>,
owns_mcp: bool,
task_timeout_secs: u64,
max_iterations: u32,
cli_max_turns_flag: Option<String>,
) {
const MAX_OUTPUT_BYTES: usize = 4 * 1024 * 1024;
let model_flag = if cli_skip_model {
crate::api::cli_provider::ModelFlag::Skip
} else if let Some(env_var) = cli_model_env {
crate::api::cli_provider::ModelFlag::Env(env_var)
} else {
crate::api::cli_provider::ModelFlag::Flag("--model".into())
};
let yolo_env: Vec<(String, String)> = cli_yolo_env
.iter()
.filter_map(|kv| {
let mut parts = kv.splitn(2, '=');
Some((parts.next()?.to_string(), parts.next()?.to_string()))
})
.collect();
let cli_provider = crate::api::cli_provider::CliProvider {
binary: cli_binary.clone(),
args: cli_args.clone(),
yolo_args: cli_yolo_args.clone(),
yolo_env,
model_flag,
max_turns_flag: cli_max_turns_flag.clone(),
};
let _ = event_tx.send(AgentEvent::Token(format!(
"⟩ Calling {} {}…\n",
cli_binary,
cli_args.join(" "),
)));
let max_iters_for_cli = cli_max_turns_flag.is_some().then_some(max_iterations);
let output_format = crate::api::cli_output::CliOutputFormat::detect(&cli_args);
let mut final_result: Option<String> = None;
let mut child = match cli_provider.spawn_child(
&user_msg,
&working_dir,
Some(&client.model),
yolo,
max_iters_for_cli,
) {
Ok(c) => c,
Err(e) => {
let _ = event_tx.send(AgentEvent::Error(format!("Failed to spawn CLI: {e}")));
let _ = event_tx.send(AgentEvent::Done {
context,
stop_reason: None,
});
if owns_mcp {
mcp_manager.shutdown_all().await;
}
return;
}
};
let stdout = child.stdout.take().expect("stdout is piped");
let mut reader = tokio::io::BufReader::new(stdout).lines();
let mut full_response = String::new();
let mut output_bytes: usize = 0;
let task_deadline =
tokio::time::Instant::now() + std::time::Duration::from_secs(task_timeout_secs);
let guard_stop_reason = loop {
tokio::select! {
_ = cancel.cancelled() => {
kill_child_gracefully(&mut child).await;
break Some((
"Cancelled by user.".to_string(),
StopReason::Cancelled,
));
}
_ = tokio::time::sleep_until(task_deadline) => {
kill_child_gracefully(&mut child).await;
break Some((
format!("CLI task exceeded time limit ({task_timeout_secs}s)."),
StopReason::TaskTimeout { elapsed_secs: task_timeout_secs },
));
}
line_result = reader.next_line() => {
match line_result {
Ok(Some(line)) => {
output_bytes += line.len();
if output_bytes > MAX_OUTPUT_BYTES {
kill_child_gracefully(&mut child).await;
break Some((
"CLI output exceeded 4 MiB limit.".to_string(),
StopReason::IterationLimit { count: 0 },
));
}
use crate::api::cli_output::{parse_cli_line, ParsedCliLine};
match parse_cli_line(&line, output_format) {
ParsedCliLine::Text(text) => {
full_response.push_str(&text);
full_response.push('\n');
let _ = event_tx.send(AgentEvent::Token(format!("{text}\n")));
}
ParsedCliLine::ToolCall { name } => {
let _ = event_tx
.send(AgentEvent::Token(format!(" \u{27e9} {name}\u{2026}\n")));
}
ParsedCliLine::FinalResult(result) => {
final_result = Some(result);
}
ParsedCliLine::Skip => {}
}
}
Ok(None) => break None, Err(e) => {
let _ = event_tx.send(AgentEvent::Error(format!("CLI read error: {e}")));
break None;
}
}
}
}
};
if let Some((msg, stop_reason)) = guard_stop_reason {
let _ = event_tx.send(AgentEvent::GuardStop(msg));
let _ = event_tx.send(AgentEvent::Done {
context,
stop_reason: Some(stop_reason),
});
if owns_mcp {
mcp_manager.shutdown_all().await;
}
journal.write_completion().await;
return;
}
match child.wait().await {
Ok(status) if status.success() => {
let output = final_result.unwrap_or_else(|| full_response.trim().to_string());
let _ = event_tx.send(AgentEvent::Response(output.clone()));
context.push(Message {
role: "assistant".to_string(),
content: Some(crate::api::Content::text(output)),
reasoning_content: None,
tool_calls: None,
tool_call_id: None,
});
}
Ok(status) => {
let _ = event_tx.send(AgentEvent::Error(format!(
"CLI `{}` exited with {}",
cli_binary, status
)));
}
Err(e) => {
let _ = event_tx.send(AgentEvent::Error(format!("CLI wait error: {e}")));
}
}
if owns_mcp {
mcp_manager.shutdown_all().await;
}
journal.write_completion().await;
let _ = event_tx.send(AgentEvent::Done {
context,
stop_reason: None,
});
}
/// Stops a child process, giving it a chance to exit cleanly first.
///
/// On Unix the child is sent `SIGTERM` and given up to one second to exit before a
/// hard kill is issued. On other platforms it is hard-killed immediately. All
/// errors are ignored, since the child may already have exited.
async fn kill_child_gracefully(child: &mut tokio::process::Child) {
    #[cfg(unix)]
    {
        // `child.id()` is `None` once the child has been reaped. A pid that doesn't
        // fit `pid_t` is skipped rather than cast: a wrapped negative value would
        // make `kill` signal a whole process group.
        if let Some(pid) = child.id().and_then(|id| libc::pid_t::try_from(id).ok()) {
            // SAFETY: `kill` has no memory-safety preconditions. `pid` is the
            // non-negative id of our own not-yet-reaped child, so no unrelated
            // process is signalled.
            unsafe {
                libc::kill(pid, libc::SIGTERM);
            }
        }
        let _ = tokio::time::timeout(std::time::Duration::from_secs(1), child.wait()).await;
    }
    let _ = child.start_kill();
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a bare HTTP-status error with no retry hint and an empty body.
    fn status(code: u16) -> ApiCallError {
        ApiCallError::Status {
            status: code,
            retry_after: None,
            body: String::new(),
        }
    }

    #[test]
    fn classify_retry_recognizes_500() {
        let reason = classify_retry(&status(500));
        assert_eq!(reason, Some("Upstream 500"));
    }

    #[test]
    fn classify_retry_distinguishes_rate_limit() {
        let reason = classify_retry(&status(429));
        assert_eq!(reason, Some("Rate limited"));
    }

    #[test]
    fn classify_retry_returns_none_for_permanent() {
        for code in [400u16, 401, 404] {
            assert_eq!(classify_retry(&status(code)), None, "status {code}");
        }
    }

    #[test]
    fn classify_retry_handles_5xx_family() {
        let expected = [
            (502u16, "Bad gateway"),
            (503, "Service unavailable"),
            (504, "Gateway timeout"),
        ];
        for (code, label) in expected {
            assert_eq!(classify_retry(&status(code)), Some(label), "status {code}");
        }
    }

    #[test]
    fn compute_backoff_grows_exponentially() {
        // Nominal wait doubles from 1s up to 32s. The bounds allow ±30% around it,
        // slightly wider than the ±25% jitter.
        let bounds: [(u32, f64, f64); 6] = [
            (1, 0.7, 1.3),
            (2, 1.4, 2.6),
            (3, 2.8, 5.2),
            (4, 5.6, 10.4),
            (5, 11.2, 20.8),
            (6, 22.4, 41.6),
        ];
        for (attempt, lo, hi) in bounds {
            let secs = compute_backoff(attempt, None).as_secs_f64();
            assert!(
                (lo..=hi).contains(&secs),
                "attempt={attempt}: backoff={secs}s outside [{lo}, {hi}]"
            );
        }
    }

    #[test]
    fn compute_backoff_caps_at_ceiling() {
        for attempt in [7u32, 10, 20, 100] {
            let secs = compute_backoff(attempt, None).as_secs_f64();
            assert!(
                secs <= 60.0,
                "attempt={attempt}: backoff={secs}s exceeds 60s ceiling"
            );
            assert!(
                secs >= 22.0,
                "attempt={attempt}: backoff={secs}s suspiciously small"
            );
        }
    }

    #[test]
    fn compute_backoff_honors_retry_after() {
        let hint = Duration::from_secs(7);
        for attempt in [1u32, 5] {
            assert_eq!(compute_backoff(attempt, Some(hint)), hint);
        }
    }

    #[test]
    fn compute_backoff_floors_sub_100ms_retry_after() {
        let floor = Duration::from_millis(100);
        for hint in [Duration::ZERO, Duration::from_millis(50), floor] {
            assert_eq!(compute_backoff(1, Some(hint)), floor);
        }
    }

    #[test]
    fn compute_backoff_clamps_extreme_retry_after() {
        let waited = compute_backoff(1, Some(Duration::from_secs(3600)));
        assert!(waited <= Duration::from_secs(60));
    }
}