1use std::future::Future;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant};
9
10use daemon8_store::{
11 ActiveSessionState, DebugSessionStore, LensManager, LibrarianStore, MemoryStore, StateModel,
12};
13use daemon8_types::{Checkpoint, DevicePlatform, Filter, Observation, SourceActivator};
14use rmcp::handler::server::router::tool::ToolRouter;
15use rmcp::handler::server::wrapper::Parameters;
16use rmcp::model::{Implementation, ServerCapabilities, ServerInfo, Tool};
17use rmcp::schemars::{self, JsonSchema};
18use rmcp::{RoleServer, ServerHandler, tool, tool_router};
19use serde::Deserialize;
20use tokio::sync::broadcast;
21use tracing::Instrument;
22
23pub mod envelope;
24pub mod help;
25use envelope::{ActiveSessionEcho, DaemonMeta};
26use help::FeatureGate;
27
28const INSTRUCTIONS: &str = include_str!("../tool_descriptions/instructions.md");
/// Process-wide counter backing MCP session identifiers; the first id issued is 1.
static MCP_SESSION_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Returns the next MCP session identifier, formatted as `mcp-<n>`.
///
/// Every call atomically advances the shared counter, so consecutive calls
/// yield consecutive numbers.
fn next_mcp_session_id() -> String {
    format!(
        "mcp-{}",
        MCP_SESSION_COUNTER.fetch_add(1, Ordering::Relaxed)
    )
}
35
/// Value produced by a device screenshot capture.
pub struct DeviceScreenshotResult {
    /// Raw image bytes; the field name indicates PNG encoding.
    pub png_bytes: Vec<u8>,
    /// Free-form label describing where the capture came from.
    /// NOTE(review): the set of values is chosen by the capture callback, which is not visible here.
    pub source: String,
}
40
41pub type DeviceScreenshotFn = Arc<
45 dyn Fn(
46 String,
47 DevicePlatform,
48 ) -> Pin<Box<dyn Future<Output = anyhow::Result<DeviceScreenshotResult>> + Send>>
49 + Send
50 + Sync,
51>;
52
53#[derive(Debug)]
54pub enum ChromeCommand {
55 Connect { endpoint: String },
56 Action(daemon8_chrome::BrowserAction),
57}
58
59#[derive(Debug, Deserialize, JsonSchema)]
60pub struct ObserveParams {
61 #[schemars(
62 description = "Filter by observation kind: log, query, http_exchange, exception, js_exception, lifecycle, state_snapshot, metric, custom. Browser console output is 'log', browser JS errors are 'js_exception', page load events are 'lifecycle', network requests are 'http_exchange'."
63 )]
64 pub kinds: Option<Vec<String>>,
65
66 #[schemars(description = "Minimum severity threshold: trace, debug, info, warn, error")]
67 pub severity_min: Option<String>,
68
69 #[schemars(
70 description = "Filter by origin pattern: 'app' or 'app:name' for applications, 'browser' or 'browser:tab_id' for browser tabs, 'device' or 'device:serial' for devices. Omit to see all origins."
71 )]
72 pub origins: Option<Vec<String>>,
73
74 #[schemars(description = "Search across materialized observation text")]
75 pub text_match: Option<String>,
76
77 #[schemars(description = "Return only observations after this checkpoint id")]
78 pub since_checkpoint: Option<u64>,
79
80 #[schemars(description = "Maximum number of results to return (default 50)")]
81 pub limit: Option<usize>,
82
83 #[schemars(description = "Filter by correlation ID (exact match)")]
84 pub correlation_id: Option<String>,
85
86 #[schemars(description = "Filter by tags (all listed tags must be present)")]
87 pub tags: Option<Vec<String>>,
88
89 #[schemars(
90 description = "Include system/infrastructure observations (tagged '_system'). These are excluded by default to reduce noise from CLI hooks and internal tooling."
91 )]
92 pub include_system: Option<bool>,
93}
94
95#[derive(Debug, Deserialize, JsonSchema)]
96pub struct ConnectParams {
97 #[schemars(description = "Browser DevTools endpoint URL (default http://localhost:9222)")]
98 pub endpoint: String,
99}
100
101pub use daemon8_types::DebugAction;
102
103#[derive(Debug, Deserialize, JsonSchema)]
104#[serde(rename_all = "snake_case")]
105pub enum NetworkPreset {
106 Offline,
107 #[serde(rename = "slow-3g")]
108 Slow3g,
109 #[serde(rename = "fast-3g")]
110 Fast3g,
111 Restore,
112}
113
114impl NetworkPreset {
115 fn as_str(&self) -> &'static str {
116 match self {
117 Self::Offline => "offline",
118 Self::Slow3g => "slow-3g",
119 Self::Fast3g => "fast-3g",
120 Self::Restore => "restore",
121 }
122 }
123}
124
125#[derive(Debug, Deserialize, JsonSchema)]
126#[serde(rename_all = "snake_case")]
127pub enum StoreType {
128 Localstorage,
129 Sessionstorage,
130 Cookie,
131}
132
133impl StoreType {
134 fn as_str(&self) -> &'static str {
135 match self {
136 Self::Localstorage => "localstorage",
137 Self::Sessionstorage => "sessionstorage",
138 Self::Cookie => "cookie",
139 }
140 }
141}
142
143#[derive(Debug, Deserialize, JsonSchema)]
144pub struct ActParams {
145 pub action: DebugAction,
146 #[schemars(description = "Target tab ID (omit for first/default tab)")]
147 pub tab_id: Option<String>,
148 #[schemars(description = "JavaScript expression to evaluate (for eval_js)")]
149 pub expression: Option<String>,
150 #[schemars(description = "CSS selector for element screenshot (for screenshot)")]
151 pub selector: Option<String>,
152 #[schemars(description = "CSS text to inject (for inject_css)")]
153 pub css: Option<String>,
154 #[schemars(description = "Track injected CSS for later revert (for inject_css, default true)")]
155 pub temporary: Option<bool>,
156 #[schemars(
157 description = "Device serial for device screenshot (e.g. 'emulator-5554'). When provided with action='screenshot', captures from the device instead of the browser. Uses host window capture for emulators, ADB for physical devices."
158 )]
159 pub device_serial: Option<String>,
160 #[schemars(
161 description = "Device platform hint: 'android' or 'vega'. Used with device_serial to select the right capture method. Defaults to 'android'."
162 )]
163 pub device_platform: Option<String>,
164 #[schemars(
165 description = "Viewport width in CSS pixels (for set_viewport). iPhone 15=390, Pixel 8=412, iPad=820, desktop=1280"
166 )]
167 pub viewport_width: Option<u32>,
168 #[schemars(
169 description = "Viewport height in CSS pixels (for set_viewport). iPhone 15=844, Pixel 8=915, iPad=1180, desktop=800"
170 )]
171 pub viewport_height: Option<u32>,
172 #[schemars(
173 description = "Device pixel ratio / scale factor (for set_viewport). iPhone 15=3.0, Pixel 8=2.625, iPad=2.0, desktop=1.0"
174 )]
175 pub viewport_scale: Option<f64>,
176 #[schemars(
177 description = "Enable mobile emulation with touch events (for set_viewport). true for mobile devices, false for desktop"
178 )]
179 pub viewport_mobile: Option<bool>,
180 #[schemars(description = "User-agent string override (for set_viewport, optional)")]
181 pub viewport_ua: Option<String>,
182 #[schemars(
183 description = "Network preset for network_conditions. offline=no connectivity, slow-3g=400ms/780Kbps, fast-3g=150ms/1.6Mbps, restore=remove throttling"
184 )]
185 pub network_preset: Option<NetworkPreset>,
186 #[schemars(description = "Storage type for storage_set")]
187 pub store_type: Option<StoreType>,
188 #[schemars(description = "Storage key to read or write (for storage_set)")]
189 pub storage_key: Option<String>,
190 #[schemars(description = "Storage value to write (for storage_set)")]
191 pub storage_value: Option<String>,
192 #[schemars(
193 description = "Comma-separated storage types to clear (for storage_clear): 'cookies', 'local_storage', 'session_storage', 'indexeddb', 'cache_storage', 'service_workers', 'all'. Default: 'all'"
194 )]
195 pub storage_types: Option<String>,
196 #[schemars(description = "X coordinate in CSS pixels (for element_at_point)")]
197 pub x: Option<f64>,
198 #[schemars(description = "Y coordinate in CSS pixels (for element_at_point)")]
199 pub y: Option<f64>,
200 #[schemars(description = "URL to navigate to (for navigate)")]
201 pub url: Option<String>,
202}
203
204#[derive(Debug, Deserialize, JsonSchema)]
205pub struct IngestParams {
206 #[schemars(
207 description = "Your agent or application name (e.g. 'my-agent'). Used for filtering with origins=['app:name']."
208 )]
209 pub app: Option<String>,
210
211 #[schemars(
212 description = "Observation kind: log, metric, query, exception, custom. Defaults to log."
213 )]
214 pub kind: Option<String>,
215
216 #[schemars(
217 description = "Severity: trace, debug, info, warn, error. Defaults to debug. Setting warn or error triggers a real-time alert push to all connected agent sessions."
218 )]
219 pub severity: Option<String>,
220
221 #[schemars(
222 description = "The observation payload (JSON object). Use a 'message' key for clean alert formatting: {\"message\": \"what happened\"}. Additional fields are preserved."
223 )]
224 pub data: serde_json::Value,
225
226 #[schemars(description = "Channel name for custom kind observations.")]
227 pub channel: Option<String>,
228
229 #[schemars(description = "Correlation ID to group related observations across sources")]
230 pub correlation_id: Option<String>,
231
232 #[schemars(description = "Parent observation ID for causal chains")]
233 pub parent_id: Option<u64>,
234
235 #[schemars(
236 description = "Tags for categorization (e.g. [\"reasoning\", \"high-confidence\"])"
237 )]
238 pub tags: Option<Vec<String>>,
239
240 #[schemars(description = "Agent session ID that produced this observation")]
241 pub session_id: Option<String>,
242
243 #[schemars(description = "Daemon instance node ID")]
244 pub node_id: Option<String>,
245}
246
247#[derive(Debug, Deserialize, JsonSchema)]
248pub struct SubscribeParams {
249 #[schemars(
250 description = "Filter by observation kind: log, query, http_exchange, exception, state_snapshot, js_exception, lifecycle, metric, custom. Omit for all kinds."
251 )]
252 pub kinds: Option<Vec<String>>,
253
254 #[schemars(
255 description = "Minimum severity threshold: trace, debug, info, warn, error. Default: warn (only warn and error push alerts)."
256 )]
257 pub severity_min: Option<String>,
258
259 #[schemars(
260 description = "Filter by origin patterns: 'app', 'app:name', 'browser', 'browser:tab_id', 'device', or 'device:serial'. Omit for all origins."
261 )]
262 pub origins: Option<Vec<String>>,
263
264 #[schemars(
265 description = "Search across materialized observation text. Omit for no text filtering."
266 )]
267 pub text_match: Option<String>,
268
269 #[schemars(description = "Filter by correlation ID (exact match)")]
270 pub correlation_id: Option<String>,
271
272 #[schemars(description = "Filter by tags (all listed tags must be present)")]
273 pub tags: Option<Vec<String>>,
274
275 #[schemars(
276 description = "Include system/infrastructure observations (tagged '_system'). Excluded by default."
277 )]
278 pub include_system: Option<bool>,
279}
280
281#[derive(Debug, Deserialize, JsonSchema)]
282pub struct LensParams {
283 #[schemars(
284 description = "Filter by observation kind: log, query, http_exchange, exception, state_snapshot, js_exception, lifecycle, metric, custom"
285 )]
286 pub kinds: Option<Vec<String>>,
287
288 #[schemars(description = "Minimum severity threshold: trace, debug, info, warn, error")]
289 pub severity_min: Option<String>,
290
291 #[schemars(description = "Filter by origin pattern: 'app:name', 'browser', 'device:serial'")]
292 pub origins: Option<Vec<String>>,
293
294 #[schemars(description = "Search across materialized observation text")]
295 pub text_match: Option<String>,
296
297 #[schemars(description = "Filter by correlation ID (exact match)")]
298 pub correlation_id: Option<String>,
299
300 #[schemars(description = "Filter by tags (all listed tags must be present)")]
301 pub tags: Option<Vec<String>>,
302
303 #[schemars(description = "Maximum observations to buffer (default 200)")]
304 pub capacity: Option<usize>,
305}
306
307#[derive(Debug, Deserialize, JsonSchema)]
308pub struct SaveMemoryParams {
309 #[schemars(description = "The memory content to persist")]
310 pub content: String,
311
312 #[schemars(
313 description = "Memory kind: pattern, decision, error_signature, session_summary, user_flagged. Defaults to user_flagged."
314 )]
315 pub kind: Option<String>,
316
317 #[schemars(description = "Tags for categorization and retrieval")]
318 pub tags: Option<Vec<String>>,
319
320 #[schemars(description = "Observation IDs that informed this memory")]
321 pub source_observations: Option<Vec<u64>>,
322
323 #[schemars(description = "Project slug to scope this memory to")]
324 pub project_slug: Option<String>,
325
326 #[schemars(description = "Session ID that produced this memory")]
327 pub session_id: Option<String>,
328
329 #[schemars(description = "Confidence score from 0.0 to 1.0 (default 1.0)")]
330 pub confidence: Option<f64>,
331}
332
333#[derive(Debug, Deserialize, JsonSchema)]
334pub struct QueryMemoryParams {
335 #[schemars(description = "Substring search across memory content")]
336 pub text: Option<String>,
337
338 #[schemars(
339 description = "Filter by kind: pattern, decision, error_signature, session_summary, user_flagged"
340 )]
341 pub kinds: Option<Vec<String>>,
342
343 #[schemars(description = "Filter by tags (all listed tags must be present)")]
344 pub tags: Option<Vec<String>>,
345
346 #[schemars(description = "Filter by project slug")]
347 pub project_slug: Option<String>,
348
349 #[schemars(description = "Maximum number of results (default 20)")]
350 pub limit: Option<u64>,
351}
352
353#[derive(Debug, Deserialize, JsonSchema)]
354pub struct ForgetMemoryParams {
355 #[schemars(description = "The memory ID to delete")]
356 pub id: String,
357 #[schemars(
358 description = "Required to confirm deletion. Must be true to delete the memory; any other value (including absent) returns an error and leaves the memory intact."
359 )]
360 pub confirm: Option<bool>,
361}
362
363#[derive(Debug, Deserialize, JsonSchema)]
364pub struct HelpParams {
365 #[schemars(
366 description = "Help topic: index, debug_session, checkpoint, setup, hooks, lens, memory, observations, envelope, librarian. Omit for index."
367 )]
368 pub topic: Option<String>,
369}
370
371#[derive(Debug, Deserialize, JsonSchema)]
372pub struct LibrarianIndexEdge {
373 #[schemars(
374 description = "Edge kind: has_source | documented_by | fixes | supersedes | child_of"
375 )]
376 pub kind: String,
377 #[schemars(description = "Target catalog node ID for this edge")]
378 pub target_node_id: String,
379}
380
381#[derive(Debug, Deserialize, JsonSchema)]
382pub struct LibrarianIndexParams {
383 #[schemars(description = "Node kind: doc | source_template | fix | project")]
384 pub kind: String,
385 #[schemars(description = "Human-readable name for this reference")]
386 pub label: String,
387 #[schemars(description = "Locator type: file | url | vault")]
388 pub locator_kind: String,
389 #[schemars(description = "The actual pointer — a file path, URL, or vault note path")]
390 pub locator: String,
391 #[schemars(description = "Free-form retrieval tags")]
392 pub tags: Option<Vec<String>>,
393 #[schemars(description = "Project slug to scope this reference")]
394 pub project_slug: Option<String>,
395 #[schemars(description = "Place under an existing catalog node for hierarchy")]
396 pub parent_id: Option<String>,
397 #[schemars(description = "Optional edge to create at index time")]
398 pub edge: Option<LibrarianIndexEdge>,
399 #[schemars(
400 description = "Mark as authoritative reference — canonicalized nodes are never flagged as stale"
401 )]
402 pub canonicalize: Option<bool>,
403}
404
405#[derive(Debug, Deserialize, JsonSchema)]
406pub struct LibrarianLookupParams {
407 #[schemars(description = "Look up a single node by ID (returns node + edges)")]
408 pub id: Option<String>,
409 #[schemars(description = "Filter by kind: doc | source_template | fix | project")]
410 pub kinds: Option<Vec<String>>,
411 #[schemars(description = "Filter by tags")]
412 pub tags: Option<Vec<String>>,
413 #[schemars(description = "Scope to a project")]
414 pub project_slug: Option<String>,
415 #[schemars(description = "Case-insensitive search across label and locator")]
416 pub text: Option<String>,
417 #[schemars(description = "Max results. Default 20, max 500.")]
418 pub limit: Option<u32>,
419 #[schemars(description = "Include superseded/deprecated entries. Default false.")]
420 pub include_deprecated: Option<bool>,
421 #[schemars(description = "Find nodes not accessed in N days")]
422 pub stale_before_days: Option<u32>,
423 #[schemars(description = "Browse children of a specific catalog node")]
424 pub parent_id: Option<String>,
425}
426
427#[derive(Debug, Deserialize, JsonSchema)]
428pub struct LibrarianForgetParams {
429 #[schemars(description = "Catalog node ID to remove or deprecate")]
430 pub id: String,
431 #[schemars(
432 description = "Required for hard delete (deprecate=false). Must be true to proceed."
433 )]
434 pub confirm: Option<bool>,
435 #[schemars(
436 description = "Default true. When true: soft-delete (deprecated_at set). When false and confirm=true: permanent removal."
437 )]
438 pub deprecate: Option<bool>,
439}
440
441#[derive(Debug, Deserialize, JsonSchema)]
442pub struct CreateCheckpointParams {
443 #[schemars(
444 description = "Optional human-readable note about why this checkpoint exists (e.g. \"before applying retry patch\")."
445 )]
446 pub description: Option<String>,
447}
448
449#[derive(Debug, Deserialize, JsonSchema)]
450pub struct StartDebugSessionParams {
451 #[schemars(description = "Project slug to scope the session to (e.g. \"daemon8\").")]
452 pub project: Option<String>,
453 #[schemars(description = "One-line description of what is being investigated.")]
454 pub description: Option<String>,
455 #[schemars(
456 description = "Required. Agent identity in format :host/tool+role> (e.g. :mbp/claude+plan-agent>). Identifies who is running this investigation."
457 )]
458 pub agent_id: String,
459 #[schemars(
460 description = "Optional. Feature being investigated (e.g. 'auth', 'search'). Used by other agents to discover overlapping work."
461 )]
462 pub feature: Option<String>,
463}
464
465#[derive(Debug, Deserialize, JsonSchema)]
466pub struct EndDebugSessionParams {
467 #[schemars(
468 description = "Outcome string. Defaults to \"abandoned\". Use resolve_debug_session for \"resolved\"."
469 )]
470 pub outcome: Option<String>,
471}
472
473#[derive(Debug, Deserialize, JsonSchema)]
474pub struct ResolveDebugSessionParams {
475 #[schemars(description = "Required: human summary of what broke and what fixed it.")]
476 pub summary: String,
477 #[schemars(description = "Optional: one-sentence root cause.")]
478 pub root_cause: Option<String>,
479 #[schemars(description = "Optional: unified diff or short patch text.")]
480 pub fix_diff: Option<String>,
481 #[schemars(description = "Optional: CLI commands that mattered to the fix.")]
482 pub commands_used: Option<Vec<String>>,
483 #[schemars(description = "Optional: error_hash strings this fix resolves.")]
484 pub related_errors: Option<Vec<String>>,
485 #[schemars(description = "Optional: extra tags for retrieval.")]
486 pub tags: Option<Vec<String>>,
487}
488
489#[derive(Debug, Deserialize, JsonSchema)]
490pub struct ListDebugSessionsParams {
491 #[schemars(description = "Filter by status: active, completed, abandoned. Omit for all.")]
492 pub status: Option<String>,
493 #[schemars(
494 description = "Optional. Filter by feature name (e.g. 'auth', 'search'). Returns only sessions investigating that feature."
495 )]
496 pub feature: Option<String>,
497}
498
499#[derive(Debug, Clone, Deserialize, JsonSchema)]
500pub struct SetupToolAction {
501 #[schemars(description = "Setup action: status or apply.")]
502 pub action: String,
503 #[schemars(description = "Project working directory. Defaults to daemon current directory.")]
504 pub cwd: Option<String>,
505 #[schemars(description = "Required to confirm mutating setup_apply.")]
506 pub yes: Option<bool>,
507 #[schemars(
508 description = "Comma-separated providers to configure (e.g. \"claude-code,gemini,codex\"). Omit for auto-detection."
509 )]
510 pub providers: Option<String>,
511}
512
513#[derive(Debug, Deserialize, JsonSchema)]
514pub struct SetupStatusParams {
515 #[schemars(description = "Project working directory. Defaults to daemon current directory.")]
516 pub cwd: Option<String>,
517}
518
519#[derive(Debug, Deserialize, JsonSchema)]
520pub struct SetupPlanParams {
521 #[schemars(description = "Project working directory. Defaults to daemon current directory.")]
522 pub cwd: Option<String>,
523}
524
525#[derive(Debug, Deserialize, JsonSchema)]
526pub struct SetupApplyParams {
527 #[schemars(description = "Project working directory. Defaults to daemon current directory.")]
528 pub cwd: Option<String>,
529 #[schemars(description = "Required to confirm setup_apply writes.")]
530 pub yes: bool,
531 #[schemars(
532 description = "Comma-separated providers to configure (e.g. \"claude-code,gemini,codex\"). Omit for auto-detection."
533 )]
534 pub providers: Option<String>,
535}
536
537pub type SetupToolFn =
538 Arc<dyn Fn(SetupToolAction) -> Pin<Box<dyn Future<Output = String> + Send>> + Send + Sync>;
539
540#[derive(Debug, Clone, Deserialize, JsonSchema)]
541pub struct HooksToolAction {
542 #[schemars(description = "Action: list, remove, update, repair.")]
543 pub action: String,
544 #[schemars(description = "Provider for remove/update: claude or codex.")]
545 pub provider: Option<String>,
546 #[schemars(description = "Scope for remove/update (claude only): local, shared, or global.")]
547 pub scope: Option<String>,
548}
549
550pub type HooksToolFn =
551 Arc<dyn Fn(HooksToolAction) -> Pin<Box<dyn Future<Output = String> + Send>> + Send + Sync>;
552
553pub struct DaemonMcp {
554 store: Arc<dyn StateModel>,
555 memory_store: Option<Arc<dyn MemoryStore>>,
556 debug_session_store: Option<Arc<dyn DebugSessionStore>>,
557 librarian_store: Option<Arc<dyn LibrarianStore>>,
558 active_state: ActiveSessionState,
559 obs_tx: tokio::sync::mpsc::UnboundedSender<Observation>,
560 chrome_tx: tokio::sync::mpsc::Sender<ChromeCommand>,
561 chrome_state: tokio::sync::watch::Receiver<daemon8_chrome::ConnectionState>,
562 chrome_endpoint: Arc<Mutex<Option<Arc<str>>>>,
563 last_checkpoint: Mutex<Checkpoint>,
564 device_screenshot_fn: Option<DeviceScreenshotFn>,
565 screenshot_dir: std::path::PathBuf,
566 subscription_tx: tokio::sync::watch::Sender<Option<Filter>>,
570 broadcast_tx: broadcast::Sender<(Arc<Observation>, Arc<str>)>,
571 lens: Arc<LensManager>,
572 setup_tool_fn: Option<SetupToolFn>,
573 hooks_tool_fn: Option<HooksToolFn>,
574 source_activator: Option<Arc<dyn SourceActivator>>,
575 cancel: tokio_util::sync::CancellationToken,
578 enabled_features: Vec<FeatureGate>,
579 tool_router: ToolRouter<Self>,
580}
581
582pub struct DaemonMcpConfig {
583 pub store: Arc<dyn StateModel>,
584 pub memory_store: Option<Arc<dyn MemoryStore>>,
585 pub debug_session_store: Option<Arc<dyn DebugSessionStore>>,
586 pub librarian_store: Option<Arc<dyn LibrarianStore>>,
587 pub obs_tx: tokio::sync::mpsc::UnboundedSender<Observation>,
588 pub chrome_tx: tokio::sync::mpsc::Sender<ChromeCommand>,
589 pub chrome_state: tokio::sync::watch::Receiver<daemon8_chrome::ConnectionState>,
590 pub chrome_endpoint: Arc<Mutex<Option<Arc<str>>>>,
591 pub device_screenshot_fn: Option<DeviceScreenshotFn>,
592 pub screenshot_dir: std::path::PathBuf,
593 pub broadcast_tx: broadcast::Sender<(Arc<Observation>, Arc<str>)>,
594 pub lens: Arc<LensManager>,
595 pub setup_tool_fn: Option<SetupToolFn>,
596 pub hooks_tool_fn: Option<HooksToolFn>,
597 pub source_activator: Option<Arc<dyn SourceActivator>>,
598 pub cancel: tokio_util::sync::CancellationToken,
601}
602
603#[tool_router(vis = "pub")]
604impl DaemonMcp {
605 pub fn new(cfg: DaemonMcpConfig) -> Self {
606 let mut router = Self::tool_router();
607 router += Self::action_tool_router();
608 router += Self::lens_tool_router();
609 if cfg.memory_store.is_some() {
610 router += Self::memory_tool_router();
611 }
612 if cfg.debug_session_store.is_some() && cfg.memory_store.is_some() {
613 router += Self::debug_session_tool_router();
614 }
615 if cfg.setup_tool_fn.is_some() {
616 router += Self::setup_tool_router();
617 }
618 if cfg.hooks_tool_fn.is_some() {
619 router += Self::hooks_tool_router();
620 }
621 if cfg.librarian_store.is_some() {
622 router += Self::librarian_tool_router();
623 }
624 let mut enabled_features = Vec::new();
625 if cfg.memory_store.is_some() {
626 enabled_features.push(FeatureGate::Memory);
627 }
628 if cfg.debug_session_store.is_some() && cfg.memory_store.is_some() {
629 enabled_features.push(FeatureGate::DebugSession);
630 }
631 if cfg.setup_tool_fn.is_some() {
632 enabled_features.push(FeatureGate::Setup);
633 }
634 if cfg.hooks_tool_fn.is_some() {
635 enabled_features.push(FeatureGate::Hooks);
636 }
637 if cfg.librarian_store.is_some() {
638 enabled_features.push(FeatureGate::Librarian);
639 }
640 let (subscription_tx, _) = tokio::sync::watch::channel::<Option<Filter>>(None);
641 Self {
642 store: cfg.store,
643 memory_store: cfg.memory_store,
644 debug_session_store: cfg.debug_session_store,
645 librarian_store: cfg.librarian_store,
646 active_state: ActiveSessionState::new(),
647 obs_tx: cfg.obs_tx,
648 chrome_tx: cfg.chrome_tx,
649 chrome_state: cfg.chrome_state,
650 chrome_endpoint: cfg.chrome_endpoint,
651 last_checkpoint: Mutex::new(Checkpoint(0)),
652 device_screenshot_fn: cfg.device_screenshot_fn,
653 screenshot_dir: cfg.screenshot_dir,
654 subscription_tx,
655 broadcast_tx: cfg.broadcast_tx,
656 lens: cfg.lens,
657 setup_tool_fn: cfg.setup_tool_fn,
658 hooks_tool_fn: cfg.hooks_tool_fn,
659 source_activator: cfg.source_activator,
660 cancel: cfg.cancel,
661 enabled_features,
662 tool_router: router,
663 }
664 }
665
666 pub fn subscription_rx(&self) -> tokio::sync::watch::Receiver<Option<Filter>> {
667 self.subscription_tx.subscribe()
668 }
669
/// Test helper: replaces the published subscription filter with `filter`.
///
/// The previous value returned by `send_replace` is discarded.
#[cfg(feature = "test-util")]
pub fn set_subscription(&self, filter: Option<Filter>) {
    let _previous = self.subscription_tx.send_replace(filter);
}
679
/// Test helper: returns a child token of this server's cancellation token.
#[cfg(feature = "test-util")]
pub fn child_cancel_token(&self) -> tokio_util::sync::CancellationToken {
    let parent = &self.cancel;
    parent.child_token()
}
688
/// Test helper: renders the help index for the currently enabled features.
#[cfg(feature = "test-util")]
pub fn help_index_body(&self) -> String {
    let librarian_enabled = self.librarian_store.is_some();
    help::build_dynamic_index(&self.enabled_features, librarian_enabled)
}
693
/// Test helper: resolves `topic` to a `(name, body)` pair.
///
/// When `help::find_topic` has no match for the enabled features, the pair
/// is `("index", <dynamic index>)`.
#[cfg(feature = "test-util")]
pub fn help_topic_body(&self, topic: &str) -> (String, String) {
    if let Some(found) = help::find_topic(topic, &self.enabled_features) {
        return (found.name.to_string(), found.body.to_string());
    }
    let index_body =
        help::build_dynamic_index(&self.enabled_features, self.librarian_store.is_some());
    ("index".to_string(), index_body)
}
704
705 async fn ensure_chrome_connected(&self, timeout: std::time::Duration) -> Result<(), String> {
708 use daemon8_chrome::ConnectionState;
709
710 let state = *self.chrome_state.borrow();
711 match state {
712 ConnectionState::Connected => Ok(()),
713 ConnectionState::Disconnected => {
714 let endpoint = self
715 .chrome_endpoint
716 .lock()
717 .expect("chrome_endpoint mutex poisoned")
718 .as_ref()
719 .map(|s| s.to_string())
720 .unwrap_or_else(|| "http://localhost:9222".to_string());
721 let _ = self
722 .chrome_tx
723 .send(ChromeCommand::Connect { endpoint })
724 .await;
725 let result = self.wait_for_connected(timeout).await;
726 if result.is_ok() {
727 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
728 }
729 result
730 }
731 ConnectionState::Connecting | ConnectionState::Reconnecting => {
732 self.wait_for_connected(timeout).await
733 }
734 }
735 }
736
737 async fn wait_for_connected(&self, timeout: std::time::Duration) -> Result<(), String> {
738 use daemon8_chrome::ConnectionState;
739
740 let mut rx = self.chrome_state.clone();
741 let result = tokio::time::timeout(timeout, async {
742 loop {
743 if *rx.borrow_and_update() == ConnectionState::Connected {
744 return true;
745 }
746 if rx.changed().await.is_err() {
747 return false;
748 }
749 }
750 })
751 .await;
752 match result {
753 Ok(true) => Ok(()),
754 _ => Err(
755 "Browser connection timed out. The daemon will keep retrying in the background."
756 .into(),
757 ),
758 }
759 }
760
761 #[doc = include_str!("../tool_descriptions/query_observations.md")]
762 #[tool(name = "query_observations")]
763 async fn query_observations(&self, Parameters(params): Parameters<ObserveParams>) -> String {
764 let wants_browser = params
766 .origins
767 .as_ref()
768 .is_some_and(|origins| origins.iter().any(|o| o.starts_with("browser")));
769 if wants_browser
770 && let Err(e) = self
771 .ensure_chrome_connected(std::time::Duration::from_secs(10))
772 .await
773 {
774 tracing::warn!("Browser not available for observation: {e}");
777 }
778
779 let kinds = params.kinds.map(Filter::kinds_from_vec);
780
781 let severity_min = params.severity_min.and_then(|s| Filter::parse_severity(&s));
782
783 let origins = params.origins.map(Filter::origins_from_vec);
784
785 let since = params.since_checkpoint.map(Checkpoint);
786
787 let filter = Filter {
788 kinds,
789 severity_min,
790 origins,
791 text_match: params.text_match,
792 since,
793 limit: Some(params.limit.unwrap_or(50).min(500)),
794 correlation_id: params.correlation_id,
795 tags: params.tags,
796 include_system: params.include_system,
797 };
798
799 if let Some(ref sa) = self.source_activator {
800 sa.touch_matching(&filter);
801 }
802
803 match self.store.query(&filter).await {
804 Ok(slice) => {
805 let mut result = serde_json::to_value(&slice).unwrap_or_default();
806
807 if wants_browser {
808 let chrome_state = *self.chrome_state.borrow();
809 result["browser_state"] = serde_json::json!(format!("{chrome_state}"));
810 }
811
812 let lens_obs = self.lens.drain().await;
813 if !lens_obs.is_empty() {
814 result["lens_observations"] =
815 serde_json::to_value(&lens_obs).unwrap_or_default();
816 result["lens_count"] = serde_json::json!(lens_obs.len());
817 }
818
819 self.ok(result)
820 }
821 Err(e) => self.err("query_failed", &e.to_string(), None, None),
822 }
823 }
824
825 #[doc = include_str!("../tool_descriptions/status.md")]
826 #[tool(name = "status")]
827 async fn status(&self) -> String {
828 match self.store.summary().await {
829 Ok(summary) => match serde_json::to_value(&summary) {
830 Ok(mut val) => {
831 if let Some(obj) = val.as_object_mut() {
832 obj.insert(
833 "daemon_version".into(),
834 serde_json::Value::String(env!("CARGO_PKG_VERSION").to_string()),
835 );
836 }
837 self.ok(val)
838 }
839 Err(e) => self.err("serialization_failed", &e.to_string(), None, None),
840 },
841 Err(e) => self.err("summary_failed", &e.to_string(), None, None),
842 }
843 }
844
845 #[tool(
846 name = "daemon8_help",
847 description = "Narrative documentation for daemon8 protocols. Pass topic='index' (or omit) for the topic list. Returns markdown."
848 )]
849 async fn daemon8_help(&self, Parameters(params): Parameters<HelpParams>) -> String {
850 let topic = params.topic.as_deref().unwrap_or("index");
851 if topic == "index" {
852 let body =
853 help::build_dynamic_index(&self.enabled_features, self.librarian_store.is_some());
854 return self.ok(serde_json::json!({ "topic": "index", "body": body }));
855 }
856 match help::find_topic(topic, &self.enabled_features) {
857 Some(t) => self.ok(serde_json::json!({ "topic": t.name, "body": t.body })),
858 None => {
859 let body = help::build_dynamic_index(
860 &self.enabled_features,
861 self.librarian_store.is_some(),
862 );
863 self.ok(serde_json::json!({ "topic": "index", "body": body }))
864 }
865 }
866 }
867
868 #[doc = include_str!("../tool_descriptions/create_checkpoint.md")]
869 #[tool(name = "create_checkpoint")]
870 async fn create_checkpoint(
871 &self,
872 Parameters(params): Parameters<CreateCheckpointParams>,
873 ) -> String {
874 let active = match self.active_state.current_session() {
875 Some(s) => s,
876 None => {
877 return self.err(
878 "no_active_debug_session",
879 "create_checkpoint requires an active debug session",
880 Some("call start_debug_session first"),
881 Some("start_debug_session"),
882 );
883 }
884 };
885 let ds_store = match &self.debug_session_store {
886 Some(s) => s,
887 None => {
888 return self.err(
889 "internal_error",
890 "debug_session store not available",
891 None,
892 None,
893 );
894 }
895 };
896
897 let seq = self.store.checkpoint().await;
898 let now = current_ns();
899 let cp = daemon8_store::DebugCheckpoint {
900 id: None,
901 debug_session_id: active.id.to_string(),
902 description: params.description,
903 created_at: now,
904 seq_at_creation: seq.0,
905 };
906 let cp_id = match ds_store.create_checkpoint(cp).await {
907 Ok(id) => id,
908 Err(e) => return self.err("create_checkpoint_failed", &e.to_string(), None, None),
909 };
910 self.active_state
911 .set_checkpoint(Some(Arc::from(cp_id.as_str())));
912 active.touch(now);
913 *self
914 .last_checkpoint
915 .lock()
916 .expect("last_checkpoint mutex poisoned") = seq;
917 self.ok_with(
918 serde_json::json!({
919 "checkpoint_id": cp_id,
920 "debug_session_id": active.id.as_ref(),
921 "seq_at_creation": seq.0,
922 "created_at": now
923 }),
924 vec!["query_observations"],
925 Some("checkpoint set; query_observations(since_checkpoint=...) shows what comes next"),
926 )
927 }
928
929 #[doc = include_str!("../tool_descriptions/list_connections.md")]
930 #[tool(name = "list_connections")]
931 async fn list_connections(&self) -> String {
932 wrap_inner_result(self, &self.connections_json().await)
933 }
934
935 #[doc = include_str!("../tool_descriptions/ingest_observation.md")]
936 #[tool(name = "ingest_observation")]
937 async fn ingest_observation(&self, Parameters(params): Parameters<IngestParams>) -> String {
938 let mut body = serde_json::Map::new();
939 if let Some(app) = params.app {
940 body.insert("app".into(), serde_json::Value::String(app));
941 } else {
942 body.insert("app".into(), serde_json::Value::String("agent".into()));
943 }
944 if let Some(kind) = params.kind {
945 body.insert("kind".into(), serde_json::Value::String(kind));
946 }
947 if let Some(severity) = params.severity {
948 body.insert("severity".into(), serde_json::Value::String(severity));
949 }
950 if let Some(channel) = params.channel {
951 body.insert("channel".into(), serde_json::Value::String(channel));
952 }
953 if let Some(cid) = params.correlation_id {
954 body.insert("correlation_id".into(), serde_json::Value::String(cid));
955 }
956 if let Some(pid) = params.parent_id {
957 body.insert("parent_id".into(), serde_json::Value::Number(pid.into()));
958 }
959 if let Some(tags) = params.tags {
960 body.insert(
961 "tags".into(),
962 serde_json::Value::Array(tags.into_iter().map(serde_json::Value::String).collect()),
963 );
964 }
965 if let Some(sid) = params.session_id {
966 body.insert("session_id".into(), serde_json::Value::String(sid));
967 }
968 if let Some(nid) = params.node_id {
969 body.insert("node_id".into(), serde_json::Value::String(nid));
970 }
971 body.insert("data".into(), params.data);
972
973 let mut obs = daemon8_ingest::normalize::normalize(serde_json::Value::Object(body));
974
975 if let Some(ref session) = self.active_state.current_session() {
979 obs.debug_session_id = Some(session.id.clone());
980 let slug_tag = format!("project:{}", session.project_slug);
981 obs.tags = Some(match obs.tags {
982 Some(mut existing) => {
983 if !existing.contains(&slug_tag) {
984 existing.push(slug_tag);
985 }
986 existing
987 }
988 None => vec![slug_tag],
989 });
990 }
991 if let Some(cp) = self.active_state.current_checkpoint() {
992 obs.checkpoint_id = Some(cp);
993 }
994
995 if let Err(e) = self.obs_tx.send(obs) {
996 tracing::warn!(
997 origin = ?e.0.origin,
998 kind = %e.0.kind.tag(),
999 severity = %e.0.severity,
1000 "MCP ingest failed: observation channel closed"
1001 );
1002 return self.err(
1003 "daemon_shutting_down",
1004 "Daemon is shutting down.",
1005 None,
1006 None,
1007 );
1008 }
1009
1010 self.ok(serde_json::json!({"ok": true}))
1011 }
1012
1013 #[doc = include_str!("../tool_descriptions/subscribe_observations.md")]
1014 #[tool(name = "subscribe_observations")]
1015 async fn subscribe_observations(
1016 &self,
1017 Parameters(params): Parameters<SubscribeParams>,
1018 ) -> String {
1019 let kinds = params.kinds.map(Filter::kinds_from_vec);
1020
1021 let severity_min = params.severity_min.and_then(|s| Filter::parse_severity(&s));
1022
1023 let origins = params.origins.map(Filter::origins_from_vec);
1024
1025 let filter = Filter {
1026 kinds,
1027 severity_min,
1028 origins,
1029 text_match: params.text_match,
1030 since: None,
1031 limit: None,
1032 correlation_id: params.correlation_id,
1033 tags: params.tags,
1034 include_system: params.include_system,
1035 };
1036
1037 if let Some(ref sa) = self.source_activator {
1038 sa.touch_matching(&filter);
1039 }
1040
1041 let is_default = filter.kinds.is_none()
1042 && filter.severity_min.is_none()
1043 && filter.origins.is_none()
1044 && filter.text_match.is_none()
1045 && filter.correlation_id.is_none()
1046 && filter.tags.is_none()
1047 && filter.include_system.is_none();
1048
1049 if is_default {
1050 self.subscription_tx.send_replace(None);
1051 self.ok(serde_json::json!({
1052 "subscribed": true,
1053 "filter": "default (severity >= warn)"
1054 }))
1055 } else {
1056 self.subscription_tx.send_replace(Some(filter));
1057 self.ok(serde_json::json!({
1058 "subscribed": true,
1059 "filter": "custom"
1060 }))
1061 }
1062 }
1063}
1064
1065#[tool_router(router = action_tool_router, vis = "pub")]
1066impl DaemonMcp {
1067 #[doc = include_str!("../tool_descriptions/connect_browser.md")]
1068 #[tool(name = "connect_browser")]
1069 async fn connect_browser(&self, Parameters(params): Parameters<ConnectParams>) -> String {
1070 self.connect_browser_inner(params).await
1071 }
1072
1073 #[doc = include_str!("../tool_descriptions/issue_command.md")]
1074 #[tool(name = "issue_command")]
1075 async fn issue_command(&self, Parameters(params): Parameters<ActParams>) -> String {
1076 self.issue_command_inner(params).await
1077 }
1078}
1079
1080#[tool_router(router = lens_tool_router, vis = "pub")]
1081impl DaemonMcp {
1082 #[doc = include_str!("../tool_descriptions/set_lens.md")]
1083 #[tool(name = "set_lens")]
1084 async fn set_lens(&self, Parameters(params): Parameters<LensParams>) -> String {
1085 let filter = Filter {
1086 kinds: params.kinds.map(Filter::kinds_from_vec),
1087 severity_min: params.severity_min.and_then(|s| Filter::parse_severity(&s)),
1088 origins: params.origins.map(Filter::origins_from_vec),
1089 text_match: params.text_match,
1090 since: None,
1091 limit: None,
1092 correlation_id: params.correlation_id,
1093 tags: params.tags,
1094 include_system: None,
1095 };
1096
1097 if let Some(ref sa) = self.source_activator {
1098 sa.touch_matching(&filter);
1099 }
1100 let capacity = params.capacity.unwrap_or(200).min(1000);
1101 self.lens.set_with_capacity(filter, capacity).await;
1102
1103 let status = self.lens.status().await;
1104 self.ok(serde_json::to_value(&status).unwrap_or(serde_json::Value::Null))
1105 }
1106
1107 #[doc = include_str!("../tool_descriptions/clear_lens.md")]
1108 #[tool(name = "clear_lens")]
1109 async fn clear_lens(&self) -> String {
1110 self.lens.clear().await;
1111 self.ok(serde_json::json!({"cleared": true}))
1112 }
1113
1114 #[doc = include_str!("../tool_descriptions/lens_status.md")]
1115 #[tool(name = "lens_status")]
1116 async fn lens_status(&self) -> String {
1117 let status = self.lens.status().await;
1118 self.ok(serde_json::to_value(&status).unwrap_or(serde_json::Value::Null))
1119 }
1120}
1121
1122#[tool_router(router = memory_tool_router, vis = "pub")]
1123impl DaemonMcp {
1124 #[doc = include_str!("../tool_descriptions/save_memory.md")]
1125 #[tool(name = "save_memory")]
1126 async fn save_memory(&self, Parameters(params): Parameters<SaveMemoryParams>) -> String {
1127 let mem_store = match &self.memory_store {
1128 Some(s) => s,
1129 None => {
1130 return self.err(
1131 "memory_store_unavailable",
1132 "memory store not available",
1133 None,
1134 None,
1135 );
1136 }
1137 };
1138 let hint = if self.librarian_store.is_some() {
1139 detect_librarian_hint(¶ms.content)
1140 } else {
1141 None
1142 };
1143 let inner = save_memory_inner(mem_store.as_ref(), params).await;
1144 match hint {
1145 Some(h) => match serde_json::from_str::<serde_json::Value>(&inner) {
1146 Ok(v) if v.get("error").is_none() => self.ok_with(v, vec![], Some(&h)),
1147 _ => wrap_inner_result(self, &inner),
1148 },
1149 None => wrap_inner_result(self, &inner),
1150 }
1151 }
1152
1153 #[doc = include_str!("../tool_descriptions/query_memory.md")]
1154 #[tool(name = "query_memory")]
1155 async fn query_memory(&self, Parameters(params): Parameters<QueryMemoryParams>) -> String {
1156 let mem_store = match &self.memory_store {
1157 Some(s) => s,
1158 None => {
1159 return self.err(
1160 "memory_store_unavailable",
1161 "memory store not available",
1162 None,
1163 None,
1164 );
1165 }
1166 };
1167 let inner = query_memory_inner(mem_store.as_ref(), params).await;
1168 wrap_inner_result(self, &inner)
1169 }
1170
1171 #[doc = include_str!("../tool_descriptions/forget_memory.md")]
1172 #[tool(name = "forget_memory")]
1173 async fn forget_memory(&self, Parameters(params): Parameters<ForgetMemoryParams>) -> String {
1174 if let Err(msg) = check_forget_memory_confirm(params.confirm) {
1175 return self.err(
1176 "missing_confirm",
1177 &msg,
1178 Some("pass confirm=true to acknowledge deletion"),
1179 None,
1180 );
1181 }
1182
1183 let mem_store = match &self.memory_store {
1184 Some(s) => s,
1185 None => {
1186 return self.err(
1187 "memory_store_unavailable",
1188 "memory store not available",
1189 None,
1190 None,
1191 );
1192 }
1193 };
1194
1195 match mem_store.forget_memory(¶ms.id).await {
1196 Ok(existed) => self.ok(serde_json::json!({ "deleted": existed })),
1197 Err(e) => self.err("forget_memory_failed", &e.to_string(), None, None),
1198 }
1199 }
1200}
1201
1202fn wrap_inner_result(daemon: &DaemonMcp, raw: &str) -> String {
1207 match serde_json::from_str::<serde_json::Value>(raw) {
1208 Ok(v) => {
1209 if let Some(err_obj) = v.get("error").and_then(|e| e.as_object()) {
1214 let code = err_obj
1215 .get("code")
1216 .and_then(|c| c.as_str())
1217 .unwrap_or("internal_error");
1218 let message = err_obj
1219 .get("message")
1220 .and_then(|m| m.as_str())
1221 .unwrap_or("(no message)");
1222 return daemon.err(code, message, None, None);
1223 }
1224 daemon.ok(v)
1225 }
1226 Err(_) => daemon.ok(serde_json::json!({"raw": raw})),
1227 }
1228}
1229
/// Gate for `forget_memory`: succeeds only when the caller explicitly passed
/// `confirm = true`. A missing flag and `false` are both rejected with the same message.
fn check_forget_memory_confirm(confirm: Option<bool>) -> Result<(), String> {
    if confirm == Some(true) {
        Ok(())
    } else {
        Err("forget_memory requires confirm=true to delete the memory".into())
    }
}
1239
/// Checks that `id` has the shape `:host/tool+role>`.
///
/// Checks run in this order, and the first failure decides the error message:
/// 1. at most 64 bytes long;
/// 2. starts with `:` and ends with `>`;
/// 3. the body contains `+` (the split happens at the first one);
/// 4. the part before `+` contains `/` (again split at the first one);
/// 5. host, tool and role are all non-empty;
/// 6. each segment consists only of ASCII lowercase letters, ASCII digits and `-`.
fn validate_agent_id(id: &str) -> Result<(), String> {
    const MAX_LEN: usize = 64;
    if id.len() > MAX_LEN {
        return Err("agent_id must be at most 64 characters".into());
    }

    let Some(body) = id
        .strip_prefix(':')
        .and_then(|rest| rest.strip_suffix('>'))
    else {
        return Err("agent_id must start with ':' and end with '>'".into());
    };
    let Some((host_and_tool, role)) = body.split_once('+') else {
        return Err("agent_id must contain '+' separating tool from role".into());
    };
    let Some((host, tool)) = host_and_tool.split_once('/') else {
        return Err("agent_id must contain '/' separating host from tool".into());
    };

    let segments = [host, tool, role];
    if segments.iter().any(|segment| segment.is_empty()) {
        return Err("host, tool, and role must be non-empty".into());
    }

    let allowed = |c: char| matches!(c, 'a'..='z' | '0'..='9' | '-');
    let all_clean = segments
        .iter()
        .all(|segment| segment.chars().all(|c| allowed(c)));
    if !all_clean {
        return Err("agent_id segments must be lowercase alphanumeric with hyphens only".into());
    }
    Ok(())
}
1269
1270#[tool_router(router = debug_session_tool_router, vis = "pub")]
1271impl DaemonMcp {
1272 #[doc = include_str!("../tool_descriptions/start_debug_session.md")]
1273 #[tool(name = "start_debug_session")]
1274 async fn start_debug_session(
1275 &self,
1276 Parameters(params): Parameters<StartDebugSessionParams>,
1277 ) -> String {
1278 let ds_store = match &self.debug_session_store {
1279 Some(s) => s,
1280 None => {
1281 return self.err(
1282 "debug_session_unavailable",
1283 "debug_session store not available",
1284 Some("ensure setup_apply has run"),
1285 Some("setup_apply"),
1286 );
1287 }
1288 };
1289 if let Some(existing) = self.active_state.current_session() {
1290 return self.err(
1291 "already_active_debug_session",
1292 &format!("session {} is already active", existing.id),
1293 Some(
1294 "call end_debug_session(outcome=\"abandoned\") or resolve_debug_session first",
1295 ),
1296 Some("end_debug_session"),
1297 );
1298 }
1299 if let Err(msg) = validate_agent_id(¶ms.agent_id) {
1300 return self.err(
1301 "invalid_agent_id",
1302 &msg,
1303 Some("agent_id format: :host/tool+role> (e.g. :mbp/claude+plan-agent>)"),
1304 None,
1305 );
1306 }
1307 let now = current_ns();
1308 let session = daemon8_store::DebugSession {
1309 id: None,
1310 started_at: now,
1311 ended_at: None,
1312 last_activity: now,
1313 project_slug: params.project.unwrap_or_else(|| "unknown".into()),
1314 description: params.description,
1315 status: daemon8_types::DebugSessionStatus::Active,
1316 outcome: None,
1317 summary_memory_id: None,
1318 agent_id: params.agent_id.clone(),
1319 feature: params.feature.clone(),
1320 };
1321 match ds_store.start_debug_session(session.clone()).await {
1322 Ok(id) => {
1323 self.active_state
1324 .set_session(Some(daemon8_store::ActiveDebugSession {
1325 id: Arc::from(id.as_str()),
1326 project_slug: Arc::from(session.project_slug.as_str()),
1327 started_at_ns: now,
1328 last_activity_ns: Arc::new(AtomicU64::new(now)),
1329 agent_id: Arc::from(params.agent_id.as_str()),
1330 feature: params.feature.as_deref().map(Arc::from),
1331 }));
1332 self.ok_with(
1333 serde_json::json!({
1334 "debug_session_id": id,
1335 "started_at": now,
1336 }),
1337 vec!["create_checkpoint", "query_observations"],
1338 Some("debug session opened; checkpoint before any change you might want to roll back through"),
1339 )
1340 }
1341 Err(e) => self.err("start_debug_session_failed", &e.to_string(), None, None),
1342 }
1343 }
1344
1345 #[doc = include_str!("../tool_descriptions/end_debug_session.md")]
1346 #[tool(name = "end_debug_session")]
1347 async fn end_debug_session(
1348 &self,
1349 Parameters(params): Parameters<EndDebugSessionParams>,
1350 ) -> String {
1351 end_or_resolve_inner(
1352 self,
1353 EndIntent::Abandon {
1354 outcome_str: params.outcome,
1355 },
1356 )
1357 .await
1358 }
1359
1360 #[doc = include_str!("../tool_descriptions/resolve_debug_session.md")]
1361 #[tool(name = "resolve_debug_session")]
1362 async fn resolve_debug_session(
1363 &self,
1364 Parameters(params): Parameters<ResolveDebugSessionParams>,
1365 ) -> String {
1366 end_or_resolve_inner(self, EndIntent::Resolve(params)).await
1367 }
1368
1369 #[doc = include_str!("../tool_descriptions/list_debug_sessions.md")]
1370 #[tool(name = "list_debug_sessions")]
1371 async fn list_debug_sessions(
1372 &self,
1373 Parameters(params): Parameters<ListDebugSessionsParams>,
1374 ) -> String {
1375 let ds_store = match &self.debug_session_store {
1376 Some(s) => s,
1377 None => {
1378 return self.err(
1379 "debug_session_unavailable",
1380 "debug_session store not available",
1381 Some("ensure setup_apply has run"),
1382 Some("setup_apply"),
1383 );
1384 }
1385 };
1386 let status = match params.status.as_deref() {
1387 Some(s) => match s.parse::<daemon8_types::DebugSessionStatus>() {
1388 Ok(v) => Some(v),
1389 Err(e) => return self.err("bad_status", &e, None, None),
1390 },
1391 None => None,
1392 };
1393 match ds_store.list_debug_sessions(status).await {
1394 Ok(mut sessions) => {
1395 if let Some(ref feat) = params.feature {
1396 sessions.retain(|s| s.feature.as_deref() == Some(feat.as_str()));
1397 }
1398 self.ok(serde_json::json!({
1399 "count": sessions.len(),
1400 "sessions": sessions,
1401 }))
1402 }
1403 Err(e) => self.err("list_debug_sessions_failed", &e.to_string(), None, None),
1404 }
1405 }
1406}
1407
1408enum EndIntent {
1409 Abandon { outcome_str: Option<String> },
1410 Resolve(ResolveDebugSessionParams),
1411}
1412
1413async fn end_or_resolve_inner(daemon: &DaemonMcp, intent: EndIntent) -> String {
1414 let ds_store = match &daemon.debug_session_store {
1415 Some(s) => s,
1416 None => {
1417 return daemon.err(
1418 "debug_session_unavailable",
1419 "debug_session store not available",
1420 None,
1421 None,
1422 );
1423 }
1424 };
1425 let mem_store = match &daemon.memory_store {
1426 Some(s) => s,
1427 None => {
1428 return daemon.err(
1429 "memory_store_unavailable",
1430 "memory store not available",
1431 None,
1432 None,
1433 );
1434 }
1435 };
1436 let active = match daemon.active_state.current_session() {
1437 Some(s) => s,
1438 None => {
1439 return daemon.err(
1440 "no_active_debug_session",
1441 "no active debug session to end/resolve",
1442 Some("call start_debug_session first"),
1443 Some("start_debug_session"),
1444 );
1445 }
1446 };
1447 let now = current_ns();
1448
1449 let checkpoints = ds_store
1452 .list_checkpoints(active.id.as_ref())
1453 .await
1454 .unwrap_or_default();
1455 let mut source_observations: Vec<u64> =
1456 checkpoints.iter().map(|cp| cp.seq_at_creation).collect();
1457
1458 let (outcome, summary_text, tags, data_blob) = match intent {
1459 EndIntent::Abandon { outcome_str } => {
1460 let outcome = outcome_str
1461 .as_deref()
1462 .and_then(|s| s.parse::<daemon8_types::DebugSessionOutcome>().ok())
1463 .unwrap_or(daemon8_types::DebugSessionOutcome::Abandoned);
1464 let summary = format!(
1465 "Debug session abandoned. Project: {}, started_at_ns: {}, checkpoints: {}.",
1466 active.project_slug,
1467 active.started_at_ns,
1468 checkpoints.len()
1469 );
1470 let tags = vec![
1471 "kind:debug_session_summary".to_string(),
1472 format!("project:{}", active.project_slug),
1473 format!("outcome:{}", outcome),
1474 ];
1475 (outcome, summary, tags, None)
1476 }
1477 EndIntent::Resolve(params) => {
1478 let mut tags = vec![
1479 "kind:debug_session_summary".to_string(),
1480 format!("project:{}", active.project_slug),
1481 "outcome:resolved".to_string(),
1482 ];
1483 if let Some(extra) = ¶ms.tags {
1484 tags.extend(extra.iter().cloned());
1485 }
1486 if let Some(errs) = ¶ms.related_errors {
1489 tags.extend(errs.iter().map(|h| format!("hash:{h}")));
1490 }
1491 let mut data = serde_json::Map::new();
1492 if let Some(rc) = ¶ms.root_cause {
1493 data.insert("root_cause".into(), serde_json::json!(rc));
1494 }
1495 if let Some(diff) = ¶ms.fix_diff {
1496 data.insert("fix_diff".into(), serde_json::json!(diff));
1497 }
1498 if let Some(cmds) = ¶ms.commands_used {
1499 data.insert("commands_used".into(), serde_json::json!(cmds));
1500 }
1501 if let Some(errs) = ¶ms.related_errors {
1502 data.insert("related_errors".into(), serde_json::json!(errs));
1503 }
1504 data.insert(
1505 "checkpoint_count".into(),
1506 serde_json::json!(checkpoints.len()),
1507 );
1508 data.insert(
1509 "started_at_ns".into(),
1510 serde_json::json!(active.started_at_ns),
1511 );
1512 data.insert("ended_at_ns".into(), serde_json::json!(now));
1513 (
1514 daemon8_types::DebugSessionOutcome::Resolved,
1515 params.summary,
1516 tags,
1517 Some(serde_json::Value::Object(data)),
1518 )
1519 }
1520 };
1521
1522 if source_observations.len() > 50 {
1524 let drop = source_observations.len() - 50;
1525 source_observations.drain(0..drop);
1526 }
1527
1528 let mem = daemon8_store::Memory {
1529 id: None,
1530 created_at: now,
1531 updated_at: now,
1532 kind: daemon8_types::MemoryKind::SessionSummary,
1533 content: summary_text,
1534 source_observations,
1535 tags,
1536 project_slug: active.project_slug.to_string(),
1537 session_id: Some(active.id.to_string()),
1538 confidence: 1.0,
1539 data: data_blob,
1540 };
1541
1542 let summary_memory_id = match mem_store.save_memory(mem).await {
1543 Ok(id) => id,
1544 Err(e) => {
1545 return daemon.err("session_summary_save_failed", &e.to_string(), None, None);
1546 }
1547 };
1548
1549 let status = match outcome {
1550 daemon8_types::DebugSessionOutcome::Resolved => {
1551 daemon8_types::DebugSessionStatus::Completed
1552 }
1553 daemon8_types::DebugSessionOutcome::Abandoned
1554 | daemon8_types::DebugSessionOutcome::InProgress => {
1555 daemon8_types::DebugSessionStatus::Abandoned
1556 }
1557 };
1558
1559 if let Err(e) = ds_store
1560 .end_debug_session(
1561 active.id.as_ref(),
1562 status,
1563 Some(outcome),
1564 Some(summary_memory_id.clone()),
1565 now,
1566 )
1567 .await
1568 {
1569 return daemon.err(
1570 "end_debug_session_db_update_failed",
1571 &e.to_string(),
1572 None,
1573 None,
1574 );
1575 }
1576
1577 daemon.active_state.clear();
1578
1579 daemon.ok_with(
1580 serde_json::json!({
1581 "debug_session_id": active.id.as_ref(),
1582 "summary_memory_id": summary_memory_id,
1583 "checkpoint_count": checkpoints.len(),
1584 }),
1585 vec!["start_debug_session", "query_memory"],
1586 Some("session closed; start_debug_session for the next investigation"),
1587 )
1588}
1589
/// Wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. The 128-bit
/// nanosecond count is narrowed to `u64` with an `as` cast.
fn current_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_nanos() as u64
}
1596
1597impl DaemonMcp {
1598 pub(crate) fn current_meta(&self) -> DaemonMeta {
1604 let mut meta = DaemonMeta::default();
1605 if let Some(s) = self.active_state.current_session() {
1606 meta.active_debug_session = Some(ActiveSessionEcho {
1607 id: s.id.to_string(),
1608 project_slug: s.project_slug.to_string(),
1609 started_at_ns: s.started_at_ns,
1610 });
1611 }
1612 meta
1613 }
1614
1615 pub(crate) fn current_meta_with(
1616 &self,
1617 next_actions: Vec<&str>,
1618 hint: Option<&str>,
1619 ) -> DaemonMeta {
1620 let mut meta = self.current_meta();
1621 if !next_actions.is_empty() {
1622 meta.next_actions = Some(next_actions.into_iter().map(String::from).collect());
1623 }
1624 if let Some(h) = hint {
1625 meta.hint = Some(h.to_string());
1626 }
1627 meta
1628 }
1629
1630 pub(crate) fn ok(&self, value: serde_json::Value) -> String {
1631 envelope::ok_value(value, self.current_meta())
1632 }
1633
1634 pub(crate) fn ok_with(
1635 &self,
1636 value: serde_json::Value,
1637 next_actions: Vec<&str>,
1638 hint: Option<&str>,
1639 ) -> String {
1640 envelope::ok_value(value, self.current_meta_with(next_actions, hint))
1641 }
1642
1643 pub(crate) fn err(
1644 &self,
1645 code: &str,
1646 message: &str,
1647 hint: Option<&str>,
1648 fix_tool: Option<&str>,
1649 ) -> String {
1650 envelope::err(code, message, hint, fix_tool, self.current_meta())
1651 }
1652}
1653
1654#[tool_router(router = setup_tool_router, vis = "pub")]
1655impl DaemonMcp {
1656 #[doc = include_str!("../tool_descriptions/setup_status.md")]
1657 #[tool(name = "setup_status")]
1658 async fn setup_status(&self, Parameters(params): Parameters<SetupStatusParams>) -> String {
1659 let inner = self
1660 .call_setup_tool(SetupToolAction {
1661 action: "status".into(),
1662 cwd: params.cwd,
1663 yes: None,
1664 providers: None,
1665 })
1666 .await;
1667 wrap_inner_result(self, &inner)
1668 }
1669
1670 #[doc = include_str!("../tool_descriptions/setup_plan.md")]
1671 #[tool(name = "setup_plan")]
1672 async fn setup_plan(&self, Parameters(params): Parameters<SetupPlanParams>) -> String {
1673 let inner = self
1674 .call_setup_tool(SetupToolAction {
1675 action: "status".into(),
1676 cwd: params.cwd,
1677 yes: None,
1678 providers: None,
1679 })
1680 .await;
1681 wrap_inner_result(self, &inner)
1682 }
1683
1684 #[doc = include_str!("../tool_descriptions/setup_apply.md")]
1685 #[tool(name = "setup_apply")]
1686 async fn setup_apply(&self, Parameters(params): Parameters<SetupApplyParams>) -> String {
1687 let inner = self
1688 .call_setup_tool(SetupToolAction {
1689 action: "apply".into(),
1690 cwd: params.cwd,
1691 yes: Some(params.yes),
1692 providers: params.providers,
1693 })
1694 .await;
1695 wrap_inner_result(self, &inner)
1696 }
1697}
1698
1699#[tool_router(router = hooks_tool_router, vis = "pub")]
1700impl DaemonMcp {
1701 #[doc = include_str!("../tool_descriptions/hooks_list.md")]
1702 #[tool(name = "hooks_list")]
1703 async fn hooks_list(&self) -> String {
1704 let inner = self
1705 .call_hooks_tool(HooksToolAction {
1706 action: "list".into(),
1707 provider: None,
1708 scope: None,
1709 })
1710 .await;
1711 wrap_inner_result(self, &inner)
1712 }
1713
1714 #[doc = include_str!("../tool_descriptions/hooks_remove.md")]
1715 #[tool(name = "hooks_remove")]
1716 async fn hooks_remove(&self, Parameters(params): Parameters<HooksToolAction>) -> String {
1717 let inner = self
1718 .call_hooks_tool(HooksToolAction {
1719 action: "remove".into(),
1720 provider: params.provider,
1721 scope: params.scope,
1722 })
1723 .await;
1724 wrap_inner_result(self, &inner)
1725 }
1726
1727 #[doc = include_str!("../tool_descriptions/hooks_update.md")]
1728 #[tool(name = "hooks_update")]
1729 async fn hooks_update(&self, Parameters(params): Parameters<HooksToolAction>) -> String {
1730 let inner = self
1731 .call_hooks_tool(HooksToolAction {
1732 action: "update".into(),
1733 provider: params.provider,
1734 scope: params.scope,
1735 })
1736 .await;
1737 wrap_inner_result(self, &inner)
1738 }
1739
1740 #[doc = include_str!("../tool_descriptions/hooks_repair.md")]
1741 #[tool(name = "hooks_repair")]
1742 async fn hooks_repair(&self) -> String {
1743 let inner = self
1744 .call_hooks_tool(HooksToolAction {
1745 action: "repair".into(),
1746 provider: None,
1747 scope: None,
1748 })
1749 .await;
1750 wrap_inner_result(self, &inner)
1751 }
1752}
1753
1754#[tool_router(router = librarian_tool_router, vis = "pub")]
1755impl DaemonMcp {
1756 #[doc = include_str!("../tool_descriptions/librarian_index.md")]
1757 #[tool(name = "librarian_index")]
1758 async fn librarian_index(
1759 &self,
1760 Parameters(params): Parameters<LibrarianIndexParams>,
1761 ) -> String {
1762 let lib_store = match &self.librarian_store {
1763 Some(s) => s,
1764 None => {
1765 return self.err(
1766 "librarian_store_unavailable",
1767 "librarian catalog not configured",
1768 None,
1769 None,
1770 );
1771 }
1772 };
1773 let inner = librarian_index_inner(lib_store.as_ref(), params).await;
1774 match serde_json::from_str::<serde_json::Value>(&inner) {
1775 Ok(v) if v.get("error").is_some() => wrap_inner_result(self, &inner),
1776 Ok(v) => {
1777 let mut hints = Vec::new();
1778 let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("");
1779 let version = v.get("version").and_then(|v| v.as_str()).unwrap_or("");
1780 match kind {
1781 "project" => hints.push(
1782 "Next: index its documentation and source configs with edges linking back.",
1783 ),
1784 "fix" => hints.push(
1785 "Consider linking this fix to the error it resolves with edge kind 'fixes'.",
1786 ),
1787 _ => {}
1788 }
1789 if version.matches('.').count() > 2 {
1790 hints.push("Previous version deprecated and linked via supersedes edge.");
1791 }
1792 if v.get("parent_id").and_then(|p| p.as_str()).is_none() && kind != "project" {
1793 hints.push("Consider organizing under a parent node for hierarchy.");
1794 }
1795 let hint = if hints.is_empty() {
1796 None
1797 } else {
1798 Some(hints.join(" "))
1799 };
1800 self.ok_with(v, vec!["librarian_lookup"], hint.as_deref())
1801 }
1802 Err(_) => wrap_inner_result(self, &inner),
1803 }
1804 }
1805
1806 #[doc = include_str!("../tool_descriptions/librarian_lookup.md")]
1807 #[tool(name = "librarian_lookup")]
1808 async fn librarian_lookup(
1809 &self,
1810 Parameters(params): Parameters<LibrarianLookupParams>,
1811 ) -> String {
1812 let lib_store = match &self.librarian_store {
1813 Some(s) => s,
1814 None => {
1815 return self.err(
1816 "librarian_store_unavailable",
1817 "librarian catalog not configured",
1818 None,
1819 None,
1820 );
1821 }
1822 };
1823 let inner = librarian_lookup_inner(lib_store.as_ref(), params).await;
1824 match serde_json::from_str::<serde_json::Value>(&inner) {
1825 Ok(v) if v.get("error").is_some() => wrap_inner_result(self, &inner),
1826 Ok(v) => {
1827 let mut hints = Vec::new();
1828 if let Some(nodes) = v.get("nodes").and_then(|n| n.as_array()) {
1829 let thirty_days_ago_ns = std::time::SystemTime::now()
1830 .duration_since(std::time::UNIX_EPOCH)
1831 .unwrap_or_default()
1832 .as_nanos() as u64
1833 - 30 * 86_400_000_000_000;
1834 let stale_count = nodes
1835 .iter()
1836 .filter(|n| {
1837 if n.get("canonicalized_at").and_then(|c| c.as_u64()).is_some() {
1838 return false;
1839 }
1840 n.get("last_read_at")
1841 .and_then(|r| r.as_u64())
1842 .is_none_or(|ts| ts < thirty_days_ago_ns)
1843 })
1844 .count();
1845 if stale_count > 0 {
1846 hints.push(
1847 "Some results haven't been accessed in over 30 days. Consider reviewing and deprecating stale entries with librarian_forget(deprecate=true).",
1848 );
1849 }
1850 }
1851 let hint = if hints.is_empty() {
1852 None
1853 } else {
1854 Some(hints.join(" "))
1855 };
1856 self.ok_with(v, vec![], hint.as_deref())
1857 }
1858 Err(_) => wrap_inner_result(self, &inner),
1859 }
1860 }
1861
1862 #[doc = include_str!("../tool_descriptions/librarian_forget.md")]
1863 #[tool(name = "librarian_forget")]
1864 async fn librarian_forget(
1865 &self,
1866 Parameters(params): Parameters<LibrarianForgetParams>,
1867 ) -> String {
1868 let lib_store = match &self.librarian_store {
1869 Some(s) => s,
1870 None => {
1871 return self.err(
1872 "librarian_store_unavailable",
1873 "librarian catalog not configured",
1874 None,
1875 None,
1876 );
1877 }
1878 };
1879 let deprecate = params.deprecate.unwrap_or(true);
1880 if deprecate {
1881 match lib_store.deprecate_node(¶ms.id).await {
1882 Ok(existed) => self.ok(serde_json::json!({ "deprecated": existed })),
1883 Err(e) => self.err("librarian_forget_failed", &e.to_string(), None, None),
1884 }
1885 } else {
1886 if params.confirm != Some(true) {
1887 return self.err(
1888 "missing_confirm",
1889 "hard delete requires confirm=true",
1890 Some("pass confirm=true to permanently delete, or use deprecate=true (default) for soft-delete"),
1891 None,
1892 );
1893 }
1894 match lib_store.forget_node(¶ms.id).await {
1895 Ok(existed) => self.ok(serde_json::json!({ "deleted": existed })),
1896 Err(e) => self.err("librarian_forget_failed", &e.to_string(), None, None),
1897 }
1898 }
1899 }
1900}
1901
1902impl DaemonMcp {
1904 async fn call_setup_tool(&self, action: SetupToolAction) -> String {
1905 match &self.setup_tool_fn {
1906 Some(f) => f(action).await,
1907 None => error_json("setup tools not available"),
1908 }
1909 }
1910
1911 async fn call_hooks_tool(&self, action: HooksToolAction) -> String {
1912 match &self.hooks_tool_fn {
1913 Some(f) => f(action).await,
1914 None => error_json("hooks tools not available"),
1915 }
1916 }
1917
1918 async fn connect_browser_inner(&self, params: ConnectParams) -> String {
1919 let endpoint = params.endpoint.clone();
1920 *self
1921 .chrome_endpoint
1922 .lock()
1923 .expect("chrome_endpoint mutex poisoned") = Some(Arc::from(endpoint.as_str()));
1924 match self
1925 .chrome_tx
1926 .send(ChromeCommand::Connect {
1927 endpoint: params.endpoint,
1928 })
1929 .await
1930 {
1931 Ok(()) => {
1932 tracing::info!(endpoint = %endpoint, "MCP requested browser connection");
1933 self.ok(serde_json::json!({
1934 "status": "connecting",
1935 "endpoint": endpoint,
1936 }))
1937 }
1938 Err(_) => {
1939 tracing::warn!(endpoint = %endpoint, "browser connect command rejected: daemon shutting down");
1940 self.err(
1941 "daemon_shutting_down",
1942 "Daemon is shutting down.",
1943 None,
1944 None,
1945 )
1946 }
1947 }
1948 }
1949
1950 async fn issue_command_inner(&self, params: ActParams) -> String {
1951 use daemon8_chrome::BrowserAction;
1952
1953 if params.action == DebugAction::Screenshot && params.device_serial.is_some() {
1955 return self.handle_device_screenshot(¶ms).await;
1956 }
1957
1958 if let Err(e) = self
1959 .ensure_chrome_connected(std::time::Duration::from_secs(10))
1960 .await
1961 {
1962 return error_json(&e);
1963 }
1964
1965 let (reply_tx, reply_rx) =
1966 tokio::sync::oneshot::channel::<Result<serde_json::Value, anyhow::Error>>();
1967
1968 let action: BrowserAction = match params.action {
1971 DebugAction::EvalJs => {
1972 let expression = match params.expression {
1973 Some(expr) => expr,
1974 None => return error_json("eval_js requires 'expression' parameter"),
1975 };
1976 let (tx, rx) = tokio::sync::oneshot::channel();
1977 let reply_tx = reply_tx;
1978 tokio::spawn(async move {
1979 let result = rx
1980 .await
1981 .map_err(|_| anyhow::anyhow!("browser task died"))
1982 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
1983 .map(|s| serde_json::json!({ "result": s }));
1984 let _ = reply_tx.send(result);
1985 });
1986 BrowserAction::EvalJs {
1987 tab_id: params.tab_id,
1988 expression,
1989 reply: tx,
1990 }
1991 }
1992 DebugAction::Screenshot => {
1993 let (tx, rx) = tokio::sync::oneshot::channel();
1994 let reply_tx = reply_tx;
1995 let selector = params.selector.clone();
1996 let shot_dir = self.screenshot_dir.clone();
1997 tokio::spawn(async move {
1998 let result = rx
1999 .await
2000 .map_err(|_| anyhow::anyhow!("browser task died"))
2001 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2002 .and_then(|bytes: Vec<u8>| {
2003 let path = screenshot_path(&shot_dir, "browser", selector.as_deref());
2004 std::fs::write(&path, &bytes)
2005 .map_err(|e| anyhow::anyhow!("failed to write screenshot: {e}"))?;
2006 Ok(serde_json::json!({
2007 "screenshot": path.display().to_string(),
2008 "size_bytes": bytes.len(),
2009 "selector": selector,
2010 }))
2011 });
2012 let _ = reply_tx.send(result);
2013 });
2014 BrowserAction::Screenshot {
2015 tab_id: params.tab_id,
2016 selector: params.selector,
2017 reply: tx,
2018 }
2019 }
2020 DebugAction::InjectCss => {
2021 let css = match params.css {
2022 Some(css) => css,
2023 None => return error_json("inject_css requires 'css' parameter"),
2024 };
2025 let temporary = params.temporary.unwrap_or(true);
2026 let (tx, rx) = tokio::sync::oneshot::channel();
2027 let reply_tx = reply_tx;
2028 tokio::spawn(async move {
2029 let result = rx
2030 .await
2031 .map_err(|_| anyhow::anyhow!("browser task died"))
2032 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2033 .map(|element_id| {
2034 serde_json::json!({
2035 "injected": true,
2036 "element_id": element_id,
2037 "temporary": temporary,
2038 })
2039 });
2040 let _ = reply_tx.send(result);
2041 });
2042 BrowserAction::InjectCss {
2043 tab_id: params.tab_id,
2044 css,
2045 temporary,
2046 reply: tx,
2047 }
2048 }
2049 DebugAction::RevertCss => {
2050 let (tx, rx) = tokio::sync::oneshot::channel();
2051 let reply_tx = reply_tx;
2052 tokio::spawn(async move {
2053 let result = rx
2054 .await
2055 .map_err(|_| anyhow::anyhow!("browser task died"))
2056 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2057 .map(|count| serde_json::json!({ "reverted_count": count }));
2058 let _ = reply_tx.send(result);
2059 });
2060 BrowserAction::RevertCss {
2061 tab_id: params.tab_id,
2062 reply: tx,
2063 }
2064 }
2065 DebugAction::ListTabs => {
2066 let (tx, rx) = tokio::sync::oneshot::channel();
2067 let reply_tx = reply_tx;
2068 tokio::spawn(async move {
2069 let result = rx
2070 .await
2071 .map_err(|_| anyhow::anyhow!("browser task died"))
2072 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2073 .map(|tabs| serde_json::json!({ "tabs": tabs }));
2074 let _ = reply_tx.send(result);
2075 });
2076 BrowserAction::ListTabs { reply: tx }
2077 }
2078 DebugAction::GetPerfMetrics => {
2079 let (tx, rx) = tokio::sync::oneshot::channel();
2080 let reply_tx = reply_tx;
2081 tokio::spawn(async move {
2082 let result = rx
2083 .await
2084 .map_err(|_| anyhow::anyhow!("browser task died"))
2085 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2086 .and_then(|metrics| {
2087 let json = serde_json::to_value(&metrics)
2088 .map_err(|e| anyhow::anyhow!("serialization failed: {e}"))?;
2089 Ok(serde_json::json!({ "metrics": json }))
2090 });
2091 let _ = reply_tx.send(result);
2092 });
2093 BrowserAction::GetPerformanceMetrics {
2094 tab_id: params.tab_id,
2095 reply: tx,
2096 }
2097 }
2098 DebugAction::GetDom => {
2099 let (tx, rx) = tokio::sync::oneshot::channel();
2100 let reply_tx = reply_tx;
2101 tokio::spawn(async move {
2102 let result = rx
2103 .await
2104 .map_err(|_| anyhow::anyhow!("browser task died"))
2105 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2106 .map(|html| serde_json::json!({ "html": html }));
2107 let _ = reply_tx.send(result);
2108 });
2109 BrowserAction::GetDom {
2110 tab_id: params.tab_id,
2111 selector: params.selector,
2112 reply: tx,
2113 }
2114 }
2115 DebugAction::SetViewport => {
2116 let width = params.viewport_width.unwrap_or(390);
2117 let height = params.viewport_height.unwrap_or(844);
2118 let scale = params.viewport_scale.unwrap_or(2.0);
2119 let mobile = params.viewport_mobile.unwrap_or(true);
2120 let (tx, rx) = tokio::sync::oneshot::channel();
2121 let reply_tx = reply_tx;
2122 tokio::spawn(async move {
2123 let result = rx
2124 .await
2125 .map_err(|_| anyhow::anyhow!("browser task died"))
2126 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2127 .map(|()| {
2128 serde_json::json!({
2129 "viewport_set": true,
2130 "width": width,
2131 "height": height,
2132 "scale": scale,
2133 "mobile": mobile,
2134 })
2135 });
2136 let _ = reply_tx.send(result);
2137 });
2138 BrowserAction::SetViewport {
2139 tab_id: params.tab_id,
2140 width,
2141 height,
2142 device_scale_factor: scale,
2143 mobile,
2144 user_agent: params.viewport_ua,
2145 reply: tx,
2146 }
2147 }
2148 DebugAction::ClearViewport => {
2149 let (tx, rx) = tokio::sync::oneshot::channel();
2150 let reply_tx = reply_tx;
2151 tokio::spawn(async move {
2152 let result = rx
2153 .await
2154 .map_err(|_| anyhow::anyhow!("browser task died"))
2155 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2156 .map(|()| serde_json::json!({ "viewport_cleared": true }));
2157 let _ = reply_tx.send(result);
2158 });
2159 BrowserAction::ClearViewport {
2160 tab_id: params.tab_id,
2161 reply: tx,
2162 }
2163 }
2164 DebugAction::NetworkConditions => {
2165 let preset = params.network_preset.unwrap_or(NetworkPreset::Restore);
2166 let preset_str = preset.as_str();
2167 let (tx, rx) = tokio::sync::oneshot::channel();
2168 let reply_tx = reply_tx;
2169 tokio::spawn(async move {
2170 let result = rx
2171 .await
2172 .map_err(|_| anyhow::anyhow!("browser task died"))
2173 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2174 .map(|()| serde_json::json!({ "network_conditions": preset_str }));
2175 let _ = reply_tx.send(result);
2176 });
2177 BrowserAction::SetNetworkConditions {
2178 tab_id: params.tab_id,
2179 preset: preset.as_str().to_string(),
2180 reply: tx,
2181 }
2182 }
2183 DebugAction::Navigate => {
2184 let url = match params.url {
2185 Some(u) => u,
2186 None => return error_json("navigate requires 'url' parameter"),
2187 };
2188 let (tx, rx) = tokio::sync::oneshot::channel();
2189 let reply_tx = reply_tx;
2190 tokio::spawn(async move {
2191 let result = rx
2192 .await
2193 .map_err(|_| anyhow::anyhow!("browser task died"))
2194 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2195 .map(|title| serde_json::json!({ "navigated": true, "title": title }));
2196 let _ = reply_tx.send(result);
2197 });
2198 BrowserAction::Navigate {
2199 tab_id: params.tab_id,
2200 url,
2201 reply: tx,
2202 }
2203 }
2204 DebugAction::StorageClear => {
2205 let types = params.storage_types.unwrap_or_else(|| "all".to_string());
2206 let (tx, rx) = tokio::sync::oneshot::channel();
2207 let reply_tx = reply_tx;
2208 tokio::spawn(async move {
2209 let result = rx
2210 .await
2211 .map_err(|_| anyhow::anyhow!("browser task died"))
2212 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2213 .map(|()| serde_json::json!({ "cleared": true }));
2214 let _ = reply_tx.send(result);
2215 });
2216 BrowserAction::StorageClear {
2217 tab_id: params.tab_id,
2218 storage_types: types,
2219 reply: tx,
2220 }
2221 }
2222 DebugAction::StorageInspect => {
2223 let (tx, rx) = tokio::sync::oneshot::channel();
2224 let reply_tx = reply_tx;
2225 tokio::spawn(async move {
2226 let result = rx
2227 .await
2228 .map_err(|_| anyhow::anyhow!("browser task died"))
2229 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from));
2230 let _ = reply_tx.send(result);
2231 });
2232 BrowserAction::StorageInspect {
2233 tab_id: params.tab_id,
2234 reply: tx,
2235 }
2236 }
2237 DebugAction::StorageSet => {
2238 let store_type = match params.store_type {
2239 Some(t) => t.as_str().to_string(),
2240 None => return error_json("storage_set requires 'store_type' parameter"),
2241 };
2242 let key = match params.storage_key {
2243 Some(k) => k,
2244 None => return error_json("storage_set requires 'storage_key' parameter"),
2245 };
2246 let value = params.storage_value.unwrap_or_default();
2247 let (tx, rx) = tokio::sync::oneshot::channel();
2248 let reply_tx = reply_tx;
2249 tokio::spawn(async move {
2250 let result = rx
2251 .await
2252 .map_err(|_| anyhow::anyhow!("browser task died"))
2253 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2254 .map(|()| serde_json::json!({ "set": true }));
2255 let _ = reply_tx.send(result);
2256 });
2257 BrowserAction::StorageSet {
2258 tab_id: params.tab_id,
2259 store_type,
2260 key,
2261 value,
2262 reply: tx,
2263 }
2264 }
2265 DebugAction::ElementAtPoint => {
2266 let x = match params.x {
2267 Some(v) => v,
2268 None => return error_json("element_at_point requires 'x' parameter"),
2269 };
2270 let y = match params.y {
2271 Some(v) => v,
2272 None => return error_json("element_at_point requires 'y' parameter"),
2273 };
2274 let (tx, rx) = tokio::sync::oneshot::channel();
2275 let reply_tx = reply_tx;
2276 tokio::spawn(async move {
2277 let result = rx
2278 .await
2279 .map_err(|_| anyhow::anyhow!("browser task died"))
2280 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from));
2281 let _ = reply_tx.send(result);
2282 });
2283 BrowserAction::ElementAtPoint {
2284 tab_id: params.tab_id,
2285 x,
2286 y,
2287 reply: tx,
2288 }
2289 }
2290 DebugAction::NewTab => {
2291 let url = params.url.unwrap_or_else(|| "about:blank".to_string());
2292 let (tx, rx) = tokio::sync::oneshot::channel();
2293 let reply_tx = reply_tx;
2294 tokio::spawn(async move {
2295 let result = rx
2296 .await
2297 .map_err(|_| anyhow::anyhow!("browser task died"))
2298 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2299 .map(|target_id| serde_json::json!({ "tab_id": target_id }));
2300 let _ = reply_tx.send(result);
2301 });
2302 BrowserAction::NewTab { url, reply: tx }
2303 }
2304 DebugAction::CloseTab => {
2305 let tab_id = match params.tab_id {
2306 Some(id) => id,
2307 None => return error_json("close_tab requires 'tab_id' parameter"),
2308 };
2309 let (tx, rx) = tokio::sync::oneshot::channel();
2310 let reply_tx = reply_tx;
2311 tokio::spawn(async move {
2312 let result = rx
2313 .await
2314 .map_err(|_| anyhow::anyhow!("browser task died"))
2315 .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2316 .map(|()| serde_json::json!({ "closed": true }));
2317 let _ = reply_tx.send(result);
2318 });
2319 BrowserAction::CloseTab { tab_id, reply: tx }
2320 }
2321 };
2322
2323 if self
2324 .chrome_tx
2325 .send(ChromeCommand::Action(action))
2326 .await
2327 .is_err()
2328 {
2329 tracing::warn!("browser action command rejected: daemon shutting down");
2330 return error_json("Daemon is shutting down.");
2331 }
2332
2333 match tokio::time::timeout(std::time::Duration::from_secs(30), reply_rx).await {
2334 Err(_) => error_json(
2335 "Browser action timed out (30s). The daemon is still connected and will recover.",
2336 ),
2337 Ok(Ok(Ok(value))) => serde_json::to_string(&value).unwrap_or_default(),
2338 Ok(Ok(Err(e))) => error_json(&format!("{e}")),
2339 Ok(Err(_)) => error_json(
2340 "Browser connection lost during action. The daemon is reconnecting automatically.",
2341 ),
2342 }
2343 }
2344}
2345
2346impl DaemonMcp {
2347 async fn handle_device_screenshot(&self, params: &ActParams) -> String {
2348 let screenshot_fn = match &self.device_screenshot_fn {
2349 Some(f) => f,
2350 None => {
2351 tracing::warn!(
2352 "device screenshot requested but ADB screenshot support is unavailable"
2353 );
2354 return error_json("device screenshots not available (ADB not enabled)");
2355 }
2356 };
2357
2358 let serial = params.device_serial.clone().unwrap_or_default();
2359 let platform = match params.device_platform.as_deref() {
2360 Some("vega") => DevicePlatform::Vega,
2361 _ => DevicePlatform::Android,
2362 };
2363
2364 let result = tokio::time::timeout(
2365 std::time::Duration::from_secs(15),
2366 (screenshot_fn)(serial.clone(), platform.clone()),
2367 )
2368 .await;
2369
2370 match result {
2371 Err(_) => {
2372 tracing::warn!(serial, platform = ?platform, "device screenshot timed out");
2373 error_json("device screenshot timed out (15s)")
2374 }
2375 Ok(Err(e)) => {
2376 tracing::warn!(serial, platform = ?platform, error = %e, "device screenshot failed");
2377 error_json(&format!("device screenshot failed for {serial}: {e}"))
2378 }
2379 Ok(Ok(shot)) => {
2380 let path = screenshot_path(&self.screenshot_dir, &serial, Some(&shot.source));
2381 if let Err(e) = std::fs::write(&path, &shot.png_bytes) {
2382 tracing::warn!(serial, path = %path.display(), error = %e, "failed to write device screenshot");
2383 return error_json(&format!("failed to write screenshot: {e}"));
2384 }
2385 tracing::info!(
2386 serial,
2387 source = %shot.source,
2388 path = %path.display(),
2389 size_bytes = shot.png_bytes.len(),
2390 "device screenshot captured"
2391 );
2392 serde_json::to_string(&serde_json::json!({
2393 "screenshot": path.display().to_string(),
2394 "size_bytes": shot.png_bytes.len(),
2395 "source": shot.source,
2396 "serial": serial,
2397 }))
2398 .unwrap_or_default()
2399 }
2400 }
2401 }
2402}
2403
/// Builds the destination path for a screenshot file inside `dir`.
///
/// The file name has the form
/// `daemon8-screenshot-{unix_seconds}-{target}[-{label}].png`.
///
/// Both `target` and `label` have `/`, `\` and `:` replaced with `-`, so
/// neither a device serial nor a caller-supplied label (for example a CSS
/// selector such as `a[href="/x"]`) can add path components or escape `dir`.
///
/// NOTE: the timestamp has one-second resolution, so two screenshots with the
/// same target and label taken within the same second map to the same path.
fn screenshot_path(dir: &std::path::Path, target: &str, label: Option<&str>) -> std::path::PathBuf {
    // Characters that would act as path separators (or drive markers) become '-'.
    let sanitize = |part: &str| part.replace(['/', '\\', ':'], "-");

    let ts = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let safe_target = sanitize(target);
    let suffix = label
        .map(|l| format!("-{}", sanitize(l)))
        .unwrap_or_default();
    dir.join(format!("daemon8-screenshot-{ts}-{safe_target}{suffix}.png"))
}
2413
2414fn error_json(msg: &str) -> String {
2419 envelope::err(
2420 "internal_error",
2421 msg,
2422 None,
2423 None,
2424 envelope::DaemonMeta::default(),
2425 )
2426}
2427
/// Suggests a librarian indexing follow-up based on keywords in `content`.
///
/// Matching is ASCII case-insensitive. Reference-style keywords are checked
/// first, so text that mentions both a reference and a fix gets the reference
/// hint. Returns `None` when no keyword is present.
fn detect_librarian_hint(content: &str) -> Option<String> {
    const REFERENCE_MARKERS: [&str; 4] = [
        "documentation",
        "config template",
        "source config",
        "log source",
    ];
    const FIX_MARKERS: [&str; 3] = ["fix for", "fixed by", "workaround"];

    let haystack = content.to_ascii_lowercase();
    let mentions_any = |markers: &[&str]| markers.iter().any(|m| haystack.contains(*m));

    if mentions_any(&REFERENCE_MARKERS) {
        return Some(String::from(
            "This memory describes a reference — consider also indexing it with librarian_index(kind=\"doc\" or \"source_template\") for graph-based retrieval.",
        ));
    }
    if mentions_any(&FIX_MARKERS) {
        return Some(String::from(
            "This memory describes a fix — consider also indexing it with librarian_index(kind=\"fix\") to link it to the error it resolves.",
        ));
    }
    None
}
2445
2446impl DaemonMcp {
2447 pub async fn connections_json(&self) -> String {
2449 let chrome_state = *self.chrome_state.borrow();
2450 let chrome_endpoint = self
2451 .chrome_endpoint
2452 .lock()
2453 .expect("chrome_endpoint mutex poisoned")
2454 .clone();
2455 let mut result = serde_json::json!({
2456 "browser": {
2457 "state": format!("{chrome_state}"),
2458 "endpoint": chrome_endpoint,
2459 }
2460 });
2461
2462 if let Ok(summary) = self.store.summary().await
2463 && !summary.connections.is_empty()
2464 {
2465 result["applications"] = serde_json::to_value(&summary.connections).unwrap_or_default();
2466 }
2467
2468 serde_json::to_string_pretty(&result)
2469 .unwrap_or_else(|e| error_json(&format!("serialization failed: {e}")))
2470 }
2471
2472 pub fn tools_for_client(&self) -> Vec<Tool> {
2473 self.tool_router.list_all()
2474 }
2475}
2476
2477impl ServerHandler for DaemonMcp {
2478 fn get_info(&self) -> ServerInfo {
2479 let instructions = String::from(INSTRUCTIONS);
2480
2481 let mut capabilities = ServerCapabilities::builder().enable_tools().build();
2482 capabilities.experimental = Some(std::collections::BTreeMap::from([(
2483 "claude/channel".to_string(),
2484 serde_json::Map::new(),
2485 )]));
2486
2487 ServerInfo::new(capabilities)
2488 .with_server_info(Implementation::new("daemon8", env!("CARGO_PKG_VERSION")))
2489 .with_instructions(instructions)
2490 }
2491
2492 async fn on_initialized(&self, context: rmcp::service::NotificationContext<RoleServer>) {
2493 let session_id = next_mcp_session_id();
2494 tracing::info!(
2495 session_id,
2496 "MCP session initialized, starting observation push"
2497 );
2498 let peer = context.peer;
2499 let mut rx = self.broadcast_tx.subscribe();
2500 let sub_rx = self.subscription_tx.subscribe();
2501 let push_source_activator = self.source_activator.clone();
2502 let session_cancel = self.cancel.child_token();
2503 let span = tracing::info_span!("mcp_session", session_id = %session_id);
2504
2505 tokio::spawn(async move {
2506 let mut last_push = std::time::Instant::now() - Duration::from_secs(2);
2507 loop {
2508 let (arc_obs, _json) = tokio::select! {
2509 biased;
2510 _ = session_cancel.cancelled() => {
2511 tracing::debug!("MCP observation push cancelled");
2512 break;
2513 }
2514 recv = rx.recv() => match recv {
2515 Ok(payload) => payload,
2516 Err(broadcast::error::RecvError::Lagged(skipped)) => {
2517 tracing::warn!(skipped, "MCP observation push receiver lagged; continuing with newest observations");
2518 continue;
2519 }
2520 Err(broadcast::error::RecvError::Closed) => {
2521 tracing::debug!("MCP observation push broadcast closed");
2522 break;
2523 }
2524 },
2525 };
2526
2527 let obs: &Observation = &arc_obs;
2528 let filter = sub_rx.borrow().clone();
2529 let should_push = match filter.as_ref() {
2530 Some(f) => f.matches(obs),
2531 None => obs.severity.level() >= daemon8_types::Severity::Warn.level(),
2532 };
2533
2534 if !should_push {
2535 continue;
2536 }
2537
2538 if let (Some(f), Some(sa)) = (&filter, &push_source_activator) {
2539 sa.touch_matching(f);
2540 }
2541
2542 if last_push.elapsed() < Duration::from_secs(1) {
2543 tracing::debug!(
2544 observation_id = obs.id,
2545 severity = %obs.severity,
2546 kind = %obs.kind.tag(),
2547 "MCP observation push throttled"
2548 );
2549 continue;
2550 }
2551
2552 let param = logging_notification(obs);
2553 let send = tokio::time::timeout(Duration::from_secs(5), peer.notify_logging_message(param)).await;
2554
2555 match send {
2556 Ok(Ok(())) => {
2557 last_push = std::time::Instant::now();
2558 tracing::debug!(
2559 observation_id = obs.id,
2560 severity = %obs.severity,
2561 kind = %obs.kind.tag(),
2562 "MCP observation pushed to client"
2563 );
2564 }
2565 Ok(Err(e)) => {
2566 tracing::warn!(error = ?e, "MCP observation push failed; ending session push task");
2567 break;
2568 }
2569 Err(_) => {
2570 tracing::warn!(
2571 observation_id = obs.id,
2572 severity = %obs.severity,
2573 kind = %obs.kind.tag(),
2574 "MCP observation push timed out"
2575 );
2576 }
2577 }
2578 }
2579 tracing::debug!("MCP observation push task ended");
2580 }
2581 .instrument(span));
2582
2583 if let Some(ds_store) = self.debug_session_store.clone() {
2587 let flush_state = self.active_state.clone();
2588 let flush_cancel = self.cancel.child_token();
2589 tokio::spawn(async move {
2590 loop {
2591 tokio::select! {
2592 () = tokio::time::sleep(Duration::from_secs(60)) => {}
2593 () = flush_cancel.cancelled() => break,
2594 }
2595 if let Some(session) = flush_state.current_session() {
2596 let last = session.last_activity();
2597 if let Err(e) = ds_store
2598 .touch_debug_session(session.id.as_ref(), last)
2599 .await
2600 {
2601 tracing::warn!(
2602 session_id = %session.id,
2603 error = %e,
2604 "per-session debug session flush failed"
2605 );
2606 }
2607 }
2608 }
2609 tracing::debug!("per-session debug session flush task ended");
2610 });
2611 }
2612 }
2613
2614 async fn list_tools(
2615 &self,
2616 _request: Option<rmcp::model::PaginatedRequestParams>,
2617 _context: rmcp::service::RequestContext<rmcp::RoleServer>,
2618 ) -> Result<rmcp::model::ListToolsResult, rmcp::ErrorData> {
2619 Ok(rmcp::model::ListToolsResult {
2620 tools: self.tools_for_client(),
2621 meta: None,
2622 next_cursor: None,
2623 })
2624 }
2625
2626 async fn call_tool(
2627 &self,
2628 request: rmcp::model::CallToolRequestParams,
2629 context: rmcp::service::RequestContext<rmcp::RoleServer>,
2630 ) -> Result<rmcp::model::CallToolResult, rmcp::ErrorData> {
2631 let tool = request.name.to_string();
2632 let started = Instant::now();
2633 tracing::debug!(tool = %tool, "MCP tool call started");
2634
2635 let tcc = rmcp::handler::server::tool::ToolCallContext::new(self, request, context);
2636 let result = self.tool_router.call(tcc).await;
2637 let duration_ms = started.elapsed().as_millis();
2638
2639 match &result {
2640 Ok(result) if result.is_error.unwrap_or(false) => {
2641 tracing::warn!(tool = %tool, duration_ms, "MCP tool call returned error result");
2642 }
2643 Ok(_) => {
2644 tracing::info!(tool = %tool, duration_ms, "MCP tool call completed");
2645 }
2646 Err(e) => {
2647 tracing::warn!(tool = %tool, duration_ms, error = ?e, "MCP tool call failed");
2648 }
2649 }
2650
2651 result
2652 }
2653
2654 fn get_tool(&self, name: &str) -> Option<rmcp::model::Tool> {
2655 self.tool_router.get(name).cloned()
2656 }
2657}
2658
2659pub async fn save_memory_inner(mem_store: &dyn MemoryStore, params: SaveMemoryParams) -> String {
2662 let kind = params
2663 .kind
2664 .as_deref()
2665 .and_then(|s| s.parse::<daemon8_types::MemoryKind>().ok())
2666 .unwrap_or(daemon8_types::MemoryKind::UserFlagged);
2667
2668 let now = std::time::SystemTime::now()
2669 .duration_since(std::time::UNIX_EPOCH)
2670 .unwrap_or_default()
2671 .as_nanos() as u64;
2672
2673 let memory = daemon8_store::Memory {
2674 id: None,
2675 created_at: now,
2676 updated_at: now,
2677 kind,
2678 content: params.content,
2679 source_observations: params.source_observations.unwrap_or_default(),
2680 tags: params.tags.unwrap_or_default(),
2681 project_slug: params.project_slug.unwrap_or_default(),
2682 session_id: params.session_id,
2683 confidence: params.confidence.unwrap_or(1.0),
2684 data: None,
2685 };
2686
2687 match mem_store.save_memory(memory).await {
2688 Ok(id) => serde_json::to_string(&serde_json::json!({ "id": id })).unwrap_or_default(),
2689 Err(e) => error_json(&format!("save_memory failed: {e}")),
2690 }
2691}
2692
2693pub async fn query_memory_inner(mem_store: &dyn MemoryStore, params: QueryMemoryParams) -> String {
2696 let kinds = params.kinds.map(|v| {
2697 v.into_iter()
2698 .filter_map(|s| s.parse::<daemon8_types::MemoryKind>().ok())
2699 .collect()
2700 });
2701
2702 let filter = daemon8_store::MemoryFilter {
2703 kinds,
2704 tags: params.tags,
2705 project_slug: params.project_slug,
2706 session_id: None,
2707 text_match: params.text,
2708 limit: Some(params.limit.unwrap_or(20).min(500) as usize),
2709 };
2710
2711 match mem_store.query_memory(&filter).await {
2712 Ok(memories) => serde_json::to_string_pretty(&memories)
2713 .unwrap_or_else(|e| error_json(&format!("serialization failed: {e}"))),
2714 Err(e) => error_json(&format!("query_memory failed: {e}")),
2715 }
2716}
2717
2718pub async fn librarian_index_inner(
2719 lib_store: &dyn LibrarianStore,
2720 params: LibrarianIndexParams,
2721) -> String {
2722 let kind = match params.kind.parse::<daemon8_types::LibrarianNodeKind>() {
2723 Ok(k) => k,
2724 Err(_) => {
2725 return error_json(&format!(
2726 "invalid kind '{}'. Use: doc, source_template, fix, project",
2727 params.kind
2728 ));
2729 }
2730 };
2731 let locator_kind = match params.locator_kind.parse::<daemon8_types::LocatorKind>() {
2732 Ok(k) => k,
2733 Err(_) => {
2734 return error_json(&format!(
2735 "invalid locator_kind '{}'. Use: file, url, vault",
2736 params.locator_kind
2737 ));
2738 }
2739 };
2740
2741 let now = std::time::SystemTime::now()
2742 .duration_since(std::time::UNIX_EPOCH)
2743 .unwrap_or_default()
2744 .as_nanos() as u64;
2745
2746 let node = daemon8_store::LibrarianNode {
2747 id: None,
2748 kind,
2749 label: params.label,
2750 locator_kind,
2751 locator: params.locator,
2752 tags: params.tags.unwrap_or_default(),
2753 project_slug: params.project_slug.unwrap_or_default(),
2754 version: String::new(),
2755 parent_id: params.parent_id.clone(),
2756 created_at: now,
2757 updated_at: now,
2758 last_read_at: None,
2759 deprecated_at: None,
2760 canonicalized_at: if params.canonicalize.unwrap_or(false) {
2761 Some(now)
2762 } else {
2763 None
2764 },
2765 };
2766
2767 let id = match lib_store.index_node(node).await {
2768 Ok(id) => id,
2769 Err(e) => return error_json(&format!("librarian_index failed: {e}")),
2770 };
2771
2772 let indexed_node = match lib_store.get_node(&id).await {
2773 Ok(Some(n)) => n,
2774 _ => {
2775 return serde_json::to_string(&serde_json::json!({
2776 "id": id, "version": "unknown", "kind": kind.to_string()
2777 }))
2778 .unwrap_or_default();
2779 }
2780 };
2781
2782 if let Some(ref edge) = params.edge {
2783 let edge_kind = match edge.kind.parse::<daemon8_types::LibrarianEdgeKind>() {
2784 Ok(k) => k,
2785 Err(_) => {
2786 return serde_json::to_string(&serde_json::json!({
2787 "id": id,
2788 "version": indexed_node.version,
2789 "kind": kind.to_string(),
2790 "edge_error": format!("invalid edge kind '{}'. Use: has_source, documented_by, fixes, supersedes, child_of", edge.kind)
2791 }))
2792 .unwrap_or_default();
2793 }
2794 };
2795 let lib_edge = daemon8_store::LibrarianEdge {
2796 id: None,
2797 kind: edge_kind,
2798 from_node: id.clone(),
2799 to_node: edge.target_node_id.clone(),
2800 created_at: now,
2801 };
2802 if let Err(e) = lib_store.index_edge(lib_edge).await {
2803 return serde_json::to_string(&serde_json::json!({
2804 "id": id,
2805 "version": indexed_node.version,
2806 "kind": kind.to_string(),
2807 "edge_error": format!("edge creation failed: {e}")
2808 }))
2809 .unwrap_or_default();
2810 }
2811 }
2812
2813 serde_json::to_string(&serde_json::json!({
2814 "id": id,
2815 "version": indexed_node.version,
2816 "kind": kind.to_string(),
2817 "parent_id": params.parent_id,
2818 }))
2819 .unwrap_or_default()
2820}
2821
2822pub async fn librarian_lookup_inner(
2823 lib_store: &dyn LibrarianStore,
2824 params: LibrarianLookupParams,
2825) -> String {
2826 if let Some(ref id) = params.id {
2827 let node = match lib_store.get_node(id).await {
2828 Ok(Some(n)) => n,
2829 Ok(None) => return error_json(&format!("node '{id}' not found")),
2830 Err(e) => return error_json(&format!("librarian_lookup failed: {e}")),
2831 };
2832 let edges = match lib_store.get_edges(id).await {
2833 Ok(e) => e,
2834 Err(e) => return error_json(&format!("librarian_lookup edges: {e}")),
2835 };
2836 return serde_json::to_string_pretty(&serde_json::json!({
2837 "node": node,
2838 "edges": edges,
2839 }))
2840 .unwrap_or_default();
2841 }
2842
2843 let kinds = params.kinds.map(|v| {
2844 v.into_iter()
2845 .filter_map(|s| s.parse::<daemon8_types::LibrarianNodeKind>().ok())
2846 .collect()
2847 });
2848
2849 let stale_before = params.stale_before_days.map(|days| {
2850 let now = std::time::SystemTime::now()
2851 .duration_since(std::time::UNIX_EPOCH)
2852 .unwrap_or_default()
2853 .as_nanos() as u64;
2854 now.saturating_sub(u64::from(days) * 86_400_000_000_000)
2855 });
2856
2857 let filter = daemon8_store::LibrarianFilter {
2858 kinds,
2859 tags: params.tags,
2860 project_slug: params.project_slug,
2861 text_match: params.text,
2862 limit: Some(params.limit.unwrap_or(20).min(500) as usize),
2863 include_deprecated: params.include_deprecated.unwrap_or(false),
2864 stale_before,
2865 parent_id: params.parent_id,
2866 };
2867
2868 match lib_store.lookup(&filter).await {
2869 Ok(nodes) => serde_json::to_string_pretty(&serde_json::json!({ "nodes": nodes }))
2870 .unwrap_or_else(|e| error_json(&format!("serialization failed: {e}"))),
2871 Err(e) => error_json(&format!("librarian_lookup failed: {e}")),
2872 }
2873}
2874
2875fn logging_notification(obs: &Observation) -> rmcp::model::LoggingMessageNotificationParam {
2876 let severity_str = obs.severity.to_string();
2877 let kind_str = obs.kind.tag().to_string();
2878 let origin_str = match &obs.origin {
2879 daemon8_types::Origin::Application { name } => format!("app:{name}"),
2880 daemon8_types::Origin::Browser { tab_id, .. } => format!("browser:{tab_id}"),
2881 daemon8_types::Origin::Device { serial, .. } => format!("device:{serial}"),
2882 };
2883 let msg = obs.data["message"]
2884 .as_str()
2885 .or_else(|| obs.data["msg"].as_str())
2886 .unwrap_or("(no message)");
2887 let level = match obs.severity {
2888 daemon8_types::Severity::Trace | daemon8_types::Severity::Debug => {
2889 rmcp::model::LoggingLevel::Debug
2890 }
2891 daemon8_types::Severity::Info => rmcp::model::LoggingLevel::Info,
2892 daemon8_types::Severity::Warn => rmcp::model::LoggingLevel::Warning,
2893 daemon8_types::Severity::Error => rmcp::model::LoggingLevel::Error,
2894 };
2895 let data = serde_json::json!({
2896 "message": format!("[{severity_str}] {kind_str} from {origin_str}: {msg}"),
2897 "severity": severity_str,
2898 "kind": kind_str,
2899 "origin": origin_str,
2900 "observation_id": obs.id,
2901 });
2902
2903 rmcp::model::LoggingMessageNotificationParam::new(level, data)
2904 .with_logger("daemon8".to_string())
2905}
2906
2907#[cfg(test)]
2908mod logging_tests {
2909 use daemon8_types::{ObservationKind, Origin, Severity};
2910
2911 use super::*;
2912
2913 #[test]
2914 fn mcp_session_ids_are_stable_and_prefixed() {
2915 let id = next_mcp_session_id();
2916 assert!(id.starts_with("mcp-"));
2917 }
2918
2919 #[test]
2920 fn forget_memory_gate_rejects_missing_confirm() {
2921 let result = check_forget_memory_confirm(None);
2922 let err = result.expect_err("missing confirm should error");
2923 assert!(
2924 err.contains("confirm=true"),
2925 "error message should name the required field: {err}"
2926 );
2927 }
2928
2929 #[test]
2930 fn forget_memory_gate_rejects_confirm_false() {
2931 let result = check_forget_memory_confirm(Some(false));
2932 assert!(result.is_err(), "confirm=false should error");
2933 }
2934
2935 #[test]
2936 fn forget_memory_gate_accepts_confirm_true() {
2937 let result = check_forget_memory_confirm(Some(true));
2938 assert!(result.is_ok(), "confirm=true should pass");
2939 }
2940
2941 #[test]
2942 fn forget_memory_params_parses_without_confirm() {
2943 let p: ForgetMemoryParams = serde_json::from_str(r#"{"id":"abc"}"#).unwrap();
2944 assert_eq!(p.id, "abc");
2945 assert_eq!(p.confirm, None);
2946 }
2947
2948 #[test]
2949 fn forget_memory_params_parses_with_confirm_true() {
2950 let p: ForgetMemoryParams = serde_json::from_str(r#"{"id":"abc","confirm":true}"#).unwrap();
2951 assert_eq!(p.confirm, Some(true));
2952 }
2953
2954 #[test]
2955 fn forget_memory_params_parses_with_confirm_false() {
2956 let p: ForgetMemoryParams =
2957 serde_json::from_str(r#"{"id":"abc","confirm":false}"#).unwrap();
2958 assert_eq!(p.confirm, Some(false));
2959 }
2960
2961 async fn build_mcp_with_debug_session() -> DaemonMcp {
2962 let store = Arc::new(daemon8_store::SurrealStore::memory().await.unwrap());
2963 let memory_store: Arc<dyn MemoryStore> = Arc::new(store.memory_store());
2964 let debug_session_store: Arc<dyn DebugSessionStore> = Arc::new(store.debug_session_store());
2965 let (obs_tx, _obs_rx) = tokio::sync::mpsc::unbounded_channel();
2966 let (chrome_tx, _chrome_rx) = tokio::sync::mpsc::channel(8);
2967 let (_, chrome_state) =
2968 tokio::sync::watch::channel(daemon8_chrome::ConnectionState::Disconnected);
2969 let (broadcast_tx, _broadcast_rx) = broadcast::channel(8);
2970 let lens = Arc::new(LensManager::new(broadcast_tx.subscribe(), None));
2971 DaemonMcp::new(DaemonMcpConfig {
2972 store: store.clone(),
2973 memory_store: Some(memory_store),
2974 debug_session_store: Some(debug_session_store),
2975 librarian_store: None,
2976 obs_tx,
2977 chrome_tx,
2978 chrome_state,
2979 chrome_endpoint: Arc::new(Mutex::new(None)),
2980 device_screenshot_fn: None,
2981 screenshot_dir: std::env::temp_dir().join("daemon8-test"),
2982 broadcast_tx,
2983 lens,
2984 setup_tool_fn: None,
2985 hooks_tool_fn: None,
2986 source_activator: None,
2987 cancel: tokio_util::sync::CancellationToken::new(),
2988 })
2989 }
2990
2991 #[tokio::test]
2992 async fn debug_session_lifecycle_resolved_writes_rich_summary() {
2993 let mcp = build_mcp_with_debug_session().await;
2994
2995 let start_res = mcp
2997 .start_debug_session(Parameters(StartDebugSessionParams {
2998 project: Some("daemon8".into()),
2999 description: Some("flaky login test".into()),
3000 agent_id: ":test/claude+plan-agent>".into(),
3001 feature: None,
3002 }))
3003 .await;
3004 let started: serde_json::Value = serde_json::from_str(&start_res).unwrap();
3005 let session_id = started["result"]["debug_session_id"]
3006 .as_str()
3007 .unwrap()
3008 .to_string();
3009 assert!(mcp.active_state.current_session().is_some());
3010
3011 let resolve_res = mcp
3013 .resolve_debug_session(Parameters(ResolveDebugSessionParams {
3014 summary: "Cookie domain mismatch dropped session on subdomain switch.".into(),
3015 root_cause: Some("Set-Cookie missing Domain attr".into()),
3016 fix_diff: Some(
3017 "- res.cookie('s', tok)\n+ res.cookie('s', tok, {domain: '.x'})".into(),
3018 ),
3019 commands_used: Some(vec!["cargo test login".into()]),
3020 related_errors: Some(vec!["abcd1234deadbeef".into()]),
3021 tags: Some(vec!["auth".into(), "regression".into()]),
3022 }))
3023 .await;
3024 let resolved: serde_json::Value = serde_json::from_str(&resolve_res).unwrap();
3025 assert_eq!(resolved["result"]["debug_session_id"], session_id);
3026 let memory_id = resolved["result"]["summary_memory_id"].as_str().unwrap();
3027
3028 assert!(mcp.active_state.current_session().is_none());
3030
3031 let mem_store = mcp.memory_store.clone().unwrap();
3033 let mem = mem_store.get_memory(memory_id).await.unwrap().unwrap();
3034 assert_eq!(mem.kind, daemon8_types::MemoryKind::SessionSummary);
3035 assert!(mem.content.contains("Cookie domain"));
3036 let data = mem.data.expect("resolved session must carry rich data");
3037 assert_eq!(data["root_cause"], "Set-Cookie missing Domain attr");
3038 assert!(mem.tags.contains(&"outcome:resolved".to_string()));
3039 assert!(mem.tags.contains(&"hash:abcd1234deadbeef".to_string()));
3040 assert!(mem.tags.contains(&"auth".to_string()));
3041
3042 let ds_store = mcp.debug_session_store.clone().unwrap();
3044 let session = ds_store
3045 .get_debug_session(&session_id)
3046 .await
3047 .unwrap()
3048 .unwrap();
3049 assert_eq!(session.status, daemon8_types::DebugSessionStatus::Completed);
3050 assert_eq!(
3051 session.outcome,
3052 Some(daemon8_types::DebugSessionOutcome::Resolved)
3053 );
3054 assert_eq!(session.summary_memory_id.as_deref(), Some(memory_id));
3055 }
3056
3057 #[tokio::test]
3058 async fn debug_session_double_start_rejected() {
3059 let mcp = build_mcp_with_debug_session().await;
3060 let _ = mcp
3061 .start_debug_session(Parameters(StartDebugSessionParams {
3062 project: None,
3063 description: None,
3064 agent_id: ":test/claude+plan-agent>".into(),
3065 feature: None,
3066 }))
3067 .await;
3068 let second = mcp
3069 .start_debug_session(Parameters(StartDebugSessionParams {
3070 project: None,
3071 description: None,
3072 agent_id: ":test/claude+plan-agent>".into(),
3073 feature: None,
3074 }))
3075 .await;
3076 assert!(
3077 second.contains("already_active_debug_session"),
3078 "second start must be rejected: {second}"
3079 );
3080 }
3081
3082 #[tokio::test]
3083 async fn create_checkpoint_without_active_session_returns_structured_error() {
3084 let mcp = build_mcp_with_debug_session().await;
3085 let res = mcp
3086 .create_checkpoint(Parameters(CreateCheckpointParams { description: None }))
3087 .await;
3088 let parsed: serde_json::Value = serde_json::from_str(&res).unwrap();
3089 assert_eq!(parsed["error"]["code"], "no_active_debug_session");
3090 assert_eq!(parsed["error"]["fix"]["tool"], "start_debug_session");
3091 assert!(parsed["result"].is_null());
3092 }
3093
3094 #[tokio::test]
3095 async fn create_checkpoint_inside_active_session_writes_row_and_updates_active_state() {
3096 let mcp = build_mcp_with_debug_session().await;
3097 let _ = mcp
3098 .start_debug_session(Parameters(StartDebugSessionParams {
3099 project: Some("daemon8".into()),
3100 description: None,
3101 agent_id: ":test/claude+plan-agent>".into(),
3102 feature: None,
3103 }))
3104 .await;
3105 let res = mcp
3106 .create_checkpoint(Parameters(CreateCheckpointParams {
3107 description: Some("before fix".into()),
3108 }))
3109 .await;
3110 let parsed: serde_json::Value = serde_json::from_str(&res).unwrap();
3111 let result = &parsed["result"];
3112 let cp_id = result["checkpoint_id"].as_str().unwrap();
3113 assert!(result["seq_at_creation"].is_number());
3114 assert!(parsed["daemon8"]["active_debug_session"].is_object());
3116
3117 let active_cp = mcp.active_state.current_checkpoint().unwrap();
3119 assert_eq!(active_cp.as_ref(), cp_id);
3120
3121 let ds_store = mcp.debug_session_store.clone().unwrap();
3123 let cp = ds_store.get_checkpoint(cp_id).await.unwrap().unwrap();
3124 assert_eq!(cp.description.as_deref(), Some("before fix"));
3125 }
3126
3127 #[tokio::test]
3128 async fn end_without_active_session_returns_error() {
3129 let mcp = build_mcp_with_debug_session().await;
3130 let res = mcp
3131 .end_debug_session(Parameters(EndDebugSessionParams { outcome: None }))
3132 .await;
3133 assert!(res.contains("no_active_debug_session"));
3134 }
3135
3136 #[tokio::test]
3137 async fn list_debug_sessions_filters_by_status() {
3138 let mcp = build_mcp_with_debug_session().await;
3139 let _ = mcp
3141 .start_debug_session(Parameters(StartDebugSessionParams {
3142 project: Some("p".into()),
3143 description: None,
3144 agent_id: ":test/claude+plan-agent>".into(),
3145 feature: None,
3146 }))
3147 .await;
3148 let _ = mcp
3149 .resolve_debug_session(Parameters(ResolveDebugSessionParams {
3150 summary: "x".into(),
3151 root_cause: None,
3152 fix_diff: None,
3153 commands_used: None,
3154 related_errors: None,
3155 tags: None,
3156 }))
3157 .await;
3158 let _ = mcp
3160 .start_debug_session(Parameters(StartDebugSessionParams {
3161 project: Some("p".into()),
3162 description: None,
3163 agent_id: ":test/claude+plan-agent>".into(),
3164 feature: None,
3165 }))
3166 .await;
3167
3168 let active_only = mcp
3169 .list_debug_sessions(Parameters(ListDebugSessionsParams {
3170 status: Some("active".into()),
3171 feature: None,
3172 }))
3173 .await;
3174 let parsed: serde_json::Value = serde_json::from_str(&active_only).unwrap();
3175 assert_eq!(parsed["result"]["count"], 1);
3176
3177 let all = mcp
3178 .list_debug_sessions(Parameters(ListDebugSessionsParams {
3179 status: None,
3180 feature: None,
3181 }))
3182 .await;
3183 let parsed: serde_json::Value = serde_json::from_str(&all).unwrap();
3184 assert_eq!(parsed["result"]["count"], 2);
3185 }
3186
3187 #[tokio::test]
3188 async fn save_memory_inner_persists_curated_memory() {
3189 let store = daemon8_store::SurrealStore::memory().await.unwrap();
3190 let mem_store = store.memory_store();
3191 mem_store.init_schema().await.unwrap();
3192
3193 let result = save_memory_inner(
3194 &mem_store,
3195 SaveMemoryParams {
3196 content: "Prefer checkpoints before runtime checks.".into(),
3197 kind: Some("decision".into()),
3198 tags: Some(vec!["project:daemon8".into(), "kind:test".into()]),
3199 source_observations: Some(vec![42]),
3200 project_slug: Some("daemon8".into()),
3201 session_id: Some("test-session".into()),
3202 confidence: Some(0.9),
3203 },
3204 )
3205 .await;
3206
3207 let value: serde_json::Value = serde_json::from_str(&result).unwrap();
3208 let id = value["id"]
3209 .as_str()
3210 .expect("save_memory should return an id");
3211 let saved = mem_store.get_memory(id).await.unwrap().unwrap();
3212
3213 assert_eq!(saved.kind, daemon8_types::MemoryKind::Decision);
3214 assert_eq!(saved.content, "Prefer checkpoints before runtime checks.");
3215 assert_eq!(saved.source_observations, vec![42]);
3216 assert_eq!(saved.tags, vec!["project:daemon8", "kind:test"]);
3217 assert_eq!(saved.project_slug, "daemon8");
3218 assert_eq!(saved.session_id.as_deref(), Some("test-session"));
3219 assert_eq!(saved.confidence, 0.9);
3220 }
3221
3222 #[test]
3223 fn logging_notification_includes_operational_fields() {
3224 let mut obs = Observation::new(
3225 Origin::Application {
3226 name: "test-app".into(),
3227 },
3228 ObservationKind::Log,
3229 serde_json::json!({"message": "hello"}),
3230 Severity::Warn,
3231 None,
3232 );
3233 obs.id = 42;
3234
3235 let param = logging_notification(&obs);
3236 assert_eq!(param.logger.as_deref(), Some("daemon8"));
3237 assert_eq!(param.data["severity"], "warn");
3238 assert_eq!(param.data["kind"], "log");
3239 assert_eq!(param.data["origin"], "app:test-app");
3240 assert_eq!(param.data["observation_id"], 42);
3241 }
3242
3243 async fn build_shared_mcps() -> (DaemonMcp, DaemonMcp) {
3249 let shared_store = Arc::new(daemon8_store::SurrealStore::memory().await.unwrap());
3250 let shared_mem: Arc<dyn MemoryStore> = Arc::new(shared_store.memory_store());
3251 let shared_ds: Arc<dyn DebugSessionStore> = Arc::new(shared_store.debug_session_store());
3252
3253 let (shared_obs_tx, mut shared_obs_rx) = tokio::sync::mpsc::unbounded_channel();
3256
3257 let drain_store = shared_store.clone();
3260 tokio::spawn(async move {
3261 while let Some(obs) = shared_obs_rx.recv().await {
3262 let _ = drain_store.insert(obs).await;
3263 }
3264 });
3265
3266 let make = || {
3267 let (chrome_tx, _chrome_rx) = tokio::sync::mpsc::channel(8);
3268 let (_, chrome_state) =
3269 tokio::sync::watch::channel(daemon8_chrome::ConnectionState::Disconnected);
3270 let (broadcast_tx, _broadcast_rx) = broadcast::channel(8);
3271 let lens = Arc::new(LensManager::new(broadcast_tx.subscribe(), None));
3272 DaemonMcp::new(DaemonMcpConfig {
3273 store: shared_store.clone(),
3274 memory_store: Some(shared_mem.clone()),
3275 debug_session_store: Some(shared_ds.clone()),
3276 librarian_store: None,
3277 obs_tx: shared_obs_tx.clone(),
3278 chrome_tx,
3279 chrome_state,
3280 chrome_endpoint: Arc::new(Mutex::new(None)),
3281 device_screenshot_fn: None,
3282 screenshot_dir: std::env::temp_dir().join("daemon8-test"),
3283 broadcast_tx,
3284 lens,
3285 setup_tool_fn: None,
3286 hooks_tool_fn: None,
3287 source_activator: None,
3288 cancel: tokio_util::sync::CancellationToken::new(),
3289 })
3290 };
3291
3292 (make(), make())
3293 }
3294
3295 #[tokio::test]
3296 async fn multi_session_two_agents_non_conflicting() {
3297 let (a, b) = build_shared_mcps().await;
3298
3299 let a_start = a
3300 .start_debug_session(Parameters(StartDebugSessionParams {
3301 project: Some("daemon8".into()),
3302 description: Some("agent A investigating auth".into()),
3303 agent_id: ":test/claude+plan-agent>".into(),
3304 feature: Some("auth".into()),
3305 }))
3306 .await;
3307 let a_parsed: serde_json::Value = serde_json::from_str(&a_start).unwrap();
3308 assert!(
3309 a_parsed["result"]["debug_session_id"].is_string(),
3310 "agent A must start successfully: {a_start}"
3311 );
3312
3313 let b_start = b
3314 .start_debug_session(Parameters(StartDebugSessionParams {
3315 project: Some("daemon8".into()),
3316 description: Some("agent B investigating search".into()),
3317 agent_id: ":test/codex+build-agent>".into(),
3318 feature: Some("search".into()),
3319 }))
3320 .await;
3321 let b_parsed: serde_json::Value = serde_json::from_str(&b_start).unwrap();
3322 assert!(
3323 b_parsed["result"]["debug_session_id"].is_string(),
3324 "agent B must start successfully — no global single-active: {b_start}"
3325 );
3326
3327 assert!(
3329 !b_start.contains("already_active_debug_session"),
3330 "agent B must not be blocked by agent A's active session"
3331 );
3332 }
3333
3334 #[tokio::test]
3335 async fn multi_session_observations_stamped_independently() {
3336 let (a, b) = build_shared_mcps().await;
3337
3338 let a_start: serde_json::Value = serde_json::from_str(
3340 &a.start_debug_session(Parameters(StartDebugSessionParams {
3341 project: Some("p".into()),
3342 description: None,
3343 agent_id: ":test/claude+plan-agent>".into(),
3344 feature: None,
3345 }))
3346 .await,
3347 )
3348 .unwrap();
3349 let a_sid = a_start["result"]["debug_session_id"].as_str().unwrap();
3350
3351 let b_start: serde_json::Value = serde_json::from_str(
3353 &b.start_debug_session(Parameters(StartDebugSessionParams {
3354 project: Some("p".into()),
3355 description: None,
3356 agent_id: ":test/codex+build-agent>".into(),
3357 feature: None,
3358 }))
3359 .await,
3360 )
3361 .unwrap();
3362 let b_sid = b_start["result"]["debug_session_id"].as_str().unwrap();
3363
3364 assert_ne!(a_sid, b_sid, "each agent must get a distinct session id");
3365
3366 a.ingest_observation(Parameters(IngestParams {
3368 kind: Some("log".into()),
3369 severity: Some("info".into()),
3370 app: Some("test-a".into()),
3371 channel: None,
3372 correlation_id: None,
3373 parent_id: None,
3374 node_id: None,
3375 session_id: None,
3376 tags: None,
3377 data: serde_json::json!({"msg": "from agent A"}),
3378 }))
3379 .await;
3380
3381 b.ingest_observation(Parameters(IngestParams {
3382 kind: Some("log".into()),
3383 severity: Some("info".into()),
3384 app: Some("test-b".into()),
3385 channel: None,
3386 correlation_id: None,
3387 parent_id: None,
3388 node_id: None,
3389 session_id: None,
3390 tags: None,
3391 data: serde_json::json!({"msg": "from agent B"}),
3392 }))
3393 .await;
3394
3395 tokio::task::yield_now().await;
3397 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3398
3399 let slice = a
3401 .store
3402 .query(&daemon8_types::Filter {
3403 kinds: Some(vec![daemon8_types::ObservationKindTag::Log]),
3404 ..Default::default()
3405 })
3406 .await
3407 .unwrap();
3408
3409 assert_eq!(
3410 slice.observations.len(),
3411 2,
3412 "both observations must be in the shared store"
3413 );
3414
3415 let has_a = slice
3416 .observations
3417 .iter()
3418 .any(|o| o.debug_session_id.as_deref() == Some(a_sid));
3419 let has_b = slice
3420 .observations
3421 .iter()
3422 .any(|o| o.debug_session_id.as_deref() == Some(b_sid));
3423 assert!(
3424 has_a,
3425 "observation from agent A must be stamped with A's session"
3426 );
3427 assert!(
3428 has_b,
3429 "observation from agent B must be stamped with B's session"
3430 );
3431 }
3432
3433 #[tokio::test]
3434 async fn multi_session_end_one_leaves_other_active() {
3435 let (a, b) = build_shared_mcps().await;
3436
3437 let _ = a
3439 .start_debug_session(Parameters(StartDebugSessionParams {
3440 project: None,
3441 description: None,
3442 agent_id: ":test/claude+plan-agent>".into(),
3443 feature: None,
3444 }))
3445 .await;
3446 let _ = b
3447 .start_debug_session(Parameters(StartDebugSessionParams {
3448 project: None,
3449 description: None,
3450 agent_id: ":test/codex+build-agent>".into(),
3451 feature: None,
3452 }))
3453 .await;
3454
3455 let end_res: serde_json::Value = serde_json::from_str(
3457 &a.end_debug_session(Parameters(EndDebugSessionParams { outcome: None }))
3458 .await,
3459 )
3460 .unwrap();
3461 assert!(
3462 end_res["result"]["debug_session_id"].is_string(),
3463 "end must succeed: {end_res}"
3464 );
3465
3466 assert!(a.active_state.current_session().is_none());
3468
3469 assert!(
3471 b.active_state.current_session().is_some(),
3472 "agent B must remain active after A ends"
3473 );
3474 }
3475
3476 #[tokio::test]
3477 async fn list_debug_sessions_filters_by_feature() {
3478 let (a, _b) = build_shared_mcps().await;
3479
3480 let _ = a
3482 .start_debug_session(Parameters(StartDebugSessionParams {
3483 project: Some("p".into()),
3484 description: None,
3485 agent_id: ":test/claude+plan-agent>".into(),
3486 feature: Some("auth".into()),
3487 }))
3488 .await;
3489 let _ = a
3491 .end_debug_session(Parameters(EndDebugSessionParams { outcome: None }))
3492 .await;
3493 let _ = a
3494 .start_debug_session(Parameters(StartDebugSessionParams {
3495 project: Some("p".into()),
3496 description: None,
3497 agent_id: ":test/claude+plan-agent>".into(),
3498 feature: Some("search".into()),
3499 }))
3500 .await;
3501
3502 let auth_only: serde_json::Value = serde_json::from_str(
3504 &a.list_debug_sessions(Parameters(ListDebugSessionsParams {
3505 status: None,
3506 feature: Some("auth".into()),
3507 }))
3508 .await,
3509 )
3510 .unwrap();
3511 assert_eq!(
3512 auth_only["result"]["count"], 1,
3513 "feature filter must return only the auth session"
3514 );
3515 assert_eq!(
3516 auth_only["result"]["sessions"][0]["feature"], "auth",
3517 "returned session must have the matching feature"
3518 );
3519
3520 let search_only: serde_json::Value = serde_json::from_str(
3521 &a.list_debug_sessions(Parameters(ListDebugSessionsParams {
3522 status: None,
3523 feature: Some("search".into()),
3524 }))
3525 .await,
3526 )
3527 .unwrap();
3528 assert_eq!(
3529 search_only["result"]["count"], 1,
3530 "feature filter must return only the search session"
3531 );
3532
3533 let none: serde_json::Value = serde_json::from_str(
3534 &a.list_debug_sessions(Parameters(ListDebugSessionsParams {
3535 status: None,
3536 feature: Some("nonexistent".into()),
3537 }))
3538 .await,
3539 )
3540 .unwrap();
3541 assert_eq!(
3542 none["result"]["count"], 0,
3543 "unknown feature must return empty"
3544 );
3545 }
3546
3547 #[test]
3550 fn agent_id_valid_formats() {
3551 for id in [
3552 ":mbp/claude+plan-agent>",
3553 ":linux/codex+build-agent>",
3554 ":mbp/gemini+researcher>",
3555 ":mini/copilot+reviewer>",
3556 ":box/opencode+crawler>",
3557 ":test-host/my-tool+my-role>",
3558 ] {
3559 assert!(
3560 validate_agent_id(id).is_ok(),
3561 "valid agent_id must pass: {id}"
3562 );
3563 }
3564 }
3565
3566 #[test]
3567 fn agent_id_rejects_missing_colon() {
3568 assert!(validate_agent_id("mbp/claude+agent>").is_err());
3569 }
3570
3571 #[test]
3572 fn agent_id_rejects_missing_gt() {
3573 assert!(validate_agent_id(":mbp/claude+agent").is_err());
3574 }
3575
3576 #[test]
3577 fn agent_id_rejects_missing_slash() {
3578 assert!(validate_agent_id(":mbp-claude+agent>").is_err());
3579 }
3580
3581 #[test]
3582 fn agent_id_rejects_missing_plus() {
3583 assert!(validate_agent_id(":mbp/claude-agent>").is_err());
3584 }
3585
3586 #[test]
3587 fn agent_id_rejects_uppercase() {
3588 assert!(validate_agent_id(":MBP/claude+agent>").is_err());
3589 }
3590
3591 #[test]
3592 fn agent_id_rejects_empty_host() {
3593 assert!(validate_agent_id(":/tool+role>").is_err());
3594 }
3595
3596 #[test]
3597 fn agent_id_rejects_empty_tool() {
3598 assert!(validate_agent_id(":host/+role>").is_err());
3599 }
3600
3601 #[test]
3602 fn agent_id_rejects_empty_role() {
3603 assert!(validate_agent_id(":host/tool+>").is_err());
3604 }
3605
3606 #[test]
3607 fn agent_id_rejects_too_long() {
3608 let long_id = format!(":{}", "x".repeat(65));
3609 assert!(validate_agent_id(&long_id).is_err());
3610 }
3611
3612 #[tokio::test]
3613 async fn start_debug_session_rejects_invalid_agent_id() {
3614 let mcp = build_mcp_with_debug_session().await;
3615 let res = mcp
3616 .start_debug_session(Parameters(StartDebugSessionParams {
3617 project: None,
3618 description: None,
3619 agent_id: "bad-format".into(),
3620 feature: None,
3621 }))
3622 .await;
3623 assert!(
3624 res.contains("invalid_agent_id"),
3625 "bad agent_id must be rejected: {res}"
3626 );
3627 }
3628}