Skip to main content

hyperi_rustlib/transport/
http.rs

// Project:   hyperi-rustlib
// File:      src/transport/http.rs
// Purpose:   HTTP/HTTPS transport (send via POST, receive via embedded server)
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! # HTTP Transport
//!
//! HTTP/HTTPS transport for webhook delivery and REST ingest.
//!
//! ## Send
//!
//! POSTs payload bytes to `{endpoint}/{key}` using reqwest.
//!
//! ## Receive (requires `http-server` feature)
//!
//! Starts an embedded axum HTTP server that accepts POST requests on a
//! configurable path. Incoming payloads are queued into a bounded
//! `tokio::sync::mpsc` channel. `recv()` drains from this channel.
//!
//! ## Example
//!
//! ```rust,ignore
//! use hyperi_rustlib::transport::http::{HttpTransport, HttpTransportConfig};
//!
//! // Send-only
//! let config = HttpTransportConfig {
//!     endpoint: Some("http://loader:8080/ingest".into()),
//!     ..Default::default()
//! };
//! let transport = HttpTransport::new(&config).await?;
//! transport.send("events", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}")).await;
//! ```

37use super::error::{TransportError, TransportResult};
38#[cfg(feature = "http-server")]
39use super::traits::RecvBatch;
40use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
41#[cfg(feature = "http-server")]
42use super::types::Message;
43#[cfg(feature = "http-server")]
44use super::types::PayloadFormat;
45use super::types::SendResult;
46use super::work_batch::WorkBatch;
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49#[cfg(feature = "http-server")]
50use std::sync::atomic::AtomicU64;
51use std::sync::atomic::{AtomicBool, Ordering};
52
/// Commit token for HTTP transport.
///
/// HTTP is fire-and-forget from the receiver's perspective, so commit
/// is a no-op. The token provides sequence tracking and optional
/// client address for observability.
///
/// Tokens are comparable and hashable so callers can de-duplicate them or
/// use them as map/set keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HttpToken {
    /// Local sequence number (monotonically increasing per transport instance).
    pub seq: u64,

    /// Source client address (if available from the HTTP request).
    pub source_addr: Option<String>,
}
66
67impl HttpToken {
68    /// Create a new token with sequence number.
69    #[must_use]
70    pub fn new(seq: u64) -> Self {
71        Self {
72            seq,
73            source_addr: None,
74        }
75    }
76
77    /// Create a new token with sequence number and source address.
78    #[must_use]
79    pub fn with_source(seq: u64, addr: String) -> Self {
80        Self {
81            seq,
82            source_addr: Some(addr),
83        }
84    }
85}
86
87impl CommitToken for HttpToken {}
88
89impl std::fmt::Display for HttpToken {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match &self.source_addr {
92            Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
93            None => write!(f, "http:{}", self.seq),
94        }
95    }
96}
97
/// Serde default for `HttpTransportConfig::recv_path`.
fn default_recv_path() -> String {
    String::from("/ingest")
}
101
/// Serde default for `HttpTransportConfig::recv_buffer_size` (channel capacity).
fn default_buffer_size() -> usize {
    const DEFAULT_CAPACITY: usize = 10_000;
    DEFAULT_CAPACITY
}
105
/// Serde default for `HttpTransportConfig::recv_timeout_ms`.
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_MS: u64 = 100;
    DEFAULT_MS
}
109
/// Serde default for `HttpTransportConfig::connect_timeout_ms` (5 seconds).
fn default_connect_timeout_ms() -> u64 {
    const DEFAULT_MS: u64 = 5_000;
    DEFAULT_MS
}
113
/// Serde default for `HttpTransportConfig::send_timeout_ms` (30 seconds).
fn default_send_timeout_ms() -> u64 {
    const DEFAULT_MS: u64 = 30_000;
    DEFAULT_MS
}
117
/// Serde default for `HttpTransportConfig::max_body_bytes` (16 MiB).
fn default_max_body_bytes() -> usize {
    const MIB: usize = 1024 * 1024;
    16 * MIB
}
121
122/// Configuration for HTTP transport.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HttpTransportConfig {
125    /// Endpoint URL for sending (e.g., "http://loader:8080/ingest"). None = send disabled.
126    #[serde(default)]
127    pub endpoint: Option<String>,
128
129    /// Listen address for receiving (e.g., "0.0.0.0:8080"). None = receive disabled.
130    /// Requires the `http-server` feature.
131    #[serde(default)]
132    pub listen: Option<String>,
133
134    /// Path to accept POSTs on for receive mode. Default: "/ingest".
135    #[serde(default = "default_recv_path")]
136    pub recv_path: String,
137
138    /// Receive buffer size (bounded channel capacity). Default: 10000.
139    #[serde(default = "default_buffer_size")]
140    pub recv_buffer_size: usize,
141
142    /// Receive timeout in milliseconds. Default: 100.
143    #[serde(default = "default_recv_timeout_ms")]
144    pub recv_timeout_ms: u64,
145
146    /// Inbound message filters (applied on recv before caller sees messages).
147    #[serde(default)]
148    pub filters_in: Vec<super::filter::FilterRule>,
149
150    /// Outbound message filters (applied on send before transport dispatches).
151    #[serde(default)]
152    pub filters_out: Vec<super::filter::FilterRule>,
153
154    /// Connect timeout (ms) for the send-side HTTP client. Default 5000.
155    #[serde(default = "default_connect_timeout_ms")]
156    pub connect_timeout_ms: u64,
157
158    /// Total request timeout (ms) per send. Default 30000. Prevents a hung
159    /// send from consuming worker capacity indefinitely.
160    #[serde(default = "default_send_timeout_ms")]
161    pub send_timeout_ms: u64,
162
163    /// Maximum accepted request body size in bytes (receive side). Default
164    /// 16 MiB. Oversized POSTs are rejected with 413 before buffering.
165    #[serde(default = "default_max_body_bytes")]
166    pub max_body_bytes: usize,
167
168    /// Private-CA PEM path for the send-side HTTPS client (needs the `tls`
169    /// feature). Trusts this CA *in addition to* the OS native roots. None =
170    /// native roots only (reqwest default). Maps to `TlsTrust { native_roots:
171    /// true, extra_roots: [ca] }`.
172    #[serde(default)]
173    pub tls_ca_path: Option<String>,
174}
175
176impl Default for HttpTransportConfig {
177    fn default() -> Self {
178        Self {
179            endpoint: None,
180            listen: None,
181            recv_path: default_recv_path(),
182            recv_buffer_size: default_buffer_size(),
183            recv_timeout_ms: default_recv_timeout_ms(),
184            filters_in: Vec::new(),
185            filters_out: Vec::new(),
186            connect_timeout_ms: default_connect_timeout_ms(),
187            send_timeout_ms: default_send_timeout_ms(),
188            max_body_bytes: default_max_body_bytes(),
189            tls_ca_path: None,
190        }
191    }
192}
193
194impl HttpTransportConfig {
195    /// Load from the config cascade under the `transport.http` key.
196    #[must_use]
197    pub fn from_cascade() -> Self {
198        <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
199    }
200
201    /// Create a send-only config pointing at the given endpoint URL.
202    #[must_use]
203    pub fn sender(endpoint: &str) -> Self {
204        Self {
205            endpoint: Some(endpoint.to_string()),
206            ..Default::default()
207        }
208    }
209
210    /// Create a receive-only config listening on the given address.
211    #[must_use]
212    pub fn receiver(listen: &str) -> Self {
213        Self {
214            listen: Some(listen.to_string()),
215            ..Default::default()
216        }
217    }
218}
219
220/// HTTP/HTTPS transport.
221///
222/// Supports send (POST to endpoint) and receive (embedded axum server).
223/// The receive side requires the `http-server` feature for axum.
224pub struct HttpTransport {
225    /// reqwest client for sending (always available when transport-http is enabled).
226    client: reqwest::Client,
227
228    /// Base URL for sending (None = send disabled).
229    endpoint: Option<String>,
230
231    /// Receiver channel populated by the embedded HTTP server.
232    /// Only available when `http-server` feature is enabled AND `listen` is configured.
233    #[cfg(feature = "http-server")]
234    receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
235
236    /// Shutdown signal for the server task.
237    ///
238    /// Behind a `parking_lot::Mutex` for interior mutability: `close(&self)`
239    /// fires it to stop the embedded server promptly (graceful shutdown),
240    /// rather than waiting for `Drop`. `take()` makes both paths idempotent.
241    #[cfg(feature = "http-server")]
242    shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
243
244    /// Server background task handle.
245    #[cfg(feature = "http-server")]
246    _server_handle: Option<tokio::task::JoinHandle<()>>,
247
248    /// Whether the transport is closed.
249    closed: Arc<AtomicBool>,
250
251    /// Receive timeout in milliseconds (used by receive side).
252    #[cfg(feature = "http-server")]
253    recv_timeout_ms: u64,
254
255    /// Transport-level message filter engine.
256    filter_engine: super::filter::TransportFilterEngine,
257}
258
259impl HttpTransport {
260    /// Create a new HTTP transport.
261    ///
262    /// - Set `config.endpoint` to enable sending (POST to URL).
263    /// - Set `config.listen` to enable receiving (embedded HTTP server, requires `http-server` feature).
264    ///
265    /// # Errors
266    ///
267    /// Returns error if the listen address is invalid or the server fails to bind.
268    pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
269        // Default path: no pressure governor -> byte-identical to before.
270        Self::new_inner(
271            config,
272            #[cfg(feature = "governor")]
273            None,
274        )
275        .await
276    }
277
278    /// Create an HTTP transport bound to a pressure governor (G3, `governor`
279    /// feature).
280    ///
281    /// Identical to [`new`](Self::new) except the embedded receive server
282    /// consults `pressure` BEFORE enqueuing each request: while
283    /// [`UnifiedPressure::should_hold`](crate::governor::UnifiedPressure::should_hold)
284    /// holds, the handler returns 503 (SERVICE_UNAVAILABLE) -- the same status
285    /// the existing channel-full backpressure path uses. Passing `None` is
286    /// exactly equivalent to [`new`](Self::new).
287    ///
288    /// # Errors
289    ///
290    /// Same as [`new`](Self::new).
291    #[cfg(feature = "governor")]
292    pub async fn with_pressure(
293        config: &HttpTransportConfig,
294        pressure: Option<Arc<crate::governor::UnifiedPressure>>,
295    ) -> TransportResult<Self> {
296        Self::new_inner(config, pressure).await
297    }
298
299    async fn new_inner(
300        config: &HttpTransportConfig,
301        #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
302    ) -> TransportResult<Self> {
303        // `mut` is only used on the TLS path; harmless without the feature.
304        #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
305        let mut client_builder = reqwest::Client::builder()
306            .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
307            .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
308
309        // Private-CA trust via the unified TLS module (augments native roots).
310        #[cfg(feature = "tls")]
311        if let Some(ref ca) = config.tls_ca_path {
312            let trust = crate::tls::TlsTrust {
313                native_roots: true,
314                webpki_roots: false,
315                extra_roots: vec![ca.into()],
316                extra_intermediates: Vec::new(),
317                exclusive: false,
318            };
319            let tls_cfg =
320                crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
321                    .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
322            client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
323        }
324        #[cfg(not(feature = "tls"))]
325        if config.tls_ca_path.is_some() {
326            tracing::warn!(
327                "http transport tls_ca_path is set but the `tls` feature is disabled -- \
328                 ignoring (using reqwest default roots)"
329            );
330        }
331
332        let client = client_builder
333            .build()
334            .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
335
336        #[cfg(feature = "http-server")]
337        let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
338            let addr: std::net::SocketAddr = listen
339                .parse()
340                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
341
342            let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
343            let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
344
345            let sequence = Arc::new(AtomicU64::new(0));
346            let recv_path = config.recv_path.clone();
347
348            let app = build_receiver_router(
349                tx,
350                sequence,
351                &recv_path,
352                config.max_body_bytes,
353                #[cfg(feature = "governor")]
354                pressure.clone(),
355            );
356
357            let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
358                TransportError::Connection(format!("failed to bind to {addr}: {e}"))
359            })?;
360
361            let handle = tokio::spawn(async move {
362                axum::serve(
363                    listener,
364                    app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
365                )
366                .with_graceful_shutdown(async {
367                    sd_rx.await.ok();
368                })
369                .await
370                .ok();
371            });
372
373            (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
374        } else {
375            (None, None, None)
376        };
377
378        // When the governor feature is on but http-server is off there is no
379        // receive server to attach the pressure governor to; consume it so the
380        // signature stays uniform without an unused-variable warning.
381        #[cfg(all(feature = "governor", not(feature = "http-server")))]
382        let _ = pressure;
383
384        #[cfg(feature = "logger")]
385        tracing::info!(
386            endpoint = ?config.endpoint,
387            listen = ?config.listen,
388            "HTTP transport opened"
389        );
390
391        // Fail loud on bad filter config -- silently disabling filters
392        // turns a misconfigured `drop` / `dlq` rule into a permanent pass.
393        let filter_engine = super::filter::TransportFilterEngine::new(
394            &config.filters_in,
395            &config.filters_out,
396            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
397        )?;
398
399        let closed = Arc::new(AtomicBool::new(false));
400
401        #[cfg(feature = "health")]
402        {
403            let h = Arc::clone(&closed);
404            crate::health::HealthRegistry::register("transport:http", move || {
405                if h.load(Ordering::Relaxed) {
406                    crate::health::HealthStatus::Unhealthy
407                } else {
408                    crate::health::HealthStatus::Healthy
409                }
410            });
411        }
412
413        Ok(Self {
414            client,
415            endpoint: config.endpoint.clone(),
416            #[cfg(feature = "http-server")]
417            receiver,
418            #[cfg(feature = "http-server")]
419            shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
420            #[cfg(feature = "http-server")]
421            _server_handle: server_handle,
422            closed,
423            #[cfg(feature = "http-server")]
424            recv_timeout_ms: config.recv_timeout_ms,
425            filter_engine,
426        })
427    }
428}
429
/// Assemble the axum router used by the embedded receive server.
///
/// The router has a single POST route at `recv_path` served by
/// `ingest_handler`, a body-size limit of `max_body_bytes`, and a
/// `ReceiverState` holding the channel sender and sequence counter (plus the
/// optional pressure governor when the `governor` feature is on).
#[cfg(feature = "http-server")]
fn build_receiver_router(
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    sequence: Arc<AtomicU64>,
    recv_path: &str,
    max_body_bytes: usize,
    #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
) -> axum::Router {
    // Oversized bodies get a 413 before the handler ever buffers them.
    let body_limit = axum::extract::DefaultBodyLimit::max(max_body_bytes);
    let ingest_route = axum::routing::post(ingest_handler);

    axum::Router::new()
        .route(recv_path, ingest_route)
        .layer(body_limit)
        .with_state(ReceiverState {
            sender,
            sequence,
            #[cfg(feature = "governor")]
            pressure,
        })
}
454
/// State cloned into every invocation of the receive handler.
#[cfg(feature = "http-server")]
#[derive(Clone)]
struct ReceiverState {
    /// Producer end of the bounded channel drained by `recv()`.
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    /// Counter that hands out the per-message sequence numbers.
    sequence: Arc<AtomicU64>,
    /// Pressure governor (G3, `governor` feature), if one was supplied.
    /// With `None` the handler never consults it. With `Some`, the handler
    /// answers 503 while [`UnifiedPressure::should_hold`] holds. This is an
    /// additional shedding path alongside the channel-full 503, not a
    /// replacement for it.
    #[cfg(feature = "governor")]
    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
}
469
470/// POST handler that accepts raw bytes and queues them into the mpsc channel.
471#[cfg(feature = "http-server")]
472async fn ingest_handler(
473    axum::extract::State(state): axum::extract::State<ReceiverState>,
474    axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
475    headers: axum::http::HeaderMap,
476    body: axum::body::Bytes,
477) -> axum::response::Response {
478    use axum::response::IntoResponse as _;
479
480    if body.is_empty() {
481        return axum::http::StatusCode::BAD_REQUEST.into_response();
482    }
483
484    // G3 pressure-driven shedding (governor feature, opt-in). BEFORE enqueuing,
485    // if a governor is wired and it says hold, shed the request with 503 --
486    // consistent with the existing channel-full 503 below (NOT 429). Default
487    // `None` -> this is skipped and behaviour is byte-identical.
488    #[cfg(feature = "governor")]
489    if let Some(pressure) = &state.pressure
490        && pressure.should_hold()
491    {
492        #[cfg(feature = "metrics")]
493        metrics::counter!("dfe_transport_backpressured_total", "transport" => "http", "reason" => "pressure")
494            .increment(1);
495        return shed_503();
496    }
497
498    // Extract W3C traceparent from incoming HTTP headers for distributed tracing
499    #[cfg(feature = "transport-trace")]
500    if let Some(tp) = headers
501        .get(super::propagation::TRACEPARENT_HEADER)
502        .and_then(|v| v.to_str().ok())
503        && super::propagation::is_valid_traceparent(tp)
504    {
505        tracing::Span::current().record("traceparent", tp);
506    }
507
508    // Suppress unused variable warning when otel feature is disabled
509    #[cfg(not(feature = "otel"))]
510    let _ = &headers;
511
512    let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
513    let format = PayloadFormat::detect(&body);
514    let timestamp_ms = chrono::Utc::now().timestamp_millis();
515
516    // `body` is `axum::body::Bytes` (= `bytes::Bytes`) -- move it directly,
517    // no copy needed.
518    let msg = Message {
519        key: None,
520        payload: body,
521        token: HttpToken::with_source(seq, addr.to_string()),
522        timestamp_ms: Some(timestamp_ms),
523        format,
524    };
525
526    match state.sender.try_send(msg) {
527        Ok(()) => {
528            #[cfg(feature = "metrics")]
529            metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
530            axum::http::StatusCode::OK.into_response()
531        }
532        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
533            #[cfg(feature = "metrics")]
534            metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
535                .increment(1);
536            shed_503()
537        }
538        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
539            #[cfg(feature = "metrics")]
540            metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
541            axum::http::StatusCode::GONE.into_response()
542        }
543    }
544}
545
/// Build the load-shedding response: `503 Service Unavailable` carrying
/// `Retry-After: 1`, so a well-behaved sender pauses briefly instead of
/// retrying immediately against a receiver that is already holding.
#[cfg(feature = "http-server")]
fn shed_503() -> axum::response::Response {
    use axum::response::IntoResponse as _;

    let retry_hint = [(axum::http::header::RETRY_AFTER, "1")];
    (axum::http::StatusCode::SERVICE_UNAVAILABLE, retry_hint).into_response()
}
557
558impl TransportBase for HttpTransport {
559    async fn close(&self) -> TransportResult<()> {
560        self.closed.store(true, Ordering::Relaxed);
561        // Stop the embedded server now (graceful shutdown) rather than on Drop.
562        // take() => idempotent: a later close()/drop() is a no-op.
563        #[cfg(feature = "http-server")]
564        if let Some(tx) = self.shutdown_tx.lock().take() {
565            let _ = tx.send(());
566        }
567        Ok(())
568    }
569
570    fn is_healthy(&self) -> bool {
571        !self.closed.load(Ordering::Relaxed)
572    }
573
574    fn name(&self) -> &'static str {
575        "http"
576    }
577}
578
579impl TransportSender for HttpTransport {
580    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
581        if self.closed.load(Ordering::Relaxed) {
582            return SendResult::Fatal(TransportError::Closed);
583        }
584
585        // Outbound filter check
586        if self.filter_engine.has_outbound_filters() {
587            match self.filter_engine.apply_outbound(&payload) {
588                super::filter::FilterDisposition::Pass => {}
589                super::filter::FilterDisposition::Drop => return SendResult::Ok,
590                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
591            }
592        }
593
594        let Some(base_url) = &self.endpoint else {
595            return SendResult::Fatal(TransportError::Config(
596                "no endpoint configured for sending".into(),
597            ));
598        };
599
600        // Build URL: {base_url}/{key} if key is non-empty, otherwise just {base_url}
601        let url = if key.is_empty() {
602            base_url.clone()
603        } else {
604            let base = base_url.trim_end_matches('/');
605            let suffix = key.trim_start_matches('/');
606            format!("{base}/{suffix}")
607        };
608
609        #[cfg(feature = "metrics")]
610        let start = std::time::Instant::now();
611
612        // Build request with optional W3C traceparent header for distributed tracing
613        let request_builder = self
614            .client
615            .post(&url)
616            .header("content-type", "application/octet-stream");
617
618        #[cfg(feature = "transport-trace")]
619        let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
620            request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
621        } else {
622            request_builder
623        };
624
625        #[cfg(feature = "logger")]
626        let payload_len = payload.len();
627        let result = match request_builder.body(payload).send().await {
628            Ok(resp) if resp.status().is_success() => {
629                #[cfg(feature = "logger")]
630                tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
631
632                #[cfg(feature = "metrics")]
633                metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
634                SendResult::Ok
635            }
636            Ok(resp)
637                if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
638                    || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
639            {
640                #[cfg(feature = "logger")]
641                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
642
643                #[cfg(feature = "metrics")]
644                metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
645                    .increment(1);
646                SendResult::Backpressured
647            }
648            Ok(resp) => {
649                #[cfg(feature = "logger")]
650                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
651
652                #[cfg(feature = "metrics")]
653                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
654                    .increment(1);
655                SendResult::Fatal(TransportError::Send(format!(
656                    "HTTP {} from {}",
657                    resp.status(),
658                    url
659                )))
660            }
661            Err(e) => {
662                #[cfg(feature = "logger")]
663                tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
664
665                #[cfg(feature = "metrics")]
666                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
667                    .increment(1);
668                SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
669            }
670        };
671
672        #[cfg(feature = "metrics")]
673        metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
674            .record(start.elapsed().as_secs_f64());
675
676        result
677    }
678}
679
680impl TransportReceiver for HttpTransport {
681    type Token = HttpToken;
682
683    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
684        if self.closed.load(Ordering::Relaxed) {
685            return Err(TransportError::Closed);
686        }
687
688        #[cfg(feature = "http-server")]
689        {
690            let Some(receiver) = &self.receiver else {
691                return Err(TransportError::Config(
692                    "no listen address configured for receiving".into(),
693                ));
694            };
695
696            let mut rx = receiver.lock().await;
697            let mut messages = Vec::with_capacity(max.min(100));
698
699            for _ in 0..max {
700                let result = if self.recv_timeout_ms == 0 {
701                    match rx.try_recv() {
702                        Ok(msg) => Some(msg),
703                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
704                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
705                            return Err(TransportError::Closed);
706                        }
707                    }
708                } else if messages.is_empty() {
709                    // First message: wait with timeout
710                    match tokio::time::timeout(
711                        std::time::Duration::from_millis(self.recv_timeout_ms),
712                        rx.recv(),
713                    )
714                    .await
715                    {
716                        Ok(Some(msg)) => Some(msg),
717                        Ok(None) => return Err(TransportError::Closed),
718                        Err(_) => break, // Timeout
719                    }
720                } else {
721                    // Subsequent: non-blocking drain
722                    match rx.try_recv() {
723                        Ok(msg) => Some(msg),
724                        Err(_) => break,
725                    }
726                };
727
728                if let Some(msg) = result {
729                    messages.push(msg);
730                }
731            }
732
733            // Apply inbound filters via the shared partition helper; DLQ
734            // entries are returned in the RecvBatch for the caller to route.
735            let batch = self.filter_engine.partition_batch(
736                messages,
737                |m| m.payload.as_ref(),
738                |m| m.key.clone(),
739                |m| m.token.clone(),
740            );
741            let messages = batch.messages;
742            let dlq_entries = batch.dlq_entries;
743            let filtered_tokens = batch.filtered_tokens;
744
745            #[cfg(feature = "logger")]
746            if !messages.is_empty() {
747                tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
748            }
749
750            // New default (metrics audit): inbound throughput (was only emitted
751            // by file/pipe/redis). Counts post-filter delivered messages.
752            #[cfg(feature = "metrics")]
753            if !messages.is_empty() {
754                metrics::counter!("dfe_transport_received_total", "transport" => "http")
755                    .increment(messages.len() as u64);
756            }
757
758            Ok(RecvBatch {
759                messages,
760                dlq_entries,
761                filtered_tokens,
762            }
763            .into())
764        }
765
766        #[cfg(not(feature = "http-server"))]
767        {
768            let _ = max;
769            Err(TransportError::Config(
770                "HTTP receive requires the 'http-server' feature".into(),
771            ))
772        }
773    }
774
775    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
776        // HTTP is fire-and-forget -- commit is a no-op.
777        Ok(())
778    }
779}
780
781impl Drop for HttpTransport {
782    fn drop(&mut self) {
783        #[cfg(feature = "http-server")]
784        if let Some(tx) = self.shutdown_tx.lock().take() {
785            let _ = tx.send(());
786        }
787    }
788}
789
790impl super::traits::FromCascade for HttpTransportConfig {}
791
#[cfg(test)]
mod tests {
    use super::*;

