use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use anyhow::{Context, Result};
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, error, info, warn};
use super::channels::{start_channels, start_custom_channels};
use rsclaw_provider::build::build_providers;
use crate::{
MemoryTier,
cron::CronRunner,
gateway::{
LiveConfig,
hot_reload::{ConfigChange, FileWatcher},
},
server::{AppState, serve},
};
use rsclaw_agent::{
AgentMessage, AgentRegistry, AgentReply, AgentRuntime, AgentSpawner, MemoryStore,
PendingAnalysis,
};
use rsclaw_channel::OutboundMessage;
use rsclaw_config::{self as config, runtime::RuntimeConfig, schema::BindMode};
use rsclaw_plugin::{MemoryStoreSlot, PluginRegistry, load_all_plugins};
use rsclaw_provider::registry::ProviderRegistry;
use rsclaw_skill::{SkillRegistry, load_skills};
use rsclaw_store::Store;
/// Channel names for which the notification router does not expect a
/// registered outbound sender (it logs at debug level instead of warning).
const SYNC_ONLY_CHANNELS: &[&str] = &["api"];

/// Returns `true` when `name` is one of the [`SYNC_ONLY_CHANNELS`].
///
/// The comparison is exact and case-sensitive.
fn is_sync_only_channel(name: &str) -> bool {
    SYNC_ONLY_CHANNELS
        .iter()
        .any(|candidate| *candidate == name)
}
fn enrich_process_path() {
let sep = if cfg!(windows) { ';' } else { ':' };
let current = std::env::var("PATH").unwrap_or_default();
let existing: std::collections::HashSet<&str> = current.split(sep).collect();
let mut prefix: Vec<String> = Vec::new();
let mut push_if = |p: String| {
if std::path::Path::new(&p).is_dir() && !existing.contains(p.as_str()) {
prefix.push(p);
}
};
let tools = rsclaw_config::loader::base_dir().join("tools");
#[cfg(unix)]
{
push_if(tools.join("bin").to_string_lossy().to_string());
}
#[cfg(windows)]
{
if let Ok(entries) = std::fs::read_dir(&tools) {
for entry in entries.flatten() {
let p = entry.path();
if p.is_dir() {
push_if(p.join("node_modules").join(".bin").to_string_lossy().to_string());
push_if(p.join("bin").to_string_lossy().to_string());
push_if(p.to_string_lossy().to_string());
}
}
}
push_if(
tools
.join("claude-code")
.join("node_modules")
.join("@anthropic-ai")
.join("claude-code")
.join("bin")
.to_string_lossy()
.to_string(),
);
}
#[cfg(target_os = "macos")]
{
for p in ["/usr/local/bin", "/opt/homebrew/bin", "/opt/homebrew/sbin"] {
push_if(p.to_owned());
}
}
#[cfg(target_os = "linux")]
{
for p in ["/usr/local/bin"] {
push_if(p.to_owned());
}
}
if let Some(home) = dirs_next::home_dir() {
for rel in [
".local/bin",
".cargo/bin",
"bin",
"go/bin",
".bun/bin",
".opencode/bin",
] {
push_if(home.join(rel).to_string_lossy().to_string());
}
}
if prefix.is_empty() {
return;
}
for entry in current.split(sep) {
if !entry.is_empty() {
prefix.push(entry.to_string());
}
}
let mut seen = std::collections::HashSet::new();
prefix.retain(|p| seen.insert(p.clone()));
let joined = prefix.join(&sep.to_string());
unsafe {
std::env::set_var("PATH", &joined);
}
tracing::info!(path = %joined, "enriched process PATH for subprocess tool resolution");
}
pub async fn start_gateway(config: Arc<RuntimeConfig>, tier: MemoryTier) -> Result<()> {
wait_for_parent_release();
crate::cmd::tools::sync_tool_shims();
enrich_process_path();
rsclaw_config::apply_proxy_env(&config);
rsclaw_agent::evolution::init_evolution_config(
rsclaw_agent::evolution::EvolutionConfig::from_raw(config.ext.evolution.as_ref()),
);
propagate_skill_registry_env(&config);
propagate_user_env(&config);
let base_dir = rsclaw_config::loader::base_dir();
let data_dir = base_dir.join("var/data");
std::fs::create_dir_all(&data_dir).context("create data dir")?;
{
let lang = config
.raw
.gateway
.as_ref()
.and_then(|g| g.language.as_deref());
if let Err(e) = rsclaw_agent::bootstrap::seed_tools(&base_dir, lang) {
warn!("failed to seed tool prompts: {e:#}");
}
}
let store = match Store::open(&data_dir, tier) {
Ok(s) => Arc::new(s),
Err(e) => {
let msg = format!("{e:#}");
if msg.contains("already open") || msg.contains("Cannot acquire lock") {
eprintln!(" [!] Database locked by another gateway instance. Exiting cleanly.");
std::process::exit(0);
}
return Err(e).context("open store");
}
};
info!("store opened at {}", data_dir.display());
crate::cron::init_cron_store(Arc::clone(&store.db));
let providers = Arc::new(build_providers(&config));
info!("{} provider(s) registered", providers.names().len());
let global_skills = base_dir.join("skills");
let skills = Arc::new(
load_skills(&global_skills, None, config.ext.skills.as_ref()).unwrap_or_else(|e| {
warn!("failed to load skills: {e:#}");
SkillRegistry::new()
}),
);
info!("{} skill(s) loaded", skills.len());
let (al_s, al_p) = rsclaw_skill::allowlist::load_cached();
info!(
skills = al_s,
plugins = al_p,
"allowlist: loaded from cache"
);
tokio::spawn(async {
if let Err(e) = rsclaw_skill::allowlist::refresh().await {
warn!("allowlist refresh failed (keeping cache; fail-closed): {e:#}");
}
});
tokio::spawn(async {
crate::cmd::tools::fetch_manifest_if_missing().await;
});
let (registry, receivers) =
AgentRegistry::from_config_with_receivers(&config, Arc::clone(&providers));
let registry = Arc::new(registry);
info!("{} agent(s) registered", registry.len());
let (notification_tx, notification_rx) =
broadcast::channel::<rsclaw_channel::OutboundMessage>(64);
let (restart_request_tx, _restart_request_rx) =
tokio::sync::broadcast::channel::<rsclaw_events::RestartRequest>(16);
let pending_restart: Arc<std::sync::RwLock<Option<rsclaw_events::RestartRequest>>> =
Arc::new(std::sync::RwLock::new(None));
let shutdown = crate::gateway::ShutdownCoordinator::new();
let search_cfg = config.raw.memory_search.as_ref();
let model_dir = {
let base_zh = base_dir.join("models/bge-base-zh");
let zh = base_dir.join("models/bge-small-zh");
let en = base_dir.join("models/bge-small-en");
if base_zh.join("model.safetensors").exists() {
base_zh
} else if zh.join("model.safetensors").exists() {
zh
} else if en.join("model.safetensors").exists() {
en
} else {
zh }
};
ensure_bge_model_present(&model_dir, search_cfg).await?;
let memory = match MemoryStore::open(&data_dir, Some(&model_dir), tier, search_cfg).await {
Ok(mut m) => {
m.set_search_index(Arc::clone(&store.search));
match m.reindex_bm25() {
Ok(n) if n > 0 => info!(indexed = n, "memory: BM25 backfill complete"),
Ok(_) => {}
Err(e) => warn!("memory: BM25 reindex failed (vector still works): {e:#}"),
}
info!("memory store opened");
let arc = Arc::new(tokio::sync::Mutex::new(m));
rsclaw_agent::memory::set_global_store(Arc::clone(&arc));
Some(arc)
}
Err(e) => {
return Err(anyhow::anyhow!("failed to open memory store: {e:#}"));
}
};
if let Some(mem_arc) = memory.as_ref() {
let pending = {
let mem = mem_arc.lock().await;
mem.pending_migration_count()
};
if pending > 0 {
info!(
pending,
"embedder dimension changed since last run; spawning background re-embed"
);
let bg_mem = Arc::clone(mem_arc);
tokio::spawn(async move {
if let Err(e) = run_embedder_reembed(&bg_mem).await {
warn!(
"background re-embed failed ({e:#}); search will be partial until next restart"
);
}
});
}
}
let plugins_dir = base_dir.join("plugins");
let wasm_browser: Arc<tokio::sync::Mutex<Option<rsclaw_browser::BrowserSession>>> =
Arc::new(tokio::sync::Mutex::new(None));
let vision_model = config
.raw
.agents
.as_ref()
.and_then(|a| a.defaults.as_ref())
.and_then(|d| d.model.as_ref())
.and_then(|m| m.vision_head().map(String::from));
let mut plugin_registry = load_all_plugins(
&plugins_dir,
config.ext.plugins.as_ref(),
Arc::clone(&wasm_browser),
Some(notification_tx.clone()),
Some(Arc::clone(&providers)),
vision_model,
)
.await
.unwrap_or_else(|e| {
warn!("plugin load error: {e:#}");
PluginRegistry::new()
});
if let Some(ref mem_arc) = memory
&& !plugin_registry.slots.has_memory()
{
let slot = MemoryStoreSlot::new(Arc::clone(mem_arc));
let _ = plugin_registry.slots.set_memory(Arc::new(slot), "built-in");
}
info!(
"{} plugin(s) loaded (js={}, wasm={}), memory slot: {}",
plugin_registry.len(),
plugin_registry.js_count(),
plugin_registry.wasm_count(),
plugin_registry.slots.has_memory()
);
let wasm_plugins = Arc::new(plugin_registry.take_wasm_plugins());
let plugins = Arc::new(plugin_registry);
for handle in registry.all() {
handle.set_wasm_plugins(Arc::clone(&wasm_plugins));
handle.set_notification_tx(notification_tx.clone());
}
let (event_tx, _) = broadcast::channel::<rsclaw_events::AgentEvent>(1024);
let cap_manager = std::sync::Arc::new(rsclaw_cap::CapAgentManager::new(event_tx.clone()));
let cap_live_manager = {
let mut m = rsclaw_cap::CapLiveManager::new(event_tx.clone());
m.set_notification_tx(notification_tx.clone());
std::sync::Arc::new(m)
};
rsclaw_cap::set_global_cap_live(std::sync::Arc::clone(&cap_live_manager));
let live = Arc::new(LiveConfig::new((*config).clone()));
let model_health = rsclaw_provider::health::ProviderHealthRegistry::new();
let spawner = AgentSpawner::new_arc(
Arc::clone(®istry),
Arc::clone(&config),
Arc::clone(&live),
Arc::clone(&providers),
Arc::clone(&skills),
Arc::clone(&store),
memory.clone(),
event_tx.clone(),
Some(Arc::clone(&plugins)),
model_health.clone(),
Some(Arc::clone(&cap_manager)),
Some(Arc::clone(&cap_live_manager)),
);
let mcp_registry = Arc::new(rsclaw_mcp::McpRegistry::new());
spawn_mcp_servers(&config, Arc::clone(&mcp_registry)).await;
let heartbeat_memory = memory.clone();
let bypass_all_default = config
.raw
.tools
.as_ref()
.and_then(|t| t.computer_use.as_ref())
.and_then(|cu| cu.bypass_all)
.unwrap_or(false);
let computer_permission = Arc::new(rsclaw_computer::permission::RedbPermissionStore::new(
Arc::clone(&store.db),
bypass_all_default,
));
let (computer_permission_tx, _) =
broadcast::channel::<rsclaw_computer::permission::PermissionRequest>(64);
let (computer_status_tx, _) =
broadcast::channel::<rsclaw_computer::status::ComputerUseStatus>(256);
let computer_runs: Arc<
tokio::sync::RwLock<std::collections::HashMap<String, Arc<std::sync::atomic::AtomicBool>>>,
> = Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new()));
spawn_agent_tasks(
receivers,
Arc::clone(®istry),
Arc::clone(&config),
Arc::clone(&live),
Arc::clone(&store),
Arc::clone(&skills),
Arc::clone(&providers),
memory.clone(),
event_tx.clone(),
Some(Arc::clone(&spawner)),
Some(Arc::clone(&plugins)),
Some(Arc::clone(&mcp_registry)),
Some(notification_tx.clone()),
Arc::clone(&wasm_plugins),
Arc::clone(&computer_permission),
computer_permission_tx.clone(),
computer_status_tx.clone(),
Arc::clone(&computer_runs),
model_health.clone(),
Arc::clone(&cap_manager),
Arc::clone(&cap_live_manager),
);
let lang = config
.raw
.gateway
.as_ref()
.and_then(|g| g.language.as_deref());
info!(lang = ?lang, "i18n: gateway language config");
if let Some(lang) = lang {
rsclaw_i18n::set_default_lang(lang);
info!(
resolved = rsclaw_i18n::default_lang(),
"i18n: default language set"
);
}
let mut channel_manager = rsclaw_channel::ChannelManager::new(tier);
let feishu_slot: Arc<tokio::sync::OnceCell<Arc<rsclaw_channel::feishu::FeishuChannel>>> =
Arc::new(tokio::sync::OnceCell::new());
let wecom_slot: Arc<tokio::sync::OnceCell<Arc<rsclaw_channel::wecom::WeComChannel>>> =
Arc::new(tokio::sync::OnceCell::new());
let whatsapp_slot: Arc<tokio::sync::OnceCell<Arc<rsclaw_channel::whatsapp::WhatsAppChannel>>> =
Arc::new(tokio::sync::OnceCell::new());
let line_slot: Arc<tokio::sync::OnceCell<Arc<rsclaw_channel::line::LineChannel>>> =
Arc::new(tokio::sync::OnceCell::new());
let zalo_slot: Arc<tokio::sync::OnceCell<Arc<rsclaw_channel::zalo::ZaloChannel>>> =
Arc::new(tokio::sync::OnceCell::new());
let dm_enforcers: Arc<
std::sync::RwLock<std::collections::HashMap<String, Arc<rsclaw_channel::DmPolicyEnforcer>>>,
> = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
let channel_senders: Arc<
std::sync::RwLock<std::collections::HashMap<String, mpsc::Sender<OutboundMessage>>>,
> = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
super::task_queue::install_channel_senders(Arc::clone(&channel_senders));
let task_queue_mgr = Arc::new(super::task_queue::TaskQueueManager::new(Arc::clone(
&store.db,
)));
super::task_queue::install_task_queue(Arc::clone(&task_queue_mgr));
rsclaw_types::set_task_queue_host(Arc::new(super::task_queue::GatewayTaskQueueHost));
rsclaw_plugin::set_plugin_background_host(Arc::new(
super::task_queue::GatewayPluginBackgroundHost,
));
start_channels(
&config,
Arc::clone(®istry),
&mut channel_manager,
Arc::clone(&feishu_slot),
Arc::clone(&wecom_slot),
Arc::clone(&whatsapp_slot),
Arc::clone(&line_slot),
Arc::clone(&zalo_slot),
Arc::clone(&dm_enforcers),
Arc::clone(&store.db),
Arc::clone(&channel_senders),
Arc::clone(&task_queue_mgr),
shutdown.clone(),
);
{
let worker = Arc::new(super::task_queue::TaskQueueWorker::new(
Arc::clone(&task_queue_mgr),
Arc::clone(®istry),
Arc::clone(&channel_senders),
shutdown.clone(),
(*config).clone(),
));
tokio::spawn(async move { worker.run().await });
info!("task queue worker started");
}
{
let worker = Arc::new(super::external_jobs_worker::ExternalJobsWorker::new(
Arc::clone(&store.db),
notification_tx.clone(),
shutdown.clone(),
Arc::clone(&config),
));
tokio::spawn(async move { worker.run().await });
info!("external jobs worker started");
}
{
let senders = Arc::clone(&channel_senders);
let mut rx = notification_rx;
tokio::spawn(async move {
info!("notification router started");
while let Ok(msg) = rx.recv().await {
if let Some(ref ch_name) = msg.channel {
let tx = {
let senders_guard =
senders.read().expect("channel_senders RwLock poisoned");
msg.account
.as_ref()
.filter(|a| !a.is_empty())
.and_then(|acct| {
let key = format!("{ch_name}/{acct}");
senders_guard.get(&key).cloned()
})
.or_else(|| senders_guard.get(ch_name).cloned())
};
if let Some(tx) = tx {
info!(channel = %ch_name, target_id = %msg.target_id, "routing notification");
if let Err(e) = tx.send(msg.clone()).await {
tracing::warn!(error = %e, "notification send failed");
}
} else if is_sync_only_channel(ch_name) {
tracing::debug!(
channel = %ch_name,
"notification: sync-only channel, no sender expected"
);
} else {
warn!(channel = %ch_name, "no channel sender registered for notification");
}
} else {
let first = {
let guard = senders.read().expect("channel_senders RwLock poisoned");
guard.iter().next().map(|(k, v)| (k.clone(), v.clone()))
};
if let Some((ch_name, tx)) = first {
info!(channel = %ch_name, "routing notification to default channel");
if let Err(e) = tx.send(msg.clone()).await {
tracing::warn!(error = %e, "notification send failed");
}
} else {
warn!("notification: no channels registered");
}
}
}
info!("notification router ended");
});
}
let hb_enabled = config
.agents
.defaults
.heartbeat
.as_ref()
.and_then(|h| h.enabled)
.unwrap_or(true);
if hb_enabled {
let hb_host: std::sync::Arc<dyn rsclaw_heartbeat::HeartbeatHost> =
std::sync::Arc::new(crate::gateway::heartbeat_host::RuntimeHeartbeatHost {
registry: Arc::clone(®istry),
shutdown: Some(shutdown.clone()),
defaults: config.agents.defaults.clone(),
});
let runner = rsclaw_heartbeat::HeartbeatRunner::new(
hb_host,
&data_dir,
heartbeat_memory,
)
.with_meditation_deps(rsclaw_heartbeat::MeditationDeps {
config: Arc::clone(&config),
db: Arc::clone(&store.db),
});
let runner = std::sync::Arc::new(runner);
runner.run();
info!("heartbeat runner started");
}
let pid_file = rsclaw_config::loader::pid_file();
if let Some(parent) = pid_file.parent() {
if let Err(e) = std::fs::create_dir_all(parent) {
warn!("could not create PID file directory: {e}");
}
}
let pid = std::process::id();
if let Err(e) = std::fs::write(&pid_file, pid.to_string()) {
warn!("could not write PID file: {e}");
}
info!(pid, "gateway PID written to {}", pid_file.display());
if let Some(config_path) = config::loader::detect_config_path() {
let (mut watcher, mut reload_rx) = FileWatcher::new(config_path);
tokio::spawn(async move { watcher.run().await });
let live_reload = Arc::clone(&live);
let (restart_tx, _) = broadcast::channel::<Vec<String>>(8);
let bridge_tx = restart_request_tx.clone();
let bridge_pending = Arc::clone(&pending_restart);
let bridge_shutdown = shutdown.clone();
let cfg_lang = config
.raw
.gateway
.as_ref()
.and_then(|g| g.language.as_deref())
.map(str::to_owned);
tokio::spawn(async move {
let lang = rsclaw_i18n::resolve_lang(cfg_lang.as_deref().unwrap_or("en")).to_owned();
loop {
match reload_rx.recv().await {
Ok(ConfigChange::FullReload(new_cfg)) => {
let new_owned = (*new_cfg).clone();
let needs_restart = live_reload.apply(new_owned, &restart_tx).await;
if needs_restart.is_empty() {
info!("config hot-reload applied (hot-safe fields only)");
} else {
warn!(?needs_restart, "config change requires gateway restart");
publish_restart(
&bridge_tx,
&bridge_pending,
&bridge_shutdown,
rsclaw_events::RestartRequest::new(
rsclaw_events::RestartReason::ConfigChanged {
sections: needs_restart,
},
rsclaw_events::RestartUrgency::Recommended,
rsclaw_i18n::t("restart_required_config_changed", &lang),
),
);
}
}
Ok(ConfigChange::RequiresRestart(fields)) => {
warn!(?fields, "config change requires restart — surfacing banner");
publish_restart(
&bridge_tx,
&bridge_pending,
&bridge_shutdown,
rsclaw_events::RestartRequest::new(
rsclaw_events::RestartReason::ConfigChanged { sections: fields },
rsclaw_events::RestartUrgency::Required,
rsclaw_i18n::t("restart_required_config_changed", &lang),
),
);
}
Ok(_) => {}
Err(_) => break,
}
}
});
}
let devices_path = rsclaw_config::loader::base_dir().join("var/data/devices.json");
let devices = Arc::new(crate::ws::DeviceStore::new(devices_path));
let ws_conns = Arc::new(crate::ws::ConnRegistry::new());
let custom_webhooks: Arc<
std::sync::RwLock<
std::collections::HashMap<String, Arc<rsclaw_channel::custom::CustomWebhookChannel>>,
>,
> = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
start_custom_channels(
&config,
Arc::clone(®istry),
&mut channel_manager,
Arc::clone(&custom_webhooks),
Arc::clone(&channel_senders),
Arc::clone(&store.db),
shutdown.clone(),
);
{
let desktop_ch = Arc::new(crate::gateway::desktop_channel::DesktopChannel::new(Arc::clone(
&ws_conns,
)));
let (desktop_out_tx, mut desktop_out_rx) = mpsc::channel::<OutboundMessage>(64);
{
let mut senders = channel_senders
.write()
.expect("channel_senders lock poisoned");
senders.insert("desktop".to_string(), desktop_out_tx.clone());
senders.insert("ws".to_string(), desktop_out_tx);
}
let desktop_for_bridge = Arc::clone(&desktop_ch);
tokio::spawn(async move {
use rsclaw_channel::Channel;
while let Some(msg) = desktop_out_rx.recv().await {
if let Err(e) = desktop_for_bridge.send(msg).await {
warn!(error = %e, "desktop notification bridge: send failed");
}
}
});
if let Err(e) = channel_manager.register(desktop_ch as Arc<dyn rsclaw_channel::Channel>) {
warn!("failed to register desktop channel: {e}");
}
}
let channel_manager = Arc::new(channel_manager);
crate::gateway::watch::WatchRegistry::init(Arc::clone(&channel_manager));
tracing::info!("watch registry initialized");
let (cron_reload_tx, _cron_reload_rx) = tokio::sync::broadcast::channel::<()>(16);
crate::cron::install_reload_sender(cron_reload_tx.clone());
{
let cron_cfg =
config
.ops
.cron
.clone()
.unwrap_or_else(|| rsclaw_config::schema::CronConfig {
enabled: Some(true),
max_concurrent_runs: None,
session_retention: None,
run_log: None,
jobs: None,
default_delivery: None,
});
let cron_enabled = cron_cfg.enabled.unwrap_or(true);
let cron_file = crate::cron::resolve_cron_store_path();
let (jobs, parse_ok) = crate::cron::load_cron_jobs();
if !parse_ok {
error!(file = %cron_file.display(), "cron.json5 has syntax errors - jobs will NOT run until file is fixed");
} else if !jobs.is_empty() {
info!(file = %cron_file.display(), count = jobs.len(), "loaded cron jobs");
}
if cron_enabled {
let cron_data_dir = base_dir.join("var").join("data");
let runner = CronRunner::new_with_shutdown(
&cron_cfg,
jobs,
!parse_ok, Arc::clone(®istry),
Arc::clone(&channel_manager),
cron_data_dir,
cron_reload_tx.clone(),
Arc::clone(&ws_conns),
Some(shutdown.clone()),
)
.with_daemon_agent_ids(config.agents.daemon_agent_ids());
tokio::spawn(async move {
if let Err(e) = runner.run().await {
error!("cron runner error: {e:#}");
}
});
info!("cron runner started");
}
}
let a2a_bus = crate::a2a::event::TaskEventBus::new();
let a2a_task_store = {
let path = rsclaw_config::loader::base_dir().join("var/data/a2a/tasks.redb");
Arc::new(crate::a2a::store::TaskStore::open(&path).expect("open A2A task store"))
};
let a2a_push_dispatcher = Arc::new(crate::a2a::push::PushDispatcher::new(
Arc::clone(&a2a_task_store),
a2a_bus.clone(),
));
let a2a_relay_hub = Arc::new(crate::a2a::relay::RelayHub::new());
if config.gateway.a2a_relay.mode == rsclaw_config::runtime::A2aRelayModeRuntime::Hub
&& config.gateway.auth_token.is_none()
&& config.gateway.a2a_principals.is_empty()
{
warn!(
relay_id = %config.gateway.a2a_relay.relay_id,
"a2a relay is running in Hub mode without inbound auth — any HTTP client \
can invoke any connected spoke agent. Set `gateway.auth.token` or define \
`gateway.a2a.clients` before exposing this gateway to a network."
);
}
let knowledge_svc = match rsclaw_kb::KnowledgeService::open(base_dir.join("kb")) {
Ok(svc) => {
let svc = Arc::new(svc);
svc.spawn_worker();
rsclaw_kb::set_global_service(Arc::clone(&svc));
Some(svc)
}
Err(e) => {
tracing::error!("knowledge base disabled: failed to open store: {e:#}");
None
}
};
let state = AppState {
config: Arc::clone(&config),
live: Arc::clone(&live),
agents: Arc::clone(®istry),
store: Arc::clone(&store),
event_bus: event_tx,
computer_permission: Arc::clone(&computer_permission),
computer_permission_tx: computer_permission_tx.clone(),
computer_status_tx: computer_status_tx.clone(),
computer_runs: Arc::clone(&computer_runs),
devices,
ws_conns,
feishu: Arc::clone(&feishu_slot),
wecom: Arc::clone(&wecom_slot),
whatsapp: Arc::clone(&whatsapp_slot),
line: Arc::clone(&line_slot),
zalo: Arc::clone(&zalo_slot),
started_at: std::time::Instant::now(),
dm_enforcers: Arc::clone(&dm_enforcers),
custom_webhooks: Arc::clone(&custom_webhooks),
cron_reload: cron_reload_tx,
notification_tx: notification_tx.clone(),
wasm_plugins: Arc::clone(&wasm_plugins),
plugins: Arc::clone(&plugins),
restart_request_tx: restart_request_tx.clone(),
pending_restart: Arc::clone(&pending_restart),
shutdown: shutdown.clone(),
task_event_bus: a2a_bus,
task_cancels: Arc::new(dashmap::DashMap::new()),
suspended_tasks: Arc::new(dashmap::DashMap::new()),
task_store: a2a_task_store,
push_dispatcher: a2a_push_dispatcher,
relay_hub: a2a_relay_hub,
knowledge: knowledge_svc,
memory: memory.clone(),
model_health: model_health.clone(),
};
crate::a2a::relay::start_spoke_if_configured(state.clone());
crate::ws::tick::start_tick_loop(Arc::clone(&state.ws_conns));
{
let relay_hub = Arc::clone(&state.relay_hub);
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
let swept = relay_hub.sweep_expired_streams();
if swept > 0 {
tracing::info!(swept, "relay stream deadline sweeper");
}
}
});
}
tokio::spawn(async {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
rsclaw_browser::pool::BrowserPool::global()
.reap_if_idle()
.await;
}
});
let bind_addr = resolve_bind_addr(&config);
info!("starting HTTP server on {bind_addr}");
for w in rsclaw_desktop::macos_perm::preflight_warnings() {
warn!("macOS permission preflight: {w}");
}
tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let client = reqwest::Client::builder()
.user_agent("rsclaw/dev")
.timeout(std::time::Duration::from_secs(10))
.build();
let Ok(client) = client else { return };
let sources = [
crate::cmd::update::RSCLAW_VERSION_URL,
"https://api.github.com/repos/rsclaw-ai/rsclaw/releases?per_page=10",
];
let mut release: Option<serde_json::Value> = None;
for url in sources {
if let Ok(resp) = client.get(url).send().await {
if resp.status().is_success() {
let body = resp.bytes().await.unwrap_or_default();
if let Some(found) = crate::cmd::update::parse_release_body(&body) {
release = Some(found);
break;
}
}
}
}
if let Some(release) = release {
{
let latest_raw = release["tag_name"].as_str().unwrap_or("");
let current_raw = option_env!("RSCLAW_BUILD_VERSION").unwrap_or("dev");
fn strip_ver(s: &str) -> &str {
let s = s.trim_start_matches('v');
let s = s.split_once(' ').map(|(v, _)| v).unwrap_or(s);
s.split_once('-').map(|(v, _)| v).unwrap_or(s)
}
fn ver_newer(latest: &str, current: &str) -> bool {
let parse = |s: &str| -> Vec<u32> {
s.split('.').filter_map(|p| p.parse().ok()).collect()
};
let l = parse(latest);
let c = parse(current);
for i in 0..l.len().max(c.len()) {
let lv = l.get(i).copied().unwrap_or(0);
let cv = c.get(i).copied().unwrap_or(0);
if lv > cv {
return true;
}
if lv < cv {
return false;
}
}
false
}
let latest = strip_ver(latest_raw);
let current = strip_ver(current_raw);
if !latest.is_empty() && ver_newer(latest, current) {
info!(
current = current_raw,
latest = latest_raw,
"new rsclaw version available -- run `rsclaw update` to upgrade"
);
}
}
}
});
tokio::spawn(async {
const DEFAULTS_URL: &str = "https://app.rsclaw.ai/defaults.toml";
const THROTTLE: std::time::Duration = std::time::Duration::from_secs(2 * 60 * 60);
tokio::time::sleep(std::time::Duration::from_secs(8)).await;
let stamp = rsclaw_config::loader::base_dir().join(".defaults_fetch");
let now = std::time::SystemTime::now();
if let Ok(meta) = std::fs::metadata(&stamp) {
if let Ok(modified) = meta.modified() {
if now.duration_since(modified).unwrap_or(THROTTLE) < THROTTLE {
debug!("remote defaults: throttled (checked < 2h ago)");
return;
}
}
}
let Ok(client) = reqwest::Client::builder()
.user_agent("rsclaw/dev")
.timeout(std::time::Duration::from_secs(10))
.build()
else {
return;
};
match client.get(DEFAULTS_URL).send().await {
Ok(resp) if resp.status().is_success() => {
if let Err(e) = std::fs::write(&stamp, b"") {
tracing::debug!(error = %e, "failed to write defaults fetch throttle stamp");
}
const MAX_DEFAULTS_BYTES: usize = 1_048_576;
if let Some(len) = resp.content_length() {
if len > MAX_DEFAULTS_BYTES as u64 {
tracing::warn!(size = len, "remote defaults.toml too large, skipping");
return;
}
}
use futures::StreamExt;
let mut stream = resp.bytes_stream();
let mut buf: Vec<u8> = Vec::new();
let mut overflow = false;
while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
if buf.len() + bytes.len() > MAX_DEFAULTS_BYTES {
overflow = true;
break;
}
buf.extend_from_slice(&bytes);
}
Err(e) => {
debug!(error = %e, "remote defaults.toml read failed; keeping local");
return;
}
}
}
if overflow {
tracing::warn!(
cap = MAX_DEFAULTS_BYTES,
"remote defaults.toml exceeded size cap mid-stream, skipping"
);
return;
}
match String::from_utf8(buf) {
Ok(body) => match rsclaw_config::loader::merge_remote_defaults(&body) {
Ok(true) => info!(
"remote defaults.toml updated; takes effect on next restart"
),
Ok(false) => debug!("remote defaults.toml not newer than local"),
Err(e) => {
debug!(error = %e, "remote defaults.toml invalid; keeping local")
}
},
Err(e) => debug!(error = %e, "remote defaults.toml not valid UTF-8; keeping local"),
}
}
Ok(resp) => debug!(status = %resp.status(), "remote defaults.toml fetch non-200"),
Err(e) => debug!(error = %e, "remote defaults.toml fetch failed; keeping local"),
}
});
{
let sd = shutdown.clone();
tokio::spawn(async move {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sigterm = match signal(SignalKind::terminate()) {
Ok(s) => s,
Err(e) => {
warn!("failed to install SIGTERM handler: {e:#}");
return;
}
};
tokio::select! {
res = tokio::signal::ctrl_c() => {
if let Err(e) = res {
warn!("ctrl_c handler error: {e:#}");
return;
}
info!("SIGINT received, beginning graceful shutdown");
}
_ = sigterm.recv() => {
info!("SIGTERM received, beginning graceful shutdown");
}
}
}
#[cfg(not(unix))]
{
if let Err(e) = tokio::signal::ctrl_c().await {
warn!("ctrl_c handler error: {e:#}");
return;
}
info!("Ctrl-C received, beginning graceful shutdown");
}
if let Some(reg) = crate::gateway::watch::WatchRegistry::global() {
reg.shutdown_all().await;
}
sd.begin_drain();
});
}
const SHUTDOWN_HARD_TIMEOUT_SECS: u64 = 60;
let shutdown_for_timeout = shutdown.clone();
let result = tokio::select! {
r = serve(state, bind_addr) => r,
_ = async {
shutdown_for_timeout.notified().await;
tokio::time::sleep(Duration::from_secs(SHUTDOWN_HARD_TIMEOUT_SECS)).await;
warn!(
timeout_secs = SHUTDOWN_HARD_TIMEOUT_SECS,
"graceful shutdown hard timeout reached — connections still open, forcing serve to return"
);
} => Ok(()),
};
if shutdown.is_restart_requested() {
const UNDELIVERABLE_ORIGINS: &[&str] = &[
"wechat", "feishu", "telegram", "discord", "qq", "dingtalk",
"wecom", "slack", "whatsapp", "line", "matrix", "signal", "cron",
];
let mut cancelled = 0usize;
for handle in registry.all() {
if let Ok(map) = handle.cancel_tokens.read() {
for (sk, tok) in map.iter() {
let channel = sk.split(':').nth(2).unwrap_or("");
if UNDELIVERABLE_ORIGINS.contains(&channel) {
tok.cancel();
cancelled += 1;
}
}
}
}
if cancelled > 0 {
info!(
cancelled,
"drain: cancelled channel-origin turns whose delivery path already stopped"
);
}
info!("restart requested - waiting for inflight drain (max 60s)");
let deadline = std::time::Instant::now() + Duration::from_secs(60);
loop {
let n = shutdown.inflight();
if n == 0 {
info!("graceful drain: inflight cleared");
break;
}
if std::time::Instant::now() >= deadline {
let stuck: Vec<String> = registry
.all()
.iter()
.filter_map(|h| h.cancel_tokens.read().ok().map(|m| {
m.keys().cloned().collect::<Vec<_>>()
}))
.flatten()
.collect();
warn!(
inflight = n,
stuck = ?stuck,
"graceful drain: 60s timeout reached, restarting anyway"
);
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
if try_service_self_restart() {
info!("service-manager restart dispatched; exiting for supervisor takeover");
std::process::exit(0);
}
let exe = match std::env::current_exe() {
Ok(p) => p,
Err(e) => {
error!("current_exe failed; cannot respawn replacement: {e:#}");
return result;
}
};
let mut cmd = std::process::Command::new(&exe);
let original_args: Vec<String> = std::env::args().collect();
let mut extra_args: Vec<String> = Vec::new();
let mut i = 1; while i < original_args.len() {
match original_args[i].as_str() {
"--dev" => {
extra_args.push("--dev".to_owned());
}
"--profile" => {
extra_args.push("--profile".to_owned());
if let Some(val) = original_args.get(i + 1) {
extra_args.push(val.clone());
i += 1;
}
}
"--base-dir" => {
extra_args.push("--base-dir".to_owned());
if let Some(val) = original_args.get(i + 1) {
extra_args.push(val.clone());
i += 1;
}
}
s if s.starts_with("--profile=") => {
extra_args.push(s.to_owned());
}
s if s.starts_with("--base-dir=") => {
extra_args.push(s.to_owned());
}
_ => {}
}
i += 1;
}
extra_args.extend(["gateway".to_owned(), "run".to_owned()]);
cmd.args(&extra_args);
cmd.env("RSCLAW_PARENT_PID", std::process::id().to_string());
{
let log_path = rsclaw_config::loader::log_file();
if let Some(parent) = log_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
if let Ok(f) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
&& let Ok(f2) = f.try_clone()
{
cmd.stdout(std::process::Stdio::from(f));
cmd.stderr(std::process::Stdio::from(f2));
}
}
#[cfg(target_os = "windows")]
{
use std::os::windows::process::CommandExt;
const CREATE_NO_WINDOW: u32 = 0x0800_0000;
cmd.creation_flags(CREATE_NO_WINDOW);
}
#[cfg(unix)]
{
use std::os::unix::process::CommandExt;
let err = cmd.exec();
error!(
"exec for replacement gateway failed: {err:#}; \
falling back to spawn+exit (may hit lock race)"
);
}
match cmd.spawn() {
Ok(_) => info!("replacement gateway spawned"),
Err(e) => error!("failed to spawn replacement gateway: {e:#}"),
}
std::process::exit(0);
}
if let Err(e) = std::fs::remove_file(&pid_file) {
warn!("could not remove PID file on exit: {e}");
}
result
}
/// Blocks (for at most 5 s) until the process named by `RSCLAW_PARENT_PID`
/// has exited.
///
/// Returns immediately when the variable is unset, is not a valid `i32`, or
/// names this very process. Otherwise the parent is polled every 50 ms. If it
/// is still alive once the 5 s budget is spent, a warning is logged and the
/// variable is removed from this process's environment.
fn wait_for_parent_release() {
    let announced = std::env::var("RSCLAW_PARENT_PID")
        .ok()
        .and_then(|raw| raw.trim().parse::<i32>().ok());
    let parent = match announced {
        Some(candidate) if candidate != std::process::id() as i32 => candidate,
        _ => return,
    };

    let budget = std::time::Duration::from_secs(5);
    let poll_every = std::time::Duration::from_millis(50);
    let began = std::time::Instant::now();
    // Set after the first poll that found the parent alive, so the "exited"
    // message is only logged when we actually had to wait.
    let mut had_to_wait = false;

    loop {
        if began.elapsed() >= budget {
            break;
        }
        if !parent_alive(parent) {
            if had_to_wait {
                info!(
                    parent_pid = parent,
                    elapsed_ms = began.elapsed().as_millis() as u64,
                    "parent gateway exited; lock released"
                );
            }
            return;
        }
        had_to_wait = true;
        std::thread::sleep(poll_every);
    }

    warn!(
        parent_pid = parent,
        "parent gateway still alive after 5s; opening anyway (may hit lock race)"
    );
    // SAFETY: NOTE(review): `remove_var` is only sound while no other thread
    // accesses the environment concurrently — confirm for the call site.
    unsafe {
        std::env::remove_var("RSCLAW_PARENT_PID");
    }
}
/// Reports whether the process with the given PID still appears to exist.
///
/// * Non-positive PIDs are never considered alive: on Unix `kill(0, …)` and
///   `kill(-n, …)` address process groups (or every process) rather than a
///   single process, so probing them would report a bogus "alive".
/// * Unix: probes with `kill(pid, 0)`; any failure other than `ESRCH` is
///   treated as "alive".
/// * Elsewhere (Windows): runs `tasklist` filtered on the PID and looks for
///   the PID in its CSV output; failure to run `tasklist` counts as "not alive".
fn parent_alive(pid: i32) -> bool {
    if pid <= 0 {
        return false;
    }
    #[cfg(unix)]
    {
        // Signal 0 performs the existence/permission check without delivering a signal.
        let rc = unsafe { libc::kill(pid, 0) };
        if rc == 0 {
            return true;
        }
        std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
    }
    #[cfg(not(unix))]
    {
        use std::os::windows::process::CommandExt;
        match std::process::Command::new("tasklist")
            .args([
                "/FI",
                &format!("PID eq {pid}"),
                "/NH",
                "/FO",
                "CSV",
            ])
            // 0x08000000 is the same value as CREATE_NO_WINDOW used elsewhere in this file.
            .creation_flags(0x08000000)
            .output()
        {
            Ok(out) => {
                let s = String::from_utf8_lossy(&out.stdout);
                s.lines().any(|l| l.contains(&pid.to_string()))
            }
            Err(_) => false,
        }
    }
}
/// Tries to have the platform's service supervisor restart this process.
///
/// Returns `true` when a restart/stop command was dispatched successfully and
/// `false` when no supervisor was detected or the command failed.
///
/// * macOS: requires a non-empty `XPC_SERVICE_NAME` other than `"0"`, then
///   tries `launchctl kickstart -k` for `gui/<uid>/<label>` and `system/<label>`.
/// * Linux: requires `INVOCATION_ID`, takes the first `*.service` path tail
///   from `/proc/self/cgroup`, then tries `systemctl --user restart` followed
///   by `systemctl restart`.
/// * Windows: requires `SESSIONNAME` to equal `Services` (ASCII
///   case-insensitive), then runs `sc stop rsclaw`.
/// * Any other target: always `false`.
fn try_service_self_restart() -> bool {
    #[cfg(target_os = "macos")]
    {
        let Some(label) = std::env::var("XPC_SERVICE_NAME")
            .ok()
            .filter(|v| !v.is_empty() && v != "0")
        else {
            return false;
        };
        info!(label = %label, "detected launchd supervision; kickstarting via launchctl");
        let uid = unsafe { libc::getuid() };
        let domains = [format!("gui/{uid}/{label}"), format!("system/{label}")];
        for domain in domains {
            let dispatched = std::process::Command::new("launchctl")
                .args(["kickstart", "-k", domain.as_str()])
                .status()
                .is_ok_and(|s| s.success());
            if dispatched {
                info!(target = %domain, "launchctl kickstart dispatched");
                return true;
            }
        }
        info!("launchctl kickstart unavailable for label {label}; using native respawn");
        false
    }
    #[cfg(target_os = "linux")]
    {
        if std::env::var("INVOCATION_ID").is_err() {
            return false;
        }
        // An unreadable cgroup file behaves like one with no matching line.
        let cgroup = std::fs::read_to_string("/proc/self/cgroup").unwrap_or_default();
        let found = cgroup
            .lines()
            .filter_map(|line| line.rsplit('/').next())
            .find(|tail| tail.ends_with(".service"))
            .map(str::to_owned);
        let unit = match found {
            Some(u) => u,
            None => {
                warn!("INVOCATION_ID set but unit name not found in /proc/self/cgroup; falling back");
                return false;
            }
        };
        info!(unit = %unit, "detected systemd supervision; restarting via systemctl");
        // First the per-user manager, then the system manager.
        for scope_flag in [Some("--user"), None] {
            let mut systemctl = std::process::Command::new("systemctl");
            if let Some(flag) = scope_flag {
                systemctl.arg(flag);
            }
            systemctl.args(["restart", unit.as_str()]);
            if systemctl.status().is_ok_and(|s| s.success()) {
                info!(unit = %unit, "systemctl restart dispatched");
                return true;
            }
        }
        warn!("systemctl restart failed for unit {unit}; falling back to native respawn");
        false
    }
    #[cfg(target_os = "windows")]
    {
        use std::os::windows::process::CommandExt;
        const CREATE_NO_WINDOW: u32 = 0x0800_0000;

        let in_services_session = std::env::var("SESSIONNAME")
            .is_ok_and(|s| s.eq_ignore_ascii_case("Services"));
        if !in_services_session {
            return false;
        }
        info!("detected Windows service supervision; signaling SCM via sc stop");
        let stopped = std::process::Command::new("sc")
            .args(["stop", "rsclaw"])
            .creation_flags(CREATE_NO_WINDOW)
            .status()
            .is_ok_and(|s| s.success());
        if stopped {
            info!("sc stop dispatched; SCM FailureActions will respawn if configured");
            return true;
        }
        warn!("sc stop failed; falling back to native respawn");
        false
    }
    #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
    {
        false
    }
}
/// Returns the `(api key, base URL)` environment-variable names used for a
/// known skill registry, or `None` when the registry name is not recognised.
///
/// The lookup is case-sensitive; only `"iwencai"` is currently mapped.
fn registry_env_names(name: &str) -> Option<(&'static str, &'static str)> {
    match name {
        "iwencai" => Some(("IWENCAI_API_KEY", "IWENCAI_BASE_URL")),
        _ => None,
    }
}
/// Exports the `env` section of the loaded configuration (when present) into
/// the process environment by delegating to `apply_user_env_map`.
fn propagate_user_env(config: &RuntimeConfig) {
    if let Some(user_env) = config.raw.env.as_ref() {
        apply_user_env_map(&user_env.0);
    }
}
fn apply_user_env_map(env_map: &std::collections::HashMap<String, String>) {
for (key, raw_val) in env_map {
if key.is_empty() {
continue;
}
if std::env::var(key).is_ok() {
continue;
}
let expanded = rsclaw_config::loader::expand_env_vars(raw_val);
unsafe { std::env::set_var(key, &expanded) };
info!(key = %key, "exported user env var from rsclaw.json5");
}
}
fn propagate_skill_registry_env(config: &RuntimeConfig) {
let Some(map) = config.raw.skill_registries.as_ref() else {
return;
};
for (name, entry) in map {
let Some((api_key_var, base_url_var)) = registry_env_names(name) else {
continue;
};
if let Some(key_field) = entry.api_key.as_ref() {
if let Some(val) = key_field.resolve_early().filter(|s| !s.is_empty()) {
if std::env::var(api_key_var).is_err() {
unsafe { std::env::set_var(api_key_var, &val) };
info!(registry = %name, env = api_key_var, "exported registry api key to env");
}
}
}
if let Some(url_field) = entry.base_url.as_ref() {
if let Some(val) = url_field.resolve_early().filter(|s| !s.is_empty()) {
if std::env::var(base_url_var).is_err() {
unsafe { std::env::set_var(base_url_var, &val) };
info!(registry = %name, env = base_url_var, "exported registry base url to env");
}
}
}
}
}
fn spawn_agent_tasks(
receivers: HashMap<String, mpsc::Receiver<AgentMessage>>,
registry: Arc<AgentRegistry>,
config: Arc<RuntimeConfig>,
live: Arc<LiveConfig>,
store: Arc<Store>,
skills: Arc<SkillRegistry>,
providers: Arc<ProviderRegistry>,
memory: Option<Arc<tokio::sync::Mutex<MemoryStore>>>,
event_tx: broadcast::Sender<rsclaw_events::AgentEvent>,
spawner: Option<Arc<AgentSpawner>>,
plugins: Option<Arc<rsclaw_plugin::PluginRegistry>>,
mcp: Option<Arc<rsclaw_mcp::McpRegistry>>,
notification_tx: Option<broadcast::Sender<rsclaw_channel::OutboundMessage>>,
wasm_plugins: Arc<Vec<rsclaw_plugin::WasmPlugin>>,
computer_permission: Arc<rsclaw_computer::permission::RedbPermissionStore>,
computer_permission_tx: broadcast::Sender<rsclaw_computer::permission::PermissionRequest>,
computer_status_tx: broadcast::Sender<rsclaw_computer::status::ComputerUseStatus>,
computer_runs: Arc<
tokio::sync::RwLock<std::collections::HashMap<String, Arc<std::sync::atomic::AtomicBool>>>,
>,
model_health: rsclaw_provider::health::ProviderHealthRegistry,
cap_manager: std::sync::Arc<rsclaw_cap::CapAgentManager>,
cap_live_manager: std::sync::Arc<rsclaw_cap::CapLiveManager>,
) {
for (agent_id, mut rx) in receivers {
let handle = match registry.get(&agent_id) {
Ok(h) => h,
Err(e) => {
error!(agent_id, "agent handle not found: {e:#}");
continue;
}
};
let fallback_models = handle
.config
.model
.as_ref()
.and_then(|m| m.fallbacks.clone())
.or_else(|| {
config
.agents
.defaults
.model
.as_ref()
.and_then(|m| m.fallbacks.clone())
})
.unwrap_or_default();
let mut runtime = AgentRuntime::new(
Arc::clone(&handle),
Arc::clone(&config),
Arc::clone(&live),
Arc::clone(&providers),
fallback_models,
Arc::clone(&skills),
Arc::clone(&store),
memory.clone(),
Some(Arc::clone(®istry)),
Some(event_tx.clone()),
spawner.clone(),
plugins.clone(),
mcp.clone(),
notification_tx.clone(),
model_health.clone(),
Some(Arc::clone(&cap_manager)),
Some(Arc::clone(&cap_live_manager)),
);
runtime.wasm_plugins = Arc::clone(&wasm_plugins);
runtime.computer_permission = Some(Arc::clone(&computer_permission));
runtime.computer_permission_tx = Some(computer_permission_tx.clone());
runtime.computer_status_tx = Some(computer_status_tx.clone());
runtime.computer_runs = Some(Arc::clone(&computer_runs));
let event_tx_task = event_tx.clone();
let config_for_task = Arc::clone(&config);
tokio::spawn(async move {
info!(agent_id = %handle.id, "agent runtime task started");
while let Some(msg) = rx.recv().await {
info!(
agent_id = %handle.id,
session_key = %msg.session_key,
channel = %msg.channel,
"agent runtime: received msg from queue"
);
let AgentMessage {
session_key,
text,
channel,
peer_id,
chat_id,
reply_tx,
extra_tools,
images,
files,
account,
task_id,
context_id,
cancel_token,
event_tx,
input_request_tx,
} = msg;
let (turn_token, registered) = match cancel_token {
Some(t) => (t, false),
None => {
let t = tokio_util::sync::CancellationToken::new();
if let Ok(mut toks) = handle.cancel_tokens.write() {
toks.insert(session_key.clone(), t.clone());
}
(t, true)
}
};
let is_daemon = runtime.is_daemon_agent(&handle.id);
let progress = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let turn_ctx = rsclaw_agent::registry::TurnContext {
task_id,
context_id,
event_tx,
cancel_token: Some(turn_token.clone()),
input_request_tx,
progress: Some(progress.clone()),
};
let watchdog_token = turn_token.clone();
let watchdog_agent = handle.id.clone();
let watchdog_session = session_key.clone();
let watchdog = tokio::spawn(async move {
use std::sync::atomic::Ordering;
if is_daemon {
const POLL: std::time::Duration = std::time::Duration::from_secs(30);
const STALL_LIMIT: std::time::Duration =
std::time::Duration::from_secs(180);
let mut last = progress.load(Ordering::Relaxed);
let mut stalled = std::time::Duration::ZERO;
loop {
tokio::time::sleep(POLL).await;
if watchdog_token.is_cancelled() {
break;
}
let cur = progress.load(Ordering::Relaxed);
if cur != last {
last = cur;
stalled = std::time::Duration::ZERO;
continue;
}
stalled += POLL;
if stalled >= STALL_LIMIT {
tracing::error!(
agent = %watchdog_agent,
session = %watchdog_session,
stall_s = stalled.as_secs(),
"stuck-turn watchdog (daemon): no loop progress — a tool is \
wedged; firing cancel_token (cron will restart)"
);
watchdog_token.cancel();
break;
}
}
} else {
const TURN_WALL_CLOCK_LIMIT: std::time::Duration =
std::time::Duration::from_secs(20 * 60);
tokio::time::sleep(TURN_WALL_CLOCK_LIMIT).await;
if !watchdog_token.is_cancelled() {
tracing::error!(
agent = %watchdog_agent,
session = %watchdog_session,
limit_s = TURN_WALL_CLOCK_LIMIT.as_secs(),
"stuck-turn watchdog: firing cancel_token — turn exceeded \
wall-clock limit; the agent queue must not stay dark"
);
watchdog_token.cancel();
}
}
});
let result = tokio::select! {
biased;
_ = turn_token.cancelled() => Err(anyhow::anyhow!("turn aborted")),
r = runtime.run_turn(
&session_key,
&text,
&channel,
&peer_id,
&chat_id,
account.as_deref(),
extra_tools,
images,
files,
turn_ctx,
) => r,
};
watchdog.abort();
if registered {
if let Ok(mut toks) = handle.cancel_tokens.write() {
toks.remove(&session_key);
}
}
let turn_errored = result.is_err();
let reply = result.unwrap_or_else(|e| {
let err_str = e.to_string();
let is_user_cancel = err_str.contains("canceled by A2A CancelTask")
|| err_str.contains("turn aborted");
if is_user_cancel {
info!(agent = %handle.id, "turn canceled by user: {e:#}");
} else {
error!(agent = %handle.id, "turn error: {e:#}");
}
let outcome = if is_user_cancel {
rsclaw_agent::registry::ReplyOutcome::Canceled
} else {
rsclaw_agent::registry::ReplyOutcome::Error
};
let i18n_lang = config_for_task
.raw
.gateway
.as_ref()
.and_then(|g| g.language.as_deref())
.map(rsclaw_i18n::resolve_lang)
.unwrap_or("en");
let user_text = match outcome {
rsclaw_agent::registry::ReplyOutcome::Canceled => "[canceled]".to_owned(),
_ => rsclaw_i18n::t("backend_unavailable", i18n_lang),
};
AgentReply {
text: user_text,
is_empty: false,
tool_calls: None,
images: vec![],
files: vec![],
pending_analysis: None,
needs_outer_done_emit: false,
outcome,
}
});
if reply.needs_outer_done_emit || turn_errored {
if !reply.text.is_empty() {
let _ = event_tx_task.send(rsclaw_events::AgentEvent {
session_id: session_key.clone(),
agent_id: handle.id.clone(),
delta: reply.text.clone(),
done: false,
files: vec![],
images: vec![],
tool_log: vec![],
question: None,
channel: None,
});
}
let _ = event_tx_task.send(rsclaw_events::AgentEvent {
session_id: session_key.clone(),
agent_id: handle.id.clone(),
delta: String::new(),
done: true,
files: vec![],
images: vec![],
tool_log: vec![],
question: None,
channel: None,
});
}
let mut reply = reply;
if !turn_errored
&& let Some(reaction) =
rsclaw_agent::goal::check_after_turn(&session_key, &reply.text).await
{
use rsclaw_agent::goal::Reaction;
match reaction {
Reaction::Done(status_line) => {
reply.text = strip_trailing_goal_marker(&reply.text);
if !reply.text.is_empty() {
reply.text.push_str("\n\n");
}
reply.text.push_str(&status_line);
reply.is_empty = reply.text.is_empty()
&& reply.images.is_empty()
&& reply.files.is_empty();
}
Reaction::Continue(next_prompt) => {
if let Some(tq) =
crate::gateway::task_queue::get_task_queue()
{
let delivery_channel: &str =
if channel == "ws" { "desktop" } else { &channel };
if let Err(e) =
crate::gateway::task_queue::submit_to_queue(
&tq,
&session_key,
&next_prompt,
delivery_channel,
&peer_id,
&peer_id,
false,
crate::gateway::task_queue::Priority::Cron,
)
{
warn!(
session = %session_key,
error = %e,
"/goal: failed to enqueue continuation turn"
);
}
} else {
warn!(
"/goal: task_queue not installed; cannot enqueue continuation"
);
}
}
}
}
let _ = reply_tx.send(reply);
}
info!(agent_id = %handle.id, "agent runtime task ended (channel closed)");
});
}
}
/// Removes a trailing goal marker line from `text`.
///
/// Trailing blank lines are always dropped. If the last remaining line, once
/// trimmed, is exactly `GOAL_ACHIEVED` or starts with `GOAL_FAILED`, that line
/// and any blank lines directly above it are dropped as well. The surviving
/// lines are re-joined with `\n`, so the result carries no trailing newline
/// and `\r\n` line endings become `\n`.
fn strip_trailing_goal_marker(text: &str) -> String {
    fn drop_blank_tail(lines: &mut Vec<&str>) {
        while lines.last().is_some_and(|l| l.trim().is_empty()) {
            lines.pop();
        }
    }

    let mut kept: Vec<&str> = text.lines().collect();
    drop_blank_tail(&mut kept);

    let ends_with_marker = kept
        .last()
        .map(|l| l.trim())
        .is_some_and(|t| t == "GOAL_ACHIEVED" || t.starts_with("GOAL_FAILED"));
    if ends_with_marker {
        kept.pop();
        drop_blank_tail(&mut kept);
    }

    kept.join("\n")
}
fn resolve_bind_addr(config: &RuntimeConfig) -> SocketAddr {
let port = config.gateway.port;
if let Some(ref addr) = config.gateway.bind_address {
if let Ok(ip) = addr.parse::<std::net::IpAddr>() {
return SocketAddr::new(ip, port);
}
tracing::warn!(
addr = addr.as_str(),
"invalid bind_address, falling back to bind mode"
);
}
match config.gateway.bind {
BindMode::Auto | BindMode::Lan => SocketAddr::from(([0, 0, 0, 0], port)),
BindMode::Loopback => SocketAddr::from(([127, 0, 0, 1], port)),
BindMode::All => SocketAddr::from(([0, 0, 0, 0], port)),
BindMode::Custom => SocketAddr::from(([0, 0, 0, 0], port)),
BindMode::Tailnet => SocketAddr::from(([127, 0, 0, 1], port)),
}
}
async fn spawn_mcp_servers(config: &RuntimeConfig, registry: Arc<rsclaw_mcp::McpRegistry>) {
let mcp = match config.raw.mcp.as_ref() {
Some(m) => m,
None => return,
};
if mcp.enabled == Some(false) {
return;
}
let servers = match mcp.servers.as_ref() {
Some(s) => s,
None => return,
};
for server_cfg in servers {
match rsclaw_mcp::McpClient::spawn(server_cfg).await {
Ok(mut client) => {
if let Err(e) = client.initialize().await {
error!(name = %server_cfg.name, error = %e, "MCP initialize failed");
continue;
}
match client.list_tools().await {
Ok(tools) => {
info!(
name = %server_cfg.name,
tools = tools.len(),
"MCP server ready"
);
}
Err(e) => {
warn!(name = %server_cfg.name, error = %e, "MCP tools/list failed");
}
}
registry.register(Arc::new(client)).await;
}
Err(e) => {
error!(name = %server_cfg.name, error = %e, "failed to start MCP server");
}
}
}
let total = registry.clients.lock().await.len();
if total > 0 {
info!(count = total, "MCP server(s) registered");
}
}
/// Runs a deferred analysis turn on the agent behind `handle` and forwards the
/// outcome to `out_tx`.
///
/// The analysis is queued as an [`AgentMessage`] on `handle.tx`, and the reply
/// is awaited for up to 600 seconds:
/// * a reply with text, images or files is forwarded as-is;
/// * a reply with none of those produces no outbound message;
/// * a failed enqueue or a dropped reply channel sends the localized
///   `analysis_failed` text;
/// * a timeout sends the localized `analysis_timeout` text.
///
/// The language comes from `gateway.language` in the configuration and falls
/// back to `"en"`. Errors from sending on `out_tx` are ignored.
pub(crate) async fn handle_pending_analysis(
    analysis: PendingAnalysis,
    handle: Arc<rsclaw_agent::AgentHandle>,
    out_tx: &mpsc::Sender<rsclaw_channel::OutboundMessage>,
    target_id: String,
    is_group: bool,
    config: &RuntimeConfig,
) {
    let i18n_lang = config
        .raw
        .gateway
        .as_ref()
        .and_then(|g| g.language.as_deref())
        .map(rsclaw_i18n::resolve_lang)
        .unwrap_or("en");

    // Builds a text-only notice for the given i18n key.
    let notice = |target_id: String, key: &'static str| rsclaw_channel::OutboundMessage {
        target_id,
        is_group,
        text: rsclaw_i18n::t(key, i18n_lang),
        reply_to: None,
        images: vec![],
        channel: None,
        account: None,
        files: vec![],
    };

    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let request = AgentMessage {
        session_key: analysis.session_key,
        text: analysis.text,
        channel: analysis.channel,
        peer_id: analysis.peer_id.clone(),
        chat_id: String::new(),
        reply_tx,
        task_id: None,
        context_id: None,
        event_tx: None,
        cancel_token: None,
        input_request_tx: None,
        extra_tools: vec![],
        images: vec![],
        files: vec![],
        account: None,
    };
    if handle.tx.send(request).await.is_err() {
        let _ = out_tx.send(notice(target_id, "analysis_failed")).await;
        return;
    }

    let outbound = match tokio::time::timeout(Duration::from_secs(600), reply_rx).await {
        Ok(Ok(r)) if !r.text.is_empty() || !r.images.is_empty() || !r.files.is_empty() => {
            rsclaw_channel::OutboundMessage {
                target_id,
                is_group,
                text: r.text,
                reply_to: None,
                images: r.images,
                files: r.files,
                account: None,
                channel: None,
            }
        }
        // The agent answered, but there is nothing to deliver.
        Ok(Ok(_)) => return,
        Ok(Err(_)) => notice(target_id, "analysis_failed"),
        Err(_) => notice(target_id, "analysis_timeout"),
    };
    let _ = out_tx.send(outbound).await;
}
/// Drives an embedder swap on the shared memory store: pending entries are
/// fetched in batches of `BATCH`, embedded in parallel with rayon, and applied
/// back to the store until `swap_pending` returns nothing, after which the
/// swap is committed.
///
/// The store mutex is held only while calling store methods (`begin_swap`,
/// `swap_pending`, `swap_apply_batch`, `abort_swap`, `commit_swap`), never
/// while embeddings are computed.
///
/// # Errors
/// Returns an error when `begin_swap`, `swap_apply_batch` or `commit_swap`
/// fails, or when the loop runs more than `max(2 * expected_total, 64)`
/// batches. `abort_swap` is called before returning on the
/// `swap_apply_batch` and batch-limit paths.
async fn run_embedder_reembed(
mem_arc: &std::sync::Arc<tokio::sync::Mutex<rsclaw_agent::MemoryStore>>,
) -> anyhow::Result<()> {
const BATCH: usize = 50;
// Grab the embedder and the pending count, and start the swap, under one lock.
let (embedder, expected_total) = {
let mut mem = mem_arc.lock().await;
let e = mem.embedder_arc();
let pending_count = mem.pending_migration_count();
mem.begin_swap(std::sync::Arc::clone(&e))?;
(e, pending_count)
};
let started = std::time::Instant::now();
let mut total = 0usize;
let mut batch_no = 0usize;
loop {
// Lock just long enough to fetch the next batch of (index, text) pairs.
let pending = {
let mem = mem_arc.lock().await;
mem.swap_pending(BATCH)
};
if pending.is_empty() {
break;
}
batch_no += 1;
let batch_started = std::time::Instant::now();
use rayon::prelude::*;
// Embedding happens outside the store lock.
// NOTE(review): this rayon call runs synchronously inside an async fn —
// confirm the calling task tolerates blocking for the batch duration.
let batch: Vec<(usize, Vec<f32>)> = pending
.into_par_iter()
.map(|(idx, text)| (idx, embedder.embed(&text)))
.collect();
let applied = {
let mut mem = mem_arc.lock().await;
match mem.swap_apply_batch(batch) {
Ok(n) => n,
Err(e) => {
mem.abort_swap();
return Err(e);
}
}
};
total += applied;
// Runaway guard: stop if the batch count exceeds max(2 * expected, 64).
if batch_no > expected_total.saturating_mul(2).max(64) {
let mut mem = mem_arc.lock().await;
mem.abort_swap();
anyhow::bail!(
"embedder re-embed: ran {batch_no} batches against {expected_total} expected docs without converging — aborting"
);
}
info!(
batch = batch_no,
applied,
total,
expected = expected_total,
batch_ms = batch_started.elapsed().as_millis() as u64,
"embedder re-embed: batch complete"
);
}
let migrated = {
let mut mem = mem_arc.lock().await;
mem.commit_swap()?
};
info!(
total,
migrated,
elapsed_secs = started.elapsed().as_secs(),
"embedder re-embed complete; semantic search now full-coverage"
);
Ok(())
}
/// Reports whether a process with the given PID currently exists.
///
/// * Unix: probes with `kill(pid, 0)`. Success, or failure with `EPERM`
///   (the process exists but may not be signalled), counts as alive. PID `0`
///   and values that do not fit `pid_t` are reported as not alive: `kill`
///   would interpret them as "own process group" or a negative group id and
///   report a misleading result.
/// * Windows: alive when `OpenProcess` with
///   `PROCESS_QUERY_LIMITED_INFORMATION` returns a non-null handle.
/// * Other targets: always `true`.
fn pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        // Checked conversion: a plain `as` cast would wrap large values to negative.
        let Ok(target) = libc::pid_t::try_from(pid) else {
            return false;
        };
        if target == 0 {
            return false;
        }
        // Signal 0 performs the existence/permission check without delivering a signal.
        let rc = unsafe { libc::kill(target, 0) };
        if rc == 0 {
            return true;
        }
        std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
    }
    #[cfg(windows)]
    {
        use winapi::um::{
            handleapi::CloseHandle, processthreadsapi::OpenProcess,
            winnt::PROCESS_QUERY_LIMITED_INFORMATION,
        };
        unsafe {
            let h = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
            if h.is_null() {
                false
            } else {
                // The handle was only needed for the existence check.
                CloseHandle(h);
                true
            }
        }
    }
    #[cfg(not(any(unix, windows)))]
    {
        let _ = pid;
        true
    }
}
/// File name of the marker written into a model directory installed by this program.
const SENTINEL_FILE: &str = ".rsclaw-managed";

