// fastapi_http/server.rs

1//! HTTP server with asupersync integration.
2//!
3//! This module provides a TCP server that uses asupersync for structured
4//! concurrency and cancel-correct request handling.
5//!
6// NOTE: This module is scaffolding for Phase 1 TCP server implementation.
7// Most types are defined but not yet wired into the main application.
8#![allow(dead_code)]
9//!
10//! # Architecture
11//!
12//! The server creates a region hierarchy:
13//!
14//! ```text
15//! Server Region (root)
16//! ├── Connection Region 1
17//! │   ├── Request Task 1 (with Cx, Budget)
18//! │   ├── Request Task 2 (with Cx, Budget)
19//! │   └── ...
20//! ├── Connection Region 2
21//! │   └── ...
22//! └── ...
23//! ```
24//!
25//! Each request runs with its own [`RequestContext`](fastapi_core::RequestContext)
26//! that wraps the asupersync [`Cx`](asupersync::Cx), providing:
27//!
28//! - Cancel-correct request handling via checkpoints
29//! - Budget-based request timeouts
30//! - Structured concurrency for background work
31//!
32//! # Example
33//!
34//! ```ignore
//! use fastapi_http::{ServerConfig, TcpServer};
36//! use fastapi_core::{RequestContext, Request, Response};
37//!
38//! async fn handler(ctx: &RequestContext, req: Request) -> Response {
39//!     Response::ok().body("Hello, World!")
40//! }
41//!
42//! let config = ServerConfig::new("127.0.0.1:8080");
43//! let server = TcpServer::new(config);
44//! server.serve(handler).await?;
45//! ```
46
47use crate::connection::should_keep_alive;
48use crate::expect::{CONTINUE_RESPONSE, ExpectHandler, ExpectResult};
49use crate::parser::{ParseError, ParseLimits, ParseStatus, Parser, StatefulParser};
50use crate::response::{ResponseWrite, ResponseWriter};
51use asupersync::io::{AsyncRead, AsyncWrite, ReadBuf};
52use asupersync::net::{TcpListener, TcpStream};
53use asupersync::runtime::{RuntimeState, SpawnError, TaskHandle};
54use asupersync::signal::{GracefulOutcome, ShutdownController, ShutdownReceiver};
55use asupersync::stream::Stream;
56use asupersync::time::timeout;
57use asupersync::{Budget, Cx, Scope, Time};
58use fastapi_core::app::App;
59use fastapi_core::{Request, RequestContext, Response, StatusCode};
60use std::future::Future;
61use std::io;
62use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
63use std::pin::Pin;
64use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
65use std::sync::{Arc, Mutex, OnceLock};
66use std::task::Poll;
67use std::time::{Duration, Instant};
68
69/// Global start time for computing asupersync Time values.
70/// This is lazily initialized on first use.
71static START_TIME: OnceLock<Instant> = OnceLock::new();
72
73/// Returns the current time as an asupersync Time value.
74///
75/// This uses wall clock time relative to a lazily-initialized start point,
76/// which is compatible with asupersync's standalone timer mechanism.
77fn current_time() -> Time {
78    let start = START_TIME.get_or_init(Instant::now);
79    let now = Instant::now();
80    if now < *start {
81        Time::ZERO
82    } else {
83        let elapsed = now.duration_since(*start);
84        Time::from_nanos(elapsed.as_nanos() as u64)
85    }
86}
87
/// Default request timeout in seconds (per-request handler budget).
pub const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 30;

/// Default read buffer size in bytes (8 KiB per read from the socket).
pub const DEFAULT_READ_BUFFER_SIZE: usize = 8192;

/// Default maximum connections (0 = unlimited).
pub const DEFAULT_MAX_CONNECTIONS: usize = 0;

/// Default keep-alive timeout in seconds (time to wait for next request).
pub const DEFAULT_KEEP_ALIVE_TIMEOUT_SECS: u64 = 75;

/// Default max requests per connection (0 = unlimited).
pub const DEFAULT_MAX_REQUESTS_PER_CONNECTION: usize = 100;

