Skip to main content

hyperi_rustlib/transport/
http.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/http.rs
3// Purpose:   HTTP/HTTPS transport (send via POST, receive via embedded server)
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # HTTP Transport
10//!
11//! HTTP/HTTPS transport for webhook delivery and REST ingest.
12//!
13//! ## Send
14//!
//! POSTs payload bytes to `{endpoint}/{key}` using reqwest (or to `{endpoint}` itself when the key is empty).
16//!
17//! ## Receive (requires `http-server` feature)
18//!
19//! Starts an embedded axum HTTP server that accepts POST requests on a
20//! configurable path. Incoming payloads are queued into a bounded
21//! `tokio::sync::mpsc` channel. `recv()` drains from this channel.
22//!
23//! ## Example
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::http::{HttpTransport, HttpTransportConfig};
27//!
28//! // Send-only
29//! let config = HttpTransportConfig {
30//!     endpoint: Some("http://loader:8080/ingest".into()),
31//!     ..Default::default()
32//! };
33//! let transport = HttpTransport::new(&config).await?;
34//! transport.send("events", b"{\"msg\":\"hello\"}").await;
35//! ```
36
37use super::error::{TransportError, TransportResult};
38use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
39#[cfg(feature = "http-server")]
40use super::types::PayloadFormat;
41use super::types::{Message, SendResult};
42use serde::{Deserialize, Serialize};
43use std::sync::Arc;
44#[cfg(feature = "http-server")]
45use std::sync::atomic::AtomicU64;
46use std::sync::atomic::{AtomicBool, Ordering};
47
/// Commit token attached to every message received over HTTP.
///
/// Committing is a no-op for this transport, so the token exists purely
/// for observability: it carries a per-transport sequence number and,
/// when known, the address of the client that posted the payload.
#[derive(Clone, Debug)]
pub struct HttpToken {
    /// Sequence number assigned by the receiving transport instance.
    pub seq: u64,

