use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use super::act_hooks::{self, PostActHook};
use super::tool_scheduler;
use super::{Atom, AtomContext};
use crate::error::Result;
use crate::events::{
ActCompletedData, ActStartedData, EventContext, EventRequest, ToolCompletedData,
ToolStartedData,
};
use crate::message::ContentPart;
use crate::tool_fingerprint::{
tool_call_fingerprint, tool_error_fingerprint, tool_result_fingerprint,
};
use crate::tool_narration::{
ToolNarrationPhase, render_group_headline_with_locale, render_tool_narration_with_locale,
};
use crate::tool_types::{ToolCall, ToolDefinition, ToolResult};
use crate::traits::{
AgentStore, EventEmitter, SessionFileSystem, SessionMutator, SessionStore, ToolContext,
ToolExecutor,
};
use crate::typed_id::{AgentId, HarnessId};
use uuid::Uuid;
struct AbortOnDropJoinHandle<T> {
handle: tokio::task::JoinHandle<T>,
}
impl<T> AbortOnDropJoinHandle<T> {
fn new(handle: tokio::task::JoinHandle<T>) -> Self {
Self { handle }
}
}
impl<T> Future for AbortOnDropJoinHandle<T> {
type Output = std::result::Result<T, tokio::task::JoinError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.handle).poll(cx)
}
}
impl<T> Drop for AbortOnDropJoinHandle<T> {
fn drop(&mut self) {
if !self.handle.is_finished() {
self.handle.abort();
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActInput {
#[serde(skip_serializing_if = "Option::is_none")]
pub org_id: Option<i64>,
pub context: AtomContext,
pub harness_id: HarnessId,
#[serde(skip_serializing_if = "Option::is_none")]
pub agent_id: Option<AgentId>,
pub tool_calls: Vec<ToolCall>,
pub tool_definitions: Vec<ToolDefinition>,
#[serde(skip_serializing_if = "Option::is_none")]
pub locale: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub blueprint_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub network_access: Option<crate::network_access::NetworkAccessList>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parallel_tool_calls: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCallResult {
pub tool_call: ToolCall,
pub result: ToolResult,
pub success: bool,
pub status: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub connection_required: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActResult {
pub results: Vec<ToolCallResult>,
pub completed: bool,
pub success_count: u32,
pub error_count: u32,
#[serde(default)]
pub waiting_for_tool_results: bool,
#[serde(default, skip_serializing_if = "is_false")]
pub blocked: bool,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub client_tool_calls: Vec<ToolCall>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub client_tool_definitions: Vec<ToolDefinition>,
}
fn is_false(value: &bool) -> bool {
!*value
}
pub struct ActAtom<T, E>
where
T: ToolExecutor,
E: EventEmitter,
{
tool_executor: Arc<T>,
event_emitter: Arc<E>,
file_store: Option<Arc<dyn SessionFileSystem>>,
sqldb_store: Option<crate::traits::SessionSqlDbStoreRef>,
storage_store: Option<Arc<dyn crate::traits::SessionStorageStore>>,
image_store: Option<Arc<dyn crate::traits::ImageArtifactStore>>,
provider_credential_store: Option<Arc<dyn crate::traits::ProviderCredentialStore>>,
utility_llm_service: Option<Arc<dyn crate::UtilityLlmService>>,
egress_service: Option<Arc<dyn crate::EgressService>>,
connection_resolver: Option<Arc<dyn crate::traits::UserConnectionResolver>>,
session_store: Option<Arc<dyn SessionStore>>,
session_mutator: Option<Arc<dyn SessionMutator>>,
agent_store: Option<Arc<dyn AgentStore>>,
schedule_store: Option<Arc<dyn crate::traits::SessionScheduleStore>>,
platform_store: Option<Arc<dyn crate::platform_store::PlatformStore>>,
leased_resource_store: Option<Arc<dyn crate::traits::LeasedResourceStore>>,
session_resource_registry: Option<Arc<dyn crate::traits::SessionResourceRegistry>>,
capability_registry: Option<crate::capabilities::CapabilityRegistry>,
tool_registry: Option<Arc<crate::tools::ToolRegistry>>,
memory_store: Option<Arc<dyn crate::memory_store::MemoryStoreBackend>>,
org_id: Option<crate::typed_id::OrgId>,
network_access: Option<crate::network_access::NetworkAccessList>,
budget_checker: Option<Arc<dyn crate::traits::BudgetChecker>>,
payment_authority: Option<Arc<dyn crate::traits::PaymentAuthority>>,
outbound_tool_rate_limiter: Option<Arc<dyn crate::traits::OutboundToolRateLimiter>>,
hooks: Vec<Box<dyn PostActHook>>,
post_tool_hooks: Vec<Arc<dyn act_hooks::PostToolExecHook>>,
pre_tool_hooks: Vec<Arc<dyn act_hooks::PreToolUseHook>>,
tool_call_hooks: Vec<Arc<dyn crate::capabilities::ToolCallHook>>,
final_post_tool_hooks: Vec<Arc<dyn act_hooks::PostToolExecHook>>,
}
impl<T, E> ActAtom<T, E>
where
T: ToolExecutor,
E: EventEmitter,
{
pub fn new(tool_executor: T, event_emitter: E) -> Self {
Self {
tool_executor: Arc::new(tool_executor),
event_emitter: Arc::new(event_emitter),
file_store: None,
sqldb_store: None,
storage_store: None,
image_store: None,
provider_credential_store: None,
utility_llm_service: None,
egress_service: None,
connection_resolver: None,
session_store: None,
session_mutator: None,
agent_store: None,
schedule_store: None,
platform_store: None,
leased_resource_store: None,
session_resource_registry: None,
capability_registry: None,
tool_registry: None,
memory_store: None,
org_id: None,
network_access: None,
budget_checker: None,
payment_authority: None,
outbound_tool_rate_limiter: None,
hooks: Self::default_hooks(),
post_tool_hooks: Vec::new(),
pre_tool_hooks: Vec::new(),
tool_call_hooks: Vec::new(),
final_post_tool_hooks: Self::default_final_hooks(),
}
}
pub fn with_file_store(
tool_executor: T,
event_emitter: E,
file_store: Arc<dyn SessionFileSystem>,
) -> Self {
Self {
tool_executor: Arc::new(tool_executor),
event_emitter: Arc::new(event_emitter),
file_store: Some(file_store),
sqldb_store: None,
storage_store: None,
image_store: None,
provider_credential_store: None,
utility_llm_service: None,
egress_service: None,
connection_resolver: None,
session_store: None,
session_mutator: None,
agent_store: None,
schedule_store: None,
platform_store: None,
leased_resource_store: None,
session_resource_registry: None,
capability_registry: None,
tool_registry: None,
memory_store: None,
org_id: None,
network_access: None,
budget_checker: None,
payment_authority: None,
outbound_tool_rate_limiter: None,
hooks: Self::default_hooks(),
post_tool_hooks: Vec::new(),
pre_tool_hooks: Vec::new(),
tool_call_hooks: Vec::new(),
final_post_tool_hooks: Self::default_final_hooks(),
}
}
pub fn with_hook(mut self, hook: Box<dyn PostActHook>) -> Self {
self.hooks.push(hook);
self
}
fn default_hooks() -> Vec<Box<dyn PostActHook>> {
vec![
Box::new(act_hooks::ConnectionSetupHook),
Box::new(act_hooks::ClientSideToolHook),
]
}
fn default_final_hooks() -> Vec<Arc<dyn act_hooks::PostToolExecHook>> {
vec![
Arc::new(crate::capabilities::PersistOutputHook),
Arc::new(act_hooks::OutputHardLimitHook),
]
}
pub fn with_sqldb_store(mut self, store: crate::traits::SessionSqlDbStoreRef) -> Self {
self.sqldb_store = Some(store);
self
}
pub fn with_storage_store(
mut self,
store: Arc<dyn crate::traits::SessionStorageStore>,
) -> Self {
self.storage_store = Some(store);
self
}
pub fn with_image_store(mut self, store: Arc<dyn crate::traits::ImageArtifactStore>) -> Self {
self.image_store = Some(store);
self
}
pub fn with_provider_credential_store(
mut self,
store: Arc<dyn crate::traits::ProviderCredentialStore>,
) -> Self {
self.provider_credential_store = Some(store);
self
}
pub fn with_utility_llm_service(mut self, service: Arc<dyn crate::UtilityLlmService>) -> Self {
self.utility_llm_service = Some(service);
self
}
pub fn with_egress_service(mut self, service: Arc<dyn crate::EgressService>) -> Self {
self.egress_service = Some(service);
self
}
pub fn with_connection_resolver(
mut self,
resolver: Arc<dyn crate::traits::UserConnectionResolver>,
) -> Self {
self.connection_resolver = Some(resolver);
self
}
pub fn with_session_store(mut self, store: Arc<dyn SessionStore>) -> Self {
self.session_store = Some(store);
self
}
pub fn with_session_mutator(mut self, mutator: Arc<dyn SessionMutator>) -> Self {
self.session_mutator = Some(mutator);
self
}
pub fn with_agent_store(mut self, store: Arc<dyn AgentStore>) -> Self {
self.agent_store = Some(store);
self
}
pub fn with_schedule_store(
mut self,
store: Arc<dyn crate::traits::SessionScheduleStore>,
) -> Self {
self.schedule_store = Some(store);
self
}
pub fn with_platform_store(
mut self,
store: Arc<dyn crate::platform_store::PlatformStore>,
) -> Self {
self.platform_store = Some(store);
self
}
pub fn with_leased_resource_store(
mut self,
store: Arc<dyn crate::traits::LeasedResourceStore>,
) -> Self {
self.leased_resource_store = Some(store);
self
}
pub fn with_session_resource_registry(
mut self,
registry: Arc<dyn crate::traits::SessionResourceRegistry>,
) -> Self {
self.session_resource_registry = Some(registry);
self
}
pub fn with_capability_registry(
mut self,
registry: crate::capabilities::CapabilityRegistry,
) -> Self {
self.capability_registry = Some(registry);
self
}
pub fn with_tool_registry(mut self, registry: Arc<crate::tools::ToolRegistry>) -> Self {
self.tool_registry = Some(registry);
self
}
pub fn with_post_tool_hooks(
mut self,
hooks: Vec<Arc<dyn act_hooks::PostToolExecHook>>,
) -> Self {
self.post_tool_hooks.extend(hooks);
self
}
pub fn with_pre_tool_hooks(mut self, hooks: Vec<Arc<dyn act_hooks::PreToolUseHook>>) -> Self {
self.pre_tool_hooks.extend(hooks);
self
}
pub fn with_tool_call_hooks(
mut self,
hooks: Vec<Arc<dyn crate::capabilities::ToolCallHook>>,
) -> Self {
self.tool_call_hooks.extend(hooks);
self
}
pub fn with_memory_store(
mut self,
store: Arc<dyn crate::memory_store::MemoryStoreBackend>,
) -> Self {
self.memory_store = Some(store);
self
}
pub fn with_org_id(mut self, org_id: crate::typed_id::OrgId) -> Self {
self.org_id = Some(org_id);
self
}
pub fn with_network_access(
mut self,
network_access: Option<crate::network_access::NetworkAccessList>,
) -> Self {
self.network_access = network_access;
self
}
pub fn with_budget_checker(mut self, checker: Arc<dyn crate::traits::BudgetChecker>) -> Self {
self.budget_checker = Some(checker);
self
}
pub fn with_payment_authority(
mut self,
authority: Arc<dyn crate::traits::PaymentAuthority>,
) -> Self {
self.payment_authority = Some(authority);
self
}
pub fn with_outbound_tool_rate_limiter(
mut self,
limiter: Arc<dyn crate::traits::OutboundToolRateLimiter>,
) -> Self {
self.outbound_tool_rate_limiter = Some(limiter);
self
}
}
#[async_trait]
impl<T, E> Atom for ActAtom<T, E>
where
T: ToolExecutor + Send + Sync + 'static,
E: EventEmitter + Send + Sync + 'static,
{
type Input = ActInput;
type Output = ActResult;
fn name(&self) -> &'static str {
"act"
}
async fn execute(&self, input: Self::Input) -> Result<Self::Output> {
let ActInput {
context,
tool_calls,
tool_definitions,
locale,
network_access,
parallel_tool_calls,
.. } = input;
let (server_tool_calls, client_tool_calls): (Vec<_>, Vec<_>) =
tool_calls.into_iter().partition(|tc| {
tool_definitions
.iter()
.find(|td| td.name() == tc.name)
.map(|td| !matches!(td, ToolDefinition::ClientSide(_)))
.unwrap_or(true) });
let client_tool_calls: Vec<_> = client_tool_calls
.into_iter()
.map(|tool_call| self.transform_tool_call_for_execution(tool_call))
.collect();
let client_tool_definitions: Vec<_> = if client_tool_calls.is_empty() {
vec![]
} else {
tool_definitions
.iter()
.filter(|td| {
if let ToolDefinition::ClientSide(ct) = td {
client_tool_calls.iter().any(|tc| tc.name == ct.name)
} else {
false
}
})
.cloned()
.collect()
};
if server_tool_calls.is_empty() && client_tool_calls.is_empty() {
return Ok(ActResult {
results: vec![],
completed: true,
success_count: 0,
error_count: 0,
waiting_for_tool_results: false,
blocked: false,
client_tool_calls: vec![],
client_tool_definitions: vec![],
});
}
if server_tool_calls.is_empty() {
let mut result = ActResult {
results: vec![],
completed: true,
success_count: 0,
error_count: 0,
waiting_for_tool_results: false,
blocked: false,
client_tool_calls,
client_tool_definitions,
};
act_hooks::run_post_act_hooks(
&self.hooks,
&context,
&mut result,
&tool_definitions,
&self.event_emitter,
locale.as_deref(),
)
.await;
return Ok(result);
}
let tool_calls = server_tool_calls;
tracing::info!(
session_id = %context.session_id,
turn_id = %context.turn_id,
exec_id = %context.exec_id,
tool_count = %tool_calls.len(),
"ActAtom: executing tools in parallel"
);
let trace_id = context.turn_id.to_string();
let act_span_id = Uuid::now_v7().to_string();
let parent_span_id = trace_id.clone();
let event_context = EventContext::from_atom_context(&context).with_span(
trace_id.clone(),
act_span_id.clone(),
Some(parent_span_id.clone()),
);
let act_start = Instant::now();
let visible_tool_names = Arc::new(
tool_definitions
.iter()
.map(|def| def.name().to_string())
.collect::<HashSet<_>>(),
);
let tool_map: std::collections::HashMap<&str, &ToolDefinition> = tool_definitions
.iter()
.map(|def| {
let name = def.name();
(name, def)
})
.collect();
let mut started_data = ActStartedData::with_definitions_and_locale(
&tool_calls,
&tool_definitions,
locale.as_deref(),
);
for summary in &mut started_data.tool_calls {
if let Some(tool_call) = tool_calls.iter().find(|tc| tc.id == summary.id) {
let tool_def = tool_map.get(tool_call.name.as_str()).copied();
summary.narration = Some(self.render_tool_narration(
tool_def,
tool_call,
ToolNarrationPhase::Started,
locale.as_deref(),
));
}
}
if tool_calls.len() == 1 {
started_data.headline = started_data
.tool_calls
.first()
.and_then(|summary| summary.narration.clone());
}
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context.clone(),
started_data,
))
.await
{
tracing::warn!(
session_id = %context.session_id,
error = %e,
"ActAtom: failed to emit act.started event"
);
}
let classes: Vec<Option<String>> = tool_calls
.iter()
.map(|tool_call| {
tool_map
.get(tool_call.name.as_str())
.and_then(|def| def.concurrency_class())
.map(|class| class.to_string())
})
.collect();
let schedule_config = tool_scheduler::ScheduleConfig {
serialize_all: parallel_tool_calls == Some(false),
..tool_scheduler::ScheduleConfig::default()
};
let results =
tool_scheduler::schedule(tool_calls.len(), &classes, schedule_config, |index| {
let tool_call = &tool_calls[index];
let tool_def = tool_map.get(tool_call.name.as_str()).cloned();
self.execute_single_tool(
&context,
tool_call.clone(),
tool_def,
&trace_id,
&act_span_id,
locale.as_deref(),
network_access.as_ref(),
visible_tool_names.clone(),
)
})
.await;
let success_count = results.iter().filter(|r| r.success).count() as u32;
let error_count = results.iter().filter(|r| !r.success).count() as u32;
let act_duration_ms = act_start.elapsed().as_millis() as u64;
let completed_context = EventContext::from_atom_context(&context).with_span(
trace_id.clone(),
act_span_id.clone(), Some(parent_span_id.clone()),
);
let mut completed_headline = render_group_headline_with_locale(
&tool_calls,
&tool_definitions,
ToolNarrationPhase::Completed,
locale.as_deref(),
);
if tool_calls.len() == 1
&& let Some(tool_call) = tool_calls.first()
{
completed_headline = Some(self.render_tool_narration(
tool_map.get(tool_call.name.as_str()).copied(),
tool_call,
ToolNarrationPhase::Completed,
locale.as_deref(),
));
}
if error_count > 0 {
let suffix = crate::localization::format_error_suffix(locale.as_deref(), error_count);
completed_headline = Some(match completed_headline {
Some(text) => format!("{text}{suffix}"),
None => {
crate::localization::format_completed_tool_batch(locale.as_deref(), error_count)
}
});
}
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
completed_context,
ActCompletedData {
completed: true,
success_count,
error_count,
duration_ms: Some(act_duration_ms),
headline: completed_headline,
},
))
.await
{
tracing::warn!(
session_id = %context.session_id,
error = %e,
"ActAtom: failed to emit act.completed event"
);
}
tracing::info!(
session_id = %context.session_id,
turn_id = %context.turn_id,
success_count = %success_count,
error_count = %error_count,
"ActAtom: all tools completed"
);
let mut act_result = ActResult {
results,
completed: true,
success_count,
error_count,
waiting_for_tool_results: false,
blocked: false,
client_tool_calls,
client_tool_definitions,
};
act_hooks::run_post_act_hooks(
&self.hooks,
&context,
&mut act_result,
&tool_definitions,
&self.event_emitter,
locale.as_deref(),
)
.await;
Ok(act_result)
}
}
impl<T, E> ActAtom<T, E>
where
T: ToolExecutor + Send + Sync + 'static,
E: EventEmitter + Send + Sync + 'static,
{
fn render_tool_narration(
&self,
tool_def: Option<&ToolDefinition>,
tool_call: &ToolCall,
phase: ToolNarrationPhase,
locale: Option<&str>,
) -> String {
for hook in &self.tool_call_hooks {
if let Some(narration) = hook.narration(tool_def, tool_call, phase, locale) {
return narration;
}
}
render_tool_narration_with_locale(tool_def, tool_call, phase, locale)
}
fn transform_tool_call_for_execution(&self, tool_call: ToolCall) -> ToolCall {
self.tool_call_hooks
.iter()
.fold(tool_call, |tool_call, hook| {
hook.transform_for_execution(tool_call)
})
}
#[allow(clippy::too_many_arguments)]
async fn execute_single_tool(
&self,
context: &AtomContext,
tool_call: ToolCall,
tool_def: Option<&ToolDefinition>,
trace_id: &str,
act_span_id: &str,
locale: Option<&str>,
network_access: Option<&crate::network_access::NetworkAccessList>,
visible_tool_names: Arc<HashSet<String>>,
) -> ToolCallResult {
tracing::debug!(
session_id = %context.session_id,
turn_id = %context.turn_id,
tool_name = %tool_call.name,
tool_call_id = %tool_call.id,
"ActAtom: executing tool"
);
let tool_span_id = Uuid::now_v7().to_string();
let event_context = EventContext::from_atom_context(context).with_span(
trace_id.to_string(),
tool_span_id.clone(),
Some(act_span_id.to_string()),
);
let tool_start = Instant::now();
let tool_call_fingerprint = tool_call_fingerprint(&tool_call);
let display_name = crate::localization::localized_tool_display_name(
&tool_call.name,
tool_def.and_then(|d| d.display_name()),
locale,
);
let capability_attribution = tool_def.and_then(|def| {
def.capability_attribution()
.map(|(id, name)| (id.to_string(), name.map(str::to_string)))
});
if let (Some(limiter), Some(ref org_id)) = (&self.outbound_tool_rate_limiter, self.org_id)
&& !limiter.check_org(org_id).await
{
tracing::warn!(
session_id = %context.session_id,
tool_name = %tool_call.name,
"ActAtom: outbound tool rate limit exceeded for org"
);
return ToolCallResult {
tool_call: tool_call.clone(),
result: ToolResult {
tool_call_id: tool_call.id.clone(),
result: None,
images: None,
error: Some(
"Outbound tool rate limit exceeded for this organization; back off and retry later.".to_string(),
),
connection_required: None,
raw_output: None,
},
success: false,
status: "error".to_string(),
connection_required: None,
};
}
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context.clone(),
ToolStartedData {
tool_call: tool_call.clone(),
tool_call_fingerprint: Some(tool_call_fingerprint.clone()),
display_name: display_name.clone(),
narration: Some(self.render_tool_narration(
tool_def,
&tool_call,
ToolNarrationPhase::Started,
locale,
)),
},
))
.await
{
tracing::warn!(
session_id = %context.session_id,
tool_call_id = %tool_call.id,
error = %e,
"ActAtom: failed to emit tool.started event"
);
}
let Some(tool_def) = tool_def else {
let error_msg = format!("Tool definition not found: {}", tool_call.name);
let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context,
ToolCompletedData::failure(
tool_call.id.clone(),
tool_call.name.clone(),
"error".to_string(),
error_msg.clone(),
Some(tool_duration_ms),
)
.with_fingerprints(
tool_call_fingerprint.clone(),
tool_error_fingerprint(&tool_call.name, "error", &error_msg),
)
.with_narration(Some(self.render_tool_narration(
None,
&tool_call,
ToolNarrationPhase::Failed,
locale,
))),
))
.await
{
tracing::warn!(
session_id = %context.session_id,
tool_call_id = %tool_call.id,
error = %e,
"ActAtom: failed to emit tool.completed event"
);
}
return ToolCallResult {
tool_call: tool_call.clone(),
result: ToolResult {
tool_call_id: tool_call.id.clone(),
result: None,
images: None,
error: Some(error_msg),
connection_required: None,
raw_output: None,
},
success: false,
status: "error".to_string(),
connection_required: None,
};
};
let mut tool_context = if let Some(ref store) = self.file_store {
ToolContext::with_file_store(context.session_id, store.clone())
} else {
ToolContext::new(context.session_id)
};
if let Some(ref store) = self.sqldb_store {
tool_context.sqldb_store = Some(store.clone());
}
if let Some(ref store) = self.storage_store {
tool_context.storage_store = Some(store.clone());
}
if let Some(ref store) = self.image_store {
tool_context.image_store = Some(store.clone());
}
if let Some(ref store) = self.provider_credential_store {
tool_context.provider_credential_store = Some(store.clone());
}
if let Some(ref service) = self.utility_llm_service {
tool_context.utility_llm_service = Some(service.clone());
}
if let Some(ref service) = self.egress_service {
tool_context.egress_service = Some(service.clone());
}
if let Some(ref resolver) = self.connection_resolver {
tool_context.connection_resolver = Some(resolver.clone());
}
if let Some(ref store) = self.session_store {
tool_context.session_store = Some(store.clone());
}
if let Some(ref mutator) = self.session_mutator {
tool_context.session_mutator = Some(mutator.clone());
}
if let Some(ref store) = self.agent_store {
tool_context.agent_store = Some(store.clone());
}
if let Some(ref store) = self.schedule_store {
tool_context.schedule_store = Some(store.clone());
}
if let Some(ref store) = self.platform_store {
tool_context.platform_store = Some(store.clone());
}
if let Some(ref store) = self.leased_resource_store {
tool_context.leased_resource_store = Some(store.clone());
}
if let Some(ref registry) = self.session_resource_registry {
tool_context.session_resource_registry = Some(registry.clone());
}
if let Some(ref registry) = self.capability_registry {
tool_context.capability_registry = Some(registry.clone());
}
if let Some(ref registry) = self.tool_registry {
tool_context.tool_registry = Some(registry.clone());
}
tool_context.visible_tool_names = Some(visible_tool_names.clone());
if let Some(ref store) = self.memory_store {
tool_context.memory_store = Some(store.clone());
}
if let Some(ref checker) = self.budget_checker {
tool_context.budget_checker = Some(checker.clone());
}
if let Some(ref authority) = self.payment_authority {
tool_context.payment_authority = Some(authority.clone());
}
tool_context.org_id = self.org_id;
tool_context.network_access = network_access
.cloned()
.or_else(|| self.network_access.clone());
tool_context.event_emitter = Some(self.event_emitter.clone() as Arc<dyn EventEmitter>);
tool_context.event_context = Some(event_context.clone());
tool_context.tool_call_id = Some(tool_call.id.clone());
let execution_tool_call = self.transform_tool_call_for_execution(tool_call.clone());
let (execution_tool_call, pre_block_reason) = if self.pre_tool_hooks.is_empty() {
(execution_tool_call, None)
} else {
match act_hooks::run_pre_tool_use_hooks(
&self.pre_tool_hooks,
execution_tool_call.clone(),
tool_def,
&tool_context,
)
.await
{
act_hooks::PreToolUseDecision::Continue(updated) => (updated, None),
act_hooks::PreToolUseDecision::Block {
tool_call: blocked,
reason,
..
} => (blocked, Some(reason)),
}
};
let result = if let Some(reason) = pre_block_reason {
tracing::warn!(
session_id = %context.session_id,
tool_call_id = %execution_tool_call.id,
tool_name = %execution_tool_call.name,
reason = %reason,
"ActAtom: pre_tool_use hook blocked execution"
);
Ok(crate::tool_types::ToolResult {
tool_call_id: execution_tool_call.id.clone(),
result: None,
images: None,
error: Some(format!("blocked by pre_tool_use hook: {reason}")),
connection_required: None,
raw_output: None,
})
} else if tool_def.is_cpu_bound() {
let executor = self.tool_executor.clone();
let call = execution_tool_call.clone();
let def = tool_def.clone();
let ctx = tool_context.clone();
match AbortOnDropJoinHandle::new(tokio::spawn(async move {
executor.execute_with_context(&call, &def, &ctx).await
}))
.await
{
Ok(result) => result,
Err(join_err) => Err(crate::error::AgentLoopError::tool(format!(
"tool task failed to complete: {join_err}"
))),
}
} else {
self.tool_executor
.execute_with_context(&execution_tool_call, tool_def, &tool_context)
.await
};
match result {
Ok(mut tool_result) => {
act_hooks::run_post_tool_exec_hooks(
&self.post_tool_hooks,
&self.final_post_tool_hooks,
&execution_tool_call,
tool_def,
&mut tool_result,
&tool_context,
)
.await;
let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
let success = tool_result.error.is_none();
let status = if success { "success" } else { "error" };
let completed_data = if success {
let result_fingerprint = tool_result_fingerprint(&tool_call.name, &tool_result);
let mut result_content = tool_result
.result
.as_ref()
.map(|r| vec![ContentPart::text(r.to_string())])
.unwrap_or_default();
if let Some(ref images) = tool_result.images {
for img in images {
result_content.push(ContentPart::Image(
crate::message::ImageContentPart::from_base64(
&img.base64,
&img.media_type,
),
));
}
}
ToolCompletedData::success(
tool_call.id.clone(),
tool_call.name.clone(),
result_content,
Some(tool_duration_ms),
)
.with_fingerprints(tool_call_fingerprint.clone(), result_fingerprint)
.with_display_name(display_name.clone())
.with_capability_attribution(
capability_attribution.as_ref().map(|(id, _)| id.clone()),
capability_attribution
.as_ref()
.and_then(|(_, name)| name.clone()),
)
.with_narration(Some(self.render_tool_narration(
Some(tool_def),
&tool_call,
ToolNarrationPhase::Completed,
locale,
)))
} else {
let result_fingerprint = tool_result_fingerprint(&tool_call.name, &tool_result);
ToolCompletedData::failure(
tool_call.id.clone(),
tool_call.name.clone(),
status.to_string(),
tool_result.error.clone().unwrap_or_default(),
Some(tool_duration_ms),
)
.with_fingerprints(tool_call_fingerprint.clone(), result_fingerprint)
.with_display_name(display_name.clone())
.with_capability_attribution(
capability_attribution.as_ref().map(|(id, _)| id.clone()),
capability_attribution
.as_ref()
.and_then(|(_, name)| name.clone()),
)
.with_narration(Some(self.render_tool_narration(
Some(tool_def),
&tool_call,
ToolNarrationPhase::Failed,
locale,
)))
};
if let Err(e) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context.clone(),
completed_data,
))
.await
{
tracing::warn!(
session_id = %context.session_id,
tool_call_id = %tool_call.id,
error = %e,
"ActAtom: failed to emit tool.completed event"
);
}
tracing::debug!(
session_id = %context.session_id,
tool_name = %tool_call.name,
tool_call_id = %tool_call.id,
success = %success,
"ActAtom: tool execution completed"
);
let conn_req = tool_result.connection_required.clone();
ToolCallResult {
tool_call,
result: tool_result,
success,
status: status.to_string(),
connection_required: conn_req,
}
}
Err(e) => {
let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
let error_msg = e.to_string();
if let Err(emit_err) = self
.event_emitter
.emit(EventRequest::new(
context.session_id,
event_context,
ToolCompletedData::failure(
tool_call.id.clone(),
tool_call.name.clone(),
"error".to_string(),
error_msg.clone(),
Some(tool_duration_ms),
)
.with_fingerprints(
tool_call_fingerprint.clone(),
tool_error_fingerprint(&tool_call.name, "error", &error_msg),
)
.with_display_name(display_name.clone())
.with_capability_attribution(
capability_attribution.as_ref().map(|(id, _)| id.clone()),
capability_attribution
.as_ref()
.and_then(|(_, name)| name.clone()),
)
.with_narration(Some(self.render_tool_narration(
Some(tool_def),
&tool_call,
ToolNarrationPhase::Failed,
locale,
))),
))
.await
{
tracing::warn!(
session_id = %context.session_id,
tool_call_id = %tool_call.id,
error = %emit_err,
"ActAtom: failed to emit tool.completed event"
);
}
tracing::warn!(
session_id = %context.session_id,
tool_name = %tool_call.name,
tool_call_id = %tool_call.id,
error = %e,
"ActAtom: tool execution failed"
);
ToolCallResult {
tool_call: tool_call.clone(),
result: ToolResult {
tool_call_id: tool_call.id.clone(),
result: None,
images: None,
error: Some(error_msg),
connection_required: None,
raw_output: None,
},
success: false,
status: "error".to_string(),
connection_required: None,
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tools::ToolRegistry;
use crate::traits::NoopEventEmitter;
use crate::typed_id::{AgentId, HarnessId, MessageId, SessionId, TurnId};
use async_trait::async_trait;
use serde_json::json;
struct ArgumentEchoTool;
#[async_trait]
impl crate::tools::Tool for ArgumentEchoTool {
fn name(&self) -> &str {
"argument_echo"
}
fn description(&self) -> &str {
"returns the execution arguments"
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"value": { "type": "string" }
}
})
}
async fn execute(&self, arguments: serde_json::Value) -> crate::ToolExecutionResult {
crate::ToolExecutionResult::success(arguments)
}
}
struct UtilityLlmContextProbeTool;
#[async_trait]
impl crate::tools::Tool for UtilityLlmContextProbeTool {
fn name(&self) -> &str {
"utility_llm_context_probe"
}
fn description(&self) -> &str {
"checks whether the utility LLM service is present in tool context"
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {}
})
}
async fn execute(&self, _arguments: serde_json::Value) -> crate::ToolExecutionResult {
crate::ToolExecutionResult::tool_error("context required")
}
async fn execute_with_context(
&self,
_arguments: serde_json::Value,
context: &crate::traits::ToolContext,
) -> crate::ToolExecutionResult {
crate::ToolExecutionResult::success(json!({
"utility_llm_service": context.utility_llm_service.is_some(),
"configured": context
.utility_llm_service
.as_ref()
.is_some_and(|service| service.is_configured()),
}))
}
fn requires_context(&self) -> bool {
true
}
}
#[derive(Default)]
struct SchedObservations {
class_inflight: std::collections::HashMap<String, usize>,
class_max: std::collections::HashMap<String, usize>,
global_inflight: usize,
global_max: usize,
}
struct RecordingTool {
name: String,
class: Option<String>,
obs: Arc<std::sync::Mutex<SchedObservations>>,
}
#[async_trait]
impl crate::tools::Tool for RecordingTool {
fn name(&self) -> &str {
&self.name
}
fn description(&self) -> &str {
"records scheduling order"
}
fn parameters_schema(&self) -> serde_json::Value {
json!({ "type": "object", "properties": {} })
}
async fn execute(&self, _arguments: serde_json::Value) -> crate::ToolExecutionResult {
{
let mut obs = self.obs.lock().unwrap();
obs.global_inflight += 1;
let g = obs.global_inflight;
if g > obs.global_max {
obs.global_max = g;
}
if let Some(class) = &self.class {
let n = obs.class_inflight.entry(class.clone()).or_default();
*n += 1;
let cur = *n;
let m = obs.class_max.entry(class.clone()).or_default();
if cur > *m {
*m = cur;
}
}
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
{
let mut obs = self.obs.lock().unwrap();
obs.global_inflight -= 1;
if let Some(class) = &self.class
&& let Some(n) = obs.class_inflight.get_mut(class)
{
*n -= 1;
}
}
crate::ToolExecutionResult::success(json!({ "tool": self.name }))
}
}
struct CancellationProbeTool {
started: Arc<tokio::sync::Notify>,
dropped_tx: Arc<std::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
}
impl CancellationProbeTool {
fn new(
started: Arc<tokio::sync::Notify>,
dropped_tx: tokio::sync::oneshot::Sender<()>,
) -> Self {
Self {
started,
dropped_tx: Arc::new(std::sync::Mutex::new(Some(dropped_tx))),
}
}
}
#[async_trait]
impl crate::tools::Tool for CancellationProbeTool {
fn name(&self) -> &str {
"cancellation_probe"
}
fn description(&self) -> &str {
"waits until cancelled"
}
fn parameters_schema(&self) -> serde_json::Value {
json!({ "type": "object", "properties": {} })
}
async fn execute(&self, _arguments: serde_json::Value) -> crate::ToolExecutionResult {
struct DropSignal {
tx: Arc<std::sync::Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
}
impl Drop for DropSignal {
fn drop(&mut self) {
if let Ok(mut guard) = self.tx.lock()
&& let Some(tx) = guard.take()
{
let _ = tx.send(());
}
}
}
let _drop_signal = DropSignal {
tx: self.dropped_tx.clone(),
};
self.started.notify_one();
std::future::pending::<()>().await;
unreachable!("pending cancellation probe should only finish by cancellation")
}
}
fn recording_tool_def(name: &str, class: Option<&str>, cpu_bound: bool) -> ToolDefinition {
let mut hints = crate::tool_types::ToolHints::default();
if let Some(class) = class {
hints = hints.with_concurrency_class(class);
}
if cpu_bound {
hints = hints.with_cpu_bound(true);
}
ToolDefinition::Builtin(crate::BuiltinTool {
name: name.to_string(),
display_name: None,
description: "records scheduling order".to_string(),
parameters: json!({ "type": "object", "properties": {} }),
policy: Default::default(),
category: None,
deferrable: Default::default(),
hints,
full_parameters: None,
})
}
#[tokio::test]
async fn test_act_atom_empty_tool_calls() {
let executor = ToolRegistry::with_defaults();
let event_emitter = NoopEventEmitter;
let atom = ActAtom::new(executor, event_emitter);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![],
tool_definitions: vec![],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.completed);
assert!(result.results.is_empty());
assert_eq!(result.success_count, 0);
assert_eq!(result.error_count, 0);
}
#[tokio::test]
async fn test_act_atom_threads_utility_llm_service_to_tool_context() {
let mut executor = ToolRegistry::with_defaults();
executor.register(UtilityLlmContextProbeTool);
let event_emitter = NoopEventEmitter;
let atom = ActAtom::new(executor, event_emitter)
.with_utility_llm_service(Arc::new(crate::DisabledUtilityLlmService));
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "utility_llm_context_probe".to_string(),
arguments: json!({}),
}],
tool_definitions: vec![ToolDefinition::Builtin(crate::BuiltinTool {
name: "utility_llm_context_probe".to_string(),
display_name: None,
description: "checks context".to_string(),
parameters: json!({
"type": "object",
"properties": {}
}),
policy: Default::default(),
category: None,
deferrable: Default::default(),
hints: crate::tool_types::ToolHints::default(),
full_parameters: None,
})],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert_eq!(result.success_count, 1);
let payload = result.results[0].result.result.as_ref().unwrap();
assert_eq!(payload["utility_llm_service"], true);
assert_eq!(payload["configured"], false);
}
#[tokio::test]
async fn test_act_atom_schedules_batch_by_concurrency_class() {
let obs = Arc::new(std::sync::Mutex::new(SchedObservations::default()));
let mut executor = ToolRegistry::new();
executor.register(RecordingTool {
name: "writer_a".to_string(),
class: Some("ws".to_string()),
obs: obs.clone(),
});
executor.register(RecordingTool {
name: "writer_b".to_string(),
class: Some("ws".to_string()),
obs: obs.clone(),
});
executor.register(RecordingTool {
name: "reader".to_string(),
class: None,
obs: obs.clone(),
});
let atom = ActAtom::new(executor, NoopEventEmitter);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![
ToolCall {
id: "call_a".to_string(),
name: "writer_a".to_string(),
arguments: json!({}),
},
ToolCall {
id: "call_r".to_string(),
name: "reader".to_string(),
arguments: json!({}),
},
ToolCall {
id: "call_b".to_string(),
name: "writer_b".to_string(),
arguments: json!({}),
},
],
tool_definitions: vec![
recording_tool_def("writer_a", Some("ws"), false),
recording_tool_def("reader", None, false),
recording_tool_def("writer_b", Some("ws"), true),
],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert_eq!(result.success_count, 3, "all three tools should succeed");
let names: Vec<&str> = result
.results
.iter()
.map(|r| r.tool_call.name.as_str())
.collect();
assert_eq!(names, vec!["writer_a", "reader", "writer_b"]);
let obs = obs.lock().unwrap();
assert_eq!(
obs.class_max.get("ws").copied(),
Some(1),
"same-class tools must serialize"
);
assert!(
obs.global_max >= 2,
"independent tool should run concurrently with the class group (global_max={})",
obs.global_max
);
}
#[tokio::test]
async fn test_act_atom_aborts_cpu_bound_tool_task_on_cancellation() {
let started = Arc::new(tokio::sync::Notify::new());
let (dropped_tx, dropped_rx) = tokio::sync::oneshot::channel();
let mut executor = ToolRegistry::new();
executor.register(CancellationProbeTool::new(started.clone(), dropped_tx));
let atom = ActAtom::new(executor, NoopEventEmitter);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "cancellation_probe".to_string(),
arguments: json!({}),
}],
tool_definitions: vec![recording_tool_def("cancellation_probe", None, true)],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let act_task = tokio::spawn(async move { atom.execute(input).await });
started.notified().await;
act_task.abort();
assert!(act_task.await.unwrap_err().is_cancelled());
tokio::time::timeout(std::time::Duration::from_secs(1), dropped_rx)
.await
.expect("cpu-bound tool task should be aborted when ActAtom is cancelled")
.expect("drop signal should be sent by cancelled tool future");
}
#[tokio::test]
async fn test_act_atom_parallel_tool_calls_false_serializes_everything() {
let obs = Arc::new(std::sync::Mutex::new(SchedObservations::default()));
let mut executor = ToolRegistry::new();
for name in ["t0", "t1", "t2"] {
executor.register(RecordingTool {
name: name.to_string(),
class: None,
obs: obs.clone(),
});
}
let atom = ActAtom::new(executor, NoopEventEmitter);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![
ToolCall {
id: "c0".to_string(),
name: "t0".to_string(),
arguments: json!({}),
},
ToolCall {
id: "c1".to_string(),
name: "t1".to_string(),
arguments: json!({}),
},
ToolCall {
id: "c2".to_string(),
name: "t2".to_string(),
arguments: json!({}),
},
],
tool_definitions: vec![
recording_tool_def("t0", None, false),
recording_tool_def("t1", None, false),
recording_tool_def("t2", None, false),
],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: Some(false),
};
let result = atom.execute(input).await.unwrap();
assert_eq!(result.success_count, 3);
assert_eq!(
obs.lock().unwrap().global_max,
1,
"parallel_tool_calls=false must serialize the whole batch"
);
}
#[tokio::test]
async fn test_act_atom_tool_not_found() {
let executor = ToolRegistry::with_defaults();
let event_emitter = NoopEventEmitter;
let atom = ActAtom::new(executor, event_emitter);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "nonexistent_tool".to_string(),
arguments: json!({}),
}],
tool_definitions: vec![],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.completed);
assert_eq!(result.results.len(), 1);
assert!(!result.results[0].success);
assert_eq!(result.results[0].status, "error");
assert!(
result.results[0]
.result
.error
.as_ref()
.unwrap()
.contains("not found")
);
}
#[tokio::test]
async fn test_act_atom_uses_tool_call_hooks_for_execution_arguments() {
use crate::capabilities::{Capability, HumanIntentCapability};
let mut executor = ToolRegistry::new();
executor.register(ArgumentEchoTool);
let tool_def = executor.get("argument_echo").unwrap().to_definition();
let emitter = crate::memory::InMemoryEventEmitter::new();
let atom = ActAtom::new(executor, emitter.clone())
.with_tool_call_hooks(HumanIntentCapability.tool_call_hooks());
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "argument_echo".to_string(),
arguments: json!({
"value": "visible",
"human_intent": "Echoing test arguments"
}),
}],
tool_definitions: vec![tool_def],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.results[0].success);
assert_eq!(
result.results[0].result.result,
Some(json!({ "value": "visible" }))
);
let events = emitter.events().await;
let act_started = events
.iter()
.find(|event| event.event_type == "act.started")
.expect("act.started event");
let crate::events::EventData::ActStarted(data) = &act_started.data else {
panic!("expected act.started data");
};
assert_eq!(data.headline.as_deref(), Some("Echoing test arguments"));
assert_eq!(
data.tool_calls[0].narration.as_deref(),
Some("Echoing test arguments")
);
let tool_started = events
.iter()
.find(|event| event.event_type == "tool.started")
.expect("tool.started event");
let crate::events::EventData::ToolStarted(data) = &tool_started.data else {
panic!("expected tool.started data");
};
let started_fingerprint = data
.tool_call_fingerprint
.as_ref()
.expect("tool.started call fingerprint");
assert_eq!(data.narration.as_deref(), Some("Echoing test arguments"));
let tool_completed = events
.iter()
.find(|event| event.event_type == "tool.completed")
.expect("tool.completed event");
let crate::events::EventData::ToolCompleted(data) = &tool_completed.data else {
panic!("expected tool.completed data");
};
assert_eq!(
data.tool_call_fingerprint.as_ref(),
Some(started_fingerprint)
);
assert!(data.tool_result_fingerprint.is_some());
assert_eq!(data.narration.as_deref(), Some("Echoing test arguments"));
}
#[tokio::test]
async fn test_act_atom_strips_human_intent_from_client_tool_calls() {
use crate::capabilities::{Capability, HumanIntentCapability};
let executor = ToolRegistry::new();
let emitter = crate::memory::InMemoryEventEmitter::new();
let atom = ActAtom::new(executor, emitter)
.with_tool_call_hooks(HumanIntentCapability.tool_call_hooks());
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_client".to_string(),
name: "browser_click".to_string(),
arguments: json!({
"selector": "#btn",
"human_intent": "Clicking approve"
}),
}],
tool_definitions: vec![crate::ToolDefinition::ClientSide(crate::ClientSideTool {
name: "browser_click".to_string(),
display_name: None,
description: "Click button".to_string(),
parameters: json!({
"type": "object",
"properties": {
"selector": {"type": "string"}
},
"required": ["selector"]
}),
category: None,
deferrable: Default::default(),
hints: crate::tool_types::ToolHints::default(),
full_parameters: None,
})],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert_eq!(result.client_tool_calls.len(), 1);
assert_eq!(
result.client_tool_calls[0].arguments,
json!({ "selector": "#btn" })
);
}
fn manage_harnesses_tool_def() -> crate::ToolDefinition {
crate::ToolDefinition::Builtin(crate::BuiltinTool {
name: "manage_harnesses".to_string(),
display_name: None,
description: "CRUD operations for harnesses".to_string(),
parameters: json!({
"type": "object",
"properties": {
"operation": {"type": "string"}
},
"required": ["operation"]
}),
policy: Default::default(),
category: None,
deferrable: Default::default(),
hints: crate::tool_types::ToolHints::default(),
full_parameters: None,
})
}
fn read_capabilities_tool_def() -> crate::ToolDefinition {
crate::ToolDefinition::Builtin(crate::BuiltinTool {
name: "read_capabilities".to_string(),
display_name: None,
description: "List capabilities".to_string(),
parameters: json!({
"type": "object",
"properties": {
"id": {"type": "string"},
"search": {"type": "string"}
}
}),
policy: Default::default(),
category: None,
deferrable: Default::default(),
hints: crate::tool_types::ToolHints::default(),
full_parameters: None,
})
}
#[tokio::test]
async fn test_act_atom_platform_tool_works_with_platform_store() {
use crate::capabilities::{Capability, PlatformManagementCapability};
let mut executor = ToolRegistry::with_defaults();
for tool in PlatformManagementCapability.tools() {
executor.register_boxed(tool);
}
let event_emitter = NoopEventEmitter;
let mock_store = crate::platform_store::tests::MockPlatformStore::new();
let platform_store: Arc<dyn crate::platform_store::PlatformStore> = Arc::new(mock_store);
let atom = ActAtom::new(executor, event_emitter).with_platform_store(platform_store);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: None,
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "read_capabilities".to_string(),
arguments: json!({}),
}],
tool_definitions: vec![read_capabilities_tool_def()],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.completed);
assert_eq!(result.results.len(), 1);
assert!(
result.results[0].success,
"read_capabilities should succeed when platform_store is wired: {:?}",
result.results[0].result.error
);
}
#[tokio::test]
async fn test_act_atom_platform_tool_fails_without_platform_store() {
use crate::capabilities::{Capability, PlatformManagementCapability};
let mut executor = ToolRegistry::with_defaults();
for tool in PlatformManagementCapability.tools() {
executor.register_boxed(tool);
}
let event_emitter = NoopEventEmitter;
let atom = ActAtom::new(executor, event_emitter);
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: None,
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "manage_harnesses".to_string(),
arguments: json!({"operation": "list"}),
}],
tool_definitions: vec![manage_harnesses_tool_def()],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert!(result.completed);
assert_eq!(result.results.len(), 1);
assert!(!result.results[0].success);
let err_msg = result.results[0].result.error.as_deref().unwrap();
assert!(
err_msg.contains("Platform management not available"),
"Expected platform management error, got: {err_msg}"
);
}
#[test]
fn test_act_result_connection_required_serialization() {
let result = ActResult {
results: vec![ToolCallResult {
tool_call: ToolCall {
id: "call_1".to_string(),
name: "daytona_create_sandbox".to_string(),
arguments: json!({}),
},
result: ToolResult {
tool_call_id: "call_1".to_string(),
result: Some(json!({"connection_required": "daytona"})),
images: None,
error: None,
connection_required: Some("daytona".to_string()),
raw_output: None,
},
success: false,
status: "success".to_string(),
connection_required: Some("daytona".to_string()),
}],
completed: true,
success_count: 0,
error_count: 0,
waiting_for_tool_results: true,
blocked: false,
client_tool_calls: vec![],
client_tool_definitions: vec![],
};
let json_str = serde_json::to_string(&result).unwrap();
let parsed: ActResult = serde_json::from_str(&json_str).unwrap();
assert!(parsed.waiting_for_tool_results);
assert_eq!(
parsed.results[0].connection_required,
Some("daytona".to_string())
);
}
#[test]
fn test_act_result_backward_compat_deserialization() {
let json_str = r#"{
"results": [],
"completed": true,
"success_count": 0,
"error_count": 0
}"#;
let parsed: ActResult = serde_json::from_str(json_str).unwrap();
assert!(!parsed.waiting_for_tool_results);
assert!(parsed.client_tool_calls.is_empty());
}
#[tokio::test]
async fn test_outbound_tool_rate_limiter_blocks_execution() {
use crate::typed_id::OrgId;
struct DenyAll;
#[async_trait]
impl crate::traits::OutboundToolRateLimiter for DenyAll {
async fn check_org(&self, _org_id: &OrgId) -> bool {
false
}
}
let mut executor = ToolRegistry::with_defaults();
executor.register(ArgumentEchoTool);
let atom = ActAtom::new(executor, NoopEventEmitter)
.with_org_id(OrgId::from_seed(1))
.with_outbound_tool_rate_limiter(Arc::new(DenyAll));
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "argument_echo".to_string(),
arguments: json!({"value": "should_not_reach"}),
}],
tool_definitions: vec![ToolDefinition::Builtin(crate::BuiltinTool {
name: "argument_echo".to_string(),
display_name: None,
description: "echo".to_string(),
parameters: json!({"type": "object"}),
policy: Default::default(),
category: None,
deferrable: Default::default(),
hints: crate::tool_types::ToolHints::default(),
full_parameters: None,
})],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert_eq!(result.success_count, 0);
assert_eq!(result.error_count, 1);
let tool_result = &result.results[0];
assert!(!tool_result.success);
assert_eq!(tool_result.status, "error");
assert!(
tool_result
.result
.error
.as_deref()
.unwrap_or("")
.contains("rate limit exceeded")
);
assert!(tool_result.result.result.is_none());
}
#[tokio::test]
async fn test_outbound_tool_rate_limiter_allows_execution() {
use crate::typed_id::OrgId;
struct AllowAll;
#[async_trait]
impl crate::traits::OutboundToolRateLimiter for AllowAll {
async fn check_org(&self, _org_id: &OrgId) -> bool {
true
}
}
let mut executor = ToolRegistry::with_defaults();
executor.register(ArgumentEchoTool);
let atom = ActAtom::new(executor, NoopEventEmitter)
.with_org_id(OrgId::from_seed(1))
.with_outbound_tool_rate_limiter(Arc::new(AllowAll));
let context = AtomContext::new(SessionId::new(), TurnId::new(), MessageId::new());
let input = ActInput {
org_id: Some(1),
context,
harness_id: HarnessId::from_seed(1),
agent_id: Some(AgentId::new()),
tool_calls: vec![ToolCall {
id: "call_1".to_string(),
name: "argument_echo".to_string(),
arguments: json!({"value": "hello"}),
}],
tool_definitions: vec![ToolDefinition::Builtin(crate::BuiltinTool {
name: "argument_echo".to_string(),
display_name: None,
description: "echo".to_string(),
parameters: json!({"type": "object"}),
policy: Default::default(),
category: None,
deferrable: Default::default(),
hints: crate::tool_types::ToolHints::default(),
full_parameters: None,
})],
locale: None,
blueprint_id: None,
network_access: None,
parallel_tool_calls: None,
};
let result = atom.execute(input).await.unwrap();
assert_eq!(result.success_count, 1);
assert_eq!(result.error_count, 0);
}
}