1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::DataSource;
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
27struct EventsScope {
28 disabled: bool,
29 event_factory: EventFactory,
30 prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
31}
32
33struct PrerequisiteEventRecorder {
34 event_factory: EventFactory,
35 event_processor: Arc<dyn EventProcessor>,
36}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39 fn record(&self, event: PrerequisiteEvent) {
40 let evt = self.event_factory.new_eval_event(
41 &event.prerequisite_flag.key,
42 event.context.clone(),
43 &event.prerequisite_flag,
44 event.prerequisite_result,
45 FlagValue::Json(serde_json::Value::Null),
46 Some(event.target_flag_key),
47 );
48
49 self.event_processor.send(evt);
50 }
51}
52
53#[non_exhaustive]
55#[derive(Debug, Error)]
56pub enum BuildError {
57 #[error("invalid client config: {0}")]
59 InvalidConfig(String),
60}
61
62impl From<DataSourceError> for BuildError {
63 fn from(error: DataSourceError) -> Self {
64 Self::InvalidConfig(error.to_string())
65 }
66}
67
68impl From<DataStoreError> for BuildError {
69 fn from(error: DataStoreError) -> Self {
70 Self::InvalidConfig(error.to_string())
71 }
72}
73
74impl From<EventProcessorError> for BuildError {
75 fn from(error: EventProcessorError) -> Self {
76 Self::InvalidConfig(error.to_string())
77 }
78}
79
80impl From<ConfigBuildError> for BuildError {
81 fn from(error: ConfigBuildError) -> Self {
82 Self::InvalidConfig(error.to_string())
83 }
84}
85
86#[non_exhaustive]
88#[derive(Debug, Error)]
89pub enum StartError {
90 #[error("couldn't spawn background thread for client: {0}")]
92 SpawnFailed(io::Error),
93}
94
95#[derive(PartialEq, Copy, Clone, Debug)]
96enum ClientInitState {
97 Initializing = 0,
98 Initialized = 1,
99 InitializationFailed = 2,
100}
101
102impl PartialEq<usize> for ClientInitState {
103 fn eq(&self, other: &usize) -> bool {
104 *self as usize == *other
105 }
106}
107
108impl From<usize> for ClientInitState {
109 fn from(val: usize) -> Self {
110 match val {
111 0 => ClientInitState::Initializing,
112 1 => ClientInitState::Initialized,
113 2 => ClientInitState::InitializationFailed,
114 _ => unreachable!(),
115 }
116 }
117}
118
119pub struct Client {
149 event_processor: Arc<dyn EventProcessor>,
150 data_source: Arc<dyn DataSource>,
151 data_store: Arc<RwLock<dyn DataStore>>,
152 events_default: EventsScope,
153 events_with_reasons: EventsScope,
154 init_notify: Arc<Semaphore>,
155 init_state: Arc<AtomicUsize>,
156 started: AtomicBool,
157 offline: bool,
158 daemon_mode: bool,
159 #[cfg_attr(
160 not(any(feature = "crypto-openssl", feature = "crypto-aws-lc-rs")),
161 allow(dead_code)
162 )]
163 sdk_key: String,
164 shutdown_broadcast: broadcast::Sender<()>,
165 runtime: RwLock<Option<Runtime>>,
166}
167
168impl Client {
169 pub fn build(config: Config) -> Result<Self, BuildError> {
171 if config.offline() {
172 info!("Started LaunchDarkly Client in offline mode");
173 } else if config.daemon_mode() {
174 info!("Started LaunchDarkly Client in daemon mode");
175 }
176
177 let tags = config.application_tag();
178 let instance_id = config.instance_id().to_string();
179
180 let endpoints = config.service_endpoints_builder().build()?;
181
182 let mut event_processor_builder = config.event_processor_builder().to_owned();
183 event_processor_builder.set_instance_id(instance_id.clone());
184 let event_processor =
185 event_processor_builder.build(&endpoints, config.sdk_key(), tags.clone())?;
186
187 let mut data_source_builder = config.data_source_builder().to_owned();
188 data_source_builder.set_instance_id(instance_id);
189 let data_source = data_source_builder.build(&endpoints, config.sdk_key(), tags.clone())?;
190 let data_store = config.data_store_builder().build()?;
191
192 let events_default = EventsScope {
193 disabled: config.offline(),
194 event_factory: EventFactory::new(false),
195 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
196 event_factory: EventFactory::new(false),
197 event_processor: event_processor.clone(),
198 }),
199 };
200
201 let events_with_reasons = EventsScope {
202 disabled: config.offline(),
203 event_factory: EventFactory::new(true),
204 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
205 event_factory: EventFactory::new(true),
206 event_processor: event_processor.clone(),
207 }),
208 };
209
210 let (shutdown_tx, _) = broadcast::channel(1);
211
212 Ok(Client {
213 event_processor,
214 data_source,
215 data_store,
216 events_default,
217 events_with_reasons,
218 init_notify: Arc::new(Semaphore::new(0)),
219 init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
220 started: AtomicBool::new(false),
221 offline: config.offline(),
222 daemon_mode: config.daemon_mode(),
223 sdk_key: config.sdk_key().into(),
224 shutdown_broadcast: shutdown_tx,
225 runtime: RwLock::new(None),
226 })
227 }
228
229 pub fn start_with_default_executor(&self) {
231 if self.started.load(Ordering::SeqCst) {
232 return;
233 }
234 self.started.store(true, Ordering::SeqCst);
235 self.start_with_default_executor_internal();
236 }
237
238 fn start_with_default_executor_internal(&self) {
239 let notify = self.init_notify.clone();
243 let init_state = self.init_state.clone();
244
245 self.data_source.subscribe(
246 self.data_store.clone(),
247 Arc::new(move |success| {
248 init_state.store(
249 (if success {
250 ClientInitState::Initialized
251 } else {
252 ClientInitState::InitializationFailed
253 }) as usize,
254 Ordering::SeqCst,
255 );
256 notify.add_permits(1);
257 }),
258 self.shutdown_broadcast.subscribe(),
259 );
260 }
261
262 pub fn start_with_runtime(&self) -> Result<bool, StartError> {
268 if self.started.load(Ordering::SeqCst) {
269 return Ok(true);
270 }
271 self.started.store(true, Ordering::SeqCst);
272
273 let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
274 let _guard = runtime.enter();
275 self.runtime.write().replace(runtime);
276
277 self.start_with_default_executor_internal();
278
279 Ok(true)
280 }
281
282 pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
288 if timeout > Duration::from_secs(60) {
289 warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
290 }
291
292 let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
293 initialized.ok()
294 }
295
296 async fn initialized_async_internal(&self) -> bool {
297 if self.offline || self.daemon_mode {
298 return true;
299 }
300
301 if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
307 let _permit = self.init_notify.acquire().await;
308 }
309 ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
310 }
311
312 pub fn initialized(&self) -> bool {
316 self.offline
317 || self.daemon_mode
318 || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
319 }
320
321 pub fn close(&self) {
325 self.event_processor.close();
326
327 if !self.offline && !self.daemon_mode {
330 if let Err(e) = self.shutdown_broadcast.send(()) {
331 error!("Failed to shutdown client appropriately: {e}");
332 }
333 }
334
335 self.runtime.write().take();
338 }
339
340 pub fn flush(&self) {
348 self.event_processor.flush();
349 }
350
351 pub async fn flush_blocking(&self, timeout: Duration) -> bool {
387 let event_processor = self.event_processor.clone();
388
389 let flush_future =
390 tokio::task::spawn_blocking(move || event_processor.flush_blocking(timeout));
391
392 if timeout == Duration::ZERO {
393 flush_future.await.unwrap_or(false)
395 } else {
396 match tokio::time::timeout(timeout, flush_future).await {
398 Ok(Ok(result)) => result,
399 Ok(Err(_)) => false, Err(_) => false, }
402 }
403 }
404
405 pub fn identify(&self, context: Context) {
410 if self.events_default.disabled {
411 return;
412 }
413
414 self.send_internal(self.events_default.event_factory.new_identify(context));
415 }
416
417 pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
425 let val = self.variation(context, flag_key, default);
426 if let Some(b) = val.as_bool() {
427 b
428 } else {
429 warn!("bool_variation called for a non-bool flag {flag_key:?} (got {val:?})");
430 default
431 }
432 }
433
434 pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
442 let val = self.variation(context, flag_key, default.clone());
443 if let Some(s) = val.as_string() {
444 s
445 } else {
446 warn!("str_variation called for a non-string flag {flag_key:?} (got {val:?})");
447 default
448 }
449 }
450
451 pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
459 let val = self.variation(context, flag_key, default);
460 if let Some(f) = val.as_float() {
461 f
462 } else {
463 warn!("float_variation called for a non-float flag {flag_key:?} (got {val:?})");
464 default
465 }
466 }
467
468 pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
476 let val = self.variation(context, flag_key, default);
477 if let Some(f) = val.as_int() {
478 f
479 } else {
480 warn!("int_variation called for a non-int flag {flag_key:?} (got {val:?})");
481 default
482 }
483 }
484
485 pub fn json_variation(
495 &self,
496 context: &Context,
497 flag_key: &str,
498 default: serde_json::Value,
499 ) -> serde_json::Value {
500 self.variation(context, flag_key, default.clone())
501 .as_json()
502 .unwrap_or(default)
503 }
504
505 pub fn bool_variation_detail(
512 &self,
513 context: &Context,
514 flag_key: &str,
515 default: bool,
516 ) -> Detail<bool> {
517 self.variation_detail(context, flag_key, default).try_map(
518 |val| val.as_bool(),
519 default,
520 eval::Error::WrongType,
521 )
522 }
523
524 pub fn str_variation_detail(
531 &self,
532 context: &Context,
533 flag_key: &str,
534 default: String,
535 ) -> Detail<String> {
536 self.variation_detail(context, flag_key, default.clone())
537 .try_map(|val| val.as_string(), default, eval::Error::WrongType)
538 }
539
540 pub fn float_variation_detail(
547 &self,
548 context: &Context,
549 flag_key: &str,
550 default: f64,
551 ) -> Detail<f64> {
552 self.variation_detail(context, flag_key, default).try_map(
553 |val| val.as_float(),
554 default,
555 eval::Error::WrongType,
556 )
557 }
558
559 pub fn int_variation_detail(
566 &self,
567 context: &Context,
568 flag_key: &str,
569 default: i64,
570 ) -> Detail<i64> {
571 self.variation_detail(context, flag_key, default).try_map(
572 |val| val.as_int(),
573 default,
574 eval::Error::WrongType,
575 )
576 }
577
578 pub fn json_variation_detail(
585 &self,
586 context: &Context,
587 flag_key: &str,
588 default: serde_json::Value,
589 ) -> Detail<serde_json::Value> {
590 self.variation_detail(context, flag_key, default.clone())
591 .try_map(|val| val.as_json(), default, eval::Error::WrongType)
592 }
593
594 #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
595 pub fn secure_mode_hash(&self, context: &Context) -> Result<String, String> {
600 #[cfg(feature = "crypto-aws-lc-rs")]
601 {
602 let key =
603 aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
604 let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
605
606 Ok(data_encoding::HEXLOWER.encode(tag.as_ref()))
607 }
608 #[cfg(feature = "crypto-openssl")]
609 {
610 use openssl::hash::MessageDigest;
611 use openssl::pkey::PKey;
612 use openssl::sign::Signer;
613
614 let key = PKey::hmac(self.sdk_key.as_bytes())
615 .map_err(|e| format!("Failed to create HMAC key: {e}"))?;
616 let mut signer = Signer::new(MessageDigest::sha256(), &key)
617 .map_err(|e| format!("Failed to create signer: {e}"))?;
618 signer
619 .update(context.canonical_key().as_bytes())
620 .map_err(|e| format!("Failed to update signer: {e}"))?;
621 let hmac = signer
622 .sign_to_vec()
623 .map_err(|e| format!("Failed to sign: {e}"))?;
624
625 Ok(data_encoding::HEXLOWER.encode(&hmac))
626 }
627 }
628
629 pub fn all_flags_detail(
640 &self,
641 context: &Context,
642 flag_state_config: FlagDetailConfig,
643 ) -> FlagDetail {
644 if self.offline {
645 warn!(
646 "all_flags_detail() called, but client is in offline mode. Returning empty state"
647 );
648 return FlagDetail::new(false);
649 }
650
651 if !self.initialized() {
652 warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
653 return FlagDetail::new(false);
654 }
655
656 let data_store = self.data_store.read();
657
658 let mut flag_detail = FlagDetail::new(true);
659 flag_detail.populate(&*data_store, context, flag_state_config);
660
661 flag_detail
662 }
663
664 pub fn variation_detail<T: Into<FlagValue> + Clone>(
670 &self,
671 context: &Context,
672 flag_key: &str,
673 default: T,
674 ) -> Detail<FlagValue> {
675 let (detail, _) =
676 self.variation_internal(context, flag_key, default, &self.events_with_reasons);
677 detail
678 }
679
680 pub fn variation<T: Into<FlagValue> + Clone>(
691 &self,
692 context: &Context,
693 flag_key: &str,
694 default: T,
695 ) -> FlagValue {
696 let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
697 detail.value.unwrap()
698 }
699
700 pub fn migration_variation(
705 &self,
706 context: &Context,
707 flag_key: &str,
708 default_stage: Stage,
709 ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
710 let (detail, flag) =
711 self.variation_internal(context, flag_key, default_stage, &self.events_default);
712
713 let migration_detail =
714 detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
715 let tracker = MigrationOpTracker::new(
716 flag_key.into(),
717 flag,
718 context.clone(),
719 migration_detail.clone(),
720 default_stage,
721 );
722
723 (
724 migration_detail.value.unwrap_or(default_stage),
725 Arc::new(Mutex::new(tracker)),
726 )
727 }
728
729 pub fn track_event(&self, context: Context, key: impl Into<String>) {
739 let _ = self.track(context, key, None, serde_json::Value::Null);
740 }
741
742 pub fn track_data(
755 &self,
756 context: Context,
757 key: impl Into<String>,
758 data: impl Serialize,
759 ) -> serde_json::Result<()> {
760 self.track(context, key, None, data)
761 }
762
763 pub fn track_metric(
774 &self,
775 context: Context,
776 key: impl Into<String>,
777 value: f64,
778 data: impl Serialize,
779 ) {
780 let _ = self.track(context, key, Some(value), data);
781 }
782
783 fn track(
784 &self,
785 context: Context,
786 key: impl Into<String>,
787 metric_value: Option<f64>,
788 data: impl Serialize,
789 ) -> serde_json::Result<()> {
790 if !self.events_default.disabled {
791 let event =
792 self.events_default
793 .event_factory
794 .new_custom(context, key, metric_value, data)?;
795
796 self.send_internal(event);
797 }
798
799 Ok(())
800 }
801
802 pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
809 if self.events_default.disabled {
810 return;
811 }
812
813 match tracker.lock() {
814 Ok(tracker) => {
815 let event = tracker.build();
816 match event {
817 Ok(event) => {
818 self.send_internal(
819 self.events_default.event_factory.new_migration_op(event),
820 );
821 }
822 Err(e) => error!("Failed to build migration event, no event will be sent: {e}"),
823 }
824 }
825 Err(e) => error!("Failed to lock migration tracker, no event will be sent: {e}"),
826 }
827 }
828
829 fn variation_internal<T: Into<FlagValue> + Clone>(
830 &self,
831 context: &Context,
832 flag_key: &str,
833 default: T,
834 events_scope: &EventsScope,
835 ) -> (Detail<FlagValue>, Option<eval::Flag>) {
836 if self.offline {
837 return (
838 Detail::err_default(eval::Error::ClientNotReady, default.into()),
839 None,
840 );
841 }
842
843 let (flag, result) = match self.initialized() {
844 false => (
845 None,
846 Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
847 ),
848 true => {
849 let data_store = self.data_store.read();
850 match data_store.flag(flag_key) {
851 Some(flag) => {
852 let result = eval::evaluate(
853 data_store.to_store(),
854 &flag,
855 context,
856 Some(&*events_scope.prerequisite_event_recorder),
857 )
858 .map(|v| v.clone())
859 .or(default.clone().into());
860
861 (Some(flag), result)
862 }
863 None => (
864 None,
865 Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
866 ),
867 }
868 }
869 };
870
871 if !events_scope.disabled {
872 let event = match &flag {
873 Some(f) => events_scope.event_factory.new_eval_event(
874 flag_key,
875 context.clone(),
876 f,
877 result.clone(),
878 default.into(),
879 None,
880 ),
881 None => events_scope.event_factory.new_unknown_flag_event(
882 flag_key,
883 context.clone(),
884 result.clone(),
885 default.into(),
886 ),
887 };
888 self.send_internal(event);
889 }
890
891 (result, flag)
892 }
893
894 fn send_internal(&self, event: InputEvent) {
895 self.event_processor.send(event);
896 }
897}
898
899#[cfg(test)]
900mod tests {
901 use assert_json_diff::assert_json_eq;
902 use crossbeam_channel::Receiver;
903 use eval::{ContextBuilder, MultiContextBuilder};
904 use futures::FutureExt;
905 use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
906 use maplit::hashmap;
907 use std::collections::HashMap;
908 use tokio::time::Instant;
909
910 use crate::data_source::MockDataSource;
911 use crate::data_source_builders::MockDataSourceBuilder;
912 use crate::evaluation::FlagFilter;
913 use crate::events::create_event_sender;
914 use crate::events::event::{OutputEvent, VariationKey};
915 use crate::events::processor_builders::EventProcessorBuilder;
916 use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
917 use crate::stores::store_types::{PatchTarget, StorageItem};
918 use crate::test_common::{
919 self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
920 basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
921 };
922 use crate::test_data::TestData;
923 use crate::{
924 AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
925 PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
926 SerializedItem,
927 };
928 use test_case::test_case;
929
930 use super::*;
931
932 fn is_send_and_sync<T: Send + Sync>() {}
933
934 #[test]
935 fn ensure_client_is_send_and_sync() {
936 is_send_and_sync::<Client>()
937 }
938
939 #[tokio::test]
940 async fn client_asynchronously_initializes_within_timeout() {
941 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
942 client.start_with_default_executor();
943
944 let now = Instant::now();
945 let initialized = client
946 .wait_for_initialization(Duration::from_millis(1500))
947 .await;
948 let elapsed_time = now.elapsed();
949 assert!(elapsed_time.as_millis() > 500);
951 assert_eq!(initialized, Some(true));
952 }
953
954 #[tokio::test]
955 async fn client_asynchronously_initializes_slower_than_timeout() {
956 let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
957 client.start_with_default_executor();
958
959 let now = Instant::now();
960 let initialized = client
961 .wait_for_initialization(Duration::from_millis(500))
962 .await;
963 let elapsed_time = now.elapsed();
964 assert!(elapsed_time.as_millis() < 750);
966 assert!(initialized.is_none());
967 }
968
969 #[tokio::test]
970 async fn client_initializes_immediately_in_offline_mode() {
971 let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
972 client.start_with_default_executor();
973
974 assert!(client.initialized());
975
976 let now = Instant::now();
977 let initialized = client
978 .wait_for_initialization(Duration::from_millis(2000))
979 .await;
980 let elapsed_time = now.elapsed();
981 assert_eq!(initialized, Some(true));
982 assert!(elapsed_time.as_millis() < 500)
983 }
984
985 #[tokio::test]
986 async fn client_initializes_immediately_in_daemon_mode() {
987 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
988 client.start_with_default_executor();
989
990 assert!(client.initialized());
991
992 let now = Instant::now();
993 let initialized = client
994 .wait_for_initialization(Duration::from_millis(2000))
995 .await;
996 let elapsed_time = now.elapsed();
997 assert_eq!(initialized, Some(true));
998 assert!(elapsed_time.as_millis() < 500)
999 }
1000
1001 #[test_case(basic_flag("myFlag"), false.into(), true.into())]
1002 #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
1003 fn client_updates_changes_evaluation_results(
1004 flag: eval::Flag,
1005 default: FlagValue,
1006 expected: FlagValue,
1007 ) {
1008 let context = ContextBuilder::new("foo")
1009 .build()
1010 .expect("Failed to create context");
1011
1012 let (client, _event_rx) = make_mocked_client();
1013
1014 let result = client.variation_detail(&context, "myFlag", default.clone());
1015 assert_eq!(result.value.unwrap(), default);
1016
1017 client.start_with_default_executor();
1018 client
1019 .data_store
1020 .write()
1021 .upsert(
1022 &flag.key,
1023 PatchTarget::Flag(StorageItem::Item(flag.clone())),
1024 )
1025 .expect("patch should apply");
1026
1027 let result = client.variation_detail(&context, "myFlag", default);
1028 assert_eq!(result.value.unwrap(), expected);
1029 assert!(matches!(
1030 result.reason,
1031 Reason::Fallthrough {
1032 in_experiment: false
1033 }
1034 ));
1035 }
1036
1037 #[test]
1038 fn all_flags_detail_is_invalid_when_offline() {
1039 let (client, _event_rx) = make_mocked_offline_client();
1040 client.start_with_default_executor();
1041
1042 let context = ContextBuilder::new("bob")
1043 .build()
1044 .expect("Failed to create context");
1045
1046 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1047 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1048 }
1049
1050 #[test]
1051 fn all_flags_detail_is_invalid_when_not_initialized() {
1052 let (client, _event_rx) = make_mocked_client();
1053
1054 let context = ContextBuilder::new("bob")
1055 .build()
1056 .expect("Failed to create context");
1057
1058 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1059 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1060 }
1061
1062 #[tokio::test]
1063 async fn all_flags_detail_returns_flag_states() {
1064 let td = TestData::new();
1065 td.use_preconfigured_flag(basic_flag("myFlag1"));
1066 td.use_preconfigured_flag(basic_flag("myFlag2"));
1067 let (client, _event_rx) = make_client_with_test_data(&td);
1068 client.start_with_default_executor();
1069
1070 let context = ContextBuilder::new("bob")
1071 .build()
1072 .expect("Failed to create context");
1073
1074 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1075
1076 client.close();
1077
1078 assert_json_eq!(
1079 all_flags,
1080 json!({
1081 "myFlag1": true,
1082 "myFlag2": true,
1083 "$flagsState": {
1084 "myFlag1": {
1085 "version": 1,
1086 "variation": 1
1087 },
1088 "myFlag2": {
1089 "version": 1,
1090 "variation": 1
1091 },
1092 },
1093 "$valid": true
1094 })
1095 );
1096 }
1097
1098 #[tokio::test]
1099 async fn all_flags_detail_returns_prerequisite_relations() {
1100 let td = TestData::new();
1101 td.use_preconfigured_flag(basic_flag("prereq1"));
1102 td.use_preconfigured_flag(basic_flag("prereq2"));
1103 td.use_preconfigured_flag(basic_flag_with_prereqs_and_visibility(
1104 "toplevel",
1105 &["prereq1", "prereq2"],
1106 false,
1107 false,
1108 ));
1109 let (client, _event_rx) = make_client_with_test_data(&td);
1110 client.start_with_default_executor();
1111
1112 let context = ContextBuilder::new("bob")
1113 .build()
1114 .expect("Failed to create context");
1115
1116 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1117
1118 client.close();
1119
1120 assert_json_eq!(
1121 all_flags,
1122 json!({
1123 "prereq1": true,
1124 "prereq2": true,
1125 "toplevel": true,
1126 "$flagsState": {
1127 "toplevel": {
1128 "version": 1,
1129 "variation": 1,
1130 "prerequisites": ["prereq1", "prereq2"]
1131 },
1132 "prereq1": {
1133 "version": 1,
1134 "variation": 1
1135 },
1136 "prereq2": {
1137 "version": 1,
1138 "variation": 1
1139 },
1140 },
1141 "$valid": true
1142 })
1143 );
1144 }
1145
1146 #[tokio::test]
1147 async fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1148 let td = TestData::new();
1149 td.use_preconfigured_flag(basic_flag_with_visibility("prereq1", false, false));
1150 td.use_preconfigured_flag(basic_flag_with_visibility("prereq2", false, false));
1151 td.use_preconfigured_flag(basic_flag_with_prereqs_and_visibility(
1152 "toplevel",
1153 &["prereq1", "prereq2"],
1154 true,
1155 false,
1156 ));
1157 let (client, _event_rx) = make_client_with_test_data(&td);
1158 client.start_with_default_executor();
1159
1160 let context = ContextBuilder::new("bob")
1161 .build()
1162 .expect("Failed to create context");
1163
1164 let mut config = FlagDetailConfig::new();
1165 config.flag_filter(FlagFilter::CLIENT);
1166
1167 let all_flags = client.all_flags_detail(&context, config);
1168
1169 client.close();
1170
1171 assert_json_eq!(
1172 all_flags,
1173 json!({
1174 "toplevel": true,
1175 "$flagsState": {
1176 "toplevel": {
1177 "version": 1,
1178 "variation": 1,
1179 "prerequisites": ["prereq1", "prereq2"]
1180 },
1181 },
1182 "$valid": true
1183 })
1184 );
1185 }
1186
1187 #[tokio::test]
1188 async fn variation_tracks_events_correctly() {
1189 let td = TestData::new();
1190 td.use_preconfigured_flag(basic_flag("myFlag"));
1191 let (client, event_rx) = make_client_with_test_data(&td);
1192 client.start_with_default_executor();
1193
1194 let context = ContextBuilder::new("bob")
1195 .build()
1196 .expect("Failed to create context");
1197
1198 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1199
1200 assert!(flag_value.as_bool().unwrap());
1201 client.flush();
1202 client.close();
1203
1204 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1205 assert_eq!(events.len(), 2);
1206 assert_eq!(events[0].kind(), "index");
1207 assert_eq!(events[1].kind(), "summary");
1208
1209 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1210 let variation_key = VariationKey {
1211 version: Some(1),
1212 variation: Some(1),
1213 };
1214 let feature = event_summary.features.get("myFlag");
1215 assert!(feature.is_some());
1216
1217 let feature = feature.unwrap();
1218 assert!(feature.counters.contains_key(&variation_key));
1219 } else {
1220 panic!("Event should be a summary type");
1221 }
1222 }
1223
1224 #[test]
1225 fn variation_handles_offline_mode() {
1226 let (client, event_rx) = make_mocked_offline_client();
1227 client.start_with_default_executor();
1228
1229 let context = ContextBuilder::new("bob")
1230 .build()
1231 .expect("Failed to create context");
1232 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1233
1234 assert!(!flag_value.as_bool().unwrap());
1235 client.flush();
1236 client.close();
1237
1238 assert_eq!(event_rx.iter().count(), 0);
1239 }
1240
1241 #[test]
1242 fn variation_handles_unknown_flags() {
1243 let (client, event_rx) = make_mocked_client();
1244 client.start_with_default_executor();
1245 let context = ContextBuilder::new("bob")
1246 .build()
1247 .expect("Failed to create context");
1248
1249 let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1250
1251 assert!(!flag_value.as_bool().unwrap());
1252 client.flush();
1253 client.close();
1254
1255 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1256 assert_eq!(events.len(), 2);
1257 assert_eq!(events[0].kind(), "index");
1258 assert_eq!(events[1].kind(), "summary");
1259
1260 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1261 let variation_key = VariationKey {
1262 version: None,
1263 variation: None,
1264 };
1265
1266 let feature = event_summary.features.get("non-existent-flag");
1267 assert!(feature.is_some());
1268
1269 let feature = feature.unwrap();
1270 assert!(feature.counters.contains_key(&variation_key));
1271 } else {
1272 panic!("Event should be a summary type");
1273 }
1274 }
1275
1276 #[tokio::test]
1277 async fn variation_detail_handles_debug_events_correctly() {
1278 let td = TestData::new();
1279 let mut flag = basic_flag("myFlag");
1280 flag.debug_events_until_date = Some(64_060_606_800_000); td.use_preconfigured_flag(flag);
1282 let (client, event_rx) = make_client_with_test_data(&td);
1283 client.start_with_default_executor();
1284
1285 let context = ContextBuilder::new("bob")
1286 .build()
1287 .expect("Failed to create context");
1288
1289 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1290
1291 assert!(detail.value.unwrap().as_bool().unwrap());
1292 assert!(matches!(
1293 detail.reason,
1294 Reason::Fallthrough {
1295 in_experiment: false
1296 }
1297 ));
1298 client.flush();
1299 client.close();
1300
1301 let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1302 assert_eq!(events.len(), 3);
1303 assert_eq!(events[0].kind(), "index");
1304 assert_eq!(events[1].kind(), "debug");
1305 assert_eq!(events[2].kind(), "summary");
1306
1307 if let OutputEvent::Summary(event_summary) = events[2].clone() {
1308 let variation_key = VariationKey {
1309 version: Some(1),
1310 variation: Some(1),
1311 };
1312
1313 let feature = event_summary.features.get("myFlag");
1314 assert!(feature.is_some());
1315
1316 let feature = feature.unwrap();
1317 assert!(feature.counters.contains_key(&variation_key));
1318 } else {
1319 panic!("Event should be a summary type");
1320 }
1321 }
1322
1323 #[tokio::test]
1324 async fn variation_detail_tracks_events_correctly() {
1325 let td = TestData::new();
1326 td.use_preconfigured_flag(basic_flag("myFlag"));
1327 let (client, event_rx) = make_client_with_test_data(&td);
1328 client.start_with_default_executor();
1329
1330 let context = ContextBuilder::new("bob")
1331 .build()
1332 .expect("Failed to create context");
1333
1334 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1335
1336 assert!(detail.value.unwrap().as_bool().unwrap());
1337 assert!(matches!(
1338 detail.reason,
1339 Reason::Fallthrough {
1340 in_experiment: false
1341 }
1342 ));
1343 client.flush();
1344 client.close();
1345
1346 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1347 assert_eq!(events.len(), 2);
1348 assert_eq!(events[0].kind(), "index");
1349 assert_eq!(events[1].kind(), "summary");
1350
1351 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1352 let variation_key = VariationKey {
1353 version: Some(1),
1354 variation: Some(1),
1355 };
1356
1357 let feature = event_summary.features.get("myFlag");
1358 assert!(feature.is_some());
1359
1360 let feature = feature.unwrap();
1361 assert!(feature.counters.contains_key(&variation_key));
1362 } else {
1363 panic!("Event should be a summary type");
1364 }
1365 }
1366
1367 #[test]
1368 fn variation_detail_handles_offline_mode() {
1369 let (client, event_rx) = make_mocked_offline_client();
1370 client.start_with_default_executor();
1371
1372 let context = ContextBuilder::new("bob")
1373 .build()
1374 .expect("Failed to create context");
1375
1376 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1377
1378 assert!(!detail.value.unwrap().as_bool().unwrap());
1379 assert!(matches!(
1380 detail.reason,
1381 Reason::Error {
1382 error: eval::Error::ClientNotReady
1383 }
1384 ));
1385 client.flush();
1386 client.close();
1387
1388 assert_eq!(event_rx.iter().count(), 0);
1389 }
1390
1391 struct InMemoryPersistentDataStoreFactory {
1392 data: AllData<Flag, Segment>,
1393 initialized: bool,
1394 }
1395
1396 impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1397 fn create_persistent_data_store(
1398 &self,
1399 ) -> Result<Box<dyn PersistentDataStore + 'static>, std::io::Error> {
1400 let serialized_data =
1401 AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1402 Ok(Box::new(InMemoryPersistentDataStore {
1403 data: serialized_data,
1404 initialized: self.initialized,
1405 }))
1406 }
1407 }
1408
1409 #[test]
1410 fn variation_detail_handles_daemon_mode() {
1411 testing_logger::setup();
1412 let factory = InMemoryPersistentDataStoreFactory {
1413 data: AllData {
1414 flags: hashmap!["flag".into() => basic_flag("flag")],
1415 segments: HashMap::new(),
1416 },
1417 initialized: true,
1418 };
1419 let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1420
1421 let config = ConfigBuilder::new("sdk-key")
1422 .daemon_mode(true)
1423 .data_store(&builder)
1424 .event_processor(&NullEventProcessorBuilder::new())
1425 .build()
1426 .expect("config should build");
1427
1428 let client = Client::build(config).expect("Should be built.");
1429
1430 client.start_with_default_executor();
1431
1432 let context = ContextBuilder::new("bob")
1433 .build()
1434 .expect("Failed to create context");
1435
1436 let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));
1437
1438 assert!(detail.value.unwrap().as_bool().unwrap());
1439 assert!(matches!(
1440 detail.reason,
1441 Reason::Fallthrough {
1442 in_experiment: false
1443 }
1444 ));
1445 client.flush();
1446 client.close();
1447
1448 testing_logger::validate(|captured_logs| {
1449 assert_eq!(captured_logs.len(), 1);
1450 assert_eq!(
1451 captured_logs[0].body,
1452 "Started LaunchDarkly Client in daemon mode"
1453 );
1454 });
1455 }
1456
1457 #[test]
1458 fn daemon_mode_is_quiet_if_store_is_not_initialized() {
1459 testing_logger::setup();
1460
1461 let factory = InMemoryPersistentDataStoreFactory {
1462 data: AllData {
1463 flags: HashMap::new(),
1464 segments: HashMap::new(),
1465 },
1466 initialized: false,
1467 };
1468 let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1469
1470 let config = ConfigBuilder::new("sdk-key")
1471 .daemon_mode(true)
1472 .data_store(&builder)
1473 .event_processor(&NullEventProcessorBuilder::new())
1474 .build()
1475 .expect("config should build");
1476
1477 let client = Client::build(config).expect("Should be built.");
1478
1479 client.start_with_default_executor();
1480
1481 let context = ContextBuilder::new("bob")
1482 .build()
1483 .expect("Failed to create context");
1484
1485 client.variation_detail(&context, "flag", FlagValue::Bool(false));
1486
1487 testing_logger::validate(|captured_logs| {
1488 assert_eq!(captured_logs.len(), 1);
1489 assert_eq!(
1490 captured_logs[0].body,
1491 "Started LaunchDarkly Client in daemon mode"
1492 );
1493 });
1494 }
1495
1496 #[tokio::test]
1497 async fn variation_handles_off_flag_without_variation() {
1498 let td = TestData::new();
1499 td.use_preconfigured_flag(basic_off_flag("myFlag"));
1500 let (client, event_rx) = make_client_with_test_data(&td);
1501 client.start_with_default_executor();
1502
1503 let context = ContextBuilder::new("bob")
1504 .build()
1505 .expect("Failed to create context");
1506
1507 let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1508
1509 assert!(!result.as_bool().unwrap());
1510 client.flush();
1511 client.close();
1512
1513 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1514 assert_eq!(events.len(), 2);
1515 assert_eq!(events[0].kind(), "index");
1516 assert_eq!(events[1].kind(), "summary");
1517
1518 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1519 let variation_key = VariationKey {
1520 version: Some(1),
1521 variation: None,
1522 };
1523 let feature = event_summary.features.get("myFlag");
1524 assert!(feature.is_some());
1525
1526 let feature = feature.unwrap();
1527 assert!(feature.counters.contains_key(&variation_key));
1528 } else {
1529 panic!("Event should be a summary type");
1530 }
1531 }
1532
1533 #[tokio::test]
1534 async fn variation_detail_tracks_prereq_events_correctly() {
1535 let td = TestData::new();
1536 let mut prereq_flag = basic_flag("prereqFlag");
1537 prereq_flag.track_events = true;
1538 td.use_preconfigured_flag(prereq_flag);
1539
1540 let mut main_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1541 main_flag.track_events = true;
1542 td.use_preconfigured_flag(main_flag);
1543
1544 let (client, event_rx) = make_client_with_test_data(&td);
1545 client.start_with_default_executor();
1546
1547 let context = ContextBuilder::new("bob")
1548 .build()
1549 .expect("Failed to create context");
1550
1551 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1552
1553 assert!(detail.value.unwrap().as_bool().unwrap());
1554 assert!(matches!(
1555 detail.reason,
1556 Reason::Fallthrough {
1557 in_experiment: false
1558 }
1559 ));
1560 client.flush();
1561 client.close();
1562
1563 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1564 assert_eq!(events.len(), 4);
1565 assert_eq!(events[0].kind(), "index");
1566 assert_eq!(events[1].kind(), "feature");
1567 assert_eq!(events[2].kind(), "feature");
1568 assert_eq!(events[3].kind(), "summary");
1569
1570 if let OutputEvent::Summary(event_summary) = events[3].clone() {
1571 let variation_key = VariationKey {
1572 version: Some(1),
1573 variation: Some(1),
1574 };
1575 let feature = event_summary.features.get("myFlag");
1576 assert!(feature.is_some());
1577
1578 let feature = feature.unwrap();
1579 assert!(feature.counters.contains_key(&variation_key));
1580
1581 let variation_key = VariationKey {
1582 version: Some(1),
1583 variation: Some(1),
1584 };
1585 let feature = event_summary.features.get("prereqFlag");
1586 assert!(feature.is_some());
1587
1588 let feature = feature.unwrap();
1589 assert!(feature.counters.contains_key(&variation_key));
1590 }
1591 }
1592
1593 #[tokio::test]
1594 async fn variation_handles_failed_prereqs_correctly() {
1595 let td = TestData::new();
1596 let mut prereq_flag = basic_off_flag("prereqFlag");
1597 prereq_flag.track_events = true;
1598 td.use_preconfigured_flag(prereq_flag);
1599
1600 let mut main_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1601 main_flag.track_events = true;
1602 td.use_preconfigured_flag(main_flag);
1603
1604 let (client, event_rx) = make_client_with_test_data(&td);
1605 client.start_with_default_executor();
1606
1607 let context = ContextBuilder::new("bob")
1608 .build()
1609 .expect("Failed to create context");
1610
1611 let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1612
1613 assert!(!detail.as_bool().unwrap());
1614 client.flush();
1615 client.close();
1616
1617 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1618 assert_eq!(events.len(), 4);
1619 assert_eq!(events[0].kind(), "index");
1620 assert_eq!(events[1].kind(), "feature");
1621 assert_eq!(events[2].kind(), "feature");
1622 assert_eq!(events[3].kind(), "summary");
1623
1624 if let OutputEvent::Summary(event_summary) = events[3].clone() {
1625 let variation_key = VariationKey {
1626 version: Some(1),
1627 variation: Some(0),
1628 };
1629 let feature = event_summary.features.get("myFlag");
1630 assert!(feature.is_some());
1631
1632 let feature = feature.unwrap();
1633 assert!(feature.counters.contains_key(&variation_key));
1634
1635 let variation_key = VariationKey {
1636 version: Some(1),
1637 variation: None,
1638 };
1639 let feature = event_summary.features.get("prereqFlag");
1640 assert!(feature.is_some());
1641
1642 let feature = feature.unwrap();
1643 assert!(feature.counters.contains_key(&variation_key));
1644 }
1645 }
1646
1647 #[test]
1648 fn variation_detail_handles_flag_not_found() {
1649 let (client, event_rx) = make_mocked_client();
1650 client.start_with_default_executor();
1651
1652 let context = ContextBuilder::new("bob")
1653 .build()
1654 .expect("Failed to create context");
1655 let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1656
1657 assert!(!detail.value.unwrap().as_bool().unwrap());
1658 assert!(matches!(
1659 detail.reason,
1660 Reason::Error {
1661 error: eval::Error::FlagNotFound
1662 }
1663 ));
1664 client.flush();
1665 client.close();
1666
1667 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1668 assert_eq!(events.len(), 2);
1669 assert_eq!(events[0].kind(), "index");
1670 assert_eq!(events[1].kind(), "summary");
1671
1672 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1673 let variation_key = VariationKey {
1674 version: None,
1675 variation: None,
1676 };
1677 let feature = event_summary.features.get("non-existent-flag");
1678 assert!(feature.is_some());
1679
1680 let feature = feature.unwrap();
1681 assert!(feature.counters.contains_key(&variation_key));
1682 } else {
1683 panic!("Event should be a summary type");
1684 }
1685 }
1686
1687 #[tokio::test]
1688 async fn variation_detail_handles_client_not_ready() {
1689 let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1690 client.start_with_default_executor();
1691 let context = ContextBuilder::new("bob")
1692 .build()
1693 .expect("Failed to create context");
1694
1695 let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1696
1697 assert!(!detail.value.unwrap().as_bool().unwrap());
1698 assert!(matches!(
1699 detail.reason,
1700 Reason::Error {
1701 error: eval::Error::ClientNotReady
1702 }
1703 ));
1704 client.flush();
1705 client.close();
1706
1707 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1708 assert_eq!(events.len(), 2);
1709 assert_eq!(events[0].kind(), "index");
1710 assert_eq!(events[1].kind(), "summary");
1711
1712 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1713 let variation_key = VariationKey {
1714 version: None,
1715 variation: None,
1716 };
1717 let feature = event_summary.features.get("non-existent-flag");
1718 assert!(feature.is_some());
1719
1720 let feature = feature.unwrap();
1721 assert!(feature.counters.contains_key(&variation_key));
1722 } else {
1723 panic!("Event should be a summary type");
1724 }
1725 }
1726
1727 #[test]
1728 fn identify_sends_identify_event() {
1729 let (client, event_rx) = make_mocked_client();
1730 client.start_with_default_executor();
1731
1732 let context = ContextBuilder::new("bob")
1733 .build()
1734 .expect("Failed to create context");
1735
1736 client.identify(context);
1737 client.flush();
1738 client.close();
1739
1740 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1741 assert_eq!(events.len(), 1);
1742 assert_eq!(events[0].kind(), "identify");
1743 }
1744
1745 #[test]
1746 fn identify_sends_sends_nothing_in_offline_mode() {
1747 let (client, event_rx) = make_mocked_offline_client();
1748 client.start_with_default_executor();
1749
1750 let context = ContextBuilder::new("bob")
1751 .build()
1752 .expect("Failed to create context");
1753
1754 client.identify(context);
1755 client.flush();
1756 client.close();
1757
1758 assert_eq!(event_rx.iter().count(), 0);
1759 }
1760
1761 #[test]
1762 #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
1763 fn secure_mode_hash() {
1764 let config = ConfigBuilder::new("secret")
1765 .offline(true)
1766 .build()
1767 .expect("config should build");
1768 let client = Client::build(config).expect("Should be built.");
1769 let context = ContextBuilder::new("Message")
1770 .build()
1771 .expect("Failed to create context");
1772
1773 assert_eq!(
1774 client
1775 .secure_mode_hash(&context)
1776 .expect("Hash should be computed"),
1777 "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
1778 );
1779 }
1780
1781 #[test]
1782 #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
1783 fn secure_mode_hash_with_multi_kind() {
1784 let config = ConfigBuilder::new("secret")
1785 .offline(true)
1786 .build()
1787 .expect("config should build");
1788 let client = Client::build(config).expect("Should be built.");
1789
1790 let org = ContextBuilder::new("org-key|1")
1791 .kind("org")
1792 .build()
1793 .expect("Failed to create context");
1794 let user = ContextBuilder::new("user-key:2")
1795 .build()
1796 .expect("Failed to create context");
1797
1798 let context = MultiContextBuilder::new()
1799 .add_context(org)
1800 .add_context(user)
1801 .build()
1802 .expect("failed to build multi-context");
1803
1804 assert_eq!(
1805 client
1806 .secure_mode_hash(&context)
1807 .expect("Hash should be computed"),
1808 "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
1809 );
1810 }
1811
1812 #[derive(Serialize)]
1813 struct MyCustomData {
1814 pub answer: u32,
1815 }
1816
1817 #[test]
1818 fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1819 let (client, event_rx) = make_mocked_client();
1820 client.start_with_default_executor();
1821
1822 let context = ContextBuilder::new("bob")
1823 .build()
1824 .expect("Failed to create context");
1825
1826 client.track_event(context.clone(), "event-with-null");
1827 client.track_data(context.clone(), "event-with-string", "string-data")?;
1828 client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1829 client.track_data(
1830 context.clone(),
1831 "event-with-struct",
1832 MyCustomData { answer: 42 },
1833 )?;
1834 client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1835
1836 client.flush();
1837 client.close();
1838
1839 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1840 assert_eq!(events.len(), 6);
1841
1842 let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1843 for event in events {
1844 if let Some(count) = events_by_type.get_mut(event.kind()) {
1845 *count += 1;
1846 } else {
1847 events_by_type.insert(event.kind(), 1);
1848 }
1849 }
1850 assert!(matches!(events_by_type.get("index"), Some(1)));
1851 assert!(matches!(events_by_type.get("custom"), Some(5)));
1852
1853 Ok(())
1854 }
1855
1856 #[test]
1857 fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
1858 let (client, event_rx) = make_mocked_offline_client();
1859 client.start_with_default_executor();
1860
1861 let context = ContextBuilder::new("bob")
1862 .build()
1863 .expect("Failed to create context");
1864
1865 client.track_event(context.clone(), "event-with-null");
1866 client.track_data(context.clone(), "event-with-string", "string-data")?;
1867 client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1868 client.track_data(
1869 context.clone(),
1870 "event-with-struct",
1871 MyCustomData { answer: 42 },
1872 )?;
1873 client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1874
1875 client.flush();
1876 client.close();
1877
1878 assert_eq!(event_rx.iter().count(), 0);
1879
1880 Ok(())
1881 }
1882
1883 #[test]
1884 fn migration_handles_flag_not_found() {
1885 let (client, _event_rx) = make_mocked_client();
1886 client.start_with_default_executor();
1887
1888 let context = ContextBuilder::new("bob")
1889 .build()
1890 .expect("Failed to create context");
1891
1892 let (stage, _tracker) =
1893 client.migration_variation(&context, "non-existent-flag-key", Stage::Off);
1894
1895 assert_eq!(stage, Stage::Off);
1896 }
1897
1898 #[tokio::test]
1899 async fn migration_uses_non_migration_flag() {
1900 let td = TestData::new();
1901 td.use_preconfigured_flag(basic_flag("boolean-flag"));
1902 let (client, _event_rx) = make_client_with_test_data(&td);
1903 client.start_with_default_executor();
1904
1905 let context = ContextBuilder::new("bob")
1906 .build()
1907 .expect("Failed to create context");
1908
1909 let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1910
1911 assert_eq!(stage, Stage::Off);
1912 }
1913
1914 #[test_case(Stage::Off)]
1915 #[test_case(Stage::DualWrite)]
1916 #[test_case(Stage::Shadow)]
1917 #[test_case(Stage::Live)]
1918 #[test_case(Stage::Rampdown)]
1919 #[test_case(Stage::Complete)]
1920 #[tokio::test]
1921 async fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1922 let td = TestData::new();
1923 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
1924 let (client, _event_rx) = make_client_with_test_data(&td);
1925 client.start_with_default_executor();
1926
1927 let context = ContextBuilder::new("bob")
1928 .build()
1929 .expect("Failed to create context");
1930
1931 let (evaluated_stage, _tracker) =
1932 client.migration_variation(&context, "stage-flag", Stage::Off);
1933
1934 assert_eq!(evaluated_stage, stage);
1935 }
1936
1937 #[tokio::test]
1938 async fn migration_tracks_invoked_correctly() {
1939 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
1940 .await;
1941 migration_tracks_invoked_correctly_driver(
1942 Stage::DualWrite,
1943 Operation::Read,
1944 vec![Origin::Old],
1945 )
1946 .await;
1947 migration_tracks_invoked_correctly_driver(
1948 Stage::Shadow,
1949 Operation::Read,
1950 vec![Origin::Old, Origin::New],
1951 )
1952 .await;
1953 migration_tracks_invoked_correctly_driver(
1954 Stage::Live,
1955 Operation::Read,
1956 vec![Origin::Old, Origin::New],
1957 )
1958 .await;
1959 migration_tracks_invoked_correctly_driver(
1960 Stage::Rampdown,
1961 Operation::Read,
1962 vec![Origin::New],
1963 )
1964 .await;
1965 migration_tracks_invoked_correctly_driver(
1966 Stage::Complete,
1967 Operation::Read,
1968 vec![Origin::New],
1969 )
1970 .await;
1971 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
1972 .await;
1973 migration_tracks_invoked_correctly_driver(
1974 Stage::DualWrite,
1975 Operation::Write,
1976 vec![Origin::Old, Origin::New],
1977 )
1978 .await;
1979 migration_tracks_invoked_correctly_driver(
1980 Stage::Shadow,
1981 Operation::Write,
1982 vec![Origin::Old, Origin::New],
1983 )
1984 .await;
1985 migration_tracks_invoked_correctly_driver(
1986 Stage::Live,
1987 Operation::Write,
1988 vec![Origin::Old, Origin::New],
1989 )
1990 .await;
1991 migration_tracks_invoked_correctly_driver(
1992 Stage::Rampdown,
1993 Operation::Write,
1994 vec![Origin::Old, Origin::New],
1995 )
1996 .await;
1997 migration_tracks_invoked_correctly_driver(
1998 Stage::Complete,
1999 Operation::Write,
2000 vec![Origin::New],
2001 )
2002 .await;
2003 }
2004
2005 async fn migration_tracks_invoked_correctly_driver(
2006 stage: Stage,
2007 operation: Operation,
2008 origins: Vec<Origin>,
2009 ) {
2010 let td = TestData::new();
2011 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2012 let (client, event_rx) = make_client_with_test_data(&td);
2013 let client = Arc::new(client);
2014 client.start_with_default_executor();
2015
2016 let mut migrator = MigratorBuilder::new(client.clone())
2017 .read(
2018 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2019 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2020 Some(|_, _| true),
2021 )
2022 .write(
2023 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2024 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2025 )
2026 .build()
2027 .expect("migrator should build");
2028
2029 let context = ContextBuilder::new("bob")
2030 .build()
2031 .expect("Failed to create context");
2032
2033 if let Operation::Read = operation {
2034 migrator
2035 .read(
2036 &context,
2037 "stage-flag".into(),
2038 Stage::Off,
2039 serde_json::Value::Null,
2040 )
2041 .await;
2042 } else {
2043 migrator
2044 .write(
2045 &context,
2046 "stage-flag".into(),
2047 Stage::Off,
2048 serde_json::Value::Null,
2049 )
2050 .await;
2051 }
2052
2053 client.flush();
2054 client.close();
2055
2056 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2057 assert_eq!(events.len(), 3);
2058 match &events[1] {
2059 OutputEvent::MigrationOp(event) => {
2060 assert!(event.invoked.len() == origins.len());
2061 assert!(event.invoked.iter().all(|i| origins.contains(i)));
2062 }
2063 _ => panic!("Expected migration event"),
2064 }
2065 }
2066
2067 #[tokio::test]
2068 async fn migration_tracks_latency() {
2069 migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2070 migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2071 migration_tracks_latency_driver(
2072 Stage::Shadow,
2073 Operation::Read,
2074 vec![Origin::Old, Origin::New],
2075 )
2076 .await;
2077 migration_tracks_latency_driver(
2078 Stage::Live,
2079 Operation::Read,
2080 vec![Origin::Old, Origin::New],
2081 )
2082 .await;
2083 migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2084 migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2085 migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2086 migration_tracks_latency_driver(
2087 Stage::DualWrite,
2088 Operation::Write,
2089 vec![Origin::Old, Origin::New],
2090 )
2091 .await;
2092 migration_tracks_latency_driver(
2093 Stage::Shadow,
2094 Operation::Write,
2095 vec![Origin::Old, Origin::New],
2096 )
2097 .await;
2098 migration_tracks_latency_driver(
2099 Stage::Live,
2100 Operation::Write,
2101 vec![Origin::Old, Origin::New],
2102 )
2103 .await;
2104 migration_tracks_latency_driver(
2105 Stage::Rampdown,
2106 Operation::Write,
2107 vec![Origin::Old, Origin::New],
2108 )
2109 .await;
2110 migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2111 }
2112
2113 async fn migration_tracks_latency_driver(
2114 stage: Stage,
2115 operation: Operation,
2116 origins: Vec<Origin>,
2117 ) {
2118 let td = TestData::new();
2119 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2120 let (client, event_rx) = make_client_with_test_data(&td);
2121 let client = Arc::new(client);
2122 client.start_with_default_executor();
2123
2124 let mut migrator = MigratorBuilder::new(client.clone())
2125 .track_latency(true)
2126 .read(
2127 |_| {
2128 async move {
2129 async_std::task::sleep(Duration::from_millis(100)).await;
2130 Ok(serde_json::Value::Null)
2131 }
2132 .boxed()
2133 },
2134 |_| {
2135 async move {
2136 async_std::task::sleep(Duration::from_millis(100)).await;
2137 Ok(serde_json::Value::Null)
2138 }
2139 .boxed()
2140 },
2141 Some(|_, _| true),
2142 )
2143 .write(
2144 |_| {
2145 async move {
2146 async_std::task::sleep(Duration::from_millis(100)).await;
2147 Ok(serde_json::Value::Null)
2148 }
2149 .boxed()
2150 },
2151 |_| {
2152 async move {
2153 async_std::task::sleep(Duration::from_millis(100)).await;
2154 Ok(serde_json::Value::Null)
2155 }
2156 .boxed()
2157 },
2158 )
2159 .build()
2160 .expect("migrator should build");
2161
2162 let context = ContextBuilder::new("bob")
2163 .build()
2164 .expect("Failed to create context");
2165
2166 if let Operation::Read = operation {
2167 migrator
2168 .read(
2169 &context,
2170 "stage-flag".into(),
2171 Stage::Off,
2172 serde_json::Value::Null,
2173 )
2174 .await;
2175 } else {
2176 migrator
2177 .write(
2178 &context,
2179 "stage-flag".into(),
2180 Stage::Off,
2181 serde_json::Value::Null,
2182 )
2183 .await;
2184 }
2185
2186 client.flush();
2187 client.close();
2188
2189 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2190 assert_eq!(events.len(), 3);
2191 match &events[1] {
2192 OutputEvent::MigrationOp(event) => {
2193 assert!(event.latency.len() == origins.len());
2194 assert!(event
2195 .latency
2196 .values()
2197 .all(|l| l > &Duration::from_millis(100)));
2198 }
2199 _ => panic!("Expected migration event"),
2200 }
2201 }
2202
2203 #[tokio::test]
2204 async fn migration_tracks_read_errors() {
2205 migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2206 migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2207 migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2208 migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2209 migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2210 migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2211 }
2212
2213 async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2214 let td = TestData::new();
2215 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2216 let (client, event_rx) = make_client_with_test_data(&td);
2217 let client = Arc::new(client);
2218 client.start_with_default_executor();
2219
2220 let mut migrator = MigratorBuilder::new(client.clone())
2221 .track_latency(true)
2222 .read(
2223 |_| async move { Err("fail".into()) }.boxed(),
2224 |_| async move { Err("fail".into()) }.boxed(),
2225 Some(|_: &String, _: &String| true),
2226 )
2227 .write(
2228 |_| async move { Err("fail".into()) }.boxed(),
2229 |_| async move { Err("fail".into()) }.boxed(),
2230 )
2231 .build()
2232 .expect("migrator should build");
2233
2234 let context = ContextBuilder::new("bob")
2235 .build()
2236 .expect("Failed to create context");
2237
2238 migrator
2239 .read(
2240 &context,
2241 "stage-flag".into(),
2242 Stage::Off,
2243 serde_json::Value::Null,
2244 )
2245 .await;
2246 client.flush();
2247 client.close();
2248
2249 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2250 assert_eq!(events.len(), 3);
2251 match &events[1] {
2252 OutputEvent::MigrationOp(event) => {
2253 assert!(event.errors.len() == origins.len());
2254 assert!(event.errors.iter().all(|i| origins.contains(i)));
2255 }
2256 _ => panic!("Expected migration event"),
2257 }
2258 }
2259
2260 #[tokio::test]
2261 async fn migration_tracks_authoritative_write_errors() {
2262 migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2263 migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2264 .await;
2265 migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2266 migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2267 migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2268 .await;
2269 migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2270 .await;
2271 }
2272
2273 async fn migration_tracks_authoritative_write_errors_driver(
2274 stage: Stage,
2275 origins: Vec<Origin>,
2276 ) {
2277 let td = TestData::new();
2278 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2279 let (client, event_rx) = make_client_with_test_data(&td);
2280 let client = Arc::new(client);
2281 client.start_with_default_executor();
2282
2283 let mut migrator = MigratorBuilder::new(client.clone())
2284 .track_latency(true)
2285 .read(
2286 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2287 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2288 None,
2289 )
2290 .write(
2291 |_| async move { Err("fail".into()) }.boxed(),
2292 |_| async move { Err("fail".into()) }.boxed(),
2293 )
2294 .build()
2295 .expect("migrator should build");
2296
2297 let context = ContextBuilder::new("bob")
2298 .build()
2299 .expect("Failed to create context");
2300
2301 migrator
2302 .write(
2303 &context,
2304 "stage-flag".into(),
2305 Stage::Off,
2306 serde_json::Value::Null,
2307 )
2308 .await;
2309
2310 client.flush();
2311 client.close();
2312
2313 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2314 assert_eq!(events.len(), 3);
2315 match &events[1] {
2316 OutputEvent::MigrationOp(event) => {
2317 assert!(event.errors.len() == origins.len());
2318 assert!(event.errors.iter().all(|i| origins.contains(i)));
2319 }
2320 _ => panic!("Expected migration event"),
2321 }
2322 }
2323
2324 #[tokio::test]
2325 async fn migration_tracks_nonauthoritative_write_errors() {
2326 migration_tracks_nonauthoritative_write_errors_driver(
2327 Stage::DualWrite,
2328 false,
2329 true,
2330 vec![Origin::New],
2331 )
2332 .await;
2333 migration_tracks_nonauthoritative_write_errors_driver(
2334 Stage::Shadow,
2335 false,
2336 true,
2337 vec![Origin::New],
2338 )
2339 .await;
2340 migration_tracks_nonauthoritative_write_errors_driver(
2341 Stage::Live,
2342 true,
2343 false,
2344 vec![Origin::Old],
2345 )
2346 .await;
2347 migration_tracks_nonauthoritative_write_errors_driver(
2348 Stage::Rampdown,
2349 true,
2350 false,
2351 vec![Origin::Old],
2352 )
2353 .await;
2354 }
2355
2356 async fn migration_tracks_nonauthoritative_write_errors_driver(
2357 stage: Stage,
2358 fail_old: bool,
2359 fail_new: bool,
2360 origins: Vec<Origin>,
2361 ) {
2362 let td = TestData::new();
2363 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2364 let (client, event_rx) = make_client_with_test_data(&td);
2365 let client = Arc::new(client);
2366 client.start_with_default_executor();
2367
2368 let mut migrator = MigratorBuilder::new(client.clone())
2369 .track_latency(true)
2370 .read(
2371 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2372 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2373 None,
2374 )
2375 .write(
2376 move |_| {
2377 async move {
2378 if fail_old {
2379 Err("fail".into())
2380 } else {
2381 Ok(serde_json::Value::Null)
2382 }
2383 }
2384 .boxed()
2385 },
2386 move |_| {
2387 async move {
2388 if fail_new {
2389 Err("fail".into())
2390 } else {
2391 Ok(serde_json::Value::Null)
2392 }
2393 }
2394 .boxed()
2395 },
2396 )
2397 .build()
2398 .expect("migrator should build");
2399
2400 let context = ContextBuilder::new("bob")
2401 .build()
2402 .expect("Failed to create context");
2403
2404 migrator
2405 .write(
2406 &context,
2407 "stage-flag".into(),
2408 Stage::Off,
2409 serde_json::Value::Null,
2410 )
2411 .await;
2412
2413 client.flush();
2414 client.close();
2415
2416 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2417 assert_eq!(events.len(), 3);
2418 match &events[1] {
2419 OutputEvent::MigrationOp(event) => {
2420 assert!(event.errors.len() == origins.len());
2421 assert!(event.errors.iter().all(|i| origins.contains(i)));
2422 }
2423 _ => panic!("Expected migration event"),
2424 }
2425 }
2426
2427 #[tokio::test]
2428 async fn migration_tracks_consistency() {
2429 migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2430 migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2431 migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2432 migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2433 }
2434
2435 async fn migration_tracks_consistency_driver(
2436 stage: Stage,
2437 old_return: &'static str,
2438 new_return: &'static str,
2439 expected_consistency: bool,
2440 ) {
2441 let td = TestData::new();
2442 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2443 let (client, event_rx) = make_client_with_test_data(&td);
2444 let client = Arc::new(client);
2445 client.start_with_default_executor();
2446
2447 let mut migrator = MigratorBuilder::new(client.clone())
2448 .track_latency(true)
2449 .read(
2450 |_| {
2451 async move {
2452 async_std::task::sleep(Duration::from_millis(100)).await;
2453 Ok(serde_json::Value::String(old_return.to_string()))
2454 }
2455 .boxed()
2456 },
2457 |_| {
2458 async move {
2459 async_std::task::sleep(Duration::from_millis(100)).await;
2460 Ok(serde_json::Value::String(new_return.to_string()))
2461 }
2462 .boxed()
2463 },
2464 Some(|lhs, rhs| lhs == rhs),
2465 )
2466 .write(
2467 |_| {
2468 async move {
2469 async_std::task::sleep(Duration::from_millis(100)).await;
2470 Ok(serde_json::Value::Null)
2471 }
2472 .boxed()
2473 },
2474 |_| {
2475 async move {
2476 async_std::task::sleep(Duration::from_millis(100)).await;
2477 Ok(serde_json::Value::Null)
2478 }
2479 .boxed()
2480 },
2481 )
2482 .build()
2483 .expect("migrator should build");
2484
2485 let context = ContextBuilder::new("bob")
2486 .build()
2487 .expect("Failed to create context");
2488
2489 migrator
2490 .read(
2491 &context,
2492 "stage-flag".into(),
2493 Stage::Off,
2494 serde_json::Value::Null,
2495 )
2496 .await;
2497
2498 client.flush();
2499 client.close();
2500
2501 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2502 assert_eq!(events.len(), 3);
2503 match &events[1] {
2504 OutputEvent::MigrationOp(event) => {
2505 assert!(event.consistency_check == Some(expected_consistency))
2506 }
2507 _ => panic!("Expected migration event"),
2508 }
2509 }
2510
2511 #[tokio::test]
2512 async fn client_flush_blocking_completes_successfully() {
2513 let (client, event_rx) = make_mocked_client();
2514 client.start_with_default_executor();
2515 client.wait_for_initialization(Duration::from_secs(1)).await;
2516
2517 let context = ContextBuilder::new("user-key")
2518 .build()
2519 .expect("Failed to create context");
2520
2521 client.identify(context);
2522
2523 let result = client.flush_blocking(Duration::from_secs(5)).await;
2524 assert!(result, "flush_blocking should complete successfully");
2525
2526 client.close();
2527
2528 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2529 assert!(!events.is_empty(), "Should have received identify event");
2530 }
2531
2532 #[tokio::test]
2533 async fn client_flush_blocking_with_zero_timeout() {
2534 let (client, event_rx) = make_mocked_client();
2535 client.start_with_default_executor();
2536 client.wait_for_initialization(Duration::from_secs(1)).await;
2537
2538 let context = ContextBuilder::new("user-key")
2539 .build()
2540 .expect("Failed to create context");
2541
2542 client.identify(context);
2543
2544 let result = client.flush_blocking(Duration::ZERO).await;
2545 assert!(
2546 result,
2547 "flush_blocking with zero timeout should complete successfully"
2548 );
2549
2550 client.close();
2551
2552 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2553 assert!(!events.is_empty(), "Should have received identify event");
2554 }
2555
2556 #[tokio::test]
2557 async fn client_flush_blocking_with_no_events() {
2558 let (client, _event_rx) = make_mocked_client();
2559 client.start_with_default_executor();
2560 client.wait_for_initialization(Duration::from_secs(1)).await;
2561
2562 let result = client.flush_blocking(Duration::from_secs(1)).await;
2563 assert!(
2564 result,
2565 "flush_blocking with no events should complete immediately"
2566 );
2567
2568 client.close();
2569 }
2570
2571 #[tokio::test]
2572 async fn client_flush_blocking_multiple_concurrent_calls() {
2573 let (client, event_rx) = make_mocked_client();
2574 client.start_with_default_executor();
2575 client.wait_for_initialization(Duration::from_secs(1)).await;
2576
2577 let context = ContextBuilder::new("user-key")
2578 .build()
2579 .expect("Failed to create context");
2580
2581 client.identify(context);
2582
2583 let (result1, result2, result3) = tokio::join!(
2585 client.flush_blocking(Duration::from_secs(5)),
2586 client.flush_blocking(Duration::from_secs(5)),
2587 client.flush_blocking(Duration::from_secs(5)),
2588 );
2589
2590 assert!(result1, "First flush_blocking should succeed");
2591 assert!(result2, "Second flush_blocking should succeed");
2592 assert!(result3, "Third flush_blocking should succeed");
2593
2594 client.close();
2595
2596 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2597 assert!(!events.is_empty(), "Should have received identify event");
2598 }
2599
2600 fn make_mocked_client_with_delay(
2601 delay: u64,
2602 offline: bool,
2603 daemon_mode: bool,
2604 ) -> (Client, Receiver<OutputEvent>) {
2605 let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2606 let (event_sender, event_rx) = create_event_sender();
2607
2608 let config = ConfigBuilder::new("sdk-key")
2609 .offline(offline)
2610 .daemon_mode(daemon_mode)
2611 .data_source(MockDataSourceBuilder::new().data_source(updates))
2612 .event_processor(
2613 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new()
2614 .event_sender(Arc::new(event_sender)),
2615 )
2616 .build()
2617 .expect("config should build");
2618
2619 let client = Client::build(config).expect("Should be built.");
2620
2621 (client, event_rx)
2622 }
2623
2624 fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2625 make_mocked_client_with_delay(0, true, false)
2626 }
2627
2628 fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2629 make_mocked_client_with_delay(0, false, false)
2630 }
2631
2632 fn make_client_with_test_data(td: &TestData) -> (Client, Receiver<OutputEvent>) {
2633 let (event_sender, event_rx) = create_event_sender();
2634 let config = ConfigBuilder::new("sdk-key")
2635 .data_source(td)
2636 .event_processor(
2637 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new()
2638 .event_sender(Arc::new(event_sender)),
2639 )
2640 .build()
2641 .expect("config should build");
2642 let client = Client::build(config).expect("Should be built.");
2643 (client, event_rx)
2644 }
2645
2646 #[test]
2647 fn client_builds_successfully() {
2648 let config = ConfigBuilder::new("sdk-key")
2649 .offline(true)
2650 .build()
2651 .expect("config should build");
2652
2653 let client = Client::build(config).expect("client should build successfully");
2654
2655 assert!(
2656 !client.started.load(Ordering::SeqCst),
2657 "client should not be started yet"
2658 );
2659 assert!(client.offline, "client should be in offline mode");
2660 assert_eq!(client.sdk_key, "sdk-key", "sdk_key should match");
2661 }
2662}