Skip to main content

statsig_rust/observability/
ops_stats.rs

1use super::{
2    observability_client_adapter::{MetricType, ObservabilityEvent},
3    sdk_errors_observer::ErrorBoundaryEvent,
4    DiagnosticsEvent,
5};
6use crate::user::StatsigUserLoggable;
7use crate::{log_e, log_w, StatsigRuntime};
8use crate::{
9    observability::console_capture_observer::ConsoleCaptureEvent,
10    sdk_diagnostics::{
11        diagnostics::ContextType,
12        marker::{KeyType, Marker},
13    },
14};
15use async_trait::async_trait;
16use lazy_static::lazy_static;
17use parking_lot::RwLock;
18use std::{
19    collections::HashMap,
20    sync::{Arc, Weak},
21};
22use tokio::sync::broadcast::{self, Sender};
23use tokio::sync::Notify;
24
/// Tag attached to log lines emitted from this module.
const TAG: &str = "OpsStats";
26
27/* Ideally we don't need to pass OpsStats around, but right now I could find a good way to do it to support multiple instances*/
28lazy_static! {
29    pub static ref OPS_STATS: OpsStats = OpsStats::new();
30}
31
32pub struct OpsStats {
33    instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>, // key is sdk key
34}
35
36impl Default for OpsStats {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl OpsStats {
43    pub fn new() -> Self {
44        OpsStats {
45            instances_map: HashMap::new().into(),
46        }
47    }
48
49    pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
50        match self
51            .instances_map
52            .try_read_for(std::time::Duration::from_secs(5))
53        {
54            Some(read_guard) => {
55                if let Some(instance) = read_guard.get(sdk_key) {
56                    if let Some(instance) = instance.upgrade() {
57                        return instance.clone();
58                    }
59                }
60            }
61            None => {
62                log_e!(
63                    TAG,
64                    "Failed to get read guard: Failed to lock instances_map"
65                );
66            }
67        }
68
69        let instance = Arc::new(OpsStatsForInstance::new());
70        match self
71            .instances_map
72            .try_write_for(std::time::Duration::from_secs(5))
73        {
74            Some(mut write_guard) => {
75                write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
76            }
77            None => {
78                log_e!(
79                    TAG,
80                    "Failed to get write guard: Failed to lock instances_map"
81                );
82            }
83        }
84
85        instance
86    }
87}
88
89#[derive(Clone)]
90pub enum OpsStatsEvent {
91    Observability(ObservabilityEvent),
92    SDKError(ErrorBoundaryEvent),
93    Diagnostics(DiagnosticsEvent),
94    ConsoleCapture(ConsoleCaptureEvent),
95}
96
97pub struct OpsStatsForInstance {
98    sender: Sender<OpsStatsEvent>,
99    shutdown_notify: Arc<Notify>,
100}
101
102// The class used to handle all observability events including diagnostics, error, event logging, and external metric sharing
103impl Default for OpsStatsForInstance {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109impl OpsStatsForInstance {
110    pub fn new() -> Self {
111        let (tx, _) = broadcast::channel(1000);
112        OpsStatsForInstance {
113            sender: tx,
114            shutdown_notify: Arc::new(Notify::new()),
115        }
116    }
117
118    pub fn log(&self, event: OpsStatsEvent) {
119        match self.sender.send(event) {
120            Ok(_) => {}
121            Err(e) => {
122                log_w!(
123                    "OpsStats Message Queue",
124                    "Dropping ops stats event {}",
125                    e.to_string()
126                );
127            }
128        }
129    }
130
131    pub fn log_error(&self, error: ErrorBoundaryEvent) {
132        self.log(OpsStatsEvent::SDKError(error));
133    }
134
135    pub fn log_checksum_validation_result(&self, is_success: bool) {
136        self.log(ObservabilityEvent::new_event(
137            MetricType::Increment,
138            "deltas_checksum_validation.count".to_string(),
139            1.0,
140            Some(HashMap::from([(
141                "result".to_string(),
142                if is_success { "success" } else { "failure" }.to_string(),
143            )])),
144        ));
145    }
146
147    pub fn add_marker(&self, marker: Marker, context: Option<ContextType>) {
148        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
149            marker: Some(marker),
150            context,
151            key: None,
152            should_enqueue: false,
153        }));
154    }
155
156    pub fn set_diagnostics_context(&self, context: ContextType) {
157        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
158            marker: None,
159            context: Some(context),
160            key: None,
161            should_enqueue: false,
162        }));
163    }
164
165    pub fn enqueue_diagnostics_event(&self, key: Option<KeyType>, context: Option<ContextType>) {
166        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
167            marker: None,
168            context,
169            key,
170            should_enqueue: true,
171        }));
172    }
173
174    pub fn enqueue_console_capture_event(
175        &self,
176        level: String,
177        payload: Vec<String>,
178        timestamp: u64,
179        user: StatsigUserLoggable,
180        stack_trace: Option<String>,
181    ) {
182        self.log(OpsStatsEvent::ConsoleCapture(ConsoleCaptureEvent {
183            level,
184            payload,
185            timestamp,
186            user,
187            stack_trace,
188        }));
189    }
190
191    pub fn subscribe(
192        &self,
193        runtime: Arc<StatsigRuntime>,
194        observer: Weak<dyn OpsStatsEventObserver>,
195    ) {
196        let mut rx = self.sender.subscribe();
197        let shutdown_notify = self.shutdown_notify.clone();
198        let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
199            loop {
200                tokio::select! {
201                    event = rx.recv() => {
202                        let observer = match observer.upgrade() {
203                            Some(observer) => observer,
204                            None => break,
205                        };
206
207                        if let Ok(event) = event {
208                            observer.handle_event(event).await;
209                        }
210                    }
211                    () = rt_shutdown_notify.notified() => {
212                        break;
213                    }
214                    () = shutdown_notify.notified() => {
215                        break;
216                    }
217                }
218            }
219        });
220    }
221}
222
223impl Drop for OpsStatsForInstance {
224    fn drop(&mut self) {
225        self.shutdown_notify.notify_waiters();
226    }
227}
228
229#[async_trait]
230pub trait OpsStatsEventObserver: Send + Sync + 'static {
231    async fn handle_event(&self, event: OpsStatsEvent);
232}