Skip to main content

hyperi_rustlib/transport/
http.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/http.rs
3// Purpose:   HTTP/HTTPS transport (send via POST, receive via embedded server)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # HTTP Transport
10//!
11//! HTTP/HTTPS transport for webhook delivery and REST ingest.
12//!
13//! ## Send
14//!
15//! POSTs payload bytes to `{endpoint}/{key}` using reqwest.
16//!
17//! ## Receive (requires `http-server` feature)
18//!
19//! Starts an embedded axum HTTP server that accepts POST requests on a
20//! configurable path. Incoming payloads are queued into a bounded
21//! `tokio::sync::mpsc` channel. `recv()` drains from this channel.
22//!
23//! ## Example
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::http::{HttpTransport, HttpTransportConfig};
27//!
28//! // Send-only
29//! let config = HttpTransportConfig {
30//!     endpoint: Some("http://loader:8080/ingest".into()),
31//!     ..Default::default()
32//! };
33//! let transport = HttpTransport::new(&config).await?;
34//! transport.send("events", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}")).await;
35//! ```
36
37use super::error::{TransportError, TransportResult};
38#[cfg(feature = "http-server")]
39use super::traits::RecvBatch;
40use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
41#[cfg(feature = "http-server")]
42use super::types::Message;
43#[cfg(feature = "http-server")]
44use super::types::PayloadFormat;
45use super::types::SendResult;
46use super::work_batch::WorkBatch;
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49#[cfg(feature = "http-server")]
50use std::sync::atomic::AtomicU64;
51use std::sync::atomic::{AtomicBool, Ordering};
52
53/// Commit token for HTTP transport.
54///
55/// HTTP is fire-and-forget from the receiver's perspective, so commit
56/// is a no-op. The token provides sequence tracking and optional
57/// client address for observability.
58#[derive(Debug, Clone)]
59pub struct HttpToken {
60    /// Local sequence number (monotonically increasing per transport instance).
61    pub seq: u64,
62
63    /// Source client address (if available from the HTTP request).
64    pub source_addr: Option<String>,
65}
66
67impl HttpToken {
68    /// Create a new token with sequence number.
69    #[must_use]
70    pub fn new(seq: u64) -> Self {
71        Self {
72            seq,
73            source_addr: None,
74        }
75    }
76
77    /// Create a new token with sequence number and source address.
78    #[must_use]
79    pub fn with_source(seq: u64, addr: String) -> Self {
80        Self {
81            seq,
82            source_addr: Some(addr),
83        }
84    }
85}
86
87impl CommitToken for HttpToken {}
88
89impl std::fmt::Display for HttpToken {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match &self.source_addr {
92            Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
93            None => write!(f, "http:{}", self.seq),
94        }
95    }
96}
97
98fn default_recv_path() -> String {
99    "/ingest".to_string()
100}
101
102fn default_buffer_size() -> usize {
103    10_000
104}
105
106fn default_recv_timeout_ms() -> u64 {
107    100
108}
109
110fn default_connect_timeout_ms() -> u64 {
111    5_000
112}
113
114fn default_send_timeout_ms() -> u64 {
115    30_000
116}
117
118fn default_max_body_bytes() -> usize {
119    16 * 1024 * 1024
120}
121
122/// Configuration for HTTP transport.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HttpTransportConfig {
125    /// Endpoint URL for sending (e.g., "http://loader:8080/ingest"). None = send disabled.
126    #[serde(default)]
127    pub endpoint: Option<String>,
128
129    /// Listen address for receiving (e.g., "0.0.0.0:8080"). None = receive disabled.
130    /// Requires the `http-server` feature.
131    #[serde(default)]
132    pub listen: Option<String>,
133
134    /// Path to accept POSTs on for receive mode. Default: "/ingest".
135    #[serde(default = "default_recv_path")]
136    pub recv_path: String,
137
138    /// Receive buffer size (bounded channel capacity). Default: 10000.
139    #[serde(default = "default_buffer_size")]
140    pub recv_buffer_size: usize,
141
142    /// Receive timeout in milliseconds. Default: 100.
143    #[serde(default = "default_recv_timeout_ms")]
144    pub recv_timeout_ms: u64,
145
146    /// Inbound message filters (applied on recv before caller sees messages).
147    #[serde(default)]
148    pub filters_in: Vec<super::filter::FilterRule>,
149
150    /// Outbound message filters (applied on send before transport dispatches).
151    #[serde(default)]
152    pub filters_out: Vec<super::filter::FilterRule>,
153
154    /// Connect timeout (ms) for the send-side HTTP client. Default 5000.
155    #[serde(default = "default_connect_timeout_ms")]
156    pub connect_timeout_ms: u64,
157
158    /// Total request timeout (ms) per send. Default 30000. Prevents a hung
159    /// send from consuming worker capacity indefinitely.
160    #[serde(default = "default_send_timeout_ms")]
161    pub send_timeout_ms: u64,
162
163    /// Maximum accepted request body size in bytes (receive side). Default
164    /// 16 MiB. Oversized POSTs are rejected with 413 before buffering.
165    #[serde(default = "default_max_body_bytes")]
166    pub max_body_bytes: usize,
167
168    /// Private-CA PEM path for the send-side HTTPS client. When set and the
169    /// `tls` feature is enabled, the client trusts this CA *in addition to* the
170    /// OS native roots, built via the unified `crate::tls` module and fed to
171    /// reqwest with `use_preconfigured_tls`. None = native roots only (reqwest
172    /// default). Maps to `TlsTrust { native_roots: true, extra_roots: [ca] }`.
173    #[serde(default)]
174    pub tls_ca_path: Option<String>,
175}
176
177impl Default for HttpTransportConfig {
178    fn default() -> Self {
179        Self {
180            endpoint: None,
181            listen: None,
182            recv_path: default_recv_path(),
183            recv_buffer_size: default_buffer_size(),
184            recv_timeout_ms: default_recv_timeout_ms(),
185            filters_in: Vec::new(),
186            filters_out: Vec::new(),
187            connect_timeout_ms: default_connect_timeout_ms(),
188            send_timeout_ms: default_send_timeout_ms(),
189            max_body_bytes: default_max_body_bytes(),
190            tls_ca_path: None,
191        }
192    }
193}
194
195impl HttpTransportConfig {
196    /// Load from the config cascade under the `transport.http` key.
197    #[must_use]
198    pub fn from_cascade() -> Self {
199        <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
200    }
201
202    /// Create a send-only config pointing at the given endpoint URL.
203    #[must_use]
204    pub fn sender(endpoint: &str) -> Self {
205        Self {
206            endpoint: Some(endpoint.to_string()),
207            ..Default::default()
208        }
209    }
210
211    /// Create a receive-only config listening on the given address.
212    #[must_use]
213    pub fn receiver(listen: &str) -> Self {
214        Self {
215            listen: Some(listen.to_string()),
216            ..Default::default()
217        }
218    }
219}
220
221/// HTTP/HTTPS transport.
222///
223/// Supports send (POST to endpoint) and receive (embedded axum server).
224/// The receive side requires the `http-server` feature for axum.
225pub struct HttpTransport {
226    /// reqwest client for sending (always available when transport-http is enabled).
227    client: reqwest::Client,
228
229    /// Base URL for sending (None = send disabled).
230    endpoint: Option<String>,
231
232    /// Receiver channel populated by the embedded HTTP server.
233    /// Only available when `http-server` feature is enabled AND `listen` is configured.
234    #[cfg(feature = "http-server")]
235    receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
236
237    /// Shutdown signal for the server task.
238    ///
239    /// Behind a `parking_lot::Mutex` for interior mutability: `close(&self)`
240    /// fires it to stop the embedded server promptly (graceful shutdown),
241    /// rather than waiting for `Drop`. `take()` makes both paths idempotent.
242    #[cfg(feature = "http-server")]
243    shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
244
245    /// Server background task handle.
246    #[cfg(feature = "http-server")]
247    _server_handle: Option<tokio::task::JoinHandle<()>>,
248
249    /// Whether the transport is closed.
250    closed: Arc<AtomicBool>,
251
252    /// Receive timeout in milliseconds (used by receive side).
253    #[cfg(feature = "http-server")]
254    recv_timeout_ms: u64,
255
256    /// Transport-level message filter engine.
257    filter_engine: super::filter::TransportFilterEngine,
258}
259
260impl HttpTransport {
261    /// Create a new HTTP transport.
262    ///
263    /// - Set `config.endpoint` to enable sending (POST to URL).
264    /// - Set `config.listen` to enable receiving (embedded HTTP server, requires `http-server` feature).
265    ///
266    /// # Errors
267    ///
268    /// Returns error if the listen address is invalid or the server fails to bind.
269    pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
270        // Default path: no pressure governor -> byte-identical to before.
271        Self::new_inner(
272            config,
273            #[cfg(feature = "governor")]
274            None,
275        )
276        .await
277    }
278
279    /// Create an HTTP transport bound to a pressure governor (G3, `governor`
280    /// feature).
281    ///
282    /// Identical to [`new`](Self::new) except the embedded receive server
283    /// consults `pressure` BEFORE enqueuing each request: while
284    /// [`UnifiedPressure::should_hold`](crate::governor::UnifiedPressure::should_hold)
285    /// holds, the handler returns 503 (SERVICE_UNAVAILABLE) -- the same status
286    /// the existing channel-full backpressure path uses. Passing `None` is
287    /// exactly equivalent to [`new`](Self::new).
288    ///
289    /// # Errors
290    ///
291    /// Same as [`new`](Self::new).
292    #[cfg(feature = "governor")]
293    pub async fn with_pressure(
294        config: &HttpTransportConfig,
295        pressure: Option<Arc<crate::governor::UnifiedPressure>>,
296    ) -> TransportResult<Self> {
297        Self::new_inner(config, pressure).await
298    }
299
300    async fn new_inner(
301        config: &HttpTransportConfig,
302        #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
303    ) -> TransportResult<Self> {
304        // `mut` is only used on the TLS path; harmless without the feature.
305        #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
306        let mut client_builder = reqwest::Client::builder()
307            .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
308            .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
309
310        // Private-CA trust via the unified TLS module (augments native roots).
311        #[cfg(feature = "tls")]
312        if let Some(ref ca) = config.tls_ca_path {
313            let trust = crate::tls::TlsTrust {
314                native_roots: true,
315                webpki_roots: false,
316                extra_roots: vec![ca.into()],
317                extra_intermediates: Vec::new(),
318                exclusive: false,
319            };
320            let tls_cfg =
321                crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
322                    .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
323            client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
324        }
325        #[cfg(not(feature = "tls"))]
326        if config.tls_ca_path.is_some() {
327            tracing::warn!(
328                "http transport tls_ca_path is set but the `tls` feature is disabled -- \
329                 ignoring (using reqwest default roots)"
330            );
331        }
332
333        let client = client_builder
334            .build()
335            .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
336
337        #[cfg(feature = "http-server")]
338        let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
339            let addr: std::net::SocketAddr = listen
340                .parse()
341                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
342
343            let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
344            let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
345
346            let sequence = Arc::new(AtomicU64::new(0));
347            let recv_path = config.recv_path.clone();
348
349            let app = build_receiver_router(
350                tx,
351                sequence,
352                &recv_path,
353                config.max_body_bytes,
354                #[cfg(feature = "governor")]
355                pressure.clone(),
356            );
357
358            let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
359                TransportError::Connection(format!("failed to bind to {addr}: {e}"))
360            })?;
361
362            let handle = tokio::spawn(async move {
363                axum::serve(
364                    listener,
365                    app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
366                )
367                .with_graceful_shutdown(async {
368                    sd_rx.await.ok();
369                })
370                .await
371                .ok();
372            });
373
374            (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
375        } else {
376            (None, None, None)
377        };
378
379        // When the governor feature is on but http-server is off there is no
380        // receive server to attach the pressure governor to; consume it so the
381        // signature stays uniform without an unused-variable warning.
382        #[cfg(all(feature = "governor", not(feature = "http-server")))]
383        let _ = pressure;
384
385        #[cfg(feature = "logger")]
386        tracing::info!(
387            endpoint = ?config.endpoint,
388            listen = ?config.listen,
389            "HTTP transport opened"
390        );
391
392        // Fail loud on bad filter config -- silently disabling filters
393        // turns a misconfigured `drop` / `dlq` rule into a permanent pass.
394        let filter_engine = super::filter::TransportFilterEngine::new(
395            &config.filters_in,
396            &config.filters_out,
397            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
398        )?;
399
400        let closed = Arc::new(AtomicBool::new(false));
401
402        #[cfg(feature = "health")]
403        {
404            let h = Arc::clone(&closed);
405            crate::health::HealthRegistry::register("transport:http", move || {
406                if h.load(Ordering::Relaxed) {
407                    crate::health::HealthStatus::Unhealthy
408                } else {
409                    crate::health::HealthStatus::Healthy
410                }
411            });
412        }
413
414        Ok(Self {
415            client,
416            endpoint: config.endpoint.clone(),
417            #[cfg(feature = "http-server")]
418            receiver,
419            #[cfg(feature = "http-server")]
420            shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
421            #[cfg(feature = "http-server")]
422            _server_handle: server_handle,
423            closed,
424            #[cfg(feature = "http-server")]
425            recv_timeout_ms: config.recv_timeout_ms,
426            filter_engine,
427        })
428    }
429}
430
431/// Build the axum router for the receive side.
432#[cfg(feature = "http-server")]
433fn build_receiver_router(
434    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
435    sequence: Arc<AtomicU64>,
436    recv_path: &str,
437    max_body_bytes: usize,
438    #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
439) -> axum::Router {
440    use axum::routing::post;
441
442    let state = ReceiverState {
443        sender,
444        sequence,
445        #[cfg(feature = "governor")]
446        pressure,
447    };
448
449    axum::Router::new()
450        .route(recv_path, post(ingest_handler))
451        // Reject oversized bodies with 413 before the handler buffers them.
452        .layer(axum::extract::DefaultBodyLimit::max(max_body_bytes))
453        .with_state(state)
454}
455
456/// Shared state for the receive handler.
457#[cfg(feature = "http-server")]
458#[derive(Clone)]
459struct ReceiverState {
460    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
461    sequence: Arc<AtomicU64>,
462    /// Optional pressure governor (G3, `governor` feature). `None` by default
463    /// -> the handler never consults it and behaviour is byte-identical. When
464    /// `Some`, the handler rejects with 503 while [`UnifiedPressure::should_hold`]
465    /// holds -- pressure-driven shedding ON TOP of the existing channel-full
466    /// 503, never replacing it.
467    #[cfg(feature = "governor")]
468    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
469}
470
471/// POST handler that accepts raw bytes and queues them into the mpsc channel.
472#[cfg(feature = "http-server")]
473async fn ingest_handler(
474    axum::extract::State(state): axum::extract::State<ReceiverState>,
475    axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
476    headers: axum::http::HeaderMap,
477    body: axum::body::Bytes,
478) -> axum::http::StatusCode {
479    if body.is_empty() {
480        return axum::http::StatusCode::BAD_REQUEST;
481    }
482
483    // G3 pressure-driven shedding (governor feature, opt-in). BEFORE enqueuing,
484    // if a governor is wired and it says hold, shed the request with 503 --
485    // consistent with the existing channel-full 503 below (NOT 429). Default
486    // `None` -> this is skipped and behaviour is byte-identical.
487    #[cfg(feature = "governor")]
488    if let Some(pressure) = &state.pressure
489        && pressure.should_hold()
490    {
491        #[cfg(feature = "metrics")]
492        metrics::counter!("dfe_transport_backpressured_total", "transport" => "http", "reason" => "pressure")
493            .increment(1);
494        return axum::http::StatusCode::SERVICE_UNAVAILABLE;
495    }
496
497    // Extract W3C traceparent from incoming HTTP headers for distributed tracing
498    #[cfg(feature = "transport-trace")]
499    if let Some(tp) = headers
500        .get(super::propagation::TRACEPARENT_HEADER)
501        .and_then(|v| v.to_str().ok())
502        && super::propagation::is_valid_traceparent(tp)
503    {
504        tracing::Span::current().record("traceparent", tp);
505    }
506
507    // Suppress unused variable warning when otel feature is disabled
508    #[cfg(not(feature = "otel"))]
509    let _ = &headers;
510
511    let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
512    let format = PayloadFormat::detect(&body);
513    let timestamp_ms = chrono::Utc::now().timestamp_millis();
514
515    // `body` is `axum::body::Bytes` (= `bytes::Bytes`) -- move it directly,
516    // no copy needed.
517    let msg = Message {
518        key: None,
519        payload: body,
520        token: HttpToken::with_source(seq, addr.to_string()),
521        timestamp_ms: Some(timestamp_ms),
522        format,
523    };
524
525    match state.sender.try_send(msg) {
526        Ok(()) => {
527            #[cfg(feature = "metrics")]
528            metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
529            axum::http::StatusCode::OK
530        }
531        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
532            #[cfg(feature = "metrics")]
533            metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
534                .increment(1);
535            axum::http::StatusCode::SERVICE_UNAVAILABLE
536        }
537        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
538            #[cfg(feature = "metrics")]
539            metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
540            axum::http::StatusCode::GONE
541        }
542    }
543}
544
545impl TransportBase for HttpTransport {
546    async fn close(&self) -> TransportResult<()> {
547        self.closed.store(true, Ordering::Relaxed);
548        // Stop the embedded server now (graceful shutdown) rather than on Drop.
549        // take() => idempotent: a later close()/drop() is a no-op.
550        #[cfg(feature = "http-server")]
551        if let Some(tx) = self.shutdown_tx.lock().take() {
552            let _ = tx.send(());
553        }
554        Ok(())
555    }
556
557    fn is_healthy(&self) -> bool {
558        !self.closed.load(Ordering::Relaxed)
559    }
560
561    fn name(&self) -> &'static str {
562        "http"
563    }
564}
565
566impl TransportSender for HttpTransport {
567    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
568        if self.closed.load(Ordering::Relaxed) {
569            return SendResult::Fatal(TransportError::Closed);
570        }
571
572        // Outbound filter check
573        if self.filter_engine.has_outbound_filters() {
574            match self.filter_engine.apply_outbound(&payload) {
575                super::filter::FilterDisposition::Pass => {}
576                super::filter::FilterDisposition::Drop => return SendResult::Ok,
577                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
578            }
579        }
580
581        let Some(base_url) = &self.endpoint else {
582            return SendResult::Fatal(TransportError::Config(
583                "no endpoint configured for sending".into(),
584            ));
585        };
586
587        // Build URL: {base_url}/{key} if key is non-empty, otherwise just {base_url}
588        let url = if key.is_empty() {
589            base_url.clone()
590        } else {
591            let base = base_url.trim_end_matches('/');
592            let suffix = key.trim_start_matches('/');
593            format!("{base}/{suffix}")
594        };
595
596        #[cfg(feature = "metrics")]
597        let start = std::time::Instant::now();
598
599        // Build request with optional W3C traceparent header for distributed tracing
600        let request_builder = self
601            .client
602            .post(&url)
603            .header("content-type", "application/octet-stream");
604
605        #[cfg(feature = "transport-trace")]
606        let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
607            request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
608        } else {
609            request_builder
610        };
611
612        #[cfg(feature = "logger")]
613        let payload_len = payload.len();
614        let result = match request_builder.body(payload).send().await {
615            Ok(resp) if resp.status().is_success() => {
616                #[cfg(feature = "logger")]
617                tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
618
619                #[cfg(feature = "metrics")]
620                metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
621                SendResult::Ok
622            }
623            Ok(resp)
624                if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
625                    || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
626            {
627                #[cfg(feature = "logger")]
628                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
629
630                #[cfg(feature = "metrics")]
631                metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
632                    .increment(1);
633                SendResult::Backpressured
634            }
635            Ok(resp) => {
636                #[cfg(feature = "logger")]
637                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
638
639                #[cfg(feature = "metrics")]
640                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
641                    .increment(1);
642                SendResult::Fatal(TransportError::Send(format!(
643                    "HTTP {} from {}",
644                    resp.status(),
645                    url
646                )))
647            }
648            Err(e) => {
649                #[cfg(feature = "logger")]
650                tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
651
652                #[cfg(feature = "metrics")]
653                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
654                    .increment(1);
655                SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
656            }
657        };
658
659        #[cfg(feature = "metrics")]
660        metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
661            .record(start.elapsed().as_secs_f64());
662
663        result
664    }
665}
666
667impl TransportReceiver for HttpTransport {
668    type Token = HttpToken;
669
670    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
671        if self.closed.load(Ordering::Relaxed) {
672            return Err(TransportError::Closed);
673        }
674
675        #[cfg(feature = "http-server")]
676        {
677            let Some(receiver) = &self.receiver else {
678                return Err(TransportError::Config(
679                    "no listen address configured for receiving".into(),
680                ));
681            };
682
683            let mut rx = receiver.lock().await;
684            let mut messages = Vec::with_capacity(max.min(100));
685
686            for _ in 0..max {
687                let result = if self.recv_timeout_ms == 0 {
688                    match rx.try_recv() {
689                        Ok(msg) => Some(msg),
690                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
691                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
692                            return Err(TransportError::Closed);
693                        }
694                    }
695                } else if messages.is_empty() {
696                    // First message: wait with timeout
697                    match tokio::time::timeout(
698                        std::time::Duration::from_millis(self.recv_timeout_ms),
699                        rx.recv(),
700                    )
701                    .await
702                    {
703                        Ok(Some(msg)) => Some(msg),
704                        Ok(None) => return Err(TransportError::Closed),
705                        Err(_) => break, // Timeout
706                    }
707                } else {
708                    // Subsequent: non-blocking drain
709                    match rx.try_recv() {
710                        Ok(msg) => Some(msg),
711                        Err(_) => break,
712                    }
713                };
714
715                if let Some(msg) = result {
716                    messages.push(msg);
717                }
718            }
719
720            // Apply inbound filters via the shared partition helper; DLQ
721            // entries are returned in the RecvBatch for the caller to route.
722            let batch = self.filter_engine.partition_batch(
723                messages,
724                |m| m.payload.as_ref(),
725                |m| m.key.clone(),
726            );
727            let messages = batch.messages;
728            let dlq_entries = batch.dlq_entries;
729
730            #[cfg(feature = "logger")]
731            if !messages.is_empty() {
732                tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
733            }
734
735            Ok(RecvBatch {
736                messages,
737                dlq_entries,
738            }
739            .into())
740        }
741
742        #[cfg(not(feature = "http-server"))]
743        {
744            let _ = max;
745            Err(TransportError::Config(
746                "HTTP receive requires the 'http-server' feature".into(),
747            ))
748        }
749    }
750
751    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
752        // HTTP is fire-and-forget -- commit is a no-op.
753        Ok(())
754    }
755}
756
757impl Drop for HttpTransport {
758    fn drop(&mut self) {
759        #[cfg(feature = "http-server")]
760        if let Some(tx) = self.shutdown_tx.lock().take() {
761            let _ = tx.send(());
762        }
763    }
764}
765
766impl super::traits::FromCascade for HttpTransportConfig {}
767
768#[cfg(test)]
769mod tests {
770    use super::*;
771
772    #[test]
773    fn http_token_display() {
774        let token = HttpToken::new(42);
775        assert_eq!(format!("{token}"), "http:42");
776    }
777
778    #[test]
779    fn http_token_display_with_source() {
780        let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
781        assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
782    }
783
784    #[test]
785    fn config_defaults() {
786        let config = HttpTransportConfig::default();
787        assert!(config.endpoint.is_none());
788        assert!(config.listen.is_none());
789        assert_eq!(config.recv_path, "/ingest");
790        assert_eq!(config.recv_buffer_size, 10_000);
791        assert_eq!(config.recv_timeout_ms, 100);
792    }
793
794    #[test]
795    fn config_sender_helper() {
796        let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
797        assert_eq!(
798            config.endpoint.as_deref(),
799            Some("http://localhost:8080/ingest")
800        );
801        assert!(config.listen.is_none());
802    }
803
804    #[test]
805    fn config_receiver_helper() {
806        let config = HttpTransportConfig::receiver("0.0.0.0:8080");
807        assert!(config.endpoint.is_none());
808        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
809    }
810
811    #[tokio::test]
812    async fn send_only_transport() {
813        // Send-only config (no endpoint = send disabled, but transport creates fine)
814        let config = HttpTransportConfig::default();
815        let transport = HttpTransport::new(&config).await.unwrap();
816
817        assert!(transport.is_healthy());
818        assert_eq!(transport.name(), "http");
819
820        // Send without endpoint should fail
821        let result = transport
822            .send("test", bytes::Bytes::from_static(b"payload"))
823            .await;
824        assert!(result.is_fatal());
825
826        // Commit is always ok
827        transport.commit(&[]).await.unwrap();
828    }
829
830    #[tokio::test]
831    async fn close_prevents_send() {
832        let config = HttpTransportConfig::sender("http://localhost:19999/test");
833        let transport = HttpTransport::new(&config).await.unwrap();
834
835        transport.close().await.unwrap();
836        assert!(!transport.is_healthy());
837
838        let result = transport
839            .send("test", bytes::Bytes::from_static(b"data"))
840            .await;
841        assert!(result.is_fatal());
842    }
843
844    #[tokio::test]
845    async fn close_prevents_recv() {
846        let config = HttpTransportConfig::default();
847        let transport = HttpTransport::new(&config).await.unwrap();
848
849        transport.close().await.unwrap();
850        let result = transport.recv(1).await;
851        assert!(result.is_err());
852    }
853
854    /// Full send + receive round-trip test.
855    /// Requires both `transport-http` and `http-server` features.
856    #[cfg(feature = "http-server")]
857    #[tokio::test]
858    async fn send_and_receive_roundtrip() {
859        // Start receiver on a random available port
860        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
861        let addr = listener.local_addr().unwrap();
862        drop(listener); // Free the port for the transport to bind
863
864        let recv_config = HttpTransportConfig {
865            listen: Some(addr.to_string()),
866            recv_path: "/ingest".to_string(),
867            recv_buffer_size: 100,
868            recv_timeout_ms: 1000,
869            ..Default::default()
870        };
871        let receiver = HttpTransport::new(&recv_config).await.unwrap();
872
873        // Give the server a moment to start
874        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
875
876        // Send a message via a separate sender transport
877        let send_config =
878            HttpTransportConfig::sender(&format!("http://127.0.0.1:{}/ingest", addr.port()));
879        let sender = HttpTransport::new(&send_config).await.unwrap();
880
881        let result = sender
882            .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
883            .await;
884        assert!(result.is_ok(), "send failed: {result:?}");
885
886        // Receive it
887        let batch = receiver.recv(10).await.unwrap();
888        assert_eq!(batch.records.len(), 1);
889        assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
890        // The source address rides on the batch commit token, not the record.
891        assert!(batch.commit_tokens[0].source_addr.is_some());
892
893        // Cleanup
894        sender.close().await.unwrap();
895        receiver.close().await.unwrap();
896    }
897
898    /// Test that the receiver rejects empty bodies.
899    #[cfg(feature = "http-server")]
900    #[tokio::test]
901    async fn receive_rejects_empty_body() {
902        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
903        let addr = listener.local_addr().unwrap();
904        drop(listener);
905
906        let recv_config = HttpTransportConfig {
907            listen: Some(addr.to_string()),
908            recv_timeout_ms: 200,
909            ..Default::default()
910        };
911        let receiver = HttpTransport::new(&recv_config).await.unwrap();
912        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
913
914        // Send empty body
915        let client = reqwest::Client::new();
916        let resp = client
917            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
918            .body(Vec::<u8>::new())
919            .send()
920            .await
921            .unwrap();
922
923        assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
924
925        // recv should timeout with no messages
926        let records = receiver.recv(10).await.unwrap().records;
927        assert!(records.is_empty());
928
929        receiver.close().await.unwrap();
930    }
931
932    /// Oversized bodies are rejected with 413 before the handler buffers them.
933    #[cfg(feature = "http-server")]
934    #[tokio::test]
935    async fn receive_rejects_oversized_body() {
936        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
937        let addr = listener.local_addr().unwrap();
938        drop(listener);
939
940        let recv_config = HttpTransportConfig {
941            listen: Some(addr.to_string()),
942            recv_timeout_ms: 200,
943            max_body_bytes: 1024, // 1 KiB cap
944            ..Default::default()
945        };
946        let receiver = HttpTransport::new(&recv_config).await.unwrap();
947        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
948
949        // POST 8 KiB -- over the 1 KiB cap.
950        let client = reqwest::Client::new();
951        let resp = client
952            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
953            .body(vec![b'x'; 8 * 1024])
954            .send()
955            .await
956            .unwrap();
957
958        assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
959
960        // The oversized POST never reached the queue.
961        let records = receiver.recv(10).await.unwrap().records;
962        assert!(records.is_empty(), "oversized body must not be queued");
963
964        receiver.close().await.unwrap();
965    }
966
967    /// Test recv without listen returns config error.
968    #[cfg(feature = "http-server")]
969    #[tokio::test]
970    async fn recv_without_listen_returns_error() {
971        let config = HttpTransportConfig::sender("http://localhost:9999");
972        let transport = HttpTransport::new(&config).await.unwrap();
973
974        let result = transport.recv(10).await;
975        assert!(result.is_err());
976    }
977
978    /// G3: with a pressure governor pinned HIGH, the ingest handler sheds with
979    /// 503 (SERVICE_UNAVAILABLE) -- the same status as the channel-full path.
980    /// With `None` (the default `new`), POST is accepted (200) as before.
981    #[cfg(all(feature = "http-server", feature = "governor"))]
982    #[tokio::test]
983    async fn pressure_high_sheds_with_503_and_none_is_normal() {
984        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
985        use crate::memory::{MemoryGuard, MemoryGuardConfig};
986
987        // --- None default: POST accepted (200) ---
988        {
989            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
990            let addr = listener.local_addr().unwrap();
991            drop(listener);
992            let cfg = HttpTransportConfig {
993                listen: Some(addr.to_string()),
994                recv_timeout_ms: 200,
995                ..Default::default()
996            };
997            let receiver = HttpTransport::new(&cfg).await.unwrap();
998            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
999
1000            let client = reqwest::Client::new();
1001            let resp = client
1002                .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1003                .body(b"{\"msg\":\"ok\"}".to_vec())
1004                .send()
1005                .await
1006                .unwrap();
1007            assert_eq!(
1008                resp.status(),
1009                reqwest::StatusCode::OK,
1010                "no governor -> accepted"
1011            );
1012            receiver.close().await.unwrap();
1013        }
1014
1015        // --- Governor pinned HIGH (HARD memory source at 95%): 503 ---
1016        {
1017            let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
1018                limit_bytes: 1000,
1019                pressure_threshold: 0.80,
1020                ..Default::default()
1021            }));
1022            guard.add_bytes(950); // 95% -> well above pause_above
1023            let src = MemoryPressureSource::new(Arc::clone(&guard));
1024            let pressure = Arc::new(UnifiedPressure::new(
1025                vec![Arc::new(src) as Arc<dyn PressureSource>],
1026                Hysteresis::new(0.80, 0.65).expect("valid band"),
1027            ));
1028            // Sanity: the latch is armed.
1029            assert!(pressure.should_hold(), "pinned-high governor must hold");
1030
1031            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1032            let addr = listener.local_addr().unwrap();
1033            drop(listener);
1034            let cfg = HttpTransportConfig {
1035                listen: Some(addr.to_string()),
1036                recv_timeout_ms: 200,
1037                ..Default::default()
1038            };
1039            let receiver = HttpTransport::with_pressure(&cfg, Some(Arc::clone(&pressure)))
1040                .await
1041                .unwrap();
1042            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1043
1044            let client = reqwest::Client::new();
1045            let resp = client
1046                .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1047                .body(b"{\"msg\":\"shed\"}".to_vec())
1048                .send()
1049                .await
1050                .unwrap();
1051            assert_eq!(
1052                resp.status(),
1053                reqwest::StatusCode::SERVICE_UNAVAILABLE,
1054                "pinned-high governor must shed with 503"
1055            );
1056
1057            // The shed POST never reached the queue.
1058            let records = receiver.recv(10).await.unwrap().records;
1059            assert!(records.is_empty(), "shed request must not be queued");
1060            receiver.close().await.unwrap();
1061        }
1062    }
1063
1064    #[test]
1065    fn config_serde_roundtrip() {
1066        let config = HttpTransportConfig {
1067            endpoint: Some("http://example.com/ingest".into()),
1068            listen: Some("0.0.0.0:8080".into()),
1069            recv_path: "/custom".into(),
1070            recv_buffer_size: 5000,
1071            recv_timeout_ms: 250,
1072            ..Default::default()
1073        };
1074
1075        let json = serde_json::to_string(&config).unwrap();
1076        let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();
1077
1078        assert_eq!(parsed.endpoint, config.endpoint);
1079        assert_eq!(parsed.listen, config.listen);
1080        assert_eq!(parsed.recv_path, config.recv_path);
1081        assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
1082        assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
1083    }
1084}