Skip to main content

hyperi_rustlib/transport/
http.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/http.rs
3// Purpose:   HTTP/HTTPS transport (send via POST, receive via embedded server)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # HTTP Transport
10//!
11//! HTTP/HTTPS transport for webhook delivery and REST ingest.
12//!
13//! ## Send
14//!
15//! POSTs payload bytes to `{endpoint}/{key}` using reqwest.
16//!
17//! ## Receive (requires `http-server` feature)
18//!
19//! Starts an embedded axum HTTP server that accepts POST requests on a
20//! configurable path. Incoming payloads are queued into a bounded
21//! `tokio::sync::mpsc` channel. `recv()` drains from this channel.
22//!
23//! ## Example
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::http::{HttpTransport, HttpTransportConfig};
27//!
28//! // Send-only
29//! let config = HttpTransportConfig {
30//!     endpoint: Some("http://loader:8080/ingest".into()),
31//!     ..Default::default()
32//! };
33//! let transport = HttpTransport::new(&config).await?;
34//! transport.send("events", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}")).await;
35//! ```
36
37use super::error::{TransportError, TransportResult};
38use super::traits::{CommitToken, RecvBatch, TransportBase, TransportReceiver, TransportSender};
39#[cfg(feature = "http-server")]
40use super::types::Message;
41#[cfg(feature = "http-server")]
42use super::types::PayloadFormat;
43use super::types::SendResult;
44use serde::{Deserialize, Serialize};
45use std::sync::Arc;
46#[cfg(feature = "http-server")]
47use std::sync::atomic::AtomicU64;
48use std::sync::atomic::{AtomicBool, Ordering};
49
/// Commit token attached to every message received over HTTP.
///
/// Committing is a no-op for this transport, so the token exists purely for
/// observability: it records the order in which a payload was accepted and,
/// when known, which client sent it.
#[derive(Clone, Debug)]
pub struct HttpToken {
    /// Sequence number assigned locally by the receiving transport instance.
    pub seq: u64,

