1use std::path::PathBuf;
2use std::sync::mpsc;
3
4use fehler::throws;
5use gotham::handler::HandlerError;
6use gotham::helpers::http::response::create_empty_response;
7use gotham::hyper::{body, Body, Response, StatusCode};
8use gotham::middleware::state::StateMiddleware;
9use gotham::pipeline::single::single_pipeline;
10use gotham::pipeline::single_middleware;
11use gotham::router::builder::{DefineSingleRoute, build_router};
12use gotham::router::builder::DrawRoutes;
13use gotham::router::Router;
14use gotham::state::{State, FromState};
15
16use crate::{GSIConfig, Error, install_dir, update};
17
18pub struct GSIServer {
20 port: u16,
21 config: GSIConfig,
22 installed: bool,
23 listeners: Vec<Box<dyn FnMut(&update::Update)>>,
24}
25
26impl GSIServer {
27 pub fn new(config: GSIConfig, port: u16) -> Self {
29 Self {
30 port,
31 config,
32 installed: false,
33 listeners: vec![],
34 }
35 }
36
37 #[throws]
39 pub fn install_into<P: Into<PathBuf>>(&mut self, cfg_folder: P) {
40 self.config.install_into(cfg_folder, self.port)?;
41 self.installed = true;
42 }
43
44 #[throws]
46 pub fn install(&mut self) {
47 self.install_into(install_dir::discover_cfg_folder()?)?;
48 }
49
50 pub fn add_listener<F: 'static + FnMut(&update::Update)>(&mut self, listener: F) {
52 self.listeners.push(Box::new(listener));
53 }
54
55 #[throws]
57 pub async fn run(mut self) {
58 if !self.installed {
59 self.install()?;
60 }
61
62 let (tx, rx) = mpsc::sync_channel(128);
63
64 let port = self.port;
65 tokio::spawn(gotham::init_server(("127.0.0.1", port), router(tx)));
66
67 for update in rx {
68 for callback in &mut self.listeners {
69 callback(&update)
70 }
71 }
72 }
73}
74
75#[derive(Clone, StateData)]
76struct UpdateHandler {
77 inner: mpsc::SyncSender<update::Update>,
78}
79
80impl UpdateHandler {
81 fn new(tx: &mpsc::SyncSender<update::Update>) -> Self {
82 Self {
83 inner: tx.clone()
84 }
85 }
86
87 fn send(&self, update: update::Update) {
88 self.inner.send(update).expect("failed to send update back to main thread");
89 }
90}
91
92#[throws((State, HandlerError))]
93pub async fn handle_update(mut state: State) -> (State, Response<Body>) {
94 let body = state.try_take::<Body>();
95 let body = match body {
96 Some(body) => body,
97 None => {
98 let response = create_empty_response(&state, StatusCode::BAD_REQUEST);
99 return (state, response);
100 }
101 };
102 let body = body::to_bytes(body).await;
103 let body = match body {
104 Ok(body) => body,
105 Err(err) => {
106 eprintln!("{}", err);
107 let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
108 return (state, response);
109 }
110 };
111 let json_value = serde_json::from_slice::<serde_json::Value>(body.as_ref());
112 let json_value = match json_value {
113 Ok(json_value) => json_value,
114 Err(err) => {
115 println!("JSON parsing error: {}", err);
116 if let Ok(data) = ::std::str::from_utf8(body.as_ref()) {
117 println!("{}\n", data);
118 }
119 let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
120 return (state, response);
121 }
122 };
123 let data = serde_json::from_value::<update::Update>(json_value);
124 let data = match data {
125 Ok(data) => data,
126 Err(err) => {
127 println!("Update parsing error: {}", err);
128 if let Ok(data) = ::std::str::from_utf8(body.as_ref()) {
129 println!("{}\n", data);
130 }
131 let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
132 return (state, response);
133 }
134 };
135 {
137 let update_handler = UpdateHandler::borrow_from(&state);
138 update_handler.send(data);
139 }
140 let response = create_empty_response(&state, StatusCode::OK);
141 (state, response)
142}
143
144fn router(tx: mpsc::SyncSender<update::Update>) -> Router {
145 let update_handler = UpdateHandler::new(&tx);
146
147 let middleware = StateMiddleware::new(update_handler);
148 let pipeline = single_middleware(middleware);
149 let (chain, pipelines) = single_pipeline(pipeline);
150
151 build_router(chain, pipelines, |route| {
152 route
153 .post("/")
154 .to_async(handle_update);
155 })
156}