1use super::error::{TransportError, TransportResult};
38#[cfg(feature = "http-server")]
39use super::traits::RecvBatch;
40use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
41#[cfg(feature = "http-server")]
42use super::types::Message;
43#[cfg(feature = "http-server")]
44use super::types::PayloadFormat;
45use super::types::SendResult;
46use super::work_batch::WorkBatch;
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49#[cfg(feature = "http-server")]
50use std::sync::atomic::AtomicU64;
51use std::sync::atomic::{AtomicBool, Ordering};
52
/// Commit token attached to every message received over HTTP.
///
/// It identifies a message by the order in which the ingest handler accepted it and,
/// when known, by the peer address that sent it.
#[derive(Debug, Clone)]
pub struct HttpToken {
    /// Sequence number assigned when the message was accepted.
    pub seq: u64,

    /// Textual address of the sending peer, if one was recorded.
    pub source_addr: Option<String>,
}

impl HttpToken {
    /// Creates a token that carries only a sequence number.
    #[must_use]
    pub fn new(seq: u64) -> Self {
        let source_addr = None;
        Self { seq, source_addr }
    }

    /// Creates a token that also remembers the address of the sender.
    #[must_use]
    pub fn with_source(seq: u64, addr: String) -> Self {
        let source_addr = Some(addr);
        Self { seq, source_addr }
    }
}
86
87impl CommitToken for HttpToken {}
88
89impl std::fmt::Display for HttpToken {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match &self.source_addr {
92 Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
93 None => write!(f, "http:{}", self.seq),
94 }
95 }
96}
97
/// Default route on which the receiver accepts POSTed payloads.
fn default_recv_path() -> String {
    const DEFAULT_RECV_PATH: &str = "/ingest";
    String::from(DEFAULT_RECV_PATH)
}
101
/// Default capacity of the receive buffer, in messages.
fn default_buffer_size() -> usize {
    const DEFAULT_RECV_BUFFER: usize = 10_000;
    DEFAULT_RECV_BUFFER
}
105
/// Default receive timeout, in milliseconds.
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_RECV_TIMEOUT_MS: u64 = 100;
    DEFAULT_RECV_TIMEOUT_MS
}
109
/// Default outbound connect timeout, in milliseconds.
fn default_connect_timeout_ms() -> u64 {
    const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 5_000;
    DEFAULT_CONNECT_TIMEOUT_MS
}
113
/// Default overall timeout for one outbound request, in milliseconds.
fn default_send_timeout_ms() -> u64 {
    const DEFAULT_SEND_TIMEOUT_MS: u64 = 30_000;
    DEFAULT_SEND_TIMEOUT_MS
}
117
/// Default upper bound on an accepted request body: 16 MiB.
fn default_max_body_bytes() -> usize {
    const MIB: usize = 1024 * 1024;
    16 * MIB
}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HttpTransportConfig {
125 #[serde(default)]
127 pub endpoint: Option<String>,
128
129 #[serde(default)]
132 pub listen: Option<String>,
133
134 #[serde(default = "default_recv_path")]
136 pub recv_path: String,
137
138 #[serde(default = "default_buffer_size")]
140 pub recv_buffer_size: usize,
141
142 #[serde(default = "default_recv_timeout_ms")]
144 pub recv_timeout_ms: u64,
145
146 #[serde(default)]
148 pub filters_in: Vec<super::filter::FilterRule>,
149
150 #[serde(default)]
152 pub filters_out: Vec<super::filter::FilterRule>,
153
154 #[serde(default = "default_connect_timeout_ms")]
156 pub connect_timeout_ms: u64,
157
158 #[serde(default = "default_send_timeout_ms")]
161 pub send_timeout_ms: u64,
162
163 #[serde(default = "default_max_body_bytes")]
166 pub max_body_bytes: usize,
167
168 #[serde(default)]
174 pub tls_ca_path: Option<String>,
175}
176
177impl Default for HttpTransportConfig {
178 fn default() -> Self {
179 Self {
180 endpoint: None,
181 listen: None,
182 recv_path: default_recv_path(),
183 recv_buffer_size: default_buffer_size(),
184 recv_timeout_ms: default_recv_timeout_ms(),
185 filters_in: Vec::new(),
186 filters_out: Vec::new(),
187 connect_timeout_ms: default_connect_timeout_ms(),
188 send_timeout_ms: default_send_timeout_ms(),
189 max_body_bytes: default_max_body_bytes(),
190 tls_ca_path: None,
191 }
192 }
193}
194
195impl HttpTransportConfig {
196 #[must_use]
198 pub fn from_cascade() -> Self {
199 <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
200 }
201
202 #[must_use]
204 pub fn sender(endpoint: &str) -> Self {
205 Self {
206 endpoint: Some(endpoint.to_string()),
207 ..Default::default()
208 }
209 }
210
211 #[must_use]
213 pub fn receiver(listen: &str) -> Self {
214 Self {
215 listen: Some(listen.to_string()),
216 ..Default::default()
217 }
218 }
219}
220
221pub struct HttpTransport {
226 client: reqwest::Client,
228
229 endpoint: Option<String>,
231
232 #[cfg(feature = "http-server")]
235 receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
236
237 #[cfg(feature = "http-server")]
243 shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
244
245 #[cfg(feature = "http-server")]
247 _server_handle: Option<tokio::task::JoinHandle<()>>,
248
249 closed: Arc<AtomicBool>,
251
252 #[cfg(feature = "http-server")]
254 recv_timeout_ms: u64,
255
256 filter_engine: super::filter::TransportFilterEngine,
258}
259
260impl HttpTransport {
261 pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
270 Self::new_inner(
272 config,
273 #[cfg(feature = "governor")]
274 None,
275 )
276 .await
277 }
278
279 #[cfg(feature = "governor")]
293 pub async fn with_pressure(
294 config: &HttpTransportConfig,
295 pressure: Option<Arc<crate::governor::UnifiedPressure>>,
296 ) -> TransportResult<Self> {
297 Self::new_inner(config, pressure).await
298 }
299
300 async fn new_inner(
301 config: &HttpTransportConfig,
302 #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
303 ) -> TransportResult<Self> {
304 #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
306 let mut client_builder = reqwest::Client::builder()
307 .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
308 .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
309
310 #[cfg(feature = "tls")]
312 if let Some(ref ca) = config.tls_ca_path {
313 let trust = crate::tls::TlsTrust {
314 native_roots: true,
315 webpki_roots: false,
316 extra_roots: vec![ca.into()],
317 extra_intermediates: Vec::new(),
318 exclusive: false,
319 };
320 let tls_cfg =
321 crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
322 .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
323 client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
324 }
325 #[cfg(not(feature = "tls"))]
326 if config.tls_ca_path.is_some() {
327 tracing::warn!(
328 "http transport tls_ca_path is set but the `tls` feature is disabled -- \
329 ignoring (using reqwest default roots)"
330 );
331 }
332
333 let client = client_builder
334 .build()
335 .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
336
337 #[cfg(feature = "http-server")]
338 let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
339 let addr: std::net::SocketAddr = listen
340 .parse()
341 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
342
343 let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
344 let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
345
346 let sequence = Arc::new(AtomicU64::new(0));
347 let recv_path = config.recv_path.clone();
348
349 let app = build_receiver_router(
350 tx,
351 sequence,
352 &recv_path,
353 config.max_body_bytes,
354 #[cfg(feature = "governor")]
355 pressure.clone(),
356 );
357
358 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
359 TransportError::Connection(format!("failed to bind to {addr}: {e}"))
360 })?;
361
362 let handle = tokio::spawn(async move {
363 axum::serve(
364 listener,
365 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
366 )
367 .with_graceful_shutdown(async {
368 sd_rx.await.ok();
369 })
370 .await
371 .ok();
372 });
373
374 (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
375 } else {
376 (None, None, None)
377 };
378
379 #[cfg(all(feature = "governor", not(feature = "http-server")))]
383 let _ = pressure;
384
385 #[cfg(feature = "logger")]
386 tracing::info!(
387 endpoint = ?config.endpoint,
388 listen = ?config.listen,
389 "HTTP transport opened"
390 );
391
392 let filter_engine = super::filter::TransportFilterEngine::new(
395 &config.filters_in,
396 &config.filters_out,
397 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
398 )?;
399
400 let closed = Arc::new(AtomicBool::new(false));
401
402 #[cfg(feature = "health")]
403 {
404 let h = Arc::clone(&closed);
405 crate::health::HealthRegistry::register("transport:http", move || {
406 if h.load(Ordering::Relaxed) {
407 crate::health::HealthStatus::Unhealthy
408 } else {
409 crate::health::HealthStatus::Healthy
410 }
411 });
412 }
413
414 Ok(Self {
415 client,
416 endpoint: config.endpoint.clone(),
417 #[cfg(feature = "http-server")]
418 receiver,
419 #[cfg(feature = "http-server")]
420 shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
421 #[cfg(feature = "http-server")]
422 _server_handle: server_handle,
423 closed,
424 #[cfg(feature = "http-server")]
425 recv_timeout_ms: config.recv_timeout_ms,
426 filter_engine,
427 })
428 }
429}
430
/// Assembles the axum router for the receiving side: one POST route at `recv_path`,
/// a request-body limit of `max_body_bytes`, and the shared handler state.
#[cfg(feature = "http-server")]
fn build_receiver_router(
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    sequence: Arc<AtomicU64>,
    recv_path: &str,
    max_body_bytes: usize,
    #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
) -> axum::Router {
    let body_limit = axum::extract::DefaultBodyLimit::max(max_body_bytes);
    let ingest_route = axum::routing::post(ingest_handler);

    axum::Router::new()
        .route(recv_path, ingest_route)
        .layer(body_limit)
        .with_state(ReceiverState {
            sender,
            sequence,
            #[cfg(feature = "governor")]
            pressure,
        })
}
455
/// State shared by every invocation of the ingest handler.
#[cfg(feature = "http-server")]
#[derive(Clone)]
struct ReceiverState {
    /// Sending end of the channel that `recv` drains.
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    /// Counter that provides the sequence number of each accepted request.
    sequence: Arc<AtomicU64>,
    /// Optional pressure source; while it says to hold, requests are rejected.
    #[cfg(feature = "governor")]
    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
}
470
471#[cfg(feature = "http-server")]
473async fn ingest_handler(
474 axum::extract::State(state): axum::extract::State<ReceiverState>,
475 axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
476 headers: axum::http::HeaderMap,
477 body: axum::body::Bytes,
478) -> axum::http::StatusCode {
479 if body.is_empty() {
480 return axum::http::StatusCode::BAD_REQUEST;
481 }
482
483 #[cfg(feature = "governor")]
488 if let Some(pressure) = &state.pressure
489 && pressure.should_hold()
490 {
491 #[cfg(feature = "metrics")]
492 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http", "reason" => "pressure")
493 .increment(1);
494 return axum::http::StatusCode::SERVICE_UNAVAILABLE;
495 }
496
497 #[cfg(feature = "transport-trace")]
499 if let Some(tp) = headers
500 .get(super::propagation::TRACEPARENT_HEADER)
501 .and_then(|v| v.to_str().ok())
502 && super::propagation::is_valid_traceparent(tp)
503 {
504 tracing::Span::current().record("traceparent", tp);
505 }
506
507 #[cfg(not(feature = "otel"))]
509 let _ = &headers;
510
511 let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
512 let format = PayloadFormat::detect(&body);
513 let timestamp_ms = chrono::Utc::now().timestamp_millis();
514
515 let msg = Message {
518 key: None,
519 payload: body,
520 token: HttpToken::with_source(seq, addr.to_string()),
521 timestamp_ms: Some(timestamp_ms),
522 format,
523 };
524
525 match state.sender.try_send(msg) {
526 Ok(()) => {
527 #[cfg(feature = "metrics")]
528 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
529 axum::http::StatusCode::OK
530 }
531 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
532 #[cfg(feature = "metrics")]
533 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
534 .increment(1);
535 axum::http::StatusCode::SERVICE_UNAVAILABLE
536 }
537 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
538 #[cfg(feature = "metrics")]
539 metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
540 axum::http::StatusCode::GONE
541 }
542 }
543}
544
545impl TransportBase for HttpTransport {
546 async fn close(&self) -> TransportResult<()> {
547 self.closed.store(true, Ordering::Relaxed);
548 #[cfg(feature = "http-server")]
551 if let Some(tx) = self.shutdown_tx.lock().take() {
552 let _ = tx.send(());
553 }
554 Ok(())
555 }
556
557 fn is_healthy(&self) -> bool {
558 !self.closed.load(Ordering::Relaxed)
559 }
560
561 fn name(&self) -> &'static str {
562 "http"
563 }
564}
565
566impl TransportSender for HttpTransport {
567 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
568 if self.closed.load(Ordering::Relaxed) {
569 return SendResult::Fatal(TransportError::Closed);
570 }
571
572 if self.filter_engine.has_outbound_filters() {
574 match self.filter_engine.apply_outbound(&payload) {
575 super::filter::FilterDisposition::Pass => {}
576 super::filter::FilterDisposition::Drop => return SendResult::Ok,
577 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
578 }
579 }
580
581 let Some(base_url) = &self.endpoint else {
582 return SendResult::Fatal(TransportError::Config(
583 "no endpoint configured for sending".into(),
584 ));
585 };
586
587 let url = if key.is_empty() {
589 base_url.clone()
590 } else {
591 let base = base_url.trim_end_matches('/');
592 let suffix = key.trim_start_matches('/');
593 format!("{base}/{suffix}")
594 };
595
596 #[cfg(feature = "metrics")]
597 let start = std::time::Instant::now();
598
599 let request_builder = self
601 .client
602 .post(&url)
603 .header("content-type", "application/octet-stream");
604
605 #[cfg(feature = "transport-trace")]
606 let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
607 request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
608 } else {
609 request_builder
610 };
611
612 #[cfg(feature = "logger")]
613 let payload_len = payload.len();
614 let result = match request_builder.body(payload).send().await {
615 Ok(resp) if resp.status().is_success() => {
616 #[cfg(feature = "logger")]
617 tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
618
619 #[cfg(feature = "metrics")]
620 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
621 SendResult::Ok
622 }
623 Ok(resp)
624 if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
625 || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
626 {
627 #[cfg(feature = "logger")]
628 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
629
630 #[cfg(feature = "metrics")]
631 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
632 .increment(1);
633 SendResult::Backpressured
634 }
635 Ok(resp) => {
636 #[cfg(feature = "logger")]
637 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
638
639 #[cfg(feature = "metrics")]
640 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
641 .increment(1);
642 SendResult::Fatal(TransportError::Send(format!(
643 "HTTP {} from {}",
644 resp.status(),
645 url
646 )))
647 }
648 Err(e) => {
649 #[cfg(feature = "logger")]
650 tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
651
652 #[cfg(feature = "metrics")]
653 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
654 .increment(1);
655 SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
656 }
657 };
658
659 #[cfg(feature = "metrics")]
660 metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
661 .record(start.elapsed().as_secs_f64());
662
663 result
664 }
665}
666
667impl TransportReceiver for HttpTransport {
668 type Token = HttpToken;
669
670 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
671 if self.closed.load(Ordering::Relaxed) {
672 return Err(TransportError::Closed);
673 }
674
675 #[cfg(feature = "http-server")]
676 {
677 let Some(receiver) = &self.receiver else {
678 return Err(TransportError::Config(
679 "no listen address configured for receiving".into(),
680 ));
681 };
682
683 let mut rx = receiver.lock().await;
684 let mut messages = Vec::with_capacity(max.min(100));
685
686 for _ in 0..max {
687 let result = if self.recv_timeout_ms == 0 {
688 match rx.try_recv() {
689 Ok(msg) => Some(msg),
690 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
691 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
692 return Err(TransportError::Closed);
693 }
694 }
695 } else if messages.is_empty() {
696 match tokio::time::timeout(
698 std::time::Duration::from_millis(self.recv_timeout_ms),
699 rx.recv(),
700 )
701 .await
702 {
703 Ok(Some(msg)) => Some(msg),
704 Ok(None) => return Err(TransportError::Closed),
705 Err(_) => break, }
707 } else {
708 match rx.try_recv() {
710 Ok(msg) => Some(msg),
711 Err(_) => break,
712 }
713 };
714
715 if let Some(msg) = result {
716 messages.push(msg);
717 }
718 }
719
720 let batch = self.filter_engine.partition_batch(
723 messages,
724 |m| m.payload.as_ref(),
725 |m| m.key.clone(),
726 );
727 let messages = batch.messages;
728 let dlq_entries = batch.dlq_entries;
729
730 #[cfg(feature = "logger")]
731 if !messages.is_empty() {
732 tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
733 }
734
735 Ok(RecvBatch {
736 messages,
737 dlq_entries,
738 }
739 .into())
740 }
741
742 #[cfg(not(feature = "http-server"))]
743 {
744 let _ = max;
745 Err(TransportError::Config(
746 "HTTP receive requires the 'http-server' feature".into(),
747 ))
748 }
749 }
750
751 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
752 Ok(())
754 }
755}
756
757impl Drop for HttpTransport {
758 fn drop(&mut self) {
759 #[cfg(feature = "http-server")]
760 if let Some(tx) = self.shutdown_tx.lock().take() {
761 let _ = tx.send(());
762 }
763 }
764}
765
766impl super::traits::FromCascade for HttpTransportConfig {}
767
#[cfg(test)]
mod tests {
    use super::*;

