dynamo_runtime/pipeline/nodes/sources/
common.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::engine::AsyncEngineContextProvider;
17
18use super::*;
19
/// Generates the boilerplate shared by the frontend wrapper types.
///
/// Given the name of a wrapper type `$frontend<In, Out>` that has an `inner` field
/// initialised from `Frontend::default()`, this emits:
///
/// - a `new()` constructor that returns the wrapper inside an `Arc`;
/// - `Source<In>`, `Sink<Out>` and `AsyncEngine<In, Out, Error>` implementations,
///   each of which simply forwards the call to `inner`.
macro_rules! impl_frontend {
    ($frontend:ident) => {
        impl<In, Out> $frontend<In, Out>
        where
            In: PipelineIO,
            Out: PipelineIO,
        {
            /// Wraps a default-constructed `Frontend` and returns it behind an `Arc`.
            pub fn new() -> Arc<Self> {
                let wrapper = Self {
                    inner: Frontend::default(),
                };
                Arc::new(wrapper)
            }
        }

        #[async_trait]
        impl<In, Out> Source<In> for $frontend<In, Out>
        where
            In: PipelineIO,
            Out: PipelineIO,
        {
            /// Forwards `data` and `token` to `inner.on_next`.
            async fn on_next(&self, data: In, token: private::Token) -> Result<(), Error> {
                self.inner.on_next(data, token).await
            }

            /// Forwards `edge` and `token` to `inner.set_edge`.
            fn set_edge(&self, edge: Edge<In>, token: private::Token) -> Result<(), PipelineError> {
                self.inner.set_edge(edge, token)
            }
        }

        // Unlike the other impls, `Sink` additionally requires `Out` to provide an
        // engine context (`AsyncEngineContextProvider`).
        #[async_trait]
        impl<In, Out> Sink<Out> for $frontend<In, Out>
        where
            In: PipelineIO,
            Out: PipelineIO + AsyncEngineContextProvider,
        {
            /// Forwards `data` and `token` to `inner.on_data`.
            async fn on_data(&self, data: Out, token: private::Token) -> Result<(), Error> {
                self.inner.on_data(data, token).await
            }
        }

        #[async_trait]
        impl<In, Out> AsyncEngine<In, Out, Error> for $frontend<In, Out>
        where
            In: PipelineIO,
            Out: PipelineIO,
        {
            /// Forwards `request` to `inner.generate` and returns its result unchanged.
            async fn generate(&self, request: In) -> Result<Out, Error> {
                self.inner.generate(request).await
            }
        }
    };
}
58
59impl_frontend!(ServiceFrontend);
60impl_frontend!(SegmentSource);
61
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::{ManyOut, PipelineErrorExt, SingleIn};

    /// `generate` on a default-constructed `Frontend` (on which `set_edge` was never
    /// called) must fail, and the failure must convert to `PipelineError::NoEdge`.
    #[tokio::test]
    async fn test_pipeline_source_no_edge() {
        let frontend = Frontend::<SingleIn<()>, ManyOut<()>>::default();

        // The call is expected to fail; `unwrap_err` panics if it unexpectedly succeeds.
        let result = frontend.generate(().into()).await;
        let pipeline_err = result.unwrap_err().try_into_pipeline_error().unwrap();

        assert!(
            matches!(pipeline_err, PipelineError::NoEdge),
            "Expected NoEdge error"
        );
    }
}