    /// Address of the client that issued the HTTP request, when available.
    pub source_addr: Option<String>,
}
63
64impl HttpToken {
65    /// Create a new token with sequence number.
66    #[must_use]
67    pub fn new(seq: u64) -> Self {
68        Self {
69            seq,
70            source_addr: None,
71        }
72    }
73
74    /// Create a new token with sequence number and source address.
75    #[must_use]
76    pub fn with_source(seq: u64, addr: String) -> Self {
77        Self {
78            seq,
79            source_addr: Some(addr),
80        }
81    }
82}
83
// Empty impl body: `HttpToken` opts into `CommitToken` without defining any
// items of its own here.
impl CommitToken for HttpToken {}
85
86impl std::fmt::Display for HttpToken {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match &self.source_addr {
89            Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
90            None => write!(f, "http:{}", self.seq),
91        }
92    }
93}
94
/// Serde default for `recv_path`: the path the embedded server accepts POSTs on.
fn default_recv_path() -> String {
    String::from("/ingest")
}
98
/// Serde default for `recv_buffer_size`: capacity of the bounded receive channel.
fn default_buffer_size() -> usize {
    const RECV_BUFFER_MESSAGES: usize = 10_000;
    RECV_BUFFER_MESSAGES
}
102
/// Serde default for `recv_timeout_ms` (milliseconds).
fn default_recv_timeout_ms() -> u64 {
    const RECV_TIMEOUT_MS: u64 = 100;
    RECV_TIMEOUT_MS
}
106
/// Serde default for `connect_timeout_ms` (milliseconds).
fn default_connect_timeout_ms() -> u64 {
    const CONNECT_TIMEOUT_MS: u64 = 5_000;
    CONNECT_TIMEOUT_MS
}
110
/// Serde default for `send_timeout_ms` (milliseconds).
fn default_send_timeout_ms() -> u64 {
    const SEND_TIMEOUT_MS: u64 = 30_000;
    SEND_TIMEOUT_MS
}
114
/// Serde default for `max_body_bytes`: 16 MiB.
fn default_max_body_bytes() -> usize {
    const MIB: usize = 1024 * 1024;
    16 * MIB
}
118
/// Configuration for HTTP transport.
///
/// Every field has a serde default, so a partial (or empty) document
/// deserializes successfully. Setting `endpoint` enables the send side and
/// setting `listen` enables the receive side; both may be set on one instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpTransportConfig {
    /// Endpoint URL for sending (e.g., "http://loader:8080/ingest"). None = send disabled.
    ///
    /// A non-empty send key is appended to this URL as a further path segment.
    #[serde(default)]
    pub endpoint: Option<String>,

    /// Listen address for receiving (e.g., "0.0.0.0:8080"). None = receive disabled.
    /// Requires the `http-server` feature. Parsed as a `std::net::SocketAddr`.
    #[serde(default)]
    pub listen: Option<String>,

    /// Path to accept POSTs on for receive mode. Default: "/ingest".
    #[serde(default = "default_recv_path")]
    pub recv_path: String,

    /// Receive buffer size (bounded channel capacity). Default: 10000.
    ///
    /// When the channel is full, incoming POSTs are answered with
    /// 503 Service Unavailable instead of being queued.
    #[serde(default = "default_buffer_size")]
    pub recv_buffer_size: usize,

    /// Receive timeout in milliseconds. Default: 100.
    ///
    /// This is how long `recv()` waits for the first message of a batch;
    /// `0` makes `recv()` a non-blocking drain.
    #[serde(default = "default_recv_timeout_ms")]
    pub recv_timeout_ms: u64,

    /// Inbound message filters (applied on recv before caller sees messages).
    #[serde(default)]
    pub filters_in: Vec<super::filter::FilterRule>,

    /// Outbound message filters (applied on send before transport dispatches).
    #[serde(default)]
    pub filters_out: Vec<super::filter::FilterRule>,

    /// Connect timeout (ms) for the send-side HTTP client. Default 5000.
    #[serde(default = "default_connect_timeout_ms")]
    pub connect_timeout_ms: u64,

    /// Total request timeout (ms) per send. Default 30000. Prevents a hung
    /// send from consuming worker capacity indefinitely.
    #[serde(default = "default_send_timeout_ms")]
    pub send_timeout_ms: u64,

    /// Maximum accepted request body size in bytes (receive side). Default
    /// 16 MiB. Oversized POSTs are rejected with 413 before buffering.
    #[serde(default = "default_max_body_bytes")]
    pub max_body_bytes: usize,

    /// Private-CA PEM path for the send-side HTTPS client. When set and the
    /// `tls` feature is enabled, the client trusts this CA *in addition to* the
    /// OS native roots, built via the unified `crate::tls` module and fed to
    /// reqwest with `use_preconfigured_tls`. None = native roots only (reqwest
    /// default). Maps to `TlsTrust { native_roots: true, extra_roots: [ca] }`.
    ///
    /// Without the `tls` feature the value is ignored and a warning is logged.
    #[serde(default)]
    pub tls_ca_path: Option<String>,
}
173
174impl Default for HttpTransportConfig {
175    fn default() -> Self {
176        Self {
177            endpoint: None,
178            listen: None,
179            recv_path: default_recv_path(),
180            recv_buffer_size: default_buffer_size(),
181            recv_timeout_ms: default_recv_timeout_ms(),
182            filters_in: Vec::new(),
183            filters_out: Vec::new(),
184            connect_timeout_ms: default_connect_timeout_ms(),
185            send_timeout_ms: default_send_timeout_ms(),
186            max_body_bytes: default_max_body_bytes(),
187            tls_ca_path: None,
188        }
189    }
190}
191
192impl HttpTransportConfig {
193    /// Load from the config cascade under the `transport.http` key.
194    #[must_use]
195    pub fn from_cascade() -> Self {
196        <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
197    }
198
199    /// Create a send-only config pointing at the given endpoint URL.
200    #[must_use]
201    pub fn sender(endpoint: &str) -> Self {
202        Self {
203            endpoint: Some(endpoint.to_string()),
204            ..Default::default()
205        }
206    }
207
208    /// Create a receive-only config listening on the given address.
209    #[must_use]
210    pub fn receiver(listen: &str) -> Self {
211        Self {
212            listen: Some(listen.to_string()),
213            ..Default::default()
214        }
215    }
216}
217
/// HTTP/HTTPS transport.
///
/// Supports send (POST to endpoint) and receive (embedded axum server).
/// The receive side requires the `http-server` feature for axum.
///
/// Dropping the transport fires the server shutdown signal if `close()` has
/// not already done so.
pub struct HttpTransport {
    /// reqwest client for sending (always available when transport-http is enabled).
    client: reqwest::Client,

