use std::{sync::Arc, time::Duration};
use anyhow::{Result, anyhow, bail};
use chrono::Utc;
use futures::StreamExt;
use serde_json::{Value, json};
use tokio::{
io::AsyncWriteExt as _,
sync::{Mutex, RwLock, broadcast},
time,
};
use tracing::{debug, info, warn};
use uuid::Uuid;
/// Snapshot of what the agent is currently doing, shared behind an
/// `RwLock` so other components can render live progress for a turn.
#[derive(Debug, Clone, Default)]
pub struct LiveStatus {
/// Coarse state label, e.g. "thinking" (set at the start of `run_turn`).
pub state: String,
/// Preview (first ~100 chars) of the user message being handled.
pub current_task: String,
/// Labels of tools invoked during the current turn; cleared at turn start.
pub tool_history: Vec<String>,
/// Rolling preview of generated text; cleared at turn start.
pub text_preview: String,
/// When the current turn started; `None` while idle.
pub started_at: Option<std::time::Instant>,
/// Session key of the turn currently being processed.
pub session_key: String,
}
use super::{
loop_detection::LoopDetector,
memory::{MemoryDoc, MemoryStore},
registry::{AgentHandle, AgentMessage, AgentRegistry, AgentReply},
workspace::{
DEFAULT_MAX_CHARS_PER_FILE, DEFAULT_TOTAL_MAX_CHARS, SessionType, WorkspaceContext,
},
};
use crate::{
config::{runtime::RuntimeConfig, schema::ContextPruningConfig},
events::AgentEvent,
plugin::PluginRegistry,
provider::{
ContentPart, LlmRequest, Message, MessageContent, Role, StreamEvent, ToolDef,
failover::FailoverManager, registry::ProviderRegistry,
},
skill::{RunOptions, SkillRegistry, run_tool},
store::Store,
};
/// Default per-turn timeout: 48 hours, in seconds.
const DEFAULT_TIMEOUT_SECONDS: u64 = 172_800;
/// Sentinel token indicating the agent should send no reply
/// (consumed elsewhere in this module — not visible in this chunk).
const NO_REPLY_TOKEN: &str = "NO_REPLY";
/// Default upload size cap in bytes (50 MB, decimal megabytes).
const DEFAULT_MAX_FILE_SIZE: usize = 50_000_000;
/// Default cap on extracted text characters passed to the model per message.
const DEFAULT_MAX_TEXT_CHARS: usize = 50_000;
/// Slash commands that are read-only and therefore always permitted,
/// regardless of the agent's `allowed_commands` allowlist.
const READONLY_COMMANDS: &[&str] = &[
"/help", "/version", "/status", "/health", "/uptime", "/models", "/ctx", "/btw", "/clear",
"/history", "/cron",
];
/// Which confirmation step an uploaded file is waiting on before the user
/// answers with "1" / "2" / "3" (handled at the top of `run_turn`).
enum PendingStage {
/// Binary file with no extractable text: only its size needs confirming.
SizeConfirm,
/// Text was extracted; awaiting user approval of the estimated token cost.
TokenConfirm {
extracted_text: String,
#[allow(dead_code)]
estimated_tokens: usize,
},
}
/// An uploaded file parked on disk until the user confirms what to do with
/// it (analyze / analyze-then-delete / delete).
struct PendingFile {
/// Original filename as uploaded (also the name under `workspace/uploads/`).
filename: String,
/// Temp copy of the raw bytes (`rsclaw_pending_<uuid>.bin` in the temp dir).
path: std::path::PathBuf,
#[allow(dead_code)]
size: usize,
#[allow(dead_code)]
mime_type: String,
#[allow(dead_code)]
images: Vec<super::registry::ImageAttachment>,
/// Confirmation step this file is currently waiting on.
stage: PendingStage,
}
/// Decide whether `model` can accept image input.
///
/// An explicit `tools.upload.supportsVision` config flag always wins;
/// otherwise the lowercased model name is matched against a list of
/// known vision-capable model-family substrings.
fn model_supports_vision(model: &str, config: &RuntimeConfig) -> bool {
    let override_flag = config
        .ext
        .tools
        .as_ref()
        .and_then(|t| t.upload.as_ref())
        .and_then(|u| u.supports_vision);
    if let Some(flag) = override_flag {
        return flag;
    }
    // Known vision-capable families, matched case-insensitively.
    const VISION_MODEL_HINTS: &[&str] = &[
        "gpt-4o",
        "gpt-4-turbo",
        "gpt-4-vision",
        "claude-3",
        "claude-sonnet",
        "claude-opus",
        "claude-haiku",
        "gemini",
        "qwen-vl",
        "qwen2-vl",
        "glm-4v",
        "yi-vision",
        "internvl",
        "llava",
        "minicpm-v",
        "deepseek-vl",
        "qwen3",
    ];
    let name = model.to_lowercase();
    VISION_MODEL_HINTS.iter().any(|hint| name.contains(hint))
}
/// Per-turn context threaded into tool dispatch.
pub struct RunContext {
/// Id of the agent executing the turn.
pub agent_id: String,
/// Conversation/session identifier (after alias resolution).
pub session_key: String,
/// Channel the message arrived on.
pub channel: String,
/// Peer (user/chat) id, used as the target for outbound notifications.
pub peer_id: String,
/// Detects repeated tool-call loops within the turn.
pub loop_detector: LoopDetector,
/// Whether the user message carried image attachments.
pub has_images: bool,
/// Pre-built user message including image parts, when images are present.
pub user_msg_with_images: Option<Message>,
}
/// Per-agent execution engine: owns session history, pending uploads,
/// provider failover, and drives a full conversational turn (`run_turn`).
pub struct AgentRuntime {
/// Shared handle for this agent (config, concurrency semaphore, live status).
pub handle: Arc<AgentHandle>,
/// Global merged runtime configuration.
pub config: Arc<RuntimeConfig>,
/// Registry of LLM providers.
pub providers: Arc<ProviderRegistry>,
/// Auth-rotation / fallback-model manager used for every LLM call.
failover: FailoverManager,
pub skills: Arc<SkillRegistry>,
pub store: Arc<Store>,
pub memory: Option<Arc<Mutex<MemoryStore>>>,
pub agents: Option<Arc<AgentRegistry>>,
pub event_bus: Option<broadcast::Sender<AgentEvent>>,
pub spawner: Option<Arc<crate::agent::AgentSpawner>>,
pub plugins: Option<Arc<PluginRegistry>>,
pub mcp: Option<Arc<crate::mcp::McpRegistry>>,
/// Lazily-initialized shared browser session.
browser: Arc<tokio::sync::OnceCell<Mutex<crate::browser::BrowserSession>>>,
/// In-memory conversation history, keyed by session key.
sessions: std::collections::HashMap<String, Vec<Message>>,
/// Last-compaction instant and counter per session.
compaction_state: std::collections::HashMap<String, (std::time::Instant, u32)>,
/// Uploads awaiting the user's 1/2/3 confirmation, keyed by session key.
pending_files: std::collections::HashMap<String, Vec<PendingFile>>,
/// Live progress snapshot, shared with `handle.live_status`.
pub live_status: Arc<RwLock<LiveStatus>>,
/// Runtime override of the upload size limit (set via /commands).
runtime_max_file_size: Option<usize>,
/// Runtime override of the extracted-text character limit.
runtime_max_text_chars: Option<usize>,
/// Process-lifetime start, used for /uptime and /status.
started_at: std::time::Instant,
workspace_cache: Option<crate::agent::workspace::WorkspaceCache>,
/// Manager for /btw and /ctx side-context entries.
btw_manager: super::btw::BtwManager,
/// Outbound push channel for progress/result notifications.
notification_tx: Option<tokio::sync::broadcast::Sender<crate::channel::OutboundMessage>>,
/// Lazily-spawned OpenCode ACP client (see `get_opencode_client`).
opencode_client: Arc<tokio::sync::OnceCell<crate::acp::client::AcpClient>>,
/// Persisted session-key aliases loaded from the store at startup.
session_aliases: std::collections::HashMap<String, String>,
}
impl AgentRuntime {
/// Build an `AgentRuntime` wired to the shared registries and stores.
///
/// Failover is seeded from the configured auth order plus the supplied
/// `fallback_models`; session aliases and the /btw context manager are
/// restored from the persistent store.
///
/// Fix: `#[allow(clippy::too_many_arguments)]` was attached to the first
/// *parameter*, where it does not suppress the function-level lint; it now
/// sits on the function itself.
#[allow(clippy::too_many_arguments)] // constructor mirrors the DI surface
pub fn new(
    handle: Arc<AgentHandle>,
    config: Arc<RuntimeConfig>,
    providers: Arc<ProviderRegistry>,
    fallback_models: Vec<String>,
    skills: Arc<SkillRegistry>,
    store: Arc<Store>,
    memory: Option<Arc<Mutex<MemoryStore>>>,
    agents: Option<Arc<AgentRegistry>>,
    event_bus: Option<broadcast::Sender<AgentEvent>>,
    spawner: Option<Arc<crate::agent::AgentSpawner>>,
    plugins: Option<Arc<PluginRegistry>>,
    mcp: Option<Arc<crate::mcp::McpRegistry>>,
    notification_tx: Option<tokio::sync::broadcast::Sender<crate::channel::OutboundMessage>>,
) -> Self {
    // Provider auth rotation order from config (empty = provider default).
    let auth_order = config
        .model
        .auth
        .as_ref()
        .and_then(|a| a.order.clone())
        .unwrap_or_default();
    let failover = FailoverManager::new(
        auth_order,
        std::collections::HashMap::new(),
        fallback_models,
    );
    // Restore persisted session aliases; start empty if the load fails.
    let session_aliases = store.db.load_all_aliases().unwrap_or_default();
    let btw_manager = super::btw::BtwManager::new(Some(Arc::clone(&store.db)));
    // Live status is shared with the handle so external observers see it.
    let live_status = Arc::clone(&handle.live_status);
    Self {
        handle,
        config,
        providers,
        failover,
        skills,
        store,
        memory,
        agents,
        event_bus,
        spawner,
        plugins,
        mcp,
        live_status,
        browser: Arc::new(tokio::sync::OnceCell::new()),
        sessions: std::collections::HashMap::new(),
        compaction_state: std::collections::HashMap::new(),
        pending_files: std::collections::HashMap::new(),
        runtime_max_file_size: None,
        runtime_max_text_chars: None,
        started_at: std::time::Instant::now(),
        workspace_cache: None,
        btw_manager,
        notification_tx,
        opencode_client: Arc::new(tokio::sync::OnceCell::new()),
        session_aliases,
    }
}
/// Get (or lazily create) the shared OpenCode ACP client.
///
/// The first call spawns the `opencode acp` subprocess, runs protocol
/// initialization, and creates a session rooted at the agent workspace;
/// later calls return a clone of the cached client.
///
/// NOTE(review): two concurrent first callers can each spawn a subprocess;
/// the one losing the `set()` race below is silently dropped — confirm that
/// is acceptable, or consider `OnceCell::get_or_try_init`.
async fn get_opencode_client(&self) -> Result<crate::acp::client::AcpClient> {
// Fast path: client already initialized.
if let Some(client) = self.opencode_client.get() {
return Ok(client.clone());
}
// Resolve the binary: PATH lookup, then $OPENCODE_PATH, then bare "opencode".
let command = which::which("opencode")
.map(|p| p.to_string_lossy().to_string())
.or_else(|_| std::env::var("OPENCODE_PATH"))
.unwrap_or_else(|_| "opencode".to_string());
let args: Vec<&str> = vec!["acp"];
tracing::info!(command = %command, args = ?args, "OpenCode: starting subprocess");
// Session cwd: agent workspace -> global default -> <base_dir>/workspace.
let cwd = self
.handle
.config
.workspace
.as_deref()
.or(self.config.agents.defaults.workspace.as_deref())
.map(expand_tilde)
.unwrap_or_else(|| crate::config::loader::base_dir().join("workspace"))
.to_string_lossy()
.to_string();
tracing::info!(cwd = %cwd, "OpenCode: using workspace directory");
let client = crate::acp::client::AcpClient::spawn(&command, &args).await?;
client
.initialize("rsclaw", env!("RSCLAW_BUILD_VERSION"))
.await?;
// Optional model override via $OPENCODE_MODEL.
let model = std::env::var("OPENCODE_MODEL").ok();
let session_resp = client.create_session(&cwd, model.as_deref(), None).await?;
tracing::info!(
session_id = %session_resp.session_id,
current_model = ?session_resp.models.as_ref().and_then(|m| m.available_models.first()).map(|m| &m.model_id),
"OpenCode session created"
);
// Cache for future calls; ignore the error if another caller won the race.
self.opencode_client.set(client.clone()).ok();
Ok(client)
}
/// Tool: delegate a coding `task` to the external OpenCode (`acp`) session.
///
/// Fire-and-forget: the prompt is sent from a spawned background task and
/// this function returns a "submitted" status immediately. Progress events
/// and the final result are pushed to the requesting peer over
/// `notification_tx` (when a notification channel is configured).
///
/// Fix: result/output previews were truncated with raw byte slices
/// (`&r[..100]`, `&output[..4000]`), which panic when the cut lands inside
/// a multi-byte UTF-8 character — likely here, since the output regularly
/// contains Chinese text and emoji. Truncation is now boundary-safe.
///
/// # Errors
/// Fails when `args.task` is missing or not a string, or when the OpenCode
/// subprocess cannot be spawned/initialized.
async fn tool_opencode(&self, ctx: &RunContext, args: Value) -> Result<Value> {
    // Truncate to at most `max` bytes without splitting a UTF-8 codepoint.
    fn truncate_utf8(s: &str, max: usize) -> &str {
        let mut end = max.min(s.len());
        while end > 0 && !s.is_char_boundary(end) {
            end -= 1;
        }
        &s[..end]
    }
    let task = args["task"]
        .as_str()
        .ok_or_else(|| anyhow!("opencode tool requires 'task' argument"))?;
    tracing::info!(task = %task, "tool_opencode: starting");
    let client = self.get_opencode_client().await?;
    let session_id = client.session_id().await.unwrap_or_default();
    let session_id_clone = session_id.clone();
    let notif_tx = self.notification_tx.clone();
    let target_id = ctx.peer_id.clone();
    let task_str = task.to_string();
    // Immediate ack so the user knows the task was accepted.
    if let Some(ref tx) = notif_tx {
        let _ = tx.send(crate::channel::OutboundMessage {
            target_id: target_id.clone(),
            is_group: false,
            text: "🚀 OpenCode 任务已提交,执行中...".to_string(),
            reply_to: None,
            images: vec![],
        });
    }
    let notif_tx_bg = notif_tx.clone();
    let target_id_bg = target_id.clone();
    tokio::spawn(async move {
        let mut event_rx = client.subscribe_events();
        // Shared log of formatted event lines; doubles as the final output.
        let events = Arc::new(tokio::sync::Mutex::new(Vec::<String>::new()));
        let events_clone = Arc::clone(&events);
        let notif_tx_clone = notif_tx_bg.clone();
        let target_id_clone = target_id_bg.clone();
        // Collector task: batch tool-call events and push a progress update
        // every 3 events or once ~400 chars of text are pending.
        let _event_collector = tokio::spawn(async move {
            let mut pending = String::new();
            let mut interval = 0u64;
            loop {
                match event_rx.recv().await {
                    Ok(event) => {
                        let event_str = match &event {
                            crate::acp::client::SessionEvent::ToolCallStarted {
                                title, ..
                            } => {
                                format!("🔧 {}", title.as_deref().unwrap_or("tool"))
                            }
                            crate::acp::client::SessionEvent::ToolCallCompleted {
                                result,
                                ..
                            } => result
                                .as_ref()
                                .map(|r| {
                                    if r.len() > 100 {
                                        // boundary-safe (was `&r[..100]`)
                                        format!("✅ {}...", truncate_utf8(r, 100))
                                    } else {
                                        format!("✅ {}", r)
                                    }
                                })
                                .unwrap_or_default(),
                            crate::acp::client::SessionEvent::ToolCallFailed {
                                error, ..
                            } => {
                                format!("❌ {}", error)
                            }
                            _ => String::new(),
                        };
                        if !event_str.is_empty() {
                            events_clone.lock().await.push(event_str.clone());
                            if let Some(ref tx) = notif_tx_clone {
                                pending.push_str(&event_str);
                                pending.push('\n');
                                interval += 1;
                                if interval >= 3 || pending.len() > 400 {
                                    let _ = tx.send(crate::channel::OutboundMessage {
                                        target_id: target_id_clone.clone(),
                                        is_group: false,
                                        text: format!("🔄 OpenCode\n{}", pending.trim()),
                                        reply_to: None,
                                        images: vec![],
                                    });
                                    pending.clear();
                                    interval = 0;
                                }
                            }
                        }
                    }
                    // Channel closed: stop collecting.
                    Err(_) => break,
                }
            }
        });
        tracing::info!("tool_opencode: sending prompt");
        let send_result = client.send_prompt(&task_str).await;
        match send_result {
            Ok(resp) => {
                tracing::info!(
                    "tool_opencode: send_prompt completed, stop_reason={:?}",
                    resp.stop_reason
                );
                let events_text = events.lock().await.join("\n");
                let collected = client.get_collected_content().await;
                tracing::info!(
                    "tool_opencode: events_text len={}, collected len={}",
                    events_text.len(),
                    collected.len()
                );
                // Output preference: event log, then streamed content, then
                // any text blocks from the final result.
                let output = if !events_text.is_empty() {
                    events_text
                } else if !collected.is_empty() {
                    collected
                } else if let Some(result) = resp.result {
                    result
                        .content
                        .iter()
                        .filter_map(|b| match b {
                            crate::acp::types::ContentBlock::Text { text } => {
                                Some(text.clone())
                            }
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join("\n")
                } else {
                    "(无输出)".to_string()
                };
                let final_output = if output.len() > 4000 {
                    // boundary-safe (was `&output[..4000]`)
                    format!("{}...\n\n[已截断]", truncate_utf8(&output, 4000))
                } else {
                    output
                };
                if let Some(ref tx) = notif_tx_bg {
                    let _ = tx.send(crate::channel::OutboundMessage {
                        target_id: target_id_bg.clone(),
                        is_group: false,
                        text: format!("✅ OpenCode 完成\n\n{}", final_output),
                        reply_to: None,
                        images: vec![],
                    });
                }
            }
            Err(e) => {
                tracing::error!("tool_opencode: send_prompt failed: {}", e);
                if let Some(ref tx) = notif_tx_bg {
                    let _ = tx.send(crate::channel::OutboundMessage {
                        target_id: target_id_bg.clone(),
                        is_group: false,
                        text: format!("❌ OpenCode 错误\n\n{}", e),
                        reply_to: None,
                        images: vec![],
                    });
                }
            }
        }
    });
    Ok(serde_json::json!({
        "output": "OpenCode 任务已提交,完成后将推送结果。",
        "status": "submitted",
        "session_id": session_id_clone
    }))
}
/// Resolve the primary model name: the agent's own config wins, then the
/// global agent defaults, then a hard-coded fallback.
fn resolve_model_name(&self) -> String {
    let per_agent = self
        .handle
        .config
        .model
        .as_ref()
        .and_then(|m| m.primary.as_deref());
    let global_default = self
        .config
        .agents
        .defaults
        .model
        .as_ref()
        .and_then(|m| m.primary.as_deref());
    per_agent
        .or(global_default)
        .unwrap_or("anthropic/claude-sonnet-4-6")
        .to_owned()
}
async fn fire_hook(&self, hook: &str, params: Value) {
let Some(ref reg) = self.plugins else { return };
for plugin in reg.all() {
if !plugin.manifest.hooks.iter().any(|h| h == hook) {
continue;
}
if let Err(e) = plugin.call(hook, params.clone()).await {
warn!(plugin = %plugin.manifest.name, hook, "hook error: {e:#}");
}
}
}
async fn handle_side_query(&mut self, session_key: &str, question: &str) -> Result<AgentReply> {
let history: Vec<Message> = self.sessions.get(session_key).cloned().unwrap_or_default();
let mut messages: Vec<Message> = history.into_iter().rev().take(10).rev().collect();
messages.push(Message {
role: Role::User,
content: MessageContent::Text(question.to_owned()),
});
let model = self.resolve_model_name();
let req = LlmRequest {
model,
messages,
tools: vec![], system: Some(
"You are answering a quick side question (/btw). Be concise and direct. \
You have no tools available. Answer from the conversation context and \
your general knowledge only."
.to_owned(),
),
max_tokens: Some(500),
temperature: None,
thinking_budget: None,
};
let providers = Arc::clone(&self.providers);
let mut stream = self.failover.call(req, &providers).await?;
let mut text_buf = String::new();
while let Some(event) = stream.next().await {
match event {
Ok(StreamEvent::TextDelta(d)) => text_buf.push_str(&d),
Ok(StreamEvent::Done { .. }) | Ok(StreamEvent::Error(_)) => break,
Ok(_) => {}
Err(e) => {
warn!("/btw stream error: {e:#}");
break;
}
}
}
Ok(AgentReply {
text: if text_buf.is_empty() {
"[/btw] (no response)".to_owned()
} else {
format!("[/btw] {}", text_buf)
},
is_empty: text_buf.is_empty(),
tool_calls: None,
images: vec![],
pending_analysis: None,
})
}
pub async fn run_turn(
&mut self,
session_key: &str,
text: &str,
channel: &str,
peer_id: &str,
extra_tools: Vec<ToolDef>,
images: Vec<super::registry::ImageAttachment>,
files: Vec<super::registry::FileAttachment>,
) -> Result<AgentReply> {
let session_key = self.resolve_session_key(session_key).to_owned();
let session_key = session_key.as_str();
let sem = Arc::clone(&self.handle.concurrency);
let _permit = sem
.acquire()
.await
.map_err(|_| anyhow!("agent concurrency semaphore closed"))?;
if let Ok(mut status) = self.live_status.try_write() {
status.state = "thinking".to_owned();
let preview = text
.char_indices()
.nth(100)
.map(|(i, _)| &text[..i])
.unwrap_or(text);
status.current_task = preview.to_owned();
status.started_at = Some(std::time::Instant::now());
status.session_key = session_key.to_owned();
status.tool_history.clear();
status.text_preview.clear();
}
let _agent_cfg = &self.handle.config;
let i18n_lang = self
.config
.raw
.gateway
.as_ref()
.and_then(|g| g.language.as_deref())
.map(crate::i18n::resolve_lang)
.unwrap_or("en");
let pending_response = text.trim();
if (pending_response == "1" || pending_response == "2" || pending_response == "3")
&& let Some(files) = self.pending_files.remove(session_key)
&& !files.is_empty()
{
let workspace = self
.handle
.config
.workspace
.as_deref()
.or(self.config.agents.defaults.workspace.as_deref())
.map(expand_tilde)
.unwrap_or_else(|| crate::config::loader::base_dir().join("workspace"));
let uploads = workspace.join("uploads");
match pending_response {
"1" => {
let upload_cfg = self
.config
.ext
.tools
.as_ref()
.and_then(|t| t.upload.as_ref());
let max_chars = upload_cfg
.and_then(|u| u.max_text_chars)
.unwrap_or(DEFAULT_MAX_TEXT_CHARS);
let mut analysis_text = String::new();
let mut binary_kept = Vec::new();
for pf in &files {
if let PendingStage::TokenConfirm {
ref extracted_text, ..
} = pf.stage
{
let mut end = max_chars.min(extracted_text.len());
while end < extracted_text.len()
&& !extracted_text.is_char_boundary(end)
{
end += 1;
}
let truncated = &extracted_text[..end];
analysis_text
.push_str(&format!("[File: {}]\n{}\n", pf.filename, truncated));
} else {
binary_kept.push(pf.filename.clone());
}
let _ = std::fs::remove_file(&pf.path);
}
if analysis_text.is_empty() {
let msg = binary_kept
.iter()
.map(|f| format!("- {f} (kept in uploads/)"))
.collect::<Vec<_>>()
.join("\n");
return Ok(AgentReply {
text: msg,
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
return Ok(AgentReply {
text: crate::i18n::t("analyzing", i18n_lang),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: Some(crate::agent::PendingAnalysis {
text: analysis_text,
session_key: session_key.to_owned(),
channel: channel.to_owned(),
peer_id: peer_id.to_owned(),
}),
});
}
"2" => {
let upload_cfg = self
.config
.ext
.tools
.as_ref()
.and_then(|t| t.upload.as_ref());
let max_chars = upload_cfg
.and_then(|u| u.max_text_chars)
.unwrap_or(DEFAULT_MAX_TEXT_CHARS);
let mut analysis_text = String::new();
let mut binary_deleted = Vec::new();
for pf in &files {
if let PendingStage::TokenConfirm {
ref extracted_text, ..
} = pf.stage
{
let mut end = max_chars.min(extracted_text.len());
while end < extracted_text.len()
&& !extracted_text.is_char_boundary(end)
{
end += 1;
}
let truncated = &extracted_text[..end];
analysis_text
.push_str(&format!("[File: {}]\n{}\n", pf.filename, truncated));
} else {
binary_deleted.push(pf.filename.clone());
}
let _ = std::fs::remove_file(&pf.path);
let _ = std::fs::remove_file(uploads.join(&pf.filename));
}
if analysis_text.is_empty() {
let msg = if binary_deleted.is_empty() {
crate::i18n::t("no_extractable_deleted", i18n_lang)
} else {
format!(
"{}\n{}",
binary_deleted
.iter()
.map(|f| format!("- {f}"))
.collect::<Vec<_>>()
.join("\n"),
crate::i18n::t("no_extractable_deleted", i18n_lang)
)
};
return Ok(AgentReply {
text: msg,
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
if !binary_deleted.is_empty() {
analysis_text.push_str(&format!(
"\n[Binary files deleted (no extractable text): {}]\n",
binary_deleted.join(", ")
));
}
return Ok(AgentReply {
text: crate::i18n::t("analyzing", i18n_lang),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: Some(crate::agent::PendingAnalysis {
text: analysis_text,
session_key: session_key.to_owned(),
channel: channel.to_owned(),
peer_id: peer_id.to_owned(),
}),
});
}
_ => {
for pf in &files {
let _ = std::fs::remove_file(&pf.path);
let _ = std::fs::remove_file(uploads.join(&pf.filename));
}
return Ok(AgentReply {
text: crate::i18n::t("files_deleted", i18n_lang),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
}
}
let safety_on = self
.config
.ext
.tools
.as_ref()
.and_then(|t| t.exec.as_ref())
.and_then(|e| e.safety)
.unwrap_or(false);
let preparse = crate::agent::preparse::PreParseEngine::load_with_safety(safety_on);
let is_default = self.handle.config.default.unwrap_or(false) || self.handle.id == "main";
let allowed = self
.handle
.config
.allowed_commands
.as_deref()
.unwrap_or(if is_default { "*" } else { "" });
let cmd_permitted = |input: &str| -> bool {
if allowed == "*" {
return true;
}
let cmd = input.trim().split_whitespace().next().unwrap_or("");
if READONLY_COMMANDS.iter().any(|c| *c == cmd) {
return true;
}
if allowed.is_empty() {
return false;
}
allowed.split('|').any(|a| a.trim() == cmd)
};
match preparse.try_parse(text) {
crate::agent::preparse::PreParseResult::PassThrough => {
}
crate::agent::preparse::PreParseResult::DirectResponse(response)
if cmd_permitted(text) =>
{
let reply_text = match response.as_str() {
"__HELP__" => build_help_text_filtered(allowed),
"__VERSION__" => format!("rsclaw v{}", env!("RSCLAW_BUILD_VERSION")),
"__STATUS__" => {
let model = self.resolve_model_name();
let sessions = self.sessions.len();
let uptime = format_duration(self.started_at.elapsed());
format!(
"Gateway: running\nModel: {model}\nSessions: {sessions}\nUptime: {uptime}\nVersion: rsclaw v{}",
env!("RSCLAW_BUILD_VERSION")
)
}
"__HEALTH__" => {
let model = self.resolve_model_name();
let (prov_name, _) =
crate::provider::registry::ProviderRegistry::parse_model(&model);
let provider_ok = self.providers.get(prov_name).is_ok();
format!(
"Health check:\n Provider ({}): {}\n Store: ok\n Agent: {}\n Version: rsclaw v{}",
model,
if provider_ok { "ok" } else { "unavailable" },
self.handle.id,
env!("RSCLAW_BUILD_VERSION"),
)
}
"__UPTIME__" => format_duration(self.started_at.elapsed()),
"__MODELS__" => {
let current = self.resolve_model_name();
let mut lines = vec![format!("Current model: {current}")];
lines.push(String::new());
lines.push("Registered providers:".to_owned());
for name in self.providers.names() {
lines.push(format!(" {name}"));
}
lines.join("\n")
}
s if s.starts_with("__MODEL_SET__:") => {
let model = s.strip_prefix("__MODEL_SET__:").unwrap_or("");
format!(
"Model switched to: {model} (runtime only, use configure to persist)"
)
}
"__CLEAR__" => {
self.sessions.remove(session_key);
"Session cleared.".to_owned()
}
"__RESET__" => {
let key = self.resolve_session_key(session_key).to_owned();
self.sessions.remove(&key);
let _ = self.store.db.delete_session(&key);
"Session reset.".to_owned()
}
s if s.starts_with("__HISTORY__:") => {
let n: usize = s
.strip_prefix("__HISTORY__:")
.unwrap_or("20")
.parse()
.unwrap_or(20);
if let Some(msgs) = self.sessions.get(session_key) {
let start = msgs.len().saturating_sub(n);
let mut lines = Vec::new();
for (i, msg) in msgs[start..].iter().enumerate() {
let role = match msg.role {
crate::provider::Role::User => "You",
crate::provider::Role::Assistant => "AI",
crate::provider::Role::System => "Sys",
crate::provider::Role::Tool => "Tool",
};
let text = match &msg.content {
crate::provider::MessageContent::Text(s) => s.clone(),
crate::provider::MessageContent::Parts(parts) => parts
.iter()
.filter_map(|p| {
if let crate::provider::ContentPart::Text { text } = p {
Some(text.as_str())
} else {
None
}
})
.collect::<Vec<_>>()
.join(" "),
};
let preview: String = if text.chars().count() > 100 {
text.chars().take(100).collect::<String>() + "..."
} else {
text.clone()
};
lines.push(format!("{}. [{}] {}", start + i + 1, role, preview));
}
if lines.is_empty() {
"No messages in this session.".to_owned()
} else {
lines.join("\n")
}
} else {
"No messages in this session.".to_owned()
}
}
"__SESSIONS__" => {
if self.sessions.is_empty() {
"No active sessions.".to_owned()
} else {
let mut lines =
vec![format!("Active sessions: {}", self.sessions.len())];
for (key, msgs) in &self.sessions {
let short_key = if key.len() > 30 {
let end = key.char_indices().nth(30).map(|(i, _)| i).unwrap_or(key.len());
&key[..end]
} else { key };
lines.push(format!(" {} ({} messages)", short_key, msgs.len()));
}
lines.join("\n")
}
}
"__CRON_LIST__" => {
if let Some(ref cron_cfg) = self.config.ops.cron {
if let Some(ref jobs) = cron_cfg.jobs {
if jobs.is_empty() {
"No cron jobs configured.".to_owned()
} else {
let mut lines = vec!["Cron jobs:".to_owned()];
for job in jobs {
let enabled = job.enabled.unwrap_or(true);
let status = if enabled { "" } else { " (disabled)" };
let agent = job.agent_id.as_deref().unwrap_or("main");
let msg_preview = if job.message.len() > 50 {
let end = job.message.char_indices().nth(47).map(|(i, _)| i).unwrap_or(job.message.len());
format!("{}...", &job.message[..end])
} else {
job.message.clone()
};
lines.push(format!(
" [{}] {} -> {} \"{}\"{}",
job.id, job.schedule, agent, msg_preview, status
));
}
lines.join("\n")
}
} else {
"No cron jobs configured.".to_owned()
}
} else {
"No cron jobs configured.".to_owned()
}
}
"__GET_UPLOAD_SIZE__" => {
let max = self
.runtime_max_file_size
.or_else(|| {
self.config
.ext
.tools
.as_ref()
.and_then(|t| t.upload.as_ref())
.and_then(|u| u.max_file_size)
})
.unwrap_or(DEFAULT_MAX_FILE_SIZE);
format!("Upload size limit: {} MB", max / 1_000_000)
}
s if s.starts_with("__SET_UPLOAD_SIZE__:") => {
let mb = s
.strip_prefix("__SET_UPLOAD_SIZE__:")
.unwrap_or("50")
.parse::<usize>()
.unwrap_or(50);
self.runtime_max_file_size = Some(mb * 1_000_000);
format!("Upload size limit set to {mb} MB (effective immediately)")
}
"__GET_UPLOAD_CHARS__" => {
let max_chars = self
.runtime_max_text_chars
.or_else(|| {
self.config
.ext
.tools
.as_ref()
.and_then(|t| t.upload.as_ref())
.and_then(|u| u.max_text_chars)
})
.unwrap_or(DEFAULT_MAX_TEXT_CHARS);
let est_tokens = max_chars / 4;
format!("Max text per message: {max_chars} chars (~{est_tokens} tokens)")
}
s if s.starts_with("__SET_UPLOAD_CHARS__:") => {
let chars = s
.strip_prefix("__SET_UPLOAD_CHARS__:")
.unwrap_or("50000")
.parse::<usize>()
.unwrap_or(50000);
let est_tokens = chars / 4;
self.runtime_max_text_chars = Some(chars);
format!(
"Upload text limit set to {chars} chars (~{est_tokens} tokens, effective immediately)"
)
}
s if s.starts_with("__CONFIG_UPLOAD_SIZE__:") => {
let mb = s
.strip_prefix("__CONFIG_UPLOAD_SIZE__:")
.unwrap_or("50")
.parse::<usize>()
.unwrap_or(50);
let bytes = mb * 1_000_000;
self.runtime_max_file_size = Some(bytes);
match write_config_value(
"tools.upload.maxFileSize",
serde_json::json!(bytes),
) {
Ok(()) => format!("Upload size limit set to {mb} MB (saved to config)"),
Err(e) => format!(
"Upload size limit set to {mb} MB (runtime only, config write failed: {e})"
),
}
}
s if s.starts_with("__CONFIG_UPLOAD_CHARS__:") => {
let chars = s
.strip_prefix("__CONFIG_UPLOAD_CHARS__:")
.unwrap_or("50000")
.parse::<usize>()
.unwrap_or(50_000);
let est_tokens = chars / 4;
self.runtime_max_text_chars = Some(chars);
match write_config_value(
"tools.upload.maxTextChars",
serde_json::json!(chars),
) {
Ok(()) => format!(
"Upload text limit set to {chars} chars (~{est_tokens} tokens, saved to config)"
),
Err(e) => format!(
"Upload text limit set to {chars} chars (runtime only, config write failed: {e})"
),
}
}
s if s.starts_with("__CTX_ADD__:") => {
let content = s.strip_prefix("__CTX_ADD__:").unwrap_or("");
let id = self
.btw_manager
.add(
content,
super::btw::BtwScope::Session(session_key.to_owned()),
None,
)
.await;
let lang = crate::i18n::default_lang();
crate::i18n::t_fmt("btw_added", lang, &[("id", &id.to_string())])
}
s if s.starts_with("__CTX_TTL__:") => {
let rest = s.strip_prefix("__CTX_TTL__:").unwrap_or("");
let (turns_str, content) = rest.split_once(':').unwrap_or(("0", rest));
let turns: u32 = turns_str.parse().unwrap_or(0);
let id = self
.btw_manager
.add(
content,
super::btw::BtwScope::Session(session_key.to_owned()),
Some(turns),
)
.await;
let lang = crate::i18n::default_lang();
crate::i18n::t_fmt(
"btw_added_ttl",
lang,
&[("id", &id.to_string()), ("turns", &turns.to_string())],
)
}
s if s.starts_with("__CTX_GLOBAL__:") => {
if !is_default {
format!("Command not available on agent `{}`.", self.handle.id)
} else {
let content = s.strip_prefix("__CTX_GLOBAL__:").unwrap_or("");
let id = self.btw_manager.add(
content,
super::btw::BtwScope::Global,
None,
).await;
let lang = crate::i18n::default_lang();
crate::i18n::t_fmt("btw_added_global", lang, &[("id", &id.to_string())])
}
}
"__CTX_LIST__" => {
let entries = self.btw_manager.list(session_key, channel).await;
if entries.is_empty() {
let lang = crate::i18n::default_lang();
crate::i18n::t("btw_list_empty", lang)
} else {
let mut lines = Vec::new();
for e in &entries {
let ttl_info = if let Some(remaining) = e.remaining_turns {
format!("{remaining} turns left")
} else {
"permanent".to_owned()
};
let scope_info = match &e.scope {
super::btw::BtwScope::Session(_) => "",
super::btw::BtwScope::Channel(_) => " [channel]",
super::btw::BtwScope::Global => " [global]",
};
lines.push(format!(
"[{}] ({}{}) {}",
e.id, ttl_info, scope_info, e.content
));
}
lines.join("\n")
}
}
"__CTX_CLEAR__" => {
self.btw_manager.clear(Some(session_key)).await;
let lang = crate::i18n::default_lang();
crate::i18n::t("btw_cleared", lang)
}
s if s.starts_with("__CTX_REMOVE__:") => {
let id_str = s.strip_prefix("__CTX_REMOVE__:").unwrap_or("0");
let id: u32 = id_str.parse().unwrap_or(0);
let lang = crate::i18n::default_lang();
if self.btw_manager.remove(id).await {
crate::i18n::t_fmt("btw_removed", lang, &[("id", &id.to_string())])
} else {
crate::i18n::t_fmt("btw_not_found", lang, &[("id", &id.to_string())])
}
}
"__CTX_USAGE__" => {
"Usage:\n /ctx <text> Add context (session)\n /ctx --ttl <N> <text> Add context (expires in N turns)\n /ctx --global <text> Add global context\n /ctx --list List entries\n /ctx --remove <id> Remove entry\n /ctx --clear Clear all".to_owned()
}
s if s.starts_with("__SIDE_QUERY__:") => {
let question = s.strip_prefix("__SIDE_QUERY__:").unwrap_or("");
return self.handle_side_query(session_key, question).await;
}
s if s.starts_with("__") => {
text.to_owned() }
"" => {
return Ok(AgentReply {
text: String::new(),
is_empty: true,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
other => other.to_owned(),
};
if !reply_text.starts_with("__") {
return Ok(AgentReply {
text: reply_text,
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
}
crate::agent::preparse::PreParseResult::ToolCall { tool, args }
if cmd_permitted(text) =>
{
let is_group = session_key.contains(":group:");
if is_group && matches!(tool.as_str(), "exec" | "read" | "write") {
return Ok(AgentReply {
text: "[Blocked] Shell/file commands are not allowed in group chats for security.".to_owned(),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
info!(tool = %tool, "pre-parse: executing tool directly");
let result = self
.dispatch_tool(
&RunContext {
agent_id: self.handle.id.clone(),
session_key: session_key.to_owned(),
channel: channel.to_owned(),
peer_id: peer_id.to_owned(),
loop_detector: crate::agent::loop_detection::LoopDetector::default(),
has_images: false,
user_msg_with_images: None,
},
"",
&tool,
args.clone(),
)
.await;
match result {
Ok(val) => {
let (reply_text, reply_images) =
if let Some(img) = val.get("image").and_then(|v| v.as_str()) {
("".to_owned(), vec![img.to_owned()])
} else if val.is_string() {
(val.as_str().unwrap_or("").to_owned(), vec![])
} else {
(format_tool_result(&val), vec![])
};
return Ok(AgentReply {
text: reply_text.clone(),
is_empty: reply_text.is_empty() && reply_images.is_empty(),
tool_calls: None,
images: reply_images,
pending_analysis: None,
});
}
Err(e) => {
return Ok(AgentReply {
text: format!("error: {e}"),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
}
}
crate::agent::preparse::PreParseResult::Blocked(reason) => {
let safety_on = self
.config
.ext
.tools
.as_ref()
.and_then(|t| t.exec.as_ref())
.and_then(|e| e.safety)
.unwrap_or(false);
if safety_on {
warn!(reason = %reason, "pre-parse: command blocked");
return Ok(AgentReply {
text: format!("[blocked] {reason}"),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
}
crate::agent::preparse::PreParseResult::NeedsConfirm { command, reason } => {
let safety_on = self
.config
.ext
.tools
.as_ref()
.and_then(|t| t.exec.as_ref())
.and_then(|e| e.safety)
.unwrap_or(false);
if safety_on {
return Ok(AgentReply {
text: format!(
"[confirm required] {reason}\nCommand: {command}\nReply 'yes' or 'y' to confirm."
),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
}
crate::agent::preparse::PreParseResult::DirectResponse(_)
| crate::agent::preparse::PreParseResult::ToolCall { .. } => {
return Ok(AgentReply {
text: format!("Command not available on agent `{}`.", self.handle.id),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
}
let agent_cfg = &self.handle.config;
if text.starts_with("__DIRECT_REPLY__") {
let reply = text.strip_prefix("__DIRECT_REPLY__").unwrap_or(text);
return Ok(AgentReply {
text: reply.to_owned(),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
let (media_files, regular_files): (Vec<_>, Vec<_>) = files.into_iter().partition(|f| {
crate::channel::is_video_attachment(&f.mime_type, &f.filename)
|| crate::channel::is_audio_attachment(&f.mime_type, &f.filename)
});
let files = regular_files;
if !media_files.is_empty() {
let mut transcriptions = Vec::new();
for mf in &media_files {
if let Some(t) = extract_audio_text(&mf.data, &mf.filename.to_lowercase()).await {
info!(chars = t.len(), file = %mf.filename, "media transcribed from file attachment");
transcriptions.push(format!("[{}]\n{}", mf.filename, t));
} else {
transcriptions.push(format!("[{} (transcription failed)]", mf.filename));
}
}
if !transcriptions.is_empty() && files.is_empty() {
let combined = transcriptions.join("\n\n");
let full_text = if text.is_empty() {
combined
} else {
format!("{text}\n\n{combined}")
};
return Box::pin(self.run_turn(
session_key,
&full_text,
channel,
peer_id,
extra_tools,
images,
vec![],
))
.await;
} else if !transcriptions.is_empty() {
let combined = transcriptions.join("\n\n");
let full_text = if text.is_empty() {
combined
} else {
format!("{text}\n\n{combined}")
};
return Box::pin(self.run_turn(
session_key,
&full_text,
channel,
peer_id,
extra_tools,
images,
files,
))
.await;
}
}
if !files.is_empty() {
let ws = agent_cfg
.workspace
.as_deref()
.or(self.config.agents.defaults.workspace.as_deref())
.map(expand_tilde)
.unwrap_or_else(|| crate::config::loader::base_dir().join("workspace"));
let uploads = ws.join("uploads");
let _ = std::fs::create_dir_all(&uploads);
let upload_cfg = self
.config
.ext
.tools
.as_ref()
.and_then(|t| t.upload.as_ref());
let max_file_size = self
.runtime_max_file_size
.or_else(|| upload_cfg.and_then(|u| u.max_file_size))
.unwrap_or(DEFAULT_MAX_FILE_SIZE);
let mut rejected = Vec::new();
let mut accepted = Vec::new();
for f in files {
if f.data.len() > max_file_size {
rejected.push(format!(
"- {} ({:.1} MB)",
f.filename,
f.data.len() as f64 / 1e6
));
} else {
accepted.push(f);
}
}
if !rejected.is_empty() && accepted.is_empty() {
let limit_str = format!("{:.0}", max_file_size as f64 / 1e6);
let msg =
crate::i18n::t_fmt("file_size_exceeded", i18n_lang, &[("limit", &limit_str)]);
let adjust = crate::i18n::t("file_size_adjust", i18n_lang);
return Ok(AgentReply {
text: format!("{msg}\n{}\n\n{adjust}", rejected.join("\n")),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
let files = accepted;
let total_size: usize = files.iter().map(|f| f.data.len()).sum();
let available = fs2::available_space(&uploads).unwrap_or(u64::MAX);
if (total_size as u64) + 100_000_000 > available {
let avail_mb = available / 1_000_000;
let need_mb = total_size / 1_000_000;
return Ok(AgentReply {
text: crate::i18n::t_fmt(
"disk_space_low",
i18n_lang,
&[
("need", &need_mb.to_string()),
("avail", &avail_mb.to_string()),
],
),
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
let mut file_info = Vec::new();
for file in files {
let dest = uploads.join(&file.filename);
let size = file.data.len();
let _ = std::fs::write(&dest, &file.data);
let extracted = extract_file_text(&file.filename, &file.data).await;
let has_text = extracted.is_some();
let est_tokens = extracted.as_ref().map(|t| estimate_tokens(t)).unwrap_or(0);
file_info.push((file.filename.clone(), size, has_text, est_tokens));
let path =
std::env::temp_dir().join(format!("rsclaw_pending_{}.bin", Uuid::new_v4()));
let _ = std::fs::write(&path, &file.data);
let stage = if let Some(ext_text) = extracted {
PendingStage::TokenConfirm {
extracted_text: ext_text,
estimated_tokens: est_tokens,
}
} else {
PendingStage::SizeConfirm
};
self.pending_files
.entry(session_key.to_owned())
.or_default()
.push(PendingFile {
filename: file.filename,
path,
size,
mime_type: file.mime_type,
images: vec![],
stage,
});
}
let file_list: String = file_info
.iter()
.map(|(name, size, has_text, tokens)| {
let size_str = if *size > 1_000_000 {
format!("{:.1} MB", *size as f64 / 1_000_000.0)
} else {
format!("{:.1} KB", *size as f64 / 1_000.0)
};
let analysis = if *has_text {
crate::i18n::t_fmt(
"file_analyzable",
i18n_lang,
&[("tokens", &tokens.to_string())],
)
} else {
crate::i18n::t("file_binary", i18n_lang)
};
format!("- {name} ({size_str}, {analysis})")
})
.collect::<Vec<_>>()
.join("\n");
let saved_msg = crate::i18n::t_fmt(
"file_saved",
i18n_lang,
&[("count", &file_info.len().to_string())],
);
let any_analyzable = file_info.iter().any(|(_, _, has_text, _)| *has_text);
let menu_msg = if any_analyzable {
crate::i18n::t("file_menu", i18n_lang)
} else {
"1. Keep\n2. Delete".to_owned()
};
let reply = format!("{saved_msg}\n{file_list}\n\n{menu_msg}");
return Ok(AgentReply {
text: reply,
is_empty: false,
tool_calls: None,
images: vec![],
pending_analysis: None,
});
}
let workspace = agent_cfg
.workspace
.as_deref()
.or(self.config.agents.defaults.workspace.as_deref())
.map(expand_tilde)
.unwrap_or_else(|| crate::config::loader::base_dir().join("workspace"));
let ws_ctx = {
let cache = self
.workspace_cache
.get_or_insert_with(|| crate::agent::workspace::WorkspaceCache::new(&workspace));
cache.load(
SessionType::Normal,
true,
DEFAULT_MAX_CHARS_PER_FILE,
DEFAULT_TOTAL_MAX_CHARS,
)
};
let mut system_prompt = build_system_prompt(&ws_ctx, &self.skills, &self.config.raw);
if let Some(ref mem) = self.memory
&& !text.trim().is_empty()
{
let scope = format!("agent:{}", self.handle.id);
let mem_cfg = &self.config.raw.memory;
let recall_top_k = mem_cfg.as_ref().and_then(|m| m.recall_top_k).unwrap_or(10);
let recall_final_k = mem_cfg.as_ref().and_then(|m| m.recall_final_k).unwrap_or(5);
let vec_hits = {
let mut guard = mem.lock().await;
guard
.search(text, Some(&scope), recall_top_k)
.await
.unwrap_or_default()
};
let bm25_hits = self
.store
.search
.search(text, Some(&scope), recall_top_k)
.unwrap_or_default();
let results = rrf_fuse(vec_hits, bm25_hits, recall_final_k);
if !results.is_empty() {
let mem_block = format!(
"<relevant-memories>\n{}\n</relevant-memories>",
results
.iter()
.map(|d| format!("- [{}] {}", d.kind, d.display_text()))
.collect::<Vec<_>>()
.join("\n")
);
if system_prompt.is_empty() {
system_prompt = mem_block;
} else {
system_prompt = format!("{system_prompt}\n\n{mem_block}");
}
}
}
let btw_block = self
.btw_manager
.to_prompt_block_relevant(session_key, channel, text)
.await;
if !btw_block.is_empty() {
system_prompt.push_str("\n\n");
system_prompt.push_str(&btw_block);
}
self.fire_hook(
"before_prompt_build",
json!({
"agent_id": self.handle.id,
"session_key": session_key,
"channel": channel,
}),
)
.await;
let model = agent_cfg
.model
.as_ref()
.and_then(|m| m.primary.as_deref())
.or_else(|| {
self.config
.agents
.defaults
.model
.as_ref()
.and_then(|m| m.primary.as_deref())
})
.unwrap_or("anthropic/claude-sonnet-4-6")
.to_owned();
if channel == "feishu" || channel == "dingtalk" || channel == "wecom" {
system_prompt.push_str(concat!(
"\n\n[Output format rules for IM chat]\n",
"- Never use Markdown headings (#, ##, ###).\n",
"- Use **bold text** or 【section title】 for sections.\n",
"- Use 1. or - for lists.\n",
"- Use > for important quotes.\n",
"- Do NOT use Markdown tables (|---|). Use \"label: value\" format instead.\n",
"- Keep responses concise for chat readability.\n",
"\n[Tool usage rules]\n",
"- For ANY question about real-time data (prices, weather, news, dates, events), you MUST call web_search first. NEVER answer from memory.\n",
"- For shell commands, file operations, use exec/read/write tools.\n",
));
}
let model_cfg = self.handle.config.model.as_ref().or(self
.config
.agents
.defaults
.model
.as_ref());
let tools_enabled = model_cfg.and_then(|m| m.tools_enabled).unwrap_or(true);
let tools = if !tools_enabled {
vec![]
} else {
let mut all = build_tool_list(
&self.skills,
self.agents.as_deref(),
&self.handle.id,
&self.config.agents.external,
);
all.extend(extra_tools.iter().cloned());
if let Some(ref mcp) = self.mcp {
all.extend(mcp.all_tool_defs().await);
}
let is_default = self.handle.config.default.unwrap_or(false);
let default_toolset = if is_default { "full" } else { "standard" };
let toolset = model_cfg
.and_then(|m| m.toolset.as_deref())
.unwrap_or(default_toolset);
let custom_tools = model_cfg.and_then(|m| m.tools.as_ref());
let allowed = toolset_allowed_names(toolset, custom_tools);
if let Some(ref names) = allowed {
all.retain(|t| names.contains(&t.name.as_str().to_owned()));
}
let is_group = session_key.contains(":group:");
if is_group {
const GROUP_BLOCKED_TOOLS: &[&str] = &["exec", "read", "write", "computer_use"];
all.retain(|t| !GROUP_BLOCKED_TOOLS.contains(&t.name.as_str()));
}
const CHANNEL_ACTION_TOOLS: &[&str] = &[
"telegram_actions",
"discord_actions",
"slack_actions",
"whatsapp_actions",
"feishu_actions",
"weixin_actions",
"qq_actions",
"dingtalk_actions",
];
let active_channel = session_key.split(':').nth(2).unwrap_or("");
all.retain(|t| {
let name = t.name.as_str();
if CHANNEL_ACTION_TOOLS.contains(&name) {
name == "channel_actions" || name.starts_with(active_channel)
} else {
true
}
});
all
};
let vision = model_supports_vision(&model, &self.config);
let session_messages = self.load_session(session_key);
let images = if vision {
images
} else {
if !images.is_empty() {
info!(
"model {model} does not support vision, stripping {} image(s)",
images.len()
);
}
vec![]
};
let compressed_images: Vec<_> = images
.iter()
.filter_map(|img| {
compress_image_for_llm(&img.data).map(|data| super::registry::ImageAttachment {
data,
mime_type: "image/jpeg".to_owned(),
})
})
.collect();
let content = if compressed_images.is_empty() && images.is_empty() {
MessageContent::Text(text.to_owned())
} else {
let imgs = if compressed_images.is_empty() {
&images
} else {
&compressed_images
};
let mut parts = vec![ContentPart::Text {
text: text.to_owned(),
}];
for img in imgs {
parts.push(ContentPart::Image {
url: img.data.clone(),
});
}
MessageContent::Parts(parts)
};
let user_msg = Message {
role: Role::User,
content,
};
let persist_msg = if images.is_empty() {
user_msg.clone()
} else {
Message {
role: Role::User,
content: MessageContent::Text(format!("{text} [image attached]")),
}
};
session_messages.push(persist_msg.clone());
let _ = self.store.db.append_message(
session_key,
&serde_json::to_value(&persist_msg).unwrap_or_default(),
);
let timeout_secs = self
.config
.agents
.defaults
.timeout_seconds
.unwrap_or(DEFAULT_TIMEOUT_SECONDS as u32) as u64;
let mut ctx = RunContext {
agent_id: self.handle.id.clone(),
session_key: session_key.to_owned(),
channel: channel.to_owned(),
peer_id: peer_id.to_owned(),
loop_detector: {
let ld_cfg = self
.config
.ext
.tools
.as_ref()
.and_then(|t| t.loop_detection.as_ref());
if ld_cfg.map(|c| c.enabled.unwrap_or(true)).unwrap_or(true) {
let window = ld_cfg.and_then(|c| c.window).unwrap_or(20);
let warning_threshold = ld_cfg.and_then(|c| c.threshold).unwrap_or(20);
let critical_threshold = warning_threshold
.saturating_add(10)
.max(warning_threshold + 1);
let overrides: std::collections::HashMap<String, (usize, usize)> = ld_cfg
.and_then(|c| c.overrides.clone())
.unwrap_or_default()
.into_iter()
.map(|(k, v)| (k, (v, v.saturating_add(10).max(v + 1))))
.collect();
LoopDetector::with_overrides(
window,
warning_threshold,
critical_threshold,
overrides,
)
} else {
LoopDetector::new(usize::MAX, usize::MAX)
}
},
has_images: !images.is_empty(),
user_msg_with_images: if !images.is_empty() {
Some(user_msg.clone())
} else {
None
},
};
let reply = time::timeout(
Duration::from_secs(timeout_secs),
self.agent_loop(&mut ctx, &model, &system_prompt, tools, extra_tools),
)
.await
.map_err(|_| {
anyhow!(
"agent `{}` turn timed out after {timeout_secs}s",
self.handle.id
)
})??;
if let Ok(mut status) = self.live_status.try_write() {
status.state = "idle".to_owned();
status.current_task.clear();
status.text_preview.clear();
}
self.append_transcript(session_key, text, &reply.text).await;
if let Some(ref mem) = self.memory
&& text.len() > 20
&& !reply.text.starts_with(NO_REPLY_TOKEN)
{
let doc_id = Uuid::new_v4().to_string();
let doc_scope = format!("agent:{}", self.handle.id);
let doc = MemoryDoc {
id: doc_id.clone(),
scope: doc_scope.clone(),
kind: "note".to_owned(),
text: text.to_owned(),
created_at: 0, accessed_at: 0,
access_count: 0,
importance: 0.5,
vector: vec![],
tier: Default::default(),
abstract_text: None,
overview_text: None,
};
let _ = mem.lock().await.add(doc).await;
if let Err(e) = self
.store
.search
.index_memory_doc(&doc_id, &doc_scope, "note", text)
{
tracing::warn!("BM25 index failed for auto-capture doc: {e:#}");
}
}
self.compact_if_needed(session_key, &model).await;
self.btw_manager.tick_turn(session_key).await;
self.fire_hook(
"after_turn",
json!({
"agent_id": self.handle.id,
"session_key": session_key,
"reply_len": reply.text.len(),
"is_empty": reply.is_empty,
}),
)
.await;
Ok(reply)
}
/// Map a session key to its canonical form.
///
/// `session_aliases` maps alias keys to their canonical key; when no alias
/// entry exists, the key is returned unchanged.
fn resolve_session_key<'a>(&'a self, session_key: &'a str) -> &'a str {
    match self.session_aliases.get(session_key) {
        Some(canonical) => canonical.as_str(),
        None => session_key,
    }
}
/// Return the in-memory message history for `session_key`, lazily loading
/// it from the persistent store on first access.
///
/// Rows that fail to deserialize into `Message` are skipped silently so a
/// single corrupt record cannot poison the whole session.
fn load_session(&mut self, session_key: &str) -> &mut Vec<Message> {
    // Entry API: a single map lookup instead of contains_key + insert +
    // get_mut, and no `expect` needed (clippy::map_entry). The closure only
    // runs on a cache miss.
    self.sessions
        .entry(session_key.to_owned())
        .or_insert_with(|| {
            self.store
                .db
                .load_messages(session_key)
                .unwrap_or_default()
                .into_iter()
                .filter_map(|v| serde_json::from_value::<Message>(v).ok())
                .collect()
        })
}
/// Core per-turn loop: repeatedly calls the LLM with the current session
/// context, streams the response, executes any tool calls it requests,
/// appends the tool results back into the session, and loops until the
/// model produces a plain text answer (or hands external tool calls back
/// to the caller).
///
/// Errors bubble up from the provider stream, from loop detection hitting
/// its critical threshold, or from session plumbing.
async fn agent_loop(
    &mut self,
    ctx: &mut RunContext,
    model: &str,
    system_prompt: &str,
    tools: Vec<ToolDef>,
    extra_tools: Vec<ToolDef>,
) -> Result<AgentReply> {
    let pruning_cfg = self.config.agents.defaults.context_pruning.clone();
    // Context window size: per-agent model config wins, then the defaults'
    // flat setting, then the defaults' model config, finally 128k.
    let context_tokens = self
        .handle
        .config
        .model
        .as_ref()
        .and_then(|m| m.context_tokens)
        .or(self.config.agents.defaults.context_tokens)
        .or_else(|| {
            self.config
                .agents
                .defaults
                .model
                .as_ref()
                .and_then(|m| m.context_tokens)
        })
        .unwrap_or(128_000) as usize;
    // Images emitted by tools during this turn; attached to the final reply.
    let mut tool_images: Vec<String> = Vec::new();
    loop {
        // Trim the in-memory session before every LLM call: first prune by
        // configured policy, then enforce the hard token budget.
        if let Some(sess) = self.sessions.get_mut(&ctx.session_key) {
            apply_context_pruning(sess, pruning_cfg.as_ref());
        }
        if let Some(sess) = self.sessions.get_mut(&ctx.session_key) {
            apply_context_budget_trim(sess, context_tokens, system_prompt, &tools);
        }
        // Snapshot the messages to send. If this turn carried images, swap
        // the trailing user message for the image-bearing variant (only the
        // text-only copy is persisted), then drop images from older turns.
        let messages = {
            let mut raw = self
                .sessions
                .get(&ctx.session_key)
                .cloned()
                .unwrap_or_default();
            if ctx.has_images {
                if let Some(last) = raw.last_mut() {
                    if last.role == Role::User {
                        *last = ctx.user_msg_with_images.clone().unwrap_or(last.clone());
                    }
                }
            }
            strip_old_images(raw)
        };
        // Thinking budget resolution: explicit budget_tokens > level-derived
        // budget (if positive) > a 10240 default when merely enabled > none.
        let thinking_budget = {
            let agent_thinking = self
                .handle
                .config
                .model
                .as_ref()
                .and_then(|m| m.thinking.as_ref());
            let default_thinking = self.config.agents.defaults.thinking.as_ref();
            let tc = agent_thinking.or(default_thinking);
            tc.and_then(|t| {
                if let Some(budget) = t.budget_tokens {
                    return Some(budget);
                }
                if let Some(ref level) = t.level {
                    let b = level.budget_tokens();
                    if b > 0 {
                        return Some(b);
                    }
                }
                if t.enabled == Some(true) {
                    return Some(10240);
                }
                None
            })
        };
        let msg_count = messages.len();
        let approx_tokens: usize = messages.iter().map(msg_tokens).sum();
        info!(session = %ctx.session_key, msg_count, approx_tokens, model = %model, "LLM call: context size");
        // max_tokens: agent config > defaults' model config > the provider's
        // per-model entry in the model registry > 65536.
        let configured_max_tokens = {
            let from_agent = self.handle.config.model.as_ref().and_then(|m| m.max_tokens);
            let from_defaults = self
                .config
                .agents
                .defaults
                .model
                .as_ref()
                .and_then(|m| m.max_tokens);
            let from_provider = {
                let (provider, model_id) =
                    crate::provider::registry::ProviderRegistry::parse_model(&model);
                self.config
                    .model
                    .models
                    .as_ref()
                    .and_then(|m| m.providers.get(provider))
                    .and_then(|p| p.models.as_ref())
                    .and_then(|models| models.iter().find(|m| m.id == model_id))
                    .and_then(|m| m.max_tokens)
                    .map(|v| v as u32)
            };
            from_agent
                .or(from_defaults)
                .or(from_provider)
                .unwrap_or(65536)
        };
        let req = LlmRequest {
            model: model.to_owned(),
            messages,
            tools: tools.clone(),
            system: Some(system_prompt.to_owned()),
            max_tokens: Some(configured_max_tokens),
            temperature: None,
            thinking_budget,
        };
        // Best-effort status update; skipped if the lock is contended.
        if let Ok(mut status) = self.live_status.try_write() {
            status.state = "streaming".to_owned();
        }
        let providers = Arc::clone(&self.providers);
        let mut stream = self.failover.call(req, &providers).await?;
        let mut text_buf = String::new();
        // Accumulated tool calls as (id, name, input) triples; partial
        // streaming chunks are merged into the last entry below.
        let mut tool_calls: Vec<(String, String, Value)> = Vec::new();
        // Text deltas are batched and flushed to the event bus every
        // 80 chars or 150ms, whichever comes first.
        let mut delta_buf = String::new();
        let mut last_delta_flush = std::time::Instant::now();
        while let Some(event) = stream.next().await {
            match event? {
                StreamEvent::TextDelta(delta) => {
                    text_buf.push_str(&delta);
                    // Mirror the first ~200 chars into live status as a
                    // preview (char boundary respected via char_indices).
                    if text_buf.len() <= 250 {
                        if let Ok(mut status) = self.live_status.try_write() {
                            let preview = text_buf
                                .char_indices()
                                .nth(200)
                                .map(|(i, _)| &text_buf[..i])
                                .unwrap_or(&text_buf);
                            status.text_preview = preview.to_owned();
                        }
                    }
                    delta_buf.push_str(&delta);
                    let now = std::time::Instant::now();
                    let elapsed = now.duration_since(last_delta_flush);
                    if delta_buf.len() >= 80 || elapsed >= std::time::Duration::from_millis(150)
                    {
                        if let Some(ref bus) = self.event_bus {
                            let _ = bus.send(AgentEvent {
                                session_id: ctx.session_key.clone(),
                                agent_id: ctx.agent_id.clone(),
                                delta: std::mem::take(&mut delta_buf),
                                done: false,
                            });
                        }
                        last_delta_flush = now;
                    }
                }
                StreamEvent::ReasoningDelta(_) => {}
                StreamEvent::ToolCall { id, name, input } => {
                    if !id.is_empty() && !name.is_empty() {
                        // Complete call in one event. Loop detection keys
                        // `exec` by its command string so distinct commands
                        // don't count as repeats; `?` aborts the turn when
                        // the detector reports a critical loop.
                        let loop_key = if name == "exec" {
                            let cmd =
                                input.get("command").and_then(|v| v.as_str()).unwrap_or("");
                            format!("exec:{cmd}")
                        } else {
                            name.clone()
                        };
                        if let Some(warning_msg) =
                            ctx.loop_detector.check(&loop_key).to_result()?
                        {
                            tracing::warn!(tool = %loop_key, "{}", warning_msg);
                        }
                        tool_calls.push((id, name, input));
                    } else if !id.is_empty() && name.is_empty() {
                        // Id-only chunk: start a placeholder entry that
                        // later chunks will fill in.
                        tool_calls.push((
                            id,
                            String::new(),
                            serde_json::Value::Object(Default::default()),
                        ));
                    } else if let Some(last) = tool_calls.last_mut() {
                        // Continuation chunk: backfill the name and merge
                        // the input into the most recent entry.
                        if !name.is_empty() && last.1.is_empty() {
                            last.1 = name.clone();
                        }
                        if !input.is_null()
                            && input != serde_json::Value::Object(Default::default())
                        {
                            if last.2 == serde_json::Value::Object(Default::default()) {
                                last.2 = input;
                            } else if let (Some(existing), Some(new_str)) =
                                (last.2.as_str(), input.as_str())
                            {
                                // Both halves are raw strings: concatenate
                                // and try an eager repair so arguments parse
                                // as soon as they become usable.
                                let merged = format!("{existing}{new_str}");
                                if let Some(repair) =
                                    crate::agent::tool_call_repair::try_extract_usable_args(
                                        &merged,
                                    )
                                {
                                    tracing::debug!(
                                        repair_kind = ?repair.kind,
                                        "streaming tool call: real-time repair succeeded"
                                    );
                                    last.2 = repair.args;
                                } else {
                                    last.2 = serde_json::Value::String(merged);
                                }
                            }
                        }
                    }
                }
                StreamEvent::Done { .. } => {}
                StreamEvent::Error(e) => {
                    return Err(anyhow!("LLM stream error: {e}"));
                }
            }
        }
        // Flush any deltas still buffered when the stream ends.
        if !delta_buf.is_empty() {
            if let Some(ref bus) = self.event_bus {
                let _ = bus.send(AgentEvent {
                    session_id: ctx.session_key.clone(),
                    agent_id: ctx.agent_id.clone(),
                    delta: delta_buf,
                    done: false,
                });
            }
        }
        // Length before think-tag stripping: used below to tell "model only
        // produced reasoning" apart from a genuinely empty response.
        let pre_strip_len = text_buf.trim().len();
        text_buf = crate::provider::openai::strip_think_tags_pub(&text_buf);
        // Any tool input still held as a raw string failed eager parsing:
        // try a full JSON parse, then repair, then fall back to wrapping the
        // raw text with a `_parse_error` hint the model can react to.
        for (_id, _name, input) in &mut tool_calls {
            if let serde_json::Value::String(s) = input {
                tracing::info!(
                    args_len = s.len(),
                    args_start = ?s.chars().take(200).collect::<String>(),
                    args_end = ?s.chars().rev().take(200).collect::<String>().chars().rev().collect::<String>(),
                    "streaming tool call: accumulated args (start and end)"
                );
                let parsed = serde_json::from_str::<serde_json::Value>(&s);
                match &parsed {
                    Ok(v) if v.is_object() => {
                        tracing::info!(
                            keys = ?v.as_object().map(|o| o.keys().collect::<Vec<_>>()),
                            "streaming tool call: parsed successfully"
                        );
                        *input = v.clone();
                    }
                    _ => {
                        match crate::agent::tool_call_repair::try_extract_usable_args(&s) {
                            Some(repair) => {
                                tracing::warn!(
                                    args_len = s.len(),
                                    repair_kind = ?repair.kind,
                                    leading_prefix_len = repair.leading_prefix.len(),
                                    trailing_suffix_len = repair.trailing_suffix.len(),
                                    "streaming tool call: repaired malformed JSON"
                                );
                                *input = repair.args;
                            }
                            None => {
                                // Heuristic: JSON that opens with {/[ but
                                // never closes was probably cut off by the
                                // provider's output limit.
                                let is_truncated = {
                                    let trimmed = s.trim();
                                    let starts_with_json =
                                        trimmed.starts_with('{') || trimmed.starts_with('[');
                                    let ends_with_complete =
                                        trimmed.ends_with('}') || trimmed.ends_with(']');
                                    starts_with_json && !ends_with_complete
                                };
                                tracing::warn!(
                                    args_len = s.len(),
                                    is_truncated = is_truncated,
                                    args_start = ?s.chars().take(100).collect::<String>(),
                                    args_end = ?s.chars().rev().take(50).collect::<String>().chars().rev().collect::<String>(),
                                    "streaming tool call: malformed JSON from model{}",
                                    if is_truncated { " (DETECTED TRUNCATION)" } else { "" }
                                );
                                if is_truncated {
                                    *input = serde_json::json!({
                                        "content": s,
                                        "_parse_error": format!(
                                            "truncated: Your tool call was cut off at {} chars. \
                                            Try again with shorter content, or split into multiple files.",
                                            s.len()
                                        ),
                                    });
                                } else {
                                    *input = serde_json::json!({
                                        "content": s,
                                        "_parse_error": "Model sent malformed JSON arguments.",
                                    });
                                }
                            }
                        }
                    }
                }
            }
        }
        // Drop placeholder entries that never received a tool name.
        tool_calls.retain(|(_, name, _)| !name.is_empty());
        tracing::info!(
            session = %ctx.session_key,
            tool_call_count = tool_calls.len(),
            text_len = text_buf.len(),
            "agent_loop: stream finished"
        );
        // No tool calls: this is the model's final answer for the turn.
        if tool_calls.is_empty() {
            let assistant_msg = Message {
                role: Role::Assistant,
                content: MessageContent::Text(text_buf.clone()),
            };
            let _ = self.store.db.append_message(
                &ctx.session_key,
                &serde_json::to_value(&assistant_msg).unwrap_or_default(),
            );
            if let Some(sess) = self.sessions.get_mut(&ctx.session_key) {
                sess.push(assistant_msg);
            }
            if let Some(ref bus) = self.event_bus {
                tracing::debug!(session = %ctx.session_key, "agent_loop: emitting done=true");
                let _ = bus.send(AgentEvent {
                    session_id: ctx.session_key.clone(),
                    agent_id: ctx.agent_id.clone(),
                    delta: String::new(),
                    done: true,
                });
            }
            let clean = text_buf.trim().to_uppercase();
            let no_reply = clean.starts_with(NO_REPLY_TOKEN);
            let is_empty = text_buf.trim().is_empty();
            // Empty-after-stripping (pre_strip_len > 0) means the model only
            // emitted <think> content — stay silent rather than apologize.
            let final_text = if no_reply {
                String::new()
            } else if is_empty && pre_strip_len > 0 {
                String::new()
            } else if is_empty {
                "[The model returned an empty response. Please try again or rephrase your message.]".to_owned()
            } else {
                text_buf
            };
            return Ok(AgentReply {
                text: final_text,
                is_empty: no_reply && tool_images.is_empty(),
                tool_calls: None,
                images: tool_images,
                pending_analysis: None,
            });
        }
        // Persist the assistant turn (text + tool_use parts) before running
        // the tools, so the session records the request/response pairing.
        let mut parts: Vec<crate::provider::ContentPart> = Vec::new();
        if !text_buf.is_empty() {
            parts.push(crate::provider::ContentPart::Text { text: text_buf });
        }
        for (id, name, input) in &tool_calls {
            parts.push(crate::provider::ContentPart::ToolUse {
                id: id.clone(),
                name: name.clone(),
                input: input.clone(),
            });
        }
        let assistant_msg = Message {
            role: Role::Assistant,
            content: MessageContent::Parts(parts),
        };
        let _ = self.store.db.append_message(
            &ctx.session_key,
            &serde_json::to_value(&assistant_msg).unwrap_or_default(),
        );
        if let Some(sess) = self.sessions.get_mut(&ctx.session_key) {
            sess.push(assistant_msg);
        }
        // Calls targeting caller-supplied tools (extra_tools) are not run
        // here: they are returned in OpenAI tool_call format for the caller
        // to execute.
        let external_calls: Vec<(String, String, Value)> = tool_calls
            .iter()
            .filter(|(_, name, _)| extra_tools.iter().any(|t| &t.name == name))
            .cloned()
            .collect();
        if !external_calls.is_empty() {
            let oai_tool_calls: Vec<Value> = external_calls
                .into_iter()
                .map(|(id, name, input)| {
                    // `arguments` must be a JSON-encoded string per the
                    // OpenAI wire format.
                    let arguments = if input.is_string() {
                        input.as_str().unwrap_or("{}").to_owned()
                    } else {
                        input.to_string()
                    };
                    json!({
                        "id": id,
                        "type": "function",
                        "function": {
                            "name": name,
                            "arguments": arguments
                        }
                    })
                })
                .collect();
            return Ok(AgentReply {
                text: String::new(),
                is_empty: true,
                tool_calls: Some(oai_tool_calls),
                images: vec![],
                pending_analysis: None,
            });
        }
        // Run each tool in order, persist its (truncated) result as a Tool
        // message, then loop back for another LLM call.
        for (tool_id, tool_name, tool_input) in tool_calls {
            debug!(tool = %tool_name, "dispatching tool call");
            if let Ok(mut status) = self.live_status.try_write() {
                status.state = "tool_call".to_owned();
                status.tool_history.push(tool_name.clone());
            }
            self.fire_hook(
                "before_tool_call",
                json!({
                    "agent_id": self.handle.id,
                    "tool": tool_name,
                    "input": tool_input,
                }),
            )
            .await;
            let result = self
                .dispatch_tool(ctx, &tool_id, &tool_name, tool_input)
                .await;
            self.fire_hook(
                "after_tool_call",
                json!({
                    "agent_id": self.handle.id,
                    "tool": tool_name,
                    "ok": result.is_ok(),
                }),
            )
            .await;
            let (result_text, result_images) = match result {
                Ok(v) => {
                    // Image-bearing results: keep the image data out of the
                    // LLM context and forward it to the user instead.
                    if let Some(img) = v.get("image").and_then(|i| i.as_str()) {
                        let desc = v.get("action").and_then(|a| a.as_str()).unwrap_or("tool");
                        (
                            format!(
                                "{{\"action\":\"{desc}\",\"status\":\"image sent to user\"}}"
                            ),
                            vec![img.to_owned()],
                        )
                    } else {
                        (v.to_string(), vec![])
                    }
                }
                Err(e) => {
                    warn!(tool = %tool_name, "tool error: {e:#}");
                    // NOTE(review): `e` is interpolated without JSON string
                    // escaping — a quote or backslash in the error text
                    // yields invalid JSON here; consider serde_json.
                    (format!("{{\"error\":\"{}\"}}", e), vec![])
                }
            };
            tool_images.extend(result_images);
            // Per-tool caps on how much of the result enters the context.
            let limits = self
                .config
                .ext
                .tools
                .as_ref()
                .and_then(|t| t.session_result_limits.as_ref());
            let max_chars = match tool_name.as_str() {
                "web_search" => limits.and_then(|l| l.web_search).unwrap_or(2000),
                "web_fetch" => limits.and_then(|l| l.web_fetch).unwrap_or(5000),
                "exec" => limits.and_then(|l| l.exec).unwrap_or(3000),
                _ => limits.and_then(|l| l.default).unwrap_or(3000),
            };
            // NOTE(review): the threshold compares bytes (`len()`) while the
            // cut point counts chars — for multi-byte text the truncation
            // marker can be appended without actually cutting; confirm.
            let session_text = if result_text.len() > max_chars {
                let truncated = &result_text[..result_text
                    .char_indices()
                    .nth(max_chars)
                    .map(|(i, _)| i)
                    .unwrap_or(result_text.len())];
                format!(
                    "{truncated}\n[...truncated, {}/{} chars]",
                    max_chars,
                    result_text.len()
                )
            } else {
                result_text.clone()
            };
            let tool_msg = Message {
                role: Role::Tool,
                content: MessageContent::Parts(vec![
                    crate::provider::ContentPart::ToolResult {
                        tool_use_id: tool_id.clone(),
                        content: session_text,
                        is_error: Some(false),
                    },
                ]),
            };
            let _ = self.store.db.append_message(
                &ctx.session_key,
                &serde_json::to_value(&tool_msg).unwrap_or_default(),
            );
            if let Some(sess) = self.sessions.get_mut(&ctx.session_key) {
                sess.push(tool_msg);
            }
        }
    }
}
/// Route a single tool call by name to its handler.
///
/// Resolution order: consolidated built-ins (memory/session/agent/channel),
/// their legacy single-action aliases, channel-specific `*_actions`
/// wrappers, individual built-in tools, `agent_*` agent-to-agent calls,
/// `mcp_*` MCP tools, and finally skill tools addressed as `skill.tool`
/// (or a bare name used for both).
async fn dispatch_tool(
    &self,
    ctx: &RunContext,
    _id: &str,
    name: &str,
    args: Value,
) -> Result<Value> {
    match name {
        // Consolidated tools: the action is carried inside `args`.
        "memory" => return self.tool_memory_consolidated(ctx, args).await,
        "session" => return self.tool_session_consolidated(ctx, args).await,
        "agent" | "subagents" => return self.tool_agent_consolidated(args).await,
        "channel" => return self.tool_channel_consolidated(args).await,
        // Legacy aliases: the action is implied by the tool name and is
        // injected into `args` before delegating.
        "memory_search" => {
            return self
                .tool_memory_consolidated(ctx, inject_action(args, "search"))
                .await;
        }
        "memory_get" => {
            return self
                .tool_memory_consolidated(ctx, inject_action(args, "get"))
                .await;
        }
        "memory_put" => {
            return self
                .tool_memory_consolidated(ctx, inject_action(args, "put"))
                .await;
        }
        "memory_delete" => {
            return self
                .tool_memory_consolidated(ctx, inject_action(args, "delete"))
                .await;
        }
        "sessions_send" => {
            return self
                .tool_session_consolidated(ctx, inject_action(args, "send"))
                .await;
        }
        "sessions_list" => {
            return self
                .tool_session_consolidated(ctx, inject_action(args, "list"))
                .await;
        }
        "sessions_history" => {
            return self
                .tool_session_consolidated(ctx, inject_action(args, "history"))
                .await;
        }
        "session_status" => {
            return self
                .tool_session_consolidated(ctx, inject_action(args, "status"))
                .await;
        }
        "agent_spawn" | "sessions_spawn" => {
            return self
                .tool_agent_consolidated(inject_action(args, "spawn"))
                .await;
        }
        "agent_list" | "agents_list" => {
            return self
                .tool_agent_consolidated(inject_action(args, "list"))
                .await;
        }
        // Channel-specific wrappers: inject the channel name and delegate
        // to the consolidated channel tool.
        "telegram_actions" => {
            return self
                .tool_channel_consolidated(inject_channel(args, "telegram"))
                .await;
        }
        "discord_actions" => {
            return self
                .tool_channel_consolidated(inject_channel(args, "discord"))
                .await;
        }
        "slack_actions" => {
            return self
                .tool_channel_consolidated(inject_channel(args, "slack"))
                .await;
        }
        "whatsapp_actions" => {
            return self
                .tool_channel_consolidated(inject_channel(args, "whatsapp"))
                .await;
        }
        "feishu_actions" => {
            return self
                .tool_channel_consolidated(inject_channel(args, "feishu"))
                .await;
        }
        // NOTE(review): the weixin wrapper maps to channel id "wechat" —
        // presumably the internal channel name; confirm intended.
        "weixin_actions" => {
            return self
                .tool_channel_consolidated(inject_channel(args, "wechat"))
                .await;
        }
        "qq_actions" => {
            return self
                .tool_channel_consolidated(inject_channel(args, "qq"))
                .await;
        }
        "dingtalk_actions" => {
            return self
                .tool_channel_consolidated(inject_channel(args, "dingtalk"))
                .await;
        }
        // Individual built-in tools.
        "read" => return self.tool_read(args).await,
        "write" => return self.tool_write(args).await,
        "exec" => return self.tool_exec(args).await,
        "web_search" => return self.tool_web_search(args).await,
        "web_fetch" => return self.tool_web_fetch(args).await,
        "web_browser" | "browser" => return self.tool_web_browser(args).await,
        "computer_use" => return self.tool_computer_use(args).await,
        "image" => return self.tool_image(args).await,
        "pdf" => return self.tool_pdf(args).await,
        "tts" => return self.tool_tts(args).await,
        "message" => return self.tool_message(args).await,
        "cron" => return self.tool_cron(args).await,
        "gateway" => return self.tool_gateway(args).await,
        "pairing" => return self.tool_pairing(args).await,
        "opencode" => return self.tool_opencode(ctx, args).await,
        _ => {}
    }
    // `agent_<id>`: direct agent-to-agent dispatch. The named aliases
    // (agent_spawn/agent_list) were already matched above, so they never
    // reach this prefix check.
    if let Some(agent_id) = name.strip_prefix("agent_") {
        return self.dispatch_a2a(ctx, agent_id, args).await;
    }
    // `mcp_<client>_<tool>`: strip the client prefix and forward, flattening
    // the MCP content array into newline-joined text.
    if name.starts_with("mcp_") {
        if let Some(ref mcp) = self.mcp
            && let Some(client) = mcp.find_for_tool(name).await
        {
            let prefix = format!("mcp_{}_", client.name);
            let original_name = name.strip_prefix(&prefix).unwrap_or(name);
            let result = client.call_tool(original_name, args).await?;
            let text = result
                .get("content")
                .and_then(|c| c.as_array())
                .map(|arr| {
                    arr.iter()
                        .filter_map(|item| item.get("text").and_then(|t| t.as_str()))
                        .collect::<Vec<_>>()
                        .join("\n")
                })
                .unwrap_or_else(|| result.to_string());
            return Ok(serde_json::json!(text));
        }
        return Err(anyhow!("MCP tool `{name}` not found"));
    }
    // Skill fallback: `skill.tool`, or a bare name used as both skill and
    // tool name.
    let (skill_name, tool_name) = name.split_once('.').unwrap_or((name, name));
    let Some(skill) = self.skills.get(skill_name) else {
        return Err(anyhow!("unknown tool: `{name}`"));
    };
    let Some(spec) = skill.tools.iter().find(|t| t.name == tool_name) else {
        return Err(anyhow!("skill `{}` has no tool `{tool_name}`", skill.name));
    };
    run_tool(spec, &skill.dir, args, &RunOptions::default()).await
}
/// Agent-to-agent call: forward `args.text` to another agent and return its
/// reply text.
///
/// Tries the local registry first (message sent to the target's inbox with
/// a oneshot reply channel, bounded by the default turn timeout), then
/// falls back to the configured external A2A registry over HTTP. Errors
/// when `text` is missing or the agent is unknown in both places.
async fn dispatch_a2a(&self, ctx: &RunContext, agent_id: &str, args: Value) -> Result<Value> {
    let text = args["text"]
        .as_str()
        .ok_or_else(|| anyhow!("A2A: `text` argument required"))?
        .to_owned();
    if let Some(ref registry) = self.agents
        && let Ok(target) = registry.get(agent_id)
    {
        // Child session is namespaced under the caller's session key so the
        // exchange stays traceable per conversation.
        let child_session = format!("{}:a2a:{agent_id}", ctx.session_key);
        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<AgentReply>();
        let msg = AgentMessage {
            session_key: child_session,
            text,
            channel: format!("a2a:{}", ctx.agent_id),
            peer_id: ctx.agent_id.clone(),
            chat_id: String::new(),
            reply_tx,
            extra_tools: vec![],
            images: vec![],
            files: vec![],
        };
        target
            .tx
            .send(msg)
            .await
            .map_err(|_| anyhow!("A2A: agent `{agent_id}` inbox closed"))?;
        let a2a_timeout_secs =
            self.config
                .agents
                .defaults
                .timeout_seconds
                .unwrap_or(DEFAULT_TIMEOUT_SECONDS as u32) as u64;
        // Two failure layers: the outer timeout elapsing, and the target
        // dropping its reply sender without answering.
        let reply = tokio::time::timeout(Duration::from_secs(a2a_timeout_secs), reply_rx)
            .await
            .map_err(|_| {
                anyhow!("A2A: agent `{agent_id}` timed out after {a2a_timeout_secs}s")
            })?
            .map_err(|_| anyhow!("A2A: reply channel dropped"))?;
        return Ok(Value::String(reply.text));
    }
    // Remote fallback: external entries may be registered with dashes or
    // underscores, so match either spelling.
    let normalized_id = agent_id.replace('-', "_");
    if let Some(ext) = self
        .config
        .agents
        .external
        .iter()
        .find(|e| e.id == agent_id || e.id == normalized_id)
    {
        use crate::a2a::client::A2aClient;
        let client = A2aClient::new();
        let remote_id = ext.remote_agent_id.as_deref().unwrap_or("");
        let reply = client
            .send_task(
                &ext.url,
                remote_id,
                &text,
                &ctx.session_key,
                ext.auth_token.as_deref(),
            )
            .await
            .map_err(|e| anyhow!("A2A remote `{agent_id}`: {e}"))?;
        return Ok(Value::String(reply));
    }
    Err(anyhow!(
        "A2A: agent `{agent_id}` not found locally or in external registry"
    ))
}
async fn tool_memory_search(&self, args: Value) -> Result<Value> {
let query = args["query"].as_str().unwrap_or("").to_owned();
let scope = args["scope"].as_str().map(str::to_owned);
let top_k = args["top_k"].as_u64().unwrap_or(5) as usize;
let Some(ref mem) = self.memory else {
return Ok(json!({"results": [], "note": "memory store not available"}));
};
let mut store = mem.lock().await;
let docs = store.search(&query, scope.as_deref(), top_k).await?;
let results: Vec<Value> = docs
.into_iter()
.map(
|d| json!({"id": d.id, "scope": d.scope, "kind": d.kind, "text": d.display_text()}),
)
.collect();
Ok(json!({"results": results}))
}
async fn tool_memory_get(&self, args: Value) -> Result<Value> {
let id = args["id"].as_str().unwrap_or("").to_owned();
let Some(ref mem) = self.memory else {
return Ok(json!({"error": "memory store not available"}));
};
let store = mem.lock().await;
match store.get(&id).await? {
Some(d) => Ok(json!({"id": d.id, "scope": d.scope, "kind": d.kind, "text": d.text})),
None => Ok(json!({"error": "not found", "id": id})),
}
}
/// Tool handler: store a memory document.
///
/// Args: `text` (content, defaults to empty), optional `scope` (defaults to
/// this agent's id), optional `kind` (defaults to "note"), optional `id`
/// (a fresh UUID is generated when absent). The document is written to the
/// memory store and mirrored into the BM25 index; an indexing failure is
/// logged but does not fail the call.
async fn tool_memory_put(&self, ctx: &RunContext, args: Value) -> Result<Value> {
    let Some(ref mem) = self.memory else {
        return Ok(json!({"error": "memory store not available"}));
    };
    let text = args["text"].as_str().unwrap_or("").to_owned();
    let scope = args["scope"].as_str().unwrap_or(&ctx.agent_id).to_owned();
    let kind = args["kind"].as_str().unwrap_or("note").to_owned();
    let id = match args["id"].as_str() {
        Some(given) => given.to_owned(),
        None => Uuid::new_v4().to_string(),
    };
    let doc = MemoryDoc {
        id: id.clone(),
        scope: scope.clone(),
        kind: kind.clone(),
        text: text.clone(),
        vector: vec![],
        created_at: 0,
        accessed_at: 0,
        access_count: 0,
        importance: 0.5,
        tier: Default::default(),
        abstract_text: None,
        overview_text: None,
    };
    // Scope the lock to the add; the BM25 mirror below does not need it.
    {
        let mut guard = mem.lock().await;
        guard.add(doc).await?;
    }
    // Best-effort: keep the BM25 side index in sync.
    if let Err(e) = self
        .store
        .search
        .index_memory_doc(&id, &scope, &kind, &text)
    {
        tracing::warn!("BM25 index failed for memory_put doc: {e:#}");
    }
    Ok(json!({"stored": true, "id": id}))
}
/// Tool handler: delete a memory document by `id`.
///
/// Errors when `id` is absent from the arguments. Removes the document from
/// the memory store, then attempts a best-effort BM25 delete + commit (a
/// failure there is only logged).
async fn tool_memory_delete(&self, args: Value) -> Result<Value> {
    let id = match args["id"].as_str() {
        Some(given) => given.to_owned(),
        None => return Err(anyhow!("memory_delete: `id` required")),
    };
    let Some(ref mem) = self.memory else {
        return Ok(json!({"error": "memory store not available"}));
    };
    {
        let mut guard = mem.lock().await;
        guard.delete(&id).await?;
    }
    let bm25 = self
        .store
        .search
        .delete_document(&id)
        .and_then(|_| self.store.search.commit());
    if let Err(e) = bm25 {
        tracing::warn!("BM25 delete failed for doc {id}: {e:#}");
    }
    Ok(json!({"deleted": true, "id": id}))
}
/// Read a file from the agent workspace (or an absolute path) and return its text.
///
/// Accepts several aliases for the path argument (`path`, `file_path`,
/// `filename`, `file`). PDFs are extracted via `pdf_extract` with a
/// `pdftotext` CLI fallback; Office formats go through the channel
/// extractor; everything else is read as UTF-8 text.
async fn tool_read(&self, args: Value) -> Result<Value> {
    let path = args["path"]
        .as_str()
        .or_else(|| args["file_path"].as_str())
        .or_else(|| args["filename"].as_str())
        .or_else(|| args["file"].as_str())
        .ok_or_else(|| anyhow!("read: `path` required"))?;
    // Per-agent workspace wins over the global default; last resort is CWD.
    let workspace = self
        .handle
        .config
        .workspace
        .as_deref()
        .or(self.config.agents.defaults.workspace.as_deref())
        .map(expand_tilde)
        .unwrap_or_else(|| std::path::PathBuf::from("."));
    // Normalize forward slashes to the platform separator before joining.
    let path_normalized = path.replace('/', std::path::MAIN_SEPARATOR.to_string().as_str());
    let path_buf = std::path::PathBuf::from(&path_normalized);
    let full = if path_buf.is_absolute() {
        path_buf
    } else {
        workspace.join(&path_normalized)
    };
    // Optional safety gate, shared config flag with the exec tool.
    let safety_enabled = self
        .config
        .ext
        .tools
        .as_ref()
        .and_then(|t| t.exec.as_ref())
        .and_then(|e| e.safety)
        .unwrap_or(false);
    if safety_enabled {
        check_read_safety(path, &full)?;
    }
    let lower = path.to_lowercase();
    if lower.ends_with(".pdf") {
        let pdf_bytes = tokio::fs::read(&full)
            .await
            .map_err(|e| anyhow!("read `{}`: {e}", full.display()))?;
        let content = match pdf_extract::extract_text_from_mem(&pdf_bytes) {
            Ok(text) => text,
            Err(e) => {
                // Library extraction failed; fall back to the external pdftotext binary.
                tracing::warn!("pdf-extract failed ({e}), trying pdftotext CLI");
                let output = tokio::process::Command::new("pdftotext")
                    .args([full.to_str().unwrap_or(""), "-"])
                    .output()
                    .await
                    .map_err(|e2| {
                        anyhow!(
                            "read `{}`: pdf extraction failed: {e}, pdftotext: {e2}",
                            full.display()
                        )
                    })?;
                if !output.status.success() {
                    anyhow::bail!("read `{}`: pdf extraction failed: {e}", full.display());
                }
                String::from_utf8_lossy(&output.stdout).to_string()
            }
        };
        return Ok(json!({"content": content, "path": path}));
    }
    if lower.ends_with(".docx") || lower.ends_with(".xlsx") || lower.ends_with(".pptx") {
        let bytes = tokio::fs::read(&full)
            .await
            .map_err(|e| anyhow!("read `{}`: {e}", full.display()))?;
        if let Some(text) = crate::channel::extract_office_text(path, &bytes) {
            return Ok(json!({"content": text, "path": path}));
        }
        anyhow::bail!("read `{}`: failed to extract office text", full.display());
    }
    let content = tokio::fs::read_to_string(&full)
        .await
        .map_err(|e| anyhow!("read `{}`: {e}", full.display()))?;
    Ok(json!({"content": content, "path": path}))
}
/// Write `content` to a workspace-relative (or absolute) path, creating parent dirs.
///
/// Returns structured `error`/`hint` JSON objects instead of failing when the
/// model sends malformed/truncated arguments or omits a parameter, so the
/// model can self-correct on its next turn.
async fn tool_write(&self, args: Value) -> Result<Value> {
    // The argument-repair layer tags unparseable tool calls with `_parse_error`.
    if let Some(parse_error) = args.get("_parse_error").and_then(|v| v.as_str()) {
        tracing::warn!("tool_write: received malformed JSON from model");
        let is_truncated = parse_error.starts_with("truncated:");
        return Ok(json!({
            "error": if is_truncated { "Your tool call was truncated by the API." } else { "Your tool call contained malformed JSON arguments." },
            "details": parse_error,
            "hint": if is_truncated {
                "The API truncated your response. Split into multiple smaller writes (under 3500 chars each)."
            } else {
                "Ensure all quotes/backslashes are escaped and JSON is complete."
            }
        }));
    }
    // Accept common aliases; the final `args.as_str()` covers a bare string argument.
    let path = args["path"]
        .as_str()
        .or_else(|| args["file_path"].as_str())
        .or_else(|| args["filename"].as_str())
        .or_else(|| args["file"].as_str())
        .or_else(|| args.as_str());
    let content = args["content"].as_str();
    if path.is_none() || path.map(|p| p.is_empty()).unwrap_or(true) {
        let has_content = content.map(|c| !c.is_empty()).unwrap_or(false);
        tracing::warn!(has_content, "tool_write: missing path parameter");
        return Ok(json!({
            "error": "Missing 'path' parameter. The write tool requires BOTH 'path' and 'content'.",
            "hint": "Retry with: {\"path\": \"file.py\", \"content\": \"...\"}"
        }));
    }
    if content.is_none() {
        tracing::warn!("tool_write: missing content parameter");
        return Ok(json!({
            "error": "Missing 'content' parameter.",
            "hint": "Provide a 'content' parameter with the text to write."
        }));
    }
    let path = path.unwrap().to_owned();
    let content = content.unwrap().to_owned();
    // Workspace resolution mirrors tool_read.
    let workspace = self
        .handle
        .config
        .workspace
        .as_deref()
        .or(self.config.agents.defaults.workspace.as_deref())
        .map(expand_tilde)
        .unwrap_or_else(|| std::path::PathBuf::from("."));
    let path_normalized = path.replace('/', std::path::MAIN_SEPARATOR.to_string().as_str());
    let path_buf = std::path::PathBuf::from(&path_normalized);
    let full = if path_buf.is_absolute() {
        path_buf
    } else {
        workspace.join(&path_normalized)
    };
    let safety_enabled = self
        .config
        .ext
        .tools
        .as_ref()
        .and_then(|t| t.exec.as_ref())
        .and_then(|e| e.safety)
        .unwrap_or(false);
    if safety_enabled {
        check_write_safety(&path, &full, &content)?;
    }
    if let Some(parent) = full.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }
    tokio::fs::write(&full, &content)
        .await
        .map_err(|e| anyhow!("write `{}`: {e}", full.display()))?;
    Ok(json!({"written": true, "path": path, "bytes": content.len()}))
}
/// Compact the session history when a token, turn, or elapsed-time trigger fires.
///
/// Layered mode keeps the most recent user/assistant pairs verbatim and
/// summarises everything older; Safeguard mode summarises the whole history
/// in fixed-size byte chunks. Before old turns are discarded, durable facts
/// can be extracted into long-term memory. On success the in-memory session,
/// the persisted session, and the transcript are all updated.
async fn compact_if_needed(&mut self, session_key: &str, model: &str) {
    use crate::config::schema::CompactionMode;
    // Absent compaction config means the feature is disabled.
    let Some(cfg) = self.config.agents.defaults.compaction.clone() else {
        return;
    };
    let token_threshold = cfg
        .reserve_tokens_floor
        .map(|t| t as usize)
        .unwrap_or(8_000);
    let max_turns: u32 = 20;
    let max_elapsed_secs: u64 = 30 * 60;
    let total_tokens: usize = self
        .sessions
        .get(session_key)
        .map(|msgs| msgs.iter().map(msg_tokens).sum())
        .unwrap_or(0);
    let (last_compaction, turns) = self
        .compaction_state
        .get(session_key)
        .copied()
        .unwrap_or((std::time::Instant::now(), 0));
    // Three independent triggers; the time trigger additionally requires the
    // session to be at least half-way to the token threshold.
    let token_trigger = total_tokens > token_threshold;
    let turn_trigger = turns >= max_turns;
    let time_trigger = last_compaction.elapsed().as_secs() >= max_elapsed_secs
        && total_tokens > token_threshold / 2;
    debug!(
        session = session_key,
        total_tokens,
        token_threshold,
        turns,
        token_trigger,
        turn_trigger,
        time_trigger,
        "compaction check"
    );
    if !token_trigger && !turn_trigger && !time_trigger {
        // Nothing to do yet: just bump the per-session turn counter.
        self.compaction_state
            .entry(session_key.to_owned())
            .and_modify(|(_, t)| *t += 1)
            .or_insert((std::time::Instant::now(), 1));
        return;
    }
    let trigger_reason = if token_trigger {
        "tokens"
    } else if turn_trigger {
        "turns"
    } else {
        "time"
    };
    info!(
        session = session_key,
        trigger = trigger_reason,
        total_tokens,
        turns,
        "compaction triggered"
    );
    let mode = cfg
        .mode
        .as_ref()
        .cloned()
        .unwrap_or(CompactionMode::Layered);
    let compaction_model = cfg.model.as_deref().unwrap_or(model);
    let configured_pairs = cfg.keep_recent_pairs.unwrap_or(5) as usize;
    // The further past the threshold we are, the fewer recent pairs we keep
    // (never fewer than one).
    let keep_pairs = if total_tokens > token_threshold * 3 {
    1.max(configured_pairs / 3) } else if total_tokens > token_threshold * 2 {
    1.max(configured_pairs / 2) } else {
    configured_pairs };
    let extract_facts = cfg.extract_facts.unwrap_or(true);
    // Flatten messages into plain "role: text" lines for the summariser.
    let msgs_to_text = |msgs: &[Message]| -> String {
        msgs.iter()
            .map(|m| {
                let role = format!("{:?}", m.role).to_lowercase();
                let body = match &m.content {
                    MessageContent::Text(t) => t.clone(),
                    MessageContent::Parts(parts) => parts
                        .iter()
                        .filter_map(|p| match p {
                            ContentPart::Text { text } => Some(text.as_str()),
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join(" "),
                };
                format!("{role}: {body}")
            })
            .collect::<Vec<_>>()
            .join("\n")
    };
    let (old_text, recent_msgs) = if mode == CompactionMode::Layered {
        let msgs = self.sessions.get(session_key).cloned().unwrap_or_default();
        // Walk backwards counting user messages to find where the kept tail starts.
        let mut pair_count = 0usize;
        let mut split_idx = msgs.len();
        let mut i = msgs.len();
        while i > 0 && pair_count < keep_pairs {
            i -= 1;
            if msgs[i].role == Role::User {
                pair_count += 1;
                split_idx = i;
            }
        }
        let old_portion = &msgs[..split_idx];
        let recent = msgs[split_idx..].to_vec();
        if old_portion.is_empty() {
            return;
        }
        (msgs_to_text(old_portion), recent)
    } else {
        let msgs = self.sessions.get(session_key).cloned().unwrap_or_default();
        (msgs_to_text(&msgs), vec![])
    };
    let summary = match mode {
        CompactionMode::Default | CompactionMode::Layered => {
            self.compact_single(compaction_model, &old_text).await
        }
        CompactionMode::Safeguard => {
            // Summarise in byte-sized chunks so arbitrarily long histories
            // stay within the compaction model's context window.
            const CHUNK_SIZE: usize = 40_000;
            let chunks: Vec<&str> = old_text
                .as_bytes()
                .chunks(CHUNK_SIZE)
                .map(|c| std::str::from_utf8(c).unwrap_or(""))
                .collect();
            let mut combined = String::new();
            for chunk in chunks {
                match self.compact_single(compaction_model, chunk).await {
                    Some(s) => {
                        combined.push_str(&s);
                        combined.push('\n');
                    }
                    None => return,
                }
            }
            if combined.is_empty() {
                None
            } else {
                Some(combined)
            }
        }
    };
    let Some(summary) = summary else { return };
    if extract_facts {
        // Persist durable facts to long-term memory before the old turns are
        // thrown away; individual insert failures are ignored.
        if let Some(facts) = self.extract_key_facts(compaction_model, &old_text).await {
            if let Some(ref mem) = self.memory {
                let scope = format!("agent:{}", self.handle.id);
                let mut guard = mem.lock().await;
                for fact in facts.lines().filter(|l| !l.trim().is_empty()) {
                    let fact_text = fact.trim_start_matches("- ").trim();
                    if fact_text.len() > 5 {
                        let doc = crate::agent::memory::MemoryDoc {
                            id: format!("cf-{}", uuid::Uuid::new_v4()),
                            scope: scope.clone(),
                            kind: "compaction_fact".to_owned(),
                            text: fact_text.to_owned(),
                            vector: vec![],
                            created_at: 0, accessed_at: 0,
                            access_count: 0,
                            importance: 0.7, tier: Default::default(),
                            abstract_text: None,
                            overview_text: None,
                        };
                        let _ = guard.add(doc).await;
                    }
                }
                drop(guard);
                debug!(
                    session = session_key,
                    "key facts extracted to long-term memory"
                );
            }
        }
    }
    // Replace the session content with [summary] + the kept recent messages.
    if let Some(sess) = self.sessions.get_mut(session_key) {
        let compacted = Message {
            role: Role::User,
            content: MessageContent::Text(format!(
                "[Conversation history compacted — summary follows]\n{summary}"
            )),
        };
        sess.clear();
        sess.push(compacted);
        sess.extend(recent_msgs);
    }
    self.compaction_state
        .insert(session_key.to_owned(), (std::time::Instant::now(), 0));
    // Rewrite the persisted session so it matches the in-memory state.
    if let Some(sess) = self.sessions.get(session_key) {
        let _ = self.store.db.delete_session(session_key);
        for msg in sess.iter() {
            let val = serde_json::to_value(msg).unwrap_or_default();
            let _ = self.store.db.append_message(session_key, &val);
        }
    }
    let new_tokens: usize = self
        .sessions
        .get(session_key)
        .map(|msgs| msgs.iter().map(msg_tokens).sum())
        .unwrap_or(0);
    info!(
        session = session_key,
        tokens_before = total_tokens,
        tokens_after = new_tokens,
        keep_pairs,
        "auto-compaction complete (layered)"
    );
    self.append_transcript(
        session_key,
        "[auto-compaction triggered]",
        &format!("[summary: {summary}]"),
    )
    .await;
}
async fn compact_single(&mut self, model: &str, history: &str) -> Option<String> {
let req = LlmRequest {
model: model.to_owned(),
messages: vec![Message {
role: Role::User,
content: MessageContent::Text(format!(
"Summarise the following conversation history concisely. \
Preserve all important facts, decisions, file paths, IDs, \
and context needed to continue the conversation:\n\n{history}"
)),
}],
tools: vec![],
system: Some(
"You are a conversation summarizer. Produce a dense, accurate summary.".to_owned(),
),
max_tokens: Some(1024), temperature: None,
thinking_budget: None,
};
let providers = Arc::clone(&self.providers);
let mut stream = match self.failover.call(req, &providers).await {
Ok(s) => s,
Err(e) => {
warn!("compaction LLM call failed: {e:#}");
return None;
}
};
let mut summary = String::new();
while let Some(event) = stream.next().await {
match event {
Ok(StreamEvent::TextDelta(d)) => summary.push_str(&d),
Ok(StreamEvent::ReasoningDelta(_)) => {} Ok(StreamEvent::Done { .. }) | Ok(StreamEvent::Error(_)) => break,
Ok(StreamEvent::ToolCall { .. }) => {} Err(e) => {
warn!("compaction stream error: {e:#}");
return None;
}
}
}
if summary.is_empty() {
None
} else {
Some(summary)
}
}
/// Ask the model to distil long-term facts from `history` as a bullet list.
///
/// Input is capped near 60 000 bytes (extended to a char boundary) to bound
/// cost; returns `None` on failure or an empty completion.
async fn extract_key_facts(&mut self, model: &str, history: &str) -> Option<String> {
    const CAP: usize = 60_000;
    let input = if history.len() > CAP {
        let mut end = CAP;
        // Never slice in the middle of a multi-byte character.
        while end < history.len() && !history.is_char_boundary(end) {
            end += 1;
        }
        &history[..end]
    } else {
        history
    };
    let request = LlmRequest {
        model: model.to_owned(),
        messages: vec![Message {
            role: Role::User,
            content: MessageContent::Text(format!(
                "Extract the key facts from this conversation that should be remembered \
                 long-term. Output ONLY a bullet list (one fact per line, prefixed with \
                 '- '). Include: names, user IDs, chat IDs, important decisions, file \
                 paths, URLs, preferences, and action items. Be concise. Skip ephemeral \
                 chit-chat.\n\n{input}"
            )),
        }],
        tools: vec![],
        system: Some(
            "You extract key facts from conversations. Output only a bullet list.".to_owned(),
        ),
        max_tokens: Some(1024),
        temperature: None,
        thinking_budget: None,
    };
    let providers = Arc::clone(&self.providers);
    let mut stream = match self.failover.call(request, &providers).await {
        Ok(s) => s,
        Err(e) => {
            warn!("key fact extraction failed: {e:#}");
            return None;
        }
    };
    let mut bullets = String::new();
    while let Some(event) = stream.next().await {
        match event {
            Ok(StreamEvent::TextDelta(d)) => bullets.push_str(&d),
            Ok(StreamEvent::Done { .. }) | Ok(StreamEvent::Error(_)) => break,
            _ => {}
        }
    }
    (!bullets.is_empty()).then_some(bullets)
}
/// Append the user/assistant exchange to `~/.rsclaw/transcripts/<key>.jsonl`.
///
/// Transcripts are best-effort: every failure is logged with `warn!` and
/// swallowed so the main loop is never disturbed.
async fn append_transcript(&self, session_key: &str, user_text: &str, assistant_text: &str) {
    let transcripts_dir = dirs_next::home_dir()
        .unwrap_or_default()
        .join(".rsclaw/transcripts");
    // Sanitize the session key so it is always a safe file name.
    let safe_key: String = session_key
        .chars()
        .map(|c| if c.is_alphanumeric() || c == '-' { c } else { '_' })
        .collect();
    let path = transcripts_dir.join(format!("{safe_key}.jsonl"));
    if let Err(e) = tokio::fs::create_dir_all(&transcripts_dir).await {
        warn!("transcript mkdir: {e:#}");
        return;
    }
    let ts = Utc::now().to_rfc3339();
    // Build both JSONL records up front so we do a single write.
    let mut buf = String::new();
    for (role, content) in [("user", user_text), ("assistant", assistant_text)] {
        let entry = json!({
            "role": role,
            "content": content,
            "session": session_key,
            "agent": self.handle.id,
            "ts": ts,
        });
        if let Ok(line) = serde_json::to_string(&entry) {
            buf.push_str(&line);
            buf.push('\n');
        }
    }
    let opened = tokio::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(&path)
        .await;
    match opened {
        Ok(mut f) => {
            if let Err(e) = f.write_all(buf.as_bytes()).await {
                warn!("transcript write: {e:#}");
            }
        }
        Err(e) => warn!("transcript open: {e:#}"),
    }
}
/// Run a shell command in the agent workspace and capture exit code and output.
///
/// Accepts either a full `command` string or `cmd` plus an `args` array.
/// With safety enabled: the command is statically pre-screened (allow /
/// deny / confirm), scripts handed to known interpreters get a content
/// check, and absolute or `..` path tokens must resolve inside the
/// workspace.
async fn tool_exec(&self, args: Value) -> Result<Value> {
    tracing::debug!(?args, "tool_exec called");
    let command = if let Some(cmd) = args["command"].as_str() {
        cmd.to_owned()
    } else if let Some(cmd) = args["cmd"].as_str() {
        // Join the optional `args` array back into a single shell string.
        let cmd_args = args["args"]
            .as_array()
            .map(|a| {
                a.iter()
                    .filter_map(|v| v.as_str())
                    .collect::<Vec<_>>()
                    .join(" ")
            })
            .unwrap_or_default();
        if cmd_args.is_empty() {
            cmd.to_owned()
        } else {
            format!("{cmd} {cmd_args}")
        }
    } else {
        bail!("exec: `command` required");
    };
    let command = command.as_str();
    let safety_enabled = self
        .config
        .ext
        .tools
        .as_ref()
        .and_then(|t| t.exec.as_ref())
        .and_then(|e| e.safety)
        .unwrap_or(false);
    if safety_enabled {
        // Static deny/confirm screening before anything executes.
        let preparse = crate::agent::preparse::PreParseEngine::load_with_safety(true);
        match preparse.check_exec_safety(command) {
            crate::agent::preparse::SafetyCheck::Allow => {}
            crate::agent::preparse::SafetyCheck::Deny(reason) => {
                bail!("[blocked] {reason}");
            }
            crate::agent::preparse::SafetyCheck::Confirm(reason) => {
                bail!("[needs confirmation] {reason}. Command: {command}");
            }
        }
    }
    let (shell, shell_flag) = if cfg!(target_os = "windows") {
        ("cmd", "/C")
    } else {
        ("sh", "-c")
    };
    let workspace = self
        .handle
        .config
        .workspace
        .as_deref()
        .or(self.config.agents.defaults.workspace.as_deref())
        .map(expand_tilde)
        .unwrap_or_else(|| crate::config::loader::base_dir().join("workspace"));
    if safety_enabled {
        let cmd_tokens: Vec<&str> = command.split_whitespace().collect();
        // Interpreters can smuggle arbitrary code through a script file, so
        // the script passed as the first argument is content-checked too.
        const INTERPRETERS: &[&str] = &[
            "bash",
            "sh",
            "zsh",
            "fish",
            "dash",
            "csh",
            "tcsh",
            "python",
            "python3",
            "python2",
            "ruby",
            "perl",
            "node",
            "bun",
            "deno",
            "powershell",
            "pwsh",
        ];
        if let Some(first) = cmd_tokens.first() {
            if INTERPRETERS
                .iter()
                .any(|i| first.ends_with(i) || *first == *i)
            {
                if let Some(file_arg) = cmd_tokens.get(1) {
                    let file_path = std::path::Path::new(file_arg);
                    let resolved = if file_path.is_absolute() {
                        file_path.to_path_buf()
                    } else {
                        workspace.join(file_path)
                    };
                    check_file_content_safety(&resolved)?;
                }
            }
        }
        let ws_canon = if workspace.exists() {
            std::fs::canonicalize(&workspace).unwrap_or_else(|_| workspace.clone())
        } else {
            workspace.clone()
        };
        // Reject absolute or `..` path tokens that resolve outside the workspace.
        for token in command.split_whitespace() {
            if token.starts_with('/') || token.contains("..") {
                let resolved = if token.starts_with('/') {
                    std::path::PathBuf::from(token)
                } else {
                    workspace.join(token)
                };
                let canon = if resolved.exists() {
                    std::fs::canonicalize(&resolved).unwrap_or_else(|_| resolved.clone())
                } else {
                    resolved.clone()
                };
                if !canon.starts_with(&ws_canon) {
                    bail!("[sandbox] access denied: path `{token}` is outside workspace");
                }
            }
        }
    }
    tracing::info!(cwd = %workspace.display(), command = %command, "exec: executing");
    let output = tokio::process::Command::new(shell)
        .args(&[shell_flag, command])
        .current_dir(&workspace)
        .output()
        .await
        .map_err(|e| anyhow!("exec `{command}`: {e}"))?;
    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);
    tracing::info!(cwd = %workspace.display(), command = %command, exit_code = ?output.status.code(), stdout_len = stdout.len(), stderr_len = stderr.len(), "exec: done");
    Ok(json!({
        "exit_code": output.status.code(),
        "stdout": stdout,
        "stderr": stderr,
    }))
}
/// Spawn a sub-agent with its own workspace and a SOUL.md system prompt.
///
/// Requires `id`, `model`, and `system` arguments; fails if no spawner was
/// wired in. Workspace creation and SOUL.md writing are best-effort.
async fn tool_agent_spawn(&self, args: Value) -> Result<Value> {
    let spawner = self
        .spawner
        .as_ref()
        .ok_or_else(|| anyhow!("agent_spawn: spawner not available"))?;
    let id = args["id"]
        .as_str()
        .ok_or_else(|| anyhow!("agent_spawn: `id` required"))?
        .to_owned();
    let model = args["model"]
        .as_str()
        .ok_or_else(|| anyhow!("agent_spawn: `model` required"))?
        .to_owned();
    let system = args["system"]
        .as_str()
        .ok_or_else(|| anyhow!("agent_spawn: `system` required"))?
        .to_owned();
    use crate::config::schema::{AgentEntry, ModelConfig};
    // Minimal entry: only id, workspace, and primary model are populated.
    let entry = AgentEntry {
        id: id.clone(),
        default: Some(false),
        workspace: Some(crate::config::loader::path_to_forward_slash(
            &dirs_next::home_dir()
                .unwrap_or_default()
                .join(format!(".rsclaw/workspace/{id}")),
        )),
        model: Some(ModelConfig {
            primary: Some(model),
            fallbacks: None,
            image_fallbacks: None,
            thinking: None,
            tools_enabled: None,
            toolset: None,
            tools: None,
            context_tokens: None,
            max_tokens: None,
        }),
        lane: None,
        lane_concurrency: None,
        group_chat: None,
        channels: None,
        name: None,
        agent_dir: None,
        system: None,
        commands: None,
        allowed_commands: None,
        opencode: None,
    };
    spawner.spawn_agent(entry)?;
    // Seed the workspace and the system prompt file; errors are ignored.
    let ws_path = dirs_next::home_dir()
        .unwrap_or_default()
        .join(format!(".rsclaw/workspace/{id}"));
    let _ = tokio::fs::create_dir_all(&ws_path).await;
    let soul_path = ws_path.join("SOUL.md");
    let _ = tokio::fs::write(&soul_path, format!("# Agent: {id}\n\n{system}\n")).await;
    Ok(json!({
        "spawned": id,
        "model": args["model"],
        "status": "ready"
    }))
}
/// List every registered agent with its primary model name.
///
/// An absent registry yields an empty list rather than an error.
async fn tool_agent_list(&self) -> Result<Value> {
    let mut agents: Vec<Value> = Vec::new();
    if let Some(reg) = &self.agents {
        for h in reg.all().iter() {
            let model = h
                .config
                .model
                .as_ref()
                .and_then(|m| m.primary.as_deref())
                .unwrap_or("unknown");
            agents.push(json!({
                "id": h.id,
                "model": model,
            }));
        }
    }
    Ok(json!({"agents": agents}))
}
/// Perform a web search through one of several providers.
///
/// Provider choice: explicit `provider` argument first, then the configured
/// default, then whichever API key is available (brave → google → bing),
/// finally keyless Bing HTML scraping ("bing-free"). API keys resolve from
/// config secrets first, environment variables second. A DuckDuckGo query
/// that returns zero results retries via bing-free.
async fn tool_web_search(&self, args: Value) -> Result<Value> {
    let query = args["query"]
        .as_str()
        .ok_or_else(|| anyhow!("web_search: `query` required"))?;
    let ws_cfg = self
        .config
        .ext
        .tools
        .as_ref()
        .and_then(|t| t.web_search.as_ref());
    let limit = args["limit"]
        .as_u64()
        .unwrap_or_else(|| ws_cfg.and_then(|c| c.max_results).unwrap_or(5) as u64)
        as usize;
    // Normalise "auto"-style provider hints to "pick for me".
    let provider_raw = args["provider"].as_str().unwrap_or("");
    let provider = match provider_raw {
        "auto-detect" | "auto" | "default" | "none" => "",
        other => other,
    };
    // Config secret wins; env var is the fallback; empty strings count as unset.
    let resolve_key = |cfg_key: Option<&crate::config::schema::SecretOrString>,
                       env_name: &str|
     -> Option<String> {
        cfg_key
            .and_then(|k| k.resolve_early())
            .or_else(|| std::env::var(env_name).ok())
            .filter(|k| !k.is_empty())
    };
    let brave_key = resolve_key(
        ws_cfg.and_then(|c| c.brave_api_key.as_ref()),
        "BRAVE_API_KEY",
    );
    let google_key = resolve_key(
        ws_cfg.and_then(|c| c.google_api_key.as_ref()),
        "GOOGLE_SEARCH_API_KEY",
    );
    let google_cx = ws_cfg
        .and_then(|c| c.google_cx.clone())
        .or_else(|| std::env::var("GOOGLE_SEARCH_CX").ok());
    let bing_key = resolve_key(ws_cfg.and_then(|c| c.bing_api_key.as_ref()), "BING_API_KEY");
    let chosen = if !provider.is_empty() {
        provider.to_owned()
    } else if let Some(default) = ws_cfg.and_then(|c| c.provider.as_deref()) {
        default.to_owned()
    } else if brave_key.is_some() {
        "brave".to_owned()
    } else if google_key.is_some() && google_cx.is_some() {
        "google".to_owned()
    } else if bing_key.is_some() {
        "bing".to_owned()
    } else {
        "bing-free".to_owned()
    };
    let client = reqwest::Client::builder()
        .user_agent("Mozilla/5.0 (compatible; rsclaw/1.0)")
        .timeout(Duration::from_secs(15))
        .build()?;
    let results: Vec<Value> = match chosen.as_str() {
        "duckduckgo" => {
            // `search_engine_url` allows overriding the endpoint; empty means default.
            let base = search_engine_url("duckduckgo");
            let url = format!(
                "{}?q={}",
                if base.is_empty() {
                    "https://html.duckduckgo.com/html/"
                } else {
                    base
                },
                urlencoding::encode(query)
            );
            let html = client.get(&url).send().await?.text().await?;
            parse_ddg_results(&html, limit)
        }
        "google" => {
            let (key, cx) = match (google_key, google_cx) {
                (Some(k), Some(c)) => (k, c),
                _ => {
                    // Incomplete credentials: degrade to DuckDuckGo scraping.
                    tracing::warn!(
                        "web_search: google credentials incomplete, falling back to DuckDuckGo"
                    );
                    let url = format!(
                        "{}?q={}",
                        {
                            let b = search_engine_url("duckduckgo");
                            if b.is_empty() {
                                "https://html.duckduckgo.com/html/"
                            } else {
                                b
                            }
                        },
                        urlencoding::encode(query)
                    );
                    let html = client.get(&url).send().await?.text().await?;
                    return Ok(
                        json!({"results": parse_ddg_results(&html, limit), "provider": "duckduckgo (fallback)"}),
                    );
                }
            };
            let base = search_engine_url("google");
            let resp: Value = client
                .get(if base.is_empty() {
                    "https://www.googleapis.com/customsearch/v1"
                } else {
                    base
                })
                .query(&[
                    ("key", key.as_str()),
                    ("cx", cx.as_str()),
                    ("q", query),
                    // The Custom Search API caps `num` at 10.
                    ("num", &limit.min(10).to_string()),
                ])
                .send()
                .await?
                .json()
                .await?;
            resp["items"]
                .as_array()
                .map(|arr| {
                    arr.iter()
                        .take(limit)
                        .map(|item| {
                            json!({
                                "title": item["title"].as_str().unwrap_or(""),
                                "url": item["link"].as_str().unwrap_or(""),
                                "snippet": item["snippet"].as_str().unwrap_or("")
                            })
                        })
                        .collect()
                })
                .unwrap_or_default()
        }
        "bing" => {
            let key = bing_key.ok_or_else(|| anyhow!("web_search: bing API key not set (config tools.webSearch.bingApiKey or env BING_API_KEY)"))?;
            let base = search_engine_url("bing");
            let resp: Value = client
                .get(if base.is_empty() {
                    "https://api.bing.microsoft.com/v7.0/search"
                } else {
                    base
                })
                .query(&[("q", query), ("count", &limit.to_string())])
                .header("Ocp-Apim-Subscription-Key", &key)
                .send()
                .await?
                .json()
                .await?;
            resp["webPages"]["value"]
                .as_array()
                .map(|arr| {
                    arr.iter()
                        .take(limit)
                        .map(|item| {
                            json!({
                                "title": item["name"].as_str().unwrap_or(""),
                                "url": item["url"].as_str().unwrap_or(""),
                                "snippet": item["snippet"].as_str().unwrap_or("")
                            })
                        })
                        .collect()
                })
                .unwrap_or_default()
        }
        "brave" => {
            let key = brave_key.ok_or_else(|| anyhow!("web_search: brave API key not set (config tools.webSearch.braveApiKey or env BRAVE_API_KEY)"))?;
            let base = search_engine_url("brave");
            let resp: Value = client
                .get(if base.is_empty() {
                    "https://api.search.brave.com/res/v1/web/search"
                } else {
                    base
                })
                .query(&[("q", query), ("count", &limit.to_string())])
                .header("X-Subscription-Token", &key)
                .send()
                .await?
                .json()
                .await?;
            resp["web"]["results"]
                .as_array()
                .map(|arr| {
                    arr.iter()
                        .take(limit)
                        .map(|item| {
                            json!({
                                "title": item["title"].as_str().unwrap_or(""),
                                "url": item["url"].as_str().unwrap_or(""),
                                "snippet": item["description"].as_str().unwrap_or("")
                            })
                        })
                        .collect()
                })
                .unwrap_or_default()
        }
        "bing-free" => {
            // Keyless HTML scrape of bing.com, localized via the gateway language.
            let lang = self
                .config
                .raw
                .gateway
                .as_ref()
                .and_then(|g| g.language.as_deref())
                .unwrap_or("");
            let mkt = lang_to_bing_mkt(lang);
            let mkt_param = if mkt.is_empty() {
                String::new()
            } else {
                format!("&mkt={mkt}&setlang={}", &mkt[..2])
            };
            let url = format!(
                "https://www.bing.com/search?q={}&count={limit}{mkt_param}",
                urlencoding::encode(query)
            );
            let html = client
                .get(&url)
                .header(
                    "User-Agent",
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
                )
                .send()
                .await?
                .text()
                .await?;
            parse_bing_html_results(&html, limit)
        }
        "baidu" => {
            let url = format!(
                "https://www.baidu.com/s?wd={}&rn={limit}",
                urlencoding::encode(query)
            );
            let html = client
                .get(&url)
                .header(
                    "User-Agent",
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
                )
                .send()
                .await?
                .text()
                .await?;
            parse_baidu_results(&html, limit)
        }
        "sogou" => {
            let url = format!(
                "https://www.sogou.com/web?query={}&num={limit}",
                urlencoding::encode(query)
            );
            let html = client
                .get(&url)
                .header(
                    "User-Agent",
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
                )
                .send()
                .await?
                .text()
                .await?;
            parse_sogou_results(&html, limit)
        }
        "360" => {
            let url = format!("https://www.so.com/s?q={}", urlencoding::encode(query));
            let html = client
                .get(&url)
                .header(
                    "User-Agent",
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
                )
                .send()
                .await?
                .text()
                .await?;
            parse_360_results(&html, limit)
        }
        other => return Err(anyhow!("web_search: unknown provider `{other}`")),
    };
    // DuckDuckGo sometimes rate-limits to an empty page; retry via bing-free.
    if results.is_empty() && chosen == "duckduckgo" {
        tracing::warn!("web_search: DuckDuckGo returned 0 results, falling back to bing-free");
        let lang = self
            .config
            .raw
            .gateway
            .as_ref()
            .and_then(|g| g.language.as_deref())
            .unwrap_or("");
        let mkt = lang_to_bing_mkt(lang);
        let mkt_param = if mkt.is_empty() {
            String::new()
        } else {
            format!("&mkt={mkt}&setlang={}", &mkt[..2])
        };
        let url = format!(
            "https://www.bing.com/search?q={}&count={limit}{mkt_param}",
            urlencoding::encode(query)
        );
        let html = client
            .get(&url)
            .header(
                "User-Agent",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
            )
            .send()
            .await?
            .text()
            .await?;
        let fallback = parse_bing_html_results(&html, limit);
        if !fallback.is_empty() {
            return Ok(json!({ "results": fallback, "provider": "bing-free (fallback)" }));
        }
    }
    Ok(json!({ "results": results }))
}
/// Fetch a URL and return its title plus plain-text content, capped at
/// 50 000 characters.
///
/// Uses a browser-like User-Agent and follows up to 10 redirects so that
/// bot-hostile sites still serve HTML. `length` reports the byte length of
/// the returned `text`.
async fn tool_web_fetch(&self, args: Value) -> Result<Value> {
    const MAX_CHARS: usize = 50_000;
    let url = args["url"]
        .as_str()
        .ok_or_else(|| anyhow!("web_fetch: `url` required"))?;
    let client = reqwest::Client::builder()
        .user_agent(
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) \
             AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        )
        .timeout(Duration::from_secs(30))
        .redirect(reqwest::redirect::Policy::limited(10))
        .build()?;
    let html = client.get(url).send().await?.text().await?;
    let title = extract_html_title(&html);
    let text = strip_html(&html);
    // Truncate on a char boundary in a single pass. `nth(MAX_CHARS)` is Some
    // exactly when there are more than MAX_CHARS chars; when the text is
    // short enough we move it instead of cloning (the original cloned here
    // and walked the string twice via chars().count() + chars().take()).
    let truncated = match text.char_indices().nth(MAX_CHARS) {
        Some((byte_idx, _)) => text[..byte_idx].to_owned(),
        None => text,
    };
    Ok(json!({
        "url": url,
        "title": title,
        "text": truncated,
        "length": truncated.len()
    }))
}
/// Execute a browser action against the shared headless-Chrome session.
///
/// The session is created lazily on first use (config chrome_path first,
/// auto-detection second) and reused for all subsequent calls.
async fn tool_web_browser(&self, args: Value) -> Result<Value> {
    let action = args
        .get("action")
        .and_then(|v| v.as_str())
        .ok_or_else(|| anyhow!("web_browser: `action` required"))?;
    // One browser session per agent; the init closure runs at most once.
    let session = self
        .browser
        .get_or_try_init(|| async {
            let chrome_path = self.config.ext.tools.as_ref()
                .and_then(|t| t.web_browser.as_ref())
                .and_then(|b| b.chrome_path.clone())
                .or_else(|| detect_chrome())
                .ok_or_else(|| anyhow!(
                    "Chrome/Chromium not found. Install Chrome or set browser.chrome_path in config."
                ))?;
            crate::browser::can_launch_chrome()?;
            let bs = crate::browser::BrowserSession::start(&chrome_path).await?;
            Ok::<_, anyhow::Error>(Mutex::new(bs))
        })
        .await?;
    let mut browser = session.lock().await;
    browser.execute(action, &args).await
}
/// Desktop automation: screenshot, mouse, and keyboard actions via platform CLIs.
///
/// macOS uses `screencapture`/`cliclick`/`osascript`; Windows uses a
/// PowerShell screenshot script; Linux tries `scrot` then ImageMagick
/// `import`, with `xdotool` for input. Screenshots come back as a base64
/// PNG data URL.
async fn tool_computer_use(&self, args: Value) -> Result<Value> {
    let action = args["action"]
        .as_str()
        .ok_or_else(|| anyhow!("computer_use: `action` required"))?;
    let is_macos = cfg!(target_os = "macos");
    let is_windows = cfg!(target_os = "windows");
    match action {
        "screenshot" => {
            let tmp_path = std::env::temp_dir().join("rsclaw_screen.png");
            let tmp_path_str = tmp_path.to_string_lossy().to_string();
            let output = if is_macos {
                // -x: silent capture (no camera sound).
                tokio::process::Command::new("screencapture")
                    .args(["-x", &tmp_path_str])
                    .output()
                    .await
            } else if is_windows {
                // Capture the primary screen through .NET drawing APIs.
                let script = format!(
                    r#"
Add-Type -AssemblyName System.Windows.Forms
Add-Type -AssemblyName System.Drawing
$screen = [System.Windows.Forms.Screen]::PrimaryScreen
$bounds = $screen.Bounds
$bitmap = New-Object System.Drawing.Bitmap($bounds.Width, $bounds.Height)
$graphics = [System.Drawing.Graphics]::FromImage($bitmap)
$graphics.CopyFromScreen($bounds.Location, [System.Drawing.Point]::Empty, $bounds.Size)
$bitmap.Save('{}')
$graphics.Dispose()
$bitmap.Dispose()
"#,
                    tmp_path_str
                );
                tokio::process::Command::new("powershell")
                    .args(["-NoProfile", "-Command", &script])
                    .output()
                    .await
            } else {
                // Linux: try scrot, then ImageMagick `import` as fallback.
                let res = tokio::process::Command::new("scrot")
                    .arg(&tmp_path_str)
                    .output()
                    .await;
                if res.is_err() || !res.as_ref().unwrap().status.success() {
                    tokio::process::Command::new("import")
                        .args(["-window", "root", &tmp_path_str])
                        .output()
                        .await
                } else {
                    res
                }
            }
            .map_err(|e| anyhow!("computer_use screenshot: {e}"))?;
            if !output.status.success() {
                let stderr = String::from_utf8_lossy(&output.stderr);
                return Err(anyhow!("computer_use screenshot failed: {stderr}"));
            }
            let bytes = tokio::fs::read(&tmp_path)
                .await
                .map_err(|e| anyhow!("computer_use: failed to read screenshot: {e}"))?;
            use base64::Engine;
            let b64 = base64::engine::general_purpose::STANDARD.encode(&bytes);
            let _ = tokio::fs::remove_file(&tmp_path).await;
            // Read width/height straight from the PNG IHDR chunk
            // (big-endian u32s at byte offsets 16 and 20).
            let (width, height) = if bytes.len() >= 24 {
                let w = u32::from_be_bytes([bytes[16], bytes[17], bytes[18], bytes[19]]);
                let h = u32::from_be_bytes([bytes[20], bytes[21], bytes[22], bytes[23]]);
                (w, h)
            } else {
                (0, 0)
            };
            Ok(json!({
                "action": "screenshot",
                "image": format!("data:image/png;base64,{b64}"),
                "width": width,
                "height": height
            }))
        }
        "mouse_move" => {
            let x = args["x"].as_f64().unwrap_or(0.0) as i64;
            let y = args["y"].as_f64().unwrap_or(0.0) as i64;
            if is_macos {
                run_subprocess("cliclick", &[&format!("m:{x},{y}")]).await?;
            } else {
                run_subprocess("xdotool", &["mousemove", &x.to_string(), &y.to_string()])
                    .await?;
            }
            Ok(json!({"action": "mouse_move", "ok": true}))
        }
        "mouse_click" => {
            let x = args["x"].as_f64().unwrap_or(0.0) as i64;
            let y = args["y"].as_f64().unwrap_or(0.0) as i64;
            if is_macos {
                run_subprocess("cliclick", &[&format!("c:{x},{y}")]).await?;
            } else {
                // xdotool needs a move then a separate left-click (button 1).
                run_subprocess("xdotool", &["mousemove", &x.to_string(), &y.to_string()])
                    .await?;
                run_subprocess("xdotool", &["click", "1"]).await?;
            }
            Ok(json!({"action": "mouse_click", "ok": true}))
        }
        "type" => {
            let text = args["text"]
                .as_str()
                .ok_or_else(|| anyhow!("computer_use type: `text` required"))?;
            if is_macos {
                // Escape for embedding inside an AppleScript string literal.
                let escaped = text.replace('\\', "\\\\").replace('"', "\\\"");
                run_subprocess(
                    "osascript",
                    &[
                        "-e",
                        &format!(
                            "tell application \"System Events\" to keystroke \"{escaped}\""
                        ),
                    ],
                )
                .await?;
            } else {
                run_subprocess("xdotool", &["type", "--clearmodifiers", text]).await?;
            }
            Ok(json!({"action": "type", "ok": true}))
        }
        "key" => {
            let key = args["key"]
                .as_str()
                .ok_or_else(|| anyhow!("computer_use key: `key` required"))?;
            if is_macos {
                run_subprocess("cliclick", &[&format!("kp:{key}")]).await?;
            } else {
                run_subprocess("xdotool", &["key", key]).await?;
            }
            Ok(json!({"action": "key", "ok": true}))
        }
        other => Err(anyhow!(
            "computer_use: unsupported action `{other}` \
            (supported: screenshot, mouse_move, mouse_click, type, key)"
        )),
    }
}
/// Generate an image with OpenAI's DALL-E 3 and return its URL.
///
/// Requires the `OPENAI_API_KEY` environment variable; `size` defaults to
/// "1024x1024". API errors surface the provider's error message.
async fn tool_image(&self, args: Value) -> Result<Value> {
    let prompt = args["prompt"]
        .as_str()
        .ok_or_else(|| anyhow!("image: `prompt` required"))?;
    let size = args["size"].as_str().unwrap_or("1024x1024");
    let api_key = std::env::var("OPENAI_API_KEY")
        .map_err(|_| anyhow!("image: OPENAI_API_KEY not set — cannot generate images"))?;
    let payload = json!({
        "model": "dall-e-3",
        "prompt": prompt,
        "size": size,
        "n": 1,
        "response_format": "url"
    });
    let resp = reqwest::Client::new()
        .post("https://api.openai.com/v1/images/generations")
        .header("Authorization", format!("Bearer {api_key}"))
        .json(&payload)
        .send()
        .await
        .map_err(|e| anyhow!("image: request failed: {e}"))?;
    // Read the status before consuming the body so both are available.
    let status = resp.status();
    let body: Value = resp
        .json()
        .await
        .map_err(|e| anyhow!("image: failed to parse response: {e}"))?;
    if !status.is_success() {
        let err_msg = body["error"]["message"].as_str().unwrap_or("unknown error");
        return Err(anyhow!("image: API error: {err_msg}"));
    }
    let first = &body["data"][0];
    let url = first["url"].as_str().unwrap_or("").to_owned();
    let revised = first["revised_prompt"]
        .as_str()
        .unwrap_or("")
        .to_owned();
    Ok(json!({
        "url": url,
        "revised_prompt": revised,
        "size": size
    }))
}
/// Extract text from a PDF, given either a local path or an http(s) URL.
///
/// URLs are downloaded to a temp file first. Extraction uses `pdf_extract`
/// with a `pdftotext` CLI fallback; output is truncated at 100 000 chars on
/// a char boundary.
async fn tool_pdf(&self, args: Value) -> Result<Value> {
    let path = args["path"]
        .as_str()
        .ok_or_else(|| anyhow!("pdf: `path` required"))?;
    let local_path = if path.starts_with("http://") || path.starts_with("https://") {
        // Remote PDF: download into a fixed temp file.
        let tmp = std::env::temp_dir().join("rsclaw_pdf_download.pdf");
        let client = reqwest::Client::new();
        let bytes = client
            .get(path)
            .send()
            .await
            .map_err(|e| anyhow!("pdf: download failed: {e}"))?
            .bytes()
            .await
            .map_err(|e| anyhow!("pdf: download read failed: {e}"))?;
        tokio::fs::write(&tmp, &bytes)
            .await
            .map_err(|e| anyhow!("pdf: write temp file failed: {e}"))?;
        tmp
    } else {
        std::path::PathBuf::from(path)
    };
    let pdf_bytes = tokio::fs::read(&local_path)
        .await
        .map_err(|e| anyhow!("pdf: read failed: {e}"))?;
    let text = match pdf_extract::extract_text_from_mem(&pdf_bytes) {
        Ok(t) => t,
        Err(e) => {
            // Library extraction failed; fall back to the pdftotext binary.
            tracing::warn!("pdf-extract failed ({e}), trying pdftotext CLI");
            let output = tokio::process::Command::new("pdftotext")
                .args([local_path.to_str().unwrap_or(""), "-"])
                .output()
                .await
                .map_err(|e2| anyhow!("pdf: extraction failed: {e}, pdftotext: {e2}"))?;
            if !output.status.success() {
                let stderr = String::from_utf8_lossy(&output.stderr);
                return Err(anyhow!("pdf: extraction failed: {e}, pdftotext: {stderr}"));
            }
            String::from_utf8_lossy(&output.stdout).into_owned()
        }
    };
    let truncated = if text.len() > 100_000 {
        // Advance to the next char boundary so the slice never splits a char.
        let mut end = 100_000usize;
        while end < text.len() && !text.is_char_boundary(end) {
            end += 1;
        }
        format!("{}...\n[truncated at 100000 chars]", &text[..end])
    } else {
        text
    };
    Ok(json!({
        "path": path,
        "text": truncated,
        "chars": truncated.len()
    }))
}
/// Synthesize speech from text into a temp audio file.
///
/// Backend per platform: macOS `say` (AIFF), Windows PowerShell /
/// System.Speech (WAV), otherwise `espeak` with a `pico2wave` fallback.
/// Returns `{audio_file, voice, chars}`.
async fn tool_tts(&self, args: Value) -> Result<Value> {
    let text = args["text"]
        .as_str()
        .ok_or_else(|| anyhow!("tts: `text` required"))?;
    let voice = args["voice"].as_str().unwrap_or("default");
    let out_path = std::env::temp_dir().join(format!(
        "rsclaw_tts_{}{}",
        chrono::Utc::now().timestamp_millis(),
        if cfg!(target_os = "windows") {
            ".wav"
        } else {
            ".aiff"
        }
    ));
    let out_path_str = out_path.to_string_lossy().to_string();
    let is_macos = cfg!(target_os = "macos");
    let is_windows = cfg!(target_os = "windows");
    if is_macos {
        let mut cmd = tokio::process::Command::new("say");
        if voice != "default" {
            cmd.args(["-v", voice]);
        }
        cmd.args(["-o", &out_path_str, text]);
        let output = cmd
            .output()
            .await
            .map_err(|e| anyhow!("tts: `say` command failed: {e}"))?;
        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(anyhow!("tts: say failed: {stderr}"));
        }
    } else if is_windows {
        // Escape single quotes for PowerShell single-quoted strings
        // ('' is a literal ') — otherwise a quote in `text` breaks the
        // script and allows arbitrary command injection.
        let ps_text = text.replace('\'', "''");
        let ps_path = out_path_str.replace('\'', "''");
        let script = format!(
            r#"
Add-Type -AssemblyName System.Speech
$synth = New-Object System.Speech.Synthesis.SpeechSynthesizer
$synth.SetOutputToWaveFile('{}')
$synth.Speak('{}')
"#,
            ps_path, ps_text
        );
        let output = tokio::process::Command::new("powershell")
            .args(["-NoProfile", "-Command", &script])
            .output()
            .await
            .map_err(|e| anyhow!("tts: PowerShell SAPI failed: {e}"))?;
        if !output.status.success() {
            let stderr = String::from_utf8_lossy(&output.stderr);
            return Err(anyhow!("tts: SAPI failed: {stderr}"));
        }
    } else {
        // Linux/other: try espeak first; fall back to pico2wave.
        let espeak_result = tokio::process::Command::new("espeak")
            .args(["-w", &out_path_str, text])
            .output()
            .await;
        match espeak_result {
            Ok(o) if o.status.success() => {}
            _ => {
                let output = tokio::process::Command::new("pico2wave")
                    .args(["-w", &out_path_str, "--", text])
                    .output()
                    .await
                    .map_err(|e| anyhow!("tts: neither espeak nor pico2wave available: {e}"))?;
                if !output.status.success() {
                    let stderr = String::from_utf8_lossy(&output.stderr);
                    return Err(anyhow!("tts: pico2wave failed: {stderr}"));
                }
            }
        }
    }
    Ok(json!({
        "audio_file": out_path_str,
        "voice": voice,
        "chars": text.len()
    }))
}
/// Relay a text message through the local gateway's send endpoint.
///
/// Args (JSON): `target` and `text` are required; `channel` defaults to
/// "default". Returns the gateway's response body on success.
async fn tool_message(&self, args: Value) -> Result<Value> {
    let target = args["target"]
        .as_str()
        .ok_or_else(|| anyhow!("message: `target` required"))?;
    let text = args["text"]
        .as_str()
        .ok_or_else(|| anyhow!("message: `text` required"))?;
    let channel = args["channel"].as_str().unwrap_or("default");
    let url = format!(
        "http://127.0.0.1:{}/api/v1/message/send",
        self.config.gateway.port
    );
    let payload = json!({
        "channel": channel,
        "target": target,
        "text": text
    });
    let outcome = reqwest::Client::new().post(url).json(&payload).send().await;
    let r = match outcome {
        Ok(r) => r,
        Err(e) => return Err(anyhow!("message: failed to reach gateway: {e}")),
    };
    if r.status().is_success() {
        let body: Value = r.json().await.unwrap_or(json!({"ok": true}));
        Ok(json!({
            "sent": true,
            "channel": channel,
            "target": target,
            "response": body
        }))
    } else {
        let status = r.status();
        let body = r.text().await.unwrap_or_default();
        Err(anyhow!("message: gateway returned {status}: {body}"))
    }
}
/// Manage scheduled cron jobs stored in `<base>/cron/jobs.json`.
///
/// Actions: `list`; `add` (requires `schedule` + `message`, optional
/// `name`, `tz`, `agent_id`/`agentId`); `remove` (requires `id`).
async fn tool_cron(&self, args: Value) -> Result<Value> {
    let action = args["action"]
        .as_str()
        .ok_or_else(|| anyhow!("cron: `action` required"))?;
    let cron_path = crate::config::loader::base_dir()
        .join("cron")
        .join("jobs.json");
    match action {
        "list" => Ok(json!({"jobs": read_cron_jobs(&cron_path).await})),
        "add" => {
            let schedule = args["schedule"]
                .as_str()
                .ok_or_else(|| anyhow!("cron add: `schedule` required"))?;
            let message = args["message"]
                .as_str()
                .ok_or_else(|| anyhow!("cron add: `message` required"))?;
            // Accept both snake_case and camelCase spellings.
            let agent = args["agent_id"]
                .as_str()
                .or_else(|| args["agentId"].as_str())
                .unwrap_or("main");
            let id = Uuid::new_v4().to_string();
            let now_ms = Utc::now().timestamp_millis() as u64;
            let mut job = json!({
                "id": id,
                "agentId": agent,
                "enabled": true,
                "createdAtMs": now_ms,
                "updatedAtMs": now_ms,
            });
            // Include the timezone only when the caller supplied one.
            job["schedule"] = match args["tz"].as_str() {
                Some(tz) => json!({"kind": "cron", "expr": schedule, "tz": tz}),
                None => json!({"kind": "cron", "expr": schedule}),
            };
            job["payload"] = json!({"kind": "systemEvent", "text": message});
            if let Some(n) = args["name"].as_str() {
                job["name"] = json!(n);
            }
            let mut jobs = read_cron_jobs(&cron_path).await;
            jobs.push(job);
            write_cron_jobs(&cron_path, &jobs).await?;
            Ok(json!({"added": id, "schedule": schedule, "message": message}))
        }
        "remove" => {
            let id = args["id"]
                .as_str()
                .ok_or_else(|| anyhow!("cron remove: `id` required"))?;
            let mut jobs = read_cron_jobs(&cron_path).await;
            let before = jobs.len();
            jobs.retain(|j| j["id"].as_str() != Some(id));
            write_cron_jobs(&cron_path, &jobs).await?;
            Ok(json!({"removed": before - jobs.len(), "id": id}))
        }
        other => Err(anyhow!(
            "cron: unsupported action `{other}` (list, add, remove)"
        )),
    }
}
}
async fn read_cron_jobs(path: &std::path::Path) -> Vec<Value> {
let data = tokio::fs::read_to_string(path)
.await
.unwrap_or_else(|_| "[]".to_owned());
if let Ok(wrapper) = serde_json::from_str::<Value>(&data) {
if let Some(jobs) = wrapper.get("jobs").and_then(|v| v.as_array()) {
return jobs.clone();
}
if let Some(arr) = wrapper.as_array() {
return arr.clone();
}
}
Vec::new()
}
/// Persist the cron job list to `path` as `{"version": 1, "jobs": [...]}`,
/// creating the parent directory first (best-effort).
async fn write_cron_jobs(path: &std::path::Path, jobs: &[Value]) -> Result<()> {
    if let Some(dir) = path.parent() {
        // Best-effort: a failure here will surface in the write below.
        let _ = tokio::fs::create_dir_all(dir).await;
    }
    let body = serde_json::to_string_pretty(&json!({"version": 1, "jobs": jobs}))?;
    tokio::fs::write(path, body)
        .await
        .map_err(|e| anyhow!("cron: failed to write jobs: {e}"))
}
impl AgentRuntime {
async fn tool_sessions_send(&self, ctx: &RunContext, args: Value) -> Result<Value> {
let message = args["message"]
.as_str()
.ok_or_else(|| anyhow!("sessions_send: `message` required"))?
.to_owned();
let agent_id = args["agentId"]
.as_str()
.or_else(|| args["agent_id"].as_str());
let session_key = args["sessionKey"]
.as_str()
.or_else(|| args["session_key"].as_str());
let registry = self
.agents
.as_ref()
.ok_or_else(|| anyhow!("sessions_send: agent registry not available"))?;
let target_id = agent_id.unwrap_or(&ctx.agent_id);
let target = registry
.get(target_id)
.map_err(|_| anyhow!("sessions_send: agent `{target_id}` not found"))?;
let child_session = session_key
.map(|s| s.to_owned())
.unwrap_or_else(|| format!("{}:send:{}", ctx.session_key, Uuid::new_v4()));
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel::<AgentReply>();
let msg = AgentMessage {
session_key: child_session.clone(),
text: message,
channel: format!("sessions_send:{}", ctx.agent_id),
peer_id: ctx.agent_id.clone(),
chat_id: String::new(),
reply_tx,
extra_tools: vec![],
images: vec![],
files: vec![],
};
target
.tx
.send(msg)
.await
.map_err(|_| anyhow!("sessions_send: agent `{target_id}` inbox closed"))?;
let timeout_secs = self
.config
.agents
.defaults
.timeout_seconds
.unwrap_or(DEFAULT_TIMEOUT_SECONDS as u32) as u64;
let reply = tokio::time::timeout(Duration::from_secs(timeout_secs), reply_rx)
.await
.map_err(|_| anyhow!("sessions_send: timed out after {timeout_secs}s"))?
.map_err(|_| anyhow!("sessions_send: reply channel dropped"))?;
Ok(json!({
"session_key": child_session,
"agent_id": target_id,
"reply": reply.text
}))
}
async fn tool_sessions_list(&self) -> Result<Value> {
let sessions = self.store.db.list_sessions()?;
let list: Vec<Value> = sessions
.iter()
.filter_map(|key| {
let meta = self.store.db.get_session_meta(key).ok().flatten();
Some(json!({
"session_key": key,
"message_count": meta.as_ref().map(|m| m.message_count).unwrap_or(0),
"last_active": meta.as_ref().map(|m| m.last_active).unwrap_or(0),
"created_at": meta.as_ref().map(|m| m.created_at).unwrap_or(0),
}))
})
.collect();
Ok(json!({"sessions": list, "count": list.len()}))
}
async fn tool_sessions_history(&self, args: Value) -> Result<Value> {
let session_key = args["sessionKey"]
.as_str()
.or_else(|| args["session_key"].as_str())
.ok_or_else(|| anyhow!("sessions_history: `sessionKey` required"))?;
let limit = args["limit"].as_u64().unwrap_or(50) as usize;
let messages = self.store.db.load_messages(session_key)?;
let total = messages.len();
let truncated: Vec<&Value> = messages.iter().rev().take(limit).collect();
Ok(json!({
"session_key": session_key,
"messages": truncated,
"total": total,
"returned": truncated.len()
}))
}
async fn tool_session_status(&self, ctx: &RunContext, args: Value) -> Result<Value> {
let session_key = args["sessionKey"]
.as_str()
.or_else(|| args["session_key"].as_str())
.unwrap_or(&ctx.session_key);
let meta = self.store.db.get_session_meta(session_key)?;
match meta {
Some(m) => Ok(json!({
"session_key": session_key,
"message_count": m.message_count,
"last_active": m.last_active,
"created_at": m.created_at,
"active": true
})),
None => Ok(json!({
"session_key": session_key,
"active": false,
"note": "session not found or no metadata"
})),
}
}
async fn tool_gateway(&self, args: Value) -> Result<Value> {
let action = args["action"]
.as_str()
.ok_or_else(|| anyhow!("gateway: `action` required"))?;
let port = self.config.gateway.port;
let version = env!("CARGO_PKG_VERSION");
match action {
"status" | "health" => Ok(json!({
"status": "running",
"version": version,
"port": port,
"agents": self.agents.as_ref().map(|r| r.all().len()).unwrap_or(0),
})),
"version" => Ok(json!({
"version": version,
"name": "rsclaw",
})),
other => Err(anyhow!(
"gateway: unsupported action `{other}` (status, health, version)"
)),
}
}
async fn tool_pairing(&self, args: Value) -> Result<Value> {
let action = args["action"]
.as_str()
.ok_or_else(|| anyhow!("pairing: `action` required"))?;
let port = self.config.gateway.port;
let client = reqwest::Client::new();
let base = format!("http://127.0.0.1:{port}/api/v1");
let auth_token = self
.config
.gateway
.auth_token
.as_deref()
.unwrap_or_default();
let auth_header = if auth_token.is_empty() {
String::new()
} else {
format!("Bearer {auth_token}")
};
match action {
"list" => {
let mut req = client.get(format!("{base}/channels/pairings"));
if !auth_header.is_empty() {
req = req.header("Authorization", &auth_header);
}
let resp = req.send().await?;
let data: Value = resp.json().await?;
Ok(data)
}
"approve" => {
let code = args["code"]
.as_str()
.ok_or_else(|| anyhow!("pairing approve: `code` required"))?;
let mut req = client
.post(format!("{base}/channels/pair"))
.json(&json!({"code": code}));
if !auth_header.is_empty() {
req = req.header("Authorization", &auth_header);
}
let resp = req.send().await?;
let data: Value = resp.json().await?;
Ok(data)
}
"revoke" => {
let channel = args["channel"]
.as_str()
.ok_or_else(|| anyhow!("pairing revoke: `channel` required"))?;
let peer_id = args["peerId"]
.as_str()
.ok_or_else(|| anyhow!("pairing revoke: `peerId` required"))?;
let mut req = client
.post(format!("{base}/channels/unpair"))
.json(&json!({"channel": channel, "peerId": peer_id}));
if !auth_header.is_empty() {
req = req.header("Authorization", &auth_header);
}
let resp = req.send().await?;
let data: Value = resp.json().await?;
Ok(data)
}
other => Err(anyhow!(
"pairing: unsupported action `{other}` (list, approve, revoke)"
)),
}
}
async fn tool_memory_consolidated(&self, ctx: &RunContext, args: Value) -> Result<Value> {
let action = args["action"].as_str().unwrap_or("search");
match action {
"search" => self.tool_memory_search(args).await,
"get" => self.tool_memory_get(args).await,
"put" => self.tool_memory_put(ctx, args).await,
"delete" => self.tool_memory_delete(args).await,
_ => bail!("memory: unknown action '{action}' (search, get, put, delete)"),
}
}
async fn tool_session_consolidated(&self, ctx: &RunContext, args: Value) -> Result<Value> {
let action = args["action"].as_str().unwrap_or("list");
match action {
"send" => self.tool_sessions_send(ctx, args).await,
"list" => self.tool_sessions_list().await,
"history" => self.tool_sessions_history(args).await,
"status" => self.tool_session_status(ctx, args).await,
_ => bail!("session: unknown action '{action}' (send, list, history, status)"),
}
}
async fn tool_agent_consolidated(&self, args: Value) -> Result<Value> {
let action = args["action"].as_str().unwrap_or("list");
match action {
"spawn" => self.tool_agent_spawn(args).await,
"list" => self.tool_agent_list().await,
"kill" => {
let id = args["id"]
.as_str()
.ok_or_else(|| anyhow!("agent kill: `id` required"))?;
Ok(json!({
"action": "kill",
"id": id,
"note": "agent termination not yet implemented; agent will stop on next idle timeout"
}))
}
_ => bail!("agent: unknown action '{action}' (spawn, list, kill)"),
}
}
async fn tool_channel_consolidated(&self, args: Value) -> Result<Value> {
let channel_type = args["channel"].as_str().unwrap_or("unknown").to_owned();
self.tool_channel_actions(&channel_type, args).await
}
async fn tool_channel_actions(&self, channel_type: &str, args: Value) -> Result<Value> {
let action = args["action"]
.as_str()
.ok_or_else(|| anyhow!("{channel_type}_actions: `action` required"))?;
let chat_id = args["chatId"]
.as_str()
.or_else(|| args["chat_id"].as_str())
.unwrap_or("");
let text = args["text"].as_str().unwrap_or("");
let message_id = args["messageId"]
.as_str()
.or_else(|| args["message_id"].as_str())
.unwrap_or("");
Ok(json!({
"channel": channel_type,
"action": action,
"chatId": chat_id,
"text": text,
"messageId": message_id,
"status": "stub",
"note": format!(
"{channel_type} action `{action}` received. \
Channel-specific API integration is not yet wired — \
use the `message` tool for basic send operations."
)
}))
}
}
fn expand_tilde(p: &str) -> std::path::PathBuf {
if let Some(rest) = p.strip_prefix("~/").or_else(|| p.strip_prefix("~\\")) {
dirs_next::home_dir().unwrap_or_default().join(rest)
} else if p == "~" {
dirs_next::home_dir().unwrap_or_default()
} else {
std::path::PathBuf::from(p)
}
}
/// Best-effort text extraction for an uploaded file, dispatched on the
/// lowercased filename extension: PDFs via `pdf_extract` with a
/// `pdftotext` CLI fallback, Office docs via the channel helper, plain
/// text by lossy UTF-8 decode, and audio/video via transcription.
/// Returns `None` when the format is unsupported or extraction fails.
async fn extract_file_text(filename: &str, bytes: &[u8]) -> Option<String> {
    let lower = filename.to_lowercase();
    if lower.ends_with(".pdf") {
        if let Ok(text) = pdf_extract::extract_text_from_mem(bytes) {
            return Some(text);
        }
        // Fallback: run the pdftotext CLI against a throwaway temp file.
        let tmp = std::env::temp_dir().join(format!("rsclaw_extract_{}", uuid::Uuid::new_v4()));
        std::fs::write(&tmp, bytes).ok()?;
        let result = std::process::Command::new("pdftotext")
            .args([tmp.to_str().unwrap_or(""), "-"])
            .output();
        let _ = std::fs::remove_file(&tmp);
        match result {
            Ok(o) if o.status.success() => Some(String::from_utf8_lossy(&o.stdout).to_string()),
            _ => None,
        }
    } else if lower.ends_with(".docx") || lower.ends_with(".xlsx") || lower.ends_with(".pptx") {
        crate::channel::extract_office_text(filename, bytes)
    } else if is_likely_text_file(&lower) {
        Some(String::from_utf8_lossy(bytes).to_string())
    } else if is_audio_or_video(&lower) {
        extract_audio_text(bytes, &lower).await
    } else {
        None
    }
}
/// True when the (already lowercased) filename ends in a known audio or
/// video extension.
fn is_audio_or_video(lower: &str) -> bool {
    const MEDIA_EXTS: [&str; 15] = [
        ".mp4", ".mov", ".avi", ".mkv", ".webm", ".flv", ".wmv", ".mp3", ".wav", ".ogg",
        ".m4a", ".aac", ".flac", ".wma", ".opus",
    ];
    MEDIA_EXTS.iter().any(|ext| lower.ends_with(ext))
}
/// Transcribe an audio/video attachment via the channel transcription
/// service. Returns a labelled transcript, or `None` when transcription
/// fails or produces only whitespace.
async fn extract_audio_text(bytes: &[u8], lower_filename: &str) -> Option<String> {
    let ext = lower_filename.rsplit('.').next().unwrap_or("mp4");
    // Map the extension to a MIME type for the transcription backend.
    let mime = match ext {
        "mp4" | "m4a" | "m4v" => "video/mp4",
        "ogg" | "oga" | "opus" => "audio/ogg",
        "mp3" => "audio/mpeg",
        "wav" => "audio/wav",
        "flac" => "audio/flac",
        "amr" => "audio/amr",
        "webm" => "video/webm",
        _ => "application/octet-stream",
    };
    tracing::info!(file = %lower_filename, bytes = bytes.len(), "extract_audio_text: starting");
    let client = reqwest::Client::new();
    let outcome =
        crate::channel::transcription::transcribe_audio(&client, bytes, lower_filename, mime).await;
    match outcome {
        Ok(text) => {
            let trimmed = text.trim();
            if trimmed.is_empty() {
                None
            } else {
                Some(format!("[Audio transcription from {ext} file]\n{trimmed}"))
            }
        }
        Err(e) => {
            tracing::warn!("extract_audio_text: transcription failed: {e:#}");
            None
        }
    }
}
/// True when the (already lowercased) filename looks like a plain-text
/// format that is safe to decode as UTF-8.
fn is_likely_text_file(lower: &str) -> bool {
    const TEXT_EXTS: [&str; 28] = [
        ".txt", ".md", ".csv", ".json", ".toml", ".yaml", ".yml", ".xml", ".html", ".rs",
        ".py", ".js", ".ts", ".go", ".sh", ".log", ".conf", ".cfg", ".c", ".h", ".java",
        ".css", ".sql", ".rb", ".php", ".swift", ".kt", ".lua",
    ];
    TEXT_EXTS.iter().any(|ext| lower.ends_with(ext))
}
/// Look up a search engine's URL template by name from the bundled
/// defaults TOML (`search_engines` entries), parsed once and cached.
/// Returns "" for unknown names.
fn search_engine_url(name: &str) -> &'static str {
    static URLS: std::sync::LazyLock<std::collections::HashMap<String, String>> =
        std::sync::LazyLock::new(|| {
            #[derive(serde::Deserialize)]
            struct Entry {
                name: String,
                url: String,
            }
            #[derive(serde::Deserialize, Default)]
            struct Defs {
                #[serde(default)]
                search_engines: Vec<Entry>,
            }
            let raw = crate::config::loader::load_defaults_toml();
            // A parse failure just means no engines are registered.
            toml::from_str::<Defs>(&raw)
                .unwrap_or_default()
                .search_engines
                .into_iter()
                .map(|e| (e.name, e.url))
                .collect()
        });
    URLS.get(name).map_or("", String::as_str)
}
/// Minimal percent-encoder: RFC 3986 unreserved characters (ALPHA, DIGIT,
/// `-`, `_`, `.`, `~`) pass through; every other byte becomes `%XX`.
mod urlencoding {
    pub fn encode(s: &str) -> String {
        use std::fmt::Write as _;
        let mut encoded = String::with_capacity(s.len() * 3);
        for b in s.bytes() {
            if b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.' | b'~') {
                encoded.push(char::from(b));
            } else {
                // Uppercase hex, two digits per byte.
                let _ = write!(encoded, "%{b:02X}");
            }
        }
        encoded
    }
}
/// Map a language name or ISO code (case-insensitive) to a Bing market
/// code. Languages without a dedicated market fall back to an English
/// market; unknown languages yield "".
fn lang_to_bing_mkt(lang: &str) -> &'static str {
    let key = lang.to_lowercase();
    match key.as_str() {
        "chinese" | "zh" => "zh-CN",
        "english" | "en" => "en-US",
        "japanese" | "ja" => "ja-JP",
        "korean" | "ko" => "ko-KR",
        "thai" | "th" => "th-TH",
        "vietnamese" | "vi" => "vi-VN",
        "indonesian" | "id" | "bahasa" => "id-ID",
        "malay" | "ms" => "ms-MY",
        "tagalog" | "tl" | "filipino" => "en-PH",
        // No dedicated Bing market — use an English one.
        "burmese" | "my" => "en-US",
        "khmer" | "km" => "en-US",
        "lao" | "lo" => "en-US",
        "spanish" | "es" => "es-ES",
        "french" | "fr" => "fr-FR",
        "german" | "de" => "de-DE",
        "portuguese" | "pt" => "pt-BR",
        "russian" | "ru" => "ru-RU",
        "arabic" | "ar" => "ar-SA",
        "hindi" | "hi" => "hi-IN",
        _ => "",
    }
}
/// Parse DuckDuckGo HTML results into `{title, url, snippet}` objects.
///
/// DDG wraps result links in a redirect whose `uddg=` query parameter
/// carries the percent-encoded destination URL; that is unwrapped here.
/// Snippets are paired with links by position.
fn parse_ddg_results(html: &str, limit: usize) -> Vec<Value> {
    let link_re =
        regex::Regex::new(r#"<a\s+class="result__a"[^>]*href="([^"]*)"[^>]*>(.*?)</a>"#).unwrap();
    let snippet_re = regex::Regex::new(r#"<a\s+class="result__snippet"[^>]*>(.*?)</a>"#).unwrap();
    let snippets: Vec<_> = snippet_re.captures_iter(html).collect();
    link_re
        .captures_iter(html)
        .take(limit)
        .enumerate()
        .map(|(i, cap)| {
            let raw_url = cap.get(1).map_or("", |m| m.as_str());
            let title = cap.get(2).map_or("", |m| m.as_str());
            let snippet = snippets
                .get(i)
                .and_then(|c| c.get(1))
                .map_or("", |m| m.as_str());
            // Unwrap the DDG redirect: the real URL lives in `uddg=...`.
            let url = match raw_url.find("uddg=") {
                Some(pos) => {
                    let start = pos + "uddg=".len();
                    let end = raw_url[start..]
                        .find('&')
                        .map_or(raw_url.len(), |e| start + e);
                    percent_decode(&raw_url[start..end])
                }
                None => raw_url.to_owned(),
            };
            json!({
                "title": strip_inline_tags(title),
                "url": url,
                "snippet": strip_inline_tags(snippet)
            })
        })
        .collect()
}
/// Parse Bing web-search HTML: each result lives in a `b_algo` block
/// containing an anchor (url + title) and a `<p>` snippet. Blocks
/// without an absolute http(s) link are dropped.
fn parse_bing_html_results(html: &str, limit: usize) -> Vec<Value> {
    let link_re = regex::Regex::new(r#"<a[^>]*href="(https?://[^"]*)"[^>]*>(.*?)</a>"#).unwrap();
    let snippet_re = regex::Regex::new(r#"<p[^>]*>(.*?)</p>"#).unwrap();
    let mut out = Vec::new();
    // The first split piece is everything before the first result block.
    for block in html.split("class=\"b_algo\"").skip(1).take(limit) {
        let (url, title) = match link_re.captures(block) {
            Some(c) => (
                c.get(1).map_or("", |m| m.as_str()),
                c.get(2).map_or("", |m| m.as_str()),
            ),
            None => ("", ""),
        };
        if url.is_empty() {
            continue;
        }
        let snippet = snippet_re
            .captures(block)
            .and_then(|c| c.get(1))
            .map_or("", |m| m.as_str());
        out.push(json!({
            "title": strip_inline_tags(title),
            "url": url,
            "snippet": strip_inline_tags(snippet)
        }));
    }
    out
}
/// Parse Baidu result HTML: titles/urls come from `<h3><a>` anchors and
/// snippets from `content-right` spans, paired by position. Results with
/// an empty URL are dropped.
fn parse_baidu_results(html: &str, limit: usize) -> Vec<Value> {
    let link_re = regex::Regex::new(r#"<h3[^>]*>\s*<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>"#).unwrap();
    let snippet_re =
        regex::Regex::new(r#"<span[^>]*class="content-right[^"]*"[^>]*>(.*?)</span>"#).unwrap();
    let snippets: Vec<_> = snippet_re.captures_iter(html).collect();
    let mut out = Vec::new();
    for (i, cap) in link_re.captures_iter(html).enumerate().take(limit) {
        let url = cap.get(1).map_or("", |m| m.as_str());
        if url.is_empty() {
            continue;
        }
        let title = cap.get(2).map_or("", |m| m.as_str());
        // Snippets don't share a container with links; pair by index.
        let snippet = snippets
            .get(i)
            .and_then(|c| c.get(1))
            .map_or("", |m| m.as_str());
        out.push(json!({
            "title": strip_inline_tags(title),
            "url": url,
            "snippet": strip_inline_tags(snippet)
        }));
    }
    out
}
/// Parse Sogou result HTML: `<h3><a href>` anchors give url + title.
/// Sogou snippets are not extracted (always "").
fn parse_sogou_results(html: &str, limit: usize) -> Vec<Value> {
    let link_re = regex::Regex::new(r#"<h3[^>]*>\s*<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>"#).unwrap();
    link_re
        .captures_iter(html)
        .take(limit)
        .filter_map(|cap| {
            let url = cap.get(1).map_or("", |m| m.as_str());
            if url.is_empty() {
                return None;
            }
            let title = cap.get(2).map_or("", |m| m.as_str());
            Some(json!({
                "title": strip_inline_tags(title),
                "url": url,
                "snippet": ""
            }))
        })
        .collect()
}
/// Parse so.com (360 Search) result HTML. Markup matches Sogou's:
/// url + title from `<h3><a>` anchors, no snippet extraction.
fn parse_360_results(html: &str, limit: usize) -> Vec<Value> {
    let link_re = regex::Regex::new(r#"<h3[^>]*>\s*<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>"#).unwrap();
    let mut out = Vec::new();
    for cap in link_re.captures_iter(html).take(limit) {
        let (url, title) = (
            cap.get(1).map_or("", |m| m.as_str()),
            cap.get(2).map_or("", |m| m.as_str()),
        );
        if !url.is_empty() {
            out.push(json!({
                "title": strip_inline_tags(title),
                "url": url,
                "snippet": ""
            }));
        }
    }
    out
}
/// Decode `%XX` percent-escapes in `s`. Malformed escapes (truncated or
/// non-hex) pass through literally; the byte result is lossily decoded
/// as UTF-8.
fn percent_decode(s: &str) -> String {
    let bytes = s.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut idx = 0;
    while idx < bytes.len() {
        // A valid escape needs '%' plus two in-bounds hex digits.
        let escape = (bytes[idx] == b'%' && idx + 2 < bytes.len())
            .then(|| std::str::from_utf8(&bytes[idx + 1..idx + 3]).ok())
            .flatten()
            .and_then(|hex| u8::from_str_radix(hex, 16).ok());
        match escape {
            Some(b) => {
                decoded.push(b);
                idx += 3;
            }
            None => {
                decoded.push(bytes[idx]);
                idx += 1;
            }
        }
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
/// Remove HTML tags from `s` and decode common HTML entities.
fn strip_inline_tags(s: &str) -> String {
    // Compiled once: this runs per search result, and rebuilding a Regex
    // on every call is needlessly expensive.
    static TAG_RE: std::sync::LazyLock<regex::Regex> =
        std::sync::LazyLock::new(|| regex::Regex::new(r"<[^>]+>").unwrap());
    decode_html_entities(&TAG_RE.replace_all(s, ""))
}
/// Extract and entity-decode the contents of the first `<title>` element,
/// or "" when the page has none.
fn extract_html_title(html: &str) -> String {
    // Compiled once; (?is) = case-insensitive, dot matches newline, so
    // titles spanning lines are still captured.
    static TITLE_RE: std::sync::LazyLock<regex::Regex> =
        std::sync::LazyLock::new(|| regex::Regex::new(r"(?is)<title[^>]*>(.*?)</title>").unwrap());
    TITLE_RE
        .captures(html)
        .and_then(|c| c.get(1))
        .map(|m| decode_html_entities(m.as_str().trim()))
        .unwrap_or_default()
}
/// Convert an HTML document to plain text: drop `<script>`/`<style>`
/// blocks, strip remaining tags, decode entities, collapse whitespace.
fn strip_html(html: &str) -> String {
    // All patterns compiled once — this runs on every fetched page and
    // Regex construction per call is expensive.
    static SCRIPT_RE: std::sync::LazyLock<regex::Regex> =
        std::sync::LazyLock::new(|| regex::Regex::new(r"(?is)<script[^>]*>.*?</script>").unwrap());
    static STYLE_RE: std::sync::LazyLock<regex::Regex> =
        std::sync::LazyLock::new(|| regex::Regex::new(r"(?is)<style[^>]*>.*?</style>").unwrap());
    static TAG_RE: std::sync::LazyLock<regex::Regex> =
        std::sync::LazyLock::new(|| regex::Regex::new(r"<[^>]+>").unwrap());
    static WS_RE: std::sync::LazyLock<regex::Regex> =
        std::sync::LazyLock::new(|| regex::Regex::new(r"\s+").unwrap());
    let no_script = SCRIPT_RE.replace_all(html, "");
    let no_style = STYLE_RE.replace_all(&no_script, "");
    // Tags become spaces so adjacent text nodes don't fuse together.
    let no_tags = TAG_RE.replace_all(&no_style, " ");
    let decoded = decode_html_entities(&no_tags);
    WS_RE.replace_all(&decoded, " ").trim().to_owned()
}
/// Decode the handful of HTML entities that commonly appear in search
/// result markup.
///
/// The entity names in the previous version were themselves mangled into
/// their decoded forms (e.g. `replace("&", "&")`), making every call a
/// no-op — and the `&quot;` line was not even valid Rust. `&amp;` is
/// decoded LAST so that double-escaped input such as `&amp;lt;` decodes
/// to the literal text `&lt;` rather than `<`.
fn decode_html_entities(s: &str) -> String {
    s.replace("&lt;", "<")
        .replace("&gt;", ">")
        .replace("&quot;", "\"")
        .replace("&#39;", "'")
        .replace("&apos;", "'")
        .replace("&nbsp;", " ")
        .replace("&amp;", "&")
}
/// Render a tool's JSON result as human/LLM-readable text.
///
/// Tries a series of known result shapes in order and falls back to
/// pretty-printed JSON for anything unrecognized. The exact output
/// strings are part of the agent's observable behavior — do not tweak
/// them casually.
fn format_tool_result(val: &serde_json::Value) -> String {
    // Shape 1: process output — {stdout, stderr, exit_code}.
    if val.get("stdout").is_some() || val.get("stderr").is_some() {
        let stdout = val["stdout"].as_str().unwrap_or("").trim();
        let stderr = val["stderr"].as_str().unwrap_or("").trim();
        let exit_code = val["exit_code"].as_i64();
        let mut out = String::new();
        if !stdout.is_empty() {
            out.push_str(stdout);
        }
        if !stderr.is_empty() {
            if !out.is_empty() {
                out.push('\n');
            }
            out.push_str("[stderr] ");
            out.push_str(stderr);
        }
        // Only surface the exit code when it's non-zero.
        if let Some(code) = exit_code {
            if code != 0 {
                if !out.is_empty() {
                    out.push('\n');
                }
                out.push_str(&format!("[exit code: {code}]"));
            }
        }
        if out.is_empty() {
            "(no output)".to_owned()
        } else {
            out
        }
    }
    // Shape 2: file read — {content, path?}; path becomes a header line.
    else if let Some(content) = val.get("content").and_then(|v| v.as_str()) {
        let path = val.get("path").and_then(|v| v.as_str()).unwrap_or("");
        if path.is_empty() {
            content.to_owned()
        } else {
            format!("[{path}]\n{content}")
        }
    }
    // Shape 3: action result — {action, text}; empty text becomes "done".
    else if val.get("action").is_some() && val.get("text").is_some() {
        let action = val["action"].as_str().unwrap_or("");
        let text = val["text"].as_str().unwrap_or("");
        if text.is_empty() {
            format!("[{action}] done")
        } else {
            text.to_owned()
        }
    }
    // Shape 4: search results — {results: [{title, url, snippet}, ...]},
    // rendered as a numbered list.
    else if let Some(results) = val.get("results").and_then(|v| v.as_array()) {
        let mut out = String::new();
        for (i, r) in results.iter().enumerate() {
            let title = r
                .get("title")
                .and_then(|v| v.as_str())
                .unwrap_or("(no title)");
            let url = r.get("url").and_then(|v| v.as_str()).unwrap_or("");
            let snippet = r.get("snippet").and_then(|v| v.as_str()).unwrap_or("");
            out.push_str(&format!("{}. {}\n", i + 1, title));
            if !url.is_empty() {
                out.push_str(&format!(" {url}\n"));
            }
            if !snippet.is_empty() {
                out.push_str(&format!(" {snippet}\n"));
            }
            out.push('\n');
        }
        if out.is_empty() {
            "No results found.".to_owned()
        } else {
            out.trim_end().to_owned()
        }
    }
    // Shape 5: browser cookies — {cookies: [{name, value, domain}, ...]};
    // long values are truncated to keep lines short.
    else if let Some(cookies) = val.get("cookies").and_then(|v| v.as_array()) {
        let mut out = String::new();
        for c in cookies {
            let name = c.get("name").and_then(|v| v.as_str()).unwrap_or("-");
            let value = c.get("value").and_then(|v| v.as_str()).unwrap_or("-");
            let domain = c.get("domain").and_then(|v| v.as_str()).unwrap_or("-");
            let val_short = if value.len() > 30 {
                // char_indices keeps the cut on a UTF-8 boundary.
                let end = value
                    .char_indices()
                    .nth(27)
                    .map(|(i, _)| i)
                    .unwrap_or(value.len());
                &value[..end]
            } else {
                value
            };
            out.push_str(&format!("{name}={val_short} ({domain})\n"));
        }
        if out.is_empty() {
            "(no cookies)".to_owned()
        } else {
            out.trim_end().to_owned()
        }
    }
    // Fallback: pretty-printed JSON.
    else {
        serde_json::to_string_pretty(val).unwrap_or_default()
    }
}
/// Set a dot-separated config key (e.g. "tools.upload.limit") in
/// config.json, creating any missing intermediate objects, and write the
/// file back pretty-printed.
fn write_config_value(dot_path: &str, value: serde_json::Value) -> anyhow::Result<()> {
    use crate::cmd::config_json::{get_nested_value, load_config_json, set_nested_value};
    let (path, mut val) = load_config_json()?;
    let parts: Vec<&str> = dot_path.split('.').collect();
    // Create each missing intermediate object at its *nested* position.
    // The previous version probed `val.get(key)` at the document root for
    // every segment, which could insert spurious top-level keys (setting
    // "a.b.c" would add an empty root-level "b").
    for i in 0..parts.len().saturating_sub(1) {
        let prefix = parts[..=i].join(".");
        if get_nested_value(&val, &prefix).is_none() {
            set_nested_value(&mut val, &prefix, serde_json::json!({}))?;
        }
    }
    set_nested_value(&mut val, dot_path, value)?;
    std::fs::write(&path, serde_json::to_string_pretty(&val)?)?;
    Ok(())
}
/// Locate a Chrome/Chromium binary on this machine.
///
/// Checks well-known install paths for the current OS first, then falls
/// back to probing PATH via `which` (or `where` on Windows). Returns
/// `None` when nothing is found.
fn detect_chrome() -> Option<String> {
    #[cfg(target_os = "macos")]
    {
        let bundled = "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome";
        if std::path::Path::new(bundled).exists() {
            return Some(bundled.to_owned());
        }
    }
    #[cfg(target_os = "windows")]
    {
        let mut candidates = vec![
            r"C:\Program Files\Google\Chrome\Application\chrome.exe".to_string(),
            r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe".to_string(),
        ];
        if let Ok(profile) = std::env::var("USERPROFILE") {
            candidates.push(format!(
                r"{}\AppData\Local\Google\Chrome\Application\chrome.exe",
                profile
            ));
        }
        for candidate in candidates {
            if std::path::Path::new(&candidate).exists() {
                return Some(candidate);
            }
        }
    }
    // Last resort: look for a binary on PATH.
    let lookup = if cfg!(target_os = "windows") {
        "where"
    } else {
        "which"
    };
    ["google-chrome", "chromium", "chromium-browser", "chrome"]
        .iter()
        .find_map(|name| {
            let output = std::process::Command::new(lookup).arg(name).output().ok()?;
            if !output.status.success() {
                return None;
            }
            let found = String::from_utf8_lossy(&output.stdout).trim().to_owned();
            (!found.is_empty()).then_some(found)
        })
}
/// Run `cmd` with `args`, discarding stdout. Returns an error carrying
/// stderr when the process cannot be spawned or exits non-zero.
async fn run_subprocess(cmd: &str, args: &[&str]) -> Result<()> {
    let output = tokio::process::Command::new(cmd)
        .args(args)
        .output()
        .await
        .map_err(|e| anyhow!("{cmd}: {e}"))?;
    if output.status.success() {
        Ok(())
    } else {
        let stderr = String::from_utf8_lossy(&output.stderr);
        Err(anyhow!("{cmd} failed: {stderr}"))
    }
}
/// Resolve a toolset name plus an optional custom tool list into the set
/// of allowed tool names. `None` means "no restriction" (the `full`
/// toolset with no custom additions). Unknown toolset names fall back to
/// `standard`.
fn toolset_allowed_names(
    toolset: &str,
    custom_tools: Option<&Vec<String>>,
) -> Option<std::collections::HashSet<String>> {
    const MINIMAL: &[&str] = &["exec", "read", "write", "web_search", "web_fetch", "memory"];
    const STANDARD: &[&str] = &[
        "exec",
        "read",
        "write",
        "web_search",
        "web_fetch",
        "memory",
        "web_browser",
        "image",
        "channel",
        "cron",
    ];
    let base: Option<&[&str]> = match toolset {
        "minimal" => Some(MINIMAL),
        "full" => None,
        // "standard" and anything unrecognized both use the standard set.
        _ => Some(STANDARD),
    };
    let mut allowed: std::collections::HashSet<String> = match base {
        Some(names) => names.iter().map(|s| (*s).to_string()).collect(),
        // `full` with no custom list: unrestricted.
        None if custom_tools.is_none() => return None,
        None => std::collections::HashSet::new(),
    };
    if let Some(extra) = custom_tools {
        allowed.extend(extra.iter().cloned());
    }
    Some(allowed)
}
/// Render a duration as a compact human string, keeping only the most
/// significant units: "2d 3h 4m", "3h 4m 5s", "4m 5s", or "5s".
fn format_duration(d: std::time::Duration) -> String {
    let total = d.as_secs();
    let days = total / 86_400;
    let hours = (total % 86_400) / 3_600;
    let mins = (total % 3_600) / 60;
    let secs = total % 60;
    match (days, hours, mins, secs) {
        (0, 0, 0, s) => format!("{s}s"),
        (0, 0, m, s) => format!("{m}m {s}s"),
        (0, h, m, s) => format!("{h}h {m}m {s}s"),
        (dd, h, m, _) => format!("{dd}d {h}h {m}m"),
    }
}
/// Build the `/help` text, showing only commands the session is allowed to
/// use. `allowed` is either `"*"` (everything) or a `|`-separated list of
/// command names; read-only commands (READONLY_COMMANDS) are always shown.
fn build_help_text_filtered(allowed: &str) -> String {
    let full = allowed == "*";
    // A command is visible when the session has full access, it is
    // read-only, or it appears in the allow-list.
    let has = |cmd: &str| -> bool {
        if full {
            return true;
        }
        READONLY_COMMANDS.iter().any(|c| *c == cmd) || allowed.split('|').any(|a| a.trim() == cmd)
    };
    let mut help = String::from("Available commands:\n\n");
    // Each section header is only emitted when at least one of its
    // commands is visible.
    if has("/run") || has("/find") || has("/grep") {
        help.push_str("Shell:\n");
        if has("/run") {
            help.push_str("  /run <cmd>              Execute a shell command\n");
            help.push_str("  $ <cmd>                 Execute a shell command (shortcut)\n");
        }
        if has("/find") {
            help.push_str("  /find <pattern>         Find files by name\n");
        }
        if has("/grep") {
            help.push_str("  /grep <pattern>         Search file contents\n");
        }
        help.push('\n');
    }
    if has("/read") || has("/write") || has("/ls") {
        help.push_str("Files:\n");
        if has("/read") {
            help.push_str("  /read <path>            Read a file\n");
        }
        if has("/write") {
            help.push_str("  /write <path> <content> Write to a file\n");
        }
        if has("/ls") {
            help.push_str("  /ls [path]              List directory\n");
        }
        help.push('\n');
    }
    if has("/search") || has("/fetch") || has("/screenshot") || has("/ss") {
        help.push_str("Search & Web:\n");
        if has("/search") {
            help.push_str("  /search <query>         Search the web\n");
        }
        if has("/fetch") {
            help.push_str("  /fetch <url>            Fetch a web page\n");
        }
        if has("/screenshot") {
            help.push_str("  /screenshot <url>       Screenshot a web page\n");
        }
        if has("/ss") {
            help.push_str("  /ss                     Screenshot desktop\n");
        }
        help.push('\n');
    }
    if has("/remember") || has("/recall") {
        help.push_str("Memory:\n");
        if has("/remember") {
            help.push_str("  /remember <text>        Save to memory\n");
        }
        if has("/recall") {
            help.push_str("  /recall <query>         Search memory\n");
        }
        help.push('\n');
    }
    // Context, side-query, system, session, and cron sections are always
    // present (read-only / universally available).
    help.push_str("Background Context:\n");
    help.push_str("  /ctx <text>             Add persistent context\n");
    help.push_str("  /ctx --ttl <N> <text>   Add context (expires in N turns)\n");
    if full {
        help.push_str("  /ctx --global <text>    Add global context (all sessions)\n");
    }
    help.push_str("  /ctx --list             List active context entries\n");
    help.push_str("  /ctx --remove <id>      Remove entry by id\n");
    help.push_str("  /ctx --clear            Clear all context for this session\n");
    help.push('\n');
    help.push_str("Side Query:\n");
    help.push_str("  /btw <question>         Quick query (no tools, ephemeral)\n");
    help.push('\n');
    // The consolidated-tools overview is only useful with full access.
    if full {
        help.push_str("Tools (consolidated):\n");
        help.push_str("  memory                  search/get/put/delete long-term memory\n");
        help.push_str("  session                 send/list/history/status for sessions\n");
        help.push_str("  agent                   spawn/list/kill sub-agents\n");
        help.push_str("  channel                 send/reply/pin/delete across channels\n");
        help.push('\n');
    }
    help.push_str("System:\n");
    help.push_str("  /status                 Gateway status\n");
    help.push_str("  /version                Show version\n");
    help.push_str("  /models                 List models\n");
    if has("/model") {
        help.push_str("  /model <name>           Switch model\n");
    }
    help.push_str("  /uptime                 Show uptime\n");
    help.push('\n');
    help.push_str("Session:\n");
    help.push_str("  /clear                  Clear session\n");
    if has("/reset") {
        help.push_str("  /reset                  Reset session\n");
    }
    help.push_str("  /history [n]            Show history\n");
    if has("/sessions") {
        help.push_str("  /sessions               List sessions\n");
    }
    help.push('\n');
    help.push_str("Cron:\n");
    help.push_str("  /cron list              List cron jobs\n");
    help.push('\n');
    if has("/send") {
        help.push_str("Messaging:\n");
        help.push_str("  /send <target> <msg>    Send a message\n");
        help.push('\n');
    }
    if has("/skill") {
        help.push_str("Skill:\n");
        help.push_str("  /skill install <name>\n");
        help.push_str("  /skill list\n");
        help.push_str("  /skill search <query>\n");
        help.push('\n');
    }
    if full {
        help.push_str("Upload & Limits:\n");
        help.push_str("  /get_upload_size        Show upload size limit\n");
        help.push_str("  /set_upload_size <MB>   Set size limit (runtime)\n");
        help.push_str("  /get_upload_chars       Show text char limit\n");
        help.push_str("  /set_upload_chars <N>   Set char limit (runtime)\n");
        help.push_str("  /config_upload_size <MB>  Set size limit (persistent)\n");
        help.push_str("  /config_upload_chars <N>  Set char limit (persistent)\n");
        help.push('\n');
    }
    help.push_str("Type any message without / to chat with the AI agent.");
    help
}
/// Assemble the system prompt from the current date, optional language
/// directive, tool-usage guidelines, workspace context, and the list of
/// installed skills.
fn build_system_prompt(
    ws_ctx: &WorkspaceContext,
    skills: &SkillRegistry,
    config: &crate::config::schema::Config,
) -> String {
    let mut parts: Vec<String> = Vec::new();
    let now = chrono::Local::now();
    use chrono::Datelike;
    // num_days_from_monday: Mon=0 .. Fri=4, Sat=5, Sun=6.
    let weekday = now.date_naive().weekday().num_days_from_monday();
    // Fri/Sat/Sun: step back to the Friday of this week; Mon-Thu: step back
    // to the previous week's Friday.
    // NOTE(review): when today IS Friday this yields today's date as
    // "Last Friday" — presumably intended; confirm.
    let last_friday = if weekday >= 4 {
        now.date_naive() - chrono::Duration::days((weekday - 4) as i64)
    } else {
        now.date_naive() - chrono::Duration::days((weekday + 3) as i64)
    };
    let yesterday = now.date_naive() - chrono::Duration::days(1);
    // Anchor relative-date reasoning ("yesterday", "last Friday") for the model.
    let mut date_line = format!(
        "Current date: {} ({}). Yesterday: {}. Last Friday: {}.",
        now.format("%Y-%m-%d %H:%M"),
        now.format("%A"),
        yesterday.format("%Y-%m-%d"),
        last_friday.format("%Y-%m-%d"),
    );
    // Optional forced response language from gateway config.
    if let Some(lang) = config.gateway.as_ref().and_then(|g| g.language.as_deref()) {
        date_line.push_str(&format!(
            "\nDefault response language: {lang}. Always reply in {lang} unless the user explicitly uses another language."
        ));
    }
    parts.push(date_line);
    // Standing guidance to keep tool-call arguments small (mirrors the
    // write-tool content limit enforced elsewhere).
    parts.push(
        "## Tool Usage Guidelines\n\
When using tools, keep arguments concise:\n\
- write tool: content must be under 2000 characters per call. Split large files into multiple calls.\n\
- Prefer multiple short tool calls over one long call.\n\
- For code generation: write one module/function at a time, then iterate.".to_string(),
    );
    let ws_segment = ws_ctx.to_prompt_segment();
    if !ws_segment.is_empty() {
        parts.push(ws_segment);
    }
    // Advertise installed skills as a compact XML segment.
    if !skills.is_empty() {
        let skill_xml: String = skills
            .all()
            .map(|s| {
                format!(
                    "  <skill name=\"{}\">{}</skill>",
                    s.name,
                    s.description.as_deref().unwrap_or("")
                )
            })
            .collect::<Vec<_>>()
            .join("\n");
        parts.push(format!(
            "<available_skills>\n{skill_xml}\n</available_skills>"
        ));
    }
    parts.join("\n\n")
}
/// Merge vector-search and BM25 hits with reciprocal-rank fusion, apply
/// each document's decay multiplier, and return the top `top_k` results.
fn rrf_fuse(
    vec_hits: Vec<crate::agent::memory::MemoryDoc>,
    bm25_hits: Vec<crate::store::search::IndexDoc>,
    top_k: usize,
) -> Vec<crate::agent::memory::MemoryDoc> {
    use std::collections::HashMap;
    use crate::agent::memory::MemoryDoc;
    // RRF smoothing constant: larger K flattens the advantage of top ranks.
    const K: f32 = 60.0;
    let weight = |rank: usize| 1.0 / (K + (rank + 1) as f32);
    let mut fused: HashMap<String, (f32, MemoryDoc)> = HashMap::new();
    for (rank, doc) in vec_hits.into_iter().enumerate() {
        let w = weight(rank);
        fused
            .entry(doc.id.clone())
            .and_modify(|(score, _)| *score += w)
            .or_insert((w, doc));
    }
    for (rank, doc) in bm25_hits.into_iter().enumerate() {
        let w = weight(rank);
        fused
            .entry(doc.id.clone())
            .and_modify(|(score, _)| *score += w)
            .or_insert_with(|| {
                // BM25-only hits need converting from the index row shape
                // into a MemoryDoc with neutral metadata.
                let now = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs() as i64;
                let converted = MemoryDoc {
                    id: doc.id,
                    scope: doc.scope,
                    kind: doc.kind,
                    text: doc.content,
                    vector: vec![],
                    created_at: now,
                    accessed_at: now,
                    access_count: 0,
                    importance: 0.5,
                    tier: Default::default(),
                    abstract_text: None,
                    overview_text: None,
                };
                (w, converted)
            });
    }
    // Apply recency decay, then sort by final score, highest first.
    let mut ranked: Vec<(f32, MemoryDoc)> = fused
        .into_values()
        .map(|(score, doc)| (score * doc.decay_multiplier(), doc))
        .collect();
    ranked.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
    ranked.into_iter().take(top_k).map(|(_, doc)| doc).collect()
}
fn inject_action(mut args: Value, action: &str) -> Value {
if let Some(obj) = args.as_object_mut() {
obj.entry("action").or_insert_with(|| json!(action));
}
args
}
fn inject_channel(mut args: Value, channel: &str) -> Value {
if let Some(obj) = args.as_object_mut() {
obj.entry("channel").or_insert_with(|| json!(channel));
}
args
}
/// Assemble the full set of tool definitions advertised to the LLM:
/// built-in tools, peer-agent bridge tools (`agent_<id>`), external/remote
/// agent bridges, and skill-provided tools (`<skill>.<tool>`).
///
/// `caller_id` is excluded from the agent bridges so an agent never gets a
/// tool that targets itself.
fn build_tool_list(
    skills: &SkillRegistry,
    agents: Option<&AgentRegistry>,
    caller_id: &str,
    external_agents: &[crate::config::schema::ExternalAgentConfig],
) -> Vec<ToolDef> {
    let mut tools = Vec::new();
    // --- built-in tools ---
    tools.push(ToolDef {
        name: "memory".to_owned(),
        description: "Manage long-term memory. Actions: search (semantic search), get (by ID), put (store new), delete (remove by ID).".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "action": {"type": "string", "enum": ["search", "get", "put", "delete"], "description": "Action to perform"},
                "query": {"type": "string", "description": "Search query (for search action)"},
                "id": {"type": "string", "description": "Memory document ID (for get/delete)"},
                "text": {"type": "string", "description": "Content to store (for put action)"},
                "scope": {"type": "string", "description": "Scope filter (optional)"},
                "kind": {"type": "string", "description": "Document kind: note, fact, summary (for put)"},
                "top_k": {"type": "integer", "description": "Max results (for search, default 5)"}
            },
            "required": ["action"]
        }),
    });
    tools.push(ToolDef {
        name: "read".to_owned(),
        description: "Read a file from the agent workspace. Path is relative to workspace root."
            .to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "Relative file path within the workspace"}
            },
            "required": ["path"]
        }),
    });
    // The write tool's description is a deliberate multi-line prompt that
    // drills the per-call content-length limit into the model.
    tools.push(ToolDef {
        name: "write".to_owned(),
        description: "Write content to a file in the agent workspace. Creates parent directories as needed.
CRITICAL CONTENT LENGTH LIMIT: Each write call MUST keep content under 2500 characters. Calls exceeding this limit will be rejected. For large files, split into multiple calls of 1500-2000 characters each, or use exec with shell redirection.
IMPORTANT: Both 'path' and 'content' parameters are REQUIRED.
Parameters:
- path: Relative file path within the workspace (e.g., 'output.py', 'src/main.rs')
- content: The text content to write to the file
Example - Good (under 2500 chars):
{\"path\": \"hello.py\", \"content\": \"print('Hello, World!')\"}
Example - Large file (SPLIT INTO MULTIPLE CALLS):
Part 1: {\"path\": \"src/main.rs\", \"content\": \"// Part 1: imports + main fn...\"}
Part 2: {\"path\": \"src/lib.rs\", \"content\": \"// Part 2: helper functions...\"}".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "Relative file path within the workspace (REQUIRED). Example: 'output.py'"},
                "content": {"type": "string", "description": "File content to write (REQUIRED). STRICT LIMIT: Must be under 2500 characters or the call will fail. Split large files into multiple calls."}
            },
            "required": ["path", "content"]
        }),
    });
    tools.push(ToolDef {
        name: "exec".to_owned(),
        description: "Run a shell command. The user has pre-authorized all commands.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "command": {"type": "string", "description": "Shell command to execute"}
            },
            "required": ["command"]
        }),
    });
    tools.push(ToolDef {
        name: "agent".to_owned(),
        description: "Manage agents. Actions: spawn (create new sub-agent), list (all registered agents), kill (stop an agent).".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "action": {"type": "string", "enum": ["spawn", "list", "kill"], "description": "Action to perform"},
                "id": {"type": "string", "description": "Agent ID (for spawn/kill)"},
                "model": {"type": "string", "description": "Model string (for spawn)"},
                "system": {"type": "string", "description": "System prompt (for spawn)"}
            },
            "required": ["action"]
        }),
    });
    // --- web tools ---
    tools.push(ToolDef {
        name: "web_search".to_owned(),
        description: "Search the web using a configurable search engine. Returns titles, URLs, and snippets.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"},
                "provider": {"type": "string", "description": "Search provider: duckduckgo, google, bing, brave. Leave empty to use the configured default."},
                "limit": {"type": "integer", "description": "Max results to return (default 5)"}
            },
            "required": ["query"]
        }),
    });
    tools.push(ToolDef {
        name: "web_fetch".to_owned(),
        description: "Download a web page and extract its text content. Strips HTML tags, scripts, and styles. Truncates to 50000 chars.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "URL to fetch"},
                "selector": {"type": "string", "description": "CSS-like selector hint for content extraction"}
            },
            "required": ["url"]
        }),
    });
    tools.push(ToolDef {
        name: "web_browser".to_owned(),
        description: "Control a web browser via CDP. Actions: open, snapshot, click, fill, type, select, check, uncheck, scroll, screenshot, pdf, back, forward, reload, get_text, get_url, get_title, wait, evaluate, cookies".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "action": {"type": "string", "description": "Action to perform"},
                "url": {"type": "string", "description": "URL for open action"},
                "ref": {"type": "string", "description": "Element reference like @e3 from snapshot"},
                "text": {"type": "string", "description": "Text for fill/type actions"},
                "value": {"type": "string", "description": "Value for select action"},
                "direction": {"type": "string", "description": "up/down for scroll"},
                "js": {"type": "string", "description": "JavaScript for evaluate action"},
                "target": {"type": "string", "description": "element/text/url for wait action"},
                "timeout": {"type": "number", "description": "Timeout in seconds (default 15)"}
            },
            "required": ["action"]
        }),
    });
    // --- desktop / media tools ---
    tools.push(ToolDef {
        name: "computer_use".to_owned(),
        description: "Control the computer: take screenshots, move/click mouse, type text, press keys. Works on macOS (screencapture, cliclick, osascript) and Linux (scrot, xdotool).".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "action": {"type": "string", "description": "Action: screenshot, mouse_move, mouse_click, type, key"},
                "x": {"type": "number", "description": "X coordinate for mouse actions"},
                "y": {"type": "number", "description": "Y coordinate for mouse actions"},
                "text": {"type": "string", "description": "Text for type action"},
                "key": {"type": "string", "description": "Key name for key action (e.g. enter, tab, escape)"}
            },
            "required": ["action"]
        }),
    });
    tools.push(ToolDef {
        name: "image".to_owned(),
        description: "Generate an image from a text description using an AI image model.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "prompt": {"type": "string", "description": "Text description of the image to generate"},
                "size": {"type": "string", "description": "Image size (default: 1024x1024)", "default": "1024x1024"}
            },
            "required": ["prompt"]
        }),
    });
    tools.push(ToolDef {
        name: "pdf".to_owned(),
        description: "Extract text content from a PDF file or URL.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "File path or URL to a PDF document"}
            },
            "required": ["path"]
        }),
    });
    tools.push(ToolDef {
        name: "tts".to_owned(),
        description: "Convert text to speech audio.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "text": {"type": "string", "description": "Text to convert to speech"},
                "voice": {"type": "string", "description": "Voice name (macOS: say -v '?', Linux: espeak --voices)"}
            },
            "required": ["text"]
        }),
    });
    // --- messaging / scheduling / session tools ---
    tools.push(ToolDef {
        name: "message".to_owned(),
        description: "Send a message to a chat channel target (user or group).".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "channel": {"type": "string", "description": "Channel type (e.g. telegram, discord)"},
                "target": {"type": "string", "description": "Target user or group ID"},
                "text": {"type": "string", "description": "Message text to send"}
            },
            "required": ["target", "text"]
        }),
    });
    tools.push(ToolDef {
        name: "cron".to_owned(),
        description: "List, add, or remove cron jobs.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "action": {"type": "string", "enum": ["list", "add", "remove"], "description": "Action to perform"},
                "schedule": {"type": "string", "description": "Cron schedule expression (for add)"},
                "message": {"type": "string", "description": "Message or task to run (for add)"},
                "id": {"type": "string", "description": "Job ID (for remove)"}
            },
            "required": ["action"]
        }),
    });
    tools.push(ToolDef {
        name: "session".to_owned(),
        description: "Manage sessions. Actions: send (message to another agent), list (all active sessions), history (retrieve conversation), status (session info).".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "action": {"type": "string", "enum": ["send", "list", "history", "status"], "description": "Action to perform"},
                "agentId": {"type": "string", "description": "Target agent ID (for send)"},
                "sessionKey": {"type": "string", "description": "Session key (for send/history/status)"},
                "message": {"type": "string", "description": "Message text (for send)"},
                "limit": {"type": "number", "description": "Max messages to return (for history, default 50)"}
            },
            "required": ["action"]
        }),
    });
    tools.push(ToolDef {
        name: "gateway".to_owned(),
        description: "Query gateway status and information.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "action": {"type": "string", "enum": ["status", "health", "version"], "description": "Info to retrieve"}
            },
            "required": ["action"]
        }),
    });
    tools.push(ToolDef {
        name: "opencode".to_owned(),
        description: "Execute coding tasks using OpenCode (a powerful coding agent). IMPORTANT: When creating new projects or files, ALWAYS create a dedicated project directory first (e.g., 'my-project/') and place all files inside it. Do NOT create files directly in the workspace root. The task will run asynchronously and results will be sent when complete.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "task": {"type": "string", "description": "The coding task to execute. Be specific about file paths and always mention creating a project subdirectory for new projects."}
            },
            "required": ["task"]
        }),
    });
    tools.push(ToolDef {
        name: "channel".to_owned(),
        description: "Perform channel-specific actions (send, reply, pin, delete messages). Channel is auto-detected from current session or can be specified explicitly: telegram, discord, slack, whatsapp, feishu, weixin, qq, dingtalk.".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "action": {"type": "string", "enum": ["send", "reply", "forward", "pin", "unpin", "delete"], "description": "Action to perform"},
                "channel": {"type": "string", "description": "Channel type (auto-detected if omitted): telegram, discord, slack, whatsapp, feishu, weixin, qq, dingtalk"},
                "chatId": {"type": "string", "description": "Chat/channel ID"},
                "text": {"type": "string", "description": "Message text"},
                "messageId": {"type": "string", "description": "Message ID (for reply/pin/delete)"}
            },
            "required": ["action"]
        }),
    });
    tools.push(ToolDef {
        name: "pairing".to_owned(),
        description: "Manage channel pairing (dmPolicy=pairing). Actions: list (show pending codes and approved peers), approve (approve a pairing code), revoke (revoke an approved peer).".to_owned(),
        parameters: json!({
            "type": "object",
            "properties": {
                "action": {"type": "string", "enum": ["list", "approve", "revoke"], "description": "Action to perform"},
                "code": {"type": "string", "description": "Pairing code to approve (for approve action, e.g. ZGTB-NB79)"},
                "channel": {"type": "string", "description": "Channel name (for revoke action, e.g. qq, telegram)"},
                "peerId": {"type": "string", "description": "Peer ID to revoke (for revoke action)"}
            },
            "required": ["action"]
        }),
    });
    // --- peer-agent bridge tools (one per registered agent, minus self) ---
    if let Some(reg) = agents {
        for handle in reg.all() {
            if handle.id == caller_id {
                continue;
            }
            tools.push(ToolDef {
                name: format!("agent_{}", handle.id),
                description: format!(
                    "Send a task to agent '{}'. Returns the agent's reply.",
                    handle.id
                ),
                parameters: json!({
                    "type": "object",
                    "properties": {
                        "text": {"type": "string", "description": "Task or message to send"}
                    },
                    "required": ["text"]
                }),
            });
        }
    }
    tracing::debug!(
        count = external_agents.len(),
        "build_tool_list: external agents"
    );
    // --- external (remote) agent bridges, same naming scheme ---
    for ext in external_agents {
        if ext.id == caller_id {
            continue;
        }
        tools.push(ToolDef {
            name: format!("agent_{}", ext.id),
            description: format!(
                "Send a task to remote agent '{}' at {}. Returns the agent's reply.",
                ext.id, ext.url
            ),
            parameters: json!({
                "type": "object",
                "properties": {
                    "text": {"type": "string", "description": "Task or message to send"}
                },
                "required": ["text"]
            }),
        });
    }
    // --- skill-provided tools, namespaced as "<skill>.<tool>" ---
    for skill in skills.all() {
        for spec in &skill.tools {
            tools.push(ToolDef {
                name: format!("{}.{}", skill.name, spec.name),
                description: spec.description.clone(),
                parameters: spec
                    .input_schema
                    .clone()
                    .unwrap_or_else(|| Value::Object(Default::default())),
            });
        }
    }
    tools
}
/// Shrink a base64 data-URI image before sending it to the LLM.
///
/// Images under ~200 KB are returned unchanged. Larger images are
/// re-encoded via `ffmpeg` (with a `sips` fallback on macOS) to a JPEG
/// capped at 1024px. On any encoder failure the ORIGINAL data URI is
/// returned so compression is best-effort, never fatal. Returns `None`
/// only when the payload is not valid base64.
///
/// Fix vs. previous version: the early `?` returns between temp-file
/// creation and cleanup (`to_str()?`, `fs::read(..).ok()?`) leaked the
/// temp files; cleanup now always runs before returning.
fn compress_image_for_llm(data_uri: &str) -> Option<String> {
    use base64::Engine;
    // Strip the common raster mime prefixes; anything else is treated as raw base64.
    let b64 = data_uri
        .strip_prefix("data:image/png;base64,")
        .or_else(|| data_uri.strip_prefix("data:image/jpeg;base64,"))
        .or_else(|| data_uri.strip_prefix("data:image/webp;base64,"))
        .or_else(|| data_uri.strip_prefix("data:image/gif;base64,"))
        .unwrap_or(data_uri);
    let bytes = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
    // Already small enough — pass through untouched.
    if bytes.len() < 200_000 {
        return Some(data_uri.to_owned());
    }
    let tmp_in = std::env::temp_dir().join(format!("rsclaw_img_in_{}.png", uuid::Uuid::new_v4()));
    let tmp_out = std::env::temp_dir().join(format!("rsclaw_img_out_{}.jpg", uuid::Uuid::new_v4()));
    // Resolve path strings up front so nothing can bail out between file
    // creation and the cleanup at the bottom.
    let (Some(in_path), Some(out_path)) = (tmp_in.to_str(), tmp_out.to_str()) else {
        return Some(data_uri.to_owned());
    };
    if std::fs::write(&tmp_in, &bytes).is_err() {
        let _ = std::fs::remove_file(&tmp_in);
        return None;
    }
    // Preferred path: ffmpeg downscale to <=1024px, JPEG quality 5.
    let ok = std::process::Command::new("ffmpeg")
        .args([
            "-y",
            "-i",
            in_path,
            "-vf",
            "scale='min(1024,iw)':'min(1024,ih)':force_original_aspect_ratio=decrease",
            "-q:v",
            "5",
            out_path,
        ])
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
        .map(|s| s.success())
        .unwrap_or(false);
    // macOS fallback when ffmpeg is missing or failed: resize in place with sips.
    #[cfg(target_os = "macos")]
    let ok = if !ok {
        let _ = std::fs::copy(&tmp_in, &tmp_out);
        std::process::Command::new("sips")
            .args([
                "--resampleWidth",
                "1024",
                "--setProperty",
                "formatOptions",
                "85",
                out_path,
            ])
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .status()
            .map(|s| s.success())
            .unwrap_or(false)
    } else {
        ok
    };
    // Read the result BEFORE deleting the temp files; a failed read falls
    // back to the original URI instead of propagating an error.
    let result = if ok && tmp_out.exists() {
        match std::fs::read(&tmp_out) {
            Ok(compressed) => {
                let b64 = base64::engine::general_purpose::STANDARD.encode(&compressed);
                tracing::debug!(
                    original = bytes.len(),
                    compressed = compressed.len(),
                    "image compressed for LLM"
                );
                Some(format!("data:image/jpeg;base64,{b64}"))
            }
            Err(_) => Some(data_uri.to_owned()),
        }
    } else {
        Some(data_uri.to_owned())
    };
    let _ = std::fs::remove_file(&tmp_in);
    let _ = std::fs::remove_file(&tmp_out);
    result
}
// Upper bound (in chars) for inlined file content; currently unreferenced
// in this file — kept (with the allow) for callers/future use. TODO confirm
// it is still needed.
#[allow(dead_code)]
const MAX_FILE_CONTENT_CHARS: usize = 20_000;
/// Validate a workspace write before it happens.
///
/// Rejects absolute paths and `..` traversal, writes to well-known
/// sensitive filenames (shell rc files, SSH keys, this app's own config),
/// and content whose non-comment lines trip the dangerous-command
/// pre-parse check. Returns `Err` with a `[blocked] ...` message on any
/// violation, `Ok(())` otherwise.
fn check_write_safety(path: &str, full: &std::path::Path, content: &str) -> anyhow::Result<()> {
    // Absolute paths: POSIX (`/`, `\`) and Windows drive (`C:\`) forms.
    if path.starts_with('/') || path.starts_with('\\') || path.contains(":\\") {
        anyhow::bail!(
            "[blocked] absolute path not allowed: {path}. Use relative paths within workspace."
        );
    }
    if path.contains("../") || path.contains("..\\") {
        anyhow::bail!("[blocked] path traversal not allowed: {path}");
    }
    let path_lower = path.to_lowercase();
    // Files that could alter the shell environment, SSH access, cron, or
    // this application's own configuration.
    const SENSITIVE_NAMES: &[&str] = &[
        ".bashrc",
        ".bash_profile",
        ".zshrc",
        ".profile",
        ".login",
        "authorized_keys",
        "known_hosts",
        "id_rsa",
        "id_ed25519",
        "crontab",
        ".env",
        "openclaw.json",
        "rsclaw.json5",
        "auth-profiles.json",
    ];
    let filename = full
        .file_name()
        .map(|f| f.to_string_lossy().to_lowercase())
        .unwrap_or_default();
    // `ends_with` intentionally over-matches (e.g. "my.env") — blocking
    // too much is the safe direction here.
    for sensitive in SENSITIVE_NAMES {
        if filename == *sensitive || path_lower.ends_with(sensitive) {
            anyhow::bail!("[blocked] write to sensitive file: {path}");
        }
    }
    // Scan the new content line-by-line for dangerous shell commands.
    if !content.is_empty() {
        let preparse = crate::agent::preparse::PreParseEngine::load();
        for line in content.lines() {
            let trimmed = line.trim();
            // Skip blanks and common comment styles (#, //, --).
            if trimmed.is_empty()
                || trimmed.starts_with('#')
                || trimmed.starts_with("//")
                || trimmed.starts_with("--")
            {
                continue;
            }
            // Idiom fix: `if let` instead of a one-armed match (clippy::single_match).
            if let crate::agent::preparse::SafetyCheck::Deny(reason) =
                preparse.check_exec_safety(trimmed)
            {
                anyhow::bail!("[blocked] file contains dangerous command: {reason}");
            }
        }
    }
    Ok(())
}
/// Deny reads that touch credential stores, key material, shell history,
/// or protected system files. Both the caller-supplied `path` and the
/// resolved `full` path are checked (lowercased) so neither form can be
/// used to sneak past the filter.
fn check_read_safety(path: &str, full: &std::path::Path) -> anyhow::Result<()> {
    let path_str = full.to_string_lossy().to_lowercase();
    let path_lower = path.to_lowercase();
    // Directories that hold credentials or cloud/provider state.
    const SENSITIVE_DIRS: &[&str] = &[
        ".ssh/",
        ".gnupg/",
        ".gpg/",
        ".aws/",
        ".azure/",
        ".gcloud/",
        ".config/gcloud/",
        ".kube/",
        ".docker/",
        ".claude/",
        ".opencode/",
        ".openclaw/credentials/",
        ".rsclaw/credentials/",
    ];
    if SENSITIVE_DIRS
        .iter()
        .any(|dir| path_lower.contains(dir) || path_str.contains(dir))
    {
        anyhow::bail!("[blocked] access to sensitive directory: {path}");
    }
    let filename = full
        .file_name()
        .map(|f| f.to_string_lossy().to_lowercase())
        .unwrap_or_default();
    // Exact filenames that are never safe to expose.
    const SENSITIVE_FILES: &[&str] = &[
        "id_rsa",
        "id_ed25519",
        "id_ecdsa",
        "id_dsa",
        "id_rsa.pub",
        "id_ed25519.pub",
        "authorized_keys",
        "known_hosts",
        "secring.gpg",
        "trustdb.gpg",
        "credentials",
        "credentials.json",
        "credentials.yaml",
        "service_account.json",
        "application_default_credentials.json",
        ".env",
        ".env.local",
        ".env.production",
        ".env.secret",
        ".netrc",
        ".npmrc",
        ".pypirc",
        ".bash_history",
        ".zsh_history",
        ".pgpass",
        ".my.cnf",
        ".mongoshrc.js",
        "config.json",
        "wallet.dat",
        "keystore",
        "openclaw.json",
        "rsclaw.json5",
        "auth-profiles.json",
    ];
    if SENSITIVE_FILES.iter().any(|name| filename == *name) {
        anyhow::bail!("[blocked] access to sensitive file: {path}");
    }
    // Heuristic: anything that looks like a private key file.
    if filename.contains("private") && (filename.contains("key") || filename.ends_with(".pem")) {
        anyhow::bail!("[blocked] access to private key file: {path}");
    }
    // Protected system files (password/sudo databases).
    const SYSTEM_FILES: &[&str] = &[
        "/etc/shadow",
        "/etc/gshadow",
        "/etc/master.passwd",
        "/etc/sudoers",
    ];
    if SYSTEM_FILES
        .iter()
        .any(|sys| path_str.ends_with(sys) || path == *sys)
    {
        anyhow::bail!("[blocked] access to system file: {path}");
    }
    Ok(())
}
/// Scan an on-disk file's lines for dangerous shell commands using the
/// pre-parse engine. Non-UTF-8 or unreadable files pass silently (there is
/// nothing textual to scan). Returns `Err` naming the file and line on the
/// first denied command.
fn check_file_content_safety(file_path: &std::path::Path) -> anyhow::Result<()> {
    // Unreadable / binary file: nothing to scan — deliberately not an error.
    let Ok(content) = std::fs::read_to_string(file_path) else {
        return Ok(());
    };
    let preparse = crate::agent::preparse::PreParseEngine::load();
    for (line_num, line) in content.lines().enumerate() {
        let trimmed = line.trim();
        // Skip blanks and common comment styles (#, //, --).
        if trimmed.is_empty()
            || trimmed.starts_with('#')
            || trimmed.starts_with("//")
            || trimmed.starts_with("--")
        {
            continue;
        }
        // Idiom fix: `if let` instead of a one-armed match (clippy::single_match).
        if let crate::agent::preparse::SafetyCheck::Deny(reason) =
            preparse.check_exec_safety(trimmed)
        {
            anyhow::bail!(
                "[blocked] file {}:{} contains dangerous command: {reason}",
                file_path.display(),
                line_num + 1
            );
        }
    }
    Ok(())
}
/// Rough token-count heuristic: ~4 ASCII chars per token, ~1.5 tokens per
/// CJK character, ~2 other non-ASCII chars per token.
pub fn estimate_tokens(text: &str) -> usize {
    let mut ascii = 0usize;
    let mut cjk = 0usize;
    let mut other = 0usize;
    for ch in text.chars() {
        // CJK unified ideographs (+ ext A), CJK punctuation, full-width
        // forms, and Hangul syllables.
        let is_cjk = matches!(
            ch,
            '\u{4E00}'..='\u{9FFF}'
                | '\u{3400}'..='\u{4DBF}'
                | '\u{3000}'..='\u{303F}'
                | '\u{FF00}'..='\u{FFEF}'
                | '\u{AC00}'..='\u{D7AF}'
        );
        if ch.is_ascii() {
            ascii += 1;
        } else if is_cjk {
            cjk += 1;
        } else {
            other += 1;
        }
    }
    ascii / 4 + cjk * 3 / 2 + other / 2
}
/// Replace image parts in every message EXCEPT the most recent user
/// message with a textual "[image]" placeholder, keeping any text parts.
/// This keeps older turns cheap while the latest user images stay intact.
fn strip_old_images(mut messages: Vec<Message>) -> Vec<Message> {
    // Index of the newest user message — its images are preserved.
    let keep_idx = messages.iter().rposition(|m| m.role == Role::User);
    for (idx, msg) in messages.iter_mut().enumerate() {
        if Some(idx) == keep_idx {
            continue;
        }
        let MessageContent::Parts(parts) = &msg.content else {
            continue;
        };
        if !parts.iter().any(|p| matches!(p, ContentPart::Image { .. })) {
            continue;
        }
        // Keep the text parts, joined with spaces, and tag the removal.
        let text = parts
            .iter()
            .filter_map(|p| match p {
                ContentPart::Text { text } => Some(text.as_str()),
                _ => None,
            })
            .collect::<Vec<_>>()
            .join(" ");
        let replacement = if text.is_empty() {
            "[image]".to_owned()
        } else {
            format!("{text} [image]")
        };
        msg.content = MessageContent::Text(replacement);
    }
    messages
}
/// Prune conversation history according to the configured strategy:
/// `hard_clear` drops everything but the last user message once total size
/// exceeds a threshold; otherwise `soft_trim` removes large Tool messages
/// (oldest first) while the total exceeds the limit.
fn apply_context_pruning(messages: &mut Vec<Message>, cfg: Option<&ContextPruningConfig>) {
    let Some(cfg) = cfg else { return };
    // Sizes are measured in characters via msg_chars (non-text parts count 50).
    let total: usize = messages.iter().map(msg_chars).sum();
    if let Some(hc) = &cfg.hard_clear
        && hc.enabled.unwrap_or(false)
    {
        let threshold = hc.threshold.unwrap_or(200_000) as usize;
        if total > threshold {
            // Keep only the most recent user message so the model still has
            // something to respond to.
            let last_user = messages
                .iter()
                .rev()
                .find(|m| m.role == Role::User)
                .cloned();
            messages.clear();
            if let Some(m) = last_user {
                messages.push(m);
            }
            // Hard clear supersedes soft trim for this pass.
            return;
        }
    }
    if let Some(st) = &cfg.soft_trim
        && st.enabled.unwrap_or(false)
    {
        // NOTE(review): `tail_chars` is used here as the TOTAL size limit,
        // not a tail-retention size as the name suggests — confirm intent.
        let limit = st.tail_chars.unwrap_or(80_000) as usize;
        let min_prunable = cfg.min_prunable_tool_chars.unwrap_or(500) as usize;
        if total > limit {
            let mut chars_over = total - limit;
            let mut to_remove: Vec<usize> = Vec::new();
            // Walk oldest-to-newest, marking large Tool messages until
            // enough characters have been reclaimed.
            for (i, msg) in messages.iter().enumerate() {
                if chars_over == 0 {
                    break;
                }
                if msg.role == Role::Tool {
                    let c = msg_chars(msg);
                    if c >= min_prunable {
                        to_remove.push(i);
                        chars_over = chars_over.saturating_sub(c);
                    }
                }
            }
            // Remove back-to-front so earlier indices stay valid.
            for i in to_remove.into_iter().rev() {
                messages.remove(i);
            }
        }
    }
}
/// Character-count proxy for a message's size; non-text parts (images,
/// tool payloads) are charged a flat 50.
fn msg_chars(m: &Message) -> usize {
    match &m.content {
        MessageContent::Text(t) => t.len(),
        MessageContent::Parts(parts) => parts
            .iter()
            .map(|part| {
                if let ContentPart::Text { text } = part {
                    text.len()
                } else {
                    50
                }
            })
            .sum(),
    }
}
/// Estimated token count for a message; non-text parts are charged a
/// flat 50 tokens, text goes through `estimate_tokens`.
fn msg_tokens(m: &Message) -> usize {
    match &m.content {
        MessageContent::Text(t) => estimate_tokens(t),
        MessageContent::Parts(parts) => parts
            .iter()
            .map(|part| match part {
                ContentPart::Text { text } => estimate_tokens(text),
                _ => 50,
            })
            .sum(),
    }
}
/// Drop the oldest history messages until the estimated prompt fits the
/// model's context window, always keeping at least the 6 newest messages.
/// The budget is the window minus a reply reserve, the system prompt, and
/// the serialized tool schemas (all estimated at ~4 chars/token).
fn apply_context_budget_trim(
    messages: &mut Vec<Message>,
    context_tokens: usize,
    system_prompt: &str,
    tools: &[ToolDef],
) {
    const MIN_KEEP: usize = 6;
    // Tiny histories are never trimmed.
    if messages.len() <= MIN_KEEP {
        return;
    }
    // Reserve ~20% of the window (at least 2000 tokens) for the reply.
    let reply_reserve = (context_tokens / 5).max(2000);
    let system_tokens = system_prompt.len() / 4;
    let tools_tokens = serde_json::to_string(tools).map_or(0, |s| s.len() / 4);
    let history_budget_tokens = context_tokens
        .saturating_sub(reply_reserve)
        .saturating_sub(system_tokens)
        .saturating_sub(tools_tokens);
    let history_budget_chars = history_budget_tokens * 4;
    let total_chars: usize = messages.iter().map(msg_chars).sum();
    if total_chars <= history_budget_chars {
        return;
    }
    // Count how many of the oldest messages must go to get under budget.
    let max_removable = messages.len().saturating_sub(MIN_KEEP);
    let mut removed_chars = 0usize;
    let mut remove_count = 0usize;
    for msg in messages.iter().take(max_removable) {
        if total_chars - removed_chars <= history_budget_chars {
            break;
        }
        removed_chars += msg_chars(msg);
        remove_count += 1;
    }
    if remove_count > 0 {
        tracing::debug!(
            context_tokens,
            history_budget_chars,
            total_chars,
            removed = remove_count,
            "context budget trim: removed {remove_count} oldest messages"
        );
        messages.drain(..remove_count);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        config::schema::{ContextPruningConfig, HardClearConfig, SoftTrimConfig},
        provider::{Message, MessageContent, Role},
        skill::SkillRegistry,
    };
    // Convenience constructor for a plain-text message of the given role.
    fn text_msg(role: Role, text: &str) -> Message {
        Message {
            role,
            content: MessageContent::Text(text.to_owned()),
        }
    }
    #[test]
    fn msg_chars_text_variant() {
        let m = text_msg(Role::User, "hello");
        assert_eq!(msg_chars(&m), 5);
    }
    #[test]
    fn msg_chars_parts_variant() {
        // Text parts sum their lengths (3 + 2 = 5).
        let m = Message {
            role: Role::Assistant,
            content: MessageContent::Parts(vec![
                ContentPart::Text {
                    text: "abc".to_owned(),
                },
                ContentPart::Text {
                    text: "de".to_owned(),
                },
            ]),
        };
        assert_eq!(msg_chars(&m), 5);
    }
    #[test]
    fn hard_clear_removes_all_but_last_user() -> anyhow::Result<()> {
        // 150k chars total, well past the 100k hard-clear threshold.
        let mut msgs = vec![
            text_msg(Role::User, &"u".repeat(50_000)),
            text_msg(Role::Assistant, &"a".repeat(50_000)),
            text_msg(Role::Tool, &"t".repeat(50_000)),
            text_msg(Role::User, "last user message"),
        ];
        let cfg = ContextPruningConfig {
            mode: None,
            ttl: None,
            keep_last_assistants: None,
            min_prunable_tool_chars: None,
            soft_trim: None,
            hard_clear: Some(HardClearConfig {
                enabled: Some(true),
                threshold: Some(100_000),
            }),
            tools: None,
        };
        apply_context_pruning(&mut msgs, Some(&cfg));
        // Only the final user message should survive a hard clear.
        assert_eq!(msgs.len(), 1, "hard clear should leave only one message");
        assert_eq!(msgs[0].role, Role::User);
        match &msgs[0].content {
            MessageContent::Text(t) => assert_eq!(t, "last user message"),
            other => return Err(anyhow::anyhow!("expected Text content, got {:?}", other)),
        }
        Ok(())
    }
    #[test]
    fn soft_trim_removes_large_tool_messages() {
        // A 2000-char Tool message over a 500-char limit must be pruned.
        let large_tool = "x".repeat(2_000);
        let mut msgs = vec![
            text_msg(Role::User, "hi"),
            text_msg(Role::Tool, &large_tool),
            text_msg(Role::Assistant, "response"),
        ];
        let cfg = ContextPruningConfig {
            mode: None,
            ttl: None,
            keep_last_assistants: None,
            min_prunable_tool_chars: Some(500),
            soft_trim: Some(SoftTrimConfig {
                enabled: Some(true),
                head_chars: None,
                tail_chars: Some(500),
            }),
            hard_clear: None,
            tools: None,
        };
        apply_context_pruning(&mut msgs, Some(&cfg));
        let has_tool = msgs.iter().any(|m| m.role == Role::Tool);
        assert!(!has_tool, "large Tool message should have been pruned");
    }
    #[test]
    fn build_tool_list_contains_builtins() {
        // With no agents/externals/skills, the built-in tools must still appear.
        let skills = SkillRegistry::new();
        let tools = build_tool_list(&skills, None, "test-agent", &[]);
        let names: Vec<&str> = tools.iter().map(|t| t.name.as_str()).collect();
        for expected in &[
            "memory", "session", "agent", "channel", "read", "write", "exec",
        ] {
            assert!(
                names.contains(expected),
                "expected built-in tool `{expected}` in tool list, got: {names:?}"
            );
        }
    }
}