    /// Address of the posting client, when it was captured from the request.
    pub source_addr: Option<String>,
}
61
62impl HttpToken {
63    /// Create a new token with sequence number.
64    #[must_use]
65    pub fn new(seq: u64) -> Self {
66        Self {
67            seq,
68            source_addr: None,
69        }
70    }
71
72    /// Create a new token with sequence number and source address.
73    #[must_use]
74    pub fn with_source(seq: u64, addr: String) -> Self {
75        Self {
76            seq,
77            source_addr: Some(addr),
78        }
79    }
80}
81
// Empty impl: `HttpToken` adds nothing beyond what the `CommitToken` trait itself requires.
impl CommitToken for HttpToken {}
83
84impl std::fmt::Display for HttpToken {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match &self.source_addr {
87            Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
88            None => write!(f, "http:{}", self.seq),
89        }
90    }
91}
92
/// Serde default for `HttpTransportConfig::recv_path`.
fn default_recv_path() -> String {
    String::from("/ingest")
}
96
/// Serde default for `HttpTransportConfig::recv_buffer_size`.
fn default_buffer_size() -> usize {
    const DEFAULT_CAPACITY: usize = 10_000;
    DEFAULT_CAPACITY
}
100
/// Serde default for `HttpTransportConfig::recv_timeout_ms` (milliseconds).
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_TIMEOUT_MS: u64 = 100;
    DEFAULT_TIMEOUT_MS
}
104
105/// Configuration for HTTP transport.
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct HttpTransportConfig {
108    /// Endpoint URL for sending (e.g., "http://loader:8080/ingest"). None = send disabled.
109    #[serde(default)]
110    pub endpoint: Option<String>,
111
112    /// Listen address for receiving (e.g., "0.0.0.0:8080"). None = receive disabled.
113    /// Requires the `http-server` feature.
114    #[serde(default)]
115    pub listen: Option<String>,
116
117    /// Path to accept POSTs on for receive mode. Default: "/ingest".
118    #[serde(default = "default_recv_path")]
119    pub recv_path: String,
120
121    /// Receive buffer size (bounded channel capacity). Default: 10000.
122    #[serde(default = "default_buffer_size")]
123    pub recv_buffer_size: usize,
124
125    /// Receive timeout in milliseconds. Default: 100.
126    #[serde(default = "default_recv_timeout_ms")]
127    pub recv_timeout_ms: u64,
128
129    /// Inbound message filters (applied on recv before caller sees messages).
130    #[serde(default)]
131    pub filters_in: Vec<super::filter::FilterRule>,
132
133    /// Outbound message filters (applied on send before transport dispatches).
134    #[serde(default)]
135    pub filters_out: Vec<super::filter::FilterRule>,
136}
137
138impl Default for HttpTransportConfig {
139    fn default() -> Self {
140        Self {
141            endpoint: None,
142            listen: None,
143            recv_path: default_recv_path(),
144            recv_buffer_size: default_buffer_size(),
145            recv_timeout_ms: default_recv_timeout_ms(),
146            filters_in: Vec::new(),
147            filters_out: Vec::new(),
148        }
149    }
150}
151
152impl HttpTransportConfig {
153    /// Load from the config cascade under the `transport.http` key.
154    #[must_use]
155    pub fn from_cascade() -> Self {
156        #[cfg(feature = "config")]
157        {
158            if let Some(cfg) = crate::config::try_get()
159                && let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.http")
160            {
161                return tc;
162            }
163        }
164        Self::default()
165    }
166
167    /// Create a send-only config pointing at the given endpoint URL.
168    #[must_use]
169    pub fn sender(endpoint: &str) -> Self {
170        Self {
171            endpoint: Some(endpoint.to_string()),
172            ..Default::default()
173        }
174    }
175
176    /// Create a receive-only config listening on the given address.
177    #[must_use]
178    pub fn receiver(listen: &str) -> Self {
179        Self {
180            listen: Some(listen.to_string()),
181            ..Default::default()
182        }
183    }
184}
185
/// HTTP/HTTPS transport.
///
/// Supports send (POST to endpoint) and receive (embedded axum server).
/// The receive side requires the `http-server` feature for axum.
///
/// Dropping the transport signals the embedded server (if any) to shut
/// down; see the `Drop` impl.
pub struct HttpTransport {
    /// reqwest client for sending (always available when transport-http is enabled).
    client: reqwest::Client,

    /// Base URL for sending (None = send disabled).
    endpoint: Option<String>,

    /// Receiver channel populated by the embedded HTTP server.
    /// Only available when `http-server` feature is enabled AND `listen` is configured.
    /// Wrapped in an async mutex because `recv()` takes `&self` but needs
    /// exclusive access to the channel receiver.
    #[cfg(feature = "http-server")]
    receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,

    /// Shutdown signal for the server task. Consumed (sent) when the transport is dropped.
    #[cfg(feature = "http-server")]
    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,

    /// Server background task handle. Stored but never read (hence the leading underscore).
    #[cfg(feature = "http-server")]
    _server_handle: Option<tokio::task::JoinHandle<()>>,

    /// Whether the transport is closed. Shared (`Arc`) so the health check
    /// registered in `new()` can observe it.
    closed: Arc<AtomicBool>,

    /// Receive timeout in milliseconds (used by receive side).
    /// Zero makes `recv()` fully non-blocking.
    #[cfg(feature = "http-server")]
    recv_timeout_ms: u64,

    /// Transport-level message filter engine.
    filter_engine: super::filter::TransportFilterEngine,

