use async_trait::async_trait;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use super::act_hooks::{self, PostActHook};
use super::{Atom, AtomContext};
use crate::error::Result;
use crate::events::{
ActCompletedData, ActStartedData, EventContext, EventRequest, ToolCompletedData,
ToolStartedData,
};
use crate::message::ContentPart;
use crate::tool_narration::{
ToolNarrationPhase, render_group_headline_with_locale, render_tool_narration_with_locale,
};
use crate::tool_types::{ToolCall, ToolDefinition, ToolResult};
use crate::traits::{
AgentStore, EventEmitter, SessionFileSystem, SessionMutator, SessionStore, ToolContext,
ToolExecutor,
};
use crate::typed_id::{AgentId, HarnessId};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActInput {
#[serde(skip_serializing_if = "Option::is_none")]
pub org_id: Option<i64>,
pub context: AtomContext,
pub harness_id: HarnessId,
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_id: Option<AgentId>,
pub tool_calls: Vec<ToolCall>,
pub tool_definitions: Vec<ToolDefinition>,
#[serde(skip_serializing_if = "Option::is_none")]
pub locale: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub blueprint_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub network_access: Option<crate::network_access::NetworkAccessList>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCallResult {
pub tool_call: ToolCall,
pub result: ToolResult,
pub success: bool,
pub status: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub connection_required: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActResult {
pub results: Vec<ToolCallResult>,
pub completed: bool,
pub success_count: u32,
pub error_count: u32,
#[serde(default)]
pub waiting_for_tool_results: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub blocked: bool,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub client_tool_calls: Vec<ToolCall>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub client_tool_definitions: Vec<ToolDefinition>,
}
fn is_false(value: &bool) -> bool {
!*value
}
pub struct ActAtom<T, E>
where
T: ToolExecutor,
E: EventEmitter,
{
tool_executor: T,
event_emitter: Arc<E>,
file_store: Option<Arc<dyn SessionFileSystem>>,
sqldb_store: Option<crate::traits::SessionSqlDbStoreRef>,
storage_store: Option<Arc<dyn crate::traits::SessionStorageStore>>,
image_store: Option<Arc<dyn crate::traits::ImageArtifactStore>>,
provider_credential_store: Option<Arc<dyn crate::traits::ProviderCredentialStore>>,
connection_resolver: Option<Arc<dyn crate::traits::UserConnectionResolver>>,
session_store: Option<Arc<dyn SessionStore>>,
session_mutator: Option<Arc<dyn SessionMutator>>,
agent_store: Option<Arc<dyn AgentStore>>,
schedule_store: Option<Arc<dyn crate::traits::SessionScheduleStore>>,
platform_store: Option<Arc<dyn crate::platform_store::PlatformStore>>,
leased_resource_store: Option<Arc<dyn crate::traits::LeasedResourceStore>>,
session_resource_registry: Option<Arc<dyn crate::traits::SessionResourceRegistry>>,
capability_registry: Option<crate::capabilities::CapabilityRegistry>,
tool_registry: Option<Arc<crate::tools::ToolRegistry>>,
memory_store: Option<Arc<dyn crate::memory_store::MemoryStoreBackend>>,
org_id: Option<crate::typed_id::OrgId>,
network_access: Option<crate::network_access::NetworkAccessList>,
budget_checker: Option<Arc<dyn crate::traits::BudgetChecker>>,
payment_authority: Option<Arc<dyn crate::traits::PaymentAuthority>>,
hooks: Vec<Box<dyn PostActHook>>,
post_tool_hooks: Vec<Arc<dyn act_hooks::PostToolExecHook>>,
tool_call_hooks: Vec<Arc<dyn crate::capabilities::ToolCallHook>>,
final_post_tool_hooks: Vec<Arc<dyn act_hooks::PostToolExecHook>>,
}
impl<T, E> ActAtom<T, E>
where
T: ToolExecutor,
E: EventEmitter,
{
pub fn new(tool_executor: T, event_emitter: E) -> Self {
Self {
tool_executor,
event_emitter: Arc::new(event_emitter),
file_store: None,
sqldb_store: None,
storage_store: None,
image_store: None,
provider_credential_store: None,
connection_resolver: None,
session_store: None,
session_mutator: None,
agent_store: None,
schedule_store: None,
platform_store: None,
leased_resource_store: None,
session_resource_registry: None,
capability_registry: None,
tool_registry: None,
memory_store: None,
org_id: None,
network_access: None,
budget_checker: None,
payment_authority: None,
hooks: Self::default_hooks(),
post_tool_hooks: Vec::new(),
tool_call_hooks: Vec::new(),
final_post_tool_hooks: Self::default_final_hooks(),
}
}
pub fn with_file_store(
tool_executor: T,
event_emitter: E,
file_store: Arc<dyn SessionFileSystem>,
) -> Self {
Self {
tool_executor,
event_emitter: Arc::new(event_emitter),
file_store: Some(file_store),
sqldb_store: None,
storage_store: None,
image_store: None,
provider_credential_store: None,
connection_resolver: None,
session_store: None,
session_mutator: None,
agent_store: None,
schedule_store: None,
platform_store: None,
leased_resource_store: None,
session_resource_registry: None,
capability_registry: None,
tool_registry: None,
memory_store: None,
org_id: None,
network_access: None,
budget_checker: None,
payment_authority: None,
hooks: Self::default_hooks(),
post_tool_hooks: Vec::new(),
tool_call_hooks: Vec::new(),
final_post_tool_hooks: Self::default_final_hooks(),
}
}
pub fn with_hook(mut self, hook: Box<dyn PostActHook>) -> Self {
self.hooks.push(hook);
self
}
fn default_hooks() -> Vec<Box<dyn PostActHook>> {
vec![
Box::new(act_hooks::ConnectionSetupHook),
Box::new(act_hooks::ClientSideToolHook),
]
}
fn default_final_hooks() -> Vec<Arc<dyn act_hooks::PostToolExecHook>> {
vec![Arc::new(act_hooks::OutputHardLimitHook)]
}
pub fn with_sqldb_store(mut self, store: crate::traits::SessionSqlDbStoreRef) -> Self {
self.sqldb_store = Some(store);
self
}
pub fn with_storage_store(
mut self,
store: Arc<dyn crate::traits::SessionStorageStore>,
) -> Self {
self.storage_store = Some(store);
self
}
pub fn with_image_store(mut self, store: Arc<dyn crate::traits::ImageArtifactStore>) -> Self {
self.image_store = Some(store);
self
}
pub fn with_provider_credential_store(
mut self,
store: Arc<dyn crate::traits::ProviderCredentialStore>,
) -> Self {
self.provider_credential_store = Some(store);
self
}
pub fn with_connection_resolver(
mut self,
resolver: Arc<dyn crate::traits::UserConnectionResolver>,
) -> Self {
self.connection_resolver = Some(resolver);
self
}
pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
self.session_store = Some(store);
self
}
pub fn with_session_mutator(mut self, mutator: Arc<dyn SessionMutator>) -> Self {
self.session_mutator = Some(mutator);
self
}
pub fn with_agent_store(mut self, store: Arc<dyn AgentStore>) -> Self {
self.agent_store = Some(store);
self
}
pub fn with_schedule_store(
mut self,
store: Arc<dyn crate::traits::SessionScheduleStore>,
) -> Self {
self.schedule_store = Some(store);
self
}
pub fn with_platform_store(
mut self,
store: Arc<dyn crate::platform_store::PlatformStore>,
) -> Self {
self.platform_store = Some(store);
self
}
pub fn with_leased_resource_store(
mut self,
store: Arc<dyn crate::traits::LeasedResourceStore>,
) -> Self {
self.leased_resource_store = Some(store);
self
}
pub fn with_session_resource_registry(
mut self,
registry: Arc<dyn crate::traits::SessionResourceRegistry>,
) -> Self {
self.session_resource_registry = Some(registry);
self
}
pub fn with_capability_registry(
mut self,
registry: crate::capabilities::CapabilityRegistry,
) -> Self {
self.capability_registry = Some(registry);
self
}
pub fn with_tool_registry(mut self, registry: Arc<crate::tools::ToolRegistry>) -> Self {
self.tool_registry = Some(registry);
self
}
pub fn with_post_tool_hooks(
mut self,
hooks: Vec<Arc<dyn act_hooks::PostToolExecHook>>,
) -> Self {
self.post_tool_hooks.extend(hooks);
self
}
pub fn with_tool_call_hooks(
mut self,
hooks: Vec<Arc<dyn crate::capabilities::ToolCallHook>>,
) -> Self {
self.tool_call_hooks.extend(hooks);
self
}
pub fn with_memory_store(
mut self,
store: Arc<dyn crate::memory_store::MemoryStoreBackend>,
) -> Self {
self.memory_store = Some(store);
self
}
pub fn with_org_id(mut self, org_id: crate::typed_id::OrgId) -> Self {
self.org_id = Some(org_id);
self
}
pub fn with_network_access(
mut self,
network_access: Option<crate::network_access::NetworkAccessList>,
) -> Self {
self.network_access = network_access;
self
}
pub fn with_budget_checker(mut self, checker: Arc<dyn crate::traits::BudgetChecker>) -> Self {
self.budget_checker = Some(checker);
self
}
pub fn with_payment_authority(
mut self,
authority: Arc<dyn crate::traits::PaymentAuthority>,
) -> Self {
self.payment_authority = Some(authority);
self
}
}
#[async_trait]
impl<T, E> Atom for ActAtom<T, E>
where
T: ToolExecutor + Send + Sync,
E: EventEmitter + Send + Sync + 'static,
{
type Input = ActInput;
type Output = ActResult;
fn name(&self) -> &'static str {
"act"
}
async fn execute(&self, input: Self::Input) -> Result<Self::Output> {
let ActInput {
context,
tool_calls,
tool_definitions,
locale,
network_access,
.. } = input;
let (server_tool_calls, client_tool_calls): (Vec<_>, Vec<_>) =
tool_calls.into_iter().partition(|tc| {
tool_definitions
.iter()
.find(|td| td.name() == tc.name)
.map(|td| !matches!(td, ToolDefinition::ClientSide(_)))
.unwrap_or(true) });
let client_tool_calls: Vec<_> = client_tool_calls
.into_iter()
.map(|tool_call| self.transform_tool_call_for_execution(tool_call))
.collect();
let client_tool_definitions: Vec<_> = if client_tool_calls.is_empty() {
vec![]
} else {
tool_definitions
.iter()
.filter(|td| {
if let ToolDefinition::ClientSide(ct) = td {
client_tool_calls.iter().any(|tc| tc.name == ct.name)
} else {
false
}
})
.cloned()
.collect()
};
if server_tool_calls.is_empty() && client_tool_calls.is_empty() {
return Ok(ActResult {
results: vec![],
completed: true,
success_count: 0,
error_count: 0,
waiting_for_tool_results: false,
blocked: false,
client_tool_calls: vec![],
client_tool_definitions: vec![],
});
}
if server_tool_calls.is_empty() {
let mut result = ActResult {
results: vec![],
completed: true,
success_count: 0,
error_count: 0,
waiting_for_tool_results: false,
blocked: false,
client_tool_calls,
client_tool_definitions,
};
act_hooks::run_post_act_hooks(
&self.hooks,
&context,
&mut result,
&tool_definitions,
&self.event_emitter,
locale.as_deref(),
)
.await;
return Ok(result);
}
let tool_calls = server_tool_calls;
tracing::info!(
session_id = %context.session_id,
turn_id = %context.turn_id,
exec_id = %context.exec_id,
tool_count = %tool_calls.len(),
"ActAtom: executing tools in parallel"
);
let trace_id = context.turn_id.to_string();
let act_span_id = Uuid::now_v7().to_string();
let parent_span_id = trace_id.clone();
let event_context = EventContext::from_atom_context(&context).with_span(
trace_id.clone(),
act_span_id.clone(),
Some(parent_span_id.clone()),
);
let act_start = Instant::now();
let tool_map: std::collections::HashMap<&str, &ToolDefinition> = tool_definitions
.iter()
.map(|def| {
let name = def.name();
(name, def)
})
.collect();
let mut started_data = ActStartedData::with_definitions_and_locale(
&tool_calls,
&tool_definitions,
locale.as_deref(),
);
for summary in &mut started_data.tool_calls {
if let Some(tool_call) = tool_calls.iter().find(|tc| tc.id == summary.id) {
let tool_def = tool_map.get(tool_call.name.as_str()).copied();
summary.narration = Some(self.render_tool_narration(
tool_def,
tool_call,
ToolNarrationPhase::Started,
locale.as_deref(),
));
}
}
if tool_calls.len() == 1 {
started_data.headline = started_data
.tool_calls
.first()
.and_then(|summary| summary.narration.clone());
}
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context.clone(),
started_data,
))
.await
{
tracing::warn!(
session_id = %context.session_id,
error = %e,
"ActAtom: failed to emit act.started event"
);
}
let futures: Vec<_> = tool_calls
.iter()
.map(|tool_call| {
let tool_def = tool_map.get(tool_call.name.as_str()).cloned();
self.execute_single_tool(
&context,
tool_call.clone(),
tool_def,
&trace_id,
&act_span_id,
locale.as_deref(),
network_access.as_ref(),
)
})
.collect();
let results = join_all(futures).await;
let success_count = results.iter().filter(|r| r.success).count() as u32;
let error_count = results.iter().filter(|r| !r.success).count() as u32;
let act_duration_ms = act_start.elapsed().as_millis() as u64;
let completed_context = EventContext::from_atom_context(&context).with_span(
trace_id.clone(),
act_span_id.clone(), Some(parent_span_id.clone()),
);
let mut completed_headline = render_group_headline_with_locale(
&tool_calls,
&tool_definitions,
ToolNarrationPhase::Completed,
locale.as_deref(),
);
if tool_calls.len() == 1
&& let Some(tool_call) = tool_calls.first()
{
completed_headline = Some(self.render_tool_narration(
tool_map.get(tool_call.name.as_str()).copied(),
tool_call,
ToolNarrationPhase::Completed,
locale.as_deref(),
));
}
if error_count > 0 {
let suffix = crate::localization::format_error_suffix(locale.as_deref(), error_count);
completed_headline = Some(match completed_headline {
Some(text) => format!("{text}{suffix}"),
None => {
crate::localization::format_completed_tool_batch(locale.as_deref(), error_count)
}
});
}
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
completed_context,
ActCompletedData {
completed: true,
success_count,
error_count,
duration_ms: Some(act_duration_ms),
headline: completed_headline,
},
))
.await
{
tracing::warn!(
session_id = %context.session_id,
error = %e,
"ActAtom: failed to emit act.completed event"
);
}
tracing::info!(
session_id = %context.session_id,
turn_id = %context.turn_id,
success_count = %success_count,
error_count = %error_count,
"ActAtom: all tools completed"
);
let mut act_result = ActResult {
results,
completed: true,
success_count,
error_count,
waiting_for_tool_results: false,
blocked: false,
client_tool_calls,
client_tool_definitions,
};
act_hooks::run_post_act_hooks(
&self.hooks,
&context,
&mut act_result,
&tool_definitions,
&self.event_emitter,
locale.as_deref(),
)
.await;
Ok(act_result)
}
}
impl<T, E> ActAtom<T, E>
where
T: ToolExecutor + Send + Sync,
E: EventEmitter + Send + Sync + 'static,
{
fn render_tool_narration(
&self,
tool_def: Option<&ToolDefinition>,
tool_call: &ToolCall,
phase: ToolNarrationPhase,
locale: Option<&str>,
) -> String {
for hook in &self.tool_call_hooks {
if let Some(narration) = hook.narration(tool_def, tool_call, phase, locale) {
return narration;
}
}
render_tool_narration_with_locale(tool_def, tool_call, phase, locale)
}
fn transform_tool_call_for_execution(&self, tool_call: ToolCall) -> ToolCall {
self.tool_call_hooks
.iter()
.fold(tool_call, |tool_call, hook| {
hook.transform_for_execution(tool_call)
})
}
#[allow(clippy::too_many_arguments)]
async fn execute_single_tool(
&self,
context: &AtomContext,
tool_call: ToolCall,
tool_def: Option<&ToolDefinition>,
trace_id: &str,
act_span_id: &str,
locale: Option<&str>,
network_access: Option<&crate::network_access::NetworkAccessList>,
) -> ToolCallResult {
tracing::debug!(
session_id = %context.session_id,
turn_id = %context.turn_id,
tool_name = %tool_call.name,
tool_call_id = %tool_call.id,
"ActAtom: executing tool"
);
let tool_span_id = Uuid::now_v7().to_string();
let event_context = EventContext::from_atom_context(context).with_span(
trace_id.to_string(),
tool_span_id.clone(),
Some(act_span_id.to_string()),
);
let tool_start = Instant::now();
let display_name = crate::localization::localized_tool_display_name(
&tool_call.name,
tool_def.and_then(|d| d.display_name()),
locale,
);
let capability_attribution = tool_def.and_then(|def| {
def.capability_attribution()
.map(|(id, name)| (id.to_string(), name.map(str::to_string)))
});
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context.clone(),
ToolStartedData {
tool_call: tool_call.clone(),
display_name: display_name.clone(),
narration: Some(self.render_tool_narration(
tool_def,
&tool_call,
ToolNarrationPhase::Started,
locale,
)),
},
))
.await
{
tracing::warn!(
session_id = %context.session_id,
tool_call_id = %tool_call.id,
error = %e,
"ActAtom: failed to emit tool.started event"
);
}
let Some(tool_def) = tool_def else {
let error_msg = format!("Tool definition not found: {}", tool_call.name);
let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context,
ToolCompletedData::failure(
tool_call.id.clone(),
tool_call.name.clone(),
"error".to_string(),
error_msg.clone(),
Some(tool_duration_ms),
)
.with_narration(Some(self.render_tool_narration(
None,
&tool_call,
ToolNarrationPhase::Failed,
locale,
))),
))
.await
{
tracing::warn!(
session_id = %context.session_id,
tool_call_id = %tool_call.id,
error = %e,
"ActAtom: failed to emit tool.completed event"
);
}
return ToolCallResult {
tool_call: tool_call.clone(),
result: ToolResult {
tool_call_id: tool_call.id.clone(),
result: None,
images: None,
error: Some(error_msg),
connection_required: None,
raw_output: None,
},
success: false,
status: "error".to_string(),
connection_required: None,
};
};
let mut tool_context = if let Some(ref store) = self.file_store {
ToolContext::with_file_store(context.session_id, store.clone())
} else {
ToolContext::new(context.session_id)
};
if let Some(ref store) = self.sqldb_store {
tool_context.sqldb_store = Some(store.clone());
}
if let Some(ref store) = self.storage_store {
tool_context.storage_store = Some(store.clone());
}
if let Some(ref store) = self.image_store {
tool_context.image_store = Some(store.clone());
}
if let Some(ref store) = self.provider_credential_store {
tool_context.provider_credential_store = Some(store.clone());
}
if let Some(ref resolver) = self.connection_resolver {
tool_context.connection_resolver = Some(resolver.clone());
}
if let Some(ref store) = self.session_store {
tool_context.session_store = Some(store.clone());
}
if let Some(ref mutator) = self.session_mutator {
tool_context.session_mutator = Some(mutator.clone());
}
if let Some(ref store) = self.agent_store {
tool_context.agent_store = Some(store.clone());
}
if let Some(ref store) = self.schedule_store {
tool_context.schedule_store = Some(store.clone());
}
if let Some(ref store) = self.platform_store {
tool_context.platform_store = Some(store.clone());
}
if let Some(ref store) = self.leased_resource_store {
tool_context.leased_resource_store = Some(store.clone());
}
if let Some(ref registry) = self.session_resource_registry {
tool_context.session_resource_registry = Some(registry.clone());
}
if let Some(ref registry) = self.capability_registry {
tool_context.capability_registry = Some(registry.clone());
}
if let Some(ref registry) = self.tool_registry {
tool_context.tool_registry = Some(registry.clone());
}
if let Some(ref store) = self.memory_store {
tool_context.memory_store = Some(store.clone());
}
if let Some(ref checker) = self.budget_checker {
tool_context.budget_checker = Some(checker.clone());
}
if let Some(ref authority) = self.payment_authority {
tool_context.payment_authority = Some(authority.clone());
}
tool_context.org_id = self.org_id;
tool_context.network_access = network_access
.cloned()
.or_else(|| self.network_access.clone());
tool_context.event_emitter = Some(self.event_emitter.clone() as Arc<dyn EventEmitter>);
tool_context.event_context = Some(event_context.clone());
tool_context.tool_call_id = Some(tool_call.id.clone());
let execution_tool_call = self.transform_tool_call_for_execution(tool_call.clone());
let result = self
.tool_executor
.execute_with_context(&execution_tool_call, tool_def, &tool_context)
.await;
match result {
Ok(mut tool_result) => {
act_hooks::run_post_tool_exec_hooks(
&self.post_tool_hooks,
&self.final_post_tool_hooks,
&execution_tool_call,
tool_def,
&mut tool_result,
&tool_context,
)
.await;
let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
let success = tool_result.error.is_none();
let status = if success { "success" } else { "error" };
let completed_data = if success {
let mut result_content = tool_result
.result
.as_ref()
.map(|r| vec![ContentPart::text(r.to_string())])
.unwrap_or_default();
if let Some(ref images) = tool_result.images {
for img in images {
result_content.push(ContentPart::Image(
crate::message::ImageContentPart::from_base64(
&img.base64,
&img.media_type,
),
));
}
}
ToolCompletedData::success(
tool_call.id.clone(),
tool_call.name.clone(),
result_content,
Some(tool_duration_ms),
)
.with_display_name(display_name.clone())
.with_capability_attribution(
capability_attribution.as_ref().map(|(id, _)| id.clone()),
capability_attribution
.as_ref()
.and_then(|(_, name)| name.clone()),
)
.with_narration(Some(self.render_tool_narration(
Some(tool_def),
&tool_call,
ToolNarrationPhase::Completed,
locale,
)))
} else {
ToolCompletedData::failure(
tool_call.id.clone(),
tool_call.name.clone(),
status.to_string(),
tool_result.error.clone().unwrap_or_default(),
Some(tool_duration_ms),
)
.with_display_name(display_name.clone())
.with_capability_attribution(
capability_attribution.as_ref().map(|(id, _)| id.clone()),
capability_attribution
.as_ref()
.and_then(|(_, name)| name.clone()),
)
.with_narration(Some(self.render_tool_narration(
Some(tool_def),
&tool_call,
ToolNarrationPhase::Failed,
locale,
)))
};
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context.clone(),
completed_data,
))
.await
{
tracing::warn!(
session_id = %context.session_id,
tool_call_id = %tool_call.id,
error = %e,
"ActAtom: failed to emit tool.completed event"
);
}
tracing::debug!(
session_id = %context.session_id,
tool_name = %tool_call.name,
tool_call_id = %tool_call.id,
success = %success,
"ActAtom: tool execution completed"
);
let conn_req = tool_result.connection_required.clone();
ToolCallResult {
tool_call,
result: tool_result,
success,
status: status.to_string(),
connection_required: conn_req,
}
}
Err(e) => {
let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
let error_msg = e.to_string();
if let Err(emit_err) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context,
ToolCompletedData::failure(
tool_call.id.clone(),
tool_call.name.clone(),
"error".to_string(),
error_msg.clone(),
Some(tool_duration_ms),
)
.with_display_name(display_name.clone())
.with_capability_attribution(
capability_attribution.as_ref().map(|(id, _)| id.clone()),
capability_attribution
.as_ref()
.and_then(|(_, name)| name.clone()),
)
.with_narration(Some(self.render_tool_narration(
Some(tool_def),
&tool_call,
ToolNarrationPhase::Failed,
locale,
))),
))
.await
{
tracing::warn!(
session_id = %context.session_id,
tool_call_id = %tool_call.id,
error = %emit_err,
"ActAtom: failed to emit tool.completed event"
);
}
tracing::warn!(
session_id = %context.session_id,
tool_name = %tool_call.name,
tool_call_id = %tool_call.id,
error = %e,
"ActAtom: tool execution failed"
);
ToolCallResult {
tool_call: tool_call.clone(),
result: ToolResult {
tool_call_id: tool_call.id.clone(),
result: None,
images: None,
error: Some(error_msg),
connection_required: None,
raw_output: None,
},
success: false,
status: "error".to_string(),
connection_required: None,
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tools::ToolRegistry;
use crate::traits::NoopEventEmitter;
use crate::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
use async_trait::async_trait;
use serde_json::json;
struct ArgumentEchoTool;
#[async_trait]
impl crate::tools::Tool for ArgumentEchoTool {
fn name(&self) -> &str {
"argument_echo"
}
fn description(&self) -> &str {
"returns the execution arguments"
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"value": { "type": "string" }
}
})
}
async fn execute(&self, arguments: serde_json::Value) -> crate::ToolExecutionResult {
crate::ToolExecutionResult::success(arguments)
}
}
#[tokio::test]
async fn test_act_atom_empty_tool_calls() {
let executor = ToolRegistry::with_defaults();
let event_emitter = NoopEventEmitter;
let atom = ActAtom::new(executor, event_emitter);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![],
tool_definitions: vec![],
locale: None,
blueprint_id: None,
network_access: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.completed);
assert!(result.results.is_empty());
assert_eq!(result.success_count, 0);
assert_eq!(result.error_count, 0);
}
#[tokio::test]
async fn test_act_atom_tool_not_found() {
let executor = ToolRegistry::with_defaults();
let event_emitter = NoopEventEmitter;
let atom = ActAtom::new(executor, event_emitter);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "nonexistent_tool".to_string(),
arguments: json!({}),
}],
tool_definitions: vec![],
locale: None,
blueprint_id: None,
network_access: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.completed);
assert_eq!(result.results.len(), 1);
assert!(!result.results[0].success);
assert_eq!(result.results[0].status, "error");
assert!(
result.results[0]
.result
.error
.as_ref()
.unwrap()
.contains("not found")
);
}
#[tokio::test]
async fn test_act_atom_uses_tool_call_hooks_for_execution_arguments() {
use crate::capabilities::{Capability, HumanIntentCapability};
let mut executor = ToolRegistry::new();
executor.register(ArgumentEchoTool);
let tool_def = executor.get("argument_echo").unwrap().to_definition();
let emitter = crate::memory::InMemoryEventEmitter::new();
let atom = ActAtom::new(executor, emitter.clone())
.with_tool_call_hooks(HumanIntentCapability.tool_call_hooks());
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "argument_echo".to_string(),
arguments: json!({
"value": "visible",
"human_intent": "Echoing test arguments"
}),
}],
tool_definitions: vec![tool_def],
locale: None,
blueprint_id: None,
network_access: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.results[0].success);
assert_eq!(
result.results[0].result.result,
Some(json!({ "value": "visible" }))
);
let events = emitter.events().await;
let act_started = events
.iter()
.find(|event| event.event_type == "act.started")
.expect("act.started event");
let crate::events::EventData::ActStarted(data) = &act_started.data else {
panic!("expected act.started data");
};
assert_eq!(data.headline.as_deref(), Some("Echoing test arguments"));
assert_eq!(
data.tool_calls[0].narration.as_deref(),
Some("Echoing test arguments")
);
let tool_started = events
.iter()
.find(|event| event.event_type == "tool.started")
.expect("tool.started event");
let crate::events::EventData::ToolStarted(data) = &tool_started.data else {
panic!("expected tool.started data");
};
assert_eq!(data.narration.as_deref(), Some("Echoing test arguments"));
let tool_completed = events
.iter()
.find(|event| event.event_type == "tool.completed")
.expect("tool.completed event");
let crate::events::EventData::ToolCompleted(data) = &tool_completed.data else {
panic!("expected tool.completed data");
};
assert_eq!(data.narration.as_deref(), Some("Echoing test arguments"));
}
#[tokio::test]
async fn test_act_atom_strips_human_intent_from_client_tool_calls() {
use crate::capabilities::{Capability, HumanIntentCapability};
let executor = ToolRegistry::new();
let emitter = crate::memory::InMemoryEventEmitter::new();
let atom = ActAtom::new(executor, emitter)
.with_tool_call_hooks(HumanIntentCapability.tool_call_hooks());
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_client".to_string(),
name: "browser_click".to_string(),
arguments: json!({
"selector": "#btn",
"human_intent": "Clicking approve"
}),
}],
tool_definitions: vec![crate::ToolDefinition::ClientSide(crate::ClientSideTool {
name: "browser_click".to_string(),
display_name: None,
description: "Click button".to_string(),
parameters: json!({
"type": "object",
"properties": {
"selector": {"type": "string"}
},
"required": ["selector"]
}),
category: None,
deferrable: Default::default(),
hints: crate::tool_types::ToolHints::default(),
})],
locale: None,
blueprint_id: None,
network_access: None,
};
let result = atom.execute(input).await.unwrap();
assert_eq!(result.client_tool_calls.len(), 1);
assert_eq!(
result.client_tool_calls[0].arguments,
json!({ "selector": "#btn" })
);
}
fn manage_harnesses_tool_def() -> crate::ToolDefinition {
crate::ToolDefinition::Builtin(crate::BuiltinTool {
name: "manage_harnesses".to_string(),
display_name: None,
description: "CRUD operations for harnesses".to_string(),
parameters: json!({
"type": "object",
"properties": {
"operation": {"type": "string"}
},
"required": ["operation"]
}),
policy: Default::default(),
category: None,
deferrable: Default::default(),
hints: crate::tool_types::ToolHints::default(),
})
}
fn read_capabilities_tool_def() -> crate::ToolDefinition {
crate::ToolDefinition::Builtin(crate::BuiltinTool {
name: "read_capabilities".to_string(),
display_name: None,
description: "List capabilities".to_string(),
parameters: json!({
"type": "object",
"properties": {
"id": {"type": "string"},
"search": {"type": "string"}
}
}),
policy: Default::default(),
category: None,
deferrable: Default::default(),
hints: crate::tool_types::ToolHints::default(),
})
}
#[tokio::test]
async fn test_act_atom_platform_tool_works_with_platform_store() {
use crate::capabilities::{Capability, PlatformManagementCapability};
let mut executor = ToolRegistry::with_defaults();
for tool in PlatformManagementCapability.tools() {
executor.register_boxed(tool);
}
let event_emitter = NoopEventEmitter;
let mock_store = crate::platform_store::tests::MockPlatformStore::new();
let platform_store: Arc<dyn crate::platform_store::PlatformStore> = Arc::new(mock_store);
let atom = ActAtom::new(executor, event_emitter).with_platform_store(platform_store);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: None,
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "read_capabilities".to_string(),
arguments: json!({}),
}],
tool_definitions: vec![read_capabilities_tool_def()],
locale: None,
blueprint_id: None,
network_access: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.completed);
assert_eq!(result.results.len(), 1);
assert!(
result.results[0].success,
"read_capabilities should succeed when platform_store is wired: {:?}",
result.results[0].result.error
);
}
#[tokio::test]
async fn test_act_atom_platform_tool_fails_without_platform_store() {
use crate::capabilities::{Capability, PlatformManagementCapability};
let mut executor = ToolRegistry::with_defaults();
for tool in PlatformManagementCapability.tools() {
executor.register_boxed(tool);
}
let event_emitter = NoopEventEmitter;
let atom = ActAtom::new(executor, event_emitter);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: None,
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "manage_harnesses".to_string(),
arguments: json!({"operation": "list"}),
}],
tool_definitions: vec![manage_harnesses_tool_def()],
locale: None,
blueprint_id: None,
network_access: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.completed);
assert_eq!(result.results.len(), 1);
assert!(!result.results[0].success);
let err_msg = result.results[0].result.error.as_deref().unwrap();
assert!(
err_msg.contains("Platform management not available"),
"Expected platform management error, got: {err_msg}"
);
}
#[test]
fn test_act_result_connection_required_serialization() {
let result = ActResult {
results: vec![ToolCallResult {
tool_call: ToolCall {
id: "call_1".to_string(),
name: "daytona_create_sandbox".to_string(),
arguments: json!({}),
},
result: ToolResult {
tool_call_id: "call_1".to_string(),
result: Some(json!({"connection_required": "daytona"})),
images: None,
error: None,
connection_required: Some("daytona".to_string()),
raw_output: None,
},
success: false,
status: "success".to_string(),
connection_required: Some("daytona".to_string()),
}],
completed: true,
success_count: 0,
error_count: 0,
waiting_for_tool_results: true,
blocked: false,
client_tool_calls: vec![],
client_tool_definitions: vec![],
};
let json_str = serde_json::to_string(&result).unwrap();
let parsed: ActResult = serde_json::from_str(&json_str).unwrap();
assert!(parsed.waiting_for_tool_results);
assert_eq!(
parsed.results[0].connection_required,
Some("daytona".to_string())
);
}
#[test]
fn test_act_result_backward_compat_deserialization() {
let json_str = r#"{
"results": [],
"completed": true,
"success_count": 0,
"error_count": 0
}"#;
let parsed: ActResult = serde_json::from_str(json_str).unwrap();
assert!(!parsed.waiting_for_tool_results);
assert!(parsed.client_tool_calls.is_empty());
}
}