statsig_rust/
spec_store.rs

1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::data_store_interface::DataStoreTrait;
8use crate::evaluation::evaluator::SpecType;
9use crate::global_configs::GlobalConfigs;
10use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
11use crate::interned_string::InternedString;
12use crate::networking::ResponseData;
13use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
14use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
15use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
16use crate::sdk_event_emitter::{SdkEvent, SdkEventEmitter};
17use crate::specs_response::proto_specs::deserialize_protobuf;
18use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
19use crate::utils::maybe_trim_malloc;
20use crate::{
21    log_d, log_e, log_error_to_statsig_and_console, read_lock_or_else, SpecsInfo, SpecsSource,
22    SpecsUpdate, SpecsUpdateListener, StatsigErr, StatsigOptions, StatsigRuntime,
23};
24
/// State guarded by the lock in `SpecStore::data`: the specs currently in
/// effect, a staging buffer for incoming updates, and the known ID lists.
pub struct SpecStoreData {
    /// Source of the specs currently held in `values`
    /// (`SpecsSource::Uninitialized` until something sets it).
    pub source: SpecsSource,
    /// `source_api` carried by the update that produced `values`, if any.
    pub source_api: Option<String>,
    /// Unix timestamp in milliseconds at which `values` was last swapped in;
    /// `None` until the first update has been applied.
    pub time_received_at: Option<u64>,
    /// Specs currently in effect.
    pub values: SpecsResponseFull,
    /// Staging buffer: incoming responses are parsed into this and then
    /// swapped with `values` once accepted.
    pub next_values: SpecsResponseFull,
    /// ID lists keyed by list name.
    pub id_lists: HashMap<String, IdList>,
}
33
/// Tag attached to every log line emitted by this module.
const TAG: &str = "SpecStore";
35
/// Holds the active specs and ID lists behind a read/write lock and applies
/// incoming spec / ID-list updates to them.
pub struct SpecStore {
    /// Shared, lock-protected store state.
    pub data: Arc<RwLock<SpecStoreData>>,

