emit_core/
emitter.rs

/*!
The [`Emitter`] type.

Emitters are the receivers of diagnostic data in the form of [`Event`]s. A typical emitter will translate and forward those events to some outside observer. That could be the console, rolling files, a remote observability system, or anything else.

Emitters are _non-blocking_ and _asynchronous_. Emitted diagnostics are not guaranteed to have been fully processed until a call to [`Emitter::blocking_flush`].
*/
8
9use core::time::Duration;
10
11use crate::{
12    and::And,
13    empty::Empty,
14    event::{Event, ToEvent},
15    props::ErasedProps,
16};
17
18/**
19An asynchronous destination for diagnostic data.
20
21Once [`Event`]s are emitted through [`Emitter::emit`], a call to [`Emitter::blocking_flush`] must be made to ensure they're fully processed. This should be done once before the emitter is disposed, but may be more frequent for auditing.
22*/
23pub trait Emitter {
24    /**
25    Emit an [`Event`].
26    */
27    fn emit<E: ToEvent>(&self, evt: E);
28
29    /**
30    Block for up to `timeout`, waiting for all diagnostic data emitted up to this point to be fully processed.
31
32    This method returns `true` if the flush completed, and `false` if it timed out.
33
34    If an emitter doesn't need to flush, this method should immediately return `true`. If an emitted doesn't support flushing, this method should immediately return `false`.
35    */
36    fn blocking_flush(&self, timeout: Duration) -> bool;
37
38    /**
39    Emit events to both `self` and `other`.
40    */
41    fn and_to<U>(self, other: U) -> And<Self, U>
42    where
43        Self: Sized,
44    {
45        And::new(self, other)
46    }
47
48    /**
49    Wrap the emitter, transforming or filtering [`Event`]s before it receives them.
50
51    Flushing defers to the wrapped emitter.
52    */
53    fn wrap_emitter<W: wrapping::Wrapping>(self, wrapping: W) -> Wrap<Self, W>
54    where
55        Self: Sized,
56    {
57        Wrap {
58            emitter: self,
59            wrapping,
60        }
61    }
62}
63
64impl<'a, T: Emitter + ?Sized> Emitter for &'a T {
65    fn emit<E: ToEvent>(&self, evt: E) {
66        (**self).emit(evt)
67    }
68
69    fn blocking_flush(&self, timeout: Duration) -> bool {
70        (**self).blocking_flush(timeout)
71    }
72}
73
// A boxed emitter forwards both methods to the value it owns.
#[cfg(feature = "alloc")]
impl<'a, T: Emitter + ?Sized + 'a> Emitter for alloc::boxed::Box<T> {
    fn emit<E: ToEvent>(&self, evt: E) {
        let inner: &T = self;

        T::emit(inner, evt)
    }

    fn blocking_flush(&self, timeout: Duration) -> bool {
        let inner: &T = self;

        T::blocking_flush(inner, timeout)
    }
}
84
// A reference-counted emitter forwards both methods to the shared value.
#[cfg(feature = "alloc")]
impl<'a, T: Emitter + ?Sized + 'a> Emitter for alloc::sync::Arc<T> {
    fn emit<E: ToEvent>(&self, evt: E) {
        let inner: &T = self;

        T::emit(inner, evt)
    }

