async_callback_manager/
adaptors.rs

1use crate::{BackendStreamingTask, BackendTask, DEFAULT_STREAM_CHANNEL_SIZE};
2use futures::{Stream, StreamExt};
3use std::fmt::Debug;
4use std::future::Future;
5use tokio_stream::wrappers::ReceiverStream;
6
7impl<Bkend, T: BackendTask<Bkend>> BackendTaskExt<Bkend> for T {}
8impl<Bkend, T: BackendTask<Bkend, Output = Result<O, E>>, O, E> TryBackendTaskExt<Bkend> for T {
9    type Error = E;
10    type Ok = O;
11}
12
13pub trait TryBackendTaskExt<Bkend>: BackendTask<Bkend> {
14    type Error;
15    type Ok;
16    fn map_stream<S, F>(self, create_next: F) -> Map<Self, F>
17    where
18        Self: Sized,
19        S: BackendStreamingTask<Bkend>,
20        F: FnOnce(Self::Ok) -> S,
21    {
22        Map {
23            first: self,
24            create_next,
25        }
26    }
27}
28pub trait BackendTaskExt<Bkend>: BackendTask<Bkend> {
29    fn then<T, F>(self, create_next: F) -> Then<Self, F>
30    where
31        Self: Sized,
32        T: BackendTask<Bkend>,
33        F: FnOnce(Self::Output) -> T,
34    {
35        Then {
36            first: self,
37            create_next,
38        }
39    }
40    fn then_stream<S, F>(self, create_next: F) -> Then<Self, F>
41    where
42        Self: Sized,
43        S: BackendStreamingTask<Bkend>,
44        F: FnOnce(Self::Output) -> S,
45    {
46        Then {
47            first: self,
48            create_next,
49        }
50    }
51}
52
53pub struct Map<T, F> {
54    first: T,
55    create_next: F,
56}
57
58impl<T, F> Debug for Map<T, F>
59where
60    T: Debug,
61{
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        f.debug_struct("Map")
64            .field("first", &self.first)
65            // TODO: we could deduce the type name returned by the closure
66            .field("create_next", &"..closure..")
67            .finish()
68    }
69}
70
71impl<Bkend, T, S, F, Ct, O, E> BackendStreamingTask<Bkend> for Map<T, F>
72where
73    Bkend: Clone + Sync + Send + 'static,
74    F: Sync + Send + 'static,
75    T: BackendTask<Bkend, MetadataType = Ct, Output = std::result::Result<O, E>>,
76    S: BackendStreamingTask<Bkend, MetadataType = Ct>,
77    Ct: PartialEq,
78    F: FnOnce(O) -> S,
79    E: Send + 'static,
80    O: Send,
81{
82    type Output = std::result::Result<S::Output, E>;
83    type MetadataType = Ct;
84    fn into_stream(
85        self,
86        backend: &Bkend,
87    ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
88        let Map { first, create_next } = self;
89        let backend = backend.clone();
90        let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
91        tokio::task::spawn(async move {
92            let seed = first.into_future(&backend).await;
93            match seed {
94                Ok(seed) => {
95                    let mut stream = create_next(seed).into_stream(&backend);
96                    while let Some(item) = stream.next().await {
97                        let _ = tx.send(Ok(item)).await;
98                    }
99                }
100                Err(e) => {
101                    let _ = tx.send(Err(e)).await;
102                }
103            }
104        });
105        ReceiverStream::new(rx)
106    }
107    fn metadata() -> Vec<Self::MetadataType> {
108        let mut first = T::metadata();
109        let mut second = S::metadata();
110        second.append(&mut first);
111        second
112    }
113}
114
115pub struct Then<T, F> {
116    first: T,
117    create_next: F,
118}
119
120impl<Bkend, T, T2, F, Ct> BackendTask<Bkend> for Then<T, F>
121where
122    Bkend: Clone + Send + 'static,
123    F: Sync + Send + 'static,
124    T: BackendTask<Bkend, MetadataType = Ct>,
125    T2: BackendTask<Bkend, MetadataType = Ct>,
126    Ct: PartialEq,
127    F: FnOnce(T::Output) -> T2,
128{
129    type Output = T2::Output;
130    type MetadataType = Ct;
131    fn into_future(self, backend: &Bkend) -> impl Future<Output = Self::Output> + Send + 'static {
132        let Then { first, create_next } = self;
133        let backend = backend.clone();
134        async move {
135            let output = first.into_future(&backend).await;
136            let next = create_next(output);
137            next.into_future(&backend).await
138        }
139    }
140    fn metadata() -> Vec<Self::MetadataType> {
141        let mut first = T::metadata();
142        let mut second = T2::metadata();
143        second.append(&mut first);
144        second
145    }
146}
147
148impl<Bkend, T, S, F, Ct> BackendStreamingTask<Bkend> for Then<T, F>
149where
150    Bkend: Clone + Sync + Send + 'static,
151    F: Sync + Send + 'static,
152    T: BackendTask<Bkend, MetadataType = Ct>,
153    S: BackendStreamingTask<Bkend, MetadataType = Ct>,
154    Ct: PartialEq,
155    F: FnOnce(T::Output) -> S,
156{
157    type Output = S::Output;
158    type MetadataType = Ct;
159    fn into_stream(
160        self,
161        backend: &Bkend,
162    ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
163        let Then { first, create_next } = self;
164        let backend = backend.clone();
165        let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
166        tokio::task::spawn(async move {
167            let seed = first.into_future(&backend).await;
168            let mut stream = create_next(seed).into_stream(&backend);
169            while let Some(item) = stream.next().await {
170                let _ = tx.send(item).await;
171            }
172        });
173        ReceiverStream::new(rx)
174    }
175    fn metadata() -> Vec<Self::MetadataType> {
176        let mut first = T::metadata();
177        let mut second = S::metadata();
178        second.append(&mut first);
179        second
180    }
181}