use crate::agent::Agent;
use crate::harness::Harness;
use crate::llm_models::LlmProviderType;
use crate::session_file::{FileInfo, FileStat, GrepMatch, InitialFile, SessionFile};
use crate::tool_types::{ToolCall, ToolDefinition, ToolResult};
use crate::typed_id::{AgentId, HarnessId, ImageId, ModelId, SessionId};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::any::{Any, TypeId};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use uuid::Uuid;
/// Indexes `tool_defs` by tool name so callers can look up a definition
/// without rescanning the slice.
///
/// When two definitions share a name, the one appearing later in the slice
/// is the one kept in the map.
fn build_tool_map(tool_defs: &[ToolDefinition]) -> HashMap<&str, &ToolDefinition> {
    let mut by_name = HashMap::with_capacity(tool_defs.len());
    for definition in tool_defs {
        by_name.insert(definition.name(), definition);
    }
    by_name
}
use crate::error::Result;
/// Read-only access to [`Agent`] records.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait AgentStore: Send + Sync {
/// Fetches the agent identified by `agent_id`.
///
/// The `Option` in the return type lets an implementation report a missing
/// agent as `Ok(None)` instead of an error — presumably the intended use;
/// confirm against implementors.
async fn get_agent(&self, agent_id: AgentId) -> Result<Option<Agent>>;
}
/// Allows an `Arc<T>` to be passed wherever an [`AgentStore`] is expected by
/// delegating to the wrapped store.
#[async_trait]
impl<T: AgentStore + ?Sized> AgentStore for std::sync::Arc<T> {
    async fn get_agent(&self, agent_id: AgentId) -> Result<Option<Agent>> {
        // Deref-coerce to the wrapped store so the call cannot recurse into this impl.
        let inner: &T = self;
        inner.get_agent(agent_id).await
    }
}
/// Read-only access to [`Harness`] records.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait HarnessStore: Send + Sync {
/// Returns the harnesses that make up the chain for `harness_id`.
///
/// NOTE(review): the ordering of the returned vector and what exactly a
/// "chain" contains are defined by implementors and are not visible here.
async fn get_harness_chain(&self, harness_id: HarnessId) -> Result<Vec<Harness>>;
}
/// Allows an `Arc<T>` to be passed wherever a [`HarnessStore`] is expected by
/// delegating to the wrapped store.
#[async_trait]
impl<T: HarnessStore + ?Sized> HarnessStore for std::sync::Arc<T> {
    async fn get_harness_chain(&self, harness_id: HarnessId) -> Result<Vec<Harness>> {
        // Deref-coerce to the wrapped store so the call cannot recurse into this impl.
        let inner: &T = self;
        inner.get_harness_chain(harness_id).await
    }
}
use crate::leased_resource::{LeasedResource, UpsertLeasedResource};
use crate::session::Session;
/// Read-only access to [`Session`] records.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait SessionStore: Send + Sync {
/// Fetches the session identified by `session_id`.
///
/// The `Option` in the return type lets an implementation report a missing
/// session as `Ok(None)` — presumably the intended use; confirm against
/// implementors.
async fn get_session(&self, session_id: SessionId) -> Result<Option<Session>>;
}
/// Allows an `Arc<T>` to be passed wherever a [`SessionStore`] is expected by
/// delegating to the wrapped store.
#[async_trait]
impl<T: SessionStore + ?Sized> SessionStore for std::sync::Arc<T> {
    async fn get_session(&self, session_id: SessionId) -> Result<Option<Session>> {
        // Deref-coerce to the wrapped store so the call cannot recurse into this impl.
        let inner: &T = self;
        inner.get_session(session_id).await
    }
}
/// Write access to session metadata, kept separate from the read-only
/// [`SessionStore`].
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait SessionMutator: Send + Sync {
/// Sets the title of `session_id` to `title` and returns a [`Session`].
///
/// NOTE(review): presumably the returned session reflects the new title;
/// confirm against implementors.
async fn update_session_title(&self, session_id: SessionId, title: String) -> Result<Session>;
}
/// Allows an `Arc<T>` to be passed wherever a [`SessionMutator`] is expected
/// by delegating to the wrapped mutator.
#[async_trait]
impl<T: SessionMutator + ?Sized> SessionMutator for std::sync::Arc<T> {
    async fn update_session_title(&self, session_id: SessionId, title: String) -> Result<Session> {
        // Deref-coerce to the wrapped mutator so the call cannot recurse into this impl.
        let inner: &T = self;
        inner.update_session_title(session_id, title).await
    }
}
/// A model name bundled with the provider details needed to call it.
///
/// `Debug` is implemented by hand so that `api_key` never appears in logs or
/// panic messages: only its presence is shown.
#[derive(Clone)]
pub struct ModelWithProvider {
    /// Model identifier string (format is provider-specific and not shown here).
    pub model: String,
    /// Which kind of provider serves this model.
    pub provider_type: LlmProviderType,
    /// Credential for the provider, if one is configured.
    pub api_key: Option<String>,
    /// Override for the provider's base URL, if any.
    pub base_url: Option<String>,
}

impl std::fmt::Debug for ModelWithProvider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ModelWithProvider")
            .field("model", &self.model)
            .field("provider_type", &self.provider_type)
            // Show whether a key is set, but never the secret itself.
            .field("api_key", &self.api_key.as_ref().map(|_| "<redacted>"))
            .field("base_url", &self.base_url)
            .finish()
    }
}
/// Resolves models to the provider details needed to call them.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait LlmProviderStore: Send + Sync {
/// Looks up the model identified by `model_id` together with its provider
/// details. The `Option` allows "unknown model" to be reported as `Ok(None)`.
async fn get_model_with_provider(&self, model_id: ModelId)
-> Result<Option<ModelWithProvider>>;
/// Returns the default model, if one exists.
///
/// NOTE(review): how "default" is chosen is up to implementors and not
/// visible here.
async fn get_default_model(&self) -> Result<Option<ModelWithProvider>>;
}
/// Allows an `Arc<T>` to be passed wherever an [`LlmProviderStore`] is
/// expected by delegating to the wrapped store.
#[async_trait]
impl<T: LlmProviderStore + ?Sized> LlmProviderStore for std::sync::Arc<T> {
    async fn get_model_with_provider(
        &self,
        model_id: ModelId,
    ) -> Result<Option<ModelWithProvider>> {
        // Deref-coerce to the wrapped store so the call cannot recurse into this impl.
        let inner: &T = self;
        inner.get_model_with_provider(model_id).await
    }

