1use std::{
4 error::Error,
5 fmt::Debug,
6 hash::Hash,
7 marker::PhantomData,
8 sync::{
9 Arc,
10 atomic::{AtomicU64, Ordering},
11 },
12};
13
14use async_trait::async_trait;
15use once_cell::sync::Lazy;
16
17use super::traits::{Event, EventConsumer};
18
19mod waiter;
20
21pub use waiter::{Trigger, Waiter};
22
/// A value that is exactly one of two alternatives.
///
/// The two type parameters are independent; nothing in this module constrains them.
/// NOTE(review): the variant names suggest a success/failure split — confirm the intended
/// convention with the call sites.
pub enum Either<A, B> {
    /// The first alternative, carrying a value of type `A`.
    Happy(A),
    /// The second alternative, carrying a value of type `B`.
    Sad(B),
}
30
/// A token identified by an id drawn from a process-wide counter and held behind an [`Arc`].
///
/// Tokens are only created in pairs by [`CompletionToken::new`]. Both halves of a pair share
/// the same allocation, so equality, ordering and hashing all operate on the shared id, and
/// [`CompletionToken::count`] reports how many halves are still alive.
///
/// `Clone` is deliberately not derived: [`CompletionToken::count`] treats more than two live
/// copies of the same token as a bug.
#[derive(PartialEq, Eq, Ord, PartialOrd, Debug, Hash)]
pub struct CompletionToken(Arc<u64>);

impl CompletionToken {
    /// Creates a linked pair of tokens sharing one freshly assigned id.
    ///
    /// Each call takes the next value of a monotonically increasing counter, so tokens from
    /// different calls compare unequal while the two halves of one pair compare equal.
    pub fn new() -> (CompletionToken, CompletionToken) {
        // `AtomicU64::new` is a `const fn`, so the counter can live in a plain static; no
        // lazy initialisation wrapper is required to construct it.
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);

        let id = Arc::new(NEXT_ID.fetch_add(1, Ordering::SeqCst));
        (CompletionToken(Arc::clone(&id)), CompletionToken(id))
    }

    /// Returns how many tokens sharing this id are currently alive.
    ///
    /// The value is the strong reference count of the shared allocation: `2` while both
    /// halves of the pair exist, `1` once the other half has been dropped.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions enabled, panics if more than two copies exist, since
    /// that can only happen if a token was duplicated.
    pub fn count(&self) -> usize {
        let live = Arc::strong_count(&self.0);
        debug_assert!(
            live <= 2,
            "A CompletionToken has been illegally duplicated, copies: {} maximum: 2. \
             This is a bug, please file a bug report.",
            live
        );
        live
    }
}
78
/// A type-erased error: a newtype around a boxed `Error + Send + Sync` trait object.
///
/// The wrapper adds nothing of its own. Formatting and every [`Error`] method are forwarded
/// to the boxed error, so a `DynamicError` prints and chains exactly like the error it holds.
pub struct DynamicError(Box<dyn Error + Send + Sync>);

impl std::fmt::Display for DynamicError {
    /// Forwards to the wrapped error's `Display` impl, reusing the caller's formatter so
    /// width, fill and other flags still apply.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&*self.0, formatter)
    }
}

impl std::fmt::Debug for DynamicError {
    /// Forwards to the wrapped error's `Debug` impl; the wrapper itself does not appear in
    /// the output.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Named explicitly so the call cannot silently switch to another `fmt` trait
        // depending on which traits happen to be imported.
        std::fmt::Debug::fmt(&*self.0, formatter)
    }
}

// The deprecated `description` and `cause` methods are still forwarded so that callers which
// rely on them observe the wrapped error's answers rather than the trait defaults.
#[allow(deprecated)]
impl Error for DynamicError {
    /// Returns the wrapped error's source (not the wrapped error itself).
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Error::source(&*self.0)
    }

    /// Returns the wrapped error's description.
    fn description(&self) -> &str {
        Error::description(&*self.0)
    }

    /// Returns the wrapped error's cause.
    fn cause(&self) -> Option<&dyn Error> {
        Error::cause(&*self.0)
    }
}
108
109pub struct DynamicConsumer<E: Event, C: EventConsumer<E>> {
111 inner: C,
113 _event: PhantomData<E>,
115}
116
117impl<E: Event, C: EventConsumer<E>> From<C> for DynamicConsumer<E, C> {
118 fn from(inner: C) -> Self {
119 Self {
120 inner,
121 _event: PhantomData,
122 }
123 }
124}
125
126#[async_trait]
127impl<E: Event, C: EventConsumer<E>> EventConsumer<E> for DynamicConsumer<E, C> {
128 type Error = DynamicError;
129 async fn accept(&self, event: E) -> Result<(), Self::Error> {
130 match self.inner.accept(event).await {
131 Ok(_) => Ok(()),
132 Err(e) => Err(DynamicError(Box::new(e))),
133 }
134 }
135
136 fn accept_sync(&self, event: E) -> Result<(), Self::Error> {
137 match self.inner.accept_sync(event) {
138 Ok(_) => Ok(()),
139 Err(e) => Err(DynamicError(Box::new(e))),
140 }
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// Both halves of a pair report two live copies; dropping one leaves a single copy.
    #[test]
    fn completion_token_ref_counting() {
        let (kept, released) = CompletionToken::new();
        assert_eq!(kept.count(), 2);
        assert_eq!(released.count(), 2);

        drop(released);

        assert_eq!(kept.count(), 1);
    }

    /// The two halves of one pair compare equal.
    #[test]
    fn token_self_equality() {
        let (left, right) = CompletionToken::new();
        assert_eq!(left, right);
    }

    /// Tokens taken from two separate `new` calls compare unequal.
    #[test]
    fn token_non_equal() {
        let (first, _) = CompletionToken::new();
        let (second, _) = CompletionToken::new();
        assert_ne!(first, second);
    }
}