use adk_core::ToolContext;
use adk_tool::FunctionTool;
use schemars::JsonSchema;
use serde_json::Value;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwap;
use dashmap::DashMap;
use tokio::sync::broadcast;
use crate::agent_codegen::AgentCodegen;
use crate::agent_registry::AgentRegistry;
use crate::control_panel::ws::WsEvent;
use crate::knowledge_graph::KnowledgeGraph;
use crate::process_manager::ProcessManager;
use crate::proxy_pool::RemoteAgentProxyPool;
use crate::rbac_bridge::RbacBridge;
use crate::router::MessageRouter;
pub fn build_kg_tools(kg: Arc<KnowledgeGraph>) -> Vec<Arc<dyn adk_core::Tool>> {
vec![
Arc::new(kg_create_entities(kg.clone())),
Arc::new(kg_add_observations(kg.clone())),
Arc::new(kg_search_nodes(kg.clone())),
Arc::new(kg_read_graph(kg.clone())),
Arc::new(kg_delete_entities(kg.clone())),
]
}
fn kg_create_entities(kg: Arc<KnowledgeGraph>) -> FunctionTool {
FunctionTool::new(
"kg_create_entities",
"Create entities in the knowledge graph. Each entity has a name, type, and optional observations. Use this to store facts about the user, their projects, preferences, and important context.",
move |ctx: Arc<dyn ToolContext>, args: Value| {
let kg = kg.clone();
async move {
let user_id = ctx.user_id().to_string();
tracing::info!(user_id = %user_id, "kg_create_entities: storing for user");
let entities = args.get("entities")
.and_then(|v| v.as_array())
.ok_or_else(|| adk_core::AdkError::tool("'entities' array is required".to_string()))?;
let inputs: Vec<crate::knowledge_graph::CreateEntityInput> = entities.iter()
.filter_map(|e| {
let name = e.get("name")?.as_str()?.to_string();
let entity_type = e.get("entity_type")
.or_else(|| e.get("type"))
.and_then(|v| v.as_str())
.unwrap_or("general")
.to_string();
let observations = e.get("observations")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|o| o.as_str().map(String::from)).collect())
.unwrap_or_default();
Some(crate::knowledge_graph::CreateEntityInput { name, entity_type, observations })
})
.collect();
if inputs.is_empty() {
return Ok(serde_json::json!({"error": "no valid entities provided"}));
}
let created_names = kg.create_entities(&user_id, inputs);
Ok(serde_json::json!({
"created": created_names.len(),
"entities": created_names
}))
}
},
)
}
fn kg_add_observations(kg: Arc<KnowledgeGraph>) -> FunctionTool {
FunctionTool::new(
"kg_add_observations",
"Add observations (facts, notes, preferences) to an existing entity in the knowledge graph. The entity must already exist.",
move |ctx: Arc<dyn ToolContext>, args: Value| {
let kg = kg.clone();
async move {
let user_id = ctx.user_id().to_string();
let entity_name = args.get("entity_name")
.or_else(|| args.get("entity"))
.and_then(|v| v.as_str())
.ok_or_else(|| adk_core::AdkError::tool("'entity_name' is required".to_string()))?
.to_string();
let observations: Vec<String> = args.get("observations")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|o| o.as_str().map(String::from)).collect())
.unwrap_or_default();
if observations.is_empty() {
return Ok(serde_json::json!({"error": "no observations provided"}));
}
let count = observations.len();
match kg.add_observations(&user_id, &entity_name, observations) {
Some(ids) => Ok(serde_json::json!({
"entity": entity_name,
"added": ids.len()
})),
None => Ok(serde_json::json!({
"error": format!("entity '{}' not found", entity_name),
"attempted": count
})),
}
}
},
)
}
fn kg_search_nodes(kg: Arc<KnowledgeGraph>) -> FunctionTool {
FunctionTool::new(
"kg_search_nodes",
"Search for entities in the knowledge graph by text query. Returns matching entities with their observations.",
move |ctx: Arc<dyn ToolContext>, args: Value| {
let kg = kg.clone();
async move {
let user_id = ctx.user_id().to_string();
let query = args.get("query")
.and_then(|v| v.as_str())
.unwrap_or("");
let results = kg.search_nodes(&user_id, query);
let entries: Vec<Value> = results.iter().map(|r| {
serde_json::json!({
"name": r.entity.name,
"type": r.entity.entity_type,
"observations": r.entity.observations.iter()
.map(|o| &o.content)
.collect::<Vec<_>>(),
})
}).collect();
if entries.is_empty() {
Ok(serde_json::json!({
"results": [],
"count": 0,
"message": format!("No entities matching '{}' found in the knowledge graph.", query)
}))
} else {
Ok(serde_json::json!({
"results": entries,
"count": entries.len()
}))
}
}
},
)
.with_read_only(true)
}
fn kg_read_graph(kg: Arc<KnowledgeGraph>) -> FunctionTool {
FunctionTool::new(
"kg_read_graph",
"Read the entire knowledge graph for the current user. Returns all entities, their types, observations, and relations.",
move |ctx: Arc<dyn ToolContext>, args: Value| {
let kg = kg.clone();
async move {
let user_id = ctx.user_id().to_string();
let _ = args;
tracing::info!(user_id = %user_id, "kg_read_graph: querying KG");
let (entities, relations) = kg.read_graph(&user_id);
tracing::info!(
user_id = %user_id,
entity_count = entities.len(),
relation_count = relations.len(),
"kg_read_graph: results"
);
let entity_values: Vec<Value> = entities.iter().map(|e| {
serde_json::json!({
"name": e.name,
"type": e.entity_type,
"observations": e.observations.iter()
.map(|o| &o.content)
.collect::<Vec<_>>(),
})
}).collect();
let relation_values: Vec<Value> = relations.iter().map(|r| {
serde_json::json!({
"source": r.source,
"target": r.target,
"relation_type": r.relation_type,
})
}).collect();
if entity_values.is_empty() && relation_values.is_empty() {
Ok(serde_json::json!({
"entities": [],
"relations": [],
"entity_count": 0,
"relation_count": 0,
"message": "Knowledge graph is empty for this user. No entities or relations stored yet."
}))
} else {
Ok(serde_json::json!({
"entities": entity_values,
"relations": relation_values,
"entity_count": entity_values.len(),
"relation_count": relation_values.len()
}))
}
}
},
)
.with_read_only(true)
}
fn kg_delete_entities(kg: Arc<KnowledgeGraph>) -> FunctionTool {
FunctionTool::new(
"kg_delete_entities",
"Delete entities from the knowledge graph by name. Also removes associated relations.",
move |ctx: Arc<dyn ToolContext>, args: Value| {
let kg = kg.clone();
async move {
let user_id = ctx.user_id().to_string();
let names: Vec<String> = args
.get("names")
.or_else(|| args.get("entities"))
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|n| n.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
if names.is_empty() {
return Ok(serde_json::json!({"error": "no entity names provided"}));
}
let deleted = kg.delete_entities(&user_id, names);
Ok(serde_json::json!({
"deleted": deleted
}))
}
},
)
}
#[allow(dead_code)]
pub fn build_agent_tools(
registry: Arc<AgentRegistry>,
rbac: Arc<RbacBridge>,
ws_broadcast: broadcast::Sender<WsEvent>,
workspace_root: PathBuf,
) -> Vec<Arc<dyn adk_core::Tool>> {
vec![
Arc::new(agent_list_tool(registry.clone())),
Arc::new(agent_create_tool(
registry.clone(),
rbac,
ws_broadcast,
workspace_root,
)),
]
}
pub fn build_agent_management_tools(
registry: Arc<AgentRegistry>,
process_manager: Arc<ProcessManager>,
proxy_pool: Arc<RemoteAgentProxyPool>,
rbac: Arc<RbacBridge>,
router: Arc<ArcSwap<MessageRouter>>,
codegen: Arc<AgentCodegen>,
ws_broadcast: broadcast::Sender<WsEvent>,
workspace_root: PathBuf,
global_config: Arc<arc_swap::ArcSwap<crate::config::GatewayConfig>>,
) -> Vec<Arc<dyn adk_core::Tool>> {
vec![
Arc::new(agent_list_tool(registry.clone())),
Arc::new(agent_create_tool(
registry.clone(),
rbac.clone(),
ws_broadcast.clone(),
workspace_root.clone(),
)),
Arc::new(agent_start_tool(
registry.clone(),
process_manager.clone(),
proxy_pool.clone(),
rbac.clone(),
router.clone(),
codegen.clone(),
ws_broadcast.clone(),
workspace_root.clone(),
global_config.clone(),
)),
Arc::new(agent_stop_tool(
registry.clone(),
process_manager.clone(),
proxy_pool.clone(),
router.clone(),
ws_broadcast.clone(),
)),
Arc::new(agent_delete_tool(
registry.clone(),
rbac.clone(),
router.clone(),
ws_broadcast.clone(),
)),
Arc::new(agent_configure_tool(
registry.clone(),
process_manager.clone(),
proxy_pool.clone(),
rbac.clone(),
router.clone(),
codegen.clone(),
ws_broadcast.clone(),
workspace_root,
global_config,
)),
]
}
fn agent_list_tool(registry: Arc<AgentRegistry>) -> FunctionTool {
FunctionTool::new(
"agent_list",
"List all registered agents with their current lifecycle state, model, and description.",
move |_ctx: Arc<dyn ToolContext>, _args: Value| {
let registry = registry.clone();
async move {
let agents = registry.list();
let entries: Vec<Value> = agents
.iter()
.map(|(id, record)| {
serde_json::json!({
"id": id,
"name": record.config.name,
"description": record.config.description,
"state": format!("{:?}", record.state),
"model": record.config.model,
"auto_start": record.config.auto_start,
})
})
.collect();
Ok(serde_json::json!({
"agents": entries,
"count": entries.len()
}))
}
},
)
.with_read_only(true)
}
fn agent_create_tool(
registry: Arc<AgentRegistry>,
rbac: Arc<RbacBridge>,
ws_broadcast: broadcast::Sender<WsEvent>,
workspace_root: PathBuf,
) -> FunctionTool {
FunctionTool::new(
"agent_create",
"Create a new specialist LLM sub-agent for conversation routing and specialized tasks. NOT for coding tasks — use delegate_to_coding_agent for code changes. Requires: name, description, model (e.g. 'anthropic/claude-sonnet-4'), instruction. Optional: tools (array of tool names), auto_start (bool), channel_bindings (array of 'channel:account_id').",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let registry = registry.clone();
let rbac = rbac.clone();
let ws_broadcast = ws_broadcast.clone();
let workspace_root = workspace_root.clone();
async move {
let name = args.get("name")
.and_then(|v| v.as_str())
.ok_or_else(|| adk_core::AdkError::tool("'name' is required".to_string()))?
.to_string();
let description = args.get("description")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let model = args.get("model")
.and_then(|v| v.as_str())
.unwrap_or("anthropic/claude-sonnet-4")
.to_string();
let instruction = args.get("instruction")
.and_then(|v| v.as_str())
.unwrap_or("You are a helpful specialist agent.")
.to_string();
let tools: Vec<String> = args.get("tools")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|t| t.as_str().map(String::from)).collect())
.unwrap_or_default();
let auto_start = args.get("auto_start")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let id = name.to_lowercase()
.chars()
.map(|c| if c.is_alphanumeric() || c == '-' { c } else { '-' })
.collect::<String>();
let config = crate::agent_config::AgentConfig {
id: id.clone(),
name,
description,
agent_type: crate::agent_config::AgentType::Llm,
model,
api_key_env: String::new(),
instruction,
tools: tools.clone(),
action_nodes: vec![],
workflow_edges: vec![],
sub_agents: vec![],
role: crate::agent_config::AgentRoleConfig {
allow: tools,
deny: vec![],
},
channel_bindings: vec![],
auto_start,
temperature: None,
max_output_tokens: None,
model_override: None,
};
let agent_id = match registry.create_agent(config.clone()) {
Ok(aid) => aid,
Err(e) => {
return Ok(serde_json::json!({
"created": false,
"error": e.to_string()
}));
}
};
let agent_dir = workspace_root.join("agents").join(&agent_id);
let context_dir = agent_dir.join("context");
let data_dir = agent_dir.join("data");
let src_dir = agent_dir.join("src");
for dir in [&context_dir, &data_dir, &src_dir] {
if let Err(e) = std::fs::create_dir_all(dir) {
tracing::warn!(
agent_id = %agent_id,
dir = %dir.display(),
error = %e,
"failed to create workspace directory"
);
}
}
let default_context_files: &[(&str, &str)] = &[
("PROFILE.md", "# Agent Profile\n\nSpecialist agent profile.\n"),
("USER.md", "# User Context\n\nUser-specific context and preferences.\n"),
("PROJECTS.md", "# Projects\n\nActive projects and tasks.\n"),
("HABITS.md", "# Habits\n\nUser habits and patterns.\n"),
("NOTES.md", "# Notes\n\nGeneral notes and observations.\n"),
("BOOTSTRAP.md", "# Bootstrap\n\nInitial setup and configuration context.\n"),
];
for (filename, content) in default_context_files {
let file_path = context_dir.join(filename);
if let Err(e) = std::fs::write(&file_path, content) {
tracing::warn!(
agent_id = %agent_id,
file = %file_path.display(),
error = %e,
"failed to write default context file"
);
}
}
let stripped = rbac.register_agent(&agent_id, &config.role);
if !stripped.is_empty() {
tracing::info!(
agent_id = %agent_id,
stripped = ?stripped,
"stripped system tool permissions from agent role"
);
}
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Created".into(),
});
Ok(serde_json::json!({
"created": true,
"agent_id": agent_id,
"message": format!("Agent '{}' created successfully.", agent_id)
}))
}
},
)
}
fn agent_stop_tool(
registry: Arc<AgentRegistry>,
process_manager: Arc<ProcessManager>,
proxy_pool: Arc<RemoteAgentProxyPool>,
router: Arc<ArcSwap<MessageRouter>>,
ws_broadcast: broadcast::Sender<WsEvent>,
) -> FunctionTool {
FunctionTool::new(
"agent_stop",
"Stop a running User Agent by ID. Gracefully drains the process, removes it from the proxy pool and message router.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let registry = registry.clone();
let process_manager = process_manager.clone();
let proxy_pool = proxy_pool.clone();
let router = router.clone();
let ws_broadcast = ws_broadcast.clone();
async move {
let agent_id = args.get("agent_id")
.and_then(|v| v.as_str())
.ok_or_else(|| adk_core::AdkError::tool("'agent_id' is required".to_string()))?
.to_string();
{
let _entry = registry.get(&agent_id)
.ok_or_else(|| adk_core::AdkError::tool(
format!("agent '{}' not found in registry", agent_id)
))?;
}
registry.transition(&agent_id, crate::agent_config::LifecycleState::Stopping)
.map_err(|e| adk_core::AdkError::tool(
format!("failed to transition '{}' to Stopping: {}", agent_id, e)
))?;
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Stopping".into(),
});
if let Err(e) = process_manager.stop(&agent_id, Duration::from_secs(10)).await {
tracing::warn!(
agent_id = %agent_id,
error = %e,
"process stop failed, continuing with cleanup"
);
}
proxy_pool.remove(&agent_id);
let current = router.load();
let mut new_router = (**current).clone();
new_router.remove_agent_bindings(&agent_id);
router.store(Arc::new(new_router));
let _ = registry.transition(&agent_id, crate::agent_config::LifecycleState::Stopped);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Stopped".into(),
});
Ok(serde_json::json!({
"stopped": true,
"agent_id": agent_id,
"state": "Stopped"
}))
}
},
)
}
fn agent_start_tool(
registry: Arc<AgentRegistry>,
process_manager: Arc<ProcessManager>,
proxy_pool: Arc<RemoteAgentProxyPool>,
rbac: Arc<RbacBridge>,
router: Arc<ArcSwap<MessageRouter>>,
codegen: Arc<AgentCodegen>,
ws_broadcast: broadcast::Sender<WsEvent>,
workspace_root: PathBuf,
global_config: Arc<arc_swap::ArcSwap<crate::config::GatewayConfig>>,
) -> FunctionTool {
FunctionTool::new(
"agent_start",
"Start a User Agent by ID. Builds the agent binary, spawns the process, waits for readiness, and registers it for message routing.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let registry = registry.clone();
let process_manager = process_manager.clone();
let proxy_pool = proxy_pool.clone();
let rbac = rbac.clone();
let router = router.clone();
let codegen = codegen.clone();
let ws_broadcast = ws_broadcast.clone();
let workspace_root = workspace_root.clone();
let global_config = global_config.clone();
async move {
let agent_id = args.get("agent_id")
.and_then(|v| v.as_str())
.ok_or_else(|| adk_core::AdkError::tool("'agent_id' is required".to_string()))?
.to_string();
let config = {
let entry = registry.get(&agent_id)
.ok_or_else(|| adk_core::AdkError::tool(
format!("agent '{}' not found in registry", agent_id)
))?;
entry.config.clone()
};
let global_cfg = global_config.load();
let effective_model = config
.resolve_model("primary", &global_cfg.agent.model)
.unwrap_or(config.model.as_str());
registry.transition(&agent_id, crate::agent_config::LifecycleState::Starting)
.map_err(|e| adk_core::AdkError::tool(
format!("failed to transition '{}' to Starting: {}", agent_id, e)
))?;
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Starting".into(),
});
let binary_path = match codegen.build_agent(&config).await {
Ok(path) => path,
Err(e) => {
let _ = registry.transition(
&agent_id,
crate::agent_config::LifecycleState::Error {
message: format!("build failed: {}", e),
},
);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Error".into(),
});
return Ok(serde_json::json!({
"started": false,
"agent_id": agent_id,
"error": format!("build failed: {}", e)
}));
}
};
let api_key_env = config.resolve_api_key_env().to_string();
let mut env = HashMap::new();
env.insert("AGENT_ID".to_string(), agent_id.clone());
env.insert("AGENT_MODEL".to_string(), effective_model.to_string());
if let Ok(val) = std::env::var(&api_key_env) {
env.insert(api_key_env.clone(), val);
}
env.insert(
"AGENT_DATA_DIR".to_string(),
workspace_root
.join("agents")
.join(&agent_id)
.join("data")
.display()
.to_string(),
);
let port = match process_manager.spawn(&agent_id, &binary_path, env).await {
Ok(port) => port,
Err(e) => {
let _ = registry.transition(
&agent_id,
crate::agent_config::LifecycleState::Error {
message: format!("spawn failed: {}", e),
},
);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Error".into(),
});
return Ok(serde_json::json!({
"started": false,
"agent_id": agent_id,
"error": format!("spawn failed: {}", e)
}));
}
};
if let Err(e) = process_manager.wait_ready(&agent_id, Duration::from_secs(30)).await {
let _ = process_manager.stop(&agent_id, Duration::from_secs(5)).await;
proxy_pool.remove(&agent_id);
let _ = registry.transition(
&agent_id,
crate::agent_config::LifecycleState::Error {
message: format!("readiness check failed: {}", e),
},
);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Error".into(),
});
return Ok(serde_json::json!({
"started": false,
"agent_id": agent_id,
"error": format!("agent '{}' failed readiness check: {}", agent_id, e)
}));
}
proxy_pool.register(&agent_id, port);
rbac.register_agent(&agent_id, &config.role);
if !config.channel_bindings.is_empty() {
let current = router.load();
let mut new_router = (**current).clone();
new_router.add_agent_bindings(&agent_id, &config.channel_bindings);
router.store(Arc::new(new_router));
}
let _ = registry.transition(&agent_id, crate::agent_config::LifecycleState::Running);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Running".into(),
});
Ok(serde_json::json!({
"started": true,
"agent_id": agent_id,
"port": port,
"state": "Running"
}))
}
},
)
}
fn agent_delete_tool(
registry: Arc<AgentRegistry>,
rbac: Arc<RbacBridge>,
router: Arc<ArcSwap<MessageRouter>>,
ws_broadcast: broadcast::Sender<WsEvent>,
) -> FunctionTool {
FunctionTool::new(
"agent_delete",
"Delete a User Agent by ID. The agent must be in Stopped or Error state. Removes the agent from the registry, RBAC roles, and message router bindings.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let registry = registry.clone();
let rbac = rbac.clone();
let router = router.clone();
let ws_broadcast = ws_broadcast.clone();
async move {
let agent_id = args.get("agent_id")
.and_then(|v| v.as_str())
.ok_or_else(|| adk_core::AdkError::tool("'agent_id' is required".to_string()))?
.to_string();
registry.delete(&agent_id)
.map_err(|e| adk_core::AdkError::tool(
format!("cannot delete agent '{}': {}", agent_id, e)
))?;
rbac.remove_agent(&agent_id);
let current = router.load();
let mut new_router = (**current).clone();
new_router.remove_agent_bindings(&agent_id);
router.store(Arc::new(new_router));
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Deleted".into(),
});
Ok(serde_json::json!({
"deleted": true,
"agent_id": agent_id,
"state": "Deleted"
}))
}
},
)
}
fn agent_configure_tool(
registry: Arc<AgentRegistry>,
process_manager: Arc<ProcessManager>,
proxy_pool: Arc<RemoteAgentProxyPool>,
rbac: Arc<RbacBridge>,
router: Arc<ArcSwap<MessageRouter>>,
codegen: Arc<AgentCodegen>,
ws_broadcast: broadcast::Sender<WsEvent>,
workspace_root: PathBuf,
global_config: Arc<arc_swap::ArcSwap<crate::config::GatewayConfig>>,
) -> FunctionTool {
FunctionTool::new(
"agent_configure",
"Update a User Agent's configuration. Accepts the agent_id and a new config object. If the agent is Running, it will be stopped, reconfigured, and restarted automatically.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let registry = registry.clone();
let process_manager = process_manager.clone();
let proxy_pool = proxy_pool.clone();
let rbac = rbac.clone();
let router = router.clone();
let codegen = codegen.clone();
let ws_broadcast = ws_broadcast.clone();
let workspace_root = workspace_root.clone();
let global_config = global_config.clone();
async move {
let agent_id = args.get("agent_id")
.and_then(|v| v.as_str())
.ok_or_else(|| adk_core::AdkError::tool("'agent_id' is required".to_string()))?
.to_string();
let config_value = args.get("config")
.ok_or_else(|| adk_core::AdkError::tool("'config' is required".to_string()))?;
let new_config: crate::agent_config::AgentConfig = serde_json::from_value(config_value.clone())
.map_err(|e| adk_core::AdkError::tool(
format!("invalid config: {}", e)
))?;
if new_config.id != agent_id {
return Err(adk_core::AdkError::tool(
format!("config id '{}' does not match agent_id '{}'", new_config.id, agent_id)
));
}
let (was_running, old_channel_bindings) = {
let entry = registry.get(&agent_id)
.ok_or_else(|| adk_core::AdkError::tool(
format!("agent '{}' not found in registry", agent_id)
))?;
let running = entry.state == crate::agent_config::LifecycleState::Running;
let old_bindings = entry.config.channel_bindings.clone();
(running, old_bindings)
};
if was_running {
registry.transition(&agent_id, crate::agent_config::LifecycleState::Stopping)
.map_err(|e| adk_core::AdkError::tool(
format!("failed to transition '{}' to Stopping: {}", agent_id, e)
))?;
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Stopping".into(),
});
if let Err(e) = process_manager.stop(&agent_id, Duration::from_secs(10)).await {
tracing::warn!(
agent_id = %agent_id,
error = %e,
"process stop failed during configure, continuing with cleanup"
);
}
proxy_pool.remove(&agent_id);
let current = router.load();
let mut new_router = (**current).clone();
new_router.remove_agent_bindings(&agent_id);
router.store(Arc::new(new_router));
let _ = registry.transition(&agent_id, crate::agent_config::LifecycleState::Stopped);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Stopped".into(),
});
}
registry.update_config(&agent_id, new_config.clone())
.map_err(|e| adk_core::AdkError::tool(
format!("failed to update config for '{}': {}", agent_id, e)
))?;
rbac.register_agent(&agent_id, &new_config.role);
if new_config.channel_bindings != old_channel_bindings {
let current = router.load();
let mut new_router = (**current).clone();
new_router.update_agent_bindings(&agent_id, &new_config.channel_bindings);
router.store(Arc::new(new_router));
}
if was_running {
registry.transition(&agent_id, crate::agent_config::LifecycleState::Starting)
.map_err(|e| adk_core::AdkError::tool(
format!("failed to transition '{}' to Starting: {}", agent_id, e)
))?;
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Starting".into(),
});
let binary_path = match codegen.build_agent(&new_config).await {
Ok(path) => path,
Err(e) => {
let _ = registry.transition(
&agent_id,
crate::agent_config::LifecycleState::Error {
message: format!("build failed during configure: {}", e),
},
);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Error".into(),
});
return Ok(serde_json::json!({
"configured": true,
"restarted": false,
"agent_id": agent_id,
"error": format!("config updated but restart failed: build error: {}", e)
}));
}
};
let global_cfg = global_config.load();
let effective_model = new_config
.resolve_model("primary", &global_cfg.agent.model)
.unwrap_or(new_config.model.as_str());
let api_key_env = new_config.resolve_api_key_env().to_string();
let mut env = HashMap::new();
env.insert("AGENT_ID".to_string(), agent_id.clone());
env.insert("AGENT_MODEL".to_string(), effective_model.to_string());
if let Ok(val) = std::env::var(&api_key_env) {
env.insert(api_key_env.clone(), val);
}
env.insert(
"AGENT_DATA_DIR".to_string(),
workspace_root
.join("agents")
.join(&agent_id)
.join("data")
.display()
.to_string(),
);
let port = match process_manager.spawn(&agent_id, &binary_path, env).await {
Ok(port) => port,
Err(e) => {
let _ = registry.transition(
&agent_id,
crate::agent_config::LifecycleState::Error {
message: format!("spawn failed during configure: {}", e),
},
);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Error".into(),
});
return Ok(serde_json::json!({
"configured": true,
"restarted": false,
"agent_id": agent_id,
"error": format!("config updated but restart failed: spawn error: {}", e)
}));
}
};
if let Err(e) = process_manager.wait_ready(&agent_id, Duration::from_secs(30)).await {
let _ = process_manager.stop(&agent_id, Duration::from_secs(5)).await;
proxy_pool.remove(&agent_id);
let _ = registry.transition(
&agent_id,
crate::agent_config::LifecycleState::Error {
message: format!("readiness check failed during configure: {}", e),
},
);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Error".into(),
});
return Ok(serde_json::json!({
"configured": true,
"restarted": false,
"agent_id": agent_id,
"error": format!("config updated but restart failed: readiness timeout: {}", e)
}));
}
proxy_pool.register(&agent_id, port);
if !new_config.channel_bindings.is_empty() {
let current = router.load();
let mut new_router = (**current).clone();
new_router.update_agent_bindings(&agent_id, &new_config.channel_bindings);
router.store(Arc::new(new_router));
}
let _ = registry.transition(&agent_id, crate::agent_config::LifecycleState::Running);
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Running".into(),
});
return Ok(serde_json::json!({
"configured": true,
"restarted": true,
"agent_id": agent_id,
"port": port,
"state": "Running"
}));
}
let _ = ws_broadcast.send(WsEvent::AgentState {
agent_id: agent_id.clone(),
state: "Configured".into(),
});
Ok(serde_json::json!({
"configured": true,
"restarted": false,
"agent_id": agent_id,
"state": "Configured"
}))
}
},
)
}
pub fn build_scheduled_task_tools(
cron_scheduler: Arc<tokio::sync::Mutex<Option<crate::cron::CronScheduler>>>,
config: Arc<ArcSwap<crate::config::GatewayConfig>>,
config_path: PathBuf,
) -> Vec<Arc<dyn adk_core::Tool>> {
vec![
Arc::new(task_list_tool(cron_scheduler.clone())),
Arc::new(task_create_tool(
cron_scheduler.clone(),
config.clone(),
config_path.clone(),
)),
Arc::new(task_cancel_tool(cron_scheduler.clone())),
Arc::new(task_delete_tool(
cron_scheduler.clone(),
config.clone(),
config_path,
)),
]
}
fn task_list_tool(
scheduler: Arc<tokio::sync::Mutex<Option<crate::cron::CronScheduler>>>,
) -> FunctionTool {
FunctionTool::new(
"task_list",
"List all scheduled tasks (cron jobs) with their ID, schedule, message, delivery target, and status.",
move |_ctx: Arc<dyn ToolContext>, _args: Value| {
let scheduler = scheduler.clone();
async move {
let guard = scheduler.lock().await;
let jobs: Vec<Value> = match guard.as_ref() {
Some(sched) => {
sched.list_all_jobs().iter().map(|(job, status)| {
serde_json::json!({
"id": job.id,
"schedule": job.schedule,
"message": job.message,
"delivery": job.deliver_to.as_ref().map(|d| serde_json::json!({
"channel": d.channel,
"target": d.target,
})),
"status": format!("{:?}", status),
})
}).collect()
}
None => vec![],
};
Ok(serde_json::json!({
"tasks": jobs,
"count": jobs.len()
}))
}
},
)
.with_read_only(true)
}
fn task_create_tool(
scheduler: Arc<tokio::sync::Mutex<Option<crate::cron::CronScheduler>>>,
config: Arc<ArcSwap<crate::config::GatewayConfig>>,
config_path: PathBuf,
) -> FunctionTool {
FunctionTool::new(
"task_create",
"Create a new scheduled task. Required: id (unique string), schedule (e.g. '@every 5m', '@every 1h'), message (text to send or 'ask:prompt' for agent processing). Optional: delivery (object with 'channel' and 'target' fields).",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let scheduler = scheduler.clone();
let config = config.clone();
let config_path = config_path.clone();
async move {
let id = args.get("id").and_then(|v| v.as_str()).unwrap_or("").trim().to_string();
let schedule = args.get("schedule").and_then(|v| v.as_str()).unwrap_or("").trim().to_string();
let message = args.get("message").and_then(|v| v.as_str()).unwrap_or("").trim().to_string();
if id.is_empty() || schedule.is_empty() || message.is_empty() {
return Err(adk_core::AdkError::tool(
"Required fields: 'id', 'schedule', 'message'".to_string()
));
}
let delivery = args.get("delivery").and_then(|d| {
let channel = d.get("channel")?.as_str()?.to_string();
let target = d.get("target")?.as_str()?.to_string();
if channel.is_empty() { return None; }
Some(crate::config::CronDelivery { channel, target })
});
let suppress_keyword = args.get("suppress_keyword")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let new_job = crate::config::CronJob {
id: id.clone(),
schedule: schedule.clone(),
message: message.clone(),
deliver_to: delivery,
suppress_keyword,
target: None,
workspace: None,
};
let mut cfg = config.load().as_ref().clone();
if cfg.cron.jobs.iter().any(|j| j.id == id) {
return Err(adk_core::AdkError::tool(
format!("Task with ID '{}' already exists", id)
));
}
cfg.cron.jobs.push(new_job.clone());
let output = serde_json::to_string_pretty(&cfg)
.map_err(|e| adk_core::AdkError::tool(format!("Serialize error: {e}")))?;
std::fs::write(&config_path, &output)
.map_err(|e| adk_core::AdkError::tool(format!("Write error: {e}")))?;
config.store(std::sync::Arc::new(cfg.clone()));
let mut guard = scheduler.lock().await;
if let Some(sched) = guard.as_mut() {
sched.reconcile(&cfg.cron.jobs);
}
Ok(serde_json::json!({
"created": true,
"id": id,
"schedule": schedule,
"message": message
}))
}
},
)
}
fn task_cancel_tool(
scheduler: Arc<tokio::sync::Mutex<Option<crate::cron::CronScheduler>>>,
) -> FunctionTool {
FunctionTool::new(
"task_cancel",
"Cancel (pause) a running scheduled task by ID. The task remains in config but stops firing.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let scheduler = scheduler.clone();
async move {
let id = args.get("id").and_then(|v| v.as_str()).unwrap_or("").trim().to_string();
if id.is_empty() {
return Err(adk_core::AdkError::tool("'id' is required".to_string()));
}
let mut guard = scheduler.lock().await;
if let Some(sched) = guard.as_mut() {
sched.cancel(&id);
}
Ok(serde_json::json!({
"cancelled": true,
"id": id
}))
}
},
)
}
fn task_delete_tool(
scheduler: Arc<tokio::sync::Mutex<Option<crate::cron::CronScheduler>>>,
config: Arc<ArcSwap<crate::config::GatewayConfig>>,
config_path: PathBuf,
) -> FunctionTool {
FunctionTool::new(
"task_delete",
"Permanently delete a scheduled task by ID. Removes it from config and stops it if running.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let scheduler = scheduler.clone();
let config = config.clone();
let config_path = config_path.clone();
async move {
let id = args.get("id").and_then(|v| v.as_str()).unwrap_or("").trim().to_string();
if id.is_empty() {
return Err(adk_core::AdkError::tool("'id' is required".to_string()));
}
let mut cfg = config.load().as_ref().clone();
let before = cfg.cron.jobs.len();
cfg.cron.jobs.retain(|j| j.id != id);
if cfg.cron.jobs.len() == before {
return Err(adk_core::AdkError::tool(
format!("Task '{}' not found", id)
));
}
let output = serde_json::to_string_pretty(&cfg)
.map_err(|e| adk_core::AdkError::tool(format!("Serialize error: {e}")))?;
std::fs::write(&config_path, &output)
.map_err(|e| adk_core::AdkError::tool(format!("Write error: {e}")))?;
config.store(std::sync::Arc::new(cfg.clone()));
let mut guard = scheduler.lock().await;
if let Some(sched) = guard.as_mut() {
sched.reconcile(&cfg.cron.jobs);
}
Ok(serde_json::json!({
"deleted": true,
"id": id
}))
}
},
)
}
pub fn build_filesystem_tools(workspace_root: PathBuf) -> Vec<Arc<dyn adk_core::Tool>> {
vec![
Arc::new(fs_pwd_tool(workspace_root.clone())),
Arc::new(fs_list_tool(workspace_root.clone())),
Arc::new(fs_tree_tool(workspace_root.clone())),
Arc::new(fs_read_tool(workspace_root.clone())),
Arc::new(fs_search_tool(workspace_root)),
]
}
pub fn build_coding_agent_delegation_tool(
delegator: Arc<crate::coding_agent::delegator::TaskDelegator>,
) -> Arc<dyn adk_core::Tool> {
Arc::new(FunctionTool::new(
"delegate_to_coding_agent",
"Delegate a coding task to a registered coding agent (e.g., Claude Code, Kiro CLI, Codex). The agent executes the task in a real workspace with filesystem access — writing code, running commands, creating files. Use coding_agent_list first to see available agents. NOT for creating new agents — use agent_create for that.",
move |ctx: Arc<dyn ToolContext>, args: Value| {
let delegator = delegator.clone();
async move {
let agent = args.get("agent")
.and_then(|v| v.as_str())
.ok_or_else(|| adk_core::AdkError::tool("'agent' field is required (agent ID or alias)"))?;
let task = args.get("task")
.and_then(|v| v.as_str())
.ok_or_else(|| adk_core::AdkError::tool("'task' field is required (task description)"))?;
let workspace = args.get("workspace")
.and_then(|v| v.as_str())
.map(std::path::PathBuf::from);
let file_context: Option<Vec<std::path::PathBuf>> = args.get("files")
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_str().map(std::path::PathBuf::from)).collect());
let user_id = ctx.user_id().to_string();
let (channel_type, channel_id) = match user_id.split_once(':') {
Some((ct, id)) => (ct.to_string(), id.to_string()),
None => ("internal".to_string(), "system".to_string()),
};
let request = crate::coding_agent::models::TaskRequest {
description: task.to_string(),
trigger: crate::coding_agent::models::TaskTrigger::AgentDelegation {
source_agent_id: "system".to_string(),
},
workspace,
file_context,
reply_to: crate::coding_agent::models::ReplyTarget {
channel_type,
channel_id,
message_id: None,
},
};
match delegator.delegate(agent, request).await {
Ok(task_id) => Ok(serde_json::json!({
"status": "queued",
"task_id": task_id,
"agent": agent,
"message": format!("Task delegated to '{}'. The agent is now working and will stream progress and results directly to this chat. Task ID: {}", agent, task_id)
})),
Err(e) => Ok(serde_json::json!({
"status": "error",
"error": format!("{}", e),
"agent": agent
})),
}
}
},
))
}
pub fn build_coding_agent_list_tool(
registry: Arc<crate::coding_agent::registry::CodingAgentRegistry>,
) -> Arc<dyn adk_core::Tool> {
Arc::new(FunctionTool::new(
"coding_agent_list",
"List all registered coding agents with their current status. Use this to discover which coding agents are available before delegating tasks. Shows agent ID, alias, backend type, connection status, and workspaces.",
move |_ctx: Arc<dyn ToolContext>, _args: Value| {
let registry = registry.clone();
async move {
let agents = registry.list_agents();
let list: Vec<serde_json::Value> = agents.iter().map(|a| {
let status_str = match &a.status {
crate::coding_agent::status::AgentConnectionStatus::Connected => "connected",
crate::coding_agent::status::AgentConnectionStatus::Disconnected { .. } => "disconnected",
crate::coding_agent::status::AgentConnectionStatus::Error { .. } => "error",
crate::coding_agent::status::AgentConnectionStatus::Unknown => "unknown",
};
serde_json::json!({
"id": a.id,
"alias": a.config.alias,
"backend_type": a.backend_type,
"status": status_str,
"workspaces": a.config.workspaces.iter().map(|w| w.display().to_string()).collect::<Vec<_>>(),
})
}).collect();
if list.is_empty() {
Ok(serde_json::json!({
"agents": [],
"message": "No coding agents registered. The user can add one via the Control Panel at /ui/coding-agents/new"
}))
} else {
Ok(serde_json::json!({
"agents": list,
"count": list.len()
}))
}
}
},
))
}
pub fn build_coding_agent_task_status_tool(
task_history: Arc<crate::coding_agent::history::TaskHistory>,
history_db: Arc<crate::coding_agent::history_db::PersistentTaskHistory>,
) -> Arc<dyn adk_core::Tool> {
Arc::new(FunctionTool::new(
"coding_agent_task_status",
"Check the status of a previously delegated coding agent task. Returns the current state (queued, running, completed, failed, cancelled) and the output/result if completed. Use this to check on tasks you delegated with delegate_to_coding_agent.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let task_history = task_history.clone();
let history_db = history_db.clone();
async move {
let task_id = args.get("task_id")
.and_then(|v| v.as_str())
.ok_or_else(|| adk_core::AdkError::tool("'task_id' field is required"))?;
let task = history_db.get_task(&task_id.to_string())
.or_else(|| task_history.get_task(&task_id.to_string()));
match task {
Some(entry) => {
let (status, output, error) = match &entry.state {
crate::coding_agent::models::TaskState::Queued { .. } => {
("queued".to_string(), None, None)
}
crate::coding_agent::models::TaskState::Running { progress_percent, .. } => {
("running".to_string(), progress_percent.map(|p| format!("{}% complete", p)), None)
}
crate::coding_agent::models::TaskState::Completed { result, .. } => {
("completed".to_string(), Some(result.output.clone()), None)
}
crate::coding_agent::models::TaskState::Failed { error, .. } => {
let err_msg = format!("{:?}", error);
("failed".to_string(), None, Some(err_msg))
}
crate::coding_agent::models::TaskState::Cancelled { reason, .. } => {
("cancelled".to_string(), None, Some(format!("{:?}", reason)))
}
};
Ok(serde_json::json!({
"task_id": task_id,
"agent_id": entry.agent_id,
"description": entry.description,
"status": status,
"output": output,
"error": error,
}))
}
None => Ok(serde_json::json!({
"task_id": task_id,
"status": "not_found",
"message": "Task not found. It may have expired from history or the ID is incorrect."
})),
}
}
},
))
}
#[derive(serde::Serialize, JsonSchema)]
struct FsListParams {
path: Option<String>,
show_hidden: Option<bool>,
}
#[derive(serde::Serialize, JsonSchema)]
struct FsReadParams {
path: String,
}
#[derive(serde::Serialize, JsonSchema)]
struct FsTreeParams {
path: Option<String>,
depth: Option<u64>,
show_hidden: Option<bool>,
}
#[derive(serde::Serialize, JsonSchema)]
struct FsSearchParams {
query: String,
path: Option<String>,
}
fn fs_pwd_tool(root: PathBuf) -> FunctionTool {
FunctionTool::new(
"fs_pwd",
"Show the current workspace root directory (absolute path). No arguments needed.",
move |_ctx: Arc<dyn ToolContext>, _args: Value| {
let root = root.clone();
async move {
let abs = root.canonicalize().unwrap_or_else(|_| root.clone());
Ok(serde_json::json!({
"workspace_root": abs.to_string_lossy(),
}))
}
},
)
.with_read_only(true)
}
fn fs_list_tool(root: PathBuf) -> FunctionTool {
FunctionTool::new(
"fs_list",
"List files and directories at a given path. Returns names, types (file/dir), and sizes. Path can be relative to workspace root or absolute. Use '.' for the root directory. Supports '..' for parent traversal within the filesystem.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let root = root.clone();
async move {
let path_str = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
let show_hidden = args.get("show_hidden").and_then(|v| v.as_bool()).unwrap_or(false);
let target = if std::path::Path::new(path_str).is_absolute() {
PathBuf::from(path_str)
} else {
root.join(path_str)
};
let canonical = target.canonicalize().map_err(|e| {
adk_core::AdkError::tool(format!("Path not found: {e}"))
})?;
let mut entries = Vec::new();
let read_dir = std::fs::read_dir(&canonical).map_err(|e| {
adk_core::AdkError::tool(format!("Cannot read directory: {e}"))
})?;
for entry in read_dir.flatten() {
let meta = entry.metadata().ok();
let name = entry.file_name().to_string_lossy().to_string();
if !show_hidden && name.starts_with('.') {
continue;
}
if name == "node_modules" || name == "target" {
continue;
}
entries.push(serde_json::json!({
"name": name,
"type": if meta.as_ref().map(|m| m.is_dir()).unwrap_or(false) { "dir" } else { "file" },
"size": meta.as_ref().map(|m| m.len()).unwrap_or(0),
}));
}
entries.sort_by(|a, b| {
let a_type = a["type"].as_str().unwrap_or("");
let b_type = b["type"].as_str().unwrap_or("");
b_type.cmp(a_type).then_with(|| {
a["name"].as_str().unwrap_or("").cmp(b["name"].as_str().unwrap_or(""))
})
});
Ok(serde_json::json!({
"path": canonical.to_string_lossy(),
"entries": entries,
"count": entries.len()
}))
}
},
)
.with_read_only(true)
.with_parameters_schema::<FsListParams>()
}
fn fs_read_tool(root: PathBuf) -> FunctionTool {
FunctionTool::new(
"fs_read",
"Read the contents of a file. Path can be relative to workspace root or absolute. Returns the text content (max 50KB). For binary files, returns a size indicator instead.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let root = root.clone();
async move {
let path_str = args.get("path").and_then(|v| v.as_str()).unwrap_or("");
if path_str.is_empty() {
return Err(adk_core::AdkError::tool("'path' is required".to_string()));
}
let target = if std::path::Path::new(path_str).is_absolute() {
PathBuf::from(path_str)
} else {
root.join(path_str)
};
let canonical = target.canonicalize().map_err(|e| {
adk_core::AdkError::tool(format!("Path not found: {e}"))
})?;
let meta = std::fs::metadata(&canonical).map_err(|e| {
adk_core::AdkError::tool(format!("Cannot read file: {e}"))
})?;
if meta.is_dir() {
return Err(adk_core::AdkError::tool("Path is a directory, use fs_list instead".to_string()));
}
const MAX_SIZE: u64 = 50 * 1024;
if meta.len() > MAX_SIZE {
return Ok(serde_json::json!({
"path": path_str,
"truncated": true,
"size": meta.len(),
"content": std::fs::read_to_string(&canonical)
.map(|s| s[..MAX_SIZE as usize].to_string())
.unwrap_or_else(|_| format!("[Binary file, {} bytes]", meta.len()))
}));
}
match std::fs::read_to_string(&canonical) {
Ok(content) => Ok(serde_json::json!({
"path": path_str,
"size": meta.len(),
"content": content
})),
Err(_) => Ok(serde_json::json!({
"path": path_str,
"size": meta.len(),
"content": format!("[Binary file, {} bytes]", meta.len())
})),
}
}
},
)
.with_read_only(true)
.with_parameters_schema::<FsReadParams>()
}
fn fs_tree_tool(root: PathBuf) -> FunctionTool {
FunctionTool::new(
"fs_tree",
"Show a directory tree structure with configurable depth. Path can be relative or absolute. Returns an indented tree view of files and directories. Optional 'depth' (default 2, max 5).",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let root = root.clone();
async move {
let path_str = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
let max_depth = args.get("depth").and_then(|v| v.as_u64()).unwrap_or(2).min(5) as usize;
let show_hidden = args.get("show_hidden").and_then(|v| v.as_bool()).unwrap_or(false);
let target = if std::path::Path::new(path_str).is_absolute() {
PathBuf::from(path_str)
} else {
root.join(path_str)
};
let canonical = target.canonicalize().map_err(|e| {
adk_core::AdkError::tool(format!("Path not found: {e}"))
})?;
if !canonical.is_dir() {
return Err(adk_core::AdkError::tool("Path is not a directory".to_string()));
}
let mut tree = String::new();
let mut file_count = 0usize;
let mut dir_count = 0usize;
fn walk(
dir: &std::path::Path,
prefix: &str,
depth: usize,
max_depth: usize,
show_hidden: bool,
tree: &mut String,
file_count: &mut usize,
dir_count: &mut usize,
) {
if depth >= max_depth { return; }
let mut entries: Vec<_> = match std::fs::read_dir(dir) {
Ok(rd) => rd.flatten().collect(),
Err(_) => return,
};
entries.sort_by_key(|e| e.file_name());
let entries: Vec<_> = entries.into_iter().filter(|e| {
let name = e.file_name().to_string_lossy().to_string();
if !show_hidden && name.starts_with('.') { return false; }
if name == "node_modules" || name == "target" { return false; }
true
}).collect();
let count = entries.len();
for (i, entry) in entries.iter().enumerate() {
let is_last = i == count - 1;
let connector = if is_last { "└── " } else { "├── " };
let name = entry.file_name().to_string_lossy().to_string();
let is_dir = entry.metadata().map(|m| m.is_dir()).unwrap_or(false);
tree.push_str(&format!("{}{}{}{}\n", prefix, connector, name, if is_dir { "/" } else { "" }));
if is_dir {
*dir_count += 1;
let new_prefix = format!("{}{}", prefix, if is_last { " " } else { "│ " });
walk(&entry.path(), &new_prefix, depth + 1, max_depth, show_hidden, tree, file_count, dir_count);
} else {
*file_count += 1;
}
}
}
let root_name = canonical.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| canonical.to_string_lossy().to_string());
tree.push_str(&format!("{}/\n", root_name));
walk(&canonical, "", 0, max_depth, show_hidden, &mut tree, &mut file_count, &mut dir_count);
Ok(serde_json::json!({
"path": canonical.to_string_lossy(),
"tree": tree,
"directories": dir_count,
"files": file_count
}))
}
},
)
.with_read_only(true)
.with_parameters_schema::<FsTreeParams>()
}
fn fs_search_tool(root: PathBuf) -> FunctionTool {
FunctionTool::new(
"fs_search",
"Search for files by name pattern (case-insensitive substring match). Returns matching file paths. Path can be relative or absolute. Optional 'path' to limit search to a subdirectory.",
move |_ctx: Arc<dyn ToolContext>, args: Value| {
let root = root.clone();
async move {
let query = args.get("query").and_then(|v| v.as_str()).unwrap_or("").to_lowercase();
if query.is_empty() {
return Err(adk_core::AdkError::tool("'query' is required".to_string()));
}
let sub_path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
let search_root = if std::path::Path::new(sub_path).is_absolute() {
PathBuf::from(sub_path)
} else {
root.join(sub_path)
};
let canonical = search_root.canonicalize().unwrap_or_else(|_| search_root.clone());
let mut matches = Vec::new();
let mut stack = vec![canonical.clone()];
let max_results = 50;
while let Some(dir) = stack.pop() {
if matches.len() >= max_results { break; }
let entries = match std::fs::read_dir(&dir) {
Ok(e) => e,
Err(_) => continue,
};
for entry in entries.flatten() {
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with('.') || name == "node_modules" || name == "target" {
continue;
}
let path = entry.path();
if path.is_dir() {
stack.push(path);
} else if name.to_lowercase().contains(&query) {
matches.push(path.to_string_lossy().to_string());
if matches.len() >= max_results { break; }
}
}
}
if matches.is_empty() {
Ok(serde_json::json!({
"query": query,
"matches": [],
"count": 0,
"message": format!("No files matching '{}' found in '{}'. The search is complete — do not retry with the same query.", query, canonical.to_string_lossy())
}))
} else {
Ok(serde_json::json!({
"query": query,
"matches": matches,
"count": matches.len(),
"truncated": matches.len() >= max_results
}))
}
}
},
)
.with_read_only(true)
.with_parameters_schema::<FsSearchParams>()
}
pub fn build_channel_tools(
channel_map: Arc<DashMap<crate::channel::ChannelKey, Arc<dyn crate::channel::Channel>>>,
) -> Vec<Arc<dyn adk_core::Tool>> {
vec![Arc::new(send_photo_tool(channel_map))]
}
fn send_photo_tool(
channel_map: Arc<DashMap<crate::channel::ChannelKey, Arc<dyn crate::channel::Channel>>>,
) -> FunctionTool {
FunctionTool::new(
"send_photo",
"Send a photo/image to the user's chat. Provide either a file 'path' (absolute path to an image file on disk) or 'base64' (base64-encoded image data). Optional 'caption' text. The image is sent to the current user's Telegram chat.",
move |ctx: Arc<dyn ToolContext>, args: Value| {
let channel_map = channel_map.clone();
async move {
let user_id = ctx.user_id().to_string();
let caption = args.get("caption").and_then(|v| v.as_str()).map(|s| s.to_string());
let (data, mime_type) = if let Some(path_str) = args.get("path").and_then(|v| v.as_str()) {
let path = std::path::Path::new(path_str);
let bytes = std::fs::read(path).map_err(|e| {
adk_core::AdkError::tool(format!("Cannot read file '{}': {e}", path_str))
})?;
let mime = match path.extension().and_then(|e| e.to_str()) {
Some("png") => "image/png",
Some("jpg") | Some("jpeg") => "image/jpeg",
Some("gif") => "image/gif",
Some("webp") => "image/webp",
_ => "image/png",
};
(bytes, mime.to_string())
} else if let Some(b64) = args.get("base64").and_then(|v| v.as_str()) {
let bytes = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, b64)
.map_err(|e| adk_core::AdkError::tool(format!("Invalid base64: {e}")))?;
let mime = args.get("mime_type").and_then(|v| v.as_str()).unwrap_or("image/png").to_string();
(bytes, mime)
} else {
return Err(adk_core::AdkError::tool(
"Either 'path' (file path) or 'base64' (base64 data) is required".to_string()
));
};
const MAX_PHOTO_SIZE: usize = 10 * 1024 * 1024;
if data.len() > MAX_PHOTO_SIZE {
return Err(adk_core::AdkError::tool(
format!("Image too large ({} bytes). Telegram limit is 10MB.", data.len())
));
}
let is_valid_image = data.starts_with(&[0x89, 0x50, 0x4E, 0x47]) || data.starts_with(&[0xFF, 0xD8, 0xFF]) || data.starts_with(b"GIF8") || data.starts_with(b"RIFF");
if !is_valid_image {
return Err(adk_core::AdkError::tool(
"File does not appear to be a valid image (PNG/JPEG/GIF/WebP). Check the file format.".to_string()
));
}
if data.len() > 5 * 1024 * 1024 && mime_type == "image/jpeg" {
tracing::warn!(size = data.len(), "large JPEG may fail Telegram processing");
}
let chat_id = user_id.split(':').last().unwrap_or(&user_id).to_string();
let key = crate::channel::ChannelKey {
channel_type: crate::channel::ChannelType::Telegram,
account_id: "default".to_string(),
};
if let Some(ch) = channel_map.get(&key) {
ch.send_photo(&chat_id, &data, &mime_type, caption.as_deref())
.await
.map_err(|e| adk_core::AdkError::tool(format!("Failed to send photo: {e}")))?;
Ok(serde_json::json!({
"sent": true,
"chat_id": chat_id,
"size_bytes": data.len(),
"mime_type": mime_type
}))
} else {
Err(adk_core::AdkError::tool("No Telegram channel available".to_string()))
}
}
},
)
}