launchdarkly_server_sdk/
client.rs

1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::DataSource;
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
27struct EventsScope {
28    disabled: bool,
29    event_factory: EventFactory,
30    prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
31}
32
33struct PrerequisiteEventRecorder {
34    event_factory: EventFactory,
35    event_processor: Arc<dyn EventProcessor>,
36}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39    fn record(&self, event: PrerequisiteEvent) {
40        let evt = self.event_factory.new_eval_event(
41            &event.prerequisite_flag.key,
42            event.context.clone(),
43            &event.prerequisite_flag,
44            event.prerequisite_result,
45            FlagValue::Json(serde_json::Value::Null),
46            Some(event.target_flag_key),
47        );
48
49        self.event_processor.send(evt);
50    }
51}
52
53/// Error type used to represent failures when building a [Client] instance.
54#[non_exhaustive]
55#[derive(Debug, Error)]
56pub enum BuildError {
57    /// Error used when a configuration setting is invalid. This typically indicates an invalid URL.
58    #[error("invalid client config: {0}")]
59    InvalidConfig(String),
60}
61
62impl From<DataSourceError> for BuildError {
63    fn from(error: DataSourceError) -> Self {
64        Self::InvalidConfig(error.to_string())
65    }
66}
67
68impl From<DataStoreError> for BuildError {
69    fn from(error: DataStoreError) -> Self {
70        Self::InvalidConfig(error.to_string())
71    }
72}
73
74impl From<EventProcessorError> for BuildError {
75    fn from(error: EventProcessorError) -> Self {
76        Self::InvalidConfig(error.to_string())
77    }
78}
79
80impl From<ConfigBuildError> for BuildError {
81    fn from(error: ConfigBuildError) -> Self {
82        Self::InvalidConfig(error.to_string())
83    }
84}
85
86/// Error type used to represent failures when starting the [Client].
87#[non_exhaustive]
88#[derive(Debug, Error)]
89pub enum StartError {
90    /// Error used when spawning a background there fails.
91    #[error("couldn't spawn background thread for client: {0}")]
92    SpawnFailed(io::Error),
93}
94
95#[derive(PartialEq, Copy, Clone, Debug)]
96enum ClientInitState {
97    Initializing = 0,
98    Initialized = 1,
99    InitializationFailed = 2,
100}
101
102impl PartialEq<usize> for ClientInitState {
103    fn eq(&self, other: &usize) -> bool {
104        *self as usize == *other
105    }
106}
107
108impl From<usize> for ClientInitState {
109    fn from(val: usize) -> Self {
110        match val {
111            0 => ClientInitState::Initializing,
112            1 => ClientInitState::Initialized,
113            2 => ClientInitState::InitializationFailed,
114            _ => unreachable!(),
115        }
116    }
117}
118
119/// A client for the LaunchDarkly API.
120///
121/// In order to create a client instance, first create a config using [crate::ConfigBuilder].
122///
123/// # Examples
124///
125/// Creating a client, with default configuration.
126/// ```
127/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, BuildError};
128/// # fn main() -> Result<(), BuildError> {
129///     let ld_client = Client::build(ConfigBuilder::new("sdk-key").build()?)?;
130/// #   Ok(())
131/// # }
132/// ```
133///
134/// Creating an instance which connects to a relay proxy.
135/// ```
136/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, ServiceEndpointsBuilder, BuildError};
137/// # fn main() -> Result<(), BuildError> {
138///     let ld_client = Client::build(ConfigBuilder::new("sdk-key")
139///         .service_endpoints(ServiceEndpointsBuilder::new()
140///             .relay_proxy("http://my-relay-hostname:8080")
141///         ).build()?
142///     )?;
143/// #   Ok(())
144/// # }
145/// ```
146///
147/// Each builder type includes usage examples for the builder.
148pub struct Client {
149    event_processor: Arc<dyn EventProcessor>,
150    data_source: Arc<dyn DataSource>,
151    data_store: Arc<RwLock<dyn DataStore>>,
152    events_default: EventsScope,
153    events_with_reasons: EventsScope,
154    init_notify: Arc<Semaphore>,
155    init_state: Arc<AtomicUsize>,
156    started: AtomicBool,
157    offline: bool,
158    daemon_mode: bool,
159    sdk_key: String,
160    shutdown_broadcast: broadcast::Sender<()>,
161    runtime: RwLock<Option<Runtime>>,
162}
163
164impl Client {
165    /// Create a new instance of a [Client] based on the provided [Config] parameter.
166    pub fn build(config: Config) -> Result<Self, BuildError> {
167        if config.offline() {
168            info!("Started LaunchDarkly Client in offline mode");
169        } else if config.daemon_mode() {
170            info!("Started LaunchDarkly Client in daemon mode");
171        }
172
173        let tags = config.application_tag();
174
175        let endpoints = config.service_endpoints_builder().build()?;
176        let event_processor =
177            config
178                .event_processor_builder()
179                .build(&endpoints, config.sdk_key(), tags.clone())?;
180        let data_source =
181            config
182                .data_source_builder()
183                .build(&endpoints, config.sdk_key(), tags.clone())?;
184        let data_store = config.data_store_builder().build()?;
185
186        let events_default = EventsScope {
187            disabled: config.offline(),
188            event_factory: EventFactory::new(false),
189            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
190                event_factory: EventFactory::new(false),
191                event_processor: event_processor.clone(),
192            }),
193        };
194
195        let events_with_reasons = EventsScope {
196            disabled: config.offline(),
197            event_factory: EventFactory::new(true),
198            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
199                event_factory: EventFactory::new(true),
200                event_processor: event_processor.clone(),
201            }),
202        };
203
204        let (shutdown_tx, _) = broadcast::channel(1);
205
206        Ok(Client {
207            event_processor,
208            data_source,
209            data_store,
210            events_default,
211            events_with_reasons,
212            init_notify: Arc::new(Semaphore::new(0)),
213            init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
214            started: AtomicBool::new(false),
215            offline: config.offline(),
216            daemon_mode: config.daemon_mode(),
217            sdk_key: config.sdk_key().into(),
218            shutdown_broadcast: shutdown_tx,
219            runtime: RwLock::new(None),
220        })
221    }
222
223    /// Starts a client in the current thread, which must have a default tokio runtime.
224    pub fn start_with_default_executor(&self) {
225        if self.started.load(Ordering::SeqCst) {
226            return;
227        }
228        self.started.store(true, Ordering::SeqCst);
229        self.start_with_default_executor_internal();
230    }
231
232    fn start_with_default_executor_internal(&self) {
233        // These clones are going to move into the closure, we
234        // do not want to move or reference `self`, because
235        // then lifetimes will get involved.
236        let notify = self.init_notify.clone();
237        let init_state = self.init_state.clone();
238
239        self.data_source.subscribe(
240            self.data_store.clone(),
241            Arc::new(move |success| {
242                init_state.store(
243                    (if success {
244                        ClientInitState::Initialized
245                    } else {
246                        ClientInitState::InitializationFailed
247                    }) as usize,
248                    Ordering::SeqCst,
249                );
250                notify.add_permits(1);
251            }),
252            self.shutdown_broadcast.subscribe(),
253        );
254    }
255
256    /// Creates a new tokio runtime and then starts the client. Tasks from the client will
257    /// be executed on created runtime.
258    /// If your application already has a tokio runtime, then you can use
259    /// [crate::Client::start_with_default_executor] and the client will dispatch tasks to
260    /// your existing runtime.
261    pub fn start_with_runtime(&self) -> Result<bool, StartError> {
262        if self.started.load(Ordering::SeqCst) {
263            return Ok(true);
264        }
265        self.started.store(true, Ordering::SeqCst);
266
267        let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
268        let _guard = runtime.enter();
269        self.runtime.write().replace(runtime);
270
271        self.start_with_default_executor_internal();
272
273        Ok(true)
274    }
275
276    /// This is an async method that will resolve once initialization is complete.
277    /// Initialization being complete does not mean that initialization was a success.
278    /// The return value from the method indicates if the client successfully initialized.
279    #[deprecated(
280        note = "blocking without a timeout is discouraged, use wait_for_initialization instead"
281    )]
282    pub async fn initialized_async(&self) -> bool {
283        self.initialized_async_internal().await
284    }
285
286    /// This is an async method that will resolve once initialization is complete or the specified
287    /// timeout has occurred.
288    ///
289    /// If the timeout is triggered, this method will return `None`. Otherwise, the method will
290    /// return a boolean indicating whether or not the SDK has successfully initialized.
291    pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
292        if timeout > Duration::from_secs(60) {
293            warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
294        }
295
296        let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
297        match initialized {
298            Ok(result) => Some(result),
299            Err(_) => None,
300        }
301    }
302
303    async fn initialized_async_internal(&self) -> bool {
304        if self.offline || self.daemon_mode {
305            return true;
306        }
307
308        // If the client is not initialized, then we need to wait for it to be initialized.
309        // Because we are using atomic types, and not a lock, then there is still the possibility
310        // that the value will change between the read and when we wait. We use a semaphore to wait,
311        // and we do not forget the permit, therefore if the permit has been added, then we will get
312        // it very quickly and reduce blocking.
313        if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
314            let _permit = self.init_notify.acquire().await;
315        }
316        ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
317    }
318
319    /// This function synchronously returns if the SDK is initialized.
320    /// In the case of unrecoverable errors in establishing a connection it is possible for the
321    /// SDK to never become initialized.
322    pub fn initialized(&self) -> bool {
323        self.offline
324            || self.daemon_mode
325            || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
326    }
327
328    /// Close shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client
329    /// should no longer be used. The method will block until all pending analytics events (if any)
330    /// been sent.
331    pub fn close(&self) {
332        self.event_processor.close();
333
334        // If the system is in offline mode or daemon mode, no receiver will be listening to this
335        // broadcast channel, so sending on it would always result in an error.
336        if !self.offline && !self.daemon_mode {
337            if let Err(e) = self.shutdown_broadcast.send(()) {
338                error!("Failed to shutdown client appropriately: {}", e);
339            }
340        }
341
342        // Potentially take the runtime we created when starting the client and do nothing with it
343        // so it drops, closing out all spawned tasks.
344        self.runtime.write().take();
345    }
346
347    /// Flush tells the client that all pending analytics events (if any) should be delivered as
348    /// soon as possible. Flushing is asynchronous, so this method will return before it is
349    /// complete. However, if you call [Client::close], events are guaranteed to be sent before
350    /// that method returns.
351    ///
352    /// For more information, see the Reference Guide:
353    /// <https://docs.launchdarkly.com/sdk/features/flush#rust>.
354    pub fn flush(&self) {
355        self.event_processor.flush();
356    }
357
358    /// Identify reports details about a context.
359    ///
360    /// For more information, see the Reference Guide:
361    /// <https://docs.launchdarkly.com/sdk/features/identify#rust>
362    pub fn identify(&self, context: Context) {
363        if self.events_default.disabled {
364            return;
365        }
366
367        self.send_internal(self.events_default.event_factory.new_identify(context));
368    }
369
370    /// Returns the value of a boolean feature flag for a given context.
371    ///
372    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
373    /// off and has no off variation.
374    ///
375    /// For more information, see the Reference Guide:
376    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
377    pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
378        let val = self.variation(context, flag_key, default);
379        if let Some(b) = val.as_bool() {
380            b
381        } else {
382            warn!(
383                "bool_variation called for a non-bool flag {:?} (got {:?})",
384                flag_key, val
385            );
386            default
387        }
388    }
389
390    /// Returns the value of a string feature flag for a given context.
391    ///
392    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
393    /// off and has no off variation.
394    ///
395    /// For more information, see the Reference Guide:
396    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
397    pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
398        let val = self.variation(context, flag_key, default.clone());
399        if let Some(s) = val.as_string() {
400            s
401        } else {
402            warn!(
403                "str_variation called for a non-string flag {:?} (got {:?})",
404                flag_key, val
405            );
406            default
407        }
408    }
409
410    /// Returns the value of a float feature flag for a given context.
411    ///
412    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
413    /// off and has no off variation.
414    ///
415    /// For more information, see the Reference Guide:
416    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
417    pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
418        let val = self.variation(context, flag_key, default);
419        if let Some(f) = val.as_float() {
420            f
421        } else {
422            warn!(
423                "float_variation called for a non-float flag {:?} (got {:?})",
424                flag_key, val
425            );
426            default
427        }
428    }
429
430    /// Returns the value of a integer feature flag for a given context.
431    ///
432    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
433    /// off and has no off variation.
434    ///
435    /// For more information, see the Reference Guide:
436    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
437    pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
438        let val = self.variation(context, flag_key, default);
439        if let Some(f) = val.as_int() {
440            f
441        } else {
442            warn!(
443                "int_variation called for a non-int flag {:?} (got {:?})",
444                flag_key, val
445            );
446            default
447        }
448    }
449
450    /// Returns the value of a feature flag for the given context, allowing the value to be
451    /// of any JSON type.
452    ///
453    /// The value is returned as an [serde_json::Value].
454    ///
455    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned off.
456    ///
457    /// For more information, see the Reference Guide:
458    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
459    pub fn json_variation(
460        &self,
461        context: &Context,
462        flag_key: &str,
463        default: serde_json::Value,
464    ) -> serde_json::Value {
465        self.variation(context, flag_key, default.clone())
466            .as_json()
467            .unwrap_or(default)
468    }
469
470    /// This method is the same as [Client::bool_variation], but also returns further information
471    /// about how the value was calculated. The "reason" data will also be included in analytics
472    /// events.
473    ///
474    /// For more information, see the Reference Guide:
475    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
476    pub fn bool_variation_detail(
477        &self,
478        context: &Context,
479        flag_key: &str,
480        default: bool,
481    ) -> Detail<bool> {
482        self.variation_detail(context, flag_key, default).try_map(
483            |val| val.as_bool(),
484            default,
485            eval::Error::WrongType,
486        )
487    }
488
489    /// This method is the same as [Client::str_variation], but also returns further information
490    /// about how the value was calculated. The "reason" data will also be included in analytics
491    /// events.
492    ///
493    /// For more information, see the Reference Guide:
494    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
495    pub fn str_variation_detail(
496        &self,
497        context: &Context,
498        flag_key: &str,
499        default: String,
500    ) -> Detail<String> {
501        self.variation_detail(context, flag_key, default.clone())
502            .try_map(|val| val.as_string(), default, eval::Error::WrongType)
503    }
504
505    /// This method is the same as [Client::float_variation], but also returns further information
506    /// about how the value was calculated. The "reason" data will also be included in analytics
507    /// events.
508    ///
509    /// For more information, see the Reference Guide:
510    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
511    pub fn float_variation_detail(
512        &self,
513        context: &Context,
514        flag_key: &str,
515        default: f64,
516    ) -> Detail<f64> {
517        self.variation_detail(context, flag_key, default).try_map(
518            |val| val.as_float(),
519            default,
520            eval::Error::WrongType,
521        )
522    }
523
524    /// This method is the same as [Client::int_variation], but also returns further information
525    /// about how the value was calculated. The "reason" data will also be included in analytics
526    /// events.
527    ///
528    /// For more information, see the Reference Guide:
529    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
530    pub fn int_variation_detail(
531        &self,
532        context: &Context,
533        flag_key: &str,
534        default: i64,
535    ) -> Detail<i64> {
536        self.variation_detail(context, flag_key, default).try_map(
537            |val| val.as_int(),
538            default,
539            eval::Error::WrongType,
540        )
541    }
542
543    /// This method is the same as [Client::json_variation], but also returns further information
544    /// about how the value was calculated. The "reason" data will also be included in analytics
545    /// events.
546    ///
547    /// For more information, see the Reference Guide:
548    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
549    pub fn json_variation_detail(
550        &self,
551        context: &Context,
552        flag_key: &str,
553        default: serde_json::Value,
554    ) -> Detail<serde_json::Value> {
555        self.variation_detail(context, flag_key, default.clone())
556            .try_map(|val| val.as_json(), default, eval::Error::WrongType)
557    }
558
559    /// Generates the secure mode hash value for a context.
560    ///
561    /// For more information, see the Reference Guide:
562    /// <https://docs.launchdarkly.com/sdk/features/secure-mode#rust>.
563    pub fn secure_mode_hash(&self, context: &Context) -> String {
564        let key = aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
565        let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
566
567        data_encoding::HEXLOWER.encode(tag.as_ref())
568    }
569
570    /// Returns an object that encapsulates the state of all feature flags for a given context. This
571    /// includes the flag values, and also metadata that can be used on the front end.
572    ///
573    /// The most common use case for this method is to bootstrap a set of client-side feature flags
574    /// from a back-end service.
575    ///
576    /// You may pass any configuration of [FlagDetailConfig] to control what data is included.
577    ///
578    /// For more information, see the Reference Guide:
579    /// <https://docs.launchdarkly.com/sdk/features/all-flags#rust>
580    pub fn all_flags_detail(
581        &self,
582        context: &Context,
583        flag_state_config: FlagDetailConfig,
584    ) -> FlagDetail {
585        if self.offline {
586            warn!(
587                "all_flags_detail() called, but client is in offline mode. Returning empty state"
588            );
589            return FlagDetail::new(false);
590        }
591
592        if !self.initialized() {
593            warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
594            return FlagDetail::new(false);
595        }
596
597        let data_store = self.data_store.read();
598
599        let mut flag_detail = FlagDetail::new(true);
600        flag_detail.populate(&*data_store, context, flag_state_config);
601
602        flag_detail
603    }
604
605    /// This method is the same as [Client::variation], but also returns further information about
606    /// how the value was calculated. The "reason" data will also be included in analytics events.
607    ///
608    /// For more information, see the Reference Guide:
609    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
610    pub fn variation_detail<T: Into<FlagValue> + Clone>(
611        &self,
612        context: &Context,
613        flag_key: &str,
614        default: T,
615    ) -> Detail<FlagValue> {
616        let (detail, _) =
617            self.variation_internal(context, flag_key, default, &self.events_with_reasons);
618        detail
619    }
620
621    /// This is a generic function which returns the value of a feature flag for a given context.
622    ///
623    /// This method is an alternatively to the type specified methods (e.g.
624    /// [Client::bool_variation], [Client::int_variation], etc.).
625    ///
626    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
627    /// off and has no off variation.
628    ///
629    /// For more information, see the Reference Guide:
630    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
631    pub fn variation<T: Into<FlagValue> + Clone>(
632        &self,
633        context: &Context,
634        flag_key: &str,
635        default: T,
636    ) -> FlagValue {
637        let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
638        detail.value.unwrap()
639    }
640
641    /// This method returns the migration stage of the migration feature flag for the given
642    /// evaluation context.
643    ///
644    /// This method returns the default stage if there is an error or the flag does not exist.
645    pub fn migration_variation(
646        &self,
647        context: &Context,
648        flag_key: &str,
649        default_stage: Stage,
650    ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
651        let (detail, flag) =
652            self.variation_internal(context, flag_key, default_stage, &self.events_default);
653
654        let migration_detail =
655            detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
656        let tracker = MigrationOpTracker::new(
657            flag_key.into(),
658            flag,
659            context.clone(),
660            migration_detail.clone(),
661            default_stage,
662        );
663
664        (
665            migration_detail.value.unwrap_or(default_stage),
666            Arc::new(Mutex::new(tracker)),
667        )
668    }
669
670    /// Reports that a context has performed an event.
671    ///
672    /// The `key` parameter is defined by the application and will be shown in analytics reports;
673    /// it normally corresponds to the event name of a metric that you have created through the
674    /// LaunchDarkly dashboard. If you want to associate additional data with this event, use
675    /// [Client::track_data] or [Client::track_metric].
676    ///
677    /// For more information, see the Reference Guide:
678    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
679    pub fn track_event(&self, context: Context, key: impl Into<String>) {
680        let _ = self.track(context, key, None, serde_json::Value::Null);
681    }
682
683    /// Reports that a context has performed an event, and associates it with custom data.
684    ///
685    /// The `key` parameter is defined by the application and will be shown in analytics reports;
686    /// it normally corresponds to the event name of a metric that you have created through the
687    /// LaunchDarkly dashboard.
688    ///
689    /// `data` parameter is any type that implements [Serialize]. If no such value is needed, use
690    /// [serde_json::Value::Null] (or call [Client::track_event] instead). To send a numeric value
691    /// for experimentation, use [Client::track_metric].
692    ///
693    /// For more information, see the Reference Guide:
694    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
695    pub fn track_data(
696        &self,
697        context: Context,
698        key: impl Into<String>,
699        data: impl Serialize,
700    ) -> serde_json::Result<()> {
701        self.track(context, key, None, data)
702    }
703
704    /// Reports that a context has performed an event, and associates it with a numeric value. This
705    /// value is used by the LaunchDarkly experimentation feature in numeric custom metrics, and
706    /// will also be returned as part of the custom event for Data Export.
707    ///
708    /// The `key` parameter is defined by the application and will be shown in analytics reports;
709    /// it normally corresponds to the event name of a metric that you have created through the
710    /// LaunchDarkly dashboard.
711    ///
712    /// For more information, see the Reference Guide:
713    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
714    pub fn track_metric(
715        &self,
716        context: Context,
717        key: impl Into<String>,
718        value: f64,
719        data: impl Serialize,
720    ) {
721        let _ = self.track(context, key, Some(value), data);
722    }
723
724    fn track(
725        &self,
726        context: Context,
727        key: impl Into<String>,
728        metric_value: Option<f64>,
729        data: impl Serialize,
730    ) -> serde_json::Result<()> {
731        if !self.events_default.disabled {
732            let event =
733                self.events_default
734                    .event_factory
735                    .new_custom(context, key, metric_value, data)?;
736
737            self.send_internal(event);
738        }
739
740        Ok(())
741    }
742
743    /// Tracks the results of a migrations operation. This event includes measurements which can be
744    /// used to enhance the observability of a migration within the LaunchDarkly UI.
745    ///
746    /// This event should be generated through [crate::MigrationOpTracker]. If you are using the
747    /// [crate::Migrator] to handle migrations, this event will be created and emitted
748    /// automatically.
749    pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
750        if self.events_default.disabled {
751            return;
752        }
753
754        match tracker.lock() {
755            Ok(tracker) => {
756                let event = tracker.build();
757                match event {
758                    Ok(event) => {
759                        self.send_internal(
760                            self.events_default.event_factory.new_migration_op(event),
761                        );
762                    }
763                    Err(e) => error!(
764                        "Failed to build migration event, no event will be sent: {}",
765                        e
766                    ),
767                }
768            }
769            Err(e) => error!(
770                "Failed to lock migration tracker, no event will be sent: {}",
771                e
772            ),
773        }
774    }
775
776    fn variation_internal<T: Into<FlagValue> + Clone>(
777        &self,
778        context: &Context,
779        flag_key: &str,
780        default: T,
781        events_scope: &EventsScope,
782    ) -> (Detail<FlagValue>, Option<eval::Flag>) {
783        if self.offline {
784            return (
785                Detail::err_default(eval::Error::ClientNotReady, default.into()),
786                None,
787            );
788        }
789
790        let (flag, result) = match self.initialized() {
791            false => (
792                None,
793                Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
794            ),
795            true => {
796                let data_store = self.data_store.read();
797                match data_store.flag(flag_key) {
798                    Some(flag) => {
799                        let result = eval::evaluate(
800                            data_store.to_store(),
801                            &flag,
802                            context,
803                            Some(&*events_scope.prerequisite_event_recorder),
804                        )
805                        .map(|v| v.clone())
806                        .or(default.clone().into());
807
808                        (Some(flag), result)
809                    }
810                    None => (
811                        None,
812                        Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
813                    ),
814                }
815            }
816        };
817
818        if !events_scope.disabled {
819            let event = match &flag {
820                Some(f) => events_scope.event_factory.new_eval_event(
821                    flag_key,
822                    context.clone(),
823                    f,
824                    result.clone(),
825                    default.into(),
826                    None,
827                ),
828                None => events_scope.event_factory.new_unknown_flag_event(
829                    flag_key,
830                    context.clone(),
831                    result.clone(),
832                    default.into(),
833                ),
834            };
835            self.send_internal(event);
836        }
837
838        (result, flag)
839    }
840
841    fn send_internal(&self, event: InputEvent) {
842        self.event_processor.send(event);
843    }
844}
845
846#[cfg(test)]
847mod tests {
848    use assert_json_diff::assert_json_eq;
849    use crossbeam_channel::Receiver;
850    use eval::{ContextBuilder, MultiContextBuilder};
851    use futures::FutureExt;
852    use hyper::client::HttpConnector;
853    use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
854    use maplit::hashmap;
855    use std::collections::HashMap;
856    use tokio::time::Instant;
857
858    use crate::data_source::MockDataSource;
859    use crate::data_source_builders::MockDataSourceBuilder;
860    use crate::events::create_event_sender;
861    use crate::events::event::{OutputEvent, VariationKey};
862    use crate::events::processor_builders::EventProcessorBuilder;
863    use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
864    use crate::stores::store_types::{PatchTarget, StorageItem};
865    use crate::test_common::{
866        self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
867        basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
868    };
869    use crate::{
870        AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
871        PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
872        SerializedItem,
873    };
874    use test_case::test_case;
875
876    use super::*;
877
878    fn is_send_and_sync<T: Send + Sync>() {}
879
880    #[test]
881    fn ensure_client_is_send_and_sync() {
882        is_send_and_sync::<Client>()
883    }
884
885    #[tokio::test]
886    async fn client_asynchronously_initializes() {
887        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
888        client.start_with_default_executor();
889
890        let now = Instant::now();
891        let initialized = client.initialized_async().await;
892        let elapsed_time = now.elapsed();
893        assert!(initialized);
894        // Give ourself a good margin for thread scheduling.
895        assert!(elapsed_time.as_millis() > 500)
896    }
897
898    #[tokio::test]
899    async fn client_asynchronously_initializes_within_timeout() {
900        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
901        client.start_with_default_executor();
902
903        let now = Instant::now();
904        let initialized = client
905            .wait_for_initialization(Duration::from_millis(1500))
906            .await;
907        let elapsed_time = now.elapsed();
908        // Give ourself a good margin for thread scheduling.
909        assert!(elapsed_time.as_millis() > 500);
910        assert_eq!(initialized, Some(true));
911    }
912
913    #[tokio::test]
914    async fn client_asynchronously_initializes_slower_than_timeout() {
915        let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
916        client.start_with_default_executor();
917
918        let now = Instant::now();
919        let initialized = client
920            .wait_for_initialization(Duration::from_millis(500))
921            .await;
922        let elapsed_time = now.elapsed();
923        // Give ourself a good margin for thread scheduling.
924        assert!(elapsed_time.as_millis() < 750);
925        assert!(initialized.is_none());
926    }
927
928    #[tokio::test]
929    async fn client_initializes_immediately_in_offline_mode() {
930        let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
931        client.start_with_default_executor();
932
933        assert!(client.initialized());
934
935        let now = Instant::now();
936        let initialized = client
937            .wait_for_initialization(Duration::from_millis(2000))
938            .await;
939        let elapsed_time = now.elapsed();
940        assert_eq!(initialized, Some(true));
941        assert!(elapsed_time.as_millis() < 500)
942    }
943
944    #[tokio::test]
945    async fn client_initializes_immediately_in_daemon_mode() {
946        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
947        client.start_with_default_executor();
948
949        assert!(client.initialized());
950
951        let now = Instant::now();
952        let initialized = client
953            .wait_for_initialization(Duration::from_millis(2000))
954            .await;
955        let elapsed_time = now.elapsed();
956        assert_eq!(initialized, Some(true));
957        assert!(elapsed_time.as_millis() < 500)
958    }
959
960    #[test_case(basic_flag("myFlag"), false.into(), true.into())]
961    #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
962    fn client_updates_changes_evaluation_results(
963        flag: eval::Flag,
964        default: FlagValue,
965        expected: FlagValue,
966    ) {
967        let context = ContextBuilder::new("foo")
968            .build()
969            .expect("Failed to create context");
970
971        let (client, _event_rx) = make_mocked_client();
972
973        let result = client.variation_detail(&context, "myFlag", default.clone());
974        assert_eq!(result.value.unwrap(), default);
975
976        client.start_with_default_executor();
977        client
978            .data_store
979            .write()
980            .upsert(
981                &flag.key,
982                PatchTarget::Flag(StorageItem::Item(flag.clone())),
983            )
984            .expect("patch should apply");
985
986        let result = client.variation_detail(&context, "myFlag", default);
987        assert_eq!(result.value.unwrap(), expected);
988        assert!(matches!(
989            result.reason,
990            Reason::Fallthrough {
991                in_experiment: false
992            }
993        ));
994    }
995
996    #[test]
997    fn all_flags_detail_is_invalid_when_offline() {
998        let (client, _event_rx) = make_mocked_offline_client();
999        client.start_with_default_executor();
1000
1001        let context = ContextBuilder::new("bob")
1002            .build()
1003            .expect("Failed to create context");
1004
1005        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1006        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1007    }
1008
1009    #[test]
1010    fn all_flags_detail_is_invalid_when_not_initialized() {
1011        let (client, _event_rx) = make_mocked_client();
1012
1013        let context = ContextBuilder::new("bob")
1014            .build()
1015            .expect("Failed to create context");
1016
1017        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1018        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1019    }
1020
1021    #[test]
1022    fn all_flags_detail_returns_flag_states() {
1023        let (client, _event_rx) = make_mocked_client();
1024        client.start_with_default_executor();
1025        client
1026            .data_store
1027            .write()
1028            .upsert(
1029                "myFlag1",
1030                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag1"))),
1031            )
1032            .expect("patch should apply");
1033        client
1034            .data_store
1035            .write()
1036            .upsert(
1037                "myFlag2",
1038                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag2"))),
1039            )
1040            .expect("patch should apply");
1041        let context = ContextBuilder::new("bob")
1042            .build()
1043            .expect("Failed to create context");
1044
1045        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1046
1047        client.close();
1048
1049        assert_json_eq!(
1050            all_flags,
1051            json!({
1052                "myFlag1": true,
1053                "myFlag2": true,
1054                "$flagsState": {
1055                    "myFlag1": {
1056                        "version": 42,
1057                        "variation": 1
1058                    },
1059                     "myFlag2": {
1060                        "version": 42,
1061                        "variation": 1
1062                    },
1063                },
1064                "$valid": true
1065            })
1066        );
1067    }
1068
1069    #[test]
1070    fn all_flags_detail_returns_prerequisite_relations() {
1071        let (client, _event_rx) = make_mocked_client();
1072        client.start_with_default_executor();
1073        client
1074            .data_store
1075            .write()
1076            .upsert(
1077                "prereq1",
1078                PatchTarget::Flag(StorageItem::Item(basic_flag("prereq1"))),
1079            )
1080            .expect("patch should apply");
1081        client
1082            .data_store
1083            .write()
1084            .upsert(
1085                "prereq2",
1086                PatchTarget::Flag(StorageItem::Item(basic_flag("prereq2"))),
1087            )
1088            .expect("patch should apply");
1089
1090        client
1091            .data_store
1092            .write()
1093            .upsert(
1094                "toplevel",
1095                PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1096                    "toplevel",
1097                    &["prereq1", "prereq2"],
1098                    false,
1099                ))),
1100            )
1101            .expect("patch should apply");
1102
1103        let context = ContextBuilder::new("bob")
1104            .build()
1105            .expect("Failed to create context");
1106
1107        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1108
1109        client.close();
1110
1111        assert_json_eq!(
1112            all_flags,
1113            json!({
1114                "prereq1": true,
1115                "prereq2": true,
1116                "toplevel": true,
1117                "$flagsState": {
1118                    "toplevel": {
1119                        "version": 42,
1120                        "variation": 1,
1121                        "prerequisites": ["prereq1", "prereq2"]
1122                    },
1123                    "prereq1": {
1124                        "version": 42,
1125                        "variation": 1
1126                    },
1127                     "prereq2": {
1128                        "version": 42,
1129                        "variation": 1
1130                    },
1131                },
1132                "$valid": true
1133            })
1134        );
1135    }
1136
1137    #[test]
1138    fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1139        let (client, _event_rx) = make_mocked_client();
1140        client.start_with_default_executor();
1141        client
1142            .data_store
1143            .write()
1144            .upsert(
1145                "prereq1",
1146                PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1147                    "prereq1", false,
1148                ))),
1149            )
1150            .expect("patch should apply");
1151        client
1152            .data_store
1153            .write()
1154            .upsert(
1155                "prereq2",
1156                PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1157                    "prereq2", false,
1158                ))),
1159            )
1160            .expect("patch should apply");
1161
1162        client
1163            .data_store
1164            .write()
1165            .upsert(
1166                "toplevel",
1167                PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1168                    "toplevel",
1169                    &["prereq1", "prereq2"],
1170                    true,
1171                ))),
1172            )
1173            .expect("patch should apply");
1174
1175        let context = ContextBuilder::new("bob")
1176            .build()
1177            .expect("Failed to create context");
1178
1179        let mut config = FlagDetailConfig::new();
1180        config.client_side_only();
1181
1182        let all_flags = client.all_flags_detail(&context, config);
1183
1184        client.close();
1185
1186        assert_json_eq!(
1187            all_flags,
1188            json!({
1189                "toplevel": true,
1190                "$flagsState": {
1191                    "toplevel": {
1192                        "version": 42,
1193                        "variation": 1,
1194                        "prerequisites": ["prereq1", "prereq2"]
1195                    },
1196                },
1197                "$valid": true
1198            })
1199        );
1200    }
1201
1202    #[test]
1203    fn variation_tracks_events_correctly() {
1204        let (client, event_rx) = make_mocked_client();
1205        client.start_with_default_executor();
1206        client
1207            .data_store
1208            .write()
1209            .upsert(
1210                "myFlag",
1211                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1212            )
1213            .expect("patch should apply");
1214        let context = ContextBuilder::new("bob")
1215            .build()
1216            .expect("Failed to create context");
1217
1218        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1219
1220        assert!(flag_value.as_bool().unwrap());
1221        client.flush();
1222        client.close();
1223
1224        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1225        assert_eq!(events.len(), 2);
1226        assert_eq!(events[0].kind(), "index");
1227        assert_eq!(events[1].kind(), "summary");
1228
1229        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1230            let variation_key = VariationKey {
1231                version: Some(42),
1232                variation: Some(1),
1233            };
1234            let feature = event_summary.features.get("myFlag");
1235            assert!(feature.is_some());
1236
1237            let feature = feature.unwrap();
1238            assert!(feature.counters.contains_key(&variation_key));
1239        } else {
1240            panic!("Event should be a summary type");
1241        }
1242    }
1243
1244    #[test]
1245    fn variation_handles_offline_mode() {
1246        let (client, event_rx) = make_mocked_offline_client();
1247        client.start_with_default_executor();
1248
1249        let context = ContextBuilder::new("bob")
1250            .build()
1251            .expect("Failed to create context");
1252        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1253
1254        assert!(!flag_value.as_bool().unwrap());
1255        client.flush();
1256        client.close();
1257
1258        assert_eq!(event_rx.iter().count(), 0);
1259    }
1260
1261    #[test]
1262    fn variation_handles_unknown_flags() {
1263        let (client, event_rx) = make_mocked_client();
1264        client.start_with_default_executor();
1265        let context = ContextBuilder::new("bob")
1266            .build()
1267            .expect("Failed to create context");
1268
1269        let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1270
1271        assert!(!flag_value.as_bool().unwrap());
1272        client.flush();
1273        client.close();
1274
1275        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1276        assert_eq!(events.len(), 2);
1277        assert_eq!(events[0].kind(), "index");
1278        assert_eq!(events[1].kind(), "summary");
1279
1280        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1281            let variation_key = VariationKey {
1282                version: None,
1283                variation: None,
1284            };
1285
1286            let feature = event_summary.features.get("non-existent-flag");
1287            assert!(feature.is_some());
1288
1289            let feature = feature.unwrap();
1290            assert!(feature.counters.contains_key(&variation_key));
1291        } else {
1292            panic!("Event should be a summary type");
1293        }
1294    }
1295
1296    #[test]
1297    fn variation_detail_handles_debug_events_correctly() {
1298        let (client, event_rx) = make_mocked_client();
1299        client.start_with_default_executor();
1300
1301        let mut flag = basic_flag("myFlag");
1302        flag.debug_events_until_date = Some(64_060_606_800_000); // Jan. 1st, 4000
1303
1304        client
1305            .data_store
1306            .write()
1307            .upsert(
1308                &flag.key,
1309                PatchTarget::Flag(StorageItem::Item(flag.clone())),
1310            )
1311            .expect("patch should apply");
1312        let context = ContextBuilder::new("bob")
1313            .build()
1314            .expect("Failed to create context");
1315
1316        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1317
1318        assert!(detail.value.unwrap().as_bool().unwrap());
1319        assert!(matches!(
1320            detail.reason,
1321            Reason::Fallthrough {
1322                in_experiment: false
1323            }
1324        ));
1325        client.flush();
1326        client.close();
1327
1328        let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1329        assert_eq!(events.len(), 3);
1330        assert_eq!(events[0].kind(), "index");
1331        assert_eq!(events[1].kind(), "debug");
1332        assert_eq!(events[2].kind(), "summary");
1333
1334        if let OutputEvent::Summary(event_summary) = events[2].clone() {
1335            let variation_key = VariationKey {
1336                version: Some(42),
1337                variation: Some(1),
1338            };
1339
1340            let feature = event_summary.features.get("myFlag");
1341            assert!(feature.is_some());
1342
1343            let feature = feature.unwrap();
1344            assert!(feature.counters.contains_key(&variation_key));
1345        } else {
1346            panic!("Event should be a summary type");
1347        }
1348    }
1349
1350    #[test]
1351    fn variation_detail_tracks_events_correctly() {
1352        let (client, event_rx) = make_mocked_client();
1353        client.start_with_default_executor();
1354
1355        client
1356            .data_store
1357            .write()
1358            .upsert(
1359                "myFlag",
1360                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1361            )
1362            .expect("patch should apply");
1363        let context = ContextBuilder::new("bob")
1364            .build()
1365            .expect("Failed to create context");
1366
1367        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1368
1369        assert!(detail.value.unwrap().as_bool().unwrap());
1370        assert!(matches!(
1371            detail.reason,
1372            Reason::Fallthrough {
1373                in_experiment: false
1374            }
1375        ));
1376        client.flush();
1377        client.close();
1378
1379        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1380        assert_eq!(events.len(), 2);
1381        assert_eq!(events[0].kind(), "index");
1382        assert_eq!(events[1].kind(), "summary");
1383
1384        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1385            let variation_key = VariationKey {
1386                version: Some(42),
1387                variation: Some(1),
1388            };
1389
1390            let feature = event_summary.features.get("myFlag");
1391            assert!(feature.is_some());
1392
1393            let feature = feature.unwrap();
1394            assert!(feature.counters.contains_key(&variation_key));
1395        } else {
1396            panic!("Event should be a summary type");
1397        }
1398    }
1399
1400    #[test]
1401    fn variation_detail_handles_offline_mode() {
1402        let (client, event_rx) = make_mocked_offline_client();
1403        client.start_with_default_executor();
1404
1405        let context = ContextBuilder::new("bob")
1406            .build()
1407            .expect("Failed to create context");
1408
1409        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1410
1411        assert!(!detail.value.unwrap().as_bool().unwrap());
1412        assert!(matches!(
1413            detail.reason,
1414            Reason::Error {
1415                error: eval::Error::ClientNotReady
1416            }
1417        ));
1418        client.flush();
1419        client.close();
1420
1421        assert_eq!(event_rx.iter().count(), 0);
1422    }
1423
1424    struct InMemoryPersistentDataStoreFactory {
1425        data: AllData<Flag, Segment>,
1426        initialized: bool,
1427    }
1428
1429    impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1430        fn create_persistent_data_store(
1431            &self,
1432        ) -> Result<Box<(dyn PersistentDataStore + 'static)>, std::io::Error> {
1433            let serialized_data =
1434                AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1435            Ok(Box::new(InMemoryPersistentDataStore {
1436                data: serialized_data,
1437                initialized: self.initialized,
1438            }))
1439        }
1440    }
1441
1442    #[test]
1443    fn variation_detail_handles_daemon_mode() {
1444        testing_logger::setup();
1445        let factory = InMemoryPersistentDataStoreFactory {
1446            data: AllData {
1447                flags: hashmap!["flag".into() => basic_flag("flag")],
1448                segments: HashMap::new(),
1449            },
1450            initialized: true,
1451        };
1452        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1453
1454        let config = ConfigBuilder::new("sdk-key")
1455            .daemon_mode(true)
1456            .data_store(&builder)
1457            .event_processor(&NullEventProcessorBuilder::new())
1458            .build()
1459            .expect("config should build");
1460
1461        let client = Client::build(config).expect("Should be built.");
1462
1463        client.start_with_default_executor();
1464
1465        let context = ContextBuilder::new("bob")
1466            .build()
1467            .expect("Failed to create context");
1468
1469        let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));
1470
1471        assert!(detail.value.unwrap().as_bool().unwrap());
1472        assert!(matches!(
1473            detail.reason,
1474            Reason::Fallthrough {
1475                in_experiment: false
1476            }
1477        ));
1478        client.flush();
1479        client.close();
1480
1481        testing_logger::validate(|captured_logs| {
1482            assert_eq!(captured_logs.len(), 1);
1483            assert_eq!(
1484                captured_logs[0].body,
1485                "Started LaunchDarkly Client in daemon mode"
1486            );
1487        });
1488    }
1489
1490    #[test]
1491    fn daemon_mode_is_quiet_if_store_is_not_initialized() {
1492        testing_logger::setup();
1493
1494        let factory = InMemoryPersistentDataStoreFactory {
1495            data: AllData {
1496                flags: HashMap::new(),
1497                segments: HashMap::new(),
1498            },
1499            initialized: false,
1500        };
1501        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1502
1503        let config = ConfigBuilder::new("sdk-key")
1504            .daemon_mode(true)
1505            .data_store(&builder)
1506            .event_processor(&NullEventProcessorBuilder::new())
1507            .build()
1508            .expect("config should build");
1509
1510        let client = Client::build(config).expect("Should be built.");
1511
1512        client.start_with_default_executor();
1513
1514        let context = ContextBuilder::new("bob")
1515            .build()
1516            .expect("Failed to create context");
1517
1518        client.variation_detail(&context, "flag", FlagValue::Bool(false));
1519
1520        testing_logger::validate(|captured_logs| {
1521            assert_eq!(captured_logs.len(), 1);
1522            assert_eq!(
1523                captured_logs[0].body,
1524                "Started LaunchDarkly Client in daemon mode"
1525            );
1526        });
1527    }
1528
1529    #[test]
1530    fn variation_handles_off_flag_without_variation() {
1531        let (client, event_rx) = make_mocked_client();
1532        client.start_with_default_executor();
1533
1534        client
1535            .data_store
1536            .write()
1537            .upsert(
1538                "myFlag",
1539                PatchTarget::Flag(StorageItem::Item(basic_off_flag("myFlag"))),
1540            )
1541            .expect("patch should apply");
1542        let context = ContextBuilder::new("bob")
1543            .build()
1544            .expect("Failed to create context");
1545
1546        let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1547
1548        assert!(!result.as_bool().unwrap());
1549        client.flush();
1550        client.close();
1551
1552        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1553        assert_eq!(events.len(), 2);
1554        assert_eq!(events[0].kind(), "index");
1555        assert_eq!(events[1].kind(), "summary");
1556
1557        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1558            let variation_key = VariationKey {
1559                version: Some(42),
1560                variation: None,
1561            };
1562            let feature = event_summary.features.get("myFlag");
1563            assert!(feature.is_some());
1564
1565            let feature = feature.unwrap();
1566            assert!(feature.counters.contains_key(&variation_key));
1567        } else {
1568            panic!("Event should be a summary type");
1569        }
1570    }
1571
1572    #[test]
1573    fn variation_detail_tracks_prereq_events_correctly() {
1574        let (client, event_rx) = make_mocked_client();
1575        client.start_with_default_executor();
1576
1577        let mut basic_preqreq_flag = basic_flag("prereqFlag");
1578        basic_preqreq_flag.track_events = true;
1579
1580        client
1581            .data_store
1582            .write()
1583            .upsert(
1584                "prereqFlag",
1585                PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1586            )
1587            .expect("patch should apply");
1588
1589        let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1590        basic_flag.track_events = true;
1591        client
1592            .data_store
1593            .write()
1594            .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1595            .expect("patch should apply");
1596        let context = ContextBuilder::new("bob")
1597            .build()
1598            .expect("Failed to create context");
1599
1600        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1601
1602        assert!(detail.value.unwrap().as_bool().unwrap());
1603        assert!(matches!(
1604            detail.reason,
1605            Reason::Fallthrough {
1606                in_experiment: false
1607            }
1608        ));
1609        client.flush();
1610        client.close();
1611
1612        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1613        assert_eq!(events.len(), 4);
1614        assert_eq!(events[0].kind(), "index");
1615        assert_eq!(events[1].kind(), "feature");
1616        assert_eq!(events[2].kind(), "feature");
1617        assert_eq!(events[3].kind(), "summary");
1618
1619        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1620            let variation_key = VariationKey {
1621                version: Some(42),
1622                variation: Some(1),
1623            };
1624            let feature = event_summary.features.get("myFlag");
1625            assert!(feature.is_some());
1626
1627            let feature = feature.unwrap();
1628            assert!(feature.counters.contains_key(&variation_key));
1629
1630            let variation_key = VariationKey {
1631                version: Some(42),
1632                variation: Some(1),
1633            };
1634            let feature = event_summary.features.get("prereqFlag");
1635            assert!(feature.is_some());
1636
1637            let feature = feature.unwrap();
1638            assert!(feature.counters.contains_key(&variation_key));
1639        }
1640    }
1641
1642    #[test]
1643    fn variation_handles_failed_prereqs_correctly() {
1644        let (client, event_rx) = make_mocked_client();
1645        client.start_with_default_executor();
1646
1647        let mut basic_preqreq_flag = basic_off_flag("prereqFlag");
1648        basic_preqreq_flag.track_events = true;
1649
1650        client
1651            .data_store
1652            .write()
1653            .upsert(
1654                "prereqFlag",
1655                PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1656            )
1657            .expect("patch should apply");
1658
1659        let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1660        basic_flag.track_events = true;
1661        client
1662            .data_store
1663            .write()
1664            .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1665            .expect("patch should apply");
1666        let context = ContextBuilder::new("bob")
1667            .build()
1668            .expect("Failed to create context");
1669
1670        let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1671
1672        assert!(!detail.as_bool().unwrap());
1673        client.flush();
1674        client.close();
1675
1676        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1677        assert_eq!(events.len(), 4);
1678        assert_eq!(events[0].kind(), "index");
1679        assert_eq!(events[1].kind(), "feature");
1680        assert_eq!(events[2].kind(), "feature");
1681        assert_eq!(events[3].kind(), "summary");
1682
1683        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1684            let variation_key = VariationKey {
1685                version: Some(42),
1686                variation: Some(0),
1687            };
1688            let feature = event_summary.features.get("myFlag");
1689            assert!(feature.is_some());
1690
1691            let feature = feature.unwrap();
1692            assert!(feature.counters.contains_key(&variation_key));
1693
1694            let variation_key = VariationKey {
1695                version: Some(42),
1696                variation: None,
1697            };
1698            let feature = event_summary.features.get("prereqFlag");
1699            assert!(feature.is_some());
1700
1701            let feature = feature.unwrap();
1702            assert!(feature.counters.contains_key(&variation_key));
1703        }
1704    }
1705
1706    #[test]
1707    fn variation_detail_handles_flag_not_found() {
1708        let (client, event_rx) = make_mocked_client();
1709        client.start_with_default_executor();
1710
1711        let context = ContextBuilder::new("bob")
1712            .build()
1713            .expect("Failed to create context");
1714        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1715
1716        assert!(!detail.value.unwrap().as_bool().unwrap());
1717        assert!(matches!(
1718            detail.reason,
1719            Reason::Error {
1720                error: eval::Error::FlagNotFound
1721            }
1722        ));
1723        client.flush();
1724        client.close();
1725
1726        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1727        assert_eq!(events.len(), 2);
1728        assert_eq!(events[0].kind(), "index");
1729        assert_eq!(events[1].kind(), "summary");
1730
1731        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1732            let variation_key = VariationKey {
1733                version: None,
1734                variation: None,
1735            };
1736            let feature = event_summary.features.get("non-existent-flag");
1737            assert!(feature.is_some());
1738
1739            let feature = feature.unwrap();
1740            assert!(feature.counters.contains_key(&variation_key));
1741        } else {
1742            panic!("Event should be a summary type");
1743        }
1744    }
1745
1746    #[tokio::test]
1747    async fn variation_detail_handles_client_not_ready() {
1748        let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1749        client.start_with_default_executor();
1750        let context = ContextBuilder::new("bob")
1751            .build()
1752            .expect("Failed to create context");
1753
1754        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1755
1756        assert!(!detail.value.unwrap().as_bool().unwrap());
1757        assert!(matches!(
1758            detail.reason,
1759            Reason::Error {
1760                error: eval::Error::ClientNotReady
1761            }
1762        ));
1763        client.flush();
1764        client.close();
1765
1766        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1767        assert_eq!(events.len(), 2);
1768        assert_eq!(events[0].kind(), "index");
1769        assert_eq!(events[1].kind(), "summary");
1770
1771        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1772            let variation_key = VariationKey {
1773                version: None,
1774                variation: None,
1775            };
1776            let feature = event_summary.features.get("non-existent-flag");
1777            assert!(feature.is_some());
1778
1779            let feature = feature.unwrap();
1780            assert!(feature.counters.contains_key(&variation_key));
1781        } else {
1782            panic!("Event should be a summary type");
1783        }
1784    }
1785
1786    #[test]
1787    fn identify_sends_identify_event() {
1788        let (client, event_rx) = make_mocked_client();
1789        client.start_with_default_executor();
1790
1791        let context = ContextBuilder::new("bob")
1792            .build()
1793            .expect("Failed to create context");
1794
1795        client.identify(context);
1796        client.flush();
1797        client.close();
1798
1799        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1800        assert_eq!(events.len(), 1);
1801        assert_eq!(events[0].kind(), "identify");
1802    }
1803
1804    #[test]
1805    fn identify_sends_sends_nothing_in_offline_mode() {
1806        let (client, event_rx) = make_mocked_offline_client();
1807        client.start_with_default_executor();
1808
1809        let context = ContextBuilder::new("bob")
1810            .build()
1811            .expect("Failed to create context");
1812
1813        client.identify(context);
1814        client.flush();
1815        client.close();
1816
1817        assert_eq!(event_rx.iter().count(), 0);
1818    }
1819
1820    #[test]
1821    fn secure_mode_hash() {
1822        let config = ConfigBuilder::new("secret")
1823            .offline(true)
1824            .build()
1825            .expect("config should build");
1826        let client = Client::build(config).expect("Should be built.");
1827        let context = ContextBuilder::new("Message")
1828            .build()
1829            .expect("Failed to create context");
1830
1831        assert_eq!(
1832            client.secure_mode_hash(&context),
1833            "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
1834        );
1835    }
1836
1837    #[test]
1838    fn secure_mode_hash_with_multi_kind() {
1839        let config = ConfigBuilder::new("secret")
1840            .offline(true)
1841            .build()
1842            .expect("config should build");
1843        let client = Client::build(config).expect("Should be built.");
1844
1845        let org = ContextBuilder::new("org-key|1")
1846            .kind("org")
1847            .build()
1848            .expect("Failed to create context");
1849        let user = ContextBuilder::new("user-key:2")
1850            .build()
1851            .expect("Failed to create context");
1852
1853        let context = MultiContextBuilder::new()
1854            .add_context(org)
1855            .add_context(user)
1856            .build()
1857            .expect("failed to build multi-context");
1858
1859        assert_eq!(
1860            client.secure_mode_hash(&context),
1861            "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
1862        );
1863    }
1864
1865    #[derive(Serialize)]
1866    struct MyCustomData {
1867        pub answer: u32,
1868    }
1869
1870    #[test]
1871    fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1872        let (client, event_rx) = make_mocked_client();
1873        client.start_with_default_executor();
1874
1875        let context = ContextBuilder::new("bob")
1876            .build()
1877            .expect("Failed to create context");
1878
1879        client.track_event(context.clone(), "event-with-null");
1880        client.track_data(context.clone(), "event-with-string", "string-data")?;
1881        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1882        client.track_data(
1883            context.clone(),
1884            "event-with-struct",
1885            MyCustomData { answer: 42 },
1886        )?;
1887        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1888
1889        client.flush();
1890        client.close();
1891
1892        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1893        assert_eq!(events.len(), 6);
1894
1895        let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1896        for event in events {
1897            if let Some(count) = events_by_type.get_mut(event.kind()) {
1898                *count += 1;
1899            } else {
1900                events_by_type.insert(event.kind(), 1);
1901            }
1902        }
1903        assert!(matches!(events_by_type.get("index"), Some(1)));
1904        assert!(matches!(events_by_type.get("custom"), Some(5)));
1905
1906        Ok(())
1907    }
1908
1909    #[test]
1910    fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
1911        let (client, event_rx) = make_mocked_offline_client();
1912        client.start_with_default_executor();
1913
1914        let context = ContextBuilder::new("bob")
1915            .build()
1916            .expect("Failed to create context");
1917
1918        client.track_event(context.clone(), "event-with-null");
1919        client.track_data(context.clone(), "event-with-string", "string-data")?;
1920        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1921        client.track_data(
1922            context.clone(),
1923            "event-with-struct",
1924            MyCustomData { answer: 42 },
1925        )?;
1926        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1927
1928        client.flush();
1929        client.close();
1930
1931        assert_eq!(event_rx.iter().count(), 0);
1932
1933        Ok(())
1934    }
1935
1936    #[test]
1937    fn migration_handles_flag_not_found() {
1938        let (client, _event_rx) = make_mocked_client();
1939        client.start_with_default_executor();
1940
1941        let context = ContextBuilder::new("bob")
1942            .build()
1943            .expect("Failed to create context");
1944
1945        let (stage, _tracker) =
1946            client.migration_variation(&context, "non-existent-flag-key", Stage::Off);
1947
1948        assert_eq!(stage, Stage::Off);
1949    }
1950
1951    #[test]
1952    fn migration_uses_non_migration_flag() {
1953        let (client, _event_rx) = make_mocked_client();
1954        client.start_with_default_executor();
1955        client
1956            .data_store
1957            .write()
1958            .upsert(
1959                "boolean-flag",
1960                PatchTarget::Flag(StorageItem::Item(basic_flag("boolean-flag"))),
1961            )
1962            .expect("patch should apply");
1963
1964        let context = ContextBuilder::new("bob")
1965            .build()
1966            .expect("Failed to create context");
1967
1968        let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1969
1970        assert_eq!(stage, Stage::Off);
1971    }
1972
1973    #[test_case(Stage::Off)]
1974    #[test_case(Stage::DualWrite)]
1975    #[test_case(Stage::Shadow)]
1976    #[test_case(Stage::Live)]
1977    #[test_case(Stage::Rampdown)]
1978    #[test_case(Stage::Complete)]
1979    fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1980        let (client, _event_rx) = make_mocked_client();
1981        client.start_with_default_executor();
1982        client
1983            .data_store
1984            .write()
1985            .upsert(
1986                "stage-flag",
1987                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
1988            )
1989            .expect("patch should apply");
1990
1991        let context = ContextBuilder::new("bob")
1992            .build()
1993            .expect("Failed to create context");
1994
1995        let (evaluated_stage, _tracker) =
1996            client.migration_variation(&context, "stage-flag", Stage::Off);
1997
1998        assert_eq!(evaluated_stage, stage);
1999    }
2000
2001    #[tokio::test]
2002    async fn migration_tracks_invoked_correctly() {
2003        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
2004            .await;
2005        migration_tracks_invoked_correctly_driver(
2006            Stage::DualWrite,
2007            Operation::Read,
2008            vec![Origin::Old],
2009        )
2010        .await;
2011        migration_tracks_invoked_correctly_driver(
2012            Stage::Shadow,
2013            Operation::Read,
2014            vec![Origin::Old, Origin::New],
2015        )
2016        .await;
2017        migration_tracks_invoked_correctly_driver(
2018            Stage::Live,
2019            Operation::Read,
2020            vec![Origin::Old, Origin::New],
2021        )
2022        .await;
2023        migration_tracks_invoked_correctly_driver(
2024            Stage::Rampdown,
2025            Operation::Read,
2026            vec![Origin::New],
2027        )
2028        .await;
2029        migration_tracks_invoked_correctly_driver(
2030            Stage::Complete,
2031            Operation::Read,
2032            vec![Origin::New],
2033        )
2034        .await;
2035        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
2036            .await;
2037        migration_tracks_invoked_correctly_driver(
2038            Stage::DualWrite,
2039            Operation::Write,
2040            vec![Origin::Old, Origin::New],
2041        )
2042        .await;
2043        migration_tracks_invoked_correctly_driver(
2044            Stage::Shadow,
2045            Operation::Write,
2046            vec![Origin::Old, Origin::New],
2047        )
2048        .await;
2049        migration_tracks_invoked_correctly_driver(
2050            Stage::Live,
2051            Operation::Write,
2052            vec![Origin::Old, Origin::New],
2053        )
2054        .await;
2055        migration_tracks_invoked_correctly_driver(
2056            Stage::Rampdown,
2057            Operation::Write,
2058            vec![Origin::Old, Origin::New],
2059        )
2060        .await;
2061        migration_tracks_invoked_correctly_driver(
2062            Stage::Complete,
2063            Operation::Write,
2064            vec![Origin::New],
2065        )
2066        .await;
2067    }
2068
2069    async fn migration_tracks_invoked_correctly_driver(
2070        stage: Stage,
2071        operation: Operation,
2072        origins: Vec<Origin>,
2073    ) {
2074        let (client, event_rx) = make_mocked_client();
2075        let client = Arc::new(client);
2076        client.start_with_default_executor();
2077        client
2078            .data_store
2079            .write()
2080            .upsert(
2081                "stage-flag",
2082                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2083            )
2084            .expect("patch should apply");
2085
2086        let mut migrator = MigratorBuilder::new(client.clone())
2087            .read(
2088                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2089                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2090                Some(|_, _| true),
2091            )
2092            .write(
2093                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2094                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2095            )
2096            .build()
2097            .expect("migrator should build");
2098
2099        let context = ContextBuilder::new("bob")
2100            .build()
2101            .expect("Failed to create context");
2102
2103        if let Operation::Read = operation {
2104            migrator
2105                .read(
2106                    &context,
2107                    "stage-flag".into(),
2108                    Stage::Off,
2109                    serde_json::Value::Null,
2110                )
2111                .await;
2112        } else {
2113            migrator
2114                .write(
2115                    &context,
2116                    "stage-flag".into(),
2117                    Stage::Off,
2118                    serde_json::Value::Null,
2119                )
2120                .await;
2121        }
2122
2123        client.flush();
2124        client.close();
2125
2126        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2127        assert_eq!(events.len(), 3);
2128        match &events[1] {
2129            OutputEvent::MigrationOp(event) => {
2130                assert!(event.invoked.len() == origins.len());
2131                assert!(event.invoked.iter().all(|i| origins.contains(i)));
2132            }
2133            _ => panic!("Expected migration event"),
2134        }
2135    }
2136
2137    #[tokio::test]
2138    async fn migration_tracks_latency() {
2139        migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2140        migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2141        migration_tracks_latency_driver(
2142            Stage::Shadow,
2143            Operation::Read,
2144            vec![Origin::Old, Origin::New],
2145        )
2146        .await;
2147        migration_tracks_latency_driver(
2148            Stage::Live,
2149            Operation::Read,
2150            vec![Origin::Old, Origin::New],
2151        )
2152        .await;
2153        migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2154        migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2155        migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2156        migration_tracks_latency_driver(
2157            Stage::DualWrite,
2158            Operation::Write,
2159            vec![Origin::Old, Origin::New],
2160        )
2161        .await;
2162        migration_tracks_latency_driver(
2163            Stage::Shadow,
2164            Operation::Write,
2165            vec![Origin::Old, Origin::New],
2166        )
2167        .await;
2168        migration_tracks_latency_driver(
2169            Stage::Live,
2170            Operation::Write,
2171            vec![Origin::Old, Origin::New],
2172        )
2173        .await;
2174        migration_tracks_latency_driver(
2175            Stage::Rampdown,
2176            Operation::Write,
2177            vec![Origin::Old, Origin::New],
2178        )
2179        .await;
2180        migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2181    }
2182
2183    async fn migration_tracks_latency_driver(
2184        stage: Stage,
2185        operation: Operation,
2186        origins: Vec<Origin>,
2187    ) {
2188        let (client, event_rx) = make_mocked_client();
2189        let client = Arc::new(client);
2190        client.start_with_default_executor();
2191        client
2192            .data_store
2193            .write()
2194            .upsert(
2195                "stage-flag",
2196                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2197            )
2198            .expect("patch should apply");
2199
2200        let mut migrator = MigratorBuilder::new(client.clone())
2201            .track_latency(true)
2202            .read(
2203                |_| {
2204                    async move {
2205                        async_std::task::sleep(Duration::from_millis(100)).await;
2206                        Ok(serde_json::Value::Null)
2207                    }
2208                    .boxed()
2209                },
2210                |_| {
2211                    async move {
2212                        async_std::task::sleep(Duration::from_millis(100)).await;
2213                        Ok(serde_json::Value::Null)
2214                    }
2215                    .boxed()
2216                },
2217                Some(|_, _| true),
2218            )
2219            .write(
2220                |_| {
2221                    async move {
2222                        async_std::task::sleep(Duration::from_millis(100)).await;
2223                        Ok(serde_json::Value::Null)
2224                    }
2225                    .boxed()
2226                },
2227                |_| {
2228                    async move {
2229                        async_std::task::sleep(Duration::from_millis(100)).await;
2230                        Ok(serde_json::Value::Null)
2231                    }
2232                    .boxed()
2233                },
2234            )
2235            .build()
2236            .expect("migrator should build");
2237
2238        let context = ContextBuilder::new("bob")
2239            .build()
2240            .expect("Failed to create context");
2241
2242        if let Operation::Read = operation {
2243            migrator
2244                .read(
2245                    &context,
2246                    "stage-flag".into(),
2247                    Stage::Off,
2248                    serde_json::Value::Null,
2249                )
2250                .await;
2251        } else {
2252            migrator
2253                .write(
2254                    &context,
2255                    "stage-flag".into(),
2256                    Stage::Off,
2257                    serde_json::Value::Null,
2258                )
2259                .await;
2260        }
2261
2262        client.flush();
2263        client.close();
2264
2265        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2266        assert_eq!(events.len(), 3);
2267        match &events[1] {
2268            OutputEvent::MigrationOp(event) => {
2269                assert!(event.latency.len() == origins.len());
2270                assert!(event
2271                    .latency
2272                    .values()
2273                    .all(|l| l > &Duration::from_millis(100)));
2274            }
2275            _ => panic!("Expected migration event"),
2276        }
2277    }
2278
2279    #[tokio::test]
2280    async fn migration_tracks_read_errors() {
2281        migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2282        migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2283        migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2284        migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2285        migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2286        migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2287    }
2288
2289    async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2290        let (client, event_rx) = make_mocked_client();
2291        let client = Arc::new(client);
2292        client.start_with_default_executor();
2293        client
2294            .data_store
2295            .write()
2296            .upsert(
2297                "stage-flag",
2298                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2299            )
2300            .expect("patch should apply");
2301
2302        let mut migrator = MigratorBuilder::new(client.clone())
2303            .track_latency(true)
2304            .read(
2305                |_| async move { Err("fail".into()) }.boxed(),
2306                |_| async move { Err("fail".into()) }.boxed(),
2307                Some(|_: &String, _: &String| true),
2308            )
2309            .write(
2310                |_| async move { Err("fail".into()) }.boxed(),
2311                |_| async move { Err("fail".into()) }.boxed(),
2312            )
2313            .build()
2314            .expect("migrator should build");
2315
2316        let context = ContextBuilder::new("bob")
2317            .build()
2318            .expect("Failed to create context");
2319
2320        migrator
2321            .read(
2322                &context,
2323                "stage-flag".into(),
2324                Stage::Off,
2325                serde_json::Value::Null,
2326            )
2327            .await;
2328        client.flush();
2329        client.close();
2330
2331        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2332        assert_eq!(events.len(), 3);
2333        match &events[1] {
2334            OutputEvent::MigrationOp(event) => {
2335                assert!(event.errors.len() == origins.len());
2336                assert!(event.errors.iter().all(|i| origins.contains(i)));
2337            }
2338            _ => panic!("Expected migration event"),
2339        }
2340    }
2341
2342    #[tokio::test]
2343    async fn migration_tracks_authoritative_write_errors() {
2344        migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2345        migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2346            .await;
2347        migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2348        migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2349        migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2350            .await;
2351        migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2352            .await;
2353    }
2354
2355    async fn migration_tracks_authoritative_write_errors_driver(
2356        stage: Stage,
2357        origins: Vec<Origin>,
2358    ) {
2359        let (client, event_rx) = make_mocked_client();
2360        let client = Arc::new(client);
2361        client.start_with_default_executor();
2362        client
2363            .data_store
2364            .write()
2365            .upsert(
2366                "stage-flag",
2367                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2368            )
2369            .expect("patch should apply");
2370
2371        let mut migrator = MigratorBuilder::new(client.clone())
2372            .track_latency(true)
2373            .read(
2374                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2375                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2376                None,
2377            )
2378            .write(
2379                |_| async move { Err("fail".into()) }.boxed(),
2380                |_| async move { Err("fail".into()) }.boxed(),
2381            )
2382            .build()
2383            .expect("migrator should build");
2384
2385        let context = ContextBuilder::new("bob")
2386            .build()
2387            .expect("Failed to create context");
2388
2389        migrator
2390            .write(
2391                &context,
2392                "stage-flag".into(),
2393                Stage::Off,
2394                serde_json::Value::Null,
2395            )
2396            .await;
2397
2398        client.flush();
2399        client.close();
2400
2401        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2402        assert_eq!(events.len(), 3);
2403        match &events[1] {
2404            OutputEvent::MigrationOp(event) => {
2405                assert!(event.errors.len() == origins.len());
2406                assert!(event.errors.iter().all(|i| origins.contains(i)));
2407            }
2408            _ => panic!("Expected migration event"),
2409        }
2410    }
2411
2412    #[tokio::test]
2413    async fn migration_tracks_nonauthoritative_write_errors() {
2414        migration_tracks_nonauthoritative_write_errors_driver(
2415            Stage::DualWrite,
2416            false,
2417            true,
2418            vec![Origin::New],
2419        )
2420        .await;
2421        migration_tracks_nonauthoritative_write_errors_driver(
2422            Stage::Shadow,
2423            false,
2424            true,
2425            vec![Origin::New],
2426        )
2427        .await;
2428        migration_tracks_nonauthoritative_write_errors_driver(
2429            Stage::Live,
2430            true,
2431            false,
2432            vec![Origin::Old],
2433        )
2434        .await;
2435        migration_tracks_nonauthoritative_write_errors_driver(
2436            Stage::Rampdown,
2437            true,
2438            false,
2439            vec![Origin::Old],
2440        )
2441        .await;
2442    }
2443
2444    async fn migration_tracks_nonauthoritative_write_errors_driver(
2445        stage: Stage,
2446        fail_old: bool,
2447        fail_new: bool,
2448        origins: Vec<Origin>,
2449    ) {
2450        let (client, event_rx) = make_mocked_client();
2451        let client = Arc::new(client);
2452        client.start_with_default_executor();
2453        client
2454            .data_store
2455            .write()
2456            .upsert(
2457                "stage-flag",
2458                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2459            )
2460            .expect("patch should apply");
2461
2462        let mut migrator = MigratorBuilder::new(client.clone())
2463            .track_latency(true)
2464            .read(
2465                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2466                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2467                None,
2468            )
2469            .write(
2470                move |_| {
2471                    async move {
2472                        if fail_old {
2473                            Err("fail".into())
2474                        } else {
2475                            Ok(serde_json::Value::Null)
2476                        }
2477                    }
2478                    .boxed()
2479                },
2480                move |_| {
2481                    async move {
2482                        if fail_new {
2483                            Err("fail".into())
2484                        } else {
2485                            Ok(serde_json::Value::Null)
2486                        }
2487                    }
2488                    .boxed()
2489                },
2490            )
2491            .build()
2492            .expect("migrator should build");
2493
2494        let context = ContextBuilder::new("bob")
2495            .build()
2496            .expect("Failed to create context");
2497
2498        migrator
2499            .write(
2500                &context,
2501                "stage-flag".into(),
2502                Stage::Off,
2503                serde_json::Value::Null,
2504            )
2505            .await;
2506
2507        client.flush();
2508        client.close();
2509
2510        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2511        assert_eq!(events.len(), 3);
2512        match &events[1] {
2513            OutputEvent::MigrationOp(event) => {
2514                assert!(event.errors.len() == origins.len());
2515                assert!(event.errors.iter().all(|i| origins.contains(i)));
2516            }
2517            _ => panic!("Expected migration event"),
2518        }
2519    }
2520
2521    #[tokio::test]
2522    async fn migration_tracks_consistency() {
2523        migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2524        migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2525        migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2526        migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2527    }
2528
2529    async fn migration_tracks_consistency_driver(
2530        stage: Stage,
2531        old_return: &'static str,
2532        new_return: &'static str,
2533        expected_consistency: bool,
2534    ) {
2535        let (client, event_rx) = make_mocked_client();
2536        let client = Arc::new(client);
2537        client.start_with_default_executor();
2538        client
2539            .data_store
2540            .write()
2541            .upsert(
2542                "stage-flag",
2543                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2544            )
2545            .expect("patch should apply");
2546
2547        let mut migrator = MigratorBuilder::new(client.clone())
2548            .track_latency(true)
2549            .read(
2550                |_| {
2551                    async move {
2552                        async_std::task::sleep(Duration::from_millis(100)).await;
2553                        Ok(serde_json::Value::String(old_return.to_string()))
2554                    }
2555                    .boxed()
2556                },
2557                |_| {
2558                    async move {
2559                        async_std::task::sleep(Duration::from_millis(100)).await;
2560                        Ok(serde_json::Value::String(new_return.to_string()))
2561                    }
2562                    .boxed()
2563                },
2564                Some(|lhs, rhs| lhs == rhs),
2565            )
2566            .write(
2567                |_| {
2568                    async move {
2569                        async_std::task::sleep(Duration::from_millis(100)).await;
2570                        Ok(serde_json::Value::Null)
2571                    }
2572                    .boxed()
2573                },
2574                |_| {
2575                    async move {
2576                        async_std::task::sleep(Duration::from_millis(100)).await;
2577                        Ok(serde_json::Value::Null)
2578                    }
2579                    .boxed()
2580                },
2581            )
2582            .build()
2583            .expect("migrator should build");
2584
2585        let context = ContextBuilder::new("bob")
2586            .build()
2587            .expect("Failed to create context");
2588
2589        migrator
2590            .read(
2591                &context,
2592                "stage-flag".into(),
2593                Stage::Off,
2594                serde_json::Value::Null,
2595            )
2596            .await;
2597
2598        client.flush();
2599        client.close();
2600
2601        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2602        assert_eq!(events.len(), 3);
2603        match &events[1] {
2604            OutputEvent::MigrationOp(event) => {
2605                assert!(event.consistency_check == Some(expected_consistency))
2606            }
2607            _ => panic!("Expected migration event"),
2608        }
2609    }
2610
2611    fn make_mocked_client_with_delay(
2612        delay: u64,
2613        offline: bool,
2614        daemon_mode: bool,
2615    ) -> (Client, Receiver<OutputEvent>) {
2616        let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2617        let (event_sender, event_rx) = create_event_sender();
2618
2619        let config = ConfigBuilder::new("sdk-key")
2620            .offline(offline)
2621            .daemon_mode(daemon_mode)
2622            .data_source(MockDataSourceBuilder::new().data_source(updates))
2623            .event_processor(
2624                EventProcessorBuilder::<HttpConnector>::new().event_sender(Arc::new(event_sender)),
2625            )
2626            .build()
2627            .expect("config should build");
2628
2629        let client = Client::build(config).expect("Should be built.");
2630
2631        (client, event_rx)
2632    }
2633
2634    fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2635        make_mocked_client_with_delay(0, true, false)
2636    }
2637
2638    fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2639        make_mocked_client_with_delay(0, false, false)
2640    }
2641}