statsig_rust/observability/
ops_stats.rs1use super::{
2 observability_client_adapter::ObservabilityEvent, sdk_errors_observer::ErrorBoundaryEvent,
3 DiagnosticsEvent,
4};
5use crate::user::StatsigUserLoggable;
6use crate::{log_e, log_w, StatsigRuntime};
7use crate::{
8 observability::console_capture_observer::ConsoleCaptureEvent,
9 sdk_diagnostics::{
10 diagnostics::ContextType,
11 marker::{KeyType, Marker},
12 },
13};
14use async_trait::async_trait;
15use lazy_static::lazy_static;
16use parking_lot::RwLock;
17use std::{
18 collections::HashMap,
19 sync::{Arc, Weak},
20};
21use tokio::sync::broadcast::{self, Sender};
22use tokio::sync::Notify;
23
/// Tag attached to log lines emitted by this module.
const TAG: &str = "OpsStats";
25
26lazy_static! {
28 pub static ref OPS_STATS: OpsStats = OpsStats::new();
29}
30
31pub struct OpsStats {
32 instances_map: RwLock<HashMap<String, Weak<OpsStatsForInstance>>>, }
34
35impl Default for OpsStats {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl OpsStats {
42 pub fn new() -> Self {
43 OpsStats {
44 instances_map: HashMap::new().into(),
45 }
46 }
47
48 pub fn get_for_instance(&self, sdk_key: &str) -> Arc<OpsStatsForInstance> {
49 match self
50 .instances_map
51 .try_read_for(std::time::Duration::from_secs(5))
52 {
53 Some(read_guard) => {
54 if let Some(instance) = read_guard.get(sdk_key) {
55 if let Some(instance) = instance.upgrade() {
56 return instance.clone();
57 }
58 }
59 }
60 None => {
61 log_e!(
62 TAG,
63 "Failed to get read guard: Failed to lock instances_map"
64 );
65 }
66 }
67
68 let instance = Arc::new(OpsStatsForInstance::new());
69 match self
70 .instances_map
71 .try_write_for(std::time::Duration::from_secs(5))
72 {
73 Some(mut write_guard) => {
74 write_guard.insert(sdk_key.into(), Arc::downgrade(&instance));
75 }
76 None => {
77 log_e!(
78 TAG,
79 "Failed to get write guard: Failed to lock instances_map"
80 );
81 }
82 }
83
84 instance
85 }
86}
87
88#[derive(Clone)]
89pub enum OpsStatsEvent {
90 Observability(ObservabilityEvent),
91 SDKError(ErrorBoundaryEvent),
92 Diagnostics(DiagnosticsEvent),
93 ConsoleCapture(ConsoleCaptureEvent),
94}
95
96pub struct OpsStatsForInstance {
97 sender: Sender<OpsStatsEvent>,
98 shutdown_notify: Arc<Notify>,
99}
100
101impl Default for OpsStatsForInstance {
103 fn default() -> Self {
104 Self::new()
105 }
106}
107
108impl OpsStatsForInstance {
109 pub fn new() -> Self {
110 let (tx, _) = broadcast::channel(1000);
111 OpsStatsForInstance {
112 sender: tx,
113 shutdown_notify: Arc::new(Notify::new()),
114 }
115 }
116
117 pub fn log(&self, event: OpsStatsEvent) {
118 match self.sender.send(event) {
119 Ok(_) => {}
120 Err(e) => {
121 log_w!(
122 "OpsStats Message Queue",
123 "Dropping ops stats event {}",
124 e.to_string()
125 );
126 }
127 }
128 }
129
130 pub fn log_error(&self, error: ErrorBoundaryEvent) {
131 self.log(OpsStatsEvent::SDKError(error));
132 }
133
134 pub fn add_marker(&self, marker: Marker, context: Option<ContextType>) {
135 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
136 marker: Some(marker),
137 context,
138 key: None,
139 should_enqueue: false,
140 }));
141 }
142
143 pub fn set_diagnostics_context(&self, context: ContextType) {
144 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
145 marker: None,
146 context: Some(context),
147 key: None,
148 should_enqueue: false,
149 }));
150 }
151
152 pub fn enqueue_diagnostics_event(&self, key: Option<KeyType>, context: Option<ContextType>) {
153 self.log(OpsStatsEvent::Diagnostics(DiagnosticsEvent {
154 marker: None,
155 context,
156 key,
157 should_enqueue: true,
158 }));
159 }
160
161 pub fn enqueue_console_capture_event(
162 &self,
163 level: String,
164 payload: Vec<String>,
165 timestamp: u64,
166 user: StatsigUserLoggable,
167 stack_trace: Option<String>,
168 ) {
169 self.log(OpsStatsEvent::ConsoleCapture(ConsoleCaptureEvent {
170 level,
171 payload,
172 timestamp,
173 user,
174 stack_trace,
175 }));
176 }
177
178 pub fn subscribe(
179 &self,
180 runtime: Arc<StatsigRuntime>,
181 observer: Weak<dyn OpsStatsEventObserver>,
182 ) {
183 let mut rx = self.sender.subscribe();
184 let shutdown_notify = self.shutdown_notify.clone();
185 let _ = runtime.spawn("opts_stats_listen_for", |rt_shutdown_notify| async move {
186 loop {
187 tokio::select! {
188 event = rx.recv() => {
189 let observer = match observer.upgrade() {
190 Some(observer) => observer,
191 None => break,
192 };
193
194 if let Ok(event) = event {
195 observer.handle_event(event).await;
196 }
197 }
198 () = rt_shutdown_notify.notified() => {
199 break;
200 }
201 () = shutdown_notify.notified() => {
202 break;
203 }
204 }
205 }
206 });
207 }
208}
209
210impl Drop for OpsStatsForInstance {
211 fn drop(&mut self) {
212 self.shutdown_notify.notify_waiters();
213 }
214}
215
216#[async_trait]
217pub trait OpsStatsEventObserver: Send + Sync + 'static {
218 async fn handle_event(&self, event: OpsStatsEvent);
219}