Skip to main content

statsig_rust/observability/
ops_stats.rs

1use super::{
2    observability_client_adapter::ObservabilityEvent, sdk_errors_observer::ErrorBoundaryEvent,
3    DiagnosticsEvent,
4};
5use crate::user::StatsigUserLoggable;
6use crate::{log_e, log_w, StatsigRuntime};
7use crate::{
8    observability::console_capture_observer::ConsoleCaptureEvent,
9    sdk_diagnostics::{
10        diagnostics::ContextType,
11        marker::{KeyType, Marker},
12    },
13};
14use async_trait::async_trait;
15use lazy_static::lazy_static;
16use parking_lot::RwLock;
17use std::{
18    collections::HashMap,
19    sync::{Arc, Weak},
20};
21use tokio::sync::broadcast::{self, Sender};
22use tokio::sync::Notify;
23
24const TAG: &str = stringify!(OpsStats);
25
26/* Ideally we don't need to pass OpsStats around, but right now I could find a good way to do it to support multiple instances*/
27lazy_static! {
28    pub static ref OPS_STATS: OpsStats = OpsStats::new();
29}
30
31pub struct OpsStats {
32    instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>, // key is sdk key
33}
34
35impl Default for OpsStats {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl OpsStats {
42    pub fn new() -> Self {
43        OpsStats {
44            instances_map: HashMap::new().into(),
45        }
46    }
47
48    pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
49        match self
50            .instances_map
51            .try_read_for(std::time::Duration::from_secs(5))
52        {
53            Some(read_guard) => {
54                if let Some(instance) = read_guard.get(sdk_key) {
55                    if let Some(instance) = instance.upgrade() {
56                        return instance.clone();
57                    }
58                }
59            }
60            None => {
61                log_e!(
62                    TAG,
63                    "Failed to get read guard: Failed to lock instances_map"
64                );
65            }
66        }
67
68        let instance = Arc::new(OpsStatsForInstance::new());
69        match self
70            .instances_map
71            .try_write_for(std::time::Duration::from_secs(5))
72        {
73            Some(mut write_guard) => {
74                write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
75            }
76            None => {
77                log_e!(
78                    TAG,
79                    "Failed to get write guard: Failed to lock instances_map"
80                );
81            }
82        }
83
84        instance
85    }
86}
87
88#[derive(Clone)]
89pub enum OpsStatsEvent {
90    Observability(ObservabilityEvent),
91    SDKError(ErrorBoundaryEvent),
92    Diagnostics(DiagnosticsEvent),
93    ConsoleCapture(ConsoleCaptureEvent),
94}
95
96pub struct OpsStatsForInstance {
97    sender: Sender<OpsStatsEvent>,
98    shutdown_notify: Arc<Notify>,
99}
100
101// The class used to handle all observability events including diagnostics, error, event logging, and external metric sharing
102impl Default for OpsStatsForInstance {
103    fn default() -> Self {
104        Self::new()
105    }
106}
107
108impl OpsStatsForInstance {
109    pub fn new() -> Self {
110        let (tx, _) = broadcast::channel(1000);
111        OpsStatsForInstance {
112            sender: tx,
113            shutdown_notify: Arc::new(Notify::new()),
114        }
115    }
116
117    pub fn log(&self, event: OpsStatsEvent) {
118        match self.sender.send(event) {
119            Ok(_) => {}
120            Err(e) => {
121                log_w!(
122                    "OpsStats Message Queue",
123                    "Dropping ops stats event {}",
124                    e.to_string()
125                );
126            }
127        }
128    }
129
130    pub fn log_error(&self, error: ErrorBoundaryEvent) {
131        self.log(OpsStatsEvent::SDKError(error));
132    }
133
134    pub fn add_marker(&self, marker: Marker, context: Option<ContextType>) {
135        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
136            marker: Some(marker),
137            context,
138            key: None,
139            should_enqueue: false,
140        }));
141    }
142
143    pub fn set_diagnostics_context(&self, context: ContextType) {
144        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
145            marker: None,
146            context: Some(context),
147            key: None,
148            should_enqueue: false,
149        }));
150    }
151
152    pub fn enqueue_diagnostics_event(&self, key: Option<KeyType>, context: Option<ContextType>) {
153        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
154            marker: None,
155            context,
156            key,
157            should_enqueue: true,
158        }));
159    }
160
161    pub fn enqueue_console_capture_event(
162        &self,
163        level: String,
164        payload: Vec<String>,
165        timestamp: u64,
166        user: StatsigUserLoggable,
167        stack_trace: Option<String>,
168    ) {
169        self.log(OpsStatsEvent::ConsoleCapture(ConsoleCaptureEvent {
170            level,
171            payload,
172            timestamp,
173            user,
174            stack_trace,
175        }));
176    }
177
178    pub fn subscribe(
179        &self,
180        runtime: Arc<StatsigRuntime>,
181        observer: Weak<dyn OpsStatsEventObserver>,
182    ) {
183        let mut rx = self.sender.subscribe();
184        let shutdown_notify = self.shutdown_notify.clone();
185        let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
186            loop {
187                tokio::select! {
188                    event = rx.recv() => {
189                        let observer = match observer.upgrade() {
190                            Some(observer) => observer,
191                            None => break,
192                        };
193
194                        if let Ok(event) = event {
195                            observer.handle_event(event).await;
196                        }
197                    }
198                    () = rt_shutdown_notify.notified() => {
199                        break;
200                    }
201                    () = shutdown_notify.notified() => {
202                        break;
203                    }
204                }
205            }
206        });
207    }
208}
209
210impl Drop for OpsStatsForInstance {
211    fn drop(&mut self) {
212        self.shutdown_notify.notify_waiters();
213    }
214}
215
216#[async_trait]
217pub trait OpsStatsEventObserver: Send + Sync + 'static {
218    async fn handle_event(&self, event: OpsStatsEvent);
219}