// ngyn_hyper/lib.rs

use http_body_util::BodyExt;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::{service::service_fn, Request};
use hyper_util::rt::TokioIo;
use ngyn_shared::core::engine::{NgynHttpPlatform, PlatformData};
use ngyn_shared::server::NgynResponse;
use std::sync::Arc;
use tokio::net::TcpListener;

/// Represents a Hyper-based application.
///
/// Wraps the [`PlatformData`] that is exposed mutably through
/// [`NgynHttpPlatform::data_mut`] and that [`HyperApplication::listen`] hands
/// to every connection it serves.
#[derive(Default)]
pub struct HyperApplication {
    /// Platform state; moved into an `Arc` when `listen` is called.
    data: PlatformData,
}

/// Implements [`NgynHttpPlatform`] by exposing the application's [`PlatformData`].
impl NgynHttpPlatform for HyperApplication {
    /// Returns a mutable reference to the [`PlatformData`] owned by this application.
    fn data_mut(&mut self) -> &mut PlatformData {
        &mut self.data
    }
}

impl HyperApplication {
    /// Listens for incoming connections and serves the application.
    ///
    /// Binds a TCP listener to `address` and serves every accepted connection
    /// over HTTP/1 on its own Tokio task. The accept loop runs until the
    /// shutdown signal (Ctrl-C) completes; the listener is then closed and
    /// in-flight connections get up to 10 seconds to finish.
    ///
    /// ### Arguments
    ///
    /// * `address` - The address to listen on.
    ///
    /// ### Returns
    ///
    /// A `Result` indicating success or failure.
    ///
    /// ### Errors
    ///
    /// Returns the `std::io::Error` produced when binding to `address` fails.
    /// Errors from accepting an individual connection are logged to stderr and
    /// do not stop the server.
    ///
    /// ### Panics
    ///
    /// Panics if the Ctrl-C handler cannot be installed (see `shutdown_signal`).
    pub async fn listen<A: tokio::net::ToSocketAddrs>(
        self,
        address: A,
    ) -> Result<(), std::io::Error> {
        // Pause after accept errors that are not tied to a single connection
        // (e.g. running out of file descriptors) so the loop does not spin.
        const ACCEPT_ERROR_BACKOFF: std::time::Duration =
            std::time::Duration::from_millis(100);

        let server = TcpListener::bind(address).await?;
        let data = Arc::new(self.data);

        let http = http1::Builder::new();
        let graceful = hyper_util::server::graceful::GracefulShutdown::new();
        // when this signal completes, start shutdown
        let mut signal = std::pin::pin!(shutdown_signal());

        loop {
            tokio::select! {
                // Bind the whole `Result`: a refutable `Ok(..)` pattern here would
                // disable this branch on the first accept error and leave the
                // server waiting on the shutdown signal only.
                accepted = server.accept() => {
                    match accepted {
                        Ok((stream, _)) => {
                            let data = Arc::clone(&data);
                            let io = TokioIo::new(stream);
                            let conn = http.serve_connection(
                                io,
                                service_fn(move |req| hyper_service(Arc::clone(&data), req)),
                            );
                            // Register the connection so shutdown can wait for it.
                            let handle = graceful.watch(conn);

                            tokio::task::spawn(async move {
                                if let Err(e) = handle.await {
                                    eprintln!("server connection error: {}", e);
                                }
                            });
                        }
                        Err(err) => {
                            eprintln!("failed to accept connection: {}", err);
                            // Errors that concern only the rejected connection can be
                            // retried immediately; anything else gets a short pause.
                            let per_connection = matches!(
                                err.kind(),
                                std::io::ErrorKind::ConnectionRefused
                                    | std::io::ErrorKind::ConnectionAborted
                                    | std::io::ErrorKind::ConnectionReset
                            );
                            if !per_connection {
                                tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                            }
                        }
                    }
                }
                _ = &mut signal => {
                    eprintln!("graceful shutdown signal received");
                    // stop the accept loop
                    break;
                }
            }
        }

        // Close the listening socket so new clients are refused instead of
        // queueing behind a server that is draining.
        drop(server);

        tokio::select! {
            _ = graceful.shutdown() => {
                eprintln!("all connections gracefully closed");
            },
            _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
                eprintln!("timed out wait for all connections to close");
            }
        }

        Ok(())
    }
}

/// Bridges a Hyper request to the platform.
///
/// Reads the request body frame by frame into an in-memory buffer, rebuilds
/// the request around that buffer and returns whatever `data.respond`
/// produces for it.
///
/// ### Errors
///
/// Returns the `hyper::Error` reported while reading a body frame.
async fn hyper_service(
    data: Arc<PlatformData>,
    req: Request<Incoming>,
) -> Result<NgynResponse, hyper::Error> {
    let (head, mut incoming) = req.into_parts();

    // TODO: buffering the entire body up front is not efficient; revisit this approach.
    let mut payload = Vec::new();
    while let Some(next) = incoming.frame().await {
        match next?.into_data() {
            Ok(chunk) => payload.extend_from_slice(&chunk),
            // A frame that carries no data ends the read.
            Err(_) => break,
        }
    }

    let response = data.respond(Request::from_parts(head, payload)).await;
    Ok(response)
}

/// Completes once `tokio::signal::ctrl_c` resolves, i.e. when Ctrl-C is received.
///
/// ### Panics
///
/// Panics if `tokio::signal::ctrl_c` reports an error.
async fn shutdown_signal() {
    let outcome = tokio::signal::ctrl_c().await;
    outcome.expect("failed to listen for signal");
}