1use std::{
47 collections::HashMap,
48 sync::{Arc, Mutex, OnceLock},
49};
50
51use super::AsyncEngine;
52use async_trait::async_trait;
53use tokio::sync::oneshot;
54
55use super::{Data, Error, PipelineError, PipelineIO};
56
57mod sinks;
58mod sources;
59
60pub use sinks::{SegmentSink, ServiceBackend};
61pub use sources::{SegmentSource, ServiceFrontend};
62
63pub type Service<In, Out> = Arc<ServiceFrontend<In, Out>>;
64
65mod private {
66 pub struct Token;
67}
68
69#[async_trait]
72pub trait Source<T: PipelineIO>: Data {
73 async fn on_next(&self, data: T, _: private::Token) -> Result<(), Error>;
74
75 fn set_edge(&self, edge: Edge<T>, _: private::Token) -> Result<(), PipelineError>;
76
77 fn link<S: Sink<T> + 'static>(&self, sink: Arc<S>) -> Result<Arc<S>, PipelineError> {
78 let edge = Edge::new(sink.clone());
79 self.set_edge(edge, private::Token)?;
80 Ok(sink)
81 }
82}
83
84#[async_trait]
86pub trait Sink<T: PipelineIO>: Data {
87 async fn on_data(&self, data: T, _: private::Token) -> Result<(), Error>;
88}
89
90pub struct Edge<T: PipelineIO> {
92 downstream: Arc<dyn Sink<T>>,
93}
94
95impl<T: PipelineIO> Edge<T> {
96 fn new(downstream: Arc<dyn Sink<T>>) -> Self {
97 Edge { downstream }
98 }
99
100 async fn write(&self, data: T) -> Result<(), Error> {
101 self.downstream.on_data(data, private::Token).await
102 }
103}
104
105type NodeFn<In, Out> = Box<dyn Fn(In) -> Result<Out, Error> + Send + Sync>;
106
107#[async_trait]
122pub trait Operator<UpIn: PipelineIO, UpOut: PipelineIO, DownIn: PipelineIO, DownOut: PipelineIO>:
123 Data
124{
125 async fn generate(
129 &self,
130 req: UpIn,
131 next: Arc<dyn AsyncEngine<DownIn, DownOut, Error>>,
132 ) -> Result<UpOut, Error>;
133
134 fn into_operator(self: &Arc<Self>) -> Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>
135 where
136 Self: Sized,
137 {
138 PipelineOperator::new(self.clone())
139 }
140}
141
142pub struct PipelineOperatorForwardEdge<
145 UpIn: PipelineIO,
146 UpOut: PipelineIO,
147 DownIn: PipelineIO,
148 DownOut: PipelineIO,
149> {
150 parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
151}
152
153pub struct PipelineOperatorBackwardEdge<
156 UpIn: PipelineIO,
157 UpOut: PipelineIO,
158 DownIn: PipelineIO,
159 DownOut: PipelineIO,
160> {
161 parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
162}
163
164pub struct PipelineOperator<
167 UpIn: PipelineIO,
168 UpOut: PipelineIO,
169 DownIn: PipelineIO,
170 DownOut: PipelineIO,
171> {
172 operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>,
174
175 downstream: Arc<sources::Frontend<DownIn, DownOut>>,
178
179 upstream: sinks::SinkEdge<UpOut>,
182}
183
184impl<UpIn, UpOut, DownIn, DownOut> PipelineOperator<UpIn, UpOut, DownIn, DownOut>
185where
186 UpIn: PipelineIO,
187 UpOut: PipelineIO,
188 DownIn: PipelineIO,
189 DownOut: PipelineIO,
190{
191 pub fn new(operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>) -> Arc<Self> {
193 Arc::new(PipelineOperator {
194 operator,
195 downstream: Arc::new(sources::Frontend::default()),
196 upstream: sinks::SinkEdge::default(),
197 })
198 }
199
200 pub fn forward_edge(
202 self: &Arc<Self>,
203 ) -> Arc<PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>> {
204 Arc::new(PipelineOperatorForwardEdge {
205 parent: self.clone(),
206 })
207 }
208
209 pub fn backward_edge(
211 self: &Arc<Self>,
212 ) -> Arc<PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>> {
213 Arc::new(PipelineOperatorBackwardEdge {
214 parent: self.clone(),
215 })
216 }
217}
218
219#[async_trait]
221impl<UpIn, UpOut, DownIn, DownOut> AsyncEngine<UpIn, UpOut, Error>
222 for PipelineOperator<UpIn, UpOut, DownIn, DownOut>
223where
224 UpIn: PipelineIO + Sync,
225 DownIn: PipelineIO + Sync,
226 DownOut: PipelineIO,
227 UpOut: PipelineIO,
228{
229 async fn generate(&self, req: UpIn) -> Result<UpOut, Error> {
230 self.operator.generate(req, self.downstream.clone()).await
231 }
232}
233
234#[async_trait]
235impl<UpIn, UpOut, DownIn, DownOut> Sink<UpIn>
236 for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
237where
238 UpIn: PipelineIO + Sync,
239 DownIn: PipelineIO + Sync,
240 DownOut: PipelineIO,
241 UpOut: PipelineIO,
242{
243 async fn on_data(&self, data: UpIn, _token: private::Token) -> Result<(), Error> {
244 let stream = self.parent.generate(data).await?;
245 self.parent.upstream.on_next(stream, private::Token).await
246 }
247}
248
249#[async_trait]
250impl<UpIn, UpOut, DownIn, DownOut> Source<DownIn>
251 for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
252where
253 UpIn: PipelineIO,
254 DownIn: PipelineIO,
255 DownOut: PipelineIO,
256 UpOut: PipelineIO,
257{
258 async fn on_next(&self, data: DownIn, token: private::Token) -> Result<(), Error> {
259 self.parent.downstream.on_next(data, token).await
260 }
261
262 fn set_edge(&self, edge: Edge<DownIn>, token: private::Token) -> Result<(), PipelineError> {
263 self.parent.downstream.set_edge(edge, token)
264 }
265}
266
267#[async_trait]
268impl<UpIn, UpOut, DownIn, DownOut> Sink<DownOut>
269 for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
270where
271 UpIn: PipelineIO,
272 DownIn: PipelineIO,
273 DownOut: PipelineIO,
274 UpOut: PipelineIO,
275{
276 async fn on_data(&self, data: DownOut, token: private::Token) -> Result<(), Error> {
277 self.parent.downstream.on_data(data, token).await
278 }
279}
280
281#[async_trait]
282impl<UpIn, UpOut, DownIn, DownOut> Source<UpOut>
283 for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
284where
285 UpIn: PipelineIO,
286 DownIn: PipelineIO,
287 DownOut: PipelineIO,
288 UpOut: PipelineIO,
289{
290 async fn on_next(&self, data: UpOut, token: private::Token) -> Result<(), Error> {
291 self.parent.upstream.on_next(data, token).await
292 }
293
294 fn set_edge(&self, edge: Edge<UpOut>, token: private::Token) -> Result<(), PipelineError> {
295 self.parent.upstream.set_edge(edge, token)
296 }
297}
298
299pub struct PipelineNode<In: PipelineIO, Out: PipelineIO> {
300 edge: OnceLock<Edge<Out>>,
301 map_fn: NodeFn<In, Out>,
302}
303
304impl<In: PipelineIO, Out: PipelineIO> PipelineNode<In, Out> {
305 pub fn new(map_fn: NodeFn<In, Out>) -> Arc<Self> {
306 Arc::new(PipelineNode::<In, Out> {
307 edge: OnceLock::new(),
308 map_fn,
309 })
310 }
311}
312
313#[async_trait]
314impl<In: PipelineIO, Out: PipelineIO> Source<Out> for PipelineNode<In, Out> {
315 async fn on_next(&self, data: Out, _: private::Token) -> Result<(), Error> {
316 self.edge
317 .get()
318 .ok_or(PipelineError::NoEdge)?
319 .write(data)
320 .await
321 }
322
323 fn set_edge(&self, edge: Edge<Out>, _: private::Token) -> Result<(), PipelineError> {
324 self.edge
325 .set(edge)
326 .map_err(|_| PipelineError::EdgeAlreadySet)?;
327
328 Ok(())
329 }
330}
331
332#[async_trait]
333impl<In: PipelineIO, Out: PipelineIO> Sink<In> for PipelineNode<In, Out> {
334 async fn on_data(&self, data: In, _: private::Token) -> Result<(), Error> {
335 self.on_next((self.map_fn)(data)?, private::Token).await
336 }
337}
338
339#[cfg(test)]
340mod tests {
341
342 use super::*;
343 use crate::pipeline::*;
344
345 #[tokio::test]
346 async fn test_pipeline_source_no_edge() {
347 let source = ServiceFrontend::<SingleIn<()>, ManyOut<()>>::new();
348 let stream = source.generate(().into()).await;
349 assert!(stream.is_err());
350 }
351}