statsig_rust/observability/
ops_stats.rs

1use super::{
2    observability_client_adapter::ObservabilityEvent, sdk_errors_observer::ErrorBoundaryEvent,
3    DiagnosticsEvent,
4};
5use crate::sdk_diagnostics::{
6    diagnostics::ContextType,
7    marker::{KeyType, Marker},
8};
9use crate::{log_e, log_w, StatsigRuntime};
10use async_trait::async_trait;
11use lazy_static::lazy_static;
12use parking_lot::RwLock;
13use std::{
14    collections::HashMap,
15    sync::{Arc, Weak},
16};
17use tokio::sync::broadcast::{self, Sender};
18use tokio::sync::Notify;
19
/// Tag attached to log lines emitted by the `OpsStats` registry.
const TAG: &str = "OpsStats";
21
/* Ideally we wouldn't need to pass OpsStats around, but right now I couldn't find a good way to avoid it while still supporting multiple instances. */
23lazy_static! {
24    pub static ref OPS_STATS: OpsStats = OpsStats::new();
25}
26
27pub struct OpsStats {
28    instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>, // key is sdk key
29}
30
31impl OpsStats {
32    pub fn new() -> Self {
33        OpsStats {
34            instances_map: HashMap::new().into(),
35        }
36    }
37
38    pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
39        match self
40            .instances_map
41            .try_read_for(std::time::Duration::from_secs(5))
42        {
43            Some(read_guard) => {
44                if let Some(instance) = read_guard.get(sdk_key) {
45                    if let Some(instance) = instance.upgrade() {
46                        return instance.clone();
47                    }
48                }
49            }
50            None => {
51                log_e!(
52                    TAG,
53                    "Failed to get read guard: Failed to lock instances_map"
54                );
55            }
56        }
57
58        let instance = Arc::new(OpsStatsForInstance::new());
59        match self
60            .instances_map
61            .try_write_for(std::time::Duration::from_secs(5))
62        {
63            Some(mut write_guard) => {
64                write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
65            }
66            None => {
67                log_e!(
68                    TAG,
69                    "Failed to get write guard: Failed to lock instances_map"
70                );
71            }
72        }
73
74        instance
75    }
76}
77
78#[derive(Clone)]
79pub enum OpsStatsEvent {
80    Observability(ObservabilityEvent),
81    SDKError(ErrorBoundaryEvent),
82    Diagnostics(DiagnosticsEvent),
83}
84
85pub struct OpsStatsForInstance {
86    sender: Sender<OpsStatsEvent>,
87    shutdown_notify: Arc<Notify>,
88}
89
// OpsStatsForInstance handles all observability events: diagnostics, errors, event logging, and external metric sharing.
91impl Default for OpsStatsForInstance {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl OpsStatsForInstance {
98    pub fn new() -> Self {
99        let (tx, _) = broadcast::channel(1000);
100        OpsStatsForInstance {
101            sender: tx,
102            shutdown_notify: Arc::new(Notify::new()),
103        }
104    }
105
106    pub fn log(&self, event: OpsStatsEvent) {
107        match self.sender.send(event) {
108            Ok(_) => {}
109            Err(e) => {
110                log_w!(
111                    "OpsStats Message Queue",
112                    "Dropping ops stats event {}",
113                    e.to_string()
114                );
115            }
116        }
117    }
118
119    pub fn log_error(&self, error: ErrorBoundaryEvent) {
120        self.log(OpsStatsEvent::SDKError(error));
121    }
122
123    pub fn add_marker(&self, marker: Marker, context: Option<ContextType>) {
124        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
125            marker: Some(marker),
126            context,
127            key: None,
128            should_enqueue: false,
129        }));
130    }
131
132    pub fn set_diagnostics_context(&self, context: ContextType) {
133        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
134            marker: None,
135            context: Some(context),
136            key: None,
137            should_enqueue: false,
138        }));
139    }
140
141    pub fn enqueue_diagnostics_event(&self, key: Option<KeyType>, context: Option<ContextType>) {
142        self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
143            marker: None,
144            context,
145            key,
146            should_enqueue: true,
147        }));
148    }
149
150    pub fn subscribe(
151        &self,
152        runtime: Arc<StatsigRuntime>,
153        observer: Weak<dyn OpsStatsEventObserver>,
154    ) {
155        let mut rx = self.sender.subscribe();
156        let shutdown_notify = self.shutdown_notify.clone();
157        let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
158            loop {
159                tokio::select! {
160                    event = rx.recv() => {
161                        let observer = match observer.upgrade() {
162                            Some(observer) => observer,
163                            None => break,
164                        };
165
166                        if let Ok(event) = event {
167                            observer.handle_event(event).await;
168                        }
169                    }
170                    () = rt_shutdown_notify.notified() => {
171                        break;
172                    }
173                    () = shutdown_notify.notified() => {
174                        break;
175                    }
176                }
177            }
178        });
179    }
180}
181
182impl Drop for OpsStatsForInstance {
183    fn drop(&mut self) {
184        self.shutdown_notify.notify_waiters();
185    }
186}
187
188#[async_trait]
189pub trait OpsStatsEventObserver: Send + Sync + 'static {
190    async fn handle_event(&self, event: OpsStatsEvent);
191}