1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::DataSource;
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
/// Bundles the event settings used by one family of evaluation calls.
///
/// `Client` keeps two of these (`events_default` and `events_with_reasons`);
/// `Client::build` constructs their factories with `false` and `true`
/// respectively.
struct EventsScope {
    /// When `true`, client methods skip building and sending events for this scope.
    disabled: bool,
    /// Factory used for the events the client itself emits.
    event_factory: EventFactory,
    /// Recorder passed to the evaluator so prerequisite evaluations can be reported.
    prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
}
32
33struct PrerequisiteEventRecorder {
34 event_factory: EventFactory,
35 event_processor: Arc<dyn EventProcessor>,
36}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39 fn record(&self, event: PrerequisiteEvent) {
40 let evt = self.event_factory.new_eval_event(
41 &event.prerequisite_flag.key,
42 event.context.clone(),
43 &event.prerequisite_flag,
44 event.prerequisite_result,
45 FlagValue::Json(serde_json::Value::Null),
46 Some(event.target_flag_key),
47 );
48
49 self.event_processor.send(evt);
50 }
51}
52
/// Error returned by `Client::build` when a client cannot be constructed.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum BuildError {
    /// Some part of the configuration was rejected. The payload is the
    /// display text of the underlying builder error.
    #[error("invalid client config: {0}")]
    InvalidConfig(String),
}
61
62impl From<DataSourceError> for BuildError {
63 fn from(error: DataSourceError) -> Self {
64 Self::InvalidConfig(error.to_string())
65 }
66}
67
68impl From<DataStoreError> for BuildError {
69 fn from(error: DataStoreError) -> Self {
70 Self::InvalidConfig(error.to_string())
71 }
72}
73
74impl From<EventProcessorError> for BuildError {
75 fn from(error: EventProcessorError) -> Self {
76 Self::InvalidConfig(error.to_string())
77 }
78}
79
80impl From<ConfigBuildError> for BuildError {
81 fn from(error: ConfigBuildError) -> Self {
82 Self::InvalidConfig(error.to_string())
83 }
84}
85
/// Error returned by `Client::start_with_runtime`.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum StartError {
    /// Creating the tokio `Runtime` failed; the payload is the I/O error it reported.
    #[error("couldn't spawn background thread for client: {0}")]
    SpawnFailed(io::Error),
}
94
/// Outcome of the client's initial data load.
///
/// The explicit discriminants are the raw `usize` values the state is stored
/// as in an atomic.
#[derive(PartialEq, Copy, Clone, Debug)]
enum ClientInitState {
    Initializing = 0,
    Initialized = 1,
    InitializationFailed = 2,
}

/// Allows a state to be compared directly with a raw `usize` value.
impl PartialEq<usize> for ClientInitState {
    fn eq(&self, other: &usize) -> bool {
        let discriminant = *self as usize;
        discriminant == *other
    }
}

