volga/
app.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
use bytes::Bytes;
use httparse::EMPTY_HEADER;
use http::{Request, Response};
use std::sync::Arc;

use tokio::{
    net::{TcpListener, TcpStream},
    io::{self,AsyncReadExt, AsyncWriteExt},
    sync::{broadcast, Mutex},
    signal
};

use crate::app::{
    endpoints::{Endpoints, EndpointContext},
    middlewares::{Middlewares, mapping::asynchronous::AsyncMiddlewareMapping},
    request::RawRequest,
    results::Results
};

pub mod middlewares;
pub mod endpoints;
pub mod request;
pub mod results;
pub mod mapping;

/// The web application used to configure the HTTP pipeline, and routes.
///
/// # Examples
/// ```no_run
///use volga::App;
///
///#[tokio::main]
///async fn main() -> tokio::io::Result<()> {
///    let mut app = App::build("127.0.0.1:7878").await?;
///    
///    app.run().await
///}
/// ```
pub struct App {
    middlewares: Arc<Mutex<Middlewares>>,
    endpoints: Arc<Mutex<Endpoints>>,
    tcp_listener: TcpListener,
    shutdown_signal: broadcast::Receiver<()>,
    shutdown_sender: broadcast::Sender<()>
}

pub struct HttpContext {
    pub request: Arc<Request<Bytes>>,
    endpoint_context: EndpointContext
}

impl HttpContext {
    async fn execute(&self) -> http::Result<Response<Bytes>> {
        let request = &self.request;
        self.endpoint_context.handler.call(request.clone()).await
    }
}

impl App {
    /// Initializes a new instance of the `App` on specified `socket`.
    pub async fn build(socket: &str) -> io::Result<App> {
        if socket.is_empty() {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "An empty socket has been provided."));
        }

        let tcp_listener = TcpListener::bind(socket).await?;
        let (shutdown_sender, shutdown_receiver) = broadcast::channel::<()>(1);

        let ctrl_c_shutdown_sender = shutdown_sender.clone();
        tokio::spawn(async move {
            match signal::ctrl_c().await {
                Ok(_) => (),
                Err(err) => {
                    eprintln!("Unable to listen for shutdown signal: {}", err);
                }
            };

            match ctrl_c_shutdown_sender.send(()) {
                Ok(_) => (),
                Err(err) => {
                    eprintln!("Failed to send shutdown signal: {}", err);
                }
            }
        });

        let server = Self {
            tcp_listener,
            shutdown_sender,
            shutdown_signal: shutdown_receiver,
            middlewares: Arc::new(Mutex::new(Middlewares::new())),
            endpoints: Arc::new(Mutex::new(Endpoints::new()))
        };

        println!("Start listening: {socket}");
        
        Ok(server)
    }

    /// Runs the Web Server
    pub async fn run(&mut self) -> io::Result<()> {
        self.use_middleware(|ctx, _| async move {
            ctx.execute().await
        }).await;

        loop {
            tokio::select! {
                Ok((socket, _)) = self.tcp_listener.accept() => {
                    let middlewares = Arc::clone(&self.middlewares);
                    let endpoints = Arc::clone(&self.endpoints);
                    
                    tokio::spawn(async move {
                        Self::handle_connection(&middlewares, &endpoints, socket).await;
                    });
                }
                _ = self.shutdown_signal.recv() => {
                    println!("Shutting down server...");
                    break;
                }
            }
        }

        Ok(())
    }

    /// Gracefully shutdown the server
    pub fn shutdown(&self) {
        match self.shutdown_sender.send(()) {
            Ok(_) => (),
            Err(err) => {
                eprintln!("Failed to send shutdown the server: {}", err);
            }
        };
    }

    #[inline]
    async fn handle_connection(middlewares: &Arc<Mutex<Middlewares>>, endpoints: &Arc<Mutex<Endpoints>>, mut socket: TcpStream) {
        let mut buffer = [0; 1024];
        loop {
            match Self::handle_request(middlewares, endpoints, &mut socket, &mut buffer).await {
                Ok(response) => {
                    if let Err(err) = Self::write_response(&mut socket, &response).await {
                        eprintln!("Failed to write to socket: {:?}", err);
                        break; // Break the loop if fail to write to the socket
                    }
                }
                Err(err) => {
                    eprintln!("Error occurred handling request: {}", err);
                    break;  // Break the loop if handle_request returns an error
                }
            }
        }

        println!("Connection handling has ended.");
    }

    #[inline]
    async fn handle_request(middlewares: &Arc<Mutex<Middlewares>>, endpoints: &Arc<Mutex<Endpoints>>, socket: &mut TcpStream, buffer: &mut [u8]) -> Result<Response<Bytes>, io::Error> {
        let bytes_read = socket.read(buffer).await?;
        if bytes_read == 0 {
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, "Client closed the connection"));
        }

        let mut headers = [EMPTY_HEADER; 16]; // Adjust header size as necessary
        let parse_req = RawRequest::parse_request(&buffer[..bytes_read], &mut headers)?;
        let mut http_request = RawRequest::convert_to_http_request(parse_req)?;

        let endpoints_guard = endpoints.lock().await;
        if let Some(endpoint_context) = endpoints_guard.get_endpoint(&http_request).await {
            let extensions = http_request.extensions_mut();
            extensions.insert(endpoint_context.params.clone());

            let context = HttpContext {
                request: Arc::new(http_request),
                endpoint_context
            };

            let middlewares_guard = middlewares.lock().await;
            let response = middlewares_guard.execute(Arc::new(context)).await;

            Ok(response.unwrap())
        } else {
            Ok(Results::not_found().unwrap())
        }
    }

    #[inline]
    async fn write_response(stream: &mut TcpStream, response: &Response<Bytes>) -> io::Result<()> {
        let mut response_bytes = vec![];

        // Start with the HTTP status line
        let status_line = format!(
            "HTTP/1.1 {} {}\r\n",
            response.status().as_u16(),
            response.status().canonical_reason().unwrap_or("unknown status")
        );
        response_bytes.extend_from_slice(status_line.as_bytes());

        // Write headers
        for (key, value) in response.headers() {
            let header_value = match value.to_str() {
                Ok(v) => v,
                Err(_) => return Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid header value")),
            };
            let header = format!("{}: {}\r\n", key, header_value);
            response_bytes.extend_from_slice(header.as_bytes());
        }

        // End of headers section
        response_bytes.extend_from_slice(b"\r\n");

        // Write body
        if !response.body().is_empty() {
            response_bytes.extend_from_slice(response.body());
        }

        stream.write_all(&response_bytes).await?;
        stream.flush().await
    }
}