statsig_rust/observability/
ops_stats.rs1use async_trait::async_trait;
2use lazy_static::lazy_static;
3use std::{
4 collections::HashMap,
5 sync::{Arc, RwLock, Weak},
6};
7use tokio::sync::broadcast::{self, Sender};
8use tokio::sync::Notify;
9
10use crate::{log_e, log_w, StatsigRuntime};
11
12use super::{
13 observability_client_adapter::ObservabilityEvent, sdk_errors_observer::ErrorBoundaryEvent,
14};
15
16const TAG: &str = stringify!(OpsStats);
17
18lazy_static! {
20 pub static ref OPS_STATS: OpsStats = OpsStats::new();
21}
22
23pub struct OpsStats {
24 instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>, }
26
27impl OpsStats {
28 pub fn new() -> Self {
29 OpsStats {
30 instances_map: HashMap::new().into(),
31 }
32 }
33
34 pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
35 match self.instances_map.read() {
36 Ok(read_guard) => {
37 if let Some(instance) = read_guard.get(sdk_key) {
38 if let Some(instance) = instance.upgrade() {
39 return instance.clone();
40 }
41 }
42 }
43 Err(e) => {
44 log_e!(TAG, "Failed to get read guard: {}", e);
45 }
46 }
47
48 let instance = Arc::new(OpsStatsForInstance::new());
49 match self.instances_map.write() {
50 Ok(mut write_guard) => {
51 write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
52 }
53 Err(e) => {
54 log_e!(TAG, "Failed to get write guard: {}", e);
55 }
56 }
57
58 instance
59 }
60}
61
62#[derive(Clone)]
63pub enum OpsStatsEvent {
64 ObservabilityEvent(ObservabilityEvent),
65 SDKErrorEvent(ErrorBoundaryEvent),
66}
67
68pub struct OpsStatsForInstance {
69 sender: Sender<OpsStatsEvent>,
70 shutdown_notify: Arc<Notify>,
71}
72
73impl Default for OpsStatsForInstance {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80impl OpsStatsForInstance {
81 pub fn new() -> Self {
82 let (tx, _) = broadcast::channel(1000);
83 OpsStatsForInstance {
84 sender: tx,
85 shutdown_notify: Arc::new(Notify::new()),
86 }
87 }
88
89 pub fn log(&self, event: OpsStatsEvent) {
90 match self.sender.send(event) {
91 Ok(_) => {}
92 Err(e) => {
93 log_w!(
94 "OpsStats Message Queue",
95 "Dropping ops stats event {}",
96 e.to_string()
97 );
98 }
99 }
100 }
101
102 pub fn log_error(&self, error: ErrorBoundaryEvent) {
103 self.log(OpsStatsEvent::SDKErrorEvent(error));
104 }
105
106 pub fn subscribe(
107 &self,
108 runtime: Arc<StatsigRuntime>,
109 observer: Weak<dyn OpsStatsEventObserver>,
110 ) {
111 let mut rx = self.sender.subscribe();
112 let shutdown_notify = self.shutdown_notify.clone();
113 let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
114 loop {
115 tokio::select! {
116 event = rx.recv() => {
117 let observer = match observer.upgrade() {
118 Some(observer) => observer,
119 None => break,
120 };
121
122 if let Ok(event) = event {
123 observer.handle_event(event).await;
124 }
125 }
126 () = rt_shutdown_notify.notified() => {
127 break;
128 }
129 () = shutdown_notify.notified() => {
130 break;
131 }
132 }
133 }
134 });
135 }
136}
137
138impl Drop for OpsStatsForInstance {
139 fn drop(&mut self) {
140 self.shutdown_notify.notify_waiters();
141 }
142}
143
144#[async_trait]
145pub trait OpsStatsEventObserver: Send + Sync + 'static {
146 async fn handle_event(&self, event: OpsStatsEvent);
147}