statsig_rust/
spec_store.rs

1use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
2use crate::global_configs::GlobalConfigs;
3use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
4use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
5use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
6use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
7use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
8use crate::utils::maybe_trim_malloc;
9use crate::{
10    log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
11    SpecsUpdateListener, StatsigErr, StatsigRuntime,
12};
13use chrono::Utc;
14use parking_lot::RwLock;
15use serde::Deserialize;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19
20pub struct SpecStoreData {
21    pub source: SpecsSource,
22    pub source_api: Option<String>,
23    pub time_received_at: Option<u64>,
24    pub values: SpecsResponseFull,
25    pub next_values: Option<SpecsResponseFull>,
26    pub id_lists: HashMap<String, IdList>,
27}
28
29const TAG: &str = stringify!(SpecStore);
30
31pub struct SpecStore {
32    pub data: Arc<RwLock<SpecStoreData>>,
33
34    hashed_sdk_key: String,
35    data_store: Option<Arc<dyn DataStoreTrait>>,
36    statsig_runtime: Arc<StatsigRuntime>,
37    ops_stats: Arc<OpsStatsForInstance>,
38    global_configs: Arc<GlobalConfigs>,
39}
40
41impl SpecStore {
42    #[must_use]
43    pub fn new(
44        sdk_key: &str,
45        hashed_sdk_key: String,
46        statsig_runtime: Arc<StatsigRuntime>,
47        data_store: Option<Arc<dyn DataStoreTrait>>,
48    ) -> SpecStore {
49        SpecStore {
50            hashed_sdk_key,
51            data: Arc::new(RwLock::new(SpecStoreData {
52                values: SpecsResponseFull::default(),
53                next_values: Some(SpecsResponseFull::default()),
54                time_received_at: None,
55                source: SpecsSource::Uninitialized,
56                source_api: None,
57                id_lists: HashMap::new(),
58            })),
59            data_store,
60            statsig_runtime,
61            ops_stats: OPS_STATS.get_for_instance(sdk_key),
62            global_configs: GlobalConfigs::get_instance(sdk_key),
63        }
64    }
65
66    pub fn set_source(&self, source: SpecsSource) {
67        match self.data.try_write_for(Duration::from_secs(5)) {
68            Some(mut data) => {
69                data.source = source;
70                log_d!(TAG, "Source Changed ({:?})", data.source);
71            }
72            None => {
73                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
74            }
75        }
76    }
77
78    pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
79        let data = match self.data.try_read_for(Duration::from_secs(5)) {
80            Some(data) => data,
81            None => {
82                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
83                return None;
84            }
85        };
86        let json = serde_json::to_string(&data.values).ok()?;
87        serde_json::from_str::<SpecsResponseFull>(&json).ok()
88    }
89
90    pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
91        let mut next_values = match self.data.try_write_for(Duration::from_secs(5)) {
92            Some(mut data) => data.next_values.take().unwrap_or_default(),
93            None => {
94                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
95                return Err(StatsigErr::LockFailure(
96                    "Failed to acquire write lock: Failed to lock data".to_string(),
97                ));
98            }
99        };
100
101        match self.parse_specs_response(&specs_update, &mut next_values) {
102            Ok(ParseResult::HasUpdates) => (),
103            Ok(ParseResult::NoUpdates) => {
104                self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
105                return Ok(());
106            }
107            Err(e) => {
108                return Err(e);
109            }
110        };
111
112        if self.are_current_values_newer(&next_values) {
113            return Ok(());
114        }
115
116        self.try_update_global_configs(&next_values);
117
118        let now = Utc::now().timestamp_millis() as u64;
119        let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
120            next_values,
121            &specs_update,
122            now,
123            specs_update.source_api.clone(),
124        )?;
125
126        self.try_update_data_store(&specs_update.source, specs_update.data, now);
127        self.ops_stats_log_config_propagation_diff(
128            curr_values_time,
129            prev_lcut,
130            &specs_update.source,
131            &prev_source,
132            specs_update.source_api,
133        );
134
135        // Glibc requested more memory than needed when deserializing a big json blob
136        // And memory allocator fails to return it.
137        // To prevent service from OOMing, manually unused heap memory.
138        maybe_trim_malloc();
139
140        Ok(())
141    }
142}
143
144// -------------------------------------------------------------------------------------------- [Private Functions]
145
146impl SpecStore {
147    fn parse_specs_response(
148        &self,
149        values: &SpecsUpdate,
150        next_values: &mut SpecsResponseFull,
151    ) -> Result<ParseResult, StatsigErr> {
152        let mut deserializer = serde_json::Deserializer::from_slice(&values.data);
153        let parse_result = SpecsResponseFull::deserialize_in_place(&mut deserializer, next_values);
154
155        if parse_result.is_ok() && next_values.has_updates {
156            return Ok(ParseResult::HasUpdates);
157        }
158
159        let no_updates_result = serde_json::from_slice::<SpecsResponseNoUpdates>(&values.data);
160        if let Ok(result) = no_updates_result {
161            if !result.has_updates {
162                return Ok(ParseResult::NoUpdates);
163            }
164        }
165
166        let error = parse_result.err().map_or_else(
167            || StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string()),
168            |e| StatsigErr::JsonParseError("SpecsResponse".to_string(), e.to_string()),
169        );
170
171        log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
172        Err(error)
173    }
174
175    fn swap_current_with_next(
176        &self,
177        next_values: SpecsResponseFull,
178        specs_update: &SpecsUpdate,
179        now: u64,
180        source_api: Option<String>,
181    ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
182        match self.data.try_write_for(Duration::from_secs(5)) {
183            Some(mut data) => {
184                let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
185                let prev_lcut = data.values.time;
186
187                let mut temp = next_values;
188                std::mem::swap(&mut data.values, &mut temp);
189                data.next_values = Some(temp);
190
191                data.time_received_at = Some(now);
192                data.source_api = source_api;
193                Ok((prev_source, prev_lcut, data.values.time))
194            }
195            None => {
196                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
197                Err(StatsigErr::LockFailure(
198                    "Failed to acquire write lock: Failed to lock data".to_string(),
199                ))
200            }
201        }
202    }
203
204    fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
205        log_d!(TAG, "No Updates");
206        self.ops_stats.log(ObservabilityEvent::new_event(
207            MetricType::Increment,
208            "config_no_update".to_string(),
209            1.0,
210            Some(HashMap::from([
211                ("source".to_string(), source.to_string()),
212                (
213                    "spec_source_api".to_string(),
214                    source_api.unwrap_or_default(),
215                ),
216            ])),
217        ));
218    }
219
220    fn ops_stats_log_config_propagation_diff(
221        &self,
222        lcut: u64,
223        prev_lcut: u64,
224        source: &SpecsSource,
225        prev_source: &SpecsSource,
226        source_api: Option<String>,
227    ) {
228        let delay = Utc::now().timestamp_millis() as u64 - lcut;
229        log_d!(TAG, "Updated ({:?})", source);
230
231        if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
232            return;
233        }
234
235        self.ops_stats.log(ObservabilityEvent::new_event(
236            MetricType::Dist,
237            "config_propagation_diff".to_string(),
238            delay as f64,
239            Some(HashMap::from([
240                ("source".to_string(), source.to_string()),
241                ("lcut".to_string(), lcut.to_string()),
242                ("prev_lcut".to_string(), prev_lcut.to_string()),
243                (
244                    "spec_source_api".to_string(),
245                    source_api.unwrap_or_default(),
246                ),
247            ])),
248        ));
249    }
250
251    fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
252        if let Some(diagnostics) = &dcs.diagnostics {
253            self.global_configs
254                .set_diagnostics_sampling_rates(diagnostics.clone());
255        }
256
257        if let Some(sdk_configs) = &dcs.sdk_configs {
258            self.global_configs.set_sdk_configs(sdk_configs.clone());
259        }
260    }
261
262    fn try_update_data_store(&self, source: &SpecsSource, data: Vec<u8>, now: u64) {
263        if source != &SpecsSource::Network {
264            return;
265        }
266
267        let data_store = match &self.data_store {
268            Some(data_store) => data_store.clone(),
269            None => return,
270        };
271
272        let hashed_key = self.hashed_sdk_key.clone();
273
274        let spawn_result = self.statsig_runtime.spawn(
275            "spec_store_update_data_store",
276            move |_shutdown_notif| async move {
277                let data_string = match String::from_utf8(data) {
278                    Ok(s) => s,
279                    Err(e) => {
280                        log_e!(TAG, "Failed to convert data to string: {}", e);
281                        return;
282                    }
283                };
284
285                let _ = data_store
286                    .set(
287                        &get_data_adapter_dcs_key(&hashed_key),
288                        &data_string,
289                        Some(now),
290                    )
291                    .await;
292            },
293        );
294
295        if let Err(e) = spawn_result {
296            log_e!(
297                TAG,
298                "Failed to spawn spec store update data store task: {e}"
299            );
300        }
301    }
302
303    fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
304        let data = match self.data.try_read_for(Duration::from_secs(5)) {
305            Some(data) => data,
306            None => {
307                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
308                return false;
309            }
310        };
311
312        let curr_values = &data.values;
313        let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
314        let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
315
316        let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
317        let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
318
319        if cached_time_is_newer || checksums_match {
320            log_d!(
321                TAG,
322                "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
323                next_values.time,
324                new_checksum,
325                curr_values.time,
326                curr_checksum,
327                );
328            return true;
329        }
330
331        false
332    }
333}
334
335// -------------------------------------------------------------------------------------------- [Impl SpecsUpdateListener]
336
337impl SpecsUpdateListener for SpecStore {
338    fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
339        self.set_values(update)
340    }
341
342    fn get_current_specs_info(&self) -> SpecsInfo {
343        match self.data.try_read_for(Duration::from_secs(5)) {
344            Some(data) => SpecsInfo {
345                lcut: Some(data.values.time),
346                checksum: data.values.checksum.clone(),
347                source: data.source.clone(),
348                source_api: data.source_api.clone(),
349            },
350            None => {
351                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
352                SpecsInfo {
353                    lcut: None,
354                    checksum: None,
355                    source: SpecsSource::Error,
356                    source_api: None,
357                }
358            }
359        }
360    }
361}
362
363// -------------------------------------------------------------------------------------------- [Impl IdListsUpdateListener]
364
365impl IdListsUpdateListener for SpecStore {
366    fn get_current_id_list_metadata(
367        &self,
368    ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
369        match self.data.try_read_for(Duration::from_secs(5)) {
370            Some(data) => data
371                .id_lists
372                .iter()
373                .map(|(key, list)| (key.clone(), list.metadata.clone()))
374                .collect(),
375            None => {
376                log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
377                HashMap::new()
378            }
379        }
380    }
381
382    fn did_receive_id_list_updates(
383        &self,
384        updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
385    ) {
386        let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
387            Some(data) => data,
388            None => {
389                log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
390                return;
391            }
392        };
393
394        // delete any id_lists that are not in the updates
395        data.id_lists.retain(|name, _| updates.contains_key(name));
396
397        for (list_name, update) in updates {
398            if let Some(entry) = data.id_lists.get_mut(&list_name) {
399                // update existing
400                entry.apply_update(update);
401            } else {
402                // add new
403                let mut list = IdList::new(update.new_metadata.clone());
404                list.apply_update(update);
405                data.id_lists.insert(list_name, list);
406            }
407        }
408    }
409}
410
411enum ParseResult {
412    HasUpdates,
413    NoUpdates,
414}