/// Default drain timeout in seconds (time to wait for in-flight requests on shutdown).
pub const DEFAULT_DRAIN_TIMEOUT_SECS: u64 = 30;
105
/// Server configuration for the HTTP/1.1 server.
///
/// Controls bind address, timeouts, connection limits, and HTTP parsing behavior.
/// All timeouts use sensible defaults suitable for production use.
///
/// # Defaults
///
/// | Setting | Default |
/// |---------|---------|
/// | `request_timeout` | 30s |
/// | `max_connections` | 0 (unlimited) |
/// | `read_buffer_size` | 8192 bytes |
/// | `tcp_nodelay` | `true` |
/// | `keep_alive_timeout` | 75s |
/// | `max_requests_per_connection` | 100 |
/// | `drain_timeout` | 30s |
///
/// # Example
///
/// ```ignore
/// use fastapi_http::{ServerConfig, serve_with_config};
///
/// let config = ServerConfig::new("0.0.0.0:8000")
///     .with_request_timeout_secs(60)
///     .with_max_connections(1000)
///     .with_keep_alive_timeout_secs(120);
/// ```
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Address to bind to (host:port string, resolved at serve time).
    pub bind_addr: String,
    /// Default request timeout.
    ///
    /// NOTE(review): this is an asupersync [`Time`], while `keep_alive_timeout`
    /// and `drain_timeout` use `std::time::Duration` — confirm whether the
    /// mixed types are intentional.
    pub request_timeout: Time,
    /// Maximum connections (0 = unlimited).
    pub max_connections: usize,
    /// Read buffer size in bytes for socket reads.
    pub read_buffer_size: usize,
    /// HTTP parse limits.
    pub parse_limits: ParseLimits,
    /// Allowed hostnames for Host header validation (empty = allow all).
    /// Patterns are stored lowercased for allocation-free matching.
    pub allowed_hosts: Vec<String>,
    /// Whether to trust X-Forwarded-Host for host validation.
    pub trust_x_forwarded_host: bool,
    /// Enable TCP_NODELAY.
    pub tcp_nodelay: bool,
    /// Keep-alive timeout (time to wait for next request on a connection).
    /// Set to 0 to disable keep-alive timeout.
    pub keep_alive_timeout: Duration,
    /// Maximum requests per connection (0 = unlimited).
    pub max_requests_per_connection: usize,
    /// Drain timeout (time to wait for in-flight requests on shutdown).
    /// After this timeout, connections are forcefully closed.
    pub drain_timeout: Duration,
}
160
161impl ServerConfig {
162    /// Creates a new server configuration with the given bind address.
163    #[must_use]
164    pub fn new(bind_addr: impl Into<String>) -> Self {
165        Self {
166            bind_addr: bind_addr.into(),
167            request_timeout: Time::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS),
168            max_connections: DEFAULT_MAX_CONNECTIONS,
169            read_buffer_size: DEFAULT_READ_BUFFER_SIZE,
170            parse_limits: ParseLimits::default(),
171            allowed_hosts: Vec::new(),
172            trust_x_forwarded_host: false,
173            tcp_nodelay: true,
174            keep_alive_timeout: Duration::from_secs(DEFAULT_KEEP_ALIVE_TIMEOUT_SECS),
175            max_requests_per_connection: DEFAULT_MAX_REQUESTS_PER_CONNECTION,
176            drain_timeout: Duration::from_secs(DEFAULT_DRAIN_TIMEOUT_SECS),
177        }
178    }
179
180    /// Sets the request timeout.
181    #[must_use]
182    pub fn with_request_timeout(mut self, timeout: Time) -> Self {
183        self.request_timeout = timeout;
184        self
185    }
186
187    /// Sets the request timeout in seconds.
188    #[must_use]
189    pub fn with_request_timeout_secs(mut self, secs: u64) -> Self {
190        self.request_timeout = Time::from_secs(secs);
191        self
192    }
193
194    /// Sets the maximum number of connections.
195    #[must_use]
196    pub fn with_max_connections(mut self, max: usize) -> Self {
197        self.max_connections = max;
198        self
199    }
200
201    /// Sets the read buffer size.
202    #[must_use]
203    pub fn with_read_buffer_size(mut self, size: usize) -> Self {
204        self.read_buffer_size = size;
205        self
206    }
207
208    /// Sets the HTTP parse limits.
209    #[must_use]
210    pub fn with_parse_limits(mut self, limits: ParseLimits) -> Self {
211        self.parse_limits = limits;
212        self
213    }
214
215    /// Sets allowed hosts for Host header validation.
216    ///
217    /// An empty list means "allow any host".
218    /// Patterns are normalized to lowercase for case-insensitive matching.
219    #[must_use]
220    pub fn with_allowed_hosts<I, S>(mut self, hosts: I) -> Self
221    where
222        I: IntoIterator<Item = S>,
223        S: Into<String>,
224    {
225        // Pre-lowercase patterns to avoid allocation during matching
226        self.allowed_hosts = hosts
227            .into_iter()
228            .map(|s| s.into().to_ascii_lowercase())
229            .collect();
230        self
231    }
232
233    /// Adds a single allowed host.
234    ///
235    /// The pattern is normalized to lowercase for case-insensitive matching.
236    #[must_use]
237    pub fn allow_host(mut self, host: impl Into<String>) -> Self {
238        // Pre-lowercase pattern to avoid allocation during matching
239        self.allowed_hosts.push(host.into().to_ascii_lowercase());
240        self
241    }
242
243    /// Enables or disables trust of X-Forwarded-Host.
244    #[must_use]
245    pub fn with_trust_x_forwarded_host(mut self, trust: bool) -> Self {
246        self.trust_x_forwarded_host = trust;
247        self
248    }
249
250    /// Enables or disables TCP_NODELAY.
251    #[must_use]
252    pub fn with_tcp_nodelay(mut self, enabled: bool) -> Self {
253        self.tcp_nodelay = enabled;
254        self
255    }
256
257    /// Sets the keep-alive timeout.
258    ///
259    /// This is the time to wait for another request on a keep-alive connection
260    /// before closing it. Set to Duration::ZERO to disable keep-alive timeout.
261    #[must_use]
262    pub fn with_keep_alive_timeout(mut self, timeout: Duration) -> Self {
263        self.keep_alive_timeout = timeout;
264        self
265    }
266
267    /// Sets the keep-alive timeout in seconds.
268    #[must_use]
269    pub fn with_keep_alive_timeout_secs(mut self, secs: u64) -> Self {
270        self.keep_alive_timeout = Duration::from_secs(secs);
271        self
272    }
273
274    /// Sets the maximum requests per connection.
275    ///
276    /// Set to 0 for unlimited requests per connection.
277    #[must_use]
278    pub fn with_max_requests_per_connection(mut self, max: usize) -> Self {
279        self.max_requests_per_connection = max;
280        self
281    }
282
283    /// Sets the drain timeout.
284    ///
285    /// This is the time to wait for in-flight requests to complete during
286    /// shutdown. After this timeout, connections are forcefully closed.
287    #[must_use]
288    pub fn with_drain_timeout(mut self, timeout: Duration) -> Self {
289        self.drain_timeout = timeout;
290        self
291    }
292
293    /// Sets the drain timeout in seconds.
294    #[must_use]
295    pub fn with_drain_timeout_secs(mut self, secs: u64) -> Self {
296        self.drain_timeout = Duration::from_secs(secs);
297        self
298    }
299}
300
301impl Default for ServerConfig {
302    fn default() -> Self {
303        Self::new("127.0.0.1:8080")
304    }
305}
306
/// HTTP server error.
///
/// Returned by connection- and request-processing paths. `Io` and `Parse`
/// wrap their underlying causes, which are exposed via `Error::source`.
#[derive(Debug)]
pub enum ServerError {
    /// IO error from the underlying socket.
    Io(io::Error),
    /// HTTP parse error from the request parser.
    Parse(ParseError),
    /// Server was shut down.
    Shutdown,
    /// Connection limit reached.
    ConnectionLimitReached,
    /// Keep-alive timeout expired (idle connection).
    KeepAliveTimeout,
}
321
322impl std::fmt::Display for ServerError {
323    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
324        match self {
325            Self::Io(e) => write!(f, "IO error: {e}"),
326            Self::Parse(e) => write!(f, "Parse error: {e}"),
327            Self::Shutdown => write!(f, "Server shutdown"),
328            Self::ConnectionLimitReached => write!(f, "Connection limit reached"),
329            Self::KeepAliveTimeout => write!(f, "Keep-alive timeout"),
330        }
331    }
332}
333
334impl std::error::Error for ServerError {
335    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
336        match self {
337            Self::Io(e) => Some(e),
338            Self::Parse(e) => Some(e),
339            _ => None,
340        }
341    }
342}
343
344// ============================================================================
345// Host Header Validation
346// ============================================================================
347
/// Classifies why Host-header validation failed; drives the client-facing
/// message chosen in [`HostValidationError::response`].
#[derive(Debug, Clone, PartialEq, Eq)]
enum HostValidationErrorKind {
    /// No Host header (and no trusted X-Forwarded-Host) was present.
    Missing,
    /// A header was present but was not a syntactically valid host[:port].
    Invalid,
    /// The host parsed but did not match the configured allow-list.
    NotAllowed,
}
354
/// A failed Host-header validation: a machine-readable kind plus a
/// human-readable detail string.
#[derive(Debug, Clone)]
struct HostValidationError {
    // Why validation failed; selects the fixed client-facing 400 message.
    kind: HostValidationErrorKind,
    // Detail used for server-side tracing; never sent to the client.
    detail: String,
}
360
361impl HostValidationError {
362    fn missing() -> Self {
363        Self {
364            kind: HostValidationErrorKind::Missing,
365            detail: "missing Host header".to_string(),
366        }
367    }
368
369    fn invalid(detail: impl Into<String>) -> Self {
370        Self {
371            kind: HostValidationErrorKind::Invalid,
372            detail: detail.into(),
373        }
374    }
375
376    fn not_allowed(detail: impl Into<String>) -> Self {
377        Self {
378            kind: HostValidationErrorKind::NotAllowed,
379            detail: detail.into(),
380        }
381    }
382
383    fn response(&self) -> Response {
384        let message = match self.kind {
385            HostValidationErrorKind::Missing => "Bad Request: Host header required",
386            HostValidationErrorKind::Invalid => "Bad Request: invalid Host header",
387            HostValidationErrorKind::NotAllowed => "Bad Request: Host not allowed",
388        };
389        Response::with_status(StatusCode::BAD_REQUEST).body(fastapi_core::ResponseBody::Bytes(
390            message.as_bytes().to_vec(),
391        ))
392    }
393}
394
/// A parsed `Host` header value.
#[derive(Debug, Clone, PartialEq, Eq)]
struct HostHeader {
    // Hostname, IPv4 literal, or bracket-stripped IPv6 literal, lowercased.
    host: String,
    // Explicit port if one was present and parsed successfully (never 0).
    port: Option<u16>,
}
400
401fn validate_host_header(
402    request: &Request,
403    config: &ServerConfig,
404) -> Result<HostHeader, HostValidationError> {
405    let raw = extract_effective_host(request, config)?;
406    let parsed = parse_host_header(&raw)
407        .ok_or_else(|| HostValidationError::invalid(format!("invalid host value: {raw}")))?;
408
409    if !is_allowed_host(&parsed, &config.allowed_hosts) {
410        return Err(HostValidationError::not_allowed(format!(
411            "host not allowed: {}",
412            parsed.host
413        )));
414    }
415
416    Ok(parsed)
417}
418
419fn extract_effective_host(
420    request: &Request,
421    config: &ServerConfig,
422) -> Result<String, HostValidationError> {
423    if config.trust_x_forwarded_host {
424        if let Some(value) = header_value(request, "x-forwarded-host")? {
425            let forwarded = extract_first_list_value(&value)
426                .ok_or_else(|| HostValidationError::invalid("empty X-Forwarded-Host value"))?;
427            return Ok(forwarded.to_string());
428        }
429    }
430
431    match header_value(request, "host")? {
432        Some(value) => Ok(value),
433        None => Err(HostValidationError::missing()),
434    }
435}
436
437fn header_value(request: &Request, name: &str) -> Result<Option<String>, HostValidationError> {
438    request
439        .headers()
440        .get(name)
441        .map(|bytes| {
442            std::str::from_utf8(bytes)
443                .map(|s| s.trim().to_string())
444                .map_err(|_| {
445                    HostValidationError::invalid(format!("invalid UTF-8 in {name} header"))
446                })
447        })
448        .transpose()
449}
450
/// Returns the first non-empty, trimmed element of a comma-separated list,
/// or `None` when every element is empty.
fn extract_first_list_value(value: &str) -> Option<&str> {
    for part in value.split(',') {
        let trimmed = part.trim();
        if !trimmed.is_empty() {
            return Some(trimmed);
        }
    }
    None
}
454
455fn parse_host_header(value: &str) -> Option<HostHeader> {
456    let value = value.trim();
457    if value.is_empty() {
458        return None;
459    }
460    if value.chars().any(|c| c.is_control() || c.is_whitespace()) {
461        return None;
462    }
463
464    if value.starts_with('[') {
465        let end = value.find(']')?;
466        let host = &value[1..end];
467        if host.is_empty() {
468            return None;
469        }
470        if host.parse::<Ipv6Addr>().is_err() {
471            return None;
472        }
473        let rest = &value[end + 1..];
474        let port = if rest.is_empty() {
475            None
476        } else if let Some(port_str) = rest.strip_prefix(':') {
477            parse_port(port_str)
478        } else {
479            return None;
480        };
481        return Some(HostHeader {
482            host: host.to_ascii_lowercase(),
483            port,
484        });
485    }
486
487    let mut parts = value.split(':');
488    let host = parts.next().unwrap_or("");
489    let port_part = parts.next();
490    if parts.next().is_some() {
491        // Multiple colons without brackets (likely IPv6) are invalid
492        return None;
493    }
494    if host.is_empty() {
495        return None;
496    }
497
498    let port = match port_part {
499        Some(p) => parse_port(p),
500        None => None,
501    };
502
503    if host.parse::<Ipv4Addr>().is_ok() || is_valid_hostname(host) {
504        Some(HostHeader {
505            host: host.to_ascii_lowercase(),
506            port,
507        })
508    } else {
509        None
510    }
511}
512
/// Parses a decimal port string; `None` for empty input, non-digits,
/// out-of-range values, or port 0.
fn parse_port(port: &str) -> Option<u16> {
    if port.is_empty() || !port.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    match port.parse::<u16>() {
        Ok(0) | Err(_) => None,
        Ok(value) => Some(value),
    }
}
520
/// RFC 1035-style hostname check: at most 253 bytes total; dot-separated
/// labels of 1..=63 ASCII alphanumeric/hyphen bytes that neither start nor
/// end with a hyphen.
fn is_valid_hostname(host: &str) -> bool {
    // str::len() is byte length, matching RFC 1035's 253-octet limit.
    if host.len() > 253 {
        return false;
    }
    host.split('.').all(|label| {
        let bytes = label.as_bytes();
        !bytes.is_empty()
            && bytes.len() <= 63
            && bytes.first() != Some(&b'-')
            && bytes.last() != Some(&b'-')
            && bytes.iter().all(|b| b.is_ascii_alphanumeric() || *b == b'-')
    })
}
540
541fn is_allowed_host(host: &HostHeader, allowed_hosts: &[String]) -> bool {
542    if allowed_hosts.is_empty() {
543        return true;
544    }
545
546    allowed_hosts
547        .iter()
548        .any(|pattern| host_matches_pattern(host, pattern))
549}
550
551fn host_matches_pattern(host: &HostHeader, pattern: &str) -> bool {
552    // Note: patterns are pre-lowercased at config time, so no allocation needed here
553    let pattern = pattern.trim();
554    if pattern.is_empty() {
555        return false;
556    }
557    if pattern == "*" {
558        return true;
559    }
560    if let Some(suffix) = pattern.strip_prefix("*.") {
561        // suffix is already lowercase (pre-processed at config time)
562        if host.host == suffix {
563            return false;
564        }
565        return host.host.len() > suffix.len() + 1
566            && host.host.ends_with(suffix)
567            && host.host.as_bytes()[host.host.len() - suffix.len() - 1] == b'.';
568    }
569
570    if let Some(parsed) = parse_host_header(pattern) {
571        if parsed.host != host.host {
572            return false;
573        }
574        if let Some(port) = parsed.port {
575            return host.port == Some(port);
576        }
577        return true;
578    }
579
580    false
581}
582
583impl From<io::Error> for ServerError {
584    fn from(e: io::Error) -> Self {
585        Self::Io(e)
586    }
587}
588
589impl From<ParseError> for ServerError {
590    fn from(e: ParseError) -> Self {
591        Self::Parse(e)
592    }
593}
594
/// Processes a connection with the given handler.
///
/// This is the unified connection handling logic used by all server modes.
///
/// Loops over requests on one (possibly keep-alive) connection until the
/// peer disconnects, a validation failure or unknown Expect value forces a
/// close, the per-connection request cap is reached, or the connection goes
/// idle past the keep-alive timeout. Returns `Ok(())` for orderly closes.
async fn process_connection<H, Fut>(
    cx: &Cx,
    request_counter: &AtomicU64,
    mut stream: TcpStream,
    _peer_addr: SocketAddr,
    config: &ServerConfig,
    handler: H,
) -> Result<(), ServerError>
where
    H: Fn(RequestContext, &mut Request) -> Fut,
    Fut: Future<Output = Response>,
{
    // The stateful parser buffers leftover bytes between iterations, so a
    // pipelined request already in its buffer is served before reading again.
    let mut parser = StatefulParser::new().with_limits(config.parse_limits.clone());
    let mut read_buffer = vec![0u8; config.read_buffer_size];
    let mut response_writer = ResponseWriter::new();
    let mut requests_on_connection: usize = 0;
    let max_requests = config.max_requests_per_connection;

    loop {
        // Check for cancellation
        if cx.is_cancel_requested() {
            return Ok(());
        }

        // Try to parse a complete request from buffered data first.
        // Feeding an empty slice makes the parser re-examine bytes it has
        // already buffered, without touching the socket.
        let parse_result = parser.feed(&[])?;

        let mut request = match parse_result {
            ParseStatus::Complete { request, .. } => request,
            ParseStatus::Incomplete => {
                let keep_alive_timeout = config.keep_alive_timeout;

                let bytes_read = if keep_alive_timeout.is_zero() {
                    // Keep-alive timeout disabled: block until data or EOF.
                    read_into_buffer(&mut stream, &mut read_buffer).await?
                } else {
                    match read_with_timeout(&mut stream, &mut read_buffer, keep_alive_timeout).await
                    {
                        // 0 bytes = EOF: the peer closed the connection.
                        Ok(0) => return Ok(()),
                        Ok(n) => n,
                        Err(e) if e.kind() == io::ErrorKind::TimedOut => {
                            cx.trace(&format!(
                                "Keep-alive timeout ({:?}) - closing idle connection",
                                keep_alive_timeout
                            ));
                            return Err(ServerError::KeepAliveTimeout);
                        }
                        Err(e) => return Err(ServerError::Io(e)),
                    }
                };

                // EOF on the no-timeout path.
                if bytes_read == 0 {
                    return Ok(());
                }

                match parser.feed(&read_buffer[..bytes_read])? {
                    ParseStatus::Complete { request, .. } => request,
                    // Still need more bytes; loop back to read again.
                    ParseStatus::Incomplete => continue,
                }
            }
        };

        requests_on_connection += 1;

        // Generate unique request ID for this request with timeout budget
        let request_id = request_counter.fetch_add(1, Ordering::Relaxed);
        let request_budget = Budget::new().with_deadline(config.request_timeout);
        // NOTE(review): `for_testing_with_budget` reads as a test-only
        // constructor; confirm it is the intended way to build a request Cx
        // on the production path.
        let request_cx = Cx::for_testing_with_budget(request_budget);
        let ctx = RequestContext::new(request_cx, request_id);

        // Validate Host header. A failure sends a fixed 400 (the detail is
        // only traced server-side) and closes the connection.
        if let Err(err) = validate_host_header(&request, config) {
            ctx.trace(&format!("Rejecting request: {}", err.detail));
            let response = err.response().header("connection", b"close".to_vec());
            let response_write = response_writer.write(response);
            write_response(&mut stream, response_write).await?;
            return Ok(());
        }

        // Handle Expect: 100-continue
        // RFC 7231 Section 5.1.1: If the server receives a request with Expect: 100-continue,
        // it should either send 100 Continue (to proceed) or a final status code (to reject).
        match ExpectHandler::check_expect(&request) {
            ExpectResult::NoExpectation => {
                // No Expect header - proceed normally
            }
            ExpectResult::ExpectsContinue => {
                // Expect: 100-continue present
                // Send 100 Continue to tell client to proceed with body
                // Note: In a full implementation, pre-body validation hooks would run here
                // to validate auth, content-type, content-length before accepting the body.
                //
                // NOTE(review): this runs after the parser reported a Complete
                // request; if Complete implies the body was already received,
                // the interim 100 arrives too late to be useful — confirm the
                // parser's completion semantics.
                ctx.trace("Sending 100 Continue for Expect: 100-continue");
                write_raw_response(&mut stream, CONTINUE_RESPONSE).await?;
            }
            ExpectResult::UnknownExpectation(value) => {
                // Unknown expectation - return 417 Expectation Failed
                ctx.trace(&format!("Rejecting unknown Expect value: {}", value));
                let response =
                    ExpectHandler::expectation_failed(format!("Unsupported Expect value: {value}"));
                let response_write = response_writer.write(response);
                write_response(&mut stream, response_write).await?;
                return Ok(());
            }
        }

        // Keep-alive decision: the client must want it AND the request cap
        // (when non-zero) must not be reached by this request.
        let client_wants_keep_alive = should_keep_alive(&request);
        let at_max_requests = max_requests > 0 && requests_on_connection >= max_requests;
        let server_will_keep_alive = client_wants_keep_alive && !at_max_requests;

        let request_start = Instant::now();
        let timeout_duration = Duration::from_nanos(config.request_timeout.as_nanos());

        // Call the handler
        let response = handler(ctx, &mut request).await;

        // Post-hoc timeout check: the handler is awaited to completion and
        // only afterwards is its response replaced with a 504 if it overran
        // the budget — the handler itself is not cancelled at the deadline.
        let mut response = if request_start.elapsed() > timeout_duration {
            Response::with_status(StatusCode::GATEWAY_TIMEOUT).body(
                fastapi_core::ResponseBody::Bytes(
                    b"Gateway Timeout: request processing exceeded time limit".to_vec(),
                ),
            )
        } else {
            response
        };

        // Advertise the keep-alive decision to the client.
        response = if server_will_keep_alive {
            response.header("connection", b"keep-alive".to_vec())
        } else {
            response.header("connection", b"close".to_vec())
        };

        let response_write = response_writer.write(response);
        write_response(&mut stream, response_write).await?;

        // Background tasks run after the response is flushed, so they do not
        // delay the client.
        if let Some(tasks) = App::take_background_tasks(&mut request) {
            tasks.execute_all().await;
        }

        if !server_will_keep_alive {
            return Ok(());
        }
    }
}
740
/// TCP server with asupersync integration.
///
/// This server manages the lifecycle of connections and requests using
/// asupersync's structured concurrency primitives. Each connection runs
/// in its own region, and each request gets its own task with a budget.
#[derive(Debug)]
pub struct TcpServer {
    // Immutable configuration captured at construction.
    config: ServerConfig,
    /// Monotonic source of per-request IDs (wraps in Arc for sharing).
    request_counter: Arc<AtomicU64>,
    /// Current number of active connections (wrapped in Arc for concurrent feature).
    connection_counter: Arc<AtomicU64>,
    /// Whether the server is draining (shutting down gracefully).
    draining: Arc<AtomicBool>,
    /// Handles to spawned connection tasks for graceful shutdown.
    connection_handles: Mutex<Vec<TaskHandle<()>>>,
    /// Shutdown controller for coordinated graceful shutdown.
    shutdown_controller: Arc<ShutdownController>,
    /// Connection pool metrics counters.
    metrics_counters: Arc<MetricsCounters>,
}
761
762impl TcpServer {
    /// Creates a new TCP server with the given configuration.
    ///
    /// All counters start at zero and the draining flag starts `false`.
    #[must_use]
    pub fn new(config: ServerConfig) -> Self {
        Self {
            config,
            request_counter: Arc::new(AtomicU64::new(0)),
            connection_counter: Arc::new(AtomicU64::new(0)),
            draining: Arc::new(AtomicBool::new(false)),
            connection_handles: Mutex::new(Vec::new()),
            shutdown_controller: Arc::new(ShutdownController::new()),
            metrics_counters: Arc::new(MetricsCounters::new()),
        }
    }
776
    /// Returns the server configuration (shared reference; the configuration
    /// is read-only through this accessor).
    #[must_use]
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
782
    /// Generates a unique request ID.
    ///
    /// Monotonically increasing per server instance (relaxed `fetch_add`,
    /// which wraps at `u64::MAX`).
    fn next_request_id(&self) -> u64 {
        self.request_counter.fetch_add(1, Ordering::Relaxed)
    }
787
    /// Returns the current number of active connections.
    ///
    /// Relaxed load: the value may be momentarily stale under concurrency.
    #[must_use]
    pub fn current_connections(&self) -> u64 {
        self.connection_counter.load(Ordering::Relaxed)
    }
793
794    /// Returns a snapshot of the server's connection pool metrics.
795    #[must_use]
796    pub fn metrics(&self) -> ServerMetrics {
797        ServerMetrics {
798            active_connections: self.connection_counter.load(Ordering::Relaxed),
799            total_accepted: self.metrics_counters.total_accepted.load(Ordering::Relaxed),
800            total_rejected: self.metrics_counters.total_rejected.load(Ordering::Relaxed),
801            total_timed_out: self
802                .metrics_counters
803                .total_timed_out
804                .load(Ordering::Relaxed),
805            total_requests: self.request_counter.load(Ordering::Relaxed),
806            bytes_in: self.metrics_counters.bytes_in.load(Ordering::Relaxed),
807            bytes_out: self.metrics_counters.bytes_out.load(Ordering::Relaxed),
808        }
809    }
810
    /// Records bytes read from a client (adds `n` to the `bytes_in` metric).
    fn record_bytes_in(&self, n: u64) {
        self.metrics_counters
            .bytes_in
            .fetch_add(n, Ordering::Relaxed);
    }
817
    /// Records bytes written to a client (adds `n` to the `bytes_out` metric).
    fn record_bytes_out(&self, n: u64) {
        self.metrics_counters
            .bytes_out
            .fetch_add(n, Ordering::Relaxed);
    }
824
825    /// Attempts to acquire a connection slot.
826    ///
827    /// Returns true if a slot was acquired, false if the connection limit
828    /// has been reached. If max_connections is 0 (unlimited), always returns true.
829    fn try_acquire_connection(&self) -> bool {
830        let max = self.config.max_connections;
831        if max == 0 {
832            // Unlimited connections
833            self.connection_counter.fetch_add(1, Ordering::Relaxed);
834            self.metrics_counters
835                .total_accepted
836                .fetch_add(1, Ordering::Relaxed);
837            return true;
838        }
839
840        // Try to increment if under limit
841        let mut current = self.connection_counter.load(Ordering::Relaxed);
842        loop {
843            if current >= max as u64 {
844                self.metrics_counters
845                    .total_rejected
846                    .fetch_add(1, Ordering::Relaxed);
847                return false;
848            }
849            match self.connection_counter.compare_exchange_weak(
850                current,
851                current + 1,
852                Ordering::AcqRel,
853                Ordering::Relaxed,
854            ) {
855                Ok(_) => {
856                    self.metrics_counters
857                        .total_accepted
858                        .fetch_add(1, Ordering::Relaxed);
859                    return true;
860                }
861                Err(actual) => current = actual,
862            }
863        }
864    }
865
    /// Releases a connection slot.
    ///
    /// Must be paired with a successful `try_acquire_connection`; an
    /// unmatched call would wrap the counter below zero (`fetch_sub` wraps).
    fn release_connection(&self) {
        self.connection_counter.fetch_sub(1, Ordering::Relaxed);
    }
870
    /// Returns true if the server is draining (shutting down gracefully).
    ///
    /// Acquire load, pairing with the Release store in `start_drain`.
    #[must_use]
    pub fn is_draining(&self) -> bool {
        self.draining.load(Ordering::Acquire)
    }
876
    /// Starts the drain process for graceful shutdown.
    ///
    /// This sets the draining flag, which causes the server to:
    /// - Stop accepting new connections
    /// - Return 503 to new connection attempts
    /// - Allow in-flight requests to complete
    ///
    /// Release store, pairing with the Acquire load in `is_draining`.
    pub fn start_drain(&self) {
        self.draining.store(true, Ordering::Release);
    }
886
887    /// Waits for all in-flight connections to drain, with a timeout.
888    ///
889    /// Returns `true` if all connections drained successfully,
890    /// `false` if the timeout was reached with connections still active.
891    ///
892    /// # Arguments
893    ///
894    /// * `timeout` - Maximum time to wait for connections to drain
895    /// * `poll_interval` - How often to check connection count (default 10ms)
896    pub async fn wait_for_drain(&self, timeout: Duration, poll_interval: Option<Duration>) -> bool {
897        let start = Instant::now();
898        let poll_interval = poll_interval.unwrap_or(Duration::from_millis(10));
899
900        while self.current_connections() > 0 {
901            if start.elapsed() >= timeout {
902                return false;
903            }
904            // NOTE: We use blocking sleep here intentionally:
905            // 1. This is only called during graceful shutdown (not a hot path)
906            // 2. The default poll interval is 10ms (minimal CPU impact)
907            // 3. During shutdown, blocking briefly is acceptable
908            // 4. Using async sleep would require threading Time through the API
909            //
910            // If this becomes a bottleneck, consider:
911            // - Using asupersync::runtime::yield_now() in a tighter loop
912            // - Adding a Cx parameter to access async sleep
913            std::thread::sleep(poll_interval);
914        }
915        true
916    }
917
918    /// Initiates graceful shutdown and waits for connections to drain.
919    ///
920    /// This is a convenience method that combines `start_drain()` and
921    /// `wait_for_drain()` using the configured drain timeout.
922    ///
923    /// Returns the number of connections that were forcefully closed
924    /// (0 if all drained within the timeout).
925    pub async fn drain(&self) -> u64 {
926        self.start_drain();
927        let drained = self.wait_for_drain(self.config.drain_timeout, None).await;
928        if drained {
929            0
930        } else {
931            self.current_connections()
932        }
933    }
934
    /// Returns a reference to the server's shutdown controller.
    ///
    /// This can be used to coordinate shutdown from external code,
    /// such as signal handlers or health check endpoints.
    ///
    /// The controller is behind an `Arc`, so callers may clone the `Arc`
    /// to hold it beyond the borrow of `self`.
    #[must_use]
    pub fn shutdown_controller(&self) -> &Arc<ShutdownController> {
        &self.shutdown_controller
    }
943
    /// Returns a receiver for shutdown notifications.
    ///
    /// Use this to receive shutdown signals in other parts of your application.
    /// Multiple receivers can be created and they will all be notified.
    ///
    /// Each call creates an independent subscription on the shared
    /// shutdown controller.
    #[must_use]
    pub fn subscribe_shutdown(&self) -> ShutdownReceiver {
        self.shutdown_controller.subscribe()
    }
952
    /// Initiates server shutdown.
    ///
    /// This triggers the shutdown process:
    /// 1. Sets the draining flag to stop accepting new connections
    /// 2. Notifies all shutdown receivers
    /// 3. The server's accept loop will exit and drain connections
    ///
    /// This method is safe to call multiple times - subsequent calls are no-ops.
    pub fn shutdown(&self) {
        // Set the drain flag before notifying receivers so accept loops
        // polling `is_draining()` stop no later than receivers are woken.
        self.start_drain();
        self.shutdown_controller.shutdown();
    }
965
966    /// Checks if shutdown has been initiated.
967    #[must_use]
968    pub fn is_shutting_down(&self) -> bool {
969        self.shutdown_controller.is_shutting_down() || self.is_draining()
970    }
971
972    /// Runs the server with graceful shutdown support.
973    ///
974    /// The server will run until either:
975    /// - The provided shutdown receiver signals shutdown
976    /// - The server Cx is cancelled
977    /// - An unrecoverable error occurs
978    ///
979    /// When shutdown is signaled, the server will:
980    /// 1. Stop accepting new connections
981    /// 2. Wait for existing connections to complete (up to drain_timeout)
982    /// 3. Return gracefully
983    ///
984    /// # Example
985    ///
986    /// ```ignore
987    /// use asupersync::signal::ShutdownController;
988    /// use fastapi_http::{TcpServer, ServerConfig};
989    ///
990    /// let controller = ShutdownController::new();
991    /// let server = TcpServer::new(ServerConfig::new("127.0.0.1:8080"));
992    ///
993    /// // Get a shutdown receiver
994    /// let shutdown = controller.subscribe();
995    ///
996    /// // In another task, you can trigger shutdown:
997    /// // controller.shutdown();
998    ///
999    /// server.serve_with_shutdown(&cx, shutdown, handler).await?;
1000    /// ```
1001    pub async fn serve_with_shutdown<H, Fut>(
1002        &self,
1003        cx: &Cx,
1004        mut shutdown: ShutdownReceiver,
1005        handler: H,
1006    ) -> Result<GracefulOutcome<()>, ServerError>
1007    where
1008        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync + 'static,
1009        Fut: Future<Output = Response> + Send + 'static,
1010    {
1011        let bind_addr = self.config.bind_addr.clone();
1012        let listener = TcpListener::bind(bind_addr).await?;
1013        let local_addr = listener.local_addr()?;
1014
1015        cx.trace(&format!(
1016            "Server listening on {local_addr} (with graceful shutdown)"
1017        ));
1018
1019        // Run the accept loop with shutdown racing
1020        let result = self
1021            .accept_loop_with_shutdown(cx, listener, handler, &mut shutdown)
1022            .await;
1023
1024        match result {
1025            Ok(outcome) => {
1026                if outcome.is_shutdown() {
1027                    cx.trace("Shutdown signal received, draining connections");
1028                    self.start_drain();
1029                    self.drain_connection_tasks(cx).await;
1030                }
1031                Ok(outcome)
1032            }
1033            Err(e) => Err(e),
1034        }
1035    }
1036
    /// Accept loop that checks for shutdown signals.
    ///
    /// Returns `GracefulOutcome::ShutdownSignaled` when the receiver, the
    /// `Cx`, or the local drain flag requests shutdown; returns `Err` only
    /// for fatal accept errors. Connections are handled inline, one at a
    /// time (single-threaded mode).
    ///
    /// NOTE(review): shutdown is only observed *between* accepts — while
    /// `listener.accept().await` is pending, a signal is not noticed until
    /// the accept resolves. Confirm this latency is acceptable.
    async fn accept_loop_with_shutdown<H, Fut>(
        &self,
        cx: &Cx,
        listener: TcpListener,
        handler: H,
        shutdown: &mut ShutdownReceiver,
    ) -> Result<GracefulOutcome<()>, ServerError>
    where
        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Response> + Send + 'static,
    {
        let handler = Arc::new(handler);

        loop {
            // Check for shutdown or cancellation first
            if shutdown.is_shutting_down() {
                return Ok(GracefulOutcome::ShutdownSignaled);
            }
            if cx.is_cancel_requested() || self.is_draining() {
                return Ok(GracefulOutcome::ShutdownSignaled);
            }

            // Accept a connection
            let (mut stream, peer_addr) = match listener.accept().await {
                Ok(conn) => conn,
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    cx.trace(&format!("Accept error: {e}"));
                    // Transient accept errors are logged and skipped; only
                    // fatal ones abort the loop.
                    if is_fatal_accept_error(&e) {
                        return Err(ServerError::Io(e));
                    }
                    continue;
                }
            };

            // Check connection limit before processing
            if !self.try_acquire_connection() {
                cx.trace(&format!(
                    "Connection limit reached ({}), rejecting {peer_addr}",
                    self.config.max_connections
                ));

                // Best-effort 503 reject; the write error is ignored because
                // the connection is dropped immediately afterwards.
                let response = Response::with_status(StatusCode::SERVICE_UNAVAILABLE)
                    .header("connection", b"close".to_vec())
                    .body(fastapi_core::ResponseBody::Bytes(
                        b"503 Service Unavailable: connection limit reached".to_vec(),
                    ));
                let mut writer = crate::response::ResponseWriter::new();
                let response_bytes = writer.write(response);
                let _ = write_response(&mut stream, response_bytes).await;
                continue;
            }

            // Configure the connection
            if self.config.tcp_nodelay {
                let _ = stream.set_nodelay(true);
            }

            cx.trace(&format!(
                "Accepted connection from {peer_addr} ({}/{})",
                self.current_connections(),
                if self.config.max_connections == 0 {
                    "∞".to_string()
                } else {
                    self.config.max_connections.to_string()
                }
            ));

            let request_id = self.next_request_id();
            let request_budget = Budget::new().with_deadline(self.config.request_timeout);
            // NOTE(review): `for_testing_with_budget` looks like scaffolding;
            // presumably a production Cx should derive from the server region.
            let request_cx = Cx::for_testing_with_budget(request_budget);
            let ctx = RequestContext::new(request_cx, request_id);

            // Handle connection inline (single-threaded mode)
            let result = self
                .handle_connection(&ctx, stream, peer_addr, &*handler)
                .await;

            // Release the slot unconditionally, success or failure.
            self.release_connection();

            if let Err(e) = result {
                cx.trace(&format!("Connection error from {peer_addr}: {e}"));
            }
        }
    }