    async fn get_default_model(&self) -> Result<Option<ModelWithProvider>> {
        let inner: &T = self;
        inner.get_default_model().await
    }
}
/// Metadata describing a stored image, without its bytes.
#[derive(Debug, Clone)]
pub struct StoredImageInfo {
/// Identifier of the stored image.
pub id: ImageId,
/// File name associated with the image.
pub filename: String,
/// Content type string (presumably a MIME type such as `image/png` — confirm).
pub content_type: String,
/// Size of the image data in bytes (signed, as declared).
pub size_bytes: i64,
/// Free-form JSON metadata; its schema is not defined here.
pub metadata: serde_json::Value,
/// Creation timestamp in UTC.
pub created_at: DateTime<Utc>,
}
/// A stored image: its metadata plus the raw bytes.
///
/// Note that the derived `Debug` prints every byte of `data`.
#[derive(Debug, Clone)]
pub struct StoredImage {
/// Metadata for the image.
pub info: StoredImageInfo,
/// Raw image bytes.
pub data: Vec<u8>,
}
/// Input for [`ImageArtifactStore::create_image`].
#[derive(Debug, Clone)]
pub struct CreateStoredImage {
/// File name to associate with the image.
pub filename: String,
/// Content type string (presumably a MIME type — confirm).
pub content_type: String,
/// Raw image bytes to store.
pub data: Vec<u8>,
/// Free-form JSON metadata; its schema is not defined here.
pub metadata: serde_json::Value,
}
/// Storage for image artifacts.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait ImageArtifactStore: Send + Sync {
/// Stores a new image described by `input` and returns its metadata.
async fn create_image(&self, input: CreateStoredImage) -> Result<StoredImageInfo>;
/// Fetches an image (metadata and bytes) by id; the `Option` allows a
/// missing image to be reported as `Ok(None)`.
async fn get_image(&self, image_id: ImageId) -> Result<Option<StoredImage>>;
/// Fetches only the metadata of an image by id, without its bytes.
async fn get_image_info(&self, image_id: ImageId) -> Result<Option<StoredImageInfo>>;
}
/// Credentials for calling a provider.
///
/// `Debug` is implemented by hand so that `api_key` never appears in logs or
/// panic messages; the remaining fields are printed as usual.
#[derive(Clone)]
pub struct ProviderCredentials {
    /// Secret API key for the provider.
    pub api_key: String,
    /// Override for the provider's base URL, if any.
    pub base_url: Option<String>,
}

impl std::fmt::Debug for ProviderCredentials {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ProviderCredentials")
            // Never print the secret itself.
            .field("api_key", &"<redacted>")
            .field("base_url", &self.base_url)
            .finish()
    }
}
/// Lookup of default credentials per provider type.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait ProviderCredentialStore: Send + Sync {
/// Returns the default credentials for `provider_type`, if any are available.
///
/// NOTE(review): `provider_type` is a free-form string here while
/// `ModelWithProvider` uses `LlmProviderType`; the accepted values are not
/// visible in this file.
async fn get_default_provider_credentials(
&self,
provider_type: &str,
) -> Result<Option<ProviderCredentials>>;
}
/// Executes tool calls against their definitions.
///
/// Only [`ToolExecutor::execute`] is required; the other methods have default
/// implementations built on top of it.
#[async_trait]
pub trait ToolExecutor: Send + Sync {
/// Executes a single `tool_call` using `tool_def` and returns its result.
async fn execute(&self, tool_call: &ToolCall, tool_def: &ToolDefinition) -> Result<ToolResult>;
/// Executes a single tool call with access to a [`ToolContext`].
///
/// The default implementation ignores the context and forwards to
/// [`ToolExecutor::execute`]; override it to make use of the context.
async fn execute_with_context(
&self,
tool_call: &ToolCall,
tool_def: &ToolDefinition,
_context: &ToolContext,
) -> Result<ToolResult> {
self.execute(tool_call, tool_def).await
}
/// Executes `tool_calls` one after another, in slice order.
///
/// Each call's definition is looked up by name in `tool_defs`. The first
/// failure — a missing definition or an error from `execute` — is returned
/// immediately and the remaining calls are not started.
async fn execute_batch(
&self,
tool_calls: &[ToolCall],
tool_defs: &[ToolDefinition],
) -> Result<Vec<ToolResult>> {
let mut results = Vec::with_capacity(tool_calls.len());
let tool_map = build_tool_map(tool_defs);
for tool_call in tool_calls {
let tool_def = tool_map.get(tool_call.name.as_str()).ok_or_else(|| {
crate::error::AgentLoopError::tool(format!(
"Tool definition not found: {}",
tool_call.name
))
})?;
results.push(self.execute(tool_call, tool_def).await?);
}
Ok(results)
}
/// Executes `tool_calls` with all futures driven together by `join_all`.
///
/// Unlike [`ToolExecutor::execute_batch`], every call is awaited to
/// completion before the outcome is inspected; the per-call results are then
/// collected in input order, so the first `Err` in that order (if any) is
/// what gets returned.
///
/// The `Self: Sized` bound means this method cannot be called through a
/// `dyn ToolExecutor` trait object.
async fn execute_parallel(
&self,
tool_calls: &[ToolCall],
tool_defs: &[ToolDefinition],
) -> Result<Vec<ToolResult>>
where
Self: Sized,
{
use futures::future::join_all;
let tool_map = build_tool_map(tool_defs);
// One lazy future per call; nothing runs until `join_all` is awaited.
let futures: Vec<_> = tool_calls
.iter()
.map(|tool_call| async {
let tool_def = tool_map.get(tool_call.name.as_str()).ok_or_else(|| {
crate::error::AgentLoopError::tool(format!(
"Tool definition not found: {}",
tool_call.name
))
})?;
self.execute(tool_call, tool_def).await
})
.collect();
let results = join_all(futures).await;
// Collecting `Result`s into `Result<Vec<_>>` yields the first error, if any.
results.into_iter().collect()
}
}
/// Lets a shared `Arc<dyn ToolExecutor>` be used as a [`ToolExecutor`].
///
/// `execute`, `execute_with_context` and `execute_batch` are forwarded so that
/// any overrides on the wrapped executor are honoured.
#[async_trait]
impl ToolExecutor for std::sync::Arc<dyn ToolExecutor> {
    async fn execute(&self, tool_call: &ToolCall, tool_def: &ToolDefinition) -> Result<ToolResult> {
        // Reborrow the trait object so the call cannot recurse into this impl.
        let inner: &dyn ToolExecutor = &**self;
        inner.execute(tool_call, tool_def).await
    }

