// statsig_rust/spec_store.rs

1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
9use crate::specs_response::spec_types_encoded::DecodedSpecsResponse;
10use crate::utils::maybe_trim_malloc;
11use crate::{
12    log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
13    SpecsUpdateListener, StatsigErr, StatsigRuntime,
14};
15use chrono::Utc;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
/// Mutable state held behind the lock in `SpecStore::data`.
pub struct SpecStoreData {
    /// Origin of the specs currently in `values`; starts as `SpecsSource::Uninitialized`.
    pub source: SpecsSource,
    /// `source_api` copied from the most recently applied `SpecsUpdate`, if it had one.
    pub source_api: Option<String>,
    /// Local wall-clock time (Unix milliseconds) at which `values` was last replaced.
    pub time_received_at: Option<u64>,
    /// The specs response currently in effect.
    pub values: SpecsResponseFull,
    /// Spare buffer the next update is decoded into. After a successful swap it holds
    /// the previous `values`; it is `None` while an update has taken it.
    pub next_values: Option<SpecsResponseFull>,
    /// Dictionary handed to the decoder for the next response; replaced with whatever
    /// the decoder returned for the last applied update.
    pub decompression_dict: Option<DictionaryDecoder>,
    /// ID lists keyed by list name.
    pub id_lists: HashMap<String, IdList>,
}
28
/// Tag passed to the logging macros in this file; expands to `"SpecStore"`.
const TAG: &str = stringify!(SpecStore);
30
/// Holds the current specs and ID lists and applies incoming updates to them.
pub struct SpecStore {
    /// Shared, lock-protected store state.
    pub data: Arc<RwLock<SpecStoreData>>,