    /// Buffer for messages staged to DLQ by inbound filters.
    /// Drained by `take_filtered_dlq_entries()`.
    filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
}
224
225impl HttpTransport {
226    /// Create a new HTTP transport.
227    ///
228    /// - Set `config.endpoint` to enable sending (POST to URL).
229    /// - Set `config.listen` to enable receiving (embedded HTTP server, requires `http-server` feature).
230    ///
231    /// # Errors
232    ///
233    /// Returns error if the listen address is invalid or the server fails to bind.
234    pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
235        let client = reqwest::Client::builder()
236            .build()
237            .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
238
239        #[cfg(feature = "http-server")]
240        let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
241            let addr: std::net::SocketAddr = listen
242                .parse()
243                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
244
245            let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
246            let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
247
248            let sequence = Arc::new(AtomicU64::new(0));
249            let recv_path = config.recv_path.clone();
250
251            let app = build_receiver_router(tx, sequence, &recv_path);
252
253            let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
254                TransportError::Connection(format!("failed to bind to {addr}: {e}"))
255            })?;
256
257            let handle = tokio::spawn(async move {
258                axum::serve(
259                    listener,
260                    app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
261                )
262                .with_graceful_shutdown(async {
263                    sd_rx.await.ok();
264                })
265                .await
266                .ok();
267            });
268
269            (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
270        } else {
271            (None, None, None)
272        };
273
274        #[cfg(feature = "logger")]
275        tracing::info!(
276            endpoint = ?config.endpoint,
277            listen = ?config.listen,
278            "HTTP transport opened"
279        );
280
281        // Fail loud on bad filter config -- silently disabling filters
282        // turns a misconfigured `drop` / `dlq` rule into a permanent pass.
283        let filter_engine = super::filter::TransportFilterEngine::new(
284            &config.filters_in,
285            &config.filters_out,
286            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
287        )?;
288
289        let closed = Arc::new(AtomicBool::new(false));
290
291        #[cfg(feature = "health")]
292        {
293            let h = Arc::clone(&closed);
294            crate::health::HealthRegistry::register("transport:http", move || {
295                if h.load(Ordering::Relaxed) {
296                    crate::health::HealthStatus::Unhealthy
297                } else {
298                    crate::health::HealthStatus::Healthy
299                }
300            });
301        }
302
303        Ok(Self {
304            client,
305            endpoint: config.endpoint.clone(),
306            #[cfg(feature = "http-server")]
307            receiver,
308            #[cfg(feature = "http-server")]
309            shutdown_tx,
310            #[cfg(feature = "http-server")]
311            _server_handle: server_handle,
312            closed,
313            #[cfg(feature = "http-server")]
314            recv_timeout_ms: config.recv_timeout_ms,
315            filter_engine,
316            filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
317        })
318    }
319}
320
/// Assemble the axum router used by the embedded server: a single POST
/// route at `recv_path`, served by `ingest_handler`, with the channel
/// sender and the sequence counter as shared state.
#[cfg(feature = "http-server")]
fn build_receiver_router(
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    sequence: Arc<AtomicU64>,
    recv_path: &str,
) -> axum::Router {
    axum::Router::new()
        .route(recv_path, axum::routing::post(ingest_handler))
        .with_state(ReceiverState { sender, sequence })
}
336
/// Shared state for the receive handler.
///
/// Cloned by axum per request; both fields are cheap handles onto shared data.
#[cfg(feature = "http-server")]
#[derive(Clone)]
struct ReceiverState {
    /// Producer side of the bounded channel drained by `recv()`.
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    /// Counter used to stamp each accepted request with a sequence number.
    sequence: Arc<AtomicU64>,
}
344
345/// POST handler that accepts raw bytes and queues them into the mpsc channel.
346#[cfg(feature = "http-server")]
347async fn ingest_handler(
348    axum::extract::State(state): axum::extract::State<ReceiverState>,
349    axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
350    headers: axum::http::HeaderMap,
351    body: axum::body::Bytes,
352) -> axum::http::StatusCode {
353    if body.is_empty() {
354        return axum::http::StatusCode::BAD_REQUEST;
355    }
356
357    // Extract W3C traceparent from incoming HTTP headers for distributed tracing
358    #[cfg(feature = "transport-trace")]
359    if let Some(tp) = headers
360        .get(super::propagation::TRACEPARENT_HEADER)
361        .and_then(|v| v.to_str().ok())
362        && super::propagation::is_valid_traceparent(tp)
363    {
364        tracing::Span::current().record("traceparent", tp);
365    }
366
367    // Suppress unused variable warning when otel feature is disabled
368    #[cfg(not(feature = "otel"))]
369    let _ = &headers;
370
371    let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
372    let format = PayloadFormat::detect(&body);
373    let timestamp_ms = chrono::Utc::now().timestamp_millis();
374
375    let msg = Message {
376        key: None,
377        payload: body.to_vec(),
378        token: HttpToken::with_source(seq, addr.to_string()),
379        timestamp_ms: Some(timestamp_ms),
380        format,
381    };
382
383    match state.sender.try_send(msg) {
384        Ok(()) => {
385            #[cfg(feature = "metrics")]
386            metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
387            axum::http::StatusCode::OK
388        }
389        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
390            #[cfg(feature = "metrics")]
391            metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
392                .increment(1);
393            axum::http::StatusCode::SERVICE_UNAVAILABLE
394        }
395        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
396            #[cfg(feature = "metrics")]
397            metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
398            axum::http::StatusCode::GONE
399        }
400    }
401}
402
403impl TransportBase for HttpTransport {
404    async fn close(&self) -> TransportResult<()> {
405        self.closed.store(true, Ordering::Relaxed);
406        Ok(())
407    }
408
409    fn is_healthy(&self) -> bool {
410        !self.closed.load(Ordering::Relaxed)
411    }
412
413    fn name(&self) -> &'static str {
414        "http"
415    }
416}
417
418impl TransportSender for HttpTransport {
419    async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
420        if self.closed.load(Ordering::Relaxed) {
421            return SendResult::Fatal(TransportError::Closed);
422        }
423
424        // Outbound filter check
425        if self.filter_engine.has_outbound_filters() {
426            match self.filter_engine.apply_outbound(payload) {
427                super::filter::FilterDisposition::Pass => {}
428                super::filter::FilterDisposition::Drop => return SendResult::Ok,
429                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
430            }
431        }
432
433        let Some(base_url) = &self.endpoint else {
434            return SendResult::Fatal(TransportError::Config(
435                "no endpoint configured for sending".into(),
436            ));
437        };
438
439        // Build URL: {base_url}/{key} if key is non-empty, otherwise just {base_url}
440        let url = if key.is_empty() {
441            base_url.clone()
442        } else {
443            let base = base_url.trim_end_matches('/');
444            let suffix = key.trim_start_matches('/');
445            format!("{base}/{suffix}")
446        };
447
448        #[cfg(feature = "metrics")]
449        let start = std::time::Instant::now();
450
451        // Build request with optional W3C traceparent header for distributed tracing
452        let request_builder = self
453            .client
454            .post(&url)
455            .header("content-type", "application/octet-stream");
456
457        #[cfg(feature = "transport-trace")]
458        let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
459            request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
460        } else {
461            request_builder
462        };
463
464        let result = match request_builder.body(payload.to_vec()).send().await {
465            Ok(resp) if resp.status().is_success() => {
466                #[cfg(feature = "logger")]
467                tracing::debug!(url = %url, bytes = payload.len(), "HTTP transport: POST sent");
468
469                #[cfg(feature = "metrics")]
470                metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
471                SendResult::Ok
472            }
473            Ok(resp)
474                if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
475                    || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
476            {
477                #[cfg(feature = "logger")]
478                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
479
480                #[cfg(feature = "metrics")]
481                metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
482                    .increment(1);
483                SendResult::Backpressured
484            }
485            Ok(resp) => {
486                #[cfg(feature = "logger")]
487                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
488
489                #[cfg(feature = "metrics")]
490                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
491                    .increment(1);
492                SendResult::Fatal(TransportError::Send(format!(
493                    "HTTP {} from {}",
494                    resp.status(),
495                    url
496                )))
497            }
498            Err(e) => {
499                #[cfg(feature = "logger")]
500                tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
501
502                #[cfg(feature = "metrics")]
503                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
504                    .increment(1);
505                SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
506            }
507        };
508
509        #[cfg(feature = "metrics")]
510        metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
511            .record(start.elapsed().as_secs_f64());
512
513        result
514    }
515}
516
517impl TransportReceiver for HttpTransport {
518    type Token = HttpToken;
519
520    async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
521        if self.closed.load(Ordering::Relaxed) {
522            return Err(TransportError::Closed);
523        }
524
525        #[cfg(feature = "http-server")]
526        {
527            let Some(receiver) = &self.receiver else {
528                return Err(TransportError::Config(
529                    "no listen address configured for receiving".into(),
530                ));
531            };
532
533            let mut rx = receiver.lock().await;
534            let mut messages = Vec::with_capacity(max.min(100));
535
536            for _ in 0..max {
537                let result = if self.recv_timeout_ms == 0 {
538                    match rx.try_recv() {
539                        Ok(msg) => Some(msg),
540                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
541                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
542                            return Err(TransportError::Closed);
543                        }
544                    }
545                } else if messages.is_empty() {
546                    // First message: wait with timeout
547                    match tokio::time::timeout(
548                        std::time::Duration::from_millis(self.recv_timeout_ms),
549                        rx.recv(),
550                    )
551                    .await
552                    {
553                        Ok(Some(msg)) => Some(msg),
554                        Ok(None) => return Err(TransportError::Closed),
555                        Err(_) => break, // Timeout
556                    }
557                } else {
558                    // Subsequent: non-blocking drain
559                    match rx.try_recv() {
560                        Ok(msg) => Some(msg),
561                        Err(_) => break,
562                    }
563                };
564
565                if let Some(msg) = result {
566                    messages.push(msg);
567                }
568            }
569
570            // Apply inbound filters: drop messages, stage DLQ entries
571            if self.filter_engine.has_inbound_filters() {
572                let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
573                messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
574                    super::filter::FilterDisposition::Pass => true,
575                    super::filter::FilterDisposition::Drop => false,
576                    super::filter::FilterDisposition::Dlq => {
577                        staged_dlq.push(super::filter::FilteredDlqEntry {
578                            payload: msg.payload.clone(),
579                            key: msg.key.clone(),
580                            reason: "transport filter".to_string(),
581                        });
582                        false
583                    }
584                });
585                if !staged_dlq.is_empty() {
586                    self.filtered_dlq_buffer.lock().extend(staged_dlq);
587                }
588            }
589
590            #[cfg(feature = "logger")]
591            if !messages.is_empty() {
592                tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
593            }
594
595            Ok(messages)
596        }
597
598        #[cfg(not(feature = "http-server"))]
599        {
600            let _ = max;
601            Err(TransportError::Config(
602                "HTTP receive requires the 'http-server' feature".into(),
603            ))
604        }
605    }
606
607    fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
608        std::mem::take(&mut *self.filtered_dlq_buffer.lock())
609    }
610
611    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
612        // HTTP is fire-and-forget -- commit is a no-op.
613        Ok(())
614    }
615}
616
617impl Drop for HttpTransport {
618    fn drop(&mut self) {
619        #[cfg(feature = "http-server")]
620        if let Some(tx) = self.shutdown_tx.take() {
621            let _ = tx.send(());
622        }
623    }
624}
625
#[cfg(test)]
mod tests {
    use super::*;

