use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use anyhow::{Context, Result};
use tokio::sync::{broadcast, mpsc};
use tracing::{error, info, warn};
use crate::{
MemoryTier,
agent::{
AgentMessage, AgentRegistry, AgentReply, AgentRuntime, AgentSpawner,
MemoryStore, PendingAnalysis,
},
channel::OutboundMessage,
config::{
self,
runtime::RuntimeConfig,
schema::BindMode,
},
cron::CronRunner,
gateway::{
LiveConfig,
hot_reload::{ConfigChange, FileWatcher},
},
plugin::{MemoryStoreSlot, PluginRegistry, load_all_plugins},
provider::registry::ProviderRegistry,
server::{AppState, serve},
skill::{SkillRegistry, load_skills},
store::Store,
};
use super::channels::{start_channels, start_custom_channels};
use super::providers::build_providers;
/// Boots the full gateway: opens the stores, loads skills/plugins/agents,
/// wires channels and background workers, then serves HTTP until shutdown —
/// optionally respawning a replacement process when a restart was requested.
///
/// Returns the final result of `serve`; on the restart path the process
/// exits via `std::process::exit(0)` after spawning its replacement.
pub async fn start_gateway(config: Arc<RuntimeConfig>, tier: MemoryTier) -> Result<()> {
    // --- Process-wide setup: proxy env vars, evolution config, registry env.
    crate::config::apply_proxy_env(&config);
    crate::agent::evolution::init_evolution_config(
        crate::agent::evolution::EvolutionConfig::from_raw(config.ext.evolution.as_ref()),
    );
    propagate_skill_registry_env(&config);
    let base_dir = crate::config::loader::base_dir();
    let data_dir = base_dir.join("var/data");
    std::fs::create_dir_all(&data_dir).context("create data dir")?;
    {
        // Seed bundled tool prompts (best-effort, localized by gateway language).
        let lang = config.raw.gateway.as_ref().and_then(|g| g.language.as_deref());
        if let Err(e) = crate::agent::bootstrap::seed_tools(&base_dir, lang) {
            warn!("failed to seed tool prompts: {e:#}");
        }
    }
    // --- Primary store. A lock conflict means another gateway instance owns
    // the DB; exit 0 so supervisors don't treat the duplicate start as a crash.
    let store = match Store::open(&data_dir, tier) {
        Ok(s) => Arc::new(s),
        Err(e) => {
            let msg = format!("{e:#}");
            if msg.contains("already open") || msg.contains("Cannot acquire lock") {
                eprintln!(" [!] Database locked by another gateway instance. Exiting cleanly.");
                std::process::exit(0);
            }
            return Err(e).context("open store");
        }
    };
    info!("store opened at {}", data_dir.display());
    let providers = Arc::new(build_providers(&config));
    info!("{} provider(s) registered", providers.names().len());
    // --- Skills: a load failure is non-fatal; an empty registry is used instead.
    let global_skills = base_dir.join("skills");
    let skills = Arc::new(
        load_skills(&global_skills, None, config.ext.skills.as_ref()).unwrap_or_else(|e| {
            warn!("failed to load skills: {e:#}");
            SkillRegistry::new()
        }),
    );
    info!("{} skill(s) loaded", skills.len());
    // Registry plus one mpsc receiver per agent; receivers are consumed by
    // spawn_agent_tasks below.
    let (registry, receivers) =
        AgentRegistry::from_config_with_receivers(&config, Arc::clone(&providers));
    let registry = Arc::new(registry);
    info!("{} agent(s) registered", registry.len());
    // --- Cross-task plumbing: notifications, restart requests, shutdown.
    let (notification_tx, notification_rx) =
        broadcast::channel::<crate::channel::OutboundMessage>(64);
    let (restart_request_tx, _restart_request_rx) =
        tokio::sync::broadcast::channel::<crate::events::RestartRequest>(16);
    let pending_restart: Arc<std::sync::RwLock<Option<crate::events::RestartRequest>>> =
        Arc::new(std::sync::RwLock::new(None));
    let shutdown = crate::gateway::ShutdownCoordinator::new();
    // --- Embedding model dir: prefer base-zh, then small-zh, then small-en
    // (first one with weights on disk); default to small-zh when none exists.
    let search_cfg = config.raw.memory_search.as_ref();
    let model_dir = {
        let base_zh = base_dir.join("models/bge-base-zh");
        let zh = base_dir.join("models/bge-small-zh");
        let en = base_dir.join("models/bge-small-en");
        if base_zh.join("model.safetensors").exists() {
            base_zh
        } else if zh.join("model.safetensors").exists() {
            zh
        } else if en.join("model.safetensors").exists() {
            en
        } else {
            zh
        }
    };
    ensure_bge_model_present(&model_dir, search_cfg).await?;
    let memory = match MemoryStore::open(&data_dir, Some(&model_dir), tier, search_cfg).await {
        Ok(m) => {
            info!("memory store opened");
            Some(Arc::new(tokio::sync::Mutex::new(m)))
        }
        Err(e) => {
            return Err(anyhow::anyhow!("failed to open memory store: {e:#}"));
        }
    };
    // If the embedder dimension changed since last run, re-embed in the
    // background rather than blocking startup; search stays partial until done.
    if let Some(mem_arc) = memory.as_ref() {
        let pending = {
            let mem = mem_arc.lock().await;
            mem.pending_migration_count()
        };
        if pending > 0 {
            info!(
                pending,
                "embedder dimension changed since last run; spawning background re-embed"
            );
            let bg_mem = Arc::clone(mem_arc);
            tokio::spawn(async move {
                if let Err(e) = run_embedder_reembed(&bg_mem).await {
                    warn!("background re-embed failed ({e:#}); search will be partial until next restart");
                }
            });
        }
    }
    // --- Plugins; plugin load failure degrades to an empty registry.
    let plugins_dir = base_dir.join("plugins");
    let wasm_browser: Arc<tokio::sync::Mutex<Option<crate::browser::BrowserSession>>> =
        Arc::new(tokio::sync::Mutex::new(None));
    let mut plugin_registry = load_all_plugins(
        &plugins_dir,
        config.ext.plugins.as_ref(),
        Arc::clone(&wasm_browser),
        Some(notification_tx.clone()),
    )
    .await
    .unwrap_or_else(|e| {
        warn!("plugin load error: {e:#}");
        PluginRegistry::new()
    });
    // The built-in memory store backs the memory slot when no plugin claimed it.
    if let Some(ref mem_arc) = memory
        && !plugin_registry.slots.has_memory()
    {
        let slot = MemoryStoreSlot::new(Arc::clone(mem_arc));
        let _ = plugin_registry.slots.set_memory(Arc::new(slot), "built-in");
    }
    info!(
        "{} plugin(s) loaded (js={}, wasm={}), memory slot: {}",
        plugin_registry.len(),
        plugin_registry.js_count(),
        plugin_registry.wasm_count(),
        plugin_registry.slots.has_memory()
    );
    let wasm_plugins = Arc::new(plugin_registry.take_wasm_plugins());
    let plugins = Arc::new(plugin_registry);
    // --- Agent runtimes: shared event bus, live config, spawner, MCP clients.
    let (event_tx, _) = broadcast::channel::<crate::events::AgentEvent>(1024);
    let live = Arc::new(LiveConfig::new((*config).clone()));
    let spawner = AgentSpawner::new_arc(
        Arc::clone(&registry),
        Arc::clone(&config),
        Arc::clone(&live),
        Arc::clone(&providers),
        Arc::clone(&skills),
        Arc::clone(&store),
        memory.clone(),
        event_tx.clone(),
        Some(Arc::clone(&plugins)),
    );
    let mcp_registry = Arc::new(crate::mcp::McpRegistry::new());
    spawn_mcp_servers(&config, Arc::clone(&mcp_registry)).await;
    // Keep a clone for the heartbeat runner; `memory` itself moves below.
    let heartbeat_memory = memory.clone();
    spawn_agent_tasks(
        receivers,
        Arc::clone(&registry),
        Arc::clone(&config),
        Arc::clone(&live),
        Arc::clone(&store),
        Arc::clone(&skills),
        Arc::clone(&providers),
        memory,
        event_tx.clone(),
        Some(Arc::clone(&spawner)),
        Some(Arc::clone(&plugins)),
        Some(Arc::clone(&mcp_registry)),
        Some(notification_tx.clone()),
        Arc::clone(&wasm_plugins),
    );
    // --- i18n default language from the gateway config section.
    let lang = config
        .raw
        .gateway
        .as_ref()
        .and_then(|g| g.language.as_deref());
    info!(lang = ?lang, "i18n: gateway language config");
    if let Some(lang) = lang {
        crate::i18n::set_default_lang(lang);
        info!(
            resolved = crate::i18n::default_lang(),
            "i18n: default language set"
        );
    }
    // --- Channels. Each concrete channel parks its handle in a OnceCell slot
    // so the HTTP layer can reach it later through AppState.
    let mut channel_manager = crate::channel::ChannelManager::new(tier);
    let feishu_slot: Arc<tokio::sync::OnceCell<Arc<crate::channel::feishu::FeishuChannel>>> =
        Arc::new(tokio::sync::OnceCell::new());
    let wecom_slot: Arc<tokio::sync::OnceCell<Arc<crate::channel::wecom::WeComChannel>>> =
        Arc::new(tokio::sync::OnceCell::new());
    let whatsapp_slot: Arc<tokio::sync::OnceCell<Arc<crate::channel::whatsapp::WhatsAppChannel>>> =
        Arc::new(tokio::sync::OnceCell::new());
    let line_slot: Arc<tokio::sync::OnceCell<Arc<crate::channel::line::LineChannel>>> =
        Arc::new(tokio::sync::OnceCell::new());
    let zalo_slot: Arc<tokio::sync::OnceCell<Arc<crate::channel::zalo::ZaloChannel>>> =
        Arc::new(tokio::sync::OnceCell::new());
    let dm_enforcers: Arc<
        std::sync::RwLock<std::collections::HashMap<String, Arc<crate::channel::DmPolicyEnforcer>>>,
    > = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
    // Per-channel outbound senders, shared with the task queue and the
    // notification router below.
    let channel_senders: Arc<
        std::sync::RwLock<std::collections::HashMap<String, mpsc::Sender<OutboundMessage>>>,
    > = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
    super::task_queue::install_channel_senders(Arc::clone(&channel_senders));
    let task_queue_mgr = Arc::new(
        super::task_queue::TaskQueueManager::new(Arc::clone(&store.db)),
    );
    super::task_queue::install_task_queue(Arc::clone(&task_queue_mgr));
    start_channels(
        &config,
        Arc::clone(&registry),
        &mut channel_manager,
        Arc::clone(&feishu_slot),
        Arc::clone(&wecom_slot),
        Arc::clone(&whatsapp_slot),
        Arc::clone(&line_slot),
        Arc::clone(&zalo_slot),
        Arc::clone(&dm_enforcers),
        Arc::clone(&store.db),
        Arc::clone(&channel_senders),
        Arc::clone(&task_queue_mgr),
    );
    // --- Background workers: queued tasks and external jobs.
    {
        let worker = Arc::new(super::task_queue::TaskQueueWorker::new(
            Arc::clone(&task_queue_mgr),
            Arc::clone(&registry),
            Arc::clone(&channel_senders),
            shutdown.clone(),
            (*config).clone(),
        ));
        tokio::spawn(async move { worker.run().await });
        info!("task queue worker started");
    }
    {
        let worker = Arc::new(super::external_jobs_worker::ExternalJobsWorker::new(
            Arc::clone(&store.db),
            notification_tx.clone(),
            shutdown.clone(),
            Arc::clone(&config),
        ));
        tokio::spawn(async move { worker.run().await });
        info!("external jobs worker started");
    }
    // --- Notification router: forwards broadcast OutboundMessages to the
    // named channel's sender, or to the first registered channel when the
    // message names none.
    {
        let senders = Arc::clone(&channel_senders);
        let mut rx = notification_rx;
        tokio::spawn(async move {
            info!("notification router started");
            while let Ok(msg) = rx.recv().await {
                if let Some(ref ch_name) = msg.channel {
                    // Clone the sender out of the lock so we never await while
                    // holding the std RwLock guard.
                    let tx = {
                        let senders_guard = senders.read().expect("channel_senders RwLock poisoned");
                        senders_guard.get(ch_name).cloned()
                    };
                    if let Some(tx) = tx {
                        info!(channel = %ch_name, target_id = %msg.target_id, "routing notification");
                        if let Err(e) = tx.send(msg.clone()).await {
                            tracing::warn!(error = %e, "notification send failed");
                        }
                    } else {
                        warn!(channel = %ch_name, "no channel sender registered for notification");
                    }
                } else {
                    // No channel named: pick an arbitrary registered sender.
                    let first = {
                        let guard = senders.read().expect("channel_senders RwLock poisoned");
                        guard.iter().next().map(|(k, v)| (k.clone(), v.clone()))
                    };
                    if let Some((ch_name, tx)) = first {
                        info!(channel = %ch_name, "routing notification to default channel");
                        if let Err(e) = tx.send(msg.clone()).await {
                            tracing::warn!(error = %e, "notification send failed");
                        }
                    } else {
                        warn!("notification: no channels registered");
                    }
                }
            }
            info!("notification router ended");
        });
    }
    // --- Heartbeat runner (enabled unless explicitly disabled in config).
    let hb_enabled = config
        .agents
        .defaults
        .heartbeat
        .as_ref()
        .and_then(|h| h.enabled)
        .unwrap_or(true);
    if hb_enabled {
        let runner = crate::heartbeat::HeartbeatRunner::new_with_shutdown(
            Arc::clone(&registry),
            &data_dir,
            heartbeat_memory,
            Some(shutdown.clone()),
        )
        .with_meditation_deps(crate::heartbeat::MeditationDeps {
            config: Arc::clone(&config),
        });
        let runner = std::sync::Arc::new(runner);
        runner.run();
        info!("heartbeat runner started");
    }
    // --- PID file (best-effort; removed on clean, non-restart exit below).
    let pid_file = crate::config::loader::pid_file();
    if let Some(parent) = pid_file.parent() {
        if let Err(e) = std::fs::create_dir_all(parent) {
            warn!("could not create PID file directory: {e}");
        }
    }
    let pid = std::process::id();
    if let Err(e) = std::fs::write(&pid_file, pid.to_string()) {
        warn!("could not write PID file: {e}");
    }
    info!(pid, "gateway PID written to {}", pid_file.display());
    // --- Config hot-reload: hot-safe fields apply live; anything else
    // surfaces a restart banner via publish_restart.
    if let Some(config_path) = config::loader::detect_config_path() {
        let (mut watcher, mut reload_rx) = FileWatcher::new(config_path);
        tokio::spawn(async move { watcher.run().await });
        let live_reload = Arc::clone(&live);
        let (restart_tx, _) = broadcast::channel::<Vec<String>>(8);
        let bridge_tx = restart_request_tx.clone();
        let bridge_pending = Arc::clone(&pending_restart);
        let bridge_shutdown = shutdown.clone();
        let cfg_lang = config
            .raw
            .gateway
            .as_ref()
            .and_then(|g| g.language.as_deref())
            .map(str::to_owned);
        tokio::spawn(async move {
            // Resolve the banner language once, up front.
            let lang = crate::i18n::resolve_lang(cfg_lang.as_deref().unwrap_or("en")).to_owned();
            loop {
                match reload_rx.recv().await {
                    Ok(ConfigChange::FullReload(new_cfg)) => {
                        let new_owned = (*new_cfg).clone();
                        // apply() returns the sections that cannot be hot-applied.
                        let needs_restart =
                            live_reload.apply(new_owned, &restart_tx).await;
                        if needs_restart.is_empty() {
                            info!("config hot-reload applied (hot-safe fields only)");
                        } else {
                            warn!(?needs_restart, "config change requires gateway restart");
                            publish_restart(
                                &bridge_tx,
                                &bridge_pending,
                                &bridge_shutdown,
                                crate::events::RestartRequest::new(
                                    crate::events::RestartReason::ConfigChanged {
                                        sections: needs_restart,
                                    },
                                    crate::events::RestartUrgency::Recommended,
                                    crate::i18n::t("restart_required_config_changed", &lang),
                                ),
                            );
                        }
                    }
                    Ok(ConfigChange::RequiresRestart(fields)) => {
                        warn!(?fields, "config change requires restart — surfacing banner");
                        publish_restart(
                            &bridge_tx,
                            &bridge_pending,
                            &bridge_shutdown,
                            crate::events::RestartRequest::new(
                                crate::events::RestartReason::ConfigChanged {
                                    sections: fields,
                                },
                                crate::events::RestartUrgency::Required,
                                crate::i18n::t("restart_required_config_changed", &lang),
                            ),
                        );
                    }
                    Ok(_) => {}
                    // Watcher channel closed: stop the reload loop.
                    Err(_) => break,
                }
            }
        });
    }
    // --- WebSocket device registry and custom webhook channels.
    let devices_path = crate::config::loader::base_dir().join("var/data/devices.json");
    let devices = Arc::new(crate::ws::DeviceStore::new(devices_path));
    let ws_conns = Arc::new(crate::ws::ConnRegistry::new());
    let custom_webhooks: Arc<
        std::sync::RwLock<
            std::collections::HashMap<String, Arc<crate::channel::custom::CustomWebhookChannel>>,
        >,
    > = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
    start_custom_channels(
        &config,
        Arc::clone(&registry),
        &mut channel_manager,
        Arc::clone(&custom_webhooks),
    );
    // --- Desktop channel: bridges outbound messages onto WS connections.
    {
        let desktop_ch = Arc::new(crate::channel::desktop::DesktopChannel::new(Arc::clone(&ws_conns)));
        let (desktop_out_tx, mut desktop_out_rx) = mpsc::channel::<OutboundMessage>(64);
        {
            let mut senders = channel_senders
                .write()
                .expect("channel_senders lock poisoned");
            // "desktop" and "ws" are aliases for the same outbound sender.
            senders.insert("desktop".to_string(), desktop_out_tx.clone());
            senders.insert("ws".to_string(), desktop_out_tx);
        }
        let desktop_for_bridge = Arc::clone(&desktop_ch);
        tokio::spawn(async move {
            use crate::channel::Channel;
            while let Some(msg) = desktop_out_rx.recv().await {
                if let Err(e) = desktop_for_bridge.send(msg).await {
                    warn!(error = %e, "desktop notification bridge: send failed");
                }
            }
        });
        if let Err(e) = channel_manager.register(desktop_ch as Arc<dyn crate::channel::Channel>) {
            warn!("failed to register desktop channel: {e}");
        }
    }
    let channel_manager = Arc::new(channel_manager);
    // --- Cron: load jobs; a cron.json5 parse failure is flagged and passed
    // to the runner (jobs do not run until the file parses again).
    let (cron_reload_tx, _cron_reload_rx) = tokio::sync::broadcast::channel::<()>(16);
    crate::cron::install_reload_sender(cron_reload_tx.clone());
    {
        let cron_cfg = config.ops.cron.clone().unwrap_or_else(|| {
            crate::config::schema::CronConfig {
                enabled: Some(true),
                max_concurrent_runs: None,
                session_retention: None,
                run_log: None,
                jobs: None,
                default_delivery: None,
            }
        });
        let cron_enabled = cron_cfg.enabled.unwrap_or(true);
        let cron_file = crate::cron::resolve_cron_store_path();
        let (jobs, parse_ok) = crate::cron::load_cron_jobs();
        if !parse_ok {
            error!(file = %cron_file.display(), "cron.json5 has syntax errors - jobs will NOT run until file is fixed");
        } else if !jobs.is_empty() {
            info!(file = %cron_file.display(), count = jobs.len(), "loaded cron jobs");
        }
        if cron_enabled {
            let cron_data_dir = base_dir.join("var").join("data");
            let runner = CronRunner::new_with_shutdown(
                &cron_cfg,
                jobs,
                !parse_ok,
                Arc::clone(&registry),
                Arc::clone(&channel_manager),
                cron_data_dir,
                cron_reload_tx.clone(),
                Arc::clone(&ws_conns),
                Some(shutdown.clone()),
            );
            tokio::spawn(async move {
                if let Err(e) = runner.run().await {
                    error!("cron runner error: {e:#}");
                }
            });
            info!("cron runner started");
        }
    }
    // --- Shared HTTP state handed to the server.
    let state = AppState {
        config: Arc::clone(&config),
        live: Arc::clone(&live),
        agents: Arc::clone(&registry),
        store: Arc::clone(&store),
        event_bus: event_tx,
        devices,
        ws_conns,
        feishu: Arc::clone(&feishu_slot),
        wecom: Arc::clone(&wecom_slot),
        whatsapp: Arc::clone(&whatsapp_slot),
        line: Arc::clone(&line_slot),
        zalo: Arc::clone(&zalo_slot),
        started_at: std::time::Instant::now(),
        dm_enforcers: Arc::clone(&dm_enforcers),
        custom_webhooks: Arc::clone(&custom_webhooks),
        cron_reload: cron_reload_tx,
        notification_tx: notification_tx.clone(),
        wasm_plugins: Arc::clone(&wasm_plugins),
        plugins: Arc::clone(&plugins),
        restart_request_tx: restart_request_tx.clone(),
        pending_restart: Arc::clone(&pending_restart),
        shutdown: shutdown.clone(),
    };
    crate::ws::tick::start_tick_loop(Arc::clone(&state.ws_conns));
    // Reap idle pooled browsers once a minute; skipped ticks are dropped.
    tokio::spawn(async {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            interval.tick().await;
            crate::browser::pool::BrowserPool::global().reap_if_idle().await;
        }
    });
    let bind_addr = resolve_bind_addr(&config);
    info!("starting HTTP server on {bind_addr}");
    // --- Update check (best-effort, delayed 5s so startup I/O settles).
    tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        let client = reqwest::Client::builder()
            .user_agent("rsclaw/dev")
            .timeout(std::time::Duration::from_secs(10))
            .build();
        let Ok(client) = client else { return };
        let resp = client
            .get("https://api.github.com/repos/rsclaw-ai/rsclaw/releases/latest")
            .send()
            .await;
        if let Ok(resp) = resp {
            if let Ok(release) = resp.json::<serde_json::Value>().await {
                let latest_raw = release["tag_name"]
                    .as_str()
                    .unwrap_or("");
                let current_raw = option_env!("RSCLAW_BUILD_VERSION").unwrap_or("dev");
                // Strips a leading 'v' and anything after a space or '-'.
                fn strip_ver(s: &str) -> &str {
                    let s = s.trim_start_matches('v');
                    let s = s.split_once(' ').map(|(v, _)| v).unwrap_or(s);
                    s.split_once('-').map(|(v, _)| v).unwrap_or(s)
                }
                // Component-wise numeric comparison; missing components are 0,
                // non-numeric components are skipped by the parse filter.
                fn ver_newer(latest: &str, current: &str) -> bool {
                    let parse = |s: &str| -> Vec<u32> {
                        s.split('.').filter_map(|p| p.parse().ok()).collect()
                    };
                    let l = parse(latest);
                    let c = parse(current);
                    for i in 0..l.len().max(c.len()) {
                        let lv = l.get(i).copied().unwrap_or(0);
                        let cv = c.get(i).copied().unwrap_or(0);
                        if lv > cv {
                            return true;
                        }
                        if lv < cv {
                            return false;
                        }
                    }
                    false
                }
                let latest = strip_ver(latest_raw);
                let current = strip_ver(current_raw);
                if !latest.is_empty() && ver_newer(latest, current) {
                    info!(
                        current = current_raw,
                        latest = latest_raw,
                        "new rsclaw version available -- run `rsclaw update` to upgrade"
                    );
                }
            }
        }
    });
    // --- Signal handling: SIGINT/SIGTERM (unix) or Ctrl-C starts a drain.
    {
        let sd = shutdown.clone();
        tokio::spawn(async move {
            #[cfg(unix)]
            {
                use tokio::signal::unix::{signal, SignalKind};
                let mut sigterm = match signal(SignalKind::terminate()) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!("failed to install SIGTERM handler: {e:#}");
                        return;
                    }
                };
                tokio::select! {
                    res = tokio::signal::ctrl_c() => {
                        if let Err(e) = res {
                            warn!("ctrl_c handler error: {e:#}");
                            return;
                        }
                        info!("SIGINT received, beginning graceful shutdown");
                    }
                    _ = sigterm.recv() => {
                        info!("SIGTERM received, beginning graceful shutdown");
                    }
                }
            }
            #[cfg(not(unix))]
            {
                if let Err(e) = tokio::signal::ctrl_c().await {
                    warn!("ctrl_c handler error: {e:#}");
                    return;
                }
                info!("Ctrl-C received, beginning graceful shutdown");
            }
            sd.begin_drain();
        });
    }
    // --- Serve until shutdown; then either respawn (restart) or exit.
    let result = serve(state, bind_addr).await;
    if shutdown.is_restart_requested() {
        // Wait for in-flight work to drain, bounded at 60 seconds.
        info!("restart requested - waiting for inflight drain (max 60s)");
        let deadline = std::time::Instant::now() + Duration::from_secs(60);
        loop {
            let n = shutdown.inflight();
            if n == 0 {
                info!("graceful drain: inflight cleared");
                break;
            }
            if std::time::Instant::now() >= deadline {
                warn!(
                    inflight = n,
                    "graceful drain: 60s timeout reached, restarting anyway"
                );
                break;
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        let exe = match std::env::current_exe() {
            Ok(p) => p,
            Err(e) => {
                error!("current_exe failed; cannot respawn replacement: {e:#}");
                return result;
            }
        };
        // Re-launch ourselves with only the whitelisted flags
        // (--dev/--profile/--base-dir) carried over, plus `gateway run`.
        let mut cmd = std::process::Command::new(&exe);
        let original_args: Vec<String> = std::env::args().collect();
        let mut extra_args: Vec<String> = Vec::new();
        let mut i = 1;
        while i < original_args.len() {
            match original_args[i].as_str() {
                "--dev" => { extra_args.push("--dev".to_owned()); }
                "--profile" => {
                    extra_args.push("--profile".to_owned());
                    if let Some(val) = original_args.get(i + 1) {
                        extra_args.push(val.clone());
                        i += 1;
                    }
                }
                "--base-dir" => {
                    extra_args.push("--base-dir".to_owned());
                    if let Some(val) = original_args.get(i + 1) {
                        extra_args.push(val.clone());
                        i += 1;
                    }
                }
                s if s.starts_with("--profile=") => { extra_args.push(s.to_owned()); }
                s if s.starts_with("--base-dir=") => { extra_args.push(s.to_owned()); }
                _ => {}
            }
            i += 1;
        }
        extra_args.extend(["gateway".to_owned(), "run".to_owned()]);
        cmd.args(&extra_args);
        #[cfg(target_os = "windows")]
        {
            use std::os::windows::process::CommandExt;
            // Detach the replacement from any console window.
            const CREATE_NO_WINDOW: u32 = 0x0800_0000;
            cmd.creation_flags(CREATE_NO_WINDOW);
        }
        match cmd.spawn() {
            Ok(_) => info!("replacement gateway spawned"),
            Err(e) => error!("failed to spawn replacement gateway: {e:#}"),
        }
        // PID file is not removed on this path; the replacement writes its own.
        std::process::exit(0);
    }
    if let Err(e) = std::fs::remove_file(&pid_file) {
        warn!("could not remove PID file on exit: {e}");
    }
    result
}
/// Maps a known skill-registry name to the environment variable names
/// (`(api_key_var, base_url_var)`) its credentials should be exported under.
///
/// Returns `None` for registries with no well-known env var mapping.
///
/// Note: the `#[allow(clippy::too_many_arguments)]` that previously sat on
/// this one-argument function was misplaced; it belongs on
/// `spawn_agent_tasks` below.
fn registry_env_names(name: &str) -> Option<(&'static str, &'static str)> {
    match name {
        "iwencai" => Some(("IWENCAI_API_KEY", "IWENCAI_BASE_URL")),
        _ => None,
    }
}
fn propagate_skill_registry_env(config: &RuntimeConfig) {
let Some(map) = config.raw.skill_registries.as_ref() else { return };
for (name, entry) in map {
let Some((api_key_var, base_url_var)) = registry_env_names(name) else { continue };
if let Some(key_field) = entry.api_key.as_ref() {
if let Some(val) = key_field.resolve_early().filter(|s| !s.is_empty()) {
if std::env::var(api_key_var).is_err() {
unsafe { std::env::set_var(api_key_var, &val) };
info!(registry = %name, env = api_key_var, "exported registry api key to env");
}
}
}
if let Some(url_field) = entry.base_url.as_ref() {
if let Some(val) = url_field.resolve_early().filter(|s| !s.is_empty()) {
if std::env::var(base_url_var).is_err() {
unsafe { std::env::set_var(base_url_var, &val) };
info!(registry = %name, env = base_url_var, "exported registry base url to env");
}
}
}
}
}
fn spawn_agent_tasks(
receivers: HashMap<String, mpsc::Receiver<AgentMessage>>,
registry: Arc<AgentRegistry>,
config: Arc<RuntimeConfig>,
live: Arc<LiveConfig>,
store: Arc<Store>,
skills: Arc<SkillRegistry>,
providers: Arc<ProviderRegistry>,
memory: Option<Arc<tokio::sync::Mutex<MemoryStore>>>,
event_tx: broadcast::Sender<crate::events::AgentEvent>,
spawner: Option<Arc<AgentSpawner>>,
plugins: Option<Arc<crate::plugin::PluginRegistry>>,
mcp: Option<Arc<crate::mcp::McpRegistry>>,
notification_tx: Option<broadcast::Sender<crate::channel::OutboundMessage>>,
wasm_plugins: Arc<Vec<crate::plugin::WasmPlugin>>,
) {
for (agent_id, mut rx) in receivers {
let handle = match registry.get(&agent_id) {
Ok(h) => h,
Err(e) => {
error!(agent_id, "agent handle not found: {e:#}");
continue;
}
};
let fallback_models = handle
.config
.model
.as_ref()
.and_then(|m| m.fallbacks.clone())
.or_else(|| {
config
.agents
.defaults
.model
.as_ref()
.and_then(|m| m.fallbacks.clone())
})
.unwrap_or_default();
let mut runtime = AgentRuntime::new(
Arc::clone(&handle),
Arc::clone(&config),
Arc::clone(&live),
Arc::clone(&providers),
fallback_models,
Arc::clone(&skills),
Arc::clone(&store),
memory.clone(),
Some(Arc::clone(®istry)),
Some(event_tx.clone()),
spawner.clone(),
plugins.clone(),
mcp.clone(),
notification_tx.clone(),
);
runtime.wasm_plugins = Arc::clone(&wasm_plugins);
let event_tx_task = event_tx.clone();
tokio::spawn(async move {
info!(agent_id = %handle.id, "agent runtime task started");
while let Some(msg) = rx.recv().await {
info!(
agent_id = %handle.id,
session_key = %msg.session_key,
channel = %msg.channel,
"agent runtime: received msg from queue"
);
let AgentMessage {
session_key,
text,
channel,
peer_id,
chat_id,
reply_tx,
extra_tools,
images,
files,
account: _,
} = msg;
let result = runtime
.run_turn(
&session_key,
&text,
&channel,
&peer_id,
&chat_id,
extra_tools,
images,
files,
)
.await;
let turn_errored = result.is_err();
let reply = result.unwrap_or_else(|e| {
error!(agent = %handle.id, "turn error: {e:#}");
AgentReply {
text: format!("[error: {e}]"),
is_empty: false,
tool_calls: None,
images: vec![],
files: vec![],
pending_analysis: None,
needs_outer_done_emit: false,
}
});
if reply.needs_outer_done_emit || turn_errored {
if !reply.text.is_empty() {
let _ = event_tx_task.send(crate::events::AgentEvent {
session_id: session_key.clone(),
agent_id: handle.id.clone(),
delta: reply.text.clone(),
done: false,
files: vec![],
images: vec![],
tool_log: vec![],
});
}
let _ = event_tx_task.send(crate::events::AgentEvent {
session_id: session_key.clone(),
agent_id: handle.id.clone(),
delta: String::new(),
done: true,
files: vec![],
images: vec![],
tool_log: vec![],
});
}
let _ = reply_tx.send(reply);
}
info!(agent_id = %handle.id, "agent runtime task ended (channel closed)");
});
}
}
/// Decides the socket address the HTTP server binds to.
///
/// An explicit, parseable `gateway.bind_address` wins outright; otherwise
/// the configured bind mode chooses between loopback and all interfaces.
fn resolve_bind_addr(config: &RuntimeConfig) -> SocketAddr {
    let port = config.gateway.port;
    if let Some(ref addr) = config.gateway.bind_address {
        match addr.parse::<std::net::IpAddr>() {
            Ok(ip) => return SocketAddr::new(ip, port),
            Err(_) => {
                // Unparseable address: warn and fall through to the bind mode.
                tracing::warn!(
                    addr = addr.as_str(),
                    "invalid bind_address, falling back to bind mode"
                );
            }
        }
    }
    match config.gateway.bind {
        // Loopback-only modes (Tailnet currently also binds loopback).
        BindMode::Loopback | BindMode::Tailnet => SocketAddr::from(([127, 0, 0, 1], port)),
        // Every other mode listens on all interfaces.
        BindMode::Auto | BindMode::Lan | BindMode::All | BindMode::Custom => {
            SocketAddr::from(([0, 0, 0, 0], port))
        }
    }
}
/// Spawns every configured MCP server process, initializes each client,
/// and registers the successfully started ones in `registry`.
///
/// Does nothing when the `[mcp]` section is absent, explicitly disabled,
/// or lists no servers. Individual server failures are logged and skipped.
async fn spawn_mcp_servers(config: &RuntimeConfig, registry: Arc<crate::mcp::McpRegistry>) {
    let Some(mcp) = config.raw.mcp.as_ref() else {
        return;
    };
    if mcp.enabled == Some(false) {
        return;
    }
    let Some(servers) = mcp.servers.as_ref() else {
        return;
    };
    for server_cfg in servers {
        let mut client = match crate::mcp::McpClient::spawn(server_cfg).await {
            Ok(c) => c,
            Err(e) => {
                error!(name = %server_cfg.name, error = %e, "failed to start MCP server");
                continue;
            }
        };
        if let Err(e) = client.initialize().await {
            error!(name = %server_cfg.name, error = %e, "MCP initialize failed");
            continue;
        }
        // tools/list failure is non-fatal: the client is registered anyway.
        match client.list_tools().await {
            Ok(tools) => {
                info!(
                    name = %server_cfg.name,
                    tools = tools.len(),
                    "MCP server ready"
                );
            }
            Err(e) => {
                warn!(name = %server_cfg.name, error = %e, "MCP tools/list failed");
            }
        }
        registry.register(Arc::new(client)).await;
    }
    let total = registry.clients.lock().await.len();
    if total > 0 {
        info!(count = total, "MCP server(s) registered");
    }
}
/// Routes a deferred analysis request back through its owning agent and
/// delivers the outcome — the agent's reply, or a localized failure /
/// timeout notice — to `target_id` via `out_tx`.
///
/// Waits at most 10 minutes for the agent to answer. An empty reply
/// (no text, images, or files) is delivered as nothing at all.
pub(crate) async fn handle_pending_analysis(
    analysis: PendingAnalysis,
    handle: Arc<crate::agent::AgentHandle>,
    out_tx: &mpsc::Sender<crate::channel::OutboundMessage>,
    target_id: String,
    is_group: bool,
    config: &RuntimeConfig,
) {
    // Resolve the gateway language once so every fallback notice is localized.
    let i18n_lang = config
        .raw
        .gateway
        .as_ref()
        .and_then(|g| g.language.as_deref())
        .map(crate::i18n::resolve_lang)
        .unwrap_or("en");
    // Builds a text-only outbound message for the failure/timeout paths,
    // which previously duplicated this struct literal three times.
    fn plain(target_id: String, is_group: bool, text: String) -> crate::channel::OutboundMessage {
        crate::channel::OutboundMessage {
            target_id,
            is_group,
            text,
            reply_to: None,
            images: vec![],
            files: vec![],
            channel: None,
            account: None,
        }
    }
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    let msg = AgentMessage {
        session_key: analysis.session_key,
        text: analysis.text,
        channel: analysis.channel,
        peer_id: analysis.peer_id.clone(),
        chat_id: String::new(),
        reply_tx,
        extra_tools: vec![],
        images: vec![],
        files: vec![],
        account: None,
    };
    // The agent's queue is gone (runtime task exited): report failure.
    if handle.tx.send(msg).await.is_err() {
        let _ = out_tx
            .send(plain(
                target_id,
                is_group,
                crate::i18n::t("analysis_failed", i18n_lang),
            ))
            .await;
        return;
    }
    match tokio::time::timeout(Duration::from_secs(600), reply_rx).await {
        // Deliverable reply: forward text, images, and files as-is.
        Ok(Ok(r)) if !r.text.is_empty() || !r.images.is_empty() || !r.files.is_empty() => {
            let _ = out_tx
                .send(crate::channel::OutboundMessage {
                    target_id,
                    is_group,
                    text: r.text,
                    reply_to: None,
                    images: r.images,
                    files: r.files,
                    account: None,
                    channel: None,
                })
                .await;
        }
        // Empty reply: deliberately deliver nothing.
        Ok(Ok(_)) => {}
        // Agent dropped the reply channel without answering.
        Ok(Err(_)) => {
            let _ = out_tx
                .send(plain(
                    target_id,
                    is_group,
                    crate::i18n::t("analysis_failed", i18n_lang),
                ))
                .await;
        }
        // No answer within the 10-minute deadline.
        Err(_) => {
            let _ = out_tx
                .send(plain(
                    target_id,
                    is_group,
                    crate::i18n::t("analysis_timeout", i18n_lang),
                ))
                .await;
        }
    }
}
/// Re-embeds every document that predates an embedder dimension change.
///
/// Runs as a background task spawned from `start_gateway`. Drives the
/// store's swap protocol: `begin_swap` opens the migration, `swap_pending`
/// hands out work in batches, `swap_apply_batch` stores fresh vectors, and
/// `commit_swap` / `abort_swap` finish or roll it back.
async fn run_embedder_reembed(
    mem_arc: &std::sync::Arc<tokio::sync::Mutex<crate::agent::MemoryStore>>,
) -> anyhow::Result<()> {
    // Documents re-embedded per lock acquisition.
    const BATCH: usize = 50;
    // Grab the embedder and the expected document count, then open the swap —
    // all under one lock so the snapshot is consistent.
    let (embedder, expected_total) = {
        let mut mem = mem_arc.lock().await;
        let e = mem.embedder_arc();
        let pending_count = mem.pending_migration_count();
        mem.begin_swap(std::sync::Arc::clone(&e))?;
        (e, pending_count)
    };
    let started = std::time::Instant::now();
    let mut total = 0usize;
    let mut batch_no = 0usize;
    loop {
        // Hold the lock only long enough to take the next batch; the actual
        // embedding work below runs without it.
        let pending = {
            let mem = mem_arc.lock().await;
            mem.swap_pending(BATCH)
        };
        if pending.is_empty() {
            break;
        }
        batch_no += 1;
        let batch_started = std::time::Instant::now();
        use rayon::prelude::*;
        // CPU-bound embedding is parallelized across the batch via rayon.
        let batch: Vec<(usize, Vec<f32>)> = pending
            .into_par_iter()
            .map(|(idx, text)| (idx, embedder.embed(&text)))
            .collect();
        let applied = {
            let mut mem = mem_arc.lock().await;
            match mem.swap_apply_batch(batch) {
                Ok(n) => n,
                Err(e) => {
                    // Roll back the half-finished swap before bailing out.
                    mem.abort_swap();
                    return Err(e);
                }
            }
        };
        total += applied;
        // Runaway guard: if the batch count blows well past the expected
        // document count, the store is not converging — abort the swap.
        if batch_no > expected_total.saturating_mul(2).max(64) {
            let mut mem = mem_arc.lock().await;
            mem.abort_swap();
            anyhow::bail!(
                "embedder re-embed: ran {batch_no} batches against {expected_total} expected docs without converging — aborting"
            );
        }
        info!(
            batch = batch_no,
            applied,
            total,
            expected = expected_total,
            batch_ms = batch_started.elapsed().as_millis() as u64,
            "embedder re-embed: batch complete"
        );
    }
    // All batches applied: make the new vectors live.
    let migrated = {
        let mut mem = mem_arc.lock().await;
        mem.commit_swap()?
    };
    info!(
        total,
        migrated,
        elapsed_secs = started.elapsed().as_secs(),
        "embedder re-embed complete; semantic search now full-coverage"
    );
    Ok(())
}
/// Best-effort liveness probe for an arbitrary PID, used to detect stale
/// lock and staging files left behind by dead gateway/seeder processes.
fn pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        // Signal 0 performs the existence/permission checks without actually
        // delivering a signal (per POSIX kill()).
        let rc = unsafe { libc::kill(pid as libc::pid_t, 0) };
        if rc == 0 {
            return true;
        }
        // EPERM means the process exists but we lack permission to signal it,
        // so it still counts as alive; any other errno (e.g. ESRCH) is dead.
        std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
    }
    #[cfg(windows)]
    {
        use winapi::um::handleapi::CloseHandle;
        use winapi::um::processthreadsapi::OpenProcess;
        use winapi::um::winnt::PROCESS_QUERY_LIMITED_INFORMATION;
        // SAFETY: plain Win32 FFI; the handle is closed before returning.
        unsafe {
            let h = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
            if h.is_null() {
                // NOTE(review): OpenProcess can also fail with ACCESS_DENIED
                // for a live process in another session — that case is treated
                // as "dead" here; confirm that is acceptable for lock recovery.
                false
            } else {
                CloseHandle(h);
                true
            }
        }
    }
    #[cfg(not(any(unix, windows)))]
    {
        // Unknown platform: assume alive so we never delete a live lock.
        let _ = pid;
        true
    }
}
/// Marker file dropped into a model directory that rsclaw downloaded itself,
/// distinguishing managed directories from user-placed ones.
const SENTINEL_FILE: &str = ".rsclaw-managed";

