launchdarkly_server_sdk/events/
processor_builders.rs1use std::collections::{HashMap, HashSet};
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use http::Uri;
8use launchdarkly_server_sdk_evaluation::Reference;
9use thiserror::Error;
10
11use crate::events::sender::HttpEventSender;
12use crate::{service_endpoints, LAUNCHDARKLY_INSTANCE_ID_HEADER, LAUNCHDARKLY_TAGS_HEADER};
13use launchdarkly_sdk_transport::HttpTransport;
14
15use super::processor::{
16 EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor,
17};
18use super::sender::EventSender;
19use super::EventsConfiguration;
20
21const DEFAULT_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(5);
22const DEFAULT_EVENT_CAPACITY: usize = 500;
23const DEFAULT_CONTEXT_KEY_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1000);
26const DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
27
28#[non_exhaustive]
30#[derive(Debug, Error)]
31pub enum BuildError {
32 #[error("event processor factory failed to build: {0}")]
34 InvalidConfig(String),
35
36 #[error(transparent)]
38 FailedToStart(EventProcessorError),
39}
40
41pub trait EventProcessorFactory {
44 fn build(
45 &self,
46 endpoints: &service_endpoints::ServiceEndpoints,
47 sdk_key: &str,
48 tags: Option<String>,
49 ) -> Result<Arc<dyn EventProcessor>, BuildError>;
50
51 fn set_instance_id(&mut self, _instance_id: String) {}
57
58 fn to_owned(&self) -> Box<dyn EventProcessorFactory>;
59}
60
61#[derive(Clone)]
80pub struct EventProcessorBuilder<T: HttpTransport = launchdarkly_sdk_transport::HyperTransport> {
81 capacity: usize,
82 flush_interval: Duration,
83 context_keys_capacity: NonZeroUsize,
84 context_keys_flush_interval: Duration,
85 event_sender: Option<Arc<dyn EventSender>>,
86 all_attributes_private: bool,
87 private_attributes: HashSet<Reference>,
88 transport: Option<T>,
89 omit_anonymous_contexts: bool,
90 compress_events: bool,
91 instance_id: Option<String>,
92 }
94
95impl<T: HttpTransport> EventProcessorFactory for EventProcessorBuilder<T> {
96 fn build(
97 &self,
98 endpoints: &service_endpoints::ServiceEndpoints,
99 sdk_key: &str,
100 tags: Option<String>,
101 ) -> Result<Arc<dyn EventProcessor>, BuildError> {
102 let url_string = format!("{}/bulk", endpoints.events_base_url());
103
104 let mut default_headers = HashMap::<&str, String>::new();
105
106 if let Some(tags) = tags {
107 default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags);
108 }
109 if let Some(instance_id) = &self.instance_id {
110 default_headers.insert(LAUNCHDARKLY_INSTANCE_ID_HEADER, instance_id.clone());
111 }
112
113 let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
114 if let Some(event_sender) = &self.event_sender {
116 Ok(event_sender.clone())
117 } else if let Some(transport) = &self.transport {
118 Ok(Arc::new(HttpEventSender::new(
119 transport.clone(),
120 Uri::from_str(url_string.as_str()).unwrap(),
121 sdk_key,
122 default_headers,
123 self.compress_events,
124 )))
125 } else {
126 #[cfg(any(
127 feature = "hyper-rustls-native-roots",
128 feature = "hyper-rustls-webpki-roots",
129 feature = "native-tls"
130 ))]
131 {
132 let transport = launchdarkly_sdk_transport::HyperTransport::new_https().map_err(|e| {
133 BuildError::InvalidConfig(format!(
134 "failed to create default https transport: {}",
135 e
136 ))
137 })?;
138 Ok(Arc::new(HttpEventSender::new(
139 transport,
140 Uri::from_str(url_string.as_str()).unwrap(),
141 sdk_key,
142 default_headers,
143 self.compress_events,
144 )))
145 }
146 #[cfg(not(any(
147 feature = "hyper-rustls-native-roots",
148 feature = "hyper-rustls-webpki-roots",
149 feature = "native-tls"
150 )))]
151 Err(BuildError::InvalidConfig(
152 "transport is required when hyper-rustls-native-roots, hyper-rustls-webpki-roots, or native-tls features are disabled".into(),
153 ))
154 };
155 let event_sender = event_sender_result?;
156
157 let events_configuration = EventsConfiguration {
158 event_sender,
159 capacity: self.capacity,
160 flush_interval: self.flush_interval,
161 context_keys_capacity: self.context_keys_capacity,
162 context_keys_flush_interval: self.context_keys_flush_interval,
163 all_attributes_private: self.all_attributes_private,
164 private_attributes: self.private_attributes.clone(),
165 omit_anonymous_contexts: self.omit_anonymous_contexts,
166 };
167
168 let events_processor =
169 EventProcessorImpl::new(events_configuration).map_err(BuildError::FailedToStart)?;
170
171 Ok(Arc::new(events_processor))
172 }
173
174 fn set_instance_id(&mut self, instance_id: String) {
175 self.instance_id = Some(instance_id);
176 }
177
178 fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
179 Box::new(self.clone())
180 }
181}
182
183impl<T: HttpTransport> EventProcessorBuilder<T> {
184 pub fn new() -> Self {
186 Self {
187 capacity: DEFAULT_EVENT_CAPACITY,
188 flush_interval: DEFAULT_FLUSH_POLL_INTERVAL,
189 context_keys_capacity: DEFAULT_CONTEXT_KEY_CAPACITY
190 .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
191 context_keys_flush_interval: DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL,
192 event_sender: None,
193 all_attributes_private: false,
194 private_attributes: HashSet::new(),
195 omit_anonymous_contexts: false,
196 transport: None,
197 instance_id: None,
198 #[cfg(feature = "event-compression")]
199 compress_events: true,
200 #[cfg(not(feature = "event-compression"))]
201 compress_events: false,
202 }
203 }
204
205 pub fn capacity(&mut self, capacity: usize) -> &mut Self {
212 self.capacity = capacity;
213 self
214 }
215
216 pub fn flush_interval(&mut self, flush_interval: Duration) -> &mut Self {
220 self.flush_interval = flush_interval;
221 self
222 }
223
224 pub fn context_keys_capacity(&mut self, context_keys_capacity: NonZeroUsize) -> &mut Self {
229 self.context_keys_capacity = context_keys_capacity;
230 self
231 }
232
233 pub fn context_keys_flush_interval(
235 &mut self,
236 context_keys_flush_interval: Duration,
237 ) -> &mut Self {
238 self.context_keys_flush_interval = context_keys_flush_interval;
239 self
240 }
241
242 pub fn all_attributes_private(&mut self, all_attributes_private: bool) -> &mut Self {
247 self.all_attributes_private = all_attributes_private;
248 self
249 }
250
251 pub fn private_attributes<R>(&mut self, attributes: HashSet<R>) -> &mut Self
257 where
258 R: Into<Reference>,
259 {
260 self.private_attributes = attributes.into_iter().map(|a| a.into()).collect();
261 self
262 }
263
264 pub fn transport(&mut self, transport: T) -> &mut Self {
269 self.transport = Some(transport);
270 self
271 }
272
273 pub fn omit_anonymous_contexts(&mut self, omit: bool) -> &mut Self {
278 self.omit_anonymous_contexts = omit;
279 self
280 }
281
282 #[cfg(feature = "event-compression")]
283 pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
289 self.compress_events = enabled;
290 self
291 }
292
293 #[cfg(test)]
294 pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
296 self.event_sender = Some(event_sender);
297 self
298 }
299}
300
301impl<T: HttpTransport> Default for EventProcessorBuilder<T> {
302 fn default() -> Self {
303 Self::new()
304 }
305}
306
307#[derive(Clone)]
310pub struct NullEventProcessorBuilder {}
311
312impl EventProcessorFactory for NullEventProcessorBuilder {
313 fn build(
314 &self,
315 _: &service_endpoints::ServiceEndpoints,
316 _: &str,
317 _: Option<String>,
318 ) -> Result<Arc<dyn EventProcessor>, BuildError> {
319 Ok(Arc::new(NullEventProcessor::new()))
320 }
321
322 fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
323 Box::new(self.clone())
324 }
325}
326
327impl NullEventProcessorBuilder {
328 pub fn new() -> Self {
330 Self {}
331 }
332}
333
334impl Default for NullEventProcessorBuilder {
335 fn default() -> Self {
336 Self::new()
337 }
338}
339
340#[cfg(test)]
341mod tests {
342 use launchdarkly_server_sdk_evaluation::ContextBuilder;
343 use maplit::hashset;
344 use mockito::Matcher;
345 use test_case::test_case;
346
347 use crate::{events::event::EventFactory, ServiceEndpointsBuilder};
348
349 use super::*;
350
351 #[test]
352 fn default_builder_has_correct_defaults() {
353 let builder = EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
354 assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
355 assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
356 }
357
358 #[test]
359 fn capacity_can_be_adjusted() {
360 let mut builder =
361 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
362 builder.capacity(1234);
363 assert_eq!(builder.capacity, 1234);
364 }
365
366 #[test]
367 fn flush_interval_can_be_adjusted() {
368 let mut builder =
369 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
370 builder.flush_interval(Duration::from_secs(1234));
371 assert_eq!(builder.flush_interval, Duration::from_secs(1234));
372 }
373
374 #[test]
375 fn context_keys_capacity_can_be_adjusted() {
376 let mut builder =
377 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
378 let cap = NonZeroUsize::new(1234).expect("1234 > 0");
379 builder.context_keys_capacity(cap);
380 assert_eq!(builder.context_keys_capacity, cap);
381 }
382
383 #[test]
384 fn context_keys_flush_interval_can_be_adjusted() {
385 let mut builder =
386 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
387 builder.context_keys_flush_interval(Duration::from_secs(1000));
388 assert_eq!(
389 builder.context_keys_flush_interval,
390 Duration::from_secs(1000)
391 );
392 }
393
394 #[test]
395 fn all_attribute_private_can_be_adjusted() {
396 let mut builder =
397 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
398
399 assert!(!builder.all_attributes_private);
400 builder.all_attributes_private(true);
401 assert!(builder.all_attributes_private);
402 }
403
404 #[test]
405 fn attribte_names_can_be_adjusted() {
406 let mut builder =
407 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
408
409 assert!(builder.private_attributes.is_empty());
410 builder.private_attributes(hashset!["name"]);
411 assert!(builder.private_attributes.contains(&"name".into()));
412 }
413
414 #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
415 #[test_case(None, Matcher::Missing)]
416 #[cfg(any(
417 feature = "hyper-rustls-native-roots",
418 feature = "hyper-rustls-webpki-roots",
419 feature = "native-tls"
420 ))]
421 fn processor_sends_correct_headers(tag: Option<String>, matcher: impl Into<Matcher>) {
422 let mut server = mockito::Server::new();
423 let mock = server
424 .mock("POST", "/bulk")
425 .with_status(200)
426 .expect_at_least(1)
427 .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
428 .create();
429
430 let service_endpoints = ServiceEndpointsBuilder::new()
431 .events_base_url(&server.url())
432 .polling_base_url(&server.url())
433 .streaming_base_url(&server.url())
434 .build()
435 .expect("Service endpoints failed to be created");
436
437 let builder = EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
438 let processor = builder
439 .build(&service_endpoints, "sdk-key", tag)
440 .expect("Processor failed to build");
441
442 let event_factory = EventFactory::new(false);
443
444 let context = ContextBuilder::new("bob")
445 .build()
446 .expect("Failed to create context");
447 let identify_event = event_factory.new_identify(context);
448
449 processor.send(identify_event);
450 processor.close();
451
452 mock.assert()
453 }
454
455 #[cfg(any(
458 feature = "hyper-rustls-native-roots",
459 feature = "hyper-rustls-webpki-roots",
460 feature = "native-tls"
461 ))]
462 #[test]
463 fn processor_sends_instance_id_header() {
464 let mut server = mockito::Server::new();
465 let instance_id = uuid::Uuid::new_v4().to_string();
466 let mock = server
467 .mock("POST", "/bulk")
468 .with_status(200)
469 .expect_at_least(1)
470 .match_header(LAUNCHDARKLY_INSTANCE_ID_HEADER, instance_id.as_str())
471 .create();
472
473 let service_endpoints = ServiceEndpointsBuilder::new()
474 .events_base_url(&server.url())
475 .polling_base_url(&server.url())
476 .streaming_base_url(&server.url())
477 .build()
478 .expect("Service endpoints failed to be created");
479
480 let mut builder =
481 EventProcessorBuilder::<launchdarkly_sdk_transport::HyperTransport>::new();
482 builder.set_instance_id(instance_id.clone());
483 let processor = builder
484 .build(&service_endpoints, "sdk-key", None)
485 .expect("Processor failed to build");
486
487 let event_factory = EventFactory::new(false);
488
489 let context = ContextBuilder::new("bob")
490 .build()
491 .expect("Failed to create context");
492 let identify_event = event_factory.new_identify(context);
493
494 processor.send(identify_event);
495 processor.close();
496
497 mock.assert()
498 }
499}