actm/
types.rs

1//! Types used by the event interface
2
3use std::{
4    error::Error,
5    fmt::Debug,
6    hash::Hash,
7    marker::PhantomData,
8    sync::{
9        Arc,
10        atomic::{AtomicU64, Ordering},
11    },
12};
13
14use async_trait::async_trait;
15use once_cell::sync::Lazy;
16
17use super::traits::{Event, EventConsumer};
18
19mod waiter;
20
21pub use waiter::{Trigger, Waiter};
22
/// Utility type for generalized errors
///
/// A two-variant sum type: [`Either::Happy`] carries the preferred/successful value and
/// [`Either::Sad`] carries the fallback or error value.
///
/// The standard comparison, cloning, and debug-formatting traits are derived, and are available
/// whenever both `A` and `B` implement them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Either<A, B> {
    /// The happy path or preferred option
    Happy(A),
    /// The sad path or error
    Sad(B),
}
30
/// Token for listening for responses to requests
///
/// These tokens are always guaranteed to have at most one equal counterpart, and they are
/// deliberately not [`Clone`]able or [`Copy`]able to help enforce that constraint.
///
/// Internally, the token contains a [`u64`] sourced from an incrementing atomic. No guarantees are
/// made about iteration order, and the tokens have no meaning across executions.
#[derive(PartialEq, Eq, Ord, PartialOrd, Debug, Hash)]
pub struct CompletionToken(Arc<u64>);

impl CompletionToken {
    /// Creates a pair of equal `CompletionTokens`
    ///
    /// It is guaranteed that no previous or future pair of `CompletionTokens`, at least within a
    /// single execution of the program, will be equal to these tokens
    ///
    /// Both halves of the pair share a single reference-counted value, which is what
    /// [`CompletionToken::count`] inspects.
    pub fn new() -> (CompletionToken, CompletionToken) {
        /// Counter used for generating tokens
        ///
        /// `AtomicU64::new` is a `const fn`, so the counter can be initialized statically without
        /// any lazy-initialization wrapper.
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let value = Arc::new(COUNTER.fetch_add(1, Ordering::SeqCst));
        (CompletionToken(Arc::clone(&value)), CompletionToken(value))
    }

    /// Returns the number of copies of this [`CompletionToken`] currently alive
    ///
    /// This is used for doing "Garbage Collection" of tokens whose partners have been dropped.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions enabled, panics if more than two copies of the token are
    /// alive, as that indicates the token has been illegally duplicated.
    pub fn count(&self) -> usize {
        let count = Arc::strong_count(&self.0);
        // We use a `debug_assert` here instead of an `assert` or returning an error for the
        // following reasons:
        //   - This will be used in places where there's no good place to put in error handling
        //   - There's no safe way for an application to recover from this that wouldn't be hugely
        //     onerous to implement
        //   - While this would be a bug, its consequences are likely limited to a memory leak, and
        //     since it should actually be quite hard to cause this such instances are expected to
        //     be rare, so it's not really worth it to outright crash a production application over
        //     this
        //   - It is still, however, worth being noisy to developers about, so that we can get bug
        //     reports filed and the bug fixed
        debug_assert!(
            count <= 2,
            "A CompletionToken has been illegally duplicated, copies: {} maximum: 2. \
             This is a bug, please file a bug report.",
            count
        );
        count
    }
}
78
/// Dynamic wrapper around an [`Error`] to make some type magic work
///
/// Holds a boxed `dyn Error + Send + Sync` and forwards formatting and error-chain queries to it.
pub struct DynamicError(Box<dyn Error + Send + Sync>);

impl std::fmt::Display for DynamicError {
    /// Forwards to the wrapped error's `Display` implementation, handing over the same formatter
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let wrapped: &(dyn Error + Send + Sync) = &*self.0;
        std::fmt::Display::fmt(wrapped, f)
    }
}

impl std::fmt::Debug for DynamicError {
    /// Forwards to the wrapped error's `Debug` implementation, handing over the same formatter
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Name the trait explicitly so the call does not depend on method resolution
        Debug::fmt(&*self.0, f)
    }
}

// The deprecated `description` and `cause` methods are forwarded as well, so the wrapper reports
// exactly what the wrapped error reports
#[allow(deprecated)]
impl Error for DynamicError {
    /// Returns the wrapped error's own source
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Error::source(&*self.0)
    }

    /// Returns the wrapped error's description
    fn description(&self) -> &str {
        Error::description(&*self.0)
    }

    /// Returns the wrapped error's cause
    fn cause(&self) -> Option<&dyn Error> {
        Error::cause(&*self.0)
    }
}
108
109/// Wrapper type around an [`EventConsumer`] that boxes the resulting error
110pub struct DynamicConsumer<E: Event, C: EventConsumer<E>> {
111    /// Inner Value
112    inner: C,
113    /// Phantom for the event type, since we need it
114    _event: PhantomData<E>,
115}
116
117impl<E: Event, C: EventConsumer<E>> From<C> for DynamicConsumer<E, C> {
118    fn from(inner: C) -> Self {
119        Self {
120            inner,
121            _event: PhantomData,
122        }
123    }
124}
125
126#[async_trait]
127impl<E: Event, C: EventConsumer<E>> EventConsumer<E> for DynamicConsumer<E, C> {
128    type Error = DynamicError;
129    async fn accept(&self, event: E) -> Result<(), Self::Error> {
130        match self.inner.accept(event).await {
131            Ok(_) => Ok(()),
132            Err(e) => Err(DynamicError(Box::new(e))),
133        }
134    }
135
136    fn accept_sync(&self, event: E) -> Result<(), Self::Error> {
137        match self.inner.accept_sync(event) {
138            Ok(_) => Ok(()),
139            Err(e) => Err(DynamicError(Box::new(e))),
140        }
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke test `CompletionToken`'s reference counting
    #[test]
    fn completion_token_ref_counting() {
        let (kept, discarded) = CompletionToken::new();
        // A fresh pair reports both halves as alive, whichever half is asked
        assert_eq!(kept.count(), 2);
        assert_eq!(discarded.count(), 2);
        // Dropping one half must leave exactly one copy behind
        drop(discarded);
        assert_eq!(kept.count(), 1);
    }

    // Make sure the two halves of a pair are equal to each other
    #[test]
    fn token_self_equality() {
        let (left, right) = CompletionToken::new();
        assert_eq!(left, right);
    }

    // Make sure successive tokens are non-equal
    #[test]
    fn token_non_equal() {
        let (first, _) = CompletionToken::new();
        let (second, _) = CompletionToken::new();
        assert_ne!(first, second);
    }
}