1125
1126    /// Runs the server, accepting connections and handling requests.
1127    ///
1128    /// This method will run until the server Cx is cancelled or an
1129    /// unrecoverable error occurs.
1130    ///
1131    /// # Arguments
1132    ///
1133    /// * `cx` - The capability context for the server region
1134    /// * `handler` - The request handler function
1135    ///
1136    /// # Errors
1137    ///
1138    /// Returns an error if binding fails or an unrecoverable IO error occurs.
1139    pub async fn serve<H, Fut>(&self, cx: &Cx, handler: H) -> Result<(), ServerError>
1140    where
1141        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync + 'static,
1142        Fut: Future<Output = Response> + Send + 'static,
1143    {
1144        let bind_addr = self.config.bind_addr.clone();
1145        let listener = TcpListener::bind(bind_addr).await?;
1146        let local_addr = listener.local_addr()?;
1147
1148        cx.trace(&format!("Server listening on {local_addr}"));
1149
1150        self.accept_loop(cx, listener, handler).await
1151    }
1152
    /// Runs the server on a specific listener.
    ///
    /// This is useful when you already have a bound listener.
    ///
    /// Unlike `serve`, this performs no binding and emits no startup trace;
    /// it delegates straight to the accept loop.
    pub async fn serve_on<H, Fut>(
        &self,
        cx: &Cx,
        listener: TcpListener,
        handler: H,
    ) -> Result<(), ServerError>
    where
        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Response> + Send + 'static,
    {
        self.accept_loop(cx, listener, handler).await
    }
