1use crate::compression::zstd_decompression_dict::DictionaryDecoder;
2use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
3use crate::global_configs::GlobalConfigs;
4use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
5use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
6use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
7use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
8use crate::spec_types::{SpecsResponse, SpecsResponseFull};
9use crate::{
10 log_d, log_e, log_error_to_statsig_and_console, SpecsInfo, SpecsSource, SpecsUpdate,
11 SpecsUpdateListener, StatsigErr, StatsigRuntime,
12};
13use chrono::Utc;
14use serde::Serialize;
15use std::collections::HashMap;
16use std::sync::{Arc, RwLock};
17
18#[derive(Clone, Serialize)]
19pub struct SpecStoreData {
20 pub source: SpecsSource,
21 pub time_received_at: Option<u64>,
22 pub values: SpecsResponseFull,
23 pub decompression_dict: Option<DictionaryDecoder>,
24 pub id_lists: HashMap<String, IdList>,
25}
26
27const TAG: &str = stringify!(SpecStore);
28
29pub struct SpecStore {
30 pub hashed_sdk_key: String,
31 pub data: Arc<RwLock<SpecStoreData>>,
32 pub data_store: Option<Arc<dyn DataStoreTrait>>,
33 pub statsig_runtime: Option<Arc<StatsigRuntime>>,
34 ops_stats: Arc<OpsStatsForInstance>,
35 global_configs: Arc<GlobalConfigs>,
36}
37
38impl SpecsUpdateListener for SpecStore {
39 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
40 self.set_values(update)
41 }
42
43 fn get_current_specs_info(&self) -> SpecsInfo {
44 match self.data.read() {
45 Ok(data) => SpecsInfo {
46 lcut: Some(data.values.time),
47 checksum: data.values.checksum.clone(),
48 zstd_dict_id: data
49 .decompression_dict
50 .as_ref()
51 .map(|d| d.get_dict_id().to_string()),
52 source: data.source.clone(),
53 },
54 Err(e) => {
55 log_e!(TAG, "Failed to acquire read lock: {}", e);
56 SpecsInfo {
57 lcut: None,
58 checksum: None,
59 zstd_dict_id: None,
60 source: SpecsSource::Error,
61 }
62 }
63 }
64 }
65}
66
67impl IdListsUpdateListener for SpecStore {
68 fn get_current_id_list_metadata(
69 &self,
70 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
71 match self.data.read() {
72 Ok(data) => data
73 .id_lists
74 .iter()
75 .map(|(key, list)| (key.clone(), list.metadata.clone()))
76 .collect(),
77 Err(e) => {
78 log_e!(TAG, "Failed to acquire read lock: {}", e);
79 HashMap::new()
80 }
81 }
82 }
83
84 fn did_receive_id_list_updates(
85 &self,
86 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
87 ) {
88 let mut data = match self.data.write() {
89 Ok(data) => data,
90 Err(e) => {
91 log_e!(TAG, "Failed to acquire write lock: {}", e);
92 return;
93 }
94 };
95
96 data.id_lists.retain(|name, _| updates.contains_key(name));
98
99 for (list_name, update) in updates {
100 if let Some(entry) = data.id_lists.get_mut(&list_name) {
101 entry.apply_update(&update);
103 } else {
104 let mut list = IdList::new(update.new_metadata.clone());
106 list.apply_update(&update);
107 data.id_lists.insert(list_name, list);
108 }
109 }
110 }
111}
112
113impl Default for SpecStore {
114 fn default() -> Self {
115 let sdk_key = String::new();
116 Self::new(&sdk_key, sdk_key.to_string(), None, None)
117 }
118}
119
120impl SpecStore {
121 #[must_use]
122 pub fn new(
123 sdk_key: &str,
124 hashed_sdk_key: String,
125 data_store: Option<Arc<dyn DataStoreTrait>>,
126 statsig_runtime: Option<Arc<StatsigRuntime>>,
127 ) -> SpecStore {
128 SpecStore {
129 hashed_sdk_key,
130 data: Arc::new(RwLock::new(SpecStoreData {
131 values: SpecsResponseFull::blank(),
132 time_received_at: None,
133 source: SpecsSource::Uninitialized,
134 decompression_dict: None,
135 id_lists: HashMap::new(),
136 })),
137 data_store,
138 statsig_runtime,
139 ops_stats: OPS_STATS.get_for_instance(sdk_key),
140 global_configs: GlobalConfigs::get_instance(sdk_key),
141 }
142 }
143
144 pub fn set_source(&self, source: SpecsSource) {
145 if let Ok(mut mut_values) = self.data.write() {
146 mut_values.source = source;
147 log_d!(TAG, "SpecStore - Source Changed ({:?})", mut_values.source);
148 }
149 }
150
151 pub fn set_values(&self, values: SpecsUpdate) -> Result<(), StatsigErr> {
152 let parsed = serde_json::from_str::<SpecsResponse>(&values.data);
153 let dcs = match parsed {
154 Ok(SpecsResponse::Full(full)) => {
155 if !full.has_updates {
156 self.log_no_update(values.source);
157 return Ok(());
158 }
159
160 log_d!(
161 TAG,
162 "SpecStore Full Update: {} - [gates({}), configs({}), layers({})]",
163 full.time,
164 full.feature_gates.len(),
165 full.dynamic_configs.len(),
166 full.layer_configs.len(),
167 );
168
169 full
170 }
171 Ok(SpecsResponse::NoUpdates(no_updates)) => {
172 if !no_updates.has_updates {
173 self.log_no_update(values.source);
174 return Ok(());
175 }
176 log_error_to_statsig_and_console!(
177 self.ops_stats,
178 TAG,
179 "Empty response with has_updates = true {:?}",
180 values.source
181 );
182 return Err(StatsigErr::JsonParseError(
183 "SpecsResponse".to_owned(),
184 "Parse failure. 'has_update' is true, but failed to deserialize to response format 'dcs-v2'".to_owned(),
185 ));
186 }
187 Err(e) => {
188 log_error_to_statsig_and_console!(
190 self.ops_stats,
191 TAG,
192 "{:?}, {:?}",
193 e,
194 values.source
195 );
196 return Err(StatsigErr::JsonParseError(
197 "config_spec".to_string(),
198 e.to_string(),
199 ));
200 }
201 };
202
203 if let Some(diagnostics) = &dcs.diagnostics {
204 self.global_configs
205 .set_diagnostics_sampling_rates(diagnostics.clone());
206 }
207 if let Some(sdk_configs) = &dcs.sdk_configs {
208 self.global_configs.set_sdk_configs(sdk_configs.clone());
209 }
210
211 if let Ok(mut mut_values) = self.data.write() {
212 let cached_time_is_newer =
213 mut_values.values.time > 0 && mut_values.values.time > dcs.time;
214 let checksums_match =
215 mut_values
216 .values
217 .checksum
218 .as_ref()
219 .is_some_and(|cached_checksum| {
220 dcs.checksum
221 .as_ref()
222 .is_some_and(|new_checksum| cached_checksum == new_checksum)
223 });
224
225 if cached_time_is_newer || checksums_match {
226 log_d!(
227 TAG,
228 "SpecStore - Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
229 dcs.time,
230 dcs.checksum.unwrap_or(String::new()),
231 mut_values.values.time,
232 mut_values.values.checksum.clone().unwrap_or(String::new()),
233 );
234 return Ok(());
235 }
236 let curr_time = Some(Utc::now().timestamp_millis() as u64);
237 let prev_source = mut_values.source.clone();
238 mut_values.values = *dcs;
239 mut_values.time_received_at = curr_time;
240 mut_values.source = values.source.clone();
241 if self.data_store.is_some() && mut_values.source == SpecsSource::Network {
242 if let Some(data_store) = self.data_store.clone() {
243 let hashed_key = self.hashed_sdk_key.clone();
244 self.statsig_runtime.clone().map(|rt| {
245 let copy = curr_time;
246 rt.spawn("update data adapter", move |_| async move {
247 let _ = data_store
248 .set(&get_data_adapter_dcs_key(&hashed_key), &values.data, copy)
249 .await;
250 })
251 });
252 }
253 }
254 self.log_processing_config(
255 mut_values.values.time,
256 mut_values.source.clone(),
257 prev_source,
258 );
259 }
260
261 Ok(())
262 }
263
264 fn log_processing_config(&self, lcut: u64, source: SpecsSource, prev_source: SpecsSource) {
265 let delay = Utc::now().timestamp_millis() as u64 - lcut;
266 log_d!(TAG, "SpecStore - Updated ({:?})", source);
267
268 if prev_source != SpecsSource::Uninitialized && prev_source != SpecsSource::Loading {
269 self.ops_stats.log(ObservabilityEvent::new_event(
270 MetricType::Dist,
271 "config_propogation_diff".to_string(),
272 delay as f64,
273 Some(HashMap::from([("source".to_string(), source.to_string())])),
274 ));
275 }
276 }
277
278 fn log_no_update(&self, source: SpecsSource) {
279 log_d!(TAG, "SpecStore - No Updates");
280 self.ops_stats.log(ObservabilityEvent::new_event(
281 MetricType::Increment,
282 "config_no_update".to_string(),
283 1.0,
284 Some(HashMap::from([("source".to_string(), source.to_string())])),
285 ));
286 }
287}
288
289impl SpecsResponseFull {
290 fn blank() -> Self {
291 SpecsResponseFull {
292 feature_gates: Default::default(),
293 dynamic_configs: Default::default(),
294 layer_configs: Default::default(),
295 condition_map: Default::default(),
296 experiment_to_layer: Default::default(),
297 has_updates: true,
298 time: 0,
299 checksum: None,
300 default_environment: None,
301 app_id: None,
302 sdk_keys_to_app_ids: None,
303 hashed_sdk_keys_to_app_ids: None,
304 diagnostics: None,
305 param_stores: None,
306 sdk_configs: None,
307 cmab_configs: None,
308 overrides: None,
309 override_rules: None,
310 }
311 }
312}