    /// Finds a loopback address with an OS-assigned port by binding a probe listener
    /// and releasing it again, so that a receiver transport can bind the same address.
    #[cfg(feature = "http-server")]
    async fn reserve_loopback_addr() -> std::net::SocketAddr {
        let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = probe.local_addr().unwrap();
        drop(probe);
        addr
    }

    /// URL of the default ingest route on the loopback port of `addr`.
    #[cfg(feature = "http-server")]
    fn ingest_url(addr: std::net::SocketAddr) -> String {
        format!("http://127.0.0.1:{}/ingest", addr.port())
    }

    /// Short pause after starting a receiver, before the first request is made.
    #[cfg(feature = "http-server")]
    async fn settle() {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }

    #[test]
    fn http_token_display() {
        let rendered = format!("{}", HttpToken::new(42));
        assert_eq!(rendered, "http:42");
    }

    #[test]
    fn http_token_display_with_source() {
        let rendered = format!(
            "{}",
            HttpToken::with_source(7, "192.168.1.1:54321".to_string())
        );
        assert_eq!(rendered, "http:192.168.1.1:54321:7");
    }

    #[test]
    fn config_defaults() {
        let defaults = HttpTransportConfig::default();
        assert!(defaults.endpoint.is_none());
        assert!(defaults.listen.is_none());
        assert_eq!(defaults.recv_path, "/ingest");
        assert_eq!(defaults.recv_buffer_size, 10_000);
        assert_eq!(defaults.recv_timeout_ms, 100);
    }

