1use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
2use crate::global_configs::GlobalConfigs;
3use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
4use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
5use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
6use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
7use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
8use crate::utils::maybe_trim_malloc;
9use crate::{
10 log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
11 SpecsUpdateListener, StatsigErr, StatsigRuntime,
12};
13use chrono::Utc;
14use parking_lot::RwLock;
15use serde::Deserialize;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Duration;
19
20pub struct SpecStoreData {
21 pub source: SpecsSource,
22 pub source_api: Option<String>,
23 pub time_received_at: Option<u64>,
24 pub values: SpecsResponseFull,
25 pub next_values: Option<SpecsResponseFull>,
26 pub id_lists: HashMap<String, IdList>,
27}
28
/// Tag passed to the logging macros used by `SpecStore`.
const TAG: &str = "SpecStore";
30
31pub struct SpecStore {
32 pub data: Arc<RwLock<SpecStoreData>>,
33
34 hashed_sdk_key: String,
35 data_store: Option<Arc<dyn DataStoreTrait>>,
36 statsig_runtime: Arc<StatsigRuntime>,
37 ops_stats: Arc<OpsStatsForInstance>,
38 global_configs: Arc<GlobalConfigs>,
39}
40
41impl SpecStore {
42 #[must_use]
43 pub fn new(
44 sdk_key: &str,
45 hashed_sdk_key: String,
46 statsig_runtime: Arc<StatsigRuntime>,
47 data_store: Option<Arc<dyn DataStoreTrait>>,
48 ) -> SpecStore {
49 SpecStore {
50 hashed_sdk_key,
51 data: Arc::new(RwLock::new(SpecStoreData {
52 values: SpecsResponseFull::default(),
53 next_values: Some(SpecsResponseFull::default()),
54 time_received_at: None,
55 source: SpecsSource::Uninitialized,
56 source_api: None,
57 id_lists: HashMap::new(),
58 })),
59 data_store,
60 statsig_runtime,
61 ops_stats: OPS_STATS.get_for_instance(sdk_key),
62 global_configs: GlobalConfigs::get_instance(sdk_key),
63 }
64 }
65
66 pub fn set_source(&self, source: SpecsSource) {
67 match self.data.try_write_for(Duration::from_secs(5)) {
68 Some(mut data) => {
69 data.source = source;
70 log_d!(TAG, "Source Changed ({:?})", data.source);
71 }
72 None => {
73 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
74 }
75 }
76 }
77
78 pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
79 let data = match self.data.try_read_for(Duration::from_secs(5)) {
80 Some(data) => data,
81 None => {
82 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
83 return None;
84 }
85 };
86 let json = serde_json::to_string(&data.values).ok()?;
87 serde_json::from_str::<SpecsResponseFull>(&json).ok()
88 }
89
90 pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
91 let mut next_values = match self.data.try_write_for(Duration::from_secs(5)) {
92 Some(mut data) => data.next_values.take().unwrap_or_default(),
93 None => {
94 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
95 return Err(StatsigErr::LockFailure(
96 "Failed to acquire write lock: Failed to lock data".to_string(),
97 ));
98 }
99 };
100
101 match self.parse_specs_response(&specs_update, &mut next_values) {
102 Ok(ParseResult::HasUpdates) => (),
103 Ok(ParseResult::NoUpdates) => {
104 self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
105 return Ok(());
106 }
107 Err(e) => {
108 return Err(e);
109 }
110 };
111
112 if self.are_current_values_newer(&next_values) {
113 return Ok(());
114 }
115
116 self.try_update_global_configs(&next_values);
117
118 let now = Utc::now().timestamp_millis() as u64;
119 let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
120 next_values,
121 &specs_update,
122 now,
123 specs_update.source_api.clone(),
124 )?;
125
126 self.try_update_data_store(&specs_update.source, specs_update.data, now);
127 self.ops_stats_log_config_propagation_diff(
128 curr_values_time,
129 prev_lcut,
130 &specs_update.source,
131 &prev_source,
132 specs_update.source_api,
133 );
134
135 maybe_trim_malloc();
139
140 Ok(())
141 }
142}
143
144impl SpecStore {
147 fn parse_specs_response(
148 &self,
149 values: &SpecsUpdate,
150 next_values: &mut SpecsResponseFull,
151 ) -> Result<ParseResult, StatsigErr> {
152 let mut deserializer = serde_json::Deserializer::from_slice(&values.data);
153 let parse_result = SpecsResponseFull::deserialize_in_place(&mut deserializer, next_values);
154
155 if parse_result.is_ok() && next_values.has_updates {
156 return Ok(ParseResult::HasUpdates);
157 }
158
159 let no_updates_result = serde_json::from_slice::<SpecsResponseNoUpdates>(&values.data);
160 if let Ok(result) = no_updates_result {
161 if !result.has_updates {
162 return Ok(ParseResult::NoUpdates);
163 }
164 }
165
166 let error = parse_result.err().map_or_else(
167 || StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string()),
168 |e| StatsigErr::JsonParseError("SpecsResponse".to_string(), e.to_string()),
169 );
170
171 log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
172 Err(error)
173 }
174
175 fn swap_current_with_next(
176 &self,
177 next_values: SpecsResponseFull,
178 specs_update: &SpecsUpdate,
179 now: u64,
180 source_api: Option<String>,
181 ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
182 match self.data.try_write_for(Duration::from_secs(5)) {
183 Some(mut data) => {
184 let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
185 let prev_lcut = data.values.time;
186
187 let mut temp = next_values;
188 std::mem::swap(&mut data.values, &mut temp);
189 data.next_values = Some(temp);
190
191 data.time_received_at = Some(now);
192 data.source_api = source_api;
193 Ok((prev_source, prev_lcut, data.values.time))
194 }
195 None => {
196 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
197 Err(StatsigErr::LockFailure(
198 "Failed to acquire write lock: Failed to lock data".to_string(),
199 ))
200 }
201 }
202 }
203
204 fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
205 log_d!(TAG, "No Updates");
206 self.ops_stats.log(ObservabilityEvent::new_event(
207 MetricType::Increment,
208 "config_no_update".to_string(),
209 1.0,
210 Some(HashMap::from([
211 ("source".to_string(), source.to_string()),
212 (
213 "spec_source_api".to_string(),
214 source_api.unwrap_or_default(),
215 ),
216 ])),
217 ));
218 }
219
220 fn ops_stats_log_config_propagation_diff(
221 &self,
222 lcut: u64,
223 prev_lcut: u64,
224 source: &SpecsSource,
225 prev_source: &SpecsSource,
226 source_api: Option<String>,
227 ) {
228 let delay = Utc::now().timestamp_millis() as u64 - lcut;
229 log_d!(TAG, "Updated ({:?})", source);
230
231 if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
232 return;
233 }
234
235 self.ops_stats.log(ObservabilityEvent::new_event(
236 MetricType::Dist,
237 "config_propagation_diff".to_string(),
238 delay as f64,
239 Some(HashMap::from([
240 ("source".to_string(), source.to_string()),
241 ("lcut".to_string(), lcut.to_string()),
242 ("prev_lcut".to_string(), prev_lcut.to_string()),
243 (
244 "spec_source_api".to_string(),
245 source_api.unwrap_or_default(),
246 ),
247 ])),
248 ));
249 }
250
251 fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
252 if let Some(diagnostics) = &dcs.diagnostics {
253 self.global_configs
254 .set_diagnostics_sampling_rates(diagnostics.clone());
255 }
256
257 if let Some(sdk_configs) = &dcs.sdk_configs {
258 self.global_configs.set_sdk_configs(sdk_configs.clone());
259 }
260 }
261
262 fn try_update_data_store(&self, source: &SpecsSource, data: Vec<u8>, now: u64) {
263 if source != &SpecsSource::Network {
264 return;
265 }
266
267 let data_store = match &self.data_store {
268 Some(data_store) => data_store.clone(),
269 None => return,
270 };
271
272 let hashed_key = self.hashed_sdk_key.clone();
273
274 let spawn_result = self.statsig_runtime.spawn(
275 "spec_store_update_data_store",
276 move |_shutdown_notif| async move {
277 let data_string = match String::from_utf8(data) {
278 Ok(s) => s,
279 Err(e) => {
280 log_e!(TAG, "Failed to convert data to string: {}", e);
281 return;
282 }
283 };
284
285 let _ = data_store
286 .set(
287 &get_data_adapter_dcs_key(&hashed_key),
288 &data_string,
289 Some(now),
290 )
291 .await;
292 },
293 );
294
295 if let Err(e) = spawn_result {
296 log_e!(
297 TAG,
298 "Failed to spawn spec store update data store task: {e}"
299 );
300 }
301 }
302
303 fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
304 let data = match self.data.try_read_for(Duration::from_secs(5)) {
305 Some(data) => data,
306 None => {
307 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
308 return false;
309 }
310 };
311
312 let curr_values = &data.values;
313 let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
314 let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
315
316 let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
317 let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
318
319 if cached_time_is_newer || checksums_match {
320 log_d!(
321 TAG,
322 "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
323 next_values.time,
324 new_checksum,
325 curr_values.time,
326 curr_checksum,
327 );
328 return true;
329 }
330
331 false
332 }
333}
334
335impl SpecsUpdateListener for SpecStore {
338 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
339 self.set_values(update)
340 }
341
342 fn get_current_specs_info(&self) -> SpecsInfo {
343 match self.data.try_read_for(Duration::from_secs(5)) {
344 Some(data) => SpecsInfo {
345 lcut: Some(data.values.time),
346 checksum: data.values.checksum.clone(),
347 source: data.source.clone(),
348 source_api: data.source_api.clone(),
349 },
350 None => {
351 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
352 SpecsInfo {
353 lcut: None,
354 checksum: None,
355 source: SpecsSource::Error,
356 source_api: None,
357 }
358 }
359 }
360 }
361}
362
363impl IdListsUpdateListener for SpecStore {
366 fn get_current_id_list_metadata(
367 &self,
368 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
369 match self.data.try_read_for(Duration::from_secs(5)) {
370 Some(data) => data
371 .id_lists
372 .iter()
373 .map(|(key, list)| (key.clone(), list.metadata.clone()))
374 .collect(),
375 None => {
376 log_e!(TAG, "Failed to acquire read lock: Failed to lock data");
377 HashMap::new()
378 }
379 }
380 }
381
382 fn did_receive_id_list_updates(
383 &self,
384 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
385 ) {
386 let mut data = match self.data.try_write_for(Duration::from_secs(5)) {
387 Some(data) => data,
388 None => {
389 log_e!(TAG, "Failed to acquire write lock: Failed to lock data");
390 return;
391 }
392 };
393
394 data.id_lists.retain(|name, _| updates.contains_key(name));
396
397 for (list_name, update) in updates {
398 if let Some(entry) = data.id_lists.get_mut(&list_name) {
399 entry.apply_update(update);
401 } else {
402 let mut list = IdList::new(update.new_metadata.clone());
404 list.apply_update(update);
405 data.id_lists.insert(list_name, list);
406 }
407 }
408 }
409}
410
/// Outcome of successfully interpreting a specs payload.
enum ParseResult {
    /// The payload parsed as a full response with `has_updates` set.
    HasUpdates,

    /// The payload parsed as a no-updates response.
    NoUpdates,
}