1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::data_store_interface::DataStoreTrait;
8use crate::evaluation::evaluator::SpecType;
9use crate::global_configs::GlobalConfigs;
10use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
11use crate::interned_string::InternedString;
12use crate::networking::ResponseData;
13use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
14use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
15use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
16use crate::sdk_event_emitter::{SdkEvent, SdkEventEmitter};
17use crate::specs_response::proto_specs::deserialize_protobuf;
18use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
19use crate::utils::maybe_trim_malloc;
20use crate::{
21 log_d, log_e, log_error_to_statsig_and_console, read_lock_or_else, SpecsInfo, SpecsSource,
22 SpecsUpdate, SpecsUpdateListener, StatsigErr, StatsigOptions, StatsigRuntime,
23};
24
25pub struct SpecStoreData {
26 pub source: SpecsSource,
27 pub source_api: Option<String>,
28 pub time_received_at: Option<u64>,
29 pub values: SpecsResponseFull,
30 pub next_values: SpecsResponseFull,
31 pub id_lists: HashMap<String, IdList>,
32}
33
/// Tag attached to every log line emitted by the spec store.
const TAG: &str = "SpecStore";
35
36pub struct SpecStore {
37 pub data: Arc<RwLock<SpecStoreData>>,
38
39 data_adapter_key: String,
40 data_store: Option<Arc<dyn DataStoreTrait>>,
41 statsig_runtime: Arc<StatsigRuntime>,
42 ops_stats: Arc<OpsStatsForInstance>,
43 global_configs: Arc<GlobalConfigs>,
44 event_emitter: Arc<SdkEventEmitter>,
45 enable_proto_spec_support: bool,
46}
47
48impl SpecStore {
49 #[must_use]
50 pub fn new(
51 sdk_key: &str,
52 data_adapter_key: String,
53 statsig_runtime: Arc<StatsigRuntime>,
54 event_emitter: Arc<SdkEventEmitter>,
55 options: Option<&StatsigOptions>,
56 ) -> SpecStore {
57 let mut data_store = None;
58 if let Some(options) = options {
59 data_store = options.data_store.clone();
60 }
61
62 let enable_proto_spec_support = options
63 .and_then(|opts| opts.experimental_flags.as_ref())
64 .is_some_and(|flags| flags.contains("enable_proto_spec_support"));
65
66 SpecStore {
67 data_adapter_key,
68 data: Arc::new(RwLock::new(SpecStoreData {
69 values: SpecsResponseFull::default(),
70 next_values: SpecsResponseFull::default(),
71 time_received_at: None,
72 source: SpecsSource::Uninitialized,
73 source_api: None,
74 id_lists: HashMap::new(),
75 })),
76 event_emitter,
77 data_store,
78 statsig_runtime,
79 ops_stats: OPS_STATS.get_for_instance(sdk_key),
80 global_configs: GlobalConfigs::get_instance(sdk_key),
81 enable_proto_spec_support,
82 }
83 }
84
85 pub fn set_source(&self, source: SpecsSource) {
86 match self.data.try_write_for(Duration::from_secs(5)) {
87 Some(mut data) => {
88 data.source = source;
89 log_d!(TAG, "Source Changed ({:?})", data.source);
90 }
91 None => {
92 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
93 }
94 }
95 }
96
97 pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
98 let data = match self.data.try_read_for(Duration::from_secs(5)) {
99 Some(data) => data,
100 None => {
101 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
102 return None;
103 }
104 };
105 let json = serde_json::to_string(&data.values).ok()?;
106 serde_json::from_str::<SpecsResponseFull>(&json).ok()
107 }
108
109 pub fn get_fields_used_for_entity(
110 &self,
111 entity_name: &str,
112 entity_type: SpecType,
113 ) -> Vec<String> {
114 let data = read_lock_or_else!(self.data, {
115 log_error_to_statsig_and_console!(
116 &self.ops_stats,
117 TAG,
118 StatsigErr::LockFailure(
119 "Failed to acquire read lock for spec store data".to_string()
120 )
121 );
122 return vec![];
123 });
124
125 let entities = match entity_type {
126 SpecType::Gate => &data.values.feature_gates,
127 SpecType::DynamicConfig | SpecType::Experiment => &data.values.dynamic_configs,
128 SpecType::Layer => &data.values.layer_configs,
129 };
130
131 let entity_name = InternedString::from_str_ref(entity_name);
132 let entity = entities.get(&entity_name);
133
134 match entity {
135 Some(entity) => match &entity.inner.fields_used {
136 Some(fields) => fields.iter().map(|f| f.unperformant_to_string()).collect(),
137 None => vec![],
138 },
139 None => vec![],
140 }
141 }
142
143 pub fn unperformant_keys_entity_filter(
144 &self,
145 top_level_key: &str,
146 entity_type: &str,
147 ) -> Vec<String> {
148 let data = read_lock_or_else!(self.data, {
149 log_error_to_statsig_and_console!(
150 &self.ops_stats,
151 TAG,
152 StatsigErr::LockFailure(
153 "Failed to acquire read lock for spec store data".to_string()
154 )
155 );
156 return vec![];
157 });
158
159 if top_level_key == "param_stores" {
160 match &data.values.param_stores {
161 Some(param_stores) => {
162 return param_stores
163 .keys()
164 .map(|k| k.unperformant_to_string())
165 .collect()
166 }
167 None => return vec![],
168 }
169 }
170
171 let values = match top_level_key {
172 "feature_gates" => &data.values.feature_gates,
173 "dynamic_configs" => &data.values.dynamic_configs,
174 "layer_configs" => &data.values.layer_configs,
175 _ => {
176 log_e!(TAG, "Invalid top level key: {}", top_level_key);
177 return vec![];
178 }
179 };
180
181 if entity_type == "*" {
182 return values.keys().map(|k| k.unperformant_to_string()).collect();
183 }
184
185 values
186 .iter()
187 .filter(|(_, v)| v.inner.entity == entity_type)
188 .map(|(k, _)| k.unperformant_to_string())
189 .collect()
190 }
191
192 pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
193 let mut specs_update = specs_update;
194 let mut locked_data = match self.data.try_write_for(Duration::from_secs(5)) {
195 Some(data) => data,
196 None => {
197 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
198 return Err(StatsigErr::LockFailure(
199 "Failed to acquire write lock: Failed to lock data".to_string(),
200 ));
201 }
202 };
203 let use_protobuf = self.is_proto_spec_response(&specs_update);
204
205 match self.parse_specs_response(&mut specs_update, &mut locked_data, use_protobuf) {
206 Ok(ParseResult::HasUpdates) => (),
207 Ok(ParseResult::NoUpdates) => {
208 self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
209 return Ok(());
210 }
211 Err(e) => {
212 return Err(e);
213 }
214 };
215
216 if self.are_current_values_newer(&locked_data) {
217 return Ok(());
218 }
219
220 self.try_update_global_configs(&locked_data.next_values);
221
222 let now = Utc::now().timestamp_millis() as u64;
223 let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
224 &mut locked_data,
225 &specs_update,
226 now,
227 specs_update.source_api.clone(),
228 )?;
229
230 if !use_protobuf {
231 self.try_update_data_store(&specs_update.source, specs_update.data, now);
233 }
234
235 self.ops_stats_log_config_propagation_diff(
236 curr_values_time,
237 prev_lcut,
238 &specs_update.source,
239 &prev_source,
240 specs_update.source_api,
241 self.enable_proto_spec_support,
242 use_protobuf,
243 );
244
245 maybe_trim_malloc();
249
250 Ok(())
251 }
252}
253
254impl SpecStore {
257 fn parse_specs_response(
258 &self,
259 values: &mut SpecsUpdate,
260 spec_store_data: &mut SpecStoreData,
261 use_protobuf: bool,
262 ) -> Result<ParseResult, StatsigErr> {
263 spec_store_data.next_values.reset();
264
265 let parse_result = if use_protobuf {
266 deserialize_protobuf(
267 &self.ops_stats,
268 &spec_store_data.values,
269 &mut spec_store_data.next_values,
270 &mut values.data,
271 )
272 } else {
273 values
274 .data
275 .deserialize_in_place(&mut spec_store_data.next_values)
276 };
277
278 if parse_result.is_ok() && spec_store_data.next_values.has_updates {
279 return Ok(ParseResult::HasUpdates);
280 }
281
282 let no_updates_result = values.data.deserialize_into::<SpecsResponseNoUpdates>();
283 if let Ok(result) = no_updates_result {
284 if !result.has_updates {
285 return Ok(ParseResult::NoUpdates);
286 }
287 }
288
289 let error = parse_result.err().unwrap_or_else(|| {
290 StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string())
291 });
292
293 log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
294 Err(error)
295 }
296
297 fn swap_current_with_next(
298 &self,
299 data: &mut SpecStoreData,
300 specs_update: &SpecsUpdate,
301 now: u64,
302 source_api: Option<String>,
303 ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
304 let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
305 let prev_lcut = data.values.time;
306
307 std::mem::swap(&mut data.values, &mut data.next_values);
308
309 data.time_received_at = Some(now);
310 data.source_api = source_api;
311 data.next_values.reset();
312
313 self.emit_specs_updated_sdk_event(&data.source, &data.source_api, &data.values);
314
315 Ok((prev_source, prev_lcut, data.values.time))
316 }
317
318 fn emit_specs_updated_sdk_event(
319 &self,
320 source: &SpecsSource,
321 source_api: &Option<String>,
322 values: &SpecsResponseFull,
323 ) {
324 self.event_emitter.emit(SdkEvent::SpecsUpdated {
325 source,
326 source_api,
327 values,
328 });
329 }
330
331 fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
332 log_d!(TAG, "No Updates");
333 self.ops_stats.log(ObservabilityEvent::new_event(
334 MetricType::Increment,
335 "config_no_update".to_string(),
336 1.0,
337 Some(HashMap::from([
338 ("source".to_string(), source.to_string()),
339 (
340 "spec_source_api".to_string(),
341 source_api.unwrap_or_default(),
342 ),
343 ])),
344 ));
345 }
346
347 #[allow(clippy::too_many_arguments)]
348 fn ops_stats_log_config_propagation_diff(
349 &self,
350 lcut: u64,
351 prev_lcut: u64,
352 source: &SpecsSource,
353 prev_source: &SpecsSource,
354 source_api: Option<String>,
355 request_supports_proto: bool,
356 response_use_proto: bool,
357 ) {
358 let delay = (Utc::now().timestamp_millis() as u64).saturating_sub(lcut);
359 log_d!(TAG, "Updated ({:?})", source);
360
361 if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
362 return;
363 }
364
365 self.ops_stats.log(ObservabilityEvent::new_event(
366 MetricType::Dist,
367 "config_propagation_diff".to_string(),
368 delay as f64,
369 Some(HashMap::from([
370 ("source".to_string(), source.to_string()),
371 ("lcut".to_string(), lcut.to_string()),
372 ("prev_lcut".to_string(), prev_lcut.to_string()),
373 (
374 "spec_source_api".to_string(),
375 source_api.unwrap_or_default(),
376 ),
377 (
378 "request_supports_proto".to_string(),
379 request_supports_proto.to_string(),
380 ),
381 (
382 "response_use_proto".to_string(),
383 response_use_proto.to_string(),
384 ),
385 ])),
386 ));
387 }
388
389 fn is_proto_spec_response(&self, update: &SpecsUpdate) -> bool {
390 if !self.enable_proto_spec_support {
391 return false;
392 }
393
394 let content_type = update.data.get_header_ref("content-type");
395 if content_type.map(|s| s.as_str().contains("application/octet-stream")) != Some(true) {
396 return false;
397 }
398
399 let content_encoding = update.data.get_header_ref("content-encoding");
400 if content_encoding.map(|s| s.as_str().contains("statsig-br")) != Some(true) {
401 return false;
402 }
403
404 true
405 }
406
407 fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
408 if let Some(diagnostics) = &dcs.diagnostics {
409 self.global_configs
410 .set_diagnostics_sampling_rates(diagnostics.clone());
411 }
412
413 if let Some(sdk_configs) = &dcs.sdk_configs {
414 self.global_configs.set_sdk_configs(sdk_configs.clone());
415 }
416
417 if let Some(sdk_flags) = &dcs.sdk_flags {
418 self.global_configs.set_sdk_flags(sdk_flags.clone());
419 }
420 }
421
422 fn try_update_data_store(&self, source: &SpecsSource, mut data: ResponseData, now: u64) {
423 if source != &SpecsSource::Network {
424 return;
425 }
426
427 let data_store = match &self.data_store {
428 Some(data_store) => data_store.clone(),
429 None => return,
430 };
431
432 let data_adapter_key = self.data_adapter_key.clone();
433
434 let spawn_result = self.statsig_runtime.spawn(
435 "spec_store_update_data_store",
436 move |_shutdown_notif| async move {
437 let data_string = match data.read_to_string() {
438 Ok(s) => s,
439 Err(e) => {
440 log_e!(TAG, "Failed to convert data to string: {}", e);
441 return;
442 }
443 };
444
445 let _ = data_store
446 .set(&data_adapter_key, &data_string, Some(now))
447 .await;
448 },
449 );
450
451 if let Err(e) = spawn_result {
452 log_e!(
453 TAG,
454 "Failed to spawn spec store update data store task: {e}"
455 );
456 }
457 }
458
459 fn are_current_values_newer(&self, data: &SpecStoreData) -> bool {
460 let curr_values = &data.values;
461 let next_values = &data.next_values;
462 let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
463 let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
464
465 let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
466 let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
467
468 if cached_time_is_newer || checksums_match {
469 log_d!(
470 TAG,
471 "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
472 next_values.time,
473 new_checksum,
474 curr_values.time,
475 curr_checksum,
476 );
477 return true;
478 }
479
480 false
481 }
482}
483
484impl SpecsUpdateListener for SpecStore {
487 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
488 self.set_values(update)
489 }
490
491 fn get_current_specs_info(&self) -> SpecsInfo {
492 match self.data.try_read_for(Duration::from_secs(5)) {
493 Some(data) => SpecsInfo {
494 lcut: Some(data.values.time),
495 checksum: data.values.checksum.clone(),
496 source: data.source.clone(),
497 source_api: data.source_api.clone(),
498 },
499 None => {
500 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
501 SpecsInfo {
502 lcut: None,
503 checksum: None,
504 source: SpecsSource::Error,
505 source_api: None,
506 }
507 }
508 }
509 }
510}
511
512impl IdListsUpdateListener for SpecStore {
515 fn get_current_id_list_metadata(
516 &self,
517 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
518 match self.data.try_read_for(Duration::from_secs(5)) {
519 Some(data) => data
520 .id_lists
521 .iter()
522 .map(|(key, list)| (key.clone(), list.metadata.clone()))
523 .collect(),
524 None => {
525 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
526 HashMap::new()
527 }
528 }
529 }
530
531 fn did_receive_id_list_updates(
532 &self,
533 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
534 ) {
535 let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
536 Some(data) => data,
537 None => {
538 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
539 return;
540 }
541 };
542
543 data.id_lists.retain(|name, _| updates.contains_key(name));
545
546 for (list_name, update) in updates {
547 if let Some(entry) = data.id_lists.get_mut(&list_name) {
548 entry.apply_update(update);
550 } else {
551 let mut list = IdList::new(update.new_metadata.clone());
553 list.apply_update(update);
554 data.id_lists.insert(list_name, list);
555 }
556 }
557 }
558}
559
/// Outcome of successfully parsing a specs payload.
enum ParseResult {
    /// The payload was parsed into `next_values` and reports `has_updates`.
    HasUpdates,
    /// The payload is a valid "no updates" response.
    NoUpdates,
}