1use std::{
35 collections::HashMap,
36 sync::{Arc, Mutex, OnceLock},
37};
38
39use super::AsyncEngine;
40use async_trait::async_trait;
41use tokio::sync::oneshot;
42
43use super::{Data, Error, PipelineError, PipelineIO};
44
45mod sinks;
46mod sources;
47
48pub use sinks::{SegmentSink, ServiceBackend};
49pub use sources::{SegmentSource, ServiceFrontend};
50
51pub type Service<In, Out> = Arc<ServiceFrontend<In, Out>>;
52
/// Home of the token type that gates the edge-level trait methods.
mod private {
    /// Marker value required by `on_next`, `set_edge` and `on_data`.
    ///
    /// The enclosing module is not public, so only code that is able to name
    /// `private::Token` can construct one and therefore call those methods.
    pub struct Token;
}
56
57#[async_trait]
60pub trait Source<T: PipelineIO>: Data {
61 async fn on_next(&self, data: T, _: private::Token) -> Result<(), Error>;
62
63 fn set_edge(&self, edge: Edge<T>, _: private::Token) -> Result<(), PipelineError>;
64
65 fn link<S: Sink<T> + 'static>(&self, sink: Arc<S>) -> Result<Arc<S>, PipelineError> {
66 let edge = Edge::new(sink.clone());
67 self.set_edge(edge, private::Token)?;
68 Ok(sink)
69 }
70}
71
72#[async_trait]
74pub trait Sink<T: PipelineIO>: Data {
75 async fn on_data(&self, data: T, _: private::Token) -> Result<(), Error>;
76}
77
78pub struct Edge<T: PipelineIO> {
80 downstream: Arc<dyn Sink<T>>,
81}
82
83impl<T: PipelineIO> Edge<T> {
84 fn new(downstream: Arc<dyn Sink<T>>) -> Self {
85 Edge { downstream }
86 }
87
88 async fn write(&self, data: T) -> Result<(), Error> {
89 self.downstream.on_data(data, private::Token).await
90 }
91}
92
93type NodeFn<In, Out> = Box<dyn Fn(In) -> Result<Out, Error> + Send + Sync>;
94
95#[async_trait]
110pub trait Operator<UpIn: PipelineIO, UpOut: PipelineIO, DownIn: PipelineIO, DownOut: PipelineIO>:
111 Data
112{
113 async fn generate(
117 &self,
118 req: UpIn,
119 next: Arc<dyn AsyncEngine<DownIn, DownOut, Error>>,
120 ) -> Result<UpOut, Error>;
121
122 fn into_operator(self: &Arc<Self>) -> Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>
123 where
124 Self: Sized,
125 {
126 PipelineOperator::new(self.clone())
127 }
128}
129
130pub struct PipelineOperatorForwardEdge<
133 UpIn: PipelineIO,
134 UpOut: PipelineIO,
135 DownIn: PipelineIO,
136 DownOut: PipelineIO,
137> {
138 parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
139}
140
141pub struct PipelineOperatorBackwardEdge<
144 UpIn: PipelineIO,
145 UpOut: PipelineIO,
146 DownIn: PipelineIO,
147 DownOut: PipelineIO,
148> {
149 parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
150}
151
152pub struct PipelineOperator<
155 UpIn: PipelineIO,
156 UpOut: PipelineIO,
157 DownIn: PipelineIO,
158 DownOut: PipelineIO,
159> {
160 operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>,
162
163 downstream: Arc<sources::Frontend<DownIn, DownOut>>,
166
167 upstream: sinks::SinkEdge<UpOut>,
170}
171
172impl<UpIn, UpOut, DownIn, DownOut> PipelineOperator<UpIn, UpOut, DownIn, DownOut>
173where
174 UpIn: PipelineIO,
175 UpOut: PipelineIO,
176 DownIn: PipelineIO,
177 DownOut: PipelineIO,
178{
179 pub fn new(operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>) -> Arc<Self> {
181 Arc::new(PipelineOperator {
182 operator,
183 downstream: Arc::new(sources::Frontend::default()),
184 upstream: sinks::SinkEdge::default(),
185 })
186 }
187
188 pub fn forward_edge(
190 self: &Arc<Self>,
191 ) -> Arc<PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>> {
192 Arc::new(PipelineOperatorForwardEdge {
193 parent: self.clone(),
194 })
195 }
196
197 pub fn backward_edge(
199 self: &Arc<Self>,
200 ) -> Arc<PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>> {
201 Arc::new(PipelineOperatorBackwardEdge {
202 parent: self.clone(),
203 })
204 }
205}
206
207#[async_trait]
209impl<UpIn, UpOut, DownIn, DownOut> AsyncEngine<UpIn, UpOut, Error>
210 for PipelineOperator<UpIn, UpOut, DownIn, DownOut>
211where
212 UpIn: PipelineIO + Sync,
213 DownIn: PipelineIO + Sync,
214 DownOut: PipelineIO,
215 UpOut: PipelineIO,
216{
217 async fn generate(&self, req: UpIn) -> Result<UpOut, Error> {
218 self.operator.generate(req, self.downstream.clone()).await
219 }
220}
221
222#[async_trait]
223impl<UpIn, UpOut, DownIn, DownOut> Sink<UpIn>
224 for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
225where
226 UpIn: PipelineIO + Sync,
227 DownIn: PipelineIO + Sync,
228 DownOut: PipelineIO,
229 UpOut: PipelineIO,
230{
231 async fn on_data(&self, data: UpIn, _token: private::Token) -> Result<(), Error> {
232 let stream = self.parent.generate(data).await?;
233 self.parent.upstream.on_next(stream, private::Token).await
234 }
235}
236
237#[async_trait]
238impl<UpIn, UpOut, DownIn, DownOut> Source<DownIn>
239 for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
240where
241 UpIn: PipelineIO,
242 DownIn: PipelineIO,
243 DownOut: PipelineIO,
244 UpOut: PipelineIO,
245{
246 async fn on_next(&self, data: DownIn, token: private::Token) -> Result<(), Error> {
247 self.parent.downstream.on_next(data, token).await
248 }
249
250 fn set_edge(&self, edge: Edge<DownIn>, token: private::Token) -> Result<(), PipelineError> {
251 self.parent.downstream.set_edge(edge, token)
252 }
253}
254
255#[async_trait]
256impl<UpIn, UpOut, DownIn, DownOut> Sink<DownOut>
257 for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
258where
259 UpIn: PipelineIO,
260 DownIn: PipelineIO,
261 DownOut: PipelineIO,
262 UpOut: PipelineIO,
263{
264 async fn on_data(&self, data: DownOut, token: private::Token) -> Result<(), Error> {
265 self.parent.downstream.on_data(data, token).await
266 }
267}
268
269#[async_trait]
270impl<UpIn, UpOut, DownIn, DownOut> Source<UpOut>
271 for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
272where
273 UpIn: PipelineIO,
274 DownIn: PipelineIO,
275 DownOut: PipelineIO,
276 UpOut: PipelineIO,
277{
278 async fn on_next(&self, data: UpOut, token: private::Token) -> Result<(), Error> {
279 self.parent.upstream.on_next(data, token).await
280 }
281
282 fn set_edge(&self, edge: Edge<UpOut>, token: private::Token) -> Result<(), PipelineError> {
283 self.parent.upstream.set_edge(edge, token)
284 }
285}
286
287pub struct PipelineNode<In: PipelineIO, Out: PipelineIO> {
288 edge: OnceLock<Edge<Out>>,
289 map_fn: NodeFn<In, Out>,
290}
291
292impl<In: PipelineIO, Out: PipelineIO> PipelineNode<In, Out> {
293 pub fn new(map_fn: NodeFn<In, Out>) -> Arc<Self> {
294 Arc::new(PipelineNode::<In, Out> {
295 edge: OnceLock::new(),
296 map_fn,
297 })
298 }
299}
300
301#[async_trait]
302impl<In: PipelineIO, Out: PipelineIO> Source<Out> for PipelineNode<In, Out> {
303 async fn on_next(&self, data: Out, _: private::Token) -> Result<(), Error> {
304 self.edge
305 .get()
306 .ok_or(PipelineError::NoEdge)?
307 .write(data)
308 .await
309 }
310
311 fn set_edge(&self, edge: Edge<Out>, _: private::Token) -> Result<(), PipelineError> {
312 self.edge
313 .set(edge)
314 .map_err(|_| PipelineError::EdgeAlreadySet)?;
315
316 Ok(())
317 }
318}
319
320#[async_trait]
321impl<In: PipelineIO, Out: PipelineIO> Sink<In> for PipelineNode<In, Out> {
322 async fn on_data(&self, data: In, _: private::Token) -> Result<(), Error> {
323 self.on_next((self.map_fn)(data)?, private::Token).await
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::*;

    /// A frontend that was never linked to anything must report an error from `generate`.
    #[tokio::test]
    async fn test_pipeline_source_no_edge() {
        let frontend = ServiceFrontend::<SingleIn<()>, ManyOut<()>>::new();

        let outcome = frontend.generate(().into()).await;

        assert!(outcome.is_err());
    }
}