    /// Asks the OS for an unused loopback port by binding to port 0, then
    /// releases the listener so the transport under test can bind the address.
    #[cfg(feature = "http-server")]
    async fn free_local_addr() -> std::net::SocketAddr {
        let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = probe.local_addr().unwrap();
        drop(probe);
        addr
    }

    /// URL of the `/ingest` route on the loopback port taken from `addr`.
    #[cfg(feature = "http-server")]
    fn ingest_url(addr: std::net::SocketAddr) -> String {
        format!("http://127.0.0.1:{}/ingest", addr.port())
    }

    /// Short pause that gives a freshly created receiver a moment to start.
    #[cfg(feature = "http-server")]
    async fn startup_grace() {
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }

    /// A token without a source renders as `http:<seq>`.
    #[test]
    fn http_token_display() {
        let rendered = HttpToken::new(42).to_string();
        assert_eq!(rendered, "http:42");
    }

    /// A token with a source renders as `http:<source>:<seq>`.
    #[test]
    fn http_token_display_with_source() {
        let token = HttpToken::with_source(7, String::from("192.168.1.1:54321"));
        assert_eq!(token.to_string(), "http:192.168.1.1:54321:7");
    }

    /// The default config has no endpoint, no listener, and the stock
    /// receive settings.
    #[test]
    fn config_defaults() {
        let defaults = HttpTransportConfig::default();

        assert!(defaults.endpoint.is_none());
        assert!(defaults.listen.is_none());
        assert_eq!(defaults.recv_path, "/ingest");
        assert_eq!(defaults.recv_buffer_size, 10_000);
        assert_eq!(defaults.recv_timeout_ms, 100);
    }

