1use super::error::{TransportError, TransportResult};
38#[cfg(feature = "http-server")]
39use super::traits::RecvBatch;
40use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
41#[cfg(feature = "http-server")]
42use super::types::Message;
43#[cfg(feature = "http-server")]
44use super::types::PayloadFormat;
45use super::types::SendResult;
46use super::work_batch::WorkBatch;
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49#[cfg(feature = "http-server")]
50use std::sync::atomic::AtomicU64;
51use std::sync::atomic::{AtomicBool, Ordering};
52
/// Commit token attached to every message accepted by the HTTP receiver.
///
/// Pairs a receiver-assigned sequence number with the address of the peer
/// that delivered the message, when that address is known.
#[derive(Clone, Debug)]
pub struct HttpToken {
    /// Sequence number of the message. The ingest handler draws it from an
    /// atomic counter created alongside the receive server.
    pub seq: u64,

    /// Textual socket address of the client that posted the message, or
    /// `None` when the token was built without one.
    pub source_addr: Option<String>,
}
66
67impl HttpToken {
68 #[must_use]
70 pub fn new(seq: u64) -> Self {
71 Self {
72 seq,
73 source_addr: None,
74 }
75 }
76
77 #[must_use]
79 pub fn with_source(seq: u64, addr: String) -> Self {
80 Self {
81 seq,
82 source_addr: Some(addr),
83 }
84 }
85}
86
87impl CommitToken for HttpToken {}
88
89impl std::fmt::Display for HttpToken {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match &self.source_addr {
92 Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
93 None => write!(f, "http:{}", self.seq),
94 }
95 }
96}
97
/// Serde default for `recv_path`: the route `/ingest`.
fn default_recv_path() -> String {
    String::from("/ingest")
}
101
/// Serde default for `recv_buffer_size`: 10 000 queued messages.
fn default_buffer_size() -> usize {
    const DEFAULT_CAPACITY: usize = 10_000;
    DEFAULT_CAPACITY
}
105
/// Serde default for `recv_timeout_ms`: 100 milliseconds.
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_MS: u64 = 100;
    DEFAULT_MS
}
109
/// Serde default for `connect_timeout_ms`: five seconds, in milliseconds.
fn default_connect_timeout_ms() -> u64 {
    const MS_PER_SECOND: u64 = 1_000;
    5 * MS_PER_SECOND
}
113
/// Serde default for `send_timeout_ms`: thirty seconds, in milliseconds.
fn default_send_timeout_ms() -> u64 {
    const MS_PER_SECOND: u64 = 1_000;
    30 * MS_PER_SECOND
}
117
/// Serde default for `max_body_bytes`: 16 MiB.
fn default_max_body_bytes() -> usize {
    const MIB: usize = 1024 * 1024;
    16 * MIB
}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HttpTransportConfig {
125 #[serde(default)]
127 pub endpoint: Option<String>,
128
129 #[serde(default)]
132 pub listen: Option<String>,
133
134 #[serde(default = "default_recv_path")]
136 pub recv_path: String,
137
138 #[serde(default = "default_buffer_size")]
140 pub recv_buffer_size: usize,
141
142 #[serde(default = "default_recv_timeout_ms")]
144 pub recv_timeout_ms: u64,
145
146 #[serde(default)]
148 pub filters_in: Vec<super::filter::FilterRule>,
149
150 #[serde(default)]
152 pub filters_out: Vec<super::filter::FilterRule>,
153
154 #[serde(default = "default_connect_timeout_ms")]
156 pub connect_timeout_ms: u64,
157
158 #[serde(default = "default_send_timeout_ms")]
161 pub send_timeout_ms: u64,
162
163 #[serde(default = "default_max_body_bytes")]
166 pub max_body_bytes: usize,
167
168 #[serde(default)]
173 pub tls_ca_path: Option<String>,
174}
175
176impl Default for HttpTransportConfig {
177 fn default() -> Self {
178 Self {
179 endpoint: None,
180 listen: None,
181 recv_path: default_recv_path(),
182 recv_buffer_size: default_buffer_size(),
183 recv_timeout_ms: default_recv_timeout_ms(),
184 filters_in: Vec::new(),
185 filters_out: Vec::new(),
186 connect_timeout_ms: default_connect_timeout_ms(),
187 send_timeout_ms: default_send_timeout_ms(),
188 max_body_bytes: default_max_body_bytes(),
189 tls_ca_path: None,
190 }
191 }
192}
193
194impl HttpTransportConfig {
195 #[must_use]
197 pub fn from_cascade() -> Self {
198 <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
199 }
200
201 #[must_use]
203 pub fn sender(endpoint: &str) -> Self {
204 Self {
205 endpoint: Some(endpoint.to_string()),
206 ..Default::default()
207 }
208 }
209
210 #[must_use]
212 pub fn receiver(listen: &str) -> Self {
213 Self {
214 listen: Some(listen.to_string()),
215 ..Default::default()
216 }
217 }
218}
219
220pub struct HttpTransport {
225 client: reqwest::Client,
227
228 endpoint: Option<String>,
230
231 #[cfg(feature = "http-server")]
234 receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
235
236 #[cfg(feature = "http-server")]
242 shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
243
244 #[cfg(feature = "http-server")]
246 _server_handle: Option<tokio::task::JoinHandle<()>>,
247
248 closed: Arc<AtomicBool>,
250
251 #[cfg(feature = "http-server")]
253 recv_timeout_ms: u64,
254
255 filter_engine: super::filter::TransportFilterEngine,
257}
258
259impl HttpTransport {
260 pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
269 Self::new_inner(
271 config,
272 #[cfg(feature = "governor")]
273 None,
274 )
275 .await
276 }
277
278 #[cfg(feature = "governor")]
292 pub async fn with_pressure(
293 config: &HttpTransportConfig,
294 pressure: Option<Arc<crate::governor::UnifiedPressure>>,
295 ) -> TransportResult<Self> {
296 Self::new_inner(config, pressure).await
297 }
298
299 async fn new_inner(
300 config: &HttpTransportConfig,
301 #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
302 ) -> TransportResult<Self> {
303 #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
305 let mut client_builder = reqwest::Client::builder()
306 .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
307 .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
308
309 #[cfg(feature = "tls")]
311 if let Some(ref ca) = config.tls_ca_path {
312 let trust = crate::tls::TlsTrust {
313 native_roots: true,
314 webpki_roots: false,
315 extra_roots: vec![ca.into()],
316 extra_intermediates: Vec::new(),
317 exclusive: false,
318 };
319 let tls_cfg =
320 crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
321 .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
322 client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
323 }
324 #[cfg(not(feature = "tls"))]
325 if config.tls_ca_path.is_some() {
326 tracing::warn!(
327 "http transport tls_ca_path is set but the `tls` feature is disabled -- \
328 ignoring (using reqwest default roots)"
329 );
330 }
331
332 let client = client_builder
333 .build()
334 .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
335
336 #[cfg(feature = "http-server")]
337 let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
338 let addr: std::net::SocketAddr = listen
339 .parse()
340 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
341
342 let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
343 let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
344
345 let sequence = Arc::new(AtomicU64::new(0));
346 let recv_path = config.recv_path.clone();
347
348 let app = build_receiver_router(
349 tx,
350 sequence,
351 &recv_path,
352 config.max_body_bytes,
353 #[cfg(feature = "governor")]
354 pressure.clone(),
355 );
356
357 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
358 TransportError::Connection(format!("failed to bind to {addr}: {e}"))
359 })?;
360
361 let handle = tokio::spawn(async move {
362 axum::serve(
363 listener,
364 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
365 )
366 .with_graceful_shutdown(async {
367 sd_rx.await.ok();
368 })
369 .await
370 .ok();
371 });
372
373 (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
374 } else {
375 (None, None, None)
376 };
377
378 #[cfg(all(feature = "governor", not(feature = "http-server")))]
382 let _ = pressure;
383
384 #[cfg(feature = "logger")]
385 tracing::info!(
386 endpoint = ?config.endpoint,
387 listen = ?config.listen,
388 "HTTP transport opened"
389 );
390
391 let filter_engine = super::filter::TransportFilterEngine::new(
394 &config.filters_in,
395 &config.filters_out,
396 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
397 )?;
398
399 let closed = Arc::new(AtomicBool::new(false));
400
401 #[cfg(feature = "health")]
402 {
403 let h = Arc::clone(&closed);
404 crate::health::HealthRegistry::register("transport:http", move || {
405 if h.load(Ordering::Relaxed) {
406 crate::health::HealthStatus::Unhealthy
407 } else {
408 crate::health::HealthStatus::Healthy
409 }
410 });
411 }
412
413 Ok(Self {
414 client,
415 endpoint: config.endpoint.clone(),
416 #[cfg(feature = "http-server")]
417 receiver,
418 #[cfg(feature = "http-server")]
419 shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
420 #[cfg(feature = "http-server")]
421 _server_handle: server_handle,
422 closed,
423 #[cfg(feature = "http-server")]
424 recv_timeout_ms: config.recv_timeout_ms,
425 filter_engine,
426 })
427 }
428}
429
/// Builds the receive-side router: one POST route at `recv_path`, served by
/// `ingest_handler`, with request bodies capped at `max_body_bytes`.
///
/// `sender` and `sequence` (and `pressure` under the `governor` feature) are
/// moved into the router state that the handler reads.
#[cfg(feature = "http-server")]
fn build_receiver_router(
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    sequence: Arc<AtomicU64>,
    recv_path: &str,
    max_body_bytes: usize,
    #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
) -> axum::Router {
    let body_limit = axum::extract::DefaultBodyLimit::max(max_body_bytes);

    let shared = ReceiverState {
        sender,
        sequence,
        #[cfg(feature = "governor")]
        pressure,
    };

    axum::Router::new()
        .route(recv_path, axum::routing::post(ingest_handler))
        .layer(body_limit)
        .with_state(shared)
}
454
/// State shared by every invocation of the ingest handler.
#[cfg(feature = "http-server")]
#[derive(Clone)]
struct ReceiverState {
    /// Producer side of the channel drained by `recv`.
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    /// Counter that hands out the `seq` of each token.
    sequence: Arc<AtomicU64>,
    /// Optional pressure source; while it reports `should_hold()`, requests
    /// are shed with 503.
    #[cfg(feature = "governor")]
    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
}
469
470#[cfg(feature = "http-server")]
472async fn ingest_handler(
473 axum::extract::State(state): axum::extract::State<ReceiverState>,
474 axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
475 headers: axum::http::HeaderMap,
476 body: axum::body::Bytes,
477) -> axum::response::Response {
478 use axum::response::IntoResponse as _;
479
480 if body.is_empty() {
481 return axum::http::StatusCode::BAD_REQUEST.into_response();
482 }
483
484 #[cfg(feature = "governor")]
489 if let Some(pressure) = &state.pressure
490 && pressure.should_hold()
491 {
492 #[cfg(feature = "metrics")]
493 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http", "reason" => "pressure")
494 .increment(1);
495 return shed_503();
496 }
497
498 #[cfg(feature = "transport-trace")]
500 if let Some(tp) = headers
501 .get(super::propagation::TRACEPARENT_HEADER)
502 .and_then(|v| v.to_str().ok())
503 && super::propagation::is_valid_traceparent(tp)
504 {
505 tracing::Span::current().record("traceparent", tp);
506 }
507
508 #[cfg(not(feature = "otel"))]
510 let _ = &headers;
511
512 let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
513 let format = PayloadFormat::detect(&body);
514 let timestamp_ms = chrono::Utc::now().timestamp_millis();
515
516 let msg = Message {
519 key: None,
520 payload: body,
521 token: HttpToken::with_source(seq, addr.to_string()),
522 timestamp_ms: Some(timestamp_ms),
523 format,
524 };
525
526 match state.sender.try_send(msg) {
527 Ok(()) => {
528 #[cfg(feature = "metrics")]
529 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
530 axum::http::StatusCode::OK.into_response()
531 }
532 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
533 #[cfg(feature = "metrics")]
534 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
535 .increment(1);
536 shed_503()
537 }
538 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
539 #[cfg(feature = "metrics")]
540 metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
541 axum::http::StatusCode::GONE.into_response()
542 }
543 }
544}
545
/// Builds the load-shedding response: `503 Service Unavailable` carrying
/// `Retry-After: 1`.
#[cfg(feature = "http-server")]
fn shed_503() -> axum::response::Response {
    use axum::http::StatusCode;
    use axum::response::IntoResponse as _;

    let retry_after = [(axum::http::header::RETRY_AFTER, "1")];
    (StatusCode::SERVICE_UNAVAILABLE, retry_after).into_response()
}
557
558impl TransportBase for HttpTransport {
559 async fn close(&self) -> TransportResult<()> {
560 self.closed.store(true, Ordering::Relaxed);
561 #[cfg(feature = "http-server")]
564 if let Some(tx) = self.shutdown_tx.lock().take() {
565 let _ = tx.send(());
566 }
567 Ok(())
568 }
569
570 fn is_healthy(&self) -> bool {
571 !self.closed.load(Ordering::Relaxed)
572 }
573
574 fn name(&self) -> &'static str {
575 "http"
576 }
577}
578
579impl TransportSender for HttpTransport {
580 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
581 if self.closed.load(Ordering::Relaxed) {
582 return SendResult::Fatal(TransportError::Closed);
583 }
584
585 if self.filter_engine.has_outbound_filters() {
587 match self.filter_engine.apply_outbound(&payload) {
588 super::filter::FilterDisposition::Pass => {}
589 super::filter::FilterDisposition::Drop => return SendResult::Ok,
590 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
591 }
592 }
593
594 let Some(base_url) = &self.endpoint else {
595 return SendResult::Fatal(TransportError::Config(
596 "no endpoint configured for sending".into(),
597 ));
598 };
599
600 let url = if key.is_empty() {
602 base_url.clone()
603 } else {
604 let base = base_url.trim_end_matches('/');
605 let suffix = key.trim_start_matches('/');
606 format!("{base}/{suffix}")
607 };
608
609 #[cfg(feature = "metrics")]
610 let start = std::time::Instant::now();
611
612 let request_builder = self
614 .client
615 .post(&url)
616 .header("content-type", "application/octet-stream");
617
618 #[cfg(feature = "transport-trace")]
619 let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
620 request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
621 } else {
622 request_builder
623 };
624
625 #[cfg(feature = "logger")]
626 let payload_len = payload.len();
627 let result = match request_builder.body(payload).send().await {
628 Ok(resp) if resp.status().is_success() => {
629 #[cfg(feature = "logger")]
630 tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
631
632 #[cfg(feature = "metrics")]
633 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
634 SendResult::Ok
635 }
636 Ok(resp)
637 if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
638 || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
639 {
640 #[cfg(feature = "logger")]
641 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
642
643 #[cfg(feature = "metrics")]
644 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
645 .increment(1);
646 SendResult::Backpressured
647 }
648 Ok(resp) => {
649 #[cfg(feature = "logger")]
650 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
651
652 #[cfg(feature = "metrics")]
653 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
654 .increment(1);
655 SendResult::Fatal(TransportError::Send(format!(
656 "HTTP {} from {}",
657 resp.status(),
658 url
659 )))
660 }
661 Err(e) => {
662 #[cfg(feature = "logger")]
663 tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
664
665 #[cfg(feature = "metrics")]
666 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
667 .increment(1);
668 SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
669 }
670 };
671
672 #[cfg(feature = "metrics")]
673 metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
674 .record(start.elapsed().as_secs_f64());
675
676 result
677 }
678}
679
680impl TransportReceiver for HttpTransport {
681 type Token = HttpToken;
682
683 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
684 if self.closed.load(Ordering::Relaxed) {
685 return Err(TransportError::Closed);
686 }
687
688 #[cfg(feature = "http-server")]
689 {
690 let Some(receiver) = &self.receiver else {
691 return Err(TransportError::Config(
692 "no listen address configured for receiving".into(),
693 ));
694 };
695
696 let mut rx = receiver.lock().await;
697 let mut messages = Vec::with_capacity(max.min(100));
698
699 for _ in 0..max {
700 let result = if self.recv_timeout_ms == 0 {
701 match rx.try_recv() {
702 Ok(msg) => Some(msg),
703 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
704 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
705 return Err(TransportError::Closed);
706 }
707 }
708 } else if messages.is_empty() {
709 match tokio::time::timeout(
711 std::time::Duration::from_millis(self.recv_timeout_ms),
712 rx.recv(),
713 )
714 .await
715 {
716 Ok(Some(msg)) => Some(msg),
717 Ok(None) => return Err(TransportError::Closed),
718 Err(_) => break, }
720 } else {
721 match rx.try_recv() {
723 Ok(msg) => Some(msg),
724 Err(_) => break,
725 }
726 };
727
728 if let Some(msg) = result {
729 messages.push(msg);
730 }
731 }
732
733 let batch = self.filter_engine.partition_batch(
736 messages,
737 |m| m.payload.as_ref(),
738 |m| m.key.clone(),
739 |m| m.token.clone(),
740 );
741 let messages = batch.messages;
742 let dlq_entries = batch.dlq_entries;
743 let filtered_tokens = batch.filtered_tokens;
744
745 #[cfg(feature = "logger")]
746 if !messages.is_empty() {
747 tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
748 }
749
750 Ok(RecvBatch {
751 messages,
752 dlq_entries,
753 filtered_tokens,
754 }
755 .into())
756 }
757
758 #[cfg(not(feature = "http-server"))]
759 {
760 let _ = max;
761 Err(TransportError::Config(
762 "HTTP receive requires the 'http-server' feature".into(),
763 ))
764 }
765 }
766
767 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
768 Ok(())
770 }
771}
772
773impl Drop for HttpTransport {
774 fn drop(&mut self) {
775 #[cfg(feature = "http-server")]
776 if let Some(tx) = self.shutdown_tx.lock().take() {
777 let _ = tx.send(());
778 }
779 }
780}
781
782impl super::traits::FromCascade for HttpTransportConfig {}
783
#[cfg(test)]
mod tests {
    use super::*;

