Skip to main content

hyperi_rustlib/transport/
http.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/http.rs
3// Purpose:   HTTP/HTTPS transport (send via POST, receive via embedded server)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # HTTP Transport
10//!
11//! HTTP/HTTPS transport for webhook delivery and REST ingest.
12//!
13//! ## Send
14//!
//! POSTs payload bytes to `{endpoint}/{key}` (or to `{endpoint}` unchanged when the key is empty) using reqwest.
16//!
17//! ## Receive (requires `http-server` feature)
18//!
19//! Starts an embedded axum HTTP server that accepts POST requests on a
20//! configurable path. Incoming payloads are queued into a bounded
21//! `tokio::sync::mpsc` channel. `recv()` drains from this channel.
22//!
23//! ## Example
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::http::{HttpTransport, HttpTransportConfig};
27//!
28//! // Send-only
29//! let config = HttpTransportConfig {
30//!     endpoint: Some("http://loader:8080/ingest".into()),
31//!     ..Default::default()
32//! };
33//! let transport = HttpTransport::new(&config).await?;
34//! transport.send("events", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}")).await;
35//! ```
36
37use super::error::{TransportError, TransportResult};
38#[cfg(feature = "http-server")]
39use super::traits::RecvBatch;
40use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
41#[cfg(feature = "http-server")]
42use super::types::Message;
43#[cfg(feature = "http-server")]
44use super::types::PayloadFormat;
45use super::types::SendResult;
46use super::work_batch::WorkBatch;
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49#[cfg(feature = "http-server")]
50use std::sync::atomic::AtomicU64;
51use std::sync::atomic::{AtomicBool, Ordering};
52
/// Commit token attached to messages received over the HTTP transport.
///
/// There is nothing to acknowledge back to an HTTP sender once the request
/// has been answered, so committing is a no-op. The token exists for
/// observability only: a sequence number and, when known, the address of the
/// client that posted the payload.
#[derive(Debug, Clone)]
pub struct HttpToken {
    /// Local sequence number (monotonically increasing per transport instance).
    pub seq: u64,

    /// Address of the client that sent the request, when it is available.
    pub source_addr: Option<String>,
}

impl HttpToken {
    /// Build a token that carries only a sequence number (no source address).
    #[must_use]
    pub fn new(seq: u64) -> Self {
        let source_addr = None;
        Self { seq, source_addr }
    }

