edge_http/io/server.rs
1use core::fmt::{self, Debug, Display};
2use core::mem::{self, MaybeUninit};
3use core::pin::pin;
4
5use edge_nal::{
6 with_timeout, Close, Readable, TcpShutdown, TcpSplit, WithTimeout, WithTimeoutError,
7};
8
9use embedded_io_async::{ErrorType, Read, Write};
10
11use super::{send_headers, send_status, Body, Error, RequestHeaders, SendBody};
12
13use crate::ws::{upgrade_response_headers, MAX_BASE64_KEY_RESPONSE_LEN};
14use crate::{ConnectionType, DEFAULT_MAX_HEADERS_COUNT};
15
/// Default number of concurrent handler tasks (the default for the `P` parameter of `Server`).
pub const DEFAULT_HANDLER_TASKS_COUNT: usize = 4;
/// Default size, in bytes, of each handler task's work buffer (the default for the `B` parameter of `Server`).
pub const DEFAULT_BUF_SIZE: usize = 2048;

/// Size, in bytes, of the scratch buffer used to read and discard whatever is left
/// of the request body before a response is sent.
const COMPLETION_BUF_SIZE: usize = 64;
20
/// A connection state machine for handling HTTP server requests-response cycles.
///
/// States:
/// - `Transition`: a placeholder that is only in place while the previous state is being taken apart
/// - `Unbound`: the connection holds just the raw socket stream
/// - `Request`: the request headers are received and the request body can be read
/// - `Response`: the response status line and headers are sent and the response body can be written
// `private_interfaces` is allowed because the variants carry module-private state types.
#[allow(private_interfaces)]
pub enum Connection<'b, T, const N: usize = DEFAULT_MAX_HEADERS_COUNT> {
    Transition(TransitionState),
    Unbound(T),
    Request(RequestState<'b, T, N>),
    Response(ResponseState<T>),
}
29
30impl<'b, T, const N: usize> Connection<'b, T, N>
31where
32 T: Read + Write,
33{
34 /// Create a new connection state machine for an incoming request
35 ///
36 /// Note that the connection does not have any built-in read/write timeouts:
37 /// - To add a timeout on each IO operation, wrap the `io` type with the `edge_nal::WithTimeout` wrapper.
38 /// - To add a global request-response timeout, wrap your complete request-response processing
39 /// logic with the `edge_nal::with_timeout` function.
40 ///
41 /// Parameters:
42 /// - `buf`: A buffer to store the request headers
43 /// - `io`: A socket stream
44 pub async fn new(
45 buf: &'b mut [u8],
46 mut io: T,
47 ) -> Result<Connection<'b, T, N>, Error<T::Error>> {
48 let mut request = RequestHeaders::new();
49
50 let (buf, read_len) = request.receive(buf, &mut io, true).await?;
51
52 let (connection_type, body_type) = request.resolve::<T::Error>()?;
53
54 let io = Body::new(body_type, buf, read_len, io);
55
56 Ok(Self::Request(RequestState {
57 request,
58 io,
59 connection_type,
60 }))
61 }
62
63 /// Return `true` of the connection is in request state (i.e. the initial state upon calling `new`)
64 pub fn is_request_initiated(&self) -> bool {
65 matches!(self, Self::Request(_))
66 }
67
68 /// Split the connection into request headers and body
69 pub fn split(&mut self) -> (&RequestHeaders<'b, N>, &mut Body<'b, T>) {
70 let req = self.request_mut().expect("Not in request mode");
71
72 (&req.request, &mut req.io)
73 }
74
75 /// Return a reference to the request headers
76 pub fn headers(&self) -> Result<&RequestHeaders<'b, N>, Error<T::Error>> {
77 Ok(&self.request_ref()?.request)
78 }
79
80 /// Return `true` if the request is a WebSocket upgrade request
81 pub fn is_ws_upgrade_request(&self) -> Result<bool, Error<T::Error>> {
82 Ok(self.headers()?.is_ws_upgrade_request())
83 }
84
85 /// Switch the connection into a response state
86 ///
87 /// Parameters:
88 /// - `status`: The HTTP status code
89 /// - `message`: An optional HTTP status message
90 /// - `headers`: An array of HTTP response headers.
91 /// Note that if no `Content-Length` or `Transfer-Encoding` headers are provided,
92 /// the body will be send with chunked encoding (for HTTP1.1 only and if the connection is not Close)
93 pub async fn initiate_response(
94 &mut self,
95 status: u16,
96 message: Option<&str>,
97 headers: &[(&str, &str)],
98 ) -> Result<(), Error<T::Error>> {
99 self.complete_request(status, message, headers).await
100 }
101
102 /// A convenience method to initiate a WebSocket upgrade response
103 pub async fn initiate_ws_upgrade_response(
104 &mut self,
105 buf: &mut [u8; MAX_BASE64_KEY_RESPONSE_LEN],
106 ) -> Result<(), Error<T::Error>> {
107 let headers = upgrade_response_headers(self.headers()?.headers.iter(), None, buf)?;
108
109 self.initiate_response(101, None, &headers).await
110 }
111
112 /// Return `true` if the connection is in response state
113 pub fn is_response_initiated(&self) -> bool {
114 matches!(self, Self::Response(_))
115 }
116
117 /// Completes the response and switches the connection back to the unbound state
118 /// If the connection is still in a request state, and empty 200 OK response is sent
119 pub async fn complete(&mut self) -> Result<(), Error<T::Error>> {
120 if self.is_request_initiated() {
121 self.complete_request(200, Some("OK"), &[]).await?;
122 }
123
124 if self.is_response_initiated() {
125 self.complete_response().await?;
126 }
127
128 Ok(())
129 }
130
131 /// Completes the response with an error message and switches the connection back to the unbound state
132 ///
133 /// If the connection is still in a request state, an empty 500 Internal Error response is sent
134 pub async fn complete_err(&mut self, err: &str) -> Result<(), Error<T::Error>> {
135 let result = self.request_mut();
136
137 match result {
138 Ok(_) => {
139 let headers = [("Connection", "Close"), ("Content-Type", "text/plain")];
140
141 self.complete_request(500, Some("Internal Error"), &headers)
142 .await?;
143
144 let response = self.response_mut()?;
145
146 response.io.write_all(err.as_bytes()).await?;
147 response.io.finish().await?;
148
149 Ok(())
150 }
151 Err(err) => Err(err),
152 }
153 }
154
155 /// Return `true` if the connection needs to be closed
156 ///
157 /// This is determined by the connection type (i.e. `Connection: Close` header)
158 pub fn needs_close(&self) -> bool {
159 match self {
160 Self::Response(response) => response.needs_close(),
161 _ => true,
162 }
163 }
164
165 /// Switch the connection to unbound state, returning a mutable reference to the underlying socket stream
166 ///
167 /// NOTE: Use with care, and only if the connection is completed in the meantime
168 pub fn unbind(&mut self) -> Result<&mut T, Error<T::Error>> {
169 let io = self.unbind_mut();
170 *self = Self::Unbound(io);
171
172 Ok(self.io_mut())
173 }
174
175 async fn complete_request(
176 &mut self,
177 status: u16,
178 reason: Option<&str>,
179 headers: &[(&str, &str)],
180 ) -> Result<(), Error<T::Error>> {
181 let request = self.request_mut()?;
182
183 let mut buf = [0; COMPLETION_BUF_SIZE];
184 while request.io.read(&mut buf).await? > 0 {}
185
186 let http11 = request.request.http11;
187 let request_connection_type = request.connection_type;
188
189 let mut io = self.unbind_mut();
190
191 let result = async {
192 send_status(http11, status, reason, &mut io).await?;
193
194 let (connection_type, body_type) = send_headers(
195 headers.iter(),
196 Some(request_connection_type),
197 false,
198 http11,
199 true,
200 &mut io,
201 )
202 .await?;
203
204 Ok((connection_type, body_type))
205 }
206 .await;
207
208 match result {
209 Ok((connection_type, body_type)) => {
210 *self = Self::Response(ResponseState {
211 io: SendBody::new(body_type, io),
212 connection_type,
213 });
214
215 Ok(())
216 }
217 Err(e) => {
218 *self = Self::Unbound(io);
219
220 Err(e)
221 }
222 }
223 }
224
225 async fn complete_response(&mut self) -> Result<(), Error<T::Error>> {
226 self.response_mut()?.io.finish().await?;
227
228 Ok(())
229 }
230
231 fn unbind_mut(&mut self) -> T {
232 let state = mem::replace(self, Self::Transition(TransitionState(())));
233
234 match state {
235 Self::Request(request) => request.io.release(),
236 Self::Response(response) => response.io.release(),
237 Self::Unbound(io) => io,
238 _ => unreachable!(),
239 }
240 }
241
242 fn request_mut(&mut self) -> Result<&mut RequestState<'b, T, N>, Error<T::Error>> {
243 if let Self::Request(request) = self {
244 Ok(request)
245 } else {
246 Err(Error::InvalidState)
247 }
248 }
249
250 fn request_ref(&self) -> Result<&RequestState<'b, T, N>, Error<T::Error>> {
251 if let Self::Request(request) = self {
252 Ok(request)
253 } else {
254 Err(Error::InvalidState)
255 }
256 }
257
258 fn response_mut(&mut self) -> Result<&mut ResponseState<T>, Error<T::Error>> {
259 if let Self::Response(response) = self {
260 Ok(response)
261 } else {
262 Err(Error::InvalidState)
263 }
264 }
265
266 fn io_mut(&mut self) -> &mut T {
267 match self {
268 Self::Request(request) => request.io.as_raw_reader(),
269 Self::Response(response) => response.io.as_raw_writer(),
270 Self::Unbound(io) => io,
271 _ => unreachable!(),
272 }
273 }
274}
275
276impl<T, const N: usize> ErrorType for Connection<'_, T, N>
277where
278 T: ErrorType,
279{
280 type Error = Error<T::Error>;
281}
282
283impl<T, const N: usize> Read for Connection<'_, T, N>
284where
285 T: Read + Write,
286{
287 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
288 self.request_mut()?.io.read(buf).await
289 }
290}
291
292impl<T, const N: usize> Write for Connection<'_, T, N>
293where
294 T: Read + Write,
295{
296 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
297 self.response_mut()?.io.write(buf).await
298 }
299
300 async fn flush(&mut self) -> Result<(), Self::Error> {
301 self.response_mut()?.io.flush().await
302 }
303}
304
/// Payload of `Connection::Transition`, the placeholder state that `unbind_mut` leaves in place
/// while the previous state is being taken apart.
struct TransitionState(());
306
/// Payload of `Connection::Request`.
struct RequestState<'b, T, const N: usize> {
    /// The received request headers
    request: RequestHeaders<'b, N>,
    /// The request body reader, which owns the socket stream
    io: Body<'b, T>,
    /// The connection type resolved from the request headers
    connection_type: ConnectionType,
}
312
/// Payload of `Connection::Response`.
struct ResponseState<T> {
    /// The response body writer, which owns the socket stream
    io: SendBody<T>,
    /// The connection type reported by `send_headers` when the response headers were sent
    connection_type: ConnectionType,
}
317
318impl<T> ResponseState<T>
319where
320 T: Write,
321{
322 fn needs_close(&self) -> bool {
323 matches!(self.connection_type, ConnectionType::Close) || self.io.needs_close()
324 }
325}
326
/// The error type returned by `handle_request`
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum HandlerError<T, E> {
    /// A socket IO error (NOTE(review): not constructed anywhere in this file - confirm usage)
    Io(T),
    /// An error raised by the connection state machine
    Connection(Error<T>),
    /// An error returned by the handler
    Handler(E),
}
334
335impl<T, E> From<Error<T>> for HandlerError<T, E> {
336 fn from(e: Error<T>) -> Self {
337 Self::Connection(e)
338 }
339}
340
/// A trait (async callback) for handling incoming HTTP requests
pub trait Handler {
    /// The error type returned by the handler, parameterized by the error type `E`
    /// of the socket stream the connection operates on.
    type Error<E>: Debug
    where
        E: Debug;

