Skip to main content

launchdarkly_server_sdk/events/
processor_builders.rs

1use std::collections::{HashMap, HashSet};
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use http::Uri;
8use launchdarkly_server_sdk_evaluation::Reference;
9use thiserror::Error;
10
11use crate::events::sender::HttpEventSender;
12use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER};
13use launchdarkly_sdk_transport::HttpTransport;
14
15use super::processor::{
16    EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor,
17};
18use super::sender::EventSender;
19use super::EventsConfiguration;
20
// Default interval between flushes of the event buffer.
const DEFAULT_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(5);
// Default number of events that may be buffered in memory.
const DEFAULT_EVENT_CAPACITY: usize = 500;
// Default size of the context key cache.
//
// `NonZeroUsize::new` yields an `Option`, so the builder's constructor falls back to a
// capacity of 1 if this is ever `None`. In effect the capacity is
// max(DEFAULT_CONTEXT_KEY_CAPACITY, 1), meaning caching cannot be entirely disabled.
const DEFAULT_CONTEXT_KEY_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1000);
// Default interval at which the cache of known context keys is reset.
const DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
27
/// Error type used to represent failures when building an [EventProcessor] instance.
///
/// The enum is `#[non_exhaustive]`, so callers matching on it must include a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum BuildError {
    /// Error used when a configuration setting is invalid. The payload is a human readable
    /// description of the problem.
    #[error("event processor factory failed to build: {0}")]
    InvalidConfig(String),

    /// Error used when the event processor's thread fails to start. The wrapped
    /// [EventProcessorError] is displayed as-is (`transparent`).
    #[error(transparent)]
    FailedToStart(EventProcessorError),
}
40
/// Trait which allows creation of event processors. Should be implemented by event processor
/// builder types.
pub trait EventProcessorFactory {
    /// Creates an [EventProcessor] from the factory's current configuration.
    ///
    /// * `endpoints` - the service endpoints the processor may deliver events to.
    /// * `sdk_key` - the SDK key handed to the processor's event sender.
    /// * `tags` - optional application tags; implementations may attach these to outgoing
    ///   requests or ignore them.
    ///
    /// Returns a [BuildError] when the processor cannot be constructed.
    fn build(
        &self,
        endpoints: &service_endpoints::ServiceEndpoints,
        sdk_key: &str,
        tags: Option<String>,
    ) -> Result<Arc<dyn EventProcessor>, BuildError>;

