1use crossbeam_channel::{bounded, RecvTimeoutError, Sender};
2use std::sync::Once;
3use std::thread;
4use std::time::Duration;
5use thiserror::Error;
6
7use super::dispatcher::{EventDispatcher, EventDispatcherMessage};
8use super::event::InputEvent;
9use super::EventsConfiguration;
10
/// Errors that can occur while constructing an event processor.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum EventProcessorError {
    /// The background thread could not be spawned. The wrapped I/O error is
    /// reported transparently (its own message and source are used).
    #[error(transparent)]
    SpawnFailed(#[from] std::io::Error),
}
17
/// Interface for components that accept input events on behalf of the SDK.
///
/// Implementations must be `Send + Sync`, and every method takes `&self`, so a
/// processor can be shared between threads.
pub trait EventProcessor: Send + Sync {
    /// Hands `event` to the processor.
    ///
    /// The implementations in this file never block here; an event that cannot
    /// be accepted is dropped.
    fn send(&self, event: InputEvent);

    /// Requests a flush of pending events without waiting for it to finish.
    fn flush(&self);

    /// Shuts the processor down.
    fn close(&self);

    /// Requests a flush and waits for it to be acknowledged.
    ///
    /// Returns `true` if the flush was acknowledged and `false` otherwise (for
    /// example on timeout, or when the processor has already shut down).
    /// `EventProcessorImpl` treats a `timeout` of [`std::time::Duration::ZERO`]
    /// as "wait without a time limit".
    fn flush_blocking(&self, timeout: std::time::Duration) -> bool;
}
53
/// An event processor that holds no state.
///
/// It is used where events should simply be discarded: its `EventProcessor`
/// implementation consists entirely of no-ops.
#[derive(Debug, Default)]
pub struct NullEventProcessor {}

