statsig_rust/
spec_store.rs

1use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
2use crate::global_configs::GlobalConfigs;
3use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
4use crate::networking::ResponseData;
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::sdk_event_emitter::{SdkEvent, SdkEventEmitter};
9use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
10use crate::utils::maybe_trim_malloc;
11use crate::{
12    log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
13    SpecsUpdateListener, StatsigErr, StatsigRuntime,
14};
15use chrono::Utc;
16use parking_lot::RwLock;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20
21pub struct SpecStoreData {
22    pub source: SpecsSource,
23    pub source_api: Option<String>,
24    pub time_received_at: Option<u64>,
25    pub values: SpecsResponseFull,
26    pub next_values: Option<SpecsResponseFull>,
27    pub id_lists: HashMap<String, IdList>,
28}
29
/// Tag attached to every log line emitted from this module.
const TAG: &str = "SpecStore";
31
32pub struct SpecStore {
33    pub data: Arc<RwLock<SpecStoreData>>,
34
35    hashed_sdk_key: String,
36    data_store: Option<Arc<dyn DataStoreTrait>>,
37    statsig_runtime: Arc<StatsigRuntime>,
38    ops_stats: Arc<OpsStatsForInstance>,
39    global_configs: Arc<GlobalConfigs>,
40    event_emitter: Arc<SdkEventEmitter>,
41}
42
43impl SpecStore {
44    #[must_use]
45    pub fn new(
46        sdk_key: &str,
47        hashed_sdk_key: String,
48        statsig_runtime: Arc<StatsigRuntime>,
49        event_emitter: Arc<SdkEventEmitter>,
50        data_store: Option<Arc<dyn DataStoreTrait>>,
51    ) -> SpecStore {
52        SpecStore {
53            hashed_sdk_key,
54            data: Arc::new(RwLock::new(SpecStoreData {
55                values: SpecsResponseFull::default(),
56                next_values: Some(SpecsResponseFull::default()),
57                time_received_at: None,
58                source: SpecsSource::Uninitialized,
59                source_api: None,
60                id_lists: HashMap::new(),
61            })),
62            event_emitter,
63            data_store,
64            statsig_runtime,
65            ops_stats: OPS_STATS.get_for_instance(sdk_key),
66            global_configs: GlobalConfigs::get_instance(sdk_key),
67        }
68    }
69
70    pub fn set_source(&self, source: SpecsSource) {
71        match self.data.try_write_for(Duration::from_secs(5)) {
72            Some(mut data) => {
73                data.source = source;
74                log_d!(TAG, "Source Changed ({:?})", data.source);
75            }
76            None => {
77                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
78            }
79        }
80    }
81
82    pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
83        let data = match self.data.try_read_for(Duration::from_secs(5)) {
84            Some(data) => data,
85            None => {
86                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
87                return None;
88            }
89        };
90        let json = serde_json::to_string(&data.values).ok()?;
91        serde_json::from_str::<SpecsResponseFull>(&json).ok()
92    }
93
94    pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
95        let mut specs_update = specs_update;
96        let mut next_values = match self.data.try_write_for(Duration::from_secs(5)) {
97            Some(mut data) => data.next_values.take().unwrap_or_default(),
98            None => {
99                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
100                return Err(StatsigErr::LockFailure(
101                    "Failed to acquire write lock: Failed to lock data".to_string(),
102                ));
103            }
104        };
105
106        match self.parse_specs_response(&mut specs_update, &mut next_values) {
107            Ok(ParseResult::HasUpdates) => (),
108            Ok(ParseResult::NoUpdates) => {
109                self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
110                return Ok(());
111            }
112            Err(e) => {
113                return Err(e);
114            }
115        };
116
117        if self.are_current_values_newer(&next_values) {
118            return Ok(());
119        }
120
121        self.try_update_global_configs(&next_values);
122
123        let now = Utc::now().timestamp_millis() as u64;
124        let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
125            next_values,
126            &specs_update,
127            now,
128            specs_update.source_api.clone(),
129        )?;
130
131        self.try_update_data_store(&specs_update.source, specs_update.data, now);
132        self.ops_stats_log_config_propagation_diff(
133            curr_values_time,
134            prev_lcut,
135            &specs_update.source,
136            &prev_source,
137            specs_update.source_api,
138        );
139
140        // Glibc requested more memory than needed when deserializing a big json blob
141        // And memory allocator fails to return it.
142        // To prevent service from OOMing, manually unused heap memory.
143        maybe_trim_malloc();
144
145        Ok(())
146    }
147}
148
149// -------------------------------------------------------------------------------------------- [Private Functions]
150
151impl SpecStore {
152    fn parse_specs_response(
153        &self,
154        values: &mut SpecsUpdate,
155        next_values: &mut SpecsResponseFull,
156    ) -> Result<ParseResult, StatsigErr> {
157        let parse_result = values.data.deserialize_in_place(next_values);
158
159        if parse_result.is_ok() && next_values.has_updates {
160            return Ok(ParseResult::HasUpdates);
161        }
162
163        let no_updates_result = values.data.deserialize_into::<SpecsResponseNoUpdates>();
164        if let Ok(result) = no_updates_result {
165            if !result.has_updates {
166                return Ok(ParseResult::NoUpdates);
167            }
168        }
169
170        let error = parse_result.err().unwrap_or_else(|| {
171            StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string())
172        });
173
174        log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
175        Err(error)
176    }
177
178    fn swap_current_with_next(
179        &self,
180        next_values: SpecsResponseFull,
181        specs_update: &SpecsUpdate,
182        now: u64,
183        source_api: Option<String>,
184    ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
185        match self.data.try_write_for(Duration::from_secs(5)) {
186            Some(mut data) => {
187                let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
188                let prev_lcut = data.values.time;
189
190                let mut temp = next_values;
191                std::mem::swap(&mut data.values, &mut temp);
192                data.next_values = Some(temp);
193
194                data.time_received_at = Some(now);
195                data.source_api = source_api;
196
197                self.emit_specs_updated_sdk_event(&data.source, &data.source_api, &data.values);
198
199                Ok((prev_source, prev_lcut, data.values.time))
200            }
201            None => {
202                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
203                Err(StatsigErr::LockFailure(
204                    "Failed to acquire write lock: Failed to lock data".to_string(),
205                ))
206            }
207        }
208    }
209
210    fn emit_specs_updated_sdk_event(
211        &self,
212        source: &SpecsSource,
213        source_api: &Option<String>,
214        values: &SpecsResponseFull,
215    ) {
216        self.event_emitter.emit(SdkEvent::SpecsUpdated {
217            source,
218            source_api,
219            values,
220        });
221    }
222
223    fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
224        log_d!(TAG, "No Updates");
225        self.ops_stats.log(ObservabilityEvent::new_event(
226            MetricType::Increment,
227            "config_no_update".to_string(),
228            1.0,
229            Some(HashMap::from([
230                ("source".to_string(), source.to_string()),
231                (
232                    "spec_source_api".to_string(),
233                    source_api.unwrap_or_default(),
234                ),
235            ])),
236        ));
237    }
238
239    fn ops_stats_log_config_propagation_diff(
240        &self,
241        lcut: u64,
242        prev_lcut: u64,
243        source: &SpecsSource,
244        prev_source: &SpecsSource,
245        source_api: Option<String>,
246    ) {
247        let delay = Utc::now().timestamp_millis() as u64 - lcut;
248        log_d!(TAG, "Updated ({:?})", source);
249
250        if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
251            return;
252        }
253
254        self.ops_stats.log(ObservabilityEvent::new_event(
255            MetricType::Dist,
256            "config_propagation_diff".to_string(),
257            delay as f64,
258            Some(HashMap::from([
259                ("source".to_string(), source.to_string()),
260                ("lcut".to_string(), lcut.to_string()),
261                ("prev_lcut".to_string(), prev_lcut.to_string()),
262                (
263                    "spec_source_api".to_string(),
264                    source_api.unwrap_or_default(),
265                ),
266            ])),
267        ));
268    }
269
270    fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
271        if let Some(diagnostics) = &dcs.diagnostics {
272            self.global_configs
273                .set_diagnostics_sampling_rates(diagnostics.clone());
274        }
275
276        if let Some(sdk_configs) = &dcs.sdk_configs {
277            self.global_configs.set_sdk_configs(sdk_configs.clone());
278        }
279
280        if let Some(sdk_flags) = &dcs.sdk_flags {
281            self.global_configs.set_sdk_flags(sdk_flags.clone());
282        }
283    }
284
285    fn try_update_data_store(&self, source: &SpecsSource, mut data: ResponseData, now: u64) {
286        if source != &SpecsSource::Network {
287            return;
288        }
289
290        let data_store = match &self.data_store {
291            Some(data_store) => data_store.clone(),
292            None => return,
293        };
294
295        let hashed_key = self.hashed_sdk_key.clone();
296
297        let spawn_result = self.statsig_runtime.spawn(
298            "spec_store_update_data_store",
299            move |_shutdown_notif| async move {
300                let data_string = match data.read_to_string() {
301                    Ok(s) => s,
302                    Err(e) => {
303                        log_e!(TAG, "Failed to convert data to string: {}", e);
304                        return;
305                    }
306                };
307
308                let _ = data_store
309                    .set(
310                        &get_data_adapter_dcs_key(&hashed_key),
311                        &data_string,
312                        Some(now),
313                    )
314                    .await;
315            },
316        );
317
318        if let Err(e) = spawn_result {
319            log_e!(
320                TAG,
321                "Failed to spawn spec store update data store task: {e}"
322            );
323        }
324    }
325
326    fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
327        let data = match self.data.try_read_for(Duration::from_secs(5)) {
328            Some(data) => data,
329            None => {
330                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
331                return false;
332            }
333        };
334
335        let curr_values = &data.values;
336        let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
337        let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
338
339        let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
340        let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
341
342        if cached_time_is_newer || checksums_match {
343            log_d!(
344                TAG,
345                "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
346                next_values.time,
347                new_checksum,
348                curr_values.time,
349                curr_checksum,
350                );
351            return true;
352        }
353
354        false
355    }
356}
357
358// -------------------------------------------------------------------------------------------- [Impl SpecsUpdateListener]
359
360impl SpecsUpdateListener for SpecStore {
361    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
362        self.set_values(update)
363    }
364
365    fn get_current_specs_info(&self) -> SpecsInfo {
366        match self.data.try_read_for(Duration::from_secs(5)) {
367            Some(data) => SpecsInfo {
368                lcut: Some(data.values.time),
369                checksum: data.values.checksum.clone(),
370                source: data.source.clone(),
371                source_api: data.source_api.clone(),
372            },
373            None => {
374                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
375                SpecsInfo {
376                    lcut: None,
377                    checksum: None,
378                    source: SpecsSource::Error,
379                    source_api: None,
380                }
381            }
382        }
383    }
384}
385
386// -------------------------------------------------------------------------------------------- [Impl IdListsUpdateListener]
387
388impl IdListsUpdateListener for SpecStore {
389    fn get_current_id_list_metadata(
390        &self,
391    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
392        match self.data.try_read_for(Duration::from_secs(5)) {
393            Some(data) => data
394                .id_lists
395                .iter()
396                .map(|(key, list)| (key.clone(), list.metadata.clone()))
397                .collect(),
398            None => {
399                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
400                HashMap::new()
401            }
402        }
403    }
404
405    fn did_receive_id_list_updates(
406        &self,
407        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
408    ) {
409        let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
410            Some(data) => data,
411            None => {
412                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
413                return;
414            }
415        };
416
417        // delete any id_lists that are not in the updates
418        data.id_lists.retain(|name, _| updates.contains_key(name));
419
420        for (list_name, update) in updates {
421            if let Some(entry) = data.id_lists.get_mut(&list_name) {
422                // update existing
423                entry.apply_update(update);
424            } else {
425                // add new
426                let mut list = IdList::new(update.new_metadata.clone());
427                list.apply_update(update);
428                data.id_lists.insert(list_name, list);
429            }
430        }
431    }
432}
433
/// Outcome of parsing a specs payload that did not fail outright.
enum ParseResult {
    /// The payload deserialized as a full response with `has_updates` set.
    HasUpdates,
    /// The payload deserialized as a "no updates" response.
    NoUpdates,
}