1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::data_store_interface::{DataStoreCacheKeys, DataStoreTrait};
7use crate::evaluation::evaluator::SpecType;
8use crate::global_configs::GlobalConfigs;
9use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
10use crate::interned_string::InternedString;
11use crate::networking::ResponseData;
12use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
13use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
14use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
15use crate::sdk_event_emitter::{SdkEvent, SdkEventEmitter};
16use crate::specs_response::proto_specs::deserialize_protobuf;
17use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
18use crate::utils::try_release_unused_heap_memory;
19use crate::{
20 log_d, log_e, log_error_to_statsig_and_console, log_w, read_lock_or_else, write_lock_or_else,
21 SpecsFormat, SpecsInfo, SpecsSource, SpecsUpdate, SpecsUpdateListener, StatsigErr,
22 StatsigOptions, StatsigRuntime,
23};
24
/// Mutable state that a [`SpecStore`] keeps behind its `RwLock`.
pub struct SpecStoreData {
    /// Where the current `values` came from (`SpecsSource::Uninitialized` on a new store).
    pub source: SpecsSource,
    /// `source_api` copied from the most recently applied `SpecsUpdate`, if it had one.
    pub source_api: Option<String>,
    /// Wall-clock time, in milliseconds since the Unix epoch, at which `values` was last
    /// replaced. `None` until the first update is applied.
    pub time_received_at: Option<u64>,
    /// The specs payload currently in effect.
    pub values: SpecsResponseFull,
    /// ID lists keyed by list name.
    pub id_lists: HashMap<String, IdList>,
}
32
/// Tag passed to the logging macros used throughout this file.
const TAG: &str = "SpecStore";
34
/// Holds the current specs and ID lists and applies incoming updates to them.
pub struct SpecStore {
    /// Shared, lock-protected state (specs, source info, and ID lists).
    pub data: Arc<RwLock<SpecStoreData>>,

