Skip to main content

launchdarkly_server_sdk/events/
processor.rs

1use crossbeam_channel::{bounded, RecvTimeoutError, Sender};
2use std::sync::Once;
3use std::thread;
4use std::time::Duration;
5use thiserror::Error;
6
7use super::dispatcher::{EventDispatcher, EventDispatcherMessage};
8use super::event::InputEvent;
9use super::EventsConfiguration;
10
11#[non_exhaustive]
12#[derive(Debug, Error)]
13pub enum EventProcessorError {
14    #[error(transparent)]
15    SpawnFailed(#[from] std::io::Error),
16}
17
18/// Trait for the component that buffers analytics events and sends them to LaunchDarkly.
19/// This component can be replaced for testing purposes.
20pub trait EventProcessor: Send + Sync {
21    /// Records an InputEvent asynchronously. Depending on the feature flag properties and event
22    /// properties, this may be transmitted to the events service as an individual event, or may
23    /// only be added into summary data.
24    fn send(&self, event: InputEvent);
25
26    /// Specifies that any buffered events should be sent as soon as possible, rather than waiting
27    /// for the next flush interval. This method is asynchronous, so events still may not be sent
28    /// until a later time.
29    fn flush(&self);
30
31    /// Shuts down all event processor activity, after first ensuring that all events have been
32    /// delivered. Subsequent calls to [EventProcessor::send] or [EventProcessor::flush] will be
33    /// ignored.
34    fn close(&self);
35
36    /// Tells the event processor that all pending analytics events should be delivered as soon as
37    /// possible, and blocks until delivery is complete or the timeout expires.
38    ///
39    /// This method triggers a flush of events currently in the outbox and waits for that specific
40    /// flush to complete. Note that if periodic flushes or other flush operations are in-flight
41    /// when this is called, those may still be completing after this method returns.
42    ///
43    /// # Arguments
44    ///
45    /// * `timeout` - Maximum time to wait for flush to complete. Use `Duration::ZERO` to wait indefinitely.
46    ///
47    /// # Returns
48    ///
49    /// Returns `true` if flush completed successfully, `false` if timeout occurred or the event
50    /// processor has been shut down.
51    fn flush_blocking(&self, timeout: std::time::Duration) -> bool;
52}
53
/// An event processor that holds no state and performs no work.
///
/// The type is an empty struct, so it can be built with either [NullEventProcessor::new] or
/// `Default::default()`.
#[derive(Debug, Default)]
pub struct NullEventProcessor {}

impl NullEventProcessor {
    /// Creates a new no-op processor. Equivalent to `NullEventProcessor::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
61
62impl EventProcessor for NullEventProcessor {
63    fn send(&self, _: InputEvent) {}
64    fn flush(&self) {}
65    fn close(&self) {}
66    fn flush_blocking(&self, _timeout: std::time::Duration) -> bool {
67        true
68    }
69}
70
71pub struct EventProcessorImpl {
72    inbox_tx: Sender<EventDispatcherMessage>,
73    inbox_full_once: Once,
74}
75
76impl EventProcessorImpl {
77    pub fn new(events_configuration: EventsConfiguration) -> Result<Self, EventProcessorError> {
78        let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);
79        let dispatch_start = move || {
80            let mut dispatcher = EventDispatcher::new(events_configuration);
81            dispatcher.start(inbox_rx)
82        };
83
84        match thread::Builder::new().spawn(dispatch_start) {
85            Ok(_) => Ok(Self {
86                inbox_tx,
87                inbox_full_once: Once::new(),
88            }),
89            Err(e) => Err(EventProcessorError::SpawnFailed(e)),
90        }
91    }
92}
93
94impl EventProcessor for EventProcessorImpl {
95    fn send(&self, event: InputEvent) {
96        if self
97            .inbox_tx
98            .try_send(EventDispatcherMessage::EventMessage(event))
99            .is_err()
100        {
101            self.inbox_full_once.call_once(|| {
102                warn!("Events are being produced faster than they can be processed; some events will be dropped")
103            });
104        }
105    }
106
107    fn flush(&self) {
108        let _ = self.inbox_tx.try_send(EventDispatcherMessage::Flush);
109    }
110
111    fn close(&self) {
112        let (sender, receiver) = bounded::<()>(1);
113
114        if self.inbox_tx.send(EventDispatcherMessage::Flush).is_err() {
115            error!("Failed to send final flush message. Cannot stop event processor");
116            return;
117        }
118
119        if self
120            .inbox_tx
121            .send(EventDispatcherMessage::Close(sender))
122            .is_err()
123        {
124            error!("Failed to send close message. Cannot stop event processor");
125            return;
126        }
127
128        let _ = receiver.recv();
129    }
130
131    fn flush_blocking(&self, timeout: Duration) -> bool {
132        let (sender, receiver) = bounded::<()>(1);
133
134        if self
135            .inbox_tx
136            .send(EventDispatcherMessage::FlushWithReply(sender))
137            .is_err()
138        {
139            error!("Failed to send flush_blocking message");
140            return false;
141        }
142
143        if timeout == Duration::ZERO {
144            // Wait indefinitely
145            match receiver.recv() {
146                Ok(()) => true,
147                Err(_) => {
148                    error!("flush_blocking failed: event processor shut down");
149                    false
150                }
151            }
152        } else {
153            // Wait with timeout
154            match receiver.recv_timeout(timeout) {
155                Ok(()) => true,
156                Err(RecvTimeoutError::Timeout) => {
157                    warn!("flush_blocking timed out after {timeout:?}");
158                    false
159                }
160                Err(RecvTimeoutError::Disconnected) => {
161                    error!("flush_blocking failed: event processor shut down");
162                    false
163                }
164            }
165        }
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use std::time::Duration;
172
173    use launchdarkly_server_sdk_evaluation::{ContextBuilder, Detail, Flag, FlagValue, Reason};
174    use test_case::test_case;
175
176    use crate::{
177        events::{
178            create_event_sender, create_events_configuration,
179            event::{EventFactory, OutputEvent},
180        },
181        test_common::basic_flag,
182    };
183
184    use super::*;
185
186    // Helper to create a failing event sender for testing
187    struct FailingEventSender {
188        should_shutdown: bool,
189    }
190
191    impl FailingEventSender {
192        fn new(should_shutdown: bool) -> Self {
193            Self { should_shutdown }
194        }
195    }
196
197    impl crate::events::sender::EventSender for FailingEventSender {
198        fn send_event_data(
199            &self,
200            _events: Vec<OutputEvent>,
201            result_tx: crossbeam_channel::Sender<crate::events::sender::EventSenderResult>,
202            flush_signal: Option<crossbeam_channel::Sender<()>>,
203        ) -> futures::future::BoxFuture<'static, ()> {
204            let should_shutdown = self.should_shutdown;
205            Box::pin(async move {
206                // Simulate a failed HTTP send
207                let _ = result_tx.send(crate::events::sender::EventSenderResult {
208                    time_from_server: 0,
209                    success: false,
210                    must_shutdown: should_shutdown,
211                    flush_signal,
212                });
213            })
214        }
215    }
216
217    #[test]
218    fn calling_close_on_processor_twice_returns() {
219        let (event_sender, _) = create_event_sender();
220        let events_configuration =
221            create_events_configuration(event_sender, Duration::from_secs(100));
222        let event_processor =
223            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
224        event_processor.close();
225        event_processor.close();
226    }
227
228    #[test_case(true, vec!["index", "feature", "summary"])]
229    #[test_case(false, vec!["index", "summary"])]
230    fn sending_feature_event_emits_correct_events(
231        flag_track_events: bool,
232        expected_event_types: Vec<&str>,
233    ) {
234        let mut flag = basic_flag("flag");
235        flag.track_events = flag_track_events;
236        let context = ContextBuilder::new("foo")
237            .build()
238            .expect("Failed to create context");
239        let detail = Detail {
240            value: Some(FlagValue::from(false)),
241            variation_index: Some(1),
242            reason: Reason::Fallthrough {
243                in_experiment: false,
244            },
245        };
246
247        let event_factory = EventFactory::new(true);
248        let feature_request = event_factory.new_eval_event(
249            &flag.key,
250            context,
251            &flag,
252            detail,
253            FlagValue::from(false),
254            None,
255        );
256
257        let (event_sender, event_rx) = create_event_sender();
258        let events_configuration =
259            create_events_configuration(event_sender, Duration::from_secs(100));
260        let event_processor =
261            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
262        event_processor.send(feature_request);
263        event_processor.flush();
264        event_processor.close();
265
266        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
267        assert_eq!(expected_event_types.len(), events.len());
268
269        for event_type in expected_event_types {
270            assert!(events.iter().any(|e| e.kind() == event_type));
271        }
272    }
273
274    #[test]
275    fn sending_feature_event_with_rule_track_events_emits_feature_and_summary() {
276        let flag: Flag = serde_json::from_str(
277            r#"{
278                 "key": "with_rule",
279                 "on": true,
280                 "targets": [],
281                 "prerequisites": [],
282                 "rules": [
283                   {
284                     "id": "rule-0",
285                     "clauses": [{
286                       "attribute": "key",
287                       "negate": false,
288                       "op": "matches",
289                       "values": ["do-track"]
290                     }],
291                     "trackEvents": true,
292                     "variation": 1
293                   },
294                   {
295                     "id": "rule-1",
296                     "clauses": [{
297                       "attribute": "key",
298                       "negate": false,
299                       "op": "matches",
300                       "values": ["no-track"]
301                     }],
302                     "trackEvents": false,
303                     "variation": 1
304                   }
305                 ],
306                 "fallthrough": {"variation": 0},
307                 "trackEventsFallthrough": true,
308                 "offVariation": 0,
309                 "clientSideAvailability": {
310                   "usingMobileKey": false,
311                   "usingEnvironmentId": false
312                 },
313                 "salt": "kosher",
314                 "version": 2,
315                 "variations": [false, true]
316               }"#,
317        )
318        .expect("flag should parse");
319
320        let context_track_rule = ContextBuilder::new("do-track")
321            .build()
322            .expect("Failed to create context");
323        let context_notrack_rule = ContextBuilder::new("no-track")
324            .build()
325            .expect("Failed to create context");
326        let context_fallthrough = ContextBuilder::new("foo")
327            .build()
328            .expect("Failed to create context");
329
330        let detail_track_rule = Detail {
331            value: Some(FlagValue::from(true)),
332            variation_index: Some(1),
333            reason: Reason::RuleMatch {
334                rule_index: 0,
335                rule_id: "rule-0".into(),
336                in_experiment: false,
337            },
338        };
339        let detail_notrack_rule = Detail {
340            value: Some(FlagValue::from(true)),
341            variation_index: Some(1),
342            reason: Reason::RuleMatch {
343                rule_index: 1,
344                rule_id: "rule-1".into(),
345                in_experiment: false,
346            },
347        };
348        let detail_fallthrough = Detail {
349            value: Some(FlagValue::from(false)),
350            variation_index: Some(0),
351            reason: Reason::Fallthrough {
352                in_experiment: false,
353            },
354        };
355
356        let event_factory = EventFactory::new(true);
357        let fre_track_rule = event_factory.new_eval_event(
358            &flag.key,
359            context_track_rule,
360            &flag,
361            detail_track_rule,
362            FlagValue::from(false),
363            None,
364        );
365        let fre_notrack_rule = event_factory.new_eval_event(
366            &flag.key,
367            context_notrack_rule,
368            &flag,
369            detail_notrack_rule,
370            FlagValue::from(false),
371            None,
372        );
373        let fre_fallthrough = event_factory.new_eval_event(
374            &flag.key,
375            context_fallthrough,
376            &flag,
377            detail_fallthrough,
378            FlagValue::from(false),
379            None,
380        );
381
382        let (event_sender, event_rx) = create_event_sender();
383        let events_configuration =
384            create_events_configuration(event_sender, Duration::from_secs(100));
385        let event_processor =
386            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
387
388        for fre in [fre_track_rule, fre_notrack_rule, fre_fallthrough] {
389            event_processor.send(fre);
390        }
391
392        event_processor.flush();
393        event_processor.close();
394
395        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
396
397        // detail_track_rule -> feature + index, detail_notrack_rule -> index, detail_fallthrough -> feature + index, 1 summary
398        assert_eq!(events.len(), 2 + 1 + 2 + 1);
399        assert_eq!(
400            events
401                .iter()
402                .filter(|event| event.kind() == "feature")
403                .count(),
404            2
405        );
406        assert!(events.iter().any(|e| e.kind() == "index"));
407        assert!(events.iter().any(|e| e.kind() == "summary"));
408    }
409
410    #[test]
411    fn feature_events_dedupe_index_events() {
412        let flag = basic_flag("flag");
413        let context = ContextBuilder::new("bar")
414            .build()
415            .expect("Failed to create context");
416        let detail = Detail {
417            value: Some(FlagValue::from(false)),
418            variation_index: Some(1),
419            reason: Reason::Fallthrough {
420                in_experiment: false,
421            },
422        };
423
424        let event_factory = EventFactory::new(true);
425        let feature_request = event_factory.new_eval_event(
426            &flag.key,
427            context,
428            &flag,
429            detail,
430            FlagValue::from(false),
431            None,
432        );
433
434        let (event_sender, event_rx) = create_event_sender();
435        let events_configuration =
436            create_events_configuration(event_sender, Duration::from_secs(100));
437        let event_processor =
438            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
439        event_processor.send(feature_request.clone());
440        event_processor.send(feature_request);
441        event_processor.flush();
442        event_processor.close();
443
444        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
445
446        assert_eq!(events.len(), 2);
447
448        assert_eq!(
449            events
450                .iter()
451                .filter(|event| event.kind() == "index")
452                .count(),
453            1
454        );
455
456        assert_eq!(
457            events
458                .iter()
459                .filter(|event| event.kind() == "summary")
460                .count(),
461            1
462        );
463    }
464
465    #[test]
466    fn flush_blocking_completes_successfully() {
467        let (event_sender, event_rx) = create_event_sender();
468        let events_configuration =
469            create_events_configuration(event_sender, Duration::from_secs(100));
470        let event_processor =
471            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
472
473        let context = ContextBuilder::new("foo")
474            .build()
475            .expect("Failed to create context");
476        let event_factory = EventFactory::new(true);
477        event_processor.send(event_factory.new_identify(context));
478
479        let result = event_processor.flush_blocking(Duration::from_secs(5));
480        assert!(result, "flush_blocking should complete successfully");
481
482        event_processor.close();
483        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
484        assert_eq!(events.len(), 1);
485    }
486
487    #[test]
488    fn flush_blocking_with_very_short_timeout() {
489        let (event_sender, _) = create_event_sender();
490        let events_configuration =
491            create_events_configuration(event_sender, Duration::from_secs(100));
492        let event_processor =
493            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
494
495        let event_factory = EventFactory::new(true);
496
497        // Send many events to increase the chance of timeout
498        for i in 0..100 {
499            let ctx = ContextBuilder::new(format!("user-{i}"))
500                .build()
501                .expect("Failed to create context");
502            event_processor.send(event_factory.new_identify(ctx));
503        }
504
505        // Very short timeout may or may not complete - just verify it doesn't panic
506        let _result = event_processor.flush_blocking(Duration::from_nanos(1));
507
508        event_processor.close();
509    }
510
511    #[test]
512    fn flush_blocking_with_zero_timeout_waits() {
513        let (event_sender, event_rx) = create_event_sender();
514        let events_configuration =
515            create_events_configuration(event_sender, Duration::from_secs(100));
516        let event_processor =
517            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
518
519        let context = ContextBuilder::new("foo")
520            .build()
521            .expect("Failed to create context");
522        let event_factory = EventFactory::new(true);
523        event_processor.send(event_factory.new_identify(context));
524
525        let result = event_processor.flush_blocking(Duration::ZERO);
526        assert!(
527            result,
528            "flush_blocking with zero timeout should complete successfully"
529        );
530
531        event_processor.close();
532        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
533        assert_eq!(events.len(), 1);
534    }
535
536    #[test]
537    fn flush_blocking_with_no_events_completes_immediately() {
538        let (event_sender, _) = create_event_sender();
539        let events_configuration =
540            create_events_configuration(event_sender, Duration::from_secs(100));
541        let event_processor =
542            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
543
544        let result = event_processor.flush_blocking(Duration::from_secs(1));
545        assert!(
546            result,
547            "flush_blocking with no events should complete immediately"
548        );
549
550        event_processor.close();
551    }
552
553    #[test]
554    fn null_processor_flush_blocking_returns_true() {
555        let processor = NullEventProcessor::new();
556        assert!(processor.flush_blocking(Duration::from_secs(1)));
557        assert!(processor.flush_blocking(Duration::ZERO));
558    }
559
560    #[test]
561    fn flush_blocking_fails_when_processor_closed() {
562        let (event_sender, _) = create_event_sender();
563        let events_configuration =
564            create_events_configuration(event_sender, Duration::from_secs(100));
565        let event_processor =
566            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
567
568        event_processor.close();
569
570        let result = event_processor.flush_blocking(Duration::from_secs(1));
571        assert!(
572            !result,
573            "flush_blocking should fail when processor is closed"
574        );
575    }
576
577    #[test]
578    fn flush_blocking_completes_on_recoverable_http_failure() {
579        use std::collections::HashSet;
580        use std::num::NonZeroUsize;
581        use std::sync::Arc;
582
583        let event_sender = FailingEventSender::new(false);
584        let events_configuration = crate::events::EventsConfiguration {
585            capacity: 5,
586            event_sender: Arc::new(event_sender),
587            flush_interval: Duration::from_secs(100),
588            context_keys_capacity: NonZeroUsize::new(5).expect("5 > 0"),
589            context_keys_flush_interval: Duration::from_secs(100),
590            all_attributes_private: false,
591            private_attributes: HashSet::new(),
592            omit_anonymous_contexts: false,
593        };
594        let event_processor =
595            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
596
597        let context = ContextBuilder::new("foo")
598            .build()
599            .expect("Failed to create context");
600        let event_factory = EventFactory::new(true);
601        event_processor.send(event_factory.new_identify(context));
602
603        // Even though HTTP fails, flush_blocking should complete (not hang)
604        let result = event_processor.flush_blocking(Duration::from_secs(5));
605        assert!(
606            result,
607            "flush_blocking should complete even when HTTP send fails (recoverable)"
608        );
609
610        event_processor.close();
611    }
612
613    #[test]
614    fn flush_blocking_completes_on_unrecoverable_http_failure() {
615        use std::collections::HashSet;
616        use std::num::NonZeroUsize;
617        use std::sync::Arc;
618
619        let event_sender = FailingEventSender::new(true);
620        let events_configuration = crate::events::EventsConfiguration {
621            capacity: 5,
622            event_sender: Arc::new(event_sender),
623            flush_interval: Duration::from_secs(100),
624            context_keys_capacity: NonZeroUsize::new(5).expect("5 > 0"),
625            context_keys_flush_interval: Duration::from_secs(100),
626            all_attributes_private: false,
627            private_attributes: HashSet::new(),
628            omit_anonymous_contexts: false,
629        };
630        let event_processor =
631            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
632
633        let context = ContextBuilder::new("foo")
634            .build()
635            .expect("Failed to create context");
636        let event_factory = EventFactory::new(true);
637        event_processor.send(event_factory.new_identify(context));
638
639        // Even with must_shutdown=true, flush_blocking should complete (not hang)
640        let result = event_processor.flush_blocking(Duration::from_secs(5));
641        assert!(
642            result,
643            "flush_blocking should complete even when HTTP send fails (unrecoverable)"
644        );
645
646        event_processor.close();
647    }
648
649    #[test]
650    fn flush_blocking_with_multiple_events_and_http_failures() {
651        use std::collections::HashSet;
652        use std::num::NonZeroUsize;
653        use std::sync::Arc;
654
655        let event_sender = FailingEventSender::new(false);
656        let events_configuration = crate::events::EventsConfiguration {
657            capacity: 5,
658            event_sender: Arc::new(event_sender),
659            flush_interval: Duration::from_secs(100),
660            context_keys_capacity: NonZeroUsize::new(5).expect("5 > 0"),
661            context_keys_flush_interval: Duration::from_secs(100),
662            all_attributes_private: false,
663            private_attributes: HashSet::new(),
664            omit_anonymous_contexts: false,
665        };
666        let event_processor =
667            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
668
669        let event_factory = EventFactory::new(true);
670
671        // Send multiple events
672        for i in 0..10 {
673            let ctx = ContextBuilder::new(format!("user-{i}"))
674                .build()
675                .expect("Failed to create context");
676            event_processor.send(event_factory.new_identify(ctx));
677        }
678
679        // flush_blocking should complete even with multiple events and HTTP failures
680        let result = event_processor.flush_blocking(Duration::from_secs(5));
681        assert!(
682            result,
683            "flush_blocking should complete with multiple events despite HTTP failures"
684        );
685
686        event_processor.close();
687    }
688}