#[cfg(not(target_arch = "wasm32"))]
use crate::builtin::shell::{JobManager, ShellConfig};
#[cfg(feature = "skills")]
use crate::builtin::skills::SkillToolSet;
use crate::builtin::store::TaskStore;
use crate::builtin::{BuiltinTool, BuiltinToolConfig, BuiltinToolError, ToolOutput};
use async_trait::async_trait;
use meerkat_core::AgentToolDispatcher;
use meerkat_core::ExternalToolUpdate;
use meerkat_core::agent::{BindOutcome, DispatcherCapabilities, OpsLifecycleBindError};
use meerkat_core::error::ToolError;
use meerkat_core::ops::ToolDispatchOutcome;
use meerkat_core::ops_lifecycle::OpsLifecycleRegistry;
use meerkat_core::types::{SessionId, ToolCallView, ToolDef, ToolResult};
use serde_json::Value;
use std::collections::HashSet;
#[cfg(not(target_arch = "wasm32"))]
use std::path::PathBuf;
use std::sync::Arc;
#[derive(Debug, thiserror::Error)]
pub enum CompositeDispatcherError {
#[error("Builtin tool error: {0}")]
Builtin(#[from] BuiltinToolError),
#[error("Tool collision: name '{0}' is already registered")]
Collision(String),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Tool initialization failed for '{name}': {message}")]
ToolInitFailed { name: String, message: String },
}
pub struct CompositeDispatcher {
builtin_tools: Vec<Arc<dyn BuiltinTool>>,
#[cfg(feature = "skills")]
skill_tools: Option<SkillToolSet>,
external: Option<Arc<dyn AgentToolDispatcher>>,
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
builtin_config: BuiltinToolConfig,
#[allow(dead_code)]
task_store: Arc<dyn TaskStore>,
#[cfg(not(target_arch = "wasm32"))]
project_root: Option<PathBuf>,
#[cfg(not(target_arch = "wasm32"))]
shell_config: Option<ShellConfig>,
#[allow(dead_code)]
session_id: Option<String>,
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
image_tool_results: bool,
#[cfg(not(target_arch = "wasm32"))]
#[allow(dead_code)]
job_manager: Option<Arc<JobManager>>,
allowed_tools: HashSet<String>,
wait_interrupt_rx: Option<meerkat_core::wait_interrupt::WaitInterruptReceiver>,
completion_feed: Option<std::sync::Arc<dyn meerkat_core::completion_feed::CompletionFeed>>,
interrupt_baseline: Option<std::sync::Arc<std::sync::atomic::AtomicU64>>,
}
impl CompositeDispatcher {
#[cfg(not(target_arch = "wasm32"))]
pub fn new(
task_store: Arc<dyn TaskStore>,
config: &BuiltinToolConfig,
project_root: Option<PathBuf>,
shell_config: Option<ShellConfig>,
external: Option<Arc<dyn AgentToolDispatcher>>,
session_id: Option<String>,
_image_tool_results: bool,
) -> Result<Self, CompositeDispatcherError> {
Self::new_with_ops_lifecycle(
task_store,
config,
project_root,
shell_config,
external,
session_id,
None,
_image_tool_results,
)
}
#[cfg(not(target_arch = "wasm32"))]
#[allow(clippy::too_many_arguments)]
pub fn new_with_ops_lifecycle(
task_store: Arc<dyn TaskStore>,
config: &BuiltinToolConfig,
project_root: Option<PathBuf>,
shell_config: Option<ShellConfig>,
external: Option<Arc<dyn AgentToolDispatcher>>,
session_id: Option<String>,
ops_lifecycle: Option<Arc<dyn OpsLifecycleRegistry>>,
_image_tool_results: bool,
) -> Result<Self, CompositeDispatcherError> {
let mut builtin_tools: Vec<Arc<dyn BuiltinTool>> = Vec::new();
let shell_session_id = session_id.clone();
let project_root = project_root
.or_else(|| shell_config.as_ref().map(|cfg| cfg.project_root.clone()))
.or_else(|| std::env::current_dir().ok())
.ok_or_else(|| CompositeDispatcherError::ToolInitFailed {
name: "apply_patch".to_string(),
message: "failed to resolve project root".to_string(),
})?;
use crate::builtin::tasks::{TaskCreateTool, TaskGetTool, TaskListTool, TaskUpdateTool};
builtin_tools.push(Arc::new(TaskListTool::new(task_store.clone())));
builtin_tools.push(Arc::new(TaskGetTool::new(task_store.clone())));
builtin_tools.push(Arc::new(TaskCreateTool::with_session_opt(
task_store.clone(),
session_id.clone(),
)));
builtin_tools.push(Arc::new(TaskUpdateTool::with_session_opt(
task_store.clone(),
session_id,
)));
use crate::builtin::utility::{ApplyPatchTool, DateTimeTool, ViewImageTool, WaitTool};
builtin_tools.push(Arc::new(WaitTool::new()));
builtin_tools.push(Arc::new(DateTimeTool::new()));
builtin_tools.push(Arc::new(ApplyPatchTool::new(project_root.clone())));
builtin_tools.push(Arc::new(ViewImageTool::new(project_root.clone())));
let job_manager = if let Some(ref cfg) = shell_config {
if cfg.enabled {
let mut manager = JobManager::new(cfg.clone());
if let Some(session_id) = shell_session_id
.as_deref()
.and_then(|id| meerkat_core::types::SessionId::parse(id).ok())
{
manager = manager.with_owner_session_id(session_id);
}
if let Some(registry) = ops_lifecycle {
manager = manager.with_ops_registry(registry);
}
let mgr = Arc::new(manager);
use crate::builtin::shell::{
ShellJobCancelTool, ShellJobStatusTool, ShellJobsListTool, ShellTool,
};
builtin_tools.push(Arc::new(ShellTool::with_job_manager(
cfg.clone(),
mgr.clone(),
)));
builtin_tools.push(Arc::new(ShellJobStatusTool::new(mgr.clone())));
builtin_tools.push(Arc::new(ShellJobsListTool::new(mgr.clone())));
builtin_tools.push(Arc::new(ShellJobCancelTool::new(mgr.clone())));
Some(mgr)
} else {
None
}
} else {
None
};
let mut allowed_tools = HashSet::new();
let resolved_policy = config.resolve();
for tool in &builtin_tools {
let name = tool.name().to_string();
if resolved_policy.is_enabled(&name, tool.default_enabled()) {
allowed_tools.insert(name);
}
}
Ok(Self {
builtin_tools,
#[cfg(feature = "skills")]
skill_tools: None,
external,
builtin_config: config.clone(),
task_store,
project_root: Some(project_root),
shell_config,
session_id: shell_session_id,
image_tool_results: _image_tool_results,
job_manager,
allowed_tools,
wait_interrupt_rx: None,
completion_feed: None,
interrupt_baseline: None,
})
}
#[cfg(target_arch = "wasm32")]
pub fn new_wasm(
task_store: Arc<dyn TaskStore>,
config: &BuiltinToolConfig,
external: Option<Arc<dyn AgentToolDispatcher>>,
session_id: Option<String>,
) -> Result<Self, CompositeDispatcherError> {
let mut builtin_tools: Vec<Arc<dyn BuiltinTool>> = Vec::new();
use crate::builtin::tasks::{TaskCreateTool, TaskGetTool, TaskListTool, TaskUpdateTool};
builtin_tools.push(Arc::new(TaskListTool::new(task_store.clone())));
builtin_tools.push(Arc::new(TaskGetTool::new(task_store.clone())));
builtin_tools.push(Arc::new(TaskCreateTool::with_session_opt(
task_store.clone(),
session_id.clone(),
)));
builtin_tools.push(Arc::new(TaskUpdateTool::with_session_opt(
task_store.clone(),
session_id.clone(),
)));
use crate::builtin::utility::{DateTimeTool, WaitTool};
builtin_tools.push(Arc::new(WaitTool::new()));
builtin_tools.push(Arc::new(DateTimeTool::new()));
let mut allowed_tools = HashSet::new();
let resolved_policy = config.resolve();
for tool in &builtin_tools {
let name = tool.name().to_string();
if resolved_policy.is_enabled(&name, tool.default_enabled()) {
allowed_tools.insert(name);
}
}
Ok(Self {
builtin_tools,
#[cfg(feature = "skills")]
skill_tools: None,
external,
builtin_config: config.clone(),
task_store,
session_id,
image_tool_results: true,
allowed_tools,
wait_interrupt_rx: None,
completion_feed: None,
interrupt_baseline: None,
})
}
#[cfg(feature = "skills")]
pub fn register_skill_tools(&mut self, tool_set: SkillToolSet) {
for tool in tool_set.tools() {
self.allowed_tools.insert(tool.name().to_string());
}
self.skill_tools = Some(tool_set);
}
pub fn usage_instructions(&self) -> String {
let mut out = String::from("# Available Tools\n\n");
let mut seen_names: HashSet<String> = HashSet::new();
for tool in &self.builtin_tools {
if self.allowed_tools.contains(tool.name()) {
seen_names.insert(tool.name().to_string());
{
use std::fmt::Write;
let _ = write!(out, "## {}\n{}\n\n", tool.name(), tool.def().description);
}
}
}
if let Some(ref ext) = self.external {
let mut wrote_external_header = false;
for tool in ext.tools().iter() {
if seen_names.contains(&tool.name) {
continue;
}
if !wrote_external_header {
out.push_str(
"## External tools\nProvided by integrated runtimes/services.\n\n",
);
wrote_external_header = true;
}
seen_names.insert(tool.name.clone());
{
use std::fmt::Write;
let _ = write!(out, "## {}\n{}\n\n", tool.name, tool.description);
}
}
}
out
}
#[cfg(not(target_arch = "wasm32"))]
pub fn shell_job_manager(&self) -> Option<Arc<JobManager>> {
self.job_manager.clone()
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl AgentToolDispatcher for CompositeDispatcher {
fn tools(&self) -> Arc<[Arc<ToolDef>]> {
let mut tools = Vec::new();
let mut seen_names = HashSet::new();
for tool in &self.builtin_tools {
if self.allowed_tools.contains(tool.name()) {
seen_names.insert(tool.name().to_string());
tools.push(Arc::new(tool.def()));
}
}
#[cfg(feature = "skills")]
if let Some(ref skill) = self.skill_tools {
for tool in skill.tools() {
if self.allowed_tools.contains(tool.name()) {
seen_names.insert(tool.name().to_string());
tools.push(Arc::new(tool.def()));
}
}
}
if let Some(ref ext) = self.external {
for tool in ext.tools().iter() {
if !seen_names.contains(&tool.name) {
seen_names.insert(tool.name.clone());
tools.push(Arc::clone(tool));
}
}
}
tools.into()
}
async fn dispatch(&self, call: ToolCallView<'_>) -> Result<ToolDispatchOutcome, ToolError> {
let args: Value =
serde_json::from_str(call.args.get()).map_err(|e| ToolError::InvalidArguments {
name: call.name.to_string(),
reason: e.to_string(),
})?;
if !self.allowed_tools.contains(call.name) {
if let Some(ref ext) = self.external
&& ext.tools().iter().any(|t| t.name == call.name)
{
return ext.dispatch(call).await;
}
return Err(ToolError::NotFound {
name: call.name.to_string(),
});
}
for tool in &self.builtin_tools {
if tool.name() == call.name {
let output = tool.call(args.clone()).await.map_err(|e| match e {
BuiltinToolError::InvalidArgs(msg) => ToolError::InvalidArguments {
name: call.name.to_string(),
reason: msg,
},
BuiltinToolError::ExecutionFailed(msg) => {
ToolError::ExecutionFailed { message: msg }
}
BuiltinToolError::TaskError(te) => ToolError::ExecutionFailed { message: te },
})?;
let async_ops = tool.async_ops_for_output(&output);
let result = match output {
ToolOutput::Json(value) => {
let content = match &value {
Value::String(s) => s.clone(),
_ => serde_json::to_string(&value).unwrap_or_default(),
};
ToolResult::new(call.id.to_string(), content, false)
}
ToolOutput::Blocks(blocks) => {
ToolResult::with_blocks(call.id.to_string(), blocks, false)
}
};
return Ok(ToolDispatchOutcome { result, async_ops });
}
}
#[cfg(feature = "skills")]
if let Some(ref skill) = self.skill_tools {
for tool in skill.tools() {
if tool.name() == call.name {
let output = tool.call(args.clone()).await.map_err(|e| match e {
BuiltinToolError::InvalidArgs(msg) => ToolError::InvalidArguments {
name: call.name.to_string(),
reason: msg,
},
BuiltinToolError::ExecutionFailed(msg) => {
ToolError::ExecutionFailed { message: msg }
}
BuiltinToolError::TaskError(te) => {
ToolError::ExecutionFailed { message: te }
}
})?;
let async_ops = tool.async_ops_for_output(&output);
let result = match output {
ToolOutput::Json(value) => {
let content = match &value {
Value::String(s) => s.clone(),
_ => serde_json::to_string(&value).unwrap_or_default(),
};
ToolResult::new(call.id.to_string(), content, false)
}
ToolOutput::Blocks(blocks) => {
ToolResult::with_blocks(call.id.to_string(), blocks, false)
}
};
return Ok(ToolDispatchOutcome { result, async_ops });
}
}
}
if let Some(ref ext) = self.external
&& ext.tools().iter().any(|t| t.name == call.name)
{
return ext.dispatch(call).await;
}
Err(ToolError::NotFound {
name: call.name.to_string(),
})
}
async fn poll_external_updates(&self) -> ExternalToolUpdate {
#[allow(unused_mut)]
let mut update = if let Some(ref ext) = self.external {
ext.poll_external_updates().await
} else {
ExternalToolUpdate::default()
};
#[cfg(not(target_arch = "wasm32"))]
if let Some(ref mgr) = self.job_manager {
let drained = mgr.drain_completed().await;
if self.completion_feed.is_none() {
update.background_completions.extend(drained);
}
}
update
}
fn capabilities(&self) -> DispatcherCapabilities {
let has_wait = self.builtin_tools.iter().any(|t| t.name() == "wait");
let mut ops_lifecycle = false;
#[cfg(not(target_arch = "wasm32"))]
if self.job_manager.is_some() {
ops_lifecycle = true;
}
if !ops_lifecycle {
ops_lifecycle = self
.external
.as_ref()
.is_some_and(|ext| ext.capabilities().ops_lifecycle);
}
DispatcherCapabilities {
wait_interrupt: has_wait,
ops_lifecycle,
completion_feed: has_wait,
}
}
fn bind_wait_interrupt(
self: Arc<Self>,
rx: meerkat_core::wait_interrupt::WaitInterruptReceiver,
) -> Result<BindOutcome, meerkat_core::wait_interrupt::WaitInterruptBindError> {
let mut owned = Arc::try_unwrap(self)
.map_err(|_| meerkat_core::wait_interrupt::WaitInterruptBindError::SharedOwnership)?;
use crate::builtin::utility::WaitTool;
let new_wait = Arc::new(WaitTool::with_interrupt_and_feed(
rx.clone(),
owned.completion_feed.clone(),
owned.interrupt_baseline.clone(),
));
for tool in &mut owned.builtin_tools {
if tool.name() == "wait" {
*tool = new_wait;
break;
}
}
owned.wait_interrupt_rx = Some(rx);
Ok(BindOutcome::Bound(Arc::new(owned)))
}
fn bind_completion_feed(
self: Arc<Self>,
feed: std::sync::Arc<dyn meerkat_core::completion_feed::CompletionFeed>,
baseline: std::sync::Arc<std::sync::atomic::AtomicU64>,
) -> Result<BindOutcome, meerkat_core::wait_interrupt::WaitInterruptBindError> {
let mut owned = Arc::try_unwrap(self)
.map_err(|_| meerkat_core::wait_interrupt::WaitInterruptBindError::SharedOwnership)?;
use crate::builtin::utility::WaitTool;
let new_wait = Arc::new(WaitTool::with_feed_only(
Arc::clone(&feed),
Arc::clone(&baseline),
));
for tool in &mut owned.builtin_tools {
if tool.name() == "wait" {
*tool = new_wait;
break;
}
}
owned.completion_feed = Some(feed);
owned.interrupt_baseline = Some(baseline);
Ok(BindOutcome::Bound(Arc::new(owned)))
}
fn bind_ops_lifecycle(
self: Arc<Self>,
registry: Arc<dyn OpsLifecycleRegistry>,
owner_session_id: SessionId,
) -> Result<BindOutcome, OpsLifecycleBindError> {
let mut owned =
Arc::try_unwrap(self).map_err(|_| OpsLifecycleBindError::SharedOwnership)?;
#[allow(clippy::redundant_clone)]
let rebound_external = match owned.external.take() {
Some(external)
if external.capabilities().ops_lifecycle && Arc::strong_count(&external) == 1 =>
{
Some(
external
.bind_ops_lifecycle(Arc::clone(®istry), owner_session_id.clone())?
.into_dispatcher(),
)
}
other => other,
};
#[cfg(not(target_arch = "wasm32"))]
{
if owned.job_manager.is_none() && rebound_external.is_none() {
return Err(OpsLifecycleBindError::Unsupported);
}
let old_wait = owned
.builtin_tools
.iter()
.find(|t| t.name() == "wait")
.cloned();
let interrupt_rx = owned.wait_interrupt_rx.take();
let feed = owned.completion_feed.take();
let baseline = owned.interrupt_baseline.take();
#[cfg_attr(not(feature = "skills"), allow(unused_mut))]
let mut rebound = CompositeDispatcher::new_with_ops_lifecycle(
Arc::clone(&owned.task_store),
&owned.builtin_config,
owned.project_root.clone(),
owned.shell_config.clone(),
rebound_external,
Some(owner_session_id.to_string()),
Some(registry),
owned.image_tool_results,
)
.map_err(|_| OpsLifecycleBindError::Unsupported)?;
rebound.wait_interrupt_rx = interrupt_rx;
rebound.completion_feed = feed;
rebound.interrupt_baseline = baseline;
if let Some(configured_wait) = old_wait {
for tool in &mut rebound.builtin_tools {
if tool.name() == "wait" {
*tool = configured_wait;
break;
}
}
}
#[cfg(feature = "skills")]
if let Some(skill_tools) = owned.skill_tools.take() {
rebound.register_skill_tools(skill_tools);
}
Ok(BindOutcome::Bound(Arc::new(rebound)))
}
#[cfg(target_arch = "wasm32")]
{
let _ = registry;
let _ = owner_session_id;
let _ = rebound_external;
Err(OpsLifecycleBindError::Unsupported)
}
}
fn completion_enrichment(
&self,
) -> Option<Arc<dyn meerkat_core::completion_feed::CompletionEnrichmentProvider>> {
#[cfg(not(target_arch = "wasm32"))]
if let Some(ref mgr) = self.job_manager {
return Some(Arc::clone(mgr)
as Arc<
dyn meerkat_core::completion_feed::CompletionEnrichmentProvider,
>);
}
None
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::builtin::MemoryTaskStore;
use meerkat_core::ops_lifecycle::OpsLifecycleRegistry;
use meerkat_core::types::SessionId;
use serde_json::json;
use tempfile::TempDir;
struct MockExternalDispatcher {
tools: Arc<[Arc<ToolDef>]>,
}
impl MockExternalDispatcher {
fn new(name: &str, description: &str) -> Self {
let tools: Arc<[Arc<ToolDef>]> = Arc::from([Arc::new(ToolDef {
name: name.to_string(),
description: description.to_string(),
input_schema: json!({
"type": "object",
"properties": {},
"required": []
}),
})]);
Self { tools }
}
}
#[async_trait]
impl AgentToolDispatcher for MockExternalDispatcher {
fn tools(&self) -> Arc<[Arc<ToolDef>]> {
self.tools.clone()
}
async fn dispatch(&self, call: ToolCallView<'_>) -> Result<ToolDispatchOutcome, ToolError> {
if self.tools.iter().any(|tool| tool.name == call.name) {
return Ok(ToolResult::new(call.id.to_string(), "{}".to_string(), false).into());
}
Err(ToolError::not_found(call.name))
}
}
#[test]
fn usage_instructions_include_external_tools() {
let store = Arc::new(MemoryTaskStore::new());
let external: Arc<dyn AgentToolDispatcher> =
Arc::new(MockExternalDispatcher::new("mob_list", "List active mobs"));
let dispatcher = CompositeDispatcher::new(
store,
&BuiltinToolConfig::default(),
None,
None,
Some(external),
None,
true,
)
.expect("composite dispatcher should build");
let usage = dispatcher.usage_instructions();
assert!(usage.contains("External tools"));
assert!(usage.contains("mob_list"));
assert!(usage.contains("List active mobs"));
}
#[tokio::test]
async fn bind_wait_interrupt_swaps_in_interrupt_aware_wait_tool() {
use crate::builtin::utility::WaitInterrupt;
use meerkat_core::time_compat::Duration;
let store = Arc::new(MemoryTaskStore::new());
let dispatcher = Arc::new(
CompositeDispatcher::new(
store,
&BuiltinToolConfig::default(),
None,
None,
None,
None,
true,
)
.expect("composite dispatcher should build"),
);
let (tx, rx) = tokio::sync::watch::channel(None::<WaitInterrupt>);
let rebound = dispatcher
.bind_wait_interrupt(rx)
.expect("bind_wait_interrupt should succeed")
.into_dispatcher();
let call_json =
serde_json::value::RawValue::from_string(r#"{"seconds": 30.0}"#.to_string()).unwrap();
let call = ToolCallView {
id: "test-id",
name: "wait",
args: &call_json,
};
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(50)).await;
let _ = tx.send(Some(WaitInterrupt {
reason: "bind test interrupt".to_string(),
}));
});
let start = std::time::Instant::now();
let result = rebound
.dispatch(call)
.await
.expect("dispatch should succeed");
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_secs(2),
"wait should be interrupted quickly, took {elapsed:?}"
);
let content: serde_json::Value =
serde_json::from_str(&result.result.text_content()).unwrap();
assert_eq!(content["status"], "interrupted");
}
#[test]
#[allow(clippy::panic)]
fn bind_wait_interrupt_shared_ownership_error() {
let store = Arc::new(MemoryTaskStore::new());
let dispatcher = Arc::new(
CompositeDispatcher::new(
store,
&BuiltinToolConfig::default(),
None,
None,
None,
None,
true,
)
.expect("composite dispatcher should build"),
);
let _clone = Arc::clone(&dispatcher);
let (_tx, rx) =
tokio::sync::watch::channel(None::<meerkat_core::wait_interrupt::WaitInterrupt>);
match dispatcher.bind_wait_interrupt(rx) {
Err(meerkat_core::wait_interrupt::WaitInterruptBindError::SharedOwnership) => {}
Ok(_) => panic!("expected SharedOwnership error, got Ok"),
Err(e) => panic!("expected SharedOwnership, got {e:?}"),
}
}
#[tokio::test]
async fn dispatch_json_string_produces_text_result() {
let store = Arc::new(MemoryTaskStore::new());
let dispatcher = CompositeDispatcher::new(
store,
&BuiltinToolConfig::default(),
None,
None,
None,
None,
true,
)
.expect("composite dispatcher should build");
let call_json =
serde_json::value::RawValue::from_string(r#"{"seconds": 0.1}"#.to_string()).unwrap();
let call = ToolCallView {
id: "test-str",
name: "wait",
args: &call_json,
};
let result = dispatcher
.dispatch(call)
.await
.expect("dispatch should succeed");
assert!(!result.result.is_error);
let parsed: serde_json::Value = serde_json::from_str(&result.result.text_content())
.expect("content should be valid JSON");
assert_eq!(parsed["status"], "complete");
}
#[tokio::test]
async fn dispatch_json_object_produces_serialized_text() {
let store = Arc::new(MemoryTaskStore::new());
let dispatcher = CompositeDispatcher::new(
store,
&BuiltinToolConfig::default(),
None,
None,
None,
None,
true,
)
.expect("composite dispatcher should build");
let call_json = serde_json::value::RawValue::from_string("{}".to_string()).unwrap();
let call = ToolCallView {
id: "test-obj",
name: "datetime",
args: &call_json,
};
let result = dispatcher
.dispatch(call)
.await
.expect("dispatch should succeed");
assert!(!result.result.is_error);
let parsed: serde_json::Value = serde_json::from_str(&result.result.text_content())
.expect("content should be valid JSON");
assert!(
parsed.get("iso8601").is_some(),
"should contain iso8601 field"
);
}
#[tokio::test]
async fn dispatch_forwards_allowed_external_tool_calls() {
let store = Arc::new(MemoryTaskStore::new());
let external: Arc<dyn AgentToolDispatcher> =
Arc::new(MockExternalDispatcher::new("mob_list", "List active mobs"));
let dispatcher = CompositeDispatcher::new(
store,
&BuiltinToolConfig::default(),
None,
None,
Some(external),
None,
true,
)
.expect("composite dispatcher should build");
let call_json = serde_json::value::RawValue::from_string("{}".to_string()).unwrap();
let call = ToolCallView {
id: "ext-1",
name: "mob_list",
args: &call_json,
};
let result = dispatcher
.dispatch(call)
.await
.expect("external tool dispatch should succeed");
assert_eq!(result.result.text_content(), "{}");
}
#[tokio::test]
async fn dispatch_forwards_external_tool_calls_when_allowed_set_contains_name() {
let store = Arc::new(MemoryTaskStore::new());
let external: Arc<dyn AgentToolDispatcher> =
Arc::new(MockExternalDispatcher::new("mob_list", "List active mobs"));
let mut dispatcher = CompositeDispatcher::new(
store,
&BuiltinToolConfig::default(),
None,
None,
Some(external),
None,
true,
)
.expect("composite dispatcher should build");
dispatcher.allowed_tools.insert("mob_list".to_string());
let call_json = serde_json::value::RawValue::from_string("{}".to_string()).unwrap();
let call = ToolCallView {
id: "ext-2",
name: "mob_list",
args: &call_json,
};
let result = dispatcher
.dispatch(call)
.await
.expect("external tool dispatch should succeed even with a stale allow-set entry");
assert_eq!(result.result.text_content(), "{}");
}
#[test]
fn supports_ops_lifecycle_binding_when_shell_tools_present() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(MemoryTaskStore::new());
let shell_config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
let mut config = BuiltinToolConfig::default();
config.policy.enable.insert("shell".to_string());
config.policy.enable.insert("shell_job_cancel".to_string());
let dispatcher = CompositeDispatcher::new(
store,
&config,
None,
Some(shell_config),
None,
Some(SessionId::new().to_string()),
true,
)
.expect("composite dispatcher should build");
assert!(
dispatcher.capabilities().ops_lifecycle,
"shell-enabled composite dispatcher should support ops lifecycle binding"
);
assert!(
!dispatcher
.shell_job_manager()
.expect("shell manager")
.exports_canonical_async_ops(),
"shell manager should start unbound before canonical registry binding"
);
}
#[tokio::test]
async fn bind_ops_lifecycle_rebuilds_shell_tools_with_canonical_registry() {
let temp_dir = TempDir::new().unwrap();
let store = Arc::new(MemoryTaskStore::new());
let shell_config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
let mut config = BuiltinToolConfig::default();
config.policy.enable.insert("shell".to_string());
config.policy.enable.insert("shell_job_cancel".to_string());
let dispatcher = Arc::new(
CompositeDispatcher::new(
store,
&config,
None,
Some(shell_config),
None,
Some(SessionId::new().to_string()),
true,
)
.expect("composite dispatcher should build"),
);
let registry: Arc<dyn OpsLifecycleRegistry> =
Arc::new(meerkat_runtime::RuntimeOpsLifecycleRegistry::new());
let rebound = dispatcher
.bind_ops_lifecycle(Arc::clone(®istry), SessionId::new())
.expect("ops lifecycle binding should succeed")
.into_dispatcher();
let call_json = serde_json::value::RawValue::from_string(
r#"{"command":"sleep 60","background":true}"#.to_string(),
)
.unwrap();
let call = ToolCallView {
id: "shell-bg",
name: "shell",
args: &call_json,
};
let outcome = rebound
.dispatch(call)
.await
.expect("background shell dispatch");
assert_eq!(
outcome.async_ops.len(),
1,
"rebound shell dispatcher must emit canonical async op refs"
);
let payload: serde_json::Value =
serde_json::from_str(&outcome.result.text_content()).expect("json result");
let cancel_json = serde_json::value::RawValue::from_string(
serde_json::json!({
"job_id": payload["job_id"].as_str().expect("job id"),
})
.to_string(),
)
.unwrap();
let cancel = ToolCallView {
id: "shell-cancel",
name: "shell_job_cancel",
args: &cancel_json,
};
let _ = rebound
.dispatch(cancel)
.await
.expect("background shell cancel");
}
#[tokio::test]
async fn existing_datetime_tool_returns_json_output() {
use crate::builtin::BuiltinTool;
use crate::builtin::utility::DateTimeTool;
let tool = DateTimeTool::new();
let output = tool.call(json!({})).await.expect("call should succeed");
let value = output.into_json().expect("should be Json variant");
assert!(value.get("iso8601").is_some());
assert!(value.get("unix_timestamp").is_some());
}
#[test]
fn view_image_always_in_base_tool_set() {
for image_tool_results in [false, true] {
let store = Arc::new(MemoryTaskStore::new());
let dispatcher = CompositeDispatcher::new(
store,
&BuiltinToolConfig::default(),
None,
None,
None,
None,
image_tool_results,
)
.expect("composite dispatcher should build");
let tools = dispatcher.tools();
let tool_names: Vec<String> = tools.iter().map(|t| t.name.clone()).collect();
assert!(
tool_names.contains(&"view_image".to_string()),
"view_image should always be in base tool set (image_tool_results={image_tool_results}), but found: {tool_names:?}"
);
}
}
}