1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::DataSource;
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
27struct EventsScope {
28 disabled: bool,
29 event_factory: EventFactory,
30 prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
31}
32
33struct PrerequisiteEventRecorder {
34 event_factory: EventFactory,
35 event_processor: Arc<dyn EventProcessor>,
36}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39 fn record(&self, event: PrerequisiteEvent) {
40 let evt = self.event_factory.new_eval_event(
41 &event.prerequisite_flag.key,
42 event.context.clone(),
43 &event.prerequisite_flag,
44 event.prerequisite_result,
45 FlagValue::Json(serde_json::Value::Null),
46 Some(event.target_flag_key),
47 );
48
49 self.event_processor.send(evt);
50 }
51}
52
53#[non_exhaustive]
55#[derive(Debug, Error)]
56pub enum BuildError {
57 #[error("invalid client config: {0}")]
59 InvalidConfig(String),
60}
61
62impl From<DataSourceError> for BuildError {
63 fn from(error: DataSourceError) -> Self {
64 Self::InvalidConfig(error.to_string())
65 }
66}
67
68impl From<DataStoreError> for BuildError {
69 fn from(error: DataStoreError) -> Self {
70 Self::InvalidConfig(error.to_string())
71 }
72}
73
74impl From<EventProcessorError> for BuildError {
75 fn from(error: EventProcessorError) -> Self {
76 Self::InvalidConfig(error.to_string())
77 }
78}
79
80impl From<ConfigBuildError> for BuildError {
81 fn from(error: ConfigBuildError) -> Self {
82 Self::InvalidConfig(error.to_string())
83 }
84}
85
86#[non_exhaustive]
88#[derive(Debug, Error)]
89pub enum StartError {
90 #[error("couldn't spawn background thread for client: {0}")]
92 SpawnFailed(io::Error),
93}
94
/// Initialization progress of the client.
///
/// The explicit discriminants matter: the state is stored in an
/// `AtomicUsize` and compared against raw `usize` values.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ClientInitState {
    /// The data source has not reported a result yet.
    Initializing = 0,
    /// The data source reported success.
    Initialized = 1,
    /// The data source reported failure.
    InitializationFailed = 2,
}
101
102impl PartialEq<usize> for ClientInitState {
103 fn eq(&self, other: &usize) -> bool {
104 *self as usize == *other
105 }
106}
107
108impl From<usize> for ClientInitState {
109 fn from(val: usize) -> Self {
110 match val {
111 0 => ClientInitState::Initializing,
112 1 => ClientInitState::Initialized,
113 2 => ClientInitState::InitializationFailed,
114 _ => unreachable!(),
115 }
116 }
117}
118
119pub struct Client {
149 event_processor: Arc<dyn EventProcessor>,
150 data_source: Arc<dyn DataSource>,
151 data_store: Arc<RwLock<dyn DataStore>>,
152 events_default: EventsScope,
153 events_with_reasons: EventsScope,
154 init_notify: Arc<Semaphore>,
155 init_state: Arc<AtomicUsize>,
156 started: AtomicBool,
157 offline: bool,
158 daemon_mode: bool,
159 sdk_key: String,
160 shutdown_broadcast: broadcast::Sender<()>,
161 runtime: RwLock<Option<Runtime>>,
162}
163
164impl Client {
165 pub fn build(config: Config) -> Result<Self, BuildError> {
167 if config.offline() {
168 info!("Started LaunchDarkly Client in offline mode");
169 } else if config.daemon_mode() {
170 info!("Started LaunchDarkly Client in daemon mode");
171 }
172
173 let tags = config.application_tag();
174
175 let endpoints = config.service_endpoints_builder().build()?;
176 let event_processor =
177 config
178 .event_processor_builder()
179 .build(&endpoints, config.sdk_key(), tags.clone())?;
180 let data_source =
181 config
182 .data_source_builder()
183 .build(&endpoints, config.sdk_key(), tags.clone())?;
184 let data_store = config.data_store_builder().build()?;
185
186 let events_default = EventsScope {
187 disabled: config.offline(),
188 event_factory: EventFactory::new(false),
189 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
190 event_factory: EventFactory::new(false),
191 event_processor: event_processor.clone(),
192 }),
193 };
194
195 let events_with_reasons = EventsScope {
196 disabled: config.offline(),
197 event_factory: EventFactory::new(true),
198 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
199 event_factory: EventFactory::new(true),
200 event_processor: event_processor.clone(),
201 }),
202 };
203
204 let (shutdown_tx, _) = broadcast::channel(1);
205
206 Ok(Client {
207 event_processor,
208 data_source,
209 data_store,
210 events_default,
211 events_with_reasons,
212 init_notify: Arc::new(Semaphore::new(0)),
213 init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
214 started: AtomicBool::new(false),
215 offline: config.offline(),
216 daemon_mode: config.daemon_mode(),
217 sdk_key: config.sdk_key().into(),
218 shutdown_broadcast: shutdown_tx,
219 runtime: RwLock::new(None),
220 })
221 }
222
223 pub fn start_with_default_executor(&self) {
225 if self.started.load(Ordering::SeqCst) {
226 return;
227 }
228 self.started.store(true, Ordering::SeqCst);
229 self.start_with_default_executor_internal();
230 }
231
232 fn start_with_default_executor_internal(&self) {
233 let notify = self.init_notify.clone();
237 let init_state = self.init_state.clone();
238
239 self.data_source.subscribe(
240 self.data_store.clone(),
241 Arc::new(move |success| {
242 init_state.store(
243 (if success {
244 ClientInitState::Initialized
245 } else {
246 ClientInitState::InitializationFailed
247 }) as usize,
248 Ordering::SeqCst,
249 );
250 notify.add_permits(1);
251 }),
252 self.shutdown_broadcast.subscribe(),
253 );
254 }
255
256 pub fn start_with_runtime(&self) -> Result<bool, StartError> {
262 if self.started.load(Ordering::SeqCst) {
263 return Ok(true);
264 }
265 self.started.store(true, Ordering::SeqCst);
266
267 let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
268 let _guard = runtime.enter();
269 self.runtime.write().replace(runtime);
270
271 self.start_with_default_executor_internal();
272
273 Ok(true)
274 }
275
276 #[deprecated(
280 note = "blocking without a timeout is discouraged, use wait_for_initialization instead"
281 )]
282 pub async fn initialized_async(&self) -> bool {
283 self.initialized_async_internal().await
284 }
285
286 pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
292 if timeout > Duration::from_secs(60) {
293 warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
294 }
295
296 let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
297 initialized.ok()
298 }
299
300 async fn initialized_async_internal(&self) -> bool {
301 if self.offline || self.daemon_mode {
302 return true;
303 }
304
305 if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
311 let _permit = self.init_notify.acquire().await;
312 }
313 ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
314 }
315
316 pub fn initialized(&self) -> bool {
320 self.offline
321 || self.daemon_mode
322 || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
323 }
324
325 pub fn close(&self) {
329 self.event_processor.close();
330
331 if !self.offline && !self.daemon_mode {
334 if let Err(e) = self.shutdown_broadcast.send(()) {
335 error!("Failed to shutdown client appropriately: {e}");
336 }
337 }
338
339 self.runtime.write().take();
342 }
343
344 pub fn flush(&self) {
352 self.event_processor.flush();
353 }
354
355 pub fn identify(&self, context: Context) {
360 if self.events_default.disabled {
361 return;
362 }
363
364 self.send_internal(self.events_default.event_factory.new_identify(context));
365 }
366
367 pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
375 let val = self.variation(context, flag_key, default);
376 if let Some(b) = val.as_bool() {
377 b
378 } else {
379 warn!("bool_variation called for a non-bool flag {flag_key:?} (got {val:?})");
380 default
381 }
382 }
383
384 pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
392 let val = self.variation(context, flag_key, default.clone());
393 if let Some(s) = val.as_string() {
394 s
395 } else {
396 warn!("str_variation called for a non-string flag {flag_key:?} (got {val:?})");
397 default
398 }
399 }
400
401 pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
409 let val = self.variation(context, flag_key, default);
410 if let Some(f) = val.as_float() {
411 f
412 } else {
413 warn!("float_variation called for a non-float flag {flag_key:?} (got {val:?})");
414 default
415 }
416 }
417
418 pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
426 let val = self.variation(context, flag_key, default);
427 if let Some(f) = val.as_int() {
428 f
429 } else {
430 warn!("int_variation called for a non-int flag {flag_key:?} (got {val:?})");
431 default
432 }
433 }
434
435 pub fn json_variation(
445 &self,
446 context: &Context,
447 flag_key: &str,
448 default: serde_json::Value,
449 ) -> serde_json::Value {
450 self.variation(context, flag_key, default.clone())
451 .as_json()
452 .unwrap_or(default)
453 }
454
455 pub fn bool_variation_detail(
462 &self,
463 context: &Context,
464 flag_key: &str,
465 default: bool,
466 ) -> Detail<bool> {
467 self.variation_detail(context, flag_key, default).try_map(
468 |val| val.as_bool(),
469 default,
470 eval::Error::WrongType,
471 )
472 }
473
474 pub fn str_variation_detail(
481 &self,
482 context: &Context,
483 flag_key: &str,
484 default: String,
485 ) -> Detail<String> {
486 self.variation_detail(context, flag_key, default.clone())
487 .try_map(|val| val.as_string(), default, eval::Error::WrongType)
488 }
489
490 pub fn float_variation_detail(
497 &self,
498 context: &Context,
499 flag_key: &str,
500 default: f64,
501 ) -> Detail<f64> {
502 self.variation_detail(context, flag_key, default).try_map(
503 |val| val.as_float(),
504 default,
505 eval::Error::WrongType,
506 )
507 }
508
509 pub fn int_variation_detail(
516 &self,
517 context: &Context,
518 flag_key: &str,
519 default: i64,
520 ) -> Detail<i64> {
521 self.variation_detail(context, flag_key, default).try_map(
522 |val| val.as_int(),
523 default,
524 eval::Error::WrongType,
525 )
526 }
527
528 pub fn json_variation_detail(
535 &self,
536 context: &Context,
537 flag_key: &str,
538 default: serde_json::Value,
539 ) -> Detail<serde_json::Value> {
540 self.variation_detail(context, flag_key, default.clone())
541 .try_map(|val| val.as_json(), default, eval::Error::WrongType)
542 }
543
544 pub fn secure_mode_hash(&self, context: &Context) -> String {
549 let key = aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
550 let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
551
552 data_encoding::HEXLOWER.encode(tag.as_ref())
553 }
554
555 pub fn all_flags_detail(
566 &self,
567 context: &Context,
568 flag_state_config: FlagDetailConfig,
569 ) -> FlagDetail {
570 if self.offline {
571 warn!(
572 "all_flags_detail() called, but client is in offline mode. Returning empty state"
573 );
574 return FlagDetail::new(false);
575 }
576
577 if !self.initialized() {
578 warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
579 return FlagDetail::new(false);
580 }
581
582 let data_store = self.data_store.read();
583
584 let mut flag_detail = FlagDetail::new(true);
585 flag_detail.populate(&*data_store, context, flag_state_config);
586
587 flag_detail
588 }
589
590 pub fn variation_detail<T: Into<FlagValue> + Clone>(
596 &self,
597 context: &Context,
598 flag_key: &str,
599 default: T,
600 ) -> Detail<FlagValue> {
601 let (detail, _) =
602 self.variation_internal(context, flag_key, default, &self.events_with_reasons);
603 detail
604 }
605
606 pub fn variation<T: Into<FlagValue> + Clone>(
617 &self,
618 context: &Context,
619 flag_key: &str,
620 default: T,
621 ) -> FlagValue {
622 let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
623 detail.value.unwrap()
624 }
625
626 pub fn migration_variation(
631 &self,
632 context: &Context,
633 flag_key: &str,
634 default_stage: Stage,
635 ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
636 let (detail, flag) =
637 self.variation_internal(context, flag_key, default_stage, &self.events_default);
638
639 let migration_detail =
640 detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
641 let tracker = MigrationOpTracker::new(
642 flag_key.into(),
643 flag,
644 context.clone(),
645 migration_detail.clone(),
646 default_stage,
647 );
648
649 (
650 migration_detail.value.unwrap_or(default_stage),
651 Arc::new(Mutex::new(tracker)),
652 )
653 }
654
655 pub fn track_event(&self, context: Context, key: impl Into<String>) {
665 let _ = self.track(context, key, None, serde_json::Value::Null);
666 }
667
668 pub fn track_data(
681 &self,
682 context: Context,
683 key: impl Into<String>,
684 data: impl Serialize,
685 ) -> serde_json::Result<()> {
686 self.track(context, key, None, data)
687 }
688
689 pub fn track_metric(
700 &self,
701 context: Context,
702 key: impl Into<String>,
703 value: f64,
704 data: impl Serialize,
705 ) {
706 let _ = self.track(context, key, Some(value), data);
707 }
708
709 fn track(
710 &self,
711 context: Context,
712 key: impl Into<String>,
713 metric_value: Option<f64>,
714 data: impl Serialize,
715 ) -> serde_json::Result<()> {
716 if !self.events_default.disabled {
717 let event =
718 self.events_default
719 .event_factory
720 .new_custom(context, key, metric_value, data)?;
721
722 self.send_internal(event);
723 }
724
725 Ok(())
726 }
727
728 pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
735 if self.events_default.disabled {
736 return;
737 }
738
739 match tracker.lock() {
740 Ok(tracker) => {
741 let event = tracker.build();
742 match event {
743 Ok(event) => {
744 self.send_internal(
745 self.events_default.event_factory.new_migration_op(event),
746 );
747 }
748 Err(e) => error!("Failed to build migration event, no event will be sent: {e}"),
749 }
750 }
751 Err(e) => error!("Failed to lock migration tracker, no event will be sent: {e}"),
752 }
753 }
754
755 fn variation_internal<T: Into<FlagValue> + Clone>(
756 &self,
757 context: &Context,
758 flag_key: &str,
759 default: T,
760 events_scope: &EventsScope,
761 ) -> (Detail<FlagValue>, Option<eval::Flag>) {
762 if self.offline {
763 return (
764 Detail::err_default(eval::Error::ClientNotReady, default.into()),
765 None,
766 );
767 }
768
769 let (flag, result) = match self.initialized() {
770 false => (
771 None,
772 Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
773 ),
774 true => {
775 let data_store = self.data_store.read();
776 match data_store.flag(flag_key) {
777 Some(flag) => {
778 let result = eval::evaluate(
779 data_store.to_store(),
780 &flag,
781 context,
782 Some(&*events_scope.prerequisite_event_recorder),
783 )
784 .map(|v| v.clone())
785 .or(default.clone().into());
786
787 (Some(flag), result)
788 }
789 None => (
790 None,
791 Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
792 ),
793 }
794 }
795 };
796
797 if !events_scope.disabled {
798 let event = match &flag {
799 Some(f) => events_scope.event_factory.new_eval_event(
800 flag_key,
801 context.clone(),
802 f,
803 result.clone(),
804 default.into(),
805 None,
806 ),
807 None => events_scope.event_factory.new_unknown_flag_event(
808 flag_key,
809 context.clone(),
810 result.clone(),
811 default.into(),
812 ),
813 };
814 self.send_internal(event);
815 }
816
817 (result, flag)
818 }
819
820 fn send_internal(&self, event: InputEvent) {
821 self.event_processor.send(event);
822 }
823}
824
825#[cfg(test)]
826mod tests {
827 use assert_json_diff::assert_json_eq;
828 use crossbeam_channel::Receiver;
829 use eval::{ContextBuilder, MultiContextBuilder};
830 use futures::FutureExt;
831 use hyper::client::HttpConnector;
832 use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
833 use maplit::hashmap;
834 use std::collections::HashMap;
835 use tokio::time::Instant;
836
837 use crate::data_source::MockDataSource;
838 use crate::data_source_builders::MockDataSourceBuilder;
839 use crate::events::create_event_sender;
840 use crate::events::event::{OutputEvent, VariationKey};
841 use crate::events::processor_builders::EventProcessorBuilder;
842 use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
843 use crate::stores::store_types::{PatchTarget, StorageItem};
844 use crate::test_common::{
845 self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
846 basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
847 };
848 use crate::{
849 AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
850 PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
851 SerializedItem,
852 };
853 use test_case::test_case;
854
855 use super::*;
856
857 fn is_send_and_sync<T: Send + Sync>() {}
858
859 #[test]
860 fn ensure_client_is_send_and_sync() {
861 is_send_and_sync::<Client>()
862 }
863
864 #[tokio::test]
865 async fn client_asynchronously_initializes() {
866 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
867 client.start_with_default_executor();
868
869 let now = Instant::now();
870 let initialized = client.initialized_async().await;
871 let elapsed_time = now.elapsed();
872 assert!(initialized);
873 assert!(elapsed_time.as_millis() > 500)
875 }
876
877 #[tokio::test]
878 async fn client_asynchronously_initializes_within_timeout() {
879 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
880 client.start_with_default_executor();
881
882 let now = Instant::now();
883 let initialized = client
884 .wait_for_initialization(Duration::from_millis(1500))
885 .await;
886 let elapsed_time = now.elapsed();
887 assert!(elapsed_time.as_millis() > 500);
889 assert_eq!(initialized, Some(true));
890 }
891
892 #[tokio::test]
893 async fn client_asynchronously_initializes_slower_than_timeout() {
894 let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
895 client.start_with_default_executor();
896
897 let now = Instant::now();
898 let initialized = client
899 .wait_for_initialization(Duration::from_millis(500))
900 .await;
901 let elapsed_time = now.elapsed();
902 assert!(elapsed_time.as_millis() < 750);
904 assert!(initialized.is_none());
905 }
906
907 #[tokio::test]
908 async fn client_initializes_immediately_in_offline_mode() {
909 let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
910 client.start_with_default_executor();
911
912 assert!(client.initialized());
913
914 let now = Instant::now();
915 let initialized = client
916 .wait_for_initialization(Duration::from_millis(2000))
917 .await;
918 let elapsed_time = now.elapsed();
919 assert_eq!(initialized, Some(true));
920 assert!(elapsed_time.as_millis() < 500)
921 }
922
923 #[tokio::test]
924 async fn client_initializes_immediately_in_daemon_mode() {
925 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
926 client.start_with_default_executor();
927
928 assert!(client.initialized());
929
930 let now = Instant::now();
931 let initialized = client
932 .wait_for_initialization(Duration::from_millis(2000))
933 .await;
934 let elapsed_time = now.elapsed();
935 assert_eq!(initialized, Some(true));
936 assert!(elapsed_time.as_millis() < 500)
937 }
938
939 #[test_case(basic_flag("myFlag"), false.into(), true.into())]
940 #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
941 fn client_updates_changes_evaluation_results(
942 flag: eval::Flag,
943 default: FlagValue,
944 expected: FlagValue,
945 ) {
946 let context = ContextBuilder::new("foo")
947 .build()
948 .expect("Failed to create context");
949
950 let (client, _event_rx) = make_mocked_client();
951
952 let result = client.variation_detail(&context, "myFlag", default.clone());
953 assert_eq!(result.value.unwrap(), default);
954
955 client.start_with_default_executor();
956 client
957 .data_store
958 .write()
959 .upsert(
960 &flag.key,
961 PatchTarget::Flag(StorageItem::Item(flag.clone())),
962 )
963 .expect("patch should apply");
964
965 let result = client.variation_detail(&context, "myFlag", default);
966 assert_eq!(result.value.unwrap(), expected);
967 assert!(matches!(
968 result.reason,
969 Reason::Fallthrough {
970 in_experiment: false
971 }
972 ));
973 }
974
975 #[test]
976 fn all_flags_detail_is_invalid_when_offline() {
977 let (client, _event_rx) = make_mocked_offline_client();
978 client.start_with_default_executor();
979
980 let context = ContextBuilder::new("bob")
981 .build()
982 .expect("Failed to create context");
983
984 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
985 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
986 }
987
988 #[test]
989 fn all_flags_detail_is_invalid_when_not_initialized() {
990 let (client, _event_rx) = make_mocked_client();
991
992 let context = ContextBuilder::new("bob")
993 .build()
994 .expect("Failed to create context");
995
996 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
997 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
998 }
999
1000 #[test]
1001 fn all_flags_detail_returns_flag_states() {
1002 let (client, _event_rx) = make_mocked_client();
1003 client.start_with_default_executor();
1004 client
1005 .data_store
1006 .write()
1007 .upsert(
1008 "myFlag1",
1009 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag1"))),
1010 )
1011 .expect("patch should apply");
1012 client
1013 .data_store
1014 .write()
1015 .upsert(
1016 "myFlag2",
1017 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag2"))),
1018 )
1019 .expect("patch should apply");
1020 let context = ContextBuilder::new("bob")
1021 .build()
1022 .expect("Failed to create context");
1023
1024 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1025
1026 client.close();
1027
1028 assert_json_eq!(
1029 all_flags,
1030 json!({
1031 "myFlag1": true,
1032 "myFlag2": true,
1033 "$flagsState": {
1034 "myFlag1": {
1035 "version": 42,
1036 "variation": 1
1037 },
1038 "myFlag2": {
1039 "version": 42,
1040 "variation": 1
1041 },
1042 },
1043 "$valid": true
1044 })
1045 );
1046 }
1047
1048 #[test]
1049 fn all_flags_detail_returns_prerequisite_relations() {
1050 let (client, _event_rx) = make_mocked_client();
1051 client.start_with_default_executor();
1052 client
1053 .data_store
1054 .write()
1055 .upsert(
1056 "prereq1",
1057 PatchTarget::Flag(StorageItem::Item(basic_flag("prereq1"))),
1058 )
1059 .expect("patch should apply");
1060 client
1061 .data_store
1062 .write()
1063 .upsert(
1064 "prereq2",
1065 PatchTarget::Flag(StorageItem::Item(basic_flag("prereq2"))),
1066 )
1067 .expect("patch should apply");
1068
1069 client
1070 .data_store
1071 .write()
1072 .upsert(
1073 "toplevel",
1074 PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1075 "toplevel",
1076 &["prereq1", "prereq2"],
1077 false,
1078 ))),
1079 )
1080 .expect("patch should apply");
1081
1082 let context = ContextBuilder::new("bob")
1083 .build()
1084 .expect("Failed to create context");
1085
1086 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1087
1088 client.close();
1089
1090 assert_json_eq!(
1091 all_flags,
1092 json!({
1093 "prereq1": true,
1094 "prereq2": true,
1095 "toplevel": true,
1096 "$flagsState": {
1097 "toplevel": {
1098 "version": 42,
1099 "variation": 1,
1100 "prerequisites": ["prereq1", "prereq2"]
1101 },
1102 "prereq1": {
1103 "version": 42,
1104 "variation": 1
1105 },
1106 "prereq2": {
1107 "version": 42,
1108 "variation": 1
1109 },
1110 },
1111 "$valid": true
1112 })
1113 );
1114 }
1115
1116 #[test]
1117 fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1118 let (client, _event_rx) = make_mocked_client();
1119 client.start_with_default_executor();
1120 client
1121 .data_store
1122 .write()
1123 .upsert(
1124 "prereq1",
1125 PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1126 "prereq1", false,
1127 ))),
1128 )
1129 .expect("patch should apply");
1130 client
1131 .data_store
1132 .write()
1133 .upsert(
1134 "prereq2",
1135 PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1136 "prereq2", false,
1137 ))),
1138 )
1139 .expect("patch should apply");
1140
1141 client
1142 .data_store
1143 .write()
1144 .upsert(
1145 "toplevel",
1146 PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1147 "toplevel",
1148 &["prereq1", "prereq2"],
1149 true,
1150 ))),
1151 )
1152 .expect("patch should apply");
1153
1154 let context = ContextBuilder::new("bob")
1155 .build()
1156 .expect("Failed to create context");
1157
1158 let mut config = FlagDetailConfig::new();
1159 config.client_side_only();
1160
1161 let all_flags = client.all_flags_detail(&context, config);
1162
1163 client.close();
1164
1165 assert_json_eq!(
1166 all_flags,
1167 json!({
1168 "toplevel": true,
1169 "$flagsState": {
1170 "toplevel": {
1171 "version": 42,
1172 "variation": 1,
1173 "prerequisites": ["prereq1", "prereq2"]
1174 },
1175 },
1176 "$valid": true
1177 })
1178 );
1179 }
1180
1181 #[test]
1182 fn variation_tracks_events_correctly() {
1183 let (client, event_rx) = make_mocked_client();
1184 client.start_with_default_executor();
1185 client
1186 .data_store
1187 .write()
1188 .upsert(
1189 "myFlag",
1190 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1191 )
1192 .expect("patch should apply");
1193 let context = ContextBuilder::new("bob")
1194 .build()
1195 .expect("Failed to create context");
1196
1197 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1198
1199 assert!(flag_value.as_bool().unwrap());
1200 client.flush();
1201 client.close();
1202
1203 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1204 assert_eq!(events.len(), 2);
1205 assert_eq!(events[0].kind(), "index");
1206 assert_eq!(events[1].kind(), "summary");
1207
1208 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1209 let variation_key = VariationKey {
1210 version: Some(42),
1211 variation: Some(1),
1212 };
1213 let feature = event_summary.features.get("myFlag");
1214 assert!(feature.is_some());
1215
1216 let feature = feature.unwrap();
1217 assert!(feature.counters.contains_key(&variation_key));
1218 } else {
1219 panic!("Event should be a summary type");
1220 }
1221 }
1222
1223 #[test]
1224 fn variation_handles_offline_mode() {
1225 let (client, event_rx) = make_mocked_offline_client();
1226 client.start_with_default_executor();
1227
1228 let context = ContextBuilder::new("bob")
1229 .build()
1230 .expect("Failed to create context");
1231 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1232
1233 assert!(!flag_value.as_bool().unwrap());
1234 client.flush();
1235 client.close();
1236
1237 assert_eq!(event_rx.iter().count(), 0);
1238 }
1239
1240 #[test]
1241 fn variation_handles_unknown_flags() {
1242 let (client, event_rx) = make_mocked_client();
1243 client.start_with_default_executor();
1244 let context = ContextBuilder::new("bob")
1245 .build()
1246 .expect("Failed to create context");
1247
1248 let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1249
1250 assert!(!flag_value.as_bool().unwrap());
1251 client.flush();
1252 client.close();
1253
1254 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1255 assert_eq!(events.len(), 2);
1256 assert_eq!(events[0].kind(), "index");
1257 assert_eq!(events[1].kind(), "summary");
1258
1259 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1260 let variation_key = VariationKey {
1261 version: None,
1262 variation: None,
1263 };
1264
1265 let feature = event_summary.features.get("non-existent-flag");
1266 assert!(feature.is_some());
1267
1268 let feature = feature.unwrap();
1269 assert!(feature.counters.contains_key(&variation_key));
1270 } else {
1271 panic!("Event should be a summary type");
1272 }
1273 }
1274
1275 #[test]
1276 fn variation_detail_handles_debug_events_correctly() {
1277 let (client, event_rx) = make_mocked_client();
1278 client.start_with_default_executor();
1279
1280 let mut flag = basic_flag("myFlag");
1281 flag.debug_events_until_date = Some(64_060_606_800_000); client
1284 .data_store
1285 .write()
1286 .upsert(
1287 &flag.key,
1288 PatchTarget::Flag(StorageItem::Item(flag.clone())),
1289 )
1290 .expect("patch should apply");
1291 let context = ContextBuilder::new("bob")
1292 .build()
1293 .expect("Failed to create context");
1294
1295 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1296
1297 assert!(detail.value.unwrap().as_bool().unwrap());
1298 assert!(matches!(
1299 detail.reason,
1300 Reason::Fallthrough {
1301 in_experiment: false
1302 }
1303 ));
1304 client.flush();
1305 client.close();
1306
1307 let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1308 assert_eq!(events.len(), 3);
1309 assert_eq!(events[0].kind(), "index");
1310 assert_eq!(events[1].kind(), "debug");
1311 assert_eq!(events[2].kind(), "summary");
1312
1313 if let OutputEvent::Summary(event_summary) = events[2].clone() {
1314 let variation_key = VariationKey {
1315 version: Some(42),
1316 variation: Some(1),
1317 };
1318
1319 let feature = event_summary.features.get("myFlag");
1320 assert!(feature.is_some());
1321
1322 let feature = feature.unwrap();
1323 assert!(feature.counters.contains_key(&variation_key));
1324 } else {
1325 panic!("Event should be a summary type");
1326 }
1327 }
1328
1329 #[test]
1330 fn variation_detail_tracks_events_correctly() {
1331 let (client, event_rx) = make_mocked_client();
1332 client.start_with_default_executor();
1333
1334 client
1335 .data_store
1336 .write()
1337 .upsert(
1338 "myFlag",
1339 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1340 )
1341 .expect("patch should apply");
1342 let context = ContextBuilder::new("bob")
1343 .build()
1344 .expect("Failed to create context");
1345
1346 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1347
1348 assert!(detail.value.unwrap().as_bool().unwrap());
1349 assert!(matches!(
1350 detail.reason,
1351 Reason::Fallthrough {
1352 in_experiment: false
1353 }
1354 ));
1355 client.flush();
1356 client.close();
1357
1358 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1359 assert_eq!(events.len(), 2);
1360 assert_eq!(events[0].kind(), "index");
1361 assert_eq!(events[1].kind(), "summary");
1362
1363 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1364 let variation_key = VariationKey {
1365 version: Some(42),
1366 variation: Some(1),
1367 };
1368
1369 let feature = event_summary.features.get("myFlag");
1370 assert!(feature.is_some());
1371
1372 let feature = feature.unwrap();
1373 assert!(feature.counters.contains_key(&variation_key));
1374 } else {
1375 panic!("Event should be a summary type");
1376 }
1377 }
1378
/// With the mocked offline client, `variation_detail` yields the default
/// value with a `ClientNotReady` error reason and no events are received.
#[test]
fn variation_detail_handles_offline_mode() {
    let (client, event_rx) = make_mocked_offline_client();
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));

    let value = detail.value.unwrap();
    assert!(!value.as_bool().unwrap());
    assert!(matches!(
        detail.reason,
        Reason::Error {
            error: eval::Error::ClientNotReady
        }
    ));

    client.flush();
    client.close();

    let received = event_rx.iter().count();
    assert_eq!(received, 0);
}
1402
1403 struct InMemoryPersistentDataStoreFactory {
1404 data: AllData<Flag, Segment>,
1405 initialized: bool,
1406 }
1407
1408 impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1409 fn create_persistent_data_store(
1410 &self,
1411 ) -> Result<Box<(dyn PersistentDataStore + 'static)>, std::io::Error> {
1412 let serialized_data =
1413 AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1414 Ok(Box::new(InMemoryPersistentDataStore {
1415 data: serialized_data,
1416 initialized: self.initialized,
1417 }))
1418 }
1419 }
1420
/// Builds a daemon-mode client on top of an initialized in-memory persistent
/// store, evaluates a stored flag and checks the single captured log line.
#[test]
fn variation_detail_handles_daemon_mode() {
    testing_logger::setup();

    let data: AllData<Flag, Segment> = AllData {
        flags: hashmap!["flag".into() => basic_flag("flag")],
        segments: HashMap::new(),
    };
    let factory = InMemoryPersistentDataStoreFactory {
        data,
        initialized: true,
    };
    let store_builder = PersistentDataStoreBuilder::new(Arc::new(factory));
    let events_builder = NullEventProcessorBuilder::new();

    let config = ConfigBuilder::new("sdk-key")
        .daemon_mode(true)
        .data_store(&store_builder)
        .event_processor(&events_builder)
        .build()
        .expect("config should build");

    let client = Client::build(config).expect("Should be built.");
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));

    let value = detail.value.unwrap();
    assert!(value.as_bool().unwrap());
    assert!(matches!(
        detail.reason,
        Reason::Fallthrough {
            in_experiment: false
        }
    ));

    client.flush();
    client.close();

    testing_logger::validate(|logs| {
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "Started LaunchDarkly Client in daemon mode");
    });
}
1468
/// Builds a daemon-mode client on top of an uninitialized, empty persistent
/// store and checks that an evaluation leaves only the startup log line.
#[test]
fn daemon_mode_is_quiet_if_store_is_not_initialized() {
    testing_logger::setup();

    let data: AllData<Flag, Segment> = AllData {
        flags: HashMap::new(),
        segments: HashMap::new(),
    };
    let factory = InMemoryPersistentDataStoreFactory {
        data,
        initialized: false,
    };
    let store_builder = PersistentDataStoreBuilder::new(Arc::new(factory));
    let events_builder = NullEventProcessorBuilder::new();

    let config = ConfigBuilder::new("sdk-key")
        .daemon_mode(true)
        .data_store(&store_builder)
        .event_processor(&events_builder)
        .build()
        .expect("config should build");

    let client = Client::build(config).expect("Should be built.");
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    // Only the logging side effect matters here; the result is discarded.
    client.variation_detail(&context, "flag", FlagValue::Bool(false));

    testing_logger::validate(|logs| {
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "Started LaunchDarkly Client in daemon mode");
    });
}
1507
/// Evaluates a flag built by `basic_off_flag` and checks that the default is
/// returned and that the summary counter is keyed with no variation index.
#[test]
fn variation_handles_off_flag_without_variation() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let flag = PatchTarget::Flag(StorageItem::Item(basic_off_flag("myFlag")));
    client
        .data_store
        .write()
        .upsert("myFlag", flag)
        .expect("patch should apply");

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
    assert!(!result.as_bool().unwrap());

    client.flush();
    client.close();

    let events: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(events.len(), 2);
    let kinds: Vec<&str> = events.iter().map(|event| event.kind()).collect();
    assert_eq!(kinds, ["index", "summary"]);

    match &events[1] {
        OutputEvent::Summary(summary) => {
            let expected_key = VariationKey {
                version: Some(42),
                variation: None,
            };

            let feature = summary.features.get("myFlag");
            assert!(feature.is_some());
            assert!(feature.unwrap().counters.contains_key(&expected_key));
        }
        _ => panic!("Event should be a summary type"),
    }
}
1550
/// Evaluates a flag that has a prerequisite, with event tracking enabled on
/// both flags, and checks that feature events are emitted for each of them
/// and that the summary holds a counter for both.
#[test]
fn variation_detail_tracks_prereq_events_correctly() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let mut prereq_flag = basic_flag("prereqFlag");
    prereq_flag.track_events = true;
    client
        .data_store
        .write()
        .upsert(
            "prereqFlag",
            PatchTarget::Flag(StorageItem::Item(prereq_flag)),
        )
        .expect("patch should apply");

    // Named so that it does not shadow the `basic_flag` helper.
    let mut target_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
    target_flag.track_events = true;
    client
        .data_store
        .write()
        .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(target_flag)))
        .expect("patch should apply");

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));

    assert!(detail.value.unwrap().as_bool().unwrap());
    assert!(matches!(
        detail.reason,
        Reason::Fallthrough {
            in_experiment: false
        }
    ));
    client.flush();
    client.close();

    let events = event_rx.iter().collect::<Vec<OutputEvent>>();
    assert_eq!(events.len(), 4);
    assert_eq!(events[0].kind(), "index");
    assert_eq!(events[1].kind(), "feature");
    assert_eq!(events[2].kind(), "feature");
    assert_eq!(events[3].kind(), "summary");

    if let OutputEvent::Summary(event_summary) = events[3].clone() {
        let variation_key = VariationKey {
            version: Some(42),
            variation: Some(1),
        };
        let feature = event_summary.features.get("myFlag");
        assert!(feature.is_some());

        let feature = feature.unwrap();
        assert!(feature.counters.contains_key(&variation_key));

        let variation_key = VariationKey {
            version: Some(42),
            variation: Some(1),
        };
        let feature = event_summary.features.get("prereqFlag");
        assert!(feature.is_some());

        let feature = feature.unwrap();
        assert!(feature.counters.contains_key(&variation_key));
    } else {
        // Fail loudly instead of silently skipping the summary checks.
        panic!("Event should be a summary type");
    }
}
1620
/// Evaluates a flag whose prerequisite is built by `basic_off_flag`, with
/// event tracking enabled on both flags, and checks that the default value
/// is returned and that the summary holds the expected counter for each flag.
#[test]
fn variation_handles_failed_prereqs_correctly() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let mut prereq_flag = basic_off_flag("prereqFlag");
    prereq_flag.track_events = true;
    client
        .data_store
        .write()
        .upsert(
            "prereqFlag",
            PatchTarget::Flag(StorageItem::Item(prereq_flag)),
        )
        .expect("patch should apply");

    // Named so that it does not shadow the `basic_flag` helper.
    let mut target_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
    target_flag.track_events = true;
    client
        .data_store
        .write()
        .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(target_flag)))
        .expect("patch should apply");

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));

    assert!(!detail.as_bool().unwrap());
    client.flush();
    client.close();

    let events = event_rx.iter().collect::<Vec<OutputEvent>>();
    assert_eq!(events.len(), 4);
    assert_eq!(events[0].kind(), "index");
    assert_eq!(events[1].kind(), "feature");
    assert_eq!(events[2].kind(), "feature");
    assert_eq!(events[3].kind(), "summary");

    if let OutputEvent::Summary(event_summary) = events[3].clone() {
        let variation_key = VariationKey {
            version: Some(42),
            variation: Some(0),
        };
        let feature = event_summary.features.get("myFlag");
        assert!(feature.is_some());

        let feature = feature.unwrap();
        assert!(feature.counters.contains_key(&variation_key));

        // The prerequisite's counter is keyed without a variation index.
        let variation_key = VariationKey {
            version: Some(42),
            variation: None,
        };
        let feature = event_summary.features.get("prereqFlag");
        assert!(feature.is_some());

        let feature = feature.unwrap();
        assert!(feature.counters.contains_key(&variation_key));
    } else {
        // Fail loudly instead of silently skipping the summary checks.
        panic!("Event should be a summary type");
    }
}
1684
/// Evaluates a key that was never stored and checks the `FlagNotFound`
/// reason plus the summary counter keyed without version or variation.
#[test]
fn variation_detail_handles_flag_not_found() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));

    let value = detail.value.unwrap();
    assert!(!value.as_bool().unwrap());
    assert!(matches!(
        detail.reason,
        Reason::Error {
            error: eval::Error::FlagNotFound
        }
    ));

    client.flush();
    client.close();

    let events: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(events.len(), 2);
    let kinds: Vec<&str> = events.iter().map(|event| event.kind()).collect();
    assert_eq!(kinds, ["index", "summary"]);

    match &events[1] {
        OutputEvent::Summary(summary) => {
            let expected_key = VariationKey {
                version: None,
                variation: None,
            };

            let feature = summary.features.get("non-existent-flag");
            assert!(feature.is_some());
            assert!(feature.unwrap().counters.contains_key(&expected_key));
        }
        _ => panic!("Event should be a summary type"),
    }
}
1724
1725 #[tokio::test]
1726 async fn variation_detail_handles_client_not_ready() {
1727 let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1728 client.start_with_default_executor();
1729 let context = ContextBuilder::new("bob")
1730 .build()
1731 .expect("Failed to create context");
1732
1733 let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1734
1735 assert!(!detail.value.unwrap().as_bool().unwrap());
1736 assert!(matches!(
1737 detail.reason,
1738 Reason::Error {
1739 error: eval::Error::ClientNotReady
1740 }
1741 ));
1742 client.flush();
1743 client.close();
1744
1745 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1746 assert_eq!(events.len(), 2);
1747 assert_eq!(events[0].kind(), "index");
1748 assert_eq!(events[1].kind(), "summary");
1749
1750 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1751 let variation_key = VariationKey {
1752 version: None,
1753 variation: None,
1754 };
1755 let feature = event_summary.features.get("non-existent-flag");
1756 assert!(feature.is_some());
1757
1758 let feature = feature.unwrap();
1759 assert!(feature.counters.contains_key(&variation_key));
1760 } else {
1761 panic!("Event should be a summary type");
1762 }
1763 }
1764
/// Calling `identify` results in exactly one received event, of kind
/// "identify".
#[test]
fn identify_sends_identify_event() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    client.identify(context);

    client.flush();
    client.close();

    let events: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(events.len(), 1);

    let kind = events[0].kind();
    assert_eq!(kind, "identify");
}
1782
/// With the mocked offline client, `identify` produces no events.
#[test]
fn identify_sends_sends_nothing_in_offline_mode() {
    let (client, event_rx) = make_mocked_offline_client();
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    client.identify(context);

    client.flush();
    client.close();

    let received = event_rx.iter().count();
    assert_eq!(received, 0);
}
1798
/// Pins the secure mode hash produced for the context key "Message" by an
/// offline client configured with the SDK key "secret".
#[test]
fn secure_mode_hash() {
    let config = ConfigBuilder::new("secret")
        .offline(true)
        .build()
        .expect("config should build");
    let client = Client::build(config).expect("Should be built.");

    let context = ContextBuilder::new("Message")
        .build()
        .expect("Failed to create context");

    let hash = client.secure_mode_hash(&context);
    assert_eq!(
        hash,
        "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
    );
}
1815
/// Pins the secure mode hash produced for a multi-context made of an "org"
/// context and a default-kind context.
#[test]
fn secure_mode_hash_with_multi_kind() {
    let config = ConfigBuilder::new("secret")
        .offline(true)
        .build()
        .expect("config should build");
    let client = Client::build(config).expect("Should be built.");

    let org_context = ContextBuilder::new("org-key|1")
        .kind("org")
        .build()
        .expect("Failed to create context");
    let user_context = ContextBuilder::new("user-key:2")
        .build()
        .expect("Failed to create context");

    let multi_context = MultiContextBuilder::new()
        .add_context(org_context)
        .add_context(user_context)
        .build()
        .expect("failed to build multi-context");

    let hash = client.secure_mode_hash(&multi_context);
    assert_eq!(
        hash,
        "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
    );
}
1843
1844 #[derive(Serialize)]
1845 struct MyCustomData {
1846 pub answer: u32,
1847 }
1848
/// Sends five tracking calls (`track_event`, three `track_data` variants and
/// `track_metric`) for one context and checks that one index event and five
/// custom events are received.
///
/// Returns the `serde_json` error if any `track_data` call fails.
#[test]
fn track_sends_track_and_index_events() -> serde_json::Result<()> {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    client.track_event(context.clone(), "event-with-null");
    client.track_data(context.clone(), "event-with-string", "string-data")?;
    client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
    client.track_data(
        context.clone(),
        "event-with-struct",
        MyCustomData { answer: 42 },
    )?;
    client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);

    client.flush();
    client.close();

    let events = event_rx.iter().collect::<Vec<OutputEvent>>();
    assert_eq!(events.len(), 6);

    // Tally events per kind; the entry API does a single lookup per event.
    let mut events_by_type: HashMap<&str, usize> = HashMap::new();
    for event in events {
        *events_by_type.entry(event.kind()).or_insert(0) += 1;
    }

    // `assert_eq!` prints the actual count when the expectation fails.
    assert_eq!(events_by_type.get("index"), Some(&1));
    assert_eq!(events_by_type.get("custom"), Some(&5));

    Ok(())
}
1887
/// With the mocked offline client, none of the tracking calls produces an
/// event.
///
/// Returns the `serde_json` error if any `track_data` call fails.
#[test]
fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
    let (client, event_rx) = make_mocked_offline_client();
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    client.track_event(context.clone(), "event-with-null");
    client.track_data(context.clone(), "event-with-string", "string-data")?;
    client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
    let custom_payload = MyCustomData { answer: 42 };
    client.track_data(context.clone(), "event-with-struct", custom_payload)?;
    client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);

    client.flush();
    client.close();

    let received = event_rx.iter().count();
    assert_eq!(received, 0);

    Ok(())
}
1914
/// `migration_variation` returns the supplied default stage for a key that
/// was never stored.
#[test]
fn migration_handles_flag_not_found() {
    let (client, _event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let (evaluated_stage, _tracker) =
        client.migration_variation(&context, "non-existent-flag-key", Stage::Off);

    assert_eq!(evaluated_stage, Stage::Off);
}
1929
/// `migration_variation` returns the supplied default stage when the stored
/// flag is the one built by `basic_flag` rather than a migration flag.
#[test]
fn migration_uses_non_migration_flag() {
    let (client, _event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let flag = PatchTarget::Flag(StorageItem::Item(basic_flag("boolean-flag")));
    client
        .data_store
        .write()
        .upsert("boolean-flag", flag)
        .expect("patch should apply");

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let (evaluated_stage, _tracker) =
        client.migration_variation(&context, "boolean-flag", Stage::Off);

    assert_eq!(evaluated_stage, Stage::Off);
}
1951
1952 #[test_case(Stage::Off)]
1953 #[test_case(Stage::DualWrite)]
1954 #[test_case(Stage::Shadow)]
1955 #[test_case(Stage::Live)]
1956 #[test_case(Stage::Rampdown)]
1957 #[test_case(Stage::Complete)]
1958 fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1959 let (client, _event_rx) = make_mocked_client();
1960 client.start_with_default_executor();
1961 client
1962 .data_store
1963 .write()
1964 .upsert(
1965 "stage-flag",
1966 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
1967 )
1968 .expect("patch should apply");
1969
1970 let context = ContextBuilder::new("bob")
1971 .build()
1972 .expect("Failed to create context");
1973
1974 let (evaluated_stage, _tracker) =
1975 client.migration_variation(&context, "stage-flag", Stage::Off);
1976
1977 assert_eq!(evaluated_stage, stage);
1978 }
1979
1980 #[tokio::test]
1981 async fn migration_tracks_invoked_correctly() {
1982 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
1983 .await;
1984 migration_tracks_invoked_correctly_driver(
1985 Stage::DualWrite,
1986 Operation::Read,
1987 vec![Origin::Old],
1988 )
1989 .await;
1990 migration_tracks_invoked_correctly_driver(
1991 Stage::Shadow,
1992 Operation::Read,
1993 vec![Origin::Old, Origin::New],
1994 )
1995 .await;
1996 migration_tracks_invoked_correctly_driver(
1997 Stage::Live,
1998 Operation::Read,
1999 vec![Origin::Old, Origin::New],
2000 )
2001 .await;
2002 migration_tracks_invoked_correctly_driver(
2003 Stage::Rampdown,
2004 Operation::Read,
2005 vec![Origin::New],
2006 )
2007 .await;
2008 migration_tracks_invoked_correctly_driver(
2009 Stage::Complete,
2010 Operation::Read,
2011 vec![Origin::New],
2012 )
2013 .await;
2014 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
2015 .await;
2016 migration_tracks_invoked_correctly_driver(
2017 Stage::DualWrite,
2018 Operation::Write,
2019 vec![Origin::Old, Origin::New],
2020 )
2021 .await;
2022 migration_tracks_invoked_correctly_driver(
2023 Stage::Shadow,
2024 Operation::Write,
2025 vec![Origin::Old, Origin::New],
2026 )
2027 .await;
2028 migration_tracks_invoked_correctly_driver(
2029 Stage::Live,
2030 Operation::Write,
2031 vec![Origin::Old, Origin::New],
2032 )
2033 .await;
2034 migration_tracks_invoked_correctly_driver(
2035 Stage::Rampdown,
2036 Operation::Write,
2037 vec![Origin::Old, Origin::New],
2038 )
2039 .await;
2040 migration_tracks_invoked_correctly_driver(
2041 Stage::Complete,
2042 Operation::Write,
2043 vec![Origin::New],
2044 )
2045 .await;
2046 }
2047
2048 async fn migration_tracks_invoked_correctly_driver(
2049 stage: Stage,
2050 operation: Operation,
2051 origins: Vec<Origin>,
2052 ) {
2053 let (client, event_rx) = make_mocked_client();
2054 let client = Arc::new(client);
2055 client.start_with_default_executor();
2056 client
2057 .data_store
2058 .write()
2059 .upsert(
2060 "stage-flag",
2061 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2062 )
2063 .expect("patch should apply");
2064
2065 let mut migrator = MigratorBuilder::new(client.clone())
2066 .read(
2067 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2068 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2069 Some(|_, _| true),
2070 )
2071 .write(
2072 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2073 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2074 )
2075 .build()
2076 .expect("migrator should build");
2077
2078 let context = ContextBuilder::new("bob")
2079 .build()
2080 .expect("Failed to create context");
2081
2082 if let Operation::Read = operation {
2083 migrator
2084 .read(
2085 &context,
2086 "stage-flag".into(),
2087 Stage::Off,
2088 serde_json::Value::Null,
2089 )
2090 .await;
2091 } else {
2092 migrator
2093 .write(
2094 &context,
2095 "stage-flag".into(),
2096 Stage::Off,
2097 serde_json::Value::Null,
2098 )
2099 .await;
2100 }
2101
2102 client.flush();
2103 client.close();
2104
2105 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2106 assert_eq!(events.len(), 3);
2107 match &events[1] {
2108 OutputEvent::MigrationOp(event) => {
2109 assert!(event.invoked.len() == origins.len());
2110 assert!(event.invoked.iter().all(|i| origins.contains(i)));
2111 }
2112 _ => panic!("Expected migration event"),
2113 }
2114 }
2115
2116 #[tokio::test]
2117 async fn migration_tracks_latency() {
2118 migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2119 migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2120 migration_tracks_latency_driver(
2121 Stage::Shadow,
2122 Operation::Read,
2123 vec![Origin::Old, Origin::New],
2124 )
2125 .await;
2126 migration_tracks_latency_driver(
2127 Stage::Live,
2128 Operation::Read,
2129 vec![Origin::Old, Origin::New],
2130 )
2131 .await;
2132 migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2133 migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2134 migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2135 migration_tracks_latency_driver(
2136 Stage::DualWrite,
2137 Operation::Write,
2138 vec![Origin::Old, Origin::New],
2139 )
2140 .await;
2141 migration_tracks_latency_driver(
2142 Stage::Shadow,
2143 Operation::Write,
2144 vec![Origin::Old, Origin::New],
2145 )
2146 .await;
2147 migration_tracks_latency_driver(
2148 Stage::Live,
2149 Operation::Write,
2150 vec![Origin::Old, Origin::New],
2151 )
2152 .await;
2153 migration_tracks_latency_driver(
2154 Stage::Rampdown,
2155 Operation::Write,
2156 vec![Origin::Old, Origin::New],
2157 )
2158 .await;
2159 migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2160 }
2161
2162 async fn migration_tracks_latency_driver(
2163 stage: Stage,
2164 operation: Operation,
2165 origins: Vec<Origin>,
2166 ) {
2167 let (client, event_rx) = make_mocked_client();
2168 let client = Arc::new(client);
2169 client.start_with_default_executor();
2170 client
2171 .data_store
2172 .write()
2173 .upsert(
2174 "stage-flag",
2175 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2176 )
2177 .expect("patch should apply");
2178
2179 let mut migrator = MigratorBuilder::new(client.clone())
2180 .track_latency(true)
2181 .read(
2182 |_| {
2183 async move {
2184 async_std::task::sleep(Duration::from_millis(100)).await;
2185 Ok(serde_json::Value::Null)
2186 }
2187 .boxed()
2188 },
2189 |_| {
2190 async move {
2191 async_std::task::sleep(Duration::from_millis(100)).await;
2192 Ok(serde_json::Value::Null)
2193 }
2194 .boxed()
2195 },
2196 Some(|_, _| true),
2197 )
2198 .write(
2199 |_| {
2200 async move {
2201 async_std::task::sleep(Duration::from_millis(100)).await;
2202 Ok(serde_json::Value::Null)
2203 }
2204 .boxed()
2205 },
2206 |_| {
2207 async move {
2208 async_std::task::sleep(Duration::from_millis(100)).await;
2209 Ok(serde_json::Value::Null)
2210 }
2211 .boxed()
2212 },
2213 )
2214 .build()
2215 .expect("migrator should build");
2216
2217 let context = ContextBuilder::new("bob")
2218 .build()
2219 .expect("Failed to create context");
2220
2221 if let Operation::Read = operation {
2222 migrator
2223 .read(
2224 &context,
2225 "stage-flag".into(),
2226 Stage::Off,
2227 serde_json::Value::Null,
2228 )
2229 .await;
2230 } else {
2231 migrator
2232 .write(
2233 &context,
2234 "stage-flag".into(),
2235 Stage::Off,
2236 serde_json::Value::Null,
2237 )
2238 .await;
2239 }
2240
2241 client.flush();
2242 client.close();
2243
2244 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2245 assert_eq!(events.len(), 3);
2246 match &events[1] {
2247 OutputEvent::MigrationOp(event) => {
2248 assert!(event.latency.len() == origins.len());
2249 assert!(event
2250 .latency
2251 .values()
2252 .all(|l| l > &Duration::from_millis(100)));
2253 }
2254 _ => panic!("Expected migration event"),
2255 }
2256 }
2257
2258 #[tokio::test]
2259 async fn migration_tracks_read_errors() {
2260 migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2261 migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2262 migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2263 migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2264 migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2265 migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2266 }
2267
2268 async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2269 let (client, event_rx) = make_mocked_client();
2270 let client = Arc::new(client);
2271 client.start_with_default_executor();
2272 client
2273 .data_store
2274 .write()
2275 .upsert(
2276 "stage-flag",
2277 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2278 )
2279 .expect("patch should apply");
2280
2281 let mut migrator = MigratorBuilder::new(client.clone())
2282 .track_latency(true)
2283 .read(
2284 |_| async move { Err("fail".into()) }.boxed(),
2285 |_| async move { Err("fail".into()) }.boxed(),
2286 Some(|_: &String, _: &String| true),
2287 )
2288 .write(
2289 |_| async move { Err("fail".into()) }.boxed(),
2290 |_| async move { Err("fail".into()) }.boxed(),
2291 )
2292 .build()
2293 .expect("migrator should build");
2294
2295 let context = ContextBuilder::new("bob")
2296 .build()
2297 .expect("Failed to create context");
2298
2299 migrator
2300 .read(
2301 &context,
2302 "stage-flag".into(),
2303 Stage::Off,
2304 serde_json::Value::Null,
2305 )
2306 .await;
2307 client.flush();
2308 client.close();
2309
2310 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2311 assert_eq!(events.len(), 3);
2312 match &events[1] {
2313 OutputEvent::MigrationOp(event) => {
2314 assert!(event.errors.len() == origins.len());
2315 assert!(event.errors.iter().all(|i| origins.contains(i)));
2316 }
2317 _ => panic!("Expected migration event"),
2318 }
2319 }
2320
2321 #[tokio::test]
2322 async fn migration_tracks_authoritative_write_errors() {
2323 migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2324 migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2325 .await;
2326 migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2327 migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2328 migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2329 .await;
2330 migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2331 .await;
2332 }
2333
2334 async fn migration_tracks_authoritative_write_errors_driver(
2335 stage: Stage,
2336 origins: Vec<Origin>,
2337 ) {
2338 let (client, event_rx) = make_mocked_client();
2339 let client = Arc::new(client);
2340 client.start_with_default_executor();
2341 client
2342 .data_store
2343 .write()
2344 .upsert(
2345 "stage-flag",
2346 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2347 )
2348 .expect("patch should apply");
2349
2350 let mut migrator = MigratorBuilder::new(client.clone())
2351 .track_latency(true)
2352 .read(
2353 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2354 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2355 None,
2356 )
2357 .write(
2358 |_| async move { Err("fail".into()) }.boxed(),
2359 |_| async move { Err("fail".into()) }.boxed(),
2360 )
2361 .build()
2362 .expect("migrator should build");
2363
2364 let context = ContextBuilder::new("bob")
2365 .build()
2366 .expect("Failed to create context");
2367
2368 migrator
2369 .write(
2370 &context,
2371 "stage-flag".into(),
2372 Stage::Off,
2373 serde_json::Value::Null,
2374 )
2375 .await;
2376
2377 client.flush();
2378 client.close();
2379
2380 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2381 assert_eq!(events.len(), 3);
2382 match &events[1] {
2383 OutputEvent::MigrationOp(event) => {
2384 assert!(event.errors.len() == origins.len());
2385 assert!(event.errors.iter().all(|i| origins.contains(i)));
2386 }
2387 _ => panic!("Expected migration event"),
2388 }
2389 }
2390
2391 #[tokio::test]
2392 async fn migration_tracks_nonauthoritative_write_errors() {
2393 migration_tracks_nonauthoritative_write_errors_driver(
2394 Stage::DualWrite,
2395 false,
2396 true,
2397 vec![Origin::New],
2398 )
2399 .await;
2400 migration_tracks_nonauthoritative_write_errors_driver(
2401 Stage::Shadow,
2402 false,
2403 true,
2404 vec![Origin::New],
2405 )
2406 .await;
2407 migration_tracks_nonauthoritative_write_errors_driver(
2408 Stage::Live,
2409 true,
2410 false,
2411 vec![Origin::Old],
2412 )
2413 .await;
2414 migration_tracks_nonauthoritative_write_errors_driver(
2415 Stage::Rampdown,
2416 true,
2417 false,
2418 vec![Origin::Old],
2419 )
2420 .await;
2421 }
2422
2423 async fn migration_tracks_nonauthoritative_write_errors_driver(
2424 stage: Stage,
2425 fail_old: bool,
2426 fail_new: bool,
2427 origins: Vec<Origin>,
2428 ) {
2429 let (client, event_rx) = make_mocked_client();
2430 let client = Arc::new(client);
2431 client.start_with_default_executor();
2432 client
2433 .data_store
2434 .write()
2435 .upsert(
2436 "stage-flag",
2437 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2438 )
2439 .expect("patch should apply");
2440
2441 let mut migrator = MigratorBuilder::new(client.clone())
2442 .track_latency(true)
2443 .read(
2444 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2445 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2446 None,
2447 )
2448 .write(
2449 move |_| {
2450 async move {
2451 if fail_old {
2452 Err("fail".into())
2453 } else {
2454 Ok(serde_json::Value::Null)
2455 }
2456 }
2457 .boxed()
2458 },
2459 move |_| {
2460 async move {
2461 if fail_new {
2462 Err("fail".into())
2463 } else {
2464 Ok(serde_json::Value::Null)
2465 }
2466 }
2467 .boxed()
2468 },
2469 )
2470 .build()
2471 .expect("migrator should build");
2472
2473 let context = ContextBuilder::new("bob")
2474 .build()
2475 .expect("Failed to create context");
2476
2477 migrator
2478 .write(
2479 &context,
2480 "stage-flag".into(),
2481 Stage::Off,
2482 serde_json::Value::Null,
2483 )
2484 .await;
2485
2486 client.flush();
2487 client.close();
2488
2489 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2490 assert_eq!(events.len(), 3);
2491 match &events[1] {
2492 OutputEvent::MigrationOp(event) => {
2493 assert!(event.errors.len() == origins.len());
2494 assert!(event.errors.iter().all(|i| origins.contains(i)));
2495 }
2496 _ => panic!("Expected migration event"),
2497 }
2498 }
2499
2500 #[tokio::test]
2501 async fn migration_tracks_consistency() {
2502 migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2503 migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2504 migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2505 migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2506 }
2507
2508 async fn migration_tracks_consistency_driver(
2509 stage: Stage,
2510 old_return: &'static str,
2511 new_return: &'static str,
2512 expected_consistency: bool,
2513 ) {
2514 let (client, event_rx) = make_mocked_client();
2515 let client = Arc::new(client);
2516 client.start_with_default_executor();
2517 client
2518 .data_store
2519 .write()
2520 .upsert(
2521 "stage-flag",
2522 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2523 )
2524 .expect("patch should apply");
2525
2526 let mut migrator = MigratorBuilder::new(client.clone())
2527 .track_latency(true)
2528 .read(
2529 |_| {
2530 async move {
2531 async_std::task::sleep(Duration::from_millis(100)).await;
2532 Ok(serde_json::Value::String(old_return.to_string()))
2533 }
2534 .boxed()
2535 },
2536 |_| {
2537 async move {
2538 async_std::task::sleep(Duration::from_millis(100)).await;
2539 Ok(serde_json::Value::String(new_return.to_string()))
2540 }
2541 .boxed()
2542 },
2543 Some(|lhs, rhs| lhs == rhs),
2544 )
2545 .write(
2546 |_| {
2547 async move {
2548 async_std::task::sleep(Duration::from_millis(100)).await;
2549 Ok(serde_json::Value::Null)
2550 }
2551 .boxed()
2552 },
2553 |_| {
2554 async move {
2555 async_std::task::sleep(Duration::from_millis(100)).await;
2556 Ok(serde_json::Value::Null)
2557 }
2558 .boxed()
2559 },
2560 )
2561 .build()
2562 .expect("migrator should build");
2563
2564 let context = ContextBuilder::new("bob")
2565 .build()
2566 .expect("Failed to create context");
2567
2568 migrator
2569 .read(
2570 &context,
2571 "stage-flag".into(),
2572 Stage::Off,
2573 serde_json::Value::Null,
2574 )
2575 .await;
2576
2577 client.flush();
2578 client.close();
2579
2580 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2581 assert_eq!(events.len(), 3);
2582 match &events[1] {
2583 OutputEvent::MigrationOp(event) => {
2584 assert!(event.consistency_check == Some(expected_consistency))
2585 }
2586 _ => panic!("Expected migration event"),
2587 }
2588 }
2589
2590 fn make_mocked_client_with_delay(
2591 delay: u64,
2592 offline: bool,
2593 daemon_mode: bool,
2594 ) -> (Client, Receiver<OutputEvent>) {
2595 let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2596 let (event_sender, event_rx) = create_event_sender();
2597
2598 let config = ConfigBuilder::new("sdk-key")
2599 .offline(offline)
2600 .daemon_mode(daemon_mode)
2601 .data_source(MockDataSourceBuilder::new().data_source(updates))
2602 .event_processor(
2603 EventProcessorBuilder::<HttpConnector>::new().event_sender(Arc::new(event_sender)),
2604 )
2605 .build()
2606 .expect("config should build");
2607
2608 let client = Client::build(config).expect("Should be built.");
2609
2610 (client, event_rx)
2611 }
2612
2613 fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2614 make_mocked_client_with_delay(0, true, false)
2615 }
2616
2617 fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2618 make_mocked_client_with_delay(0, false, false)
2619 }
2620}