evident/
publisher.rs

1//! Contains the [`EvidentPublisher`] struct that is used to create public static publishers.
2//!
3//! Use the [`create_static_publisher`](crate::create_static_publisher) macro for a convenience wrapper to create a publisher.
4//!
5//! [req:pub]
6
7use std::{
8    collections::{HashMap, HashSet},
9    sync::{
10        atomic::{AtomicBool, AtomicUsize, Ordering},
11        mpsc::{self, SyncSender, TrySendError},
12        Arc, RwLock,
13    },
14    thread,
15};
16
17use crate::{
18    event::{entry::EventEntry, filter::Filter, intermediary::IntermediaryEvent, Event, Id, Msg},
19    subscription::{Subscription, SubscriptionError, SubscriptionSender},
20    this_origin,
21};
22
23/// Trait to implement for [`Id`], to control the publisher and all listeners.
24///
25/// [req:cap.ctrl]
26pub trait CaptureControl {
27    /// Returns `true` if the given [`Id`] is used to signal the start of event capturing.
28    ///
29    /// **Possible implementation:**
30    ///
31    /// ```ignore
32    /// id == &START_CAPTURING_ID
33    /// ```
34    ///
35    /// [req:cap.ctrl.start]
36    fn start(id: &Self) -> bool;
37
38    /// Returns the *start-ID*.
39    ///
40    /// [req:cap.ctrl.start]
41    fn start_id() -> Self;
42
43    /// Returns `true` if the given [`Id`] is used to signal the end of event capturing.
44    ///
45    /// **Possible implementation:**
46    ///
47    /// ```ignore
48    /// id == &STOP_CAPTURING_ID
49    /// ```
50    ///
51    /// [req:cap.ctrl.stop]
52    fn stop(id: &Self) -> bool;
53
54    /// Returns the *stop-ID*.
55    ///
56    /// [req:cap.ctrl.stop]
57    fn stop_id() -> Self;
58}
59
60/// Returns `true` if the given [`Id`] is used to control capturing.
61///
62/// [req:cap.ctrl]
63pub fn is_control_id(id: &impl CaptureControl) -> bool {
64    CaptureControl::stop(id) || CaptureControl::start(id)
65}
66
67/// Defines the capture mode for a publisher.
68#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69pub enum CaptureMode {
70    /// Event capturing may be blocking if the capture buffer is full.
71    Blocking,
72    /// Event capturing does not block, resulting in events **not** being captured if the capture buffer is full.
73    ///
74    /// You may inspect the number of missed events with the `get_missed_captures()` of the [`EvidentPublisher`].
75    NonBlocking,
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub enum EventTimestampKind {
80    /// Sets the event time, when the event is captured.
81    ///
82    /// **Note:** With this setting, event timestamps might show incorrect order in case of concurrent events, because events are buffered before capturing.
83    ///
84    /// **Note:** This has slightly better performance on the thread setting an event, because system time access is delayed to the capturing thread.
85    Captured,
86    /// Sets the event time, when the event is created.
87    ///
88    /// **Note:** This has slightly worse performance on the thread setting an event, because system time access most likely requires a context switch.
89    Created,
90}
91
92// Types below used for better clarity according to clippy.
93
94type Subscriber<K, M, T> = HashMap<crate::uuid::Uuid, SubscriptionSender<K, M, T>>;
95type IdSubscriber<K, M, T> = HashMap<K, Subscriber<K, M, T>>;
96type Capturer<K, M, T> = SyncSender<Event<K, M, T>>;
97
98/// An **EvidentPublisher** is used to capture, publish, and manage subscriptions.
99///
100/// [req:pub]
101pub struct EvidentPublisher<K, M, T, F>
102where
103    K: Id + CaptureControl,
104    M: Msg,
105    T: EventEntry<K, M>,
106    F: Filter<K, M>,
107{
108    /// The hashmap of subscribers listening to specific events.
109    ///
110    /// [req:subs.specific]
111    pub(crate) subscriptions: Arc<RwLock<IdSubscriber<K, M, T>>>,
112
113    /// The hashmap of subscribers listening to all events.
114    ///
115    /// [req:subs.all]
116    pub(crate) any_event: Arc<RwLock<Subscriber<K, M, T>>>,
117
118    /// The send-part of the capturing channel.
119    ///
120    /// [req:cap]
121    pub(crate) capturer: Capturer<K, M, T>,
122
123    /// Optional filter that is applied when capturing events.
124    ///
125    /// [req:cap.filter]
126    filter: Option<F>,
127
128    /// Flag to control if capturing is active or inactive.
129    ///
130    /// [req:cap.ctrl]
131    capturing: Arc<AtomicBool>,
132
133    /// Flag to control the capture mode.
134    capture_blocking: Arc<AtomicBool>,
135
136    /// Defines the size of the capturing send-buffer.
137    ///
138    /// [req:cap]
139    capture_channel_bound: usize,
140
141    /// Defines the size of each subscription send-buffer.
142    ///
143    /// [req:subs]
144    subscription_channel_bound: usize,
145
146    /// Number of missed captures in *non-blocking* capture mode.
147    missed_captures: Arc<AtomicUsize>,
148
149    /// Defines at what point the event-timestamp is created.
150    timestamp_kind: EventTimestampKind,
151}
152
153impl<K, M, T, F> EvidentPublisher<K, M, T, F>
154where
155    K: Id + CaptureControl,
156    M: Msg,
157    T: EventEntry<K, M>,
158    F: Filter<K, M>,
159{
160    /// Create a new [`EvidentPublisher`], and spawn a new event handler thread for events captured by the publisher.
161    ///
162    /// **Note:** You should use the macro [`create_static_publisher`](crate::create_static_publisher) instead.
163    ///
164    /// [req:pub]
165    fn create(
166        mut on_event: impl FnMut(Event<K, M, T>) + std::marker::Send + 'static,
167        filter: Option<F>,
168        capture_mode: CaptureMode,
169        capture_channel_bound: usize,
170        subscription_channel_bound: usize,
171        timestamp_kind: EventTimestampKind,
172    ) -> Self {
173        let (send, recv): (SyncSender<Event<K, M, T>>, _) =
174            mpsc::sync_channel(capture_channel_bound);
175
176        // [req:pub.threaded]
177        thread::spawn(move || {
178            while let Ok(mut event) = recv.recv() {
179                if timestamp_kind == EventTimestampKind::Captured {
180                    event.timestamp = Some(std::time::SystemTime::now());
181                }
182
183                on_event(event);
184            }
185        });
186
187        let mode = match capture_mode {
188            CaptureMode::Blocking => Arc::new(AtomicBool::new(true)),
189            CaptureMode::NonBlocking => Arc::new(AtomicBool::new(false)),
190        };
191
192        EvidentPublisher {
193            subscriptions: Arc::new(RwLock::new(HashMap::new())),
194            any_event: Arc::new(RwLock::new(HashMap::new())),
195            capturer: send,
196            filter,
197            // [req:cap.ctrl.init]
198            capturing: Arc::new(AtomicBool::new(true)),
199            capture_blocking: mode,
200            capture_channel_bound,
201            subscription_channel_bound,
202            missed_captures: Arc::new(AtomicUsize::new(0)),
203            timestamp_kind,
204        }
205    }
206
207    /// Create a new [`EvidentPublisher`] without an event filter.
208    ///
209    /// **Note:** You should use the macro [`create_static_publisher`](crate::create_static_publisher) instead.
210    ///
211    /// [req:pub]
212    pub fn new(
213        on_event: impl FnMut(Event<K, M, T>) + std::marker::Send + 'static,
214        capture_mode: CaptureMode,
215        capture_channel_bound: usize,
216        subscription_channel_bound: usize,
217        time_stamp_kind: EventTimestampKind,
218    ) -> Self {
219        Self::create(
220            on_event,
221            None,
222            capture_mode,
223            capture_channel_bound,
224            subscription_channel_bound,
225            time_stamp_kind,
226        )
227    }
228
229    /// Create a new [`EvidentPublisher`] with an event filter.
230    ///
231    /// **Note:** You should use the macro [`create_static_publisher`](crate::create_static_publisher) instead.
232    ///
233    /// [req:pub], [req:cap.filter]
234    pub fn with(
235        on_event: impl FnMut(Event<K, M, T>) + std::marker::Send + 'static,
236        filter: F,
237        capture_mode: CaptureMode,
238        capture_channel_bound: usize,
239        subscription_channel_bound: usize,
240        timestamp_kind: EventTimestampKind,
241    ) -> Self {
242        Self::create(
243            on_event,
244            Some(filter),
245            capture_mode,
246            capture_channel_bound,
247            subscription_channel_bound,
248            timestamp_kind,
249        )
250    }
251
252    /// Returns the event filter, or `None` if no filter is set.
253    ///
254    /// [req:cap.filter]
255    pub fn get_filter(&self) -> &Option<F> {
256        &self.filter
257    }
258
259    /// Returns `true` if the given event-entry passes the filter, or the event-ID is a control-ID.
260    ///
261    /// [req:cap.filter]
262    pub fn entry_allowed(&self, entry: &impl EventEntry<K, M>) -> bool {
263        if !is_control_id(entry.get_event_id()) {
264            if !self.capturing.load(Ordering::Acquire) {
265                return false;
266            }
267
268            if let Some(filter) = &self.filter {
269                if !filter.allow_entry(entry) {
270                    return false;
271                }
272            }
273        }
274
275        true
276    }
277
278    /// Captures an intermediary event, and sends the resulting event to the event handler.
279    ///
280    /// **Note:** This function should **not** be called manually, because it is automatically called on `drop()` of an intermediary event.
281    ///
282    /// [req:cap]
283    #[doc(hidden)]
284    pub fn _capture<I: IntermediaryEvent<K, M, T>>(&self, interm_event: &mut I) {
285        let entry = interm_event.take_entry();
286
287        // [req:cap.filter]
288        if !self.entry_allowed(&entry) {
289            return;
290        }
291
292        let mut event = Event::new(entry);
293        if self.timestamp_kind == EventTimestampKind::Created {
294            event.timestamp = Some(std::time::SystemTime::now());
295        }
296
297        if self.capture_blocking.load(Ordering::Acquire) {
298            let _ = self.capturer.send(event);
299        } else {
300            let res = self.capturer.try_send(event);
301
302            if let Err(TrySendError::Full(_)) = res {
303                // Note: If another thread has missed captures at the same moment, the count may be inaccurate, because there is no lock.
304                // This should still be fine, since
305                // - highly unlikely to happen during production with reasonable channel bounds and number of logs captured
306                // - count is still increased, and any increase in missed captures is bad (+/- one or two is irrelevant)
307                let missed_captures = self.missed_captures.load(Ordering::Relaxed);
308                if missed_captures < usize::MAX {
309                    self.missed_captures
310                        .store(missed_captures + 1, Ordering::Relaxed);
311                }
312            }
313        }
314    }
315
316    /// Returns the current capture mode.
317    pub fn get_capture_mode(&self) -> CaptureMode {
318        if self.capture_blocking.load(Ordering::Acquire) {
319            CaptureMode::Blocking
320        } else {
321            CaptureMode::NonBlocking
322        }
323    }
324
325    /// Allows to change the capture mode.
326    pub fn set_capture_mode(&self, mode: CaptureMode) {
327        match mode {
328            CaptureMode::Blocking => self.capture_blocking.store(true, Ordering::Release),
329            CaptureMode::NonBlocking => self.capture_blocking.store(false, Ordering::Release),
330        }
331    }
332
333    /// Returns the number of missed captures in *non-blocking* mode since last reset.
334    pub fn get_missed_captures(&self) -> usize {
335        self.missed_captures.load(Ordering::Relaxed)
336    }
337
338    /// Resets the number of missed captures in *non-blocking* mode.
339    pub fn reset_missed_captures(&self) {
340        self.missed_captures.store(0, Ordering::Relaxed);
341    }
342
343    /// Returns a subscription to events with the given event-ID,
344    /// or a [`SubscriptionError<K>`] if the subscription could not be created.
345    ///
346    /// [req:subs.specific.one]
347    pub fn subscribe(&self, id: K) -> Result<Subscription<K, M, T, F>, SubscriptionError<K>> {
348        self.subscribe_to_many(vec![id])
349    }
350
351    /// Returns a subscription to events with the given event-IDs,
352    /// or a [`SubscriptionError<K>`] if the subscription could not be created.
353    ///
354    /// [req:subs.specific.mult]
355    pub fn subscribe_to_many(
356        &self,
357        ids: Vec<K>,
358    ) -> Result<Subscription<K, M, T, F>, SubscriptionError<K>> {
359        // Note: Number of ids to listen to most likely affects the number of received events => number is added to channel bound
360        // Addition instead of multiplication, because even distribution accross events is highly unlikely.
361        let (sender, receiver) = mpsc::sync_channel(ids.len() + self.subscription_channel_bound);
362        let channel_id = crate::uuid::Uuid::new_v4();
363        let subscription_sender = SubscriptionSender { channel_id, sender };
364
365        match self.subscriptions.write().ok() {
366            Some(mut locked_subs) => {
367                for id in ids.clone() {
368                    let entry = locked_subs.entry(id.clone());
369                    entry
370                        .and_modify(|v| {
371                            v.insert(subscription_sender.channel_id, subscription_sender.clone());
372                        })
373                        .or_insert({
374                            let mut h = HashMap::new();
375                            h.insert(subscription_sender.channel_id, subscription_sender.clone());
376                            h
377                        });
378                }
379            }
380            None => {
381                return Err(SubscriptionError::CouldNotAccessPublisher);
382            }
383        }
384
385        Ok(Subscription {
386            channel_id,
387            receiver,
388            sub_to_all: false,
389            subscriptions: Some(HashSet::from_iter(ids)),
390            publisher: self,
391        })
392    }
393
394    /// Returns a subscription to all events,
395    /// or a [`SubscriptionError<K>`] if the subscription could not be created.
396    ///
397    /// [req:subs.all]
398    pub fn subscribe_to_all_events(
399        &self,
400    ) -> Result<Subscription<K, M, T, F>, SubscriptionError<K>> {
401        let (sender, receiver) = mpsc::sync_channel(self.capture_channel_bound);
402        let channel_id = crate::uuid::Uuid::new_v4();
403
404        match self.any_event.write().ok() {
405            Some(mut locked_vec) => {
406                locked_vec.insert(channel_id, SubscriptionSender { channel_id, sender });
407            }
408            None => {
409                return Err(SubscriptionError::CouldNotAccessPublisher);
410            }
411        }
412
413        Ok(Subscription {
414            channel_id,
415            receiver,
416            sub_to_all: true,
417            subscriptions: None,
418            publisher: self,
419        })
420    }
421
422    /// Returns `true` if capturing is *active*.
423    ///
424    /// [req:cap.ctrl.info]
425    pub fn is_capturing(&self) -> bool {
426        self.capturing.load(Ordering::Acquire)
427    }
428
429    /// Start capturing.
430    ///
431    /// **Note:** Capturing is already started initially, so this function is only needed after manually stopping capturing.
432    ///
433    /// [req:cap.ctrl.start]
434    pub fn start(&self) {
435        let empty_msg: Option<M> = None;
436        let start_event = Event::new(EventEntry::new(K::start_id(), empty_msg, this_origin!()));
437
438        let _ = self.capturer.send(start_event);
439
440        self.capturing.store(true, Ordering::Release);
441    }
442
443    /// Stop capturing.
444    ///
445    /// [req:cap.ctrl.stop]
446    pub fn stop(&self) {
447        let empty_msg: Option<M> = None;
448        let stop_event = Event::new(EventEntry::new(K::stop_id(), empty_msg, this_origin!()));
449
450        let _ = self.capturer.send(stop_event);
451
452        self.capturing.store(false, Ordering::Release);
453    }
454
455    /// Send the given event to all subscriber of the event.
456    ///
457    /// **Note:** This function should **not** be called manually, because it is already called in the event handler.
458    ///
459    /// [req:cap]
460    #[doc(hidden)]
461    pub fn on_event(&self, event: Event<K, M, T>) {
462        let arc_event = Arc::new(event);
463        let key = arc_event.entry.get_event_id();
464
465        let mut bad_subs: Vec<crate::uuid::Uuid> = Vec::new();
466        let mut bad_any_event: Vec<crate::uuid::Uuid> = Vec::new();
467
468        if let Ok(locked_subscriptions) = self.subscriptions.read() {
469            if let Some(sub_senders) = locked_subscriptions.get(key) {
470                for (channel_id, sub_sender) in sub_senders.iter() {
471                    let bad_channel = if self.capture_blocking.load(Ordering::Acquire) {
472                        sub_sender.sender.send(arc_event.clone()).is_err()
473                    } else {
474                        matches!(
475                            sub_sender.sender.try_send(arc_event.clone()),
476                            Err(TrySendError::Disconnected(_))
477                        )
478                    };
479
480                    if bad_channel {
481                        bad_subs.push(*channel_id);
482                    }
483                }
484            }
485        }
486
487        if let Ok(locked_vec) = self.any_event.read() {
488            for (channel_id, any_event_sender) in locked_vec.iter() {
489                let bad_channel = if self.capture_blocking.load(Ordering::Acquire) {
490                    any_event_sender.sender.send(arc_event.clone()).is_err()
491                } else {
492                    matches!(
493                        any_event_sender.sender.try_send(arc_event.clone()),
494                        Err(TrySendError::Disconnected(_))
495                    )
496                };
497
498                if bad_channel {
499                    bad_any_event.push(*channel_id);
500                }
501            }
502        }
503
504        // Remove dead channels
505        if !bad_subs.is_empty() {
506            if let Ok(mut locked_subscriptions) = self.subscriptions.write() {
507                let mut entry = locked_subscriptions.entry(key.clone());
508                for i in bad_subs {
509                    entry = entry.and_modify(|v| {
510                        v.remove(&i);
511                    });
512                }
513            }
514        }
515
516        if !bad_any_event.is_empty() {
517            if let Ok(mut locked_vec) = self.any_event.write() {
518                for i in bad_any_event {
519                    locked_vec.remove(&i);
520                }
521            }
522        }
523    }
524}