dynamo_runtime/pipeline/
nodes.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Pipeline Nodes
5//!
//! A `ServicePipeline` is a directed graph of nodes where each node defines a behavior for both the
//! forward/request path and the backward/response path. The allowed behaviors in each direction
//! are `Source` and `Sink`.
9//!
//! A `Frontend` is the start of a graph and is a [`Source`] for the forward path and a [`Sink`] for
//! the backward path.
12//!
13//! A `Backend` is the end of a graph and is a [`Sink`] for the forward path and a [`Source`] for the
14//! backward path.
15//!
//! A [`PipelineOperator`] is a node that can transform both the forward and backward paths using
//! the logic supplied by an implementation of the [`Operator`] trait. Because a [`PipelineOperator`]
//! is both a [`Sink`] and a [`Source`] on the forward request path, and again on the backward
//! response path (i.e. it is two sources and two sinks), the two directions are accessed separately
//! through the [`PipelineOperator::forward_edge`] and [`PipelineOperator::backward_edge`] methods.
21//!
//! - [`PipelineOperator::forward_edge`] returns a [`PipelineOperatorForwardEdge`], which is a
//!   [`Sink`] for the incoming/upstream request and a [`Source`] for the downstream request.
//! - [`PipelineOperator::backward_edge`] returns a [`PipelineOperatorBackwardEdge`], which is a
//!   [`Sink`] for the downstream response and a [`Source`] for the upstream response.
26//!
//! An `EdgeOperator`, currently named [`PipelineNode`], is a node in the graph that can transform
//! either the forward or the backward path, but not both.
29//!
//! This makes [`Operator`] the more powerful trait, as it can propagate information from the
//! forward path to the backward path. An `EdgeOperator` on the forward path has no visibility into
//! the backward path and therefore cannot directly influence it.
33//!
34use std::{
35    collections::HashMap,
36    sync::{Arc, Mutex, OnceLock},
37};
38
39use super::AsyncEngine;
40use async_trait::async_trait;
41use tokio::sync::oneshot;
42
43use super::{Data, Error, PipelineError, PipelineIO};
44
45mod sinks;
46mod sources;
47
48pub use sinks::{SegmentSink, ServiceBackend};
49pub use sources::{SegmentSource, ServiceFrontend};
50
51pub type Service<In, Out> = Arc<ServiceFrontend<In, Out>>;
52
/// Sealing module for the pipeline traits.
///
/// `Token` is `pub` inside this private module. Code in the enclosing pipeline-nodes module and
/// its submodules can therefore construct it, but code elsewhere cannot name it. The methods
/// `Source::on_next`, `Source::set_edge` and `Sink::on_data` each require a `Token`, so outside
/// callers can only wire nodes together through `Source::link`.
mod private {
    /// Zero-sized capability value passed to the sealed trait methods.
    pub struct Token;
}
56
57// todo rename `ServicePipelineExt`
58/// A [`Source`] trait defines how data is emitted from a source to a downstream sink.
59#[async_trait]
60pub trait Source<T: PipelineIO>: Data {
61    async fn on_next(&self, data: T, _: private::Token) -> Result<(), Error>;
62
63    fn set_edge(&self, edge: Edge<T>, _: private::Token) -> Result<(), PipelineError>;
64
65    fn link<S: Sink<T> + 'static>(&self, sink: Arc<S>) -> Result<Arc<S>, PipelineError> {
66        let edge = Edge::new(sink.clone());
67        self.set_edge(edge, private::Token)?;
68        Ok(sink)
69    }
70}
71
72/// A [`Sink`] trait defines how data is received from a source and processed.
73#[async_trait]
74pub trait Sink<T: PipelineIO>: Data {
75    async fn on_data(&self, data: T, _: private::Token) -> Result<(), Error>;
76}
77
78/// An [`Edge`] is a connection between a [`Source`] and a [`Sink`].
79pub struct Edge<T: PipelineIO> {
80    downstream: Arc<dyn Sink<T>>,
81}
82
83impl<T: PipelineIO> Edge<T> {
84    fn new(downstream: Arc<dyn Sink<T>>) -> Self {
85        Edge { downstream }
86    }
87
88    async fn write(&self, data: T) -> Result<(), Error> {
89        self.downstream.on_data(data, private::Token).await
90    }
91}
92
93type NodeFn<In, Out> = Box<dyn Fn(In) -> Result<Out, Error> + Send + Sync>;
94
95/// An [`Operator`] is a trait that defines the behavior of how two [`AsyncEngine`] can be chained together.
96/// An [`Operator`] is not quite an [`AsyncEngine`] because its generate method requires both the upstream
97/// request, but also the downstream [`AsyncEngine`] to which it will pass the transformed request.
98/// The [`Operator`] logic must transform the upstream request `UpIn` to the downstream request `DownIn`,
99/// then transform the downstream response `DownOut` to the upstream response `UpOut`.
100///
101/// A [`PipelineOperator`] accepts an [`Operator`] and presents itself as an [`AsyncEngine`] for the upstream
102/// [`AsyncEngine<UpIn, UpOut, Error>`].
103///
104/// ### Example of type transformation and data flow
105/// ```text
106/// ... --> <UpIn> ---> [Operator] --> <DownIn> ---> ...
107/// ... <-- <UpOut> --> [Operator] <-- <DownOut> <-- ...
108/// ```
109#[async_trait]
110pub trait Operator<UpIn: PipelineIO, UpOut: PipelineIO, DownIn: PipelineIO, DownOut: PipelineIO>:
111    Data
112{
113    /// This method is expected to transform the upstream request `UpIn` to the downstream request `DownIn`,
114    /// call the next [`AsyncEngine`] with the transformed request, then transform the downstream response
115    /// `DownOut` to the upstream response `UpOut`.
116    async fn generate(
117        &self,
118        req: UpIn,
119        next: Arc<dyn AsyncEngine<DownIn, DownOut, Error>>,
120    ) -> Result<UpOut, Error>;
121
122    fn into_operator(self: &Arc<Self>) -> Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>
123    where
124        Self: Sized,
125    {
126        PipelineOperator::new(self.clone())
127    }
128}
129
130/// A [`PipelineOperatorForwardEdge`] is [`Sink`] for the upstream request type `UpIn` and a [`Source`] for the
131/// downstream request type `DownIn`.
132pub struct PipelineOperatorForwardEdge<
133    UpIn: PipelineIO,
134    UpOut: PipelineIO,
135    DownIn: PipelineIO,
136    DownOut: PipelineIO,
137> {
138    parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
139}
140
141/// A [`PipelineOperatorBackwardEdge`] is [`Sink`] for the downstream response type `DownOut` and a [`Source`] for the
142/// upstream response type `UpOut`.
143pub struct PipelineOperatorBackwardEdge<
144    UpIn: PipelineIO,
145    UpOut: PipelineIO,
146    DownIn: PipelineIO,
147    DownOut: PipelineIO,
148> {
149    parent: Arc<PipelineOperator<UpIn, UpOut, DownIn, DownOut>>,
150}
151
152/// A [`PipelineOperator`] is a node that can transform both the forward and backward paths using the logic defined
153/// by the implementation of an [`Operator`] trait.
154pub struct PipelineOperator<
155    UpIn: PipelineIO,
156    UpOut: PipelineIO,
157    DownIn: PipelineIO,
158    DownOut: PipelineIO,
159> {
160    // core business logic of this object
161    operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>,
162
163    // this hold the downstream connections via the generic frontend
164    // frontends provide both a source and a sink interfaces
165    downstream: Arc<sources::Frontend<DownIn, DownOut>>,
166
167    // this hold the connection to the previous/upstream response sink
168    // we are a source to that upstream's response sink
169    upstream: sinks::SinkEdge<UpOut>,
170}
171
172impl<UpIn, UpOut, DownIn, DownOut> PipelineOperator<UpIn, UpOut, DownIn, DownOut>
173where
174    UpIn: PipelineIO,
175    UpOut: PipelineIO,
176    DownIn: PipelineIO,
177    DownOut: PipelineIO,
178{
179    /// Create a new [`PipelineOperator`] with the given [`Operator`] implementation.
180    pub fn new(operator: Arc<dyn Operator<UpIn, UpOut, DownIn, DownOut>>) -> Arc<Self> {
181        Arc::new(PipelineOperator {
182            operator,
183            downstream: Arc::new(sources::Frontend::default()),
184            upstream: sinks::SinkEdge::default(),
185        })
186    }
187
188    /// Access the forward edge of the [`PipelineOperator`] allowing the forward/requests paths to be linked.
189    pub fn forward_edge(
190        self: &Arc<Self>,
191    ) -> Arc<PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>> {
192        Arc::new(PipelineOperatorForwardEdge {
193            parent: self.clone(),
194        })
195    }
196
197    /// Access the backward edge of the [`PipelineOperator`] allowing the backward/responses paths to be linked.
198    pub fn backward_edge(
199        self: &Arc<Self>,
200    ) -> Arc<PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>> {
201        Arc::new(PipelineOperatorBackwardEdge {
202            parent: self.clone(),
203        })
204    }
205}
206
207/// A [`PipelineOperator`] is an [`AsyncEngine`] for the upstream [`AsyncEngine<UpIn, UpOut, Error>`].
208#[async_trait]
209impl<UpIn, UpOut, DownIn, DownOut> AsyncEngine<UpIn, UpOut, Error>
210    for PipelineOperator<UpIn, UpOut, DownIn, DownOut>
211where
212    UpIn: PipelineIO + Sync,
213    DownIn: PipelineIO + Sync,
214    DownOut: PipelineIO,
215    UpOut: PipelineIO,
216{
217    async fn generate(&self, req: UpIn) -> Result<UpOut, Error> {
218        self.operator.generate(req, self.downstream.clone()).await
219    }
220}
221
222#[async_trait]
223impl<UpIn, UpOut, DownIn, DownOut> Sink<UpIn>
224    for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
225where
226    UpIn: PipelineIO + Sync,
227    DownIn: PipelineIO + Sync,
228    DownOut: PipelineIO,
229    UpOut: PipelineIO,
230{
231    async fn on_data(&self, data: UpIn, _token: private::Token) -> Result<(), Error> {
232        let stream = self.parent.generate(data).await?;
233        self.parent.upstream.on_next(stream, private::Token).await
234    }
235}
236
237#[async_trait]
238impl<UpIn, UpOut, DownIn, DownOut> Source<DownIn>
239    for PipelineOperatorForwardEdge<UpIn, UpOut, DownIn, DownOut>
240where
241    UpIn: PipelineIO,
242    DownIn: PipelineIO,
243    DownOut: PipelineIO,
244    UpOut: PipelineIO,
245{
246    async fn on_next(&self, data: DownIn, token: private::Token) -> Result<(), Error> {
247        self.parent.downstream.on_next(data, token).await
248    }
249
250    fn set_edge(&self, edge: Edge<DownIn>, token: private::Token) -> Result<(), PipelineError> {
251        self.parent.downstream.set_edge(edge, token)
252    }
253}
254
255#[async_trait]
256impl<UpIn, UpOut, DownIn, DownOut> Sink<DownOut>
257    for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
258where
259    UpIn: PipelineIO,
260    DownIn: PipelineIO,
261    DownOut: PipelineIO,
262    UpOut: PipelineIO,
263{
264    async fn on_data(&self, data: DownOut, token: private::Token) -> Result<(), Error> {
265        self.parent.downstream.on_data(data, token).await
266    }
267}
268
269#[async_trait]
270impl<UpIn, UpOut, DownIn, DownOut> Source<UpOut>
271    for PipelineOperatorBackwardEdge<UpIn, UpOut, DownIn, DownOut>
272where
273    UpIn: PipelineIO,
274    DownIn: PipelineIO,
275    DownOut: PipelineIO,
276    UpOut: PipelineIO,
277{
278    async fn on_next(&self, data: UpOut, token: private::Token) -> Result<(), Error> {
279        self.parent.upstream.on_next(data, token).await
280    }
281
282    fn set_edge(&self, edge: Edge<UpOut>, token: private::Token) -> Result<(), PipelineError> {
283        self.parent.upstream.set_edge(edge, token)
284    }
285}
286
287pub struct PipelineNode<In: PipelineIO, Out: PipelineIO> {
288    edge: OnceLock<Edge<Out>>,
289    map_fn: NodeFn<In, Out>,
290}
291
292impl<In: PipelineIO, Out: PipelineIO> PipelineNode<In, Out> {
293    pub fn new(map_fn: NodeFn<In, Out>) -> Arc<Self> {
294        Arc::new(PipelineNode::<In, Out> {
295            edge: OnceLock::new(),
296            map_fn,
297        })
298    }
299}
300
301#[async_trait]
302impl<In: PipelineIO, Out: PipelineIO> Source<Out> for PipelineNode<In, Out> {
303    async fn on_next(&self, data: Out, _: private::Token) -> Result<(), Error> {
304        self.edge
305            .get()
306            .ok_or(PipelineError::NoEdge)?
307            .write(data)
308            .await
309    }
310
311    fn set_edge(&self, edge: Edge<Out>, _: private::Token) -> Result<(), PipelineError> {
312        self.edge
313            .set(edge)
314            .map_err(|_| PipelineError::EdgeAlreadySet)?;
315
316        Ok(())
317    }
318}
319
320#[async_trait]
321impl<In: PipelineIO, Out: PipelineIO> Sink<In> for PipelineNode<In, Out> {
322    async fn on_data(&self, data: In, _: private::Token) -> Result<(), Error> {
323        self.on_next((self.map_fn)(data)?, private::Token).await
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline::*;

    /// A frontend that was never linked to a downstream sink must return an error from
    /// `generate`.
    #[tokio::test]
    async fn test_pipeline_source_no_edge() {
        let frontend = ServiceFrontend::<SingleIn<()>, ManyOut<()>>::new();
        let outcome = frontend.generate(().into()).await;
        assert!(outcome.is_err());
    }
}