Skip to main content

edge_http/io/
server.rs

1use core::fmt::{self, Debug, Display};
2use core::mem::{self, MaybeUninit};
3use core::pin::pin;
4
5use edge_nal::{
6    with_timeout, Close, Readable, TcpShutdown, TcpSplit, WithTimeout, WithTimeoutError,
7};
8
9use embedded_io_async::{ErrorType, Read, Write};
10
11use super::{send_headers, send_status, Body, Error, RequestHeaders, SendBody};
12
13use crate::ws::{upgrade_response_headers, MAX_BASE64_KEY_RESPONSE_LEN};
14use crate::{ConnectionType, DEFAULT_MAX_HEADERS_COUNT};
15
16pub const DEFAULT_HANDLER_TASKS_COUNT: usize = 4;
17pub const DEFAULT_BUF_SIZE: usize = 2048;
18
19const COMPLETION_BUF_SIZE: usize = 64;
20
21/// A connection state machine for handling HTTP server requests-response cycles.
22#[allow(private_interfaces)]
23pub enum Connection<'b, T, const N: usize = DEFAULT_MAX_HEADERS_COUNT> {
24    Transition(TransitionState),
25    Unbound(T),
26    Request(RequestState<'b, T, N>),
27    Response(ResponseState<T>),
28}
29
30impl<'b, T, const N: usize> Connection<'b, T, N>
31where
32    T: Read + Write,
33{
34    /// Create a new connection state machine for an incoming request
35    ///
36    /// Note that the connection does not have any built-in read/write timeouts:
37    /// - To add a timeout on each IO operation, wrap the `io` type with the `edge_nal::WithTimeout` wrapper.
38    /// - To add a global request-response timeout, wrap your complete request-response processing
39    ///   logic with the `edge_nal::with_timeout` function.
40    ///
41    /// Parameters:
42    /// - `buf`: A buffer to store the request headers
43    /// - `io`: A socket stream
44    pub async fn new(
45        buf: &'b mut [u8],
46        mut io: T,
47    ) -> Result<Connection<'b, T, N>, Error<T::Error>> {
48        let mut request = RequestHeaders::new();
49
50        let (buf, read_len) = request.receive(buf, &mut io, true).await?;
51
52        let (connection_type, body_type) = request.resolve::<T::Error>()?;
53
54        let io = Body::new(body_type, buf, read_len, io);
55
56        Ok(Self::Request(RequestState {
57            request,
58            io,
59            connection_type,
60        }))
61    }
62
63    /// Return `true` of the connection is in request state (i.e. the initial state upon calling `new`)
64    pub fn is_request_initiated(&self) -> bool {
65        matches!(self, Self::Request(_))
66    }
67
68    /// Split the connection into request headers and body
69    pub fn split(&mut self) -> (&RequestHeaders<'b, N>, &mut Body<'b, T>) {
70        let req = self.request_mut().expect("Not in request mode");
71
72        (&req.request, &mut req.io)
73    }
74
75    /// Return a reference to the request headers
76    pub fn headers(&self) -> Result<&RequestHeaders<'b, N>, Error<T::Error>> {
77        Ok(&self.request_ref()?.request)
78    }
79
80    /// Return `true` if the request is a WebSocket upgrade request
81    pub fn is_ws_upgrade_request(&self) -> Result<bool, Error<T::Error>> {
82        Ok(self.headers()?.is_ws_upgrade_request())
83    }
84
85    /// Switch the connection into a response state
86    ///
87    /// Parameters:
88    /// - `status`: The HTTP status code
89    /// - `message`: An optional HTTP status message
90    /// - `headers`: An array of HTTP response headers.
91    ///   Note that if no `Content-Length` or `Transfer-Encoding` headers are provided,
92    ///   the body will be send with chunked encoding (for HTTP1.1 only and if the connection is not Close)
93    pub async fn initiate_response(
94        &mut self,
95        status: u16,
96        message: Option<&str>,
97        headers: &[(&str, &str)],
98    ) -> Result<(), Error<T::Error>> {
99        self.complete_request(status, message, headers).await
100    }
101
102    /// A convenience method to initiate a WebSocket upgrade response
103    pub async fn initiate_ws_upgrade_response(
104        &mut self,
105        buf: &mut [u8; MAX_BASE64_KEY_RESPONSE_LEN],
106    ) -> Result<(), Error<T::Error>> {
107        let headers = upgrade_response_headers(self.headers()?.headers.iter(), None, buf)?;
108
109        self.initiate_response(101, None, &headers).await
110    }
111
112    /// Return `true` if the connection is in response state
113    pub fn is_response_initiated(&self) -> bool {
114        matches!(self, Self::Response(_))
115    }
116
117    /// Completes the response and switches the connection back to the unbound state
118    /// If the connection is still in a request state, and empty 200 OK response is sent
119    pub async fn complete(&mut self) -> Result<(), Error<T::Error>> {
120        if self.is_request_initiated() {
121            self.complete_request(200, Some("OK"), &[]).await?;
122        }
123
124        if self.is_response_initiated() {
125            self.complete_response().await?;
126        }
127
128        Ok(())
129    }
130
131    /// Completes the response with an error message and switches the connection back to the unbound state
132    ///
133    /// If the connection is still in a request state, an empty 500 Internal Error response is sent
134    pub async fn complete_err(&mut self, err: &str) -> Result<(), Error<T::Error>> {
135        let result = self.request_mut();
136
137        match result {
138            Ok(_) => {
139                let headers = [("Connection", "Close"), ("Content-Type", "text/plain")];
140
141                self.complete_request(500, Some("Internal Error"), &headers)
142                    .await?;
143
144                let response = self.response_mut()?;
145
146                response.io.write_all(err.as_bytes()).await?;
147                response.io.finish().await?;
148
149                Ok(())
150            }
151            Err(err) => Err(err),
152        }
153    }
154
155    /// Return `true` if the connection needs to be closed
156    ///
157    /// This is determined by the connection type (i.e. `Connection: Close` header)
158    pub fn needs_close(&self) -> bool {
159        match self {
160            Self::Response(response) => response.needs_close(),
161            _ => true,
162        }
163    }
164
165    /// Switch the connection to unbound state, returning a mutable reference to the underlying socket stream
166    ///
167    /// NOTE: Use with care, and only if the connection is completed in the meantime
168    pub fn unbind(&mut self) -> Result<&mut T, Error<T::Error>> {
169        let io = self.unbind_mut();
170        *self = Self::Unbound(io);
171
172        Ok(self.io_mut())
173    }
174
175    async fn complete_request(
176        &mut self,
177        status: u16,
178        reason: Option<&str>,
179        headers: &[(&str, &str)],
180    ) -> Result<(), Error<T::Error>> {
181        let request = self.request_mut()?;
182
183        let mut buf = [0; COMPLETION_BUF_SIZE];
184        while request.io.read(&mut buf).await? > 0 {}
185
186        let http11 = request.request.http11;
187        let request_connection_type = request.connection_type;
188
189        let mut io = self.unbind_mut();
190
191        let result = async {
192            send_status(http11, status, reason, &mut io).await?;
193
194            let (connection_type, body_type) = send_headers(
195                headers.iter(),
196                Some(request_connection_type),
197                false,
198                http11,
199                true,
200                &mut io,
201            )
202            .await?;
203
204            Ok((connection_type, body_type))
205        }
206        .await;
207
208        match result {
209            Ok((connection_type, body_type)) => {
210                *self = Self::Response(ResponseState {
211                    io: SendBody::new(body_type, io),
212                    connection_type,
213                });
214
215                Ok(())
216            }
217            Err(e) => {
218                *self = Self::Unbound(io);
219
220                Err(e)
221            }
222        }
223    }
224
225    async fn complete_response(&mut self) -> Result<(), Error<T::Error>> {
226        self.response_mut()?.io.finish().await?;
227
228        Ok(())
229    }
230
231    fn unbind_mut(&mut self) -> T {
232        let state = mem::replace(self, Self::Transition(TransitionState(())));
233
234        match state {
235            Self::Request(request) => request.io.release(),
236            Self::Response(response) => response.io.release(),
237            Self::Unbound(io) => io,
238            _ => unreachable!(),
239        }
240    }
241
242    fn request_mut(&mut self) -> Result<&mut RequestState<'b, T, N>, Error<T::Error>> {
243        if let Self::Request(request) = self {
244            Ok(request)
245        } else {
246            Err(Error::InvalidState)
247        }
248    }
249
250    fn request_ref(&self) -> Result<&RequestState<'b, T, N>, Error<T::Error>> {
251        if let Self::Request(request) = self {
252            Ok(request)
253        } else {
254            Err(Error::InvalidState)
255        }
256    }
257
258    fn response_mut(&mut self) -> Result<&mut ResponseState<T>, Error<T::Error>> {
259        if let Self::Response(response) = self {
260            Ok(response)
261        } else {
262            Err(Error::InvalidState)
263        }
264    }
265
266    fn io_mut(&mut self) -> &mut T {
267        match self {
268            Self::Request(request) => request.io.as_raw_reader(),
269            Self::Response(response) => response.io.as_raw_writer(),
270            Self::Unbound(io) => io,
271            _ => unreachable!(),
272        }
273    }
274}
275
276impl<T, const N: usize> ErrorType for Connection<'_, T, N>
277where
278    T: ErrorType,
279{
280    type Error = Error<T::Error>;
281}
282
283impl<T, const N: usize> Read for Connection<'_, T, N>
284where
285    T: Read + Write,
286{
287    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
288        self.request_mut()?.io.read(buf).await
289    }
290}
291
292impl<T, const N: usize> Write for Connection<'_, T, N>
293where
294    T: Read + Write,
295{
296    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
297        self.response_mut()?.io.write(buf).await
298    }
299
300    async fn flush(&mut self) -> Result<(), Self::Error> {
301        self.response_mut()?.io.flush().await
302    }
303}
304
305struct TransitionState(());
306
307struct RequestState<'b, T, const N: usize> {
308    request: RequestHeaders<'b, N>,
309    io: Body<'b, T>,
310    connection_type: ConnectionType,
311}
312
313struct ResponseState<T> {
314    io: SendBody<T>,
315    connection_type: ConnectionType,
316}
317
318impl<T> ResponseState<T>
319where
320    T: Write,
321{
322    fn needs_close(&self) -> bool {
323        matches!(self.connection_type, ConnectionType::Close) || self.io.needs_close()
324    }
325}
326
327#[derive(Debug)]
328#[cfg_attr(feature = "defmt", derive(defmt::Format))]
329pub enum HandlerError<T, E> {
330    Io(T),
331    Connection(Error<T>),
332    Handler(E),
333}
334
335impl<T, E> From<Error<T>> for HandlerError<T, E> {
336    fn from(e: Error<T>) -> Self {
337        Self::Connection(e)
338    }
339}
340
341/// A trait (async callback) for handling incoming HTTP requests
342pub trait Handler {
343    type Error<E>: Debug
344    where
345        E: Debug;
346
347    /// Handle an incoming HTTP request
348    ///
349    /// Parameters:
350    /// - `task_id`: An identifier for the task, thast can be used by the handler for logging purposes
351    /// - `connection`: A connection state machine for the request-response cycle
352    async fn handle<T, const N: usize>(
353        &self,
354        task_id: impl Display + Copy,
355        connection: &mut Connection<'_, T, N>,
356    ) -> Result<(), Self::Error<T::Error>>
357    where
358        T: Read + Write + TcpSplit;
359}
360
361impl<H> Handler for &H
362where
363    H: Handler,
364{
365    type Error<E>
366        = H::Error<E>
367    where
368        E: Debug;
369
370    async fn handle<T, const N: usize>(
371        &self,
372        task_id: impl Display + Copy,
373        connection: &mut Connection<'_, T, N>,
374    ) -> Result<(), Self::Error<T::Error>>
375    where
376        T: Read + Write + TcpSplit,
377    {
378        (**self).handle(task_id, connection).await
379    }
380}
381
382impl<H> Handler for &mut H
383where
384    H: Handler,
385{
386    type Error<E>
387        = H::Error<E>
388    where
389        E: Debug;
390
391    async fn handle<T, const N: usize>(
392        &self,
393        task_id: impl Display + Copy,
394        connection: &mut Connection<'_, T, N>,
395    ) -> Result<(), Self::Error<T::Error>>
396    where
397        T: Read + Write + TcpSplit,
398    {
399        (**self).handle(task_id, connection).await
400    }
401}
402
403impl<H> Handler for WithTimeout<H>
404where
405    H: Handler,
406{
407    type Error<E>
408        = WithTimeoutError<H::Error<E>>
409    where
410        E: Debug;
411
412    async fn handle<T, const N: usize>(
413        &self,
414        task_id: impl Display + Copy,
415        connection: &mut Connection<'_, T, N>,
416    ) -> Result<(), Self::Error<T::Error>>
417    where
418        T: Read + Write + TcpSplit,
419    {
420        let mut io = pin!(self.io().handle(task_id, connection));
421
422        with_timeout(self.timeout_ms(), &mut io).await?;
423
424        Ok(())
425    }
426}
427
428/// A convenience function to handle multiple HTTP requests over a single socket stream,
429/// using the specified handler.
430///
431/// The socket stream will be closed only in case of error, or until the client explicitly requests that
432/// either with a hard socket close, or with a `Connection: Close` header.
433///
434/// A note on timeouts:
435/// - The function does NOT - by default - establish any timeouts on the IO operations _except_
436///   an optional timeout for detecting idle connections, so that they can be closed and thus make
437///   the server available for accepting new connections.
438///   It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish
439///   timeouts on the socket produced by the acceptor.
440/// - Similarly, the server does NOT establish any timeouts on the complete request-response cycle.
441///   It is up to the caller to wrap their complete or partial handling logic with
442///   `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish
443///   a global or semi-global request-response timeout.
444///
445/// Parameters:
446/// - `io`: A socket stream
447/// - `buf`: A work-area buffer used by the implementation
448/// - `keepalive_timeout_ms`: An optional timeout in milliseconds for detecting an idle keepalive connection
449///   that should be closed. If not provided, the server will not close idle connections.
450/// - `task_id`: An identifier for the task, used for logging purposes
451/// - `handler`: An implementation of `Handler` to handle incoming requests
452pub async fn handle_connection<H, T, const N: usize>(
453    mut io: T,
454    buf: &mut [u8],
455    keepalive_timeout_ms: Option<u32>,
456    task_id: impl Display + Copy,
457    handler: H,
458) where
459    H: Handler,
460    T: Read + Write + Readable + TcpSplit + TcpShutdown,
461{
462    let close = loop {
463        debug!(
464            "Handler task {}: Waiting for a new request",
465            display2format!(task_id)
466        );
467
468        if let Some(keepalive_timeout_ms) = keepalive_timeout_ms {
469            let wait_data = with_timeout(keepalive_timeout_ms, io.readable()).await;
470            match wait_data {
471                Err(WithTimeoutError::Timeout) => {
472                    info!(
473                        "Handler task {}: Closing connection due to inactivity",
474                        display2format!(task_id)
475                    );
476                    break true;
477                }
478                Err(e) => {
479                    warn!(
480                        "Handler task {}: Error when handling request: {:?}",
481                        display2format!(task_id),
482                        debug2format!(e)
483                    );
484                    break true;
485                }
486                Ok(_) => {}
487            }
488        }
489
490        let result = handle_request::<_, _, N>(buf, &mut io, task_id, &handler).await;
491
492        match result {
493            Err(HandlerError::Connection(Error::ConnectionClosed)) => {
494                debug!(
495                    "Handler task {}: Connection closed",
496                    display2format!(task_id)
497                );
498                break false;
499            }
500            Err(e) => {
501                warn!(
502                    "Handler task {}: Error when handling request: {:?}",
503                    display2format!(task_id),
504                    debug2format!(e)
505                );
506                break true;
507            }
508            Ok(needs_close) => {
509                if needs_close {
510                    debug!(
511                        "Handler task {}: Request complete; closing connection",
512                        display2format!(task_id)
513                    );
514                    break true;
515                } else {
516                    debug!(
517                        "Handler task {}: Request complete",
518                        display2format!(task_id)
519                    );
520                }
521            }
522        }
523    };
524
525    if close {
526        if let Err(e) = io.close(Close::Both).await {
527            warn!(
528                "Handler task {}: Error when closing the socket: {:?}",
529                display2format!(task_id),
530                debug2format!(e)
531            );
532        }
533    } else {
534        let _ = io.abort().await;
535    }
536}
537
538/// The error type for handling HTTP requests
539#[derive(Debug)]
540pub enum HandleRequestError<C, E> {
541    /// A connection error (HTTP protocol error or a socket IO error)
542    Connection(Error<C>),
543    /// A handler error
544    Handler(E),
545}
546
547impl<T, E> From<Error<T>> for HandleRequestError<T, E> {
548    fn from(e: Error<T>) -> Self {
549        Self::Connection(e)
550    }
551}
552
553impl<C, E> fmt::Display for HandleRequestError<C, E>
554where
555    C: fmt::Display,
556    E: fmt::Display,
557{
558    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
559        match self {
560            Self::Connection(e) => write!(f, "Connection error: {}", e),
561            Self::Handler(e) => write!(f, "Handler error: {}", e),
562        }
563    }
564}
565
566#[cfg(feature = "defmt")]
567impl<C, E> defmt::Format for HandleRequestError<C, E>
568where
569    C: defmt::Format,
570    E: defmt::Format,
571{
572    fn format(&self, f: defmt::Formatter<'_>) {
573        match self {
574            Self::Connection(e) => defmt::write!(f, "Connection error: {}", e),
575            Self::Handler(e) => defmt::write!(f, "Handler error: {}", e),
576        }
577    }
578}
579
580impl<C, E> embedded_io_async::Error for HandleRequestError<C, E>
581where
582    C: Debug + core::error::Error + embedded_io_async::Error,
583    E: Debug + core::error::Error,
584{
585    fn kind(&self) -> embedded_io_async::ErrorKind {
586        match self {
587            Self::Connection(Error::Io(e)) => e.kind(),
588            _ => embedded_io_async::ErrorKind::Other,
589        }
590    }
591}
592
593impl<C, E> core::error::Error for HandleRequestError<C, E>
594where
595    C: core::error::Error,
596    E: core::error::Error,
597{
598}
599
600/// A convenience function to handle a single HTTP request over a socket stream,
601/// using the specified handler.
602///
603/// Note that this function does not set any timeouts on the request-response processing
604/// or on the IO operations. It is up that the caller to use the `with_timeout` function
605/// and the `WithTimeout` struct from the `edge-nal` crate to wrap the future returned
606/// by this function, or the socket stream, or both.
607///
608/// Parameters:
609/// - `buf`: A work-area buffer used by the implementation
610/// - `io`: A socket stream
611/// - `task_id`: An identifier for the task, used for logging purposes
612/// - `handler`: An implementation of `Handler` to handle incoming requests
613pub async fn handle_request<H, T, const N: usize>(
614    buf: &mut [u8],
615    io: T,
616    task_id: impl Display + Copy,
617    handler: H,
618) -> Result<bool, HandlerError<T::Error, H::Error<T::Error>>>
619where
620    H: Handler,
621    T: Read + Write + TcpSplit,
622{
623    let mut connection = Connection::<_, N>::new(buf, io).await?;
624
625    let result = handler.handle(task_id, &mut connection).await;
626
627    match result {
628        Result::Ok(_) => connection.complete().await?,
629        Result::Err(e) => connection
630            .complete_err("INTERNAL ERROR")
631            .await
632            .map_err(|_| HandlerError::Handler(e))?,
633    }
634
635    Ok(connection.needs_close())
636}
637
638/// A type alias for an HTTP server with default buffer sizes.
639pub type DefaultServer =
640    Server<{ DEFAULT_HANDLER_TASKS_COUNT }, { DEFAULT_BUF_SIZE }, { DEFAULT_MAX_HEADERS_COUNT }>;
641
642/// A type alias for the HTTP server buffers (essentially, arrays of `MaybeUninit`)
643pub type ServerBuffers<const P: usize, const B: usize> = MaybeUninit<[[u8; B]; P]>;
644
645/// An HTTP server that can handle multiple requests concurrently.
646///
647/// The server needs an implementation of `edge_nal::TcpAccept` to accept incoming connections.
648#[repr(transparent)]
649pub struct Server<
650    const P: usize = DEFAULT_HANDLER_TASKS_COUNT,
651    const B: usize = DEFAULT_BUF_SIZE,
652    const N: usize = DEFAULT_MAX_HEADERS_COUNT,
653>(ServerBuffers<P, B>);
654
655impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {
656    /// Create a new HTTP server
657    #[inline(always)]
658    pub const fn new() -> Self {
659        Self(MaybeUninit::uninit())
660    }
661
662    /// Run the server with the specified acceptor and handler
663    ///
664    /// A note on timeouts:
665    /// - The function does NOT - by default - establish any timeouts on the IO operations _except_
666    ///   an optional timeout on idle connections, so that they can be closed.
667    ///   It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish
668    ///   timeouts on the socket produced by the acceptor.
669    /// - Similarly, the function does NOT establish any timeouts on the complete request-response cycle.
670    ///   It is up to the caller to wrap their complete or partial handling logic with
671    ///   `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish
672    ///   a global or semi-global request-response timeout.
673    ///
674    /// A note on concurrent connection acceptance:
675    /// - All handler tasks concurrently call `accept()` to maximize connection acceptance capacity.
676    /// - This is critical for TCP stacks without accept queues (e.g., smoltcp/embassy-net), where
677    ///   incoming connections can only be accepted if at least one task is actively waiting in `accept()`.
678    /// - When a task is busy handling a request, other tasks remain available to accept new connections.
679    /// - **Threading safety**: The acceptor must be safe to use from multiple async tasks.
680    ///   For single-threaded async executors (e.g., embassy-executor on embedded platforms),
681    ///   acceptors using non-Sync types like `Cell` for internal state are safe.
682    ///   For multi-threaded executors, the acceptor's internal state must be properly synchronized
683    ///   (e.g., using atomics or locks).
684    ///
685    /// Consider using `run_with_socket_queue()` instead for better connection handling
686    /// with TCP stacks that lack accept queues (e.g., smoltcp/embassy-net). The socket queue architecture
687    /// decouples connection acceptance from HTTP processing, allowing connections to be accepted even when
688    /// all worker tasks are busy.
689    ///
690    /// Parameters:
691    /// - `keepalive_timeout_ms`: An optional timeout in milliseconds for detecting an idle keepalive
692    ///   connection that should be closed. If not provided, the function will not close idle connections
693    ///   and the connection - in the absence of other timeouts - will remain active forever.
694    /// - `acceptor`: An implementation of `edge_nal::TcpAccept` to accept incoming connections
695    /// - `handler`: An implementation of `Handler` to handle incoming requests
696    ///   If not provided, a default timeout of 50 seconds is used.
697    #[inline(never)]
698    #[cold]
699    pub async fn run<A, H>(
700        &mut self,
701        keepalive_timeout_ms: Option<u32>,
702        acceptor: A,
703        handler: H,
704    ) -> Result<(), Error<A::Error>>
705    where
706        A: edge_nal::TcpAccept,
707        H: Handler,
708    {
709        let mut tasks = heapless::Vec::<_, P>::new();
710
711        info!(
712            "Creating {} handler tasks, memory: {}B",
713            P,
714            core::mem::size_of_val(&tasks)
715        );
716
717        for index in 0..P {
718            let acceptor = &acceptor;
719            let task_id = index;
720            let handler = &handler;
721            let buf: *mut [u8; B] = &mut unsafe { self.0.assume_init_mut() }[index];
722
723            unwrap!(tasks
724                .push(async move {
725                    loop {
726                        debug!(
727                            "Handler task {}: Waiting for connection",
728                            display2format!(task_id)
729                        );
730
731                        let io = acceptor.accept().await.map_err(Error::Io)?.1;
732
733                        debug!(
734                            "Handler task {}: Got connection request",
735                            display2format!(task_id)
736                        );
737
738                        handle_connection::<_, _, N>(
739                            io,
740                            unwrap!(unsafe { buf.as_mut() }),
741                            keepalive_timeout_ms,
742                            task_id,
743                            handler,
744                        )
745                        .await;
746                    }
747                })
748                .map_err(|_| ()));
749        }
750
751        let tasks = pin!(tasks);
752
753        let tasks = unsafe { tasks.map_unchecked_mut(|t| t.as_mut_slice()) };
754        let (result, _) = embassy_futures::select::select_slice(tasks).await;
755
756        warn!(
757            "Server processing loop quit abruptly: {:?}",
758            debug2format!(result)
759        );
760
761        result
762    }
763
764    /// Run the server with a socket queue architecture (recommended for smoltcp/embassy-net)
765    ///
766    /// This method addresses the limitation of TCP stacks without accept queues (e.g., smoltcp/embassy-net)
767    /// by using a signal-based coordination mechanism between acceptor and worker tasks. This ensures that
768    /// the number of sockets in use never exceeds the available socket pool size, preventing resource exhaustion.
769    ///
770    /// # Type Parameters
771    ///
772    /// - `Q`: Number of acceptor tasks and maximum sockets that can be allocated simultaneously (default: 8)
773    ///
774    /// # Important Constraints
775    ///
776    /// **CRITICAL**: The `Q` parameter must satisfy these constraints or the function will panic:
777    /// - `Q` must be **less than or equal to** the number of sockets in your smoltcp/embassy-net socket pool
778    ///   - If `Q` exceeds the socket pool size, accept() calls will fail and cause panics
779    /// - `Q` should be **greater than or equal to** `P` for the architecture to provide benefits
780    ///   - If `Q < P`, you lose the advantage of decoupling acceptance from processing
781    ///   - Recommended: `Q >= P` (e.g., Q=8 with P=4)
782    ///
783    /// # Timeout Configuration
784    ///
785    /// The function does NOT establish timeouts by default (except optional keepalive timeout):
786    /// - Wrap the acceptor with `edge_nal::WithTimeout` to set socket-level timeouts
787    /// - Wrap handler logic with `edge_nal::with_timeout` for request-response timeouts
788    ///
789    /// # Parameters
790    ///
791    /// - `keepalive_timeout_ms`: Optional timeout in milliseconds for idle keepalive connections
792    /// - `acceptor`: An implementation of `edge_nal::TcpAccept` to accept incoming connections
793    /// - `handler`: An implementation of `Handler` to handle incoming requests
794    #[cfg(feature = "io")]
795    #[inline(never)]
796    #[cold]
797    pub async fn run_with_socket_queue<A, H, const Q: usize>(
798        &mut self,
799        keepalive_timeout_ms: Option<u32>,
800        acceptor: A,
801        handler: H,
802    ) -> Result<(), Error<A::Error>>
803    where
804        A: edge_nal::TcpAccept,
805        H: Handler,
806    {
807        use embassy_sync::blocking_mutex::raw::NoopRawMutex;
808        use embassy_sync::channel::Channel;
809        use embassy_sync::signal::Signal;
810
811        // ============================================================================
812        // Internal Architecture
813        // ============================================================================
814        // This implementation uses a signal-based coordination mechanism:
815        //
816        // - Q acceptor tasks: Each waits for a signal before calling accept()
817        // - Each acceptor has its own signal indicating socket availability
818        // - Initially all Q signals are set, allowing Q concurrent accepts
819        // - When an acceptor accepts a connection, it enqueues (socket, acceptor_id)
820        // - P worker tasks dequeue from channel and process HTTP requests
821        // - When a worker finishes, it signals the specific acceptor that provided the socket
822        // - At most Q sockets allocated at any time (some accepting, some processing)
823        //
824        // This prevents socket pool exhaustion that would occur if all Q acceptors
825        // were simultaneously calling accept() while P workers were processing,
826        // which would require Q+P sockets. By tracking which acceptor provided each
827        // socket, workers signal the correct acceptor (one waiting on its signal,
828        // not busy on accept()).
829        //
830        // Threading: Uses NoopRawMutex for single-threaded executors (embassy-executor).
831        // For multi-threaded executors, a different mutex type would be needed.
832        // ============================================================================
833
834        // Create a channel to pass accepted sockets from acceptor tasks to worker tasks
835        // Each message contains the socket and the ID of the acceptor that accepted it
836        let socket_queue = Channel::<NoopRawMutex, (A::Socket<'_>, usize), Q>::new();
837
838        // Create signals for each acceptor task to coordinate socket availability
839        // When a worker finishes processing a socket, it signals an acceptor to accept a new connection
840        // This ensures we never have more sockets in use than available in the pool
841        let accept_signals: [Signal<NoopRawMutex, ()>; Q] = [(); Q].map(|_| Signal::new());
842
843        // Create Q acceptor tasks - each waits for its signal before accepting
844        // This ensures we never have more than (Q - sockets_in_use) acceptors calling accept()
845        let mut acceptor_tasks = heapless::Vec::<_, Q>::new();
846
847        info!(
848            "Creating {} acceptor tasks and {} worker tasks, queue size: {}",
849            Q, P, Q
850        );
851
852        for (acceptor_id, signal) in accept_signals.iter().enumerate() {
853            let acceptor = &acceptor;
854            let socket_queue = &socket_queue;
855
856            unwrap!(acceptor_tasks
857                .push(async move {
858                    loop {
859                        // Wait for signal that a socket is available
860                        // Initially all signals are ready, allowing Q concurrent accepts
861                        signal.wait().await;
862
863                        debug!(
864                            "Acceptor task {}: Got signal, waiting for connection",
865                            display2format!(acceptor_id)
866                        );
867
868                        match acceptor.accept().await {
869                            Ok((_, io)) => {
870                                debug!(
871                                    "Acceptor task {}: Got connection, enqueueing",
872                                    display2format!(acceptor_id)
873                                );
874
875                                // Send the socket along with the acceptor ID to the queue
876                                // This allows workers to signal the correct acceptor when done
877                                socket_queue.send((io, acceptor_id)).await;
878
879                                debug!(
880                                    "Acceptor task {}: Connection enqueued",
881                                    display2format!(acceptor_id)
882                                );
883                            }
884                            Err(e) => {
885                                warn!(
886                                    "Acceptor task {}: Error accepting connection: {:?}",
887                                    display2format!(acceptor_id),
888                                    debug2format!(e)
889                                );
890                                // Signal ourselves again to retry
891                                signal.signal(());
892                            }
893                        }
894                    }
895                })
896                .map_err(|_| ()));
897        }
898
899        // Initially signal all acceptors to start accepting
900        for signal in &accept_signals {
901            signal.signal(());
902        }
903
904        // Create worker tasks
905        let mut worker_tasks = heapless::Vec::<_, P>::new();
906
907        for index in 0..P {
908            let task_id = index;
909            let handler = &handler;
910            let socket_queue = &socket_queue;
911            let accept_signals = &accept_signals;
912            // Safety: The server buffer array is properly initialized (MaybeUninit is used correctly),
913            // and each worker task gets exclusive access to its own buffer slice via its unique index.
914            // The pointer remains valid for the lifetime of the server and the buffer is not moved.
915            let buf: *mut [u8; B] = &mut unsafe { self.0.assume_init_mut() }[index];
916
917            unwrap!(worker_tasks
918                .push(async move {
919                    loop {
920                        debug!(
921                            "Worker task {}: Waiting for connection from queue",
922                            display2format!(task_id)
923                        );
924
925                        // Receive an accepted socket from the queue along with the acceptor ID
926                        let (io, acceptor_id) = socket_queue.receive().await;
927
928                        debug!(
929                            "Worker task {}: Got connection from acceptor {} from queue",
930                            display2format!(task_id),
931                            display2format!(acceptor_id)
932                        );
933
934                        handle_connection::<_, _, N>(
935                            io,
936                            unwrap!(unsafe { buf.as_mut() }),
937                            keepalive_timeout_ms,
938                            task_id,
939                            handler,
940                        )
941                        .await;
942
943                        // Signal the specific acceptor that provided this socket
944                        // This ensures we signal an acceptor that's waiting on its signal,
945                        // not one that might be busy waiting on accept()
946                        debug!(
947                            "Worker task {}: Finished processing, signaling acceptor {}",
948                            display2format!(task_id),
949                            display2format!(acceptor_id)
950                        );
951                        accept_signals[acceptor_id].signal(());
952                    }
953                })
954                .map_err(|_| ()));
955        }
956
957        // Pin tasks for select_slice
958        let acceptor_tasks = pin!(acceptor_tasks);
959        let acceptor_tasks = unsafe { acceptor_tasks.map_unchecked_mut(|t| t.as_mut_slice()) };
960
961        let worker_tasks = pin!(worker_tasks);
962        let worker_tasks = unsafe { worker_tasks.map_unchecked_mut(|t| t.as_mut_slice()) };
963
964        // Run all acceptor and worker tasks concurrently
965        // Use select to run both acceptors and workers, return if any completes
966        use embassy_futures::select::Either;
967        let result = embassy_futures::select::select(
968            async {
969                let (result, _acceptor_index): (Result<(), Error<A::Error>>, _) =
970                    embassy_futures::select::select_slice(acceptor_tasks).await;
971                result
972            },
973            async {
974                let (result, _worker_index): (Result<(), Error<A::Error>>, _) =
975                    embassy_futures::select::select_slice(worker_tasks).await;
976                result
977            },
978        )
979        .await;
980
981        // Neither acceptor nor worker tasks should complete normally
982        match result {
983            Either::First(Err(e)) => {
984                warn!("Acceptor task quit with error: {:?}", debug2format!(e));
985                Err(e)
986            }
987            Either::First(Ok(_)) => {
988                warn!("Acceptor task quit unexpectedly");
989                Ok(())
990            }
991            Either::Second(Err(e)) => {
992                warn!("Worker task quit with error: {:?}", debug2format!(e));
993                Err(e)
994            }
995            Either::Second(Ok(_)) => {
996                warn!("Worker task quit unexpectedly");
997                Ok(())
998            }
999        }
1000    }
1001}
1002
1003impl<const P: usize, const B: usize, const N: usize> Default for Server<P, B, N> {
1004    fn default() -> Self {
1005        Self::new()
1006    }
1007}