    /// Build a token that records both the sequence number and the sender's
    /// address.
    #[must_use]
    pub fn with_source(seq: u64, addr: String) -> Self {
        let source_addr = Some(addr);
        Self { seq, source_addr }
    }
}
86
// Marks `HttpToken` as usable wherever a `CommitToken` is required. The impl
// is intentionally empty: nothing is overridden here, so whatever the trait
// provides by default applies.
impl CommitToken for HttpToken {}
88
89impl std::fmt::Display for HttpToken {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match &self.source_addr {
92            Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
93            None => write!(f, "http:{}", self.seq),
94        }
95    }
96}
97
/// Serde default for `recv_path`: the receive server accepts POSTs on `/ingest`.
fn default_recv_path() -> String {
    String::from("/ingest")
}
101
/// Serde default for `recv_buffer_size`: capacity of the bounded receive channel.
fn default_buffer_size() -> usize {
    const DEFAULT_RECV_BUFFER: usize = 10_000;
    DEFAULT_RECV_BUFFER
}
105
/// Serde default for `recv_timeout_ms`, in milliseconds.
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_RECV_TIMEOUT_MS: u64 = 100;
    DEFAULT_RECV_TIMEOUT_MS
}
109
/// Serde default for `connect_timeout_ms`, in milliseconds (5 seconds).
fn default_connect_timeout_ms() -> u64 {
    const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 5_000;
    DEFAULT_CONNECT_TIMEOUT_MS
}
113
/// Serde default for `send_timeout_ms`, in milliseconds (30 seconds).
fn default_send_timeout_ms() -> u64 {
    const DEFAULT_SEND_TIMEOUT_MS: u64 = 30_000;
    DEFAULT_SEND_TIMEOUT_MS
}
117
/// Serde default for `max_body_bytes`: 16 MiB.
fn default_max_body_bytes() -> usize {
    const MIB: usize = 1024 * 1024;
    16 * MIB
}
121
/// Configuration for HTTP transport.
///
/// Sending and receiving are enabled independently: set `endpoint` to send,
/// set `listen` to receive. Every field has a serde default, so a partial
/// (or empty) document deserializes to the same values as
/// `HttpTransportConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpTransportConfig {
    /// Endpoint URL for sending (e.g., "http://loader:8080/ingest"). None = send disabled.
    #[serde(default)]
    pub endpoint: Option<String>,

    /// Listen address for receiving (e.g., "0.0.0.0:8080"). None = receive disabled.
    /// Requires the `http-server` feature. The value is parsed as a socket
    /// address when the transport is constructed.
    #[serde(default)]
    pub listen: Option<String>,

    /// Path to accept POSTs on for receive mode. Default: "/ingest".
    #[serde(default = "default_recv_path")]
    pub recv_path: String,

    /// Receive buffer size (bounded channel capacity). Default: 10000.
    /// When the channel is full, incoming requests are answered with 503.
    #[serde(default = "default_buffer_size")]
    pub recv_buffer_size: usize,

    /// Receive timeout in milliseconds. Default: 100.
    /// `0` makes `recv()` fully non-blocking; otherwise only the wait for the
    /// first message of a batch is bounded by this timeout.
    #[serde(default = "default_recv_timeout_ms")]
    pub recv_timeout_ms: u64,

    /// Inbound message filters (applied on recv before caller sees messages).
    #[serde(default)]
    pub filters_in: Vec<super::filter::FilterRule>,

    /// Outbound message filters (applied on send before transport dispatches).
    #[serde(default)]
    pub filters_out: Vec<super::filter::FilterRule>,

    /// Connect timeout (ms) for the send-side HTTP client. Default 5000.
    #[serde(default = "default_connect_timeout_ms")]
    pub connect_timeout_ms: u64,

    /// Total request timeout (ms) per send. Default 30000. Prevents a hung
    /// send from consuming worker capacity indefinitely.
    #[serde(default = "default_send_timeout_ms")]
    pub send_timeout_ms: u64,

    /// Maximum accepted request body size in bytes (receive side). Default
    /// 16 MiB. Oversized POSTs are rejected with 413 before buffering.
    #[serde(default = "default_max_body_bytes")]
    pub max_body_bytes: usize,

    /// Private-CA PEM path for the send-side HTTPS client (needs the `tls`
    /// feature). Trusts this CA *in addition to* the OS native roots. None =
    /// native roots only (reqwest default). Maps to `TlsTrust { native_roots:
    /// true, extra_roots: [ca] }`. Without the `tls` feature the value is
    /// ignored and a warning is logged at construction time.
    #[serde(default)]
    pub tls_ca_path: Option<String>,
}
175
176impl Default for HttpTransportConfig {
177    fn default() -> Self {
178        Self {
179            endpoint: None,
180            listen: None,
181            recv_path: default_recv_path(),
182            recv_buffer_size: default_buffer_size(),
183            recv_timeout_ms: default_recv_timeout_ms(),
184            filters_in: Vec::new(),
185            filters_out: Vec::new(),
186            connect_timeout_ms: default_connect_timeout_ms(),
187            send_timeout_ms: default_send_timeout_ms(),
188            max_body_bytes: default_max_body_bytes(),
189            tls_ca_path: None,
190        }
191    }
192}
193
194impl HttpTransportConfig {
195    /// Load from the config cascade under the `transport.http` key.
196    #[must_use]
197    pub fn from_cascade() -> Self {
198        <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
199    }
200
201    /// Create a send-only config pointing at the given endpoint URL.
202    #[must_use]
203    pub fn sender(endpoint: &str) -> Self {
204        Self {
205            endpoint: Some(endpoint.to_string()),
206            ..Default::default()
207        }
208    }
209
210    /// Create a receive-only config listening on the given address.
211    #[must_use]
212    pub fn receiver(listen: &str) -> Self {
213        Self {
214            listen: Some(listen.to_string()),
215            ..Default::default()
216        }
217    }
218}
219
/// HTTP/HTTPS transport.
///
/// Supports send (POST to endpoint) and receive (embedded axum server).
/// The receive side requires the `http-server` feature for axum.
///
/// Dropping the transport fires the server shutdown signal (if it has not
/// already been fired by `close()`).
pub struct HttpTransport {
    /// reqwest client for sending (always available when transport-http is enabled).
    client: reqwest::Client,

    /// Base URL for sending (None = send disabled).
    endpoint: Option<String>,

    /// Receiver channel populated by the embedded HTTP server.
    /// Only available when `http-server` feature is enabled AND `listen` is configured.
    /// Wrapped in an async mutex because `recv(&self)` needs mutable access
    /// to the channel and holds it across `.await` points.
    #[cfg(feature = "http-server")]
    receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,

    /// Shutdown signal for the server task.
    ///
    /// Behind a `parking_lot::Mutex` for interior mutability: `close(&self)`
    /// fires it to stop the embedded server promptly (graceful shutdown),
    /// rather than waiting for `Drop`. `take()` makes both paths idempotent.
    #[cfg(feature = "http-server")]
    shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,