    /// A token without a source address renders as `http:<seq>`.
    #[test]
    fn http_token_display() {
        let token = HttpToken::new(42);
        assert_eq!(format!("{token}"), "http:42");
    }

    /// A token with a source address renders as `http:<addr>:<seq>`.
    #[test]
    fn http_token_display_with_source() {
        let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
        assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
    }

    /// `Default` yields send and receive disabled plus the documented defaults.
    #[test]
    fn config_defaults() {
        let config = HttpTransportConfig::default();
        assert!(config.endpoint.is_none());
        assert!(config.listen.is_none());
        assert_eq!(config.recv_path, "/ingest");
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
    }

    /// `sender()` sets only the endpoint.
    #[test]
    fn config_sender_helper() {
        let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
        assert_eq!(
            config.endpoint.as_deref(),
            Some("http://localhost:8080/ingest")
        );
        assert!(config.listen.is_none());
    }

    /// `receiver()` sets only the listen address.
    #[test]
    fn config_receiver_helper() {
        let config = HttpTransportConfig::receiver("0.0.0.0:8080");
        assert!(config.endpoint.is_none());
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
    }

    /// A transport built from the default config constructs fine but cannot send.
    #[tokio::test]
    async fn send_only_transport() {
        // Default config: neither endpoint nor listen is set, yet construction succeeds
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();

        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "http");