/// Records download provenance (crate version, URL, byte size, install time)
/// in the sentinel file so later starts can detect corruption and re-download.
fn write_managed_sentinel(model_dir: &std::path::Path, url: &str, bytes: u64) {
    // Milliseconds since the Unix epoch; a pre-epoch clock degrades to 0.
    let installed_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_millis())
        .unwrap_or(0);
    let contents = format!(
        "version={ver}\nurl={url}\nbytes={bytes}\ninstalled_at_ms={now_ms}\n",
        ver = env!("CARGO_PKG_VERSION"),
        now_ms = installed_at,
    );
    // Best-effort: a failed write only disables auto-recovery, so warn and go on.
    if let Err(e) = std::fs::write(model_dir.join(SENTINEL_FILE), contents) {
        tracing::warn!(
            error = %e,
            "failed to write {SENTINEL_FILE} — re-download recovery on next start will be disabled"
        );
    }
}
/// Reads the expected `model.safetensors` byte size recorded in the sentinel
/// file, if present and parseable.
///
/// Only the first `bytes=` line is considered; an unparseable value on that
/// line yields `None`.
fn read_sentinel_bytes(model_dir: &std::path::Path) -> Option<u64> {
    let sentinel = std::fs::read_to_string(model_dir.join(SENTINEL_FILE)).ok()?;
    for line in sentinel.lines() {
        if let Some(raw) = line.strip_prefix("bytes=") {
            return raw.trim().parse::<u64>().ok();
        }
    }
    None
}
pub(crate) async fn ensure_bge_model_present(
model_dir: &std::path::Path,
search_cfg: Option<&crate::config::schema::MemorySearchConfig>,
) -> anyhow::Result<()> {
use crate::agent::memory::LocalBgeEmbedder;
let seeding_lock = model_dir.with_extension("seeding.tauri");
if seeding_lock.exists() {
let lock_pid = std::fs::read_to_string(&seeding_lock)
.ok()
.and_then(|s| s.trim().parse::<u32>().ok());
if let Some(pid) = lock_pid {
if pid_alive(pid) {
tracing::info!(
pid,
lock = %seeding_lock.display(),
"BGE model seed in progress (Tauri); waiting up to 30s"
);
let deadline = std::time::Instant::now() + Duration::from_secs(30);
while seeding_lock.exists() && std::time::Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(500)).await;
let still_alive = std::fs::read_to_string(&seeding_lock)
.ok()
.and_then(|s| s.trim().parse::<u32>().ok())
.map(pid_alive)
.unwrap_or(false);
if !still_alive {
let _ = std::fs::remove_file(&seeding_lock);
break;
}
}
} else {
tracing::debug!(
stale_pid = pid,
"stale seed lock from dead PID, removing"
);
let _ = std::fs::remove_file(&seeding_lock);
}
} else {
let _ = std::fs::remove_file(&seeding_lock);
}
}
let weights_path = model_dir.join("model.safetensors");
if weights_path.exists() {
let weights_bytes = std::fs::metadata(&weights_path).map(|m| m.len()).ok();
let dir_is_managed = model_dir.join(SENTINEL_FILE).exists();
if dir_is_managed {
if let (Some(actual), Some(expected)) =
(weights_bytes, read_sentinel_bytes(model_dir))
{
if actual != expected {
tracing::warn!(
actual,
expected,
"BGE model.safetensors size differs from sentinel; re-downloading"
);
let _ = std::fs::remove_dir_all(model_dir);
}
}
}
if model_dir.join("model.safetensors").exists() {
match LocalBgeEmbedder::load(model_dir) {
Ok(_) => return Ok(()),
Err(e) if dir_is_managed => {
tracing::warn!(
error = %format!("{e:#}"),
dir = %model_dir.display(),
"managed BGE model failed to load; auto-recovering"
);
let _ = std::fs::remove_dir_all(model_dir);
}
Err(e) => {
anyhow::bail!(
"BGE model at {} failed to load: {e:#}\n\
This is a user-placed directory (no {SENTINEL_FILE} sentinel) — \
fix the files or remove the directory to trigger a fresh \
download, then restart.",
model_dir.display()
);
}
}
}
}
let local_cfg = search_cfg.and_then(|c| c.local.as_ref());
let url = local_cfg
.and_then(|c| c.model_download_url.as_deref())
.unwrap_or("https://gitfast.org/tools/models/bge-small-zh-v1.5.zip")
.to_owned();
let pid = std::process::id();
let tmp_dir = model_dir.with_extension(format!("downloading.pid{pid}"));
if let Some(parent) = model_dir.parent() {
let prefix = model_dir
.file_name()
.and_then(|n| n.to_str())
.map(|n| format!("{n}.downloading.pid"))
.unwrap_or_default();
if !prefix.is_empty()
&& let Ok(entries) = std::fs::read_dir(parent)
{
for entry in entries.flatten() {
let p = entry.path();
if let Some(name) = p.file_name().and_then(|n| n.to_str())
&& let Some(other_pid_str) = name.strip_prefix(&prefix)
&& let Ok(other_pid) = other_pid_str.parse::<u32>()
&& other_pid != pid
&& !pid_alive(other_pid)
{
tracing::info!(stale = %p.display(), "sweeping stale staging dir");
let _ = std::fs::remove_dir_all(&p);
}
}
}
}
std::fs::create_dir_all(&tmp_dir).with_context(|| {
format!("failed to create download dir {}", tmp_dir.display())
})?;
let archive_name = url.rsplit('/').next().unwrap_or("bge-model.zip");
let archive_path = tmp_dir.join(archive_name);
info!("BGE model not present; downloading from {url} -> {}", archive_path.display());
let client = reqwest::Client::new();
let download_result =
crate::cmd::tools::download_resumable(&client, &url, &archive_path, "BGE model").await;
if let Err(e) = download_result {
anyhow::bail!(
"BGE model download failed: {e:#}\n\
URL: {url}\n\
Partial download retained at {} for resume on next start.\n\
Or manually place model files at {} and restart.",
archive_path.display(),
model_dir.display()
);
}
for entry in std::fs::read_dir(&tmp_dir)?.flatten() {
let p = entry.path();
if p == archive_path {
continue;
}
if p.is_dir() {
let _ = std::fs::remove_dir_all(&p);
} else {
let _ = std::fs::remove_file(&p);
}
}
if let Err(e) = crate::cmd::tools::extract_zip_public(&archive_path, &tmp_dir) {
let _ = std::fs::remove_dir_all(&tmp_dir);
anyhow::bail!(
"BGE model archive extraction failed: {e:#}\n\
The downloaded file at {} may be corrupted. Re-run after deleting it.",
archive_path.display()
);
}
if let Err(e) = LocalBgeEmbedder::load(&tmp_dir) {
let _ = std::fs::remove_dir_all(&tmp_dir);
anyhow::bail!(
"downloaded BGE model failed validation: {e:#}\n\
The download may have been corrupted. Retry by restarting; if this\n\
persists, the upstream model URL may be broken: {url}"
);
}
let _ = std::fs::remove_file(&archive_path);
let dir_is_managed = model_dir.exists() && model_dir.join(SENTINEL_FILE).exists();
if !model_dir.exists() || dir_is_managed {
if model_dir.exists() {
std::fs::remove_dir_all(model_dir).with_context(|| {
format!("failed to clear managed dir {}", model_dir.display())
})?;
}
std::fs::rename(&tmp_dir, model_dir).with_context(|| {
format!(
"failed to install model: rename {} -> {}",
tmp_dir.display(),
model_dir.display()
)
})?;
} else {
std::fs::create_dir_all(model_dir).with_context(|| {
format!("failed to ensure install dir {}", model_dir.display())
})?;
for entry in std::fs::read_dir(&tmp_dir)?.flatten() {
let src = entry.path();
let Some(name) = src.file_name() else {
continue;
};
let dst = model_dir.join(name);
if dst.exists() && name != "model.safetensors" {
tracing::debug!(file = %dst.display(), "preserving user-placed file");
continue;
}
if let Err(e) = std::fs::rename(&src, &dst) {
std::fs::copy(&src, &dst).with_context(|| {
format!(
"failed to install {} -> {}: rename {e}",
src.display(),
dst.display()
)
})?;
let _ = std::fs::remove_file(&src);
}
}
let _ = std::fs::remove_dir_all(&tmp_dir);
}
let installed_bytes = std::fs::metadata(model_dir.join("model.safetensors"))
.map(|m| m.len())
.unwrap_or(0);
write_managed_sentinel(model_dir, &url, installed_bytes);
info!("BGE model installed at {}", model_dir.display());
Ok(())
}
/// Publish a restart request: stamp it with the current in-flight count,
/// remember it in the `latch` for late readers, and broadcast it on `tx`.
///
/// When work is still in flight, a background task is spawned that polls the
/// shutdown coordinator every 200ms for up to 60s; once the count drains to
/// zero it re-latches and re-broadcasts the request with `inflight = 0`, so
/// subscribers see the final "nothing pending" state. If the deadline passes
/// first, the watcher exits quietly and the original event stands.
pub(crate) fn publish_restart(
    tx: &tokio::sync::broadcast::Sender<crate::events::RestartRequest>,
    latch: &Arc<std::sync::RwLock<Option<crate::events::RestartRequest>>>,
    shutdown: &crate::gateway::ShutdownCoordinator,
    mut req: crate::events::RestartRequest,
) {
    // Snapshot the in-flight count once and stamp the request with it.
    let inflight_now = shutdown.inflight() as u64;
    req.inflight = inflight_now;

    // Latch the pending restart so components that subscribe late can query it.
    match latch.write() {
        Ok(mut slot) => *slot = Some(req.clone()),
        // Poisoning is non-fatal here: the broadcast below still goes out.
        Err(_) => warn!("pending_restart lock poisoned; restart event still broadcast"),
    }

    // A send error just means no subscribers; that is fine.
    let _ = tx.send(req.clone());

    // Nothing in flight — the event we just published is already final.
    if inflight_now == 0 {
        return;
    }

    // Watch the in-flight count drain in the background, then publish a
    // follow-up event with inflight = 0.
    let tx = tx.clone();
    let latch = Arc::clone(latch);
    let shutdown = shutdown.clone();
    tokio::spawn(async move {
        let give_up_at = std::time::Instant::now() + Duration::from_secs(60);
        loop {
            tokio::time::sleep(Duration::from_millis(200)).await;
            if shutdown.inflight() == 0 {
                let mut drained = req;
                drained.inflight = 0;
                if let Ok(mut slot) = latch.write() {
                    *slot = Some(drained.clone());
                }
                let _ = tx.send(drained);
                return;
            }
            // Stop polling quietly once the deadline has passed.
            if std::time::Instant::now() >= give_up_at {
                return;
            }
        }
    });
}