    /// Returns a boxed, owned copy of this factory.
    fn to_owned(&self) -> Box<dyn EventProcessorFactory>;
}
52
/// Contains methods for configuring delivery of analytics events.
///
/// The SDK normally buffers analytics events and sends them to LaunchDarkly at intervals. If you want
/// to customize this behavior, create a builder with [crate::EventProcessorBuilder::new], change its
/// properties with the methods of this struct, and pass it to [crate::ConfigBuilder::event_processor].
///
/// # Examples
///
/// Adjust the flush interval
/// ```
/// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder};
/// # use launchdarkly_sdk_transport::HyperTransport;
/// # use std::time::Duration;
/// # fn main() {
///     ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::<HyperTransport>::new()
///         .flush_interval(Duration::from_secs(10)));
/// # }
/// ```
#[derive(Clone)]
pub struct EventProcessorBuilder<T: HttpTransport = launchdarkly_sdk_transport::HyperTransport> {
    // Maximum number of events buffered in memory between flushes.
    capacity: usize,
    // Interval between flushes of the event buffer.
    flush_interval: Duration,
    // Number of context keys the processor remembers at any one time.
    context_keys_capacity: NonZeroUsize,
    // Interval at which the cache of known context keys is reset.
    context_keys_flush_interval: Duration,
    // Test-only override; when set, it is used instead of building an HTTP event sender.
    event_sender: Option<Arc<dyn EventSender>>,
    // When true, all optional attributes are treated as private.
    all_attributes_private: bool,
    // Attribute references that are always treated as private.
    private_attributes: HashSet<Reference>,
    // Caller supplied transport; when absent a default HTTPS transport is created at build
    // time, provided one of the TLS features is enabled.
    transport: Option<T>,
    // When true, anonymous contexts are omitted from index and identify events.
    omit_anonymous_contexts: bool,
    // Whether event payloads are gzip compressed before being sent.
    compress_events: bool,
    // diagnostic_recording_interval: Duration
}
85
86impl<T: HttpTransport> EventProcessorFactory for EventProcessorBuilder<T> {
87    fn build(
88        &self,
89        endpoints: &service_endpoints::ServiceEndpoints,
90        sdk_key: &str,
91        tags: Option<String>,
92    ) -> Result<Arc<dyn EventProcessor>, BuildError> {
93        let url_string = format!("{}/bulk", endpoints.events_base_url());
94
95        let mut default_headers = HashMap::<&str, String>::new();
96
97        if let Some(tags) = tags {
98            default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags);
99        }
100
101        let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
102            // NOTE: This would only be possible under unit testing conditions.
103            if let Some(event_sender) = &self.event_sender {
104                Ok(event_sender.clone())
105            } else if let Some(transport) = &self.transport {
106                Ok(Arc::new(HttpEventSender::new(
107                    transport.clone(),
108                    Uri::from_str(url_string.as_str()).unwrap(),
109                    sdk_key,
110                    default_headers,
111                    self.compress_events,
112                )))
113            } else {
114                #[cfg(any(
115                    feature = "hyper-rustls-native-roots",
116                    feature = "hyper-rustls-webpki-roots",
117                    feature = "native-tls"
118                ))]
119                {
120                    let transport = launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| {
121                        BuildError::InvalidConfig(format!(
122                            "failed to create default https transport: {}",
123                            e
124                        ))
125                    })?;
126                    Ok(Arc::new(HttpEventSender::new(
127                        transport,
128                        Uri::from_str(url_string.as_str()).unwrap(),
129                        sdk_key,
130                        default_headers,
131                        self.compress_events,
132                    )))
133                }
134                #[cfg(not(any(
135                    feature = "hyper-rustls-native-roots",
136                    feature = "hyper-rustls-webpki-roots",
137                    feature = "native-tls"
138                )))]
139                Err(BuildError::InvalidConfig(
140                    "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
141                ))
142            };
143        let event_sender = event_sender_result?;
144
145        let events_configuration = EventsConfiguration {
146            event_sender,
147            capacity: self.capacity,
148            flush_interval: self.flush_interval,
149            context_keys_capacity: self.context_keys_capacity,
150            context_keys_flush_interval: self.context_keys_flush_interval,
151            all_attributes_private: self.all_attributes_private,
152            private_attributes: self.private_attributes.clone(),
153            omit_anonymous_contexts: self.omit_anonymous_contexts,
154        };
155
156        let events_processor =
157            EventProcessorImpl::new(events_configuration).map_err(BuildError::FailedToStart)?;
158
159        Ok(Arc::new(events_processor))
160    }
161
162    fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
163        Box::new(self.clone())
164    }
165}
166
167impl<T: HttpTransport> EventProcessorBuilder<T> {
168    /// Create a new [EventProcessorBuilder] with all default values.
169    pub fn new() -> Self {
170        Self {
171            capacity: DEFAULT_EVENT_CAPACITY,
172            flush_interval: DEFAULT_FLUSH_POLL_INTERVAL,
173            context_keys_capacity: DEFAULT_CONTEXT_KEY_CAPACITY
174                .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
175            context_keys_flush_interval: DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL,
176            event_sender: None,
177            all_attributes_private: false,
178            private_attributes: HashSet::new(),
179            omit_anonymous_contexts: false,
180            transport: None,
181            #[cfg(feature = "event-compression")]
182            compress_events: true,
183            #[cfg(not(feature = "event-compression"))]
184            compress_events: false,
185        }
186    }
187
188    /// Set the capacity of the events buffer.
189    ///
190    /// The client buffers up to this many events in memory before flushing. If the capacity is exceeded before
191    /// the buffer is flushed [crate::EventProcessor::flush], events will be discarded. Increasing the
192    /// capacity means that events are less likely to be discarded, at the cost of consuming more memory.
193    ///
194    pub fn capacity(&mut self, capacity: usize) -> &mut Self {
195        self.capacity = capacity;
196        self
197    }
198
199    /// Sets the interval between flushes of the event buffer.
200    ///
201    /// Decreasing the flush interval means that the event buffer is less likely to reach capacity.
202    pub fn flush_interval(&mut self, flush_interval: Duration) -> &mut Self {
203        self.flush_interval = flush_interval;
204        self
205    }
206
207    /// Sets the number of context keys that the event processor can remember at any one time.
208    ///
209    /// To avoid sending duplicate context details in analytics events, the SDK maintains a cache of
210    /// recently seen context keys.
211    pub fn context_keys_capacity(&mut self, context_keys_capacity: NonZeroUsize) -> &mut Self {
212        self.context_keys_capacity = context_keys_capacity;
213        self
214    }
215
216    /// Sets the interval at which the event processor will reset its cache of known context keys.
217    pub fn context_keys_flush_interval(
218        &mut self,
219        context_keys_flush_interval: Duration,
220    ) -> &mut Self {
221        self.context_keys_flush_interval = context_keys_flush_interval;
222        self
223    }
224
225    /// Sets whether or not all optional user attributes should be hidden from LaunchDarkly.
226    ///
227    /// If this is true, all user attribute values (other than the key) will be private, not just the attributes
228    /// specified with private_attributes or on a per-user basis with UserBuilder methods. By default, it is false.
229    pub fn all_attributes_private(&mut self, all_attributes_private: bool) -> &mut Self {
230        self.all_attributes_private = all_attributes_private;
231        self
232    }
233
234    /// Marks a set of attribute names as always private.
235    ///
236    /// Any users sent to LaunchDarkly with this configuration active will have attributes with these
237    /// names removed. This is in addition to any attributes that were marked as private for an
238    /// individual user with UserBuilder methods. Setting all_attribute_private to true overrides this.
239    pub fn private_attributes<R>(&mut self, attributes: HashSet<R>) -> &mut Self
240    where
241        R: Into<Reference>,
242    {
243        self.private_attributes = attributes.into_iter().map(|a| a.into()).collect();
244        self
245    }
246
247    /// Sets the transport for the event sender to use. This allows for re-use of a transport
248    /// between multiple client instances. This is especially useful for the `sdk-test-harness`
249    /// where many client instances are created throughout the test and reading the native
250    /// certificates is a substantial portion of the runtime.
251    pub fn transport(&mut self, transport: T) -> &mut Self {
252        self.transport = Some(transport);
253        self
254    }
255
256    /// Sets whether anonymous contexts should be omitted from index and identify events.
257    ///
258    /// The default is false, meaning that anonymous contexts will be included in index and
259    /// identify events.
260    pub fn omit_anonymous_contexts(&mut self, omit: bool) -> &mut Self {
261        self.omit_anonymous_contexts = omit;
262        self
263    }
264
265    #[cfg(feature = "event-compression")]
266    /// Should the event payload sent to LaunchDarkly use gzip compression. By
267    /// default this is true.
268    //
269    /// Customers using the relay proxy are encouraged to disable this feature to avoid unnecessary
270    /// CPU overhead, as the relay proxy will decompress & recompress the payloads.
271    pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
272        self.compress_events = enabled;
273        self
274    }
275
276    #[cfg(test)]
277    /// Test only functionality that allows us to override the event sender.
278    pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
279        self.event_sender = Some(event_sender);
280        self
281    }
282}
283
/// The default builder is identical to the one returned by [EventProcessorBuilder::new].
impl<T: HttpTransport> Default for EventProcessorBuilder<T> {
    fn default() -> Self {
        Self::new()
    }
}
289
/// An implementation of [EventProcessorFactory] that will discard all events received. This
/// should only be used for unit tests.
///
/// The builder carries no configuration.
#[derive(Clone)]
pub struct NullEventProcessorBuilder {}
294
295impl EventProcessorFactory for NullEventProcessorBuilder {
296    fn build(
297        &self,
298        _: &service_endpoints::ServiceEndpoints,
299        _: &str,
300        _: Option<String>,
301    ) -> Result<Arc<dyn EventProcessor>, BuildError> {
302        Ok(Arc::new(NullEventProcessor::new()))
303    }
304
305    fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
306        Box::new(self.clone())
307    }
308}
309
impl NullEventProcessorBuilder {
    /// Create a new [NullEventProcessorBuilder] with all default values.
    ///
    /// The builder has no fields, so there is nothing to configure.
    pub fn new() -> Self {
        Self {}
    }
}
316
/// The default builder is identical to the one returned by [NullEventProcessorBuilder::new].
impl Default for NullEventProcessorBuilder {
    fn default() -> Self {
        Self::new()
    }
}
322
#[cfg(test)]
mod tests {
    use launchdarkly_server_sdk_evaluation::ContextBuilder;
    use maplit::hashset;
    use mockito::Matcher;
    use test_case::test_case;