impl From<usize> for ClientInitState {
    /// Maps a raw discriminant back to its state.
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!()` for any value other than `0`, `1` or `2`.
    fn from(val: usize) -> Self {
        // Indexed by discriminant, so position N holds the variant with value N.
        const BY_DISCRIMINANT: [ClientInitState; 3] = [
            ClientInitState::Initializing,
            ClientInitState::Initialized,
            ClientInitState::InitializationFailed,
        ];

        match BY_DISCRIMINANT.get(val) {
            Some(state) => *state,
            None => unreachable!(),
        }
    }
}
118
/// Handle through which feature flags are evaluated and analytics events are sent.
///
/// Built with `Client::build` and started with either
/// `start_with_default_executor` or `start_with_runtime`.
pub struct Client {
    /// Receives every event the client produces; closed in `close`.
    event_processor: Arc<dyn EventProcessor>,
    /// Data source subscribed to when the client is started.
    data_source: Arc<dyn DataSource>,
    /// Store read during evaluation and handed to the data source on start.
    data_store: Arc<RwLock<dyn DataStore>>,
    /// Event scope used by `variation`, `identify`, the `track*` methods and migrations.
    events_default: EventsScope,
    /// Event scope used by `variation_detail`.
    events_with_reasons: EventsScope,
    /// Gets a permit added once the data source reports its initialization outcome.
    init_notify: Arc<Semaphore>,
    /// Current `ClientInitState`, stored as its `usize` discriminant.
    init_state: Arc<AtomicUsize>,
    /// Set once one of the `start_*` methods has been called.
    started: AtomicBool,
    /// Copied from `Config::offline` at build time.
    offline: bool,
    /// Copied from `Config::daemon_mode` at build time.
    daemon_mode: bool,
    /// SDK key from the config; also the HMAC key in `secure_mode_hash`.
    sdk_key: String,
    /// Channel signalled by `close`; a receiver is handed to the data source on start.
    shutdown_broadcast: broadcast::Sender<()>,
    /// Runtime created by `start_with_runtime`, if any; taken and dropped by `close`.
    runtime: RwLock<Option<Runtime>>,
}
163
164impl Client {
165 pub fn build(config: Config) -> Result<Self, BuildError> {
167 if config.offline() {
168 info!("Started LaunchDarkly Client in offline mode");
169 } else if config.daemon_mode() {
170 info!("Started LaunchDarkly Client in daemon mode");
171 }
172
173 let tags = config.application_tag();
174
175 let endpoints = config.service_endpoints_builder().build()?;
176 let event_processor =
177 config
178 .event_processor_builder()
179 .build(&endpoints, config.sdk_key(), tags.clone())?;
180 let data_source =
181 config
182 .data_source_builder()
183 .build(&endpoints, config.sdk_key(), tags.clone())?;
184 let data_store = config.data_store_builder().build()?;
185
186 let events_default = EventsScope {
187 disabled: config.offline(),
188 event_factory: EventFactory::new(false),
189 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
190 event_factory: EventFactory::new(false),
191 event_processor: event_processor.clone(),
192 }),
193 };
194
195 let events_with_reasons = EventsScope {
196 disabled: config.offline(),
197 event_factory: EventFactory::new(true),
198 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
199 event_factory: EventFactory::new(true),
200 event_processor: event_processor.clone(),
201 }),
202 };
203
204 let (shutdown_tx, _) = broadcast::channel(1);
205
206 Ok(Client {
207 event_processor,
208 data_source,
209 data_store,
210 events_default,
211 events_with_reasons,
212 init_notify: Arc::new(Semaphore::new(0)),
213 init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
214 started: AtomicBool::new(false),
215 offline: config.offline(),
216 daemon_mode: config.daemon_mode(),
217 sdk_key: config.sdk_key().into(),
218 shutdown_broadcast: shutdown_tx,
219 runtime: RwLock::new(None),
220 })
221 }
222
223 pub fn start_with_default_executor(&self) {
225 if self.started.load(Ordering::SeqCst) {
226 return;
227 }
228 self.started.store(true, Ordering::SeqCst);
229 self.start_with_default_executor_internal();
230 }
231
232 fn start_with_default_executor_internal(&self) {
233 let notify = self.init_notify.clone();
237 let init_state = self.init_state.clone();
238
239 self.data_source.subscribe(
240 self.data_store.clone(),
241 Arc::new(move |success| {
242 init_state.store(
243 (if success {
244 ClientInitState::Initialized
245 } else {
246 ClientInitState::InitializationFailed
247 }) as usize,
248 Ordering::SeqCst,
249 );
250 notify.add_permits(1);
251 }),
252 self.shutdown_broadcast.subscribe(),
253 );
254 }
255
256 pub fn start_with_runtime(&self) -> Result<bool, StartError> {
262 if self.started.load(Ordering::SeqCst) {
263 return Ok(true);
264 }
265 self.started.store(true, Ordering::SeqCst);
266
267 let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
268 let _guard = runtime.enter();
269 self.runtime.write().replace(runtime);
270
271 self.start_with_default_executor_internal();
272
273 Ok(true)
274 }
275
276 #[deprecated(
280 note = "blocking without a timeout is discouraged, use wait_for_initialization instead"
281 )]
282 pub async fn initialized_async(&self) -> bool {
283 self.initialized_async_internal().await
284 }
285
286 pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
292 if timeout > Duration::from_secs(60) {
293 warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
294 }
295
296 let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
297 match initialized {
298 Ok(result) => Some(result),
299 Err(_) => None,
300 }
301 }
302
303 async fn initialized_async_internal(&self) -> bool {
304 if self.offline || self.daemon_mode {
305 return true;
306 }
307
308 if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
314 let _permit = self.init_notify.acquire().await;
315 }
316 ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
317 }
318
319 pub fn initialized(&self) -> bool {
323 self.offline
324 || self.daemon_mode
325 || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
326 }
327
328 pub fn close(&self) {
332 self.event_processor.close();
333
334 if !self.offline && !self.daemon_mode {
337 if let Err(e) = self.shutdown_broadcast.send(()) {
338 error!("Failed to shutdown client appropriately: {}", e);
339 }
340 }
341
342 self.runtime.write().take();
345 }
346
347 pub fn flush(&self) {
355 self.event_processor.flush();
356 }
357
358 pub fn identify(&self, context: Context) {
363 if self.events_default.disabled {
364 return;
365 }
366
367 self.send_internal(self.events_default.event_factory.new_identify(context));
368 }
369
370 pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
378 let val = self.variation(context, flag_key, default);
379 if let Some(b) = val.as_bool() {
380 b
381 } else {
382 warn!(
383 "bool_variation called for a non-bool flag {:?} (got {:?})",
384 flag_key, val
385 );
386 default
387 }
388 }
389
390 pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
398 let val = self.variation(context, flag_key, default.clone());
399 if let Some(s) = val.as_string() {
400 s
401 } else {
402 warn!(
403 "str_variation called for a non-string flag {:?} (got {:?})",
404 flag_key, val
405 );
406 default
407 }
408 }
409
410 pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
418 let val = self.variation(context, flag_key, default);
419 if let Some(f) = val.as_float() {
420 f
421 } else {
422 warn!(
423 "float_variation called for a non-float flag {:?} (got {:?})",
424 flag_key, val
425 );
426 default
427 }
428 }
429
430 pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
438 let val = self.variation(context, flag_key, default);
439 if let Some(f) = val.as_int() {
440 f
441 } else {
442 warn!(
443 "int_variation called for a non-int flag {:?} (got {:?})",
444 flag_key, val
445 );
446 default
447 }
448 }
449
450 pub fn json_variation(
460 &self,
461 context: &Context,
462 flag_key: &str,
463 default: serde_json::Value,
464 ) -> serde_json::Value {
465 self.variation(context, flag_key, default.clone())
466 .as_json()
467 .unwrap_or(default)
468 }
469
470 pub fn bool_variation_detail(
477 &self,
478 context: &Context,
479 flag_key: &str,
480 default: bool,
481 ) -> Detail<bool> {
482 self.variation_detail(context, flag_key, default).try_map(
483 |val| val.as_bool(),
484 default,
485 eval::Error::WrongType,
486 )
487 }
488
489 pub fn str_variation_detail(
496 &self,
497 context: &Context,
498 flag_key: &str,
499 default: String,
500 ) -> Detail<String> {
501 self.variation_detail(context, flag_key, default.clone())
502 .try_map(|val| val.as_string(), default, eval::Error::WrongType)
503 }
504
505 pub fn float_variation_detail(
512 &self,
513 context: &Context,
514 flag_key: &str,
515 default: f64,
516 ) -> Detail<f64> {
517 self.variation_detail(context, flag_key, default).try_map(
518 |val| val.as_float(),
519 default,
520 eval::Error::WrongType,
521 )
522 }
523
524 pub fn int_variation_detail(
531 &self,
532 context: &Context,
533 flag_key: &str,
534 default: i64,
535 ) -> Detail<i64> {
536 self.variation_detail(context, flag_key, default).try_map(
537 |val| val.as_int(),
538 default,
539 eval::Error::WrongType,
540 )
541 }
542
543 pub fn json_variation_detail(
550 &self,
551 context: &Context,
552 flag_key: &str,
553 default: serde_json::Value,
554 ) -> Detail<serde_json::Value> {
555 self.variation_detail(context, flag_key, default.clone())
556 .try_map(|val| val.as_json(), default, eval::Error::WrongType)
557 }
558
559 pub fn secure_mode_hash(&self, context: &Context) -> String {
564 let key = aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
565 let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
566
567 data_encoding::HEXLOWER.encode(tag.as_ref())
568 }
569
570 pub fn all_flags_detail(
581 &self,
582 context: &Context,
583 flag_state_config: FlagDetailConfig,
584 ) -> FlagDetail {
585 if self.offline {
586 warn!(
587 "all_flags_detail() called, but client is in offline mode. Returning empty state"
588 );
589 return FlagDetail::new(false);
590 }
591
592 if !self.initialized() {
593 warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
594 return FlagDetail::new(false);
595 }
596
597 let data_store = self.data_store.read();
598
599 let mut flag_detail = FlagDetail::new(true);
600 flag_detail.populate(&*data_store, context, flag_state_config);
601
602 flag_detail
603 }
604
605 pub fn variation_detail<T: Into<FlagValue> + Clone>(
611 &self,
612 context: &Context,
613 flag_key: &str,
614 default: T,
615 ) -> Detail<FlagValue> {
616 let (detail, _) =
617 self.variation_internal(context, flag_key, default, &self.events_with_reasons);
618 detail
619 }
620
621 pub fn variation<T: Into<FlagValue> + Clone>(
632 &self,
633 context: &Context,
634 flag_key: &str,
635 default: T,
636 ) -> FlagValue {
637 let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
638 detail.value.unwrap()
639 }
640
641 pub fn migration_variation(
646 &self,
647 context: &Context,
648 flag_key: &str,
649 default_stage: Stage,
650 ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
651 let (detail, flag) =
652 self.variation_internal(context, flag_key, default_stage, &self.events_default);
653
654 let migration_detail =
655 detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
656 let tracker = MigrationOpTracker::new(
657 flag_key.into(),
658 flag,
659 context.clone(),
660 migration_detail.clone(),
661 default_stage,
662 );
663
664 (
665 migration_detail.value.unwrap_or(default_stage),
666 Arc::new(Mutex::new(tracker)),
667 )
668 }
669
670 pub fn track_event(&self, context: Context, key: impl Into<String>) {
680 let _ = self.track(context, key, None, serde_json::Value::Null);
681 }
682
683 pub fn track_data(
696 &self,
697 context: Context,
698 key: impl Into<String>,
699 data: impl Serialize,
700 ) -> serde_json::Result<()> {
701 self.track(context, key, None, data)
702 }
703
704 pub fn track_metric(
715 &self,
716 context: Context,
717 key: impl Into<String>,
718 value: f64,
719 data: impl Serialize,
720 ) {
721 let _ = self.track(context, key, Some(value), data);
722 }
723
724 fn track(
725 &self,
726 context: Context,
727 key: impl Into<String>,
728 metric_value: Option<f64>,
729 data: impl Serialize,
730 ) -> serde_json::Result<()> {
731 if !self.events_default.disabled {
732 let event =
733 self.events_default
734 .event_factory
735 .new_custom(context, key, metric_value, data)?;
736
737 self.send_internal(event);
738 }
739
740 Ok(())
741 }
742
743 pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
750 if self.events_default.disabled {
751 return;
752 }
753
754 match tracker.lock() {
755 Ok(tracker) => {
756 let event = tracker.build();
757 match event {
758 Ok(event) => {
759 self.send_internal(
760 self.events_default.event_factory.new_migration_op(event),
761 );
762 }
763 Err(e) => error!(
764 "Failed to build migration event, no event will be sent: {}",
765 e
766 ),
767 }
768 }
769 Err(e) => error!(
770 "Failed to lock migration tracker, no event will be sent: {}",
771 e
772 ),
773 }
774 }
775
776 fn variation_internal<T: Into<FlagValue> + Clone>(
777 &self,
778 context: &Context,
779 flag_key: &str,
780 default: T,
781 events_scope: &EventsScope,
782 ) -> (Detail<FlagValue>, Option<eval::Flag>) {
783 if self.offline {
784 return (
785 Detail::err_default(eval::Error::ClientNotReady, default.into()),
786 None,
787 );
788 }
789
790 let (flag, result) = match self.initialized() {
791 false => (
792 None,
793 Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
794 ),
795 true => {
796 let data_store = self.data_store.read();
797 match data_store.flag(flag_key) {
798 Some(flag) => {
799 let result = eval::evaluate(
800 data_store.to_store(),
801 &flag,
802 context,
803 Some(&*events_scope.prerequisite_event_recorder),
804 )
805 .map(|v| v.clone())
806 .or(default.clone().into());
807
808 (Some(flag), result)
809 }
810 None => (
811 None,
812 Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
813 ),
814 }
815 }
816 };
817
818 if !events_scope.disabled {
819 let event = match &flag {
820 Some(f) => events_scope.event_factory.new_eval_event(
821 flag_key,
822 context.clone(),
823 f,
824 result.clone(),
825 default.into(),
826 None,
827 ),
828 None => events_scope.event_factory.new_unknown_flag_event(
829 flag_key,
830 context.clone(),
831 result.clone(),
832 default.into(),
833 ),
834 };
835 self.send_internal(event);
836 }
837
838 (result, flag)
839 }
840
841 fn send_internal(&self, event: InputEvent) {
842 self.event_processor.send(event);
843 }
844}
845
846#[cfg(test)]
847mod tests {
848 use assert_json_diff::assert_json_eq;
849 use crossbeam_channel::Receiver;
850 use eval::{ContextBuilder, MultiContextBuilder};
851 use futures::FutureExt;
852 use hyper::client::HttpConnector;
853 use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
854 use maplit::hashmap;
855 use std::collections::HashMap;
856 use tokio::time::Instant;
857
858 use crate::data_source::MockDataSource;
859 use crate::data_source_builders::MockDataSourceBuilder;
860 use crate::events::create_event_sender;
861 use crate::events::event::{OutputEvent, VariationKey};
862 use crate::events::processor_builders::EventProcessorBuilder;
863 use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
864 use crate::stores::store_types::{PatchTarget, StorageItem};
865 use crate::test_common::{
866 self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
867 basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
868 };
869 use crate::{
870 AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
871 PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
872 SerializedItem,
873 };
874 use test_case::test_case;
875
876 use super::*;
877
878 fn is_send_and_sync<T: Send + Sync>() {}
879
880 #[test]
881 fn ensure_client_is_send_and_sync() {
882 is_send_and_sync::<Client>()
883 }
884
885 #[tokio::test]
886 async fn client_asynchronously_initializes() {
887 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
888 client.start_with_default_executor();
889
890 let now = Instant::now();
891 let initialized = client.initialized_async().await;
892 let elapsed_time = now.elapsed();
893 assert!(initialized);
894 assert!(elapsed_time.as_millis() > 500)
896 }
897
898 #[tokio::test]
899 async fn client_asynchronously_initializes_within_timeout() {
900 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
901 client.start_with_default_executor();
902
903 let now = Instant::now();
904 let initialized = client
905 .wait_for_initialization(Duration::from_millis(1500))
906 .await;
907 let elapsed_time = now.elapsed();
908 assert!(elapsed_time.as_millis() > 500);
910 assert_eq!(initialized, Some(true));
911 }
912
913 #[tokio::test]
914 async fn client_asynchronously_initializes_slower_than_timeout() {
915 let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
916 client.start_with_default_executor();
917
918 let now = Instant::now();
919 let initialized = client
920 .wait_for_initialization(Duration::from_millis(500))
921 .await;
922 let elapsed_time = now.elapsed();
923 assert!(elapsed_time.as_millis() < 750);
925 assert!(initialized.is_none());
926 }
927
928 #[tokio::test]
929 async fn client_initializes_immediately_in_offline_mode() {
930 let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
931 client.start_with_default_executor();
932
933 assert!(client.initialized());
934
935 let now = Instant::now();
936 let initialized = client
937 .wait_for_initialization(Duration::from_millis(2000))
938 .await;
939 let elapsed_time = now.elapsed();
940 assert_eq!(initialized, Some(true));
941 assert!(elapsed_time.as_millis() < 500)
942 }
943
944 #[tokio::test]
945 async fn client_initializes_immediately_in_daemon_mode() {
946 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
947 client.start_with_default_executor();
948
949 assert!(client.initialized());
950
951 let now = Instant::now();
952 let initialized = client
953 .wait_for_initialization(Duration::from_millis(2000))
954 .await;
955 let elapsed_time = now.elapsed();
956 assert_eq!(initialized, Some(true));
957 assert!(elapsed_time.as_millis() < 500)
958 }
959
960 #[test_case(basic_flag("myFlag"), false.into(), true.into())]
961 #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
962 fn client_updates_changes_evaluation_results(
963 flag: eval::Flag,
964 default: FlagValue,
965 expected: FlagValue,
966 ) {
967 let context = ContextBuilder::new("foo")
968 .build()
969 .expect("Failed to create context");
970
971 let (client, _event_rx) = make_mocked_client();
972
973 let result = client.variation_detail(&context, "myFlag", default.clone());
974 assert_eq!(result.value.unwrap(), default);
975
976 client.start_with_default_executor();
977 client
978 .data_store
979 .write()
980 .upsert(
981 &flag.key,
982 PatchTarget::Flag(StorageItem::Item(flag.clone())),
983 )
984 .expect("patch should apply");
985
986 let result = client.variation_detail(&context, "myFlag", default);
987 assert_eq!(result.value.unwrap(), expected);
988 assert!(matches!(
989 result.reason,
990 Reason::Fallthrough {
991 in_experiment: false
992 }
993 ));
994 }
995
996 #[test]
997 fn all_flags_detail_is_invalid_when_offline() {
998 let (client, _event_rx) = make_mocked_offline_client();
999 client.start_with_default_executor();
1000
1001 let context = ContextBuilder::new("bob")
1002 .build()
1003 .expect("Failed to create context");
1004
1005 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1006 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1007 }
1008
1009 #[test]
1010 fn all_flags_detail_is_invalid_when_not_initialized() {
1011 let (client, _event_rx) = make_mocked_client();
1012
1013 let context = ContextBuilder::new("bob")
1014 .build()
1015 .expect("Failed to create context");
1016
1017 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1018 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1019 }
1020
1021 #[test]
1022 fn all_flags_detail_returns_flag_states() {
1023 let (client, _event_rx) = make_mocked_client();
1024 client.start_with_default_executor();
1025 client
1026 .data_store
1027 .write()
1028 .upsert(
1029 "myFlag1",
1030 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag1"))),
1031 )
1032 .expect("patch should apply");
1033 client
1034 .data_store
1035 .write()
1036 .upsert(
1037 "myFlag2",
1038 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag2"))),
1039 )
1040 .expect("patch should apply");
1041 let context = ContextBuilder::new("bob")
1042 .build()
1043 .expect("Failed to create context");
1044
1045 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1046
1047 client.close();
1048
1049 assert_json_eq!(
1050 all_flags,
1051 json!({
1052 "myFlag1": true,
1053 "myFlag2": true,
1054 "$flagsState": {
1055 "myFlag1": {
1056 "version": 42,
1057 "variation": 1
1058 },
1059 "myFlag2": {
1060 "version": 42,
1061 "variation": 1
1062 },
1063 },
1064 "$valid": true
1065 })
1066 );
1067 }
1068
1069 #[test]
1070 fn all_flags_detail_returns_prerequisite_relations() {
1071 let (client, _event_rx) = make_mocked_client();
1072 client.start_with_default_executor();
1073 client
1074 .data_store
1075 .write()
1076 .upsert(
1077 "prereq1",
1078 PatchTarget::Flag(StorageItem::Item(basic_flag("prereq1"))),
1079 )
1080 .expect("patch should apply");
1081 client
1082 .data_store
1083 .write()
1084 .upsert(
1085 "prereq2",
1086 PatchTarget::Flag(StorageItem::Item(basic_flag("prereq2"))),
1087 )
1088 .expect("patch should apply");
1089
1090 client
1091 .data_store
1092 .write()
1093 .upsert(
1094 "toplevel",
1095 PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1096 "toplevel",
1097 &["prereq1", "prereq2"],
1098 false,
1099 ))),
1100 )
1101 .expect("patch should apply");
1102
1103 let context = ContextBuilder::new("bob")
1104 .build()
1105 .expect("Failed to create context");
1106
1107 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1108
1109 client.close();
1110
1111 assert_json_eq!(
1112 all_flags,
1113 json!({
1114 "prereq1": true,
1115 "prereq2": true,
1116 "toplevel": true,
1117 "$flagsState": {
1118 "toplevel": {
1119 "version": 42,
1120 "variation": 1,
1121 "prerequisites": ["prereq1", "prereq2"]
1122 },
1123 "prereq1": {
1124 "version": 42,
1125 "variation": 1
1126 },
1127 "prereq2": {
1128 "version": 42,
1129 "variation": 1
1130 },
1131 },
1132 "$valid": true
1133 })
1134 );
1135 }
1136
1137 #[test]
1138 fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1139 let (client, _event_rx) = make_mocked_client();
1140 client.start_with_default_executor();
1141 client
1142 .data_store
1143 .write()
1144 .upsert(
1145 "prereq1",
1146 PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1147 "prereq1", false,
1148 ))),
1149 )
1150 .expect("patch should apply");
1151 client
1152 .data_store
1153 .write()
1154 .upsert(
1155 "prereq2",
1156 PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1157 "prereq2", false,
1158 ))),
1159 )
1160 .expect("patch should apply");
1161
1162 client
1163 .data_store
1164 .write()
1165 .upsert(
1166 "toplevel",
1167 PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1168 "toplevel",
1169 &["prereq1", "prereq2"],
1170 true,
1171 ))),
1172 )
1173 .expect("patch should apply");
1174
1175 let context = ContextBuilder::new("bob")
1176 .build()
1177 .expect("Failed to create context");
1178
1179 let mut config = FlagDetailConfig::new();
1180 config.client_side_only();
1181
1182 let all_flags = client.all_flags_detail(&context, config);
1183
1184 client.close();
1185
1186 assert_json_eq!(
1187 all_flags,
1188 json!({
1189 "toplevel": true,
1190 "$flagsState": {
1191 "toplevel": {
1192 "version": 42,
1193 "variation": 1,
1194 "prerequisites": ["prereq1", "prereq2"]
1195 },
1196 },
1197 "$valid": true
1198 })
1199 );
1200 }
1201
1202 #[test]
1203 fn variation_tracks_events_correctly() {
1204 let (client, event_rx) = make_mocked_client();
1205 client.start_with_default_executor();
1206 client
1207 .data_store
1208 .write()
1209 .upsert(
1210 "myFlag",
1211 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1212 )
1213 .expect("patch should apply");
1214 let context = ContextBuilder::new("bob")
1215 .build()
1216 .expect("Failed to create context");
1217
1218 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1219
1220 assert!(flag_value.as_bool().unwrap());
1221 client.flush();
1222 client.close();
1223
1224 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1225 assert_eq!(events.len(), 2);
1226 assert_eq!(events[0].kind(), "index");
1227 assert_eq!(events[1].kind(), "summary");
1228
1229 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1230 let variation_key = VariationKey {
1231 version: Some(42),
1232 variation: Some(1),
1233 };
1234 let feature = event_summary.features.get("myFlag");
1235 assert!(feature.is_some());
1236
1237 let feature = feature.unwrap();
1238 assert!(feature.counters.contains_key(&variation_key));
1239 } else {
1240 panic!("Event should be a summary type");
1241 }
1242 }
1243
1244 #[test]
1245 fn variation_handles_offline_mode() {
1246 let (client, event_rx) = make_mocked_offline_client();
1247 client.start_with_default_executor();
1248
1249 let context = ContextBuilder::new("bob")
1250 .build()
1251 .expect("Failed to create context");
1252 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1253
1254 assert!(!flag_value.as_bool().unwrap());
1255 client.flush();
1256 client.close();
1257
1258 assert_eq!(event_rx.iter().count(), 0);
1259 }
1260
1261 #[test]
1262 fn variation_handles_unknown_flags() {
1263 let (client, event_rx) = make_mocked_client();
1264 client.start_with_default_executor();
1265 let context = ContextBuilder::new("bob")
1266 .build()
1267 .expect("Failed to create context");
1268
1269 let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1270
1271 assert!(!flag_value.as_bool().unwrap());
1272 client.flush();
1273 client.close();
1274
1275 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1276 assert_eq!(events.len(), 2);
1277 assert_eq!(events[0].kind(), "index");
1278 assert_eq!(events[1].kind(), "summary");
1279
1280 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1281 let variation_key = VariationKey {
1282 version: None,
1283 variation: None,
1284 };
1285
1286 let feature = event_summary.features.get("non-existent-flag");
1287 assert!(feature.is_some());
1288
1289 let feature = feature.unwrap();
1290 assert!(feature.counters.contains_key(&variation_key));
1291 } else {
1292 panic!("Event should be a summary type");
1293 }
1294 }
1295
1296 #[test]
1297 fn variation_detail_handles_debug_events_correctly() {
1298 let (client, event_rx) = make_mocked_client();
1299 client.start_with_default_executor();
1300
1301 let mut flag = basic_flag("myFlag");
1302 flag.debug_events_until_date = Some(64_060_606_800_000); client
1305 .data_store
1306 .write()
1307 .upsert(
1308 &flag.key,
1309 PatchTarget::Flag(StorageItem::Item(flag.clone())),
1310 )
1311 .expect("patch should apply");
1312 let context = ContextBuilder::new("bob")
1313 .build()
1314 .expect("Failed to create context");
1315
1316 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1317
1318 assert!(detail.value.unwrap().as_bool().unwrap());
1319 assert!(matches!(
1320 detail.reason,
1321 Reason::Fallthrough {
1322 in_experiment: false
1323 }
1324 ));
1325 client.flush();
1326 client.close();
1327
1328 let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1329 assert_eq!(events.len(), 3);
1330 assert_eq!(events[0].kind(), "index");
1331 assert_eq!(events[1].kind(), "debug");
1332 assert_eq!(events[2].kind(), "summary");
1333
1334 if let OutputEvent::Summary(event_summary) = events[2].clone() {
1335 let variation_key = VariationKey {
1336 version: Some(42),
1337 variation: Some(1),
1338 };
1339
1340 let feature = event_summary.features.get("myFlag");
1341 assert!(feature.is_some());
1342
1343 let feature = feature.unwrap();
1344 assert!(feature.counters.contains_key(&variation_key));
1345 } else {
1346 panic!("Event should be a summary type");
1347 }
1348 }
1349
/// Evaluates `myFlag` (built by `basic_flag`) through `variation_detail` and
/// checks both the returned detail and the events produced.
///
/// Expects a `true` value with a non-experiment fallthrough reason, and, after
/// flushing and closing the client, exactly an index event followed by a
/// summary event whose `myFlag` counters include version 42 / variation 1.
#[test]
fn variation_detail_tracks_events_correctly() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    client
        .data_store
        .write()
        .upsert(
            "myFlag",
            PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
        )
        .expect("patch should apply");
    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));

    assert!(detail.value.unwrap().as_bool().unwrap());
    assert!(matches!(
        detail.reason,
        Reason::Fallthrough {
            in_experiment: false
        }
    ));
    client.flush();
    client.close();

    let events: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(events.len(), 2);
    assert_eq!(events[0].kind(), "index");
    assert_eq!(events[1].kind(), "summary");

    // Inspect the summary by reference; no need to clone the whole event.
    match &events[1] {
        OutputEvent::Summary(summary) => {
            let expected_counter = VariationKey {
                version: Some(42),
                variation: Some(1),
            };

            let feature = summary
                .features
                .get("myFlag")
                .expect("summary should contain an entry for myFlag");
            assert!(feature.counters.contains_key(&expected_counter));
        }
        _ => panic!("Event should be a summary type"),
    }
}
1399
/// With the client from `make_mocked_offline_client`, `variation_detail` must
/// hand back the supplied default (`false`) with a `ClientNotReady` error
/// reason, and no events may reach the event channel.
#[test]
fn variation_detail_handles_offline_mode() {
    let (offline_client, event_rx) = make_mocked_offline_client();
    offline_client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let outcome = offline_client.variation_detail(&bob, "myFlag", FlagValue::Bool(false));

    let evaluated = outcome.value.unwrap().as_bool().unwrap();
    assert!(!evaluated);

    let is_not_ready = matches!(
        outcome.reason,
        Reason::Error {
            error: eval::Error::ClientNotReady
        }
    );
    assert!(is_not_ready);

    offline_client.flush();
    offline_client.close();

    let emitted = event_rx.iter().count();
    assert_eq!(emitted, 0);
}
1423
1424 struct InMemoryPersistentDataStoreFactory {
1425 data: AllData<Flag, Segment>,
1426 initialized: bool,
1427 }
1428
1429 impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1430 fn create_persistent_data_store(
1431 &self,
1432 ) -> Result<Box<(dyn PersistentDataStore + 'static)>, std::io::Error> {
1433 let serialized_data =
1434 AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1435 Ok(Box::new(InMemoryPersistentDataStore {
1436 data: serialized_data,
1437 initialized: self.initialized,
1438 }))
1439 }
1440 }
1441
/// Daemon-mode client backed by an initialized persistent store holding
/// `flag`: the evaluation must yield `true` with a non-experiment fallthrough
/// reason, and the only captured log line must be the daemon-mode startup
/// message.
#[test]
fn variation_detail_handles_daemon_mode() {
    testing_logger::setup();

    let store_builder =
        PersistentDataStoreBuilder::new(Arc::new(InMemoryPersistentDataStoreFactory {
            data: AllData {
                flags: hashmap!["flag".into() => basic_flag("flag")],
                segments: HashMap::new(),
            },
            initialized: true,
        }));

    let daemon_config = ConfigBuilder::new("sdk-key")
        .daemon_mode(true)
        .data_store(&store_builder)
        .event_processor(&NullEventProcessorBuilder::new())
        .build()
        .expect("config should build");

    let client = Client::build(daemon_config).expect("Should be built.");
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let outcome = client.variation_detail(&bob, "flag", FlagValue::Bool(false));

    let evaluated = outcome.value.unwrap().as_bool().unwrap();
    assert!(evaluated);

    let fell_through = matches!(
        outcome.reason,
        Reason::Fallthrough {
            in_experiment: false
        }
    );
    assert!(fell_through);

    client.flush();
    client.close();

    testing_logger::validate(|logs| {
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "Started LaunchDarkly Client in daemon mode");
    });
}
1489
/// Daemon-mode client backed by an empty, uninitialized persistent store:
/// after one evaluation the only captured log line must still be the
/// daemon-mode startup message.
#[test]
fn daemon_mode_is_quiet_if_store_is_not_initialized() {
    testing_logger::setup();

    let store_builder =
        PersistentDataStoreBuilder::new(Arc::new(InMemoryPersistentDataStoreFactory {
            data: AllData {
                flags: HashMap::new(),
                segments: HashMap::new(),
            },
            initialized: false,
        }));

    let daemon_config = ConfigBuilder::new("sdk-key")
        .daemon_mode(true)
        .data_store(&store_builder)
        .event_processor(&NullEventProcessorBuilder::new())
        .build()
        .expect("config should build");

    let client = Client::build(daemon_config).expect("Should be built.");
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    // Only the logging side effect matters here; the evaluation result is unused.
    client.variation_detail(&bob, "flag", FlagValue::Bool(false));

    testing_logger::validate(|logs| {
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "Started LaunchDarkly Client in daemon mode");
    });
}
1528
/// Evaluates a flag built by `basic_off_flag` through `variation`.
///
/// Expects the supplied default (`false`) to be returned and, after flushing
/// and closing the client, an index event followed by a summary event whose
/// `myFlag` counters are keyed by version 42 with no variation index.
#[test]
fn variation_handles_off_flag_without_variation() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    client
        .data_store
        .write()
        .upsert(
            "myFlag",
            PatchTarget::Flag(StorageItem::Item(basic_off_flag("myFlag"))),
        )
        .expect("patch should apply");
    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let result = client.variation(&context, "myFlag", FlagValue::Bool(false));

    assert!(!result.as_bool().unwrap());
    client.flush();
    client.close();

    let events: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(events.len(), 2);
    assert_eq!(events[0].kind(), "index");
    assert_eq!(events[1].kind(), "summary");

    // Inspect the summary by reference; no need to clone the whole event.
    match &events[1] {
        OutputEvent::Summary(summary) => {
            let expected_counter = VariationKey {
                version: Some(42),
                variation: None,
            };

            let feature = summary
                .features
                .get("myFlag")
                .expect("summary should contain an entry for myFlag");
            assert!(feature.counters.contains_key(&expected_counter));
        }
        _ => panic!("Event should be a summary type"),
    }
}
1571
/// Evaluates `myFlag`, which is built with `prereqFlag` as a prerequisite;
/// both flags have `track_events` enabled.
///
/// Expects a `true` value with a non-experiment fallthrough reason, and, after
/// flushing and closing the client, an index event, two feature events and a
/// summary event. The summary must hold a version 42 / variation 1 counter for
/// both the target flag and the prerequisite flag.
#[test]
fn variation_detail_tracks_prereq_events_correctly() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let mut prereq_flag = basic_flag("prereqFlag");
    prereq_flag.track_events = true;

    client
        .data_store
        .write()
        .upsert(
            "prereqFlag",
            PatchTarget::Flag(StorageItem::Item(prereq_flag)),
        )
        .expect("patch should apply");

    // Named `target_flag` so the local does not shadow the `basic_flag` helper.
    let mut target_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
    target_flag.track_events = true;
    client
        .data_store
        .write()
        .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(target_flag)))
        .expect("patch should apply");
    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));

    assert!(detail.value.unwrap().as_bool().unwrap());
    assert!(matches!(
        detail.reason,
        Reason::Fallthrough {
            in_experiment: false
        }
    ));
    client.flush();
    client.close();

    let events: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(events.len(), 4);
    assert_eq!(events[0].kind(), "index");
    assert_eq!(events[1].kind(), "feature");
    assert_eq!(events[2].kind(), "feature");
    assert_eq!(events[3].kind(), "summary");

    match &events[3] {
        OutputEvent::Summary(summary) => {
            // Both flags are expected to report the same version/variation pair.
            let expected_counter = VariationKey {
                version: Some(42),
                variation: Some(1),
            };

            let target = summary
                .features
                .get("myFlag")
                .expect("summary should contain an entry for myFlag");
            assert!(target.counters.contains_key(&expected_counter));

            let prereq = summary
                .features
                .get("prereqFlag")
                .expect("summary should contain an entry for prereqFlag");
            assert!(prereq.counters.contains_key(&expected_counter));
        }
        // Fail loudly instead of silently skipping the summary assertions.
        _ => panic!("Event should be a summary type"),
    }
}
1641
/// Evaluates `myFlag` whose prerequisite `prereqFlag` is built by
/// `basic_off_flag`; both flags have `track_events` enabled.
///
/// Expects `variation` to return `false` and, after flushing and closing the
/// client, an index event, two feature events and a summary event. The summary
/// must hold a version 42 / variation 0 counter for `myFlag` and a version 42
/// counter with no variation index for `prereqFlag`.
#[test]
fn variation_handles_failed_prereqs_correctly() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let mut prereq_flag = basic_off_flag("prereqFlag");
    prereq_flag.track_events = true;

    client
        .data_store
        .write()
        .upsert(
            "prereqFlag",
            PatchTarget::Flag(StorageItem::Item(prereq_flag)),
        )
        .expect("patch should apply");

    // Named `target_flag` so the local does not shadow the `basic_flag` helper.
    let mut target_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
    target_flag.track_events = true;
    client
        .data_store
        .write()
        .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(target_flag)))
        .expect("patch should apply");
    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    // `variation` returns the bare flag value, not a detail object.
    let value = client.variation(&context, "myFlag", FlagValue::Bool(false));

    assert!(!value.as_bool().unwrap());
    client.flush();
    client.close();

    let events: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(events.len(), 4);
    assert_eq!(events[0].kind(), "index");
    assert_eq!(events[1].kind(), "feature");
    assert_eq!(events[2].kind(), "feature");
    assert_eq!(events[3].kind(), "summary");

    match &events[3] {
        OutputEvent::Summary(summary) => {
            let target_counter = VariationKey {
                version: Some(42),
                variation: Some(0),
            };
            let target = summary
                .features
                .get("myFlag")
                .expect("summary should contain an entry for myFlag");
            assert!(target.counters.contains_key(&target_counter));

            let prereq_counter = VariationKey {
                version: Some(42),
                variation: None,
            };
            let prereq = summary
                .features
                .get("prereqFlag")
                .expect("summary should contain an entry for prereqFlag");
            assert!(prereq.counters.contains_key(&prereq_counter));
        }
        // Fail loudly instead of silently skipping the summary assertions.
        _ => panic!("Event should be a summary type"),
    }
}
1705
/// Evaluates a flag key that was never added to the data store.
///
/// Expects the supplied default (`false`) with a `FlagNotFound` error reason
/// and, after flushing and closing the client, an index event followed by a
/// summary event whose entry for the unknown key has a counter with neither a
/// version nor a variation index.
#[test]
fn variation_detail_handles_flag_not_found() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));

    assert!(!detail.value.unwrap().as_bool().unwrap());
    assert!(matches!(
        detail.reason,
        Reason::Error {
            error: eval::Error::FlagNotFound
        }
    ));
    client.flush();
    client.close();

    let events: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(events.len(), 2);
    assert_eq!(events[0].kind(), "index");
    assert_eq!(events[1].kind(), "summary");

    // Inspect the summary by reference; no need to clone the whole event.
    match &events[1] {
        OutputEvent::Summary(summary) => {
            let expected_counter = VariationKey {
                version: None,
                variation: None,
            };

            let feature = summary
                .features
                .get("non-existent-flag")
                .expect("summary should contain an entry for non-existent-flag");
            assert!(feature.counters.contains_key(&expected_counter));
        }
        _ => panic!("Event should be a summary type"),
    }
}
1745
1746 #[tokio::test]
1747 async fn variation_detail_handles_client_not_ready() {
1748 let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1749 client.start_with_default_executor();
1750 let context = ContextBuilder::new("bob")
1751 .build()
1752 .expect("Failed to create context");
1753
1754 let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1755
1756 assert!(!detail.value.unwrap().as_bool().unwrap());
1757 assert!(matches!(
1758 detail.reason,
1759 Reason::Error {
1760 error: eval::Error::ClientNotReady
1761 }
1762 ));
1763 client.flush();
1764 client.close();
1765
1766 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1767 assert_eq!(events.len(), 2);
1768 assert_eq!(events[0].kind(), "index");
1769 assert_eq!(events[1].kind(), "summary");
1770
1771 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1772 let variation_key = VariationKey {
1773 version: None,
1774 variation: None,
1775 };
1776 let feature = event_summary.features.get("non-existent-flag");
1777 assert!(feature.is_some());
1778
1779 let feature = feature.unwrap();
1780 assert!(feature.counters.contains_key(&variation_key));
1781 } else {
1782 panic!("Event should be a summary type");
1783 }
1784 }
1785
/// `identify` followed by flush/close must deliver exactly one event, of kind
/// `identify`.
#[test]
fn identify_sends_identify_event() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    client.identify(
        ContextBuilder::new("bob")
            .build()
            .expect("Failed to create context"),
    );
    client.flush();
    client.close();

    let received: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(received.len(), 1);
    assert_eq!(received[0].kind(), "identify");
}
1803
/// With the client from `make_mocked_offline_client`, `identify` must not
/// cause any event to reach the event channel.
#[test]
fn identify_sends_sends_nothing_in_offline_mode() {
    let (offline_client, event_rx) = make_mocked_offline_client();
    offline_client.start_with_default_executor();

    offline_client.identify(
        ContextBuilder::new("bob")
            .build()
            .expect("Failed to create context"),
    );
    offline_client.flush();
    offline_client.close();

    let emitted = event_rx.iter().count();
    assert_eq!(emitted, 0);
}
1819
/// Pins the secure mode hash produced for context key `Message` by an offline
/// client built with SDK key `secret`.
#[test]
fn secure_mode_hash() {
    let offline_config = ConfigBuilder::new("secret")
        .offline(true)
        .build()
        .expect("config should build");
    let client = Client::build(offline_config).expect("Should be built.");

    let message_context = ContextBuilder::new("Message")
        .build()
        .expect("Failed to create context");

    let hash = client.secure_mode_hash(&message_context);
    assert_eq!(
        hash,
        "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
    );
}
1836
/// Pins the secure mode hash produced for a multi-context made of an `org`
/// context (`org-key|1`) and a default-kind context (`user-key:2`), using an
/// offline client built with SDK key `secret`.
#[test]
fn secure_mode_hash_with_multi_kind() {
    let offline_config = ConfigBuilder::new("secret")
        .offline(true)
        .build()
        .expect("config should build");
    let client = Client::build(offline_config).expect("Should be built.");

    let org_context = ContextBuilder::new("org-key|1")
        .kind("org")
        .build()
        .expect("Failed to create context");
    let user_context = ContextBuilder::new("user-key:2")
        .build()
        .expect("Failed to create context");

    let multi_context = MultiContextBuilder::new()
        .add_context(org_context)
        .add_context(user_context)
        .build()
        .expect("failed to build multi-context");

    let hash = client.secure_mode_hash(&multi_context);
    assert_eq!(
        hash,
        "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
    );
}
1864
1865 #[derive(Serialize)]
1866 struct MyCustomData {
1867 pub answer: u32,
1868 }
1869
1870 #[test]
1871 fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1872 let (client, event_rx) = make_mocked_client();
1873 client.start_with_default_executor();
1874
1875 let context = ContextBuilder::new("bob")
1876 .build()
1877 .expect("Failed to create context");
1878
1879 client.track_event(context.clone(), "event-with-null");
1880 client.track_data(context.clone(), "event-with-string", "string-data")?;
1881 client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1882 client.track_data(
1883 context.clone(),
1884 "event-with-struct",
1885 MyCustomData { answer: 42 },
1886 )?;
1887 client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1888
1889 client.flush();
1890 client.close();
1891
1892 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1893 assert_eq!(events.len(), 6);
1894
1895 let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1896 for event in events {
1897 if let Some(count) = events_by_type.get_mut(event.kind()) {
1898 *count += 1;
1899 } else {
1900 events_by_type.insert(event.kind(), 1);
1901 }
1902 }
1903 assert!(matches!(events_by_type.get("index"), Some(1)));
1904 assert!(matches!(events_by_type.get("custom"), Some(5)));
1905
1906 Ok(())
1907 }
1908
/// With the client from `make_mocked_offline_client`, none of the tracking
/// calls may cause an event to reach the event channel.
///
/// # Errors
///
/// Propagates any `serde_json` error returned by `track_data`.
#[test]
fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
    let (offline_client, event_rx) = make_mocked_offline_client();
    offline_client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    offline_client.track_event(bob.clone(), "event-with-null");
    offline_client.track_data(bob.clone(), "event-with-string", "string-data")?;
    offline_client.track_data(bob.clone(), "event-with-json", json!({"answer": 42}))?;

    let struct_payload = MyCustomData { answer: 42 };
    offline_client.track_data(bob.clone(), "event-with-struct", struct_payload)?;

    offline_client.track_metric(bob, "event-with-metric", 42.0, serde_json::Value::Null);

    offline_client.flush();
    offline_client.close();

    let emitted = event_rx.iter().count();
    assert_eq!(emitted, 0);

    Ok(())
}
1935
/// `migration_variation` on a key that is not in the data store must return
/// the supplied default stage (`Stage::Off`).
#[test]
fn migration_handles_flag_not_found() {
    let (client, _event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let default_stage = Stage::Off;
    let (evaluated_stage, _tracker) =
        client.migration_variation(&bob, "non-existent-flag-key", default_stage);

    assert_eq!(evaluated_stage, Stage::Off);
}
1950
/// `migration_variation` on a flag built by `basic_flag` (not a migration
/// flag) must return the supplied default stage (`Stage::Off`).
#[test]
fn migration_uses_non_migration_flag() {
    let (client, _event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let boolean_flag = PatchTarget::Flag(StorageItem::Item(basic_flag("boolean-flag")));
    client
        .data_store
        .write()
        .upsert("boolean-flag", boolean_flag)
        .expect("patch should apply");

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let (evaluated_stage, _tracker) = client.migration_variation(&bob, "boolean-flag", Stage::Off);

    assert_eq!(evaluated_stage, Stage::Off);
}
1972
1973 #[test_case(Stage::Off)]
1974 #[test_case(Stage::DualWrite)]
1975 #[test_case(Stage::Shadow)]
1976 #[test_case(Stage::Live)]
1977 #[test_case(Stage::Rampdown)]
1978 #[test_case(Stage::Complete)]
1979 fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1980 let (client, _event_rx) = make_mocked_client();
1981 client.start_with_default_executor();
1982 client
1983 .data_store
1984 .write()
1985 .upsert(
1986 "stage-flag",
1987 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
1988 )
1989 .expect("patch should apply");
1990
1991 let context = ContextBuilder::new("bob")
1992 .build()
1993 .expect("Failed to create context");
1994
1995 let (evaluated_stage, _tracker) =
1996 client.migration_variation(&context, "stage-flag", Stage::Off);
1997
1998 assert_eq!(evaluated_stage, stage);
1999 }
2000
2001 #[tokio::test]
2002 async fn migration_tracks_invoked_correctly() {
2003 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
2004 .await;
2005 migration_tracks_invoked_correctly_driver(
2006 Stage::DualWrite,
2007 Operation::Read,
2008 vec![Origin::Old],
2009 )
2010 .await;
2011 migration_tracks_invoked_correctly_driver(
2012 Stage::Shadow,
2013 Operation::Read,
2014 vec![Origin::Old, Origin::New],
2015 )
2016 .await;
2017 migration_tracks_invoked_correctly_driver(
2018 Stage::Live,
2019 Operation::Read,
2020 vec![Origin::Old, Origin::New],
2021 )
2022 .await;
2023 migration_tracks_invoked_correctly_driver(
2024 Stage::Rampdown,
2025 Operation::Read,
2026 vec![Origin::New],
2027 )
2028 .await;
2029 migration_tracks_invoked_correctly_driver(
2030 Stage::Complete,
2031 Operation::Read,
2032 vec![Origin::New],
2033 )
2034 .await;
2035 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
2036 .await;
2037 migration_tracks_invoked_correctly_driver(
2038 Stage::DualWrite,
2039 Operation::Write,
2040 vec![Origin::Old, Origin::New],
2041 )
2042 .await;
2043 migration_tracks_invoked_correctly_driver(
2044 Stage::Shadow,
2045 Operation::Write,
2046 vec![Origin::Old, Origin::New],
2047 )
2048 .await;
2049 migration_tracks_invoked_correctly_driver(
2050 Stage::Live,
2051 Operation::Write,
2052 vec![Origin::Old, Origin::New],
2053 )
2054 .await;
2055 migration_tracks_invoked_correctly_driver(
2056 Stage::Rampdown,
2057 Operation::Write,
2058 vec![Origin::Old, Origin::New],
2059 )
2060 .await;
2061 migration_tracks_invoked_correctly_driver(
2062 Stage::Complete,
2063 Operation::Write,
2064 vec![Origin::New],
2065 )
2066 .await;
2067 }
2068
2069 async fn migration_tracks_invoked_correctly_driver(
2070 stage: Stage,
2071 operation: Operation,
2072 origins: Vec<Origin>,
2073 ) {
2074 let (client, event_rx) = make_mocked_client();
2075 let client = Arc::new(client);
2076 client.start_with_default_executor();
2077 client
2078 .data_store
2079 .write()
2080 .upsert(
2081 "stage-flag",
2082 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2083 )
2084 .expect("patch should apply");
2085
2086 let mut migrator = MigratorBuilder::new(client.clone())
2087 .read(
2088 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2089 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2090 Some(|_, _| true),
2091 )
2092 .write(
2093 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2094 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2095 )
2096 .build()
2097 .expect("migrator should build");
2098
2099 let context = ContextBuilder::new("bob")
2100 .build()
2101 .expect("Failed to create context");
2102
2103 if let Operation::Read = operation {
2104 migrator
2105 .read(
2106 &context,
2107 "stage-flag".into(),
2108 Stage::Off,
2109 serde_json::Value::Null,
2110 )
2111 .await;
2112 } else {
2113 migrator
2114 .write(
2115 &context,
2116 "stage-flag".into(),
2117 Stage::Off,
2118 serde_json::Value::Null,
2119 )
2120 .await;
2121 }
2122
2123 client.flush();
2124 client.close();
2125
2126 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2127 assert_eq!(events.len(), 3);
2128 match &events[1] {
2129 OutputEvent::MigrationOp(event) => {
2130 assert!(event.invoked.len() == origins.len());
2131 assert!(event.invoked.iter().all(|i| origins.contains(i)));
2132 }
2133 _ => panic!("Expected migration event"),
2134 }
2135 }
2136
2137 #[tokio::test]
2138 async fn migration_tracks_latency() {
2139 migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2140 migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2141 migration_tracks_latency_driver(
2142 Stage::Shadow,
2143 Operation::Read,
2144 vec![Origin::Old, Origin::New],
2145 )
2146 .await;
2147 migration_tracks_latency_driver(
2148 Stage::Live,
2149 Operation::Read,
2150 vec![Origin::Old, Origin::New],
2151 )
2152 .await;
2153 migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2154 migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2155 migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2156 migration_tracks_latency_driver(
2157 Stage::DualWrite,
2158 Operation::Write,
2159 vec![Origin::Old, Origin::New],
2160 )
2161 .await;
2162 migration_tracks_latency_driver(
2163 Stage::Shadow,
2164 Operation::Write,
2165 vec![Origin::Old, Origin::New],
2166 )
2167 .await;
2168 migration_tracks_latency_driver(
2169 Stage::Live,
2170 Operation::Write,
2171 vec![Origin::Old, Origin::New],
2172 )
2173 .await;
2174 migration_tracks_latency_driver(
2175 Stage::Rampdown,
2176 Operation::Write,
2177 vec![Origin::Old, Origin::New],
2178 )
2179 .await;
2180 migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2181 }
2182
2183 async fn migration_tracks_latency_driver(
2184 stage: Stage,
2185 operation: Operation,
2186 origins: Vec<Origin>,
2187 ) {
2188 let (client, event_rx) = make_mocked_client();
2189 let client = Arc::new(client);
2190 client.start_with_default_executor();
2191 client
2192 .data_store
2193 .write()
2194 .upsert(
2195 "stage-flag",
2196 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2197 )
2198 .expect("patch should apply");
2199
2200 let mut migrator = MigratorBuilder::new(client.clone())
2201 .track_latency(true)
2202 .read(
2203 |_| {
2204 async move {
2205 async_std::task::sleep(Duration::from_millis(100)).await;
2206 Ok(serde_json::Value::Null)
2207 }
2208 .boxed()
2209 },
2210 |_| {
2211 async move {
2212 async_std::task::sleep(Duration::from_millis(100)).await;
2213 Ok(serde_json::Value::Null)
2214 }
2215 .boxed()
2216 },
2217 Some(|_, _| true),
2218 )
2219 .write(
2220 |_| {
2221 async move {
2222 async_std::task::sleep(Duration::from_millis(100)).await;
2223 Ok(serde_json::Value::Null)
2224 }
2225 .boxed()
2226 },
2227 |_| {
2228 async move {
2229 async_std::task::sleep(Duration::from_millis(100)).await;
2230 Ok(serde_json::Value::Null)
2231 }
2232 .boxed()
2233 },
2234 )
2235 .build()
2236 .expect("migrator should build");
2237
2238 let context = ContextBuilder::new("bob")
2239 .build()
2240 .expect("Failed to create context");
2241
2242 if let Operation::Read = operation {
2243 migrator
2244 .read(
2245 &context,
2246 "stage-flag".into(),
2247 Stage::Off,
2248 serde_json::Value::Null,
2249 )
2250 .await;
2251 } else {
2252 migrator
2253 .write(
2254 &context,
2255 "stage-flag".into(),
2256 Stage::Off,
2257 serde_json::Value::Null,
2258 )
2259 .await;
2260 }
2261
2262 client.flush();
2263 client.close();
2264
2265 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2266 assert_eq!(events.len(), 3);
2267 match &events[1] {
2268 OutputEvent::MigrationOp(event) => {
2269 assert!(event.latency.len() == origins.len());
2270 assert!(event
2271 .latency
2272 .values()
2273 .all(|l| l > &Duration::from_millis(100)));
2274 }
2275 _ => panic!("Expected migration event"),
2276 }
2277 }
2278
2279 #[tokio::test]
2280 async fn migration_tracks_read_errors() {
2281 migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2282 migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2283 migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2284 migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2285 migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2286 migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2287 }
2288
2289 async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2290 let (client, event_rx) = make_mocked_client();
2291 let client = Arc::new(client);
2292 client.start_with_default_executor();
2293 client
2294 .data_store
2295 .write()
2296 .upsert(
2297 "stage-flag",
2298 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2299 )
2300 .expect("patch should apply");
2301
2302 let mut migrator = MigratorBuilder::new(client.clone())
2303 .track_latency(true)
2304 .read(
2305 |_| async move { Err("fail".into()) }.boxed(),
2306 |_| async move { Err("fail".into()) }.boxed(),
2307 Some(|_: &String, _: &String| true),
2308 )
2309 .write(
2310 |_| async move { Err("fail".into()) }.boxed(),
2311 |_| async move { Err("fail".into()) }.boxed(),
2312 )
2313 .build()
2314 .expect("migrator should build");
2315
2316 let context = ContextBuilder::new("bob")
2317 .build()
2318 .expect("Failed to create context");
2319
2320 migrator
2321 .read(
2322 &context,
2323 "stage-flag".into(),
2324 Stage::Off,
2325 serde_json::Value::Null,
2326 )
2327 .await;
2328 client.flush();
2329 client.close();
2330
2331 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2332 assert_eq!(events.len(), 3);
2333 match &events[1] {
2334 OutputEvent::MigrationOp(event) => {
2335 assert!(event.errors.len() == origins.len());
2336 assert!(event.errors.iter().all(|i| origins.contains(i)));
2337 }
2338 _ => panic!("Expected migration event"),
2339 }
2340 }
2341
2342 #[tokio::test]
2343 async fn migration_tracks_authoritative_write_errors() {
2344 migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2345 migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2346 .await;
2347 migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2348 migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2349 migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2350 .await;
2351 migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2352 .await;
2353 }
2354
2355 async fn migration_tracks_authoritative_write_errors_driver(
2356 stage: Stage,
2357 origins: Vec<Origin>,
2358 ) {
2359 let (client, event_rx) = make_mocked_client();
2360 let client = Arc::new(client);
2361 client.start_with_default_executor();
2362 client
2363 .data_store
2364 .write()
2365 .upsert(
2366 "stage-flag",
2367 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2368 )
2369 .expect("patch should apply");
2370
2371 let mut migrator = MigratorBuilder::new(client.clone())
2372 .track_latency(true)
2373 .read(
2374 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2375 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2376 None,
2377 )
2378 .write(
2379 |_| async move { Err("fail".into()) }.boxed(),
2380 |_| async move { Err("fail".into()) }.boxed(),
2381 )
2382 .build()
2383 .expect("migrator should build");
2384
2385 let context = ContextBuilder::new("bob")
2386 .build()
2387 .expect("Failed to create context");
2388
2389 migrator
2390 .write(
2391 &context,
2392 "stage-flag".into(),
2393 Stage::Off,
2394 serde_json::Value::Null,
2395 )
2396 .await;
2397
2398 client.flush();
2399 client.close();
2400
2401 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2402 assert_eq!(events.len(), 3);
2403 match &events[1] {
2404 OutputEvent::MigrationOp(event) => {
2405 assert!(event.errors.len() == origins.len());
2406 assert!(event.errors.iter().all(|i| origins.contains(i)));
2407 }
2408 _ => panic!("Expected migration event"),
2409 }
2410 }
2411
2412 #[tokio::test]
2413 async fn migration_tracks_nonauthoritative_write_errors() {
2414 migration_tracks_nonauthoritative_write_errors_driver(
2415 Stage::DualWrite,
2416 false,
2417 true,
2418 vec![Origin::New],
2419 )
2420 .await;
2421 migration_tracks_nonauthoritative_write_errors_driver(
2422 Stage::Shadow,
2423 false,
2424 true,
2425 vec![Origin::New],
2426 )
2427 .await;
2428 migration_tracks_nonauthoritative_write_errors_driver(
2429 Stage::Live,
2430 true,
2431 false,
2432 vec![Origin::Old],
2433 )
2434 .await;
2435 migration_tracks_nonauthoritative_write_errors_driver(
2436 Stage::Rampdown,
2437 true,
2438 false,
2439 vec![Origin::Old],
2440 )
2441 .await;
2442 }
2443
2444 async fn migration_tracks_nonauthoritative_write_errors_driver(
2445 stage: Stage,
2446 fail_old: bool,
2447 fail_new: bool,
2448 origins: Vec<Origin>,
2449 ) {
2450 let (client, event_rx) = make_mocked_client();
2451 let client = Arc::new(client);
2452 client.start_with_default_executor();
2453 client
2454 .data_store
2455 .write()
2456 .upsert(
2457 "stage-flag",
2458 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2459 )
2460 .expect("patch should apply");
2461
2462 let mut migrator = MigratorBuilder::new(client.clone())
2463 .track_latency(true)
2464 .read(
2465 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2466 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2467 None,
2468 )
2469 .write(
2470 move |_| {
2471 async move {
2472 if fail_old {
2473 Err("fail".into())
2474 } else {
2475 Ok(serde_json::Value::Null)
2476 }
2477 }
2478 .boxed()
2479 },
2480 move |_| {
2481 async move {
2482 if fail_new {
2483 Err("fail".into())
2484 } else {
2485 Ok(serde_json::Value::Null)
2486 }
2487 }
2488 .boxed()
2489 },
2490 )
2491 .build()
2492 .expect("migrator should build");
2493
2494 let context = ContextBuilder::new("bob")
2495 .build()
2496 .expect("Failed to create context");
2497
2498 migrator
2499 .write(
2500 &context,
2501 "stage-flag".into(),
2502 Stage::Off,
2503 serde_json::Value::Null,
2504 )
2505 .await;
2506
2507 client.flush();
2508 client.close();
2509
2510 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2511 assert_eq!(events.len(), 3);
2512 match &events[1] {
2513 OutputEvent::MigrationOp(event) => {
2514 assert!(event.errors.len() == origins.len());
2515 assert!(event.errors.iter().all(|i| origins.contains(i)));
2516 }
2517 _ => panic!("Expected migration event"),
2518 }
2519 }
2520
2521 #[tokio::test]
2522 async fn migration_tracks_consistency() {
2523 migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2524 migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2525 migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2526 migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2527 }
2528
2529 async fn migration_tracks_consistency_driver(
2530 stage: Stage,
2531 old_return: &'static str,
2532 new_return: &'static str,
2533 expected_consistency: bool,
2534 ) {
2535 let (client, event_rx) = make_mocked_client();
2536 let client = Arc::new(client);
2537 client.start_with_default_executor();
2538 client
2539 .data_store
2540 .write()
2541 .upsert(
2542 "stage-flag",
2543 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2544 )
2545 .expect("patch should apply");
2546
2547 let mut migrator = MigratorBuilder::new(client.clone())
2548 .track_latency(true)
2549 .read(
2550 |_| {
2551 async move {
2552 async_std::task::sleep(Duration::from_millis(100)).await;
2553 Ok(serde_json::Value::String(old_return.to_string()))
2554 }
2555 .boxed()
2556 },
2557 |_| {
2558 async move {
2559 async_std::task::sleep(Duration::from_millis(100)).await;
2560 Ok(serde_json::Value::String(new_return.to_string()))
2561 }
2562 .boxed()
2563 },
2564 Some(|lhs, rhs| lhs == rhs),
2565 )
2566 .write(
2567 |_| {
2568 async move {
2569 async_std::task::sleep(Duration::from_millis(100)).await;
2570 Ok(serde_json::Value::Null)
2571 }
2572 .boxed()
2573 },
2574 |_| {
2575 async move {
2576 async_std::task::sleep(Duration::from_millis(100)).await;
2577 Ok(serde_json::Value::Null)
2578 }
2579 .boxed()
2580 },
2581 )
2582 .build()
2583 .expect("migrator should build");
2584
2585 let context = ContextBuilder::new("bob")
2586 .build()
2587 .expect("Failed to create context");
2588
2589 migrator
2590 .read(
2591 &context,
2592 "stage-flag".into(),
2593 Stage::Off,
2594 serde_json::Value::Null,
2595 )
2596 .await;
2597
2598 client.flush();
2599 client.close();
2600
2601 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2602 assert_eq!(events.len(), 3);
2603 match &events[1] {
2604 OutputEvent::MigrationOp(event) => {
2605 assert!(event.consistency_check == Some(expected_consistency))
2606 }
2607 _ => panic!("Expected migration event"),
2608 }
2609 }
2610
2611 fn make_mocked_client_with_delay(
2612 delay: u64,
2613 offline: bool,
2614 daemon_mode: bool,
2615 ) -> (Client, Receiver<OutputEvent>) {
2616 let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2617 let (event_sender, event_rx) = create_event_sender();
2618
2619 let config = ConfigBuilder::new("sdk-key")
2620 .offline(offline)
2621 .daemon_mode(daemon_mode)
2622 .data_source(MockDataSourceBuilder::new().data_source(updates))
2623 .event_processor(
2624 EventProcessorBuilder::<HttpConnector>::new().event_sender(Arc::new(event_sender)),
2625 )
2626 .build()
2627 .expect("config should build");
2628
2629 let client = Client::build(config).expect("Should be built.");
2630
2631 (client, event_rx)
2632 }
2633
2634 fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2635 make_mocked_client_with_delay(0, true, false)
2636 }
2637
2638 fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2639 make_mocked_client_with_delay(0, false, false)
2640 }
2641}