1use crate::{BackendStreamingTask, BackendTask, DEFAULT_STREAM_CHANNEL_SIZE};
2use futures::{Stream, StreamExt};
3use std::{fmt::Debug, future::Future};
4use tokio_stream::wrappers::ReceiverStream;
5
6impl<Bkend, T: BackendTask<Bkend>> BackendTaskExt<Bkend> for T {}
7impl<Bkend, T: BackendTask<Bkend, Output = Result<O, E>>, O, E> TryBackendTaskExt<Bkend> for T {
8 type Error = E;
9 type Ok = O;
10}
11
12pub trait TryBackendTaskExt<Bkend>: BackendTask<Bkend> {
13 type Error;
14 type Ok;
15 fn map_stream<S, F>(self, create_next: F) -> Map<Self, F>
16 where
17 Self: Sized,
18 S: BackendStreamingTask<Bkend>,
19 F: FnOnce(Self::Ok) -> S,
20 {
21 Map {
22 first: self,
23 create_next,
24 }
25 }
26}
27pub trait BackendTaskExt<Bkend>: BackendTask<Bkend> {
28 fn then<T, F>(self, create_next: F) -> Then<Self, F>
29 where
30 Self: Sized,
31 T: BackendTask<Bkend>,
32 F: FnOnce(Self::Output) -> T,
33 {
34 Then {
35 first: self,
36 create_next,
37 }
38 }
39 fn then_stream<S, F>(self, create_next: F) -> Then<Self, F>
40 where
41 Self: Sized,
42 S: BackendStreamingTask<Bkend>,
43 F: FnOnce(Self::Output) -> S,
44 {
45 Then {
46 first: self,
47 create_next,
48 }
49 }
50}
51
52pub struct Map<T, F> {
53 first: T,
54 create_next: F,
55}
56
57impl<T, F> Debug for Map<T, F>
58where
59 T: Debug,
60{
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 f.debug_struct("Map")
63 .field("first", &self.first)
64 .field("create_next", &"..closure..")
66 .finish()
67 }
68}
69
70impl<Bkend, T, S, F, Ct, O, E> BackendStreamingTask<Bkend> for Map<T, F>
71where
72 Bkend: Clone + Sync + Send + 'static,
73 F: Sync + Send + 'static,
74 T: BackendTask<Bkend, MetadataType = Ct, Output = std::result::Result<O, E>>,
75 S: BackendStreamingTask<Bkend, MetadataType = Ct>,
76 Ct: PartialEq,
77 F: FnOnce(O) -> S,
78 E: Send + 'static,
79 O: Send,
80{
81 type Output = std::result::Result<S::Output, E>;
82 type MetadataType = Ct;
83 fn into_stream(
84 self,
85 backend: &Bkend,
86 ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
87 let Map { first, create_next } = self;
88 let backend = backend.clone();
89 let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
90 tokio::task::spawn(async move {
91 let seed = first.into_future(&backend).await;
92 match seed {
93 Ok(seed) => {
94 let mut stream = create_next(seed).into_stream(&backend);
95 while let Some(item) = stream.next().await {
96 let _ = tx.send(Ok(item)).await;
97 }
98 }
99 Err(e) => {
100 let _ = tx.send(Err(e)).await;
101 }
102 }
103 });
104 ReceiverStream::new(rx)
105 }
106 fn metadata() -> Vec<Self::MetadataType> {
107 let mut first = T::metadata();
108 let mut second = S::metadata();
109 second.append(&mut first);
110 second
111 }
112}
113
114pub struct Then<T, F> {
115 first: T,
116 create_next: F,
117}
118
119impl<Bkend, T, T2, F, Ct> BackendTask<Bkend> for Then<T, F>
120where
121 Bkend: Clone + Send + 'static,
122 F: Sync + Send + 'static,
123 T: BackendTask<Bkend, MetadataType = Ct>,
124 T2: BackendTask<Bkend, MetadataType = Ct>,
125 Ct: PartialEq,
126 F: FnOnce(T::Output) -> T2,
127{
128 type Output = T2::Output;
129 type MetadataType = Ct;
130 fn into_future(self, backend: &Bkend) -> impl Future<Output = Self::Output> + Send + 'static {
131 let Then { first, create_next } = self;
132 let backend = backend.clone();
133 async move {
134 let output = first.into_future(&backend).await;
135 let next = create_next(output);
136 next.into_future(&backend).await
137 }
138 }
139 fn metadata() -> Vec<Self::MetadataType> {
140 let mut first = T::metadata();
141 let mut second = T2::metadata();
142 second.append(&mut first);
143 second
144 }
145}
146
147impl<Bkend, T, S, F, Ct> BackendStreamingTask<Bkend> for Then<T, F>
148where
149 Bkend: Clone + Sync + Send + 'static,
150 F: Sync + Send + 'static,
151 T: BackendTask<Bkend, MetadataType = Ct>,
152 S: BackendStreamingTask<Bkend, MetadataType = Ct>,
153 Ct: PartialEq,
154 F: FnOnce(T::Output) -> S,
155{
156 type Output = S::Output;
157 type MetadataType = Ct;
158 fn into_stream(
159 self,
160 backend: &Bkend,
161 ) -> impl Stream<Item = Self::Output> + Send + Unpin + 'static {
162 let Then { first, create_next } = self;
163 let backend = backend.clone();
164 let (tx, rx) = tokio::sync::mpsc::channel(DEFAULT_STREAM_CHANNEL_SIZE);
165 tokio::task::spawn(async move {
166 let seed = first.into_future(&backend).await;
167 let mut stream = create_next(seed).into_stream(&backend);
168 while let Some(item) = stream.next().await {
169 let _ = tx.send(item).await;
170 }
171 });
172 ReceiverStream::new(rx)
173 }
174 fn metadata() -> Vec<Self::MetadataType> {
175 let mut first = T::metadata();
176 let mut second = S::metadata();
177 second.append(&mut first);
178 second
179 }
180}