    /// Base URL for sending (None = send disabled).
    endpoint: Option<String>,

    /// Receiver channel populated by the embedded HTTP server.
    /// Only available when `http-server` feature is enabled AND `listen` is configured.
    ///
    /// Wrapped in an async mutex because `recv(&self)` needs mutable access
    /// to the channel while draining it.
    #[cfg(feature = "http-server")]
    receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,

    /// Shutdown signal for the server task.
    ///
    /// Behind a `parking_lot::Mutex` for interior mutability: `close(&self)`
    /// fires it to stop the embedded server promptly (graceful shutdown),
    /// rather than waiting for `Drop`. `take()` makes both paths idempotent.
    #[cfg(feature = "http-server")]
    shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,

    /// Server background task handle.
    ///
    /// Stored but never read in this file (hence the leading underscore).
    #[cfg(feature = "http-server")]
    _server_handle: Option<tokio::task::JoinHandle<()>>,

    /// Whether the transport is closed.
    ///
    /// Shared via `Arc` so the health-check closure registered in `new()`
    /// can observe the same flag.
    closed: Arc<AtomicBool>,

    /// Receive timeout in milliseconds (used by receive side).
    #[cfg(feature = "http-server")]
    recv_timeout_ms: u64,

    /// Transport-level message filter engine.
    filter_engine: super::filter::TransportFilterEngine,
}
256
257impl HttpTransport {
258    /// Create a new HTTP transport.
259    ///
260    /// - Set `config.endpoint` to enable sending (POST to URL).
261    /// - Set `config.listen` to enable receiving (embedded HTTP server, requires `http-server` feature).
262    ///
263    /// # Errors
264    ///
265    /// Returns error if the listen address is invalid or the server fails to bind.
266    pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
267        // `mut` is only used on the TLS path; harmless without the feature.
268        #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
269        let mut client_builder = reqwest::Client::builder()
270            .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
271            .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
272
273        // Private-CA trust via the unified TLS module (augments native roots).
274        #[cfg(feature = "tls")]
275        if let Some(ref ca) = config.tls_ca_path {
276            let trust = crate::tls::TlsTrust {
277                native_roots: true,
278                webpki_roots: false,
279                extra_roots: vec![ca.into()],
280                extra_intermediates: Vec::new(),
281                exclusive: false,
282            };
283            let tls_cfg =
284                crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
285                    .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
286            client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
287        }
288        #[cfg(not(feature = "tls"))]
289        if config.tls_ca_path.is_some() {
290            tracing::warn!(
291                "http transport tls_ca_path is set but the `tls` feature is disabled -- \
292                 ignoring (using reqwest default roots)"
293            );
294        }
295
296        let client = client_builder
297            .build()
298            .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
299
300        #[cfg(feature = "http-server")]
301        let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
302            let addr: std::net::SocketAddr = listen
303                .parse()
304                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
305
306            let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
307            let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
308
309            let sequence = Arc::new(AtomicU64::new(0));
310            let recv_path = config.recv_path.clone();
311
312            let app = build_receiver_router(tx, sequence, &recv_path, config.max_body_bytes);
313
314            let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
315                TransportError::Connection(format!("failed to bind to {addr}: {e}"))
316            })?;
317
318            let handle = tokio::spawn(async move {
319                axum::serve(
320                    listener,
321                    app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
322                )
323                .with_graceful_shutdown(async {
324                    sd_rx.await.ok();
325                })
326                .await
327                .ok();
328            });
329
330            (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
331        } else {
332            (None, None, None)
333        };
334
335        #[cfg(feature = "logger")]
336        tracing::info!(
337            endpoint = ?config.endpoint,
338            listen = ?config.listen,
339            "HTTP transport opened"
340        );
341
342        // Fail loud on bad filter config -- silently disabling filters
343        // turns a misconfigured `drop` / `dlq` rule into a permanent pass.
344        let filter_engine = super::filter::TransportFilterEngine::new(
345            &config.filters_in,
346            &config.filters_out,
347            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
348        )?;
349
350        let closed = Arc::new(AtomicBool::new(false));
351
352        #[cfg(feature = "health")]
353        {
354            let h = Arc::clone(&closed);
355            crate::health::HealthRegistry::register("transport:http", move || {
356                if h.load(Ordering::Relaxed) {
357                    crate::health::HealthStatus::Unhealthy
358                } else {
359                    crate::health::HealthStatus::Healthy
360                }
361            });
362        }
363
364        Ok(Self {
365            client,
366            endpoint: config.endpoint.clone(),
367            #[cfg(feature = "http-server")]
368            receiver,
369            #[cfg(feature = "http-server")]
370            shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
371            #[cfg(feature = "http-server")]
372            _server_handle: server_handle,
373            closed,
374            #[cfg(feature = "http-server")]
375            recv_timeout_ms: config.recv_timeout_ms,
376            filter_engine,
377        })
378    }
379}
380
/// Assemble the axum router used by the embedded receive server.
///
/// The router exposes a single POST route at `recv_path`, handled by
/// `ingest_handler`, with `sender` and `sequence` as shared handler state.
#[cfg(feature = "http-server")]
fn build_receiver_router(
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    sequence: Arc<AtomicU64>,
    recv_path: &str,
    max_body_bytes: usize,
) -> axum::Router {
    // Reject oversized bodies with 413 before the handler buffers them.
    let body_limit = axum::extract::DefaultBodyLimit::max(max_body_bytes);
    let shared = ReceiverState { sender, sequence };

    axum::Router::new()
        .route(recv_path, axum::routing::post(ingest_handler))
        .layer(body_limit)
        .with_state(shared)
}
399
/// Shared state for the receive handler.
///
/// Derives `Clone` so it can be used as axum router state; both fields are
/// handles (a channel sender and an `Arc`), so clones refer to the same
/// channel and the same counter.
#[cfg(feature = "http-server")]
#[derive(Clone)]
struct ReceiverState {
    /// Producer side of the bounded channel that `recv()` drains.
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    /// Counter used to assign each accepted message its `HttpToken::seq`.
    sequence: Arc<AtomicU64>,
}
407
408/// POST handler that accepts raw bytes and queues them into the mpsc channel.
409#[cfg(feature = "http-server")]
410async fn ingest_handler(
411    axum::extract::State(state): axum::extract::State<ReceiverState>,
412    axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
413    headers: axum::http::HeaderMap,
414    body: axum::body::Bytes,
415) -> axum::http::StatusCode {
416    if body.is_empty() {
417        return axum::http::StatusCode::BAD_REQUEST;
418    }
419
420    // Extract W3C traceparent from incoming HTTP headers for distributed tracing
421    #[cfg(feature = "transport-trace")]
422    if let Some(tp) = headers
423        .get(super::propagation::TRACEPARENT_HEADER)
424        .and_then(|v| v.to_str().ok())
425        && super::propagation::is_valid_traceparent(tp)
426    {
427        tracing::Span::current().record("traceparent", tp);
428    }
429
430    // Suppress unused variable warning when otel feature is disabled
431    #[cfg(not(feature = "otel"))]
432    let _ = &headers;
433
434    let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
435    let format = PayloadFormat::detect(&body);
436    let timestamp_ms = chrono::Utc::now().timestamp_millis();
437
438    let msg = Message {
439        key: None,
440        payload: body.to_vec(),
441        token: HttpToken::with_source(seq, addr.to_string()),
442        timestamp_ms: Some(timestamp_ms),
443        format,
444    };
445
446    match state.sender.try_send(msg) {
447        Ok(()) => {
448            #[cfg(feature = "metrics")]
449            metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
450            axum::http::StatusCode::OK
451        }
452        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
453            #[cfg(feature = "metrics")]
454            metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
455                .increment(1);
456            axum::http::StatusCode::SERVICE_UNAVAILABLE
457        }
458        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
459            #[cfg(feature = "metrics")]
460            metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
461            axum::http::StatusCode::GONE
462        }
463    }
464}
465
466impl TransportBase for HttpTransport {
467    async fn close(&self) -> TransportResult<()> {
468        self.closed.store(true, Ordering::Relaxed);
469        // Stop the embedded server now (graceful shutdown) rather than on Drop.
470        // take() => idempotent: a later close()/drop() is a no-op.
471        #[cfg(feature = "http-server")]
472        if let Some(tx) = self.shutdown_tx.lock().take() {
473            let _ = tx.send(());
474        }
475        Ok(())
476    }
477
478    fn is_healthy(&self) -> bool {
479        !self.closed.load(Ordering::Relaxed)
480    }
481
482    fn name(&self) -> &'static str {
483        "http"
484    }
485}
486
487impl TransportSender for HttpTransport {
488    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
489        if self.closed.load(Ordering::Relaxed) {
490            return SendResult::Fatal(TransportError::Closed);
491        }
492
493        // Outbound filter check
494        if self.filter_engine.has_outbound_filters() {
495            match self.filter_engine.apply_outbound(&payload) {
496                super::filter::FilterDisposition::Pass => {}
497                super::filter::FilterDisposition::Drop => return SendResult::Ok,
498                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
499            }
500        }
501
502        let Some(base_url) = &self.endpoint else {
503            return SendResult::Fatal(TransportError::Config(
504                "no endpoint configured for sending".into(),
505            ));
506        };
507
508        // Build URL: {base_url}/{key} if key is non-empty, otherwise just {base_url}
509        let url = if key.is_empty() {
510            base_url.clone()
511        } else {
512            let base = base_url.trim_end_matches('/');
513            let suffix = key.trim_start_matches('/');
514            format!("{base}/{suffix}")
515        };
516
517        #[cfg(feature = "metrics")]
518        let start = std::time::Instant::now();
519
520        // Build request with optional W3C traceparent header for distributed tracing
521        let request_builder = self
522            .client
523            .post(&url)
524            .header("content-type", "application/octet-stream");
525
526        #[cfg(feature = "transport-trace")]
527        let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
528            request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
529        } else {
530            request_builder
531        };
532
533        #[cfg(feature = "logger")]
534        let payload_len = payload.len();
535        let result = match request_builder.body(payload).send().await {
536            Ok(resp) if resp.status().is_success() => {
537                #[cfg(feature = "logger")]
538                tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
539
540                #[cfg(feature = "metrics")]
541                metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
542                SendResult::Ok
543            }
544            Ok(resp)
545                if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
546                    || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
547            {
548                #[cfg(feature = "logger")]
549                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
550
551                #[cfg(feature = "metrics")]
552                metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
553                    .increment(1);
554                SendResult::Backpressured
555            }
556            Ok(resp) => {
557                #[cfg(feature = "logger")]
558                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
559
560                #[cfg(feature = "metrics")]
561                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
562                    .increment(1);
563                SendResult::Fatal(TransportError::Send(format!(
564                    "HTTP {} from {}",
565                    resp.status(),
566                    url
567                )))
568            }
569            Err(e) => {
570                #[cfg(feature = "logger")]
571                tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
572
573                #[cfg(feature = "metrics")]
574                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
575                    .increment(1);
576                SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
577            }
578        };
579
580        #[cfg(feature = "metrics")]
581        metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
582            .record(start.elapsed().as_secs_f64());
583
584        result
585    }
586}
587
588impl TransportReceiver for HttpTransport {
589    type Token = HttpToken;
590
591    async fn recv(&self, max: usize) -> TransportResult<RecvBatch<Self::Token>> {
592        if self.closed.load(Ordering::Relaxed) {
593            return Err(TransportError::Closed);
594        }
595
596        #[cfg(feature = "http-server")]
597        {
598            let Some(receiver) = &self.receiver else {
599                return Err(TransportError::Config(
600                    "no listen address configured for receiving".into(),
601                ));
602            };
603
604            let mut rx = receiver.lock().await;
605            let mut messages = Vec::with_capacity(max.min(100));
606
607            for _ in 0..max {
608                let result = if self.recv_timeout_ms == 0 {
609                    match rx.try_recv() {
610                        Ok(msg) => Some(msg),
611                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
612                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
613                            return Err(TransportError::Closed);
614                        }
615                    }
616                } else if messages.is_empty() {
617                    // First message: wait with timeout
618                    match tokio::time::timeout(
619                        std::time::Duration::from_millis(self.recv_timeout_ms),
620                        rx.recv(),
621                    )
622                    .await
623                    {
624                        Ok(Some(msg)) => Some(msg),
625                        Ok(None) => return Err(TransportError::Closed),
626                        Err(_) => break, // Timeout
627                    }
628                } else {
629                    // Subsequent: non-blocking drain
630                    match rx.try_recv() {
631                        Ok(msg) => Some(msg),
632                        Err(_) => break,
633                    }
634                };
635
636                if let Some(msg) = result {
637                    messages.push(msg);
638                }
639            }
640
641            // Apply inbound filters via the shared partition helper; DLQ
642            // entries are returned in the RecvBatch for the caller to route.
643            let batch = self.filter_engine.partition_batch(
644                messages,
645                |m| m.payload.as_slice(),
646                |m| m.key.clone(),
647            );
648            let messages = batch.messages;
649            let dlq_entries = batch.dlq_entries;
650
651            #[cfg(feature = "logger")]
652            if !messages.is_empty() {
653                tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
654            }
655
656            Ok(RecvBatch {
657                messages,
658                dlq_entries,
659            })
660        }
661
662        #[cfg(not(feature = "http-server"))]
663        {
664            let _ = max;
665            Err(TransportError::Config(
666                "HTTP receive requires the 'http-server' feature".into(),
667            ))
668        }
669    }
670
671    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
672        // HTTP is fire-and-forget -- commit is a no-op.
673        Ok(())
674    }
675}
676
677impl Drop for HttpTransport {
678    fn drop(&mut self) {
679        #[cfg(feature = "http-server")]
680        if let Some(tx) = self.shutdown_tx.lock().take() {
681            let _ = tx.send(());
682        }
683    }
684}
685
// Empty impl body: nothing is overridden here. This is what makes
// `from_cascade_key` callable on `HttpTransportConfig` in `from_cascade()`.
impl super::traits::FromCascade for HttpTransportConfig {}
687
/// Unit and loopback integration tests for the HTTP transport.
///
/// Tests that start the embedded server are gated on the `http-server`
/// feature and talk to it over 127.0.0.1.
#[cfg(test)]
mod tests {
    use super::*;

