Skip to main content

launchdarkly_server_sdk/
client.rs

1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::DataSource;
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
/// Bundles everything the client needs to emit analytics events for one flavor of
/// evaluation. The client keeps one scope for the plain variation methods and one for the
/// `*_detail` methods.
struct EventsScope {
    /// When `true`, the client skips event generation for this scope.
    disabled: bool,
    /// Factory used to build the events sent for this scope.
    event_factory: EventFactory,
    /// Recorder handed to the evaluator so that prerequisite flag evaluations are reported.
    prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
}
32
/// Reports prerequisite flag evaluations by turning them into evaluation events and sending
/// them to the event processor.
struct PrerequisiteEventRecorder {
    /// Factory used to build the evaluation event for each recorded prerequisite.
    event_factory: EventFactory,
    /// Destination for the generated events.
    event_processor: Arc<dyn EventProcessor>,
}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39    fn record(&self, event: PrerequisiteEvent) {
40        let evt = self.event_factory.new_eval_event(
41            &event.prerequisite_flag.key,
42            event.context.clone(),
43            &event.prerequisite_flag,
44            event.prerequisite_result,
45            FlagValue::Json(serde_json::Value::Null),
46            Some(event.target_flag_key),
47        );
48
49        self.event_processor.send(evt);
50    }
51}
52
/// Error type used to represent failures when building a [Client] instance.
///
/// Failures reported by the data source, data store, event processor, and config builders
/// are all converted into [BuildError::InvalidConfig] through the `From` implementations
/// in this module; only the message of the original error is retained.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum BuildError {
    /// Error used when a configuration setting is invalid. This typically indicates an invalid URL.
    #[error("invalid client config: {0}")]
    InvalidConfig(String),
}
61
62impl From<DataSourceError> for BuildError {
63    fn from(error: DataSourceError) -> Self {
64        Self::InvalidConfig(error.to_string())
65    }
66}
67
68impl From<DataStoreError> for BuildError {
69    fn from(error: DataStoreError) -> Self {
70        Self::InvalidConfig(error.to_string())
71    }
72}
73
74impl From<EventProcessorError> for BuildError {
75    fn from(error: EventProcessorError) -> Self {
76        Self::InvalidConfig(error.to_string())
77    }
78}
79
80impl From<ConfigBuildError> for BuildError {
81    fn from(error: ConfigBuildError) -> Self {
82        Self::InvalidConfig(error.to_string())
83    }
84}
85
/// Error type used to represent failures when starting the [Client].
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum StartError {
    /// Error used when spawning a background thread fails.
    ///
    /// Wraps the [io::Error] returned when the tokio runtime for the client could not be
    /// created.
    #[error("couldn't spawn background thread for client: {0}")]
    SpawnFailed(io::Error),
}
94
/// Initialization status of the client.
///
/// The explicit discriminants allow the state to be stored in an `AtomicUsize` and
/// converted back with [From<usize>].
#[derive(PartialEq, Copy, Clone, Debug)]
enum ClientInitState {
    Initializing = 0,
    Initialized = 1,
    InitializationFailed = 2,
}

/// Allows a state to be compared directly against a raw discriminant value.
impl PartialEq<usize> for ClientInitState {
    fn eq(&self, other: &usize) -> bool {
        let discriminant = *self as usize;
        discriminant == *other
    }
}

/// Converts a raw discriminant back into a state.
///
/// # Panics
///
/// Panics if `val` is not the discriminant of one of the variants.
impl From<usize> for ClientInitState {
    fn from(val: usize) -> Self {
        match val {
            0 => Self::Initializing,
            1 => Self::Initialized,
            2 => Self::InitializationFailed,
            _ => unreachable!(),
        }
    }
}
118
/// A client for the LaunchDarkly API.
///
/// In order to create a client instance, first create a config using [crate::ConfigBuilder].
///
/// # Examples
///
/// Creating a client, with default configuration.
/// ```
/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, BuildError};
/// # fn main() -> Result<(), BuildError> {
///     let ld_client = Client::build(ConfigBuilder::new("sdk-key").build()?)?;
/// #   Ok(())
/// # }
/// ```
///
/// Creating an instance which connects to a relay proxy.
/// ```
/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, ServiceEndpointsBuilder, BuildError};
/// # fn main() -> Result<(), BuildError> {
///     let ld_client = Client::build(ConfigBuilder::new("sdk-key")
///         .service_endpoints(ServiceEndpointsBuilder::new()
///             .relay_proxy("http://my-relay-hostname:8080")
///         ).build()?
///     )?;
/// #   Ok(())
/// # }
/// ```
///
/// Each builder type includes usage examples for the builder.
pub struct Client {
    /// Receives every analytics event generated by the client; also the target of
    /// [Client::flush] and [Client::close].
    event_processor: Arc<dyn EventProcessor>,
    /// Data source that is subscribed to the data store when the client is started.
    data_source: Arc<dyn DataSource>,
    /// Store read during flag evaluation; shared with the data source.
    data_store: Arc<RwLock<dyn DataStore>>,
    /// Event scope used by the plain variation methods, `identify`, and the `track*` methods.
    events_default: EventsScope,
    /// Event scope used by the `*_variation_detail` methods.
    events_with_reasons: EventsScope,
    /// Receives a permit once the data source has reported the outcome of initialization.
    init_notify: Arc<Semaphore>,
    /// Current [ClientInitState], stored as its `usize` discriminant.
    init_state: Arc<AtomicUsize>,
    /// Set once a `start_*` method has been called; later start calls return early.
    started: AtomicBool,
    /// Copied from the config at build time.
    offline: bool,
    /// Copied from the config at build time.
    daemon_mode: bool,
    /// SDK key from the config; used as the HMAC key in [Client::secure_mode_hash].
    sdk_key: String,
    /// Sender used by [Client::close] to signal shutdown; a receiver is handed to the data
    /// source when the client is started.
    shutdown_broadcast: broadcast::Sender<()>,
    /// Runtime created by [Client::start_with_runtime], if any; taken and dropped by
    /// [Client::close].
    runtime: RwLock<Option<Runtime>>,
}
163
164impl Client {
165    /// Create a new instance of a [Client] based on the provided [Config] parameter.
166    pub fn build(config: Config) -> Result<Self, BuildError> {
167        if config.offline() {
168            info!("Started LaunchDarkly Client in offline mode");
169        } else if config.daemon_mode() {
170            info!("Started LaunchDarkly Client in daemon mode");
171        }
172
173        let tags = config.application_tag();
174
175        let endpoints = config.service_endpoints_builder().build()?;
176        let event_processor =
177            config
178                .event_processor_builder()
179                .build(&endpoints, config.sdk_key(), tags.clone())?;
180        let data_source =
181            config
182                .data_source_builder()
183                .build(&endpoints, config.sdk_key(), tags.clone())?;
184        let data_store = config.data_store_builder().build()?;
185
186        let events_default = EventsScope {
187            disabled: config.offline(),
188            event_factory: EventFactory::new(false),
189            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
190                event_factory: EventFactory::new(false),
191                event_processor: event_processor.clone(),
192            }),
193        };
194
195        let events_with_reasons = EventsScope {
196            disabled: config.offline(),
197            event_factory: EventFactory::new(true),
198            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
199                event_factory: EventFactory::new(true),
200                event_processor: event_processor.clone(),
201            }),
202        };
203
204        let (shutdown_tx, _) = broadcast::channel(1);
205
206        Ok(Client {
207            event_processor,
208            data_source,
209            data_store,
210            events_default,
211            events_with_reasons,
212            init_notify: Arc::new(Semaphore::new(0)),
213            init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
214            started: AtomicBool::new(false),
215            offline: config.offline(),
216            daemon_mode: config.daemon_mode(),
217            sdk_key: config.sdk_key().into(),
218            shutdown_broadcast: shutdown_tx,
219            runtime: RwLock::new(None),
220        })
221    }
222
223    /// Starts a client in the current thread, which must have a default tokio runtime.
224    pub fn start_with_default_executor(&self) {
225        if self.started.load(Ordering::SeqCst) {
226            return;
227        }
228        self.started.store(true, Ordering::SeqCst);
229        self.start_with_default_executor_internal();
230    }
231
232    fn start_with_default_executor_internal(&self) {
233        // These clones are going to move into the closure, we
234        // do not want to move or reference `self`, because
235        // then lifetimes will get involved.
236        let notify = self.init_notify.clone();
237        let init_state = self.init_state.clone();
238
239        self.data_source.subscribe(
240            self.data_store.clone(),
241            Arc::new(move |success| {
242                init_state.store(
243                    (if success {
244                        ClientInitState::Initialized
245                    } else {
246                        ClientInitState::InitializationFailed
247                    }) as usize,
248                    Ordering::SeqCst,
249                );
250                notify.add_permits(1);
251            }),
252            self.shutdown_broadcast.subscribe(),
253        );
254    }
255
256    /// Creates a new tokio runtime and then starts the client. Tasks from the client will
257    /// be executed on created runtime.
258    /// If your application already has a tokio runtime, then you can use
259    /// [crate::Client::start_with_default_executor] and the client will dispatch tasks to
260    /// your existing runtime.
261    pub fn start_with_runtime(&self) -> Result<bool, StartError> {
262        if self.started.load(Ordering::SeqCst) {
263            return Ok(true);
264        }
265        self.started.store(true, Ordering::SeqCst);
266
267        let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
268        let _guard = runtime.enter();
269        self.runtime.write().replace(runtime);
270
271        self.start_with_default_executor_internal();
272
273        Ok(true)
274    }
275
    /// This is an async method that will resolve once initialization is complete.
    /// Initialization being complete does not mean that initialization was a success.
    /// The return value from the method indicates if the client successfully initialized.
    ///
    /// In offline or daemon mode this resolves to `true` without waiting.
    #[deprecated(
        note = "blocking without a timeout is discouraged, use wait_for_initialization instead"
    )]
    pub async fn initialized_async(&self) -> bool {
        // Same wait as `wait_for_initialization`, but without a timeout.
        self.initialized_async_internal().await
    }
