1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::{Arc, Mutex};
4
5use chrono::{DateTime, Utc};
6use rusqlite::{params, Connection};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use tokio::sync::broadcast;
10
/// Number of read connections opened eagerly by the bus, and the maximum number of
/// idle read connections kept in the pool (extra ones are closed when returned).
const MAX_READ_CONNS: usize = 4;
12
13#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
14#[serde(rename_all = "snake_case")]
15pub enum ContextEventKindV1 {
16 ToolCallRecorded,
17 SessionMutated,
18 KnowledgeRemembered,
19 ArtifactStored,
20 GraphBuilt,
21 ProofAdded,
22}
23
24impl ContextEventKindV1 {
25 pub fn as_str(&self) -> &'static str {
26 match self {
27 Self::ToolCallRecorded => "tool_call_recorded",
28 Self::SessionMutated => "session_mutated",
29 Self::KnowledgeRemembered => "knowledge_remembered",
30 Self::ArtifactStored => "artifact_stored",
31 Self::GraphBuilt => "graph_built",
32 Self::ProofAdded => "proof_added",
33 }
34 }
35
36 pub fn parse(s: &str) -> Self {
37 match s.trim().to_lowercase().as_str() {
38 "tool_call_recorded" => Self::ToolCallRecorded,
39 "session_mutated" => Self::SessionMutated,
40 "knowledge_remembered" => Self::KnowledgeRemembered,
41 "artifact_stored" => Self::ArtifactStored,
42 "graph_built" => Self::GraphBuilt,
43 "proof_added" => Self::ProofAdded,
44 other => {
45 tracing::warn!(
46 "unknown ContextEventKind '{other}', defaulting to ToolCallRecorded"
47 );
48 Self::ToolCallRecorded
49 }
50 }
51 }
52
53 pub fn consistency_level(&self) -> ConsistencyLevel {
59 match self {
60 Self::ToolCallRecorded | Self::GraphBuilt => ConsistencyLevel::Local,
61 Self::KnowledgeRemembered | Self::ArtifactStored => ConsistencyLevel::Eventual,
62 Self::SessionMutated | Self::ProofAdded => ConsistencyLevel::Strong,
63 }
64 }
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "snake_case")]
70pub enum ConsistencyLevel {
71 Local,
73 Eventual,
75 Strong,
77}
78
79impl ConsistencyLevel {
80 pub fn as_str(&self) -> &'static str {
81 match self {
82 Self::Local => "local",
83 Self::Eventual => "eventual",
84 Self::Strong => "strong",
85 }
86 }
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
90#[serde(rename_all = "camelCase")]
91pub struct ContextEventV1 {
92 pub id: i64,
93 pub workspace_id: String,
94 pub channel_id: String,
95 pub kind: String,
96 pub actor: Option<String>,
97 pub timestamp: DateTime<Utc>,
98 pub version: i64,
99 #[serde(skip_serializing_if = "Option::is_none")]
100 pub parent_id: Option<i64>,
101 pub consistency_level: String,
102 pub payload: Value,
103}
104
105impl ContextEventV1 {
106 pub fn consistency(&self) -> ConsistencyLevel {
107 ContextEventKindV1::parse(&self.kind).consistency_level()
108 }
109}
110
111fn event_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ContextEventV1> {
112 let ts_str: String = row.get(5)?;
113 let ts = DateTime::parse_from_rfc3339(&ts_str)
114 .map_or_else(|_| Utc::now(), |d| d.with_timezone(&Utc));
115 let payload_str: String = row.get(6)?;
116 let payload: Value = serde_json::from_str(&payload_str).unwrap_or(Value::Null);
117 let kind_str: String = row.get(3)?;
118 let cl = ContextEventKindV1::parse(&kind_str)
119 .consistency_level()
120 .as_str()
121 .to_string();
122 Ok(ContextEventV1 {
123 id: row.get(0)?,
124 workspace_id: row.get(1)?,
125 channel_id: row.get(2)?,
126 kind: kind_str,
127 actor: row.get::<_, Option<String>>(4)?,
128 timestamp: ts,
129 version: row.get::<_, i64>(7).unwrap_or(0),
130 parent_id: row.get::<_, Option<i64>>(8).ok().flatten(),
131 consistency_level: cl,
132 payload,
133 })
134}
135
136#[derive(Clone)]
137pub struct ContextBus {
138 inner: Arc<Inner>,
139}
140
/// Capacity of each per-stream `tokio::sync::broadcast` channel; see that module for what
/// happens to receivers that fall further behind.
const STREAM_CHANNEL_SIZE: usize = 256;
142
143struct Inner {
144 write_conn: Mutex<Connection>,
145 read_pool: Mutex<Vec<Connection>>,
146 streams: Mutex<HashMap<String, broadcast::Sender<ContextEventV1>>>,
147 version_cache: Mutex<HashMap<String, i64>>,
148 db_path: PathBuf,
149}
150
151impl Inner {
152 fn open_read_conn(path: &PathBuf) -> Connection {
153 let conn = Connection::open(path).expect("open read context-os db");
154 let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA query_only=ON;");
155 conn
156 }
157
158 fn take_read_conn(&self) -> Connection {
159 self.read_pool
160 .lock()
161 .unwrap_or_else(std::sync::PoisonError::into_inner)
162 .pop()
163 .unwrap_or_else(|| Self::open_read_conn(&self.db_path))
164 }
165
166 fn return_read_conn(&self, conn: Connection) {
167 let mut pool = self
168 .read_pool
169 .lock()
170 .unwrap_or_else(std::sync::PoisonError::into_inner);
171 if pool.len() < MAX_READ_CONNS {
172 pool.push(conn);
173 }
174 }
175
176 fn stream_key(workspace_id: &str, channel_id: &str) -> String {
177 format!("{workspace_id}\0{channel_id}")
178 }
179
180 fn next_version(&self, workspace_id: &str, channel_id: &str) -> i64 {
181 let key = Self::stream_key(workspace_id, channel_id);
182
183 {
184 let mut cache = self
185 .version_cache
186 .lock()
187 .unwrap_or_else(std::sync::PoisonError::into_inner);
188 if let Some(v) = cache.get_mut(&key) {
189 *v += 1;
190 return *v;
191 }
192 }
193
194 let conn = self.take_read_conn();
195 let v: i64 = conn
196 .query_row(
197 "SELECT COALESCE(MAX(version), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
198 params![workspace_id, channel_id],
199 |row| row.get(0),
200 )
201 .unwrap_or(0);
202 self.return_read_conn(conn);
203
204 let mut cache = self
205 .version_cache
206 .lock()
207 .unwrap_or_else(std::sync::PoisonError::into_inner);
208 let entry = cache.entry(key).or_insert(v);
209 *entry = (*entry).max(v) + 1;
210 *entry
211 }
212}
213
214impl Default for ContextBus {
215 fn default() -> Self {
216 Self::new()
217 }
218}
219
220impl ContextBus {
221 pub fn new() -> Self {
222 let path = default_db_path();
223 if let Some(parent) = path.parent() {
224 let _ = std::fs::create_dir_all(parent);
225 }
226 let conn = Connection::open(&path).expect("open context-os db");
227 conn.execute_batch(
228 "PRAGMA journal_mode=WAL;
229 CREATE TABLE IF NOT EXISTS context_events (
230 id INTEGER PRIMARY KEY AUTOINCREMENT,
231 workspace_id TEXT NOT NULL,
232 channel_id TEXT NOT NULL,
233 kind TEXT NOT NULL,
234 actor TEXT,
235 timestamp TEXT NOT NULL,
236 payload_json TEXT NOT NULL,
237 version INTEGER NOT NULL DEFAULT 0,
238 parent_id INTEGER
239 );
240 CREATE INDEX IF NOT EXISTS idx_context_events_stream
241 ON context_events(workspace_id, channel_id, id);",
242 )
243 .expect("init context-os db");
244
245 let _ = conn.execute_batch(
246 "ALTER TABLE context_events ADD COLUMN version INTEGER NOT NULL DEFAULT 0;",
247 );
248 let _ = conn.execute_batch("ALTER TABLE context_events ADD COLUMN parent_id INTEGER;");
249
250 let _ = conn.execute_batch(
251 "CREATE VIRTUAL TABLE IF NOT EXISTS context_events_fts USING fts5(
252 payload_text,
253 content=context_events,
254 content_rowid=id
255 );",
256 );
257
258 let mut read_conns = Vec::with_capacity(MAX_READ_CONNS);
259 for _ in 0..MAX_READ_CONNS {
260 read_conns.push(Inner::open_read_conn(&path));
261 }
262
263 Self {
264 inner: Arc::new(Inner {
265 write_conn: Mutex::new(conn),
266 read_pool: Mutex::new(read_conns),
267 streams: Mutex::new(HashMap::new()),
268 version_cache: Mutex::new(HashMap::new()),
269 db_path: path,
270 }),
271 }
272 }
273
274 pub fn subscribe(
275 &self,
276 workspace_id: &str,
277 channel_id: &str,
278 ) -> broadcast::Receiver<ContextEventV1> {
279 let key = Inner::stream_key(workspace_id, channel_id);
280 let mut streams = self
281 .inner
282 .streams
283 .lock()
284 .unwrap_or_else(std::sync::PoisonError::into_inner);
285 let tx = streams
286 .entry(key)
287 .or_insert_with(|| broadcast::channel(STREAM_CHANNEL_SIZE).0);
288 tx.subscribe()
289 }
290
291 pub fn append(
292 &self,
293 workspace_id: &str,
294 channel_id: &str,
295 kind: &ContextEventKindV1,
296 actor: Option<&str>,
297 payload: Value,
298 ) -> Option<ContextEventV1> {
299 self.append_with_parent(workspace_id, channel_id, kind, actor, payload, None)
300 }
301
302 pub fn append_with_parent(
303 &self,
304 workspace_id: &str,
305 channel_id: &str,
306 kind: &ContextEventKindV1,
307 actor: Option<&str>,
308 payload: Value,
309 parent_id: Option<i64>,
310 ) -> Option<ContextEventV1> {
311 let ts = Utc::now();
312 let payload_json = payload.to_string();
313
314 let (id, version) = {
315 let Ok(conn) = self.inner.write_conn.lock() else {
316 return None;
317 };
318 let version = self.inner.next_version(workspace_id, channel_id);
319
320 let result: Result<(i64, i64), rusqlite::Error> = conn
321 .execute_batch("BEGIN IMMEDIATE")
322 .and_then(|()| {
323 conn.execute(
324 "INSERT INTO context_events (workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id)
325 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
326 params![
327 workspace_id,
328 channel_id,
329 kind.as_str(),
330 actor.map(str::to_string),
331 ts.to_rfc3339(),
332 payload_json,
333 version,
334 parent_id,
335 ],
336 )?;
337 let rowid = conn.last_insert_rowid();
338 if let Err(e) = conn.execute(
339 "INSERT INTO context_events_fts(rowid, payload_text) VALUES (?1, ?2)",
340 params![rowid, payload_json],
341 ) {
342 tracing::warn!("FTS insert failed for event {rowid}: {e}");
343 }
344 conn.execute_batch("COMMIT")?;
345 Ok((rowid, version))
346 });
347
348 match result {
349 Ok(pair) => pair,
350 Err(e) => {
351 tracing::warn!("context bus append failed: {e}");
352 let _ = conn.execute_batch("ROLLBACK");
353 return None;
354 }
355 }
356 };
357
358 let ev = ContextEventV1 {
359 id,
360 workspace_id: workspace_id.to_string(),
361 channel_id: channel_id.to_string(),
362 consistency_level: kind.consistency_level().as_str().to_string(),
363 kind: kind.as_str().to_string(),
364 actor: actor.map(str::to_string),
365 timestamp: ts,
366 version,
367 parent_id,
368 payload,
369 };
370 let key = Inner::stream_key(workspace_id, channel_id);
371 let tx = self
372 .inner
373 .streams
374 .lock()
375 .unwrap_or_else(std::sync::PoisonError::into_inner)
376 .get(&key)
377 .cloned();
378 if let Some(tx) = tx {
379 let _ = tx.send(ev.clone());
380 }
381 Some(ev)
382 }
383
384 pub fn read(
385 &self,
386 workspace_id: &str,
387 channel_id: &str,
388 since: i64,
389 limit: usize,
390 ) -> Vec<ContextEventV1> {
391 let limit = limit.clamp(1, 1000) as i64;
392 let conn = self.inner.take_read_conn();
393 let result = (|| {
394 let mut stmt = conn.prepare(
395 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
396 FROM context_events
397 WHERE workspace_id = ?1 AND channel_id = ?2 AND id > ?3
398 ORDER BY id ASC
399 LIMIT ?4",
400 ).ok()?;
401 let rows = stmt
402 .query_map(
403 params![workspace_id, channel_id, since, limit],
404 event_from_row,
405 )
406 .ok()?;
407 Some(rows.flatten().collect::<Vec<_>>())
408 })();
409 self.inner.return_read_conn(conn);
410 result.unwrap_or_default()
411 }
412
413 pub fn recent_by_kind(
415 &self,
416 workspace_id: &str,
417 channel_id: &str,
418 kind: &str,
419 limit: usize,
420 ) -> Vec<ContextEventV1> {
421 let limit = limit.clamp(1, 100) as i64;
422 let conn = self.inner.take_read_conn();
423 let result = (|| {
424 let mut stmt = conn.prepare(
425 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
426 FROM context_events
427 WHERE workspace_id = ?1 AND channel_id = ?2 AND kind = ?3
428 ORDER BY id DESC
429 LIMIT ?4",
430 ).ok()?;
431 let rows = stmt
432 .query_map(
433 params![workspace_id, channel_id, kind, limit],
434 event_from_row,
435 )
436 .ok()?;
437 Some(rows.flatten().collect::<Vec<_>>())
438 })();
439 self.inner.return_read_conn(conn);
440 result.unwrap_or_default()
441 }
442
443 pub fn search(
445 &self,
446 workspace_id: &str,
447 channel_id: Option<&str>,
448 query: &str,
449 limit: usize,
450 ) -> Vec<ContextEventV1> {
451 let limit = limit.clamp(1, 100) as i64;
452 let conn = self.inner.take_read_conn();
453 let result =
454 if let Some(ch) = channel_id {
455 (|| {
456 let mut stmt = conn.prepare(
457 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
458 e.payload_json, e.version, e.parent_id
459 FROM context_events e
460 JOIN context_events_fts f ON e.id = f.rowid
461 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2 AND e.channel_id = ?3
462 ORDER BY f.rank
463 LIMIT ?4",
464 ).ok()?;
465 let rows = stmt
466 .query_map(params![query, workspace_id, ch, limit], event_from_row)
467 .ok()?;
468 Some(rows.flatten().collect::<Vec<_>>())
469 })()
470 } else {
471 (|| {
472 let mut stmt = conn.prepare(
473 "SELECT e.id, e.workspace_id, e.channel_id, e.kind, e.actor, e.timestamp,
474 e.payload_json, e.version, e.parent_id
475 FROM context_events e
476 JOIN context_events_fts f ON e.id = f.rowid
477 WHERE f.payload_text MATCH ?1 AND e.workspace_id = ?2
478 ORDER BY f.rank
479 LIMIT ?3",
480 ).ok()?;
481 let rows = stmt
482 .query_map(params![query, workspace_id, limit], event_from_row)
483 .ok()?;
484 Some(rows.flatten().collect::<Vec<_>>())
485 })()
486 };
487 self.inner.return_read_conn(conn);
488 result.unwrap_or_default()
489 }
490
491 pub fn lineage(&self, event_id: i64, max_depth: usize) -> Vec<ContextEventV1> {
493 let max_depth = max_depth.clamp(1, 50);
494 let conn = self.inner.take_read_conn();
495 let mut chain = Vec::new();
496 let mut current_id = Some(event_id);
497
498 for _ in 0..max_depth {
499 let Some(id) = current_id else {
500 break;
501 };
502 let ev = conn.query_row(
503 "SELECT id, workspace_id, channel_id, kind, actor, timestamp, payload_json, version, parent_id
504 FROM context_events WHERE id = ?1",
505 params![id],
506 event_from_row,
507 );
508 match ev {
509 Ok(ev) => {
510 current_id = ev.parent_id;
511 chain.push(ev);
512 }
513 Err(_) => break,
514 }
515 }
516 self.inner.return_read_conn(conn);
517 chain
518 }
519
520 pub fn latest_id(&self, workspace_id: &str, channel_id: &str) -> i64 {
522 let conn = self.inner.take_read_conn();
523 let result = conn
524 .query_row(
525 "SELECT COALESCE(MAX(id), 0) FROM context_events WHERE workspace_id = ?1 AND channel_id = ?2",
526 params![workspace_id, channel_id],
527 |row| row.get(0),
528 )
529 .unwrap_or(0);
530 self.inner.return_read_conn(conn);
531 result
532 }
533}
534
535fn default_db_path() -> PathBuf {
536 let data = crate::core::data_dir::lean_ctx_data_dir().unwrap_or_else(|_| PathBuf::from("."));
537 data.join("context-os").join("context-os.db")
538}
539
#[cfg(test)]
mod tests {
    use super::*;

