Skip to main content

lean_ctx/core/context_os/
context_bus.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::broadcast;
10
11const MAX_READ_CONNS: usize = 4;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "snake_case")]
15pub enum ContextEventKindV1 {
16    ToolCallRecorded,
17    SessionMutated,
18    KnowledgeRemembered,
19    ArtifactStored,
20    GraphBuilt,
21    ProofAdded,
22}
23
24impl ContextEventKindV1 {
25    pub fn as_str(&self) -> &'static str {
26        match self {
27            Self::ToolCallRecorded => "tool_call_recorded",
28            Self::SessionMutated => "session_mutated",
29            Self::KnowledgeRemembered => "knowledge_remembered",
30            Self::ArtifactStored => "artifact_stored",
31            Self::GraphBuilt => "graph_built",
32            Self::ProofAdded => "proof_added",
33        }
34    }
35
36    pub fn parse(s: &str) -> Self {
37        match s.trim().to_lowercase().as_str() {
38            "tool_call_recorded" => Self::ToolCallRecorded,
39            "session_mutated" => Self::SessionMutated,
40            "knowledge_remembered" => Self::KnowledgeRemembered,
41            "artifact_stored" => Self::ArtifactStored,
42            "graph_built" => Self::GraphBuilt,
43            "proof_added" => Self::ProofAdded,
44            other => {
45                tracing::warn!(
46                    "unknown ContextEventKind '{other}', defaulting to ToolCallRecorded"
47                );
48                Self::ToolCallRecorded
49            }
50        }
51    }
52
53    /// Classifies the consistency requirement for this event kind.
54    ///
55    /// - `Local`: Agent-local, never shared (tool reads, cache hits).
56    /// - `Eventual`: Broadcast via bus, other agents see it "soon" (knowledge, artifacts).
57    /// - `Strong`: Critical decisions that require acknowledgment before proceeding.
58    pub fn consistency_level(&self) -> ConsistencyLevel {
59        match self {
60            Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
61            Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
62            Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
63        }
64    }
65}
66
67/// Consistency requirement for shared context events.
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "snake_case")]
70pub enum ConsistencyLevel {
71    /// Agent-local, authoritative: session task, local cache, current file set.
72    Local,
73    /// Shared, eventually consistent: knowledge facts, gotchas, artifact refs.
74    Eventual,
75    /// Shared, strongly consistent: workspace config, critical decisions.
76    Strong,
77}
78
79impl ConsistencyLevel {
80    pub fn as_str(&self) -> &'static str {
81        match self {
82            Self::Local => "local",
83            Self::Eventual => "eventual",
84            Self::Strong => "strong",
85        }
86    }
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90#[serde(rename_all = "camelCase")]
91pub struct ContextEventV1 {
92    pub id: i64,
93    pub workspace_id: String,
94    pub channel_id: String,
95    pub kind: String,
96    pub actor: Option<String>,
97    pub timestamp: DateTime<Utc>,
98    pub version: i64,
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub parent_id: Option<i64>,
101    pub consistency_level: String,
102    pub payload: Value,
103}
104
105impl ContextEventV1 {
106    pub fn consistency(&self) -> ConsistencyLevel {
107        ContextEventKindV1::parse(&self.kind).consistency_level()
108    }
109}
110
111fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
112    let ts_str: String = row.get(5)?;
113    let ts = DateTime::parse_from_rfc3339(&ts_str)
114        .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
115    let payload_str: String = row.get(6)?;
116    let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
117    let kind_str: String = row.get(3)?;
118    let cl = ContextEventKindV1::parse(&kind_str)
119        .consistency_level()
120        .as_str()
121        .to_string();
122    Ok(ContextEventV1 {
123        id: row.get(0)?,
124        workspace_id: row.get(1)?,
125        channel_id: row.get(2)?,
126        kind: kind_str,
127        actor: row.get::<_, Option<String>>(4)?,
128        timestamp: ts,
129        version: row.get::<_, i64>(7).unwrap_or(0),
130        parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
131        consistency_level: cl,
132        payload,
133    })
134}
135
136#[derive(Clone)]
137pub struct ContextBus {
138    inner: Arc<Inner>,
139}
140
141const STREAM_CHANNEL_SIZE: usize = 256;
142
143struct Inner {
144    write_conn: Mutex<Connection>,
145    read_pool: Mutex<Vec<Connection>>,
146    streams: Mutex<HashMap<String, broadcast::Sender<ContextEventV1>>>,
147    version_cache: Mutex<HashMap<String, i64>>,
148    db_path: PathBuf,
149}
150
151impl Inner {
152    fn open_read_conn(path: &PathBuf) -> Connection {
153        let conn = Connection::open(path).expect("open read context-os db");
154        let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA query_only=ON;");
155        conn
156    }
157
158    fn take_read_conn(&self) -> Connection {
159        self.read_pool
160            .lock()
161            .unwrap_or_else(std::sync::PoisonError::into_inner)
162            .pop()
163            .unwrap_or_else(|| Self::open_read_conn(&self.db_path))
164    }
165
166    fn return_read_conn(&self, conn: Connection) {
167        let mut pool = self
168            .read_pool
169            .lock()
170            .unwrap_or_else(std::sync::PoisonError::into_inner);
171        if pool.len() < MAX_READ_CONNS {
172            pool.push(conn);
173        }
174    }
175
176    fn stream_key(workspace_id: &str, channel_id: &str) -> String {
177        format!("{workspace_id}\0{channel_id}")
178    }
179
180    fn next_version(&self, workspace_id: &str, channel_id: &str) -> i64 {
181        let key = Self::stream_key(workspace_id, channel_id);
182
183        {
184            let mut cache = self
185                .version_cache
186                .lock()
187                .unwrap_or_else(std::sync::PoisonError::into_inner);
188            if let Some(v) = cache.get_mut(&key) {
189                *v += 1;
190                return *v;
191            }
192        }
193
194        let conn = self.take_read_conn();
195        let v: i64 = conn
196            .query_row(
197                "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
198                params![workspace_id, channel_id],
199                |row| row.get(0),
200            )
201            .unwrap_or(0);
202        self.return_read_conn(conn);
203
204        let mut cache = self
205            .version_cache
206            .lock()
207            .unwrap_or_else(std::sync::PoisonError::into_inner);
208        let entry = cache.entry(key).or_insert(v);
209        *entry = (*entry).max(v) + 1;
210        *entry
211    }
212}
213
214impl Default for ContextBus {
215    fn default() -> Self {
216        Self::new()
217    }
218}
219
220impl ContextBus {
221    pub fn new() -> Self {
222        let path = default_db_path();
223        if let Some(parent) = path.parent() {
224            let _ = std::fs::create_dir_all(parent);
225        }
226        let conn = Connection::open(&path).expect("open context-os db");
227        conn.execute_batch(
228            "PRAGMA journal_mode=WAL;
229             CREATE TABLE IF NOT EXISTS context_events (
230               id INTEGER PRIMARY KEY AUTOINCREMENT,
231               workspace_id TEXT NOT NULL,
232               channel_id TEXT NOT NULL,
233               kind TEXT NOT NULL,
234               actor TEXT,
235               timestamp TEXT NOT NULL,
236               payload_json TEXT NOT NULL,
237               version INTEGER NOT NULL DEFAULT 0,
238               parent_id INTEGER
239             );
240             CREATE INDEX IF NOT EXISTS idx_context_events_stream
241               ON context_events(workspace_id, channel_id, id);",
242        )
243        .expect("init context-os db");
244
245        let _ = conn.execute_batch(
246            "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
247        );
248        let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
249
250        let _ = conn.execute_batch(
251            "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
252               payload_text,
253               content=context_events,
254               content_rowid=id
255             );",
256        );
257
258        let mut read_conns = Vec::with_capacity(MAX_READ_CONNS);
259        for _ in 0..MAX_READ_CONNS {
260            read_conns.push(Inner::open_read_conn(&path));
261        }
262
263        Self {
264            inner: Arc::new(Inner {
265                write_conn: Mutex::new(conn),
266                read_pool: Mutex::new(read_conns),
267                streams: Mutex::new(HashMap::new()),
268                version_cache: Mutex::new(HashMap::new()),
269                db_path: path,
270            }),
271        }
272    }
273
274    pub fn subscribe(
275        &self,
276        workspace_id: &str,
277        channel_id: &str,
278    ) -> broadcast::Receiver<ContextEventV1> {
279        let key = Inner::stream_key(workspace_id, channel_id);
280        let mut streams = self
281            .inner
282            .streams
283            .lock()
284            .unwrap_or_else(std::sync::PoisonError::into_inner);
285        let tx = streams
286            .entry(key)
287            .or_insert_with(|| broadcast::channel(STREAM_CHANNEL_SIZE).0);
288        tx.subscribe()
289    }
290
291    pub fn append(
292        &self,
293        workspace_id: &str,
294        channel_id: &str,
295        kind: &ContextEventKindV1,
296        actor: Option<&str>,
297        payload: Value,
298    ) -> Option<ContextEventV1> {
299        self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
300    }
301
302    pub fn append_with_parent(
303        &self,
304        workspace_id: &str,
305        channel_id: &str,
306        kind: &ContextEventKindV1,
307        actor: Option<&str>,
308        payload: Value,
309        parent_id: Option<i64>,
310    ) -> Option<ContextEventV1> {
311        let ts = Utc::now();
312        let payload_json = payload.to_string();
313
314        let (id, version) = {
315            let Ok(conn) = self.inner.write_conn.lock() else {
316                return None;
317            };
318            let version = self.inner.next_version(workspace_id, channel_id);
319
320            let result: Result<(i64, i64), rusqlite::Error> = conn
321                .execute_batch("BEGIN IMMEDIATE")
322                .and_then(|()| {
323                    conn.execute(
324                        "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
325                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
326                        params![
327                            workspace_id,
328                            channel_id,
329                            kind.as_str(),
330                            actor.map(str::to_string),
331                            ts.to_rfc3339(),
332                            payload_json,
333                            version,
334                            parent_id,
335                        ],
336                    )?;
337                    let rowid = conn.last_insert_rowid();
338                    if let Err(e) = conn.execute(
339                        "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
340                        params![rowid, payload_json],
341                    ) {
342                        tracing::warn!("FTS insert failed for event {rowid}: {e}");
343                    }
344                    conn.execute_batch("COMMIT")?;
345                    Ok((rowid, version))
346                });
347
348            match result {
349                Ok(pair) => pair,
350                Err(e) => {
351                    tracing::warn!("context bus append failed: {e}");
352                    let _ = conn.execute_batch("ROLLBACK");
353                    return None;
354                }
355            }
356        };
357
358        let ev = ContextEventV1 {
359            id,
360            workspace_id: workspace_id.to_string(),
361            channel_id: channel_id.to_string(),
362            consistency_level: kind.consistency_level().as_str().to_string(),
363            kind: kind.as_str().to_string(),
364            actor: actor.map(str::to_string),
365            timestamp: ts,
366            version,
367            parent_id,
368            payload,
369        };
370        let key = Inner::stream_key(workspace_id, channel_id);
371        let tx = self
372            .inner
373            .streams
374            .lock()
375            .unwrap_or_else(std::sync::PoisonError::into_inner)
376            .get(&key)
377            .cloned();
378        if let Some(tx) = tx {
379            let _ = tx.send(ev.clone());
380        }
381        Some(ev)
382    }
383
384    pub fn read(
385        &self,
386        workspace_id: &str,
387        channel_id: &str,
388        since: i64,
389        limit: usize,
390    ) -> Vec<ContextEventV1> {
391        let limit = limit.clamp(1, 1000) as i64;
392        let conn = self.inner.take_read_conn();
393        let result = (|| {
394            let mut stmt = conn.prepare(
395                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
396                 FROM context_events
397                 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
398                 ORDER BY id ASC
399                 LIMIT ?4",
400            ).ok()?;
401            let rows = stmt
402                .query_map(
403                    params![workspace_id, channel_id, since, limit],
404                    event_from_row,
405                )
406                .ok()?;
407            Some(rows.flatten().collect::<Vec<_>>())
408        })();
409        self.inner.return_read_conn(conn);
410        result.unwrap_or_default()
411    }
412
413    /// Query recent events of a specific kind (for conflict detection).
414    pub fn recent_by_kind(
415        &self,
416        workspace_id: &str,
417        channel_id: &str,
418        kind: &str,
419        limit: usize,
420    ) -> Vec<ContextEventV1> {
421        let limit = limit.clamp(1, 100) as i64;
422        let conn = self.inner.take_read_conn();
423        let result = (|| {
424            let mut stmt = conn.prepare(
425                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
426                 FROM context_events
427                 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
428                 ORDER BY id DESC
429                 LIMIT ?4",
430            ).ok()?;
431            let rows = stmt
432                .query_map(
433                    params![workspace_id, channel_id, kind, limit],
434                    event_from_row,
435                )
436                .ok()?;
437            Some(rows.flatten().collect::<Vec<_>>())
438        })();
439        self.inner.return_read_conn(conn);
440        result.unwrap_or_default()
441    }
442
443    /// Full-text search over event payloads via FTS5.
444    pub fn search(
445        &self,
446        workspace_id: &str,
447        channel_id: Option<&str>,
448        query: &str,
449        limit: usize,
450    ) -> Vec<ContextEventV1> {
451        let limit = limit.clamp(1, 100) as i64;
452        let conn = self.inner.take_read_conn();
453        let result =
454            if let Some(ch) = channel_id {
455                (|| {
456                    let mut stmt = conn.prepare(
457                    "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
458                            e.payload_json, e.version, e.parent_id
459                     FROM context_events e
460                     JOIN context_events_fts f ON e.id = f.rowid
461                     WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2 AND e.channel_id = ?3
462                     ORDER BY f.rank
463                     LIMIT ?4",
464                ).ok()?;
465                    let rows = stmt
466                        .query_map(params![query, workspace_id, ch, limit], event_from_row)
467                        .ok()?;
468                    Some(rows.flatten().collect::<Vec<_>>())
469                })()
470            } else {
471                (|| {
472                    let mut stmt = conn.prepare(
473                    "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
474                            e.payload_json, e.version, e.parent_id
475                     FROM context_events e
476                     JOIN context_events_fts f ON e.id = f.rowid
477                     WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
478                     ORDER BY f.rank
479                     LIMIT ?3",
480                ).ok()?;
481                    let rows = stmt
482                        .query_map(params![query, workspace_id, limit], event_from_row)
483                        .ok()?;
484                    Some(rows.flatten().collect::<Vec<_>>())
485                })()
486            };
487        self.inner.return_read_conn(conn);
488        result.unwrap_or_default()
489    }
490
491    /// Trace the causal lineage of an event by following parent_id chains.
492    pub fn lineage(&self, event_id: i64, max_depth: usize) -> Vec<ContextEventV1> {
493        let max_depth = max_depth.clamp(1, 50);
494        let conn = self.inner.take_read_conn();
495        let mut chain = Vec::new();
496        let mut current_id = Some(event_id);
497
498        for _ in 0..max_depth {
499            let Some(id) = current_id else {
500                break;
501            };
502            let ev = conn.query_row(
503                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
504                 FROM context_events WHERE id = ?1",
505                params![id],
506                event_from_row,
507            );
508            match ev {
509                Ok(ev) => {
510                    current_id = ev.parent_id;
511                    chain.push(ev);
512                }
513                Err(_) => break,
514            }
515        }
516        self.inner.return_read_conn(conn);
517        chain
518    }
519
520    /// Returns the highest event id for a workspace/channel pair, or 0 if none.
521    pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
522        let conn = self.inner.take_read_conn();
523        let result = conn
524            .query_row(
525                "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
526                params![workspace_id, channel_id],
527                |row| row.get(0),
528            )
529            .unwrap_or(0);
530        self.inner.return_read_conn(conn);
531        result
532    }
533}
534
535fn default_db_path() -> PathBuf {
536    let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
537    data.join("context-os").join("context-os.db")
538}
539
540#[cfg(test)]
541mod tests {
542    use super::*;
543
544    #[test]
545    fn append_and_read_roundtrip() {
546        let bus = ContextBus::new();
547        let ev = bus
548            .append(
549                "ws",
550                "ch",
551                &ContextEventKindV1::ToolCallRecorded,
552                Some("agent"),
553                serde_json::json!({"tool":"ctx_read"}),
554            )
555            .expect("append");
556        let got = bus.read("ws", "ch", ev.id - 1, 10);
557        assert!(got.iter().any(|e| e.id == ev.id));
558    }
559
560    #[test]
561    fn multi_client_concurrent_appends_have_deterministic_ordering() {
562        let bus = Arc::new(ContextBus::new());
563        let n_clients = 5;
564        let n_events_per_client = 20;
565        let ws = format!("ws-concurrent-{}", std::process::id());
566        let ch = format!("ch-concurrent-{}", std::process::id());
567
568        let mut handles = vec![];
569        for client_idx in 0..n_clients {
570            let bus = Arc::clone(&bus);
571            let ws = ws.clone();
572            let ch = ch.clone();
573            handles.push(std::thread::spawn(move || {
574                let agent = format!("agent-{client_idx}");
575                for event_idx in 0..n_events_per_client {
576                    bus.append(
577                        &ws,
578                        &ch,
579                        &ContextEventKindV1::ToolCallRecorded,
580                        Some(&agent),
581                        serde_json::json!({"client": client_idx, "seq": event_idx}),
582                    );
583                }
584            }));
585        }
586
587        for h in handles {
588            h.join().unwrap();
589        }
590
591        let all = bus.read(&ws, &ch, 0, 1000);
592        assert_eq!(
593            all.len(),
594            n_clients * n_events_per_client,
595            "all events should be persisted"
596        );
597
598        let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
599        let mut sorted = ids.clone();
600        sorted.sort_unstable();
601        assert_eq!(ids, sorted, "events must be in strictly ascending ID order");
602
603        for win in ids.windows(2) {
604            assert!(
605                win[1] > win[0],
606                "IDs must be strictly monotonic (no gaps from concurrent access)"
607            );
608        }
609    }
610
611    #[test]
612    fn workspace_channel_isolation() {
613        let bus = ContextBus::new();
614        let pid = std::process::id();
615        let ws_a = format!("ws-iso-a-{pid}");
616        let ws_b = format!("ws-iso-b-{pid}");
617        let ws_c = format!("ws-iso-c-{pid}");
618        let ch1 = format!("ch-iso-1-{pid}");
619        let ch2 = format!("ch-iso-2-{pid}");
620
621        bus.append(
622            &ws_a,
623            &ch1,
624            &ContextEventKindV1::SessionMutated,
625            Some("agent-a"),
626            serde_json::json!({"ws":"a","ch":"1"}),
627        );
628        bus.append(
629            &ws_a,
630            &ch2,
631            &ContextEventKindV1::KnowledgeRemembered,
632            Some("agent-a"),
633            serde_json::json!({"ws":"a","ch":"2"}),
634        );
635        bus.append(
636            &ws_b,
637            &ch1,
638            &ContextEventKindV1::ArtifactStored,
639            Some("agent-b"),
640            serde_json::json!({"ws":"b","ch":"1"}),
641        );
642
643        let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
644        assert_eq!(ws_a_ch_1.len(), 1);
645        assert_eq!(ws_a_ch_1[0].kind, "session_mutated");
646
647        let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
648        assert_eq!(ws_a_ch_2.len(), 1);
649        assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");
650
651        let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
652        assert_eq!(ws_b_ch_1.len(), 1);
653        assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");
654
655        let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
656        assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
657    }
658
659    #[test]
660    fn replay_from_cursor_returns_only_newer_events() {
661        let bus = ContextBus::new();
662        let pid = std::process::id();
663        let ws = &format!("ws-replay-{pid}");
664        let ch = &format!("ch-replay-{pid}");
665
666        let ev1 = bus
667            .append(
668                ws,
669                ch,
670                &ContextEventKindV1::ToolCallRecorded,
671                None,
672                serde_json::json!({"seq":1}),
673            )
674            .unwrap();
675        let ev2 = bus
676            .append(
677                ws,
678                ch,
679                &ContextEventKindV1::SessionMutated,
680                None,
681                serde_json::json!({"seq":2}),
682            )
683            .unwrap();
684        let _ev3 = bus
685            .append(
686                ws,
687                ch,
688                &ContextEventKindV1::GraphBuilt,
689                None,
690                serde_json::json!({"seq":3}),
691            )
692            .unwrap();
693
694        let from_cursor = bus.read(ws, ch, ev2.id, 100);
695        assert_eq!(from_cursor.len(), 1, "only events after cursor");
696        assert_eq!(from_cursor[0].kind, "graph_built");
697
698        let from_first = bus.read(ws, ch, ev1.id, 100);
699        assert_eq!(from_first.len(), 2, "events after first");
700
701        let from_zero = bus.read(ws, ch, 0, 100);
702        assert_eq!(from_zero.len(), 3, "all events from zero");
703    }
704
705    #[test]
706    fn broadcast_subscriber_receives_events() {
707        let bus = ContextBus::new();
708        let mut rx = bus.subscribe("ws", "ch");
709
710        let ev = bus
711            .append(
712                "ws",
713                "ch",
714                &ContextEventKindV1::ProofAdded,
715                Some("verifier"),
716                serde_json::json!({"proof":"hash"}),
717            )
718            .unwrap();
719
720        let received = rx.try_recv().expect("subscriber should receive event");
721        assert_eq!(received.id, ev.id);
722        assert_eq!(received.kind, "proof_added");
723        assert_eq!(received.actor.as_deref(), Some("verifier"));
724    }
725}