1pub mod codec;
7pub mod egress;
8pub mod ingress;
9pub mod tcp;
10
11use crate::SystemHealth;
12use std::sync::{Arc, OnceLock};
13
14use anyhow::Result;
15use async_trait::async_trait;
16use bytes::Bytes;
17use codec::{TwoPartCodec, TwoPartMessage, TwoPartMessageType};
18use derive_builder::Builder;
19use futures::StreamExt;
20use super::{AsyncEngine, AsyncEngineContext, AsyncEngineContextProvider, ResponseStream};
22use serde::{Deserialize, Serialize};
23
24use super::{
25 AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO, SegmentSource,
26 ServiceBackend, ServiceEngine, SingleIn, Source, context,
27};
28use ingress::push_handler::WorkHandlerMetrics;
29
30pub const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
32
33use crate::metrics::MetricsRegistry;
35use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge};
36
37pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {}
38impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {}
39
40#[async_trait]
42pub trait WorkQueueConsumer {
43 async fn dequeue(&self) -> Result<Bytes, String>;
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum StreamType {
49 Request,
50 Response,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
54#[serde(rename_all = "snake_case")]
55pub enum ControlMessage {
56 Stop,
57 Kill,
58 Sentinel,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
68pub struct ResponseStreamPrologue {
69 error: Option<String>,
70}
71
72pub type StreamProvider<T> = tokio::sync::oneshot::Receiver<Result<T, String>>;
73
74#[derive(Debug)]
83pub struct RegisteredStream<T> {
84 pub connection_info: ConnectionInfo,
85 pub stream_provider: StreamProvider<T>,
86}
87
88impl<T> RegisteredStream<T> {
89 pub fn into_parts(self) -> (ConnectionInfo, StreamProvider<T>) {
90 (self.connection_info, self.stream_provider)
91 }
92}
93
94pub struct PendingConnections {
97 pub send_stream: Option<RegisteredStream<StreamSender>>,
98 pub recv_stream: Option<RegisteredStream<StreamReceiver>>,
99}
100
101impl PendingConnections {
102 pub fn into_parts(
103 self,
104 ) -> (
105 Option<RegisteredStream<StreamSender>>,
106 Option<RegisteredStream<StreamReceiver>>,
107 ) {
108 (self.send_stream, self.recv_stream)
109 }
110}
111
112#[async_trait::async_trait]
115pub trait ResponseService {
116 async fn register(&self, options: StreamOptions) -> PendingConnections;
117}
118
119pub struct StreamSender {
144 tx: tokio::sync::mpsc::Sender<TwoPartMessage>,
145 prologue: Option<ResponseStreamPrologue>,
146}
147
148impl StreamSender {
149 pub async fn send(&self, data: Bytes) -> Result<()> {
150 Ok(self.tx.send(TwoPartMessage::from_data(data)).await?)
151 }
152
153 pub async fn send_control(&self, control: ControlMessage) -> Result<()> {
154 let bytes = serde_json::to_vec(&control)?;
155 Ok(self
156 .tx
157 .send(TwoPartMessage::from_header(bytes.into()))
158 .await?)
159 }
160
161 #[allow(clippy::needless_update)]
162 pub async fn send_prologue(&mut self, error: Option<String>) -> Result<(), String> {
163 if let Some(prologue) = self.prologue.take() {
164 let prologue = ResponseStreamPrologue { error, ..prologue };
165 let header_bytes: Bytes = match serde_json::to_vec(&prologue) {
166 Ok(b) => b.into(),
167 Err(err) => {
168 tracing::error!(%err, "send_prologue: ResponseStreamPrologue did not serialize to a JSON array");
169 return Err("Invalid prologue".to_string());
170 }
171 };
172 self.tx
173 .send(TwoPartMessage::from_header(header_bytes))
174 .await
175 .map_err(|e| e.to_string())?;
176 } else {
177 panic!("Prologue already sent; or not set; logic error");
178 }
179 Ok(())
180 }
181}
182
183pub struct StreamReceiver {
184 rx: tokio::sync::mpsc::Receiver<Bytes>,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct ConnectionInfo {
198 pub transport: String,
199 pub info: String,
200}
201
202#[derive(Clone, Builder)]
209pub struct StreamOptions {
210 pub context: Arc<dyn AsyncEngineContext>,
212
213 pub enable_request_stream: bool,
218
219 pub enable_response_stream: bool,
222
223 #[builder(default = "8")]
225 pub send_buffer_count: usize,
226
227 #[builder(default = "8")]
229 pub recv_buffer_count: usize,
230}
231
232impl StreamOptions {
233 pub fn builder() -> StreamOptionsBuilder {
234 StreamOptionsBuilder::default()
235 }
236}
237
238pub struct Egress<Req: PipelineIO, Resp: PipelineIO> {
239 transport_engine: Arc<dyn AsyncTransportEngine<Req, Resp>>,
240}
241
242#[async_trait]
243impl<T: Data, U: Data> AsyncEngine<SingleIn<T>, ManyOut<U>, Error>
244 for Egress<SingleIn<T>, ManyOut<U>>
245where
246 T: Data + Serialize,
247 U: for<'de> Deserialize<'de> + Data,
248{
249 async fn generate(&self, request: SingleIn<T>) -> Result<ManyOut<U>, Error> {
250 self.transport_engine.generate(request).await
251 }
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize)]
255#[serde(rename_all = "snake_case")]
256enum RequestType {
257 SingleIn,
258 ManyIn,
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize)]
262#[serde(rename_all = "snake_case")]
263enum ResponseType {
264 SingleOut,
265 ManyOut,
266}
267
268#[derive(Debug, Clone, Serialize, Deserialize)]
269struct RequestControlMessage {
270 id: String,
271 request_type: RequestType,
272 response_type: ResponseType,
273 connection_info: ConnectionInfo,
274}
275
276pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
277 segment: OnceLock<Arc<SegmentSource<Req, Resp>>>,
278 metrics: OnceLock<Arc<WorkHandlerMetrics>>,
279 endpoint_health_check_notifier: OnceLock<Arc<tokio::sync::Notify>>,
281}
282
283impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
284 pub fn new() -> Arc<Self> {
285 Arc::new(Self {
286 segment: OnceLock::new(),
287 metrics: OnceLock::new(),
288 endpoint_health_check_notifier: OnceLock::new(),
289 })
290 }
291
292 pub fn attach(&self, segment: Arc<SegmentSource<Req, Resp>>) -> Result<()> {
293 self.segment
294 .set(segment)
295 .map_err(|_| anyhow::anyhow!("Segment already set"))
296 }
297
298 pub fn add_metrics(
299 &self,
300 endpoint: &crate::component::Endpoint,
301 metrics_labels: Option<&[(&str, &str)]>,
302 ) -> Result<()> {
303 let metrics = WorkHandlerMetrics::from_endpoint(endpoint, metrics_labels)
304 .map_err(|e| anyhow::anyhow!("Failed to create work handler metrics: {}", e))?;
305
306 self.metrics
307 .set(Arc::new(metrics))
308 .map_err(|_| anyhow::anyhow!("Metrics already set"))
309 }
310
311 pub fn link(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
312 let ingress = Ingress::new();
313 ingress.attach(segment)?;
314 Ok(ingress)
315 }
316
317 pub fn for_pipeline(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
318 let ingress = Ingress::new();
319 ingress.attach(segment)?;
320 Ok(ingress)
321 }
322
323 pub fn for_engine(engine: ServiceEngine<Req, Resp>) -> Result<Arc<Self>> {
324 let frontend = SegmentSource::<Req, Resp>::new();
325 let backend = ServiceBackend::from_engine(engine);
326
327 let pipeline = frontend.link(backend)?.link(frontend)?;
329
330 let ingress = Ingress::new();
331 ingress.attach(pipeline)?;
332
333 Ok(ingress)
334 }
335
336 fn metrics(&self) -> Option<&Arc<WorkHandlerMetrics>> {
338 self.metrics.get()
339 }
340}
341
342#[async_trait]
343pub trait PushWorkHandler: Send + Sync {
344 async fn handle_payload(&self, payload: Bytes) -> Result<(), PipelineError>;
345
346 fn add_metrics(
348 &self,
349 endpoint: &crate::component::Endpoint,
350 metrics_labels: Option<&[(&str, &str)]>,
351 ) -> Result<()>;
352
353 fn set_endpoint_health_check_notifier(
355 &self,
356 _notifier: Arc<tokio::sync::Notify>,
357 ) -> Result<()> {
358 Ok(())
360 }
361}
362
363#[derive(Serialize, Deserialize, Debug)]
408pub struct NetworkStreamWrapper<U> {
409 #[serde(skip_serializing_if = "Option::is_none")]
410 pub data: Option<U>,
411 pub complete_final: bool,
412}