1pub mod codec;
12pub mod egress;
13pub mod ingress;
14pub mod manager;
15pub mod tcp;
16
17use crate::SystemHealth;
18use std::sync::{Arc, OnceLock};
19
20use anyhow::Result;
21use async_trait::async_trait;
22use bytes::Bytes;
23use codec::{TwoPartCodec, TwoPartMessage, TwoPartMessageType};
24use derive_builder::Builder;
25use futures::StreamExt;
26use super::{AsyncEngine, AsyncEngineContext, AsyncEngineContextProvider, ResponseStream};
28use serde::{Deserialize, Serialize};
29
30use super::{
31 AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO, SegmentSource,
32 ServiceBackend, ServiceEngine, SingleIn, Source, context,
33};
34use ingress::push_handler::WorkHandlerMetrics;
35
36use crate::metrics::MetricsHierarchy;
38use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge};
39
40pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {}
41impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {}
42
43#[async_trait]
45pub trait WorkQueueConsumer {
46 async fn dequeue(&self) -> Result<Bytes, String>;
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
50#[serde(rename_all = "snake_case")]
51pub enum StreamType {
52 Request,
53 Response,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
57#[serde(rename_all = "snake_case")]
58pub enum ControlMessage {
59 Stop,
60 Kill,
61 Sentinel,
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
71pub struct ResponseStreamPrologue {
72 error: Option<String>,
73}
74
75pub type StreamProvider<T> = tokio::sync::oneshot::Receiver<Result<T, String>>;
76
77#[derive(Debug)]
86pub struct RegisteredStream<T> {
87 pub connection_info: ConnectionInfo,
88 pub stream_provider: StreamProvider<T>,
89}
90
91impl<T> RegisteredStream<T> {
92 pub fn into_parts(self) -> (ConnectionInfo, StreamProvider<T>) {
93 (self.connection_info, self.stream_provider)
94 }
95}
96
97pub struct PendingConnections {
100 pub send_stream: Option<RegisteredStream<StreamSender>>,
101 pub recv_stream: Option<RegisteredStream<StreamReceiver>>,
102}
103
104impl PendingConnections {
105 pub fn into_parts(
106 self,
107 ) -> (
108 Option<RegisteredStream<StreamSender>>,
109 Option<RegisteredStream<StreamReceiver>>,
110 ) {
111 (self.send_stream, self.recv_stream)
112 }
113}
114
115#[async_trait::async_trait]
118pub trait ResponseService {
119 async fn register(&self, options: StreamOptions) -> PendingConnections;
120}
121
122pub struct StreamSender {
147 tx: tokio::sync::mpsc::Sender<TwoPartMessage>,
148 prologue: Option<ResponseStreamPrologue>,
149}
150
151impl StreamSender {
152 pub async fn send(&self, data: Bytes) -> Result<()> {
153 Ok(self.tx.send(TwoPartMessage::from_data(data)).await?)
154 }
155
156 pub async fn send_control(&self, control: ControlMessage) -> Result<()> {
157 let bytes = serde_json::to_vec(&control)?;
158 Ok(self
159 .tx
160 .send(TwoPartMessage::from_header(bytes.into()))
161 .await?)
162 }
163
164 #[allow(clippy::needless_update)]
165 pub async fn send_prologue(&mut self, error: Option<String>) -> Result<(), String> {
166 if let Some(_prologue) = self.prologue.take() {
170 let prologue = ResponseStreamPrologue { error };
172 let header_bytes: Bytes = match serde_json::to_vec(&prologue) {
173 Ok(b) => b.into(),
174 Err(err) => {
175 tracing::error!(%err, "send_prologue: ResponseStreamPrologue did not serialize to a JSON array");
176 return Err("Invalid prologue".to_string());
177 }
178 };
179 self.tx
180 .send(TwoPartMessage::from_header(header_bytes))
181 .await
182 .map_err(|e| e.to_string())?;
183 } else {
184 panic!("Prologue already sent; or not set; logic error");
185 }
186 Ok(())
187 }
188}
189
190pub struct StreamReceiver {
191 rx: tokio::sync::mpsc::Receiver<Bytes>,
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct ConnectionInfo {
205 pub transport: String,
206 pub info: String,
207}
208
209#[derive(Clone, Builder)]
216pub struct StreamOptions {
217 pub context: Arc<dyn AsyncEngineContext>,
219
220 pub enable_request_stream: bool,
225
226 pub enable_response_stream: bool,
229
230 #[builder(default = "8")]
232 pub send_buffer_count: usize,
233
234 #[builder(default = "8")]
236 pub recv_buffer_count: usize,
237}
238
239impl StreamOptions {
240 pub fn builder() -> StreamOptionsBuilder {
241 StreamOptionsBuilder::default()
242 }
243}
244
245pub struct Egress<Req: PipelineIO, Resp: PipelineIO> {
246 transport_engine: Arc<dyn AsyncTransportEngine<Req, Resp>>,
247}
248
249#[async_trait]
250impl<T: Data, U: Data> AsyncEngine<SingleIn<T>, ManyOut<U>, Error>
251 for Egress<SingleIn<T>, ManyOut<U>>
252where
253 T: Data + Serialize,
254 U: for<'de> Deserialize<'de> + Data,
255{
256 async fn generate(&self, request: SingleIn<T>) -> Result<ManyOut<U>, Error> {
257 self.transport_engine.generate(request).await
258 }
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize)]
262#[serde(rename_all = "snake_case")]
263enum RequestType {
264 SingleIn,
265 ManyIn,
266}
267
268#[derive(Debug, Clone, Serialize, Deserialize)]
269#[serde(rename_all = "snake_case")]
270enum ResponseType {
271 SingleOut,
272 ManyOut,
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize)]
276struct RequestControlMessage {
277 id: String,
278 request_type: RequestType,
279 response_type: ResponseType,
280 connection_info: ConnectionInfo,
281 #[serde(default, skip_serializing_if = "Option::is_none")]
285 frontend_send_ts_ns: Option<u64>,
286}
287
288pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
289 segment: OnceLock<Arc<SegmentSource<Req, Resp>>>,
290 metrics: OnceLock<Arc<WorkHandlerMetrics>>,
291 endpoint_health_check_notifier: OnceLock<Arc<tokio::sync::Notify>>,
293}
294
295impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
296 pub fn new() -> Arc<Self> {
297 Arc::new(Self {
298 segment: OnceLock::new(),
299 metrics: OnceLock::new(),
300 endpoint_health_check_notifier: OnceLock::new(),
301 })
302 }
303
304 pub fn attach(&self, segment: Arc<SegmentSource<Req, Resp>>) -> Result<()> {
305 self.segment
306 .set(segment)
307 .map_err(|_| anyhow::anyhow!("Segment already set"))
308 }
309
310 pub fn add_metrics(
311 &self,
312 endpoint: &crate::component::Endpoint,
313 metrics_labels: Option<&[(&str, &str)]>,
314 ) -> Result<()> {
315 let metrics = WorkHandlerMetrics::from_endpoint(endpoint, metrics_labels)
316 .map_err(|e| anyhow::anyhow!("Failed to create work handler metrics: {}", e))?;
317
318 crate::metrics::work_handler_perf::ensure_work_handler_perf_metrics_registered(
320 endpoint.get_metrics_registry(),
321 );
322
323 self.metrics
324 .set(Arc::new(metrics))
325 .map_err(|_| anyhow::anyhow!("Metrics already set"))
326 }
327
328 pub fn link(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
329 let ingress = Ingress::new();
330 ingress.attach(segment)?;
331 Ok(ingress)
332 }
333
334 pub fn for_pipeline(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
335 let ingress = Ingress::new();
336 ingress.attach(segment)?;
337 Ok(ingress)
338 }
339
340 pub fn for_engine(engine: ServiceEngine<Req, Resp>) -> Result<Arc<Self>> {
341 let frontend = SegmentSource::<Req, Resp>::new();
342 let backend = ServiceBackend::from_engine(engine);
343
344 let pipeline = frontend.link(backend)?.link(frontend)?;
346
347 let ingress = Ingress::new();
348 ingress.attach(pipeline)?;
349
350 Ok(ingress)
351 }
352
353 fn metrics(&self) -> Option<&Arc<WorkHandlerMetrics>> {
355 self.metrics.get()
356 }
357}
358
359#[async_trait]
360pub trait PushWorkHandler: Send + Sync {
361 async fn handle_payload(
362 &self,
363 payload: Bytes,
364 request_id: Option<String>,
365 ) -> Result<(), PipelineError>;
366
367 fn add_metrics(
369 &self,
370 endpoint: &crate::component::Endpoint,
371 metrics_labels: Option<&[(&str, &str)]>,
372 ) -> Result<()>;
373
374 fn set_endpoint_health_check_notifier(
376 &self,
377 _notifier: Arc<tokio::sync::Notify>,
378 ) -> Result<()> {
379 Ok(())
381 }
382}
383
384#[derive(Serialize, Deserialize, Debug)]
429pub struct NetworkStreamWrapper<U> {
430 #[serde(skip_serializing_if = "Option::is_none")]
431 pub data: Option<U>,
432 pub complete_final: bool,
433}