Skip to main content

launchdarkly_server_sdk/
test_data.rs

1//! Test data source for use in tests.
2//!
3//! The [`TestData`] type provides a way to inject feature flag data into the SDK for testing,
4//! without needing a LaunchDarkly connection. It implements [`DataSourceFactory`] and can be
5//! passed to [`crate::ConfigBuilder::data_source`].
6//!
7//! # Examples
8//!
9//! ```no_run
10//! use launchdarkly_server_sdk::{TestData, FlagBuilder};
11//!
12//! let td = TestData::new();
13//! td.update(FlagBuilder::new("flag-key-1").variation_for_all(true));
14//! ```
15
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use launchdarkly_server_sdk_evaluation::{Flag, FlagBuilder, Segment};
20use parking_lot::{Mutex, RwLock};
21use tokio::sync::broadcast;
22
23use crate::data_source::DataSource;
24use crate::data_source_builders::{BuildError, DataSourceFactory};
25use crate::service_endpoints;
26use crate::stores::store::DataStore;
27use crate::stores::store_types::{AllData, PatchTarget, StorageItem};
28
/// Tracks whether a flag was created via a builder or set as a preconfigured flag.
///
/// One entry is kept per flag key so that [`TestData::flag`] can tell whether it has a builder
/// to hand back or has to fall back to a fresh default builder.
enum FlagOrigin {
    /// The flag was last set through [`TestData::update`]; holds a copy of the builder that was
    /// passed in.
    Builder(FlagBuilder),
    /// The flag was last set through [`TestData::use_preconfigured_flag`]; no builder is kept.
    Preconfigured,
}
34
/// Mutable state shared by every [`TestData`] handle and every data source built from it.
///
/// Always accessed through the `Mutex` owned by [`TestData`].
struct TestDataInner {
    /// How each flag key was most recently configured (builder vs. preconfigured flag).
    flag_origins: HashMap<String, FlagOrigin>,
    /// The most recently stored flag for each key, with its version already assigned.
    current_flags: HashMap<String, Flag>,
    /// Per-key counter; bumped by one each time a flag with that key is stored.
    flag_versions: HashMap<String, u64>,
    /// The most recently stored segment for each key, with its version already assigned.
    current_segments: HashMap<String, Segment>,
    /// Per-key counter; bumped by one each time a segment with that key is stored.
    segment_versions: HashMap<String, u64>,
    /// Data stores registered by `subscribe`; each one receives an upsert on every change and
    /// is removed again by the shutdown task spawned in `subscribe`.
    instances: Vec<Arc<RwLock<dyn DataStore>>>,
}
43
/// A mechanism for providing dynamically updatable feature flag state in a simplified form to an
/// SDK client.
///
/// `TestData` implements `DataSourceFactory`, so it can be passed to
/// [`crate::ConfigBuilder::data_source`]. When the SDK client is started, it will receive the
/// current flag state from `TestData` and will be notified of any subsequent changes.
///
/// Flag data can be provided using [`FlagBuilder`](crate::FlagBuilder) (via [`TestData::update`])
/// or by passing fully constructed [`Flag`] objects (via [`TestData::use_preconfigured_flag`]).
///
/// Cloning a `TestData` creates a new handle that shares the same underlying state.
#[derive(Clone)]
pub struct TestData {
    // Shared state; the derived `Clone` copies the `Arc`, so all handles (and the data sources
    // built from them) see the same flags, segments and registered stores.
    inner: Arc<Mutex<TestDataInner>>,
}
59
60impl TestData {
61    /// Creates a new `TestData` instance with no initial flag data.
62    pub fn new() -> Self {
63        Self {
64            inner: Arc::new(Mutex::new(TestDataInner {
65                flag_origins: HashMap::new(),
66                current_flags: HashMap::new(),
67                flag_versions: HashMap::new(),
68                current_segments: HashMap::new(),
69                segment_versions: HashMap::new(),
70                instances: Vec::new(),
71            })),
72        }
73    }
74
75    /// Returns a copy of the current [`FlagBuilder`](crate::FlagBuilder) for the specified flag key, or a new default
76    /// builder if the flag has not been configured yet.
77    ///
78    /// If the flag was previously set via [`TestData::use_preconfigured_flag`], this returns a new
79    /// default builder since [`FlagBuilder`](crate::FlagBuilder) cannot represent all flag fields
80    /// (e.g. prerequisites, migration settings, client visibility) and hydrating from a [`Flag`]
81    /// would silently lose that data.
82    pub fn flag(&self, key: &str) -> FlagBuilder {
83        let inner = self.inner.lock();
84        match inner.flag_origins.get(key) {
85            Some(FlagOrigin::Builder(builder)) => builder.clone(),
86            _ => FlagBuilder::new(key),
87        }
88    }
89
90    /// Updates the test data with the given flag builder.
91    ///
92    /// If there are any connected SDK clients, they will be notified of the flag change. The flag
93    /// version is automatically incremented.
94    pub fn update(&self, builder: FlagBuilder) {
95        let mut inner = self.inner.lock();
96
97        let key = builder.key().to_owned();
98        let stored_builder = builder.clone();
99        let mut flag = builder.build();
100
101        let version = inner.flag_versions.entry(key.clone()).or_insert(0);
102        *version += 1;
103        flag.version = *version;
104
105        inner
106            .flag_origins
107            .insert(key.clone(), FlagOrigin::Builder(stored_builder));
108        inner.current_flags.insert(key.clone(), flag.clone());
109
110        for store in &inner.instances {
111            let mut store = store.write();
112            let _ = store.upsert(&key, PatchTarget::Flag(StorageItem::Item(flag.clone())));
113        }
114    }
115
116    /// Sets a preconfigured [`Flag`] directly.
117    ///
118    /// Use this when you need to set a flag with a configuration that cannot be expressed through
119    /// [`FlagBuilder`](crate::FlagBuilder). The flag version is automatically managed.
120    ///
121    /// Note: calling [`TestData::flag`] after this will return a new default builder, not a
122    /// builder derived from the preconfigured flag.
123    pub fn use_preconfigured_flag(&self, mut flag: Flag) {
124        let mut inner = self.inner.lock();
125
126        let key = flag.key.clone();
127        let version = inner.flag_versions.entry(key.clone()).or_insert(0);
128        *version += 1;
129        flag.version = *version;
130
131        inner
132            .flag_origins
133            .insert(key.clone(), FlagOrigin::Preconfigured);
134        inner.current_flags.insert(key.clone(), flag.clone());
135
136        for store in &inner.instances {
137            let mut store = store.write();
138            let _ = store.upsert(&key, PatchTarget::Flag(StorageItem::Item(flag.clone())));
139        }
140    }
141
142    /// Sets a preconfigured [`Segment`] directly.
143    ///
144    /// The segment version is automatically managed.
145    pub fn use_preconfigured_segment(&self, mut segment: Segment) {
146        let mut inner = self.inner.lock();
147
148        let key = segment.key.clone();
149        let version = inner.segment_versions.entry(key.clone()).or_insert(0);
150        *version += 1;
151        segment.version = *version;
152
153        inner.current_segments.insert(key.clone(), segment.clone());
154
155        for store in &inner.instances {
156            let mut store = store.write();
157            let _ = store.upsert(
158                &key,
159                PatchTarget::Segment(StorageItem::Item(segment.clone())),
160            );
161        }
162    }
163}
164
165impl Default for TestData {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171impl DataSourceFactory for TestData {
172    fn build(
173        &self,
174        _endpoints: &service_endpoints::ServiceEndpoints,
175        _sdk_key: &str,
176        _tags: Option<String>,
177    ) -> Result<Arc<dyn DataSource>, BuildError> {
178        Ok(Arc::new(TestDataSource {
179            inner: self.inner.clone(),
180        }))
181    }
182
183    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
184        Box::new(self.clone())
185    }
186}
187
/// The data source produced by `TestData`'s `DataSourceFactory::build` implementation.
///
/// It carries no state of its own; it only holds a handle to the state shared with the
/// originating [`TestData`].
struct TestDataSource {
    // Same `Arc` as the `TestData` that built this source.
    inner: Arc<Mutex<TestDataInner>>,
}
191
192impl DataSource for TestDataSource {
193    fn subscribe(
194        &self,
195        data_store: Arc<RwLock<dyn DataStore>>,
196        init_complete: Arc<dyn Fn(bool) + Send + Sync>,
197        shutdown_receiver: broadcast::Receiver<()>,
198    ) {
199        let mut inner = self.inner.lock();
200
201        // Initialize the store with all current data
202        let all_data = AllData {
203            flags: inner.current_flags.clone(),
204            segments: inner.current_segments.clone(),
205        };
206
207        {
208            let mut store = data_store.write();
209            store.init(all_data);
210        }
211
212        // Register this store for future updates
213        inner.instances.push(data_store.clone());
214
215        (init_complete)(true);
216
217        // Spawn a task to unregister the store on shutdown
218        let inner_ref = self.inner.clone();
219        let store_ref = data_store.clone();
220        tokio::spawn(async move {
221            let mut shutdown = shutdown_receiver;
222            let _ = shutdown.recv().await;
223            let mut inner = inner_ref.lock();
224            inner.instances.retain(|s| !Arc::ptr_eq(s, &store_ref));
225        });
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stores::store::InMemoryDataStore;
    use launchdarkly_server_sdk_evaluation::FlagBuilder;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Shared, lockable store handle as expected by `DataSource::subscribe`.
    type SharedStore = Arc<RwLock<dyn DataStore>>;

    /// Creates an empty in-memory store.
    fn make_store() -> SharedStore {
        let store: SharedStore = Arc::new(RwLock::new(InMemoryDataStore::new()));
        store
    }

    /// Builds a data source from `factory` using default service endpoints and no tags.
    fn build_source(factory: &dyn DataSourceFactory, sdk_key: &str) -> Arc<dyn DataSource> {
        let endpoints = crate::ServiceEndpointsBuilder::new().build().unwrap();
        factory.build(&endpoints, sdk_key, None).unwrap()
    }

    /// Subscribes `store` to `td` and returns the shutdown sender; the caller must keep it
    /// alive for as long as the store should stay registered.
    fn subscribe_store(td: &TestData, store: &SharedStore) -> broadcast::Sender<()> {
        let source = build_source(td, "fake-key");

        let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
        source.subscribe(Arc::clone(store), Arc::new(|_| {}), shutdown_rx);
        shutdown_tx
    }

    /// Reads the version of the flag currently held by `td` under `key`.
    fn current_version(td: &TestData, key: &str) -> u64 {
        let inner = td.inner.lock();
        inner.current_flags.get(key).unwrap().version
    }

    #[test]
    fn flag_returns_default_builder_for_unknown_key() {
        let builder = TestData::new().flag("unknown");
        assert_eq!(builder.key(), "unknown");

        let built = builder.build();
        assert_eq!(built.key, "unknown");
    }

    #[test]
    fn flag_returns_cloned_builder_after_update() {
        let td = TestData::new();
        td.update(FlagBuilder::new("my-flag").variation_for_all(false));

        assert_eq!(td.flag("my-flag").key(), "my-flag");
    }

    #[test]
    fn flag_returns_default_builder_for_preconfigured_flag() {
        let td = TestData::new();
        td.use_preconfigured_flag(FlagBuilder::new("preconf").build());

        assert_eq!(td.flag("preconf").key(), "preconf");
    }

    #[test]
    fn update_increments_version_each_call() {
        let td = TestData::new();
        for _ in 0..3 {
            td.update(FlagBuilder::new("my-flag"));
        }

        assert_eq!(current_version(&td, "my-flag"), 3);
    }

    #[test]
    fn use_preconfigured_flag_increments_version_each_call() {
        let td = TestData::new();
        for _ in 0..3 {
            td.use_preconfigured_flag(FlagBuilder::new("my-flag").build());
        }

        assert_eq!(current_version(&td, "my-flag"), 3);
    }

    #[test]
    fn flag_builder_increments_version_each_call() {
        let td = TestData::new();
        for value in [true, false, true] {
            td.update(td.flag("my-flag").variation_for_all(value));
        }

        assert_eq!(current_version(&td, "my-flag"), 3);
    }

    #[tokio::test]
    async fn update_propagates_to_connected_store() {
        let td = TestData::new();
        let store = make_store();
        let _shutdown = subscribe_store(&td, &store);

        td.update(FlagBuilder::new("my-flag").variation_for_all(true));

        let guard = store.read();
        let stored = guard.flag("my-flag").unwrap();
        assert_eq!(stored.key, "my-flag");
    }

    #[tokio::test]
    async fn subscribe_initializes_store_with_all_current_data() {
        let td = TestData::new();
        td.update(FlagBuilder::new("flag-1").variation_for_all(true));
        td.update(FlagBuilder::new("flag-2").variation_for_all(false));

        let store = make_store();
        let _shutdown = subscribe_store(&td, &store);

        let guard = store.read();
        assert!(guard.flag("flag-1").is_some());
        assert!(guard.flag("flag-2").is_some());
    }

    #[tokio::test]
    async fn subscribe_calls_init_complete_true() {
        let td = TestData::new();
        let store = make_store();
        let source = build_source(&td, "fake-key");

        // Starts out false so the assertion only passes if the callback ran with `true`.
        let initialized = Arc::new(AtomicBool::new(false));
        let observer = Arc::clone(&initialized);
        let (_shutdown_tx, shutdown_rx) = broadcast::channel(1);

        source.subscribe(
            store,
            Arc::new(move |success| observer.store(success, Ordering::SeqCst)),
            shutdown_rx,
        );

        assert!(initialized.load(Ordering::SeqCst));
    }

    #[tokio::test]
    async fn multiple_stores_receive_updates() {
        let td = TestData::new();
        let first = make_store();
        let second = make_store();
        let _shutdown_first = subscribe_store(&td, &first);
        let _shutdown_second = subscribe_store(&td, &second);

        td.update(FlagBuilder::new("shared-flag"));

        assert!(first.read().flag("shared-flag").is_some());
        assert!(second.read().flag("shared-flag").is_some());
    }

    #[tokio::test]
    async fn use_preconfigured_flag_propagates() {
        let td = TestData::new();
        let store = make_store();
        let _shutdown = subscribe_store(&td, &store);

        td.use_preconfigured_flag(FlagBuilder::new("preconf").variation_for_all(true).build());

        let guard = store.read();
        let stored = guard.flag("preconf").unwrap();
        assert_eq!(stored.version, 1);
    }

    #[tokio::test]
    async fn use_preconfigured_segment_propagates() {
        let td = TestData::new();
        let store = make_store();
        let _shutdown = subscribe_store(&td, &store);

        // The incoming version (999) must be replaced by the auto-managed one.
        let segment: Segment = serde_json::from_str(
            r#"{
                "key": "seg-1",
                "included": ["alice"],
                "excluded": [],
                "rules": [],
                "salt": "salty",
                "version": 999
            }"#,
        )
        .unwrap();

        td.use_preconfigured_segment(segment);

        let guard = store.read();
        let stored = guard.segment("seg-1").unwrap();
        assert_eq!(stored.version, 1); // version auto-managed, not 999
    }

    #[test]
    fn version_counters_are_independent_per_flag() {
        let td = TestData::new();
        td.update(FlagBuilder::new("a"));
        td.update(FlagBuilder::new("a"));
        td.update(FlagBuilder::new("b"));

        assert_eq!(current_version(&td, "a"), 2);
        assert_eq!(current_version(&td, "b"), 1);
    }

    #[tokio::test]
    async fn data_source_factory_build_returns_working_data_source() {
        let td = TestData::new();
        td.update(FlagBuilder::new("factory-flag"));

        let source = build_source(&td, "key");

        let store = make_store();
        let (_tx, rx) = broadcast::channel(1);
        source.subscribe(Arc::clone(&store), Arc::new(|_| {}), rx);

        assert!(store.read().flag("factory-flag").is_some());
    }

    #[tokio::test]
    async fn data_source_factory_to_owned_shares_state() {
        let td = TestData::new();
        // Fully qualified so the trait method is picked rather than `ToOwned::to_owned`.
        let owned = DataSourceFactory::to_owned(&td);

        // Updating through the original handle after boxing must be visible to the copy.
        td.update(FlagBuilder::new("shared-state"));

        let source = build_source(&*owned, "key");

        let store = make_store();
        let (_tx, rx) = broadcast::channel(1);
        source.subscribe(Arc::clone(&store), Arc::new(|_| {}), rx);

        assert!(store.read().flag("shared-state").is_some());
    }

    #[tokio::test]
    async fn shutdown_unregisters_store() {
        let td = TestData::new();
        let store = make_store();
        let shutdown_tx = subscribe_store(&td, &store);

        assert_eq!(td.inner.lock().instances.len(), 1);

        let _ = shutdown_tx.send(());
        // Give the spawned task time to run
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        assert_eq!(td.inner.lock().instances.len(), 0);
    }
}
473}