statsig_rust/
spec_store.rs

1use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
2use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
3use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
4use crate::observability::ops_stats::OpsStatsForInstance;
5use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
6use crate::spec_types::{SpecsResponse, SpecsResponseFull};
7use crate::{
8    log_d, log_e, log_error_to_statsig_and_console, DynamicValue, SpecsInfo, SpecsSource,
9    SpecsUpdate, SpecsUpdateListener, StatsigErr, StatsigRuntime,
10};
11use chrono::Utc;
12use serde::Serialize;
13use std::collections::HashMap;
14use std::sync::{Arc, RwLock};
15
16#[derive(Clone, Serialize)]
17pub struct SpecStoreData {
18    pub source: SpecsSource,
19    pub time_received_at: Option<u64>,
20    pub values: SpecsResponseFull,
21
22    pub id_lists: HashMap<String, IdList>,
23}
24
25const TAG: &str = stringify!(SpecStore);
26
27pub struct SpecStore {
28    pub hashed_sdk_key: String,
29    pub data: Arc<RwLock<SpecStoreData>>,
30    pub data_store: Option<Arc<dyn DataStoreTrait>>,
31    pub statsig_runtime: Option<Arc<StatsigRuntime>>,
32    ops_stats: Arc<OpsStatsForInstance>,
33}
34
35impl SpecsUpdateListener for SpecStore {
36    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
37        self.set_values(update)
38    }
39
40    fn get_current_specs_info(&self) -> SpecsInfo {
41        match self.data.read() {
42            Ok(data) => SpecsInfo {
43                lcut: Some(data.values.time),
44                checksum: data.values.checksum.clone(),
45                source: data.source.clone(),
46            },
47            Err(e) => {
48                log_e!(TAG, "Failed to acquire read lock: {}", e);
49                SpecsInfo {
50                    lcut: None,
51                    checksum: None,
52                    source: SpecsSource::Error,
53                }
54            }
55        }
56    }
57}
58
59impl IdListsUpdateListener for SpecStore {
60    fn get_current_id_list_metadata(
61        &self,
62    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
63        match self.data.read() {
64            Ok(data) => data
65                .id_lists
66                .iter()
67                .map(|(key, list)| (key.clone(), list.metadata.clone()))
68                .collect(),
69            Err(e) => {
70                log_e!(TAG, "Failed to acquire read lock: {}", e);
71                HashMap::new()
72            }
73        }
74    }
75
76    fn did_receive_id_list_updates(
77        &self,
78        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
79    ) {
80        let mut data = match self.data.write() {
81            Ok(data) => data,
82            Err(e) => {
83                log_e!(TAG, "Failed to acquire write lock: {}", e);
84                return;
85            }
86        };
87
88        // delete any id_lists that are not in the updates
89        data.id_lists.retain(|name, _| updates.contains_key(name));
90
91        for (list_name, update) in updates {
92            if let Some(entry) = data.id_lists.get_mut(&list_name) {
93                // update existing
94                entry.apply_update(&update);
95            } else {
96                // add new
97                let mut list = IdList::new(update.new_metadata.clone());
98                list.apply_update(&update);
99                data.id_lists.insert(list_name, list);
100            }
101        }
102    }
103}
104
105impl Default for SpecStore {
106    fn default() -> Self {
107        Self::new(
108            String::new(),
109            None,
110            None,
111            Arc::new(OpsStatsForInstance::new()),
112        )
113    }
114}
115
116impl SpecStore {
117    #[must_use]
118    pub fn new(
119        hashed_sdk_key: String,
120        data_store: Option<Arc<dyn DataStoreTrait>>,
121        statsig_runtime: Option<Arc<StatsigRuntime>>,
122        ops_stats: Arc<OpsStatsForInstance>,
123    ) -> SpecStore {
124        SpecStore {
125            hashed_sdk_key,
126            data: Arc::new(RwLock::new(SpecStoreData {
127                values: SpecsResponseFull::blank(),
128                time_received_at: None,
129                source: SpecsSource::Uninitialized,
130                id_lists: HashMap::new(),
131            })),
132            data_store,
133            statsig_runtime,
134            ops_stats,
135        }
136    }
137
138    pub fn set_source(&self, source: SpecsSource) {
139        if let Ok(mut mut_values) = self.data.write() {
140            mut_values.source = source;
141            log_d!(TAG, "SpecStore - Source Changed ({:?})", mut_values.source);
142        }
143    }
144
145    pub fn set_values(&self, values: SpecsUpdate) -> Result<(), StatsigErr> {
146        let parsed = serde_json::from_str::<SpecsResponse>(&values.data);
147        let dcs = match parsed {
148            Ok(SpecsResponse::Full(full)) => {
149                if !full.has_updates {
150                    self.log_no_update(values.source);
151                    return Ok(());
152                }
153
154                log_d!(
155                    TAG,
156                    "SpecStore Full Update: {} - [gates({}), configs({}), layers({})]",
157                    full.time,
158                    full.feature_gates.len(),
159                    full.dynamic_configs.len(),
160                    full.layer_configs.len(),
161                );
162
163                full
164            }
165            Ok(SpecsResponse::NoUpdates(no_updates)) => {
166                if !no_updates.has_updates {
167                    self.log_no_update(values.source);
168                    return Ok(());
169                }
170                log_error_to_statsig_and_console!(
171                    self.ops_stats,
172                    TAG,
173                    "Empty response with has_updates = true {:?}",
174                    values.source
175                );
176                return Err(StatsigErr::JsonParseError(
177                    "SpecsResponse".to_owned(),
178                    "Parse failure. 'has_update' is true, but failed to deserialize to response format 'dcs-v2'".to_owned(),
179                ));
180            }
181            Err(e) => {
182                // todo: Handle bad parsing
183                log_error_to_statsig_and_console!(
184                    self.ops_stats,
185                    TAG,
186                    "{:?}, {:?}",
187                    e,
188                    values.source
189                );
190                return Err(StatsigErr::JsonParseError(
191                    "config_spec".to_string(),
192                    e.to_string(),
193                ));
194            }
195        };
196        if let Ok(mut mut_values) = self.data.write() {
197            let cached_time_is_newer =
198                mut_values.values.time > 0 && mut_values.values.time > dcs.time;
199            let checksums_match =
200                mut_values
201                    .values
202                    .checksum
203                    .as_ref()
204                    .is_some_and(|cached_checksum| {
205                        dcs.checksum
206                            .as_ref()
207                            .is_some_and(|new_checksum| cached_checksum == new_checksum)
208                    });
209
210            if cached_time_is_newer || checksums_match {
211                log_d!(
212                    TAG,
213                    "SpecStore - Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
214                    dcs.time,
215                    dcs.checksum.unwrap_or(String::new()),
216                    mut_values.values.time,
217                    mut_values.values.checksum.clone().unwrap_or(String::new()),
218                );
219                return Ok(());
220            }
221            let curr_time = Some(Utc::now().timestamp_millis() as u64);
222            let prev_source = mut_values.source.clone();
223            mut_values.values = *dcs;
224            mut_values.time_received_at = curr_time;
225            mut_values.source = values.source.clone();
226            if self.data_store.is_some() && mut_values.source == SpecsSource::Network {
227                if let Some(data_store) = self.data_store.clone() {
228                    let hashed_key = self.hashed_sdk_key.clone();
229                    self.statsig_runtime.clone().map(|rt| {
230                        let copy = curr_time;
231                        rt.spawn("update data adapter", move |_| async move {
232                            let _ = data_store
233                                .set(&get_data_adapter_dcs_key(&hashed_key), &values.data, copy)
234                                .await;
235                        })
236                    });
237                }
238            }
239            self.log_processing_config(
240                mut_values.values.time,
241                mut_values.source.clone(),
242                prev_source,
243            );
244        }
245
246        Ok(())
247    }
248
249    #[must_use]
250    pub fn get_sdk_config_value(&self, key: &str) -> Option<DynamicValue> {
251        match self.data.read() {
252            Ok(data) => match &data.values.sdk_configs {
253                Some(sdk_configs) => sdk_configs.get(key).cloned(),
254                None => {
255                    log_d!(TAG, "SDK Configs not found");
256                    None
257                }
258            },
259            Err(e) => {
260                log_e!(TAG, "Failed to acquire read lock: {}", e);
261                None
262            }
263        }
264    }
265
266    fn log_processing_config(&self, lcut: u64, source: SpecsSource, prev_source: SpecsSource) {
267        let delay = Utc::now().timestamp_millis() as u64 - lcut;
268        log_d!(TAG, "SpecStore - Updated ({:?})", source);
269
270        if prev_source != SpecsSource::Uninitialized && prev_source != SpecsSource::Loading {
271            self.ops_stats.log(ObservabilityEvent::new_event(
272                MetricType::Dist,
273                "config_propogation_diff".to_string(),
274                delay as f64,
275                Some(HashMap::from([("source".to_string(), source.to_string())])),
276            ));
277        }
278    }
279
280    fn log_no_update(&self, source: SpecsSource) {
281        log_d!(TAG, "SpecStore - No Updates");
282        self.ops_stats.log(ObservabilityEvent::new_event(
283            MetricType::Increment,
284            "config_no_update".to_string(),
285            1.0,
286            Some(HashMap::from([("source".to_string(), source.to_string())])),
287        ));
288    }
289}
290
291impl SpecsResponseFull {
292    fn blank() -> Self {
293        SpecsResponseFull {
294            feature_gates: Default::default(),
295            dynamic_configs: Default::default(),
296            layer_configs: Default::default(),
297            condition_map: Default::default(),
298            experiment_to_layer: Default::default(),
299            has_updates: true,
300            time: 0,
301            checksum: None,
302            default_environment: None,
303            app_id: None,
304            sdk_keys_to_app_ids: None,
305            hashed_sdk_keys_to_app_ids: None,
306            diagnostics: None,
307            param_stores: None,
308            sdk_configs: None,
309            cmab_configs: None,
310            overrides: None,
311            override_rules: None,
312        }
313    }
314}