    /// Hashed SDK key; used to derive the key under which specs are written to `data_store`.
    hashed_sdk_key: String,
    /// Optional external store that receives the raw payload of updates whose source is `Network`.
    data_store: Option<Arc<dyn DataStoreTrait>>,
    /// Runtime used to spawn the asynchronous data-store write.
    statsig_runtime: Arc<StatsigRuntime>,
    /// Sink for observability events and reported errors.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Receives diagnostics sampling rates and SDK configs found in applied responses.
    global_configs: Arc<GlobalConfigs>,
}
40
41impl SpecStore {
42    #[must_use]
43    pub fn new(
44        sdk_key: &str,
45        hashed_sdk_key: String,
46        statsig_runtime: Arc<StatsigRuntime>,
47        data_store: Option<Arc<dyn DataStoreTrait>>,
48    ) -> SpecStore {
49        SpecStore {
50            hashed_sdk_key,
51            data: Arc::new(RwLock::new(SpecStoreData {
52                values: SpecsResponseFull::default(),
53                next_values: Some(SpecsResponseFull::default()),
54                time_received_at: None,
55                source: SpecsSource::Uninitialized,
56                source_api: None,
57                decompression_dict: None,
58                id_lists: HashMap::new(),
59            })),
60            data_store,
61            statsig_runtime,
62            ops_stats: OPS_STATS.get_for_instance(sdk_key),
63            global_configs: GlobalConfigs::get_instance(sdk_key),
64        }
65    }
66
67    pub fn set_source(&self, source: SpecsSource) {
68        if let Ok(mut mut_values) = self.data.write() {
69            mut_values.source = source;
70            log_d!(TAG, "Source Changed ({:?})", mut_values.source);
71        }
72    }
73
74    pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
75        let data = self.data.read().ok()?;
76        let json = serde_json::to_string(&data.values).ok()?;
77        serde_json::from_str::<SpecsResponseFull>(&json).ok()
78    }
79
80    pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
81        let (mut next_values, decompression_dict) = match self.data.write() {
82            Ok(mut data) => (
83                data.next_values.take().unwrap_or_default(),
84                data.decompression_dict.clone(),
85            ),
86            Err(e) => {
87                log_e!(TAG, "Failed to acquire write lock: {}", e);
88                return Err(StatsigErr::LockFailure(e.to_string()));
89            }
90        };
91
92        let decompression_dict =
93            match self.parse_specs_response(&specs_update, &mut next_values, decompression_dict) {
94                Ok(Some(full)) => full,
95                Ok(None) => {
96                    self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
97                    return Ok(());
98                }
99                Err(e) => {
100                    return Err(e);
101                }
102            };
103
104        if self.are_current_values_newer(&next_values) {
105            return Ok(());
106        }
107
108        self.try_update_global_configs(&next_values);
109
110        let now = Utc::now().timestamp_millis() as u64;
111        let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
112            next_values,
113            &specs_update,
114            decompression_dict,
115            now,
116            specs_update.source_api.clone(),
117        )?;
118
119        self.try_update_data_store(&specs_update.source, specs_update.data, now);
120        self.ops_stats_log_config_propagation_diff(
121            curr_values_time,
122            prev_lcut,
123            &specs_update.source,
124            &prev_source,
125            specs_update.source_api,
126        );
127
128        // Glibc requested more memory than needed when deserializing a big json blob
129        // And memory allocator fails to return it.
130        // To prevent service from OOMing, manually unused heap memory.
131        maybe_trim_malloc();
132
133        Ok(())
134    }
135}
136
137// -------------------------------------------------------------------------------------------- [Private Functions]
138
139impl SpecStore {
140    fn parse_specs_response(
141        &self,
142        values: &SpecsUpdate,
143        next_values: &mut SpecsResponseFull,
144        decompression_dict: Option<DictionaryDecoder>,
145    ) -> Result<Option<Option<DictionaryDecoder>>, StatsigErr> {
146        let full_update_decoder_result = DecodedSpecsResponse::from_slice(
147            &values.data,
148            next_values,
149            decompression_dict.as_ref(),
150        );
151
152        if let Ok(result) = full_update_decoder_result {
153            if next_values.has_updates {
154                return Ok(Some(result));
155            }
156
157            return Ok(None);
158        }
159
160        let mut next_no_updates = SpecsResponseNoUpdates { has_updates: false };
161        let no_updates_decoder_result = DecodedSpecsResponse::from_slice(
162            &values.data,
163            &mut next_no_updates,
164            decompression_dict.as_ref(),
165        );
166
167        if no_updates_decoder_result.is_ok() && !next_no_updates.has_updates {
168            return Ok(None);
169        }
170
171        let error = full_update_decoder_result.err().map_or_else(
172            || StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string()),
173            |e| StatsigErr::JsonParseError("SpecsResponse".to_string(), e.to_string()),
174        );
175
176        log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
177        Err(error)
178    }
179
180    fn swap_current_with_next(
181        &self,
182        next_values: SpecsResponseFull,
183        specs_update: &SpecsUpdate,
184        decompression_dict: Option<DictionaryDecoder>,
185        now: u64,
186        source_api: Option<String>,
187    ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
188        match self.data.write() {
189            Ok(mut data) => {
190                let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
191                let prev_lcut = data.values.time;
192
193                let mut temp = next_values;
194                std::mem::swap(&mut data.values, &mut temp);
195                data.next_values = Some(temp);
196
197                data.time_received_at = Some(now);
198                data.decompression_dict = decompression_dict;
199                data.source_api = source_api;
200                Ok((prev_source, prev_lcut, data.values.time))
201            }
202            Err(e) => {
203                log_e!(TAG, "Failed to acquire write lock: {}", e);
204                Err(StatsigErr::LockFailure(e.to_string()))
205            }
206        }
207    }
208
209    fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
210        log_d!(TAG, "No Updates");
211        self.ops_stats.log(ObservabilityEvent::new_event(
212            MetricType::Increment,
213            "config_no_update".to_string(),
214            1.0,
215            Some(HashMap::from([
216                ("source".to_string(), source.to_string()),
217                (
218                    "spec_source_api".to_string(),
219                    source_api.unwrap_or_default(),
220                ),
221            ])),
222        ));
223    }
224
225    fn ops_stats_log_config_propagation_diff(
226        &self,
227        lcut: u64,
228        prev_lcut: u64,
229        source: &SpecsSource,
230        prev_source: &SpecsSource,
231        source_api: Option<String>,
232    ) {
233        let delay = Utc::now().timestamp_millis() as u64 - lcut;
234        log_d!(TAG, "Updated ({:?})", source);
235
236        if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
237            return;
238        }
239
240        self.ops_stats.log(ObservabilityEvent::new_event(
241            MetricType::Dist,
242            "config_propagation_diff".to_string(),
243            delay as f64,
244            Some(HashMap::from([
245                ("source".to_string(), source.to_string()),
246                ("lcut".to_string(), lcut.to_string()),
247                ("prev_lcut".to_string(), prev_lcut.to_string()),
248                (
249                    "spec_source_api".to_string(),
250                    source_api.unwrap_or_default(),
251                ),
252            ])),
253        ));
254    }
255
256    fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
257        if let Some(diagnostics) = &dcs.diagnostics {
258            self.global_configs
259                .set_diagnostics_sampling_rates(diagnostics.clone());
260        }
261
262        if let Some(sdk_configs) = &dcs.sdk_configs {
263            self.global_configs.set_sdk_configs(sdk_configs.clone());
264        }
265    }
266
267    fn try_update_data_store(&self, source: &SpecsSource, data: Vec<u8>, now: u64) {
268        if source != &SpecsSource::Network {
269            return;
270        }
271
272        let data_store = match &self.data_store {
273            Some(data_store) => data_store.clone(),
274            None => return,
275        };
276
277        let hashed_key = self.hashed_sdk_key.clone();
278
279        self.statsig_runtime.spawn(
280            "spec_store_update_data_store",
281            move |_shutdown_notif| async move {
282                let data_string = match String::from_utf8(data) {
283                    Ok(s) => s,
284                    Err(e) => {
285                        log_e!(TAG, "Failed to convert data to string: {}", e);
286                        return;
287                    }
288                };
289
290                let _ = data_store
291                    .set(
292                        &get_data_adapter_dcs_key(&hashed_key),
293                        &data_string,
294                        Some(now),
295                    )
296                    .await;
297            },
298        );
299    }
300
301    fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
302        let data = match self.data.read() {
303            Ok(data) => data,
304            Err(e) => {
305                log_e!(TAG, "Failed to acquire read lock: {}", e);
306                return false;
307            }
308        };
309
310        let curr_values = &data.values;
311        let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
312        let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
313
314        let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
315        let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
316
317        if cached_time_is_newer || checksums_match {
318            log_d!(
319                TAG,
320                "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
321                next_values.time,
322                new_checksum,
323                curr_values.time,
324                curr_checksum,
325                );
326            return true;
327        }
328
329        false
330    }
331}
332
333// -------------------------------------------------------------------------------------------- [Impl SpecsUpdateListener]
334
335impl SpecsUpdateListener for SpecStore {
336    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
337        self.set_values(update)
338    }
339
340    fn get_current_specs_info(&self) -> SpecsInfo {
341        match self.data.read() {
342            Ok(data) => SpecsInfo {
343                lcut: Some(data.values.time),
344                checksum: data.values.checksum.clone(),
345                zstd_dict_id: data
346                    .decompression_dict
347                    .as_ref()
348                    .map(|d| d.get_dict_id().to_string()),
349                source: data.source.clone(),
350                source_api: data.source_api.clone(),
351            },
352            Err(e) => {
353                log_e!(TAG, "Failed to acquire read lock: {}", e);
354                SpecsInfo {
355                    lcut: None,
356                    checksum: None,
357                    zstd_dict_id: None,
358                    source: SpecsSource::Error,
359                    source_api: None,
360                }
361            }
362        }
363    }
364}
365
366// -------------------------------------------------------------------------------------------- [Impl IdListsUpdateListener]
367
368impl IdListsUpdateListener for SpecStore {
369    fn get_current_id_list_metadata(
370        &self,
371    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
372        match self.data.read() {
373            Ok(data) => data
374                .id_lists
375                .iter()
376                .map(|(key, list)| (key.clone(), list.metadata.clone()))
377                .collect(),
378            Err(e) => {
379                log_e!(TAG, "Failed to acquire read lock: {}", e);
380                HashMap::new()
381            }
382        }
383    }
384
385    fn did_receive_id_list_updates(
386        &self,
387        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
388    ) {
389        let mut data = match self.data.write() {
390            Ok(data) => data,
391            Err(e) => {
392                log_e!(TAG, "Failed to acquire write lock: {}", e);
393                return;
394            }
395        };
396
397        // delete any id_lists that are not in the updates
398        data.id_lists.retain(|name, _| updates.contains_key(name));
399
400        for (list_name, update) in updates {
401            if let Some(entry) = data.id_lists.get_mut(&list_name) {
402                // update existing
403                entry.apply_update(&update);
404            } else {
405                // add new
406                let mut list = IdList::new(update.new_metadata.clone());
407                list.apply_update(&update);
408                data.id_lists.insert(list_name, list);
409            }
410        }
411    }
412}