    /// Suffix unique to this run and call, so streams created by different test runs (the
    /// default database persists between runs) or different tests never collide.
    fn unique_suffix() -> String {
        static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |d| d.as_nanos());
        let seq = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        format!("{}-{nanos}-{seq}", std::process::id())
    }

    #[test]
    fn append_and_read_roundtrip() {
        let bus = ContextBus::new();
        let suffix = unique_suffix();
        let ws = format!("ws-roundtrip-{suffix}");
        let ch = format!("ch-roundtrip-{suffix}");
        let ev = bus
            .append(
                &ws,
                &ch,
                &ContextEventKindV1::ToolCallRecorded,
                Some("agent"),
                serde_json::json!({"tool":"ctx_read"}),
            )
            .expect("append");
        let got = bus.read(&ws, &ch, ev.id - 1, 10);
        assert!(got.iter().any(|e| e.id == ev.id));
    }

    #[test]
    fn multi_client_concurrent_appends_have_deterministic_ordering() {
        let bus = Arc::new(ContextBus::new());
        let n_clients = 5;
        let n_events_per_client = 20;
        let suffix = unique_suffix();
        let ws = format!("ws-concurrent-{suffix}");
        let ch = format!("ch-concurrent-{suffix}");

        let mut handles = vec![];
        for client_idx in 0..n_clients {
            let bus = Arc::clone(&bus);
            let ws = ws.clone();
            let ch = ch.clone();
            handles.push(std::thread::spawn(move || {
                let agent = format!("agent-{client_idx}");
                for event_idx in 0..n_events_per_client {
                    bus.append(
                        &ws,
                        &ch,
                        &ContextEventKindV1::ToolCallRecorded,
                        Some(&agent),
                        serde_json::json!({"client": client_idx, "seq": event_idx}),
                    )
                    .expect("concurrent append should succeed");
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let all = bus.read(&ws, &ch, 0, 1000);
        assert_eq!(
            all.len(),
            n_clients * n_events_per_client,
            "all events should be persisted"
        );

        // IDs may have gaps (other streams share the table) but must strictly increase.
        for win in all.windows(2) {
            assert!(
                win[1].id > win[0].id,
                "read must return events in strictly ascending ID order"
            );
        }

        // Versions are allocated under the write lock together with the insert, so for
        // a fresh stream they are exactly 1..=N in ID order.
        let versions: Vec<i64> = all.iter().map(|e| e.version).collect();
        let expected: Vec<i64> = (1..=all.len() as i64).collect();
        assert_eq!(
            versions, expected,
            "per-stream versions must be dense and follow ID order"
        );
    }

    #[test]
    fn workspace_channel_isolation() {
        let bus = ContextBus::new();
        let suffix = unique_suffix();
        let ws_a = format!("ws-iso-a-{suffix}");
        let ws_b = format!("ws-iso-b-{suffix}");
        let ws_c = format!("ws-iso-c-{suffix}");
        let ch1 = format!("ch-iso-1-{suffix}");
        let ch2 = format!("ch-iso-2-{suffix}");

        bus.append(
            &ws_a,
            &ch1,
            &ContextEventKindV1::SessionMutated,
            Some("agent-a"),
            serde_json::json!({"ws":"a","ch":"1"}),
        )
        .expect("append ws_a/ch1");
        bus.append(
            &ws_a,
            &ch2,
            &ContextEventKindV1::KnowledgeRemembered,
            Some("agent-a"),
            serde_json::json!({"ws":"a","ch":"2"}),
        )
        .expect("append ws_a/ch2");
        bus.append(
            &ws_b,
            &ch1,
            &ContextEventKindV1::ArtifactStored,
            Some("agent-b"),
            serde_json::json!({"ws":"b","ch":"1"}),
        )
        .expect("append ws_b/ch1");

        let ws_a_ch_1 = bus.read(&ws_a, &ch1, 0, 100);
        assert_eq!(ws_a_ch_1.len(), 1);
        assert_eq!(ws_a_ch_1[0].kind, "session_mutated");

        let ws_a_ch_2 = bus.read(&ws_a, &ch2, 0, 100);
        assert_eq!(ws_a_ch_2.len(), 1);
        assert_eq!(ws_a_ch_2[0].kind, "knowledge_remembered");

        let ws_b_ch_1 = bus.read(&ws_b, &ch1, 0, 100);
        assert_eq!(ws_b_ch_1.len(), 1);
        assert_eq!(ws_b_ch_1[0].kind, "artifact_stored");

        let ws_c_ch_1 = bus.read(&ws_c, &ch1, 0, 100);
        assert!(ws_c_ch_1.is_empty(), "non-existent workspace returns empty");
    }

    #[test]
    fn replay_from_cursor_returns_only_newer_events() {
        let bus = ContextBus::new();
        let suffix = unique_suffix();
        let ws = &format!("ws-replay-{suffix}");
        let ch = &format!("ch-replay-{suffix}");

        let ev1 = bus
            .append(
                ws,
                ch,
                &ContextEventKindV1::ToolCallRecorded,
                None,
                serde_json::json!({"seq":1}),
            )
            .unwrap();
        let ev2 = bus
            .append(
                ws,
                ch,
                &ContextEventKindV1::SessionMutated,
                None,
                serde_json::json!({"seq":2}),
            )
            .unwrap();
        let _ev3 = bus
            .append(
                ws,
                ch,
                &ContextEventKindV1::GraphBuilt,
                None,
                serde_json::json!({"seq":3}),
            )
            .unwrap();

        let from_cursor = bus.read(ws, ch, ev2.id, 100);
        assert_eq!(from_cursor.len(), 1, "only events after cursor");
        assert_eq!(from_cursor[0].kind, "graph_built");

        let from_first = bus.read(ws, ch, ev1.id, 100);
        assert_eq!(from_first.len(), 2, "events after first");

        let from_zero = bus.read(ws, ch, 0, 100);
        assert_eq!(from_zero.len(), 3, "all events from zero");
    }

    #[test]
    fn broadcast_subscriber_receives_events() {
        let bus = ContextBus::new();
        let suffix = unique_suffix();
        let ws = format!("ws-broadcast-{suffix}");
        let ch = format!("ch-broadcast-{suffix}");
        let mut rx = bus.subscribe(&ws, &ch);

        let ev = bus
            .append(
                &ws,
                &ch,
                &ContextEventKindV1::ProofAdded,
                Some("verifier"),
                serde_json::json!({"proof":"hash"}),
            )
            .unwrap();

        let received = rx.try_recv().expect("subscriber should receive event");
        assert_eq!(received.id, ev.id);
        assert_eq!(received.kind, "proof_added");
        assert_eq!(received.actor.as_deref(), Some("verifier"));
    }
}