statsig_rust/
spec_store.rs

1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
9use crate::specs_response::spec_types_encoded::DecodedSpecsResponse;
10use crate::utils::maybe_trim_malloc;
11use crate::{
12    log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
13    SpecsUpdateListener, StatsigErr, StatsigRuntime,
14};
15use chrono::Utc;
16use parking_lot::RwLock;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20
21pub struct SpecStoreData {
22    pub source: SpecsSource,
23    pub source_api: Option<String>,
24    pub time_received_at: Option<u64>,
25    pub values: SpecsResponseFull,
26    pub next_values: Option<SpecsResponseFull>,
27    pub decompression_dict: Option<DictionaryDecoder>,
28    pub id_lists: HashMap<String, IdList>,
29}
30
31const TAG: &str = stringify!(SpecStore);
32
33pub struct SpecStore {
34    pub data: Arc<RwLock<SpecStoreData>>,
35
36    hashed_sdk_key: String,
37    data_store: Option<Arc<dyn DataStoreTrait>>,
38    statsig_runtime: Arc<StatsigRuntime>,
39    ops_stats: Arc<OpsStatsForInstance>,
40    global_configs: Arc<GlobalConfigs>,
41}
42
43impl SpecStore {
44    #[must_use]
45    pub fn new(
46        sdk_key: &str,
47        hashed_sdk_key: String,
48        statsig_runtime: Arc<StatsigRuntime>,
49        data_store: Option<Arc<dyn DataStoreTrait>>,
50    ) -> SpecStore {
51        SpecStore {
52            hashed_sdk_key,
53            data: Arc::new(RwLock::new(SpecStoreData {
54                values: SpecsResponseFull::default(),
55                next_values: Some(SpecsResponseFull::default()),
56                time_received_at: None,
57                source: SpecsSource::Uninitialized,
58                source_api: None,
59                decompression_dict: None,
60                id_lists: HashMap::new(),
61            })),
62            data_store,
63            statsig_runtime,
64            ops_stats: OPS_STATS.get_for_instance(sdk_key),
65            global_configs: GlobalConfigs::get_instance(sdk_key),
66        }
67    }
68
69    pub fn set_source(&self, source: SpecsSource) {
70        match self.data.try_write_for(Duration::from_secs(5)) {
71            Some(mut data) => {
72                data.source = source;
73                log_d!(TAG, "Source Changed ({:?})", data.source);
74            }
75            None => {
76                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
77            }
78        }
79    }
80
81    pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
82        let data = match self.data.try_read_for(Duration::from_secs(5)) {
83            Some(data) => data,
84            None => {
85                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
86                return None;
87            }
88        };
89        let json = serde_json::to_string(&data.values).ok()?;
90        serde_json::from_str::<SpecsResponseFull>(&json).ok()
91    }
92
93    pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
94        let (mut next_values, decompression_dict) =
95            match self.data.try_write_for(Duration::from_secs(5)) {
96                Some(mut data) => (
97                    data.next_values.take().unwrap_or_default(),
98                    data.decompression_dict.clone(),
99                ),
100                None => {
101                    log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
102                    return Err(StatsigErr::LockFailure(
103                        "Failed to acquire write lock: Failed to lock data".to_string(),
104                    ));
105                }
106            };
107
108        let decompression_dict =
109            match self.parse_specs_response(&specs_update, &mut next_values, decompression_dict) {
110                Ok(Some(full)) => full,
111                Ok(None) => {
112                    self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
113                    return Ok(());
114                }
115                Err(e) => {
116                    return Err(e);
117                }
118            };
119        let decompression_dict_id = decompression_dict.as_ref().map(|d|d.get_dict_id().to_string());
120        if self.are_current_values_newer(&next_values) {
121            return Ok(());
122        }
123
124        self.try_update_global_configs(&next_values);
125
126        let now = Utc::now().timestamp_millis() as u64;
127        let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
128            next_values,
129            &specs_update,
130            decompression_dict,
131            now,
132            specs_update.source_api.clone(),
133        )?;
134
135        self.try_update_data_store(&specs_update.source, specs_update.data, now);
136        self.ops_stats_log_config_propagation_diff(
137            curr_values_time,
138            prev_lcut,
139            &specs_update.source,
140            &prev_source,
141            specs_update.source_api,
142            decompression_dict_id
143        );
144
145        // Glibc requested more memory than needed when deserializing a big json blob
146        // And memory allocator fails to return it.
147        // To prevent service from OOMing, manually unused heap memory.
148        maybe_trim_malloc();
149
150        Ok(())
151    }
152}
153
154// -------------------------------------------------------------------------------------------- [Private Functions]
155
156impl SpecStore {
157    fn parse_specs_response(
158        &self,
159        values: &SpecsUpdate,
160        next_values: &mut SpecsResponseFull,
161        decompression_dict: Option<DictionaryDecoder>,
162    ) -> Result<Option<Option<DictionaryDecoder>>, StatsigErr> {
163        let full_update_decoder_result = DecodedSpecsResponse::from_slice(
164            &values.data,
165            next_values,
166            decompression_dict.as_ref(),
167        );
168
169        if let Ok(result) = full_update_decoder_result {
170            if next_values.has_updates {
171                return Ok(Some(result));
172            }
173
174            return Ok(None);
175        }
176
177        let mut next_no_updates = SpecsResponseNoUpdates { has_updates: false };
178        let no_updates_decoder_result = DecodedSpecsResponse::from_slice(
179            &values.data,
180            &mut next_no_updates,
181            decompression_dict.as_ref(),
182        );
183
184        if no_updates_decoder_result.is_ok() && !next_no_updates.has_updates {
185            return Ok(None);
186        }
187
188        let error = full_update_decoder_result.err().map_or_else(
189            || StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string()),
190            |e| StatsigErr::JsonParseError("SpecsResponse".to_string(), e.to_string()),
191        );
192
193        log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
194        Err(error)
195    }
196
197    fn swap_current_with_next(
198        &self,
199        next_values: SpecsResponseFull,
200        specs_update: &SpecsUpdate,
201        decompression_dict: Option<DictionaryDecoder>,
202        now: u64,
203        source_api: Option<String>,
204    ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
205        match self.data.try_write_for(Duration::from_secs(5)) {
206            Some(mut data) => {
207                let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
208                let prev_lcut = data.values.time;
209
210                let mut temp = next_values;
211                std::mem::swap(&mut data.values, &mut temp);
212                data.next_values = Some(temp);
213
214                data.time_received_at = Some(now);
215                data.decompression_dict = decompression_dict;
216                data.source_api = source_api;
217                Ok((prev_source, prev_lcut, data.values.time))
218            }
219            None => {
220                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
221                Err(StatsigErr::LockFailure(
222                    "Failed to acquire write lock: Failed to lock data".to_string(),
223                ))
224            }
225        }
226    }
227
228    fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
229        log_d!(TAG, "No Updates");
230        self.ops_stats.log(ObservabilityEvent::new_event(
231            MetricType::Increment,
232            "config_no_update".to_string(),
233            1.0,
234            Some(HashMap::from([
235                ("source".to_string(), source.to_string()),
236                (
237                    "spec_source_api".to_string(),
238                    source_api.unwrap_or_default(),
239                ),
240            ])),
241        ));
242    }
243
244    fn ops_stats_log_config_propagation_diff(
245        &self,
246        lcut: u64,
247        prev_lcut: u64,
248        source: &SpecsSource,
249        prev_source: &SpecsSource,
250        source_api: Option<String>,
251        dict_id: Option<String>,
252    ) {
253        let delay = Utc::now().timestamp_millis() as u64 - lcut;
254        log_d!(TAG, "Updated ({:?})", source);
255
256        if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
257            return;
258        }
259
260        self.ops_stats.log(ObservabilityEvent::new_event(
261            MetricType::Dist,
262            "config_propagation_diff".to_string(),
263            delay as f64,
264            Some(HashMap::from([
265                ("source".to_string(), source.to_string()),
266                ("lcut".to_string(), lcut.to_string()),
267                ("prev_lcut".to_string(), prev_lcut.to_string()),
268                (
269                    "spec_source_api".to_string(),
270                    source_api.unwrap_or_default(),
271                ),
272                ("dict_id".to_string(), dict_id.unwrap_or_default())
273            ])),
274        ));
275    }
276
277    fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
278        if let Some(diagnostics) = &dcs.diagnostics {
279            self.global_configs
280                .set_diagnostics_sampling_rates(diagnostics.clone());
281        }
282
283        if let Some(sdk_configs) = &dcs.sdk_configs {
284            self.global_configs.set_sdk_configs(sdk_configs.clone());
285        }
286    }
287
288    fn try_update_data_store(&self, source: &SpecsSource, data: Vec<u8>, now: u64) {
289        if source != &SpecsSource::Network {
290            return;
291        }
292
293        let data_store = match &self.data_store {
294            Some(data_store) => data_store.clone(),
295            None => return,
296        };
297
298        let hashed_key = self.hashed_sdk_key.clone();
299
300        let spawn_result = self.statsig_runtime.spawn(
301            "spec_store_update_data_store",
302            move |_shutdown_notif| async move {
303                let data_string = match String::from_utf8(data) {
304                    Ok(s) => s,
305                    Err(e) => {
306                        log_e!(TAG, "Failed to convert data to string: {}", e);
307                        return;
308                    }
309                };
310
311                let _ = data_store
312                    .set(
313                        &get_data_adapter_dcs_key(&hashed_key),
314                        &data_string,
315                        Some(now),
316                    )
317                    .await;
318            },
319        );
320
321        if let Err(e) = spawn_result {
322            log_e!(
323                TAG,
324                "Failed to spawn spec store update data store task: {e}"
325            );
326        }
327    }
328
329    fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
330        let data = match self.data.try_read_for(Duration::from_secs(5)) {
331            Some(data) => data,
332            None => {
333                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
334                return false;
335            }
336        };
337
338        let curr_values = &data.values;
339        let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
340        let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
341
342        let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
343        let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
344
345        if cached_time_is_newer || checksums_match {
346            log_d!(
347                TAG,
348                "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
349                next_values.time,
350                new_checksum,
351                curr_values.time,
352                curr_checksum,
353                );
354            return true;
355        }
356
357        false
358    }
359}
360
361// -------------------------------------------------------------------------------------------- [Impl SpecsUpdateListener]
362
363impl SpecsUpdateListener for SpecStore {
364    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
365        self.set_values(update)
366    }
367
368    fn get_current_specs_info(&self) -> SpecsInfo {
369        match self.data.try_read_for(Duration::from_secs(5)) {
370            Some(data) => SpecsInfo {
371                lcut: Some(data.values.time),
372                checksum: data.values.checksum.clone(),
373                zstd_dict_id: data
374                    .decompression_dict
375                    .as_ref()
376                    .map(|d| d.get_dict_id().to_string()),
377                source: data.source.clone(),
378                source_api: data.source_api.clone(),
379            },
380            None => {
381                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
382                SpecsInfo {
383                    lcut: None,
384                    checksum: None,
385                    zstd_dict_id: None,
386                    source: SpecsSource::Error,
387                    source_api: None,
388                }
389            }
390        }
391    }
392}
393
394// -------------------------------------------------------------------------------------------- [Impl IdListsUpdateListener]
395
396impl IdListsUpdateListener for SpecStore {
397    fn get_current_id_list_metadata(
398        &self,
399    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
400        match self.data.try_read_for(Duration::from_secs(5)) {
401            Some(data) => data
402                .id_lists
403                .iter()
404                .map(|(key, list)| (key.clone(), list.metadata.clone()))
405                .collect(),
406            None => {
407                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
408                HashMap::new()
409            }
410        }
411    }
412
413    fn did_receive_id_list_updates(
414        &self,
415        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
416    ) {
417        let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
418            Some(data) => data,
419            None => {
420                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
421                return;
422            }
423        };
424
425        // delete any id_lists that are not in the updates
426        data.id_lists.retain(|name, _| updates.contains_key(name));
427
428        for (list_name, update) in updates {
429            if let Some(entry) = data.id_lists.get_mut(&list_name) {
430                // update existing
431                entry.apply_update(&update);
432            } else {
433                // add new
434                let mut list = IdList::new(update.new_metadata.clone());
435                list.apply_update(&update);
436                data.id_lists.insert(list_name, list);
437            }
438        }
439    }
440}