Skip to main content

statsig_rust/
spec_store.rs

1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::data_store_interface::DataStoreTrait;
8use crate::evaluation::evaluator::SpecType;
9use crate::global_configs::GlobalConfigs;
10use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
11use crate::interned_string::InternedString;
12use crate::networking::ResponseData;
13use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
14use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
15use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
16use crate::sdk_event_emitter::{SdkEvent, SdkEventEmitter};
17use crate::specs_response::proto_specs::deserialize_protobuf;
18use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
19use crate::utils::maybe_trim_malloc;
20use crate::{
21    log_d, log_e, log_error_to_statsig_and_console, read_lock_or_else, SpecsFormat, SpecsInfo,
22    SpecsSource, SpecsUpdate, SpecsUpdateListener, StatsigErr, StatsigOptions, StatsigRuntime,
23};
24
25pub struct SpecStoreData {
26    pub source: SpecsSource,
27    pub source_api: Option<String>,
28    pub time_received_at: Option<u64>,
29    pub values: SpecsResponseFull,
30    pub next_values: SpecsResponseFull,
31    pub id_lists: HashMap<String, IdList>,
32}
33
34const TAG: &str = stringify!(SpecStore);
35
36pub struct SpecStore {
37    pub data: Arc<RwLock<SpecStoreData>>,
38
39    data_store_key: String,
40    data_store: Option<Arc<dyn DataStoreTrait>>,
41    statsig_runtime: Arc<StatsigRuntime>,
42    ops_stats: Arc<OpsStatsForInstance>,
43    global_configs: Arc<GlobalConfigs>,
44    event_emitter: Arc<SdkEventEmitter>,
45}
46
47impl SpecStore {
48    #[must_use]
49    pub fn new(
50        sdk_key: &str,
51        data_store_key: String,
52        statsig_runtime: Arc<StatsigRuntime>,
53        event_emitter: Arc<SdkEventEmitter>,
54        options: Option<&StatsigOptions>,
55    ) -> SpecStore {
56        let mut data_store = None;
57        if let Some(options) = options {
58            data_store = options.data_store.clone();
59        }
60
61        SpecStore {
62            data_store_key,
63            data: Arc::new(RwLock::new(SpecStoreData {
64                values: SpecsResponseFull::default(),
65                next_values: SpecsResponseFull::default(),
66                time_received_at: None,
67                source: SpecsSource::Uninitialized,
68                source_api: None,
69                id_lists: HashMap::new(),
70            })),
71            event_emitter,
72            data_store,
73            statsig_runtime,
74            ops_stats: OPS_STATS.get_for_instance(sdk_key),
75            global_configs: GlobalConfigs::get_instance(sdk_key),
76        }
77    }
78
79    pub fn set_source(&self, source: SpecsSource) {
80        match self.data.try_write_for(Duration::from_secs(5)) {
81            Some(mut data) => {
82                data.source = source;
83                log_d!(TAG, "Source Changed ({:?})", data.source);
84            }
85            None => {
86                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
87            }
88        }
89    }
90
91    pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
92        let data = match self.data.try_read_for(Duration::from_secs(5)) {
93            Some(data) => data,
94            None => {
95                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
96                return None;
97            }
98        };
99        let json = serde_json::to_string(&data.values).ok()?;
100        serde_json::from_str::<SpecsResponseFull>(&json).ok()
101    }
102
103    pub fn get_fields_used_for_entity(
104        &self,
105        entity_name: &str,
106        entity_type: SpecType,
107    ) -> Vec<String> {
108        let data = read_lock_or_else!(self.data, {
109            log_error_to_statsig_and_console!(
110                &self.ops_stats,
111                TAG,
112                StatsigErr::LockFailure(
113                    "Failed to acquire read lock for spec store data".to_string()
114                )
115            );
116            return vec![];
117        });
118
119        let entities = match entity_type {
120            SpecType::Gate => &data.values.feature_gates,
121            SpecType::DynamicConfig | SpecType::Experiment => &data.values.dynamic_configs,
122            SpecType::Layer => &data.values.layer_configs,
123            SpecType::ParameterStore => return vec![],
124        };
125
126        let entity_name = InternedString::from_str_ref(entity_name);
127        let entity = entities.get(&entity_name);
128
129        match entity {
130            Some(entity) => match &entity.as_spec_ref().fields_used {
131                Some(fields) => fields.iter().map(|f| f.unperformant_to_string()).collect(),
132                None => vec![],
133            },
134            None => vec![],
135        }
136    }
137
138    pub fn unperformant_keys_entity_filter(
139        &self,
140        top_level_key: &str,
141        entity_type: &str,
142    ) -> Vec<String> {
143        let data = read_lock_or_else!(self.data, {
144            log_error_to_statsig_and_console!(
145                &self.ops_stats,
146                TAG,
147                StatsigErr::LockFailure(
148                    "Failed to acquire read lock for spec store data".to_string()
149                )
150            );
151            return vec![];
152        });
153
154        if top_level_key == "param_stores" {
155            match &data.values.param_stores {
156                Some(param_stores) => {
157                    return param_stores
158                        .keys()
159                        .map(|k| k.unperformant_to_string())
160                        .collect()
161                }
162                None => return vec![],
163            }
164        }
165
166        let values = match top_level_key {
167            "feature_gates" => &data.values.feature_gates,
168            "dynamic_configs" => &data.values.dynamic_configs,
169            "layer_configs" => &data.values.layer_configs,
170            _ => {
171                log_e!(TAG, "Invalid top level key: {}", top_level_key);
172                return vec![];
173            }
174        };
175
176        if entity_type == "*" {
177            return values.keys().map(|k| k.unperformant_to_string()).collect();
178        }
179
180        values
181            .iter()
182            .filter(|(_, v)| v.as_spec_ref().entity == entity_type)
183            .map(|(k, _)| k.unperformant_to_string())
184            .collect()
185    }
186
187    pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
188        let mut specs_update = specs_update;
189        let mut locked_data = match self.data.try_write_for(Duration::from_secs(5)) {
190            Some(data) => data,
191            None => {
192                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
193                return Err(StatsigErr::LockFailure(
194                    "Failed to acquire write lock: Failed to lock data".to_string(),
195                ));
196            }
197        };
198        let response_format = self.get_spec_response_format(&specs_update);
199
200        match self.parse_specs_response(&mut specs_update, &mut locked_data, &response_format) {
201            Ok(ParseResult::HasUpdates) => (),
202            Ok(ParseResult::NoUpdates) => {
203                self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
204                return Ok(());
205            }
206            Err(e) => {
207                return Err(e);
208            }
209        };
210
211        if self.are_current_values_newer(&locked_data) {
212            return Ok(());
213        }
214
215        self.try_update_global_configs(&locked_data.next_values);
216
217        let now = Utc::now().timestamp_millis() as u64;
218        let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
219            &mut locked_data,
220            &specs_update,
221            now,
222            specs_update.source_api.clone(),
223        )?;
224
225        if let SpecsFormat::Json = response_format {
226            // protobuf response writes to data store are not current supported
227            self.try_update_data_store(&specs_update.source, specs_update.data, now);
228        }
229
230        self.ops_stats_log_config_propagation_diff(
231            curr_values_time,
232            prev_lcut,
233            &specs_update.source,
234            &prev_source,
235            specs_update.source_api,
236            response_format,
237        );
238
239        // Glibc requested more memory than needed when deserializing a big json blob
240        // And memory allocator fails to return it.
241        // To prevent service from OOMing, manually unused heap memory.
242        maybe_trim_malloc();
243
244        Ok(())
245    }
246}
247
248// -------------------------------------------------------------------------------------------- [Private Functions]
249
250impl SpecStore {
251    fn parse_specs_response(
252        &self,
253        values: &mut SpecsUpdate,
254        spec_store_data: &mut SpecStoreData,
255        response_format: &SpecsFormat,
256    ) -> Result<ParseResult, StatsigErr> {
257        spec_store_data.next_values.reset();
258
259        let parse_result = match response_format {
260            SpecsFormat::Protobuf => deserialize_protobuf(
261                &self.ops_stats,
262                &spec_store_data.values,
263                &mut spec_store_data.next_values,
264                &mut values.data,
265            ),
266            SpecsFormat::Json => values
267                .data
268                .deserialize_in_place(&mut spec_store_data.next_values),
269        };
270
271        if parse_result.is_ok() && spec_store_data.next_values.has_updates {
272            return Ok(ParseResult::HasUpdates);
273        }
274
275        let no_updates_result = values.data.deserialize_into::<SpecsResponseNoUpdates>();
276        if let Ok(result) = no_updates_result {
277            if !result.has_updates {
278                return Ok(ParseResult::NoUpdates);
279            }
280        }
281
282        let error = parse_result.err().unwrap_or_else(|| {
283            StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string())
284        });
285
286        log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
287        Err(error)
288    }
289
290    fn swap_current_with_next(
291        &self,
292        data: &mut SpecStoreData,
293        specs_update: &SpecsUpdate,
294        now: u64,
295        source_api: Option<String>,
296    ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
297        let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
298        let prev_lcut = data.values.time;
299
300        std::mem::swap(&mut data.values, &mut data.next_values);
301
302        data.time_received_at = Some(now);
303        data.source_api = source_api;
304        data.next_values.reset();
305
306        self.emit_specs_updated_sdk_event(&data.source, &data.source_api, &data.values);
307
308        Ok((prev_source, prev_lcut, data.values.time))
309    }
310
311    fn emit_specs_updated_sdk_event(
312        &self,
313        source: &SpecsSource,
314        source_api: &Option<String>,
315        values: &SpecsResponseFull,
316    ) {
317        self.event_emitter.emit(SdkEvent::SpecsUpdated {
318            source,
319            source_api,
320            values,
321        });
322    }
323
324    fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
325        log_d!(TAG, "No Updates");
326        self.ops_stats.log(ObservabilityEvent::new_event(
327            MetricType::Increment,
328            "config_no_update".to_string(),
329            1.0,
330            Some(HashMap::from([
331                ("source".to_string(), source.to_string()),
332                (
333                    "spec_source_api".to_string(),
334                    source_api.unwrap_or_default(),
335                ),
336            ])),
337        ));
338    }
339
340    #[allow(clippy::too_many_arguments)]
341    fn ops_stats_log_config_propagation_diff(
342        &self,
343        lcut: u64,
344        prev_lcut: u64,
345        source: &SpecsSource,
346        prev_source: &SpecsSource,
347        source_api: Option<String>,
348        response_format: SpecsFormat,
349    ) {
350        let delay = (Utc::now().timestamp_millis() as u64).saturating_sub(lcut);
351        log_d!(TAG, "Updated ({:?})", source);
352
353        if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
354            return;
355        }
356
357        self.ops_stats.log(ObservabilityEvent::new_event(
358            MetricType::Dist,
359            "config_propagation_diff".to_string(),
360            delay as f64,
361            Some(HashMap::from([
362                ("source".to_string(), source.to_string()),
363                ("lcut".to_string(), lcut.to_string()),
364                ("prev_lcut".to_string(), prev_lcut.to_string()),
365                (
366                    "spec_source_api".to_string(),
367                    source_api.unwrap_or_default(),
368                ),
369                (
370                    "response_format".to_string(),
371                    Into::<&str>::into(&response_format).to_string(),
372                ),
373            ])),
374        ));
375    }
376
377    fn get_spec_response_format(&self, update: &SpecsUpdate) -> SpecsFormat {
378        let content_type = update.data.get_header_ref("content-type");
379        if content_type.map(|s| s.as_str().contains("application/octet-stream")) != Some(true) {
380            return SpecsFormat::Json;
381        }
382
383        let content_encoding = update.data.get_header_ref("content-encoding");
384        if content_encoding.map(|s| s.as_str().contains("statsig-br")) != Some(true) {
385            return SpecsFormat::Json;
386        }
387
388        SpecsFormat::Protobuf
389    }
390
391    fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
392        if let Some(diagnostics) = &dcs.diagnostics {
393            self.global_configs
394                .set_diagnostics_sampling_rates(diagnostics.clone());
395        }
396
397        if let Some(sdk_configs) = &dcs.sdk_configs {
398            self.global_configs.set_sdk_configs(sdk_configs.clone());
399        }
400
401        if let Some(sdk_flags) = &dcs.sdk_flags {
402            self.global_configs.set_sdk_flags(sdk_flags.clone());
403        }
404    }
405
406    fn try_update_data_store(&self, source: &SpecsSource, mut data: ResponseData, now: u64) {
407        if source != &SpecsSource::Network {
408            return;
409        }
410
411        let data_store = match &self.data_store {
412            Some(data_store) => data_store.clone(),
413            None => return,
414        };
415
416        let data_store_key = self.data_store_key.clone();
417
418        let spawn_result = self.statsig_runtime.spawn(
419            "spec_store_update_data_store",
420            move |_shutdown_notif| async move {
421                let data_string = match data.read_to_string() {
422                    Ok(s) => s,
423                    Err(e) => {
424                        log_e!(TAG, "Failed to convert data to string: {}", e);
425                        return;
426                    }
427                };
428
429                let _ = data_store
430                    .set(&data_store_key, &data_string, Some(now))
431                    .await;
432            },
433        );
434
435        if let Err(e) = spawn_result {
436            log_e!(
437                TAG,
438                "Failed to spawn spec store update data store task: {e}"
439            );
440        }
441    }
442
443    fn are_current_values_newer(&self, data: &SpecStoreData) -> bool {
444        let curr_values = &data.values;
445        let next_values = &data.next_values;
446        let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
447        let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
448
449        let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
450        let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
451
452        if cached_time_is_newer || checksums_match {
453            log_d!(
454                TAG,
455                "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
456                next_values.time,
457                new_checksum,
458                curr_values.time,
459                curr_checksum,
460                );
461            return true;
462        }
463
464        false
465    }
466}
467
468// -------------------------------------------------------------------------------------------- [Impl SpecsUpdateListener]
469
470impl SpecsUpdateListener for SpecStore {
471    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
472        self.set_values(update)
473    }
474
475    fn get_current_specs_info(&self) -> SpecsInfo {
476        match self.data.try_read_for(Duration::from_secs(5)) {
477            Some(data) => SpecsInfo {
478                lcut: Some(data.values.time),
479                checksum: data.values.checksum.clone(),
480                source: data.source.clone(),
481                source_api: data.source_api.clone(),
482            },
483            None => {
484                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
485                SpecsInfo {
486                    lcut: None,
487                    checksum: None,
488                    source: SpecsSource::Error,
489                    source_api: None,
490                }
491            }
492        }
493    }
494}
495
496// -------------------------------------------------------------------------------------------- [Impl IdListsUpdateListener]
497
498impl IdListsUpdateListener for SpecStore {
499    fn get_current_id_list_metadata(
500        &self,
501    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
502        match self.data.try_read_for(Duration::from_secs(5)) {
503            Some(data) => data
504                .id_lists
505                .iter()
506                .map(|(key, list)| (key.clone(), list.metadata.clone()))
507                .collect(),
508            None => {
509                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
510                HashMap::new()
511            }
512        }
513    }
514
515    fn did_receive_id_list_updates(
516        &self,
517        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
518    ) {
519        let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
520            Some(data) => data,
521            None => {
522                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
523                return;
524            }
525        };
526
527        // delete any id_lists that are not in the updates
528        data.id_lists.retain(|name, _| updates.contains_key(name));
529
530        for (list_name, update) in updates {
531            if let Some(entry) = data.id_lists.get_mut(&list_name) {
532                // update existing
533                entry.apply_update(update);
534            } else {
535                // add new
536                let mut list = IdList::new(update.new_metadata.clone());
537                list.apply_update(update);
538                data.id_lists.insert(list_name, list);
539            }
540        }
541    }
542}
543
544enum ParseResult {
545    HasUpdates,
546    NoUpdates,
547}