dynamo_runtime/pipeline/nodes/sources/
common.rs1use crate::engine::AsyncEngineContextProvider;
17
18use super::*;
19
/// Generates the constructor and pipeline trait implementations for a
/// frontend node type whose `inner` field is initialised from
/// `Frontend::default()`.
///
/// For the type name passed in, the macro emits:
/// - an inherent `new()` returning the node wrapped in an `Arc`;
/// - `Source<In>`, `Sink<Out>` and `AsyncEngine<In, Out, Error>` impls, each
///   of which simply forwards the call to `self.inner`.
///
/// The `Sink<Out>` impl is only provided when `Out` additionally implements
/// `AsyncEngineContextProvider`.
macro_rules! impl_frontend {
    ($name:ident) => {
        impl<In, Out> $name<In, Out>
        where
            In: PipelineIO,
            Out: PipelineIO,
        {
            /// Builds a new shared instance around a default `Frontend`.
            pub fn new() -> Arc<Self> {
                Arc::new(Self {
                    inner: Frontend::default(),
                })
            }
        }

        #[async_trait]
        impl<In, Out> Source<In> for $name<In, Out>
        where
            In: PipelineIO,
            Out: PipelineIO,
        {
            /// Forwards the incoming item to the wrapped frontend.
            async fn on_next(&self, item: In, token: private::Token) -> Result<(), Error> {
                self.inner.on_next(item, token).await
            }

            /// Forwards the edge registration to the wrapped frontend.
            fn set_edge(
                &self,
                downstream: Edge<In>,
                token: private::Token,
            ) -> Result<(), PipelineError> {
                self.inner.set_edge(downstream, token)
            }
        }

        #[async_trait]
        impl<In, Out> Sink<Out> for $name<In, Out>
        where
            In: PipelineIO,
            Out: PipelineIO + AsyncEngineContextProvider,
        {
            /// Forwards the produced output to the wrapped frontend.
            async fn on_data(&self, output: Out, token: private::Token) -> Result<(), Error> {
                self.inner.on_data(output, token).await
            }
        }

        #[async_trait]
        impl<In, Out> AsyncEngine<In, Out, Error> for $name<In, Out>
        where
            In: PipelineIO,
            Out: PipelineIO,
        {
            /// Runs the request through the wrapped frontend and returns its result.
            async fn generate(&self, req: In) -> Result<Out, Error> {
                self.inner.generate(req).await
            }
        }
    };
}
58
59impl_frontend!(ServiceFrontend);
60impl_frontend!(SegmentSource);
61
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::{ManyOut, PipelineErrorExt, SingleIn};

    /// A default `Frontend` on which `set_edge` was never called must fail
    /// `generate` with `PipelineError::NoEdge`.
    #[tokio::test]
    async fn test_pipeline_source_no_edge() {
        let frontend = Frontend::<SingleIn<()>, ManyOut<()>>::default();

        // The call is expected to fail; anything else is a test failure.
        let outcome = frontend.generate(().into()).await;
        let error = outcome.unwrap_err();

        // Recover the typed pipeline error from the generic error.
        let pipeline_error = error.try_into_pipeline_error().unwrap();

        assert!(
            matches!(pipeline_error, PipelineError::NoEdge),
            "Expected NoEdge error"
        );
    }
}