1use crate::{BackendStreamingTask, BackendTask, DEFAULT_STREAM_CHANNEL_SIZE};
2use futures::{Stream, StreamExt};
3use std::fmt::Debug;
4use std::future::Future;
5use tokio_stream::wrappers::ReceiverStream;
6
7impl<Bkend, T: BackendTask<Bkend>> BackendTaskExt<Bkend> for T {}
8impl<Bkend, T: BackendTask<Bkend, Output = Result<O, E>>, O, E> TryBackendTaskExt<Bkend> for T {
9 type Error = E;
10 type Ok = O;
11}
12
13pub trait TryBackendTaskExt<Bkend>: BackendTask<Bkend> {
14 type Error;
15 type Ok;
16 fn map_stream<S, F>(self, create_next: F) -> Map<Self, F>
17 where
18 Self: Sized,
19 S: BackendStreamingTask<Bkend>,
20 F: FnOnce(Self::Ok) -> S,
21 {
22 Map {
23 first: self,
24 create_next,
25 }
26 }
27}
28pub trait BackendTaskExt<Bkend>: BackendTask<Bkend> {
29 fn then<T, F>(self, create_next: F) -> Then<Self, F>
30 where
31 Self: Sized,
32 T: BackendTask<Bkend>,
33 F: FnOnce(Self::Output) -> T,
34 {
35 Then {
36 first: self,
37 create_next,
38 }
39 }
40 fn then_stream<S, F>(self, create_next: F) -> Then<Self, F>
41 where
42 Self: Sized,
43 S: BackendStreamingTask<Bkend>,
44 F: FnOnce(Self::Output) -> S,
45 {
46 Then {
47 first: self,
48 create_next,
49 }
50 }
51}
52
/// Combinator holding a first task together with a closure that builds a
/// follow-up task from the first task's success value.
pub struct Map<T, F> {
    /// Task that is run first.
    first: T,
    /// Closure that builds the follow-up task.
    create_next: F,
}

/// Manual `Debug` so that `F` (typically a closure) does not need to be
/// `Debug`; the closure field is rendered as a fixed placeholder string.
impl<T: Debug, F> Debug for Map<T, F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { first, .. } = self;
        let mut builder = f.debug_struct("Map");
        builder.field("first", first);
        builder.field("create_next", &"..closure..");
        builder.finish()
    }
}
70
71impl<Bkend, T, S, F, Ct, O, E> BackendStreamingTask<Bkend> for Map<T, F>
72where
73 Bkend: Clone + Sync + Send + 'static,
74 F: Sync + Send + 'static,
75 T: BackendTask<Bkend, MetadataType = Ct, Output = std::result::Result<O, E>>,
76 S: BackendStreamingTask<Bkend, MetadataType = Ct>,
77 Ct: PartialEq,
78 F: FnOnce(O) -> S,
79 E: Send + 'static,
80 O: Send,
81{
82 type Output = std::result::Result<S::Output, E>;
83 type MetadataType = Ct;
84 fn into_stream(
85 self,
86 backend: &Bkend,
87 ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
88 let Map { first, create_next } = self;
89 let backend = backend.clone();
90 let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
91 tokio::task::spawn(async move {
92 let seed = first.into_future(&backend).await;
93 match seed {
94 Ok(seed) => {
95 let mut stream = create_next(seed).into_stream(&backend);
96 while let Some(item) = stream.next().await {
97 let _ = tx.send(Ok(item)).await;
98 }
99 }
100 Err(e) => {
101 let _ = tx.send(Err(e)).await;
102 }
103 }
104 });
105 ReceiverStream::new(rx)
106 }
107 fn metadata() -> Vec<Self::MetadataType> {
108 let mut first = T::metadata();
109 let mut second = S::metadata();
110 second.append(&mut first);
111 second
112 }
113}
114
/// Combinator holding a first task together with a closure that builds a
/// follow-up task from the first task's output.
pub struct Then<T, F> {
    /// Task that is run first.
    first: T,
    /// Closure that builds the follow-up task.
    create_next: F,
}

/// Manual `Debug`, mirroring the one on `Map`: `F` (typically a closure) does
/// not need to be `Debug`, and the closure field is rendered as a fixed
/// placeholder string.
impl<T, F> Debug for Then<T, F>
where
    T: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Then")
            .field("first", &self.first)
            .field("create_next", &"..closure..")
            .finish()
    }
}
119
120impl<Bkend, T, T2, F, Ct> BackendTask<Bkend> for Then<T, F>
121where
122 Bkend: Clone + Send + 'static,
123 F: Sync + Send + 'static,
124 T: BackendTask<Bkend, MetadataType = Ct>,
125 T2: BackendTask<Bkend, MetadataType = Ct>,
126 Ct: PartialEq,
127 F: FnOnce(T::Output) -> T2,
128{
129 type Output = T2::Output;
130 type MetadataType = Ct;
131 fn into_future(self, backend: &Bkend) -> impl Future<Output = Self::Output> + Send + 'static {
132 let Then { first, create_next } = self;
133 let backend = backend.clone();
134 async move {
135 let output = first.into_future(&backend).await;
136 let next = create_next(output);
137 next.into_future(&backend).await
138 }
139 }
140 fn metadata() -> Vec<Self::MetadataType> {
141 let mut first = T::metadata();
142 let mut second = T2::metadata();
143 second.append(&mut first);
144 second
145 }
146}
147
148impl<Bkend, T, S, F, Ct> BackendStreamingTask<Bkend> for Then<T, F>
149where
150 Bkend: Clone + Sync + Send + 'static,
151 F: Sync + Send + 'static,
152 T: BackendTask<Bkend, MetadataType = Ct>,
153 S: BackendStreamingTask<Bkend, MetadataType = Ct>,
154 Ct: PartialEq,
155 F: FnOnce(T::Output) -> S,
156{
157 type Output = S::Output;
158 type MetadataType = Ct;
159 fn into_stream(
160 self,
161 backend: &Bkend,
162 ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
163 let Then { first, create_next } = self;
164 let backend = backend.clone();
165 let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
166 tokio::task::spawn(async move {
167 let seed = first.into_future(&backend).await;
168 let mut stream = create_next(seed).into_stream(&backend);
169 while let Some(item) = stream.next().await {
170 let _ = tx.send(item).await;
171 }
172 });
173 ReceiverStream::new(rx)
174 }
175 fn metadata() -> Vec<Self::MetadataType> {
176 let mut first = T::metadata();
177 let mut second = S::metadata();
178 second.append(&mut first);
179 second
180 }
181}