Skip to main content

hyperi_rustlib/transport/
http.rs

1// Project:   hyperi-rustlib
2// File:      src/transport/http.rs
3// Purpose:   HTTP/HTTPS transport (send via POST, receive via embedded server)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # HTTP Transport
10//!
11//! HTTP/HTTPS transport for webhook delivery and REST ingest.
12//!
13//! ## Send
14//!
//! POSTs payload bytes to `{endpoint}/{key}` (or to `{endpoint}` itself when the key is empty) using reqwest.
16//!
17//! ## Receive (requires `http-server` feature)
18//!
19//! Starts an embedded axum HTTP server that accepts POST requests on a
20//! configurable path. Incoming payloads are queued into a bounded
21//! `tokio::sync::mpsc` channel. `recv()` drains from this channel.
22//!
23//! ## Example
24//!
25//! ```rust,ignore
26//! use hyperi_rustlib::transport::http::{HttpTransport, HttpTransportConfig};
27//!
28//! // Send-only
29//! let config = HttpTransportConfig {
30//!     endpoint: Some("http://loader:8080/ingest".into()),
31//!     ..Default::default()
32//! };
33//! let transport = HttpTransport::new(&config).await?;
34//! transport.send("events", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}")).await;
35//! ```
36
37use super::error::{TransportError, TransportResult};
38#[cfg(feature = "http-server")]
39use super::traits::RecvBatch;
40use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
41#[cfg(feature = "http-server")]
42use super::types::Message;
43#[cfg(feature = "http-server")]
44use super::types::PayloadFormat;
45use super::types::SendResult;
46use super::work_batch::WorkBatch;
47use serde::{Deserialize, Serialize};
48use std::sync::Arc;
49#[cfg(feature = "http-server")]
50use std::sync::atomic::AtomicU64;
51use std::sync::atomic::{AtomicBool, Ordering};
52
/// Token attached to every message received over the HTTP transport.
///
/// Committing is a no-op for this transport, so the token exists purely for
/// observability: it carries a locally assigned sequence number and, when
/// known, the address of the client that sent the request.
#[derive(Debug, Clone)]
pub struct HttpToken {
    /// Sequence number assigned locally by the transport instance that
    /// produced this token.
    pub seq: u64,