    async fn execute_with_context(
        &self,
        tool_call: &ToolCall,
        tool_def: &ToolDefinition,
        context: &ToolContext,
    ) -> Result<ToolResult> {
        let inner: &dyn ToolExecutor = &**self;
        inner.execute_with_context(tool_call, tool_def, context).await
    }

    async fn execute_batch(
        &self,
        tool_calls: &[ToolCall],
        tool_defs: &[ToolDefinition],
    ) -> Result<Vec<ToolResult>> {
        let inner: &dyn ToolExecutor = &**self;
        inner.execute_batch(tool_calls, tool_defs).await
    }
}
#[async_trait]
pub trait SessionFileSystem: Send + Sync {
async fn read_file(&self, session_id: SessionId, path: &str) -> Result<Option<SessionFile>>;
async fn write_file(
&self,
session_id: SessionId,
path: &str,
content: &str,
encoding: &str,
) -> Result<SessionFile>;
async fn write_file_if_content_matches(
&self,
session_id: SessionId,
path: &str,
expected_content: &str,
expected_encoding: &str,
content: &str,
encoding: &str,
) -> Result<Option<SessionFile>> {
let Some(existing) = self.read_file(session_id, path).await? else {
return Ok(None);
};
if existing.is_directory {
return Ok(None);
}
let current_content = existing.content.unwrap_or_default();
if current_content != expected_content || existing.encoding != expected_encoding {
return Ok(None);
}
self.write_file(session_id, path, content, encoding)
.await
.map(Some)
}
async fn delete_file(&self, session_id: SessionId, path: &str, recursive: bool)
-> Result<bool>;
async fn list_directory(&self, session_id: SessionId, path: &str) -> Result<Vec<FileInfo>>;
async fn stat_file(&self, session_id: SessionId, path: &str) -> Result<Option<FileStat>>;
async fn grep_files(
&self,
session_id: SessionId,
pattern: &str,
path_pattern: Option<&str>,
) -> Result<Vec<GrepMatch>>;
async fn create_directory(&self, session_id: SessionId, path: &str) -> Result<FileInfo>;
async fn seed_initial_file(&self, session_id: SessionId, file: &InitialFile) -> Result<()> {
if file.is_readonly {
return Err(crate::error::AgentLoopError::store(
"read-only initial files require a SessionFileSystem-specific seed implementation",
));
}
self.write_file(session_id, &file.path, &file.content, &file.encoding)
.await?;
Ok(())
}
}
/// Allows an `Arc<T>` to be passed wherever a [`SessionFileSystem`] is
/// expected.
///
/// Every method — including those with default bodies — is forwarded, so
/// overrides on the wrapped file system are honoured.
#[async_trait]
impl<T: SessionFileSystem + ?Sized> SessionFileSystem for std::sync::Arc<T> {
    async fn read_file(&self, session_id: SessionId, path: &str) -> Result<Option<SessionFile>> {
        // Deref-coerce to the wrapped value so the call cannot recurse into this impl.
        let inner: &T = self;
        inner.read_file(session_id, path).await
    }

    async fn write_file(
        &self,
        session_id: SessionId,
        path: &str,
        content: &str,
        encoding: &str,
    ) -> Result<SessionFile> {
        let inner: &T = self;
        inner.write_file(session_id, path, content, encoding).await
    }

    async fn write_file_if_content_matches(
        &self,
        session_id: SessionId,
        path: &str,
        expected_content: &str,
        expected_encoding: &str,
        content: &str,
        encoding: &str,
    ) -> Result<Option<SessionFile>> {
        let inner: &T = self;
        inner
            .write_file_if_content_matches(
                session_id,
                path,
                expected_content,
                expected_encoding,
                content,
                encoding,
            )
            .await
    }

    async fn delete_file(
        &self,
        session_id: SessionId,
        path: &str,
        recursive: bool,
    ) -> Result<bool> {
        let inner: &T = self;
        inner.delete_file(session_id, path, recursive).await
    }

    async fn list_directory(&self, session_id: SessionId, path: &str) -> Result<Vec<FileInfo>> {
        let inner: &T = self;
        inner.list_directory(session_id, path).await
    }

    async fn stat_file(&self, session_id: SessionId, path: &str) -> Result<Option<FileStat>> {
        let inner: &T = self;
        inner.stat_file(session_id, path).await
    }

    async fn grep_files(
        &self,
        session_id: SessionId,
        pattern: &str,
        path_pattern: Option<&str>,
    ) -> Result<Vec<GrepMatch>> {
        let inner: &T = self;
        inner.grep_files(session_id, pattern, path_pattern).await
    }

    async fn create_directory(&self, session_id: SessionId, path: &str) -> Result<FileInfo> {
        let inner: &T = self;
        inner.create_directory(session_id, path).await
    }

    async fn seed_initial_file(&self, session_id: SessionId, file: &InitialFile) -> Result<()> {
        let inner: &T = self;
        inner.seed_initial_file(session_id, file).await
    }
}
pub use SessionFileSystem as SessionFileStore;
/// A type-keyed bag of shared values handed to a session file-system factory.
///
/// At most one `Arc<T>` is held per concrete type `T`. Cloning the context is
/// cheap: clones share the underlying map until one of them is extended.
#[derive(Clone, Default)]
pub struct SessionFileSystemFactoryContext {
    values: Arc<HashMap<TypeId, Arc<dyn Any + Send + Sync>>>,
}

impl SessionFileSystemFactoryContext {
    /// Creates an empty context.
    pub fn new() -> Self {
        Self {
            values: Arc::default(),
        }
    }

