Skip to main content

lean_ctx/core/context_os/
metrics.rs

1use std::sync::atomic::{AtomicU64, Ordering};
2use std::sync::Mutex;
3
4use serde::Serialize;
5
/// Process-local metrics for Context OS observability.
///
/// Every counter is an [`AtomicU64`] that the `record_*` methods bump through a
/// shared reference, so one instance can be updated from several threads without
/// an outer lock. The set of workspace ids sits behind a [`Mutex`] because it is
/// not a plain counter.
///
/// `Debug` is derived so the value can be logged and so that types embedding it
/// can derive `Debug` themselves.
#[derive(Debug, Default)]
pub struct ContextOsMetrics {
    /// Incremented by one on each `record_event_appended` call.
    events_appended: AtomicU64,
    /// Incremented by one on each `record_event_broadcast` call.
    events_broadcast: AtomicU64,
    /// Running sum of the counts passed to `record_events_replayed`.
    events_replayed: AtomicU64,
    /// Incremented by one on each `record_sse_connect` call.
    sse_connections_opened: AtomicU64,
    /// Incremented by one on each `record_sse_disconnect` call.
    sse_connections_closed: AtomicU64,
    /// Incremented by one on each `record_session_loaded` call.
    shared_sessions_loaded: AtomicU64,
    /// Incremented by one on each `record_session_persisted` call.
    shared_sessions_persisted: AtomicU64,
    /// Distinct workspace ids passed to `record_workspace_active`; ids are only
    /// ever inserted by the code in this file, never removed.
    active_workspaces: Mutex<std::collections::HashSet<String>>,
}
18
/// Plain-value copy of the metrics as read by `ContextOsMetrics::snapshot`.
///
/// Serialized with camelCase field names (e.g. `eventsAppended`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct MetricsSnapshot {
    /// Value of the `events_appended` counter when the snapshot was taken.
    pub events_appended: u64,
    /// Value of the `events_broadcast` counter when the snapshot was taken.
    pub events_broadcast: u64,
    /// Value of the `events_replayed` counter when the snapshot was taken.
    pub events_replayed: u64,
    /// Opened minus closed SSE connections, saturating at zero.
    pub sse_connections_active: u64,
    /// Number of SSE connections opened so far (closed ones are not subtracted).
    pub sse_connections_total: u64,
    /// Value of the `shared_sessions_loaded` counter when the snapshot was taken.
    pub shared_sessions_loaded: u64,
    /// Value of the `shared_sessions_persisted` counter when the snapshot was taken.
    pub shared_sessions_persisted: u64,
    /// Number of distinct workspace ids recorded so far.
    pub active_workspace_count: usize,
}
31
32impl ContextOsMetrics {
33    pub fn record_event_appended(&self) {
34        self.events_appended.fetch_add(1, Ordering::Relaxed);
35    }
36
37    pub fn record_event_broadcast(&self) {
38        self.events_broadcast.fetch_add(1, Ordering::Relaxed);
39    }
40
41    pub fn record_events_replayed(&self, count: u64) {
42        self.events_replayed.fetch_add(count, Ordering::Relaxed);
43    }
44
45    pub fn record_sse_connect(&self) {
46        self.sse_connections_opened.fetch_add(1, Ordering::Relaxed);
47    }
48
49    pub fn record_sse_disconnect(&self) {
50        self.sse_connections_closed.fetch_add(1, Ordering::Relaxed);
51    }
52
53    pub fn record_session_loaded(&self) {
54        self.shared_sessions_loaded.fetch_add(1, Ordering::Relaxed);
55    }
56
57    pub fn record_session_persisted(&self) {
58        self.shared_sessions_persisted
59            .fetch_add(1, Ordering::Relaxed);
60    }
61
62    pub fn record_workspace_active(&self, workspace_id: &str) {
63        if let Ok(mut set) = self.active_workspaces.lock() {
64            set.insert(workspace_id.to_string());
65        }
66    }
67
68    pub fn snapshot(&self) -> MetricsSnapshot {
69        let opened = self.sse_connections_opened.load(Ordering::Relaxed);
70        let closed = self.sse_connections_closed.load(Ordering::Relaxed);
71        MetricsSnapshot {
72            events_appended: self.events_appended.load(Ordering::Relaxed),
73            events_broadcast: self.events_broadcast.load(Ordering::Relaxed),
74            events_replayed: self.events_replayed.load(Ordering::Relaxed),
75            sse_connections_active: opened.saturating_sub(closed),
76            sse_connections_total: opened,
77            shared_sessions_loaded: self.shared_sessions_loaded.load(Ordering::Relaxed),
78            shared_sessions_persisted: self.shared_sessions_persisted.load(Ordering::Relaxed),
79            active_workspace_count: self.active_workspaces.lock().map_or(0, |s| s.len()),
80        }
81    }
82}