Skip to main content

launchdarkly_server_sdk/
client.rs

1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::DataSource;
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
/// Groups the event settings used for one flavour of evaluation.
///
/// [Client] holds two of these: `events_default` and `events_with_reasons`. They differ in the
/// boolean passed to `EventFactory::new` (`false` and `true` respectively).
struct EventsScope {
    /// When `true`, methods such as `identify` and `track` return without creating an event.
    disabled: bool,
    /// Factory used to create the events emitted through this scope.
    event_factory: EventFactory,
    /// Recorder for prerequisite evaluations.
    /// NOTE(review): presumably handed to the evaluator by `variation_internal` — confirm.
    prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
}
32
/// Reports prerequisite flag evaluations by converting them into evaluation events and sending
/// them to an event processor.
struct PrerequisiteEventRecorder {
    /// Factory used to build the evaluation event for each recorded prerequisite.
    event_factory: EventFactory,
    /// Processor that receives the generated events.
    event_processor: Arc<dyn EventProcessor>,
}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39    fn record(&self, event: PrerequisiteEvent) {
40        let evt = self.event_factory.new_eval_event(
41            &event.prerequisite_flag.key,
42            event.context.clone(),
43            &event.prerequisite_flag,
44            event.prerequisite_result,
45            FlagValue::Json(serde_json::Value::Null),
46            Some(event.target_flag_key),
47        );
48
49        self.event_processor.send(evt);
50    }
51}
52
/// Error type used to represent failures when building a [Client] instance.
///
/// Errors raised by the data source, data store, event processor and config builders are all
/// converted into [BuildError::InvalidConfig], keeping only their display message.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum BuildError {
    /// Error used when a configuration setting is invalid. This typically indicates an invalid URL.
    ///
    /// The payload is the human-readable message of the underlying builder error.
    #[error("invalid client config: {0}")]
    InvalidConfig(String),
}
61
62impl From<DataSourceError> for BuildError {
63    fn from(error: DataSourceError) -> Self {
64        Self::InvalidConfig(error.to_string())
65    }
66}
67
68impl From<DataStoreError> for BuildError {
69    fn from(error: DataStoreError) -> Self {
70        Self::InvalidConfig(error.to_string())
71    }
72}
73
74impl From<EventProcessorError> for BuildError {
75    fn from(error: EventProcessorError) -> Self {
76        Self::InvalidConfig(error.to_string())
77    }
78}
79
80impl From<ConfigBuildError> for BuildError {
81    fn from(error: ConfigBuildError) -> Self {
82        Self::InvalidConfig(error.to_string())
83    }
84}
85
/// Error type used to represent failures when starting the [Client].
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum StartError {
    /// Error used when spawning a background thread fails.
    ///
    /// Wraps the [io::Error] reported while creating the runtime in
    /// [Client::start_with_runtime].
    #[error("couldn't spawn background thread for client: {0}")]
    SpawnFailed(io::Error),
}
94
/// Initialization progress of the client.
///
/// The explicit discriminants are the values stored in the client's `AtomicUsize` state.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ClientInitState {
    /// The data source has not reported a result yet.
    Initializing = 0,
    /// The data source reported success.
    Initialized = 1,
    /// The data source reported failure.
    InitializationFailed = 2,
}

/// Allows a state to be compared directly with the raw value loaded from the atomic.
impl PartialEq<usize> for ClientInitState {
    fn eq(&self, other: &usize) -> bool {
        let discriminant = *self as usize;
        discriminant == *other
    }
}

/// Converts a raw discriminant back into a state.
///
/// # Panics
///
/// Panics (via `unreachable!`) when `val` is not `0`, `1` or `2`.
impl From<usize> for ClientInitState {
    fn from(val: usize) -> Self {
        match val {
            0 => Self::Initializing,
            1 => Self::Initialized,
            2 => Self::InitializationFailed,
            _ => unreachable!(),
        }
    }
}
118
/// A client for the LaunchDarkly API.
///
/// In order to create a client instance, first create a config using [crate::ConfigBuilder].
///
/// # Examples
///
/// Creating a client, with default configuration.
/// ```
/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, BuildError};
/// # fn main() -> Result<(), BuildError> {
///     let ld_client = Client::build(ConfigBuilder::new("sdk-key").build()?)?;
/// #   Ok(())
/// # }
/// ```
///
/// Creating an instance which connects to a relay proxy.
/// ```
/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, ServiceEndpointsBuilder, BuildError};
/// # fn main() -> Result<(), BuildError> {
///     let ld_client = Client::build(ConfigBuilder::new("sdk-key")
///         .service_endpoints(ServiceEndpointsBuilder::new()
///             .relay_proxy("http://my-relay-hostname:8080")
///         ).build()?
///     )?;
/// #   Ok(())
/// # }
/// ```
///
/// Each builder type includes usage examples for the builder.
pub struct Client {
    /// Processor that is flushed by [Client::flush] and closed by [Client::close].
    event_processor: Arc<dyn EventProcessor>,
    /// Data source that is subscribed to the data store when the client is started.
    data_source: Arc<dyn DataSource>,
    /// Store read when evaluating all flags; shared with the data source on start.
    data_store: Arc<RwLock<dyn DataStore>>,
    /// Event settings used by [Client::variation], [Client::identify] and the `track_*` methods.
    events_default: EventsScope,
    /// Event settings used by [Client::variation_detail].
    events_with_reasons: EventsScope,
    /// Semaphore that receives a permit each time the data source reports an initialization
    /// result; async waiters acquire it to learn that a result is available.
    init_notify: Arc<Semaphore>,
    /// Current `ClientInitState`, stored as its `usize` discriminant.
    init_state: Arc<AtomicUsize>,
    /// Set to `true` once one of the `start_*` methods has been called.
    started: AtomicBool,
    /// Copied from the config; an offline client reports itself as initialized.
    offline: bool,
    /// Copied from the config; a daemon-mode client reports itself as initialized.
    daemon_mode: bool,
    /// SDK key copied from the config. It is only read by `secure_mode_hash`, which exists only
    /// when a crypto feature is enabled, hence the conditional `dead_code` allowance.
    #[cfg_attr(
        not(any(feature = "crypto-openssl", feature = "crypto-aws-lc-rs")),
        allow(dead_code)
    )]
    sdk_key: String,
    /// Sender used by [Client::close] to signal shutdown; a receiver is handed to the data
    /// source when the client is started.
    shutdown_broadcast: broadcast::Sender<()>,
    /// Runtime created by [Client::start_with_runtime], if any; taken and dropped on close.
    runtime: RwLock<Option<Runtime>>,
}
167
168impl Client {
169    /// Create a new instance of a [Client] based on the provided [Config] parameter.
170    pub fn build(config: Config) -> Result<Self, BuildError> {
171        if config.offline() {
172            info!("Started LaunchDarkly Client in offline mode");
173        } else if config.daemon_mode() {
174            info!("Started LaunchDarkly Client in daemon mode");
175        }
176
177        let tags = config.application_tag();
178        let instance_id = config.instance_id().to_string();
179
180        let endpoints = config.service_endpoints_builder().build()?;
181
182        let mut event_processor_builder = config.event_processor_builder().to_owned();
183        event_processor_builder.set_instance_id(instance_id.clone());
184        let event_processor =
185            event_processor_builder.build(&endpoints, config.sdk_key(), tags.clone())?;
186
187        let mut data_source_builder = config.data_source_builder().to_owned();
188        data_source_builder.set_instance_id(instance_id);
189        let data_source = data_source_builder.build(&endpoints, config.sdk_key(), tags.clone())?;
190        let data_store = config.data_store_builder().build()?;
191
192        let events_default = EventsScope {
193            disabled: config.offline(),
194            event_factory: EventFactory::new(false),
195            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
196                event_factory: EventFactory::new(false),
197                event_processor: event_processor.clone(),
198            }),
199        };
200
201        let events_with_reasons = EventsScope {
202            disabled: config.offline(),
203            event_factory: EventFactory::new(true),
204            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
205                event_factory: EventFactory::new(true),
206                event_processor: event_processor.clone(),
207            }),
208        };
209
210        let (shutdown_tx, _) = broadcast::channel(1);
211
212        Ok(Client {
213            event_processor,
214            data_source,
215            data_store,
216            events_default,
217            events_with_reasons,
218            init_notify: Arc::new(Semaphore::new(0)),
219            init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
220            started: AtomicBool::new(false),
221            offline: config.offline(),
222            daemon_mode: config.daemon_mode(),
223            sdk_key: config.sdk_key().into(),
224            shutdown_broadcast: shutdown_tx,
225            runtime: RwLock::new(None),
226        })
227    }
228
229    /// Starts a client in the current thread, which must have a default tokio runtime.
230    pub fn start_with_default_executor(&self) {
231        if self.started.load(Ordering::SeqCst) {
232            return;
233        }
234        self.started.store(true, Ordering::SeqCst);
235        self.start_with_default_executor_internal();
236    }
237
238    fn start_with_default_executor_internal(&self) {
239        // These clones are going to move into the closure, we
240        // do not want to move or reference `self`, because
241        // then lifetimes will get involved.
242        let notify = self.init_notify.clone();
243        let init_state = self.init_state.clone();
244
245        self.data_source.subscribe(
246            self.data_store.clone(),
247            Arc::new(move |success| {
248                init_state.store(
249                    (if success {
250                        ClientInitState::Initialized
251                    } else {
252                        ClientInitState::InitializationFailed
253                    }) as usize,
254                    Ordering::SeqCst,
255                );
256                notify.add_permits(1);
257            }),
258            self.shutdown_broadcast.subscribe(),
259        );
260    }
261
262    /// Creates a new tokio runtime and then starts the client. Tasks from the client will
263    /// be executed on created runtime.
264    /// If your application already has a tokio runtime, then you can use
265    /// [crate::Client::start_with_default_executor] and the client will dispatch tasks to
266    /// your existing runtime.
267    pub fn start_with_runtime(&self) -> Result<bool, StartError> {
268        if self.started.load(Ordering::SeqCst) {
269            return Ok(true);
270        }
271        self.started.store(true, Ordering::SeqCst);
272
273        let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
274        let _guard = runtime.enter();
275        self.runtime.write().replace(runtime);
276
277        self.start_with_default_executor_internal();
278
279        Ok(true)
280    }
281
282    /// This is an async method that will resolve once initialization is complete or the specified
283    /// timeout has occurred.
284    ///
285    /// If the timeout is triggered, this method will return `None`. Otherwise, the method will
286    /// return a boolean indicating whether or not the SDK has successfully initialized.
287    pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
288        if timeout > Duration::from_secs(60) {
289            warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
290        }
291
292        let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
293        initialized.ok()
294    }
295
296    async fn initialized_async_internal(&self) -> bool {
297        if self.offline || self.daemon_mode {
298            return true;
299        }
300
301        // If the client is not initialized, then we need to wait for it to be initialized.
302        // Because we are using atomic types, and not a lock, then there is still the possibility
303        // that the value will change between the read and when we wait. We use a semaphore to wait,
304        // and we do not forget the permit, therefore if the permit has been added, then we will get
305        // it very quickly and reduce blocking.
306        if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
307            let _permit = self.init_notify.acquire().await;
308        }
309        ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
310    }
311
312    /// This function synchronously returns if the SDK is initialized.
313    /// In the case of unrecoverable errors in establishing a connection it is possible for the
314    /// SDK to never become initialized.
315    pub fn initialized(&self) -> bool {
316        self.offline
317            || self.daemon_mode
318            || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
319    }
320
321    /// Close shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client
322    /// should no longer be used. The method will block until all pending analytics events (if any)
323    /// been sent.
324    pub fn close(&self) {
325        self.event_processor.close();
326
327        // If the system is in offline mode or daemon mode, no receiver will be listening to this
328        // broadcast channel, so sending on it would always result in an error.
329        if !self.offline && !self.daemon_mode {
330            if let Err(e) = self.shutdown_broadcast.send(()) {
331                error!("Failed to shutdown client appropriately: {e}");
332            }
333        }
334
335        // Potentially take the runtime we created when starting the client and do nothing with it
336        // so it drops, closing out all spawned tasks.
337        self.runtime.write().take();
338    }
339
    /// Flush tells the client that all pending analytics events (if any) should be delivered as
    /// soon as possible. Flushing is asynchronous, so this method will return before it is
    /// complete. However, if you call [Client::close], events are guaranteed to be sent before
    /// that method returns.
    ///
    /// Use [Client::flush_blocking] when you need to wait for delivery to finish.
    ///
    /// For more information, see the Reference Guide:
    /// <https://docs.launchdarkly.com/sdk/features/flush#rust>.
    pub fn flush(&self) {
        // Delegates to the event processor.
        self.event_processor.flush();
    }