    use crate::{events::event::EventFactory, ServiceEndpointsBuilder};

    use super::*;

    /// Builder specialization exercised by every test in this module.
    type TestBuilder = EventProcessorBuilder<launchdarkly_sdk_transport::HyperTransport>;

    /// Every field of a freshly created builder must carry its documented default.
    #[test]
    fn default_builder_has_correct_defaults() {
        let builder = TestBuilder::new();
        assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
        assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
        assert_eq!(
            Some(builder.context_keys_capacity),
            DEFAULT_CONTEXT_KEY_CAPACITY
        );
        assert_eq!(
            builder.context_keys_flush_interval,
            DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL
        );
        assert!(builder.event_sender.is_none());
        assert!(!builder.all_attributes_private);
        assert!(builder.private_attributes.is_empty());
        assert!(builder.transport.is_none());
        assert!(!builder.omit_anonymous_contexts);
        // Compression is on by default only when the feature is compiled in.
        assert_eq!(
            builder.compress_events,
            cfg!(feature = "event-compression")
        );
    }

    #[test]
    fn capacity_can_be_adjusted() {
        let mut builder = TestBuilder::new();
        builder.capacity(1234);
        assert_eq!(builder.capacity, 1234);
    }

    #[test]
    fn flush_interval_can_be_adjusted() {
        let mut builder = TestBuilder::new();
        builder.flush_interval(Duration::from_secs(1234));
        assert_eq!(builder.flush_interval, Duration::from_secs(1234));
    }

