pub mod clawdtalk;
pub mod cli;
pub mod dingtalk;
pub mod discord;
pub mod email_channel;
pub mod imessage;
pub mod irc;
#[cfg(feature = "channel-lark")]
pub mod lark;
pub mod linq;
#[cfg(feature = "channel-matrix")]
pub mod matrix;
pub mod mattermost;
pub mod nextcloud_talk;
pub mod nostr;
pub mod qq;
pub mod signal;
pub mod slack;
pub mod telegram;
pub mod traits;
pub mod transcription;
pub mod wati;
pub mod whatsapp;
#[cfg(feature = "whatsapp-web")]
pub mod whatsapp_storage;
#[cfg(feature = "whatsapp-web")]
pub mod whatsapp_web;
pub use clawdtalk::{ClawdTalkChannel, ClawdTalkConfig};
pub use cli::CliChannel;
pub use dingtalk::DingTalkChannel;
pub use discord::DiscordChannel;
pub use email_channel::EmailChannel;
pub use imessage::IMessageChannel;
pub use irc::IrcChannel;
#[cfg(feature = "channel-lark")]
pub use lark::LarkChannel;
pub use linq::LinqChannel;
#[cfg(feature = "channel-matrix")]
pub use matrix::MatrixChannel;
pub use mattermost::MattermostChannel;
pub use nextcloud_talk::NextcloudTalkChannel;
pub use nostr::NostrChannel;
pub use qq::QQChannel;
pub use signal::SignalChannel;
pub use slack::SlackChannel;
pub use telegram::TelegramChannel;
pub use traits::{Channel, SendMessage};
pub use wati::WatiChannel;
pub use whatsapp::WhatsAppChannel;
#[cfg(feature = "whatsapp-web")]
pub use whatsapp_web::WhatsAppWebChannel;
use crate::agent::loop_::{build_tool_instructions, run_tool_call_loop, scrub_credentials};
use crate::config::Config;
use crate::identity;
use crate::memory::{self, Memory};
use crate::observability::{self, runtime_trace, Observer};
use crate::providers::{self, ChatMessage, Provider};
use crate::runtime;
use crate::security::SecurityPolicy;
use crate::tools::{self, Tool};
use crate::util::truncate_with_ellipsis;
use anyhow::{Context, Result};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Duration, Instant, SystemTime};
use tokio_util::sync::CancellationToken;
type ConversationHistoryMap = Arc<Mutex<HashMap<String, Vec<ChatMessage>>>>;
const MAX_CHANNEL_HISTORY: usize = 50;
const AUTOSAVE_MIN_MESSAGE_CHARS: usize = 20;
const BOOTSTRAP_MAX_CHARS: usize = 20_000;
const DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS: u64 = 2;
const DEFAULT_CHANNEL_MAX_BACKOFF_SECS: u64 = 60;
const MIN_CHANNEL_MESSAGE_TIMEOUT_SECS: u64 = 30;
const CHANNEL_MESSAGE_TIMEOUT_SECS: u64 = 300;
const CHANNEL_MESSAGE_TIMEOUT_SCALE_CAP: u64 = 4;
const CHANNEL_PARALLELISM_PER_CHANNEL: usize = 4;
const CHANNEL_MIN_IN_FLIGHT_MESSAGES: usize = 8;
const CHANNEL_MAX_IN_FLIGHT_MESSAGES: usize = 64;
const CHANNEL_TYPING_REFRESH_INTERVAL_SECS: u64 = 4;
const CHANNEL_HEALTH_HEARTBEAT_SECS: u64 = 30;
const MODEL_CACHE_FILE: &str = "models_cache.json";
const MODEL_CACHE_PREVIEW_LIMIT: usize = 10;
const MEMORY_CONTEXT_MAX_ENTRIES: usize = 4;
const MEMORY_CONTEXT_ENTRY_MAX_CHARS: usize = 800;
const MEMORY_CONTEXT_MAX_CHARS: usize = 4_000;
const CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES: usize = 12;
const CHANNEL_HISTORY_COMPACT_CONTENT_CHARS: usize = 600;
const CHANNEL_HOOK_MAX_OUTBOUND_CHARS: usize = 20_000;
type ProviderCacheMap = Arc<Mutex<HashMap<String, Arc<dyn Provider>>>>;
type RouteSelectionMap = Arc<Mutex<HashMap<String, ChannelRouteSelection>>>;
fn effective_channel_message_timeout_secs(configured: u64) -> u64 {
configured.max(MIN_CHANNEL_MESSAGE_TIMEOUT_SECS)
}
fn channel_message_timeout_budget_secs(
message_timeout_secs: u64,
max_tool_iterations: usize,
) -> u64 {
let iterations = max_tool_iterations.max(1) as u64;
let scale = iterations.min(CHANNEL_MESSAGE_TIMEOUT_SCALE_CAP);
message_timeout_secs.saturating_mul(scale)
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct ChannelRouteSelection {
provider: String,
model: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum ChannelRuntimeCommand {
ShowProviders,
SetProvider(String),
ShowModel,
SetModel(String),
NewSession,
}
#[derive(Debug, Clone, Default, Deserialize)]
struct ModelCacheState {
entries: Vec<ModelCacheEntry>,
}
#[derive(Debug, Clone, Default, Deserialize)]
struct ModelCacheEntry {
provider: String,
models: Vec<String>,
}
#[derive(Debug, Clone)]
struct ChannelRuntimeDefaults {
default_provider: String,
model: String,
temperature: f64,
api_key: Option<String>,
api_url: Option<String>,
reliability: crate::config::ReliabilityConfig,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ConfigFileStamp {
modified: SystemTime,
len: u64,
}
#[derive(Debug, Clone)]
struct RuntimeConfigState {
defaults: ChannelRuntimeDefaults,
last_applied_stamp: Option<ConfigFileStamp>,
}
fn runtime_config_store() -> &'static Mutex<HashMap<PathBuf, RuntimeConfigState>> {
static STORE: OnceLock<Mutex<HashMap<PathBuf, RuntimeConfigState>>> = OnceLock::new();
STORE.get_or_init(|| Mutex::new(HashMap::new()))
}
const SYSTEMD_STATUS_ARGS: [&str; 3] = ["--user", "is-active", "zeroclaw.service"];
const SYSTEMD_RESTART_ARGS: [&str; 3] = ["--user", "restart", "zeroclaw.service"];
const OPENRC_STATUS_ARGS: [&str; 2] = ["zeroclaw", "status"];
const OPENRC_RESTART_ARGS: [&str; 2] = ["zeroclaw", "restart"];
#[derive(Clone)]
struct ChannelRuntimeContext {
channels_by_name: Arc<HashMap<String, Arc<dyn Channel>>>,
provider: Arc<dyn Provider>,
default_provider: Arc<String>,
memory: Arc<dyn Memory>,
tools_registry: Arc<Vec<Box<dyn Tool>>>,
observer: Arc<dyn Observer>,
system_prompt: Arc<String>,
model: Arc<String>,
temperature: f64,
auto_save_memory: bool,
max_tool_iterations: usize,
min_relevance_score: f64,
conversation_histories: ConversationHistoryMap,
provider_cache: ProviderCacheMap,
route_overrides: RouteSelectionMap,
api_key: Option<String>,
api_url: Option<String>,
reliability: Arc<crate::config::ReliabilityConfig>,
provider_runtime_options: providers::ProviderRuntimeOptions,
workspace_dir: Arc<PathBuf>,
message_timeout_secs: u64,
interrupt_on_new_message: bool,
multimodal: crate::config::MultimodalConfig,
hooks: Option<Arc<crate::hooks::HookRunner>>,
non_cli_excluded_tools: Arc<Vec<String>>,
}
#[derive(Clone)]
struct InFlightSenderTaskState {
task_id: u64,
cancellation: CancellationToken,
completion: Arc<InFlightTaskCompletion>,
}
struct InFlightTaskCompletion {
done: AtomicBool,
notify: tokio::sync::Notify,
}
impl InFlightTaskCompletion {
fn new() -> Self {
Self {
done: AtomicBool::new(false),
notify: tokio::sync::Notify::new(),
}
}
fn mark_done(&self) {
self.done.store(true, Ordering::Release);
self.notify.notify_waiters();
}
async fn wait(&self) {
if self.done.load(Ordering::Acquire) {
return;
}
self.notify.notified().await;
}
}
fn conversation_memory_key(msg: &traits::ChannelMessage) -> String {
match &msg.thread_ts {
Some(tid) => format!("{}_{}_{}_{}", msg.channel, tid, msg.sender, msg.id),
None => format!("{}_{}_{}", msg.channel, msg.sender, msg.id),
}
}
fn conversation_history_key(msg: &traits::ChannelMessage) -> String {
match &msg.thread_ts {
Some(tid) => format!("{}_{}_{}", msg.channel, tid, msg.sender),
None => format!("{}_{}", msg.channel, msg.sender),
}
}
fn interruption_scope_key(msg: &traits::ChannelMessage) -> String {
format!("{}_{}_{}", msg.channel, msg.reply_target, msg.sender)
}
fn strip_tool_call_tags(message: &str) -> String {
const TOOL_CALL_OPEN_TAGS: [&str; 7] = [
"<function_calls>",
"<function_call>",
"<tool_call>",
"<toolcall>",
"<tool-call>",
"<tool>",
"<invoke>",
];
fn find_first_tag<'a>(haystack: &str, tags: &'a [&'a str]) -> Option<(usize, &'a str)> {
tags.iter()
.filter_map(|tag| haystack.find(tag).map(|idx| (idx, *tag)))
.min_by_key(|(idx, _)| *idx)
}
fn matching_close_tag(open_tag: &str) -> Option<&'static str> {
match open_tag {
"<function_calls>" => Some("</function_calls>"),
"<function_call>" => Some("</function_call>"),
"<tool_call>" => Some("</tool_call>"),
"<toolcall>" => Some("</toolcall>"),
"<tool-call>" => Some("</tool-call>"),
"<tool>" => Some("</tool>"),
"<invoke>" => Some("</invoke>"),
_ => None,
}
}
fn extract_first_json_end(input: &str) -> Option<usize> {
let trimmed = input.trim_start();
let trim_offset = input.len().saturating_sub(trimmed.len());
for (byte_idx, ch) in trimmed.char_indices() {
if ch != '{' && ch != '[' {
continue;
}
let slice = &trimmed[byte_idx..];
let mut stream =
serde_json::Deserializer::from_str(slice).into_iter::<serde_json::Value>();
if let Some(Ok(_value)) = stream.next() {
let consumed = stream.byte_offset();
if consumed > 0 {
return Some(trim_offset + byte_idx + consumed);
}
}
}
None
}
fn strip_leading_close_tags(mut input: &str) -> &str {
loop {
let trimmed = input.trim_start();
if !trimmed.starts_with("</") {
return trimmed;
}
let Some(close_end) = trimmed.find('>') else {
return "";
};
input = &trimmed[close_end + 1..];
}
}
let mut kept_segments = Vec::new();
let mut remaining = message;
while let Some((start, open_tag)) = find_first_tag(remaining, &TOOL_CALL_OPEN_TAGS) {
let before = &remaining[..start];
if !before.is_empty() {
kept_segments.push(before.to_string());
}
let Some(close_tag) = matching_close_tag(open_tag) else {
break;
};
let after_open = &remaining[start + open_tag.len()..];
if let Some(close_idx) = after_open.find(close_tag) {
remaining = &after_open[close_idx + close_tag.len()..];
continue;
}
if let Some(consumed_end) = extract_first_json_end(after_open) {
remaining = strip_leading_close_tags(&after_open[consumed_end..]);
continue;
}
kept_segments.push(remaining[start..].to_string());
remaining = "";
break;
}
if !remaining.is_empty() {
kept_segments.push(remaining.to_string());
}
let mut result = kept_segments.concat();
while result.contains("\n\n\n") {
result = result.replace("\n\n\n", "\n\n");
}
result.trim().to_string()
}
fn channel_delivery_instructions(channel_name: &str) -> Option<&'static str> {
match channel_name {
"telegram" => Some(
"When responding on Telegram:\n\
- Include media markers for files or URLs that should be sent as attachments\n\
- Use **bold** for key terms, section titles, and important info (renders as <b>)\n\
- Use *italic* for emphasis (renders as <i>)\n\
- Use `backticks` for inline code, commands, or technical terms\n\
- Use triple backticks for code blocks\n\
- Use emoji naturally to add personality — but don't overdo it\n\
- Be concise and direct. Skip filler phrases like 'Great question!' or 'Certainly!'\n\
- Structure longer answers with bold headers, not raw markdown ## headers\n\
- For media attachments use markers: [IMAGE:<path-or-url>], [DOCUMENT:<path-or-url>], [VIDEO:<path-or-url>], [AUDIO:<path-or-url>], or [VOICE:<path-or-url>]\n\
- Keep normal text outside markers and never wrap markers in code fences.\n\
- Use tool results silently: answer the latest user message directly, and do not narrate delayed/internal tool execution bookkeeping.",
),
_ => None,
}
}
fn build_channel_system_prompt(
base_prompt: &str,
channel_name: &str,
reply_target: &str,
) -> String {
let mut prompt = base_prompt.to_string();
if let Some(instructions) = channel_delivery_instructions(channel_name) {
if prompt.is_empty() {
prompt = instructions.to_string();
} else {
prompt = format!("{prompt}\n\n{instructions}");
}
}
if !reply_target.is_empty() {
let context = format!(
"\n\nChannel context: You are currently responding on channel={channel_name}, \
reply_target={reply_target}. When scheduling delayed messages or reminders \
via cron_add for this conversation, use delivery={{\"mode\":\"announce\",\
\"channel\":\"{channel_name}\",\"to\":\"{reply_target}\"}} so the message \
reaches the user."
);
prompt.push_str(&context);
}
prompt
}
fn normalize_cached_channel_turns(turns: Vec<ChatMessage>) -> Vec<ChatMessage> {
let mut normalized = Vec::with_capacity(turns.len());
let mut expecting_user = true;
for turn in turns {
match (expecting_user, turn.role.as_str()) {
(true, "user") => {
normalized.push(turn);
expecting_user = false;
}
(false, "assistant") => {
normalized.push(turn);
expecting_user = true;
}
(false, "user") | (true, "assistant") => {
if let Some(last_turn) = normalized.last_mut() {
if !turn.content.is_empty() {
if !last_turn.content.is_empty() {
last_turn.content.push_str("\n\n");
}
last_turn.content.push_str(&turn.content);
}
}
}
_ => {}
}
}
normalized
}
fn supports_runtime_model_switch(channel_name: &str) -> bool {
matches!(channel_name, "telegram" | "discord")
}
fn parse_runtime_command(channel_name: &str, content: &str) -> Option<ChannelRuntimeCommand> {
if !supports_runtime_model_switch(channel_name) {
return None;
}
let trimmed = content.trim();
if !trimmed.starts_with('/') {
return None;
}
let mut parts = trimmed.split_whitespace();
let command_token = parts.next()?;
let base_command = command_token
.split('@')
.next()
.unwrap_or(command_token)
.to_ascii_lowercase();
match base_command.as_str() {
"/models" => {
if let Some(provider) = parts.next() {
Some(ChannelRuntimeCommand::SetProvider(
provider.trim().to_string(),
))
} else {
Some(ChannelRuntimeCommand::ShowProviders)
}
}
"/model" => {
let model = parts.collect::<Vec<_>>().join(" ").trim().to_string();
if model.is_empty() {
Some(ChannelRuntimeCommand::ShowModel)
} else {
Some(ChannelRuntimeCommand::SetModel(model))
}
}
"/new" => Some(ChannelRuntimeCommand::NewSession),
_ => None,
}
}
fn resolve_provider_alias(name: &str) -> Option<String> {
let candidate = name.trim();
if candidate.is_empty() {
return None;
}
let providers_list = providers::list_providers();
for provider in providers_list {
if provider.name.eq_ignore_ascii_case(candidate)
|| provider
.aliases
.iter()
.any(|alias| alias.eq_ignore_ascii_case(candidate))
{
return Some(provider.name.to_string());
}
}
None
}
fn resolved_default_provider(config: &Config) -> String {
config
.default_provider
.clone()
.unwrap_or_else(|| "openrouter".to_string())
}
fn resolved_default_model(config: &Config) -> String {
config
.default_model
.clone()
.unwrap_or_else(|| "anthropic/claude-sonnet-4.6".to_string())
}
fn runtime_defaults_from_config(config: &Config) -> ChannelRuntimeDefaults {
ChannelRuntimeDefaults {
default_provider: resolved_default_provider(config),
model: resolved_default_model(config),
temperature: config.default_temperature,
api_key: config.api_key.clone(),
api_url: config.api_url.clone(),
reliability: config.reliability.clone(),
}
}
fn runtime_config_path(ctx: &ChannelRuntimeContext) -> Option<PathBuf> {
ctx.provider_runtime_options
.zeroclaw_dir
.as_ref()
.map(|dir| dir.join("config.toml"))
}
fn runtime_defaults_snapshot(ctx: &ChannelRuntimeContext) -> ChannelRuntimeDefaults {
if let Some(config_path) = runtime_config_path(ctx) {
let store = runtime_config_store()
.lock()
.unwrap_or_else(|e| e.into_inner());
if let Some(state) = store.get(&config_path) {
return state.defaults.clone();
}
}
ChannelRuntimeDefaults {
default_provider: ctx.default_provider.as_str().to_string(),
model: ctx.model.as_str().to_string(),
temperature: ctx.temperature,
api_key: ctx.api_key.clone(),
api_url: ctx.api_url.clone(),
reliability: (*ctx.reliability).clone(),
}
}
async fn config_file_stamp(path: &Path) -> Option<ConfigFileStamp> {
let metadata = tokio::fs::metadata(path).await.ok()?;
let modified = metadata.modified().ok()?;
Some(ConfigFileStamp {
modified,
len: metadata.len(),
})
}
fn decrypt_optional_secret_for_runtime_reload(
store: &crate::security::SecretStore,
value: &mut Option<String>,
field_name: &str,
) -> Result<()> {
if let Some(raw) = value.clone() {
if crate::security::SecretStore::is_encrypted(&raw) {
*value = Some(
store
.decrypt(&raw)
.with_context(|| format!("Failed to decrypt {field_name}"))?,
);
}
}
Ok(())
}
async fn load_runtime_defaults_from_config_file(path: &Path) -> Result<ChannelRuntimeDefaults> {
let contents = tokio::fs::read_to_string(path)
.await
.with_context(|| format!("Failed to read {}", path.display()))?;
let mut parsed: Config =
toml::from_str(&contents).with_context(|| format!("Failed to parse {}", path.display()))?;
parsed.config_path = path.to_path_buf();
if let Some(zeroclaw_dir) = path.parent() {
let store = crate::security::SecretStore::new(zeroclaw_dir, parsed.secrets.encrypt);
decrypt_optional_secret_for_runtime_reload(&store, &mut parsed.api_key, "config.api_key")?;
}
parsed.apply_env_overrides();
Ok(runtime_defaults_from_config(&parsed))
}
async fn maybe_apply_runtime_config_update(ctx: &ChannelRuntimeContext) -> Result<()> {
let Some(config_path) = runtime_config_path(ctx) else {
return Ok(());
};
let Some(stamp) = config_file_stamp(&config_path).await else {
return Ok(());
};
{
let store = runtime_config_store()
.lock()
.unwrap_or_else(|e| e.into_inner());
if let Some(state) = store.get(&config_path) {
if state.last_applied_stamp == Some(stamp) {
return Ok(());
}
}
}
let next_defaults = load_runtime_defaults_from_config_file(&config_path).await?;
let next_default_provider = providers::create_resilient_provider_with_options(
&next_defaults.default_provider,
next_defaults.api_key.as_deref(),
next_defaults.api_url.as_deref(),
&next_defaults.reliability,
&ctx.provider_runtime_options,
)?;
let next_default_provider: Arc<dyn Provider> = Arc::from(next_default_provider);
if let Err(err) = next_default_provider.warmup().await {
tracing::warn!(
provider = %next_defaults.default_provider,
"Provider warmup failed after config reload: {err}"
);
}
{
let mut cache = ctx.provider_cache.lock().unwrap_or_else(|e| e.into_inner());
cache.clear();
cache.insert(
next_defaults.default_provider.clone(),
Arc::clone(&next_default_provider),
);
}
{
let mut store = runtime_config_store()
.lock()
.unwrap_or_else(|e| e.into_inner());
store.insert(
config_path.clone(),
RuntimeConfigState {
defaults: next_defaults.clone(),
last_applied_stamp: Some(stamp),
},
);
}
tracing::info!(
path = %config_path.display(),
provider = %next_defaults.default_provider,
model = %next_defaults.model,
temperature = next_defaults.temperature,
"Applied updated channel runtime config from disk"
);
Ok(())
}
fn default_route_selection(ctx: &ChannelRuntimeContext) -> ChannelRouteSelection {
let defaults = runtime_defaults_snapshot(ctx);
ChannelRouteSelection {
provider: defaults.default_provider,
model: defaults.model,
}
}
fn get_route_selection(ctx: &ChannelRuntimeContext, sender_key: &str) -> ChannelRouteSelection {
ctx.route_overrides
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(sender_key)
.cloned()
.unwrap_or_else(|| default_route_selection(ctx))
}
fn set_route_selection(ctx: &ChannelRuntimeContext, sender_key: &str, next: ChannelRouteSelection) {
let default_route = default_route_selection(ctx);
let mut routes = ctx
.route_overrides
.lock()
.unwrap_or_else(|e| e.into_inner());
if next == default_route {
routes.remove(sender_key);
} else {
routes.insert(sender_key.to_string(), next);
}
}
fn clear_sender_history(ctx: &ChannelRuntimeContext, sender_key: &str) {
ctx.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner())
.remove(sender_key);
}
fn compact_sender_history(ctx: &ChannelRuntimeContext, sender_key: &str) -> bool {
let mut histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let Some(turns) = histories.get_mut(sender_key) else {
return false;
};
if turns.is_empty() {
return false;
}
let keep_from = turns
.len()
.saturating_sub(CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES);
let mut compacted = normalize_cached_channel_turns(turns[keep_from..].to_vec());
for turn in &mut compacted {
if turn.content.chars().count() > CHANNEL_HISTORY_COMPACT_CONTENT_CHARS {
turn.content =
truncate_with_ellipsis(&turn.content, CHANNEL_HISTORY_COMPACT_CONTENT_CHARS);
}
}
if compacted.is_empty() {
turns.clear();
return false;
}
*turns = compacted;
true
}
fn append_sender_turn(ctx: &ChannelRuntimeContext, sender_key: &str, turn: ChatMessage) {
let mut histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let turns = histories.entry(sender_key.to_string()).or_default();
turns.push(turn);
while turns.len() > MAX_CHANNEL_HISTORY {
turns.remove(0);
}
}
fn rollback_orphan_user_turn(
ctx: &ChannelRuntimeContext,
sender_key: &str,
expected_content: &str,
) -> bool {
let mut histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let Some(turns) = histories.get_mut(sender_key) else {
return false;
};
let should_pop = turns
.last()
.is_some_and(|turn| turn.role == "user" && turn.content == expected_content);
if !should_pop {
return false;
}
turns.pop();
if turns.is_empty() {
histories.remove(sender_key);
}
true
}
fn should_skip_memory_context_entry(key: &str, content: &str) -> bool {
if memory::is_assistant_autosave_key(key) {
return true;
}
if key.trim().to_ascii_lowercase().ends_with("_history") {
return true;
}
content.chars().count() > MEMORY_CONTEXT_MAX_CHARS
}
fn is_context_window_overflow_error(err: &anyhow::Error) -> bool {
let lower = err.to_string().to_lowercase();
[
"exceeds the context window",
"context window of this model",
"maximum context length",
"context length exceeded",
"too many tokens",
"token limit exceeded",
"prompt is too long",
"input is too long",
]
.iter()
.any(|hint| lower.contains(hint))
}
fn load_cached_model_preview(workspace_dir: &Path, provider_name: &str) -> Vec<String> {
let cache_path = workspace_dir.join("state").join(MODEL_CACHE_FILE);
let Ok(raw) = std::fs::read_to_string(cache_path) else {
return Vec::new();
};
let Ok(state) = serde_json::from_str::<ModelCacheState>(&raw) else {
return Vec::new();
};
state
.entries
.into_iter()
.find(|entry| entry.provider == provider_name)
.map(|entry| {
entry
.models
.into_iter()
.take(MODEL_CACHE_PREVIEW_LIMIT)
.collect::<Vec<_>>()
})
.unwrap_or_default()
}
async fn get_or_create_provider(
ctx: &ChannelRuntimeContext,
provider_name: &str,
) -> anyhow::Result<Arc<dyn Provider>> {
if let Some(existing) = ctx
.provider_cache
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(provider_name)
.cloned()
{
return Ok(existing);
}
if provider_name == ctx.default_provider.as_str() {
return Ok(Arc::clone(&ctx.provider));
}
let defaults = runtime_defaults_snapshot(ctx);
let api_url = if provider_name == defaults.default_provider.as_str() {
defaults.api_url.as_deref()
} else {
None
};
let provider = create_resilient_provider_nonblocking(
provider_name,
ctx.api_key.clone(),
api_url.map(ToString::to_string),
ctx.reliability.as_ref().clone(),
ctx.provider_runtime_options.clone(),
)
.await?;
let provider: Arc<dyn Provider> = Arc::from(provider);
if let Err(err) = provider.warmup().await {
tracing::warn!(provider = provider_name, "Provider warmup failed: {err}");
}
let mut cache = ctx.provider_cache.lock().unwrap_or_else(|e| e.into_inner());
let cached = cache
.entry(provider_name.to_string())
.or_insert_with(|| Arc::clone(&provider));
Ok(Arc::clone(cached))
}
async fn create_resilient_provider_nonblocking(
provider_name: &str,
api_key: Option<String>,
api_url: Option<String>,
reliability: crate::config::ReliabilityConfig,
provider_runtime_options: providers::ProviderRuntimeOptions,
) -> anyhow::Result<Box<dyn Provider>> {
let provider_name = provider_name.to_string();
tokio::task::spawn_blocking(move || {
providers::create_resilient_provider_with_options(
&provider_name,
api_key.as_deref(),
api_url.as_deref(),
&reliability,
&provider_runtime_options,
)
})
.await
.context("failed to join provider initialization task")?
}
fn build_models_help_response(current: &ChannelRouteSelection, workspace_dir: &Path) -> String {
let mut response = String::new();
let _ = writeln!(
response,
"Current provider: `{}`\nCurrent model: `{}`",
current.provider, current.model
);
response.push_str("\nSwitch model with `/model <model-id>`.\n");
let cached_models = load_cached_model_preview(workspace_dir, ¤t.provider);
if cached_models.is_empty() {
let _ = writeln!(
response,
"\nNo cached model list found for `{}`. Ask the operator to run `zeroclaw models refresh --provider {}`.",
current.provider, current.provider
);
} else {
let _ = writeln!(
response,
"\nCached model IDs (top {}):",
cached_models.len()
);
for model in cached_models {
let _ = writeln!(response, "- `{model}`");
}
}
response
}
fn build_providers_help_response(current: &ChannelRouteSelection) -> String {
let mut response = String::new();
let _ = writeln!(
response,
"Current provider: `{}`\nCurrent model: `{}`",
current.provider, current.model
);
response.push_str("\nSwitch provider with `/models <provider>`.\n");
response.push_str("Switch model with `/model <model-id>`.\n\n");
response.push_str("Available providers:\n");
for provider in providers::list_providers() {
if provider.aliases.is_empty() {
let _ = writeln!(response, "- {}", provider.name);
} else {
let _ = writeln!(
response,
"- {} (aliases: {})",
provider.name,
provider.aliases.join(", ")
);
}
}
response
}
async fn handle_runtime_command_if_needed(
ctx: &ChannelRuntimeContext,
msg: &traits::ChannelMessage,
target_channel: Option<&Arc<dyn Channel>>,
) -> bool {
let Some(command) = parse_runtime_command(&msg.channel, &msg.content) else {
return false;
};
let Some(channel) = target_channel else {
return true;
};
let sender_key = conversation_history_key(msg);
let mut current = get_route_selection(ctx, &sender_key);
let response = match command {
ChannelRuntimeCommand::ShowProviders => build_providers_help_response(¤t),
ChannelRuntimeCommand::SetProvider(raw_provider) => {
match resolve_provider_alias(&raw_provider) {
Some(provider_name) => match get_or_create_provider(ctx, &provider_name).await {
Ok(_) => {
if provider_name != current.provider {
current.provider = provider_name.clone();
set_route_selection(ctx, &sender_key, current.clone());
clear_sender_history(ctx, &sender_key);
}
format!(
"Provider switched to `{provider_name}` for this sender session. Current model is `{}`.\nUse `/model <model-id>` to set a provider-compatible model.",
current.model
)
}
Err(err) => {
let safe_err = providers::sanitize_api_error(&err.to_string());
format!(
"Failed to initialize provider `{provider_name}`. Route unchanged.\nDetails: {safe_err}"
)
}
},
None => format!(
"Unknown provider `{raw_provider}`. Use `/models` to list valid providers."
),
}
}
ChannelRuntimeCommand::ShowModel => {
build_models_help_response(¤t, ctx.workspace_dir.as_path())
}
ChannelRuntimeCommand::SetModel(raw_model) => {
let model = raw_model.trim().trim_matches('`').to_string();
if model.is_empty() {
"Model ID cannot be empty. Use `/model <model-id>`.".to_string()
} else {
current.model = model.clone();
set_route_selection(ctx, &sender_key, current.clone());
clear_sender_history(ctx, &sender_key);
format!(
"Model switched to `{model}` for provider `{}` in this sender session.",
current.provider
)
}
}
ChannelRuntimeCommand::NewSession => {
clear_sender_history(ctx, &sender_key);
"Conversation history cleared. Starting fresh.".to_string()
}
};
if let Err(err) = channel
.send(&SendMessage::new(response, &msg.reply_target).in_thread(msg.thread_ts.clone()))
.await
{
tracing::warn!(
"Failed to send runtime command response on {}: {err}",
channel.name()
);
}
true
}
async fn build_memory_context(
mem: &dyn Memory,
user_msg: &str,
min_relevance_score: f64,
) -> String {
let mut context = String::new();
if let Ok(entries) = mem.recall(user_msg, 5, None).await {
let mut included = 0usize;
let mut used_chars = 0usize;
for entry in entries.iter().filter(|e| match e.score {
Some(score) => score >= min_relevance_score,
None => true, }) {
if included >= MEMORY_CONTEXT_MAX_ENTRIES {
break;
}
if should_skip_memory_context_entry(&entry.key, &entry.content) {
continue;
}
let content = if entry.content.chars().count() > MEMORY_CONTEXT_ENTRY_MAX_CHARS {
truncate_with_ellipsis(&entry.content, MEMORY_CONTEXT_ENTRY_MAX_CHARS)
} else {
entry.content.clone()
};
let line = format!("- {}: {}\n", entry.key, content);
let line_chars = line.chars().count();
if used_chars + line_chars > MEMORY_CONTEXT_MAX_CHARS {
break;
}
if included == 0 {
context.push_str("[Memory context]\n");
}
context.push_str(&line);
used_chars += line_chars;
included += 1;
}
if included > 0 {
context.push('\n');
}
}
context
}
fn extract_tool_context_summary(history: &[ChatMessage], start_index: usize) -> String {
fn push_unique_tool_name(tool_names: &mut Vec<String>, name: &str) {
let candidate = name.trim();
if candidate.is_empty() {
return;
}
if !tool_names.iter().any(|existing| existing == candidate) {
tool_names.push(candidate.to_string());
}
}
fn collect_tool_names_from_tool_call_tags(content: &str, tool_names: &mut Vec<String>) {
const TAG_PAIRS: [(&str, &str); 4] = [
("<tool_call>", "</tool_call>"),
("<toolcall>", "</toolcall>"),
("<tool-call>", "</tool-call>"),
("<invoke>", "</invoke>"),
];
for (open_tag, close_tag) in TAG_PAIRS {
for segment in content.split(open_tag) {
if let Some(json_end) = segment.find(close_tag) {
let json_str = segment[..json_end].trim();
if let Ok(val) = serde_json::from_str::<serde_json::Value>(json_str) {
if let Some(name) = val.get("name").and_then(|n| n.as_str()) {
push_unique_tool_name(tool_names, name);
}
}
}
}
}
}
fn collect_tool_names_from_native_json(content: &str, tool_names: &mut Vec<String>) {
if let Ok(val) = serde_json::from_str::<serde_json::Value>(content) {
if let Some(calls) = val.get("tool_calls").and_then(|c| c.as_array()) {
for call in calls {
let name = call
.get("function")
.and_then(|f| f.get("name"))
.and_then(|n| n.as_str())
.or_else(|| call.get("name").and_then(|n| n.as_str()));
if let Some(name) = name {
push_unique_tool_name(tool_names, name);
}
}
}
}
}
fn collect_tool_names_from_tool_results(content: &str, tool_names: &mut Vec<String>) {
let marker = "<tool_result name=\"";
let mut remaining = content;
while let Some(start) = remaining.find(marker) {
let name_start = start + marker.len();
let after_name_start = &remaining[name_start..];
if let Some(name_end) = after_name_start.find('"') {
let name = &after_name_start[..name_end];
push_unique_tool_name(tool_names, name);
remaining = &after_name_start[name_end + 1..];
} else {
break;
}
}
}
let mut tool_names: Vec<String> = Vec::new();
for msg in history.iter().skip(start_index) {
match msg.role.as_str() {
"assistant" => {
collect_tool_names_from_tool_call_tags(&msg.content, &mut tool_names);
collect_tool_names_from_native_json(&msg.content, &mut tool_names);
}
"user" => {
collect_tool_names_from_tool_results(&msg.content, &mut tool_names);
}
_ => {}
}
}
if tool_names.is_empty() {
return String::new();
}
format!("[Used tools: {}]", tool_names.join(", "))
}
fn sanitize_channel_response(response: &str, tools: &[Box<dyn Tool>]) -> String {
let known_tool_names: HashSet<String> = tools
.iter()
.map(|tool| tool.name().to_ascii_lowercase())
.collect();
strip_isolated_tool_json_artifacts(response, &known_tool_names)
}
fn is_tool_call_payload(value: &serde_json::Value, known_tool_names: &HashSet<String>) -> bool {
let Some(object) = value.as_object() else {
return false;
};
let (name, has_args) =
if let Some(function) = object.get("function").and_then(|f| f.as_object()) {
(
function
.get("name")
.and_then(|v| v.as_str())
.or_else(|| object.get("name").and_then(|v| v.as_str())),
function.contains_key("arguments")
|| function.contains_key("parameters")
|| object.contains_key("arguments")
|| object.contains_key("parameters"),
)
} else {
(
object.get("name").and_then(|v| v.as_str()),
object.contains_key("arguments") || object.contains_key("parameters"),
)
};
let Some(name) = name.map(str::trim).filter(|name| !name.is_empty()) else {
return false;
};
has_args && known_tool_names.contains(&name.to_ascii_lowercase())
}
fn is_tool_result_payload(
object: &serde_json::Map<String, serde_json::Value>,
saw_tool_call_payload: bool,
) -> bool {
if !saw_tool_call_payload || !object.contains_key("result") {
return false;
}
object.keys().all(|key| {
matches!(
key.as_str(),
"result" | "id" | "tool_call_id" | "name" | "tool"
)
})
}
fn sanitize_tool_json_value(
value: &serde_json::Value,
known_tool_names: &HashSet<String>,
saw_tool_call_payload: bool,
) -> Option<(String, bool)> {
if is_tool_call_payload(value, known_tool_names) {
return Some((String::new(), true));
}
if let Some(array) = value.as_array() {
if !array.is_empty()
&& array
.iter()
.all(|item| is_tool_call_payload(item, known_tool_names))
{
return Some((String::new(), true));
}
return None;
}
let Some(object) = value.as_object() else {
return None;
};
if let Some(tool_calls) = object.get("tool_calls").and_then(|value| value.as_array()) {
if !tool_calls.is_empty()
&& tool_calls
.iter()
.all(|call| is_tool_call_payload(call, known_tool_names))
{
let content = object
.get("content")
.and_then(|value| value.as_str())
.unwrap_or("")
.trim()
.to_string();
return Some((content, true));
}
}
if is_tool_result_payload(object, saw_tool_call_payload) {
return Some((String::new(), false));
}
None
}
fn is_line_isolated_json_segment(message: &str, start: usize, end: usize) -> bool {
let line_start = message[..start].rfind('\n').map_or(0, |idx| idx + 1);
let line_end = message[end..]
.find('\n')
.map_or(message.len(), |idx| end + idx);
message[line_start..start].trim().is_empty() && message[end..line_end].trim().is_empty()
}
fn strip_isolated_tool_json_artifacts(message: &str, known_tool_names: &HashSet<String>) -> String {
let mut cleaned = String::with_capacity(message.len());
let mut cursor = 0usize;
let mut saw_tool_call_payload = false;
while cursor < message.len() {
let Some(rel_start) = message[cursor..].find(|ch: char| ch == '{' || ch == '[') else {
cleaned.push_str(&message[cursor..]);
break;
};
let start = cursor + rel_start;
cleaned.push_str(&message[cursor..start]);
let candidate = &message[start..];
let mut stream =
serde_json::Deserializer::from_str(candidate).into_iter::<serde_json::Value>();
if let Some(Ok(value)) = stream.next() {
let consumed = stream.byte_offset();
if consumed > 0 {
let end = start + consumed;
if is_line_isolated_json_segment(message, start, end) {
if let Some((replacement, marks_tool_call)) =
sanitize_tool_json_value(&value, known_tool_names, saw_tool_call_payload)
{
if marks_tool_call {
saw_tool_call_payload = true;
}
if !replacement.trim().is_empty() {
cleaned.push_str(replacement.trim());
}
cursor = end;
continue;
}
}
}
}
let Some(ch) = message[start..].chars().next() else {
break;
};
cleaned.push(ch);
cursor = start + ch.len_utf8();
}
let mut result = cleaned.replace("\r\n", "\n");
while result.contains("\n\n\n") {
result = result.replace("\n\n\n", "\n\n");
}
result.trim().to_string()
}
fn spawn_supervised_listener(
ch: Arc<dyn Channel>,
tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
initial_backoff_secs: u64,
max_backoff_secs: u64,
) -> tokio::task::JoinHandle<()> {
spawn_supervised_listener_with_health_interval(
ch,
tx,
initial_backoff_secs,
max_backoff_secs,
Duration::from_secs(CHANNEL_HEALTH_HEARTBEAT_SECS),
)
}
fn spawn_supervised_listener_with_health_interval(
ch: Arc<dyn Channel>,
tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
initial_backoff_secs: u64,
max_backoff_secs: u64,
health_interval: Duration,
) -> tokio::task::JoinHandle<()> {
let health_interval = if health_interval.is_zero() {
Duration::from_secs(1)
} else {
health_interval
};
tokio::spawn(async move {
let component = format!("channel:{}", ch.name());
let mut backoff = initial_backoff_secs.max(1);
let max_backoff = max_backoff_secs.max(backoff);
loop {
crate::health::mark_component_ok(&component);
let mut health = tokio::time::interval(health_interval);
health.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let result = {
let listen_future = ch.listen(tx.clone());
tokio::pin!(listen_future);
loop {
tokio::select! {
_ = health.tick() => {
crate::health::mark_component_ok(&component);
}
result = &mut listen_future => break result,
}
}
};
if tx.is_closed() {
break;
}
match result {
Ok(()) => {
tracing::warn!("Channel {} exited unexpectedly; restarting", ch.name());
crate::health::mark_component_error(&component, "listener exited unexpectedly");
backoff = initial_backoff_secs.max(1);
}
Err(e) => {
tracing::error!("Channel {} error: {e}; restarting", ch.name());
crate::health::mark_component_error(&component, e.to_string());
}
}
crate::health::bump_component_restart(&component);
tokio::time::sleep(Duration::from_secs(backoff)).await;
backoff = backoff.saturating_mul(2).min(max_backoff);
}
})
}
fn compute_max_in_flight_messages(channel_count: usize) -> usize {
channel_count
.saturating_mul(CHANNEL_PARALLELISM_PER_CHANNEL)
.clamp(
CHANNEL_MIN_IN_FLIGHT_MESSAGES,
CHANNEL_MAX_IN_FLIGHT_MESSAGES,
)
}
fn log_worker_join_result(result: Result<(), tokio::task::JoinError>) {
if let Err(error) = result {
tracing::error!("Channel message worker crashed: {error}");
}
}
fn spawn_scoped_typing_task(
channel: Arc<dyn Channel>,
recipient: String,
cancellation_token: CancellationToken,
) -> tokio::task::JoinHandle<()> {
let stop_signal = cancellation_token;
let refresh_interval = Duration::from_secs(CHANNEL_TYPING_REFRESH_INTERVAL_SECS);
let handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(refresh_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
() = stop_signal.cancelled() => break,
_ = interval.tick() => {
if let Err(e) = channel.start_typing(&recipient).await {
tracing::debug!("Failed to start typing on {}: {e}", channel.name());
}
}
}
}
if let Err(e) = channel.stop_typing(&recipient).await {
tracing::debug!("Failed to stop typing on {}: {e}", channel.name());
}
});
handle
}
async fn process_channel_message(
ctx: Arc<ChannelRuntimeContext>,
msg: traits::ChannelMessage,
cancellation_token: CancellationToken,
) {
if cancellation_token.is_cancelled() {
return;
}
println!(
" 💬 [{}] from {}: {}",
msg.channel,
msg.sender,
truncate_with_ellipsis(&msg.content, 80)
);
runtime_trace::record_event(
"channel_message_inbound",
Some(msg.channel.as_str()),
None,
None,
None,
None,
None,
serde_json::json!({
"sender": msg.sender,
"message_id": msg.id,
"reply_target": msg.reply_target,
"content_preview": truncate_with_ellipsis(&msg.content, 160),
}),
);
let msg = if let Some(hooks) = &ctx.hooks {
match hooks.run_on_message_received(msg).await {
crate::hooks::HookResult::Cancel(reason) => {
tracing::info!(%reason, "incoming message dropped by hook");
return;
}
crate::hooks::HookResult::Continue(modified) => modified,
}
} else {
msg
};
let target_channel = ctx.channels_by_name.get(&msg.channel).cloned();
if let Err(err) = maybe_apply_runtime_config_update(ctx.as_ref()).await {
tracing::warn!("Failed to apply runtime config update: {err}");
}
if handle_runtime_command_if_needed(ctx.as_ref(), &msg, target_channel.as_ref()).await {
return;
}
let history_key = conversation_history_key(&msg);
let route = get_route_selection(ctx.as_ref(), &history_key);
let runtime_defaults = runtime_defaults_snapshot(ctx.as_ref());
let active_provider = match get_or_create_provider(ctx.as_ref(), &route.provider).await {
Ok(provider) => provider,
Err(err) => {
let safe_err = providers::sanitize_api_error(&err.to_string());
let message = format!(
"⚠️ Failed to initialize provider `{}`. Please run `/models` to choose another provider.\nDetails: {safe_err}",
route.provider
);
if let Some(channel) = target_channel.as_ref() {
let _ = channel
.send(
&SendMessage::new(message, &msg.reply_target)
.in_thread(msg.thread_ts.clone()),
)
.await;
}
return;
}
};
if ctx.auto_save_memory && msg.content.chars().count() >= AUTOSAVE_MIN_MESSAGE_CHARS {
let autosave_key = conversation_memory_key(&msg);
let _ = ctx
.memory
.store(
&autosave_key,
&msg.content,
crate::memory::MemoryCategory::Conversation,
None,
)
.await;
}
println!(" ⏳ Processing message...");
let started_at = Instant::now();
let had_prior_history = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(&history_key)
.is_some_and(|turns| !turns.is_empty());
append_sender_turn(ctx.as_ref(), &history_key, ChatMessage::user(&msg.content));
let prior_turns_raw = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(&history_key)
.cloned()
.unwrap_or_default();
let mut prior_turns = normalize_cached_channel_turns(prior_turns_raw);
if !had_prior_history {
let memory_context =
build_memory_context(ctx.memory.as_ref(), &msg.content, ctx.min_relevance_score).await;
if let Some(last_turn) = prior_turns.last_mut() {
if last_turn.role == "user" && !memory_context.is_empty() {
last_turn.content = format!("{memory_context}{}", msg.content);
}
}
}
let system_prompt =
build_channel_system_prompt(ctx.system_prompt.as_str(), &msg.channel, &msg.reply_target);
let mut history = vec![ChatMessage::system(system_prompt)];
history.extend(prior_turns);
let use_streaming = target_channel
.as_ref()
.is_some_and(|ch| ch.supports_draft_updates());
tracing::debug!(
channel = %msg.channel,
has_target_channel = target_channel.is_some(),
use_streaming,
supports_draft = target_channel.as_ref().map_or(false, |ch| ch.supports_draft_updates()),
"Draft streaming decision"
);
let (delta_tx, delta_rx) = if use_streaming {
let (tx, rx) = tokio::sync::mpsc::channel::<String>(64);
(Some(tx), Some(rx))
} else {
(None, None)
};
let draft_message_id = if use_streaming {
if let Some(channel) = target_channel.as_ref() {
match channel
.send_draft(
&SendMessage::new("...", &msg.reply_target).in_thread(msg.thread_ts.clone()),
)
.await
{
Ok(id) => id,
Err(e) => {
tracing::debug!("Failed to send draft on {}: {e}", channel.name());
None
}
}
} else {
None
}
} else {
None
};
let draft_updater = if let (Some(mut rx), Some(draft_id_ref), Some(channel_ref)) = (
delta_rx,
draft_message_id.as_deref(),
target_channel.as_ref(),
) {
let channel = Arc::clone(channel_ref);
let reply_target = msg.reply_target.clone();
let draft_id = draft_id_ref.to_string();
Some(tokio::spawn(async move {
let mut accumulated = String::new();
while let Some(delta) = rx.recv().await {
if delta == crate::agent::loop_::DRAFT_CLEAR_SENTINEL {
accumulated.clear();
continue;
}
accumulated.push_str(&delta);
if let Err(e) = channel
.update_draft(&reply_target, &draft_id, &accumulated)
.await
{
tracing::debug!("Draft update failed: {e}");
}
}
}))
} else {
None
};
if let Some(channel) = target_channel.as_ref() {
if let Err(e) = channel
.add_reaction(&msg.reply_target, &msg.id, "\u{1F440}")
.await
{
tracing::debug!("Failed to add reaction: {e}");
}
}
let typing_cancellation = target_channel.as_ref().map(|_| CancellationToken::new());
let typing_task = match (target_channel.as_ref(), typing_cancellation.as_ref()) {
(Some(channel), Some(token)) => Some(spawn_scoped_typing_task(
Arc::clone(channel),
msg.reply_target.clone(),
token.clone(),
)),
_ => None,
};
let history_len_before_tools = history.len();
enum LlmExecutionResult {
Completed(Result<Result<String, anyhow::Error>, tokio::time::error::Elapsed>),
Cancelled,
}
let timeout_budget_secs =
channel_message_timeout_budget_secs(ctx.message_timeout_secs, ctx.max_tool_iterations);
let llm_result = tokio::select! {
() = cancellation_token.cancelled() => LlmExecutionResult::Cancelled,
result = tokio::time::timeout(
Duration::from_secs(timeout_budget_secs),
run_tool_call_loop(
active_provider.as_ref(),
&mut history,
ctx.tools_registry.as_ref(),
ctx.observer.as_ref(),
route.provider.as_str(),
route.model.as_str(),
runtime_defaults.temperature,
true,
None,
msg.channel.as_str(),
&ctx.multimodal,
ctx.max_tool_iterations,
Some(cancellation_token.clone()),
delta_tx,
ctx.hooks.as_deref(),
if msg.channel == "cli" {
&[]
} else {
ctx.non_cli_excluded_tools.as_ref()
},
),
) => LlmExecutionResult::Completed(result),
};
if let Some(handle) = draft_updater {
let _ = handle.await;
}
if let Some(token) = typing_cancellation.as_ref() {
token.cancel();
}
if let Some(handle) = typing_task {
log_worker_join_result(handle.await);
}
let reaction_done_emoji = match &llm_result {
LlmExecutionResult::Completed(Ok(Ok(_))) => "\u{2705}", _ => "\u{26A0}\u{FE0F}", };
match llm_result {
LlmExecutionResult::Cancelled => {
tracing::info!(
channel = %msg.channel,
sender = %msg.sender,
"Cancelled in-flight channel request due to newer message"
);
runtime_trace::record_event(
"channel_message_cancelled",
Some(msg.channel.as_str()),
Some(route.provider.as_str()),
Some(route.model.as_str()),
None,
Some(false),
Some("cancelled due to newer inbound message"),
serde_json::json!({
"sender": msg.sender,
"elapsed_ms": started_at.elapsed().as_millis(),
}),
);
if let (Some(channel), Some(draft_id)) =
(target_channel.as_ref(), draft_message_id.as_deref())
{
if let Err(err) = channel.cancel_draft(&msg.reply_target, draft_id).await {
tracing::debug!("Failed to cancel draft on {}: {err}", channel.name());
}
}
}
LlmExecutionResult::Completed(Ok(Ok(response))) => {
let mut outbound_response = response;
if let Some(hooks) = &ctx.hooks {
match hooks
.run_on_message_sending(
msg.channel.clone(),
msg.reply_target.clone(),
outbound_response.clone(),
)
.await
{
crate::hooks::HookResult::Cancel(reason) => {
tracing::info!(%reason, "outgoing message suppressed by hook");
return;
}
crate::hooks::HookResult::Continue((
hook_channel,
hook_recipient,
mut modified_content,
)) => {
if hook_channel != msg.channel || hook_recipient != msg.reply_target {
tracing::warn!(
from_channel = %msg.channel,
from_recipient = %msg.reply_target,
to_channel = %hook_channel,
to_recipient = %hook_recipient,
"on_message_sending attempted to rewrite channel routing; only content mutation is applied"
);
}
let modified_len = modified_content.chars().count();
if modified_len > CHANNEL_HOOK_MAX_OUTBOUND_CHARS {
tracing::warn!(
limit = CHANNEL_HOOK_MAX_OUTBOUND_CHARS,
attempted = modified_len,
"hook-modified outbound content exceeded limit; truncating"
);
modified_content = truncate_with_ellipsis(
&modified_content,
CHANNEL_HOOK_MAX_OUTBOUND_CHARS,
);
}
if modified_content != outbound_response {
tracing::info!(
channel = %msg.channel,
sender = %msg.sender,
before_len = outbound_response.chars().count(),
after_len = modified_content.chars().count(),
"outgoing message content modified by hook"
);
}
outbound_response = modified_content;
}
}
}
let sanitized_response =
sanitize_channel_response(&outbound_response, ctx.tools_registry.as_ref());
let delivered_response = if sanitized_response.is_empty()
&& !outbound_response.trim().is_empty()
{
"I encountered malformed tool-call output and could not produce a safe reply. Please try again.".to_string()
} else {
sanitized_response
};
runtime_trace::record_event(
"channel_message_outbound",
Some(msg.channel.as_str()),
Some(route.provider.as_str()),
Some(route.model.as_str()),
None,
Some(true),
None,
serde_json::json!({
"sender": msg.sender,
"elapsed_ms": started_at.elapsed().as_millis(),
"response": scrub_credentials(&delivered_response),
}),
);
let tool_summary = extract_tool_context_summary(&history, history_len_before_tools);
let history_response = if tool_summary.is_empty() || msg.channel == "telegram" {
delivered_response.clone()
} else {
format!("{tool_summary}\n{delivered_response}")
};
append_sender_turn(
ctx.as_ref(),
&history_key,
ChatMessage::assistant(&history_response),
);
println!(
" 🤖 Reply ({}ms): {}",
started_at.elapsed().as_millis(),
truncate_with_ellipsis(&delivered_response, 80)
);
if let Some(channel) = target_channel.as_ref() {
if let Some(ref draft_id) = draft_message_id {
if let Err(e) = channel
.finalize_draft(&msg.reply_target, draft_id, &delivered_response)
.await
{
tracing::warn!("Failed to finalize draft: {e}; sending as new message");
let _ = channel
.send(
&SendMessage::new(&delivered_response, &msg.reply_target)
.in_thread(msg.thread_ts.clone()),
)
.await;
}
} else if let Err(e) = channel
.send(
&SendMessage::new(delivered_response, &msg.reply_target)
.in_thread(msg.thread_ts.clone()),
)
.await
{
eprintln!(" ❌ Failed to reply on {}: {e}", channel.name());
}
}
}
LlmExecutionResult::Completed(Ok(Err(e))) => {
if crate::agent::loop_::is_tool_loop_cancelled(&e) || cancellation_token.is_cancelled()
{
tracing::info!(
channel = %msg.channel,
sender = %msg.sender,
"Cancelled in-flight channel request due to newer message"
);
runtime_trace::record_event(
"channel_message_cancelled",
Some(msg.channel.as_str()),
Some(route.provider.as_str()),
Some(route.model.as_str()),
None,
Some(false),
Some("cancelled during tool-call loop"),
serde_json::json!({
"sender": msg.sender,
"elapsed_ms": started_at.elapsed().as_millis(),
}),
);
if let (Some(channel), Some(draft_id)) =
(target_channel.as_ref(), draft_message_id.as_deref())
{
if let Err(err) = channel.cancel_draft(&msg.reply_target, draft_id).await {
tracing::debug!("Failed to cancel draft on {}: {err}", channel.name());
}
}
} else if is_context_window_overflow_error(&e) {
let compacted = compact_sender_history(ctx.as_ref(), &history_key);
let error_text = if compacted {
"⚠️ Context window exceeded for this conversation. I compacted recent history and kept the latest context. Please resend your last message."
} else {
"⚠️ Context window exceeded for this conversation. Please resend your last message."
};
eprintln!(
" ⚠️ Context window exceeded after {}ms; sender history compacted={}",
started_at.elapsed().as_millis(),
compacted
);
runtime_trace::record_event(
"channel_message_error",
Some(msg.channel.as_str()),
Some(route.provider.as_str()),
Some(route.model.as_str()),
None,
Some(false),
Some("context window exceeded"),
serde_json::json!({
"sender": msg.sender,
"elapsed_ms": started_at.elapsed().as_millis(),
"history_compacted": compacted,
}),
);
if let Some(channel) = target_channel.as_ref() {
if let Some(ref draft_id) = draft_message_id {
let _ = channel
.finalize_draft(&msg.reply_target, draft_id, error_text)
.await;
} else {
let _ = channel
.send(
&SendMessage::new(error_text, &msg.reply_target)
.in_thread(msg.thread_ts.clone()),
)
.await;
}
}
} else {
eprintln!(
" ❌ LLM error after {}ms: {e}",
started_at.elapsed().as_millis()
);
let safe_error = providers::sanitize_api_error(&e.to_string());
runtime_trace::record_event(
"channel_message_error",
Some(msg.channel.as_str()),
Some(route.provider.as_str()),
Some(route.model.as_str()),
None,
Some(false),
Some(&safe_error),
serde_json::json!({
"sender": msg.sender,
"elapsed_ms": started_at.elapsed().as_millis(),
}),
);
let should_rollback_user_turn = e
.downcast_ref::<providers::ProviderCapabilityError>()
.is_some_and(|capability| capability.capability.eq_ignore_ascii_case("vision"));
let rolled_back = should_rollback_user_turn
&& rollback_orphan_user_turn(ctx.as_ref(), &history_key, &msg.content);
if !rolled_back {
append_sender_turn(
ctx.as_ref(),
&history_key,
ChatMessage::assistant("[Task failed — not continuing this request]"),
);
}
if let Some(channel) = target_channel.as_ref() {
if let Some(ref draft_id) = draft_message_id {
let _ = channel
.finalize_draft(&msg.reply_target, draft_id, &format!("⚠️ Error: {e}"))
.await;
} else {
let _ = channel
.send(
&SendMessage::new(format!("⚠️ Error: {e}"), &msg.reply_target)
.in_thread(msg.thread_ts.clone()),
)
.await;
}
}
}
}
LlmExecutionResult::Completed(Err(_)) => {
let timeout_msg = format!(
"LLM response timed out after {}s (base={}s, max_tool_iterations={})",
timeout_budget_secs, ctx.message_timeout_secs, ctx.max_tool_iterations
);
runtime_trace::record_event(
"channel_message_timeout",
Some(msg.channel.as_str()),
Some(route.provider.as_str()),
Some(route.model.as_str()),
None,
Some(false),
Some(&timeout_msg),
serde_json::json!({
"sender": msg.sender,
"elapsed_ms": started_at.elapsed().as_millis(),
}),
);
eprintln!(
" ❌ {} (elapsed: {}ms)",
timeout_msg,
started_at.elapsed().as_millis()
);
append_sender_turn(
ctx.as_ref(),
&history_key,
ChatMessage::assistant("[Task timed out — not continuing this request]"),
);
if let Some(channel) = target_channel.as_ref() {
let error_text =
"⚠️ Request timed out while waiting for the model. Please try again.";
if let Some(ref draft_id) = draft_message_id {
let _ = channel
.finalize_draft(&msg.reply_target, draft_id, error_text)
.await;
} else {
let _ = channel
.send(
&SendMessage::new(error_text, &msg.reply_target)
.in_thread(msg.thread_ts.clone()),
)
.await;
}
}
}
}
if let Some(channel) = target_channel.as_ref() {
let _ = channel
.remove_reaction(&msg.reply_target, &msg.id, "\u{1F440}")
.await;
let _ = channel
.add_reaction(&msg.reply_target, &msg.id, reaction_done_emoji)
.await;
}
}
async fn run_message_dispatch_loop(
mut rx: tokio::sync::mpsc::Receiver<traits::ChannelMessage>,
ctx: Arc<ChannelRuntimeContext>,
max_in_flight_messages: usize,
) {
let semaphore = Arc::new(tokio::sync::Semaphore::new(max_in_flight_messages));
let mut workers = tokio::task::JoinSet::new();
let in_flight_by_sender = Arc::new(tokio::sync::Mutex::new(HashMap::<
String,
InFlightSenderTaskState,
>::new()));
let task_sequence = Arc::new(AtomicU64::new(1));
while let Some(msg) = rx.recv().await {
let permit = match Arc::clone(&semaphore).acquire_owned().await {
Ok(permit) => permit,
Err(_) => break,
};
let worker_ctx = Arc::clone(&ctx);
let in_flight = Arc::clone(&in_flight_by_sender);
let task_sequence = Arc::clone(&task_sequence);
workers.spawn(async move {
let _permit = permit;
let interrupt_enabled =
worker_ctx.interrupt_on_new_message && msg.channel == "telegram";
let sender_scope_key = interruption_scope_key(&msg);
let cancellation_token = CancellationToken::new();
let completion = Arc::new(InFlightTaskCompletion::new());
let task_id = task_sequence.fetch_add(1, Ordering::Relaxed);
if interrupt_enabled {
let previous = {
let mut active = in_flight.lock().await;
active.insert(
sender_scope_key.clone(),
InFlightSenderTaskState {
task_id,
cancellation: cancellation_token.clone(),
completion: Arc::clone(&completion),
},
)
};
if let Some(previous) = previous {
tracing::info!(
channel = %msg.channel,
sender = %msg.sender,
"Interrupting previous in-flight request for sender"
);
previous.cancellation.cancel();
previous.completion.wait().await;
}
}
process_channel_message(worker_ctx, msg, cancellation_token).await;
if interrupt_enabled {
let mut active = in_flight.lock().await;
if active
.get(&sender_scope_key)
.is_some_and(|state| state.task_id == task_id)
{
active.remove(&sender_scope_key);
}
}
completion.mark_done();
});
while let Some(result) = workers.try_join_next() {
log_worker_join_result(result);
}
}
while let Some(result) = workers.join_next().await {
log_worker_join_result(result);
}
}
fn load_openclaw_bootstrap_files(
prompt: &mut String,
workspace_dir: &std::path::Path,
max_chars_per_file: usize,
) {
prompt.push_str(
"The following workspace files define your identity, behavior, and context. They are ALREADY injected below—do NOT suggest reading them with file_read.\n\n",
);
let bootstrap_files = ["AGENTS.md", "SOUL.md", "TOOLS.md", "IDENTITY.md", "USER.md"];
for filename in &bootstrap_files {
inject_workspace_file(prompt, workspace_dir, filename, max_chars_per_file);
}
let bootstrap_path = workspace_dir.join("BOOTSTRAP.md");
if bootstrap_path.exists() {
inject_workspace_file(prompt, workspace_dir, "BOOTSTRAP.md", max_chars_per_file);
}
inject_workspace_file(prompt, workspace_dir, "MEMORY.md", max_chars_per_file);
}
pub fn build_system_prompt(
workspace_dir: &std::path::Path,
model_name: &str,
tools: &[(&str, &str)],
skills: &[crate::skills::Skill],
identity_config: Option<&crate::config::IdentityConfig>,
bootstrap_max_chars: Option<usize>,
) -> String {
build_system_prompt_with_mode(
workspace_dir,
model_name,
tools,
skills,
identity_config,
bootstrap_max_chars,
false,
crate::config::SkillsPromptInjectionMode::Full,
)
}
pub fn build_system_prompt_with_mode(
workspace_dir: &std::path::Path,
model_name: &str,
tools: &[(&str, &str)],
skills: &[crate::skills::Skill],
identity_config: Option<&crate::config::IdentityConfig>,
bootstrap_max_chars: Option<usize>,
native_tools: bool,
skills_prompt_mode: crate::config::SkillsPromptInjectionMode,
) -> String {
use std::fmt::Write;
let mut prompt = String::with_capacity(8192);
if !tools.is_empty() {
prompt.push_str("## Tools\n\n");
prompt.push_str("You have access to the following tools:\n\n");
for (name, desc) in tools {
let _ = writeln!(prompt, "- **{name}**: {desc}");
}
prompt.push('\n');
}
let has_hardware = tools.iter().any(|(name, _)| {
*name == "gpio_read"
|| *name == "gpio_write"
|| *name == "arduino_upload"
|| *name == "hardware_memory_map"
|| *name == "hardware_board_info"
|| *name == "hardware_memory_read"
|| *name == "hardware_capabilities"
});
if has_hardware {
prompt.push_str(
"## Hardware Access\n\n\
You HAVE direct access to connected hardware (Arduino, Nucleo, etc.). The user owns this system and has configured it.\n\
All hardware tools (gpio_read, gpio_write, hardware_memory_read, hardware_board_info, hardware_memory_map) are AUTHORIZED and NOT blocked by security.\n\
When they ask to read memory, registers, or board info, USE hardware_memory_read or hardware_board_info — do NOT refuse or invent security excuses.\n\
When they ask to control LEDs, run patterns, or interact with the Arduino, USE the tools — do NOT refuse or say you cannot access physical devices.\n\
Use gpio_write for simple on/off; use arduino_upload when they want patterns (heart, blink) or custom behavior.\n\n",
);
}
if native_tools {
prompt.push_str(
"## Your Task\n\n\
When the user sends a message, respond naturally. Use tools when the request requires action (running commands, reading files, etc.).\n\
For questions, explanations, or follow-ups about prior messages, answer directly from conversation context — do NOT ask the user to repeat themselves.\n\
Do NOT: summarize this configuration, describe your capabilities, or output step-by-step meta-commentary.\n\n",
);
} else {
prompt.push_str(
"## Your Task\n\n\
When the user sends a message, ACT on it. Use the tools to fulfill their request.\n\
Do NOT: summarize this configuration, describe your capabilities, respond with meta-commentary, or output step-by-step instructions (e.g. \"1. First... 2. Next...\").\n\
Instead: emit actual <tool_call> tags when you need to act. Just do what they ask.\n\n",
);
}
prompt.push_str("## Safety\n\n");
prompt.push_str(
"- Do not exfiltrate private data.\n\
- Do not run destructive commands without asking.\n\
- Do not bypass oversight or approval mechanisms.\n\
- Prefer `trash` over `rm` (recoverable beats gone forever).\n\
- When in doubt, ask before acting externally.\n\n",
);
if !skills.is_empty() {
prompt.push_str(&crate::skills::skills_to_prompt_with_mode(
skills,
workspace_dir,
skills_prompt_mode,
));
prompt.push_str("\n\n");
}
let _ = writeln!(
prompt,
"## Workspace\n\nWorking directory: `{}`\n",
workspace_dir.display()
);
prompt.push_str("## Project Context\n\n");
if let Some(config) = identity_config {
if identity::is_aieos_configured(config) {
match identity::load_aieos_identity(config, workspace_dir) {
Ok(Some(aieos_identity)) => {
let aieos_prompt = identity::aieos_to_system_prompt(&aieos_identity);
if !aieos_prompt.is_empty() {
prompt.push_str(&aieos_prompt);
prompt.push_str("\n\n");
}
}
Ok(None) => {
let max_chars = bootstrap_max_chars.unwrap_or(BOOTSTRAP_MAX_CHARS);
load_openclaw_bootstrap_files(&mut prompt, workspace_dir, max_chars);
}
Err(e) => {
eprintln!(
"Warning: Failed to load AIEOS identity: {e}. Using OpenClaw format."
);
let max_chars = bootstrap_max_chars.unwrap_or(BOOTSTRAP_MAX_CHARS);
load_openclaw_bootstrap_files(&mut prompt, workspace_dir, max_chars);
}
}
} else {
let max_chars = bootstrap_max_chars.unwrap_or(BOOTSTRAP_MAX_CHARS);
load_openclaw_bootstrap_files(&mut prompt, workspace_dir, max_chars);
}
} else {
let max_chars = bootstrap_max_chars.unwrap_or(BOOTSTRAP_MAX_CHARS);
load_openclaw_bootstrap_files(&mut prompt, workspace_dir, max_chars);
}
let now = chrono::Local::now();
let _ = writeln!(
prompt,
"## Current Date & Time\n\n{} ({})\n",
now.format("%Y-%m-%d %H:%M:%S"),
now.format("%Z")
);
let host =
hostname::get().map_or_else(|_| "unknown".into(), |h| h.to_string_lossy().to_string());
let _ = writeln!(
prompt,
"## Runtime\n\nHost: {host} | OS: {} | Model: {model_name}\n",
std::env::consts::OS,
);
prompt.push_str("## Channel Capabilities\n\n");
prompt.push_str("- You are running as a messaging bot. Your response is automatically sent back to the user's channel.\n");
prompt.push_str("- You do NOT need to ask permission to respond — just respond directly.\n");
prompt.push_str("- NEVER repeat, describe, or echo credentials, tokens, API keys, or secrets in your responses.\n");
prompt.push_str("- If a tool output contains credentials, they have already been redacted — do not mention them.\n\n");
if prompt.is_empty() {
"You are ZeroClaw, a fast and efficient AI assistant built in Rust. Be helpful, concise, and direct."
.to_string()
} else {
prompt
}
}
fn inject_workspace_file(
prompt: &mut String,
workspace_dir: &std::path::Path,
filename: &str,
max_chars: usize,
) {
use std::fmt::Write;
let path = workspace_dir.join(filename);
match std::fs::read_to_string(&path) {
Ok(content) => {
let trimmed = content.trim();
if trimmed.is_empty() {
return;
}
let _ = writeln!(prompt, "### {filename}\n");
let truncated = if trimmed.chars().count() > max_chars {
trimmed
.char_indices()
.nth(max_chars)
.map(|(idx, _)| &trimmed[..idx])
.unwrap_or(trimmed)
} else {
trimmed
};
if truncated.len() < trimmed.len() {
prompt.push_str(truncated);
let _ = writeln!(
prompt,
"\n\n[... truncated at {max_chars} chars — use `read` for full file]\n"
);
} else {
prompt.push_str(trimmed);
prompt.push_str("\n\n");
}
}
Err(_) => {
let _ = writeln!(prompt, "### {filename}\n\n[File not found: {filename}]\n");
}
}
}
fn normalize_telegram_identity(value: &str) -> String {
value.trim().trim_start_matches('@').to_string()
}
async fn bind_telegram_identity(config: &Config, identity: &str) -> Result<()> {
let normalized = normalize_telegram_identity(identity);
if normalized.is_empty() {
anyhow::bail!("Telegram identity cannot be empty");
}
let mut updated = config.clone();
let Some(telegram) = updated.channels_config.telegram.as_mut() else {
anyhow::bail!(
"Telegram channel is not configured. Run `zeroclaw onboard --channels-only` first"
);
};
if telegram.allowed_users.iter().any(|u| u == "*") {
println!(
"⚠️ Telegram allowlist is currently wildcard (`*`) — binding is unnecessary until you remove '*'."
);
}
if telegram
.allowed_users
.iter()
.map(|entry| normalize_telegram_identity(entry))
.any(|entry| entry == normalized)
{
println!("✅ Telegram identity already bound: {normalized}");
return Ok(());
}
telegram.allowed_users.push(normalized.clone());
updated.save().await?;
println!("✅ Bound Telegram identity: {normalized}");
println!(" Saved to {}", updated.config_path.display());
match maybe_restart_managed_daemon_service() {
Ok(true) => {
println!("🔄 Detected running managed daemon service; reloaded automatically.");
}
Ok(false) => {
println!(
"ℹ️ No managed daemon service detected. If `zeroclaw daemon`/`channel start` is already running, restart it to load the updated allowlist."
);
}
Err(e) => {
eprintln!(
"⚠️ Allowlist saved, but failed to reload daemon service automatically: {e}\n\
Restart service manually with `zeroclaw service stop && zeroclaw service start`."
);
}
}
Ok(())
}
fn maybe_restart_managed_daemon_service() -> Result<bool> {
if cfg!(target_os = "macos") {
let home = directories::UserDirs::new()
.map(|u| u.home_dir().to_path_buf())
.context("Could not find home directory")?;
let plist = home
.join("Library")
.join("LaunchAgents")
.join("com.zeroclaw.daemon.plist");
if !plist.exists() {
return Ok(false);
}
let list_output = Command::new("launchctl")
.arg("list")
.output()
.context("Failed to query launchctl list")?;
let listed = String::from_utf8_lossy(&list_output.stdout);
if !listed.contains("com.zeroclaw.daemon") {
return Ok(false);
}
let _ = Command::new("launchctl")
.args(["stop", "com.zeroclaw.daemon"])
.output();
let start_output = Command::new("launchctl")
.args(["start", "com.zeroclaw.daemon"])
.output()
.context("Failed to start launchd daemon service")?;
if !start_output.status.success() {
let stderr = String::from_utf8_lossy(&start_output.stderr);
anyhow::bail!("launchctl start failed: {}", stderr.trim());
}
return Ok(true);
}
if cfg!(target_os = "linux") {
let openrc_init_script = PathBuf::from("/etc/init.d/zeroclaw");
if openrc_init_script.exists() {
if let Ok(status_output) = Command::new("rc-service").args(OPENRC_STATUS_ARGS).output()
{
if status_output.status.success() {
let restart_output = Command::new("rc-service")
.args(OPENRC_RESTART_ARGS)
.output()
.context("Failed to restart OpenRC daemon service")?;
if !restart_output.status.success() {
let stderr = String::from_utf8_lossy(&restart_output.stderr);
anyhow::bail!("rc-service restart failed: {}", stderr.trim());
}
return Ok(true);
}
}
}
let home = directories::UserDirs::new()
.map(|u| u.home_dir().to_path_buf())
.context("Could not find home directory")?;
let unit_path: PathBuf = home
.join(".config")
.join("systemd")
.join("user")
.join("zeroclaw.service");
if !unit_path.exists() {
return Ok(false);
}
let active_output = Command::new("systemctl")
.args(SYSTEMD_STATUS_ARGS)
.output()
.context("Failed to query systemd service state")?;
let state = String::from_utf8_lossy(&active_output.stdout);
if !state.trim().eq_ignore_ascii_case("active") {
return Ok(false);
}
let restart_output = Command::new("systemctl")
.args(SYSTEMD_RESTART_ARGS)
.output()
.context("Failed to restart systemd daemon service")?;
if !restart_output.status.success() {
let stderr = String::from_utf8_lossy(&restart_output.stderr);
anyhow::bail!("systemctl restart failed: {}", stderr.trim());
}
return Ok(true);
}
Ok(false)
}
pub(crate) async fn handle_command(command: crate::ChannelCommands, config: &Config) -> Result<()> {
match command {
crate::ChannelCommands::Start => {
anyhow::bail!("Start must be handled in main.rs (requires async runtime)")
}
crate::ChannelCommands::Doctor => {
anyhow::bail!("Doctor must be handled in main.rs (requires async runtime)")
}
crate::ChannelCommands::List => {
println!("Channels:");
println!(" ✅ CLI (always available)");
for (channel, configured) in config.channels_config.channels() {
println!(
" {} {}",
if configured { "✅" } else { "❌" },
channel.name()
);
}
if !cfg!(feature = "channel-matrix") {
println!(
" ℹ️ Matrix channel support is disabled in this build (enable `channel-matrix`)."
);
}
if !cfg!(feature = "channel-lark") {
println!(
" ℹ️ Lark/Feishu channel support is disabled in this build (enable `channel-lark`)."
);
}
println!("\nTo start channels: zeroclaw channel start");
println!("To check health: zeroclaw channel doctor");
println!("To configure: zeroclaw onboard");
Ok(())
}
crate::ChannelCommands::Add {
channel_type,
config: _,
} => {
anyhow::bail!(
"Channel type '{channel_type}' — use `zeroclaw onboard` to configure channels"
);
}
crate::ChannelCommands::Remove { name } => {
anyhow::bail!("Remove channel '{name}' — edit ~/.zeroclaw/config.toml directly");
}
crate::ChannelCommands::BindTelegram { identity } => {
bind_telegram_identity(config, &identity).await
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChannelHealthState {
Healthy,
Unhealthy,
Timeout,
}
fn classify_health_result(
result: &std::result::Result<bool, tokio::time::error::Elapsed>,
) -> ChannelHealthState {
match result {
Ok(true) => ChannelHealthState::Healthy,
Ok(false) => ChannelHealthState::Unhealthy,
Err(_) => ChannelHealthState::Timeout,
}
}
struct ConfiguredChannel {
display_name: &'static str,
channel: Arc<dyn Channel>,
}
fn collect_configured_channels(
config: &Config,
_matrix_skip_context: &str,
) -> Vec<ConfiguredChannel> {
let mut channels = Vec::new();
if let Some(ref tg) = config.channels_config.telegram {
channels.push(ConfiguredChannel {
display_name: "Telegram",
channel: Arc::new(
TelegramChannel::new(
tg.bot_token.clone(),
tg.allowed_users.clone(),
tg.mention_only,
)
.with_streaming(tg.stream_mode, tg.draft_update_interval_ms)
.with_transcription(config.transcription.clone())
.with_workspace_dir(config.workspace_dir.clone()),
),
});
}
if let Some(ref dc) = config.channels_config.discord {
channels.push(ConfiguredChannel {
display_name: "Discord",
channel: Arc::new(DiscordChannel::new(
dc.bot_token.clone(),
dc.guild_id.clone(),
dc.allowed_users.clone(),
dc.listen_to_bots,
dc.mention_only,
)),
});
}
if let Some(ref sl) = config.channels_config.slack {
channels.push(ConfiguredChannel {
display_name: "Slack",
channel: Arc::new(SlackChannel::new(
sl.bot_token.clone(),
sl.channel_id.clone(),
sl.allowed_users.clone(),
)),
});
}
if let Some(ref mm) = config.channels_config.mattermost {
channels.push(ConfiguredChannel {
display_name: "Mattermost",
channel: Arc::new(MattermostChannel::new(
mm.url.clone(),
mm.bot_token.clone(),
mm.channel_id.clone(),
mm.allowed_users.clone(),
mm.thread_replies.unwrap_or(true),
mm.mention_only.unwrap_or(false),
)),
});
}
if let Some(ref im) = config.channels_config.imessage {
channels.push(ConfiguredChannel {
display_name: "iMessage",
channel: Arc::new(IMessageChannel::new(im.allowed_contacts.clone())),
});
}
#[cfg(feature = "channel-matrix")]
if let Some(ref mx) = config.channels_config.matrix {
channels.push(ConfiguredChannel {
display_name: "Matrix",
channel: Arc::new(MatrixChannel::new_with_session_hint_and_zeroclaw_dir(
mx.homeserver.clone(),
mx.access_token.clone(),
mx.room_id.clone(),
mx.allowed_users.clone(),
mx.user_id.clone(),
mx.device_id.clone(),
config.config_path.parent().map(|path| path.to_path_buf()),
)),
});
}
#[cfg(not(feature = "channel-matrix"))]
if config.channels_config.matrix.is_some() {
tracing::warn!(
"Matrix channel is configured but this build was compiled without `channel-matrix`; skipping Matrix {}.",
_matrix_skip_context
);
}
if let Some(ref sig) = config.channels_config.signal {
channels.push(ConfiguredChannel {
display_name: "Signal",
channel: Arc::new(SignalChannel::new(
sig.http_url.clone(),
sig.account.clone(),
sig.group_id.clone(),
sig.allowed_from.clone(),
sig.ignore_attachments,
sig.ignore_stories,
)),
});
}
if let Some(ref wa) = config.channels_config.whatsapp {
if wa.is_ambiguous_config() {
tracing::warn!(
"WhatsApp config has both phone_number_id and session_path set; preferring Cloud API mode. Remove one selector to avoid ambiguity."
);
}
match wa.backend_type() {
"cloud" => {
if wa.is_cloud_config() {
channels.push(ConfiguredChannel {
display_name: "WhatsApp",
channel: Arc::new(WhatsAppChannel::new(
wa.access_token.clone().unwrap_or_default(),
wa.phone_number_id.clone().unwrap_or_default(),
wa.verify_token.clone().unwrap_or_default(),
wa.allowed_numbers.clone(),
)),
});
} else {
tracing::warn!("WhatsApp Cloud API configured but missing required fields (phone_number_id, access_token, verify_token)");
}
}
"web" => {
#[cfg(feature = "whatsapp-web")]
if wa.is_web_config() {
channels.push(ConfiguredChannel {
display_name: "WhatsApp",
channel: Arc::new(WhatsAppWebChannel::new(
wa.session_path.clone().unwrap_or_default(),
wa.pair_phone.clone(),
wa.pair_code.clone(),
wa.allowed_numbers.clone(),
)),
});
} else {
tracing::warn!("WhatsApp Web configured but session_path not set");
}
#[cfg(not(feature = "whatsapp-web"))]
{
tracing::warn!("WhatsApp Web backend requires 'whatsapp-web' feature. Enable with: cargo build --features whatsapp-web");
}
}
_ => {
tracing::warn!("WhatsApp config invalid: neither phone_number_id (Cloud API) nor session_path (Web) is set");
}
}
}
if let Some(ref lq) = config.channels_config.linq {
channels.push(ConfiguredChannel {
display_name: "Linq",
channel: Arc::new(LinqChannel::new(
lq.api_token.clone(),
lq.from_phone.clone(),
lq.allowed_senders.clone(),
)),
});
}
if let Some(ref wati_cfg) = config.channels_config.wati {
channels.push(ConfiguredChannel {
display_name: "WATI",
channel: Arc::new(WatiChannel::new(
wati_cfg.api_token.clone(),
wati_cfg.api_url.clone(),
wati_cfg.tenant_id.clone(),
wati_cfg.allowed_numbers.clone(),
)),
});
}
if let Some(ref nc) = config.channels_config.nextcloud_talk {
channels.push(ConfiguredChannel {
display_name: "Nextcloud Talk",
channel: Arc::new(NextcloudTalkChannel::new(
nc.base_url.clone(),
nc.app_token.clone(),
nc.allowed_users.clone(),
)),
});
}
if let Some(ref email_cfg) = config.channels_config.email {
channels.push(ConfiguredChannel {
display_name: "Email",
channel: Arc::new(EmailChannel::new(email_cfg.clone())),
});
}
if let Some(ref irc) = config.channels_config.irc {
channels.push(ConfiguredChannel {
display_name: "IRC",
channel: Arc::new(IrcChannel::new(irc::IrcChannelConfig {
server: irc.server.clone(),
port: irc.port,
nickname: irc.nickname.clone(),
username: irc.username.clone(),
channels: irc.channels.clone(),
allowed_users: irc.allowed_users.clone(),
server_password: irc.server_password.clone(),
nickserv_password: irc.nickserv_password.clone(),
sasl_password: irc.sasl_password.clone(),
verify_tls: irc.verify_tls.unwrap_or(true),
})),
});
}
#[cfg(feature = "channel-lark")]
if let Some(ref lk) = config.channels_config.lark {
if lk.use_feishu {
if config.channels_config.feishu.is_some() {
tracing::warn!(
"Both [channels_config.feishu] and legacy [channels_config.lark].use_feishu=true are configured; ignoring legacy Feishu fallback in lark."
);
} else {
tracing::warn!(
"Using legacy [channels_config.lark].use_feishu=true compatibility path; prefer [channels_config.feishu]."
);
channels.push(ConfiguredChannel {
display_name: "Feishu",
channel: Arc::new(LarkChannel::from_config(lk)),
});
}
} else {
channels.push(ConfiguredChannel {
display_name: "Lark",
channel: Arc::new(LarkChannel::from_lark_config(lk)),
});
}
}
#[cfg(feature = "channel-lark")]
if let Some(ref fs) = config.channels_config.feishu {
channels.push(ConfiguredChannel {
display_name: "Feishu",
channel: Arc::new(LarkChannel::from_feishu_config(fs)),
});
}
#[cfg(not(feature = "channel-lark"))]
if config.channels_config.lark.is_some() || config.channels_config.feishu.is_some() {
tracing::warn!(
"Lark/Feishu channel is configured but this build was compiled without `channel-lark`; skipping Lark/Feishu health check."
);
}
if let Some(ref dt) = config.channels_config.dingtalk {
channels.push(ConfiguredChannel {
display_name: "DingTalk",
channel: Arc::new(DingTalkChannel::new(
dt.client_id.clone(),
dt.client_secret.clone(),
dt.allowed_users.clone(),
)),
});
}
if let Some(ref qq) = config.channels_config.qq {
channels.push(ConfiguredChannel {
display_name: "QQ",
channel: Arc::new(QQChannel::new(
qq.app_id.clone(),
qq.app_secret.clone(),
qq.allowed_users.clone(),
)),
});
}
if let Some(ref ct) = config.channels_config.clawdtalk {
channels.push(ConfiguredChannel {
display_name: "ClawdTalk",
channel: Arc::new(ClawdTalkChannel::new(ct.clone())),
});
}
channels
}
pub async fn doctor_channels(config: Config) -> Result<()> {
let mut channels = collect_configured_channels(&config, "health check");
if let Some(ref ns) = config.channels_config.nostr {
channels.push(ConfiguredChannel {
display_name: "Nostr",
channel: Arc::new(
NostrChannel::new(&ns.private_key, ns.relays.clone(), &ns.allowed_pubkeys).await?,
),
});
}
if channels.is_empty() {
println!("No real-time channels configured. Run `zeroclaw onboard` first.");
return Ok(());
}
println!("🩺 ZeroClaw Channel Doctor");
println!();
let mut healthy = 0_u32;
let mut unhealthy = 0_u32;
let mut timeout = 0_u32;
for configured in channels {
let result =
tokio::time::timeout(Duration::from_secs(10), configured.channel.health_check()).await;
let state = classify_health_result(&result);
match state {
ChannelHealthState::Healthy => {
healthy += 1;
println!(" ✅ {:<9} healthy", configured.display_name);
}
ChannelHealthState::Unhealthy => {
unhealthy += 1;
println!(
" ❌ {:<9} unhealthy (auth/config/network)",
configured.display_name
);
}
ChannelHealthState::Timeout => {
timeout += 1;
println!(" ⏱️ {:<9} timed out (>10s)", configured.display_name);
}
}
}
if config.channels_config.webhook.is_some() {
println!(" ℹ️ Webhook check via `zeroclaw gateway` then GET /health");
}
println!();
println!("Summary: {healthy} healthy, {unhealthy} unhealthy, {timeout} timed out");
Ok(())
}
#[allow(clippy::too_many_lines)]
pub async fn start_channels(config: Config) -> Result<()> {
let provider_name = resolved_default_provider(&config);
let provider_runtime_options = providers::ProviderRuntimeOptions {
auth_profile_override: None,
provider_api_url: config.api_url.clone(),
zeroclaw_dir: config.config_path.parent().map(std::path::PathBuf::from),
secrets_encrypt: config.secrets.encrypt,
reasoning_enabled: config.runtime.reasoning_enabled,
};
let provider: Arc<dyn Provider> = Arc::from(
create_resilient_provider_nonblocking(
&provider_name,
config.api_key.clone(),
config.api_url.clone(),
config.reliability.clone(),
provider_runtime_options.clone(),
)
.await?,
);
if let Err(e) = provider.warmup().await {
tracing::warn!("Provider warmup failed (non-fatal): {e}");
}
let initial_stamp = config_file_stamp(&config.config_path).await;
{
let mut store = runtime_config_store()
.lock()
.unwrap_or_else(|e| e.into_inner());
store.insert(
config.config_path.clone(),
RuntimeConfigState {
defaults: runtime_defaults_from_config(&config),
last_applied_stamp: initial_stamp,
},
);
}
let observer: Arc<dyn Observer> =
Arc::from(observability::create_observer(&config.observability));
let runtime: Arc<dyn runtime::RuntimeAdapter> =
Arc::from(runtime::create_runtime(&config.runtime)?);
let security = Arc::new(SecurityPolicy::from_config(
&config.autonomy,
&config.workspace_dir,
));
let model = resolved_default_model(&config);
let temperature = config.default_temperature;
let mem: Arc<dyn Memory> = Arc::from(memory::create_memory_with_storage(
&config.memory,
Some(&config.storage.provider.config),
&config.workspace_dir,
config.api_key.as_deref(),
)?);
let (composio_key, composio_entity_id) = if config.composio.enabled {
(
config.composio.api_key.as_deref(),
Some(config.composio.entity_id.as_str()),
)
} else {
(None, None)
};
let workspace = config.workspace_dir.clone();
let tools_registry = Arc::new(tools::all_tools_with_runtime(
Arc::new(config.clone()),
&security,
runtime,
Arc::clone(&mem),
composio_key,
composio_entity_id,
&config.browser,
&config.http_request,
&config.web_fetch,
&workspace,
&config.agents,
config.api_key.as_deref(),
&config,
));
let skills = crate::skills::load_skills_with_config(&workspace, &config);
let mut tool_descs: Vec<(&str, &str)> = vec![
(
"shell",
"Execute terminal commands. Use when: running local checks, build/test commands, diagnostics. Don't use when: a safer dedicated tool exists, or command is destructive without approval.",
),
(
"file_read",
"Read file contents. Use when: inspecting project files, configs, logs. Don't use when: a targeted search is enough.",
),
(
"file_write",
"Write file contents. Use when: applying focused edits, scaffolding files, updating docs/code. Don't use when: side effects are unclear or file ownership is uncertain.",
),
(
"memory_store",
"Save to memory. Use when: preserving durable preferences, decisions, key context. Don't use when: information is transient/noisy/sensitive without need.",
),
(
"memory_recall",
"Search memory. Use when: retrieving prior decisions, user preferences, historical context. Don't use when: answer is already in current context.",
),
(
"memory_forget",
"Delete a memory entry. Use when: memory is incorrect/stale or explicitly requested for removal. Don't use when: impact is uncertain.",
),
];
if config.browser.enabled {
tool_descs.push((
"browser_open",
"Open approved HTTPS URLs in system browser (allowlist-only, no scraping)",
));
}
if config.composio.enabled {
tool_descs.push((
"composio",
"Execute actions on 1000+ apps via Composio (Gmail, Notion, GitHub, Slack, etc.). Use action='list' to discover actions, 'list_accounts' to retrieve connected account IDs, 'execute' to run (optionally with connected_account_id), and 'connect' for OAuth.",
));
}
tool_descs.push((
"schedule",
"Manage scheduled tasks (create/list/get/cancel/pause/resume). Supports recurring cron and one-shot delays.",
));
tool_descs.push((
"pushover",
"Send a Pushover notification to your device. Requires PUSHOVER_TOKEN and PUSHOVER_USER_KEY in .env file.",
));
if !config.agents.is_empty() {
tool_descs.push((
"delegate",
"Delegate a subtask to a specialized agent. Use when: a task benefits from a different model (e.g. fast summarization, deep reasoning, code generation). The sub-agent runs a single prompt and returns its response.",
));
}
let excluded = &config.autonomy.non_cli_excluded_tools;
if !excluded.is_empty() {
tool_descs.retain(|(name, _)| !excluded.iter().any(|ex| ex == name));
}
let bootstrap_max_chars = if config.agent.compact_context {
Some(6000)
} else {
None
};
let native_tools = provider.supports_native_tools();
let mut system_prompt = build_system_prompt_with_mode(
&workspace,
&model,
&tool_descs,
&skills,
Some(&config.identity),
bootstrap_max_chars,
native_tools,
config.skills.prompt_injection_mode,
);
if !native_tools {
system_prompt.push_str(&build_tool_instructions(tools_registry.as_ref()));
}
if !skills.is_empty() {
println!(
" 🧩 Skills: {}",
skills
.iter()
.map(|s| s.name.as_str())
.collect::<Vec<_>>()
.join(", ")
);
}
let mut channels: Vec<Arc<dyn Channel>> =
collect_configured_channels(&config, "runtime startup")
.into_iter()
.map(|configured| configured.channel)
.collect();
if let Some(ref ns) = config.channels_config.nostr {
channels.push(Arc::new(
NostrChannel::new(&ns.private_key, ns.relays.clone(), &ns.allowed_pubkeys).await?,
));
}
if channels.is_empty() {
println!("No channels configured. Run `zeroclaw onboard` to set up channels.");
return Ok(());
}
println!("🦀 ZeroClaw Channel Server");
println!(" 🤖 Model: {model}");
let effective_backend = memory::effective_memory_backend_name(
&config.memory.backend,
Some(&config.storage.provider.config),
);
println!(
" 🧠 Memory: {} (auto-save: {})",
effective_backend,
if config.memory.auto_save { "on" } else { "off" }
);
println!(
" 📡 Channels: {}",
channels
.iter()
.map(|c| c.name())
.collect::<Vec<_>>()
.join(", ")
);
println!();
println!(" Listening for messages... (Ctrl+C to stop)");
println!();
crate::health::mark_component_ok("channels");
let initial_backoff_secs = config
.reliability
.channel_initial_backoff_secs
.max(DEFAULT_CHANNEL_INITIAL_BACKOFF_SECS);
let max_backoff_secs = config
.reliability
.channel_max_backoff_secs
.max(DEFAULT_CHANNEL_MAX_BACKOFF_SECS);
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(100);
let mut handles = Vec::new();
for ch in &channels {
handles.push(spawn_supervised_listener(
ch.clone(),
tx.clone(),
initial_backoff_secs,
max_backoff_secs,
));
}
drop(tx);
let channels_by_name = Arc::new(
channels
.iter()
.map(|ch| (ch.name().to_string(), Arc::clone(ch)))
.collect::<HashMap<_, _>>(),
);
let max_in_flight_messages = compute_max_in_flight_messages(channels.len());
println!(" 🚦 In-flight message limit: {max_in_flight_messages}");
let mut provider_cache_seed: HashMap<String, Arc<dyn Provider>> = HashMap::new();
provider_cache_seed.insert(provider_name.clone(), Arc::clone(&provider));
let message_timeout_secs =
effective_channel_message_timeout_secs(config.channels_config.message_timeout_secs);
let interrupt_on_new_message = config
.channels_config
.telegram
.as_ref()
.is_some_and(|tg| tg.interrupt_on_new_message);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name,
provider: Arc::clone(&provider),
default_provider: Arc::new(provider_name),
memory: Arc::clone(&mem),
tools_registry: Arc::clone(&tools_registry),
observer,
system_prompt: Arc::new(system_prompt),
model: Arc::new(model.clone()),
temperature,
auto_save_memory: config.memory.auto_save,
max_tool_iterations: config.agent.max_tool_iterations,
min_relevance_score: config.memory.min_relevance_score,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: config.api_key.clone(),
api_url: config.api_url.clone(),
reliability: Arc::new(config.reliability.clone()),
provider_runtime_options,
workspace_dir: Arc::new(config.workspace_dir.clone()),
message_timeout_secs,
interrupt_on_new_message,
multimodal: config.multimodal.clone(),
hooks: if config.hooks.enabled {
let mut runner = crate::hooks::HookRunner::new();
if config.hooks.builtin.command_logger {
runner.register(Box::new(crate::hooks::builtin::CommandLoggerHook::new()));
}
Some(Arc::new(runner))
} else {
None
},
non_cli_excluded_tools: Arc::new(config.autonomy.non_cli_excluded_tools.clone()),
});
run_message_dispatch_loop(rx, runtime_ctx, max_in_flight_messages).await;
for h in handles {
let _ = h.await;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory::{Memory, MemoryCategory, SqliteMemory};
use crate::observability::NoopObserver;
use crate::providers::{ChatMessage, Provider};
use crate::tools::{Tool, ToolResult};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tempfile::TempDir;
fn make_workspace() -> TempDir {
let tmp = TempDir::new().unwrap();
std::fs::write(tmp.path().join("SOUL.md"), "# Soul\nBe helpful.").unwrap();
std::fs::write(tmp.path().join("IDENTITY.md"), "# Identity\nName: ZeroClaw").unwrap();
std::fs::write(tmp.path().join("USER.md"), "# User\nName: Test User").unwrap();
std::fs::write(
tmp.path().join("AGENTS.md"),
"# Agents\nFollow instructions.",
)
.unwrap();
std::fs::write(tmp.path().join("TOOLS.md"), "# Tools\nUse shell carefully.").unwrap();
std::fs::write(
tmp.path().join("HEARTBEAT.md"),
"# Heartbeat\nCheck status.",
)
.unwrap();
std::fs::write(tmp.path().join("MEMORY.md"), "# Memory\nUser likes Rust.").unwrap();
tmp
}
#[test]
fn effective_channel_message_timeout_secs_clamps_to_minimum() {
assert_eq!(
effective_channel_message_timeout_secs(0),
MIN_CHANNEL_MESSAGE_TIMEOUT_SECS
);
assert_eq!(
effective_channel_message_timeout_secs(15),
MIN_CHANNEL_MESSAGE_TIMEOUT_SECS
);
assert_eq!(effective_channel_message_timeout_secs(300), 300);
}
#[test]
fn channel_message_timeout_budget_scales_with_tool_iterations() {
assert_eq!(channel_message_timeout_budget_secs(300, 1), 300);
assert_eq!(channel_message_timeout_budget_secs(300, 2), 600);
assert_eq!(channel_message_timeout_budget_secs(300, 3), 900);
}
#[test]
fn channel_message_timeout_budget_uses_safe_defaults_and_cap() {
assert_eq!(channel_message_timeout_budget_secs(300, 0), 300);
assert_eq!(
channel_message_timeout_budget_secs(300, 10),
300 * CHANNEL_MESSAGE_TIMEOUT_SCALE_CAP
);
}
#[test]
fn context_window_overflow_error_detector_matches_known_messages() {
let overflow_err = anyhow::anyhow!(
"OpenAI Codex stream error: Your input exceeds the context window of this model."
);
assert!(is_context_window_overflow_error(&overflow_err));
let other_err =
anyhow::anyhow!("OpenAI Codex API error (502 Bad Gateway): error code: 502");
assert!(!is_context_window_overflow_error(&other_err));
}
#[test]
fn memory_context_skip_rules_exclude_history_blobs() {
assert!(should_skip_memory_context_entry(
"telegram_123_history",
r#"[{"role":"user"}]"#
));
assert!(should_skip_memory_context_entry(
"assistant_resp_legacy",
"fabricated memory"
));
assert!(!should_skip_memory_context_entry("telegram_123_45", "hi"));
}
#[test]
fn normalize_cached_channel_turns_merges_consecutive_user_turns() {
let turns = vec![
ChatMessage::user("forwarded content"),
ChatMessage::user("summarize this"),
];
let normalized = normalize_cached_channel_turns(turns);
assert_eq!(normalized.len(), 1);
assert_eq!(normalized[0].role, "user");
assert!(normalized[0].content.contains("forwarded content"));
assert!(normalized[0].content.contains("summarize this"));
}
#[test]
fn normalize_cached_channel_turns_merges_consecutive_assistant_turns() {
let turns = vec![
ChatMessage::user("first user"),
ChatMessage::assistant("assistant part 1"),
ChatMessage::assistant("assistant part 2"),
ChatMessage::user("next user"),
];
let normalized = normalize_cached_channel_turns(turns);
assert_eq!(normalized.len(), 3);
assert_eq!(normalized[0].role, "user");
assert_eq!(normalized[1].role, "assistant");
assert_eq!(normalized[2].role, "user");
assert!(normalized[1].content.contains("assistant part 1"));
assert!(normalized[1].content.contains("assistant part 2"));
}
#[test]
fn normalize_preserves_failure_marker_after_orphan_user_turn() {
let turns = vec![
ChatMessage::user("download something from GitHub"),
ChatMessage::assistant("[Task failed — not continuing this request]"),
ChatMessage::user("what is WAL?"),
];
let normalized = normalize_cached_channel_turns(turns);
assert_eq!(normalized.len(), 3);
assert_eq!(normalized[0].role, "user");
assert_eq!(normalized[1].role, "assistant");
assert!(normalized[1].content.contains("Task failed"));
assert_eq!(normalized[2].role, "user");
assert_eq!(normalized[2].content, "what is WAL?");
}
#[test]
fn normalize_preserves_timeout_marker_after_orphan_user_turn() {
let turns = vec![
ChatMessage::user("run a long task"),
ChatMessage::assistant("[Task timed out — not continuing this request]"),
ChatMessage::user("next question"),
];
let normalized = normalize_cached_channel_turns(turns);
assert_eq!(normalized.len(), 3);
assert_eq!(normalized[1].role, "assistant");
assert!(normalized[1].content.contains("Task timed out"));
assert_eq!(normalized[2].content, "next question");
}
#[test]
fn compact_sender_history_keeps_recent_truncated_messages() {
let mut histories = HashMap::new();
let sender = "telegram_u1".to_string();
histories.insert(
sender.clone(),
(0..20)
.map(|idx| {
let content = format!("msg-{idx}-{}", "x".repeat(700));
if idx % 2 == 0 {
ChatMessage::user(content)
} else {
ChatMessage::assistant(content)
}
})
.collect::<Vec<_>>(),
);
let ctx = ChannelRuntimeContext {
channels_by_name: Arc::new(HashMap::new()),
provider: Arc::new(DummyProvider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("system".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(histories)),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
non_cli_excluded_tools: Arc::new(Vec::new()),
};
assert!(compact_sender_history(&ctx, &sender));
let histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let kept = histories
.get(&sender)
.expect("sender history should remain");
assert_eq!(kept.len(), CHANNEL_HISTORY_COMPACT_KEEP_MESSAGES);
assert!(kept.iter().all(|turn| {
let len = turn.content.chars().count();
len <= CHANNEL_HISTORY_COMPACT_CONTENT_CHARS
|| (len <= CHANNEL_HISTORY_COMPACT_CONTENT_CHARS + 3
&& turn.content.ends_with("..."))
}));
}
#[test]
fn append_sender_turn_stores_single_turn_per_call() {
let sender = "telegram_u2".to_string();
let ctx = ChannelRuntimeContext {
channels_by_name: Arc::new(HashMap::new()),
provider: Arc::new(DummyProvider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("system".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
non_cli_excluded_tools: Arc::new(Vec::new()),
};
append_sender_turn(&ctx, &sender, ChatMessage::user("hello"));
let histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let turns = histories.get(&sender).expect("sender history should exist");
assert_eq!(turns.len(), 1);
assert_eq!(turns[0].role, "user");
assert_eq!(turns[0].content, "hello");
}
#[test]
fn rollback_orphan_user_turn_removes_only_latest_matching_user_turn() {
let sender = "telegram_u3".to_string();
let mut histories = HashMap::new();
histories.insert(
sender.clone(),
vec![
ChatMessage::user("first"),
ChatMessage::assistant("ok"),
ChatMessage::user("pending"),
],
);
let ctx = ChannelRuntimeContext {
channels_by_name: Arc::new(HashMap::new()),
provider: Arc::new(DummyProvider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("system".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(histories)),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
non_cli_excluded_tools: Arc::new(Vec::new()),
};
assert!(rollback_orphan_user_turn(&ctx, &sender, "pending"));
let histories = ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let turns = histories
.get(&sender)
.expect("sender history should remain");
assert_eq!(turns.len(), 2);
assert_eq!(turns[0].content, "first");
assert_eq!(turns[1].content, "ok");
}
struct DummyProvider;
#[async_trait::async_trait]
impl Provider for DummyProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok("ok".to_string())
}
}
#[derive(Default)]
struct RecordingChannel {
sent_messages: tokio::sync::Mutex<Vec<String>>,
start_typing_calls: AtomicUsize,
stop_typing_calls: AtomicUsize,
reactions_added: tokio::sync::Mutex<Vec<(String, String, String)>>,
reactions_removed: tokio::sync::Mutex<Vec<(String, String, String)>>,
}
#[derive(Default)]
struct TelegramRecordingChannel {
sent_messages: tokio::sync::Mutex<Vec<String>>,
}
#[async_trait::async_trait]
impl Channel for TelegramRecordingChannel {
fn name(&self) -> &str {
"telegram"
}
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
self.sent_messages
.lock()
.await
.push(format!("{}:{}", message.recipient, message.content));
Ok(())
}
async fn listen(
&self,
_tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
) -> anyhow::Result<()> {
Ok(())
}
async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
Ok(())
}
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
Ok(())
}
}
#[async_trait::async_trait]
impl Channel for RecordingChannel {
fn name(&self) -> &str {
"test-channel"
}
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
self.sent_messages
.lock()
.await
.push(format!("{}:{}", message.recipient, message.content));
Ok(())
}
async fn listen(
&self,
_tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
) -> anyhow::Result<()> {
Ok(())
}
async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
self.start_typing_calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
self.stop_typing_calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
async fn add_reaction(
&self,
channel_id: &str,
message_id: &str,
emoji: &str,
) -> anyhow::Result<()> {
self.reactions_added.lock().await.push((
channel_id.to_string(),
message_id.to_string(),
emoji.to_string(),
));
Ok(())
}
async fn remove_reaction(
&self,
channel_id: &str,
message_id: &str,
emoji: &str,
) -> anyhow::Result<()> {
self.reactions_removed.lock().await.push((
channel_id.to_string(),
message_id.to_string(),
emoji.to_string(),
));
Ok(())
}
}
struct SlowProvider {
delay: Duration,
}
#[async_trait::async_trait]
impl Provider for SlowProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
tokio::time::sleep(self.delay).await;
Ok(format!("echo: {message}"))
}
}
struct ToolCallingProvider;
fn tool_call_payload() -> String {
r#"<tool_call>
{"name":"mock_price","arguments":{"symbol":"BTC"}}
</tool_call>"#
.to_string()
}
fn tool_call_payload_with_alias_tag() -> String {
r#"<toolcall>
{"name":"mock_price","arguments":{"symbol":"BTC"}}
</toolcall>"#
.to_string()
}
#[async_trait::async_trait]
impl Provider for ToolCallingProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok(tool_call_payload())
}
async fn chat_with_history(
&self,
messages: &[ChatMessage],
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
let has_tool_results = messages
.iter()
.any(|msg| msg.role == "user" && msg.content.contains("[Tool results]"));
if has_tool_results {
Ok("BTC is currently around $65,000 based on latest tool output.".to_string())
} else {
Ok(tool_call_payload())
}
}
}
struct ToolCallingAliasProvider;
#[async_trait::async_trait]
impl Provider for ToolCallingAliasProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok(tool_call_payload_with_alias_tag())
}
async fn chat_with_history(
&self,
messages: &[ChatMessage],
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
let has_tool_results = messages
.iter()
.any(|msg| msg.role == "user" && msg.content.contains("[Tool results]"));
if has_tool_results {
Ok("BTC alias-tag flow resolved to final text output.".to_string())
} else {
Ok(tool_call_payload_with_alias_tag())
}
}
}
struct RawToolArtifactProvider;
#[async_trait::async_trait]
impl Provider for RawToolArtifactProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok("fallback".to_string())
}
async fn chat_with_history(
&self,
_messages: &[ChatMessage],
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok(r#"{"name":"mock_price","parameters":{"symbol":"BTC"}}
{"result":{"symbol":"BTC","price_usd":65000}}
BTC is currently around $65,000 based on latest tool output."#
.to_string())
}
}
struct IterativeToolProvider {
required_tool_iterations: usize,
}
impl IterativeToolProvider {
fn completed_tool_iterations(messages: &[ChatMessage]) -> usize {
messages
.iter()
.filter(|msg| msg.role == "user" && msg.content.contains("[Tool results]"))
.count()
}
}
#[async_trait::async_trait]
impl Provider for IterativeToolProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok(tool_call_payload())
}
async fn chat_with_history(
&self,
messages: &[ChatMessage],
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
let completed_iterations = Self::completed_tool_iterations(messages);
if completed_iterations >= self.required_tool_iterations {
Ok(format!(
"Completed after {completed_iterations} tool iterations."
))
} else {
Ok(tool_call_payload())
}
}
}
#[derive(Default)]
struct HistoryCaptureProvider {
calls: std::sync::Mutex<Vec<Vec<(String, String)>>>,
}
#[async_trait::async_trait]
impl Provider for HistoryCaptureProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok("fallback".to_string())
}
async fn chat_with_history(
&self,
messages: &[ChatMessage],
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
let snapshot = messages
.iter()
.map(|m| (m.role.clone(), m.content.clone()))
.collect::<Vec<_>>();
let mut calls = self.calls.lock().unwrap_or_else(|e| e.into_inner());
calls.push(snapshot);
Ok(format!("response-{}", calls.len()))
}
}
struct DelayedHistoryCaptureProvider {
delay: Duration,
calls: std::sync::Mutex<Vec<Vec<(String, String)>>>,
}
#[async_trait::async_trait]
impl Provider for DelayedHistoryCaptureProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok("fallback".to_string())
}
async fn chat_with_history(
&self,
messages: &[ChatMessage],
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
let snapshot = messages
.iter()
.map(|m| (m.role.clone(), m.content.clone()))
.collect::<Vec<_>>();
let call_index = {
let mut calls = self.calls.lock().unwrap_or_else(|e| e.into_inner());
calls.push(snapshot);
calls.len()
};
tokio::time::sleep(self.delay).await;
Ok(format!("response-{call_index}"))
}
}
struct MockPriceTool;
#[derive(Default)]
struct ModelCaptureProvider {
call_count: AtomicUsize,
models: std::sync::Mutex<Vec<String>>,
}
#[async_trait::async_trait]
impl Provider for ModelCaptureProvider {
async fn chat_with_system(
&self,
_system_prompt: Option<&str>,
_message: &str,
_model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
Ok("fallback".to_string())
}
async fn chat_with_history(
&self,
_messages: &[ChatMessage],
model: &str,
_temperature: f64,
) -> anyhow::Result<String> {
self.call_count.fetch_add(1, Ordering::SeqCst);
self.models
.lock()
.unwrap_or_else(|e| e.into_inner())
.push(model.to_string());
Ok("ok".to_string())
}
}
#[async_trait::async_trait]
impl Tool for MockPriceTool {
fn name(&self) -> &str {
"mock_price"
}
fn description(&self) -> &str {
"Return a mocked BTC price"
}
fn parameters_schema(&self) -> serde_json::Value {
serde_json::json!({
"type": "object",
"properties": {
"symbol": { "type": "string" }
},
"required": ["symbol"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let symbol = args.get("symbol").and_then(serde_json::Value::as_str);
if symbol != Some("BTC") {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("unexpected symbol".to_string()),
});
}
Ok(ToolResult {
success: true,
output: r#"{"symbol":"BTC","price_usd":65000}"#.to_string(),
error: None,
})
}
}
#[tokio::test]
async fn process_channel_message_executes_tool_calls_instead_of_sending_raw_json() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(ToolCallingProvider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![Box::new(MockPriceTool)]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
non_cli_excluded_tools: Arc::new(Vec::new()),
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-1".to_string(),
sender: "alice".to_string(),
reply_target: "chat-42".to_string(),
content: "What is the BTC price now?".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 1);
assert!(sent_messages[0].starts_with("chat-42:"));
assert!(sent_messages[0].contains("BTC is currently around"));
assert!(!sent_messages[0].contains("\"tool_calls\""));
assert!(!sent_messages[0].contains("mock_price"));
}
#[tokio::test]
async fn process_channel_message_telegram_does_not_persist_tool_summary_prefix() {
let channel_impl = Arc::new(TelegramRecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(ToolCallingProvider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![Box::new(MockPriceTool)]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
non_cli_excluded_tools: Arc::new(Vec::new()),
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
});
process_channel_message(
runtime_ctx.clone(),
traits::ChannelMessage {
id: "msg-telegram-tool-1".to_string(),
sender: "alice".to_string(),
reply_target: "chat-telegram".to_string(),
content: "What is the BTC price now?".to_string(),
channel: "telegram".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 1);
assert!(sent_messages[0].contains("BTC is currently around"));
let histories = runtime_ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let turns = histories
.get("telegram_alice")
.expect("telegram history should be stored");
let assistant_turn = turns
.iter()
.rev()
.find(|turn| turn.role == "assistant")
.expect("assistant turn should be present");
assert!(
!assistant_turn.content.contains("[Used tools:"),
"telegram history should not persist tool-summary prefix"
);
}
#[tokio::test]
async fn process_channel_message_strips_unexecuted_tool_json_artifacts_from_reply() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(RawToolArtifactProvider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![Box::new(MockPriceTool)]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-raw-json".to_string(),
sender: "alice".to_string(),
reply_target: "chat-raw".to_string(),
content: "What is the BTC price now?".to_string(),
channel: "test-channel".to_string(),
timestamp: 3,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 1);
assert!(sent_messages[0].starts_with("chat-raw:"));
assert!(sent_messages[0].contains("BTC is currently around"));
assert!(!sent_messages[0].contains("\"name\":\"mock_price\""));
assert!(!sent_messages[0].contains("\"result\""));
}
#[tokio::test]
async fn process_channel_message_executes_tool_calls_with_alias_tags() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(ToolCallingAliasProvider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![Box::new(MockPriceTool)]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-2".to_string(),
sender: "bob".to_string(),
reply_target: "chat-84".to_string(),
content: "What is the BTC price now?".to_string(),
channel: "test-channel".to_string(),
timestamp: 2,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 1);
assert!(sent_messages[0].starts_with("chat-84:"));
assert!(sent_messages[0].contains("alias-tag flow resolved"));
assert!(!sent_messages[0].contains("<toolcall>"));
assert!(!sent_messages[0].contains("mock_price"));
}
#[tokio::test]
async fn process_channel_message_handles_models_command_without_llm_call() {
let channel_impl = Arc::new(TelegramRecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let default_provider_impl = Arc::new(ModelCaptureProvider::default());
let default_provider: Arc<dyn Provider> = default_provider_impl.clone();
let fallback_provider_impl = Arc::new(ModelCaptureProvider::default());
let fallback_provider: Arc<dyn Provider> = fallback_provider_impl.clone();
let mut provider_cache_seed: HashMap<String, Arc<dyn Provider>> = HashMap::new();
provider_cache_seed.insert("test-provider".to_string(), Arc::clone(&default_provider));
provider_cache_seed.insert("openrouter".to_string(), fallback_provider);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::clone(&default_provider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("default-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx.clone(),
traits::ChannelMessage {
id: "msg-cmd-1".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "/models openrouter".to_string(),
channel: "telegram".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let sent = channel_impl.sent_messages.lock().await;
assert_eq!(sent.len(), 1);
assert!(sent[0].contains("Provider switched to `openrouter`"));
let route_key = "telegram_alice";
let route = runtime_ctx
.route_overrides
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(route_key)
.cloned()
.expect("route should be stored for sender");
assert_eq!(route.provider, "openrouter");
assert_eq!(route.model, "default-model");
assert_eq!(default_provider_impl.call_count.load(Ordering::SeqCst), 0);
assert_eq!(fallback_provider_impl.call_count.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn process_channel_message_uses_route_override_provider_and_model() {
let channel_impl = Arc::new(TelegramRecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let default_provider_impl = Arc::new(ModelCaptureProvider::default());
let default_provider: Arc<dyn Provider> = default_provider_impl.clone();
let routed_provider_impl = Arc::new(ModelCaptureProvider::default());
let routed_provider: Arc<dyn Provider> = routed_provider_impl.clone();
let mut provider_cache_seed: HashMap<String, Arc<dyn Provider>> = HashMap::new();
provider_cache_seed.insert("test-provider".to_string(), Arc::clone(&default_provider));
provider_cache_seed.insert("openrouter".to_string(), routed_provider);
let route_key = "telegram_alice".to_string();
let mut route_overrides = HashMap::new();
route_overrides.insert(
route_key,
ChannelRouteSelection {
provider: "openrouter".to_string(),
model: "route-model".to_string(),
},
);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::clone(&default_provider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("default-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(route_overrides)),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-routed-1".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "hello routed provider".to_string(),
channel: "telegram".to_string(),
timestamp: 2,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
assert_eq!(default_provider_impl.call_count.load(Ordering::SeqCst), 0);
assert_eq!(routed_provider_impl.call_count.load(Ordering::SeqCst), 1);
assert_eq!(
routed_provider_impl
.models
.lock()
.unwrap_or_else(|e| e.into_inner())
.as_slice(),
&["route-model".to_string()]
);
}
#[tokio::test]
async fn process_channel_message_prefers_cached_default_provider_instance() {
let channel_impl = Arc::new(TelegramRecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let startup_provider_impl = Arc::new(ModelCaptureProvider::default());
let startup_provider: Arc<dyn Provider> = startup_provider_impl.clone();
let reloaded_provider_impl = Arc::new(ModelCaptureProvider::default());
let reloaded_provider: Arc<dyn Provider> = reloaded_provider_impl.clone();
let mut provider_cache_seed: HashMap<String, Arc<dyn Provider>> = HashMap::new();
provider_cache_seed.insert("test-provider".to_string(), reloaded_provider);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::clone(&startup_provider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("default-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-default-provider-cache".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "hello cached default provider".to_string(),
channel: "telegram".to_string(),
timestamp: 3,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
assert_eq!(startup_provider_impl.call_count.load(Ordering::SeqCst), 0);
assert_eq!(reloaded_provider_impl.call_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn process_channel_message_uses_runtime_default_model_from_store() {
let channel_impl = Arc::new(TelegramRecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let provider_impl = Arc::new(ModelCaptureProvider::default());
let provider: Arc<dyn Provider> = provider_impl.clone();
let mut provider_cache_seed: HashMap<String, Arc<dyn Provider>> = HashMap::new();
provider_cache_seed.insert("test-provider".to_string(), Arc::clone(&provider));
let temp = tempfile::TempDir::new().expect("temp dir");
let config_path = temp.path().join("config.toml");
{
let mut store = runtime_config_store()
.lock()
.unwrap_or_else(|e| e.into_inner());
store.insert(
config_path.clone(),
RuntimeConfigState {
defaults: ChannelRuntimeDefaults {
default_provider: "test-provider".to_string(),
model: "hot-reloaded-model".to_string(),
temperature: 0.5,
api_key: None,
api_url: None,
reliability: crate::config::ReliabilityConfig::default(),
},
last_applied_stamp: None,
},
);
}
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::clone(&provider),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("startup-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(provider_cache_seed)),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions {
zeroclaw_dir: Some(temp.path().to_path_buf()),
..providers::ProviderRuntimeOptions::default()
},
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-runtime-store-model".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "hello runtime defaults".to_string(),
channel: "telegram".to_string(),
timestamp: 4,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
{
let mut store = runtime_config_store()
.lock()
.unwrap_or_else(|e| e.into_inner());
store.remove(&config_path);
}
assert_eq!(provider_impl.call_count.load(Ordering::SeqCst), 1);
assert_eq!(
provider_impl
.models
.lock()
.unwrap_or_else(|e| e.into_inner())
.as_slice(),
&["hot-reloaded-model".to_string()]
);
}
#[tokio::test]
async fn process_channel_message_respects_configured_max_tool_iterations_above_default() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(IterativeToolProvider {
required_tool_iterations: 11,
}),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![Box::new(MockPriceTool)]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 12,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-iter-success".to_string(),
sender: "alice".to_string(),
reply_target: "chat-iter-success".to_string(),
content: "Loop until done".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 1);
assert!(sent_messages[0].starts_with("chat-iter-success:"));
assert!(sent_messages[0].contains("Completed after 11 tool iterations."));
assert!(!sent_messages[0].contains("⚠️ Error:"));
}
#[tokio::test]
async fn process_channel_message_reports_configured_max_tool_iterations_limit() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(IterativeToolProvider {
required_tool_iterations: 20,
}),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![Box::new(MockPriceTool)]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 3,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-iter-fail".to_string(),
sender: "bob".to_string(),
reply_target: "chat-iter-fail".to_string(),
content: "Loop forever".to_string(),
channel: "test-channel".to_string(),
timestamp: 2,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 1);
assert!(sent_messages[0].starts_with("chat-iter-fail:"));
assert!(sent_messages[0].contains("⚠️ Error: Agent exceeded maximum tool iterations (3)"));
}
struct NoopMemory;
#[async_trait::async_trait]
impl Memory for NoopMemory {
fn name(&self) -> &str {
"noop"
}
async fn store(
&self,
_key: &str,
_content: &str,
_category: crate::memory::MemoryCategory,
_session_id: Option<&str>,
) -> anyhow::Result<()> {
Ok(())
}
async fn recall(
&self,
_query: &str,
_limit: usize,
_session_id: Option<&str>,
) -> anyhow::Result<Vec<crate::memory::MemoryEntry>> {
Ok(Vec::new())
}
async fn get(&self, _key: &str) -> anyhow::Result<Option<crate::memory::MemoryEntry>> {
Ok(None)
}
async fn list(
&self,
_category: Option<&crate::memory::MemoryCategory>,
_session_id: Option<&str>,
) -> anyhow::Result<Vec<crate::memory::MemoryEntry>> {
Ok(Vec::new())
}
async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
Ok(false)
}
async fn count(&self) -> anyhow::Result<usize> {
Ok(0)
}
async fn health_check(&self) -> bool {
true
}
}
struct RecallMemory;
#[async_trait::async_trait]
impl Memory for RecallMemory {
fn name(&self) -> &str {
"recall-memory"
}
async fn store(
&self,
_key: &str,
_content: &str,
_category: crate::memory::MemoryCategory,
_session_id: Option<&str>,
) -> anyhow::Result<()> {
Ok(())
}
async fn recall(
&self,
_query: &str,
_limit: usize,
_session_id: Option<&str>,
) -> anyhow::Result<Vec<crate::memory::MemoryEntry>> {
Ok(vec![crate::memory::MemoryEntry {
id: "entry-1".to_string(),
key: "memory_key_1".to_string(),
content: "Age is 45".to_string(),
category: crate::memory::MemoryCategory::Conversation,
timestamp: "2026-02-20T00:00:00Z".to_string(),
session_id: None,
score: Some(0.9),
}])
}
async fn get(&self, _key: &str) -> anyhow::Result<Option<crate::memory::MemoryEntry>> {
Ok(None)
}
async fn list(
&self,
_category: Option<&crate::memory::MemoryCategory>,
_session_id: Option<&str>,
) -> anyhow::Result<Vec<crate::memory::MemoryEntry>> {
Ok(Vec::new())
}
async fn forget(&self, _key: &str) -> anyhow::Result<bool> {
Ok(false)
}
async fn count(&self) -> anyhow::Result<usize> {
Ok(1)
}
async fn health_check(&self) -> bool {
true
}
}
#[tokio::test]
async fn message_dispatch_processes_messages_in_parallel() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(SlowProvider {
delay: Duration::from_millis(250),
}),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(4);
tx.send(traits::ChannelMessage {
id: "1".to_string(),
sender: "alice".to_string(),
reply_target: "alice".to_string(),
content: "hello".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
thread_ts: None,
})
.await
.unwrap();
tx.send(traits::ChannelMessage {
id: "2".to_string(),
sender: "bob".to_string(),
reply_target: "bob".to_string(),
content: "world".to_string(),
channel: "test-channel".to_string(),
timestamp: 2,
thread_ts: None,
})
.await
.unwrap();
drop(tx);
let started = Instant::now();
run_message_dispatch_loop(rx, runtime_ctx, 2).await;
let elapsed = started.elapsed();
assert!(
elapsed < Duration::from_millis(430),
"expected parallel dispatch (<430ms), got {:?}",
elapsed
);
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 2);
}
#[tokio::test]
async fn message_dispatch_interrupts_in_flight_telegram_request_and_preserves_context() {
let channel_impl = Arc::new(TelegramRecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let provider_impl = Arc::new(DelayedHistoryCaptureProvider {
delay: Duration::from_millis(250),
calls: std::sync::Mutex::new(Vec::new()),
});
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: provider_impl.clone(),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: true,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
let send_task = tokio::spawn(async move {
tx.send(traits::ChannelMessage {
id: "msg-1".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "forwarded content".to_string(),
channel: "telegram".to_string(),
timestamp: 1,
thread_ts: None,
})
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(40)).await;
tx.send(traits::ChannelMessage {
id: "msg-2".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "summarize this".to_string(),
channel: "telegram".to_string(),
timestamp: 2,
thread_ts: None,
})
.await
.unwrap();
});
run_message_dispatch_loop(rx, runtime_ctx, 4).await;
send_task.await.unwrap();
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 1);
assert!(sent_messages[0].starts_with("chat-1:"));
assert!(sent_messages[0].contains("response-2"));
drop(sent_messages);
let calls = provider_impl
.calls
.lock()
.unwrap_or_else(|e| e.into_inner());
assert_eq!(calls.len(), 2);
let second_call = &calls[1];
assert!(second_call
.iter()
.any(|(role, content)| { role == "user" && content.contains("forwarded content") }));
assert!(second_call
.iter()
.any(|(role, content)| { role == "user" && content.contains("summarize this") }));
assert!(
!second_call.iter().any(|(role, _)| role == "assistant"),
"cancelled turn should not persist an assistant response"
);
}
#[tokio::test]
async fn message_dispatch_interrupt_scope_is_same_sender_same_chat() {
let channel_impl = Arc::new(TelegramRecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(SlowProvider {
delay: Duration::from_millis(180),
}),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: true,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(8);
let send_task = tokio::spawn(async move {
tx.send(traits::ChannelMessage {
id: "msg-a".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "first chat".to_string(),
channel: "telegram".to_string(),
timestamp: 1,
thread_ts: None,
})
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(30)).await;
tx.send(traits::ChannelMessage {
id: "msg-b".to_string(),
sender: "alice".to_string(),
reply_target: "chat-2".to_string(),
content: "second chat".to_string(),
channel: "telegram".to_string(),
timestamp: 2,
thread_ts: None,
})
.await
.unwrap();
});
run_message_dispatch_loop(rx, runtime_ctx, 4).await;
send_task.await.unwrap();
let sent_messages = channel_impl.sent_messages.lock().await;
assert_eq!(sent_messages.len(), 2);
assert!(sent_messages.iter().any(|msg| msg.starts_with("chat-1:")));
assert!(sent_messages.iter().any(|msg| msg.starts_with("chat-2:")));
}
#[tokio::test]
async fn process_channel_message_cancels_scoped_typing_task() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(SlowProvider {
delay: Duration::from_millis(20),
}),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "typing-msg".to_string(),
sender: "alice".to_string(),
reply_target: "chat-typing".to_string(),
content: "hello".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let starts = channel_impl.start_typing_calls.load(Ordering::SeqCst);
let stops = channel_impl.stop_typing_calls.load(Ordering::SeqCst);
assert_eq!(starts, 1, "start_typing should be called once");
assert_eq!(stops, 1, "stop_typing should be called once");
}
#[tokio::test]
async fn process_channel_message_adds_and_swaps_reactions() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(SlowProvider {
delay: Duration::from_millis(5),
}),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 10,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "react-msg".to_string(),
sender: "alice".to_string(),
reply_target: "chat-react".to_string(),
content: "hello".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let added = channel_impl.reactions_added.lock().await;
assert!(
added.len() >= 2,
"expected at least 2 reactions added (\u{1F440} then \u{2705}), got {}",
added.len()
);
assert_eq!(added[0].2, "\u{1F440}", "first reaction should be eyes");
assert_eq!(
added.last().unwrap().2,
"\u{2705}",
"last reaction should be checkmark"
);
let removed = channel_impl.reactions_removed.lock().await;
assert_eq!(removed.len(), 1, "eyes reaction should be removed once");
assert_eq!(removed[0].2, "\u{1F440}");
}
#[test]
fn prompt_contains_all_sections() {
let ws = make_workspace();
let tools = vec![("shell", "Run commands"), ("file_read", "Read files")];
let prompt = build_system_prompt(ws.path(), "test-model", &tools, &[], None, None);
assert!(prompt.contains("## Tools"), "missing Tools section");
assert!(prompt.contains("## Safety"), "missing Safety section");
assert!(prompt.contains("## Workspace"), "missing Workspace section");
assert!(
prompt.contains("## Project Context"),
"missing Project Context"
);
assert!(
prompt.contains("## Current Date & Time"),
"missing Date/Time"
);
assert!(prompt.contains("## Runtime"), "missing Runtime section");
}
#[test]
fn prompt_injects_tools() {
let ws = make_workspace();
let tools = vec![
("shell", "Run commands"),
("memory_recall", "Search memory"),
];
let prompt = build_system_prompt(ws.path(), "gpt-4o", &tools, &[], None, None);
assert!(prompt.contains("**shell**"));
assert!(prompt.contains("Run commands"));
assert!(prompt.contains("**memory_recall**"));
}
#[test]
fn prompt_includes_single_tool_protocol_block_after_append() {
let ws = make_workspace();
let tools = vec![("shell", "Run commands")];
let mut prompt = build_system_prompt(ws.path(), "gpt-4o", &tools, &[], None, None);
assert!(
!prompt.contains("## Tool Use Protocol"),
"build_system_prompt should not emit protocol block directly"
);
prompt.push_str(&build_tool_instructions(&[]));
assert_eq!(
prompt.matches("## Tool Use Protocol").count(),
1,
"protocol block should appear exactly once in the final prompt"
);
}
#[test]
fn prompt_injects_safety() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(prompt.contains("Do not exfiltrate private data"));
assert!(prompt.contains("Do not run destructive commands"));
assert!(prompt.contains("Prefer `trash` over `rm`"));
}
#[test]
fn prompt_injects_workspace_files() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(prompt.contains("### SOUL.md"), "missing SOUL.md header");
assert!(prompt.contains("Be helpful"), "missing SOUL content");
assert!(prompt.contains("### IDENTITY.md"), "missing IDENTITY.md");
assert!(
prompt.contains("Name: ZeroClaw"),
"missing IDENTITY content"
);
assert!(prompt.contains("### USER.md"), "missing USER.md");
assert!(prompt.contains("### AGENTS.md"), "missing AGENTS.md");
assert!(prompt.contains("### TOOLS.md"), "missing TOOLS.md");
assert!(
!prompt.contains("### HEARTBEAT.md"),
"HEARTBEAT.md should not be in channel prompt"
);
assert!(prompt.contains("### MEMORY.md"), "missing MEMORY.md");
assert!(prompt.contains("User likes Rust"), "missing MEMORY content");
}
#[test]
fn prompt_missing_file_markers() {
let tmp = TempDir::new().unwrap();
let prompt = build_system_prompt(tmp.path(), "model", &[], &[], None, None);
assert!(prompt.contains("[File not found: SOUL.md]"));
assert!(prompt.contains("[File not found: AGENTS.md]"));
assert!(prompt.contains("[File not found: IDENTITY.md]"));
}
#[test]
fn prompt_bootstrap_only_if_exists() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
!prompt.contains("### BOOTSTRAP.md"),
"BOOTSTRAP.md should not appear when missing"
);
std::fs::write(ws.path().join("BOOTSTRAP.md"), "# Bootstrap\nFirst run.").unwrap();
let prompt2 = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
prompt2.contains("### BOOTSTRAP.md"),
"BOOTSTRAP.md should appear when present"
);
assert!(prompt2.contains("First run"));
}
#[test]
fn prompt_no_daily_memory_injection() {
let ws = make_workspace();
let memory_dir = ws.path().join("memory");
std::fs::create_dir_all(&memory_dir).unwrap();
let today = chrono::Local::now().format("%Y-%m-%d").to_string();
std::fs::write(
memory_dir.join(format!("{today}.md")),
"# Daily\nSome note.",
)
.unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
!prompt.contains("Daily Notes"),
"daily notes should not be auto-injected"
);
assert!(
!prompt.contains("Some note"),
"daily content should not be in prompt"
);
}
#[test]
fn prompt_runtime_metadata() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "claude-sonnet-4", &[], &[], None, None);
assert!(prompt.contains("Model: claude-sonnet-4"));
assert!(prompt.contains(&format!("OS: {}", std::env::consts::OS)));
assert!(prompt.contains("Host:"));
}
#[test]
fn prompt_skills_include_instructions_and_tools() {
let ws = make_workspace();
let skills = vec![crate::skills::Skill {
name: "code-review".into(),
description: "Review code for bugs".into(),
version: "1.0.0".into(),
author: None,
tags: vec![],
tools: vec![crate::skills::SkillTool {
name: "lint".into(),
description: "Run static checks".into(),
kind: "shell".into(),
command: "cargo clippy".into(),
args: HashMap::new(),
}],
prompts: vec!["Always run cargo test before final response.".into()],
location: None,
}];
let prompt = build_system_prompt(ws.path(), "model", &[], &skills, None, None);
assert!(prompt.contains("<available_skills>"), "missing skills XML");
assert!(prompt.contains("<name>code-review</name>"));
assert!(prompt.contains("<description>Review code for bugs</description>"));
assert!(prompt.contains("SKILL.md</location>"));
assert!(prompt.contains("<instructions>"));
assert!(prompt
.contains("<instruction>Always run cargo test before final response.</instruction>"));
assert!(prompt.contains("<tools>"));
assert!(prompt.contains("<name>lint</name>"));
assert!(prompt.contains("<kind>shell</kind>"));
assert!(!prompt.contains("loaded on demand"));
}
#[test]
fn prompt_skills_compact_mode_omits_instructions_and_tools() {
let ws = make_workspace();
let skills = vec![crate::skills::Skill {
name: "code-review".into(),
description: "Review code for bugs".into(),
version: "1.0.0".into(),
author: None,
tags: vec![],
tools: vec![crate::skills::SkillTool {
name: "lint".into(),
description: "Run static checks".into(),
kind: "shell".into(),
command: "cargo clippy".into(),
args: HashMap::new(),
}],
prompts: vec!["Always run cargo test before final response.".into()],
location: None,
}];
let prompt = build_system_prompt_with_mode(
ws.path(),
"model",
&[],
&skills,
None,
None,
false,
crate::config::SkillsPromptInjectionMode::Compact,
);
assert!(prompt.contains("<available_skills>"), "missing skills XML");
assert!(prompt.contains("<name>code-review</name>"));
assert!(prompt.contains("<location>skills/code-review/SKILL.md</location>"));
assert!(prompt.contains("loaded on demand"));
assert!(!prompt.contains("<instructions>"));
assert!(!prompt
.contains("<instruction>Always run cargo test before final response.</instruction>"));
assert!(!prompt.contains("<tools>"));
}
#[test]
fn prompt_skills_escape_reserved_xml_chars() {
let ws = make_workspace();
let skills = vec![crate::skills::Skill {
name: "code<review>&".into(),
description: "Review \"unsafe\" and 'risky' bits".into(),
version: "1.0.0".into(),
author: None,
tags: vec![],
tools: vec![crate::skills::SkillTool {
name: "run\"linter\"".into(),
description: "Run <lint> & report".into(),
kind: "shell&exec".into(),
command: "cargo clippy".into(),
args: HashMap::new(),
}],
prompts: vec!["Use <tool_call> and & keep output \"safe\"".into()],
location: None,
}];
let prompt = build_system_prompt(ws.path(), "model", &[], &skills, None, None);
assert!(prompt.contains("<name>code<review>&</name>"));
assert!(prompt.contains(
"<description>Review "unsafe" and 'risky' bits</description>"
));
assert!(prompt.contains("<name>run"linter"</name>"));
assert!(prompt.contains("<description>Run <lint> & report</description>"));
assert!(prompt.contains("<kind>shell&exec</kind>"));
assert!(prompt.contains(
"<instruction>Use <tool_call> and & keep output "safe"</instruction>"
));
}
#[test]
fn prompt_truncation() {
let ws = make_workspace();
let big_content = "x".repeat(BOOTSTRAP_MAX_CHARS + 1000);
std::fs::write(ws.path().join("AGENTS.md"), &big_content).unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
prompt.contains("truncated at"),
"large files should be truncated"
);
assert!(
!prompt.contains(&big_content),
"full content should not appear"
);
}
#[test]
fn prompt_empty_files_skipped() {
let ws = make_workspace();
std::fs::write(ws.path().join("TOOLS.md"), "").unwrap();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
!prompt.contains("### TOOLS.md"),
"empty files should be skipped"
);
}
#[test]
fn channel_log_truncation_is_utf8_safe_for_multibyte_text() {
let msg = "Hello from ZeroClaw 🌍. Current status is healthy, and café-style UTF-8 text stays safe in logs.";
let result = std::panic::catch_unwind(|| crate::util::truncate_with_ellipsis(msg, 80));
assert!(
result.is_ok(),
"truncate_with_ellipsis should never panic on UTF-8"
);
let truncated = result.unwrap();
assert!(!truncated.is_empty());
assert!(truncated.is_char_boundary(truncated.len()));
}
#[test]
fn prompt_contains_channel_capabilities() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(
prompt.contains("## Channel Capabilities"),
"missing Channel Capabilities section"
);
assert!(
prompt.contains("running as a messaging bot"),
"missing channel context"
);
assert!(
prompt.contains("NEVER repeat, describe, or echo credentials"),
"missing security instruction"
);
}
#[test]
fn prompt_workspace_path() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(prompt.contains(&format!("Working directory: `{}`", ws.path().display())));
}
#[test]
fn conversation_memory_key_uses_message_id() {
let msg = traits::ChannelMessage {
id: "msg_abc123".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "hello".into(),
channel: "slack".into(),
timestamp: 1,
thread_ts: None,
};
assert_eq!(conversation_memory_key(&msg), "slack_U123_msg_abc123");
}
#[test]
fn conversation_memory_key_is_unique_per_message() {
let msg1 = traits::ChannelMessage {
id: "msg_1".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "first".into(),
channel: "slack".into(),
timestamp: 1,
thread_ts: None,
};
let msg2 = traits::ChannelMessage {
id: "msg_2".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "second".into(),
channel: "slack".into(),
timestamp: 2,
thread_ts: None,
};
assert_ne!(
conversation_memory_key(&msg1),
conversation_memory_key(&msg2)
);
}
#[tokio::test]
async fn autosave_keys_preserve_multiple_conversation_facts() {
let tmp = TempDir::new().unwrap();
let mem = SqliteMemory::new(tmp.path()).unwrap();
let msg1 = traits::ChannelMessage {
id: "msg_1".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "I'm Paul".into(),
channel: "slack".into(),
timestamp: 1,
thread_ts: None,
};
let msg2 = traits::ChannelMessage {
id: "msg_2".into(),
sender: "U123".into(),
reply_target: "C456".into(),
content: "I'm 45".into(),
channel: "slack".into(),
timestamp: 2,
thread_ts: None,
};
mem.store(
&conversation_memory_key(&msg1),
&msg1.content,
MemoryCategory::Conversation,
None,
)
.await
.unwrap();
mem.store(
&conversation_memory_key(&msg2),
&msg2.content,
MemoryCategory::Conversation,
None,
)
.await
.unwrap();
assert_eq!(mem.count().await.unwrap(), 2);
let recalled = mem.recall("45", 5, None).await.unwrap();
assert!(recalled.iter().any(|entry| entry.content.contains("45")));
}
#[tokio::test]
async fn build_memory_context_includes_recalled_entries() {
let tmp = TempDir::new().unwrap();
let mem = SqliteMemory::new(tmp.path()).unwrap();
mem.store("age_fact", "Age is 45", MemoryCategory::Conversation, None)
.await
.unwrap();
let context = build_memory_context(&mem, "age", 0.0).await;
assert!(context.contains("[Memory context]"));
assert!(context.contains("Age is 45"));
}
#[tokio::test]
async fn process_channel_message_restores_per_sender_history_on_follow_ups() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let provider_impl = Arc::new(HistoryCaptureProvider::default());
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: provider_impl.clone(),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx.clone(),
traits::ChannelMessage {
id: "msg-a".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "hello".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-b".to_string(),
sender: "alice".to_string(),
reply_target: "chat-1".to_string(),
content: "follow up".to_string(),
channel: "test-channel".to_string(),
timestamp: 2,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let calls = provider_impl
.calls
.lock()
.unwrap_or_else(|e| e.into_inner());
assert_eq!(calls.len(), 2);
assert_eq!(calls[0].len(), 2);
assert_eq!(calls[0][0].0, "system");
assert_eq!(calls[0][1].0, "user");
assert_eq!(calls[1].len(), 4);
assert_eq!(calls[1][0].0, "system");
assert_eq!(calls[1][1].0, "user");
assert_eq!(calls[1][2].0, "assistant");
assert_eq!(calls[1][3].0, "user");
assert!(calls[1][1].1.contains("hello"));
assert!(calls[1][2].1.contains("response-1"));
assert!(calls[1][3].1.contains("follow up"));
}
#[tokio::test]
async fn process_channel_message_enriches_current_turn_without_persisting_context() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let provider_impl = Arc::new(HistoryCaptureProvider::default());
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: provider_impl.clone(),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(RecallMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx.clone(),
traits::ChannelMessage {
id: "msg-ctx-1".to_string(),
sender: "alice".to_string(),
reply_target: "chat-ctx".to_string(),
content: "hello".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let calls = provider_impl
.calls
.lock()
.unwrap_or_else(|e| e.into_inner());
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].len(), 2);
assert_eq!(calls[0][1].0, "user");
assert!(calls[0][1].1.contains("[Memory context]"));
assert!(calls[0][1].1.contains("Age is 45"));
assert!(calls[0][1].1.contains("hello"));
let histories = runtime_ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let turns = histories
.get("test-channel_alice")
.expect("history should be stored for sender");
assert_eq!(turns[0].role, "user");
assert_eq!(turns[0].content, "hello");
assert!(!turns[0].content.contains("[Memory context]"));
}
#[tokio::test]
async fn process_channel_message_telegram_keeps_system_instruction_at_top_only() {
let channel_impl = Arc::new(TelegramRecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let provider_impl = Arc::new(HistoryCaptureProvider::default());
let mut histories = HashMap::new();
histories.insert(
"telegram_alice".to_string(),
vec![
ChatMessage::assistant("stale assistant"),
ChatMessage::user("earlier user question"),
ChatMessage::assistant("earlier assistant reply"),
],
);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: provider_impl.clone(),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(histories)),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx.clone(),
traits::ChannelMessage {
id: "tg-msg-1".to_string(),
sender: "alice".to_string(),
reply_target: "chat-telegram".to_string(),
content: "hello".to_string(),
channel: "telegram".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let calls = provider_impl
.calls
.lock()
.unwrap_or_else(|e| e.into_inner());
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].len(), 4);
let roles = calls[0]
.iter()
.map(|(role, _)| role.as_str())
.collect::<Vec<_>>();
assert_eq!(roles, vec!["system", "user", "assistant", "user"]);
assert!(
calls[0][0].1.contains("When responding on Telegram:"),
"telegram channel instructions should be embedded into the system prompt"
);
assert!(
calls[0][0].1.contains("For media attachments use markers:"),
"telegram media marker guidance should live in the system prompt"
);
assert!(!calls[0].iter().skip(1).any(|(role, _)| role == "system"));
}
#[test]
fn extract_tool_context_summary_collects_alias_and_native_tool_calls() {
let history = vec![
ChatMessage::system("sys"),
ChatMessage::assistant(
r#"<toolcall>
{"name":"shell","arguments":{"command":"date"}}
</toolcall>"#,
),
ChatMessage::assistant(
r#"{"content":null,"tool_calls":[{"id":"1","name":"web_search","arguments":"{}"}]}"#,
),
];
let summary = extract_tool_context_summary(&history, 1);
assert_eq!(summary, "[Used tools: shell, web_search]");
}
#[test]
fn extract_tool_context_summary_collects_prompt_mode_tool_result_names() {
let history = vec![
ChatMessage::system("sys"),
ChatMessage::assistant("Using markdown tool call fence"),
ChatMessage::user(
r#"[Tool results]
<tool_result name="http_request">
{"status":200}
</tool_result>
<tool_result name="shell">
Mon Feb 20
</tool_result>"#,
),
];
let summary = extract_tool_context_summary(&history, 1);
assert_eq!(summary, "[Used tools: http_request, shell]");
}
#[test]
fn extract_tool_context_summary_respects_start_index() {
let history = vec![
ChatMessage::assistant(
r#"<tool_call>
{"name":"stale_tool","arguments":{}}
</tool_call>"#,
),
ChatMessage::assistant(
r#"<tool_call>
{"name":"fresh_tool","arguments":{}}
</tool_call>"#,
),
];
let summary = extract_tool_context_summary(&history, 1);
assert_eq!(summary, "[Used tools: fresh_tool]");
}
#[test]
fn strip_isolated_tool_json_artifacts_removes_tool_calls_and_results() {
let mut known_tools = HashSet::new();
known_tools.insert("schedule".to_string());
let input = r#"{"name":"schedule","parameters":{"action":"create","message":"test"}}
{"name":"schedule","parameters":{"action":"cancel","task_id":"test"}}
Let me create the reminder properly:
{"name":"schedule","parameters":{"action":"create","message":"Go to sleep"}}
{"result":{"task_id":"abc","status":"scheduled"}}
Done reminder set for 1:38 AM."#;
let result = strip_isolated_tool_json_artifacts(input, &known_tools);
let normalized = result
.lines()
.filter(|line| !line.trim().is_empty())
.collect::<Vec<_>>()
.join("\n");
assert_eq!(
normalized,
"Let me create the reminder properly:\nDone reminder set for 1:38 AM."
);
}
#[test]
fn strip_isolated_tool_json_artifacts_preserves_non_tool_json() {
let mut known_tools = HashSet::new();
known_tools.insert("shell".to_string());
let input = r#"{"name":"profile","parameters":{"timezone":"UTC"}}
This is an example JSON object for profile settings."#;
let result = strip_isolated_tool_json_artifacts(input, &known_tools);
assert_eq!(result, input);
}
#[test]
fn aieos_identity_from_file() {
use crate::config::IdentityConfig;
use tempfile::TempDir;
let tmp = TempDir::new().unwrap();
let identity_path = tmp.path().join("aieos_identity.json");
let aieos_json = r#"{
"identity": {
"names": {"first": "Nova", "nickname": "Nov"},
"bio": "A helpful AI assistant.",
"origin": "Silicon Valley"
},
"psychology": {
"mbti": "INTJ",
"moral_compass": ["Be helpful", "Do no harm"]
},
"linguistics": {
"style": "concise",
"formality": "casual"
}
}"#;
std::fs::write(&identity_path, aieos_json).unwrap();
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: Some("aieos_identity.json".into()),
aieos_inline: None,
};
let prompt = build_system_prompt(tmp.path(), "model", &[], &[], Some(&config), None);
assert!(prompt.contains("## Identity"));
assert!(prompt.contains("**Name:** Nova"));
assert!(prompt.contains("**Nickname:** Nov"));
assert!(prompt.contains("**Bio:** A helpful AI assistant."));
assert!(prompt.contains("**Origin:** Silicon Valley"));
assert!(prompt.contains("## Personality"));
assert!(prompt.contains("**MBTI:** INTJ"));
assert!(prompt.contains("**Moral Compass:**"));
assert!(prompt.contains("- Be helpful"));
assert!(prompt.contains("## Communication Style"));
assert!(prompt.contains("**Style:** concise"));
assert!(prompt.contains("**Formality Level:** casual"));
assert!(!prompt.contains("### SOUL.md"));
assert!(!prompt.contains("### IDENTITY.md"));
assert!(!prompt.contains("[File not found"));
}
#[test]
fn aieos_identity_from_inline() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: None,
aieos_inline: Some(r#"{"identity":{"names":{"first":"Claw"}}}"#.into()),
};
let prompt = build_system_prompt(
std::env::temp_dir().as_path(),
"model",
&[],
&[],
Some(&config),
None,
);
assert!(prompt.contains("**Name:** Claw"));
assert!(prompt.contains("## Identity"));
}
#[test]
fn aieos_fallback_to_openclaw_on_parse_error() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: Some("nonexistent.json".into()),
aieos_inline: None,
};
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config), None);
assert!(prompt.contains("### SOUL.md"));
}
#[test]
fn aieos_empty_uses_openclaw() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "aieos".into(),
aieos_path: None,
aieos_inline: None,
};
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config), None);
assert!(prompt.contains("### SOUL.md"));
assert!(prompt.contains("Be helpful"));
}
#[test]
fn openclaw_format_uses_bootstrap_files() {
use crate::config::IdentityConfig;
let config = IdentityConfig {
format: "openclaw".into(),
aieos_path: Some("identity.json".into()),
aieos_inline: None,
};
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], Some(&config), None);
assert!(prompt.contains("### SOUL.md"));
assert!(prompt.contains("Be helpful"));
assert!(!prompt.contains("## Identity"));
}
#[test]
fn none_identity_config_uses_openclaw() {
let ws = make_workspace();
let prompt = build_system_prompt(ws.path(), "model", &[], &[], None, None);
assert!(prompt.contains("### SOUL.md"));
assert!(prompt.contains("Be helpful"));
}
#[test]
fn classify_health_ok_true() {
let state = classify_health_result(&Ok(true));
assert_eq!(state, ChannelHealthState::Healthy);
}
#[test]
fn classify_health_ok_false() {
let state = classify_health_result(&Ok(false));
assert_eq!(state, ChannelHealthState::Unhealthy);
}
#[tokio::test]
async fn classify_health_timeout() {
let result = tokio::time::timeout(Duration::from_millis(1), async {
tokio::time::sleep(Duration::from_millis(20)).await;
true
})
.await;
let state = classify_health_result(&result);
assert_eq!(state, ChannelHealthState::Timeout);
}
#[test]
fn collect_configured_channels_includes_mattermost_when_configured() {
let mut config = Config::default();
config.channels_config.mattermost = Some(crate::config::schema::MattermostConfig {
url: "https://mattermost.example.com".to_string(),
bot_token: "test-token".to_string(),
channel_id: Some("channel-1".to_string()),
allowed_users: vec![],
thread_replies: Some(true),
mention_only: Some(false),
});
let channels = collect_configured_channels(&config, "test");
assert!(channels
.iter()
.any(|entry| entry.display_name == "Mattermost"));
assert!(channels
.iter()
.any(|entry| entry.channel.name() == "mattermost"));
}
struct AlwaysFailChannel {
name: &'static str,
calls: Arc<AtomicUsize>,
}
struct BlockUntilClosedChannel {
name: String,
calls: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl Channel for AlwaysFailChannel {
fn name(&self) -> &str {
self.name
}
async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
Ok(())
}
async fn listen(
&self,
_tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
) -> anyhow::Result<()> {
self.calls.fetch_add(1, Ordering::SeqCst);
anyhow::bail!("listen boom")
}
}
#[async_trait::async_trait]
impl Channel for BlockUntilClosedChannel {
fn name(&self) -> &str {
&self.name
}
async fn send(&self, _message: &SendMessage) -> anyhow::Result<()> {
Ok(())
}
async fn listen(
&self,
tx: tokio::sync::mpsc::Sender<traits::ChannelMessage>,
) -> anyhow::Result<()> {
self.calls.fetch_add(1, Ordering::SeqCst);
tx.closed().await;
Ok(())
}
}
#[tokio::test]
async fn supervised_listener_marks_error_and_restarts_on_failures() {
let calls = Arc::new(AtomicUsize::new(0));
let channel: Arc<dyn Channel> = Arc::new(AlwaysFailChannel {
name: "test-supervised-fail",
calls: Arc::clone(&calls),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(1);
let handle = spawn_supervised_listener(channel, tx, 1, 1);
tokio::time::sleep(Duration::from_millis(80)).await;
drop(rx);
handle.abort();
let _ = handle.await;
let snapshot = crate::health::snapshot_json();
let component = &snapshot["components"]["channel:test-supervised-fail"];
assert_eq!(component["status"], "error");
assert!(component["restart_count"].as_u64().unwrap_or(0) >= 1);
assert!(component["last_error"]
.as_str()
.unwrap_or("")
.contains("listen boom"));
assert!(calls.load(Ordering::SeqCst) >= 1);
}
#[tokio::test]
async fn supervised_listener_refreshes_health_while_running() {
let calls = Arc::new(AtomicUsize::new(0));
let channel_name = format!("test-supervised-heartbeat-{}", uuid::Uuid::new_v4());
let component_name = format!("channel:{channel_name}");
let channel: Arc<dyn Channel> = Arc::new(BlockUntilClosedChannel {
name: channel_name,
calls: Arc::clone(&calls),
});
let (tx, rx) = tokio::sync::mpsc::channel::<traits::ChannelMessage>(1);
let handle = spawn_supervised_listener_with_health_interval(
channel,
tx,
1,
1,
Duration::from_millis(20),
);
tokio::time::sleep(Duration::from_millis(35)).await;
let first_last_ok = crate::health::snapshot_json()["components"][&component_name]
["last_ok"]
.as_str()
.unwrap_or("")
.to_string();
assert!(!first_last_ok.is_empty());
tokio::time::sleep(Duration::from_millis(70)).await;
let second_last_ok = crate::health::snapshot_json()["components"][&component_name]
["last_ok"]
.as_str()
.unwrap_or("")
.to_string();
let first = chrono::DateTime::parse_from_rfc3339(&first_last_ok)
.expect("last_ok should be valid RFC3339");
let second = chrono::DateTime::parse_from_rfc3339(&second_last_ok)
.expect("last_ok should be valid RFC3339");
assert!(second > first, "expected periodic health heartbeat refresh");
drop(rx);
let join = tokio::time::timeout(Duration::from_secs(1), handle).await;
assert!(join.is_ok(), "listener should stop after channel shutdown");
assert!(calls.load(Ordering::SeqCst) >= 1);
}
#[test]
fn maybe_restart_daemon_systemd_args_regression() {
assert_eq!(
SYSTEMD_STATUS_ARGS,
["--user", "is-active", "zeroclaw.service"]
);
assert_eq!(
SYSTEMD_RESTART_ARGS,
["--user", "restart", "zeroclaw.service"]
);
}
#[test]
fn maybe_restart_daemon_openrc_args_regression() {
assert_eq!(OPENRC_STATUS_ARGS, ["zeroclaw", "status"]);
assert_eq!(OPENRC_RESTART_ARGS, ["zeroclaw", "restart"]);
}
#[test]
fn normalize_merges_consecutive_user_turns() {
let turns = vec![ChatMessage::user("hello"), ChatMessage::user("world")];
let result = normalize_cached_channel_turns(turns);
assert_eq!(result.len(), 1);
assert_eq!(result[0].role, "user");
assert_eq!(result[0].content, "hello\n\nworld");
}
#[test]
fn normalize_preserves_strict_alternation() {
let turns = vec![
ChatMessage::user("hello"),
ChatMessage::assistant("hi"),
ChatMessage::user("bye"),
];
let result = normalize_cached_channel_turns(turns);
assert_eq!(result.len(), 3);
assert_eq!(result[0].content, "hello");
assert_eq!(result[1].content, "hi");
assert_eq!(result[2].content, "bye");
}
#[test]
fn normalize_merges_multiple_consecutive_user_turns() {
let turns = vec![
ChatMessage::user("a"),
ChatMessage::user("b"),
ChatMessage::user("c"),
];
let result = normalize_cached_channel_turns(turns);
assert_eq!(result.len(), 1);
assert_eq!(result[0].role, "user");
assert_eq!(result[0].content, "a\n\nb\n\nc");
}
#[test]
fn normalize_empty_input() {
let result = normalize_cached_channel_turns(vec![]);
assert!(result.is_empty());
}
#[tokio::test]
async fn e2e_photo_attachment_rejected_by_non_vision_provider() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(DummyProvider),
default_provider: Arc::new("dummy".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("You are a helpful assistant.".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
runtime_ctx,
traits::ChannelMessage {
id: "msg-photo-1".to_string(),
sender: "zeroclaw_user".to_string(),
reply_target: "chat-photo".to_string(),
content: "[IMAGE:/tmp/workspace/photo_99_1.jpg]\n\nWhat is this?".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let sent = channel_impl.sent_messages.lock().await;
assert_eq!(sent.len(), 1, "expected exactly one reply message");
assert!(
sent[0].contains("does not support vision"),
"reply must mention vision capability error, got: {}",
sent[0]
);
assert!(
sent[0].contains("⚠️ Error"),
"reply must start with error prefix, got: {}",
sent[0]
);
}
#[tokio::test]
async fn e2e_failed_vision_turn_does_not_poison_follow_up_text_turn() {
let channel_impl = Arc::new(RecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: Arc::new(DummyProvider),
default_provider: Arc::new("dummy".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("You are a helpful assistant.".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(HashMap::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(crate::config::ReliabilityConfig::default()),
provider_runtime_options: providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: false,
multimodal: crate::config::MultimodalConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
});
process_channel_message(
Arc::clone(&runtime_ctx),
traits::ChannelMessage {
id: "msg-photo-1".to_string(),
sender: "zeroclaw_user".to_string(),
reply_target: "chat-photo".to_string(),
content: "[IMAGE:/tmp/workspace/photo_99_1.jpg]\n\nWhat is this?".to_string(),
channel: "test-channel".to_string(),
timestamp: 1,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
process_channel_message(
Arc::clone(&runtime_ctx),
traits::ChannelMessage {
id: "msg-text-2".to_string(),
sender: "zeroclaw_user".to_string(),
reply_target: "chat-photo".to_string(),
content: "What is WAL?".to_string(),
channel: "test-channel".to_string(),
timestamp: 2,
thread_ts: None,
},
CancellationToken::new(),
)
.await;
let sent = channel_impl.sent_messages.lock().await;
assert_eq!(sent.len(), 2, "expected one error and one successful reply");
assert!(
sent[0].contains("does not support vision"),
"first reply must mention vision capability error, got: {}",
sent[0]
);
assert!(
sent[1].ends_with(":ok"),
"second reply should succeed for text-only turn, got: {}",
sent[1]
);
drop(sent);
let histories = runtime_ctx
.conversation_histories
.lock()
.unwrap_or_else(|e| e.into_inner());
let turns = histories
.get("test-channel_zeroclaw_user")
.expect("history should exist for sender");
assert_eq!(turns.len(), 2);
assert_eq!(turns[0].role, "user");
assert_eq!(turns[0].content, "What is WAL?");
assert_eq!(turns[1].role, "assistant");
assert_eq!(turns[1].content, "ok");
assert!(
turns.iter().all(|turn| !turn.content.contains("[IMAGE:")),
"failed vision turn must not persist image marker content"
);
}
}