launchdarkly_server_sdk/events/
processor_builders.rs1use std::collections::{HashMap, HashSet};
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use http::Uri;
8use launchdarkly_server_sdk_evaluation::Reference;
9use thiserror::Error;
10
11use crate::events::sender::HttpEventSender;
12use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER};
13use launchdarkly_sdk_transport::HttpTransport;
14
15use super::processor::{
16 EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor,
17};
18use super::sender::EventSender;
19use super::EventsConfiguration;
20
21const DEFAULT_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(5);
22const DEFAULT_EVENT_CAPACITY: usize = 500;
23const DEFAULT_CONTEXT_KEY_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1000);
26const DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
27
28#[non_exhaustive]
30#[derive(Debug, Error)]
31pub enum BuildError {
32 #[error("event processor factory failed to build: {0}")]
34 InvalidConfig(String),
35
36 #[error(transparent)]
38 FailedToStart(EventProcessorError),
39}
40
41pub trait EventProcessorFactory {
44 fn build(
45 &self,
46 endpoints: &service_endpoints::ServiceEndpoints,
47 sdk_key: &str,
48 tags: Option<String>,
49 ) -> Result<Arc<dyn EventProcessor>, BuildError>;
50 fn to_owned(&self) -> Box<dyn EventProcessorFactory>;
51}
52
53#[derive(Clone)]
72pub struct EventProcessorBuilder<T: HttpTransport = launchdarkly_sdk_transport::HyperTransport> {
73 capacity: usize,
74 flush_interval: Duration,
75 context_keys_capacity: NonZeroUsize,
76 context_keys_flush_interval: Duration,
77 event_sender: Option<Arc<dyn EventSender>>,
78 all_attributes_private: bool,
79 private_attributes: HashSet<Reference>,
80 transport: Option<T>,
81 omit_anonymous_contexts: bool,
82 compress_events: bool,
83 }
85
86impl<T: HttpTransport> EventProcessorFactory for EventProcessorBuilder<T> {
87 fn build(
88 &self,
89 endpoints: &service_endpoints::ServiceEndpoints,
90 sdk_key: &str,
91 tags: Option<String>,
92 ) -> Result<Arc<dyn EventProcessor>, BuildError> {
93 let url_string = format!("{}/bulk", endpoints.events_base_url());
94
95 let mut default_headers = HashMap::<&str, String>::new();
96
97 if let Some(tags) = tags {
98 default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags);
99 }
100
101 let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
102 if let Some(event_sender) = &self.event_sender {
104 Ok(event_sender.clone())
105 } else if let Some(transport) = &self.transport {
106 Ok(Arc::new(HttpEventSender::new(
107 transport.clone(),
108 Uri::from_str(url_string.as_str()).unwrap(),
109 sdk_key,
110 default_headers,
111 self.compress_events,
112 )))
113 } else {
114 #[cfg(any(
115 feature = "hyper-rustls-native-roots",
116 feature = "hyper-rustls-webpki-roots",
117 feature = "native-tls"
118 ))]
119 {
120 let transport = launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| {
121 BuildError::InvalidConfig(format!(
122 "failed to create default https transport: {}",
123 e
124 ))
125 })?;
126 Ok(Arc::new(HttpEventSender::new(
127 transport,
128 Uri::from_str(url_string.as_str()).unwrap(),
129 sdk_key,
130 default_headers,
131 self.compress_events,
132 )))
133 }
134 #[cfg(not(any(
135 feature = "hyper-rustls-native-roots",
136 feature = "hyper-rustls-webpki-roots",
137 feature = "native-tls"
138 )))]
139 Err(BuildError::InvalidConfig(
140 "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
141 ))
142 };
143 let event_sender = event_sender_result?;
144
145 let events_configuration = EventsConfiguration {
146 event_sender,
147 capacity: self.capacity,
148 flush_interval: self.flush_interval,
149 context_keys_capacity: self.context_keys_capacity,
150 context_keys_flush_interval: self.context_keys_flush_interval,
151 all_attributes_private: self.all_attributes_private,
152 private_attributes: self.private_attributes.clone(),
153 omit_anonymous_contexts: self.omit_anonymous_contexts,
154 };
155
156 let events_processor =
157 EventProcessorImpl::new(events_configuration).map_err(BuildError::FailedToStart)?;
158
159 Ok(Arc::new(events_processor))
160 }
161
162 fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
163 Box::new(self.clone())
164 }
165}
166
167impl<T: HttpTransport> EventProcessorBuilder<T> {
168 pub fn new() -> Self {
170 Self {
171 capacity: DEFAULT_EVENT_CAPACITY,
172 flush_interval: DEFAULT_FLUSH_POLL_INTERVAL,
173 context_keys_capacity: DEFAULT_CONTEXT_KEY_CAPACITY
174 .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
175 context_keys_flush_interval: DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL,
176 event_sender: None,
177 all_attributes_private: false,
178 private_attributes: HashSet::new(),
179 omit_anonymous_contexts: false,
180 transport: None,
181 #[cfg(feature = "event-compression")]
182 compress_events: true,
183 #[cfg(not(feature = "event-compression"))]
184 compress_events: false,
185 }
186 }
187
188 pub fn capacity(&mut self, capacity: usize) -> &mut Self {
195 self.capacity = capacity;
196 self
197 }
198
199 pub fn flush_interval(&mut self, flush_interval: Duration) -> &mut Self {
203 self.flush_interval = flush_interval;
204 self
205 }
206
207 pub fn context_keys_capacity(&mut self, context_keys_capacity: NonZeroUsize) -> &mut Self {
212 self.context_keys_capacity = context_keys_capacity;
213 self
214 }
215
216 pub fn context_keys_flush_interval(
218 &mut self,
219 context_keys_flush_interval: Duration,
220 ) -> &mut Self {
221 self.context_keys_flush_interval = context_keys_flush_interval;
222 self
223 }
224
225 pub fn all_attributes_private(&mut self, all_attributes_private: bool) -> &mut Self {
230 self.all_attributes_private = all_attributes_private;
231 self
232 }
233
234 pub fn private_attributes<R>(&mut self, attributes: HashSet<R>) -> &mut Self
240 where
241 R: Into<Reference>,
242 {
243 self.private_attributes = attributes.into_iter().map(|a| a.into()).collect();
244 self
245 }
246
247 pub fn transport(&mut self, transport: T) -> &mut Self {
252 self.transport = Some(transport);
253 self
254 }
255
256 pub fn omit_anonymous_contexts(&mut self, omit: bool) -> &mut Self {
261 self.omit_anonymous_contexts = omit;
262 self
263 }
264
265 #[cfg(feature = "event-compression")]
266 pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
272 self.compress_events = enabled;
273 self
274 }
275
276 #[cfg(test)]
277 pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
279 self.event_sender = Some(event_sender);
280 self
281 }
282}
283
284impl<T: HttpTransport> Default for EventProcessorBuilder<T> {
285 fn default() -> Self {
286 Self::new()
287 }
288}
289
290#[derive(Clone)]
293pub struct NullEventProcessorBuilder {}
294
295impl EventProcessorFactory for NullEventProcessorBuilder {
296 fn build(
297 &self,
298 _: &service_endpoints::ServiceEndpoints,
299 _: &str,
300 _: Option<String>,
301 ) -> Result<Arc<dyn EventProcessor>, BuildError> {
302 Ok(Arc::new(NullEventProcessor::new()))
303 }
304
305 fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
306 Box::new(self.clone())
307 }
308}
309
310impl NullEventProcessorBuilder {
311 pub fn new() -> Self {
313 Self {}
314 }
315}
316
317impl Default for NullEventProcessorBuilder {
318 fn default() -> Self {
319 Self::new()
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use launchdarkly_server_sdk_evaluation::ContextBuilder;
326 use maplit::hashset;
327 use mockito::Matcher;
328 use test_case::test_case;
329
330 use crate::{events::event::EventFactory, ServiceEndpointsBuilder};
331
332 use super::*;
333
334 #[test]
335 fn default_builder_has_correct_defaults() {
336 let builder = EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
337 assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
338 assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
339 }
340
341 #[test]
342 fn capacity_can_be_adjusted() {
343 let mut builder =
344 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
345 builder.capacity(1234);
346 assert_eq!(builder.capacity, 1234);
347 }
348
349 #[test]
350 fn flush_interval_can_be_adjusted() {
351 let mut builder =
352 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
353 builder.flush_interval(Duration::from_secs(1234));
354 assert_eq!(builder.flush_interval, Duration::from_secs(1234));
355 }
356
357 #[test]
358 fn context_keys_capacity_can_be_adjusted() {
359 let mut builder =
360 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
361 let cap = NonZeroUsize::new(1234).expect("1234 > 0");
362 builder.context_keys_capacity(cap);
363 assert_eq!(builder.context_keys_capacity, cap);
364 }
365
366 #[test]
367 fn context_keys_flush_interval_can_be_adjusted() {
368 let mut builder =
369 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
370 builder.context_keys_flush_interval(Duration::from_secs(1000));
371 assert_eq!(
372 builder.context_keys_flush_interval,
373 Duration::from_secs(1000)
374 );
375 }
376
377 #[test]
378 fn all_attribute_private_can_be_adjusted() {
379 let mut builder =
380 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
381
382 assert!(!builder.all_attributes_private);
383 builder.all_attributes_private(true);
384 assert!(builder.all_attributes_private);
385 }
386
387 #[test]
388 fn attribte_names_can_be_adjusted() {
389 let mut builder =
390 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
391
392 assert!(builder.private_attributes.is_empty());
393 builder.private_attributes(hashset!["name"]);
394 assert!(builder.private_attributes.contains(&"name".into()));
395 }
396
397 #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
398 #[test_case(None, Matcher::Missing)]
399 #[cfg(any(
400 feature = "hyper-rustls-native-roots",
401 feature = "hyper-rustls-webpki-roots",
402 feature = "native-tls"
403 ))]
404 fn processor_sends_correct_headers(tag: Option<String>, matcher: impl Into<Matcher>) {
405 let mut server = mockito::Server::new();
406 let mock = server
407 .mock("POST", "/bulk")
408 .with_status(200)
409 .expect_at_least(1)
410 .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
411 .create();
412
413 let service_endpoints = ServiceEndpointsBuilder::new()
414 .events_base_url(&server.url())
415 .polling_base_url(&server.url())
416 .streaming_base_url(&server.url())
417 .build()
418 .expect("Service endpoints failed to be created");
419
420 let builder = EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
421 let processor = builder
422 .build(&service_endpoints, "sdk-key", tag)
423 .expect("Processor failed to build");
424
425 let event_factory = EventFactory::new(false);
426
427 let context = ContextBuilder::new("bob")
428 .build()
429 .expect("Failed to create context");
430 let identify_event = event_factory.new_identify(context);
431
432 processor.send(identify_event);
433 processor.close();
434
435 mock.assert()
436 }
437}