dynamo_runtime/pipeline/
nodes.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Pipeline Nodes
17//!
//! A `ServicePipeline` is a directed graph of nodes where each node defines a behavior for both the
//! forward/request path and the backward/response path. The allowed behavior in each direction
//! is either a `Source` or a `Sink`.
21//!
//! A `Frontend` is the start of a graph and is a [`Source`] for the forward path and a [`Sink`] for the
//! backward path.
24//!
25//! A `Backend` is the end of a graph and is a [`Sink`] for the forward path and a [`Source`] for the
26//! backward path.
27//!
//! A [`PipelineOperator`] is a node that can transform both the forward and backward paths using the
//! logic supplied by an implementation of the [`Operator`] trait. Because a [`PipelineOperator`] is
//! both a [`Sink`] and a [`Source`] on each of the forward request path and the backward response path
//! (i.e. it is two sources and two sinks), the two sides are accessed separately through the
//! [`PipelineOperator::forward_edge`] and [`PipelineOperator::backward_edge`] methods.
33//!
//! - [`PipelineOperator::forward_edge`] returns a [`PipelineOperatorForwardEdge`], which is a [`Sink`]
//!   for the incoming/upstream request and a [`Source`] for the downstream request.
//! - [`PipelineOperator::backward_edge`] returns a [`PipelineOperatorBackwardEdge`], which is a [`Sink`]
//!   for the downstream response and a [`Source`] for the upstream response.
38//!
//! An `EdgeOperator`, currently named [`PipelineNode`], is a node in the graph that can transform only
//! the forward or the backward path, but not both.
41//!
//! This makes [`Operator`] the more powerful abstraction, as it can propagate information from the
//! forward path to the backward path. An `EdgeOperator` on the forward path has no visibility into the
//! backward path and therefore cannot directly influence it.
45//!
46use std::{
47    collections::HashMap,
48    sync::{Arc, Mutex, OnceLock},
49};
50
51use super::AsyncEngine;
52use async_trait::async_trait;
53use tokio::sync::oneshot;
54
55use super::{Data, Error, PipelineError, PipelineIO};
56
57mod sinks;
58mod sources;
59
60pub use sinks::{SegmentSink, ServiceBackend};
61pub use sources::{SegmentSource, ServiceFrontend};
62
63pub type Service<In, Out> = Arc<ServiceFrontend<In, Out>>;
64
mod private {
    /// Capability token required by `Source::on_next`, `Source::set_edge` and `Sink::on_data`.
    ///
    /// This module is private, so the token can only be named, and therefore constructed, from
    /// within the enclosing `nodes` module and its submodules. That keeps those trait methods
    /// uncallable (and the traits unimplementable) from elsewhere, while still letting them
    /// appear on public traits.
    pub struct Token;
}
68
69// todo rename `ServicePipelineExt`
70/// A [`Source`] trait defines how data is emitted from a source to a downstream sink.
71#[async_trait]
72pub trait Source<T: PipelineIO>: Data {
73    async fn on_next(&self, data: T, _: private::Token) -> Result<(), Error>;
74
75    fn set_edge(&self, edge: Edge<T>, _: private::Token) -> Result<(), PipelineError>;
76
77    fn link<S: Sink<T> + 'static>(&self, sink: Arc<S>) -> Result<Arc<S>, PipelineError> {
78        let edge = Edge::new(sink.clone());
79        self.set_edge(edge, private::Token)?;
80        Ok(sink)
81    }
82}
83
84/// A [`Sink`] trait defines how data is received from a source and processed.
85#[async_trait]
86pub trait Sink<T: PipelineIO>: Data {
87    async fn on_data(&self, data: T, _: private::Token) -> Result<(), Error>;
88}
89
90/// An [`Edge`] is a connection between a [`Source`] and a [`Sink`].
91pub struct Edge<T: PipelineIO> {
92    downstream: Arc<dyn Sink<T>>,
93}
94
95impl<T: PipelineIO> Edge<T> {
96    fn new(downstream: Arc<dyn Sink<T>>) -> Self {
97        Edge { downstream }
98    }
99
100    async fn write(&self, data: T) -> Result<(), Error> {
101        self.downstream.on_data(data, private::Token).await
102    }
103}
104
105type NodeFn<In, Out> = Box<dyn Fn(In) -> Result<Out, Error> + Send + Sync>;
106
107/// An [`Operator`] is a trait that defines the behavior of how two [`AsyncEngine`] can be chained together.
108/// An [`Operator`] is not quite an [`AsyncEngine`] because its generate method requires both the upstream
109/// request, but also the downstream [`AsyncEngine`] to which it will pass the transformed request.
110/// The [`Operator`] logic must transform the upstream request `UpIn` to the downstream request `DownIn`,
111/// then transform the downstream response `DownOut` to the upstream response `UpOut`.
112///
113/// A [`PipelineOperator`] accepts an [`Operator`] and presents itself as an [`AsyncEngine`] for the upstream
114/// [`AsyncEngine<UpIn, UpOut, Error>`].
115///
116/// ### Example of type transformation and data flow
117/// ```text
118/// ... --> <UpIn> ---> [Operator] --> <DownIn> ---> ...
119/// ... <-- <UpOut> --> [Operator] <-- <DownOut> <-- ...
120/// ```
121#[async_trait]
122pub trait Operator<UpIn: PipelineIO, UpOut: PipelineIO, DownIn: PipelineIO, DownOut: PipelineIO>:
123    Data
124{
125    /// This method is expected to transform the upstream request `UpIn` to the downstream request `DownIn`,
126    /// call the next [`AsyncEngine`] with the transformed request, then transform the downstream response
127    /// `DownOut` to the upstream response `UpOut`.
128    async fn generate(
129        &self,
130        req: UpIn,
131        next: Arc<dyn AsyncEngine<DownIn, DownOut, Error>>,
132    ) -> Result<UpOut, Error>;
133
134    fn into_operator(self: &Arc<Self>) -> Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>
135    where
136        Self: Sized,
137    {
138        PipelineOperator::new(self.clone())
139    }
140}
141
142/// A [`PipelineOperatorForwardEdge`] is [`Sink`] for the upstream request type `UpIn` and a [`Source`] for the
143/// downstream request type `DownIn`.
144pub struct PipelineOperatorForwardEdge<
145    UpIn: PipelineIO,
146    UpOut: PipelineIO,
147    DownIn: PipelineIO,
148    DownOut: PipelineIO,
149> {
150    parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
151}
152
153/// A [`PipelineOperatorBackwardEdge`] is [`Sink`] for the downstream response type `DownOut` and a [`Source`] for the
154/// upstream response type `UpOut`.
155pub struct PipelineOperatorBackwardEdge<
156    UpIn: PipelineIO,
157    UpOut: PipelineIO,
158    DownIn: PipelineIO,
159    DownOut: PipelineIO,
160> {
161    parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
162}
163
164/// A [`PipelineOperator`] is a node that can transform both the forward and backward paths using the logic defined
165/// by the implementation of an [`Operator`] trait.
166pub struct PipelineOperator<
167    UpIn: PipelineIO,
168    UpOut: PipelineIO,
169    DownIn: PipelineIO,
170    DownOut: PipelineIO,
171> {
172    // core business logic of this object
173    operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>,
174
175    // this hold the downstream connections via the generic frontend
176    // frontends provide both a source and a sink interfaces
177    downstream: Arc<sources::Frontend<DownIn, DownOut>>,
178
179    // this hold the connection to the previous/upstream response sink
180    // we are a source to that upstream's response sink
181    upstream: sinks::SinkEdge<UpOut>,
182}
183
184impl<UpIn, UpOut, DownIn, DownOut> PipelineOperator<UpIn, UpOut, DownIn, DownOut>
185where
186    UpIn: PipelineIO,
187    UpOut: PipelineIO,
188    DownIn: PipelineIO,
189    DownOut: PipelineIO,
190{
191    /// Create a new [`PipelineOperator`] with the given [`Operator`] implementation.
192    pub fn new(operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>) -> Arc<Self> {
193        Arc::new(PipelineOperator {
194            operator,
195            downstream: Arc::new(sources::Frontend::default()),
196            upstream: sinks::SinkEdge::default(),
197        })
198    }
199
200    /// Access the forward edge of the [`PipelineOperator`] allowing the forward/requests paths to be linked.
201    pub fn forward_edge(
202        self: &Arc<Self>,
203    ) -> Arc<PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>> {
204        Arc::new(PipelineOperatorForwardEdge {
205            parent: self.clone(),
206        })
207    }
208
209    /// Access the backward edge of the [`PipelineOperator`] allowing the backward/responses paths to be linked.
210    pub fn backward_edge(
211        self: &Arc<Self>,
212    ) -> Arc<PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>> {
213        Arc::new(PipelineOperatorBackwardEdge {
214            parent: self.clone(),
215        })
216    }
217}
218
219/// A [`PipelineOperator`] is an [`AsyncEngine`] for the upstream [`AsyncEngine<UpIn, UpOut, Error>`].
220#[async_trait]
221impl<UpIn, UpOut, DownIn, DownOut> AsyncEngine<UpIn, UpOut, Error>
222    for PipelineOperator<UpIn, UpOut, DownIn, DownOut>
223where
224    UpIn: PipelineIO + Sync,
225    DownIn: PipelineIO + Sync,
226    DownOut: PipelineIO,
227    UpOut: PipelineIO,
228{
229    async fn generate(&self, req: UpIn) -> Result<UpOut, Error> {
230        self.operator.generate(req, self.downstream.clone()).await
231    }
232}
233
234#[async_trait]
235impl<UpIn, UpOut, DownIn, DownOut> Sink<UpIn>
236    for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
237where
238    UpIn: PipelineIO + Sync,
239    DownIn: PipelineIO + Sync,
240    DownOut: PipelineIO,
241    UpOut: PipelineIO,
242{
243    async fn on_data(&self, data: UpIn, _token: private::Token) -> Result<(), Error> {
244        let stream = self.parent.generate(data).await?;
245        self.parent.upstream.on_next(stream, private::Token).await
246    }
247}
248
249#[async_trait]
250impl<UpIn, UpOut, DownIn, DownOut> Source<DownIn>
251    for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
252where
253    UpIn: PipelineIO,
254    DownIn: PipelineIO,
255    DownOut: PipelineIO,
256    UpOut: PipelineIO,
257{
258    async fn on_next(&self, data: DownIn, token: private::Token) -> Result<(), Error> {
259        self.parent.downstream.on_next(data, token).await
260    }
261
262    fn set_edge(&self, edge: Edge<DownIn>, token: private::Token) -> Result<(), PipelineError> {
263        self.parent.downstream.set_edge(edge, token)
264    }
265}
266
267#[async_trait]
268impl<UpIn, UpOut, DownIn, DownOut> Sink<DownOut>
269    for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
270where
271    UpIn: PipelineIO,
272    DownIn: PipelineIO,
273    DownOut: PipelineIO,
274    UpOut: PipelineIO,
275{
276    async fn on_data(&self, data: DownOut, token: private::Token) -> Result<(), Error> {
277        self.parent.downstream.on_data(data, token).await
278    }
279}
280
281#[async_trait]
282impl<UpIn, UpOut, DownIn, DownOut> Source<UpOut>
283    for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
284where
285    UpIn: PipelineIO,
286    DownIn: PipelineIO,
287    DownOut: PipelineIO,
288    UpOut: PipelineIO,
289{
290    async fn on_next(&self, data: UpOut, token: private::Token) -> Result<(), Error> {
291        self.parent.upstream.on_next(data, token).await
292    }
293
294    fn set_edge(&self, edge: Edge<UpOut>, token: private::Token) -> Result<(), PipelineError> {
295        self.parent.upstream.set_edge(edge, token)
296    }
297}
298
299pub struct PipelineNode<In: PipelineIO, Out: PipelineIO> {
300    edge: OnceLock<Edge<Out>>,
301    map_fn: NodeFn<In, Out>,
302}
303
304impl<In: PipelineIO, Out: PipelineIO> PipelineNode<In, Out> {
305    pub fn new(map_fn: NodeFn<In, Out>) -> Arc<Self> {
306        Arc::new(PipelineNode::<In, Out> {
307            edge: OnceLock::new(),
308            map_fn,
309        })
310    }
311}
312
313#[async_trait]
314impl<In: PipelineIO, Out: PipelineIO> Source<Out> for PipelineNode<In, Out> {
315    async fn on_next(&self, data: Out, _: private::Token) -> Result<(), Error> {
316        self.edge
317            .get()
318            .ok_or(PipelineError::NoEdge)?
319            .write(data)
320            .await
321    }
322
323    fn set_edge(&self, edge: Edge<Out>, _: private::Token) -> Result<(), PipelineError> {
324        self.edge
325            .set(edge)
326            .map_err(|_| PipelineError::EdgeAlreadySet)?;
327
328        Ok(())
329    }
330}
331
332#[async_trait]
333impl<In: PipelineIO, Out: PipelineIO> Sink<In> for PipelineNode<In, Out> {
334    async fn on_data(&self, data: In, _: private::Token) -> Result<(), Error> {
335        self.on_next((self.map_fn)(data)?, private::Token).await
336    }
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::*;

    /// A `ServiceFrontend` that was never linked to anything must return an error from
    /// `generate` rather than a response.
    #[tokio::test]
    async fn test_pipeline_source_no_edge() {
        let unlinked = ServiceFrontend::<SingleIn<()>, ManyOut<()>>::new();
        let outcome = unlinked.generate(().into()).await;
        assert!(outcome.is_err());
    }
}