1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::data_store_interface::DataStoreTrait;
8use crate::evaluation::evaluator::SpecType;
9use crate::global_configs::GlobalConfigs;
10use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
11use crate::interned_string::InternedString;
12use crate::networking::ResponseData;
13use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
14use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
15use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
16use crate::sdk_event_emitter::{SdkEvent, SdkEventEmitter};
17use crate::specs_response::proto_specs::deserialize_protobuf;
18use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
19use crate::utils::maybe_trim_malloc;
20use crate::{
21 log_d, log_e, log_error_to_statsig_and_console, read_lock_or_else, SpecsFormat, SpecsInfo,
22 SpecsSource, SpecsUpdate, SpecsUpdateListener, StatsigErr, StatsigOptions, StatsigRuntime,
23};
24
25pub struct SpecStoreData {
26 pub source: SpecsSource,
27 pub source_api: Option<String>,
28 pub time_received_at: Option<u64>,
29 pub values: SpecsResponseFull,
30 pub next_values: SpecsResponseFull,
31 pub id_lists: HashMap<String, IdList>,
32}
33
34const TAG: &str = stringify!(SpecStore);
35
36pub struct SpecStore {
37 pub data: Arc<RwLock<SpecStoreData>>,
38
39 data_store_key: String,
40 data_store: Option<Arc<dyn DataStoreTrait>>,
41 statsig_runtime: Arc<StatsigRuntime>,
42 ops_stats: Arc<OpsStatsForInstance>,
43 global_configs: Arc<GlobalConfigs>,
44 event_emitter: Arc<SdkEventEmitter>,
45}
46
47impl SpecStore {
48 #[must_use]
49 pub fn new(
50 sdk_key: &str,
51 data_store_key: String,
52 statsig_runtime: Arc<StatsigRuntime>,
53 event_emitter: Arc<SdkEventEmitter>,
54 options: Option<&StatsigOptions>,
55 ) -> SpecStore {
56 let mut data_store = None;
57 if let Some(options) = options {
58 data_store = options.data_store.clone();
59 }
60
61 SpecStore {
62 data_store_key,
63 data: Arc::new(RwLock::new(SpecStoreData {
64 values: SpecsResponseFull::default(),
65 next_values: SpecsResponseFull::default(),
66 time_received_at: None,
67 source: SpecsSource::Uninitialized,
68 source_api: None,
69 id_lists: HashMap::new(),
70 })),
71 event_emitter,
72 data_store,
73 statsig_runtime,
74 ops_stats: OPS_STATS.get_for_instance(sdk_key),
75 global_configs: GlobalConfigs::get_instance(sdk_key),
76 }
77 }
78
79 pub fn set_source(&self, source: SpecsSource) {
80 match self.data.try_write_for(Duration::from_secs(5)) {
81 Some(mut data) => {
82 data.source = source;
83 log_d!(TAG, "Source Changed ({:?})", data.source);
84 }
85 None => {
86 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
87 }
88 }
89 }
90
91 pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
92 let data = match self.data.try_read_for(Duration::from_secs(5)) {
93 Some(data) => data,
94 None => {
95 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
96 return None;
97 }
98 };
99 let json = serde_json::to_string(&data.values).ok()?;
100 serde_json::from_str::<SpecsResponseFull>(&json).ok()
101 }
102
103 pub fn get_fields_used_for_entity(
104 &self,
105 entity_name: &str,
106 entity_type: SpecType,
107 ) -> Vec<String> {
108 let data = read_lock_or_else!(self.data, {
109 log_error_to_statsig_and_console!(
110 &self.ops_stats,
111 TAG,
112 StatsigErr::LockFailure(
113 "Failed to acquire read lock for spec store data".to_string()
114 )
115 );
116 return vec![];
117 });
118
119 let entities = match entity_type {
120 SpecType::Gate => &data.values.feature_gates,
121 SpecType::DynamicConfig | SpecType::Experiment => &data.values.dynamic_configs,
122 SpecType::Layer => &data.values.layer_configs,
123 SpecType::ParameterStore => return vec![],
124 };
125
126 let entity_name = InternedString::from_str_ref(entity_name);
127 let entity = entities.get(&entity_name);
128
129 match entity {
130 Some(entity) => match &entity.as_spec_ref().fields_used {
131 Some(fields) => fields.iter().map(|f| f.unperformant_to_string()).collect(),
132 None => vec![],
133 },
134 None => vec![],
135 }
136 }
137
138 pub fn unperformant_keys_entity_filter(
139 &self,
140 top_level_key: &str,
141 entity_type: &str,
142 ) -> Vec<String> {
143 let data = read_lock_or_else!(self.data, {
144 log_error_to_statsig_and_console!(
145 &self.ops_stats,
146 TAG,
147 StatsigErr::LockFailure(
148 "Failed to acquire read lock for spec store data".to_string()
149 )
150 );
151 return vec![];
152 });
153
154 if top_level_key == "param_stores" {
155 match &data.values.param_stores {
156 Some(param_stores) => {
157 return param_stores
158 .keys()
159 .map(|k| k.unperformant_to_string())
160 .collect()
161 }
162 None => return vec![],
163 }
164 }
165
166 let values = match top_level_key {
167 "feature_gates" => &data.values.feature_gates,
168 "dynamic_configs" => &data.values.dynamic_configs,
169 "layer_configs" => &data.values.layer_configs,
170 _ => {
171 log_e!(TAG, "Invalid top level key: {}", top_level_key);
172 return vec![];
173 }
174 };
175
176 if entity_type == "*" {
177 return values.keys().map(|k| k.unperformant_to_string()).collect();
178 }
179
180 values
181 .iter()
182 .filter(|(_, v)| v.as_spec_ref().entity == entity_type)
183 .map(|(k, _)| k.unperformant_to_string())
184 .collect()
185 }
186
187 pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
188 let mut specs_update = specs_update;
189 let mut locked_data = match self.data.try_write_for(Duration::from_secs(5)) {
190 Some(data) => data,
191 None => {
192 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
193 return Err(StatsigErr::LockFailure(
194 "Failed to acquire write lock: Failed to lock data".to_string(),
195 ));
196 }
197 };
198 let response_format = self.get_spec_response_format(&specs_update);
199
200 match self.parse_specs_response(&mut specs_update, &mut locked_data, &response_format) {
201 Ok(ParseResult::HasUpdates) => (),
202 Ok(ParseResult::NoUpdates) => {
203 self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
204 return Ok(());
205 }
206 Err(e) => {
207 return Err(e);
208 }
209 };
210
211 if self.are_current_values_newer(&locked_data) {
212 return Ok(());
213 }
214
215 self.try_update_global_configs(&locked_data.next_values);
216
217 let now = Utc::now().timestamp_millis() as u64;
218 let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
219 &mut locked_data,
220 &specs_update,
221 now,
222 specs_update.source_api.clone(),
223 )?;
224
225 if let SpecsFormat::Json = response_format {
226 self.try_update_data_store(&specs_update.source, specs_update.data, now);
228 }
229
230 self.ops_stats_log_config_propagation_diff(
231 curr_values_time,
232 prev_lcut,
233 &specs_update.source,
234 &prev_source,
235 specs_update.source_api,
236 response_format,
237 );
238
239 maybe_trim_malloc();
243
244 Ok(())
245 }
246}
247
248impl SpecStore {
251 fn parse_specs_response(
252 &self,
253 values: &mut SpecsUpdate,
254 spec_store_data: &mut SpecStoreData,
255 response_format: &SpecsFormat,
256 ) -> Result<ParseResult, StatsigErr> {
257 spec_store_data.next_values.reset();
258
259 let parse_result = match response_format {
260 SpecsFormat::Protobuf => deserialize_protobuf(
261 &self.ops_stats,
262 &spec_store_data.values,
263 &mut spec_store_data.next_values,
264 &mut values.data,
265 ),
266 SpecsFormat::Json => values
267 .data
268 .deserialize_in_place(&mut spec_store_data.next_values),
269 };
270
271 if parse_result.is_ok() && spec_store_data.next_values.has_updates {
272 return Ok(ParseResult::HasUpdates);
273 }
274
275 let no_updates_result = values.data.deserialize_into::<SpecsResponseNoUpdates>();
276 if let Ok(result) = no_updates_result {
277 if !result.has_updates {
278 return Ok(ParseResult::NoUpdates);
279 }
280 }
281
282 let error = parse_result.err().unwrap_or_else(|| {
283 StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string())
284 });
285
286 log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
287 Err(error)
288 }
289
290 fn swap_current_with_next(
291 &self,
292 data: &mut SpecStoreData,
293 specs_update: &SpecsUpdate,
294 now: u64,
295 source_api: Option<String>,
296 ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
297 let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
298 let prev_lcut = data.values.time;
299
300 std::mem::swap(&mut data.values, &mut data.next_values);
301
302 data.time_received_at = Some(now);
303 data.source_api = source_api;
304 data.next_values.reset();
305
306 self.emit_specs_updated_sdk_event(&data.source, &data.source_api, &data.values);
307
308 Ok((prev_source, prev_lcut, data.values.time))
309 }
310
311 fn emit_specs_updated_sdk_event(
312 &self,
313 source: &SpecsSource,
314 source_api: &Option<String>,
315 values: &SpecsResponseFull,
316 ) {
317 self.event_emitter.emit(SdkEvent::SpecsUpdated {
318 source,
319 source_api,
320 values,
321 });
322 }
323
324 fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
325 log_d!(TAG, "No Updates");
326 self.ops_stats.log(ObservabilityEvent::new_event(
327 MetricType::Increment,
328 "config_no_update".to_string(),
329 1.0,
330 Some(HashMap::from([
331 ("source".to_string(), source.to_string()),
332 (
333 "spec_source_api".to_string(),
334 source_api.unwrap_or_default(),
335 ),
336 ])),
337 ));
338 }
339
340 #[allow(clippy::too_many_arguments)]
341 fn ops_stats_log_config_propagation_diff(
342 &self,
343 lcut: u64,
344 prev_lcut: u64,
345 source: &SpecsSource,
346 prev_source: &SpecsSource,
347 source_api: Option<String>,
348 response_format: SpecsFormat,
349 ) {
350 let delay = (Utc::now().timestamp_millis() as u64).saturating_sub(lcut);
351 log_d!(TAG, "Updated ({:?})", source);
352
353 if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
354 return;
355 }
356
357 self.ops_stats.log(ObservabilityEvent::new_event(
358 MetricType::Dist,
359 "config_propagation_diff".to_string(),
360 delay as f64,
361 Some(HashMap::from([
362 ("source".to_string(), source.to_string()),
363 ("lcut".to_string(), lcut.to_string()),
364 ("prev_lcut".to_string(), prev_lcut.to_string()),
365 (
366 "spec_source_api".to_string(),
367 source_api.unwrap_or_default(),
368 ),
369 (
370 "response_format".to_string(),
371 Into::<&str>::into(&response_format).to_string(),
372 ),
373 ])),
374 ));
375 }
376
377 fn get_spec_response_format(&self, update: &SpecsUpdate) -> SpecsFormat {
378 let content_type = update.data.get_header_ref("content-type");
379 if content_type.map(|s| s.as_str().contains("application/octet-stream")) != Some(true) {
380 return SpecsFormat::Json;
381 }
382
383 let content_encoding = update.data.get_header_ref("content-encoding");
384 if content_encoding.map(|s| s.as_str().contains("statsig-br")) != Some(true) {
385 return SpecsFormat::Json;
386 }
387
388 SpecsFormat::Protobuf
389 }
390
391 fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
392 if let Some(diagnostics) = &dcs.diagnostics {
393 self.global_configs
394 .set_diagnostics_sampling_rates(diagnostics.clone());
395 }
396
397 if let Some(sdk_configs) = &dcs.sdk_configs {
398 self.global_configs.set_sdk_configs(sdk_configs.clone());
399 }
400
401 if let Some(sdk_flags) = &dcs.sdk_flags {
402 self.global_configs.set_sdk_flags(sdk_flags.clone());
403 }
404 }
405
406 fn try_update_data_store(&self, source: &SpecsSource, mut data: ResponseData, now: u64) {
407 if source != &SpecsSource::Network {
408 return;
409 }
410
411 let data_store = match &self.data_store {
412 Some(data_store) => data_store.clone(),
413 None => return,
414 };
415
416 let data_store_key = self.data_store_key.clone();
417
418 let spawn_result = self.statsig_runtime.spawn(
419 "spec_store_update_data_store",
420 move |_shutdown_notif| async move {
421 let data_string = match data.read_to_string() {
422 Ok(s) => s,
423 Err(e) => {
424 log_e!(TAG, "Failed to convert data to string: {}", e);
425 return;
426 }
427 };
428
429 let _ = data_store
430 .set(&data_store_key, &data_string, Some(now))
431 .await;
432 },
433 );
434
435 if let Err(e) = spawn_result {
436 log_e!(
437 TAG,
438 "Failed to spawn spec store update data store task: {e}"
439 );
440 }
441 }
442
443 fn are_current_values_newer(&self, data: &SpecStoreData) -> bool {
444 let curr_values = &data.values;
445 let next_values = &data.next_values;
446 let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
447 let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
448
449 let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
450 let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
451
452 if cached_time_is_newer || checksums_match {
453 log_d!(
454 TAG,
455 "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
456 next_values.time,
457 new_checksum,
458 curr_values.time,
459 curr_checksum,
460 );
461 return true;
462 }
463
464 false
465 }
466}
467
468impl SpecsUpdateListener for SpecStore {
471 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
472 self.set_values(update)
473 }
474
475 fn get_current_specs_info(&self) -> SpecsInfo {
476 match self.data.try_read_for(Duration::from_secs(5)) {
477 Some(data) => SpecsInfo {
478 lcut: Some(data.values.time),
479 checksum: data.values.checksum.clone(),
480 source: data.source.clone(),
481 source_api: data.source_api.clone(),
482 },
483 None => {
484 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
485 SpecsInfo {
486 lcut: None,
487 checksum: None,
488 source: SpecsSource::Error,
489 source_api: None,
490 }
491 }
492 }
493 }
494}
495
496impl IdListsUpdateListener for SpecStore {
499 fn get_current_id_list_metadata(
500 &self,
501 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
502 match self.data.try_read_for(Duration::from_secs(5)) {
503 Some(data) => data
504 .id_lists
505 .iter()
506 .map(|(key, list)| (key.clone(), list.metadata.clone()))
507 .collect(),
508 None => {
509 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
510 HashMap::new()
511 }
512 }
513 }
514
515 fn did_receive_id_list_updates(
516 &self,
517 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
518 ) {
519 let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
520 Some(data) => data,
521 None => {
522 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
523 return;
524 }
525 };
526
527 data.id_lists.retain(|name, _| updates.contains_key(name));
529
530 for (list_name, update) in updates {
531 if let Some(entry) = data.id_lists.get_mut(&list_name) {
532 entry.apply_update(update);
534 } else {
535 let mut list = IdList::new(update.new_metadata.clone());
537 list.apply_update(update);
538 data.id_lists.insert(list_name, list);
539 }
540 }
541 }
542}
543
544enum ParseResult {
545 HasUpdates,
546 NoUpdates,
547}