1use super::error::{TransportError, TransportResult};
38#[cfg(feature = "http-server")]
39use super::traits::RecvBatch;
40use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
41#[cfg(feature = "http-server")]
42use super::types::Message;
43#[cfg(feature = "http-server")]
44use super::types::PayloadFormat;
45use super::types::SendResult;
46use super::work_batch::WorkBatch;
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49#[cfg(feature = "http-server")]
50use std::sync::atomic::AtomicU64;
51use std::sync::atomic::{AtomicBool, Ordering};
52
53#[derive(Debug, Clone)]
59pub struct HttpToken {
60 pub seq: u64,
62
63 pub source_addr: Option<String>,
65}
66
67impl HttpToken {
68 #[must_use]
70 pub fn new(seq: u64) -> Self {
71 Self {
72 seq,
73 source_addr: None,
74 }
75 }
76
77 #[must_use]
79 pub fn with_source(seq: u64, addr: String) -> Self {
80 Self {
81 seq,
82 source_addr: Some(addr),
83 }
84 }
85}
86
87impl CommitToken for HttpToken {}
88
89impl std::fmt::Display for HttpToken {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match &self.source_addr {
92 Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
93 None => write!(f, "http:{}", self.seq),
94 }
95 }
96}
97
98fn default_recv_path() -> String {
99 "/ingest".to_string()
100}
101
102fn default_buffer_size() -> usize {
103 10_000
104}
105
106fn default_recv_timeout_ms() -> u64 {
107 100
108}
109
110fn default_connect_timeout_ms() -> u64 {
111 5_000
112}
113
114fn default_send_timeout_ms() -> u64 {
115 30_000
116}
117
118fn default_max_body_bytes() -> usize {
119 16 * 1024 * 1024
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HttpTransportConfig {
125 #[serde(default)]
127 pub endpoint: Option<String>,
128
129 #[serde(default)]
132 pub listen: Option<String>,
133
134 #[serde(default = "default_recv_path")]
136 pub recv_path: String,
137
138 #[serde(default = "default_buffer_size")]
140 pub recv_buffer_size: usize,
141
142 #[serde(default = "default_recv_timeout_ms")]
144 pub recv_timeout_ms: u64,
145
146 #[serde(default)]
148 pub filters_in: Vec<super::filter::FilterRule>,
149
150 #[serde(default)]
152 pub filters_out: Vec<super::filter::FilterRule>,
153
154 #[serde(default = "default_connect_timeout_ms")]
156 pub connect_timeout_ms: u64,
157
158 #[serde(default = "default_send_timeout_ms")]
161 pub send_timeout_ms: u64,
162
163 #[serde(default = "default_max_body_bytes")]
166 pub max_body_bytes: usize,
167
168 #[serde(default)]
173 pub tls_ca_path: Option<String>,
174}
175
176impl Default for HttpTransportConfig {
177 fn default() -> Self {
178 Self {
179 endpoint: None,
180 listen: None,
181 recv_path: default_recv_path(),
182 recv_buffer_size: default_buffer_size(),
183 recv_timeout_ms: default_recv_timeout_ms(),
184 filters_in: Vec::new(),
185 filters_out: Vec::new(),
186 connect_timeout_ms: default_connect_timeout_ms(),
187 send_timeout_ms: default_send_timeout_ms(),
188 max_body_bytes: default_max_body_bytes(),
189 tls_ca_path: None,
190 }
191 }
192}
193
194impl HttpTransportConfig {
195 #[must_use]
197 pub fn from_cascade() -> Self {
198 <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
199 }
200
201 #[must_use]
203 pub fn sender(endpoint: &str) -> Self {
204 Self {
205 endpoint: Some(endpoint.to_string()),
206 ..Default::default()
207 }
208 }
209
210 #[must_use]
212 pub fn receiver(listen: &str) -> Self {
213 Self {
214 listen: Some(listen.to_string()),
215 ..Default::default()
216 }
217 }
218}
219
220pub struct HttpTransport {
225 client: Option<reqwest::Client>,
229
230 endpoint: Option<String>,
232
233 #[cfg(feature = "http-server")]
236 receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
237
238 #[cfg(feature = "http-server")]
244 shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
245
246 #[cfg(feature = "http-server")]
248 _server_handle: Option<tokio::task::JoinHandle<()>>,
249
250 closed: Arc<AtomicBool>,
252
253 #[cfg(feature = "http-server")]
255 recv_timeout_ms: u64,
256
257 filter_engine: super::filter::TransportFilterEngine,
259}
260
261impl HttpTransport {
262 pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
271 Self::new_inner(
273 config,
274 #[cfg(feature = "governor")]
275 None,
276 )
277 .await
278 }
279
280 #[cfg(feature = "governor")]
294 pub async fn with_pressure(
295 config: &HttpTransportConfig,
296 pressure: Option<Arc<crate::governor::UnifiedPressure>>,
297 ) -> TransportResult<Self> {
298 Self::new_inner(config, pressure).await
299 }
300
301 async fn new_inner(
302 config: &HttpTransportConfig,
303 #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
304 ) -> TransportResult<Self> {
305 let client = if config.endpoint.is_some() {
309 #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
311 let mut client_builder = reqwest::Client::builder()
312 .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
313 .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
314
315 #[cfg(feature = "tls")]
317 if let Some(ref ca) = config.tls_ca_path {
318 let trust = crate::tls::TlsTrust {
319 native_roots: true,
320 webpki_roots: false,
321 extra_roots: vec![ca.into()],
322 extra_intermediates: Vec::new(),
323 exclusive: false,
324 };
325 let tls_cfg =
326 crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
327 .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
328 client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
329 }
330 #[cfg(not(feature = "tls"))]
331 if config.tls_ca_path.is_some() {
332 tracing::warn!(
333 "http transport tls_ca_path is set but the `tls` feature is disabled -- \
334 ignoring (using reqwest default roots)"
335 );
336 }
337
338 Some(client_builder.build().map_err(|e| {
339 TransportError::Config(format!("failed to create HTTP client: {e}"))
340 })?)
341 } else {
342 None
343 };
344
345 #[cfg(feature = "http-server")]
346 let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
347 let addr: std::net::SocketAddr = listen
348 .parse()
349 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
350
351 let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
352 let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
353
354 let sequence = Arc::new(AtomicU64::new(0));
355 let recv_path = config.recv_path.clone();
356
357 let app = build_receiver_router(
358 tx,
359 sequence,
360 &recv_path,
361 config.max_body_bytes,
362 #[cfg(feature = "governor")]
363 pressure.clone(),
364 );
365
366 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
367 TransportError::Connection(format!("failed to bind to {addr}: {e}"))
368 })?;
369
370 let handle = tokio::spawn(async move {
371 axum::serve(
372 listener,
373 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
374 )
375 .with_graceful_shutdown(async {
376 sd_rx.await.ok();
377 })
378 .await
379 .ok();
380 });
381
382 (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
383 } else {
384 (None, None, None)
385 };
386
387 #[cfg(all(feature = "governor", not(feature = "http-server")))]
391 let _ = pressure;
392
393 #[cfg(feature = "logger")]
394 tracing::info!(
395 endpoint = ?config.endpoint,
396 listen = ?config.listen,
397 "HTTP transport opened"
398 );
399
400 let filter_engine = super::filter::TransportFilterEngine::new(
403 &config.filters_in,
404 &config.filters_out,
405 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
406 )?;
407
408 let closed = Arc::new(AtomicBool::new(false));
409
410 #[cfg(feature = "health")]
411 {
412 let h = Arc::clone(&closed);
413 crate::health::HealthRegistry::register("transport:http", move || {
414 if h.load(Ordering::Relaxed) {
415 crate::health::HealthStatus::Unhealthy
416 } else {
417 crate::health::HealthStatus::Healthy
418 }
419 });
420 }
421
422 Ok(Self {
423 client,
424 endpoint: config.endpoint.clone(),
425 #[cfg(feature = "http-server")]
426 receiver,
427 #[cfg(feature = "http-server")]
428 shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
429 #[cfg(feature = "http-server")]
430 _server_handle: server_handle,
431 closed,
432 #[cfg(feature = "http-server")]
433 recv_timeout_ms: config.recv_timeout_ms,
434 filter_engine,
435 })
436 }
437}
438
439#[cfg(feature = "http-server")]
441fn build_receiver_router(
442 sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
443 sequence: Arc<AtomicU64>,
444 recv_path: &str,
445 max_body_bytes: usize,
446 #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
447) -> axum::Router {
448 use axum::routing::post;
449
450 let state = ReceiverState {
451 sender,
452 sequence,
453 #[cfg(feature = "governor")]
454 pressure,
455 };
456
457 axum::Router::new()
458 .route(recv_path, post(ingest_handler))
459 .layer(axum::extract::DefaultBodyLimit::max(max_body_bytes))
461 .with_state(state)
462}
463
464#[cfg(feature = "http-server")]
466#[derive(Clone)]
467struct ReceiverState {
468 sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
469 sequence: Arc<AtomicU64>,
470 #[cfg(feature = "governor")]
476 pressure: Option<Arc<crate::governor::UnifiedPressure>>,
477}
478
479#[cfg(feature = "http-server")]
481async fn ingest_handler(
482 axum::extract::State(state): axum::extract::State<ReceiverState>,
483 axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
484 headers: axum::http::HeaderMap,
485 body: axum::body::Bytes,
486) -> axum::response::Response {
487 use axum::response::IntoResponse as _;
488
489 if body.is_empty() {
490 return axum::http::StatusCode::BAD_REQUEST.into_response();
491 }
492
493 #[cfg(feature = "governor")]
498 if let Some(pressure) = &state.pressure
499 && pressure.should_hold()
500 {
501 #[cfg(feature = "metrics")]
502 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http", "reason" => "pressure")
503 .increment(1);
504 return shed_503();
505 }
506
507 #[cfg(feature = "transport-trace")]
509 if let Some(tp) = headers
510 .get(super::propagation::TRACEPARENT_HEADER)
511 .and_then(|v| v.to_str().ok())
512 && super::propagation::is_valid_traceparent(tp)
513 {
514 tracing::Span::current().record("traceparent", tp);
515 }
516
517 #[cfg(not(feature = "otel"))]
519 let _ = &headers;
520
521 let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
522 let format = PayloadFormat::detect(&body);
523 let timestamp_ms = chrono::Utc::now().timestamp_millis();
524
525 let msg = Message {
528 key: None,
529 payload: body,
530 token: HttpToken::with_source(seq, addr.to_string()),
531 timestamp_ms: Some(timestamp_ms),
532 format,
533 };
534
535 match state.sender.try_send(msg) {
536 Ok(()) => {
537 #[cfg(feature = "metrics")]
538 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
539 axum::http::StatusCode::OK.into_response()
540 }
541 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
542 #[cfg(feature = "metrics")]
543 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
544 .increment(1);
545 shed_503()
546 }
547 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
548 #[cfg(feature = "metrics")]
549 metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
550 axum::http::StatusCode::GONE.into_response()
551 }
552 }
553}
554
555#[cfg(feature = "http-server")]
558fn shed_503() -> axum::response::Response {
559 use axum::response::IntoResponse as _;
560 (
561 axum::http::StatusCode::SERVICE_UNAVAILABLE,
562 [(axum::http::header::RETRY_AFTER, "1")],
563 )
564 .into_response()
565}
566
567impl TransportBase for HttpTransport {
568 async fn close(&self) -> TransportResult<()> {
569 self.closed.store(true, Ordering::Relaxed);
570 #[cfg(feature = "http-server")]
573 if let Some(tx) = self.shutdown_tx.lock().take() {
574 let _ = tx.send(());
575 }
576 Ok(())
577 }
578
579 fn is_healthy(&self) -> bool {
580 !self.closed.load(Ordering::Relaxed)
581 }
582
583 fn name(&self) -> &'static str {
584 "http"
585 }
586}
587
588impl TransportSender for HttpTransport {
589 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
590 if self.closed.load(Ordering::Relaxed) {
591 return SendResult::Fatal(TransportError::Closed);
592 }
593
594 if self.filter_engine.has_outbound_filters() {
596 match self.filter_engine.apply_outbound(&payload) {
597 super::filter::FilterDisposition::Pass => {}
598 super::filter::FilterDisposition::Drop => return SendResult::Ok,
599 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
600 }
601 }
602
603 let Some(base_url) = &self.endpoint else {
604 return SendResult::Fatal(TransportError::Config(
605 "no endpoint configured for sending".into(),
606 ));
607 };
608
609 let Some(client) = &self.client else {
612 return SendResult::Fatal(TransportError::Config(
613 "no send client (receive-only HTTP transport)".into(),
614 ));
615 };
616
617 let url = if key.is_empty() {
619 base_url.clone()
620 } else {
621 let base = base_url.trim_end_matches('/');
622 let suffix = key.trim_start_matches('/');
623 format!("{base}/{suffix}")
624 };
625
626 #[cfg(feature = "metrics")]
627 let start = std::time::Instant::now();
628
629 let request_builder = client
631 .post(&url)
632 .header("content-type", "application/octet-stream");
633
634 #[cfg(feature = "transport-trace")]
635 let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
636 request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
637 } else {
638 request_builder
639 };
640
641 #[cfg(feature = "logger")]
642 let payload_len = payload.len();
643 let result = match request_builder.body(payload).send().await {
644 Ok(resp) if resp.status().is_success() => {
645 #[cfg(feature = "logger")]
646 tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
647
648 #[cfg(feature = "metrics")]
649 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
650 SendResult::Ok
651 }
652 Ok(resp)
653 if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
654 || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
655 {
656 #[cfg(feature = "logger")]
657 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
658
659 #[cfg(feature = "metrics")]
660 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
661 .increment(1);
662 SendResult::Backpressured
663 }
664 Ok(resp) => {
665 #[cfg(feature = "logger")]
666 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
667
668 #[cfg(feature = "metrics")]
669 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
670 .increment(1);
671 SendResult::Fatal(TransportError::Send(format!(
672 "HTTP {} from {}",
673 resp.status(),
674 url
675 )))
676 }
677 Err(e) => {
678 #[cfg(feature = "logger")]
679 tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
680
681 #[cfg(feature = "metrics")]
682 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
683 .increment(1);
684 SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
685 }
686 };
687
688 #[cfg(feature = "metrics")]
689 metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
690 .record(start.elapsed().as_secs_f64());
691
692 result
693 }
694}
695
696impl TransportReceiver for HttpTransport {
697 type Token = HttpToken;
698
699 async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
700 if self.closed.load(Ordering::Relaxed) {
701 return Err(TransportError::Closed);
702 }
703
704 #[cfg(feature = "http-server")]
705 {
706 let Some(receiver) = &self.receiver else {
707 return Err(TransportError::Config(
708 "no listen address configured for receiving".into(),
709 ));
710 };
711
712 let mut rx = receiver.lock().await;
713 let mut messages = Vec::with_capacity(max.min(100));
714
715 for _ in 0..max {
716 let result = if self.recv_timeout_ms == 0 {
717 match rx.try_recv() {
718 Ok(msg) => Some(msg),
719 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
720 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
721 return Err(TransportError::Closed);
722 }
723 }
724 } else if messages.is_empty() {
725 match tokio::time::timeout(
727 std::time::Duration::from_millis(self.recv_timeout_ms),
728 rx.recv(),
729 )
730 .await
731 {
732 Ok(Some(msg)) => Some(msg),
733 Ok(None) => return Err(TransportError::Closed),
734 Err(_) => break, }
736 } else {
737 match rx.try_recv() {
739 Ok(msg) => Some(msg),
740 Err(_) => break,
741 }
742 };
743
744 if let Some(msg) = result {
745 messages.push(msg);
746 }
747 }
748
749 let batch = self.filter_engine.partition_batch(
752 messages,
753 |m| m.payload.as_ref(),
754 |m| m.key.clone(),
755 |m| m.token.clone(),
756 );
757 let messages = batch.messages;
758 let dlq_entries = batch.dlq_entries;
759 let filtered_tokens = batch.filtered_tokens;
760
761 #[cfg(feature = "logger")]
762 if !messages.is_empty() {
763 tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
764 }
765
766 #[cfg(feature = "metrics")]
769 if !messages.is_empty() {
770 metrics::counter!("dfe_transport_received_total", "transport" => "http")
771 .increment(messages.len() as u64);
772 }
773
774 Ok(RecvBatch {
775 messages,
776 dlq_entries,
777 filtered_tokens,
778 }
779 .into())
780 }
781
782 #[cfg(not(feature = "http-server"))]
783 {
784 let _ = max;
785 Err(TransportError::Config(
786 "HTTP receive requires the 'http-server' feature".into(),
787 ))
788 }
789 }
790
791 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
792 Ok(())
794 }
795}
796
797impl Drop for HttpTransport {
798 fn drop(&mut self) {
799 #[cfg(feature = "http-server")]
800 if let Some(tx) = self.shutdown_tx.lock().take() {
801 let _ = tx.send(());
802 }
803 }
804}
805
806impl super::traits::FromCascade for HttpTransportConfig {}
807
808#[cfg(test)]
809mod tests {
810 use super::*;
811
812 #[test]
813 fn http_token_display() {
814 let token = HttpToken::new(42);
815 assert_eq!(format!("{token}"), "http:42");
816 }
817
818 #[test]
819 fn http_token_display_with_source() {
820 let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
821 assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
822 }
823
824 #[test]
825 fn config_defaults() {
826 let config = HttpTransportConfig::default();
827 assert!(config.endpoint.is_none());
828 assert!(config.listen.is_none());
829 assert_eq!(config.recv_path, "/ingest");
830 assert_eq!(config.recv_buffer_size, 10_000);
831 assert_eq!(config.recv_timeout_ms, 100);
832 }
833
834 #[test]
835 fn config_sender_helper() {
836 let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
837 assert_eq!(
838 config.endpoint.as_deref(),
839 Some("http://localhost:8080/ingest")
840 );
841 assert!(config.listen.is_none());
842 }
843
844 #[test]
845 fn config_receiver_helper() {
846 let config = HttpTransportConfig::receiver("0.0.0.0:8080");
847 assert!(config.endpoint.is_none());
848 assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
849 }
850
851 #[tokio::test]
852 async fn send_only_transport() {
853 let config = HttpTransportConfig::default();
855 let transport = HttpTransport::new(&config).await.unwrap();
856
857 assert!(transport.is_healthy());
858 assert_eq!(transport.name(), "http");
859
860 let result = transport
862 .send("test", bytes::Bytes::from_static(b"payload"))
863 .await;
864 assert!(result.is_fatal());
865
866 transport.commit(&[]).await.unwrap();
868 }
869
870 #[tokio::test]
871 async fn close_prevents_send() {
872 let config = HttpTransportConfig::sender("http://localhost:19999/test");
873 let transport = HttpTransport::new(&config).await.unwrap();
874
875 transport.close().await.unwrap();
876 assert!(!transport.is_healthy());
877
878 let result = transport
879 .send("test", bytes::Bytes::from_static(b"data"))
880 .await;
881 assert!(result.is_fatal());
882 }
883
884 #[tokio::test]
885 async fn close_prevents_recv() {
886 let config = HttpTransportConfig::default();
887 let transport = HttpTransport::new(&config).await.unwrap();
888
889 transport.close().await.unwrap();
890 let result = transport.recv(1).await;
891 assert!(result.is_err());
892 }
893
894 #[cfg(feature = "http-server")]
897 #[tokio::test]
898 async fn send_and_receive_roundtrip() {
899 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
901 let addr = listener.local_addr().unwrap();
902 drop(listener); let recv_config = HttpTransportConfig {
905 listen: Some(addr.to_string()),
906 recv_path: "/ingest".to_string(),
907 recv_buffer_size: 100,
908 recv_timeout_ms: 1000,
909 ..Default::default()
910 };
911 let receiver = HttpTransport::new(&recv_config).await.unwrap();
912
913 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
915
916 let send_config =
918 HttpTransportConfig::sender(&format!("http://127.0.0.1:{}/ingest", addr.port()));
919 let sender = HttpTransport::new(&send_config).await.unwrap();
920
921 let result = sender
922 .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
923 .await;
924 assert!(result.is_ok(), "send failed: {result:?}");
925
926 let batch = receiver.recv(10).await.unwrap();
928 assert_eq!(batch.records.len(), 1);
929 assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
930 assert!(batch.commit_tokens[0].source_addr.is_some());
932
933 sender.close().await.unwrap();
935 receiver.close().await.unwrap();
936 }
937
938 #[cfg(feature = "http-server")]
940 #[tokio::test]
941 async fn receive_rejects_empty_body() {
942 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
943 let addr = listener.local_addr().unwrap();
944 drop(listener);
945
946 let recv_config = HttpTransportConfig {
947 listen: Some(addr.to_string()),
948 recv_timeout_ms: 200,
949 ..Default::default()
950 };
951 let receiver = HttpTransport::new(&recv_config).await.unwrap();
952 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
953
954 let client = reqwest::Client::new();
956 let resp = client
957 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
958 .body(Vec::<u8>::new())
959 .send()
960 .await
961 .unwrap();
962
963 assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
964
965 let records = receiver.recv(10).await.unwrap().records;
967 assert!(records.is_empty());
968
969 receiver.close().await.unwrap();
970 }
971
972 #[cfg(feature = "http-server")]
974 #[tokio::test]
975 async fn receive_rejects_oversized_body() {
976 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
977 let addr = listener.local_addr().unwrap();
978 drop(listener);
979
980 let recv_config = HttpTransportConfig {
981 listen: Some(addr.to_string()),
982 recv_timeout_ms: 200,
983 max_body_bytes: 1024, ..Default::default()
985 };
986 let receiver = HttpTransport::new(&recv_config).await.unwrap();
987 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
988
989 let client = reqwest::Client::new();
991 let resp = client
992 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
993 .body(vec![b'x'; 8 * 1024])
994 .send()
995 .await
996 .unwrap();
997
998 assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
999
1000 let records = receiver.recv(10).await.unwrap().records;
1002 assert!(records.is_empty(), "oversized body must not be queued");
1003
1004 receiver.close().await.unwrap();
1005 }
1006
1007 #[cfg(feature = "http-server")]
1009 #[tokio::test]
1010 async fn recv_without_listen_returns_error() {
1011 let config = HttpTransportConfig::sender("http://localhost:9999");
1012 let transport = HttpTransport::new(&config).await.unwrap();
1013
1014 let result = transport.recv(10).await;
1015 assert!(result.is_err());
1016 }
1017
1018 #[tokio::test]
1021 async fn sender_mode_builds_a_client() {
1022 let transport = HttpTransport::new(&HttpTransportConfig::sender("http://localhost:9999"))
1023 .await
1024 .unwrap();
1025 assert!(
1026 transport.client.is_some(),
1027 "sender mode must build a client"
1028 );
1029 }
1030
1031 #[cfg(feature = "http-server")]
1034 #[tokio::test]
1035 async fn receiver_mode_builds_no_client_and_send_errors() {
1036 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1037 let addr = listener.local_addr().unwrap();
1038 drop(listener); let transport = HttpTransport::new(&HttpTransportConfig::receiver(&addr.to_string()))
1040 .await
1041 .unwrap();
1042 assert!(
1043 transport.client.is_none(),
1044 "receive-only must build no send client"
1045 );
1046 let result = transport.send("k", bytes::Bytes::from_static(b"x")).await;
1047 assert!(
1048 matches!(result, SendResult::Fatal(_)),
1049 "send on a receive-only transport must be fatal"
1050 );
1051 transport.close().await.unwrap();
1052 }
1053
1054 #[cfg(all(feature = "http-server", feature = "governor"))]
1058 #[tokio::test]
1059 async fn pressure_high_sheds_with_503_and_none_is_normal() {
1060 use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
1061 use crate::memory::{MemoryGuard, MemoryGuardConfig};
1062
1063 {
1065 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1066 let addr = listener.local_addr().unwrap();
1067 drop(listener);
1068 let cfg = HttpTransportConfig {
1069 listen: Some(addr.to_string()),
1070 recv_timeout_ms: 200,
1071 ..Default::default()
1072 };
1073 let receiver = HttpTransport::new(&cfg).await.unwrap();
1074 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1075
1076 let client = reqwest::Client::new();
1077 let resp = client
1078 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1079 .body(b"{\"msg\":\"ok\"}".to_vec())
1080 .send()
1081 .await
1082 .unwrap();
1083 assert_eq!(
1084 resp.status(),
1085 reqwest::StatusCode::OK,
1086 "no governor -> accepted"
1087 );
1088 receiver.close().await.unwrap();
1089 }
1090
1091 {
1093 let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
1094 limit_bytes: 1000,
1095 pressure_threshold: 0.80,
1096 ..Default::default()
1097 }));
1098 guard.add_bytes(950); let src = MemoryPressureSource::new(Arc::clone(&guard));
1100 let pressure = Arc::new(UnifiedPressure::new(
1101 vec![Arc::new(src) as Arc<dyn PressureSource>],
1102 Hysteresis::new(0.80, 0.65).expect("valid band"),
1103 ));
1104 assert!(pressure.should_hold(), "pinned-high governor must hold");
1106
1107 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1108 let addr = listener.local_addr().unwrap();
1109 drop(listener);
1110 let cfg = HttpTransportConfig {
1111 listen: Some(addr.to_string()),
1112 recv_timeout_ms: 200,
1113 ..Default::default()
1114 };
1115 let receiver = HttpTransport::with_pressure(&cfg, Some(Arc::clone(&pressure)))
1116 .await
1117 .unwrap();
1118 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1119
1120 let client = reqwest::Client::new();
1121 let resp = client
1122 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1123 .body(b"{\"msg\":\"shed\"}".to_vec())
1124 .send()
1125 .await
1126 .unwrap();
1127 assert_eq!(
1128 resp.status(),
1129 reqwest::StatusCode::SERVICE_UNAVAILABLE,
1130 "pinned-high governor must shed with 503"
1131 );
1132 assert_eq!(
1135 resp.headers()
1136 .get(reqwest::header::RETRY_AFTER)
1137 .and_then(|v| v.to_str().ok()),
1138 Some("1"),
1139 "503 shed must carry Retry-After"
1140 );
1141
1142 let records = receiver.recv(10).await.unwrap().records;
1144 assert!(records.is_empty(), "shed request must not be queued");
1145 receiver.close().await.unwrap();
1146 }
1147 }
1148
1149 #[test]
1150 fn config_serde_roundtrip() {
1151 let config = HttpTransportConfig {
1152 endpoint: Some("http://example.com/ingest".into()),
1153 listen: Some("0.0.0.0:8080".into()),
1154 recv_path: "/custom".into(),
1155 recv_buffer_size: 5000,
1156 recv_timeout_ms: 250,
1157 ..Default::default()
1158 };
1159
1160 let json = serde_json::to_string(&config).unwrap();
1161 let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();
1162
1163 assert_eq!(parsed.endpoint, config.endpoint);
1164 assert_eq!(parsed.listen, config.listen);
1165 assert_eq!(parsed.recv_path, config.recv_path);
1166 assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
1167 assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
1168 }
1169}