1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::spec_types::{SpecsResponse, SpecsResponseFull, SpecsResponseNoUpdates};
9use crate::{
10 log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
11 SpecsUpdateListener, StatsigErr, StatsigRuntime,
12};
13use chrono::Utc;
14use serde::Serialize;
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock};
17
/// The state a [`SpecStore`] keeps behind its `RwLock`.
///
/// `Clone` lets `SpecStore::get_current_values` hand out an independent copy of the
/// whole state; `Serialize` is derived as well.
#[derive(Clone, Serialize)]
pub struct SpecStoreData {
    /// Origin of the current `values`. A new store starts at `SpecsSource::Uninitialized`.
    pub source: SpecsSource,
    /// Value of `Utc::now().timestamp_millis()` captured when `values` was last replaced;
    /// `None` until the first successful update.
    pub time_received_at: Option<u64>,
    /// The most recently accepted full specs payload (a blank payload until then).
    pub values: SpecsResponseFull,
    /// Optional decompression dictionary. A new store starts with `None`; its dict id is
    /// reported through `get_current_specs_info`.
    // NOTE(review): nothing in this file assigns this field after construction —
    // presumably it is set through the public `data` handle; confirm.
    pub decompression_dict: Option<DictionaryDecoder>,
    /// ID lists keyed by list name, maintained by `did_receive_id_list_updates`.
    pub id_lists: HashMap<String, IdList>,
}
26
/// Tag handed to the logging macros for every message emitted from this file.
const TAG: &str = "SpecStore";
28
/// Holds the current specs and ID lists and applies incoming updates to them.
pub struct SpecStore {
    /// Shared, lock-protected state. Public, so holders of the `Arc` can read or
    /// write it directly.
    pub data: Arc<RwLock<SpecStoreData>>,

