pub mod api;
pub mod compaction;
pub mod wire;
#[path = "loop.rs"]
mod r#loop;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use async_trait::async_trait;
use tokio::sync::broadcast;
use tracing::warn;
use crate::backends::anthropic::api::{AnthropicClient, SharedClient};
use crate::backends::anthropic::r#loop::{
run_turn, to_wire_user_content, LoopConfig, LoopState, TurnDeps,
};
use crate::backends::anthropic::wire::ToolDef;
use crate::backends::gemini::tools::{register_builtins, BuiltinDeps};
use crate::connections::{Connection, ConnectionStrategy, StepStream};
use crate::content::Content;
use crate::error::{Error, Result};
use crate::tools::ToolRunner;
use crate::types::{
CapabilitiesConfig, Step, SystemInstructions, ThinkingLevel, ToolResult,
};
pub use wire::{DEFAULT_MODEL, OPUS_MODEL, SONNET_MODEL};
const STEP_BROADCAST_CAPACITY: usize = 256;
#[derive(Debug, Clone)]
pub struct AnthropicBackendConfig {
pub api_key: String,
pub model: String,
pub system_instructions: Option<SystemInstructions>,
pub thinking: Option<ThinkingLevel>,
pub temperature: Option<f32>,
pub max_tokens: Option<u32>,
pub base_url: Option<url::Url>,
pub conversation_id: Option<String>,
pub capabilities: CapabilitiesConfig,
pub filesystem: Option<crate::filesystem::SharedFilesystem>,
pub api_key_provider: Option<crate::backends::AuthTokenProvider>,
}
impl AnthropicBackendConfig {
pub fn new(api_key: impl Into<String>) -> Self {
Self {
api_key: api_key.into(),
model: DEFAULT_MODEL.to_string(),
system_instructions: None,
thinking: None,
temperature: None,
max_tokens: None,
base_url: None,
conversation_id: None,
capabilities: CapabilitiesConfig::default(),
filesystem: None,
api_key_provider: None,
}
}
pub fn with_filesystem(mut self, fs: crate::filesystem::SharedFilesystem) -> Self {
self.filesystem = Some(fs);
self
}
pub fn with_model(mut self, model: impl Into<String>) -> Self {
self.model = model.into();
self
}
pub fn with_system_instructions(mut self, s: impl Into<SystemInstructions>) -> Self {
self.system_instructions = Some(s.into());
self
}
pub fn with_thinking(mut self, level: ThinkingLevel) -> Self {
self.thinking = Some(level);
self
}
pub fn with_temperature(mut self, t: f32) -> Self {
self.temperature = Some(t);
self
}
pub fn with_max_tokens(mut self, n: u32) -> Self {
self.max_tokens = Some(n);
self
}
pub fn with_capabilities(mut self, c: CapabilitiesConfig) -> Self {
self.capabilities = c;
self
}
pub fn with_base_url(mut self, url: url::Url) -> Self {
self.base_url = Some(url);
self
}
}
pub type AnthropicRunners = crate::backends::BackendRunners;
pub struct AnthropicConnectionStrategy {
config: AnthropicBackendConfig,
runners: AnthropicRunners,
typed_capture: Option<Arc<parking_lot::Mutex<Option<Arc<AnthropicConnection>>>>>,
}
impl AnthropicConnectionStrategy {
pub fn new(config: AnthropicBackendConfig) -> Self {
Self {
config,
runners: AnthropicRunners::default(),
typed_capture: None,
}
}
pub fn with_runners(mut self, runners: AnthropicRunners) -> Self {
self.runners = runners;
self
}
pub fn with_typed_capture(
mut self,
slot: Arc<parking_lot::Mutex<Option<Arc<AnthropicConnection>>>>,
) -> Self {
self.typed_capture = Some(slot);
self
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl ConnectionStrategy for AnthropicConnectionStrategy {
async fn connect(&self) -> Result<Arc<dyn Connection>> {
if self.config.api_key.trim().is_empty() {
return Err(Error::config("AnthropicBackendConfig.api_key is empty"));
}
let mut client = AnthropicClient::new(self.config.api_key.clone())?;
if let Some(base) = &self.config.base_url {
client = client.with_base_url(base.clone());
}
if let Some(provider) = &self.config.api_key_provider {
client = client.with_key_provider(provider.0.clone());
}
let client: SharedClient = Arc::new(client);
if let Some(runner) = self.runners.tool_runner.as_ref() {
let fs: Option<crate::filesystem::SharedFilesystem> =
self.config.filesystem.clone().or_else(default_filesystem);
let deps = BuiltinDeps {
chat_client: None,
chat_model: self.config.model.clone(),
image_client: None,
image_model: String::new(),
fs,
};
let registered = register_builtins(runner, &self.config.capabilities, &deps);
if !registered.is_empty() {
tracing::debug!(?registered, "registered built-in tools (anthropic)");
}
}
let tool_decls = self
.runners
.tool_runner
.as_ref()
.map(|r| build_tool_declarations(r))
.unwrap_or_default();
let loop_config = LoopConfig::from_system(
self.config.model.clone(),
self.config.system_instructions.as_ref(),
self.config.thinking,
self.config.temperature,
self.config.max_tokens,
tool_decls,
self.config.capabilities.compaction_threshold,
)?;
let (steps_tx, _) = broadcast::channel::<Step>(STEP_BROADCAST_CAPACITY);
let state = Arc::new(LoopState::new(steps_tx));
let conv_id = self
.config
.conversation_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let typed = Arc::new(AnthropicConnection {
deps_template: TurnDeps {
client,
config: loop_config,
state: state.clone(),
tool_runner: self.runners.tool_runner.clone(),
hook_runner: self.runners.hook_runner.clone(),
session_ctx: self.runners.session_ctx.clone(),
},
state,
conversation_id: conv_id.into(),
});
if let Some(slot) = &self.typed_capture {
*slot.lock() = Some(typed.clone());
}
Ok(typed)
}
}
#[cfg(feature = "native")]
fn default_filesystem() -> Option<crate::filesystem::SharedFilesystem> {
Some(Arc::new(crate::filesystem::NativeFilesystem::new()))
}
#[cfg(not(feature = "native"))]
fn default_filesystem() -> Option<crate::filesystem::SharedFilesystem> {
None
}
fn build_tool_declarations(runner: &ToolRunner) -> Vec<ToolDef> {
runner
.iter_tools()
.into_iter()
.map(|tool| ToolDef {
name: tool.name().to_string(),
description: tool.description().to_string(),
input_schema: tool.input_schema(),
})
.collect()
}
pub struct AnthropicConnection {
deps_template: TurnDeps,
state: Arc<LoopState>,
conversation_id: Arc<str>,
}
impl AnthropicConnection {
pub fn history_bytes(&self) -> Result<Vec<u8>> {
let snapshot = self.state.history.lock().clone();
serde_json::to_vec(&snapshot).map_err(|e| Error::other(format!("history_bytes: {e}")))
}
pub fn set_history_bytes(&self, bytes: &[u8]) -> Result<()> {
if bytes.is_empty() {
return Ok(());
}
let restored: Vec<wire::Message> = serde_json::from_slice(bytes)
.map_err(|e| Error::other(format!("set_history_bytes: {e}")))?;
*self.state.history.lock() = restored;
Ok(())
}
pub async fn compact(&self) -> bool {
compaction::try_compact(
&self.state.history,
&self.deps_template.client,
&self.deps_template.config.model,
)
.await
}
pub fn clear_history(&self) {
self.state.history.lock().clear();
*self.state.last_turn_usage.lock() = None;
*self.state.last_structured_output.lock() = None;
self.state
.next_step_index
.store(0, std::sync::atomic::Ordering::Relaxed);
}
pub fn transcript(&self) -> Vec<crate::types::TranscriptEntry> {
let snap = self.state.history.lock().clone();
project_history(&snap)
}
}
pub fn decode_transcript_bytes(bytes: &[u8]) -> Result<Vec<crate::types::TranscriptEntry>> {
if bytes.is_empty() {
return Ok(Vec::new());
}
let history: Vec<wire::Message> = serde_json::from_slice(bytes)
.map_err(|e| Error::other(format!("decode_transcript_bytes: {e}")))?;
Ok(project_history(&history))
}
fn project_history(history: &[wire::Message]) -> Vec<crate::types::TranscriptEntry> {
use crate::types::{TranscriptEntry, TranscriptRole, TranscriptToolCall};
use wire::{Block, Role};
let mut out: Vec<TranscriptEntry> = Vec::with_capacity(history.len());
let mut call_index: std::collections::HashMap<String, (usize, usize)> =
std::collections::HashMap::new();
for msg in history {
let role = match msg.role {
Role::User => TranscriptRole::User,
Role::Assistant => TranscriptRole::Assistant,
};
let mut buf = String::new();
let mut calls_this_turn: Vec<(String, TranscriptToolCall)> = Vec::new();
for block in &msg.content {
match block {
Block::Text { text } => buf.push_str(text),
Block::Thinking { thinking, .. } => buf.push_str(thinking),
Block::ToolUse { id, name, input } => {
calls_this_turn.push((
id.clone(),
TranscriptToolCall {
name: name.clone(),
args: input.clone(),
result: None,
error: None,
},
));
}
Block::ToolResult {
tool_use_id,
content,
is_error,
} => {
if let Some(&(ei, ci)) = call_index.get(tool_use_id) {
if let Some(call) = out.get_mut(ei).and_then(|e| e.tool_calls.get_mut(ci)) {
if is_error.unwrap_or(false) {
call.error = Some(content.to_string());
} else {
call.result = Some(content.clone());
}
}
}
}
Block::Image { .. } => {}
Block::Other => {}
}
}
if !buf.is_empty() || !calls_this_turn.is_empty() {
let entry_idx = out.len();
let tool_calls: Vec<TranscriptToolCall> = calls_this_turn
.into_iter()
.enumerate()
.map(|(ci, (id, call))| {
call_index.insert(id, (entry_idx, ci));
call
})
.collect();
out.push(TranscriptEntry {
role,
text: buf,
tool_calls,
});
}
}
out
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Connection for AnthropicConnection {
fn is_idle(&self) -> bool {
self.state.idle.load(Ordering::Acquire)
}
fn conversation_id(&self) -> &str {
&self.conversation_id
}
async fn send(&self, content: Content) -> Result<()> {
let user = to_wire_user_content(content.clone())?;
let deps = self.deps_template.clone();
crate::runtime::spawn(async move {
if let Err(e) = run_turn(deps, user, content).await {
warn!(error = %e, "anthropic turn failed");
}
});
Ok(())
}
async fn send_trigger(&self, content: String) -> Result<()> {
self.send(Content::text(content)).await
}
async fn send_tool_results(&self, _results: Vec<ToolResult>) -> Result<()> {
Ok(())
}
fn subscribe_steps(&self) -> StepStream {
crate::backends::subscribe_step_stream(self.state.steps.subscribe(), "anthropic")
}
async fn wait_for_idle(&self) -> Result<()> {
loop {
if self.is_idle() {
return Ok(());
}
self.state.idle_notify.notified().await;
}
}
fn cancel_turn(&self) {
self.state.cancel.store(true, Ordering::Release);
}
async fn shutdown(&self) -> Result<()> {
self.state.idle.store(true, Ordering::Release);
self.state.idle_notify.notify_waiters();
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::filesystem::NativeFilesystem;
use crate::tools::ToolRunner;
use crate::types::StepStatus;
use futures_util::stream::StreamExt;
fn assert_single_type(v: &serde_json::Value, tool: &str, path: &str) {
match v {
serde_json::Value::Object(map) => {
if let Some(t) = map.get("type") {
assert!(
!t.is_array(),
"tool `{tool}` schema at `{path}.type` = {t} is an array union",
);
}
for (k, val) in map {
assert_single_type(val, tool, &format!("{path}.{k}"));
}
}
serde_json::Value::Array(arr) => {
for (i, val) in arr.iter().enumerate() {
assert_single_type(val, tool, &format!("{path}[{i}]"));
}
}
_ => {}
}
}
#[test]
fn anthropic_tool_declarations_have_single_type_schemas() {
let runner = ToolRunner::new();
let deps = BuiltinDeps {
chat_client: None,
chat_model: String::new(),
image_client: None,
image_model: String::new(),
fs: Some(Arc::new(NativeFilesystem::new()) as crate::filesystem::SharedFilesystem),
};
register_builtins(&runner, &CapabilitiesConfig::unrestricted(), &deps);
let decls = build_tool_declarations(&runner);
assert!(!decls.is_empty(), "expected builtins registered");
for d in &decls {
assert_single_type(&d.input_schema, &d.name, "input_schema");
}
}
#[test]
fn gemini_client_coupled_tools_do_not_register() {
let runner = ToolRunner::new();
let deps = BuiltinDeps {
chat_client: None,
chat_model: String::new(),
image_client: None,
image_model: String::new(),
fs: Some(Arc::new(NativeFilesystem::new()) as crate::filesystem::SharedFilesystem),
};
let registered = register_builtins(&runner, &CapabilitiesConfig::unrestricted(), &deps);
assert!(
!registered.iter().any(|n| n == "start_subagent"),
"start_subagent must not register without a chat client"
);
assert!(
!registered.iter().any(|n| n == "generate_image"),
"generate_image must not register without an image client"
);
assert!(registered.iter().any(|n| n == "finish"));
assert!(registered.iter().any(|n| n == "view_file"));
}
#[tokio::test]
async fn empty_api_key_errors() {
let strategy = AnthropicConnectionStrategy::new(AnthropicBackendConfig::new(" "));
match strategy.connect().await {
Ok(_) => panic!("expected empty api_key to error"),
Err(e) => assert!(e.to_string().contains("api_key is empty")),
}
}
#[test]
fn transcript_matches_tool_calls_by_id() {
use wire::{Block, Message, Role};
let history = vec![
Message::user_text("read main.rs"),
Message {
role: Role::Assistant,
content: vec![
Block::Text {
text: "Reading.".into(),
},
Block::ToolUse {
id: "toolu_1".into(),
name: "view_file".into(),
input: serde_json::json!({"path": "main.rs"}),
},
],
},
Message {
role: Role::User,
content: vec![Block::ToolResult {
tool_use_id: "toolu_1".into(),
content: serde_json::json!({"contents": "fn main() {}"}),
is_error: None,
}],
},
Message::assistant_text("Done."),
];
let entries = project_history(&history);
let asst = entries
.iter()
.find(|e| !e.tool_calls.is_empty())
.expect("assistant entry with a tool call");
assert_eq!(asst.tool_calls.len(), 1);
assert_eq!(asst.tool_calls[0].name, "view_file");
assert_eq!(
asst.tool_calls[0].result.as_ref().unwrap()["contents"],
"fn main() {}"
);
}
fn test_connection() -> Arc<AnthropicConnection> {
let (steps_tx, _) = broadcast::channel::<Step>(STEP_BROADCAST_CAPACITY);
let state = Arc::new(LoopState::new(steps_tx));
let client: SharedClient = Arc::new(AnthropicClient::new("k").unwrap());
let config = LoopConfig::from_system(
DEFAULT_MODEL.to_string(),
None,
None,
None,
None,
Vec::new(),
None,
)
.unwrap();
Arc::new(AnthropicConnection {
deps_template: TurnDeps {
client,
config,
state: state.clone(),
tool_runner: None,
hook_runner: None,
session_ctx: None,
},
state,
conversation_id: "test".into(),
})
}
#[tokio::test]
async fn error_step_surfaces_as_stream_error() {
use crate::error::Error;
let conn = test_connection();
let mut stream = conn.subscribe_steps();
conn.state
.steps
.send(Step::turn_error(0, "anthropic HTTP 500: boom"))
.expect("subscriber is live");
let item = stream.next().await.expect("a stream item");
match item {
Ok(step) => panic!("error Step leaked as Ok: {step:?}"),
Err(Error::Other(msg)) => assert!(
msg.contains("anthropic HTTP 500: boom"),
"expected the real error message, got: {msg}"
),
Err(other) => panic!("unexpected error variant: {other:?}"),
}
}
#[tokio::test]
async fn done_step_passes_through_as_ok() {
let conn = test_connection();
let mut stream = conn.subscribe_steps();
conn.state
.steps
.send(Step::turn_complete(
"traj",
0,
StepStatus::Done,
"all good",
"",
false,
None,
None,
))
.expect("subscriber is live");
let item = stream.next().await.expect("a stream item");
let step = item.expect("Done step must pass through as Ok");
assert_eq!(step.content, "all good");
assert_eq!(step.status, StepStatus::Done);
}
#[tokio::test]
async fn chat_text_returns_err_on_connect_failure() {
use crate::conversation::Conversation;
let base = url::Url::parse("http://127.0.0.1:1/").unwrap();
let cfg = AnthropicBackendConfig::new("k").with_base_url(base);
let conn = AnthropicConnectionStrategy::new(cfg)
.connect()
.await
.expect("connect (no network yet)");
let conv = Conversation::new(conn);
let resp = conv.chat("hi").await.expect("send dispatches");
match resp.text().await {
Ok(t) => panic!("expected an error, got empty success: {t:?}"),
Err(e) => {
let s = e.to_string();
assert!(
s.contains("anthropic POST") || s.contains("anthropic HTTP"),
"expected the surfaced turn error, got: {s}"
);
}
}
}
}