    /// Handle an incoming HTTP request
    ///
    /// Parameters:
    /// - `task_id`: An identifier for the task, that can be used by the handler for logging purposes
    /// - `connection`: A connection state machine for the request-response cycle
    async fn handle<T, const N: usize>(
        &self,
        task_id: impl Display + Copy,
        connection: &mut Connection<'_, T, N>,
    ) -> Result<(), Self::Error<T::Error>>
    where
        T: Read + Write + TcpSplit;
}
360
361impl<H> Handler for &H
362where
363 H: Handler,
364{
365 type Error<E>
366 = H::Error<E>
367 where
368 E: Debug;
369
370 async fn handle<T, const N: usize>(
371 &self,
372 task_id: impl Display + Copy,
373 connection: &mut Connection<'_, T, N>,
374 ) -> Result<(), Self::Error<T::Error>>
375 where
376 T: Read + Write + TcpSplit,
377 {
378 (**self).handle(task_id, connection).await
379 }
380}
381
382impl<H> Handler for &mut H
383where
384 H: Handler,
385{
386 type Error<E>
387 = H::Error<E>
388 where
389 E: Debug;
390
391 async fn handle<T, const N: usize>(
392 &self,
393 task_id: impl Display + Copy,
394 connection: &mut Connection<'_, T, N>,
395 ) -> Result<(), Self::Error<T::Error>>
396 where
397 T: Read + Write + TcpSplit,
398 {
399 (**self).handle(task_id, connection).await
400 }
401}
402
403impl<H> Handler for WithTimeout<H>
404where
405 H: Handler,
406{
407 type Error<E>
408 = WithTimeoutError<H::Error<E>>
409 where
410 E: Debug;
411
412 async fn handle<T, const N: usize>(
413 &self,
414 task_id: impl Display + Copy,
415 connection: &mut Connection<'_, T, N>,
416 ) -> Result<(), Self::Error<T::Error>>
417 where
418 T: Read + Write + TcpSplit,
419 {
420 let mut io = pin!(self.io().handle(task_id, connection));
421
422 with_timeout(self.timeout_ms(), &mut io).await?;
423
424 Ok(())
425 }
426}
427
428/// A convenience function to handle multiple HTTP requests over a single socket stream,
429/// using the specified handler.
430///
431/// The socket stream will be closed only in case of error, or until the client explicitly requests that
432/// either with a hard socket close, or with a `Connection: Close` header.
433///
434/// A note on timeouts:
435/// - The function does NOT - by default - establish any timeouts on the IO operations _except_
436/// an optional timeout for detecting idle connections, so that they can be closed and thus make
437/// the server available for accepting new connections.
438/// It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish
439/// timeouts on the socket produced by the acceptor.
440/// - Similarly, the server does NOT establish any timeouts on the complete request-response cycle.
441/// It is up to the caller to wrap their complete or partial handling logic with
442/// `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish
443/// a global or semi-global request-response timeout.
444///
445/// Parameters:
446/// - `io`: A socket stream
447/// - `buf`: A work-area buffer used by the implementation
448/// - `keepalive_timeout_ms`: An optional timeout in milliseconds for detecting an idle keepalive connection
449/// that should be closed. If not provided, the server will not close idle connections.
450/// - `task_id`: An identifier for the task, used for logging purposes
451/// - `handler`: An implementation of `Handler` to handle incoming requests
452pub async fn handle_connection<H, T, const N: usize>(
453 mut io: T,
454 buf: &mut [u8],
455 keepalive_timeout_ms: Option<u32>,
456 task_id: impl Display + Copy,
457 handler: H,
458) where
459 H: Handler,
460 T: Read + Write + Readable + TcpSplit + TcpShutdown,
461{
462 let close = loop {
463 debug!(
464 "Handler task {}: Waiting for a new request",
465 display2format!(task_id)
466 );
467
468 if let Some(keepalive_timeout_ms) = keepalive_timeout_ms {
469 let wait_data = with_timeout(keepalive_timeout_ms, io.readable()).await;
470 match wait_data {
471 Err(WithTimeoutError::Timeout) => {
472 info!(
473 "Handler task {}: Closing connection due to inactivity",
474 display2format!(task_id)
475 );
476 break true;
477 }
478 Err(e) => {
479 warn!(
480 "Handler task {}: Error when handling request: {:?}",
481 display2format!(task_id),
482 debug2format!(e)
483 );
484 break true;
485 }
486 Ok(_) => {}
487 }
488 }
489
490 let result = handle_request::<_, _, N>(buf, &mut io, task_id, &handler).await;
491
492 match result {
493 Err(HandlerError::Connection(Error::ConnectionClosed)) => {
494 debug!(
495 "Handler task {}: Connection closed",
496 display2format!(task_id)
497 );
498 break false;
499 }
500 Err(e) => {
501 warn!(
502 "Handler task {}: Error when handling request: {:?}",
503 display2format!(task_id),
504 debug2format!(e)
505 );
506 break true;
507 }
508 Ok(needs_close) => {
509 if needs_close {
510 debug!(
511 "Handler task {}: Request complete; closing connection",
512 display2format!(task_id)
513 );
514 break true;
515 } else {
516 debug!(
517 "Handler task {}: Request complete",
518 display2format!(task_id)
519 );
520 }
521 }
522 }
523 };
524
525 if close {
526 if let Err(e) = io.close(Close::Both).await {
527 warn!(
528 "Handler task {}: Error when closing the socket: {:?}",
529 display2format!(task_id),
530 debug2format!(e)
531 );
532 }
533 } else {
534 let _ = io.abort().await;
535 }
536}
537
/// The error type for handling HTTP requests
///
/// `C` is the error type of the socket stream and `E` is the error type of the handler.
#[derive(Debug)]
pub enum HandleRequestError<C, E> {
    /// A connection error (HTTP protocol error or a socket IO error)
    Connection(Error<C>),
    /// A handler error
    Handler(E),
}
546
547impl<T, E> From<Error<T>> for HandleRequestError<T, E> {
548 fn from(e: Error<T>) -> Self {
549 Self::Connection(e)
550 }
551}
552
553impl<C, E> fmt::Display for HandleRequestError<C, E>
554where
555 C: fmt::Display,
556 E: fmt::Display,
557{
558 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
559 match self {
560 Self::Connection(e) => write!(f, "Connection error: {}", e),
561 Self::Handler(e) => write!(f, "Handler error: {}", e),
562 }
563 }
564}
565
/// `defmt` rendering: the wrapped error, prefixed with where it originated.
#[cfg(feature = "defmt")]
impl<C, E> defmt::Format for HandleRequestError<C, E>
where
    C: defmt::Format,
    E: defmt::Format,
{
    fn format(&self, f: defmt::Formatter<'_>) {
        match *self {
            Self::Connection(ref e) => defmt::write!(f, "Connection error: {}", e),
            Self::Handler(ref e) => defmt::write!(f, "Handler error: {}", e),
        }
    }
}
579
580impl<C, E> embedded_io_async::Error for HandleRequestError<C, E>
581where
582 C: Debug + core::error::Error + embedded_io_async::Error,
583 E: Debug + core::error::Error,
584{
585 fn kind(&self) -> embedded_io_async::ErrorKind {
586 match self {
587 Self::Connection(Error::Io(e)) => e.kind(),
588 _ => embedded_io_async::ErrorKind::Other,
589 }
590 }
591}
592
// No methods are overridden here: the default method implementations of `core::error::Error` apply.
impl<C, E> core::error::Error for HandleRequestError<C, E>
where
    C: core::error::Error,
    E: core::error::Error,
{
}
599
600/// A convenience function to handle a single HTTP request over a socket stream,
601/// using the specified handler.
602///
603/// Note that this function does not set any timeouts on the request-response processing
604/// or on the IO operations. It is up that the caller to use the `with_timeout` function
605/// and the `WithTimeout` struct from the `edge-nal` crate to wrap the future returned
606/// by this function, or the socket stream, or both.
607///
608/// Parameters:
609/// - `buf`: A work-area buffer used by the implementation
610/// - `io`: A socket stream
611/// - `task_id`: An identifier for the task, used for logging purposes
612/// - `handler`: An implementation of `Handler` to handle incoming requests
613pub async fn handle_request<H, T, const N: usize>(
614 buf: &mut [u8],
615 io: T,
616 task_id: impl Display + Copy,
617 handler: H,
618) -> Result<bool, HandlerError<T::Error, H::Error<T::Error>>>
619where
620 H: Handler,
621 T: Read + Write + TcpSplit,
622{
623 let mut connection = Connection::<_, N>::new(buf, io).await?;
624
625 let result = handler.handle(task_id, &mut connection).await;
626
627 match result {
628 Result::Ok(_) => connection.complete().await?,
629 Result::Err(e) => connection
630 .complete_err("INTERNAL ERROR")
631 .await
632 .map_err(|_| HandlerError::Handler(e))?,
633 }
634
635 Ok(connection.needs_close())
636}
637
/// A type alias for an HTTP server with default buffer sizes.
///
/// That is: `DEFAULT_HANDLER_TASKS_COUNT` handler tasks, each with a work buffer of
/// `DEFAULT_BUF_SIZE` bytes, and `DEFAULT_MAX_HEADERS_COUNT` as the headers count parameter.
pub type DefaultServer =
    Server<{ DEFAULT_HANDLER_TASKS_COUNT }, { DEFAULT_BUF_SIZE }, { DEFAULT_MAX_HEADERS_COUNT }>;
