Skip to main content

lean_ctx/core/context_os/
mod.rs

1use std::path::PathBuf;
2use std::sync::{Arc, OnceLock};
3
4mod shared_sessions;
5pub use shared_sessions::{SharedSessionKey, SharedSessionStore};
6
7mod context_bus;
8pub use context_bus::{
9    ConsistencyLevel, ContextBus, ContextEventKindV1, ContextEventV1, FilteredSubscription,
10    TopicFilter,
11};
12
13pub mod redaction;
14pub use redaction::{redact_event_payload, RedactionLevel};
15
16mod metrics;
17pub use metrics::{ContextOsMetrics, MetricsSnapshot};
18
19/// Shared runtime backing Context OS features (shared sessions + event bus).
20///
21/// This is intentionally process-local: it enables multi-client coordination
22/// for HTTP/daemon/team-server deployments (one process handling many clients).
23#[derive(Clone)]
24pub struct ContextOsRuntime {
25    pub shared_sessions: Arc<SharedSessionStore>,
26    pub bus: Arc<ContextBus>,
27    pub metrics: Arc<ContextOsMetrics>,
28}
29
30impl Default for ContextOsRuntime {
31    fn default() -> Self {
32        Self {
33            shared_sessions: Arc::new(SharedSessionStore::new()),
34            bus: Arc::new(ContextBus::new()),
35            metrics: Arc::new(ContextOsMetrics::default()),
36        }
37    }
38}
39
40impl ContextOsRuntime {
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    pub fn data_dir() -> Option<PathBuf> {
46        crate::core::data_dir::lean_ctx_data_dir().ok()
47    }
48}
49
50static RUNTIME: OnceLock<Arc<ContextOsRuntime>> = OnceLock::new();
51
52pub fn runtime() -> Arc<ContextOsRuntime> {
53    RUNTIME
54        .get_or_init(|| Arc::new(ContextOsRuntime::new()))
55        .clone()
56}
57
58/// Convenience: append an event to the global bus with metrics tracking.
59pub fn emit_event(
60    workspace_id: &str,
61    channel_id: &str,
62    kind: &ContextEventKindV1,
63    actor: Option<&str>,
64    payload: serde_json::Value,
65) {
66    let rt = runtime();
67    if rt
68        .bus
69        .append(workspace_id, channel_id, kind, actor, payload)
70        .is_some()
71    {
72        rt.metrics.record_event_appended();
73        rt.metrics.record_event_broadcast();
74        rt.metrics.record_workspace_active(workspace_id);
75    }
76}
77
78/// Emit an event directed at specific agents only.
79pub fn emit_directed_event(
80    workspace_id: &str,
81    channel_id: &str,
82    kind: &ContextEventKindV1,
83    actor: Option<&str>,
84    payload: serde_json::Value,
85    target_agents: Vec<String>,
86) {
87    let rt = runtime();
88    if rt
89        .bus
90        .append_directed(
91            workspace_id,
92            channel_id,
93            kind,
94            actor,
95            payload,
96            target_agents,
97        )
98        .is_some()
99    {
100        rt.metrics.record_event_appended();
101        rt.metrics.record_event_broadcast();
102        rt.metrics.record_workspace_active(workspace_id);
103    }
104}
105
106/// Classify a tool name into a secondary event kind (beyond ToolCallRecorded).
107pub fn secondary_event_kind(tool: &str, action: Option<&str>) -> Option<ContextEventKindV1> {
108    match tool {
109        "ctx_session" => {
110            let a = action.unwrap_or("");
111            if matches!(
112                a,
113                "save"
114                    | "set_task"
115                    | "task"
116                    | "checkpoint"
117                    | "finding"
118                    | "decision"
119                    | "reset"
120                    | "import"
121                    | "export"
122            ) {
123                Some(ContextEventKindV1::SessionMutated)
124            } else {
125                None
126            }
127        }
128        "ctx_handoff" | "ctx_workflow" | "ctx_share" => Some(ContextEventKindV1::SessionMutated),
129        "ctx_knowledge" | "ctx_knowledge_relations" => {
130            let a = action.unwrap_or("");
131            if matches!(
132                a,
133                "remember"
134                    | "relate"
135                    | "unrelate"
136                    | "feedback"
137                    | "remove"
138                    | "consolidate"
139                    | "import"
140            ) {
141                Some(ContextEventKindV1::KnowledgeRemembered)
142            } else {
143                None
144            }
145        }
146        "ctx_artifacts" => {
147            let a = action.unwrap_or("");
148            if matches!(a, "reindex" | "remove") {
149                Some(ContextEventKindV1::ArtifactStored)
150            } else {
151                None
152            }
153        }
154        "ctx_graph" => {
155            let a = action.unwrap_or("");
156            if matches!(
157                a,
158                "index-build"
159                    | "index-build-full"
160                    | "index-build-background"
161                    | "index-build-full-background"
162            ) {
163                Some(ContextEventKindV1::GraphBuilt)
164            } else {
165                None
166            }
167        }
168        "ctx_proof" | "ctx_verify" => {
169            let a = action.unwrap_or("");
170            if matches!(a, "generate" | "export" | "verify") {
171                Some(ContextEventKindV1::ProofAdded)
172            } else {
173                None
174            }
175        }
176        _ => None,
177    }
178}