use std::sync::Arc;
use tokio::sync::{RwLock, broadcast};
use tracing::{info, warn};
use crate::config::runtime::{
AgentsRuntime, ChannelRuntime, ExtRuntime, GatewayRuntime, ModelRuntime, OpsRuntime,
RuntimeConfig,
};
#[derive(Clone)]
pub struct LiveConfig {
pub gateway: Arc<RwLock<GatewayRuntime>>,
pub agents: Arc<RwLock<AgentsRuntime>>,
pub channel: Arc<RwLock<ChannelRuntime>>,
pub model: Arc<RwLock<ModelRuntime>>,
pub ext: Arc<RwLock<ExtRuntime>>,
pub ops: Arc<RwLock<OpsRuntime>>,
}
impl LiveConfig {
pub fn new(cfg: RuntimeConfig) -> Self {
Self {
gateway: Arc::new(RwLock::new(cfg.gateway)),
agents: Arc::new(RwLock::new(cfg.agents)),
channel: Arc::new(RwLock::new(cfg.channel)),
model: Arc::new(RwLock::new(cfg.model)),
ext: Arc::new(RwLock::new(cfg.ext)),
ops: Arc::new(RwLock::new(cfg.ops)),
}
}
pub async fn apply(
&self,
new: RuntimeConfig,
restart_tx: &broadcast::Sender<Vec<String>>,
) -> Vec<String> {
let mut restart_fields = {
let old_gw = self.gateway.read().await;
detect_restart_fields(&old_gw, &new.gateway)
};
{
let old_ch = self.channel.read().await;
if channels_changed(&old_ch.channels, &new.channel.channels) {
restart_fields.push("channels".to_owned());
}
}
{
let old_agents = self.agents.read().await;
let old_primary = old_agents.defaults.model.as_ref()
.and_then(|m| m.primary.as_deref());
let new_primary = new.agents.defaults.model.as_ref()
.and_then(|m| m.primary.as_deref());
if old_primary != new_primary || old_agents.list.len() != new.agents.list.len() {
restart_fields.push("agents/model".to_owned());
}
}
if !restart_fields.is_empty() {
warn!(
?restart_fields,
"hot-reload skipped: fields require gateway restart"
);
let _ = restart_tx.send(restart_fields.clone());
return restart_fields;
}
*self.gateway.write().await = new.gateway;
*self.agents.write().await = new.agents;
*self.channel.write().await = new.channel;
*self.model.write().await = new.model;
*self.ext.write().await = new.ext;
*self.ops.write().await = new.ops;
info!("hot-reload applied — all domains updated");
vec![]
}
pub async fn snapshot(&self) -> RuntimeConfig {
RuntimeConfig {
gateway: self.gateway.read().await.clone(),
agents: self.agents.read().await.clone(),
channel: self.channel.read().await.clone(),
model: self.model.read().await.clone(),
ext: self.ext.read().await.clone(),
ops: self.ops.read().await.clone(),
raw: Default::default(),
}
}
}
pub(crate) fn detect_restart_fields(old: &GatewayRuntime, new: &GatewayRuntime) -> Vec<String> {
let mut fields = Vec::new();
if old.port != new.port {
fields.push("gateway.port".to_owned());
}
if old.bind != new.bind {
fields.push("gateway.bind".to_owned());
}
if old.reload != new.reload {
fields.push("gateway.reload".to_owned());
}
fields
}
fn channels_changed(
old: &crate::config::schema::ChannelsConfig,
new: &crate::config::schema::ChannelsConfig,
) -> bool {
old.telegram.is_some() != new.telegram.is_some()
|| old.discord.is_some() != new.discord.is_some()
|| old.slack.is_some() != new.slack.is_some()
|| old.whatsapp.is_some() != new.whatsapp.is_some()
|| old.signal.is_some() != new.signal.is_some()
|| old.feishu.is_some() != new.feishu.is_some()
|| old.dingtalk.is_some() != new.dingtalk.is_some()
|| old.wecom.is_some() != new.wecom.is_some()
|| old.wechat.is_some() != new.wechat.is_some()
|| old.qq.is_some() != new.qq.is_some()
|| old.line.is_some() != new.line.is_some()
|| old.zalo.is_some() != new.zalo.is_some()
|| old.matrix.is_some() != new.matrix.is_some()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::schema::{BindMode, GatewayMode, ReloadMode};
fn base_gw() -> GatewayRuntime {
GatewayRuntime {
port: 18888,
mode: GatewayMode::Local,
bind: BindMode::Loopback,
bind_address: None,
reload: ReloadMode::Hybrid,
auth_token: None,
allow_tailscale: false,
channel_health_check_minutes: 5,
channel_stale_event_threshold_minutes: 30,
channel_max_restarts_per_hour: 10,
auth_token_configured: false,
auth_token_is_plaintext: false,
user_agent: None,
language: None,
}
}
#[test]
fn no_restart_for_auth_token_change() {
let old = base_gw();
let mut new = old.clone();
new.auth_token = Some("new-token".to_owned());
assert!(detect_restart_fields(&old, &new).is_empty());
}
#[test]
fn restart_required_for_port_change() {
let old = base_gw();
let mut new = old.clone();
new.port = 19000;
let fields = detect_restart_fields(&old, &new);
assert!(fields.contains(&"gateway.port".to_owned()));
}
#[test]
fn restart_required_for_bind_change() {
let old = base_gw();
let mut new = old.clone();
new.bind = BindMode::All;
let fields = detect_restart_fields(&old, &new);
assert!(fields.contains(&"gateway.bind".to_owned()));
}
#[tokio::test]
async fn apply_updates_auth_token() {
use crate::config::{
runtime::{AgentsRuntime, ChannelRuntime, ExtRuntime, ModelRuntime, OpsRuntime},
schema::SessionConfig,
};
let gw = base_gw();
let cfg = RuntimeConfig {
gateway: gw,
agents: AgentsRuntime {
defaults: Default::default(),
list: vec![],
bindings: vec![],
external: vec![],
},
channel: ChannelRuntime {
channels: Default::default(),
session: SessionConfig {
dm_scope: None,
thread_bindings: None,
reset: None,
identity_links: None,
maintenance: None,
},
},
model: ModelRuntime {
models: None,
auth: None,
},
ext: ExtRuntime {
tools: None,
skills: None,
plugins: None,
},
ops: OpsRuntime {
cron: None,
hooks: None,
sandbox: None,
logging: None,
secrets: None,
},
raw: Default::default(),
};
let live = LiveConfig::new(cfg.clone());
let mut new_cfg = cfg;
new_cfg.gateway.auth_token = Some("rotated".to_owned());
let (tx, _) = broadcast::channel(8);
let restart = live.apply(new_cfg, &tx).await;
assert!(restart.is_empty());
assert_eq!(
live.gateway.read().await.auth_token.as_deref(),
Some("rotated")
);
}
}