Skip to main content

launchdarkly_server_sdk/events/
processor_builders.rs

1use std::collections::{HashMap, HashSet};
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use http::Uri;
8use launchdarkly_server_sdk_evaluation::Reference;
9use thiserror::Error;
10
11use crate::events::sender::HttpEventSender;
12use crate::{service_endpoints, LAUNCHDARKLY_INSTANCE_ID_HEADER, LAUNCHDARKLY_TAGS_HEADER};
13use launchdarkly_sdk_transport::HttpTransport;
14
15use super::processor::{
16    EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor,
17};
18use super::sender::EventSender;
19use super::EventsConfiguration;
20
21const DEFAULT_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(5);
22const DEFAULT_EVENT_CAPACITY: usize = 500;
23// The capacity will be set to max(DEFAULT_CONTEXT_KEY_CAPACITY, 1), meaning
24// caching cannot be entirely disabled.
25const DEFAULT_CONTEXT_KEY_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1000);
26const DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
27
28/// Error type used to represent failures when building an [EventProcessor] instance.
29#[non_exhaustive]
30#[derive(Debug, Error)]
31pub enum BuildError {
32    /// Error used when a configuration setting is invalid.
33    #[error("event processor factory failed to build: {0}")]
34    InvalidConfig(String),
35
36    /// Error used when the event processor's thread fails to start
37    #[error(transparent)]
38    FailedToStart(EventProcessorError),
39}
40
41/// Trait which allows creation of event processors. Should be implemented by event processor
42/// builder types.
43pub trait EventProcessorFactory {
44    fn build(
45        &self,
46        endpoints: &service_endpoints::ServiceEndpoints,
47        sdk_key: &str,
48        tags: Option<String>,
49    ) -> Result<Arc<dyn EventProcessor>, BuildError>;
50
51    /// Sets the per-SDK-instance identifier used to populate the
52    /// `X-LaunchDarkly-Instance-Id` header on outbound requests. LD-owned builders override
53    /// this to stamp the header on event posts. External implementors of this trait may
54    /// ignore this — the default no-op is correct unless the implementor constructs HTTP
55    /// clients that talk to LaunchDarkly's API.
56    fn set_instance_id(&mut self, _instance_id: String) {}
57
58    fn to_owned(&self) -> Box<dyn EventProcessorFactory>;
59}
60
61/// Contains methods for configuring delivery of analytics events.
62///
63/// The SDK normally buffers analytics events and sends them to LaunchDarkly at intervals. If you want
64/// to customize this behavior, create a builder with [crate::EventProcessorBuilder::new], change its
65/// properties with the methods of this struct, and pass it to [crate::ConfigBuilder::event_processor].
66///
67/// # Examples
68///
69/// Adjust the flush interval
70/// ```
71/// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder};
72/// # use launchdarkly_sdk_transport::HyperTransport;
73/// # use std::time::Duration;
74/// # fn main() {
75///     ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::<HyperTransport>::new()
76///         .flush_interval(Duration::from_secs(10)));
77/// # }
78/// ```
79#[derive(Clone)]
80pub struct EventProcessorBuilder<T: HttpTransport = launchdarkly_sdk_transport::HyperTransport> {
81    capacity: usize,
82    flush_interval: Duration,
83    context_keys_capacity: NonZeroUsize,
84    context_keys_flush_interval: Duration,
85    event_sender: Option<Arc<dyn EventSender>>,
86    all_attributes_private: bool,
87    private_attributes: HashSet<Reference>,
88    transport: Option<T>,
89    omit_anonymous_contexts: bool,
90    compress_events: bool,
91    instance_id: Option<String>,
92    // diagnostic_recording_interval: Duration
93}
94
95impl<T: HttpTransport> EventProcessorFactory for EventProcessorBuilder<T> {
96    fn build(
97        &self,
98        endpoints: &service_endpoints::ServiceEndpoints,
99        sdk_key: &str,
100        tags: Option<String>,
101    ) -> Result<Arc<dyn EventProcessor>, BuildError> {
102        let url_string = format!("{}/bulk", endpoints.events_base_url());
103
104        let mut default_headers = HashMap::<&str, String>::new();
105
106        if let Some(tags) = tags {
107            default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags);
108        }
109        if let Some(instance_id) = &self.instance_id {
110            default_headers.insert(LAUNCHDARKLY_INSTANCE_ID_HEADER, instance_id.clone());
111        }
112
113        let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
114            // NOTE: This would only be possible under unit testing conditions.
115            if let Some(event_sender) = &self.event_sender {
116                Ok(event_sender.clone())
117            } else if let Some(transport) = &self.transport {
118                Ok(Arc::new(HttpEventSender::new(
119                    transport.clone(),
120                    Uri::from_str(url_string.as_str()).unwrap(),
121                    sdk_key,
122                    default_headers,
123                    self.compress_events,
124                )))
125            } else {
126                #[cfg(any(
127                    feature = "hyper-rustls-native-roots",
128                    feature = "hyper-rustls-webpki-roots",
129                    feature = "native-tls"
130                ))]
131                {
132                    let transport = launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| {
133                        BuildError::InvalidConfig(format!(
134                            "failed to create default https transport: {}",
135                            e
136                        ))
137                    })?;
138                    Ok(Arc::new(HttpEventSender::new(
139                        transport,
140                        Uri::from_str(url_string.as_str()).unwrap(),
141                        sdk_key,
142                        default_headers,
143                        self.compress_events,
144                    )))
145                }
146                #[cfg(not(any(
147                    feature = "hyper-rustls-native-roots",
148                    feature = "hyper-rustls-webpki-roots",
149                    feature = "native-tls"
150                )))]
151                Err(BuildError::InvalidConfig(
152                    "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
153                ))
154            };
155        let event_sender = event_sender_result?;
156
157        let events_configuration = EventsConfiguration {
158            event_sender,
159            capacity: self.capacity,
160            flush_interval: self.flush_interval,
161            context_keys_capacity: self.context_keys_capacity,
162            context_keys_flush_interval: self.context_keys_flush_interval,
163            all_attributes_private: self.all_attributes_private,
164            private_attributes: self.private_attributes.clone(),
165            omit_anonymous_contexts: self.omit_anonymous_contexts,
166        };
167
168        let events_processor =
169            EventProcessorImpl::new(events_configuration).map_err(BuildError::FailedToStart)?;
170
171        Ok(Arc::new(events_processor))
172    }
173
174    fn set_instance_id(&mut self, instance_id: String) {
175        self.instance_id = Some(instance_id);
176    }
177
178    fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
179        Box::new(self.clone())
180    }
181}
182
183impl<T: HttpTransport> EventProcessorBuilder<T> {
184    /// Create a new [EventProcessorBuilder] with all default values.
185    pub fn new() -> Self {
186        Self {
187            capacity: DEFAULT_EVENT_CAPACITY,
188            flush_interval: DEFAULT_FLUSH_POLL_INTERVAL,
189            context_keys_capacity: DEFAULT_CONTEXT_KEY_CAPACITY
190                .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
191            context_keys_flush_interval: DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL,
192            event_sender: None,
193            all_attributes_private: false,
194            private_attributes: HashSet::new(),
195            omit_anonymous_contexts: false,
196            transport: None,
197            instance_id: None,
198            #[cfg(feature = "event-compression")]
199            compress_events: true,
200            #[cfg(not(feature = "event-compression"))]
201            compress_events: false,
202        }
203    }
204
205    /// Set the capacity of the events buffer.
206    ///
207    /// The client buffers up to this many events in memory before flushing. If the capacity is exceeded before
208    /// the buffer is flushed [crate::EventProcessor::flush], events will be discarded. Increasing the
209    /// capacity means that events are less likely to be discarded, at the cost of consuming more memory.
210    ///
211    pub fn capacity(&mut self, capacity: usize) -> &mut Self {
212        self.capacity = capacity;
213        self
214    }
215
216    /// Sets the interval between flushes of the event buffer.
217    ///
218    /// Decreasing the flush interval means that the event buffer is less likely to reach capacity.
219    pub fn flush_interval(&mut self, flush_interval: Duration) -> &mut Self {
220        self.flush_interval = flush_interval;
221        self
222    }
223
224    /// Sets the number of context keys that the event processor can remember at any one time.
225    ///
226    /// To avoid sending duplicate context details in analytics events, the SDK maintains a cache of
227    /// recently seen context keys.
228    pub fn context_keys_capacity(&mut self, context_keys_capacity: NonZeroUsize) -> &mut Self {
229        self.context_keys_capacity = context_keys_capacity;
230        self
231    }
232
233    /// Sets the interval at which the event processor will reset its cache of known context keys.
234    pub fn context_keys_flush_interval(
235        &mut self,
236        context_keys_flush_interval: Duration,
237    ) -> &mut Self {
238        self.context_keys_flush_interval = context_keys_flush_interval;
239        self
240    }
241
242    /// Sets whether or not all optional user attributes should be hidden from LaunchDarkly.
243    ///
244    /// If this is true, all user attribute values (other than the key) will be private, not just the attributes
245    /// specified with private_attributes or on a per-user basis with UserBuilder methods. By default, it is false.
246    pub fn all_attributes_private(&mut self, all_attributes_private: bool) -> &mut Self {
247        self.all_attributes_private = all_attributes_private;
248        self
249    }
250
251    /// Marks a set of attribute names as always private.
252    ///
253    /// Any users sent to LaunchDarkly with this configuration active will have attributes with these
254    /// names removed. This is in addition to any attributes that were marked as private for an
255    /// individual user with UserBuilder methods. Setting all_attribute_private to true overrides this.
256    pub fn private_attributes<R>(&mut self, attributes: HashSet<R>) -> &mut Self
257    where
258        R: Into<Reference>,
259    {
260        self.private_attributes = attributes.into_iter().map(|a| a.into()).collect();
261        self
262    }
263
264    /// Sets the transport for the event sender to use. This allows for re-use of a transport
265    /// between multiple client instances. This is especially useful for the `sdk-test-harness`
266    /// where many client instances are created throughout the test and reading the native
267    /// certificates is a substantial portion of the runtime.
268    pub fn transport(&mut self, transport: T) -> &mut Self {
269        self.transport = Some(transport);
270        self
271    }
272
273    /// Sets whether anonymous contexts should be omitted from index and identify events.
274    ///
275    /// The default is false, meaning that anonymous contexts will be included in index and
276    /// identify events.
277    pub fn omit_anonymous_contexts(&mut self, omit: bool) -> &mut Self {
278        self.omit_anonymous_contexts = omit;
279        self
280    }
281
282    #[cfg(feature = "event-compression")]
283    /// Should the event payload sent to LaunchDarkly use gzip compression. By
284    /// default this is true.
285    //
286    /// Customers using the relay proxy are encouraged to disable this feature to avoid unnecessary
287    /// CPU overhead, as the relay proxy will decompress & recompress the payloads.
288    pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
289        self.compress_events = enabled;
290        self
291    }
292
293    #[cfg(test)]
294    /// Test only functionality that allows us to override the event sender.
295    pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
296        self.event_sender = Some(event_sender);
297        self
298    }
299}
300
301impl<T: HttpTransport> Default for EventProcessorBuilder<T> {
302    fn default() -> Self {
303        Self::new()
304    }
305}
306
307/// An implementation of EventProcessorFactory that will discard all events received. This should
308/// only be used for unit tests.
309#[derive(Clone)]
310pub struct NullEventProcessorBuilder {}
311
312impl EventProcessorFactory for NullEventProcessorBuilder {
313    fn build(
314        &self,
315        _: &service_endpoints::ServiceEndpoints,
316        _: &str,
317        _: Option<String>,
318    ) -> Result<Arc<dyn EventProcessor>, BuildError> {
319        Ok(Arc::new(NullEventProcessor::new()))
320    }
321
322    fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
323        Box::new(self.clone())
324    }
325}
326
327impl NullEventProcessorBuilder {
328    /// Create a new [NullEventProcessorBuilder] with all default values.
329    pub fn new() -> Self {
330        Self {}
331    }
332}
333
334impl Default for NullEventProcessorBuilder {
335    fn default() -> Self {
336        Self::new()
337    }
338}
339
340#[cfg(test)]
341mod tests {
342    use launchdarkly_server_sdk_evaluation::ContextBuilder;
343    use maplit::hashset;
344    use mockito::Matcher;
345    use test_case::test_case;
346
347    use crate::{events::event::EventFactory, ServiceEndpointsBuilder};
348
349    use super::*;
350
351    #[test]
352    fn default_builder_has_correct_defaults() {
353        let builder = EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
354        assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
355        assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
356    }
357
358    #[test]
359    fn capacity_can_be_adjusted() {
360        let mut builder =
361            EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
362        builder.capacity(1234);
363        assert_eq!(builder.capacity, 1234);
364    }
365
366    #[test]
367    fn flush_interval_can_be_adjusted() {
368        let mut builder =
369            EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
370        builder.flush_interval(Duration::from_secs(1234));
371        assert_eq!(builder.flush_interval, Duration::from_secs(1234));
372    }
373
374    #[test]
375    fn context_keys_capacity_can_be_adjusted() {
376        let mut builder =
377            EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
378        let cap = NonZeroUsize::new(1234).expect("1234 > 0");
379        builder.context_keys_capacity(cap);
380        assert_eq!(builder.context_keys_capacity, cap);
381    }
382
383    #[test]
384    fn context_keys_flush_interval_can_be_adjusted() {
385        let mut builder =
386            EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
387        builder.context_keys_flush_interval(Duration::from_secs(1000));
388        assert_eq!(
389            builder.context_keys_flush_interval,
390            Duration::from_secs(1000)
391        );
392    }
393
394    #[test]
395    fn all_attribute_private_can_be_adjusted() {
396        let mut builder =
397            EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
398
399        assert!(!builder.all_attributes_private);
400        builder.all_attributes_private(true);
401        assert!(builder.all_attributes_private);
402    }
403
404    #[test]
405    fn attribte_names_can_be_adjusted() {
406        let mut builder =
407            EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
408
409        assert!(builder.private_attributes.is_empty());
410        builder.private_attributes(hashset!["name"]);
411        assert!(builder.private_attributes.contains(&"name".into()));
412    }
413
414    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
415    #[test_case(None, Matcher::Missing)]
416    #[cfg(any(
417        feature = "hyper-rustls-native-roots",
418        feature = "hyper-rustls-webpki-roots",
419        feature = "native-tls"
420    ))]
421    fn processor_sends_correct_headers(tag: Option<String>, matcher: impl Into<Matcher>) {
422        let mut server = mockito::Server::new();
423        let mock = server
424            .mock("POST", "/bulk")
425            .with_status(200)
426            .expect_at_least(1)
427            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
428            .create();
429
430        let service_endpoints = ServiceEndpointsBuilder::new()
431            .events_base_url(&server.url())
432            .polling_base_url(&server.url())
433            .streaming_base_url(&server.url())
434            .build()
435            .expect("Service endpoints failed to be created");
436
437        let builder = EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
438        let processor = builder
439            .build(&service_endpoints, "sdk-key", tag)
440            .expect("Processor failed to build");
441
442        let event_factory = EventFactory::new(false);
443
444        let context = ContextBuilder::new("bob")
445            .build()
446            .expect("Failed to create context");
447        let identify_event = event_factory.new_identify(context);
448
449        processor.send(identify_event);
450        processor.close();
451
452        mock.assert()
453    }
454
455    // Verifies that event POSTs carry the X-LaunchDarkly-Instance-Id header when one has been
456    // injected via the trait's set_instance_id setter (the path Client::build uses).
457    #[cfg(any(
458        feature = "hyper-rustls-native-roots",
459        feature = "hyper-rustls-webpki-roots",
460        feature = "native-tls"
461    ))]
462    #[test]
463    fn processor_sends_instance_id_header() {
464        let mut server = mockito::Server::new();
465        let instance_id = uuid::Uuid::new_v4().to_string();
466        let mock = server
467            .mock("POST", "/bulk")
468            .with_status(200)
469            .expect_at_least(1)
470            .match_header(LAUNCHDARKLY_INSTANCE_ID_HEADER, instance_id.as_str())
471            .create();
472
473        let service_endpoints = ServiceEndpointsBuilder::new()
474            .events_base_url(&server.url())
475            .polling_base_url(&server.url())
476            .streaming_base_url(&server.url())
477            .build()
478            .expect("Service endpoints failed to be created");
479
480        let mut builder =
481            EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
482        builder.set_instance_id(instance_id.clone());
483        let processor = builder
484            .build(&service_endpoints, "sdk-key", None)
485            .expect("Processor failed to build");
486
487        let event_factory = EventFactory::new(false);
488
489        let context = ContextBuilder::new("bob")
490            .build()
491            .expect("Failed to create context");
492        let identify_event = event_factory.new_identify(context);
493
494        processor.send(identify_event);
495        processor.close();
496
497        mock.assert()
498    }
499}