statsig_rust/
spec_store.rs

1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
9use crate::specs_response::spec_types_encoded::DecodedSpecsResponse;
10use crate::utils::maybe_trim_malloc;
11use crate::{
12    log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
13    SpecsUpdateListener, StatsigErr, StatsigRuntime,
14};
15use chrono::Utc;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19pub struct SpecStoreData {
20    pub source: SpecsSource,
21    pub time_received_at: Option<u64>,
22    pub values: SpecsResponseFull,
23    pub next_values: Option<SpecsResponseFull>,
24    pub decompression_dict: Option<DictionaryDecoder>,
25    pub id_lists: HashMap<String, IdList>,
26}
27
28const TAG: &str = stringify!(SpecStore);
29
30pub struct SpecStore {
31    pub data: Arc<RwLock<SpecStoreData>>,
32
33    hashed_sdk_key: String,
34    data_store: Option<Arc<dyn DataStoreTrait>>,
35    statsig_runtime: Arc<StatsigRuntime>,
36    ops_stats: Arc<OpsStatsForInstance>,
37    global_configs: Arc<GlobalConfigs>,
38}
39
40impl SpecStore {
41    #[must_use]
42    pub fn new(
43        sdk_key: &str,
44        hashed_sdk_key: String,
45        statsig_runtime: Arc<StatsigRuntime>,
46        data_store: Option<Arc<dyn DataStoreTrait>>,
47    ) -> SpecStore {
48        SpecStore {
49            hashed_sdk_key,
50            data: Arc::new(RwLock::new(SpecStoreData {
51                values: SpecsResponseFull::default(),
52                next_values: Some(SpecsResponseFull::default()),
53                time_received_at: None,
54                source: SpecsSource::Uninitialized,
55                decompression_dict: None,
56                id_lists: HashMap::new(),
57            })),
58            data_store,
59            statsig_runtime,
60            ops_stats: OPS_STATS.get_for_instance(sdk_key),
61            global_configs: GlobalConfigs::get_instance(sdk_key),
62        }
63    }
64
65    pub fn set_source(&self, source: SpecsSource) {
66        if let Ok(mut mut_values) = self.data.write() {
67            mut_values.source = source;
68            log_d!(TAG, "Source Changed ({:?})", mut_values.source);
69        }
70    }
71
72    pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
73        let data = self.data.read().ok()?;
74        let json = serde_json::to_string(&data.values).ok()?;
75        serde_json::from_str::<SpecsResponseFull>(&json).ok()
76    }
77
78    pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
79        let (mut next_values, decompression_dict) = match self.data.write() {
80            Ok(mut data) => (
81                data.next_values.take().unwrap_or_default(),
82                data.decompression_dict.clone(),
83            ),
84            Err(e) => {
85                log_e!(TAG, "Failed to acquire write lock: {}", e);
86                return Err(StatsigErr::LockFailure(e.to_string()));
87            }
88        };
89
90        let decompression_dict =
91            match self.parse_specs_response(&specs_update, &mut next_values, decompression_dict) {
92                Ok(Some(full)) => full,
93                Ok(None) => {
94                    self.ops_stats_log_no_update(specs_update.source);
95                    return Ok(());
96                }
97                Err(e) => {
98                    return Err(e);
99                }
100            };
101
102        if self.are_current_values_newer(&next_values) {
103            return Ok(());
104        }
105
106        self.try_update_global_configs(&next_values);
107
108        let now = Utc::now().timestamp_millis() as u64;
109        let (prev_source, prev_lcut, curr_values_time) =
110            self.swap_current_with_next(next_values, &specs_update, decompression_dict, now)?;
111
112        self.try_update_data_store(&specs_update.source, specs_update.data, now);
113        self.ops_stats_log_config_propagation_diff(
114            curr_values_time,
115            prev_lcut,
116            &specs_update.source,
117            &prev_source,
118        );
119
120        // Glibc requested more memory than needed when deserializing a big json blob
121        // And memory allocator fails to return it.
122        // To prevent service from OOMing, manually unused heap memory.
123        maybe_trim_malloc();
124
125        Ok(())
126    }
127}
128
129// -------------------------------------------------------------------------------------------- [Private Functions]
130
131impl SpecStore {
132    fn parse_specs_response(
133        &self,
134        values: &SpecsUpdate,
135        next_values: &mut SpecsResponseFull,
136        decompression_dict: Option<DictionaryDecoder>,
137    ) -> Result<Option<Option<DictionaryDecoder>>, StatsigErr> {
138        let full_update_decoder_result = DecodedSpecsResponse::from_slice(
139            &values.data,
140            next_values,
141            decompression_dict.as_ref(),
142        );
143
144        if let Ok(result) = full_update_decoder_result {
145            if next_values.has_updates {
146                return Ok(Some(result));
147            }
148
149            return Ok(None);
150        }
151
152        let mut next_no_updates = SpecsResponseNoUpdates { has_updates: false };
153        let no_updates_decoder_result = DecodedSpecsResponse::from_slice(
154            &values.data,
155            &mut next_no_updates,
156            decompression_dict.as_ref(),
157        );
158
159        if no_updates_decoder_result.is_ok() && !next_no_updates.has_updates {
160            return Ok(None);
161        }
162
163        let error = full_update_decoder_result.err().map_or_else(
164            || StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string()),
165            |e| StatsigErr::JsonParseError("SpecsResponse".to_string(), e.to_string()),
166        );
167
168        log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
169        Err(error)
170    }
171
172    fn swap_current_with_next(
173        &self,
174        next_values: SpecsResponseFull,
175        specs_update: &SpecsUpdate,
176        decompression_dict: Option<DictionaryDecoder>,
177        now: u64,
178    ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
179        match self.data.write() {
180            Ok(mut data) => {
181                let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
182                let prev_lcut = data.values.time;
183
184                let mut temp = next_values;
185                std::mem::swap(&mut data.values, &mut temp);
186                data.next_values = Some(temp);
187
188                data.time_received_at = Some(now);
189                data.decompression_dict = decompression_dict;
190
191                Ok((prev_source, prev_lcut, data.values.time))
192            }
193            Err(e) => {
194                log_e!(TAG, "Failed to acquire write lock: {}", e);
195                Err(StatsigErr::LockFailure(e.to_string()))
196            }
197        }
198    }
199
200    fn ops_stats_log_no_update(&self, source: SpecsSource) {
201        log_d!(TAG, "No Updates");
202        self.ops_stats.log(ObservabilityEvent::new_event(
203            MetricType::Increment,
204            "config_no_update".to_string(),
205            1.0,
206            Some(HashMap::from([("source".to_string(), source.to_string())])),
207        ));
208    }
209
210    fn ops_stats_log_config_propagation_diff(
211        &self,
212        lcut: u64,
213        prev_lcut: u64,
214        source: &SpecsSource,
215        prev_source: &SpecsSource,
216    ) {
217        let delay = Utc::now().timestamp_millis() as u64 - lcut;
218        log_d!(TAG, "Updated ({:?})", source);
219
220        if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
221            return;
222        }
223
224        self.ops_stats.log(ObservabilityEvent::new_event(
225            MetricType::Dist,
226            "config_propagation_diff".to_string(),
227            delay as f64,
228            Some(HashMap::from([
229                ("source".to_string(), source.to_string()),
230                ("lcut".to_string(), lcut.to_string()),
231                ("prev_lcut".to_string(), prev_lcut.to_string()),
232            ])),
233        ));
234    }
235
236    fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
237        if let Some(diagnostics) = &dcs.diagnostics {
238            self.global_configs
239                .set_diagnostics_sampling_rates(diagnostics.clone());
240        }
241
242        if let Some(sdk_configs) = &dcs.sdk_configs {
243            self.global_configs.set_sdk_configs(sdk_configs.clone());
244        }
245    }
246
247    fn try_update_data_store(&self, source: &SpecsSource, data: Vec<u8>, now: u64) {
248        if *source != SpecsSource::Network {
249            return;
250        }
251
252        let data_store = match &self.data_store {
253            Some(data_store) => data_store.clone(),
254            None => return,
255        };
256
257        let hashed_key = self.hashed_sdk_key.clone();
258
259        self.statsig_runtime.spawn(
260            "spec_store_update_data_store",
261            move |_shutdown_notif| async move {
262                let data_string = match String::from_utf8(data) {
263                    Ok(s) => s,
264                    Err(e) => {
265                        log_e!(TAG, "Failed to convert data to string: {}", e);
266                        return;
267                    }
268                };
269
270                let _ = data_store
271                    .set(
272                        &get_data_adapter_dcs_key(&hashed_key),
273                        &data_string,
274                        Some(now),
275                    )
276                    .await;
277            },
278        );
279    }
280
281    fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
282        let data = match self.data.read() {
283            Ok(data) => data,
284            Err(e) => {
285                log_e!(TAG, "Failed to acquire read lock: {}", e);
286                return false;
287            }
288        };
289
290        let curr_values = &data.values;
291        let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
292        let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
293
294        let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
295        let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
296
297        if cached_time_is_newer || checksums_match {
298            log_d!(
299                TAG,
300                "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
301                next_values.time,
302                new_checksum,
303                curr_values.time,
304                curr_checksum,
305                );
306            return true;
307        }
308
309        false
310    }
311}
312
313// -------------------------------------------------------------------------------------------- [Impl SpecsUpdateListener]
314
315impl SpecsUpdateListener for SpecStore {
316    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
317        self.set_values(update)
318    }
319
320    fn get_current_specs_info(&self) -> SpecsInfo {
321        match self.data.read() {
322            Ok(data) => SpecsInfo {
323                lcut: Some(data.values.time),
324                checksum: data.values.checksum.clone(),
325                zstd_dict_id: data
326                    .decompression_dict
327                    .as_ref()
328                    .map(|d| d.get_dict_id().to_string()),
329                source: data.source.clone(),
330            },
331            Err(e) => {
332                log_e!(TAG, "Failed to acquire read lock: {}", e);
333                SpecsInfo {
334                    lcut: None,
335                    checksum: None,
336                    zstd_dict_id: None,
337                    source: SpecsSource::Error,
338                }
339            }
340        }
341    }
342}
343
344// -------------------------------------------------------------------------------------------- [Impl IdListsUpdateListener]
345
346impl IdListsUpdateListener for SpecStore {
347    fn get_current_id_list_metadata(
348        &self,
349    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
350        match self.data.read() {
351            Ok(data) => data
352                .id_lists
353                .iter()
354                .map(|(key, list)| (key.clone(), list.metadata.clone()))
355                .collect(),
356            Err(e) => {
357                log_e!(TAG, "Failed to acquire read lock: {}", e);
358                HashMap::new()
359            }
360        }
361    }
362
363    fn did_receive_id_list_updates(
364        &self,
365        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
366    ) {
367        let mut data = match self.data.write() {
368            Ok(data) => data,
369            Err(e) => {
370                log_e!(TAG, "Failed to acquire write lock: {}", e);
371                return;
372            }
373        };
374
375        // delete any id_lists that are not in the updates
376        data.id_lists.retain(|name, _| updates.contains_key(name));
377
378        for (list_name, update) in updates {
379            if let Some(entry) = data.id_lists.get_mut(&list_name) {
380                // update existing
381                entry.apply_update(&update);
382            } else {
383                // add new
384                let mut list = IdList::new(update.new_metadata.clone());
385                list.apply_update(&update);
386                data.id_lists.insert(list_name, list);
387            }
388        }
389    }
390}