    /// A token without a source address renders as `http:{seq}`.
    #[test]
    fn http_token_display() {
        let token = HttpToken::new(42);
        assert_eq!(format!("{token}"), "http:42");
    }

    /// A token with a source address renders as `http:{addr}:{seq}`.
    #[test]
    fn http_token_display_with_source() {
        let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
        assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
    }

    /// `Default` leaves both directions disabled and uses the documented
    /// receive-side defaults.
    #[test]
    fn config_defaults() {
        let config = HttpTransportConfig::default();
        assert!(config.endpoint.is_none());
        assert!(config.listen.is_none());
        assert_eq!(config.recv_path, "/ingest");
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
    }

    /// `sender()` sets only the endpoint.
    #[test]
    fn config_sender_helper() {
        let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
        assert_eq!(
            config.endpoint.as_deref(),
            Some("http://localhost:8080/ingest")
        );
        assert!(config.listen.is_none());
    }

    /// `receiver()` sets only the listen address.
    #[test]
    fn config_receiver_helper() {
        let config = HttpTransportConfig::receiver("0.0.0.0:8080");
        assert!(config.endpoint.is_none());
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
    }

    /// A transport built from the default config constructs fine, but
    /// sending through it is a fatal error because no endpoint is set.
    #[tokio::test]
    async fn send_only_transport() {
        // Default config: neither `endpoint` nor `listen` is set, so both
        // directions are disabled -- construction must still succeed.
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();

        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "http");

