1pub mod config;
2
3use std::sync::Arc;
4
5use axum::{
6 extract::{Json, State},
7 http::StatusCode,
8 routing::{get, post},
9};
10use tracing::{debug, info, instrument};
11
12use crate::config::ServerConfiguration;
13use pixy_core::validation::parse_configs;
14use pixy_core::{Gateway, SensorGateway, SensorMessage};
15
16fn create_app(gateway: Arc<dyn Gateway>, server_configs: &ServerConfiguration) -> axum::Router {
17 let app = axum::Router::new()
18 .route("/data", post(handler))
19 .route("/healthz", get(|| async { StatusCode::OK }))
20 .with_state(gateway);
21
22 if server_configs.enable_echo {
23 app.route("/echo", post(echo))
24 } else {
25 app
26 }
27}
28
29pub async fn run_server_with_gateway(
30 gateway: Arc<dyn Gateway>,
31 server_configs: ServerConfiguration,
32) {
33 let app = create_app(gateway, &server_configs);
34
35 let bind_address = format!("0.0.0.0:{}", server_configs.port);
36
37 println!(
38 r#"
39 ___ ___
40 / /\ ___ /__/| ___
41 / /::\ / /\ | |:| /__/|
42 / /:/\:\ / /:/ | |:| | |:|
43 / /:/~/:/ /__/::\ __|__|:| | |:|
44/__/:/ /:/ \__\/\:\__ /__/::::\____ __|__|:|
45\ \:\/:/ \ \:\/\ ~\~~\::::/ /__/::::\
46 \ \::/ \__\::/ |~~|:|~~ ~\~~\:\
47 \ \:\ /__/:/ | |:| \ \:\
48 \ \:\ \__\/ | |:| \__\/
49 \__\/ |__|/
50 "#
51 );
52
53 info!("Starting server on {}", &bind_address);
54 let listener = tokio::net::TcpListener::bind(&bind_address).await.unwrap();
55
56 axum::serve(listener, app.into_make_service())
57 .await
58 .unwrap();
59}
60
61pub async fn run_server_with(server_configs: ServerConfiguration) {
62 let pixy_configs = parse_configs(&server_configs.config_file).unwrap();
63
64 let gateway: Arc<dyn Gateway> = Arc::new(SensorGateway::from(pixy_configs));
65
66 run_server_with_gateway(gateway, server_configs).await;
67}
68
69#[instrument]
70async fn handler(
71 State(gateway): State<Arc<dyn Gateway>>,
72 Json(reading): Json<SensorMessage>,
73) -> StatusCode {
74 debug!("Received reading: {:?}", &reading);
75
76 tokio::spawn(async move {
77 gateway.handle_reading(reading).await;
78 });
79
80 StatusCode::ACCEPTED
81}
82
83#[instrument]
84async fn echo(data: String) -> String {
85 info!("Received data: {:?}", &data);
86 data
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use axum::body::Body;
    use axum::http::{self, Request};
    use tower::ServiceExt;

    /// Gateway stand-in whose `handle_reading` does nothing.
    #[derive(Debug)]
    struct MockGateway {}

    #[async_trait]
    impl Gateway for MockGateway {
        async fn handle_reading(&self, _reading: SensorMessage) {}
    }

    /// Baseline configuration used by the tests: echo disabled.
    fn default_config() -> ServerConfiguration {
        ServerConfiguration {
            config_file: String::new(),
            port: 9147,
            log_level: String::from("info"),
            enable_echo: false,
        }
    }

    /// Builds the application router around a fresh [`MockGateway`].
    fn mock_app(configs: &ServerConfiguration) -> axum::Router {
        let gateway: Arc<dyn Gateway> = Arc::new(MockGateway {});
        create_app(gateway, configs)
    }

    /// Round-trips the example sensor fixture through [`SensorMessage`] and
    /// returns it re-serialized as JSON.
    fn example_sensor_json() -> String {
        let example_sensor: SensorMessage =
            serde_json::from_str(include_str!("../../example-configs/test-sensor.json")).unwrap();
        serde_json::to_string(&example_sensor).unwrap()
    }

    #[tokio::test]
    async fn test_health_endpoint() {
        let request = Request::get("/healthz").body(Body::empty()).unwrap();

        let response = mock_app(&default_config())
            .oneshot(request)
            .await
            .unwrap();

        assert_eq!(response.status(), http::StatusCode::OK);
    }

    #[tokio::test]
    async fn test_echo_enabled() {
        let mut configs = default_config();
        configs.enable_echo = true;

        let request = Request::post("/echo").body("hello".to_string()).unwrap();

        let response = mock_app(&configs).oneshot(request).await.unwrap();

        assert_eq!(response.status(), http::StatusCode::OK);

        let echoed = axum::body::to_bytes(response.into_body(), 5)
            .await
            .unwrap();

        assert_eq!(&echoed[..], b"hello");
    }

    #[tokio::test]
    async fn test_echo_disable() {
        let request = Request::post("/echo").body("hello".to_string()).unwrap();

        let response = mock_app(&default_config())
            .oneshot(request)
            .await
            .unwrap();

        assert_eq!(response.status(), http::StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn test_example_sensor_works() {
        let request = Request::post("/data")
            .header("Content-Type", "application/json")
            .body(example_sensor_json())
            .unwrap();

        let response = mock_app(&default_config())
            .oneshot(request)
            .await
            .unwrap();

        assert_eq!(response.status(), http::StatusCode::ACCEPTED);
    }

    #[tokio::test]
    async fn test_fails_if_wrong_content_type() {
        // No Content-Type header is set on this request.
        let request = Request::post("/data")
            .body(example_sensor_json())
            .unwrap();

        let response = mock_app(&default_config())
            .oneshot(request)
            .await
            .unwrap();

        assert_eq!(response.status(), http::StatusCode::UNSUPPORTED_MEDIA_TYPE);
    }

    #[tokio::test]
    async fn test_malformed_sensor_fails() {
        // Replace "temperature" with "hot" in the serialized fixture so the
        // body no longer deserializes into a `SensorMessage`.
        let malformed = example_sensor_json().replace("temperature", "hot");

        let request = Request::post("/data")
            .header("Content-Type", "application/json")
            .body(malformed)
            .unwrap();

        let response = mock_app(&default_config())
            .oneshot(request)
            .await
            .unwrap();

        assert_eq!(response.status(), http::StatusCode::UNPROCESSABLE_ENTITY);
    }
}