statsig_rust/observability/
ops_stats.rs

1use async_trait::async_trait;
2use lazy_static::lazy_static;
3use std::{
4    collections::HashMap,
5    sync::{Arc, RwLock, Weak},
6};
7use tokio::sync::broadcast::{self, Sender};
8use tokio::sync::Notify;
9
10use crate::sdk_diagnostics::{
11    diagnostics::ContextType,
12    marker::{KeyType, Marker},
13};
14use crate::{log_e, log_w, StatsigRuntime};
15
16use super::{
17    observability_client_adapter::ObservabilityEvent, sdk_errors_observer::ErrorBoundaryEvent,
18    DiagnosticsEvent,
19};
20
21const TAG: &str = stringify!(OpsStats);
22
23/* Ideally we don't need to pass OpsStats around, but right now I could find a good way to do it to support multiple instances*/
24lazy_static! {
25    pub static ref OPS_STATS: OpsStats = OpsStats::new();
26}
27
28pub struct OpsStats {
29    instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>, // key is sdk key
30}
31
32impl OpsStats {
33    pub fn new() -> Self {
34        OpsStats {
35            instances_map: HashMap::new().into(),
36        }
37    }
38
39    pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
40        match self.instances_map.read() {
41            Ok(read_guard) => {
42                if let Some(instance) = read_guard.get(sdk_key) {
43                    if let Some(instance) = instance.upgrade() {
44                        return instance.clone();
45                    }
46                }
47            }
48            Err(e) => {
49                log_e!(TAG, "Failed to get read guard: {}", e);
50            }
51        }
52
53        let instance = Arc::new(OpsStatsForInstance::new());
54        match self.instances_map.write() {
55            Ok(mut write_guard) => {
56                write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
57            }
58            Err(e) => {
59                log_e!(TAG, "Failed to get write guard: {}", e);
60            }
61        }
62
63        instance
64    }
65}
66
67#[derive(Clone)]
68pub enum OpsStatsEvent {
69    Observability(ObservabilityEvent),
70    SDKError(ErrorBoundaryEvent),
71    Diagnostics(DiagnosticsEvent),
72}
73
74pub struct OpsStatsForInstance {
75    sender: Sender<OpsStatsEvent>,
76    shutdown_notify: Arc<Notify>,
77}
78
79// The class used to handle all observability events including diagnostics, error, event logging, and external metric sharing
80impl Default for OpsStatsForInstance {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl OpsStatsForInstance {
87    pub fn new() -> Self {
88        let (tx, _) = broadcast::channel(1000);
89        OpsStatsForInstance {
90            sender: tx,
91            shutdown_notify: Arc::new(Notify::new()),
92        }
93    }
94
95    pub fn log(&self, event: OpsStatsEvent) {
96        match self.sender.send(event) {
97            Ok(_) => {}
98            Err(e) => {
99                log_w!(
100                    "OpsStats Message Queue",
101                    "Dropping ops stats event {}",
102                    e.to_string()
103                );
104            }
105        }
106    }
107
108    pub fn log_error(&self, error: ErrorBoundaryEvent) {
109        self.log(OpsStatsEvent::SDKError(error));
110    }
111
112    pub fn add_marker(&self, marker: Marker, context: Option<ContextType>) {
113        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
114            marker: Some(marker),
115            context,
116            key: None,
117            should_enqueue: false,
118        }));
119    }
120
121    pub fn set_diagnostics_context(&self, context: ContextType) {
122        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
123            marker: None,
124            context: Some(context),
125            key: None,
126            should_enqueue: false,
127        }));
128    }
129
130    pub fn enqueue_diagnostics_event(&self, key: Option<KeyType>, context: Option<ContextType>) {
131        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
132            marker: None,
133            context,
134            key,
135            should_enqueue: true,
136        }));
137    }
138
139    pub fn subscribe(
140        &self,
141        runtime: Arc<StatsigRuntime>,
142        observer: Weak<dyn OpsStatsEventObserver>,
143    ) {
144        let mut rx = self.sender.subscribe();
145        let shutdown_notify = self.shutdown_notify.clone();
146        let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
147            loop {
148                tokio::select! {
149                    event = rx.recv() => {
150                        let observer = match observer.upgrade() {
151                            Some(observer) => observer,
152                            None => break,
153                        };
154
155                        if let Ok(event) = event {
156                            observer.handle_event(event).await;
157                        }
158                    }
159                    () = rt_shutdown_notify.notified() => {
160                        break;
161                    }
162                    () = shutdown_notify.notified() => {
163                        break;
164                    }
165                }
166            }
167        });
168    }
169}
170
171impl Drop for OpsStatsForInstance {
172    fn drop(&mut self) {
173        self.shutdown_notify.notify_waiters();
174    }
175}
176
177#[async_trait]
178pub trait OpsStatsEventObserver: Send + Sync + 'static {
179    async fn handle_event(&self, event: OpsStatsEvent);
180}