/// Writes the sentinel file into `model_dir`.
///
/// The file holds four `key=value` lines: the crate version, the download
/// `url`, the recorded size `bytes`, and the install time in milliseconds
/// since the Unix epoch (`0` when the system clock is before the epoch).
/// A write failure is logged as a warning and otherwise ignored.
fn write_managed_sentinel(model_dir: &std::path::Path, url: &str, bytes: u64) {
    let now_ms = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    };
    let ver = env!("CARGO_PKG_VERSION");
    let body = format!("version={ver}\nurl={url}\nbytes={bytes}\ninstalled_at_ms={now_ms}\n");

    let sentinel_path = model_dir.join(SENTINEL_FILE);
    match std::fs::write(sentinel_path, body) {
        Ok(()) => {}
        Err(e) => {
            tracing::warn!(
                error = %e,
                "failed to write {SENTINEL_FILE} — re-download recovery on next start will be disabled"
            );
        }
    }
}
/// Reads the size recorded in the sentinel file of `model_dir`.
///
/// Returns the value of the first line starting with `bytes=`, or `None` when
/// the file cannot be read, has no such line, or the value of that first
/// matching line is not a valid `u64`.
fn read_sentinel_bytes(model_dir: &std::path::Path) -> Option<u64> {
    let contents = std::fs::read_to_string(model_dir.join(SENTINEL_FILE)).ok()?;
    for line in contents.lines() {
        if let Some(recorded) = line.strip_prefix("bytes=") {
            // Only the first `bytes=` line is consulted.
            return recorded.trim().parse::<u64>().ok();
        }
    }
    None
}
/// Ensures that `model_dir` contains a BGE model that `LocalBgeEmbedder::load`
/// accepts, downloading and installing one when necessary.
///
/// Steps, in order:
/// 1. If a `seeding.tauri` lock next to `model_dir` names a live PID, wait up
///    to 30 s for it to disappear; stale or unparsable locks are removed.
/// 2. If `model.safetensors` exists: a managed directory (one containing the
///    sentinel file) whose weight size differs from the sentinel is deleted;
///    otherwise the model is loaded. Success returns immediately. A load
///    failure deletes a managed directory (and continues with a download) but
///    is a hard error for a directory without a sentinel.
/// 3. Download the archive (URL from `search_cfg.local.model_download_url`,
///    or the built-in default) into a per-PID staging directory, extract it,
///    and validate it by loading.
/// 4. Install: a missing or managed `model_dir` is replaced wholesale by the
///    staging directory; a user-placed directory is merged into, keeping
///    existing files except `model.safetensors`.
/// 5. Write the sentinel file recording the URL and installed weight size.
///
/// # Errors
/// Fails when the staging directory cannot be created or listed, the download,
/// extraction or validation fails, the install step cannot move files, or a
/// user-placed model fails to load.
pub(crate) async fn ensure_bge_model_present(
model_dir: &std::path::Path,
search_cfg: Option<&rsclaw_config::schema::MemorySearchConfig>,
) -> anyhow::Result<()> {
use rsclaw_agent::memory::LocalBgeEmbedder;
// Lock file next to the model dir; its content is read as the seeding process's PID.
let seeding_lock = model_dir.with_extension("seeding.tauri");
if seeding_lock.exists() {
let lock_pid = std::fs::read_to_string(&seeding_lock)
.ok()
.and_then(|s| s.trim().parse::<u32>().ok());
if let Some(pid) = lock_pid {
if pid_alive(pid) {
tracing::info!(
pid,
lock = %seeding_lock.display(),
"BGE model seed in progress (Tauri); waiting up to 30s"
);
let deadline = std::time::Instant::now() + Duration::from_secs(30);
// Poll every 500 ms; stop early once the lock vanishes or its owner is gone.
while seeding_lock.exists() && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(500)).await;
let still_alive = std::fs::read_to_string(&seeding_lock)
.ok()
.and_then(|s| s.trim().parse::<u32>().ok())
.map(pid_alive)
.unwrap_or(false);
if !still_alive {
let _ = std::fs::remove_file(&seeding_lock);
break;
}
}
} else {
tracing::debug!(stale_pid = pid, "stale seed lock from dead PID, removing");
let _ = std::fs::remove_file(&seeding_lock);
}
} else {
// Unreadable or non-numeric lock content: treat the lock as stale.
let _ = std::fs::remove_file(&seeding_lock);
}
}
let weights_path = model_dir.join("model.safetensors");
if weights_path.exists() {
let weights_bytes = std::fs::metadata(&weights_path).map(|m| m.len()).ok();
let dir_is_managed = model_dir.join(SENTINEL_FILE).exists();
if dir_is_managed {
// Size check applies only when both the actual and the recorded size are known.
if let (Some(actual), Some(expected)) = (weights_bytes, read_sentinel_bytes(model_dir))
{
if actual != expected {
tracing::warn!(
actual,
expected,
"BGE model.safetensors size differs from sentinel; re-downloading"
);
let _ = std::fs::remove_dir_all(model_dir);
}
}
}
// Re-check: the directory may just have been removed by the size check above.
if model_dir.join("model.safetensors").exists() {
match LocalBgeEmbedder::load(model_dir) {
Ok(_) => return Ok(()),
Err(e) if dir_is_managed => {
tracing::warn!(
error = %format!("{e:#}"),
dir = %model_dir.display(),
"managed BGE model failed to load; auto-recovering"
);
let _ = std::fs::remove_dir_all(model_dir);
}
Err(e) => {
anyhow::bail!(
"BGE model at {} failed to load: {e:#}\n\
This is a user-placed directory (no {SENTINEL_FILE} sentinel) — \
fix the files or remove the directory to trigger a fresh \
download, then restart.",
model_dir.display()
);
}
}
}
}
let local_cfg = search_cfg.and_then(|c| c.local.as_ref());
let url = local_cfg
.and_then(|c| c.model_download_url.as_deref())
.unwrap_or("https://gitfast.org/tools/models/bge-small-zh-v1.5.zip")
.to_owned();
let pid = std::process::id();
// Per-process staging directory next to the model dir.
let tmp_dir = model_dir.with_extension(format!("downloading.pid{pid}"));
// Sweep staging directories left behind by processes that no longer exist.
// NOTE(review): `with_extension` replaces an existing extension while the sweep
// prefix is built from the full file name — if the model dir name contains a
// dot the two will not match; confirm against the actual directory name.
if let Some(parent) = model_dir.parent() {
let prefix = model_dir
.file_name()
.and_then(|n| n.to_str())
.map(|n| format!("{n}.downloading.pid"))
.unwrap_or_default();
if !prefix.is_empty()
&& let Ok(entries) = std::fs::read_dir(parent)
{
for entry in entries.flatten() {
let p = entry.path();
if let Some(name) = p.file_name().and_then(|n| n.to_str())
&& let Some(other_pid_str) = name.strip_prefix(&prefix)
&& let Ok(other_pid) = other_pid_str.parse::<u32>()
&& other_pid != pid
&& !pid_alive(other_pid)
{
tracing::info!(stale = %p.display(), "sweeping stale staging dir");
let _ = std::fs::remove_dir_all(&p);
}
}
}
}
std::fs::create_dir_all(&tmp_dir)
.with_context(|| format!("failed to create download dir {}", tmp_dir.display()))?;
// Archive file name is the last path segment of the URL.
let archive_name = url.rsplit('/').next().unwrap_or("bge-model.zip");
let archive_path = tmp_dir.join(archive_name);
info!(
"BGE model not present; downloading from {url} -> {}",
archive_path.display()
);
let client = reqwest::Client::new();
let download_result =
crate::cmd::tools::download_resumable(&client, &url, &archive_path, "BGE model").await;
// NOTE(review): the message below promises a resumable partial download, but the
// staging dir is PID-specific and dead-PID staging dirs are swept above — confirm
// a restart can actually pick the partial file up.
if let Err(e) = download_result {
anyhow::bail!(
"BGE model download failed: {e:#}\n\
URL: {url}\n\
Partial download retained at {} for resume on next start.\n\
Or manually place model files at {} and restart.",
archive_path.display(),
model_dir.display()
);
}
// Clear everything in the staging dir except the archive before extracting.
for entry in std::fs::read_dir(&tmp_dir)?.flatten() {
let p = entry.path();
if p == archive_path {
continue;
}
if p.is_dir() {
let _ = std::fs::remove_dir_all(&p);
} else {
let _ = std::fs::remove_file(&p);
}
}
if let Err(e) = crate::cmd::tools::extract_zip_public(&archive_path, &tmp_dir) {
let _ = std::fs::remove_dir_all(&tmp_dir);
anyhow::bail!(
"BGE model archive extraction failed: {e:#}\n\
The downloaded file at {} may be corrupted. Re-run after deleting it.",
archive_path.display()
);
}
// Validate the extracted files by loading them before touching the install dir.
if let Err(e) = LocalBgeEmbedder::load(&tmp_dir) {
let _ = std::fs::remove_dir_all(&tmp_dir);
anyhow::bail!(
"downloaded BGE model failed validation: {e:#}\n\
The download may have been corrupted. Retry by restarting; if this\n\
persists, the upstream model URL may be broken: {url}"
);
}
let _ = std::fs::remove_file(&archive_path);
let dir_is_managed = model_dir.exists() && model_dir.join(SENTINEL_FILE).exists();
if !model_dir.exists() || dir_is_managed {
// Missing or managed target: replace it wholesale with the staging dir.
if model_dir.exists() {
std::fs::remove_dir_all(model_dir)
.with_context(|| format!("failed to clear managed dir {}", model_dir.display()))?;
}
std::fs::rename(&tmp_dir, model_dir).with_context(|| {
format!(
"failed to install model: rename {} -> {}",
tmp_dir.display(),
model_dir.display()
)
})?;
} else {
// User-placed target: merge file by file, keeping existing files except the weights.
std::fs::create_dir_all(model_dir)
.with_context(|| format!("failed to ensure install dir {}", model_dir.display()))?;
for entry in std::fs::read_dir(&tmp_dir)?.flatten() {
let src = entry.path();
let Some(name) = src.file_name() else {
continue;
};
let dst = model_dir.join(name);
if dst.exists() && name != "model.safetensors" {
tracing::debug!(file = %dst.display(), "preserving user-placed file");
continue;
}
// Fall back to copy + delete when rename fails.
if let Err(e) = std::fs::rename(&src, &dst) {
std::fs::copy(&src, &dst).with_context(|| {
format!(
"failed to install {} -> {}: rename {e}",
src.display(),
dst.display()
)
})?;
let _ = std::fs::remove_file(&src);
}
}
let _ = std::fs::remove_dir_all(&tmp_dir);
}
// Record the installed weight size (0 if it cannot be read) in the sentinel.
let installed_bytes = std::fs::metadata(model_dir.join("model.safetensors"))
.map(|m| m.len())
.unwrap_or(0);
write_managed_sentinel(model_dir, &url, installed_bytes);
info!("BGE model installed at {}", model_dir.display());
Ok(())
}
/// Publishes a restart request and, when work is still in flight, a follow-up
/// once it has drained.
///
/// The request is stamped with the current in-flight count, stored in `latch`
/// (a poisoned lock is logged and skipped) and broadcast on `tx`. When the
/// count is non-zero a background task polls `shutdown.inflight()` every
/// 200 ms for up to 60 seconds; as soon as it reads zero, the request is
/// stored and broadcast again with `inflight = 0`. If the deadline passes
/// first, the task ends without a follow-up. Broadcast send errors are ignored.
pub(crate) fn publish_restart(
    tx: &tokio::sync::broadcast::Sender<rsclaw_events::RestartRequest>,
    latch: &Arc<std::sync::RwLock<Option<rsclaw_events::RestartRequest>>>,
    shutdown: &crate::gateway::ShutdownCoordinator,
    mut req: rsclaw_events::RestartRequest,
) {
    let pending_at_request = shutdown.inflight() as u64;
    req.inflight = pending_at_request;

    match latch.write() {
        Ok(mut slot) => *slot = Some(req.clone()),
        Err(_) => warn!("pending_restart lock poisoned; restart event still broadcast"),
    }
    let _ = tx.send(req.clone());

    // Nothing in flight: the event just sent is already final.
    if pending_at_request == 0 {
        return;
    }

    let (tx, latch, shutdown) = (tx.clone(), Arc::clone(latch), shutdown.clone());
    tokio::spawn(async move {
        const POLL_EVERY: Duration = Duration::from_millis(200);
        const GIVE_UP_AFTER: Duration = Duration::from_secs(60);

        let began = std::time::Instant::now();
        loop {
            tokio::time::sleep(POLL_EVERY).await;
            if shutdown.inflight() == 0 {
                let mut drained = req;
                drained.inflight = 0;
                if let Ok(mut slot) = latch.write() {
                    *slot = Some(drained.clone());
                }
                let _ = tx.send(drained);
                return;
            }
            if began.elapsed() >= GIVE_UP_AFTER {
                return;
            }
        }
    });
}
/// Tests for `apply_user_env_map`. Each test uses its own variable names so
/// the cases do not interfere with each other through the shared process
/// environment.
#[cfg(test)]
mod user_env_tests {
    use std::collections::HashMap;
    use super::*;

