1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::DataSource;
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
/// Bundles the event-related collaborators used for one flavour of evaluation.
///
/// `Client` keeps two of these: one used by `variation` and one used by
/// `variation_detail` (built with `EventFactory::new(false)` and
/// `EventFactory::new(true)` respectively).
struct EventsScope {
    /// When `true`, no events are generated for this scope.
    disabled: bool,
    /// Factory used to build the events emitted for evaluations in this scope.
    event_factory: EventFactory,
    /// Recorder handed to the evaluator so prerequisite evaluations are reported.
    prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
}
32
/// Records prerequisite flag evaluations by converting them into eval events
/// and sending them to the event processor.
struct PrerequisiteEventRecorder {
    /// Factory used to build the eval event for each prerequisite evaluation.
    event_factory: EventFactory,
    /// Destination for the generated events.
    event_processor: Arc<dyn EventProcessor>,
}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39 fn record(&self, event: PrerequisiteEvent) {
40 let evt = self.event_factory.new_eval_event(
41 &event.prerequisite_flag.key,
42 event.context.clone(),
43 &event.prerequisite_flag,
44 event.prerequisite_result,
45 FlagValue::Json(serde_json::Value::Null),
46 Some(event.target_flag_key),
47 );
48
49 self.event_processor.send(evt);
50 }
51}
52
53#[non_exhaustive]
55#[derive(Debug, Error)]
56pub enum BuildError {
57 #[error("invalid client config: {0}")]
59 InvalidConfig(String),
60}
61
62impl From<DataSourceError> for BuildError {
63 fn from(error: DataSourceError) -> Self {
64 Self::InvalidConfig(error.to_string())
65 }
66}
67
68impl From<DataStoreError> for BuildError {
69 fn from(error: DataStoreError) -> Self {
70 Self::InvalidConfig(error.to_string())
71 }
72}
73
74impl From<EventProcessorError> for BuildError {
75 fn from(error: EventProcessorError) -> Self {
76 Self::InvalidConfig(error.to_string())
77 }
78}
79
80impl From<ConfigBuildError> for BuildError {
81 fn from(error: ConfigBuildError) -> Self {
82 Self::InvalidConfig(error.to_string())
83 }
84}
85
/// Error returned when the client fails to start.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum StartError {
    /// Wraps an I/O error; `start_with_runtime` produces it when
    /// `Runtime::new()` fails.
    #[error("couldn't spawn background thread for client: {0}")]
    SpawnFailed(io::Error),
}
94
/// Progress of the client's initial data load.
///
/// The explicit discriminants allow the state to be stored in an
/// `AtomicUsize` and compared against the raw loaded value.
#[derive(PartialEq, Copy, Clone, Debug)]
enum ClientInitState {
    Initializing = 0,
    Initialized = 1,
    InitializationFailed = 2,
}

/// Allows comparing a state directly with a raw `usize` discriminant.
impl PartialEq<usize> for ClientInitState {
    fn eq(&self, other: &usize) -> bool {
        let discriminant = *self as usize;
        discriminant == *other
    }
}

