1use std::collections::HashMap;
17use std::sync::Arc;
18
19use launchdarkly_server_sdk_evaluation::{Flag, FlagBuilder, Segment};
20use parking_lot::{Mutex, RwLock};
21use tokio::sync::broadcast;
22
23use crate::data_source::DataSource;
24use crate::data_source_builders::{BuildError, DataSourceFactory};
25use crate::service_endpoints;
26use crate::stores::store::DataStore;
27use crate::stores::store_types::{AllData, PatchTarget, StorageItem};
28
29enum FlagOrigin {
31 Builder(FlagBuilder),
32 Preconfigured,
33}
34
35struct TestDataInner {
36 flag_origins: HashMap<String, FlagOrigin>,
37 current_flags: HashMap<String, Flag>,
38 flag_versions: HashMap<String, u64>,
39 current_segments: HashMap<String, Segment>,
40 segment_versions: HashMap<String, u64>,
41 instances: Vec<Arc<RwLock<dyn DataStore>>>,
42}
43
44#[derive(Clone)]
56pub struct TestData {
57 inner: Arc<Mutex<TestDataInner>>,
58}
59
60impl TestData {
61 pub fn new() -> Self {
63 Self {
64 inner: Arc::new(Mutex::new(TestDataInner {
65 flag_origins: HashMap::new(),
66 current_flags: HashMap::new(),
67 flag_versions: HashMap::new(),
68 current_segments: HashMap::new(),
69 segment_versions: HashMap::new(),
70 instances: Vec::new(),
71 })),
72 }
73 }
74
75 pub fn flag(&self, key: &str) -> FlagBuilder {
83 let inner = self.inner.lock();
84 match inner.flag_origins.get(key) {
85 Some(FlagOrigin::Builder(builder)) => builder.clone(),
86 _ => FlagBuilder::new(key),
87 }
88 }
89
90 pub fn update(&self, builder: FlagBuilder) {
95 let mut inner = self.inner.lock();
96
97 let key = builder.key().to_owned();
98 let stored_builder = builder.clone();
99 let mut flag = builder.build();
100
101 let version = inner.flag_versions.entry(key.clone()).or_insert(0);
102 *version += 1;
103 flag.version = *version;
104
105 inner
106 .flag_origins
107 .insert(key.clone(), FlagOrigin::Builder(stored_builder));
108 inner.current_flags.insert(key.clone(), flag.clone());
109
110 for store in &inner.instances {
111 let mut store = store.write();
112 let _ = store.upsert(&key, PatchTarget::Flag(StorageItem::Item(flag.clone())));
113 }
114 }
115
116 pub fn use_preconfigured_flag(&self, mut flag: Flag) {
124 let mut inner = self.inner.lock();
125
126 let key = flag.key.clone();
127 let version = inner.flag_versions.entry(key.clone()).or_insert(0);
128 *version += 1;
129 flag.version = *version;
130
131 inner
132 .flag_origins
133 .insert(key.clone(), FlagOrigin::Preconfigured);
134 inner.current_flags.insert(key.clone(), flag.clone());
135
136 for store in &inner.instances {
137 let mut store = store.write();
138 let _ = store.upsert(&key, PatchTarget::Flag(StorageItem::Item(flag.clone())));
139 }
140 }
141
142 pub fn use_preconfigured_segment(&self, mut segment: Segment) {
146 let mut inner = self.inner.lock();
147
148 let key = segment.key.clone();
149 let version = inner.segment_versions.entry(key.clone()).or_insert(0);
150 *version += 1;
151 segment.version = *version;
152
153 inner.current_segments.insert(key.clone(), segment.clone());
154
155 for store in &inner.instances {
156 let mut store = store.write();
157 let _ = store.upsert(
158 &key,
159 PatchTarget::Segment(StorageItem::Item(segment.clone())),
160 );
161 }
162 }
163}
164
165impl Default for TestData {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171impl DataSourceFactory for TestData {
172 fn build(
173 &self,
174 _endpoints: &service_endpoints::ServiceEndpoints,
175 _sdk_key: &str,
176 _tags: Option<String>,
177 ) -> Result<Arc<dyn DataSource>, BuildError> {
178 Ok(Arc::new(TestDataSource {
179 inner: self.inner.clone(),
180 }))
181 }
182
183 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
184 Box::new(self.clone())
185 }
186}
187
188struct TestDataSource {
189 inner: Arc<Mutex<TestDataInner>>,
190}
191
192impl DataSource for TestDataSource {
193 fn subscribe(
194 &self,
195 data_store: Arc<RwLock<dyn DataStore>>,
196 init_complete: Arc<dyn Fn(bool) + Send + Sync>,
197 shutdown_receiver: broadcast::Receiver<()>,
198 ) {
199 let mut inner = self.inner.lock();
200
201 let all_data = AllData {
203 flags: inner.current_flags.clone(),
204 segments: inner.current_segments.clone(),
205 };
206
207 {
208 let mut store = data_store.write();
209 store.init(all_data);
210 }
211
212 inner.instances.push(data_store.clone());
214
215 (init_complete)(true);
216
217 let inner_ref = self.inner.clone();
219 let store_ref = data_store.clone();
220 tokio::spawn(async move {
221 let mut shutdown = shutdown_receiver;
222 let _ = shutdown.recv().await;
223 let mut inner = inner_ref.lock();
224 inner.instances.retain(|s| !Arc::ptr_eq(s, &store_ref));
225 });
226 }
227}
228
#[cfg(test)]
mod tests {
    //! Unit tests for `TestData` and the data source it builds.

