1pub mod codec;
12pub mod egress;
13pub mod ingress;
14pub mod manager;
15pub mod tcp;
16
17use crate::SystemHealth;
18use std::sync::{Arc, OnceLock};
19
20use anyhow::Result;
21use async_trait::async_trait;
22use bytes::Bytes;
23use codec::{TwoPartCodec, TwoPartMessage, TwoPartMessageType};
24use derive_builder::Builder;
25use futures::StreamExt;
26use super::{AsyncEngine, AsyncEngineContext, AsyncEngineContextProvider, ResponseStream};
28use serde::{Deserialize, Serialize};
29
30use super::{
31 AsyncTransportEngine, Context, Data, Error, ManyOut, PipelineError, PipelineIO, SegmentSource,
32 ServiceBackend, ServiceEngine, SingleIn, Source, context,
33};
34use crate::metrics::MetricsHierarchy;
35use ingress::push_handler::WorkHandlerMetrics;
36use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge};
37
/// Default upper bound, in bytes, on a single TCP transport message (32 MiB).
pub(crate) const DEFAULT_TCP_MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024;

/// Process-wide cache of the resolved maximum message size, filled on first use.
static TCP_MAX_MESSAGE_SIZE: OnceLock<usize> = OnceLock::new();

/// Returns the maximum TCP message size in bytes.
///
/// The value is resolved once, from the `DYN_TCP_MAX_MESSAGE_SIZE` environment
/// variable, and cached for the rest of the process. Later changes to the
/// environment are not observed.
///
/// Surrounding whitespace in the variable is ignored. If the variable is unset,
/// is not an unsigned integer, or is `0`, [`DEFAULT_TCP_MAX_MESSAGE_SIZE`] is
/// used instead.
pub(crate) fn get_tcp_max_message_size() -> usize {
    *TCP_MAX_MESSAGE_SIZE.get_or_init(|| {
        std::env::var("DYN_TCP_MAX_MESSAGE_SIZE")
            .ok()
            // Tolerate stray whitespace, e.g. from shell quoting or env files.
            .and_then(|raw| raw.trim().parse::<usize>().ok())
            // A zero limit would reject every message; treat it as misconfiguration.
            .filter(|&size| size > 0)
            .unwrap_or(DEFAULT_TCP_MAX_MESSAGE_SIZE)
    })
}
53
54pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {}
55impl<T: PipelineIO + Serialize + for<'de> Deserialize<'de>> Codable for T {}
56
/// Source of raw work items.
///
/// `dequeue` yields one encoded item as [`Bytes`], or an error described as a
/// `String`. NOTE(review): presumably awaits until an item is available —
/// confirm against implementations.
#[async_trait]
pub trait WorkQueueConsumer {
    async fn dequeue(&self) -> Result<Bytes, String>;
}
62
/// Identifies whether a stream carries requests or responses.
///
/// Serialized in snake_case: `"request"` / `"response"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum StreamType {
    Request,
    Response,
}

/// Input cardinality of a request, carried in [`RequestControlMessage`].
///
/// Serialized in snake_case: `"single_in"` / `"many_in"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum RequestType {
    SingleIn,
    ManyIn,
}