    /// Returns the context with `value` registered under its type `T`,
    /// replacing any value previously registered for that type.
    ///
    /// If the map is still shared with other clones it is copied first, so
    /// those clones do not see the new entry.
    pub fn with<T: Any + Send + Sync>(mut self, value: Arc<T>) -> Self {
        let key = TypeId::of::<T>();
        let erased: Arc<dyn Any + Send + Sync> = value;
        Arc::make_mut(&mut self.values).insert(key, erased);
        self
    }

    /// Returns the value registered for type `T`, if there is one.
    pub fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        let erased = self.values.get(&TypeId::of::<T>())?;
        Arc::clone(erased).downcast::<T>().ok()
    }
}
/// Builds [`SessionFileSystem`] instances from a
/// [`SessionFileSystemFactoryContext`].
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait SessionFileSystemFactory: Send + Sync {
/// Name identifying this factory; defaults to the trait's own name.
fn name(&self) -> &'static str {
"SessionFileSystemFactory"
}
/// Whether this factory represents a disabled file system. Defaults to
/// `false`.
fn is_disabled(&self) -> bool {
false
}
/// Creates a file system, drawing any dependencies it needs from `context`.
async fn create_session_file_system(
&self,
context: SessionFileSystemFactoryContext,
) -> Result<Arc<dyn SessionFileSystem>>;
}
#[derive(Debug, Clone, Default)]
pub struct DisabledSessionFileSystemFactory;
#[async_trait]
impl SessionFileSystemFactory for DisabledSessionFileSystemFactory {
fn name(&self) -> &'static str {
"DisabledSessionFileSystemFactory"
}
fn is_disabled(&self) -> bool {
true
}
async fn create_session_file_system(
&self,
_context: SessionFileSystemFactoryContext,
) -> Result<Arc<dyn SessionFileSystem>> {
Err(crate::error::AgentLoopError::config(
"session filesystem is disabled",
))
}
}
/// Listing entry for a stored key: the key and its timestamps, without the
/// value.
#[derive(Debug, Clone)]
pub struct KeyInfo {
/// The key itself.
pub key: String,
/// Creation timestamp in UTC.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Last-update timestamp in UTC.
pub updated_at: chrono::DateTime<chrono::Utc>,
}
/// Listing entry for a stored secret: its name and timestamps. The secret
/// value is not part of this type.
#[derive(Debug, Clone)]
pub struct SecretInfo {
/// Name of the secret.
pub name: String,
/// Creation timestamp in UTC.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Last-update timestamp in UTC.
pub updated_at: chrono::DateTime<chrono::Utc>,
}
/// Per-session string storage with two parallel sets of operations: plain
/// key/value pairs and named secrets.
///
/// NOTE(review): how secrets are protected (e.g. encryption at rest) is an
/// implementation matter that is not visible here.
#[async_trait]
pub trait SessionStorageStore: Send + Sync {
/// Stores `value` under `key` for the session.
async fn set_value(&self, session_id: SessionId, key: &str, value: &str) -> Result<()>;
/// Reads the value stored under `key`, if any.
async fn get_value(&self, session_id: SessionId, key: &str) -> Result<Option<String>>;
/// Deletes `key`; the meaning of the returned `bool` is defined by
/// implementors (presumably "the key existed" — confirm).
async fn delete_value(&self, session_id: SessionId, key: &str) -> Result<bool>;
/// Lists the session's keys with their timestamps (values are not included).
async fn list_keys(&self, session_id: SessionId) -> Result<Vec<KeyInfo>>;
/// Stores the secret `value` under `name` for the session.
async fn set_secret(&self, session_id: SessionId, name: &str, value: &str) -> Result<()>;
/// Reads the secret stored under `name`, if any.
async fn get_secret(&self, session_id: SessionId, name: &str) -> Result<Option<String>>;
/// Deletes the secret `name`; the meaning of the returned `bool` is defined
/// by implementors (presumably "the secret existed" — confirm).
async fn delete_secret(&self, session_id: SessionId, name: &str) -> Result<bool>;
/// Lists the session's secrets by name and timestamps (values are not included).
async fn list_secrets(&self, session_id: SessionId) -> Result<Vec<SecretInfo>>;
}
use crate::session_schedule::SessionSchedule;
use crate::typed_id::ScheduleId;
/// Storage for schedules attached to a session.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait SessionScheduleStore: Send + Sync {
/// Creates a schedule for the session.
///
/// Both `cron_expression` and `scheduled_at` are optional.
/// NOTE(review): presumably exactly one of them is expected (recurring vs
/// one-shot); this file does not enforce that — confirm with implementors.
/// The accepted format of `timezone` is likewise not shown here.
async fn create_schedule(
&self,
session_id: SessionId,
description: String,
cron_expression: Option<String>,
scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
timezone: String,
) -> Result<SessionSchedule>;
/// Cancels the schedule `schedule_id` of the session and returns a
/// [`SessionSchedule`] (presumably the cancelled one — confirm).
async fn cancel_schedule(
&self,
session_id: SessionId,
schedule_id: ScheduleId,
) -> Result<SessionSchedule>;
/// Lists the schedules of the session.
async fn list_schedules(&self, session_id: SessionId) -> Result<Vec<SessionSchedule>>;
/// Counts the session's active schedules; what counts as "active" is
/// defined by implementors.
async fn count_active_schedules(&self, session_id: SessionId) -> Result<u32>;
}
/// Registry of resources associated with a session, addressed by a string
/// `resource_id`.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait SessionResourceRegistry: Send + Sync {
/// Registers a resource described by `entry` and returns the stored entry.
async fn register(
&self,
entry: crate::session_resource::RegisterSessionResource,
) -> Result<crate::session_resource::SessionResourceEntry>;
/// Sets the status of a resource; the `Option` allows an unknown resource
/// to be reported as `Ok(None)`.
async fn update_status(
&self,
session_id: SessionId,
resource_id: &str,
status: crate::session_resource::SessionResourceStatus,
) -> Result<Option<crate::session_resource::SessionResourceEntry>>;
/// Fetches a single resource entry, if present.
async fn get(
&self,
session_id: SessionId,
resource_id: &str,
) -> Result<Option<crate::session_resource::SessionResourceEntry>>;
/// Lists the session's resource entries, optionally narrowed by `filter`.
async fn list(
&self,
session_id: SessionId,
filter: Option<&crate::session_resource::SessionResourceFilter>,
) -> Result<Vec<crate::session_resource::SessionResourceEntry>>;
/// Removes a resource; the meaning of the returned `bool` is defined by
/// implementors (presumably "an entry was removed" — confirm).
async fn deregister(&self, session_id: SessionId, resource_id: &str) -> Result<bool>;
}
/// Storage for externally leased resources tracked per session.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait LeasedResourceStore: Send + Sync {
/// Inserts or updates a leased resource described by `input`.
async fn upsert_resource(&self, input: UpsertLeasedResource) -> Result<LeasedResource>;
/// Releases the resource identified by `(provider, resource_type,
/// external_id)` within the session; the `Option` allows an unknown resource
/// to be reported as `Ok(None)`.
async fn release_resource(
&self,
session_id: SessionId,
provider: &str,
resource_type: &str,
external_id: &str,
) -> Result<Option<LeasedResource>>;
/// Lists the leased resources of the session.
async fn list_resources(&self, session_id: SessionId) -> Result<Vec<LeasedResource>>;
}
/// Shared, dynamically dispatched handle to a
/// [`SessionSqlDbStore`](crate::session_sqldb::SessionSqlDbStore).
pub type SessionSqlDbStoreRef = Arc<dyn crate::session_sqldb::SessionSqlDbStore>;
/// Resolves a user's connection to an external provider.
///
/// Only [`UserConnectionResolver::get_connection_token`] is required; the
/// other methods default to `Ok(None)` so implementors can opt in
/// selectively.
#[async_trait]
pub trait UserConnectionResolver: Send + Sync {
/// Returns the connection token for `provider` in the context of
/// `session_id`, if one is available.
async fn get_connection_token(
&self,
session_id: SessionId,
provider: &str,
) -> Result<Option<String>>;
/// Returns the id of the user behind the session's connection to `provider`.
///
/// The default implementation always returns `Ok(None)`.
async fn get_connection_user(
&self,
_session_id: SessionId,
_provider: &str,
) -> Result<Option<Uuid>> {
Ok(None)
}
/// Returns the connection token for `provider` for a specific user.
///
/// The default implementation always returns `Ok(None)`.
async fn get_connection_token_for_user(
&self,
_user_id: Uuid,
_provider: &str,
) -> Result<Option<String>> {
Ok(None)
}
/// Returns JSON metadata about the session's connection to `provider`; the
/// schema is defined by implementors.
///
/// The default implementation always returns `Ok(None)`.
async fn get_connection_metadata(
&self,
_session_id: SessionId,
_provider: &str,
) -> Result<Option<serde_json::Value>> {
Ok(None)
}
}
/// Checks the budgets that apply to a session.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait BudgetChecker: Send + Sync {
/// Evaluates the budgets for `session_id` and returns the response.
///
/// NOTE(review): the session id is a `&str` here, whereas the other traits
/// in this file take a typed `SessionId`; the expected string format is not
/// visible here.
async fn check_budgets(&self, session_id: &str) -> Result<crate::budget::BudgetToolResponse>;
}
/// Executes machine payments on behalf of a session.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait PaymentAuthority: Send + Sync {
/// Executes the payment described by `request` for `session_id` and returns
/// the response.
async fn execute_machine_payment(
&self,
session_id: SessionId,
request: crate::payment::MachinePaymentRequest,
) -> Result<crate::payment::MachinePaymentResponse>;
}
/// Rate limiter for outbound tool calls, keyed by organization.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait OutboundToolRateLimiter: Send + Sync {
/// Checks the rate limit for `org_id`.
///
/// NOTE(review): presumably `true` means the call is allowed; the polarity
/// is defined by implementors and cannot be confirmed from this file.
async fn check_org(&self, org_id: &crate::typed_id::OrgId) -> bool;
}
/// Outcome of [`DurableToolResultStore::try_claim_tool_call`].
///
/// NOTE(review): the variant descriptions below are read from the variant and
/// field names; the exact conditions are decided by store implementations.
#[derive(Debug)]
pub enum ToolCallClaimResult {
/// The caller obtained the claim. `claim_token` is the value to pass to
/// [`DurableToolResultStore::settle_tool_call`].
Claimed { claim_token: uuid::Uuid },
/// A result is already stored for this tool call; it is returned together
/// with the stored arguments fingerprint.
AlreadySettled {
result_json: serde_json::Value,
args_fingerprint: String,
},
/// The tool call is already claimed and has not been settled yet.
AlreadyRunning { args_fingerprint: String },
/// The stored arguments fingerprint differs from the one presented now.
DeterminismViolation {
stored_fingerprint: String,
current_fingerprint: String,
},
}
/// Durable record of tool-call execution, using a claim-then-settle protocol
/// keyed by `(turn_id, tool_call_id)`.
///
/// Implementors must be `Send + Sync + 'static`.
#[async_trait]
pub trait DurableToolResultStore: Send + Sync + 'static {
/// Attempts to claim the tool call for execution.
///
/// See [`ToolCallClaimResult`] for the possible outcomes. `args_fingerprint`
/// identifies the call's arguments; how it is computed is up to the caller.
async fn try_claim_tool_call(
&self,
turn_id: &str,
tool_call_id: &str,
tool_name: &str,
args_fingerprint: &str,
) -> Result<ToolCallClaimResult>;
/// Records the result of a previously claimed tool call.
///
/// `claim_token` is the token returned from a successful claim.
/// NOTE(review): presumably the returned `bool` says whether the settle was
/// accepted (e.g. the token still matched), and `status` is a free-form
/// string whose accepted values are not visible here — confirm both.
async fn settle_tool_call(
&self,
turn_id: &str,
tool_call_id: &str,
result_json: serde_json::Value,
status: &str,
claim_token: uuid::Uuid,
) -> Result<bool>;
}
/// A [`DurableToolResultStore`] that persists nothing.
///
/// Every claim succeeds with a fresh random token and every settle reports
/// success, so no de-duplication takes place.
pub struct NoopDurableToolResultStore;