        // Send without endpoint should fail
        let result = transport.send("test", b"payload").await;
        assert!(result.is_fatal());

        // Commit is always ok
        transport.commit(&[]).await.unwrap();
    }

    /// After `close()`, the transport is unhealthy and `send` is fatal
    /// without any request being attempted.
    #[tokio::test]
    async fn close_prevents_send() {
        let config = HttpTransportConfig::sender("http://localhost:19999/test");
        let transport = HttpTransport::new(&config).await.unwrap();

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport.send("test", b"data").await;
        assert!(result.is_fatal());
    }

    /// After `close()`, `recv` returns an error.
    #[tokio::test]
    async fn close_prevents_recv() {
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();

        transport.close().await.unwrap();
        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    /// Full send + receive round-trip test.
    /// Requires both `transport-http` and `http-server` features.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn send_and_receive_roundtrip() {
        // Start receiver on a random available port
        // NOTE(review): the port is released and re-bound by the transport, so
        // another process could grab it in between.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener); // Free the port for the transport to bind

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_path: "/ingest".to_string(),
            recv_buffer_size: 100,
            recv_timeout_ms: 1000,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();

        // Give the server a moment to start
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Send a message via a separate sender transport
        let send_config =
            HttpTransportConfig::sender(&format!("http://127.0.0.1:{}/ingest", addr.port()));
        let sender = HttpTransport::new(&send_config).await.unwrap();

        // Empty key: the payload is posted to the endpoint URL itself
        let result = sender.send("", b"{\"msg\":\"hello\"}").await;
        assert!(result.is_ok(), "send failed: {result:?}");

        // Receive it
        let messages = receiver.recv(10).await.unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].payload, b"{\"msg\":\"hello\"}");
        assert!(messages[0].token.source_addr.is_some());

        // Cleanup
        sender.close().await.unwrap();
        receiver.close().await.unwrap();
    }

    /// Test that the receiver rejects empty bodies.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_empty_body() {
        // Same reserve-then-release port trick as the round-trip test
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener);

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Send empty body
        let client = reqwest::Client::new();
        let resp = client
            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
            .body(Vec::<u8>::new())
            .send()
            .await
            .unwrap();

        assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);

        // recv should timeout with no messages
        let messages = receiver.recv(10).await.unwrap();
        assert!(messages.is_empty());

        receiver.close().await.unwrap();
    }

    /// Test recv without listen returns config error.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn recv_without_listen_returns_error() {
        let config = HttpTransportConfig::sender("http://localhost:9999");
        let transport = HttpTransport::new(&config).await.unwrap();

        let result = transport.recv(10).await;
        assert!(result.is_err());
    }

    /// Non-default scalar fields survive a JSON serialize/deserialize cycle.
    #[test]
    fn config_serde_roundtrip() {
        let config = HttpTransportConfig {
            endpoint: Some("http://example.com/ingest".into()),
            listen: Some("0.0.0.0:8080".into()),
            recv_path: "/custom".into(),
            recv_buffer_size: 5000,
            recv_timeout_ms: 250,
            ..Default::default()
        };

        let json = serde_json::to_string(&config).unwrap();
        let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.endpoint, config.endpoint);
        assert_eq!(parsed.listen, config.listen);
        assert_eq!(parsed.recv_path, config.recv_path);
        assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
        assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
    }
}