1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::specs_response::spec_types::{SpecsResponseFull, SpecsResponseNoUpdates};
9use crate::specs_response::spec_types_encoded::DecodedSpecsResponse;
10use crate::utils::maybe_trim_malloc;
11use crate::{
12 log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
13 SpecsUpdateListener, StatsigErr, StatsigRuntime,
14};
15use chrono::Utc;
16use std::collections::HashMap;
17use std::sync::{Arc, RwLock};
18
/// Mutable state owned by [`SpecStore`] and guarded by its `RwLock`.
pub struct SpecStoreData {
    /// Where the current `values` came from; `SpecStore::new` starts it at
    /// `SpecsSource::Uninitialized`.
    pub source: SpecsSource,
    /// Millisecond Unix timestamp recorded when `values` was last replaced;
    /// `None` until the first update has been applied.
    pub time_received_at: Option<u64>,
    /// The specs currently in effect.
    pub values: SpecsResponseFull,
    /// Spare response object that `SpecStore::set_values` takes and decodes the
    /// next payload into. After a successful swap it holds the previous `values`.
    pub next_values: Option<SpecsResponseFull>,
    /// Dictionary decoder passed to the response decoder; replaced by whatever
    /// the decoder returns for the applied update.
    pub decompression_dict: Option<DictionaryDecoder>,
    /// ID lists keyed by list name.
    pub id_lists: HashMap<String, IdList>,
}
27
/// Tag passed to the logging macros for every log line emitted from this file.
const TAG: &str = "SpecStore";
29
/// Holds the most recently accepted specs and ID lists and applies incoming updates.
pub struct SpecStore {
    /// Shared, lock-guarded state (current specs, spare buffer, ID lists, ...).
    pub data: Arc<RwLock<SpecStoreData>>,

