Skip to main content

crabtalk_daemon/daemon/
builder.rs

1//! Daemon construction and lifecycle methods.
2
3use crate::{
4    Daemon, DaemonConfig,
5    config::{ResolvedManifest, resolve_manifests},
6    daemon::event::{DaemonEvent, DaemonEventSender},
7};
8use anyhow::Result;
9use crabllm_core::Provider;
10use crabllm_provider::{ProviderRegistry, RemoteProvider};
11use runtime::{Env, SkillHandler, host::Host, mcp::McpHandler, memory::Memory};
12use std::{
13    collections::{BTreeMap, HashMap},
14    path::{Path, PathBuf},
15    sync::Arc,
16};
17use tokio::sync::{Mutex, RwLock, broadcast};
18use wcore::{AgentConfig, Runtime, ToolRequest, model::Model};
19
/// The concrete provider type used by the default daemon: a `crabllm`
/// `ProviderRegistry<RemoteProvider>` wrapped in a `Retrying` layer.
///
/// The registry implements `crabllm_core::Provider` via model-name routing;
/// `Retrying` adds the exponential-backoff loop and per-call timeout that
/// the daemon expects from a production deployment.
///
/// Exposed (`pub`) so downstream consumers can name the type explicitly,
/// e.g. `Daemon<DefaultProvider, MyHost>`, or use it as a bound for helper
/// functions that thread `P` through without caring what it is.
pub type DefaultProvider = crate::provider::Retrying<ProviderRegistry<RemoteProvider>>;
30
31/// Closure that builds a `Model<P>` from a `DaemonConfig`. Stored on
32/// `Daemon` so `reload()` can call it with the freshly-loaded config.
33/// `Arc<dyn Fn>` so `Daemon` remains `Clone` regardless of concrete P.
34pub type BuildProvider<P> =
35    Arc<dyn Fn(&DaemonConfig) -> Result<wcore::model::Model<P>> + Send + Sync>;
36
37/// Construct the default `Model<DefaultProvider>` from a config.
38///
39/// This is the function the `Daemon::start` convenience path uses. Apple
40/// app and other library consumers supply their own closure with a
41/// different return type.
42pub fn build_default_provider(config: &DaemonConfig) -> Result<Model<DefaultProvider>> {
43    build_providers(config)
44}
45
46/// Resolve qualified plugin references in an agent's skill list.
47fn resolve_plugin_skills(skills: &mut Vec<String>, plugin_skill_dirs: &BTreeMap<String, PathBuf>) {
48    let mut resolved = Vec::new();
49    for entry in skills.drain(..) {
50        if entry.contains('/') {
51            if let Some(dir) = plugin_skill_dirs.get(&entry) {
52                match runtime::skill::loader::load_skills_dir(dir) {
53                    Ok(registry) => {
54                        for skill in &registry.skills {
55                            resolved.push(skill.name.clone());
56                        }
57                    }
58                    Err(e) => {
59                        tracing::warn!("failed to resolve plugin skills for '{entry}': {e}");
60                    }
61                }
62            } else {
63                tracing::warn!("unknown plugin skill reference: '{entry}'");
64            }
65        } else {
66            resolved.push(entry);
67        }
68    }
69    *skills = resolved;
70}
71
/// System prompt assigned to the built-in agent in `load_agents`; an alias
/// for `runtime::memory::DEFAULT_SOUL`.
const SYSTEM_AGENT: &str = runtime::memory::DEFAULT_SOUL;
73
74impl<P: Provider + 'static, H: Host + 'static> Daemon<P, H> {
75    /// Build a fully-configured [`Daemon`] from the given config, config
76    /// directory, event sender, backend, and provider-builder closure.
77    pub(crate) async fn build(
78        config: &DaemonConfig,
79        config_dir: &Path,
80        event_tx: DaemonEventSender,
81        shutdown_tx: broadcast::Sender<()>,
82        host: H,
83        build_provider: BuildProvider<P>,
84    ) -> Result<Self> {
85        let runtime =
86            Self::build_runtime(config, config_dir, &event_tx, host, &build_provider).await?;
87        let cron_store = crate::cron::CronStore::load(
88            config_dir.join("crons.toml"),
89            event_tx.clone(),
90            shutdown_tx,
91        );
92        let crons = Arc::new(Mutex::new(cron_store));
93        crons.lock().await.start_all(crons.clone());
94        let event_bus =
95            crate::event_bus::EventBus::load(config_dir.join("events.toml"), event_tx.clone());
96        let events = Arc::new(Mutex::new(event_bus));
97        Ok(Self {
98            runtime: Arc::new(RwLock::new(Arc::new(runtime))),
99            config_dir: config_dir.to_path_buf(),
100            event_tx,
101            started_at: std::time::Instant::now(),
102            crons,
103            events,
104            build_provider,
105        })
106    }
107
108    /// Rebuild the runtime from disk and swap it in atomically.
109    ///
110    /// Clones the backend from the current runtime so shared state
111    /// (channels, pending asks) is preserved across reloads. The
112    /// provider-builder closure stored on `Daemon` is re-run with the
113    /// fresh config to construct the new `Model<P>`.
114    pub async fn reload(&self) -> Result<()> {
115        let config = DaemonConfig::load(&self.config_dir.join(wcore::paths::CONFIG_FILE))?;
116        let host = {
117            let old_rt = self.runtime.read().await;
118            old_rt.hook.host.clone()
119        };
120        let mut new_runtime = Self::build_runtime(
121            &config,
122            &self.config_dir,
123            &self.event_tx,
124            host,
125            &self.build_provider,
126        )
127        .await?;
128        {
129            let old_runtime = self.runtime.read().await;
130            (**old_runtime)
131                .transfer_conversations(&mut new_runtime)
132                .await;
133        }
134        *self.runtime.write().await = Arc::new(new_runtime);
135        tracing::info!("daemon reloaded");
136        Ok(())
137    }
138
139    /// Construct a fresh [`Runtime`] from config with the given backend
140    /// and provider builder.
141    async fn build_runtime(
142        config: &DaemonConfig,
143        config_dir: &Path,
144        event_tx: &DaemonEventSender,
145        host: H,
146        build_provider: &BuildProvider<P>,
147    ) -> Result<Runtime<P, Env<H>>> {
148        let (mut manifest, _warnings) = resolve_manifests(config_dir);
149        manifest.disabled = config.disabled.clone();
150        wcore::filter_disabled_external(&mut manifest.skill_dirs, &manifest.disabled.external);
151        let model = build_provider(config)?;
152        let hook = build_env(config, config_dir, &manifest, host).await?;
153        let tool_tx = build_tool_sender(event_tx);
154        let mut runtime = Runtime::new(model, hook, Some(tool_tx)).await;
155        load_agents(&mut runtime, config, &manifest)?;
156        Ok(runtime)
157    }
158}
159
160/// Construct the provider registry from config, filtering out disabled
161/// providers. Returns the registry wrapped in `Retrying` (for retry +
162/// timeout) and then in `Model<P>` so the caller can hand it directly to
163/// `Runtime::new`.
164fn build_providers(config: &DaemonConfig) -> Result<Model<DefaultProvider>> {
165    // Filter out disabled providers and convert from BTreeMap to HashMap
166    // (crabllm's `from_provider_configs` takes a HashMap).
167    let providers: HashMap<String, _> = config
168        .provider
169        .iter()
170        .filter(|(name, _)| !config.disabled.providers.contains(name))
171        .map(|(k, v)| (k.clone(), v.clone()))
172        .collect();
173    let provider_count = providers.len();
174    let model_count: usize = providers.values().map(|def| def.models.len()).sum();
175
176    let registry = ProviderRegistry::from_provider_configs(&providers, &HashMap::new(), |r| r)?;
177    let retrying = crate::provider::Retrying::new(registry);
178
179    tracing::info!(
180        "provider registry initialized — {model_count} models across {provider_count} providers"
181    );
182    Ok(Model::new(retrying))
183}
184
185/// Build the engine environment with all backends (skills, MCP, memory).
186async fn build_env<H: Host>(
187    config: &DaemonConfig,
188    config_dir: &Path,
189    manifest: &ResolvedManifest,
190    host: H,
191) -> Result<Env<H>> {
192    let skills = SkillHandler::load(manifest.skill_dirs.clone(), &manifest.disabled.skills)
193        .unwrap_or_else(|e| {
194            tracing::warn!("failed to load skills: {e}");
195            SkillHandler::default()
196        });
197
198    // Inject [env] from config.toml into each MCP's env map, skipping disabled.
199    let mcp_servers: Vec<_> = manifest
200        .mcps
201        .iter()
202        .filter(|(name, _)| !manifest.disabled.mcps.contains(name))
203        .map(|(_, mcp)| {
204            let mut mcp = mcp.clone();
205            for (k, v) in &config.env {
206                mcp.env.entry(k.clone()).or_insert_with(|| v.clone());
207            }
208            mcp
209        })
210        .collect();
211    let mcp_handler = McpHandler::load(&mcp_servers).await;
212
213    let memory = Some(Memory::open(
214        config_dir.join("memory"),
215        config.system.memory.clone(),
216        Box::new(runtime::memory::storage::FsStorage),
217    ));
218
219    let cwd = std::env::current_dir().unwrap_or_else(|_| config_dir.to_path_buf());
220
221    Ok(Env::new(skills, mcp_handler, cwd, memory, host))
222}
223
224/// Build a [`ToolSender`] that forwards [`ToolRequest`]s into the daemon
225/// event loop as [`DaemonEvent::ToolCall`] variants.
226fn build_tool_sender(event_tx: &DaemonEventSender) -> wcore::ToolSender {
227    let (tool_tx, mut tool_rx) = tokio::sync::mpsc::unbounded_channel::<ToolRequest>();
228    let event_tx = event_tx.clone();
229    tokio::spawn(async move {
230        while let Some(req) = tool_rx.recv().await {
231            if event_tx.send(DaemonEvent::ToolCall(req)).is_err() {
232                break;
233            }
234        }
235    });
236    tool_tx
237}
238
239/// Load agents and add them to the runtime.
240fn load_agents<P: Provider + 'static, H: Host + 'static>(
241    runtime: &mut Runtime<P, Env<H>>,
242    config: &DaemonConfig,
243    manifest: &ResolvedManifest,
244) -> Result<()> {
245    let prompts = crate::config::load_agents_dirs(&manifest.agent_dirs)?;
246    let prompt_map: BTreeMap<String, String> = prompts.into_iter().collect();
247
248    // The daemon-wide default model. Required because every agent must
249    // resolve to a concrete model name at registration time — there is no
250    // longer a runtime fallback in the registry.
251    let default_model = config
252        .system
253        .crab
254        .model
255        .clone()
256        .ok_or_else(|| anyhow::anyhow!("system.crab.model is required in config.toml"))?;
257
258    // Built-in crab agent.
259    let mut crab_config = config.system.crab.clone();
260    crab_config.name = wcore::paths::DEFAULT_AGENT.to_owned();
261    crab_config.system_prompt = SYSTEM_AGENT.to_owned();
262    runtime.add_agent(crab_config.clone());
263
264    // Sub-agents from manifests.
265    for (name, agent_config) in &manifest.agents {
266        if name == wcore::paths::DEFAULT_AGENT {
267            tracing::warn!(
268                "agents.{name} overrides the built-in system agent and will be ignored — \
269                 configure it under [system.crab] instead"
270            );
271            continue;
272        }
273        let Some(prompt) = prompt_map.get(name) else {
274            tracing::warn!("agent '{name}' in manifest has no matching .md file, skipping");
275            continue;
276        };
277        let mut agent = agent_config.clone();
278        agent.name = name.clone();
279        agent.system_prompt = prompt.clone();
280        if agent.model.is_none() {
281            agent.model = Some(default_model.clone());
282        }
283        resolve_plugin_skills(&mut agent.skills, &manifest.plugin_skill_dirs);
284        tracing::info!("registered agent '{name}' (thinking={})", agent.thinking);
285        runtime.add_agent(agent);
286    }
287
288    // Also register agents that have .md files but no manifest entry.
289    let default_think = config.system.crab.thinking;
290    for (stem, prompt) in &prompt_map {
291        if stem == wcore::paths::DEFAULT_AGENT {
292            tracing::warn!(
293                "agents/{stem}.md shadows the built-in system agent and will be ignored"
294            );
295            continue;
296        }
297        if manifest.agents.contains_key(stem) {
298            continue;
299        }
300        let mut agent = AgentConfig::new(stem.as_str());
301        agent.system_prompt = prompt.clone();
302        agent.thinking = default_think;
303        agent.model = Some(default_model.clone());
304        tracing::info!("registered agent '{stem}' (defaults, thinking={default_think})");
305        runtime.add_agent(agent);
306    }
307
308    // Populate per-agent scope maps.
309    for agent_config in runtime.agents() {
310        runtime
311            .hook
312            .register_scope(agent_config.name.clone(), &agent_config);
313    }
314
315    Ok(())
316}