    /// Address of the originating client; `None` when it was not recorded.
    pub source_addr: Option<String>,
}
66
67impl HttpToken {
68    /// Create a new token with sequence number.
69    #[must_use]
70    pub fn new(seq: u64) -> Self {
71        Self {
72            seq,
73            source_addr: None,
74        }
75    }
76
77    /// Create a new token with sequence number and source address.
78    #[must_use]
79    pub fn with_source(seq: u64, addr: String) -> Self {
80        Self {
81            seq,
82            source_addr: Some(addr),
83        }
84    }
85}
86
87impl CommitToken for HttpToken {}
88
89impl std::fmt::Display for HttpToken {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match &self.source_addr {
92            Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
93            None => write!(f, "http:{}", self.seq),
94        }
95    }
96}
97
/// Serde/`Default` fallback for `recv_path`.
fn default_recv_path() -> String {
    String::from("/ingest")
}
101
/// Serde/`Default` fallback for `recv_buffer_size` (number of queued messages).
fn default_buffer_size() -> usize {
    const DEFAULT_CAPACITY: usize = 10_000;
    DEFAULT_CAPACITY
}
105
/// Serde/`Default` fallback for `recv_timeout_ms`.
fn default_recv_timeout_ms() -> u64 {
    const DEFAULT_MS: u64 = 100;
    DEFAULT_MS
}
109
/// Serde/`Default` fallback for `connect_timeout_ms` (five seconds).
fn default_connect_timeout_ms() -> u64 {
    const DEFAULT_MS: u64 = 5_000;
    DEFAULT_MS
}
113
/// Serde/`Default` fallback for `send_timeout_ms` (thirty seconds).
fn default_send_timeout_ms() -> u64 {
    const DEFAULT_MS: u64 = 30_000;
    DEFAULT_MS
}
117
/// Serde/`Default` fallback for `max_body_bytes` (16 MiB).
fn default_max_body_bytes() -> usize {
    const MIB: usize = 1024 * 1024;
    16 * MIB
}
121
122/// Configuration for HTTP transport.
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct HttpTransportConfig {
125    /// Endpoint URL for sending (e.g., "http://loader:8080/ingest"). None = send disabled.
126    #[serde(default)]
127    pub endpoint: Option<String>,
128
129    /// Listen address for receiving (e.g., "0.0.0.0:8080"). None = receive disabled.
130    /// Requires the `http-server` feature.
131    #[serde(default)]
132    pub listen: Option<String>,
133
134    /// Path to accept POSTs on for receive mode. Default: "/ingest".
135    #[serde(default = "default_recv_path")]
136    pub recv_path: String,
137
138    /// Receive buffer size (bounded channel capacity). Default: 10000.
139    #[serde(default = "default_buffer_size")]
140    pub recv_buffer_size: usize,
141
142    /// Receive timeout in milliseconds. Default: 100.
143    #[serde(default = "default_recv_timeout_ms")]
144    pub recv_timeout_ms: u64,
145
146    /// Inbound message filters (applied on recv before caller sees messages).
147    #[serde(default)]
148    pub filters_in: Vec<super::filter::FilterRule>,
149
150    /// Outbound message filters (applied on send before transport dispatches).
151    #[serde(default)]
152    pub filters_out: Vec<super::filter::FilterRule>,
153
154    /// Connect timeout (ms) for the send-side HTTP client. Default 5000.
155    #[serde(default = "default_connect_timeout_ms")]
156    pub connect_timeout_ms: u64,
157
158    /// Total request timeout (ms) per send. Default 30000. Prevents a hung
159    /// send from consuming worker capacity indefinitely.
160    #[serde(default = "default_send_timeout_ms")]
161    pub send_timeout_ms: u64,
162
163    /// Maximum accepted request body size in bytes (receive side). Default
164    /// 16 MiB. Oversized POSTs are rejected with 413 before buffering.
165    #[serde(default = "default_max_body_bytes")]
166    pub max_body_bytes: usize,
167
168    /// Private-CA PEM path for the send-side HTTPS client (needs the `tls`
169    /// feature). Trusts this CA *in addition to* the OS native roots. None =
170    /// native roots only (reqwest default). Maps to `TlsTrust { native_roots:
171    /// true, extra_roots: [ca] }`.
172    #[serde(default)]
173    pub tls_ca_path: Option<String>,
174}
175
176impl Default for HttpTransportConfig {
177    fn default() -> Self {
178        Self {
179            endpoint: None,
180            listen: None,
181            recv_path: default_recv_path(),
182            recv_buffer_size: default_buffer_size(),
183            recv_timeout_ms: default_recv_timeout_ms(),
184            filters_in: Vec::new(),
185            filters_out: Vec::new(),
186            connect_timeout_ms: default_connect_timeout_ms(),
187            send_timeout_ms: default_send_timeout_ms(),
188            max_body_bytes: default_max_body_bytes(),
189            tls_ca_path: None,
190        }
191    }
192}
193
194impl HttpTransportConfig {
195    /// Load from the config cascade under the `transport.http` key.
196    #[must_use]
197    pub fn from_cascade() -> Self {
198        <Self as super::traits::FromCascade>::from_cascade_key("transport.http")
199    }
200
201    /// Create a send-only config pointing at the given endpoint URL.
202    #[must_use]
203    pub fn sender(endpoint: &str) -> Self {
204        Self {
205            endpoint: Some(endpoint.to_string()),
206            ..Default::default()
207        }
208    }
209
210    /// Create a receive-only config listening on the given address.
211    #[must_use]
212    pub fn receiver(listen: &str) -> Self {
213        Self {
214            listen: Some(listen.to_string()),
215            ..Default::default()
216        }
217    }
218}
219
220/// HTTP/HTTPS transport.
221///
222/// Supports send (POST to endpoint) and receive (embedded axum server).
223/// The receive side requires the `http-server` feature for axum.
224pub struct HttpTransport {
225    /// reqwest client for sending -- `None` for a receive-only transport
226    /// (no `endpoint`). A pure source instantiates no send client (#44 role
227    /// rule: build only what the direction needs).
228    client: Option<reqwest::Client>,
229
230    /// Base URL for sending (None = send disabled).
231    endpoint: Option<String>,
232
233    /// Receiver channel populated by the embedded HTTP server.
234    /// Only available when `http-server` feature is enabled AND `listen` is configured.
235    #[cfg(feature = "http-server")]
236    receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
237
238    /// Shutdown signal for the server task.
239    ///
240    /// Behind a `parking_lot::Mutex` for interior mutability: `close(&self)`
241    /// fires it to stop the embedded server promptly (graceful shutdown),
242    /// rather than waiting for `Drop`. `take()` makes both paths idempotent.
243    #[cfg(feature = "http-server")]
244    shutdown_tx: parking_lot::Mutex<Option<tokio::sync::oneshot::Sender<()>>>,
245
246    /// Server background task handle.
247    #[cfg(feature = "http-server")]
248    _server_handle: Option<tokio::task::JoinHandle<()>>,
249
250    /// Whether the transport is closed.
251    closed: Arc<AtomicBool>,
252
253    /// Receive timeout in milliseconds (used by receive side).
254    #[cfg(feature = "http-server")]
255    recv_timeout_ms: u64,
256
257    /// Transport-level message filter engine.
258    filter_engine: super::filter::TransportFilterEngine,
259}
260
261impl HttpTransport {
262    /// Create a new HTTP transport.
263    ///
264    /// - Set `config.endpoint` to enable sending (POST to URL).
265    /// - Set `config.listen` to enable receiving (embedded HTTP server, requires `http-server` feature).
266    ///
267    /// # Errors
268    ///
269    /// Returns error if the listen address is invalid or the server fails to bind.
270    pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
271        // Default path: no pressure governor -> byte-identical to before.
272        Self::new_inner(
273            config,
274            #[cfg(feature = "governor")]
275            None,
276        )
277        .await
278    }
279
280    /// Create an HTTP transport bound to a pressure governor (G3, `governor`
281    /// feature).
282    ///
283    /// Identical to [`new`](Self::new) except the embedded receive server
284    /// consults `pressure` BEFORE enqueuing each request: while
285    /// [`UnifiedPressure::should_hold`](crate::governor::UnifiedPressure::should_hold)
286    /// holds, the handler returns 503 (SERVICE_UNAVAILABLE) -- the same status
287    /// the existing channel-full backpressure path uses. Passing `None` is
288    /// exactly equivalent to [`new`](Self::new).
289    ///
290    /// # Errors
291    ///
292    /// Same as [`new`](Self::new).
293    #[cfg(feature = "governor")]
294    pub async fn with_pressure(
295        config: &HttpTransportConfig,
296        pressure: Option<Arc<crate::governor::UnifiedPressure>>,
297    ) -> TransportResult<Self> {
298        Self::new_inner(config, pressure).await
299    }
300
301    async fn new_inner(
302        config: &HttpTransportConfig,
303        #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
304    ) -> TransportResult<Self> {
305        // Build the send-side client ONLY in send mode (endpoint set). A
306        // receive-only transport (listen, no endpoint) instantiates no client
307        // -- the #44 role rule: build only what the direction needs.
308        let client = if config.endpoint.is_some() {
309            // `mut` is only used on the TLS path; harmless without the feature.
310            #[cfg_attr(not(feature = "tls"), allow(unused_mut))]
311            let mut client_builder = reqwest::Client::builder()
312                .connect_timeout(std::time::Duration::from_millis(config.connect_timeout_ms))
313                .timeout(std::time::Duration::from_millis(config.send_timeout_ms));
314
315            // Private-CA trust via the unified TLS module (augments native roots).
316            #[cfg(feature = "tls")]
317            if let Some(ref ca) = config.tls_ca_path {
318                let trust = crate::tls::TlsTrust {
319                    native_roots: true,
320                    webpki_roots: false,
321                    extra_roots: vec![ca.into()],
322                    extra_intermediates: Vec::new(),
323                    exclusive: false,
324                };
325                let tls_cfg =
326                    crate::tls::build_client_config(crate::tls::TlsConfigSource::Trust(trust))
327                        .map_err(|e| TransportError::Config(format!("HTTP client TLS: {e}")))?;
328                client_builder = client_builder.use_preconfigured_tls((*tls_cfg).clone());
329            }
330            #[cfg(not(feature = "tls"))]
331            if config.tls_ca_path.is_some() {
332                tracing::warn!(
333                    "http transport tls_ca_path is set but the `tls` feature is disabled -- \
334                     ignoring (using reqwest default roots)"
335                );
336            }
337
338            Some(client_builder.build().map_err(|e| {
339                TransportError::Config(format!("failed to create HTTP client: {e}"))
340            })?)
341        } else {
342            None
343        };
344
345        #[cfg(feature = "http-server")]
346        let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
347            let addr: std::net::SocketAddr = listen
348                .parse()
349                .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
350
351            let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
352            let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
353
354            let sequence = Arc::new(AtomicU64::new(0));
355            let recv_path = config.recv_path.clone();
356
357            let app = build_receiver_router(
358                tx,
359                sequence,
360                &recv_path,
361                config.max_body_bytes,
362                #[cfg(feature = "governor")]
363                pressure.clone(),
364            );
365
366            let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
367                TransportError::Connection(format!("failed to bind to {addr}: {e}"))
368            })?;
369
370            let handle = tokio::spawn(async move {
371                axum::serve(
372                    listener,
373                    app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
374                )
375                .with_graceful_shutdown(async {
376                    sd_rx.await.ok();
377                })
378                .await
379                .ok();
380            });
381
382            (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
383        } else {
384            (None, None, None)
385        };
386
387        // When the governor feature is on but http-server is off there is no
388        // receive server to attach the pressure governor to; consume it so the
389        // signature stays uniform without an unused-variable warning.
390        #[cfg(all(feature = "governor", not(feature = "http-server")))]
391        let _ = pressure;
392
393        #[cfg(feature = "logger")]
394        tracing::info!(
395            endpoint = ?config.endpoint,
396            listen = ?config.listen,
397            "HTTP transport opened"
398        );
399
400        // Fail loud on bad filter config -- silently disabling filters
401        // turns a misconfigured `drop` / `dlq` rule into a permanent pass.
402        let filter_engine = super::filter::TransportFilterEngine::new(
403            &config.filters_in,
404            &config.filters_out,
405            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
406        )?;
407
408        let closed = Arc::new(AtomicBool::new(false));
409
410        #[cfg(feature = "health")]
411        {
412            let h = Arc::clone(&closed);
413            crate::health::HealthRegistry::register("transport:http", move || {
414                if h.load(Ordering::Relaxed) {
415                    crate::health::HealthStatus::Unhealthy
416                } else {
417                    crate::health::HealthStatus::Healthy
418                }
419            });
420        }
421
422        Ok(Self {
423            client,
424            endpoint: config.endpoint.clone(),
425            #[cfg(feature = "http-server")]
426            receiver,
427            #[cfg(feature = "http-server")]
428            shutdown_tx: parking_lot::Mutex::new(shutdown_tx),
429            #[cfg(feature = "http-server")]
430            _server_handle: server_handle,
431            closed,
432            #[cfg(feature = "http-server")]
433            recv_timeout_ms: config.recv_timeout_ms,
434            filter_engine,
435        })
436    }
437}
438
/// Assemble the axum router used by the receive side: a single POST route
/// at `recv_path`, served by `ingest_handler` and capped at
/// `max_body_bytes` per request body.
#[cfg(feature = "http-server")]
fn build_receiver_router(
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    sequence: Arc<AtomicU64>,
    recv_path: &str,
    max_body_bytes: usize,
    #[cfg(feature = "governor")] pressure: Option<Arc<crate::governor::UnifiedPressure>>,
) -> axum::Router {
    // Oversized bodies are turned away with 413 before the handler buffers them.
    let body_limit = axum::extract::DefaultBodyLimit::max(max_body_bytes);

    axum::Router::new()
        .route(recv_path, axum::routing::post(ingest_handler))
        .layer(body_limit)
        .with_state(ReceiverState {
            sender,
            sequence,
            #[cfg(feature = "governor")]
            pressure,
        })
}
463
/// State handed to the receive handler by the router.
#[cfg(feature = "http-server")]
#[derive(Clone)]
struct ReceiverState {
    /// Producer half of the bounded channel that `recv()` drains.
    sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
    /// Counter from which each incoming request takes its sequence number.
    sequence: Arc<AtomicU64>,
    /// Optional pressure governor (G3, `governor` feature). With `None` the
    /// handler never consults it. With `Some`, the handler answers 503 while
    /// [`UnifiedPressure::should_hold`] holds -- shedding that comes ON TOP
    /// of the channel-full 503 and never replaces it.
    #[cfg(feature = "governor")]
    pressure: Option<Arc<crate::governor::UnifiedPressure>>,
}
478
479/// POST handler that accepts raw bytes and queues them into the mpsc channel.
480#[cfg(feature = "http-server")]
481async fn ingest_handler(
482    axum::extract::State(state): axum::extract::State<ReceiverState>,
483    axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
484    headers: axum::http::HeaderMap,
485    body: axum::body::Bytes,
486) -> axum::response::Response {
487    use axum::response::IntoResponse as _;
488
489    if body.is_empty() {
490        return axum::http::StatusCode::BAD_REQUEST.into_response();
491    }
492
493    // G3 pressure-driven shedding (governor feature, opt-in). BEFORE enqueuing,
494    // if a governor is wired and it says hold, shed the request with 503 --
495    // consistent with the existing channel-full 503 below (NOT 429). Default
496    // `None` -> this is skipped and behaviour is byte-identical.
497    #[cfg(feature = "governor")]
498    if let Some(pressure) = &state.pressure
499        && pressure.should_hold()
500    {
501        #[cfg(feature = "metrics")]
502        metrics::counter!("dfe_transport_backpressured_total", "transport" => "http", "reason" => "pressure")
503            .increment(1);
504        return shed_503();
505    }
506
507    // Extract W3C traceparent from incoming HTTP headers for distributed tracing
508    #[cfg(feature = "transport-trace")]
509    if let Some(tp) = headers
510        .get(super::propagation::TRACEPARENT_HEADER)
511        .and_then(|v| v.to_str().ok())
512        && super::propagation::is_valid_traceparent(tp)
513    {
514        tracing::Span::current().record("traceparent", tp);
515    }
516
517    // Suppress unused variable warning when otel feature is disabled
518    #[cfg(not(feature = "otel"))]
519    let _ = &headers;
520
521    let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
522    let format = PayloadFormat::detect(&body);
523    let timestamp_ms = chrono::Utc::now().timestamp_millis();
524
525    // `body` is `axum::body::Bytes` (= `bytes::Bytes`) -- move it directly,
526    // no copy needed.
527    let msg = Message {
528        key: None,
529        payload: body,
530        token: HttpToken::with_source(seq, addr.to_string()),
531        timestamp_ms: Some(timestamp_ms),
532        format,
533    };
534
535    match state.sender.try_send(msg) {
536        Ok(()) => {
537            #[cfg(feature = "metrics")]
538            metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
539            axum::http::StatusCode::OK.into_response()
540        }
541        Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
542            #[cfg(feature = "metrics")]
543            metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
544                .increment(1);
545            shed_503()
546        }
547        Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
548            #[cfg(feature = "metrics")]
549            metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
550            axum::http::StatusCode::GONE.into_response()
551        }
552    }
553}
554
/// Build the shed response: `503 Service Unavailable` carrying
/// `Retry-After: 1`, so that a well-behaved sender pauses briefly rather
/// than hot-retrying into a pod that is already holding.
#[cfg(feature = "http-server")]
fn shed_503() -> axum::response::Response {
    use axum::response::IntoResponse as _;

    let status = axum::http::StatusCode::SERVICE_UNAVAILABLE;
    let retry_hint = [(axum::http::header::RETRY_AFTER, "1")];
    (status, retry_hint).into_response()
}
566
567impl TransportBase for HttpTransport {
568    async fn close(&self) -> TransportResult<()> {
569        self.closed.store(true, Ordering::Relaxed);
570        // Stop the embedded server now (graceful shutdown) rather than on Drop.
571        // take() => idempotent: a later close()/drop() is a no-op.
572        #[cfg(feature = "http-server")]
573        if let Some(tx) = self.shutdown_tx.lock().take() {
574            let _ = tx.send(());
575        }
576        Ok(())
577    }
578
579    fn is_healthy(&self) -> bool {
580        !self.closed.load(Ordering::Relaxed)
581    }
582
583    fn name(&self) -> &'static str {
584        "http"
585    }
586}
587
588impl TransportSender for HttpTransport {
589    async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
590        if self.closed.load(Ordering::Relaxed) {
591            return SendResult::Fatal(TransportError::Closed);
592        }
593
594        // Outbound filter check
595        if self.filter_engine.has_outbound_filters() {
596            match self.filter_engine.apply_outbound(&payload) {
597                super::filter::FilterDisposition::Pass => {}
598                super::filter::FilterDisposition::Drop => return SendResult::Ok,
599                super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
600            }
601        }
602
603        let Some(base_url) = &self.endpoint else {
604            return SendResult::Fatal(TransportError::Config(
605                "no endpoint configured for sending".into(),
606            ));
607        };
608
609        // Receive-only transports build no send client (set in tandem with
610        // endpoint, so this is Some whenever endpoint is Some).
611        let Some(client) = &self.client else {
612            return SendResult::Fatal(TransportError::Config(
613                "no send client (receive-only HTTP transport)".into(),
614            ));
615        };
616
617        // Build URL: {base_url}/{key} if key is non-empty, otherwise just {base_url}
618        let url = if key.is_empty() {
619            base_url.clone()
620        } else {
621            let base = base_url.trim_end_matches('/');
622            let suffix = key.trim_start_matches('/');
623            format!("{base}/{suffix}")
624        };
625
626        #[cfg(feature = "metrics")]
627        let start = std::time::Instant::now();
628
629        // Build request with optional W3C traceparent header for distributed tracing
630        let request_builder = client
631            .post(&url)
632            .header("content-type", "application/octet-stream");
633
634        #[cfg(feature = "transport-trace")]
635        let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
636            request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
637        } else {
638            request_builder
639        };
640
641        #[cfg(feature = "logger")]
642        let payload_len = payload.len();
643        let result = match request_builder.body(payload).send().await {
644            Ok(resp) if resp.status().is_success() => {
645                #[cfg(feature = "logger")]
646                tracing::debug!(url = %url, bytes = payload_len, "HTTP transport: POST sent");
647
648                #[cfg(feature = "metrics")]
649                metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
650                SendResult::Ok
651            }
652            Ok(resp)
653                if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
654                    || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
655            {
656                #[cfg(feature = "logger")]
657                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
658
659                #[cfg(feature = "metrics")]
660                metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
661                    .increment(1);
662                SendResult::Backpressured
663            }
664            Ok(resp) => {
665                #[cfg(feature = "logger")]
666                tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
667
668                #[cfg(feature = "metrics")]
669                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
670                    .increment(1);
671                SendResult::Fatal(TransportError::Send(format!(
672                    "HTTP {} from {}",
673                    resp.status(),
674                    url
675                )))
676            }
677            Err(e) => {
678                #[cfg(feature = "logger")]
679                tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
680
681                #[cfg(feature = "metrics")]
682                metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
683                    .increment(1);
684                SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
685            }
686        };
687
688        #[cfg(feature = "metrics")]
689        metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
690            .record(start.elapsed().as_secs_f64());
691
692        result
693    }
694}
695
696impl TransportReceiver for HttpTransport {
697    type Token = HttpToken;
698
699    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
700        if self.closed.load(Ordering::Relaxed) {
701            return Err(TransportError::Closed);
702        }
703
704        #[cfg(feature = "http-server")]
705        {
706            let Some(receiver) = &self.receiver else {
707                return Err(TransportError::Config(
708                    "no listen address configured for receiving".into(),
709                ));
710            };
711
712            let mut rx = receiver.lock().await;
713            let mut messages = Vec::with_capacity(max.min(100));
714
715            for _ in 0..max {
716                let result = if self.recv_timeout_ms == 0 {
717                    match rx.try_recv() {
718                        Ok(msg) => Some(msg),
719                        Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
720                        Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
721                            return Err(TransportError::Closed);
722                        }
723                    }
724                } else if messages.is_empty() {
725                    // First message: wait with timeout
726                    match tokio::time::timeout(
727                        std::time::Duration::from_millis(self.recv_timeout_ms),
728                        rx.recv(),
729                    )
730                    .await
731                    {
732                        Ok(Some(msg)) => Some(msg),
733                        Ok(None) => return Err(TransportError::Closed),
734                        Err(_) => break, // Timeout
735                    }
736                } else {
737                    // Subsequent: non-blocking drain
738                    match rx.try_recv() {
739                        Ok(msg) => Some(msg),
740                        Err(_) => break,
741                    }
742                };
743
744                if let Some(msg) = result {
745                    messages.push(msg);
746                }
747            }
748
749            // Apply inbound filters via the shared partition helper; DLQ
750            // entries are returned in the RecvBatch for the caller to route.
751            let batch = self.filter_engine.partition_batch(
752                messages,
753                |m| m.payload.as_ref(),
754                |m| m.key.clone(),
755                |m| m.token.clone(),
756            );
757            let messages = batch.messages;
758            let dlq_entries = batch.dlq_entries;
759            let filtered_tokens = batch.filtered_tokens;
760
761            #[cfg(feature = "logger")]
762            if !messages.is_empty() {
763                tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
764            }
765
766            // New default (metrics audit): inbound throughput (was only emitted
767            // by file/pipe/redis). Counts post-filter delivered messages.
768            #[cfg(feature = "metrics")]
769            if !messages.is_empty() {
770                metrics::counter!("dfe_transport_received_total", "transport" => "http")
771                    .increment(messages.len() as u64);
772            }
773
774            Ok(RecvBatch {
775                messages,
776                dlq_entries,
777                filtered_tokens,
778            }
779            .into())
780        }
781
782        #[cfg(not(feature = "http-server"))]
783        {
784            let _ = max;
785            Err(TransportError::Config(
786                "HTTP receive requires the 'http-server' feature".into(),
787            ))
788        }
789    }
790
791    async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
792        // HTTP is fire-and-forget -- commit is a no-op.
793        Ok(())
794    }
795}
796
797impl Drop for HttpTransport {
798    fn drop(&mut self) {
799        #[cfg(feature = "http-server")]
800        if let Some(tx) = self.shutdown_tx.lock().take() {
801            let _ = tx.send(());
802        }
803    }
804}
805
806impl super::traits::FromCascade for HttpTransportConfig {}
807
#[cfg(test)]
mod tests {
    //! Unit and loopback integration tests for the HTTP transport.
    //!
    //! Tests that start the embedded server are gated on the `http-server`
    //! feature and share the helpers below for port reservation and
    //! readiness polling.

