crabtalk_daemon/daemon/
builder.rs1use crate::hook::DaemonHook;
4use crate::{
5 Daemon, DaemonConfig,
6 config::{ResolvedManifest, resolve_manifests},
7 daemon::event::{DaemonEvent, DaemonEventSender},
8 hook::bridge::DaemonBridge,
9};
10use anyhow::Result;
11use model::ProviderRegistry;
12use runtime::{RuntimeHook, SkillHandler, mcp::McpHandler, memory::Memory};
13use std::{
14 collections::{BTreeMap, HashMap},
15 path::{Path, PathBuf},
16 sync::Arc,
17};
18use tokio::sync::{Mutex, RwLock, broadcast};
19use wcore::{AgentConfig, Runtime, ToolRequest};
20
21fn resolve_package_skills(
23 skills: &mut Vec<String>,
24 package_skill_dirs: &BTreeMap<String, PathBuf>,
25) {
26 let mut resolved = Vec::new();
27 for entry in skills.drain(..) {
28 if entry.contains('/') {
29 if let Some(dir) = package_skill_dirs.get(&entry) {
30 match runtime::skill::loader::load_skills_dir(dir) {
31 Ok(registry) => {
32 for skill in registry.skills() {
33 resolved.push(skill.name.clone());
34 }
35 }
36 Err(e) => {
37 tracing::warn!("failed to resolve package skills for '{entry}': {e}");
38 }
39 }
40 } else {
41 tracing::warn!("unknown package skill reference: '{entry}'");
42 }
43 } else {
44 resolved.push(entry);
45 }
46 }
47 *skills = resolved;
48}
49
50const SYSTEM_AGENT: &str = runtime::memory::DEFAULT_SOUL;
51
52impl Daemon {
53 pub(crate) async fn build(
56 config: &DaemonConfig,
57 config_dir: &Path,
58 event_tx: DaemonEventSender,
59 shutdown_tx: broadcast::Sender<()>,
60 ) -> Result<Self> {
61 let runtime = Self::build_runtime(config, config_dir, &event_tx).await?;
62 let cron_store = crate::cron::CronStore::load(
63 config_dir.join("crons.toml"),
64 event_tx.clone(),
65 shutdown_tx,
66 );
67 let crons = Arc::new(Mutex::new(cron_store));
68 crons.lock().await.start_all(crons.clone());
69 Ok(Self {
70 runtime: Arc::new(RwLock::new(Arc::new(runtime))),
71 config_dir: config_dir.to_path_buf(),
72 event_tx,
73 started_at: std::time::Instant::now(),
74 crons,
75 })
76 }
77
78 pub async fn reload(&self) -> Result<()> {
80 let config = DaemonConfig::load(&self.config_dir.join(wcore::paths::CONFIG_FILE))?;
81 let mut new_runtime =
82 Self::build_runtime(&config, &self.config_dir, &self.event_tx).await?;
83
84 {
85 let old_runtime = self.runtime.read().await;
86 (**old_runtime).transfer_sessions(&mut new_runtime).await;
87 }
88
89 *self.runtime.write().await = Arc::new(new_runtime);
90 tracing::info!("daemon reloaded");
91 Ok(())
92 }
93
94 async fn build_runtime(
96 config: &DaemonConfig,
97 config_dir: &Path,
98 event_tx: &DaemonEventSender,
99 ) -> Result<Runtime<ProviderRegistry, DaemonHook>> {
100 let manager = Self::build_providers(config)?;
101 let (manifest, _warnings) = resolve_manifests(config_dir);
102 let hook = Self::build_hook(config, config_dir, &manifest, event_tx).await?;
103 let tool_tx = Self::build_tool_sender(event_tx);
104 let mut runtime = Runtime::new(manager, hook, Some(tool_tx)).await;
105 Self::load_agents(&mut runtime, config, &manifest)?;
106 Ok(runtime)
107 }
108
109 fn build_providers(config: &DaemonConfig) -> Result<ProviderRegistry> {
111 let active_model = config
112 .system
113 .crab
114 .model
115 .clone()
116 .ok_or_else(|| anyhow::anyhow!("system.crab.model is required in config.toml"))?;
117 let registry = ProviderRegistry::from_providers(active_model.clone(), &config.provider)?;
118
119 tracing::info!(
120 "provider registry initialized — active model: {}",
121 registry.active_model_name().unwrap_or_default()
122 );
123 Ok(registry)
124 }
125
126 async fn build_hook(
128 config: &DaemonConfig,
129 config_dir: &Path,
130 manifest: &ResolvedManifest,
131 event_tx: &DaemonEventSender,
132 ) -> Result<DaemonHook> {
133 let skills = SkillHandler::load(manifest.skill_dirs.clone()).unwrap_or_else(|e| {
134 tracing::warn!("failed to load skills: {e}");
135 SkillHandler::default()
136 });
137
138 let mcp_servers: Vec<_> = manifest
140 .mcps
141 .values()
142 .map(|mcp| {
143 let mut mcp = mcp.clone();
144 for (k, v) in &config.env {
145 mcp.env.entry(k.clone()).or_insert_with(|| v.clone());
146 }
147 mcp
148 })
149 .collect();
150 let mcp_handler = McpHandler::load(&mcp_servers).await;
151
152 let memory = Some(Memory::open(
153 config_dir.join("memory"),
154 config.system.memory.clone(),
155 Box::new(runtime::memory::storage::FsStorage),
156 ));
157
158 let cwd = std::env::current_dir().unwrap_or_else(|_| config_dir.to_path_buf());
159
160 let (events_tx, _) = tokio::sync::broadcast::channel(256);
161 let bridge = DaemonBridge {
162 event_tx: event_tx.clone(),
163 pending_asks: Arc::new(Mutex::new(HashMap::new())),
164 session_cwds: Arc::new(Mutex::new(HashMap::new())),
165 events_tx,
166 };
167
168 Ok(RuntimeHook::new(skills, mcp_handler, cwd, memory, bridge))
169 }
170
171 fn build_tool_sender(event_tx: &DaemonEventSender) -> wcore::ToolSender {
174 let (tool_tx, mut tool_rx) = tokio::sync::mpsc::unbounded_channel::<ToolRequest>();
175 let event_tx = event_tx.clone();
176 tokio::spawn(async move {
177 while let Some(req) = tool_rx.recv().await {
178 if event_tx.send(DaemonEvent::ToolCall(req)).is_err() {
179 break;
180 }
181 }
182 });
183 tool_tx
184 }
185
186 fn load_agents(
188 runtime: &mut Runtime<ProviderRegistry, DaemonHook>,
189 config: &DaemonConfig,
190 manifest: &ResolvedManifest,
191 ) -> Result<()> {
192 let prompts = crate::config::load_agents_dirs(&manifest.agent_dirs)?;
193 let prompt_map: BTreeMap<String, String> = prompts.into_iter().collect();
194
195 let mut crab_config = config.system.crab.clone();
197 crab_config.name = wcore::paths::DEFAULT_AGENT.to_owned();
198 crab_config.system_prompt = SYSTEM_AGENT.to_owned();
199 runtime.add_agent(crab_config);
200
201 for (name, agent_config) in &manifest.agents {
203 if name == wcore::paths::DEFAULT_AGENT {
204 tracing::warn!(
205 "agents.{name} overrides the built-in system agent and will be ignored — \
206 configure it under [system.crab] instead"
207 );
208 continue;
209 }
210 let Some(prompt) = prompt_map.get(name) else {
211 tracing::warn!("agent '{name}' in manifest has no matching .md file, skipping");
212 continue;
213 };
214 let mut agent = agent_config.clone();
215 agent.name = name.clone();
216 agent.system_prompt = prompt.clone();
217 resolve_package_skills(&mut agent.skills, &manifest.package_skill_dirs);
218 tracing::info!("registered agent '{name}' (thinking={})", agent.thinking);
219 runtime.add_agent(agent);
220 }
221
222 let default_think = config.system.crab.thinking;
224 for (stem, prompt) in &prompt_map {
225 if stem == wcore::paths::DEFAULT_AGENT {
226 tracing::warn!(
227 "agents/{stem}.md shadows the built-in system agent and will be ignored"
228 );
229 continue;
230 }
231 if manifest.agents.contains_key(stem) {
232 continue;
233 }
234 let mut agent = AgentConfig::new(stem.as_str());
235 agent.system_prompt = prompt.clone();
236 agent.thinking = default_think;
237 tracing::info!("registered agent '{stem}' (defaults, thinking={default_think})");
238 runtime.add_agent(agent);
239 }
240
241 for agent_config in runtime.agents() {
243 runtime
244 .hook
245 .register_scope(agent_config.name.clone(), &agent_config);
246 }
247
248 Ok(())
249 }
250}