1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::broadcast;
10
11const MAX_READ_CONNS: usize = 4;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "snake_case")]
15pub enum ContextEventKindV1 {
16 ToolCallRecorded,
17 SessionMutated,
18 KnowledgeRemembered,
19 ArtifactStored,
20 GraphBuilt,
21 ProofAdded,
22}
23
24impl ContextEventKindV1 {
25 pub fn as_str(&self) -> &'static str {
26 match self {
27 Self::ToolCallRecorded => "tool_call_recorded",
28 Self::SessionMutated => "session_mutated",
29 Self::KnowledgeRemembered => "knowledge_remembered",
30 Self::ArtifactStored => "artifact_stored",
31 Self::GraphBuilt => "graph_built",
32 Self::ProofAdded => "proof_added",
33 }
34 }
35
36 pub fn parse(s: &str) -> Self {
37 match s.trim().to_lowercase().as_str() {
38 "tool_call_recorded" => Self::ToolCallRecorded,
39 "session_mutated" => Self::SessionMutated,
40 "knowledge_remembered" => Self::KnowledgeRemembered,
41 "artifact_stored" => Self::ArtifactStored,
42 "graph_built" => Self::GraphBuilt,
43 "proof_added" => Self::ProofAdded,
44 other => {
45 tracing::warn!(
46 "unknown ContextEventKind '{other}', defaulting to ToolCallRecorded"
47 );
48 Self::ToolCallRecorded
49 }
50 }
51 }
52
53 pub fn consistency_level(&self) -> ConsistencyLevel {
59 match self {
60 Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
61 Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
62 Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
63 }
64 }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub enum ConsistencyLevel {
72 Local = 0,
74 Eventual = 1,
76 Strong = 2,
78}
79
80impl ConsistencyLevel {
81 pub fn as_str(&self) -> &'static str {
82 match self {
83 Self::Local => "local",
84 Self::Eventual => "eventual",
85 Self::Strong => "strong",
86 }
87 }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "camelCase")]
92pub struct ContextEventV1 {
93 pub id: i64,
94 pub workspace_id: String,
95 pub channel_id: String,
96 pub kind: String,
97 pub actor: Option<String>,
98 pub timestamp: DateTime<Utc>,
99 pub version: i64,
100 #[serde(skip_serializing_if = "Option::is_none")]
101 pub parent_id: Option<i64>,
102 pub consistency_level: String,
103 pub payload: Value,
104 #[serde(skip_serializing_if = "Option::is_none", default)]
105 pub target_agents: Option<Vec<String>>,
106}
107
108impl ContextEventV1 {
109 pub fn consistency(&self) -> ConsistencyLevel {
110 ContextEventKindV1::parse(&self.kind).consistency_level()
111 }
112
113 pub fn is_visible_to_agent(&self, agent_id: &str) -> bool {
114 match &self.target_agents {
115 None => true,
116 Some(targets) => targets.iter().any(|t| t == agent_id),
117 }
118 }
119}
120
121#[derive(Debug, Clone, Default)]
124pub struct TopicFilter {
125 pub kinds: Option<Vec<ContextEventKindV1>>,
126 pub actors: Option<Vec<String>>,
127 pub min_consistency: Option<ConsistencyLevel>,
128 pub agent_id: Option<String>,
129}
130
131impl TopicFilter {
132 pub fn kinds(kind_strs: &[&str]) -> Self {
134 Self {
135 kinds: Some(
136 kind_strs
137 .iter()
138 .map(|s| ContextEventKindV1::parse(s))
139 .collect(),
140 ),
141 ..Self::default()
142 }
143 }
144
145 pub fn matches(&self, event: &ContextEventV1) -> bool {
146 if let Some(ref kinds) = self.kinds {
147 let parsed = ContextEventKindV1::parse(&event.kind);
148 if !kinds.contains(&parsed) {
149 return false;
150 }
151 }
152 if let Some(ref actors) = self.actors {
153 match &event.actor {
154 Some(actor) if actors.iter().any(|a| a == actor) => {}
155 Some(_) | None => return false,
156 }
157 }
158 if let Some(min) = self.min_consistency {
159 if event.consistency() < min {
160 return false;
161 }
162 }
163 if let Some(ref aid) = self.agent_id {
164 if !event.is_visible_to_agent(aid) {
165 return false;
166 }
167 }
168 true
169 }
170}
171
172fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
173 let ts_str: String = row.get(5)?;
174 let ts = DateTime::parse_from_rfc3339(&ts_str)
175 .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
176 let payload_str: String = row.get(6)?;
177 let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
178 let kind_str: String = row.get(3)?;
179 let cl = ContextEventKindV1::parse(&kind_str)
180 .consistency_level()
181 .as_str()
182 .to_string();
183 Ok(ContextEventV1 {
184 id: row.get(0)?,
185 workspace_id: row.get(1)?,
186 channel_id: row.get(2)?,
187 kind: kind_str,
188 actor: row.get::<_, Option<String>>(4)?,
189 timestamp: ts,
190 version: row.get::<_, i64>(7).unwrap_or(0),
191 parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
192 consistency_level: cl,
193 payload,
194 target_agents: None,
195 })
196}
197
198#[derive(Clone)]
199pub struct ContextBus {
200 inner: Arc<Inner>,
201}
202
203const STREAM_CHANNEL_SIZE: usize = 256;
204const MAX_SUBSCRIBERS_PER_CHANNEL: usize = 64;
205
206struct Inner {
207 write_conn: Mutex<Connection>,
208 read_pool: Mutex<Vec<Connection>>,
209 streams: Mutex<HashMap<String, broadcast::Sender<ContextEventV1>>>,
210 version_cache: Mutex<HashMap<String, i64>>,
211 db_path: PathBuf,
212}
213
214impl Inner {
215 fn open_read_conn(path: &PathBuf) -> Connection {
216 let conn = Connection::open(path).expect("open read context-os db");
217 let _ = conn.busy_timeout(std::time::Duration::from_secs(5));
218 let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA query_only=ON;");
219 conn
220 }
221
222 fn take_read_conn(&self) -> Connection {
223 self.read_pool
224 .lock()
225 .unwrap_or_else(std::sync::PoisonError::into_inner)
226 .pop()
227 .unwrap_or_else(|| Self::open_read_conn(&self.db_path))
228 }
229
230 fn return_read_conn(&self, conn: Connection) {
231 let mut pool = self
232 .read_pool
233 .lock()
234 .unwrap_or_else(std::sync::PoisonError::into_inner);
235 if pool.len() < MAX_READ_CONNS {
236 pool.push(conn);
237 }
238 }
239
240 fn stream_key(workspace_id: &str, channel_id: &str) -> String {
241 format!("{workspace_id}\0{channel_id}")
242 }
243
244 fn next_version(&self, workspace_id: &str, channel_id: &str) -> i64 {
245 let key = Self::stream_key(workspace_id, channel_id);
246
247 {
248 let mut cache = self
249 .version_cache
250 .lock()
251 .unwrap_or_else(std::sync::PoisonError::into_inner);
252 if let Some(v) = cache.get_mut(&key) {
253 *v += 1;
254 return *v;
255 }
256 }
257
258 let conn = self.take_read_conn();
259 let v: i64 = conn
260 .query_row(
261 "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
262 params![workspace_id, channel_id],
263 |row| row.get(0),
264 )
265 .unwrap_or(0);
266 self.return_read_conn(conn);
267
268 let mut cache = self
269 .version_cache
270 .lock()
271 .unwrap_or_else(std::sync::PoisonError::into_inner);
272 let entry = cache.entry(key).or_insert(v);
273 *entry = (*entry).max(v) + 1;
274 *entry
275 }
276}
277
278impl Default for ContextBus {
279 fn default() -> Self {
280 Self::new()
281 }
282}
283
284impl ContextBus {
285 pub fn new() -> Self {
286 let path = default_db_path();
287 Self::open_at(path)
288 }
289
290 fn open_at(path: PathBuf) -> Self {
291 if let Some(parent) = path.parent() {
292 let _ = std::fs::create_dir_all(parent);
293 }
294 let conn = Connection::open(&path).expect("open context-os db");
295 let _ = conn.busy_timeout(std::time::Duration::from_secs(5));
296 conn.execute_batch(
297 "PRAGMA journal_mode=WAL;
298 CREATE TABLE IF NOT EXISTS context_events (
299 id INTEGER PRIMARY KEY AUTOINCREMENT,
300 workspace_id TEXT NOT NULL,
301 channel_id TEXT NOT NULL,
302 kind TEXT NOT NULL,
303 actor TEXT,
304 timestamp TEXT NOT NULL,
305 payload_json TEXT NOT NULL,
306 version INTEGER NOT NULL DEFAULT 0,
307 parent_id INTEGER
308 );
309 CREATE INDEX IF NOT EXISTS idx_context_events_stream
310 ON context_events(workspace_id, channel_id, id);",
311 )
312 .expect("init context-os db");
313
314 let _ = conn.execute_batch(
315 "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
316 );
317 let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
318
319 let _ = conn.execute_batch(
320 "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
321 payload_text,
322 content=context_events,
323 content_rowid=id
324 );",
325 );
326
327 let mut read_conns = Vec::with_capacity(MAX_READ_CONNS);
328 for _ in 0..MAX_READ_CONNS {
329 read_conns.push(Inner::open_read_conn(&path));
330 }
331
332 Self {
333 inner: Arc::new(Inner {
334 write_conn: Mutex::new(conn),
335 read_pool: Mutex::new(read_conns),
336 streams: Mutex::new(HashMap::new()),
337 version_cache: Mutex::new(HashMap::new()),
338 db_path: path,
339 }),
340 }
341 }
342
343 pub fn subscribe(
344 &self,
345 workspace_id: &str,
346 channel_id: &str,
347 ) -> Option<broadcast::Receiver<ContextEventV1>> {
348 let key = Inner::stream_key(workspace_id, channel_id);
349 let mut streams = self
350 .inner
351 .streams
352 .lock()
353 .unwrap_or_else(std::sync::PoisonError::into_inner);
354 let tx = streams
355 .entry(key)
356 .or_insert_with(|| broadcast::channel(STREAM_CHANNEL_SIZE).0);
357 if tx.receiver_count() >= MAX_SUBSCRIBERS_PER_CHANNEL {
358 tracing::warn!(
359 "SSE subscriber cap ({MAX_SUBSCRIBERS_PER_CHANNEL}) reached for {workspace_id}/{channel_id} — rejecting"
360 );
361 return None;
362 }
363 Some(tx.subscribe())
364 }
365
366 pub fn subscribe_filtered(
369 &self,
370 workspace_id: &str,
371 channel_id: &str,
372 filter: TopicFilter,
373 ) -> Option<FilteredSubscription> {
374 let rx = self.subscribe(workspace_id, channel_id)?;
375 Some(FilteredSubscription { rx, filter })
376 }
377
378 pub fn append(
379 &self,
380 workspace_id: &str,
381 channel_id: &str,
382 kind: &ContextEventKindV1,
383 actor: Option<&str>,
384 payload: Value,
385 ) -> Option<ContextEventV1> {
386 self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
387 }
388
389 pub fn append_with_parent(
390 &self,
391 workspace_id: &str,
392 channel_id: &str,
393 kind: &ContextEventKindV1,
394 actor: Option<&str>,
395 payload: Value,
396 parent_id: Option<i64>,
397 ) -> Option<ContextEventV1> {
398 let ev = self.insert_event(
399 workspace_id,
400 channel_id,
401 kind,
402 actor,
403 payload,
404 parent_id,
405 None,
406 )?;
407 self.broadcast_event(&ev);
408 Some(ev)
409 }
410
411 pub fn append_directed(
414 &self,
415 workspace_id: &str,
416 channel_id: &str,
417 kind: &ContextEventKindV1,
418 actor: Option<&str>,
419 payload: Value,
420 target_agents: Vec<String>,
421 ) -> Option<ContextEventV1> {
422 let ev = self.insert_event(
423 workspace_id,
424 channel_id,
425 kind,
426 actor,
427 payload,
428 None,
429 Some(target_agents),
430 )?;
431 self.broadcast_event(&ev);
432 Some(ev)
433 }
434
435 fn insert_event(
436 &self,
437 workspace_id: &str,
438 channel_id: &str,
439 kind: &ContextEventKindV1,
440 actor: Option<&str>,
441 payload: Value,
442 parent_id: Option<i64>,
443 target_agents: Option<Vec<String>>,
444 ) -> Option<ContextEventV1> {
445 let ts = Utc::now();
446 let payload_json = payload.to_string();
447
448 let (id, version) = {
449 let Ok(conn) = self.inner.write_conn.lock() else {
450 return None;
451 };
452 let version = self.inner.next_version(workspace_id, channel_id);
453
454 let result: Result<(i64, i64), rusqlite::Error> = conn
455 .execute_batch("BEGIN IMMEDIATE")
456 .and_then(|()| {
457 conn.execute(
458 "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
459 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
460 params![
461 workspace_id,
462 channel_id,
463 kind.as_str(),
464 actor.map(str::to_string),
465 ts.to_rfc3339(),
466 payload_json,
467 version,
468 parent_id,
469 ],
470 )?;
471 let rowid = conn.last_insert_rowid();
472 if let Err(e) = conn.execute(
473 "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
474 params![rowid, payload_json],
475 ) {
476 tracing::warn!("FTS insert failed for event {rowid}: {e}");
477 }
478 conn.execute_batch("COMMIT")?;
479 Ok((rowid, version))
480 });
481
482 match result {
483 Ok(pair) => pair,
484 Err(e) => {
485 tracing::warn!("context bus append failed: {e}");
486 let _ = conn.execute_batch("ROLLBACK");
487 return None;
488 }
489 }
490 };
491
492 Some(ContextEventV1 {
493 id,
494 workspace_id: workspace_id.to_string(),
495 channel_id: channel_id.to_string(),
496 consistency_level: kind.consistency_level().as_str().to_string(),
497 kind: kind.as_str().to_string(),
498 actor: actor.map(str::to_string),
499 timestamp: ts,
500 version,
501 parent_id,
502 payload,
503 target_agents,
504 })
505 }
506
507 fn broadcast_event(&self, ev: &ContextEventV1) {
508 let key = Inner::stream_key(&ev.workspace_id, &ev.channel_id);
509 let tx = self
510 .inner
511 .streams
512 .lock()
513 .unwrap_or_else(std::sync::PoisonError::into_inner)
514 .get(&key)
515 .cloned();
516 if let Some(tx) = tx {
517 let _ = tx.send(ev.clone());
518 }
519 }
520
521 pub fn read(
522 &self,
523 workspace_id: &str,
524 channel_id: &str,
525 since: i64,
526 limit: usize,
527 ) -> Vec<ContextEventV1> {
528 let limit = limit.clamp(1, 1000) as i64;
529 let conn = self.inner.take_read_conn();
530 let result = (|| {
531 let mut stmt = conn.prepare(
532 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
533 FROM context_events
534 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
535 ORDER BY id ASC
536 LIMIT ?4",
537 ).ok()?;
538 let rows = stmt
539 .query_map(
540 params![workspace_id, channel_id, since, limit],
541 event_from_row,
542 )
543 .ok()?;
544 Some(rows.flatten().collect::<Vec<_>>())
545 })();
546 self.inner.return_read_conn(conn);
547 result.unwrap_or_default()
548 }
549
550 pub fn recent_by_kind(
552 &self,
553 workspace_id: &str,
554 channel_id: &str,
555 kind: &str,
556 limit: usize,
557 ) -> Vec<ContextEventV1> {
558 let limit = limit.clamp(1, 100) as i64;
559 let conn = self.inner.take_read_conn();
560 let result = (|| {
561 let mut stmt = conn.prepare(
562 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
563 FROM context_events
564 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
565 ORDER BY id DESC
566 LIMIT ?4",
567 ).ok()?;
568 let rows = stmt
569 .query_map(
570 params![workspace_id, channel_id, kind, limit],
571 event_from_row,
572 )
573 .ok()?;
574 Some(rows.flatten().collect::<Vec<_>>())
575 })();
576 self.inner.return_read_conn(conn);
577 result.unwrap_or_default()
578 }
579
580 pub fn search(
582 &self,
583 workspace_id: &str,
584 channel_id: Option<&str>,
585 query: &str,
586 limit: usize,
587 ) -> Vec<ContextEventV1> {
588 let limit = limit.clamp(1, 100) as i64;
589 let conn = self.inner.take_read_conn();
590 let result =
591 if let Some(ch) = channel_id {
592 (|| {
593 let mut stmt = conn.prepare(
594 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
595 e.payload_json, e.version, e.parent_id
596 FROM context_events e
597 JOIN context_events_fts f ON e.id = f.rowid
598 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2 AND e.channel_id = ?3
599 ORDER BY f.rank
600 LIMIT ?4",
601 ).ok()?;
602 let rows = stmt
603 .query_map(params![query, workspace_id, ch, limit], event_from_row)
604 .ok()?;
605 Some(rows.flatten().collect::<Vec<_>>())
606 })()
607 } else {
608 (|| {
609 let mut stmt = conn.prepare(
610 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
611 e.payload_json, e.version, e.parent_id
612 FROM context_events e
613 JOIN context_events_fts f ON e.id = f.rowid
614 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
615 ORDER BY f.rank
616 LIMIT ?3",
617 ).ok()?;
618 let rows = stmt
619 .query_map(params![query, workspace_id, limit], event_from_row)
620 .ok()?;
621 Some(rows.flatten().collect::<Vec<_>>())
622 })()
623 };
624 self.inner.return_read_conn(conn);
625 result.unwrap_or_default()
626 }
627
628 pub fn lineage(
631 &self,
632 event_id: i64,
633 workspace_id: &str,
634 max_depth: usize,
635 ) -> Vec<ContextEventV1> {
636 let max_depth = max_depth.clamp(1, 50);
637 let conn = self.inner.take_read_conn();
638 let mut chain = Vec::new();
639 let mut current_id = Some(event_id);
640
641 for _ in 0..max_depth {
642 let Some(id) = current_id else {
643 break;
644 };
645 let ev = conn.query_row(
646 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
647 FROM context_events WHERE id = ?1 AND workspace_id = ?2",
648 params![id, workspace_id],
649 event_from_row,
650 );
651 match ev {
652 Ok(ev) => {
653 current_id = ev.parent_id;
654 chain.push(ev);
655 }
656 Err(_) => break,
657 }
658 }
659 self.inner.return_read_conn(conn);
660 chain
661 }
662
663 pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
665 let conn = self.inner.take_read_conn();
666 let result = conn
667 .query_row(
668 "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
669 params![workspace_id, channel_id],
670 |row| row.get(0),
671 )
672 .unwrap_or(0);
673 self.inner.return_read_conn(conn);
674 result
675 }
676}
677
678pub struct FilteredSubscription {
680 pub rx: broadcast::Receiver<ContextEventV1>,
681 pub filter: TopicFilter,
682}
683
684impl FilteredSubscription {
685 pub async fn recv_filtered(&mut self) -> Result<ContextEventV1, broadcast::error::RecvError> {
688 loop {
689 let ev = self.rx.recv().await?;
690 if self.filter.matches(&ev) {
691 return Ok(ev);
692 }
693 }
694 }
695}
696
697fn default_db_path() -> PathBuf {
698 let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
699 data.join("context-os").join("context-os.db")
700}
701
702#[cfg(test)]
703mod tests {
704 use super::*;
705 use tempfile::tempdir;
706
707 fn test_bus() -> (ContextBus, tempfile::TempDir) {
708 let td = tempdir().expect("tempdir");
709 let bus = ContextBus::open_at(td.path().join("test-context-os.db"));
710 (bus, td)
711 }
712
713 #[test]
714 fn append_and_read_roundtrip() {
715 let (bus, _td) = test_bus();
716 let ev = bus
717 .append(
718 "ws",
719 "ch",
720 &ContextEventKindV1::ToolCallRecorded,
721 Some("agent"),
722 serde_json::json!({"tool":"ctx_read"}),
723 )
724 .expect("append");
725 let got = bus.read("ws", "ch", ev.id - 1, 10);
726 assert!(got.iter().any(|e| e.id == ev.id));
727 }
728
729 #[test]
730 fn multi_client_concurrent_appends_have_deterministic_ordering() {
731 let (bus, _td) = test_bus();
732 let bus = Arc::new(bus);
733 let n_clients = 5;
734 let n_events_per_client = 20;
735 let ws = format!("ws-concurrent-{}", std::process::id());
736 let ch = format!("ch-concurrent-{}", std::process::id());
737
738 let mut handles = vec![];
739 for client_idx in 0..n_clients {
740 let bus = Arc::clone(&bus);
741 let ws = ws.clone();
742 let ch = ch.clone();
743 handles.push(std::thread::spawn(move || {
744 let agent = format!("agent-{client_idx}");
745 for event_idx in 0..n_events_per_client {
746 bus.append(
747 &ws,
748 &ch,
749 &ContextEventKindV1::ToolCallRecorded,
750 Some(&agent),
751 serde_json::json!({"client": client_idx, "seq": event_idx}),
752 );
753 }
754 }));
755 }
756
757 for h in handles {
758 h.join().unwrap();
759 }
760
761 let all = bus.read(&ws, &ch, 0, 1000);
762 assert_eq!(
763 all.len(),
764 n_clients * n_events_per_client,
765 "all events should be persisted"
766 );
767
768 let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
769 let mut sorted = ids.clone();
770 sorted.sort_unstable();
771 assert_eq!(ids, sorted, "events must be in strictly ascending ID order");
772
773 for win in ids.windows(2) {
774 assert!(
775 win[1] > win[0],
776 "IDs must be strictly monotonic (no gaps from concurrent access)"
777 );
778 }
779 }
780
781 #[test]
782 fn workspace_channel_isolation() {
783 let (bus, _td) = test_bus();
784 let pid = std::process::id();
785 let ws_a = format!("ws-iso-a-{pid}");
786 let ws_b = format!("ws-iso-b-{pid}");
787 let ws_c = format!("ws-iso-c-{pid}");
788 let ch1 = format!("ch-iso-1-{pid}");
789 let ch2 = format!("ch-iso-2-{pid}");
790
791 bus.append(
792 &ws_a,
793 &ch1,
794 &ContextEventKindV1::SessionMutated,
795 Some("agent-a"),
796 serde_json::json!({"ws":"a","ch":"1"}),
797 );
798 bus.append(
799 &ws_a,
800 &ch2,
801 &ContextEventKindV1::KnowledgeRemembered,
802 Some("agent-a"),
803 serde_json::json!({"ws":"a","ch":"2"}),
804 );
805 bus.append(
806 &ws_b,
807 &ch1,
808 &ContextEventKindV1::ArtifactStored,
809 Some("agent-b"),
810 serde_json::json!({"ws":"b","ch":"1"}),
811 );
812
813 let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
814 assert_eq!(ws_a_ch_1.len(), 1);
815 assert_eq!(ws_a_ch_1[0].kind, "session_mutated");
816
817 let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
818 assert_eq!(ws_a_ch_2.len(), 1);
819 assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");
820
821 let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
822 assert_eq!(ws_b_ch_1.len(), 1);
823 assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");
824
825 let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
826 assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
827 }
828
829 #[test]
830 fn replay_from_cursor_returns_only_newer_events() {
831 let (bus, _td) = test_bus();
832 let pid = std::process::id();
833 let ws = &format!("ws-replay-{pid}");
834 let ch = &format!("ch-replay-{pid}");
835
836 let ev1 = bus
837 .append(
838 ws,
839 ch,
840 &ContextEventKindV1::ToolCallRecorded,
841 None,
842 serde_json::json!({"seq":1}),
843 )
844 .unwrap();
845 let ev2 = bus
846 .append(
847 ws,
848 ch,
849 &ContextEventKindV1::SessionMutated,
850 None,
851 serde_json::json!({"seq":2}),
852 )
853 .unwrap();
854 let _ev3 = bus
855 .append(
856 ws,
857 ch,
858 &ContextEventKindV1::GraphBuilt,
859 None,
860 serde_json::json!({"seq":3}),
861 )
862 .unwrap();
863
864 let from_cursor = bus.read(ws, ch, ev2.id, 100);
865 assert_eq!(from_cursor.len(), 1, "only events after cursor");
866 assert_eq!(from_cursor[0].kind, "graph_built");
867
868 let from_first = bus.read(ws, ch, ev1.id, 100);
869 assert_eq!(from_first.len(), 2, "events after first");
870
871 let from_zero = bus.read(ws, ch, 0, 100);
872 assert_eq!(from_zero.len(), 3, "all events from zero");
873 }
874
875 #[test]
876 fn broadcast_subscriber_receives_events() {
877 let (bus, _td) = test_bus();
878 let mut rx = bus.subscribe("ws", "ch").expect("subscribe should succeed");
879
880 let ev = bus
881 .append(
882 "ws",
883 "ch",
884 &ContextEventKindV1::ProofAdded,
885 Some("verifier"),
886 serde_json::json!({"proof":"hash"}),
887 )
888 .unwrap();
889
890 let received = rx.try_recv().expect("subscriber should receive event");
891 assert_eq!(received.id, ev.id);
892 assert_eq!(received.kind, "proof_added");
893 assert_eq!(received.actor.as_deref(), Some("verifier"));
894 }
895}