launchdarkly_server_sdk/events/
processor_builders.rs

1use std::collections::{HashMap, HashSet};
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use hyper::client::connect::Connection;
8use hyper::service::Service;
9use hyper::Uri;
10#[cfg(feature = "rustls")]
11use hyper_rustls::HttpsConnectorBuilder;
12use launchdarkly_server_sdk_evaluation::Reference;
13use thiserror::Error;
14use tokio::io::{AsyncRead, AsyncWrite};
15
16use crate::events::sender::HyperEventSender;
17use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER};
18
19use super::processor::{
20    EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor,
21};
22use super::sender::EventSender;
23use super::EventsConfiguration;
24
25const DEFAULT_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(5);
26const DEFAULT_EVENT_CAPACITY: usize = 500;
27// The capacity will be set to max(DEFAULT_CONTEXT_KEY_CAPACITY, 1), meaning
28// caching cannot be entirely disabled.
29const DEFAULT_CONTEXT_KEY_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1000);
30const DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
31
32/// Error type used to represent failures when building an [EventProcessor] instance.
33#[non_exhaustive]
34#[derive(Debug, Error)]
35pub enum BuildError {
36    /// Error used when a configuration setting is invalid.
37    #[error("event processor factory failed to build: {0}")]
38    InvalidConfig(String),
39
40    /// Error used when the event processor's thread fails to start
41    #[error(transparent)]
42    FailedToStart(EventProcessorError),
43}
44
45/// Trait which allows creation of event processors. Should be implemented by event processor
46/// builder types.
47pub trait EventProcessorFactory {
48    fn build(
49        &self,
50        endpoints: &service_endpoints::ServiceEndpoints,
51        sdk_key: &str,
52        tags: Option<String>,
53    ) -> Result<Arc<dyn EventProcessor>, BuildError>;
54    fn to_owned(&self) -> Box<dyn EventProcessorFactory>;
55}
56
57/// Contains methods for configuring delivery of analytics events.
58///
59/// The SDK normally buffers analytics events and sends them to LaunchDarkly at intervals. If you want
60/// to customize this behavior, create a builder with [crate::EventProcessorBuilder::new], change its
61/// properties with the methods of this struct, and pass it to [crate::ConfigBuilder::event_processor].
62///
63/// # Examples
64///
65/// Adjust the flush interval
66/// ```
67/// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder};
68/// # use hyper_rustls::HttpsConnector;
69/// # use hyper::client::HttpConnector;
70/// # use std::time::Duration;
71/// # fn main() {
72///     ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::<HttpsConnector<HttpConnector>>::new()
73///         .flush_interval(Duration::from_secs(10)));
74/// # }
75/// ```
76#[derive(Clone)]
77pub struct EventProcessorBuilder<C> {
78    capacity: usize,
79    flush_interval: Duration,
80    context_keys_capacity: NonZeroUsize,
81    context_keys_flush_interval: Duration,
82    event_sender: Option<Arc<dyn EventSender>>,
83    all_attributes_private: bool,
84    private_attributes: HashSet<Reference>,
85    connector: Option<C>,
86    omit_anonymous_contexts: bool,
87    compress_events: bool,
88    // diagnostic_recording_interval: Duration
89}
90
91impl<C> EventProcessorFactory for EventProcessorBuilder<C>
92where
93    C: Service<Uri> + Clone + Send + Sync + 'static,
94    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
95    C::Future: Send + Unpin + 'static,
96    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
97{
98    fn build(
99        &self,
100        endpoints: &service_endpoints::ServiceEndpoints,
101        sdk_key: &str,
102        tags: Option<String>,
103    ) -> Result<Arc<dyn EventProcessor>, BuildError> {
104        let url_string = format!("{}/bulk", endpoints.events_base_url());
105
106        let mut default_headers = HashMap::<&str, String>::new();
107
108        if let Some(tags) = tags {
109            default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags);
110        }
111
112        let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
113            // NOTE: This would only be possible under unit testing conditions.
114            if let Some(event_sender) = &self.event_sender {
115                Ok(event_sender.clone())
116            } else if let Some(connector) = &self.connector {
117                Ok(Arc::new(HyperEventSender::new(
118                    connector.clone(),
119                    hyper::Uri::from_str(url_string.as_str()).unwrap(),
120                    sdk_key,
121                    default_headers,
122                    self.compress_events,
123                )))
124            } else {
125                #[cfg(feature = "rustls")]
126                {
127                    let connector = HttpsConnectorBuilder::new()
128                        .with_native_roots()
129                        .https_or_http()
130                        .enable_http1()
131                        .enable_http2()
132                        .build();
133
134                    Ok(Arc::new(HyperEventSender::new(
135                        connector,
136                        hyper::Uri::from_str(url_string.as_str()).unwrap(),
137                        sdk_key,
138                        default_headers,
139                        self.compress_events,
140                    )))
141                }
142                #[cfg(not(feature = "rustls"))]
143                Err(BuildError::InvalidConfig(
144                    "https connector is required when rustls is disabled".into(),
145                ))
146            };
147        let event_sender = event_sender_result?;
148
149        let events_configuration = EventsConfiguration {
150            event_sender,
151            capacity: self.capacity,
152            flush_interval: self.flush_interval,
153            context_keys_capacity: self.context_keys_capacity,
154            context_keys_flush_interval: self.context_keys_flush_interval,
155            all_attributes_private: self.all_attributes_private,
156            private_attributes: self.private_attributes.clone(),
157            omit_anonymous_contexts: self.omit_anonymous_contexts,
158        };
159
160        let events_processor =
161            EventProcessorImpl::new(events_configuration).map_err(BuildError::FailedToStart)?;
162
163        Ok(Arc::new(events_processor))
164    }
165
166    fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
167        Box::new(self.clone())
168    }
169}
170
171impl<C> EventProcessorBuilder<C> {
172    /// Create a new [EventProcessorBuilder] with all default values.
173    pub fn new() -> Self {
174        Self {
175            capacity: DEFAULT_EVENT_CAPACITY,
176            flush_interval: DEFAULT_FLUSH_POLL_INTERVAL,
177            context_keys_capacity: DEFAULT_CONTEXT_KEY_CAPACITY
178                .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
179            context_keys_flush_interval: DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL,
180            event_sender: None,
181            all_attributes_private: false,
182            private_attributes: HashSet::new(),
183            omit_anonymous_contexts: false,
184            connector: None,
185            compress_events: false,
186        }
187    }
188
189    /// Set the capacity of the events buffer.
190    ///
191    /// The client buffers up to this many events in memory before flushing. If the capacity is exceeded before
192    /// the buffer is flushed [crate::EventProcessor::flush], events will be discarded. Increasing the
193    /// capacity means that events are less likely to be discarded, at the cost of consuming more memory.
194    ///
195    pub fn capacity(&mut self, capacity: usize) -> &mut Self {
196        self.capacity = capacity;
197        self
198    }
199
200    /// Sets the interval between flushes of the event buffer.
201    ///
202    /// Decreasing the flush interval means that the event buffer is less likely to reach capacity.
203    pub fn flush_interval(&mut self, flush_interval: Duration) -> &mut Self {
204        self.flush_interval = flush_interval;
205        self
206    }
207
208    /// Sets the number of context keys that the event processor can remember at any one time.
209    ///
210    /// To avoid sending duplicate context details in analytics events, the SDK maintains a cache of
211    /// recently seen context keys.
212    pub fn context_keys_capacity(&mut self, context_keys_capacity: NonZeroUsize) -> &mut Self {
213        self.context_keys_capacity = context_keys_capacity;
214        self
215    }
216
217    /// Sets the interval at which the event processor will reset its cache of known context keys.
218    pub fn context_keys_flush_interval(
219        &mut self,
220        context_keys_flush_interval: Duration,
221    ) -> &mut Self {
222        self.context_keys_flush_interval = context_keys_flush_interval;
223        self
224    }
225
226    /// Sets whether or not all optional user attributes should be hidden from LaunchDarkly.
227    ///
228    /// If this is true, all user attribute values (other than the key) will be private, not just the attributes
229    /// specified with private_attributes or on a per-user basis with UserBuilder methods. By default, it is false.
230    pub fn all_attributes_private(&mut self, all_attributes_private: bool) -> &mut Self {
231        self.all_attributes_private = all_attributes_private;
232        self
233    }
234
235    /// Marks a set of attribute names as always private.
236    ///
237    /// Any users sent to LaunchDarkly with this configuration active will have attributes with these
238    /// names removed. This is in addition to any attributes that were marked as private for an
239    /// individual user with UserBuilder methods. Setting all_attribute_private to true overrides this.
240    pub fn private_attributes<R>(&mut self, attributes: HashSet<R>) -> &mut Self
241    where
242        R: Into<Reference>,
243    {
244        self.private_attributes = attributes.into_iter().map(|a| a.into()).collect();
245        self
246    }
247
248    /// Sets the connector for the event sender to use. This allows for re-use of a connector
249    /// between multiple client instances. This is especially useful for the `sdk-test-harness`
250    /// where many client instances are created throughout the test and reading the native
251    /// certificates is a substantial portion of the runtime.
252    pub fn https_connector(&mut self, connector: C) -> &mut Self {
253        self.connector = Some(connector);
254        self
255    }
256
257    /// Sets whether anonymous contexts should be omitted from index and identify events.
258    ///
259    /// The default is false, meaning that anonymous contexts will be included in index and
260    /// identify events.
261    pub fn omit_anonymous_contexts(&mut self, omit: bool) -> &mut Self {
262        self.omit_anonymous_contexts = omit;
263        self
264    }
265
266    #[cfg(feature = "event-compression")]
267    /// Should the event payload sent to LaunchDarkly use gzip compression. By
268    /// default this is false to prevent backward breaking compatibility issues with
269    /// older versions of the relay proxy.
270    //
271    /// Customers not using the relay proxy are strongly encouraged to enable this
272    /// feature to reduce egress bandwidth cost.
273    pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
274        self.compress_events = enabled;
275        self
276    }
277
278    #[cfg(test)]
279    /// Test only functionality that allows us to override the event sender.
280    pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
281        self.event_sender = Some(event_sender);
282        self
283    }
284}
285
286impl<C> Default for EventProcessorBuilder<C> {
287    fn default() -> Self {
288        Self::new()
289    }
290}
291
292/// An implementation of EventProcessorFactory that will discard all events received. This should
293/// only be used for unit tests.
294#[derive(Clone)]
295pub struct NullEventProcessorBuilder {}
296
297impl EventProcessorFactory for NullEventProcessorBuilder {
298    fn build(
299        &self,
300        _: &service_endpoints::ServiceEndpoints,
301        _: &str,
302        _: Option<String>,
303    ) -> Result<Arc<dyn EventProcessor>, BuildError> {
304        Ok(Arc::new(NullEventProcessor::new()))
305    }
306
307    fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
308        Box::new(self.clone())
309    }
310}
311
312impl NullEventProcessorBuilder {
313    /// Create a new [NullEventProcessorBuilder] with all default values.
314    pub fn new() -> Self {
315        Self {}
316    }
317}
318
319impl Default for NullEventProcessorBuilder {
320    fn default() -> Self {
321        Self::new()
322    }
323}
324
325#[cfg(test)]
326mod tests {
327    use hyper::client::HttpConnector;
328    use launchdarkly_server_sdk_evaluation::ContextBuilder;
329    use maplit::hashset;
330    use mockito::Matcher;
331    use test_case::test_case;
332
333    use crate::{events::event::EventFactory, ServiceEndpointsBuilder};
334
335    use super::*;
336
337    #[test]
338    fn default_builder_has_correct_defaults() {
339        let builder = EventProcessorBuilder::<HttpConnector>::new();
340        assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
341        assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
342    }
343
344    #[test]
345    fn capacity_can_be_adjusted() {
346        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
347        builder.capacity(1234);
348        assert_eq!(builder.capacity, 1234);
349    }
350
351    #[test]
352    fn flush_interval_can_be_adjusted() {
353        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
354        builder.flush_interval(Duration::from_secs(1234));
355        assert_eq!(builder.flush_interval, Duration::from_secs(1234));
356    }
357
358    #[test]
359    fn context_keys_capacity_can_be_adjusted() {
360        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
361        let cap = NonZeroUsize::new(1234).expect("1234 > 0");
362        builder.context_keys_capacity(cap);
363        assert_eq!(builder.context_keys_capacity, cap);
364    }
365
366    #[test]
367    fn context_keys_flush_interval_can_be_adjusted() {
368        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
369        builder.context_keys_flush_interval(Duration::from_secs(1000));
370        assert_eq!(
371            builder.context_keys_flush_interval,
372            Duration::from_secs(1000)
373        );
374    }
375
376    #[test]
377    fn all_attribute_private_can_be_adjusted() {
378        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
379
380        assert!(!builder.all_attributes_private);
381        builder.all_attributes_private(true);
382        assert!(builder.all_attributes_private);
383    }
384
385    #[test]
386    fn attribte_names_can_be_adjusted() {
387        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
388
389        assert!(builder.private_attributes.is_empty());
390        builder.private_attributes(hashset!["name"]);
391        assert!(builder.private_attributes.contains(&"name".into()));
392    }
393
394    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
395    #[test_case(None, Matcher::Missing)]
396    fn processor_sends_correct_headers(tag: Option<String>, matcher: impl Into<Matcher>) {
397        let mut server = mockito::Server::new();
398        let mock = server
399            .mock("POST", "/bulk")
400            .with_status(200)
401            .expect_at_least(1)
402            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
403            .create();
404
405        let service_endpoints = ServiceEndpointsBuilder::new()
406            .events_base_url(&server.url())
407            .polling_base_url(&server.url())
408            .streaming_base_url(&server.url())
409            .build()
410            .expect("Service endpoints failed to be created");
411
412        let builder = EventProcessorBuilder::<HttpConnector>::new();
413        let processor = builder
414            .build(&service_endpoints, "sdk-key", tag)
415            .expect("Processor failed to build");
416
417        let event_factory = EventFactory::new(false);
418
419        let context = ContextBuilder::new("bob")
420            .build()
421            .expect("Failed to create context");
422        let identify_event = event_factory.new_identify(context);
423
424        processor.send(identify_event);
425        processor.close();
426
427        mock.assert()
428    }
429}