641
/// A type alias for the HTTP server buffers (essentially, arrays of `MaybeUninit`)
///
/// `P` is the number of buffers (one per handler task) and `B` is the size of each buffer in bytes.
pub type ServerBuffers<const P: usize, const B: usize> = MaybeUninit<[[u8; B]; P]>;
644
/// An HTTP server that can handle multiple requests concurrently.
///
/// The server needs an implementation of `edge_nal::TcpAccept` to accept incoming connections.
///
/// Const parameters:
/// - `P`: The number of handler tasks, each of which gets its own work buffer
/// - `B`: The size, in bytes, of the work buffer of each handler task
/// - `N`: The headers count parameter passed on to `handle_connection`
#[repr(transparent)]
pub struct Server<
    const P: usize = DEFAULT_HANDLER_TASKS_COUNT,
    const B: usize = DEFAULT_BUF_SIZE,
    const N: usize = DEFAULT_MAX_HEADERS_COUNT,
>(ServerBuffers<P, B>);
654
655impl<const P: usize, const B: usize, const N: usize> Server<P, B, N> {
656 /// Create a new HTTP server
657 #[inline(always)]
658 pub const fn new() -> Self {
659 Self(MaybeUninit::uninit())
660 }
661
662 /// Run the server with the specified acceptor and handler
663 ///
664 /// A note on timeouts:
665 /// - The function does NOT - by default - establish any timeouts on the IO operations _except_
666 /// an optional timeout on idle connections, so that they can be closed.
667 /// It is up to the caller to wrap the acceptor type with `edge_nal::WithTimeout` to establish
668 /// timeouts on the socket produced by the acceptor.
669 /// - Similarly, the function does NOT establish any timeouts on the complete request-response cycle.
670 /// It is up to the caller to wrap their complete or partial handling logic with
671 /// `edge_nal::with_timeout`, or its whole handler with `edge_nal::WithTimeout`, so as to establish
672 /// a global or semi-global request-response timeout.
673 ///
674 /// A note on concurrent connection acceptance:
675 /// - All handler tasks concurrently call `accept()` to maximize connection acceptance capacity.
676 /// - This is critical for TCP stacks without accept queues (e.g., smoltcp/embassy-net), where
677 /// incoming connections can only be accepted if at least one task is actively waiting in `accept()`.
678 /// - When a task is busy handling a request, other tasks remain available to accept new connections.
679 /// - **Threading safety**: The acceptor must be safe to use from multiple async tasks.
680 /// For single-threaded async executors (e.g., embassy-executor on embedded platforms),
681 /// acceptors using non-Sync types like `Cell` for internal state are safe.
682 /// For multi-threaded executors, the acceptor's internal state must be properly synchronized
683 /// (e.g., using atomics or locks).
684 ///
685 /// Consider using `run_with_socket_queue()` instead for better connection handling
686 /// with TCP stacks that lack accept queues (e.g., smoltcp/embassy-net). The socket queue architecture
687 /// decouples connection acceptance from HTTP processing, allowing connections to be accepted even when
688 /// all worker tasks are busy.
689 ///
690 /// Parameters:
691 /// - `keepalive_timeout_ms`: An optional timeout in milliseconds for detecting an idle keepalive
692 /// connection that should be closed. If not provided, the function will not close idle connections
693 /// and the connection - in the absence of other timeouts - will remain active forever.
694 /// - `acceptor`: An implementation of `edge_nal::TcpAccept` to accept incoming connections
695 /// - `handler`: An implementation of `Handler` to handle incoming requests
696 /// If not provided, a default timeout of 50 seconds is used.
697 #[inline(never)]
698 #[cold]
699 pub async fn run<A, H>(
700 &mut self,
701 keepalive_timeout_ms: Option<u32>,
702 acceptor: A,
703 handler: H,
704 ) -> Result<(), Error<A::Error>>
705 where
706 A: edge_nal::TcpAccept,
707 H: Handler,
708 {
709 let mut tasks = heapless::Vec::<_, P>::new();
710
711 info!(
712 "Creating {} handler tasks, memory: {}B",
713 P,
714 core::mem::size_of_val(&tasks)
715 );
716
717 for index in 0..P {
718 let acceptor = &acceptor;
719 let task_id = index;
720 let handler = &handler;
721 let buf: *mut [u8; B] = &mut unsafe { self.0.assume_init_mut() }[index];
722
723 unwrap!(tasks
724 .push(async move {
725 loop {
726 debug!(
727 "Handler task {}: Waiting for connection",
728 display2format!(task_id)
729 );
730
731 let io = acceptor.accept().await.map_err(Error::Io)?.1;
732
733 debug!(
734 "Handler task {}: Got connection request",
735 display2format!(task_id)
736 );
737
738 handle_connection::<_, _, N>(
739 io,
740 unwrap!(unsafe { buf.as_mut() }),
741 keepalive_timeout_ms,
742 task_id,
743 handler,
744 )
745 .await;
746 }
747 })
748 .map_err(|_| ()));
749 }
750
751 let tasks = pin!(tasks);
752
753 let tasks = unsafe { tasks.map_unchecked_mut(|t| t.as_mut_slice()) };
754 let (result, _) = embassy_futures::select::select_slice(tasks).await;
755
756 warn!(
757 "Server processing loop quit abruptly: {:?}",
758 debug2format!(result)
759 );
760
761 result
762 }
763
764 /// Run the server with a socket queue architecture (recommended for smoltcp/embassy-net)
765 ///
766 /// This method addresses the limitation of TCP stacks without accept queues (e.g., smoltcp/embassy-net)
767 /// by using a signal-based coordination mechanism between acceptor and worker tasks. This ensures that
768 /// the number of sockets in use never exceeds the available socket pool size, preventing resource exhaustion.
769 ///
770 /// # Type Parameters
771 ///
772 /// - `Q`: Number of acceptor tasks and maximum sockets that can be allocated simultaneously (default: 8)
773 ///
774 /// # Important Constraints
775 ///
776 /// **CRITICAL**: The `Q` parameter must satisfy these constraints or the function will panic:
777 /// - `Q` must be **less than or equal to** the number of sockets in your smoltcp/embassy-net socket pool
778 /// - If `Q` exceeds the socket pool size, accept() calls will fail and cause panics
779 /// - `Q` should be **greater than or equal to** `P` for the architecture to provide benefits
780 /// - If `Q < P`, you lose the advantage of decoupling acceptance from processing
781 /// - Recommended: `Q >= P` (e.g., Q=8 with P=4)
782 ///
783 /// # Timeout Configuration
784 ///
785 /// The function does NOT establish timeouts by default (except optional keepalive timeout):
786 /// - Wrap the acceptor with `edge_nal::WithTimeout` to set socket-level timeouts
787 /// - Wrap handler logic with `edge_nal::with_timeout` for request-response timeouts
788 ///
789 /// # Parameters
790 ///
791 /// - `keepalive_timeout_ms`: Optional timeout in milliseconds for idle keepalive connections
792 /// - `acceptor`: An implementation of `edge_nal::TcpAccept` to accept incoming connections
793 /// - `handler`: An implementation of `Handler` to handle incoming requests
794 #[cfg(feature = "io")]
795 #[inline(never)]
796 #[cold]
797 pub async fn run_with_socket_queue<A, H, const Q: usize>(
798 &mut self,
799 keepalive_timeout_ms: Option<u32>,
800 acceptor: A,
801 handler: H,
802 ) -> Result<(), Error<A::Error>>
803 where
804 A: edge_nal::TcpAccept,
805 H: Handler,
806 {
807 use embassy_sync::blocking_mutex::raw::NoopRawMutex;
808 use embassy_sync::channel::Channel;
809 use embassy_sync::signal::Signal;
810
811 // ============================================================================
812 // Internal Architecture
813 // ============================================================================
814 // This implementation uses a signal-based coordination mechanism:
815 //
816 // - Q acceptor tasks: Each waits for a signal before calling accept()
817 // - Each acceptor has its own signal indicating socket availability
818 // - Initially all Q signals are set, allowing Q concurrent accepts
819 // - When an acceptor accepts a connection, it enqueues (socket, acceptor_id)
820 // - P worker tasks dequeue from channel and process HTTP requests
821 // - When a worker finishes, it signals the specific acceptor that provided the socket
822 // - At most Q sockets allocated at any time (some accepting, some processing)
823 //
824 // This prevents socket pool exhaustion that would occur if all Q acceptors
825 // were simultaneously calling accept() while P workers were processing,
826 // which would require Q+P sockets. By tracking which acceptor provided each
827 // socket, workers signal the correct acceptor (one waiting on its signal,
828 // not busy on accept()).
829 //
830 // Threading: Uses NoopRawMutex for single-threaded executors (embassy-executor).
831 // For multi-threaded executors, a different mutex type would be needed.
832 // ============================================================================
833
834 // Create a channel to pass accepted sockets from acceptor tasks to worker tasks
835 // Each message contains the socket and the ID of the acceptor that accepted it
836 let socket_queue = Channel::<NoopRawMutex, (A::Socket<'_>, usize), Q>::new();
837
838 // Create signals for each acceptor task to coordinate socket availability
839 // When a worker finishes processing a socket, it signals an acceptor to accept a new connection
840 // This ensures we never have more sockets in use than available in the pool
841 let accept_signals: [Signal<NoopRawMutex, ()>; Q] = [(); Q].map(|_| Signal::new());
842
843 // Create Q acceptor tasks - each waits for its signal before accepting
844 // This ensures we never have more than (Q - sockets_in_use) acceptors calling accept()
845 let mut acceptor_tasks = heapless::Vec::<_, Q>::new();
846
847 info!(
848 "Creating {} acceptor tasks and {} worker tasks, queue size: {}",
849 Q, P, Q
850 );
851
852 for (acceptor_id, signal) in accept_signals.iter().enumerate() {
853 let acceptor = &acceptor;
854 let socket_queue = &socket_queue;
855
856 unwrap!(acceptor_tasks
857 .push(async move {
858 loop {
859 // Wait for signal that a socket is available
860 // Initially all signals are ready, allowing Q concurrent accepts
861 signal.wait().await;
862
863 debug!(
864 "Acceptor task {}: Got signal, waiting for connection",
865 display2format!(acceptor_id)
866 );
867
868 match acceptor.accept().await {
869 Ok((_, io)) => {
870 debug!(
871 "Acceptor task {}: Got connection, enqueueing",
872 display2format!(acceptor_id)
873 );
874
875 // Send the socket along with the acceptor ID to the queue
876 // This allows workers to signal the correct acceptor when done
877 socket_queue.send((io, acceptor_id)).await;
878
879 debug!(
880 "Acceptor task {}: Connection enqueued",
881 display2format!(acceptor_id)
882 );
883 }
884 Err(e) => {
885 warn!(
886 "Acceptor task {}: Error accepting connection: {:?}",
887 display2format!(acceptor_id),
888 debug2format!(e)
889 );
890 // Signal ourselves again to retry
891 signal.signal(());
892 }
893 }
894 }
895 })
896 .map_err(|_| ()));
897 }
898
899 // Initially signal all acceptors to start accepting
900 for signal in &accept_signals {
901 signal.signal(());
902 }
903
904 // Create worker tasks
905 let mut worker_tasks = heapless::Vec::<_, P>::new();
906
907 for index in 0..P {
908 let task_id = index;
909 let handler = &handler;
910 let socket_queue = &socket_queue;
911 let accept_signals = &accept_signals;
912 // Safety: The server buffer array is properly initialized (MaybeUninit is used correctly),
913 // and each worker task gets exclusive access to its own buffer slice via its unique index.
914 // The pointer remains valid for the lifetime of the server and the buffer is not moved.
915 let buf: *mut [u8; B] = &mut unsafe { self.0.assume_init_mut() }[index];
916
917 unwrap!(worker_tasks
918 .push(async move {
919 loop {
920 debug!(
921 "Worker task {}: Waiting for connection from queue",
922 display2format!(task_id)
923 );
924
925 // Receive an accepted socket from the queue along with the acceptor ID
926 let (io, acceptor_id) = socket_queue.receive().await;
927
928 debug!(
929 "Worker task {}: Got connection from acceptor {} from queue",
930 display2format!(task_id),
931 display2format!(acceptor_id)
932 );
933
934 handle_connection::<_, _, N>(
935 io,
936 unwrap!(unsafe { buf.as_mut() }),
937 keepalive_timeout_ms,
938 task_id,
939 handler,
940 )
941 .await;
942
943 // Signal the specific acceptor that provided this socket
944 // This ensures we signal an acceptor that's waiting on its signal,
945 // not one that might be busy waiting on accept()
946 debug!(
947 "Worker task {}: Finished processing, signaling acceptor {}",
948 display2format!(task_id),
949 display2format!(acceptor_id)
950 );
951 accept_signals[acceptor_id].signal(());
952 }
953 })
954 .map_err(|_| ()));
955 }
956
957 // Pin tasks for select_slice
958 let acceptor_tasks = pin!(acceptor_tasks);
959 let acceptor_tasks = unsafe { acceptor_tasks.map_unchecked_mut(|t| t.as_mut_slice()) };
960
961 let worker_tasks = pin!(worker_tasks);
962 let worker_tasks = unsafe { worker_tasks.map_unchecked_mut(|t| t.as_mut_slice()) };
963
964 // Run all acceptor and worker tasks concurrently
965 // Use select to run both acceptors and workers, return if any completes
966 use embassy_futures::select::Either;
967 let result = embassy_futures::select::select(
968 async {
969 let (result, _acceptor_index): (Result<(), Error<A::Error>>, _) =
970 embassy_futures::select::select_slice(acceptor_tasks).await;
971 result
972 },
973 async {
974 let (result, _worker_index): (Result<(), Error<A::Error>>, _) =
975 embassy_futures::select::select_slice(worker_tasks).await;
976 result
977 },
978 )
979 .await;
980
981 // Neither acceptor nor worker tasks should complete normally
982 match result {
983 Either::First(Err(e)) => {
984 warn!("Acceptor task quit with error: {:?}", debug2format!(e));
985 Err(e)
986 }
987 Either::First(Ok(_)) => {
988 warn!("Acceptor task quit unexpectedly");
989 Ok(())
990 }
991 Either::Second(Err(e)) => {
992 warn!("Worker task quit with error: {:?}", debug2format!(e));
993 Err(e)
994 }
995 Either::Second(Ok(_)) => {
996 warn!("Worker task quit unexpectedly");
997 Ok(())
998 }
999 }
1000 }
1001}
1002
1003impl<const P: usize, const B: usize, const N: usize> Default for Server<P, B, N> {
1004 fn default() -> Self {
1005 Self::new()
1006 }
1007}