use std::sync::{Arc, OnceLock, Weak};
use anyhow::{Result, anyhow};
use tokio::sync::{broadcast, mpsc};
use tracing::info;
use rsclaw_config::live_config::LiveConfig;
use crate::{
AgentHandle, AgentKind, AgentMessage, AgentRegistry, AgentReply, AgentRuntime, MemoryStore,
};
use rsclaw_config::{runtime::RuntimeConfig, schema::AgentEntry};
use rsclaw_events::AgentEvent;
use rsclaw_plugin::PluginRegistry;
use rsclaw_provider::registry::ProviderRegistry;
use rsclaw_skill::SkillRegistry;
use rsclaw_store::Store;
pub struct AgentSpawner {
pub registry: Arc<AgentRegistry>,
pub config: Arc<RuntimeConfig>,
pub live: Arc<LiveConfig>,
pub providers: Arc<ProviderRegistry>,
pub skills: Arc<SkillRegistry>,
pub store: Arc<Store>,
pub memory: Option<Arc<tokio::sync::Mutex<MemoryStore>>>,
pub event_tx: broadcast::Sender<AgentEvent>,
pub plugins: Option<Arc<PluginRegistry>>,
pub model_health: rsclaw_provider::health::ProviderHealthRegistry,
pub cap_manager: Option<std::sync::Arc<rsclaw_cap::CapAgentManager>>,
pub cap_live_manager: Option<std::sync::Arc<rsclaw_cap::CapLiveManager>>,
me: OnceLock<Weak<AgentSpawner>>,
}
impl AgentSpawner {
#[allow(clippy::too_many_arguments)]
pub fn new_arc(
registry: Arc<AgentRegistry>,
config: Arc<RuntimeConfig>,
live: Arc<LiveConfig>,
providers: Arc<ProviderRegistry>,
skills: Arc<SkillRegistry>,
store: Arc<Store>,
memory: Option<Arc<tokio::sync::Mutex<MemoryStore>>>,
event_tx: broadcast::Sender<AgentEvent>,
plugins: Option<Arc<PluginRegistry>>,
model_health: rsclaw_provider::health::ProviderHealthRegistry,
cap_manager: Option<std::sync::Arc<rsclaw_cap::CapAgentManager>>,
cap_live_manager: Option<std::sync::Arc<rsclaw_cap::CapLiveManager>>,
) -> Arc<Self> {
let s = Arc::new(Self {
registry,
config,
live,
providers,
skills,
store,
memory,
event_tx,
plugins,
model_health,
cap_manager,
cap_live_manager,
me: OnceLock::new(),
});
s.me.set(Arc::downgrade(&s)).ok();
s
}
pub fn spawn_agent(&self, entry: AgentEntry) -> Result<String> {
self.spawn_agent_with_kind(entry, AgentKind::Named)
}
pub fn spawn_agent_with_kind(&self, entry: AgentEntry, kind: AgentKind) -> Result<String> {
let id = entry.id.clone();
if self.registry.get(&id).is_ok() {
return Err(anyhow!("agent '{}' already exists", id));
}
let (tx, mut rx) = mpsc::channel::<AgentMessage>(32);
let max_concurrent = entry
.lane_concurrency
.or(self.config.agents.defaults.max_concurrent)
.unwrap_or(4) as usize;
let context_window = entry
.model
.as_ref()
.and_then(|m| m.context_tokens)
.or(self.config.agents.defaults.context_tokens)
.unwrap_or(0) as usize;
let effective_model =
crate::runtime::resolve_primary_model_for(&entry, &self.config.agents.defaults)
.unwrap_or_else(|| "rsclaw/rsclaw-agent-v1".to_owned());
let handle = Arc::new(AgentHandle {
id: id.clone(),
kind,
config: entry.clone(),
tx,
concurrency: Arc::new(tokio::sync::Semaphore::new(max_concurrent)),
live_status: Arc::new(tokio::sync::RwLock::new(
crate::runtime::LiveStatus::default(),
)),
providers: Arc::clone(&self.providers),
abort_flags: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
cancel_tokens: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
plugin_overrides: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
wasm_plugins: Arc::new(std::sync::RwLock::new(Arc::new(Vec::new()))),
notification_tx: Arc::new(std::sync::RwLock::new(None)),
cold_enabled: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
started_at: std::time::Instant::now(),
session_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
session_tokens: Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
last_ctx_tokens: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
last_sys_tokens: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
last_tools_tokens: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
last_msg_tokens: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
clear_signal: Arc::new(std::sync::atomic::AtomicBool::new(false)),
new_session_signal: Arc::new(std::sync::atomic::AtomicBool::new(false)),
context_window,
effective_model,
});
self.registry.insert_handle(Arc::clone(&handle));
let fallback_models = handle
.config
.model
.as_ref()
.and_then(|m| m.fallbacks.clone())
.unwrap_or_default();
let self_arc: Option<Arc<AgentSpawner>> = self.me.get().and_then(|w| w.upgrade());
let mut runtime = AgentRuntime::new(
Arc::clone(&handle),
Arc::clone(&self.config),
Arc::clone(&self.live),
Arc::clone(&self.providers),
fallback_models,
Arc::clone(&self.skills),
Arc::clone(&self.store),
self.memory.clone(),
Some(Arc::clone(&self.registry)),
Some(self.event_tx.clone()),
self_arc,
self.plugins.clone(),
None, None, self.model_health.clone(),
self.cap_manager.clone(),
self.cap_live_manager.clone(),
);
let config_for_task = Arc::clone(&self.config);
tokio::spawn(async move {
info!(agent_id = %handle.id, "dynamic agent spawned");
while let Some(msg) = rx.recv().await {
let AgentMessage {
session_key,
text,
channel,
account,
peer_id,
reply_tx,
extra_tools,
images,
files,
chat_id,
..
} = msg;
let result = runtime
.run_turn(
&session_key,
&text,
&channel,
&peer_id,
&chat_id,
account.as_deref(),
extra_tools,
images,
files,
crate::registry::TurnContext::default(),
)
.await;
let reply = result.unwrap_or_else(|e| {
tracing::error!(agent = %handle.id, "dynamic agent turn error: {e:#}");
let outcome = if e.to_string().contains("canceled by A2A CancelTask") {
crate::registry::ReplyOutcome::Canceled
} else {
crate::registry::ReplyOutcome::Error
};
let i18n_lang = config_for_task
.raw
.gateway
.as_ref()
.and_then(|g| g.language.as_deref())
.map(rsclaw_i18n::resolve_lang)
.unwrap_or("en");
let user_text = match outcome {
crate::registry::ReplyOutcome::Canceled => "[canceled]".to_owned(),
_ => rsclaw_i18n::t("backend_unavailable", i18n_lang),
};
AgentReply {
text: user_text,
is_empty: false,
tool_calls: None,
images: vec![],
files: vec![],
pending_analysis: None,
needs_outer_done_emit: false,
outcome,
}
});
let _ = reply_tx.send(reply);
}
info!(agent_id = %handle.id, "dynamic agent task ended");
});
Ok(id)
}
}