285
286    /// This is an async method that will resolve once initialization is complete or the specified
287    /// timeout has occurred.
288    ///
289    /// If the timeout is triggered, this method will return `None`. Otherwise, the method will
290    /// return a boolean indicating whether or not the SDK has successfully initialized.
291    pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
292        if timeout > Duration::from_secs(60) {
293            warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
294        }
295
296        let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
297        initialized.ok()
298    }
299
300    async fn initialized_async_internal(&self) -> bool {
301        if self.offline || self.daemon_mode {
302            return true;
303        }
304
305        // If the client is not initialized, then we need to wait for it to be initialized.
306        // Because we are using atomic types, and not a lock, then there is still the possibility
307        // that the value will change between the read and when we wait. We use a semaphore to wait,
308        // and we do not forget the permit, therefore if the permit has been added, then we will get
309        // it very quickly and reduce blocking.
310        if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
311            let _permit = self.init_notify.acquire().await;
312        }
313        ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
314    }
315
316    /// This function synchronously returns if the SDK is initialized.
317    /// In the case of unrecoverable errors in establishing a connection it is possible for the
318    /// SDK to never become initialized.
319    pub fn initialized(&self) -> bool {
320        self.offline
321            || self.daemon_mode
322            || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
323    }
324
325    /// Close shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client
326    /// should no longer be used. The method will block until all pending analytics events (if any)
327    /// been sent.
328    pub fn close(&self) {
329        self.event_processor.close();
330
331        // If the system is in offline mode or daemon mode, no receiver will be listening to this
332        // broadcast channel, so sending on it would always result in an error.
333        if !self.offline && !self.daemon_mode {
334            if let Err(e) = self.shutdown_broadcast.send(()) {
335                error!("Failed to shutdown client appropriately: {e}");
336            }
337        }
338
339        // Potentially take the runtime we created when starting the client and do nothing with it
340        // so it drops, closing out all spawned tasks.
341        self.runtime.write().take();
342    }
343
    /// Flush tells the client that all pending analytics events (if any) should be delivered as
    /// soon as possible. Flushing is asynchronous, so this method will return before it is
    /// complete. However, if you call [Client::close], events are guaranteed to be sent before
    /// that method returns.
    ///
    /// For more information, see the Reference Guide:
    /// <https://docs.launchdarkly.com/sdk/features/flush#rust>.
    pub fn flush(&self) {
        // The flush itself is carried out by the event processor.
        self.event_processor.flush();
    }
