1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
9use crate::specs_response::spec_types_encoded::DecodedSpecsResponse;
10use crate::utils::maybe_trim_malloc;
11use crate::{
12 log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
13 SpecsUpdateListener, StatsigErr, StatsigRuntime,
14};
15use chrono::Utc;
16use parking_lot::RwLock;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20
21pub struct SpecStoreData {
22 pub source: SpecsSource,
23 pub source_api: Option<String>,
24 pub time_received_at: Option<u64>,
25 pub values: SpecsResponseFull,
26 pub next_values: Option<SpecsResponseFull>,
27 pub decompression_dict: Option<DictionaryDecoder>,
28 pub id_lists: HashMap<String, IdList>,
29}
30
31const TAG: &str = stringify!(SpecStore);
32
33pub struct SpecStore {
34 pub data: Arc<RwLock<SpecStoreData>>,
35
36 hashed_sdk_key: String,
37 data_store: Option<Arc<dyn DataStoreTrait>>,
38 statsig_runtime: Arc<StatsigRuntime>,
39 ops_stats: Arc<OpsStatsForInstance>,
40 global_configs: Arc<GlobalConfigs>,
41}
42
43impl SpecStore {
44 #[must_use]
45 pub fn new(
46 sdk_key: &str,
47 hashed_sdk_key: String,
48 statsig_runtime: Arc<StatsigRuntime>,
49 data_store: Option<Arc<dyn DataStoreTrait>>,
50 ) -> SpecStore {
51 SpecStore {
52 hashed_sdk_key,
53 data: Arc::new(RwLock::new(SpecStoreData {
54 values: SpecsResponseFull::default(),
55 next_values: Some(SpecsResponseFull::default()),
56 time_received_at: None,
57 source: SpecsSource::Uninitialized,
58 source_api: None,
59 decompression_dict: None,
60 id_lists: HashMap::new(),
61 })),
62 data_store,
63 statsig_runtime,
64 ops_stats: OPS_STATS.get_for_instance(sdk_key),
65 global_configs: GlobalConfigs::get_instance(sdk_key),
66 }
67 }
68
69 pub fn set_source(&self, source: SpecsSource) {
70 match self.data.try_write_for(Duration::from_secs(5)) {
71 Some(mut data) => {
72 data.source = source;
73 log_d!(TAG, "Source Changed ({:?})", data.source);
74 }
75 None => {
76 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
77 }
78 }
79 }
80
81 pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
82 let data = match self.data.try_read_for(Duration::from_secs(5)) {
83 Some(data) => data,
84 None => {
85 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
86 return None;
87 }
88 };
89 let json = serde_json::to_string(&data.values).ok()?;
90 serde_json::from_str::<SpecsResponseFull>(&json).ok()
91 }
92
93 pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
94 let (mut next_values, decompression_dict) =
95 match self.data.try_write_for(Duration::from_secs(5)) {
96 Some(mut data) => (
97 data.next_values.take().unwrap_or_default(),
98 data.decompression_dict.clone(),
99 ),
100 None => {
101 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
102 return Err(StatsigErr::LockFailure(
103 "Failed to acquire write lock: Failed to lock data".to_string(),
104 ));
105 }
106 };
107
108 let decompression_dict =
109 match self.parse_specs_response(&specs_update, &mut next_values, decompression_dict) {
110 Ok(Some(full)) => full,
111 Ok(None) => {
112 self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
113 return Ok(());
114 }
115 Err(e) => {
116 return Err(e);
117 }
118 };
119 let decompression_dict_id = decompression_dict.as_ref().map(|d|d.get_dict_id().to_string());
120 if self.are_current_values_newer(&next_values) {
121 return Ok(());
122 }
123
124 self.try_update_global_configs(&next_values);
125
126 let now = Utc::now().timestamp_millis() as u64;
127 let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
128 next_values,
129 &specs_update,
130 decompression_dict,
131 now,
132 specs_update.source_api.clone(),
133 )?;
134
135 self.try_update_data_store(&specs_update.source, specs_update.data, now);
136 self.ops_stats_log_config_propagation_diff(
137 curr_values_time,
138 prev_lcut,
139 &specs_update.source,
140 &prev_source,
141 specs_update.source_api,
142 decompression_dict_id
143 );
144
145 maybe_trim_malloc();
149
150 Ok(())
151 }
152}
153
154impl SpecStore {
157 fn parse_specs_response(
158 &self,
159 values: &SpecsUpdate,
160 next_values: &mut SpecsResponseFull,
161 decompression_dict: Option<DictionaryDecoder>,
162 ) -> Result<Option<Option<DictionaryDecoder>>, StatsigErr> {
163 let full_update_decoder_result = DecodedSpecsResponse::from_slice(
164 &values.data,
165 next_values,
166 decompression_dict.as_ref(),
167 );
168
169 if let Ok(result) = full_update_decoder_result {
170 if next_values.has_updates {
171 return Ok(Some(result));
172 }
173
174 return Ok(None);
175 }
176
177 let mut next_no_updates = SpecsResponseNoUpdates { has_updates: false };
178 let no_updates_decoder_result = DecodedSpecsResponse::from_slice(
179 &values.data,
180 &mut next_no_updates,
181 decompression_dict.as_ref(),
182 );
183
184 if no_updates_decoder_result.is_ok() && !next_no_updates.has_updates {
185 return Ok(None);
186 }
187
188 let error = full_update_decoder_result.err().map_or_else(
189 || StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string()),
190 |e| StatsigErr::JsonParseError("SpecsResponse".to_string(), e.to_string()),
191 );
192
193 log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
194 Err(error)
195 }
196
197 fn swap_current_with_next(
198 &self,
199 next_values: SpecsResponseFull,
200 specs_update: &SpecsUpdate,
201 decompression_dict: Option<DictionaryDecoder>,
202 now: u64,
203 source_api: Option<String>,
204 ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
205 match self.data.try_write_for(Duration::from_secs(5)) {
206 Some(mut data) => {
207 let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
208 let prev_lcut = data.values.time;
209
210 let mut temp = next_values;
211 std::mem::swap(&mut data.values, &mut temp);
212 data.next_values = Some(temp);
213
214 data.time_received_at = Some(now);
215 data.decompression_dict = decompression_dict;
216 data.source_api = source_api;
217 Ok((prev_source, prev_lcut, data.values.time))
218 }
219 None => {
220 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
221 Err(StatsigErr::LockFailure(
222 "Failed to acquire write lock: Failed to lock data".to_string(),
223 ))
224 }
225 }
226 }
227
228 fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
229 log_d!(TAG, "No Updates");
230 self.ops_stats.log(ObservabilityEvent::new_event(
231 MetricType::Increment,
232 "config_no_update".to_string(),
233 1.0,
234 Some(HashMap::from([
235 ("source".to_string(), source.to_string()),
236 (
237 "spec_source_api".to_string(),
238 source_api.unwrap_or_default(),
239 ),
240 ])),
241 ));
242 }
243
244 fn ops_stats_log_config_propagation_diff(
245 &self,
246 lcut: u64,
247 prev_lcut: u64,
248 source: &SpecsSource,
249 prev_source: &SpecsSource,
250 source_api: Option<String>,
251 dict_id: Option<String>,
252 ) {
253 let delay = Utc::now().timestamp_millis() as u64 - lcut;
254 log_d!(TAG, "Updated ({:?})", source);
255
256 if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
257 return;
258 }
259
260 self.ops_stats.log(ObservabilityEvent::new_event(
261 MetricType::Dist,
262 "config_propagation_diff".to_string(),
263 delay as f64,
264 Some(HashMap::from([
265 ("source".to_string(), source.to_string()),
266 ("lcut".to_string(), lcut.to_string()),
267 ("prev_lcut".to_string(), prev_lcut.to_string()),
268 (
269 "spec_source_api".to_string(),
270 source_api.unwrap_or_default(),
271 ),
272 ("dict_id".to_string(), dict_id.unwrap_or_default())
273 ])),
274 ));
275 }
276
277 fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
278 if let Some(diagnostics) = &dcs.diagnostics {
279 self.global_configs
280 .set_diagnostics_sampling_rates(diagnostics.clone());
281 }
282
283 if let Some(sdk_configs) = &dcs.sdk_configs {
284 self.global_configs.set_sdk_configs(sdk_configs.clone());
285 }
286 }
287
288 fn try_update_data_store(&self, source: &SpecsSource, data: Vec<u8>, now: u64) {
289 if source != &SpecsSource::Network {
290 return;
291 }
292
293 let data_store = match &self.data_store {
294 Some(data_store) => data_store.clone(),
295 None => return,
296 };
297
298 let hashed_key = self.hashed_sdk_key.clone();
299
300 let spawn_result = self.statsig_runtime.spawn(
301 "spec_store_update_data_store",
302 move |_shutdown_notif| async move {
303 let data_string = match String::from_utf8(data) {
304 Ok(s) => s,
305 Err(e) => {
306 log_e!(TAG, "Failed to convert data to string: {}", e);
307 return;
308 }
309 };
310
311 let _ = data_store
312 .set(
313 &get_data_adapter_dcs_key(&hashed_key),
314 &data_string,
315 Some(now),
316 )
317 .await;
318 },
319 );
320
321 if let Err(e) = spawn_result {
322 log_e!(
323 TAG,
324 "Failed to spawn spec store update data store task: {e}"
325 );
326 }
327 }
328
329 fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
330 let data = match self.data.try_read_for(Duration::from_secs(5)) {
331 Some(data) => data,
332 None => {
333 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
334 return false;
335 }
336 };
337
338 let curr_values = &data.values;
339 let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
340 let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
341
342 let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
343 let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
344
345 if cached_time_is_newer || checksums_match {
346 log_d!(
347 TAG,
348 "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
349 next_values.time,
350 new_checksum,
351 curr_values.time,
352 curr_checksum,
353 );
354 return true;
355 }
356
357 false
358 }
359}
360
361impl SpecsUpdateListener for SpecStore {
364 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
365 self.set_values(update)
366 }
367
368 fn get_current_specs_info(&self) -> SpecsInfo {
369 match self.data.try_read_for(Duration::from_secs(5)) {
370 Some(data) => SpecsInfo {
371 lcut: Some(data.values.time),
372 checksum: data.values.checksum.clone(),
373 zstd_dict_id: data
374 .decompression_dict
375 .as_ref()
376 .map(|d| d.get_dict_id().to_string()),
377 source: data.source.clone(),
378 source_api: data.source_api.clone(),
379 },
380 None => {
381 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
382 SpecsInfo {
383 lcut: None,
384 checksum: None,
385 zstd_dict_id: None,
386 source: SpecsSource::Error,
387 source_api: None,
388 }
389 }
390 }
391 }
392}
393
394impl IdListsUpdateListener for SpecStore {
397 fn get_current_id_list_metadata(
398 &self,
399 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
400 match self.data.try_read_for(Duration::from_secs(5)) {
401 Some(data) => data
402 .id_lists
403 .iter()
404 .map(|(key, list)| (key.clone(), list.metadata.clone()))
405 .collect(),
406 None => {
407 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
408 HashMap::new()
409 }
410 }
411 }
412
413 fn did_receive_id_list_updates(
414 &self,
415 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
416 ) {
417 let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
418 Some(data) => data,
419 None => {
420 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
421 return;
422 }
423 };
424
425 data.id_lists.retain(|name, _| updates.contains_key(name));
427
428 for (list_name, update) in updates {
429 if let Some(entry) = data.id_lists.get_mut(&list_name) {
430 entry.apply_update(&update);
432 } else {
433 let mut list = IdList::new(update.new_metadata.clone());
435 list.apply_update(&update);
436 data.id_lists.insert(list_name, list);
437 }
438 }
439 }
440}