/// Output cardinality of a request, carried in [`RequestControlMessage`].
///
/// Serialized in snake_case: `"single_out"` / `"many_out"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ResponseType {
    SingleOut,
    ManyOut,
}
83
/// Control header describing an incoming request.
///
/// Field order and the serde attributes below define the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RequestControlMessage {
    /// Request identifier.
    pub(crate) id: String,
    /// Whether the request side is a single value or a stream.
    pub(crate) request_type: RequestType,
    /// Whether the response side is a single value or a stream.
    pub(crate) response_type: ResponseType,
    /// Transport details; NOTE(review): presumably used to reach the requester
    /// for the response stream — confirm.
    pub(crate) connection_info: ConnectionInfo,
    /// Free-form string metadata; omitted when empty and empty when absent.
    #[serde(default, skip_serializing_if = "std::collections::BTreeMap::is_empty")]
    pub(crate) metadata: std::collections::BTreeMap<String, String>,
    /// Optional sender timestamp; omitted when `None` and `None` when absent.
    /// NOTE(review): the `_ts_ns` suffix suggests nanoseconds — confirm the
    /// clock/epoch used by the frontend.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub(crate) frontend_send_ts_ns: Option<u64>,
}
98
/// In-band control signal sent as a header-only message by
/// `StreamSender::send_control`.
///
/// Serialized in snake_case: `"stop"`, `"kill"`, `"sentinel"`.
/// NOTE(review): the exact receiver-side semantics of each variant are not
/// visible here — confirm before relying on them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ControlMessage {
    Stop,
    Kill,
    Sentinel,
}
106
/// First header sent on a response stream by `StreamSender::send_prologue`.
///
/// `error` is `None` on success, or a description of why the stream could not
/// be produced. Because there is no `skip_serializing_if`, `None` is
/// serialized as `"error": null`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResponseStreamPrologue {
    error: Option<String>,
}
117
/// One-shot receiver that eventually yields the stream `T`, or an error string.
pub type StreamProvider<T> = tokio::sync::oneshot::Receiver<Result<T, String>>;
119
/// Drop guard holding an optional callback that runs at most once.
///
/// If the guard is still armed (`Some`) when dropped, the callback is invoked.
/// Taking the inner `Option` beforehand disarms it.
struct Cleanup(Option<Box<dyn FnOnce() + Send + 'static>>);

impl Drop for Cleanup {
    fn drop(&mut self) {
        // A disarmed guard has nothing to do.
        let Some(callback) = self.0.take() else {
            return;
        };
        callback();
    }
}
131
/// A registered stream endpoint: its connection info, a provider for the
/// eventual stream, and an optional cleanup guard.
///
/// Dropping the value runs the cleanup closure (if one was attached with
/// `with_cleanup`). `into_parts` disarms the guard instead.
pub struct RegisteredStream<T> {
    pub connection_info: ConnectionInfo,
    pub stream_provider: StreamProvider<T>,
    // Runs on drop unless disarmed by `into_parts`.
    cleanup: Cleanup,
}
140
141impl<T> std::fmt::Debug for RegisteredStream<T> {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct("RegisteredStream")
144 .field("connection_info", &self.connection_info)
145 .finish_non_exhaustive()
146 }
147}
148
149impl<T> RegisteredStream<T> {
150 pub(crate) fn new(connection_info: ConnectionInfo, stream_provider: StreamProvider<T>) -> Self {
151 Self {
152 connection_info,
153 stream_provider,
154 cleanup: Cleanup(None),
155 }
156 }
157
158 pub(crate) fn with_cleanup<F>(mut self, cleanup: F) -> Self
159 where
160 F: FnOnce() + Send + 'static,
161 {
162 self.cleanup.0 = Some(Box::new(cleanup));
163 self
164 }
165
166 pub fn into_parts(self) -> (ConnectionInfo, StreamProvider<T>) {
169 let Self {
170 connection_info,
171 stream_provider,
172 mut cleanup,
173 } = self;
174 cleanup.0.take();
175 (connection_info, stream_provider)
176 }
177}
178
/// Result of `ResponseService::register`: the optional send and receive
/// registrations.
///
/// NOTE(review): presumably each side is `Some` only when enabled in the
/// `StreamOptions` passed to `register` — confirm in implementations.
pub struct PendingConnections {
    pub send_stream: Option<RegisteredStream<StreamSender>>,
    pub recv_stream: Option<RegisteredStream<StreamReceiver>>,
}
185
186impl PendingConnections {
187 pub fn into_parts(
188 self,
189 ) -> (
190 Option<RegisteredStream<StreamSender>>,
191 Option<RegisteredStream<StreamReceiver>>,
192 ) {
193 (self.send_stream, self.recv_stream)
194 }
195}
196
/// Service that registers send/receive streams for a request.
#[async_trait::async_trait]
pub trait ResponseService {
    /// Registers the streams described by `options` and returns the pending
    /// (not yet connected) registrations.
    async fn register(&self, options: StreamOptions) -> PendingConnections;
}
203
#[cfg(test)]
mod registered_stream_tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Placeholder connection info; none of these tests inspect it.
    fn dummy_conn_info() -> ConnectionInfo {
        ConnectionInfo {
            transport: "test".to_string(),
            info: "{}".to_string(),
        }
    }

    /// Returns a flag plus a closure that raises it, for observing whether a
    /// cleanup closure ran.
    fn tripwire() -> (Arc<AtomicBool>, impl FnOnce() + Send + 'static) {
        let tripped = Arc::new(AtomicBool::new(false));
        let handle = Arc::clone(&tripped);
        (tripped, move || handle.store(true, Ordering::SeqCst))
    }

    /// Dropping an armed `RegisteredStream` must invoke its cleanup closure.
    #[test]
    fn drop_runs_cleanup() {
        let (tripped, on_cleanup) = tripwire();
        let (_sender, receiver) = tokio::sync::oneshot::channel::<Result<(), String>>();
        let registered =
            RegisteredStream::new(dummy_conn_info(), receiver).with_cleanup(on_cleanup);

        drop(registered);

        assert!(
            tripped.load(Ordering::SeqCst),
            "cleanup must fire when RegisteredStream is dropped"
        );
    }

    /// `into_parts` hands ownership to the caller, so the cleanup must not run.
    #[test]
    fn into_parts_disarms_cleanup() {
        let (tripped, on_cleanup) = tripwire();
        let (_sender, receiver) = tokio::sync::oneshot::channel::<Result<(), String>>();
        let registered =
            RegisteredStream::new(dummy_conn_info(), receiver).with_cleanup(on_cleanup);

        let (info, provider) = registered.into_parts();
        drop(info);
        drop(provider);

        assert!(
            !tripped.load(Ordering::SeqCst),
            "into_parts() must disarm the cleanup closure"
        );
    }

    /// Dropping a stream that never had a cleanup attached must not panic.
    #[test]
    fn drop_without_cleanup_is_a_noop() {
        let (_sender, receiver) = tokio::sync::oneshot::channel::<Result<(), String>>();
        let registered: RegisteredStream<()> = RegisteredStream::new(dummy_conn_info(), receiver);
        drop(registered);
    }
}
265
/// Sending half of a response stream, backed by a bounded tokio mpsc channel
/// of [`TwoPartMessage`]s.
pub struct StreamSender {
    tx: tokio::sync::mpsc::Sender<TwoPartMessage>,
    // Must be `Some` for `send_prologue` to succeed; it is taken on first use,
    // so the prologue can be sent at most once.
    prologue: Option<ResponseStreamPrologue>,
}
294
295impl StreamSender {
296 pub async fn send(&self, data: Bytes) -> Result<()> {
297 Ok(self.tx.send(TwoPartMessage::from_data(data)).await?)
298 }
299
300 pub async fn send_control(&self, control: ControlMessage) -> Result<()> {
301 let bytes = serde_json::to_vec(&control)?;
302 Ok(self
303 .tx
304 .send(TwoPartMessage::from_header(bytes.into()))
305 .await?)
306 }
307
308 #[allow(clippy::needless_update)]
309 pub async fn send_prologue(&mut self, error: Option<String>) -> Result<(), String> {
310 if let Some(_prologue) = self.prologue.take() {
314 let prologue = ResponseStreamPrologue { error };
316 let header_bytes: Bytes = match serde_json::to_vec(&prologue) {
317 Ok(b) => b.into(),
318 Err(err) => {
319 tracing::error!(%err, "send_prologue: ResponseStreamPrologue did not serialize to a JSON array");
320 return Err("Invalid prologue".to_string());
321 }
322 };
323 self.tx
324 .send(TwoPartMessage::from_header(header_bytes))
325 .await
326 .map_err(|e| e.to_string())?;
327 } else {
328 panic!("Prologue already sent; or not set; logic error");
329 }
330 Ok(())
331 }
332}
333
/// Receiving half of a request stream: a bounded tokio mpsc receiver of raw
/// [`Bytes`] frames.
///
/// NOTE(review): no methods are defined in this chunk; consumers presumably
/// live in a submodule — confirm.
pub struct StreamReceiver {
    rx: tokio::sync::mpsc::Receiver<Bytes>,
}
337
/// Transport-tagged connection details exchanged between peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionInfo {
    /// Transport name (the tests in this file use `"tcp"` and `"test"`).
    pub transport: String,
    /// Transport-specific details as an opaque string. NOTE(review): the tests
    /// use `"{}"`, suggesting serialized JSON — confirm per transport.
    pub info: String,
}
352
/// Options passed to `ResponseService::register`, constructed via
/// [`StreamOptions::builder`].
#[derive(Clone, Builder)]
pub struct StreamOptions {
    /// Engine context for the request these streams belong to.
    pub context: Arc<dyn AsyncEngineContext>,

