1use super::error::{TransportError, TransportResult};
38#[cfg(feature = "http-server")]
39use super::traits::RecvBatch;
40use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
41#[cfg(feature = "http-server")]
42use super::types::Message;
43#[cfg(feature = "http-server")]
44use super::types::PayloadFormat;
45use super::types::SendResult;
46use super::work_batch::WorkBatch;
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49#[cfg(feature = "http-server")]
50use std::sync::atomic::AtomicU64;
51use std::sync::atomic::{AtomicBool, Ordering};
52
53#[derive(Debug, Clone)]
59pub struct HttpToken {
60 pub seq: u64,
62
63 pub source_addr: Option<String>,
65}
66
67impl HttpToken {
68 #[must_use]
70 pub fn new(seq: u64) -> Self {
71 Self {
72 seq,
73 source_addr: None,
74 }
75 }
76
77 #[must_use]
79 pub fn with_source(seq: u64, addr: String) -> Self {
80 Self {
81 seq,
82 source_addr: Some(addr),
83 }
84 }
85}
86
87impl CommitToken for HttpToken {}
88
89impl std::fmt::Display for HttpToken {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match &self.source_addr {
92 Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
93 None => write!(f, "http:{}", self.seq),
94 }
95 }
96}
97
98fn default_recv_path() -> String {
99 "/ingest".to_string()
100}
101
102fn default_buffer_size() -> usize {
103 10_000
104}
105
106fn default_recv_timeout_ms() -> u64 {
107 100
108}
109
110fn default_connect_timeout_ms() -> u64 {
111 5_000
112}
113
114fn default_send_timeout_ms() -> u64 {
115 30_000
116}
117
118fn default_max_body_bytes() -> usize {
119 16 * 1024 * 1024
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HttpTransportConfig {
125 #[serde(default)]
127 pub endpoint: Option<String>,
128
129 #[serde(default)]
132 pub listen: Option<String>,
133
134 #[serde(default = "default_recv_path")]
136 pub recv_path: String,
137
138 #[serde(default = "default_buffer_size")]
140 pub recv_buffer_size: usize,
141
142 #[serde(default = "default_recv_timeout_ms")]
144 pub recv_timeout_ms: u64,
145
146 #[serde(default)]
148 pub filters_in: Vec<super::filter::FilterRule>,
149
150 #[serde(default)]
152 pub filters_out: Vec<super::filter::FilterRule>,
153
154 #[serde(default = "default_connect_timeout_ms")]
156 pub connect_timeout_ms: u64,
157
158 #[serde(default = "default_send_timeout_ms")]
161 pub send_timeout_ms: u64,
162
163 #[serde(default = "default_max_body_bytes")]
166 pub max_body_bytes: usize,
167
168 #[serde(default)]
173 pub tls_ca_path: Option<String>,
174}
175
176impl Default for HttpTransportConfig {
177 fn default() -> Self {
178 Self {
179 endpoint: None,
180 listen: None,
181 recv_path: default_recv_path(),
182 recv_buffer_size: default_buffer_size(),
183 recv_timeout_ms: default_recv_timeout_ms(),
184 filters_in: Vec::new(),
185 filters_out: Vec::new(),
186 connect_timeout_ms: default_connect_timeout_ms(),
187 send_timeout_ms: default_send_timeout_ms(),
188 max_body_bytes: default_max_body_bytes(),
189 tls_ca_path: None,
190 }
191 }
192}
193
194impl HttpTransportConfig {
195 #[must_use]
197 pub fn from_cascade() -> Self {
198 <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
199 }
200
201 #[must_use]
203 pub fn sender(endpoint: &str) -> Self {
204 Self {
205 endpoint: Some(endpoint.to_string()),
206 ..Default::default()
207 }
208 }
209
210 #[must_use]
212 pub fn receiver(listen: &str) -> Self {
213 Self {
214 listen: Some(listen.to_string()),
215 ..Default::default()
216 }
217 }
218}
219
220pub struct HttpTransport {
225 client: reqwest::Client,
227
228 endpoint: Option<String>,
230
231 #[cfg(feature = "http-server")]
234 receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
235
236 #[cfg(feature = "http-server")]
242 shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
243
244 #[cfg(feature = "http-server")]
246 _server_handle: Option<tokio::task::JoinHandle<()>>,
247
248 closed: Arc<AtomicBool>,
250
251 #[cfg(feature = "http-server")]
253 recv_timeout_ms: u64,
254
255 filter_engine: super::filter::TransportFilterEngine,
257}
258
259impl HttpTransport {
260 pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
269 Self::new_inner(
271 config,
272 #[cfg(feature = "governor")]
273 None,
274 )
275 .await
276 }
277
278 #[cfg(feature = "governor")]
292 pub async fn with_pressure(
293 config: &HttpTransportConfig,
294 pressure: Option<Arc<crate::governor::UnifiedPressure>>,
295 ) -> TransportResult<Self> {
296 Self::new_inner(config, pressure).await
297 }
298
299 async fn new_inner(
300 config: &HttpTransportConfig,
301 #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
302 ) -> TransportResult<Self> {
303 #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
305 let mut client_builder = reqwest::Client::builder()
306 .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
307 .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
308
309 #[cfg(feature = "tls")]
311 if let Some(ref ca) = config.tls_ca_path {
312 let trust = crate::tls::TlsTrust {
313 native_roots: true,
314 webpki_roots: false,
315 extra_roots: vec![ca.into()],
316 extra_intermediates: Vec::new(),
317 exclusive: false,
318 };
319 let tls_cfg =
320 crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
321 .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
322 client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
323 }
324 #[cfg(not(feature = "tls"))]
325 if config.tls_ca_path.is_some() {
326 tracing::warn!(
327 "http transport tls_ca_path is set but the `tls` feature is disabled -- \
328 ignoring (using reqwest default roots)"
329 );
330 }
331
332 let client = client_builder
333 .build()
334 .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
335
336 #[cfg(feature = "http-server")]
337 let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
338 let addr: std::net::SocketAddr = listen
339 .parse()
340 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
341
342 let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
343 let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
344
345 let sequence = Arc::new(AtomicU64::new(0));
346 let recv_path = config.recv_path.clone();
347
348 let app = build_receiver_router(
349 tx,
350 sequence,
351 &recv_path,
352 config.max_body_bytes,
353 #[cfg(feature = "governor")]
354 pressure.clone(),
355 );
356
357 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
358 TransportError::Connection(format!("failed to bind to {addr}: {e}"))
359 })?;
360
361 let handle = tokio::spawn(async move {
362 axum::serve(
363 listener,
364 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
365 )
366 .with_graceful_shutdown(async {
367 sd_rx.await.ok();
368 })
369 .await
370 .ok();
371 });
372
373 (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
374 } else {
375 (None, None, None)
376 };
377
378 #[cfg(all(feature = "governor", not(feature = "http-server")))]
382 let _ = pressure;
383
384 #[cfg(feature = "logger")]
385 tracing::info!(
386 endpoint = ?config.endpoint,
387 listen = ?config.listen,
388 "HTTP transport opened"
389 );
390
391 let filter_engine = super::filter::TransportFilterEngine::new(
394 &config.filters_in,
395 &config.filters_out,
396 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
397 )?;
398
399 let closed = Arc::new(AtomicBool::new(false));
400
401 #[cfg(feature = "health")]
402 {
403 let h = Arc::clone(&closed);
404 crate::health::HealthRegistry::register("transport:http", move || {
405 if h.load(Ordering::Relaxed) {
406 crate::health::HealthStatus::Unhealthy
407 } else {
408 crate::health::HealthStatus::Healthy
409 }
410 });
411 }
412
413 Ok(Self {
414 client,
415 endpoint: config.endpoint.clone(),
416 #[cfg(feature = "http-server")]
417 receiver,
418 #[cfg(feature = "http-server")]
419 shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
420 #[cfg(feature = "http-server")]
421 _server_handle: server_handle,
422 closed,
423 #[cfg(feature = "http-server")]
424 recv_timeout_ms: config.recv_timeout_ms,
425 filter_engine,
426 })
427 }
428}
429
430#[cfg(feature = "http-server")]
432fn build_receiver_router(
433 sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
434 sequence: Arc<AtomicU64>,
435 recv_path: &str,
436 max_body_bytes: usize,
437 #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
438) -> axum::Router {
439 use axum::routing::post;
440
441 let state = ReceiverState {
442 sender,
443 sequence,
444 #[cfg(feature = "governor")]
445 pressure,
446 };
447
448 axum::Router::new()
449 .route(recv_path, post(ingest_handler))
450 .layer(axum::extract::DefaultBodyLimit::max(max_body_bytes))
452 .with_state(state)
453}
454
455#[cfg(feature = "http-server")]
457#[derive(Clone)]
458struct ReceiverState {
459 sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
460 sequence: Arc<AtomicU64>,
461 #[cfg(feature = "governor")]
467 pressure: Option<Arc<crate::governor::UnifiedPressure>>,
468}
469
470#[cfg(feature = "http-server")]
472async fn ingest_handler(
473 axum::extract::State(state): axum::extract::State<ReceiverState>,
474 axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
475 headers: axum::http::HeaderMap,
476 body: axum::body::Bytes,
477) -> axum::response::Response {
478 use axum::response::IntoResponse as _;
479
480 if body.is_empty() {
481 return axum::http::StatusCode::BAD_REQUEST.into_response();
482 }
483
484 #[cfg(feature = "governor")]
489 if let Some(pressure) = &state.pressure
490 && pressure.should_hold()
491 {
492 #[cfg(feature = "metrics")]
493 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http", "reason" => "pressure")
494 .increment(1);
495 return shed_503();
496 }
497
498 #[cfg(feature = "transport-trace")]
500 if let Some(tp) = headers
501 .get(super::propagation::TRACEPARENT_HEADER)
502 .and_then(|v| v.to_str().ok())
503 && super::propagation::is_valid_traceparent(tp)
504 {
505 tracing::Span::current().record("traceparent", tp);
506 }
507
508 #[cfg(not(feature = "otel"))]
510 let _ = &headers;
511
512 let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
513 let format = PayloadFormat::detect(&body);
514 let timestamp_ms = chrono::Utc::now().timestamp_millis();
515
516 let msg = Message {
519 key: None,
520 payload: body,
521 token: HttpToken::with_source(seq, addr.to_string()),
522 timestamp_ms: Some(timestamp_ms),
523 format,
524 };
525
526 match state.sender.try_send(msg) {
527 Ok(()) => {
528 #[cfg(feature = "metrics")]
529 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
530 axum::http::StatusCode::OK.into_response()
531 }
532 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
533 #[cfg(feature = "metrics")]
534 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
535 .increment(1);
536 shed_503()
537 }
538 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
539 #[cfg(feature = "metrics")]
540 metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
541 axum::http::StatusCode::GONE.into_response()
542 }
543 }
544}
545
546#[cfg(feature = "http-server")]
549fn shed_503() -> axum::response::Response {
550 use axum::response::IntoResponse as _;
551 (
552 axum::http::StatusCode::SERVICE_UNAVAILABLE,
553 [(axum::http::header::RETRY_AFTER, "1")],
554 )
555 .into_response()
556}
557
558impl TransportBase for HttpTransport {
559 async fn close(&self) -> TransportResult<()> {
560 self.closed.store(true, Ordering::Relaxed);
561 #[cfg(feature = "http-server")]
564 if let Some(tx) = self.shutdown_tx.lock().take() {
565 let _ = tx.send(());
566 }
567 Ok(())
568 }
569
570 fn is_healthy(&self) -> bool {
571 !self.closed.load(Ordering::Relaxed)
572 }
573
574 fn name(&self) -> &'static str {
575 "http"
576 }
577}
578
579impl TransportSender for HttpTransport {
580 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
581 if self.closed.load(Ordering::Relaxed) {
582 return SendResult::Fatal(TransportError::Closed);
583 }
584
585 if self.filter_engine.has_outbound_filters() {
587 match self.filter_engine.apply_outbound(&payload) {
588 super::filter::FilterDisposition::Pass => {}
589 super::filter::FilterDisposition::Drop => return SendResult::Ok,
590 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
591 }
592 }
593
594 let Some(base_url) = &self.endpoint else {
595 return SendResult::Fatal(TransportError::Config(
596 "no endpoint configured for sending".into(),
597 ));
598 };
599
600 let url = if key.is_empty() {
602 base_url.clone()
603 } else {
604 let base = base_url.trim_end_matches('/');
605 let suffix = key.trim_start_matches('/');
606 format!("{base}/{suffix}")
607 };
608
609 #[cfg(feature = "metrics")]
610 let start = std::time::Instant::now();
611
612 let request_builder = self
614 .client
615 .post(&url)
616 .header("content-type", "application/octet-stream");
617
618 #[cfg(feature = "transport-trace")]
619 let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
620 request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
621 } else {
622 request_builder
623 };
624
625 #[cfg(feature = "logger")]
626 let payload_len = payload.len();
627 let result = match request_builder.body(payload).send().await {
628 Ok(resp) if resp.status().is_success() => {
629 #[cfg(feature = "logger")]
630 tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
631
632 #[cfg(feature = "metrics")]
633 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
634 SendResult::Ok
635 }
636 Ok(resp)
637 if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
638 || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
639 {
640 #[cfg(feature = "logger")]
641 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
642
643 #[cfg(feature = "metrics")]
644 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
645 .increment(1);
646 SendResult::Backpressured
647 }
648 Ok(resp) => {
649 #[cfg(feature = "logger")]
650 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
651
652 #[cfg(feature = "metrics")]
653 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
654 .increment(1);
655 SendResult::Fatal(TransportError::Send(format!(
656 "HTTP {} from {}",
657 resp.status(),
658 url
659 )))
660 }
661 Err(e) => {
662 #[cfg(feature = "logger")]
663 tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
664
665 #[cfg(feature = "metrics")]
666 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
667 .increment(1);
668 SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
669 }
670 };
671
672 #[cfg(feature = "metrics")]
673 metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
674 .record(start.elapsed().as_secs_f64());
675
676 result
677 }
678}
679
680impl TransportReceiver for HttpTransport {
681 type Token = HttpToken;
682
683 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
684 if self.closed.load(Ordering::Relaxed) {
685 return Err(TransportError::Closed);
686 }
687
688 #[cfg(feature = "http-server")]
689 {
690 let Some(receiver) = &self.receiver else {
691 return Err(TransportError::Config(
692 "no listen address configured for receiving".into(),
693 ));
694 };
695
696 let mut rx = receiver.lock().await;
697 let mut messages = Vec::with_capacity(max.min(100));
698
699 for _ in 0..max {
700 let result = if self.recv_timeout_ms == 0 {
701 match rx.try_recv() {
702 Ok(msg) => Some(msg),
703 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
704 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
705 return Err(TransportError::Closed);
706 }
707 }
708 } else if messages.is_empty() {
709 match tokio::time::timeout(
711 std::time::Duration::from_millis(self.recv_timeout_ms),
712 rx.recv(),
713 )
714 .await
715 {
716 Ok(Some(msg)) => Some(msg),
717 Ok(None) => return Err(TransportError::Closed),
718 Err(_) => break, }
720 } else {
721 match rx.try_recv() {
723 Ok(msg) => Some(msg),
724 Err(_) => break,
725 }
726 };
727
728 if let Some(msg) = result {
729 messages.push(msg);
730 }
731 }
732
733 let batch = self.filter_engine.partition_batch(
736 messages,
737 |m| m.payload.as_ref(),
738 |m| m.key.clone(),
739 |m| m.token.clone(),
740 );
741 let messages = batch.messages;
742 let dlq_entries = batch.dlq_entries;
743 let filtered_tokens = batch.filtered_tokens;
744
745 #[cfg(feature = "logger")]
746 if !messages.is_empty() {
747 tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
748 }
749
750 #[cfg(feature = "metrics")]
753 if !messages.is_empty() {
754 metrics::counter!("dfe_transport_received_total", "transport" => "http")
755 .increment(messages.len() as u64);
756 }
757
758 Ok(RecvBatch {
759 messages,
760 dlq_entries,
761 filtered_tokens,
762 }
763 .into())
764 }
765
766 #[cfg(not(feature = "http-server"))]
767 {
768 let _ = max;
769 Err(TransportError::Config(
770 "HTTP receive requires the 'http-server' feature".into(),
771 ))
772 }
773 }
774
775 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
776 Ok(())
778 }
779}
780
781impl Drop for HttpTransport {
782 fn drop(&mut self) {
783 #[cfg(feature = "http-server")]
784 if let Some(tx) = self.shutdown_tx.lock().take() {
785 let _ = tx.send(());
786 }
787 }
788}
789
790impl super::traits::FromCascade for HttpTransportConfig {}
791
792#[cfg(test)]
793mod tests {
794 use super::*;
795
796 #[test]
797 fn http_token_display() {
798 let token = HttpToken::new(42);
799 assert_eq!(format!("{token}"), "http:42");
800 }
801
802 #[test]
803 fn http_token_display_with_source() {
804 let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
805 assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
806 }
807
808 #[test]
809 fn config_defaults() {
810 let config = HttpTransportConfig::default();
811 assert!(config.endpoint.is_none());
812 assert!(config.listen.is_none());
813 assert_eq!(config.recv_path, "/ingest");
814 assert_eq!(config.recv_buffer_size, 10_000);
815 assert_eq!(config.recv_timeout_ms, 100);
816 }
817
818 #[test]
819 fn config_sender_helper() {
820 let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
821 assert_eq!(
822 config.endpoint.as_deref(),
823 Some("http://localhost:8080/ingest")
824 );
825 assert!(config.listen.is_none());
826 }
827
828 #[test]
829 fn config_receiver_helper() {
830 let config = HttpTransportConfig::receiver("0.0.0.0:8080");
831 assert!(config.endpoint.is_none());
832 assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
833 }
834
835 #[tokio::test]
836 async fn send_only_transport() {
837 let config = HttpTransportConfig::default();
839 let transport = HttpTransport::new(&config).await.unwrap();
840
841 assert!(transport.is_healthy());
842 assert_eq!(transport.name(), "http");
843
844 let result = transport
846 .send("test", bytes::Bytes::from_static(b"payload"))
847 .await;
848 assert!(result.is_fatal());
849
850 transport.commit(&[]).await.unwrap();
852 }
853
854 #[tokio::test]
855 async fn close_prevents_send() {
856 let config = HttpTransportConfig::sender("http://localhost:19999/test");
857 let transport = HttpTransport::new(&config).await.unwrap();
858
859 transport.close().await.unwrap();
860 assert!(!transport.is_healthy());
861
862 let result = transport
863 .send("test", bytes::Bytes::from_static(b"data"))
864 .await;
865 assert!(result.is_fatal());
866 }
867
868 #[tokio::test]
869 async fn close_prevents_recv() {
870 let config = HttpTransportConfig::default();
871 let transport = HttpTransport::new(&config).await.unwrap();
872
873 transport.close().await.unwrap();
874 let result = transport.recv(1).await;
875 assert!(result.is_err());
876 }
877
878 #[cfg(feature = "http-server")]
881 #[tokio::test]
882 async fn send_and_receive_roundtrip() {
883 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
885 let addr = listener.local_addr().unwrap();
886 drop(listener); let recv_config = HttpTransportConfig {
889 listen: Some(addr.to_string()),
890 recv_path: "/ingest".to_string(),
891 recv_buffer_size: 100,
892 recv_timeout_ms: 1000,
893 ..Default::default()
894 };
895 let receiver = HttpTransport::new(&recv_config).await.unwrap();
896
897 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
899
900 let send_config =
902 HttpTransportConfig::sender(&format!("http://127.0.0.1:{}/ingest", addr.port()));
903 let sender = HttpTransport::new(&send_config).await.unwrap();
904
905 let result = sender
906 .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
907 .await;
908 assert!(result.is_ok(), "send failed: {result:?}");
909
910 let batch = receiver.recv(10).await.unwrap();
912 assert_eq!(batch.records.len(), 1);
913 assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
914 assert!(batch.commit_tokens[0].source_addr.is_some());
916
917 sender.close().await.unwrap();
919 receiver.close().await.unwrap();
920 }
921
922 #[cfg(feature = "http-server")]
924 #[tokio::test]
925 async fn receive_rejects_empty_body() {
926 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
927 let addr = listener.local_addr().unwrap();
928 drop(listener);
929
930 let recv_config = HttpTransportConfig {
931 listen: Some(addr.to_string()),
932 recv_timeout_ms: 200,
933 ..Default::default()
934 };
935 let receiver = HttpTransport::new(&recv_config).await.unwrap();
936 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
937
938 let client = reqwest::Client::new();
940 let resp = client
941 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
942 .body(Vec::<u8>::new())
943 .send()
944 .await
945 .unwrap();
946
947 assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
948
949 let records = receiver.recv(10).await.unwrap().records;
951 assert!(records.is_empty());
952
953 receiver.close().await.unwrap();
954 }
955
956 #[cfg(feature = "http-server")]
958 #[tokio::test]
959 async fn receive_rejects_oversized_body() {
960 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
961 let addr = listener.local_addr().unwrap();
962 drop(listener);
963
964 let recv_config = HttpTransportConfig {
965 listen: Some(addr.to_string()),
966 recv_timeout_ms: 200,
967 max_body_bytes: 1024, ..Default::default()
969 };
970 let receiver = HttpTransport::new(&recv_config).await.unwrap();
971 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
972
973 let client = reqwest::Client::new();
975 let resp = client
976 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
977 .body(vec![b'x'; 8 * 1024])
978 .send()
979 .await
980 .unwrap();
981
982 assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
983
984 let records = receiver.recv(10).await.unwrap().records;
986 assert!(records.is_empty(), "oversized body must not be queued");
987
988 receiver.close().await.unwrap();
989 }
990
991 #[cfg(feature = "http-server")]
993 #[tokio::test]
994 async fn recv_without_listen_returns_error() {
995 let config = HttpTransportConfig::sender("http://localhost:9999");
996 let transport = HttpTransport::new(&config).await.unwrap();
997
998 let result = transport.recv(10).await;
999 assert!(result.is_err());
1000 }
1001
1002 #[cfg(all(feature = "http-server", feature = "governor"))]
1006 #[tokio::test]
1007 async fn pressure_high_sheds_with_503_and_none_is_normal() {
1008 use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
1009 use crate::memory::{MemoryGuard, MemoryGuardConfig};
1010
1011 {
1013 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1014 let addr = listener.local_addr().unwrap();
1015 drop(listener);
1016 let cfg = HttpTransportConfig {
1017 listen: Some(addr.to_string()),
1018 recv_timeout_ms: 200,
1019 ..Default::default()
1020 };
1021 let receiver = HttpTransport::new(&cfg).await.unwrap();
1022 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1023
1024 let client = reqwest::Client::new();
1025 let resp = client
1026 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1027 .body(b"{\"msg\":\"ok\"}".to_vec())
1028 .send()
1029 .await
1030 .unwrap();
1031 assert_eq!(
1032 resp.status(),
1033 reqwest::StatusCode::OK,
1034 "no governor -> accepted"
1035 );
1036 receiver.close().await.unwrap();
1037 }
1038
1039 {
1041 let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
1042 limit_bytes: 1000,
1043 pressure_threshold: 0.80,
1044 ..Default::default()
1045 }));
1046 guard.add_bytes(950); let src = MemoryPressureSource::new(Arc::clone(&guard));
1048 let pressure = Arc::new(UnifiedPressure::new(
1049 vec![Arc::new(src) as Arc<dyn PressureSource>],
1050 Hysteresis::new(0.80, 0.65).expect("valid band"),
1051 ));
1052 assert!(pressure.should_hold(), "pinned-high governor must hold");
1054
1055 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1056 let addr = listener.local_addr().unwrap();
1057 drop(listener);
1058 let cfg = HttpTransportConfig {
1059 listen: Some(addr.to_string()),
1060 recv_timeout_ms: 200,
1061 ..Default::default()
1062 };
1063 let receiver = HttpTransport::with_pressure(&cfg, Some(Arc::clone(&pressure)))
1064 .await
1065 .unwrap();
1066 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1067
1068 let client = reqwest::Client::new();
1069 let resp = client
1070 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1071 .body(b"{\"msg\":\"shed\"}".to_vec())
1072 .send()
1073 .await
1074 .unwrap();
1075 assert_eq!(
1076 resp.status(),
1077 reqwest::StatusCode::SERVICE_UNAVAILABLE,
1078 "pinned-high governor must shed with 503"
1079 );
1080 assert_eq!(
1083 resp.headers()
1084 .get(reqwest::header::RETRY_AFTER)
1085 .and_then(|v| v.to_str().ok()),
1086 Some("1"),
1087 "503 shed must carry Retry-After"
1088 );
1089
1090 let records = receiver.recv(10).await.unwrap().records;
1092 assert!(records.is_empty(), "shed request must not be queued");
1093 receiver.close().await.unwrap();
1094 }
1095 }
1096
1097 #[test]
1098 fn config_serde_roundtrip() {
1099 let config = HttpTransportConfig {
1100 endpoint: Some("http://example.com/ingest".into()),
1101 listen: Some("0.0.0.0:8080".into()),
1102 recv_path: "/custom".into(),
1103 recv_buffer_size: 5000,
1104 recv_timeout_ms: 250,
1105 ..Default::default()
1106 };
1107
1108 let json = serde_json::to_string(&config).unwrap();
1109 let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();
1110
1111 assert_eq!(parsed.endpoint, config.endpoint);
1112 assert_eq!(parsed.listen, config.listen);
1113 assert_eq!(parsed.recv_path, config.recv_path);
1114 assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
1115 assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
1116 }
1117}