#[async_trait]
impl DurableToolResultStore for NoopDurableToolResultStore {
    /// Always grants the claim, ignoring all arguments.
    async fn try_claim_tool_call(
        &self,
        _turn_id: &str,
        _tool_call_id: &str,
        _tool_name: &str,
        _args_fingerprint: &str,
    ) -> Result<ToolCallClaimResult> {
        let claim_token = uuid::Uuid::new_v4();
        Ok(ToolCallClaimResult::Claimed { claim_token })
    }

    /// Discards the result and always returns `Ok(true)`.
    async fn settle_tool_call(
        &self,
        _turn_id: &str,
        _tool_call_id: &str,
        _result_json: serde_json::Value,
        _status: &str,
        _claim_token: uuid::Uuid,
    ) -> Result<bool> {
        let settled = true;
        Ok(settled)
    }
}
/// Progress snapshot passed to [`StreamHeartbeater::heartbeat`].
#[derive(Debug, Clone)]
pub struct StreamProgress {
/// Length accumulated so far (unit — bytes vs characters — is not
/// established in this file).
pub accumulated_len: usize,
/// When the last delta arrived. NOTE(review): the unit and epoch of this
/// `u64` are not established in this file — confirm with the producer.
pub last_delta_at: u64,
}
/// Receives progress heartbeats while a stream is in flight.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait StreamHeartbeater: Send + Sync {
/// Reports the latest `progress`. The method returns nothing, so any
/// failure has to be handled inside the implementation.
async fn heartbeat(&self, progress: StreamProgress);
}
/// A [`StreamHeartbeater`] that ignores every heartbeat.
pub struct NoopStreamHeartbeater;
#[async_trait]
impl StreamHeartbeater for NoopStreamHeartbeater {
/// Does nothing with `_progress`.
async fn heartbeat(&self, _progress: StreamProgress) {}
}
/// Bag of optional dependencies handed to tools at execution time.
///
/// Only `session_id` is mandatory; every other field is an `Option` that
/// stays `None` unless it is set explicitly. All fields are public. The
/// shared handles are `Arc`s, so cloning the context does not copy the
/// underlying services.
#[derive(Clone)]
pub struct ToolContext {
/// Session the tool call belongs to.
pub session_id: SessionId,
/// Session file system.
pub file_store: Option<Arc<dyn SessionFileSystem>>,
/// Session key/value and secret storage.
pub storage_store: Option<Arc<dyn SessionStorageStore>>,
/// Image artifact storage.
pub image_store: Option<Arc<dyn ImageArtifactStore>>,
/// Source of default provider credentials.
pub provider_credential_store: Option<Arc<dyn ProviderCredentialStore>>,
/// Utility LLM service.
pub utility_llm_service: Option<Arc<dyn crate::UtilityLlmService>>,
/// Egress service.
pub egress_service: Option<Arc<dyn crate::EgressService>>,
/// Session SQL database store.
pub sqldb_store: Option<SessionSqlDbStoreRef>,
/// Message retriever.
pub message_retriever: Option<Arc<dyn crate::message_retriever::MessageRetriever>>,
/// Read-only session lookup.
pub session_store: Option<Arc<dyn SessionStore>>,
/// Session mutation (e.g. title updates).
pub session_mutator: Option<Arc<dyn SessionMutator>>,
/// Agent lookup.
pub agent_store: Option<Arc<dyn AgentStore>>,
/// Resolver for user connections to external providers.
pub connection_resolver: Option<Arc<dyn UserConnectionResolver>>,
/// Session schedule storage.
pub schedule_store: Option<Arc<dyn SessionScheduleStore>>,
/// Platform store.
pub platform_store: Option<Arc<dyn crate::platform_store::PlatformStore>>,
/// Leased resource storage.
pub leased_resource_store: Option<Arc<dyn LeasedResourceStore>>,
/// Session resource registry.
pub session_resource_registry: Option<Arc<dyn SessionResourceRegistry>>,
/// Emitter used by `emit_progress` / `emit_tool_output`.
pub event_emitter: Option<Arc<dyn EventEmitter>>,
/// Event context attached to emitted events; required for emission.
pub event_context: Option<crate::events::EventContext>,
/// Id of the tool call being executed; required for emission.
pub tool_call_id: Option<String>,
/// Capability registry.
pub capability_registry: Option<crate::capabilities::CapabilityRegistry>,
/// Tool registry.
pub tool_registry: Option<Arc<crate::tools::ToolRegistry>>,
/// Set of visible tool names (who consumes it is not shown in this file).
pub visible_tool_names: Option<Arc<HashSet<String>>>,
/// Memory store backend.
pub memory_store: Option<Arc<dyn crate::memory_store::MemoryStoreBackend>>,
/// Organization the session belongs to.
pub org_id: Option<crate::typed_id::OrgId>,
/// Network access list.
pub network_access: Option<crate::network_access::NetworkAccessList>,
/// Locale string (format not established in this file).
pub locale: Option<String>,
/// Budget checker.
pub budget_checker: Option<Arc<dyn BudgetChecker>>,
/// Payment authority.
pub payment_authority: Option<Arc<dyn PaymentAuthority>>,
}
impl ToolContext {
    /// Creates a context for `session_id` with every optional dependency unset.
    ///
    /// This is the single place where the full field list is spelled out; the
    /// other constructors build on it.
    pub fn new(session_id: SessionId) -> Self {
        Self {
            session_id,
            file_store: None,
            storage_store: None,
            image_store: None,
            provider_credential_store: None,
            utility_llm_service: None,
            egress_service: None,
            sqldb_store: None,
            message_retriever: None,
            session_store: None,
            session_mutator: None,
            agent_store: None,
            connection_resolver: None,
            schedule_store: None,
            platform_store: None,
            leased_resource_store: None,
            session_resource_registry: None,
            event_emitter: None,
            event_context: None,
            tool_call_id: None,
            capability_registry: None,
            tool_registry: None,
            visible_tool_names: None,
            memory_store: None,
            org_id: None,
            network_access: None,
            locale: None,
            budget_checker: None,
            payment_authority: None,
        }
    }