    /// Whether a request (receive) stream should be registered.
    pub enable_request_stream: bool,

    /// Whether a response (send) stream should be registered.
    pub enable_response_stream: bool,

    /// Buffer count for the send side; defaults to 8 when not set on the builder.
    /// NOTE(review): presumably the mpsc channel capacity — confirm.
    #[builder(default = "8")]
    pub send_buffer_count: usize,

    /// Buffer count for the receive side; defaults to 8 when not set on the builder.
    /// NOTE(review): presumably the mpsc channel capacity — confirm.
    #[builder(default = "8")]
    pub recv_buffer_count: usize,
}
382
383impl StreamOptions {
384 pub fn builder() -> StreamOptionsBuilder {
385 StreamOptionsBuilder::default()
386 }
387}
388
/// Outbound engine adapter that forwards requests to a type-erased
/// [`AsyncTransportEngine`].
pub struct Egress<Req: PipelineIO, Resp: PipelineIO> {
    transport_engine: Arc<dyn AsyncTransportEngine<Req, Resp>>,
}
392
#[cfg(test)]
mod tests {
    use super::{RequestControlMessage, RequestType, ResponseType};

    /// A payload that omits `metadata` and `frontend_send_ts_ns` must still
    /// deserialize, with both fields falling back to their empty defaults.
    #[test]
    fn request_control_message_defaults_missing_metadata() {
        let payload = r#"{
            "id": "request-123",
            "request_type": "single_in",
            "response_type": "many_out",
            "connection_info": {
                "transport": "tcp",
                "info": "{}"
            }
        }"#;

