Skip to main content

statsig_rust/
spec_store.rs

1use chrono::Utc;
2use parking_lot::RwLock;
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use crate::data_store_interface::{DataStoreCacheKeys, DataStoreTrait};
7use crate::evaluation::evaluator::SpecType;
8use crate::global_configs::GlobalConfigs;
9use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
10use crate::interned_string::InternedString;
11use crate::networking::ResponseData;
12use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
13use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
14use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
15use crate::sdk_event_emitter::{SdkEvent, SdkEventEmitter};
16use crate::specs_response::proto_specs::deserialize_protobuf;
17use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
18use crate::utils::try_release_unused_heap_memory;
19use crate::{
20    log_d, log_e, log_error_to_statsig_and_console, log_w, read_lock_or_else, write_lock_or_else,
21    SpecsFormat, SpecsInfo, SpecsSource, SpecsUpdate, SpecsUpdateListener, StatsigErr,
22    StatsigOptions, StatsigRuntime,
23};
24
/// Mutable state guarded by the `SpecStore` lock.
pub struct SpecStoreData {
    /// Where the currently stored specs came from.
    pub source: SpecsSource,
    /// API associated with the update that produced the current values, when one was reported.
    pub source_api: Option<String>,
    /// Millisecond timestamp recorded when the current values were applied; `None` until then.
    pub time_received_at: Option<u64>,
    /// The most recently applied full specs response.
    pub values: SpecsResponseFull,
    /// ID lists keyed by list name.
    pub id_lists: HashMap<String, IdList>,
}
32
/// Tag passed to the logging macros for every message emitted from this file.
const TAG: &str = stringify!(SpecStore);
34
/// Holds the current specs and ID lists behind a read/write lock and applies incoming updates.
pub struct SpecStore {
    /// Shared, lock-protected store state.
    pub data: Arc<RwLock<SpecStoreData>>,