    /// Hashed SDK key; used to derive the data-store key via `get_data_adapter_dcs_key`.
    hashed_sdk_key: String,
    /// Optional external data store that receives the raw payload of
    /// `SpecsSource::Network` updates.
    data_store: Option<Arc<dyn DataStoreTrait>>,
    /// Runtime used to spawn the asynchronous data-store write.
    statsig_runtime: Arc<StatsigRuntime>,
    /// Per-instance ops-stats handle, obtained from `OPS_STATS.get_for_instance(sdk_key)`.
    ops_stats: Arc<OpsStatsForInstance>,
    /// Per-instance global configs, obtained from `GlobalConfigs::get_instance(sdk_key)`;
    /// refreshed with the diagnostics and SDK configs carried by accepted updates.
    global_configs: Arc<GlobalConfigs>,
}
38
39impl SpecStore {
40 #[must_use]
41 pub fn new(
42 sdk_key: &str,
43 hashed_sdk_key: String,
44 statsig_runtime: Arc<StatsigRuntime>,
45 data_store: Option<Arc<dyn DataStoreTrait>>,
46 ) -> SpecStore {
47 SpecStore {
48 hashed_sdk_key,
49 data: Arc::new(RwLock::new(SpecStoreData {
50 values: SpecsResponseFull::blank(),
51 time_received_at: None,
52 source: SpecsSource::Uninitialized,
53 decompression_dict: None,
54 id_lists: HashMap::new(),
55 })),
56 data_store,
57 statsig_runtime,
58 ops_stats: OPS_STATS.get_for_instance(sdk_key),
59 global_configs: GlobalConfigs::get_instance(sdk_key),
60 }
61 }
62
63 pub fn set_source(&self, source: SpecsSource) {
64 if let Ok(mut mut_values) = self.data.write() {
65 mut_values.source = source;
66 log_d!(TAG, "SpecStore - Source Changed ({:?})", mut_values.source);
67 }
68 }
69
70 pub fn get_current_values(&self) -> Option<SpecStoreData> {
71 let cloned = self.data.read().ok()?.clone();
72 Some(cloned)
73 }
74
75 pub fn set_values(&self, values: SpecsUpdate) -> Result<(), StatsigErr> {
76 let dcs = match self.parse_specs_response(&values)? {
77 SpecsResponse::Full(full) => full,
78 SpecsResponse::NoUpdates(_) => {
79 self.ops_stats_log_no_update(values.source);
80 return Ok(());
81 }
82 };
83
84 if self.are_current_values_newer(&dcs) {
85 return Ok(());
86 }
87
88 let mut mut_values = match self.data.write() {
89 Ok(mut_values) => mut_values,
90 Err(e) => {
91 log_e!(TAG, "Failed to acquire write lock: {}", e);
92 return Err(StatsigErr::LockFailure(e.to_string()));
93 }
94 };
95
96 self.try_update_global_configs(&dcs);
97
98 let now = Utc::now().timestamp_millis() as u64;
99 let prev_source = mut_values.source.clone();
100
101 mut_values.values = *dcs;
102 mut_values.time_received_at = Some(now);
103 mut_values.source = values.source.clone();
104
105 self.try_update_data_store(&mut_values.source, values.data, now);
106 self.ops_stats_log_config_propogation_diff(
107 mut_values.values.time,
108 &mut_values.source,
109 &prev_source,
110 );
111
112 Ok(())
113 }
114}
115
116impl SpecStore {
119 fn parse_specs_response(&self, values: &SpecsUpdate) -> Result<SpecsResponse, StatsigErr> {
120 let parsed = match serde_json::from_str::<SpecsResponse>(&values.data) {
121 Ok(response) => response,
122 Err(e) => {
123 log_error_to_statsig_and_console!(
124 self.ops_stats,
125 TAG,
126 "{:?}, {:?}",
127 e,
128 values.source
129 );
130 return Err(StatsigErr::JsonParseError(
131 "SpecsResponse".to_string(),
132 e.to_string(),
133 ));
134 }
135 };
136
137 match parsed {
138 SpecsResponse::Full(full) => {
139 if !full.has_updates {
140 return Ok(SpecsResponse::NoUpdates(SpecsResponseNoUpdates {
141 has_updates: false,
142 }));
143 }
144
145 log_d!(
146 TAG,
147 "SpecStore Full Update: {} - [gates({}), configs({}), layers({})]",
148 full.time,
149 full.feature_gates.len(),
150 full.dynamic_configs.len(),
151 full.layer_configs.len(),
152 );
153
154 Ok(SpecsResponse::Full(full))
155 }
156 SpecsResponse::NoUpdates(no_updates) => {
157 if no_updates.has_updates {
158 log_error_to_statsig_and_console!(
159 self.ops_stats,
160 TAG,
161 "Empty response with has_updates = true {:?}",
162 values.source
163 );
164
165 return Err(StatsigErr::JsonParseError(
166 "SpecsResponse".to_owned(),
167 "Parse failure. 'has_update' is true, but failed to deserialize to response format 'dcs-v2'".to_owned(),
168 ));
169 }
170
171 Ok(SpecsResponse::NoUpdates(no_updates))
172 }
173 }
174 }
175
176 fn ops_stats_log_no_update(&self, source: SpecsSource) {
177 log_d!(TAG, "SpecStore - No Updates");
178 self.ops_stats.log(ObservabilityEvent::new_event(
179 MetricType::Increment,
180 "config_no_update".to_string(),
181 1.0,
182 Some(HashMap::from([("source".to_string(), source.to_string())])),
183 ));
184 }
185
186 fn ops_stats_log_config_propogation_diff(
187 &self,
188 lcut: u64,
189 source: &SpecsSource,
190 prev_source: &SpecsSource,
191 ) {
192 let delay = Utc::now().timestamp_millis() as u64 - lcut;
193 log_d!(TAG, "SpecStore - Updated ({:?})", source);
194
195 if *prev_source == SpecsSource::Uninitialized || *prev_source == SpecsSource::Loading {
196 return;
197 }
198
199 self.ops_stats.log(ObservabilityEvent::new_event(
200 MetricType::Dist,
201 "config_propogation_diff".to_string(),
202 delay as f64,
203 Some(HashMap::from([("source".to_string(), source.to_string())])),
204 ));
205 }
206
207 fn try_update_global_configs(&self, dcs: &SpecsResponseFull) {
208 if let Some(diagnostics) = &dcs.diagnostics {
209 self.global_configs
210 .set_diagnostics_sampling_rates(diagnostics.clone());
211 }
212
213 if let Some(sdk_configs) = &dcs.sdk_configs {
214 self.global_configs.set_sdk_configs(sdk_configs.clone());
215 }
216 }
217
218 fn try_update_data_store(&self, source: &SpecsSource, data: String, now: u64) {
219 if *source != SpecsSource::Network {
220 return;
221 }
222
223 let data_store = match &self.data_store {
224 Some(data_store) => data_store.clone(),
225 None => return,
226 };
227
228 let hashed_key = self.hashed_sdk_key.clone();
229 self.statsig_runtime.spawn(
230 "spec_store_update_data_store",
231 move |_shutdown_notif| async move {
232 let _ = data_store
233 .set(&get_data_adapter_dcs_key(&hashed_key), &data, Some(now))
234 .await;
235 },
236 );
237 }
238
239 fn are_current_values_newer(&self, dcs: &SpecsResponseFull) -> bool {
240 let guard = match self.data.read() {
241 Ok(guard) => guard,
242 Err(e) => {
243 log_e!(TAG, "Failed to acquire read lock: {}", e);
244 return false;
245 }
246 };
247
248 let curr_values = &guard.values;
249 let curr_checksum = curr_values.checksum.as_deref().unwrap_or_default();
250 let new_checksum = dcs.checksum.as_deref().unwrap_or_default();
251
252 let cached_time_is_newer = curr_values.time > 0 && curr_values.time > dcs.time;
253 let checksums_match = !curr_checksum.is_empty() && curr_checksum == new_checksum;
254
255 if cached_time_is_newer || checksums_match {
256 log_d!(
257 TAG,
258 "SpecStore - Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
259 dcs.time,
260 new_checksum,
261 curr_values.time,
262 curr_checksum,
263 );
264 return true;
265 }
266
267 false
268 }
269}
270
271impl SpecsUpdateListener for SpecStore {
274 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
275 self.set_values(update)
276 }
277
278 fn get_current_specs_info(&self) -> SpecsInfo {
279 match self.data.read() {
280 Ok(data) => SpecsInfo {
281 lcut: Some(data.values.time),
282 checksum: data.values.checksum.clone(),
283 zstd_dict_id: data
284 .decompression_dict
285 .as_ref()
286 .map(|d| d.get_dict_id().to_string()),
287 source: data.source.clone(),
288 },
289 Err(e) => {
290 log_e!(TAG, "Failed to acquire read lock: {}", e);
291 SpecsInfo {
292 lcut: None,
293 checksum: None,
294 zstd_dict_id: None,
295 source: SpecsSource::Error,
296 }
297 }
298 }
299 }
300}
301
302impl IdListsUpdateListener for SpecStore {
305 fn get_current_id_list_metadata(
306 &self,
307 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
308 match self.data.read() {
309 Ok(data) => data
310 .id_lists
311 .iter()
312 .map(|(key, list)| (key.clone(), list.metadata.clone()))
313 .collect(),
314 Err(e) => {
315 log_e!(TAG, "Failed to acquire read lock: {}", e);
316 HashMap::new()
317 }
318 }
319 }
320
321 fn did_receive_id_list_updates(
322 &self,
323 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
324 ) {
325 let mut data = match self.data.write() {
326 Ok(data) => data,
327 Err(e) => {
328 log_e!(TAG, "Failed to acquire write lock: {}", e);
329 return;
330 }
331 };
332
333 data.id_lists.retain(|name, _| updates.contains_key(name));
335
336 for (list_name, update) in updates {
337 if let Some(entry) = data.id_lists.get_mut(&list_name) {
338 entry.apply_update(&update);
340 } else {
341 let mut list = IdList::new(update.new_metadata.clone());
343 list.apply_update(&update);
344 data.id_lists.insert(list_name, list);
345 }
346 }
347 }
348}
349
350impl SpecsResponseFull {
351 fn blank() -> Self {
352 SpecsResponseFull {
353 feature_gates: Default::default(),
354 dynamic_configs: Default::default(),
355 layer_configs: Default::default(),
356 condition_map: Default::default(),
357 experiment_to_layer: Default::default(),
358 has_updates: true,
359 time: 0,
360 checksum: None,
361 default_environment: None,
362 app_id: None,
363 sdk_keys_to_app_ids: None,
364 hashed_sdk_keys_to_app_ids: None,
365 diagnostics: None,
366 param_stores: None,
367 sdk_configs: None,
368 cmab_configs: None,
369 overrides: None,
370 override_rules: None,
371 }
372 }
373}