use crate::{
error::{CostError, LlmError, StoreError, ToolError},
tape::{TapeEntry, TapeEntryKind, TapeSearchResult},
types::{
AccessDecision, ActivityEntry, ActivityQuery, AgentEvent, ApprovalContext,
ApprovalDecision, ArtifactRecord, ChannelAccessPolicy, InboundMessage, LlmCapabilities,
LlmRequest, LlmResponse, LlmStream, Message, OutboundMessage, SessionId, SessionState,
SubagentResult, TokenUsage, ToolCall, ToolDescriptor, ToolResult, TurnCheckpoint,
},
};
#[async_trait::async_trait]
pub trait LlmPort: Send + Sync {
#[must_use]
fn capabilities(&self) -> LlmCapabilities {
LlmCapabilities::default()
}
async fn complete(&self, req: LlmRequest) -> Result<LlmResponse, LlmError>;
async fn complete_stream(&self, req: LlmRequest) -> Result<LlmStream, LlmError>;
}
#[async_trait::async_trait]
pub trait ContextCompactorPort: Send + Sync {
async fn compact(&self, session: &SessionState) -> Vec<Message>;
}
#[async_trait::async_trait]
pub trait ToolCatalogPort: Send + Sync {
async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError>;
}
#[async_trait::async_trait]
pub trait ToolExecutorPort: Send + Sync {
async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError>;
}
#[async_trait::async_trait]
pub trait ToolPort: Send + Sync {
async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError>;
async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError>;
}
#[async_trait::async_trait]
impl<T> ToolCatalogPort for T
where
T: ToolPort + ?Sized,
{
async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
ToolPort::list_tools(self).await
}
}
#[async_trait::async_trait]
impl<T> ToolExecutorPort for T
where
T: ToolPort + ?Sized,
{
async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
ToolPort::call_tool(self, call).await
}
}
pub trait ToolPolicyPort: Send + Sync {
fn is_tool_allowed(
&self,
tool: &str,
deny_tools: &[String],
allow_tools: Option<&[String]>,
) -> bool;
}
#[async_trait::async_trait]
pub trait ApprovalPort: Send + Sync {
async fn approve_tool_call(
&self,
call: &ToolCall,
context: &ApprovalContext,
) -> Result<ApprovalDecision, ToolError>;
}
#[async_trait::async_trait]
pub trait TurnCheckpointStorePort: Send + Sync {
async fn save_checkpoint(&self, checkpoint: &TurnCheckpoint) -> Result<(), StoreError>;
async fn load_latest(
&self,
session_id: &SessionId,
) -> Result<Option<TurnCheckpoint>, StoreError>;
}
#[async_trait::async_trait]
pub trait ArtifactStorePort: Send + Sync {
async fn put(&self, artifact: ArtifactRecord) -> Result<(), StoreError>;
async fn list_by_session(
&self,
session_id: &SessionId,
) -> Result<Vec<ArtifactRecord>, StoreError>;
}
#[async_trait::async_trait]
pub trait CostMeterPort: Send + Sync {
async fn check_budget(&self, session_id: &SessionId) -> Result<(), CostError>;
async fn record_llm_usage(
&self,
session_id: &SessionId,
model: &str,
usage: &TokenUsage,
) -> Result<(), CostError>;
async fn record_tool_result(
&self,
session_id: &SessionId,
tool_result: &ToolResult,
) -> Result<(), CostError>;
}
#[async_trait::async_trait]
pub trait TapeStorePort: Send + Sync {
async fn append(
&self,
session_id: &SessionId,
kind: TapeEntryKind,
) -> Result<TapeEntry, StoreError>;
async fn entries_since_last_handoff(
&self,
session_id: &SessionId,
) -> Result<Vec<TapeEntry>, StoreError>;
async fn search(
&self,
session_id: &SessionId,
query: &str,
) -> Result<Vec<TapeSearchResult>, StoreError>;
async fn all_entries(&self, session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError>;
async fn anchors(&self, session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError>;
}
#[async_trait::async_trait]
pub trait SessionStore: Send + Sync {
async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError>;
async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError>;
async fn save_if_version(
&self,
id: &SessionId,
state: &SessionState,
expected_version: u64,
) -> Result<u64, StoreError> {
let current = self.load(id).await?;
let current_version = current.as_ref().map_or(0, |s| s.version);
if current_version != expected_version {
return Err(StoreError::VersionConflict {
expected: expected_version,
actual: current_version,
});
}
self.save(id, state).await?;
Ok(current_version.saturating_add(1))
}
}
pub trait EventSink: Send + Sync {
fn emit(&self, event: AgentEvent);
}
#[async_trait::async_trait]
pub trait SubagentPort: Send + Sync {
async fn spawn(
&self,
task: String,
parent_session_id: SessionId,
model: Option<String>,
max_steps: Option<u32>,
extra_deny_tools: Vec<String>,
) -> Result<SessionId, crate::error::AgentError>;
async fn await_result(
&self,
subagent_id: &SessionId,
) -> Result<SubagentResult, crate::error::AgentError>;
async fn list_active(
&self,
parent_session_id: &SessionId,
) -> Result<Vec<SessionId>, crate::error::AgentError>;
async fn cancel(&self, subagent_id: &SessionId) -> Result<(), crate::error::AgentError>;
}
pub trait AccessControlPort: Send + Sync {
fn check_access(&self, channel: &str, sender_id: &str) -> AccessDecision;
fn policies(&self) -> &[ChannelAccessPolicy];
}
#[async_trait::async_trait]
pub trait MessageBusPort: Send + Sync {
async fn send(&self, msg: OutboundMessage) -> Result<(), crate::error::AgentError>;
async fn recv(&self) -> Result<InboundMessage, crate::error::AgentError>;
}
#[async_trait::async_trait]
pub trait ActivityJournalPort: Send + Sync {
async fn append(&self, entry: ActivityEntry) -> Result<(), StoreError>;
async fn query(&self, query: &ActivityQuery) -> Result<Vec<ActivityEntry>, StoreError>;
async fn count(&self) -> Result<u64, StoreError>;
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use super::*;
struct MockLlm {
response: LlmResponse,
}
#[async_trait::async_trait]
impl LlmPort for MockLlm {
async fn complete(&self, _req: LlmRequest) -> Result<LlmResponse, LlmError> {
Ok(self.response.clone())
}
async fn complete_stream(&self, _req: LlmRequest) -> Result<LlmStream, LlmError> {
Err(LlmError::Provider("streaming not implemented in mock".into()))
}
}
struct MockToolPort {
tools: Vec<ToolDescriptor>,
}
#[async_trait::async_trait]
impl ToolPort for MockToolPort {
async fn list_tools(&self) -> Result<Vec<ToolDescriptor>, ToolError> {
Ok(self.tools.clone())
}
async fn call_tool(&self, call: ToolCall) -> Result<ToolResult, ToolError> {
Ok(ToolResult {
name: call.name,
output: serde_json::json!({"result": "mock"}),
is_error: false,
})
}
}
struct MockSessionStore {
inner: Mutex<std::collections::HashMap<SessionId, SessionState>>,
}
impl MockSessionStore {
fn new() -> Self {
Self { inner: Mutex::new(std::collections::HashMap::new()) }
}
}
#[async_trait::async_trait]
impl SessionStore for MockSessionStore {
async fn load(&self, id: &SessionId) -> Result<Option<SessionState>, StoreError> {
let map = self.inner.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
Ok(map.get(id).cloned())
}
async fn save(&self, id: &SessionId, state: &SessionState) -> Result<(), StoreError> {
let mut map = self.inner.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
map.insert(id.clone(), state.clone());
Ok(())
}
}
struct MockEventSink {
events: Mutex<Vec<AgentEvent>>,
}
impl MockEventSink {
fn new() -> Self {
Self { events: Mutex::new(Vec::new()) }
}
}
impl EventSink for MockEventSink {
fn emit(&self, event: AgentEvent) {
let mut events = self.events.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
events.push(event);
}
}
#[tokio::test]
async fn llm_port_complete() {
let llm: Arc<dyn LlmPort> = Arc::new(MockLlm {
response: LlmResponse {
content: "Hello!".into(),
usage: crate::types::TokenUsage { prompt_tokens: 10, completion_tokens: 5 },
finish_reason: crate::types::FinishReason::Stop,
tool_calls: Vec::new(),
},
});
let req = LlmRequest {
model: "test-model".into(),
messages: vec![],
tools: vec![],
output_schema: None,
};
let resp = llm.complete(req).await;
assert!(resp.is_ok(), "complete should succeed");
if let Ok(resp) = resp {
assert_eq!(resp.content, "Hello!");
assert_eq!(resp.usage.total(), 15);
}
}
#[tokio::test]
async fn tool_port_list_and_call() {
let tools: Arc<dyn ToolPort> =
Arc::new(MockToolPort { tools: vec![ToolDescriptor::new("test/echo", "Echo tool")] });
let listed = tools.list_tools().await;
assert!(listed.is_ok(), "list_tools should succeed");
if let Ok(listed) = listed {
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].id, "test/echo");
}
let result =
tools.call_tool(ToolCall::new("test/echo", serde_json::json!({"msg": "hi"}))).await;
assert!(result.is_ok(), "call_tool should succeed");
if let Ok(result) = result {
assert!(!result.is_error);
}
}
#[tokio::test]
async fn session_store_roundtrip() {
let store: Arc<dyn SessionStore> = Arc::new(MockSessionStore::new());
let id = "session-1".to_string();
let loaded_before = store.load(&id).await;
assert!(matches!(loaded_before, Ok(None)));
let state = SessionState {
messages: vec![crate::types::Message::text(crate::types::Role::User, "hello")],
..Default::default()
};
let saved = store.save(&id, &state).await;
assert!(saved.is_ok(), "save should succeed");
let loaded = store.load(&id).await;
assert!(matches!(loaded, Ok(Some(_))), "load should return stored state");
if let Ok(Some(loaded)) = loaded {
assert_eq!(loaded.messages.len(), 1);
assert_eq!(loaded.messages[0].content, "hello");
}
}
#[test]
fn event_sink_collect() {
let sink = MockEventSink::new();
sink.emit(AgentEvent::TurnStarted { session_id: "s1".into() });
sink.emit(AgentEvent::Error { session_id: "s1".into(), step: None, error: "boom".into() });
let events = sink.events.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
assert_eq!(events.len(), 2);
}
#[test]
fn turn_policy_defaults() {
let policy = crate::types::TurnPolicy::default();
assert_eq!(policy.max_steps, 12);
assert_eq!(policy.max_tool_calls, 8);
assert_eq!(policy.max_consecutive_errors, 2);
assert_eq!(policy.turn_timeout_ms, 90_000);
assert_eq!(policy.tool_timeout_ms, 15_000);
}
}