Skip to main content

harrow_server_monoio/
lib.rs

1//! Monoio-based HTTP/1.1 and HTTP/2 server for Harrow.
2//!
3//! This crate provides a high-performance HTTP server using io_uring.
4//! It supports HTTP/1.1 with keep-alive and chunked transfer encoding,
5//! and HTTP/2 with multiplexed streams.
6//!
7//! # Features
8//!
9//! - **io_uring-based I/O**: Zero-copy where possible, minimal syscalls
10//! - **Cancellation Safety**: Proper handling of io_uring operation cancellation
11//! - **Buffer Pooling**: Reusable buffers to reduce allocator pressure
12//! - **HTTP/2 Support**: Multiplexed streams with flow control
13//!
14//! # Architecture
15//!
16//! ```text
17//! ┌─────────────────────────────────────────────┐
18//! │              Server (lib.rs)                │
19//! └─────────────────────┬───────────────────────┘
20//!                       │ TcpStream
21//!                       ▼
22//! ┌─────────────────────────────────────────────┐
23//! │           Connection Handler                │
24//! │            (connection.rs)                  │
25//! └─────────────────────┬───────────────────────┘
26//!                       │
27//!           ┌───────────┴───────────┐
28//!           ▼                       ▼
29//! ┌─────────────────┐   ┌─────────────────────┐
30//! │   H1 Handler    │   │   H2 Handler        │
31//! │(h1/dispatcher.rs)│  │    (h2.rs)          │
32//! └─────────────────┘   └─────────────────────┘
33//! ```
34//!
35//! # Example
36//!
37//! ```ignore
38//! fn main() {
39//!     // High-level thread-per-core bootstrap.
40//!     harrow_server_monoio::run(
41//!         || App::new().get("/hello", hello),
42//!         "127.0.0.1:3000".parse().unwrap(),
43//!     )
44//!     .unwrap();
45//! }
46//! ```
47//!
48//! For advanced cases where you already own a monoio runtime, use the async
49//! `serve` / `serve_with_shutdown` / `serve_with_config` entrypoints instead.
50//!
51//! # Cancellation Safety
52//!
53//! This crate uses io_uring for async I/O. Unlike epoll-based runtimes,
54//! io_uring submits actual kernel operations. Dropping a Rust future does
55//! NOT automatically cancel the in-flight kernel operation.
56//!
57//! This can lead to use-after-free (UAF) vulnerabilities:
58//! 1. A read operation is submitted with a user buffer
59//! 2. The future is dropped (e.g., due to timeout)
60//! 3. The kernel writes to the buffer after it's been freed/reused
61//!
62//! ## Mitigation
63//!
64//! All I/O operations with timeout paths use `CancelableAsyncReadRent` and
65//! explicitly cancel kernel operations before returning:
66//!
67//! ```rust,ignore
//! let canceller = Canceller::new();
//! let handle = canceller.handle();
//! // Keep the read future in a binding so it can still be awaited after the
//! // timeout branch wins the select.
//! let mut read_fut = std::pin::pin!(stream.cancelable_read(buf, handle));
//!
//! monoio::select! {
//!     result = &mut read_fut => result,
//!     _ = timeout => {
//!         canceller.cancel(); // Explicit kernel cancellation
//!         // Await the operation to reclaim buffer
//!         let (_, buf) = read_fut.await;
//!         release_buffer(buf);
//!         return Err(Timeout);
//!     }
//! }
81//! ```
82//!
83//! See `cancel.rs` for the implementation details.
84
85mod buffer;
86mod cancel;
87use harrow_codec_h1 as codec;
88mod connection;
89mod h1;
90mod h2;
91/// Kernel version and io_uring availability checks.
92pub mod kernel_check;
93mod o11y;
94mod protocol;
95
96use std::cell::Cell;
97use std::error::Error;
98use std::future::Future;
99use std::io;
100use std::net::SocketAddr;
101use std::rc::Rc;
102use std::sync::atomic::{AtomicBool, Ordering};
103use std::sync::{Arc, mpsc};
104use std::thread;
105use std::time::Duration;
106
107use monoio::net::{ListenerOpts, TcpListener};
108
109use harrow_core::dispatch::SharedState;
110use harrow_core::route::App;
111
112use connection::ProtocolVersion;
113
/// Boxed error that can cross thread boundaries; used for worker results
/// before they are widened to the public `Box<dyn Error>`.
type BoxError = Box<dyn Error + Send + Sync>;
115
/// Tunable settings for the monoio server.
///
/// Every field is public, so a configuration is usually built from
/// `ServerConfig::default()` with struct-update syntax overriding the
/// fields of interest.
#[derive(Debug, Clone, Copy)]
pub struct ServerConfig {
    /// Upper bound on concurrently open connections. Default: 8192.
    pub max_connections: usize,
    /// Upper bound on concurrent HTTP/2 streams per connection. Default: 256.
    pub max_h2_streams: u32,
    /// Number of worker threads used by the high-level `run` / `start` APIs.
    ///
    /// `None` falls back to `std::thread::available_parallelism()`. The async
    /// `serve*` entrypoints run on a single monoio runtime and reject any
    /// value greater than 1.
    pub workers: Option<usize>,
    /// Time allowed for reading the HTTP headers of a new connection.
    /// Default: Some(5s).
    pub header_read_timeout: Option<Duration>,
    /// Time allowed for reading a request body once its headers are complete.
    /// Default: Some(30s).
    pub body_read_timeout: Option<Duration>,
    /// Maximum lifetime of a single connection. Default: Some(5 min).
    pub connection_timeout: Option<Duration>,
    /// How long shutdown waits for in-flight requests to finish. Default: 30s.
    pub drain_timeout: Duration,
    /// Turn on HTTP/2 with prior knowledge. Default: false.
    ///
    /// When set, every connection is treated as HTTP/2 from the first byte
    /// (no protocol negotiation). This suits internal services or load
    /// balancers that route H2 traffic to dedicated ports.
    pub enable_http2: bool,
}
144
145impl Default for ServerConfig {
146    fn default() -> Self {
147        Self {
148            max_connections: 8192,
149            max_h2_streams: 256,
150            workers: None,
151            header_read_timeout: Some(Duration::from_secs(5)),
152            body_read_timeout: Some(Duration::from_secs(30)),
153            connection_timeout: Some(Duration::from_secs(300)),
154            drain_timeout: Duration::from_secs(30),
155            enable_http2: false,
156        }
157    }
158}
159
160/// Handle returned by the high-level monoio bootstrap APIs.
161///
162/// Dropping the handle signals shutdown and waits for all workers to exit.
163pub struct ServerHandle {
164    addr: SocketAddr,
165    shutdown: Arc<AtomicBool>,
166    completion: mpsc::Receiver<Result<(), String>>,
167    workers: Vec<thread::JoinHandle<Result<(), BoxError>>>,
168}
169
170impl ServerHandle {
171    /// The socket address the server bound to.
172    pub fn local_addr(&self) -> SocketAddr {
173        self.addr
174    }
175
176    /// Signal shutdown and wait for all workers to exit.
177    pub fn shutdown(mut self) -> Result<(), Box<dyn Error>> {
178        self.shutdown.store(true, Ordering::Release);
179        self.join_workers().map_err(into_public_error)
180    }
181
182    /// Wait for the workers to exit.
183    ///
184    /// This blocks until shutdown is triggered or a worker exits with an
185    /// unexpected error.
186    pub fn wait(mut self) -> Result<(), Box<dyn Error>> {
187        let _ = self.completion.recv();
188        self.shutdown.store(true, Ordering::Release);
189        self.join_workers().map_err(into_public_error)
190    }
191
192    fn join_workers(&mut self) -> Result<(), BoxError> {
193        let mut first_error: Option<BoxError> = None;
194
195        for worker in self.workers.drain(..) {
196            match worker.join() {
197                Ok(Ok(())) => {}
198                Ok(Err(err)) => {
199                    if first_error.is_none() {
200                        self.shutdown.store(true, Ordering::Release);
201                        first_error = Some(err);
202                    }
203                }
204                Err(panic) => {
205                    if first_error.is_none() {
206                        self.shutdown.store(true, Ordering::Release);
207                        first_error = Some(join_panic_error(panic));
208                    }
209                }
210            }
211        }
212
213        if let Some(err) = first_error {
214            Err(err)
215        } else {
216            Ok(())
217        }
218    }
219}
220
221impl Drop for ServerHandle {
222    fn drop(&mut self) {
223        self.shutdown.store(true, Ordering::Release);
224        for worker in self.workers.drain(..) {
225            let _ = worker.join();
226        }
227    }
228}
229
230/// Start the application using Harrow's thread-per-core monoio bootstrap.
231pub fn run<F>(make_app: F, addr: SocketAddr) -> Result<(), Box<dyn Error>>
232where
233    F: Fn() -> App + Send + Clone + 'static,
234{
235    run_with_config(make_app, addr, ServerConfig::default())
236}
237
238/// Start the application using Harrow's thread-per-core monoio bootstrap and
239/// block until shutdown.
240pub fn run_with_config<F>(
241    make_app: F,
242    addr: SocketAddr,
243    config: ServerConfig,
244) -> Result<(), Box<dyn Error>>
245where
246    F: Fn() -> App + Send + Clone + 'static,
247{
248    start_with_config(make_app, addr, config)?.wait()
249}
250
251/// Start the application using Harrow's thread-per-core monoio bootstrap and
252/// return a handle for shutdown / test control.
253pub fn start<F>(make_app: F, addr: SocketAddr) -> Result<ServerHandle, Box<dyn Error>>
254where
255    F: Fn() -> App + Send + Clone + 'static,
256{
257    start_with_config(make_app, addr, ServerConfig::default())
258}
259
260/// Start the application using Harrow's thread-per-core monoio bootstrap and
261/// return a handle for shutdown / test control.
262pub fn start_with_config<F>(
263    make_app: F,
264    addr: SocketAddr,
265    config: ServerConfig,
266) -> Result<ServerHandle, Box<dyn Error>>
267where
268    F: Fn() -> App + Send + Clone + 'static,
269{
270    // Fail fast on unsupported kernels.
271    if let Err(err) = kernel_check::check_kernel_version() {
272        return Err(Box::new(err));
273    }
274
275    let worker_count = resolved_worker_count(config.workers)?;
276    let worker_config = per_worker_config(config, worker_count);
277    let shutdown = Arc::new(AtomicBool::new(false));
278    let mut workers = Vec::with_capacity(worker_count);
279
280    let (completion_tx, completion_rx) = mpsc::channel();
281    let first_worker = spawn_worker(
282        make_app.clone(),
283        addr,
284        worker_config,
285        Arc::clone(&shutdown),
286        completion_tx.clone(),
287        true,
288    );
289    let bound_addr = match first_worker.startup.recv_timeout(Duration::from_secs(5)) {
290        Ok(Ok(bound_addr)) => bound_addr,
291        Ok(Err(err)) => {
292            shutdown.store(true, Ordering::Release);
293            let mut handle = ServerHandle {
294                addr,
295                shutdown,
296                completion: completion_rx,
297                workers: vec![first_worker.handle],
298            };
299            let _ = handle.join_workers();
300            return Err(into_public_error(err));
301        }
302        Err(err) => {
303            shutdown.store(true, Ordering::Release);
304            let mut handle = ServerHandle {
305                addr,
306                shutdown,
307                completion: completion_rx,
308                workers: vec![first_worker.handle],
309            };
310            let _ = handle.join_workers();
311            return Err(Box::new(io::Error::new(
312                io::ErrorKind::TimedOut,
313                format!("worker startup failed before reporting a bound address: {err}"),
314            )));
315        }
316    };
317    workers.push(first_worker.handle);
318
319    for _ in 1..worker_count {
320        let worker = spawn_worker(
321            make_app.clone(),
322            bound_addr,
323            worker_config,
324            Arc::clone(&shutdown),
325            completion_tx.clone(),
326            false,
327        );
328        match worker.startup.recv_timeout(Duration::from_secs(5)) {
329            Ok(Ok(_)) => workers.push(worker.handle),
330            Ok(Err(err)) => {
331                shutdown.store(true, Ordering::Release);
332                workers.push(worker.handle);
333                let mut handle = ServerHandle {
334                    addr: bound_addr,
335                    shutdown,
336                    completion: completion_rx,
337                    workers,
338                };
339                let _ = handle.join_workers();
340                return Err(into_public_error(err));
341            }
342            Err(err) => {
343                shutdown.store(true, Ordering::Release);
344                workers.push(worker.handle);
345                let mut handle = ServerHandle {
346                    addr: bound_addr,
347                    shutdown,
348                    completion: completion_rx,
349                    workers,
350                };
351                let _ = handle.join_workers();
352                return Err(Box::new(io::Error::new(
353                    io::ErrorKind::TimedOut,
354                    format!("worker startup timed out: {err}"),
355                )));
356            }
357        }
358    }
359
360    o11y::record_server_start(bound_addr, &config);
361
362    Ok(ServerHandle {
363        addr: bound_addr,
364        shutdown,
365        completion: completion_rx,
366        workers,
367    })
368}
369
370/// Serve the application on the given address using HTTP/1.1.
371///
372/// This is an async function intended to run inside a monoio runtime:
373///
374/// ```ignore
375/// fn main() {
376///     let mut rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
377///         .enable_timer()
378///         .build()
379///         .unwrap();
380///     rt.block_on(async {
381///         let app = App::new().get("/hello", hello);
382///         harrow_server_monoio::serve(app, addr).await.unwrap();
383///     });
384/// }
385/// ```
386pub async fn serve(app: App, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
387    serve_with_config(
388        app,
389        addr,
390        futures_util::future::pending(),
391        ServerConfig::default(),
392    )
393    .await
394}
395
396/// Serve with a graceful shutdown signal.
397pub async fn serve_with_shutdown(
398    app: App,
399    addr: SocketAddr,
400    shutdown: impl Future<Output = ()>,
401) -> Result<(), Box<dyn std::error::Error>> {
402    serve_with_config(app, addr, shutdown, ServerConfig::default()).await
403}
404
405/// Serve with a graceful shutdown signal and custom configuration.
406///
407/// # Requirements
408/// This function requires Linux kernel 6.1+ for full io_uring support.
409/// It will fail fast with a clear error on older kernels.
410///
411/// # HTTP/2 Support
412/// Set `config.enable_http2 = true` to accept HTTP/2 connections with
413/// prior knowledge (direct H2 without upgrade/ALPN).
414pub async fn serve_with_config(
415    app: App,
416    addr: SocketAddr,
417    shutdown: impl Future<Output = ()>,
418    config: ServerConfig,
419) -> Result<(), Box<dyn std::error::Error>> {
420    if config.workers.is_some_and(|workers| workers > 1) {
421        return Err(Box::new(io::Error::new(
422            io::ErrorKind::InvalidInput,
423            "ServerConfig::workers > 1 requires harrow_server_monoio::run/start; async serve_with_config runs on a single monoio runtime",
424        )));
425    }
426
427    // Fail fast on unsupported kernels.
428    if let Err(e) = kernel_check::check_kernel_version() {
429        return Err(Box::new(e));
430    }
431
432    let shared = app.into_shared_state();
433
434    shared.route_table.print_routes();
435
436    let listener = TcpListener::bind_with_config(addr, &listener_options())?;
437    o11y::record_server_start(addr, &config);
438
439    serve_listener(shared, listener, shutdown, config)
440        .await
441        .map_err(into_public_error)
442}
443
444async fn serve_listener(
445    shared: Arc<SharedState>,
446    listener: TcpListener,
447    shutdown: impl Future<Output = ()>,
448    config: ServerConfig,
449) -> Result<(), BoxError> {
450    let active_count: Rc<Cell<usize>> = Rc::new(Cell::new(0));
451    let protocol = if config.enable_http2 {
452        ProtocolVersion::Http2PriorKnowledge
453    } else {
454        ProtocolVersion::Http11
455    };
456
457    let mut shutdown = std::pin::pin!(shutdown);
458
459    // Accept loop with graceful shutdown.
460    loop {
461        monoio::select! {
462            result = listener.accept() => {
463                let (stream, remote) = match result {
464                    Ok(conn) => conn,
465                    Err(e) => {
466                        o11y::record_accept_error(e);
467                        continue;
468                    }
469                };
470
471                // Disable Nagle's algorithm for lower latency.
472                if let Err(e) = stream.set_nodelay(true) {
473                    o11y::record_tcp_nodelay_error(e);
474                }
475
476                if active_count.get() >= config.max_connections {
477                    drop(stream);
478                    o11y::record_connection_limit_rejected(config.max_connections);
479                    continue;
480                }
481
482                let shared = Arc::clone(&shared);
483                let header_read_timeout = config.header_read_timeout;
484                let body_read_timeout = config.body_read_timeout;
485                let connection_timeout = config.connection_timeout;
486                let max_h2_streams = config.max_h2_streams;
487                let counter = Rc::clone(&active_count);
488
489                monoio::spawn(connection::handle_connection(
490                    stream,
491                    connection::ConnConfig {
492                        shared,
493                        remote_addr: Some(remote),
494                        header_read_timeout,
495                        body_read_timeout,
496                        connection_timeout,
497                        max_h2_streams,
498                        active_count: counter,
499                        protocol,
500                    },
501                ));
502            }
503            () = &mut shutdown => {
504                o11y::record_server_shutdown();
505                break;
506            }
507        }
508    }
509
510    // Graceful drain: wait for in-flight connections to complete.
511    let drain_start = std::time::Instant::now();
512    while active_count.get() > 0 {
513        if drain_start.elapsed() >= config.drain_timeout {
514            o11y::record_drain_timeout(config.drain_timeout.as_secs(), active_count.get());
515            break;
516        }
517        monoio::time::sleep(Duration::from_millis(10)).await;
518    }
519
520    o11y::record_drain_complete(active_count.get());
521
522    Ok(())
523}
524
525async fn wait_for_shutdown(shutdown: Arc<AtomicBool>) {
526    while !shutdown.load(Ordering::Acquire) {
527        monoio::time::sleep(Duration::from_millis(50)).await;
528    }
529}
530
531fn listener_options() -> ListenerOpts {
532    ListenerOpts::new().reuse_port(true).reuse_addr(true)
533}
534
/// Turns the optional `workers` setting into a concrete thread count.
///
/// `None` resolves to `std::thread::available_parallelism()`, or 1 when that
/// cannot be determined. An explicit value is used as-is.
///
/// # Errors
/// Returns `InvalidInput` when the caller asked for zero workers.
fn resolved_worker_count(workers: Option<usize>) -> Result<usize, Box<dyn Error>> {
    let Some(requested) = workers else {
        let detected = thread::available_parallelism().map_or(1, |cores| cores.get());
        return Ok(detected);
    };

    if requested == 0 {
        let invalid = io::Error::new(
            io::ErrorKind::InvalidInput,
            "ServerConfig::workers must be greater than 0",
        );
        return Err(Box::new(invalid));
    }

    Ok(requested)
}
547
548fn per_worker_config(config: ServerConfig, workers: usize) -> ServerConfig {
549    let per_worker_max = config.max_connections.div_ceil(workers.max(1));
550    ServerConfig {
551        max_connections: per_worker_max.max(1),
552        workers: Some(1),
553        ..config
554    }
555}
556
557fn into_public_error(err: BoxError) -> Box<dyn Error> {
558    err
559}
560
561fn join_panic_error(panic: Box<dyn std::any::Any + Send + 'static>) -> BoxError {
562    let message = if let Some(message) = panic.downcast_ref::<&str>() {
563        format!("worker thread panicked: {message}")
564    } else if let Some(message) = panic.downcast_ref::<String>() {
565        format!("worker thread panicked: {message}")
566    } else {
567        "worker thread panicked".to_string()
568    };
569
570    Box::new(io::Error::other(message))
571}
572
573struct WorkerThread {
574    handle: thread::JoinHandle<Result<(), BoxError>>,
575    startup: mpsc::Receiver<Result<SocketAddr, BoxError>>,
576}
577
578fn spawn_worker<F>(
579    make_app: F,
580    addr: SocketAddr,
581    config: ServerConfig,
582    shutdown: Arc<AtomicBool>,
583    completion: mpsc::Sender<Result<(), String>>,
584    print_routes: bool,
585) -> WorkerThread
586where
587    F: Fn() -> App + Send + 'static,
588{
589    let (startup_tx, startup_rx) = mpsc::channel::<Result<SocketAddr, BoxError>>();
590    let handle = thread::spawn(move || {
591        let app = make_app();
592        let shared = app.into_shared_state();
593        if print_routes {
594            shared.route_table.print_routes();
595        }
596
597        let mut runtime = match monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
598            .enable_timer()
599            .build()
600        {
601            Ok(runtime) => runtime,
602            Err(err) => {
603                let err: BoxError = Box::new(err);
604                let _ = startup_tx.send(Err(Box::new(io::Error::other(err.to_string()))));
605                return Err(err);
606            }
607        };
608
609        let result = runtime.block_on(async move {
610            let listener = match TcpListener::bind_with_config(addr, &listener_options()) {
611                Ok(listener) => listener,
612                Err(err) => {
613                    let err: BoxError = Box::new(err);
614                    let _ = startup_tx.send(Err(Box::new(io::Error::other(err.to_string()))));
615                    return Err(err);
616                }
617            };
618
619            let local_addr = match listener.local_addr() {
620                Ok(local_addr) => local_addr,
621                Err(err) => {
622                    let err: BoxError = Box::new(err);
623                    let _ = startup_tx.send(Err(Box::new(io::Error::other(err.to_string()))));
624                    return Err(err);
625                }
626            };
627
628            let _ = startup_tx.send(Ok(local_addr));
629            serve_listener(shared, listener, wait_for_shutdown(shutdown), config).await
630        });
631
632        let _ = completion.send(result.as_ref().map(|_| ()).map_err(|err| err.to_string()));
633        result
634    });
635
636    WorkerThread {
637        handle,
638        startup: startup_rx,
639    }
640}