    /// Creates a context that has only `file_store` set.
    pub fn with_file_store(session_id: SessionId, file_store: Arc<dyn SessionFileSystem>) -> Self {
        Self {
            file_store: Some(file_store),
            ..Self::new(session_id)
        }
    }

    /// Creates a context that has only `storage_store` set.
    pub fn with_storage_store(
        session_id: SessionId,
        storage_store: Arc<dyn SessionStorageStore>,
    ) -> Self {
        Self {
            storage_store: Some(storage_store),
            ..Self::new(session_id)
        }
    }

    /// Creates a context that has both `file_store` and `storage_store` set.
    pub fn with_stores(
        session_id: SessionId,
        file_store: Arc<dyn SessionFileSystem>,
        storage_store: Arc<dyn SessionStorageStore>,
    ) -> Self {
        Self {
            file_store: Some(file_store),
            storage_store: Some(storage_store),
            ..Self::new(session_id)
        }
    }

    /// Sets the SQL database store and returns the context.
    pub fn with_sqldb_store(mut self, sqldb_store: SessionSqlDbStoreRef) -> Self {
        self.sqldb_store = Some(sqldb_store);
        self
    }

    /// Sets the message retriever and returns the context.
    pub fn with_message_retriever(
        mut self,
        retriever: Arc<dyn crate::message_retriever::MessageRetriever>,
    ) -> Self {
        self.message_retriever = Some(retriever);
        self
    }

