statsig_rust/observability/
ops_stats.rs1use super::{
2 observability_client_adapter::ObservabilityEvent, sdk_errors_observer::ErrorBoundaryEvent,
3 DiagnosticsEvent,
4};
5use crate::sdk_diagnostics::{
6 diagnostics::ContextType,
7 marker::{KeyType, Marker},
8};
9use crate::{log_e, log_w, StatsigRuntime};
10use async_trait::async_trait;
11use lazy_static::lazy_static;
12use parking_lot::RwLock;
13use std::{
14 collections::HashMap,
15 sync::{Arc, Weak},
16};
17use tokio::sync::broadcast::{self, Sender};
18use tokio::sync::Notify;
19
20const TAG: &str = stringify!(OpsStats);
21
22lazy_static! {
24 pub static ref OPS_STATS: OpsStats = OpsStats::new();
25}
26
27pub struct OpsStats {
28 instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>, }
30
31impl OpsStats {
32 pub fn new() -> Self {
33 OpsStats {
34 instances_map: HashMap::new().into(),
35 }
36 }
37
38 pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
39 match self
40 .instances_map
41 .try_read_for(std::time::Duration::from_secs(5))
42 {
43 Some(read_guard) => {
44 if let Some(instance) = read_guard.get(sdk_key) {
45 if let Some(instance) = instance.upgrade() {
46 return instance.clone();
47 }
48 }
49 }
50 None => {
51 log_e!(
52 TAG,
53 "Failed to get read guard: Failed to lock instances_map"
54 );
55 }
56 }
57
58 let instance = Arc::new(OpsStatsForInstance::new());
59 match self
60 .instances_map
61 .try_write_for(std::time::Duration::from_secs(5))
62 {
63 Some(mut write_guard) => {
64 write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
65 }
66 None => {
67 log_e!(
68 TAG,
69 "Failed to get write guard: Failed to lock instances_map"
70 );
71 }
72 }
73
74 instance
75 }
76}
77
78#[derive(Clone)]
79pub enum OpsStatsEvent {
80 Observability(ObservabilityEvent),
81 SDKError(ErrorBoundaryEvent),
82 Diagnostics(DiagnosticsEvent),
83}
84
85pub struct OpsStatsForInstance {
86 sender: Sender<OpsStatsEvent>,
87 shutdown_notify: Arc<Notify>,
88}
89
90impl Default for OpsStatsForInstance {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97impl OpsStatsForInstance {
98 pub fn new() -> Self {
99 let (tx, _) = broadcast::channel(1000);
100 OpsStatsForInstance {
101 sender: tx,
102 shutdown_notify: Arc::new(Notify::new()),
103 }
104 }
105
106 pub fn log(&self, event: OpsStatsEvent) {
107 match self.sender.send(event) {
108 Ok(_) => {}
109 Err(e) => {
110 log_w!(
111 "OpsStats Message Queue",
112 "Dropping ops stats event {}",
113 e.to_string()
114 );
115 }
116 }
117 }
118
119 pub fn log_error(&self, error: ErrorBoundaryEvent) {
120 self.log(OpsStatsEvent::SDKError(error));
121 }
122
123 pub fn add_marker(&self, marker: Marker, context: Option<ContextType>) {
124 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
125 marker: Some(marker),
126 context,
127 key: None,
128 should_enqueue: false,
129 }));
130 }
131
132 pub fn set_diagnostics_context(&self, context: ContextType) {
133 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
134 marker: None,
135 context: Some(context),
136 key: None,
137 should_enqueue: false,
138 }));
139 }
140
141 pub fn enqueue_diagnostics_event(&self, key: Option<KeyType>, context: Option<ContextType>) {
142 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
143 marker: None,
144 context,
145 key,
146 should_enqueue: true,
147 }));
148 }
149
150 pub fn subscribe(
151 &self,
152 runtime: Arc<StatsigRuntime>,
153 observer: Weak<dyn OpsStatsEventObserver>,
154 ) {
155 let mut rx = self.sender.subscribe();
156 let shutdown_notify = self.shutdown_notify.clone();
157 let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
158 loop {
159 tokio::select! {
160 event = rx.recv() => {
161 let observer = match observer.upgrade() {
162 Some(observer) => observer,
163 None => break,
164 };
165
166 if let Ok(event) = event {
167 observer.handle_event(event).await;
168 }
169 }
170 () = rt_shutdown_notify.notified() => {
171 break;
172 }
173 () = shutdown_notify.notified() => {
174 break;
175 }
176 }
177 }
178 });
179 }
180}
181
182impl Drop for OpsStatsForInstance {
183 fn drop(&mut self) {
184 self.shutdown_notify.notify_waiters();
185 }
186}
187
188#[async_trait]
189pub trait OpsStatsEventObserver: Send + Sync + 'static {
190 async fn handle_event(&self, event: OpsStatsEvent);
191}