dynamo_runtime/pipeline/nodes/sinks.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use super::{
5 Arc, Edge, OnceLock, PipelineError, Service, Sink, Source, async_trait, private::Token,
6};
7use crate::pipeline::{PipelineIO, ServiceEngine};
8
9mod base;
10mod pipeline;
11mod segment;
12
13pub(crate) struct SinkEdge<Resp: PipelineIO> {
14 edge: OnceLock<Edge<Resp>>,
15}
16
17pub struct ServiceBackend<Req: PipelineIO, Resp: PipelineIO> {
18 engine: ServiceEngine<Req, Resp>,
19 inner: SinkEdge<Resp>,
20}
21
22// todo - use a once lock of a TransportEngine
23pub struct SegmentSink<Req: PipelineIO, Resp: PipelineIO> {
24 engine: OnceLock<ServiceEngine<Req, Resp>>,
25 inner: SinkEdge<Resp>,
26}
27
28#[allow(dead_code)]
29pub struct EgressPort<Req: PipelineIO, Resp: PipelineIO> {
30 engine: Service<Req, Resp>,
31}
32
33// impl<Resp: PipelineIO> SegmentSink<Req, Resp> {
34// pub connect(&self)
35// }
36
37// impl<Req, Resp> EgressPort<Req, Resp>
38// where
39// Req: PipelineIO + Serialize,
40// Resp: for<'de> Deserialize<'de> + DataType,
41// {
42// }
43
44// #[async_trait]
45// impl<Req, Resp> AsyncEngine<Context<Req>, Annotated<Resp>> for EgressPort<Req, Resp>
46// where
47// Req: PipelineIO + Serialize,
48// Resp: for<'de> Deserialize<'de> + DataType,
49// {
50// async fn generate(&self, request: Context<Req>) -> Result<Resp, GenerateError> {
51// // when publish our request, we need to publish it with a subject
52// // we will use a trait in the future
53// let tx_subject = "tx-model-subject".to_string();
54
55// let rx_subject = "rx-model-subject".to_string();
56
57// // make a response channel
58// let (bytes_tx, bytes_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(16);
59
60// // register the bytes_tx sender with the response subject
61// // let bytes_stream = self.response_subscriber.register(rx_subject, bytes_tx);
62
63// // ask network impl for a Sender to the cancellation channel
64
65// let request = request
66// .try_map(|req| bincode::serialize(&req))
67// .map_err(|e| {
68// GenerateError(format!(
69// "Failed to serialize request in egress port: {}",
70// e.to_string()
71// ))
72// })?;
73
74// let (data, context) = request.transfer(());
75
76// let stream_ctx = Arc::new(StreamContext::from(context));
77
78// let shutdown_ctx = stream_ctx.clone();
79
80// let (live_tx, live_rx) = tokio::sync::oneshot::channel::<()>();
81
82// let byte_stream = ReceiverStream::new(bytes_rx);
83
84// let decoded = byte_stream
85// // decode the response
86// .map(move |item| {
87// bincode::deserialize::<Annotated<Resp>>(&item)
88// .expect("failed to deserialize response")
89// })
90// .scan(Some(live_tx), move |live_tx, item| {
91// match item {
92// Annotated::End => {
93// // this essentially drops the channel
94// let _ = live_tx.take();
95// }
96// _ => {}
97// }
98// futures::future::ready(Some(item))
99// });
100
101// return Ok(ResponseStream::new(Box::pin(decoded), stream_ctx));
102// }
103// }