    use super::*;

    /// Maximum number of connect attempts made by `wait_until_listening`.
    #[cfg(feature = "http-server")]
    const READY_ATTEMPTS: u32 = 200;

    /// Pause between connect attempts made by `wait_until_listening`.
    #[cfg(feature = "http-server")]
    const READY_POLL: std::time::Duration = std::time::Duration::from_millis(10);

    /// Reserve a free loopback address for a receiver under test.
    ///
    /// Binds `127.0.0.1:0` so the OS picks an unused port, records the
    /// address, then drops the listener so the transport can bind it.
    ///
    /// The transport is configured with an address string rather than an
    /// already-bound socket, so another process can in principle claim the
    /// port between the drop and the transport's own bind. That window is
    /// inherent to this approach.
    #[cfg(feature = "http-server")]
    async fn free_local_addr() -> std::net::SocketAddr {
        let probe = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = probe.local_addr().unwrap();
        drop(probe); // Free the port for the transport to bind
        addr
    }

    /// Block until something accepts TCP connections on `addr`.
    ///
    /// Replaces a fixed start-up sleep: returns as soon as a connect succeeds
    /// and polls for at most `READY_ATTEMPTS * READY_POLL` otherwise. The
    /// probe connection is dropped immediately without sending a request.
    ///
    /// # Panics
    ///
    /// Panics if no listener appears within the polling budget, so a server
    /// that failed to start is reported as such instead of surfacing later as
    /// an opaque request error.
    #[cfg(feature = "http-server")]
    async fn wait_until_listening(addr: std::net::SocketAddr) {
        for _ in 0..READY_ATTEMPTS {
            if tokio::net::TcpStream::connect(addr).await.is_ok() {
                return;
            }
            tokio::time::sleep(READY_POLL).await;
        }
        panic!("HTTP server never started listening on {addr}");
    }