    /// Data store cache keys derived from the `data_store_key` given to [`SpecStore::new`].
    data_store_keys: DataStoreCacheKeys,
    /// Optional external data store that network-sourced specs are written to.
    data_store: Option<Arc<dyn DataStoreTrait>>,
    /// Runtime used to spawn the background data store write.
    statsig_runtime: Arc<StatsigRuntime>,
    /// Sink for error reports and observability metrics.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Receives diagnostics sampling rates, SDK configs, and SDK flags from applied specs.
    global_configs: Arc<GlobalConfigs>,
    /// Emits `SdkEvent::SpecsUpdated` after new specs have been applied.
    event_emitter: Arc<SdkEventEmitter>,
}
45
46impl SpecStore {
47 #[must_use]
48 pub fn new(
49 sdk_key: &str,
50 data_store_key: String,
51 statsig_runtime: Arc<StatsigRuntime>,
52 event_emitter: Arc<SdkEventEmitter>,
53 options: Option<&StatsigOptions>,
54 ) -> SpecStore {
55 let mut data_store = None;
56 if let Some(options) = options {
57 data_store = options.data_store.clone();
58 }
59
60 SpecStore {
61 data_store_keys: DataStoreCacheKeys::from_selected_key(&data_store_key),
62 data: Arc::new(RwLock::new(SpecStoreData {
63 values: SpecsResponseFull::default(),
64 time_received_at: None,
65 source: SpecsSource::Uninitialized,
66 source_api: None,
67 id_lists: HashMap::new(),
68 })),
69 event_emitter,
70 data_store,
71 statsig_runtime,
72 ops_stats: OPS_STATS.get_for_instance(sdk_key),
73 global_configs: GlobalConfigs::get_instance(sdk_key),
74 }
75 }
76
77 pub fn set_source(&self, source: SpecsSource) {
78 let mut locked_data = write_lock_or_else!(self.data, {
79 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
80 return;
81 });
82
83 locked_data.source = source;
84 log_d!(TAG, "Source Changed ({:?})", locked_data.source);
85 }
86
87 pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
88 let data = read_lock_or_else!(self.data, {
89 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
90 return None;
91 });
92
93 let json = serde_json::to_string(&data.values).ok()?;
94 serde_json::from_str::<SpecsResponseFull>(&json).ok()
95 }
96
97 pub fn get_fields_used_for_entity(
98 &self,
99 entity_name: &str,
100 entity_type: SpecType,
101 ) -> Vec<String> {
102 let data = read_lock_or_else!(self.data, {
103 log_error_to_statsig_and_console!(
104 &self.ops_stats,
105 TAG,
106 StatsigErr::LockFailure(
107 "Failed to acquire read lock for spec store data".to_string()
108 )
109 );
110 return vec![];
111 });
112
113 let entities = match entity_type {
114 SpecType::Gate => &data.values.feature_gates,
115 SpecType::DynamicConfig | SpecType::Experiment => &data.values.dynamic_configs,
116 SpecType::Layer => &data.values.layer_configs,
117 SpecType::ParameterStore => return vec![],
118 };
119
120 let entity_name = InternedString::from_str_ref(entity_name);
121 let entity = entities.get(&entity_name);
122
123 match entity {
124 Some(entity) => match &entity.as_spec_ref().fields_used {
125 Some(fields) => fields.iter().map(|f| f.unperformant_to_string()).collect(),
126 None => vec![],
127 },
128 None => vec![],
129 }
130 }
131
132 pub fn unperformant_keys_entity_filter(
133 &self,
134 top_level_key: &str,
135 entity_type: &str,
136 ) -> Vec<String> {
137 let data = read_lock_or_else!(self.data, {
138 log_error_to_statsig_and_console!(
139 &self.ops_stats,
140 TAG,
141 StatsigErr::LockFailure(
142 "Failed to acquire read lock for spec store data".to_string()
143 )
144 );
145 return vec![];
146 });
147
148 if top_level_key == "param_stores" {
149 match &data.values.param_stores {
150 Some(param_stores) => {
151 return param_stores
152 .keys()
153 .map(|k| k.unperformant_to_string())
154 .collect()
155 }
156 None => return vec![],
157 }
158 }
159
160 let values = match top_level_key {
161 "feature_gates" => &data.values.feature_gates,
162 "dynamic_configs" => &data.values.dynamic_configs,
163 "layer_configs" => &data.values.layer_configs,
164 _ => {
165 log_e!(TAG, "Invalid top level key: {}", top_level_key);
166 return vec![];
167 }
168 };
169
170 if entity_type == "*" {
171 return values.keys().map(|k| k.unperformant_to_string()).collect();
172 }
173
174 values
175 .iter()
176 .filter(|(_, v)| v.as_spec_ref().entity == entity_type)
177 .map(|(k, _)| k.unperformant_to_string())
178 .collect()
179 }
180
181 pub fn set_values(&self, mut specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
182 let prep_result = self.specs_update_prep(&mut specs_update).map_err(|e| {
190 log_error_to_statsig_and_console!(self.ops_stats, TAG, e);
191 e
192 })?;
193
194 let (next_values, response_format) = match prep_result {
195 PrepResult::HasUpdates(next_values, response_format) => (next_values, response_format),
196 PrepResult::CurrentValuesNewer => return Ok(()),
197 PrepResult::NoUpdates => {
198 self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
199 return Ok(());
200 }
201 };
202
203 let apply_result = self
206 .specs_update_apply(next_values, &specs_update)
207 .map_err(|e| {
208 log_error_to_statsig_and_console!(self.ops_stats, TAG, e);
209 e
210 })?;
211
212 try_release_unused_heap_memory();
213
214 self.specs_update_notify(response_format, specs_update, apply_result)
217 .map_err(|e| {
218 log_error_to_statsig_and_console!(self.ops_stats, TAG, e);
219 e
220 })?;
221
222 Ok(())
223 }
224}
225
/// Outcome of inspecting an incoming `SpecsUpdate` before anything is written to the store.
enum PrepResult {
    /// The payload parsed as a full response with `has_updates` set; carries the parsed
    /// values and the format they were decoded from.
    HasUpdates(Box<SpecsResponseFull>, SpecsFormat),
    /// The payload parsed as a `SpecsResponseNoUpdates` whose `has_updates` is false.
    NoUpdates,
    /// The store already holds values with a later `time` or the same non-empty checksum.
    CurrentValuesNewer,
}
233
/// What `specs_update_apply` observed while swapping in the new values.
struct ApplyResult {
    /// Source recorded in the store before the swap.
    prev_source: SpecsSource,
    /// `time` of the specs that were replaced.
    prev_lcut: u64,
    /// Wall-clock time of the swap, in milliseconds since the Unix epoch.
    time_received_at: u64,
}
239
240impl SpecStore {
241 fn specs_update_prep(&self, specs_update: &mut SpecsUpdate) -> Result<PrepResult, StatsigErr> {
242 let response_format = self.get_spec_response_format(specs_update);
243
244 let read_data = read_lock_or_else!(self.data, {
245 let msg = "Failed to acquire read lock for extract_response_from_update";
246 log_e!(TAG, "{}", msg);
247 return Err(StatsigErr::LockFailure(msg.to_string()));
248 });
249
250 let current_values = &read_data.values;
251
252 let first_deserialize_result =
254 self.deserialize_specs_data(current_values, &response_format, &mut specs_update.data);
255
256 let first_deserialize_error = match first_deserialize_result {
257 Ok(next_values) => {
258 if self.are_current_values_newer(&read_data, &next_values) {
259 return Ok(PrepResult::CurrentValuesNewer);
260 }
261
262 if next_values.has_updates {
263 return Ok(PrepResult::HasUpdates(
264 Box::new(next_values),
265 response_format,
266 ));
267 }
268
269 None
270 }
271 Err(e) => Some(e),
272 };
273
274 let second_deserialize_result = specs_update
276 .data
277 .deserialize_into::<SpecsResponseNoUpdates>();
278
279 let second_deserialize_error = match second_deserialize_result {
280 Ok(result) => {
281 if !result.has_updates {
282 return Ok(PrepResult::NoUpdates);
283 }
284
285 None
286 }
287 Err(e) => Some(e),
288 };
289
290 let error = first_deserialize_error
291 .or(second_deserialize_error)
292 .unwrap_or_else(|| {
293 StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string())
294 });
295
296 Err(error)
297 }
298
299 fn specs_update_apply(
300 &self,
301 next_values: Box<SpecsResponseFull>,
302 specs_update: &SpecsUpdate,
303 ) -> Result<ApplyResult, StatsigErr> {
304 self.try_update_global_configs(&next_values);
306
307 let mut data = write_lock_or_else!(self.data, {
308 let msg = "Failed to acquire write lock for swap_current_with_next";
309 log_e!(TAG, "{}", msg);
310 return Err(StatsigErr::LockFailure(msg.to_string()));
311 });
312
313 let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
314 let prev_lcut = data.values.time;
315 let time_received_at = Utc::now().timestamp_millis() as u64;
316
317 data.values = *next_values;
318 data.time_received_at = Some(time_received_at);
319 data.source_api = specs_update.source_api.clone();
320
321 Ok(ApplyResult {
322 prev_source,
323 prev_lcut,
324 time_received_at,
325 })
326 }
327
328 fn specs_update_notify(
329 &self,
330 response_format: SpecsFormat,
331 specs_update: SpecsUpdate,
332 apply_result: ApplyResult,
333 ) -> Result<(), StatsigErr> {
334 let SpecsUpdate {
335 data,
336 source,
337 source_api,
338 ..
339 } = specs_update;
340
341 let current_lcut = {
342 let read_lock = read_lock_or_else!(self.data, {
343 let msg = "Failed to acquire read lock for set_values";
344 log_e!(TAG, "{}", msg);
345 return Err(StatsigErr::LockFailure(msg.to_string()));
346 });
347
348 self.emit_specs_updated_sdk_event(
349 &read_lock.source,
350 &read_lock.source_api,
351 &read_lock.values,
352 );
353
354 read_lock.values.time
355 };
356
357 self.try_update_data_store(
359 &source,
360 data,
361 apply_result.time_received_at,
362 matches!(response_format, SpecsFormat::Protobuf),
363 );
364
365 self.ops_stats_log_config_propagation_diff(
366 current_lcut,
367 apply_result.prev_lcut,
368 &source,
369 &apply_result.prev_source,
370 source_api,
371 response_format,
372 );
373
374 Ok(())
375 }
376
377 fn deserialize_specs_data(
378 &self,
379 current_values: &SpecsResponseFull,
380 response_format: &SpecsFormat,
381 response_data: &mut ResponseData,
382 ) -> Result<SpecsResponseFull, StatsigErr> {
383 let mut next_values = SpecsResponseFull::default();
384
385 let parse_result = match response_format {
386 SpecsFormat::Protobuf => deserialize_protobuf(
387 &self.ops_stats,
388 current_values,
389 &mut next_values,
390 response_data,
391 ),
392 SpecsFormat::Json => response_data.deserialize_in_place(&mut next_values),
393 };
394
395 match parse_result {
396 Ok(()) => Ok(next_values),
397 Err(e) => Err(e),
398 }
399 }
400
401 fn emit_specs_updated_sdk_event(
402 &self,
403 source: &SpecsSource,
404 source_api: &Option<String>,
405 values: &SpecsResponseFull,
406 ) {
407 self.event_emitter.emit(SdkEvent::SpecsUpdated {
408 source,
409 source_api,
410 values,
411 });
412 }
413
414 fn get_spec_response_format(&self, update: &SpecsUpdate) -> SpecsFormat {
415 let content_type = update.data.get_header_ref("content-type");
416 if content_type.map(|s| s.as_str().contains("application/octet-stream")) != Some(true) {
417 return SpecsFormat::Json;
418 }
419
420 let content_encoding = update.data.get_header_ref("content-encoding");
421 if content_encoding.map(|s| s.as_str().contains("statsig-br")) != Some(true) {
422 return SpecsFormat::Json;
423 }
424
425 SpecsFormat::Protobuf
426 }
427
428 fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
429 if let Some(diagnostics) = &dcs.diagnostics {
430 self.global_configs
431 .set_diagnostics_sampling_rates(diagnostics.clone());
432 }
433
434 if let Some(sdk_configs) = &dcs.sdk_configs {
435 self.global_configs.set_sdk_configs(sdk_configs.clone());
436 }
437
438 if let Some(sdk_flags) = &dcs.sdk_flags {
439 self.global_configs.set_sdk_flags(sdk_flags.clone());
440 }
441 }
442
443 fn try_update_data_store(
444 &self,
445 source: &SpecsSource,
446 mut data: ResponseData,
447 now: u64,
448 is_protobuf: bool,
449 ) {
450 if source != &SpecsSource::Network {
451 return;
452 }
453
454 if data.get_header_ref("x-deltas-used").is_some() {
455 log_d!(
456 TAG,
457 "Skipping data store write for delta response identified by x-deltas-used header"
458 );
459 return;
460 }
461
462 let data_store = match &self.data_store {
463 Some(data_store) => data_store.clone(),
464 None => return,
465 };
466
467 let data_store_key = if is_protobuf {
468 self.data_store_keys.statsig_br.clone()
469 } else {
470 self.data_store_keys.plain_text.clone()
471 };
472
473 let spawn_result = self.statsig_runtime.spawn(
474 "spec_store_update_data_store",
475 move |_shutdown_notif| async move {
476 let data_bytes = match data.read_to_bytes() {
477 Ok(bytes) => bytes,
478 Err(e) => {
479 log_e!(TAG, "Failed to read data as bytes: {}", e);
480 return;
481 }
482 };
483
484 write_specs_to_data_store(data_store, data_store_key, data_bytes, now, is_protobuf)
485 .await;
486 },
487 );
488
489 if let Err(e) = spawn_result {
490 log_e!(
491 TAG,
492 "Failed to spawn spec store update data store task: {e}"
493 );
494 }
495 }
496
497 fn are_current_values_newer(
498 &self,
499 data: &SpecStoreData,
500 next_values: &SpecsResponseFull,
501 ) -> bool {
502 let curr_values = &data.values;
503 let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
504 let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
505
506 let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
507 let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
508
509 if cached_time_is_newer || checksums_match {
510 log_d!(
511 TAG,
512 "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
513 next_values.time,
514 new_checksum,
515 curr_values.time,
516 curr_checksum,
517 );
518 return true;
519 }
520
521 false
522 }
523}
524
525async fn write_specs_to_data_store(
526 data_store: Arc<dyn DataStoreTrait>,
527 data_store_key: String,
528 data_bytes: Vec<u8>,
529 now: u64,
530 is_protobuf: bool,
531) {
532 match data_store
533 .set_bytes(&data_store_key, &data_bytes, Some(now))
534 .await
535 {
536 Ok(()) => return,
537 Err(e @ StatsigErr::BytesNotImplemented) if is_protobuf => {
538 log_w!(
539 TAG,
540 "Failed to write protobuf specs to data store as bytes. Protobuf specs cannot fall back to string writes: {}",
541 e
542 );
543 return;
544 }
545 Err(e @ StatsigErr::BytesNotImplemented) => {
546 log_w!(
547 TAG,
548 "Data store bytes write is not implemented. Falling back to string write: {}",
549 e
550 );
551 }
552 Err(e) => {
553 log_w!(TAG, "Failed to write specs to data store as bytes: {}", e);
554 return;
555 }
556 }
557
558 let data_string = match String::from_utf8(data_bytes) {
559 Ok(s) => s,
560 Err(e) => {
561 log_w!(
562 TAG,
563 "Skipping data store string write because payload is not valid UTF-8: {}",
564 e
565 );
566 return;
567 }
568 };
569
570 if let Err(e) = data_store
571 .set(&data_store_key, &data_string, Some(now))
572 .await
573 {
574 log_w!(TAG, "Failed to write specs to data store as string: {}", e);
575 }
576}
577
578impl SpecStore {
581 fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
582 log_d!(TAG, "No Updates");
583 self.ops_stats.log(ObservabilityEvent::new_event(
584 MetricType::Increment,
585 "config_no_update".to_string(),
586 1.0,
587 Some(HashMap::from([
588 ("source".to_string(), source.to_string()),
589 ("source_api".to_string(), source_api.unwrap_or_default()),
590 ])),
591 ));
592 }
593
594 #[allow(clippy::too_many_arguments)]
595 fn ops_stats_log_config_propagation_diff(
596 &self,
597 lcut: u64,
598 prev_lcut: u64,
599 source: &SpecsSource,
600 prev_source: &SpecsSource,
601 source_api: Option<String>,
602 response_format: SpecsFormat,
603 ) {
604 let delay = (Utc::now().timestamp_millis() as u64).saturating_sub(lcut);
605 log_d!(TAG, "Updated ({:?})", source);
606
607 if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
608 return;
609 }
610
611 self.ops_stats.log(ObservabilityEvent::new_event(
612 MetricType::Dist,
613 "config_propagation_diff".to_string(),
614 delay as f64,
615 Some(HashMap::from([
616 ("source".to_string(), source.to_string()),
617 ("lcut".to_string(), lcut.to_string()),
618 ("prev_lcut".to_string(), prev_lcut.to_string()),
619 ("source_api".to_string(), source_api.unwrap_or_default()),
620 (
621 "response_format".to_string(),
622 Into::<&str>::into(&response_format).to_string(),
623 ),
624 ])),
625 ));
626 }
627}
628
629impl SpecsUpdateListener for SpecStore {
632 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
633 self.set_values(update)
634 }
635
636 fn get_current_specs_info(&self) -> SpecsInfo {
637 let data = read_lock_or_else!(self.data, {
638 log_e!(
639 TAG,
640 "Failed to acquire read lock for get_current_specs_info"
641 );
642 return SpecsInfo {
643 lcut: None,
644 checksum: None,
645 source: SpecsSource::Error,
646 source_api: None,
647 };
648 });
649
650 SpecsInfo {
651 lcut: Some(data.values.time),
652 checksum: data.values.checksum.clone(),
653 source: data.source.clone(),
654 source_api: data.source_api.clone(),
655 }
656 }
657}
658
659impl IdListsUpdateListener for SpecStore {
662 fn get_current_id_list_metadata(
663 &self,
664 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
665 let data = read_lock_or_else!(self.data, {
666 let err = StatsigErr::LockFailure(
667 "Failed to acquire read lock for id list metadata".to_string(),
668 );
669 log_error_to_statsig_and_console!(self.ops_stats, TAG, err);
670 return HashMap::new();
671 });
672
673 data.id_lists
674 .iter()
675 .map(|(key, list)| (key.clone(), list.metadata.clone()))
676 .collect()
677 }
678
679 fn did_receive_id_list_updates(
680 &self,
681 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
682 ) {
683 let mut data = write_lock_or_else!(self.data, {
684 let err = StatsigErr::LockFailure(
685 "Failed to acquire write lock for did_receive_id_list_updates".to_string(),
686 );
687 log_error_to_statsig_and_console!(self.ops_stats, TAG, err);
688
689 return;
690 });
691
692 data.id_lists.retain(|name, _| updates.contains_key(name));
694
695 for (list_name, update) in updates {
696 if let Some(entry) = data.id_lists.get_mut(&list_name) {
697 entry.apply_update(update);
699 } else {
700 let mut list = IdList::new(update.new_metadata.clone());
702 list.apply_update(update);
703 data.id_lists.insert(list_name, list);
704 }
705 }
706 }
707}