1use crate::data_store_interface::{get_data_adapter_dcs_key, DataStoreTrait};
2use crate::id_lists_adapter::{IdList, IdListsUpdateListener};
3use crate::observability::observability_client_adapter::{MetricType, ObservabilityEvent};
4use crate::observability::ops_stats::{OpsStatsForInstance, OPS_STATS};
5use crate::observability::sdk_errors_observer::ErrorBoundaryEvent;
6use crate::sdk_diagnostics::diagnostics::Diagnostics;
7use crate::spec_types::{SpecsResponse, SpecsResponseFull};
8use crate::{
9 log_d, log_e, log_error_to_statsig_and_console, DynamicValue, SpecsInfo, SpecsSource,
10 SpecsUpdate, SpecsUpdateListener, StatsigErr, StatsigRuntime,
11};
12use chrono::Utc;
13use serde::Serialize;
14use std::collections::HashMap;
15use std::sync::{Arc, RwLock};
16
17#[derive(Clone, Serialize)]
18pub struct SpecStoreData {
19 pub source: SpecsSource,
20 pub time_received_at: Option<u64>,
21 pub values: SpecsResponseFull,
22
23 pub id_lists: HashMap<String, IdList>,
24}
25
/// Tag passed to the logging macros for every message emitted by this file.
const TAG: &str = "SpecStore";
27
28pub struct SpecStore {
29 pub hashed_sdk_key: String,
30 pub data: Arc<RwLock<SpecStoreData>>,
31 pub data_store: Option<Arc<dyn DataStoreTrait>>,
32 pub statsig_runtime: Option<Arc<StatsigRuntime>>,
33 diagnostics: Option<Arc<Diagnostics>>,
34 ops_stats: Arc<OpsStatsForInstance>,
35}
36
37impl SpecsUpdateListener for SpecStore {
38 fn did_receive_specs_update(&self, update: SpecsUpdate) -> Result<(), StatsigErr> {
39 self.set_values(update)
40 }
41
42 fn get_current_specs_info(&self) -> SpecsInfo {
43 match self.data.read() {
44 Ok(data) => SpecsInfo {
45 lcut: Some(data.values.time),
46 checksum: data.values.checksum.clone(),
47 source: data.source.clone(),
48 },
49 Err(e) => {
50 log_e!(TAG, "Failed to acquire read lock: {}", e);
51 SpecsInfo {
52 lcut: None,
53 checksum: None,
54 source: SpecsSource::Error,
55 }
56 }
57 }
58 }
59}
60
61impl IdListsUpdateListener for SpecStore {
62 fn get_current_id_list_metadata(
63 &self,
64 ) -> HashMap<String, crate::id_lists_adapter::IdListMetadata> {
65 match self.data.read() {
66 Ok(data) => data
67 .id_lists
68 .iter()
69 .map(|(key, list)| (key.clone(), list.metadata.clone()))
70 .collect(),
71 Err(e) => {
72 log_e!(TAG, "Failed to acquire read lock: {}", e);
73 HashMap::new()
74 }
75 }
76 }
77
78 fn did_receive_id_list_updates(
79 &self,
80 updates: HashMap<String, crate::id_lists_adapter::IdListUpdate>,
81 ) {
82 let mut data = match self.data.write() {
83 Ok(data) => data,
84 Err(e) => {
85 log_e!(TAG, "Failed to acquire write lock: {}", e);
86 return;
87 }
88 };
89
90 data.id_lists.retain(|name, _| updates.contains_key(name));
92
93 for (list_name, update) in updates {
94 if let Some(entry) = data.id_lists.get_mut(&list_name) {
95 entry.apply_update(&update);
97 } else {
98 let mut list = IdList::new(update.new_metadata.clone());
100 list.apply_update(&update);
101 data.id_lists.insert(list_name, list);
102 }
103 }
104 }
105}
106
107impl Default for SpecStore {
108 fn default() -> Self {
109 let sdk_key = String::new();
110 Self::new(&sdk_key, sdk_key.to_string(), None, None, None)
111 }
112}
113
114impl SpecStore {
115 #[must_use]
116 pub fn new(
117 sdk_key: &str,
118 hashed_sdk_key: String,
119 data_store: Option<Arc<dyn DataStoreTrait>>,
120 statsig_runtime: Option<Arc<StatsigRuntime>>,
121 diagnostics: Option<Arc<Diagnostics>>,
122 ) -> SpecStore {
123 SpecStore {
124 hashed_sdk_key,
125 data: Arc::new(RwLock::new(SpecStoreData {
126 values: SpecsResponseFull::blank(),
127 time_received_at: None,
128 source: SpecsSource::Uninitialized,
129 id_lists: HashMap::new(),
130 })),
131 data_store,
132 statsig_runtime,
133 ops_stats: OPS_STATS.get_for_instance(sdk_key),
134 diagnostics,
135 }
136 }
137
138 pub fn set_source(&self, source: SpecsSource) {
139 if let Ok(mut mut_values) = self.data.write() {
140 mut_values.source = source;
141 log_d!(TAG, "SpecStore - Source Changed ({:?})", mut_values.source);
142 }
143 }
144
145 pub fn set_values(&self, values: SpecsUpdate) -> Result<(), StatsigErr> {
146 let parsed = serde_json::from_str::<SpecsResponse>(&values.data);
147 let dcs = match parsed {
148 Ok(SpecsResponse::Full(full)) => {
149 if !full.has_updates {
150 self.log_no_update(values.source);
151 return Ok(());
152 }
153
154 log_d!(
155 TAG,
156 "SpecStore Full Update: {} - [gates({}), configs({}), layers({})]",
157 full.time,
158 full.feature_gates.len(),
159 full.dynamic_configs.len(),
160 full.layer_configs.len(),
161 );
162
163 full
164 }
165 Ok(SpecsResponse::NoUpdates(no_updates)) => {
166 if !no_updates.has_updates {
167 self.log_no_update(values.source);
168 return Ok(());
169 }
170 log_error_to_statsig_and_console!(
171 self.ops_stats,
172 TAG,
173 "Empty response with has_updates = true {:?}",
174 values.source
175 );
176 return Err(StatsigErr::JsonParseError(
177 "SpecsResponse".to_owned(),
178 "Parse failure. 'has_update' is true, but failed to deserialize to response format 'dcs-v2'".to_owned(),
179 ));
180 }
181 Err(e) => {
182 log_error_to_statsig_and_console!(
184 self.ops_stats,
185 TAG,
186 "{:?}, {:?}",
187 e,
188 values.source
189 );
190 return Err(StatsigErr::JsonParseError(
191 "config_spec".to_string(),
192 e.to_string(),
193 ));
194 }
195 };
196
197 if let Some(ref diagnostics) = dcs.diagnostics {
198 if self.diagnostics.is_some() {
199 if let Some(diagnostics_instance) = &self.diagnostics {
200 diagnostics_instance.set_sampling_rate(diagnostics.clone());
201 }
202 }
203 }
204
205 if let Ok(mut mut_values) = self.data.write() {
206 let cached_time_is_newer =
207 mut_values.values.time > 0 && mut_values.values.time > dcs.time;
208 let checksums_match =
209 mut_values
210 .values
211 .checksum
212 .as_ref()
213 .is_some_and(|cached_checksum| {
214 dcs.checksum
215 .as_ref()
216 .is_some_and(|new_checksum| cached_checksum == new_checksum)
217 });
218
219 if cached_time_is_newer || checksums_match {
220 log_d!(
221 TAG,
222 "SpecStore - Received values for [time: {}, checksum: {}], but currently has values for [time: {}, checksum: {}]. Ignoring values.",
223 dcs.time,
224 dcs.checksum.unwrap_or(String::new()),
225 mut_values.values.time,
226 mut_values.values.checksum.clone().unwrap_or(String::new()),
227 );
228 return Ok(());
229 }
230 let curr_time = Some(Utc::now().timestamp_millis() as u64);
231 let prev_source = mut_values.source.clone();
232 mut_values.values = *dcs;
233 mut_values.time_received_at = curr_time;
234 mut_values.source = values.source.clone();
235 if self.data_store.is_some() && mut_values.source == SpecsSource::Network {
236 if let Some(data_store) = self.data_store.clone() {
237 let hashed_key = self.hashed_sdk_key.clone();
238 self.statsig_runtime.clone().map(|rt| {
239 let copy = curr_time;
240 rt.spawn("update data adapter", move |_| async move {
241 let _ = data_store
242 .set(&get_data_adapter_dcs_key(&hashed_key), &values.data, copy)
243 .await;
244 })
245 });
246 }
247 }
248 self.log_processing_config(
249 mut_values.values.time,
250 mut_values.source.clone(),
251 prev_source,
252 );
253 }
254
255 Ok(())
256 }
257
258 #[must_use]
259 pub fn get_sdk_config_value(&self, key: &str) -> Option<DynamicValue> {
260 match self.data.read() {
261 Ok(data) => match &data.values.sdk_configs {
262 Some(sdk_configs) => sdk_configs.get(key).cloned(),
263 None => {
264 log_d!(TAG, "SDK Configs not found");
265 None
266 }
267 },
268 Err(e) => {
269 log_e!(TAG, "Failed to acquire read lock: {}", e);
270 None
271 }
272 }
273 }
274
275 fn log_processing_config(&self, lcut: u64, source: SpecsSource, prev_source: SpecsSource) {
276 let delay = Utc::now().timestamp_millis() as u64 - lcut;
277 log_d!(TAG, "SpecStore - Updated ({:?})", source);
278
279 if prev_source != SpecsSource::Uninitialized && prev_source != SpecsSource::Loading {
280 self.ops_stats.log(ObservabilityEvent::new_event(
281 MetricType::Dist,
282 "config_propogation_diff".to_string(),
283 delay as f64,
284 Some(HashMap::from([("source".to_string(), source.to_string())])),
285 ));
286 }
287 }
288
289 fn log_no_update(&self, source: SpecsSource) {
290 log_d!(TAG, "SpecStore - No Updates");
291 self.ops_stats.log(ObservabilityEvent::new_event(
292 MetricType::Increment,
293 "config_no_update".to_string(),
294 1.0,
295 Some(HashMap::from([("source".to_string(), source.to_string())])),
296 ));
297 }
298}
299
300impl SpecsResponseFull {
301 fn blank() -> Self {
302 SpecsResponseFull {
303 feature_gates: Default::default(),
304 dynamic_configs: Default::default(),
305 layer_configs: Default::default(),
306 condition_map: Default::default(),
307 experiment_to_layer: Default::default(),
308 has_updates: true,
309 time: 0,
310 checksum: None,
311 default_environment: None,
312 app_id: None,
313 sdk_keys_to_app_ids: None,
314 hashed_sdk_keys_to_app_ids: None,
315 diagnostics: None,
316 param_stores: None,
317 sdk_configs: None,
318 cmab_configs: None,
319 overrides: None,
320 override_rules: None,
321 }
322 }
323}