impl NullEventProcessor {
    /// Creates a new no-op processor.
    ///
    /// Equivalent to `NullEventProcessor::default()`.
    pub fn new() -> Self {
        Self {}
    }
}
61
62impl EventProcessor for NullEventProcessor {
63 fn send(&self, _: InputEvent) {}
64 fn flush(&self) {}
65 fn close(&self) {}
66 fn flush_blocking(&self, _timeout: std::time::Duration) -> bool {
67 true
68 }
69}
70
/// Event processor that forwards messages over a bounded channel to an
/// `EventDispatcher` running on a separately spawned thread.
pub struct EventProcessorImpl {
    /// Sending half of the bounded inbox consumed by the dispatcher thread.
    inbox_tx: Sender<EventDispatcherMessage>,
    /// Guards the "events are being dropped" warning so it is logged at most
    /// once per processor.
    inbox_full_once: Once,
}
75
76impl EventProcessorImpl {
77 pub fn new(events_configuration: EventsConfiguration) -> Result<Self, EventProcessorError> {
78 let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);
79 let dispatch_start = move || {
80 let mut dispatcher = EventDispatcher::new(events_configuration);
81 dispatcher.start(inbox_rx)
82 };
83
84 match thread::Builder::new().spawn(dispatch_start) {
85 Ok(_) => Ok(Self {
86 inbox_tx,
87 inbox_full_once: Once::new(),
88 }),
89 Err(e) => Err(EventProcessorError::SpawnFailed(e)),
90 }
91 }
92}
93
94impl EventProcessor for EventProcessorImpl {
95 fn send(&self, event: InputEvent) {
96 if self
97 .inbox_tx
98 .try_send(EventDispatcherMessage::EventMessage(event))
99 .is_err()
100 {
101 self.inbox_full_once.call_once(|| {
102 warn!("Events are being produced faster than they can be processed; some events will be dropped")
103 });
104 }
105 }
106
107 fn flush(&self) {
108 let _ = self.inbox_tx.try_send(EventDispatcherMessage::Flush);
109 }
110
111 fn close(&self) {
112 let (sender, receiver) = bounded::<()>(1);
113
114 if self.inbox_tx.send(EventDispatcherMessage::Flush).is_err() {
115 error!("Failed to send final flush message. Cannot stop event processor");
116 return;
117 }
118
119 if self
120 .inbox_tx
121 .send(EventDispatcherMessage::Close(sender))
122 .is_err()
123 {
124 error!("Failed to send close message. Cannot stop event processor");
125 return;
126 }
127
128 let _ = receiver.recv();
129 }
130
131 fn flush_blocking(&self, timeout: Duration) -> bool {
132 let (sender, receiver) = bounded::<()>(1);
133
134 if self
135 .inbox_tx
136 .send(EventDispatcherMessage::FlushWithReply(sender))
137 .is_err()
138 {
139 error!("Failed to send flush_blocking message");
140 return false;
141 }
142
143 if timeout == Duration::ZERO {
144 match receiver.recv() {
146 Ok(()) => true,
147 Err(_) => {
148 error!("flush_blocking failed: event processor shut down");
149 false
150 }
151 }
152 } else {
153 match receiver.recv_timeout(timeout) {
155 Ok(()) => true,
156 Err(RecvTimeoutError::Timeout) => {
157 warn!("flush_blocking timed out after {timeout:?}");
158 false
159 }
160 Err(RecvTimeoutError::Disconnected) => {
161 error!("flush_blocking failed: event processor shut down");
162 false
163 }
164 }
165 }
166 }
167}
168
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use launchdarkly_server_sdk_evaluation::{ContextBuilder, Detail, Flag, FlagValue, Reason};
    use test_case::test_case;

    use crate::{
        events::{
            create_event_sender, create_events_configuration,
            event::{EventFactory, OutputEvent},
        },
        test_common::basic_flag,
    };

    use super::*;

    /// Event sender that never delivers anything: every call reports a failed
    /// send, optionally flagged as requiring shutdown.
    struct FailingEventSender {
        /// Value reported as `must_shutdown` in every result.
        should_shutdown: bool,
    }

    impl FailingEventSender {
        fn new(should_shutdown: bool) -> Self {
            Self { should_shutdown }
        }
    }

    impl crate::events::sender::EventSender for FailingEventSender {
        fn send_event_data(
            &self,
            _events: Vec<OutputEvent>,
            result_tx: crossbeam_channel::Sender<crate::events::sender::EventSenderResult>,
            flush_signal: Option<crossbeam_channel::Sender<()>>,
        ) -> futures::future::BoxFuture<'static, ()> {
            let should_shutdown = self.should_shutdown;
            Box::pin(async move {
                // Report failure and hand the flush signal back with the result.
                let _ = result_tx.send(crate::events::sender::EventSenderResult {
                    time_from_server: 0,
                    success: false,
                    must_shutdown: should_shutdown,
                    flush_signal,
                });
            })
        }
    }

    /// Starts a processor backed by a [`FailingEventSender`].
    ///
    /// Shared by the HTTP-failure tests so the configuration literal is written
    /// only once.
    fn start_failing_processor(should_shutdown: bool) -> EventProcessorImpl {
        use std::collections::HashSet;
        use std::num::NonZeroUsize;
        use std::sync::Arc;

        let event_sender = FailingEventSender::new(should_shutdown);
        let events_configuration = crate::events::EventsConfiguration {
            capacity: 5,
            event_sender: Arc::new(event_sender),
            flush_interval: Duration::from_secs(100),
            context_keys_capacity: NonZeroUsize::new(5).expect("5 > 0"),
            context_keys_flush_interval: Duration::from_secs(100),
            all_attributes_private: false,
            private_attributes: HashSet::new(),
            omit_anonymous_contexts: false,
        };

        EventProcessorImpl::new(events_configuration).expect("failed to start ep")
    }

    #[test]
    fn calling_close_on_processor_twice_returns() {
        let (event_sender, _) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
        // The second call must return rather than hang.
        event_processor.close();
        event_processor.close();
    }

    #[test_case(true, vec!["index", "feature", "summary"])]
    #[test_case(false, vec!["index", "summary"])]
    fn sending_feature_event_emits_correct_events(
        flag_track_events: bool,
        expected_event_types: Vec<&str>,
    ) {
        let mut flag = basic_flag("flag");
        flag.track_events = flag_track_events;
        let context = ContextBuilder::new("foo")
            .build()
            .expect("Failed to create context");
        let detail = Detail {
            value: Some(FlagValue::from(false)),
            variation_index: Some(1),
            reason: Reason::Fallthrough {
                in_experiment: false,
            },
        };

        let event_factory = EventFactory::new(true);
        let feature_request = event_factory.new_eval_event(
            &flag.key,
            context,
            &flag,
            detail,
            FlagValue::from(false),
            None,
        );

        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
        event_processor.send(feature_request);
        event_processor.flush();
        event_processor.close();

        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
        assert_eq!(expected_event_types.len(), events.len());

        for event_type in expected_event_types {
            assert!(events.iter().any(|e| e.kind() == event_type));
        }
    }

    #[test]
    fn sending_feature_event_with_rule_track_events_emits_feature_and_summary() {
        let flag: Flag = serde_json::from_str(
            r#"{
                "key": "with_rule",
                "on": true,
                "targets": [],
                "prerequisites": [],
                "rules": [
                    {
                        "id": "rule-0",
                        "clauses": [{
                            "attribute": "key",
                            "negate": false,
                            "op": "matches",
                            "values": ["do-track"]
                        }],
                        "trackEvents": true,
                        "variation": 1
                    },
                    {
                        "id": "rule-1",
                        "clauses": [{
                            "attribute": "key",
                            "negate": false,
                            "op": "matches",
                            "values": ["no-track"]
                        }],
                        "trackEvents": false,
                        "variation": 1
                    }
                ],
                "fallthrough": {"variation": 0},
                "trackEventsFallthrough": true,
                "offVariation": 0,
                "clientSideAvailability": {
                    "usingMobileKey": false,
                    "usingEnvironmentId": false
                },
                "salt": "kosher",
                "version": 2,
                "variations": [false, true]
            }"#,
        )
        .expect("flag should parse");

        let context_track_rule = ContextBuilder::new("do-track")
            .build()
            .expect("Failed to create context");
        let context_notrack_rule = ContextBuilder::new("no-track")
            .build()
            .expect("Failed to create context");
        let context_fallthrough = ContextBuilder::new("foo")
            .build()
            .expect("Failed to create context");

        let detail_track_rule = Detail {
            value: Some(FlagValue::from(true)),
            variation_index: Some(1),
            reason: Reason::RuleMatch {
                rule_index: 0,
                rule_id: "rule-0".into(),
                in_experiment: false,
            },
        };
        let detail_notrack_rule = Detail {
            value: Some(FlagValue::from(true)),
            variation_index: Some(1),
            reason: Reason::RuleMatch {
                rule_index: 1,
                rule_id: "rule-1".into(),
                in_experiment: false,
            },
        };
        let detail_fallthrough = Detail {
            value: Some(FlagValue::from(false)),
            variation_index: Some(0),
            reason: Reason::Fallthrough {
                in_experiment: false,
            },
        };

        let event_factory = EventFactory::new(true);
        let fre_track_rule = event_factory.new_eval_event(
            &flag.key,
            context_track_rule,
            &flag,
            detail_track_rule,
            FlagValue::from(false),
            None,
        );
        let fre_notrack_rule = event_factory.new_eval_event(
            &flag.key,
            context_notrack_rule,
            &flag,
            detail_notrack_rule,
            FlagValue::from(false),
            None,
        );
        let fre_fallthrough = event_factory.new_eval_event(
            &flag.key,
            context_fallthrough,
            &flag,
            detail_fallthrough,
            FlagValue::from(false),
            None,
        );

        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");

        for fre in [fre_track_rule, fre_notrack_rule, fre_fallthrough] {
            event_processor.send(fre);
        }

        event_processor.flush();
        event_processor.close();

        let events = event_rx.iter().collect::<Vec<OutputEvent>>();

        // Presumably: tracked rule -> index + feature (2), untracked rule ->
        // index only (1), tracked fallthrough -> index + feature (2), plus one
        // summary (1). TODO confirm the per-context breakdown.
        assert_eq!(events.len(), 2 + 1 + 2 + 1);
        assert_eq!(
            events
                .iter()
                .filter(|event| event.kind() == "feature")
                .count(),
            2
        );
        assert!(events.iter().any(|e| e.kind() == "index"));
        assert!(events.iter().any(|e| e.kind() == "summary"));
    }

    #[test]
    fn feature_events_dedupe_index_events() {
        let flag = basic_flag("flag");
        let context = ContextBuilder::new("bar")
            .build()
            .expect("Failed to create context");
        let detail = Detail {
            value: Some(FlagValue::from(false)),
            variation_index: Some(1),
            reason: Reason::Fallthrough {
                in_experiment: false,
            },
        };

        let event_factory = EventFactory::new(true);
        let feature_request = event_factory.new_eval_event(
            &flag.key,
            context,
            &flag,
            detail,
            FlagValue::from(false),
            None,
        );

        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
        // The same context is sent twice; only one index event is expected.
        event_processor.send(feature_request.clone());
        event_processor.send(feature_request);
        event_processor.flush();
        event_processor.close();

        let events = event_rx.iter().collect::<Vec<OutputEvent>>();

        assert_eq!(events.len(), 2);

        assert_eq!(
            events
                .iter()
                .filter(|event| event.kind() == "index")
                .count(),
            1
        );

        assert_eq!(
            events
                .iter()
                .filter(|event| event.kind() == "summary")
                .count(),
            1
        );
    }

    #[test]
    fn flush_blocking_completes_successfully() {
        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");

        let context = ContextBuilder::new("foo")
            .build()
            .expect("Failed to create context");
        let event_factory = EventFactory::new(true);
        event_processor.send(event_factory.new_identify(context));

        let result = event_processor.flush_blocking(Duration::from_secs(5));
        assert!(result, "flush_blocking should complete successfully");

        event_processor.close();
        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn flush_blocking_with_very_short_timeout() {
        let (event_sender, _) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");

        let event_factory = EventFactory::new(true);

        // Give the processor plenty to do before flushing.
        for i in 0..100 {
            let ctx = ContextBuilder::new(format!("user-{i}"))
                .build()
                .expect("Failed to create context");
            event_processor.send(event_factory.new_identify(ctx));
        }

        // The outcome depends on timing, so it is deliberately not asserted;
        // the test only checks that the call returns and close still works.
        let _result = event_processor.flush_blocking(Duration::from_nanos(1));

        event_processor.close();
    }

    #[test]
    fn flush_blocking_with_zero_timeout_waits() {
        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");

        let context = ContextBuilder::new("foo")
            .build()
            .expect("Failed to create context");
        let event_factory = EventFactory::new(true);
        event_processor.send(event_factory.new_identify(context));

        // A zero timeout means "wait without a time limit".
        let result = event_processor.flush_blocking(Duration::ZERO);
        assert!(
            result,
            "flush_blocking with zero timeout should complete successfully"
        );

        event_processor.close();
        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn flush_blocking_with_no_events_completes_immediately() {
        let (event_sender, _) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");

        let result = event_processor.flush_blocking(Duration::from_secs(1));
        assert!(
            result,
            "flush_blocking with no events should complete immediately"
        );

        event_processor.close();
    }

    #[test]
    fn null_processor_flush_blocking_returns_true() {
        let processor = NullEventProcessor::new();
        assert!(processor.flush_blocking(Duration::from_secs(1)));
        assert!(processor.flush_blocking(Duration::ZERO));
    }

    #[test]
    fn flush_blocking_fails_when_processor_closed() {
        let (event_sender, _) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let event_processor =
            EventProcessorImpl::new(events_configuration).expect("failed to start ep");

        event_processor.close();

        let result = event_processor.flush_blocking(Duration::from_secs(1));
        assert!(
            !result,
            "flush_blocking should fail when processor is closed"
        );
    }

    #[test]
    fn flush_blocking_completes_on_recoverable_http_failure() {
        let event_processor = start_failing_processor(false);

        let context = ContextBuilder::new("foo")
            .build()
            .expect("Failed to create context");
        let event_factory = EventFactory::new(true);
        event_processor.send(event_factory.new_identify(context));

        let result = event_processor.flush_blocking(Duration::from_secs(5));
        assert!(
            result,
            "flush_blocking should complete even when HTTP send fails (recoverable)"
        );

        event_processor.close();
    }

    #[test]
    fn flush_blocking_completes_on_unrecoverable_http_failure() {
        let event_processor = start_failing_processor(true);

        let context = ContextBuilder::new("foo")
            .build()
            .expect("Failed to create context");
        let event_factory = EventFactory::new(true);
        event_processor.send(event_factory.new_identify(context));

        let result = event_processor.flush_blocking(Duration::from_secs(5));
        assert!(
            result,
            "flush_blocking should complete even when HTTP send fails (unrecoverable)"
        );

        event_processor.close();
    }

    #[test]
    fn flush_blocking_with_multiple_events_and_http_failures() {
        let event_processor = start_failing_processor(false);

        let event_factory = EventFactory::new(true);

        // More events than the inbox capacity of 5; `send` never blocks.
        for i in 0..10 {
            let ctx = ContextBuilder::new(format!("user-{i}"))
                .build()
                .expect("Failed to create context");
            event_processor.send(event_factory.new_identify(ctx));
        }

        let result = event_processor.flush_blocking(Duration::from_secs(5));
        assert!(
            result,
            "flush_blocking should complete with multiple events despite HTTP failures"
        );

        event_processor.close();
    }
}