1use crate::backend::TaskSink;
50use crate::error::BoxDynError;
51use crate::features_table;
52use crate::task::Task;
53use crate::{backend::Backend, backend::codec::Codec, worker::context::WorkerContext};
54use futures_sink::Sink;
55use futures_util::stream::{once, select};
56use futures_util::{SinkExt, Stream, TryStreamExt};
57use futures_util::{StreamExt, stream::BoxStream};
58use std::fmt;
59use std::fmt::Debug;
60use std::marker::PhantomData;
61use std::ops::{Deref, DerefMut};
62
/// Couples a source (`from`) with a destination backend (`into`).
///
/// The `Args` and `Ctx` type parameters are not stored; they are carried only
/// through a [`PhantomData`] marker.
#[doc = features_table! {
    setup = unreachable!();,
    TaskSink => supported("Ability to push new tasks", false),
    InheritsFeatures => limited("Inherits features from the underlying backend", false),
}]
pub struct Pipe<S, Into, Args, Ctx> {
    /// Source half of the pipe (the `stream` argument of `Pipe::new`).
    pub(crate) from: S,
    /// Destination half of the pipe (the `backend` argument of `Pipe::new`).
    pub(crate) into: Into,
    /// Zero-sized marker tying `Args` and `Ctx` to the type without storing values.
    pub(crate) _req: PhantomData<(Args, Ctx)>,
}
74
75impl<S: Clone, Into: Clone, Args, Ctx> Clone for Pipe<S, Into, Args, Ctx> {
76 fn clone(&self) -> Self {
77 Pipe {
78 from: self.from.clone(),
79 into: self.into.clone(),
80 _req: PhantomData,
81 }
82 }
83}
84
85impl<S, Into, Args, Ctx> Deref for Pipe<S, Into, Args, Ctx> {
86 type Target = Into;
87
88 fn deref(&self) -> &Self::Target {
89 &self.into
90 }
91}
92
93impl<S, Into, Args, Ctx> DerefMut for Pipe<S, Into, Args, Ctx> {
94 fn deref_mut(&mut self) -> &mut Self::Target {
95 &mut self.into
96 }
97}
98
99impl<S, Into, Args, Ctx> Pipe<S, Into, Args, Ctx> {
100 pub fn new(stream: S, backend: Into) -> Self {
102 Pipe {
103 from: stream,
104 into: backend,
105 _req: PhantomData,
106 }
107 }
108}
109
110impl<S: fmt::Debug, Into: fmt::Debug, Args, Ctx> fmt::Debug for Pipe<S, Into, Args, Ctx> {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 f.debug_struct("Pipe")
113 .field("inner", &self.from)
114 .field("into", &self.into)
115 .finish()
116 }
117}
118
119impl<Args, Ctx, S, TSink, Err> Backend for Pipe<S, TSink, Args, Ctx>
120where
121 S: Stream<Item = Result<Args, Err>> + Send + 'static,
122 TSink: Backend<Args = Args, Context = Ctx>
123 + TaskSink<Args>
124 + Clone
125 + Unpin
126 + Send
127 + 'static
128 + Sink<Task<TSink::Compact, Ctx, TSink::IdType>>,
129 <TSink as Backend>::Error: std::error::Error + Send + Sync + 'static,
130 TSink::Beat: Send + 'static,
131 TSink::IdType: Send + Clone + 'static,
132 TSink::Stream: Send + 'static,
133 Args: Send + 'static,
134 Ctx: Send + 'static + Default,
135 Err: std::error::Error + Send + Sync + 'static,
136 <TSink as Sink<Task<TSink::Compact, Ctx, TSink::IdType>>>::Error:
137 std::error::Error + Send + Sync + 'static,
138 <<TSink as Backend>::Codec as Codec<Args>>::Error: std::error::Error + Send + Sync + 'static,
139 TSink::Compact: Send,
140{
141 type Args = Args;
142
143 type IdType = TSink::IdType;
144
145 type Context = Ctx;
146
147 type Stream = BoxStream<'static, Result<Option<Task<Args, Ctx, Self::IdType>>, PipeError>>;
148
149 type Layer = TSink::Layer;
150
151 type Beat = BoxStream<'static, Result<(), PipeError>>;
152
153 type Error = PipeError;
154
155 type Codec = TSink::Codec;
156
157 type Compact = TSink::Compact;
158
159 fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
160 self.into
161 .heartbeat(worker)
162 .map_err(|e| PipeError::Inner(e.into()))
163 .boxed()
164 }
165
166 fn middleware(&self) -> Self::Layer {
167 self.into.middleware()
168 }
169
170 fn poll(self, worker: &WorkerContext) -> Self::Stream {
171 let mut sink = self
172 .into
173 .clone()
174 .sink_map_err(|e| PipeError::Inner(e.into()));
175
176 let mut sink_stream = self
177 .from
178 .map_err(|e| PipeError::Inner(e.into()))
179 .map_ok(|s| {
180 Task::new(s)
181 .try_map(|s| TSink::Codec::encode(&s).map_err(|e| PipeError::Inner(e.into())))
182 })
183 .map(|t| t.and_then(|t| t))
184 .boxed();
185
186 let sender_stream = self.into.poll(worker);
187 select(
188 once(async move {
189 let fut = sink.send_all(&mut sink_stream);
190 fut.await.map_err(|e| PipeError::Inner(e.into()))?;
191 Ok(None)
192 }),
193 sender_stream.map_err(|e| PipeError::Inner(e.into())),
194 )
195 .boxed()
196 }
197}
198
199pub trait PipeExt<B, Args, Ctx>
201where
202 Self: Sized,
203{
204 fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx>;
206}
207
208impl<B, Args, Ctx, Err, S> PipeExt<B, Args, Ctx> for S
209where
210 S: Stream<Item = Result<Args, Err>> + Send + 'static,
211 <B as Backend>::Error: Into<BoxDynError> + Send + Sync + 'static,
212 B: Backend<Args = Args> + TaskSink<Args>,
213{
214 fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx> {
215 Pipe::new(self, backend)
216 }
217}
218
/// Error type used by the [`Pipe`] backend for its stream, heartbeat and sink
/// failures.
#[derive(Debug, thiserror::Error)]
pub enum PipeError {
    /// The inner stream provided a `None`.
    // NOTE(review): this variant is not constructed anywhere in the code visible
    // here; confirm whether it is still produced elsewhere.
    #[error("The inner stream provided a None")]
    EmptyStream,
    /// A boxed underlying error; its `Display` output is appended to the message.
    #[error("The inner stream error: {0}")]
    Inner(BoxDynError),
}
229
#[cfg(all(test, feature = "json"))]
mod tests {
    use std::{io, time::Duration};

    use futures_util::stream;

    use crate::{
        backend::json::JsonStorage,
        error::BoxDynError,
        worker::{
            builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
        },
    };

    use super::*;

    /// Number of items fed through the pipe; the handler stops the worker on
    /// the last one.
    const ITEMS: u32 = 10;

    /// Pipes a finite stream of integers into a temporary `JsonStorage` and
    /// runs a worker over it until the handler requests a stop.
    #[tokio::test]
    async fn basic_worker() {
        /// Sleeps for one second per item; on the final item it stops the
        /// worker and returns an error.
        async fn task(job: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if job != ITEMS - 1 {
                return Ok(());
            }
            ctx.stop().unwrap();
            Err("Graceful Exit".into())
        }

        let source = stream::iter(0..ITEMS).map(Ok::<_, io::Error>);
        let storage = JsonStorage::new_temp().unwrap();
        let backend = Pipe::new(source, storage);

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .on_event(|_ctx, ev| println!("On Event = {:?}", ev))
            .build(task);
        worker.run().await.unwrap();
    }
}