async_callback_manager/
adaptors.rs

1use crate::{
2    BackendStreamingTask, BackendTask, DEFAULT_STREAM_CHANNEL_SIZE, PanickingReceiverStream,
3};
4use futures::{Stream, StreamExt};
5use std::fmt::Debug;
6use std::future::Future;
7
8impl<Bkend, T: BackendTask<Bkend>> BackendTaskExt<Bkend> for T {}
9impl<Bkend, T: BackendTask<Bkend, Output = Result<O, E>>, O, E> TryBackendTaskExt<Bkend> for T {
10    type Error = E;
11    type Ok = O;
12}
13
14pub trait TryBackendTaskExt<Bkend>: BackendTask<Bkend> {
15    type Error;
16    type Ok;
17    fn map_stream<S, F>(self, create_next: F) -> Map<Self, F>
18    where
19        Self: Sized,
20        S: BackendStreamingTask<Bkend>,
21        F: FnOnce(Self::Ok) -> S,
22    {
23        Map {
24            first: self,
25            create_next,
26        }
27    }
28}
29pub trait BackendTaskExt<Bkend>: BackendTask<Bkend> {
30    fn then<T, F>(self, create_next: F) -> Then<Self, F>
31    where
32        Self: Sized,
33        T: BackendTask<Bkend>,
34        F: FnOnce(Self::Output) -> T,
35    {
36        Then {
37            first: self,
38            create_next,
39        }
40    }
41    fn then_stream<S, F>(self, create_next: F) -> Then<Self, F>
42    where
43        Self: Sized,
44        S: BackendStreamingTask<Bkend>,
45        F: FnOnce(Self::Output) -> S,
46    {
47        Then {
48            first: self,
49            create_next,
50        }
51    }
52}
53
/// A fallible first task bundled with a closure that builds the follow-up
/// streaming task from the first task's success value.
///
/// Constructed by `TryBackendTaskExt::map_stream`.
pub struct Map<T, F> {
    /// The task that runs first.
    first: T,
    /// Builds the follow-up task from the first task's success value.
    create_next: F,
}

/// Closures are not `Debug`, so only `first` is formatted for real; the
/// closure field is rendered as a fixed placeholder string.
impl<T: Debug, F> Debug for Map<T, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Map");
        builder.field("first", &self.first);
        // TODO: we could deduce the type name returned by the closure
        builder.field("create_next", &"..closure..");
        builder.finish()
    }
}
71
72impl<Bkend, T, S, F, Ct, O, E> BackendStreamingTask<Bkend> for Map<T, F>
73where
74    Bkend: Clone + Sync + Send + 'static,
75    F: Sync + Send + 'static,
76    T: BackendTask<Bkend, MetadataType = Ct, Output = std::result::Result<O, E>>,
77    S: BackendStreamingTask<Bkend, MetadataType = Ct>,
78    Ct: PartialEq,
79    F: FnOnce(O) -> S,
80    E: Send + 'static,
81    O: Send,
82{
83    type Output = std::result::Result<S::Output, E>;
84    type MetadataType = Ct;
85    fn into_stream(
86        self,
87        backend: &Bkend,
88    ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
89        let Map { first, create_next } = self;
90        let backend = backend.clone();
91        let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
92        let handle = tokio::task::spawn(async move {
93            let seed = BackendTask::into_future(first, &backend).await;
94            match seed {
95                Ok(seed) => {
96                    let mut stream = create_next(seed).into_stream(&backend);
97                    while let Some(item) = stream.next().await {
98                        let _ = tx.send(Ok(item)).await;
99                    }
100                }
101                Err(e) => {
102                    let _ = tx.send(Err(e)).await;
103                }
104            }
105        });
106        PanickingReceiverStream::new(rx, handle)
107    }
108    fn metadata() -> Vec<Self::MetadataType> {
109        let mut first = T::metadata();
110        let mut second = S::metadata();
111        second.append(&mut first);
112        second
113    }
114}
115
/// A first task bundled with a closure that builds the follow-up task from
/// the first task's output.
///
/// Constructed by `BackendTaskExt::then` and `BackendTaskExt::then_stream`.
pub struct Then<T, F> {
    /// The task that runs first.
    first: T,
    /// Builds the follow-up task from the first task's output.
    create_next: F,
}

/// Mirrors the `Debug` implementation of `Map` so both adaptors can be
/// formatted the same way. Closures are not `Debug`, so the closure field is
/// rendered as a fixed placeholder string.
impl<T, F> Debug for Then<T, F>
where
    T: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Then")
            .field("first", &self.first)
            .field("create_next", &"..closure..")
            .finish()
    }
}
120
121impl<Bkend, T, T2, F, Ct> BackendTask<Bkend> for Then<T, F>
122where
123    Bkend: Clone + Send + 'static,
124    F: Sync + Send + 'static,
125    T: BackendTask<Bkend, MetadataType = Ct>,
126    T2: BackendTask<Bkend, MetadataType = Ct>,
127    Ct: PartialEq,
128    F: FnOnce(T::Output) -> T2,
129{
130    type Output = T2::Output;
131    type MetadataType = Ct;
132    fn into_future(self, backend: &Bkend) -> impl Future<Output = Self::Output> + Send + 'static {
133        let Then { first, create_next } = self;
134        let backend = backend.clone();
135        async move {
136            let output = BackendTask::into_future(first, &backend).await;
137            let next = create_next(output);
138            BackendTask::into_future(next, &backend).await
139        }
140    }
141    fn metadata() -> Vec<Self::MetadataType> {
142        let mut first = T::metadata();
143        let mut second = T2::metadata();
144        second.append(&mut first);
145        second
146    }
147}
148
149impl<Bkend, T, S, F, Ct> BackendStreamingTask<Bkend> for Then<T, F>
150where
151    Bkend: Clone + Sync + Send + 'static,
152    F: Sync + Send + 'static,
153    T: BackendTask<Bkend, MetadataType = Ct>,
154    S: BackendStreamingTask<Bkend, MetadataType = Ct>,
155    Ct: PartialEq,
156    F: FnOnce(T::Output) -> S,
157{
158    type Output = S::Output;
159    type MetadataType = Ct;
160    fn into_stream(
161        self,
162        backend: &Bkend,
163    ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
164        let Then { first, create_next } = self;
165        let backend = backend.clone();
166        let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
167        let handle = tokio::task::spawn(async move {
168            let seed = BackendTask::into_future(first, &backend).await;
169            let mut stream = create_next(seed).into_stream(&backend);
170            while let Some(item) = stream.next().await {
171                let _ = tx.send(item).await;
172            }
173        });
174        PanickingReceiverStream::new(rx, handle)
175    }
176    fn metadata() -> Vec<Self::MetadataType> {
177        let mut first = T::metadata();
178        let mut second = S::metadata();
179        second.append(&mut first);
180        second
181    }
182}