statsig_rust/
spec_store.rs

1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::spec_types::{SpecsResponse, SpecsResponseFull};
9use crate::{
10    log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
11    SpecsUpdateListener, StatsigErr, StatsigRuntime,
12};
13use chrono::Utc;
14use serde::Serialize;
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock};
17
/// Snapshot of everything the [`SpecStore`] currently holds, guarded as a unit
/// by the store's `RwLock`.
#[derive(Clone, Serialize)]
pub struct SpecStoreData {
    /// Where the currently stored `values` came from. Starts as
    /// `SpecsSource::Uninitialized` and is overwritten on every accepted update.
    pub source: SpecsSource,
    /// Local wall-clock time (milliseconds, from `Utc::now().timestamp_millis()`)
    /// at which `values` was last replaced; `None` until the first accepted update.
    pub time_received_at: Option<u64>,
    /// The most recently accepted full specs response.
    pub values: SpecsResponseFull,
    /// Optional dictionary decoder; its dictionary id is reported in `SpecsInfo`.
    /// NOTE(review): nothing in this file assigns it after construction — confirm
    /// where it is populated.
    pub decompression_dict: Option<DictionaryDecoder>,
    /// ID lists keyed by list name, maintained through `IdListsUpdateListener`.
    pub id_lists: HashMap<String, IdList>,
}
26
/// Tag attached to every log line emitted from this module.
const TAG: &str = "SpecStore";
28
/// Thread-safe holder of the latest specs and ID lists for one SDK key.
///
/// Receives updates through the `SpecsUpdateListener` and
/// `IdListsUpdateListener` implementations below.
pub struct SpecStore {
    /// Hashed SDK key; used to derive the key under which specs are written to
    /// the data store.
    pub hashed_sdk_key: String,
    /// The stored specs, ID lists and their metadata behind a reader/writer lock.
    pub data: Arc<RwLock<SpecStoreData>>,
    /// Optional external data store; payloads received from
    /// `SpecsSource::Network` are written to it.
    pub data_store: Option<Arc<dyn DataStoreTrait>>,
    /// Runtime used to spawn the asynchronous data-store write. When `None`,
    /// nothing is written to the data store.
    pub statsig_runtime: Option<Arc<StatsigRuntime>>,
    // Sink for observability events and error reports raised by this store.
    ops_stats: Arc<OpsStatsForInstance>,
    // Receives the diagnostics sampling rates and SDK configs carried by updates.
    global_configs: Arc<GlobalConfigs>,
}
37
38impl SpecsUpdateListener for SpecStore {
39    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
40        self.set_values(update)
41    }
42
43    fn get_current_specs_info(&self) -> SpecsInfo {
44        match self.data.read() {
45            Ok(data) => SpecsInfo {
46                lcut: Some(data.values.time),
47                checksum: data.values.checksum.clone(),
48                zstd_dict_id: data
49                    .decompression_dict
50                    .as_ref()
51                    .map(|d| d.get_dict_id().to_string()),
52                source: data.source.clone(),
53            },
54            Err(e) => {
55                log_e!(TAG, "Failed to acquire read lock: {}", e);
56                SpecsInfo {
57                    lcut: None,
58                    checksum: None,
59                    zstd_dict_id: None,
60                    source: SpecsSource::Error,
61                }
62            }
63        }
64    }
65}
66
67impl IdListsUpdateListener for SpecStore {
68    fn get_current_id_list_metadata(
69        &self,
70    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
71        match self.data.read() {
72            Ok(data) => data
73                .id_lists
74                .iter()
75                .map(|(key, list)| (key.clone(), list.metadata.clone()))
76                .collect(),
77            Err(e) => {
78                log_e!(TAG, "Failed to acquire read lock: {}", e);
79                HashMap::new()
80            }
81        }
82    }
83
84    fn did_receive_id_list_updates(
85        &self,
86        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
87    ) {
88        let mut data = match self.data.write() {
89            Ok(data) => data,
90            Err(e) => {
91                log_e!(TAG, "Failed to acquire write lock: {}", e);
92                return;
93            }
94        };
95
96        // delete any id_lists that are not in the updates
97        data.id_lists.retain(|name, _| updates.contains_key(name));
98
99        for (list_name, update) in updates {
100            if let Some(entry) = data.id_lists.get_mut(&list_name) {
101                // update existing
102                entry.apply_update(&update);
103            } else {
104                // add new
105                let mut list = IdList::new(update.new_metadata.clone());
106                list.apply_update(&update);
107                data.id_lists.insert(list_name, list);
108            }
109        }
110    }
111}
112
113impl Default for SpecStore {
114    fn default() -> Self {
115        let sdk_key = String::new();
116        Self::new(&sdk_key, sdk_key.to_string(), None, None)
117    }
118}
119
120impl SpecStore {
121    #[must_use]
122    pub fn new(
123        sdk_key: &str,
124        hashed_sdk_key: String,
125        data_store: Option<Arc<dyn DataStoreTrait>>,
126        statsig_runtime: Option<Arc<StatsigRuntime>>,
127    ) -> SpecStore {
128        SpecStore {
129            hashed_sdk_key,
130            data: Arc::new(RwLock::new(SpecStoreData {
131                values: SpecsResponseFull::blank(),
132                time_received_at: None,
133                source: SpecsSource::Uninitialized,
134                decompression_dict: None,
135                id_lists: HashMap::new(),
136            })),
137            data_store,
138            statsig_runtime,
139            ops_stats: OPS_STATS.get_for_instance(sdk_key),
140            global_configs: GlobalConfigs::get_instance(sdk_key),
141        }
142    }
143
144    pub fn set_source(&self, source: SpecsSource) {
145        if let Ok(mut mut_values) = self.data.write() {
146            mut_values.source = source;
147            log_d!(TAG, "SpecStore - Source Changed ({:?})", mut_values.source);
148        }
149    }
150
151    pub fn set_values(&self, values: SpecsUpdate) -> Result<(), StatsigErr> {
152        let parsed = serde_json::from_str::<SpecsResponse>(&values.data);
153        let dcs = match parsed {
154            Ok(SpecsResponse::Full(full)) => {
155                if !full.has_updates {
156                    self.log_no_update(values.source);
157                    return Ok(());
158                }
159
160                log_d!(
161                    TAG,
162                    "SpecStore Full Update: {} - [gates({}), configs({}), layers({})]",
163                    full.time,
164                    full.feature_gates.len(),
165                    full.dynamic_configs.len(),
166                    full.layer_configs.len(),
167                );
168
169                full
170            }
171            Ok(SpecsResponse::NoUpdates(no_updates)) => {
172                if !no_updates.has_updates {
173                    self.log_no_update(values.source);
174                    return Ok(());
175                }
176                log_error_to_statsig_and_console!(
177                    self.ops_stats,
178                    TAG,
179                    "Empty response with has_updates = true {:?}",
180                    values.source
181                );
182                return Err(StatsigErr::JsonParseError(
183                    "SpecsResponse".to_owned(),
184                    "Parse failure. 'has_update' is true, but failed to deserialize to response format 'dcs-v2'".to_owned(),
185                ));
186            }
187            Err(e) => {
188                // todo: Handle bad parsing
189                log_error_to_statsig_and_console!(
190                    self.ops_stats,
191                    TAG,
192                    "{:?}, {:?}",
193                    e,
194                    values.source
195                );
196                return Err(StatsigErr::JsonParseError(
197                    "config_spec".to_string(),
198                    e.to_string(),
199                ));
200            }
201        };
202
203        if let Some(diagnostics) = &dcs.diagnostics {
204            self.global_configs
205                .set_diagnostics_sampling_rates(diagnostics.clone());
206        }
207        if let Some(sdk_configs) = &dcs.sdk_configs {
208            self.global_configs.set_sdk_configs(sdk_configs.clone());
209        }
210
211        if let Ok(mut mut_values) = self.data.write() {
212            let cached_time_is_newer =
213                mut_values.values.time > 0 && mut_values.values.time > dcs.time;
214            let checksums_match =
215                mut_values
216                    .values
217                    .checksum
218                    .as_ref()
219                    .is_some_and(|cached_checksum| {
220                        dcs.checksum
221                            .as_ref()
222                            .is_some_and(|new_checksum| cached_checksum == new_checksum)
223                    });
224
225            if cached_time_is_newer || checksums_match {
226                log_d!(
227                    TAG,
228                    "SpecStore - Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
229                    dcs.time,
230                    dcs.checksum.unwrap_or(String::new()),
231                    mut_values.values.time,
232                    mut_values.values.checksum.clone().unwrap_or(String::new()),
233                );
234                return Ok(());
235            }
236            let curr_time = Some(Utc::now().timestamp_millis() as u64);
237            let prev_source = mut_values.source.clone();
238            mut_values.values = *dcs;
239            mut_values.time_received_at = curr_time;
240            mut_values.source = values.source.clone();
241            if self.data_store.is_some() && mut_values.source == SpecsSource::Network {
242                if let Some(data_store) = self.data_store.clone() {
243                    let hashed_key = self.hashed_sdk_key.clone();
244                    self.statsig_runtime.clone().map(|rt| {
245                        let copy = curr_time;
246                        rt.spawn("update data adapter", move |_| async move {
247                            let _ = data_store
248                                .set(&get_data_adapter_dcs_key(&hashed_key), &values.data, copy)
249                                .await;
250                        })
251                    });
252                }
253            }
254            self.log_processing_config(
255                mut_values.values.time,
256                mut_values.source.clone(),
257                prev_source,
258            );
259        }
260
261        Ok(())
262    }
263
264    fn log_processing_config(&self, lcut: u64, source: SpecsSource, prev_source: SpecsSource) {
265        let delay = Utc::now().timestamp_millis() as u64 - lcut;
266        log_d!(TAG, "SpecStore - Updated ({:?})", source);
267
268        if prev_source != SpecsSource::Uninitialized && prev_source != SpecsSource::Loading {
269            self.ops_stats.log(ObservabilityEvent::new_event(
270                MetricType::Dist,
271                "config_propogation_diff".to_string(),
272                delay as f64,
273                Some(HashMap::from([("source".to_string(), source.to_string())])),
274            ));
275        }
276    }
277
278    fn log_no_update(&self, source: SpecsSource) {
279        log_d!(TAG, "SpecStore - No Updates");
280        self.ops_stats.log(ObservabilityEvent::new_event(
281            MetricType::Increment,
282            "config_no_update".to_string(),
283            1.0,
284            Some(HashMap::from([("source".to_string(), source.to_string())])),
285        ));
286    }
287}
288
289impl SpecsResponseFull {
290    fn blank() -> Self {
291        SpecsResponseFull {
292            feature_gates: Default::default(),
293            dynamic_configs: Default::default(),
294            layer_configs: Default::default(),
295            condition_map: Default::default(),
296            experiment_to_layer: Default::default(),
297            has_updates: true,
298            time: 0,
299            checksum: None,
300            default_environment: None,
301            app_id: None,
302            sdk_keys_to_app_ids: None,
303            hashed_sdk_keys_to_app_ids: None,
304            diagnostics: None,
305            param_stores: None,
306            sdk_configs: None,
307            cmab_configs: None,
308            overrides: None,
309            override_rules: None,
310        }
311    }
312}