// statsig_rust/observability/ops_stats.rs

1use async_trait::async_trait;
2use lazy_static::lazy_static;
3use std::{
4    collections::HashMap,
5    sync::{Arc, RwLock, Weak},
6};
7use tokio::sync::broadcast::{self, Sender};
8use tokio::sync::Notify;
9
10use crate::{log_e, log_w, StatsigRuntime};
11
12use super::{
13    observability_client_adapter::ObservabilityEvent, sdk_errors_observer::ErrorBoundaryEvent,
14};
15
/// Tag passed to the logging macros for messages emitted by `OpsStats`.
const TAG: &str = "OpsStats";
17
18/* Ideally we don't need to pass OpsStats around, but right now I could find a good way to do it to support multiple instances*/
19lazy_static! {
20    pub static ref OPS_STATS: OpsStats = OpsStats::new();
21}
22
23pub struct OpsStats {
24    instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>, // key is sdk key
25}
26
27impl OpsStats {
28    pub fn new() -> Self {
29        OpsStats {
30            instances_map: HashMap::new().into(),
31        }
32    }
33
34    pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
35        match self.instances_map.read() {
36            Ok(read_guard) => {
37                if let Some(instance) = read_guard.get(sdk_key) {
38                    if let Some(instance) = instance.upgrade() {
39                        return instance.clone();
40                    }
41                }
42            }
43            Err(e) => {
44                log_e!(TAG, "Failed to get read guard: {}", e);
45            }
46        }
47
48        let instance = Arc::new(OpsStatsForInstance::new());
49        match self.instances_map.write() {
50            Ok(mut write_guard) => {
51                write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
52            }
53            Err(e) => {
54                log_e!(TAG, "Failed to get write guard: {}", e);
55            }
56        }
57
58        instance
59    }
60}
61
62#[derive(Clone)]
63pub enum OpsStatsEvent {
64    ObservabilityEvent(ObservabilityEvent),
65    SDKErrorEvent(ErrorBoundaryEvent),
66}
67
68pub struct OpsStatsForInstance {
69    sender: Sender<OpsStatsEvent>,
70    shutdown_notify: Arc<Notify>,
71}
72
73// The class used to handle all observability events including diagnostics, error, event logging, and external metric sharing
74impl Default for OpsStatsForInstance {
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
80impl OpsStatsForInstance {
81    pub fn new() -> Self {
82        let (tx, _) = broadcast::channel(1000);
83        OpsStatsForInstance {
84            sender: tx,
85            shutdown_notify: Arc::new(Notify::new()),
86        }
87    }
88
89    pub fn log(&self, event: OpsStatsEvent) {
90        match self.sender.send(event) {
91            Ok(_) => {}
92            Err(e) => {
93                log_w!(
94                    "OpsStats Message Queue",
95                    "Dropping ops stats event {}",
96                    e.to_string()
97                );
98            }
99        }
100    }
101
102    pub fn log_error(&self, error: ErrorBoundaryEvent) {
103        self.log(OpsStatsEvent::SDKErrorEvent(error));
104    }
105
106    pub fn subscribe(
107        &self,
108        runtime: Arc<StatsigRuntime>,
109        observer: Weak<dyn OpsStatsEventObserver>,
110    ) {
111        let mut rx = self.sender.subscribe();
112        let shutdown_notify = self.shutdown_notify.clone();
113        let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
114            loop {
115                tokio::select! {
116                    event = rx.recv() => {
117                        let observer = match observer.upgrade() {
118                            Some(observer) => observer,
119                            None => break,
120                        };
121
122                        if let Ok(event) = event {
123                            observer.handle_event(event).await;
124                        }
125                    }
126                    () = rt_shutdown_notify.notified() => {
127                        break;
128                    }
129                    () = shutdown_notify.notified() => {
130                        break;
131                    }
132                }
133            }
134        });
135    }
136}
137
138impl Drop for OpsStatsForInstance {
139    fn drop(&mut self) {
140        self.shutdown_notify.notify_waiters();
141    }
142}
143
144#[async_trait]
145pub trait OpsStatsEventObserver: Send + Sync + 'static {
146    async fn handle_event(&self, event: OpsStatsEvent);
147}