Skip to main content

lean_ctx/core/context_os/
context_bus.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::broadcast;
10
/// Size of the read-connection pool: the pool is pre-filled with this many
/// connections when the bus is opened, and connections returned while the pool
/// already holds this many are dropped instead of being kept.
const MAX_READ_CONNS: usize = 4;
12
/// Kind of an event recorded on the context bus.
///
/// Serialized by serde using snake_case variant names (for example
/// `tool_call_recorded`), which are also the strings produced by
/// [`ContextEventKindV1::as_str`] and accepted by [`ContextEventKindV1::parse`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ContextEventKindV1 {
    ToolCallRecorded,
    SessionMutated,
    KnowledgeRemembered,
    ArtifactStored,
    GraphBuilt,
    ProofAdded,
}
23
24impl ContextEventKindV1 {
25    pub fn as_str(&self) -> &'static str {
26        match self {
27            Self::ToolCallRecorded => "tool_call_recorded",
28            Self::SessionMutated => "session_mutated",
29            Self::KnowledgeRemembered => "knowledge_remembered",
30            Self::ArtifactStored => "artifact_stored",
31            Self::GraphBuilt => "graph_built",
32            Self::ProofAdded => "proof_added",
33        }
34    }
35
36    pub fn parse(s: &str) -> Self {
37        match s.trim().to_lowercase().as_str() {
38            "tool_call_recorded" => Self::ToolCallRecorded,
39            "session_mutated" => Self::SessionMutated,
40            "knowledge_remembered" => Self::KnowledgeRemembered,
41            "artifact_stored" => Self::ArtifactStored,
42            "graph_built" => Self::GraphBuilt,
43            "proof_added" => Self::ProofAdded,
44            other => {
45                tracing::warn!(
46                    "unknown ContextEventKind '{other}', defaulting to ToolCallRecorded"
47                );
48                Self::ToolCallRecorded
49            }
50        }
51    }
52
53    /// Classifies the consistency requirement for this event kind.
54    ///
55    /// - `Local`: Agent-local, never shared (tool reads, cache hits).
56    /// - `Eventual`: Broadcast via bus, other agents see it "soon" (knowledge, artifacts).
57    /// - `Strong`: Critical decisions that require acknowledgment before proceeding.
58    pub fn consistency_level(&self) -> ConsistencyLevel {
59        match self {
60            Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
61            Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
62            Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
63        }
64    }
65}
66
/// Consistency requirement for shared context events.
///
/// Variants are declared from least to most strict, so the derived
/// `PartialOrd`/`Ord` can be used directly for "at least this strict"
/// comparisons when filtering. Serialized by serde in snake_case.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConsistencyLevel {
    /// Agent-local, authoritative: session task, local cache, current file set.
    Local = 0,
    /// Shared, eventually consistent: knowledge facts, gotchas, artifact refs.
    Eventual = 1,
    /// Shared, strongly consistent: workspace config, critical decisions.
    Strong = 2,
}
79
80impl ConsistencyLevel {
81    pub fn as_str(&self) -> &'static str {
82        match self {
83            Self::Local => "local",
84            Self::Eventual => "eventual",
85            Self::Strong => "strong",
86        }
87    }
88}
89
/// A single event on the context bus, as returned to callers and broadcast
/// to subscribers. Serialized by serde with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ContextEventV1 {
    /// Row id of the event in the `context_events` table.
    pub id: i64,
    /// Workspace the event belongs to.
    pub workspace_id: String,
    /// Channel within the workspace the event was appended to.
    pub channel_id: String,
    /// Event kind as a snake_case string (see `ContextEventKindV1::as_str`).
    pub kind: String,
    /// Optional identifier of whoever produced the event.
    pub actor: Option<String>,
    /// UTC timestamp taken when the event was appended.
    pub timestamp: DateTime<Utc>,
    /// Version counter assigned per workspace/channel pair at append time.
    pub version: i64,
    /// Id of the parent event, if any; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_id: Option<i64>,
    /// Name of the consistency level derived from `kind`.
    pub consistency_level: String,
    /// Arbitrary JSON payload attached to the event.
    pub payload: Value,
    /// Agents this event is directed at; `None` means visible to every agent.
    /// Omitted from serialized output when `None` and defaulted when missing
    /// on input. Events loaded from the database always carry `None` here.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub target_agents: Option<Vec<String>>,
}
107
108impl ContextEventV1 {
109    pub fn consistency(&self) -> ConsistencyLevel {
110        ContextEventKindV1::parse(&self.kind).consistency_level()
111    }
112
113    pub fn is_visible_to_agent(&self, agent_id: &str) -> bool {
114        match &self.target_agents {
115            None => true,
116            Some(targets) => targets.iter().any(|t| t == agent_id),
117        }
118    }
119}
120
/// Filter for selective event subscriptions.
/// All fields are optional; `None` means "accept all".
#[derive(Debug, Clone, Default)]
pub struct TopicFilter {
    /// Accept only events whose kind is in this list.
    pub kinds: Option<Vec<ContextEventKindV1>>,
    /// Accept only events whose actor is in this list; events without an
    /// actor are rejected when this is set.
    pub actors: Option<Vec<String>>,
    /// Accept only events whose consistency level is at least this strict.
    pub min_consistency: Option<ConsistencyLevel>,
    /// Identity of the subscribing agent; when set, events directed at other
    /// agents are rejected.
    pub agent_id: Option<String>,
}
130
131impl TopicFilter {
132    /// Convenience constructor: filter by event kind strings.
133    pub fn kinds(kind_strs: &[&str]) -> Self {
134        Self {
135            kinds: Some(
136                kind_strs
137                    .iter()
138                    .map(|s| ContextEventKindV1::parse(s))
139                    .collect(),
140            ),
141            ..Self::default()
142        }
143    }
144
145    pub fn matches(&self, event: &ContextEventV1) -> bool {
146        if let Some(ref kinds) = self.kinds {
147            let parsed = ContextEventKindV1::parse(&event.kind);
148            if !kinds.contains(&parsed) {
149                return false;
150            }
151        }
152        if let Some(ref actors) = self.actors {
153            match &event.actor {
154                Some(actor) if actors.iter().any(|a| a == actor) => {}
155                Some(_) | None => return false,
156            }
157        }
158        if let Some(min) = self.min_consistency {
159            if event.consistency() < min {
160                return false;
161            }
162        }
163        if let Some(ref aid) = self.agent_id {
164            if !event.is_visible_to_agent(aid) {
165                return false;
166            }
167        }
168        true
169    }
170}
171
172fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
173    let ts_str: String = row.get(5)?;
174    let ts = DateTime::parse_from_rfc3339(&ts_str)
175        .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
176    let payload_str: String = row.get(6)?;
177    let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
178    let kind_str: String = row.get(3)?;
179    let cl = ContextEventKindV1::parse(&kind_str)
180        .consistency_level()
181        .as_str()
182        .to_string();
183    Ok(ContextEventV1 {
184        id: row.get(0)?,
185        workspace_id: row.get(1)?,
186        channel_id: row.get(2)?,
187        kind: kind_str,
188        actor: row.get::<_, Option<String>>(4)?,
189        timestamp: ts,
190        version: row.get::<_, i64>(7).unwrap_or(0),
191        parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
192        consistency_level: cl,
193        payload,
194        target_agents: None,
195    })
196}
197
/// Handle to the context event bus.
///
/// Cloning is cheap: every clone shares the same inner state through an `Arc`.
#[derive(Clone)]
pub struct ContextBus {
    inner: Arc<Inner>,
}
202
/// Capacity passed to `broadcast::channel` for each workspace/channel stream.
const STREAM_CHANNEL_SIZE: usize = 256;
/// Maximum number of live receivers per stream; `subscribe` returns `None`
/// once this many are attached.
const MAX_SUBSCRIBERS_PER_CHANNEL: usize = 64;
205
/// Shared state behind a [`ContextBus`] handle.
struct Inner {
    /// Connection used for all inserts; the mutex serializes writers.
    write_conn: Mutex<Connection>,
    /// Idle read-only connections available for queries.
    read_pool: Mutex<Vec<Connection>>,
    /// Broadcast sender per stream, keyed by `Inner::stream_key`.
    streams: Mutex<HashMap<String, broadcast::Sender<ContextEventV1>>>,
    /// Last version handed out per stream, keyed by `Inner::stream_key`.
    version_cache: Mutex<HashMap<String, i64>>,
    /// Database file path, used to open extra read connections when the pool is empty.
    db_path: PathBuf,
}
213
214impl Inner {
215    fn open_read_conn(path: &PathBuf) -> Connection {
216        let conn = Connection::open(path).expect("open read context-os db");
217        let _ = conn.busy_timeout(std::time::Duration::from_secs(5));
218        let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA query_only=ON;");
219        conn
220    }
221
222    fn take_read_conn(&self) -> Connection {
223        self.read_pool
224            .lock()
225            .unwrap_or_else(std::sync::PoisonError::into_inner)
226            .pop()
227            .unwrap_or_else(|| Self::open_read_conn(&self.db_path))
228    }
229
230    fn return_read_conn(&self, conn: Connection) {
231        let mut pool = self
232            .read_pool
233            .lock()
234            .unwrap_or_else(std::sync::PoisonError::into_inner);
235        if pool.len() < MAX_READ_CONNS {
236            pool.push(conn);
237        }
238    }
239
240    fn stream_key(workspace_id: &str, channel_id: &str) -> String {
241        format!("{workspace_id}\0{channel_id}")
242    }
243
244    fn next_version(&self, workspace_id: &str, channel_id: &str) -> i64 {
245        let key = Self::stream_key(workspace_id, channel_id);
246
247        {
248            let mut cache = self
249                .version_cache
250                .lock()
251                .unwrap_or_else(std::sync::PoisonError::into_inner);
252            if let Some(v) = cache.get_mut(&key) {
253                *v += 1;
254                return *v;
255            }
256        }
257
258        let conn = self.take_read_conn();
259        let v: i64 = conn
260            .query_row(
261                "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
262                params![workspace_id, channel_id],
263                |row| row.get(0),
264            )
265            .unwrap_or(0);
266        self.return_read_conn(conn);
267
268        let mut cache = self
269            .version_cache
270            .lock()
271            .unwrap_or_else(std::sync::PoisonError::into_inner);
272        let entry = cache.entry(key).or_insert(v);
273        *entry = (*entry).max(v) + 1;
274        *entry
275    }
276}
277
278impl Default for ContextBus {
279    fn default() -> Self {
280        Self::new()
281    }
282}
283
284impl ContextBus {
285    pub fn new() -> Self {
286        let path = default_db_path();
287        Self::open_at(path)
288    }
289
290    fn open_at(path: PathBuf) -> Self {
291        if let Some(parent) = path.parent() {
292            let _ = std::fs::create_dir_all(parent);
293        }
294        let conn = Connection::open(&path).expect("open context-os db");
295        let _ = conn.busy_timeout(std::time::Duration::from_secs(5));
296        conn.execute_batch(
297            "PRAGMA journal_mode=WAL;
298             CREATE TABLE IF NOT EXISTS context_events (
299               id INTEGER PRIMARY KEY AUTOINCREMENT,
300               workspace_id TEXT NOT NULL,
301               channel_id TEXT NOT NULL,
302               kind TEXT NOT NULL,
303               actor TEXT,
304               timestamp TEXT NOT NULL,
305               payload_json TEXT NOT NULL,
306               version INTEGER NOT NULL DEFAULT 0,
307               parent_id INTEGER
308             );
309             CREATE INDEX IF NOT EXISTS idx_context_events_stream
310               ON context_events(workspace_id, channel_id, id);",
311        )
312        .expect("init context-os db");
313
314        let _ = conn.execute_batch(
315            "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
316        );
317        let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
318
319        let _ = conn.execute_batch(
320            "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
321               payload_text,
322               content=context_events,
323               content_rowid=id
324             );",
325        );
326
327        let mut read_conns = Vec::with_capacity(MAX_READ_CONNS);
328        for _ in 0..MAX_READ_CONNS {
329            read_conns.push(Inner::open_read_conn(&path));
330        }
331
332        Self {
333            inner: Arc::new(Inner {
334                write_conn: Mutex::new(conn),
335                read_pool: Mutex::new(read_conns),
336                streams: Mutex::new(HashMap::new()),
337                version_cache: Mutex::new(HashMap::new()),
338                db_path: path,
339            }),
340        }
341    }
342
343    pub fn subscribe(
344        &self,
345        workspace_id: &str,
346        channel_id: &str,
347    ) -> Option<broadcast::Receiver<ContextEventV1>> {
348        let key = Inner::stream_key(workspace_id, channel_id);
349        let mut streams = self
350            .inner
351            .streams
352            .lock()
353            .unwrap_or_else(std::sync::PoisonError::into_inner);
354        let tx = streams
355            .entry(key)
356            .or_insert_with(|| broadcast::channel(STREAM_CHANNEL_SIZE).0);
357        if tx.receiver_count() >= MAX_SUBSCRIBERS_PER_CHANNEL {
358            tracing::warn!(
359                "SSE subscriber cap ({MAX_SUBSCRIBERS_PER_CHANNEL}) reached for {workspace_id}/{channel_id} — rejecting"
360            );
361            return None;
362        }
363        Some(tx.subscribe())
364    }
365
366    /// Subscribe with a filter — only events matching the filter are delivered.
367    /// Returns `(Receiver, TopicFilter)` for use in filtered receive loops.
368    pub fn subscribe_filtered(
369        &self,
370        workspace_id: &str,
371        channel_id: &str,
372        filter: TopicFilter,
373    ) -> Option<FilteredSubscription> {
374        let rx = self.subscribe(workspace_id, channel_id)?;
375        Some(FilteredSubscription { rx, filter })
376    }
377
    /// Appends an event without a parent to the given workspace/channel stream
    /// and broadcasts it to live subscribers.
    ///
    /// Shorthand for [`ContextBus::append_with_parent`] with `parent_id = None`.
    /// Returns the stored event, or `None` when it could not be persisted.
    pub fn append(
        &self,
        workspace_id: &str,
        channel_id: &str,
        kind: &ContextEventKindV1,
        actor: Option<&str>,
        payload: Value,
    ) -> Option<ContextEventV1> {
        self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
    }