1168
1169    /// Runs the server with a Handler trait object.
1170    ///
1171    /// This is the recommended way to serve an application that implements
1172    /// the `Handler` trait (like `App`).
1173    ///
1174    /// # Example
1175    ///
1176    /// ```ignore
1177    /// use fastapi_http::TcpServer;
1178    /// use fastapi_core::{App, Handler};
1179    /// use std::sync::Arc;
1180    ///
1181    /// let app = App::builder()
1182    ///     .get("/", handler_fn)
1183    ///     .build();
1184    ///
1185    /// let server = TcpServer::new(ServerConfig::new("127.0.0.1:8080"));
1186    /// let cx = Cx::for_testing();
1187    /// server.serve_handler(&cx, Arc::new(app)).await?;
1188    /// ```
1189    pub async fn serve_handler(
1190        &self,
1191        cx: &Cx,
1192        handler: Arc<dyn fastapi_core::Handler>,
1193    ) -> Result<(), ServerError> {
1194        let bind_addr = self.config.bind_addr.clone();
1195        let listener = TcpListener::bind(bind_addr).await?;
1196        let local_addr = listener.local_addr()?;
1197
1198        cx.trace(&format!("Server listening on {local_addr}"));
1199
1200        self.accept_loop_handler(cx, listener, handler).await
1201    }
1202
    /// Runs the server on a specific listener with a Handler trait object.
    ///
    /// Like `serve_on`, this skips binding and delegates straight to the
    /// handler accept loop.
    pub async fn serve_on_handler(
        &self,
        cx: &Cx,
        listener: TcpListener,
        handler: Arc<dyn fastapi_core::Handler>,
    ) -> Result<(), ServerError> {
        self.accept_loop_handler(cx, listener, handler).await
    }
1212
    /// Accept loop for Handler trait objects.
    ///
    /// Mirrors `accept_loop` but handles each connection via the dynamic
    /// `Handler`, inline and one at a time. Exits `Ok(())` on cancellation
    /// and `Err(ServerError::Shutdown)` when the drain flag is set.
    async fn accept_loop_handler(
        &self,
        cx: &Cx,
        listener: TcpListener,
        handler: Arc<dyn fastapi_core::Handler>,
    ) -> Result<(), ServerError> {
        loop {
            // Check for cancellation at each iteration.
            if cx.is_cancel_requested() {
                cx.trace("Server shutdown requested");
                return Ok(());
            }

            // Check if draining (graceful shutdown)
            if self.is_draining() {
                cx.trace("Server draining, stopping accept loop");
                return Err(ServerError::Shutdown);
            }

            // Accept a connection.
            let (mut stream, peer_addr) = match listener.accept().await {
                Ok(conn) => conn,
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    cx.trace(&format!("Accept error: {e}"));
                    // Only fatal accept errors abort the loop; the rest are
                    // logged and the loop keeps accepting.
                    if is_fatal_accept_error(&e) {
                        return Err(ServerError::Io(e));
                    }
                    continue;
                }
            };

            // Check connection limit before processing
            if !self.try_acquire_connection() {
                cx.trace(&format!(
                    "Connection limit reached ({}), rejecting {peer_addr}",
                    self.config.max_connections
                ));

                // Best-effort 503 reject before dropping the connection.
                let response = Response::with_status(StatusCode::SERVICE_UNAVAILABLE)
                    .header("connection", b"close".to_vec())
                    .body(fastapi_core::ResponseBody::Bytes(
                        b"503 Service Unavailable: connection limit reached".to_vec(),
                    ));
                let mut writer = crate::response::ResponseWriter::new();
                let response_bytes = writer.write(response);
                let _ = write_response(&mut stream, response_bytes).await;
                continue;
            }

            // Configure the connection.
            if self.config.tcp_nodelay {
                let _ = stream.set_nodelay(true);
            }

            cx.trace(&format!(
                "Accepted connection from {peer_addr} ({}/{})",
                self.current_connections(),
                if self.config.max_connections == 0 {
                    "∞".to_string()
                } else {
                    self.config.max_connections.to_string()
                }
            ));

            // Handle the connection with the Handler trait object
            let result = self
                .handle_connection_handler(cx, stream, peer_addr, &*handler)
                .await;

            // Release the slot unconditionally, success or failure.
            self.release_connection();

            if let Err(e) = result {
                cx.trace(&format!("Connection error from {peer_addr}: {e}"));
            }
        }
    }
1293
1294    /// Serves HTTP requests with concurrent connection handling using asupersync Scope.
1295    ///
1296    /// This method uses `Scope::spawn_registered` for proper structured concurrency,
1297    /// ensuring all spawned connection tasks are tracked and can be properly drained
1298    /// during shutdown.
1299    ///
1300    /// # Arguments
1301    ///
1302    /// * `cx` - The asupersync context for cancellation and tracing
1303    /// * `scope` - A scope for spawning connection tasks
1304    /// * `state` - Runtime state for task registration
1305    /// * `handler` - The request handler
1306    #[allow(clippy::too_many_lines)]
1307    pub async fn serve_concurrent<H, Fut>(
1308        &self,
1309        cx: &Cx,
1310        scope: &Scope<'_>,
1311        state: &mut RuntimeState,
1312        handler: H,
1313    ) -> Result<(), ServerError>
1314    where
1315        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync + 'static,
1316        Fut: Future<Output = Response> + Send + 'static,
1317    {
1318        let bind_addr = self.config.bind_addr.clone();
1319        let listener = TcpListener::bind(bind_addr).await?;
1320        let local_addr = listener.local_addr()?;
1321
1322        cx.trace(&format!(
1323            "Server listening on {local_addr} (concurrent mode)"
1324        ));
1325
1326        let handler = Arc::new(handler);
1327
1328        self.accept_loop_concurrent(cx, scope, state, listener, handler)
1329            .await
1330    }
1331
    /// Accept loop that spawns connection handlers concurrently using Scope.
    ///
    /// Each accepted connection is handed to `spawn_connection_task`; the
    /// returned handle is tracked in `connection_handles` so shutdown can
    /// drain in-flight tasks. On cancellation or drain the loop drains and
    /// returns `Ok(())`.
    async fn accept_loop_concurrent<H, Fut>(
        &self,
        cx: &Cx,
        scope: &Scope<'_>,
        state: &mut RuntimeState,
        listener: TcpListener,
        handler: Arc<H>,
    ) -> Result<(), ServerError>
    where
        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Response> + Send + 'static,
    {
        loop {
            // Check for cancellation or drain
            if cx.is_cancel_requested() || self.is_draining() {
                cx.trace("Server shutting down, draining connections");
                self.drain_connection_tasks(cx).await;
                return Ok(());
            }

            // Accept a connection
            let (mut stream, peer_addr) = match listener.accept().await {
                Ok(conn) => conn,
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    cx.trace(&format!("Accept error: {e}"));
                    // Only fatal accept errors abort the loop.
                    if is_fatal_accept_error(&e) {
                        return Err(ServerError::Io(e));
                    }
                    continue;
                }
            };

            // Check connection limit before processing
            if !self.try_acquire_connection() {
                cx.trace(&format!(
                    "Connection limit reached ({}), rejecting {peer_addr}",
                    self.config.max_connections
                ));

                // Best-effort 503 reject before dropping the connection.
                let response = Response::with_status(StatusCode::SERVICE_UNAVAILABLE)
                    .header("connection", b"close".to_vec())
                    .body(fastapi_core::ResponseBody::Bytes(
                        b"503 Service Unavailable: connection limit reached".to_vec(),
                    ));
                let mut writer = crate::response::ResponseWriter::new();
                let response_bytes = writer.write(response);
                let _ = write_response(&mut stream, response_bytes).await;
                continue;
            }

            // Configure the connection
            if self.config.tcp_nodelay {
                let _ = stream.set_nodelay(true);
            }

            cx.trace(&format!(
                "Accepted connection from {peer_addr} ({}/{})",
                self.current_connections(),
                if self.config.max_connections == 0 {
                    "∞".to_string()
                } else {
                    self.config.max_connections.to_string()
                }
            ));

            // Spawn connection task using Scope
            match self.spawn_connection_task(
                scope,
                state,
                cx,
                stream,
                peer_addr,
                Arc::clone(&handler),
            ) {
                Ok(handle) => {
                    // Store handle for draining
                    if let Ok(mut handles) = self.connection_handles.lock() {
                        handles.push(handle);
                    }
                    // Periodically clean up completed handles
                    self.cleanup_completed_handles();
                }
                Err(e) => {
                    // The spawned task normally releases the slot; if the
                    // spawn itself failed, release it here instead.
                    cx.trace(&format!("Failed to spawn connection task: {e:?}"));
                    self.release_connection();
                }
            }
        }
    }
1425
    /// Spawns a connection handler task using Scope::spawn_registered.
    ///
    /// Clones the config and counters into the task because the closure is
    /// `move` and outlives the borrow of `self`. The task decrements the
    /// connection counter directly (it has no `&self` to call
    /// `release_connection`).
    fn spawn_connection_task<H, Fut>(
        &self,
        scope: &Scope<'_>,
        state: &mut RuntimeState,
        cx: &Cx,
        stream: TcpStream,
        peer_addr: SocketAddr,
        handler: Arc<H>,
    ) -> Result<TaskHandle<()>, SpawnError>
    where
        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Response> + Send + 'static,
    {
        let config = self.config.clone();
        let request_counter = Arc::clone(&self.request_counter);
        let connection_counter = Arc::clone(&self.connection_counter);

        scope.spawn_registered(state, cx, move |task_cx| async move {
            let result = process_connection(
                &task_cx,
                &request_counter,
                stream,
                peer_addr,
                &config,
                |ctx, req| handler(ctx, req),
            )
            .await;

            // Release connection slot (always, regardless of success/failure)
            connection_counter.fetch_sub(1, Ordering::Relaxed);

            if let Err(e) = result {
                // Log error - in production this would use proper logging
                eprintln!("Connection error from {peer_addr}: {e}");
            }
        })
    }