    /// Builds a one-entry map as it would come from the config file.
    fn single_entry(key: &str, value: &str) -> HashMap<String, String> {
        HashMap::from([(key.to_owned(), value.to_owned())])
    }

    #[test]
    fn sets_unset_variables() {
        const NAME: &str = "RSCLAW_TEST_USER_ENV_NEW";
        unsafe { std::env::remove_var(NAME) };

        apply_user_env_map(&single_entry(NAME, "from-config"));

        assert_eq!(std::env::var(NAME).as_deref(), Ok("from-config"));
        unsafe { std::env::remove_var(NAME) };
    }

    #[test]
    fn preexisting_shell_value_wins() {
        const NAME: &str = "RSCLAW_TEST_USER_ENV_KEEP";
        unsafe { std::env::set_var(NAME, "from-shell") };

        apply_user_env_map(&single_entry(NAME, "from-config"));

        assert_eq!(
            std::env::var(NAME).as_deref(),
            Ok("from-shell"),
            "shell-provided value must not be overwritten"
        );
        unsafe { std::env::remove_var(NAME) };
    }

    #[test]
    fn expands_nested_var_refs() {
        const REF: &str = "RSCLAW_TEST_USER_ENV_REF";
        const TARGET: &str = "RSCLAW_TEST_USER_ENV_TARGET";
        unsafe {
            std::env::set_var(REF, "/var/log/foo");
            std::env::remove_var(TARGET);
        }

        apply_user_env_map(&single_entry(TARGET, "${RSCLAW_TEST_USER_ENV_REF}/app.log"));

        assert_eq!(std::env::var(TARGET).as_deref(), Ok("/var/log/foo/app.log"));
        unsafe {
            std::env::remove_var(REF);
            std::env::remove_var(TARGET);
        }
    }
}