354
355    /// Identify reports details about a context.
356    ///
357    /// For more information, see the Reference Guide:
358    /// <https://docs.launchdarkly.com/sdk/features/identify#rust>
359    pub fn identify(&self, context: Context) {
360        if self.events_default.disabled {
361            return;
362        }
363
364        self.send_internal(self.events_default.event_factory.new_identify(context));
365    }
366
367    /// Returns the value of a boolean feature flag for a given context.
368    ///
369    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
370    /// off and has no off variation.
371    ///
372    /// For more information, see the Reference Guide:
373    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
374    pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
375        let val = self.variation(context, flag_key, default);
376        if let Some(b) = val.as_bool() {
377            b
378        } else {
379            warn!("bool_variation called for a non-bool flag {flag_key:?} (got {val:?})");
380            default
381        }
382    }
383
384    /// Returns the value of a string feature flag for a given context.
385    ///
386    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
387    /// off and has no off variation.
388    ///
389    /// For more information, see the Reference Guide:
390    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
391    pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
392        let val = self.variation(context, flag_key, default.clone());
393        if let Some(s) = val.as_string() {
394            s
395        } else {
396            warn!("str_variation called for a non-string flag {flag_key:?} (got {val:?})");
397            default
398        }
399    }
400
401    /// Returns the value of a float feature flag for a given context.
402    ///
403    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
404    /// off and has no off variation.
405    ///
406    /// For more information, see the Reference Guide:
407    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
408    pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
409        let val = self.variation(context, flag_key, default);
410        if let Some(f) = val.as_float() {
411            f
412        } else {
413            warn!("float_variation called for a non-float flag {flag_key:?} (got {val:?})");
414            default
415        }
416    }
417
418    /// Returns the value of a integer feature flag for a given context.
419    ///
420    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
421    /// off and has no off variation.
422    ///
423    /// For more information, see the Reference Guide:
424    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
425    pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
426        let val = self.variation(context, flag_key, default);
427        if let Some(f) = val.as_int() {
428            f
429        } else {
430            warn!("int_variation called for a non-int flag {flag_key:?} (got {val:?})");
431            default
432        }
433    }
434
435    /// Returns the value of a feature flag for the given context, allowing the value to be
436    /// of any JSON type.
437    ///
438    /// The value is returned as an [serde_json::Value].
439    ///
440    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned off.
441    ///
442    /// For more information, see the Reference Guide:
443    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
444    pub fn json_variation(
445        &self,
446        context: &Context,
447        flag_key: &str,
448        default: serde_json::Value,
449    ) -> serde_json::Value {
450        self.variation(context, flag_key, default.clone())
451            .as_json()
452            .unwrap_or(default)
453    }
454
455    /// This method is the same as [Client::bool_variation], but also returns further information
456    /// about how the value was calculated. The "reason" data will also be included in analytics
457    /// events.
458    ///
459    /// For more information, see the Reference Guide:
460    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
461    pub fn bool_variation_detail(
462        &self,
463        context: &Context,
464        flag_key: &str,
465        default: bool,
466    ) -> Detail<bool> {
467        self.variation_detail(context, flag_key, default).try_map(
468            |val| val.as_bool(),
469            default,
470            eval::Error::WrongType,
471        )
472    }
473
474    /// This method is the same as [Client::str_variation], but also returns further information
475    /// about how the value was calculated. The "reason" data will also be included in analytics
476    /// events.
477    ///
478    /// For more information, see the Reference Guide:
479    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
480    pub fn str_variation_detail(
481        &self,
482        context: &Context,
483        flag_key: &str,
484        default: String,
485    ) -> Detail<String> {
486        self.variation_detail(context, flag_key, default.clone())
487            .try_map(|val| val.as_string(), default, eval::Error::WrongType)
488    }
489
490    /// This method is the same as [Client::float_variation], but also returns further information
491    /// about how the value was calculated. The "reason" data will also be included in analytics
492    /// events.
493    ///
494    /// For more information, see the Reference Guide:
495    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
496    pub fn float_variation_detail(
497        &self,
498        context: &Context,
499        flag_key: &str,
500        default: f64,
501    ) -> Detail<f64> {
502        self.variation_detail(context, flag_key, default).try_map(
503            |val| val.as_float(),
504            default,
505            eval::Error::WrongType,
506        )
507    }
508
509    /// This method is the same as [Client::int_variation], but also returns further information
510    /// about how the value was calculated. The "reason" data will also be included in analytics
511    /// events.
512    ///
513    /// For more information, see the Reference Guide:
514    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
515    pub fn int_variation_detail(
516        &self,
517        context: &Context,
518        flag_key: &str,
519        default: i64,
520    ) -> Detail<i64> {
521        self.variation_detail(context, flag_key, default).try_map(
522            |val| val.as_int(),
523            default,
524            eval::Error::WrongType,
525        )
526    }
527
528    /// This method is the same as [Client::json_variation], but also returns further information
529    /// about how the value was calculated. The "reason" data will also be included in analytics
530    /// events.
531    ///
532    /// For more information, see the Reference Guide:
533    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
534    pub fn json_variation_detail(
535        &self,
536        context: &Context,
537        flag_key: &str,
538        default: serde_json::Value,
539    ) -> Detail<serde_json::Value> {
540        self.variation_detail(context, flag_key, default.clone())
541            .try_map(|val| val.as_json(), default, eval::Error::WrongType)
542    }
543
544    /// Generates the secure mode hash value for a context.
545    ///
546    /// For more information, see the Reference Guide:
547    /// <https://docs.launchdarkly.com/sdk/features/secure-mode#rust>.
548    pub fn secure_mode_hash(&self, context: &Context) -> String {
549        let key = aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
550        let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
551
552        data_encoding::HEXLOWER.encode(tag.as_ref())
553    }
554
555    /// Returns an object that encapsulates the state of all feature flags for a given context. This
556    /// includes the flag values, and also metadata that can be used on the front end.
557    ///
558    /// The most common use case for this method is to bootstrap a set of client-side feature flags
559    /// from a back-end service.
560    ///
561    /// You may pass any configuration of [FlagDetailConfig] to control what data is included.
562    ///
563    /// For more information, see the Reference Guide:
564    /// <https://docs.launchdarkly.com/sdk/features/all-flags#rust>
565    pub fn all_flags_detail(
566        &self,
567        context: &Context,
568        flag_state_config: FlagDetailConfig,
569    ) -> FlagDetail {
570        if self.offline {
571            warn!(
572                "all_flags_detail() called, but client is in offline mode. Returning empty state"
573            );
574            return FlagDetail::new(false);
575        }
576
577        if !self.initialized() {
578            warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
579            return FlagDetail::new(false);
580        }
581
582        let data_store = self.data_store.read();
583
584        let mut flag_detail = FlagDetail::new(true);
585        flag_detail.populate(&*data_store, context, flag_state_config);
586
587        flag_detail
588    }
589
590    /// This method is the same as [Client::variation], but also returns further information about
591    /// how the value was calculated. The "reason" data will also be included in analytics events.
592    ///
593    /// For more information, see the Reference Guide:
594    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
595    pub fn variation_detail<T: Into<FlagValue> + Clone>(
596        &self,
597        context: &Context,
598        flag_key: &str,
599        default: T,
600    ) -> Detail<FlagValue> {
601        let (detail, _) =
602            self.variation_internal(context, flag_key, default, &self.events_with_reasons);
603        detail
604    }
605
606    /// This is a generic function which returns the value of a feature flag for a given context.
607    ///
608    /// This method is an alternatively to the type specified methods (e.g.
609    /// [Client::bool_variation], [Client::int_variation], etc.).
610    ///
611    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
612    /// off and has no off variation.
613    ///
614    /// For more information, see the Reference Guide:
615    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
616    pub fn variation<T: Into<FlagValue> + Clone>(
617        &self,
618        context: &Context,
619        flag_key: &str,
620        default: T,
621    ) -> FlagValue {
622        let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
623        detail.value.unwrap()
624    }
625
626    /// This method returns the migration stage of the migration feature flag for the given
627    /// evaluation context.
628    ///
629    /// This method returns the default stage if there is an error or the flag does not exist.
630    pub fn migration_variation(
631        &self,
632        context: &Context,
633        flag_key: &str,
634        default_stage: Stage,
635    ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
636        let (detail, flag) =
637            self.variation_internal(context, flag_key, default_stage, &self.events_default);
638
639        let migration_detail =
640            detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
641        let tracker = MigrationOpTracker::new(
642            flag_key.into(),
643            flag,
644            context.clone(),
645            migration_detail.clone(),
646            default_stage,
647        );
648
649        (
650            migration_detail.value.unwrap_or(default_stage),
651            Arc::new(Mutex::new(tracker)),
652        )
653    }
654
655    /// Reports that a context has performed an event.
656    ///
657    /// The `key` parameter is defined by the application and will be shown in analytics reports;
658    /// it normally corresponds to the event name of a metric that you have created through the
659    /// LaunchDarkly dashboard. If you want to associate additional data with this event, use
660    /// [Client::track_data] or [Client::track_metric].
661    ///
662    /// For more information, see the Reference Guide:
663    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
664    pub fn track_event(&self, context: Context, key: impl Into<String>) {
665        let _ = self.track(context, key, None, serde_json::Value::Null);
666    }
667
668    /// Reports that a context has performed an event, and associates it with custom data.
669    ///
670    /// The `key` parameter is defined by the application and will be shown in analytics reports;
671    /// it normally corresponds to the event name of a metric that you have created through the
672    /// LaunchDarkly dashboard.
673    ///
674    /// `data` parameter is any type that implements [Serialize]. If no such value is needed, use
675    /// [serde_json::Value::Null] (or call [Client::track_event] instead). To send a numeric value
676    /// for experimentation, use [Client::track_metric].
677    ///
678    /// For more information, see the Reference Guide:
679    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
680    pub fn track_data(
681        &self,
682        context: Context,
683        key: impl Into<String>,
684        data: impl Serialize,
685    ) -> serde_json::Result<()> {
686        self.track(context, key, None, data)
687    }
688
689    /// Reports that a context has performed an event, and associates it with a numeric value. This
690    /// value is used by the LaunchDarkly experimentation feature in numeric custom metrics, and
691    /// will also be returned as part of the custom event for Data Export.
692    ///
693    /// The `key` parameter is defined by the application and will be shown in analytics reports;
694    /// it normally corresponds to the event name of a metric that you have created through the
695    /// LaunchDarkly dashboard.
696    ///
697    /// For more information, see the Reference Guide:
698    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
699    pub fn track_metric(
700        &self,
701        context: Context,
702        key: impl Into<String>,
703        value: f64,
704        data: impl Serialize,
705    ) {
706        let _ = self.track(context, key, Some(value), data);
707    }
708
709    fn track(
710        &self,
711        context: Context,
712        key: impl Into<String>,
713        metric_value: Option<f64>,
714        data: impl Serialize,
715    ) -> serde_json::Result<()> {
716        if !self.events_default.disabled {
717            let event =
718                self.events_default
719                    .event_factory
720                    .new_custom(context, key, metric_value, data)?;
721
722            self.send_internal(event);
723        }
724
725        Ok(())
726    }
727
728    /// Tracks the results of a migrations operation. This event includes measurements which can be
729    /// used to enhance the observability of a migration within the LaunchDarkly UI.
730    ///
731    /// This event should be generated through [crate::MigrationOpTracker]. If you are using the
732    /// [crate::Migrator] to handle migrations, this event will be created and emitted
733    /// automatically.
734    pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
735        if self.events_default.disabled {
736            return;
737        }
738
739        match tracker.lock() {
740            Ok(tracker) => {
741                let event = tracker.build();
742                match event {
743                    Ok(event) => {
744                        self.send_internal(
745                            self.events_default.event_factory.new_migration_op(event),
746                        );
747                    }
748                    Err(e) => error!("Failed to build migration event, no event will be sent: {e}"),
749                }
750            }
751            Err(e) => error!("Failed to lock migration tracker, no event will be sent: {e}"),
752        }
753    }
754
755    fn variation_internal<T: Into<FlagValue> + Clone>(
756        &self,
757        context: &Context,
758        flag_key: &str,
759        default: T,
760        events_scope: &EventsScope,
761    ) -> (Detail<FlagValue>, Option<eval::Flag>) {
762        if self.offline {
763            return (
764                Detail::err_default(eval::Error::ClientNotReady, default.into()),
765                None,
766            );
767        }
768
769        let (flag, result) = match self.initialized() {
770            false => (
771                None,
772                Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
773            ),
774            true => {
775                let data_store = self.data_store.read();
776                match data_store.flag(flag_key) {
777                    Some(flag) => {
778                        let result = eval::evaluate(
779                            data_store.to_store(),
780                            &flag,
781                            context,
782                            Some(&*events_scope.prerequisite_event_recorder),
783                        )
784                        .map(|v| v.clone())
785                        .or(default.clone().into());
786
787                        (Some(flag), result)
788                    }
789                    None => (
790                        None,
791                        Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
792                    ),
793                }
794            }
795        };
796
797        if !events_scope.disabled {
798            let event = match &flag {
799                Some(f) => events_scope.event_factory.new_eval_event(
800                    flag_key,
801                    context.clone(),
802                    f,
803                    result.clone(),
804                    default.into(),
805                    None,
806                ),
807                None => events_scope.event_factory.new_unknown_flag_event(
808                    flag_key,
809                    context.clone(),
810                    result.clone(),
811                    default.into(),
812                ),
813            };
814            self.send_internal(event);
815        }
816
817        (result, flag)
818    }
819
820    fn send_internal(&self, event: InputEvent) {
821        self.event_processor.send(event);
822    }
823}
824
825#[cfg(test)]
826mod tests {
827    use assert_json_diff::assert_json_eq;
828    use crossbeam_channel::Receiver;
829    use eval::{ContextBuilder, MultiContextBuilder};
830    use futures::FutureExt;
831    use hyper::client::HttpConnector;
832    use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
833    use maplit::hashmap;
834    use std::collections::HashMap;
835    use tokio::time::Instant;
836
837    use crate::data_source::MockDataSource;
838    use crate::data_source_builders::MockDataSourceBuilder;
839    use crate::events::create_event_sender;
840    use crate::events::event::{OutputEvent, VariationKey};
841    use crate::events::processor_builders::EventProcessorBuilder;
842    use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
843    use crate::stores::store_types::{PatchTarget, StorageItem};
844    use crate::test_common::{
845        self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
846        basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
847    };
848    use crate::{
849        AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
850        PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
851        SerializedItem,
852    };
853    use test_case::test_case;
854
855    use super::*;
856
857    fn is_send_and_sync<T: Send + Sync>() {}
858
859    #[test]
860    fn ensure_client_is_send_and_sync() {
861        is_send_and_sync::<Client>()
862    }
863
864    #[tokio::test]
865    async fn client_asynchronously_initializes() {
866        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
867        client.start_with_default_executor();
868
869        let now = Instant::now();
870        let initialized = client.initialized_async().await;
871        let elapsed_time = now.elapsed();
872        assert!(initialized);
873        // Give ourself a good margin for thread scheduling.
874        assert!(elapsed_time.as_millis() > 500)
875    }
876
877    #[tokio::test]
878    async fn client_asynchronously_initializes_within_timeout() {
879        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
880        client.start_with_default_executor();
881
882        let now = Instant::now();
883        let initialized = client
884            .wait_for_initialization(Duration::from_millis(1500))
885            .await;
886        let elapsed_time = now.elapsed();
887        // Give ourself a good margin for thread scheduling.
888        assert!(elapsed_time.as_millis() > 500);
889        assert_eq!(initialized, Some(true));
890    }
891
892    #[tokio::test]
893    async fn client_asynchronously_initializes_slower_than_timeout() {
894        let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
895        client.start_with_default_executor();
896
897        let now = Instant::now();
898        let initialized = client
899            .wait_for_initialization(Duration::from_millis(500))
900            .await;
901        let elapsed_time = now.elapsed();
902        // Give ourself a good margin for thread scheduling.
903        assert!(elapsed_time.as_millis() < 750);
904        assert!(initialized.is_none());
905    }
906
907    #[tokio::test]
908    async fn client_initializes_immediately_in_offline_mode() {
909        let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
910        client.start_with_default_executor();
911
912        assert!(client.initialized());
913
914        let now = Instant::now();
915        let initialized = client
916            .wait_for_initialization(Duration::from_millis(2000))
917            .await;
918        let elapsed_time = now.elapsed();
919        assert_eq!(initialized, Some(true));
920        assert!(elapsed_time.as_millis() < 500)
921    }
922
923    #[tokio::test]
924    async fn client_initializes_immediately_in_daemon_mode() {
925        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
926        client.start_with_default_executor();
927
928        assert!(client.initialized());
929
930        let now = Instant::now();
931        let initialized = client
932            .wait_for_initialization(Duration::from_millis(2000))
933            .await;
934        let elapsed_time = now.elapsed();
935        assert_eq!(initialized, Some(true));
936        assert!(elapsed_time.as_millis() < 500)
937    }
938
939    #[test_case(basic_flag("myFlag"), false.into(), true.into())]
940    #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
941    fn client_updates_changes_evaluation_results(
942        flag: eval::Flag,
943        default: FlagValue,
944        expected: FlagValue,
945    ) {
946        let context = ContextBuilder::new("foo")
947            .build()
948            .expect("Failed to create context");
949
950        let (client, _event_rx) = make_mocked_client();
951
952        let result = client.variation_detail(&context, "myFlag", default.clone());
953        assert_eq!(result.value.unwrap(), default);
954
955        client.start_with_default_executor();
956        client
957            .data_store
958            .write()
959            .upsert(
960                &flag.key,
961                PatchTarget::Flag(StorageItem::Item(flag.clone())),
962            )
963            .expect("patch should apply");
964
965        let result = client.variation_detail(&context, "myFlag", default);
966        assert_eq!(result.value.unwrap(), expected);
967        assert!(matches!(
968            result.reason,
969            Reason::Fallthrough {
970                in_experiment: false
971            }
972        ));
973    }
974
975    #[test]
976    fn all_flags_detail_is_invalid_when_offline() {
977        let (client, _event_rx) = make_mocked_offline_client();
978        client.start_with_default_executor();
979
980        let context = ContextBuilder::new("bob")
981            .build()
982            .expect("Failed to create context");
983
984        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
985        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
986    }
987
988    #[test]
989    fn all_flags_detail_is_invalid_when_not_initialized() {
990        let (client, _event_rx) = make_mocked_client();
991
992        let context = ContextBuilder::new("bob")
993            .build()
994            .expect("Failed to create context");
995
996        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
997        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
998    }
999
1000    #[test]
1001    fn all_flags_detail_returns_flag_states() {
1002        let (client, _event_rx) = make_mocked_client();
1003        client.start_with_default_executor();
1004        client
1005            .data_store
1006            .write()
1007            .upsert(
1008                "myFlag1",
1009                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag1"))),
1010            )
1011            .expect("patch should apply");
1012        client
1013            .data_store
1014            .write()
1015            .upsert(
1016                "myFlag2",
1017                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag2"))),
1018            )
1019            .expect("patch should apply");
1020        let context = ContextBuilder::new("bob")
1021            .build()
1022            .expect("Failed to create context");
1023
1024        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1025
1026        client.close();
1027
1028        assert_json_eq!(
1029            all_flags,
1030            json!({
1031                "myFlag1": true,
1032                "myFlag2": true,
1033                "$flagsState": {
1034                    "myFlag1": {
1035                        "version": 42,
1036                        "variation": 1
1037                    },
1038                     "myFlag2": {
1039                        "version": 42,
1040                        "variation": 1
1041                    },
1042                },
1043                "$valid": true
1044            })
1045        );
1046    }
1047
1048    #[test]
1049    fn all_flags_detail_returns_prerequisite_relations() {
1050        let (client, _event_rx) = make_mocked_client();
1051        client.start_with_default_executor();
1052        client
1053            .data_store
1054            .write()
1055            .upsert(
1056                "prereq1",
1057                PatchTarget::Flag(StorageItem::Item(basic_flag("prereq1"))),
1058            )
1059            .expect("patch should apply");
1060        client
1061            .data_store
1062            .write()
1063            .upsert(
1064                "prereq2",
1065                PatchTarget::Flag(StorageItem::Item(basic_flag("prereq2"))),
1066            )
1067            .expect("patch should apply");
1068
1069        client
1070            .data_store
1071            .write()
1072            .upsert(
1073                "toplevel",
1074                PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1075                    "toplevel",
1076                    &["prereq1", "prereq2"],
1077                    false,
1078                ))),
1079            )
1080            .expect("patch should apply");
1081
1082        let context = ContextBuilder::new("bob")
1083            .build()
1084            .expect("Failed to create context");
1085
1086        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1087
1088        client.close();
1089
1090        assert_json_eq!(
1091            all_flags,
1092            json!({
1093                "prereq1": true,
1094                "prereq2": true,
1095                "toplevel": true,
1096                "$flagsState": {
1097                    "toplevel": {
1098                        "version": 42,
1099                        "variation": 1,
1100                        "prerequisites": ["prereq1", "prereq2"]
1101                    },
1102                    "prereq1": {
1103                        "version": 42,
1104                        "variation": 1
1105                    },
1106                     "prereq2": {
1107                        "version": 42,
1108                        "variation": 1
1109                    },
1110                },
1111                "$valid": true
1112            })
1113        );
1114    }
1115
1116    #[test]
1117    fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1118        let (client, _event_rx) = make_mocked_client();
1119        client.start_with_default_executor();
1120        client
1121            .data_store
1122            .write()
1123            .upsert(
1124                "prereq1",
1125                PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1126                    "prereq1", false,
1127                ))),
1128            )
1129            .expect("patch should apply");
1130        client
1131            .data_store
1132            .write()
1133            .upsert(
1134                "prereq2",
1135                PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1136                    "prereq2", false,
1137                ))),
1138            )
1139            .expect("patch should apply");
1140
1141        client
1142            .data_store
1143            .write()
1144            .upsert(
1145                "toplevel",
1146                PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1147                    "toplevel",
1148                    &["prereq1", "prereq2"],
1149                    true,
1150                ))),
1151            )
1152            .expect("patch should apply");
1153
1154        let context = ContextBuilder::new("bob")
1155            .build()
1156            .expect("Failed to create context");
1157
1158        let mut config = FlagDetailConfig::new();
1159        config.client_side_only();
1160
1161        let all_flags = client.all_flags_detail(&context, config);
1162
1163        client.close();
1164
1165        assert_json_eq!(
1166            all_flags,
1167            json!({
1168                "toplevel": true,
1169                "$flagsState": {
1170                    "toplevel": {
1171                        "version": 42,
1172                        "variation": 1,
1173                        "prerequisites": ["prereq1", "prereq2"]
1174                    },
1175                },
1176                "$valid": true
1177            })
1178        );
1179    }
1180
1181    #[test]
1182    fn variation_tracks_events_correctly() {
1183        let (client, event_rx) = make_mocked_client();
1184        client.start_with_default_executor();
1185        client
1186            .data_store
1187            .write()
1188            .upsert(
1189                "myFlag",
1190                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1191            )
1192            .expect("patch should apply");
1193        let context = ContextBuilder::new("bob")
1194            .build()
1195            .expect("Failed to create context");
1196
1197        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1198
1199        assert!(flag_value.as_bool().unwrap());
1200        client.flush();
1201        client.close();
1202
1203        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1204        assert_eq!(events.len(), 2);
1205        assert_eq!(events[0].kind(), "index");
1206        assert_eq!(events[1].kind(), "summary");
1207
1208        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1209            let variation_key = VariationKey {
1210                version: Some(42),
1211                variation: Some(1),
1212            };
1213            let feature = event_summary.features.get("myFlag");
1214            assert!(feature.is_some());
1215
1216            let feature = feature.unwrap();
1217            assert!(feature.counters.contains_key(&variation_key));
1218        } else {
1219            panic!("Event should be a summary type");
1220        }
1221    }
1222
1223    #[test]
1224    fn variation_handles_offline_mode() {
1225        let (client, event_rx) = make_mocked_offline_client();
1226        client.start_with_default_executor();
1227
1228        let context = ContextBuilder::new("bob")
1229            .build()
1230            .expect("Failed to create context");
1231        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1232
1233        assert!(!flag_value.as_bool().unwrap());
1234        client.flush();
1235        client.close();
1236
1237        assert_eq!(event_rx.iter().count(), 0);
1238    }
1239
1240    #[test]
1241    fn variation_handles_unknown_flags() {
1242        let (client, event_rx) = make_mocked_client();
1243        client.start_with_default_executor();
1244        let context = ContextBuilder::new("bob")
1245            .build()
1246            .expect("Failed to create context");
1247
1248        let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1249
1250        assert!(!flag_value.as_bool().unwrap());
1251        client.flush();
1252        client.close();
1253
1254        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1255        assert_eq!(events.len(), 2);
1256        assert_eq!(events[0].kind(), "index");
1257        assert_eq!(events[1].kind(), "summary");
1258
1259        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1260            let variation_key = VariationKey {
1261                version: None,
1262                variation: None,
1263            };
1264
1265            let feature = event_summary.features.get("non-existent-flag");
1266            assert!(feature.is_some());
1267
1268            let feature = feature.unwrap();
1269            assert!(feature.counters.contains_key(&variation_key));
1270        } else {
1271            panic!("Event should be a summary type");
1272        }
1273    }
1274
1275    #[test]
1276    fn variation_detail_handles_debug_events_correctly() {
1277        let (client, event_rx) = make_mocked_client();
1278        client.start_with_default_executor();
1279
1280        let mut flag = basic_flag("myFlag");
1281        flag.debug_events_until_date = Some(64_060_606_800_000); // Jan. 1st, 4000
1282
1283        client
1284            .data_store
1285            .write()
1286            .upsert(
1287                &flag.key,
1288                PatchTarget::Flag(StorageItem::Item(flag.clone())),
1289            )
1290            .expect("patch should apply");
1291        let context = ContextBuilder::new("bob")
1292            .build()
1293            .expect("Failed to create context");
1294
1295        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1296
1297        assert!(detail.value.unwrap().as_bool().unwrap());
1298        assert!(matches!(
1299            detail.reason,
1300            Reason::Fallthrough {
1301                in_experiment: false
1302            }
1303        ));
1304        client.flush();
1305        client.close();
1306
1307        let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1308        assert_eq!(events.len(), 3);
1309        assert_eq!(events[0].kind(), "index");
1310        assert_eq!(events[1].kind(), "debug");
1311        assert_eq!(events[2].kind(), "summary");
1312
1313        if let OutputEvent::Summary(event_summary) = events[2].clone() {
1314            let variation_key = VariationKey {
1315                version: Some(42),
1316                variation: Some(1),
1317            };
1318
1319            let feature = event_summary.features.get("myFlag");
1320            assert!(feature.is_some());
1321
1322            let feature = feature.unwrap();
1323            assert!(feature.counters.contains_key(&variation_key));
1324        } else {
1325            panic!("Event should be a summary type");
1326        }
1327    }
1328
1329    #[test]
1330    fn variation_detail_tracks_events_correctly() {
1331        let (client, event_rx) = make_mocked_client();
1332        client.start_with_default_executor();
1333
1334        client
1335            .data_store
1336            .write()
1337            .upsert(
1338                "myFlag",
1339                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1340            )
1341            .expect("patch should apply");
1342        let context = ContextBuilder::new("bob")
1343            .build()
1344            .expect("Failed to create context");
1345
1346        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1347
1348        assert!(detail.value.unwrap().as_bool().unwrap());
1349        assert!(matches!(
1350            detail.reason,
1351            Reason::Fallthrough {
1352                in_experiment: false
1353            }
1354        ));
1355        client.flush();
1356        client.close();
1357
1358        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1359        assert_eq!(events.len(), 2);
1360        assert_eq!(events[0].kind(), "index");
1361        assert_eq!(events[1].kind(), "summary");
1362
1363        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1364            let variation_key = VariationKey {
1365                version: Some(42),
1366                variation: Some(1),
1367            };
1368
1369            let feature = event_summary.features.get("myFlag");
1370            assert!(feature.is_some());
1371
1372            let feature = feature.unwrap();
1373            assert!(feature.counters.contains_key(&variation_key));
1374        } else {
1375            panic!("Event should be a summary type");
1376        }
1377    }
1378
1379    #[test]
1380    fn variation_detail_handles_offline_mode() {
1381        let (client, event_rx) = make_mocked_offline_client();
1382        client.start_with_default_executor();
1383
1384        let context = ContextBuilder::new("bob")
1385            .build()
1386            .expect("Failed to create context");
1387
1388        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1389
1390        assert!(!detail.value.unwrap().as_bool().unwrap());
1391        assert!(matches!(
1392            detail.reason,
1393            Reason::Error {
1394                error: eval::Error::ClientNotReady
1395            }
1396        ));
1397        client.flush();
1398        client.close();
1399
1400        assert_eq!(event_rx.iter().count(), 0);
1401    }
1402
1403    struct InMemoryPersistentDataStoreFactory {
1404        data: AllData<Flag, Segment>,
1405        initialized: bool,
1406    }
1407
1408    impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1409        fn create_persistent_data_store(
1410            &self,
1411        ) -> Result<Box<(dyn PersistentDataStore + 'static)>, std::io::Error> {
1412            let serialized_data =
1413                AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1414            Ok(Box::new(InMemoryPersistentDataStore {
1415                data: serialized_data,
1416                initialized: self.initialized,
1417            }))
1418        }
1419    }
1420
1421    #[test]
1422    fn variation_detail_handles_daemon_mode() {
1423        testing_logger::setup();
1424        let factory = InMemoryPersistentDataStoreFactory {
1425            data: AllData {
1426                flags: hashmap!["flag".into() => basic_flag("flag")],
1427                segments: HashMap::new(),
1428            },
1429            initialized: true,
1430        };
1431        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1432
1433        let config = ConfigBuilder::new("sdk-key")
1434            .daemon_mode(true)
1435            .data_store(&builder)
1436            .event_processor(&NullEventProcessorBuilder::new())
1437            .build()
1438            .expect("config should build");
1439
1440        let client = Client::build(config).expect("Should be built.");
1441
1442        client.start_with_default_executor();
1443
1444        let context = ContextBuilder::new("bob")
1445            .build()
1446            .expect("Failed to create context");
1447
1448        let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));
1449
1450        assert!(detail.value.unwrap().as_bool().unwrap());
1451        assert!(matches!(
1452            detail.reason,
1453            Reason::Fallthrough {
1454                in_experiment: false
1455            }
1456        ));
1457        client.flush();
1458        client.close();
1459
1460        testing_logger::validate(|captured_logs| {
1461            assert_eq!(captured_logs.len(), 1);
1462            assert_eq!(
1463                captured_logs[0].body,
1464                "Started LaunchDarkly Client in daemon mode"
1465            );
1466        });
1467    }
1468
1469    #[test]
1470    fn daemon_mode_is_quiet_if_store_is_not_initialized() {
1471        testing_logger::setup();
1472
1473        let factory = InMemoryPersistentDataStoreFactory {
1474            data: AllData {
1475                flags: HashMap::new(),
1476                segments: HashMap::new(),
1477            },
1478            initialized: false,
1479        };
1480        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1481
1482        let config = ConfigBuilder::new("sdk-key")
1483            .daemon_mode(true)
1484            .data_store(&builder)
1485            .event_processor(&NullEventProcessorBuilder::new())
1486            .build()
1487            .expect("config should build");
1488
1489        let client = Client::build(config).expect("Should be built.");
1490
1491        client.start_with_default_executor();
1492
1493        let context = ContextBuilder::new("bob")
1494            .build()
1495            .expect("Failed to create context");
1496
1497        client.variation_detail(&context, "flag", FlagValue::Bool(false));
1498
1499        testing_logger::validate(|captured_logs| {
1500            assert_eq!(captured_logs.len(), 1);
1501            assert_eq!(
1502                captured_logs[0].body,
1503                "Started LaunchDarkly Client in daemon mode"
1504            );
1505        });
1506    }
1507
1508    #[test]
1509    fn variation_handles_off_flag_without_variation() {
1510        let (client, event_rx) = make_mocked_client();
1511        client.start_with_default_executor();
1512
1513        client
1514            .data_store
1515            .write()
1516            .upsert(
1517                "myFlag",
1518                PatchTarget::Flag(StorageItem::Item(basic_off_flag("myFlag"))),
1519            )
1520            .expect("patch should apply");
1521        let context = ContextBuilder::new("bob")
1522            .build()
1523            .expect("Failed to create context");
1524
1525        let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1526
1527        assert!(!result.as_bool().unwrap());
1528        client.flush();
1529        client.close();
1530
1531        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1532        assert_eq!(events.len(), 2);
1533        assert_eq!(events[0].kind(), "index");
1534        assert_eq!(events[1].kind(), "summary");
1535
1536        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1537            let variation_key = VariationKey {
1538                version: Some(42),
1539                variation: None,
1540            };
1541            let feature = event_summary.features.get("myFlag");
1542            assert!(feature.is_some());
1543
1544            let feature = feature.unwrap();
1545            assert!(feature.counters.contains_key(&variation_key));
1546        } else {
1547            panic!("Event should be a summary type");
1548        }
1549    }
1550
1551    #[test]
1552    fn variation_detail_tracks_prereq_events_correctly() {
1553        let (client, event_rx) = make_mocked_client();
1554        client.start_with_default_executor();
1555
1556        let mut basic_preqreq_flag = basic_flag("prereqFlag");
1557        basic_preqreq_flag.track_events = true;
1558
1559        client
1560            .data_store
1561            .write()
1562            .upsert(
1563                "prereqFlag",
1564                PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1565            )
1566            .expect("patch should apply");
1567
1568        let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1569        basic_flag.track_events = true;
1570        client
1571            .data_store
1572            .write()
1573            .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1574            .expect("patch should apply");
1575        let context = ContextBuilder::new("bob")
1576            .build()
1577            .expect("Failed to create context");
1578
1579        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1580
1581        assert!(detail.value.unwrap().as_bool().unwrap());
1582        assert!(matches!(
1583            detail.reason,
1584            Reason::Fallthrough {
1585                in_experiment: false
1586            }
1587        ));
1588        client.flush();
1589        client.close();
1590
1591        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1592        assert_eq!(events.len(), 4);
1593        assert_eq!(events[0].kind(), "index");
1594        assert_eq!(events[1].kind(), "feature");
1595        assert_eq!(events[2].kind(), "feature");
1596        assert_eq!(events[3].kind(), "summary");
1597
1598        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1599            let variation_key = VariationKey {
1600                version: Some(42),
1601                variation: Some(1),
1602            };
1603            let feature = event_summary.features.get("myFlag");
1604            assert!(feature.is_some());
1605
1606            let feature = feature.unwrap();
1607            assert!(feature.counters.contains_key(&variation_key));
1608
1609            let variation_key = VariationKey {
1610                version: Some(42),
1611                variation: Some(1),
1612            };
1613            let feature = event_summary.features.get("prereqFlag");
1614            assert!(feature.is_some());
1615
1616            let feature = feature.unwrap();
1617            assert!(feature.counters.contains_key(&variation_key));
1618        }
1619    }
1620
1621    #[test]
1622    fn variation_handles_failed_prereqs_correctly() {
1623        let (client, event_rx) = make_mocked_client();
1624        client.start_with_default_executor();
1625
1626        let mut basic_preqreq_flag = basic_off_flag("prereqFlag");
1627        basic_preqreq_flag.track_events = true;
1628
1629        client
1630            .data_store
1631            .write()
1632            .upsert(
1633                "prereqFlag",
1634                PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1635            )
1636            .expect("patch should apply");
1637
1638        let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1639        basic_flag.track_events = true;
1640        client
1641            .data_store
1642            .write()
1643            .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1644            .expect("patch should apply");
1645        let context = ContextBuilder::new("bob")
1646            .build()
1647            .expect("Failed to create context");
1648
1649        let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1650
1651        assert!(!detail.as_bool().unwrap());
1652        client.flush();
1653        client.close();
1654
1655        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1656        assert_eq!(events.len(), 4);
1657        assert_eq!(events[0].kind(), "index");
1658        assert_eq!(events[1].kind(), "feature");
1659        assert_eq!(events[2].kind(), "feature");
1660        assert_eq!(events[3].kind(), "summary");
1661
1662        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1663            let variation_key = VariationKey {
1664                version: Some(42),
1665                variation: Some(0),
1666            };
1667            let feature = event_summary.features.get("myFlag");
1668            assert!(feature.is_some());
1669
1670            let feature = feature.unwrap();
1671            assert!(feature.counters.contains_key(&variation_key));
1672
1673            let variation_key = VariationKey {
1674                version: Some(42),
1675                variation: None,
1676            };
1677            let feature = event_summary.features.get("prereqFlag");
1678            assert!(feature.is_some());
1679
1680            let feature = feature.unwrap();
1681            assert!(feature.counters.contains_key(&variation_key));
1682        }
1683    }
1684
1685    #[test]
1686    fn variation_detail_handles_flag_not_found() {
1687        let (client, event_rx) = make_mocked_client();
1688        client.start_with_default_executor();
1689
1690        let context = ContextBuilder::new("bob")
1691            .build()
1692            .expect("Failed to create context");
1693        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1694
1695        assert!(!detail.value.unwrap().as_bool().unwrap());
1696        assert!(matches!(
1697            detail.reason,
1698            Reason::Error {
1699                error: eval::Error::FlagNotFound
1700            }
1701        ));
1702        client.flush();
1703        client.close();
1704
1705        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1706        assert_eq!(events.len(), 2);
1707        assert_eq!(events[0].kind(), "index");
1708        assert_eq!(events[1].kind(), "summary");
1709
1710        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1711            let variation_key = VariationKey {
1712                version: None,
1713                variation: None,
1714            };
1715            let feature = event_summary.features.get("non-existent-flag");
1716            assert!(feature.is_some());
1717
1718            let feature = feature.unwrap();
1719            assert!(feature.counters.contains_key(&variation_key));
1720        } else {
1721            panic!("Event should be a summary type");
1722        }
1723    }
1724
1725    #[tokio::test]
1726    async fn variation_detail_handles_client_not_ready() {
1727        let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1728        client.start_with_default_executor();
1729        let context = ContextBuilder::new("bob")
1730            .build()
1731            .expect("Failed to create context");
1732
1733        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1734
1735        assert!(!detail.value.unwrap().as_bool().unwrap());
1736        assert!(matches!(
1737            detail.reason,
1738            Reason::Error {
1739                error: eval::Error::ClientNotReady
1740            }
1741        ));
1742        client.flush();
1743        client.close();
1744
1745        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1746        assert_eq!(events.len(), 2);
1747        assert_eq!(events[0].kind(), "index");
1748        assert_eq!(events[1].kind(), "summary");
1749
1750        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1751            let variation_key = VariationKey {
1752                version: None,
1753                variation: None,
1754            };
1755            let feature = event_summary.features.get("non-existent-flag");
1756            assert!(feature.is_some());
1757
1758            let feature = feature.unwrap();
1759            assert!(feature.counters.contains_key(&variation_key));
1760        } else {
1761            panic!("Event should be a summary type");
1762        }
1763    }
1764
1765    #[test]
1766    fn identify_sends_identify_event() {
1767        let (client, event_rx) = make_mocked_client();
1768        client.start_with_default_executor();
1769
1770        let context = ContextBuilder::new("bob")
1771            .build()
1772            .expect("Failed to create context");
1773
1774        client.identify(context);
1775        client.flush();
1776        client.close();
1777
1778        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1779        assert_eq!(events.len(), 1);
1780        assert_eq!(events[0].kind(), "identify");
1781    }
1782
1783    #[test]
1784    fn identify_sends_sends_nothing_in_offline_mode() {
1785        let (client, event_rx) = make_mocked_offline_client();
1786        client.start_with_default_executor();
1787
1788        let context = ContextBuilder::new("bob")
1789            .build()
1790            .expect("Failed to create context");
1791
1792        client.identify(context);
1793        client.flush();
1794        client.close();
1795
1796        assert_eq!(event_rx.iter().count(), 0);
1797    }
1798
1799    #[test]
1800    fn secure_mode_hash() {
1801        let config = ConfigBuilder::new("secret")
1802            .offline(true)
1803            .build()
1804            .expect("config should build");
1805        let client = Client::build(config).expect("Should be built.");
1806        let context = ContextBuilder::new("Message")
1807            .build()
1808            .expect("Failed to create context");
1809
1810        assert_eq!(
1811            client.secure_mode_hash(&context),
1812            "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
1813        );
1814    }
1815
1816    #[test]
1817    fn secure_mode_hash_with_multi_kind() {
1818        let config = ConfigBuilder::new("secret")
1819            .offline(true)
1820            .build()
1821            .expect("config should build");
1822        let client = Client::build(config).expect("Should be built.");
1823
1824        let org = ContextBuilder::new("org-key|1")
1825            .kind("org")
1826            .build()
1827            .expect("Failed to create context");
1828        let user = ContextBuilder::new("user-key:2")
1829            .build()
1830            .expect("Failed to create context");
1831
1832        let context = MultiContextBuilder::new()
1833            .add_context(org)
1834            .add_context(user)
1835            .build()
1836            .expect("failed to build multi-context");
1837
1838        assert_eq!(
1839            client.secure_mode_hash(&context),
1840            "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
1841        );
1842    }
1843
1844    #[derive(Serialize)]
1845    struct MyCustomData {
1846        pub answer: u32,
1847    }
1848
1849    #[test]
1850    fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1851        let (client, event_rx) = make_mocked_client();
1852        client.start_with_default_executor();
1853
1854        let context = ContextBuilder::new("bob")
1855            .build()
1856            .expect("Failed to create context");
1857
1858        client.track_event(context.clone(), "event-with-null");
1859        client.track_data(context.clone(), "event-with-string", "string-data")?;
1860        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1861        client.track_data(
1862            context.clone(),
1863            "event-with-struct",
1864            MyCustomData { answer: 42 },
1865        )?;
1866        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1867
1868        client.flush();
1869        client.close();
1870
1871        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1872        assert_eq!(events.len(), 6);
1873
1874        let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1875        for event in events {
1876            if let Some(count) = events_by_type.get_mut(event.kind()) {
1877                *count += 1;
1878            } else {
1879                events_by_type.insert(event.kind(), 1);
1880            }
1881        }
1882        assert!(matches!(events_by_type.get("index"), Some(1)));
1883        assert!(matches!(events_by_type.get("custom"), Some(5)));
1884
1885        Ok(())
1886    }
1887
1888    #[test]
1889    fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
1890        let (client, event_rx) = make_mocked_offline_client();
1891        client.start_with_default_executor();
1892
1893        let context = ContextBuilder::new("bob")
1894            .build()
1895            .expect("Failed to create context");
1896
1897        client.track_event(context.clone(), "event-with-null");
1898        client.track_data(context.clone(), "event-with-string", "string-data")?;
1899        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1900        client.track_data(
1901            context.clone(),
1902            "event-with-struct",
1903            MyCustomData { answer: 42 },
1904        )?;
1905        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1906
1907        client.flush();
1908        client.close();
1909
1910        assert_eq!(event_rx.iter().count(), 0);
1911
1912        Ok(())
1913    }
1914
1915    #[test]
1916    fn migration_handles_flag_not_found() {
1917        let (client, _event_rx) = make_mocked_client();
1918        client.start_with_default_executor();
1919
1920        let context = ContextBuilder::new("bob")
1921            .build()
1922            .expect("Failed to create context");
1923
1924        let (stage, _tracker) =
1925            client.migration_variation(&context, "non-existent-flag-key", Stage::Off);
1926
1927        assert_eq!(stage, Stage::Off);
1928    }
1929
1930    #[test]
1931    fn migration_uses_non_migration_flag() {
1932        let (client, _event_rx) = make_mocked_client();
1933        client.start_with_default_executor();
1934        client
1935            .data_store
1936            .write()
1937            .upsert(
1938                "boolean-flag",
1939                PatchTarget::Flag(StorageItem::Item(basic_flag("boolean-flag"))),
1940            )
1941            .expect("patch should apply");
1942
1943        let context = ContextBuilder::new("bob")
1944            .build()
1945            .expect("Failed to create context");
1946
1947        let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1948
1949        assert_eq!(stage, Stage::Off);
1950    }
1951
1952    #[test_case(Stage::Off)]
1953    #[test_case(Stage::DualWrite)]
1954    #[test_case(Stage::Shadow)]
1955    #[test_case(Stage::Live)]
1956    #[test_case(Stage::Rampdown)]
1957    #[test_case(Stage::Complete)]
1958    fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1959        let (client, _event_rx) = make_mocked_client();
1960        client.start_with_default_executor();
1961        client
1962            .data_store
1963            .write()
1964            .upsert(
1965                "stage-flag",
1966                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
1967            )
1968            .expect("patch should apply");
1969
1970        let context = ContextBuilder::new("bob")
1971            .build()
1972            .expect("Failed to create context");
1973
1974        let (evaluated_stage, _tracker) =
1975            client.migration_variation(&context, "stage-flag", Stage::Off);
1976
1977        assert_eq!(evaluated_stage, stage);
1978    }
1979
1980    #[tokio::test]
1981    async fn migration_tracks_invoked_correctly() {
1982        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
1983            .await;
1984        migration_tracks_invoked_correctly_driver(
1985            Stage::DualWrite,
1986            Operation::Read,
1987            vec![Origin::Old],
1988        )
1989        .await;
1990        migration_tracks_invoked_correctly_driver(
1991            Stage::Shadow,
1992            Operation::Read,
1993            vec![Origin::Old, Origin::New],
1994        )
1995        .await;
1996        migration_tracks_invoked_correctly_driver(
1997            Stage::Live,
1998            Operation::Read,
1999            vec![Origin::Old, Origin::New],
2000        )
2001        .await;
2002        migration_tracks_invoked_correctly_driver(
2003            Stage::Rampdown,
2004            Operation::Read,
2005            vec![Origin::New],
2006        )
2007        .await;
2008        migration_tracks_invoked_correctly_driver(
2009            Stage::Complete,
2010            Operation::Read,
2011            vec![Origin::New],
2012        )
2013        .await;
2014        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
2015            .await;
2016        migration_tracks_invoked_correctly_driver(
2017            Stage::DualWrite,
2018            Operation::Write,
2019            vec![Origin::Old, Origin::New],
2020        )
2021        .await;
2022        migration_tracks_invoked_correctly_driver(
2023            Stage::Shadow,
2024            Operation::Write,
2025            vec![Origin::Old, Origin::New],
2026        )
2027        .await;
2028        migration_tracks_invoked_correctly_driver(
2029            Stage::Live,
2030            Operation::Write,
2031            vec![Origin::Old, Origin::New],
2032        )
2033        .await;
2034        migration_tracks_invoked_correctly_driver(
2035            Stage::Rampdown,
2036            Operation::Write,
2037            vec![Origin::Old, Origin::New],
2038        )
2039        .await;
2040        migration_tracks_invoked_correctly_driver(
2041            Stage::Complete,
2042            Operation::Write,
2043            vec![Origin::New],
2044        )
2045        .await;
2046    }
2047
2048    async fn migration_tracks_invoked_correctly_driver(
2049        stage: Stage,
2050        operation: Operation,
2051        origins: Vec<Origin>,
2052    ) {
2053        let (client, event_rx) = make_mocked_client();
2054        let client = Arc::new(client);
2055        client.start_with_default_executor();
2056        client
2057            .data_store
2058            .write()
2059            .upsert(
2060                "stage-flag",
2061                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2062            )
2063            .expect("patch should apply");
2064
2065        let mut migrator = MigratorBuilder::new(client.clone())
2066            .read(
2067                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2068                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2069                Some(|_, _| true),
2070            )
2071            .write(
2072                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2073                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2074            )
2075            .build()
2076            .expect("migrator should build");
2077
2078        let context = ContextBuilder::new("bob")
2079            .build()
2080            .expect("Failed to create context");
2081
2082        if let Operation::Read = operation {
2083            migrator
2084                .read(
2085                    &context,
2086                    "stage-flag".into(),
2087                    Stage::Off,
2088                    serde_json::Value::Null,
2089                )
2090                .await;
2091        } else {
2092            migrator
2093                .write(
2094                    &context,
2095                    "stage-flag".into(),
2096                    Stage::Off,
2097                    serde_json::Value::Null,
2098                )
2099                .await;
2100        }
2101
2102        client.flush();
2103        client.close();
2104
2105        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2106        assert_eq!(events.len(), 3);
2107        match &events[1] {
2108            OutputEvent::MigrationOp(event) => {
2109                assert!(event.invoked.len() == origins.len());
2110                assert!(event.invoked.iter().all(|i| origins.contains(i)));
2111            }
2112            _ => panic!("Expected migration event"),
2113        }
2114    }
2115
2116    #[tokio::test]
2117    async fn migration_tracks_latency() {
2118        migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2119        migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2120        migration_tracks_latency_driver(
2121            Stage::Shadow,
2122            Operation::Read,
2123            vec![Origin::Old, Origin::New],
2124        )
2125        .await;
2126        migration_tracks_latency_driver(
2127            Stage::Live,
2128            Operation::Read,
2129            vec![Origin::Old, Origin::New],
2130        )
2131        .await;
2132        migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2133        migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2134        migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2135        migration_tracks_latency_driver(
2136            Stage::DualWrite,
2137            Operation::Write,
2138            vec![Origin::Old, Origin::New],
2139        )
2140        .await;
2141        migration_tracks_latency_driver(
2142            Stage::Shadow,
2143            Operation::Write,
2144            vec![Origin::Old, Origin::New],
2145        )
2146        .await;
2147        migration_tracks_latency_driver(
2148            Stage::Live,
2149            Operation::Write,
2150            vec![Origin::Old, Origin::New],
2151        )
2152        .await;
2153        migration_tracks_latency_driver(
2154            Stage::Rampdown,
2155            Operation::Write,
2156            vec![Origin::Old, Origin::New],
2157        )
2158        .await;
2159        migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2160    }
2161
2162    async fn migration_tracks_latency_driver(
2163        stage: Stage,
2164        operation: Operation,
2165        origins: Vec<Origin>,
2166    ) {
2167        let (client, event_rx) = make_mocked_client();
2168        let client = Arc::new(client);
2169        client.start_with_default_executor();
2170        client
2171            .data_store
2172            .write()
2173            .upsert(
2174                "stage-flag",
2175                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2176            )
2177            .expect("patch should apply");
2178
2179        let mut migrator = MigratorBuilder::new(client.clone())
2180            .track_latency(true)
2181            .read(
2182                |_| {
2183                    async move {
2184                        async_std::task::sleep(Duration::from_millis(100)).await;
2185                        Ok(serde_json::Value::Null)
2186                    }
2187                    .boxed()
2188                },
2189                |_| {
2190                    async move {
2191                        async_std::task::sleep(Duration::from_millis(100)).await;
2192                        Ok(serde_json::Value::Null)
2193                    }
2194                    .boxed()
2195                },
2196                Some(|_, _| true),
2197            )
2198            .write(
2199                |_| {
2200                    async move {
2201                        async_std::task::sleep(Duration::from_millis(100)).await;
2202                        Ok(serde_json::Value::Null)
2203                    }
2204                    .boxed()
2205                },
2206                |_| {
2207                    async move {
2208                        async_std::task::sleep(Duration::from_millis(100)).await;
2209                        Ok(serde_json::Value::Null)
2210                    }
2211                    .boxed()
2212                },
2213            )
2214            .build()
2215            .expect("migrator should build");
2216
2217        let context = ContextBuilder::new("bob")
2218            .build()
2219            .expect("Failed to create context");
2220
2221        if let Operation::Read = operation {
2222            migrator
2223                .read(
2224                    &context,
2225                    "stage-flag".into(),
2226                    Stage::Off,
2227                    serde_json::Value::Null,
2228                )
2229                .await;
2230        } else {
2231            migrator
2232                .write(
2233                    &context,
2234                    "stage-flag".into(),
2235                    Stage::Off,
2236                    serde_json::Value::Null,
2237                )
2238                .await;
2239        }
2240
2241        client.flush();
2242        client.close();
2243
2244        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2245        assert_eq!(events.len(), 3);
2246        match &events[1] {
2247            OutputEvent::MigrationOp(event) => {
2248                assert!(event.latency.len() == origins.len());
2249                assert!(event
2250                    .latency
2251                    .values()
2252                    .all(|l| l > &Duration::from_millis(100)));
2253            }
2254            _ => panic!("Expected migration event"),
2255        }
2256    }
2257
2258    #[tokio::test]
2259    async fn migration_tracks_read_errors() {
2260        migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2261        migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2262        migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2263        migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2264        migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2265        migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2266    }
2267
2268    async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2269        let (client, event_rx) = make_mocked_client();
2270        let client = Arc::new(client);
2271        client.start_with_default_executor();
2272        client
2273            .data_store
2274            .write()
2275            .upsert(
2276                "stage-flag",
2277                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2278            )
2279            .expect("patch should apply");
2280
2281        let mut migrator = MigratorBuilder::new(client.clone())
2282            .track_latency(true)
2283            .read(
2284                |_| async move { Err("fail".into()) }.boxed(),
2285                |_| async move { Err("fail".into()) }.boxed(),
2286                Some(|_: &String, _: &String| true),
2287            )
2288            .write(
2289                |_| async move { Err("fail".into()) }.boxed(),
2290                |_| async move { Err("fail".into()) }.boxed(),
2291            )
2292            .build()
2293            .expect("migrator should build");
2294
2295        let context = ContextBuilder::new("bob")
2296            .build()
2297            .expect("Failed to create context");
2298
2299        migrator
2300            .read(
2301                &context,
2302                "stage-flag".into(),
2303                Stage::Off,
2304                serde_json::Value::Null,
2305            )
2306            .await;
2307        client.flush();
2308        client.close();
2309
2310        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2311        assert_eq!(events.len(), 3);
2312        match &events[1] {
2313            OutputEvent::MigrationOp(event) => {
2314                assert!(event.errors.len() == origins.len());
2315                assert!(event.errors.iter().all(|i| origins.contains(i)));
2316            }
2317            _ => panic!("Expected migration event"),
2318        }
2319    }
2320
2321    #[tokio::test]
2322    async fn migration_tracks_authoritative_write_errors() {
2323        migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2324        migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2325            .await;
2326        migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2327        migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2328        migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2329            .await;
2330        migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2331            .await;
2332    }
2333
2334    async fn migration_tracks_authoritative_write_errors_driver(
2335        stage: Stage,
2336        origins: Vec<Origin>,
2337    ) {
2338        let (client, event_rx) = make_mocked_client();
2339        let client = Arc::new(client);
2340        client.start_with_default_executor();
2341        client
2342            .data_store
2343            .write()
2344            .upsert(
2345                "stage-flag",
2346                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2347            )
2348            .expect("patch should apply");
2349
2350        let mut migrator = MigratorBuilder::new(client.clone())
2351            .track_latency(true)
2352            .read(
2353                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2354                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2355                None,
2356            )
2357            .write(
2358                |_| async move { Err("fail".into()) }.boxed(),
2359                |_| async move { Err("fail".into()) }.boxed(),
2360            )
2361            .build()
2362            .expect("migrator should build");
2363
2364        let context = ContextBuilder::new("bob")
2365            .build()
2366            .expect("Failed to create context");
2367
2368        migrator
2369            .write(
2370                &context,
2371                "stage-flag".into(),
2372                Stage::Off,
2373                serde_json::Value::Null,
2374            )
2375            .await;
2376
2377        client.flush();
2378        client.close();
2379
2380        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2381        assert_eq!(events.len(), 3);
2382        match &events[1] {
2383            OutputEvent::MigrationOp(event) => {
2384                assert!(event.errors.len() == origins.len());
2385                assert!(event.errors.iter().all(|i| origins.contains(i)));
2386            }
2387            _ => panic!("Expected migration event"),
2388        }
2389    }
2390
2391    #[tokio::test]
2392    async fn migration_tracks_nonauthoritative_write_errors() {
2393        migration_tracks_nonauthoritative_write_errors_driver(
2394            Stage::DualWrite,
2395            false,
2396            true,
2397            vec![Origin::New],
2398        )
2399        .await;
2400        migration_tracks_nonauthoritative_write_errors_driver(
2401            Stage::Shadow,
2402            false,
2403            true,
2404            vec![Origin::New],
2405        )
2406        .await;
2407        migration_tracks_nonauthoritative_write_errors_driver(
2408            Stage::Live,
2409            true,
2410            false,
2411            vec![Origin::Old],
2412        )
2413        .await;
2414        migration_tracks_nonauthoritative_write_errors_driver(
2415            Stage::Rampdown,
2416            true,
2417            false,
2418            vec![Origin::Old],
2419        )
2420        .await;
2421    }
2422
2423    async fn migration_tracks_nonauthoritative_write_errors_driver(
2424        stage: Stage,
2425        fail_old: bool,
2426        fail_new: bool,
2427        origins: Vec<Origin>,
2428    ) {
2429        let (client, event_rx) = make_mocked_client();
2430        let client = Arc::new(client);
2431        client.start_with_default_executor();
2432        client
2433            .data_store
2434            .write()
2435            .upsert(
2436                "stage-flag",
2437                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2438            )
2439            .expect("patch should apply");
2440
2441        let mut migrator = MigratorBuilder::new(client.clone())
2442            .track_latency(true)
2443            .read(
2444                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2445                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2446                None,
2447            )
2448            .write(
2449                move |_| {
2450                    async move {
2451                        if fail_old {
2452                            Err("fail".into())
2453                        } else {
2454                            Ok(serde_json::Value::Null)
2455                        }
2456                    }
2457                    .boxed()
2458                },
2459                move |_| {
2460                    async move {
2461                        if fail_new {
2462                            Err("fail".into())
2463                        } else {
2464                            Ok(serde_json::Value::Null)
2465                        }
2466                    }
2467                    .boxed()
2468                },
2469            )
2470            .build()
2471            .expect("migrator should build");
2472
2473        let context = ContextBuilder::new("bob")
2474            .build()
2475            .expect("Failed to create context");
2476
2477        migrator
2478            .write(
2479                &context,
2480                "stage-flag".into(),
2481                Stage::Off,
2482                serde_json::Value::Null,
2483            )
2484            .await;
2485
2486        client.flush();
2487        client.close();
2488
2489        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2490        assert_eq!(events.len(), 3);
2491        match &events[1] {
2492            OutputEvent::MigrationOp(event) => {
2493                assert!(event.errors.len() == origins.len());
2494                assert!(event.errors.iter().all(|i| origins.contains(i)));
2495            }
2496            _ => panic!("Expected migration event"),
2497        }
2498    }
2499
2500    #[tokio::test]
2501    async fn migration_tracks_consistency() {
2502        migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2503        migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2504        migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2505        migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2506    }
2507
2508    async fn migration_tracks_consistency_driver(
2509        stage: Stage,
2510        old_return: &'static str,
2511        new_return: &'static str,
2512        expected_consistency: bool,
2513    ) {
2514        let (client, event_rx) = make_mocked_client();
2515        let client = Arc::new(client);
2516        client.start_with_default_executor();
2517        client
2518            .data_store
2519            .write()
2520            .upsert(
2521                "stage-flag",
2522                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2523            )
2524            .expect("patch should apply");
2525
2526        let mut migrator = MigratorBuilder::new(client.clone())
2527            .track_latency(true)
2528            .read(
2529                |_| {
2530                    async move {
2531                        async_std::task::sleep(Duration::from_millis(100)).await;
2532                        Ok(serde_json::Value::String(old_return.to_string()))
2533                    }
2534                    .boxed()
2535                },
2536                |_| {
2537                    async move {
2538                        async_std::task::sleep(Duration::from_millis(100)).await;
2539                        Ok(serde_json::Value::String(new_return.to_string()))
2540                    }
2541                    .boxed()
2542                },
2543                Some(|lhs, rhs| lhs == rhs),
2544            )
2545            .write(
2546                |_| {
2547                    async move {
2548                        async_std::task::sleep(Duration::from_millis(100)).await;
2549                        Ok(serde_json::Value::Null)
2550                    }
2551                    .boxed()
2552                },
2553                |_| {
2554                    async move {
2555                        async_std::task::sleep(Duration::from_millis(100)).await;
2556                        Ok(serde_json::Value::Null)
2557                    }
2558                    .boxed()
2559                },
2560            )
2561            .build()
2562            .expect("migrator should build");
2563
2564        let context = ContextBuilder::new("bob")
2565            .build()
2566            .expect("Failed to create context");
2567
2568        migrator
2569            .read(
2570                &context,
2571                "stage-flag".into(),
2572                Stage::Off,
2573                serde_json::Value::Null,
2574            )
2575            .await;
2576
2577        client.flush();
2578        client.close();
2579
2580        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2581        assert_eq!(events.len(), 3);
2582        match &events[1] {
2583            OutputEvent::MigrationOp(event) => {
2584                assert!(event.consistency_check == Some(expected_consistency))
2585            }
2586            _ => panic!("Expected migration event"),
2587        }
2588    }
2589
2590    fn make_mocked_client_with_delay(
2591        delay: u64,
2592        offline: bool,
2593        daemon_mode: bool,
2594    ) -> (Client, Receiver<OutputEvent>) {
2595        let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2596        let (event_sender, event_rx) = create_event_sender();
2597
2598        let config = ConfigBuilder::new("sdk-key")
2599            .offline(offline)
2600            .daemon_mode(daemon_mode)
2601            .data_source(MockDataSourceBuilder::new().data_source(updates))
2602            .event_processor(
2603                EventProcessorBuilder::<HttpConnector>::new().event_sender(Arc::new(event_sender)),
2604            )
2605            .build()
2606            .expect("config should build");
2607
2608        let client = Client::build(config).expect("Should be built.");
2609
2610        (client, event_rx)
2611    }
2612
2613    fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2614        make_mocked_client_with_delay(0, true, false)
2615    }
2616
2617    fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2618        make_mocked_client_with_delay(0, false, false)
2619    }
2620}