1464
1465    /// Removes completed task handles from the tracking vector.
1466    fn cleanup_completed_handles(&self) {
1467        if let Ok(mut handles) = self.connection_handles.lock() {
1468            handles.retain(|handle| !handle.is_finished());
1469        }
1470    }
1471
1472    /// Drains all connection tasks during shutdown.
1473    async fn drain_connection_tasks(&self, cx: &Cx) {
1474        let drain_timeout = self.config.drain_timeout;
1475        let start = Instant::now();
1476
1477        cx.trace(&format!(
1478            "Draining {} connection tasks (timeout: {:?})",
1479            self.connection_handles.lock().map_or(0, |h| h.len()),
1480            drain_timeout
1481        ));
1482
1483        // Wait for all tasks to complete or timeout
1484        while start.elapsed() < drain_timeout {
1485            let remaining = self
1486                .connection_handles
1487                .lock()
1488                .map_or(0, |h| h.iter().filter(|t| !t.is_finished()).count());
1489
1490            if remaining == 0 {
1491                cx.trace("All connection tasks drained successfully");
1492                return;
1493            }
1494
1495            // Yield to allow tasks to make progress
1496            asupersync::runtime::yield_now().await;
1497        }
1498
1499        cx.trace(&format!(
1500            "Drain timeout reached with {} tasks still running",
1501            self.connection_handles
1502                .lock()
1503                .map_or(0, |h| h.iter().filter(|t| !t.is_finished()).count())
1504        ));
1505    }
1506
    /// Handles a single connection using the Handler trait.
    ///
    /// This is a specialized version for trait objects where we cannot use a closure
    /// due to lifetime constraints of BoxFuture.
    ///
    /// Loops one request at a time: parse (reading more bytes as needed,
    /// honoring the keep-alive timeout), dispatch to `handler`, write the
    /// response, then either continue (keep-alive) or return (close).
    async fn handle_connection_handler(
        &self,
        cx: &Cx,
        mut stream: TcpStream,
        _peer_addr: SocketAddr,
        handler: &dyn fastapi_core::Handler,
    ) -> Result<(), ServerError> {
        let mut parser = StatefulParser::new().with_limits(self.config.parse_limits.clone());
        let mut read_buffer = vec![0u8; self.config.read_buffer_size];
        let mut response_writer = ResponseWriter::new();
        // Counts requests served on this connection, for the per-connection cap.
        let mut requests_on_connection: usize = 0;
        let max_requests = self.config.max_requests_per_connection;

        loop {
            // Check for cancellation
            if cx.is_cancel_requested() {
                return Ok(());
            }

            // Feed no new bytes: presumably this lets the stateful parser
            // complete a request from data it already buffered (pipelining)
            // before we block on another read — TODO confirm feed(&[]) semantics.
            let parse_result = parser.feed(&[])?;

            let mut request = match parse_result {
                ParseStatus::Complete { request, .. } => request,
                ParseStatus::Incomplete => {
                    // Need more bytes from the socket. A zero keep-alive
                    // timeout means "wait indefinitely".
                    let keep_alive_timeout = self.config.keep_alive_timeout;
                    let bytes_read = if keep_alive_timeout.is_zero() {
                        read_into_buffer(&mut stream, &mut read_buffer).await?
                    } else {
                        match read_with_timeout(&mut stream, &mut read_buffer, keep_alive_timeout)
                            .await
                        {
                            // 0 bytes = peer closed the connection cleanly.
                            Ok(0) => return Ok(()),
                            Ok(n) => n,
                            Err(e) if e.kind() == io::ErrorKind::TimedOut => {
                                self.metrics_counters
                                    .total_timed_out
                                    .fetch_add(1, Ordering::Relaxed);
                                return Err(ServerError::KeepAliveTimeout);
                            }
                            Err(e) => return Err(ServerError::Io(e)),
                        }
                    };

                    // EOF on the untimed path also means a clean close.
                    if bytes_read == 0 {
                        return Ok(());
                    }

                    self.record_bytes_in(bytes_read as u64);

                    match parser.feed(&read_buffer[..bytes_read])? {
                        ParseStatus::Complete { request, .. } => request,
                        // Still incomplete: loop around and read more.
                        ParseStatus::Incomplete => continue,
                    }
                }
            };

            requests_on_connection += 1;

            // Create request context
            let request_id = self.request_counter.fetch_add(1, Ordering::Relaxed);
            let request_budget = Budget::new().with_deadline(self.config.request_timeout);
            // NOTE(review): `for_testing_with_budget` looks like scaffolding;
            // presumably a production Cx would derive from the connection region.
            let request_cx = Cx::for_testing_with_budget(request_budget);
            let ctx = RequestContext::new(request_cx, request_id);

            // Call handler - ctx lives until after await
            let response = handler.call(&ctx, &mut request).await;

            // Keep alive only if the client asked for it AND the
            // per-connection request cap (0 = unlimited) is not exhausted.
            let client_wants_keep_alive = should_keep_alive(&request);
            let server_will_keep_alive = client_wants_keep_alive
                && (max_requests == 0 || requests_on_connection < max_requests);

            let response = if server_will_keep_alive {
                response.header("connection", b"keep-alive".to_vec())
            } else {
                response.header("connection", b"close".to_vec())
            };

            let response_write = response_writer.write(response);
            // Byte metrics are only recorded for fully-materialized bodies.
            if let ResponseWrite::Full(ref bytes) = response_write {
                self.record_bytes_out(bytes.len() as u64);
            }
            write_response(&mut stream, response_write).await?;

            if !server_will_keep_alive {
                return Ok(());
            }
        }
    }
1601
    /// The main accept loop.
    ///
    /// Accepts connections until the `Cx` is cancelled (returns `Ok(())`)
    /// or the drain flag is set (returns `Err(ServerError::Shutdown)`).
    /// With the `concurrent` feature, connections are spawned onto separate
    /// tasks; without it, each connection is handled inline.
    async fn accept_loop<H, Fut>(
        &self,
        cx: &Cx,
        listener: TcpListener,
        handler: H,
    ) -> Result<(), ServerError>
    where
        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Response> + Send + 'static,
    {
        let handler = Arc::new(handler);

        loop {
            // Check for cancellation at each iteration.
            if cx.is_cancel_requested() {
                cx.trace("Server shutdown requested");
                return Ok(());
            }

            // Check if draining (graceful shutdown)
            if self.is_draining() {
                cx.trace("Server draining, stopping accept loop");
                return Err(ServerError::Shutdown);
            }

            // Accept a connection.
            let (mut stream, peer_addr) = match listener.accept().await {
                Ok(conn) => conn,
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // Yield and retry.
                    continue;
                }
                Err(e) => {
                    cx.trace(&format!("Accept error: {e}"));
                    // For most errors, we continue accepting.
                    // Only fatal errors should propagate.
                    if is_fatal_accept_error(&e) {
                        return Err(ServerError::Io(e));
                    }
                    continue;
                }
            };

            // Check connection limit before processing
            if !self.try_acquire_connection() {
                cx.trace(&format!(
                    "Connection limit reached ({}), rejecting {peer_addr}",
                    self.config.max_connections
                ));

                // Send a 503 Service Unavailable response and close
                let response = Response::with_status(StatusCode::SERVICE_UNAVAILABLE)
                    .header("connection", b"close".to_vec())
                    .body(fastapi_core::ResponseBody::Bytes(
                        b"503 Service Unavailable: connection limit reached".to_vec(),
                    ));
                let mut writer = crate::response::ResponseWriter::new();
                let response_bytes = writer.write(response);
                let _ = write_response(&mut stream, response_bytes).await;
                continue;
            }

            // Configure the connection.
            if self.config.tcp_nodelay {
                let _ = stream.set_nodelay(true);
            }

            cx.trace(&format!(
                "Accepted connection from {peer_addr} ({}/{})",
                self.current_connections(),
                if self.config.max_connections == 0 {
                    "∞".to_string()
                } else {
                    self.config.max_connections.to_string()
                }
            ));

            // Spawn connection handling concurrently when the feature is enabled.
            // When asupersync has an accessible spawn API from Cx, we can use that
            // for proper structured concurrency. For now, use tokio::spawn.
            //
            // NOTE(review): this branch delegates connection-slot release to
            // the spawned handler — verify spawn_connection_handler releases it.
            #[cfg(feature = "concurrent")]
            {
                self.spawn_connection_handler(cx.clone(), stream, peer_addr, Arc::clone(&handler));
            }

            // Without the concurrent feature, handle inline (blocking accept loop).
            // This is simpler but means only one connection is handled at a time.
            #[cfg(not(feature = "concurrent"))]
            {
                let request_id = self.next_request_id();
                let request_budget = Budget::new().with_deadline(self.config.request_timeout);

                // Create a RequestContext for this request with the configured timeout budget.
                // In the full implementation, the Cx would be derived from the connection region.
                // For now, we use for_testing_with_budget to apply the timeout.
                let request_cx = Cx::for_testing_with_budget(request_budget);
                let ctx = RequestContext::new(request_cx, request_id);

                // Handle the connection and release the slot when done.
                let result = self
                    .handle_connection(&ctx, stream, peer_addr, &*handler)
                    .await;

                // Release connection slot (always, regardless of success/failure)
                self.release_connection();

                if let Err(e) = result {
                    cx.trace(&format!("Connection error from {peer_addr}: {e}"));
                }
            }
        }
    }
