1use std::path::PathBuf;
2use std::sync::mpsc;
3
4use fehler::throws;
5use gotham::handler::HandlerError;
6use gotham::helpers::http::response::create_empty_response;
7use gotham::hyper::{body, Body, Response, StatusCode};
8use gotham::middleware::state::StateMiddleware;
9use gotham::pipeline::single_pipeline;
10use gotham::pipeline::single_middleware;
11use gotham::router::builder::{DefineSingleRoute, build_router};
12use gotham::router::builder::DrawRoutes;
13use gotham::router::Router;
14use gotham::state::{State, FromState};
15
16use crate::{GSIConfig, Error, install_dir, update};
17
18pub struct GSIServer {
20 port: u16,
21 config: GSIConfig,
22 installed: bool,
23 listeners: Vec<Box<dyn FnMut(&update::Update)>>,
24}
25
26impl GSIServer {
27 pub fn new(config: GSIConfig, port: u16) -> Self {
29 Self {
30 port,
31 config,
32 installed: false,
33 listeners: vec![],
34 }
35 }
36
37 #[throws]
39 pub fn install_into<P: Into<PathBuf>>(&mut self, cfg_folder: P) {
40 self.config.install_into(cfg_folder, self.port)?;
41 self.installed = true;
42 }
43
44 #[throws]
46 pub fn install(&mut self) {
47 self.install_into(install_dir::discover_cfg_folder()?)?;
48 }
49
50 pub fn add_listener<F: 'static + FnMut(&update::Update)>(&mut self, listener: F) {
52 self.listeners.push(Box::new(listener));
53 }
54
55 #[throws]
57 pub async fn run(mut self) {
58 #[cfg(feature = "autoinstall")]
59 if !self.installed {
60 self.install()?;
61 }
62
63 let (tx, rx) = mpsc::sync_channel(128);
64
65 let port = self.port;
66 tokio::spawn(gotham::init_server(("127.0.0.1", port), router(tx)));
67
68 for update in rx {
69 for callback in &mut self.listeners {
70 callback(&update)
71 }
72 }
73 }
74}
75
76#[derive(Clone, StateData)]
77struct UpdateHandler {
78 inner: mpsc::SyncSender<update::Update>,
79}
80
81impl UpdateHandler {
82 fn new(tx: &mpsc::SyncSender<update::Update>) -> Self {
83 Self {
84 inner: tx.clone()
85 }
86 }
87
88 fn send(&self, update: update::Update) {
89 self.inner.send(update).expect("failed to send update back to main thread");
90 }
91}
92
93#[throws((State, HandlerError))]
94pub async fn handle_update(mut state: State) -> (State, Response<Body>) {
95 let body = state.try_take::<Body>();
96 let body = match body {
97 Some(body) => body,
98 None => {
99 let response = create_empty_response(&state, StatusCode::BAD_REQUEST);
100 return (state, response);
101 }
102 };
103 let body = body::to_bytes(body).await;
104 let body = match body {
105 Ok(body) => body,
106 Err(err) => {
107 eprintln!("{}", err);
108 let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
109 return (state, response);
110 }
111 };
112 let json_value = serde_json::from_slice::<serde_json::Value>(body.as_ref());
113 let json_value = match json_value {
114 Ok(json_value) => json_value,
115 Err(err) => {
116 println!("JSON parsing error: {}", err);
117 if let Ok(data) = ::std::str::from_utf8(body.as_ref()) {
118 println!("{}\n", data);
119 }
120 let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
121 return (state, response);
122 }
123 };
124 let data = serde_json::from_value::<update::Update>(json_value);
125 let data = match data {
126 Ok(data) => data,
127 Err(err) => {
128 println!("Update parsing error: {}", err);
129 if let Ok(data) = ::std::str::from_utf8(body.as_ref()) {
130 println!("{}\n", data);
131 }
132 let response = create_empty_response(&state, StatusCode::INTERNAL_SERVER_ERROR);
133 return (state, response);
134 }
135 };
136 {
138 let update_handler = UpdateHandler::borrow_from(&state);
139 update_handler.send(data);
140 }
141 let response = create_empty_response(&state, StatusCode::OK);
142 (state, response)
143}
144
145fn router(tx: mpsc::SyncSender<update::Update>) -> Router {
146 let update_handler = UpdateHandler::new(&tx);
147
148 let middleware = StateMiddleware::new(update_handler);
149 let pipeline = single_middleware(middleware);
150 let (chain, pipelines) = single_pipeline(pipeline);
151
152 build_router(chain, pipelines, |route| {
153 route
154 .post("/")
155 .to_async(handle_update);
156 })
157}