Skip to main content

dynamo_runtime/pipeline/network/egress/
http_router.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! HTTP/2 client for request plane
5
6use super::unified_client::{Headers, RequestPlaneClient};
7use crate::Result;
8use async_trait::async_trait;
9use bytes::Bytes;
10use std::sync::Arc;
11use std::time::Duration;
12
/// Default timeout for HTTP requests (ack only, not full response)
const DEFAULT_HTTP_REQUEST_TIMEOUT_SECS: u64 = 5;

// HTTP/2 performance configuration defaults.
// Each constant below seeds the `Http2Config` field of the matching name in
// `Http2Config::default()`.

/// Default `max_frame_size`: 1024 * 1024 (1MB frame size for better throughput).
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 1024;
/// Default `max_concurrent_streams` (allow more concurrent streams).
const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = 1000;
/// Default `pool_max_idle_per_host`: idle pooled connections kept per host.
const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 100;
/// Default `pool_idle_timeout` in seconds (keep idle connections alive longer).
const DEFAULT_POOL_IDLE_TIMEOUT_SECS: u64 = 90;
/// Default `keep_alive_interval` in seconds (intended ping interval).
const DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL_SECS: u64 = 30;
/// Default `keep_alive_timeout` in seconds (intended timeout for ping responses).
const DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT_SECS: u64 = 10;
/// Default `adaptive_window` (adaptive flow control enabled).
const DEFAULT_HTTP2_ADAPTIVE_WINDOW: bool = true;
24
/// HTTP/2 Performance Configuration
///
/// Built from the `DEFAULT_*` constants via [`Default`], or from `DYN_HTTP*`
/// environment variables via [`Http2Config::from_env`].
///
/// NOTE(review): `HttpRequestClient::with_config` currently forwards only
/// `pool_max_idle_per_host`, `pool_idle_timeout` and `request_timeout` to the
/// `reqwest` builder; the remaining fields are stored on the client but not
/// applied to the connection.
#[derive(Debug, Clone)]
pub struct Http2Config {
    /// Maximum HTTP/2 frame size (the default is documented as 1MB, so presumably bytes).
    pub max_frame_size: u32,
    /// Maximum number of concurrent HTTP/2 streams.
    pub max_concurrent_streams: u32,
    /// Maximum idle pooled connections per host (passed to `reqwest`).
    pub pool_max_idle_per_host: usize,
    /// How long an idle pooled connection is kept (passed to `reqwest`).
    pub pool_idle_timeout: Duration,
    /// Interval between HTTP/2 keep-alive pings.
    pub keep_alive_interval: Duration,
    /// How long to wait for a keep-alive ping response.
    pub keep_alive_timeout: Duration,
    /// Whether adaptive flow control is requested.
    pub adaptive_window: bool,
    /// Per-request timeout (passed to `reqwest` as the client-wide timeout).
    pub request_timeout: Duration,
}
37
38impl Default for Http2Config {
39    fn default() -> Self {
40        Self {
41            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
42            max_concurrent_streams: DEFAULT_MAX_CONCURRENT_STREAMS,
43            pool_max_idle_per_host: DEFAULT_POOL_MAX_IDLE_PER_HOST,
44            pool_idle_timeout: Duration::from_secs(DEFAULT_POOL_IDLE_TIMEOUT_SECS),
45            keep_alive_interval: Duration::from_secs(DEFAULT_HTTP2_KEEP_ALIVE_INTERVAL_SECS),
46            keep_alive_timeout: Duration::from_secs(DEFAULT_HTTP2_KEEP_ALIVE_TIMEOUT_SECS),
47            adaptive_window: DEFAULT_HTTP2_ADAPTIVE_WINDOW,
48            request_timeout: Duration::from_secs(DEFAULT_HTTP_REQUEST_TIMEOUT_SECS),
49        }
50    }
51}
52
53impl Http2Config {
54    /// Create configuration from environment variables
55    pub fn from_env() -> Self {
56        let mut config = Self::default();
57
58        if let Ok(val) = std::env::var("DYN_HTTP2_MAX_FRAME_SIZE")
59            && let Ok(size) = val.parse::<u32>()
60        {
61            config.max_frame_size = size;
62        }
63
64        if let Ok(val) = std::env::var("DYN_HTTP2_MAX_CONCURRENT_STREAMS")
65            && let Ok(streams) = val.parse::<u32>()
66        {
67            config.max_concurrent_streams = streams;
68        }
69
70        if let Ok(val) = std::env::var("DYN_HTTP2_POOL_MAX_IDLE_PER_HOST")
71            && let Ok(pool_size) = val.parse::<usize>()
72        {
73            config.pool_max_idle_per_host = pool_size;
74        }
75
76        if let Ok(val) = std::env::var("DYN_HTTP2_POOL_IDLE_TIMEOUT_SECS")
77            && let Ok(timeout) = val.parse::<u64>()
78        {
79            config.pool_idle_timeout = Duration::from_secs(timeout);
80        }
81
82        if let Ok(val) = std::env::var("DYN_HTTP2_KEEP_ALIVE_INTERVAL_SECS")
83            && let Ok(interval) = val.parse::<u64>()
84        {
85            config.keep_alive_interval = Duration::from_secs(interval);
86        }
87
88        if let Ok(val) = std::env::var("DYN_HTTP2_KEEP_ALIVE_TIMEOUT_SECS")
89            && let Ok(timeout) = val.parse::<u64>()
90        {
91            config.keep_alive_timeout = Duration::from_secs(timeout);
92        }
93
94        if let Ok(val) = std::env::var("DYN_HTTP2_ADAPTIVE_WINDOW") {
95            config.adaptive_window = val.parse().unwrap_or(DEFAULT_HTTP2_ADAPTIVE_WINDOW);
96        }
97
98        if let Ok(val) = std::env::var("DYN_HTTP_REQUEST_TIMEOUT")
99            && let Ok(timeout) = val.parse::<u64>()
100        {
101            config.request_timeout = Duration::from_secs(timeout);
102        }
103
104        config
105    }
106}
107
/// HTTP/2 request plane client
///
/// Pairs a `reqwest::Client` with the [`Http2Config`] it was constructed from;
/// the configuration is retained so callers can inspect it via `config()`.
pub struct HttpRequestClient {
    /// Underlying HTTP client used to send every request.
    client: reqwest::Client,
    /// Configuration supplied at construction time.
    config: Http2Config,
}
113
114impl HttpRequestClient {
115    /// Create a new HTTP request client with HTTP/2 and default configuration
116    pub fn new() -> Result<Self> {
117        Self::with_config(Http2Config::default())
118    }
119
120    /// Create a new HTTP request client with custom timeout (legacy method)
121    /// Uses HTTP/2 with prior knowledge to avoid ALPN negotiation overhead
122    pub fn with_timeout(timeout: Duration) -> Result<Self> {
123        let config = Http2Config {
124            request_timeout: timeout,
125            ..Http2Config::default()
126        };
127        Self::with_config(config)
128    }
129
130    /// Create a new HTTP request client with basic configuration
131    ///
132    /// Note: Advanced HTTP/2 configuration methods may not be available in all versions of reqwest.
133    /// This implementation uses only the stable, widely-supported configuration options.
134    pub fn with_config(config: Http2Config) -> Result<Self> {
135        let builder = reqwest::Client::builder()
136            .pool_max_idle_per_host(config.pool_max_idle_per_host)
137            .pool_idle_timeout(config.pool_idle_timeout)
138            .timeout(config.request_timeout);
139        // HTTP/2 is automatically negotiated by reqwest when available
140
141        let client = builder.build()?;
142
143        Ok(Self { client, config })
144    }
145
146    /// Create from environment configuration
147    pub fn from_env() -> Result<Self> {
148        Self::with_config(Http2Config::from_env())
149    }
150
151    /// Get the current HTTP/2 configuration
152    pub fn config(&self) -> &Http2Config {
153        &self.config
154    }
155}
156
157impl Default for HttpRequestClient {
158    fn default() -> Self {
159        Self::new().expect("Failed to create HTTP request client")
160    }
161}
162
163#[async_trait]
164impl RequestPlaneClient for HttpRequestClient {
165    async fn send_request(
166        &self,
167        address: String,
168        payload: Bytes,
169        headers: Headers,
170    ) -> Result<Bytes> {
171        let mut req = self
172            .client
173            .post(&address)
174            .header("Content-Type", "application/octet-stream")
175            .body(payload);
176
177        // Add custom headers
178        for (key, value) in headers {
179            req = req.header(key, value);
180        }
181
182        let response = req.send().await.map_err(|e| {
183            anyhow::anyhow!(
184                crate::error::DynamoError::builder()
185                    .error_type(crate::error::ErrorType::CannotConnect)
186                    .message(format!("HTTP request to {address} failed"))
187                    .cause(e)
188                    .build()
189            )
190        })?;
191
192        if !response.status().is_success() {
193            anyhow::bail!(
194                "HTTP request failed with status {}: {}",
195                response.status(),
196                response.text().await.unwrap_or_default()
197            );
198        }
199
200        let body = response.bytes().await?;
201        Ok(body)
202    }
203
204    fn transport_name(&self) -> &'static str {
205        "http2"
206    }
207
208    fn is_healthy(&self) -> bool {
209        // HTTP client is stateless and always healthy if created successfully
210        true
211    }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use axum::{Router, body::Bytes as AxumBytes, extract::State as AxumState, routing::post};
    use std::sync::Arc;
    use tokio::sync::Mutex as TokioMutex;

    /// Address of a running test server plus the handle of its accept loop.
    type TestServer = (std::net::SocketAddr, tokio::task::JoinHandle<()>);

    /// Serves `app` on an ephemeral localhost port and returns its address and
    /// the accept-loop task (abort it to stop accepting).
    ///
    /// The listener is bound before this function returns, so callers may
    /// connect immediately; no start-up sleep is needed.
    ///
    /// NOTE(review): the client under test is built without any `http2_*`
    /// option and the URLs are plain `http://`, so these tests presumably run
    /// over HTTP/1.1 even though the server side accepts both — confirm if
    /// HTTP/2 coverage is actually required.
    async fn spawn_test_server(app: Router) -> TestServer {
        use hyper_util::rt::{TokioExecutor, TokioIo};
        use hyper_util::server::conn::auto::Builder as ConnBuilder;
        use hyper_util::service::TowerToHyperService;

        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        let accept_loop = tokio::spawn(async move {
            while let Ok((stream, _)) = listener.accept().await {
                let app = app.clone();
                tokio::spawn(async move {
                    let conn_builder = ConnBuilder::new(TokioExecutor::new());
                    let io = TokioIo::new(stream);
                    let tower_service = app.into_service();
                    let hyper_service = TowerToHyperService::new(tower_service);

                    let _ = conn_builder.serve_connection(io, hyper_service).await;
                });
            }
        });

        (addr, accept_loop)
    }

    /// Sets environment variables on construction and removes them on drop, so
    /// a failing assertion cannot leak them into other tests.
    struct EnvVarGuard {
        names: Vec<&'static str>,
    }

    impl EnvVarGuard {
        fn set(vars: &[(&'static str, &'static str)]) -> Self {
            for &(name, value) in vars {
                // SAFETY: mutating the environment is only sound while no other
                // thread reads or writes it concurrently. Within this module
                // only `test_http2_config_from_env` touches these `DYN_HTTP*`
                // variables. NOTE(review): the test harness is multi-threaded;
                // serialize env-mutating tests if other modules read them.
                unsafe { std::env::set_var(name, value) };
            }
            Self {
                names: vars.iter().map(|&(name, _)| name).collect(),
            }
        }
    }

    impl Drop for EnvVarGuard {
        fn drop(&mut self) {
            for &name in &self.names {
                // SAFETY: same conditions as in `EnvVarGuard::set`.
                unsafe { std::env::remove_var(name) };
            }
        }
    }

    #[test]
    fn test_http_client_creation() {
        let client = HttpRequestClient::new();
        assert!(client.is_ok());
    }

    #[test]
    fn test_http_client_with_custom_timeout() {
        let client = HttpRequestClient::with_timeout(Duration::from_secs(10));
        assert!(client.is_ok());
        assert_eq!(
            client.unwrap().config.request_timeout,
            Duration::from_secs(10)
        );
    }

    #[test]
    fn test_http2_config_from_env() {
        // Variables are removed again when `_env` is dropped, even on panic.
        let _env = EnvVarGuard::set(&[
            ("DYN_HTTP2_MAX_FRAME_SIZE", "2097152"), // 2MB
            ("DYN_HTTP2_MAX_CONCURRENT_STREAMS", "2000"),
            ("DYN_HTTP2_POOL_MAX_IDLE_PER_HOST", "200"),
            ("DYN_HTTP2_POOL_IDLE_TIMEOUT_SECS", "120"),
            ("DYN_HTTP2_KEEP_ALIVE_INTERVAL_SECS", "60"),
            ("DYN_HTTP2_KEEP_ALIVE_TIMEOUT_SECS", "20"),
            ("DYN_HTTP2_ADAPTIVE_WINDOW", "false"),
            ("DYN_HTTP_REQUEST_TIMEOUT", "12"),
        ]);

        let config = Http2Config::from_env();

        assert_eq!(config.max_frame_size, 2097152);
        assert_eq!(config.max_concurrent_streams, 2000);
        assert_eq!(config.pool_max_idle_per_host, 200);
        assert_eq!(config.pool_idle_timeout, Duration::from_secs(120));
        assert_eq!(config.keep_alive_interval, Duration::from_secs(60));
        assert_eq!(config.keep_alive_timeout, Duration::from_secs(20));
        assert!(!config.adaptive_window);
        assert_eq!(config.request_timeout, Duration::from_secs(12));
    }

    #[test]
    fn test_http_client_with_custom_config() {
        let config = Http2Config {
            max_frame_size: 512 * 1024, // 512KB
            max_concurrent_streams: 500,
            pool_max_idle_per_host: 75,
            pool_idle_timeout: Duration::from_secs(60),
            keep_alive_interval: Duration::from_secs(45),
            keep_alive_timeout: Duration::from_secs(15),
            adaptive_window: false,
            request_timeout: Duration::from_secs(8),
        };

        let client = HttpRequestClient::with_config(config);
        assert!(client.is_ok());

        // The client must hand back exactly the configuration it was given.
        let client = client.unwrap();
        let stored = client.config();
        assert_eq!(stored.max_frame_size, 512 * 1024);
        assert_eq!(stored.max_concurrent_streams, 500);
        assert_eq!(stored.pool_max_idle_per_host, 75);
        assert_eq!(stored.pool_idle_timeout, Duration::from_secs(60));
        assert_eq!(stored.keep_alive_interval, Duration::from_secs(45));
        assert_eq!(stored.keep_alive_timeout, Duration::from_secs(15));
        assert!(!stored.adaptive_window);
        assert_eq!(stored.request_timeout, Duration::from_secs(8));
    }

    #[tokio::test]
    async fn test_http_client_send_request_invalid_url() {
        let client = HttpRequestClient::new().unwrap();
        // `.invalid` is a reserved TLD that is guaranteed never to resolve, so
        // the outcome does not depend on the local resolver or search domains.
        let result = client
            .send_request(
                "http://invalid-host-that-does-not-exist.invalid:9999/test".to_string(),
                Bytes::from("test"),
                std::collections::HashMap::new(),
            )
            .await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_http2_client_server_integration() {
        // Test server state: every request body received, in order.
        #[derive(Clone)]
        struct TestState {
            received: Arc<TokioMutex<Vec<Bytes>>>,
        }

        async fn test_handler(
            AxumState(state): AxumState<TestState>,
            body: AxumBytes,
        ) -> &'static str {
            state.received.lock().await.push(body);
            "OK"
        }

        let state = TestState {
            received: Arc::new(TokioMutex::new(Vec::new())),
        };

        let app = Router::new()
            .route("/test", post(test_handler))
            .with_state(state.clone());

        let (addr, server_handle) = spawn_test_server(app).await;

        // Create client with the default configuration
        let client = HttpRequestClient::new().unwrap();

        // Send request
        let test_data = Bytes::from("test_payload");
        let result = client
            .send_request(
                format!("http://{addr}/test"),
                test_data.clone(),
                std::collections::HashMap::new(),
            )
            .await;

        // Verify request succeeded
        assert!(result.is_ok(), "Request failed: {:?}", result.err());

        // The handler records the body before it responds, so the data is
        // already visible once the response has been received.
        let received = state.received.lock().await;
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], test_data);

        // Cleanup
        server_handle.abort();
    }

    #[tokio::test]
    async fn test_http2_headers_propagation() {
        // Test server state: every (name, value) header pair received.
        #[derive(Clone)]
        struct HeaderState {
            headers: Arc<TokioMutex<Vec<(String, String)>>>,
        }

        async fn header_handler(
            AxumState(state): AxumState<HeaderState>,
            headers: axum::http::HeaderMap,
        ) -> &'static str {
            let mut captured = state.headers.lock().await;
            for (name, value) in headers.iter() {
                if let Ok(val_str) = value.to_str() {
                    captured.push((name.to_string(), val_str.to_string()));
                }
            }
            "OK"
        }

        let state = HeaderState {
            headers: Arc::new(TokioMutex::new(Vec::new())),
        };

        let app = Router::new()
            .route("/test", post(header_handler))
            .with_state(state.clone());

        let (addr, server_handle) = spawn_test_server(app).await;

        let client = HttpRequestClient::new().unwrap();

        // Send request with custom headers
        let mut headers = std::collections::HashMap::new();
        headers.insert("x-test-header".to_string(), "test-value".to_string());
        headers.insert("x-request-id".to_string(), "req-123".to_string());

        let result = client
            .send_request(format!("http://{addr}/test"), Bytes::from("test"), headers)
            .await;

        // Verify request succeeded
        assert!(result.is_ok(), "Request failed: {:?}", result.err());

        // Verify headers were received
        let received_headers = state.headers.lock().await;

        let header_map: std::collections::HashMap<_, _> = received_headers
            .iter()
            .map(|(k, v)| (k.as_str(), v.as_str()))
            .collect();

        assert_eq!(header_map.get("x-test-header"), Some(&"test-value"));
        assert_eq!(header_map.get("x-request-id"), Some(&"req-123"));

        // Cleanup
        server_handle.abort();
    }

    #[tokio::test]
    async fn test_http2_concurrent_requests() {
        use std::sync::atomic::{AtomicU64, Ordering};

        // Test server state: number of requests handled.
        #[derive(Clone)]
        struct CounterState {
            count: Arc<AtomicU64>,
        }

        async fn counter_handler(AxumState(state): AxumState<CounterState>) -> String {
            let count = state.count.fetch_add(1, Ordering::SeqCst);
            count.to_string()
        }

        let state = CounterState {
            count: Arc::new(AtomicU64::new(0)),
        };

        let app = Router::new()
            .route("/test", post(counter_handler))
            .with_state(state.clone());

        let (addr, server_handle) = spawn_test_server(app).await;

        let client = Arc::new(HttpRequestClient::new().unwrap());

        // Send multiple concurrent requests
        let mut handles = vec![];
        for _ in 0..10 {
            let client = client.clone();
            let handle = tokio::spawn(async move {
                client
                    .send_request(
                        format!("http://{addr}/test"),
                        Bytes::from("test"),
                        std::collections::HashMap::new(),
                    )
                    .await
            });
            handles.push(handle);
        }

        // Wait for all requests to complete
        let mut success_count = 0;
        for handle in handles {
            if let Ok(Ok(_)) = handle.await {
                success_count += 1;
            }
        }

        // Verify all requests succeeded
        assert_eq!(success_count, 10);

        // Verify server received all requests
        assert_eq!(state.count.load(Ordering::SeqCst), 10);

        // Cleanup
        server_handle.abort();
    }

    #[tokio::test]
    async fn test_http2_performance_benchmark() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::time::Instant;

        // Test server state: request and byte counters.
        #[derive(Clone)]
        struct PerfState {
            request_count: Arc<AtomicU64>,
            total_bytes: Arc<AtomicU64>,
        }

        async fn perf_handler(
            AxumState(state): AxumState<PerfState>,
            body: AxumBytes,
        ) -> &'static str {
            state.request_count.fetch_add(1, Ordering::Relaxed);
            state
                .total_bytes
                .fetch_add(body.len() as u64, Ordering::Relaxed);
            "OK"
        }

        let state = PerfState {
            request_count: Arc::new(AtomicU64::new(0)),
            total_bytes: Arc::new(AtomicU64::new(0)),
        };

        let app = Router::new()
            .route("/perf", post(perf_handler))
            .with_state(state.clone());

        let (addr, server_handle) = spawn_test_server(app).await;

        // Same values as the defaults, except for a longer request timeout so
        // that a slow machine does not fail individual requests.
        let optimized_config = Http2Config {
            max_frame_size: 1024 * 1024, // 1MB frames
            max_concurrent_streams: 1000,
            pool_max_idle_per_host: 100,
            pool_idle_timeout: Duration::from_secs(90),
            keep_alive_interval: Duration::from_secs(30),
            keep_alive_timeout: Duration::from_secs(10),
            adaptive_window: true,
            request_timeout: Duration::from_secs(30),
        };

        let client = Arc::new(HttpRequestClient::with_config(optimized_config).unwrap());

        // Performance test: Send many concurrent requests
        let num_requests = 100;
        let payload_size = 64 * 1024; // 64KB payload
        let payload = Bytes::from(vec![0u8; payload_size]);

        let start_time = Instant::now();
        let mut handles = vec![];

        for _ in 0..num_requests {
            let client = client.clone();
            let payload = payload.clone();

            let handle = tokio::spawn(async move {
                let headers = std::collections::HashMap::new();
                client
                    .send_request(format!("http://{addr}/perf"), payload, headers)
                    .await
            });
            handles.push(handle);
        }

        // Wait for all requests to complete
        let mut successful_requests = 0;
        for handle in handles {
            if handle.await.unwrap().is_ok() {
                successful_requests += 1;
            }
        }

        let elapsed = start_time.elapsed();
        let requests_per_sec = successful_requests as f64 / elapsed.as_secs_f64();
        let throughput_mbps =
            (successful_requests * payload_size) as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0);

        println!("Performance Results:");
        println!(
            "  Successful requests: {}/{}",
            successful_requests, num_requests
        );
        println!("  Total time: {:?}", elapsed);
        println!("  Requests/sec: {:.2}", requests_per_sec);
        println!("  Throughput: {:.2} MB/s", throughput_mbps);

        // Verify server received all requests
        let server_count = state.request_count.load(Ordering::Relaxed);
        let server_bytes = state.total_bytes.load(Ordering::Relaxed);

        assert_eq!(server_count, successful_requests as u64);
        assert_eq!(server_bytes, (successful_requests * payload_size) as u64);

        // Performance assertions (adjust based on your requirements)
        assert!(
            successful_requests >= num_requests * 95 / 100,
            "success rate below 95%: {successful_requests}/{num_requests}"
        );
        assert!(
            requests_per_sec > 50.0,
            "throughput too low: {requests_per_sec:.2} requests/sec"
        );
        assert!(
            throughput_mbps > 10.0,
            "throughput too low: {throughput_mbps:.2} MB/s"
        );

        // Cleanup
        server_handle.abort();
    }
}