1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use ;
use crate;
pub
// TODO: store the TransportEngine in a `OnceLock` so it is initialized only once
// impl<Resp: PipelineIO> SegmentSink<Req, Resp> {
// pub connect(&self)
// }
// impl<Req, Resp> EgressPort<Req, Resp>
// where
// Req: PipelineIO + Serialize,
// Resp: for<'de> Deserialize<'de> + DataType,
// {
// }
// #[async_trait]
// impl<Req, Resp> AsyncEngine<Context<Req>, Annotated<Resp>> for EgressPort<Req, Resp>
// where
// Req: PipelineIO + Serialize,
// Resp: for<'de> Deserialize<'de> + DataType,
// {
// async fn generate(&self, request: Context<Req>) -> Result<Resp, GenerateError> {
// // when we publish our request, we need to publish it with a subject
// // we will use a trait in the future
// let tx_subject = "tx-model-subject".to_string();
// let rx_subject = "rx-model-subject".to_string();
// // make a response channel
// let (bytes_tx, bytes_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(16);
// // register the bytes_tx sender with the response subject
// // let bytes_stream = self.response_subscriber.register(rx_subject, bytes_tx);
// // ask network impl for a Sender to the cancellation channel
// let request = request
// .try_map(|req| bincode::serialize(&req))
// .map_err(|e| {
// GenerateError(format!(
// "Failed to serialize request in egress port: {}",
// e.to_string()
// ))
// })?;
// let (data, context) = request.transfer(());
// let stream_ctx = Arc::new(StreamContext::from(context));
// let shutdown_ctx = stream_ctx.clone();
// let (live_tx, live_rx) = tokio::sync::oneshot::channel::<()>();
// let byte_stream = ReceiverStream::new(bytes_rx);
// let decoded = byte_stream
// // decode the response
// .map(move |item| {
// bincode::deserialize::<Annotated<Resp>>(&item)
// .expect("failed to deserialize response")
// })
// .scan(Some(live_tx), move |live_tx, item| {
// match item {
// Annotated::End => {
// // taking the sender out of the Option drops it, which closes the oneshot channel
// let _ = live_tx.take();
// }
// _ => {}
// }
// futures::future::ready(Some(item))
// });
// return Ok(ResponseStream::new(Box::pin(decoded), stream_ctx));
// }
// }