        // Send without endpoint should fail
        let result = transport
            .send("test", bytes::Bytes::from_static(b"payload"))
            .await;
        assert!(result.is_fatal());

        // Commit is always ok
        transport.commit(&[]).await.unwrap();
    }

    /// After `close()` the transport reports unhealthy and `send` is fatal.
    /// The closed check runs first, so no request is made to the endpoint.
    #[tokio::test]
    async fn close_prevents_send() {
        let config = HttpTransportConfig::sender("http://localhost:19999/test");
        let transport = HttpTransport::new(&config).await.unwrap();

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport
            .send("test", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(result.is_fatal());
    }

    /// After `close()`, `recv` returns an error.
    #[tokio::test]
    async fn close_prevents_recv() {
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();

        transport.close().await.unwrap();
        let result = transport.recv(1).await;
        assert!(result.is_err());
    }

    /// Full send + receive round-trip test.
    /// Requires both `transport-http` and `http-server` features.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn send_and_receive_roundtrip() {
        // Start receiver on a random available port
        // NOTE: the port is released before the transport re-binds it, so
        // another process could grab it in between (small race window).
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener); // Free the port for the transport to bind

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_path: "/ingest".to_string(),
            recv_buffer_size: 100,
            recv_timeout_ms: 1000,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();

        // Give the server a moment to start
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Send a message via a separate sender transport
        let send_config =
            HttpTransportConfig::sender(&format!("http://127.0.0.1:{}/ingest", addr.port()));
        let sender = HttpTransport::new(&send_config).await.unwrap();

        // Empty key: POST goes to the endpoint URL as-is.
        let result = sender
            .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
            .await;
        assert!(result.is_ok(), "send failed: {result:?}");

        // Receive it
        let messages = receiver.recv(10).await.unwrap().messages;
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].payload, b"{\"msg\":\"hello\"}");
        assert!(messages[0].token.source_addr.is_some());

        // Cleanup
        sender.close().await.unwrap();
        receiver.close().await.unwrap();
    }

    /// Test that the receiver rejects empty bodies.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_empty_body() {
        // Reserve a free port, then release it for the transport to bind.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener);

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Send empty body
        let client = reqwest::Client::new();
        let resp = client
            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
            .body(Vec::<u8>::new())
            .send()
            .await
            .unwrap();

        assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);

        // recv should timeout with no messages
        let messages = receiver.recv(10).await.unwrap().messages;
        assert!(messages.is_empty());

        receiver.close().await.unwrap();
    }

    /// Oversized bodies are rejected with 413 before the handler buffers them.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_oversized_body() {
        // Reserve a free port, then release it for the transport to bind.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener);

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            max_body_bytes: 1024, // 1 KiB cap
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // POST 8 KiB -- over the 1 KiB cap.
        let client = reqwest::Client::new();
        let resp = client
            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
            .body(vec![b'x'; 8 * 1024])
            .send()
            .await
            .unwrap();

        assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);

        // The oversized POST never reached the queue.
        let messages = receiver.recv(10).await.unwrap().messages;
        assert!(messages.is_empty(), "oversized body must not be queued");

        receiver.close().await.unwrap();
    }

    /// Test recv without listen returns config error.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn recv_without_listen_returns_error() {
        let config = HttpTransportConfig::sender("http://localhost:9999");
        let transport = HttpTransport::new(&config).await.unwrap();

        let result = transport.recv(10).await;
        assert!(result.is_err());
    }

    /// Serializing to JSON and back preserves the explicitly set fields.
    #[test]
    fn config_serde_roundtrip() {
        let config = HttpTransportConfig {
            endpoint: Some("http://example.com/ingest".into()),
            listen: Some("0.0.0.0:8080".into()),
            recv_path: "/custom".into(),
            recv_buffer_size: 5000,
            recv_timeout_ms: 250,
            ..Default::default()
        };

        let json = serde_json::to_string(&config).unwrap();
        let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.endpoint, config.endpoint);
        assert_eq!(parsed.listen, config.listen);
        assert_eq!(parsed.recv_path, config.recv_path);
        assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
        assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
    }
}