    /// Sets the session store and returns the context.
    pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
        self.session_store = Some(store);
        self
    }

    /// Sets the session mutator and returns the context.
    pub fn with_session_mutator(mut self, mutator: Arc<dyn SessionMutator>) -> Self {
        self.session_mutator = Some(mutator);
        self
    }

    /// Sets the agent store and returns the context.
    pub fn with_agent_store(mut self, store: Arc<dyn AgentStore>) -> Self {
        self.agent_store = Some(store);
        self
    }

    /// Sets the user connection resolver and returns the context.
    pub fn with_connection_resolver(mut self, resolver: Arc<dyn UserConnectionResolver>) -> Self {
        self.connection_resolver = Some(resolver);
        self
    }

    /// Creates a context that has only `image_store` set.
    ///
    /// Unlike most `with_*` methods this is a constructor, not a chained
    /// setter: it takes a `session_id` rather than `self`.
    pub fn with_image_store(
        session_id: SessionId,
        image_store: Arc<dyn ImageArtifactStore>,
    ) -> Self {
        Self {
            image_store: Some(image_store),
            ..Self::new(session_id)
        }
    }

    /// Sets the provider credential store and returns the context.
    pub fn with_provider_credential_store(
        mut self,
        store: Arc<dyn ProviderCredentialStore>,
    ) -> Self {
        self.provider_credential_store = Some(store);
        self
    }

    /// Sets the utility LLM service and returns the context.
    pub fn with_utility_llm_service(mut self, service: Arc<dyn crate::UtilityLlmService>) -> Self {
        self.utility_llm_service = Some(service);
        self
    }

    /// Sets the egress service and returns the context.
    pub fn with_egress_service(mut self, service: Arc<dyn crate::EgressService>) -> Self {
        self.egress_service = Some(service);
        self
    }

    /// Sets the schedule store and returns the context.
    pub fn with_schedule_store(mut self, store: Arc<dyn SessionScheduleStore>) -> Self {
        self.schedule_store = Some(store);
        self
    }

    /// Sets the platform store and returns the context.
    pub fn with_platform_store(
        mut self,
        store: Arc<dyn crate::platform_store::PlatformStore>,
    ) -> Self {
        self.platform_store = Some(store);
        self
    }

    /// Sets the leased resource store and returns the context.
    pub fn with_leased_resource_store(mut self, store: Arc<dyn LeasedResourceStore>) -> Self {
        self.leased_resource_store = Some(store);
        self
    }

    /// Sets the session resource registry and returns the context.
    pub fn with_session_resource_registry(
        mut self,
        registry: Arc<dyn SessionResourceRegistry>,
    ) -> Self {
        self.session_resource_registry = Some(registry);
        self
    }

    /// Sets the memory store backend and returns the context.
    pub fn with_memory_store(
        mut self,
        store: Arc<dyn crate::memory_store::MemoryStoreBackend>,
    ) -> Self {
        self.memory_store = Some(store);
        self
    }

    /// Sets the organization id and returns the context.
    pub fn with_org_id(mut self, org_id: crate::typed_id::OrgId) -> Self {
        self.org_id = Some(org_id);
        self
    }

    /// Sets the tool registry and returns the context.
    pub fn with_tool_registry(mut self, registry: Arc<crate::tools::ToolRegistry>) -> Self {
        self.tool_registry = Some(registry);
        self
    }

    /// Sets the visible tool names and returns the context.
    pub fn with_visible_tool_names(mut self, names: Arc<HashSet<String>>) -> Self {
        self.visible_tool_names = Some(names);
        self
    }

    /// Replaces the network access list and returns the context.
    ///
    /// This setter takes an `Option`, so passing `None` clears any list that
    /// was set before.
    pub fn with_network_access(
        mut self,
        network_access: Option<crate::network_access::NetworkAccessList>,
    ) -> Self {
        self.network_access = network_access;
        self
    }

    /// Sets the payment authority and returns the context.
    pub fn with_payment_authority(mut self, authority: Arc<dyn PaymentAuthority>) -> Self {
        self.payment_authority = Some(authority);
        self
    }

    /// Emits a tool progress event carrying `message` for the current tool call.
    ///
    /// Best effort: this is a silent no-op unless `event_emitter`,
    /// `event_context` and `tool_call_id` are all set, and an emitter failure
    /// is only logged at debug level, never returned to the caller.
    pub async fn emit_progress(&self, tool_name: &str, message: &str) {
        let (Some(emitter), Some(ctx), Some(call_id)) =
            (&self.event_emitter, &self.event_context, &self.tool_call_id)
        else {
            return;
        };
        let request = EventRequest::new(
            self.session_id,
            ctx.clone(),
            crate::events::ToolProgressData {
                tool_call_id: call_id.clone(),
                tool_name: tool_name.to_string(),
                message: message.to_string(),
                display_name: None,
            },
        );
        if let Err(e) = emitter.emit(request).await {
            tracing::debug!(
                tool_call_id = call_id,
                tool_name,
                error = %e,
                "Failed to emit tool.progress event"
            );
        }
    }

    /// Emits a tool output delta event for the current tool call.
    ///
    /// `stream` names the output stream the delta belongs to; its accepted
    /// values are not defined in this file. Best effort, with the same
    /// preconditions and error handling as [`ToolContext::emit_progress`].
    pub async fn emit_tool_output(&self, tool_name: &str, delta: &str, stream: &str) {
        let (Some(emitter), Some(ctx), Some(call_id)) =
            (&self.event_emitter, &self.event_context, &self.tool_call_id)
        else {
            return;
        };
        let request = EventRequest::new(
            self.session_id,
            ctx.clone(),
            crate::events::ToolOutputDeltaData {
                tool_call_id: call_id.clone(),
                tool_name: tool_name.to_string(),
                delta: delta.to_string(),
                stream: stream.to_string(),
            },
        );
        if let Err(e) = emitter.emit(request).await {
            tracing::debug!(
                tool_call_id = call_id,
                tool_name,
                error = %e,
                "Failed to emit tool.output.delta event"
            );
        }
    }
}
/// Hand-written `Debug` that shows which dependencies are present rather than
/// their contents, since the trait-object fields do not require `Debug`.
///
/// Every field of [`ToolContext`] is represented, in declaration order, so
/// the output cannot silently hide a dependency.
impl std::fmt::Debug for ToolContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ToolContext")
            .field("session_id", &self.session_id)
            .field("file_store", &self.file_store.is_some())
            .field("storage_store", &self.storage_store.is_some())
            .field("image_store", &self.image_store.is_some())
            .field(
                "provider_credential_store",
                &self.provider_credential_store.is_some(),
            )
            .field("utility_llm_service", &self.utility_llm_service.is_some())
            .field("egress_service", &self.egress_service.is_some())
            .field("sqldb_store", &self.sqldb_store.is_some())
            .field("message_retriever", &self.message_retriever.is_some())
            .field("session_store", &self.session_store.is_some())
            .field("session_mutator", &self.session_mutator.is_some())
            .field("agent_store", &self.agent_store.is_some())
            .field("connection_resolver", &self.connection_resolver.is_some())
            .field("schedule_store", &self.schedule_store.is_some())
            .field("platform_store", &self.platform_store.is_some())
            .field(
                "leased_resource_store",
                &self.leased_resource_store.is_some(),
            )
            .field(
                "session_resource_registry",
                &self.session_resource_registry.is_some(),
            )
            .field("event_emitter", &self.event_emitter.is_some())
            .field("event_context", &self.event_context.is_some())
            .field("tool_call_id", &self.tool_call_id)
            .field("capability_registry", &self.capability_registry.is_some())
            .field("tool_registry", &self.tool_registry.is_some())
            // Print only how many names are visible, not the names themselves.
            .field(
                "visible_tool_names",
                &self.visible_tool_names.as_ref().map(|names| names.len()),
            )
            .field("memory_store", &self.memory_store.is_some())
            .field("org_id", &self.org_id)
            .field("network_access", &self.network_access.is_some())
            .field("locale", &self.locale)
            .field("budget_checker", &self.budget_checker.is_some())
            .field("payment_authority", &self.payment_authority.is_some())
            .finish()
    }
}
use crate::events::{Event, EventRequest};
/// Sink for events produced during a session.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait EventEmitter: Send + Sync {
/// Emits the event described by `request` and returns the resulting [`Event`].
async fn emit(&self, request: EventRequest) -> Result<Event>;
}
/// Allows an `Arc<E>` to be passed wherever an [`EventEmitter`] is expected
/// by delegating to the wrapped emitter.
#[async_trait]
impl<E: EventEmitter + ?Sized> EventEmitter for Arc<E> {
    async fn emit(&self, request: EventRequest) -> Result<Event> {
        // Deref-coerce to the wrapped emitter so the call cannot recurse into this impl.
        let inner: &E = self;
        inner.emit(request).await
    }
}
/// An [`EventEmitter`] that delivers events nowhere.
///
/// It still turns the request into an [`Event`] — with a freshly created id
/// and `0` as the second `into_event` argument — so callers get a value back.
#[derive(Debug, Clone, Default)]
pub struct NoopEventEmitter;

