1use std::path::PathBuf;
2use std::sync::{Arc, Mutex};
3
4use chrono::{DateTime, Utc};
5use rusqlite::{params, Connection};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use tokio::sync::broadcast;
9
/// The kinds of events that can be published on the context bus.
///
/// With `rename_all = "snake_case"`, serde (de)serializes each variant under
/// its snake_case name, which is the same string `as_str` returns.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ContextEventKindV1 {
    /// Wire name: `tool_call_recorded`.
    ToolCallRecorded,
    /// Wire name: `session_mutated`.
    SessionMutated,
    /// Wire name: `knowledge_remembered`.
    KnowledgeRemembered,
    /// Wire name: `artifact_stored`.
    ArtifactStored,
    /// Wire name: `graph_built`.
    GraphBuilt,
    /// Wire name: `proof_added`.
    ProofAdded,
}
20
21impl ContextEventKindV1 {
22 pub fn as_str(&self) -> &'static str {
23 match self {
24 Self::ToolCallRecorded => "tool_call_recorded",
25 Self::SessionMutated => "session_mutated",
26 Self::KnowledgeRemembered => "knowledge_remembered",
27 Self::ArtifactStored => "artifact_stored",
28 Self::GraphBuilt => "graph_built",
29 Self::ProofAdded => "proof_added",
30 }
31 }
32
33 pub fn parse(s: &str) -> Self {
34 match s.trim().to_lowercase().as_str() {
35 "session_mutated" => Self::SessionMutated,
36 "knowledge_remembered" => Self::KnowledgeRemembered,
37 "artifact_stored" => Self::ArtifactStored,
38 "graph_built" => Self::GraphBuilt,
39 "proof_added" => Self::ProofAdded,
40 _ => Self::ToolCallRecorded,
41 }
42 }
43}
44
/// A single event as stored in the `context_events` table and as delivered to
/// broadcast subscribers.
///
/// Serialized with camelCase field names (e.g. `workspaceId`, `channelId`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ContextEventV1 {
    /// Row id of the event in `context_events` (an `AUTOINCREMENT` primary key).
    pub id: i64,
    /// Workspace the event belongs to; part of the stream key used by `read`.
    pub workspace_id: String,
    /// Channel within the workspace; part of the stream key used by `read`.
    pub channel_id: String,
    /// Event kind as a string; `append` writes `ContextEventKindV1::as_str()` here.
    pub kind: String,
    /// Optional identifier of whoever produced the event (stored as nullable text).
    pub actor: Option<String>,
    /// Time the event was appended, in UTC; persisted as an RFC 3339 string.
    pub timestamp: DateTime<Utc>,
    /// Arbitrary JSON payload; persisted as its string serialization.
    pub payload: Value,
}
56
/// Handle to the context event bus.
///
/// Cloning is cheap: every clone holds the same `Arc<Inner>` and therefore
/// shares one database connection and one broadcast channel.
#[derive(Clone)]
pub struct ContextBus {
    /// Shared state behind the handle.
    inner: Arc<Inner>,
}
61
/// State shared by all clones of a `ContextBus`.
struct Inner {
    /// SQLite connection holding the `context_events` table; the mutex
    /// serializes access to it.
    conn: Mutex<Connection>,
    /// Sending half of the broadcast channel on which appended events are published.
    tx: broadcast::Sender<ContextEventV1>,
}
66
67impl Default for ContextBus {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl ContextBus {
74 pub fn new() -> Self {
75 let path = default_db_path();
76 if let Some(parent) = path.parent() {
77 let _ = std::fs::create_dir_all(parent);
78 }
79 let conn = Connection::open(path).expect("open context-os db");
80 conn.execute_batch(
81 "PRAGMA journal_mode=WAL;
82 CREATE TABLE IF NOT EXISTS context_events (
83 id INTEGER PRIMARY KEY AUTOINCREMENT,
84 workspace_id TEXT NOT NULL,
85 channel_id TEXT NOT NULL,
86 kind TEXT NOT NULL,
87 actor TEXT,
88 timestamp TEXT NOT NULL,
89 payload_json TEXT NOT NULL
90 );
91 CREATE INDEX IF NOT EXISTS idx_context_events_stream
92 ON context_events(workspace_id, channel_id, id);",
93 )
94 .expect("init context-os db");
95
96 let (tx, _) = broadcast::channel(1024);
97 Self {
98 inner: Arc::new(Inner {
99 conn: Mutex::new(conn),
100 tx,
101 }),
102 }
103 }
104
105 pub fn subscribe(&self) -> broadcast::Receiver<ContextEventV1> {
106 self.inner.tx.subscribe()
107 }
108
109 pub fn append(
110 &self,
111 workspace_id: &str,
112 channel_id: &str,
113 kind: &ContextEventKindV1,
114 actor: Option<&str>,
115 payload: Value,
116 ) -> Option<ContextEventV1> {
117 let ts = Utc::now();
118 let payload_json = payload.to_string();
119
120 let id = {
121 let Ok(conn) = self.inner.conn.lock() else {
122 return None;
123 };
124 let _ = conn.execute(
125 "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json)
126 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
127 params![
128 workspace_id,
129 channel_id,
130 kind.as_str(),
131 actor.map(str::to_string),
132 ts.to_rfc3339(),
133 payload_json
134 ],
135 );
136 conn.last_insert_rowid()
137 };
138
139 let ev = ContextEventV1 {
140 id,
141 workspace_id: workspace_id.to_string(),
142 channel_id: channel_id.to_string(),
143 kind: kind.as_str().to_string(),
144 actor: actor.map(str::to_string),
145 timestamp: ts,
146 payload,
147 };
148 let _ = self.inner.tx.send(ev.clone());
149 Some(ev)
150 }
151
152 pub fn read(
153 &self,
154 workspace_id: &str,
155 channel_id: &str,
156 since: i64,
157 limit: usize,
158 ) -> Vec<ContextEventV1> {
159 let limit = limit.clamp(1, 1000) as i64;
160 let Ok(conn) = self.inner.conn.lock() else {
161 return Vec::new();
162 };
163 let Ok(mut stmt) = conn.prepare(
164 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json
165 FROM context_events
166 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
167 ORDER BY id ASC
168 LIMIT ?4",
169 ) else {
170 return Vec::new();
171 };
172 let rows = stmt
173 .query_map(params![workspace_id, channel_id, since, limit], |row| {
174 let ts_str: String = row.get(5)?;
175 let ts = DateTime::parse_from_rfc3339(&ts_str)
176 .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
177 let payload_str: String = row.get(6)?;
178 let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
179 Ok(ContextEventV1 {
180 id: row.get(0)?,
181 workspace_id: row.get(1)?,
182 channel_id: row.get(2)?,
183 kind: row.get(3)?,
184 actor: row.get::<_, Option<String>>(4)?,
185 timestamp: ts,
186 payload,
187 })
188 })
189 .ok();
190 let Some(rows) = rows else {
191 return Vec::new();
192 };
193 rows.flatten().collect()
194 }
195}
196
197fn default_db_path() -> PathBuf {
198 let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
199 data.join("context-os").join("context-os.db")
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Returns a suffix that differs on every call.
    ///
    /// `ContextBus::new()` opens the on-disk database at the default path, so
    /// rows written by earlier test runs are still visible to later ones. A
    /// PID alone can be reused between runs, which would break the tests that
    /// assert exact row counts; the clock reading and the per-process counter
    /// keep stream names distinct across runs and across calls.
    fn unique_suffix() -> String {
        static NEXT: AtomicU64 = AtomicU64::new(0);
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |d| d.as_nanos());
        let seq = NEXT.fetch_add(1, Ordering::SeqCst);
        format!("{}-{nanos}-{seq}", std::process::id())
    }

    /// An appended event can be read back from its stream.
    #[test]
    fn append_and_read_roundtrip() {
        let bus = ContextBus::new();
        let ev = bus
            .append(
                "ws",
                "ch",
                &ContextEventKindV1::ToolCallRecorded,
                Some("agent"),
                serde_json::json!({"tool":"ctx_read"}),
            )
            .expect("append");
        let got = bus.read("ws", "ch", ev.id - 1, 10);
        assert!(got.iter().any(|e| e.id == ev.id));
    }

    /// Appends from several threads are all persisted and are read back with
    /// strictly increasing ids.
    #[test]
    fn multi_client_concurrent_appends_have_deterministic_ordering() {
        let bus = Arc::new(ContextBus::new());
        let n_clients = 5;
        let n_events_per_client = 20;
        let suffix = unique_suffix();
        let ws = format!("ws-concurrent-{suffix}");
        let ch = format!("ch-concurrent-{suffix}");

        let mut handles = vec![];
        for client_idx in 0..n_clients {
            let bus = Arc::clone(&bus);
            let ws = ws.clone();
            let ch = ch.clone();
            handles.push(std::thread::spawn(move || {
                let agent = format!("agent-{client_idx}");
                for event_idx in 0..n_events_per_client {
                    // A failed append panics here and surfaces through `join`.
                    bus.append(
                        &ws,
                        &ch,
                        &ContextEventKindV1::ToolCallRecorded,
                        Some(&agent),
                        serde_json::json!({"client": client_idx, "seq": event_idx}),
                    )
                    .expect("concurrent append");
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let all = bus.read(&ws, &ch, 0, 1000);
        assert_eq!(
            all.len(),
            n_clients * n_events_per_client,
            "all events should be persisted"
        );

        let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
        // Strictly increasing implies both "sorted" and "no duplicate ids".
        // Gaps are allowed: other streams share the same id sequence.
        for win in ids.windows(2) {
            assert!(
                win[1] > win[0],
                "IDs must be strictly increasing (ordered, no duplicates)"
            );
        }
    }

    /// Events are only visible in the workspace/channel they were appended to.
    #[test]
    fn workspace_channel_isolation() {
        let bus = ContextBus::new();
        let suffix = unique_suffix();
        let ws_a = format!("ws-iso-a-{suffix}");
        let ws_b = format!("ws-iso-b-{suffix}");
        let ws_c = format!("ws-iso-c-{suffix}");
        let ch1 = format!("ch-iso-1-{suffix}");
        let ch2 = format!("ch-iso-2-{suffix}");

        bus.append(
            &ws_a,
            &ch1,
            &ContextEventKindV1::SessionMutated,
            Some("agent-a"),
            serde_json::json!({"ws":"a","ch":"1"}),
        )
        .expect("append a/1");
        bus.append(
            &ws_a,
            &ch2,
            &ContextEventKindV1::KnowledgeRemembered,
            Some("agent-a"),
            serde_json::json!({"ws":"a","ch":"2"}),
        )
        .expect("append a/2");
        bus.append(
            &ws_b,
            &ch1,
            &ContextEventKindV1::ArtifactStored,
            Some("agent-b"),
            serde_json::json!({"ws":"b","ch":"1"}),
        )
        .expect("append b/1");

        let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
        assert_eq!(ws_a_ch_1.len(), 1);
        assert_eq!(ws_a_ch_1[0].kind, "session_mutated");

        let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
        assert_eq!(ws_a_ch_2.len(), 1);
        assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");

        let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
        assert_eq!(ws_b_ch_1.len(), 1);
        assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");

        let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
        assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
    }

    /// Reading with a cursor returns only events with a larger id.
    #[test]
    fn replay_from_cursor_returns_only_newer_events() {
        let bus = ContextBus::new();
        let suffix = unique_suffix();
        let ws = &format!("ws-replay-{suffix}");
        let ch = &format!("ch-replay-{suffix}");

        let ev1 = bus
            .append(
                ws,
                ch,
                &ContextEventKindV1::ToolCallRecorded,
                None,
                serde_json::json!({"seq":1}),
            )
            .unwrap();
        let ev2 = bus
            .append(
                ws,
                ch,
                &ContextEventKindV1::SessionMutated,
                None,
                serde_json::json!({"seq":2}),
            )
            .unwrap();
        let _ev3 = bus
            .append(
                ws,
                ch,
                &ContextEventKindV1::GraphBuilt,
                None,
                serde_json::json!({"seq":3}),
            )
            .unwrap();

        let from_cursor = bus.read(ws, ch, ev2.id, 100);
        assert_eq!(from_cursor.len(), 1, "only events after cursor");
        assert_eq!(from_cursor[0].kind, "graph_built");

        let from_first = bus.read(ws, ch, ev1.id, 100);
        assert_eq!(from_first.len(), 2, "events after first");

        let from_zero = bus.read(ws, ch, 0, 100);
        assert_eq!(from_zero.len(), 3, "all events from zero");
    }

    /// A subscriber created before an append receives that event.
    #[test]
    fn broadcast_subscriber_receives_events() {
        let bus = ContextBus::new();
        let mut rx = bus.subscribe();

        let ev = bus
            .append(
                "ws",
                "ch",
                &ContextEventKindV1::ProofAdded,
                Some("verifier"),
                serde_json::json!({"proof":"hash"}),
            )
            .unwrap();

        let received = rx.try_recv().expect("subscriber should receive event");
        assert_eq!(received.id, ev.id);
        assert_eq!(received.kind, "proof_added");
        assert_eq!(received.actor.as_deref(), Some("verifier"));
    }
}