statsig_rust/
spec_store.rs

1use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
2use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
3use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
4use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
5use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
6use crate::sdk_diagnostics::diagnostics::Diagnostics;
7use crate::spec_types::{SpecsResponse, SpecsResponseFull};
8use crate::{
9    log_d, log_e, log_error_to_statsig_and_console, DynamicValue, SpecsInfo, SpecsSource,
10    SpecsUpdate, SpecsUpdateListener, StatsigErr, StatsigRuntime,
11};
12use chrono::Utc;
13use serde::Serialize;
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17#[derive(Clone, Serialize)]
18pub struct SpecStoreData {
19    pub source: SpecsSource,
20    pub time_received_at: Option<u64>,
21    pub values: SpecsResponseFull,
22
23    pub id_lists: HashMap<String, IdList>,
24}
25
/// Tag attached to every log line emitted from this module.
const TAG: &str = "SpecStore";
27
28pub struct SpecStore {
29    pub hashed_sdk_key: String,
30    pub data: Arc<RwLock<SpecStoreData>>,
31    pub data_store: Option<Arc<dyn DataStoreTrait>>,
32    pub statsig_runtime: Option<Arc<StatsigRuntime>>,
33    diagnostics: Option<Arc<Diagnostics>>,
34    ops_stats: Arc<OpsStatsForInstance>,
35}
36
37impl SpecsUpdateListener for SpecStore {
38    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
39        self.set_values(update)
40    }
41
42    fn get_current_specs_info(&self) -> SpecsInfo {
43        match self.data.read() {
44            Ok(data) => SpecsInfo {
45                lcut: Some(data.values.time),
46                checksum: data.values.checksum.clone(),
47                source: data.source.clone(),
48            },
49            Err(e) => {
50                log_e!(TAG, "Failed to acquire read lock: {}", e);
51                SpecsInfo {
52                    lcut: None,
53                    checksum: None,
54                    source: SpecsSource::Error,
55                }
56            }
57        }
58    }
59}
60
61impl IdListsUpdateListener for SpecStore {
62    fn get_current_id_list_metadata(
63        &self,
64    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
65        match self.data.read() {
66            Ok(data) => data
67                .id_lists
68                .iter()
69                .map(|(key, list)| (key.clone(), list.metadata.clone()))
70                .collect(),
71            Err(e) => {
72                log_e!(TAG, "Failed to acquire read lock: {}", e);
73                HashMap::new()
74            }
75        }
76    }
77
78    fn did_receive_id_list_updates(
79        &self,
80        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
81    ) {
82        let mut data = match self.data.write() {
83            Ok(data) => data,
84            Err(e) => {
85                log_e!(TAG, "Failed to acquire write lock: {}", e);
86                return;
87            }
88        };
89
90        // delete any id_lists that are not in the updates
91        data.id_lists.retain(|name, _| updates.contains_key(name));
92
93        for (list_name, update) in updates {
94            if let Some(entry) = data.id_lists.get_mut(&list_name) {
95                // update existing
96                entry.apply_update(&update);
97            } else {
98                // add new
99                let mut list = IdList::new(update.new_metadata.clone());
100                list.apply_update(&update);
101                data.id_lists.insert(list_name, list);
102            }
103        }
104    }
105}
106
107impl Default for SpecStore {
108    fn default() -> Self {
109        let sdk_key = String::new();
110        Self::new(&sdk_key, sdk_key.to_string(), None, None, None)
111    }
112}
113
114impl SpecStore {
115    #[must_use]
116    pub fn new(
117        sdk_key: &str,
118        hashed_sdk_key: String,
119        data_store: Option<Arc<dyn DataStoreTrait>>,
120        statsig_runtime: Option<Arc<StatsigRuntime>>,
121        diagnostics: Option<Arc<Diagnostics>>,
122    ) -> SpecStore {
123        SpecStore {
124            hashed_sdk_key,
125            data: Arc::new(RwLock::new(SpecStoreData {
126                values: SpecsResponseFull::blank(),
127                time_received_at: None,
128                source: SpecsSource::Uninitialized,
129                id_lists: HashMap::new(),
130            })),
131            data_store,
132            statsig_runtime,
133            ops_stats: OPS_STATS.get_for_instance(sdk_key),
134            diagnostics,
135        }
136    }
137
138    pub fn set_source(&self, source: SpecsSource) {
139        if let Ok(mut mut_values) = self.data.write() {
140            mut_values.source = source;
141            log_d!(TAG, "SpecStore - Source Changed ({:?})", mut_values.source);
142        }
143    }
144
145    pub fn set_values(&self, values: SpecsUpdate) -> Result<(), StatsigErr> {
146        let parsed = serde_json::from_str::<SpecsResponse>(&values.data);
147        let dcs = match parsed {
148            Ok(SpecsResponse::Full(full)) => {
149                if !full.has_updates {
150                    self.log_no_update(values.source);
151                    return Ok(());
152                }
153
154                log_d!(
155                    TAG,
156                    "SpecStore Full Update: {} - [gates({}), configs({}), layers({})]",
157                    full.time,
158                    full.feature_gates.len(),
159                    full.dynamic_configs.len(),
160                    full.layer_configs.len(),
161                );
162
163                full
164            }
165            Ok(SpecsResponse::NoUpdates(no_updates)) => {
166                if !no_updates.has_updates {
167                    self.log_no_update(values.source);
168                    return Ok(());
169                }
170                log_error_to_statsig_and_console!(
171                    self.ops_stats,
172                    TAG,
173                    "Empty response with has_updates = true {:?}",
174                    values.source
175                );
176                return Err(StatsigErr::JsonParseError(
177                    "SpecsResponse".to_owned(),
178                    "Parse failure. 'has_update' is true, but failed to deserialize to response format 'dcs-v2'".to_owned(),
179                ));
180            }
181            Err(e) => {
182                // todo: Handle bad parsing
183                log_error_to_statsig_and_console!(
184                    self.ops_stats,
185                    TAG,
186                    "{:?}, {:?}",
187                    e,
188                    values.source
189                );
190                return Err(StatsigErr::JsonParseError(
191                    "config_spec".to_string(),
192                    e.to_string(),
193                ));
194            }
195        };
196
197        if let Some(ref diagnostics) = dcs.diagnostics {
198            if self.diagnostics.is_some() {
199                if let Some(diagnostics_instance) = &self.diagnostics {
200                    diagnostics_instance.set_sampling_rate(diagnostics.clone());
201                }
202            }
203        }
204
205        if let Ok(mut mut_values) = self.data.write() {
206            let cached_time_is_newer =
207                mut_values.values.time > 0 && mut_values.values.time > dcs.time;
208            let checksums_match =
209                mut_values
210                    .values
211                    .checksum
212                    .as_ref()
213                    .is_some_and(|cached_checksum| {
214                        dcs.checksum
215                            .as_ref()
216                            .is_some_and(|new_checksum| cached_checksum == new_checksum)
217                    });
218
219            if cached_time_is_newer || checksums_match {
220                log_d!(
221                    TAG,
222                    "SpecStore - Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
223                    dcs.time,
224                    dcs.checksum.unwrap_or(String::new()),
225                    mut_values.values.time,
226                    mut_values.values.checksum.clone().unwrap_or(String::new()),
227                );
228                return Ok(());
229            }
230            let curr_time = Some(Utc::now().timestamp_millis() as u64);
231            let prev_source = mut_values.source.clone();
232            mut_values.values = *dcs;
233            mut_values.time_received_at = curr_time;
234            mut_values.source = values.source.clone();
235            if self.data_store.is_some() && mut_values.source == SpecsSource::Network {
236                if let Some(data_store) = self.data_store.clone() {
237                    let hashed_key = self.hashed_sdk_key.clone();
238                    self.statsig_runtime.clone().map(|rt| {
239                        let copy = curr_time;
240                        rt.spawn("update data adapter", move |_| async move {
241                            let _ = data_store
242                                .set(&get_data_adapter_dcs_key(&hashed_key), &values.data, copy)
243                                .await;
244                        })
245                    });
246                }
247            }
248            self.log_processing_config(
249                mut_values.values.time,
250                mut_values.source.clone(),
251                prev_source,
252            );
253        }
254
255        Ok(())
256    }
257
258    #[must_use]
259    pub fn get_sdk_config_value(&self, key: &str) -> Option<DynamicValue> {
260        match self.data.read() {
261            Ok(data) => match &data.values.sdk_configs {
262                Some(sdk_configs) => sdk_configs.get(key).cloned(),
263                None => {
264                    log_d!(TAG, "SDK Configs not found");
265                    None
266                }
267            },
268            Err(e) => {
269                log_e!(TAG, "Failed to acquire read lock: {}", e);
270                None
271            }
272        }
273    }
274
275    fn log_processing_config(&self, lcut: u64, source: SpecsSource, prev_source: SpecsSource) {
276        let delay = Utc::now().timestamp_millis() as u64 - lcut;
277        log_d!(TAG, "SpecStore - Updated ({:?})", source);
278
279        if prev_source != SpecsSource::Uninitialized && prev_source != SpecsSource::Loading {
280            self.ops_stats.log(ObservabilityEvent::new_event(
281                MetricType::Dist,
282                "config_propogation_diff".to_string(),
283                delay as f64,
284                Some(HashMap::from([("source".to_string(), source.to_string())])),
285            ));
286        }
287    }
288
289    fn log_no_update(&self, source: SpecsSource) {
290        log_d!(TAG, "SpecStore - No Updates");
291        self.ops_stats.log(ObservabilityEvent::new_event(
292            MetricType::Increment,
293            "config_no_update".to_string(),
294            1.0,
295            Some(HashMap::from([("source".to_string(), source.to_string())])),
296        ));
297    }
298}
299
300impl SpecsResponseFull {
301    fn blank() -> Self {
302        SpecsResponseFull {
303            feature_gates: Default::default(),
304            dynamic_configs: Default::default(),
305            layer_configs: Default::default(),
306            condition_map: Default::default(),
307            experiment_to_layer: Default::default(),
308            has_updates: true,
309            time: 0,
310            checksum: None,
311            default_environment: None,
312            app_id: None,
313            sdk_keys_to_app_ids: None,
314            hashed_sdk_keys_to_app_ids: None,
315            diagnostics: None,
316            param_stores: None,
317            sdk_configs: None,
318            cmab_configs: None,
319            overrides: None,
320            override_rules: None,
321        }
322    }
323}