1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::broadcast;
10
11const MAX_READ_CONNS: usize = 4;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "snake_case")]
15pub enum ContextEventKindV1 {
16 ToolCallRecorded,
17 SessionMutated,
18 KnowledgeRemembered,
19 ArtifactStored,
20 GraphBuilt,
21 ProofAdded,
22}
23
24impl ContextEventKindV1 {
25 pub fn as_str(&self) -> &'static str {
26 match self {
27 Self::ToolCallRecorded => "tool_call_recorded",
28 Self::SessionMutated => "session_mutated",
29 Self::KnowledgeRemembered => "knowledge_remembered",
30 Self::ArtifactStored => "artifact_stored",
31 Self::GraphBuilt => "graph_built",
32 Self::ProofAdded => "proof_added",
33 }
34 }
35
36 pub fn parse(s: &str) -> Self {
37 match s.trim().to_lowercase().as_str() {
38 "tool_call_recorded" => Self::ToolCallRecorded,
39 "session_mutated" => Self::SessionMutated,
40 "knowledge_remembered" => Self::KnowledgeRemembered,
41 "artifact_stored" => Self::ArtifactStored,
42 "graph_built" => Self::GraphBuilt,
43 "proof_added" => Self::ProofAdded,
44 other => {
45 tracing::warn!(
46 "unknown ContextEventKind '{other}', defaulting to ToolCallRecorded"
47 );
48 Self::ToolCallRecorded
49 }
50 }
51 }
52
53 pub fn consistency_level(&self) -> ConsistencyLevel {
59 match self {
60 Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
61 Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
62 Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
63 }
64 }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub enum ConsistencyLevel {
72 Local = 0,
74 Eventual = 1,
76 Strong = 2,
78}
79
80impl ConsistencyLevel {
81 pub fn as_str(&self) -> &'static str {
82 match self {
83 Self::Local => "local",
84 Self::Eventual => "eventual",
85 Self::Strong => "strong",
86 }
87 }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "camelCase")]
92pub struct ContextEventV1 {
93 pub id: i64,
94 pub workspace_id: String,
95 pub channel_id: String,
96 pub kind: String,
97 pub actor: Option<String>,
98 pub timestamp: DateTime<Utc>,
99 pub version: i64,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub parent_id: Option<i64>,
102 pub consistency_level: String,
103 pub payload: Value,
104 #[serde(skip_serializing_if = "Option::is_none", default)]
105 pub target_agents: Option<Vec<String>>,
106}
107
108impl ContextEventV1 {
109 pub fn consistency(&self) -> ConsistencyLevel {
110 ContextEventKindV1::parse(&self.kind).consistency_level()
111 }
112
113 pub fn is_visible_to_agent(&self, agent_id: &str) -> bool {
114 match &self.target_agents {
115 None => true,
116 Some(targets) => targets.iter().any(|t| t == agent_id),
117 }
118 }
119}
120
121#[derive(Debug, Clone, Default)]
124pub struct TopicFilter {
125 pub kinds: Option<Vec<ContextEventKindV1>>,
126 pub actors: Option<Vec<String>>,
127 pub min_consistency: Option<ConsistencyLevel>,
128 pub agent_id: Option<String>,
129}
130
131impl TopicFilter {
132 pub fn matches(&self, event: &ContextEventV1) -> bool {
133 if let Some(ref kinds) = self.kinds {
134 let parsed = ContextEventKindV1::parse(&event.kind);
135 if !kinds.contains(&parsed) {
136 return false;
137 }
138 }
139 if let Some(ref actors) = self.actors {
140 match &event.actor {
141 Some(actor) if actors.iter().any(|a| a == actor) => {}
142 Some(_) | None => return false,
143 }
144 }
145 if let Some(min) = self.min_consistency {
146 if event.consistency() < min {
147 return false;
148 }
149 }
150 if let Some(ref aid) = self.agent_id {
151 if !event.is_visible_to_agent(aid) {
152 return false;
153 }
154 }
155 true
156 }
157}
158
159fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
160 let ts_str: String = row.get(5)?;
161 let ts = DateTime::parse_from_rfc3339(&ts_str)
162 .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
163 let payload_str: String = row.get(6)?;
164 let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
165 let kind_str: String = row.get(3)?;
166 let cl = ContextEventKindV1::parse(&kind_str)
167 .consistency_level()
168 .as_str()
169 .to_string();
170 Ok(ContextEventV1 {
171 id: row.get(0)?,
172 workspace_id: row.get(1)?,
173 channel_id: row.get(2)?,
174 kind: kind_str,
175 actor: row.get::<_, Option<String>>(4)?,
176 timestamp: ts,
177 version: row.get::<_, i64>(7).unwrap_or(0),
178 parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
179 consistency_level: cl,
180 payload,
181 target_agents: None,
182 })
183}
184
185#[derive(Clone)]
186pub struct ContextBus {
187 inner: Arc<Inner>,
188}
189
190const STREAM_CHANNEL_SIZE: usize = 256;
191
192struct Inner {
193 write_conn: Mutex<Connection>,
194 read_pool: Mutex<Vec<Connection>>,
195 streams: Mutex<HashMap<String, broadcast::Sender<ContextEventV1>>>,
196 version_cache: Mutex<HashMap<String, i64>>,
197 db_path: PathBuf,
198}
199
200impl Inner {
201 fn open_read_conn(path: &PathBuf) -> Connection {
202 let conn = Connection::open(path).expect("open read context-os db");
203 let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA query_only=ON;");
204 conn
205 }
206
207 fn take_read_conn(&self) -> Connection {
208 self.read_pool
209 .lock()
210 .unwrap_or_else(std::sync::PoisonError::into_inner)
211 .pop()
212 .unwrap_or_else(|| Self::open_read_conn(&self.db_path))
213 }
214
215 fn return_read_conn(&self, conn: Connection) {
216 let mut pool = self
217 .read_pool
218 .lock()
219 .unwrap_or_else(std::sync::PoisonError::into_inner);
220 if pool.len() < MAX_READ_CONNS {
221 pool.push(conn);
222 }
223 }
224
225 fn stream_key(workspace_id: &str, channel_id: &str) -> String {
226 format!("{workspace_id}\0{channel_id}")
227 }
228
229 fn next_version(&self, workspace_id: &str, channel_id: &str) -> i64 {
230 let key = Self::stream_key(workspace_id, channel_id);
231
232 {
233 let mut cache = self
234 .version_cache
235 .lock()
236 .unwrap_or_else(std::sync::PoisonError::into_inner);
237 if let Some(v) = cache.get_mut(&key) {
238 *v += 1;
239 return *v;
240 }
241 }
242
243 let conn = self.take_read_conn();
244 let v: i64 = conn
245 .query_row(
246 "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
247 params![workspace_id, channel_id],
248 |row| row.get(0),
249 )
250 .unwrap_or(0);
251 self.return_read_conn(conn);
252
253 let mut cache = self
254 .version_cache
255 .lock()
256 .unwrap_or_else(std::sync::PoisonError::into_inner);
257 let entry = cache.entry(key).or_insert(v);
258 *entry = (*entry).max(v) + 1;
259 *entry
260 }
261}
262
263impl Default for ContextBus {
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
269impl ContextBus {
270 pub fn new() -> Self {
271 let path = default_db_path();
272 if let Some(parent) = path.parent() {
273 let _ = std::fs::create_dir_all(parent);
274 }
275 let conn = Connection::open(&path).expect("open context-os db");
276 conn.execute_batch(
277 "PRAGMA journal_mode=WAL;
278 CREATE TABLE IF NOT EXISTS context_events (
279 id INTEGER PRIMARY KEY AUTOINCREMENT,
280 workspace_id TEXT NOT NULL,
281 channel_id TEXT NOT NULL,
282 kind TEXT NOT NULL,
283 actor TEXT,
284 timestamp TEXT NOT NULL,
285 payload_json TEXT NOT NULL,
286 version INTEGER NOT NULL DEFAULT 0,
287 parent_id INTEGER
288 );
289 CREATE INDEX IF NOT EXISTS idx_context_events_stream
290 ON context_events(workspace_id, channel_id, id);",
291 )
292 .expect("init context-os db");
293
294 let _ = conn.execute_batch(
295 "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
296 );
297 let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
298
299 let _ = conn.execute_batch(
300 "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
301 payload_text,
302 content=context_events,
303 content_rowid=id
304 );",
305 );
306
307 let mut read_conns = Vec::with_capacity(MAX_READ_CONNS);
308 for _ in 0..MAX_READ_CONNS {
309 read_conns.push(Inner::open_read_conn(&path));
310 }
311
312 Self {
313 inner: Arc::new(Inner {
314 write_conn: Mutex::new(conn),
315 read_pool: Mutex::new(read_conns),
316 streams: Mutex::new(HashMap::new()),
317 version_cache: Mutex::new(HashMap::new()),
318 db_path: path,
319 }),
320 }
321 }
322
323 pub fn subscribe(
324 &self,
325 workspace_id: &str,
326 channel_id: &str,
327 ) -> broadcast::Receiver<ContextEventV1> {
328 let key = Inner::stream_key(workspace_id, channel_id);
329 let mut streams = self
330 .inner
331 .streams
332 .lock()
333 .unwrap_or_else(std::sync::PoisonError::into_inner);
334 let tx = streams
335 .entry(key)
336 .or_insert_with(|| broadcast::channel(STREAM_CHANNEL_SIZE).0);
337 tx.subscribe()
338 }
339
340 pub fn subscribe_filtered(
343 &self,
344 workspace_id: &str,
345 channel_id: &str,
346 filter: TopicFilter,
347 ) -> FilteredSubscription {
348 let rx = self.subscribe(workspace_id, channel_id);
349 FilteredSubscription { rx, filter }
350 }
351
352 pub fn append(
353 &self,
354 workspace_id: &str,
355 channel_id: &str,
356 kind: &ContextEventKindV1,
357 actor: Option<&str>,
358 payload: Value,
359 ) -> Option<ContextEventV1> {
360 self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
361 }
362
363 pub fn append_with_parent(
364 &self,
365 workspace_id: &str,
366 channel_id: &str,
367 kind: &ContextEventKindV1,
368 actor: Option<&str>,
369 payload: Value,
370 parent_id: Option<i64>,
371 ) -> Option<ContextEventV1> {
372 let ev = self.insert_event(
373 workspace_id,
374 channel_id,
375 kind,
376 actor,
377 payload,
378 parent_id,
379 None,
380 )?;
381 self.broadcast_event(&ev);
382 Some(ev)
383 }
384
385 pub fn append_directed(
388 &self,
389 workspace_id: &str,
390 channel_id: &str,
391 kind: &ContextEventKindV1,
392 actor: Option<&str>,
393 payload: Value,
394 target_agents: Vec<String>,
395 ) -> Option<ContextEventV1> {
396 let ev = self.insert_event(
397 workspace_id,
398 channel_id,
399 kind,
400 actor,
401 payload,
402 None,
403 Some(target_agents),
404 )?;
405 self.broadcast_event(&ev);
406 Some(ev)
407 }
408
409 fn insert_event(
410 &self,
411 workspace_id: &str,
412 channel_id: &str,
413 kind: &ContextEventKindV1,
414 actor: Option<&str>,
415 payload: Value,
416 parent_id: Option<i64>,
417 target_agents: Option<Vec<String>>,
418 ) -> Option<ContextEventV1> {
419 let ts = Utc::now();
420 let payload_json = payload.to_string();
421
422 let (id, version) = {
423 let Ok(conn) = self.inner.write_conn.lock() else {
424 return None;
425 };
426 let version = self.inner.next_version(workspace_id, channel_id);
427
428 let result: Result<(i64, i64), rusqlite::Error> = conn
429 .execute_batch("BEGIN IMMEDIATE")
430 .and_then(|()| {
431 conn.execute(
432 "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
433 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
434 params![
435 workspace_id,
436 channel_id,
437 kind.as_str(),
438 actor.map(str::to_string),
439 ts.to_rfc3339(),
440 payload_json,
441 version,
442 parent_id,
443 ],
444 )?;
445 let rowid = conn.last_insert_rowid();
446 if let Err(e) = conn.execute(
447 "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
448 params![rowid, payload_json],
449 ) {
450 tracing::warn!("FTS insert failed for event {rowid}: {e}");
451 }
452 conn.execute_batch("COMMIT")?;
453 Ok((rowid, version))
454 });
455
456 match result {
457 Ok(pair) => pair,
458 Err(e) => {
459 tracing::warn!("context bus append failed: {e}");
460 let _ = conn.execute_batch("ROLLBACK");
461 return None;
462 }
463 }
464 };
465
466 Some(ContextEventV1 {
467 id,
468 workspace_id: workspace_id.to_string(),
469 channel_id: channel_id.to_string(),
470 consistency_level: kind.consistency_level().as_str().to_string(),
471 kind: kind.as_str().to_string(),
472 actor: actor.map(str::to_string),
473 timestamp: ts,
474 version,
475 parent_id,
476 payload,
477 target_agents,
478 })
479 }
480
481 fn broadcast_event(&self, ev: &ContextEventV1) {
482 let key = Inner::stream_key(&ev.workspace_id, &ev.channel_id);
483 let tx = self
484 .inner
485 .streams
486 .lock()
487 .unwrap_or_else(std::sync::PoisonError::into_inner)
488 .get(&key)
489 .cloned();
490 if let Some(tx) = tx {
491 let _ = tx.send(ev.clone());
492 }
493 }
494
495 pub fn read(
496 &self,
497 workspace_id: &str,
498 channel_id: &str,
499 since: i64,
500 limit: usize,
501 ) -> Vec<ContextEventV1> {
502 let limit = limit.clamp(1, 1000) as i64;
503 let conn = self.inner.take_read_conn();
504 let result = (|| {
505 let mut stmt = conn.prepare(
506 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
507 FROM context_events
508 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
509 ORDER BY id ASC
510 LIMIT ?4",
511 ).ok()?;
512 let rows = stmt
513 .query_map(
514 params![workspace_id, channel_id, since, limit],
515 event_from_row,
516 )
517 .ok()?;
518 Some(rows.flatten().collect::<Vec<_>>())
519 })();
520 self.inner.return_read_conn(conn);
521 result.unwrap_or_default()
522 }
523
524 pub fn recent_by_kind(
526 &self,
527 workspace_id: &str,
528 channel_id: &str,
529 kind: &str,
530 limit: usize,
531 ) -> Vec<ContextEventV1> {
532 let limit = limit.clamp(1, 100) as i64;
533 let conn = self.inner.take_read_conn();
534 let result = (|| {
535 let mut stmt = conn.prepare(
536 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
537 FROM context_events
538 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
539 ORDER BY id DESC
540 LIMIT ?4",
541 ).ok()?;
542 let rows = stmt
543 .query_map(
544 params![workspace_id, channel_id, kind, limit],
545 event_from_row,
546 )
547 .ok()?;
548 Some(rows.flatten().collect::<Vec<_>>())
549 })();
550 self.inner.return_read_conn(conn);
551 result.unwrap_or_default()
552 }
553
554 pub fn search(
556 &self,
557 workspace_id: &str,
558 channel_id: Option<&str>,
559 query: &str,
560 limit: usize,
561 ) -> Vec<ContextEventV1> {
562 let limit = limit.clamp(1, 100) as i64;
563 let conn = self.inner.take_read_conn();
564 let result =
565 if let Some(ch) = channel_id {
566 (|| {
567 let mut stmt = conn.prepare(
568 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
569 e.payload_json, e.version, e.parent_id
570 FROM context_events e
571 JOIN context_events_fts f ON e.id = f.rowid
572 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2 AND e.channel_id = ?3
573 ORDER BY f.rank
574 LIMIT ?4",
575 ).ok()?;
576 let rows = stmt
577 .query_map(params![query, workspace_id, ch, limit], event_from_row)
578 .ok()?;
579 Some(rows.flatten().collect::<Vec<_>>())
580 })()
581 } else {
582 (|| {
583 let mut stmt = conn.prepare(
584 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
585 e.payload_json, e.version, e.parent_id
586 FROM context_events e
587 JOIN context_events_fts f ON e.id = f.rowid
588 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
589 ORDER BY f.rank
590 LIMIT ?3",
591 ).ok()?;
592 let rows = stmt
593 .query_map(params![query, workspace_id, limit], event_from_row)
594 .ok()?;
595 Some(rows.flatten().collect::<Vec<_>>())
596 })()
597 };
598 self.inner.return_read_conn(conn);
599 result.unwrap_or_default()
600 }
601
602 pub fn lineage(
605 &self,
606 event_id: i64,
607 workspace_id: &str,
608 max_depth: usize,
609 ) -> Vec<ContextEventV1> {
610 let max_depth = max_depth.clamp(1, 50);
611 let conn = self.inner.take_read_conn();
612 let mut chain = Vec::new();
613 let mut current_id = Some(event_id);
614
615 for _ in 0..max_depth {
616 let Some(id) = current_id else {
617 break;
618 };
619 let ev = conn.query_row(
620 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
621 FROM context_events WHERE id = ?1 AND workspace_id = ?2",
622 params![id, workspace_id],
623 event_from_row,
624 );
625 match ev {
626 Ok(ev) => {
627 current_id = ev.parent_id;
628 chain.push(ev);
629 }
630 Err(_) => break,
631 }
632 }
633 self.inner.return_read_conn(conn);
634 chain
635 }
636
637 pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
639 let conn = self.inner.take_read_conn();
640 let result = conn
641 .query_row(
642 "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
643 params![workspace_id, channel_id],
644 |row| row.get(0),
645 )
646 .unwrap_or(0);
647 self.inner.return_read_conn(conn);
648 result
649 }
650}
651
652pub struct FilteredSubscription {
654 pub rx: broadcast::Receiver<ContextEventV1>,
655 pub filter: TopicFilter,
656}
657
658impl FilteredSubscription {
659 pub async fn recv_filtered(&mut self) -> Result<ContextEventV1, broadcast::error::RecvError> {
662 loop {
663 let ev = self.rx.recv().await?;
664 if self.filter.matches(&ev) {
665 return Ok(ev);
666 }
667 }
668 }
669}
670
671fn default_db_path() -> PathBuf {
672 let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
673 data.join("context-os").join("context-os.db")
674}
675
676#[cfg(test)]
677mod tests {
678 use super::*;
679
680 #[test]
681 fn append_and_read_roundtrip() {
682 let bus = ContextBus::new();
683 let ev = bus
684 .append(
685 "ws",
686 "ch",
687 &ContextEventKindV1::ToolCallRecorded,
688 Some("agent"),
689 serde_json::json!({"tool":"ctx_read"}),
690 )
691 .expect("append");
692 let got = bus.read("ws", "ch", ev.id - 1, 10);
693 assert!(got.iter().any(|e| e.id == ev.id));
694 }
695
696 #[test]
697 fn multi_client_concurrent_appends_have_deterministic_ordering() {
698 let bus = Arc::new(ContextBus::new());
699 let n_clients = 5;
700 let n_events_per_client = 20;
701 let ws = format!("ws-concurrent-{}", std::process::id());
702 let ch = format!("ch-concurrent-{}", std::process::id());
703
704 let mut handles = vec![];
705 for client_idx in 0..n_clients {
706 let bus = Arc::clone(&bus);
707 let ws = ws.clone();
708 let ch = ch.clone();
709 handles.push(std::thread::spawn(move || {
710 let agent = format!("agent-{client_idx}");
711 for event_idx in 0..n_events_per_client {
712 bus.append(
713 &ws,
714 &ch,
715 &ContextEventKindV1::ToolCallRecorded,
716 Some(&agent),
717 serde_json::json!({"client": client_idx, "seq": event_idx}),
718 );
719 }
720 }));
721 }
722
723 for h in handles {
724 h.join().unwrap();
725 }
726
727 let all = bus.read(&ws, &ch, 0, 1000);
728 assert_eq!(
729 all.len(),
730 n_clients * n_events_per_client,
731 "all events should be persisted"
732 );
733
734 let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
735 let mut sorted = ids.clone();
736 sorted.sort_unstable();
737 assert_eq!(ids, sorted, "events must be in strictly ascending ID order");
738
739 for win in ids.windows(2) {
740 assert!(
741 win[1] > win[0],
742 "IDs must be strictly monotonic (no gaps from concurrent access)"
743 );
744 }
745 }
746
747 #[test]
748 fn workspace_channel_isolation() {
749 let bus = ContextBus::new();
750 let pid = std::process::id();
751 let ws_a = format!("ws-iso-a-{pid}");
752 let ws_b = format!("ws-iso-b-{pid}");
753 let ws_c = format!("ws-iso-c-{pid}");
754 let ch1 = format!("ch-iso-1-{pid}");
755 let ch2 = format!("ch-iso-2-{pid}");
756
757 bus.append(
758 &ws_a,
759 &ch1,
760 &ContextEventKindV1::SessionMutated,
761 Some("agent-a"),
762 serde_json::json!({"ws":"a","ch":"1"}),
763 );
764 bus.append(
765 &ws_a,
766 &ch2,
767 &ContextEventKindV1::KnowledgeRemembered,
768 Some("agent-a"),
769 serde_json::json!({"ws":"a","ch":"2"}),
770 );
771 bus.append(
772 &ws_b,
773 &ch1,
774 &ContextEventKindV1::ArtifactStored,
775 Some("agent-b"),
776 serde_json::json!({"ws":"b","ch":"1"}),
777 );
778
779 let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
780 assert_eq!(ws_a_ch_1.len(), 1);
781 assert_eq!(ws_a_ch_1[0].kind, "session_mutated");
782
783 let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
784 assert_eq!(ws_a_ch_2.len(), 1);
785 assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");
786
787 let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
788 assert_eq!(ws_b_ch_1.len(), 1);
789 assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");
790
791 let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
792 assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
793 }
794
795 #[test]
796 fn replay_from_cursor_returns_only_newer_events() {
797 let bus = ContextBus::new();
798 let pid = std::process::id();
799 let ws = &format!("ws-replay-{pid}");
800 let ch = &format!("ch-replay-{pid}");
801
802 let ev1 = bus
803 .append(
804 ws,
805 ch,
806 &ContextEventKindV1::ToolCallRecorded,
807 None,
808 serde_json::json!({"seq":1}),
809 )
810 .unwrap();
811 let ev2 = bus
812 .append(
813 ws,
814 ch,
815 &ContextEventKindV1::SessionMutated,
816 None,
817 serde_json::json!({"seq":2}),
818 )
819 .unwrap();
820 let _ev3 = bus
821 .append(
822 ws,
823 ch,
824 &ContextEventKindV1::GraphBuilt,
825 None,
826 serde_json::json!({"seq":3}),
827 )
828 .unwrap();
829
830 let from_cursor = bus.read(ws, ch, ev2.id, 100);
831 assert_eq!(from_cursor.len(), 1, "only events after cursor");
832 assert_eq!(from_cursor[0].kind, "graph_built");
833
834 let from_first = bus.read(ws, ch, ev1.id, 100);
835 assert_eq!(from_first.len(), 2, "events after first");
836
837 let from_zero = bus.read(ws, ch, 0, 100);
838 assert_eq!(from_zero.len(), 3, "all events from zero");
839 }
840
841 #[test]
842 fn broadcast_subscriber_receives_events() {
843 let bus = ContextBus::new();
844 let mut rx = bus.subscribe("ws", "ch");
845
846 let ev = bus
847 .append(
848 "ws",
849 "ch",
850 &ContextEventKindV1::ProofAdded,
851 Some("verifier"),
852 serde_json::json!({"proof":"hash"}),
853 )
854 .unwrap();
855
856 let received = rx.try_recv().expect("subscriber should receive event");
857 assert_eq!(received.id, ev.id);
858 assert_eq!(received.kind, "proof_added");
859 assert_eq!(received.actor.as_deref(), Some("verifier"));
860 }
861}