// lean_ctx/core/context_os/context_bus.rs

1use std::path::PathBuf;
2use std::sync::{Arc, Mutex};
3
4use chrono::{DateTime, Utc};
5use rusqlite::{params, Connection};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use tokio::sync::broadcast;
9
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
11#[serde(rename_all = "snake_case")]
12pub enum ContextEventKindV1 {
13    ToolCallRecorded,
14    SessionMutated,
15    KnowledgeRemembered,
16    ArtifactStored,
17    GraphBuilt,
18    ProofAdded,
19}
20
21impl ContextEventKindV1 {
22    pub fn as_str(&self) -> &'static str {
23        match self {
24            Self::ToolCallRecorded => "tool_call_recorded",
25            Self::SessionMutated => "session_mutated",
26            Self::KnowledgeRemembered => "knowledge_remembered",
27            Self::ArtifactStored => "artifact_stored",
28            Self::GraphBuilt => "graph_built",
29            Self::ProofAdded => "proof_added",
30        }
31    }
32
33    pub fn parse(s: &str) -> Self {
34        match s.trim().to_lowercase().as_str() {
35            "session_mutated" => Self::SessionMutated,
36            "knowledge_remembered" => Self::KnowledgeRemembered,
37            "artifact_stored" => Self::ArtifactStored,
38            "graph_built" => Self::GraphBuilt,
39            "proof_added" => Self::ProofAdded,
40            _ => Self::ToolCallRecorded,
41        }
42    }
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46#[serde(rename_all = "camelCase")]
47pub struct ContextEventV1 {
48    pub id: i64,
49    pub workspace_id: String,
50    pub channel_id: String,
51    pub kind: String,
52    pub actor: Option<String>,
53    pub timestamp: DateTime<Utc>,
54    pub payload: Value,
55}
56
57#[derive(Clone)]
58pub struct ContextBus {
59    inner: Arc<Inner>,
60}
61
62struct Inner {
63    conn: Mutex<Connection>,
64    tx: broadcast::Sender<ContextEventV1>,
65}
66
67impl Default for ContextBus {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl ContextBus {
74    pub fn new() -> Self {
75        let path = default_db_path();
76        if let Some(parent) = path.parent() {
77            let _ = std::fs::create_dir_all(parent);
78        }
79        let conn = Connection::open(path).expect("open context-os db");
80        conn.execute_batch(
81            "PRAGMA journal_mode=WAL;
82             CREATE TABLE IF NOT EXISTS context_events (
83               id INTEGER PRIMARY KEY AUTOINCREMENT,
84               workspace_id TEXT NOT NULL,
85               channel_id TEXT NOT NULL,
86               kind TEXT NOT NULL,
87               actor TEXT,
88               timestamp TEXT NOT NULL,
89               payload_json TEXT NOT NULL
90             );
91             CREATE INDEX IF NOT EXISTS idx_context_events_stream
92               ON context_events(workspace_id, channel_id, id);",
93        )
94        .expect("init context-os db");
95
96        let (tx, _) = broadcast::channel(1024);
97        Self {
98            inner: Arc::new(Inner {
99                conn: Mutex::new(conn),
100                tx,
101            }),
102        }
103    }
104
105    pub fn subscribe(&self) -> broadcast::Receiver<ContextEventV1> {
106        self.inner.tx.subscribe()
107    }
108
109    pub fn append(
110        &self,
111        workspace_id: &str,
112        channel_id: &str,
113        kind: &ContextEventKindV1,
114        actor: Option<&str>,
115        payload: Value,
116    ) -> Option<ContextEventV1> {
117        let ts = Utc::now();
118        let payload_json = payload.to_string();
119
120        let id = {
121            let Ok(conn) = self.inner.conn.lock() else {
122                return None;
123            };
124            let _ = conn.execute(
125                "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json)
126                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
127                params![
128                    workspace_id,
129                    channel_id,
130                    kind.as_str(),
131                    actor.map(str::to_string),
132                    ts.to_rfc3339(),
133                    payload_json
134                ],
135            );
136            conn.last_insert_rowid()
137        };
138
139        let ev = ContextEventV1 {
140            id,
141            workspace_id: workspace_id.to_string(),
142            channel_id: channel_id.to_string(),
143            kind: kind.as_str().to_string(),
144            actor: actor.map(str::to_string),
145            timestamp: ts,
146            payload,
147        };
148        let _ = self.inner.tx.send(ev.clone());
149        Some(ev)
150    }
151
152    pub fn read(
153        &self,
154        workspace_id: &str,
155        channel_id: &str,
156        since: i64,
157        limit: usize,
158    ) -> Vec<ContextEventV1> {
159        let limit = limit.clamp(1, 1000) as i64;
160        let Ok(conn) = self.inner.conn.lock() else {
161            return Vec::new();
162        };
163        let Ok(mut stmt) = conn.prepare(
164            "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json
165             FROM context_events
166             WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
167             ORDER BY id ASC
168             LIMIT ?4",
169        ) else {
170            return Vec::new();
171        };
172        let rows = stmt
173            .query_map(params![workspace_id, channel_id, since, limit], |row| {
174                let ts_str: String = row.get(5)?;
175                let ts = DateTime::parse_from_rfc3339(&ts_str)
176                    .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
177                let payload_str: String = row.get(6)?;
178                let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
179                Ok(ContextEventV1 {
180                    id: row.get(0)?,
181                    workspace_id: row.get(1)?,
182                    channel_id: row.get(2)?,
183                    kind: row.get(3)?,
184                    actor: row.get::<_, Option<String>>(4)?,
185                    timestamp: ts,
186                    payload,
187                })
188            })
189            .ok();
190        let Some(rows) = rows else {
191            return Vec::new();
192        };
193        rows.flatten().collect()
194    }
195}
196
197fn default_db_path() -> PathBuf {
198    let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
199    data.join("context-os").join("context-os.db")
200}
201
#[cfg(test)]
mod tests {
    //! Tests for `ContextBus`.
    //!
    //! `ContextBus::new()` opens the persistent default database, so rows
    //! written by earlier test runs are still present. Tests that assert exact
    //! event counts therefore use stream names from `unique_suffix()`.

    use super::*;

    /// Returns a suffix that differs between test runs and between calls
    /// within one run.
    ///
    /// The process id alone is not enough because ids are reused across runs;
    /// the wall-clock nanoseconds and an in-process counter make a collision
    /// with leftover rows practically impossible.
    fn unique_suffix() -> String {
        use std::sync::atomic::{AtomicU64, Ordering};

        static COUNTER: AtomicU64 = AtomicU64::new(0);

        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_nanos());
        let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
        format!("{}-{nanos}-{seq}", std::process::id())
    }

    #[test]
    fn append_and_read_roundtrip() {
        let bus = ContextBus::new();
        let ev = bus
            .append(
                "ws",
                "ch",
                &ContextEventKindV1::ToolCallRecorded,
                Some("agent"),
                serde_json::json!({"tool":"ctx_read"}),
            )
            .expect("append");

        // Reading from just before the new id must yield the event first.
        let got = bus.read("ws", "ch", ev.id - 1, 10);
        let stored = got
            .iter()
            .find(|e| e.id == ev.id)
            .expect("appended event should be readable");
        assert_eq!(stored.kind, "tool_call_recorded");
        assert_eq!(stored.actor.as_deref(), Some("agent"));
        assert_eq!(stored.payload, serde_json::json!({"tool":"ctx_read"}));
    }

    #[test]
    fn multi_client_concurrent_appends_have_deterministic_ordering() {
        let bus = Arc::new(ContextBus::new());
        let n_clients = 5;
        let n_events_per_client = 20;
        let suffix = unique_suffix();
        let ws = format!("ws-concurrent-{suffix}");
        let ch = format!("ch-concurrent-{suffix}");

        let handles: Vec<_> = (0..n_clients)
            .map(|client_idx| {
                let bus = Arc::clone(&bus);
                let ws = ws.clone();
                let ch = ch.clone();
                std::thread::spawn(move || {
                    let agent = format!("agent-{client_idx}");
                    for event_idx in 0..n_events_per_client {
                        bus.append(
                            &ws,
                            &ch,
                            &ContextEventKindV1::ToolCallRecorded,
                            Some(&agent),
                            serde_json::json!({"client": client_idx, "seq": event_idx}),
                        )
                        .expect("concurrent append should succeed");
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let all = bus.read(&ws, &ch, 0, 1000);
        assert_eq!(
            all.len(),
            n_clients * n_events_per_client,
            "all events should be persisted"
        );

        // Strictly increasing implies both "sorted" and "no duplicates". Gaps
        // are fine: ids are table-wide and other streams may interleave.
        let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
        assert!(
            ids.windows(2).all(|pair| pair[0] < pair[1]),
            "events must be returned in strictly ascending ID order (no duplicates)"
        );
    }

    #[test]
    fn workspace_channel_isolation() {
        let bus = ContextBus::new();
        let suffix = unique_suffix();
        let ws_a = format!("ws-iso-a-{suffix}");
        let ws_b = format!("ws-iso-b-{suffix}");
        let ws_c = format!("ws-iso-c-{suffix}");
        let ch1 = format!("ch-iso-1-{suffix}");
        let ch2 = format!("ch-iso-2-{suffix}");

        bus.append(
            &ws_a,
            &ch1,
            &ContextEventKindV1::SessionMutated,
            Some("agent-a"),
            serde_json::json!({"ws":"a","ch":"1"}),
        )
        .expect("append a/1");
        bus.append(
            &ws_a,
            &ch2,
            &ContextEventKindV1::KnowledgeRemembered,
            Some("agent-a"),
            serde_json::json!({"ws":"a","ch":"2"}),
        )
        .expect("append a/2");
        bus.append(
            &ws_b,
            &ch1,
            &ContextEventKindV1::ArtifactStored,
            Some("agent-b"),
            serde_json::json!({"ws":"b","ch":"1"}),
        )
        .expect("append b/1");

        let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
        assert_eq!(ws_a_ch_1.len(), 1);
        assert_eq!(ws_a_ch_1[0].kind, "session_mutated");

        let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
        assert_eq!(ws_a_ch_2.len(), 1);
        assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");

        let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
        assert_eq!(ws_b_ch_1.len(), 1);
        assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");

        let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
        assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
    }

    #[test]
    fn replay_from_cursor_returns_only_newer_events() {
        let bus = ContextBus::new();
        let suffix = unique_suffix();
        let ws = format!("ws-replay-{suffix}");
        let ch = format!("ch-replay-{suffix}");

        let ev1 = bus
            .append(
                &ws,
                &ch,
                &ContextEventKindV1::ToolCallRecorded,
                None,
                serde_json::json!({"seq":1}),
            )
            .unwrap();
        let ev2 = bus
            .append(
                &ws,
                &ch,
                &ContextEventKindV1::SessionMutated,
                None,
                serde_json::json!({"seq":2}),
            )
            .unwrap();
        let _ev3 = bus
            .append(
                &ws,
                &ch,
                &ContextEventKindV1::GraphBuilt,
                None,
                serde_json::json!({"seq":3}),
            )
            .unwrap();

        let from_cursor = bus.read(&ws, &ch, ev2.id, 100);
        assert_eq!(from_cursor.len(), 1, "only events after cursor");
        assert_eq!(from_cursor[0].kind, "graph_built");

        let from_first = bus.read(&ws, &ch, ev1.id, 100);
        assert_eq!(from_first.len(), 2, "events after first");

        let from_zero = bus.read(&ws, &ch, 0, 100);
        assert_eq!(from_zero.len(), 3, "all events from zero");
    }

    #[test]
    fn broadcast_subscriber_receives_events() {
        let bus = ContextBus::new();
        // Subscribe before appending so the event is delivered to `rx`.
        let mut rx = bus.subscribe();

        let ev = bus
            .append(
                "ws",
                "ch",
                &ContextEventKindV1::ProofAdded,
                Some("verifier"),
                serde_json::json!({"proof":"hash"}),
            )
            .unwrap();

        let received = rx.try_recv().expect("subscriber should receive event");
        assert_eq!(received.id, ev.id);
        assert_eq!(received.kind, "proof_added");
        assert_eq!(received.actor.as_deref(), Some("verifier"));
    }
}