1pub mod codec;
12pub mod egress;
13pub mod ingress;
14pub mod manager;
15pub mod tcp;
16
17use crate::SystemHealth;
18use std::sync::{Arc, OnceLock};
19
20use anyhow::Result;
21use async_trait::async_trait;
22use bytes::Bytes;
23use codec::{TwoPartCodec, TwoPartMessage, TwoPartMessageType};
24use derive_builder::Builder;
25use futures::StreamExt;
26use super::{AsyncEngine, AsyncEngineContext, AsyncEngineContextProvider, ResponseStream};
28use serde::{Deserialize, Serialize};
29
30use super::{
31 AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO, SegmentSource,
32 ServiceBackend, ServiceEngine, SingleIn, Source, context,
33};
34use ingress::push_handler::WorkHandlerMetrics;
35
/// Error message emitted when a response stream terminates before the
/// generation it carries has signalled completion.
pub const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
38
39use crate::metrics::MetricsHierarchy;
41use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge};
42
/// Marker trait for pipeline payloads that can cross a transport boundary:
/// in addition to `PipelineIO`, the type must be serializable and
/// deserializable (for any lifetime of borrowed input).
pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {}
/// Blanket impl: any type satisfying the bounds is automatically `Codable`,
/// so downstream code never implements this trait by hand.
impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {}
45
/// Consumer side of a work queue.
#[async_trait]
pub trait WorkQueueConsumer {
    /// Pulls the next raw payload off the queue, returning the bytes on
    /// success or an error description as a `String`.
    async fn dequeue(&self) -> Result<Bytes, String>;
}
51
/// Direction of a transport stream: carrying requests toward a worker or
/// responses back toward the caller. Serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum StreamType {
    Request,
    Response,
}
58
/// Out-of-band control signals sent on a stream's header channel
/// (see `StreamSender::send_control`). Serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ControlMessage {
    // NOTE(review): exact semantics of Stop vs Kill (graceful vs immediate?)
    // are not visible here — confirm against the ingress/egress handlers.
    Stop,
    Kill,
    // Presumably marks end-of-stream; verify with the codec consumers.
    Sentinel,
}
66
/// First message sent on a response stream (see `StreamSender::send_prologue`).
/// Carries an optional error string so the receiver can fail fast before any
/// data frames arrive.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResponseStreamPrologue {
    // `None` means the stream starts cleanly; `Some` reports a setup failure.
    error: Option<String>,
}
77
/// One-shot receiver that resolves to either the stream object `T` or an
/// error message once the transport layer fulfills the registration.
pub type StreamProvider<T> = tokio::sync::oneshot::Receiver<Result<T, String>>;
79
/// A stream registration: the connection details to advertise to the peer,
/// paired with a one-shot provider that yields the stream once established.
#[derive(Debug)]
pub struct RegisteredStream<T> {
    /// Transport/address details the remote side needs to connect.
    pub connection_info: ConnectionInfo,
    /// Resolves to the stream (or an error string) when the connection lands.
    pub stream_provider: StreamProvider<T>,
}
93
94impl<T> RegisteredStream<T> {
95 pub fn into_parts(self) -> (ConnectionInfo, StreamProvider<T>) {
96 (self.connection_info, self.stream_provider)
97 }
98}
99
/// Pair of pending stream registrations returned by `ResponseService::register`.
/// Each side is present only when the corresponding `StreamOptions` flag
/// (`enable_request_stream` / `enable_response_stream`) requested it.
pub struct PendingConnections {
    /// Outbound (sender) stream registration, if one was requested.
    pub send_stream: Option<RegisteredStream<StreamSender>>,
    /// Inbound (receiver) stream registration, if one was requested.
    pub recv_stream: Option<RegisteredStream<StreamReceiver>>,
}
106
107impl PendingConnections {
108 pub fn into_parts(
109 self,
110 ) -> (
111 Option<RegisteredStream<StreamSender>>,
112 Option<RegisteredStream<StreamReceiver>>,
113 ) {
114 (self.send_stream, self.recv_stream)
115 }
116}
117
/// Service capable of registering response streams for a request context.
#[async_trait::async_trait]
pub trait ResponseService {
    /// Registers the streams described by `options` and returns the pending
    /// connections (send and/or recv sides, per the option flags).
    async fn register(&self, options: StreamOptions) -> PendingConnections;
}
124
/// Writer half of a response stream: forwards two-part messages to the
/// transport task and holds the one-time prologue until it is sent.
pub struct StreamSender {
    // Channel into the transport layer; each item is a header or data frame.
    tx: tokio::sync::mpsc::Sender<TwoPartMessage>,
    // Consumed exactly once by `send_prologue`; `None` afterwards.
    prologue: Option<ResponseStreamPrologue>,
}
153
154impl StreamSender {
155 pub async fn send(&self, data: Bytes) -> Result<()> {
156 Ok(self.tx.send(TwoPartMessage::from_data(data)).await?)
157 }
158
159 pub async fn send_control(&self, control: ControlMessage) -> Result<()> {
160 let bytes = serde_json::to_vec(&control)?;
161 Ok(self
162 .tx
163 .send(TwoPartMessage::from_header(bytes.into()))
164 .await?)
165 }
166
167 #[allow(clippy::needless_update)]
168 pub async fn send_prologue(&mut self, error: Option<String>) -> Result<(), String> {
169 if let Some(prologue) = self.prologue.take() {
170 let prologue = ResponseStreamPrologue { error, ..prologue };
171 let header_bytes: Bytes = match serde_json::to_vec(&prologue) {
172 Ok(b) => b.into(),
173 Err(err) => {
174 tracing::error!(%err, "send_prologue: ResponseStreamPrologue did not serialize to a JSON array");
175 return Err("Invalid prologue".to_string());
176 }
177 };
178 self.tx
179 .send(TwoPartMessage::from_header(header_bytes))
180 .await
181 .map_err(|e| e.to_string())?;
182 } else {
183 panic!("Prologue already sent; or not set; logic error");
184 }
185 Ok(())
186 }
187}
188
/// Reader half of a stream: receives raw byte payloads from the transport
/// task over an mpsc channel.
pub struct StreamReceiver {
    rx: tokio::sync::mpsc::Receiver<Bytes>,
}
192
/// Serializable description of how to reach a stream endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionInfo {
    /// Transport identifier (e.g. which mechanism to use — see the `tcp` mod).
    pub transport: String,
    /// Transport-specific address/details, opaque at this layer.
    pub info: String,
}
207
/// Options controlling which streams `ResponseService::register` creates and
/// how they are buffered. Construct via [`StreamOptions::builder`].
#[derive(Clone, Builder)]
pub struct StreamOptions {
    /// Engine context the streams are tied to (cancellation, identity).
    pub context: Arc<dyn AsyncEngineContext>,

