pixy_server/
lib.rs

1pub mod config;
2
3use std::sync::Arc;
4
5use axum::{
6    extract::{Json, State},
7    http::StatusCode,
8    routing::{get, post},
9};
10use tracing::{debug, info, instrument};
11
12use crate::config::ServerConfiguration;
13use pixy_core::validation::parse_configs;
14use pixy_core::{Gateway, SensorGateway, SensorMessage};
15
16fn create_app(gateway: Arc<dyn Gateway>, server_configs: &ServerConfiguration) -> axum::Router {
17    let app = axum::Router::new()
18        .route("/data", post(handler))
19        .route("/healthz", get(|| async { StatusCode::OK }))
20        .with_state(gateway);
21
22    if server_configs.enable_echo {
23        app.route("/echo", post(echo))
24    } else {
25        app
26    }
27}
28
29pub async fn run_server_with_gateway(
30    gateway: Arc<dyn Gateway>,
31    server_configs: ServerConfiguration,
32) {
33    let app = create_app(gateway, &server_configs);
34
35    let bind_address = format!("0.0.0.0:{}", server_configs.port);
36
37    println!(
38        r#"
39     ___                     ___                 
40    /  /\      ___          /__/|          ___   
41   /  /::\    /  /\        |  |:|         /__/|  
42  /  /:/\:\  /  /:/        |  |:|        |  |:|  
43 /  /:/~/:/ /__/::\      __|__|:|        |  |:|  
44/__/:/ /:/  \__\/\:\__  /__/::::\____  __|__|:|  
45\  \:\/:/      \  \:\/\    ~\~~\::::/ /__/::::\  
46 \  \::/        \__\::/     |~~|:|~~     ~\~~\:\ 
47  \  \:\        /__/:/      |  |:|         \  \:\
48   \  \:\       \__\/       |  |:|          \__\/
49    \__\/                   |__|/                
50    "#
51    );
52
53    info!("Starting server on {}", &bind_address);
54    let listener = tokio::net::TcpListener::bind(&bind_address).await.unwrap();
55
56    axum::serve(listener, app.into_make_service())
57        .await
58        .unwrap();
59}
60
61pub async fn run_server_with(server_configs: ServerConfiguration) {
62    let pixy_configs = parse_configs(&server_configs.config_file).unwrap();
63
64    let gateway: Arc<dyn Gateway> = Arc::new(SensorGateway::from(pixy_configs));
65
66    run_server_with_gateway(gateway, server_configs).await;
67}
68
69#[instrument]
70async fn handler(
71    State(gateway): State<Arc<dyn Gateway>>,
72    Json(reading): Json<SensorMessage>,
73) -> StatusCode {
74    debug!("Received reading: {:?}", &reading);
75
76    tokio::spawn(async move {
77        gateway.handle_reading(reading).await;
78    });
79
80    StatusCode::ACCEPTED
81}
82
83#[instrument]
84async fn echo(data: String) -> String {
85    info!("Received data: {:?}", &data);
86    data
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use axum::body::Body;
    use axum::http::{self, Request};
    use tower::ServiceExt;

    /// Gateway stand-in whose `handle_reading` does nothing.
    #[derive(Debug)]
    struct MockGateway {}

    #[async_trait]
    impl Gateway for MockGateway {
        async fn handle_reading(&self, _reading: SensorMessage) {}
    }

    /// Baseline configuration for the tests; the echo route is disabled.
    fn default_config() -> ServerConfiguration {
        ServerConfiguration {
            config_file: String::new(),
            port: 9147,
            log_level: String::from("info"),
            enable_echo: false,
        }
    }

    /// Builds the router under test around a fresh [`MockGateway`].
    fn app_with(configs: &ServerConfiguration) -> axum::Router {
        let gateway: Arc<dyn Gateway> = Arc::new(MockGateway {});

        create_app(gateway, configs)
    }

    /// Deserializes the bundled example reading into a `SensorMessage` and
    /// serializes it back to a JSON string.
    fn example_sensor_json() -> String {
        let example_sensor: SensorMessage =
            serde_json::from_str(include_str!("../../example-configs/test-sensor.json")).unwrap();

        serde_json::to_string(&example_sensor).unwrap()
    }

    /// A `POST /echo` request whose body is `hello`.
    fn echo_request() -> Request<String> {
        Request::post("/echo").body("hello".to_string()).unwrap()
    }

    /// Sends `payload` as `POST /data` to a default-configured app and returns
    /// the response status. A `Content-Type` header is set only when
    /// `content_type` is `Some`.
    async fn post_data(payload: String, content_type: Option<&str>) -> http::StatusCode {
        let mut request = Request::post("/data");
        if let Some(value) = content_type {
            request = request.header("Content-Type", value);
        }

        app_with(&default_config())
            .oneshot(request.body(payload).unwrap())
            .await
            .unwrap()
            .status()
    }

    #[tokio::test]
    async fn test_health_endpoint() {
        let request = Request::get("/healthz").body(Body::empty()).unwrap();

        let res = app_with(&default_config())
            .oneshot(request)
            .await
            .unwrap();

        assert_eq!(res.status(), http::StatusCode::OK);
    }

    #[tokio::test]
    async fn test_echo_enabled() {
        let configs = ServerConfiguration {
            enable_echo: true,
            ..default_config()
        };

        let res = app_with(&configs).oneshot(echo_request()).await.unwrap();

        assert_eq!(res.status(), http::StatusCode::OK);

        let body = axum::body::to_bytes(res.into_body(), 5).await.unwrap();

        assert_eq!(&body[..], b"hello");
    }

    #[tokio::test]
    async fn test_echo_disable() {
        let res = app_with(&default_config())
            .oneshot(echo_request())
            .await
            .unwrap();

        assert_eq!(res.status(), http::StatusCode::NOT_FOUND);
    }

    #[tokio::test]
    async fn test_example_sensor_works() {
        let status = post_data(example_sensor_json(), Some("application/json")).await;

        assert_eq!(status, http::StatusCode::ACCEPTED);
    }

    #[tokio::test]
    async fn test_fails_if_wrong_content_type() {
        // No `Content-Type` header is sent at all.
        let status = post_data(example_sensor_json(), None).await;

        assert_eq!(status, http::StatusCode::UNSUPPORTED_MEDIA_TYPE);
    }

    #[tokio::test]
    async fn test_malformed_sensor_fails() {
        // Replacing the "temperature" text in the serialized example makes the
        // payload one the endpoint is expected to reject.
        let malformed = example_sensor_json().replace("temperature", "hot");

        let status = post_data(malformed, Some("application/json")).await;

        assert_eq!(status, http::StatusCode::UNPROCESSABLE_ENTITY);
    }
}