async_callback_manager/
adaptors.rs

1use crate::{
2    BackendStreamingTask, BackendTask, DEFAULT_STREAM_CHANNEL_SIZE, OptDebug, OptPartialEq,
3    PanickingReceiverStream,
4};
5use futures::{Stream, StreamExt};
6use std::fmt::Debug;
7use std::future::Future;
8
9impl<Bkend, T: BackendTask<Bkend>> BackendTaskExt<Bkend> for T {}
10impl<Bkend, T: BackendTask<Bkend, Output = Result<O, E>>, O, E> TryBackendTaskExt<Bkend> for T {
11    type Error = E;
12    type Ok = O;
13}
14/// Local definition of the unstable FnOnce trait, allowing users of this crate
15/// to implement mapping functions that are PartialEq and Debug for
16/// observability purposes.
17pub trait MapFn<T>: OptPartialEq + OptDebug {
18    type Output;
19    fn apply(self, input: T) -> Self::Output;
20}
21#[cfg(all(not(feature = "task-equality"), not(feature = "task-debug")))]
22impl<T, F, O> MapFn<T> for F
23where
24    F: FnOnce(T) -> O,
25{
26    type Output = O;
27    fn apply(self, input: T) -> Self::Output {
28        self(input)
29    }
30}
31
32pub trait TryBackendTaskExt<Bkend>: BackendTask<Bkend> {
33    type Error;
34    type Ok;
35    fn map_stream<S, F>(self, create_next: F) -> Map<Self, F>
36    where
37        Self: Sized,
38        S: BackendStreamingTask<Bkend>,
39        F: MapFn<Self::Ok, Output = S>,
40    {
41        Map {
42            first: self,
43            create_next,
44        }
45    }
46}
47pub trait BackendTaskExt<Bkend>: BackendTask<Bkend> {
48    fn then<T, F>(self, create_next: F) -> Then<Self, F>
49    where
50        Self: Sized,
51        T: BackendTask<Bkend>,
52        F: FnOnce(Self::Output) -> T,
53    {
54        Then {
55            first: self,
56            create_next,
57        }
58    }
59    fn then_stream<S, F>(self, create_next: F) -> Then<Self, F>
60    where
61        Self: Sized,
62        S: BackendStreamingTask<Bkend>,
63        F: FnOnce(Self::Output) -> S,
64    {
65        Then {
66            first: self,
67            create_next,
68        }
69    }
70}
71
/// Adaptor pairing a fallible first task with a mapping function that builds
/// a follow-up streaming task from the first task's success value.
///
/// `PartialEq` and `Debug` are derived, so they are available exactly when
/// both `T` and `F` implement them; equality compares `first` and then
/// `create_next`, and the debug output lists both fields under the name `Map`.
#[derive(PartialEq, Debug)]
pub struct Map<T, F> {
    /// The task that is run first.
    first: T,
    /// Builds the follow-up streaming task from the first task's success value.
    create_next: F,
}
99
100impl<Bkend, T, S, F, Ct, O, E> BackendStreamingTask<Bkend> for Map<T, F>
101where
102    Bkend: Clone + Sync + Send + 'static,
103    F: Sync + Send + 'static,
104    T: BackendTask<Bkend, MetadataType = Ct, Output = std::result::Result<O, E>>,
105    S: BackendStreamingTask<Bkend, MetadataType = Ct>,
106    Ct: PartialEq,
107    F: MapFn<O, Output = S>,
108    E: Send + 'static,
109    O: Send,
110{
111    type Output = std::result::Result<S::Output, E>;
112    type MetadataType = Ct;
113    fn into_stream(
114        self,
115        backend: &Bkend,
116    ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
117        let Map {
118            first, create_next, ..
119        } = self;
120        let backend = backend.clone();
121        let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
122        let handle = tokio::task::spawn(async move {
123            let seed = BackendTask::into_future(first, &backend).await;
124            match seed {
125                Ok(seed) => {
126                    let mut stream = create_next.apply(seed).into_stream(&backend);
127                    while let Some(item) = stream.next().await {
128                        let _ = tx.send(Ok(item)).await;
129                    }
130                }
131                Err(e) => {
132                    let _ = tx.send(Err(e)).await;
133                }
134            }
135        });
136        PanickingReceiverStream::new(rx, handle)
137    }
138    fn metadata() -> Vec<Self::MetadataType> {
139        let mut first = T::metadata();
140        let mut second = S::metadata();
141        second.append(&mut first);
142        second
143    }
144}
145
/// Adaptor pairing a first task with a function that builds a follow-up task
/// (or streaming task) from the first task's output.
///
/// Equality and debug formatting are field-wise and available whenever both
/// `T` and `F` support them.
#[derive(Debug, PartialEq)]
pub struct Then<T, F> {
    /// The task that is run first.
    first: T,
    /// Builds the follow-up task from the first task's output.
    create_next: F,
}
151
152impl<Bkend, T, T2, F, Ct> BackendTask<Bkend> for Then<T, F>
153where
154    Bkend: Clone + Send + 'static,
155    F: Sync + Send + 'static,
156    T: BackendTask<Bkend, MetadataType = Ct>,
157    T2: BackendTask<Bkend, MetadataType = Ct>,
158    Ct: PartialEq,
159    F: MapFn<T::Output, Output = T2>,
160{
161    type Output = T2::Output;
162    type MetadataType = Ct;
163    fn into_future(self, backend: &Bkend) -> impl Future<Output = Self::Output> + Send + 'static {
164        let Then { first, create_next } = self;
165        let backend = backend.clone();
166        async move {
167            let output = BackendTask::into_future(first, &backend).await;
168            let next = create_next.apply(output);
169            BackendTask::into_future(next, &backend).await
170        }
171    }
172    fn metadata() -> Vec<Self::MetadataType> {
173        let mut first = T::metadata();
174        let mut second = T2::metadata();
175        second.append(&mut first);
176        second
177    }
178}
179
180impl<Bkend, T, S, F, Ct> BackendStreamingTask<Bkend> for Then<T, F>
181where
182    Bkend: Clone + Sync + Send + 'static,
183    F: Sync + Send + 'static,
184    T: BackendTask<Bkend, MetadataType = Ct>,
185    S: BackendStreamingTask<Bkend, MetadataType = Ct>,
186    Ct: PartialEq,
187    F: MapFn<T::Output, Output = S>,
188{
189    type Output = S::Output;
190    type MetadataType = Ct;
191    fn into_stream(
192        self,
193        backend: &Bkend,
194    ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
195        let Then { first, create_next } = self;
196        let backend = backend.clone();
197        let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
198        let handle = tokio::task::spawn(async move {
199            let seed = BackendTask::into_future(first, &backend).await;
200            let mut stream = create_next.apply(seed).into_stream(&backend);
201            while let Some(item) = stream.next().await {
202                let _ = tx.send(item).await;
203            }
204        });
205        PanickingReceiverStream::new(rx, handle)
206    }
207    fn metadata() -> Vec<Self::MetadataType> {
208        let mut first = T::metadata();
209        let mut second = S::metadata();
210        second.append(&mut first);
211        second
212    }
213}