csgo_gsi/
server.rs

1use std::path::PathBuf;
2use std::sync::mpsc;
3
4use fehler::throws;
5use gotham::handler::HandlerError;
6use gotham::helpers::http::response::create_empty_response;
7use gotham::hyper::{body, Body, Response, StatusCode};
8use gotham::middleware::state::StateMiddleware;
9use gotham::pipeline::single::single_pipeline;
10use gotham::pipeline::single_middleware;
11use gotham::router::builder::{DefineSingleRoute, build_router};
12use gotham::router::builder::DrawRoutes;
13use gotham::router::Router;
14use gotham::state::{State, FromState};
15
16use crate::{GSIConfig, Error, install_dir, update};
17
18/// a server that listens for GSI updates
19pub struct GSIServer {
20    port: u16,
21    config: GSIConfig,
22    installed: bool,
23    listeners: Vec<Box<dyn FnMut(&update::Update)>>,
24}
25
26impl GSIServer {
27    /// create a new server with the given configuration and port
28    pub fn new(config: GSIConfig, port: u16) -> Self {
29        Self {
30            port,
31            config,
32            installed: false,
33            listeners: vec![],
34        }
35    }
36
37    /// install this server's configuration into the given `/path/to/csgo/cfg/` folder
38    #[throws]
39    pub fn install_into<P: Into<PathBuf>>(&mut self, cfg_folder: P) {
40        self.config.install_into(cfg_folder, self.port)?;
41        self.installed = true;
42    }
43
44    /// install this server's configuration into the autodiscovered `/path/to/csgo/cfg/` folder, if it can be found
45    #[throws]
46    pub fn install(&mut self) {
47        self.install_into(install_dir::discover_cfg_folder()?)?;
48    }
49
50    /// add an update listener
51    pub fn add_listener<F: 'static + FnMut(&update::Update)>(&mut self, listener: F) {
52        self.listeners.push(Box::new(listener));
53    }
54
55    /// run the server (will block indefinitely)
56    #[throws]
57    pub async fn run(mut self) {
58        if !self.installed {
59            self.install()?;
60        }
61
62        let (tx, rx) = mpsc::sync_channel(128);
63
64        let port = self.port;
65        tokio::spawn(gotham::init_server(("127.0.0.1", port), router(tx)));
66
67        for update in rx {
68            for callback in &mut self.listeners {
69                callback(&update)
70            }
71        }
72    }
73}
74
75#[derive(Clone, StateData)]
76struct UpdateHandler {
77    inner: mpsc::SyncSender<update::Update>,
78}
79
80impl UpdateHandler {
81    fn new(tx: &mpsc::SyncSender<update::Update>) -> Self {
82        Self {
83            inner: tx.clone()
84        }
85    }
86
87    fn send(&self, update: update::Update) {
88        self.inner.send(update).expect("failed to send update back to main thread");
89    }
90}
91
92#[throws((State, HandlerError))]
93pub async fn handle_update(mut state: State) -> (State, Response<Body>) {
94    let body = state.try_take::<Body>();
95    let body = match body {
96        Some(body) => body,
97        None => {
98            let response = create_empty_response(&state, StatusCode::BAD_REQUEST);
99            return (state, response);
100        }
101    };
102    let body = body::to_bytes(body).await;
103    let body = match body {
104        Ok(body) => body,
105        Err(err) => {
106            eprintln!("{}", err);
107            let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
108            return (state, response);
109        }
110    };
111    let json_value = serde_json::from_slice::<serde_json::Value>(body.as_ref());
112    let json_value = match json_value {
113        Ok(json_value) => json_value,
114        Err(err) => {
115            println!("JSON parsing error: {}", err);
116            if let Ok(data) = ::std::str::from_utf8(body.as_ref()) {
117                println!("{}\n", data);
118            }
119            let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
120            return (state, response);
121        }
122    };
123    let data = serde_json::from_value::<update::Update>(json_value);
124    let data = match data {
125        Ok(data) => data,
126        Err(err) => {
127            println!("Update parsing error: {}", err);
128            if let Ok(data) = ::std::str::from_utf8(body.as_ref()) {
129                println!("{}\n", data);
130            }
131            let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
132            return (state, response);
133        }
134    };
135    // TODO verify auth
136    {
137        let update_handler = UpdateHandler::borrow_from(&state);
138        update_handler.send(data);
139    }
140    let response = create_empty_response(&state, StatusCode::OK);
141    (state, response)
142}
143
144fn router(tx: mpsc::SyncSender<update::Update>) -> Router {
145    let update_handler = UpdateHandler::new(&tx);
146
147    let middleware = StateMiddleware::new(update_handler);
148    let pipeline = single_middleware(middleware);
149    let (chain, pipelines) = single_pipeline(pipeline);
150
151    build_router(chain, pipelines, |route| {
152        route
153            .post("/")
154            .to_async(handle_update);
155    })
156}