    #[test]
    fn context_keys_capacity_can_be_adjusted() {
        let mut builder = TestBuilder::new();
        let cap = NonZeroUsize::new(1234).expect("1234 > 0");
        builder.context_keys_capacity(cap);
        assert_eq!(builder.context_keys_capacity, cap);
    }

    #[test]
    fn context_keys_flush_interval_can_be_adjusted() {
        let mut builder = TestBuilder::new();
        builder.context_keys_flush_interval(Duration::from_secs(1000));
        assert_eq!(
            builder.context_keys_flush_interval,
            Duration::from_secs(1000)
        );
    }

    #[test]
    fn all_attribute_private_can_be_adjusted() {
        let mut builder = TestBuilder::new();

        assert!(!builder.all_attributes_private);
        builder.all_attributes_private(true);
        assert!(builder.all_attributes_private);
    }

    #[test]
    fn attribute_names_can_be_adjusted() {
        let mut builder = TestBuilder::new();

        assert!(builder.private_attributes.is_empty());
        builder.private_attributes(hashset!["name"]);
        assert!(builder.private_attributes.contains(&"name".into()));
    }

    #[test]
    fn omit_anonymous_contexts_can_be_adjusted() {
        let mut builder = TestBuilder::new();

        assert!(!builder.omit_anonymous_contexts);
        builder.omit_anonymous_contexts(true);
        assert!(builder.omit_anonymous_contexts);
    }

    /// The tags header must be sent when tags are configured and be absent otherwise.
    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
    #[test_case(None, Matcher::Missing)]
    #[cfg(any(
        feature = "hyper-rustls-native-roots",
        feature = "hyper-rustls-webpki-roots",
        feature = "native-tls"
    ))]
    fn processor_sends_correct_headers(tag: Option<String>, matcher: impl Into<Matcher>) {
        let mut server = mockito::Server::new();
        let mock = server
            .mock("POST", "/bulk")
            .with_status(200)
            .expect_at_least(1)
            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
            .create();

        let service_endpoints = ServiceEndpointsBuilder::new()
            .events_base_url(&server.url())
            .polling_base_url(&server.url())
            .streaming_base_url(&server.url())
            .build()
            .expect("Service endpoints failed to be created");

        let builder = TestBuilder::new();
        let processor = builder
            .build(&service_endpoints, "sdk-key", tag)
            .expect("Processor failed to build");

        let event_factory = EventFactory::new(false);

        let context = ContextBuilder::new("bob")
            .build()
            .expect("Failed to create context");
        let identify_event = event_factory.new_identify(context);

        processor.send(identify_event);
        processor.close();

        mock.assert();
    }
}