350
351    /// Flush tells the client that all pending analytics events should be delivered as
352    /// soon as possible, and blocks until delivery is complete or the timeout expires.
353    ///
354    /// This method is particularly useful in short-lived execution environments like AWS Lambda
355    /// where you need to ensure events are sent before the function terminates.
356    ///
357    /// This method triggers a flush of events currently buffered and waits for that specific
358    /// flush to complete. Note that if periodic flushes or other flush operations are in-flight
359    /// when this is called, those may still be completing after this method returns.
360    ///
361    /// # Arguments
362    ///
363    /// * `timeout` - Maximum time to wait for flush to complete. Use `Duration::ZERO` to wait indefinitely.
364    ///
365    /// # Returns
366    ///
367    /// Returns `true` if flush completed successfully, `false` if timeout occurred.
368    ///
369    /// # Examples
370    ///
371    /// ```no_run
372    /// # use launchdarkly_server_sdk::{Client, ConfigBuilder};
373    /// # use std::time::Duration;
374    /// # async fn example() {
375    /// # let client = Client::build(ConfigBuilder::new("sdk-key").build().unwrap()).unwrap();
376    /// // Wait up to 5 seconds for flush to complete
377    /// let success = client.flush_blocking(Duration::from_secs(5)).await;
378    /// if !success {
379    ///     eprintln!("Warning: flush timed out");
380    /// }
381    /// # }
382    /// ```
383    ///
384    /// For more information, see the Reference Guide:
385    /// <https://docs.launchdarkly.com/sdk/features/flush#rust>.
386    pub async fn flush_blocking(&self, timeout: Duration) -> bool {
387        let event_processor = self.event_processor.clone();
388
389        let flush_future =
390            tokio::task::spawn_blocking(move || event_processor.flush_blocking(timeout));
391
392        if timeout == Duration::ZERO {
393            // Wait indefinitely
394            flush_future.await.unwrap_or(false)
395        } else {
396            // Apply timeout at async level too
397            match tokio::time::timeout(timeout, flush_future).await {
398                Ok(Ok(result)) => result,
399                Ok(Err(_)) => false, // spawn_blocking panicked
400                Err(_) => false,     // Timeout
401            }
402        }
403    }
404
405    /// Identify reports details about a context.
406    ///
407    /// For more information, see the Reference Guide:
408    /// <https://docs.launchdarkly.com/sdk/features/identify#rust>
409    pub fn identify(&self, context: Context) {
410        if self.events_default.disabled {
411            return;
412        }
413
414        self.send_internal(self.events_default.event_factory.new_identify(context));
415    }
416
417    /// Returns the value of a boolean feature flag for a given context.
418    ///
419    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
420    /// off and has no off variation.
421    ///
422    /// For more information, see the Reference Guide:
423    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
424    pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
425        let val = self.variation(context, flag_key, default);
426        if let Some(b) = val.as_bool() {
427            b
428        } else {
429            warn!("bool_variation called for a non-bool flag {flag_key:?} (got {val:?})");
430            default
431        }
432    }
433
434    /// Returns the value of a string feature flag for a given context.
435    ///
436    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
437    /// off and has no off variation.
438    ///
439    /// For more information, see the Reference Guide:
440    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
441    pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
442        let val = self.variation(context, flag_key, default.clone());
443        if let Some(s) = val.as_string() {
444            s
445        } else {
446            warn!("str_variation called for a non-string flag {flag_key:?} (got {val:?})");
447            default
448        }
449    }
450
451    /// Returns the value of a float feature flag for a given context.
452    ///
453    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
454    /// off and has no off variation.
455    ///
456    /// For more information, see the Reference Guide:
457    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
458    pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
459        let val = self.variation(context, flag_key, default);
460        if let Some(f) = val.as_float() {
461            f
462        } else {
463            warn!("float_variation called for a non-float flag {flag_key:?} (got {val:?})");
464            default
465        }
466    }
467
468    /// Returns the value of a integer feature flag for a given context.
469    ///
470    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
471    /// off and has no off variation.
472    ///
473    /// For more information, see the Reference Guide:
474    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
475    pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
476        let val = self.variation(context, flag_key, default);
477        if let Some(f) = val.as_int() {
478            f
479        } else {
480            warn!("int_variation called for a non-int flag {flag_key:?} (got {val:?})");
481            default
482        }
483    }
484
485    /// Returns the value of a feature flag for the given context, allowing the value to be
486    /// of any JSON type.
487    ///
488    /// The value is returned as an [serde_json::Value].
489    ///
490    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned off.
491    ///
492    /// For more information, see the Reference Guide:
493    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
494    pub fn json_variation(
495        &self,
496        context: &Context,
497        flag_key: &str,
498        default: serde_json::Value,
499    ) -> serde_json::Value {
500        self.variation(context, flag_key, default.clone())
501            .as_json()
502            .unwrap_or(default)
503    }
504
505    /// This method is the same as [Client::bool_variation], but also returns further information
506    /// about how the value was calculated. The "reason" data will also be included in analytics
507    /// events.
508    ///
509    /// For more information, see the Reference Guide:
510    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
511    pub fn bool_variation_detail(
512        &self,
513        context: &Context,
514        flag_key: &str,
515        default: bool,
516    ) -> Detail<bool> {
517        self.variation_detail(context, flag_key, default).try_map(
518            |val| val.as_bool(),
519            default,
520            eval::Error::WrongType,
521        )
522    }
523
524    /// This method is the same as [Client::str_variation], but also returns further information
525    /// about how the value was calculated. The "reason" data will also be included in analytics
526    /// events.
527    ///
528    /// For more information, see the Reference Guide:
529    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
530    pub fn str_variation_detail(
531        &self,
532        context: &Context,
533        flag_key: &str,
534        default: String,
535    ) -> Detail<String> {
536        self.variation_detail(context, flag_key, default.clone())
537            .try_map(|val| val.as_string(), default, eval::Error::WrongType)
538    }
539
540    /// This method is the same as [Client::float_variation], but also returns further information
541    /// about how the value was calculated. The "reason" data will also be included in analytics
542    /// events.
543    ///
544    /// For more information, see the Reference Guide:
545    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
546    pub fn float_variation_detail(
547        &self,
548        context: &Context,
549        flag_key: &str,
550        default: f64,
551    ) -> Detail<f64> {
552        self.variation_detail(context, flag_key, default).try_map(
553            |val| val.as_float(),
554            default,
555            eval::Error::WrongType,
556        )
557    }
558
559    /// This method is the same as [Client::int_variation], but also returns further information
560    /// about how the value was calculated. The "reason" data will also be included in analytics
561    /// events.
562    ///
563    /// For more information, see the Reference Guide:
564    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
565    pub fn int_variation_detail(
566        &self,
567        context: &Context,
568        flag_key: &str,
569        default: i64,
570    ) -> Detail<i64> {
571        self.variation_detail(context, flag_key, default).try_map(
572            |val| val.as_int(),
573            default,
574            eval::Error::WrongType,
575        )
576    }
577
578    /// This method is the same as [Client::json_variation], but also returns further information
579    /// about how the value was calculated. The "reason" data will also be included in analytics
580    /// events.
581    ///
582    /// For more information, see the Reference Guide:
583    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
584    pub fn json_variation_detail(
585        &self,
586        context: &Context,
587        flag_key: &str,
588        default: serde_json::Value,
589    ) -> Detail<serde_json::Value> {
590        self.variation_detail(context, flag_key, default.clone())
591            .try_map(|val| val.as_json(), default, eval::Error::WrongType)
592    }
593
594    #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
595    /// Generates the secure mode hash value for a context.
596    ///
597    /// For more information, see the Reference Guide:
598    /// <https://docs.launchdarkly.com/sdk/features/secure-mode#rust>.
599    pub fn secure_mode_hash(&self, context: &Context) -> Result<String, String> {
600        #[cfg(feature = "crypto-aws-lc-rs")]
601        {
602            let key =
603                aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
604            let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
605
606            Ok(data_encoding::HEXLOWER.encode(tag.as_ref()))
607        }
608        #[cfg(feature = "crypto-openssl")]
609        {
610            use openssl::hash::MessageDigest;
611            use openssl::pkey::PKey;
612            use openssl::sign::Signer;
613
614            let key = PKey::hmac(self.sdk_key.as_bytes())
615                .map_err(|e| format!("Failed to create HMAC key: {e}"))?;
616            let mut signer = Signer::new(MessageDigest::sha256(), &key)
617                .map_err(|e| format!("Failed to create signer: {e}"))?;
618            signer
619                .update(context.canonical_key().as_bytes())
620                .map_err(|e| format!("Failed to update signer: {e}"))?;
621            let hmac = signer
622                .sign_to_vec()
623                .map_err(|e| format!("Failed to sign: {e}"))?;
624
625            Ok(data_encoding::HEXLOWER.encode(&hmac))
626        }
627    }
628
629    /// Returns an object that encapsulates the state of all feature flags for a given context. This
630    /// includes the flag values, and also metadata that can be used on the front end.
631    ///
632    /// The most common use case for this method is to bootstrap a set of client-side feature flags
633    /// from a back-end service.
634    ///
635    /// You may pass any configuration of [FlagDetailConfig] to control what data is included.
636    ///
637    /// For more information, see the Reference Guide:
638    /// <https://docs.launchdarkly.com/sdk/features/all-flags#rust>
639    pub fn all_flags_detail(
640        &self,
641        context: &Context,
642        flag_state_config: FlagDetailConfig,
643    ) -> FlagDetail {
644        if self.offline {
645            warn!(
646                "all_flags_detail() called, but client is in offline mode. Returning empty state"
647            );
648            return FlagDetail::new(false);
649        }
650
651        if !self.initialized() {
652            warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
653            return FlagDetail::new(false);
654        }
655
656        let data_store = self.data_store.read();
657
658        let mut flag_detail = FlagDetail::new(true);
659        flag_detail.populate(&*data_store, context, flag_state_config);
660
661        flag_detail
662    }
663
664    /// This method is the same as [Client::variation], but also returns further information about
665    /// how the value was calculated. The "reason" data will also be included in analytics events.
666    ///
667    /// For more information, see the Reference Guide:
668    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
669    pub fn variation_detail<T: Into<FlagValue> + Clone>(
670        &self,
671        context: &Context,
672        flag_key: &str,
673        default: T,
674    ) -> Detail<FlagValue> {
675        let (detail, _) =
676            self.variation_internal(context, flag_key, default, &self.events_with_reasons);
677        detail
678    }
679
680    /// This is a generic function which returns the value of a feature flag for a given context.
681    ///
682    /// This method is an alternatively to the type specified methods (e.g.
683    /// [Client::bool_variation], [Client::int_variation], etc.).
684    ///
685    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
686    /// off and has no off variation.
687    ///
688    /// For more information, see the Reference Guide:
689    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
690    pub fn variation<T: Into<FlagValue> + Clone>(
691        &self,
692        context: &Context,
693        flag_key: &str,
694        default: T,
695    ) -> FlagValue {
696        let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
697        detail.value.unwrap()
698    }
699
700    /// This method returns the migration stage of the migration feature flag for the given
701    /// evaluation context.
702    ///
703    /// This method returns the default stage if there is an error or the flag does not exist.
704    pub fn migration_variation(
705        &self,
706        context: &Context,
707        flag_key: &str,
708        default_stage: Stage,
709    ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
710        let (detail, flag) =
711            self.variation_internal(context, flag_key, default_stage, &self.events_default);
712
713        let migration_detail =
714            detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
715        let tracker = MigrationOpTracker::new(
716            flag_key.into(),
717            flag,
718            context.clone(),
719            migration_detail.clone(),
720            default_stage,
721        );
722
723        (
724            migration_detail.value.unwrap_or(default_stage),
725            Arc::new(Mutex::new(tracker)),
726        )
727    }
728
    /// Reports that a context has performed an event.
    ///
    /// The `key` parameter is defined by the application and will be shown in analytics reports;
    /// it normally corresponds to the event name of a metric that you have created through the
    /// LaunchDarkly dashboard. If you want to associate additional data with this event, use
    /// [Client::track_data] or [Client::track_metric].
    ///
    /// For more information, see the Reference Guide:
    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
    pub fn track_event(&self, context: Context, key: impl Into<String>) {
        // No metric value is attached and the data is a JSON null. The `Result` from `track`
        // is discarded because this method has no way to report it to the caller.
        let _ = self.track(context, key, None, serde_json::Value::Null);
    }
