// SPDX-License-Identifier: Apache-2.0

//! v0.11.0 P4 — bridge per-tenant `InvalidateEvent` broadcasts into
//! MCP `notifications/message` events on each session's SSE stream.
//!
//! ## Scope
//!
//! v0.11.0 ships only the two MCP-spec-defined notification shapes
//! (`notifications/progress` and `notifications/message`) per plan §3
//! Decision B. P3 wired `notifications/progress` (per-tool progress).
//! P4 wires `notifications/message` by subscribing to each tenant's
//! existing `TenantHandle::invalidate_sender()` broadcast channel (the
//! same channel that powers `/v1/graph/stream` for solo-web) and mapping
//! each `InvalidateEvent` into a JSON-RPC `notifications/message`
//! envelope on the per-session broadcast channel from P1+P2.
//!
//! ## Mapping policy (locked for v0.11.0)
//!
//! `InvalidateEvent` carries a `kind` discriminator (`episode`,
//! `document`, `chunk`, `cluster`, `triple`, `tenant`) and a `reason`
//! string (`memory.remember`, `memory.forget`, …) per
//! `docs/dev-log/0118-graph-stream-impl.md`. P4 maps these conservatively
//! into MCP `data` strings the client can switch on:
//!
//! | `kind`     | `data` string           | Notes |
//! |------------|-------------------------|-------|
//! | `episode`  | `memories_updated`      | Coarse signal: any episode-level change |
//! | `document` | `documents_updated`     | Ingest / forget at document granularity |
//! | `chunk`    | `documents_updated`     | Coalesced under documents (chunks are a leaf of one document) |
//! | `cluster`  | `consolidation_updated` | Steward consolidation landed |
//! | `triple`   | `graph_updated`         | Triples-extract committed |
//! | `tenant`   | `tenant_updated`        | GDPR cascade or tenant lifecycle |
//! | other      | `memory_updated`        | Fallback so a future writer kind never silently drops |
//!
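/*!
A minimal, std-only sketch of the table above, assuming nothing beyond
the `kind` strings it lists. `data_for_kind` is a hypothetical helper for
illustration; in this module the lookup happens inside
[`map_invalidate_to_message`], which also builds the JSON envelope.

```rust
// Hypothetical helper mirroring the mapping table row by row.
fn data_for_kind(kind: &str) -> &'static str {
    match kind {
        "episode" => "memories_updated",
        // Chunks are leaves of one document, so they share its hint.
        "document" | "chunk" => "documents_updated",
        "cluster" => "consolidation_updated",
        "triple" => "graph_updated",
        "tenant" => "tenant_updated",
        // Any kind not listed still yields a refetch signal.
        _ => "memory_updated",
    }
}
```

The catch-all arm is the important design choice: a writer that starts
emitting a new kind produces a generic `memory_updated` hint instead of
nothing.
*/
//!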
//! `level = "info"` for every variant; `logger = "solo"`. The MCP spec's
//! `notifications/message` shape allows any RFC 5424 syslog severity for
//! `level` (`debug`, `info`, `notice`, `warning`, `error`, `critical`,
//! `alert`, `emergency`); v0.11.0 emits at `info` because every wire
//! shape is a committed write (invariant: `InvalidateEvent` is only sent
//! after the writer-actor's commit succeeds — see
//! `solo_core::InvalidateEvent`'s docstring).
//!
//! Each emitted MCP message ALSO carries the original `reason`,
//! `tenant_id`, `ts_ms`, and `kind` in a nested `details` object so
//! clients that want the structured shape get it without losing the
//! coarse `data` switch. The two halves stay in lock-step — `data` is
//! what to refetch; `details.reason` is why.
//!
//! ## Per-session task lifecycle
//!
//! For each session created by `POST /mcp`, the POST handler calls
//! [`spawn_invalidate_bridge`] passing the freshly-resolved
//! `Arc<TenantHandle>` + `Arc<SessionState>`. The function:
//!
//!   1. Calls `tenant.invalidate_sender().subscribe()` BEFORE downgrading
//!      the session Arc to a `Weak<SessionState>`.
//!   2. Spawns a task via `tokio::spawn` that loops on `rx.recv()`:
//!      - `Ok(event)` → upgrade the `Weak`; if the upgrade fails (session
//!        dropped), the task exits. Otherwise map the event via
//!        [`map_invalidate_to_message`] and call
//!        `session.publish_event(Message, envelope)`.
//!      - `Err(RecvError::Lagged(n))` → log warn (matches
//!        `/v1/graph/stream` discipline) and continue. The session's
//!        own event-channel lag handling kicks in on the GET side; the
//!        bridge itself is best-effort fan-out.
//!      - `Err(RecvError::Closed)` → tenant handle dropped (rare
//!        outside test shutdown); task exits cleanly.
//!
//! The task holds NO strong reference to `SessionState` — only a
//! `Weak`. The session's `Arc` lives in the [`SessionStore`]; when the
//! store drops the session (TTL expiry or `delete`), the Weak upgrade
//! fails on the next event and the task exits. This is the
//! `session_task_exits_when_session_dropped` test contract.
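//!
/*!
The lifecycle above can be modeled with nothing but `std`, as a sketch:
`std::sync::mpsc` stands in for the tenant's broadcast channel and a plain
`Weak` for the session handle. `FakeSession`, `Exit`, and `run_bridge` are
hypothetical names for illustration, not Solo APIs. `mpsc` is unbounded and
has no equivalent of `RecvError::Lagged`, so only the two exit paths are
modeled.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Mutex, Weak};

// Stand-in for `SessionState`: records what the bridge published.
struct FakeSession {
    published: Mutex<Vec<String>>,
}

// Why the bridge loop stopped; mirrors the two exit paths.
#[derive(Debug, PartialEq)]
enum Exit {
    SessionDropped,
    ChannelClosed,
}

// Forward every event while the session is alive. The loop holds only a
// `Weak`, so it never keeps the session alive by itself.
fn run_bridge(rx: Receiver<String>, weak: Weak<FakeSession>) -> Exit {
    // `recv` returns Err once every Sender is dropped (the "Closed" path).
    while let Ok(event) = rx.recv() {
        // Upgrade per event; failure means the store evicted the session.
        let Some(session) = weak.upgrade() else {
            return Exit::SessionDropped;
        };
        session.published.lock().unwrap().push(format!("mapped:{event}"));
    }
    Exit::ChannelClosed
}
```

As in the real bridge, a dropped session is only noticed when the next
event arrives; an idle task simply stays parked in `recv`.
*/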
//!
//! ## Out of scope (v0.12.0+)
//!
//! - **Coalescing rapid bursts.** v0.11.0 emits one MCP message per
//!   `InvalidateEvent`. A pathological pattern (e.g. 1000 rapid
//!   `memory.remember` calls under one session) would emit 1000
//!   messages. The per-session broadcast channel is bounded
//!   ([`crate::mcp_session::MCP_SESSION_EVENT_BUFFER_CAPACITY`] = 256),
//!   so a slow GET subscriber would see `event: lagged` per P2 rather
//!   than memory growth. A future v0.12.0 follow-up could coalesce
//!   bursts at the bridge tier (e.g. "if N events of the same kind in
//!   1s, emit one"); not needed for v0.11.0 launch.
//! - **Solo-custom event types** (`audit_event`, `memory_consolidated`,
//!   `tenant_changed` as Solo-custom shapes). Deferred to v0.12.0+ per
//!   plan §3 Decision B. P4 ships only spec-compliant
//!   `notifications/message`.
//! - **Per-event auth scoping.** The bridge inherits the session's
//!   bound tenant; cross-tenant filtering happens at the broadcast
//!   layer (each tenant has its own `invalidate_sender`), so cross-tenant
//!   leakage is impossible by construction.
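//!
/*!
One possible reading of the burst-coalescing idea, as a hypothetical
sketch only (nothing like it ships in v0.11.0). `Coalescer`, `observe`, and
`flush_due` are illustrative names, and timestamps are passed in as
milliseconds rather than read from a clock so the behaviour is
deterministic. It is trailing-edge on purpose: a leading-edge throttle
would notify on the first event and then swallow later changes, leaving the
client's refetch stale.

```rust
use std::collections::HashMap;

// Hypothetical trailing-edge coalescer. The first event of a kind opens a
// window of `window_ms`; later events of that kind inside the window are
// absorbed; one notification per kind is emitted once its window closes.
struct Coalescer {
    window_ms: u64,
    // kind -> timestamp (ms) at which that kind's open window closes.
    open: HashMap<String, u64>,
}

impl Coalescer {
    fn new(window_ms: u64) -> Self {
        Coalescer { window_ms, open: HashMap::new() }
    }

    // Record one invalidate of `kind` seen at `now_ms`. Opens a window if
    // none is open for this kind; otherwise the event is absorbed.
    fn observe(&mut self, kind: &str, now_ms: u64) {
        let deadline = now_ms + self.window_ms;
        self.open.entry(kind.to_string()).or_insert(deadline);
    }

    // Close every window whose deadline has passed and return one
    // notification per closed kind (sorted: HashMap order is unspecified).
    fn flush_due(&mut self, now_ms: u64) -> Vec<String> {
        let mut due = Vec::new();
        self.open.retain(|kind, deadline| {
            if *deadline <= now_ms {
                due.push(kind.clone());
                false // window closed: drop it from the open set
            } else {
                true
            }
        });
        due.sort();
        due
    }
}
```

With a 1s window and monotonic timestamps, the 1000-call burst described
above collapses to one `episode` notification per second, emitted after
the changes it absorbed. A real bridge would also need a timer (for
example a `tokio::time::interval`) to call `flush_due`, which this sketch
leaves out.
*/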

use std::sync::{Arc, Weak};

use solo_core::InvalidateEvent;
use solo_storage::TenantHandle;
use tokio::sync::broadcast;

use crate::mcp_session::{McpEventKind, SessionState};

/// JSON-RPC method literal for MCP `notifications/message`. Held as
/// `pub const` so audits can grep the wire literal once and trust it
/// never drifts (Lesson #25 / #39 grep-ability).
pub const MCP_NOTIFICATION_MESSAGE_METHOD: &str = "notifications/message";

/// `level` field of every MCP `notifications/message` envelope this
/// bridge emits. The spec permits any RFC 5424 syslog severity (`debug`
/// through `emergency`); v0.11.0 emits `info` because the invariant on
/// [`solo_core::InvalidateEvent`] is that it ONLY fires after a
/// successful writer-actor commit — there is no error/warning path.
pub const MCP_NOTIFICATION_MESSAGE_LEVEL: &str = "info";

/// `logger` field of every MCP `notifications/message` envelope this
/// bridge emits. The MCP spec makes `logger` an optional, server-chosen
/// name; the bridge always sets it to `"solo"` to distinguish our
/// messages from any proxy / middleware that might emit on the same
/// stream.
pub const MCP_NOTIFICATION_MESSAGE_LOGGER: &str = "solo";

/// `data` discriminator for episode-level invalidations. Coarse
/// signal: "any episode-level memory changed; refetch what you care
/// about". Maps `InvalidateEvent.kind = "episode"`.
pub const MCP_NOTIFICATION_DATA_MEMORIES_UPDATED: &str = "memories_updated";

/// `data` discriminator for document-level invalidations. Includes
/// chunks (coalesced — chunks are a leaf of a parent document, so
/// refetching the document covers both).
pub const MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED: &str = "documents_updated";

/// `data` discriminator for consolidation/cluster invalidations.
pub const MCP_NOTIFICATION_DATA_CONSOLIDATION_UPDATED: &str = "consolidation_updated";

/// `data` discriminator for graph (triples) invalidations.
pub const MCP_NOTIFICATION_DATA_GRAPH_UPDATED: &str = "graph_updated";

/// `data` discriminator for tenant-lifecycle invalidations (today
/// fired by the GDPR `forget_user` cascade per
/// `solo_core::InvalidateEvent`'s kind taxonomy).
pub const MCP_NOTIFICATION_DATA_TENANT_UPDATED: &str = "tenant_updated";

/// Fallback `data` for `InvalidateEvent.kind` values P4 doesn't
/// recognise. Defensive: if a future writer command emits a kind string
/// we haven't mapped yet, the client still sees a refetch signal rather
/// than the bridge silently dropping the event.
pub const MCP_NOTIFICATION_DATA_MEMORY_UPDATED: &str = "memory_updated";

/// Map one [`solo_core::InvalidateEvent`] into the MCP spec's
/// `notifications/message` JSON envelope.
///
/// Output shape:
///
/// ```json
/// {
///   "jsonrpc": "2.0",
///   "method": "notifications/message",
///   "params": {
///     "level": "info",
///     "logger": "solo",
///     "data": "memories_updated",
///     "details": {
///       "reason": "memory.remember",
///       "tenant_id": "default",
///       "ts_ms": 1715625600000,
///       "kind": "episode"
///     }
///   }
/// }
/// ```
///
/// The `details` object preserves the full original event for clients
/// that want the structured shape; the top-level `data` carries the
/// coarse switch that drives a "refetch what you care about" UX. Per
/// plan §3 Decision B + §6 P4, this is the canonical mapping the
/// publish path uses.
pub fn map_invalidate_to_message(event: &InvalidateEvent) -> serde_json::Value {
    let data_kind = match event.kind.as_str() {
        "episode" => MCP_NOTIFICATION_DATA_MEMORIES_UPDATED,
        "document" | "chunk" => MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED,
        "cluster" => MCP_NOTIFICATION_DATA_CONSOLIDATION_UPDATED,
        "triple" => MCP_NOTIFICATION_DATA_GRAPH_UPDATED,
        "tenant" => MCP_NOTIFICATION_DATA_TENANT_UPDATED,
        _ => MCP_NOTIFICATION_DATA_MEMORY_UPDATED,
    };
    serde_json::json!({
        "jsonrpc": "2.0",
        "method": MCP_NOTIFICATION_MESSAGE_METHOD,
        "params": {
            "level": MCP_NOTIFICATION_MESSAGE_LEVEL,
            "logger": MCP_NOTIFICATION_MESSAGE_LOGGER,
            "data": data_kind,
            "details": {
                "reason": event.reason,
                "tenant_id": event.tenant_id,
                "ts_ms": event.ts_ms,
                "kind": event.kind,
            }
        }
    })
}

/// Spawn the per-session invalidate-bridge task.
///
/// Subscribes to the tenant's existing `invalidate_sender` synchronously,
/// before the task is spawned (and before downgrading the `SessionState`
/// Arc to a `Weak`), so an invalidate published while the task is still
/// being scheduled is buffered in the receiver rather than missed. The
/// spawned task forwards each [`solo_core::InvalidateEvent`] into the
/// session's broadcast channel as an [`McpEventKind::Message`] event
/// whose JSON payload is the [`map_invalidate_to_message`] result.
///
/// Task lifecycle (the canonical "when does this exit" answer):
///
///   - **Session dropped from store** (TTL expiry or `SessionStore::delete`):
///     the `Weak::upgrade` returns `None` on the next received event
///     → task exits cleanly. Pinned by
///     `session_task_exits_when_session_dropped`.
///   - **Tenant handle dropped** (rare outside test shutdown — production
///     keeps tenants alive for the daemon lifetime): `rx.recv()`
///     returns `RecvError::Closed` → task exits cleanly.
///   - **Subscriber lagged** (broadcast capacity overrun): log a `warn`
///     and continue receiving. The next-event semantics match
///     `/v1/graph/stream` and ADR-0003's "invalidations are idempotent
///     refetch signals" invariant.
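///
/**
Why warn-and-continue is safe can be seen in a toy, single-receiver model
of a bounded broadcast ring. This is a sketch, not tokio's implementation;
it only assumes the documented `tokio::sync::broadcast` contract that a
receiver which falls more than `capacity` values behind gets one
`Lagged(n)` and then resumes at the oldest value still retained. `Ring`
and `Recv` are hypothetical names.

```rust
use std::collections::VecDeque;

// Outcome of one receive attempt in the toy model.
#[derive(Debug, PartialEq)]
enum Recv<T> {
    Value(T),
    Lagged(u64),
    Empty,
}

struct Ring<T> {
    capacity: usize,
    // Absolute sequence number of `buf[0]` (the oldest retained value).
    head_seq: u64,
    buf: VecDeque<T>,
}

impl<T: Clone> Ring<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { capacity, head_seq: 0, buf: VecDeque::new() }
    }

    fn send(&mut self, value: T) {
        if self.buf.len() == self.capacity {
            // Evict the oldest value; that sequence number is gone for good.
            self.buf.pop_front();
            self.head_seq += 1;
        }
        self.buf.push_back(value);
    }

    // `cursor` is the receiver's next expected sequence number.
    fn recv(&self, cursor: &mut u64) -> Recv<T> {
        if *cursor < self.head_seq {
            let missed = self.head_seq - *cursor;
            *cursor = self.head_seq; // jump to the oldest retained value
            return Recv::Lagged(missed);
        }
        let idx = (*cursor - self.head_seq) as usize;
        match self.buf.get(idx) {
            Some(v) => {
                *cursor += 1;
                Recv::Value(v.clone())
            }
            None => Recv::Empty,
        }
    }
}
```

With capacity 2 and five sends before the first receive (the shape of
`bridge_logs_and_continues_on_lagged_subscriber`), the receiver sees
`Lagged(3)` and then the two newest values. Because every invalidation is
an idempotent refetch signal, forwarding just the retained ones still
leaves the client with an up-to-date refetch.
*/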
///
/// Returns the spawned `JoinHandle` so callers that want explicit
/// teardown (tests, future graceful-shutdown work) can await it; the
/// production POST handler discards the handle and lets the task run
/// detached until one of the exit conditions trips.
pub fn spawn_invalidate_bridge(
    tenant: Arc<TenantHandle>,
    session: Arc<SessionState>,
) -> tokio::task::JoinHandle<()> {
    let rx = tenant.invalidate_sender().subscribe();
    let weak = Arc::downgrade(&session);
    // Derive the log identifier while the strong ref is still in hand.
    let session_id_for_log = format!("{:p}", Arc::as_ptr(&session));
    // Release the strong session ref before spawning so the task holds
    // only the `Weak`. The `async move` block below captures just the
    // variables it names, so this explicit drop documents intent and
    // turns any future attempt to move `session` into the task (which
    // would stop it exiting when the store evicts) into a compile error.
    drop(session);
    tokio::spawn(async move {
        run_invalidate_bridge(rx, weak, session_id_for_log).await;
    })
}

/// Inner loop for the bridge task. Split out so unit tests can drive
/// it with a plain `broadcast` channel instead of a real `TenantHandle`.
async fn run_invalidate_bridge(
    mut rx: broadcast::Receiver<InvalidateEvent>,
    weak: Weak<SessionState>,
    session_id_for_log: String,
) {
    loop {
        match rx.recv().await {
            Ok(event) => {
                let Some(state) = weak.upgrade() else {
                    tracing::debug!(
                        session = %session_id_for_log,
                        "mcp invalidate bridge: session dropped; exiting"
                    );
                    return;
                };
                let payload = map_invalidate_to_message(&event);
                state.publish_event(McpEventKind::Message, payload);
            }
            Err(broadcast::error::RecvError::Lagged(n)) => {
                tracing::warn!(
                    lagged = n,
                    session = %session_id_for_log,
                    "mcp invalidate bridge subscriber lagged; client will \
                     resync on the next real invalidate"
                );
            }
            Err(broadcast::error::RecvError::Closed) => {
                tracing::debug!(
                    session = %session_id_for_log,
                    "mcp invalidate bridge: tenant invalidate channel closed; exiting"
                );
                return;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use solo_core::TenantId;

    fn fake_event(kind: &str) -> InvalidateEvent {
        InvalidateEvent {
            reason: "memory.remember".to_string(),
            tenant_id: "default".to_string(),
            ts_ms: 1_715_625_600_000,
            kind: kind.to_string(),
        }
    }

    /// Mapping table pin — each known `kind` produces the right `data`
    /// discriminator. Drives every row in the mapping table from the
    /// module docs.
    #[test]
    fn map_invalidate_to_message_routes_each_kind_to_data_discriminator() {
        let cases = [
            ("episode", MCP_NOTIFICATION_DATA_MEMORIES_UPDATED),
            ("document", MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED),
            ("chunk", MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED),
            ("cluster", MCP_NOTIFICATION_DATA_CONSOLIDATION_UPDATED),
            ("triple", MCP_NOTIFICATION_DATA_GRAPH_UPDATED),
            ("tenant", MCP_NOTIFICATION_DATA_TENANT_UPDATED),
            // Unknown kinds fall back to the generic discriminator so a
            // future writer kind isn't dropped silently.
            (
                "hypothetical_new_kind",
                MCP_NOTIFICATION_DATA_MEMORY_UPDATED,
            ),
        ];
        for (kind, expected_data) in cases {
            let msg = map_invalidate_to_message(&fake_event(kind));
            assert_eq!(
                msg["params"]["data"].as_str(),
                Some(expected_data),
                "kind={kind} must map to data={expected_data}",
            );
        }
    }

    /// Pin the spec envelope shape: `jsonrpc=2.0` + `method =
    /// notifications/message` + `params.{level,logger,data}` all
    /// present (`details` is pinned separately below). Grep-able by the
    /// constants so any drift surfaces here.
    #[test]
    fn map_invalidate_to_message_uses_jsonrpc_notifications_message_method() {
        let msg = map_invalidate_to_message(&fake_event("episode"));
        assert_eq!(msg["jsonrpc"].as_str(), Some("2.0"));
        assert_eq!(
            msg["method"].as_str(),
            Some(MCP_NOTIFICATION_MESSAGE_METHOD),
        );
        // params.{level,logger,data} per the spec; details is Solo-
        // additive (allowed by the MCP spec — params is open-shape).
        let params = &msg["params"];
        assert_eq!(
            params["level"].as_str(),
            Some(MCP_NOTIFICATION_MESSAGE_LEVEL)
        );
        assert_eq!(
            params["logger"].as_str(),
            Some(MCP_NOTIFICATION_MESSAGE_LOGGER),
        );
        assert!(params["data"].is_string(), "data must be present + string");
    }

    /// Pin that `level` is `info` for every emitted message (the
    /// invariant from the module-level docstring: invalidate events
    /// only fire post-commit, so there's no error/warning path on this
    /// bridge).
    #[test]
    fn mcp_message_payload_includes_level_and_data() {
        let msg = map_invalidate_to_message(&fake_event("episode"));
        let params = &msg["params"];
        assert_eq!(
            params["level"].as_str(),
            Some("info"),
            "level must be 'info' for invalidate-bridged messages",
        );
        assert_eq!(
            params["data"].as_str(),
            Some(MCP_NOTIFICATION_DATA_MEMORIES_UPDATED),
        );
        // Details object carries the structured original event for
        // clients that want the granular shape — pinned here so a
        // future refactor that drops `details` from the wire surface
        // breaks this test loudly.
        assert_eq!(
            params["details"]["reason"].as_str(),
            Some("memory.remember"),
        );
        assert_eq!(params["details"]["kind"].as_str(), Some("episode"));
        assert!(params["details"]["ts_ms"].is_number());
    }

    /// End-to-end: subscribe to a session's events, drive an invalidate
    /// through the bridge directly (without a real TenantHandle), and
    /// confirm the session receives the mapped MCP message.
    #[tokio::test]
    async fn run_invalidate_bridge_publishes_mcp_message_to_session() {
        // Build a fake broadcast and an empty session.
        let (tx, _) = broadcast::channel::<InvalidateEvent>(16);
        let state = Arc::new(SessionState::new(TenantId::default_tenant(), None));
        let rx = tx.subscribe();
        let weak = Arc::downgrade(&state);
        // Pre-subscribe to the SESSION channel so we can read what the
        // bridge publishes.
        let mut session_rx = state.subscribe_events();
        // Spawn the bridge.
        let bridge_handle = tokio::spawn(async move {
            run_invalidate_bridge(rx, weak, "test-session".to_string()).await;
        });
        // Fire one invalidate.
        tx.send(fake_event("document"))
            .expect("at least one subscriber");
        // Bridge should publish exactly one MCP Message event onto the
        // session channel.
        let received = tokio::time::timeout(std::time::Duration::from_secs(2), session_rx.recv())
            .await
            .expect("event must arrive within 2s")
            .expect("session receiver must receive published event");
        assert_eq!(received.event, McpEventKind::Message);
        assert_eq!(
            received.data["method"].as_str(),
            Some(MCP_NOTIFICATION_MESSAGE_METHOD),
        );
        assert_eq!(
            received.data["params"]["data"].as_str(),
            Some(MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED),
        );
        // Drop the session strong ref → next invalidate should exit
        // the bridge.
        drop(state);
        tx.send(fake_event("episode")).ok();
        // Bridge task should observe the dropped session and exit
        // well within the 2s timeout.
        tokio::time::timeout(std::time::Duration::from_secs(2), bridge_handle)
            .await
            .expect("bridge must exit when session drops")
            .expect("bridge task panic");
    }

    /// Bridge exits when the session is dropped from the store.
    /// Specifically pins the `Weak::upgrade == None` exit path.
    #[tokio::test]
    async fn session_task_exits_when_session_dropped() {
        let (tx, _) = broadcast::channel::<InvalidateEvent>(16);
        let state = Arc::new(SessionState::new(TenantId::default_tenant(), None));
        let rx = tx.subscribe();
        let weak = Arc::downgrade(&state);
        let bridge = tokio::spawn(async move {
            run_invalidate_bridge(rx, weak, "test-session-drop".to_string()).await;
        });
        // Drop the session BEFORE the bridge sees any events.
        drop(state);
        // Fire one event; the bridge upgrades the Weak, sees None,
        // and exits.
        tx.send(fake_event("episode")).ok();
        tokio::time::timeout(std::time::Duration::from_secs(2), bridge)
            .await
            .expect("bridge must exit within 2s of session drop")
            .expect("bridge task panic");
    }

    /// Bridge exits when the broadcast channel closes (TenantHandle
    /// dropped). Pins the `RecvError::Closed` exit path.
    #[tokio::test]
    async fn bridge_exits_when_tenant_invalidate_channel_closes() {
        let (tx, _) = broadcast::channel::<InvalidateEvent>(16);
        let state = Arc::new(SessionState::new(TenantId::default_tenant(), None));
        let rx = tx.subscribe();
        let weak = Arc::downgrade(&state);
        let bridge = tokio::spawn(async move {
            run_invalidate_bridge(rx, weak, "test-tenant-drop".to_string()).await;
        });
        // Drop the broadcast Sender → channel closes.
        drop(tx);
        tokio::time::timeout(std::time::Duration::from_secs(2), bridge)
            .await
            .expect("bridge must exit within 2s of channel close")
            .expect("bridge task panic");
        // Keep session alive — it should be retrievable after the
        // bridge exits (proves the bridge didn't accidentally take
        // ownership).
        let _id = state.publish_event(McpEventKind::Message, serde_json::json!({"alive": true}));
    }

    /// Bridge ignores `RecvError::Lagged` and continues forwarding.
    /// Drives the warn-and-continue path explicitly.
    #[tokio::test]
    async fn bridge_logs_and_continues_on_lagged_subscriber() {
        // Channel cap 2 — easy to overrun.
        let (tx, _) = broadcast::channel::<InvalidateEvent>(2);
        let state = Arc::new(SessionState::new(TenantId::default_tenant(), None));
        let rx = tx.subscribe();
        let weak = Arc::downgrade(&state);
        let mut session_rx = state.subscribe_events();
        let bridge = tokio::spawn(async move {
            run_invalidate_bridge(rx, weak, "test-lag".to_string()).await;
        });
        // Publish 5 events before the bridge has a chance to drain. The
        // bridge's broadcast receiver will lag past capacity 2.
        for _ in 0..5 {
            tx.send(fake_event("episode")).ok();
        }
        // The bridge MUST keep running — even if it missed some events
        // due to lag, it should forward the next one. Send another
        // event after a brief settle.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        tx.send(fake_event("triple")).expect("send must succeed");
        // The bridge should publish at least one mapped event to the
        // session. Don't assert exact count (lag is by design lossy);
        // assert that the bridge keeps forwarding after a lag.
        let _ = tokio::time::timeout(std::time::Duration::from_secs(2), session_rx.recv())
            .await
            .expect("bridge must still be alive after lag")
            .expect("session must receive at least one event");
        // Tear down.
        drop(state);
        drop(tx);
        // Bridge should now exit either via closed channel OR weak
        // upgrade failure on the next message — either is fine.
        let _ = tokio::time::timeout(std::time::Duration::from_secs(2), bridge).await;
    }
}
509        // Bridge should now exit either via closed channel OR weak
510        // upgrade failure on the next message — either is fine.
511        let _ = tokio::time::timeout(std::time::Duration::from_secs(2), bridge).await;
512    }
513}