1use std::path::PathBuf;
2use std::sync::{Arc, Mutex};
3
4use chrono::{DateTime, Utc};
5use rusqlite::{params, Connection};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use tokio::sync::broadcast;
9
10#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
11#[serde(rename_all = "snake_case")]
12pub enum ContextEventKindV1 {
13 ToolCallRecorded,
14 SessionMutated,
15 KnowledgeRemembered,
16 ArtifactStored,
17 GraphBuilt,
18 ProofAdded,
19}
20
21impl ContextEventKindV1 {
22 pub fn as_str(&self) -> &'static str {
23 match self {
24 Self::ToolCallRecorded => "tool_call_recorded",
25 Self::SessionMutated => "session_mutated",
26 Self::KnowledgeRemembered => "knowledge_remembered",
27 Self::ArtifactStored => "artifact_stored",
28 Self::GraphBuilt => "graph_built",
29 Self::ProofAdded => "proof_added",
30 }
31 }
32
33 pub fn parse(s: &str) -> Self {
34 match s.trim().to_lowercase().as_str() {
35 "session_mutated" => Self::SessionMutated,
36 "knowledge_remembered" => Self::KnowledgeRemembered,
37 "artifact_stored" => Self::ArtifactStored,
38 "graph_built" => Self::GraphBuilt,
39 "proof_added" => Self::ProofAdded,
40 _ => Self::ToolCallRecorded,
41 }
42 }
43
44 pub fn consistency_level(&self) -> ConsistencyLevel {
50 match self {
51 Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
52 Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
53 Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
54 }
55 }
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
60#[serde(rename_all = "snake_case")]
61pub enum ConsistencyLevel {
62 Local,
64 Eventual,
66 Strong,
68}
69
70impl ConsistencyLevel {
71 pub fn as_str(&self) -> &'static str {
72 match self {
73 Self::Local => "local",
74 Self::Eventual => "eventual",
75 Self::Strong => "strong",
76 }
77 }
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
81#[serde(rename_all = "camelCase")]
82pub struct ContextEventV1 {
83 pub id: i64,
84 pub workspace_id: String,
85 pub channel_id: String,
86 pub kind: String,
87 pub actor: Option<String>,
88 pub timestamp: DateTime<Utc>,
89 pub version: i64,
90 #[serde(skip_serializing_if = "Option::is_none")]
91 pub parent_id: Option<i64>,
92 pub consistency_level: String,
93 pub payload: Value,
94}
95
96impl ContextEventV1 {
97 pub fn consistency(&self) -> ConsistencyLevel {
98 ContextEventKindV1::parse(&self.kind).consistency_level()
99 }
100}
101
102fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
103 let ts_str: String = row.get(5)?;
104 let ts = DateTime::parse_from_rfc3339(&ts_str)
105 .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
106 let payload_str: String = row.get(6)?;
107 let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
108 let kind_str: String = row.get(3)?;
109 let cl = ContextEventKindV1::parse(&kind_str)
110 .consistency_level()
111 .as_str()
112 .to_string();
113 Ok(ContextEventV1 {
114 id: row.get(0)?,
115 workspace_id: row.get(1)?,
116 channel_id: row.get(2)?,
117 kind: kind_str,
118 actor: row.get::<_, Option<String>>(4)?,
119 timestamp: ts,
120 version: row.get::<_, i64>(7).unwrap_or(0),
121 parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
122 consistency_level: cl,
123 payload,
124 })
125}
126
127#[derive(Clone)]
128pub struct ContextBus {
129 inner: Arc<Inner>,
130}
131
132struct Inner {
133 conn: Mutex<Connection>,
134 tx: broadcast::Sender<ContextEventV1>,
135}
136
137impl Default for ContextBus {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
143impl ContextBus {
144 pub fn new() -> Self {
145 let path = default_db_path();
146 if let Some(parent) = path.parent() {
147 let _ = std::fs::create_dir_all(parent);
148 }
149 let conn = Connection::open(path).expect("open context-os db");
150 conn.execute_batch(
151 "PRAGMA journal_mode=WAL;
152 CREATE TABLE IF NOT EXISTS context_events (
153 id INTEGER PRIMARY KEY AUTOINCREMENT,
154 workspace_id TEXT NOT NULL,
155 channel_id TEXT NOT NULL,
156 kind TEXT NOT NULL,
157 actor TEXT,
158 timestamp TEXT NOT NULL,
159 payload_json TEXT NOT NULL,
160 version INTEGER NOT NULL DEFAULT 0,
161 parent_id INTEGER
162 );
163 CREATE INDEX IF NOT EXISTS idx_context_events_stream
164 ON context_events(workspace_id, channel_id, id);",
165 )
166 .expect("init context-os db");
167
168 let _ = conn.execute_batch(
170 "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
171 );
172 let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
173
174 let _ = conn.execute_batch(
176 "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
177 payload_text,
178 content=context_events,
179 content_rowid=id
180 );",
181 );
182
183 let (tx, _) = broadcast::channel(1024);
184 Self {
185 inner: Arc::new(Inner {
186 conn: Mutex::new(conn),
187 tx,
188 }),
189 }
190 }
191
192 pub fn subscribe(&self) -> broadcast::Receiver<ContextEventV1> {
193 self.inner.tx.subscribe()
194 }
195
196 pub fn append(
197 &self,
198 workspace_id: &str,
199 channel_id: &str,
200 kind: &ContextEventKindV1,
201 actor: Option<&str>,
202 payload: Value,
203 ) -> Option<ContextEventV1> {
204 self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
205 }
206
207 pub fn append_with_parent(
208 &self,
209 workspace_id: &str,
210 channel_id: &str,
211 kind: &ContextEventKindV1,
212 actor: Option<&str>,
213 payload: Value,
214 parent_id: Option<i64>,
215 ) -> Option<ContextEventV1> {
216 let ts = Utc::now();
217 let payload_json = payload.to_string();
218
219 let (id, version) = {
220 let Ok(conn) = self.inner.conn.lock() else {
221 return None;
222 };
223 let version: i64 = conn
224 .query_row(
225 "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
226 params![workspace_id, channel_id],
227 |row| row.get(0),
228 )
229 .unwrap_or(0)
230 + 1;
231 let _ = conn.execute(
232 "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
233 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
234 params![
235 workspace_id,
236 channel_id,
237 kind.as_str(),
238 actor.map(str::to_string),
239 ts.to_rfc3339(),
240 payload_json,
241 version,
242 parent_id,
243 ],
244 );
245 let rowid = conn.last_insert_rowid();
246 let _ = conn.execute(
247 "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
248 params![rowid, payload_json],
249 );
250 (rowid, version)
251 };
252
253 let ev = ContextEventV1 {
254 id,
255 workspace_id: workspace_id.to_string(),
256 channel_id: channel_id.to_string(),
257 consistency_level: kind.consistency_level().as_str().to_string(),
258 kind: kind.as_str().to_string(),
259 actor: actor.map(str::to_string),
260 timestamp: ts,
261 version,
262 parent_id,
263 payload,
264 };
265 let _ = self.inner.tx.send(ev.clone());
266 Some(ev)
267 }
268
269 pub fn read(
270 &self,
271 workspace_id: &str,
272 channel_id: &str,
273 since: i64,
274 limit: usize,
275 ) -> Vec<ContextEventV1> {
276 let limit = limit.clamp(1, 1000) as i64;
277 let Ok(conn) = self.inner.conn.lock() else {
278 return Vec::new();
279 };
280 let Ok(mut stmt) = conn.prepare(
281 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
282 FROM context_events
283 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
284 ORDER BY id ASC
285 LIMIT ?4",
286 ) else {
287 return Vec::new();
288 };
289 let rows = stmt
290 .query_map(
291 params![workspace_id, channel_id, since, limit],
292 event_from_row,
293 )
294 .ok();
295 let Some(rows) = rows else {
296 return Vec::new();
297 };
298 rows.flatten().collect()
299 }
300
301 pub fn recent_by_kind(
303 &self,
304 workspace_id: &str,
305 channel_id: &str,
306 kind: &str,
307 limit: usize,
308 ) -> Vec<ContextEventV1> {
309 let limit = limit.clamp(1, 100) as i64;
310 let Ok(conn) = self.inner.conn.lock() else {
311 return Vec::new();
312 };
313 let Ok(mut stmt) = conn.prepare(
314 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
315 FROM context_events
316 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
317 ORDER BY id DESC
318 LIMIT ?4",
319 ) else {
320 return Vec::new();
321 };
322 let rows = stmt
323 .query_map(
324 params![workspace_id, channel_id, kind, limit],
325 event_from_row,
326 )
327 .ok();
328 rows.map(|r| r.flatten().collect()).unwrap_or_default()
329 }
330
331 pub fn search(&self, workspace_id: &str, query: &str, limit: usize) -> Vec<ContextEventV1> {
333 let limit = limit.clamp(1, 100) as i64;
334 let Ok(conn) = self.inner.conn.lock() else {
335 return Vec::new();
336 };
337 let Ok(mut stmt) = conn.prepare(
338 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
339 e.payload_json, e.version, e.parent_id
340 FROM context_events e
341 JOIN context_events_fts f ON e.id = f.rowid
342 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
343 ORDER BY f.rank
344 LIMIT ?3",
345 ) else {
346 return Vec::new();
347 };
348 let rows = stmt
349 .query_map(params![query, workspace_id, limit], event_from_row)
350 .ok();
351 rows.map(|r| r.flatten().collect()).unwrap_or_default()
352 }
353
354 pub fn lineage(&self, event_id: i64, max_depth: usize) -> Vec<ContextEventV1> {
356 let max_depth = max_depth.clamp(1, 50);
357 let Ok(conn) = self.inner.conn.lock() else {
358 return Vec::new();
359 };
360 let mut chain = Vec::new();
361 let mut current_id = Some(event_id);
362
363 for _ in 0..max_depth {
364 let Some(id) = current_id else {
365 break;
366 };
367 let ev = conn.query_row(
368 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
369 FROM context_events WHERE id = ?1",
370 params![id],
371 event_from_row,
372 );
373 match ev {
374 Ok(ev) => {
375 current_id = ev.parent_id;
376 chain.push(ev);
377 }
378 Err(_) => break,
379 }
380 }
381 chain
382 }
383
384 pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
386 let Ok(conn) = self.inner.conn.lock() else {
387 return 0;
388 };
389 conn.query_row(
390 "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
391 params![workspace_id, channel_id],
392 |row| row.get(0),
393 )
394 .unwrap_or(0)
395 }
396}
397
398fn default_db_path() -> PathBuf {
399 let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
400 data.join("context-os").join("context-os.db")
401}
402
403#[cfg(test)]
404mod tests {
405 use super::*;
406
407 #[test]
408 fn append_and_read_roundtrip() {
409 let bus = ContextBus::new();
410 let ev = bus
411 .append(
412 "ws",
413 "ch",
414 &ContextEventKindV1::ToolCallRecorded,
415 Some("agent"),
416 serde_json::json!({"tool":"ctx_read"}),
417 )
418 .expect("append");
419 let got = bus.read("ws", "ch", ev.id - 1, 10);
420 assert!(got.iter().any(|e| e.id == ev.id));
421 }
422
423 #[test]
424 fn multi_client_concurrent_appends_have_deterministic_ordering() {
425 let bus = Arc::new(ContextBus::new());
426 let n_clients = 5;
427 let n_events_per_client = 20;
428 let ws = format!("ws-concurrent-{}", std::process::id());
429 let ch = format!("ch-concurrent-{}", std::process::id());
430
431 let mut handles = vec![];
432 for client_idx in 0..n_clients {
433 let bus = Arc::clone(&bus);
434 let ws = ws.clone();
435 let ch = ch.clone();
436 handles.push(std::thread::spawn(move || {
437 let agent = format!("agent-{client_idx}");
438 for event_idx in 0..n_events_per_client {
439 bus.append(
440 &ws,
441 &ch,
442 &ContextEventKindV1::ToolCallRecorded,
443 Some(&agent),
444 serde_json::json!({"client": client_idx, "seq": event_idx}),
445 );
446 }
447 }));
448 }
449
450 for h in handles {
451 h.join().unwrap();
452 }
453
454 let all = bus.read(&ws, &ch, 0, 1000);
455 assert_eq!(
456 all.len(),
457 n_clients * n_events_per_client,
458 "all events should be persisted"
459 );
460
461 let ids: Vec<i64> = all.iter().map(|e| e.id).collect();
462 let mut sorted = ids.clone();
463 sorted.sort_unstable();
464 assert_eq!(ids, sorted, "events must be in strictly ascending ID order");
465
466 for win in ids.windows(2) {
467 assert!(
468 win[1] > win[0],
469 "IDs must be strictly monotonic (no gaps from concurrent access)"
470 );
471 }
472 }
473
474 #[test]
475 fn workspace_channel_isolation() {
476 let bus = ContextBus::new();
477 let pid = std::process::id();
478 let ws_a = format!("ws-iso-a-{pid}");
479 let ws_b = format!("ws-iso-b-{pid}");
480 let ws_c = format!("ws-iso-c-{pid}");
481 let ch1 = format!("ch-iso-1-{pid}");
482 let ch2 = format!("ch-iso-2-{pid}");
483
484 bus.append(
485 &ws_a,
486 &ch1,
487 &ContextEventKindV1::SessionMutated,
488 Some("agent-a"),
489 serde_json::json!({"ws":"a","ch":"1"}),
490 );
491 bus.append(
492 &ws_a,
493 &ch2,
494 &ContextEventKindV1::KnowledgeRemembered,
495 Some("agent-a"),
496 serde_json::json!({"ws":"a","ch":"2"}),
497 );
498 bus.append(
499 &ws_b,
500 &ch1,
501 &ContextEventKindV1::ArtifactStored,
502 Some("agent-b"),
503 serde_json::json!({"ws":"b","ch":"1"}),
504 );
505
506 let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
507 assert_eq!(ws_a_ch_1.len(), 1);
508 assert_eq!(ws_a_ch_1[0].kind, "session_mutated");
509
510 let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
511 assert_eq!(ws_a_ch_2.len(), 1);
512 assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");
513
514 let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
515 assert_eq!(ws_b_ch_1.len(), 1);
516 assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");
517
518 let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
519 assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
520 }
521
522 #[test]
523 fn replay_from_cursor_returns_only_newer_events() {
524 let bus = ContextBus::new();
525 let pid = std::process::id();
526 let ws = &format!("ws-replay-{pid}");
527 let ch = &format!("ch-replay-{pid}");
528
529 let ev1 = bus
530 .append(
531 ws,
532 ch,
533 &ContextEventKindV1::ToolCallRecorded,
534 None,
535 serde_json::json!({"seq":1}),
536 )
537 .unwrap();
538 let ev2 = bus
539 .append(
540 ws,
541 ch,
542 &ContextEventKindV1::SessionMutated,
543 None,
544 serde_json::json!({"seq":2}),
545 )
546 .unwrap();
547 let _ev3 = bus
548 .append(
549 ws,
550 ch,
551 &ContextEventKindV1::GraphBuilt,
552 None,
553 serde_json::json!({"seq":3}),
554 )
555 .unwrap();
556
557 let from_cursor = bus.read(ws, ch, ev2.id, 100);
558 assert_eq!(from_cursor.len(), 1, "only events after cursor");
559 assert_eq!(from_cursor[0].kind, "graph_built");
560
561 let from_first = bus.read(ws, ch, ev1.id, 100);
562 assert_eq!(from_first.len(), 2, "events after first");
563
564 let from_zero = bus.read(ws, ch, 0, 100);
565 assert_eq!(from_zero.len(), 3, "all events from zero");
566 }
567
568 #[test]
569 fn broadcast_subscriber_receives_events() {
570 let bus = ContextBus::new();
571 let mut rx = bus.subscribe();
572
573 let ev = bus
574 .append(
575 "ws",
576 "ch",
577 &ContextEventKindV1::ProofAdded,
578 Some("verifier"),
579 serde_json::json!({"proof":"hash"}),
580 )
581 .unwrap();
582
583 let received = rx.try_recv().expect("subscriber should receive event");
584 assert_eq!(received.id, ev.id);
585 assert_eq!(received.kind, "proof_added");
586 assert_eq!(received.actor.as_deref(), Some("verifier"));
587 }
588}