741
    /// Reports that a context has performed an event, and associates it with custom data.
    ///
    /// The `key` parameter is defined by the application and will be shown in analytics reports;
    /// it normally corresponds to the event name of a metric that you have created through the
    /// LaunchDarkly dashboard.
    ///
    /// `data` parameter is any type that implements [Serialize]. If no such value is needed, use
    /// [serde_json::Value::Null] (or call [Client::track_event] instead). To send a numeric value
    /// for experimentation, use [Client::track_metric].
    ///
    /// # Errors
    ///
    /// Returns the [serde_json::Error] produced while creating the custom event; in that case
    /// no event is sent.
    /// NOTE(review): presumably this means `data` failed to serialize — confirm against
    /// `EventFactory::new_custom`.
    ///
    /// For more information, see the Reference Guide:
    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
    pub fn track_data(
        &self,
        context: Context,
        key: impl Into<String>,
        data: impl Serialize,
    ) -> serde_json::Result<()> {
        // Same as `track_metric`, but without a metric value.
        self.track(context, key, None, data)
    }
762
763    /// Reports that a context has performed an event, and associates it with a numeric value. This
764    /// value is used by the LaunchDarkly experimentation feature in numeric custom metrics, and
765    /// will also be returned as part of the custom event for Data Export.
766    ///
767    /// The `key` parameter is defined by the application and will be shown in analytics reports;
768    /// it normally corresponds to the event name of a metric that you have created through the
769    /// LaunchDarkly dashboard.
770    ///
771    /// For more information, see the Reference Guide:
772    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
773    pub fn track_metric(
774        &self,
775        context: Context,
776        key: impl Into<String>,
777        value: f64,
778        data: impl Serialize,
779    ) {
780        let _ = self.track(context, key, Some(value), data);
781    }
782
783    fn track(
784        &self,
785        context: Context,
786        key: impl Into<String>,
787        metric_value: Option<f64>,
788        data: impl Serialize,
789    ) -> serde_json::Result<()> {
790        if !self.events_default.disabled {
791            let event =
792                self.events_default
793                    .event_factory
794                    .new_custom(context, key, metric_value, data)?;
795
796            self.send_internal(event);
797        }
798
799        Ok(())
800    }
801
802    /// Tracks the results of a migrations operation. This event includes measurements which can be
803    /// used to enhance the observability of a migration within the LaunchDarkly UI.
804    ///
805    /// This event should be generated through [crate::MigrationOpTracker]. If you are using the
806    /// [crate::Migrator] to handle migrations, this event will be created and emitted
807    /// automatically.
808    pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
809        if self.events_default.disabled {
810            return;
811        }
812
813        match tracker.lock() {
814            Ok(tracker) => {
815                let event = tracker.build();
816                match event {
817                    Ok(event) => {
818                        self.send_internal(
819                            self.events_default.event_factory.new_migration_op(event),
820                        );
821                    }
822                    Err(e) => error!("Failed to build migration event, no event will be sent: {e}"),
823                }
824            }
825            Err(e) => error!("Failed to lock migration tracker, no event will be sent: {e}"),
826        }
827    }
828
829    fn variation_internal<T: Into<FlagValue> + Clone>(
830        &self,
831        context: &Context,
832        flag_key: &str,
833        default: T,
834        events_scope: &EventsScope,
835    ) -> (Detail<FlagValue>, Option<eval::Flag>) {
836        if self.offline {
837            return (
838                Detail::err_default(eval::Error::ClientNotReady, default.into()),
839                None,
840            );
841        }
842
843        let (flag, result) = match self.initialized() {
844            false => (
845                None,
846                Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
847            ),
848            true => {
849                let data_store = self.data_store.read();
850                match data_store.flag(flag_key) {
851                    Some(flag) => {
852                        let result = eval::evaluate(
853                            data_store.to_store(),
854                            &flag,
855                            context,
856                            Some(&*events_scope.prerequisite_event_recorder),
857                        )
858                        .map(|v| v.clone())
859                        .or(default.clone().into());
860
861                        (Some(flag), result)
862                    }
863                    None => (
864                        None,
865                        Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
866                    ),
867                }
868            }
869        };
870
871        if !events_scope.disabled {
872            let event = match &flag {
873                Some(f) => events_scope.event_factory.new_eval_event(
874                    flag_key,
875                    context.clone(),
876                    f,
877                    result.clone(),
878                    default.into(),
879                    None,
880                ),
881                None => events_scope.event_factory.new_unknown_flag_event(
882                    flag_key,
883                    context.clone(),
884                    result.clone(),
885                    default.into(),
886                ),
887            };
888            self.send_internal(event);
889        }
890
891        (result, flag)
892    }
893
894    fn send_internal(&self, event: InputEvent) {
895        self.event_processor.send(event);
896    }
897}
898
899#[cfg(test)]
900mod tests {
901    use assert_json_diff::assert_json_eq;
902    use crossbeam_channel::Receiver;
903    use eval::{ContextBuilder, MultiContextBuilder};
904    use futures::FutureExt;
905    use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
906    use maplit::hashmap;
907    use std::collections::HashMap;
908    use tokio::time::Instant;
909
910    use crate::data_source::MockDataSource;
911    use crate::data_source_builders::MockDataSourceBuilder;
912    use crate::evaluation::FlagFilter;
913    use crate::events::create_event_sender;
914    use crate::events::event::{OutputEvent, VariationKey};
915    use crate::events::processor_builders::EventProcessorBuilder;
916    use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
917    use crate::stores::store_types::{PatchTarget, StorageItem};
918    use crate::test_common::{
919        self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
920        basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
921    };
922    use crate::test_data::TestData;
923    use crate::{
924        AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
925        PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
926        SerializedItem,
927    };
928    use test_case::test_case;
929
930    use super::*;
931
932    fn is_send_and_sync<T: Send + Sync>() {}
933
934    #[test]
935    fn ensure_client_is_send_and_sync() {
936        is_send_and_sync::<Client>()
937    }
938
939    #[tokio::test]
940    async fn client_asynchronously_initializes_within_timeout() {
941        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
942        client.start_with_default_executor();
943
944        let now = Instant::now();
945        let initialized = client
946            .wait_for_initialization(Duration::from_millis(1500))
947            .await;
948        let elapsed_time = now.elapsed();
949        // Give ourself a good margin for thread scheduling.
950        assert!(elapsed_time.as_millis() > 500);
951        assert_eq!(initialized, Some(true));
952    }
953
954    #[tokio::test]
955    async fn client_asynchronously_initializes_slower_than_timeout() {
956        let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
957        client.start_with_default_executor();
958
959        let now = Instant::now();
960        let initialized = client
961            .wait_for_initialization(Duration::from_millis(500))
962            .await;
963        let elapsed_time = now.elapsed();
964        // Give ourself a good margin for thread scheduling.
965        assert!(elapsed_time.as_millis() < 750);
966        assert!(initialized.is_none());
967    }
968
969    #[tokio::test]
970    async fn client_initializes_immediately_in_offline_mode() {
971        let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
972        client.start_with_default_executor();
973
974        assert!(client.initialized());
975
976        let now = Instant::now();
977        let initialized = client
978            .wait_for_initialization(Duration::from_millis(2000))
979            .await;
980        let elapsed_time = now.elapsed();
981        assert_eq!(initialized, Some(true));
982        assert!(elapsed_time.as_millis() < 500)
983    }
984
985    #[tokio::test]
986    async fn client_initializes_immediately_in_daemon_mode() {
987        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
988        client.start_with_default_executor();
989
990        assert!(client.initialized());
991
992        let now = Instant::now();
993        let initialized = client
994            .wait_for_initialization(Duration::from_millis(2000))
995            .await;
996        let elapsed_time = now.elapsed();
997        assert_eq!(initialized, Some(true));
998        assert!(elapsed_time.as_millis() < 500)
999    }
1000
1001    #[test_case(basic_flag("myFlag"), false.into(), true.into())]
1002    #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
1003    fn client_updates_changes_evaluation_results(
1004        flag: eval::Flag,
1005        default: FlagValue,
1006        expected: FlagValue,
1007    ) {
1008        let context = ContextBuilder::new("foo")
1009            .build()
1010            .expect("Failed to create context");
1011
1012        let (client, _event_rx) = make_mocked_client();
1013
1014        let result = client.variation_detail(&context, "myFlag", default.clone());
1015        assert_eq!(result.value.unwrap(), default);
1016
1017        client.start_with_default_executor();
1018        client
1019            .data_store
1020            .write()
1021            .upsert(
1022                &flag.key,
1023                PatchTarget::Flag(StorageItem::Item(flag.clone())),
1024            )
1025            .expect("patch should apply");
1026
1027        let result = client.variation_detail(&context, "myFlag", default);
1028        assert_eq!(result.value.unwrap(), expected);
1029        assert!(matches!(
1030            result.reason,
1031            Reason::Fallthrough {
1032                in_experiment: false
1033            }
1034        ));
1035    }
1036
1037    #[test]
1038    fn all_flags_detail_is_invalid_when_offline() {
1039        let (client, _event_rx) = make_mocked_offline_client();
1040        client.start_with_default_executor();
1041
1042        let context = ContextBuilder::new("bob")
1043            .build()
1044            .expect("Failed to create context");
1045
1046        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1047        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1048    }
1049
1050    #[test]
1051    fn all_flags_detail_is_invalid_when_not_initialized() {
1052        let (client, _event_rx) = make_mocked_client();
1053
1054        let context = ContextBuilder::new("bob")
1055            .build()
1056            .expect("Failed to create context");
1057
1058        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1059        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1060    }
1061
1062    #[tokio::test]
1063    async fn all_flags_detail_returns_flag_states() {
1064        let td = TestData::new();
1065        td.use_preconfigured_flag(basic_flag("myFlag1"));
1066        td.use_preconfigured_flag(basic_flag("myFlag2"));
1067        let (client, _event_rx) = make_client_with_test_data(&td);
1068        client.start_with_default_executor();
1069
1070        let context = ContextBuilder::new("bob")
1071            .build()
1072            .expect("Failed to create context");
1073
1074        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1075
1076        client.close();
1077
1078        assert_json_eq!(
1079            all_flags,
1080            json!({
1081                "myFlag1": true,
1082                "myFlag2": true,
1083                "$flagsState": {
1084                    "myFlag1": {
1085                        "version": 1,
1086                        "variation": 1
1087                    },
1088                     "myFlag2": {
1089                        "version": 1,
1090                        "variation": 1
1091                    },
1092                },
1093                "$valid": true
1094            })
1095        );
1096    }
1097
1098    #[tokio::test]
1099    async fn all_flags_detail_returns_prerequisite_relations() {
1100        let td = TestData::new();
1101        td.use_preconfigured_flag(basic_flag("prereq1"));
1102        td.use_preconfigured_flag(basic_flag("prereq2"));
1103        td.use_preconfigured_flag(basic_flag_with_prereqs_and_visibility(
1104            "toplevel",
1105            &["prereq1", "prereq2"],
1106            false,
1107            false,
1108        ));
1109        let (client, _event_rx) = make_client_with_test_data(&td);
1110        client.start_with_default_executor();
1111
1112        let context = ContextBuilder::new("bob")
1113            .build()
1114            .expect("Failed to create context");
1115
1116        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1117
1118        client.close();
1119
1120        assert_json_eq!(
1121            all_flags,
1122            json!({
1123                "prereq1": true,
1124                "prereq2": true,
1125                "toplevel": true,
1126                "$flagsState": {
1127                    "toplevel": {
1128                        "version": 1,
1129                        "variation": 1,
1130                        "prerequisites": ["prereq1", "prereq2"]
1131                    },
1132                    "prereq1": {
1133                        "version": 1,
1134                        "variation": 1
1135                    },
1136                     "prereq2": {
1137                        "version": 1,
1138                        "variation": 1
1139                    },
1140                },
1141                "$valid": true
1142            })
1143        );
1144    }
1145
1146    #[tokio::test]
1147    async fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1148        let td = TestData::new();
1149        td.use_preconfigured_flag(basic_flag_with_visibility("prereq1", false, false));
1150        td.use_preconfigured_flag(basic_flag_with_visibility("prereq2", false, false));
1151        td.use_preconfigured_flag(basic_flag_with_prereqs_and_visibility(
1152            "toplevel",
1153            &["prereq1", "prereq2"],
1154            true,
1155            false,
1156        ));
1157        let (client, _event_rx) = make_client_with_test_data(&td);
1158        client.start_with_default_executor();
1159
1160        let context = ContextBuilder::new("bob")
1161            .build()
1162            .expect("Failed to create context");
1163
1164        let mut config = FlagDetailConfig::new();
1165        config.flag_filter(FlagFilter::CLIENT);
1166
1167        let all_flags = client.all_flags_detail(&context, config);
1168
1169        client.close();
1170
1171        assert_json_eq!(
1172            all_flags,
1173            json!({
1174                "toplevel": true,
1175                "$flagsState": {
1176                    "toplevel": {
1177                        "version": 1,
1178                        "variation": 1,
1179                        "prerequisites": ["prereq1", "prereq2"]
1180                    },
1181                },
1182                "$valid": true
1183            })
1184        );
1185    }
1186
1187    #[tokio::test]
1188    async fn variation_tracks_events_correctly() {
1189        let td = TestData::new();
1190        td.use_preconfigured_flag(basic_flag("myFlag"));
1191        let (client, event_rx) = make_client_with_test_data(&td);
1192        client.start_with_default_executor();
1193
1194        let context = ContextBuilder::new("bob")
1195            .build()
1196            .expect("Failed to create context");
1197
1198        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1199
1200        assert!(flag_value.as_bool().unwrap());
1201        client.flush();
1202        client.close();
1203
1204        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1205        assert_eq!(events.len(), 2);
1206        assert_eq!(events[0].kind(), "index");
1207        assert_eq!(events[1].kind(), "summary");
1208
1209        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1210            let variation_key = VariationKey {
1211                version: Some(1),
1212                variation: Some(1),
1213            };
1214            let feature = event_summary.features.get("myFlag");
1215            assert!(feature.is_some());
1216
1217            let feature = feature.unwrap();
1218            assert!(feature.counters.contains_key(&variation_key));
1219        } else {
1220            panic!("Event should be a summary type");
1221        }
1222    }
1223
1224    #[test]
1225    fn variation_handles_offline_mode() {
1226        let (client, event_rx) = make_mocked_offline_client();
1227        client.start_with_default_executor();
1228
1229        let context = ContextBuilder::new("bob")
1230            .build()
1231            .expect("Failed to create context");
1232        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1233
1234        assert!(!flag_value.as_bool().unwrap());
1235        client.flush();
1236        client.close();
1237
1238        assert_eq!(event_rx.iter().count(), 0);
1239    }
1240
1241    #[test]
1242    fn variation_handles_unknown_flags() {
1243        let (client, event_rx) = make_mocked_client();
1244        client.start_with_default_executor();
1245        let context = ContextBuilder::new("bob")
1246            .build()
1247            .expect("Failed to create context");
1248
1249        let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1250
1251        assert!(!flag_value.as_bool().unwrap());
1252        client.flush();
1253        client.close();
1254
1255        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1256        assert_eq!(events.len(), 2);
1257        assert_eq!(events[0].kind(), "index");
1258        assert_eq!(events[1].kind(), "summary");
1259
1260        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1261            let variation_key = VariationKey {
1262                version: None,
1263                variation: None,
1264            };
1265
1266            let feature = event_summary.features.get("non-existent-flag");
1267            assert!(feature.is_some());
1268
1269            let feature = feature.unwrap();
1270            assert!(feature.counters.contains_key(&variation_key));
1271        } else {
1272            panic!("Event should be a summary type");
1273        }
1274    }
1275
1276    #[tokio::test]
1277    async fn variation_detail_handles_debug_events_correctly() {
1278        let td = TestData::new();
1279        let mut flag = basic_flag("myFlag");
1280        flag.debug_events_until_date = Some(64_060_606_800_000); // Jan. 1st, 4000
1281        td.use_preconfigured_flag(flag);
1282        let (client, event_rx) = make_client_with_test_data(&td);
1283        client.start_with_default_executor();
1284
1285        let context = ContextBuilder::new("bob")
1286            .build()
1287            .expect("Failed to create context");
1288
1289        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1290
1291        assert!(detail.value.unwrap().as_bool().unwrap());
1292        assert!(matches!(
1293            detail.reason,
1294            Reason::Fallthrough {
1295                in_experiment: false
1296            }
1297        ));
1298        client.flush();
1299        client.close();
1300
1301        let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1302        assert_eq!(events.len(), 3);
1303        assert_eq!(events[0].kind(), "index");
1304        assert_eq!(events[1].kind(), "debug");
1305        assert_eq!(events[2].kind(), "summary");
1306
1307        if let OutputEvent::Summary(event_summary) = events[2].clone() {
1308            let variation_key = VariationKey {
1309                version: Some(1),
1310                variation: Some(1),
1311            };
1312
1313            let feature = event_summary.features.get("myFlag");
1314            assert!(feature.is_some());
1315
1316            let feature = feature.unwrap();
1317            assert!(feature.counters.contains_key(&variation_key));
1318        } else {
1319            panic!("Event should be a summary type");
1320        }
1321    }
1322
1323    #[tokio::test]
1324    async fn variation_detail_tracks_events_correctly() {
1325        let td = TestData::new();
1326        td.use_preconfigured_flag(basic_flag("myFlag"));
1327        let (client, event_rx) = make_client_with_test_data(&td);
1328        client.start_with_default_executor();
1329
1330        let context = ContextBuilder::new("bob")
1331            .build()
1332            .expect("Failed to create context");
1333
1334        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1335
1336        assert!(detail.value.unwrap().as_bool().unwrap());
1337        assert!(matches!(
1338            detail.reason,
1339            Reason::Fallthrough {
1340                in_experiment: false
1341            }
1342        ));
1343        client.flush();
1344        client.close();
1345
1346        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1347        assert_eq!(events.len(), 2);
1348        assert_eq!(events[0].kind(), "index");
1349        assert_eq!(events[1].kind(), "summary");
1350
1351        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1352            let variation_key = VariationKey {
1353                version: Some(1),
1354                variation: Some(1),
1355            };
1356
1357            let feature = event_summary.features.get("myFlag");
1358            assert!(feature.is_some());
1359
1360            let feature = feature.unwrap();
1361            assert!(feature.counters.contains_key(&variation_key));
1362        } else {
1363            panic!("Event should be a summary type");
1364        }
1365    }
1366
1367    #[test]
1368    fn variation_detail_handles_offline_mode() {
1369        let (client, event_rx) = make_mocked_offline_client();
1370        client.start_with_default_executor();
1371
1372        let context = ContextBuilder::new("bob")
1373            .build()
1374            .expect("Failed to create context");
1375
1376        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1377
1378        assert!(!detail.value.unwrap().as_bool().unwrap());
1379        assert!(matches!(
1380            detail.reason,
1381            Reason::Error {
1382                error: eval::Error::ClientNotReady
1383            }
1384        ));
1385        client.flush();
1386        client.close();
1387
1388        assert_eq!(event_rx.iter().count(), 0);
1389    }
1390
1391    struct InMemoryPersistentDataStoreFactory {
1392        data: AllData<Flag, Segment>,
1393        initialized: bool,
1394    }
1395
1396    impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1397        fn create_persistent_data_store(
1398            &self,
1399        ) -> Result<Box<dyn PersistentDataStore + 'static>, std::io::Error> {
1400            let serialized_data =
1401                AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1402            Ok(Box::new(InMemoryPersistentDataStore {
1403                data: serialized_data,
1404                initialized: self.initialized,
1405            }))
1406        }
1407    }
1408
1409    #[test]
1410    fn variation_detail_handles_daemon_mode() {
1411        testing_logger::setup();
1412        let factory = InMemoryPersistentDataStoreFactory {
1413            data: AllData {
1414                flags: hashmap!["flag".into() => basic_flag("flag")],
1415                segments: HashMap::new(),
1416            },
1417            initialized: true,
1418        };
1419        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1420
1421        let config = ConfigBuilder::new("sdk-key")
1422            .daemon_mode(true)
1423            .data_store(&builder)
1424            .event_processor(&NullEventProcessorBuilder::new())
1425            .build()
1426            .expect("config should build");
1427
1428        let client = Client::build(config).expect("Should be built.");
1429
1430        client.start_with_default_executor();
1431
1432        let context = ContextBuilder::new("bob")
1433            .build()
1434            .expect("Failed to create context");
1435
1436        let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));
1437
1438        assert!(detail.value.unwrap().as_bool().unwrap());
1439        assert!(matches!(
1440            detail.reason,
1441            Reason::Fallthrough {
1442                in_experiment: false
1443            }
1444        ));
1445        client.flush();
1446        client.close();
1447
1448        testing_logger::validate(|captured_logs| {
1449            assert_eq!(captured_logs.len(), 1);
1450            assert_eq!(
1451                captured_logs[0].body,
1452                "Started LaunchDarkly Client in daemon mode"
1453            );
1454        });
1455    }
1456
1457    #[test]
1458    fn daemon_mode_is_quiet_if_store_is_not_initialized() {
1459        testing_logger::setup();
1460
1461        let factory = InMemoryPersistentDataStoreFactory {
1462            data: AllData {
1463                flags: HashMap::new(),
1464                segments: HashMap::new(),
1465            },
1466            initialized: false,
1467        };
1468        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1469
1470        let config = ConfigBuilder::new("sdk-key")
1471            .daemon_mode(true)
1472            .data_store(&builder)
1473            .event_processor(&NullEventProcessorBuilder::new())
1474            .build()
1475            .expect("config should build");
1476
1477        let client = Client::build(config).expect("Should be built.");
1478
1479        client.start_with_default_executor();
1480
1481        let context = ContextBuilder::new("bob")
1482            .build()
1483            .expect("Failed to create context");
1484
1485        client.variation_detail(&context, "flag", FlagValue::Bool(false));
1486
1487        testing_logger::validate(|captured_logs| {
1488            assert_eq!(captured_logs.len(), 1);
1489            assert_eq!(
1490                captured_logs[0].body,
1491                "Started LaunchDarkly Client in daemon mode"
1492            );
1493        });
1494    }
1495
1496    #[tokio::test]
1497    async fn variation_handles_off_flag_without_variation() {
1498        let td = TestData::new();
1499        td.use_preconfigured_flag(basic_off_flag("myFlag"));
1500        let (client, event_rx) = make_client_with_test_data(&td);
1501        client.start_with_default_executor();
1502
1503        let context = ContextBuilder::new("bob")
1504            .build()
1505            .expect("Failed to create context");
1506
1507        let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1508
1509        assert!(!result.as_bool().unwrap());
1510        client.flush();
1511        client.close();
1512
1513        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1514        assert_eq!(events.len(), 2);
1515        assert_eq!(events[0].kind(), "index");
1516        assert_eq!(events[1].kind(), "summary");
1517
1518        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1519            let variation_key = VariationKey {
1520                version: Some(1),
1521                variation: None,
1522            };
1523            let feature = event_summary.features.get("myFlag");
1524            assert!(feature.is_some());
1525
1526            let feature = feature.unwrap();
1527            assert!(feature.counters.contains_key(&variation_key));
1528        } else {
1529            panic!("Event should be a summary type");
1530        }
1531    }
1532
1533    #[tokio::test]
1534    async fn variation_detail_tracks_prereq_events_correctly() {
1535        let td = TestData::new();
1536        let mut prereq_flag = basic_flag("prereqFlag");
1537        prereq_flag.track_events = true;
1538        td.use_preconfigured_flag(prereq_flag);
1539
1540        let mut main_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1541        main_flag.track_events = true;
1542        td.use_preconfigured_flag(main_flag);
1543
1544        let (client, event_rx) = make_client_with_test_data(&td);
1545        client.start_with_default_executor();
1546
1547        let context = ContextBuilder::new("bob")
1548            .build()
1549            .expect("Failed to create context");
1550
1551        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1552
1553        assert!(detail.value.unwrap().as_bool().unwrap());
1554        assert!(matches!(
1555            detail.reason,
1556            Reason::Fallthrough {
1557                in_experiment: false
1558            }
1559        ));
1560        client.flush();
1561        client.close();
1562
1563        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1564        assert_eq!(events.len(), 4);
1565        assert_eq!(events[0].kind(), "index");
1566        assert_eq!(events[1].kind(), "feature");
1567        assert_eq!(events[2].kind(), "feature");
1568        assert_eq!(events[3].kind(), "summary");
1569
1570        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1571            let variation_key = VariationKey {
1572                version: Some(1),
1573                variation: Some(1),
1574            };
1575            let feature = event_summary.features.get("myFlag");
1576            assert!(feature.is_some());
1577
1578            let feature = feature.unwrap();
1579            assert!(feature.counters.contains_key(&variation_key));
1580
1581            let variation_key = VariationKey {
1582                version: Some(1),
1583                variation: Some(1),
1584            };
1585            let feature = event_summary.features.get("prereqFlag");
1586            assert!(feature.is_some());
1587
1588            let feature = feature.unwrap();
1589            assert!(feature.counters.contains_key(&variation_key));
1590        }
1591    }
1592
1593    #[tokio::test]
1594    async fn variation_handles_failed_prereqs_correctly() {
1595        let td = TestData::new();
1596        let mut prereq_flag = basic_off_flag("prereqFlag");
1597        prereq_flag.track_events = true;
1598        td.use_preconfigured_flag(prereq_flag);
1599
1600        let mut main_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1601        main_flag.track_events = true;
1602        td.use_preconfigured_flag(main_flag);
1603
1604        let (client, event_rx) = make_client_with_test_data(&td);
1605        client.start_with_default_executor();
1606
1607        let context = ContextBuilder::new("bob")
1608            .build()
1609            .expect("Failed to create context");
1610
1611        let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1612
1613        assert!(!detail.as_bool().unwrap());
1614        client.flush();
1615        client.close();
1616
1617        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1618        assert_eq!(events.len(), 4);
1619        assert_eq!(events[0].kind(), "index");
1620        assert_eq!(events[1].kind(), "feature");
1621        assert_eq!(events[2].kind(), "feature");
1622        assert_eq!(events[3].kind(), "summary");
1623
1624        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1625            let variation_key = VariationKey {
1626                version: Some(1),
1627                variation: Some(0),
1628            };
1629            let feature = event_summary.features.get("myFlag");
1630            assert!(feature.is_some());
1631
1632            let feature = feature.unwrap();
1633            assert!(feature.counters.contains_key(&variation_key));
1634
1635            let variation_key = VariationKey {
1636                version: Some(1),
1637                variation: None,
1638            };
1639            let feature = event_summary.features.get("prereqFlag");
1640            assert!(feature.is_some());
1641
1642            let feature = feature.unwrap();
1643            assert!(feature.counters.contains_key(&variation_key));
1644        }
1645    }
1646
1647    #[test]
1648    fn variation_detail_handles_flag_not_found() {
1649        let (client, event_rx) = make_mocked_client();
1650        client.start_with_default_executor();
1651
1652        let context = ContextBuilder::new("bob")
1653            .build()
1654            .expect("Failed to create context");
1655        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1656
1657        assert!(!detail.value.unwrap().as_bool().unwrap());
1658        assert!(matches!(
1659            detail.reason,
1660            Reason::Error {
1661                error: eval::Error::FlagNotFound
1662            }
1663        ));
1664        client.flush();
1665        client.close();
1666
1667        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1668        assert_eq!(events.len(), 2);
1669        assert_eq!(events[0].kind(), "index");
1670        assert_eq!(events[1].kind(), "summary");
1671
1672        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1673            let variation_key = VariationKey {
1674                version: None,
1675                variation: None,
1676            };
1677            let feature = event_summary.features.get("non-existent-flag");
1678            assert!(feature.is_some());
1679
1680            let feature = feature.unwrap();
1681            assert!(feature.counters.contains_key(&variation_key));
1682        } else {
1683            panic!("Event should be a summary type");
1684        }
1685    }
1686
1687    #[tokio::test]
1688    async fn variation_detail_handles_client_not_ready() {
1689        let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1690        client.start_with_default_executor();
1691        let context = ContextBuilder::new("bob")
1692            .build()
1693            .expect("Failed to create context");
1694
1695        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1696
1697        assert!(!detail.value.unwrap().as_bool().unwrap());
1698        assert!(matches!(
1699            detail.reason,
1700            Reason::Error {
1701                error: eval::Error::ClientNotReady
1702            }
1703        ));
1704        client.flush();
1705        client.close();
1706
1707        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1708        assert_eq!(events.len(), 2);
1709        assert_eq!(events[0].kind(), "index");
1710        assert_eq!(events[1].kind(), "summary");
1711
1712        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1713            let variation_key = VariationKey {
1714                version: None,
1715                variation: None,
1716            };
1717            let feature = event_summary.features.get("non-existent-flag");
1718            assert!(feature.is_some());
1719
1720            let feature = feature.unwrap();
1721            assert!(feature.counters.contains_key(&variation_key));
1722        } else {
1723            panic!("Event should be a summary type");
1724        }
1725    }
1726
1727    #[test]
1728    fn identify_sends_identify_event() {
1729        let (client, event_rx) = make_mocked_client();
1730        client.start_with_default_executor();
1731
1732        let context = ContextBuilder::new("bob")
1733            .build()
1734            .expect("Failed to create context");
1735
1736        client.identify(context);
1737        client.flush();
1738        client.close();
1739
1740        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1741        assert_eq!(events.len(), 1);
1742        assert_eq!(events[0].kind(), "identify");
1743    }
1744
1745    #[test]
1746    fn identify_sends_sends_nothing_in_offline_mode() {
1747        let (client, event_rx) = make_mocked_offline_client();
1748        client.start_with_default_executor();
1749
1750        let context = ContextBuilder::new("bob")
1751            .build()
1752            .expect("Failed to create context");
1753
1754        client.identify(context);
1755        client.flush();
1756        client.close();
1757
1758        assert_eq!(event_rx.iter().count(), 0);
1759    }
1760
1761    #[test]
1762    #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
1763    fn secure_mode_hash() {
1764        let config = ConfigBuilder::new("secret")
1765            .offline(true)
1766            .build()
1767            .expect("config should build");
1768        let client = Client::build(config).expect("Should be built.");
1769        let context = ContextBuilder::new("Message")
1770            .build()
1771            .expect("Failed to create context");
1772
1773        assert_eq!(
1774            client
1775                .secure_mode_hash(&context)
1776                .expect("Hash should be computed"),
1777            "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
1778        );
1779    }
1780
1781    #[test]
1782    #[cfg(any(feature = "crypto-aws-lc-rs", feature = "crypto-openssl"))]
1783    fn secure_mode_hash_with_multi_kind() {
1784        let config = ConfigBuilder::new("secret")
1785            .offline(true)
1786            .build()
1787            .expect("config should build");
1788        let client = Client::build(config).expect("Should be built.");
1789
1790        let org = ContextBuilder::new("org-key|1")
1791            .kind("org")
1792            .build()
1793            .expect("Failed to create context");
1794        let user = ContextBuilder::new("user-key:2")
1795            .build()
1796            .expect("Failed to create context");
1797
1798        let context = MultiContextBuilder::new()
1799            .add_context(org)
1800            .add_context(user)
1801            .build()
1802            .expect("failed to build multi-context");
1803
1804        assert_eq!(
1805            client
1806                .secure_mode_hash(&context)
1807                .expect("Hash should be computed"),
1808            "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
1809        );
1810    }
1811
1812    #[derive(Serialize)]
1813    struct MyCustomData {
1814        pub answer: u32,
1815    }
1816
1817    #[test]
1818    fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1819        let (client, event_rx) = make_mocked_client();
1820        client.start_with_default_executor();
1821
1822        let context = ContextBuilder::new("bob")
1823            .build()
1824            .expect("Failed to create context");
1825
1826        client.track_event(context.clone(), "event-with-null");
1827        client.track_data(context.clone(), "event-with-string", "string-data")?;
1828        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1829        client.track_data(
1830            context.clone(),
1831            "event-with-struct",
1832            MyCustomData { answer: 42 },
1833        )?;
1834        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1835
1836        client.flush();
1837        client.close();
1838
1839        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1840        assert_eq!(events.len(), 6);
1841
1842        let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1843        for event in events {
1844            if let Some(count) = events_by_type.get_mut(event.kind()) {
1845                *count += 1;
1846            } else {
1847                events_by_type.insert(event.kind(), 1);
1848            }
1849        }
1850        assert!(matches!(events_by_type.get("index"), Some(1)));
1851        assert!(matches!(events_by_type.get("custom"), Some(5)));
1852
1853        Ok(())
1854    }
1855
1856    #[test]
1857    fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
1858        let (client, event_rx) = make_mocked_offline_client();
1859        client.start_with_default_executor();
1860
1861        let context = ContextBuilder::new("bob")
1862            .build()
1863            .expect("Failed to create context");
1864
1865        client.track_event(context.clone(), "event-with-null");
1866        client.track_data(context.clone(), "event-with-string", "string-data")?;
1867        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1868        client.track_data(
1869            context.clone(),
1870            "event-with-struct",
1871            MyCustomData { answer: 42 },
1872        )?;
1873        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1874
1875        client.flush();
1876        client.close();
1877
1878        assert_eq!(event_rx.iter().count(), 0);
1879
1880        Ok(())
1881    }
1882
1883    #[test]
1884    fn migration_handles_flag_not_found() {
1885        let (client, _event_rx) = make_mocked_client();
1886        client.start_with_default_executor();
1887
1888        let context = ContextBuilder::new("bob")
1889            .build()
1890            .expect("Failed to create context");
1891
1892        let (stage, _tracker) =
1893            client.migration_variation(&context, "non-existent-flag-key", Stage::Off);
1894
1895        assert_eq!(stage, Stage::Off);
1896    }
1897
1898    #[tokio::test]
1899    async fn migration_uses_non_migration_flag() {
1900        let td = TestData::new();
1901        td.use_preconfigured_flag(basic_flag("boolean-flag"));
1902        let (client, _event_rx) = make_client_with_test_data(&td);
1903        client.start_with_default_executor();
1904
1905        let context = ContextBuilder::new("bob")
1906            .build()
1907            .expect("Failed to create context");
1908
1909        let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1910
1911        assert_eq!(stage, Stage::Off);
1912    }
1913
1914    #[test_case(Stage::Off)]
1915    #[test_case(Stage::DualWrite)]
1916    #[test_case(Stage::Shadow)]
1917    #[test_case(Stage::Live)]
1918    #[test_case(Stage::Rampdown)]
1919    #[test_case(Stage::Complete)]
1920    #[tokio::test]
1921    async fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1922        let td = TestData::new();
1923        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
1924        let (client, _event_rx) = make_client_with_test_data(&td);
1925        client.start_with_default_executor();
1926
1927        let context = ContextBuilder::new("bob")
1928            .build()
1929            .expect("Failed to create context");
1930
1931        let (evaluated_stage, _tracker) =
1932            client.migration_variation(&context, "stage-flag", Stage::Off);
1933
1934        assert_eq!(evaluated_stage, stage);
1935    }
1936
1937    #[tokio::test]
1938    async fn migration_tracks_invoked_correctly() {
1939        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
1940            .await;
1941        migration_tracks_invoked_correctly_driver(
1942            Stage::DualWrite,
1943            Operation::Read,
1944            vec![Origin::Old],
1945        )
1946        .await;
1947        migration_tracks_invoked_correctly_driver(
1948            Stage::Shadow,
1949            Operation::Read,
1950            vec![Origin::Old, Origin::New],
1951        )
1952        .await;
1953        migration_tracks_invoked_correctly_driver(
1954            Stage::Live,
1955            Operation::Read,
1956            vec![Origin::Old, Origin::New],
1957        )
1958        .await;
1959        migration_tracks_invoked_correctly_driver(
1960            Stage::Rampdown,
1961            Operation::Read,
1962            vec![Origin::New],
1963        )
1964        .await;
1965        migration_tracks_invoked_correctly_driver(
1966            Stage::Complete,
1967            Operation::Read,
1968            vec![Origin::New],
1969        )
1970        .await;
1971        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
1972            .await;
1973        migration_tracks_invoked_correctly_driver(
1974            Stage::DualWrite,
1975            Operation::Write,
1976            vec![Origin::Old, Origin::New],
1977        )
1978        .await;
1979        migration_tracks_invoked_correctly_driver(
1980            Stage::Shadow,
1981            Operation::Write,
1982            vec![Origin::Old, Origin::New],
1983        )
1984        .await;
1985        migration_tracks_invoked_correctly_driver(
1986            Stage::Live,
1987            Operation::Write,
1988            vec![Origin::Old, Origin::New],
1989        )
1990        .await;
1991        migration_tracks_invoked_correctly_driver(
1992            Stage::Rampdown,
1993            Operation::Write,
1994            vec![Origin::Old, Origin::New],
1995        )
1996        .await;
1997        migration_tracks_invoked_correctly_driver(
1998            Stage::Complete,
1999            Operation::Write,
2000            vec![Origin::New],
2001        )
2002        .await;
2003    }
2004
2005    async fn migration_tracks_invoked_correctly_driver(
2006        stage: Stage,
2007        operation: Operation,
2008        origins: Vec<Origin>,
2009    ) {
2010        let td = TestData::new();
2011        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2012        let (client, event_rx) = make_client_with_test_data(&td);
2013        let client = Arc::new(client);
2014        client.start_with_default_executor();
2015
2016        let mut migrator = MigratorBuilder::new(client.clone())
2017            .read(
2018                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2019                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2020                Some(|_, _| true),
2021            )
2022            .write(
2023                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2024                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2025            )
2026            .build()
2027            .expect("migrator should build");
2028
2029        let context = ContextBuilder::new("bob")
2030            .build()
2031            .expect("Failed to create context");
2032
2033        if let Operation::Read = operation {
2034            migrator
2035                .read(
2036                    &context,
2037                    "stage-flag".into(),
2038                    Stage::Off,
2039                    serde_json::Value::Null,
2040                )
2041                .await;
2042        } else {
2043            migrator
2044                .write(
2045                    &context,
2046                    "stage-flag".into(),
2047                    Stage::Off,
2048                    serde_json::Value::Null,
2049                )
2050                .await;
2051        }
2052
2053        client.flush();
2054        client.close();
2055
2056        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2057        assert_eq!(events.len(), 3);
2058        match &events[1] {
2059            OutputEvent::MigrationOp(event) => {
2060                assert!(event.invoked.len() == origins.len());
2061                assert!(event.invoked.iter().all(|i| origins.contains(i)));
2062            }
2063            _ => panic!("Expected migration event"),
2064        }
2065    }
2066
2067    #[tokio::test]
2068    async fn migration_tracks_latency() {
2069        migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2070        migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2071        migration_tracks_latency_driver(
2072            Stage::Shadow,
2073            Operation::Read,
2074            vec![Origin::Old, Origin::New],
2075        )
2076        .await;
2077        migration_tracks_latency_driver(
2078            Stage::Live,
2079            Operation::Read,
2080            vec![Origin::Old, Origin::New],
2081        )
2082        .await;
2083        migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2084        migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2085        migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2086        migration_tracks_latency_driver(
2087            Stage::DualWrite,
2088            Operation::Write,
2089            vec![Origin::Old, Origin::New],
2090        )
2091        .await;
2092        migration_tracks_latency_driver(
2093            Stage::Shadow,
2094            Operation::Write,
2095            vec![Origin::Old, Origin::New],
2096        )
2097        .await;
2098        migration_tracks_latency_driver(
2099            Stage::Live,
2100            Operation::Write,
2101            vec![Origin::Old, Origin::New],
2102        )
2103        .await;
2104        migration_tracks_latency_driver(
2105            Stage::Rampdown,
2106            Operation::Write,
2107            vec![Origin::Old, Origin::New],
2108        )
2109        .await;
2110        migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2111    }
2112
2113    async fn migration_tracks_latency_driver(
2114        stage: Stage,
2115        operation: Operation,
2116        origins: Vec<Origin>,
2117    ) {
2118        let td = TestData::new();
2119        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2120        let (client, event_rx) = make_client_with_test_data(&td);
2121        let client = Arc::new(client);
2122        client.start_with_default_executor();
2123
2124        let mut migrator = MigratorBuilder::new(client.clone())
2125            .track_latency(true)
2126            .read(
2127                |_| {
2128                    async move {
2129                        async_std::task::sleep(Duration::from_millis(100)).await;
2130                        Ok(serde_json::Value::Null)
2131                    }
2132                    .boxed()
2133                },
2134                |_| {
2135                    async move {
2136                        async_std::task::sleep(Duration::from_millis(100)).await;
2137                        Ok(serde_json::Value::Null)
2138                    }
2139                    .boxed()
2140                },
2141                Some(|_, _| true),
2142            )
2143            .write(
2144                |_| {
2145                    async move {
2146                        async_std::task::sleep(Duration::from_millis(100)).await;
2147                        Ok(serde_json::Value::Null)
2148                    }
2149                    .boxed()
2150                },
2151                |_| {
2152                    async move {
2153                        async_std::task::sleep(Duration::from_millis(100)).await;
2154                        Ok(serde_json::Value::Null)
2155                    }
2156                    .boxed()
2157                },
2158            )
2159            .build()
2160            .expect("migrator should build");
2161
2162        let context = ContextBuilder::new("bob")
2163            .build()
2164            .expect("Failed to create context");
2165
2166        if let Operation::Read = operation {
2167            migrator
2168                .read(
2169                    &context,
2170                    "stage-flag".into(),
2171                    Stage::Off,
2172                    serde_json::Value::Null,
2173                )
2174                .await;
2175        } else {
2176            migrator
2177                .write(
2178                    &context,
2179                    "stage-flag".into(),
2180                    Stage::Off,
2181                    serde_json::Value::Null,
2182                )
2183                .await;
2184        }
2185
2186        client.flush();
2187        client.close();
2188
2189        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2190        assert_eq!(events.len(), 3);
2191        match &events[1] {
2192            OutputEvent::MigrationOp(event) => {
2193                assert!(event.latency.len() == origins.len());
2194                assert!(event
2195                    .latency
2196                    .values()
2197                    .all(|l| l > &Duration::from_millis(100)));
2198            }
2199            _ => panic!("Expected migration event"),
2200        }
2201    }
2202
2203    #[tokio::test]
2204    async fn migration_tracks_read_errors() {
2205        migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2206        migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2207        migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2208        migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2209        migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2210        migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2211    }
2212
2213    async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2214        let td = TestData::new();
2215        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2216        let (client, event_rx) = make_client_with_test_data(&td);
2217        let client = Arc::new(client);
2218        client.start_with_default_executor();
2219
2220        let mut migrator = MigratorBuilder::new(client.clone())
2221            .track_latency(true)
2222            .read(
2223                |_| async move { Err("fail".into()) }.boxed(),
2224                |_| async move { Err("fail".into()) }.boxed(),
2225                Some(|_: &String, _: &String| true),
2226            )
2227            .write(
2228                |_| async move { Err("fail".into()) }.boxed(),
2229                |_| async move { Err("fail".into()) }.boxed(),
2230            )
2231            .build()
2232            .expect("migrator should build");
2233
2234        let context = ContextBuilder::new("bob")
2235            .build()
2236            .expect("Failed to create context");
2237
2238        migrator
2239            .read(
2240                &context,
2241                "stage-flag".into(),
2242                Stage::Off,
2243                serde_json::Value::Null,
2244            )
2245            .await;
2246        client.flush();
2247        client.close();
2248
2249        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2250        assert_eq!(events.len(), 3);
2251        match &events[1] {
2252            OutputEvent::MigrationOp(event) => {
2253                assert!(event.errors.len() == origins.len());
2254                assert!(event.errors.iter().all(|i| origins.contains(i)));
2255            }
2256            _ => panic!("Expected migration event"),
2257        }
2258    }
2259
2260    #[tokio::test]
2261    async fn migration_tracks_authoritative_write_errors() {
2262        migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2263        migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2264            .await;
2265        migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2266        migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2267        migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2268            .await;
2269        migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2270            .await;
2271    }
2272
2273    async fn migration_tracks_authoritative_write_errors_driver(
2274        stage: Stage,
2275        origins: Vec<Origin>,
2276    ) {
2277        let td = TestData::new();
2278        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2279        let (client, event_rx) = make_client_with_test_data(&td);
2280        let client = Arc::new(client);
2281        client.start_with_default_executor();
2282
2283        let mut migrator = MigratorBuilder::new(client.clone())
2284            .track_latency(true)
2285            .read(
2286                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2287                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2288                None,
2289            )
2290            .write(
2291                |_| async move { Err("fail".into()) }.boxed(),
2292                |_| async move { Err("fail".into()) }.boxed(),
2293            )
2294            .build()
2295            .expect("migrator should build");
2296
2297        let context = ContextBuilder::new("bob")
2298            .build()
2299            .expect("Failed to create context");
2300
2301        migrator
2302            .write(
2303                &context,
2304                "stage-flag".into(),
2305                Stage::Off,
2306                serde_json::Value::Null,
2307            )
2308            .await;
2309
2310        client.flush();
2311        client.close();
2312
2313        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2314        assert_eq!(events.len(), 3);
2315        match &events[1] {
2316            OutputEvent::MigrationOp(event) => {
2317                assert!(event.errors.len() == origins.len());
2318                assert!(event.errors.iter().all(|i| origins.contains(i)));
2319            }
2320            _ => panic!("Expected migration event"),
2321        }
2322    }
2323
2324    #[tokio::test]
2325    async fn migration_tracks_nonauthoritative_write_errors() {
2326        migration_tracks_nonauthoritative_write_errors_driver(
2327            Stage::DualWrite,
2328            false,
2329            true,
2330            vec![Origin::New],
2331        )
2332        .await;
2333        migration_tracks_nonauthoritative_write_errors_driver(
2334            Stage::Shadow,
2335            false,
2336            true,
2337            vec![Origin::New],
2338        )
2339        .await;
2340        migration_tracks_nonauthoritative_write_errors_driver(
2341            Stage::Live,
2342            true,
2343            false,
2344            vec![Origin::Old],
2345        )
2346        .await;
2347        migration_tracks_nonauthoritative_write_errors_driver(
2348            Stage::Rampdown,
2349            true,
2350            false,
2351            vec![Origin::Old],
2352        )
2353        .await;
2354    }
2355
2356    async fn migration_tracks_nonauthoritative_write_errors_driver(
2357        stage: Stage,
2358        fail_old: bool,
2359        fail_new: bool,
2360        origins: Vec<Origin>,
2361    ) {
2362        let td = TestData::new();
2363        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2364        let (client, event_rx) = make_client_with_test_data(&td);
2365        let client = Arc::new(client);
2366        client.start_with_default_executor();
2367
2368        let mut migrator = MigratorBuilder::new(client.clone())
2369            .track_latency(true)
2370            .read(
2371                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2372                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2373                None,
2374            )
2375            .write(
2376                move |_| {
2377                    async move {
2378                        if fail_old {
2379                            Err("fail".into())
2380                        } else {
2381                            Ok(serde_json::Value::Null)
2382                        }
2383                    }
2384                    .boxed()
2385                },
2386                move |_| {
2387                    async move {
2388                        if fail_new {
2389                            Err("fail".into())
2390                        } else {
2391                            Ok(serde_json::Value::Null)
2392                        }
2393                    }
2394                    .boxed()
2395                },
2396            )
2397            .build()
2398            .expect("migrator should build");
2399
2400        let context = ContextBuilder::new("bob")
2401            .build()
2402            .expect("Failed to create context");
2403
2404        migrator
2405            .write(
2406                &context,
2407                "stage-flag".into(),
2408                Stage::Off,
2409                serde_json::Value::Null,
2410            )
2411            .await;
2412
2413        client.flush();
2414        client.close();
2415
2416        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2417        assert_eq!(events.len(), 3);
2418        match &events[1] {
2419            OutputEvent::MigrationOp(event) => {
2420                assert!(event.errors.len() == origins.len());
2421                assert!(event.errors.iter().all(|i| origins.contains(i)));
2422            }
2423            _ => panic!("Expected migration event"),
2424        }
2425    }
2426
2427    #[tokio::test]
2428    async fn migration_tracks_consistency() {
2429        migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2430        migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2431        migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2432        migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2433    }
2434
2435    async fn migration_tracks_consistency_driver(
2436        stage: Stage,
2437        old_return: &'static str,
2438        new_return: &'static str,
2439        expected_consistency: bool,
2440    ) {
2441        let td = TestData::new();
2442        td.use_preconfigured_flag(basic_migration_flag("stage-flag", stage));
2443        let (client, event_rx) = make_client_with_test_data(&td);
2444        let client = Arc::new(client);
2445        client.start_with_default_executor();
2446
2447        let mut migrator = MigratorBuilder::new(client.clone())
2448            .track_latency(true)
2449            .read(
2450                |_| {
2451                    async move {
2452                        async_std::task::sleep(Duration::from_millis(100)).await;
2453                        Ok(serde_json::Value::String(old_return.to_string()))
2454                    }
2455                    .boxed()
2456                },
2457                |_| {
2458                    async move {
2459                        async_std::task::sleep(Duration::from_millis(100)).await;
2460                        Ok(serde_json::Value::String(new_return.to_string()))
2461                    }
2462                    .boxed()
2463                },
2464                Some(|lhs, rhs| lhs == rhs),
2465            )
2466            .write(
2467                |_| {
2468                    async move {
2469                        async_std::task::sleep(Duration::from_millis(100)).await;
2470                        Ok(serde_json::Value::Null)
2471                    }
2472                    .boxed()
2473                },
2474                |_| {
2475                    async move {
2476                        async_std::task::sleep(Duration::from_millis(100)).await;
2477                        Ok(serde_json::Value::Null)
2478                    }
2479                    .boxed()
2480                },
2481            )
2482            .build()
2483            .expect("migrator should build");
2484
2485        let context = ContextBuilder::new("bob")
2486            .build()
2487            .expect("Failed to create context");
2488
2489        migrator
2490            .read(
2491                &context,
2492                "stage-flag".into(),
2493                Stage::Off,
2494                serde_json::Value::Null,
2495            )
2496            .await;
2497
2498        client.flush();
2499        client.close();
2500
2501        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2502        assert_eq!(events.len(), 3);
2503        match &events[1] {
2504            OutputEvent::MigrationOp(event) => {
2505                assert!(event.consistency_check == Some(expected_consistency))
2506            }
2507            _ => panic!("Expected migration event"),
2508        }
2509    }
2510
2511    #[tokio::test]
2512    async fn client_flush_blocking_completes_successfully() {
2513        let (client, event_rx) = make_mocked_client();
2514        client.start_with_default_executor();
2515        client.wait_for_initialization(Duration::from_secs(1)).await;
2516
2517        let context = ContextBuilder::new("user-key")
2518            .build()
2519            .expect("Failed to create context");
2520
2521        client.identify(context);
2522
2523        let result = client.flush_blocking(Duration::from_secs(5)).await;
2524        assert!(result, "flush_blocking should complete successfully");
2525
2526        client.close();
2527
2528        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2529        assert!(!events.is_empty(), "Should have received identify event");
2530    }
2531
2532    #[tokio::test]
2533    async fn client_flush_blocking_with_zero_timeout() {
2534        let (client, event_rx) = make_mocked_client();
2535        client.start_with_default_executor();
2536        client.wait_for_initialization(Duration::from_secs(1)).await;
2537
2538        let context = ContextBuilder::new("user-key")
2539            .build()
2540            .expect("Failed to create context");
2541
2542        client.identify(context);
2543
2544        let result = client.flush_blocking(Duration::ZERO).await;
2545        assert!(
2546            result,
2547            "flush_blocking with zero timeout should complete successfully"
2548        );
2549
2550        client.close();
2551
2552        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2553        assert!(!events.is_empty(), "Should have received identify event");
2554    }
2555
2556    #[tokio::test]
2557    async fn client_flush_blocking_with_no_events() {
2558        let (client, _event_rx) = make_mocked_client();
2559        client.start_with_default_executor();
2560        client.wait_for_initialization(Duration::from_secs(1)).await;
2561
2562        let result = client.flush_blocking(Duration::from_secs(1)).await;
2563        assert!(
2564            result,
2565            "flush_blocking with no events should complete immediately"
2566        );
2567
2568        client.close();
2569    }
2570
2571    #[tokio::test]
2572    async fn client_flush_blocking_multiple_concurrent_calls() {
2573        let (client, event_rx) = make_mocked_client();
2574        client.start_with_default_executor();
2575        client.wait_for_initialization(Duration::from_secs(1)).await;
2576
2577        let context = ContextBuilder::new("user-key")
2578            .build()
2579            .expect("Failed to create context");
2580
2581        client.identify(context);
2582
2583        // Make multiple concurrent flush_blocking calls
2584        let (result1, result2, result3) = tokio::join!(
2585            client.flush_blocking(Duration::from_secs(5)),
2586            client.flush_blocking(Duration::from_secs(5)),
2587            client.flush_blocking(Duration::from_secs(5)),
2588        );
2589
2590        assert!(result1, "First flush_blocking should succeed");
2591        assert!(result2, "Second flush_blocking should succeed");
2592        assert!(result3, "Third flush_blocking should succeed");
2593
2594        client.close();
2595
2596        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2597        assert!(!events.is_empty(), "Should have received identify event");
2598    }
2599
2600    fn make_mocked_client_with_delay(
2601        delay: u64,
2602        offline: bool,
2603        daemon_mode: bool,
2604    ) -> (Client, Receiver<OutputEvent>) {
2605        let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2606        let (event_sender, event_rx) = create_event_sender();
2607
2608        let config = ConfigBuilder::new("sdk-key")
2609            .offline(offline)
2610            .daemon_mode(daemon_mode)
2611            .data_source(MockDataSourceBuilder::new().data_source(updates))
2612            .event_processor(
2613                EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new()
2614                    .event_sender(Arc::new(event_sender)),
2615            )
2616            .build()
2617            .expect("config should build");
2618
2619        let client = Client::build(config).expect("Should be built.");
2620
2621        (client, event_rx)
2622    }
2623
2624    fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2625        make_mocked_client_with_delay(0, true, false)
2626    }
2627
2628    fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2629        make_mocked_client_with_delay(0, false, false)
2630    }
2631
2632    fn make_client_with_test_data(td: &TestData) -> (Client, Receiver<OutputEvent>) {
2633        let (event_sender, event_rx) = create_event_sender();
2634        let config = ConfigBuilder::new("sdk-key")
2635            .data_source(td)
2636            .event_processor(
2637                EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new()
2638                    .event_sender(Arc::new(event_sender)),
2639            )
2640            .build()
2641            .expect("config should build");
2642        let client = Client::build(config).expect("Should be built.");
2643        (client, event_rx)
2644    }
2645
2646    #[test]
2647    fn client_builds_successfully() {
2648        let config = ConfigBuilder::new("sdk-key")
2649            .offline(true)
2650            .build()
2651            .expect("config should build");
2652
2653        let client = Client::build(config).expect("client should build successfully");
2654
2655        assert!(
2656            !client.started.load(Ordering::SeqCst),
2657            "client should not be started yet"
2658        );
2659        assert!(client.offline, "client should be in offline mode");
2660        assert_eq!(client.sdk_key, "sdk-key", "sdk_key should match");
2661    }
2662}