statsig_rust/observability/
ops_stats.rs1use super::{
2 observability_client_adapter::{MetricType, ObservabilityEvent},
3 sdk_errors_observer::ErrorBoundaryEvent,
4 DiagnosticsEvent,
5};
6use crate::user::StatsigUserLoggable;
7use crate::{log_e, log_w, StatsigRuntime};
8use crate::{
9 observability::console_capture_observer::ConsoleCaptureEvent,
10 sdk_diagnostics::{
11 diagnostics::ContextType,
12 marker::{KeyType, Marker},
13 },
14};
15use async_trait::async_trait;
16use lazy_static::lazy_static;
17use parking_lot::RwLock;
18use std::{
19 collections::HashMap,
20 sync::{Arc, Weak},
21};
22use tokio::sync::broadcast::{self, Sender};
23use tokio::sync::Notify;
24
/// Tag passed to the logging macros for messages emitted by the `OpsStats` registry.
const TAG: &str = "OpsStats";
26
27lazy_static! {
29 pub static ref OPS_STATS: OpsStats = OpsStats::new();
30}
31
32pub struct OpsStats {
33 instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>, }
35
36impl Default for OpsStats {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl OpsStats {
43 pub fn new() -> Self {
44 OpsStats {
45 instances_map: HashMap::new().into(),
46 }
47 }
48
49 pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
50 match self
51 .instances_map
52 .try_read_for(std::time::Duration::from_secs(5))
53 {
54 Some(read_guard) => {
55 if let Some(instance) = read_guard.get(sdk_key) {
56 if let Some(instance) = instance.upgrade() {
57 return instance.clone();
58 }
59 }
60 }
61 None => {
62 log_e!(
63 TAG,
64 "Failed to get read guard: Failed to lock instances_map"
65 );
66 }
67 }
68
69 let instance = Arc::new(OpsStatsForInstance::new());
70 match self
71 .instances_map
72 .try_write_for(std::time::Duration::from_secs(5))
73 {
74 Some(mut write_guard) => {
75 write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
76 }
77 None => {
78 log_e!(
79 TAG,
80 "Failed to get write guard: Failed to lock instances_map"
81 );
82 }
83 }
84
85 instance
86 }
87}
88
89#[derive(Clone)]
90pub enum OpsStatsEvent {
91 Observability(ObservabilityEvent),
92 SDKError(ErrorBoundaryEvent),
93 Diagnostics(DiagnosticsEvent),
94 ConsoleCapture(ConsoleCaptureEvent),
95}
96
97pub struct OpsStatsForInstance {
98 sender: Sender<OpsStatsEvent>,
99 shutdown_notify: Arc<Notify>,
100}
101
102impl Default for OpsStatsForInstance {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109impl OpsStatsForInstance {
110 pub fn new() -> Self {
111 let (tx, _) = broadcast::channel(1000);
112 OpsStatsForInstance {
113 sender: tx,
114 shutdown_notify: Arc::new(Notify::new()),
115 }
116 }
117
118 pub fn log(&self, event: OpsStatsEvent) {
119 match self.sender.send(event) {
120 Ok(_) => {}
121 Err(e) => {
122 log_w!(
123 "OpsStats Message Queue",
124 "Dropping ops stats event {}",
125 e.to_string()
126 );
127 }
128 }
129 }
130
131 pub fn log_error(&self, error: ErrorBoundaryEvent) {
132 self.log(OpsStatsEvent::SDKError(error));
133 }
134
135 pub fn log_checksum_validation_result(&self, is_success: bool) {
136 self.log(ObservabilityEvent::new_event(
137 MetricType::Increment,
138 "deltas_checksum_validation.count".to_string(),
139 1.0,
140 Some(HashMap::from([(
141 "result".to_string(),
142 if is_success { "success" } else { "failure" }.to_string(),
143 )])),
144 ));
145 }
146
147 pub fn add_marker(&self, marker: Marker, context: Option<ContextType>) {
148 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
149 marker: Some(marker),
150 context,
151 key: None,
152 should_enqueue: false,
153 }));
154 }
155
156 pub fn set_diagnostics_context(&self, context: ContextType) {
157 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
158 marker: None,
159 context: Some(context),
160 key: None,
161 should_enqueue: false,
162 }));
163 }
164
165 pub fn enqueue_diagnostics_event(&self, key: Option<KeyType>, context: Option<ContextType>) {
166 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
167 marker: None,
168 context,
169 key,
170 should_enqueue: true,
171 }));
172 }
173
174 pub fn enqueue_console_capture_event(
175 &self,
176 level: String,
177 payload: Vec<String>,
178 timestamp: u64,
179 user: StatsigUserLoggable,
180 stack_trace: Option<String>,
181 ) {
182 self.log(OpsStatsEvent::ConsoleCapture(ConsoleCaptureEvent {
183 level,
184 payload,
185 timestamp,
186 user,
187 stack_trace,
188 }));
189 }
190
191 pub fn subscribe(
192 &self,
193 runtime: Arc<StatsigRuntime>,
194 observer: Weak<dyn OpsStatsEventObserver>,
195 ) {
196 let mut rx = self.sender.subscribe();
197 let shutdown_notify = self.shutdown_notify.clone();
198 let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
199 loop {
200 tokio::select! {
201 event = rx.recv() => {
202 let observer = match observer.upgrade() {
203 Some(observer) => observer,
204 None => break,
205 };
206
207 if let Ok(event) = event {
208 observer.handle_event(event).await;
209 }
210 }
211 () = rt_shutdown_notify.notified() => {
212 break;
213 }
214 () = shutdown_notify.notified() => {
215 break;
216 }
217 }
218 }
219 });
220 }
221}
222
223impl Drop for OpsStatsForInstance {
224 fn drop(&mut self) {
225 self.shutdown_notify.notify_waiters();
226 }
227}
228
229#[async_trait]
230pub trait OpsStatsEventObserver: Send + Sync + 'static {
231 async fn handle_event(&self, event: OpsStatsEvent);
232}