dynamo_runtime/pipeline/nodes/
sinks.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5    Arc, Edge, OnceLock, PipelineError, Service, Sink, Source, async_trait, private::Token,
6};
7use crate::pipeline::{PipelineIO, ServiceEngine};
8
9mod base;
10mod pipeline;
11mod segment;
12
13pub(crate) struct SinkEdge<Resp: PipelineIO> {
14    edge: OnceLock<Edge<Resp>>,
15}
16
17pub struct ServiceBackend<Req: PipelineIO, Resp: PipelineIO> {
18    engine: ServiceEngine<Req, Resp>,
19    inner: SinkEdge<Resp>,
20}
21
22// todo - use a once lock of a TransportEngine
23pub struct SegmentSink<Req: PipelineIO, Resp: PipelineIO> {
24    engine: OnceLock<ServiceEngine<Req, Resp>>,
25    inner: SinkEdge<Resp>,
26}
27
28#[allow(dead_code)]
29pub struct EgressPort<Req: PipelineIO, Resp: PipelineIO> {
30    engine: Service<Req, Resp>,
31}
32
33// impl<Resp: PipelineIO> SegmentSink<Req, Resp> {
34//     pub connect(&self)
35// }
36
37// impl<Req, Resp> EgressPort<Req, Resp>
38// where
39//     Req: PipelineIO + Serialize,
40//     Resp: for<'de> Deserialize<'de> + DataType,
41// {
42// }
43
44// #[async_trait]
45// impl<Req, Resp> AsyncEngine<Context<Req>, Annotated<Resp>> for EgressPort<Req, Resp>
46// where
47//     Req: PipelineIO + Serialize,
48//     Resp: for<'de> Deserialize<'de> + DataType,
49// {
50//     async fn generate(&self, request: Context<Req>) -> Result<Resp, GenerateError> {
51//         // when publish our request, we need to publish it with a subject
52//         // we will use a trait in the future
53//         let tx_subject = "tx-model-subject".to_string();
54
55//         let rx_subject = "rx-model-subject".to_string();
56
57//         // make a response channel
58//         let (bytes_tx, bytes_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(16);
59
60//         // register the bytes_tx sender with the response subject
61//         // let bytes_stream = self.response_subscriber.register(rx_subject, bytes_tx);
62
63//         // ask network impl for a Sender to the cancellation channel
64
65//         let request = request
66//             .try_map(|req| bincode::serialize(&req))
67//             .map_err(|e| {
68//                 GenerateError(format!(
69//                     "Failed to serialize request in egress port: {}",
70//                     e.to_string()
71//                 ))
72//             })?;
73
74//         let (data, context) = request.transfer(());
75
76//         let stream_ctx = Arc::new(StreamContext::from(context));
77
78//         let shutdown_ctx = stream_ctx.clone();
79
80//         let (live_tx, live_rx) = tokio::sync::oneshot::channel::<()>();
81
82//         let byte_stream = ReceiverStream::new(bytes_rx);
83
84//         let decoded = byte_stream
85//             // decode the response
86//             .map(move |item| {
87//                 bincode::deserialize::<Annotated<Resp>>(&item)
88//                     .expect("failed to deserialize response")
89//             })
90//             .scan(Some(live_tx), move |live_tx, item| {
91//                 match item {
92//                     Annotated::End => {
93//                         // this essentially drops the channel
94//                         let _ = live_tx.take();
95//                     }
96//                     _ => {}
97//                 }
98//                 futures::future::ready(Some(item))
99//             });
100
101//         return Ok(ResponseStream::new(Box::pin(decoded), stream_ctx));
102//     }
103// }