    #[test]
    fn config_sender_helper() {
        let cfg = HttpTransportConfig::sender("http://localhost:8080/ingest");
        assert_eq!(cfg.endpoint.as_deref(), Some("http://localhost:8080/ingest"));
        assert!(cfg.listen.is_none());
    }

    #[test]
    fn config_receiver_helper() {
        let cfg = HttpTransportConfig::receiver("0.0.0.0:8080");
        assert!(cfg.endpoint.is_none());
        assert_eq!(cfg.listen.as_deref(), Some("0.0.0.0:8080"));
    }

    /// A transport with neither endpoint nor listen address can be built, but sending
    /// through it is fatal.
    #[tokio::test]
    async fn send_only_transport() {
        let transport = HttpTransport::new(&HttpTransportConfig::default())
            .await
            .unwrap();

        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "http");

        let outcome = transport
            .send("test", bytes::Bytes::from_static(b"payload"))
            .await;
        assert!(outcome.is_fatal());

        transport.commit(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn close_prevents_send() {
        let cfg = HttpTransportConfig::sender("http://localhost:19999/test");
        let transport = HttpTransport::new(&cfg).await.unwrap();

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let outcome = transport
            .send("test", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(outcome.is_fatal());
    }

    #[tokio::test]
    async fn close_prevents_recv() {
        let transport = HttpTransport::new(&HttpTransportConfig::default())
            .await
            .unwrap();

        transport.close().await.unwrap();
        assert!(transport.recv(1).await.is_err());
    }

    /// One payload sent by a sender transport arrives intact at a receiver transport.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn send_and_receive_roundtrip() {
        let addr = reserve_loopback_addr().await;

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_path: "/ingest".to_string(),
            recv_buffer_size: 100,
            recv_timeout_ms: 1000,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        settle().await;

        let send_config = HttpTransportConfig::sender(&ingest_url(addr));
        let sender = HttpTransport::new(&send_config).await.unwrap();

        let result = sender
            .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
            .await;
        assert!(result.is_ok(), "send failed: {result:?}");

        let batch = receiver.recv(10).await.unwrap();
        assert_eq!(batch.records.len(), 1);
        assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
        assert!(batch.commit_tokens[0].source_addr.is_some());

        sender.close().await.unwrap();
        receiver.close().await.unwrap();
    }

    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_empty_body() {
        let addr = reserve_loopback_addr().await;

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        settle().await;

        let resp = reqwest::Client::new()
            .post(ingest_url(addr))
            .body(Vec::<u8>::new())
            .send()
            .await
            .unwrap();

        assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);

        let records = receiver.recv(10).await.unwrap().records;
        assert!(records.is_empty());

        receiver.close().await.unwrap();
    }

    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_oversized_body() {
        let addr = reserve_loopback_addr().await;

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            max_body_bytes: 1024,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        settle().await;

        // Eight times the configured limit.
        let resp = reqwest::Client::new()
            .post(ingest_url(addr))
            .body(vec![b'x'; 8 * 1024])
            .send()
            .await
            .unwrap();

        assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);

