statsig_rust/observability/
ops_stats.rs1use async_trait::async_trait;
2use lazy_static::lazy_static;
3use std::{
4 collections::HashMap,
5 sync::{Arc, RwLock, Weak},
6};
7use tokio::sync::broadcast::{self, Sender};
8use tokio::sync::Notify;
9
10use crate::sdk_diagnostics::{
11 diagnostics::ContextType,
12 marker::{KeyType, Marker},
13};
14use crate::{log_e, log_w, StatsigRuntime};
15
16use super::{
17 observability_client_adapter::ObservabilityEvent, sdk_errors_observer::ErrorBoundaryEvent,
18 DiagnosticsEvent,
19};
20
/// Tag passed to the logging macros in this file to identify where a message came from.
const TAG: &str = stringify!(OpsStats);
22
lazy_static! {
    /// Global [`OpsStats`] registry, built with `OpsStats::new()` the first time it is accessed.
    pub static ref OPS_STATS: OpsStats = OpsStats::new();
}
27
/// Registry that maps an SDK key to the [`OpsStatsForInstance`] shared by users of that key.
pub struct OpsStats {
    /// SDK key -> weak handle to its instance. Only `Weak` references are stored, so the
    /// registry itself never keeps an instance alive; callers holding the `Arc` do.
    instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>,
}
31
32impl OpsStats {
33 pub fn new() -> Self {
34 OpsStats {
35 instances_map: HashMap::new().into(),
36 }
37 }
38
39 pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
40 match self.instances_map.read() {
41 Ok(read_guard) => {
42 if let Some(instance) = read_guard.get(sdk_key) {
43 if let Some(instance) = instance.upgrade() {
44 return instance.clone();
45 }
46 }
47 }
48 Err(e) => {
49 log_e!(TAG, "Failed to get read guard: {}", e);
50 }
51 }
52
53 let instance = Arc::new(OpsStatsForInstance::new());
54 match self.instances_map.write() {
55 Ok(mut write_guard) => {
56 write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
57 }
58 Err(e) => {
59 log_e!(TAG, "Failed to get write guard: {}", e);
60 }
61 }
62
63 instance
64 }
65}
66
/// Event sent over an [`OpsStatsForInstance`] broadcast channel to its subscribed observers.
#[derive(Clone)]
pub enum OpsStatsEvent {
    /// Wraps an [`ObservabilityEvent`].
    Observability(ObservabilityEvent),
    /// Wraps an [`ErrorBoundaryEvent`]; produced by `OpsStatsForInstance::log_error`.
    SDKError(ErrorBoundaryEvent),
    /// Wraps a [`DiagnosticsEvent`]; produced by the marker/context/enqueue helpers.
    Diagnostics(DiagnosticsEvent),
}
73
/// Per-instance event hub: events logged here are broadcast to every subscribed observer.
pub struct OpsStatsForInstance {
    /// Sending half of the broadcast channel; each `subscribe` call creates a new receiver.
    sender: Sender<OpsStatsEvent>,
    /// Signalled from `Drop` so listener tasks spawned by `subscribe` can stop.
    shutdown_notify: Arc<Notify>,
}
78
79impl Default for OpsStatsForInstance {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl OpsStatsForInstance {
87 pub fn new() -> Self {
88 let (tx, _) = broadcast::channel(1000);
89 OpsStatsForInstance {
90 sender: tx,
91 shutdown_notify: Arc::new(Notify::new()),
92 }
93 }
94
95 pub fn log(&self, event: OpsStatsEvent) {
96 match self.sender.send(event) {
97 Ok(_) => {}
98 Err(e) => {
99 log_w!(
100 "OpsStats Message Queue",
101 "Dropping ops stats event {}",
102 e.to_string()
103 );
104 }
105 }
106 }
107
108 pub fn log_error(&self, error: ErrorBoundaryEvent) {
109 self.log(OpsStatsEvent::SDKError(error));
110 }
111
112 pub fn add_marker(&self, marker: Marker, context: Option<ContextType>) {
113 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
114 marker: Some(marker),
115 context,
116 key: None,
117 should_enqueue: false,
118 }));
119 }
120
121 pub fn set_diagnostics_context(&self, context: ContextType) {
122 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
123 marker: None,
124 context: Some(context),
125 key: None,
126 should_enqueue: false,
127 }));
128 }
129
130 pub fn enqueue_diagnostics_event(&self, key: Option<KeyType>, context: Option<ContextType>) {
131 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
132 marker: None,
133 context,
134 key,
135 should_enqueue: true,
136 }));
137 }
138
139 pub fn subscribe(
140 &self,
141 runtime: Arc<StatsigRuntime>,
142 observer: Weak<dyn OpsStatsEventObserver>,
143 ) {
144 let mut rx = self.sender.subscribe();
145 let shutdown_notify = self.shutdown_notify.clone();
146 let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
147 loop {
148 tokio::select! {
149 event = rx.recv() => {
150 let observer = match observer.upgrade() {
151 Some(observer) => observer,
152 None => break,
153 };
154
155 if let Ok(event) = event {
156 observer.handle_event(event).await;
157 }
158 }
159 () = rt_shutdown_notify.notified() => {
160 break;
161 }
162 () = shutdown_notify.notified() => {
163 break;
164 }
165 }
166 }
167 });
168 }
169}
170
impl Drop for OpsStatsForInstance {
    /// Signals `shutdown_notify` so listener tasks created by `subscribe` that are currently
    /// waiting on it break out of their loop.
    fn drop(&mut self) {
        self.shutdown_notify.notify_waiters();
    }
}
176
/// Receiver of [`OpsStatsEvent`]s; implementors are registered through
/// `OpsStatsForInstance::subscribe` and invoked from the listener task it spawns.
#[async_trait]
pub trait OpsStatsEventObserver: Send + Sync + 'static {
    /// Handles a single broadcast event. The listener task awaits this call before
    /// receiving the next event.
    async fn handle_event(&self, event: OpsStatsEvent);
}