dynamo_runtime/pipeline/nodes/
sinks.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use super::{
17    Arc, Edge, OnceLock, PipelineError, Service, Sink, Source, async_trait, private::Token,
18};
19use crate::pipeline::{PipelineIO, ServiceEngine};
20
21mod base;
22mod pipeline;
23mod segment;
24
25pub(crate) struct SinkEdge<Resp: PipelineIO> {
26    edge: OnceLock<Edge<Resp>>,
27}
28
29pub struct ServiceBackend<Req: PipelineIO, Resp: PipelineIO> {
30    engine: ServiceEngine<Req, Resp>,
31    inner: SinkEdge<Resp>,
32}
33
34// todo - use a once lock of a TransportEngine
35pub struct SegmentSink<Req: PipelineIO, Resp: PipelineIO> {
36    engine: OnceLock<ServiceEngine<Req, Resp>>,
37    inner: SinkEdge<Resp>,
38}
39
40#[allow(dead_code)]
41pub struct EgressPort<Req: PipelineIO, Resp: PipelineIO> {
42    engine: Service<Req, Resp>,
43}
44
45// impl<Resp: PipelineIO> SegmentSink<Req, Resp> {
46//     pub connect(&self)
47// }
48
49// impl<Req, Resp> EgressPort<Req, Resp>
50// where
51//     Req: PipelineIO + Serialize,
52//     Resp: for<'de> Deserialize<'de> + DataType,
53// {
54// }
55
56// #[async_trait]
57// impl<Req, Resp> AsyncEngine<Context<Req>, Annotated<Resp>> for EgressPort<Req, Resp>
58// where
59//     Req: PipelineIO + Serialize,
60//     Resp: for<'de> Deserialize<'de> + DataType,
61// {
62//     async fn generate(&self, request: Context<Req>) -> Result<Resp, GenerateError> {
63//         // when publish our request, we need to publish it with a subject
64//         // we will use a trait in the future
65//         let tx_subject = "tx-model-subject".to_string();
66
67//         let rx_subject = "rx-model-subject".to_string();
68
69//         // make a response channel
70//         let (bytes_tx, bytes_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(16);
71
72//         // register the bytes_tx sender with the response subject
73//         // let bytes_stream = self.response_subscriber.register(rx_subject, bytes_tx);
74
75//         // ask network impl for a Sender to the cancellation channel
76
77//         let request = request
78//             .try_map(|req| bincode::serialize(&req))
79//             .map_err(|e| {
80//                 GenerateError(format!(
81//                     "Failed to serialize request in egress port: {}",
82//                     e.to_string()
83//                 ))
84//             })?;
85
86//         let (data, context) = request.transfer(());
87
88//         let stream_ctx = Arc::new(StreamContext::from(context));
89
90//         let shutdown_ctx = stream_ctx.clone();
91
92//         let (live_tx, live_rx) = tokio::sync::oneshot::channel::<()>();
93
94//         let byte_stream = ReceiverStream::new(bytes_rx);
95
96//         let decoded = byte_stream
97//             // decode the response
98//             .map(move |item| {
99//                 bincode::deserialize::<Annotated<Resp>>(&item)
100//                     .expect("failed to deserialize response")
101//             })
102//             .scan(Some(live_tx), move |live_tx, item| {
103//                 match item {
104//                     Annotated::End => {
105//                         // this essentially drops the channel
106//                         let _ = live_tx.take();
107//                     }
108//                     _ => {}
109//                 }
110//                 futures::future::ready(Some(item))
111//             });
112
113//         return Ok(ResponseStream::new(Box::pin(decoded), stream_ctx));
114//     }
115// }