        let records = receiver.recv(10).await.unwrap().records;
        assert!(records.is_empty(), "oversized body must not be queued");

        receiver.close().await.unwrap();
    }

    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn recv_without_listen_returns_error() {
        let cfg = HttpTransportConfig::sender("http://localhost:9999");
        let transport = HttpTransport::new(&cfg).await.unwrap();

        assert!(transport.recv(10).await.is_err());
    }

    /// Without a pressure source requests are accepted; with one that holds, they are
    /// rejected with 503 and nothing is queued.
    #[cfg(all(feature = "http-server", feature = "governor"))]
    #[tokio::test]
    async fn pressure_high_sheds_with_503_and_none_is_normal() {
        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        // Case 1: no pressure source attached.
        {
            let addr = reserve_loopback_addr().await;
            let cfg = HttpTransportConfig {
                listen: Some(addr.to_string()),
                recv_timeout_ms: 200,
                ..Default::default()
            };
            let receiver = HttpTransport::new(&cfg).await.unwrap();
            settle().await;

            let resp = reqwest::Client::new()
                .post(ingest_url(addr))
                .body(b"{\"msg\":\"ok\"}".to_vec())
                .send()
                .await
                .unwrap();
            assert_eq!(
                resp.status(),
                reqwest::StatusCode::OK,
                "no governor -> accepted"
            );
            receiver.close().await.unwrap();
        }

        // Case 2: memory usage is pushed above the pressure threshold.
        {
            let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
                limit_bytes: 1000,
                pressure_threshold: 0.80,
                ..Default::default()
            }));
            guard.add_bytes(950);
            let src = MemoryPressureSource::new(Arc::clone(&guard));
            let pressure = Arc::new(UnifiedPressure::new(
                vec![Arc::new(src) as Arc<dyn PressureSource>],
                Hysteresis::new(0.80, 0.65).expect("valid band"),
            ));
            assert!(pressure.should_hold(), "pinned-high governor must hold");

            let addr = reserve_loopback_addr().await;
            let cfg = HttpTransportConfig {
                listen: Some(addr.to_string()),
                recv_timeout_ms: 200,
                ..Default::default()
            };
            let receiver = HttpTransport::with_pressure(&cfg, Some(Arc::clone(&pressure)))
                .await
                .unwrap();
            settle().await;

            let resp = reqwest::Client::new()
                .post(ingest_url(addr))
                .body(b"{\"msg\":\"shed\"}".to_vec())
                .send()
                .await
                .unwrap();
            assert_eq!(
                resp.status(),
                reqwest::StatusCode::SERVICE_UNAVAILABLE,
                "pinned-high governor must shed with 503"
            );

            let records = receiver.recv(10).await.unwrap().records;
            assert!(records.is_empty(), "shed request must not be queued");
            receiver.close().await.unwrap();
        }
    }

    #[test]
    fn config_serde_roundtrip() {
        let original = HttpTransportConfig {
            endpoint: Some("http://example.com/ingest".into()),
            listen: Some("0.0.0.0:8080".into()),
            recv_path: "/custom".into(),
            recv_buffer_size: 5000,
            recv_timeout_ms: 250,
            ..Default::default()
        };

        let json = serde_json::to_string(&original).unwrap();
        let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.endpoint, original.endpoint);
        assert_eq!(parsed.listen, original.listen);
        assert_eq!(parsed.recv_path, original.recv_path);
        assert_eq!(parsed.recv_buffer_size, original.recv_buffer_size);
        assert_eq!(parsed.recv_timeout_ms, original.recv_timeout_ms);
    }
}