1use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
2use crate::global_configs::GlobalConfigs;
3use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
4use crate::networking::ResponseData;
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::sdk_event_emitter::{SdkEvent, SdkEventEmitter};
9use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
10use crate::utils::maybe_trim_malloc;
11use crate::{
12 log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
13 SpecsUpdateListener, StatsigErr, StatsigRuntime,
14};
15use chrono::Utc;
16use parking_lot::RwLock;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20
/// Mutable state held behind the `RwLock` in [`SpecStore::data`].
pub struct SpecStoreData {
    /// Source recorded for the currently active `values`.
    pub source: SpecsSource,
    /// `source_api` copied from the last applied `SpecsUpdate`, if it carried one.
    pub source_api: Option<String>,
    /// `Utc::now().timestamp_millis()` captured when `values` was last swapped in;
    /// `None` until the first successful update.
    pub time_received_at: Option<u64>,
    /// The specs response currently in effect.
    pub values: SpecsResponseFull,
    /// Staging buffer the next update is deserialized into. `set_values` takes it out
    /// of the store while parsing, and after a successful swap it holds the previous
    /// `values`.
    pub next_values: Option<SpecsResponseFull>,
    /// ID lists keyed by list name.
    pub id_lists: HashMap<String, IdList>,
}
29
/// Tag passed as the first argument to the logging macros in this file; expands to `"SpecStore"`.
const TAG: &str = stringify!(SpecStore);
31
/// Holds the current specs and ID lists and applies incoming updates to them.
///
/// All access to the shared state goes through timed lock acquisitions
/// (`try_read_for` / `try_write_for` with a five second timeout).
pub struct SpecStore {
    /// Shared, lock-protected store state.
    pub data: Arc<RwLock<SpecStoreData>>,

