1use crate::{
2 BackendStreamingTask, BackendTask, DEFAULT_STREAM_CHANNEL_SIZE, PanickingReceiverStream,
3};
4use futures::{Stream, StreamExt};
5use std::fmt::Debug;
6use std::future::Future;
7
8impl<Bkend, T: BackendTask<Bkend>> BackendTaskExt<Bkend> for T {}
9impl<Bkend, T: BackendTask<Bkend, Output = Result<O, E>>, O, E> TryBackendTaskExt<Bkend> for T {
10 type Error = E;
11 type Ok = O;
12}
13
14pub trait TryBackendTaskExt<Bkend>: BackendTask<Bkend> {
15 type Error;
16 type Ok;
17 fn map_stream<S, F>(self, create_next: F) -> Map<Self, F>
18 where
19 Self: Sized,
20 S: BackendStreamingTask<Bkend>,
21 F: FnOnce(Self::Ok) -> S,
22 {
23 Map {
24 first: self,
25 create_next,
26 }
27 }
28}
29pub trait BackendTaskExt<Bkend>: BackendTask<Bkend> {
30 fn then<T, F>(self, create_next: F) -> Then<Self, F>
31 where
32 Self: Sized,
33 T: BackendTask<Bkend>,
34 F: FnOnce(Self::Output) -> T,
35 {
36 Then {
37 first: self,
38 create_next,
39 }
40 }
41 fn then_stream<S, F>(self, create_next: F) -> Then<Self, F>
42 where
43 Self: Sized,
44 S: BackendStreamingTask<Bkend>,
45 F: FnOnce(Self::Output) -> S,
46 {
47 Then {
48 first: self,
49 create_next,
50 }
51 }
52}
53
/// Adaptor that stores a fallible first task together with a closure that
/// turns its success value into a follow-up streaming task.
///
/// Created by `TryBackendTaskExt::map_stream`.
pub struct Map<T, F> {
    /// Task that is run first.
    first: T,
    /// Closure that builds the follow-up task from the first task's success
    /// value.
    create_next: F,
}
58
59impl<T, F> Debug for Map<T, F>
60where
61 T: Debug,
62{
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("Map")
65 .field("first", &self.first)
66 .field("create_next", &"..closure..")
68 .finish()
69 }
70}
71
72impl<Bkend, T, S, F, Ct, O, E> BackendStreamingTask<Bkend> for Map<T, F>
73where
74 Bkend: Clone + Sync + Send + 'static,
75 F: Sync + Send + 'static,
76 T: BackendTask<Bkend, MetadataType = Ct, Output = std::result::Result<O, E>>,
77 S: BackendStreamingTask<Bkend, MetadataType = Ct>,
78 Ct: PartialEq,
79 F: FnOnce(O) -> S,
80 E: Send + 'static,
81 O: Send,
82{
83 type Output = std::result::Result<S::Output, E>;
84 type MetadataType = Ct;
85 fn into_stream(
86 self,
87 backend: &Bkend,
88 ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
89 let Map { first, create_next } = self;
90 let backend = backend.clone();
91 let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
92 let handle = tokio::task::spawn(async move {
93 let seed = BackendTask::into_future(first, &backend).await;
94 match seed {
95 Ok(seed) => {
96 let mut stream = create_next(seed).into_stream(&backend);
97 while let Some(item) = stream.next().await {
98 let _ = tx.send(Ok(item)).await;
99 }
100 }
101 Err(e) => {
102 let _ = tx.send(Err(e)).await;
103 }
104 }
105 });
106 PanickingReceiverStream::new(rx, handle)
107 }
108 fn metadata() -> Vec<Self::MetadataType> {
109 let mut first = T::metadata();
110 let mut second = S::metadata();
111 second.append(&mut first);
112 second
113 }
114}
115
/// Adaptor that stores a first task together with a closure that turns its
/// output into a follow-up task (one-shot or streaming).
///
/// Created by `BackendTaskExt::then` and `BackendTaskExt::then_stream`.
pub struct Then<T, F> {
    /// Task that is run first.
    first: T,
    /// Closure that builds the follow-up task from the first task's output.
    create_next: F,
}
120
121impl<Bkend, T, T2, F, Ct> BackendTask<Bkend> for Then<T, F>
122where
123 Bkend: Clone + Send + 'static,
124 F: Sync + Send + 'static,
125 T: BackendTask<Bkend, MetadataType = Ct>,
126 T2: BackendTask<Bkend, MetadataType = Ct>,
127 Ct: PartialEq,
128 F: FnOnce(T::Output) -> T2,
129{
130 type Output = T2::Output;
131 type MetadataType = Ct;
132 fn into_future(self, backend: &Bkend) -> impl Future<Output = Self::Output> + Send + 'static {
133 let Then { first, create_next } = self;
134 let backend = backend.clone();
135 async move {
136 let output = BackendTask::into_future(first, &backend).await;
137 let next = create_next(output);
138 BackendTask::into_future(next, &backend).await
139 }
140 }
141 fn metadata() -> Vec<Self::MetadataType> {
142 let mut first = T::metadata();
143 let mut second = T2::metadata();
144 second.append(&mut first);
145 second
146 }
147}
148
149impl<Bkend, T, S, F, Ct> BackendStreamingTask<Bkend> for Then<T, F>
150where
151 Bkend: Clone + Sync + Send + 'static,
152 F: Sync + Send + 'static,
153 T: BackendTask<Bkend, MetadataType = Ct>,
154 S: BackendStreamingTask<Bkend, MetadataType = Ct>,
155 Ct: PartialEq,
156 F: FnOnce(T::Output) -> S,
157{
158 type Output = S::Output;
159 type MetadataType = Ct;
160 fn into_stream(
161 self,
162 backend: &Bkend,
163 ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
164 let Then { first, create_next } = self;
165 let backend = backend.clone();
166 let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
167 let handle = tokio::task::spawn(async move {
168 let seed = BackendTask::into_future(first, &backend).await;
169 let mut stream = create_next(seed).into_stream(&backend);
170 while let Some(item) = stream.next().await {
171 let _ = tx.send(item).await;
172 }
173 });
174 PanickingReceiverStream::new(rx, handle)
175 }
176 fn metadata() -> Vec<Self::MetadataType> {
177 let mut first = T::metadata();
178 let mut second = S::metadata();
179 second.append(&mut first);
180 second
181 }
182}