use crate::{
Daemon, DaemonConfig,
config::{ResolvedManifest, resolve_manifests},
daemon::event::{DaemonEvent, DaemonEventSender},
};
use anyhow::Result;
use crabllm_core::Provider;
use crabllm_provider::{ProviderRegistry, RemoteProvider};
use runtime::{Env, SkillHandler, host::Host, mcp::McpHandler, memory::Memory};
use std::{
collections::{BTreeMap, HashMap},
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::{Mutex, RwLock, broadcast};
use wcore::{AgentConfig, Runtime, ToolRequest, model::Model};
/// Provider stack produced by [`build_default_provider`]: a
/// `ProviderRegistry` of `RemoteProvider`s wrapped in `crate::provider::Retrying`.
pub type DefaultProvider = crate::provider::Retrying<ProviderRegistry<RemoteProvider>>;
/// Shared, `Send + Sync` factory closure that builds a `Model<P>` from a
/// [`DaemonConfig`]. It is fallible: the closure returns `Result`.
pub type BuildProvider<P> =
Arc<dyn Fn(&DaemonConfig) -> Result<wcore::model::Model<P>> + Send + Sync>;
/// Builds the model backed by [`DefaultProvider`] from `config`.
///
/// Public entry point that forwards to the private `build_providers`.
///
/// # Errors
///
/// Returns whatever error `build_providers` reports.
pub fn build_default_provider(config: &DaemonConfig) -> Result<Model<DefaultProvider>> {
    let model = build_providers(config)?;
    Ok(model)
}
fn resolve_plugin_skills(skills: &mut Vec<String>, plugin_skill_dirs: &BTreeMap<String, PathBuf>) {
let mut resolved = Vec::new();
for entry in skills.drain(..) {
if entry.contains('/') {
if let Some(dir) = plugin_skill_dirs.get(&entry) {
match runtime::skill::loader::load_skills_dir(dir) {
Ok(registry) => {
for skill in ®istry.skills {
resolved.push(skill.name.clone());
}
}
Err(e) => {
tracing::warn!("failed to resolve plugin skills for '{entry}': {e}");
}
}
} else {
tracing::warn!("unknown plugin skill reference: '{entry}'");
}
} else {
resolved.push(entry);
}
}
*skills = resolved;
}
/// System prompt given to the built-in default agent in `load_agents`;
/// an alias for `runtime::memory::DEFAULT_SOUL`.
const SYSTEM_AGENT: &str = runtime::memory::DEFAULT_SOUL;
impl<P: Provider + 'static, H: Host + 'static> Daemon<P, H> {
    /// Assembles a daemon from `config` and the files under `config_dir`.
    ///
    /// Steps, in order: build the runtime, load the cron store from
    /// `crons.toml` and start its entries, then load the event bus from
    /// `events.toml`. `build_provider` is stored so [`Self::reload`] can
    /// rebuild the model later.
    ///
    /// # Errors
    ///
    /// Fails if the runtime cannot be built.
    pub(crate) async fn build(
        config: &DaemonConfig,
        config_dir: &Path,
        event_tx: DaemonEventSender,
        shutdown_tx: broadcast::Sender<()>,
        host: H,
        build_provider: BuildProvider<P>,
    ) -> Result<Self> {
        let initial_runtime =
            Self::build_runtime(config, config_dir, &event_tx, host, &build_provider).await?;

        let crons_path = config_dir.join("crons.toml");
        let crons = Arc::new(Mutex::new(crate::cron::CronStore::load(
            crons_path,
            event_tx.clone(),
            shutdown_tx,
        )));
        // The store receives a handle to itself when starting its entries.
        crons.lock().await.start_all(crons.clone());

        let events_path = config_dir.join("events.toml");
        let events = Arc::new(Mutex::new(crate::event_bus::EventBus::load(
            events_path,
            event_tx.clone(),
        )));

        Ok(Self {
            runtime: Arc::new(RwLock::new(Arc::new(initial_runtime))),
            config_dir: config_dir.to_path_buf(),
            event_tx,
            started_at: std::time::Instant::now(),
            crons,
            events,
            build_provider,
        })
    }

    /// Re-reads the config file and swaps in a freshly built runtime.
    ///
    /// The host is cloned from the current runtime, conversations are
    /// transferred from the old runtime into the new one, and only then is
    /// the shared runtime slot overwritten.
    ///
    /// # Errors
    ///
    /// Fails if the config cannot be loaded or the new runtime cannot be
    /// built; the existing runtime slot is not written in either case.
    pub async fn reload(&self) -> Result<()> {
        let config_path = self.config_dir.join(wcore::paths::CONFIG_FILE);
        let config = DaemonConfig::load(&config_path)?;

        // The read guard is a temporary and is released at the end of this
        // statement, before the (potentially slow) rebuild below.
        let host = self.runtime.read().await.hook.host.clone();

        let mut replacement = Self::build_runtime(
            &config,
            &self.config_dir,
            &self.event_tx,
            host,
            &self.build_provider,
        )
        .await?;

        {
            // Hold the read lock only for the duration of the transfer.
            let current = self.runtime.read().await;
            (**current).transfer_conversations(&mut replacement).await;
        }

        *self.runtime.write().await = Arc::new(replacement);
        tracing::info!("daemon reloaded");
        Ok(())
    }

    /// Builds a runtime: resolves manifests, applies the disabled lists from
    /// `config`, builds the model and environment, wires the tool channel to
    /// `event_tx`, and registers all agents.
    ///
    /// # Errors
    ///
    /// Fails if the provider, the environment, or agent loading fails.
    async fn build_runtime(
        config: &DaemonConfig,
        config_dir: &Path,
        event_tx: &DaemonEventSender,
        host: H,
        build_provider: &BuildProvider<P>,
    ) -> Result<Runtime<P, Env<H>>> {
        let (mut manifest, _warnings) = resolve_manifests(config_dir);
        // The daemon config decides what is disabled.
        manifest.disabled = config.disabled.clone();
        wcore::filter_disabled_external(&mut manifest.skill_dirs, &manifest.disabled.external);

        let model = build_provider(config)?;
        let env = build_env(config, config_dir, &manifest, host).await?;
        let tool_sender = build_tool_sender(event_tx);

        let mut built = Runtime::new(model, env, Some(tool_sender)).await;
        load_agents(&mut built, config, &manifest)?;
        Ok(built)
    }
}
/// Builds the default model from every provider in `config.provider` that
/// is not listed in `config.disabled.providers`.
///
/// The resulting registry is wrapped in `crate::provider::Retrying`, and a
/// summary of provider/model counts is logged on success.
///
/// # Errors
///
/// Fails if the provider registry cannot be constructed.
fn build_providers(config: &DaemonConfig) -> Result<Model<DefaultProvider>> {
    let mut enabled: HashMap<String, _> = HashMap::new();
    for (name, def) in config.provider.iter() {
        if config.disabled.providers.contains(name) {
            continue;
        }
        enabled.insert(name.clone(), def.clone());
    }

    // Gather log statistics before the map is handed to the registry.
    let provider_count = enabled.len();
    let mut model_count: usize = 0;
    for def in enabled.values() {
        model_count += def.models.len();
    }

    let registry = ProviderRegistry::from_provider_configs(&enabled, &HashMap::new(), |r| r)?;
    let retrying = crate::provider::Retrying::new(registry);
    tracing::info!(
        "provider registry initialized — {model_count} models across {provider_count} providers"
    );
    Ok(Model::new(retrying))
}
/// Builds the runtime environment: skills, MCP servers, memory, working
/// directory and host.
///
/// * Skills: a load failure is logged and replaced by the default
///   (`SkillHandler::default()`) handler rather than aborting.
/// * MCP servers: disabled servers are skipped; each remaining server gets
///   the entries of `config.env` for keys it does not already define.
/// * Memory: opened under `config_dir/memory` on filesystem storage.
/// * Working directory: the process's current directory, falling back to
///   `config_dir` when that cannot be determined.
async fn build_env<H: Host>(
    config: &DaemonConfig,
    config_dir: &Path,
    manifest: &ResolvedManifest,
    host: H,
) -> Result<Env<H>> {
    let skills = match SkillHandler::load(manifest.skill_dirs.clone(), &manifest.disabled.skills) {
        Ok(handler) => handler,
        Err(e) => {
            tracing::warn!("failed to load skills: {e}");
            SkillHandler::default()
        }
    };

    let mut mcp_servers = Vec::new();
    for (name, mcp) in manifest.mcps.iter() {
        if manifest.disabled.mcps.contains(name) {
            continue;
        }
        let mut server = mcp.clone();
        // Server-specific values win; config-wide env only fills the gaps.
        for (key, value) in &config.env {
            server.env.entry(key.clone()).or_insert_with(|| value.clone());
        }
        mcp_servers.push(server);
    }
    let mcp_handler = McpHandler::load(&mcp_servers).await;

    let memory_store = Memory::open(
        config_dir.join("memory"),
        config.system.memory.clone(),
        Box::new(runtime::memory::storage::FsStorage),
    );
    let memory = Some(memory_store);

    let cwd = match std::env::current_dir() {
        Ok(dir) => dir,
        Err(_) => config_dir.to_path_buf(),
    };

    Ok(Env::new(skills, mcp_handler, cwd, memory, host))
}
/// Creates the tool-request channel and spawns a task that forwards every
/// request to the daemon event channel as `DaemonEvent::ToolCall`.
///
/// The forwarding task stops when the tool channel yields no more requests
/// or when sending on the event channel fails.
fn build_tool_sender(event_tx: &DaemonEventSender) -> wcore::ToolSender {
    let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<ToolRequest>();
    let events = event_tx.clone();
    tokio::spawn(async move {
        loop {
            let Some(request) = receiver.recv().await else {
                break;
            };
            if events.send(DaemonEvent::ToolCall(request)).is_err() {
                break;
            }
        }
    });
    sender
}
/// Registers every agent on `runtime`, then registers a scope on the
/// runtime hook for each of them.
///
/// Agents come from three sources, added in this order:
///
/// 1. The built-in default agent, configured from `config.system.crab`, with
///    its name forced to `wcore::paths::DEFAULT_AGENT` and its prompt to
///    `SYSTEM_AGENT`.
/// 2. Agents declared in `manifest.agents` that have a matching prompt file.
///    A missing model falls back to the default agent's model, and plugin
///    skill references are expanded. Entries without a prompt file, or named
///    like the default agent, are skipped with a warning.
/// 3. Prompt files with no manifest entry, registered with the default
///    agent's model and `thinking` setting.
///
/// # Errors
///
/// Fails if the agent directories cannot be loaded or if
/// `system.crab.model` is not set.
fn load_agents<P: Provider + 'static, H: Host + 'static>(
    runtime: &mut Runtime<P, Env<H>>,
    config: &DaemonConfig,
    manifest: &ResolvedManifest,
) -> Result<()> {
    let prompts = crate::config::load_agents_dirs(&manifest.agent_dirs)?;
    let prompt_map: BTreeMap<String, String> = prompts.into_iter().collect();

    let default_model = config
        .system
        .crab
        .model
        .clone()
        .ok_or_else(|| anyhow::anyhow!("system.crab.model is required in config.toml"))?;

    // 1. Built-in default agent. The config is moved into the runtime; it is
    //    not needed afterwards, so no extra clone is made.
    let mut crab_config = config.system.crab.clone();
    crab_config.name = wcore::paths::DEFAULT_AGENT.to_owned();
    crab_config.system_prompt = SYSTEM_AGENT.to_owned();
    runtime.add_agent(crab_config);

    // 2. Agents declared in the manifest.
    for (name, agent_config) in &manifest.agents {
        if name == wcore::paths::DEFAULT_AGENT {
            tracing::warn!(
                "agents.{name} overrides the built-in system agent and will be ignored — \
                 configure it under [system.crab] instead"
            );
            continue;
        }
        let Some(prompt) = prompt_map.get(name) else {
            tracing::warn!("agent '{name}' in manifest has no matching .md file, skipping");
            continue;
        };
        let mut agent = agent_config.clone();
        agent.name = name.clone();
        agent.system_prompt = prompt.clone();
        if agent.model.is_none() {
            agent.model = Some(default_model.clone());
        }
        resolve_plugin_skills(&mut agent.skills, &manifest.plugin_skill_dirs);
        tracing::info!("registered agent '{name}' (thinking={})", agent.thinking);
        runtime.add_agent(agent);
    }

    // 3. Prompt files that have no manifest entry.
    let default_think = config.system.crab.thinking;
    for (stem, prompt) in &prompt_map {
        if stem == wcore::paths::DEFAULT_AGENT {
            tracing::warn!(
                "agents/{stem}.md shadows the built-in system agent and will be ignored"
            );
            continue;
        }
        // Already handled by the manifest pass above.
        if manifest.agents.contains_key(stem) {
            continue;
        }
        let mut agent = AgentConfig::new(stem.as_str());
        agent.system_prompt = prompt.clone();
        agent.thinking = default_think;
        agent.model = Some(default_model.clone());
        tracing::info!("registered agent '{stem}' (defaults, thinking={default_think})");
        runtime.add_agent(agent);
    }

    for agent_config in runtime.agents() {
        runtime
            .hook
            .register_scope(agent_config.name.clone(), &agent_config);
    }
    Ok(())
}