1use std::{
47 collections::HashMap,
48 sync::{Arc, Mutex, OnceLock},
49};
50
51use super::AsyncEngine;
52use async_trait::async_trait;
53use tokio::sync::oneshot;
54
55use super::{Data, Error, PipelineError, PipelineIO};
56
57mod sinks;
58mod sources;
59
60pub use sinks::{SegmentSink, ServiceBackend};
61pub use sources::{SegmentSource, ServiceFrontend};
62
63pub type Service<In, Out> = Arc<ServiceFrontend<In, Out>>;
64
65mod private {
66 pub struct Token;
67}
68
69#[async_trait]
73pub trait Source<T: PipelineIO>: Data {
74 async fn on_next(&self, data: T, _: private::Token) -> Result<(), Error>;
75
76 fn set_edge(&self, edge: Edge<T>, _: private::Token) -> Result<(), PipelineError>;
77
78 fn link<S: Sink<T> + 'static>(&self, sink: Arc<S>) -> Result<Arc<S>, PipelineError> {
79 let edge = Edge::new(sink.clone());
80 self.set_edge(edge, private::Token)?;
81 Ok(sink)
82 }
83}
84
85#[async_trait]
87pub trait Sink<T: PipelineIO>: Data {
88 async fn on_data(&self, data: T, _: private::Token) -> Result<(), Error>;
89}
90
91pub struct Edge<T: PipelineIO> {
93 downstream: Arc<dyn Sink<T>>,
94}
95
96impl<T: PipelineIO> Edge<T> {
97 fn new(downstream: Arc<dyn Sink<T>>) -> Self {
98 Edge { downstream }
99 }
100
101 async fn write(&self, data: T) -> Result<(), Error> {
102 self.downstream.on_data(data, private::Token).await
103 }
104}
105
106type NodeFn<In, Out> = Box<dyn Fn(In) -> Result<Out, Error> + Send + Sync>;
107
108#[async_trait]
123pub trait Operator<UpIn: PipelineIO, UpOut: PipelineIO, DownIn: PipelineIO, DownOut: PipelineIO>:
124 Data
125{
126 async fn generate(
130 &self,
131 req: UpIn,
132 next: Arc<dyn AsyncEngine<DownIn, DownOut, Error>>,
133 ) -> Result<UpOut, Error>;
134
135 fn into_operator(self: &Arc<Self>) -> Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>
136 where
137 Self: Sized,
138 {
139 PipelineOperator::new(self.clone())
140 }
141}
142
143pub struct PipelineOperatorForwardEdge<
146 UpIn: PipelineIO,
147 UpOut: PipelineIO,
148 DownIn: PipelineIO,
149 DownOut: PipelineIO,
150> {
151 parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
152}
153
154pub struct PipelineOperatorBackwardEdge<
157 UpIn: PipelineIO,
158 UpOut: PipelineIO,
159 DownIn: PipelineIO,
160 DownOut: PipelineIO,
161> {
162 parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
163}
164
165pub struct PipelineOperator<
168 UpIn: PipelineIO,
169 UpOut: PipelineIO,
170 DownIn: PipelineIO,
171 DownOut: PipelineIO,
172> {
173 operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>,
175
176 downstream: Arc<sources::Frontend<DownIn, DownOut>>,
179
180 upstream: sinks::SinkEdge<UpOut>,
183}
184
185impl<UpIn, UpOut, DownIn, DownOut> PipelineOperator<UpIn, UpOut, DownIn, DownOut>
186where
187 UpIn: PipelineIO,
188 UpOut: PipelineIO,
189 DownIn: PipelineIO,
190 DownOut: PipelineIO,
191{
192 pub fn new(operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>) -> Arc<Self> {
194 Arc::new(PipelineOperator {
195 operator,
196 downstream: Arc::new(sources::Frontend::default()),
197 upstream: sinks::SinkEdge::default(),
198 })
199 }
200
201 pub fn forward_edge(
203 self: &Arc<Self>,
204 ) -> Arc<PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>> {
205 Arc::new(PipelineOperatorForwardEdge {
206 parent: self.clone(),
207 })
208 }
209
210 pub fn backward_edge(
212 self: &Arc<Self>,
213 ) -> Arc<PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>> {
214 Arc::new(PipelineOperatorBackwardEdge {
215 parent: self.clone(),
216 })
217 }
218}
219
220#[async_trait]
222impl<UpIn, UpOut, DownIn, DownOut> AsyncEngine<UpIn, UpOut, Error>
223 for PipelineOperator<UpIn, UpOut, DownIn, DownOut>
224where
225 UpIn: PipelineIO,
226 DownIn: PipelineIO,
227 DownOut: PipelineIO,
228 UpOut: PipelineIO,
229{
230 async fn generate(&self, req: UpIn) -> Result<UpOut, Error> {
231 self.operator.generate(req, self.downstream.clone()).await
232 }
233}
234
235#[async_trait]
236impl<UpIn, UpOut, DownIn, DownOut> Sink<UpIn>
237 for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
238where
239 UpIn: PipelineIO,
240 DownIn: PipelineIO,
241 DownOut: PipelineIO,
242 UpOut: PipelineIO,
243{
244 async fn on_data(&self, data: UpIn, _token: private::Token) -> Result<(), Error> {
245 let stream = self.parent.generate(data).await?;
246 self.parent.upstream.on_next(stream, private::Token).await
247 }
248}
249
250#[async_trait]
251impl<UpIn, UpOut, DownIn, DownOut> Source<DownIn>
252 for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
253where
254 UpIn: PipelineIO,
255 DownIn: PipelineIO,
256 DownOut: PipelineIO,
257 UpOut: PipelineIO,
258{
259 async fn on_next(&self, data: DownIn, token: private::Token) -> Result<(), Error> {
260 self.parent.downstream.on_next(data, token).await
261 }
262
263 fn set_edge(&self, edge: Edge<DownIn>, token: private::Token) -> Result<(), PipelineError> {
264 self.parent.downstream.set_edge(edge, token)
265 }
266}
267
268#[async_trait]
269impl<UpIn, UpOut, DownIn, DownOut> Sink<DownOut>
270 for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
271where
272 UpIn: PipelineIO,
273 DownIn: PipelineIO,
274 DownOut: PipelineIO,
275 UpOut: PipelineIO,
276{
277 async fn on_data(&self, data: DownOut, token: private::Token) -> Result<(), Error> {
278 self.parent.downstream.on_data(data, token).await
279 }
280}
281
282#[async_trait]
283impl<UpIn, UpOut, DownIn, DownOut> Source<UpOut>
284 for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
285where
286 UpIn: PipelineIO,
287 DownIn: PipelineIO,
288 DownOut: PipelineIO,
289 UpOut: PipelineIO,
290{
291 async fn on_next(&self, data: UpOut, token: private::Token) -> Result<(), Error> {
292 self.parent.upstream.on_next(data, token).await
293 }
294
295 fn set_edge(&self, edge: Edge<UpOut>, token: private::Token) -> Result<(), PipelineError> {
296 self.parent.upstream.set_edge(edge, token)
297 }
298}
299
300pub struct PipelineNode<In: PipelineIO, Out: PipelineIO> {
301 edge: OnceLock<Edge<Out>>,
302 map_fn: NodeFn<In, Out>,
303}
304
305impl<In: PipelineIO, Out: PipelineIO> PipelineNode<In, Out> {
306 pub fn new(map_fn: NodeFn<In, Out>) -> Arc<Self> {
307 Arc::new(PipelineNode::<In, Out> {
308 edge: OnceLock::new(),
309 map_fn,
310 })
311 }
312}
313
314#[async_trait]
315impl<In: PipelineIO, Out: PipelineIO> Source<Out> for PipelineNode<In, Out> {
316 async fn on_next(&self, data: Out, _: private::Token) -> Result<(), Error> {
317 self.edge
318 .get()
319 .ok_or(PipelineError::NoEdge)?
320 .write(data)
321 .await
322 }
323
324 fn set_edge(&self, edge: Edge<Out>, _: private::Token) -> Result<(), PipelineError> {
325 self.edge
326 .set(edge)
327 .map_err(|_| PipelineError::EdgeAlreadySet)?;
328
329 Ok(())
330 }
331}
332
333#[async_trait]
334impl<In: PipelineIO, Out: PipelineIO> Sink<In> for PipelineNode<In, Out> {
335 async fn on_data(&self, data: In, _: private::Token) -> Result<(), Error> {
336 self.on_next((self.map_fn)(data)?, private::Token).await
337 }
338}
339
340#[cfg(test)]
341mod tests {
342
343 use super::*;
344 use crate::pipeline::*;
345
346 #[tokio::test]
347 async fn test_pipeline_source_no_edge() {
348 let source = ServiceFrontend::<SingleIn<()>, ManyOut<()>>::new();
349 let stream = source.generate(().into()).await;
350 assert!(stream.is_err());
351 }
352}