Skip to main content

daemon8_mcp/
lib.rs

1// SPDX-License-Identifier: LicenseRef-FCL-1.0-ALv2
2// Copyright (c) 2026 Havy.tech, LLC
3
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::{Arc, Mutex};
8use std::time::{Duration, Instant};
9
10use daemon8_store::{
11    ActiveSessionState, DebugSessionStore, LensManager, LibrarianStore, MemoryStore, StateModel,
12};
13use daemon8_types::{Checkpoint, DevicePlatform, Filter, Observation, SourceActivator};
14use rmcp::handler::server::router::tool::ToolRouter;
15use rmcp::handler::server::wrapper::Parameters;
16use rmcp::model::{Implementation, ServerCapabilities, ServerInfo, Tool};
17use rmcp::schemars::{self, JsonSchema};
18use rmcp::{RoleServer, ServerHandler, tool, tool_router};
19use serde::Deserialize;
20use tokio::sync::broadcast;
21use tracing::Instrument;
22
23pub mod envelope;
24pub mod help;
25use envelope::{ActiveSessionEcho, DaemonMeta};
26use help::FeatureGate;
27
28const INSTRUCTIONS: &str = include_str!("../tool_descriptions/instructions.md");
29static MCP_SESSION_COUNTER: AtomicU64 = AtomicU64::new(1);
30
31fn next_mcp_session_id() -> String {
32    let id = MCP_SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
33    format!("mcp-{id}")
34}
35
36pub struct DeviceScreenshotResult {
37    pub png_bytes: Vec<u8>,
38    pub source: String,
39}
40
41/// Callback type for device screenshot capture. Receives (serial, platform) and
42/// returns PNG bytes + source label. Constructed by the daemon crate with access
43/// to ADB transport and xcap.
44pub type DeviceScreenshotFn = Arc<
45    dyn Fn(
46            String,
47            DevicePlatform,
48        ) -> Pin<Box<dyn Future<Output = anyhow::Result<DeviceScreenshotResult>> + Send>>
49        + Send
50        + Sync,
51>;
52
53#[derive(Debug)]
54pub enum ChromeCommand {
55    Connect { endpoint: String },
56    Action(daemon8_chrome::BrowserAction),
57}
58
59#[derive(Debug, Deserialize, JsonSchema)]
60pub struct ObserveParams {
61    #[schemars(
62        description = "Filter by observation kind: log, query, http_exchange, exception, js_exception, lifecycle, state_snapshot, metric, custom. Browser console output is 'log', browser JS errors are 'js_exception', page load events are 'lifecycle', network requests are 'http_exchange'."
63    )]
64    pub kinds: Option<Vec<String>>,
65
66    #[schemars(description = "Minimum severity threshold: trace, debug, info, warn, error")]
67    pub severity_min: Option<String>,
68
69    #[schemars(
70        description = "Filter by origin pattern: 'app' or 'app:name' for applications, 'browser' or 'browser:tab_id' for browser tabs, 'device' or 'device:serial' for devices. Omit to see all origins."
71    )]
72    pub origins: Option<Vec<String>>,
73
74    #[schemars(description = "Search across materialized observation text")]
75    pub text_match: Option<String>,
76
77    #[schemars(description = "Return only observations after this checkpoint id")]
78    pub since_checkpoint: Option<u64>,
79
80    #[schemars(description = "Maximum number of results to return (default 50)")]
81    pub limit: Option<usize>,
82
83    #[schemars(description = "Filter by correlation ID (exact match)")]
84    pub correlation_id: Option<String>,
85
86    #[schemars(description = "Filter by tags (all listed tags must be present)")]
87    pub tags: Option<Vec<String>>,
88
89    #[schemars(
90        description = "Include system/infrastructure observations (tagged '_system'). These are excluded by default to reduce noise from CLI hooks and internal tooling."
91    )]
92    pub include_system: Option<bool>,
93}
94
95#[derive(Debug, Deserialize, JsonSchema)]
96pub struct ConnectParams {
97    #[schemars(description = "Browser DevTools endpoint URL (default http://localhost:9222)")]
98    pub endpoint: String,
99}
100
101pub use daemon8_types::DebugAction;
102
103#[derive(Debug, Deserialize, JsonSchema)]
104#[serde(rename_all = "snake_case")]
105pub enum NetworkPreset {
106    Offline,
107    #[serde(rename = "slow-3g")]
108    Slow3g,
109    #[serde(rename = "fast-3g")]
110    Fast3g,
111    Restore,
112}
113
114impl NetworkPreset {
115    fn as_str(&self) -> &'static str {
116        match self {
117            Self::Offline => "offline",
118            Self::Slow3g => "slow-3g",
119            Self::Fast3g => "fast-3g",
120            Self::Restore => "restore",
121        }
122    }
123}
124
125#[derive(Debug, Deserialize, JsonSchema)]
126#[serde(rename_all = "snake_case")]
127pub enum StoreType {
128    Localstorage,
129    Sessionstorage,
130    Cookie,
131}
132
133impl StoreType {
134    fn as_str(&self) -> &'static str {
135        match self {
136            Self::Localstorage => "localstorage",
137            Self::Sessionstorage => "sessionstorage",
138            Self::Cookie => "cookie",
139        }
140    }
141}
142
143#[derive(Debug, Deserialize, JsonSchema)]
144pub struct ActParams {
145    pub action: DebugAction,
146    #[schemars(description = "Target tab ID (omit for first/default tab)")]
147    pub tab_id: Option<String>,
148    #[schemars(description = "JavaScript expression to evaluate (for eval_js)")]
149    pub expression: Option<String>,
150    #[schemars(description = "CSS selector for element screenshot (for screenshot)")]
151    pub selector: Option<String>,
152    #[schemars(description = "CSS text to inject (for inject_css)")]
153    pub css: Option<String>,
154    #[schemars(description = "Track injected CSS for later revert (for inject_css, default true)")]
155    pub temporary: Option<bool>,
156    #[schemars(
157        description = "Device serial for device screenshot (e.g. 'emulator-5554'). When provided with action='screenshot', captures from the device instead of the browser. Uses host window capture for emulators, ADB for physical devices."
158    )]
159    pub device_serial: Option<String>,
160    #[schemars(
161        description = "Device platform hint: 'android' or 'vega'. Used with device_serial to select the right capture method. Defaults to 'android'."
162    )]
163    pub device_platform: Option<String>,
164    #[schemars(
165        description = "Viewport width in CSS pixels (for set_viewport). iPhone 15=390, Pixel 8=412, iPad=820, desktop=1280"
166    )]
167    pub viewport_width: Option<u32>,
168    #[schemars(
169        description = "Viewport height in CSS pixels (for set_viewport). iPhone 15=844, Pixel 8=915, iPad=1180, desktop=800"
170    )]
171    pub viewport_height: Option<u32>,
172    #[schemars(
173        description = "Device pixel ratio / scale factor (for set_viewport). iPhone 15=3.0, Pixel 8=2.625, iPad=2.0, desktop=1.0"
174    )]
175    pub viewport_scale: Option<f64>,
176    #[schemars(
177        description = "Enable mobile emulation with touch events (for set_viewport). true for mobile devices, false for desktop"
178    )]
179    pub viewport_mobile: Option<bool>,
180    #[schemars(description = "User-agent string override (for set_viewport, optional)")]
181    pub viewport_ua: Option<String>,
182    #[schemars(
183        description = "Network preset for network_conditions. offline=no connectivity, slow-3g=400ms/780Kbps, fast-3g=150ms/1.6Mbps, restore=remove throttling"
184    )]
185    pub network_preset: Option<NetworkPreset>,
186    #[schemars(description = "Storage type for storage_set")]
187    pub store_type: Option<StoreType>,
188    #[schemars(description = "Storage key to read or write (for storage_set)")]
189    pub storage_key: Option<String>,
190    #[schemars(description = "Storage value to write (for storage_set)")]
191    pub storage_value: Option<String>,
192    #[schemars(
193        description = "Comma-separated storage types to clear (for storage_clear): 'cookies', 'local_storage', 'session_storage', 'indexeddb', 'cache_storage', 'service_workers', 'all'. Default: 'all'"
194    )]
195    pub storage_types: Option<String>,
196    #[schemars(description = "X coordinate in CSS pixels (for element_at_point)")]
197    pub x: Option<f64>,
198    #[schemars(description = "Y coordinate in CSS pixels (for element_at_point)")]
199    pub y: Option<f64>,
200    #[schemars(description = "URL to navigate to (for navigate)")]
201    pub url: Option<String>,
202}
203
204#[derive(Debug, Deserialize, JsonSchema)]
205pub struct IngestParams {
206    #[schemars(
207        description = "Your agent or application name (e.g. 'my-agent'). Used for filtering with origins=['app:name']."
208    )]
209    pub app: Option<String>,
210
211    #[schemars(
212        description = "Observation kind: log, metric, query, exception, custom. Defaults to log."
213    )]
214    pub kind: Option<String>,
215
216    #[schemars(
217        description = "Severity: trace, debug, info, warn, error. Defaults to debug. Setting warn or error triggers a real-time alert push to all connected agent sessions."
218    )]
219    pub severity: Option<String>,
220
221    #[schemars(
222        description = "The observation payload (JSON object). Use a 'message' key for clean alert formatting: {\"message\": \"what happened\"}. Additional fields are preserved."
223    )]
224    pub data: serde_json::Value,
225
226    #[schemars(description = "Channel name for custom kind observations.")]
227    pub channel: Option<String>,
228
229    #[schemars(description = "Correlation ID to group related observations across sources")]
230    pub correlation_id: Option<String>,
231
232    #[schemars(description = "Parent observation ID for causal chains")]
233    pub parent_id: Option<u64>,
234
235    #[schemars(
236        description = "Tags for categorization (e.g. [\"reasoning\", \"high-confidence\"])"
237    )]
238    pub tags: Option<Vec<String>>,
239
240    #[schemars(description = "Agent session ID that produced this observation")]
241    pub session_id: Option<String>,
242
243    #[schemars(description = "Daemon instance node ID")]
244    pub node_id: Option<String>,
245}
246
247#[derive(Debug, Deserialize, JsonSchema)]
248pub struct SubscribeParams {
249    #[schemars(
250        description = "Filter by observation kind: log, query, http_exchange, exception, state_snapshot, js_exception, lifecycle, metric, custom. Omit for all kinds."
251    )]
252    pub kinds: Option<Vec<String>>,
253
254    #[schemars(
255        description = "Minimum severity threshold: trace, debug, info, warn, error. Default: warn (only warn and error push alerts)."
256    )]
257    pub severity_min: Option<String>,
258
259    #[schemars(
260        description = "Filter by origin patterns: 'app', 'app:name', 'browser', 'browser:tab_id', 'device', or 'device:serial'. Omit for all origins."
261    )]
262    pub origins: Option<Vec<String>>,
263
264    #[schemars(
265        description = "Search across materialized observation text. Omit for no text filtering."
266    )]
267    pub text_match: Option<String>,
268
269    #[schemars(description = "Filter by correlation ID (exact match)")]
270    pub correlation_id: Option<String>,
271
272    #[schemars(description = "Filter by tags (all listed tags must be present)")]
273    pub tags: Option<Vec<String>>,
274
275    #[schemars(
276        description = "Include system/infrastructure observations (tagged '_system'). Excluded by default."
277    )]
278    pub include_system: Option<bool>,
279}
280
281#[derive(Debug, Deserialize, JsonSchema)]
282pub struct LensParams {
283    #[schemars(
284        description = "Filter by observation kind: log, query, http_exchange, exception, state_snapshot, js_exception, lifecycle, metric, custom"
285    )]
286    pub kinds: Option<Vec<String>>,
287
288    #[schemars(description = "Minimum severity threshold: trace, debug, info, warn, error")]
289    pub severity_min: Option<String>,
290
291    #[schemars(description = "Filter by origin pattern: 'app:name', 'browser', 'device:serial'")]
292    pub origins: Option<Vec<String>>,
293
294    #[schemars(description = "Search across materialized observation text")]
295    pub text_match: Option<String>,
296
297    #[schemars(description = "Filter by correlation ID (exact match)")]
298    pub correlation_id: Option<String>,
299
300    #[schemars(description = "Filter by tags (all listed tags must be present)")]
301    pub tags: Option<Vec<String>>,
302
303    #[schemars(description = "Maximum observations to buffer (default 200)")]
304    pub capacity: Option<usize>,
305}
306
307#[derive(Debug, Deserialize, JsonSchema)]
308pub struct SaveMemoryParams {
309    #[schemars(description = "The memory content to persist")]
310    pub content: String,
311
312    #[schemars(
313        description = "Memory kind: pattern, decision, error_signature, session_summary, user_flagged. Defaults to user_flagged."
314    )]
315    pub kind: Option<String>,
316
317    #[schemars(description = "Tags for categorization and retrieval")]
318    pub tags: Option<Vec<String>>,
319
320    #[schemars(description = "Observation IDs that informed this memory")]
321    pub source_observations: Option<Vec<u64>>,
322
323    #[schemars(description = "Project slug to scope this memory to")]
324    pub project_slug: Option<String>,
325
326    #[schemars(description = "Session ID that produced this memory")]
327    pub session_id: Option<String>,
328
329    #[schemars(description = "Confidence score from 0.0 to 1.0 (default 1.0)")]
330    pub confidence: Option<f64>,
331}
332
333#[derive(Debug, Deserialize, JsonSchema)]
334pub struct QueryMemoryParams {
335    #[schemars(description = "Substring search across memory content")]
336    pub text: Option<String>,
337
338    #[schemars(
339        description = "Filter by kind: pattern, decision, error_signature, session_summary, user_flagged"
340    )]
341    pub kinds: Option<Vec<String>>,
342
343    #[schemars(description = "Filter by tags (all listed tags must be present)")]
344    pub tags: Option<Vec<String>>,
345
346    #[schemars(description = "Filter by project slug")]
347    pub project_slug: Option<String>,
348
349    #[schemars(description = "Maximum number of results (default 20)")]
350    pub limit: Option<u64>,
351}
352
353#[derive(Debug, Deserialize, JsonSchema)]
354pub struct ForgetMemoryParams {
355    #[schemars(description = "The memory ID to delete")]
356    pub id: String,
357    #[schemars(
358        description = "Required to confirm deletion. Must be true to delete the memory; any other value (including absent) returns an error and leaves the memory intact."
359    )]
360    pub confirm: Option<bool>,
361}
362
363#[derive(Debug, Deserialize, JsonSchema)]
364pub struct HelpParams {
365    #[schemars(
366        description = "Help topic: index, debug_session, checkpoint, setup, hooks, lens, memory, observations, envelope, librarian. Omit for index."
367    )]
368    pub topic: Option<String>,
369}
370
371#[derive(Debug, Deserialize, JsonSchema)]
372pub struct LibrarianIndexEdge {
373    #[schemars(
374        description = "Edge kind: has_source | documented_by | fixes | supersedes | child_of"
375    )]
376    pub kind: String,
377    #[schemars(description = "Target catalog node ID for this edge")]
378    pub target_node_id: String,
379}
380
381#[derive(Debug, Deserialize, JsonSchema)]
382pub struct LibrarianIndexParams {
383    #[schemars(description = "Node kind: doc | source_template | fix | project")]
384    pub kind: String,
385    #[schemars(description = "Human-readable name for this reference")]
386    pub label: String,
387    #[schemars(description = "Locator type: file | url | vault")]
388    pub locator_kind: String,
389    #[schemars(description = "The actual pointer — a file path, URL, or vault note path")]
390    pub locator: String,
391    #[schemars(description = "Free-form retrieval tags")]
392    pub tags: Option<Vec<String>>,
393    #[schemars(description = "Project slug to scope this reference")]
394    pub project_slug: Option<String>,
395    #[schemars(description = "Place under an existing catalog node for hierarchy")]
396    pub parent_id: Option<String>,
397    #[schemars(description = "Optional edge to create at index time")]
398    pub edge: Option<LibrarianIndexEdge>,
399    #[schemars(
400        description = "Mark as authoritative reference — canonicalized nodes are never flagged as stale"
401    )]
402    pub canonicalize: Option<bool>,
403}
404
405#[derive(Debug, Deserialize, JsonSchema)]
406pub struct LibrarianLookupParams {
407    #[schemars(description = "Look up a single node by ID (returns node + edges)")]
408    pub id: Option<String>,
409    #[schemars(description = "Filter by kind: doc | source_template | fix | project")]
410    pub kinds: Option<Vec<String>>,
411    #[schemars(description = "Filter by tags")]
412    pub tags: Option<Vec<String>>,
413    #[schemars(description = "Scope to a project")]
414    pub project_slug: Option<String>,
415    #[schemars(description = "Case-insensitive search across label and locator")]
416    pub text: Option<String>,
417    #[schemars(description = "Max results. Default 20, max 500.")]
418    pub limit: Option<u32>,
419    #[schemars(description = "Include superseded/deprecated entries. Default false.")]
420    pub include_deprecated: Option<bool>,
421    #[schemars(description = "Find nodes not accessed in N days")]
422    pub stale_before_days: Option<u32>,
423    #[schemars(description = "Browse children of a specific catalog node")]
424    pub parent_id: Option<String>,
425}
426
427#[derive(Debug, Deserialize, JsonSchema)]
428pub struct LibrarianForgetParams {
429    #[schemars(description = "Catalog node ID to remove or deprecate")]
430    pub id: String,
431    #[schemars(
432        description = "Required for hard delete (deprecate=false). Must be true to proceed."
433    )]
434    pub confirm: Option<bool>,
435    #[schemars(
436        description = "Default true. When true: soft-delete (deprecated_at set). When false and confirm=true: permanent removal."
437    )]
438    pub deprecate: Option<bool>,
439}
440
441#[derive(Debug, Deserialize, JsonSchema)]
442pub struct CreateCheckpointParams {
443    #[schemars(
444        description = "Optional human-readable note about why this checkpoint exists (e.g. \"before applying retry patch\")."
445    )]
446    pub description: Option<String>,
447}
448
449#[derive(Debug, Deserialize, JsonSchema)]
450pub struct StartDebugSessionParams {
451    #[schemars(description = "Project slug to scope the session to (e.g. \"daemon8\").")]
452    pub project: Option<String>,
453    #[schemars(description = "One-line description of what is being investigated.")]
454    pub description: Option<String>,
455    #[schemars(
456        description = "Required. Agent identity in format :host/tool+role> (e.g. :mbp/claude+plan-agent>). Identifies who is running this investigation."
457    )]
458    pub agent_id: String,
459    #[schemars(
460        description = "Optional. Feature being investigated (e.g. 'auth', 'search'). Used by other agents to discover overlapping work."
461    )]
462    pub feature: Option<String>,
463}
464
465#[derive(Debug, Deserialize, JsonSchema)]
466pub struct EndDebugSessionParams {
467    #[schemars(
468        description = "Outcome string. Defaults to \"abandoned\". Use resolve_debug_session for \"resolved\"."
469    )]
470    pub outcome: Option<String>,
471}
472
473#[derive(Debug, Deserialize, JsonSchema)]
474pub struct ResolveDebugSessionParams {
475    #[schemars(description = "Required: human summary of what broke and what fixed it.")]
476    pub summary: String,
477    #[schemars(description = "Optional: one-sentence root cause.")]
478    pub root_cause: Option<String>,
479    #[schemars(description = "Optional: unified diff or short patch text.")]
480    pub fix_diff: Option<String>,
481    #[schemars(description = "Optional: CLI commands that mattered to the fix.")]
482    pub commands_used: Option<Vec<String>>,
483    #[schemars(description = "Optional: error_hash strings this fix resolves.")]
484    pub related_errors: Option<Vec<String>>,
485    #[schemars(description = "Optional: extra tags for retrieval.")]
486    pub tags: Option<Vec<String>>,
487}
488
489#[derive(Debug, Deserialize, JsonSchema)]
490pub struct ListDebugSessionsParams {
491    #[schemars(description = "Filter by status: active, completed, abandoned. Omit for all.")]
492    pub status: Option<String>,
493    #[schemars(
494        description = "Optional. Filter by feature name (e.g. 'auth', 'search'). Returns only sessions investigating that feature."
495    )]
496    pub feature: Option<String>,
497}
498
499#[derive(Debug, Clone, Deserialize, JsonSchema)]
500pub struct SetupToolAction {
501    #[schemars(description = "Setup action: status or apply.")]
502    pub action: String,
503    #[schemars(description = "Project working directory. Defaults to daemon current directory.")]
504    pub cwd: Option<String>,
505    #[schemars(description = "Required to confirm mutating setup_apply.")]
506    pub yes: Option<bool>,
507    #[schemars(
508        description = "Comma-separated providers to configure (e.g. \"claude-code,gemini,codex\"). Omit for auto-detection."
509    )]
510    pub providers: Option<String>,
511}
512
513#[derive(Debug, Deserialize, JsonSchema)]
514pub struct SetupStatusParams {
515    #[schemars(description = "Project working directory. Defaults to daemon current directory.")]
516    pub cwd: Option<String>,
517}
518
519#[derive(Debug, Deserialize, JsonSchema)]
520pub struct SetupPlanParams {
521    #[schemars(description = "Project working directory. Defaults to daemon current directory.")]
522    pub cwd: Option<String>,
523}
524
525#[derive(Debug, Deserialize, JsonSchema)]
526pub struct SetupApplyParams {
527    #[schemars(description = "Project working directory. Defaults to daemon current directory.")]
528    pub cwd: Option<String>,
529    #[schemars(description = "Required to confirm setup_apply writes.")]
530    pub yes: bool,
531    #[schemars(
532        description = "Comma-separated providers to configure (e.g. \"claude-code,gemini,codex\"). Omit for auto-detection."
533    )]
534    pub providers: Option<String>,
535}
536
537pub type SetupToolFn =
538    Arc<dyn Fn(SetupToolAction) -> Pin<Box<dyn Future<Output = String> + Send>> + Send + Sync>;
539
540#[derive(Debug, Clone, Deserialize, JsonSchema)]
541pub struct HooksToolAction {
542    #[schemars(description = "Action: list, remove, update, repair.")]
543    pub action: String,
544    #[schemars(description = "Provider for remove/update: claude or codex.")]
545    pub provider: Option<String>,
546    #[schemars(description = "Scope for remove/update (claude only): local, shared, or global.")]
547    pub scope: Option<String>,
548}
549
550pub type HooksToolFn =
551    Arc<dyn Fn(HooksToolAction) -> Pin<Box<dyn Future<Output = String> + Send>> + Send + Sync>;
552
553pub struct DaemonMcp {
554    store: Arc<dyn StateModel>,
555    memory_store: Option<Arc<dyn MemoryStore>>,
556    debug_session_store: Option<Arc<dyn DebugSessionStore>>,
557    librarian_store: Option<Arc<dyn LibrarianStore>>,
558    active_state: ActiveSessionState,
559    obs_tx: tokio::sync::mpsc::UnboundedSender<Observation>,
560    chrome_tx: tokio::sync::mpsc::Sender<ChromeCommand>,
561    chrome_state: tokio::sync::watch::Receiver<daemon8_chrome::ConnectionState>,
562    chrome_endpoint: Arc<Mutex<Option<Arc<str>>>>,
563    last_checkpoint: Mutex<Checkpoint>,
564    device_screenshot_fn: Option<DeviceScreenshotFn>,
565    screenshot_dir: std::path::PathBuf,
566    /// Per-session subscription filter. Each `DaemonMcp` instance owns its own
567    /// channel so concurrent MCP sessions do not overwrite each other's
568    /// `subscribe_observations` filter.
569    subscription_tx: tokio::sync::watch::Sender<Option<Filter>>,
570    broadcast_tx: broadcast::Sender<(Arc<Observation>, Arc<str>)>,
571    lens: Arc<LensManager>,
572    setup_tool_fn: Option<SetupToolFn>,
573    hooks_tool_fn: Option<HooksToolFn>,
574    source_activator: Option<Arc<dyn SourceActivator>>,
575    /// Parent cancellation token. The push task spawned in `on_initialized`
576    /// uses a child token so daemon shutdown propagates cleanly.
577    cancel: tokio_util::sync::CancellationToken,
578    enabled_features: Vec<FeatureGate>,
579    tool_router: ToolRouter<Self>,
580}
581
582pub struct DaemonMcpConfig {
583    pub store: Arc<dyn StateModel>,
584    pub memory_store: Option<Arc<dyn MemoryStore>>,
585    pub debug_session_store: Option<Arc<dyn DebugSessionStore>>,
586    pub librarian_store: Option<Arc<dyn LibrarianStore>>,
587    pub obs_tx: tokio::sync::mpsc::UnboundedSender<Observation>,
588    pub chrome_tx: tokio::sync::mpsc::Sender<ChromeCommand>,
589    pub chrome_state: tokio::sync::watch::Receiver<daemon8_chrome::ConnectionState>,
590    pub chrome_endpoint: Arc<Mutex<Option<Arc<str>>>>,
591    pub device_screenshot_fn: Option<DeviceScreenshotFn>,
592    pub screenshot_dir: std::path::PathBuf,
593    pub broadcast_tx: broadcast::Sender<(Arc<Observation>, Arc<str>)>,
594    pub lens: Arc<LensManager>,
595    pub setup_tool_fn: Option<SetupToolFn>,
596    pub hooks_tool_fn: Option<HooksToolFn>,
597    pub source_activator: Option<Arc<dyn SourceActivator>>,
598    /// Parent cancellation token. Use the daemon-wide token; the MCP push task
599    /// derives a child from it so daemon shutdown stops per-session work.
600    pub cancel: tokio_util::sync::CancellationToken,
601}
602
603#[tool_router(vis = "pub")]
604impl DaemonMcp {
605    pub fn new(cfg: DaemonMcpConfig) -> Self {
606        let mut router = Self::tool_router();
607        router += Self::action_tool_router();
608        router += Self::lens_tool_router();
609        if cfg.memory_store.is_some() {
610            router += Self::memory_tool_router();
611        }
612        if cfg.debug_session_store.is_some() && cfg.memory_store.is_some() {
613            router += Self::debug_session_tool_router();
614        }
615        if cfg.setup_tool_fn.is_some() {
616            router += Self::setup_tool_router();
617        }
618        if cfg.hooks_tool_fn.is_some() {
619            router += Self::hooks_tool_router();
620        }
621        if cfg.librarian_store.is_some() {
622            router += Self::librarian_tool_router();
623        }
624        let mut enabled_features = Vec::new();
625        if cfg.memory_store.is_some() {
626            enabled_features.push(FeatureGate::Memory);
627        }
628        if cfg.debug_session_store.is_some() && cfg.memory_store.is_some() {
629            enabled_features.push(FeatureGate::DebugSession);
630        }
631        if cfg.setup_tool_fn.is_some() {
632            enabled_features.push(FeatureGate::Setup);
633        }
634        if cfg.hooks_tool_fn.is_some() {
635            enabled_features.push(FeatureGate::Hooks);
636        }
637        if cfg.librarian_store.is_some() {
638            enabled_features.push(FeatureGate::Librarian);
639        }
640        let (subscription_tx, _) = tokio::sync::watch::channel::<Option<Filter>>(None);
641        Self {
642            store: cfg.store,
643            memory_store: cfg.memory_store,
644            debug_session_store: cfg.debug_session_store,
645            librarian_store: cfg.librarian_store,
646            active_state: ActiveSessionState::new(),
647            obs_tx: cfg.obs_tx,
648            chrome_tx: cfg.chrome_tx,
649            chrome_state: cfg.chrome_state,
650            chrome_endpoint: cfg.chrome_endpoint,
651            last_checkpoint: Mutex::new(Checkpoint(0)),
652            device_screenshot_fn: cfg.device_screenshot_fn,
653            screenshot_dir: cfg.screenshot_dir,
654            subscription_tx,
655            broadcast_tx: cfg.broadcast_tx,
656            lens: cfg.lens,
657            setup_tool_fn: cfg.setup_tool_fn,
658            hooks_tool_fn: cfg.hooks_tool_fn,
659            source_activator: cfg.source_activator,
660            cancel: cfg.cancel,
661            enabled_features,
662            tool_router: router,
663        }
664    }
665
666    pub fn subscription_rx(&self) -> tokio::sync::watch::Receiver<Option<Filter>> {
667        self.subscription_tx.subscribe()
668    }
669
670    /// Set this session's subscription filter directly. Equivalent to invoking
671    /// the `subscribe_observations` tool — exposed for integration tests that
672    /// need to verify per-session subscription scoping without driving the
673    /// rmcp tool router. Gated behind the `test-util` feature so it does not
674    /// appear in the public API of release builds.
675    #[cfg(feature = "test-util")]
676    pub fn set_subscription(&self, filter: Option<Filter>) {
677        self.subscription_tx.send_replace(filter);
678    }
679
680    /// Derive a child cancellation token from this session's stored parent
681    /// token. Mirrors what `on_initialized` does for the per-session push
682    /// task; exposed so integration tests can prove daemon-shutdown
683    /// propagates into per-session work.
684    #[cfg(feature = "test-util")]
685    pub fn child_cancel_token(&self) -> tokio_util::sync::CancellationToken {
686        self.cancel.child_token()
687    }
688
689    #[cfg(feature = "test-util")]
690    pub fn help_index_body(&self) -> String {
691        help::build_dynamic_index(&self.enabled_features, self.librarian_store.is_some())
692    }
693
694    #[cfg(feature = "test-util")]
695    pub fn help_topic_body(&self, topic: &str) -> (String, String) {
696        match help::find_topic(topic, &self.enabled_features) {
697            Some(t) => (t.name.to_string(), t.body.to_string()),
698            None => (
699                "index".to_string(),
700                help::build_dynamic_index(&self.enabled_features, self.librarian_store.is_some()),
701            ),
702        }
703    }
704
705    /// Ensure Chrome is connected, waiting up to `timeout` for the connection.
706    /// Returns Ok if connected, Err with a user-facing error message if not.
707    async fn ensure_chrome_connected(&self, timeout: std::time::Duration) -> Result<(), String> {
708        use daemon8_chrome::ConnectionState;
709
710        let state = *self.chrome_state.borrow();
711        match state {
712            ConnectionState::Connected => Ok(()),
713            ConnectionState::Disconnected => {
714                let endpoint = self
715                    .chrome_endpoint
716                    .lock()
717                    .expect("chrome_endpoint mutex poisoned")
718                    .as_ref()
719                    .map(|s| s.to_string())
720                    .unwrap_or_else(|| "http://localhost:9222".to_string());
721                let _ = self
722                    .chrome_tx
723                    .send(ChromeCommand::Connect { endpoint })
724                    .await;
725                let result = self.wait_for_connected(timeout).await;
726                if result.is_ok() {
727                    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
728                }
729                result
730            }
731            ConnectionState::Connecting | ConnectionState::Reconnecting => {
732                self.wait_for_connected(timeout).await
733            }
734        }
735    }
736
737    async fn wait_for_connected(&self, timeout: std::time::Duration) -> Result<(), String> {
738        use daemon8_chrome::ConnectionState;
739
740        let mut rx = self.chrome_state.clone();
741        let result = tokio::time::timeout(timeout, async {
742            loop {
743                if *rx.borrow_and_update() == ConnectionState::Connected {
744                    return true;
745                }
746                if rx.changed().await.is_err() {
747                    return false;
748                }
749            }
750        })
751        .await;
752        match result {
753            Ok(true) => Ok(()),
754            _ => Err(
755                "Browser connection timed out. The daemon will keep retrying in the background."
756                    .into(),
757            ),
758        }
759    }
760
761    #[doc = include_str!("../tool_descriptions/query_observations.md")]
762    #[tool(name = "query_observations")]
763    async fn query_observations(&self, Parameters(params): Parameters<ObserveParams>) -> String {
764        // If the caller wants browser observations, ensure Chrome is connected.
765        let wants_browser = params
766            .origins
767            .as_ref()
768            .is_some_and(|origins| origins.iter().any(|o| o.starts_with("browser")));
769        if wants_browser
770            && let Err(e) = self
771                .ensure_chrome_connected(std::time::Duration::from_secs(10))
772                .await
773        {
774            // Don't fail -- still query the store for whatever's there,
775            // but include the connection error in the response.
776            tracing::warn!("Browser not available for observation: {e}");
777        }
778
779        let kinds = params.kinds.map(Filter::kinds_from_vec);
780
781        let severity_min = params.severity_min.and_then(|s| Filter::parse_severity(&s));
782
783        let origins = params.origins.map(Filter::origins_from_vec);
784
785        let since = params.since_checkpoint.map(Checkpoint);
786
787        let filter = Filter {
788            kinds,
789            severity_min,
790            origins,
791            text_match: params.text_match,
792            since,
793            limit: Some(params.limit.unwrap_or(50).min(500)),
794            correlation_id: params.correlation_id,
795            tags: params.tags,
796            include_system: params.include_system,
797        };
798
799        if let Some(ref sa) = self.source_activator {
800            sa.touch_matching(&filter);
801        }
802
803        match self.store.query(&filter).await {
804            Ok(slice) => {
805                let mut result = serde_json::to_value(&slice).unwrap_or_default();
806
807                if wants_browser {
808                    let chrome_state = *self.chrome_state.borrow();
809                    result["browser_state"] = serde_json::json!(format!("{chrome_state}"));
810                }
811
812                let lens_obs = self.lens.drain().await;
813                if !lens_obs.is_empty() {
814                    result["lens_observations"] =
815                        serde_json::to_value(&lens_obs).unwrap_or_default();
816                    result["lens_count"] = serde_json::json!(lens_obs.len());
817                }
818
819                self.ok(result)
820            }
821            Err(e) => self.err("query_failed", &e.to_string(), None, None),
822        }
823    }
824
825    #[doc = include_str!("../tool_descriptions/status.md")]
826    #[tool(name = "status")]
827    async fn status(&self) -> String {
828        match self.store.summary().await {
829            Ok(summary) => match serde_json::to_value(&summary) {
830                Ok(mut val) => {
831                    if let Some(obj) = val.as_object_mut() {
832                        obj.insert(
833                            "daemon_version".into(),
834                            serde_json::Value::String(env!("CARGO_PKG_VERSION").to_string()),
835                        );
836                    }
837                    self.ok(val)
838                }
839                Err(e) => self.err("serialization_failed", &e.to_string(), None, None),
840            },
841            Err(e) => self.err("summary_failed", &e.to_string(), None, None),
842        }
843    }
844
845    #[tool(
846        name = "daemon8_help",
847        description = "Narrative documentation for daemon8 protocols. Pass topic='index' (or omit) for the topic list. Returns markdown."
848    )]
849    async fn daemon8_help(&self, Parameters(params): Parameters<HelpParams>) -> String {
850        let topic = params.topic.as_deref().unwrap_or("index");
851        if topic == "index" {
852            let body =
853                help::build_dynamic_index(&self.enabled_features, self.librarian_store.is_some());
854            return self.ok(serde_json::json!({ "topic": "index", "body": body }));
855        }
856        match help::find_topic(topic, &self.enabled_features) {
857            Some(t) => self.ok(serde_json::json!({ "topic": t.name, "body": t.body })),
858            None => {
859                let body = help::build_dynamic_index(
860                    &self.enabled_features,
861                    self.librarian_store.is_some(),
862                );
863                self.ok(serde_json::json!({ "topic": "index", "body": body }))
864            }
865        }
866    }
867
868    #[doc = include_str!("../tool_descriptions/create_checkpoint.md")]
869    #[tool(name = "create_checkpoint")]
870    async fn create_checkpoint(
871        &self,
872        Parameters(params): Parameters<CreateCheckpointParams>,
873    ) -> String {
874        let active = match self.active_state.current_session() {
875            Some(s) => s,
876            None => {
877                return self.err(
878                    "no_active_debug_session",
879                    "create_checkpoint requires an active debug session",
880                    Some("call start_debug_session first"),
881                    Some("start_debug_session"),
882                );
883            }
884        };
885        let ds_store = match &self.debug_session_store {
886            Some(s) => s,
887            None => {
888                return self.err(
889                    "internal_error",
890                    "debug_session store not available",
891                    None,
892                    None,
893                );
894            }
895        };
896
897        let seq = self.store.checkpoint().await;
898        let now = current_ns();
899        let cp = daemon8_store::DebugCheckpoint {
900            id: None,
901            debug_session_id: active.id.to_string(),
902            description: params.description,
903            created_at: now,
904            seq_at_creation: seq.0,
905        };
906        let cp_id = match ds_store.create_checkpoint(cp).await {
907            Ok(id) => id,
908            Err(e) => return self.err("create_checkpoint_failed", &e.to_string(), None, None),
909        };
910        self.active_state
911            .set_checkpoint(Some(Arc::from(cp_id.as_str())));
912        active.touch(now);
913        *self
914            .last_checkpoint
915            .lock()
916            .expect("last_checkpoint mutex poisoned") = seq;
917        self.ok_with(
918            serde_json::json!({
919                "checkpoint_id": cp_id,
920                "debug_session_id": active.id.as_ref(),
921                "seq_at_creation": seq.0,
922                "created_at": now
923            }),
924            vec!["query_observations"],
925            Some("checkpoint set; query_observations(since_checkpoint=...) shows what comes next"),
926        )
927    }
928
929    #[doc = include_str!("../tool_descriptions/list_connections.md")]
930    #[tool(name = "list_connections")]
931    async fn list_connections(&self) -> String {
932        wrap_inner_result(self, &self.connections_json().await)
933    }
934
935    #[doc = include_str!("../tool_descriptions/ingest_observation.md")]
936    #[tool(name = "ingest_observation")]
937    async fn ingest_observation(&self, Parameters(params): Parameters<IngestParams>) -> String {
938        let mut body = serde_json::Map::new();
939        if let Some(app) = params.app {
940            body.insert("app".into(), serde_json::Value::String(app));
941        } else {
942            body.insert("app".into(), serde_json::Value::String("agent".into()));
943        }
944        if let Some(kind) = params.kind {
945            body.insert("kind".into(), serde_json::Value::String(kind));
946        }
947        if let Some(severity) = params.severity {
948            body.insert("severity".into(), serde_json::Value::String(severity));
949        }
950        if let Some(channel) = params.channel {
951            body.insert("channel".into(), serde_json::Value::String(channel));
952        }
953        if let Some(cid) = params.correlation_id {
954            body.insert("correlation_id".into(), serde_json::Value::String(cid));
955        }
956        if let Some(pid) = params.parent_id {
957            body.insert("parent_id".into(), serde_json::Value::Number(pid.into()));
958        }
959        if let Some(tags) = params.tags {
960            body.insert(
961                "tags".into(),
962                serde_json::Value::Array(tags.into_iter().map(serde_json::Value::String).collect()),
963            );
964        }
965        if let Some(sid) = params.session_id {
966            body.insert("session_id".into(), serde_json::Value::String(sid));
967        }
968        if let Some(nid) = params.node_id {
969            body.insert("node_id".into(), serde_json::Value::String(nid));
970        }
971        body.insert("data".into(), params.data);
972
973        let mut obs = daemon8_ingest::normalize::normalize(serde_json::Value::Object(body));
974
975        // Stamp per-session debug-session and checkpoint links. Each DaemonMcp
976        // instance owns its own ActiveSessionState, so concurrent MCP sessions
977        // do not interfere with each other's observation stamping.
978        if let Some(ref session) = self.active_state.current_session() {
979            obs.debug_session_id = Some(session.id.clone());
980            let slug_tag = format!("project:{}", session.project_slug);
981            obs.tags = Some(match obs.tags {
982                Some(mut existing) => {
983                    if !existing.contains(&slug_tag) {
984                        existing.push(slug_tag);
985                    }
986                    existing
987                }
988                None => vec![slug_tag],
989            });
990        }
991        if let Some(cp) = self.active_state.current_checkpoint() {
992            obs.checkpoint_id = Some(cp);
993        }
994
995        if let Err(e) = self.obs_tx.send(obs) {
996            tracing::warn!(
997                origin = ?e.0.origin,
998                kind = %e.0.kind.tag(),
999                severity = %e.0.severity,
1000                "MCP ingest failed: observation channel closed"
1001            );
1002            return self.err(
1003                "daemon_shutting_down",
1004                "Daemon is shutting down.",
1005                None,
1006                None,
1007            );
1008        }
1009
1010        self.ok(serde_json::json!({"ok": true}))
1011    }
1012
1013    #[doc = include_str!("../tool_descriptions/subscribe_observations.md")]
1014    #[tool(name = "subscribe_observations")]
1015    async fn subscribe_observations(
1016        &self,
1017        Parameters(params): Parameters<SubscribeParams>,
1018    ) -> String {
1019        let kinds = params.kinds.map(Filter::kinds_from_vec);
1020
1021        let severity_min = params.severity_min.and_then(|s| Filter::parse_severity(&s));
1022
1023        let origins = params.origins.map(Filter::origins_from_vec);
1024
1025        let filter = Filter {
1026            kinds,
1027            severity_min,
1028            origins,
1029            text_match: params.text_match,
1030            since: None,
1031            limit: None,
1032            correlation_id: params.correlation_id,
1033            tags: params.tags,
1034            include_system: params.include_system,
1035        };
1036
1037        if let Some(ref sa) = self.source_activator {
1038            sa.touch_matching(&filter);
1039        }
1040
1041        let is_default = filter.kinds.is_none()
1042            && filter.severity_min.is_none()
1043            && filter.origins.is_none()
1044            && filter.text_match.is_none()
1045            && filter.correlation_id.is_none()
1046            && filter.tags.is_none()
1047            && filter.include_system.is_none();
1048
1049        if is_default {
1050            self.subscription_tx.send_replace(None);
1051            self.ok(serde_json::json!({
1052                "subscribed": true,
1053                "filter": "default (severity >= warn)"
1054            }))
1055        } else {
1056            self.subscription_tx.send_replace(Some(filter));
1057            self.ok(serde_json::json!({
1058                "subscribed": true,
1059                "filter": "custom"
1060            }))
1061        }
1062    }
1063}
1064
1065#[tool_router(router = action_tool_router, vis = "pub")]
1066impl DaemonMcp {
1067    #[doc = include_str!("../tool_descriptions/connect_browser.md")]
1068    #[tool(name = "connect_browser")]
1069    async fn connect_browser(&self, Parameters(params): Parameters<ConnectParams>) -> String {
1070        self.connect_browser_inner(params).await
1071    }
1072
1073    #[doc = include_str!("../tool_descriptions/issue_command.md")]
1074    #[tool(name = "issue_command")]
1075    async fn issue_command(&self, Parameters(params): Parameters<ActParams>) -> String {
1076        self.issue_command_inner(params).await
1077    }
1078}
1079
1080#[tool_router(router = lens_tool_router, vis = "pub")]
1081impl DaemonMcp {
1082    #[doc = include_str!("../tool_descriptions/set_lens.md")]
1083    #[tool(name = "set_lens")]
1084    async fn set_lens(&self, Parameters(params): Parameters<LensParams>) -> String {
1085        let filter = Filter {
1086            kinds: params.kinds.map(Filter::kinds_from_vec),
1087            severity_min: params.severity_min.and_then(|s| Filter::parse_severity(&s)),
1088            origins: params.origins.map(Filter::origins_from_vec),
1089            text_match: params.text_match,
1090            since: None,
1091            limit: None,
1092            correlation_id: params.correlation_id,
1093            tags: params.tags,
1094            include_system: None,
1095        };
1096
1097        if let Some(ref sa) = self.source_activator {
1098            sa.touch_matching(&filter);
1099        }
1100        let capacity = params.capacity.unwrap_or(200).min(1000);
1101        self.lens.set_with_capacity(filter, capacity).await;
1102
1103        let status = self.lens.status().await;
1104        self.ok(serde_json::to_value(&status).unwrap_or(serde_json::Value::Null))
1105    }
1106
1107    #[doc = include_str!("../tool_descriptions/clear_lens.md")]
1108    #[tool(name = "clear_lens")]
1109    async fn clear_lens(&self) -> String {
1110        self.lens.clear().await;
1111        self.ok(serde_json::json!({"cleared": true}))
1112    }
1113
1114    #[doc = include_str!("../tool_descriptions/lens_status.md")]
1115    #[tool(name = "lens_status")]
1116    async fn lens_status(&self) -> String {
1117        let status = self.lens.status().await;
1118        self.ok(serde_json::to_value(&status).unwrap_or(serde_json::Value::Null))
1119    }
1120}
1121
1122#[tool_router(router = memory_tool_router, vis = "pub")]
1123impl DaemonMcp {
1124    #[doc = include_str!("../tool_descriptions/save_memory.md")]
1125    #[tool(name = "save_memory")]
1126    async fn save_memory(&self, Parameters(params): Parameters<SaveMemoryParams>) -> String {
1127        let mem_store = match &self.memory_store {
1128            Some(s) => s,
1129            None => {
1130                return self.err(
1131                    "memory_store_unavailable",
1132                    "memory store not available",
1133                    None,
1134                    None,
1135                );
1136            }
1137        };
1138        let hint = if self.librarian_store.is_some() {
1139            detect_librarian_hint(&params.content)
1140        } else {
1141            None
1142        };
1143        let inner = save_memory_inner(mem_store.as_ref(), params).await;
1144        match hint {
1145            Some(h) => match serde_json::from_str::<serde_json::Value>(&inner) {
1146                Ok(v) if v.get("error").is_none() => self.ok_with(v, vec![], Some(&h)),
1147                _ => wrap_inner_result(self, &inner),
1148            },
1149            None => wrap_inner_result(self, &inner),
1150        }
1151    }
1152
1153    #[doc = include_str!("../tool_descriptions/query_memory.md")]
1154    #[tool(name = "query_memory")]
1155    async fn query_memory(&self, Parameters(params): Parameters<QueryMemoryParams>) -> String {
1156        let mem_store = match &self.memory_store {
1157            Some(s) => s,
1158            None => {
1159                return self.err(
1160                    "memory_store_unavailable",
1161                    "memory store not available",
1162                    None,
1163                    None,
1164                );
1165            }
1166        };
1167        let inner = query_memory_inner(mem_store.as_ref(), params).await;
1168        wrap_inner_result(self, &inner)
1169    }
1170
1171    #[doc = include_str!("../tool_descriptions/forget_memory.md")]
1172    #[tool(name = "forget_memory")]
1173    async fn forget_memory(&self, Parameters(params): Parameters<ForgetMemoryParams>) -> String {
1174        if let Err(msg) = check_forget_memory_confirm(params.confirm) {
1175            return self.err(
1176                "missing_confirm",
1177                &msg,
1178                Some("pass confirm=true to acknowledge deletion"),
1179                None,
1180            );
1181        }
1182
1183        let mem_store = match &self.memory_store {
1184            Some(s) => s,
1185            None => {
1186                return self.err(
1187                    "memory_store_unavailable",
1188                    "memory store not available",
1189                    None,
1190                    None,
1191                );
1192            }
1193        };
1194
1195        match mem_store.forget_memory(&params.id).await {
1196            Ok(existed) => self.ok(serde_json::json!({ "deleted": existed })),
1197            Err(e) => self.err("forget_memory_failed", &e.to_string(), None, None),
1198        }
1199    }
1200}
1201
1202/// Wrap a JSON string from an inner helper (no envelope) into the standard
1203/// envelope shape. Pure best-effort: malformed JSON falls back to a string
1204/// payload so the LLM still gets *something* readable instead of an opaque
1205/// error.
1206fn wrap_inner_result(daemon: &DaemonMcp, raw: &str) -> String {
1207    match serde_json::from_str::<serde_json::Value>(raw) {
1208        Ok(v) => {
1209            // If the inner function already produced an envelope-shaped value
1210            // (i.e. `error_json(...)` from inside the inner), pass it through
1211            // by extracting the error and rewrapping. Otherwise it's a raw
1212            // success payload to wrap as `result`.
1213            if let Some(err_obj) = v.get("error").and_then(|e| e.as_object()) {
1214                let code = err_obj
1215                    .get("code")
1216                    .and_then(|c| c.as_str())
1217                    .unwrap_or("internal_error");
1218                let message = err_obj
1219                    .get("message")
1220                    .and_then(|m| m.as_str())
1221                    .unwrap_or("(no message)");
1222                return daemon.err(code, message, None, None);
1223            }
1224            daemon.ok(v)
1225        }
1226        Err(_) => daemon.ok(serde_json::json!({"raw": raw})),
1227    }
1228}
1229
1230// MVP-12-A1: confirmation gate parallel to setup_apply's `yes=true` requirement.
1231// Without this, an MCP client that misroutes a delete intent (or hallucinates a
1232// memory id) silently destroys data. The gate forces an explicit boolean.
1233fn check_forget_memory_confirm(confirm: Option<bool>) -> Result<(), String> {
1234    match confirm {
1235        Some(true) => Ok(()),
1236        _ => Err("forget_memory requires confirm=true to delete the memory".into()),
1237    }
1238}
1239
1240/// Validate agent_id against the `:host/tool+role>` convention.
1241/// All lowercase, bounded by `:` prefix and `>` suffix, `/` separates host from tool,
1242/// `+` separates tool from role. Max 64 chars total.
1243fn validate_agent_id(id: &str) -> Result<(), String> {
1244    if id.len() > 64 {
1245        return Err("agent_id must be at most 64 characters".into());
1246    }
1247    let body = id
1248        .strip_prefix(':')
1249        .and_then(|s| s.strip_suffix('>'))
1250        .ok_or("agent_id must start with ':' and end with '>'")?;
1251    let (host_rest, role) = body
1252        .split_once('+')
1253        .ok_or("agent_id must contain '+' separating tool from role")?;
1254    let (host, tool) = host_rest
1255        .split_once('/')
1256        .ok_or("agent_id must contain '/' separating host from tool")?;
1257    if host.is_empty() || tool.is_empty() || role.is_empty() {
1258        return Err("host, tool, and role must be non-empty".into());
1259    }
1260    let valid_segment = |s: &str| {
1261        s.chars()
1262            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
1263    };
1264    if !valid_segment(host) || !valid_segment(tool) || !valid_segment(role) {
1265        return Err("agent_id segments must be lowercase alphanumeric with hyphens only".into());
1266    }
1267    Ok(())
1268}
1269
1270#[tool_router(router = debug_session_tool_router, vis = "pub")]
1271impl DaemonMcp {
1272    #[doc = include_str!("../tool_descriptions/start_debug_session.md")]
1273    #[tool(name = "start_debug_session")]
1274    async fn start_debug_session(
1275        &self,
1276        Parameters(params): Parameters<StartDebugSessionParams>,
1277    ) -> String {
1278        let ds_store = match &self.debug_session_store {
1279            Some(s) => s,
1280            None => {
1281                return self.err(
1282                    "debug_session_unavailable",
1283                    "debug_session store not available",
1284                    Some("ensure setup_apply has run"),
1285                    Some("setup_apply"),
1286                );
1287            }
1288        };
1289        if let Some(existing) = self.active_state.current_session() {
1290            return self.err(
1291                "already_active_debug_session",
1292                &format!("session {} is already active", existing.id),
1293                Some(
1294                    "call end_debug_session(outcome=\"abandoned\") or resolve_debug_session first",
1295                ),
1296                Some("end_debug_session"),
1297            );
1298        }
1299        if let Err(msg) = validate_agent_id(&params.agent_id) {
1300            return self.err(
1301                "invalid_agent_id",
1302                &msg,
1303                Some("agent_id format: :host/tool+role> (e.g. :mbp/claude+plan-agent>)"),
1304                None,
1305            );
1306        }
1307        let now = current_ns();
1308        let session = daemon8_store::DebugSession {
1309            id: None,
1310            started_at: now,
1311            ended_at: None,
1312            last_activity: now,
1313            project_slug: params.project.unwrap_or_else(|| "unknown".into()),
1314            description: params.description,
1315            status: daemon8_types::DebugSessionStatus::Active,
1316            outcome: None,
1317            summary_memory_id: None,
1318            agent_id: params.agent_id.clone(),
1319            feature: params.feature.clone(),
1320        };
1321        match ds_store.start_debug_session(session.clone()).await {
1322            Ok(id) => {
1323                self.active_state
1324                    .set_session(Some(daemon8_store::ActiveDebugSession {
1325                        id: Arc::from(id.as_str()),
1326                        project_slug: Arc::from(session.project_slug.as_str()),
1327                        started_at_ns: now,
1328                        last_activity_ns: Arc::new(AtomicU64::new(now)),
1329                        agent_id: Arc::from(params.agent_id.as_str()),
1330                        feature: params.feature.as_deref().map(Arc::from),
1331                    }));
1332                self.ok_with(
1333                    serde_json::json!({
1334                        "debug_session_id": id,
1335                        "started_at": now,
1336                    }),
1337                    vec!["create_checkpoint", "query_observations"],
1338                    Some("debug session opened; checkpoint before any change you might want to roll back through"),
1339                )
1340            }
1341            Err(e) => self.err("start_debug_session_failed", &e.to_string(), None, None),
1342        }
1343    }
1344
1345    #[doc = include_str!("../tool_descriptions/end_debug_session.md")]
1346    #[tool(name = "end_debug_session")]
1347    async fn end_debug_session(
1348        &self,
1349        Parameters(params): Parameters<EndDebugSessionParams>,
1350    ) -> String {
1351        end_or_resolve_inner(
1352            self,
1353            EndIntent::Abandon {
1354                outcome_str: params.outcome,
1355            },
1356        )
1357        .await
1358    }
1359
1360    #[doc = include_str!("../tool_descriptions/resolve_debug_session.md")]
1361    #[tool(name = "resolve_debug_session")]
1362    async fn resolve_debug_session(
1363        &self,
1364        Parameters(params): Parameters<ResolveDebugSessionParams>,
1365    ) -> String {
1366        end_or_resolve_inner(self, EndIntent::Resolve(params)).await
1367    }
1368
1369    #[doc = include_str!("../tool_descriptions/list_debug_sessions.md")]
1370    #[tool(name = "list_debug_sessions")]
1371    async fn list_debug_sessions(
1372        &self,
1373        Parameters(params): Parameters<ListDebugSessionsParams>,
1374    ) -> String {
1375        let ds_store = match &self.debug_session_store {
1376            Some(s) => s,
1377            None => {
1378                return self.err(
1379                    "debug_session_unavailable",
1380                    "debug_session store not available",
1381                    Some("ensure setup_apply has run"),
1382                    Some("setup_apply"),
1383                );
1384            }
1385        };
1386        let status = match params.status.as_deref() {
1387            Some(s) => match s.parse::<daemon8_types::DebugSessionStatus>() {
1388                Ok(v) => Some(v),
1389                Err(e) => return self.err("bad_status", &e, None, None),
1390            },
1391            None => None,
1392        };
1393        match ds_store.list_debug_sessions(status).await {
1394            Ok(mut sessions) => {
1395                if let Some(ref feat) = params.feature {
1396                    sessions.retain(|s| s.feature.as_deref() == Some(feat.as_str()));
1397                }
1398                self.ok(serde_json::json!({
1399                    "count": sessions.len(),
1400                    "sessions": sessions,
1401                }))
1402            }
1403            Err(e) => self.err("list_debug_sessions_failed", &e.to_string(), None, None),
1404        }
1405    }
1406}
1407
1408enum EndIntent {
1409    Abandon { outcome_str: Option<String> },
1410    Resolve(ResolveDebugSessionParams),
1411}
1412
1413async fn end_or_resolve_inner(daemon: &DaemonMcp, intent: EndIntent) -> String {
1414    let ds_store = match &daemon.debug_session_store {
1415        Some(s) => s,
1416        None => {
1417            return daemon.err(
1418                "debug_session_unavailable",
1419                "debug_session store not available",
1420                None,
1421                None,
1422            );
1423        }
1424    };
1425    let mem_store = match &daemon.memory_store {
1426        Some(s) => s,
1427        None => {
1428            return daemon.err(
1429                "memory_store_unavailable",
1430                "memory store not available",
1431                None,
1432                None,
1433            );
1434        }
1435    };
1436    let active = match daemon.active_state.current_session() {
1437        Some(s) => s,
1438        None => {
1439            return daemon.err(
1440                "no_active_debug_session",
1441                "no active debug session to end/resolve",
1442                Some("call start_debug_session first"),
1443                Some("start_debug_session"),
1444            );
1445        }
1446    };
1447    let now = current_ns();
1448
1449    // Gather source observations from this session's checkpoints. Falls back
1450    // to the bare seq range from start..now if checkpoint listing fails.
1451    let checkpoints = ds_store
1452        .list_checkpoints(active.id.as_ref())
1453        .await
1454        .unwrap_or_default();
1455    let mut source_observations: Vec<u64> =
1456        checkpoints.iter().map(|cp| cp.seq_at_creation).collect();
1457
1458    let (outcome, summary_text, tags, data_blob) = match intent {
1459        EndIntent::Abandon { outcome_str } => {
1460            let outcome = outcome_str
1461                .as_deref()
1462                .and_then(|s| s.parse::<daemon8_types::DebugSessionOutcome>().ok())
1463                .unwrap_or(daemon8_types::DebugSessionOutcome::Abandoned);
1464            let summary = format!(
1465                "Debug session abandoned. Project: {}, started_at_ns: {}, checkpoints: {}.",
1466                active.project_slug,
1467                active.started_at_ns,
1468                checkpoints.len()
1469            );
1470            let tags = vec![
1471                "kind:debug_session_summary".to_string(),
1472                format!("project:{}", active.project_slug),
1473                format!("outcome:{}", outcome),
1474            ];
1475            (outcome, summary, tags, None)
1476        }
1477        EndIntent::Resolve(params) => {
1478            let mut tags = vec![
1479                "kind:debug_session_summary".to_string(),
1480                format!("project:{}", active.project_slug),
1481                "outcome:resolved".to_string(),
1482            ];
1483            if let Some(extra) = &params.tags {
1484                tags.extend(extra.iter().cloned());
1485            }
1486            // Add error_hash tags so query_memory(tags=["hash:abc"]) finds
1487            // the resolution alongside the ErrorSignature memory.
1488            if let Some(errs) = &params.related_errors {
1489                tags.extend(errs.iter().map(|h| format!("hash:{h}")));
1490            }
1491            let mut data = serde_json::Map::new();
1492            if let Some(rc) = &params.root_cause {
1493                data.insert("root_cause".into(), serde_json::json!(rc));
1494            }
1495            if let Some(diff) = &params.fix_diff {
1496                data.insert("fix_diff".into(), serde_json::json!(diff));
1497            }
1498            if let Some(cmds) = &params.commands_used {
1499                data.insert("commands_used".into(), serde_json::json!(cmds));
1500            }
1501            if let Some(errs) = &params.related_errors {
1502                data.insert("related_errors".into(), serde_json::json!(errs));
1503            }
1504            data.insert(
1505                "checkpoint_count".into(),
1506                serde_json::json!(checkpoints.len()),
1507            );
1508            data.insert(
1509                "started_at_ns".into(),
1510                serde_json::json!(active.started_at_ns),
1511            );
1512            data.insert("ended_at_ns".into(), serde_json::json!(now));
1513            (
1514                daemon8_types::DebugSessionOutcome::Resolved,
1515                params.summary,
1516                tags,
1517                Some(serde_json::Value::Object(data)),
1518            )
1519        }
1520    };
1521
1522    // Cap source_observations to the most recent 50 to avoid unbounded blobs.
1523    if source_observations.len() > 50 {
1524        let drop = source_observations.len() - 50;
1525        source_observations.drain(0..drop);
1526    }
1527
1528    let mem = daemon8_store::Memory {
1529        id: None,
1530        created_at: now,
1531        updated_at: now,
1532        kind: daemon8_types::MemoryKind::SessionSummary,
1533        content: summary_text,
1534        source_observations,
1535        tags,
1536        project_slug: active.project_slug.to_string(),
1537        session_id: Some(active.id.to_string()),
1538        confidence: 1.0,
1539        data: data_blob,
1540    };
1541
1542    let summary_memory_id = match mem_store.save_memory(mem).await {
1543        Ok(id) => id,
1544        Err(e) => {
1545            return daemon.err("session_summary_save_failed", &e.to_string(), None, None);
1546        }
1547    };
1548
1549    let status = match outcome {
1550        daemon8_types::DebugSessionOutcome::Resolved => {
1551            daemon8_types::DebugSessionStatus::Completed
1552        }
1553        daemon8_types::DebugSessionOutcome::Abandoned
1554        | daemon8_types::DebugSessionOutcome::InProgress => {
1555            daemon8_types::DebugSessionStatus::Abandoned
1556        }
1557    };
1558
1559    if let Err(e) = ds_store
1560        .end_debug_session(
1561            active.id.as_ref(),
1562            status,
1563            Some(outcome),
1564            Some(summary_memory_id.clone()),
1565            now,
1566        )
1567        .await
1568    {
1569        return daemon.err(
1570            "end_debug_session_db_update_failed",
1571            &e.to_string(),
1572            None,
1573            None,
1574        );
1575    }
1576
1577    daemon.active_state.clear();
1578
1579    daemon.ok_with(
1580        serde_json::json!({
1581            "debug_session_id": active.id.as_ref(),
1582            "summary_memory_id": summary_memory_id,
1583            "checkpoint_count": checkpoints.len(),
1584        }),
1585        vec!["start_debug_session", "query_memory"],
1586        Some("session closed; start_debug_session for the next investigation"),
1587    )
1588}
1589
1590fn current_ns() -> u64 {
1591    std::time::SystemTime::now()
1592        .duration_since(std::time::UNIX_EPOCH)
1593        .unwrap_or_default()
1594        .as_nanos() as u64
1595}
1596
1597impl DaemonMcp {
1598    /// Build the per-tool-call meta block. Currently echoes the active debug
1599    /// session so the LLM sees its lifecycle context on every response.
1600    /// Setup-status surfacing lands in v0.4 once the synchronous probe is
1601    /// wired across crate boundaries; until then `setup_status` remains the
1602    /// explicit tool for setup state.
1603    pub(crate) fn current_meta(&self) -> DaemonMeta {
1604        let mut meta = DaemonMeta::default();
1605        if let Some(s) = self.active_state.current_session() {
1606            meta.active_debug_session = Some(ActiveSessionEcho {
1607                id: s.id.to_string(),
1608                project_slug: s.project_slug.to_string(),
1609                started_at_ns: s.started_at_ns,
1610            });
1611        }
1612        meta
1613    }
1614
1615    pub(crate) fn current_meta_with(
1616        &self,
1617        next_actions: Vec<&str>,
1618        hint: Option<&str>,
1619    ) -> DaemonMeta {
1620        let mut meta = self.current_meta();
1621        if !next_actions.is_empty() {
1622            meta.next_actions = Some(next_actions.into_iter().map(String::from).collect());
1623        }
1624        if let Some(h) = hint {
1625            meta.hint = Some(h.to_string());
1626        }
1627        meta
1628    }
1629
1630    pub(crate) fn ok(&self, value: serde_json::Value) -> String {
1631        envelope::ok_value(value, self.current_meta())
1632    }
1633
1634    pub(crate) fn ok_with(
1635        &self,
1636        value: serde_json::Value,
1637        next_actions: Vec<&str>,
1638        hint: Option<&str>,
1639    ) -> String {
1640        envelope::ok_value(value, self.current_meta_with(next_actions, hint))
1641    }
1642
1643    pub(crate) fn err(
1644        &self,
1645        code: &str,
1646        message: &str,
1647        hint: Option<&str>,
1648        fix_tool: Option<&str>,
1649    ) -> String {
1650        envelope::err(code, message, hint, fix_tool, self.current_meta())
1651    }
1652}
1653
1654#[tool_router(router = setup_tool_router, vis = "pub")]
1655impl DaemonMcp {
1656    #[doc = include_str!("../tool_descriptions/setup_status.md")]
1657    #[tool(name = "setup_status")]
1658    async fn setup_status(&self, Parameters(params): Parameters<SetupStatusParams>) -> String {
1659        let inner = self
1660            .call_setup_tool(SetupToolAction {
1661                action: "status".into(),
1662                cwd: params.cwd,
1663                yes: None,
1664                providers: None,
1665            })
1666            .await;
1667        wrap_inner_result(self, &inner)
1668    }
1669
1670    #[doc = include_str!("../tool_descriptions/setup_plan.md")]
1671    #[tool(name = "setup_plan")]
1672    async fn setup_plan(&self, Parameters(params): Parameters<SetupPlanParams>) -> String {
1673        let inner = self
1674            .call_setup_tool(SetupToolAction {
1675                action: "status".into(),
1676                cwd: params.cwd,
1677                yes: None,
1678                providers: None,
1679            })
1680            .await;
1681        wrap_inner_result(self, &inner)
1682    }
1683
1684    #[doc = include_str!("../tool_descriptions/setup_apply.md")]
1685    #[tool(name = "setup_apply")]
1686    async fn setup_apply(&self, Parameters(params): Parameters<SetupApplyParams>) -> String {
1687        let inner = self
1688            .call_setup_tool(SetupToolAction {
1689                action: "apply".into(),
1690                cwd: params.cwd,
1691                yes: Some(params.yes),
1692                providers: params.providers,
1693            })
1694            .await;
1695        wrap_inner_result(self, &inner)
1696    }
1697}
1698
1699#[tool_router(router = hooks_tool_router, vis = "pub")]
1700impl DaemonMcp {
1701    #[doc = include_str!("../tool_descriptions/hooks_list.md")]
1702    #[tool(name = "hooks_list")]
1703    async fn hooks_list(&self) -> String {
1704        let inner = self
1705            .call_hooks_tool(HooksToolAction {
1706                action: "list".into(),
1707                provider: None,
1708                scope: None,
1709            })
1710            .await;
1711        wrap_inner_result(self, &inner)
1712    }
1713
1714    #[doc = include_str!("../tool_descriptions/hooks_remove.md")]
1715    #[tool(name = "hooks_remove")]
1716    async fn hooks_remove(&self, Parameters(params): Parameters<HooksToolAction>) -> String {
1717        let inner = self
1718            .call_hooks_tool(HooksToolAction {
1719                action: "remove".into(),
1720                provider: params.provider,
1721                scope: params.scope,
1722            })
1723            .await;
1724        wrap_inner_result(self, &inner)
1725    }
1726
1727    #[doc = include_str!("../tool_descriptions/hooks_update.md")]
1728    #[tool(name = "hooks_update")]
1729    async fn hooks_update(&self, Parameters(params): Parameters<HooksToolAction>) -> String {
1730        let inner = self
1731            .call_hooks_tool(HooksToolAction {
1732                action: "update".into(),
1733                provider: params.provider,
1734                scope: params.scope,
1735            })
1736            .await;
1737        wrap_inner_result(self, &inner)
1738    }
1739
1740    #[doc = include_str!("../tool_descriptions/hooks_repair.md")]
1741    #[tool(name = "hooks_repair")]
1742    async fn hooks_repair(&self) -> String {
1743        let inner = self
1744            .call_hooks_tool(HooksToolAction {
1745                action: "repair".into(),
1746                provider: None,
1747                scope: None,
1748            })
1749            .await;
1750        wrap_inner_result(self, &inner)
1751    }
1752}
1753
1754#[tool_router(router = librarian_tool_router, vis = "pub")]
1755impl DaemonMcp {
1756    #[doc = include_str!("../tool_descriptions/librarian_index.md")]
1757    #[tool(name = "librarian_index")]
1758    async fn librarian_index(
1759        &self,
1760        Parameters(params): Parameters<LibrarianIndexParams>,
1761    ) -> String {
1762        let lib_store = match &self.librarian_store {
1763            Some(s) => s,
1764            None => {
1765                return self.err(
1766                    "librarian_store_unavailable",
1767                    "librarian catalog not configured",
1768                    None,
1769                    None,
1770                );
1771            }
1772        };
1773        let inner = librarian_index_inner(lib_store.as_ref(), params).await;
1774        match serde_json::from_str::<serde_json::Value>(&inner) {
1775            Ok(v) if v.get("error").is_some() => wrap_inner_result(self, &inner),
1776            Ok(v) => {
1777                let mut hints = Vec::new();
1778                let kind = v.get("kind").and_then(|k| k.as_str()).unwrap_or("");
1779                let version = v.get("version").and_then(|v| v.as_str()).unwrap_or("");
1780                match kind {
1781                    "project" => hints.push(
1782                        "Next: index its documentation and source configs with edges linking back.",
1783                    ),
1784                    "fix" => hints.push(
1785                        "Consider linking this fix to the error it resolves with edge kind 'fixes'.",
1786                    ),
1787                    _ => {}
1788                }
1789                if version.matches('.').count() > 2 {
1790                    hints.push("Previous version deprecated and linked via supersedes edge.");
1791                }
1792                if v.get("parent_id").and_then(|p| p.as_str()).is_none() && kind != "project" {
1793                    hints.push("Consider organizing under a parent node for hierarchy.");
1794                }
1795                let hint = if hints.is_empty() {
1796                    None
1797                } else {
1798                    Some(hints.join(" "))
1799                };
1800                self.ok_with(v, vec!["librarian_lookup"], hint.as_deref())
1801            }
1802            Err(_) => wrap_inner_result(self, &inner),
1803        }
1804    }
1805
1806    #[doc = include_str!("../tool_descriptions/librarian_lookup.md")]
1807    #[tool(name = "librarian_lookup")]
1808    async fn librarian_lookup(
1809        &self,
1810        Parameters(params): Parameters<LibrarianLookupParams>,
1811    ) -> String {
1812        let lib_store = match &self.librarian_store {
1813            Some(s) => s,
1814            None => {
1815                return self.err(
1816                    "librarian_store_unavailable",
1817                    "librarian catalog not configured",
1818                    None,
1819                    None,
1820                );
1821            }
1822        };
1823        let inner = librarian_lookup_inner(lib_store.as_ref(), params).await;
1824        match serde_json::from_str::<serde_json::Value>(&inner) {
1825            Ok(v) if v.get("error").is_some() => wrap_inner_result(self, &inner),
1826            Ok(v) => {
1827                let mut hints = Vec::new();
1828                if let Some(nodes) = v.get("nodes").and_then(|n| n.as_array()) {
1829                    let thirty_days_ago_ns = std::time::SystemTime::now()
1830                        .duration_since(std::time::UNIX_EPOCH)
1831                        .unwrap_or_default()
1832                        .as_nanos() as u64
1833                        - 30 * 86_400_000_000_000;
1834                    let stale_count = nodes
1835                        .iter()
1836                        .filter(|n| {
1837                            if n.get("canonicalized_at").and_then(|c| c.as_u64()).is_some() {
1838                                return false;
1839                            }
1840                            n.get("last_read_at")
1841                                .and_then(|r| r.as_u64())
1842                                .is_none_or(|ts| ts < thirty_days_ago_ns)
1843                        })
1844                        .count();
1845                    if stale_count > 0 {
1846                        hints.push(
1847                            "Some results haven't been accessed in over 30 days. Consider reviewing and deprecating stale entries with librarian_forget(deprecate=true).",
1848                        );
1849                    }
1850                }
1851                let hint = if hints.is_empty() {
1852                    None
1853                } else {
1854                    Some(hints.join(" "))
1855                };
1856                self.ok_with(v, vec![], hint.as_deref())
1857            }
1858            Err(_) => wrap_inner_result(self, &inner),
1859        }
1860    }
1861
1862    #[doc = include_str!("../tool_descriptions/librarian_forget.md")]
1863    #[tool(name = "librarian_forget")]
1864    async fn librarian_forget(
1865        &self,
1866        Parameters(params): Parameters<LibrarianForgetParams>,
1867    ) -> String {
1868        let lib_store = match &self.librarian_store {
1869            Some(s) => s,
1870            None => {
1871                return self.err(
1872                    "librarian_store_unavailable",
1873                    "librarian catalog not configured",
1874                    None,
1875                    None,
1876                );
1877            }
1878        };
1879        let deprecate = params.deprecate.unwrap_or(true);
1880        if deprecate {
1881            match lib_store.deprecate_node(&params.id).await {
1882                Ok(existed) => self.ok(serde_json::json!({ "deprecated": existed })),
1883                Err(e) => self.err("librarian_forget_failed", &e.to_string(), None, None),
1884            }
1885        } else {
1886            if params.confirm != Some(true) {
1887                return self.err(
1888                    "missing_confirm",
1889                    "hard delete requires confirm=true",
1890                    Some("pass confirm=true to permanently delete, or use deprecate=true (default) for soft-delete"),
1891                    None,
1892                );
1893            }
1894            match lib_store.forget_node(&params.id).await {
1895                Ok(existed) => self.ok(serde_json::json!({ "deleted": existed })),
1896                Err(e) => self.err("librarian_forget_failed", &e.to_string(), None, None),
1897            }
1898        }
1899    }
1900}
1901
1902// Command handler implementations (inner methods, not registered with tool_router).
1903impl DaemonMcp {
1904    async fn call_setup_tool(&self, action: SetupToolAction) -> String {
1905        match &self.setup_tool_fn {
1906            Some(f) => f(action).await,
1907            None => error_json("setup tools not available"),
1908        }
1909    }
1910
1911    async fn call_hooks_tool(&self, action: HooksToolAction) -> String {
1912        match &self.hooks_tool_fn {
1913            Some(f) => f(action).await,
1914            None => error_json("hooks tools not available"),
1915        }
1916    }
1917
1918    async fn connect_browser_inner(&self, params: ConnectParams) -> String {
1919        let endpoint = params.endpoint.clone();
1920        *self
1921            .chrome_endpoint
1922            .lock()
1923            .expect("chrome_endpoint mutex poisoned") = Some(Arc::from(endpoint.as_str()));
1924        match self
1925            .chrome_tx
1926            .send(ChromeCommand::Connect {
1927                endpoint: params.endpoint,
1928            })
1929            .await
1930        {
1931            Ok(()) => {
1932                tracing::info!(endpoint = %endpoint, "MCP requested browser connection");
1933                self.ok(serde_json::json!({
1934                    "status": "connecting",
1935                    "endpoint": endpoint,
1936                }))
1937            }
1938            Err(_) => {
1939                tracing::warn!(endpoint = %endpoint, "browser connect command rejected: daemon shutting down");
1940                self.err(
1941                    "daemon_shutting_down",
1942                    "Daemon is shutting down.",
1943                    None,
1944                    None,
1945                )
1946            }
1947        }
1948    }
1949
1950    async fn issue_command_inner(&self, params: ActParams) -> String {
1951        use daemon8_chrome::BrowserAction;
1952
1953        // Device screenshot: bypass Chrome entirely
1954        if params.action == DebugAction::Screenshot && params.device_serial.is_some() {
1955            return self.handle_device_screenshot(&params).await;
1956        }
1957
1958        if let Err(e) = self
1959            .ensure_chrome_connected(std::time::Duration::from_secs(10))
1960            .await
1961        {
1962            return error_json(&e);
1963        }
1964
1965        let (reply_tx, reply_rx) =
1966            tokio::sync::oneshot::channel::<Result<serde_json::Value, anyhow::Error>>();
1967
1968        // Build the BrowserAction and a mapper that converts the typed reply
1969        // into a uniform serde_json::Value result for the wrapper oneshot.
1970        let action: BrowserAction = match params.action {
1971            DebugAction::EvalJs => {
1972                let expression = match params.expression {
1973                    Some(expr) => expr,
1974                    None => return error_json("eval_js requires 'expression' parameter"),
1975                };
1976                let (tx, rx) = tokio::sync::oneshot::channel();
1977                let reply_tx = reply_tx;
1978                tokio::spawn(async move {
1979                    let result = rx
1980                        .await
1981                        .map_err(|_| anyhow::anyhow!("browser task died"))
1982                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
1983                        .map(|s| serde_json::json!({ "result": s }));
1984                    let _ = reply_tx.send(result);
1985                });
1986                BrowserAction::EvalJs {
1987                    tab_id: params.tab_id,
1988                    expression,
1989                    reply: tx,
1990                }
1991            }
1992            DebugAction::Screenshot => {
1993                let (tx, rx) = tokio::sync::oneshot::channel();
1994                let reply_tx = reply_tx;
1995                let selector = params.selector.clone();
1996                let shot_dir = self.screenshot_dir.clone();
1997                tokio::spawn(async move {
1998                    let result = rx
1999                        .await
2000                        .map_err(|_| anyhow::anyhow!("browser task died"))
2001                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2002                        .and_then(|bytes: Vec<u8>| {
2003                            let path = screenshot_path(&shot_dir, "browser", selector.as_deref());
2004                            std::fs::write(&path, &bytes)
2005                                .map_err(|e| anyhow::anyhow!("failed to write screenshot: {e}"))?;
2006                            Ok(serde_json::json!({
2007                                "screenshot": path.display().to_string(),
2008                                "size_bytes": bytes.len(),
2009                                "selector": selector,
2010                            }))
2011                        });
2012                    let _ = reply_tx.send(result);
2013                });
2014                BrowserAction::Screenshot {
2015                    tab_id: params.tab_id,
2016                    selector: params.selector,
2017                    reply: tx,
2018                }
2019            }
2020            DebugAction::InjectCss => {
2021                let css = match params.css {
2022                    Some(css) => css,
2023                    None => return error_json("inject_css requires 'css' parameter"),
2024                };
2025                let temporary = params.temporary.unwrap_or(true);
2026                let (tx, rx) = tokio::sync::oneshot::channel();
2027                let reply_tx = reply_tx;
2028                tokio::spawn(async move {
2029                    let result = rx
2030                        .await
2031                        .map_err(|_| anyhow::anyhow!("browser task died"))
2032                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2033                        .map(|element_id| {
2034                            serde_json::json!({
2035                                "injected": true,
2036                                "element_id": element_id,
2037                                "temporary": temporary,
2038                            })
2039                        });
2040                    let _ = reply_tx.send(result);
2041                });
2042                BrowserAction::InjectCss {
2043                    tab_id: params.tab_id,
2044                    css,
2045                    temporary,
2046                    reply: tx,
2047                }
2048            }
2049            DebugAction::RevertCss => {
2050                let (tx, rx) = tokio::sync::oneshot::channel();
2051                let reply_tx = reply_tx;
2052                tokio::spawn(async move {
2053                    let result = rx
2054                        .await
2055                        .map_err(|_| anyhow::anyhow!("browser task died"))
2056                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2057                        .map(|count| serde_json::json!({ "reverted_count": count }));
2058                    let _ = reply_tx.send(result);
2059                });
2060                BrowserAction::RevertCss {
2061                    tab_id: params.tab_id,
2062                    reply: tx,
2063                }
2064            }
2065            DebugAction::ListTabs => {
2066                let (tx, rx) = tokio::sync::oneshot::channel();
2067                let reply_tx = reply_tx;
2068                tokio::spawn(async move {
2069                    let result = rx
2070                        .await
2071                        .map_err(|_| anyhow::anyhow!("browser task died"))
2072                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2073                        .map(|tabs| serde_json::json!({ "tabs": tabs }));
2074                    let _ = reply_tx.send(result);
2075                });
2076                BrowserAction::ListTabs { reply: tx }
2077            }
2078            DebugAction::GetPerfMetrics => {
2079                let (tx, rx) = tokio::sync::oneshot::channel();
2080                let reply_tx = reply_tx;
2081                tokio::spawn(async move {
2082                    let result = rx
2083                        .await
2084                        .map_err(|_| anyhow::anyhow!("browser task died"))
2085                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2086                        .and_then(|metrics| {
2087                            let json = serde_json::to_value(&metrics)
2088                                .map_err(|e| anyhow::anyhow!("serialization failed: {e}"))?;
2089                            Ok(serde_json::json!({ "metrics": json }))
2090                        });
2091                    let _ = reply_tx.send(result);
2092                });
2093                BrowserAction::GetPerformanceMetrics {
2094                    tab_id: params.tab_id,
2095                    reply: tx,
2096                }
2097            }
2098            DebugAction::GetDom => {
2099                let (tx, rx) = tokio::sync::oneshot::channel();
2100                let reply_tx = reply_tx;
2101                tokio::spawn(async move {
2102                    let result = rx
2103                        .await
2104                        .map_err(|_| anyhow::anyhow!("browser task died"))
2105                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2106                        .map(|html| serde_json::json!({ "html": html }));
2107                    let _ = reply_tx.send(result);
2108                });
2109                BrowserAction::GetDom {
2110                    tab_id: params.tab_id,
2111                    selector: params.selector,
2112                    reply: tx,
2113                }
2114            }
2115            DebugAction::SetViewport => {
2116                let width = params.viewport_width.unwrap_or(390);
2117                let height = params.viewport_height.unwrap_or(844);
2118                let scale = params.viewport_scale.unwrap_or(2.0);
2119                let mobile = params.viewport_mobile.unwrap_or(true);
2120                let (tx, rx) = tokio::sync::oneshot::channel();
2121                let reply_tx = reply_tx;
2122                tokio::spawn(async move {
2123                    let result = rx
2124                        .await
2125                        .map_err(|_| anyhow::anyhow!("browser task died"))
2126                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2127                        .map(|()| {
2128                            serde_json::json!({
2129                                "viewport_set": true,
2130                                "width": width,
2131                                "height": height,
2132                                "scale": scale,
2133                                "mobile": mobile,
2134                            })
2135                        });
2136                    let _ = reply_tx.send(result);
2137                });
2138                BrowserAction::SetViewport {
2139                    tab_id: params.tab_id,
2140                    width,
2141                    height,
2142                    device_scale_factor: scale,
2143                    mobile,
2144                    user_agent: params.viewport_ua,
2145                    reply: tx,
2146                }
2147            }
2148            DebugAction::ClearViewport => {
2149                let (tx, rx) = tokio::sync::oneshot::channel();
2150                let reply_tx = reply_tx;
2151                tokio::spawn(async move {
2152                    let result = rx
2153                        .await
2154                        .map_err(|_| anyhow::anyhow!("browser task died"))
2155                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2156                        .map(|()| serde_json::json!({ "viewport_cleared": true }));
2157                    let _ = reply_tx.send(result);
2158                });
2159                BrowserAction::ClearViewport {
2160                    tab_id: params.tab_id,
2161                    reply: tx,
2162                }
2163            }
2164            DebugAction::NetworkConditions => {
2165                let preset = params.network_preset.unwrap_or(NetworkPreset::Restore);
2166                let preset_str = preset.as_str();
2167                let (tx, rx) = tokio::sync::oneshot::channel();
2168                let reply_tx = reply_tx;
2169                tokio::spawn(async move {
2170                    let result = rx
2171                        .await
2172                        .map_err(|_| anyhow::anyhow!("browser task died"))
2173                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2174                        .map(|()| serde_json::json!({ "network_conditions": preset_str }));
2175                    let _ = reply_tx.send(result);
2176                });
2177                BrowserAction::SetNetworkConditions {
2178                    tab_id: params.tab_id,
2179                    preset: preset.as_str().to_string(),
2180                    reply: tx,
2181                }
2182            }
2183            DebugAction::Navigate => {
2184                let url = match params.url {
2185                    Some(u) => u,
2186                    None => return error_json("navigate requires 'url' parameter"),
2187                };
2188                let (tx, rx) = tokio::sync::oneshot::channel();
2189                let reply_tx = reply_tx;
2190                tokio::spawn(async move {
2191                    let result = rx
2192                        .await
2193                        .map_err(|_| anyhow::anyhow!("browser task died"))
2194                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2195                        .map(|title| serde_json::json!({ "navigated": true, "title": title }));
2196                    let _ = reply_tx.send(result);
2197                });
2198                BrowserAction::Navigate {
2199                    tab_id: params.tab_id,
2200                    url,
2201                    reply: tx,
2202                }
2203            }
2204            DebugAction::StorageClear => {
2205                let types = params.storage_types.unwrap_or_else(|| "all".to_string());
2206                let (tx, rx) = tokio::sync::oneshot::channel();
2207                let reply_tx = reply_tx;
2208                tokio::spawn(async move {
2209                    let result = rx
2210                        .await
2211                        .map_err(|_| anyhow::anyhow!("browser task died"))
2212                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2213                        .map(|()| serde_json::json!({ "cleared": true }));
2214                    let _ = reply_tx.send(result);
2215                });
2216                BrowserAction::StorageClear {
2217                    tab_id: params.tab_id,
2218                    storage_types: types,
2219                    reply: tx,
2220                }
2221            }
2222            DebugAction::StorageInspect => {
2223                let (tx, rx) = tokio::sync::oneshot::channel();
2224                let reply_tx = reply_tx;
2225                tokio::spawn(async move {
2226                    let result = rx
2227                        .await
2228                        .map_err(|_| anyhow::anyhow!("browser task died"))
2229                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from));
2230                    let _ = reply_tx.send(result);
2231                });
2232                BrowserAction::StorageInspect {
2233                    tab_id: params.tab_id,
2234                    reply: tx,
2235                }
2236            }
2237            DebugAction::StorageSet => {
2238                let store_type = match params.store_type {
2239                    Some(t) => t.as_str().to_string(),
2240                    None => return error_json("storage_set requires 'store_type' parameter"),
2241                };
2242                let key = match params.storage_key {
2243                    Some(k) => k,
2244                    None => return error_json("storage_set requires 'storage_key' parameter"),
2245                };
2246                let value = params.storage_value.unwrap_or_default();
2247                let (tx, rx) = tokio::sync::oneshot::channel();
2248                let reply_tx = reply_tx;
2249                tokio::spawn(async move {
2250                    let result = rx
2251                        .await
2252                        .map_err(|_| anyhow::anyhow!("browser task died"))
2253                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2254                        .map(|()| serde_json::json!({ "set": true }));
2255                    let _ = reply_tx.send(result);
2256                });
2257                BrowserAction::StorageSet {
2258                    tab_id: params.tab_id,
2259                    store_type,
2260                    key,
2261                    value,
2262                    reply: tx,
2263                }
2264            }
2265            DebugAction::ElementAtPoint => {
2266                let x = match params.x {
2267                    Some(v) => v,
2268                    None => return error_json("element_at_point requires 'x' parameter"),
2269                };
2270                let y = match params.y {
2271                    Some(v) => v,
2272                    None => return error_json("element_at_point requires 'y' parameter"),
2273                };
2274                let (tx, rx) = tokio::sync::oneshot::channel();
2275                let reply_tx = reply_tx;
2276                tokio::spawn(async move {
2277                    let result = rx
2278                        .await
2279                        .map_err(|_| anyhow::anyhow!("browser task died"))
2280                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from));
2281                    let _ = reply_tx.send(result);
2282                });
2283                BrowserAction::ElementAtPoint {
2284                    tab_id: params.tab_id,
2285                    x,
2286                    y,
2287                    reply: tx,
2288                }
2289            }
2290            DebugAction::NewTab => {
2291                let url = params.url.unwrap_or_else(|| "about:blank".to_string());
2292                let (tx, rx) = tokio::sync::oneshot::channel();
2293                let reply_tx = reply_tx;
2294                tokio::spawn(async move {
2295                    let result = rx
2296                        .await
2297                        .map_err(|_| anyhow::anyhow!("browser task died"))
2298                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2299                        .map(|target_id| serde_json::json!({ "tab_id": target_id }));
2300                    let _ = reply_tx.send(result);
2301                });
2302                BrowserAction::NewTab { url, reply: tx }
2303            }
2304            DebugAction::CloseTab => {
2305                let tab_id = match params.tab_id {
2306                    Some(id) => id,
2307                    None => return error_json("close_tab requires 'tab_id' parameter"),
2308                };
2309                let (tx, rx) = tokio::sync::oneshot::channel();
2310                let reply_tx = reply_tx;
2311                tokio::spawn(async move {
2312                    let result = rx
2313                        .await
2314                        .map_err(|_| anyhow::anyhow!("browser task died"))
2315                        .and_then(|r: daemon8_chrome::Result<_>| r.map_err(anyhow::Error::from))
2316                        .map(|()| serde_json::json!({ "closed": true }));
2317                    let _ = reply_tx.send(result);
2318                });
2319                BrowserAction::CloseTab { tab_id, reply: tx }
2320            }
2321        };
2322
2323        if self
2324            .chrome_tx
2325            .send(ChromeCommand::Action(action))
2326            .await
2327            .is_err()
2328        {
2329            tracing::warn!("browser action command rejected: daemon shutting down");
2330            return error_json("Daemon is shutting down.");
2331        }
2332
2333        match tokio::time::timeout(std::time::Duration::from_secs(30), reply_rx).await {
2334            Err(_) => error_json(
2335                "Browser action timed out (30s). The daemon is still connected and will recover.",
2336            ),
2337            Ok(Ok(Ok(value))) => serde_json::to_string(&value).unwrap_or_default(),
2338            Ok(Ok(Err(e))) => error_json(&format!("{e}")),
2339            Ok(Err(_)) => error_json(
2340                "Browser connection lost during action. The daemon is reconnecting automatically.",
2341            ),
2342        }
2343    }
2344}
2345
2346impl DaemonMcp {
2347    async fn handle_device_screenshot(&self, params: &ActParams) -> String {
2348        let screenshot_fn = match &self.device_screenshot_fn {
2349            Some(f) => f,
2350            None => {
2351                tracing::warn!(
2352                    "device screenshot requested but ADB screenshot support is unavailable"
2353                );
2354                return error_json("device screenshots not available (ADB not enabled)");
2355            }
2356        };
2357
2358        let serial = params.device_serial.clone().unwrap_or_default();
2359        let platform = match params.device_platform.as_deref() {
2360            Some("vega") => DevicePlatform::Vega,
2361            _ => DevicePlatform::Android,
2362        };
2363
2364        let result = tokio::time::timeout(
2365            std::time::Duration::from_secs(15),
2366            (screenshot_fn)(serial.clone(), platform.clone()),
2367        )
2368        .await;
2369
2370        match result {
2371            Err(_) => {
2372                tracing::warn!(serial, platform = ?platform, "device screenshot timed out");
2373                error_json("device screenshot timed out (15s)")
2374            }
2375            Ok(Err(e)) => {
2376                tracing::warn!(serial, platform = ?platform, error = %e, "device screenshot failed");
2377                error_json(&format!("device screenshot failed for {serial}: {e}"))
2378            }
2379            Ok(Ok(shot)) => {
2380                let path = screenshot_path(&self.screenshot_dir, &serial, Some(&shot.source));
2381                if let Err(e) = std::fs::write(&path, &shot.png_bytes) {
2382                    tracing::warn!(serial, path = %path.display(), error = %e, "failed to write device screenshot");
2383                    return error_json(&format!("failed to write screenshot: {e}"));
2384                }
2385                tracing::info!(
2386                    serial,
2387                    source = %shot.source,
2388                    path = %path.display(),
2389                    size_bytes = shot.png_bytes.len(),
2390                    "device screenshot captured"
2391                );
2392                serde_json::to_string(&serde_json::json!({
2393                    "screenshot": path.display().to_string(),
2394                    "size_bytes": shot.png_bytes.len(),
2395                    "source": shot.source,
2396                    "serial": serial,
2397                }))
2398                .unwrap_or_default()
2399            }
2400        }
2401    }
2402}
2403
2404fn screenshot_path(dir: &std::path::Path, target: &str, label: Option<&str>) -> std::path::PathBuf {
2405    let ts = std::time::SystemTime::now()
2406        .duration_since(std::time::UNIX_EPOCH)
2407        .unwrap_or_default()
2408        .as_secs();
2409    let safe_target = target.replace(['/', '\\', ':'], "-");
2410    let suffix = label.map(|l| format!("-{l}")).unwrap_or_default();
2411    dir.join(format!("daemon8-screenshot-{ts}-{safe_target}{suffix}.png"))
2412}
2413
2414/// Standalone error builder for code paths without &self access (free
2415/// functions, inner helpers). Produces an envelope-shaped error with no
2416/// `daemon8` meta — tool methods that have &self should prefer
2417/// `self.err(...)` so the active debug session echoes correctly.
2418fn error_json(msg: &str) -> String {
2419    envelope::err(
2420        "internal_error",
2421        msg,
2422        None,
2423        None,
2424        envelope::DaemonMeta::default(),
2425    )
2426}
2427
2428fn detect_librarian_hint(content: &str) -> Option<String> {
2429    let lower = content.to_ascii_lowercase();
2430    if lower.contains("documentation")
2431        || lower.contains("config template")
2432        || lower.contains("source config")
2433        || lower.contains("log source")
2434    {
2435        Some("This memory describes a reference — consider also indexing it with librarian_index(kind=\"doc\" or \"source_template\") for graph-based retrieval.".into())
2436    } else if lower.contains("fix for")
2437        || lower.contains("fixed by")
2438        || lower.contains("workaround")
2439    {
2440        Some("This memory describes a fix — consider also indexing it with librarian_index(kind=\"fix\") to link it to the error it resolves.".into())
2441    } else {
2442        None
2443    }
2444}
2445
2446impl DaemonMcp {
2447    /// Returns the connection-state JSON with full state including browser.
2448    pub async fn connections_json(&self) -> String {
2449        let chrome_state = *self.chrome_state.borrow();
2450        let chrome_endpoint = self
2451            .chrome_endpoint
2452            .lock()
2453            .expect("chrome_endpoint mutex poisoned")
2454            .clone();
2455        let mut result = serde_json::json!({
2456            "browser": {
2457                "state": format!("{chrome_state}"),
2458                "endpoint": chrome_endpoint,
2459            }
2460        });
2461
2462        if let Ok(summary) = self.store.summary().await
2463            && !summary.connections.is_empty()
2464        {
2465            result["applications"] = serde_json::to_value(&summary.connections).unwrap_or_default();
2466        }
2467
2468        serde_json::to_string_pretty(&result)
2469            .unwrap_or_else(|e| error_json(&format!("serialization failed: {e}")))
2470    }
2471
2472    pub fn tools_for_client(&self) -> Vec<Tool> {
2473        self.tool_router.list_all()
2474    }
2475}
2476
2477impl ServerHandler for DaemonMcp {
2478    fn get_info(&self) -> ServerInfo {
2479        let instructions = String::from(INSTRUCTIONS);
2480
2481        let mut capabilities = ServerCapabilities::builder().enable_tools().build();
2482        capabilities.experimental = Some(std::collections::BTreeMap::from([(
2483            "claude/channel".to_string(),
2484            serde_json::Map::new(),
2485        )]));
2486
2487        ServerInfo::new(capabilities)
2488            .with_server_info(Implementation::new("daemon8", env!("CARGO_PKG_VERSION")))
2489            .with_instructions(instructions)
2490    }
2491
2492    async fn on_initialized(&self, context: rmcp::service::NotificationContext<RoleServer>) {
2493        let session_id = next_mcp_session_id();
2494        tracing::info!(
2495            session_id,
2496            "MCP session initialized, starting observation push"
2497        );
2498        let peer = context.peer;
2499        let mut rx = self.broadcast_tx.subscribe();
2500        let sub_rx = self.subscription_tx.subscribe();
2501        let push_source_activator = self.source_activator.clone();
2502        let session_cancel = self.cancel.child_token();
2503        let span = tracing::info_span!("mcp_session", session_id = %session_id);
2504
2505        tokio::spawn(async move {
2506            let mut last_push = std::time::Instant::now() - Duration::from_secs(2);
2507            loop {
2508                let (arc_obs, _json) = tokio::select! {
2509                    biased;
2510                    _ = session_cancel.cancelled() => {
2511                        tracing::debug!("MCP observation push cancelled");
2512                        break;
2513                    }
2514                    recv = rx.recv() => match recv {
2515                        Ok(payload) => payload,
2516                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
2517                            tracing::warn!(skipped, "MCP observation push receiver lagged; continuing with newest observations");
2518                            continue;
2519                        }
2520                        Err(broadcast::error::RecvError::Closed) => {
2521                            tracing::debug!("MCP observation push broadcast closed");
2522                            break;
2523                        }
2524                    },
2525                };
2526
2527                let obs: &Observation = &arc_obs;
2528                let filter = sub_rx.borrow().clone();
2529                let should_push = match filter.as_ref() {
2530                    Some(f) => f.matches(obs),
2531                    None => obs.severity.level() >= daemon8_types::Severity::Warn.level(),
2532                };
2533
2534                if !should_push {
2535                    continue;
2536                }
2537
2538                if let (Some(f), Some(sa)) = (&filter, &push_source_activator) {
2539                    sa.touch_matching(f);
2540                }
2541
2542                if last_push.elapsed() < Duration::from_secs(1) {
2543                    tracing::debug!(
2544                        observation_id = obs.id,
2545                        severity = %obs.severity,
2546                        kind = %obs.kind.tag(),
2547                        "MCP observation push throttled"
2548                    );
2549                    continue;
2550                }
2551
2552                let param = logging_notification(obs);
2553                let send = tokio::time::timeout(Duration::from_secs(5), peer.notify_logging_message(param)).await;
2554
2555                match send {
2556                    Ok(Ok(())) => {
2557                        last_push = std::time::Instant::now();
2558                        tracing::debug!(
2559                            observation_id = obs.id,
2560                            severity = %obs.severity,
2561                            kind = %obs.kind.tag(),
2562                            "MCP observation pushed to client"
2563                        );
2564                    }
2565                    Ok(Err(e)) => {
2566                        tracing::warn!(error = ?e, "MCP observation push failed; ending session push task");
2567                        break;
2568                    }
2569                    Err(_) => {
2570                        tracing::warn!(
2571                            observation_id = obs.id,
2572                            severity = %obs.severity,
2573                            kind = %obs.kind.tag(),
2574                            "MCP observation push timed out"
2575                        );
2576                    }
2577                }
2578            }
2579            tracing::debug!("MCP observation push task ended");
2580        }
2581        .instrument(span));
2582
2583        // Per-session debug session flush: periodically writes the in-memory
2584        // last_activity to the DB so the cleanup task's find_stale_active
2585        // sees current data. Each MCP session flushes its own active session.
2586        if let Some(ds_store) = self.debug_session_store.clone() {
2587            let flush_state = self.active_state.clone();
2588            let flush_cancel = self.cancel.child_token();
2589            tokio::spawn(async move {
2590                loop {
2591                    tokio::select! {
2592                        () = tokio::time::sleep(Duration::from_secs(60)) => {}
2593                        () = flush_cancel.cancelled() => break,
2594                    }
2595                    if let Some(session) = flush_state.current_session() {
2596                        let last = session.last_activity();
2597                        if let Err(e) = ds_store
2598                            .touch_debug_session(session.id.as_ref(), last)
2599                            .await
2600                        {
2601                            tracing::warn!(
2602                                session_id = %session.id,
2603                                error = %e,
2604                                "per-session debug session flush failed"
2605                            );
2606                        }
2607                    }
2608                }
2609                tracing::debug!("per-session debug session flush task ended");
2610            });
2611        }
2612    }
2613
2614    async fn list_tools(
2615        &self,
2616        _request: Option<rmcp::model::PaginatedRequestParams>,
2617        _context: rmcp::service::RequestContext<rmcp::RoleServer>,
2618    ) -> Result<rmcp::model::ListToolsResult, rmcp::ErrorData> {
2619        Ok(rmcp::model::ListToolsResult {
2620            tools: self.tools_for_client(),
2621            meta: None,
2622            next_cursor: None,
2623        })
2624    }
2625
2626    async fn call_tool(
2627        &self,
2628        request: rmcp::model::CallToolRequestParams,
2629        context: rmcp::service::RequestContext<rmcp::RoleServer>,
2630    ) -> Result<rmcp::model::CallToolResult, rmcp::ErrorData> {
2631        let tool = request.name.to_string();
2632        let started = Instant::now();
2633        tracing::debug!(tool = %tool, "MCP tool call started");
2634
2635        let tcc = rmcp::handler::server::tool::ToolCallContext::new(self, request, context);
2636        let result = self.tool_router.call(tcc).await;
2637        let duration_ms = started.elapsed().as_millis();
2638
2639        match &result {
2640            Ok(result) if result.is_error.unwrap_or(false) => {
2641                tracing::warn!(tool = %tool, duration_ms, "MCP tool call returned error result");
2642            }
2643            Ok(_) => {
2644                tracing::info!(tool = %tool, duration_ms, "MCP tool call completed");
2645            }
2646            Err(e) => {
2647                tracing::warn!(tool = %tool, duration_ms, error = ?e, "MCP tool call failed");
2648            }
2649        }
2650
2651        result
2652    }
2653
2654    fn get_tool(&self, name: &str) -> Option<rmcp::model::Tool> {
2655        self.tool_router.get(name).cloned()
2656    }
2657}
2658
2659/// Pure handler for `save_memory`. Extracted so tests can drive it
2660/// against an in-memory `MemoryStore` without spinning up DaemonMcp.
2661pub async fn save_memory_inner(mem_store: &dyn MemoryStore, params: SaveMemoryParams) -> String {
2662    let kind = params
2663        .kind
2664        .as_deref()
2665        .and_then(|s| s.parse::<daemon8_types::MemoryKind>().ok())
2666        .unwrap_or(daemon8_types::MemoryKind::UserFlagged);
2667
2668    let now = std::time::SystemTime::now()
2669        .duration_since(std::time::UNIX_EPOCH)
2670        .unwrap_or_default()
2671        .as_nanos() as u64;
2672
2673    let memory = daemon8_store::Memory {
2674        id: None,
2675        created_at: now,
2676        updated_at: now,
2677        kind,
2678        content: params.content,
2679        source_observations: params.source_observations.unwrap_or_default(),
2680        tags: params.tags.unwrap_or_default(),
2681        project_slug: params.project_slug.unwrap_or_default(),
2682        session_id: params.session_id,
2683        confidence: params.confidence.unwrap_or(1.0),
2684        data: None,
2685    };
2686
2687    match mem_store.save_memory(memory).await {
2688        Ok(id) => serde_json::to_string(&serde_json::json!({ "id": id })).unwrap_or_default(),
2689        Err(e) => error_json(&format!("save_memory failed: {e}")),
2690    }
2691}
2692
2693/// Pure handler for `query_memory`. Extracted so tests can drive it
2694/// against an in-memory `MemoryStore` without spinning up DaemonMcp.
2695pub async fn query_memory_inner(mem_store: &dyn MemoryStore, params: QueryMemoryParams) -> String {
2696    let kinds = params.kinds.map(|v| {
2697        v.into_iter()
2698            .filter_map(|s| s.parse::<daemon8_types::MemoryKind>().ok())
2699            .collect()
2700    });
2701
2702    let filter = daemon8_store::MemoryFilter {
2703        kinds,
2704        tags: params.tags,
2705        project_slug: params.project_slug,
2706        session_id: None,
2707        text_match: params.text,
2708        limit: Some(params.limit.unwrap_or(20).min(500) as usize),
2709    };
2710
2711    match mem_store.query_memory(&filter).await {
2712        Ok(memories) => serde_json::to_string_pretty(&memories)
2713            .unwrap_or_else(|e| error_json(&format!("serialization failed: {e}"))),
2714        Err(e) => error_json(&format!("query_memory failed: {e}")),
2715    }
2716}
2717
2718pub async fn librarian_index_inner(
2719    lib_store: &dyn LibrarianStore,
2720    params: LibrarianIndexParams,
2721) -> String {
2722    let kind = match params.kind.parse::<daemon8_types::LibrarianNodeKind>() {
2723        Ok(k) => k,
2724        Err(_) => {
2725            return error_json(&format!(
2726                "invalid kind '{}'. Use: doc, source_template, fix, project",
2727                params.kind
2728            ));
2729        }
2730    };
2731    let locator_kind = match params.locator_kind.parse::<daemon8_types::LocatorKind>() {
2732        Ok(k) => k,
2733        Err(_) => {
2734            return error_json(&format!(
2735                "invalid locator_kind '{}'. Use: file, url, vault",
2736                params.locator_kind
2737            ));
2738        }
2739    };
2740
2741    let now = std::time::SystemTime::now()
2742        .duration_since(std::time::UNIX_EPOCH)
2743        .unwrap_or_default()
2744        .as_nanos() as u64;
2745
2746    let node = daemon8_store::LibrarianNode {
2747        id: None,
2748        kind,
2749        label: params.label,
2750        locator_kind,
2751        locator: params.locator,
2752        tags: params.tags.unwrap_or_default(),
2753        project_slug: params.project_slug.unwrap_or_default(),
2754        version: String::new(),
2755        parent_id: params.parent_id.clone(),
2756        created_at: now,
2757        updated_at: now,
2758        last_read_at: None,
2759        deprecated_at: None,
2760        canonicalized_at: if params.canonicalize.unwrap_or(false) {
2761            Some(now)
2762        } else {
2763            None
2764        },
2765    };
2766
2767    let id = match lib_store.index_node(node).await {
2768        Ok(id) => id,
2769        Err(e) => return error_json(&format!("librarian_index failed: {e}")),
2770    };
2771
2772    let indexed_node = match lib_store.get_node(&id).await {
2773        Ok(Some(n)) => n,
2774        _ => {
2775            return serde_json::to_string(&serde_json::json!({
2776                "id": id, "version": "unknown", "kind": kind.to_string()
2777            }))
2778            .unwrap_or_default();
2779        }
2780    };
2781
2782    if let Some(ref edge) = params.edge {
2783        let edge_kind = match edge.kind.parse::<daemon8_types::LibrarianEdgeKind>() {
2784            Ok(k) => k,
2785            Err(_) => {
2786                return serde_json::to_string(&serde_json::json!({
2787                    "id": id,
2788                    "version": indexed_node.version,
2789                    "kind": kind.to_string(),
2790                    "edge_error": format!("invalid edge kind '{}'. Use: has_source, documented_by, fixes, supersedes, child_of", edge.kind)
2791                }))
2792                .unwrap_or_default();
2793            }
2794        };
2795        let lib_edge = daemon8_store::LibrarianEdge {
2796            id: None,
2797            kind: edge_kind,
2798            from_node: id.clone(),
2799            to_node: edge.target_node_id.clone(),
2800            created_at: now,
2801        };
2802        if let Err(e) = lib_store.index_edge(lib_edge).await {
2803            return serde_json::to_string(&serde_json::json!({
2804                "id": id,
2805                "version": indexed_node.version,
2806                "kind": kind.to_string(),
2807                "edge_error": format!("edge creation failed: {e}")
2808            }))
2809            .unwrap_or_default();
2810        }
2811    }
2812
2813    serde_json::to_string(&serde_json::json!({
2814        "id": id,
2815        "version": indexed_node.version,
2816        "kind": kind.to_string(),
2817        "parent_id": params.parent_id,
2818    }))
2819    .unwrap_or_default()
2820}
2821
2822pub async fn librarian_lookup_inner(
2823    lib_store: &dyn LibrarianStore,
2824    params: LibrarianLookupParams,
2825) -> String {
2826    if let Some(ref id) = params.id {
2827        let node = match lib_store.get_node(id).await {
2828            Ok(Some(n)) => n,
2829            Ok(None) => return error_json(&format!("node '{id}' not found")),
2830            Err(e) => return error_json(&format!("librarian_lookup failed: {e}")),
2831        };
2832        let edges = match lib_store.get_edges(id).await {
2833            Ok(e) => e,
2834            Err(e) => return error_json(&format!("librarian_lookup edges: {e}")),
2835        };
2836        return serde_json::to_string_pretty(&serde_json::json!({
2837            "node": node,
2838            "edges": edges,
2839        }))
2840        .unwrap_or_default();
2841    }
2842
2843    let kinds = params.kinds.map(|v| {
2844        v.into_iter()
2845            .filter_map(|s| s.parse::<daemon8_types::LibrarianNodeKind>().ok())
2846            .collect()
2847    });
2848
2849    let stale_before = params.stale_before_days.map(|days| {
2850        let now = std::time::SystemTime::now()
2851            .duration_since(std::time::UNIX_EPOCH)
2852            .unwrap_or_default()
2853            .as_nanos() as u64;
2854        now.saturating_sub(u64::from(days) * 86_400_000_000_000)
2855    });
2856
2857    let filter = daemon8_store::LibrarianFilter {
2858        kinds,
2859        tags: params.tags,
2860        project_slug: params.project_slug,
2861        text_match: params.text,
2862        limit: Some(params.limit.unwrap_or(20).min(500) as usize),
2863        include_deprecated: params.include_deprecated.unwrap_or(false),
2864        stale_before,
2865        parent_id: params.parent_id,
2866    };
2867
2868    match lib_store.lookup(&filter).await {
2869        Ok(nodes) => serde_json::to_string_pretty(&serde_json::json!({ "nodes": nodes }))
2870            .unwrap_or_else(|e| error_json(&format!("serialization failed: {e}"))),
2871        Err(e) => error_json(&format!("librarian_lookup failed: {e}")),
2872    }
2873}
2874
2875fn logging_notification(obs: &Observation) -> rmcp::model::LoggingMessageNotificationParam {
2876    let severity_str = obs.severity.to_string();
2877    let kind_str = obs.kind.tag().to_string();
2878    let origin_str = match &obs.origin {
2879        daemon8_types::Origin::Application { name } => format!("app:{name}"),
2880        daemon8_types::Origin::Browser { tab_id, .. } => format!("browser:{tab_id}"),
2881        daemon8_types::Origin::Device { serial, .. } => format!("device:{serial}"),
2882    };
2883    let msg = obs.data["message"]
2884        .as_str()
2885        .or_else(|| obs.data["msg"].as_str())
2886        .unwrap_or("(no message)");
2887    let level = match obs.severity {
2888        daemon8_types::Severity::Trace | daemon8_types::Severity::Debug => {
2889            rmcp::model::LoggingLevel::Debug
2890        }
2891        daemon8_types::Severity::Info => rmcp::model::LoggingLevel::Info,
2892        daemon8_types::Severity::Warn => rmcp::model::LoggingLevel::Warning,
2893        daemon8_types::Severity::Error => rmcp::model::LoggingLevel::Error,
2894    };
2895    let data = serde_json::json!({
2896        "message": format!("[{severity_str}] {kind_str} from {origin_str}: {msg}"),
2897        "severity": severity_str,
2898        "kind": kind_str,
2899        "origin": origin_str,
2900        "observation_id": obs.id,
2901    });
2902
2903    rmcp::model::LoggingMessageNotificationParam::new(level, data)
2904        .with_logger("daemon8".to_string())
2905}
2906
2907#[cfg(test)]
2908mod logging_tests {
2909    use daemon8_types::{ObservationKind, Origin, Severity};
2910
2911    use super::*;
2912
2913    #[test]
2914    fn mcp_session_ids_are_stable_and_prefixed() {
2915        let id = next_mcp_session_id();
2916        assert!(id.starts_with("mcp-"));
2917    }
2918
2919    #[test]
2920    fn forget_memory_gate_rejects_missing_confirm() {
2921        let result = check_forget_memory_confirm(None);
2922        let err = result.expect_err("missing confirm should error");
2923        assert!(
2924            err.contains("confirm=true"),
2925            "error message should name the required field: {err}"
2926        );
2927    }
2928
2929    #[test]
2930    fn forget_memory_gate_rejects_confirm_false() {
2931        let result = check_forget_memory_confirm(Some(false));
2932        assert!(result.is_err(), "confirm=false should error");
2933    }
2934
2935    #[test]
2936    fn forget_memory_gate_accepts_confirm_true() {
2937        let result = check_forget_memory_confirm(Some(true));
2938        assert!(result.is_ok(), "confirm=true should pass");
2939    }
2940
2941    #[test]
2942    fn forget_memory_params_parses_without_confirm() {
2943        let p: ForgetMemoryParams = serde_json::from_str(r#"{"id":"abc"}"#).unwrap();
2944        assert_eq!(p.id, "abc");
2945        assert_eq!(p.confirm, None);
2946    }
2947
2948    #[test]
2949    fn forget_memory_params_parses_with_confirm_true() {
2950        let p: ForgetMemoryParams = serde_json::from_str(r#"{"id":"abc","confirm":true}"#).unwrap();
2951        assert_eq!(p.confirm, Some(true));
2952    }
2953
2954    #[test]
2955    fn forget_memory_params_parses_with_confirm_false() {
2956        let p: ForgetMemoryParams =
2957            serde_json::from_str(r#"{"id":"abc","confirm":false}"#).unwrap();
2958        assert_eq!(p.confirm, Some(false));
2959    }
2960
2961    async fn build_mcp_with_debug_session() -> DaemonMcp {
2962        let store = Arc::new(daemon8_store::SurrealStore::memory().await.unwrap());
2963        let memory_store: Arc<dyn MemoryStore> = Arc::new(store.memory_store());
2964        let debug_session_store: Arc<dyn DebugSessionStore> = Arc::new(store.debug_session_store());
2965        let (obs_tx, _obs_rx) = tokio::sync::mpsc::unbounded_channel();
2966        let (chrome_tx, _chrome_rx) = tokio::sync::mpsc::channel(8);
2967        let (_, chrome_state) =
2968            tokio::sync::watch::channel(daemon8_chrome::ConnectionState::Disconnected);
2969        let (broadcast_tx, _broadcast_rx) = broadcast::channel(8);
2970        let lens = Arc::new(LensManager::new(broadcast_tx.subscribe(), None));
2971        DaemonMcp::new(DaemonMcpConfig {
2972            store: store.clone(),
2973            memory_store: Some(memory_store),
2974            debug_session_store: Some(debug_session_store),
2975            librarian_store: None,
2976            obs_tx,
2977            chrome_tx,
2978            chrome_state,
2979            chrome_endpoint: Arc::new(Mutex::new(None)),
2980            device_screenshot_fn: None,
2981            screenshot_dir: std::env::temp_dir().join("daemon8-test"),
2982            broadcast_tx,
2983            lens,
2984            setup_tool_fn: None,
2985            hooks_tool_fn: None,
2986            source_activator: None,
2987            cancel: tokio_util::sync::CancellationToken::new(),
2988        })
2989    }
2990
2991    #[tokio::test]
2992    async fn debug_session_lifecycle_resolved_writes_rich_summary() {
2993        let mcp = build_mcp_with_debug_session().await;
2994
2995        // start
2996        let start_res = mcp
2997            .start_debug_session(Parameters(StartDebugSessionParams {
2998                project: Some("daemon8".into()),
2999                description: Some("flaky login test".into()),
3000                agent_id: ":test/claude+plan-agent>".into(),
3001                feature: None,
3002            }))
3003            .await;
3004        let started: serde_json::Value = serde_json::from_str(&start_res).unwrap();
3005        let session_id = started["result"]["debug_session_id"]
3006            .as_str()
3007            .unwrap()
3008            .to_string();
3009        assert!(mcp.active_state.current_session().is_some());
3010
3011        // resolve with rich fields
3012        let resolve_res = mcp
3013            .resolve_debug_session(Parameters(ResolveDebugSessionParams {
3014                summary: "Cookie domain mismatch dropped session on subdomain switch.".into(),
3015                root_cause: Some("Set-Cookie missing Domain attr".into()),
3016                fix_diff: Some(
3017                    "- res.cookie('s', tok)\n+ res.cookie('s', tok, {domain: '.x'})".into(),
3018                ),
3019                commands_used: Some(vec!["cargo test login".into()]),
3020                related_errors: Some(vec!["abcd1234deadbeef".into()]),
3021                tags: Some(vec!["auth".into(), "regression".into()]),
3022            }))
3023            .await;
3024        let resolved: serde_json::Value = serde_json::from_str(&resolve_res).unwrap();
3025        assert_eq!(resolved["result"]["debug_session_id"], session_id);
3026        let memory_id = resolved["result"]["summary_memory_id"].as_str().unwrap();
3027
3028        // active state cleared
3029        assert!(mcp.active_state.current_session().is_none());
3030
3031        // SessionSummary memory landed with rich data
3032        let mem_store = mcp.memory_store.clone().unwrap();
3033        let mem = mem_store.get_memory(memory_id).await.unwrap().unwrap();
3034        assert_eq!(mem.kind, daemon8_types::MemoryKind::SessionSummary);
3035        assert!(mem.content.contains("Cookie domain"));
3036        let data = mem.data.expect("resolved session must carry rich data");
3037        assert_eq!(data["root_cause"], "Set-Cookie missing Domain attr");
3038        assert!(mem.tags.contains(&"outcome:resolved".to_string()));
3039        assert!(mem.tags.contains(&"hash:abcd1234deadbeef".to_string()));
3040        assert!(mem.tags.contains(&"auth".to_string()));
3041
3042        // session row updated to completed
3043        let ds_store = mcp.debug_session_store.clone().unwrap();
3044        let session = ds_store
3045            .get_debug_session(&session_id)
3046            .await
3047            .unwrap()
3048            .unwrap();
3049        assert_eq!(session.status, daemon8_types::DebugSessionStatus::Completed);
3050        assert_eq!(
3051            session.outcome,
3052            Some(daemon8_types::DebugSessionOutcome::Resolved)
3053        );
3054        assert_eq!(session.summary_memory_id.as_deref(), Some(memory_id));
3055    }
3056
3057    #[tokio::test]
3058    async fn debug_session_double_start_rejected() {
3059        let mcp = build_mcp_with_debug_session().await;
3060        let _ = mcp
3061            .start_debug_session(Parameters(StartDebugSessionParams {
3062                project: None,
3063                description: None,
3064                agent_id: ":test/claude+plan-agent>".into(),
3065                feature: None,
3066            }))
3067            .await;
3068        let second = mcp
3069            .start_debug_session(Parameters(StartDebugSessionParams {
3070                project: None,
3071                description: None,
3072                agent_id: ":test/claude+plan-agent>".into(),
3073                feature: None,
3074            }))
3075            .await;
3076        assert!(
3077            second.contains("already_active_debug_session"),
3078            "second start must be rejected: {second}"
3079        );
3080    }
3081
3082    #[tokio::test]
3083    async fn create_checkpoint_without_active_session_returns_structured_error() {
3084        let mcp = build_mcp_with_debug_session().await;
3085        let res = mcp
3086            .create_checkpoint(Parameters(CreateCheckpointParams { description: None }))
3087            .await;
3088        let parsed: serde_json::Value = serde_json::from_str(&res).unwrap();
3089        assert_eq!(parsed["error"]["code"], "no_active_debug_session");
3090        assert_eq!(parsed["error"]["fix"]["tool"], "start_debug_session");
3091        assert!(parsed["result"].is_null());
3092    }
3093
3094    #[tokio::test]
3095    async fn create_checkpoint_inside_active_session_writes_row_and_updates_active_state() {
3096        let mcp = build_mcp_with_debug_session().await;
3097        let _ = mcp
3098            .start_debug_session(Parameters(StartDebugSessionParams {
3099                project: Some("daemon8".into()),
3100                description: None,
3101                agent_id: ":test/claude+plan-agent>".into(),
3102                feature: None,
3103            }))
3104            .await;
3105        let res = mcp
3106            .create_checkpoint(Parameters(CreateCheckpointParams {
3107                description: Some("before fix".into()),
3108            }))
3109            .await;
3110        let parsed: serde_json::Value = serde_json::from_str(&res).unwrap();
3111        let result = &parsed["result"];
3112        let cp_id = result["checkpoint_id"].as_str().unwrap();
3113        assert!(result["seq_at_creation"].is_number());
3114        // Envelope echoes the active session.
3115        assert!(parsed["daemon8"]["active_debug_session"].is_object());
3116
3117        // active_state.checkpoint should now match
3118        let active_cp = mcp.active_state.current_checkpoint().unwrap();
3119        assert_eq!(active_cp.as_ref(), cp_id);
3120
3121        // Row exists in store with our description
3122        let ds_store = mcp.debug_session_store.clone().unwrap();
3123        let cp = ds_store.get_checkpoint(cp_id).await.unwrap().unwrap();
3124        assert_eq!(cp.description.as_deref(), Some("before fix"));
3125    }
3126
3127    #[tokio::test]
3128    async fn end_without_active_session_returns_error() {
3129        let mcp = build_mcp_with_debug_session().await;
3130        let res = mcp
3131            .end_debug_session(Parameters(EndDebugSessionParams { outcome: None }))
3132            .await;
3133        assert!(res.contains("no_active_debug_session"));
3134    }
3135
3136    #[tokio::test]
3137    async fn list_debug_sessions_filters_by_status() {
3138        let mcp = build_mcp_with_debug_session().await;
3139        // start + resolve one
3140        let _ = mcp
3141            .start_debug_session(Parameters(StartDebugSessionParams {
3142                project: Some("p".into()),
3143                description: None,
3144                agent_id: ":test/claude+plan-agent>".into(),
3145                feature: None,
3146            }))
3147            .await;
3148        let _ = mcp
3149            .resolve_debug_session(Parameters(ResolveDebugSessionParams {
3150                summary: "x".into(),
3151                root_cause: None,
3152                fix_diff: None,
3153                commands_used: None,
3154                related_errors: None,
3155                tags: None,
3156            }))
3157            .await;
3158        // start a fresh one (active)
3159        let _ = mcp
3160            .start_debug_session(Parameters(StartDebugSessionParams {
3161                project: Some("p".into()),
3162                description: None,
3163                agent_id: ":test/claude+plan-agent>".into(),
3164                feature: None,
3165            }))
3166            .await;
3167
3168        let active_only = mcp
3169            .list_debug_sessions(Parameters(ListDebugSessionsParams {
3170                status: Some("active".into()),
3171                feature: None,
3172            }))
3173            .await;
3174        let parsed: serde_json::Value = serde_json::from_str(&active_only).unwrap();
3175        assert_eq!(parsed["result"]["count"], 1);
3176
3177        let all = mcp
3178            .list_debug_sessions(Parameters(ListDebugSessionsParams {
3179                status: None,
3180                feature: None,
3181            }))
3182            .await;
3183        let parsed: serde_json::Value = serde_json::from_str(&all).unwrap();
3184        assert_eq!(parsed["result"]["count"], 2);
3185    }
3186
3187    #[tokio::test]
3188    async fn save_memory_inner_persists_curated_memory() {
3189        let store = daemon8_store::SurrealStore::memory().await.unwrap();
3190        let mem_store = store.memory_store();
3191        mem_store.init_schema().await.unwrap();
3192
3193        let result = save_memory_inner(
3194            &mem_store,
3195            SaveMemoryParams {
3196                content: "Prefer checkpoints before runtime checks.".into(),
3197                kind: Some("decision".into()),
3198                tags: Some(vec!["project:daemon8".into(), "kind:test".into()]),
3199                source_observations: Some(vec![42]),
3200                project_slug: Some("daemon8".into()),
3201                session_id: Some("test-session".into()),
3202                confidence: Some(0.9),
3203            },
3204        )
3205        .await;
3206
3207        let value: serde_json::Value = serde_json::from_str(&result).unwrap();
3208        let id = value["id"]
3209            .as_str()
3210            .expect("save_memory should return an id");
3211        let saved = mem_store.get_memory(id).await.unwrap().unwrap();
3212
3213        assert_eq!(saved.kind, daemon8_types::MemoryKind::Decision);
3214        assert_eq!(saved.content, "Prefer checkpoints before runtime checks.");
3215        assert_eq!(saved.source_observations, vec![42]);
3216        assert_eq!(saved.tags, vec!["project:daemon8", "kind:test"]);
3217        assert_eq!(saved.project_slug, "daemon8");
3218        assert_eq!(saved.session_id.as_deref(), Some("test-session"));
3219        assert_eq!(saved.confidence, 0.9);
3220    }
3221
3222    #[test]
3223    fn logging_notification_includes_operational_fields() {
3224        let mut obs = Observation::new(
3225            Origin::Application {
3226                name: "test-app".into(),
3227            },
3228            ObservationKind::Log,
3229            serde_json::json!({"message": "hello"}),
3230            Severity::Warn,
3231            None,
3232        );
3233        obs.id = 42;
3234
3235        let param = logging_notification(&obs);
3236        assert_eq!(param.logger.as_deref(), Some("daemon8"));
3237        assert_eq!(param.data["severity"], "warn");
3238        assert_eq!(param.data["kind"], "log");
3239        assert_eq!(param.data["origin"], "app:test-app");
3240        assert_eq!(param.data["observation_id"], 42);
3241    }
3242
3243    // ── B4: Multi-session + agent ID tests ──────────────────────────
3244
3245    /// Two MCP instances from a shared SurrealDB store — each gets its own
3246    /// ActiveSessionState (created internally by DaemonMcp::new), so they
3247    /// do not interfere with each other's debug sessions.
3248    async fn build_shared_mcps() -> (DaemonMcp, DaemonMcp) {
3249        let shared_store = Arc::new(daemon8_store::SurrealStore::memory().await.unwrap());
3250        let shared_mem: Arc<dyn MemoryStore> = Arc::new(shared_store.memory_store());
3251        let shared_ds: Arc<dyn DebugSessionStore> = Arc::new(shared_store.debug_session_store());
3252
3253        // Keep receivers alive so observations sent via ingest_observation
3254        // actually reach the store through the drain tasks.
3255        let (shared_obs_tx, mut shared_obs_rx) = tokio::sync::mpsc::unbounded_channel();
3256
3257        // Drain task: reads from the shared channel and inserts into the store.
3258        // Tokio drops the spawned task when the test runtime shuts down.
3259        let drain_store = shared_store.clone();
3260        tokio::spawn(async move {
3261            while let Some(obs) = shared_obs_rx.recv().await {
3262                let _ = drain_store.insert(obs).await;
3263            }
3264        });
3265
3266        let make = || {
3267            let (chrome_tx, _chrome_rx) = tokio::sync::mpsc::channel(8);
3268            let (_, chrome_state) =
3269                tokio::sync::watch::channel(daemon8_chrome::ConnectionState::Disconnected);
3270            let (broadcast_tx, _broadcast_rx) = broadcast::channel(8);
3271            let lens = Arc::new(LensManager::new(broadcast_tx.subscribe(), None));
3272            DaemonMcp::new(DaemonMcpConfig {
3273                store: shared_store.clone(),
3274                memory_store: Some(shared_mem.clone()),
3275                debug_session_store: Some(shared_ds.clone()),
3276                librarian_store: None,
3277                obs_tx: shared_obs_tx.clone(),
3278                chrome_tx,
3279                chrome_state,
3280                chrome_endpoint: Arc::new(Mutex::new(None)),
3281                device_screenshot_fn: None,
3282                screenshot_dir: std::env::temp_dir().join("daemon8-test"),
3283                broadcast_tx,
3284                lens,
3285                setup_tool_fn: None,
3286                hooks_tool_fn: None,
3287                source_activator: None,
3288                cancel: tokio_util::sync::CancellationToken::new(),
3289            })
3290        };
3291
3292        (make(), make())
3293    }
3294
3295    #[tokio::test]
3296    async fn multi_session_two_agents_non_conflicting() {
3297        let (a, b) = build_shared_mcps().await;
3298
3299        let a_start = a
3300            .start_debug_session(Parameters(StartDebugSessionParams {
3301                project: Some("daemon8".into()),
3302                description: Some("agent A investigating auth".into()),
3303                agent_id: ":test/claude+plan-agent>".into(),
3304                feature: Some("auth".into()),
3305            }))
3306            .await;
3307        let a_parsed: serde_json::Value = serde_json::from_str(&a_start).unwrap();
3308        assert!(
3309            a_parsed["result"]["debug_session_id"].is_string(),
3310            "agent A must start successfully: {a_start}"
3311        );
3312
3313        let b_start = b
3314            .start_debug_session(Parameters(StartDebugSessionParams {
3315                project: Some("daemon8".into()),
3316                description: Some("agent B investigating search".into()),
3317                agent_id: ":test/codex+build-agent>".into(),
3318                feature: Some("search".into()),
3319            }))
3320            .await;
3321        let b_parsed: serde_json::Value = serde_json::from_str(&b_start).unwrap();
3322        assert!(
3323            b_parsed["result"]["debug_session_id"].is_string(),
3324            "agent B must start successfully — no global single-active: {b_start}"
3325        );
3326
3327        // Verify B's response does NOT contain an already_active error
3328        assert!(
3329            !b_start.contains("already_active_debug_session"),
3330            "agent B must not be blocked by agent A's active session"
3331        );
3332    }
3333
3334    #[tokio::test]
3335    async fn multi_session_observations_stamped_independently() {
3336        let (a, b) = build_shared_mcps().await;
3337
3338        // Agent A starts auth session
3339        let a_start: serde_json::Value = serde_json::from_str(
3340            &a.start_debug_session(Parameters(StartDebugSessionParams {
3341                project: Some("p".into()),
3342                description: None,
3343                agent_id: ":test/claude+plan-agent>".into(),
3344                feature: None,
3345            }))
3346            .await,
3347        )
3348        .unwrap();
3349        let a_sid = a_start["result"]["debug_session_id"].as_str().unwrap();
3350
3351        // Agent B starts search session
3352        let b_start: serde_json::Value = serde_json::from_str(
3353            &b.start_debug_session(Parameters(StartDebugSessionParams {
3354                project: Some("p".into()),
3355                description: None,
3356                agent_id: ":test/codex+build-agent>".into(),
3357                feature: None,
3358            }))
3359            .await,
3360        )
3361        .unwrap();
3362        let b_sid = b_start["result"]["debug_session_id"].as_str().unwrap();
3363
3364        assert_ne!(a_sid, b_sid, "each agent must get a distinct session id");
3365
3366        // Each ingests an observation through their own MCP instance
3367        a.ingest_observation(Parameters(IngestParams {
3368            kind: Some("log".into()),
3369            severity: Some("info".into()),
3370            app: Some("test-a".into()),
3371            channel: None,
3372            correlation_id: None,
3373            parent_id: None,
3374            node_id: None,
3375            session_id: None,
3376            tags: None,
3377            data: serde_json::json!({"msg": "from agent A"}),
3378        }))
3379        .await;
3380
3381        b.ingest_observation(Parameters(IngestParams {
3382            kind: Some("log".into()),
3383            severity: Some("info".into()),
3384            app: Some("test-b".into()),
3385            channel: None,
3386            correlation_id: None,
3387            parent_id: None,
3388            node_id: None,
3389            session_id: None,
3390            tags: None,
3391            data: serde_json::json!({"msg": "from agent B"}),
3392        }))
3393        .await;
3394
3395        // Let the drain task process both observations
3396        tokio::task::yield_now().await;
3397        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3398
3399        // Query the store directly to verify each observation got the right stamp
3400        let slice = a
3401            .store
3402            .query(&daemon8_types::Filter {
3403                kinds: Some(vec![daemon8_types::ObservationKindTag::Log]),
3404                ..Default::default()
3405            })
3406            .await
3407            .unwrap();
3408
3409        assert_eq!(
3410            slice.observations.len(),
3411            2,
3412            "both observations must be in the shared store"
3413        );
3414
3415        let has_a = slice
3416            .observations
3417            .iter()
3418            .any(|o| o.debug_session_id.as_deref() == Some(a_sid));
3419        let has_b = slice
3420            .observations
3421            .iter()
3422            .any(|o| o.debug_session_id.as_deref() == Some(b_sid));
3423        assert!(
3424            has_a,
3425            "observation from agent A must be stamped with A's session"
3426        );
3427        assert!(
3428            has_b,
3429            "observation from agent B must be stamped with B's session"
3430        );
3431    }
3432
3433    #[tokio::test]
3434    async fn multi_session_end_one_leaves_other_active() {
3435        let (a, b) = build_shared_mcps().await;
3436
3437        // Both start sessions
3438        let _ = a
3439            .start_debug_session(Parameters(StartDebugSessionParams {
3440                project: None,
3441                description: None,
3442                agent_id: ":test/claude+plan-agent>".into(),
3443                feature: None,
3444            }))
3445            .await;
3446        let _ = b
3447            .start_debug_session(Parameters(StartDebugSessionParams {
3448                project: None,
3449                description: None,
3450                agent_id: ":test/codex+build-agent>".into(),
3451                feature: None,
3452            }))
3453            .await;
3454
3455        // A ends its session
3456        let end_res: serde_json::Value = serde_json::from_str(
3457            &a.end_debug_session(Parameters(EndDebugSessionParams { outcome: None }))
3458                .await,
3459        )
3460        .unwrap();
3461        assert!(
3462            end_res["result"]["debug_session_id"].is_string(),
3463            "end must succeed: {end_res}"
3464        );
3465
3466        // A's active state should be clear
3467        assert!(a.active_state.current_session().is_none());
3468
3469        // B's active state should still be present
3470        assert!(
3471            b.active_state.current_session().is_some(),
3472            "agent B must remain active after A ends"
3473        );
3474    }
3475
3476    #[tokio::test]
3477    async fn list_debug_sessions_filters_by_feature() {
3478        let (a, _b) = build_shared_mcps().await;
3479
3480        // Agent A creates two sessions with different features
3481        let _ = a
3482            .start_debug_session(Parameters(StartDebugSessionParams {
3483                project: Some("p".into()),
3484                description: None,
3485                agent_id: ":test/claude+plan-agent>".into(),
3486                feature: Some("auth".into()),
3487            }))
3488            .await;
3489        // End first session so we can start a second (single-active per instance)
3490        let _ = a
3491            .end_debug_session(Parameters(EndDebugSessionParams { outcome: None }))
3492            .await;
3493        let _ = a
3494            .start_debug_session(Parameters(StartDebugSessionParams {
3495                project: Some("p".into()),
3496                description: None,
3497                agent_id: ":test/claude+plan-agent>".into(),
3498                feature: Some("search".into()),
3499            }))
3500            .await;
3501
3502        // Filter by feature
3503        let auth_only: serde_json::Value = serde_json::from_str(
3504            &a.list_debug_sessions(Parameters(ListDebugSessionsParams {
3505                status: None,
3506                feature: Some("auth".into()),
3507            }))
3508            .await,
3509        )
3510        .unwrap();
3511        assert_eq!(
3512            auth_only["result"]["count"], 1,
3513            "feature filter must return only the auth session"
3514        );
3515        assert_eq!(
3516            auth_only["result"]["sessions"][0]["feature"], "auth",
3517            "returned session must have the matching feature"
3518        );
3519
3520        let search_only: serde_json::Value = serde_json::from_str(
3521            &a.list_debug_sessions(Parameters(ListDebugSessionsParams {
3522                status: None,
3523                feature: Some("search".into()),
3524            }))
3525            .await,
3526        )
3527        .unwrap();
3528        assert_eq!(
3529            search_only["result"]["count"], 1,
3530            "feature filter must return only the search session"
3531        );
3532
3533        let none: serde_json::Value = serde_json::from_str(
3534            &a.list_debug_sessions(Parameters(ListDebugSessionsParams {
3535                status: None,
3536                feature: Some("nonexistent".into()),
3537            }))
3538            .await,
3539        )
3540        .unwrap();
3541        assert_eq!(
3542            none["result"]["count"], 0,
3543            "unknown feature must return empty"
3544        );
3545    }
3546
3547    // ── B4: Agent ID validation ──────────────────────────────────────
3548
3549    #[test]
3550    fn agent_id_valid_formats() {
3551        for id in [
3552            ":mbp/claude+plan-agent>",
3553            ":linux/codex+build-agent>",
3554            ":mbp/gemini+researcher>",
3555            ":mini/copilot+reviewer>",
3556            ":box/opencode+crawler>",
3557            ":test-host/my-tool+my-role>",
3558        ] {
3559            assert!(
3560                validate_agent_id(id).is_ok(),
3561                "valid agent_id must pass: {id}"
3562            );
3563        }
3564    }
3565
3566    #[test]
3567    fn agent_id_rejects_missing_colon() {
3568        assert!(validate_agent_id("mbp/claude+agent>").is_err());
3569    }
3570
3571    #[test]
3572    fn agent_id_rejects_missing_gt() {
3573        assert!(validate_agent_id(":mbp/claude+agent").is_err());
3574    }
3575
3576    #[test]
3577    fn agent_id_rejects_missing_slash() {
3578        assert!(validate_agent_id(":mbp-claude+agent>").is_err());
3579    }
3580
3581    #[test]
3582    fn agent_id_rejects_missing_plus() {
3583        assert!(validate_agent_id(":mbp/claude-agent>").is_err());
3584    }
3585
3586    #[test]
3587    fn agent_id_rejects_uppercase() {
3588        assert!(validate_agent_id(":MBP/claude+agent>").is_err());
3589    }
3590
3591    #[test]
3592    fn agent_id_rejects_empty_host() {
3593        assert!(validate_agent_id(":/tool+role>").is_err());
3594    }
3595
3596    #[test]
3597    fn agent_id_rejects_empty_tool() {
3598        assert!(validate_agent_id(":host/+role>").is_err());
3599    }
3600
3601    #[test]
3602    fn agent_id_rejects_empty_role() {
3603        assert!(validate_agent_id(":host/tool+>").is_err());
3604    }
3605
3606    #[test]
3607    fn agent_id_rejects_too_long() {
3608        let long_id = format!(":{}", "x".repeat(65));
3609        assert!(validate_agent_id(&long_id).is_err());
3610    }
3611
3612    #[tokio::test]
3613    async fn start_debug_session_rejects_invalid_agent_id() {
3614        let mcp = build_mcp_with_debug_session().await;
3615        let res = mcp
3616            .start_debug_session(Parameters(StartDebugSessionParams {
3617                project: None,
3618                description: None,
3619                agent_id: "bad-format".into(),
3620                feature: None,
3621            }))
3622            .await;
3623        assert!(
3624            res.contains("invalid_agent_id"),
3625            "bad agent_id must be rejected: {res}"
3626        );
3627    }
3628}