1use crate::backend::{BackendExt, TaskSink};
49use crate::error::BoxDynError;
50use crate::features_table;
51use crate::task::Task;
52use crate::{backend::Backend, backend::codec::Codec, worker::context::WorkerContext};
53use futures_sink::Sink;
54use futures_util::stream::{once, select};
55use futures_util::{SinkExt, Stream, TryStreamExt};
56use futures_util::{StreamExt, stream::BoxStream};
57use std::fmt;
58use std::fmt::Debug;
59use std::marker::PhantomData;
60use std::ops::{Deref, DerefMut};
61
62#[doc = features_table! {
64 setup = "{ unreachable!() }",
65 TaskSink => supported("Ability to push new tasks", false),
66 InheritsFeatures => limited("Inherits features from the underlying backend", false),
67}]
68pub struct Pipe<S, Into, Args, Ctx> {
69 pub(crate) from: S,
70 pub(crate) into: Into,
71 pub(crate) _req: PhantomData<(Args, Ctx)>,
72}
73
74impl<S: Clone, Into: Clone, Args, Ctx> Clone for Pipe<S, Into, Args, Ctx> {
75 fn clone(&self) -> Self {
76 Self {
77 from: self.from.clone(),
78 into: self.into.clone(),
79 _req: PhantomData,
80 }
81 }
82}
83
84impl<S, Into, Args, Ctx> Deref for Pipe<S, Into, Args, Ctx> {
85 type Target = Into;
86
87 fn deref(&self) -> &Self::Target {
88 &self.into
89 }
90}
91
92impl<S, Into, Args, Ctx> DerefMut for Pipe<S, Into, Args, Ctx> {
93 fn deref_mut(&mut self) -> &mut Self::Target {
94 &mut self.into
95 }
96}
97
98impl<S, Into, Args, Ctx> Pipe<S, Into, Args, Ctx> {
99 pub fn new(stream: S, backend: Into) -> Self {
101 Self {
102 from: stream,
103 into: backend,
104 _req: PhantomData,
105 }
106 }
107}
108
109impl<S: fmt::Debug, Into: fmt::Debug, Args, Ctx> fmt::Debug for Pipe<S, Into, Args, Ctx> {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 f.debug_struct("Pipe")
112 .field("inner", &self.from)
113 .field("into", &self.into)
114 .finish()
115 }
116}
117
118impl<Args, Ctx, S, TSink, Err> Backend for Pipe<S, TSink, Args, Ctx>
119where
120 S: Stream<Item = Result<Args, Err>> + Send + 'static,
121 TSink: Backend<Args = Args, Context = Ctx>
122 + BackendExt
123 + TaskSink<Args>
124 + Clone
125 + Unpin
126 + Send
127 + 'static
128 + Sink<Task<TSink::Compact, Ctx, TSink::IdType>>,
129 <TSink as Backend>::Error: std::error::Error + Send + Sync + 'static,
130 TSink::Beat: Send + 'static,
131 TSink::IdType: Send + Clone + 'static,
132 TSink::Stream: Send + 'static,
133 Args: Send + 'static,
134 Ctx: Send + 'static + Default,
135 Err: std::error::Error + Send + Sync + 'static,
136 <TSink as Sink<Task<TSink::Compact, Ctx, TSink::IdType>>>::Error:
137 std::error::Error + Send + Sync + 'static,
138 <<TSink as BackendExt>::Codec as Codec<Args>>::Error: std::error::Error + Send + Sync + 'static,
139 TSink::Compact: Send,
140{
141 type Args = Args;
142
143 type IdType = TSink::IdType;
144
145 type Context = Ctx;
146
147 type Stream = BoxStream<'static, Result<Option<Task<Args, Ctx, Self::IdType>>, PipeError>>;
148
149 type Layer = TSink::Layer;
150
151 type Beat = BoxStream<'static, Result<(), PipeError>>;
152
153 type Error = PipeError;
154
155 fn heartbeat(&self, worker: &WorkerContext) -> Self::Beat {
156 self.into
157 .heartbeat(worker)
158 .map_err(|e| PipeError::Inner(e.into()))
159 .boxed()
160 }
161
162 fn middleware(&self) -> Self::Layer {
163 self.into.middleware()
164 }
165
166 fn poll(self, worker: &WorkerContext) -> Self::Stream {
167 let mut sink = self
168 .into
169 .clone()
170 .sink_map_err(|e| PipeError::Inner(e.into()));
171
172 let mut sink_stream = self
173 .from
174 .map_err(|e| PipeError::Inner(e.into()))
175 .map_ok(|s| {
176 Task::new(s)
177 .try_map(|s| TSink::Codec::encode(&s).map_err(|e| PipeError::Inner(e.into())))
178 })
179 .map(|t| t.and_then(|t| t))
180 .boxed();
181
182 let sender_stream = self.into.poll(worker);
183 select(
184 once(async move {
185 let fut = sink.send_all(&mut sink_stream);
186 fut.await.map_err(|e| PipeError::Inner(e.into()))?;
187 Ok(None)
188 }),
189 sender_stream.map_err(|e| PipeError::Inner(e.into())),
190 )
191 .boxed()
192 }
193}
194
195pub trait PipeExt<B, Args, Ctx>
197where
198 Self: Sized,
199{
200 fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx>;
202}
203
204impl<B, Args, Ctx, Err, S> PipeExt<B, Args, Ctx> for S
205where
206 S: Stream<Item = Result<Args, Err>> + Send + 'static,
207 <B as Backend>::Error: Into<BoxDynError> + Send + Sync + 'static,
208 B: Backend<Args = Args> + TaskSink<Args>,
209{
210 fn pipe_to(self, backend: B) -> Pipe<Self, B, Args, Ctx> {
211 Pipe::new(self, backend)
212 }
213}
214
215#[derive(Debug, thiserror::Error)]
217pub enum PipeError {
218 #[error("The inner stream provided a None")]
220 EmptyStream,
221 #[error("The inner stream error: {0}")]
223 Inner(BoxDynError),
224}
225
226#[cfg(test)]
227mod tests {
228 use std::{io, time::Duration};
229
230 use futures_util::stream;
231
232 use crate::{
233 backend::dequeue,
234 error::BoxDynError,
235 worker::{
236 builder::WorkerBuilder, context::WorkerContext, ext::event_listener::EventListenerExt,
237 },
238 };
239
240 use super::*;
241
242 const ITEMS: u32 = 10;
243
244 #[tokio::test]
245 async fn basic_worker() {
246 let stm = stream::iter(0..ITEMS).map(Ok::<_, io::Error>);
247 let in_memory = dequeue::backend::<u32>(Duration::from_secs(1));
248
249 let backend = Pipe::new(stm, in_memory);
250
251 async fn task(task: u32, ctx: WorkerContext) -> Result<(), BoxDynError> {
252 tokio::time::sleep(Duration::from_secs(1)).await;
253 if task == ITEMS - 1 {
254 ctx.stop().unwrap();
255 return Err("Graceful Exit".into());
256 }
257 Ok(())
258 }
259
260 let worker = WorkerBuilder::new("rango-tango")
261 .backend(backend)
262 .on_event(|_ctx, ev| {
263 println!("On Event = {ev:?}");
264 })
265 .build(task);
266 worker.run().await.unwrap();
267 }
268}