1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::broadcast;
10
/// Number of read-only connections opened up front, and the most idle ones
/// the read pool will hold on to when connections are handed back.
const MAX_READ_CONNS: usize = 4;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "snake_case")]
15pub enum ContextEventKindV1 {
16 ToolCallRecorded,
17 SessionMutated,
18 KnowledgeRemembered,
19 ArtifactStored,
20 GraphBuilt,
21 ProofAdded,
22}
23
24impl ContextEventKindV1 {
25 pub fn as_str(&self) -> &'static str {
26 match self {
27 Self::ToolCallRecorded => "tool_call_recorded",
28 Self::SessionMutated => "session_mutated",
29 Self::KnowledgeRemembered => "knowledge_remembered",
30 Self::ArtifactStored => "artifact_stored",
31 Self::GraphBuilt => "graph_built",
32 Self::ProofAdded => "proof_added",
33 }
34 }
35
36 pub fn parse(s: &str) -> Self {
37 match s.trim().to_lowercase().as_str() {
38 "tool_call_recorded" => Self::ToolCallRecorded,
39 "session_mutated" => Self::SessionMutated,
40 "knowledge_remembered" => Self::KnowledgeRemembered,
41 "artifact_stored" => Self::ArtifactStored,
42 "graph_built" => Self::GraphBuilt,
43 "proof_added" => Self::ProofAdded,
44 other => {
45 tracing::warn!(
46 "unknown ContextEventKind '{other}', defaulting to ToolCallRecorded"
47 );
48 Self::ToolCallRecorded
49 }
50 }
51 }
52
53 pub fn consistency_level(&self) -> ConsistencyLevel {
59 match self {
60 Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
61 Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
62 Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
63 }
64 }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub enum ConsistencyLevel {
72 Local = 0,
74 Eventual = 1,
76 Strong = 2,
78}
79
80impl ConsistencyLevel {
81 pub fn as_str(&self) -> &'static str {
82 match self {
83 Self::Local => "local",
84 Self::Eventual => "eventual",
85 Self::Strong => "strong",
86 }
87 }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "camelCase")]
92pub struct ContextEventV1 {
93 pub id: i64,
94 pub workspace_id: String,
95 pub channel_id: String,
96 pub kind: String,
97 pub actor: Option<String>,
98 pub timestamp: DateTime<Utc>,
99 pub version: i64,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub parent_id: Option<i64>,
102 pub consistency_level: String,
103 pub payload: Value,
104 #[serde(skip_serializing_if = "Option::is_none", default)]
105 pub target_agents: Option<Vec<String>>,
106}
107
108impl ContextEventV1 {
109 pub fn consistency(&self) -> ConsistencyLevel {
110 ContextEventKindV1::parse(&self.kind).consistency_level()
111 }
112
113 pub fn is_visible_to_agent(&self, agent_id: &str) -> bool {
114 match &self.target_agents {
115 None => true,
116 Some(targets) => targets.iter().any(|t| t == agent_id),
117 }
118 }
119}
120
121#[derive(Debug, Clone, Default)]
124pub struct TopicFilter {
125 pub kinds: Option<Vec<ContextEventKindV1>>,
126 pub actors: Option<Vec<String>>,
127 pub min_consistency: Option<ConsistencyLevel>,
128 pub agent_id: Option<String>,
129}
130
131impl TopicFilter {
132 pub fn kinds(kind_strs: &[&str]) -> Self {
134 Self {
135 kinds: Some(
136 kind_strs
137 .iter()
138 .map(|s| ContextEventKindV1::parse(s))
139 .collect(),
140 ),
141 ..Self::default()
142 }
143 }
144
145 pub fn matches(&self, event: &ContextEventV1) -> bool {
146 if let Some(ref kinds) = self.kinds {
147 let parsed = ContextEventKindV1::parse(&event.kind);
148 if !kinds.contains(&parsed) {
149 return false;
150 }
151 }
152 if let Some(ref actors) = self.actors {
153 match &event.actor {
154 Some(actor) if actors.iter().any(|a| a == actor) => {}
155 Some(_) | None => return false,
156 }
157 }
158 if let Some(min) = self.min_consistency {
159 if event.consistency() < min {
160 return false;
161 }
162 }
163 if let Some(ref aid) = self.agent_id {
164 if !event.is_visible_to_agent(aid) {
165 return false;
166 }
167 }
168 true
169 }
170}
171
172fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
173 let ts_str: String = row.get(5)?;
174 let ts = DateTime::parse_from_rfc3339(&ts_str)
175 .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
176 let payload_str: String = row.get(6)?;
177 let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
178 let kind_str: String = row.get(3)?;
179 let cl = ContextEventKindV1::parse(&kind_str)
180 .consistency_level()
181 .as_str()
182 .to_string();
183 Ok(ContextEventV1 {
184 id: row.get(0)?,
185 workspace_id: row.get(1)?,
186 channel_id: row.get(2)?,
187 kind: kind_str,
188 actor: row.get::<_, Option<String>>(4)?,
189 timestamp: ts,
190 version: row.get::<_, i64>(7).unwrap_or(0),
191 parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
192 consistency_level: cl,
193 payload,
194 target_agents: None,
195 })
196}
197
198#[derive(Clone)]
199pub struct ContextBus {
200 inner: Arc<Inner>,
201}
202
/// Capacity given to each per-stream broadcast channel when it is created.
const STREAM_CHANNEL_SIZE: usize = 256;
204
205struct Inner {
206 write_conn: Mutex<Connection>,
207 read_pool: Mutex<Vec<Connection>>,
208 streams: Mutex<HashMap<String, broadcast::Sender<ContextEventV1>>>,
209 version_cache: Mutex<HashMap<String, i64>>,
210 db_path: PathBuf,
211}
212
213impl Inner {
214 fn open_read_conn(path: &PathBuf) -> Connection {
215 let conn = Connection::open(path).expect("open read context-os db");
216 let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA query_only=ON;");
217 conn
218 }
219
220 fn take_read_conn(&self) -> Connection {
221 self.read_pool
222 .lock()
223 .unwrap_or_else(std::sync::PoisonError::into_inner)
224 .pop()
225 .unwrap_or_else(|| Self::open_read_conn(&self.db_path))
226 }
227
228 fn return_read_conn(&self, conn: Connection) {
229 let mut pool = self
230 .read_pool
231 .lock()
232 .unwrap_or_else(std::sync::PoisonError::into_inner);
233 if pool.len() < MAX_READ_CONNS {
234 pool.push(conn);
235 }
236 }
237
238 fn stream_key(workspace_id: &str, channel_id: &str) -> String {
239 format!("{workspace_id}\0{channel_id}")
240 }
241
242 fn next_version(&self, workspace_id: &str, channel_id: &str) -> i64 {
243 let key = Self::stream_key(workspace_id, channel_id);
244
245 {
246 let mut cache = self
247 .version_cache
248 .lock()
249 .unwrap_or_else(std::sync::PoisonError::into_inner);
250 if let Some(v) = cache.get_mut(&key) {
251 *v += 1;
252 return *v;
253 }
254 }
255
256 let conn = self.take_read_conn();
257 let v: i64 = conn
258 .query_row(
259 "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
260 params![workspace_id, channel_id],
261 |row| row.get(0),
262 )
263 .unwrap_or(0);
264 self.return_read_conn(conn);
265
266 let mut cache = self
267 .version_cache
268 .lock()
269 .unwrap_or_else(std::sync::PoisonError::into_inner);
270 let entry = cache.entry(key).or_insert(v);
271 *entry = (*entry).max(v) + 1;
272 *entry
273 }
274}
275
276impl Default for ContextBus {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
282impl ContextBus {
283 pub fn new() -> Self {
284 let path = default_db_path();
285 Self::open_at(path)
286 }
287
288 fn open_at(path: PathBuf) -> Self {
289 if let Some(parent) = path.parent() {
290 let _ = std::fs::create_dir_all(parent);
291 }
292 let conn = Connection::open(&path).expect("open context-os db");
293 conn.execute_batch(
294 "PRAGMA journal_mode=WAL;
295 CREATE TABLE IF NOT EXISTS context_events (
296 id INTEGER PRIMARY KEY AUTOINCREMENT,
297 workspace_id TEXT NOT NULL,
298 channel_id TEXT NOT NULL,
299 kind TEXT NOT NULL,
300 actor TEXT,
301 timestamp TEXT NOT NULL,
302 payload_json TEXT NOT NULL,
303 version INTEGER NOT NULL DEFAULT 0,
304 parent_id INTEGER
305 );
306 CREATE INDEX IF NOT EXISTS idx_context_events_stream
307 ON context_events(workspace_id, channel_id, id);",
308 )
309 .expect("init context-os db");
310
311 let _ = conn.execute_batch(
312 "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
313 );
314 let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
315
316 let _ = conn.execute_batch(
317 "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
318 payload_text,
319 content=context_events,
320 content_rowid=id
321 );",
322 );
323
324 let mut read_conns = Vec::with_capacity(MAX_READ_CONNS);
325 for _ in 0..MAX_READ_CONNS {
326 read_conns.push(Inner::open_read_conn(&path));
327 }
328
329 Self {
330 inner: Arc::new(Inner {
331 write_conn: Mutex::new(conn),
332 read_pool: Mutex::new(read_conns),
333 streams: Mutex::new(HashMap::new()),
334 version_cache: Mutex::new(HashMap::new()),
335 db_path: path,
336 }),
337 }
338 }
339
340 pub fn subscribe(
341 &self,
342 workspace_id: &str,
343 channel_id: &str,
344 ) -> broadcast::Receiver<ContextEventV1> {
345 let key = Inner::stream_key(workspace_id, channel_id);
346 let mut streams = self
347 .inner
348 .streams
349 .lock()
350 .unwrap_or_else(std::sync::PoisonError::into_inner);
351 let tx = streams
352 .entry(key)
353 .or_insert_with(|| broadcast::channel(STREAM_CHANNEL_SIZE).0);
354 tx.subscribe()
355 }
356
357 pub fn subscribe_filtered(
360 &self,
361 workspace_id: &str,
362 channel_id: &str,
363 filter: TopicFilter,
364 ) -> FilteredSubscription {
365 let rx = self.subscribe(workspace_id, channel_id);
366 FilteredSubscription { rx, filter }
367 }
368
369 pub fn append(
370 &self,
371 workspace_id: &str,
372 channel_id: &str,
373 kind: &ContextEventKindV1,
374 actor: Option<&str>,
375 payload: Value,
376 ) -> Option<ContextEventV1> {
377 self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
378 }
379
380 pub fn append_with_parent(
381 &self,
382 workspace_id: &str,
383 channel_id: &str,
384 kind: &ContextEventKindV1,
385 actor: Option<&str>,
386 payload: Value,
387 parent_id: Option<i64>,
388 ) -> Option<ContextEventV1> {
389 let ev = self.insert_event(
390 workspace_id,
391 channel_id,
392 kind,
393 actor,
394 payload,
395 parent_id,
396 None,
397 )?;
398 self.broadcast_event(&ev);
399 Some(ev)
400 }
401
402 pub fn append_directed(
405 &self,
406 workspace_id: &str,
407 channel_id: &str,
408 kind: &ContextEventKindV1,
409 actor: Option<&str>,
410 payload: Value,
411 target_agents: Vec<String>,
412 ) -> Option<ContextEventV1> {
413 let ev = self.insert_event(
414 workspace_id,
415 channel_id,
416 kind,
417 actor,
418 payload,
419 None,
420 Some(target_agents),
421 )?;
422 self.broadcast_event(&ev);
423 Some(ev)
424 }
425
426 fn insert_event(
427 &self,
428 workspace_id: &str,
429 channel_id: &str,
430 kind: &ContextEventKindV1,
431 actor: Option<&str>,
432 payload: Value,
433 parent_id: Option<i64>,
434 target_agents: Option<Vec<String>>,
435 ) -> Option<ContextEventV1> {
436 let ts = Utc::now();
437 let payload_json = payload.to_string();
438
439 let (id, version) = {
440 let Ok(conn) = self.inner.write_conn.lock() else {
441 return None;
442 };
443 let version = self.inner.next_version(workspace_id, channel_id);
444
445 let result: Result<(i64, i64), rusqlite::Error> = conn
446 .execute_batch("BEGIN IMMEDIATE")
447 .and_then(|()| {
448 conn.execute(
449 "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
450 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
451 params![
452 workspace_id,
453 channel_id,
454 kind.as_str(),
455 actor.map(str::to_string),
456 ts.to_rfc3339(),
457 payload_json,
458 version,
459 parent_id,
460 ],
461 )?;
462 let rowid = conn.last_insert_rowid();
463 if let Err(e) = conn.execute(
464 "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
465 params![rowid, payload_json],
466 ) {
467 tracing::warn!("FTS insert failed for event {rowid}: {e}");
468 }
469 conn.execute_batch("COMMIT")?;
470 Ok((rowid, version))
471 });
472
473 match result {
474 Ok(pair) => pair,
475 Err(e) => {
476 tracing::warn!("context bus append failed: {e}");
477 let _ = conn.execute_batch("ROLLBACK");
478 return None;
479 }
480 }
481 };
482
483 Some(ContextEventV1 {
484 id,
485 workspace_id: workspace_id.to_string(),
486 channel_id: channel_id.to_string(),
487 consistency_level: kind.consistency_level().as_str().to_string(),
488 kind: kind.as_str().to_string(),
489 actor: actor.map(str::to_string),
490 timestamp: ts,
491 version,
492 parent_id,
493 payload,
494 target_agents,
495 })
496 }
497
498 fn broadcast_event(&self, ev: &ContextEventV1) {
499 let key = Inner::stream_key(&ev.workspace_id, &ev.channel_id);
500 let tx = self
501 .inner
502 .streams
503 .lock()
504 .unwrap_or_else(std::sync::PoisonError::into_inner)
505 .get(&key)
506 .cloned();
507 if let Some(tx) = tx {
508 let _ = tx.send(ev.clone());
509 }
510 }
511
512 pub fn read(
513 &self,
514 workspace_id: &str,
515 channel_id: &str,
516 since: i64,
517 limit: usize,
518 ) -> Vec<ContextEventV1> {
519 let limit = limit.clamp(1, 1000) as i64;
520 let conn = self.inner.take_read_conn();
521 let result = (|| {
522 let mut stmt = conn.prepare(
523 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
524 FROM context_events
525 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
526 ORDER BY id ASC
527 LIMIT ?4",
528 ).ok()?;
529 let rows = stmt
530 .query_map(
531 params![workspace_id, channel_id, since, limit],
532 event_from_row,
533 )
534 .ok()?;
535 Some(rows.flatten().collect::<Vec<_>>())
536 })();
537 self.inner.return_read_conn(conn);
538 result.unwrap_or_default()
539 }
540
541 pub fn recent_by_kind(
543 &self,
544 workspace_id: &str,
545 channel_id: &str,
546 kind: &str,
547 limit: usize,
548 ) -> Vec<ContextEventV1> {
549 let limit = limit.clamp(1, 100) as i64;
550 let conn = self.inner.take_read_conn();
551 let result = (|| {
552 let mut stmt = conn.prepare(
553 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
554 FROM context_events
555 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
556 ORDER BY id DESC
557 LIMIT ?4",
558 ).ok()?;
559 let rows = stmt
560 .query_map(
561 params![workspace_id, channel_id, kind, limit],
562 event_from_row,
563 )
564 .ok()?;
565 Some(rows.flatten().collect::<Vec<_>>())
566 })();
567 self.inner.return_read_conn(conn);
568 result.unwrap_or_default()
569 }
570
571 pub fn search(
573 &self,
574 workspace_id: &str,
575 channel_id: Option<&str>,
576 query: &str,
577 limit: usize,
578 ) -> Vec<ContextEventV1> {
579 let limit = limit.clamp(1, 100) as i64;
580 let conn = self.inner.take_read_conn();
581 let result =
582 if let Some(ch) = channel_id {
583 (|| {
584 let mut stmt = conn.prepare(
585 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
586 e.payload_json, e.version, e.parent_id
587 FROM context_events e
588 JOIN context_events_fts f ON e.id = f.rowid
589 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2 AND e.channel_id = ?3
590 ORDER BY f.rank
591 LIMIT ?4",
592 ).ok()?;
593 let rows = stmt
594 .query_map(params![query, workspace_id, ch, limit], event_from_row)
595 .ok()?;
596 Some(rows.flatten().collect::<Vec<_>>())
597 })()
598 } else {
599 (|| {
600 let mut stmt = conn.prepare(
601 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
602 e.payload_json, e.version, e.parent_id
603 FROM context_events e
604 JOIN context_events_fts f ON e.id = f.rowid
605 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
606 ORDER BY f.rank
607 LIMIT ?3",
608 ).ok()?;
609 let rows = stmt
610 .query_map(params![query, workspace_id, limit], event_from_row)
611 .ok()?;
612 Some(rows.flatten().collect::<Vec<_>>())
613 })()
614 };
615 self.inner.return_read_conn(conn);
616 result.unwrap_or_default()
617 }
618
619 pub fn lineage(
622 &self,
623 event_id: i64,
624 workspace_id: &str,
625 max_depth: usize,
626 ) -> Vec<ContextEventV1> {
627 let max_depth = max_depth.clamp(1, 50);
628 let conn = self.inner.take_read_conn();
629 let mut chain = Vec::new();
630 let mut current_id = Some(event_id);
631
632 for _ in 0..max_depth {
633 let Some(id) = current_id else {
634 break;
635 };
636 let ev = conn.query_row(
637 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
638 FROM context_events WHERE id = ?1 AND workspace_id = ?2",
639 params![id, workspace_id],
640 event_from_row,
641 );
642 match ev {
643 Ok(ev) => {
644 current_id = ev.parent_id;
645 chain.push(ev);
646 }
647 Err(_) => break,
648 }
649 }
650 self.inner.return_read_conn(conn);
651 chain
652 }
653
654 pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
656 let conn = self.inner.take_read_conn();
657 let result = conn
658 .query_row(
659 "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
660 params![workspace_id, channel_id],
661 |row| row.get(0),
662 )
663 .unwrap_or(0);
664 self.inner.return_read_conn(conn);
665 result
666 }
667}
668
669pub struct FilteredSubscription {
671 pub rx: broadcast::Receiver<ContextEventV1>,
672 pub filter: TopicFilter,
673}
674
675impl FilteredSubscription {
676 pub async fn recv_filtered(&mut self) -> Result<ContextEventV1, broadcast::error::RecvError> {
679 loop {
680 let ev = self.rx.recv().await?;
681 if self.filter.matches(&ev) {
682 return Ok(ev);
683 }
684 }
685 }
686}
687
688fn default_db_path() -> PathBuf {
689 let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
690 data.join("context-os").join("context-os.db")
691}
692
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Creates a bus backed by a database in a fresh temporary directory.
    /// The returned guard must be kept alive for as long as the bus is used.
    fn test_bus() -> (ContextBus, tempfile::TempDir) {
        let dir = tempdir().expect("tempdir");
        let db_file = dir.path().join("test-context-os.db");
        (ContextBus::open_at(db_file), dir)
    }

    #[test]
    fn append_and_read_roundtrip() {
        let (bus, _td) = test_bus();
        let payload = serde_json::json!({"tool":"ctx_read"});
        let stored = bus
            .append(
                "ws",
                "ch",
                &ContextEventKindV1::ToolCallRecorded,
                Some("agent"),
                payload,
            )
            .expect("append");

        let fetched = bus.read("ws", "ch", stored.id - 1, 10);
        assert!(fetched.iter().any(|e| e.id == stored.id));
    }

    #[test]
    fn multi_client_concurrent_appends_have_deterministic_ordering() {
        let (bus, _td) = test_bus();
        let bus = Arc::new(bus);
        let n_clients = 5;
        let n_events_per_client = 20;
        let pid = std::process::id();
        let ws = format!("ws-concurrent-{pid}");
        let ch = format!("ch-concurrent-{pid}");

        // Start every writer first, then wait for all of them.
        let workers: Vec<_> = (0..n_clients)
            .map(|client_idx| {
                let bus = Arc::clone(&bus);
                let (ws, ch) = (ws.clone(), ch.clone());
                std::thread::spawn(move || {
                    let agent = format!("agent-{client_idx}");
                    for event_idx in 0..n_events_per_client {
                        bus.append(
                            &ws,
                            &ch,
                            &ContextEventKindV1::ToolCallRecorded,
                            Some(&agent),
                            serde_json::json!({"client": client_idx, "seq": event_idx}),
                        );
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        let all = bus.read(&ws, &ch, 0, 1000);
        assert_eq!(
            all.len(),
            n_clients * n_events_per_client,
            "all events should be persisted"
        );

        let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
        let sorted = {
            let mut copy = ids.clone();
            copy.sort_unstable();
            copy
        };
        assert_eq!(ids, sorted, "events must be in strictly ascending ID order");

        assert!(
            ids.windows(2).all(|pair| pair[1] > pair[0]),
            "IDs must be strictly monotonic (no gaps from concurrent access)"
        );
    }

    #[test]
    fn workspace_channel_isolation() {
        let (bus, _td) = test_bus();
        let pid = std::process::id();
        let ws_a = format!("ws-iso-a-{pid}");
        let ws_b = format!("ws-iso-b-{pid}");
        let ws_c = format!("ws-iso-c-{pid}");
        let ch1 = format!("ch-iso-1-{pid}");
        let ch2 = format!("ch-iso-2-{pid}");

        // One event in each of three distinct (workspace, channel) streams.
        let fixtures = [
            (
                &ws_a,
                &ch1,
                ContextEventKindV1::SessionMutated,
                "agent-a",
                serde_json::json!({"ws":"a","ch":"1"}),
            ),
            (
                &ws_a,
                &ch2,
                ContextEventKindV1::KnowledgeRemembered,
                "agent-a",
                serde_json::json!({"ws":"a","ch":"2"}),
            ),
            (
                &ws_b,
                &ch1,
                ContextEventKindV1::ArtifactStored,
                "agent-b",
                serde_json::json!({"ws":"b","ch":"1"}),
            ),
        ];
        for (ws, ch, kind, actor, payload) in fixtures {
            bus.append(ws, ch, &kind, Some(actor), payload);
        }

        let expectations = [
            (&ws_a, &ch1, "session_mutated"),
            (&ws_a, &ch2, "knowledge_remembered"),
            (&ws_b, &ch1, "artifact_stored"),
        ];
        for (ws, ch, expected_kind) in expectations {
            let events = bus.read(ws, ch, 0, 100);
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].kind, expected_kind);
        }

        let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
        assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
    }

    #[test]
    fn replay_from_cursor_returns_only_newer_events() {
        let (bus, _td) = test_bus();
        let pid = std::process::id();
        let ws = format!("ws-replay-{pid}");
        let ch = format!("ch-replay-{pid}");

        let push = |kind: ContextEventKindV1, seq: u64| {
            bus.append(&ws, &ch, &kind, None, serde_json::json!({"seq": seq}))
                .unwrap()
        };
        let ev1 = push(ContextEventKindV1::ToolCallRecorded, 1);
        let ev2 = push(ContextEventKindV1::SessionMutated, 2);
        let _ev3 = push(ContextEventKindV1::GraphBuilt, 3);

        let from_cursor = bus.read(&ws, &ch, ev2.id, 100);
        assert_eq!(from_cursor.len(), 1, "only events after cursor");
        assert_eq!(from_cursor[0].kind, "graph_built");

        let from_first = bus.read(&ws, &ch, ev1.id, 100);
        assert_eq!(from_first.len(), 2, "events after first");

        let from_zero = bus.read(&ws, &ch, 0, 100);
        assert_eq!(from_zero.len(), 3, "all events from zero");
    }

    #[test]
    fn broadcast_subscriber_receives_events() {
        let (bus, _td) = test_bus();
        // Subscribe before appending: only later events are delivered.
        let mut rx = bus.subscribe("ws", "ch");

        let sent = bus
            .append(
                "ws",
                "ch",
                &ContextEventKindV1::ProofAdded,
                Some("verifier"),
                serde_json::json!({"proof":"hash"}),
            )
            .unwrap();

        let received = rx.try_recv().expect("subscriber should receive event");
        assert_eq!(received.id, sent.id);
        assert_eq!(received.kind, "proof_added");
        assert_eq!(received.actor.as_deref(), Some("verifier"));
    }
}