    /// `sender` sets only the endpoint.
    #[test]
    fn config_sender_helper() {
        let url = "http://localhost:8080/ingest";
        let config = HttpTransportConfig::sender(url);

        assert_eq!(config.endpoint.as_deref(), Some(url));
        assert!(config.listen.is_none());
    }

    /// `receiver` sets only the listen address.
    #[test]
    fn config_receiver_helper() {
        let bind = "0.0.0.0:8080";
        let config = HttpTransportConfig::receiver(bind);

        assert!(config.endpoint.is_none());
        assert_eq!(config.listen.as_deref(), Some(bind));
    }

    /// A transport built from the default config constructs fine, but sending
    /// fails because no endpoint is configured.
    #[tokio::test]
    async fn send_only_transport() {
        let cfg = HttpTransportConfig::default();
        let transport = HttpTransport::new(&cfg).await.unwrap();

        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "http");

        // No endpoint configured, so the send must be reported as fatal.
        let outcome = transport
            .send("test", bytes::Bytes::from_static(b"payload"))
            .await;
        assert!(outcome.is_fatal());

        // Committing nothing always succeeds.
        transport.commit(&[]).await.unwrap();
    }

    /// After `close`, the transport is unhealthy and sends are fatal.
    #[tokio::test]
    async fn close_prevents_send() {
        let cfg = HttpTransportConfig::sender("http://localhost:19999/test");
        let transport = HttpTransport::new(&cfg).await.unwrap();

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let outcome = transport
            .send("test", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(outcome.is_fatal());
    }

    /// After `close`, `recv` returns an error.
    #[tokio::test]
    async fn close_prevents_recv() {
        let cfg = HttpTransportConfig::default();
        let transport = HttpTransport::new(&cfg).await.unwrap();

        transport.close().await.unwrap();
        assert!(transport.recv(1).await.is_err());
    }

    /// Full send + receive round-trip test.
    /// Requires both `transport-http` and `http-server` features.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn send_and_receive_roundtrip() {
        // Receiver side: listen on a port the OS just handed out.
        let addr = free_local_addr().await;
        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_path: "/ingest".to_string(),
            recv_buffer_size: 100,
            recv_timeout_ms: 1000,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        startup_grace().await;

        // Sender side: a separate transport pointed at the receiver.
        let send_config = HttpTransportConfig::sender(&ingest_url(addr));
        let sender = HttpTransport::new(&send_config).await.unwrap();

        let result = sender
            .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
            .await;
        assert!(result.is_ok(), "send failed: {result:?}");

        // Exactly the one payload comes back out of the receiver.
        let batch = receiver.recv(10).await.unwrap();
        assert_eq!(batch.records.len(), 1);
        assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
        // The source address rides on the batch commit token, not the record.
        assert!(batch.commit_tokens[0].source_addr.is_some());

        sender.close().await.unwrap();
        receiver.close().await.unwrap();
    }

    /// Test that the receiver rejects empty bodies.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_empty_body() {
        let addr = free_local_addr().await;
        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        startup_grace().await;

        // POST with a zero-length body.
        let client = reqwest::Client::new();
        let url = ingest_url(addr);
        let resp = client
            .post(url)
            .body(Vec::<u8>::new())
            .send()
            .await
            .unwrap();
        assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);

        // Nothing was queued, so recv times out with an empty batch.
        let batch = receiver.recv(10).await.unwrap();
        assert!(batch.records.is_empty());

        receiver.close().await.unwrap();
    }

    /// Oversized bodies are rejected with 413 before the handler buffers them.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_oversized_body() {
        let addr = free_local_addr().await;
        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            max_body_bytes: 1024, // 1 KiB cap
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        startup_grace().await;

        // 8 KiB body against the 1 KiB cap.
        let oversized = vec![b'x'; 8 * 1024];
        let client = reqwest::Client::new();
        let url = ingest_url(addr);
        let resp = client.post(url).body(oversized).send().await.unwrap();
        assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);

        // The rejected POST must not show up in the queue.
        let batch = receiver.recv(10).await.unwrap();
        assert!(batch.records.is_empty(), "oversized body must not be queued");

        receiver.close().await.unwrap();
    }

    /// Test recv without listen returns config error.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn recv_without_listen_returns_error() {
        let cfg = HttpTransportConfig::sender("http://localhost:9999");
        let transport = HttpTransport::new(&cfg).await.unwrap();

        assert!(transport.recv(10).await.is_err());
    }

    /// G3: with a pressure governor pinned HIGH, the ingest handler sheds with
    /// 503 (SERVICE_UNAVAILABLE) -- the same status as the channel-full path.
    /// With `None` (the default `new`), POST is accepted (200) as before.
    #[cfg(all(feature = "http-server", feature = "governor"))]
    #[tokio::test]
    async fn pressure_high_sheds_with_503_and_none_is_normal() {
        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        // Phase 1 -- no governor attached: the POST is accepted with 200.
        {
            let addr = free_local_addr().await;
            let cfg = HttpTransportConfig {
                listen: Some(addr.to_string()),
                recv_timeout_ms: 200,
                ..Default::default()
            };
            let receiver = HttpTransport::new(&cfg).await.unwrap();
            startup_grace().await;

            let client = reqwest::Client::new();
            let url = ingest_url(addr);
            let resp = client
                .post(url)
                .body(b"{\"msg\":\"ok\"}".to_vec())
                .send()
                .await
                .unwrap();
            assert_eq!(
                resp.status(),
                reqwest::StatusCode::OK,
                "no governor -> accepted"
            );
            receiver.close().await.unwrap();
        }

        // Phase 2 -- governor pinned HIGH (memory source at 95%): 503.
        {
            let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
                limit_bytes: 1000,
                pressure_threshold: 0.80,
                ..Default::default()
            }));
            guard.add_bytes(950); // 95% -> well above pause_above
            let memory_source = MemoryPressureSource::new(Arc::clone(&guard));
            let band = Hysteresis::new(0.80, 0.65).expect("valid band");
            let pressure = Arc::new(UnifiedPressure::new(
                vec![Arc::new(memory_source) as Arc<dyn PressureSource>],
                band,
            ));
            // Sanity: the latch is armed before any request is made.
            assert!(pressure.should_hold(), "pinned-high governor must hold");

            let addr = free_local_addr().await;
            let cfg = HttpTransportConfig {
                listen: Some(addr.to_string()),
                recv_timeout_ms: 200,
                ..Default::default()
            };
            let receiver = HttpTransport::with_pressure(&cfg, Some(Arc::clone(&pressure)))
                .await
                .unwrap();
            startup_grace().await;

            let client = reqwest::Client::new();
            let url = ingest_url(addr);
            let resp = client
                .post(url)
                .body(b"{\"msg\":\"shed\"}".to_vec())
                .send()
                .await
                .unwrap();
            assert_eq!(
                resp.status(),
                reqwest::StatusCode::SERVICE_UNAVAILABLE,
                "pinned-high governor must shed with 503"
            );

            // The 503 carries a Retry-After hint so a well-behaved sender backs
            // off instead of hot-retrying into a holding pod.
            let retry_after = resp
                .headers()
                .get(reqwest::header::RETRY_AFTER)
                .and_then(|value| value.to_str().ok());
            assert_eq!(retry_after, Some("1"), "503 shed must carry Retry-After");

            // The shed POST never reached the queue.
            let batch = receiver.recv(10).await.unwrap();
            assert!(batch.records.is_empty(), "shed request must not be queued");
            receiver.close().await.unwrap();
        }
    }

    /// Serializing a config to JSON and parsing it back preserves the
    /// endpoint, listener and receive settings.
    #[test]
    fn config_serde_roundtrip() {
        let original = HttpTransportConfig {
            endpoint: Some("http://example.com/ingest".into()),
            listen: Some("0.0.0.0:8080".into()),
            recv_path: "/custom".into(),
            recv_buffer_size: 5000,
            recv_timeout_ms: 250,
            ..Default::default()
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: HttpTransportConfig = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.endpoint, original.endpoint);
        assert_eq!(decoded.listen, original.listen);
        assert_eq!(decoded.recv_path, original.recv_path);
        assert_eq!(decoded.recv_buffer_size, original.recv_buffer_size);
        assert_eq!(decoded.recv_timeout_ms, original.recv_timeout_ms);
    }
}