    /// URL of the default `/ingest` path on the receiver bound to `addr`.
    #[cfg(feature = "http-server")]
    fn ingest_url(addr: std::net::SocketAddr) -> String {
        format!("http://127.0.0.1:{}/ingest", addr.port())
    }

    #[test]
    fn http_token_display() {
        let token = HttpToken::new(42);
        assert_eq!(format!("{token}"), "http:42");
    }

    #[test]
    fn http_token_display_with_source() {
        let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
        assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
    }

    #[test]
    fn config_defaults() {
        let config = HttpTransportConfig::default();
        assert!(config.endpoint.is_none());
        assert!(config.listen.is_none());
        assert_eq!(config.recv_path, "/ingest");
        assert_eq!(config.recv_buffer_size, 10_000);
        assert_eq!(config.recv_timeout_ms, 100);
    }

    #[test]
    fn config_sender_helper() {
        let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
        assert_eq!(
            config.endpoint.as_deref(),
            Some("http://localhost:8080/ingest")
        );
        assert!(config.listen.is_none());
    }

    #[test]
    fn config_receiver_helper() {
        let config = HttpTransportConfig::receiver("0.0.0.0:8080");
        assert!(config.endpoint.is_none());
        assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
    }

    #[tokio::test]
    async fn send_only_transport() {
        // Default config has neither an endpoint nor a listener. The
        // transport still constructs, but sending is disabled.
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();

        assert!(transport.is_healthy());
        assert_eq!(transport.name(), "http");

        // Send without endpoint should fail
        let result = transport
            .send("test", bytes::Bytes::from_static(b"payload"))
            .await;
        assert!(result.is_fatal());

        // Commit is always ok
        transport.commit(&[]).await.unwrap();
    }