1715
1716    /// Spawns a connection handler as a separate task.
1717    ///
1718    /// This is used when the `concurrent` feature is enabled to handle
1719    /// connections concurrently without blocking the accept loop.
1720    ///
1721    /// When asupersync has an accessible spawn API from Cx, this should be
1722    /// migrated to use that for proper structured concurrency.
1723    #[cfg(feature = "concurrent")]
1724    fn spawn_connection_handler<H, Fut>(
1725        &self,
1726        server_cx: Cx,
1727        stream: TcpStream,
1728        peer_addr: SocketAddr,
1729        handler: Arc<H>,
1730    ) where
1731        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync + 'static,
1732        Fut: Future<Output = Response> + Send + 'static,
1733    {
1734        // Clone values needed for the spawned task
1735        let config = self.config.clone();
1736        let request_counter = Arc::clone(&self.request_counter);
1737        let connection_counter = Arc::clone(&self.connection_counter);
1738
1739        // Spawn the connection handler
1740        // Note: Using tokio::spawn as a transitional solution.
1741        // When asupersync's Scope::spawn is accessible from Cx, migrate to that.
1742        tokio::spawn(async move {
1743            let result = process_connection(
1744                &server_cx,
1745                &request_counter,
1746                stream,
1747                peer_addr,
1748                &config,
1749                |ctx, req| handler(ctx, req),
1750            )
1751            .await;
1752
1753            // Release connection slot (always, regardless of success/failure)
1754            connection_counter.fetch_sub(1, Ordering::Relaxed);
1755
1756            if let Err(e) = result {
1757                server_cx.trace(&format!("Connection error from {peer_addr}: {e}"));
1758            }
1759        });
1760    }
1761
1762    /// Handles a single connection.
1763    ///
1764    /// This reads requests from the connection, passes them to the handler,
1765    /// and sends responses. For HTTP/1.1, it handles keep-alive by processing
1766    /// multiple requests on the same connection.
1767    async fn handle_connection<H, Fut>(
1768        &self,
1769        ctx: &RequestContext,
1770        stream: TcpStream,
1771        peer_addr: SocketAddr,
1772        handler: &H,
1773    ) -> Result<(), ServerError>
1774    where
1775        H: Fn(RequestContext, &mut Request) -> Fut + Send + Sync,
1776        Fut: Future<Output = Response> + Send,
1777    {
1778        process_connection(
1779            ctx.cx(),
1780            &self.request_counter,
1781            stream,
1782            peer_addr,
1783            &self.config,
1784            |ctx, req| handler(ctx, req),
1785        )
1786        .await
1787    }
1788}
1789
/// Snapshot of server metrics at a point in time.
///
/// Returned by [`TcpServer::metrics()`]. All counters are monotonically
/// increasing except `active_connections` which reflects the current gauge.
///
/// The snapshot is assembled from independent atomic counters, so the
/// fields are not read atomically as a set and may be slightly skewed
/// relative to one another under load.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerMetrics {
    /// Current number of active (in-flight) connections.
    pub active_connections: u64,
    /// Total connections accepted since server start.
    pub total_accepted: u64,
    /// Total connections rejected due to connection limit.
    pub total_rejected: u64,
    /// Total requests that timed out.
    pub total_timed_out: u64,
    /// Total requests served since server start.
    pub total_requests: u64,
    /// Total bytes read from clients.
    pub bytes_in: u64,
    /// Total bytes written to clients.
    pub bytes_out: u64,
}
1811
/// Atomic counters backing [`ServerMetrics`].
///
/// These live inside `TcpServer` and are updated as connections are
/// accepted, rejected, or timed out. Each field is an independent
/// `AtomicU64`, so updates are lock-free but not coordinated with each
/// other.
#[derive(Debug)]
struct MetricsCounters {
    // Connections accepted since server start.
    total_accepted: AtomicU64,
    // Connections rejected because the connection limit was reached.
    total_rejected: AtomicU64,
    // Requests that hit the request timeout.
    total_timed_out: AtomicU64,
    // Total bytes read from clients.
    bytes_in: AtomicU64,
    // Total bytes written to clients.
    bytes_out: AtomicU64,
}
1824
1825impl MetricsCounters {
1826    fn new() -> Self {
1827        Self {
1828            total_accepted: AtomicU64::new(0),
1829            total_rejected: AtomicU64::new(0),
1830            total_timed_out: AtomicU64::new(0),
1831            bytes_in: AtomicU64::new(0),
1832            bytes_out: AtomicU64::new(0),
1833        }
1834    }
1835}
1836
1837impl Default for TcpServer {
1838    fn default() -> Self {
1839        Self::new(ServerConfig::default())
1840    }
1841}
1842
/// Returns true if the accept error is fatal (should stop the server).
///
/// Only errors that mean the listener socket itself is broken count as
/// fatal; anything else (e.g. a reset from a single client) is treated as
/// transient and the accept loop keeps going.
fn is_fatal_accept_error(e: &io::Error) -> bool {
    let kind = e.kind();
    kind == io::ErrorKind::NotConnected || kind == io::ErrorKind::InvalidInput
}
1851
1852/// Reads data from a TCP stream into a buffer.
1853///
1854/// Returns the number of bytes read, or 0 if the connection was closed.
1855async fn read_into_buffer(stream: &mut TcpStream, buffer: &mut [u8]) -> io::Result<usize> {
1856    use std::future::poll_fn;
1857
1858    poll_fn(|cx| {
1859        let mut read_buf = ReadBuf::new(buffer);
1860        match Pin::new(&mut *stream).poll_read(cx, &mut read_buf) {
1861            Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())),
1862            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
1863            Poll::Pending => Poll::Pending,
1864        }
1865    })
1866    .await
1867}
1868
1869/// Reads data from a TCP stream with a timeout.
1870///
1871/// Uses asupersync's timer system for proper async timeout handling.
1872/// The timeout is implemented using asupersync's `timeout` future wrapper,
1873/// which properly integrates with the async runtime's timer driver.
1874///
1875/// # Arguments
1876///
1877/// * `stream` - The TCP stream to read from
1878/// * `buffer` - The buffer to read into
1879/// * `timeout_duration` - Maximum time to wait for data
1880///
1881/// # Returns
1882///
1883/// * `Ok(n)` - Number of bytes read (0 means connection closed)
1884/// * `Err(TimedOut)` - Timeout expired with no data
1885/// * `Err(other)` - IO error from the underlying stream
1886async fn read_with_timeout(
1887    stream: &mut TcpStream,
1888    buffer: &mut [u8],
1889    timeout_duration: Duration,
1890) -> io::Result<usize> {
1891    // Get current time for the timeout calculation
1892    let now = current_time();
1893
1894    // Create the read future - we need to box it for Unpin
1895    let read_future = Box::pin(read_into_buffer(stream, buffer));
1896
1897    // Wrap with asupersync timeout
1898    match timeout(now, timeout_duration, read_future).await {
1899        Ok(result) => result,
1900        Err(_elapsed) => Err(io::Error::new(
1901            io::ErrorKind::TimedOut,
1902            "keep-alive timeout expired",
1903        )),
1904    }
1905}
1906
1907/// Writes raw bytes to a TCP stream (e.g., for 100 Continue response).
1908///
1909/// This writes the bytes directly without any HTTP formatting.
1910async fn write_raw_response(stream: &mut TcpStream, bytes: &[u8]) -> io::Result<()> {
1911    use std::future::poll_fn;
1912    write_all(stream, bytes).await?;
1913    poll_fn(|cx| Pin::new(&mut *stream).poll_flush(cx)).await?;
1914    Ok(())
1915}
1916
1917/// Writes a response to a TCP stream.
1918///
1919/// Handles both full (buffered) and streaming (chunked) responses.
1920async fn write_response(stream: &mut TcpStream, response: ResponseWrite) -> io::Result<()> {
1921    use std::future::poll_fn;
1922
1923    match response {
1924        ResponseWrite::Full(bytes) => {
1925            write_all(stream, &bytes).await?;
1926        }
1927        ResponseWrite::Stream(mut encoder) => {
1928            // Write chunks as they become available
1929            loop {
1930                let chunk = poll_fn(|cx| Pin::new(&mut encoder).poll_next(cx)).await;
1931                match chunk {
1932                    Some(bytes) => {
1933                        write_all(stream, &bytes).await?;
1934                    }
1935                    None => break,
1936                }
1937            }
1938        }
1939    }
1940
1941    // Flush the stream
1942    poll_fn(|cx| Pin::new(&mut *stream).poll_flush(cx)).await?;
1943
1944    Ok(())
1945}
1946
1947/// Writes all bytes to a stream.
1948async fn write_all(stream: &mut TcpStream, mut buf: &[u8]) -> io::Result<()> {
1949    use std::future::poll_fn;
1950
1951    while !buf.is_empty() {
1952        let n = poll_fn(|cx| Pin::new(&mut *stream).poll_write(cx, buf)).await?;
1953        if n == 0 {
1954            return Err(io::Error::new(
1955                io::ErrorKind::WriteZero,
1956                "failed to write whole buffer",
1957            ));
1958        }
1959        buf = &buf[n..];
1960    }
1961    Ok(())
1962}
1963
1964// Connection header handling moved to crate::connection module
1965
1966// ============================================================================
1967// Synchronous Server (for compatibility)
1968// ============================================================================
1969
/// Synchronous HTTP server for request/response conversion.
///
/// This is a simpler, non-async server that just provides parsing and
/// serialization utilities. It's useful for testing or when you don't
/// need full async TCP handling.
pub struct Server {
    // Request parser used by `parse_request`.
    parser: Parser,
}
1978
1979impl Server {
1980    /// Create a new server.
1981    #[must_use]
1982    pub fn new() -> Self {
1983        Self {
1984            parser: Parser::new(),
1985        }
1986    }
1987
1988    /// Parse a request from bytes.
1989    ///
1990    /// # Errors
1991    ///
1992    /// Returns an error if the request is malformed.
1993    pub fn parse_request(&self, bytes: &[u8]) -> Result<Request, ParseError> {
1994        self.parser.parse(bytes)
1995    }
1996
1997    /// Write a response to bytes.
1998    #[must_use]
1999    pub fn write_response(&self, response: Response) -> ResponseWrite {
2000        let mut writer = ResponseWriter::new();
2001        writer.write(response)
2002    }
2003}
2004
2005impl Default for Server {
2006    fn default() -> Self {
2007        Self::new()
2008    }
2009}
2010
2011#[cfg(test)]
2012mod tests {
2013    use super::*;
2014
    // Every builder setter should be reflected in the resulting config.
    #[test]
    fn server_config_builder() {
        let config = ServerConfig::new("0.0.0.0:3000")
            .with_request_timeout_secs(60)
            .with_max_connections(1000)
            .with_tcp_nodelay(false)
            .with_allowed_hosts(["example.com", "api.example.com"])
            .with_trust_x_forwarded_host(true);

        assert_eq!(config.bind_addr, "0.0.0.0:3000");
        assert_eq!(config.request_timeout, Time::from_secs(60));
        assert_eq!(config.max_connections, 1000);
        assert!(!config.tcp_nodelay);
        assert_eq!(config.allowed_hosts.len(), 2);
        assert!(config.trust_x_forwarded_host);
    }

    // Defaults: localhost bind, DEFAULT_* constants, nodelay on, no host
    // allow-list, X-Forwarded-Host untrusted.
    #[test]
    fn server_config_defaults() {
        let config = ServerConfig::default();
        assert_eq!(config.bind_addr, "127.0.0.1:8080");
        assert_eq!(
            config.request_timeout,
            Time::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)
        );
        assert_eq!(config.max_connections, DEFAULT_MAX_CONNECTIONS);
        assert!(config.tcp_nodelay);
        assert!(config.allowed_hosts.is_empty());
        assert!(!config.trust_x_forwarded_host);
    }

    // Request ids are handed out sequentially, starting from 0.
    #[test]
    fn tcp_server_creates_request_ids() {
        let server = TcpServer::default();
        let id1 = server.next_request_id();
        let id2 = server.next_request_id();
        let id3 = server.next_request_id();

        assert_eq!(id1, 0);
        assert_eq!(id2, 1);
        assert_eq!(id3, 2);
    }

    // Display strings for each ServerError variant.
    #[test]
    fn server_error_display() {
        let io_err = ServerError::Io(io::Error::new(io::ErrorKind::AddrInUse, "address in use"));
        assert!(io_err.to_string().contains("IO error"));

        let shutdown_err = ServerError::Shutdown;
        assert_eq!(shutdown_err.to_string(), "Server shutdown");

        let limit_err = ServerError::ConnectionLimitReached;
        assert_eq!(limit_err.to_string(), "Connection limit reached");
    }

    // The synchronous Server can parse a minimal well-formed request.
    #[test]
    fn sync_server_parses_request() {
        let server = Server::new();
        let request = b"GET /hello HTTP/1.1\r\nHost: localhost\r\n\r\n";
        let result = server.parse_request(request);
        assert!(result.is_ok());
    }