        let parsed: RequestControlMessage =
            serde_json::from_str(payload).expect("control message should deserialize");
        let RequestControlMessage {
            id,
            request_type,
            response_type,
            connection_info,
            metadata,
            frontend_send_ts_ns,
        } = parsed;

        assert_eq!(id, "request-123");
        assert!(matches!(request_type, RequestType::SingleIn));
        assert!(matches!(response_type, ResponseType::ManyOut));
        assert_eq!(connection_info.transport, "tcp");
        assert_eq!(connection_info.info, "{}");
        assert!(metadata.is_empty());
        assert!(frontend_send_ts_ns.is_none());
    }
}
421
422#[async_trait]
423impl<T: Data, U: Data> AsyncEngine<SingleIn<T>, ManyOut<U>, Error>
424 for Egress<SingleIn<T>, ManyOut<U>>
425where
426 T: Data + Serialize,
427 U: for<'de> Deserialize<'de> + Data,
428{
429 async fn generate(&self, request: SingleIn<T>) -> Result<ManyOut<U>, Error> {
430 self.transport_engine.generate(request).await
431 }
432}
433
/// Inbound adapter that owns the pipeline segment requests are routed into.
///
/// Each field is a set-once [`OnceLock`]. A second `attach`/`add_metrics` call
/// returns an error instead of replacing the value.
pub struct Ingress<Req: PipelineIO, Resp: PipelineIO> {
    // Set by `attach` (directly or via `link`/`for_pipeline`/`for_engine`).
    segment: OnceLock<Arc<SegmentSource<Req, Resp>>>,
    // Set by `add_metrics`.
    metrics: OnceLock<Arc<WorkHandlerMetrics>>,
    // NOTE(review): not set by any method visible in this chunk — presumably
    // populated by a `PushWorkHandler` impl elsewhere; confirm.
    endpoint_health_check_notifier: OnceLock<Arc<tokio::sync::Notify>>,
}
440
441impl<Req: PipelineIO + Sync, Resp: PipelineIO> Ingress<Req, Resp> {
442 pub fn new() -> Arc<Self> {
443 Arc::new(Self {
444 segment: OnceLock::new(),
445 metrics: OnceLock::new(),
446 endpoint_health_check_notifier: OnceLock::new(),
447 })
448 }
449
450 pub fn attach(&self, segment: Arc<SegmentSource<Req, Resp>>) -> Result<()> {
451 self.segment
452 .set(segment)
453 .map_err(|_| anyhow::anyhow!("Segment already set"))
454 }
455
456 pub fn add_metrics(
457 &self,
458 endpoint: &crate::component::Endpoint,
459 metrics_labels: Option<&[(&str, &str)]>,
460 ) -> Result<()> {
461 let metrics = WorkHandlerMetrics::from_endpoint(endpoint, metrics_labels)
462 .map_err(|e| anyhow::anyhow!("Failed to create work handler metrics: {}", e))?;
463
464 crate::metrics::work_handler_perf::ensure_work_handler_perf_metrics_registered(
466 endpoint.get_metrics_registry(),
467 );
468
469 crate::metrics::work_handler_pool::ensure_work_handler_pool_metrics_registered(
473 endpoint.get_metrics_registry(),
474 );
475
476 self.metrics
477 .set(Arc::new(metrics))
478 .map_err(|_| anyhow::anyhow!("Metrics already set"))
479 }
480
481 pub fn link(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
482 let ingress = Ingress::new();
483 ingress.attach(segment)?;
484 Ok(ingress)
485 }
486
487 pub fn for_pipeline(segment: Arc<SegmentSource<Req, Resp>>) -> Result<Arc<Self>> {
488 let ingress = Ingress::new();
489 ingress.attach(segment)?;
490 Ok(ingress)
491 }
492
493 pub fn for_engine(engine: ServiceEngine<Req, Resp>) -> Result<Arc<Self>> {
494 let frontend = SegmentSource::<Req, Resp>::new();
495 let backend = ServiceBackend::from_engine(engine);
496
497 let pipeline = frontend.link(backend)?.link(frontend)?;
499
500 let ingress = Ingress::new();
501 ingress.attach(pipeline)?;
502
503 Ok(ingress)
504 }
505
506 fn metrics(&self) -> Option<&Arc<WorkHandlerMetrics>> {
508 self.metrics.get()
509 }
510}
511
/// Handler that receives raw request payloads for an endpoint.
///
/// NOTE(review): the name and the `ingress::push_handler` module suggest it is
/// driven by a push-style work queue — confirm against implementations.
#[async_trait]
pub trait PushWorkHandler: Send + Sync {
    /// Processes one encoded request payload, with an optional request id.
    async fn handle_payload(
        &self,
        payload: Bytes,
        request_id: Option<String>,
    ) -> Result<(), PipelineError>;

    /// Registers work-handler metrics for `endpoint`, with optional extra labels.
    fn add_metrics(
        &self,
        endpoint: &crate::component::Endpoint,
        metrics_labels: Option<&[(&str, &str)]>,
    ) -> Result<()>;

    /// Supplies a notifier for endpoint health checks.
    ///
    /// The default implementation ignores it and returns `Ok(())`.
    fn set_endpoint_health_check_notifier(
        &self,
        _notifier: Arc<tokio::sync::Notify>,
    ) -> Result<()> {
        Ok(())
    }
}
536
/// Serde envelope around an optional stream item.
#[derive(Serialize, Deserialize, Debug)]
pub struct NetworkStreamWrapper<U> {
    /// The item, if any; the field is omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<U>,
    /// Always serialized. NOTE(review): presumably marks the final frame of a
    /// stream — confirm against sender/receiver.
    pub complete_final: bool,
}