1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
9use crate::specs_response::spec_types_encoded::DecodedSpecsResponse;
10use crate::utils::maybe_trim_malloc;
11use crate::{
12 log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
13 SpecsUpdateListener, StatsigErr, StatsigRuntime,
14};
15use chrono::Utc;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
19pub struct SpecStoreData {
20 pub source: SpecsSource,
21 pub source_api: Option<String>,
22 pub time_received_at: Option<u64>,
23 pub values: SpecsResponseFull,
24 pub next_values: Option<SpecsResponseFull>,
25 pub decompression_dict: Option<DictionaryDecoder>,
26 pub id_lists: HashMap<String, IdList>,
27}
28
29const TAG: &str = stringify!(SpecStore);
30
31pub struct SpecStore {
32 pub data: Arc<RwLock<SpecStoreData>>,
33
34 hashed_sdk_key: String,
35 data_store: Option<Arc<dyn DataStoreTrait>>,
36 statsig_runtime: Arc<StatsigRuntime>,
37 ops_stats: Arc<OpsStatsForInstance>,
38 global_configs: Arc<GlobalConfigs>,
39}
40
41impl SpecStore {
42 #[must_use]
43 pub fn new(
44 sdk_key: &str,
45 hashed_sdk_key: String,
46 statsig_runtime: Arc<StatsigRuntime>,
47 data_store: Option<Arc<dyn DataStoreTrait>>,
48 ) -> SpecStore {
49 SpecStore {
50 hashed_sdk_key,
51 data: Arc::new(RwLock::new(SpecStoreData {
52 values: SpecsResponseFull::default(),
53 next_values: Some(SpecsResponseFull::default()),
54 time_received_at: None,
55 source: SpecsSource::Uninitialized,
56 source_api: None,
57 decompression_dict: None,
58 id_lists: HashMap::new(),
59 })),
60 data_store,
61 statsig_runtime,
62 ops_stats: OPS_STATS.get_for_instance(sdk_key),
63 global_configs: GlobalConfigs::get_instance(sdk_key),
64 }
65 }
66
67 pub fn set_source(&self, source: SpecsSource) {
68 if let Ok(mut mut_values) = self.data.write() {
69 mut_values.source = source;
70 log_d!(TAG, "Source Changed ({:?})", mut_values.source);
71 }
72 }
73
74 pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
75 let data = self.data.read().ok()?;
76 let json = serde_json::to_string(&data.values).ok()?;
77 serde_json::from_str::<SpecsResponseFull>(&json).ok()
78 }
79
80 pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
81 let (mut next_values, decompression_dict) = match self.data.write() {
82 Ok(mut data) => (
83 data.next_values.take().unwrap_or_default(),
84 data.decompression_dict.clone(),
85 ),
86 Err(e) => {
87 log_e!(TAG, "Failed to acquire write lock: {}", e);
88 return Err(StatsigErr::LockFailure(e.to_string()));
89 }
90 };
91
92 let decompression_dict =
93 match self.parse_specs_response(&specs_update, &mut next_values, decompression_dict) {
94 Ok(Some(full)) => full,
95 Ok(None) => {
96 self.ops_stats_log_no_update(specs_update.source, specs_update.source_api);
97 return Ok(());
98 }
99 Err(e) => {
100 return Err(e);
101 }
102 };
103
104 if self.are_current_values_newer(&next_values) {
105 return Ok(());
106 }
107
108 self.try_update_global_configs(&next_values);
109
110 let now = Utc::now().timestamp_millis() as u64;
111 let (prev_source, prev_lcut, curr_values_time) = self.swap_current_with_next(
112 next_values,
113 &specs_update,
114 decompression_dict,
115 now,
116 specs_update.source_api.clone(),
117 )?;
118
119 self.try_update_data_store(&specs_update.source, specs_update.data, now);
120 self.ops_stats_log_config_propagation_diff(
121 curr_values_time,
122 prev_lcut,
123 &specs_update.source,
124 &prev_source,
125 specs_update.source_api,
126 );
127
128 maybe_trim_malloc();
132
133 Ok(())
134 }
135}
136
137impl SpecStore {
140 fn parse_specs_response(
141 &self,
142 values: &SpecsUpdate,
143 next_values: &mut SpecsResponseFull,
144 decompression_dict: Option<DictionaryDecoder>,
145 ) -> Result<Option<Option<DictionaryDecoder>>, StatsigErr> {
146 let full_update_decoder_result = DecodedSpecsResponse::from_slice(
147 &values.data,
148 next_values,
149 decompression_dict.as_ref(),
150 );
151
152 if let Ok(result) = full_update_decoder_result {
153 if next_values.has_updates {
154 return Ok(Some(result));
155 }
156
157 return Ok(None);
158 }
159
160 let mut next_no_updates = SpecsResponseNoUpdates { has_updates: false };
161 let no_updates_decoder_result = DecodedSpecsResponse::from_slice(
162 &values.data,
163 &mut next_no_updates,
164 decompression_dict.as_ref(),
165 );
166
167 if no_updates_decoder_result.is_ok() && !next_no_updates.has_updates {
168 return Ok(None);
169 }
170
171 let error = full_update_decoder_result.err().map_or_else(
172 || StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string()),
173 |e| StatsigErr::JsonParseError("SpecsResponse".to_string(), e.to_string()),
174 );
175
176 log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
177 Err(error)
178 }
179
180 fn swap_current_with_next(
181 &self,
182 next_values: SpecsResponseFull,
183 specs_update: &SpecsUpdate,
184 decompression_dict: Option<DictionaryDecoder>,
185 now: u64,
186 source_api: Option<String>,
187 ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
188 match self.data.write() {
189 Ok(mut data) => {
190 let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
191 let prev_lcut = data.values.time;
192
193 let mut temp = next_values;
194 std::mem::swap(&mut data.values, &mut temp);
195 data.next_values = Some(temp);
196
197 data.time_received_at = Some(now);
198 data.decompression_dict = decompression_dict;
199 data.source_api = source_api;
200 Ok((prev_source, prev_lcut, data.values.time))
201 }
202 Err(e) => {
203 log_e!(TAG, "Failed to acquire write lock: {}", e);
204 Err(StatsigErr::LockFailure(e.to_string()))
205 }
206 }
207 }
208
209 fn ops_stats_log_no_update(&self, source: SpecsSource, source_api: Option<String>) {
210 log_d!(TAG, "No Updates");
211 self.ops_stats.log(ObservabilityEvent::new_event(
212 MetricType::Increment,
213 "config_no_update".to_string(),
214 1.0,
215 Some(HashMap::from([
216 ("source".to_string(), source.to_string()),
217 (
218 "spec_source_api".to_string(),
219 source_api.unwrap_or_default(),
220 ),
221 ])),
222 ));
223 }
224
225 fn ops_stats_log_config_propagation_diff(
226 &self,
227 lcut: u64,
228 prev_lcut: u64,
229 source: &SpecsSource,
230 prev_source: &SpecsSource,
231 source_api: Option<String>,
232 ) {
233 let delay = Utc::now().timestamp_millis() as u64 - lcut;
234 log_d!(TAG, "Updated ({:?})", source);
235
236 if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
237 return;
238 }
239
240 self.ops_stats.log(ObservabilityEvent::new_event(
241 MetricType::Dist,
242 "config_propagation_diff".to_string(),
243 delay as f64,
244 Some(HashMap::from([
245 ("source".to_string(), source.to_string()),
246 ("lcut".to_string(), lcut.to_string()),
247 ("prev_lcut".to_string(), prev_lcut.to_string()),
248 (
249 "spec_source_api".to_string(),
250 source_api.unwrap_or_default(),
251 ),
252 ])),
253 ));
254 }
255
256 fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
257 if let Some(diagnostics) = &dcs.diagnostics {
258 self.global_configs
259 .set_diagnostics_sampling_rates(diagnostics.clone());
260 }
261
262 if let Some(sdk_configs) = &dcs.sdk_configs {
263 self.global_configs.set_sdk_configs(sdk_configs.clone());
264 }
265 }
266
267 fn try_update_data_store(&self, source: &SpecsSource, data: Vec<u8>, now: u64) {
268 if source != &SpecsSource::Network {
269 return;
270 }
271
272 let data_store = match &self.data_store {
273 Some(data_store) => data_store.clone(),
274 None => return,
275 };
276
277 let hashed_key = self.hashed_sdk_key.clone();
278
279 self.statsig_runtime.spawn(
280 "spec_store_update_data_store",
281 move |_shutdown_notif| async move {
282 let data_string = match String::from_utf8(data) {
283 Ok(s) => s,
284 Err(e) => {
285 log_e!(TAG, "Failed to convert data to string: {}", e);
286 return;
287 }
288 };
289
290 let _ = data_store
291 .set(
292 &get_data_adapter_dcs_key(&hashed_key),
293 &data_string,
294 Some(now),
295 )
296 .await;
297 },
298 );
299 }
300
301 fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
302 let data = match self.data.read() {
303 Ok(data) => data,
304 Err(e) => {
305 log_e!(TAG, "Failed to acquire read lock: {}", e);
306 return false;
307 }
308 };
309
310 let curr_values = &data.values;
311 let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
312 let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
313
314 let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
315 let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
316
317 if cached_time_is_newer || checksums_match {
318 log_d!(
319 TAG,
320 "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
321 next_values.time,
322 new_checksum,
323 curr_values.time,
324 curr_checksum,
325 );
326 return true;
327 }
328
329 false
330 }
331}
332
333impl SpecsUpdateListener for SpecStore {
336 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
337 self.set_values(update)
338 }
339
340 fn get_current_specs_info(&self) -> SpecsInfo {
341 match self.data.read() {
342 Ok(data) => SpecsInfo {
343 lcut: Some(data.values.time),
344 checksum: data.values.checksum.clone(),
345 zstd_dict_id: data
346 .decompression_dict
347 .as_ref()
348 .map(|d| d.get_dict_id().to_string()),
349 source: data.source.clone(),
350 source_api: data.source_api.clone(),
351 },
352 Err(e) => {
353 log_e!(TAG, "Failed to acquire read lock: {}", e);
354 SpecsInfo {
355 lcut: None,
356 checksum: None,
357 zstd_dict_id: None,
358 source: SpecsSource::Error,
359 source_api: None,
360 }
361 }
362 }
363 }
364}
365
366impl IdListsUpdateListener for SpecStore {
369 fn get_current_id_list_metadata(
370 &self,
371 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
372 match self.data.read() {
373 Ok(data) => data
374 .id_lists
375 .iter()
376 .map(|(key, list)| (key.clone(), list.metadata.clone()))
377 .collect(),
378 Err(e) => {
379 log_e!(TAG, "Failed to acquire read lock: {}", e);
380 HashMap::new()
381 }
382 }
383 }
384
385 fn did_receive_id_list_updates(
386 &self,
387 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
388 ) {
389 let mut data = match self.data.write() {
390 Ok(data) => data,
391 Err(e) => {
392 log_e!(TAG, "Failed to acquire write lock: {}", e);
393 return;
394 }
395 };
396
397 data.id_lists.retain(|name, _| updates.contains_key(name));
399
400 for (list_name, update) in updates {
401 if let Some(entry) = data.id_lists.get_mut(&list_name) {
402 entry.apply_update(&update);
404 } else {
405 let mut list = IdList::new(update.new_metadata.clone());
407 list.apply_update(&update);
408 data.id_lists.insert(list_name, list);
409 }
410 }
411 }
412}