Skip to main content

lean_ctx/core/context_os/
context_bus.rs

1use std::path::PathBuf;
2use std::sync::{Arc, Mutex};
3
4use chrono::{DateTime, Utc};
5use rusqlite::{params, Connection};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use tokio::sync::broadcast;
9
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
11#[serde(rename_all = "snake_case")]
12pub enum ContextEventKindV1 {
13    ToolCallRecorded,
14    SessionMutated,
15    KnowledgeRemembered,
16    ArtifactStored,
17    GraphBuilt,
18    ProofAdded,
19}
20
21impl ContextEventKindV1 {
22    pub fn as_str(&self) -> &'static str {
23        match self {
24            Self::ToolCallRecorded => "tool_call_recorded",
25            Self::SessionMutated => "session_mutated",
26            Self::KnowledgeRemembered => "knowledge_remembered",
27            Self::ArtifactStored => "artifact_stored",
28            Self::GraphBuilt => "graph_built",
29            Self::ProofAdded => "proof_added",
30        }
31    }
32
33    pub fn parse(s: &str) -> Self {
34        match s.trim().to_lowercase().as_str() {
35            "session_mutated" => Self::SessionMutated,
36            "knowledge_remembered" => Self::KnowledgeRemembered,
37            "artifact_stored" => Self::ArtifactStored,
38            "graph_built" => Self::GraphBuilt,
39            "proof_added" => Self::ProofAdded,
40            _ => Self::ToolCallRecorded,
41        }
42    }
43
44    /// Classifies the consistency requirement for this event kind.
45    ///
46    /// - `Local`: Agent-local, never shared (tool reads, cache hits).
47    /// - `Eventual`: Broadcast via bus, other agents see it "soon" (knowledge, artifacts).
48    /// - `Strong`: Critical decisions that require acknowledgment before proceeding.
49    pub fn consistency_level(&self) -> ConsistencyLevel {
50        match self {
51            Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
52            Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
53            Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
54        }
55    }
56}
57
58/// Consistency requirement for shared context events.
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
60#[serde(rename_all = "snake_case")]
61pub enum ConsistencyLevel {
62    /// Agent-local, authoritative: session task, local cache, current file set.
63    Local,
64    /// Shared, eventually consistent: knowledge facts, gotchas, artifact refs.
65    Eventual,
66    /// Shared, strongly consistent: workspace config, critical decisions.
67    Strong,
68}
69
70impl ConsistencyLevel {
71    pub fn as_str(&self) -> &'static str {
72        match self {
73            Self::Local => "local",
74            Self::Eventual => "eventual",
75            Self::Strong => "strong",
76        }
77    }
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
81#[serde(rename_all = "camelCase")]
82pub struct ContextEventV1 {
83    pub id: i64,
84    pub workspace_id: String,
85    pub channel_id: String,
86    pub kind: String,
87    pub actor: Option<String>,
88    pub timestamp: DateTime<Utc>,
89    pub version: i64,
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub parent_id: Option<i64>,
92    pub consistency_level: String,
93    pub payload: Value,
94}
95
96impl ContextEventV1 {
97    pub fn consistency(&self) -> ConsistencyLevel {
98        ContextEventKindV1::parse(&self.kind).consistency_level()
99    }
100}
101
102fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
103    let ts_str: String = row.get(5)?;
104    let ts = DateTime::parse_from_rfc3339(&ts_str)
105        .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
106    let payload_str: String = row.get(6)?;
107    let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
108    let kind_str: String = row.get(3)?;
109    let cl = ContextEventKindV1::parse(&kind_str)
110        .consistency_level()
111        .as_str()
112        .to_string();
113    Ok(ContextEventV1 {
114        id: row.get(0)?,
115        workspace_id: row.get(1)?,
116        channel_id: row.get(2)?,
117        kind: kind_str,
118        actor: row.get::<_, Option<String>>(4)?,
119        timestamp: ts,
120        version: row.get::<_, i64>(7).unwrap_or(0),
121        parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
122        consistency_level: cl,
123        payload,
124    })
125}
126
127#[derive(Clone)]
128pub struct ContextBus {
129    inner: Arc<Inner>,
130}
131
132struct Inner {
133    conn: Mutex<Connection>,
134    tx: broadcast::Sender<ContextEventV1>,
135}
136
137impl Default for ContextBus {
138    fn default() -> Self {
139        Self::new()
140    }
141}
142
143impl ContextBus {
144    pub fn new() -> Self {
145        let path = default_db_path();
146        if let Some(parent) = path.parent() {
147            let _ = std::fs::create_dir_all(parent);
148        }
149        let conn = Connection::open(path).expect("open context-os db");
150        conn.execute_batch(
151            "PRAGMA journal_mode=WAL;
152             CREATE TABLE IF NOT EXISTS context_events (
153               id INTEGER PRIMARY KEY AUTOINCREMENT,
154               workspace_id TEXT NOT NULL,
155               channel_id TEXT NOT NULL,
156               kind TEXT NOT NULL,
157               actor TEXT,
158               timestamp TEXT NOT NULL,
159               payload_json TEXT NOT NULL,
160               version INTEGER NOT NULL DEFAULT 0,
161               parent_id INTEGER
162             );
163             CREATE INDEX IF NOT EXISTS idx_context_events_stream
164               ON context_events(workspace_id, channel_id, id);",
165        )
166        .expect("init context-os db");
167
168        // Migration: add version + parent_id to existing tables (idempotent).
169        let _ = conn.execute_batch(
170            "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
171        );
172        let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
173
174        // FTS5 virtual table for full-text search over event payloads (idempotent).
175        let _ = conn.execute_batch(
176            "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
177               payload_text,
178               content=context_events,
179               content_rowid=id
180             );",
181        );
182
183        let (tx, _) = broadcast::channel(1024);
184        Self {
185            inner: Arc::new(Inner {
186                conn: Mutex::new(conn),
187                tx,
188            }),
189        }
190    }
191
192    pub fn subscribe(&self) -> broadcast::Receiver<ContextEventV1> {
193        self.inner.tx.subscribe()
194    }
195
196    pub fn append(
197        &self,
198        workspace_id: &str,
199        channel_id: &str,
200        kind: &ContextEventKindV1,
201        actor: Option<&str>,
202        payload: Value,
203    ) -> Option<ContextEventV1> {
204        self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
205    }
206
207    pub fn append_with_parent(
208        &self,
209        workspace_id: &str,
210        channel_id: &str,
211        kind: &ContextEventKindV1,
212        actor: Option<&str>,
213        payload: Value,
214        parent_id: Option<i64>,
215    ) -> Option<ContextEventV1> {
216        let ts = Utc::now();
217        let payload_json = payload.to_string();
218
219        let (id, version) = {
220            let Ok(conn) = self.inner.conn.lock() else {
221                return None;
222            };
223            let version: i64 = conn
224                .query_row(
225                    "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
226                    params![workspace_id, channel_id],
227                    |row| row.get(0),
228                )
229                .unwrap_or(0)
230                + 1;
231            let _ = conn.execute(
232                "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
233                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
234                params![
235                    workspace_id,
236                    channel_id,
237                    kind.as_str(),
238                    actor.map(str::to_string),
239                    ts.to_rfc3339(),
240                    payload_json,
241                    version,
242                    parent_id,
243                ],
244            );
245            let rowid = conn.last_insert_rowid();
246            let _ = conn.execute(
247                "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
248                params![rowid, payload_json],
249            );
250            (rowid, version)
251        };
252
253        let ev = ContextEventV1 {
254            id,
255            workspace_id: workspace_id.to_string(),
256            channel_id: channel_id.to_string(),
257            consistency_level: kind.consistency_level().as_str().to_string(),
258            kind: kind.as_str().to_string(),
259            actor: actor.map(str::to_string),
260            timestamp: ts,
261            version,
262            parent_id,
263            payload,
264        };
265        let _ = self.inner.tx.send(ev.clone());
266        Some(ev)
267    }
268
269    pub fn read(
270        &self,
271        workspace_id: &str,
272        channel_id: &str,
273        since: i64,
274        limit: usize,
275    ) -> Vec<ContextEventV1> {
276        let limit = limit.clamp(1, 1000) as i64;
277        let Ok(conn) = self.inner.conn.lock() else {
278            return Vec::new();
279        };
280        let Ok(mut stmt) = conn.prepare(
281            "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
282             FROM context_events
283             WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
284             ORDER BY id ASC
285             LIMIT ?4",
286        ) else {
287            return Vec::new();
288        };
289        let rows = stmt
290            .query_map(
291                params![workspace_id, channel_id, since, limit],
292                event_from_row,
293            )
294            .ok();
295        let Some(rows) = rows else {
296            return Vec::new();
297        };
298        rows.flatten().collect()
299    }
300
301    /// Query recent events of a specific kind (for conflict detection).
302    pub fn recent_by_kind(
303        &self,
304        workspace_id: &str,
305        channel_id: &str,
306        kind: &str,
307        limit: usize,
308    ) -> Vec<ContextEventV1> {
309        let limit = limit.clamp(1, 100) as i64;
310        let Ok(conn) = self.inner.conn.lock() else {
311            return Vec::new();
312        };
313        let Ok(mut stmt) = conn.prepare(
314            "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
315             FROM context_events
316             WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
317             ORDER BY id DESC
318             LIMIT ?4",
319        ) else {
320            return Vec::new();
321        };
322        let rows = stmt
323            .query_map(
324                params![workspace_id, channel_id, kind, limit],
325                event_from_row,
326            )
327            .ok();
328        rows.map(|r| r.flatten().collect()).unwrap_or_default()
329    }
330
331    /// Full-text search over event payloads via FTS5.
332    pub fn search(&self, workspace_id: &str, query: &str, limit: usize) -> Vec<ContextEventV1> {
333        let limit = limit.clamp(1, 100) as i64;
334        let Ok(conn) = self.inner.conn.lock() else {
335            return Vec::new();
336        };
337        let Ok(mut stmt) = conn.prepare(
338            "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
339                    e.payload_json, e.version, e.parent_id
340             FROM context_events e
341             JOIN context_events_fts f ON e.id = f.rowid
342             WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
343             ORDER BY f.rank
344             LIMIT ?3",
345        ) else {
346            return Vec::new();
347        };
348        let rows = stmt
349            .query_map(params![query, workspace_id, limit], event_from_row)
350            .ok();
351        rows.map(|r| r.flatten().collect()).unwrap_or_default()
352    }
353
354    /// Trace the causal lineage of an event by following parent_id chains.
355    pub fn lineage(&self, event_id: i64, max_depth: usize) -> Vec<ContextEventV1> {
356        let max_depth = max_depth.clamp(1, 50);
357        let Ok(conn) = self.inner.conn.lock() else {
358            return Vec::new();
359        };
360        let mut chain = Vec::new();
361        let mut current_id = Some(event_id);
362
363        for _ in 0..max_depth {
364            let Some(id) = current_id else {
365                break;
366            };
367            let ev = conn.query_row(
368                "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
369                 FROM context_events WHERE id = ?1",
370                params![id],
371                event_from_row,
372            );
373            match ev {
374                Ok(ev) => {
375                    current_id = ev.parent_id;
376                    chain.push(ev);
377                }
378                Err(_) => break,
379            }
380        }
381        chain
382    }
383
384    /// Returns the highest event id for a workspace/channel pair, or 0 if none.
385    pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
386        let Ok(conn) = self.inner.conn.lock() else {
387            return 0;
388        };
389        conn.query_row(
390            "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
391            params![workspace_id, channel_id],
392            |row| row.get(0),
393        )
394        .unwrap_or(0)
395    }
396}
397
398fn default_db_path() -> PathBuf {
399    let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
400    data.join("context-os").join("context-os.db")
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406
407    #[test]
408    fn append_and_read_roundtrip() {
409        let bus = ContextBus::new();
410        let ev = bus
411            .append(
412                "ws",
413                "ch",
414                &ContextEventKindV1::ToolCallRecorded,
415                Some("agent"),
416                serde_json::json!({"tool":"ctx_read"}),
417            )
418            .expect("append");
419        let got = bus.read("ws", "ch", ev.id - 1, 10);
420        assert!(got.iter().any(|e| e.id == ev.id));
421    }
422
423    #[test]
424    fn multi_client_concurrent_appends_have_deterministic_ordering() {
425        let bus = Arc::new(ContextBus::new());
426        let n_clients = 5;
427        let n_events_per_client = 20;
428        let ws = format!("ws-concurrent-{}", std::process::id());
429        let ch = format!("ch-concurrent-{}", std::process::id());
430
431        let mut handles = vec![];
432        for client_idx in 0..n_clients {
433            let bus = Arc::clone(&bus);
434            let ws = ws.clone();
435            let ch = ch.clone();
436            handles.push(std::thread::spawn(move || {
437                let agent = format!("agent-{client_idx}");
438                for event_idx in 0..n_events_per_client {
439                    bus.append(
440                        &ws,
441                        &ch,
442                        &ContextEventKindV1::ToolCallRecorded,
443                        Some(&agent),
444                        serde_json::json!({"client": client_idx, "seq": event_idx}),
445                    );
446                }
447            }));
448        }
449
450        for h in handles {
451            h.join().unwrap();
452        }
453
454        let all = bus.read(&ws, &ch, 0, 1000);
455        assert_eq!(
456            all.len(),
457            n_clients * n_events_per_client,
458            "all events should be persisted"
459        );
460
461        let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
462        let mut sorted = ids.clone();
463        sorted.sort_unstable();
464        assert_eq!(ids, sorted, "events must be in strictly ascending ID order");
465
466        for win in ids.windows(2) {
467            assert!(
468                win[1] > win[0],
469                "IDs must be strictly monotonic (no gaps from concurrent access)"
470            );
471        }
472    }
473
474    #[test]
475    fn workspace_channel_isolation() {
476        let bus = ContextBus::new();
477        let pid = std::process::id();
478        let ws_a = format!("ws-iso-a-{pid}");
479        let ws_b = format!("ws-iso-b-{pid}");
480        let ws_c = format!("ws-iso-c-{pid}");
481        let ch1 = format!("ch-iso-1-{pid}");
482        let ch2 = format!("ch-iso-2-{pid}");
483
484        bus.append(
485            &ws_a,
486            &ch1,
487            &ContextEventKindV1::SessionMutated,
488            Some("agent-a"),
489            serde_json::json!({"ws":"a","ch":"1"}),
490        );
491        bus.append(
492            &ws_a,
493            &ch2,
494            &ContextEventKindV1::KnowledgeRemembered,
495            Some("agent-a"),
496            serde_json::json!({"ws":"a","ch":"2"}),
497        );
498        bus.append(
499            &ws_b,
500            &ch1,
501            &ContextEventKindV1::ArtifactStored,
502            Some("agent-b"),
503            serde_json::json!({"ws":"b","ch":"1"}),
504        );
505
506        let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
507        assert_eq!(ws_a_ch_1.len(), 1);
508        assert_eq!(ws_a_ch_1[0].kind, "session_mutated");
509
510        let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
511        assert_eq!(ws_a_ch_2.len(), 1);
512        assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");
513
514        let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
515        assert_eq!(ws_b_ch_1.len(), 1);
516        assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");
517
518        let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
519        assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
520    }
521
522    #[test]
523    fn replay_from_cursor_returns_only_newer_events() {
524        let bus = ContextBus::new();
525        let pid = std::process::id();
526        let ws = &format!("ws-replay-{pid}");
527        let ch = &format!("ch-replay-{pid}");
528
529        let ev1 = bus
530            .append(
531                ws,
532                ch,
533                &ContextEventKindV1::ToolCallRecorded,
534                None,
535                serde_json::json!({"seq":1}),
536            )
537            .unwrap();
538        let ev2 = bus
539            .append(
540                ws,
541                ch,
542                &ContextEventKindV1::SessionMutated,
543                None,
544                serde_json::json!({"seq":2}),
545            )
546            .unwrap();
547        let _ev3 = bus
548            .append(
549                ws,
550                ch,
551                &ContextEventKindV1::GraphBuilt,
552                None,
553                serde_json::json!({"seq":3}),
554            )
555            .unwrap();
556
557        let from_cursor = bus.read(ws, ch, ev2.id, 100);
558        assert_eq!(from_cursor.len(), 1, "only events after cursor");
559        assert_eq!(from_cursor[0].kind, "graph_built");
560
561        let from_first = bus.read(ws, ch, ev1.id, 100);
562        assert_eq!(from_first.len(), 2, "events after first");
563
564        let from_zero = bus.read(ws, ch, 0, 100);
565        assert_eq!(from_zero.len(), 3, "all events from zero");
566    }
567
568    #[test]
569    fn broadcast_subscriber_receives_events() {
570        let bus = ContextBus::new();
571        let mut rx = bus.subscribe();
572
573        let ev = bus
574            .append(
575                "ws",
576                "ch",
577                &ContextEventKindV1::ProofAdded,
578                Some("verifier"),
579                serde_json::json!({"proof":"hash"}),
580            )
581            .unwrap();
582
583        let received = rx.try_recv().expect("subscriber should receive event");
584        assert_eq!(received.id, ev.id);
585        assert_eq!(received.kind, "proof_added");
586        assert_eq!(received.actor.as_deref(), Some("verifier"));
587    }
588}