    /// Whether to register a request (send-side) stream.
    pub enable_request_stream: bool,

    /// Whether to register a response (recv-side) stream.
    pub enable_response_stream: bool,

    /// Channel capacity for the send side; defaults to 8.
    #[builder(default = "8")]
    pub send_buffer_count: usize,

    /// Channel capacity for the recv side; defaults to 8.
    #[builder(default = "8")]
    pub recv_buffer_count: usize,
}
237
238impl StreamOptions {
239 pub fn builder() -> StreamOptionsBuilder {
240 StreamOptionsBuilder::default()
241 }
242}
243
/// Client-side pipeline edge: owns a transport engine and forwards requests
/// across the network boundary.
pub struct Egress<Req: PipelineIO, Resp: PipelineIO> {
    transport_engine: Arc<dyn AsyncTransportEngine<Req, Resp>>,
}
247
/// `Egress` is itself an `AsyncEngine`: generation is delegated verbatim to
/// the underlying transport engine. `T` must serialize (outgoing request) and
/// `U` must deserialize (incoming responses).
#[async_trait]
impl<T: Data, U: Data> AsyncEngine<SingleIn<T>, ManyOut<U>, Error>
    for Egress<SingleIn<T>, ManyOut<U>>
where
    T: Data + Serialize,
    U: for<'de> Deserialize<'de> + Data,
{
    async fn generate(&self, request: SingleIn<T>) -> Result<ManyOut<U>, Error> {
        self.transport_engine.generate(request).await
    }
}
259
/// Cardinality of the request side of an exchange (one item vs. a stream).
/// Serialized in `snake_case` as part of `RequestControlMessage`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum RequestType {
    SingleIn,
    ManyIn,
}
266
/// Cardinality of the response side of an exchange (one item vs. a stream).
/// Serialized in `snake_case` as part of `RequestControlMessage`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
enum ResponseType {
    SingleOut,
    ManyOut,
}
273
/// Header describing an incoming request: its id, the request/response
/// cardinalities, and where to connect back for the response stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RequestControlMessage {
    id: String,
    request_type: RequestType,
    response_type: ResponseType,
    connection_info: ConnectionInfo,
}
281
/// Server-side pipeline edge: receives work from the network and pushes it
/// into an attached pipeline segment. All fields are set at most once,
/// after construction, via `attach` / `add_metrics` / the notifier setter.
pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
    segment: OnceLock<Arc<SegmentSource<Req, Resp>>>,
    metrics: OnceLock<Arc<WorkHandlerMetrics>>,
    endpoint_health_check_notifier: OnceLock<Arc<tokio::sync::Notify>>,
}
288
289impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
290 pub fn new() -> Arc<Self> {
291 Arc::new(Self {
292 segment: OnceLock::new(),
293 metrics: OnceLock::new(),
294 endpoint_health_check_notifier: OnceLock::new(),
295 })
296 }
297
298 pub fn attach(&self, segment: Arc<SegmentSource<Req, Resp>>) -> Result<()> {
299 self.segment
300 .set(segment)
301 .map_err(|_| anyhow::anyhow!("Segment already set"))
302 }
303
304 pub fn add_metrics(
305 &self,
306 endpoint: &crate::component::Endpoint,
307 metrics_labels: Option<&[(&str, &str)]>,
308 ) -> Result<()> {
309 let metrics = WorkHandlerMetrics::from_endpoint(endpoint, metrics_labels)
310 .map_err(|e| anyhow::anyhow!("Failed to create work handler metrics: {}", e))?;
311
312 self.metrics
313 .set(Arc::new(metrics))
314 .map_err(|_| anyhow::anyhow!("Metrics already set"))
315 }
316
317 pub fn link(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
318 let ingress = Ingress::new();
319 ingress.attach(segment)?;
320 Ok(ingress)
321 }
322
323 pub fn for_pipeline(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
324 let ingress = Ingress::new();
325 ingress.attach(segment)?;
326 Ok(ingress)
327 }
328
329 pub fn for_engine(engine: ServiceEngine<Req, Resp>) -> Result<Arc<Self>> {
330 let frontend = SegmentSource::<Req, Resp>::new();
331 let backend = ServiceBackend::from_engine(engine);
332
333 let pipeline = frontend.link(backend)?.link(frontend)?;
335
336 let ingress = Ingress::new();
337 ingress.attach(pipeline)?;
338
339 Ok(ingress)
340 }
341
342 fn metrics(&self) -> Option<&Arc<WorkHandlerMetrics>> {
344 self.metrics.get()
345 }
346}
347
/// Handler invoked with each raw payload pushed from the network layer.
#[async_trait]
pub trait PushWorkHandler: Send + Sync {
    /// Processes one incoming payload; errors surface as `PipelineError`.
    async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError>;

    /// Registers metrics for the given endpoint (see `Ingress::add_metrics`).
    fn add_metrics(
        &self,
        endpoint: &crate::component::Endpoint,
        metrics_labels: Option<&[(&str, &str)]>,
    ) -> Result<()>;

    /// Installs a notifier used for endpoint health checks.
    /// Default implementation ignores the notifier and succeeds, so
    /// implementors opt in only when they support health-check signalling.
    fn set_endpoint_health_check_notifier(
        &self,
        _notifier: Arc<tokio::sync::Notify>,
    ) -> Result<()> {
        Ok(())
    }
}
368
369#[derive(Serialize, Deserialize, Debug)]
414pub struct NetworkStreamWrapper<U> {
415 #[serde(skip_serializing_if = "Option::is_none")]
416 pub data: Option<U>,
417 pub complete_final: bool,
418}