    /// Key under which spec payloads are written to `data_store`.
    data_adapter_key: String,
    /// Optional external data store, taken from `StatsigOptions::data_store`.
    data_store: Option<Arc<dyn DataStoreTrait>>,
    /// Runtime used to spawn the background data-store write.
    statsig_runtime: Arc<StatsigRuntime>,
    /// Sink for observability events and reported errors for this SDK key.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Per-SDK-key global configs, refreshed from each accepted specs response.
    global_configs: Arc<GlobalConfigs>,
    /// Emitter used to publish `SdkEvent::SpecsUpdated`.
    event_emitter: Arc<SdkEventEmitter>,
    /// True when the `enable_proto_spec_support` experimental flag was set in
    /// the options passed to `new`.
    enable_proto_spec_support: bool,
}
47
48impl SpecStore {
49    #[must_use]
50    pub fn new(
51        sdk_key: &str,
52        data_adapter_key: String,
53        statsig_runtime: Arc<StatsigRuntime>,
54        event_emitter: Arc<SdkEventEmitter>,
55        options: Option<&StatsigOptions>,
56    ) -> SpecStore {
57        let mut data_store = None;
58        if let Some(options) = options {
59            data_store = options.data_store.clone();
60        }
61
62        let enable_proto_spec_support = options
63            .and_then(|opts| opts.experimental_flags.as_ref())
64            .is_some_and(|flags| flags.contains("enable_proto_spec_support"));
65
66        SpecStore {
67            data_adapter_key,
68            data: Arc::new(RwLock::new(SpecStoreData {
69                values: SpecsResponseFull::default(),
70                next_values: SpecsResponseFull::default(),
71                time_received_at: None,
72                source: SpecsSource::Uninitialized,
73                source_api: None,
74                id_lists: HashMap::new(),
75            })),
76            event_emitter,
77            data_store,
78            statsig_runtime,
79            ops_stats: OPS_STATS.get_for_instance(sdk_key),
80            global_configs: GlobalConfigs::get_instance(sdk_key),
81            enable_proto_spec_support,
82        }
83    }
84
85    pub fn set_source(&self, source: SpecsSource) {
86        match self.data.try_write_for(Duration::from_secs(5)) {
87            Some(mut data) => {
88                data.source = source;
89                log_d!(TAG, "Source Changed ({:?})", data.source);
90            }
91            None => {
92                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
93            }
94        }
95    }
96
97    pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
98        let data = match self.data.try_read_for(Duration::from_secs(5)) {
99            Some(data) => data,
100            None => {
101                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
102                return None;
103            }
104        };
105        let json = serde_json::to_string(&data.values).ok()?;
106        serde_json::from_str::<SpecsResponseFull>(&json).ok()
107    }
108
109    pub fn get_fields_used_for_entity(
110        &self,
111        entity_name: &str,
112        entity_type: SpecType,
113    ) -> Vec<String> {
114        let data = read_lock_or_else!(self.data, {
115            log_error_to_statsig_and_console!(
116                &self.ops_stats,
117                TAG,
118                StatsigErr::LockFailure(
119                    "Failed to acquire read lock for spec store data".to_string()
120                )
121            );
122            return vec![];
123        });
124
125        let entities = match entity_type {
126            SpecType::Gate => &data.values.feature_gates,
127            SpecType::DynamicConfig | SpecType::Experiment => &data.values.dynamic_configs,
128            SpecType::Layer => &data.values.layer_configs,
129        };
130
131        let entity_name = InternedString::from_str_ref(entity_name);
132        let entity = entities.get(&entity_name);
133
134        match entity {
135            Some(entity) => match &entity.inner.fields_used {
136                Some(fields) => fields.iter().map(|f| f.unperformant_to_string()).collect(),
137                None => vec![],
138            },
139            None => vec![],
140        }
141    }
142
143    pub fn unperformant_keys_entity_filter(
144        &self,
145        top_level_key: &str,
146        entity_type: &str,
147    ) -> Vec<String> {
148        let data = read_lock_or_else!(self.data, {
149            log_error_to_statsig_and_console!(
150                &self.ops_stats,
151                TAG,
152                StatsigErr::LockFailure(
153                    "Failed to acquire read lock for spec store data".to_string()
154                )
155            );
156            return vec![];
157        });
158
159        if top_level_key == "param_stores" {
160            match &data.values.param_stores {
161                Some(param_stores) => {
162                    return param_stores
163                        .keys()
164                        .map(|k| k.unperformant_to_string())
165                        .collect()
166                }
167                None => return vec![],
168            }
169        }
170
171        let values = match top_level_key {
172            "feature_gates" => &data.values.feature_gates,
173            "dynamic_configs" => &data.values.dynamic_configs,
174            "layer_configs" => &data.values.layer_configs,
175            _ => {
176                log_e!(TAG, "Invalid top level key: {}", top_level_key);
177                return vec![];
178            }
179        };
180
181        if entity_type == "*" {
182            return values.keys().map(|k| k.unperformant_to_string()).collect();
183        }
184
185        values
186            .iter()
187            .filter(|(_, v)| v.inner.entity == entity_type)
188            .map(|(k, _)| k.unperformant_to_string())
189            .collect()
190    }
191
192    pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
193        let mut specs_update = specs_update;
194        let mut locked_data = match self.data.try_write_for(Duration::from_secs(5)) {
195            Some(data) => data,
196            None => {
197                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
198                return Err(StatsigErr::LockFailure(
199                    "Failed to acquire write lock: Failed to lock data".to_string(),
200                ));
201            }
202        };
203        let use_protobuf = self.is_proto_spec_response(&specs_update);
204
205        match self.parse_specs_response(&mut specs_update, &mut locked_data, use_protobuf) {
206            Ok(ParseResult::HasUpdates) => (),
207            Ok(ParseResult::NoUpdates) => {
208                self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
209                return Ok(());
210            }
211            Err(e) => {
212                return Err(e);
213            }
214        };
215
216        if self.are_current_values_newer(&locked_data) {
217            return Ok(());
218        }
219
220        self.try_update_global_configs(&locked_data.next_values);
221
222        let now = Utc::now().timestamp_millis() as u64;
223        let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
224            &mut locked_data,
225            &specs_update,
226            now,
227            specs_update.source_api.clone(),
228        )?;
229
230        if !use_protobuf {
231            // protobuf response writes to data store are not current supported
232            self.try_update_data_store(&specs_update.source, specs_update.data, now);
233        }
234
235        self.ops_stats_log_config_propagation_diff(
236            curr_values_time,
237            prev_lcut,
238            &specs_update.source,
239            &prev_source,
240            specs_update.source_api,
241            self.enable_proto_spec_support,
242            use_protobuf,
243        );
244
245        // Glibc requested more memory than needed when deserializing a big json blob
246        // And memory allocator fails to return it.
247        // To prevent service from OOMing, manually unused heap memory.
248        maybe_trim_malloc();
249
250        Ok(())
251    }
252}
253
254// -------------------------------------------------------------------------------------------- [Private Functions]
255
256impl SpecStore {
257    fn parse_specs_response(
258        &self,
259        values: &mut SpecsUpdate,
260        spec_store_data: &mut SpecStoreData,
261        use_protobuf: bool,
262    ) -> Result<ParseResult, StatsigErr> {
263        spec_store_data.next_values.reset();
264
265        let parse_result = if use_protobuf {
266            deserialize_protobuf(
267                &self.ops_stats,
268                &spec_store_data.values,
269                &mut spec_store_data.next_values,
270                &mut values.data,
271            )
272        } else {
273            values
274                .data
275                .deserialize_in_place(&mut spec_store_data.next_values)
276        };
277
278        if parse_result.is_ok() && spec_store_data.next_values.has_updates {
279            return Ok(ParseResult::HasUpdates);
280        }
281
282        let no_updates_result = values.data.deserialize_into::<SpecsResponseNoUpdates>();
283        if let Ok(result) = no_updates_result {
284            if !result.has_updates {
285                return Ok(ParseResult::NoUpdates);
286            }
287        }
288
289        let error = parse_result.err().unwrap_or_else(|| {
290            StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string())
291        });
292
293        log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
294        Err(error)
295    }
296
297    fn swap_current_with_next(
298        &self,
299        data: &mut SpecStoreData,
300        specs_update: &SpecsUpdate,
301        now: u64,
302        source_api: Option<String>,
303    ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
304        let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
305        let prev_lcut = data.values.time;
306
307        std::mem::swap(&mut data.values, &mut data.next_values);
308
309        data.time_received_at = Some(now);
310        data.source_api = source_api;
311        data.next_values.reset();
312
313        self.emit_specs_updated_sdk_event(&data.source, &data.source_api, &data.values);
314
315        Ok((prev_source, prev_lcut, data.values.time))
316    }
317
318    fn emit_specs_updated_sdk_event(
319        &self,
320        source: &SpecsSource,
321        source_api: &Option<String>,
322        values: &SpecsResponseFull,
323    ) {
324        self.event_emitter.emit(SdkEvent::SpecsUpdated {
325            source,
326            source_api,
327            values,
328        });
329    }
330
331    fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
332        log_d!(TAG, "No Updates");
333        self.ops_stats.log(ObservabilityEvent::new_event(
334            MetricType::Increment,
335            "config_no_update".to_string(),
336            1.0,
337            Some(HashMap::from([
338                ("source".to_string(), source.to_string()),
339                (
340                    "spec_source_api".to_string(),
341                    source_api.unwrap_or_default(),
342                ),
343            ])),
344        ));
345    }
346
347    #[allow(clippy::too_many_arguments)]
348    fn ops_stats_log_config_propagation_diff(
349        &self,
350        lcut: u64,
351        prev_lcut: u64,
352        source: &SpecsSource,
353        prev_source: &SpecsSource,
354        source_api: Option<String>,
355        request_supports_proto: bool,
356        response_use_proto: bool,
357    ) {
358        let delay = (Utc::now().timestamp_millis() as u64).saturating_sub(lcut);
359        log_d!(TAG, "Updated ({:?})", source);
360
361        if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
362            return;
363        }
364
365        self.ops_stats.log(ObservabilityEvent::new_event(
366            MetricType::Dist,
367            "config_propagation_diff".to_string(),
368            delay as f64,
369            Some(HashMap::from([
370                ("source".to_string(), source.to_string()),
371                ("lcut".to_string(), lcut.to_string()),
372                ("prev_lcut".to_string(), prev_lcut.to_string()),
373                (
374                    "spec_source_api".to_string(),
375                    source_api.unwrap_or_default(),
376                ),
377                (
378                    "request_supports_proto".to_string(),
379                    request_supports_proto.to_string(),
380                ),
381                (
382                    "response_use_proto".to_string(),
383                    response_use_proto.to_string(),
384                ),
385            ])),
386        ));
387    }
388
389    fn is_proto_spec_response(&self, update: &SpecsUpdate) -> bool {
390        if !self.enable_proto_spec_support {
391            return false;
392        }
393
394        let content_type = update.data.get_header_ref("content-type");
395        if content_type.map(|s| s.as_str().contains("application/octet-stream")) != Some(true) {
396            return false;
397        }
398
399        let content_encoding = update.data.get_header_ref("content-encoding");
400        if content_encoding.map(|s| s.as_str().contains("statsig-br")) != Some(true) {
401            return false;
402        }
403
404        true
405    }
406
407    fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
408        if let Some(diagnostics) = &dcs.diagnostics {
409            self.global_configs
410                .set_diagnostics_sampling_rates(diagnostics.clone());
411        }
412
413        if let Some(sdk_configs) = &dcs.sdk_configs {
414            self.global_configs.set_sdk_configs(sdk_configs.clone());
415        }
416
417        if let Some(sdk_flags) = &dcs.sdk_flags {
418            self.global_configs.set_sdk_flags(sdk_flags.clone());
419        }
420    }
421
422    fn try_update_data_store(&self, source: &SpecsSource, mut data: ResponseData, now: u64) {
423        if source != &SpecsSource::Network {
424            return;
425        }
426
427        let data_store = match &self.data_store {
428            Some(data_store) => data_store.clone(),
429            None => return,
430        };
431
432        let data_adapter_key = self.data_adapter_key.clone();
433
434        let spawn_result = self.statsig_runtime.spawn(
435            "spec_store_update_data_store",
436            move |_shutdown_notif| async move {
437                let data_string = match data.read_to_string() {
438                    Ok(s) => s,
439                    Err(e) => {
440                        log_e!(TAG, "Failed to convert data to string: {}", e);
441                        return;
442                    }
443                };
444
445                let _ = data_store
446                    .set(&data_adapter_key, &data_string, Some(now))
447                    .await;
448            },
449        );
450
451        if let Err(e) = spawn_result {
452            log_e!(
453                TAG,
454                "Failed to spawn spec store update data store task: {e}"
455            );
456        }
457    }
458
459    fn are_current_values_newer(&self, data: &SpecStoreData) -> bool {
460        let curr_values = &data.values;
461        let next_values = &data.next_values;
462        let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
463        let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
464
465        let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
466        let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
467
468        if cached_time_is_newer || checksums_match {
469            log_d!(
470                TAG,
471                "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
472                next_values.time,
473                new_checksum,
474                curr_values.time,
475                curr_checksum,
476                );
477            return true;
478        }
479
480        false
481    }
482}
483
484// -------------------------------------------------------------------------------------------- [Impl SpecsUpdateListener]
485
486impl SpecsUpdateListener for SpecStore {
487    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
488        self.set_values(update)
489    }
490
491    fn get_current_specs_info(&self) -> SpecsInfo {
492        match self.data.try_read_for(Duration::from_secs(5)) {
493            Some(data) => SpecsInfo {
494                lcut: Some(data.values.time),
495                checksum: data.values.checksum.clone(),
496                source: data.source.clone(),
497                source_api: data.source_api.clone(),
498            },
499            None => {
500                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
501                SpecsInfo {
502                    lcut: None,
503                    checksum: None,
504                    source: SpecsSource::Error,
505                    source_api: None,
506                }
507            }
508        }
509    }
510}
511
512// -------------------------------------------------------------------------------------------- [Impl IdListsUpdateListener]
513
514impl IdListsUpdateListener for SpecStore {
515    fn get_current_id_list_metadata(
516        &self,
517    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
518        match self.data.try_read_for(Duration::from_secs(5)) {
519            Some(data) => data
520                .id_lists
521                .iter()
522                .map(|(key, list)| (key.clone(), list.metadata.clone()))
523                .collect(),
524            None => {
525                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
526                HashMap::new()
527            }
528        }
529    }
530
531    fn did_receive_id_list_updates(
532        &self,
533        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
534    ) {
535        let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
536            Some(data) => data,
537            None => {
538                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
539                return;
540            }
541        };
542
543        // delete any id_lists that are not in the updates
544        data.id_lists.retain(|name, _| updates.contains_key(name));
545
546        for (list_name, update) in updates {
547            if let Some(entry) = data.id_lists.get_mut(&list_name) {
548                // update existing
549                entry.apply_update(update);
550            } else {
551                // add new
552                let mut list = IdList::new(update.new_metadata.clone());
553                list.apply_update(update);
554                data.id_lists.insert(list_name, list);
555            }
556        }
557    }
558}
559
/// Outcome of successfully parsing a specs response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ParseResult {
    /// The response carried a full set of specs that was staged for use.
    HasUpdates,
    /// The response reported that there is nothing newer than what the caller has.
    NoUpdates,
}