    /// Hashed SDK key; used to build the key under which specs are written to `data_store`.
    hashed_sdk_key: String,
    /// Optional external store that network-sourced specs payloads are written to.
    data_store: Option<Arc<dyn DataStoreTrait>>,
    /// Runtime used to spawn the asynchronous data-store write.
    statsig_runtime: Arc<StatsigRuntime>,
    /// Sink for observability events and parse-error reports.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Per-SDK-key global configs refreshed from each applied specs response.
    global_configs: Arc<GlobalConfigs>,
    /// Emitter notified with `SdkEvent::SpecsUpdated` after each swap.
    event_emitter: Arc<SdkEventEmitter>,
}
42
43impl SpecStore {
44 #[must_use]
45 pub fn new(
46 sdk_key: &str,
47 hashed_sdk_key: String,
48 statsig_runtime: Arc<StatsigRuntime>,
49 event_emitter: Arc<SdkEventEmitter>,
50 data_store: Option<Arc<dyn DataStoreTrait>>,
51 ) -> SpecStore {
52 SpecStore {
53 hashed_sdk_key,
54 data: Arc::new(RwLock::new(SpecStoreData {
55 values: SpecsResponseFull::default(),
56 next_values: Some(SpecsResponseFull::default()),
57 time_received_at: None,
58 source: SpecsSource::Uninitialized,
59 source_api: None,
60 id_lists: HashMap::new(),
61 })),
62 event_emitter,
63 data_store,
64 statsig_runtime,
65 ops_stats: OPS_STATS.get_for_instance(sdk_key),
66 global_configs: GlobalConfigs::get_instance(sdk_key),
67 }
68 }
69
70 pub fn set_source(&self, source: SpecsSource) {
71 match self.data.try_write_for(Duration::from_secs(5)) {
72 Some(mut data) => {
73 data.source = source;
74 log_d!(TAG, "Source Changed ({:?})", data.source);
75 }
76 None => {
77 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
78 }
79 }
80 }
81
82 pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
83 let data = match self.data.try_read_for(Duration::from_secs(5)) {
84 Some(data) => data,
85 None => {
86 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
87 return None;
88 }
89 };
90 let json = serde_json::to_string(&data.values).ok()?;
91 serde_json::from_str::<SpecsResponseFull>(&json).ok()
92 }
93
94 pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
95 let mut specs_update = specs_update;
96 let mut next_values = match self.data.try_write_for(Duration::from_secs(5)) {
97 Some(mut data) => data.next_values.take().unwrap_or_default(),
98 None => {
99 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
100 return Err(StatsigErr::LockFailure(
101 "Failed to acquire write lock: Failed to lock data".to_string(),
102 ));
103 }
104 };
105
106 match self.parse_specs_response(&mut specs_update, &mut next_values) {
107 Ok(ParseResult::HasUpdates) => (),
108 Ok(ParseResult::NoUpdates) => {
109 self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
110 return Ok(());
111 }
112 Err(e) => {
113 return Err(e);
114 }
115 };
116
117 if self.are_current_values_newer(&next_values) {
118 return Ok(());
119 }
120
121 self.try_update_global_configs(&next_values);
122
123 let now = Utc::now().timestamp_millis() as u64;
124 let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
125 next_values,
126 &specs_update,
127 now,
128 specs_update.source_api.clone(),
129 )?;
130
131 self.try_update_data_store(&specs_update.source, specs_update.data, now);
132 self.ops_stats_log_config_propagation_diff(
133 curr_values_time,
134 prev_lcut,
135 &specs_update.source,
136 &prev_source,
137 specs_update.source_api,
138 );
139
140 maybe_trim_malloc();
144
145 Ok(())
146 }
147}
148
149impl SpecStore {
152 fn parse_specs_response(
153 &self,
154 values: &mut SpecsUpdate,
155 next_values: &mut SpecsResponseFull,
156 ) -> Result<ParseResult, StatsigErr> {
157 let parse_result = values.data.deserialize_in_place(next_values);
158
159 if parse_result.is_ok() && next_values.has_updates {
160 return Ok(ParseResult::HasUpdates);
161 }
162
163 let no_updates_result = values.data.deserialize_into::<SpecsResponseNoUpdates>();
164 if let Ok(result) = no_updates_result {
165 if !result.has_updates {
166 return Ok(ParseResult::NoUpdates);
167 }
168 }
169
170 let error = parse_result.err().unwrap_or_else(|| {
171 StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string())
172 });
173
174 log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
175 Err(error)
176 }
177
178 fn swap_current_with_next(
179 &self,
180 next_values: SpecsResponseFull,
181 specs_update: &SpecsUpdate,
182 now: u64,
183 source_api: Option<String>,
184 ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
185 match self.data.try_write_for(Duration::from_secs(5)) {
186 Some(mut data) => {
187 let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
188 let prev_lcut = data.values.time;
189
190 let mut temp = next_values;
191 std::mem::swap(&mut data.values, &mut temp);
192 data.next_values = Some(temp);
193
194 data.time_received_at = Some(now);
195 data.source_api = source_api;
196
197 self.emit_specs_updated_sdk_event(&data.source, &data.source_api, &data.values);
198
199 Ok((prev_source, prev_lcut, data.values.time))
200 }
201 None => {
202 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
203 Err(StatsigErr::LockFailure(
204 "Failed to acquire write lock: Failed to lock data".to_string(),
205 ))
206 }
207 }
208 }
209
210 fn emit_specs_updated_sdk_event(
211 &self,
212 source: &SpecsSource,
213 source_api: &Option<String>,
214 values: &SpecsResponseFull,
215 ) {
216 self.event_emitter.emit(SdkEvent::SpecsUpdated {
217 source,
218 source_api,
219 values,
220 });
221 }
222
223 fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
224 log_d!(TAG, "No Updates");
225 self.ops_stats.log(ObservabilityEvent::new_event(
226 MetricType::Increment,
227 "config_no_update".to_string(),
228 1.0,
229 Some(HashMap::from([
230 ("source".to_string(), source.to_string()),
231 (
232 "spec_source_api".to_string(),
233 source_api.unwrap_or_default(),
234 ),
235 ])),
236 ));
237 }
238
239 fn ops_stats_log_config_propagation_diff(
240 &self,
241 lcut: u64,
242 prev_lcut: u64,
243 source: &SpecsSource,
244 prev_source: &SpecsSource,
245 source_api: Option<String>,
246 ) {
247 let delay = Utc::now().timestamp_millis() as u64 - lcut;
248 log_d!(TAG, "Updated ({:?})", source);
249
250 if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
251 return;
252 }
253
254 self.ops_stats.log(ObservabilityEvent::new_event(
255 MetricType::Dist,
256 "config_propagation_diff".to_string(),
257 delay as f64,
258 Some(HashMap::from([
259 ("source".to_string(), source.to_string()),
260 ("lcut".to_string(), lcut.to_string()),
261 ("prev_lcut".to_string(), prev_lcut.to_string()),
262 (
263 "spec_source_api".to_string(),
264 source_api.unwrap_or_default(),
265 ),
266 ])),
267 ));
268 }
269
270 fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
271 if let Some(diagnostics) = &dcs.diagnostics {
272 self.global_configs
273 .set_diagnostics_sampling_rates(diagnostics.clone());
274 }
275
276 if let Some(sdk_configs) = &dcs.sdk_configs {
277 self.global_configs.set_sdk_configs(sdk_configs.clone());
278 }
279
280 if let Some(sdk_flags) = &dcs.sdk_flags {
281 self.global_configs.set_sdk_flags(sdk_flags.clone());
282 }
283 }
284
285 fn try_update_data_store(&self, source: &SpecsSource, mut data: ResponseData, now: u64) {
286 if source != &SpecsSource::Network {
287 return;
288 }
289
290 let data_store = match &self.data_store {
291 Some(data_store) => data_store.clone(),
292 None => return,
293 };
294
295 let hashed_key = self.hashed_sdk_key.clone();
296
297 let spawn_result = self.statsig_runtime.spawn(
298 "spec_store_update_data_store",
299 move |_shutdown_notif| async move {
300 let data_string = match data.read_to_string() {
301 Ok(s) => s,
302 Err(e) => {
303 log_e!(TAG, "Failed to convert data to string: {}", e);
304 return;
305 }
306 };
307
308 let _ = data_store
309 .set(
310 &get_data_adapter_dcs_key(&hashed_key),
311 &data_string,
312 Some(now),
313 )
314 .await;
315 },
316 );
317
318 if let Err(e) = spawn_result {
319 log_e!(
320 TAG,
321 "Failed to spawn spec store update data store task: {e}"
322 );
323 }
324 }
325
326 fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
327 let data = match self.data.try_read_for(Duration::from_secs(5)) {
328 Some(data) => data,
329 None => {
330 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
331 return false;
332 }
333 };
334
335 let curr_values = &data.values;
336 let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
337 let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
338
339 let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
340 let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
341
342 if cached_time_is_newer || checksums_match {
343 log_d!(
344 TAG,
345 "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
346 next_values.time,
347 new_checksum,
348 curr_values.time,
349 curr_checksum,
350 );
351 return true;
352 }
353
354 false
355 }
356}
357
358impl SpecsUpdateListener for SpecStore {
361 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
362 self.set_values(update)
363 }
364
365 fn get_current_specs_info(&self) -> SpecsInfo {
366 match self.data.try_read_for(Duration::from_secs(5)) {
367 Some(data) => SpecsInfo {
368 lcut: Some(data.values.time),
369 checksum: data.values.checksum.clone(),
370 source: data.source.clone(),
371 source_api: data.source_api.clone(),
372 },
373 None => {
374 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
375 SpecsInfo {
376 lcut: None,
377 checksum: None,
378 source: SpecsSource::Error,
379 source_api: None,
380 }
381 }
382 }
383 }
384}
385
386impl IdListsUpdateListener for SpecStore {
389 fn get_current_id_list_metadata(
390 &self,
391 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
392 match self.data.try_read_for(Duration::from_secs(5)) {
393 Some(data) => data
394 .id_lists
395 .iter()
396 .map(|(key, list)| (key.clone(), list.metadata.clone()))
397 .collect(),
398 None => {
399 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
400 HashMap::new()
401 }
402 }
403 }
404
405 fn did_receive_id_list_updates(
406 &self,
407 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
408 ) {
409 let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
410 Some(data) => data,
411 None => {
412 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
413 return;
414 }
415 };
416
417 data.id_lists.retain(|name, _| updates.contains_key(name));
419
420 for (list_name, update) in updates {
421 if let Some(entry) = data.id_lists.get_mut(&list_name) {
422 entry.apply_update(update);
424 } else {
425 let mut list = IdList::new(update.new_metadata.clone());
427 list.apply_update(update);
428 data.id_lists.insert(list_name, list);
429 }
430 }
431 }
432}
433
/// Outcome of a successful `SpecStore::parse_specs_response` call.
enum ParseResult {
    /// The payload deserialized as a full specs response with `has_updates` set.
    HasUpdates,
    /// The payload deserialized as `SpecsResponseNoUpdates` with `has_updates` unset.
    NoUpdates,
}