Skip to main content

lean_ctx/core/context_os/
context_bus.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::broadcast;
10
11const MAX_READ_CONNS: usize = 4;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "snake_case")]
15pub enum ContextEventKindV1 {
16    ToolCallRecorded,
17    SessionMutated,
18    KnowledgeRemembered,
19    ArtifactStored,
20    GraphBuilt,
21    ProofAdded,
22}
23
24impl ContextEventKindV1 {
25    pub fn as_str(&self) -> &'static str {
26        match self {
27            Self::ToolCallRecorded => "tool_call_recorded",
28            Self::SessionMutated => "session_mutated",
29            Self::KnowledgeRemembered => "knowledge_remembered",
30            Self::ArtifactStored => "artifact_stored",
31            Self::GraphBuilt => "graph_built",
32            Self::ProofAdded => "proof_added",
33        }
34    }
35
36    pub fn parse(s: &str) -> Self {
37        match s.trim().to_lowercase().as_str() {
38            "tool_call_recorded" => Self::ToolCallRecorded,
39            "session_mutated" => Self::SessionMutated,
40            "knowledge_remembered" => Self::KnowledgeRemembered,
41            "artifact_stored" => Self::ArtifactStored,
42            "graph_built" => Self::GraphBuilt,
43            "proof_added" => Self::ProofAdded,
44            other => {
45                tracing::warn!(
46                    "unknown ContextEventKind '{other}', defaulting to ToolCallRecorded"
47                );
48                Self::ToolCallRecorded
49            }
50        }
51    }
52
53    /// Classifies the consistency requirement for this event kind.
54    ///
55    /// - `Local`: Agent-local, never shared (tool reads, cache hits).
56    /// - `Eventual`: Broadcast via bus, other agents see it "soon" (knowledge, artifacts).
57    /// - `Strong`: Critical decisions that require acknowledgment before proceeding.
58    pub fn consistency_level(&self) -> ConsistencyLevel {
59        match self {
60            Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
61            Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
62            Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
63        }
64    }
65}
66
67/// Consistency requirement for shared context events.
68/// Ordered from least to most strict for filtering comparisons.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub enum ConsistencyLevel {
72    /// Agent-local, authoritative: session task, local cache, current file set.
73    Local = 0,
74    /// Shared, eventually consistent: knowledge facts, gotchas, artifact refs.
75    Eventual = 1,
76    /// Shared, strongly consistent: workspace config, critical decisions.
77    Strong = 2,
78}
79
80impl ConsistencyLevel {
81    pub fn as_str(&self) -> &'static str {
82        match self {
83            Self::Local => "local",
84            Self::Eventual => "eventual",
85            Self::Strong => "strong",
86        }
87    }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "camelCase")]
92pub struct ContextEventV1 {
93    pub id: i64,
94    pub workspace_id: String,
95    pub channel_id: String,
96    pub kind: String,
97    pub actor: Option<String>,
98    pub timestamp: DateTime<Utc>,
99    pub version: i64,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub parent_id: Option<i64>,
102    pub consistency_level: String,
103    pub payload: Value,
104    #[serde(skip_serializing_if = "Option::is_none", default)]
105    pub target_agents: Option<Vec<String>>,
106}
107
108impl ContextEventV1 {
109    pub fn consistency(&self) -> ConsistencyLevel {
110        ContextEventKindV1::parse(&self.kind).consistency_level()
111    }
112
113    pub fn is_visible_to_agent(&self, agent_id: &str) -> bool {
114        match &self.target_agents {
115            None => true,
116            Some(targets) => targets.iter().any(|t| t == agent_id),
117        }
118    }
119}
120
121/// Filter for selective event subscriptions.
122/// All fields are optional; `None` means "accept all".
123#[derive(Debug, Clone, Default)]
124pub struct TopicFilter {
125    pub kinds: Option<Vec<ContextEventKindV1>>,
126    pub actors: Option<Vec<String>>,
127    pub min_consistency: Option<ConsistencyLevel>,
128    pub agent_id: Option<String>,
129}
130
131impl TopicFilter {
132    /// Convenience constructor: filter by event kind strings.
133    pub fn kinds(kind_strs: &[&str]) -> Self {
134        Self {
135            kinds: Some(
136                kind_strs
137                    .iter()
138                    .map(|s| ContextEventKindV1::parse(s))
139                    .collect(),
140            ),
141            ..Self::default()
142        }
143    }
144
145    pub fn matches(&self, event: &ContextEventV1) -> bool {
146        if let Some(ref kinds) = self.kinds {
147            let parsed = ContextEventKindV1::parse(&event.kind);
148            if !kinds.contains(&parsed) {
149                return false;
150            }
151        }
152        if let Some(ref actors) = self.actors {
153            match &event.actor {
154                Some(actor) if actors.iter().any(|a| a == actor) => {}
155                Some(_) | None => return false,
156            }
157        }
158        if let Some(min) = self.min_consistency {
159            if event.consistency() < min {
160                return false;
161            }
162        }
163        if let Some(ref aid) = self.agent_id {
164            if !event.is_visible_to_agent(aid) {
165                return false;
166            }
167        }
168        true
169    }
170}
171
172fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
173    let ts_str: String = row.get(5)?;
174    let ts = DateTime::parse_from_rfc3339(&ts_str)
175        .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
176    let payload_str: String = row.get(6)?;
177    let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
178    let kind_str: String = row.get(3)?;
179    let cl = ContextEventKindV1::parse(&kind_str)
180        .consistency_level()
181        .as_str()
182        .to_string();
183    Ok(ContextEventV1 {
184        id: row.get(0)?,
185        workspace_id: row.get(1)?,
186        channel_id: row.get(2)?,
187        kind: kind_str,
188        actor: row.get::<_, Option<String>>(4)?,
189        timestamp: ts,
190        version: row.get::<_, i64>(7).unwrap_or(0),
191        parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
192        consistency_level: cl,
193        payload,
194        target_agents: None,
195    })
196}
197
198#[derive(Clone)]
199pub struct ContextBus {
200    inner: Arc<Inner>,
201}
202
203const STREAM_CHANNEL_SIZE: usize = 256;
204
205struct Inner {
206    write_conn: Mutex<Connection>,
207    read_pool: Mutex<Vec<Connection>>,
208    streams: Mutex<HashMap<String, broadcast::Sender<ContextEventV1>>>,
209    version_cache: Mutex<HashMap<String, i64>>,
210    db_path: PathBuf,
211}
212
213impl Inner {
214    fn open_read_conn(path: &PathBuf) -> Connection {
215        let conn = Connection::open(path).expect("open read context-os db");
216        let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA query_only=ON;");
217        conn
218    }
219
220    fn take_read_conn(&self) -> Connection {
221        self.read_pool
222            .lock()
223            .unwrap_or_else(std::sync::PoisonError::into_inner)
224            .pop()
225            .unwrap_or_else(|| Self::open_read_conn(&self.db_path))
226    }
227
228    fn return_read_conn(&self, conn: Connection) {
229        let mut pool = self
230            .read_pool
231            .lock()
232            .unwrap_or_else(std::sync::PoisonError::into_inner);
233        if pool.len() < MAX_READ_CONNS {
234            pool.push(conn);
235        }
236    }
237
238    fn stream_key(workspace_id: &str, channel_id: &str) -> String {
239        format!("{workspace_id}\0{channel_id}")
240    }
241
242    fn next_version(&self, workspace_id: &str, channel_id: &str) -> i64 {
243        let key = Self::stream_key(workspace_id, channel_id);
244
245        {
246            let mut cache = self
247                .version_cache
248                .lock()
249                .unwrap_or_else(std::sync::PoisonError::into_inner);
250            if let Some(v) = cache.get_mut(&key) {
251                *v += 1;
252                return *v;
253            }
254        }
255
256        let conn = self.take_read_conn();
257        let v: i64 = conn
258            .query_row(
259                "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
260                params![workspace_id, channel_id],
261                |row| row.get(0),
262            )
263            .unwrap_or(0);
264        self.return_read_conn(conn);
265
266        let mut cache = self
267            .version_cache
268            .lock()
269            .unwrap_or_else(std::sync::PoisonError::into_inner);
270        let entry = cache.entry(key).or_insert(v);
271        *entry = (*entry).max(v) + 1;
272        *entry
273    }
274}
275
276impl Default for ContextBus {
277    fn default() -> Self {
278        Self::new()
279    }
280}
281
282impl ContextBus {
283    pub fn new() -> Self {
284        let path = default_db_path();
285        if let Some(parent) = path.parent() {
286            let _ = std::fs::create_dir_all(parent);
287        }
288        let conn = Connection::open(&path).expect("open context-os db");
289        conn.execute_batch(
290            "PRAGMA journal_mode=WAL;
291             CREATE TABLE IF NOT EXISTS context_events (
292               id INTEGER PRIMARY KEY AUTOINCREMENT,
293               workspace_id TEXT NOT NULL,
294               channel_id TEXT NOT NULL,
295               kind TEXT NOT NULL,
296               actor TEXT,
297               timestamp TEXT NOT NULL,
298               payload_json TEXT NOT NULL,
299               version INTEGER NOT NULL DEFAULT 0,
300               parent_id INTEGER
301             );
302             CREATE INDEX IF NOT EXISTS idx_context_events_stream
303               ON context_events(workspace_id, channel_id, id);",
304        )
305        .expect("init context-os db");
306
307        let _ = conn.execute_batch(
308            "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
309        );
310        let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
311
312        let _ = conn.execute_batch(
313            "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
314               payload_text,
315               content=context_events,
316               content_rowid=id
317             );",
318        );
319
320        let mut read_conns = Vec::with_capacity(MAX_READ_CONNS);
321        for _ in 0..MAX_READ_CONNS {
322            read_conns.push(Inner::open_read_conn(&path));
323        }
324
325        Self {
326            inner: Arc::new(Inner {
327                write_conn: Mutex::new(conn),
328                read_pool: Mutex::new(read_conns),
329                streams: Mutex::new(HashMap::new()),
330                version_cache: Mutex::new(HashMap::new()),
331                db_path: path,
332            }),
333        }
334    }
335
336    pub fn subscribe(
337        &self,
338        workspace_id: &str,
339        channel_id: &str,
340    ) -> broadcast::Receiver<ContextEventV1> {
341        let key = Inner::stream_key(workspace_id, channel_id);
342        let mut streams = self
343            .inner
344            .streams
345            .lock()
346            .unwrap_or_else(std::sync::PoisonError::into_inner);
347        let tx = streams
348            .entry(key)
349            .or_insert_with(|| broadcast::channel(STREAM_CHANNEL_SIZE).0);
350        tx.subscribe()
351    }
352
353    /// Subscribe with a filter — only events matching the filter are delivered.
354    /// Returns `(Receiver, TopicFilter)` for use in filtered receive loops.
355    pub fn subscribe_filtered(
356        &self,
357        workspace_id: &str,
358        channel_id: &str,
359        filter: TopicFilter,
360    ) -> FilteredSubscription {
361        let rx = self.subscribe(workspace_id, channel_id);
362        FilteredSubscription { rx, filter }
363    }
364
365    pub fn append(
366        &self,
367        workspace_id: &str,
368        channel_id: &str,
369        kind: &ContextEventKindV1,
370        actor: Option<&str>,
371        payload: Value,
372    ) -> Option<ContextEventV1> {
373        self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
374    }
375
376    pub fn append_with_parent(
377        &self,
378        workspace_id: &str,
379        channel_id: &str,
380        kind: &ContextEventKindV1,
381        actor: Option<&str>,
382        payload: Value,
383        parent_id: Option<i64>,
384    ) -> Option<ContextEventV1> {
385        let ev = self.insert_event(
386            workspace_id,
387            channel_id,
388            kind,
389            actor,
390            payload,
391            parent_id,
392            None,
393        )?;
394        self.broadcast_event(&ev);
395        Some(ev)
396    }
397
398    /// Append an event directed at specific agents only.
399    /// Only subscribers whose `TopicFilter.agent_id` matches a target will see it.
400    pub fn append_directed(
401        &self,
402        workspace_id: &str,
403        channel_id: &str,
404        kind: &ContextEventKindV1,
405        actor: Option<&str>,
406        payload: Value,
407        target_agents: Vec<String>,
408    ) -> Option<ContextEventV1> {
409        let ev = self.insert_event(
410            workspace_id,
411            channel_id,
412            kind,
413            actor,
414            payload,
415            None,
416            Some(target_agents),
417        )?;
418        self.broadcast_event(&ev);
419        Some(ev)
420    }
421
422    fn insert_event(
423        &self,
424        workspace_id: &str,
425        channel_id: &str,
426        kind: &ContextEventKindV1,
427        actor: Option<&str>,
428        payload: Value,
429        parent_id: Option<i64>,
430        target_agents: Option<Vec<String>>,
431    ) -> Option<ContextEventV1> {
432        let ts = Utc::now();
433        let payload_json = payload.to_string();
434
435        let (id, version) = {
436            let Ok(conn) = self.inner.write_conn.lock() else {
437                return None;
438            };
439            let version = self.inner.next_version(workspace_id, channel_id);
440
441            let result: Result<(i64, i64), rusqlite::Error> = conn
442                .execute_batch("BEGIN IMMEDIATE")
443                .and_then(|()| {
444                    conn.execute(
445                        "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
446                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
447                        params![
448                            workspace_id,
449                            channel_id,
450                            kind.as_str(),
451                            actor.map(str::to_string),
452                            ts.to_rfc3339(),
453                            payload_json,
454                            version,
455                            parent_id,
456                        ],
457                    )?;
458                    let rowid = conn.last_insert_rowid();
459                    if let Err(e) = conn.execute(
460                        "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
461                        params![rowid, payload_json],
462                    ) {
463                        tracing::warn!("FTS insert failed for event {rowid}: {e}");
464                    }
465                    conn.execute_batch("COMMIT")?;
466                    Ok((rowid, version))
467                });
468
469            match result {
470                Ok(pair) => pair,
471                Err(e) => {
472                    tracing::warn!("context bus append failed: {e}");
473                    let _ = conn.execute_batch("ROLLBACK");
474                    return None;
475                }
476            }
477        };
478
479        Some(ContextEventV1 {
480            id,
481            workspace_id: workspace_id.to_string(),
482            channel_id: channel_id.to_string(),
483            consistency_level: kind.consistency_level().as_str().to_string(),
484            kind: kind.as_str().to_string(),
485            actor: actor.map(str::to_string),
486            timestamp: ts,
487            version,
488            parent_id,
489            payload,
490            target_agents,
491        })
492    }
493
494    fn broadcast_event(&self, ev: &ContextEventV1) {
495        let key = Inner::stream_key(&ev.workspace_id, &ev.channel_id);
496        let tx = self
497            .inner
498            .streams
499            .lock()
500            .unwrap_or_else(std::sync::PoisonError::into_inner)
501            .get(&key)
502            .cloned();
503        if let Some(tx) = tx {
504            let _ = tx.send(ev.clone());
505        }
506    }
507
508    pub fn read(
509        &self,
510        workspace_id: &str,
511        channel_id: &str,
512        since: i64,
513        limit: usize,
514    ) -> Vec<ContextEventV1> {
515        let limit = limit.clamp(1, 1000) as i64;
516        let conn = self.inner.take_read_conn();
517        let result = (|| {
518            let mut stmt = conn.prepare(
519                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
520                 FROM context_events
521                 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
522                 ORDER BY id ASC
523                 LIMIT ?4",
524            ).ok()?;
525            let rows = stmt
526                .query_map(
527                    params![workspace_id, channel_id, since, limit],
528                    event_from_row,
529                )
530                .ok()?;
531            Some(rows.flatten().collect::<Vec<_>>())
532        })();
533        self.inner.return_read_conn(conn);
534        result.unwrap_or_default()
535    }
536
537    /// Query recent events of a specific kind (for conflict detection).
538    pub fn recent_by_kind(
539        &self,
540        workspace_id: &str,
541        channel_id: &str,
542        kind: &str,
543        limit: usize,
544    ) -> Vec<ContextEventV1> {
545        let limit = limit.clamp(1, 100) as i64;
546        let conn = self.inner.take_read_conn();
547        let result = (|| {
548            let mut stmt = conn.prepare(
549                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
550                 FROM context_events
551                 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
552                 ORDER BY id DESC
553                 LIMIT ?4",
554            ).ok()?;
555            let rows = stmt
556                .query_map(
557                    params![workspace_id, channel_id, kind, limit],
558                    event_from_row,
559                )
560                .ok()?;
561            Some(rows.flatten().collect::<Vec<_>>())
562        })();
563        self.inner.return_read_conn(conn);
564        result.unwrap_or_default()
565    }
566
567    /// Full-text search over event payloads via FTS5.
568    pub fn search(
569        &self,
570        workspace_id: &str,
571        channel_id: Option<&str>,
572        query: &str,
573        limit: usize,
574    ) -> Vec<ContextEventV1> {
575        let limit = limit.clamp(1, 100) as i64;
576        let conn = self.inner.take_read_conn();
577        let result =
578            if let Some(ch) = channel_id {
579                (|| {
580                    let mut stmt = conn.prepare(
581                    "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
582                            e.payload_json, e.version, e.parent_id
583                     FROM context_events e
584                     JOIN context_events_fts f ON e.id = f.rowid
585                     WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2 AND e.channel_id = ?3
586                     ORDER BY f.rank
587                     LIMIT ?4",
588                ).ok()?;
589                    let rows = stmt
590                        .query_map(params![query, workspace_id, ch, limit], event_from_row)
591                        .ok()?;
592                    Some(rows.flatten().collect::<Vec<_>>())
593                })()
594            } else {
595                (|| {
596                    let mut stmt = conn.prepare(
597                    "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
598                            e.payload_json, e.version, e.parent_id
599                     FROM context_events e
600                     JOIN context_events_fts f ON e.id = f.rowid
601                     WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
602                     ORDER BY f.rank
603                     LIMIT ?3",
604                ).ok()?;
605                    let rows = stmt
606                        .query_map(params![query, workspace_id, limit], event_from_row)
607                        .ok()?;
608                    Some(rows.flatten().collect::<Vec<_>>())
609                })()
610            };
611        self.inner.return_read_conn(conn);
612        result.unwrap_or_default()
613    }
614
615    /// Trace the causal lineage of an event by following parent_id chains.
616    /// Only returns events belonging to the given workspace (tenant isolation).
617    pub fn lineage(
618        &self,
619        event_id: i64,
620        workspace_id: &str,
621        max_depth: usize,
622    ) -> Vec<ContextEventV1> {
623        let max_depth = max_depth.clamp(1, 50);
624        let conn = self.inner.take_read_conn();
625        let mut chain = Vec::new();
626        let mut current_id = Some(event_id);
627
628        for _ in 0..max_depth {
629            let Some(id) = current_id else {
630                break;
631            };
632            let ev = conn.query_row(
633                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
634                 FROM context_events WHERE id = ?1 AND workspace_id = ?2",
635                params![id, workspace_id],
636                event_from_row,
637            );
638            match ev {
639                Ok(ev) => {
640                    current_id = ev.parent_id;
641                    chain.push(ev);
642                }
643                Err(_) => break,
644            }
645        }
646        self.inner.return_read_conn(conn);
647        chain
648    }
649
650    /// Returns the highest event id for a workspace/channel pair, or 0 if none.
651    pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
652        let conn = self.inner.take_read_conn();
653        let result = conn
654            .query_row(
655                "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
656                params![workspace_id, channel_id],
657                |row| row.get(0),
658            )
659            .unwrap_or(0);
660        self.inner.return_read_conn(conn);
661        result
662    }
663}
664
665/// A subscription wrapper that applies a [`TopicFilter`] to received events.
666pub struct FilteredSubscription {
667    pub rx: broadcast::Receiver<ContextEventV1>,
668    pub filter: TopicFilter,
669}
670
671impl FilteredSubscription {
672    /// Receive the next event that matches the filter.
673    /// Skips non-matching events silently.
674    pub async fn recv_filtered(&mut self) -> Result<ContextEventV1, broadcast::error::RecvError> {
675        loop {
676            let ev = self.rx.recv().await?;
677            if self.filter.matches(&ev) {
678                return Ok(ev);
679            }
680        }
681    }
682}
683
684fn default_db_path() -> PathBuf {
685    let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
686    data.join("context-os").join("context-os.db")
687}
688
689#[cfg(test)]
690mod tests {
691    use super::*;
692
693    #[test]
694    fn append_and_read_roundtrip() {
695        let bus = ContextBus::new();
696        let ev = bus
697            .append(
698                "ws",
699                "ch",
700                &ContextEventKindV1::ToolCallRecorded,
701                Some("agent"),
702                serde_json::json!({"tool":"ctx_read"}),
703            )
704            .expect("append");
705        let got = bus.read("ws", "ch", ev.id - 1, 10);
706        assert!(got.iter().any(|e| e.id == ev.id));
707    }
708
709    #[test]
710    fn multi_client_concurrent_appends_have_deterministic_ordering() {
711        let bus = Arc::new(ContextBus::new());
712        let n_clients = 5;
713        let n_events_per_client = 20;
714        let ws = format!("ws-concurrent-{}", std::process::id());
715        let ch = format!("ch-concurrent-{}", std::process::id());
716
717        let mut handles = vec![];
718        for client_idx in 0..n_clients {
719            let bus = Arc::clone(&bus);
720            let ws = ws.clone();
721            let ch = ch.clone();
722            handles.push(std::thread::spawn(move || {
723                let agent = format!("agent-{client_idx}");
724                for event_idx in 0..n_events_per_client {
725                    bus.append(
726                        &ws,
727                        &ch,
728                        &ContextEventKindV1::ToolCallRecorded,
729                        Some(&agent),
730                        serde_json::json!({"client": client_idx, "seq": event_idx}),
731                    );
732                }
733            }));
734        }
735
736        for h in handles {
737            h.join().unwrap();
738        }
739
740        let all = bus.read(&ws, &ch, 0, 1000);
741        assert_eq!(
742            all.len(),
743            n_clients * n_events_per_client,
744            "all events should be persisted"
745        );
746
747        let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
748        let mut sorted = ids.clone();
749        sorted.sort_unstable();
750        assert_eq!(ids, sorted, "events must be in strictly ascending ID order");
751
752        for win in ids.windows(2) {
753            assert!(
754                win[1] > win[0],
755                "IDs must be strictly monotonic (no gaps from concurrent access)"
756            );
757        }
758    }
759
760    #[test]
761    fn workspace_channel_isolation() {
762        let bus = ContextBus::new();
763        let pid = std::process::id();
764        let ws_a = format!("ws-iso-a-{pid}");
765        let ws_b = format!("ws-iso-b-{pid}");
766        let ws_c = format!("ws-iso-c-{pid}");
767        let ch1 = format!("ch-iso-1-{pid}");
768        let ch2 = format!("ch-iso-2-{pid}");
769
770        bus.append(
771            &ws_a,
772            &ch1,
773            &ContextEventKindV1::SessionMutated,
774            Some("agent-a"),
775            serde_json::json!({"ws":"a","ch":"1"}),
776        );
777        bus.append(
778            &ws_a,
779            &ch2,
780            &ContextEventKindV1::KnowledgeRemembered,
781            Some("agent-a"),
782            serde_json::json!({"ws":"a","ch":"2"}),
783        );
784        bus.append(
785            &ws_b,
786            &ch1,
787            &ContextEventKindV1::ArtifactStored,
788            Some("agent-b"),
789            serde_json::json!({"ws":"b","ch":"1"}),
790        );
791
792        let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
793        assert_eq!(ws_a_ch_1.len(), 1);
794        assert_eq!(ws_a_ch_1[0].kind, "session_mutated");
795
796        let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
797        assert_eq!(ws_a_ch_2.len(), 1);
798        assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");
799
800        let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
801        assert_eq!(ws_b_ch_1.len(), 1);
802        assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");
803
804        let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
805        assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
806    }
807
808    #[test]
809    fn replay_from_cursor_returns_only_newer_events() {
810        let bus = ContextBus::new();
811        let pid = std::process::id();
812        let ws = &format!("ws-replay-{pid}");
813        let ch = &format!("ch-replay-{pid}");
814
815        let ev1 = bus
816            .append(
817                ws,
818                ch,
819                &ContextEventKindV1::ToolCallRecorded,
820                None,
821                serde_json::json!({"seq":1}),
822            )
823            .unwrap();
824        let ev2 = bus
825            .append(
826                ws,
827                ch,
828                &ContextEventKindV1::SessionMutated,
829                None,
830                serde_json::json!({"seq":2}),
831            )
832            .unwrap();
833        let _ev3 = bus
834            .append(
835                ws,
836                ch,
837                &ContextEventKindV1::GraphBuilt,
838                None,
839                serde_json::json!({"seq":3}),
840            )
841            .unwrap();
842
843        let from_cursor = bus.read(ws, ch, ev2.id, 100);
844        assert_eq!(from_cursor.len(), 1, "only events after cursor");
845        assert_eq!(from_cursor[0].kind, "graph_built");
846
847        let from_first = bus.read(ws, ch, ev1.id, 100);
848        assert_eq!(from_first.len(), 2, "events after first");
849
850        let from_zero = bus.read(ws, ch, 0, 100);
851        assert_eq!(from_zero.len(), 3, "all events from zero");
852    }
853
854    #[test]
855    fn broadcast_subscriber_receives_events() {
856        let bus = ContextBus::new();
857        let mut rx = bus.subscribe("ws", "ch");
858
859        let ev = bus
860            .append(
861                "ws",
862                "ch",
863                &ContextEventKindV1::ProofAdded,
864                Some("verifier"),
865                serde_json::json!({"proof":"hash"}),
866            )
867            .unwrap();
868
869        let received = rx.try_recv().expect("subscriber should receive event");
870        assert_eq!(received.id, ev.id);
871        assert_eq!(received.kind, "proof_added");
872        assert_eq!(received.actor.as_deref(), Some("verifier"));
873    }
874}