stof_http/server/
mod.rs

1//
2// Copyright 2024 Formata, Inc. All rights reserved.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//    http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15//
16
17use std::{collections::BTreeMap, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
18use anyhow::{anyhow, Result};
19use axum::{body::{Body, Bytes}, extract::{Path, Query, State}, http::{header::CONTENT_TYPE, HeaderMap, HeaderName, Method, StatusCode}, response::{IntoResponse, Response}, routing::get, Router};
20use stof::{IntoDataRef, SData, SDataRef, SDoc, SField, SFunc, SVal};
21use tokio::sync::Mutex;
22use tower_governor::{governor::GovernorConfig, GovernorLayer};
23use tower_http::cors::CorsLayer;
24
25
26/// Serve a Stof document.
27/// This is the entrypoint for starting a Stof HTTP server.
28pub fn serve(doc: SDoc) {
29    tokio::runtime::Builder::new_multi_thread()
30        .enable_all()
31        .build()
32        .unwrap()
33        .block_on(async {
34            internal_serve(doc).await;
35        });
36}
37
38/// Server state.
39#[derive(Clone)]
40pub struct ServerState {
41    pub doc: Arc<Mutex<SDoc>>,
42    pub opaque_errors: bool,
43    pub get_handlers: Arc<BTreeMap<String, SDataRef>>,
44    pub put_handlers: Arc<BTreeMap<String, SDataRef>>,
45    pub post_handlers: Arc<BTreeMap<String, SDataRef>>,
46    pub delete_handlers: Arc<BTreeMap<String, SDataRef>>,
47    pub head_handlers: Arc<BTreeMap<String, SDataRef>>,
48    pub patch_handlers: Arc<BTreeMap<String, SDataRef>>,
49}
50impl ServerState {
51    pub fn handler(&self, method: Method, path: &str) -> Result<SDataRef> {
52        let handlers;
53        match method {
54            Method::PUT => handlers = self.put_handlers.clone(),
55            Method::DELETE => handlers = self.delete_handlers.clone(),
56            Method::PATCH => handlers = self.patch_handlers.clone(),
57            Method::POST => handlers = self.post_handlers.clone(),
58            Method::HEAD => handlers = self.head_handlers.clone(),
59            _ => handlers = self.get_handlers.clone(),
60        }
61        if let Some(value) = handlers.get(path) {
62            return Ok(value.clone());
63        }
64        // TODO: wildcards and path based stuff....
65        Err(anyhow!(""))
66    }
67}
68
69/// Internal serve.
70async fn internal_serve(mut doc: SDoc) {
71    // Get configuration data from the document (doc acts as it's own config...)
72    // All info should be in the root Server object - this will get removed before the document is served (security)
73
74    // Server address is in "Server.Config.Address"
75    // root Server: { Config: { Address: { ip: '127.0.0.1', port: 3030 } } }
76    let mut ip = [127, 0, 0, 1];
77    let mut port = 3030;
78    if let Some(ip_field) = SField::field(&doc.graph, "Server.Config.Address.ip", '.', None) {
79        let string = ip_field.to_string();
80        let path = string.split('.').collect::<Vec<&str>>();
81        if path.len() == 4 {
82            for i in 0..4 {
83                let val: Result<u8, _> = path[i].parse();
84                match val {
85                    Ok(v) => {
86                        ip[i] = v;
87                    },
88                    Err(_) => {
89                        ip = [127, 0, 0, 1];
90                        println!("Error starting server at the requested IP: {}, using 127.0.0.1 instead...", string);
91                        break;
92                    }
93                }
94            }
95        }
96    }
97    if let Some(port_field) = SField::field(&doc.graph, "Server.Config.Address.port", '.', None) {
98        let val: Result<u16, _> = port_field.to_string().parse();
99        match val {
100            Ok(v) => {
101                port = v;
102            },
103            Err(_) => {
104                println!("Error listening on port {}, using port 3030 instead...", port_field.to_string());
105            }
106        }
107    }
108    let address = SocketAddr::from((ip, port));
109
110    // Opaque errors - determin whether stof should expose the internal server errors as responses... default is true to prevent leaking internal info..
111    // can be helpful for debugging, etc.
112    let mut opaque_server_errors = true;
113    if let Some(opaque_field) = SField::field(&doc.graph, "Server.Config.opaque_errors", '.', None) {
114        opaque_server_errors = opaque_field.value.truthy();
115    }
116
117    // Setup governor configuration - see https://crates.io/crates/tower_governor
118    let governor_conf = Arc::new(GovernorConfig::default());
119
120    // Spawn a separate background task to cleanup governor
121    let governor_limiter = governor_conf.limiter().clone();
122    let interval = Duration::from_secs(60);
123    std::thread::spawn(move || {
124        loop {
125            std::thread::sleep(interval);
126            governor_limiter.retain_recent();
127        }
128    });
129
130    // Cors layer
131    let cors = CorsLayer::permissive();
132
133    // Create a new server state with the document
134    if let Some(node_ref) = doc.graph.root_by_name("Server") {
135        doc.graph.remove_node(node_ref);
136    }
137    let mut get_handlers = BTreeMap::new();
138    let mut put_handlers = BTreeMap::new();
139    let mut post_handlers = BTreeMap::new();
140    let mut delete_handlers = BTreeMap::new();
141    let mut head_handlers = BTreeMap::new();
142    let mut patch_handlers = BTreeMap::new();
143    for (_, dref) in &doc.graph.data.store {
144        if let Some(func) = dref.get_data::<SFunc>() {
145            if let Some(path) = func.attributes.get("GET") {
146                get_handlers.insert(path.to_string(), dref.data_ref());
147            } else if let Some(path) = func.attributes.get("PUT") {
148                put_handlers.insert(path.to_string(), dref.data_ref());
149            } else if let Some(path) = func.attributes.get("PATCH") {
150                patch_handlers.insert(path.to_string(), dref.data_ref());
151            } else if let Some(path) = func.attributes.get("DELETE") {
152                delete_handlers.insert(path.to_string(), dref.data_ref());
153            } else if let Some(path) = func.attributes.get("POST") {
154                post_handlers.insert(path.to_string(), dref.data_ref());
155            } else if let Some(path) = func.attributes.get("HEAD") {
156                head_handlers.insert(path.to_string(), dref.data_ref());
157            }
158        }
159    }
160    let state = ServerState {
161        doc: Arc::new(Mutex::new(doc)),
162        opaque_errors: opaque_server_errors,
163        get_handlers: Arc::new(get_handlers),
164        put_handlers: Arc::new(put_handlers),
165        post_handlers: Arc::new(post_handlers),
166        delete_handlers: Arc::new(delete_handlers),
167        head_handlers: Arc::new(head_handlers),
168        patch_handlers: Arc::new(patch_handlers),
169    };
170
171    // Create the application router
172    let app = Router::new()
173        .route("/{*path}", get(get_request_handler)
174            .head(head_request_handler)
175            .post(post_request_handler)
176            .put(put_request_handler)
177            .patch(patch_request_handler)
178            .delete(delete_request_handler))
179        .layer(GovernorLayer {
180            config: governor_conf
181        })
182        .layer(cors)
183        .with_state(state);
184
185    let listener = tokio::net::TcpListener::bind(address)
186        .await
187        .unwrap();
188    println!("listening on {}", listener.local_addr().unwrap());
189    axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>())
190        .await
191        .unwrap();
192}
193
194/// Response object, implementing IntoResonse.
195pub struct StofResponse {
196    pub headers: HeaderMap,
197    pub status: StatusCode,
198    pub str_body: String,
199    pub bytes_body: Option<Bytes>, // if present, will get sent in place of the str_body
200}
201impl IntoResponse for StofResponse {
202    fn into_response(self) -> axum::response::Response {
203        let mut builder = Response::builder().status(self.status);
204        for (k, v) in &self.headers {
205            builder = builder.header(k, v);
206        }
207        let response;
208        if let Some(bytes) = self.bytes_body {
209            if !self.headers.contains_key(CONTENT_TYPE) {
210                builder = builder.header(CONTENT_TYPE, "application/octet-stream");
211            }
212            response = builder.body(Body::from(bytes));
213        } else {
214            if !self.headers.contains_key(CONTENT_TYPE) {
215                builder = builder.header(CONTENT_TYPE, "text/plain");
216            }
217            response = builder.body(Body::from(self.str_body));
218        }
219        response.unwrap()
220    }
221}
222impl StofResponse {
223    /// Creates a response from this value with a success status code.
224    pub fn val_response(doc: &SDoc, value: SVal) -> Self {
225        let mut status = StatusCode::OK;
226        let mut headers = HeaderMap::new();
227        let mut str_body = String::default();
228        let mut bytes_body = None;
229        let match_val = value.unbox();
230        match match_val {
231            SVal::Blob(blob) => {
232                headers.insert(CONTENT_TYPE, "application/octet-stream".parse().unwrap());
233                bytes_body = Some(Bytes::from(blob));
234            },
235            SVal::String(value) => {
236                headers.insert(CONTENT_TYPE, "text/plain".parse().unwrap());
237                str_body = value;
238            },
239            SVal::Map(map) => {
240                if let Some(format_val) = map.get(&SVal::String("format".into())) {
241                    if let Some(format) = doc.formats.get(&format_val.to_string()) {
242                        headers.insert("format", format.format().parse().unwrap());
243                        headers.insert(CONTENT_TYPE, format.content_type().parse().unwrap());
244                    }
245                }
246                if let Some(headers_val) = map.get(&SVal::String("headers".into())) {
247                    match headers_val {
248                        SVal::Map(headers_map) => {
249                            for (k, v) in headers_map {
250                                let key = k.to_string();
251                                headers.insert(HeaderName::from_str(&key).unwrap(), v.to_string().parse().unwrap());
252                            }
253                        },
254                        SVal::Array(values) => {
255                            for tup in values {
256                                match tup {
257                                    SVal::Tuple(tup) => {
258                                        if tup.len() == 2 {
259                                            let key = tup[0].to_string();
260                                            headers.insert(HeaderName::from_str(&key).unwrap(), tup[1].to_string().parse().unwrap());
261                                        }
262                                    },
263                                    _ => {}
264                                }
265                            }
266                        },
267                        _ => {}
268                    }
269                }
270                if let Some(body_val) = map.get(&SVal::String("body".into())) {
271                    // Get content type to use from the headers if any
272                    let content_type = headers.get(CONTENT_TYPE);
273
274                    match body_val {
275                        SVal::String(value) => {
276                            if content_type.is_none() { // give opportunity to override with the map above
277                                headers.insert(CONTENT_TYPE, "text/plain".parse().unwrap());
278                            }
279                            str_body = value.clone();
280                        },
281                        SVal::Blob(blob) => {
282                            if content_type.is_none() { // give opportunity to override with the map above
283                                headers.insert(CONTENT_TYPE, "application/octet-stream".parse().unwrap());
284                            }
285                            bytes_body = Some(Bytes::from(blob.clone()));
286                        },
287                        SVal::Object(nref) => {
288                            let format;
289                            if let Some(value) = headers.get("format") {
290                                format = value.to_str().unwrap().to_owned();
291                            } else if let Some(ctype) = content_type {
292                                format = ctype.to_str().unwrap().to_owned();
293                            } else {
294                                format = "json".to_owned();
295                            }
296                            if let Ok(result) = doc.export_string("main", &format, Some(nref)) {
297                                str_body = result;
298                                if let Some(format) = doc.formats.get(&format) {
299                                    headers.insert(CONTENT_TYPE, format.content_type().parse().unwrap());
300                                }
301                            } else if let Ok(result) = doc.export_bytes("main", &format, Some(nref)) {
302                                bytes_body = Some(result);
303                                if let Some(format) = doc.formats.get(&format) {
304                                    headers.insert(CONTENT_TYPE, format.content_type().parse().unwrap());
305                                }
306                            } else if let Ok(result) = doc.export_bytes("main", "bytes", Some(nref)) {
307                                bytes_body = Some(result);
308                                if let Some(format) = doc.formats.get("bytes") {
309                                    headers.insert(CONTENT_TYPE, format.content_type().parse().unwrap());
310                                }
311                            }
312                        },
313                        _ => {}
314                    }
315                }
316                if let Some(status_val) = map.get(&SVal::String("status".into())) {
317                    let status_res = StatusCode::from_str(&status_val.to_string());
318                    match status_res {
319                        Ok(code) => status = code,
320                        Err(_inv) => status = StatusCode::MULTI_STATUS,
321                    }
322                }
323            },
324            _ => {}
325        }
326        Self {
327            status,
328            headers,
329            str_body,
330            bytes_body,
331        }
332    }
333
334    /// Error response.
335    pub fn error(code: StatusCode, message: &str) -> Self {
336        Self {
337            headers: HeaderMap::new(),
338            status: code,
339            str_body: message.to_owned(),
340            bytes_body: None,
341        }
342    }
343}
344
345/// Request handler.
346async fn request_handler(state: ServerState, path: String, query: BTreeMap<String, String>, headers: HeaderMap, mut body: Bytes, method: Method) -> impl IntoResponse {
347    let dref = state.handler(method, &path);
348    let mut doc;
349    let function;
350    let func_ref;
351    match dref {
352        Ok(dref) => {
353            let tmp = state.doc.lock().await;
354            if let Some(func) = SData::get::<SFunc>(&tmp.graph, &dref) {
355                function = func.clone();
356                func_ref = dref;
357                doc = tmp.clone();
358            } else {
359                return StofResponse::error(StatusCode::NOT_FOUND, &format!("request handler not found at the path: {}", path));
360            }
361        },
362        Err(error) => {
363            return StofResponse::error(StatusCode::NOT_FOUND, &format!("request handler not found at the path: {}: {}", path, error.to_string()));
364        }
365    }
366
367    let parse_body_res;
368    if let Some(content_type) = headers.get(CONTENT_TYPE) {
369        let ctype = content_type.to_str().unwrap();
370        parse_body_res = doc.header_import("main", ctype, ctype, &mut body, "Request");
371    } else {
372        parse_body_res = doc.header_import("main", "bytes", "bytes", &mut body, "Request");
373    }
374    match parse_body_res {
375        Ok(_) => {},
376        Err(_) => return StofResponse::error(StatusCode::BAD_REQUEST, "failed to parse request body into the document")
377    }
378    let req;
379    if let Some(obj) = doc.graph.root_by_name("Request") {
380        req = obj;
381    } else {
382        return StofResponse::error(StatusCode::BAD_REQUEST, "failed to parse request body into the document");
383    }
384
385    let query: BTreeMap<SVal, SVal> = query.into_iter().map(|(key, value)| (SVal::String(key), SVal::String(value))).collect();
386    let mut header_map: BTreeMap<SVal, SVal> = BTreeMap::new();
387    for (key, value) in &headers {
388        header_map.insert(SVal::String(key.as_str().to_owned()), SVal::String(value.to_str().unwrap().to_owned()));
389    }
390
391    // fn get(requst: obj, headers: map, query: map) { .... }
392    let mut parameters = Vec::new();
393    let mut added_headers = false;
394    for param in &function.params {
395        if param.ptype.is_object() {
396            // put the request object into the parameters
397            parameters.push(SVal::Object(req.clone()));
398        } else if param.ptype.is_map() {
399            // headers first, then query unless name is literally query or headers...
400            if param.name != "query" && (param.name == "headers" || !added_headers) {
401                added_headers = true;
402                parameters.push(SVal::Map(header_map.clone()));
403            } else if param.name != "headers" && (param.name == "query" || added_headers) {
404                parameters.push(SVal::Map(query.clone()));
405            }
406        }
407    }
408    let response = SFunc::call_internal(&func_ref, "main", &mut doc, parameters, true, &function.params, &function.statements, &function.rtype);
409    match response {
410        Ok(response) => {
411            StofResponse::val_response(&doc, response)
412        },
413        Err(error) => {
414            if state.opaque_errors {
415                StofResponse::error(StatusCode::INTERNAL_SERVER_ERROR, "internal server error")
416            } else {
417                StofResponse::error(StatusCode::INTERNAL_SERVER_ERROR, &error.to_string(&doc.graph))
418            }
419        }
420    }
421}
422
423/// Post request handler.
424async fn post_request_handler(State(state): State<ServerState>, Path(path): Path<String>, Query(query): Query<BTreeMap<String, String>>, headers: HeaderMap, body: Bytes) -> impl IntoResponse {
425    request_handler(state, path, query, headers, body, Method::POST).await
426}
427
428/// Put request handler.
429async fn put_request_handler(State(state): State<ServerState>, Path(path): Path<String>, Query(query): Query<BTreeMap<String, String>>, headers: HeaderMap, body: Bytes) -> impl IntoResponse {
430    request_handler(state, path, query, headers, body, Method::PUT).await
431}
432
433/// Patch request handler.
434async fn patch_request_handler(State(state): State<ServerState>, Path(path): Path<String>, Query(query): Query<BTreeMap<String, String>>, headers: HeaderMap, body: Bytes) -> impl IntoResponse {
435    request_handler(state, path, query, headers, body, Method::PATCH).await
436}
437
438/// Delete request handler.
439async fn delete_request_handler(State(state): State<ServerState>, Path(path): Path<String>, Query(query): Query<BTreeMap<String, String>>, headers: HeaderMap, body: Bytes) -> impl IntoResponse {
440    request_handler(state, path, query, headers, body, Method::DELETE).await
441}
442
443/// Get request handler.
444async fn get_request_handler(State(state): State<ServerState>, Path(path): Path<String>, Query(query): Query<BTreeMap<String, String>>, headers: HeaderMap, body: Bytes) -> impl IntoResponse {
445    request_handler(state, path, query, headers, body, Method::GET).await
446}
447
448/// Head request handler.
449async fn head_request_handler(State(state): State<ServerState>, Path(path): Path<String>, Query(query): Query<BTreeMap<String, String>>, headers: HeaderMap, body: Bytes) -> impl IntoResponse {
450    request_handler(state, path, query, headers, body, Method::HEAD).await
451}
452
453
454#[cfg(test)]
455mod tests {
456    use stof::SDoc;
457    use super::serve;
458
459
460    #[test]
461    fn test_serve() {
462        let doc = SDoc::file("src/server/test/server.stof", "stof").unwrap();
463        serve(doc);
464    }
465}