    use super::*;
    use crate::stores::store::InMemoryDataStore;
    use launchdarkly_server_sdk_evaluation::FlagBuilder;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::{Duration, Instant};

    /// Creates an empty in-memory store behind the trait-object type `subscribe` expects.
    fn make_store() -> Arc<RwLock<dyn DataStore>> {
        Arc::new(RwLock::new(InMemoryDataStore::new()))
    }

    /// Builds a data source from `td`, subscribes `store` to it, and returns the
    /// shutdown sender. Callers must keep the sender alive for as long as the store
    /// should stay registered: dropping it closes the channel, which also ends the
    /// subscription.
    fn subscribe_store(td: &TestData, store: &Arc<RwLock<dyn DataStore>>) -> broadcast::Sender<()> {
        let factory: &dyn DataSourceFactory = td;
        let endpoints = crate::ServiceEndpointsBuilder::new().build().unwrap();
        let ds = factory.build(&endpoints, "fake-key", None).unwrap();

        let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
        ds.subscribe(store.clone(), Arc::new(|_| {}), shutdown_rx);
        shutdown_tx
    }

    #[test]
    fn flag_returns_default_builder_for_unknown_key() {
        let td = TestData::new();
        let builder = td.flag("unknown");
        assert_eq!(builder.key(), "unknown");
        let flag = builder.build();
        assert_eq!(flag.key, "unknown");
    }

    #[test]
    fn flag_returns_cloned_builder_after_update() {
        let td = TestData::new();
        td.update(FlagBuilder::new("my-flag").variation_for_all(false));
        let builder = td.flag("my-flag");
        assert_eq!(builder.key(), "my-flag");
    }

    #[test]
    fn flag_returns_default_builder_for_preconfigured_flag() {
        let td = TestData::new();
        let flag = FlagBuilder::new("preconf").build();
        td.use_preconfigured_flag(flag);
        let builder = td.flag("preconf");
        assert_eq!(builder.key(), "preconf");
    }

    #[test]
    fn update_increments_version_each_call() {
        let td = TestData::new();
        td.update(FlagBuilder::new("my-flag"));
        td.update(FlagBuilder::new("my-flag"));
        td.update(FlagBuilder::new("my-flag"));

        let inner = td.inner.lock();
        let flag = inner.current_flags.get("my-flag").unwrap();
        assert_eq!(flag.version, 3);
    }

    #[test]
    fn use_preconfigured_flag_increments_version_each_call() {
        let td = TestData::new();
        td.use_preconfigured_flag(FlagBuilder::new("my-flag").build());
        td.use_preconfigured_flag(FlagBuilder::new("my-flag").build());
        td.use_preconfigured_flag(FlagBuilder::new("my-flag").build());

        let inner = td.inner.lock();
        let flag = inner.current_flags.get("my-flag").unwrap();
        assert_eq!(flag.version, 3);
    }

    #[test]
    fn flag_builder_increments_version_each_call() {
        let td = TestData::new();
        td.update(td.flag("my-flag").variation_for_all(true));
        td.update(td.flag("my-flag").variation_for_all(false));
        td.update(td.flag("my-flag").variation_for_all(true));

        let inner = td.inner.lock();
        let flag = inner.current_flags.get("my-flag").unwrap();
        assert_eq!(flag.version, 3);
    }

    #[tokio::test]
    async fn update_propagates_to_connected_store() {
        let td = TestData::new();
        let store = make_store();
        let _shutdown = subscribe_store(&td, &store);

        td.update(FlagBuilder::new("my-flag").variation_for_all(true));

        let s = store.read();
        let flag = s.flag("my-flag").unwrap();
        assert_eq!(flag.key, "my-flag");
    }

    #[tokio::test]
    async fn subscribe_initializes_store_with_all_current_data() {
        let td = TestData::new();
        td.update(FlagBuilder::new("flag-1").variation_for_all(true));
        td.update(FlagBuilder::new("flag-2").variation_for_all(false));

        let store = make_store();
        let _shutdown = subscribe_store(&td, &store);

        let s = store.read();
        assert!(s.flag("flag-1").is_some());
        assert!(s.flag("flag-2").is_some());
    }