    /// Asks the OS for a free loopback port, then releases it so the
    /// transport under test can bind the same address.
    #[cfg(feature = "http-server")]
    async fn free_loopback_addr() -> std::net::SocketAddr {
        let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = probe.local_addr().unwrap();
        drop(probe);
        addr
    }

    /// URL of the default ingest route on the loopback port of `addr`.
    #[cfg(feature = "http-server")]
    fn ingest_url(addr: std::net::SocketAddr) -> String {
        format!("http://127.0.0.1:{}/ingest", addr.port())
    }

    /// Short pause that gives the spawned server task time to start.
    #[cfg(feature = "http-server")]
    async fn let_server_start() {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }

    #[test]
    fn http_token_display() {
        let token = HttpToken::new(42);
        assert_eq!(format!("{token}"), "http:42");
    }

    #[test]
    fn http_token_display_with_source() {
        let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
        assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
    }

    #[test]
    fn config_defaults() {
        let config = HttpTransportConfig::default();
        assert!(config.endpoint.is_none());
        assert!(config.listen.is_none());
        assert_eq!(config.recv_path, "/ingest");
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
    }

    #[test]
    fn config_sender_helper() {
        let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
        assert_eq!(
            config.endpoint.as_deref(),
            Some("http://localhost:8080/ingest")
        );
        assert!(config.listen.is_none());
    }

