dynamo_runtime/pipeline/network/egress/
http_router.rs1use super::unified_client::{Headers, RequestPlaneClient};
7use crate::Result;
8use async_trait::async_trait;
9use bytes::Bytes;
10use std::sync::Arc;
11use std::time::Duration;
12
13const DEFAULT_HTTP_REQUEST_TIMEOUT_SECS: u64 = 5;
15
16const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 1024; const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 1000; const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 100; const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90; const DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL_SECS: u64 = 30; const DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT_SECS: u64 = 10; const DEFAULT_HTTP2_ADAPTIVE_WINDOW: bool = true; #[derive(Debug, Clone)]
27pub struct Http2Config {
28 pub max_frame_size: u32,
29 pub max_concurrent_streams: u32,
30 pub pool_max_idle_per_host: usize,
31 pub pool_idle_timeout: Duration,
32 pub keep_alive_interval: Duration,
33 pub keep_alive_timeout: Duration,
34 pub adaptive_window: bool,
35 pub request_timeout: Duration,
36}
37
38impl Default for Http2Config {
39 fn default() -> Self {
40 Self {
41 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
42 max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS,
43 pool_max_idle_per_host: DEFAULT_POOL_MAX_IDLE_PER_HOST,
44 pool_idle_timeout: Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS),
45 keep_alive_interval: Duration::from_secs(DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL_SECS),
46 keep_alive_timeout: Duration::from_secs(DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT_SECS),
47 adaptive_window: DEFAULT_HTTP2_ADAPTIVE_WINDOW,
48 request_timeout: Duration::from_secs(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS),
49 }
50 }
51}
52
53impl Http2Config {
54 pub fn from_env() -> Self {
56 let mut config = Self::default();
57
58 if let Ok(val) = std::env::var("DYN_HTTP2_MAX_FRAME_SIZE")
59 && let Ok(size) = val.parse::<u32>()
60 {
61 config.max_frame_size = size;
62 }
63
64 if let Ok(val) = std::env::var("DYN_HTTP2_MAX_CONCURRENT_STREAMS")
65 && let Ok(streams) = val.parse::<u32>()
66 {
67 config.max_concurrent_streams = streams;
68 }
69
70 if let Ok(val) = std::env::var("DYN_HTTP2_POOL_MAX_IDLE_PER_HOST")
71 && let Ok(pool_size) = val.parse::<usize>()
72 {
73 config.pool_max_idle_per_host = pool_size;
74 }
75
76 if let Ok(val) = std::env::var("DYN_HTTP2_POOL_IDLE_TIMEOUT_SECS")
77 && let Ok(timeout) = val.parse::<u64>()
78 {
79 config.pool_idle_timeout = Duration::from_secs(timeout);
80 }
81
82 if let Ok(val) = std::env::var("DYN_HTTP2_KEEP_ALIVE_INTERVAL_SECS")
83 && let Ok(interval) = val.parse::<u64>()
84 {
85 config.keep_alive_interval = Duration::from_secs(interval);
86 }
87
88 if let Ok(val) = std::env::var("DYN_HTTP2_KEEP_ALIVE_TIMEOUT_SECS")
89 && let Ok(timeout) = val.parse::<u64>()
90 {
91 config.keep_alive_timeout = Duration::from_secs(timeout);
92 }
93
94 if let Ok(val) = std::env::var("DYN_HTTP2_ADAPTIVE_WINDOW") {
95 config.adaptive_window = val.parse().unwrap_or(DEFAULT_HTTP2_ADAPTIVE_WINDOW);
96 }
97
98 if let Ok(val) = std::env::var("DYN_HTTP_REQUEST_TIMEOUT")
99 && let Ok(timeout) = val.parse::<u64>()
100 {
101 config.request_timeout = Duration::from_secs(timeout);
102 }
103
104 config
105 }
106}
107
108pub struct HttpRequestClient {
110 client: reqwest::Client,
111 config: Http2Config,
112}
113
114impl HttpRequestClient {
115 pub fn new() -> Result<Self> {
117 Self::with_config(Http2Config::default())
118 }
119
120 pub fn with_timeout(timeout: Duration) -> Result<Self> {
123 let config = Http2Config {
124 request_timeout: timeout,
125 ..Http2Config::default()
126 };
127 Self::with_config(config)
128 }
129
130 pub fn with_config(config: Http2Config) -> Result<Self> {
135 let builder = reqwest::Client::builder()
136 .pool_max_idle_per_host(config.pool_max_idle_per_host)
137 .pool_idle_timeout(config.pool_idle_timeout)
138 .timeout(config.request_timeout);
139 let client = builder.build()?;
142
143 Ok(Self { client, config })
144 }
145
146 pub fn from_env() -> Result<Self> {
148 Self::with_config(Http2Config::from_env())
149 }
150
151 pub fn config(&self) -> &Http2Config {
153 &self.config
154 }
155}
156
157impl Default for HttpRequestClient {
158 fn default() -> Self {
159 Self::new().expect("Failed to create HTTP request client")
160 }
161}
162
163#[async_trait]
164impl RequestPlaneClient for HttpRequestClient {
165 async fn send_request(
166 &self,
167 address: String,
168 payload: Bytes,
169 headers: Headers,
170 ) -> Result<Bytes> {
171 let mut req = self
172 .client
173 .post(&address)
174 .header("Content-Type", "application/octet-stream")
175 .body(payload);
176
177 for (key, value) in headers {
179 req = req.header(key, value);
180 }
181
182 let response = req.send().await.map_err(|e| {
183 anyhow::anyhow!(
184 crate::error::DynamoError::builder()
185 .error_type(crate::error::ErrorType::CannotConnect)
186 .message(format!("HTTP request to {address} failed"))
187 .cause(e)
188 .build()
189 )
190 })?;
191
192 if !response.status().is_success() {
193 anyhow::bail!(
194 "HTTP request failed with status {}: {}",
195 response.status(),
196 response.text().await.unwrap_or_default()
197 );
198 }
199
200 let body = response.bytes().await?;
201 Ok(body)
202 }
203
204 fn transport_name(&self) -> &'static str {
205 "http2"
206 }
207
208 fn is_healthy(&self) -> bool {
209 true
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217 use axum::{Router, body::Bytes as AxumBytes, extract::State as AxumState, routing::post};
218 use std::sync::Arc;
219 use tokio::sync::Mutex as TokioMutex;
220
221 #[test]
222 fn test_http_client_creation() {
223 let client = HttpRequestClient::new();
224 assert!(client.is_ok());
225 }
226
227 #[test]
228 fn test_http_client_with_custom_timeout() {
229 let client = HttpRequestClient::with_timeout(Duration::from_secs(10));
230 assert!(client.is_ok());
231 assert_eq!(
232 client.unwrap().config.request_timeout,
233 Duration::from_secs(10)
234 );
235 }
236
237 #[test]
238 fn test_http2_config_from_env() {
239 unsafe {
241 std::env::set_var("DYN_HTTP2_MAX_FRAME_SIZE", "2097152"); std::env::set_var("DYN_HTTP2_MAX_CONCURRENT_STREAMS", "2000");
243 std::env::set_var("DYN_HTTP2_POOL_MAX_IDLE_PER_HOST", "200");
244 std::env::set_var("DYN_HTTP2_KEEP_ALIVE_INTERVAL_SECS", "60");
245 std::env::set_var("DYN_HTTP2_ADAPTIVE_WINDOW", "false");
246 }
247
248 let config = Http2Config::from_env();
249
250 assert_eq!(config.max_frame_size, 2097152);
251 assert_eq!(config.max_concurrent_streams, 2000);
252 assert_eq!(config.pool_max_idle_per_host, 200);
253 assert_eq!(config.keep_alive_interval, Duration::from_secs(60));
254 assert!(!config.adaptive_window);
255
256 unsafe {
258 std::env::remove_var("DYN_HTTP2_MAX_FRAME_SIZE");
259 std::env::remove_var("DYN_HTTP2_MAX_CONCURRENT_STREAMS");
260 std::env::remove_var("DYN_HTTP2_POOL_MAX_IDLE_PER_HOST");
261 std::env::remove_var("DYN_HTTP2_KEEP_ALIVE_INTERVAL_SECS");
262 std::env::remove_var("DYN_HTTP2_ADAPTIVE_WINDOW");
263 }
264 }
265
266 #[test]
267 fn test_http_client_with_custom_config() {
268 let config = Http2Config {
269 max_frame_size: 512 * 1024, max_concurrent_streams: 500,
271 pool_max_idle_per_host: 75,
272 pool_idle_timeout: Duration::from_secs(60),
273 keep_alive_interval: Duration::from_secs(45),
274 keep_alive_timeout: Duration::from_secs(15),
275 adaptive_window: false,
276 request_timeout: Duration::from_secs(8),
277 };
278
279 let client = HttpRequestClient::with_config(config.clone());
280 assert!(client.is_ok());
281
282 let client = client.unwrap();
283 assert_eq!(client.config.max_frame_size, 512 * 1024);
284 assert_eq!(client.config.max_concurrent_streams, 500);
285 assert_eq!(client.config.pool_max_idle_per_host, 75);
286 assert_eq!(client.config.request_timeout, Duration::from_secs(8));
287 }
288
289 #[tokio::test]
290 async fn test_http_client_send_request_invalid_url() {
291 let client = HttpRequestClient::new().unwrap();
292 let result = client
293 .send_request(
294 "http://invalid-host-that-does-not-exist:9999/test".to_string(),
295 Bytes::from("test"),
296 std::collections::HashMap::new(),
297 )
298 .await;
299 assert!(result.is_err());
300 }
301
302 #[tokio::test]
303 async fn test_http2_client_server_integration() {
304 use hyper_util::rt::{TokioExecutor, TokioIo};
305 use hyper_util::server::conn::auto::Builder as ConnBuilder;
306 use hyper_util::service::TowerToHyperService;
307
308 #[derive(Clone)]
310 struct TestState {
311 received: Arc<TokioMutex<Vec<Bytes>>>,
312 protocol_version: Arc<TokioMutex<Option<String>>>,
313 }
314
315 async fn test_handler(
316 AxumState(state): AxumState<TestState>,
317 body: AxumBytes,
318 ) -> &'static str {
319 state.received.lock().await.push(body);
320 "OK"
321 }
322
323 let state = TestState {
324 received: Arc::new(TokioMutex::new(Vec::new())),
325 protocol_version: Arc::new(TokioMutex::new(None)),
326 };
327
328 let app = Router::new()
329 .route("/test", post(test_handler))
330 .with_state(state.clone());
331
332 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
334 let addr = listener.local_addr().unwrap();
335
336 let server_handle = tokio::spawn(async move {
338 loop {
339 let Ok((stream, _)) = listener.accept().await else {
340 break;
341 };
342
343 let app = app.clone();
344 tokio::spawn(async move {
345 let conn_builder = ConnBuilder::new(TokioExecutor::new());
346 let io = TokioIo::new(stream);
347 let tower_service = app.into_service();
348 let hyper_service = TowerToHyperService::new(tower_service);
349
350 let _ = conn_builder.serve_connection(io, hyper_service).await;
351 });
352 }
353 });
354
355 tokio::time::sleep(Duration::from_millis(100)).await;
357
358 let client = HttpRequestClient::new().unwrap();
360
361 let test_data = Bytes::from("test_payload");
363 let result = client
364 .send_request(
365 format!("http://{}/test", addr),
366 test_data.clone(),
367 std::collections::HashMap::new(),
368 )
369 .await;
370
371 assert!(result.is_ok(), "Request failed: {:?}", result.err());
373
374 tokio::time::sleep(Duration::from_millis(100)).await;
376 let received = state.received.lock().await;
377 assert_eq!(received.len(), 1);
378 assert_eq!(received[0], test_data);
379
380 server_handle.abort();
382 }
383
384 #[tokio::test]
385 async fn test_http2_headers_propagation() {
386 use hyper_util::rt::{TokioExecutor, TokioIo};
387 use hyper_util::server::conn::auto::Builder as ConnBuilder;
388 use hyper_util::service::TowerToHyperService;
389
390 #[derive(Clone)]
392 struct HeaderState {
393 headers: Arc<TokioMutex<Vec<(String, String)>>>,
394 }
395
396 async fn header_handler(
397 AxumState(state): AxumState<HeaderState>,
398 headers: axum::http::HeaderMap,
399 ) -> &'static str {
400 let mut captured = state.headers.lock().await;
401 for (name, value) in headers.iter() {
402 if let Ok(val_str) = value.to_str() {
403 captured.push((name.to_string(), val_str.to_string()));
404 }
405 }
406 "OK"
407 }
408
409 let state = HeaderState {
410 headers: Arc::new(TokioMutex::new(Vec::new())),
411 };
412
413 let app = Router::new()
414 .route("/test", post(header_handler))
415 .with_state(state.clone());
416
417 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
419 let addr = listener.local_addr().unwrap();
420
421 let server_handle = tokio::spawn(async move {
423 loop {
424 let Ok((stream, _)) = listener.accept().await else {
425 break;
426 };
427
428 let app = app.clone();
429 tokio::spawn(async move {
430 let conn_builder = ConnBuilder::new(TokioExecutor::new());
431 let io = TokioIo::new(stream);
432 let tower_service = app.into_service();
433 let hyper_service = TowerToHyperService::new(tower_service);
434
435 let _ = conn_builder.serve_connection(io, hyper_service).await;
436 });
437 }
438 });
439
440 tokio::time::sleep(Duration::from_millis(100)).await;
442
443 let client = HttpRequestClient::new().unwrap();
445
446 let mut headers = std::collections::HashMap::new();
448 headers.insert("x-test-header".to_string(), "test-value".to_string());
449 headers.insert("x-request-id".to_string(), "req-123".to_string());
450
451 let result = client
452 .send_request(
453 format!("http://{}/test", addr),
454 Bytes::from("test"),
455 headers,
456 )
457 .await;
458
459 assert!(result.is_ok());
461
462 tokio::time::sleep(Duration::from_millis(100)).await;
464 let received_headers = state.headers.lock().await;
465
466 let header_map: std::collections::HashMap<_, _> = received_headers
467 .iter()
468 .map(|(k, v)| (k.as_str(), v.as_str()))
469 .collect();
470
471 assert!(header_map.contains_key("x-test-header"));
472 assert_eq!(header_map.get("x-test-header"), Some(&"test-value"));
473 assert!(header_map.contains_key("x-request-id"));
474 assert_eq!(header_map.get("x-request-id"), Some(&"req-123"));
475
476 server_handle.abort();
478 }
479
480 #[tokio::test]
481 async fn test_http2_concurrent_requests() {
482 use hyper_util::rt::{TokioExecutor, TokioIo};
483 use hyper_util::server::conn::auto::Builder as ConnBuilder;
484 use hyper_util::service::TowerToHyperService;
485 use std::sync::atomic::{AtomicU64, Ordering};
486
487 #[derive(Clone)]
489 struct CounterState {
490 count: Arc<AtomicU64>,
491 }
492
493 async fn counter_handler(AxumState(state): AxumState<CounterState>) -> String {
494 let count = state.count.fetch_add(1, Ordering::SeqCst);
495 format!("{}", count)
496 }
497
498 let state = CounterState {
499 count: Arc::new(AtomicU64::new(0)),
500 };
501
502 let app = Router::new()
503 .route("/test", post(counter_handler))
504 .with_state(state.clone());
505
506 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
508 let addr = listener.local_addr().unwrap();
509
510 let server_handle = tokio::spawn(async move {
512 loop {
513 let Ok((stream, _)) = listener.accept().await else {
514 break;
515 };
516
517 let app = app.clone();
518 tokio::spawn(async move {
519 let conn_builder = ConnBuilder::new(TokioExecutor::new());
520 let io = TokioIo::new(stream);
521 let tower_service = app.into_service();
522 let hyper_service = TowerToHyperService::new(tower_service);
523
524 let _ = conn_builder.serve_connection(io, hyper_service).await;
525 });
526 }
527 });
528
529 tokio::time::sleep(Duration::from_millis(100)).await;
531
532 let client = Arc::new(HttpRequestClient::new().unwrap());
534
535 let mut handles = vec![];
537 for _ in 0..10 {
538 let client = client.clone();
539 let handle = tokio::spawn(async move {
540 client
541 .send_request(
542 format!("http://{}/test", addr),
543 Bytes::from("test"),
544 std::collections::HashMap::new(),
545 )
546 .await
547 });
548 handles.push(handle);
549 }
550
551 let mut success_count = 0;
553 for handle in handles {
554 if let Ok(Ok(_)) = handle.await {
555 success_count += 1;
556 }
557 }
558
559 assert_eq!(success_count, 10);
561
562 assert_eq!(state.count.load(Ordering::SeqCst), 10);
564
565 server_handle.abort();
567 }
568
569 #[tokio::test]
570 async fn test_http2_performance_benchmark() {
571 use hyper_util::rt::{TokioExecutor, TokioIo};
572 use hyper_util::server::conn::auto::Builder as ConnBuilder;
573 use hyper_util::service::TowerToHyperService;
574 use std::sync::atomic::{AtomicU64, Ordering};
575 use std::time::Instant;
576
577 #[derive(Clone)]
579 struct PerfState {
580 request_count: Arc<AtomicU64>,
581 total_bytes: Arc<AtomicU64>,
582 }
583
584 async fn perf_handler(
585 AxumState(state): AxumState<PerfState>,
586 body: AxumBytes,
587 ) -> &'static str {
588 state.request_count.fetch_add(1, Ordering::Relaxed);
589 state
590 .total_bytes
591 .fetch_add(body.len() as u64, Ordering::Relaxed);
592 "OK"
593 }
594
595 let state = PerfState {
596 request_count: Arc::new(AtomicU64::new(0)),
597 total_bytes: Arc::new(AtomicU64::new(0)),
598 };
599
600 let app = Router::new()
601 .route("/perf", post(perf_handler))
602 .with_state(state.clone());
603
604 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
606 let addr = listener.local_addr().unwrap();
607
608 let server_handle = tokio::spawn(async move {
610 loop {
611 let Ok((stream, _)) = listener.accept().await else {
612 break;
613 };
614
615 let app = app.clone();
616 tokio::spawn(async move {
617 let conn_builder = ConnBuilder::new(TokioExecutor::new());
618 let io = TokioIo::new(stream);
619 let tower_service = app.into_service();
620 let hyper_service = TowerToHyperService::new(tower_service);
621
622 let _ = conn_builder.serve_connection(io, hyper_service).await;
623 });
624 }
625 });
626
627 tokio::time::sleep(Duration::from_millis(100)).await;
629
630 let optimized_config = Http2Config {
632 max_frame_size: 1024 * 1024, max_concurrent_streams: 1000,
634 pool_max_idle_per_host: 100,
635 pool_idle_timeout: Duration::from_secs(90),
636 keep_alive_interval: Duration::from_secs(30),
637 keep_alive_timeout: Duration::from_secs(10),
638 adaptive_window: true,
639 request_timeout: Duration::from_secs(30),
640 };
641
642 let client = Arc::new(HttpRequestClient::with_config(optimized_config).unwrap());
643
644 let num_requests = 100;
646 let payload_size = 64 * 1024; let payload = Bytes::from(vec![0u8; payload_size]);
648
649 let start_time = Instant::now();
650 let mut handles = vec![];
651
652 for _ in 0..num_requests {
653 let client = client.clone();
654 let payload = payload.clone();
655
656 let handle = tokio::spawn(async move {
657 let headers = std::collections::HashMap::new();
658 client
659 .send_request(format!("http://{}/perf", addr), payload, headers)
660 .await
661 });
662 handles.push(handle);
663 }
664
665 let mut successful_requests = 0;
667 for handle in handles {
668 if handle.await.unwrap().is_ok() {
669 successful_requests += 1;
670 }
671 }
672
673 let elapsed = start_time.elapsed();
674 let requests_per_sec = successful_requests as f64 / elapsed.as_secs_f64();
675 let throughput_mbps =
676 (successful_requests * payload_size) as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0);
677
678 println!("Performance Results:");
679 println!(
680 " Successful requests: {}/{}",
681 successful_requests, num_requests
682 );
683 println!(" Total time: {:?}", elapsed);
684 println!(" Requests/sec: {:.2}", requests_per_sec);
685 println!(" Throughput: {:.2} MB/s", throughput_mbps);
686
687 let server_count = state.request_count.load(Ordering::Relaxed);
689 let server_bytes = state.total_bytes.load(Ordering::Relaxed);
690
691 assert_eq!(server_count, successful_requests as u64);
692 assert_eq!(server_bytes, (successful_requests * payload_size) as u64);
693
694 assert!(successful_requests >= num_requests * 95 / 100); assert!(requests_per_sec > 50.0); assert!(throughput_mbps > 10.0); server_handle.abort();
701 }
702}