    #[tokio::test]
    async fn subscribe_calls_init_complete_true() {
        let td = TestData::new();
        let store = make_store();

        let factory: &dyn DataSourceFactory = &td;
        let endpoints = crate::ServiceEndpointsBuilder::new().build().unwrap();
        let ds = factory.build(&endpoints, "fake-key", None).unwrap();

        let initialized = Arc::new(AtomicBool::new(false));
        let init_clone = initialized.clone();
        let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);

        ds.subscribe(
            store,
            Arc::new(move |success| init_clone.store(success, Ordering::SeqCst)),
            shutdown_rx,
        );

        // The callback is invoked synchronously inside `subscribe`.
        assert!(initialized.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn multiple_stores_receive_updates() {
        let td = TestData::new();
        let store1 = make_store();
        let store2 = make_store();
        let _shutdown1 = subscribe_store(&td, &store1);
        let _shutdown2 = subscribe_store(&td, &store2);

        td.update(FlagBuilder::new("shared-flag"));

        assert!(store1.read().flag("shared-flag").is_some());
        assert!(store2.read().flag("shared-flag").is_some());
    }

    #[tokio::test]
    async fn use_preconfigured_flag_propagates() {
        let td = TestData::new();
        let store = make_store();
        let _shutdown = subscribe_store(&td, &store);

        let flag = FlagBuilder::new("preconf").variation_for_all(true).build();
        td.use_preconfigured_flag(flag);

        let s = store.read();
        let stored_flag = s.flag("preconf").unwrap();
        assert_eq!(stored_flag.version, 1);
    }

    #[tokio::test]
    async fn use_preconfigured_segment_propagates() {
        let td = TestData::new();
        let store = make_store();
        let _shutdown = subscribe_store(&td, &store);

        let segment: Segment = serde_json::from_str(
            r#"{
                "key": "seg-1",
                "included": ["alice"],
                "excluded": [],
                "rules": [],
                "salt": "salty",
                "version": 999
            }"#,
        )
        .unwrap();

        td.use_preconfigured_segment(segment);

        let s = store.read();
        let stored = s.segment("seg-1").unwrap();
        // The supplied version (999) is replaced by the internally tracked counter.
        assert_eq!(stored.version, 1);
    }

    #[test]
    fn version_counters_are_independent_per_flag() {
        let td = TestData::new();
        td.update(FlagBuilder::new("a"));
        td.update(FlagBuilder::new("a"));
        td.update(FlagBuilder::new("b"));

        let inner = td.inner.lock();
        assert_eq!(inner.current_flags.get("a").unwrap().version, 2);
        assert_eq!(inner.current_flags.get("b").unwrap().version, 1);
    }

    #[tokio::test]
    async fn data_source_factory_build_returns_working_data_source() {
        let td = TestData::new();
        td.update(FlagBuilder::new("factory-flag"));

        let factory: &dyn DataSourceFactory = &td;
        let endpoints = crate::ServiceEndpointsBuilder::new().build().unwrap();
        let ds = factory.build(&endpoints, "key", None).unwrap();

        let store = make_store();
        let (_tx, rx) = broadcast::channel(1);
        ds.subscribe(store.clone(), Arc::new(|_| {}), rx);

        assert!(store.read().flag("factory-flag").is_some());
    }

    #[tokio::test]
    async fn data_source_factory_to_owned_shares_state() {
        let td = TestData::new();
        let owned = DataSourceFactory::to_owned(&td);

        // Updating through the original handle must be visible through the boxed clone.
        td.update(FlagBuilder::new("shared-state"));

        let endpoints = crate::ServiceEndpointsBuilder::new().build().unwrap();
        let ds = owned.build(&endpoints, "key", None).unwrap();

        let store = make_store();
        let (_tx, rx) = broadcast::channel(1);
        ds.subscribe(store.clone(), Arc::new(|_| {}), rx);

        assert!(store.read().flag("shared-state").is_some());
    }

    #[tokio::test]
    async fn shutdown_unregisters_store() {
        let td = TestData::new();
        let store = make_store();
        let shutdown_tx = subscribe_store(&td, &store);

        // Returns a plain count so the mutex guard is never held across an `.await`.
        let registered = || td.inner.lock().instances.len();

        assert_eq!(registered(), 1);

        let _ = shutdown_tx.send(());

        // Unregistration happens on a spawned task. Poll with an upper bound rather
        // than sleeping a fixed interval: the test finishes as soon as the task has
        // run and does not depend on it completing within one specific delay.
        let deadline = Instant::now() + Duration::from_secs(2);
        while registered() != 0 && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }

        assert_eq!(registered(), 0);
    }
}