1pub mod codec;
19pub mod egress;
20pub mod ingress;
21pub mod tcp;
22
23use std::sync::{Arc, OnceLock};
24
25use anyhow::Result;
26use async_trait::async_trait;
27use bytes::Bytes;
28use codec::{TwoPartCodec, TwoPartMessage, TwoPartMessageType};
29use derive_builder::Builder;
30use futures::StreamExt;
31use super::{AsyncEngine, AsyncEngineContext, AsyncEngineContextProvider, ResponseStream};
33use serde::{Deserialize, Serialize};
34
35use super::{
36 context, AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO,
37 SegmentSource, ServiceBackend, ServiceEngine, SingleIn, Source,
38};
39
40pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {}
41impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {}
42
43#[async_trait]
45pub trait WorkQueueConsumer {
46 async fn dequeue(&self) -> Result<Bytes, String>;
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50#[serde(rename_all = "snake_case")]
51pub enum StreamType {
52 Request,
53 Response,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
57#[serde(rename_all = "snake_case")]
58pub enum ControlMessage {
59 Stop,
60 Kill,
61 Sentinel,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
71pub struct ResponseStreamPrologue {
72 error: Option<String>,
73}
74
75pub type StreamProvider<T> = tokio::sync::oneshot::Receiver<Result<T, String>>;
76
77#[derive(Debug)]
86pub struct RegisteredStream<T> {
87 pub connection_info: ConnectionInfo,
88 pub stream_provider: StreamProvider<T>,
89}
90
91impl<T> RegisteredStream<T> {
92 pub fn into_parts(self) -> (ConnectionInfo, StreamProvider<T>) {
93 (self.connection_info, self.stream_provider)
94 }
95}
96
97pub struct PendingConnections {
100 pub send_stream: Option<RegisteredStream<StreamSender>>,
101 pub recv_stream: Option<RegisteredStream<StreamReceiver>>,
102}
103
104impl PendingConnections {
105 pub fn into_parts(
106 self,
107 ) -> (
108 Option<RegisteredStream<StreamSender>>,
109 Option<RegisteredStream<StreamReceiver>>,
110 ) {
111 (self.send_stream, self.recv_stream)
112 }
113}
114
115#[async_trait::async_trait]
118pub trait ResponseService {
119 async fn register(&self, options: StreamOptions) -> PendingConnections;
120}
121
122pub struct StreamSender {
147 tx: tokio::sync::mpsc::Sender<TwoPartMessage>,
148 prologue: Option<ResponseStreamPrologue>,
149}
150
151impl StreamSender {
152 pub async fn send(&self, data: Bytes) -> Result<()> {
153 Ok(self.tx.send(TwoPartMessage::from_data(data)).await?)
154 }
155
156 pub async fn send_control(&self, control: ControlMessage) -> Result<()> {
157 let bytes = serde_json::to_vec(&control)?;
158 Ok(self
159 .tx
160 .send(TwoPartMessage::from_header(bytes.into()))
161 .await?)
162 }
163
164 #[allow(clippy::needless_update)]
165 pub async fn send_prologue(&mut self, error: Option<String>) -> Result<(), String> {
166 if let Some(prologue) = self.prologue.take() {
167 let prologue = ResponseStreamPrologue { error, ..prologue };
168 let header_bytes: Bytes = match serde_json::to_vec(&prologue) {
169 Ok(b) => b.into(),
170 Err(err) => {
171 tracing::error!(%err, "send_prologue: ResponseStreamPrologue did not serialize to a JSON array");
172 return Err("Invalid prologue".to_string());
173 }
174 };
175 self.tx
176 .send(TwoPartMessage::from_header(header_bytes))
177 .await
178 .map_err(|e| e.to_string())?;
179 } else {
180 panic!("Prologue already sent; or not set; logic error");
181 }
182 Ok(())
183 }
184}
185
186pub struct StreamReceiver {
187 rx: tokio::sync::mpsc::Receiver<Bytes>,
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct ConnectionInfo {
201 pub transport: String,
202 pub info: String,
203}
204
205#[derive(Clone, Builder)]
212pub struct StreamOptions {
213 pub context: Arc<dyn AsyncEngineContext>,
215
216 pub enable_request_stream: bool,
221
222 pub enable_response_stream: bool,
225
226 #[builder(default = "8")]
228 pub send_buffer_count: usize,
229
230 #[builder(default = "8")]
232 pub recv_buffer_count: usize,
233}
234
235impl StreamOptions {
236 pub fn builder() -> StreamOptionsBuilder {
237 StreamOptionsBuilder::default()
238 }
239}
240
241pub struct Egress<Req: PipelineIO, Resp: PipelineIO> {
242 transport_engine: Arc<dyn AsyncTransportEngine<Req, Resp>>,
243}
244
245#[async_trait]
246impl<T: Data, U: Data> AsyncEngine<SingleIn<T>, ManyOut<U>, Error>
247 for Egress<SingleIn<T>, ManyOut<U>>
248where
249 T: Data + Serialize,
250 U: for<'de> Deserialize<'de> + Data,
251{
252 async fn generate(&self, request: SingleIn<T>) -> Result<ManyOut<U>, Error> {
253 self.transport_engine.generate(request).await
254 }
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
258#[serde(rename_all = "snake_case")]
259enum RequestType {
260 SingleIn,
261 ManyIn,
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize)]
265#[serde(rename_all = "snake_case")]
266enum ResponseType {
267 SingleOut,
268 ManyOut,
269}
270
271#[derive(Debug, Clone, Serialize, Deserialize)]
272struct RequestControlMessage {
273 id: String,
274 request_type: RequestType,
275 response_type: ResponseType,
276 connection_info: ConnectionInfo,
277}
278
279pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
280 segment: OnceLock<Arc<SegmentSource<Req, Resp>>>,
281}
282
283impl<Req: PipelineIO, Resp: PipelineIO> Ingress<Req, Resp> {
284 pub fn new() -> Arc<Self> {
285 Arc::new(Self {
286 segment: OnceLock::new(),
287 })
288 }
289
290 pub fn attach(&self, segment: Arc<SegmentSource<Req, Resp>>) -> Result<()> {
291 self.segment
292 .set(segment)
293 .map_err(|_| anyhow::anyhow!("Segment already set"))
294 }
295
296 pub fn link(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
297 let ingress = Ingress::new();
298 ingress.attach(segment)?;
299 Ok(ingress)
300 }
301
302 pub fn for_pipeline(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
303 let ingress = Ingress::new();
304 ingress.attach(segment)?;
305 Ok(ingress)
306 }
307
308 pub fn for_engine(engine: ServiceEngine<Req, Resp>) -> Result<Arc<Self>> {
309 let frontend = SegmentSource::<Req, Resp>::new();
310 let backend = ServiceBackend::from_engine(engine);
311
312 let pipeline = frontend.link(backend)?.link(frontend)?;
314
315 let ingress = Ingress::new();
316 ingress.attach(pipeline)?;
317
318 Ok(ingress)
319 }
320}
321
322#[async_trait]
323pub trait PushWorkHandler: Send + Sync {
324 async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError>;
325}