1use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
2use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
3use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
4use crate::observability::ops_stats::OpsStatsForInstance;
5use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
6use crate::spec_types::{SpecsResponse, SpecsResponseFull};
7use crate::{
8 log_d, log_e, log_error_to_statsig_and_console, DynamicValue, SpecsInfo, SpecsSource,
9 SpecsUpdate, SpecsUpdateListener, StatsigErr, StatsigRuntime,
10};
11use chrono::Utc;
12use serde::Serialize;
13use std::collections::HashMap;
14use std::sync::{Arc, RwLock};
15
16#[derive(Clone, Serialize)]
17pub struct SpecStoreData {
18 pub source: SpecsSource,
19 pub time_received_at: Option<u64>,
20 pub values: SpecsResponseFull,
21
22 pub id_lists: HashMap<String, IdList>,
23}
24
25const TAG: &str = stringify!(SpecStore);
26
27pub struct SpecStore {
28 pub hashed_sdk_key: String,
29 pub data: Arc<RwLock<SpecStoreData>>,
30 pub data_store: Option<Arc<dyn DataStoreTrait>>,
31 pub statsig_runtime: Option<Arc<StatsigRuntime>>,
32 ops_stats: Arc<OpsStatsForInstance>,
33}
34
35impl SpecsUpdateListener for SpecStore {
36 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
37 self.set_values(update)
38 }
39
40 fn get_current_specs_info(&self) -> SpecsInfo {
41 match self.data.read() {
42 Ok(data) => SpecsInfo {
43 lcut: Some(data.values.time),
44 checksum: data.values.checksum.clone(),
45 source: data.source.clone(),
46 },
47 Err(e) => {
48 log_e!(TAG, "Failed to acquire read lock: {}", e);
49 SpecsInfo {
50 lcut: None,
51 checksum: None,
52 source: SpecsSource::Error,
53 }
54 }
55 }
56 }
57}
58
59impl IdListsUpdateListener for SpecStore {
60 fn get_current_id_list_metadata(
61 &self,
62 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
63 match self.data.read() {
64 Ok(data) => data
65 .id_lists
66 .iter()
67 .map(|(key, list)| (key.clone(), list.metadata.clone()))
68 .collect(),
69 Err(e) => {
70 log_e!(TAG, "Failed to acquire read lock: {}", e);
71 HashMap::new()
72 }
73 }
74 }
75
76 fn did_receive_id_list_updates(
77 &self,
78 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
79 ) {
80 let mut data = match self.data.write() {
81 Ok(data) => data,
82 Err(e) => {
83 log_e!(TAG, "Failed to acquire write lock: {}", e);
84 return;
85 }
86 };
87
88 data.id_lists.retain(|name, _| updates.contains_key(name));
90
91 for (list_name, update) in updates {
92 if let Some(entry) = data.id_lists.get_mut(&list_name) {
93 entry.apply_update(&update);
95 } else {
96 let mut list = IdList::new(update.new_metadata.clone());
98 list.apply_update(&update);
99 data.id_lists.insert(list_name, list);
100 }
101 }
102 }
103}
104
105impl Default for SpecStore {
106 fn default() -> Self {
107 Self::new(
108 String::new(),
109 None,
110 None,
111 Arc::new(OpsStatsForInstance::new()),
112 )
113 }
114}
115
116impl SpecStore {
117 #[must_use]
118 pub fn new(
119 hashed_sdk_key: String,
120 data_store: Option<Arc<dyn DataStoreTrait>>,
121 statsig_runtime: Option<Arc<StatsigRuntime>>,
122 ops_stats: Arc<OpsStatsForInstance>,
123 ) -> SpecStore {
124 SpecStore {
125 hashed_sdk_key,
126 data: Arc::new(RwLock::new(SpecStoreData {
127 values: SpecsResponseFull::blank(),
128 time_received_at: None,
129 source: SpecsSource::Uninitialized,
130 id_lists: HashMap::new(),
131 })),
132 data_store,
133 statsig_runtime,
134 ops_stats,
135 }
136 }
137
138 pub fn set_source(&self, source: SpecsSource) {
139 if let Ok(mut mut_values) = self.data.write() {
140 mut_values.source = source;
141 log_d!(TAG, "SpecStore - Source Changed ({:?})", mut_values.source);
142 }
143 }
144
145 pub fn set_values(&self, values: SpecsUpdate) -> Result<(), StatsigErr> {
146 let parsed = serde_json::from_str::<SpecsResponse>(&values.data);
147 let dcs = match parsed {
148 Ok(SpecsResponse::Full(full)) => {
149 if !full.has_updates {
150 self.log_no_update(values.source);
151 return Ok(());
152 }
153
154 log_d!(
155 TAG,
156 "SpecStore Full Update: {} - [gates({}), configs({}), layers({})]",
157 full.time,
158 full.feature_gates.len(),
159 full.dynamic_configs.len(),
160 full.layer_configs.len(),
161 );
162
163 full
164 }
165 Ok(SpecsResponse::NoUpdates(no_updates)) => {
166 if !no_updates.has_updates {
167 self.log_no_update(values.source);
168 return Ok(());
169 }
170 log_error_to_statsig_and_console!(
171 self.ops_stats,
172 TAG,
173 "Empty response with has_updates = true {:?}",
174 values.source
175 );
176 return Err(StatsigErr::JsonParseError(
177 "SpecsResponse".to_owned(),
178 "Parse failure. 'has_update' is true, but failed to deserialize to response format 'dcs-v2'".to_owned(),
179 ));
180 }
181 Err(e) => {
182 log_error_to_statsig_and_console!(
184 self.ops_stats,
185 TAG,
186 "{:?}, {:?}",
187 e,
188 values.source
189 );
190 return Err(StatsigErr::JsonParseError(
191 "config_spec".to_string(),
192 e.to_string(),
193 ));
194 }
195 };
196 if let Ok(mut mut_values) = self.data.write() {
197 let cached_time_is_newer =
198 mut_values.values.time > 0 && mut_values.values.time > dcs.time;
199 let checksums_match =
200 mut_values
201 .values
202 .checksum
203 .as_ref()
204 .is_some_and(|cached_checksum| {
205 dcs.checksum
206 .as_ref()
207 .is_some_and(|new_checksum| cached_checksum == new_checksum)
208 });
209
210 if cached_time_is_newer || checksums_match {
211 log_d!(
212 TAG,
213 "SpecStore - Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
214 dcs.time,
215 dcs.checksum.unwrap_or(String::new()),
216 mut_values.values.time,
217 mut_values.values.checksum.clone().unwrap_or(String::new()),
218 );
219 return Ok(());
220 }
221 let curr_time = Some(Utc::now().timestamp_millis() as u64);
222 let prev_source = mut_values.source.clone();
223 mut_values.values = *dcs;
224 mut_values.time_received_at = curr_time;
225 mut_values.source = values.source.clone();
226 if self.data_store.is_some() && mut_values.source == SpecsSource::Network {
227 if let Some(data_store) = self.data_store.clone() {
228 let hashed_key = self.hashed_sdk_key.clone();
229 self.statsig_runtime.clone().map(|rt| {
230 let copy = curr_time;
231 rt.spawn("update data adapter", move |_| async move {
232 let _ = data_store
233 .set(&get_data_adapter_dcs_key(&hashed_key), &values.data, copy)
234 .await;
235 })
236 });
237 }
238 }
239 self.log_processing_config(
240 mut_values.values.time,
241 mut_values.source.clone(),
242 prev_source,
243 );
244 }
245
246 Ok(())
247 }
248
249 #[must_use]
250 pub fn get_sdk_config_value(&self, key: &str) -> Option<DynamicValue> {
251 match self.data.read() {
252 Ok(data) => match &data.values.sdk_configs {
253 Some(sdk_configs) => sdk_configs.get(key).cloned(),
254 None => {
255 log_d!(TAG, "SDK Configs not found");
256 None
257 }
258 },
259 Err(e) => {
260 log_e!(TAG, "Failed to acquire read lock: {}", e);
261 None
262 }
263 }
264 }
265
266 fn log_processing_config(&self, lcut: u64, source: SpecsSource, prev_source: SpecsSource) {
267 let delay = Utc::now().timestamp_millis() as u64 - lcut;
268 log_d!(TAG, "SpecStore - Updated ({:?})", source);
269
270 if prev_source != SpecsSource::Uninitialized && prev_source != SpecsSource::Loading {
271 self.ops_stats.log(ObservabilityEvent::new_event(
272 MetricType::Dist,
273 "config_propogation_diff".to_string(),
274 delay as f64,
275 Some(HashMap::from([("source".to_string(), source.to_string())])),
276 ));
277 }
278 }
279
280 fn log_no_update(&self, source: SpecsSource) {
281 log_d!(TAG, "SpecStore - No Updates");
282 self.ops_stats.log(ObservabilityEvent::new_event(
283 MetricType::Increment,
284 "config_no_update".to_string(),
285 1.0,
286 Some(HashMap::from([("source".to_string(), source.to_string())])),
287 ));
288 }
289}
290
291impl SpecsResponseFull {
292 fn blank() -> Self {
293 SpecsResponseFull {
294 feature_gates: Default::default(),
295 dynamic_configs: Default::default(),
296 layer_configs: Default::default(),
297 condition_map: Default::default(),
298 experiment_to_layer: Default::default(),
299 has_updates: true,
300 time: 0,
301 checksum: None,
302 default_environment: None,
303 app_id: None,
304 sdk_keys_to_app_ids: None,
305 hashed_sdk_keys_to_app_ids: None,
306 diagnostics: None,
307 param_stores: None,
308 sdk_configs: None,
309 cmab_configs: None,
310 overrides: None,
311 override_rules: None,
312 }
313 }
314}