2077
2078    // ========================================================================
2079    // Host header validation tests
2080    // ========================================================================
2081
2082    #[test]
2083    fn host_validation_missing_host_rejected() {
2084        let config = ServerConfig::default();
2085        let request = Request::new(fastapi_core::Method::Get, "/");
2086        let err = validate_host_header(&request, &config).unwrap_err();
2087        assert_eq!(err.kind, HostValidationErrorKind::Missing);
2088        assert_eq!(err.response().status().as_u16(), 400);
2089    }
2090
2091    #[test]
2092    fn host_validation_allows_configured_host() {
2093        let config = ServerConfig::default().with_allowed_hosts(["example.com"]);
2094        let mut request = Request::new(fastapi_core::Method::Get, "/");
2095        request
2096            .headers_mut()
2097            .insert("Host".to_string(), b"example.com".to_vec());
2098        assert!(validate_host_header(&request, &config).is_ok());
2099    }
2100
2101    #[test]
2102    fn host_validation_rejects_disallowed_host() {
2103        let config = ServerConfig::default().with_allowed_hosts(["example.com"]);
2104        let mut request = Request::new(fastapi_core::Method::Get, "/");
2105        request
2106            .headers_mut()
2107            .insert("Host".to_string(), b"evil.com".to_vec());
2108        let err = validate_host_header(&request, &config).unwrap_err();
2109        assert_eq!(err.kind, HostValidationErrorKind::NotAllowed);
2110    }
2111
2112    #[test]
2113    fn host_validation_wildcard_allows_subdomains_only() {
2114        let config = ServerConfig::default().with_allowed_hosts(["*.example.com"]);
2115        let mut request = Request::new(fastapi_core::Method::Get, "/");
2116        request
2117            .headers_mut()
2118            .insert("Host".to_string(), b"api.example.com".to_vec());
2119        assert!(validate_host_header(&request, &config).is_ok());
2120
2121        let mut request = Request::new(fastapi_core::Method::Get, "/");
2122        request
2123            .headers_mut()
2124            .insert("Host".to_string(), b"example.com".to_vec());
2125        let err = validate_host_header(&request, &config).unwrap_err();
2126        assert_eq!(err.kind, HostValidationErrorKind::NotAllowed);
2127    }
2128
2129    #[test]
2130    fn host_validation_uses_x_forwarded_host_when_trusted() {
2131        let config = ServerConfig::default()
2132            .with_allowed_hosts(["example.com"])
2133            .with_trust_x_forwarded_host(true);
2134        let mut request = Request::new(fastapi_core::Method::Get, "/");
2135        request
2136            .headers_mut()
2137            .insert("Host".to_string(), b"internal.local".to_vec());
2138        request
2139            .headers_mut()
2140            .insert("X-Forwarded-Host".to_string(), b"example.com".to_vec());
2141        assert!(validate_host_header(&request, &config).is_ok());
2142    }
2143
2144    #[test]
2145    fn host_validation_rejects_invalid_host_value() {
2146        let config = ServerConfig::default();
2147        let mut request = Request::new(fastapi_core::Method::Get, "/");
2148        request
2149            .headers_mut()
2150            .insert("Host".to_string(), b"bad host".to_vec());
2151        let err = validate_host_header(&request, &config).unwrap_err();
2152        assert_eq!(err.kind, HostValidationErrorKind::Invalid);
2153    }
2154
2155    // ========================================================================
2156    // Keep-alive detection tests
2157    // ========================================================================
2158
2159    #[test]
2160    fn keep_alive_default_http11() {
2161        // HTTP/1.1 defaults to keep-alive
2162        let mut request = Request::new(fastapi_core::Method::Get, "/path".to_string());
2163        request
2164            .headers_mut()
2165            .insert("Host".to_string(), b"example.com".to_vec());
2166        assert!(should_keep_alive(&request));
2167    }
2168
2169    #[test]
2170    fn keep_alive_explicit_keep_alive() {
2171        let mut request = Request::new(fastapi_core::Method::Get, "/path".to_string());
2172        request
2173            .headers_mut()
2174            .insert("Connection".to_string(), b"keep-alive".to_vec());
2175        assert!(should_keep_alive(&request));
2176    }
2177
2178    #[test]
2179    fn keep_alive_connection_close() {
2180        let mut request = Request::new(fastapi_core::Method::Get, "/path".to_string());
2181        request
2182            .headers_mut()
2183            .insert("Connection".to_string(), b"close".to_vec());
2184        assert!(!should_keep_alive(&request));
2185    }
2186
2187    #[test]
2188    fn keep_alive_connection_close_case_insensitive() {
2189        let mut request = Request::new(fastapi_core::Method::Get, "/path".to_string());
2190        request
2191            .headers_mut()
2192            .insert("Connection".to_string(), b"CLOSE".to_vec());
2193        assert!(!should_keep_alive(&request));
2194    }
2195
2196    #[test]
2197    fn keep_alive_multiple_values() {
2198        let mut request = Request::new(fastapi_core::Method::Get, "/path".to_string());
2199        request
2200            .headers_mut()
2201            .insert("Connection".to_string(), b"keep-alive, upgrade".to_vec());
2202        assert!(should_keep_alive(&request));
2203    }
2204
2205    // ========================================================================
2206    // Timeout behavior tests
2207    // ========================================================================
2208
2209    #[test]
2210    fn timeout_budget_created_with_config_deadline() {
2211        let config = ServerConfig::new("127.0.0.1:8080").with_request_timeout_secs(45);
2212        let budget = Budget::new().with_deadline(config.request_timeout);
2213        assert_eq!(budget.deadline, Some(Time::from_secs(45)));
2214    }
2215
2216    #[test]
2217    fn timeout_duration_conversion_from_time() {
2218        let timeout = Time::from_secs(30);
2219        let duration = Duration::from_nanos(timeout.as_nanos());
2220        assert_eq!(duration, Duration::from_secs(30));
2221    }
2222
2223    #[test]
2224    fn timeout_duration_conversion_from_time_millis() {
2225        let timeout = Time::from_millis(1500);
2226        let duration = Duration::from_nanos(timeout.as_nanos());
2227        assert_eq!(duration, Duration::from_millis(1500));
2228    }
2229
2230    #[test]
2231    fn gateway_timeout_response_has_correct_status() {
2232        let response = Response::with_status(StatusCode::GATEWAY_TIMEOUT);
2233        assert_eq!(response.status().as_u16(), 504);
2234    }
2235
2236    #[test]
2237    fn gateway_timeout_response_with_body() {
2238        let response = Response::with_status(StatusCode::GATEWAY_TIMEOUT).body(
2239            fastapi_core::ResponseBody::Bytes(b"Request timed out".to_vec()),
2240        );
2241        assert_eq!(response.status().as_u16(), 504);
2242        // Verify body is set (not empty)
2243        assert!(response.body_ref().len() > 0);
2244    }
2245
2246    #[test]
2247    fn elapsed_time_check_logic() {
2248        // Test the timeout check logic in isolation
2249        let start = Instant::now();
2250        let timeout_duration = Duration::from_millis(10);
2251
2252        // Immediately after start, should not be timed out
2253        assert!(start.elapsed() <= timeout_duration);
2254
2255        // Wait a bit longer than the timeout
2256        std::thread::sleep(Duration::from_millis(20));
2257
2258        // Now should be timed out
2259        assert!(start.elapsed() > timeout_duration);
2260    }
2261
2262    // ========================================================================
2263    // Connection limit tests
2264    // ========================================================================
2265
2266    #[test]
2267    fn connection_counter_starts_at_zero() {
2268        let server = TcpServer::default();
2269        assert_eq!(server.current_connections(), 0);
2270    }
2271
2272    #[test]
2273    fn try_acquire_connection_unlimited() {
2274        // With max_connections = 0 (unlimited), should always succeed
2275        let server = TcpServer::default();
2276        assert_eq!(server.config().max_connections, 0);
2277
2278        // Acquire several connections
2279        for _ in 0..100 {
2280            assert!(server.try_acquire_connection());
2281        }
2282        assert_eq!(server.current_connections(), 100);
2283
2284        // Release them all
2285        for _ in 0..100 {
2286            server.release_connection();
2287        }
2288        assert_eq!(server.current_connections(), 0);
2289    }
2290
2291    #[test]
2292    fn try_acquire_connection_with_limit() {
2293        let config = ServerConfig::new("127.0.0.1:8080").with_max_connections(5);
2294        let server = TcpServer::new(config);
2295
2296        // Acquire up to the limit
2297        for i in 0..5 {
2298            assert!(
2299                server.try_acquire_connection(),
2300                "Should acquire connection {i}"
2301            );
2302        }
2303        assert_eq!(server.current_connections(), 5);
2304
2305        // Next one should fail
2306        assert!(!server.try_acquire_connection());
2307        assert_eq!(server.current_connections(), 5);
2308
2309        // Release one
2310        server.release_connection();
2311        assert_eq!(server.current_connections(), 4);
2312
2313        // Now we can acquire one more
2314        assert!(server.try_acquire_connection());
2315        assert_eq!(server.current_connections(), 5);
2316    }
2317
2318    #[test]
2319    fn try_acquire_connection_single_connection_limit() {
2320        let config = ServerConfig::new("127.0.0.1:8080").with_max_connections(1);
2321        let server = TcpServer::new(config);
2322
2323        // First acquire succeeds
2324        assert!(server.try_acquire_connection());
2325        assert_eq!(server.current_connections(), 1);
2326
2327        // Second fails
2328        assert!(!server.try_acquire_connection());
2329        assert_eq!(server.current_connections(), 1);
2330
2331        // After release, can acquire again
2332        server.release_connection();
2333        assert!(server.try_acquire_connection());
2334    }
2335
2336    #[test]
2337    fn service_unavailable_response_has_correct_status() {
2338        let response = Response::with_status(StatusCode::SERVICE_UNAVAILABLE);
2339        assert_eq!(response.status().as_u16(), 503);
2340    }
2341
2342    #[test]
2343    fn service_unavailable_response_with_body() {
2344        let response = Response::with_status(StatusCode::SERVICE_UNAVAILABLE)
2345            .header("connection", b"close".to_vec())
2346            .body(fastapi_core::ResponseBody::Bytes(
2347                b"503 Service Unavailable: connection limit reached".to_vec(),
2348            ));
2349        assert_eq!(response.status().as_u16(), 503);
2350        assert!(response.body_ref().len() > 0);
2351    }
2352
2353    #[test]
2354    fn config_max_connections_default_is_zero() {
2355        let config = ServerConfig::default();
2356        assert_eq!(config.max_connections, 0);
2357    }
2358
2359    #[test]
2360    fn config_max_connections_can_be_set() {
2361        let config = ServerConfig::new("127.0.0.1:8080").with_max_connections(100);
2362        assert_eq!(config.max_connections, 100);
2363    }
2364
2365    // ========================================================================
2366    // Keep-alive configuration tests
2367    // ========================================================================
2368
2369    #[test]
2370    fn config_keep_alive_timeout_default() {
2371        let config = ServerConfig::default();
2372        assert_eq!(
2373            config.keep_alive_timeout,
2374            Duration::from_secs(DEFAULT_KEEP_ALIVE_TIMEOUT_SECS)
2375        );
2376    }
2377
2378    #[test]
2379    fn config_keep_alive_timeout_can_be_set() {
2380        let config =
2381            ServerConfig::new("127.0.0.1:8080").with_keep_alive_timeout(Duration::from_secs(120));
2382        assert_eq!(config.keep_alive_timeout, Duration::from_secs(120));
2383    }
2384
2385    #[test]
2386    fn config_keep_alive_timeout_can_be_set_secs() {
2387        let config = ServerConfig::new("127.0.0.1:8080").with_keep_alive_timeout_secs(90);
2388        assert_eq!(config.keep_alive_timeout, Duration::from_secs(90));
2389    }
2390
2391    #[test]
2392    fn config_max_requests_per_connection_default() {
2393        let config = ServerConfig::default();
2394        assert_eq!(
2395            config.max_requests_per_connection,
2396            DEFAULT_MAX_REQUESTS_PER_CONNECTION
2397        );
2398    }
2399
2400    #[test]
2401    fn config_max_requests_per_connection_can_be_set() {
2402        let config = ServerConfig::new("127.0.0.1:8080").with_max_requests_per_connection(50);
2403        assert_eq!(config.max_requests_per_connection, 50);
2404    }
2405
2406    #[test]
2407    fn config_max_requests_per_connection_unlimited() {
2408        let config = ServerConfig::new("127.0.0.1:8080").with_max_requests_per_connection(0);
2409        assert_eq!(config.max_requests_per_connection, 0);
2410    }
2411
2412    #[test]
2413    fn response_with_keep_alive_header() {
2414        let response = Response::ok().header("connection", b"keep-alive".to_vec());
2415        let headers = response.headers();
2416        let connection_header = headers
2417            .iter()
2418            .find(|(name, _)| name.eq_ignore_ascii_case("connection"));
2419        assert!(connection_header.is_some());
2420        assert_eq!(connection_header.unwrap().1, b"keep-alive");
2421    }
2422
2423    #[test]
2424    fn response_with_close_header() {
2425        let response = Response::ok().header("connection", b"close".to_vec());
2426        let headers = response.headers();
2427        let connection_header = headers
2428            .iter()
2429            .find(|(name, _)| name.eq_ignore_ascii_case("connection"));
2430        assert!(connection_header.is_some());
2431        assert_eq!(connection_header.unwrap().1, b"close");
2432    }
2433
2434    // ========================================================================
2435    // Connection draining tests
2436    // ========================================================================
2437
2438    #[test]
2439    fn config_drain_timeout_default() {
2440        let config = ServerConfig::default();
2441        assert_eq!(
2442            config.drain_timeout,
2443            Duration::from_secs(DEFAULT_DRAIN_TIMEOUT_SECS)
2444        );
2445    }
2446
2447    #[test]
2448    fn config_drain_timeout_can_be_set() {
2449        let config =
2450            ServerConfig::new("127.0.0.1:8080").with_drain_timeout(Duration::from_secs(60));
2451        assert_eq!(config.drain_timeout, Duration::from_secs(60));
2452    }
2453
2454    #[test]
2455    fn config_drain_timeout_can_be_set_secs() {
2456        let config = ServerConfig::new("127.0.0.1:8080").with_drain_timeout_secs(45);
2457        assert_eq!(config.drain_timeout, Duration::from_secs(45));
2458    }
2459
2460    #[test]
2461    fn server_not_draining_initially() {
2462        let server = TcpServer::default();
2463        assert!(!server.is_draining());
2464    }
2465
2466    #[test]
2467    fn server_start_drain_sets_flag() {
2468        let server = TcpServer::default();
2469        assert!(!server.is_draining());
2470        server.start_drain();
2471        assert!(server.is_draining());
2472    }
2473
2474    #[test]
2475    fn server_start_drain_idempotent() {
2476        let server = TcpServer::default();
2477        server.start_drain();
2478        assert!(server.is_draining());
2479        server.start_drain();
2480        assert!(server.is_draining());
2481    }
2482
2483    #[tokio::test]
2484    async fn wait_for_drain_returns_true_when_no_connections() {
2485        let server = TcpServer::default();
2486        assert_eq!(server.current_connections(), 0);
2487        let result = server
2488            .wait_for_drain(Duration::from_millis(100), Some(Duration::from_millis(1)))
2489            .await;
2490        assert!(result);
2491    }
2492
2493    #[tokio::test]
2494    async fn wait_for_drain_timeout_with_connections() {
2495        let server = TcpServer::default();
2496        // Simulate active connections
2497        server.try_acquire_connection();
2498        server.try_acquire_connection();
2499        assert_eq!(server.current_connections(), 2);
2500
2501        // Wait should timeout since connections won't drain on their own
2502        let result = server
2503            .wait_for_drain(Duration::from_millis(50), Some(Duration::from_millis(5)))
2504            .await;
2505        assert!(!result);
2506        assert_eq!(server.current_connections(), 2);
2507    }
2508
2509    #[tokio::test]
2510    async fn drain_returns_zero_when_no_connections() {
2511        let server = TcpServer::new(
2512            ServerConfig::new("127.0.0.1:8080").with_drain_timeout(Duration::from_millis(100)),
2513        );
2514        assert_eq!(server.current_connections(), 0);
2515        let remaining = server.drain().await;
2516        assert_eq!(remaining, 0);
2517        assert!(server.is_draining());
2518    }
2519
2520    #[tokio::test]
2521    async fn drain_returns_count_when_connections_remain() {
2522        let server = TcpServer::new(
2523            ServerConfig::new("127.0.0.1:8080").with_drain_timeout(Duration::from_millis(50)),
2524        );
2525        // Simulate active connections that won't drain
2526        server.try_acquire_connection();
2527        server.try_acquire_connection();
2528        server.try_acquire_connection();
2529
2530        let remaining = server.drain().await;
2531        assert_eq!(remaining, 3);
2532        assert!(server.is_draining());
2533    }
2534
2535    #[test]
2536    fn server_shutdown_error_display() {
2537        let err = ServerError::Shutdown;
2538        assert_eq!(err.to_string(), "Server shutdown");
2539    }
2540
2541    // ========================================================================
2542    // Graceful shutdown controller tests
2543    // ========================================================================
2544
2545    #[test]
2546    fn server_has_shutdown_controller() {
2547        let server = TcpServer::default();
2548        let controller = server.shutdown_controller();
2549        assert!(!controller.is_shutting_down());
2550    }
2551
2552    #[test]
2553    fn server_subscribe_shutdown_returns_receiver() {
2554        let server = TcpServer::default();
2555        let receiver = server.subscribe_shutdown();
2556        assert!(!receiver.is_shutting_down());
2557    }
2558
2559    #[test]
2560    fn server_shutdown_sets_draining_and_controller() {
2561        let server = TcpServer::default();
2562        assert!(!server.is_shutting_down());
2563        assert!(!server.is_draining());
2564        assert!(!server.shutdown_controller().is_shutting_down());
2565
2566        server.shutdown();
2567
2568        assert!(server.is_shutting_down());
2569        assert!(server.is_draining());
2570        assert!(server.shutdown_controller().is_shutting_down());
2571    }
2572
2573    #[test]
2574    fn server_shutdown_notifies_receivers() {
2575        let server = TcpServer::default();
2576        let receiver1 = server.subscribe_shutdown();
2577        let receiver2 = server.subscribe_shutdown();
2578
2579        assert!(!receiver1.is_shutting_down());
2580        assert!(!receiver2.is_shutting_down());
2581
2582        server.shutdown();
2583
2584        assert!(receiver1.is_shutting_down());
2585        assert!(receiver2.is_shutting_down());
2586    }
2587
2588    #[test]
2589    fn server_shutdown_is_idempotent() {
2590        let server = TcpServer::default();
2591        let receiver = server.subscribe_shutdown();
2592
2593        server.shutdown();
2594        server.shutdown();
2595        server.shutdown();
2596
2597        assert!(server.is_shutting_down());
2598        assert!(receiver.is_shutting_down());
2599    }
2600
2601    // ========================================================================
2602    // Keep-alive timeout tests
2603    // ========================================================================
2604
2605    #[test]
2606    fn keep_alive_timeout_error_display() {
2607        let err = ServerError::KeepAliveTimeout;
2608        assert_eq!(err.to_string(), "Keep-alive timeout");
2609    }
2610
2611    #[test]
2612    fn keep_alive_timeout_zero_disables_timeout() {
2613        let config = ServerConfig::new("127.0.0.1:8080").with_keep_alive_timeout(Duration::ZERO);
2614        assert!(config.keep_alive_timeout.is_zero());
2615    }
2616
2617    #[test]
2618    fn keep_alive_timeout_default_is_non_zero() {
2619        let config = ServerConfig::default();
2620        assert!(!config.keep_alive_timeout.is_zero());
2621        assert_eq!(
2622            config.keep_alive_timeout,
2623            Duration::from_secs(DEFAULT_KEEP_ALIVE_TIMEOUT_SECS)
2624        );
2625    }
2626
2627    #[test]
2628    fn timed_out_io_error_kind() {
2629        let err = io::Error::new(io::ErrorKind::TimedOut, "test timeout");
2630        assert_eq!(err.kind(), io::ErrorKind::TimedOut);
2631    }
2632
2633    #[test]
2634    fn instant_deadline_calculation() {
2635        let timeout = Duration::from_millis(100);
2636        let deadline = Instant::now() + timeout;
2637
2638        // Deadline should be in the future
2639        assert!(deadline > Instant::now());
2640
2641        // After waiting, deadline should be in the past
2642        std::thread::sleep(Duration::from_millis(150));
2643        assert!(Instant::now() >= deadline);
2644    }
2645
2646    #[test]
2647    fn server_metrics_initial_state() {
2648        let server = TcpServer::default();
2649        let m = server.metrics();
2650        assert_eq!(m.active_connections, 0);
2651        assert_eq!(m.total_accepted, 0);
2652        assert_eq!(m.total_rejected, 0);
2653        assert_eq!(m.total_timed_out, 0);
2654        assert_eq!(m.total_requests, 0);
2655        assert_eq!(m.bytes_in, 0);
2656        assert_eq!(m.bytes_out, 0);
2657    }
2658
2659    #[test]
2660    fn server_metrics_after_acquire_release() {
2661        let server = TcpServer::new(ServerConfig::new("127.0.0.1:0").with_max_connections(10));
2662        assert!(server.try_acquire_connection());
2663        assert!(server.try_acquire_connection());
2664
2665        let m = server.metrics();
2666        assert_eq!(m.active_connections, 2);
2667        assert_eq!(m.total_accepted, 2);
2668        assert_eq!(m.total_rejected, 0);
2669
2670        server.release_connection();
2671        let m = server.metrics();
2672        assert_eq!(m.active_connections, 1);
2673        assert_eq!(m.total_accepted, 2); // monotonic
2674    }
2675
2676    #[test]
2677    fn server_metrics_rejection_counted() {
2678        let server = TcpServer::new(ServerConfig::new("127.0.0.1:0").with_max_connections(1));
2679        assert!(server.try_acquire_connection());
2680        assert!(!server.try_acquire_connection()); // rejected
2681
2682        let m = server.metrics();
2683        assert_eq!(m.total_accepted, 1);
2684        assert_eq!(m.total_rejected, 1);
2685        assert_eq!(m.active_connections, 1);
2686    }
2687
2688    #[test]
2689    fn server_metrics_bytes_tracking() {
2690        let server = TcpServer::default();
2691        server.record_bytes_in(1024);
2692        server.record_bytes_in(512);
2693        server.record_bytes_out(2048);
2694
2695        let m = server.metrics();
2696        assert_eq!(m.bytes_in, 1536);
2697        assert_eq!(m.bytes_out, 2048);
2698    }
2699
2700    #[test]
2701    fn server_metrics_unlimited_connections_accepted() {
2702        let server = TcpServer::new(ServerConfig::new("127.0.0.1:0").with_max_connections(0));
2703        for _ in 0..100 {
2704            assert!(server.try_acquire_connection());
2705        }
2706        let m = server.metrics();
2707        assert_eq!(m.total_accepted, 100);
2708        assert_eq!(m.total_rejected, 0);
2709        assert_eq!(m.active_connections, 100);
2710    }
2711
2712    #[test]
2713    fn server_metrics_clone_eq() {
2714        let server = TcpServer::default();
2715        server.record_bytes_in(42);
2716        let m1 = server.metrics();
2717        let m2 = m1.clone();
2718        assert_eq!(m1, m2);
2719    }
2720}
2721
2722// ============================================================================
2723// App Serve Extension
2724// ============================================================================
2725
/// Extension trait to add serve capability to [`App`].
///
/// This trait provides the `serve()` method that wires an App to the HTTP server,
/// enabling it to handle incoming HTTP requests. Both methods consume the app
/// and their futures resolve only once the server stops.
///
/// # Example
///
/// ```ignore
/// use fastapi::prelude::*;
/// use fastapi_http::AppServeExt;
///
/// let app = App::builder()
///     .get("/", |_, _| async { Response::ok().body_text("Hello!") })
///     .build();
///
/// // Run the server
/// app.serve("0.0.0.0:8080").await?;
/// ```
pub trait AppServeExt {
    /// Starts the HTTP server and begins accepting connections.
    ///
    /// This method:
    /// 1. Runs all registered startup hooks
    /// 2. Binds to the specified address
    /// 3. Accepts connections and routes requests to handlers
    /// 4. Runs shutdown hooks when the server stops
    ///
    /// # Arguments
    ///
    /// * `addr` - The address to bind to (e.g., "0.0.0.0:8080" or "127.0.0.1:3000")
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - A startup hook fails with `abort: true`
    /// - Binding to the address fails
    /// - An unrecoverable IO error occurs
    ///
    /// # Example
    ///
    /// ```ignore
    /// use fastapi::prelude::*;
    /// use fastapi_http::AppServeExt;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let app = App::builder()
    ///         .get("/health", |_, _| async { Response::ok() })
    ///         .build();
    ///
    ///     app.serve("0.0.0.0:8080").await?;
    ///     Ok(())
    /// }
    /// ```
    fn serve(self, addr: impl Into<String>) -> impl Future<Output = Result<(), ServeError>> + Send;