    /// Hashed SDK key; used to build the key under which specs are written to `data_store`.
    hashed_sdk_key: String,
    /// Optional external store that receives a copy of specs that arrived from the network.
    data_store: Option<Arc<dyn DataStoreTrait>>,
    /// Runtime used to spawn the asynchronous data-store write.
    statsig_runtime: Arc<StatsigRuntime>,
    /// Per-instance sink for observability events and logged errors.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Per-instance global configs, refreshed from each accepted specs response.
    global_configs: Arc<GlobalConfigs>,
}
39
40impl SpecStore {
41 #[must_use]
42 pub fn new(
43 sdk_key: &str,
44 hashed_sdk_key: String,
45 statsig_runtime: Arc<StatsigRuntime>,
46 data_store: Option<Arc<dyn DataStoreTrait>>,
47 ) -> SpecStore {
48 SpecStore {
49 hashed_sdk_key,
50 data: Arc::new(RwLock::new(SpecStoreData {
51 values: SpecsResponseFull::default(),
52 next_values: Some(SpecsResponseFull::default()),
53 time_received_at: None,
54 source: SpecsSource::Uninitialized,
55 decompression_dict: None,
56 id_lists: HashMap::new(),
57 })),
58 data_store,
59 statsig_runtime,
60 ops_stats: OPS_STATS.get_for_instance(sdk_key),
61 global_configs: GlobalConfigs::get_instance(sdk_key),
62 }
63 }
64
65 pub fn set_source(&self, source: SpecsSource) {
66 if let Ok(mut mut_values) = self.data.write() {
67 mut_values.source = source;
68 log_d!(TAG, "Source Changed ({:?})", mut_values.source);
69 }
70 }
71
72 pub fn get_current_values(&self) -> Option<SpecsResponseFull> {
73 let data = self.data.read().ok()?;
74 let json = serde_json::to_string(&data.values).ok()?;
75 serde_json::from_str::<SpecsResponseFull>(&json).ok()
76 }
77
78 pub fn set_values(&self, specs_update: SpecsUpdate) -> Result<(), StatsigErr> {
79 let (mut next_values, decompression_dict) = match self.data.write() {
80 Ok(mut data) => (
81 data.next_values.take().unwrap_or_default(),
82 data.decompression_dict.clone(),
83 ),
84 Err(e) => {
85 log_e!(TAG, "Failed to acquire write lock: {}", e);
86 return Err(StatsigErr::LockFailure(e.to_string()));
87 }
88 };
89
90 let decompression_dict =
91 match self.parse_specs_response(&specs_update, &mut next_values, decompression_dict) {
92 Ok(Some(full)) => full,
93 Ok(None) => {
94 self.ops_stats_log_no_update(specs_update.source);
95 return Ok(());
96 }
97 Err(e) => {
98 return Err(e);
99 }
100 };
101
102 if self.are_current_values_newer(&next_values) {
103 return Ok(());
104 }
105
106 self.try_update_global_configs(&next_values);
107
108 let now = Utc::now().timestamp_millis() as u64;
109 let (prev_source, prev_lcut, curr_values_time) =
110 self.swap_current_with_next(next_values, &specs_update, decompression_dict, now)?;
111
112 self.try_update_data_store(&specs_update.source, specs_update.data, now);
113 self.ops_stats_log_config_propagation_diff(
114 curr_values_time,
115 prev_lcut,
116 &specs_update.source,
117 &prev_source,
118 );
119
120 maybe_trim_malloc();
124
125 Ok(())
126 }
127}
128
129impl SpecStore {
132 fn parse_specs_response(
133 &self,
134 values: &SpecsUpdate,
135 next_values: &mut SpecsResponseFull,
136 decompression_dict: Option<DictionaryDecoder>,
137 ) -> Result<Option<Option<DictionaryDecoder>>, StatsigErr> {
138 let full_update_decoder_result = DecodedSpecsResponse::from_slice(
139 &values.data,
140 next_values,
141 decompression_dict.as_ref(),
142 );
143
144 if let Ok(result) = full_update_decoder_result {
145 if next_values.has_updates {
146 return Ok(Some(result));
147 }
148
149 return Ok(None);
150 }
151
152 let mut next_no_updates = SpecsResponseNoUpdates { has_updates: false };
153 let no_updates_decoder_result = DecodedSpecsResponse::from_slice(
154 &values.data,
155 &mut next_no_updates,
156 decompression_dict.as_ref(),
157 );
158
159 if no_updates_decoder_result.is_ok() && !next_no_updates.has_updates {
160 return Ok(None);
161 }
162
163 let error = full_update_decoder_result.err().map_or_else(
164 || StatsigErr::JsonParseError("SpecsResponse".to_string(), "Unknown error".to_string()),
165 |e| StatsigErr::JsonParseError("SpecsResponse".to_string(), e.to_string()),
166 );
167
168 log_error_to_statsig_and_console!(self.ops_stats, TAG, error);
169 Err(error)
170 }
171
172 fn swap_current_with_next(
173 &self,
174 next_values: SpecsResponseFull,
175 specs_update: &SpecsUpdate,
176 decompression_dict: Option<DictionaryDecoder>,
177 now: u64,
178 ) -> Result<(SpecsSource, u64, u64), StatsigErr> {
179 match self.data.write() {
180 Ok(mut data) => {
181 let prev_source = std::mem::replace(&mut data.source, specs_update.source.clone());
182 let prev_lcut = data.values.time;
183
184 let mut temp = next_values;
185 std::mem::swap(&mut data.values, &mut temp);
186 data.next_values = Some(temp);
187
188 data.time_received_at = Some(now);
189 data.decompression_dict = decompression_dict;
190
191 Ok((prev_source, prev_lcut, data.values.time))
192 }
193 Err(e) => {
194 log_e!(TAG, "Failed to acquire write lock: {}", e);
195 Err(StatsigErr::LockFailure(e.to_string()))
196 }
197 }
198 }
199
200 fn ops_stats_log_no_update(&self, source: SpecsSource) {
201 log_d!(TAG, "No Updates");
202 self.ops_stats.log(ObservabilityEvent::new_event(
203 MetricType::Increment,
204 "config_no_update".to_string(),
205 1.0,
206 Some(HashMap::from([("source".to_string(), source.to_string())])),
207 ));
208 }
209
210 fn ops_stats_log_config_propagation_diff(
211 &self,
212 lcut: u64,
213 prev_lcut: u64,
214 source: &SpecsSource,
215 prev_source: &SpecsSource,
216 ) {
217 let delay = Utc::now().timestamp_millis() as u64 - lcut;
218 log_d!(TAG, "Updated ({:?})", source);
219
220 if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
221 return;
222 }
223
224 self.ops_stats.log(ObservabilityEvent::new_event(
225 MetricType::Dist,
226 "config_propagation_diff".to_string(),
227 delay as f64,
228 Some(HashMap::from([
229 ("source".to_string(), source.to_string()),
230 ("lcut".to_string(), lcut.to_string()),
231 ("prev_lcut".to_string(), prev_lcut.to_string()),
232 ])),
233 ));
234 }
235
236 fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
237 if let Some(diagnostics) = &dcs.diagnostics {
238 self.global_configs
239 .set_diagnostics_sampling_rates(diagnostics.clone());
240 }
241
242 if let Some(sdk_configs) = &dcs.sdk_configs {
243 self.global_configs.set_sdk_configs(sdk_configs.clone());
244 }
245 }
246
247 fn try_update_data_store(&self, source: &SpecsSource, data: Vec<u8>, now: u64) {
248 if *source != SpecsSource::Network {
249 return;
250 }
251
252 let data_store = match &self.data_store {
253 Some(data_store) => data_store.clone(),
254 None => return,
255 };
256
257 let hashed_key = self.hashed_sdk_key.clone();
258
259 self.statsig_runtime.spawn(
260 "spec_store_update_data_store",
261 move |_shutdown_notif| async move {
262 let data_string = match String::from_utf8(data) {
263 Ok(s) => s,
264 Err(e) => {
265 log_e!(TAG, "Failed to convert data to string: {}", e);
266 return;
267 }
268 };
269
270 let _ = data_store
271 .set(
272 &get_data_adapter_dcs_key(&hashed_key),
273 &data_string,
274 Some(now),
275 )
276 .await;
277 },
278 );
279 }
280
281 fn are_current_values_newer(&self, next_values: &SpecsResponseFull) -> bool {
282 let data = match self.data.read() {
283 Ok(data) => data,
284 Err(e) => {
285 log_e!(TAG, "Failed to acquire read lock: {}", e);
286 return false;
287 }
288 };
289
290 let curr_values = &data.values;
291 let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
292 let new_checksum = next_values.checksum.as_deref().unwrap_or_default();
293
294 let cached_time_is_newer = curr_values.time > 0 && curr_values.time > next_values.time;
295 let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
296
297 if cached_time_is_newer || checksums_match {
298 log_d!(
299 TAG,
300 "Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
301 next_values.time,
302 new_checksum,
303 curr_values.time,
304 curr_checksum,
305 );
306 return true;
307 }
308
309 false
310 }
311}
312
313impl SpecsUpdateListener for SpecStore {
316 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
317 self.set_values(update)
318 }
319
320 fn get_current_specs_info(&self) -> SpecsInfo {
321 match self.data.read() {
322 Ok(data) => SpecsInfo {
323 lcut: Some(data.values.time),
324 checksum: data.values.checksum.clone(),
325 zstd_dict_id: data
326 .decompression_dict
327 .as_ref()
328 .map(|d| d.get_dict_id().to_string()),
329 source: data.source.clone(),
330 },
331 Err(e) => {
332 log_e!(TAG, "Failed to acquire read lock: {}", e);
333 SpecsInfo {
334 lcut: None,
335 checksum: None,
336 zstd_dict_id: None,
337 source: SpecsSource::Error,
338 }
339 }
340 }
341 }
342}
343
344impl IdListsUpdateListener for SpecStore {
347 fn get_current_id_list_metadata(
348 &self,
349 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
350 match self.data.read() {
351 Ok(data) => data
352 .id_lists
353 .iter()
354 .map(|(key, list)| (key.clone(), list.metadata.clone()))
355 .collect(),
356 Err(e) => {
357 log_e!(TAG, "Failed to acquire read lock: {}", e);
358 HashMap::new()
359 }
360 }
361 }
362
363 fn did_receive_id_list_updates(
364 &self,
365 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
366 ) {
367 let mut data = match self.data.write() {
368 Ok(data) => data,
369 Err(e) => {
370 log_e!(TAG, "Failed to acquire write lock: {}", e);
371 return;
372 }
373 };
374
375 data.id_lists.retain(|name, _| updates.contains_key(name));
377
378 for (list_name, update) in updates {
379 if let Some(entry) = data.id_lists.get_mut(&list_name) {
380 entry.apply_update(&update);
382 } else {
383 let mut list = IdList::new(update.new_metadata.clone());
385 list.apply_update(&update);
386 data.id_lists.insert(list_name, list);
387 }
388 }
389 }
390}