launchdarkly_server_sdk/events/
processor_builders.rs1use std::collections::{HashMap, HashSet};
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use hyper::client::connect::Connection;
8use hyper::service::Service;
9use hyper::Uri;
10#[cfg(feature = "rustls")]
11use hyper_rustls::HttpsConnectorBuilder;
12use launchdarkly_server_sdk_evaluation::Reference;
13use thiserror::Error;
14use tokio::io::{AsyncRead, AsyncWrite};
15
16use crate::events::sender::HyperEventSender;
17use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER};
18
19use super::processor::{
20 EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor,
21};
22use super::sender::EventSender;
23use super::EventsConfiguration;
24
25const DEFAULT_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(5);
26const DEFAULT_EVENT_CAPACITY: usize = 500;
27const DEFAULT_CONTEXT_KEY_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1000);
30const DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
31
32#[non_exhaustive]
34#[derive(Debug, Error)]
35pub enum BuildError {
36 #[error("event processor factory failed to build: {0}")]
38 InvalidConfig(String),
39
40 #[error(transparent)]
42 FailedToStart(EventProcessorError),
43}
44
45pub trait EventProcessorFactory {
48 fn build(
49 &self,
50 endpoints: &service_endpoints::ServiceEndpoints,
51 sdk_key: &str,
52 tags: Option<String>,
53 ) -> Result<Arc<dyn EventProcessor>, BuildError>;
54 fn to_owned(&self) -> Box<dyn EventProcessorFactory>;
55}
56
57#[derive(Clone)]
77pub struct EventProcessorBuilder<C> {
78 capacity: usize,
79 flush_interval: Duration,
80 context_keys_capacity: NonZeroUsize,
81 context_keys_flush_interval: Duration,
82 event_sender: Option<Arc<dyn EventSender>>,
83 all_attributes_private: bool,
84 private_attributes: HashSet<Reference>,
85 connector: Option<C>,
86 omit_anonymous_contexts: bool,
87 compress_events: bool,
88 }
90
91impl<C> EventProcessorFactory for EventProcessorBuilder<C>
92where
93 C: Service<Uri> + Clone + Send + Sync + 'static,
94 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
95 C::Future: Send + Unpin + 'static,
96 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
97{
98 fn build(
99 &self,
100 endpoints: &service_endpoints::ServiceEndpoints,
101 sdk_key: &str,
102 tags: Option<String>,
103 ) -> Result<Arc<dyn EventProcessor>, BuildError> {
104 let url_string = format!("{}/bulk", endpoints.events_base_url());
105
106 let mut default_headers = HashMap::<&str, String>::new();
107
108 if let Some(tags) = tags {
109 default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags);
110 }
111
112 let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
113 if let Some(event_sender) = &self.event_sender {
115 Ok(event_sender.clone())
116 } else if let Some(connector) = &self.connector {
117 Ok(Arc::new(HyperEventSender::new(
118 connector.clone(),
119 hyper::Uri::from_str(url_string.as_str()).unwrap(),
120 sdk_key,
121 default_headers,
122 self.compress_events,
123 )))
124 } else {
125 #[cfg(feature = "rustls")]
126 {
127 let connector = HttpsConnectorBuilder::new()
128 .with_native_roots()
129 .https_or_http()
130 .enable_http1()
131 .enable_http2()
132 .build();
133
134 Ok(Arc::new(HyperEventSender::new(
135 connector,
136 hyper::Uri::from_str(url_string.as_str()).unwrap(),
137 sdk_key,
138 default_headers,
139 self.compress_events,
140 )))
141 }
142 #[cfg(not(feature = "rustls"))]
143 Err(BuildError::InvalidConfig(
144 "https connector is required when rustls is disabled".into(),
145 ))
146 };
147 let event_sender = event_sender_result?;
148
149 let events_configuration = EventsConfiguration {
150 event_sender,
151 capacity: self.capacity,
152 flush_interval: self.flush_interval,
153 context_keys_capacity: self.context_keys_capacity,
154 context_keys_flush_interval: self.context_keys_flush_interval,
155 all_attributes_private: self.all_attributes_private,
156 private_attributes: self.private_attributes.clone(),
157 omit_anonymous_contexts: self.omit_anonymous_contexts,
158 };
159
160 let events_processor =
161 EventProcessorImpl::new(events_configuration).map_err(BuildError::FailedToStart)?;
162
163 Ok(Arc::new(events_processor))
164 }
165
166 fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
167 Box::new(self.clone())
168 }
169}
170
171impl<C> EventProcessorBuilder<C> {
172 pub fn new() -> Self {
174 Self {
175 capacity: DEFAULT_EVENT_CAPACITY,
176 flush_interval: DEFAULT_FLUSH_POLL_INTERVAL,
177 context_keys_capacity: DEFAULT_CONTEXT_KEY_CAPACITY
178 .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
179 context_keys_flush_interval: DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL,
180 event_sender: None,
181 all_attributes_private: false,
182 private_attributes: HashSet::new(),
183 omit_anonymous_contexts: false,
184 connector: None,
185 compress_events: false,
186 }
187 }
188
189 pub fn capacity(&mut self, capacity: usize) -> &mut Self {
196 self.capacity = capacity;
197 self
198 }
199
200 pub fn flush_interval(&mut self, flush_interval: Duration) -> &mut Self {
204 self.flush_interval = flush_interval;
205 self
206 }
207
208 pub fn context_keys_capacity(&mut self, context_keys_capacity: NonZeroUsize) -> &mut Self {
213 self.context_keys_capacity = context_keys_capacity;
214 self
215 }
216
217 pub fn context_keys_flush_interval(
219 &mut self,
220 context_keys_flush_interval: Duration,
221 ) -> &mut Self {
222 self.context_keys_flush_interval = context_keys_flush_interval;
223 self
224 }
225
226 pub fn all_attributes_private(&mut self, all_attributes_private: bool) -> &mut Self {
231 self.all_attributes_private = all_attributes_private;
232 self
233 }
234
235 pub fn private_attributes<R>(&mut self, attributes: HashSet<R>) -> &mut Self
241 where
242 R: Into<Reference>,
243 {
244 self.private_attributes = attributes.into_iter().map(|a| a.into()).collect();
245 self
246 }
247
248 pub fn https_connector(&mut self, connector: C) -> &mut Self {
253 self.connector = Some(connector);
254 self
255 }
256
257 pub fn omit_anonymous_contexts(&mut self, omit: bool) -> &mut Self {
262 self.omit_anonymous_contexts = omit;
263 self
264 }
265
266 #[cfg(feature = "event-compression")]
267 pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
274 self.compress_events = enabled;
275 self
276 }
277
278 #[cfg(test)]
279 pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
281 self.event_sender = Some(event_sender);
282 self
283 }
284}
285
286impl<C> Default for EventProcessorBuilder<C> {
287 fn default() -> Self {
288 Self::new()
289 }
290}
291
292#[derive(Clone)]
295pub struct NullEventProcessorBuilder {}
296
297impl EventProcessorFactory for NullEventProcessorBuilder {
298 fn build(
299 &self,
300 _: &service_endpoints::ServiceEndpoints,
301 _: &str,
302 _: Option<String>,
303 ) -> Result<Arc<dyn EventProcessor>, BuildError> {
304 Ok(Arc::new(NullEventProcessor::new()))
305 }
306
307 fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
308 Box::new(self.clone())
309 }
310}
311
312impl NullEventProcessorBuilder {
313 pub fn new() -> Self {
315 Self {}
316 }
317}
318
319impl Default for NullEventProcessorBuilder {
320 fn default() -> Self {
321 Self::new()
322 }
323}
324
325#[cfg(test)]
326mod tests {
327 use hyper::client::HttpConnector;
328 use launchdarkly_server_sdk_evaluation::ContextBuilder;
329 use maplit::hashset;
330 use mockito::Matcher;
331 use test_case::test_case;
332
333 use crate::{events::event::EventFactory, ServiceEndpointsBuilder};
334
335 use super::*;
336
337 #[test]
338 fn default_builder_has_correct_defaults() {
339 let builder = EventProcessorBuilder::<HttpConnector>::new();
340 assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
341 assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
342 }
343
344 #[test]
345 fn capacity_can_be_adjusted() {
346 let mut builder = EventProcessorBuilder::<HttpConnector>::new();
347 builder.capacity(1234);
348 assert_eq!(builder.capacity, 1234);
349 }
350
351 #[test]
352 fn flush_interval_can_be_adjusted() {
353 let mut builder = EventProcessorBuilder::<HttpConnector>::new();
354 builder.flush_interval(Duration::from_secs(1234));
355 assert_eq!(builder.flush_interval, Duration::from_secs(1234));
356 }
357
358 #[test]
359 fn context_keys_capacity_can_be_adjusted() {
360 let mut builder = EventProcessorBuilder::<HttpConnector>::new();
361 let cap = NonZeroUsize::new(1234).expect("1234 > 0");
362 builder.context_keys_capacity(cap);
363 assert_eq!(builder.context_keys_capacity, cap);
364 }
365
366 #[test]
367 fn context_keys_flush_interval_can_be_adjusted() {
368 let mut builder = EventProcessorBuilder::<HttpConnector>::new();
369 builder.context_keys_flush_interval(Duration::from_secs(1000));
370 assert_eq!(
371 builder.context_keys_flush_interval,
372 Duration::from_secs(1000)
373 );
374 }
375
376 #[test]
377 fn all_attribute_private_can_be_adjusted() {
378 let mut builder = EventProcessorBuilder::<HttpConnector>::new();
379
380 assert!(!builder.all_attributes_private);
381 builder.all_attributes_private(true);
382 assert!(builder.all_attributes_private);
383 }
384
385 #[test]
386 fn attribte_names_can_be_adjusted() {
387 let mut builder = EventProcessorBuilder::<HttpConnector>::new();
388
389 assert!(builder.private_attributes.is_empty());
390 builder.private_attributes(hashset!["name"]);
391 assert!(builder.private_attributes.contains(&"name".into()));
392 }
393
394 #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
395 #[test_case(None, Matcher::Missing)]
396 fn processor_sends_correct_headers(tag: Option<String>, matcher: impl Into<Matcher>) {
397 let mut server = mockito::Server::new();
398 let mock = server
399 .mock("POST", "/bulk")
400 .with_status(200)
401 .expect_at_least(1)
402 .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
403 .create();
404
405 let service_endpoints = ServiceEndpointsBuilder::new()
406 .events_base_url(&server.url())
407 .polling_base_url(&server.url())
408 .streaming_base_url(&server.url())
409 .build()
410 .expect("Service endpoints failed to be created");
411
412 let builder = EventProcessorBuilder::<HttpConnector>::new();
413 let processor = builder
414 .build(&service_endpoints, "sdk-key", tag)
415 .expect("Processor failed to build");
416
417 let event_factory = EventFactory::new(false);
418
419 let context = ContextBuilder::new("bob")
420 .build()
421 .expect("Failed to create context");
422 let identify_event = event_factory.new_identify(context);
423
424 processor.send(identify_event);
425 processor.close();
426
427 mock.assert()
428 }
429}