    /// Starts the HTTP server with custom configuration.
    ///
    /// This method allows fine-grained control over server behavior including
    /// timeouts, connection limits, and keep-alive settings.
    ///
    /// # Arguments
    ///
    /// * `config` - Server configuration options
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`AppServeExt::serve`]: an aborting
    /// startup hook, a bind failure, or an unrecoverable IO error.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use fastapi::prelude::*;
    /// use fastapi_http::{AppServeExt, ServerConfig};
    ///
    /// let config = ServerConfig::new("0.0.0.0:8080")
    ///     .with_request_timeout_secs(60)
    ///     .with_max_connections(1000)
    ///     .with_keep_alive_timeout_secs(120);
    ///
    /// app.serve_with_config(config).await?;
    /// ```
    fn serve_with_config(
        self,
        config: ServerConfig,
    ) -> impl Future<Output = Result<(), ServeError>> + Send;
}
2809
/// Error returned when starting or running the server fails.
///
/// Constructed either directly when a startup hook aborts, or via the
/// `From<ServerError>` impl when the underlying server errors at runtime.
#[derive(Debug)]
pub enum ServeError {
    /// A startup hook failed with abort.
    Startup(fastapi_core::StartupHookError),
    /// Server error during operation.
    Server(ServerError),
}
2818
2819impl std::fmt::Display for ServeError {
2820    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2821        match self {
2822            Self::Startup(e) => write!(f, "startup hook failed: {}", e.message),
2823            Self::Server(e) => write!(f, "server error: {e}"),
2824        }
2825    }
2826}
2827
2828impl std::error::Error for ServeError {
2829    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
2830        match self {
2831            Self::Startup(_) => None,
2832            Self::Server(e) => Some(e),
2833        }
2834    }
2835}
2836
2837impl From<ServerError> for ServeError {
2838    fn from(e: ServerError) -> Self {
2839        Self::Server(e)
2840    }
2841}
2842
impl AppServeExt for App {
    fn serve(self, addr: impl Into<String>) -> impl Future<Output = Result<(), ServeError>> + Send {
        // Default configuration for the given address; all lifecycle behavior
        // is shared with serve_with_config.
        let config = ServerConfig::new(addr);
        self.serve_with_config(config)
    }

    #[allow(clippy::manual_async_fn)] // Using impl Future for trait compatibility
    fn serve_with_config(
        self,
        config: ServerConfig,
    ) -> impl Future<Output = Result<(), ServeError>> + Send {
        async move {
            // Run startup hooks before the server binds. Only an explicit
            // abort prevents startup; partial success is reported and ignored.
            match self.run_startup_hooks().await {
                fastapi_core::StartupOutcome::Success => {}
                fastapi_core::StartupOutcome::PartialSuccess { warnings } => {
                    // Log warnings but continue (non-fatal)
                    eprintln!("Warning: {warnings} startup hook(s) had non-fatal errors");
                }
                fastapi_core::StartupOutcome::Aborted(e) => {
                    return Err(ServeError::Startup(e));
                }
            }

            // Create the TCP server
            let server = TcpServer::new(config);

            // Wrap app in Arc for sharing with handler
            // App implements Handler trait, so we can use serve_handler
            let app = Arc::new(self);
            let handler: Arc<dyn fastapi_core::Handler> =
                Arc::clone(&app) as Arc<dyn fastapi_core::Handler>;

            // Create a root Cx for the server
            // NOTE(review): `Cx::for_testing()` looks like a test-only
            // constructor used on the production serve path — confirm whether
            // a production root-context API should be used here instead.
            let cx = Cx::for_testing();

            // Print startup banner
            let bind_addr = &server.config().bind_addr;
            println!("🚀 Server starting on http://{bind_addr}");

            // Run the server using the Handler-based serve method; this only
            // returns once the server shuts down or fails.
            let result = server.serve_handler(&cx, handler).await;

            // Run shutdown hooks (use the original Arc<App>). These run even
            // when the server exited with an error.
            app.run_shutdown_hooks().await;

            result.map_err(ServeError::from)
        }
    }
}
2893
2894/// Convenience function to serve an App on the given address.
2895///
2896/// This is equivalent to calling `app.serve(addr)` but can be more
2897/// ergonomic in some contexts.
2898///
2899/// # Example
2900///
2901/// ```ignore
2902/// use fastapi::prelude::*;
2903/// use fastapi_http::serve;
2904///
2905/// let app = App::builder()
2906///     .get("/", |_, _| async { Response::ok() })
2907///     .build();
2908///
2909/// serve(app, "0.0.0.0:8080").await?;
2910/// ```
2911pub async fn serve(app: App, addr: impl Into<String>) -> Result<(), ServeError> {
2912    app.serve(addr).await
2913}
2914
2915/// Convenience function to serve an App with custom configuration.
2916///
2917/// # Example
2918///
2919/// ```ignore
2920/// use fastapi::prelude::*;
2921/// use fastapi_http::{serve_with_config, ServerConfig};
2922///
2923/// let app = App::builder()
2924///     .get("/", |_, _| async { Response::ok() })
2925///     .build();
2926///
2927/// let config = ServerConfig::new("0.0.0.0:8080")
2928///     .with_max_connections(500);
2929///
2930/// serve_with_config(app, config).await?;
2931/// ```
2932pub async fn serve_with_config(app: App, config: ServerConfig) -> Result<(), ServeError> {
2933    app.serve_with_config(config).await
2934}