    fn blocking_flush(&self, timeout: Duration) -> bool {
        let inner: &T = self;

        T::blocking_flush(inner, timeout)
    }
}
95
96impl<T: Emitter> Emitter for Option<T> {
97    fn emit<E: ToEvent>(&self, evt: E) {
98        match self {
99            Some(target) => target.emit(evt),
100            None => Empty.emit(evt),
101        }
102    }
103
104    fn blocking_flush(&self, timeout: Duration) -> bool {
105        match self {
106            Some(target) => target.blocking_flush(timeout),
107            None => Empty.blocking_flush(timeout),
108        }
109    }
110}
111
112impl Emitter for Empty {
113    fn emit<E: ToEvent>(&self, _: E) {}
114
115    fn blocking_flush(&self, _: Duration) -> bool {
116        true
117    }
118}
119
120impl Emitter for fn(Event<&dyn ErasedProps>) {
121    fn emit<E: ToEvent>(&self, evt: E) {
122        (self)(evt.to_event().erase())
123    }
124
125    fn blocking_flush(&self, _: Duration) -> bool {
126        true
127    }
128}
129
130/**
131An [`Emitter`] from a function.
132
133This type can be created directly, or via [`from_fn`].
134*/
135pub struct FromFn<F = fn(Event<&dyn ErasedProps>)>(F);
136
137impl<F> FromFn<F> {
138    /**
139    Wrap the given emitter function.
140    */
141    pub const fn new(emitter: F) -> FromFn<F> {
142        FromFn(emitter)
143    }
144}
145
146impl<F: Fn(Event<&dyn ErasedProps>)> Emitter for FromFn<F> {
147    fn emit<E: ToEvent>(&self, evt: E) {
148        (self.0)(evt.to_event().erase())
149    }
150
151    fn blocking_flush(&self, _: Duration) -> bool {
152        true
153    }
154}
155
156/**
157Create an [`Emitter`] from a function.
158
159The input function is assumed not to perform any background work that needs flushing.
160*/
161pub const fn from_fn<F: Fn(Event<&dyn ErasedProps>)>(f: F) -> FromFn<F> {
162    FromFn::new(f)
163}
164
165impl<T: Emitter, U: Emitter> Emitter for And<T, U> {
166    fn emit<E: ToEvent>(&self, evt: E) {
167        let evt = evt.to_event();
168
169        self.left().emit(&evt);
170        self.right().emit(&evt);
171    }
172
173    fn blocking_flush(&self, timeout: Duration) -> bool {
174        // Approximate; give each target an equal
175        // time to flush. With a monotonic clock
176        // we could measure the time each takes
177        // to flush and track in our timeout
178        let timeout = timeout / 2;
179
180        let lhs = self.left().blocking_flush(timeout);
181        let rhs = self.right().blocking_flush(timeout);
182
183        lhs && rhs
184    }
185}
186
/**
An [`Emitter`] whose events pass through a transformation or filter before being forwarded.

Values of this type are produced by [`Emitter::wrap_emitter`].
*/
pub struct Wrap<E, W> {
    emitter: E,
    wrapping: W,
}

impl<E, W> Wrap<E, W> {
    /**
    Borrow the underlying [`Emitter`].
    */
    pub const fn emitter(&self) -> &E {
        let Wrap { emitter, .. } = self;

        emitter
    }

