async_callback_manager/
adaptors.rs

1use crate::{BackendStreamingTask, BackendTask, DEFAULT_STREAM_CHANNEL_SIZE};
2use futures::{Stream, StreamExt};
3use std::{fmt::Debug, future::Future};
4use tokio_stream::wrappers::ReceiverStream;
5
6impl<Bkend, T: BackendTask<Bkend>> BackendTaskExt<Bkend> for T {}
7impl<Bkend, T: BackendTask<Bkend, Output = Result<O, E>>, O, E> TryBackendTaskExt<Bkend> for T {
8    type Error = E;
9    type Ok = O;
10}
11
12pub trait TryBackendTaskExt<Bkend>: BackendTask<Bkend> {
13    type Error;
14    type Ok;
15    fn map_stream<S, F>(self, create_next: F) -> Map<Self, F>
16    where
17        Self: Sized,
18        S: BackendStreamingTask<Bkend>,
19        F: FnOnce(Self::Ok) -> S,
20    {
21        Map {
22            first: self,
23            create_next,
24        }
25    }
26}
27pub trait BackendTaskExt<Bkend>: BackendTask<Bkend> {
28    fn then<T, F>(self, create_next: F) -> Then<Self, F>
29    where
30        Self: Sized,
31        T: BackendTask<Bkend>,
32        F: FnOnce(Self::Output) -> T,
33    {
34        Then {
35            first: self,
36            create_next,
37        }
38    }
39    fn then_stream<S, F>(self, create_next: F) -> Then<Self, F>
40    where
41        Self: Sized,
42        S: BackendStreamingTask<Bkend>,
43        F: FnOnce(Self::Output) -> S,
44    {
45        Then {
46            first: self,
47            create_next,
48        }
49    }
50}
51
/// Adaptor pairing a first task with a closure that builds the follow-up
/// task from the first task's successful output.
pub struct Map<T, F> {
    /// Task that runs first.
    first: T,
    /// Closure that builds the follow-up task.
    create_next: F,
}

impl<T: Debug, F> Debug for Map<T, F> {
    /// Formats the adaptor as a struct, showing `first` and a fixed
    /// placeholder string in place of the closure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            first,
            create_next: _,
        } = self;
        let mut out = f.debug_struct("Map");
        out.field("first", first);
        // TODO: we could deduce the type name returned by the closure
        out.field("create_next", &"..closure..");
        out.finish()
    }
}
69
70impl<Bkend, T, S, F, Ct, O, E> BackendStreamingTask<Bkend> for Map<T, F>
71where
72    Bkend: Clone + Sync + Send + 'static,
73    F: Sync + Send + 'static,
74    T: BackendTask<Bkend, MetadataType = Ct, Output = std::result::Result<O, E>>,
75    S: BackendStreamingTask<Bkend, MetadataType = Ct>,
76    Ct: PartialEq,
77    F: FnOnce(O) -> S,
78    E: Send + 'static,
79    O: Send,
80{
81    type Output = std::result::Result<S::Output, E>;
82    type MetadataType = Ct;
83    fn into_stream(
84        self,
85        backend: &Bkend,
86    ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
87        let Map { first, create_next } = self;
88        let backend = backend.clone();
89        let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
90        tokio::task::spawn(async move {
91            let seed = first.into_future(&backend).await;
92            match seed {
93                Ok(seed) => {
94                    let mut stream = create_next(seed).into_stream(&backend);
95                    while let Some(item) = stream.next().await {
96                        let _ = tx.send(Ok(item)).await;
97                    }
98                }
99                Err(e) => {
100                    let _ = tx.send(Err(e)).await;
101                }
102            }
103        });
104        ReceiverStream::new(rx)
105    }
106    fn metadata() -> Vec<Self::MetadataType> {
107        let mut first = T::metadata();
108        let mut second = S::metadata();
109        second.append(&mut first);
110        second
111    }
112}
113
/// Adaptor pairing a first task with a closure that builds the follow-up
/// task from the first task's output.
pub struct Then<T, F> {
    /// Task that runs first.
    first: T,
    /// Closure that builds the follow-up task from the first task's output.
    create_next: F,
}

impl<T, F> Debug for Then<T, F>
where
    T: Debug,
{
    /// Formats the adaptor as a struct, showing `first` and a fixed
    /// placeholder string in place of the closure (same layout as `Map`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Then")
            .field("first", &self.first)
            // The closure has no useful `Debug` output, so a placeholder
            // stands in for it.
            .field("create_next", &"..closure..")
            .finish()
    }
}
118
119impl<Bkend, T, T2, F, Ct> BackendTask<Bkend> for Then<T, F>
120where
121    Bkend: Clone + Send + 'static,
122    F: Sync + Send + 'static,
123    T: BackendTask<Bkend, MetadataType = Ct>,
124    T2: BackendTask<Bkend, MetadataType = Ct>,
125    Ct: PartialEq,
126    F: FnOnce(T::Output) -> T2,
127{
128    type Output = T2::Output;
129    type MetadataType = Ct;
130    fn into_future(self, backend: &Bkend) -> impl Future<Output = Self::Output> + Send + 'static {
131        let Then { first, create_next } = self;
132        let backend = backend.clone();
133        async move {
134            let output = first.into_future(&backend).await;
135            let next = create_next(output);
136            next.into_future(&backend).await
137        }
138    }
139    fn metadata() -> Vec<Self::MetadataType> {
140        let mut first = T::metadata();
141        let mut second = T2::metadata();
142        second.append(&mut first);
143        second
144    }
145}
146
147impl<Bkend, T, S, F, Ct> BackendStreamingTask<Bkend> for Then<T, F>
148where
149    Bkend: Clone + Sync + Send + 'static,
150    F: Sync + Send + 'static,
151    T: BackendTask<Bkend, MetadataType = Ct>,
152    S: BackendStreamingTask<Bkend, MetadataType = Ct>,
153    Ct: PartialEq,
154    F: FnOnce(T::Output) -> S,
155{
156    type Output = S::Output;
157    type MetadataType = Ct;
158    fn into_stream(
159        self,
160        backend: &Bkend,
161    ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
162        let Then { first, create_next } = self;
163        let backend = backend.clone();
164        let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
165        tokio::task::spawn(async move {
166            let seed = first.into_future(&backend).await;
167            let mut stream = create_next(seed).into_stream(&backend);
168            while let Some(item) = stream.next().await {
169                let _ = tx.send(item).await;
170            }
171        });
172        ReceiverStream::new(rx)
173    }
174    fn metadata() -> Vec<Self::MetadataType> {
175        let mut first = T::metadata();
176        let mut second = S::metadata();
177        second.append(&mut first);
178        second
179    }
180}