    #[test]
    fn config_receiver_helper() {
        let config = HttpTransportConfig::receiver("0.0.0.0:8080");
        assert!(config.endpoint.is_none());
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
    }

    /// A transport opened from the default config has no endpoint, so a
    /// send is fatal while the transport itself stays healthy.
    #[tokio::test]
    async fn send_only_transport() {
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();

        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "http");

        let result = transport
            .send("test", bytes::Bytes::from_static(b"payload"))
            .await;
        assert!(result.is_fatal());

        transport.commit(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn close_prevents_send() {
        let config = HttpTransportConfig::sender("http://localhost:19999/test");
        let transport = HttpTransport::new(&config).await.unwrap();

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport
            .send("test", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(result.is_fatal());
    }

    #[tokio::test]
    async fn close_prevents_recv() {
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();

        transport.close().await.unwrap();
        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    /// A payload POSTed by one transport arrives, byte for byte, in the
    /// batch of another transport listening on loopback.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn send_and_receive_roundtrip() {
        let addr = free_loopback_addr().await;

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_path: "/ingest".to_string(),
            recv_buffer_size: 100,
            recv_timeout_ms: 1000,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();

        let_server_start().await;

        let send_config = HttpTransportConfig::sender(&ingest_url(addr));
        let sender = HttpTransport::new(&send_config).await.unwrap();

        let result = sender
            .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
            .await;
        assert!(result.is_ok(), "send failed: {result:?}");

        let batch = receiver.recv(10).await.unwrap();
        assert_eq!(batch.records.len(), 1);
        assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
        assert!(batch.commit_tokens[0].source_addr.is_some());

        sender.close().await.unwrap();
        receiver.close().await.unwrap();
    }

    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_empty_body() {
        let addr = free_loopback_addr().await;

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        let_server_start().await;

        let client = reqwest::Client::new();
        let resp = client
            .post(ingest_url(addr))
            .body(Vec::<u8>::new())
            .send()
            .await
            .unwrap();

        assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);

        let records = receiver.recv(10).await.unwrap().records;
        assert!(records.is_empty());

        receiver.close().await.unwrap();
    }

    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_oversized_body() {
        let addr = free_loopback_addr().await;

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            max_body_bytes: 1024,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        let_server_start().await;

        // 8 KiB body against a 1 KiB limit.
        let client = reqwest::Client::new();
        let resp = client
            .post(ingest_url(addr))
            .body(vec![b'x'; 8 * 1024])
            .send()
            .await
            .unwrap();

        assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);

        let records = receiver.recv(10).await.unwrap().records;
        assert!(records.is_empty(), "oversized body must not be queued");

        receiver.close().await.unwrap();
    }

    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn recv_without_listen_returns_error() {
        let config = HttpTransportConfig::sender("http://localhost:9999");
        let transport = HttpTransport::new(&config).await.unwrap();

        let result = transport.recv(10).await;
        assert!(result.is_err());
    }

