Skip to main content

launchdarkly_server_sdk/
client.rs

1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::DataSource;
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
27struct EventsScope {
28    disabled: bool,
29    event_factory: EventFactory,
30    prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
31}
32
33struct PrerequisiteEventRecorder {
34    event_factory: EventFactory,
35    event_processor: Arc<dyn EventProcessor>,
36}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39    fn record(&self, event: PrerequisiteEvent) {
40        let evt = self.event_factory.new_eval_event(
41            &event.prerequisite_flag.key,
42            event.context.clone(),
43            &event.prerequisite_flag,
44            event.prerequisite_result,
45            FlagValue::Json(serde_json::Value::Null),
46            Some(event.target_flag_key),
47        );
48
49        self.event_processor.send(evt);
50    }
51}
52
53/// Error type used to represent failures when building a [Client] instance.
54#[non_exhaustive]
55#[derive(Debug, Error)]
56pub enum BuildError {
57    /// Error used when a configuration setting is invalid. This typically indicates an invalid URL.
58    #[error("invalid client config: {0}")]
59    InvalidConfig(String),
60}
61
62impl From<DataSourceError> for BuildError {
63    fn from(error: DataSourceError) -> Self {
64        Self::InvalidConfig(error.to_string())
65    }
66}
67
68impl From<DataStoreError> for BuildError {
69    fn from(error: DataStoreError) -> Self {
70        Self::InvalidConfig(error.to_string())
71    }
72}
73
74impl From<EventProcessorError> for BuildError {
75    fn from(error: EventProcessorError) -> Self {
76        Self::InvalidConfig(error.to_string())
77    }
78}
79
80impl From<ConfigBuildError> for BuildError {
81    fn from(error: ConfigBuildError) -> Self {
82        Self::InvalidConfig(error.to_string())
83    }
84}
85
86/// Error type used to represent failures when starting the [Client].
87#[non_exhaustive]
88#[derive(Debug, Error)]
89pub enum StartError {
90    /// Error used when spawning a background there fails.
91    #[error("couldn't spawn background thread for client: {0}")]
92    SpawnFailed(io::Error),
93}
94
95#[derive(PartialEq, Copy, Clone, Debug)]
96enum ClientInitState {
97    Initializing = 0,
98    Initialized = 1,
99    InitializationFailed = 2,
100}
101
102impl PartialEq<usize> for ClientInitState {
103    fn eq(&self, other: &usize) -> bool {
104        *self as usize == *other
105    }
106}
107
108impl From<usize> for ClientInitState {
109    fn from(val: usize) -> Self {
110        match val {
111            0 => ClientInitState::Initializing,
112            1 => ClientInitState::Initialized,
113            2 => ClientInitState::InitializationFailed,
114            _ => unreachable!(),
115        }
116    }
117}
118
119/// A client for the LaunchDarkly API.
120///
121/// In order to create a client instance, first create a config using [crate::ConfigBuilder].
122///
123/// # Examples
124///
125/// Creating a client, with default configuration.
126/// ```
127/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, BuildError};
128/// # fn main() -> Result<(), BuildError> {
129///     let ld_client = Client::build(ConfigBuilder::new("sdk-key").build()?)?;
130/// #   Ok(())
131/// # }
132/// ```
133///
134/// Creating an instance which connects to a relay proxy.
135/// ```
136/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, ServiceEndpointsBuilder, BuildError};
137/// # fn main() -> Result<(), BuildError> {
138///     let ld_client = Client::build(ConfigBuilder::new("sdk-key")
139///         .service_endpoints(ServiceEndpointsBuilder::new()
140///             .relay_proxy("http://my-relay-hostname:8080")
141///         ).build()?
142///     )?;
143/// #   Ok(())
144/// # }
145/// ```
146///
147/// Each builder type includes usage examples for the builder.
148pub struct Client {
149    event_processor: Arc<dyn EventProcessor>,
150    data_source: Arc<dyn DataSource>,
151    data_store: Arc<RwLock<dyn DataStore>>,
152    events_default: EventsScope,
153    events_with_reasons: EventsScope,
154    init_notify: Arc<Semaphore>,
155    init_state: Arc<AtomicUsize>,
156    started: AtomicBool,
157    offline: bool,
158    daemon_mode: bool,
159    #[cfg_attr(
160        not(any(feature = "crypto-openssl", feature = "crypto-aws-lc-rs")),
161        allow(dead_code)
162    )]
163    sdk_key: String,
164    shutdown_broadcast: broadcast::Sender<()>,
165    runtime: RwLock<Option<Runtime>>,
166}
167
168impl Client {
169    /// Create a new instance of a [Client] based on the provided [Config] parameter.
170    pub fn build(config: Config) -> Result<Self, BuildError> {
171        if config.offline() {
172            info!("Started LaunchDarkly Client in offline mode");
173        } else if config.daemon_mode() {
174            info!("Started LaunchDarkly Client in daemon mode");
175        }
176
177        let tags = config.application_tag();
178
179        let endpoints = config.service_endpoints_builder().build()?;
180        let event_processor =
181            config
182                .event_processor_builder()
183                .build(&endpoints, config.sdk_key(), tags.clone())?;
184        let data_source =
185            config
186                .data_source_builder()
187                .build(&endpoints, config.sdk_key(), tags.clone())?;
188        let data_store = config.data_store_builder().build()?;
189
190        let events_default = EventsScope {
191            disabled: config.offline(),
192            event_factory: EventFactory::new(false),
193            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
194                event_factory: EventFactory::new(false),
195                event_processor: event_processor.clone(),
196            }),
197        };
198
199        let events_with_reasons = EventsScope {
200            disabled: config.offline(),
201            event_factory: EventFactory::new(true),
202            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
203                event_factory: EventFactory::new(true),
204                event_processor: event_processor.clone(),
205            }),
206        };
207
208        let (shutdown_tx, _) = broadcast::channel(1);
209
210        Ok(Client {
211            event_processor,
212            data_source,
213            data_store,
214            events_default,
215            events_with_reasons,
216            init_notify: Arc::new(Semaphore::new(0)),
217            init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
218            started: AtomicBool::new(false),
219            offline: config.offline(),
220            daemon_mode: config.daemon_mode(),
221            sdk_key: config.sdk_key().into(),
222            shutdown_broadcast: shutdown_tx,
223            runtime: RwLock::new(None),
224        })
225    }
226
227    /// Starts a client in the current thread, which must have a default tokio runtime.
228    pub fn start_with_default_executor(&self) {
229        if self.started.load(Ordering::SeqCst) {
230            return;
231        }
232        self.started.store(true, Ordering::SeqCst);
233        self.start_with_default_executor_internal();
234    }
235
236    fn start_with_default_executor_internal(&self) {
237        // These clones are going to move into the closure, we
238        // do not want to move or reference `self`, because
239        // then lifetimes will get involved.
240        let notify = self.init_notify.clone();
241        let init_state = self.init_state.clone();
242
243        self.data_source.subscribe(
244            self.data_store.clone(),
245            Arc::new(move |success| {
246                init_state.store(
247                    (if success {
248                        ClientInitState::Initialized
249                    } else {
250                        ClientInitState::InitializationFailed
251                    }) as usize,
252                    Ordering::SeqCst,
253                );
254                notify.add_permits(1);
255            }),
256            self.shutdown_broadcast.subscribe(),
257        );
258    }
259
260    /// Creates a new tokio runtime and then starts the client. Tasks from the client will
261    /// be executed on created runtime.
262    /// If your application already has a tokio runtime, then you can use
263    /// [crate::Client::start_with_default_executor] and the client will dispatch tasks to
264    /// your existing runtime.
265    pub fn start_with_runtime(&self) -> Result<bool, StartError> {
266        if self.started.load(Ordering::SeqCst) {
267            return Ok(true);
268        }
269        self.started.store(true, Ordering::SeqCst);
270
271        let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
272        let _guard = runtime.enter();
273        self.runtime.write().replace(runtime);
274
275        self.start_with_default_executor_internal();
276
277        Ok(true)
278    }
279
280    /// This is an async method that will resolve once initialization is complete or the specified
281    /// timeout has occurred.
282    ///
283    /// If the timeout is triggered, this method will return `None`. Otherwise, the method will
284    /// return a boolean indicating whether or not the SDK has successfully initialized.
285    pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
286        if timeout > Duration::from_secs(60) {
287            warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
288        }
289
290        let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
291        initialized.ok()
292    }
293
294    async fn initialized_async_internal(&self) -> bool {
295        if self.offline || self.daemon_mode {
296            return true;
297        }
298
299        // If the client is not initialized, then we need to wait for it to be initialized.
300        // Because we are using atomic types, and not a lock, then there is still the possibility
301        // that the value will change between the read and when we wait. We use a semaphore to wait,
302        // and we do not forget the permit, therefore if the permit has been added, then we will get
303        // it very quickly and reduce blocking.
304        if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
305            let _permit = self.init_notify.acquire().await;
306        }
307        ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
308    }
309
310    /// This function synchronously returns if the SDK is initialized.
311    /// In the case of unrecoverable errors in establishing a connection it is possible for the
312    /// SDK to never become initialized.
313    pub fn initialized(&self) -> bool {
314        self.offline
315            || self.daemon_mode
316            || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
317    }
318
319    /// Close shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client
320    /// should no longer be used. The method will block until all pending analytics events (if any)
321    /// been sent.
322    pub fn close(&self) {
323        self.event_processor.close();
324
325        // If the system is in offline mode or daemon mode, no receiver will be listening to this
326        // broadcast channel, so sending on it would always result in an error.
327        if !self.offline && !self.daemon_mode {
328            if let Err(e) = self.shutdown_broadcast.send(()) {
329                error!("Failed to shutdown client appropriately: {e}");
330            }
331        }
332
333        // Potentially take the runtime we created when starting the client and do nothing with it
334        // so it drops, closing out all spawned tasks.
335        self.runtime.write().take();
336    }
337
338    /// Flush tells the client that all pending analytics events (if any) should be delivered as
339    /// soon as possible. Flushing is asynchronous, so this method will return before it is
340    /// complete. However, if you call [Client::close], events are guaranteed to be sent before
341    /// that method returns.
342    ///
343    /// For more information, see the Reference Guide:
344    /// <https://docs.launchdarkly.com/sdk/features/flush#rust>.
345    pub fn flush(&self) {
346        self.event_processor.flush();
347    }
348
349    /// Flush tells the client that all pending analytics events should be delivered as
350    /// soon as possible, and blocks until delivery is complete or the timeout expires.
351    ///
352    /// This method is particularly useful in short-lived execution environments like AWS Lambda
353    /// where you need to ensure events are sent before the function terminates.
354    ///
355    /// This method triggers a flush of events currently buffered and waits for that specific
356    /// flush to complete. Note that if periodic flushes or other flush operations are in-flight
357    /// when this is called, those may still be completing after this method returns.
358    ///
359    /// # Arguments
360    ///
361    /// * `timeout` - Maximum time to wait for flush to complete. Use `Duration::ZERO` to wait indefinitely.
362    ///
363    /// # Returns
364    ///
365    /// Returns `true` if flush completed successfully, `false` if timeout occurred.
366    ///
367    /// # Examples
368    ///
369    /// ```no_run
370    /// # use launchdarkly_server_sdk::{Client, ConfigBuilder};
371    /// # use std::time::Duration;
372    /// # async fn example() {
373    /// # let client = Client::build(ConfigBuilder::new("sdk-key").build().unwrap()).unwrap();
374    /// // Wait up to 5 seconds for flush to complete
375    /// let success = client.flush_blocking(Duration::from_secs(5)).await;
376    /// if !success {
377    ///     eprintln!("Warning: flush timed out");
378    /// }
379    /// # }
380    /// ```
381    ///
382    /// For more information, see the Reference Guide:
383    /// <https://docs.launchdarkly.com/sdk/features/flush#rust>.
384    pub async fn flush_blocking(&self, timeout: Duration) -> bool {
385        let event_processor = self.event_processor.clone();
386
387        let flush_future =
388            tokio::task::spawn_blocking(move || event_processor.flush_blocking(timeout));
389
390        if timeout == Duration::ZERO {
391            // Wait indefinitely
392            flush_future.await.unwrap_or(false)
393        } else {
394            // Apply timeout at async level too
395            match tokio::time::timeout(timeout, flush_future).await {
396                Ok(Ok(result)) => result,
397                Ok(Err(_)) => false, // spawn_blocking panicked
398                Err(_) => false,     // Timeout
399            }
400        }
401    }
402
403    /// Identify reports details about a context.
404    ///
405    /// For more information, see the Reference Guide:
406    /// <https://docs.launchdarkly.com/sdk/features/identify#rust>
407    pub fn identify(&self, context: Context) {
408        if self.events_default.disabled {
409            return;
410        }
411
412        self.send_internal(self.events_default.event_factory.new_identify(context));
413    }
414
415    /// Returns the value of a boolean feature flag for a given context.
416    ///
417    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
418    /// off and has no off variation.
419    ///
420    /// For more information, see the Reference Guide:
421    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
422    pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
423        let val = self.variation(context, flag_key, default);
424        if let Some(b) = val.as_bool() {
425            b
426        } else {
427            warn!("bool_variation called for a non-bool flag {flag_key:?} (got {val:?})");
428            default
429        }
430    }
431
432    /// Returns the value of a string feature flag for a given context.
433    ///
434    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
435    /// off and has no off variation.
436    ///
437    /// For more information, see the Reference Guide:
438    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
439    pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
440        let val = self.variation(context, flag_key, default.clone());
441        if let Some(s) = val.as_string() {
442            s
443        } else {
444            warn!("str_variation called for a non-string flag {flag_key:?} (got {val:?})");
445            default
446        }
447    }
448
449    /// Returns the value of a float feature flag for a given context.
450    ///
451    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
452    /// off and has no off variation.
453    ///
454    /// For more information, see the Reference Guide:
455    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
456    pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
457        let val = self.variation(context, flag_key, default);
458        if let Some(f) = val.as_float() {
459            f
460        } else {
461            warn!("float_variation called for a non-float flag {flag_key:?} (got {val:?})");
462            default
463        }
464    }
465
466    /// Returns the value of a integer feature flag for a given context.
467    ///
468    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
469    /// off and has no off variation.
470    ///
471    /// For more information, see the Reference Guide:
472    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
473    pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
474        let val = self.variation(context, flag_key, default);
475        if let Some(f) = val.as_int() {
476            f
477        } else {
478            warn!("int_variation called for a non-int flag {flag_key:?} (got {val:?})");
479            default
480        }
481    }
482
483    /// Returns the value of a feature flag for the given context, allowing the value to be
484    /// of any JSON type.
485    ///
486    /// The value is returned as an [serde_json::Value].
487    ///
488    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned off.
489    ///
490    /// For more information, see the Reference Guide:
491    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
492    pub fn json_variation(
493        &self,
494        context: &Context,
495        flag_key: &str,
496        default: serde_json::Value,
497    ) -> serde_json::Value {
498        self.variation(context, flag_key, default.clone())
499            .as_json()
500            .unwrap_or(default)
501    }
502
503    /// This method is the same as [Client::bool_variation], but also returns further information
504    /// about how the value was calculated. The "reason" data will also be included in analytics
505    /// events.
506    ///
507    /// For more information, see the Reference Guide:
508    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
509    pub fn bool_variation_detail(
510        &self,
511        context: &Context,
512        flag_key: &str,
513        default: bool,
514    ) -> Detail<bool> {
515        self.variation_detail(context, flag_key, default).try_map(
516            |val| val.as_bool(),
517            default,
518            eval::Error::WrongType,
519        )
520    }
521
522    /// This method is the same as [Client::str_variation], but also returns further information
523    /// about how the value was calculated. The "reason" data will also be included in analytics
524    /// events.
525    ///
526    /// For more information, see the Reference Guide:
527    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
528    pub fn str_variation_detail(
529        &self,
530        context: &Context,
531        flag_key: &str,
532        default: String,
533    ) -> Detail<String> {
534        self.variation_detail(context, flag_key, default.clone())
535            .try_map(|val| val.as_string(), default, eval::Error::WrongType)
536    }
537
538    /// This method is the same as [Client::float_variation], but also returns further information
539    /// about how the value was calculated. The "reason" data will also be included in analytics
540    /// events.
541    ///
542    /// For more information, see the Reference Guide:
543    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
544    pub fn float_variation_detail(
545        &self,
546        context: &Context,
547        flag_key: &str,
548        default: f64,
549    ) -> Detail<f64> {
550        self.variation_detail(context, flag_key, default).try_map(
551            |val| val.as_float(),
552            default,
553            eval::Error::WrongType,
554        )
555    }
556
557    /// This method is the same as [Client::int_variation], but also returns further information
558    /// about how the value was calculated. The "reason" data will also be included in analytics
559    /// events.
560    ///
561    /// For more information, see the Reference Guide:
562    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
563    pub fn int_variation_detail(
564        &self,
565        context: &Context,
566        flag_key: &str,
567        default: i64,
568    ) -> Detail<i64> {
569        self.variation_detail(context, flag_key, default).try_map(
570            |val| val.as_int(),
571            default,
572            eval::Error::WrongType,
573        )
574    }
575
576    /// This method is the same as [Client::json_variation], but also returns further information
577    /// about how the value was calculated. The "reason" data will also be included in analytics
578    /// events.
579    ///
580    /// For more information, see the Reference Guide:
581    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
582    pub fn json_variation_detail(
583        &self,
584        context: &Context,
585        flag_key: &str,
586        default: serde_json::Value,
587    ) -> Detail<serde_json::Value> {
588        self.variation_detail(context, flag_key, default.clone())
589            .try_map(|val| val.as_json(), default, eval::Error::WrongType)
590    }
591
592    #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
593    /// Generates the secure mode hash value for a context.
594    ///
595    /// For more information, see the Reference Guide:
596    /// <https://docs.launchdarkly.com/sdk/features/secure-mode#rust>.
597    pub fn secure_mode_hash(&self, context: &Context) -> Result<String, String> {
598        #[cfg(feature = "crypto-aws-lc-rs")]
599        {
600            let key =
601                aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
602            let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
603
604            Ok(data_encoding::HEXLOWER.encode(tag.as_ref()))
605        }
606        #[cfg(feature = "crypto-openssl")]
607        {
608            use openssl::hash::MessageDigest;
609            use openssl::pkey::PKey;
610            use openssl::sign::Signer;
611
612            let key = PKey::hmac(self.sdk_key.as_bytes())
613                .map_err(|e| format!("Failed to create HMAC key: {e}"))?;
614            let mut signer = Signer::new(MessageDigest::sha256(), &key)
615                .map_err(|e| format!("Failed to create signer: {e}"))?;
616            signer
617                .update(context.canonical_key().as_bytes())
618                .map_err(|e| format!("Failed to update signer: {e}"))?;
619            let hmac = signer
620                .sign_to_vec()
621                .map_err(|e| format!("Failed to sign: {e}"))?;
622
623            Ok(data_encoding::HEXLOWER.encode(&hmac))
624        }
625    }
626
627    /// Returns an object that encapsulates the state of all feature flags for a given context. This
628    /// includes the flag values, and also metadata that can be used on the front end.
629    ///
630    /// The most common use case for this method is to bootstrap a set of client-side feature flags
631    /// from a back-end service.
632    ///
633    /// You may pass any configuration of [FlagDetailConfig] to control what data is included.
634    ///
635    /// For more information, see the Reference Guide:
636    /// <https://docs.launchdarkly.com/sdk/features/all-flags#rust>
637    pub fn all_flags_detail(
638        &self,
639        context: &Context,
640        flag_state_config: FlagDetailConfig,
641    ) -> FlagDetail {
642        if self.offline {
643            warn!(
644                "all_flags_detail() called, but client is in offline mode. Returning empty state"
645            );
646            return FlagDetail::new(false);
647        }
648
649        if !self.initialized() {
650            warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
651            return FlagDetail::new(false);
652        }
653
654        let data_store = self.data_store.read();
655
656        let mut flag_detail = FlagDetail::new(true);
657        flag_detail.populate(&*data_store, context, flag_state_config);
658
659        flag_detail
660    }
661
662    /// This method is the same as [Client::variation], but also returns further information about
663    /// how the value was calculated. The "reason" data will also be included in analytics events.
664    ///
665    /// For more information, see the Reference Guide:
666    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
667    pub fn variation_detail<T: Into<FlagValue> + Clone>(
668        &self,
669        context: &Context,
670        flag_key: &str,
671        default: T,
672    ) -> Detail<FlagValue> {
673        let (detail, _) =
674            self.variation_internal(context, flag_key, default, &self.events_with_reasons);
675        detail
676    }
677
678    /// This is a generic function which returns the value of a feature flag for a given context.
679    ///
680    /// This method is an alternatively to the type specified methods (e.g.
681    /// [Client::bool_variation], [Client::int_variation], etc.).
682    ///
683    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
684    /// off and has no off variation.
685    ///
686    /// For more information, see the Reference Guide:
687    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
688    pub fn variation<T: Into<FlagValue> + Clone>(
689        &self,
690        context: &Context,
691        flag_key: &str,
692        default: T,
693    ) -> FlagValue {
694        let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
695        detail.value.unwrap()
696    }
697
698    /// This method returns the migration stage of the migration feature flag for the given
699    /// evaluation context.
700    ///
701    /// This method returns the default stage if there is an error or the flag does not exist.
702    pub fn migration_variation(
703        &self,
704        context: &Context,
705        flag_key: &str,
706        default_stage: Stage,
707    ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
708        let (detail, flag) =
709            self.variation_internal(context, flag_key, default_stage, &self.events_default);
710
711        let migration_detail =
712            detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
713        let tracker = MigrationOpTracker::new(
714            flag_key.into(),
715            flag,
716            context.clone(),
717            migration_detail.clone(),
718            default_stage,
719        );
720
721        (
722            migration_detail.value.unwrap_or(default_stage),
723            Arc::new(Mutex::new(tracker)),
724        )
725    }
726
727    /// Reports that a context has performed an event.
728    ///
729    /// The `key` parameter is defined by the application and will be shown in analytics reports;
730    /// it normally corresponds to the event name of a metric that you have created through the
731    /// LaunchDarkly dashboard. If you want to associate additional data with this event, use
732    /// [Client::track_data] or [Client::track_metric].
733    ///
734    /// For more information, see the Reference Guide:
735    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
736    pub fn track_event(&self, context: Context, key: impl Into<String>) {
737        let _ = self.track(context, key, None, serde_json::Value::Null);
738    }
739
740    /// Reports that a context has performed an event, and associates it with custom data.
741    ///
742    /// The `key` parameter is defined by the application and will be shown in analytics reports;
743    /// it normally corresponds to the event name of a metric that you have created through the
744    /// LaunchDarkly dashboard.
745    ///
746    /// `data` parameter is any type that implements [Serialize]. If no such value is needed, use
747    /// [serde_json::Value::Null] (or call [Client::track_event] instead). To send a numeric value
748    /// for experimentation, use [Client::track_metric].
749    ///
750    /// For more information, see the Reference Guide:
751    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
752    pub fn track_data(
753        &self,
754        context: Context,
755        key: impl Into<String>,
756        data: impl Serialize,
757    ) -> serde_json::Result<()> {
758        self.track(context, key, None, data)
759    }
760
761    /// Reports that a context has performed an event, and associates it with a numeric value. This
762    /// value is used by the LaunchDarkly experimentation feature in numeric custom metrics, and
763    /// will also be returned as part of the custom event for Data Export.
764    ///
765    /// The `key` parameter is defined by the application and will be shown in analytics reports;
766    /// it normally corresponds to the event name of a metric that you have created through the
767    /// LaunchDarkly dashboard.
768    ///
769    /// For more information, see the Reference Guide:
770    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
771    pub fn track_metric(
772        &self,
773        context: Context,
774        key: impl Into<String>,
775        value: f64,
776        data: impl Serialize,
777    ) {
778        let _ = self.track(context, key, Some(value), data);
779    }
780
781    fn track(
782        &self,
783        context: Context,
784        key: impl Into<String>,
785        metric_value: Option<f64>,
786        data: impl Serialize,
787    ) -> serde_json::Result<()> {
788        if !self.events_default.disabled {
789            let event =
790                self.events_default
791                    .event_factory
792                    .new_custom(context, key, metric_value, data)?;
793
794            self.send_internal(event);
795        }
796
797        Ok(())
798    }
799
800    /// Tracks the results of a migrations operation. This event includes measurements which can be
801    /// used to enhance the observability of a migration within the LaunchDarkly UI.
802    ///
803    /// This event should be generated through [crate::MigrationOpTracker]. If you are using the
804    /// [crate::Migrator] to handle migrations, this event will be created and emitted
805    /// automatically.
806    pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
807        if self.events_default.disabled {
808            return;
809        }
810
811        match tracker.lock() {
812            Ok(tracker) => {
813                let event = tracker.build();
814                match event {
815                    Ok(event) => {
816                        self.send_internal(
817                            self.events_default.event_factory.new_migration_op(event),
818                        );
819                    }
820                    Err(e) => error!("Failed to build migration event, no event will be sent: {e}"),
821                }
822            }
823            Err(e) => error!("Failed to lock migration tracker, no event will be sent: {e}"),
824        }
825    }
826
827    fn variation_internal<T: Into<FlagValue> + Clone>(
828        &self,
829        context: &Context,
830        flag_key: &str,
831        default: T,
832        events_scope: &EventsScope,
833    ) -> (Detail<FlagValue>, Option<eval::Flag>) {
834        if self.offline {
835            return (
836                Detail::err_default(eval::Error::ClientNotReady, default.into()),
837                None,
838            );
839        }
840
841        let (flag, result) = match self.initialized() {
842            false => (
843                None,
844                Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
845            ),
846            true => {
847                let data_store = self.data_store.read();
848                match data_store.flag(flag_key) {
849                    Some(flag) => {
850                        let result = eval::evaluate(
851                            data_store.to_store(),
852                            &flag,
853                            context,
854                            Some(&*events_scope.prerequisite_event_recorder),
855                        )
856                        .map(|v| v.clone())
857                        .or(default.clone().into());
858
859                        (Some(flag), result)
860                    }
861                    None => (
862                        None,
863                        Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
864                    ),
865                }
866            }
867        };
868
869        if !events_scope.disabled {
870            let event = match &flag {
871                Some(f) => events_scope.event_factory.new_eval_event(
872                    flag_key,
873                    context.clone(),
874                    f,
875                    result.clone(),
876                    default.into(),
877                    None,
878                ),
879                None => events_scope.event_factory.new_unknown_flag_event(
880                    flag_key,
881                    context.clone(),
882                    result.clone(),
883                    default.into(),
884                ),
885            };
886            self.send_internal(event);
887        }
888
889        (result, flag)
890    }
891
892    fn send_internal(&self, event: InputEvent) {
893        self.event_processor.send(event);
894    }
895}
896
897#[cfg(test)]
898mod tests {
899    use assert_json_diff::assert_json_eq;
900    use crossbeam_channel::Receiver;
901    use eval::{ContextBuilder, MultiContextBuilder};
902    use futures::FutureExt;
903    use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
904    use maplit::hashmap;
905    use std::collections::HashMap;
906    use tokio::time::Instant;
907
908    use crate::data_source::MockDataSource;
909    use crate::data_source_builders::MockDataSourceBuilder;
910    use crate::evaluation::FlagFilter;
911    use crate::events::create_event_sender;
912    use crate::events::event::{OutputEvent, VariationKey};
913    use crate::events::processor_builders::EventProcessorBuilder;
914    use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
915    use crate::stores::store_types::{PatchTarget, StorageItem};
916    use crate::test_common::{
917        self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
918        basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
919    };
920    use crate::test_data::TestData;
921    use crate::{
922        AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
923        PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
924        SerializedItem,
925    };
926    use test_case::test_case;
927
928    use super::*;
929
930    fn is_send_and_sync<T: Send + Sync>() {}
931
932    #[test]
933    fn ensure_client_is_send_and_sync() {
934        is_send_and_sync::<Client>()
935    }
936
937    #[tokio::test]
938    async fn client_asynchronously_initializes_within_timeout() {
939        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
940        client.start_with_default_executor();
941
942        let now = Instant::now();
943        let initialized = client
944            .wait_for_initialization(Duration::from_millis(1500))
945            .await;
946        let elapsed_time = now.elapsed();
947        // Give ourself a good margin for thread scheduling.
948        assert!(elapsed_time.as_millis() > 500);
949        assert_eq!(initialized, Some(true));
950    }
951
952    #[tokio::test]
953    async fn client_asynchronously_initializes_slower_than_timeout() {
954        let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
955        client.start_with_default_executor();
956
957        let now = Instant::now();
958        let initialized = client
959            .wait_for_initialization(Duration::from_millis(500))
960            .await;
961        let elapsed_time = now.elapsed();
962        // Give ourself a good margin for thread scheduling.
963        assert!(elapsed_time.as_millis() < 750);
964        assert!(initialized.is_none());
965    }
966
967    #[tokio::test]
968    async fn client_initializes_immediately_in_offline_mode() {
969        let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
970        client.start_with_default_executor();
971
972        assert!(client.initialized());
973
974        let now = Instant::now();
975        let initialized = client
976            .wait_for_initialization(Duration::from_millis(2000))
977            .await;
978        let elapsed_time = now.elapsed();
979        assert_eq!(initialized, Some(true));
980        assert!(elapsed_time.as_millis() < 500)
981    }
982
983    #[tokio::test]
984    async fn client_initializes_immediately_in_daemon_mode() {
985        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
986        client.start_with_default_executor();
987
988        assert!(client.initialized());
989
990        let now = Instant::now();
991        let initialized = client
992            .wait_for_initialization(Duration::from_millis(2000))
993            .await;
994        let elapsed_time = now.elapsed();
995        assert_eq!(initialized, Some(true));
996        assert!(elapsed_time.as_millis() < 500)
997    }
998
999    #[test_case(basic_flag("myFlag"), false.into(), true.into())]
1000    #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
1001    fn client_updates_changes_evaluation_results(
1002        flag: eval::Flag,
1003        default: FlagValue,
1004        expected: FlagValue,
1005    ) {
1006        let context = ContextBuilder::new("foo")
1007            .build()
1008            .expect("Failed to create context");
1009
1010        let (client, _event_rx) = make_mocked_client();
1011
1012        let result = client.variation_detail(&context, "myFlag", default.clone());
1013        assert_eq!(result.value.unwrap(), default);
1014
1015        client.start_with_default_executor();
1016        client
1017            .data_store
1018            .write()
1019            .upsert(
1020                &flag.key,
1021                PatchTarget::Flag(StorageItem::Item(flag.clone())),
1022            )
1023            .expect("patch should apply");
1024
1025        let result = client.variation_detail(&context, "myFlag", default);
1026        assert_eq!(result.value.unwrap(), expected);
1027        assert!(matches!(
1028            result.reason,
1029            Reason::Fallthrough {
1030                in_experiment: false
1031            }
1032        ));
1033    }
1034
1035    #[test]
1036    fn all_flags_detail_is_invalid_when_offline() {
1037        let (client, _event_rx) = make_mocked_offline_client();
1038        client.start_with_default_executor();
1039
1040        let context = ContextBuilder::new("bob")
1041            .build()
1042            .expect("Failed to create context");
1043
1044        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1045        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1046    }
1047
1048    #[test]
1049    fn all_flags_detail_is_invalid_when_not_initialized() {
1050        let (client, _event_rx) = make_mocked_client();
1051
1052        let context = ContextBuilder::new("bob")
1053            .build()
1054            .expect("Failed to create context");
1055
1056        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1057        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1058    }
1059
1060    #[tokio::test]
1061    async fn all_flags_detail_returns_flag_states() {
1062        let td = TestData::new();
1063        td.use_preconfigured_flag(basic_flag("myFlag1"));
1064        td.use_preconfigured_flag(basic_flag("myFlag2"));
1065        let (client, _event_rx) = make_client_with_test_data(&td);
1066        client.start_with_default_executor();
1067
1068        let context = ContextBuilder::new("bob")
1069            .build()
1070            .expect("Failed to create context");
1071
1072        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1073
1074        client.close();
1075
1076        assert_json_eq!(
1077            all_flags,
1078            json!({
1079                "myFlag1": true,
1080                "myFlag2": true,
1081                "$flagsState": {
1082                    "myFlag1": {
1083                        "version": 1,
1084                        "variation": 1
1085                    },
1086                     "myFlag2": {
1087                        "version": 1,
1088                        "variation": 1
1089                    },
1090                },
1091                "$valid": true
1092            })
1093        );
1094    }
1095
1096    #[tokio::test]
1097    async fn all_flags_detail_returns_prerequisite_relations() {
1098        let td = TestData::new();
1099        td.use_preconfigured_flag(basic_flag("prereq1"));
1100        td.use_preconfigured_flag(basic_flag("prereq2"));
1101        td.use_preconfigured_flag(basic_flag_with_prereqs_and_visibility(
1102            "toplevel",
1103            &["prereq1", "prereq2"],
1104            false,
1105            false,
1106        ));
1107        let (client, _event_rx) = make_client_with_test_data(&td);
1108        client.start_with_default_executor();
1109
1110        let context = ContextBuilder::new("bob")
1111            .build()
1112            .expect("Failed to create context");
1113
1114        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1115
1116        client.close();
1117
1118        assert_json_eq!(
1119            all_flags,
1120            json!({
1121                "prereq1": true,
1122                "prereq2": true,
1123                "toplevel": true,
1124                "$flagsState": {
1125                    "toplevel": {
1126                        "version": 1,
1127                        "variation": 1,
1128                        "prerequisites": ["prereq1", "prereq2"]
1129                    },
1130                    "prereq1": {
1131                        "version": 1,
1132                        "variation": 1
1133                    },
1134                     "prereq2": {
1135                        "version": 1,
1136                        "variation": 1
1137                    },
1138                },
1139                "$valid": true
1140            })
1141        );
1142    }
1143
1144    #[tokio::test]
1145    async fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1146        let td = TestData::new();
1147        td.use_preconfigured_flag(basic_flag_with_visibility("prereq1", false, false));
1148        td.use_preconfigured_flag(basic_flag_with_visibility("prereq2", false, false));
1149        td.use_preconfigured_flag(basic_flag_with_prereqs_and_visibility(
1150            "toplevel",
1151            &["prereq1", "prereq2"],
1152            true,
1153            false,
1154        ));
1155        let (client, _event_rx) = make_client_with_test_data(&td);
1156        client.start_with_default_executor();
1157
1158        let context = ContextBuilder::new("bob")
1159            .build()
1160            .expect("Failed to create context");
1161
1162        let mut config = FlagDetailConfig::new();
1163        config.flag_filter(FlagFilter::CLIENT);
1164
1165        let all_flags = client.all_flags_detail(&context, config);
1166
1167        client.close();
1168
1169        assert_json_eq!(
1170            all_flags,
1171            json!({
1172                "toplevel": true,
1173                "$flagsState": {
1174                    "toplevel": {
1175                        "version": 1,
1176                        "variation": 1,
1177                        "prerequisites": ["prereq1", "prereq2"]
1178                    },
1179                },
1180                "$valid": true
1181            })
1182        );
1183    }
1184
1185    #[tokio::test]
1186    async fn variation_tracks_events_correctly() {
1187        let td = TestData::new();
1188        td.use_preconfigured_flag(basic_flag("myFlag"));
1189        let (client, event_rx) = make_client_with_test_data(&td);
1190        client.start_with_default_executor();
1191
1192        let context = ContextBuilder::new("bob")
1193            .build()
1194            .expect("Failed to create context");
1195
1196        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1197
1198        assert!(flag_value.as_bool().unwrap());
1199        client.flush();
1200        client.close();
1201
1202        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1203        assert_eq!(events.len(), 2);
1204        assert_eq!(events[0].kind(), "index");
1205        assert_eq!(events[1].kind(), "summary");
1206
1207        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1208            let variation_key = VariationKey {
1209                version: Some(1),
1210                variation: Some(1),
1211            };
1212            let feature = event_summary.features.get("myFlag");
1213            assert!(feature.is_some());
1214
1215            let feature = feature.unwrap();
1216            assert!(feature.counters.contains_key(&variation_key));
1217        } else {
1218            panic!("Event should be a summary type");
1219        }
1220    }
1221
1222    #[test]
1223    fn variation_handles_offline_mode() {
1224        let (client, event_rx) = make_mocked_offline_client();
1225        client.start_with_default_executor();
1226
1227        let context = ContextBuilder::new("bob")
1228            .build()
1229            .expect("Failed to create context");
1230        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1231
1232        assert!(!flag_value.as_bool().unwrap());
1233        client.flush();
1234        client.close();
1235
1236        assert_eq!(event_rx.iter().count(), 0);
1237    }
1238
1239    #[test]
1240    fn variation_handles_unknown_flags() {
1241        let (client, event_rx) = make_mocked_client();
1242        client.start_with_default_executor();
1243        let context = ContextBuilder::new("bob")
1244            .build()
1245            .expect("Failed to create context");
1246
1247        let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1248
1249        assert!(!flag_value.as_bool().unwrap());
1250        client.flush();
1251        client.close();
1252
1253        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1254        assert_eq!(events.len(), 2);
1255        assert_eq!(events[0].kind(), "index");
1256        assert_eq!(events[1].kind(), "summary");
1257
1258        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1259            let variation_key = VariationKey {
1260                version: None,
1261                variation: None,
1262            };
1263
1264            let feature = event_summary.features.get("non-existent-flag");
1265            assert!(feature.is_some());
1266
1267            let feature = feature.unwrap();
1268            assert!(feature.counters.contains_key(&variation_key));
1269        } else {
1270            panic!("Event should be a summary type");
1271        }
1272    }
1273
1274    #[tokio::test]
1275    async fn variation_detail_handles_debug_events_correctly() {
1276        let td = TestData::new();
1277        let mut flag = basic_flag("myFlag");
1278        flag.debug_events_until_date = Some(64_060_606_800_000); // Jan. 1st, 4000
1279        td.use_preconfigured_flag(flag);
1280        let (client, event_rx) = make_client_with_test_data(&td);
1281        client.start_with_default_executor();
1282
1283        let context = ContextBuilder::new("bob")
1284            .build()
1285            .expect("Failed to create context");
1286
1287        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1288
1289        assert!(detail.value.unwrap().as_bool().unwrap());
1290        assert!(matches!(
1291            detail.reason,
1292            Reason::Fallthrough {
1293                in_experiment: false
1294            }
1295        ));
1296        client.flush();
1297        client.close();
1298
1299        let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1300        assert_eq!(events.len(), 3);
1301        assert_eq!(events[0].kind(), "index");
1302        assert_eq!(events[1].kind(), "debug");
1303        assert_eq!(events[2].kind(), "summary");
1304
1305        if let OutputEvent::Summary(event_summary) = events[2].clone() {
1306            let variation_key = VariationKey {
1307                version: Some(1),
1308                variation: Some(1),
1309            };
1310
1311            let feature = event_summary.features.get("myFlag");
1312            assert!(feature.is_some());
1313
1314            let feature = feature.unwrap();
1315            assert!(feature.counters.contains_key(&variation_key));
1316        } else {
1317            panic!("Event should be a summary type");
1318        }
1319    }
1320
1321    #[tokio::test]
1322    async fn variation_detail_tracks_events_correctly() {
1323        let td = TestData::new();
1324        td.use_preconfigured_flag(basic_flag("myFlag"));
1325        let (client, event_rx) = make_client_with_test_data(&td);
1326        client.start_with_default_executor();
1327
1328        let context = ContextBuilder::new("bob")
1329            .build()
1330            .expect("Failed to create context");
1331
1332        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1333
1334        assert!(detail.value.unwrap().as_bool().unwrap());
1335        assert!(matches!(
1336            detail.reason,
1337            Reason::Fallthrough {
1338                in_experiment: false
1339            }
1340        ));
1341        client.flush();
1342        client.close();
1343
1344        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1345        assert_eq!(events.len(), 2);
1346        assert_eq!(events[0].kind(), "index");
1347        assert_eq!(events[1].kind(), "summary");
1348
1349        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1350            let variation_key = VariationKey {
1351                version: Some(1),
1352                variation: Some(1),
1353            };
1354
1355            let feature = event_summary.features.get("myFlag");
1356            assert!(feature.is_some());
1357
1358            let feature = feature.unwrap();
1359            assert!(feature.counters.contains_key(&variation_key));
1360        } else {
1361            panic!("Event should be a summary type");
1362        }
1363    }
1364
1365    #[test]
1366    fn variation_detail_handles_offline_mode() {
1367        let (client, event_rx) = make_mocked_offline_client();
1368        client.start_with_default_executor();
1369
1370        let context = ContextBuilder::new("bob")
1371            .build()
1372            .expect("Failed to create context");
1373
1374        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1375
1376        assert!(!detail.value.unwrap().as_bool().unwrap());
1377        assert!(matches!(
1378            detail.reason,
1379            Reason::Error {
1380                error: eval::Error::ClientNotReady
1381            }
1382        ));
1383        client.flush();
1384        client.close();
1385
1386        assert_eq!(event_rx.iter().count(), 0);
1387    }
1388
1389    struct InMemoryPersistentDataStoreFactory {
1390        data: AllData<Flag, Segment>,
1391        initialized: bool,
1392    }
1393
1394    impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1395        fn create_persistent_data_store(
1396            &self,
1397        ) -> Result<Box<dyn PersistentDataStore + 'static>, std::io::Error> {
1398            let serialized_data =
1399                AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1400            Ok(Box::new(InMemoryPersistentDataStore {
1401                data: serialized_data,
1402                initialized: self.initialized,
1403            }))
1404        }
1405    }
1406
1407    #[test]
1408    fn variation_detail_handles_daemon_mode() {
1409        testing_logger::setup();
1410        let factory = InMemoryPersistentDataStoreFactory {
1411            data: AllData {
1412                flags: hashmap!["flag".into() => basic_flag("flag")],
1413                segments: HashMap::new(),
1414            },
1415            initialized: true,
1416        };
1417        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1418
1419        let config = ConfigBuilder::new("sdk-key")
1420            .daemon_mode(true)
1421            .data_store(&builder)
1422            .event_processor(&NullEventProcessorBuilder::new())
1423            .build()
1424            .expect("config should build");
1425
1426        let client = Client::build(config).expect("Should be built.");
1427
1428        client.start_with_default_executor();
1429
1430        let context = ContextBuilder::new("bob")
1431            .build()
1432            .expect("Failed to create context");
1433
1434        let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));
1435
1436        assert!(detail.value.unwrap().as_bool().unwrap());
1437        assert!(matches!(
1438            detail.reason,
1439            Reason::Fallthrough {
1440                in_experiment: false
1441            }
1442        ));
1443        client.flush();
1444        client.close();
1445
1446        testing_logger::validate(|captured_logs| {
1447            assert_eq!(captured_logs.len(), 1);
1448            assert_eq!(
1449                captured_logs[0].body,
1450                "Started LaunchDarkly Client in daemon mode"
1451            );
1452        });
1453    }
1454
1455    #[test]
1456    fn daemon_mode_is_quiet_if_store_is_not_initialized() {
1457        testing_logger::setup();
1458
1459        let factory = InMemoryPersistentDataStoreFactory {
1460            data: AllData {
1461                flags: HashMap::new(),
1462                segments: HashMap::new(),
1463            },
1464            initialized: false,
1465        };
1466        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1467
1468        let config = ConfigBuilder::new("sdk-key")
1469            .daemon_mode(true)
1470            .data_store(&builder)
1471            .event_processor(&NullEventProcessorBuilder::new())
1472            .build()
1473            .expect("config should build");
1474
1475        let client = Client::build(config).expect("Should be built.");
1476
1477        client.start_with_default_executor();
1478
1479        let context = ContextBuilder::new("bob")
1480            .build()
1481            .expect("Failed to create context");
1482
1483        client.variation_detail(&context, "flag", FlagValue::Bool(false));
1484
1485        testing_logger::validate(|captured_logs| {
1486            assert_eq!(captured_logs.len(), 1);
1487            assert_eq!(
1488                captured_logs[0].body,
1489                "Started LaunchDarkly Client in daemon mode"
1490            );
1491        });
1492    }
1493
1494    #[tokio::test]
1495    async fn variation_handles_off_flag_without_variation() {
1496        let td = TestData::new();
1497        td.use_preconfigured_flag(basic_off_flag("myFlag"));
1498        let (client, event_rx) = make_client_with_test_data(&td);
1499        client.start_with_default_executor();
1500
1501        let context = ContextBuilder::new("bob")
1502            .build()
1503            .expect("Failed to create context");
1504
1505        let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1506
1507        assert!(!result.as_bool().unwrap());
1508        client.flush();
1509        client.close();
1510
1511        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1512        assert_eq!(events.len(), 2);
1513        assert_eq!(events[0].kind(), "index");
1514        assert_eq!(events[1].kind(), "summary");
1515
1516        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1517            let variation_key = VariationKey {
1518                version: Some(1),
1519                variation: None,
1520            };
1521            let feature = event_summary.features.get("myFlag");
1522            assert!(feature.is_some());
1523
1524            let feature = feature.unwrap();
1525            assert!(feature.counters.contains_key(&variation_key));
1526        } else {
1527            panic!("Event should be a summary type");
1528        }
1529    }
1530
1531    #[tokio::test]
1532    async fn variation_detail_tracks_prereq_events_correctly() {
1533        let td = TestData::new();
1534        let mut prereq_flag = basic_flag("prereqFlag");
1535        prereq_flag.track_events = true;
1536        td.use_preconfigured_flag(prereq_flag);
1537
1538        let mut main_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1539        main_flag.track_events = true;
1540        td.use_preconfigured_flag(main_flag);
1541
1542        let (client, event_rx) = make_client_with_test_data(&td);
1543        client.start_with_default_executor();
1544
1545        let context = ContextBuilder::new("bob")
1546            .build()
1547            .expect("Failed to create context");
1548
1549        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1550
1551        assert!(detail.value.unwrap().as_bool().unwrap());
1552        assert!(matches!(
1553            detail.reason,
1554            Reason::Fallthrough {
1555                in_experiment: false
1556            }
1557        ));
1558        client.flush();
1559        client.close();
1560
1561        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1562        assert_eq!(events.len(), 4);
1563        assert_eq!(events[0].kind(), "index");
1564        assert_eq!(events[1].kind(), "feature");
1565        assert_eq!(events[2].kind(), "feature");
1566        assert_eq!(events[3].kind(), "summary");
1567
1568        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1569            let variation_key = VariationKey {
1570                version: Some(1),
1571                variation: Some(1),
1572            };
1573            let feature = event_summary.features.get("myFlag");
1574            assert!(feature.is_some());
1575
1576            let feature = feature.unwrap();
1577            assert!(feature.counters.contains_key(&variation_key));
1578
1579            let variation_key = VariationKey {
1580                version: Some(1),
1581                variation: Some(1),
1582            };
1583            let feature = event_summary.features.get("prereqFlag");
1584            assert!(feature.is_some());
1585
1586            let feature = feature.unwrap();
1587            assert!(feature.counters.contains_key(&variation_key));
1588        }
1589    }
1590
1591    #[tokio::test]
1592    async fn variation_handles_failed_prereqs_correctly() {
1593        let td = TestData::new();
1594        let mut prereq_flag = basic_off_flag("prereqFlag");
1595        prereq_flag.track_events = true;
1596        td.use_preconfigured_flag(prereq_flag);
1597
1598        let mut main_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1599        main_flag.track_events = true;
1600        td.use_preconfigured_flag(main_flag);
1601
1602        let (client, event_rx) = make_client_with_test_data(&td);
1603        client.start_with_default_executor();
1604
1605        let context = ContextBuilder::new("bob")
1606            .build()
1607            .expect("Failed to create context");
1608
1609        let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1610
1611        assert!(!detail.as_bool().unwrap());
1612        client.flush();
1613        client.close();
1614
1615        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1616        assert_eq!(events.len(), 4);
1617        assert_eq!(events[0].kind(), "index");
1618        assert_eq!(events[1].kind(), "feature");
1619        assert_eq!(events[2].kind(), "feature");
1620        assert_eq!(events[3].kind(), "summary");
1621
1622        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1623            let variation_key = VariationKey {
1624                version: Some(1),
1625                variation: Some(0),
1626            };
1627            let feature = event_summary.features.get("myFlag");
1628            assert!(feature.is_some());
1629
1630            let feature = feature.unwrap();
1631            assert!(feature.counters.contains_key(&variation_key));
1632
1633            let variation_key = VariationKey {
1634                version: Some(1),
1635                variation: None,
1636            };
1637            let feature = event_summary.features.get("prereqFlag");
1638            assert!(feature.is_some());
1639
1640            let feature = feature.unwrap();
1641            assert!(feature.counters.contains_key(&variation_key));
1642        }
1643    }
1644
1645    #[test]
1646    fn variation_detail_handles_flag_not_found() {
1647        let (client, event_rx) = make_mocked_client();
1648        client.start_with_default_executor();
1649
1650        let context = ContextBuilder::new("bob")
1651            .build()
1652            .expect("Failed to create context");
1653        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1654
1655        assert!(!detail.value.unwrap().as_bool().unwrap());
1656        assert!(matches!(
1657            detail.reason,
1658            Reason::Error {
1659                error: eval::Error::FlagNotFound
1660            }
1661        ));
1662        client.flush();
1663        client.close();
1664
1665        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1666        assert_eq!(events.len(), 2);
1667        assert_eq!(events[0].kind(), "index");
1668        assert_eq!(events[1].kind(), "summary");
1669
1670        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1671            let variation_key = VariationKey {
1672                version: None,
1673                variation: None,
1674            };
1675            let feature = event_summary.features.get("non-existent-flag");
1676            assert!(feature.is_some());
1677
1678            let feature = feature.unwrap();
1679            assert!(feature.counters.contains_key(&variation_key));
1680        } else {
1681            panic!("Event should be a summary type");
1682        }
1683    }
1684
1685    #[tokio::test]
1686    async fn variation_detail_handles_client_not_ready() {
1687        let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1688        client.start_with_default_executor();
1689        let context = ContextBuilder::new("bob")
1690            .build()
1691            .expect("Failed to create context");
1692
1693        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1694
1695        assert!(!detail.value.unwrap().as_bool().unwrap());
1696        assert!(matches!(
1697            detail.reason,
1698            Reason::Error {
1699                error: eval::Error::ClientNotReady
1700            }
1701        ));
1702        client.flush();
1703        client.close();
1704
1705        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1706        assert_eq!(events.len(), 2);
1707        assert_eq!(events[0].kind(), "index");
1708        assert_eq!(events[1].kind(), "summary");
1709
1710        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1711            let variation_key = VariationKey {
1712                version: None,
1713                variation: None,
1714            };
1715            let feature = event_summary.features.get("non-existent-flag");
1716            assert!(feature.is_some());
1717
1718            let feature = feature.unwrap();
1719            assert!(feature.counters.contains_key(&variation_key));
1720        } else {
1721            panic!("Event should be a summary type");
1722        }
1723    }
1724
1725    #[test]
1726    fn identify_sends_identify_event() {
1727        let (client, event_rx) = make_mocked_client();
1728        client.start_with_default_executor();
1729
1730        let context = ContextBuilder::new("bob")
1731            .build()
1732            .expect("Failed to create context");
1733
1734        client.identify(context);
1735        client.flush();
1736        client.close();
1737
1738        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1739        assert_eq!(events.len(), 1);
1740        assert_eq!(events[0].kind(), "identify");
1741    }
1742
1743    #[test]
1744    fn identify_sends_sends_nothing_in_offline_mode() {
1745        let (client, event_rx) = make_mocked_offline_client();
1746        client.start_with_default_executor();
1747
1748        let context = ContextBuilder::new("bob")
1749            .build()
1750            .expect("Failed to create context");
1751
1752        client.identify(context);
1753        client.flush();
1754        client.close();
1755
1756        assert_eq!(event_rx.iter().count(), 0);
1757    }
1758
1759    #[test]
1760    #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
1761    fn secure_mode_hash() {
1762        let config = ConfigBuilder::new("secret")
1763            .offline(true)
1764            .build()
1765            .expect("config should build");
1766        let client = Client::build(config).expect("Should be built.");
1767        let context = ContextBuilder::new("Message")
1768            .build()
1769            .expect("Failed to create context");
1770
1771        assert_eq!(
1772            client
1773                .secure_mode_hash(&context)
1774                .expect("Hash should be computed"),
1775            "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
1776        );
1777    }
1778
1779    #[test]
1780    #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
1781    fn secure_mode_hash_with_multi_kind() {
1782        let config = ConfigBuilder::new("secret")
1783            .offline(true)
1784            .build()
1785            .expect("config should build");
1786        let client = Client::build(config).expect("Should be built.");
1787
1788        let org = ContextBuilder::new("org-key|1")
1789            .kind("org")
1790            .build()
1791            .expect("Failed to create context");
1792        let user = ContextBuilder::new("user-key:2")
1793            .build()
1794            .expect("Failed to create context");
1795
1796        let context = MultiContextBuilder::new()
1797            .add_context(org)
1798            .add_context(user)
1799            .build()
1800            .expect("failed to build multi-context");
1801
1802        assert_eq!(
1803            client
1804                .secure_mode_hash(&context)
1805                .expect("Hash should be computed"),
1806            "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
1807        );
1808    }
1809
1810    #[derive(Serialize)]
1811    struct MyCustomData {
1812        pub answer: u32,
1813    }
1814
1815    #[test]
1816    fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1817        let (client, event_rx) = make_mocked_client();
1818        client.start_with_default_executor();
1819
1820        let context = ContextBuilder::new("bob")
1821            .build()
1822            .expect("Failed to create context");
1823
1824        client.track_event(context.clone(), "event-with-null");
1825        client.track_data(context.clone(), "event-with-string", "string-data")?;
1826        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1827        client.track_data(
1828            context.clone(),
1829            "event-with-struct",
1830            MyCustomData { answer: 42 },
1831        )?;
1832        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1833
1834        client.flush();
1835        client.close();
1836
1837        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1838        assert_eq!(events.len(), 6);
1839
1840        let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1841        for event in events {
1842            if let Some(count) = events_by_type.get_mut(event.kind()) {
1843                *count += 1;
1844            } else {
1845                events_by_type.insert(event.kind(), 1);
1846            }
1847        }
1848        assert!(matches!(events_by_type.get("index"), Some(1)));
1849        assert!(matches!(events_by_type.get("custom"), Some(5)));
1850
1851        Ok(())
1852    }
1853
1854    #[test]
1855    fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
1856        let (client, event_rx) = make_mocked_offline_client();
1857        client.start_with_default_executor();
1858
1859        let context = ContextBuilder::new("bob")
1860            .build()
1861            .expect("Failed to create context");
1862
1863        client.track_event(context.clone(), "event-with-null");
1864        client.track_data(context.clone(), "event-with-string", "string-data")?;
1865        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1866        client.track_data(
1867            context.clone(),
1868            "event-with-struct",
1869            MyCustomData { answer: 42 },
1870        )?;
1871        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1872
1873        client.flush();
1874        client.close();
1875
1876        assert_eq!(event_rx.iter().count(), 0);
1877
1878        Ok(())
1879    }
1880
1881    #[test]
1882    fn migration_handles_flag_not_found() {
1883        let (client, _event_rx) = make_mocked_client();
1884        client.start_with_default_executor();
1885
1886        let context = ContextBuilder::new("bob")
1887            .build()
1888            .expect("Failed to create context");
1889
1890        let (stage, _tracker) =
1891            client.migration_variation(&context, "non-existent-flag-key", Stage::Off);
1892
1893        assert_eq!(stage, Stage::Off);
1894    }
1895
1896    #[tokio::test]
1897    async fn migration_uses_non_migration_flag() {
1898        let td = TestData::new();
1899        td.use_preconfigured_flag(basic_flag("boolean-flag"));
1900        let (client, _event_rx) = make_client_with_test_data(&td);
1901        client.start_with_default_executor();
1902
1903        let context = ContextBuilder::new("bob")
1904            .build()
1905            .expect("Failed to create context");
1906
1907        let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1908
1909        assert_eq!(stage, Stage::Off);
1910    }
1911
1912    #[test_case(Stage::Off)]
1913    #[test_case(Stage::DualWrite)]
1914    #[test_case(Stage::Shadow)]
1915    #[test_case(Stage::Live)]
1916    #[test_case(Stage::Rampdown)]
1917    #[test_case(Stage::Complete)]
1918    #[tokio::test]
1919    async fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1920        let td = TestData::new();
1921        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
1922        let (client, _event_rx) = make_client_with_test_data(&td);
1923        client.start_with_default_executor();
1924
1925        let context = ContextBuilder::new("bob")
1926            .build()
1927            .expect("Failed to create context");
1928
1929        let (evaluated_stage, _tracker) =
1930            client.migration_variation(&context, "stage-flag", Stage::Off);
1931
1932        assert_eq!(evaluated_stage, stage);
1933    }
1934
1935    #[tokio::test]
1936    async fn migration_tracks_invoked_correctly() {
1937        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
1938            .await;
1939        migration_tracks_invoked_correctly_driver(
1940            Stage::DualWrite,
1941            Operation::Read,
1942            vec![Origin::Old],
1943        )
1944        .await;
1945        migration_tracks_invoked_correctly_driver(
1946            Stage::Shadow,
1947            Operation::Read,
1948            vec![Origin::Old, Origin::New],
1949        )
1950        .await;
1951        migration_tracks_invoked_correctly_driver(
1952            Stage::Live,
1953            Operation::Read,
1954            vec![Origin::Old, Origin::New],
1955        )
1956        .await;
1957        migration_tracks_invoked_correctly_driver(
1958            Stage::Rampdown,
1959            Operation::Read,
1960            vec![Origin::New],
1961        )
1962        .await;
1963        migration_tracks_invoked_correctly_driver(
1964            Stage::Complete,
1965            Operation::Read,
1966            vec![Origin::New],
1967        )
1968        .await;
1969        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
1970            .await;
1971        migration_tracks_invoked_correctly_driver(
1972            Stage::DualWrite,
1973            Operation::Write,
1974            vec![Origin::Old, Origin::New],
1975        )
1976        .await;
1977        migration_tracks_invoked_correctly_driver(
1978            Stage::Shadow,
1979            Operation::Write,
1980            vec![Origin::Old, Origin::New],
1981        )
1982        .await;
1983        migration_tracks_invoked_correctly_driver(
1984            Stage::Live,
1985            Operation::Write,
1986            vec![Origin::Old, Origin::New],
1987        )
1988        .await;
1989        migration_tracks_invoked_correctly_driver(
1990            Stage::Rampdown,
1991            Operation::Write,
1992            vec![Origin::Old, Origin::New],
1993        )
1994        .await;
1995        migration_tracks_invoked_correctly_driver(
1996            Stage::Complete,
1997            Operation::Write,
1998            vec![Origin::New],
1999        )
2000        .await;
2001    }
2002
2003    async fn migration_tracks_invoked_correctly_driver(
2004        stage: Stage,
2005        operation: Operation,
2006        origins: Vec<Origin>,
2007    ) {
2008        let td = TestData::new();
2009        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2010        let (client, event_rx) = make_client_with_test_data(&td);
2011        let client = Arc::new(client);
2012        client.start_with_default_executor();
2013
2014        let mut migrator = MigratorBuilder::new(client.clone())
2015            .read(
2016                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2017                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2018                Some(|_, _| true),
2019            )
2020            .write(
2021                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2022                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2023            )
2024            .build()
2025            .expect("migrator should build");
2026
2027        let context = ContextBuilder::new("bob")
2028            .build()
2029            .expect("Failed to create context");
2030
2031        if let Operation::Read = operation {
2032            migrator
2033                .read(
2034                    &context,
2035                    "stage-flag".into(),
2036                    Stage::Off,
2037                    serde_json::Value::Null,
2038                )
2039                .await;
2040        } else {
2041            migrator
2042                .write(
2043                    &context,
2044                    "stage-flag".into(),
2045                    Stage::Off,
2046                    serde_json::Value::Null,
2047                )
2048                .await;
2049        }
2050
2051        client.flush();
2052        client.close();
2053
2054        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2055        assert_eq!(events.len(), 3);
2056        match &events[1] {
2057            OutputEvent::MigrationOp(event) => {
2058                assert!(event.invoked.len() == origins.len());
2059                assert!(event.invoked.iter().all(|i| origins.contains(i)));
2060            }
2061            _ => panic!("Expected migration event"),
2062        }
2063    }
2064
2065    #[tokio::test]
2066    async fn migration_tracks_latency() {
2067        migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2068        migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2069        migration_tracks_latency_driver(
2070            Stage::Shadow,
2071            Operation::Read,
2072            vec![Origin::Old, Origin::New],
2073        )
2074        .await;
2075        migration_tracks_latency_driver(
2076            Stage::Live,
2077            Operation::Read,
2078            vec![Origin::Old, Origin::New],
2079        )
2080        .await;
2081        migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2082        migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2083        migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2084        migration_tracks_latency_driver(
2085            Stage::DualWrite,
2086            Operation::Write,
2087            vec![Origin::Old, Origin::New],
2088        )
2089        .await;
2090        migration_tracks_latency_driver(
2091            Stage::Shadow,
2092            Operation::Write,
2093            vec![Origin::Old, Origin::New],
2094        )
2095        .await;
2096        migration_tracks_latency_driver(
2097            Stage::Live,
2098            Operation::Write,
2099            vec![Origin::Old, Origin::New],
2100        )
2101        .await;
2102        migration_tracks_latency_driver(
2103            Stage::Rampdown,
2104            Operation::Write,
2105            vec![Origin::Old, Origin::New],
2106        )
2107        .await;
2108        migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2109    }
2110
2111    async fn migration_tracks_latency_driver(
2112        stage: Stage,
2113        operation: Operation,
2114        origins: Vec<Origin>,
2115    ) {
2116        let td = TestData::new();
2117        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2118        let (client, event_rx) = make_client_with_test_data(&td);
2119        let client = Arc::new(client);
2120        client.start_with_default_executor();
2121
2122        let mut migrator = MigratorBuilder::new(client.clone())
2123            .track_latency(true)
2124            .read(
2125                |_| {
2126                    async move {
2127                        async_std::task::sleep(Duration::from_millis(100)).await;
2128                        Ok(serde_json::Value::Null)
2129                    }
2130                    .boxed()
2131                },
2132                |_| {
2133                    async move {
2134                        async_std::task::sleep(Duration::from_millis(100)).await;
2135                        Ok(serde_json::Value::Null)
2136                    }
2137                    .boxed()
2138                },
2139                Some(|_, _| true),
2140            )
2141            .write(
2142                |_| {
2143                    async move {
2144                        async_std::task::sleep(Duration::from_millis(100)).await;
2145                        Ok(serde_json::Value::Null)
2146                    }
2147                    .boxed()
2148                },
2149                |_| {
2150                    async move {
2151                        async_std::task::sleep(Duration::from_millis(100)).await;
2152                        Ok(serde_json::Value::Null)
2153                    }
2154                    .boxed()
2155                },
2156            )
2157            .build()
2158            .expect("migrator should build");
2159
2160        let context = ContextBuilder::new("bob")
2161            .build()
2162            .expect("Failed to create context");
2163
2164        if let Operation::Read = operation {
2165            migrator
2166                .read(
2167                    &context,
2168                    "stage-flag".into(),
2169                    Stage::Off,
2170                    serde_json::Value::Null,
2171                )
2172                .await;
2173        } else {
2174            migrator
2175                .write(
2176                    &context,
2177                    "stage-flag".into(),
2178                    Stage::Off,
2179                    serde_json::Value::Null,
2180                )
2181                .await;
2182        }
2183
2184        client.flush();
2185        client.close();
2186
2187        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2188        assert_eq!(events.len(), 3);
2189        match &events[1] {
2190            OutputEvent::MigrationOp(event) => {
2191                assert!(event.latency.len() == origins.len());
2192                assert!(event
2193                    .latency
2194                    .values()
2195                    .all(|l| l > &Duration::from_millis(100)));
2196            }
2197            _ => panic!("Expected migration event"),
2198        }
2199    }
2200
2201    #[tokio::test]
2202    async fn migration_tracks_read_errors() {
2203        migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2204        migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2205        migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2206        migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2207        migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2208        migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2209    }
2210
2211    async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2212        let td = TestData::new();
2213        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2214        let (client, event_rx) = make_client_with_test_data(&td);
2215        let client = Arc::new(client);
2216        client.start_with_default_executor();
2217
2218        let mut migrator = MigratorBuilder::new(client.clone())
2219            .track_latency(true)
2220            .read(
2221                |_| async move { Err("fail".into()) }.boxed(),
2222                |_| async move { Err("fail".into()) }.boxed(),
2223                Some(|_: &String, _: &String| true),
2224            )
2225            .write(
2226                |_| async move { Err("fail".into()) }.boxed(),
2227                |_| async move { Err("fail".into()) }.boxed(),
2228            )
2229            .build()
2230            .expect("migrator should build");
2231
2232        let context = ContextBuilder::new("bob")
2233            .build()
2234            .expect("Failed to create context");
2235
2236        migrator
2237            .read(
2238                &context,
2239                "stage-flag".into(),
2240                Stage::Off,
2241                serde_json::Value::Null,
2242            )
2243            .await;
2244        client.flush();
2245        client.close();
2246
2247        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2248        assert_eq!(events.len(), 3);
2249        match &events[1] {
2250            OutputEvent::MigrationOp(event) => {
2251                assert!(event.errors.len() == origins.len());
2252                assert!(event.errors.iter().all(|i| origins.contains(i)));
2253            }
2254            _ => panic!("Expected migration event"),
2255        }
2256    }
2257
2258    #[tokio::test]
2259    async fn migration_tracks_authoritative_write_errors() {
2260        migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2261        migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2262            .await;
2263        migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2264        migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2265        migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2266            .await;
2267        migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2268            .await;
2269    }
2270
2271    async fn migration_tracks_authoritative_write_errors_driver(
2272        stage: Stage,
2273        origins: Vec<Origin>,
2274    ) {
2275        let td = TestData::new();
2276        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2277        let (client, event_rx) = make_client_with_test_data(&td);
2278        let client = Arc::new(client);
2279        client.start_with_default_executor();
2280
2281        let mut migrator = MigratorBuilder::new(client.clone())
2282            .track_latency(true)
2283            .read(
2284                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2285                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2286                None,
2287            )
2288            .write(
2289                |_| async move { Err("fail".into()) }.boxed(),
2290                |_| async move { Err("fail".into()) }.boxed(),
2291            )
2292            .build()
2293            .expect("migrator should build");
2294
2295        let context = ContextBuilder::new("bob")
2296            .build()
2297            .expect("Failed to create context");
2298
2299        migrator
2300            .write(
2301                &context,
2302                "stage-flag".into(),
2303                Stage::Off,
2304                serde_json::Value::Null,
2305            )
2306            .await;
2307
2308        client.flush();
2309        client.close();
2310
2311        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2312        assert_eq!(events.len(), 3);
2313        match &events[1] {
2314            OutputEvent::MigrationOp(event) => {
2315                assert!(event.errors.len() == origins.len());
2316                assert!(event.errors.iter().all(|i| origins.contains(i)));
2317            }
2318            _ => panic!("Expected migration event"),
2319        }
2320    }
2321
2322    #[tokio::test]
2323    async fn migration_tracks_nonauthoritative_write_errors() {
2324        migration_tracks_nonauthoritative_write_errors_driver(
2325            Stage::DualWrite,
2326            false,
2327            true,
2328            vec![Origin::New],
2329        )
2330        .await;
2331        migration_tracks_nonauthoritative_write_errors_driver(
2332            Stage::Shadow,
2333            false,
2334            true,
2335            vec![Origin::New],
2336        )
2337        .await;
2338        migration_tracks_nonauthoritative_write_errors_driver(
2339            Stage::Live,
2340            true,
2341            false,
2342            vec![Origin::Old],
2343        )
2344        .await;
2345        migration_tracks_nonauthoritative_write_errors_driver(
2346            Stage::Rampdown,
2347            true,
2348            false,
2349            vec![Origin::Old],
2350        )
2351        .await;
2352    }
2353
2354    async fn migration_tracks_nonauthoritative_write_errors_driver(
2355        stage: Stage,
2356        fail_old: bool,
2357        fail_new: bool,
2358        origins: Vec<Origin>,
2359    ) {
2360        let td = TestData::new();
2361        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2362        let (client, event_rx) = make_client_with_test_data(&td);
2363        let client = Arc::new(client);
2364        client.start_with_default_executor();
2365
2366        let mut migrator = MigratorBuilder::new(client.clone())
2367            .track_latency(true)
2368            .read(
2369                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2370                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2371                None,
2372            )
2373            .write(
2374                move |_| {
2375                    async move {
2376                        if fail_old {
2377                            Err("fail".into())
2378                        } else {
2379                            Ok(serde_json::Value::Null)
2380                        }
2381                    }
2382                    .boxed()
2383                },
2384                move |_| {
2385                    async move {
2386                        if fail_new {
2387                            Err("fail".into())
2388                        } else {
2389                            Ok(serde_json::Value::Null)
2390                        }
2391                    }
2392                    .boxed()
2393                },
2394            )
2395            .build()
2396            .expect("migrator should build");
2397
2398        let context = ContextBuilder::new("bob")
2399            .build()
2400            .expect("Failed to create context");
2401
2402        migrator
2403            .write(
2404                &context,
2405                "stage-flag".into(),
2406                Stage::Off,
2407                serde_json::Value::Null,
2408            )
2409            .await;
2410
2411        client.flush();
2412        client.close();
2413
2414        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2415        assert_eq!(events.len(), 3);
2416        match &events[1] {
2417            OutputEvent::MigrationOp(event) => {
2418                assert!(event.errors.len() == origins.len());
2419                assert!(event.errors.iter().all(|i| origins.contains(i)));
2420            }
2421            _ => panic!("Expected migration event"),
2422        }
2423    }
2424
2425    #[tokio::test]
2426    async fn migration_tracks_consistency() {
2427        migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2428        migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2429        migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2430        migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2431    }
2432
2433    async fn migration_tracks_consistency_driver(
2434        stage: Stage,
2435        old_return: &'static str,
2436        new_return: &'static str,
2437        expected_consistency: bool,
2438    ) {
2439        let td = TestData::new();
2440        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2441        let (client, event_rx) = make_client_with_test_data(&td);
2442        let client = Arc::new(client);
2443        client.start_with_default_executor();
2444
2445        let mut migrator = MigratorBuilder::new(client.clone())
2446            .track_latency(true)
2447            .read(
2448                |_| {
2449                    async move {
2450                        async_std::task::sleep(Duration::from_millis(100)).await;
2451                        Ok(serde_json::Value::String(old_return.to_string()))
2452                    }
2453                    .boxed()
2454                },
2455                |_| {
2456                    async move {
2457                        async_std::task::sleep(Duration::from_millis(100)).await;
2458                        Ok(serde_json::Value::String(new_return.to_string()))
2459                    }
2460                    .boxed()
2461                },
2462                Some(|lhs, rhs| lhs == rhs),
2463            )
2464            .write(
2465                |_| {
2466                    async move {
2467                        async_std::task::sleep(Duration::from_millis(100)).await;
2468                        Ok(serde_json::Value::Null)
2469                    }
2470                    .boxed()
2471                },
2472                |_| {
2473                    async move {
2474                        async_std::task::sleep(Duration::from_millis(100)).await;
2475                        Ok(serde_json::Value::Null)
2476                    }
2477                    .boxed()
2478                },
2479            )
2480            .build()
2481            .expect("migrator should build");
2482
2483        let context = ContextBuilder::new("bob")
2484            .build()
2485            .expect("Failed to create context");
2486
2487        migrator
2488            .read(
2489                &context,
2490                "stage-flag".into(),
2491                Stage::Off,
2492                serde_json::Value::Null,
2493            )
2494            .await;
2495
2496        client.flush();
2497        client.close();
2498
2499        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2500        assert_eq!(events.len(), 3);
2501        match &events[1] {
2502            OutputEvent::MigrationOp(event) => {
2503                assert!(event.consistency_check == Some(expected_consistency))
2504            }
2505            _ => panic!("Expected migration event"),
2506        }
2507    }
2508
2509    #[tokio::test]
2510    async fn client_flush_blocking_completes_successfully() {
2511        let (client, event_rx) = make_mocked_client();
2512        client.start_with_default_executor();
2513        client.wait_for_initialization(Duration::from_secs(1)).await;
2514
2515        let context = ContextBuilder::new("user-key")
2516            .build()
2517            .expect("Failed to create context");
2518
2519        client.identify(context);
2520
2521        let result = client.flush_blocking(Duration::from_secs(5)).await;
2522        assert!(result, "flush_blocking should complete successfully");
2523
2524        client.close();
2525
2526        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2527        assert!(!events.is_empty(), "Should have received identify event");
2528    }
2529
2530    #[tokio::test]
2531    async fn client_flush_blocking_with_zero_timeout() {
2532        let (client, event_rx) = make_mocked_client();
2533        client.start_with_default_executor();
2534        client.wait_for_initialization(Duration::from_secs(1)).await;
2535
2536        let context = ContextBuilder::new("user-key")
2537            .build()
2538            .expect("Failed to create context");
2539
2540        client.identify(context);
2541
2542        let result = client.flush_blocking(Duration::ZERO).await;
2543        assert!(
2544            result,
2545            "flush_blocking with zero timeout should complete successfully"
2546        );
2547
2548        client.close();
2549
2550        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2551        assert!(!events.is_empty(), "Should have received identify event");
2552    }
2553
2554    #[tokio::test]
2555    async fn client_flush_blocking_with_no_events() {
2556        let (client, _event_rx) = make_mocked_client();
2557        client.start_with_default_executor();
2558        client.wait_for_initialization(Duration::from_secs(1)).await;
2559
2560        let result = client.flush_blocking(Duration::from_secs(1)).await;
2561        assert!(
2562            result,
2563            "flush_blocking with no events should complete immediately"
2564        );
2565
2566        client.close();
2567    }
2568
2569    #[tokio::test]
2570    async fn client_flush_blocking_multiple_concurrent_calls() {
2571        let (client, event_rx) = make_mocked_client();
2572        client.start_with_default_executor();
2573        client.wait_for_initialization(Duration::from_secs(1)).await;
2574
2575        let context = ContextBuilder::new("user-key")
2576            .build()
2577            .expect("Failed to create context");
2578
2579        client.identify(context);
2580
2581        // Make multiple concurrent flush_blocking calls
2582        let (result1, result2, result3) = tokio::join!(
2583            client.flush_blocking(Duration::from_secs(5)),
2584            client.flush_blocking(Duration::from_secs(5)),
2585            client.flush_blocking(Duration::from_secs(5)),
2586        );
2587
2588        assert!(result1, "First flush_blocking should succeed");
2589        assert!(result2, "Second flush_blocking should succeed");
2590        assert!(result3, "Third flush_blocking should succeed");
2591
2592        client.close();
2593
2594        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2595        assert!(!events.is_empty(), "Should have received identify event");
2596    }
2597
2598    fn make_mocked_client_with_delay(
2599        delay: u64,
2600        offline: bool,
2601        daemon_mode: bool,
2602    ) -> (Client, Receiver<OutputEvent>) {
2603        let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2604        let (event_sender, event_rx) = create_event_sender();
2605
2606        let config = ConfigBuilder::new("sdk-key")
2607            .offline(offline)
2608            .daemon_mode(daemon_mode)
2609            .data_source(MockDataSourceBuilder::new().data_source(updates))
2610            .event_processor(
2611                EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new()
2612                    .event_sender(Arc::new(event_sender)),
2613            )
2614            .build()
2615            .expect("config should build");
2616
2617        let client = Client::build(config).expect("Should be built.");
2618
2619        (client, event_rx)
2620    }
2621
2622    fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2623        make_mocked_client_with_delay(0, true, false)
2624    }
2625
2626    fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2627        make_mocked_client_with_delay(0, false, false)
2628    }
2629
2630    fn make_client_with_test_data(td: &TestData) -> (Client, Receiver<OutputEvent>) {
2631        let (event_sender, event_rx) = create_event_sender();
2632        let config = ConfigBuilder::new("sdk-key")
2633            .data_source(td)
2634            .event_processor(
2635                EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new()
2636                    .event_sender(Arc::new(event_sender)),
2637            )
2638            .build()
2639            .expect("config should build");
2640        let client = Client::build(config).expect("Should be built.");
2641        (client, event_rx)
2642    }
2643
2644    #[test]
2645    fn client_builds_successfully() {
2646        let config = ConfigBuilder::new("sdk-key")
2647            .offline(true)
2648            .build()
2649            .expect("config should build");
2650
2651        let client = Client::build(config).expect("client should build successfully");
2652
2653        assert!(
2654            !client.started.load(Ordering::SeqCst),
2655            "client should not be started yet"
2656        );
2657        assert!(client.offline, "client should be in offline mode");
2658        assert_eq!(client.sdk_key, "sdk-key", "sdk_key should match");
2659    }
2660}