1pub mod codec;
12pub mod egress;
13pub mod ingress;
14pub mod manager;
15pub mod tcp;
16
17use crate::SystemHealth;
18use std::sync::{Arc, OnceLock};
19
20use anyhow::Result;
21use async_trait::async_trait;
22use bytes::Bytes;
23use codec::{TwoPartCodec, TwoPartMessage, TwoPartMessageType};
24use derive_builder::Builder;
25use futures::StreamExt;
26use super::{AsyncEngine, AsyncEngineContext, AsyncEngineContextProvider, ResponseStream};
28use serde::{Deserialize, Serialize};
29
30use super::{
31 AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO, SegmentSource,
32 ServiceBackend, ServiceEngine, SingleIn, Source, context,
33};
34use ingress::push_handler::WorkHandlerMetrics;
35
36use crate::metrics::MetricsHierarchy;
38use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge};
39
/// Marker trait for payloads that can cross the transport boundary: pipeline I/O
/// plus serde serialization in both directions.
pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {}
/// Blanket impl: any `PipelineIO` type with serde support is automatically `Codable`.
impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {}
42
/// Consumer side of a work queue; implementors yield raw payload bytes one at a time.
#[async_trait]
pub trait WorkQueueConsumer {
    /// Wait for and return the next payload; `Err` carries a transport-level error message.
    async fn dequeue(&self) -> Result<Bytes, String>;
}
48
/// Direction of a registered network stream (serialized as `snake_case` on the wire).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum StreamType {
    /// Request-bearing (ingress-bound) stream.
    Request,
    /// Response-bearing (egress-bound) stream.
    Response,
}
55
/// Out-of-band control signals carried in the header part of a [`TwoPartMessage`]
/// (see `StreamSender::send_control`); serialized as `snake_case` JSON.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ControlMessage {
    /// Request a graceful stop.
    Stop,
    /// Request an immediate termination.
    Kill,
    /// End-of-stream marker.
    Sentinel,
}
63
/// First message sent on a response stream before any data; carries an optional
/// error from request setup (see `StreamSender::send_prologue`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResponseStreamPrologue {
    // `None` means the request was accepted; `Some` carries the setup failure text.
    error: Option<String>,
}
74
75pub type StreamProvider<T> = tokio::sync::oneshot::Receiver<Result<T, String>>;
76
/// A stream registration: the connection details a peer needs to attach, paired with
/// a deferred provider that resolves once the peer actually connects.
#[derive(Debug)]
pub struct RegisteredStream<T> {
    /// Transport/address information for the remote peer to connect with.
    pub connection_info: ConnectionInfo,
    /// Resolves to the live stream half (or an error) once the connection is made.
    pub stream_provider: StreamProvider<T>,
}
90
91impl<T> RegisteredStream<T> {
92 pub fn into_parts(self) -> (ConnectionInfo, StreamProvider<T>) {
93 (self.connection_info, self.stream_provider)
94 }
95}
96
/// The pair of stream registrations produced by [`ResponseService::register`];
/// each half is present only if the corresponding direction was enabled in `StreamOptions`.
pub struct PendingConnections {
    /// Pending outbound (sender) stream, if request streaming was enabled.
    pub send_stream: Option<RegisteredStream<StreamSender>>,
    /// Pending inbound (receiver) stream, if response streaming was enabled.
    pub recv_stream: Option<RegisteredStream<StreamReceiver>>,
}
103
104impl PendingConnections {
105 pub fn into_parts(
106 self,
107 ) -> (
108 Option<RegisteredStream<StreamSender>>,
109 Option<RegisteredStream<StreamReceiver>>,
110 ) {
111 (self.send_stream, self.recv_stream)
112 }
113}
114
/// Service capable of registering response-path stream connections for a request context.
#[async_trait::async_trait]
pub trait ResponseService {
    /// Register send/recv streams per `options`; the returned registrations resolve
    /// once the remote side connects.
    async fn register(&self, options: StreamOptions) -> PendingConnections;
}
121
/// Writable half of a response stream. Wraps a channel of [`TwoPartMessage`]s and
/// holds the one-shot prologue until `send_prologue` consumes it.
pub struct StreamSender {
    // Outbound channel toward the transport layer.
    tx: tokio::sync::mpsc::Sender<TwoPartMessage>,
    // Taken (set to `None`) by the first `send_prologue` call.
    prologue: Option<ResponseStreamPrologue>,
}
150
151impl StreamSender {
152 pub async fn send(&self, data: Bytes) -> Result<()> {
153 Ok(self.tx.send(TwoPartMessage::from_data(data)).await?)
154 }
155
156 pub async fn send_control(&self, control: ControlMessage) -> Result<()> {
157 let bytes = serde_json::to_vec(&control)?;
158 Ok(self
159 .tx
160 .send(TwoPartMessage::from_header(bytes.into()))
161 .await?)
162 }
163
164 #[allow(clippy::needless_update)]
165 pub async fn send_prologue(&mut self, error: Option<String>) -> Result<(), String> {
166 if let Some(prologue) = self.prologue.take() {
167 let prologue = ResponseStreamPrologue { error, ..prologue };
168 let header_bytes: Bytes = match serde_json::to_vec(&prologue) {
169 Ok(b) => b.into(),
170 Err(err) => {
171 tracing::error!(%err, "send_prologue: ResponseStreamPrologue did not serialize to a JSON array");
172 return Err("Invalid prologue".to_string());
173 }
174 };
175 self.tx
176 .send(TwoPartMessage::from_header(header_bytes))
177 .await
178 .map_err(|e| e.to_string())?;
179 } else {
180 panic!("Prologue already sent; or not set; logic error");
181 }
182 Ok(())
183 }
184}
185
/// Readable half of a stream: yields raw payload bytes from the transport layer.
pub struct StreamReceiver {
    // Inbound channel from the transport layer.
    rx: tokio::sync::mpsc::Receiver<Bytes>,
}
189
/// Serializable description of how to reach a registered stream endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionInfo {
    /// Transport identifier (e.g. a scheme/kind tag — exact values set by the transport impl).
    pub transport: String,
    /// Transport-specific address/details, encoded as a string.
    pub info: String,
}
204
/// Options controlling which stream halves to register and their buffering.
/// Construct via [`StreamOptions::builder`].
#[derive(Clone, Builder)]
pub struct StreamOptions {
    /// Engine context associated with the request (used for cancellation/identity —
    /// exact use is up to the registering service).
    pub context: Arc<dyn AsyncEngineContext>,

