1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::broadcast;
10
11const MAX_READ_CONNS: usize = 4;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "snake_case")]
15pub enum ContextEventKindV1 {
16 ToolCallRecorded,
17 SessionMutated,
18 KnowledgeRemembered,
19 ArtifactStored,
20 GraphBuilt,
21 ProofAdded,
22}
23
24impl ContextEventKindV1 {
25 pub fn as_str(&self) -> &'static str {
26 match self {
27 Self::ToolCallRecorded => "tool_call_recorded",
28 Self::SessionMutated => "session_mutated",
29 Self::KnowledgeRemembered => "knowledge_remembered",
30 Self::ArtifactStored => "artifact_stored",
31 Self::GraphBuilt => "graph_built",
32 Self::ProofAdded => "proof_added",
33 }
34 }
35
36 pub fn parse(s: &str) -> Self {
37 match s.trim().to_lowercase().as_str() {
38 "tool_call_recorded" => Self::ToolCallRecorded,
39 "session_mutated" => Self::SessionMutated,
40 "knowledge_remembered" => Self::KnowledgeRemembered,
41 "artifact_stored" => Self::ArtifactStored,
42 "graph_built" => Self::GraphBuilt,
43 "proof_added" => Self::ProofAdded,
44 other => {
45 tracing::warn!(
46 "unknown ContextEventKind '{other}', defaulting to ToolCallRecorded"
47 );
48 Self::ToolCallRecorded
49 }
50 }
51 }
52
53 pub fn consistency_level(&self) -> ConsistencyLevel {
59 match self {
60 Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
61 Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
62 Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
63 }
64 }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub enum ConsistencyLevel {
72 Local = 0,
74 Eventual = 1,
76 Strong = 2,
78}
79
80impl ConsistencyLevel {
81 pub fn as_str(&self) -> &'static str {
82 match self {
83 Self::Local => "local",
84 Self::Eventual => "eventual",
85 Self::Strong => "strong",
86 }
87 }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "camelCase")]
92pub struct ContextEventV1 {
93 pub id: i64,
94 pub workspace_id: String,
95 pub channel_id: String,
96 pub kind: String,
97 pub actor: Option<String>,
98 pub timestamp: DateTime<Utc>,
99 pub version: i64,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub parent_id: Option<i64>,
102 pub consistency_level: String,
103 pub payload: Value,
104 #[serde(skip_serializing_if = "Option::is_none", default)]
105 pub target_agents: Option<Vec<String>>,
106}
107
108impl ContextEventV1 {
109 pub fn consistency(&self) -> ConsistencyLevel {
110 ContextEventKindV1::parse(&self.kind).consistency_level()
111 }
112
113 pub fn is_visible_to_agent(&self, agent_id: &str) -> bool {
114 match &self.target_agents {
115 None => true,
116 Some(targets) => targets.iter().any(|t| t == agent_id),
117 }
118 }
119}
120
121#[derive(Debug, Clone, Default)]
124pub struct TopicFilter {
125 pub kinds: Option<Vec<ContextEventKindV1>>,
126 pub actors: Option<Vec<String>>,
127 pub min_consistency: Option<ConsistencyLevel>,
128 pub agent_id: Option<String>,
129}
130
131impl TopicFilter {
132 pub fn kinds(kind_strs: &[&str]) -> Self {
134 Self {
135 kinds: Some(
136 kind_strs
137 .iter()
138 .map(|s| ContextEventKindV1::parse(s))
139 .collect(),
140 ),
141 ..Self::default()
142 }
143 }
144
145 pub fn matches(&self, event: &ContextEventV1) -> bool {
146 if let Some(ref kinds) = self.kinds {
147 let parsed = ContextEventKindV1::parse(&event.kind);
148 if !kinds.contains(&parsed) {
149 return false;
150 }
151 }
152 if let Some(ref actors) = self.actors {
153 match &event.actor {
154 Some(actor) if actors.iter().any(|a| a == actor) => {}
155 Some(_) | None => return false,
156 }
157 }
158 if let Some(min) = self.min_consistency {
159 if event.consistency() < min {
160 return false;
161 }
162 }
163 if let Some(ref aid) = self.agent_id {
164 if !event.is_visible_to_agent(aid) {
165 return false;
166 }
167 }
168 true
169 }
170}
171
172fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
173 let ts_str: String = row.get(5)?;
174 let ts = DateTime::parse_from_rfc3339(&ts_str)
175 .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
176 let payload_str: String = row.get(6)?;
177 let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
178 let kind_str: String = row.get(3)?;
179 let cl = ContextEventKindV1::parse(&kind_str)
180 .consistency_level()
181 .as_str()
182 .to_string();
183 Ok(ContextEventV1 {
184 id: row.get(0)?,
185 workspace_id: row.get(1)?,
186 channel_id: row.get(2)?,
187 kind: kind_str,
188 actor: row.get::<_, Option<String>>(4)?,
189 timestamp: ts,
190 version: row.get::<_, i64>(7).unwrap_or(0),
191 parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
192 consistency_level: cl,
193 payload,
194 target_agents: None,
195 })
196}
197
198#[derive(Clone)]
199pub struct ContextBus {
200 inner: Arc<Inner>,
201}
202
203const STREAM_CHANNEL_SIZE: usize = 256;
204
205struct Inner {
206 write_conn: Mutex<Connection>,
207 read_pool: Mutex<Vec<Connection>>,
208 streams: Mutex<HashMap<String, broadcast::Sender<ContextEventV1>>>,
209 version_cache: Mutex<HashMap<String, i64>>,
210 db_path: PathBuf,
211}
212
213impl Inner {
214 fn open_read_conn(path: &PathBuf) -> Connection {
215 let conn = Connection::open(path).expect("open read context-os db");
216 let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA query_only=ON;");
217 conn
218 }
219
220 fn take_read_conn(&self) -> Connection {
221 self.read_pool
222 .lock()
223 .unwrap_or_else(std::sync::PoisonError::into_inner)
224 .pop()
225 .unwrap_or_else(|| Self::open_read_conn(&self.db_path))
226 }
227
228 fn return_read_conn(&self, conn: Connection) {
229 let mut pool = self
230 .read_pool
231 .lock()
232 .unwrap_or_else(std::sync::PoisonError::into_inner);
233 if pool.len() < MAX_READ_CONNS {
234 pool.push(conn);
235 }
236 }
237
238 fn stream_key(workspace_id: &str, channel_id: &str) -> String {
239 format!("{workspace_id}\0{channel_id}")
240 }
241
242 fn next_version(&self, workspace_id: &str, channel_id: &str) -> i64 {
243 let key = Self::stream_key(workspace_id, channel_id);
244
245 {
246 let mut cache = self
247 .version_cache
248 .lock()
249 .unwrap_or_else(std::sync::PoisonError::into_inner);
250 if let Some(v) = cache.get_mut(&key) {
251 *v += 1;
252 return *v;
253 }
254 }
255
256 let conn = self.take_read_conn();
257 let v: i64 = conn
258 .query_row(
259 "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
260 params![workspace_id, channel_id],
261 |row| row.get(0),
262 )
263 .unwrap_or(0);
264 self.return_read_conn(conn);
265
266 let mut cache = self
267 .version_cache
268 .lock()
269 .unwrap_or_else(std::sync::PoisonError::into_inner);
270 let entry = cache.entry(key).or_insert(v);
271 *entry = (*entry).max(v) + 1;
272 *entry
273 }
274}
275
276impl Default for ContextBus {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
282impl ContextBus {
283 pub fn new() -> Self {
284 let path = default_db_path();
285 if let Some(parent) = path.parent() {
286 let _ = std::fs::create_dir_all(parent);
287 }
288 let conn = Connection::open(&path).expect("open context-os db");
289 conn.execute_batch(
290 "PRAGMA journal_mode=WAL;
291 CREATE TABLE IF NOT EXISTS context_events (
292 id INTEGER PRIMARY KEY AUTOINCREMENT,
293 workspace_id TEXT NOT NULL,
294 channel_id TEXT NOT NULL,
295 kind TEXT NOT NULL,
296 actor TEXT,
297 timestamp TEXT NOT NULL,
298 payload_json TEXT NOT NULL,
299 version INTEGER NOT NULL DEFAULT 0,
300 parent_id INTEGER
301 );
302 CREATE INDEX IF NOT EXISTS idx_context_events_stream
303 ON context_events(workspace_id, channel_id, id);",
304 )
305 .expect("init context-os db");
306
307 let _ = conn.execute_batch(
308 "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
309 );
310 let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
311
312 let _ = conn.execute_batch(
313 "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
314 payload_text,
315 content=context_events,
316 content_rowid=id
317 );",
318 );
319
320 let mut read_conns = Vec::with_capacity(MAX_READ_CONNS);
321 for _ in 0..MAX_READ_CONNS {
322 read_conns.push(Inner::open_read_conn(&path));
323 }
324
325 Self {
326 inner: Arc::new(Inner {
327 write_conn: Mutex::new(conn),
328 read_pool: Mutex::new(read_conns),
329 streams: Mutex::new(HashMap::new()),
330 version_cache: Mutex::new(HashMap::new()),
331 db_path: path,
332 }),
333 }
334 }
335
336 pub fn subscribe(
337 &self,
338 workspace_id: &str,
339 channel_id: &str,
340 ) -> broadcast::Receiver<ContextEventV1> {
341 let key = Inner::stream_key(workspace_id, channel_id);
342 let mut streams = self
343 .inner
344 .streams
345 .lock()
346 .unwrap_or_else(std::sync::PoisonError::into_inner);
347 let tx = streams
348 .entry(key)
349 .or_insert_with(|| broadcast::channel(STREAM_CHANNEL_SIZE).0);
350 tx.subscribe()
351 }
352
353 pub fn subscribe_filtered(
356 &self,
357 workspace_id: &str,
358 channel_id: &str,
359 filter: TopicFilter,
360 ) -> FilteredSubscription {
361 let rx = self.subscribe(workspace_id, channel_id);
362 FilteredSubscription { rx, filter }
363 }
364
365 pub fn append(
366 &self,
367 workspace_id: &str,
368 channel_id: &str,
369 kind: &ContextEventKindV1,
370 actor: Option<&str>,
371 payload: Value,
372 ) -> Option<ContextEventV1> {
373 self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
374 }
375
376 pub fn append_with_parent(
377 &self,
378 workspace_id: &str,
379 channel_id: &str,
380 kind: &ContextEventKindV1,
381 actor: Option<&str>,
382 payload: Value,
383 parent_id: Option<i64>,
384 ) -> Option<ContextEventV1> {
385 let ev = self.insert_event(
386 workspace_id,
387 channel_id,
388 kind,
389 actor,
390 payload,
391 parent_id,
392 None,
393 )?;
394 self.broadcast_event(&ev);
395 Some(ev)
396 }
397
398 pub fn append_directed(
401 &self,
402 workspace_id: &str,
403 channel_id: &str,
404 kind: &ContextEventKindV1,
405 actor: Option<&str>,
406 payload: Value,
407 target_agents: Vec<String>,
408 ) -> Option<ContextEventV1> {
409 let ev = self.insert_event(
410 workspace_id,
411 channel_id,
412 kind,
413 actor,
414 payload,
415 None,
416 Some(target_agents),
417 )?;
418 self.broadcast_event(&ev);
419 Some(ev)
420 }
421
422 fn insert_event(
423 &self,
424 workspace_id: &str,
425 channel_id: &str,
426 kind: &ContextEventKindV1,
427 actor: Option<&str>,
428 payload: Value,
429 parent_id: Option<i64>,
430 target_agents: Option<Vec<String>>,
431 ) -> Option<ContextEventV1> {
432 let ts = Utc::now();
433 let payload_json = payload.to_string();
434
435 let (id, version) = {
436 let Ok(conn) = self.inner.write_conn.lock() else {
437 return None;
438 };
439 let version = self.inner.next_version(workspace_id, channel_id);
440
441 let result: Result<(i64, i64), rusqlite::Error> = conn
442 .execute_batch("BEGIN IMMEDIATE")
443 .and_then(|()| {
444 conn.execute(
445 "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
446 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
447 params![
448 workspace_id,
449 channel_id,
450 kind.as_str(),
451 actor.map(str::to_string),
452 ts.to_rfc3339(),
453 payload_json,
454 version,
455 parent_id,
456 ],
457 )?;
458 let rowid = conn.last_insert_rowid();
459 if let Err(e) = conn.execute(
460 "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
461 params![rowid, payload_json],
462 ) {
463 tracing::warn!("FTS insert failed for event {rowid}: {e}");
464 }
465 conn.execute_batch("COMMIT")?;
466 Ok((rowid, version))
467 });
468
469 match result {
470 Ok(pair) => pair,
471 Err(e) => {
472 tracing::warn!("context bus append failed: {e}");
473 let _ = conn.execute_batch("ROLLBACK");
474 return None;
475 }
476 }
477 };
478
479 Some(ContextEventV1 {
480 id,
481 workspace_id: workspace_id.to_string(),
482 channel_id: channel_id.to_string(),
483 consistency_level: kind.consistency_level().as_str().to_string(),
484 kind: kind.as_str().to_string(),
485 actor: actor.map(str::to_string),
486 timestamp: ts,
487 version,
488 parent_id,
489 payload,
490 target_agents,
491 })
492 }
493
494 fn broadcast_event(&self, ev: &ContextEventV1) {
495 let key = Inner::stream_key(&ev.workspace_id, &ev.channel_id);
496 let tx = self
497 .inner
498 .streams
499 .lock()
500 .unwrap_or_else(std::sync::PoisonError::into_inner)
501 .get(&key)
502 .cloned();
503 if let Some(tx) = tx {
504 let _ = tx.send(ev.clone());
505 }
506 }
507
508 pub fn read(
509 &self,
510 workspace_id: &str,
511 channel_id: &str,
512 since: i64,
513 limit: usize,
514 ) -> Vec<ContextEventV1> {
515 let limit = limit.clamp(1, 1000) as i64;
516 let conn = self.inner.take_read_conn();
517 let result = (|| {
518 let mut stmt = conn.prepare(
519 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
520 FROM context_events
521 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
522 ORDER BY id ASC
523 LIMIT ?4",
524 ).ok()?;
525 let rows = stmt
526 .query_map(
527 params![workspace_id, channel_id, since, limit],
528 event_from_row,
529 )
530 .ok()?;
531 Some(rows.flatten().collect::<Vec<_>>())
532 })();
533 self.inner.return_read_conn(conn);
534 result.unwrap_or_default()
535 }
536
537 pub fn recent_by_kind(
539 &self,
540 workspace_id: &str,
541 channel_id: &str,
542 kind: &str,
543 limit: usize,
544 ) -> Vec<ContextEventV1> {
545 let limit = limit.clamp(1, 100) as i64;
546 let conn = self.inner.take_read_conn();
547 let result = (|| {
548 let mut stmt = conn.prepare(
549 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
550 FROM context_events
551 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
552 ORDER BY id DESC
553 LIMIT ?4",
554 ).ok()?;
555 let rows = stmt
556 .query_map(
557 params![workspace_id, channel_id, kind, limit],
558 event_from_row,
559 )
560 .ok()?;
561 Some(rows.flatten().collect::<Vec<_>>())
562 })();
563 self.inner.return_read_conn(conn);
564 result.unwrap_or_default()
565 }
566
567 pub fn search(
569 &self,
570 workspace_id: &str,
571 channel_id: Option<&str>,
572 query: &str,
573 limit: usize,
574 ) -> Vec<ContextEventV1> {
575 let limit = limit.clamp(1, 100) as i64;
576 let conn = self.inner.take_read_conn();
577 let result =
578 if let Some(ch) = channel_id {
579 (|| {
580 let mut stmt = conn.prepare(
581 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
582 e.payload_json, e.version, e.parent_id
583 FROM context_events e
584 JOIN context_events_fts f ON e.id = f.rowid
585 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2 AND e.channel_id = ?3
586 ORDER BY f.rank
587 LIMIT ?4",
588 ).ok()?;
589 let rows = stmt
590 .query_map(params![query, workspace_id, ch, limit], event_from_row)
591 .ok()?;
592 Some(rows.flatten().collect::<Vec<_>>())
593 })()
594 } else {
595 (|| {
596 let mut stmt = conn.prepare(
597 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
598 e.payload_json, e.version, e.parent_id
599 FROM context_events e
600 JOIN context_events_fts f ON e.id = f.rowid
601 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
602 ORDER BY f.rank
603 LIMIT ?3",
604 ).ok()?;
605 let rows = stmt
606 .query_map(params![query, workspace_id, limit], event_from_row)
607 .ok()?;
608 Some(rows.flatten().collect::<Vec<_>>())
609 })()
610 };
611 self.inner.return_read_conn(conn);
612 result.unwrap_or_default()
613 }
614
615 pub fn lineage(
618 &self,
619 event_id: i64,
620 workspace_id: &str,
621 max_depth: usize,
622 ) -> Vec<ContextEventV1> {
623 let max_depth = max_depth.clamp(1, 50);
624 let conn = self.inner.take_read_conn();
625 let mut chain = Vec::new();
626 let mut current_id = Some(event_id);
627
628 for _ in 0..max_depth {
629 let Some(id) = current_id else {
630 break;
631 };
632 let ev = conn.query_row(
633 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
634 FROM context_events WHERE id = ?1 AND workspace_id = ?2",
635 params![id, workspace_id],
636 event_from_row,
637 );
638 match ev {
639 Ok(ev) => {
640 current_id = ev.parent_id;
641 chain.push(ev);
642 }
643 Err(_) => break,
644 }
645 }
646 self.inner.return_read_conn(conn);
647 chain
648 }
649
650 pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
652 let conn = self.inner.take_read_conn();
653 let result = conn
654 .query_row(
655 "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
656 params![workspace_id, channel_id],
657 |row| row.get(0),
658 )
659 .unwrap_or(0);
660 self.inner.return_read_conn(conn);
661 result
662 }
663}
664
665pub struct FilteredSubscription {
667 pub rx: broadcast::Receiver<ContextEventV1>,
668 pub filter: TopicFilter,
669}
670
671impl FilteredSubscription {
672 pub async fn recv_filtered(&mut self) -> Result<ContextEventV1, broadcast::error::RecvError> {
675 loop {
676 let ev = self.rx.recv().await?;
677 if self.filter.matches(&ev) {
678 return Ok(ev);
679 }
680 }
681 }
682}
683
684fn default_db_path() -> PathBuf {
685 let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
686 data.join("context-os").join("context-os.db")
687}
688
689#[cfg(test)]
690mod tests {
691 use super::*;
692
693 #[test]
694 fn append_and_read_roundtrip() {
695 let bus = ContextBus::new();
696 let ev = bus
697 .append(
698 "ws",
699 "ch",
700 &ContextEventKindV1::ToolCallRecorded,
701 Some("agent"),
702 serde_json::json!({"tool":"ctx_read"}),
703 )
704 .expect("append");
705 let got = bus.read("ws", "ch", ev.id - 1, 10);
706 assert!(got.iter().any(|e| e.id == ev.id));
707 }
708
709 #[test]
710 fn multi_client_concurrent_appends_have_deterministic_ordering() {
711 let bus = Arc::new(ContextBus::new());
712 let n_clients = 5;
713 let n_events_per_client = 20;
714 let ws = format!("ws-concurrent-{}", std::process::id());
715 let ch = format!("ch-concurrent-{}", std::process::id());
716
717 let mut handles = vec![];
718 for client_idx in 0..n_clients {
719 let bus = Arc::clone(&bus);
720 let ws = ws.clone();
721 let ch = ch.clone();
722 handles.push(std::thread::spawn(move || {
723 let agent = format!("agent-{client_idx}");
724 for event_idx in 0..n_events_per_client {
725 bus.append(
726 &ws,
727 &ch,
728 &ContextEventKindV1::ToolCallRecorded,
729 Some(&agent),
730 serde_json::json!({"client": client_idx, "seq": event_idx}),
731 );
732 }
733 }));
734 }
735
736 for h in handles {
737 h.join().unwrap();
738 }
739
740 let all = bus.read(&ws, &ch, 0, 1000);
741 assert_eq!(
742 all.len(),
743 n_clients * n_events_per_client,
744 "all events should be persisted"
745 );
746
747 let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
748 let mut sorted = ids.clone();
749 sorted.sort_unstable();
750 assert_eq!(ids, sorted, "events must be in strictly ascending ID order");
751
752 for win in ids.windows(2) {
753 assert!(
754 win[1] > win[0],
755 "IDs must be strictly monotonic (no gaps from concurrent access)"
756 );
757 }
758 }
759
760 #[test]
761 fn workspace_channel_isolation() {
762 let bus = ContextBus::new();
763 let pid = std::process::id();
764 let ws_a = format!("ws-iso-a-{pid}");
765 let ws_b = format!("ws-iso-b-{pid}");
766 let ws_c = format!("ws-iso-c-{pid}");
767 let ch1 = format!("ch-iso-1-{pid}");
768 let ch2 = format!("ch-iso-2-{pid}");
769
770 bus.append(
771 &ws_a,
772 &ch1,
773 &ContextEventKindV1::SessionMutated,
774 Some("agent-a"),
775 serde_json::json!({"ws":"a","ch":"1"}),
776 );
777 bus.append(
778 &ws_a,
779 &ch2,
780 &ContextEventKindV1::KnowledgeRemembered,
781 Some("agent-a"),
782 serde_json::json!({"ws":"a","ch":"2"}),
783 );
784 bus.append(
785 &ws_b,
786 &ch1,
787 &ContextEventKindV1::ArtifactStored,
788 Some("agent-b"),
789 serde_json::json!({"ws":"b","ch":"1"}),
790 );
791
792 let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
793 assert_eq!(ws_a_ch_1.len(), 1);
794 assert_eq!(ws_a_ch_1[0].kind, "session_mutated");
795
796 let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
797 assert_eq!(ws_a_ch_2.len(), 1);
798 assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");
799
800 let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
801 assert_eq!(ws_b_ch_1.len(), 1);
802 assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");
803
804 let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
805 assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
806 }
807
808 #[test]
809 fn replay_from_cursor_returns_only_newer_events() {
810 let bus = ContextBus::new();
811 let pid = std::process::id();
812 let ws = &format!("ws-replay-{pid}");
813 let ch = &format!("ch-replay-{pid}");
814
815 let ev1 = bus
816 .append(
817 ws,
818 ch,
819 &ContextEventKindV1::ToolCallRecorded,
820 None,
821 serde_json::json!({"seq":1}),
822 )
823 .unwrap();
824 let ev2 = bus
825 .append(
826 ws,
827 ch,
828 &ContextEventKindV1::SessionMutated,
829 None,
830 serde_json::json!({"seq":2}),
831 )
832 .unwrap();
833 let _ev3 = bus
834 .append(
835 ws,
836 ch,
837 &ContextEventKindV1::GraphBuilt,
838 None,
839 serde_json::json!({"seq":3}),
840 )
841 .unwrap();
842
843 let from_cursor = bus.read(ws, ch, ev2.id, 100);
844 assert_eq!(from_cursor.len(), 1, "only events after cursor");
845 assert_eq!(from_cursor[0].kind, "graph_built");
846
847 let from_first = bus.read(ws, ch, ev1.id, 100);
848 assert_eq!(from_first.len(), 2, "events after first");
849
850 let from_zero = bus.read(ws, ch, 0, 100);
851 assert_eq!(from_zero.len(), 3, "all events from zero");
852 }
853
854 #[test]
855 fn broadcast_subscriber_receives_events() {
856 let bus = ContextBus::new();
857 let mut rx = bus.subscribe("ws", "ch");
858
859 let ev = bus
860 .append(
861 "ws",
862 "ch",
863 &ContextEventKindV1::ProofAdded,
864 Some("verifier"),
865 serde_json::json!({"proof":"hash"}),
866 )
867 .unwrap();
868
869 let received = rx.try_recv().expect("subscriber should receive event");
870 assert_eq!(received.id, ev.id);
871 assert_eq!(received.kind, "proof_added");
872 assert_eq!(received.actor.as_deref(), Some("verifier"));
873 }
874}