ohkami 0.24.9

A performant, declarative, and runtime-flexible web framework for Rust
Documentation
#![cfg(feature = "__rt_native__")]

mod connection;

pub use self::connection::Connection;

use crate::response::Upgrade;
use crate::router::r#final::Router;
use crate::util::with_timeout;
use crate::{Request, Response};
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::{any::Any, pin::Pin, sync::Arc, time::Duration};

pub(crate) struct Session {
    config: crate::Config,
    connection: Connection,
    router: Arc<Router>,
    ip: std::net::IpAddr,
}

impl Session {
    pub(crate) fn new(
        config: crate::Config,
        connection: impl Into<Connection>,
        ip: std::net::IpAddr,
        router: Arc<Router>,
    ) -> Self {
        Self {
            config,
            connection: connection.into(),
            ip,
            router,
        }
    }

    pub(crate) async fn manage(mut self) {
        #[cold]
        #[inline(never)]
        fn panicking(panic: Box<dyn Any + Send>) -> Response {
            if let Some(msg) = panic.downcast_ref::<String>() {
                crate::WARNING!("[Panicked]: {msg}");
            } else if let Some(msg) = panic.downcast_ref::<&str>() {
                crate::WARNING!("[Panicked]: {msg}");
            } else {
                crate::WARNING!("[Panicked]");
            }
            crate::Response::InternalServerError()
        }

        #[cold]
        #[inline(never)]
        fn handle_send_failure(error: std::io::Error) {
            use std::io::ErrorKind::*;
            if matches!(
                error.kind(),
                BrokenPipe | ConnectionReset | ConnectionAborted
            ) {
                crate::WARNING!("Client disconnected before response could be sent: {error}");
            } else {
                crate::ERROR!("Failed to send response to the client: {error}");
            }
        }

        let mut req = Request::uninit(self.ip, &self.config);
        let mut req = Pin::new(&mut req);
        let upgrade = loop {
            req.clear();
            // Apply a fresh timeout for each read, thus resetting the timer on activity.
            match with_timeout(
                Duration::from_secs(self.config.keepalive_timeout),
                req.as_mut().read(&mut self.connection, &self.config),
            )
            .await
            {
                None => {
                    crate::DEBUG!(
                        "\
                        Reached Keep-Alive timeout ({} secs). \
                        The timeout can be configured via `keepalive_timeout` of `Config`, \
                        or `OHKAMI_KEEPALIVE_TIMEOUT` environment variable.\
                        (default: {})\
                    ",
                        self.config.keepalive_timeout,
                        crate::Config::default().keepalive_timeout
                    );
                    break Upgrade::None;
                }
                Some(read_result) => match read_result {
                    Ok(Some(())) => {
                        let close = matches!(req.headers.connection(), Some("close" | "Close"));

                        let res = match catch_unwind(AssertUnwindSafe({
                            let req = req.as_mut();
                            || self.router.handle(req.get_mut())
                        })) {
                            Ok(future) => future.await,
                            Err(panic) => panicking(panic),
                        };
                        let upgrade = match res.send(&mut self.connection).await {
                            Ok(upgrade) => upgrade,
                            Err(e) => {
                                handle_send_failure(e);
                                break Upgrade::None;
                            }
                        };

                        if !upgrade.is_none() {
                            break upgrade;
                        }
                        if close {
                            break Upgrade::None;
                        }
                    }
                    Ok(None) => {
                        break Upgrade::None;
                    }
                    Err(mut res) => {
                        res.headers.set().connection("close");
                        if let Err(e) = res.send(&mut self.connection).await {
                            handle_send_failure(e);
                        }
                        break Upgrade::None;
                    }
                },
            }
        };

        match upgrade {
            Upgrade::None => {
                crate::DEBUG!("about to shutdown connection");
            }

            #[cfg(feature = "ws")]
            Upgrade::WebSocket(ws) => {
                crate::DEBUG!("WebSocket session started");

                let aborted = ws
                    .manage_with_timeout(
                        crate::__rt__::sleep(Duration::from_secs(self.config.websocket_timeout)),
                        self.connection,
                    )
                    .await;
                if aborted {
                    crate::WARNING!(
                        "\
                        WebSocket session aborted by timeout ({} secs). \
                        This can be configured by `websocket_timeout` of `Config`, or \
                        `OHKAMI_WEBSOCKET_TIMEOUT` environment variable.\
                        (default: {})\
                    ",
                        self.config.websocket_timeout,
                        crate::Config::default().websocket_timeout
                    );
                }

                crate::DEBUG!("WebSocket session finished");
            }
        }
    }
}