1use super::error::{TransportError, TransportResult};
38use super::traits::{CommitToken, TransportBase, TransportReceiver, TransportSender};
39#[cfg(feature = "http-server")]
40use super::types::PayloadFormat;
41use super::types::{Message, SendResult};
42use serde::{Deserialize, Serialize};
43use std::sync::Arc;
44#[cfg(feature = "http-server")]
45use std::sync::atomic::AtomicU64;
46use std::sync::atomic::{AtomicBool, Ordering};
47
48#[derive(Debug, Clone)]
54pub struct HttpToken {
55 pub seq: u64,
57
58 pub source_addr: Option<String>,
60}
61
62impl HttpToken {
63 #[must_use]
65 pub fn new(seq: u64) -> Self {
66 Self {
67 seq,
68 source_addr: None,
69 }
70 }
71
72 #[must_use]
74 pub fn with_source(seq: u64, addr: String) -> Self {
75 Self {
76 seq,
77 source_addr: Some(addr),
78 }
79 }
80}
81
82impl CommitToken for HttpToken {}
83
84impl std::fmt::Display for HttpToken {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 match &self.source_addr {
87 Some(addr) => write!(f, "http:{}:{}", addr, self.seq),
88 None => write!(f, "http:{}", self.seq),
89 }
90 }
91}
92
93fn default_recv_path() -> String {
94 "/ingest".to_string()
95}
96
97fn default_buffer_size() -> usize {
98 10_000
99}
100
101fn default_recv_timeout_ms() -> u64 {
102 100
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct HttpTransportConfig {
108 #[serde(default)]
110 pub endpoint: Option<String>,
111
112 #[serde(default)]
115 pub listen: Option<String>,
116
117 #[serde(default = "default_recv_path")]
119 pub recv_path: String,
120
121 #[serde(default = "default_buffer_size")]
123 pub recv_buffer_size: usize,
124
125 #[serde(default = "default_recv_timeout_ms")]
127 pub recv_timeout_ms: u64,
128
129 #[serde(default)]
131 pub filters_in: Vec<super::filter::FilterRule>,
132
133 #[serde(default)]
135 pub filters_out: Vec<super::filter::FilterRule>,
136}
137
138impl Default for HttpTransportConfig {
139 fn default() -> Self {
140 Self {
141 endpoint: None,
142 listen: None,
143 recv_path: default_recv_path(),
144 recv_buffer_size: default_buffer_size(),
145 recv_timeout_ms: default_recv_timeout_ms(),
146 filters_in: Vec::new(),
147 filters_out: Vec::new(),
148 }
149 }
150}
151
152impl HttpTransportConfig {
153 #[must_use]
155 pub fn from_cascade() -> Self {
156 #[cfg(feature = "config")]
157 {
158 if let Some(cfg) = crate::config::try_get()
159 && let Ok(tc) = cfg.unmarshal_key_registered::<Self>("transport.http")
160 {
161 return tc;
162 }
163 }
164 Self::default()
165 }
166
167 #[must_use]
169 pub fn sender(endpoint: &str) -> Self {
170 Self {
171 endpoint: Some(endpoint.to_string()),
172 ..Default::default()
173 }
174 }
175
176 #[must_use]
178 pub fn receiver(listen: &str) -> Self {
179 Self {
180 listen: Some(listen.to_string()),
181 ..Default::default()
182 }
183 }
184}
185
186pub struct HttpTransport {
191 client: reqwest::Client,
193
194 endpoint: Option<String>,
196
197 #[cfg(feature = "http-server")]
200 receiver: Option<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Message<HttpToken>>>>,
201
202 #[cfg(feature = "http-server")]
204 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
205
206 #[cfg(feature = "http-server")]
208 _server_handle: Option<tokio::task::JoinHandle<()>>,
209
210 closed: Arc<AtomicBool>,
212
213 #[cfg(feature = "http-server")]
215 recv_timeout_ms: u64,
216
217 filter_engine: super::filter::TransportFilterEngine,
219
220 filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
223}
224
225impl HttpTransport {
226 pub async fn new(config: &HttpTransportConfig) -> TransportResult<Self> {
235 let client = reqwest::Client::builder()
236 .build()
237 .map_err(|e| TransportError::Config(format!("failed to create HTTP client: {e}")))?;
238
239 #[cfg(feature = "http-server")]
240 let (receiver, shutdown_tx, server_handle) = if let Some(listen) = &config.listen {
241 let addr: std::net::SocketAddr = listen
242 .parse()
243 .map_err(|e| TransportError::Config(format!("invalid listen address: {e}")))?;
244
245 let (tx, rx) = tokio::sync::mpsc::channel(config.recv_buffer_size);
246 let (sd_tx, sd_rx) = tokio::sync::oneshot::channel::<()>();
247
248 let sequence = Arc::new(AtomicU64::new(0));
249 let recv_path = config.recv_path.clone();
250
251 let app = build_receiver_router(tx, sequence, &recv_path);
252
253 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
254 TransportError::Connection(format!("failed to bind to {addr}: {e}"))
255 })?;
256
257 let handle = tokio::spawn(async move {
258 axum::serve(
259 listener,
260 app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
261 )
262 .with_graceful_shutdown(async {
263 sd_rx.await.ok();
264 })
265 .await
266 .ok();
267 });
268
269 (Some(tokio::sync::Mutex::new(rx)), Some(sd_tx), Some(handle))
270 } else {
271 (None, None, None)
272 };
273
274 #[cfg(feature = "logger")]
275 tracing::info!(
276 endpoint = ?config.endpoint,
277 listen = ?config.listen,
278 "HTTP transport opened"
279 );
280
281 let filter_engine = super::filter::TransportFilterEngine::new(
284 &config.filters_in,
285 &config.filters_out,
286 &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
287 )?;
288
289 let closed = Arc::new(AtomicBool::new(false));
290
291 #[cfg(feature = "health")]
292 {
293 let h = Arc::clone(&closed);
294 crate::health::HealthRegistry::register("transport:http", move || {
295 if h.load(Ordering::Relaxed) {
296 crate::health::HealthStatus::Unhealthy
297 } else {
298 crate::health::HealthStatus::Healthy
299 }
300 });
301 }
302
303 Ok(Self {
304 client,
305 endpoint: config.endpoint.clone(),
306 #[cfg(feature = "http-server")]
307 receiver,
308 #[cfg(feature = "http-server")]
309 shutdown_tx,
310 #[cfg(feature = "http-server")]
311 _server_handle: server_handle,
312 closed,
313 #[cfg(feature = "http-server")]
314 recv_timeout_ms: config.recv_timeout_ms,
315 filter_engine,
316 filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
317 })
318 }
319}
320
321#[cfg(feature = "http-server")]
323fn build_receiver_router(
324 sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
325 sequence: Arc<AtomicU64>,
326 recv_path: &str,
327) -> axum::Router {
328 use axum::routing::post;
329
330 let state = ReceiverState { sender, sequence };
331
332 axum::Router::new()
333 .route(recv_path, post(ingest_handler))
334 .with_state(state)
335}
336
337#[cfg(feature = "http-server")]
339#[derive(Clone)]
340struct ReceiverState {
341 sender: tokio::sync::mpsc::Sender<Message<HttpToken>>,
342 sequence: Arc<AtomicU64>,
343}
344
345#[cfg(feature = "http-server")]
347async fn ingest_handler(
348 axum::extract::State(state): axum::extract::State<ReceiverState>,
349 axum::extract::ConnectInfo(addr): axum::extract::ConnectInfo<std::net::SocketAddr>,
350 headers: axum::http::HeaderMap,
351 body: axum::body::Bytes,
352) -> axum::http::StatusCode {
353 if body.is_empty() {
354 return axum::http::StatusCode::BAD_REQUEST;
355 }
356
357 #[cfg(feature = "transport-trace")]
359 if let Some(tp) = headers
360 .get(super::propagation::TRACEPARENT_HEADER)
361 .and_then(|v| v.to_str().ok())
362 && super::propagation::is_valid_traceparent(tp)
363 {
364 tracing::Span::current().record("traceparent", tp);
365 }
366
367 #[cfg(not(feature = "otel"))]
369 let _ = &headers;
370
371 let seq = state.sequence.fetch_add(1, Ordering::Relaxed);
372 let format = PayloadFormat::detect(&body);
373 let timestamp_ms = chrono::Utc::now().timestamp_millis();
374
375 let msg = Message {
376 key: None,
377 payload: body.to_vec(),
378 token: HttpToken::with_source(seq, addr.to_string()),
379 timestamp_ms: Some(timestamp_ms),
380 format,
381 };
382
383 match state.sender.try_send(msg) {
384 Ok(()) => {
385 #[cfg(feature = "metrics")]
386 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
387 axum::http::StatusCode::OK
388 }
389 Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
390 #[cfg(feature = "metrics")]
391 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
392 .increment(1);
393 axum::http::StatusCode::SERVICE_UNAVAILABLE
394 }
395 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
396 #[cfg(feature = "metrics")]
397 metrics::counter!("dfe_transport_refused_total", "transport" => "http").increment(1);
398 axum::http::StatusCode::GONE
399 }
400 }
401}
402
403impl TransportBase for HttpTransport {
404 async fn close(&self) -> TransportResult<()> {
405 self.closed.store(true, Ordering::Relaxed);
406 Ok(())
407 }
408
409 fn is_healthy(&self) -> bool {
410 !self.closed.load(Ordering::Relaxed)
411 }
412
413 fn name(&self) -> &'static str {
414 "http"
415 }
416}
417
418impl TransportSender for HttpTransport {
419 async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
420 if self.closed.load(Ordering::Relaxed) {
421 return SendResult::Fatal(TransportError::Closed);
422 }
423
424 if self.filter_engine.has_outbound_filters() {
426 match self.filter_engine.apply_outbound(payload) {
427 super::filter::FilterDisposition::Pass => {}
428 super::filter::FilterDisposition::Drop => return SendResult::Ok,
429 super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
430 }
431 }
432
433 let Some(base_url) = &self.endpoint else {
434 return SendResult::Fatal(TransportError::Config(
435 "no endpoint configured for sending".into(),
436 ));
437 };
438
439 let url = if key.is_empty() {
441 base_url.clone()
442 } else {
443 let base = base_url.trim_end_matches('/');
444 let suffix = key.trim_start_matches('/');
445 format!("{base}/{suffix}")
446 };
447
448 #[cfg(feature = "metrics")]
449 let start = std::time::Instant::now();
450
451 let request_builder = self
453 .client
454 .post(&url)
455 .header("content-type", "application/octet-stream");
456
457 #[cfg(feature = "transport-trace")]
458 let request_builder = if let Some(tp) = super::propagation::current_traceparent() {
459 request_builder.header(super::propagation::TRACEPARENT_HEADER, tp)
460 } else {
461 request_builder
462 };
463
464 let result = match request_builder.body(payload.to_vec()).send().await {
465 Ok(resp) if resp.status().is_success() => {
466 #[cfg(feature = "logger")]
467 tracing::debug!(url = %url, bytes = payload.len(), "HTTP transport: POST sent");
468
469 #[cfg(feature = "metrics")]
470 metrics::counter!("dfe_transport_sent_total", "transport" => "http").increment(1);
471 SendResult::Ok
472 }
473 Ok(resp)
474 if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS
475 || resp.status() == reqwest::StatusCode::SERVICE_UNAVAILABLE =>
476 {
477 #[cfg(feature = "logger")]
478 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: backpressure");
479
480 #[cfg(feature = "metrics")]
481 metrics::counter!("dfe_transport_backpressured_total", "transport" => "http")
482 .increment(1);
483 SendResult::Backpressured
484 }
485 Ok(resp) => {
486 #[cfg(feature = "logger")]
487 tracing::warn!(status = %resp.status(), url = %url, "HTTP transport: send error");
488
489 #[cfg(feature = "metrics")]
490 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
491 .increment(1);
492 SendResult::Fatal(TransportError::Send(format!(
493 "HTTP {} from {}",
494 resp.status(),
495 url
496 )))
497 }
498 Err(e) => {
499 #[cfg(feature = "logger")]
500 tracing::warn!(error = %e, url = %url, "HTTP transport: request failed");
501
502 #[cfg(feature = "metrics")]
503 metrics::counter!("dfe_transport_send_errors_total", "transport" => "http")
504 .increment(1);
505 SendResult::Fatal(TransportError::Send(format!("HTTP request failed: {e}")))
506 }
507 };
508
509 #[cfg(feature = "metrics")]
510 metrics::histogram!("dfe_transport_send_duration_seconds", "transport" => "http")
511 .record(start.elapsed().as_secs_f64());
512
513 result
514 }
515}
516
517impl TransportReceiver for HttpTransport {
518 type Token = HttpToken;
519
520 async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
521 if self.closed.load(Ordering::Relaxed) {
522 return Err(TransportError::Closed);
523 }
524
525 #[cfg(feature = "http-server")]
526 {
527 let Some(receiver) = &self.receiver else {
528 return Err(TransportError::Config(
529 "no listen address configured for receiving".into(),
530 ));
531 };
532
533 let mut rx = receiver.lock().await;
534 let mut messages = Vec::with_capacity(max.min(100));
535
536 for _ in 0..max {
537 let result = if self.recv_timeout_ms == 0 {
538 match rx.try_recv() {
539 Ok(msg) => Some(msg),
540 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
541 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
542 return Err(TransportError::Closed);
543 }
544 }
545 } else if messages.is_empty() {
546 match tokio::time::timeout(
548 std::time::Duration::from_millis(self.recv_timeout_ms),
549 rx.recv(),
550 )
551 .await
552 {
553 Ok(Some(msg)) => Some(msg),
554 Ok(None) => return Err(TransportError::Closed),
555 Err(_) => break, }
557 } else {
558 match rx.try_recv() {
560 Ok(msg) => Some(msg),
561 Err(_) => break,
562 }
563 };
564
565 if let Some(msg) = result {
566 messages.push(msg);
567 }
568 }
569
570 if self.filter_engine.has_inbound_filters() {
572 let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
573 messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
574 super::filter::FilterDisposition::Pass => true,
575 super::filter::FilterDisposition::Drop => false,
576 super::filter::FilterDisposition::Dlq => {
577 staged_dlq.push(super::filter::FilteredDlqEntry {
578 payload: msg.payload.clone(),
579 key: msg.key.clone(),
580 reason: "transport filter".to_string(),
581 });
582 false
583 }
584 });
585 if !staged_dlq.is_empty() {
586 self.filtered_dlq_buffer.lock().extend(staged_dlq);
587 }
588 }
589
590 #[cfg(feature = "logger")]
591 if !messages.is_empty() {
592 tracing::debug!(messages = messages.len(), "HTTP transport: batch received");
593 }
594
595 Ok(messages)
596 }
597
598 #[cfg(not(feature = "http-server"))]
599 {
600 let _ = max;
601 Err(TransportError::Config(
602 "HTTP receive requires the 'http-server' feature".into(),
603 ))
604 }
605 }
606
607 fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
608 std::mem::take(&mut *self.filtered_dlq_buffer.lock())
609 }
610
611 async fn commit(&self, _tokens: &[Self::Token]) -> TransportResult<()> {
612 Ok(())
614 }
615}
616
617impl Drop for HttpTransport {
618 fn drop(&mut self) {
619 #[cfg(feature = "http-server")]
620 if let Some(tx) = self.shutdown_tx.take() {
621 let _ = tx.send(());
622 }
623 }
624}
625
626#[cfg(test)]
627mod tests {
628 use super::*;
629
630 #[test]
631 fn http_token_display() {
632 let token = HttpToken::new(42);
633 assert_eq!(format!("{token}"), "http:42");
634 }
635
636 #[test]
637 fn http_token_display_with_source() {
638 let token = HttpToken::with_source(7, "192.168.1.1:54321".to_string());
639 assert_eq!(format!("{token}"), "http:192.168.1.1:54321:7");
640 }
641
642 #[test]
643 fn config_defaults() {
644 let config = HttpTransportConfig::default();
645 assert!(config.endpoint.is_none());
646 assert!(config.listen.is_none());
647 assert_eq!(config.recv_path, "/ingest");
648 assert_eq!(config.recv_buffer_size, 10_000);
649 assert_eq!(config.recv_timeout_ms, 100);
650 }
651
652 #[test]
653 fn config_sender_helper() {
654 let config = HttpTransportConfig::sender("http://localhost:8080/ingest");
655 assert_eq!(
656 config.endpoint.as_deref(),
657 Some("http://localhost:8080/ingest")
658 );
659 assert!(config.listen.is_none());
660 }
661
662 #[test]
663 fn config_receiver_helper() {
664 let config = HttpTransportConfig::receiver("0.0.0.0:8080");
665 assert!(config.endpoint.is_none());
666 assert_eq!(config.listen.as_deref(), Some("0.0.0.0:8080"));
667 }
668
669 #[tokio::test]
670 async fn send_only_transport() {
671 let config = HttpTransportConfig::default();
673 let transport = HttpTransport::new(&config).await.unwrap();
674
675 assert!(transport.is_healthy());
676 assert_eq!(transport.name(), "http");
677
678 let result = transport.send("test", b"payload").await;
680 assert!(result.is_fatal());
681
682 transport.commit(&[]).await.unwrap();
684 }
685
686 #[tokio::test]
687 async fn close_prevents_send() {
688 let config = HttpTransportConfig::sender("http://localhost:19999/test");
689 let transport = HttpTransport::new(&config).await.unwrap();
690
691 transport.close().await.unwrap();
692 assert!(!transport.is_healthy());
693
694 let result = transport.send("test", b"data").await;
695 assert!(result.is_fatal());
696 }
697
698 #[tokio::test]
699 async fn close_prevents_recv() {
700 let config = HttpTransportConfig::default();
701 let transport = HttpTransport::new(&config).await.unwrap();
702
703 transport.close().await.unwrap();
704 let result = transport.recv(1).await;
705 assert!(result.is_err());
706 }
707
708 #[cfg(feature = "http-server")]
711 #[tokio::test]
712 async fn send_and_receive_roundtrip() {
713 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
715 let addr = listener.local_addr().unwrap();
716 drop(listener); let recv_config = HttpTransportConfig {
719 listen: Some(addr.to_string()),
720 recv_path: "/ingest".to_string(),
721 recv_buffer_size: 100,
722 recv_timeout_ms: 1000,
723 ..Default::default()
724 };
725 let receiver = HttpTransport::new(&recv_config).await.unwrap();
726
727 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
729
730 let send_config =
732 HttpTransportConfig::sender(&format!("http://127.0.0.1:{}/ingest", addr.port()));
733 let sender = HttpTransport::new(&send_config).await.unwrap();
734
735 let result = sender.send("", b"{\"msg\":\"hello\"}").await;
736 assert!(result.is_ok(), "send failed: {result:?}");
737
738 let messages = receiver.recv(10).await.unwrap();
740 assert_eq!(messages.len(), 1);
741 assert_eq!(messages[0].payload, b"{\"msg\":\"hello\"}");
742 assert!(messages[0].token.source_addr.is_some());
743
744 sender.close().await.unwrap();
746 receiver.close().await.unwrap();
747 }
748
749 #[cfg(feature = "http-server")]
751 #[tokio::test]
752 async fn receive_rejects_empty_body() {
753 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
754 let addr = listener.local_addr().unwrap();
755 drop(listener);
756
757 let recv_config = HttpTransportConfig {
758 listen: Some(addr.to_string()),
759 recv_timeout_ms: 200,
760 ..Default::default()
761 };
762 let receiver = HttpTransport::new(&recv_config).await.unwrap();
763 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
764
765 let client = reqwest::Client::new();
767 let resp = client
768 .post(format!("http://127.0.0.1:{}/ingest", addr.port()))
769 .body(Vec::<u8>::new())
770 .send()
771 .await
772 .unwrap();
773
774 assert_eq!(resp.status(), reqwest::StatusCode::BAD_REQUEST);
775
776 let messages = receiver.recv(10).await.unwrap();
778 assert!(messages.is_empty());
779
780 receiver.close().await.unwrap();
781 }
782
783 #[cfg(feature = "http-server")]
785 #[tokio::test]
786 async fn recv_without_listen_returns_error() {
787 let config = HttpTransportConfig::sender("http://localhost:9999");
788 let transport = HttpTransport::new(&config).await.unwrap();
789
790 let result = transport.recv(10).await;
791 assert!(result.is_err());
792 }
793
794 #[test]
795 fn config_serde_roundtrip() {
796 let config = HttpTransportConfig {
797 endpoint: Some("http://example.com/ingest".into()),
798 listen: Some("0.0.0.0:8080".into()),
799 recv_path: "/custom".into(),
800 recv_buffer_size: 5000,
801 recv_timeout_ms: 250,
802 ..Default::default()
803 };
804
805 let json = serde_json::to_string(&config).unwrap();
806 let parsed: HttpTransportConfig = serde_json::from_str(&json).unwrap();
807
808 assert_eq!(parsed.endpoint, config.endpoint);
809 assert_eq!(parsed.listen, config.listen);
810 assert_eq!(parsed.recv_path, config.recv_path);
811 assert_eq!(parsed.recv_buffer_size, config.recv_buffer_size);
812 assert_eq!(parsed.recv_timeout_ms, config.recv_timeout_ms);
813 }
814}