    /// Register a request-direction (send) stream when true.
    pub enable_request_stream: bool,

    /// Register a response-direction (recv) stream when true.
    pub enable_response_stream: bool,

    /// Outbound channel capacity; defaults to 8.
    #[builder(default = "8")]
    pub send_buffer_count: usize,

    /// Inbound channel capacity; defaults to 8.
    #[builder(default = "8")]
    pub recv_buffer_count: usize,
}
234
235impl StreamOptions {
236 pub fn builder() -> StreamOptionsBuilder {
237 StreamOptionsBuilder::default()
238 }
239}
240
/// Client-side adapter: forwards pipeline requests to a remote worker through an
/// [`AsyncTransportEngine`].
pub struct Egress<Req: PipelineIO, Resp: PipelineIO> {
    // Underlying transport that performs the actual network round trip.
    transport_engine: Arc<dyn AsyncTransportEngine<Req, Resp>>,
}
244
245#[async_trait]
246impl<T: Data, U: Data> AsyncEngine<SingleIn<T>, ManyOut<U>, Error>
247 for Egress<SingleIn<T>, ManyOut<U>>
248where
249 T: Data + Serialize,
250 U: for<'de> Deserialize<'de> + Data,
251{
252 async fn generate(&self, request: SingleIn<T>) -> Result<ManyOut<U>, Error> {
253 self.transport_engine.generate(request).await
254 }
255}
256
/// Cardinality of the request side of a transported call (wire format: `snake_case`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum RequestType {
    /// Exactly one request item.
    SingleIn,
    /// A stream of request items.
    ManyIn,
}
263
/// Cardinality of the response side of a transported call (wire format: `snake_case`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ResponseType {
    /// Exactly one response item.
    SingleOut,
    /// A stream of response items.
    ManyOut,
}
270
/// Control envelope describing an incoming request: its id, shape (request/response
/// cardinality), and where to connect back for the response stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RequestControlMessage {
    // Request identifier.
    id: String,
    // Cardinality of the request payload.
    request_type: RequestType,
    // Expected cardinality of the response.
    response_type: ResponseType,
    // Where the worker should connect to deliver responses.
    connection_info: ConnectionInfo,
}
278
/// Server-side adapter: receives transported requests and feeds them into a local
/// pipeline segment. All fields are set-once (`OnceLock`) after construction.
pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
    // The pipeline segment that handles decoded requests; set via `attach`.
    segment: OnceLock<Arc<SegmentSource<Req, Resp>>>,
    // Prometheus work-handler metrics; set via `add_metrics`.
    metrics: OnceLock<Arc<WorkHandlerMetrics>>,
    // Notifier used to trigger endpoint health checks; set elsewhere (see PushWorkHandler).
    endpoint_health_check_notifier: OnceLock<Arc<tokio::sync::Notify>>,
}
285
286impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
287 pub fn new() -> Arc<Self> {
288 Arc::new(Self {
289 segment: OnceLock::new(),
290 metrics: OnceLock::new(),
291 endpoint_health_check_notifier: OnceLock::new(),
292 })
293 }
294
295 pub fn attach(&self, segment: Arc<SegmentSource<Req, Resp>>) -> Result<()> {
296 self.segment
297 .set(segment)
298 .map_err(|_| anyhow::anyhow!("Segment already set"))
299 }
300
301 pub fn add_metrics(
302 &self,
303 endpoint: &crate::component::Endpoint,
304 metrics_labels: Option<&[(&str, &str)]>,
305 ) -> Result<()> {
306 let metrics = WorkHandlerMetrics::from_endpoint(endpoint, metrics_labels)
307 .map_err(|e| anyhow::anyhow!("Failed to create work handler metrics: {}", e))?;
308
309 self.metrics
310 .set(Arc::new(metrics))
311 .map_err(|_| anyhow::anyhow!("Metrics already set"))
312 }
313
314 pub fn link(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
315 let ingress = Ingress::new();
316 ingress.attach(segment)?;
317 Ok(ingress)
318 }
319
320 pub fn for_pipeline(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
321 let ingress = Ingress::new();
322 ingress.attach(segment)?;
323 Ok(ingress)
324 }
325
326 pub fn for_engine(engine: ServiceEngine<Req, Resp>) -> Result<Arc<Self>> {
327 let frontend = SegmentSource::<Req, Resp>::new();
328 let backend = ServiceBackend::from_engine(engine);
329
330 let pipeline = frontend.link(backend)?.link(frontend)?;
332
333 let ingress = Ingress::new();
334 ingress.attach(pipeline)?;
335
336 Ok(ingress)
337 }
338
339 fn metrics(&self) -> Option<&Arc<WorkHandlerMetrics>> {
341 self.metrics.get()
342 }
343}
344
/// Handler for pushed work payloads arriving from the transport layer.
#[async_trait]
pub trait PushWorkHandler: Send + Sync {
    /// Process one raw payload; errors surface as pipeline failures.
    async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError>;

    /// Install metrics for the given endpoint (see `Ingress::add_metrics`).
    fn add_metrics(
        &self,
        endpoint: &crate::component::Endpoint,
        metrics_labels: Option<&[(&str, &str)]>,
    ) -> Result<()>;

    /// Provide a notifier used to trigger endpoint health checks.
    /// Default implementation ignores the notifier and succeeds.
    fn set_endpoint_health_check_notifier(
        &self,
        _notifier: Arc<tokio::sync::Notify>,
    ) -> Result<()> {
        Ok(())
    }
}
365
/// Wire envelope for one item of a streamed response: optional payload plus an
/// end-of-stream flag (a final frame may carry `data: None, complete_final: true`).
#[derive(Serialize, Deserialize, Debug)]
pub struct NetworkStreamWrapper<U> {
    /// Payload for this frame; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<U>,
    /// True when this frame terminates the stream.
    pub complete_final: bool,
}