1use crate::{
4 Daemon, DaemonConfig,
5 config::{ResolvedManifest, resolve_manifests},
6 daemon::event::{DaemonEvent, DaemonEventSender},
7};
8use anyhow::Result;
9use crabllm_core::Provider;
10use crabllm_provider::{ProviderRegistry, RemoteProvider};
11use runtime::{Env, SkillHandler, host::Host, mcp::McpHandler, memory::Memory};
12use std::{
13 collections::{BTreeMap, HashMap},
14 path::{Path, PathBuf},
15 sync::Arc,
16};
17use tokio::sync::{Mutex, RwLock, broadcast};
18use wcore::{AgentConfig, Runtime, ToolRequest, model::Model};
19
20pub type DefaultProvider = crate::provider::Retrying<ProviderRegistry<RemoteProvider>>;
30
31pub type BuildProvider<P> =
35 Arc<dyn Fn(&DaemonConfig) -> Result<wcore::model::Model<P>> + Send + Sync>;
36
37pub fn build_default_provider(config: &DaemonConfig) -> Result<Model<DefaultProvider>> {
43 build_providers(config)
44}
45
46fn resolve_plugin_skills(skills: &mut Vec<String>, plugin_skill_dirs: &BTreeMap<String, PathBuf>) {
48 let mut resolved = Vec::new();
49 for entry in skills.drain(..) {
50 if entry.contains('/') {
51 if let Some(dir) = plugin_skill_dirs.get(&entry) {
52 match runtime::skill::loader::load_skills_dir(dir) {
53 Ok(registry) => {
54 for skill in ®istry.skills {
55 resolved.push(skill.name.clone());
56 }
57 }
58 Err(e) => {
59 tracing::warn!("failed to resolve plugin skills for '{entry}': {e}");
60 }
61 }
62 } else {
63 tracing::warn!("unknown plugin skill reference: '{entry}'");
64 }
65 } else {
66 resolved.push(entry);
67 }
68 }
69 *skills = resolved;
70}
71
72const SYSTEM_AGENT: &str = runtime::memory::DEFAULT_SOUL;
73
74impl<P: Provider + 'static, H: Host + 'static> Daemon<P, H> {
75 pub(crate) async fn build(
78 config: &DaemonConfig,
79 config_dir: &Path,
80 event_tx: DaemonEventSender,
81 shutdown_tx: broadcast::Sender<()>,
82 host: H,
83 build_provider: BuildProvider<P>,
84 ) -> Result<Self> {
85 let runtime =
86 Self::build_runtime(config, config_dir, &event_tx, host, &build_provider).await?;
87 let cron_store = crate::cron::CronStore::load(
88 config_dir.join("crons.toml"),
89 event_tx.clone(),
90 shutdown_tx,
91 );
92 let crons = Arc::new(Mutex::new(cron_store));
93 crons.lock().await.start_all(crons.clone());
94 let event_bus =
95 crate::event_bus::EventBus::load(config_dir.join("events.toml"), event_tx.clone());
96 let events = Arc::new(Mutex::new(event_bus));
97 Ok(Self {
98 runtime: Arc::new(RwLock::new(Arc::new(runtime))),
99 config_dir: config_dir.to_path_buf(),
100 event_tx,
101 started_at: std::time::Instant::now(),
102 crons,
103 events,
104 build_provider,
105 })
106 }
107
108 pub async fn reload(&self) -> Result<()> {
115 let config = DaemonConfig::load(&self.config_dir.join(wcore::paths::CONFIG_FILE))?;
116 let host = {
117 let old_rt = self.runtime.read().await;
118 old_rt.hook.host.clone()
119 };
120 let mut new_runtime = Self::build_runtime(
121 &config,
122 &self.config_dir,
123 &self.event_tx,
124 host,
125 &self.build_provider,
126 )
127 .await?;
128 {
129 let old_runtime = self.runtime.read().await;
130 (**old_runtime)
131 .transfer_conversations(&mut new_runtime)
132 .await;
133 }
134 *self.runtime.write().await = Arc::new(new_runtime);
135 tracing::info!("daemon reloaded");
136 Ok(())
137 }
138
139 async fn build_runtime(
142 config: &DaemonConfig,
143 config_dir: &Path,
144 event_tx: &DaemonEventSender,
145 host: H,
146 build_provider: &BuildProvider<P>,
147 ) -> Result<Runtime<P, Env<H>>> {
148 let (mut manifest, _warnings) = resolve_manifests(config_dir);
149 manifest.disabled = config.disabled.clone();
150 wcore::filter_disabled_external(&mut manifest.skill_dirs, &manifest.disabled.external);
151 let model = build_provider(config)?;
152 let hook = build_env(config, config_dir, &manifest, host).await?;
153 let tool_tx = build_tool_sender(event_tx);
154 let mut runtime = Runtime::new(model, hook, Some(tool_tx)).await;
155 load_agents(&mut runtime, config, &manifest)?;
156 Ok(runtime)
157 }
158}
159
160fn build_providers(config: &DaemonConfig) -> Result<Model<DefaultProvider>> {
165 let providers: HashMap<String, _> = config
168 .provider
169 .iter()
170 .filter(|(name, _)| !config.disabled.providers.contains(name))
171 .map(|(k, v)| (k.clone(), v.clone()))
172 .collect();
173 let provider_count = providers.len();
174 let model_count: usize = providers.values().map(|def| def.models.len()).sum();
175
176 let registry = ProviderRegistry::from_provider_configs(&providers, &HashMap::new(), |r| r)?;
177 let retrying = crate::provider::Retrying::new(registry);
178
179 tracing::info!(
180 "provider registry initialized — {model_count} models across {provider_count} providers"
181 );
182 Ok(Model::new(retrying))
183}
184
185async fn build_env<H: Host>(
187 config: &DaemonConfig,
188 config_dir: &Path,
189 manifest: &ResolvedManifest,
190 host: H,
191) -> Result<Env<H>> {
192 let skills = SkillHandler::load(manifest.skill_dirs.clone(), &manifest.disabled.skills)
193 .unwrap_or_else(|e| {
194 tracing::warn!("failed to load skills: {e}");
195 SkillHandler::default()
196 });
197
198 let mcp_servers: Vec<_> = manifest
200 .mcps
201 .iter()
202 .filter(|(name, _)| !manifest.disabled.mcps.contains(name))
203 .map(|(_, mcp)| {
204 let mut mcp = mcp.clone();
205 for (k, v) in &config.env {
206 mcp.env.entry(k.clone()).or_insert_with(|| v.clone());
207 }
208 mcp
209 })
210 .collect();
211 let mcp_handler = McpHandler::load(&mcp_servers).await;
212
213 let memory = Some(Memory::open(
214 config_dir.join("memory"),
215 config.system.memory.clone(),
216 Box::new(runtime::memory::storage::FsStorage),
217 ));
218
219 let cwd = std::env::current_dir().unwrap_or_else(|_| config_dir.to_path_buf());
220
221 Ok(Env::new(skills, mcp_handler, cwd, memory, host))
222}
223
224fn build_tool_sender(event_tx: &DaemonEventSender) -> wcore::ToolSender {
227 let (tool_tx, mut tool_rx) = tokio::sync::mpsc::unbounded_channel::<ToolRequest>();
228 let event_tx = event_tx.clone();
229 tokio::spawn(async move {
230 while let Some(req) = tool_rx.recv().await {
231 if event_tx.send(DaemonEvent::ToolCall(req)).is_err() {
232 break;
233 }
234 }
235 });
236 tool_tx
237}
238
239fn load_agents<P: Provider + 'static, H: Host + 'static>(
241 runtime: &mut Runtime<P, Env<H>>,
242 config: &DaemonConfig,
243 manifest: &ResolvedManifest,
244) -> Result<()> {
245 let prompts = crate::config::load_agents_dirs(&manifest.agent_dirs)?;
246 let prompt_map: BTreeMap<String, String> = prompts.into_iter().collect();
247
248 let default_model = config
252 .system
253 .crab
254 .model
255 .clone()
256 .ok_or_else(|| anyhow::anyhow!("system.crab.model is required in config.toml"))?;
257
258 let mut crab_config = config.system.crab.clone();
260 crab_config.name = wcore::paths::DEFAULT_AGENT.to_owned();
261 crab_config.system_prompt = SYSTEM_AGENT.to_owned();
262 runtime.add_agent(crab_config.clone());
263
264 for (name, agent_config) in &manifest.agents {
266 if name == wcore::paths::DEFAULT_AGENT {
267 tracing::warn!(
268 "agents.{name} overrides the built-in system agent and will be ignored — \
269 configure it under [system.crab] instead"
270 );
271 continue;
272 }
273 let Some(prompt) = prompt_map.get(name) else {
274 tracing::warn!("agent '{name}' in manifest has no matching .md file, skipping");
275 continue;
276 };
277 let mut agent = agent_config.clone();
278 agent.name = name.clone();
279 agent.system_prompt = prompt.clone();
280 if agent.model.is_none() {
281 agent.model = Some(default_model.clone());
282 }
283 resolve_plugin_skills(&mut agent.skills, &manifest.plugin_skill_dirs);
284 tracing::info!("registered agent '{name}' (thinking={})", agent.thinking);
285 runtime.add_agent(agent);
286 }
287
288 let default_think = config.system.crab.thinking;
290 for (stem, prompt) in &prompt_map {
291 if stem == wcore::paths::DEFAULT_AGENT {
292 tracing::warn!(
293 "agents/{stem}.md shadows the built-in system agent and will be ignored"
294 );
295 continue;
296 }
297 if manifest.agents.contains_key(stem) {
298 continue;
299 }
300 let mut agent = AgentConfig::new(stem.as_str());
301 agent.system_prompt = prompt.clone();
302 agent.thinking = default_think;
303 agent.model = Some(default_model.clone());
304 tracing::info!("registered agent '{stem}' (defaults, thinking={default_think})");
305 runtime.add_agent(agent);
306 }
307
308 for agent_config in runtime.agents() {
310 runtime
311 .hook
312 .register_scope(agent_config.name.clone(), &agent_config);
313 }
314
315 Ok(())
316}