1use super::error::{TransportError, TransportResult};
38use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
39#[cfg(feature = "http-server")]
40use super::types::Message;
41#[cfg(feature = "http-server")]
42use super::types::PayloadFormat;
43use super::types::SendResult;
44use serde::{Deserialize, Serialize};
45use std::sync::Arc;
46#[cfg(feature = "http-server")]
47use std::sync::atomic::AtomicU64;
48use std::sync::atomic::{AtomicBool, Ordering};
49
50#[derive(Debug, Clone)]
56pub struct HttpToken {
57 pub seq: u64,
59
60 pub source_addr: Option<String>,
62}
63
64impl HttpToken {
65 #[must_use]
67 pub fn new(seq: u64) -> Self {
68 Self {
69 seq,
70 source_addr: None,
71 }
72 }
73
74 #[must_use]
76 pub fn with_source(seq: u64, addr: String) -> Self {
77 Self {
78 seq,
79 source_addr: Some(addr),
80 }
81 }
82}
83
84impl CommitToken for HttpToken {}
85
86impl std::fmt::Display for HttpToken {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 match &self.source_addr {
89 Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
90 None => write!(f, "http:{}", self.seq),
91 }
92 }
93}
94
95fn default_recv_path() -> String {
96 "/ingest".to_string()
97}
98
99fn default_buffer_size() -> usize {
100 10_000
101}
102
103fn default_recv_timeout_ms() -> u64 {
104 100
105}
106
107fn default_connect_timeout_ms() -> u64 {
108 5_000
109}
110
111fn default_send_timeout_ms() -> u64 {
112 30_000
113}
114
115fn default_max_body_bytes() -> usize {
116 16 * 1024 * 1024
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct HttpTransportConfig {
122 #[serde(default)]
124 pub endpoint: Option<String>,
125
126 #[serde(default)]
129 pub listen: Option<String>,
130
131 #[serde(default = "default_recv_path")]
133 pub recv_path: String,
134
135 #[serde(default = "default_buffer_size")]
137 pub recv_buffer_size: usize,
138
139 #[serde(default = "default_recv_timeout_ms")]
141 pub recv_timeout_ms: u64,
142
143 #[serde(default)]
145 pub filters_in: Vec<super::filter::FilterRule>,
146
147 #[serde(default)]
149 pub filters_out: Vec<super::filter::FilterRule>,
150
151 #[serde(default = "default_connect_timeout_ms")]
153 pub connect_timeout_ms: u64,
154
155 #[serde(default = "default_send_timeout_ms")]
158 pub send_timeout_ms: u64,
159
160 #[serde(default = "default_max_body_bytes")]
163 pub max_body_bytes: usize,
164
165 #[serde(default)]
171 pub tls_ca_path: Option<String>,
172}
173
174impl Default for HttpTransportConfig {
175 fn default() -> Self {
176 Self {
177 endpoint: None,
178 listen: None,
179 recv_path: default_recv_path(),
180 recv_buffer_size: default_buffer_size(),
181 recv_timeout_ms: default_recv_timeout_ms(),
182 filters_in: Vec::new(),
183 filters_out: Vec::new(),
184 connect_timeout_ms: default_connect_timeout_ms(),
185 send_timeout_ms: default_send_timeout_ms(),
186 max_body_bytes: default_max_body_bytes(),
187 tls_ca_path: None,
188 }
189 }
190}
191
192impl HttpTransportConfig {
193 #[must_use]
195 pub fn from_cascade() -> Self {
196 <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
197 }
198
199 #[must_use]
201 pub fn sender(endpoint: &str) -> Self {
202 Self {
203 endpoint: Some(endpoint.to_string()),
204 ..Default::default()
205 }
206 }
207
208 #[must_use]
210 pub fn receiver(listen: &str) -> Self {
211 Self {
212 listen: Some(listen.to_string()),
213 ..Default::default()
214 }
215 }
216}
217
218pub struct HttpTransport {
223 client: reqwest::Client,
225
226 endpoint: Option<String>,
228
229 #[cfg(feature = "http-server")]
232 receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
233
234 #[cfg(feature = "http-server")]
240 shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
241
242 #[cfg(feature = "http-server")]
244 _server_handle: Option<tokio::task::JoinHandle<()>>,
245
246 closed: Arc<AtomicBool>,
248
249 #[cfg(feature = "http-server")]
251 recv_timeout_ms: u64,
252
253 filter_engine: super::filter::TransportFilterEngine,
255}
256
257impl HttpTransport {
258 pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
267 #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
269 let mut client_builder = reqwest::Client::builder()
270 .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
271 .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
272
273 #[cfg(feature = "tls")]
275 if let Some(ref ca) = config.tls_ca_path {
276 let trust = crate::tls::TlsTrust {
277 native_roots: true,
278 webpki_roots: false,
279 extra_roots: vec![ca.into()],
280 extra_intermediates: Vec::new(),
281 exclusive: false,
282 };
283 let tls_cfg =
284 crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
285 .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
286 client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
287 }
288 #[cfg(not(feature = "tls"))]
289 if config.tls_ca_path.is_some() {
290 tracing::warn!(
291 "http transport tls_ca_path is set but the `tls` feature is disabled -- \
292 ignoring (using reqwest default roots)"
293 );
294 }
295
296 let client = client_builder
297 .build()
298 .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
299
300 #[cfg(feature = "http-server")]
301 let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
302 let addr: std::net::SocketAddr = listen
303 .parse()
304 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
305
306 let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
307 let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
308
309 let sequence = Arc::new(AtomicU64::new(0));
310 let recv_path = config.recv_path.clone();
311
312 let app = build_receiver_router(tx, sequence, &recv_path, config.max_body_bytes);
313
314 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
315 TransportError::Connection(format!("failed to bind to {addr}: {e}"))
316 })?;
317
318 let handle = tokio::spawn(async move {
319 axum::serve(
320 listener,
321 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
322 )
323 .with_graceful_shutdown(async {
324 sd_rx.await.ok();
325 })
326 .await
327 .ok();
328 });
329
330 (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
331 } else {
332 (None, None, None)
333 };
334
335 #[cfg(feature = "logger")]
336 tracing::info!(
337 endpoint = ?config.endpoint,
338 listen = ?config.listen,
339 "HTTP transport opened"
340 );
341
342 let filter_engine = super::filter::TransportFilterEngine::new(
345 &config.filters_in,
346 &config.filters_out,
347 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
348 )?;
349
350 let closed = Arc::new(AtomicBool::new(false));
351
352 #[cfg(feature = "health")]
353 {
354 let h = Arc::clone(&closed);
355 crate::health::HealthRegistry::register("transport:http", move || {
356 if h.load(Ordering::Relaxed) {
357 crate::health::HealthStatus::Unhealthy
358 } else {
359 crate::health::HealthStatus::Healthy
360 }
361 });
362 }
363
364 Ok(Self {
365 client,
366 endpoint: config.endpoint.clone(),
367 #[cfg(feature = "http-server")]
368 receiver,
369 #[cfg(feature = "http-server")]
370 shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
371 #[cfg(feature = "http-server")]
372 _server_handle: server_handle,
373 closed,
374 #[cfg(feature = "http-server")]
375 recv_timeout_ms: config.recv_timeout_ms,
376 filter_engine,
377 })
378 }
379}
380
381#[cfg(feature = "http-server")]
383fn build_receiver_router(
384 sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
385 sequence: Arc<AtomicU64>,
386 recv_path: &str,
387 max_body_bytes: usize,
388) -> axum::Router {
389 use axum::routing::post;
390
391 let state = ReceiverState { sender, sequence };
392
393 axum::Router::new()
394 .route(recv_path, post(ingest_handler))
395 .layer(axum::extract::DefaultBodyLimit::max(max_body_bytes))
397 .with_state(state)
398}
399
400#[cfg(feature = "http-server")]
402#[derive(Clone)]
403struct ReceiverState {
404 sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
405 sequence: Arc<AtomicU64>,
406}
407
408#[cfg(feature = "http-server")]
410async fn ingest_handler(
411 axum::extract::State(state): axum::extract::State<ReceiverState>,
412 axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
413 headers: axum::http::HeaderMap,
414 body: axum::body::Bytes,
415) -> axum::http::StatusCode {
416 if body.is_empty() {
417 return axum::http::StatusCode::BAD_REQUEST;
418 }
419
420 #[cfg(feature = "transport-trace")]
422 if let Some(tp) = headers
423 .get(super::propagation::TRACEPARENT_HEADER)
424 .and_then(|v| v.to_str().ok())
425 && super::propagation::is_valid_traceparent(tp)
426 {
427 tracing::Span::current().record("traceparent", tp);
428 }
429
430 #[cfg(not(feature = "otel"))]
432 let _ = &headers;
433
434 let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
435 let format = PayloadFormat::detect(&body);
436 let timestamp_ms = chrono::Utc::now().timestamp_millis();
437
438 let msg = Message {
439 key: None,
440 payload: body.to_vec(),
441 token: HttpToken::with_source(seq, addr.to_string()),
442 timestamp_ms: Some(timestamp_ms),
443 format,
444 };
445
446 match state.sender.try_send(msg) {
447 Ok(()) => {
448 #[cfg(feature = "metrics")]
449 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
450 axum::http::StatusCode::OK
451 }
452 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
453 #[cfg(feature = "metrics")]
454 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
455 .increment(1);
456 axum::http::StatusCode::SERVICE_UNAVAILABLE
457 }
458 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
459 #[cfg(feature = "metrics")]
460 metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
461 axum::http::StatusCode::GONE
462 }
463 }
464}
465
466impl TransportBase for HttpTransport {
467 async fn close(&self) -> TransportResult<()> {
468 self.closed.store(true, Ordering::Relaxed);
469 #[cfg(feature = "http-server")]
472 if let Some(tx) = self.shutdown_tx.lock().take() {
473 let _ = tx.send(());
474 }
475 Ok(())
476 }
477
478 fn is_healthy(&self) -> bool {
479 !self.closed.load(Ordering::Relaxed)
480 }
481
482 fn name(&self) -> &'static str {
483 "http"
484 }
485}
486
487impl TransportSender for HttpTransport {
488 async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
489 if self.closed.load(Ordering::Relaxed) {
490 return SendResult::Fatal(TransportError::Closed);
491 }
492
493 if self.filter_engine.has_outbound_filters() {
495 match self.filter_engine.apply_outbound(&payload) {
496 super::filter::FilterDisposition::Pass => {}
497 super::filter::FilterDisposition::Drop => return SendResult::Ok,
498 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
499 }
500 }
501
502 let Some(base_url) = &self.endpoint else {
503 return SendResult::Fatal(TransportError::Config(
504 "no endpoint configured for sending".into(),
505 ));
506 };
507
508 let url = if key.is_empty() {
510 base_url.clone()
511 } else {
512 let base = base_url.trim_end_matches('/');
513 let suffix = key.trim_start_matches('/');
514 format!("{base}/{suffix}")
515 };
516
517 #[cfg(feature = "metrics")]
518 let start = std::time::Instant::now();
519
520 let request_builder = self
522 .client
523 .post(&url)
524 .header("content-type", "application/octet-stream");
525
526 #[cfg(feature = "transport-trace")]
527 let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
528 request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
529 } else {
530 request_builder
531 };
532
533 #[cfg(feature = "logger")]
534 let payload_len = payload.len();
535 let result = match request_builder.body(payload).send().await {
536 Ok(resp) if resp.status().is_success() => {
537 #[cfg(feature = "logger")]
538 tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
539
540 #[cfg(feature = "metrics")]
541 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
542 SendResult::Ok
543 }
544 Ok(resp)
545 if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
546 || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
547 {
548 #[cfg(feature = "logger")]
549 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
550
551 #[cfg(feature = "metrics")]
552 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
553 .increment(1);
554 SendResult::Backpressured
555 }
556 Ok(resp) => {
557 #[cfg(feature = "logger")]
558 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
559
560 #[cfg(feature = "metrics")]
561 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
562 .increment(1);
563 SendResult::Fatal(TransportError::Send(format!(
564 "HTTP {} from {}",
565 resp.status(),
566 url
567 )))
568 }
569 Err(e) => {
570 #[cfg(feature = "logger")]
571 tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
572
573 #[cfg(feature = "metrics")]
574 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
575 .increment(1);
576 SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
577 }
578 };
579
580 #[cfg(feature = "metrics")]
581 metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
582 .record(start.elapsed().as_secs_f64());
583
584 result
585 }
586}
587
588impl TransportReceiver for HttpTransport {
589 type Token = HttpToken;
590
591 async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
592 if self.closed.load(Ordering::Relaxed) {
593 return Err(TransportError::Closed);
594 }
595
596 #[cfg(feature = "http-server")]
597 {
598 let Some(receiver) = &self.receiver else {
599 return Err(TransportError::Config(
600 "no listen address configured for receiving".into(),
601 ));
602 };
603
604 let mut rx = receiver.lock().await;
605 let mut messages = Vec::with_capacity(max.min(100));
606
607 for _ in 0..max {
608 let result = if self.recv_timeout_ms == 0 {
609 match rx.try_recv() {
610 Ok(msg) => Some(msg),
611 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
612 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
613 return Err(TransportError::Closed);
614 }
615 }
616 } else if messages.is_empty() {
617 match tokio::time::timeout(
619 std::time::Duration::from_millis(self.recv_timeout_ms),
620 rx.recv(),
621 )
622 .await
623 {
624 Ok(Some(msg)) => Some(msg),
625 Ok(None) => return Err(TransportError::Closed),
626 Err(_) => break, }
628 } else {
629 match rx.try_recv() {
631 Ok(msg) => Some(msg),
632 Err(_) => break,
633 }
634 };
635
636 if let Some(msg) = result {
637 messages.push(msg);
638 }
639 }
640
641 let batch = self.filter_engine.partition_batch(
644 messages,
645 |m| m.payload.as_slice(),
646 |m| m.key.clone(),
647 );
648 let messages = batch.messages;
649 let dlq_entries = batch.dlq_entries;
650
651 #[cfg(feature = "logger")]
652 if !messages.is_empty() {
653 tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
654 }
655
656 Ok(RecvBatch {
657 messages,
658 dlq_entries,
659 })
660 }
661
662 #[cfg(not(feature = "http-server"))]
663 {
664 let _ = max;
665 Err(TransportError::Config(
666 "HTTP receive requires the 'http-server' feature".into(),
667 ))
668 }
669 }
670
671 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
672 Ok(())
674 }
675}
676
677impl Drop for HttpTransport {
678 fn drop(&mut self) {
679 #[cfg(feature = "http-server")]
680 if let Some(tx) = self.shutdown_tx.lock().take() {
681 let _ = tx.send(());
682 }
683 }
684}
685
686impl super::traits::FromCascade for HttpTransportConfig {}
687
688#[cfg(test)]
689mod tests {
690 use super::*;
691
692 #[test]
693 fn http_token_display() {
694 let token = HttpToken::new(42);
695 assert_eq!(format!("{token}"), "http:42");
696 }
697
698 #[test]
699 fn http_token_display_with_source() {
700 let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
701 assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
702 }
703
704 #[test]
705 fn config_defaults() {
706 let config = HttpTransportConfig::default();
707 assert!(config.endpoint.is_none());
708 assert!(config.listen.is_none());
709 assert_eq!(config.recv_path, "/ingest");
710 assert_eq!(config.recv_buffer_size, 10_000);
711 assert_eq!(config.recv_timeout_ms, 100);
712 }
713
714 #[test]
715 fn config_sender_helper() {
716 let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
717 assert_eq!(
718 config.endpoint.as_deref(),
719 Some("http://localhost:8080/ingest")
720 );
721 assert!(config.listen.is_none());
722 }
723
724 #[test]
725 fn config_receiver_helper() {
726 let config = HttpTransportConfig::receiver("0.0.0.0:8080");
727 assert!(config.endpoint.is_none());
728 assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
729 }
730
731 #[tokio::test]
732 async fn send_only_transport() {
733 let config = HttpTransportConfig::default();
735 let transport = HttpTransport::new(&config).await.unwrap();
736
737 assert!(transport.is_healthy());
738 assert_eq!(transport.name(), "http");
739
740 let result = transport
742 .send("test", bytes::Bytes::from_static(b"payload"))
743 .await;
744 assert!(result.is_fatal());
745
746 transport.commit(&[]).await.unwrap();
748 }
749
750 #[tokio::test]
751 async fn close_prevents_send() {
752 let config = HttpTransportConfig::sender("http://localhost:19999/test");
753 let transport = HttpTransport::new(&config).await.unwrap();
754
755 transport.close().await.unwrap();
756 assert!(!transport.is_healthy());
757
758 let result = transport
759 .send("test", bytes::Bytes::from_static(b"data"))
760 .await;
761 assert!(result.is_fatal());
762 }
763
764 #[tokio::test]
765 async fn close_prevents_recv() {
766 let config = HttpTransportConfig::default();
767 let transport = HttpTransport::new(&config).await.unwrap();
768
769 transport.close().await.unwrap();
770 let result = transport.recv(1).await;
771 assert!(result.is_err());
772 }
773
774 #[cfg(feature = "http-server")]
777 #[tokio::test]
778 async fn send_and_receive_roundtrip() {
779 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
781 let addr = listener.local_addr().unwrap();
782 drop(listener); let recv_config = HttpTransportConfig {
785 listen: Some(addr.to_string()),
786 recv_path: "/ingest".to_string(),
787 recv_buffer_size: 100,
788 recv_timeout_ms: 1000,
789 ..Default::default()
790 };
791 let receiver = HttpTransport::new(&recv_config).await.unwrap();
792
793 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
795
796 let send_config =
798 HttpTransportConfig::sender(&format!("http://127.0.0.1:{}/ingest", addr.port()));
799 let sender = HttpTransport::new(&send_config).await.unwrap();
800
801 let result = sender
802 .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
803 .await;
804 assert!(result.is_ok(), "send failed: {result:?}");
805
806 let messages = receiver.recv(10).await.unwrap().messages;
808 assert_eq!(messages.len(), 1);
809 assert_eq!(messages[0].payload, b"{\"msg\":\"hello\"}");
810 assert!(messages[0].token.source_addr.is_some());
811
812 sender.close().await.unwrap();
814 receiver.close().await.unwrap();
815 }
816
817 #[cfg(feature = "http-server")]
819 #[tokio::test]
820 async fn receive_rejects_empty_body() {
821 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
822 let addr = listener.local_addr().unwrap();
823 drop(listener);
824
825 let recv_config = HttpTransportConfig {
826 listen: Some(addr.to_string()),
827 recv_timeout_ms: 200,
828 ..Default::default()
829 };
830 let receiver = HttpTransport::new(&recv_config).await.unwrap();
831 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
832
833 let client = reqwest::Client::new();
835 let resp = client
836 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
837 .body(Vec::<u8>::new())
838 .send()
839 .await
840 .unwrap();
841
842 assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
843
844 let messages = receiver.recv(10).await.unwrap().messages;
846 assert!(messages.is_empty());
847
848 receiver.close().await.unwrap();
849 }
850
851 #[cfg(feature = "http-server")]
853 #[tokio::test]
854 async fn receive_rejects_oversized_body() {
855 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
856 let addr = listener.local_addr().unwrap();
857 drop(listener);
858
859 let recv_config = HttpTransportConfig {
860 listen: Some(addr.to_string()),
861 recv_timeout_ms: 200,
862 max_body_bytes: 1024, ..Default::default()
864 };
865 let receiver = HttpTransport::new(&recv_config).await.unwrap();
866 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
867
868 let client = reqwest::Client::new();
870 let resp = client
871 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
872 .body(vec![b'x'; 8 * 1024])
873 .send()
874 .await
875 .unwrap();
876
877 assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
878
879 let messages = receiver.recv(10).await.unwrap().messages;
881 assert!(messages.is_empty(), "oversized body must not be queued");
882
883 receiver.close().await.unwrap();
884 }
885
886 #[cfg(feature = "http-server")]
888 #[tokio::test]
889 async fn recv_without_listen_returns_error() {
890 let config = HttpTransportConfig::sender("http://localhost:9999");
891 let transport = HttpTransport::new(&config).await.unwrap();
892
893 let result = transport.recv(10).await;
894 assert!(result.is_err());
895 }
896
897 #[test]
898 fn config_serde_roundtrip() {
899 let config = HttpTransportConfig {
900 endpoint: Some("http://example.com/ingest".into()),
901 listen: Some("0.0.0.0:8080".into()),
902 recv_path: "/custom".into(),
903 recv_buffer_size: 5000,
904 recv_timeout_ms: 250,
905 ..Default::default()
906 };
907
908 let json = serde_json::to_string(&config).unwrap();
909 let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();
910
911 assert_eq!(parsed.endpoint, config.endpoint);
912 assert_eq!(parsed.listen, config.listen);
913 assert_eq!(parsed.recv_path, config.recv_path);
914 assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
915 assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
916 }
917}