use super::types::ExtractedContent;
use crate::authority::EventAuthority;
use crate::events::AgentEvent;
use crate::hooks::AgentHooks;
use crate::llm::{ChatResponse, Content, ContentBlock, Message, Role};
use crate::stores::EventStore;
use crate::types::{AgentError, PendingToolCallInfo, RetryConfig, ThreadId, ToolResult};
use futures::FutureExt;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Duration;
pub(super) fn panic_payload_message(payload: &(dyn std::any::Any + Send)) -> String {
if let Some(s) = payload.downcast_ref::<&'static str>() {
return (*s).to_string();
}
if let Some(s) = payload.downcast_ref::<String>() {
return s.clone();
}
"non-string panic payload".to_string()
}
pub(super) async fn catch_tool_panic<Fut>(execute: Fut) -> Result<ToolResult, AgentError>
where
Fut: std::future::Future<Output = Result<ToolResult, AgentError>>,
{
match AssertUnwindSafe(execute).catch_unwind().await {
Ok(result) => result,
Err(payload) => {
let message = panic_payload_message(payload.as_ref());
log::error!("tool execution panicked: {message}");
Ok(ToolResult::error(format!("Tool panicked: {message}")))
}
}
}
pub(super) fn turns_to_u32(turns: usize) -> u32 {
u32::try_from(turns).unwrap_or(u32::MAX)
}
pub(super) fn millis_to_u64(millis: u128) -> u64 {
u64::try_from(millis).unwrap_or(u64::MAX)
}
pub(super) fn calculate_backoff_delay(attempt: u32, config: &RetryConfig) -> Duration {
let base_delay = config
.base_delay_ms
.saturating_mul(1u64 << (attempt.saturating_sub(1)));
let max_jitter = config.base_delay_ms.min(1000);
let jitter = if max_jitter > 0 {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
let h = RandomState::new().build_hasher().finish();
h % max_jitter
} else {
0
};
let delay_ms = base_delay.saturating_add(jitter).min(config.max_delay_ms);
Duration::from_millis(delay_ms)
}
pub(super) fn pending_tool_index(
pending_tool_calls: &[PendingToolCallInfo],
tool_id: &str,
) -> Result<usize, AgentError> {
pending_tool_calls
.iter()
.position(|p| p.id == tool_id)
.ok_or_else(|| AgentError::new(format!("Pending tool ID not found: {tool_id}"), false))
}
pub(super) fn extract_content(response: &ChatResponse) -> ExtractedContent {
let mut thinking_parts = Vec::new();
let mut text_parts = Vec::new();
let mut tool_uses = Vec::new();
for block in &response.content {
match block {
ContentBlock::Text { text } => {
text_parts.push(text.clone());
}
ContentBlock::Thinking { thinking, .. } => {
thinking_parts.push(thinking.clone());
}
ContentBlock::ToolUse {
id, name, input, ..
} => {
let input = if input.is_null() {
serde_json::json!({})
} else {
input.clone()
};
tool_uses.push((id.clone(), name.clone(), input.clone()));
}
_ => {}
}
}
let thinking = if thinking_parts.is_empty() {
None
} else {
Some(thinking_parts.join("\n"))
};
let text = if text_parts.is_empty() {
None
} else {
Some(text_parts.join("\n"))
};
(thinking, text, tool_uses)
}
pub(super) async fn send_event<H>(
event_store: &Arc<dyn EventStore>,
thread_id: &ThreadId,
turn: usize,
hooks: &Arc<H>,
authority: &Arc<dyn EventAuthority>,
event: AgentEvent,
) -> Result<(), AgentError>
where
H: AgentHooks,
{
hooks.on_event(&event).await;
#[cfg(feature = "otel")]
crate::observability::trace_io::observe_current(&event);
#[cfg(feature = "otel")]
if let AgentEvent::Error { message, .. } = &event {
crate::observability::trace_io::observe_current_error(message);
}
let envelope = authority.wrap(event);
event_store
.append(thread_id, turn, envelope)
.await
.map_err(|error| AgentError::new(format!("Failed to append event: {error}"), false))
}
pub(super) async fn wrap_and_send(
event_store: &Arc<dyn EventStore>,
thread_id: &ThreadId,
turn: usize,
event: AgentEvent,
authority: &Arc<dyn EventAuthority>,
) -> Result<(), AgentError> {
#[cfg(feature = "otel")]
crate::observability::trace_io::observe_current(&event);
let envelope = authority.wrap(event);
event_store
.append(thread_id, turn, envelope)
.await
.map_err(|error| AgentError::new(format!("Failed to append event: {error}"), false))
}
pub(super) fn build_assistant_message(response: &ChatResponse) -> Message {
let mut blocks = Vec::new();
for block in &response.content {
match block {
ContentBlock::Text { text } => {
blocks.push(ContentBlock::Text { text: text.clone() });
}
ContentBlock::Thinking {
thinking,
signature,
} => {
blocks.push(ContentBlock::Thinking {
thinking: thinking.clone(),
signature: signature.clone(),
});
}
ContentBlock::RedactedThinking { data } => {
blocks.push(ContentBlock::RedactedThinking { data: data.clone() });
}
ContentBlock::ToolUse {
id,
name,
input,
thought_signature,
} => {
blocks.push(ContentBlock::ToolUse {
id: id.clone(),
name: name.clone(),
input: input.clone(),
thought_signature: thought_signature.clone(),
});
}
_ => {}
}
}
Message {
role: Role::Assistant,
content: Content::Blocks(blocks),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::llm::{ChatResponse, ContentBlock, Usage};
use serde_json::json;
#[test]
fn test_extract_content_text_only() {
let response = ChatResponse {
id: "msg_1".to_string(),
content: vec![ContentBlock::Text {
text: "Hello".to_string(),
}],
model: "test".to_string(),
stop_reason: None,
usage: Usage {
input_tokens: 0,
output_tokens: 0,
cached_input_tokens: 0,
cache_creation_input_tokens: 0,
},
};
let (thinking, text, tool_uses) = extract_content(&response);
assert!(thinking.is_none());
assert_eq!(text, Some("Hello".to_string()));
assert!(tool_uses.is_empty());
}
#[test]
fn test_extract_content_tool_use() {
let response = ChatResponse {
id: "msg_1".to_string(),
content: vec![ContentBlock::ToolUse {
id: "tool_1".to_string(),
name: "test_tool".to_string(),
input: json!({"key": "value"}),
thought_signature: None,
}],
model: "test".to_string(),
stop_reason: None,
usage: Usage {
input_tokens: 0,
output_tokens: 0,
cached_input_tokens: 0,
cache_creation_input_tokens: 0,
},
};
let (thinking, text, tool_uses) = extract_content(&response);
assert!(thinking.is_none());
assert!(text.is_none());
assert_eq!(tool_uses.len(), 1);
assert_eq!(tool_uses[0].1, "test_tool");
}
#[test]
fn test_extract_content_mixed() {
let response = ChatResponse {
id: "msg_1".to_string(),
content: vec![
ContentBlock::Text {
text: "Let me help".to_string(),
},
ContentBlock::ToolUse {
id: "tool_1".to_string(),
name: "helper".to_string(),
input: json!({}),
thought_signature: None,
},
],
model: "test".to_string(),
stop_reason: None,
usage: Usage {
input_tokens: 0,
output_tokens: 0,
cached_input_tokens: 0,
cache_creation_input_tokens: 0,
},
};
let (thinking, text, tool_uses) = extract_content(&response);
assert!(thinking.is_none());
assert_eq!(text, Some("Let me help".to_string()));
assert_eq!(tool_uses.len(), 1);
}
#[test]
fn test_millis_to_u64() {
assert_eq!(millis_to_u64(0), 0);
assert_eq!(millis_to_u64(1000), 1000);
assert_eq!(millis_to_u64(u128::from(u64::MAX)), u64::MAX);
assert_eq!(millis_to_u64(u128::from(u64::MAX) + 1), u64::MAX);
}
#[test]
fn test_build_assistant_message() {
let response = ChatResponse {
id: "msg_1".to_string(),
content: vec![
ContentBlock::Text {
text: "Response text".to_string(),
},
ContentBlock::ToolUse {
id: "tool_1".to_string(),
name: "echo".to_string(),
input: json!({"message": "test"}),
thought_signature: None,
},
],
model: "test".to_string(),
stop_reason: None,
usage: Usage {
input_tokens: 0,
output_tokens: 0,
cached_input_tokens: 0,
cache_creation_input_tokens: 0,
},
};
let msg = build_assistant_message(&response);
assert_eq!(msg.role, Role::Assistant);
if let Content::Blocks(blocks) = msg.content {
assert_eq!(blocks.len(), 2);
} else {
panic!("Expected Content::Blocks");
}
}
}