Skip to main content

lean_ctx/core/context_os/
mod.rs

1use std::path::PathBuf;
2use std::sync::{Arc, OnceLock};
3
4mod shared_sessions;
5pub use shared_sessions::{SharedSessionKey, SharedSessionStore};
6
7mod context_bus;
8pub use context_bus::{
9    ConsistencyLevel, ContextBus, ContextEventKindV1, ContextEventV1, FilteredSubscription,
10    TopicFilter,
11};
12
13pub mod redaction;
14pub use redaction::{redact_event_payload, redact_payload_value, RedactionLevel};
15
16/// Wraps either a plain `broadcast::Receiver` or a `FilteredSubscription`
17/// so the SSE route can handle both with the same code path.
18pub enum SubscriptionKind {
19    Unfiltered(tokio::sync::broadcast::Receiver<ContextEventV1>),
20    Filtered(FilteredSubscription),
21}
22
23impl SubscriptionKind {
24    pub async fn recv(
25        &mut self,
26    ) -> Result<ContextEventV1, tokio::sync::broadcast::error::RecvError> {
27        match self {
28            Self::Unfiltered(rx) => rx.recv().await,
29            Self::Filtered(fs) => fs.recv_filtered().await,
30        }
31    }
32}
33
34mod metrics;
35pub use metrics::{ContextOsMetrics, MetricsSnapshot};
36
37/// Shared runtime backing Context OS features (shared sessions + event bus).
38///
39/// This is intentionally process-local: it enables multi-client coordination
40/// for HTTP/daemon/team-server deployments (one process handling many clients).
41#[derive(Clone)]
42pub struct ContextOsRuntime {
43    pub shared_sessions: Arc<SharedSessionStore>,
44    pub bus: Arc<ContextBus>,
45    pub metrics: Arc<ContextOsMetrics>,
46}
47
48impl Default for ContextOsRuntime {
49    fn default() -> Self {
50        Self {
51            shared_sessions: Arc::new(SharedSessionStore::new()),
52            bus: Arc::new(ContextBus::new()),
53            metrics: Arc::new(ContextOsMetrics::default()),
54        }
55    }
56}
57
58impl ContextOsRuntime {
59    pub fn new() -> Self {
60        Self::default()
61    }
62
63    pub fn data_dir() -> Option<PathBuf> {
64        crate::core::data_dir::lean_ctx_data_dir().ok()
65    }
66}
67
68static RUNTIME: OnceLock<Arc<ContextOsRuntime>> = OnceLock::new();
69
70pub fn runtime() -> Arc<ContextOsRuntime> {
71    RUNTIME
72        .get_or_init(|| Arc::new(ContextOsRuntime::new()))
73        .clone()
74}
75
76/// Convenience: append an event to the global bus with metrics tracking.
77pub fn emit_event(
78    workspace_id: &str,
79    channel_id: &str,
80    kind: &ContextEventKindV1,
81    actor: Option<&str>,
82    payload: serde_json::Value,
83) {
84    let rt = runtime();
85    if rt
86        .bus
87        .append(workspace_id, channel_id, kind, actor, payload)
88        .is_some()
89    {
90        rt.metrics.record_event_appended();
91        rt.metrics.record_event_broadcast();
92        rt.metrics.record_workspace_active(workspace_id);
93    }
94}
95
96/// Emit an event directed at specific agents only.
97pub fn emit_directed_event(
98    workspace_id: &str,
99    channel_id: &str,
100    kind: &ContextEventKindV1,
101    actor: Option<&str>,
102    payload: serde_json::Value,
103    target_agents: Vec<String>,
104) {
105    let rt = runtime();
106    if rt
107        .bus
108        .append_directed(
109            workspace_id,
110            channel_id,
111            kind,
112            actor,
113            payload,
114            target_agents,
115        )
116        .is_some()
117    {
118        rt.metrics.record_event_appended();
119        rt.metrics.record_event_broadcast();
120        rt.metrics.record_workspace_active(workspace_id);
121    }
122}
123
124/// Classify a tool name into a secondary event kind (beyond ToolCallRecorded).
125pub fn secondary_event_kind(tool: &str, action: Option<&str>) -> Option<ContextEventKindV1> {
126    match tool {
127        "ctx_session" => {
128            let a = action.unwrap_or("");
129            if matches!(
130                a,
131                "save"
132                    | "set_task"
133                    | "task"
134                    | "checkpoint"
135                    | "finding"
136                    | "decision"
137                    | "reset"
138                    | "import"
139                    | "export"
140            ) {
141                Some(ContextEventKindV1::SessionMutated)
142            } else {
143                None
144            }
145        }
146        "ctx_handoff" | "ctx_workflow" | "ctx_share" => Some(ContextEventKindV1::SessionMutated),
147        "ctx_knowledge" | "ctx_knowledge_relations" => {
148            let a = action.unwrap_or("");
149            if matches!(
150                a,
151                "remember"
152                    | "relate"
153                    | "unrelate"
154                    | "feedback"
155                    | "remove"
156                    | "consolidate"
157                    | "import"
158            ) {
159                Some(ContextEventKindV1::KnowledgeRemembered)
160            } else {
161                None
162            }
163        }
164        "ctx_artifacts" => {
165            let a = action.unwrap_or("");
166            if matches!(a, "reindex" | "remove") {
167                Some(ContextEventKindV1::ArtifactStored)
168            } else {
169                None
170            }
171        }
172        "ctx_graph" => {
173            let a = action.unwrap_or("");
174            if matches!(
175                a,
176                "index-build"
177                    | "index-build-full"
178                    | "index-build-background"
179                    | "index-build-full-background"
180            ) {
181                Some(ContextEventKindV1::GraphBuilt)
182            } else {
183                None
184            }
185        }
186        "ctx_proof" | "ctx_verify" => {
187            let a = action.unwrap_or("");
188            if matches!(a, "generate" | "export" | "verify") {
189                Some(ContextEventKindV1::ProofAdded)
190            } else {
191                None
192            }
193        }
194        _ => None,
195    }
196}