Skip to main content

lean_ctx/core/context_os/
context_bus.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::broadcast;
10
11const MAX_READ_CONNS: usize = 4;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "snake_case")]
15pub enum ContextEventKindV1 {
16    ToolCallRecorded,
17    SessionMutated,
18    KnowledgeRemembered,
19    ArtifactStored,
20    GraphBuilt,
21    ProofAdded,
22}
23
24impl ContextEventKindV1 {
25    pub fn as_str(&self) -> &'static str {
26        match self {
27            Self::ToolCallRecorded => "tool_call_recorded",
28            Self::SessionMutated => "session_mutated",
29            Self::KnowledgeRemembered => "knowledge_remembered",
30            Self::ArtifactStored => "artifact_stored",
31            Self::GraphBuilt => "graph_built",
32            Self::ProofAdded => "proof_added",
33        }
34    }
35
36    pub fn parse(s: &str) -> Self {
37        match s.trim().to_lowercase().as_str() {
38            "tool_call_recorded" => Self::ToolCallRecorded,
39            "session_mutated" => Self::SessionMutated,
40            "knowledge_remembered" => Self::KnowledgeRemembered,
41            "artifact_stored" => Self::ArtifactStored,
42            "graph_built" => Self::GraphBuilt,
43            "proof_added" => Self::ProofAdded,
44            other => {
45                tracing::warn!(
46                    "unknown ContextEventKind '{other}', defaulting to ToolCallRecorded"
47                );
48                Self::ToolCallRecorded
49            }
50        }
51    }
52
53    /// Classifies the consistency requirement for this event kind.
54    ///
55    /// - `Local`: Agent-local, never shared (tool reads, cache hits).
56    /// - `Eventual`: Broadcast via bus, other agents see it "soon" (knowledge, artifacts).
57    /// - `Strong`: Critical decisions that require acknowledgment before proceeding.
58    pub fn consistency_level(&self) -> ConsistencyLevel {
59        match self {
60            Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
61            Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
62            Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
63        }
64    }
65}
66
67/// Consistency requirement for shared context events.
68/// Ordered from least to most strict for filtering comparisons.
69#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
70#[serde(rename_all = "snake_case")]
71pub enum ConsistencyLevel {
72    /// Agent-local, authoritative: session task, local cache, current file set.
73    Local = 0,
74    /// Shared, eventually consistent: knowledge facts, gotchas, artifact refs.
75    Eventual = 1,
76    /// Shared, strongly consistent: workspace config, critical decisions.
77    Strong = 2,
78}
79
80impl ConsistencyLevel {
81    pub fn as_str(&self) -> &'static str {
82        match self {
83            Self::Local => "local",
84            Self::Eventual => "eventual",
85            Self::Strong => "strong",
86        }
87    }
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(rename_all = "camelCase")]
92pub struct ContextEventV1 {
93    pub id: i64,
94    pub workspace_id: String,
95    pub channel_id: String,
96    pub kind: String,
97    pub actor: Option<String>,
98    pub timestamp: DateTime<Utc>,
99    pub version: i64,
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub parent_id: Option<i64>,
102    pub consistency_level: String,
103    pub payload: Value,
104    #[serde(skip_serializing_if = "Option::is_none", default)]
105    pub target_agents: Option<Vec<String>>,
106}
107
108impl ContextEventV1 {
109    pub fn consistency(&self) -> ConsistencyLevel {
110        ContextEventKindV1::parse(&self.kind).consistency_level()
111    }
112
113    pub fn is_visible_to_agent(&self, agent_id: &str) -> bool {
114        match &self.target_agents {
115            None => true,
116            Some(targets) => targets.iter().any(|t| t == agent_id),
117        }
118    }
119}
120
121/// Filter for selective event subscriptions.
122/// All fields are optional; `None` means "accept all".
123#[derive(Debug, Clone, Default)]
124pub struct TopicFilter {
125    pub kinds: Option<Vec<ContextEventKindV1>>,
126    pub actors: Option<Vec<String>>,
127    pub min_consistency: Option<ConsistencyLevel>,
128    pub agent_id: Option<String>,
129}
130
131impl TopicFilter {
132    pub fn matches(&self, event: &ContextEventV1) -> bool {
133        if let Some(ref kinds) = self.kinds {
134            let parsed = ContextEventKindV1::parse(&event.kind);
135            if !kinds.contains(&parsed) {
136                return false;
137            }
138        }
139        if let Some(ref actors) = self.actors {
140            match &event.actor {
141                Some(actor) if actors.iter().any(|a| a == actor) => {}
142                Some(_) | None => return false,
143            }
144        }
145        if let Some(min) = self.min_consistency {
146            if event.consistency() < min {
147                return false;
148            }
149        }
150        if let Some(ref aid) = self.agent_id {
151            if !event.is_visible_to_agent(aid) {
152                return false;
153            }
154        }
155        true
156    }
157}
158
159fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
160    let ts_str: String = row.get(5)?;
161    let ts = DateTime::parse_from_rfc3339(&ts_str)
162        .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
163    let payload_str: String = row.get(6)?;
164    let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
165    let kind_str: String = row.get(3)?;
166    let cl = ContextEventKindV1::parse(&kind_str)
167        .consistency_level()
168        .as_str()
169        .to_string();
170    Ok(ContextEventV1 {
171        id: row.get(0)?,
172        workspace_id: row.get(1)?,
173        channel_id: row.get(2)?,
174        kind: kind_str,
175        actor: row.get::<_, Option<String>>(4)?,
176        timestamp: ts,
177        version: row.get::<_, i64>(7).unwrap_or(0),
178        parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
179        consistency_level: cl,
180        payload,
181        target_agents: None,
182    })
183}
184
185#[derive(Clone)]
186pub struct ContextBus {
187    inner: Arc<Inner>,
188}
189
190const STREAM_CHANNEL_SIZE: usize = 256;
191
192struct Inner {
193    write_conn: Mutex<Connection>,
194    read_pool: Mutex<Vec<Connection>>,
195    streams: Mutex<HashMap<String, broadcast::Sender<ContextEventV1>>>,
196    version_cache: Mutex<HashMap<String, i64>>,
197    db_path: PathBuf,
198}
199
200impl Inner {
201    fn open_read_conn(path: &PathBuf) -> Connection {
202        let conn = Connection::open(path).expect("open read context-os db");
203        let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA query_only=ON;");
204        conn
205    }
206
207    fn take_read_conn(&self) -> Connection {
208        self.read_pool
209            .lock()
210            .unwrap_or_else(std::sync::PoisonError::into_inner)
211            .pop()
212            .unwrap_or_else(|| Self::open_read_conn(&self.db_path))
213    }
214
215    fn return_read_conn(&self, conn: Connection) {
216        let mut pool = self
217            .read_pool
218            .lock()
219            .unwrap_or_else(std::sync::PoisonError::into_inner);
220        if pool.len() < MAX_READ_CONNS {
221            pool.push(conn);
222        }
223    }
224
225    fn stream_key(workspace_id: &str, channel_id: &str) -> String {
226        format!("{workspace_id}\0{channel_id}")
227    }
228
229    fn next_version(&self, workspace_id: &str, channel_id: &str) -> i64 {
230        let key = Self::stream_key(workspace_id, channel_id);
231
232        {
233            let mut cache = self
234                .version_cache
235                .lock()
236                .unwrap_or_else(std::sync::PoisonError::into_inner);
237            if let Some(v) = cache.get_mut(&key) {
238                *v += 1;
239                return *v;
240            }
241        }
242
243        let conn = self.take_read_conn();
244        let v: i64 = conn
245            .query_row(
246                "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
247                params![workspace_id, channel_id],
248                |row| row.get(0),
249            )
250            .unwrap_or(0);
251        self.return_read_conn(conn);
252
253        let mut cache = self
254            .version_cache
255            .lock()
256            .unwrap_or_else(std::sync::PoisonError::into_inner);
257        let entry = cache.entry(key).or_insert(v);
258        *entry = (*entry).max(v) + 1;
259        *entry
260    }
261}
262
263impl Default for ContextBus {
264    fn default() -> Self {
265        Self::new()
266    }
267}
268
269impl ContextBus {
270    pub fn new() -> Self {
271        let path = default_db_path();
272        if let Some(parent) = path.parent() {
273            let _ = std::fs::create_dir_all(parent);
274        }
275        let conn = Connection::open(&path).expect("open context-os db");
276        conn.execute_batch(
277            "PRAGMA journal_mode=WAL;
278             CREATE TABLE IF NOT EXISTS context_events (
279               id INTEGER PRIMARY KEY AUTOINCREMENT,
280               workspace_id TEXT NOT NULL,
281               channel_id TEXT NOT NULL,
282               kind TEXT NOT NULL,
283               actor TEXT,
284               timestamp TEXT NOT NULL,
285               payload_json TEXT NOT NULL,
286               version INTEGER NOT NULL DEFAULT 0,
287               parent_id INTEGER
288             );
289             CREATE INDEX IF NOT EXISTS idx_context_events_stream
290               ON context_events(workspace_id, channel_id, id);",
291        )
292        .expect("init context-os db");
293
294        let _ = conn.execute_batch(
295            "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
296        );
297        let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
298
299        let _ = conn.execute_batch(
300            "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
301               payload_text,
302               content=context_events,
303               content_rowid=id
304             );",
305        );
306
307        let mut read_conns = Vec::with_capacity(MAX_READ_CONNS);
308        for _ in 0..MAX_READ_CONNS {
309            read_conns.push(Inner::open_read_conn(&path));
310        }
311
312        Self {
313            inner: Arc::new(Inner {
314                write_conn: Mutex::new(conn),
315                read_pool: Mutex::new(read_conns),
316                streams: Mutex::new(HashMap::new()),
317                version_cache: Mutex::new(HashMap::new()),
318                db_path: path,
319            }),
320        }
321    }
322
323    pub fn subscribe(
324        &self,
325        workspace_id: &str,
326        channel_id: &str,
327    ) -> broadcast::Receiver<ContextEventV1> {
328        let key = Inner::stream_key(workspace_id, channel_id);
329        let mut streams = self
330            .inner
331            .streams
332            .lock()
333            .unwrap_or_else(std::sync::PoisonError::into_inner);
334        let tx = streams
335            .entry(key)
336            .or_insert_with(|| broadcast::channel(STREAM_CHANNEL_SIZE).0);
337        tx.subscribe()
338    }
339
340    /// Subscribe with a filter — only events matching the filter are delivered.
341    /// Returns `(Receiver, TopicFilter)` for use in filtered receive loops.
342    pub fn subscribe_filtered(
343        &self,
344        workspace_id: &str,
345        channel_id: &str,
346        filter: TopicFilter,
347    ) -> FilteredSubscription {
348        let rx = self.subscribe(workspace_id, channel_id);
349        FilteredSubscription { rx, filter }
350    }
351
352    pub fn append(
353        &self,
354        workspace_id: &str,
355        channel_id: &str,
356        kind: &ContextEventKindV1,
357        actor: Option<&str>,
358        payload: Value,
359    ) -> Option<ContextEventV1> {
360        self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
361    }
362
363    pub fn append_with_parent(
364        &self,
365        workspace_id: &str,
366        channel_id: &str,
367        kind: &ContextEventKindV1,
368        actor: Option<&str>,
369        payload: Value,
370        parent_id: Option<i64>,
371    ) -> Option<ContextEventV1> {
372        let ev = self.insert_event(
373            workspace_id,
374            channel_id,
375            kind,
376            actor,
377            payload,
378            parent_id,
379            None,
380        )?;
381        self.broadcast_event(&ev);
382        Some(ev)
383    }
384
385    /// Append an event directed at specific agents only.
386    /// Only subscribers whose `TopicFilter.agent_id` matches a target will see it.
387    pub fn append_directed(
388        &self,
389        workspace_id: &str,
390        channel_id: &str,
391        kind: &ContextEventKindV1,
392        actor: Option<&str>,
393        payload: Value,
394        target_agents: Vec<String>,
395    ) -> Option<ContextEventV1> {
396        let ev = self.insert_event(
397            workspace_id,
398            channel_id,
399            kind,
400            actor,
401            payload,
402            None,
403            Some(target_agents),
404        )?;
405        self.broadcast_event(&ev);
406        Some(ev)
407    }
408
409    fn insert_event(
410        &self,
411        workspace_id: &str,
412        channel_id: &str,
413        kind: &ContextEventKindV1,
414        actor: Option<&str>,
415        payload: Value,
416        parent_id: Option<i64>,
417        target_agents: Option<Vec<String>>,
418    ) -> Option<ContextEventV1> {
419        let ts = Utc::now();
420        let payload_json = payload.to_string();
421
422        let (id, version) = {
423            let Ok(conn) = self.inner.write_conn.lock() else {
424                return None;
425            };
426            let version = self.inner.next_version(workspace_id, channel_id);
427
428            let result: Result<(i64, i64), rusqlite::Error> = conn
429                .execute_batch("BEGIN IMMEDIATE")
430                .and_then(|()| {
431                    conn.execute(
432                        "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
433                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
434                        params![
435                            workspace_id,
436                            channel_id,
437                            kind.as_str(),
438                            actor.map(str::to_string),
439                            ts.to_rfc3339(),
440                            payload_json,
441                            version,
442                            parent_id,
443                        ],
444                    )?;
445                    let rowid = conn.last_insert_rowid();
446                    if let Err(e) = conn.execute(
447                        "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
448                        params![rowid, payload_json],
449                    ) {
450                        tracing::warn!("FTS insert failed for event {rowid}: {e}");
451                    }
452                    conn.execute_batch("COMMIT")?;
453                    Ok((rowid, version))
454                });
455
456            match result {
457                Ok(pair) => pair,
458                Err(e) => {
459                    tracing::warn!("context bus append failed: {e}");
460                    let _ = conn.execute_batch("ROLLBACK");
461                    return None;
462                }
463            }
464        };
465
466        Some(ContextEventV1 {
467            id,
468            workspace_id: workspace_id.to_string(),
469            channel_id: channel_id.to_string(),
470            consistency_level: kind.consistency_level().as_str().to_string(),
471            kind: kind.as_str().to_string(),
472            actor: actor.map(str::to_string),
473            timestamp: ts,
474            version,
475            parent_id,
476            payload,
477            target_agents,
478        })
479    }
480
481    fn broadcast_event(&self, ev: &ContextEventV1) {
482        let key = Inner::stream_key(&ev.workspace_id, &ev.channel_id);
483        let tx = self
484            .inner
485            .streams
486            .lock()
487            .unwrap_or_else(std::sync::PoisonError::into_inner)
488            .get(&key)
489            .cloned();
490        if let Some(tx) = tx {
491            let _ = tx.send(ev.clone());
492        }
493    }
494
495    pub fn read(
496        &self,
497        workspace_id: &str,
498        channel_id: &str,
499        since: i64,
500        limit: usize,
501    ) -> Vec<ContextEventV1> {
502        let limit = limit.clamp(1, 1000) as i64;
503        let conn = self.inner.take_read_conn();
504        let result = (|| {
505            let mut stmt = conn.prepare(
506                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
507                 FROM context_events
508                 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
509                 ORDER BY id ASC
510                 LIMIT ?4",
511            ).ok()?;
512            let rows = stmt
513                .query_map(
514                    params![workspace_id, channel_id, since, limit],
515                    event_from_row,
516                )
517                .ok()?;
518            Some(rows.flatten().collect::<Vec<_>>())
519        })();
520        self.inner.return_read_conn(conn);
521        result.unwrap_or_default()
522    }
523
524    /// Query recent events of a specific kind (for conflict detection).
525    pub fn recent_by_kind(
526        &self,
527        workspace_id: &str,
528        channel_id: &str,
529        kind: &str,
530        limit: usize,
531    ) -> Vec<ContextEventV1> {
532        let limit = limit.clamp(1, 100) as i64;
533        let conn = self.inner.take_read_conn();
534        let result = (|| {
535            let mut stmt = conn.prepare(
536                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
537                 FROM context_events
538                 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
539                 ORDER BY id DESC
540                 LIMIT ?4",
541            ).ok()?;
542            let rows = stmt
543                .query_map(
544                    params![workspace_id, channel_id, kind, limit],
545                    event_from_row,
546                )
547                .ok()?;
548            Some(rows.flatten().collect::<Vec<_>>())
549        })();
550        self.inner.return_read_conn(conn);
551        result.unwrap_or_default()
552    }
553
554    /// Full-text search over event payloads via FTS5.
555    pub fn search(
556        &self,
557        workspace_id: &str,
558        channel_id: Option<&str>,
559        query: &str,
560        limit: usize,
561    ) -> Vec<ContextEventV1> {
562        let limit = limit.clamp(1, 100) as i64;
563        let conn = self.inner.take_read_conn();
564        let result =
565            if let Some(ch) = channel_id {
566                (|| {
567                    let mut stmt = conn.prepare(
568                    "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
569                            e.payload_json, e.version, e.parent_id
570                     FROM context_events e
571                     JOIN context_events_fts f ON e.id = f.rowid
572                     WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2 AND e.channel_id = ?3
573                     ORDER BY f.rank
574                     LIMIT ?4",
575                ).ok()?;
576                    let rows = stmt
577                        .query_map(params![query, workspace_id, ch, limit], event_from_row)
578                        .ok()?;
579                    Some(rows.flatten().collect::<Vec<_>>())
580                })()
581            } else {
582                (|| {
583                    let mut stmt = conn.prepare(
584                    "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
585                            e.payload_json, e.version, e.parent_id
586                     FROM context_events e
587                     JOIN context_events_fts f ON e.id = f.rowid
588                     WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
589                     ORDER BY f.rank
590                     LIMIT ?3",
591                ).ok()?;
592                    let rows = stmt
593                        .query_map(params![query, workspace_id, limit], event_from_row)
594                        .ok()?;
595                    Some(rows.flatten().collect::<Vec<_>>())
596                })()
597            };
598        self.inner.return_read_conn(conn);
599        result.unwrap_or_default()
600    }
601
602    /// Trace the causal lineage of an event by following parent_id chains.
603    /// Only returns events belonging to the given workspace (tenant isolation).
604    pub fn lineage(
605        &self,
606        event_id: i64,
607        workspace_id: &str,
608        max_depth: usize,
609    ) -> Vec<ContextEventV1> {
610        let max_depth = max_depth.clamp(1, 50);
611        let conn = self.inner.take_read_conn();
612        let mut chain = Vec::new();
613        let mut current_id = Some(event_id);
614
615        for _ in 0..max_depth {
616            let Some(id) = current_id else {
617                break;
618            };
619            let ev = conn.query_row(
620                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
621                 FROM context_events WHERE id = ?1 AND workspace_id = ?2",
622                params![id, workspace_id],
623                event_from_row,
624            );
625            match ev {
626                Ok(ev) => {
627                    current_id = ev.parent_id;
628                    chain.push(ev);
629                }
630                Err(_) => break,
631            }
632        }
633        self.inner.return_read_conn(conn);
634        chain
635    }
636
637    /// Returns the highest event id for a workspace/channel pair, or 0 if none.
638    pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
639        let conn = self.inner.take_read_conn();
640        let result = conn
641            .query_row(
642                "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
643                params![workspace_id, channel_id],
644                |row| row.get(0),
645            )
646            .unwrap_or(0);
647        self.inner.return_read_conn(conn);
648        result
649    }
650}
651
652/// A subscription wrapper that applies a [`TopicFilter`] to received events.
653pub struct FilteredSubscription {
654    pub rx: broadcast::Receiver<ContextEventV1>,
655    pub filter: TopicFilter,
656}
657
658impl FilteredSubscription {
659    /// Receive the next event that matches the filter.
660    /// Skips non-matching events silently.
661    pub async fn recv_filtered(&mut self) -> Result<ContextEventV1, broadcast::error::RecvError> {
662        loop {
663            let ev = self.rx.recv().await?;
664            if self.filter.matches(&ev) {
665                return Ok(ev);
666            }
667        }
668    }
669}
670
671fn default_db_path() -> PathBuf {
672    let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
673    data.join("context-os").join("context-os.db")
674}
675
676#[cfg(test)]
677mod tests {
678    use super::*;
679
680    #[test]
681    fn append_and_read_roundtrip() {
682        let bus = ContextBus::new();
683        let ev = bus
684            .append(
685                "ws",
686                "ch",
687                &ContextEventKindV1::ToolCallRecorded,
688                Some("agent"),
689                serde_json::json!({"tool":"ctx_read"}),
690            )
691            .expect("append");
692        let got = bus.read("ws", "ch", ev.id - 1, 10);
693        assert!(got.iter().any(|e| e.id == ev.id));
694    }
695
696    #[test]
697    fn multi_client_concurrent_appends_have_deterministic_ordering() {
698        let bus = Arc::new(ContextBus::new());
699        let n_clients = 5;
700        let n_events_per_client = 20;
701        let ws = format!("ws-concurrent-{}", std::process::id());
702        let ch = format!("ch-concurrent-{}", std::process::id());
703
704        let mut handles = vec![];
705        for client_idx in 0..n_clients {
706            let bus = Arc::clone(&bus);
707            let ws = ws.clone();
708            let ch = ch.clone();
709            handles.push(std::thread::spawn(move || {
710                let agent = format!("agent-{client_idx}");
711                for event_idx in 0..n_events_per_client {
712                    bus.append(
713                        &ws,
714                        &ch,
715                        &ContextEventKindV1::ToolCallRecorded,
716                        Some(&agent),
717                        serde_json::json!({"client": client_idx, "seq": event_idx}),
718                    );
719                }
720            }));
721        }
722
723        for h in handles {
724            h.join().unwrap();
725        }
726
727        let all = bus.read(&ws, &ch, 0, 1000);
728        assert_eq!(
729            all.len(),
730            n_clients * n_events_per_client,
731            "all events should be persisted"
732        );
733
734        let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
735        let mut sorted = ids.clone();
736        sorted.sort_unstable();
737        assert_eq!(ids, sorted, "events must be in strictly ascending ID order");
738
739        for win in ids.windows(2) {
740            assert!(
741                win[1] > win[0],
742                "IDs must be strictly monotonic (no gaps from concurrent access)"
743            );
744        }
745    }
746
747    #[test]
748    fn workspace_channel_isolation() {
749        let bus = ContextBus::new();
750        let pid = std::process::id();
751        let ws_a = format!("ws-iso-a-{pid}");
752        let ws_b = format!("ws-iso-b-{pid}");
753        let ws_c = format!("ws-iso-c-{pid}");
754        let ch1 = format!("ch-iso-1-{pid}");
755        let ch2 = format!("ch-iso-2-{pid}");
756
757        bus.append(
758            &ws_a,
759            &ch1,
760            &ContextEventKindV1::SessionMutated,
761            Some("agent-a"),
762            serde_json::json!({"ws":"a","ch":"1"}),
763        );
764        bus.append(
765            &ws_a,
766            &ch2,
767            &ContextEventKindV1::KnowledgeRemembered,
768            Some("agent-a"),
769            serde_json::json!({"ws":"a","ch":"2"}),
770        );
771        bus.append(
772            &ws_b,
773            &ch1,
774            &ContextEventKindV1::ArtifactStored,
775            Some("agent-b"),
776            serde_json::json!({"ws":"b","ch":"1"}),
777        );
778
779        let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
780        assert_eq!(ws_a_ch_1.len(), 1);
781        assert_eq!(ws_a_ch_1[0].kind, "session_mutated");
782
783        let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
784        assert_eq!(ws_a_ch_2.len(), 1);
785        assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");
786
787        let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
788        assert_eq!(ws_b_ch_1.len(), 1);
789        assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");
790
791        let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
792        assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
793    }
794
795    #[test]
796    fn replay_from_cursor_returns_only_newer_events() {
797        let bus = ContextBus::new();
798        let pid = std::process::id();
799        let ws = &format!("ws-replay-{pid}");
800        let ch = &format!("ch-replay-{pid}");
801
802        let ev1 = bus
803            .append(
804                ws,
805                ch,
806                &ContextEventKindV1::ToolCallRecorded,
807                None,
808                serde_json::json!({"seq":1}),
809            )
810            .unwrap();
811        let ev2 = bus
812            .append(
813                ws,
814                ch,
815                &ContextEventKindV1::SessionMutated,
816                None,
817                serde_json::json!({"seq":2}),
818            )
819            .unwrap();
820        let _ev3 = bus
821            .append(
822                ws,
823                ch,
824                &ContextEventKindV1::GraphBuilt,
825                None,
826                serde_json::json!({"seq":3}),
827            )
828            .unwrap();
829
830        let from_cursor = bus.read(ws, ch, ev2.id, 100);
831        assert_eq!(from_cursor.len(), 1, "only events after cursor");
832        assert_eq!(from_cursor[0].kind, "graph_built");
833
834        let from_first = bus.read(ws, ch, ev1.id, 100);
835        assert_eq!(from_first.len(), 2, "events after first");
836
837        let from_zero = bus.read(ws, ch, 0, 100);
838        assert_eq!(from_zero.len(), 3, "all events from zero");
839    }
840
841    #[test]
842    fn broadcast_subscriber_receives_events() {
843        let bus = ContextBus::new();
844        let mut rx = bus.subscribe("ws", "ch");
845
846        let ev = bus
847            .append(
848                "ws",
849                "ch",
850                &ContextEventKindV1::ProofAdded,
851                Some("verifier"),
852                serde_json::json!({"proof":"hash"}),
853            )
854            .unwrap();
855
856        let received = rx.try_recv().expect("subscriber should receive event");
857        assert_eq!(received.id, ev.id);
858        assert_eq!(received.kind, "proof_added");
859        assert_eq!(received.actor.as_deref(), Some("verifier"));
860    }
861}