    #[tokio::test]
    async fn close_prevents_send() {
        let config = HttpTransportConfig::sender("http://localhost:19999/test");
        let transport = HttpTransport::new(&config).await.unwrap();

        transport.close().await.unwrap();
        assert!(!transport.is_healthy());

        let result = transport
            .send("test", bytes::Bytes::from_static(b"data"))
            .await;
        assert!(result.is_fatal());
    }

    #[tokio::test]
    async fn close_prevents_recv() {
        let config = HttpTransportConfig::default();
        let transport = HttpTransport::new(&config).await.unwrap();

        transport.close().await.unwrap();
        let result = transport.recv(1).await;
        // NOTE(review): the default config has no listener, and without the
        // `http-server` feature recv always errors, so this assertion would
        // also hold if close() had no effect on recv. Asserting the specific
        // error variant would make the test discriminating.
        assert!(result.is_err());
    }

    /// Full send + receive round-trip test.
    /// Requires both `transport-http` and `http-server` features.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn send_and_receive_roundtrip() {
        // Start receiver on a random available port
        let addr = free_local_addr().await;

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_path: "/ingest".to_string(),
            recv_buffer_size: 100,
            recv_timeout_ms: 1000,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        wait_until_listening(addr).await;

        // Send a message via a separate sender transport
        let send_config = HttpTransportConfig::sender(&ingest_url(addr));
        let sender = HttpTransport::new(&send_config).await.unwrap();