#[async_trait]
impl EventEmitter for NoopEventEmitter {
    async fn emit(&self, request: EventRequest) -> Result<Event> {
        let event_id = crate::typed_id::EventId::new();
        let event = request.into_event(event_id, 0);
        Ok(event)
    }
}
/// An image resolved to inline data: a base64 payload plus its media type.
#[derive(Debug, Clone)]
pub struct ResolvedImage {
    /// Base64 payload, stored exactly as given (it is not validated here).
    pub base64: String,
    /// Media type string, e.g. `image/png`.
    pub media_type: String,
}

impl ResolvedImage {
    /// Builds a `ResolvedImage` from anything convertible into `String`.
    pub fn new(base64: impl Into<String>, media_type: impl Into<String>) -> Self {
        let base64 = base64.into();
        let media_type = media_type.into();
        Self { base64, media_type }
    }

    /// Renders the image as a `data:<media_type>;base64,<payload>` URL.
    pub fn to_data_url(&self) -> String {
        let Self { base64, media_type } = self;
        format!("data:{media_type};base64,{base64}")
    }
}
/// Resolves an image id to inline image data.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait ImageResolver: Send + Sync {
/// Resolves `image_id` to a [`ResolvedImage`], if the image can be found.
///
/// NOTE(review): the id is a raw `Uuid` here, whereas `ImageArtifactStore`
/// takes a typed `ImageId`; whether the two are interchangeable is not
/// visible in this file.
async fn resolve_image(&self, image_id: Uuid) -> Result<Option<ResolvedImage>>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The constructor stores both arguments unchanged.
    #[test]
    fn test_resolved_image_new() {
        let ResolvedImage { base64, media_type } = ResolvedImage::new("SGVsbG8=", "image/png");
        assert_eq!(base64, "SGVsbG8=");
        assert_eq!(media_type, "image/png");
    }

    /// A PNG image renders to the exact expected data URL.
    #[test]
    fn test_resolved_image_to_data_url() {
        let rendered = ResolvedImage::new("SGVsbG8=", "image/png").to_data_url();
        assert_eq!(rendered, "data:image/png;base64,SGVsbG8=");
    }

    /// The media type is carried into the data URL prefix.
    #[test]
    fn test_resolved_image_jpeg() {
        let rendered = ResolvedImage::new("base64data", "image/jpeg").to_data_url();
        assert!(rendered.starts_with("data:image/jpeg;base64,"));
    }
}