1mod buffer;
86mod cancel;
87use harrow_codec_h1 as codec;
88mod connection;
89mod h1;
90mod h2;
91pub mod kernel_check;
93mod o11y;
94mod protocol;
95
96use std::cell::Cell;
97use std::error::Error;
98use std::future::Future;
99use std::io;
100use std::net::SocketAddr;
101use std::rc::Rc;
102use std::sync::atomic::{AtomicBool, Ordering};
103use std::sync::{Arc, mpsc};
104use std::thread;
105use std::time::Duration;
106
107use monoio::net::{ListenerOpts, TcpListener};
108
109use harrow_core::dispatch::SharedState;
110use harrow_core::route::App;
111
112use connection::ProtocolVersion;
113
/// Boxed error that is `Send + Sync`, so worker threads can hand it back through
/// their `JoinHandle`s and through the startup channel.
type BoxError = Box<dyn Error + Send + Sync>;
115
/// Tuning knobs shared by every server entry point.
///
/// Used by the multi-threaded entry points ([`run`], [`start`] and their
/// `_with_config` variants) and by the single-runtime async entry points
/// ([`serve`], [`serve_with_shutdown`], [`serve_with_config`]). Start from
/// [`ServerConfig::default`] and override individual fields with struct-update
/// syntax.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServerConfig {
    /// Upper bound on concurrently handled connections; sockets accepted beyond
    /// it are closed immediately. The multi-threaded entry points split this
    /// budget across workers, rounding up, so the effective total can slightly
    /// exceed this value. Each worker always gets at least one slot.
    pub max_connections: usize,
    /// HTTP/2 stream limit forwarded to the connection handler (by name, the
    /// maximum concurrent streams per connection).
    pub max_h2_streams: u32,
    /// Number of worker threads for [`run`]/[`start`].
    ///
    /// `None` uses [`std::thread::available_parallelism`], falling back to 1.
    /// `Some(0)` is rejected. The async `serve*` functions run on a single
    /// runtime and reject values greater than 1.
    pub workers: Option<usize>,
    /// Request-header read timeout forwarded to the connection handler
    /// (`None` presumably disables it).
    pub header_read_timeout: Option<Duration>,
    /// Request-body read timeout forwarded to the connection handler
    /// (`None` presumably disables it).
    pub body_read_timeout: Option<Duration>,
    /// Connection timeout forwarded to the connection handler
    /// (`None` presumably disables it).
    pub connection_timeout: Option<Duration>,
    /// After shutdown is requested, how long the accept loop waits for active
    /// connections to reach zero before returning anyway.
    pub drain_timeout: Duration,
    /// When `true`, every accepted connection is handled as HTTP/2 with prior
    /// knowledge instead of HTTP/1.1.
    pub enable_http2: bool,
}
144
145impl Default for ServerConfig {
146 fn default() -> Self {
147 Self {
148 max_connections: 8192,
149 max_h2_streams: 256,
150 workers: None,
151 header_read_timeout: Some(Duration::from_secs(5)),
152 body_read_timeout: Some(Duration::from_secs(30)),
153 connection_timeout: Some(Duration::from_secs(300)),
154 drain_timeout: Duration::from_secs(30),
155 enable_http2: false,
156 }
157 }
158}
159
/// Handle to a server started with [`start`] or [`start_with_config`].
///
/// Call [`ServerHandle::shutdown`] to stop the server, or [`ServerHandle::wait`]
/// to block until it stops; both report worker errors. Dropping the handle
/// also requests shutdown and joins the workers, but discards their results.
pub struct ServerHandle {
    /// Listening address reported by the first worker.
    addr: SocketAddr,
    /// Stop flag shared with every worker; workers poll it while serving.
    shutdown: Arc<AtomicBool>,
    /// Receives a worker's serve result when that worker finishes serving.
    completion: mpsc::Receiver<Result<(), String>>,
    /// Worker threads, each returning its serve result when joined.
    workers: Vec<thread::JoinHandle<Result<(), BoxError>>>,
}
169
170impl ServerHandle {
171 pub fn local_addr(&self) -> SocketAddr {
173 self.addr
174 }
175
176 pub fn shutdown(mut self) -> Result<(), Box<dyn Error>> {
178 self.shutdown.store(true, Ordering::Release);
179 self.join_workers().map_err(into_public_error)
180 }
181
182 pub fn wait(mut self) -> Result<(), Box<dyn Error>> {
187 let _ = self.completion.recv();
188 self.shutdown.store(true, Ordering::Release);
189 self.join_workers().map_err(into_public_error)
190 }
191
192 fn join_workers(&mut self) -> Result<(), BoxError> {
193 let mut first_error: Option<BoxError> = None;
194
195 for worker in self.workers.drain(..) {
196 match worker.join() {
197 Ok(Ok(())) => {}
198 Ok(Err(err)) => {
199 if first_error.is_none() {
200 self.shutdown.store(true, Ordering::Release);
201 first_error = Some(err);
202 }
203 }
204 Err(panic) => {
205 if first_error.is_none() {
206 self.shutdown.store(true, Ordering::Release);
207 first_error = Some(join_panic_error(panic));
208 }
209 }
210 }
211 }
212
213 if let Some(err) = first_error {
214 Err(err)
215 } else {
216 Ok(())
217 }
218 }
219}
220
221impl Drop for ServerHandle {
222 fn drop(&mut self) {
223 self.shutdown.store(true, Ordering::Release);
224 for worker in self.workers.drain(..) {
225 let _ = worker.join();
226 }
227 }
228}
229
230pub fn run<F>(make_app: F, addr: SocketAddr) -> Result<(), Box<dyn Error>>
232where
233 F: Fn() -> App + Send + Clone + 'static,
234{
235 run_with_config(make_app, addr, ServerConfig::default())
236}
237
238pub fn run_with_config<F>(
241 make_app: F,
242 addr: SocketAddr,
243 config: ServerConfig,
244) -> Result<(), Box<dyn Error>>
245where
246 F: Fn() -> App + Send + Clone + 'static,
247{
248 start_with_config(make_app, addr, config)?.wait()
249}
250
251pub fn start<F>(make_app: F, addr: SocketAddr) -> Result<ServerHandle, Box<dyn Error>>
254where
255 F: Fn() -> App + Send + Clone + 'static,
256{
257 start_with_config(make_app, addr, ServerConfig::default())
258}
259
260pub fn start_with_config<F>(
263 make_app: F,
264 addr: SocketAddr,
265 config: ServerConfig,
266) -> Result<ServerHandle, Box<dyn Error>>
267where
268 F: Fn() -> App + Send + Clone + 'static,
269{
270 if let Err(err) = kernel_check::check_kernel_version() {
272 return Err(Box::new(err));
273 }
274
275 let worker_count = resolved_worker_count(config.workers)?;
276 let worker_config = per_worker_config(config, worker_count);
277 let shutdown = Arc::new(AtomicBool::new(false));
278 let mut workers = Vec::with_capacity(worker_count);
279
280 let (completion_tx, completion_rx) = mpsc::channel();
281 let first_worker = spawn_worker(
282 make_app.clone(),
283 addr,
284 worker_config,
285 Arc::clone(&shutdown),
286 completion_tx.clone(),
287 true,
288 );
289 let bound_addr = match first_worker.startup.recv_timeout(Duration::from_secs(5)) {
290 Ok(Ok(bound_addr)) => bound_addr,
291 Ok(Err(err)) => {
292 shutdown.store(true, Ordering::Release);
293 let mut handle = ServerHandle {
294 addr,
295 shutdown,
296 completion: completion_rx,
297 workers: vec![first_worker.handle],
298 };
299 let _ = handle.join_workers();
300 return Err(into_public_error(err));
301 }
302 Err(err) => {
303 shutdown.store(true, Ordering::Release);
304 let mut handle = ServerHandle {
305 addr,
306 shutdown,
307 completion: completion_rx,
308 workers: vec![first_worker.handle],
309 };
310 let _ = handle.join_workers();
311 return Err(Box::new(io::Error::new(
312 io::ErrorKind::TimedOut,
313 format!("worker startup failed before reporting a bound address: {err}"),
314 )));
315 }
316 };
317 workers.push(first_worker.handle);
318
319 for _ in 1..worker_count {
320 let worker = spawn_worker(
321 make_app.clone(),
322 bound_addr,
323 worker_config,
324 Arc::clone(&shutdown),
325 completion_tx.clone(),
326 false,
327 );
328 match worker.startup.recv_timeout(Duration::from_secs(5)) {
329 Ok(Ok(_)) => workers.push(worker.handle),
330 Ok(Err(err)) => {
331 shutdown.store(true, Ordering::Release);
332 workers.push(worker.handle);
333 let mut handle = ServerHandle {
334 addr: bound_addr,
335 shutdown,
336 completion: completion_rx,
337 workers,
338 };
339 let _ = handle.join_workers();
340 return Err(into_public_error(err));
341 }
342 Err(err) => {
343 shutdown.store(true, Ordering::Release);
344 workers.push(worker.handle);
345 let mut handle = ServerHandle {
346 addr: bound_addr,
347 shutdown,
348 completion: completion_rx,
349 workers,
350 };
351 let _ = handle.join_workers();
352 return Err(Box::new(io::Error::new(
353 io::ErrorKind::TimedOut,
354 format!("worker startup timed out: {err}"),
355 )));
356 }
357 }
358 }
359
360 o11y::record_server_start(bound_addr, &config);
361
362 Ok(ServerHandle {
363 addr: bound_addr,
364 shutdown,
365 completion: completion_rx,
366 workers,
367 })
368}
369
370pub async fn serve(app: App, addr: SocketAddr) -> Result<(), Box<dyn std::error::Error>> {
387 serve_with_config(
388 app,
389 addr,
390 futures_util::future::pending(),
391 ServerConfig::default(),
392 )
393 .await
394}
395
396pub async fn serve_with_shutdown(
398 app: App,
399 addr: SocketAddr,
400 shutdown: impl Future<Output = ()>,
401) -> Result<(), Box<dyn std::error::Error>> {
402 serve_with_config(app, addr, shutdown, ServerConfig::default()).await
403}
404
405pub async fn serve_with_config(
415 app: App,
416 addr: SocketAddr,
417 shutdown: impl Future<Output = ()>,
418 config: ServerConfig,
419) -> Result<(), Box<dyn std::error::Error>> {
420 if config.workers.is_some_and(|workers| workers > 1) {
421 return Err(Box::new(io::Error::new(
422 io::ErrorKind::InvalidInput,
423 "ServerConfig::workers > 1 requires harrow_server_monoio::run/start; async serve_with_config runs on a single monoio runtime",
424 )));
425 }
426
427 if let Err(e) = kernel_check::check_kernel_version() {
429 return Err(Box::new(e));
430 }
431
432 let shared = app.into_shared_state();
433
434 shared.route_table.print_routes();
435
436 let listener = TcpListener::bind_with_config(addr, &listener_options())?;
437 o11y::record_server_start(addr, &config);
438
439 serve_listener(shared, listener, shutdown, config)
440 .await
441 .map_err(into_public_error)
442}
443
444async fn serve_listener(
445 shared: Arc<SharedState>,
446 listener: TcpListener,
447 shutdown: impl Future<Output = ()>,
448 config: ServerConfig,
449) -> Result<(), BoxError> {
450 let active_count: Rc<Cell<usize>> = Rc::new(Cell::new(0));
451 let protocol = if config.enable_http2 {
452 ProtocolVersion::Http2PriorKnowledge
453 } else {
454 ProtocolVersion::Http11
455 };
456
457 let mut shutdown = std::pin::pin!(shutdown);
458
459 loop {
461 monoio::select! {
462 result = listener.accept() => {
463 let (stream, remote) = match result {
464 Ok(conn) => conn,
465 Err(e) => {
466 o11y::record_accept_error(e);
467 continue;
468 }
469 };
470
471 if let Err(e) = stream.set_nodelay(true) {
473 o11y::record_tcp_nodelay_error(e);
474 }
475
476 if active_count.get() >= config.max_connections {
477 drop(stream);
478 o11y::record_connection_limit_rejected(config.max_connections);
479 continue;
480 }
481
482 let shared = Arc::clone(&shared);
483 let header_read_timeout = config.header_read_timeout;
484 let body_read_timeout = config.body_read_timeout;
485 let connection_timeout = config.connection_timeout;
486 let max_h2_streams = config.max_h2_streams;
487 let counter = Rc::clone(&active_count);
488
489 monoio::spawn(connection::handle_connection(
490 stream,
491 connection::ConnConfig {
492 shared,
493 remote_addr: Some(remote),
494 header_read_timeout,
495 body_read_timeout,
496 connection_timeout,
497 max_h2_streams,
498 active_count: counter,
499 protocol,
500 },
501 ));
502 }
503 () = &mut shutdown => {
504 o11y::record_server_shutdown();
505 break;
506 }
507 }
508 }
509
510 let drain_start = std::time::Instant::now();
512 while active_count.get() > 0 {
513 if drain_start.elapsed() >= config.drain_timeout {
514 o11y::record_drain_timeout(config.drain_timeout.as_secs(), active_count.get());
515 break;
516 }
517 monoio::time::sleep(Duration::from_millis(10)).await;
518 }
519
520 o11y::record_drain_complete(active_count.get());
521
522 Ok(())
523}
524
525async fn wait_for_shutdown(shutdown: Arc<AtomicBool>) {
526 while !shutdown.load(Ordering::Acquire) {
527 monoio::time::sleep(Duration::from_millis(50)).await;
528 }
529}
530
531fn listener_options() -> ListenerOpts {
532 ListenerOpts::new().reuse_port(true).reuse_addr(true)
533}
534
/// Turns the configured worker count into a concrete one.
///
/// `None` means "one per available CPU" (falling back to 1 if that cannot be
/// determined). An explicit `Some(0)` is rejected with
/// `io::ErrorKind::InvalidInput`.
fn resolved_worker_count(workers: Option<usize>) -> Result<usize, Box<dyn Error>> {
    let Some(requested) = workers else {
        let detected = thread::available_parallelism().map_or(1, |count| count.get());
        return Ok(detected);
    };

    if requested == 0 {
        return Err(Box::new(io::Error::new(
            io::ErrorKind::InvalidInput,
            "ServerConfig::workers must be greater than 0",
        )));
    }

    Ok(requested)
}
547
548fn per_worker_config(config: ServerConfig, workers: usize) -> ServerConfig {
549 let per_worker_max = config.max_connections.div_ceil(workers.max(1));
550 ServerConfig {
551 max_connections: per_worker_max.max(1),
552 workers: Some(1),
553 ..config
554 }
555}
556
557fn into_public_error(err: BoxError) -> Box<dyn Error> {
558 err
559}
560
561fn join_panic_error(panic: Box<dyn std::any::Any + Send + 'static>) -> BoxError {
562 let message = if let Some(message) = panic.downcast_ref::<&str>() {
563 format!("worker thread panicked: {message}")
564 } else if let Some(message) = panic.downcast_ref::<String>() {
565 format!("worker thread panicked: {message}")
566 } else {
567 "worker thread panicked".to_string()
568 };
569
570 Box::new(io::Error::other(message))
571}
572
/// A freshly spawned worker thread plus the channel on which it reports startup.
struct WorkerThread {
    /// Join handle yielding the worker's final serve result.
    handle: thread::JoinHandle<Result<(), BoxError>>,
    /// Receives at most one message: the bound local address, or the startup
    /// error. It disconnects without a message if the worker dies first.
    startup: mpsc::Receiver<Result<SocketAddr, BoxError>>,
}
577
578fn spawn_worker<F>(
579 make_app: F,
580 addr: SocketAddr,
581 config: ServerConfig,
582 shutdown: Arc<AtomicBool>,
583 completion: mpsc::Sender<Result<(), String>>,
584 print_routes: bool,
585) -> WorkerThread
586where
587 F: Fn() -> App + Send + 'static,
588{
589 let (startup_tx, startup_rx) = mpsc::channel::<Result<SocketAddr, BoxError>>();
590 let handle = thread::spawn(move || {
591 let app = make_app();
592 let shared = app.into_shared_state();
593 if print_routes {
594 shared.route_table.print_routes();
595 }
596
597 let mut runtime = match monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
598 .enable_timer()
599 .build()
600 {
601 Ok(runtime) => runtime,
602 Err(err) => {
603 let err: BoxError = Box::new(err);
604 let _ = startup_tx.send(Err(Box::new(io::Error::other(err.to_string()))));
605 return Err(err);
606 }
607 };
608
609 let result = runtime.block_on(async move {
610 let listener = match TcpListener::bind_with_config(addr, &listener_options()) {
611 Ok(listener) => listener,
612 Err(err) => {
613 let err: BoxError = Box::new(err);
614 let _ = startup_tx.send(Err(Box::new(io::Error::other(err.to_string()))));
615 return Err(err);
616 }
617 };
618
619 let local_addr = match listener.local_addr() {
620 Ok(local_addr) => local_addr,
621 Err(err) => {
622 let err: BoxError = Box::new(err);
623 let _ = startup_tx.send(Err(Box::new(io::Error::other(err.to_string()))));
624 return Err(err);
625 }
626 };
627
628 let _ = startup_tx.send(Ok(local_addr));
629 serve_listener(shared, listener, wait_for_shutdown(shutdown), config).await
630 });
631
632 let _ = completion.send(result.as_ref().map(|_| ()).map_err(|err| err.to_string()));
633 result
634 });
635
636 WorkerThread {
637 handle,
638 startup: startup_rx,
639 }
640}