    /// Server background task handle.
    /// Stored but never awaited or aborted in this file (hence the leading
    /// underscore); the task is stopped through `shutdown_tx` instead.
    #[cfg(feature = "http-server")]
    _server_handle: Option<tokio::task::JoinHandle<()>>,

    /// Whether the transport is closed.
    /// Shared via `Arc` so the health-check closure (feature `health`) can
    /// observe the same flag.
    closed: Arc<AtomicBool>,

    /// Receive timeout in milliseconds (used by receive side).
    #[cfg(feature = "http-server")]
    recv_timeout_ms: u64,

    /// Transport-level message filter engine.
    filter_engine: super::filter::TransportFilterEngine,
}
258
259impl HttpTransport {
260    /// Create a new HTTP transport.
261    ///
262    /// - Set `config.endpoint` to enable sending (POST to URL).
263    /// - Set `config.listen` to enable receiving (embedded HTTP server, requires `http-server` feature).
264    ///
265    /// # Errors
266    ///
267    /// Returns error if the listen address is invalid or the server fails to bind.
268    pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
269        // Default path: no pressure governor -> byte-identical to before.
270        Self::new_inner(
271            config,
272            #[cfg(feature = "governor")]
273            None,
274        )
275        .await
276    }
277
278    /// Create an HTTP transport bound to a pressure governor (G3, `governor`
279    /// feature).
280    ///
281    /// Identical to [`new`](Self::new) except the embedded receive server
282    /// consults `pressure` BEFORE enqueuing each request: while
283    /// [`UnifiedPressure::should_hold`](crate::governor::UnifiedPressure::should_hold)
284    /// holds, the handler returns 503 (SERVICE_UNAVAILABLE) -- the same status
285    /// the existing channel-full backpressure path uses. Passing `None` is
286    /// exactly equivalent to [`new`](Self::new).
287    ///
288    /// # Errors
289    ///
290    /// Same as [`new`](Self::new).
291    #[cfg(feature = "governor")]
292    pub async fn with_pressure(
293        config: &HttpTransportConfig,
294        pressure: Option<Arc<crate::governor::UnifiedPressure>>,
295    ) -> TransportResult<Self> {
296        Self::new_inner(config, pressure).await
297    }
298
299    async fn new_inner(
300        config: &HttpTransportConfig,
301        #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
302    ) -> TransportResult<Self> {
303        // `mut` is only used on the TLS path; harmless without the feature.
304        #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
305        let mut client_builder = reqwest::Client::builder()
306            .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
307            .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
308
309        // Private-CA trust via the unified TLS module (augments native roots).
310        #[cfg(feature = "tls")]
311        if let Some(ref ca) = config.tls_ca_path {
312            let trust = crate::tls::TlsTrust {
313                native_roots: true,
314                webpki_roots: false,
315                extra_roots: vec![ca.into()],
316                extra_intermediates: Vec::new(),
317                exclusive: false,
318            };
319            let tls_cfg =
320                crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
321                    .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
322            client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
323        }
324        #[cfg(not(feature = "tls"))]
325        if config.tls_ca_path.is_some() {
326            tracing::warn!(
327                "http transport tls_ca_path is set but the `tls` feature is disabled -- \
328                 ignoring (using reqwest default roots)"
329            );
330        }
331
332        let client = client_builder
333            .build()
334            .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
335
336        #[cfg(feature = "http-server")]
337        let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
338            let addr: std::net::SocketAddr = listen
339                .parse()
340                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
341
342            let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
343            let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
344
345            let sequence = Arc::new(AtomicU64::new(0));
346            let recv_path = config.recv_path.clone();
347
348            let app = build_receiver_router(
349                tx,
350                sequence,
351                &recv_path,
352                config.max_body_bytes,
353                #[cfg(feature = "governor")]
354                pressure.clone(),
355            );
356
357            let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
358                TransportError::Connection(format!("failed to bind to {addr}: {e}"))
359            })?;
360
361            let handle = tokio::spawn(async move {
362                axum::serve(
363                    listener,
364                    app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
365                )
366                .with_graceful_shutdown(async {
367                    sd_rx.await.ok();
368                })
369                .await
370                .ok();
371            });
372
373            (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
374        } else {
375            (None, None, None)
376        };
377
378        // When the governor feature is on but http-server is off there is no
379        // receive server to attach the pressure governor to; consume it so the
380        // signature stays uniform without an unused-variable warning.
381        #[cfg(all(feature = "governor", not(feature = "http-server")))]
382        let _ = pressure;
383
384        #[cfg(feature = "logger")]
385        tracing::info!(
386            endpoint = ?config.endpoint,
387            listen = ?config.listen,
388            "HTTP transport opened"
389        );
390
391        // Fail loud on bad filter config -- silently disabling filters
392        // turns a misconfigured `drop` / `dlq` rule into a permanent pass.
393        let filter_engine = super::filter::TransportFilterEngine::new(
394            &config.filters_in,
395            &config.filters_out,
396            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
397        )?;
398
399        let closed = Arc::new(AtomicBool::new(false));
400
401        #[cfg(feature = "health")]
402        {
403            let h = Arc::clone(&closed);
404            crate::health::HealthRegistry::register("transport:http", move || {
405                if h.load(Ordering::Relaxed) {
406                    crate::health::HealthStatus::Unhealthy
407                } else {
408                    crate::health::HealthStatus::Healthy
409                }
410            });
411        }
412
413        Ok(Self {
414            client,
415            endpoint: config.endpoint.clone(),
416            #[cfg(feature = "http-server")]
417            receiver,
418            #[cfg(feature = "http-server")]
419            shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
420            #[cfg(feature = "http-server")]
421            _server_handle: server_handle,
422            closed,
423            #[cfg(feature = "http-server")]
424            recv_timeout_ms: config.recv_timeout_ms,
425            filter_engine,
426        })
427    }
428}
429
/// Assemble the axum router used by the embedded receive server.
///
/// The router exposes a single POST route at `recv_path`, served by
/// `ingest_handler`, caps request bodies at `max_body_bytes`, and carries the
/// channel sender, the sequence counter and (with the `governor` feature) the
/// optional pressure governor as shared handler state.
#[cfg(feature = "http-server")]
fn build_receiver_router(
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    sequence: Arc<AtomicU64>,
    recv_path: &str,
    max_body_bytes: usize,
    #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
) -> axum::Router {
    let ingest_route = axum::routing::post(ingest_handler);
    // Oversized bodies are refused with 413 before the handler buffers them.
    let body_limit = axum::extract::DefaultBodyLimit::max(max_body_bytes);

    axum::Router::new()
        .route(recv_path, ingest_route)
        .layer(body_limit)
        .with_state(ReceiverState {
            sender,
            sequence,
            #[cfg(feature = "governor")]
            pressure,
        })
}
454
/// Shared state for the receive handler.
///
/// Cloned by axum for each request; every field is a cheap handle (channel
/// sender or `Arc`), so clones share the same channel and counter.
#[cfg(feature = "http-server")]
#[derive(Clone)]
struct ReceiverState {
    /// Producer side of the bounded channel that `recv()` drains.
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    /// Counter used to assign `HttpToken::seq` to each accepted request.
    sequence: Arc<AtomicU64>,
    /// Optional pressure governor (G3, `governor` feature). `None` by default
    /// -> the handler never consults it and behaviour is byte-identical. When
    /// `Some`, the handler rejects with 503 while [`UnifiedPressure::should_hold`]
    /// holds -- pressure-driven shedding ON TOP of the existing channel-full
    /// 503, never replacing it.
    #[cfg(feature = "governor")]
    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
}
469
470/// POST handler that accepts raw bytes and queues them into the mpsc channel.
471#[cfg(feature = "http-server")]
472async fn ingest_handler(
473    axum::extract::State(state): axum::extract::State<ReceiverState>,
474    axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
475    headers: axum::http::HeaderMap,
476    body: axum::body::Bytes,
477) -> axum::response::Response {
478    use axum::response::IntoResponse as _;
479
480    if body.is_empty() {
481        return axum::http::StatusCode::BAD_REQUEST.into_response();
482    }
483
484    // G3 pressure-driven shedding (governor feature, opt-in). BEFORE enqueuing,
485    // if a governor is wired and it says hold, shed the request with 503 --
486    // consistent with the existing channel-full 503 below (NOT 429). Default
487    // `None` -> this is skipped and behaviour is byte-identical.
488    #[cfg(feature = "governor")]
489    if let Some(pressure) = &state.pressure
490        && pressure.should_hold()
491    {
492        #[cfg(feature = "metrics")]
493        metrics::counter!("dfe_transport_backpressured_total", "transport" => "http", "reason" => "pressure")
494            .increment(1);
495        return shed_503();
496    }
497
498    // Extract W3C traceparent from incoming HTTP headers for distributed tracing
499    #[cfg(feature = "transport-trace")]
500    if let Some(tp) = headers
501        .get(super::propagation::TRACEPARENT_HEADER)
502        .and_then(|v| v.to_str().ok())
503        && super::propagation::is_valid_traceparent(tp)
504    {
505        tracing::Span::current().record("traceparent", tp);
506    }
507
508    // Suppress unused variable warning when otel feature is disabled
509    #[cfg(not(feature = "otel"))]
510    let _ = &headers;
511
512    let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
513    let format = PayloadFormat::detect(&body);
514    let timestamp_ms = chrono::Utc::now().timestamp_millis();
515
516    // `body` is `axum::body::Bytes` (= `bytes::Bytes`) -- move it directly,
517    // no copy needed.
518    let msg = Message {
519        key: None,
520        payload: body,
521        token: HttpToken::with_source(seq, addr.to_string()),
522        timestamp_ms: Some(timestamp_ms),
523        format,
524    };
525
526    match state.sender.try_send(msg) {
527        Ok(()) => {
528            #[cfg(feature = "metrics")]
529            metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
530            axum::http::StatusCode::OK.into_response()
531        }
532        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
533            #[cfg(feature = "metrics")]
534            metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
535                .increment(1);
536            shed_503()
537        }
538        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
539            #[cfg(feature = "metrics")]
540            metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
541            axum::http::StatusCode::GONE.into_response()
542        }
543    }
544}
545
/// Build the load-shedding response: `503 Service Unavailable` carrying
/// `Retry-After: 1`, so a well-behaved sender pauses briefly instead of
/// hot-retrying into a pod that is already holding.
#[cfg(feature = "http-server")]
fn shed_503() -> axum::response::Response {
    use axum::response::IntoResponse as _;

    let mut response = axum::http::StatusCode::SERVICE_UNAVAILABLE.into_response();
    response.headers_mut().insert(
        axum::http::header::RETRY_AFTER,
        axum::http::HeaderValue::from_static("1"),
    );
    response
}
557
558impl TransportBase for HttpTransport {
559    async fn close(&self) -> TransportResult<()> {
560        self.closed.store(true, Ordering::Relaxed);
561        // Stop the embedded server now (graceful shutdown) rather than on Drop.
562        // take() => idempotent: a later close()/drop() is a no-op.
563        #[cfg(feature = "http-server")]
564        if let Some(tx) = self.shutdown_tx.lock().take() {
565            let _ = tx.send(());
566        }
567        Ok(())
568    }
569
570    fn is_healthy(&self) -> bool {
571        !self.closed.load(Ordering::Relaxed)
572    }
573
574    fn name(&self) -> &'static str {
575        "http"
576    }
577}
578
579impl TransportSender for HttpTransport {
580    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
581        if self.closed.load(Ordering::Relaxed) {
582            return SendResult::Fatal(TransportError::Closed);
583        }
584
585        // Outbound filter check
586        if self.filter_engine.has_outbound_filters() {
587            match self.filter_engine.apply_outbound(&payload) {
588                super::filter::FilterDisposition::Pass => {}
589                super::filter::FilterDisposition::Drop => return SendResult::Ok,
590                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
591            }
592        }
593
594        let Some(base_url) = &self.endpoint else {
595            return SendResult::Fatal(TransportError::Config(
596                "no endpoint configured for sending".into(),
597            ));
598        };
599
600        // Build URL: {base_url}/{key} if key is non-empty, otherwise just {base_url}
601        let url = if key.is_empty() {
602            base_url.clone()
603        } else {
604            let base = base_url.trim_end_matches('/');
605            let suffix = key.trim_start_matches('/');
606            format!("{base}/{suffix}")
607        };
608
609        #[cfg(feature = "metrics")]
610        let start = std::time::Instant::now();
611
612        // Build request with optional W3C traceparent header for distributed tracing
613        let request_builder = self
614            .client
615            .post(&url)
616            .header("content-type", "application/octet-stream");
617
618        #[cfg(feature = "transport-trace")]
619        let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
620            request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
621        } else {
622            request_builder
623        };
624
625        #[cfg(feature = "logger")]
626        let payload_len = payload.len();
627        let result = match request_builder.body(payload).send().await {
628            Ok(resp) if resp.status().is_success() => {
629                #[cfg(feature = "logger")]
630                tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
631
632                #[cfg(feature = "metrics")]
633                metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
634                SendResult::Ok
635            }
636            Ok(resp)
637                if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
638                    || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
639            {
640                #[cfg(feature = "logger")]
641                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
642
643                #[cfg(feature = "metrics")]
644                metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
645                    .increment(1);
646                SendResult::Backpressured
647            }
648            Ok(resp) => {
649                #[cfg(feature = "logger")]
650                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
651
652                #[cfg(feature = "metrics")]
653                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
654                    .increment(1);
655                SendResult::Fatal(TransportError::Send(format!(
656                    "HTTP {} from {}",
657                    resp.status(),
658                    url
659                )))
660            }
661            Err(e) => {
662                #[cfg(feature = "logger")]
663                tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
664
665                #[cfg(feature = "metrics")]
666                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
667                    .increment(1);
668                SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
669            }
670        };
671
672        #[cfg(feature = "metrics")]
673        metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
674            .record(start.elapsed().as_secs_f64());
675
676        result
677    }
678}
679
680impl TransportReceiver for HttpTransport {
681    type Token = HttpToken;
682
683    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
684        if self.closed.load(Ordering::Relaxed) {
685            return Err(TransportError::Closed);
686        }
687
688        #[cfg(feature = "http-server")]
689        {
690            let Some(receiver) = &self.receiver else {
691                return Err(TransportError::Config(
692                    "no listen address configured for receiving".into(),
693                ));
694            };
695
696            let mut rx = receiver.lock().await;
697            let mut messages = Vec::with_capacity(max.min(100));
698
699            for _ in 0..max {
700                let result = if self.recv_timeout_ms == 0 {
701                    match rx.try_recv() {
702                        Ok(msg) => Some(msg),
703                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
704                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
705                            return Err(TransportError::Closed);
706                        }
707                    }
708                } else if messages.is_empty() {
709                    // First message: wait with timeout
710                    match tokio::time::timeout(
711                        std::time::Duration::from_millis(self.recv_timeout_ms),
712                        rx.recv(),
713                    )
714                    .await
715                    {
716                        Ok(Some(msg)) => Some(msg),
717                        Ok(None) => return Err(TransportError::Closed),
718                        Err(_) => break, // Timeout
719                    }
720                } else {
721                    // Subsequent: non-blocking drain
722                    match rx.try_recv() {
723                        Ok(msg) => Some(msg),
724                        Err(_) => break,
725                    }
726                };
727
728                if let Some(msg) = result {
729                    messages.push(msg);
730                }
731            }
732
733            // Apply inbound filters via the shared partition helper; DLQ
734            // entries are returned in the RecvBatch for the caller to route.
735            let batch = self.filter_engine.partition_batch(
736                messages,
737                |m| m.payload.as_ref(),
738                |m| m.key.clone(),
739                |m| m.token.clone(),
740            );
741            let messages = batch.messages;
742            let dlq_entries = batch.dlq_entries;
743            let filtered_tokens = batch.filtered_tokens;
744
745            #[cfg(feature = "logger")]
746            if !messages.is_empty() {
747                tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
748            }
749
750            Ok(RecvBatch {
751                messages,
752                dlq_entries,
753                filtered_tokens,
754            }
755            .into())
756        }
757
758        #[cfg(not(feature = "http-server"))]
759        {
760            let _ = max;
761            Err(TransportError::Config(
762                "HTTP receive requires the 'http-server' feature".into(),
763            ))
764        }
765    }
766
767    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
768        // HTTP is fire-and-forget -- commit is a no-op.
769        Ok(())
770    }
771}
772
773impl Drop for HttpTransport {
774    fn drop(&mut self) {
775        #[cfg(feature = "http-server")]
776        if let Some(tx) = self.shutdown_tx.lock().take() {
777            let _ = tx.send(());
778        }
779    }
780}
781
// Opts `HttpTransportConfig` into cascade loading. The impl is empty, so the
// `from_cascade_key` used by `HttpTransportConfig::from_cascade` is the one
// the trait itself provides.
impl super::traits::FromCascade for HttpTransportConfig {}
783
784#[cfg(test)]
785mod tests {
786    use super::*;
787
788    #[test]
789    fn http_token_display() {
790        let token = HttpToken::new(42);
791        assert_eq!(format!("{token}"), "http:42");
792    }
793
794    #[test]
795    fn http_token_display_with_source() {
796        let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
797        assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
798    }
799
800    #[test]
801    fn config_defaults() {
802        let config = HttpTransportConfig::default();
803        assert!(config.endpoint.is_none());
804        assert!(config.listen.is_none());
805        assert_eq!(config.recv_path, "/ingest");
806        assert_eq!(config.recv_buffer_size, 10_000);
807        assert_eq!(config.recv_timeout_ms, 100);
808    }
809
810    #[test]
811    fn config_sender_helper() {
812        let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
813        assert_eq!(
814            config.endpoint.as_deref(),
815            Some("http://localhost:8080/ingest")
816        );
817        assert!(config.listen.is_none());
818    }
819
820    #[test]
821    fn config_receiver_helper() {
822        let config = HttpTransportConfig::receiver("0.0.0.0:8080");
823        assert!(config.endpoint.is_none());
824        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
825    }
826
827    #[tokio::test]
828    async fn send_only_transport() {
829        // Send-only config (no endpoint = send disabled, but transport creates fine)
830        let config = HttpTransportConfig::default();
831        let transport = HttpTransport::new(&config).await.unwrap();
832
833        assert!(transport.is_healthy());
834        assert_eq!(transport.name(), "http");
835
836        // Send without endpoint should fail
837        let result = transport
838            .send("test", bytes::Bytes::from_static(b"payload"))
839            .await;
840        assert!(result.is_fatal());
841
842        // Commit is always ok
843        transport.commit(&[]).await.unwrap();
844    }
845
846    #[tokio::test]
847    async fn close_prevents_send() {
848        let config = HttpTransportConfig::sender("http://localhost:19999/test");
849        let transport = HttpTransport::new(&config).await.unwrap();
850
851        transport.close().await.unwrap();
852        assert!(!transport.is_healthy());
853
854        let result = transport
855            .send("test", bytes::Bytes::from_static(b"data"))
856            .await;
857        assert!(result.is_fatal());
858    }
859
860    #[tokio::test]
861    async fn close_prevents_recv() {
862        let config = HttpTransportConfig::default();
863        let transport = HttpTransport::new(&config).await.unwrap();
864
865        transport.close().await.unwrap();
866        let result = transport.recv(1).await;
867        assert!(result.is_err());
868    }
869
870    /// Full send + receive round-trip test.
871    /// Requires both `transport-http` and `http-server` features.
872    #[cfg(feature = "http-server")]
873    #[tokio::test]
874    async fn send_and_receive_roundtrip() {
875        // Start receiver on a random available port
876        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
877        let addr = listener.local_addr().unwrap();
878        drop(listener); // Free the port for the transport to bind
879
880        let recv_config = HttpTransportConfig {
881            listen: Some(addr.to_string()),
882            recv_path: "/ingest".to_string(),
883            recv_buffer_size: 100,
884            recv_timeout_ms: 1000,
885            ..Default::default()
886        };
887        let receiver = HttpTransport::new(&recv_config).await.unwrap();
888
889        // Give the server a moment to start
890        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
891
892        // Send a message via a separate sender transport
893        let send_config =
894            HttpTransportConfig::sender(&format!("http://127.0.0.1:{}/ingest", addr.port()));
895        let sender = HttpTransport::new(&send_config).await.unwrap();
896
897        let result = sender
898            .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
899            .await;
900        assert!(result.is_ok(), "send failed: {result:?}");
901
902        // Receive it
903        let batch = receiver.recv(10).await.unwrap();
904        assert_eq!(batch.records.len(), 1);
905        assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
906        // The source address rides on the batch commit token, not the record.
907        assert!(batch.commit_tokens[0].source_addr.is_some());
908
909        // Cleanup
910        sender.close().await.unwrap();
911        receiver.close().await.unwrap();
912    }
913
914    /// Test that the receiver rejects empty bodies.
915    #[cfg(feature = "http-server")]
916    #[tokio::test]
917    async fn receive_rejects_empty_body() {
918        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
919        let addr = listener.local_addr().unwrap();
920        drop(listener);
921
922        let recv_config = HttpTransportConfig {
923            listen: Some(addr.to_string()),
924            recv_timeout_ms: 200,
925            ..Default::default()
926        };
927        let receiver = HttpTransport::new(&recv_config).await.unwrap();
928        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
929
930        // Send empty body
931        let client = reqwest::Client::new();
932        let resp = client
933            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
934            .body(Vec::<u8>::new())
935            .send()
936            .await
937            .unwrap();
938
939        assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
940
941        // recv should timeout with no messages
942        let records = receiver.recv(10).await.unwrap().records;
943        assert!(records.is_empty());
944
945        receiver.close().await.unwrap();
946    }
947
948    /// Oversized bodies are rejected with 413 before the handler buffers them.
949    #[cfg(feature = "http-server")]
950    #[tokio::test]
951    async fn receive_rejects_oversized_body() {
952        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
953        let addr = listener.local_addr().unwrap();
954        drop(listener);
955
956        let recv_config = HttpTransportConfig {
957            listen: Some(addr.to_string()),
958            recv_timeout_ms: 200,
959            max_body_bytes: 1024, // 1 KiB cap
960            ..Default::default()
961        };
962        let receiver = HttpTransport::new(&recv_config).await.unwrap();
963        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
964
965        // POST 8 KiB -- over the 1 KiB cap.
966        let client = reqwest::Client::new();
967        let resp = client
968            .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
969            .body(vec![b'x'; 8 * 1024])
970            .send()
971            .await
972            .unwrap();
973
974        assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
975
976        // The oversized POST never reached the queue.
977        let records = receiver.recv(10).await.unwrap().records;
978        assert!(records.is_empty(), "oversized body must not be queued");
979
980        receiver.close().await.unwrap();
981    }
982
983    /// Test recv without listen returns config error.
984    #[cfg(feature = "http-server")]
985    #[tokio::test]
986    async fn recv_without_listen_returns_error() {
987        let config = HttpTransportConfig::sender("http://localhost:9999");
988        let transport = HttpTransport::new(&config).await.unwrap();
989
990        let result = transport.recv(10).await;
991        assert!(result.is_err());
992    }
993
994    /// G3: with a pressure governor pinned HIGH, the ingest handler sheds with
995    /// 503 (SERVICE_UNAVAILABLE) -- the same status as the channel-full path.
996    /// With `None` (the default `new`), POST is accepted (200) as before.
997    #[cfg(all(feature = "http-server", feature = "governor"))]
998    #[tokio::test]
999    async fn pressure_high_sheds_with_503_and_none_is_normal() {
1000        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
1001        use crate::memory::{MemoryGuard, MemoryGuardConfig};
1002
1003        // --- None default: POST accepted (200) ---
1004        {
1005            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1006            let addr = listener.local_addr().unwrap();
1007            drop(listener);
1008            let cfg = HttpTransportConfig {
1009                listen: Some(addr.to_string()),
1010                recv_timeout_ms: 200,
1011                ..Default::default()
1012            };
1013            let receiver = HttpTransport::new(&cfg).await.unwrap();
1014            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1015
1016            let client = reqwest::Client::new();
1017            let resp = client
1018                .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1019                .body(b"{\"msg\":\"ok\"}".to_vec())
1020                .send()
1021                .await
1022                .unwrap();
1023            assert_eq!(
1024                resp.status(),
1025                reqwest::StatusCode::OK,
1026                "no governor -> accepted"
1027            );
1028            receiver.close().await.unwrap();
1029        }
1030
1031        // --- Governor pinned HIGH (HARD memory source at 95%): 503 ---
1032        {
1033            let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
1034                limit_bytes: 1000,
1035                pressure_threshold: 0.80,
1036                ..Default::default()
1037            }));
1038            guard.add_bytes(950); // 95% -> well above pause_above
1039            let src = MemoryPressureSource::new(Arc::clone(&guard));
1040            let pressure = Arc::new(UnifiedPressure::new(
1041                vec![Arc::new(src) as Arc<dyn PressureSource>],
1042                Hysteresis::new(0.80, 0.65).expect("valid band"),
1043            ));
1044            // Sanity: the latch is armed.
1045            assert!(pressure.should_hold(), "pinned-high governor must hold");
1046
1047            let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1048            let addr = listener.local_addr().unwrap();
1049            drop(listener);
1050            let cfg = HttpTransportConfig {
1051                listen: Some(addr.to_string()),
1052                recv_timeout_ms: 200,
1053                ..Default::default()
1054            };
1055            let receiver = HttpTransport::with_pressure(&cfg, Some(Arc::clone(&pressure)))
1056                .await
1057                .unwrap();
1058            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1059
1060            let client = reqwest::Client::new();
1061            let resp = client
1062                .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
1063                .body(b"{\"msg\":\"shed\"}".to_vec())
1064                .send()
1065                .await
1066                .unwrap();
1067            assert_eq!(
1068                resp.status(),
1069                reqwest::StatusCode::SERVICE_UNAVAILABLE,
1070                "pinned-high governor must shed with 503"
1071            );
1072            // The 503 carries a Retry-After hint so a well-behaved sender backs
1073            // off instead of hot-retrying into a holding pod.
1074            assert_eq!(
1075                resp.headers()
1076                    .get(reqwest::header::RETRY_AFTER)
1077                    .and_then(|v| v.to_str().ok()),
1078                Some("1"),
1079                "503 shed must carry Retry-After"
1080            );
1081
1082            // The shed POST never reached the queue.
1083            let records = receiver.recv(10).await.unwrap().records;
1084            assert!(records.is_empty(), "shed request must not be queued");
1085            receiver.close().await.unwrap();
1086        }
1087    }
1088
1089    #[test]
1090    fn config_serde_roundtrip() {
1091        let config = HttpTransportConfig {
1092            endpoint: Some("http://example.com/ingest".into()),
1093            listen: Some("0.0.0.0:8080".into()),
1094            recv_path: "/custom".into(),
1095            recv_buffer_size: 5000,
1096            recv_timeout_ms: 250,
1097            ..Default::default()
1098        };
1099
1100        let json = serde_json::to_string(&config).unwrap();
1101        let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();
1102
1103        assert_eq!(parsed.endpoint, config.endpoint);
1104        assert_eq!(parsed.listen, config.listen);
1105        assert_eq!(parsed.recv_path, config.recv_path);
1106        assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
1107        assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
1108    }
1109}