1use crate::{
2 BackendStreamingTask, BackendTask, DEFAULT_STREAM_CHANNEL_SIZE, OptDebug, OptPartialEq,
3 PanickingReceiverStream,
4};
5use futures::{Stream, StreamExt};
6use std::fmt::Debug;
7use std::future::Future;
8
9impl<Bkend, T: BackendTask<Bkend>> BackendTaskExt<Bkend> for T {}
10impl<Bkend, T: BackendTask<Bkend, Output = Result<O, E>>, O, E> TryBackendTaskExt<Bkend> for T {
11 type Error = E;
12 type Ok = O;
13}
14pub trait MapFn<T>: OptPartialEq + OptDebug {
18 type Output;
19 fn apply(self, input: T) -> Self::Output;
20}
21#[cfg(all(not(feature = "task-equality"), not(feature = "task-debug")))]
22impl<T, F, O> MapFn<T> for F
23where
24 F: FnOnce(T) -> O,
25{
26 type Output = O;
27 fn apply(self, input: T) -> Self::Output {
28 self(input)
29 }
30}
31
32pub trait TryBackendTaskExt<Bkend>: BackendTask<Bkend> {
33 type Error;
34 type Ok;
35 fn map_stream<S, F>(self, create_next: F) -> Map<Self, F>
36 where
37 Self: Sized,
38 S: BackendStreamingTask<Bkend>,
39 F: MapFn<Self::Ok, Output = S>,
40 {
41 Map {
42 first: self,
43 create_next,
44 }
45 }
46}
47pub trait BackendTaskExt<Bkend>: BackendTask<Bkend> {
48 fn then<T, F>(self, create_next: F) -> Then<Self, F>
49 where
50 Self: Sized,
51 T: BackendTask<Bkend>,
52 F: FnOnce(Self::Output) -> T,
53 {
54 Then {
55 first: self,
56 create_next,
57 }
58 }
59 fn then_stream<S, F>(self, create_next: F) -> Then<Self, F>
60 where
61 Self: Sized,
62 S: BackendStreamingTask<Bkend>,
63 F: FnOnce(Self::Output) -> S,
64 {
65 Then {
66 first: self,
67 create_next,
68 }
69 }
70}
71
/// Combinator that stores a first task together with the function used to
/// build a follow-up task from its result.
///
/// `PartialEq` compares both fields and `Debug` prints both fields; the
/// derives only apply when `T` and `F` implement the respective trait.
#[derive(PartialEq, Debug)]
pub struct Map<T, F> {
    /// Task that is run first.
    first: T,
    /// Function that creates the next task from the first task's result.
    create_next: F,
}
99
100impl<Bkend, T, S, F, Ct, O, E> BackendStreamingTask<Bkend> for Map<T, F>
101where
102 Bkend: Clone + Sync + Send + 'static,
103 F: Sync + Send + 'static,
104 T: BackendTask<Bkend, MetadataType = Ct, Output = std::result::Result<O, E>>,
105 S: BackendStreamingTask<Bkend, MetadataType = Ct>,
106 Ct: PartialEq,
107 F: MapFn<O, Output = S>,
108 E: Send + 'static,
109 O: Send,
110{
111 type Output = std::result::Result<S::Output, E>;
112 type MetadataType = Ct;
113 fn into_stream(
114 self,
115 backend: &Bkend,
116 ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
117 let Map {
118 first, create_next, ..
119 } = self;
120 let backend = backend.clone();
121 let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
122 let handle = tokio::task::spawn(async move {
123 let seed = BackendTask::into_future(first, &backend).await;
124 match seed {
125 Ok(seed) => {
126 let mut stream = create_next.apply(seed).into_stream(&backend);
127 while let Some(item) = stream.next().await {
128 let _ = tx.send(Ok(item)).await;
129 }
130 }
131 Err(e) => {
132 let _ = tx.send(Err(e)).await;
133 }
134 }
135 });
136 PanickingReceiverStream::new(rx, handle)
137 }
138 fn metadata() -> Vec<Self::MetadataType> {
139 let mut first = T::metadata();
140 let mut second = S::metadata();
141 second.append(&mut first);
142 second
143 }
144}
145
/// Combinator that stores a first task together with the function used to
/// build the follow-up task from the first task's output.
#[derive(Debug, PartialEq)]
pub struct Then<T, F> {
    /// Task that is run first.
    first: T,
    /// Function that creates the next task from the first task's output.
    create_next: F,
}
151
152impl<Bkend, T, T2, F, Ct> BackendTask<Bkend> for Then<T, F>
153where
154 Bkend: Clone + Send + 'static,
155 F: Sync + Send + 'static,
156 T: BackendTask<Bkend, MetadataType = Ct>,
157 T2: BackendTask<Bkend, MetadataType = Ct>,
158 Ct: PartialEq,
159 F: MapFn<T::Output, Output = T2>,
160{
161 type Output = T2::Output;
162 type MetadataType = Ct;
163 fn into_future(self, backend: &Bkend) -> impl Future<Output = Self::Output> + Send + 'static {
164 let Then { first, create_next } = self;
165 let backend = backend.clone();
166 async move {
167 let output = BackendTask::into_future(first, &backend).await;
168 let next = create_next.apply(output);
169 BackendTask::into_future(next, &backend).await
170 }
171 }
172 fn metadata() -> Vec<Self::MetadataType> {
173 let mut first = T::metadata();
174 let mut second = T2::metadata();
175 second.append(&mut first);
176 second
177 }
178}
179
180impl<Bkend, T, S, F, Ct> BackendStreamingTask<Bkend> for Then<T, F>
181where
182 Bkend: Clone + Sync + Send + 'static,
183 F: Sync + Send + 'static,
184 T: BackendTask<Bkend, MetadataType = Ct>,
185 S: BackendStreamingTask<Bkend, MetadataType = Ct>,
186 Ct: PartialEq,
187 F: MapFn<T::Output, Output = S>,
188{
189 type Output = S::Output;
190 type MetadataType = Ct;
191 fn into_stream(
192 self,
193 backend: &Bkend,
194 ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
195 let Then { first, create_next } = self;
196 let backend = backend.clone();
197 let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
198 let handle = tokio::task::spawn(async move {
199 let seed = BackendTask::into_future(first, &backend).await;
200 let mut stream = create_next.apply(seed).into_stream(&backend);
201 while let Some(item) = stream.next().await {
202 let _ = tx.send(item).await;
203 }
204 });
205 PanickingReceiverStream::new(rx, handle)
206 }
207 fn metadata() -> Vec<Self::MetadataType> {
208 let mut first = T::metadata();
209 let mut second = S::metadata();
210 second.append(&mut first);
211 second
212 }
213}