        let result = sender
            .send("", bytes::Bytes::from_static(b"{\"msg\":\"hello\"}"))
            .await;
        assert!(result.is_ok(), "send failed: {result:?}");

        // Receive it
        let batch = receiver.recv(10).await.unwrap();
        assert_eq!(batch.records.len(), 1);
        assert_eq!(batch.records[0].payload.as_ref(), b"{\"msg\":\"hello\"}");
        // The source address rides on the batch commit token, not the record.
        assert!(batch.commit_tokens[0].source_addr.is_some());

        // Cleanup
        sender.close().await.unwrap();
        receiver.close().await.unwrap();
    }

    /// Test that the receiver rejects empty bodies.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_empty_body() {
        let addr = free_local_addr().await;

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        wait_until_listening(addr).await;

        // Send empty body
        let client = reqwest::Client::new();
        let resp = client
            .post(ingest_url(addr))
            .body(Vec::<u8>::new())
            .send()
            .await
            .unwrap();

        assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);

        // recv should timeout with no messages
        let records = receiver.recv(10).await.unwrap().records;
        assert!(records.is_empty());

        receiver.close().await.unwrap();
    }

    /// Oversized bodies are rejected with 413 before the handler buffers them.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receive_rejects_oversized_body() {
        let addr = free_local_addr().await;

        let recv_config = HttpTransportConfig {
            listen: Some(addr.to_string()),
            recv_timeout_ms: 200,
            max_body_bytes: 1024, // 1 KiB cap
            ..Default::default()
        };
        let receiver = HttpTransport::new(&recv_config).await.unwrap();
        wait_until_listening(addr).await;

        // POST 8 KiB -- over the 1 KiB cap.
        let client = reqwest::Client::new();
        let resp = client
            .post(ingest_url(addr))
            .body(vec![b'x'; 8 * 1024])
            .send()
            .await
            .unwrap();

        assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);

        // The oversized POST never reached the queue.
        let records = receiver.recv(10).await.unwrap().records;
        assert!(records.is_empty(), "oversized body must not be queued");

        receiver.close().await.unwrap();
    }

    /// Test recv without listen returns config error.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn recv_without_listen_returns_error() {
        let config = HttpTransportConfig::sender("http://localhost:9999");
        let transport = HttpTransport::new(&config).await.unwrap();

        let result = transport.recv(10).await;
        assert!(result.is_err());
    }

    /// #44 role rule: a sender-mode transport (endpoint, no listen) builds a
    /// send client.
    #[tokio::test]
    async fn sender_mode_builds_a_client() {
        let transport = HttpTransport::new(&HttpTransportConfig::sender("http://localhost:9999"))
            .await
            .unwrap();
        assert!(
            transport.client.is_some(),
            "sender mode must build a client"
        );
    }

    /// #44 role rule: a receive-only transport (listen, no endpoint) builds NO
    /// send client, and send() is fatal.
    #[cfg(feature = "http-server")]
    #[tokio::test]
    async fn receiver_mode_builds_no_client_and_send_errors() {
        let addr = free_local_addr().await;
        let transport = HttpTransport::new(&HttpTransportConfig::receiver(&addr.to_string()))
            .await
            .unwrap();
        assert!(
            transport.client.is_none(),
            "receive-only must build no send client"
        );
        let result = transport.send("k", bytes::Bytes::from_static(b"x")).await;
        assert!(
            matches!(result, SendResult::Fatal(_)),
            "send on a receive-only transport must be fatal"
        );
        transport.close().await.unwrap();
    }

    /// G3: with a pressure governor pinned HIGH, the ingest handler sheds with
    /// 503 (SERVICE_UNAVAILABLE) -- the same status as the channel-full path.
    /// With `None` (the default `new`), POST is accepted (200) as before.
    #[cfg(all(feature = "http-server", feature = "governor"))]
    #[tokio::test]
    async fn pressure_high_sheds_with_503_and_none_is_normal() {
        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        // --- None default: POST accepted (200) ---
        {
            let addr = free_local_addr().await;
            let cfg = HttpTransportConfig {
                listen: Some(addr.to_string()),
                recv_timeout_ms: 200,
                ..Default::default()
            };
            let receiver = HttpTransport::new(&cfg).await.unwrap();
            wait_until_listening(addr).await;

            let client = reqwest::Client::new();
            let resp = client
                .post(ingest_url(addr))
                .body(b"{\"msg\":\"ok\"}".to_vec())
                .send()
                .await
                .unwrap();
            assert_eq!(
                resp.status(),
                reqwest::StatusCode::OK,
                "no governor -> accepted"
            );
            receiver.close().await.unwrap();
        }

        // --- Governor pinned HIGH (HARD memory source at 95%): 503 ---
        {
            let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
                limit_bytes: 1000,
                pressure_threshold: 0.80,
                ..Default::default()
            }));
            guard.add_bytes(950); // 95% -> well above pause_above
            let src = MemoryPressureSource::new(Arc::clone(&guard));
            let pressure = Arc::new(UnifiedPressure::new(
                vec![Arc::new(src) as Arc<dyn PressureSource>],
                Hysteresis::new(0.80, 0.65).expect("valid band"),
            ));
            // Sanity: the latch is armed.
            assert!(pressure.should_hold(), "pinned-high governor must hold");

            let addr = free_local_addr().await;
            let cfg = HttpTransportConfig {
                listen: Some(addr.to_string()),
                recv_timeout_ms: 200,
                ..Default::default()
            };
            let receiver = HttpTransport::with_pressure(&cfg, Some(Arc::clone(&pressure)))
                .await
                .unwrap();
            wait_until_listening(addr).await;

            let client = reqwest::Client::new();
            let resp = client
                .post(ingest_url(addr))
                .body(b"{\"msg\":\"shed\"}".to_vec())
                .send()
                .await
                .unwrap();
            assert_eq!(
                resp.status(),
                reqwest::StatusCode::SERVICE_UNAVAILABLE,
                "pinned-high governor must shed with 503"
            );
            // The 503 carries a Retry-After hint so a well-behaved sender backs
            // off instead of hot-retrying into a holding pod.
            assert_eq!(
                resp.headers()
                    .get(reqwest::header::RETRY_AFTER)
                    .and_then(|v| v.to_str().ok()),
                Some("1"),
                "503 shed must carry Retry-After"
            );

            // The shed POST never reached the queue.
            let records = receiver.recv(10).await.unwrap().records;
            assert!(records.is_empty(), "shed request must not be queued");
            receiver.close().await.unwrap();
        }
    }

    #[test]
    fn config_serde_roundtrip() {
        let config = HttpTransportConfig {
            endpoint: Some("http://example.com/ingest".into()),
            listen: Some("0.0.0.0:8080".into()),
            recv_path: "/custom".into(),
            recv_buffer_size: 5000,
            recv_timeout_ms: 250,
            ..Default::default()
        };

        let json = serde_json::to_string(&config).unwrap();
        let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.endpoint, config.endpoint);
        assert_eq!(parsed.listen, config.listen);
        assert_eq!(parsed.recv_path, config.recv_path);
        assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
        assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
    }
}