    /// Keys used when writing specs to the data store (plain-text and statsig-br variants).
    data_store_keys: DataStoreCacheKeys,
    /// Optional external data store that receives network-sourced specs.
    data_store: Option<Arc<dyn DataStoreTrait>>,
    /// Runtime used to spawn the background data store write.
    statsig_runtime: Arc<StatsigRuntime>,
    /// Sink for observability events and reported errors.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Per-SDK-key global configuration updated from incoming specs.
    global_configs: Arc<GlobalConfigs>,
    /// Emitter used to publish `SdkEvent::SpecsUpdated`.
    event_emitter: Arc<SdkEventEmitter>,
}
45
46impl SpecStore {
47    #[must_use]
48    pub fn new(
49        sdk_key: &str,
50        data_store_key: String,
51        statsig_runtime: Arc<StatsigRuntime>,
52        event_emitter: Arc<SdkEventEmitter>,
53        options: Option<&StatsigOptions>,
54    ) -> SpecStore {
55        let mut data_store = None;
56        if let Some(options) = options {
57            data_store = options.data_store.clone();
58        }
59
60        SpecStore {
61            data_store_keys: DataStoreCacheKeys::from_selected_key(&data_store_key),
62            data: Arc::new(RwLock::new(SpecStoreData {
63                values: SpecsResponseFull::default(),
64                time_received_at: None,
65                source: SpecsSource::Uninitialized,
66                source_api: None,
67                id_lists: HashMap::new(),
68            })),
69            event_emitter,
70            data_store,
71            statsig_runtime,
72            ops_stats: OPS_STATS.get_for_instance(sdk_key),
73            global_configs: GlobalConfigs::get_instance(sdk_key),
74        }
75    }
76
77    pub fn set_source(&self, source: SpecsSource) {
78        let mut locked_data = write_lock_or_else!(self.data, {
79            log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
80            return;
81        });
82
83        locked_data.source = source;
84        log_d!(TAG, "Source Changed ({:?})", locked_data.source);
85    }
86
87    pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
88        let data = read_lock_or_else!(self.data, {
89            log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
90            return None;
91        });
92
93        let json = serde_json::to_string(&data.values).ok()?;
94        serde_json::from_str::<SpecsResponseFull>(&json).ok()
95    }
96
97    pub fn get_fields_used_for_entity(
98        &self,
99        entity_name: &str,
100        entity_type: SpecType,
101    ) -> Vec<String> {
102        let data = read_lock_or_else!(self.data, {
103            log_error_to_statsig_and_console!(
104                &self.ops_stats,
105                TAG,
106                StatsigErr::LockFailure(
107                    "Failed to acquire read lock for spec store data".to_string()
108                )
109            );
110            return vec![];
111        });
112
113        let entities = match entity_type {
114            SpecType::Gate => &data.values.feature_gates,
115            SpecType::DynamicConfig | SpecType::Experiment => &data.values.dynamic_configs,
116            SpecType::Layer => &data.values.layer_configs,
117            SpecType::ParameterStore => return vec![],
118        };
119
120        let entity_name = InternedString::from_str_ref(entity_name);
121        let entity = entities.get(&entity_name);
122
123        match entity {
124            Some(entity) => match &entity.as_spec_ref().fields_used {
125                Some(fields) => fields.iter().map(|f| f.unperformant_to_string()).collect(),
126                None => vec![],
127            },
128            None => vec![],
129        }
130    }
131
132    pub fn unperformant_keys_entity_filter(
133        &self,
134        top_level_key: &str,
135        entity_type: &str,
136    ) -> Vec<String> {
137        let data = read_lock_or_else!(self.data, {
138            log_error_to_statsig_and_console!(
139                &self.ops_stats,
140                TAG,
141                StatsigErr::LockFailure(
142                    "Failed to acquire read lock for spec store data".to_string()
143                )
144            );
145            return vec![];
146        });
147
148        if top_level_key == "param_stores" {
149            match &data.values.param_stores {
150                Some(param_stores) => {
151                    return param_stores
152                        .keys()
153                        .map(|k| k.unperformant_to_string())
154                        .collect()
155                }
156                None => return vec![],
157            }
158        }
159
160        let values = match top_level_key {
161            "feature_gates" => &data.values.feature_gates,
162            "dynamic_configs" => &data.values.dynamic_configs,
163            "layer_configs" => &data.values.layer_configs,
164            _ => {
165                log_e!(TAG, "Invalid top level key: {}", top_level_key);
166                return vec![];
167            }
168        };
169
170        if entity_type == "*" {
171            return values.keys().map(|k| k.unperformant_to_string()).collect();
172        }
173
174        values
175            .iter()
176            .filter(|(_, v)| v.as_spec_ref().entity == entity_type)
177            .map(|(k, _)| k.unperformant_to_string())
178            .collect()
179    }
180
181    pub fn set_values(&self, mut specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
182        // Updating the spec store is a three step process that interacts with the SpecStoreData lock:
183        // 1. Prep (Read Lock). Deserialize the new data and compare it to the current values.
184        // 2. Apply (Write Lock). Update the spec store with the new values.
185        // 3. Notify (Read Lock). Emit the SDK event and update the data store.
186
187        // --- Prep ---
188
189        let prep_result = self.specs_update_prep(&mut specs_update).map_err(|e| {
190            log_error_to_statsig_and_console!(self.ops_stats, TAG, e);
191            e
192        })?;
193
194        let (next_values, response_format) = match prep_result {
195            PrepResult::HasUpdates(next_values, response_format) => (next_values, response_format),
196            PrepResult::CurrentValuesNewer => return Ok(()),
197            PrepResult::NoUpdates => {
198                self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
199                return Ok(());
200            }
201        };
202
203        // --- Apply ---
204
205        let apply_result = self
206            .specs_update_apply(next_values, &specs_update)
207            .map_err(|e| {
208                log_error_to_statsig_and_console!(self.ops_stats, TAG, e);
209                e
210            })?;
211
212        try_release_unused_heap_memory();
213
214        // --- Notify ---
215
216        self.specs_update_notify(response_format, specs_update, apply_result)
217            .map_err(|e| {
218                log_error_to_statsig_and_console!(self.ops_stats, TAG, e);
219                e
220            })?;
221
222        Ok(())
223    }
224}
225
226// -------------------------------------------------------------------------------------------- [ Private ]
227
/// Outcome of the prep phase of a specs update.
enum PrepResult {
    /// The payload parsed as a full response with updates; carries the parsed values and the
    /// format they arrived in.
    HasUpdates(Box<SpecsResponseFull>, SpecsFormat),
    /// The payload parsed as a no-updates response.
    NoUpdates,
    /// The stored values have a later time or the same checksum, so the payload is ignored.
    CurrentValuesNewer,
}
233
/// Values captured while swapping in new specs, consumed by the notify phase.
struct ApplyResult {
    /// Source recorded in the store before the swap.
    prev_source: SpecsSource,
    /// `time` of the values that were replaced.
    prev_lcut: u64,
    /// Millisecond timestamp taken when the new values were stored.
    time_received_at: u64,
}
239
240impl SpecStore {
241    fn specs_update_prep(&self, specs_update: &mut SpecsUpdate) -> Result<PrepResult, StatsigErr> {
242        let response_format = self.get_spec_response_format(specs_update);
243
244        let read_data = read_lock_or_else!(self.data, {
245            let msg = "Failed to acquire read lock for extract_response_from_update";
246            log_e!(TAG, "{}", msg);
247            return Err(StatsigErr::LockFailure(msg.to_string()));
248        });
249
250        let current_values = &read_data.values;
251
252        // First, try a Full Specs Response deserialization
253        let first_deserialize_result =
254            self.deserialize_specs_data(current_values, &response_format, &mut specs_update.data);
255
256        let first_deserialize_error = match first_deserialize_result {
257            Ok(next_values) => {
258                if self.are_current_values_newer(&read_data, &next_values) {
259                    return Ok(PrepResult::CurrentValuesNewer);
260                }
261
262                if next_values.has_updates {
263                    return Ok(PrepResult::HasUpdates(
264                        Box::new(next_values),
265                        response_format,
266                    ));
267                }
268
269                None
270            }
271            Err(e) => Some(e),
272        };
273
274        // Second, try a No Updates deserialization
275        let second_deserialize_result = specs_update
276            .data
277            .deserialize_into::<SpecsResponseNoUpdates>();
278
279        let second_deserialize_error = match second_deserialize_result {
280            Ok(result) => {
281                if !result.has_updates {
282                    return Ok(PrepResult::NoUpdates);
283                }
284
285                None
286            }
287            Err(e) => Some(e),
288        };
289
290        let error = first_deserialize_error
291            .or(second_deserialize_error)
292            .unwrap_or_else(|| {
293                StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string())
294            });
295
296        Err(error)
297    }
298
299    fn specs_update_apply(
300        &self,
301        next_values: Box<SpecsResponseFull>,
302        specs_update: &SpecsUpdate,
303    ) -> Result<ApplyResult, StatsigErr> {
304        // DANGER: try_update_global_configs contains its own locks
305        self.try_update_global_configs(&next_values);
306
307        let mut data = write_lock_or_else!(self.data, {
308            let msg = "Failed to acquire write lock for swap_current_with_next";
309            log_e!(TAG, "{}", msg);
310            return Err(StatsigErr::LockFailure(msg.to_string()));
311        });
312
313        let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
314        let prev_lcut = data.values.time;
315        let time_received_at = Utc::now().timestamp_millis() as u64;
316
317        data.values = *next_values;
318        data.time_received_at = Some(time_received_at);
319        data.source_api = specs_update.source_api.clone();
320
321        Ok(ApplyResult {
322            prev_source,
323            prev_lcut,
324            time_received_at,
325        })
326    }
327
328    fn specs_update_notify(
329        &self,
330        response_format: SpecsFormat,
331        specs_update: SpecsUpdate,
332        apply_result: ApplyResult,
333    ) -> Result<(), StatsigErr> {
334        let SpecsUpdate {
335            data,
336            source,
337            source_api,
338            ..
339        } = specs_update;
340
341        let current_lcut = {
342            let read_lock = read_lock_or_else!(self.data, {
343                let msg = "Failed to acquire read lock for set_values";
344                log_e!(TAG, "{}", msg);
345                return Err(StatsigErr::LockFailure(msg.to_string()));
346            });
347
348            self.emit_specs_updated_sdk_event(
349                &read_lock.source,
350                &read_lock.source_api,
351                &read_lock.values,
352            );
353
354            read_lock.values.time
355        };
356
357        // Data store writes now support both legacy JSON payloads and statsig-br protobuf bytes.
358        self.try_update_data_store(
359            &source,
360            data,
361            apply_result.time_received_at,
362            matches!(response_format, SpecsFormat::Protobuf),
363        );
364
365        self.ops_stats_log_config_propagation_diff(
366            current_lcut,
367            apply_result.prev_lcut,
368            &source,
369            &apply_result.prev_source,
370            source_api,
371            response_format,
372        );
373
374        Ok(())
375    }
376
377    fn deserialize_specs_data(
378        &self,
379        current_values: &SpecsResponseFull,
380        response_format: &SpecsFormat,
381        response_data: &mut ResponseData,
382    ) -> Result<SpecsResponseFull, StatsigErr> {
383        let mut next_values = SpecsResponseFull::default();
384
385        let parse_result = match response_format {
386            SpecsFormat::Protobuf => deserialize_protobuf(
387                &self.ops_stats,
388                current_values,
389                &mut next_values,
390                response_data,
391            ),
392            SpecsFormat::Json => response_data.deserialize_in_place(&mut next_values),
393        };
394
395        match parse_result {
396            Ok(()) => Ok(next_values),
397            Err(e) => Err(e),
398        }
399    }
400
401    fn emit_specs_updated_sdk_event(
402        &self,
403        source: &SpecsSource,
404        source_api: &Option<String>,
405        values: &SpecsResponseFull,
406    ) {
407        self.event_emitter.emit(SdkEvent::SpecsUpdated {
408            source,
409            source_api,
410            values,
411        });
412    }
413
414    fn get_spec_response_format(&self, update: &SpecsUpdate) -> SpecsFormat {
415        let content_type = update.data.get_header_ref("content-type");
416        if content_type.map(|s| s.as_str().contains("application/octet-stream")) != Some(true) {
417            return SpecsFormat::Json;
418        }
419
420        let content_encoding = update.data.get_header_ref("content-encoding");
421        if content_encoding.map(|s| s.as_str().contains("statsig-br")) != Some(true) {
422            return SpecsFormat::Json;
423        }
424
425        SpecsFormat::Protobuf
426    }
427
428    fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
429        if let Some(diagnostics) = &dcs.diagnostics {
430            self.global_configs
431                .set_diagnostics_sampling_rates(diagnostics.clone());
432        }
433
434        if let Some(sdk_configs) = &dcs.sdk_configs {
435            self.global_configs.set_sdk_configs(sdk_configs.clone());
436        }
437
438        if let Some(sdk_flags) = &dcs.sdk_flags {
439            self.global_configs.set_sdk_flags(sdk_flags.clone());
440        }
441    }
442
443    fn try_update_data_store(
444        &self,
445        source: &SpecsSource,
446        mut data: ResponseData,
447        now: u64,
448        is_protobuf: bool,
449    ) {
450        if source != &SpecsSource::Network {
451            return;
452        }
453
454        if data.get_header_ref("x-deltas-used").is_some() {
455            log_d!(
456                TAG,
457                "Skipping data store write for delta response identified by x-deltas-used header"
458            );
459            return;
460        }
461
462        let data_store = match &self.data_store {
463            Some(data_store) => data_store.clone(),
464            None => return,
465        };
466
467        let data_store_key = if is_protobuf {
468            self.data_store_keys.statsig_br.clone()
469        } else {
470            self.data_store_keys.plain_text.clone()
471        };
472
473        let spawn_result = self.statsig_runtime.spawn(
474            "spec_store_update_data_store",
475            move |_shutdown_notif| async move {
476                let data_bytes = match data.read_to_bytes() {
477                    Ok(bytes) => bytes,
478                    Err(e) => {
479                        log_e!(TAG, "Failed to read data as bytes: {}", e);
480                        return;
481                    }
482                };
483
484                write_specs_to_data_store(data_store, data_store_key, data_bytes, now, is_protobuf)
485                    .await;
486            },
487        );
488
489        if let Err(e) = spawn_result {
490            log_e!(
491                TAG,
492                "Failed to spawn spec store update data store task: {e}"
493            );
494        }
495    }
496
497    fn are_current_values_newer(
498        &self,
499        data: &SpecStoreData,
500        next_values: &SpecsResponseFull,
501    ) -> bool {
502        let curr_values = &data.values;
503        let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
504        let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
505
506        let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
507        let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
508
509        if cached_time_is_newer || checksums_match {
510            log_d!(
511                TAG,
512                "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
513                next_values.time,
514                new_checksum,
515                curr_values.time,
516                curr_checksum,
517                );
518            return true;
519        }
520
521        false
522    }
523}
524
525async fn write_specs_to_data_store(
526    data_store: Arc<dyn DataStoreTrait>,
527    data_store_key: String,
528    data_bytes: Vec<u8>,
529    now: u64,
530    is_protobuf: bool,
531) {
532    match data_store
533        .set_bytes(&data_store_key, &data_bytes, Some(now))
534        .await
535    {
536        Ok(()) => return,
537        Err(e @ StatsigErr::BytesNotImplemented) if is_protobuf => {
538            log_w!(
539                TAG,
540                "Failed to write protobuf specs to data store as bytes. Protobuf specs cannot fall back to string writes: {}",
541                e
542            );
543            return;
544        }
545        Err(e @ StatsigErr::BytesNotImplemented) => {
546            log_w!(
547                TAG,
548                "Data store bytes write is not implemented. Falling back to string write: {}",
549                e
550            );
551        }
552        Err(e) => {
553            log_w!(TAG, "Failed to write specs to data store as bytes: {}", e);
554            return;
555        }
556    }
557
558    let data_string = match String::from_utf8(data_bytes) {
559        Ok(s) => s,
560        Err(e) => {
561            log_w!(
562                TAG,
563                "Skipping data store string write because payload is not valid UTF-8: {}",
564                e
565            );
566            return;
567        }
568    };
569
570    if let Err(e) = data_store
571        .set(&data_store_key, &data_string, Some(now))
572        .await
573    {
574        log_w!(TAG, "Failed to write specs to data store as string: {}", e);
575    }
576}
577
578// -------------------------------------------------------------------------------------------- [ OpsStats Helpers ]
579
580impl SpecStore {
581    fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
582        log_d!(TAG, "No Updates");
583        self.ops_stats.log(ObservabilityEvent::new_event(
584            MetricType::Increment,
585            "config_no_update".to_string(),
586            1.0,
587            Some(HashMap::from([
588                ("source".to_string(), source.to_string()),
589                ("source_api".to_string(), source_api.unwrap_or_default()),
590            ])),
591        ));
592    }
593
594    #[allow(clippy::too_many_arguments)]
595    fn ops_stats_log_config_propagation_diff(
596        &self,
597        lcut: u64,
598        prev_lcut: u64,
599        source: &SpecsSource,
600        prev_source: &SpecsSource,
601        source_api: Option<String>,
602        response_format: SpecsFormat,
603    ) {
604        let delay = (Utc::now().timestamp_millis() as u64).saturating_sub(lcut);
605        log_d!(TAG, "Updated ({:?})", source);
606
607        if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
608            return;
609        }
610
611        self.ops_stats.log(ObservabilityEvent::new_event(
612            MetricType::Dist,
613            "config_propagation_diff".to_string(),
614            delay as f64,
615            Some(HashMap::from([
616                ("source".to_string(), source.to_string()),
617                ("lcut".to_string(), lcut.to_string()),
618                ("prev_lcut".to_string(), prev_lcut.to_string()),
619                ("source_api".to_string(), source_api.unwrap_or_default()),
620                (
621                    "response_format".to_string(),
622                    Into::<&str>::into(&response_format).to_string(),
623                ),
624            ])),
625        ));
626    }
627}
628
629// -------------------------------------------------------------------------------------------- [Impl SpecsUpdateListener]
630
631impl SpecsUpdateListener for SpecStore {
632    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
633        self.set_values(update)
634    }
635
636    fn get_current_specs_info(&self) -> SpecsInfo {
637        let data = read_lock_or_else!(self.data, {
638            log_e!(
639                TAG,
640                "Failed to acquire read lock for get_current_specs_info"
641            );
642            return SpecsInfo {
643                lcut: None,
644                checksum: None,
645                source: SpecsSource::Error,
646                source_api: None,
647            };
648        });
649
650        SpecsInfo {
651            lcut: Some(data.values.time),
652            checksum: data.values.checksum.clone(),
653            source: data.source.clone(),
654            source_api: data.source_api.clone(),
655        }
656    }
657}
658
659// -------------------------------------------------------------------------------------------- [Impl IdListsUpdateListener]
660
661impl IdListsUpdateListener for SpecStore {
662    fn get_current_id_list_metadata(
663        &self,
664    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
665        let data = read_lock_or_else!(self.data, {
666            let err = StatsigErr::LockFailure(
667                "Failed to acquire read lock for id list metadata".to_string(),
668            );
669            log_error_to_statsig_and_console!(self.ops_stats, TAG, err);
670            return HashMap::new();
671        });
672
673        data.id_lists
674            .iter()
675            .map(|(key, list)| (key.clone(), list.metadata.clone()))
676            .collect()
677    }
678
679    fn did_receive_id_list_updates(
680        &self,
681        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
682    ) {
683        let mut data = write_lock_or_else!(self.data, {
684            let err = StatsigErr::LockFailure(
685                "Failed to acquire write lock for did_receive_id_list_updates".to_string(),
686            );
687            log_error_to_statsig_and_console!(self.ops_stats, TAG, err);
688
689            return;
690        });
691
692        // delete any id_lists that are not in the updates
693        data.id_lists.retain(|name, _| updates.contains_key(name));
694
695        for (list_name, update) in updates {
696            if let Some(entry) = data.id_lists.get_mut(&list_name) {
697                // update existing
698                entry.apply_update(update);
699            } else {
700                // add new
701                let mut list = IdList::new(update.new_metadata.clone());
702                list.apply_update(update);
703                data.id_lists.insert(list_name, list);
704            }
705        }
706    }
707}