    /**
    Borrow the underlying [`wrapping::Wrapping`].
    */
    pub const fn wrapping(&self) -> &W {
        let Wrap { wrapping, .. } = self;

        wrapping
    }
}
212
213impl<E: Emitter, W: wrapping::Wrapping> Emitter for Wrap<E, W> {
214    fn emit<T: ToEvent>(&self, evt: T) {
215        self.wrapping.wrap(&self.emitter, evt.to_event())
216    }
217
218    fn blocking_flush(&self, timeout: Duration) -> bool {
219        self.emitter.blocking_flush(timeout)
220    }
221}
222
223/**
224Wrap an [`Emitter`] in a [`wrapping::Wrapping`], transforming or filtering [`Event`]s before it receives them.
225
226Flushing defers to the wrapped emitter.
227*/
228pub fn wrap<E: Emitter, W: wrapping::Wrapping>(emitter: E, wrapping: W) -> Wrap<E, W> {
229    emitter.wrap_emitter(wrapping)
230}
231
232pub mod wrapping {
233    /*!
234    The [`Wrapping`] type.
235
236    This module defines a middleware API for [`Emitter`]s. An [`Emitter`] can be wrapped through [`Emitter::wrap_emitter`] in a [`Wrapping`] that can manipulate an [`Event`] before forwarding it to the wrapped emitter.
237    */
238
239    use super::*;
240
241    use crate::filter::Filter;
242
243    /**
244    A transformation or filter applied to an [`Event`] before emitting it through an [`Emitter`].
245    */
246    pub trait Wrapping {
247        /**
248        Wrap the given emitter.
249        */
250        fn wrap<O: Emitter, E: ToEvent>(&self, output: O, evt: E);
251    }
252
253    impl<'a, T: Wrapping + ?Sized> Wrapping for &'a T {
254        fn wrap<O: Emitter, E: ToEvent>(&self, output: O, evt: E) {
255            (**self).wrap(output, evt)
256        }
257    }
258
259    /**
260    A [`Wrapping`] from a function.
261
262    This type can be created directly, or via [`from_fn`].
263    */
264    pub struct FromFn<F = fn(&dyn ErasedEmitter, Event<&dyn ErasedProps>)>(F);
265
266    impl<F> FromFn<F> {
267        /**
268        Wrap the given function.
269        */
270        pub const fn new(wrapping: F) -> FromFn<F> {
271            FromFn(wrapping)
272        }
273    }
274
275    impl<F: Fn(&dyn ErasedEmitter, Event<&dyn ErasedProps>)> Wrapping for FromFn<F> {
276        fn wrap<O: Emitter, E: ToEvent>(&self, output: O, evt: E) {
277            (self.0)(&output, evt.to_event().erase())
278        }
279    }
280
281    /**
282    Create a [`Wrapping`] from a function.
283    */
284    pub const fn from_fn<F: Fn(&dyn ErasedEmitter, Event<&dyn ErasedProps>)>(f: F) -> FromFn<F> {
285        FromFn::new(f)
286    }
287
288    /**
289    A [`Wrapping`] from a filter.
290
291    The filter will be applied to incoming [`Event`]s and only passed to the output [`Emitter`] when they match.
292    */
293    pub struct FromFilter<F>(F);
294
295    impl<F> FromFilter<F> {
296        /**
297        Wrap the given filter.
298        */
299        pub const fn new(filter: F) -> FromFilter<F> {
300            FromFilter(filter)
301        }
302    }
303
304    impl<F: Filter> Wrapping for FromFilter<F> {
305        fn wrap<O: Emitter, E: ToEvent>(&self, output: O, evt: E) {
306            if self.0.matches(&evt) {
307                output.emit(evt);
308            }
309        }
310    }
311
312    /**
313    Create a [`Wrapping`] from a filter.
314    */
315    pub fn from_filter<F: Filter>(filter: F) -> FromFilter<F> {
316        FromFilter::new(filter)
317    }
318
319    mod internal {
320        use crate::{emitter::ErasedEmitter, event::Event, props::ErasedProps};
321
322        pub trait DispatchWrapping {
323            fn dispatch_wrap(&self, emitter: &dyn ErasedEmitter, evt: Event<&dyn ErasedProps>);
324        }
325
326        pub trait SealedWrapping {
327            fn erase_wrapping(&self) -> crate::internal::Erased<&dyn DispatchWrapping>;
328        }
329    }
330
331    /**
332    An object-safe [`Wrapping`].
333
334    A `dyn ErasedWrapping` can be treated as `impl Wrapping`.
335    */
336    pub trait ErasedWrapping: internal::SealedWrapping {}
337
338    impl<T: Wrapping> ErasedWrapping for T {}
339
340    impl<T: Wrapping> internal::SealedWrapping for T {
341        fn erase_wrapping(&self) -> crate::internal::Erased<&dyn internal::DispatchWrapping> {
342            crate::internal::Erased(self)
343        }
344    }
345
346    impl<T: Wrapping> internal::DispatchWrapping for T {
347        fn dispatch_wrap(&self, emitter: &dyn ErasedEmitter, evt: Event<&dyn ErasedProps>) {
348            self.wrap(emitter, evt)
349        }
350    }
351
352    impl<'a> Wrapping for dyn ErasedWrapping + 'a {
353        fn wrap<O: Emitter, E: ToEvent>(&self, output: O, evt: E) {
354            self.erase_wrapping()
355                .0
356                .dispatch_wrap(&output, evt.to_event().erase())
357        }
358    }
359
360    impl<'a> Wrapping for dyn ErasedWrapping + Send + Sync + 'a {
361        fn wrap<O: Emitter, E: ToEvent>(&self, output: O, evt: E) {
362            (self as &(dyn ErasedWrapping + 'a)).wrap(output, evt)
363        }
364    }
365
366    #[cfg(test)]
367    mod tests {
368        use super::*;
369
370        use crate::{path::Path, template::Template};
371
372        use std::cell::Cell;
373
374        struct MyWrapping(Cell<usize>);
375
376        impl Wrapping for MyWrapping {
377            fn wrap<O: Emitter, E: ToEvent>(&self, _: O, _: E) {
378                self.0.set(self.0.get() + 1);
379            }
380        }
381
382        #[test]
383        fn erased_wrapping() {
384            let wrapping = MyWrapping(Cell::new(0));
385
386            {
387                let wrapping = &wrapping as &dyn ErasedWrapping;
388
389                wrapping.wrap(
390                    Empty,
391                    Event::new(Path::new_raw("a"), Template::literal("test"), Empty, Empty),
392                );
393            }
394
395            assert_eq!(1, wrapping.0.get());
396        }
397
398        #[test]
399        fn from_fn_wrapping() {
400            let calls = Cell::new(0);
401
402            let wrapping = from_fn(|_, _| {
403                calls.set(calls.get() + 1);
404            });
405
406            wrapping.wrap(
407                Empty,
408                Event::new(Path::new_raw("a"), Template::literal("test"), Empty, Empty),
409            );
410
411            assert_eq!(1, calls.get());
412        }
413    }
414}
415
416mod internal {
417    use core::time::Duration;
418
419    use crate::{event::Event, props::ErasedProps};
420
421    pub trait DispatchEmitter {
422        fn dispatch_emit(&self, evt: &Event<&dyn ErasedProps>);
423        fn dispatch_blocking_flush(&self, timeout: Duration) -> bool;
424    }
425
426    pub trait SealedEmitter {
427        fn erase_emitter(&self) -> crate::internal::Erased<&dyn DispatchEmitter>;
428    }
429}
430
431/**
432An object-safe [`Emitter`].
433
434A `dyn ErasedEmitter` can be treated as `impl Emitter`.
435*/
436pub trait ErasedEmitter: internal::SealedEmitter {}
437
438impl<T: Emitter> ErasedEmitter for T {}
439
440impl<T: Emitter> internal::SealedEmitter for T {
441    fn erase_emitter(&self) -> crate::internal::Erased<&dyn internal::DispatchEmitter> {
442        crate::internal::Erased(self)
443    }
444}
445
446impl<T: Emitter> internal::DispatchEmitter for T {
447    fn dispatch_emit(&self, evt: &Event<&dyn ErasedProps>) {
448        self.emit(evt)
449    }
450
451    fn dispatch_blocking_flush(&self, timeout: Duration) -> bool {
452        self.blocking_flush(timeout)
453    }
454}
455
456impl<'a> Emitter for dyn ErasedEmitter + 'a {
457    fn emit<E: ToEvent>(&self, evt: E) {
458        self.erase_emitter()
459            .0
460            .dispatch_emit(&evt.to_event().erase())
461    }
462
463    fn blocking_flush(&self, timeout: Duration) -> bool {
464        self.erase_emitter().0.dispatch_blocking_flush(timeout)
465    }
466}
467
468impl<'a> Emitter for dyn ErasedEmitter + Send + Sync + 'a {
469    fn emit<E: ToEvent>(&self, evt: E) {
470        (self as &(dyn ErasedEmitter + 'a)).emit(evt)
471    }
472
473    fn blocking_flush(&self, timeout: Duration) -> bool {
474        (self as &(dyn ErasedEmitter + 'a)).blocking_flush(timeout)
475    }
476}
477
#[cfg(test)]
mod tests {
    use crate::{path::Path, props::Props, template::Template};

    use super::*;

    use std::{cell::Cell, sync::Mutex};

    // Buffers rendered messages in `pending` until a flush moves them into
    // `emitted`.
    struct MyEmitter {
        pending: Mutex<Vec<String>>,
        emitted: Mutex<Vec<String>>,
    }

    impl MyEmitter {
        fn new() -> Self {
            Self {
                pending: Mutex::default(),
                emitted: Mutex::default(),
            }
        }

        // A snapshot of everything flushed so far.
        fn emitted(&self) -> Vec<String> {
            let flushed = self.emitted.lock().unwrap();

            flushed.to_vec()
        }
    }

    impl Emitter for MyEmitter {
        fn emit<E: ToEvent>(&self, evt: E) {
            let event = evt.to_event();
            let message = event.msg().to_string();

            self.pending.lock().unwrap().push(message);
        }

        fn blocking_flush(&self, _: Duration) -> bool {
            let mut queued = self.pending.lock().unwrap();
            let mut done = self.emitted.lock().unwrap();

            // Move every queued message across, leaving the queue empty.
            done.append(&mut queued);

            true
        }
    }

    #[test]
    fn erased_emitter() {
        let target = MyEmitter::new();

        // Call through the trait object rather than the concrete type.
        let erased: &dyn ErasedEmitter = &target;

        erased.emit(Event::new(
            Path::new_raw("a"),
            Template::literal("event 1"),
            Empty,
            Empty,
        ));
        erased.blocking_flush(Duration::from_secs(0));

        assert_eq!(vec![String::from("event 1")], target.emitted());
    }

    #[test]
    fn option_emitter() {
        let cases = [
            (Some(MyEmitter::new()), vec![String::from("event 1")]),
            (None, Vec::new()),
        ];

        for (target, want) in cases {
            target.emit(Event::new(
                Path::new_raw("a"),
                Template::literal("event 1"),
                Empty,
                Empty,
            ));
            target.blocking_flush(Duration::from_secs(0));

            let got = match target {
                Some(inner) => inner.emitted(),
                None => Vec::default(),
            };

            assert_eq!(want, got);
        }
    }

    #[test]
    fn from_fn_emitter() {
        let calls = Cell::new(0);

        let target = from_fn(|evt| {
            assert_eq!(Path::new_raw("a"), evt.mdl());

            calls.set(calls.get() + 1);
        });

        target.emit(Event::new(
            Path::new_raw("a"),
            Template::literal("event 1"),
            Empty,
            Empty,
        ));

        assert_eq!(1, calls.get());
    }

    #[test]
    fn and_emitter() {
        let pair = MyEmitter::new().and_to(MyEmitter::new());

        pair.emit(Event::new(
            Path::new_raw("a"),
            Template::literal("event 1"),
            Empty,
            Empty,
        ));
        pair.blocking_flush(Duration::from_secs(0));

        // Both sides must have received and flushed the same event.
        assert_eq!(vec![String::from("event 1")], pair.left().emitted());
        assert_eq!(vec![String::from("event 1")], pair.right().emitted());
    }

    #[test]
    fn wrap_emitter() {
        let calls = Cell::new(0);

        // The inner emitter expects the property that the wrapping appends.
        let inner = from_fn(|evt| {
            assert_eq!(1, evt.props().pull::<i32, _>("appended").unwrap());

            calls.set(calls.get() + 1);
        });

        let wrapped = inner.wrap_emitter(wrapping::from_fn(|output, evt| {
            output.emit(evt.map_props(|props| props.and_props(("appended", 1))));
        }));

        wrapped.emit(Event::new(
            Path::new_raw("a"),
            Template::literal("event 1"),
            Empty,
            Empty,
        ));

        assert_eq!(1, calls.get());
    }
}