solo_api/mcp_notify.rs
// SPDX-License-Identifier: Apache-2.0

//! v0.11.0 P4 — bridge per-tenant `InvalidateEvent` broadcasts into
//! MCP `notifications/message` events on each session's SSE stream.
//!
//! ## Scope
//!
//! v0.11.0 ships only the two MCP-spec-defined notification shapes
//! (`notifications/progress` and `notifications/message`) per plan §3
//! Decision B. P3 wired `notifications/progress` (per-tool progress).
//! P4 wires `notifications/message` by subscribing to each tenant's
//! existing `TenantHandle::invalidate_sender()` broadcast channel (the
//! same channel that powers `/v1/graph/stream` for solo-web) and mapping
//! each `InvalidateEvent` into a JSON-RPC `notifications/message`
//! envelope on the per-session broadcast channel from P1+P2.
//!
//! ## Mapping policy (locked for v0.11.0)
//!
//! `InvalidateEvent` carries a `kind` discriminator (`episode`,
//! `document`, `chunk`, `cluster`, `triple`, `tenant`) and a `reason`
//! string (`memory.remember`, `memory.forget`, …) per
//! `docs/dev-log/0118-graph-stream-impl.md`. P4 maps these conservatively
//! into MCP `data` strings the client can switch on:
//!
//! | `kind`     | `data` string           | Notes |
//! |------------|-------------------------|-------|
//! | `episode`  | `memories_updated`      | Coarse signal: any episode-level change |
//! | `document` | `documents_updated`     | Ingest / forget at document granularity |
//! | `chunk`    | `documents_updated`     | Coalesced under documents (chunks are a leaf of one document) |
//! | `cluster`  | `consolidation_updated` | Steward consolidation landed |
//! | `triple`   | `graph_updated`         | Triples-extract committed |
//! | `tenant`   | `tenant_updated`        | GDPR cascade or tenant lifecycle |
//! | other      | `memory_updated`        | Fallback so a future writer kind never silently drops |
//!
//! `level = "info"` for every variant; `logger = "solo"`. The MCP spec's
//! `notifications/message` shape allows any of the eight RFC 5424
//! severities for `level` (`debug`, `info`, `notice`, `warning`, `error`,
//! `critical`, `alert`, `emergency`); v0.11.0 emits at `info` because
//! every bridged event reports a committed write (invariant:
//! `InvalidateEvent` is only sent after the writer-actor's commit
//! succeeds; see `solo_core::InvalidateEvent`'s docstring).
//!
//! Each emitted MCP message ALSO carries the original `reason`,
//! `tenant_id`, `ts_ms`, and `kind` in a nested `details` object so
//! clients that want the structured shape get it without losing the
//! coarse `data` switch. The two halves stay in lock-step: `data` is what
//! to refetch; `details.reason` is why.
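/*!

A client consuming these notifications might dispatch on the coarse
`data` string and keep `details.reason` for logging or display. The
sketch below is a hypothetical client-side helper, not part of Solo.
`RefetchTarget` and `refetch_target` are illustrative names. The
function takes the already-extracted `params.data` string (no JSON
parsing), so it needs only `std`.

```rust
/// What a hypothetical client would refetch (not a Solo type).
#[derive(Debug, PartialEq, Eq)]
enum RefetchTarget {
    Memories,
    Documents,
    Consolidation,
    Graph,
    Tenant,
    /// `memory_updated`, or any value this client does not recognise.
    Everything,
}

/// Map the `params.data` discriminator of a Solo
/// `notifications/message` to the cache the client should refresh.
fn refetch_target(data: &str) -> RefetchTarget {
    match data {
        "memories_updated" => RefetchTarget::Memories,
        "documents_updated" => RefetchTarget::Documents,
        "consolidation_updated" => RefetchTarget::Consolidation,
        "graph_updated" => RefetchTarget::Graph,
        "tenant_updated" => RefetchTarget::Tenant,
        // Includes the server's `memory_updated` fallback.
        _ => RefetchTarget::Everything,
    }
}
```

Unknown values deliberately land on `Everything`, mirroring the
server's `memory_updated` fallback. A client written against v0.11.0
therefore still refetches when a newer server adds a `data` value.
*/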
//!
//! ## Per-session task lifecycle
//!
//! For each session created by `POST /mcp`, the POST handler calls
//! [`spawn_invalidate_bridge`] passing the freshly-resolved
//! `Arc<TenantHandle>` + `Arc<SessionState>`. The function:
//!
//! 1. Calls `tenant.invalidate_sender().subscribe()` BEFORE downgrading
//!    the session Arc to a `Weak<SessionState>`.
//! 2. Spawns a task (`tokio::spawn`) that loops on `rx.recv()`:
//!    - `Ok(event)` → upgrades the `Weak`; if the upgrade fails (session
//!      dropped), the task exits immediately. Otherwise it maps the event
//!      via [`map_invalidate_to_message`] and calls
//!      `session.publish_event(McpEventKind::Message, envelope)`.
//!    - `Err(RecvError::Lagged(n))` → logs a `warn` (matches
//!      `/v1/graph/stream` discipline) and continues. The session's
//!      own event-channel lag handling kicks in on the GET side; the
//!      bridge itself is best-effort fan-out.
//!    - `Err(RecvError::Closed)` → tenant handle dropped (rare
//!      outside test shutdown); task exits cleanly.
//!
//! The task holds no long-lived strong reference to `SessionState`: it
//! keeps only a `Weak` and upgrades it for the duration of one publish.
//! The session's `Arc` lives in the [`SessionStore`]; when the store
//! drops the session (TTL expiry or `delete`), the Weak upgrade fails on
//! the next event and the task exits. This is the
//! `session_task_exits_when_session_dropped` test contract.
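/*!

The same lifecycle can be modelled with only `std`. This is a simplified
sketch, not the bridge itself, with three substitutions:

- `std::sync::mpsc` stands in for the tenant's `tokio::sync::broadcast`
  channel, so there is no lag case.
- A `Mutex<Vec<String>>` stands in for `SessionState`.
- `run_bridge` and `BridgeExit` are hypothetical names.

The worker holds only a `Weak`, upgrades it once per event, and stops on
whichever comes first: a failed upgrade or a closed channel.

```rust
use std::sync::mpsc::Receiver;
use std::sync::{Mutex, Weak};

/// Why the sketch's worker loop returned.
#[derive(Debug, PartialEq, Eq)]
enum BridgeExit {
    SessionDropped,
    ChannelClosed,
}

/// Forward each event into the session's outbox until the session is
/// gone (upgrade fails) or every sender has been dropped.
fn run_bridge(rx: Receiver<String>, session: Weak<Mutex<Vec<String>>>) -> BridgeExit {
    // `recv` only errors once every `Sender` has been dropped and the
    // buffered events have been drained.
    while let Ok(event) = rx.recv() {
        // Upgrade per event: the strong ref lives for one iteration, so
        // the worker never keeps the session alive on its own.
        let Some(state) = session.upgrade() else {
            return BridgeExit::SessionDropped;
        };
        // Stand-in for `map_invalidate_to_message` + `publish_event`.
        state.lock().unwrap().push(format!("mapped:{event}"));
    }
    BridgeExit::ChannelClosed
}
```

As in the real bridge, a dropped session is only noticed when the next
event arrives. Until then, the worker stays parked in `recv`.
*/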
//!
//! ## Out of scope (v0.12.0+)
//!
//! - **Coalescing rapid bursts.** v0.11.0 emits one MCP message per
//!   `InvalidateEvent`. A pathological pattern (e.g. 1000 rapid
//!   `memory.remember` calls against one tenant) would emit 1000
//!   messages to every session bound to that tenant. The per-session
//!   broadcast channel is bounded
//!   ([`crate::mcp_session::MCP_SESSION_EVENT_BUFFER_CAPACITY`] = 256),
//!   so a slow GET subscriber would see `event: lagged` per P2 rather
//!   than memory growth. A future v0.12.0 follow-up could coalesce
//!   bursts at the bridge tier (e.g. "if N events of the same kind in
//!   1s, emit one"); not needed for v0.11.0 launch.
//! - **Solo-custom event types** (`audit_event`, `memory_consolidated`,
//!   `tenant_changed` as Solo-custom shapes). Deferred to v0.12.0+ per
//!   plan §3 Decision B. P4 ships only spec-compliant
//!   `notifications/message`.
//! - **Per-event auth scoping.** The bridge inherits the session's
//!   bound tenant; cross-tenant filtering happens at the broadcast
//!   layer (each tenant has its own `invalidate_sender`), so
//!   cross-tenant leakage is impossible by construction.
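/*!

Below is a purely hypothetical shape for the coalescing follow-up. It
assumes coalescing keys on the `data` discriminator:

- emit the first event of a burst immediately;
- mark later events inside the window as pending;
- flush one trailing event per discriminator once the window closes.

`Coalescer`, `offer`, and `flush` are illustrative names. The caller
passes `now` explicitly so the logic stays deterministic under test.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical per-`data` burst coalescer (nothing like this ships in v0.11.0).
struct Coalescer {
    window: Duration,
    /// Per discriminator: when it was last emitted, and whether an event
    /// was suppressed since then.
    state: HashMap<String, (Instant, bool)>,
}

impl Coalescer {
    fn new(window: Duration) -> Self {
        Coalescer { window, state: HashMap::new() }
    }

    /// Called for every incoming event; `true` means emit it now.
    fn offer(&mut self, data: &str, now: Instant) -> bool {
        if let Some((last, pending)) = self.state.get_mut(data) {
            if now.duration_since(*last) < self.window {
                // Inside the window: remember that a trailing emit is owed.
                *pending = true;
                return false;
            }
        }
        // First event for this discriminator, or its window has closed.
        self.state.insert(data.to_string(), (now, false));
        true
    }

    /// Called on a timer tick; returns the discriminators owed one
    /// trailing emit because their window closed with events pending.
    fn flush(&mut self, now: Instant) -> Vec<String> {
        let mut due = Vec::new();
        for (data, (last, pending)) in self.state.iter_mut() {
            if *pending && now.duration_since(*last) >= self.window {
                *pending = false;
                *last = now;
                due.push(data.clone());
            }
        }
        due.sort(); // HashMap order is unspecified; sort for stable output.
        due
    }
}
```

A plain "at most one per window" throttle would be simpler but could
swallow the last write of a burst. The client would then refetch before
the final commit and hear nothing more. The trailing flush keeps the
"refetch signal" contract intact.
*/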

use std::sync::{Arc, Weak};

use solo_core::InvalidateEvent;
use solo_storage::TenantHandle;
use tokio::sync::broadcast;

use crate::mcp_session::{McpEventKind, SessionState};

/// JSON-RPC method literal for MCP `notifications/message`. Held as
/// `pub const` so audits can grep the wire literal once and trust it
/// never drifts (Lesson #25 / #39 grep-ability).
pub const MCP_NOTIFICATION_MESSAGE_METHOD: &str = "notifications/message";

/// `level` field of every MCP `notifications/message` envelope this
/// bridge emits. The spec permits the eight RFC 5424 severities
/// (`debug` through `emergency`); v0.11.0 emits `info` because the
/// invariant on [`solo_core::InvalidateEvent`] is that it ONLY fires
/// after a successful writer-actor commit, so there is no error/warning
/// path.
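/**

For reference, MCP's `LoggingLevel` values are ordered by severity from
`debug` to `emergency`, and a client may send `logging/setLevel` to ask
for a minimum level. The sketch below uses hypothetical `LogLevel` and
`passes_threshold` names. It does not assume Solo's MCP server
implements `logging/setLevel`. It only shows one consequence of emitting
at `info`: a server that honours a client's `warning` threshold would
filter these notifications out.

```rust
/// MCP `LoggingLevel` values (RFC 5424 severities), declared from least
/// to most severe so the derived `Ord` follows severity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum LogLevel {
    Debug,
    Info,
    Notice,
    Warning,
    Error,
    Critical,
    Alert,
    Emergency,
}

impl LogLevel {
    /// Parse the lowercase wire string used by MCP.
    fn parse(s: &str) -> Option<Self> {
        Some(match s {
            "debug" => Self::Debug,
            "info" => Self::Info,
            "notice" => Self::Notice,
            "warning" => Self::Warning,
            "error" => Self::Error,
            "critical" => Self::Critical,
            "alert" => Self::Alert,
            "emergency" => Self::Emergency,
            _ => return None,
        })
    }
}

/// Hypothetical filter: deliver a message only if it is at least as
/// severe as the client's requested minimum.
fn passes_threshold(message: LogLevel, minimum: LogLevel) -> bool {
    message >= minimum
}
```
*/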
pub const MCP_NOTIFICATION_MESSAGE_LEVEL: &str = "info";

/// `logger` field of every MCP `notifications/message` envelope this
/// bridge emits. The MCP spec makes `logger` an optional, server-chosen
/// name for the message source; `"solo"` distinguishes our messages
/// from any proxy / middleware that might emit on the same stream.
pub const MCP_NOTIFICATION_MESSAGE_LOGGER: &str = "solo";

/// `data` discriminator for episode-level invalidations. Coarse
/// signal: "any episode-level memory changed; refetch what you care
/// about". Maps `InvalidateEvent.kind = "episode"`.
pub const MCP_NOTIFICATION_DATA_MEMORIES_UPDATED: &str = "memories_updated";

/// `data` discriminator for document-level invalidations. Includes
/// chunks (coalesced — chunks are a leaf of a parent document, so
/// refetching the document covers both).
pub const MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED: &str = "documents_updated";

/// `data` discriminator for consolidation/cluster invalidations.
pub const MCP_NOTIFICATION_DATA_CONSOLIDATION_UPDATED: &str = "consolidation_updated";

/// `data` discriminator for graph (triples) invalidations.
pub const MCP_NOTIFICATION_DATA_GRAPH_UPDATED: &str = "graph_updated";

/// `data` discriminator for tenant-lifecycle invalidations (today
/// fired by the GDPR `forget_user` cascade per
/// `solo_core::InvalidateEvent`'s kind taxonomy).
pub const MCP_NOTIFICATION_DATA_TENANT_UPDATED: &str = "tenant_updated";

/// Fallback `data` for `InvalidateEvent.kind` values P4 doesn't
/// recognise. Defensive: if a future writer command emits a kind string
/// we haven't mapped yet, the client still sees a refetch signal rather
/// than the bridge dropping the event silently.
pub const MCP_NOTIFICATION_DATA_MEMORY_UPDATED: &str = "memory_updated";

/// Map one [`solo_core::InvalidateEvent`] into the MCP spec's
/// `notifications/message` JSON envelope.
///
/// Output shape:
///
/// ```json
/// {
///   "jsonrpc": "2.0",
///   "method": "notifications/message",
///   "params": {
///     "level": "info",
///     "logger": "solo",
///     "data": "memories_updated",
///     "details": {
///       "reason": "memory.remember",
///       "tenant_id": "default",
///       "ts_ms": 1715625600000,
///       "kind": "episode"
///     }
///   }
/// }
/// ```
///
/// The `details` object preserves the full original event for clients
/// that want the structured shape; the top-level `data` carries the
/// coarse switch that drives a "refetch what you care about" UX. Per
/// plan §3 Decision B + §6 P4, this is the canonical mapping the
/// publish path uses.
pub fn map_invalidate_to_message(event: &InvalidateEvent) -> serde_json::Value {
    let data_kind = match event.kind.as_str() {
        "episode" => MCP_NOTIFICATION_DATA_MEMORIES_UPDATED,
        "document" | "chunk" => MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED,
        "cluster" => MCP_NOTIFICATION_DATA_CONSOLIDATION_UPDATED,
        "triple" => MCP_NOTIFICATION_DATA_GRAPH_UPDATED,
        "tenant" => MCP_NOTIFICATION_DATA_TENANT_UPDATED,
        _ => MCP_NOTIFICATION_DATA_MEMORY_UPDATED,
    };
    serde_json::json!({
        "jsonrpc": "2.0",
        "method": MCP_NOTIFICATION_MESSAGE_METHOD,
        "params": {
            "level": MCP_NOTIFICATION_MESSAGE_LEVEL,
            "logger": MCP_NOTIFICATION_MESSAGE_LOGGER,
            "data": data_kind,
            "details": {
                "reason": event.reason,
                "tenant_id": event.tenant_id,
                "ts_ms": event.ts_ms,
                "kind": event.kind,
            }
        }
    })
}

/// Spawn the per-session invalidate-bridge task.
///
/// Subscribes to the tenant's existing `invalidate_sender`
/// synchronously. This happens BEFORE spawning the task (and before
/// downgrading the `SessionState` Arc to a `Weak`), so an invalidate
/// published after this function returns is buffered in the receiver
/// even if the task has not been polled yet. The spawned task forwards
/// each [`solo_core::InvalidateEvent`] into the session's broadcast
/// channel as an [`McpEventKind::Message`] event whose JSON payload is
/// the [`map_invalidate_to_message`] result.
///
/// Task lifecycle (the canonical "when does this exit" answer):
///
/// - **Session dropped from store** (TTL expiry or `SessionStore::delete`):
///   the `Weak::upgrade` returns `None` on the next received event
///   → task exits cleanly. Pinned by
///   `session_task_exits_when_session_dropped`.
/// - **Tenant handle dropped** (rare outside test shutdown — production
///   keeps tenants alive for the daemon lifetime): `rx.recv()`
///   returns `RecvError::Closed` → task exits cleanly.
/// - **Subscriber lagged** (broadcast capacity overrun): log a `warn`
///   and continue receiving. These next-event semantics match
///   `/v1/graph/stream` and ADR-0003's "invalidations are idempotent
///   refetch signals" invariant.
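/**

A simplified, single-threaded model of a bounded broadcast receiver
shows why warn-and-continue is safe. It follows the behaviour tokio
documents for `broadcast::Receiver::recv`: once the sender laps a slow
reader, the next receive reports how many items were skipped
(`RecvError::Lagged(n)` in tokio), and the reader resumes at the oldest
item still retained. `Ring`, `Recv`, and the explicit `cursor` are
hypothetical names for this model, not tokio's implementation.

```rust
use std::collections::VecDeque;

/// Outcome of one receive in this model.
#[derive(Debug, PartialEq, Eq)]
enum Recv<T> {
    Item(T),
    /// The reader missed this many items; it now points at the oldest
    /// item still retained.
    Lagged(u64),
    Empty,
}

/// Bounded buffer keeping only the newest `capacity` items, each tagged
/// with a sequence number. Readers track their own position (`cursor`).
struct Ring<T> {
    capacity: usize,
    next_seq: u64,
    items: VecDeque<(u64, T)>,
}

impl<T: Clone> Ring<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { capacity, next_seq: 0, items: VecDeque::with_capacity(capacity) }
    }

    fn send(&mut self, value: T) {
        if self.items.len() == self.capacity {
            // Evict the oldest item; any reader that hasn't seen it lags.
            self.items.pop_front();
        }
        self.items.push_back((self.next_seq, value));
        self.next_seq += 1;
    }

    /// `cursor` is the sequence number this reader expects next.
    fn recv(&self, cursor: &mut u64) -> Recv<T> {
        let Some(&(oldest, _)) = self.items.front() else {
            return Recv::Empty;
        };
        if *cursor < oldest {
            // Report the gap and jump to the oldest retained item.
            let missed = oldest - *cursor;
            *cursor = oldest;
            return Recv::Lagged(missed);
        }
        match self.items.get((*cursor - oldest) as usize) {
            Some((_, value)) => {
                *cursor += 1;
                Recv::Item(value.clone())
            }
            None => Recv::Empty,
        }
    }
}
```

Each invalidate is an idempotent refetch signal. The items retained
after a lag are therefore as useful as the ones that were skipped, which
is why the bridge logs and keeps receiving.
*/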
///
/// Returns the spawned `JoinHandle` so callers that want explicit
/// teardown (tests, future graceful-shutdown work) can await it; the
/// production POST handler discards the handle and lets the task run
/// detached until one of the exit conditions trips.
pub fn spawn_invalidate_bridge(
    tenant: Arc<TenantHandle>,
    session: Arc<SessionState>,
) -> tokio::task::JoinHandle<()> {
    let rx = tenant.invalidate_sender().subscribe();
    let weak = Arc::downgrade(&session);
    // The session's address is enough to correlate log lines. Take it
    // from the strong ref we still hold so it never degrades to a
    // placeholder, even if the caller passed the last strong ref.
    let session_id_for_log = format!("{:p}", Arc::as_ptr(&session));
    // Release our strong ref now so the task can only ever reach the
    // session through `weak`. The `async move` block below captures just
    // `rx`, `weak`, and `session_id_for_log`, so store eviction is what
    // ends the task.
    drop(session);
    tokio::spawn(async move {
        run_invalidate_bridge(rx, weak, session_id_for_log).await;
    })
}

/// Inner loop for the bridge task. Split out so unit tests can drive
/// it with a bare `broadcast` channel instead of a real `TenantHandle`.
async fn run_invalidate_bridge(
    mut rx: broadcast::Receiver<InvalidateEvent>,
    weak: Weak<SessionState>,
    session_id_for_log: String,
) {
    loop {
        match rx.recv().await {
            Ok(event) => {
                let Some(state) = weak.upgrade() else {
                    tracing::debug!(
                        session = %session_id_for_log,
                        "mcp invalidate bridge: session dropped; exiting"
                    );
                    return;
                };
                let payload = map_invalidate_to_message(&event);
                state.publish_event(McpEventKind::Message, payload);
            }
            Err(broadcast::error::RecvError::Lagged(n)) => {
                tracing::warn!(
                    lagged = n,
                    session = %session_id_for_log,
                    "mcp invalidate bridge subscriber lagged; client will \
                     resync on the next real invalidate"
                );
            }
            Err(broadcast::error::RecvError::Closed) => {
                tracing::debug!(
                    session = %session_id_for_log,
                    "mcp invalidate bridge: tenant invalidate channel closed; exiting"
                );
                return;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use solo_core::TenantId;

    fn fake_event(kind: &str) -> InvalidateEvent {
        InvalidateEvent {
            reason: "memory.remember".to_string(),
            tenant_id: "default".to_string(),
            ts_ms: 1_715_625_600_000,
            kind: kind.to_string(),
        }
    }

    /// Mapping table pin — each known `kind` produces the right `data`
    /// discriminator. Drives every row in the mapping table from the
    /// module docs.
    #[test]
    fn map_invalidate_to_message_routes_each_kind_to_data_discriminator() {
        let cases = [
            ("episode", MCP_NOTIFICATION_DATA_MEMORIES_UPDATED),
            ("document", MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED),
            ("chunk", MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED),
            ("cluster", MCP_NOTIFICATION_DATA_CONSOLIDATION_UPDATED),
            ("triple", MCP_NOTIFICATION_DATA_GRAPH_UPDATED),
            ("tenant", MCP_NOTIFICATION_DATA_TENANT_UPDATED),
            // Unknown kinds fall back to the generic discriminator so a
            // future writer kind isn't dropped silently.
            (
                "hypothetical_new_kind",
                MCP_NOTIFICATION_DATA_MEMORY_UPDATED,
            ),
        ];
        for (kind, expected_data) in cases {
            let msg = map_invalidate_to_message(&fake_event(kind));
            assert_eq!(
                msg["params"]["data"].as_str(),
                Some(expected_data),
                "kind={kind} must map to data={expected_data}",
            );
        }
    }

    /// Pin the spec envelope shape: `jsonrpc=2.0` + `method =
    /// notifications/message` + `params.{level,logger,data,details}` all
    /// present. Grep-able by the constants so any drift surfaces here.
    #[test]
    fn map_invalidate_to_message_uses_jsonrpc_notifications_message_method() {
        let msg = map_invalidate_to_message(&fake_event("episode"));
        assert_eq!(msg["jsonrpc"].as_str(), Some("2.0"));
        assert_eq!(
            msg["method"].as_str(),
            Some(MCP_NOTIFICATION_MESSAGE_METHOD),
        );
        // params.{level,logger,data} per the spec; `details` is a
        // Solo-additive field (allowed by the MCP spec, since params is
        // open-shape).
        let params = &msg["params"];
        assert_eq!(
            params["level"].as_str(),
            Some(MCP_NOTIFICATION_MESSAGE_LEVEL),
        );
        assert_eq!(
            params["logger"].as_str(),
            Some(MCP_NOTIFICATION_MESSAGE_LOGGER),
        );
        assert!(params["data"].is_string(), "data must be present + string");
        assert!(
            params["details"].is_object(),
            "details must be present + object"
        );
    }

    /// Pin that `level` is `info` for every emitted message (the
    /// invariant from the module-level docstring: invalidate events
    /// only fire post-commit, so there's no error/warning path on this
    /// bridge).
    #[test]
    fn mcp_message_payload_includes_level_and_data() {
        // Every kind, including the fallback, must emit at `info`.
        for kind in [
            "episode",
            "document",
            "chunk",
            "cluster",
            "triple",
            "tenant",
            "hypothetical_new_kind",
        ] {
            let msg = map_invalidate_to_message(&fake_event(kind));
            assert_eq!(
                msg["params"]["level"].as_str(),
                Some("info"),
                "level must be 'info' for invalidate-bridged messages (kind={kind})",
            );
        }
        let msg = map_invalidate_to_message(&fake_event("episode"));
        let params = &msg["params"];
        assert_eq!(
            params["data"].as_str(),
            Some(MCP_NOTIFICATION_DATA_MEMORIES_UPDATED),
        );
        // Details object carries the structured original event for
        // clients that want the granular shape — pinned here so a
        // future refactor that drops `details` from the wire surface
        // breaks this test loudly.
        assert_eq!(
            params["details"]["reason"].as_str(),
            Some("memory.remember"),
        );
        assert_eq!(params["details"]["kind"].as_str(), Some("episode"));
        assert!(params["details"]["ts_ms"].is_number());
    }

    /// End-to-end: subscribe to a session's events, drive an invalidate
    /// through the bridge directly (without a real TenantHandle), and
    /// confirm the session receives the mapped MCP message.
    #[tokio::test]
    async fn run_invalidate_bridge_publishes_mcp_message_to_session() {
        // Build a fake broadcast and an empty session.
        let (tx, _) = broadcast::channel::<InvalidateEvent>(16);
        let state = Arc::new(SessionState::new(TenantId::default_tenant(), None));
        let rx = tx.subscribe();
        let weak = Arc::downgrade(&state);
        // Pre-subscribe to the SESSION channel so we can read what the
        // bridge publishes.
        let mut session_rx = state.subscribe_events();
        // Spawn the bridge.
        let bridge_handle = tokio::spawn(async move {
            run_invalidate_bridge(rx, weak, "test-session".to_string()).await;
        });
        // Fire one invalidate.
        tx.send(fake_event("document"))
            .expect("at least one subscriber");
        // The first event on the session channel must be the mapped MCP
        // Message for that invalidate.
        let received = tokio::time::timeout(std::time::Duration::from_secs(2), session_rx.recv())
            .await
            .expect("event must arrive within 2s")
            .expect("session receiver must receive published event");
        assert_eq!(received.event, McpEventKind::Message);
        assert_eq!(
            received.data["method"].as_str(),
            Some(MCP_NOTIFICATION_MESSAGE_METHOD),
        );
        assert_eq!(
            received.data["params"]["data"].as_str(),
            Some(MCP_NOTIFICATION_DATA_DOCUMENTS_UPDATED),
        );
        // Drop the session strong ref → next invalidate should exit
        // the bridge.
        drop(state);
        tx.send(fake_event("episode")).ok();
        // Bridge task should observe the dropped session and exit
        // well within the 2s timeout.
        tokio::time::timeout(std::time::Duration::from_secs(2), bridge_handle)
            .await
            .expect("bridge must exit when session drops")
            .expect("bridge task panic");
    }

    /// Bridge exits when the session is dropped from the store
    /// (simulated here by dropping the last strong `Arc`). Specifically
    /// pins the `Weak::upgrade == None` exit path.
    #[tokio::test]
    async fn session_task_exits_when_session_dropped() {
        let (tx, _) = broadcast::channel::<InvalidateEvent>(16);
        let state = Arc::new(SessionState::new(TenantId::default_tenant(), None));
        let rx = tx.subscribe();
        let weak = Arc::downgrade(&state);
        let bridge = tokio::spawn(async move {
            run_invalidate_bridge(rx, weak, "test-session-drop".to_string()).await;
        });
        // Drop the session BEFORE the bridge sees any events.
        drop(state);
        // Fire one event; the bridge upgrades the Weak, sees None,
        // and exits.
        tx.send(fake_event("episode")).ok();
        tokio::time::timeout(std::time::Duration::from_secs(2), bridge)
            .await
            .expect("bridge must exit within 2s of session drop")
            .expect("bridge task panic");
    }

    /// Bridge exits when the broadcast channel closes (TenantHandle
    /// dropped). Pins the `RecvError::Closed` exit path.
    #[tokio::test]
    async fn bridge_exits_when_tenant_invalidate_channel_closes() {
        let (tx, _) = broadcast::channel::<InvalidateEvent>(16);
        let state = Arc::new(SessionState::new(TenantId::default_tenant(), None));
        let rx = tx.subscribe();
        let weak = Arc::downgrade(&state);
        let bridge = tokio::spawn(async move {
            run_invalidate_bridge(rx, weak, "test-tenant-drop".to_string()).await;
        });
        // Drop the broadcast Sender → channel closes.
        drop(tx);
        tokio::time::timeout(std::time::Duration::from_secs(2), bridge)
            .await
            .expect("bridge must exit within 2s of channel close")
            .expect("bridge task panic");
        // The session must outlive the bridge with our Arc as its only
        // strong ref (proving the bridge never took ownership), and it
        // must still be usable.
        assert_eq!(Arc::strong_count(&state), 1);
        let _id = state.publish_event(McpEventKind::Message, serde_json::json!({"alive": true}));
    }

    /// Bridge ignores `RecvError::Lagged` and continues forwarding.
    /// Drives the warn-and-continue path explicitly.
    #[tokio::test]
    async fn bridge_logs_and_continues_on_lagged_subscriber() {
        // Channel cap 2 — easy to overrun.
        let (tx, _) = broadcast::channel::<InvalidateEvent>(2);
        let state = Arc::new(SessionState::new(TenantId::default_tenant(), None));
        let rx = tx.subscribe();
        let weak = Arc::downgrade(&state);
        let mut session_rx = state.subscribe_events();
        let bridge = tokio::spawn(async move {
            run_invalidate_bridge(rx, weak, "test-lag".to_string()).await;
        });
        // Publish 5 events before the bridge has a chance to drain. On
        // `#[tokio::test]`'s default current-thread runtime, the spawned
        // task cannot run until this test yields, so the bridge's
        // broadcast receiver lags past capacity 2.
        for _ in 0..5 {
            tx.send(fake_event("episode")).ok();
        }
        // The bridge MUST keep running — even if it missed some events
        // due to lag, it should forward the next one. Send another
        // event after a brief settle.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        tx.send(fake_event("triple")).expect("send must succeed");
        // The bridge should publish at least one mapped event to the
        // session. Don't assert exact count (lag is by design lossy);
        // assert that the bridge keeps forwarding after a lag.
        let _ = tokio::time::timeout(std::time::Duration::from_secs(2), session_rx.recv())
            .await
            .expect("bridge must still be alive after lag")
            .expect("session must receive at least one event");
        // Tear down.
        drop(state);
        drop(tx);
        // Bridge should now exit either via closed channel OR weak
        // upgrade failure on the next message — either is fine.
        let _ = tokio::time::timeout(std::time::Duration::from_secs(2), bridge).await;
    }
}