statsig_rust/
spec_store.rs

1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::spec_types::{SpecsResponse, SpecsResponseFull, SpecsResponseNoUpdates};
9use crate::{
10    log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
11    SpecsUpdateListener, StatsigErr, StatsigRuntime,
12};
13use chrono::Utc;
14use serde::Serialize;
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock};
17
18#[derive(Clone, Serialize)]
19pub struct SpecStoreData {
20    pub source: SpecsSource,
21    pub time_received_at: Option<u64>,
22    pub values: SpecsResponseFull,
23    pub decompression_dict: Option<DictionaryDecoder>,
24    pub id_lists: HashMap<String, IdList>,
25}
26
27const TAG: &str = stringify!(SpecStore);
28
29pub struct SpecStore {
30    pub data: Arc<RwLock<SpecStoreData>>,
31
32    hashed_sdk_key: String,
33    data_store: Option<Arc<dyn DataStoreTrait>>,
34    statsig_runtime: Arc<StatsigRuntime>,
35    ops_stats: Arc<OpsStatsForInstance>,
36    global_configs: Arc<GlobalConfigs>,
37}
38
39impl SpecStore {
40    #[must_use]
41    pub fn new(
42        sdk_key: &str,
43        hashed_sdk_key: String,
44        statsig_runtime: Arc<StatsigRuntime>,
45        data_store: Option<Arc<dyn DataStoreTrait>>,
46    ) -> SpecStore {
47        SpecStore {
48            hashed_sdk_key,
49            data: Arc::new(RwLock::new(SpecStoreData {
50                values: SpecsResponseFull::blank(),
51                time_received_at: None,
52                source: SpecsSource::Uninitialized,
53                decompression_dict: None,
54                id_lists: HashMap::new(),
55            })),
56            data_store,
57            statsig_runtime,
58            ops_stats: OPS_STATS.get_for_instance(sdk_key),
59            global_configs: GlobalConfigs::get_instance(sdk_key),
60        }
61    }
62
63    pub fn set_source(&self, source: SpecsSource) {
64        if let Ok(mut mut_values) = self.data.write() {
65            mut_values.source = source;
66            log_d!(TAG, "SpecStore - Source Changed ({:?})", mut_values.source);
67        }
68    }
69
70    pub fn get_current_values(&self) -> Option<SpecStoreData> {
71        let cloned = self.data.read().ok()?.clone();
72        Some(cloned)
73    }
74
75    pub fn set_values(&self, values: SpecsUpdate) -> Result<(), StatsigErr> {
76        let dcs = match self.parse_specs_response(&values)? {
77            SpecsResponse::Full(full) => full,
78            SpecsResponse::NoUpdates(_) => {
79                self.ops_stats_log_no_update(values.source);
80                return Ok(());
81            }
82        };
83
84        if self.are_current_values_newer(&dcs) {
85            return Ok(());
86        }
87
88        let mut mut_values = match self.data.write() {
89            Ok(mut_values) => mut_values,
90            Err(e) => {
91                log_e!(TAG, "Failed to acquire write lock: {}", e);
92                return Err(StatsigErr::LockFailure(e.to_string()));
93            }
94        };
95
96        self.try_update_global_configs(&dcs);
97
98        let now = Utc::now().timestamp_millis() as u64;
99        let prev_source = mut_values.source.clone();
100
101        mut_values.values = *dcs;
102        mut_values.time_received_at = Some(now);
103        mut_values.source = values.source.clone();
104
105        self.try_update_data_store(&mut_values.source, values.data, now);
106        self.ops_stats_log_config_propogation_diff(
107            mut_values.values.time,
108            &mut_values.source,
109            &prev_source,
110        );
111
112        Ok(())
113    }
114}
115
116// -------------------------------------------------------------------------------------------- [Private Functions]
117
118impl SpecStore {
119    fn parse_specs_response(&self, values: &SpecsUpdate) -> Result<SpecsResponse, StatsigErr> {
120        let parsed = match serde_json::from_str::<SpecsResponse>(&values.data) {
121            Ok(response) => response,
122            Err(e) => {
123                log_error_to_statsig_and_console!(
124                    self.ops_stats,
125                    TAG,
126                    "{:?}, {:?}",
127                    e,
128                    values.source
129                );
130                return Err(StatsigErr::JsonParseError(
131                    "SpecsResponse".to_string(),
132                    e.to_string(),
133                ));
134            }
135        };
136
137        match parsed {
138            SpecsResponse::Full(full) => {
139                if !full.has_updates {
140                    return Ok(SpecsResponse::NoUpdates(SpecsResponseNoUpdates {
141                        has_updates: false,
142                    }));
143                }
144
145                log_d!(
146                    TAG,
147                    "SpecStore Full Update: {} - [gates({}), configs({}), layers({})]",
148                    full.time,
149                    full.feature_gates.len(),
150                    full.dynamic_configs.len(),
151                    full.layer_configs.len(),
152                );
153
154                Ok(SpecsResponse::Full(full))
155            }
156            SpecsResponse::NoUpdates(no_updates) => {
157                if no_updates.has_updates {
158                    log_error_to_statsig_and_console!(
159                        self.ops_stats,
160                        TAG,
161                        "Empty response with has_updates = true {:?}",
162                        values.source
163                    );
164
165                    return Err(StatsigErr::JsonParseError(
166                        "SpecsResponse".to_owned(),
167                        "Parse failure. 'has_update' is true, but failed to deserialize to response format 'dcs-v2'".to_owned(),
168                    ));
169                }
170
171                Ok(SpecsResponse::NoUpdates(no_updates))
172            }
173        }
174    }
175
176    fn ops_stats_log_no_update(&self, source: SpecsSource) {
177        log_d!(TAG, "SpecStore - No Updates");
178        self.ops_stats.log(ObservabilityEvent::new_event(
179            MetricType::Increment,
180            "config_no_update".to_string(),
181            1.0,
182            Some(HashMap::from([("source".to_string(), source.to_string())])),
183        ));
184    }
185
186    fn ops_stats_log_config_propogation_diff(
187        &self,
188        lcut: u64,
189        source: &SpecsSource,
190        prev_source: &SpecsSource,
191    ) {
192        let delay = Utc::now().timestamp_millis() as u64 - lcut;
193        log_d!(TAG, "SpecStore - Updated ({:?})", source);
194
195        if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
196            return;
197        }
198
199        self.ops_stats.log(ObservabilityEvent::new_event(
200            MetricType::Dist,
201            "config_propogation_diff".to_string(),
202            delay as f64,
203            Some(HashMap::from([("source".to_string(), source.to_string())])),
204        ));
205    }
206
207    fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
208        if let Some(diagnostics) = &dcs.diagnostics {
209            self.global_configs
210                .set_diagnostics_sampling_rates(diagnostics.clone());
211        }
212
213        if let Some(sdk_configs) = &dcs.sdk_configs {
214            self.global_configs.set_sdk_configs(sdk_configs.clone());
215        }
216    }
217
218    fn try_update_data_store(&self, source: &SpecsSource, data: String, now: u64) {
219        if *source != SpecsSource::Network {
220            return;
221        }
222
223        let data_store = match &self.data_store {
224            Some(data_store) => data_store.clone(),
225            None => return,
226        };
227
228        let hashed_key = self.hashed_sdk_key.clone();
229        self.statsig_runtime.spawn(
230            "spec_store_update_data_store",
231            move |_shutdown_notif| async move {
232                let _ = data_store
233                    .set(&get_data_adapter_dcs_key(&hashed_key), &data, Some(now))
234                    .await;
235            },
236        );
237    }
238
239    fn are_current_values_newer(&self, dcs: &SpecsResponseFull) -> bool {
240        let guard = match self.data.read() {
241            Ok(guard) => guard,
242            Err(e) => {
243                log_e!(TAG, "Failed to acquire read lock: {}", e);
244                return false;
245            }
246        };
247
248        let curr_values = &guard.values;
249        let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
250        let new_checksum = dcs.checksum.as_deref().unwrap_or_default();
251
252        let cached_time_is_newer = curr_values.time > 0 && curr_values.time > dcs.time;
253        let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
254
255        if cached_time_is_newer || checksums_match {
256            log_d!(
257                    TAG,
258                    "SpecStore - Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
259                    dcs.time,
260                    new_checksum,
261                    curr_values.time,
262                    curr_checksum,
263                );
264            return true;
265        }
266
267        false
268    }
269}
270
271// -------------------------------------------------------------------------------------------- [Impl SpecsUpdateListener]
272
273impl SpecsUpdateListener for SpecStore {
274    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
275        self.set_values(update)
276    }
277
278    fn get_current_specs_info(&self) -> SpecsInfo {
279        match self.data.read() {
280            Ok(data) => SpecsInfo {
281                lcut: Some(data.values.time),
282                checksum: data.values.checksum.clone(),
283                zstd_dict_id: data
284                    .decompression_dict
285                    .as_ref()
286                    .map(|d| d.get_dict_id().to_string()),
287                source: data.source.clone(),
288            },
289            Err(e) => {
290                log_e!(TAG, "Failed to acquire read lock: {}", e);
291                SpecsInfo {
292                    lcut: None,
293                    checksum: None,
294                    zstd_dict_id: None,
295                    source: SpecsSource::Error,
296                }
297            }
298        }
299    }
300}
301
302// -------------------------------------------------------------------------------------------- [Impl IdListsUpdateListener]
303
304impl IdListsUpdateListener for SpecStore {
305    fn get_current_id_list_metadata(
306        &self,
307    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
308        match self.data.read() {
309            Ok(data) => data
310                .id_lists
311                .iter()
312                .map(|(key, list)| (key.clone(), list.metadata.clone()))
313                .collect(),
314            Err(e) => {
315                log_e!(TAG, "Failed to acquire read lock: {}", e);
316                HashMap::new()
317            }
318        }
319    }
320
321    fn did_receive_id_list_updates(
322        &self,
323        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
324    ) {
325        let mut data = match self.data.write() {
326            Ok(data) => data,
327            Err(e) => {
328                log_e!(TAG, "Failed to acquire write lock: {}", e);
329                return;
330            }
331        };
332
333        // delete any id_lists that are not in the updates
334        data.id_lists.retain(|name, _| updates.contains_key(name));
335
336        for (list_name, update) in updates {
337            if let Some(entry) = data.id_lists.get_mut(&list_name) {
338                // update existing
339                entry.apply_update(&update);
340            } else {
341                // add new
342                let mut list = IdList::new(update.new_metadata.clone());
343                list.apply_update(&update);
344                data.id_lists.insert(list_name, list);
345            }
346        }
347    }
348}
349
350impl SpecsResponseFull {
351    fn blank() -> Self {
352        SpecsResponseFull {
353            feature_gates: Default::default(),
354            dynamic_configs: Default::default(),
355            layer_configs: Default::default(),
356            condition_map: Default::default(),
357            experiment_to_layer: Default::default(),
358            has_updates: true,
359            time: 0,
360            checksum: None,
361            default_environment: None,
362            app_id: None,
363            sdk_keys_to_app_ids: None,
364            hashed_sdk_keys_to_app_ids: None,
365            diagnostics: None,
366            param_stores: None,
367            sdk_configs: None,
368            cmab_configs: None,
369            overrides: None,
370            override_rules: None,
371        }
372    }
373}