/// Converts a raw discriminant back into a state.
///
/// # Panics
///
/// Panics (via `unreachable!`) for any value other than 0, 1 or 2.
impl From<usize> for ClientInitState {
    fn from(val: usize) -> Self {
        const STATES: [ClientInitState; 3] = [
            ClientInitState::Initializing,
            ClientInitState::Initialized,
            ClientInitState::InitializationFailed,
        ];

        STATES
            .iter()
            .copied()
            .find(|state| *state == val)
            .unwrap_or_else(|| unreachable!())
    }
}
118
/// The SDK client: evaluates flags against the data store and reports events
/// to the event processor.
///
/// Build it with [`Client::build`], then start it with
/// [`Client::start_with_default_executor`] or [`Client::start_with_runtime`].
pub struct Client {
    /// Receives every event generated by this client.
    event_processor: Arc<dyn EventProcessor>,
    /// Source of flag data; subscribed to when the client is started.
    data_source: Arc<dyn DataSource>,
    /// Store that evaluations read flags from.
    data_store: Arc<RwLock<dyn DataStore>>,
    /// Event scope used by `variation` and the tracking methods.
    events_default: EventsScope,
    /// Event scope used by `variation_detail`.
    events_with_reasons: EventsScope,
    /// Starts with zero permits; the data source callback adds one permit once
    /// it has reported its initialization result.
    init_notify: Arc<Semaphore>,
    /// Current `ClientInitState`, stored as its `usize` discriminant.
    init_state: Arc<AtomicUsize>,
    /// Set once one of the `start_*` methods has been called.
    started: AtomicBool,
    /// Copied from `Config::offline()` at build time.
    offline: bool,
    /// Copied from `Config::daemon_mode()` at build time.
    daemon_mode: bool,
    // Only read by `secure_mode_hash`, which exists solely when a crypto
    // feature is enabled; hence the conditional `dead_code` allowance.
    #[cfg_attr(
        not(any(feature = "crypto-openssl", feature = "crypto-aws-lc-rs")),
        allow(dead_code)
    )]
    sdk_key: String,
    /// Used by `close` to signal shutdown; the data source receives a
    /// subscription to it when the client starts.
    shutdown_broadcast: broadcast::Sender<()>,
    /// Tokio runtime created by `start_with_runtime`; taken (dropped) by `close`.
    runtime: RwLock<Option<Runtime>>,
}
167
168impl Client {
169 pub fn build(config: Config) -> Result<Self, BuildError> {
171 if config.offline() {
172 info!("Started LaunchDarkly Client in offline mode");
173 } else if config.daemon_mode() {
174 info!("Started LaunchDarkly Client in daemon mode");
175 }
176
177 let tags = config.application_tag();
178
179 let endpoints = config.service_endpoints_builder().build()?;
180 let event_processor =
181 config
182 .event_processor_builder()
183 .build(&endpoints, config.sdk_key(), tags.clone())?;
184 let data_source =
185 config
186 .data_source_builder()
187 .build(&endpoints, config.sdk_key(), tags.clone())?;
188 let data_store = config.data_store_builder().build()?;
189
190 let events_default = EventsScope {
191 disabled: config.offline(),
192 event_factory: EventFactory::new(false),
193 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
194 event_factory: EventFactory::new(false),
195 event_processor: event_processor.clone(),
196 }),
197 };
198
199 let events_with_reasons = EventsScope {
200 disabled: config.offline(),
201 event_factory: EventFactory::new(true),
202 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
203 event_factory: EventFactory::new(true),
204 event_processor: event_processor.clone(),
205 }),
206 };
207
208 let (shutdown_tx, _) = broadcast::channel(1);
209
210 Ok(Client {
211 event_processor,
212 data_source,
213 data_store,
214 events_default,
215 events_with_reasons,
216 init_notify: Arc::new(Semaphore::new(0)),
217 init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
218 started: AtomicBool::new(false),
219 offline: config.offline(),
220 daemon_mode: config.daemon_mode(),
221 sdk_key: config.sdk_key().into(),
222 shutdown_broadcast: shutdown_tx,
223 runtime: RwLock::new(None),
224 })
225 }
226
227 pub fn start_with_default_executor(&self) {
229 if self.started.load(Ordering::SeqCst) {
230 return;
231 }
232 self.started.store(true, Ordering::SeqCst);
233 self.start_with_default_executor_internal();
234 }
235
236 fn start_with_default_executor_internal(&self) {
237 let notify = self.init_notify.clone();
241 let init_state = self.init_state.clone();
242
243 self.data_source.subscribe(
244 self.data_store.clone(),
245 Arc::new(move |success| {
246 init_state.store(
247 (if success {
248 ClientInitState::Initialized
249 } else {
250 ClientInitState::InitializationFailed
251 }) as usize,
252 Ordering::SeqCst,
253 );
254 notify.add_permits(1);
255 }),
256 self.shutdown_broadcast.subscribe(),
257 );
258 }
259
260 pub fn start_with_runtime(&self) -> Result<bool, StartError> {
266 if self.started.load(Ordering::SeqCst) {
267 return Ok(true);
268 }
269 self.started.store(true, Ordering::SeqCst);
270
271 let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
272 let _guard = runtime.enter();
273 self.runtime.write().replace(runtime);
274
275 self.start_with_default_executor_internal();
276
277 Ok(true)
278 }
279
280 pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
286 if timeout > Duration::from_secs(60) {
287 warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
288 }
289
290 let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
291 initialized.ok()
292 }
293
294 async fn initialized_async_internal(&self) -> bool {
295 if self.offline || self.daemon_mode {
296 return true;
297 }
298
299 if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
305 let _permit = self.init_notify.acquire().await;
306 }
307 ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
308 }
309
310 pub fn initialized(&self) -> bool {
314 self.offline
315 || self.daemon_mode
316 || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
317 }
318
319 pub fn close(&self) {
323 self.event_processor.close();
324
325 if !self.offline && !self.daemon_mode {
328 if let Err(e) = self.shutdown_broadcast.send(()) {
329 error!("Failed to shutdown client appropriately: {e}");
330 }
331 }
332
333 self.runtime.write().take();
336 }
337
338 pub fn flush(&self) {
346 self.event_processor.flush();
347 }
348
349 pub async fn flush_blocking(&self, timeout: Duration) -> bool {
385 let event_processor = self.event_processor.clone();
386
387 let flush_future =
388 tokio::task::spawn_blocking(move || event_processor.flush_blocking(timeout));
389
390 if timeout == Duration::ZERO {
391 flush_future.await.unwrap_or(false)
393 } else {
394 match tokio::time::timeout(timeout, flush_future).await {
396 Ok(Ok(result)) => result,
397 Ok(Err(_)) => false, Err(_) => false, }
400 }
401 }
402
403 pub fn identify(&self, context: Context) {
408 if self.events_default.disabled {
409 return;
410 }
411
412 self.send_internal(self.events_default.event_factory.new_identify(context));
413 }
414
415 pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
423 let val = self.variation(context, flag_key, default);
424 if let Some(b) = val.as_bool() {
425 b
426 } else {
427 warn!("bool_variation called for a non-bool flag {flag_key:?} (got {val:?})");
428 default
429 }
430 }
431
432 pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
440 let val = self.variation(context, flag_key, default.clone());
441 if let Some(s) = val.as_string() {
442 s
443 } else {
444 warn!("str_variation called for a non-string flag {flag_key:?} (got {val:?})");
445 default
446 }
447 }
448
449 pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
457 let val = self.variation(context, flag_key, default);
458 if let Some(f) = val.as_float() {
459 f
460 } else {
461 warn!("float_variation called for a non-float flag {flag_key:?} (got {val:?})");
462 default
463 }
464 }
465
466 pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
474 let val = self.variation(context, flag_key, default);
475 if let Some(f) = val.as_int() {
476 f
477 } else {
478 warn!("int_variation called for a non-int flag {flag_key:?} (got {val:?})");
479 default
480 }
481 }
482
483 pub fn json_variation(
493 &self,
494 context: &Context,
495 flag_key: &str,
496 default: serde_json::Value,
497 ) -> serde_json::Value {
498 self.variation(context, flag_key, default.clone())
499 .as_json()
500 .unwrap_or(default)
501 }
502
503 pub fn bool_variation_detail(
510 &self,
511 context: &Context,
512 flag_key: &str,
513 default: bool,
514 ) -> Detail<bool> {
515 self.variation_detail(context, flag_key, default).try_map(
516 |val| val.as_bool(),
517 default,
518 eval::Error::WrongType,
519 )
520 }
521
522 pub fn str_variation_detail(
529 &self,
530 context: &Context,
531 flag_key: &str,
532 default: String,
533 ) -> Detail<String> {
534 self.variation_detail(context, flag_key, default.clone())
535 .try_map(|val| val.as_string(), default, eval::Error::WrongType)
536 }
537
538 pub fn float_variation_detail(
545 &self,
546 context: &Context,
547 flag_key: &str,
548 default: f64,
549 ) -> Detail<f64> {
550 self.variation_detail(context, flag_key, default).try_map(
551 |val| val.as_float(),
552 default,
553 eval::Error::WrongType,
554 )
555 }
556
557 pub fn int_variation_detail(
564 &self,
565 context: &Context,
566 flag_key: &str,
567 default: i64,
568 ) -> Detail<i64> {
569 self.variation_detail(context, flag_key, default).try_map(
570 |val| val.as_int(),
571 default,
572 eval::Error::WrongType,
573 )
574 }
575
576 pub fn json_variation_detail(
583 &self,
584 context: &Context,
585 flag_key: &str,
586 default: serde_json::Value,
587 ) -> Detail<serde_json::Value> {
588 self.variation_detail(context, flag_key, default.clone())
589 .try_map(|val| val.as_json(), default, eval::Error::WrongType)
590 }
591
592 #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
593 pub fn secure_mode_hash(&self, context: &Context) -> Result<String, String> {
598 #[cfg(feature = "crypto-aws-lc-rs")]
599 {
600 let key =
601 aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
602 let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
603
604 Ok(data_encoding::HEXLOWER.encode(tag.as_ref()))
605 }
606 #[cfg(feature = "crypto-openssl")]
607 {
608 use openssl::hash::MessageDigest;
609 use openssl::pkey::PKey;
610 use openssl::sign::Signer;
611
612 let key = PKey::hmac(self.sdk_key.as_bytes())
613 .map_err(|e| format!("Failed to create HMAC key: {e}"))?;
614 let mut signer = Signer::new(MessageDigest::sha256(), &key)
615 .map_err(|e| format!("Failed to create signer: {e}"))?;
616 signer
617 .update(context.canonical_key().as_bytes())
618 .map_err(|e| format!("Failed to update signer: {e}"))?;
619 let hmac = signer
620 .sign_to_vec()
621 .map_err(|e| format!("Failed to sign: {e}"))?;
622
623 Ok(data_encoding::HEXLOWER.encode(&hmac))
624 }
625 }
626
627 pub fn all_flags_detail(
638 &self,
639 context: &Context,
640 flag_state_config: FlagDetailConfig,
641 ) -> FlagDetail {
642 if self.offline {
643 warn!(
644 "all_flags_detail() called, but client is in offline mode. Returning empty state"
645 );
646 return FlagDetail::new(false);
647 }
648
649 if !self.initialized() {
650 warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
651 return FlagDetail::new(false);
652 }
653
654 let data_store = self.data_store.read();
655
656 let mut flag_detail = FlagDetail::new(true);
657 flag_detail.populate(&*data_store, context, flag_state_config);
658
659 flag_detail
660 }
661
662 pub fn variation_detail<T: Into<FlagValue> + Clone>(
668 &self,
669 context: &Context,
670 flag_key: &str,
671 default: T,
672 ) -> Detail<FlagValue> {
673 let (detail, _) =
674 self.variation_internal(context, flag_key, default, &self.events_with_reasons);
675 detail
676 }
677
678 pub fn variation<T: Into<FlagValue> + Clone>(
689 &self,
690 context: &Context,
691 flag_key: &str,
692 default: T,
693 ) -> FlagValue {
694 let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
695 detail.value.unwrap()
696 }
697
698 pub fn migration_variation(
703 &self,
704 context: &Context,
705 flag_key: &str,
706 default_stage: Stage,
707 ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
708 let (detail, flag) =
709 self.variation_internal(context, flag_key, default_stage, &self.events_default);
710
711 let migration_detail =
712 detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
713 let tracker = MigrationOpTracker::new(
714 flag_key.into(),
715 flag,
716 context.clone(),
717 migration_detail.clone(),
718 default_stage,
719 );
720
721 (
722 migration_detail.value.unwrap_or(default_stage),
723 Arc::new(Mutex::new(tracker)),
724 )
725 }
726
727 pub fn track_event(&self, context: Context, key: impl Into<String>) {
737 let _ = self.track(context, key, None, serde_json::Value::Null);
738 }
739
740 pub fn track_data(
753 &self,
754 context: Context,
755 key: impl Into<String>,
756 data: impl Serialize,
757 ) -> serde_json::Result<()> {
758 self.track(context, key, None, data)
759 }
760
761 pub fn track_metric(
772 &self,
773 context: Context,
774 key: impl Into<String>,
775 value: f64,
776 data: impl Serialize,
777 ) {
778 let _ = self.track(context, key, Some(value), data);
779 }
780
781 fn track(
782 &self,
783 context: Context,
784 key: impl Into<String>,
785 metric_value: Option<f64>,
786 data: impl Serialize,
787 ) -> serde_json::Result<()> {
788 if !self.events_default.disabled {
789 let event =
790 self.events_default
791 .event_factory
792 .new_custom(context, key, metric_value, data)?;
793
794 self.send_internal(event);
795 }
796
797 Ok(())
798 }
799
800 pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
807 if self.events_default.disabled {
808 return;
809 }
810
811 match tracker.lock() {
812 Ok(tracker) => {
813 let event = tracker.build();
814 match event {
815 Ok(event) => {
816 self.send_internal(
817 self.events_default.event_factory.new_migration_op(event),
818 );
819 }
820 Err(e) => error!("Failed to build migration event, no event will be sent: {e}"),
821 }
822 }
823 Err(e) => error!("Failed to lock migration tracker, no event will be sent: {e}"),
824 }
825 }
826
827 fn variation_internal<T: Into<FlagValue> + Clone>(
828 &self,
829 context: &Context,
830 flag_key: &str,
831 default: T,
832 events_scope: &EventsScope,
833 ) -> (Detail<FlagValue>, Option<eval::Flag>) {
834 if self.offline {
835 return (
836 Detail::err_default(eval::Error::ClientNotReady, default.into()),
837 None,
838 );
839 }
840
841 let (flag, result) = match self.initialized() {
842 false => (
843 None,
844 Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
845 ),
846 true => {
847 let data_store = self.data_store.read();
848 match data_store.flag(flag_key) {
849 Some(flag) => {
850 let result = eval::evaluate(
851 data_store.to_store(),
852 &flag,
853 context,
854 Some(&*events_scope.prerequisite_event_recorder),
855 )
856 .map(|v| v.clone())
857 .or(default.clone().into());
858
859 (Some(flag), result)
860 }
861 None => (
862 None,
863 Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
864 ),
865 }
866 }
867 };
868
869 if !events_scope.disabled {
870 let event = match &flag {
871 Some(f) => events_scope.event_factory.new_eval_event(
872 flag_key,
873 context.clone(),
874 f,
875 result.clone(),
876 default.into(),
877 None,
878 ),
879 None => events_scope.event_factory.new_unknown_flag_event(
880 flag_key,
881 context.clone(),
882 result.clone(),
883 default.into(),
884 ),
885 };
886 self.send_internal(event);
887 }
888
889 (result, flag)
890 }
891
892 fn send_internal(&self, event: InputEvent) {
893 self.event_processor.send(event);
894 }
895}
896
897#[cfg(test)]
898mod tests {
899 use assert_json_diff::assert_json_eq;
900 use crossbeam_channel::Receiver;
901 use eval::{ContextBuilder, MultiContextBuilder};
902 use futures::FutureExt;
903 use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
904 use maplit::hashmap;
905 use std::collections::HashMap;
906 use tokio::time::Instant;
907
908 use crate::data_source::MockDataSource;
909 use crate::data_source_builders::MockDataSourceBuilder;
910 use crate::evaluation::FlagFilter;
911 use crate::events::create_event_sender;
912 use crate::events::event::{OutputEvent, VariationKey};
913 use crate::events::processor_builders::EventProcessorBuilder;
914 use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
915 use crate::stores::store_types::{PatchTarget, StorageItem};
916 use crate::test_common::{
917 self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
918 basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
919 };
920 use crate::test_data::TestData;
921 use crate::{
922 AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
923 PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
924 SerializedItem,
925 };
926 use test_case::test_case;
927
928 use super::*;
929
930 fn is_send_and_sync<T: Send + Sync>() {}
931
932 #[test]
933 fn ensure_client_is_send_and_sync() {
934 is_send_and_sync::<Client>()
935 }
936
937 #[tokio::test]
938 async fn client_asynchronously_initializes_within_timeout() {
939 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
940 client.start_with_default_executor();
941
942 let now = Instant::now();
943 let initialized = client
944 .wait_for_initialization(Duration::from_millis(1500))
945 .await;
946 let elapsed_time = now.elapsed();
947 assert!(elapsed_time.as_millis() > 500);
949 assert_eq!(initialized, Some(true));
950 }
951
952 #[tokio::test]
953 async fn client_asynchronously_initializes_slower_than_timeout() {
954 let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
955 client.start_with_default_executor();
956
957 let now = Instant::now();
958 let initialized = client
959 .wait_for_initialization(Duration::from_millis(500))
960 .await;
961 let elapsed_time = now.elapsed();
962 assert!(elapsed_time.as_millis() < 750);
964 assert!(initialized.is_none());
965 }
966
967 #[tokio::test]
968 async fn client_initializes_immediately_in_offline_mode() {
969 let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
970 client.start_with_default_executor();
971
972 assert!(client.initialized());
973
974 let now = Instant::now();
975 let initialized = client
976 .wait_for_initialization(Duration::from_millis(2000))
977 .await;
978 let elapsed_time = now.elapsed();
979 assert_eq!(initialized, Some(true));
980 assert!(elapsed_time.as_millis() < 500)
981 }
982
983 #[tokio::test]
984 async fn client_initializes_immediately_in_daemon_mode() {
985 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
986 client.start_with_default_executor();
987
988 assert!(client.initialized());
989
990 let now = Instant::now();
991 let initialized = client
992 .wait_for_initialization(Duration::from_millis(2000))
993 .await;
994 let elapsed_time = now.elapsed();
995 assert_eq!(initialized, Some(true));
996 assert!(elapsed_time.as_millis() < 500)
997 }
998
999 #[test_case(basic_flag("myFlag"), false.into(), true.into())]
1000 #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
1001 fn client_updates_changes_evaluation_results(
1002 flag: eval::Flag,
1003 default: FlagValue,
1004 expected: FlagValue,
1005 ) {
1006 let context = ContextBuilder::new("foo")
1007 .build()
1008 .expect("Failed to create context");
1009
1010 let (client, _event_rx) = make_mocked_client();
1011
1012 let result = client.variation_detail(&context, "myFlag", default.clone());
1013 assert_eq!(result.value.unwrap(), default);
1014
1015 client.start_with_default_executor();
1016 client
1017 .data_store
1018 .write()
1019 .upsert(
1020 &flag.key,
1021 PatchTarget::Flag(StorageItem::Item(flag.clone())),
1022 )
1023 .expect("patch should apply");
1024
1025 let result = client.variation_detail(&context, "myFlag", default);
1026 assert_eq!(result.value.unwrap(), expected);
1027 assert!(matches!(
1028 result.reason,
1029 Reason::Fallthrough {
1030 in_experiment: false
1031 }
1032 ));
1033 }
1034
1035 #[test]
1036 fn all_flags_detail_is_invalid_when_offline() {
1037 let (client, _event_rx) = make_mocked_offline_client();
1038 client.start_with_default_executor();
1039
1040 let context = ContextBuilder::new("bob")
1041 .build()
1042 .expect("Failed to create context");
1043
1044 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1045 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1046 }
1047
1048 #[test]
1049 fn all_flags_detail_is_invalid_when_not_initialized() {
1050 let (client, _event_rx) = make_mocked_client();
1051
1052 let context = ContextBuilder::new("bob")
1053 .build()
1054 .expect("Failed to create context");
1055
1056 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1057 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1058 }
1059
1060 #[tokio::test]
1061 async fn all_flags_detail_returns_flag_states() {
1062 let td = TestData::new();
1063 td.use_preconfigured_flag(basic_flag("myFlag1"));
1064 td.use_preconfigured_flag(basic_flag("myFlag2"));
1065 let (client, _event_rx) = make_client_with_test_data(&td);
1066 client.start_with_default_executor();
1067
1068 let context = ContextBuilder::new("bob")
1069 .build()
1070 .expect("Failed to create context");
1071
1072 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1073
1074 client.close();
1075
1076 assert_json_eq!(
1077 all_flags,
1078 json!({
1079 "myFlag1": true,
1080 "myFlag2": true,
1081 "$flagsState": {
1082 "myFlag1": {
1083 "version": 1,
1084 "variation": 1
1085 },
1086 "myFlag2": {
1087 "version": 1,
1088 "variation": 1
1089 },
1090 },
1091 "$valid": true
1092 })
1093 );
1094 }
1095
1096 #[tokio::test]
1097 async fn all_flags_detail_returns_prerequisite_relations() {
1098 let td = TestData::new();
1099 td.use_preconfigured_flag(basic_flag("prereq1"));
1100 td.use_preconfigured_flag(basic_flag("prereq2"));
1101 td.use_preconfigured_flag(basic_flag_with_prereqs_and_visibility(
1102 "toplevel",
1103 &["prereq1", "prereq2"],
1104 false,
1105 false,
1106 ));
1107 let (client, _event_rx) = make_client_with_test_data(&td);
1108 client.start_with_default_executor();
1109
1110 let context = ContextBuilder::new("bob")
1111 .build()
1112 .expect("Failed to create context");
1113
1114 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1115
1116 client.close();
1117
1118 assert_json_eq!(
1119 all_flags,
1120 json!({
1121 "prereq1": true,
1122 "prereq2": true,
1123 "toplevel": true,
1124 "$flagsState": {
1125 "toplevel": {
1126 "version": 1,
1127 "variation": 1,
1128 "prerequisites": ["prereq1", "prereq2"]
1129 },
1130 "prereq1": {
1131 "version": 1,
1132 "variation": 1
1133 },
1134 "prereq2": {
1135 "version": 1,
1136 "variation": 1
1137 },
1138 },
1139 "$valid": true
1140 })
1141 );
1142 }
1143
1144 #[tokio::test]
1145 async fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1146 let td = TestData::new();
1147 td.use_preconfigured_flag(basic_flag_with_visibility("prereq1", false, false));
1148 td.use_preconfigured_flag(basic_flag_with_visibility("prereq2", false, false));
1149 td.use_preconfigured_flag(basic_flag_with_prereqs_and_visibility(
1150 "toplevel",
1151 &["prereq1", "prereq2"],
1152 true,
1153 false,
1154 ));
1155 let (client, _event_rx) = make_client_with_test_data(&td);
1156 client.start_with_default_executor();
1157
1158 let context = ContextBuilder::new("bob")
1159 .build()
1160 .expect("Failed to create context");
1161
1162 let mut config = FlagDetailConfig::new();
1163 config.flag_filter(FlagFilter::CLIENT);
1164
1165 let all_flags = client.all_flags_detail(&context, config);
1166
1167 client.close();
1168
1169 assert_json_eq!(
1170 all_flags,
1171 json!({
1172 "toplevel": true,
1173 "$flagsState": {
1174 "toplevel": {
1175 "version": 1,
1176 "variation": 1,
1177 "prerequisites": ["prereq1", "prereq2"]
1178 },
1179 },
1180 "$valid": true
1181 })
1182 );
1183 }
1184
1185 #[tokio::test]
1186 async fn variation_tracks_events_correctly() {
1187 let td = TestData::new();
1188 td.use_preconfigured_flag(basic_flag("myFlag"));
1189 let (client, event_rx) = make_client_with_test_data(&td);
1190 client.start_with_default_executor();
1191
1192 let context = ContextBuilder::new("bob")
1193 .build()
1194 .expect("Failed to create context");
1195
1196 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1197
1198 assert!(flag_value.as_bool().unwrap());
1199 client.flush();
1200 client.close();
1201
1202 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1203 assert_eq!(events.len(), 2);
1204 assert_eq!(events[0].kind(), "index");
1205 assert_eq!(events[1].kind(), "summary");
1206
1207 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1208 let variation_key = VariationKey {
1209 version: Some(1),
1210 variation: Some(1),
1211 };
1212 let feature = event_summary.features.get("myFlag");
1213 assert!(feature.is_some());
1214
1215 let feature = feature.unwrap();
1216 assert!(feature.counters.contains_key(&variation_key));
1217 } else {
1218 panic!("Event should be a summary type");
1219 }
1220 }
1221
1222 #[test]
1223 fn variation_handles_offline_mode() {
1224 let (client, event_rx) = make_mocked_offline_client();
1225 client.start_with_default_executor();
1226
1227 let context = ContextBuilder::new("bob")
1228 .build()
1229 .expect("Failed to create context");
1230 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1231
1232 assert!(!flag_value.as_bool().unwrap());
1233 client.flush();
1234 client.close();
1235
1236 assert_eq!(event_rx.iter().count(), 0);
1237 }
1238
1239 #[test]
1240 fn variation_handles_unknown_flags() {
1241 let (client, event_rx) = make_mocked_client();
1242 client.start_with_default_executor();
1243 let context = ContextBuilder::new("bob")
1244 .build()
1245 .expect("Failed to create context");
1246
1247 let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1248
1249 assert!(!flag_value.as_bool().unwrap());
1250 client.flush();
1251 client.close();
1252
1253 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1254 assert_eq!(events.len(), 2);
1255 assert_eq!(events[0].kind(), "index");
1256 assert_eq!(events[1].kind(), "summary");
1257
1258 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1259 let variation_key = VariationKey {
1260 version: None,
1261 variation: None,
1262 };
1263
1264 let feature = event_summary.features.get("non-existent-flag");
1265 assert!(feature.is_some());
1266
1267 let feature = feature.unwrap();
1268 assert!(feature.counters.contains_key(&variation_key));
1269 } else {
1270 panic!("Event should be a summary type");
1271 }
1272 }
1273
1274 #[tokio::test]
1275 async fn variation_detail_handles_debug_events_correctly() {
1276 let td = TestData::new();
1277 let mut flag = basic_flag("myFlag");
1278 flag.debug_events_until_date = Some(64_060_606_800_000); td.use_preconfigured_flag(flag);
1280 let (client, event_rx) = make_client_with_test_data(&td);
1281 client.start_with_default_executor();
1282
1283 let context = ContextBuilder::new("bob")
1284 .build()
1285 .expect("Failed to create context");
1286
1287 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1288
1289 assert!(detail.value.unwrap().as_bool().unwrap());
1290 assert!(matches!(
1291 detail.reason,
1292 Reason::Fallthrough {
1293 in_experiment: false
1294 }
1295 ));
1296 client.flush();
1297 client.close();
1298
1299 let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1300 assert_eq!(events.len(), 3);
1301 assert_eq!(events[0].kind(), "index");
1302 assert_eq!(events[1].kind(), "debug");
1303 assert_eq!(events[2].kind(), "summary");
1304
1305 if let OutputEvent::Summary(event_summary) = events[2].clone() {
1306 let variation_key = VariationKey {
1307 version: Some(1),
1308 variation: Some(1),
1309 };
1310
1311 let feature = event_summary.features.get("myFlag");
1312 assert!(feature.is_some());
1313
1314 let feature = feature.unwrap();
1315 assert!(feature.counters.contains_key(&variation_key));
1316 } else {
1317 panic!("Event should be a summary type");
1318 }
1319 }
1320
1321 #[tokio::test]
1322 async fn variation_detail_tracks_events_correctly() {
1323 let td = TestData::new();
1324 td.use_preconfigured_flag(basic_flag("myFlag"));
1325 let (client, event_rx) = make_client_with_test_data(&td);
1326 client.start_with_default_executor();
1327
1328 let context = ContextBuilder::new("bob")
1329 .build()
1330 .expect("Failed to create context");
1331
1332 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1333
1334 assert!(detail.value.unwrap().as_bool().unwrap());
1335 assert!(matches!(
1336 detail.reason,
1337 Reason::Fallthrough {
1338 in_experiment: false
1339 }
1340 ));
1341 client.flush();
1342 client.close();
1343
1344 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1345 assert_eq!(events.len(), 2);
1346 assert_eq!(events[0].kind(), "index");
1347 assert_eq!(events[1].kind(), "summary");
1348
1349 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1350 let variation_key = VariationKey {
1351 version: Some(1),
1352 variation: Some(1),
1353 };
1354
1355 let feature = event_summary.features.get("myFlag");
1356 assert!(feature.is_some());
1357
1358 let feature = feature.unwrap();
1359 assert!(feature.counters.contains_key(&variation_key));
1360 } else {
1361 panic!("Event should be a summary type");
1362 }
1363 }
1364
// With an offline client, `variation_detail` hands back the supplied default
// (`false`) with a `ClientNotReady` error reason, and no events are emitted.
#[test]
fn variation_detail_handles_offline_mode() {
    let (client, event_rx) = make_mocked_offline_client();
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let outcome = client.variation_detail(&bob, "myFlag", FlagValue::Bool(false));

    let evaluated = outcome.value.unwrap().as_bool().unwrap();
    assert!(!evaluated);
    assert!(matches!(
        outcome.reason,
        Reason::Error {
            error: eval::Error::ClientNotReady
        }
    ));

    client.flush();
    client.close();

    let emitted_count = event_rx.iter().count();
    assert_eq!(emitted_count, 0);
}
1388
1389 struct InMemoryPersistentDataStoreFactory {
1390 data: AllData<Flag, Segment>,
1391 initialized: bool,
1392 }
1393
1394 impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1395 fn create_persistent_data_store(
1396 &self,
1397 ) -> Result<Box<dyn PersistentDataStore + 'static>, std::io::Error> {
1398 let serialized_data =
1399 AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1400 Ok(Box::new(InMemoryPersistentDataStore {
1401 data: serialized_data,
1402 initialized: self.initialized,
1403 }))
1404 }
1405 }
1406
// Daemon mode backed by an initialized persistent store: the flag held in the
// store is evaluated normally and exactly one log line (the daemon-mode
// startup message) is captured.
#[test]
fn variation_detail_handles_daemon_mode() {
    testing_logger::setup();

    let seeded = AllData {
        flags: hashmap!["flag".into() => basic_flag("flag")],
        segments: HashMap::new(),
    };
    let factory = InMemoryPersistentDataStoreFactory {
        data: seeded,
        initialized: true,
    };
    let store_builder = PersistentDataStoreBuilder::new(Arc::new(factory));

    let config = ConfigBuilder::new("sdk-key")
        .daemon_mode(true)
        .data_store(&store_builder)
        .event_processor(&NullEventProcessorBuilder::new())
        .build()
        .expect("config should build");

    let client = Client::build(config).expect("Should be built.");
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let outcome = client.variation_detail(&bob, "flag", FlagValue::Bool(false));

    let evaluated = outcome.value.unwrap().as_bool().unwrap();
    assert!(evaluated);
    assert!(matches!(
        outcome.reason,
        Reason::Fallthrough {
            in_experiment: false
        }
    ));

    client.flush();
    client.close();

    testing_logger::validate(|logs| {
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "Started LaunchDarkly Client in daemon mode");
    });
}
1454
// Daemon mode backed by an empty, uninitialized persistent store: evaluating
// a flag must not add log output beyond the single daemon-mode startup line.
#[test]
fn daemon_mode_is_quiet_if_store_is_not_initialized() {
    testing_logger::setup();

    let empty = AllData {
        flags: HashMap::new(),
        segments: HashMap::new(),
    };
    let factory = InMemoryPersistentDataStoreFactory {
        data: empty,
        initialized: false,
    };
    let store_builder = PersistentDataStoreBuilder::new(Arc::new(factory));

    let config = ConfigBuilder::new("sdk-key")
        .daemon_mode(true)
        .data_store(&store_builder)
        .event_processor(&NullEventProcessorBuilder::new())
        .build()
        .expect("config should build");

    let client = Client::build(config).expect("Should be built.");
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    // Only the logging side effect matters here; the evaluation result is ignored.
    client.variation_detail(&bob, "flag", FlagValue::Bool(false));

    testing_logger::validate(|logs| {
        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0].body, "Started LaunchDarkly Client in daemon mode");
    });
}
1493
1494 #[tokio::test]
1495 async fn variation_handles_off_flag_without_variation() {
1496 let td = TestData::new();
1497 td.use_preconfigured_flag(basic_off_flag("myFlag"));
1498 let (client, event_rx) = make_client_with_test_data(&td);
1499 client.start_with_default_executor();
1500
1501 let context = ContextBuilder::new("bob")
1502 .build()
1503 .expect("Failed to create context");
1504
1505 let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1506
1507 assert!(!result.as_bool().unwrap());
1508 client.flush();
1509 client.close();
1510
1511 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1512 assert_eq!(events.len(), 2);
1513 assert_eq!(events[0].kind(), "index");
1514 assert_eq!(events[1].kind(), "summary");
1515
1516 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1517 let variation_key = VariationKey {
1518 version: Some(1),
1519 variation: None,
1520 };
1521 let feature = event_summary.features.get("myFlag");
1522 assert!(feature.is_some());
1523
1524 let feature = feature.unwrap();
1525 assert!(feature.counters.contains_key(&variation_key));
1526 } else {
1527 panic!("Event should be a summary type");
1528 }
1529 }
1530
1531 #[tokio::test]
1532 async fn variation_detail_tracks_prereq_events_correctly() {
1533 let td = TestData::new();
1534 let mut prereq_flag = basic_flag("prereqFlag");
1535 prereq_flag.track_events = true;
1536 td.use_preconfigured_flag(prereq_flag);
1537
1538 let mut main_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1539 main_flag.track_events = true;
1540 td.use_preconfigured_flag(main_flag);
1541
1542 let (client, event_rx) = make_client_with_test_data(&td);
1543 client.start_with_default_executor();
1544
1545 let context = ContextBuilder::new("bob")
1546 .build()
1547 .expect("Failed to create context");
1548
1549 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1550
1551 assert!(detail.value.unwrap().as_bool().unwrap());
1552 assert!(matches!(
1553 detail.reason,
1554 Reason::Fallthrough {
1555 in_experiment: false
1556 }
1557 ));
1558 client.flush();
1559 client.close();
1560
1561 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1562 assert_eq!(events.len(), 4);
1563 assert_eq!(events[0].kind(), "index");
1564 assert_eq!(events[1].kind(), "feature");
1565 assert_eq!(events[2].kind(), "feature");
1566 assert_eq!(events[3].kind(), "summary");
1567
1568 if let OutputEvent::Summary(event_summary) = events[3].clone() {
1569 let variation_key = VariationKey {
1570 version: Some(1),
1571 variation: Some(1),
1572 };
1573 let feature = event_summary.features.get("myFlag");
1574 assert!(feature.is_some());
1575
1576 let feature = feature.unwrap();
1577 assert!(feature.counters.contains_key(&variation_key));
1578
1579 let variation_key = VariationKey {
1580 version: Some(1),
1581 variation: Some(1),
1582 };
1583 let feature = event_summary.features.get("prereqFlag");
1584 assert!(feature.is_some());
1585
1586 let feature = feature.unwrap();
1587 assert!(feature.counters.contains_key(&variation_key));
1588 }
1589 }
1590
1591 #[tokio::test]
1592 async fn variation_handles_failed_prereqs_correctly() {
1593 let td = TestData::new();
1594 let mut prereq_flag = basic_off_flag("prereqFlag");
1595 prereq_flag.track_events = true;
1596 td.use_preconfigured_flag(prereq_flag);
1597
1598 let mut main_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1599 main_flag.track_events = true;
1600 td.use_preconfigured_flag(main_flag);
1601
1602 let (client, event_rx) = make_client_with_test_data(&td);
1603 client.start_with_default_executor();
1604
1605 let context = ContextBuilder::new("bob")
1606 .build()
1607 .expect("Failed to create context");
1608
1609 let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1610
1611 assert!(!detail.as_bool().unwrap());
1612 client.flush();
1613 client.close();
1614
1615 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1616 assert_eq!(events.len(), 4);
1617 assert_eq!(events[0].kind(), "index");
1618 assert_eq!(events[1].kind(), "feature");
1619 assert_eq!(events[2].kind(), "feature");
1620 assert_eq!(events[3].kind(), "summary");
1621
1622 if let OutputEvent::Summary(event_summary) = events[3].clone() {
1623 let variation_key = VariationKey {
1624 version: Some(1),
1625 variation: Some(0),
1626 };
1627 let feature = event_summary.features.get("myFlag");
1628 assert!(feature.is_some());
1629
1630 let feature = feature.unwrap();
1631 assert!(feature.counters.contains_key(&variation_key));
1632
1633 let variation_key = VariationKey {
1634 version: Some(1),
1635 variation: None,
1636 };
1637 let feature = event_summary.features.get("prereqFlag");
1638 assert!(feature.is_some());
1639
1640 let feature = feature.unwrap();
1641 assert!(feature.counters.contains_key(&variation_key));
1642 }
1643 }
1644
// Asking for a flag key the client does not know: the default (`false`) comes
// back with a `FlagNotFound` error reason, and the summary counts the
// evaluation with neither a version nor a variation.
#[test]
fn variation_detail_handles_flag_not_found() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    let outcome = client.variation_detail(&bob, "non-existent-flag", FlagValue::Bool(false));

    let evaluated = outcome.value.unwrap().as_bool().unwrap();
    assert!(!evaluated);
    assert!(matches!(
        outcome.reason,
        Reason::Error {
            error: eval::Error::FlagNotFound
        }
    ));

    client.flush();
    client.close();

    let emitted: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(emitted.len(), 2);
    assert_eq!(emitted[0].kind(), "index");
    assert_eq!(emitted[1].kind(), "summary");

    match &emitted[1] {
        OutputEvent::Summary(summary) => {
            let expected_key = VariationKey {
                version: None,
                variation: None,
            };

            let feature = summary.features.get("non-existent-flag");
            assert!(feature.is_some());

            let counters = &feature.unwrap().counters;
            assert!(counters.contains_key(&expected_key));
        }
        _ => panic!("Event should be a summary type"),
    }
}
1684
1685 #[tokio::test]
1686 async fn variation_detail_handles_client_not_ready() {
1687 let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1688 client.start_with_default_executor();
1689 let context = ContextBuilder::new("bob")
1690 .build()
1691 .expect("Failed to create context");
1692
1693 let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1694
1695 assert!(!detail.value.unwrap().as_bool().unwrap());
1696 assert!(matches!(
1697 detail.reason,
1698 Reason::Error {
1699 error: eval::Error::ClientNotReady
1700 }
1701 ));
1702 client.flush();
1703 client.close();
1704
1705 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1706 assert_eq!(events.len(), 2);
1707 assert_eq!(events[0].kind(), "index");
1708 assert_eq!(events[1].kind(), "summary");
1709
1710 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1711 let variation_key = VariationKey {
1712 version: None,
1713 variation: None,
1714 };
1715 let feature = event_summary.features.get("non-existent-flag");
1716 assert!(feature.is_some());
1717
1718 let feature = feature.unwrap();
1719 assert!(feature.counters.contains_key(&variation_key));
1720 } else {
1721 panic!("Event should be a summary type");
1722 }
1723 }
1724
// `identify` results in exactly one emitted event, of kind "identify".
#[test]
fn identify_sends_identify_event() {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    client.identify(bob);

    client.flush();
    client.close();

    let emitted: Vec<OutputEvent> = event_rx.iter().collect();
    assert_eq!(emitted.len(), 1);
    assert_eq!(emitted[0].kind(), "identify");
}
1742
// `identify` on an offline client emits no events at all.
#[test]
fn identify_sends_sends_nothing_in_offline_mode() {
    let (client, event_rx) = make_mocked_offline_client();
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");
    client.identify(bob);

    client.flush();
    client.close();

    let emitted_count = event_rx.iter().count();
    assert_eq!(emitted_count, 0);
}
1758
// The secure-mode hash of a single-kind context, computed by an offline client
// built with SDK key "secret", must equal a fixed known digest.
#[test]
#[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
fn secure_mode_hash() {
    let offline_config = ConfigBuilder::new("secret")
        .offline(true)
        .build()
        .expect("config should build");
    let client = Client::build(offline_config).expect("Should be built.");

    let context = ContextBuilder::new("Message")
        .build()
        .expect("Failed to create context");

    let digest = client
        .secure_mode_hash(&context)
        .expect("Hash should be computed");

    assert_eq!(
        digest,
        "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
    );
}
1778
// The secure-mode hash of a multi-kind context (an "org" context plus a
// default-kind context) must equal a fixed known digest.
#[test]
#[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
fn secure_mode_hash_with_multi_kind() {
    let offline_config = ConfigBuilder::new("secret")
        .offline(true)
        .build()
        .expect("config should build");
    let client = Client::build(offline_config).expect("Should be built.");

    let org_context = ContextBuilder::new("org-key|1")
        .kind("org")
        .build()
        .expect("Failed to create context");
    let user_context = ContextBuilder::new("user-key:2")
        .build()
        .expect("Failed to create context");

    let multi_context = MultiContextBuilder::new()
        .add_context(org_context)
        .add_context(user_context)
        .build()
        .expect("failed to build multi-context");

    let digest = client
        .secure_mode_hash(&multi_context)
        .expect("Hash should be computed");

    assert_eq!(
        digest,
        "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
    );
}
1809
1810 #[derive(Serialize)]
1811 struct MyCustomData {
1812 pub answer: u32,
1813 }
1814
// Sends five custom events (no data, string data, JSON data, struct data and
// a metric) for one context and expects six emitted events in total: one
// index event and five custom events.
#[test]
fn track_sends_track_and_index_events() -> serde_json::Result<()> {
    let (client, event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let context = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    client.track_event(context.clone(), "event-with-null");
    client.track_data(context.clone(), "event-with-string", "string-data")?;
    client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
    client.track_data(
        context.clone(),
        "event-with-struct",
        MyCustomData { answer: 42 },
    )?;
    client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);

    client.flush();
    client.close();

    let events = event_rx.iter().collect::<Vec<OutputEvent>>();
    assert_eq!(events.len(), 6);

    // Tally events per kind with the entry API (a single lookup per event).
    let mut events_by_type: HashMap<&str, usize> = HashMap::new();
    for event in &events {
        *events_by_type.entry(event.kind()).or_insert(0) += 1;
    }

    // `assert_eq!` reports the actual counts when the expectation fails.
    assert_eq!(events_by_type.get("index"), Some(&1));
    assert_eq!(events_by_type.get("custom"), Some(&5));

    Ok(())
}
1853
// The same five tracking calls as the online test, made against an offline
// client, must emit no events.
#[test]
fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
    let (client, event_rx) = make_mocked_offline_client();
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    client.track_event(bob.clone(), "event-with-null");
    client.track_data(bob.clone(), "event-with-string", "string-data")?;
    client.track_data(bob.clone(), "event-with-json", json!({"answer": 42}))?;
    let custom_payload = MyCustomData { answer: 42 };
    client.track_data(bob.clone(), "event-with-struct", custom_payload)?;
    client.track_metric(bob, "event-with-metric", 42.0, serde_json::Value::Null);

    client.flush();
    client.close();

    let emitted_count = event_rx.iter().count();
    assert_eq!(emitted_count, 0);

    Ok(())
}
1880
// `migration_variation` for an unknown flag key returns the supplied default stage.
#[test]
fn migration_handles_flag_not_found() {
    let (client, _event_rx) = make_mocked_client();
    client.start_with_default_executor();

    let bob = ContextBuilder::new("bob")
        .build()
        .expect("Failed to create context");

    let default_stage = Stage::Off;
    let (resolved_stage, _tracker) =
        client.migration_variation(&bob, "non-existent-flag-key", default_stage);

    assert_eq!(resolved_stage, Stage::Off);
}
1895
1896 #[tokio::test]
1897 async fn migration_uses_non_migration_flag() {
1898 let td = TestData::new();
1899 td.use_preconfigured_flag(basic_flag("boolean-flag"));
1900 let (client, _event_rx) = make_client_with_test_data(&td);
1901 client.start_with_default_executor();
1902
1903 let context = ContextBuilder::new("bob")
1904 .build()
1905 .expect("Failed to create context");
1906
1907 let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1908
1909 assert_eq!(stage, Stage::Off);
1910 }
1911
1912 #[test_case(Stage::Off)]
1913 #[test_case(Stage::DualWrite)]
1914 #[test_case(Stage::Shadow)]
1915 #[test_case(Stage::Live)]
1916 #[test_case(Stage::Rampdown)]
1917 #[test_case(Stage::Complete)]
1918 #[tokio::test]
1919 async fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1920 let td = TestData::new();
1921 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
1922 let (client, _event_rx) = make_client_with_test_data(&td);
1923 client.start_with_default_executor();
1924
1925 let context = ContextBuilder::new("bob")
1926 .build()
1927 .expect("Failed to create context");
1928
1929 let (evaluated_stage, _tracker) =
1930 client.migration_variation(&context, "stage-flag", Stage::Off);
1931
1932 assert_eq!(evaluated_stage, stage);
1933 }
1934
1935 #[tokio::test]
1936 async fn migration_tracks_invoked_correctly() {
1937 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
1938 .await;
1939 migration_tracks_invoked_correctly_driver(
1940 Stage::DualWrite,
1941 Operation::Read,
1942 vec![Origin::Old],
1943 )
1944 .await;
1945 migration_tracks_invoked_correctly_driver(
1946 Stage::Shadow,
1947 Operation::Read,
1948 vec![Origin::Old, Origin::New],
1949 )
1950 .await;
1951 migration_tracks_invoked_correctly_driver(
1952 Stage::Live,
1953 Operation::Read,
1954 vec![Origin::Old, Origin::New],
1955 )
1956 .await;
1957 migration_tracks_invoked_correctly_driver(
1958 Stage::Rampdown,
1959 Operation::Read,
1960 vec![Origin::New],
1961 )
1962 .await;
1963 migration_tracks_invoked_correctly_driver(
1964 Stage::Complete,
1965 Operation::Read,
1966 vec![Origin::New],
1967 )
1968 .await;
1969 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
1970 .await;
1971 migration_tracks_invoked_correctly_driver(
1972 Stage::DualWrite,
1973 Operation::Write,
1974 vec![Origin::Old, Origin::New],
1975 )
1976 .await;
1977 migration_tracks_invoked_correctly_driver(
1978 Stage::Shadow,
1979 Operation::Write,
1980 vec![Origin::Old, Origin::New],
1981 )
1982 .await;
1983 migration_tracks_invoked_correctly_driver(
1984 Stage::Live,
1985 Operation::Write,
1986 vec![Origin::Old, Origin::New],
1987 )
1988 .await;
1989 migration_tracks_invoked_correctly_driver(
1990 Stage::Rampdown,
1991 Operation::Write,
1992 vec![Origin::Old, Origin::New],
1993 )
1994 .await;
1995 migration_tracks_invoked_correctly_driver(
1996 Stage::Complete,
1997 Operation::Write,
1998 vec![Origin::New],
1999 )
2000 .await;
2001 }
2002
2003 async fn migration_tracks_invoked_correctly_driver(
2004 stage: Stage,
2005 operation: Operation,
2006 origins: Vec<Origin>,
2007 ) {
2008 let td = TestData::new();
2009 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2010 let (client, event_rx) = make_client_with_test_data(&td);
2011 let client = Arc::new(client);
2012 client.start_with_default_executor();
2013
2014 let mut migrator = MigratorBuilder::new(client.clone())
2015 .read(
2016 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2017 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2018 Some(|_, _| true),
2019 )
2020 .write(
2021 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2022 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2023 )
2024 .build()
2025 .expect("migrator should build");
2026
2027 let context = ContextBuilder::new("bob")
2028 .build()
2029 .expect("Failed to create context");
2030
2031 if let Operation::Read = operation {
2032 migrator
2033 .read(
2034 &context,
2035 "stage-flag".into(),
2036 Stage::Off,
2037 serde_json::Value::Null,
2038 )
2039 .await;
2040 } else {
2041 migrator
2042 .write(
2043 &context,
2044 "stage-flag".into(),
2045 Stage::Off,
2046 serde_json::Value::Null,
2047 )
2048 .await;
2049 }
2050
2051 client.flush();
2052 client.close();
2053
2054 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2055 assert_eq!(events.len(), 3);
2056 match &events[1] {
2057 OutputEvent::MigrationOp(event) => {
2058 assert!(event.invoked.len() == origins.len());
2059 assert!(event.invoked.iter().all(|i| origins.contains(i)));
2060 }
2061 _ => panic!("Expected migration event"),
2062 }
2063 }
2064
2065 #[tokio::test]
2066 async fn migration_tracks_latency() {
2067 migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2068 migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2069 migration_tracks_latency_driver(
2070 Stage::Shadow,
2071 Operation::Read,
2072 vec![Origin::Old, Origin::New],
2073 )
2074 .await;
2075 migration_tracks_latency_driver(
2076 Stage::Live,
2077 Operation::Read,
2078 vec![Origin::Old, Origin::New],
2079 )
2080 .await;
2081 migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2082 migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2083 migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2084 migration_tracks_latency_driver(
2085 Stage::DualWrite,
2086 Operation::Write,
2087 vec![Origin::Old, Origin::New],
2088 )
2089 .await;
2090 migration_tracks_latency_driver(
2091 Stage::Shadow,
2092 Operation::Write,
2093 vec![Origin::Old, Origin::New],
2094 )
2095 .await;
2096 migration_tracks_latency_driver(
2097 Stage::Live,
2098 Operation::Write,
2099 vec![Origin::Old, Origin::New],
2100 )
2101 .await;
2102 migration_tracks_latency_driver(
2103 Stage::Rampdown,
2104 Operation::Write,
2105 vec![Origin::Old, Origin::New],
2106 )
2107 .await;
2108 migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2109 }
2110
2111 async fn migration_tracks_latency_driver(
2112 stage: Stage,
2113 operation: Operation,
2114 origins: Vec<Origin>,
2115 ) {
2116 let td = TestData::new();
2117 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2118 let (client, event_rx) = make_client_with_test_data(&td);
2119 let client = Arc::new(client);
2120 client.start_with_default_executor();
2121
2122 let mut migrator = MigratorBuilder::new(client.clone())
2123 .track_latency(true)
2124 .read(
2125 |_| {
2126 async move {
2127 async_std::task::sleep(Duration::from_millis(100)).await;
2128 Ok(serde_json::Value::Null)
2129 }
2130 .boxed()
2131 },
2132 |_| {
2133 async move {
2134 async_std::task::sleep(Duration::from_millis(100)).await;
2135 Ok(serde_json::Value::Null)
2136 }
2137 .boxed()
2138 },
2139 Some(|_, _| true),
2140 )
2141 .write(
2142 |_| {
2143 async move {
2144 async_std::task::sleep(Duration::from_millis(100)).await;
2145 Ok(serde_json::Value::Null)
2146 }
2147 .boxed()
2148 },
2149 |_| {
2150 async move {
2151 async_std::task::sleep(Duration::from_millis(100)).await;
2152 Ok(serde_json::Value::Null)
2153 }
2154 .boxed()
2155 },
2156 )
2157 .build()
2158 .expect("migrator should build");
2159
2160 let context = ContextBuilder::new("bob")
2161 .build()
2162 .expect("Failed to create context");
2163
2164 if let Operation::Read = operation {
2165 migrator
2166 .read(
2167 &context,
2168 "stage-flag".into(),
2169 Stage::Off,
2170 serde_json::Value::Null,
2171 )
2172 .await;
2173 } else {
2174 migrator
2175 .write(
2176 &context,
2177 "stage-flag".into(),
2178 Stage::Off,
2179 serde_json::Value::Null,
2180 )
2181 .await;
2182 }
2183
2184 client.flush();
2185 client.close();
2186
2187 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2188 assert_eq!(events.len(), 3);
2189 match &events[1] {
2190 OutputEvent::MigrationOp(event) => {
2191 assert!(event.latency.len() == origins.len());
2192 assert!(event
2193 .latency
2194 .values()
2195 .all(|l| l > &Duration::from_millis(100)));
2196 }
2197 _ => panic!("Expected migration event"),
2198 }
2199 }
2200
2201 #[tokio::test]
2202 async fn migration_tracks_read_errors() {
2203 migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2204 migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2205 migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2206 migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2207 migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2208 migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2209 }
2210
2211 async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2212 let td = TestData::new();
2213 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2214 let (client, event_rx) = make_client_with_test_data(&td);
2215 let client = Arc::new(client);
2216 client.start_with_default_executor();
2217
2218 let mut migrator = MigratorBuilder::new(client.clone())
2219 .track_latency(true)
2220 .read(
2221 |_| async move { Err("fail".into()) }.boxed(),
2222 |_| async move { Err("fail".into()) }.boxed(),
2223 Some(|_: &String, _: &String| true),
2224 )
2225 .write(
2226 |_| async move { Err("fail".into()) }.boxed(),
2227 |_| async move { Err("fail".into()) }.boxed(),
2228 )
2229 .build()
2230 .expect("migrator should build");
2231
2232 let context = ContextBuilder::new("bob")
2233 .build()
2234 .expect("Failed to create context");
2235
2236 migrator
2237 .read(
2238 &context,
2239 "stage-flag".into(),
2240 Stage::Off,
2241 serde_json::Value::Null,
2242 )
2243 .await;
2244 client.flush();
2245 client.close();
2246
2247 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2248 assert_eq!(events.len(), 3);
2249 match &events[1] {
2250 OutputEvent::MigrationOp(event) => {
2251 assert!(event.errors.len() == origins.len());
2252 assert!(event.errors.iter().all(|i| origins.contains(i)));
2253 }
2254 _ => panic!("Expected migration event"),
2255 }
2256 }
2257
2258 #[tokio::test]
2259 async fn migration_tracks_authoritative_write_errors() {
2260 migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2261 migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2262 .await;
2263 migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2264 migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2265 migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2266 .await;
2267 migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2268 .await;
2269 }
2270
2271 async fn migration_tracks_authoritative_write_errors_driver(
2272 stage: Stage,
2273 origins: Vec<Origin>,
2274 ) {
2275 let td = TestData::new();
2276 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2277 let (client, event_rx) = make_client_with_test_data(&td);
2278 let client = Arc::new(client);
2279 client.start_with_default_executor();
2280
2281 let mut migrator = MigratorBuilder::new(client.clone())
2282 .track_latency(true)
2283 .read(
2284 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2285 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2286 None,
2287 )
2288 .write(
2289 |_| async move { Err("fail".into()) }.boxed(),
2290 |_| async move { Err("fail".into()) }.boxed(),
2291 )
2292 .build()
2293 .expect("migrator should build");
2294
2295 let context = ContextBuilder::new("bob")
2296 .build()
2297 .expect("Failed to create context");
2298
2299 migrator
2300 .write(
2301 &context,
2302 "stage-flag".into(),
2303 Stage::Off,
2304 serde_json::Value::Null,
2305 )
2306 .await;
2307
2308 client.flush();
2309 client.close();
2310
2311 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2312 assert_eq!(events.len(), 3);
2313 match &events[1] {
2314 OutputEvent::MigrationOp(event) => {
2315 assert!(event.errors.len() == origins.len());
2316 assert!(event.errors.iter().all(|i| origins.contains(i)));
2317 }
2318 _ => panic!("Expected migration event"),
2319 }
2320 }
2321
2322 #[tokio::test]
2323 async fn migration_tracks_nonauthoritative_write_errors() {
2324 migration_tracks_nonauthoritative_write_errors_driver(
2325 Stage::DualWrite,
2326 false,
2327 true,
2328 vec![Origin::New],
2329 )
2330 .await;
2331 migration_tracks_nonauthoritative_write_errors_driver(
2332 Stage::Shadow,
2333 false,
2334 true,
2335 vec![Origin::New],
2336 )
2337 .await;
2338 migration_tracks_nonauthoritative_write_errors_driver(
2339 Stage::Live,
2340 true,
2341 false,
2342 vec![Origin::Old],
2343 )
2344 .await;
2345 migration_tracks_nonauthoritative_write_errors_driver(
2346 Stage::Rampdown,
2347 true,
2348 false,
2349 vec![Origin::Old],
2350 )
2351 .await;
2352 }
2353
2354 async fn migration_tracks_nonauthoritative_write_errors_driver(
2355 stage: Stage,
2356 fail_old: bool,
2357 fail_new: bool,
2358 origins: Vec<Origin>,
2359 ) {
2360 let td = TestData::new();
2361 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2362 let (client, event_rx) = make_client_with_test_data(&td);
2363 let client = Arc::new(client);
2364 client.start_with_default_executor();
2365
2366 let mut migrator = MigratorBuilder::new(client.clone())
2367 .track_latency(true)
2368 .read(
2369 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2370 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2371 None,
2372 )
2373 .write(
2374 move |_| {
2375 async move {
2376 if fail_old {
2377 Err("fail".into())
2378 } else {
2379 Ok(serde_json::Value::Null)
2380 }
2381 }
2382 .boxed()
2383 },
2384 move |_| {
2385 async move {
2386 if fail_new {
2387 Err("fail".into())
2388 } else {
2389 Ok(serde_json::Value::Null)
2390 }
2391 }
2392 .boxed()
2393 },
2394 )
2395 .build()
2396 .expect("migrator should build");
2397
2398 let context = ContextBuilder::new("bob")
2399 .build()
2400 .expect("Failed to create context");
2401
2402 migrator
2403 .write(
2404 &context,
2405 "stage-flag".into(),
2406 Stage::Off,
2407 serde_json::Value::Null,
2408 )
2409 .await;
2410
2411 client.flush();
2412 client.close();
2413
2414 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2415 assert_eq!(events.len(), 3);
2416 match &events[1] {
2417 OutputEvent::MigrationOp(event) => {
2418 assert!(event.errors.len() == origins.len());
2419 assert!(event.errors.iter().all(|i| origins.contains(i)));
2420 }
2421 _ => panic!("Expected migration event"),
2422 }
2423 }
2424
2425 #[tokio::test]
2426 async fn migration_tracks_consistency() {
2427 migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2428 migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2429 migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2430 migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2431 }
2432
2433 async fn migration_tracks_consistency_driver(
2434 stage: Stage,
2435 old_return: &'static str,
2436 new_return: &'static str,
2437 expected_consistency: bool,
2438 ) {
2439 let td = TestData::new();
2440 td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2441 let (client, event_rx) = make_client_with_test_data(&td);
2442 let client = Arc::new(client);
2443 client.start_with_default_executor();
2444
2445 let mut migrator = MigratorBuilder::new(client.clone())
2446 .track_latency(true)
2447 .read(
2448 |_| {
2449 async move {
2450 async_std::task::sleep(Duration::from_millis(100)).await;
2451 Ok(serde_json::Value::String(old_return.to_string()))
2452 }
2453 .boxed()
2454 },
2455 |_| {
2456 async move {
2457 async_std::task::sleep(Duration::from_millis(100)).await;
2458 Ok(serde_json::Value::String(new_return.to_string()))
2459 }
2460 .boxed()
2461 },
2462 Some(|lhs, rhs| lhs == rhs),
2463 )
2464 .write(
2465 |_| {
2466 async move {
2467 async_std::task::sleep(Duration::from_millis(100)).await;
2468 Ok(serde_json::Value::Null)
2469 }
2470 .boxed()
2471 },
2472 |_| {
2473 async move {
2474 async_std::task::sleep(Duration::from_millis(100)).await;
2475 Ok(serde_json::Value::Null)
2476 }
2477 .boxed()
2478 },
2479 )
2480 .build()
2481 .expect("migrator should build");
2482
2483 let context = ContextBuilder::new("bob")
2484 .build()
2485 .expect("Failed to create context");
2486
2487 migrator
2488 .read(
2489 &context,
2490 "stage-flag".into(),
2491 Stage::Off,
2492 serde_json::Value::Null,
2493 )
2494 .await;
2495
2496 client.flush();
2497 client.close();
2498
2499 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2500 assert_eq!(events.len(), 3);
2501 match &events[1] {
2502 OutputEvent::MigrationOp(event) => {
2503 assert!(event.consistency_check == Some(expected_consistency))
2504 }
2505 _ => panic!("Expected migration event"),
2506 }
2507 }
2508
2509 #[tokio::test]
2510 async fn client_flush_blocking_completes_successfully() {
2511 let (client, event_rx) = make_mocked_client();
2512 client.start_with_default_executor();
2513 client.wait_for_initialization(Duration::from_secs(1)).await;
2514
2515 let context = ContextBuilder::new("user-key")
2516 .build()
2517 .expect("Failed to create context");
2518
2519 client.identify(context);
2520
2521 let result = client.flush_blocking(Duration::from_secs(5)).await;
2522 assert!(result, "flush_blocking should complete successfully");
2523
2524 client.close();
2525
2526 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2527 assert!(!events.is_empty(), "Should have received identify event");
2528 }
2529
2530 #[tokio::test]
2531 async fn client_flush_blocking_with_zero_timeout() {
2532 let (client, event_rx) = make_mocked_client();
2533 client.start_with_default_executor();
2534 client.wait_for_initialization(Duration::from_secs(1)).await;
2535
2536 let context = ContextBuilder::new("user-key")
2537 .build()
2538 .expect("Failed to create context");
2539
2540 client.identify(context);
2541
2542 let result = client.flush_blocking(Duration::ZERO).await;
2543 assert!(
2544 result,
2545 "flush_blocking with zero timeout should complete successfully"
2546 );
2547
2548 client.close();
2549
2550 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2551 assert!(!events.is_empty(), "Should have received identify event");
2552 }
2553
2554 #[tokio::test]
2555 async fn client_flush_blocking_with_no_events() {
2556 let (client, _event_rx) = make_mocked_client();
2557 client.start_with_default_executor();
2558 client.wait_for_initialization(Duration::from_secs(1)).await;
2559
2560 let result = client.flush_blocking(Duration::from_secs(1)).await;
2561 assert!(
2562 result,
2563 "flush_blocking with no events should complete immediately"
2564 );
2565
2566 client.close();
2567 }
2568
2569 #[tokio::test]
2570 async fn client_flush_blocking_multiple_concurrent_calls() {
2571 let (client, event_rx) = make_mocked_client();
2572 client.start_with_default_executor();
2573 client.wait_for_initialization(Duration::from_secs(1)).await;
2574
2575 let context = ContextBuilder::new("user-key")
2576 .build()
2577 .expect("Failed to create context");
2578
2579 client.identify(context);
2580
2581 let (result1, result2, result3) = tokio::join!(
2583 client.flush_blocking(Duration::from_secs(5)),
2584 client.flush_blocking(Duration::from_secs(5)),
2585 client.flush_blocking(Duration::from_secs(5)),
2586 );
2587
2588 assert!(result1, "First flush_blocking should succeed");
2589 assert!(result2, "Second flush_blocking should succeed");
2590 assert!(result3, "Third flush_blocking should succeed");
2591
2592 client.close();
2593
2594 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2595 assert!(!events.is_empty(), "Should have received identify event");
2596 }
2597
2598 fn make_mocked_client_with_delay(
2599 delay: u64,
2600 offline: bool,
2601 daemon_mode: bool,
2602 ) -> (Client, Receiver<OutputEvent>) {
2603 let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2604 let (event_sender, event_rx) = create_event_sender();
2605
2606 let config = ConfigBuilder::new("sdk-key")
2607 .offline(offline)
2608 .daemon_mode(daemon_mode)
2609 .data_source(MockDataSourceBuilder::new().data_source(updates))
2610 .event_processor(
2611 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new()
2612 .event_sender(Arc::new(event_sender)),
2613 )
2614 .build()
2615 .expect("config should build");
2616
2617 let client = Client::build(config).expect("Should be built.");
2618
2619 (client, event_rx)
2620 }
2621
2622 fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2623 make_mocked_client_with_delay(0, true, false)
2624 }
2625
2626 fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2627 make_mocked_client_with_delay(0, false, false)
2628 }
2629
2630 fn make_client_with_test_data(td: &TestData) -> (Client, Receiver<OutputEvent>) {
2631 let (event_sender, event_rx) = create_event_sender();
2632 let config = ConfigBuilder::new("sdk-key")
2633 .data_source(td)
2634 .event_processor(
2635 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new()
2636 .event_sender(Arc::new(event_sender)),
2637 )
2638 .build()
2639 .expect("config should build");
2640 let client = Client::build(config).expect("Should be built.");
2641 (client, event_rx)
2642 }
2643
/// A client built from an offline configuration must not be marked as started,
/// must report offline mode, and must retain the SDK key it was configured with.
#[test]
fn client_builds_successfully() {
    let client = Client::build(
        ConfigBuilder::new("sdk-key")
            .offline(true)
            .build()
            .expect("config should build"),
    )
    .expect("client should build successfully");

    let already_started = client.started.load(Ordering::SeqCst);
    assert!(!already_started, "client should not be started yet");
    assert!(client.offline, "client should be in offline mode");
    assert_eq!(client.sdk_key, "sdk-key", "sdk_key should match");
}
2660}