Skip to main content

crabtalk_daemon/daemon/
builder.rs

1//! Daemon construction and lifecycle methods.
2
3use crate::hook::DaemonHook;
4use crate::{
5    Daemon, DaemonConfig,
6    config::{ResolvedManifest, resolve_manifests},
7    daemon::event::{DaemonEvent, DaemonEventSender},
8    hook::bridge::DaemonBridge,
9};
10use anyhow::Result;
11use model::ProviderRegistry;
12use runtime::{RuntimeHook, SkillHandler, mcp::McpHandler, memory::Memory};
13use std::{
14    collections::{BTreeMap, HashMap},
15    path::{Path, PathBuf},
16    sync::Arc,
17};
18use tokio::sync::{Mutex, RwLock, broadcast};
19use wcore::{AgentConfig, Runtime, ToolRequest};
20
21/// Resolve qualified package references in an agent's skill list.
22fn resolve_package_skills(
23    skills: &mut Vec<String>,
24    package_skill_dirs: &BTreeMap<String, PathBuf>,
25) {
26    let mut resolved = Vec::new();
27    for entry in skills.drain(..) {
28        if entry.contains('/') {
29            if let Some(dir) = package_skill_dirs.get(&entry) {
30                match runtime::skill::loader::load_skills_dir(dir) {
31                    Ok(registry) => {
32                        for skill in registry.skills() {
33                            resolved.push(skill.name.clone());
34                        }
35                    }
36                    Err(e) => {
37                        tracing::warn!("failed to resolve package skills for '{entry}': {e}");
38                    }
39                }
40            } else {
41                tracing::warn!("unknown package skill reference: '{entry}'");
42            }
43        } else {
44            resolved.push(entry);
45        }
46    }
47    *skills = resolved;
48}
49
50const SYSTEM_AGENT: &str = runtime::memory::DEFAULT_SOUL;
51
52impl Daemon {
53    /// Build a fully-configured [`Daemon`] from the given config, config
54    /// directory, and event sender.
55    pub(crate) async fn build(
56        config: &DaemonConfig,
57        config_dir: &Path,
58        event_tx: DaemonEventSender,
59        shutdown_tx: broadcast::Sender<()>,
60    ) -> Result<Self> {
61        let runtime = Self::build_runtime(config, config_dir, &event_tx).await?;
62        let cron_store = crate::cron::CronStore::load(
63            config_dir.join("crons.toml"),
64            event_tx.clone(),
65            shutdown_tx,
66        );
67        let crons = Arc::new(Mutex::new(cron_store));
68        crons.lock().await.start_all(crons.clone());
69        Ok(Self {
70            runtime: Arc::new(RwLock::new(Arc::new(runtime))),
71            config_dir: config_dir.to_path_buf(),
72            event_tx,
73            started_at: std::time::Instant::now(),
74            crons,
75        })
76    }
77
78    /// Rebuild the runtime from disk and swap it in atomically.
79    pub async fn reload(&self) -> Result<()> {
80        let config = DaemonConfig::load(&self.config_dir.join(wcore::paths::CONFIG_FILE))?;
81        let mut new_runtime =
82            Self::build_runtime(&config, &self.config_dir, &self.event_tx).await?;
83
84        {
85            let old_runtime = self.runtime.read().await;
86            (**old_runtime).transfer_sessions(&mut new_runtime).await;
87        }
88
89        *self.runtime.write().await = Arc::new(new_runtime);
90        tracing::info!("daemon reloaded");
91        Ok(())
92    }
93
94    /// Construct a fresh [`Runtime`] from config.
95    async fn build_runtime(
96        config: &DaemonConfig,
97        config_dir: &Path,
98        event_tx: &DaemonEventSender,
99    ) -> Result<Runtime<ProviderRegistry, DaemonHook>> {
100        let manager = Self::build_providers(config)?;
101        let (manifest, _warnings) = resolve_manifests(config_dir);
102        let hook = Self::build_hook(config, config_dir, &manifest, event_tx).await?;
103        let tool_tx = Self::build_tool_sender(event_tx);
104        let mut runtime = Runtime::new(manager, hook, Some(tool_tx)).await;
105        Self::load_agents(&mut runtime, config, &manifest)?;
106        Ok(runtime)
107    }
108
109    /// Construct the provider registry from config.
110    fn build_providers(config: &DaemonConfig) -> Result<ProviderRegistry> {
111        let active_model = config
112            .system
113            .crab
114            .model
115            .clone()
116            .ok_or_else(|| anyhow::anyhow!("system.crab.model is required in config.toml"))?;
117        let registry = ProviderRegistry::from_providers(active_model.clone(), &config.provider)?;
118
119        tracing::info!(
120            "provider registry initialized — active model: {}",
121            registry.active_model_name().unwrap_or_default()
122        );
123        Ok(registry)
124    }
125
126    /// Build the daemon hook with all backends (skills, MCP, tasks, memory).
127    async fn build_hook(
128        config: &DaemonConfig,
129        config_dir: &Path,
130        manifest: &ResolvedManifest,
131        event_tx: &DaemonEventSender,
132    ) -> Result<DaemonHook> {
133        let skills = SkillHandler::load(manifest.skill_dirs.clone()).unwrap_or_else(|e| {
134            tracing::warn!("failed to load skills: {e}");
135            SkillHandler::default()
136        });
137
138        // Inject [env] from config.toml into each MCP's env map.
139        let mcp_servers: Vec<_> = manifest
140            .mcps
141            .values()
142            .map(|mcp| {
143                let mut mcp = mcp.clone();
144                for (k, v) in &config.env {
145                    mcp.env.entry(k.clone()).or_insert_with(|| v.clone());
146                }
147                mcp
148            })
149            .collect();
150        let mcp_handler = McpHandler::load(&mcp_servers).await;
151
152        let memory = Some(Memory::open(
153            config_dir.join("memory"),
154            config.system.memory.clone(),
155            Box::new(runtime::memory::storage::FsStorage),
156        ));
157
158        let cwd = std::env::current_dir().unwrap_or_else(|_| config_dir.to_path_buf());
159
160        let (events_tx, _) = tokio::sync::broadcast::channel(256);
161        let bridge = DaemonBridge {
162            event_tx: event_tx.clone(),
163            pending_asks: Arc::new(Mutex::new(HashMap::new())),
164            session_cwds: Arc::new(Mutex::new(HashMap::new())),
165            events_tx,
166        };
167
168        Ok(RuntimeHook::new(skills, mcp_handler, cwd, memory, bridge))
169    }
170
171    /// Build a [`ToolSender`] that forwards [`ToolRequest`]s into the daemon
172    /// event loop as [`DaemonEvent::ToolCall`] variants.
173    fn build_tool_sender(event_tx: &DaemonEventSender) -> wcore::ToolSender {
174        let (tool_tx, mut tool_rx) = tokio::sync::mpsc::unbounded_channel::<ToolRequest>();
175        let event_tx = event_tx.clone();
176        tokio::spawn(async move {
177            while let Some(req) = tool_rx.recv().await {
178                if event_tx.send(DaemonEvent::ToolCall(req)).is_err() {
179                    break;
180                }
181            }
182        });
183        tool_tx
184    }
185
186    /// Load agents and add them to the runtime.
187    fn load_agents(
188        runtime: &mut Runtime<ProviderRegistry, DaemonHook>,
189        config: &DaemonConfig,
190        manifest: &ResolvedManifest,
191    ) -> Result<()> {
192        let prompts = crate::config::load_agents_dirs(&manifest.agent_dirs)?;
193        let prompt_map: BTreeMap<String, String> = prompts.into_iter().collect();
194
195        // Built-in crab agent.
196        let mut crab_config = config.system.crab.clone();
197        crab_config.name = wcore::paths::DEFAULT_AGENT.to_owned();
198        crab_config.system_prompt = SYSTEM_AGENT.to_owned();
199        runtime.add_agent(crab_config);
200
201        // Sub-agents from manifests.
202        for (name, agent_config) in &manifest.agents {
203            if name == wcore::paths::DEFAULT_AGENT {
204                tracing::warn!(
205                    "agents.{name} overrides the built-in system agent and will be ignored — \
206                     configure it under [system.crab] instead"
207                );
208                continue;
209            }
210            let Some(prompt) = prompt_map.get(name) else {
211                tracing::warn!("agent '{name}' in manifest has no matching .md file, skipping");
212                continue;
213            };
214            let mut agent = agent_config.clone();
215            agent.name = name.clone();
216            agent.system_prompt = prompt.clone();
217            resolve_package_skills(&mut agent.skills, &manifest.package_skill_dirs);
218            tracing::info!("registered agent '{name}' (thinking={})", agent.thinking);
219            runtime.add_agent(agent);
220        }
221
222        // Also register agents that have .md files but no manifest entry.
223        let default_think = config.system.crab.thinking;
224        for (stem, prompt) in &prompt_map {
225            if stem == wcore::paths::DEFAULT_AGENT {
226                tracing::warn!(
227                    "agents/{stem}.md shadows the built-in system agent and will be ignored"
228                );
229                continue;
230            }
231            if manifest.agents.contains_key(stem) {
232                continue;
233            }
234            let mut agent = AgentConfig::new(stem.as_str());
235            agent.system_prompt = prompt.clone();
236            agent.thinking = default_think;
237            tracing::info!("registered agent '{stem}' (defaults, thinking={default_think})");
238            runtime.add_agent(agent);
239        }
240
241        // Populate per-agent scope maps.
242        for agent_config in runtime.agents() {
243            runtime
244                .hook
245                .register_scope(agent_config.name.clone(), &agent_config);
246        }
247
248        Ok(())
249    }
250}