// lean_ctx/core/context_os/metrics.rs

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, MutexGuard, PoisonError};
use std::time::Instant;

use serde::Serialize;

/// How long, in seconds, a workspace keeps counting as active after it was
/// last recorded. `ContextOsMetrics::snapshot` drops entries older than this.
const WORKSPACE_ACTIVE_TTL_SECS: u64 = 10 * 60;

/// Process-local metrics for Context OS observability.
///
/// The counters are atomics updated through the `record_*` methods; the set
/// of recently active workspaces is kept behind a mutex. The derived
/// [`Default`] yields an instance with every counter at zero and no tracked
/// workspaces.
#[derive(Default)]
pub struct ContextOsMetrics {
    /// Incremented by `record_event_appended`.
    events_appended: AtomicU64,
    /// Incremented by `record_event_broadcast`.
    events_broadcast: AtomicU64,
    /// Increased by the count passed to `record_events_replayed`.
    events_replayed: AtomicU64,
    /// Incremented by `record_sse_connect`.
    sse_connections_opened: AtomicU64,
    /// Incremented by `record_sse_disconnect`.
    sse_connections_closed: AtomicU64,
    /// Incremented by `record_session_loaded`.
    shared_sessions_loaded: AtomicU64,
    /// Incremented by `record_session_persisted`.
    shared_sessions_persisted: AtomicU64,
    /// Workspace id mapped to the instant it was last recorded as active.
    active_workspaces: Mutex<std::collections::HashMap<String, Instant>>,
}

36#[derive(Debug, Clone, Serialize)]
37#[serde(rename_all = "camelCase")]
38pub struct MetricsSnapshot {
39    pub events_appended: u64,
40    pub events_broadcast: u64,
41    pub events_replayed: u64,
42    pub sse_connections_active: u64,
43    pub sse_connections_total: u64,
44    pub shared_sessions_loaded: u64,
45    pub shared_sessions_persisted: u64,
46    pub active_workspace_count: usize,
47}
48
49impl ContextOsMetrics {
50    pub fn record_event_appended(&self) {
51        self.events_appended.fetch_add(1, Ordering::Relaxed);
52    }
53
54    pub fn record_event_broadcast(&self) {
55        self.events_broadcast.fetch_add(1, Ordering::Relaxed);
56    }
57
58    pub fn record_events_replayed(&self, count: u64) {
59        self.events_replayed.fetch_add(count, Ordering::Relaxed);
60    }
61
62    pub fn record_sse_connect(&self) {
63        self.sse_connections_opened.fetch_add(1, Ordering::Relaxed);
64    }
65
66    pub fn record_sse_disconnect(&self) {
67        self.sse_connections_closed.fetch_add(1, Ordering::Relaxed);
68    }
69
70    pub fn record_session_loaded(&self) {
71        self.shared_sessions_loaded.fetch_add(1, Ordering::Relaxed);
72    }
73
74    pub fn record_session_persisted(&self) {
75        self.shared_sessions_persisted
76            .fetch_add(1, Ordering::Relaxed);
77    }
78
79    pub fn record_workspace_active(&self, workspace_id: &str) {
80        if let Ok(mut map) = self.active_workspaces.lock() {
81            map.insert(workspace_id.to_string(), Instant::now());
82        }
83    }
84
85    pub fn snapshot(&self) -> MetricsSnapshot {
86        let opened = self.sse_connections_opened.load(Ordering::Relaxed);
87        let closed = self.sse_connections_closed.load(Ordering::Relaxed);
88        let active_workspace_count = if let Ok(mut map) = self.active_workspaces.lock() {
89            if let Some(cutoff) = Instant::now()
90                .checked_sub(std::time::Duration::from_secs(WORKSPACE_ACTIVE_TTL_SECS))
91            {
92                map.retain(|_, last_seen| *last_seen > cutoff);
93            }
94            map.len()
95        } else {
96            0
97        };
98        MetricsSnapshot {
99            events_appended: self.events_appended.load(Ordering::Relaxed),
100            events_broadcast: self.events_broadcast.load(Ordering::Relaxed),
101            events_replayed: self.events_replayed.load(Ordering::Relaxed),
102            sse_connections_active: opened.saturating_sub(closed),
103            sse_connections_total: opened,
104            shared_sessions_loaded: self.shared_sessions_loaded.load(Ordering::Relaxed),
105            shared_sessions_persisted: self.shared_sessions_persisted.load(Ordering::Relaxed),
106            active_workspace_count,
107        }
108    }
109}