388
389    pub fn append_with_parent(
390        &self,
391        workspace_id: &str,
392        channel_id: &str,
393        kind: &ContextEventKindV1,
394        actor: Option<&str>,
395        payload: Value,
396        parent_id: Option<i64>,
397    ) -> Option<ContextEventV1> {
398        let ev = self.insert_event(
399            workspace_id,
400            channel_id,
401            kind,
402            actor,
403            payload,
404            parent_id,
405            None,
406        )?;
407        self.broadcast_event(&ev);
408        Some(ev)
409    }
410
411    /// Append an event directed at specific agents only.
412    /// Only subscribers whose `TopicFilter.agent_id` matches a target will see it.
413    pub fn append_directed(
414        &self,
415        workspace_id: &str,
416        channel_id: &str,
417        kind: &ContextEventKindV1,
418        actor: Option<&str>,
419        payload: Value,
420        target_agents: Vec<String>,
421    ) -> Option<ContextEventV1> {
422        let ev = self.insert_event(
423            workspace_id,
424            channel_id,
425            kind,
426            actor,
427            payload,
428            None,
429            Some(target_agents),
430        )?;
431        self.broadcast_event(&ev);
432        Some(ev)
433    }
434
435    fn insert_event(
436        &self,
437        workspace_id: &str,
438        channel_id: &str,
439        kind: &ContextEventKindV1,
440        actor: Option<&str>,
441        payload: Value,
442        parent_id: Option<i64>,
443        target_agents: Option<Vec<String>>,
444    ) -> Option<ContextEventV1> {
445        let ts = Utc::now();
446        let payload_json = payload.to_string();
447
448        let (id, version) = {
449            let Ok(conn) = self.inner.write_conn.lock() else {
450                return None;
451            };
452            let version = self.inner.next_version(workspace_id, channel_id);
453
454            let result: Result<(i64, i64), rusqlite::Error> = conn
455                .execute_batch("BEGIN IMMEDIATE")
456                .and_then(|()| {
457                    conn.execute(
458                        "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
459                         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
460                        params![
461                            workspace_id,
462                            channel_id,
463                            kind.as_str(),
464                            actor.map(str::to_string),
465                            ts.to_rfc3339(),
466                            payload_json,
467                            version,
468                            parent_id,
469                        ],
470                    )?;
471                    let rowid = conn.last_insert_rowid();
472                    if let Err(e) = conn.execute(
473                        "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
474                        params![rowid, payload_json],
475                    ) {
476                        tracing::warn!("FTS insert failed for event {rowid}: {e}");
477                    }
478                    conn.execute_batch("COMMIT")?;
479                    Ok((rowid, version))
480                });
481
482            match result {
483                Ok(pair) => pair,
484                Err(e) => {
485                    tracing::warn!("context bus append failed: {e}");
486                    let _ = conn.execute_batch("ROLLBACK");
487                    return None;
488                }
489            }
490        };
491
492        Some(ContextEventV1 {
493            id,
494            workspace_id: workspace_id.to_string(),
495            channel_id: channel_id.to_string(),
496            consistency_level: kind.consistency_level().as_str().to_string(),
497            kind: kind.as_str().to_string(),
498            actor: actor.map(str::to_string),
499            timestamp: ts,
500            version,
501            parent_id,
502            payload,
503            target_agents,
504        })
505    }
506
507    fn broadcast_event(&self, ev: &ContextEventV1) {
508        let key = Inner::stream_key(&ev.workspace_id, &ev.channel_id);
509        let tx = self
510            .inner
511            .streams
512            .lock()
513            .unwrap_or_else(std::sync::PoisonError::into_inner)
514            .get(&key)
515            .cloned();
516        if let Some(tx) = tx {
517            let _ = tx.send(ev.clone());
518        }
519    }
520
521    pub fn read(
522        &self,
523        workspace_id: &str,
524        channel_id: &str,
525        since: i64,
526        limit: usize,
527    ) -> Vec<ContextEventV1> {
528        let limit = limit.clamp(1, 1000) as i64;
529        let conn = self.inner.take_read_conn();
530        let result = (|| {
531            let mut stmt = conn.prepare(
532                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
533                 FROM context_events
534                 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
535                 ORDER BY id ASC
536                 LIMIT ?4",
537            ).ok()?;
538            let rows = stmt
539                .query_map(
540                    params![workspace_id, channel_id, since, limit],
541                    event_from_row,
542                )
543                .ok()?;
544            Some(rows.flatten().collect::<Vec<_>>())
545        })();
546        self.inner.return_read_conn(conn);
547        result.unwrap_or_default()
548    }
549
550    /// Query recent events of a specific kind (for conflict detection).
551    pub fn recent_by_kind(
552        &self,
553        workspace_id: &str,
554        channel_id: &str,
555        kind: &str,
556        limit: usize,
557    ) -> Vec<ContextEventV1> {
558        let limit = limit.clamp(1, 100) as i64;
559        let conn = self.inner.take_read_conn();
560        let result = (|| {
561            let mut stmt = conn.prepare(
562                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
563                 FROM context_events
564                 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
565                 ORDER BY id DESC
566                 LIMIT ?4",
567            ).ok()?;
568            let rows = stmt
569                .query_map(
570                    params![workspace_id, channel_id, kind, limit],
571                    event_from_row,
572                )
573                .ok()?;
574            Some(rows.flatten().collect::<Vec<_>>())
575        })();
576        self.inner.return_read_conn(conn);
577        result.unwrap_or_default()
578    }
579
580    /// Full-text search over event payloads via FTS5.
581    pub fn search(
582        &self,
583        workspace_id: &str,
584        channel_id: Option<&str>,
585        query: &str,
586        limit: usize,
587    ) -> Vec<ContextEventV1> {
588        let limit = limit.clamp(1, 100) as i64;
589        let conn = self.inner.take_read_conn();
590        let result =
591            if let Some(ch) = channel_id {
592                (|| {
593                    let mut stmt = conn.prepare(
594                    "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
595                            e.payload_json, e.version, e.parent_id
596                     FROM context_events e
597                     JOIN context_events_fts f ON e.id = f.rowid
598                     WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2 AND e.channel_id = ?3
599                     ORDER BY f.rank
600                     LIMIT ?4",
601                ).ok()?;
602                    let rows = stmt
603                        .query_map(params![query, workspace_id, ch, limit], event_from_row)
604                        .ok()?;
605                    Some(rows.flatten().collect::<Vec<_>>())
606                })()
607            } else {
608                (|| {
609                    let mut stmt = conn.prepare(
610                    "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
611                            e.payload_json, e.version, e.parent_id
612                     FROM context_events e
613                     JOIN context_events_fts f ON e.id = f.rowid
614                     WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
615                     ORDER BY f.rank
616                     LIMIT ?3",
617                ).ok()?;
618                    let rows = stmt
619                        .query_map(params![query, workspace_id, limit], event_from_row)
620                        .ok()?;
621                    Some(rows.flatten().collect::<Vec<_>>())
622                })()
623            };
624        self.inner.return_read_conn(conn);
625        result.unwrap_or_default()
626    }
627
628    /// Trace the causal lineage of an event by following parent_id chains.
629    /// Only returns events belonging to the given workspace (tenant isolation).
630    pub fn lineage(
631        &self,
632        event_id: i64,
633        workspace_id: &str,
634        max_depth: usize,
635    ) -> Vec<ContextEventV1> {
636        let max_depth = max_depth.clamp(1, 50);
637        let conn = self.inner.take_read_conn();
638        let mut chain = Vec::new();
639        let mut current_id = Some(event_id);
640
641        for _ in 0..max_depth {
642            let Some(id) = current_id else {
643                break;
644            };
645            let ev = conn.query_row(
646                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
647                 FROM context_events WHERE id = ?1 AND workspace_id = ?2",
648                params![id, workspace_id],
649                event_from_row,
650            );
651            match ev {
652                Ok(ev) => {
653                    current_id = ev.parent_id;
654                    chain.push(ev);
655                }
656                Err(_) => break,
657            }
658        }
659        self.inner.return_read_conn(conn);
660        chain
661    }
662
663    /// Returns the highest event id for a workspace/channel pair, or 0 if none.
664    pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
665        let conn = self.inner.take_read_conn();
666        let result = conn
667            .query_row(
668                "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
669                params![workspace_id, channel_id],
670                |row| row.get(0),
671            )
672            .unwrap_or(0);
673        self.inner.return_read_conn(conn);
674        result
675    }
676}
677
/// A subscription wrapper that applies a [`TopicFilter`] to received events.
///
/// Both fields are public, so callers can also read from `rx` directly and
/// bypass the filter.
pub struct FilteredSubscription {
    /// Receiver for every event broadcast on the subscribed stream.
    pub rx: broadcast::Receiver<ContextEventV1>,
    /// Filter applied by `recv_filtered`.
    pub filter: TopicFilter,
}
683
684impl FilteredSubscription {
685    /// Receive the next event that matches the filter.
686    /// Skips non-matching events silently.
687    pub async fn recv_filtered(&mut self) -> Result<ContextEventV1, broadcast::error::RecvError> {
688        loop {
689            let ev = self.rx.recv().await?;
690            if self.filter.matches(&ev) {
691                return Ok(ev);
692            }
693        }
694    }
695}
696
697fn default_db_path() -> PathBuf {
698    let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
699    data.join("context-os").join("context-os.db")
700}
701
702#[cfg(test)]
703mod tests {
704    use super::*;
705    use tempfile::tempdir;
706
707    fn test_bus() -> (ContextBus, tempfile::TempDir) {
708        let td = tempdir().expect("tempdir");
709        let bus = ContextBus::open_at(td.path().join("test-context-os.db"));
710        (bus, td)
711    }
712
713    #[test]
714    fn append_and_read_roundtrip() {
715        let (bus, _td) = test_bus();
716        let ev = bus
717            .append(
718                "ws",
719                "ch",
720                &ContextEventKindV1::ToolCallRecorded,
721                Some("agent"),
722                serde_json::json!({"tool":"ctx_read"}),
723            )
724            .expect("append");
725        let got = bus.read("ws", "ch", ev.id - 1, 10);
726        assert!(got.iter().any(|e| e.id == ev.id));
727    }
728
729    #[test]
730    fn multi_client_concurrent_appends_have_deterministic_ordering() {
731        let (bus, _td) = test_bus();
732        let bus = Arc::new(bus);
733        let n_clients = 5;
734        let n_events_per_client = 20;
735        let ws = format!("ws-concurrent-{}", std::process::id());
736        let ch = format!("ch-concurrent-{}", std::process::id());
737
738        let mut handles = vec![];
739        for client_idx in 0..n_clients {
740            let bus = Arc::clone(&bus);
741            let ws = ws.clone();
742            let ch = ch.clone();
743            handles.push(std::thread::spawn(move || {
744                let agent = format!("agent-{client_idx}");
745                for event_idx in 0..n_events_per_client {
746                    bus.append(
747                        &ws,
748                        &ch,
749                        &ContextEventKindV1::ToolCallRecorded,
750                        Some(&agent),
751                        serde_json::json!({"client": client_idx, "seq": event_idx}),
752                    );
753                }
754            }));
755        }
756
757        for h in handles {
758            h.join().unwrap();
759        }
760
761        let all = bus.read(&ws, &ch, 0, 1000);
762        assert_eq!(
763            all.len(),
764            n_clients * n_events_per_client,
765            "all events should be persisted"
766        );
767
768        let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
769        let mut sorted = ids.clone();
770        sorted.sort_unstable();
771        assert_eq!(ids, sorted, "events must be in strictly ascending ID order");
772
773        for win in ids.windows(2) {
774            assert!(
775                win[1] > win[0],
776                "IDs must be strictly monotonic (no gaps from concurrent access)"
777            );
778        }
779    }
780
781    #[test]
782    fn workspace_channel_isolation() {
783        let (bus, _td) = test_bus();
784        let pid = std::process::id();
785        let ws_a = format!("ws-iso-a-{pid}");
786        let ws_b = format!("ws-iso-b-{pid}");
787        let ws_c = format!("ws-iso-c-{pid}");
788        let ch1 = format!("ch-iso-1-{pid}");
789        let ch2 = format!("ch-iso-2-{pid}");
790
791        bus.append(
792            &ws_a,
793            &ch1,
794            &ContextEventKindV1::SessionMutated,
795            Some("agent-a"),
796            serde_json::json!({"ws":"a","ch":"1"}),
797        );
798        bus.append(
799            &ws_a,
800            &ch2,
801            &ContextEventKindV1::KnowledgeRemembered,
802            Some("agent-a"),
803            serde_json::json!({"ws":"a","ch":"2"}),
804        );
805        bus.append(
806            &ws_b,
807            &ch1,
808            &ContextEventKindV1::ArtifactStored,
809            Some("agent-b"),
810            serde_json::json!({"ws":"b","ch":"1"}),
811        );
812
813        let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
814        assert_eq!(ws_a_ch_1.len(), 1);
815        assert_eq!(ws_a_ch_1[0].kind, "session_mutated");
816
817        let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
818        assert_eq!(ws_a_ch_2.len(), 1);
819        assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");
820
821        let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
822        assert_eq!(ws_b_ch_1.len(), 1);
823        assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");
824
825        let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
826        assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
827    }
828
829    #[test]
830    fn replay_from_cursor_returns_only_newer_events() {
831        let (bus, _td) = test_bus();
832        let pid = std::process::id();
833        let ws = &format!("ws-replay-{pid}");
834        let ch = &format!("ch-replay-{pid}");
835
836        let ev1 = bus
837            .append(
838                ws,
839                ch,
840                &ContextEventKindV1::ToolCallRecorded,
841                None,
842                serde_json::json!({"seq":1}),
843            )
844            .unwrap();
845        let ev2 = bus
846            .append(
847                ws,
848                ch,
849                &ContextEventKindV1::SessionMutated,
850                None,
851                serde_json::json!({"seq":2}),
852            )
853            .unwrap();
854        let _ev3 = bus
855            .append(
856                ws,
857                ch,
858                &ContextEventKindV1::GraphBuilt,
859                None,
860                serde_json::json!({"seq":3}),
861            )
862            .unwrap();
863
864        let from_cursor = bus.read(ws, ch, ev2.id, 100);
865        assert_eq!(from_cursor.len(), 1, "only events after cursor");
866        assert_eq!(from_cursor[0].kind, "graph_built");
867
868        let from_first = bus.read(ws, ch, ev1.id, 100);
869        assert_eq!(from_first.len(), 2, "events after first");
870
871        let from_zero = bus.read(ws, ch, 0, 100);
872        assert_eq!(from_zero.len(), 3, "all events from zero");
873    }
874
875    #[test]
876    fn broadcast_subscriber_receives_events() {
877        let (bus, _td) = test_bus();
878        let mut rx = bus.subscribe("ws", "ch").expect("subscribe should succeed");
879
880        let ev = bus
881            .append(
882                "ws",
883                "ch",
884                &ContextEventKindV1::ProofAdded,
885                Some("verifier"),
886                serde_json::json!({"proof":"hash"}),
887            )
888            .unwrap();
889
890        let received = rx.try_recv().expect("subscriber should receive event");
891        assert_eq!(received.id, ev.id);
892        assert_eq!(received.kind, "proof_added");
893        assert_eq!(received.actor.as_deref(), Some("verifier"));
894    }
895}