    /// Without a governor the receiver accepts requests; with a governor
    /// pinned above its threshold it sheds them with 503 plus `Retry-After`.
    #[cfg(all(feature = "http-server", feature = "governor"))]
    #[tokio::test]
    async fn pressure_high_sheds_with_503_and_none_is_normal() {
        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        // Case 1: no governor attached.
        {
            let addr = free_loopback_addr().await;
            let cfg = HttpTransportConfig {
                listen: Some(addr.to_string()),
                recv_timeout_ms: 200,
                ..Default::default()
            };
            let receiver = HttpTransport::new(&cfg).await.unwrap();
            let_server_start().await;

            let client = reqwest::Client::new();
            let resp = client
                .post(ingest_url(addr))
                .body(b"{\"msg\":\"ok\"}".to_vec())
                .send()
                .await
                .unwrap();
            assert_eq!(
                resp.status(),
                reqwest::StatusCode::OK,
                "no governor -> accepted"
            );
            receiver.close().await.unwrap();
        }

        // Case 2: governor held high (950 of 1000 bytes, threshold 0.80).
        {
            let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
                limit_bytes: 1000,
                pressure_threshold: 0.80,
                ..Default::default()
            }));
            guard.add_bytes(950);
            let src = MemoryPressureSource::new(Arc::clone(&guard));
            let pressure = Arc::new(UnifiedPressure::new(
                vec![Arc::new(src) as Arc<dyn PressureSource>],
                Hysteresis::new(0.80, 0.65).expect("valid band"),
            ));
            assert!(pressure.should_hold(), "pinned-high governor must hold");

            let addr = free_loopback_addr().await;
            let cfg = HttpTransportConfig {
                listen: Some(addr.to_string()),
                recv_timeout_ms: 200,
                ..Default::default()
            };
            let receiver = HttpTransport::with_pressure(&cfg, Some(Arc::clone(&pressure)))
                .await
                .unwrap();
            let_server_start().await;

            let client = reqwest::Client::new();
            let resp = client
                .post(ingest_url(addr))
                .body(b"{\"msg\":\"shed\"}".to_vec())
                .send()
                .await
                .unwrap();
            assert_eq!(
                resp.status(),
                reqwest::StatusCode::SERVICE_UNAVAILABLE,
                "pinned-high governor must shed with 503"
            );
            assert_eq!(
                resp.headers()
                    .get(reqwest::header::RETRY_AFTER)
                    .and_then(|v| v.to_str().ok()),
                Some("1"),
                "503 shed must carry Retry-After"
            );

            let records = receiver.recv(10).await.unwrap().records;
            assert!(records.is_empty(), "shed request must not be queued");
            receiver.close().await.unwrap();
        }
    }

    #[test]
    fn config_serde_roundtrip() {
        let config = HttpTransportConfig {
            endpoint: Some("http://example.com/ingest".into()),
            listen: Some("0.0.0.0:8080".into()),
            recv_path: "/custom".into(),
            recv_buffer_size: 5000,
            recv_timeout_ms: 250,
            ..Default::default()
        };

        let json = serde_json::to_string(&config).unwrap();
        let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.endpoint, config.endpoint);
        assert_eq!(parsed.listen, config.listen);
        assert_eq!(parsed.recv_path, config.recv_path);
        assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
        assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
    }
}