random_image_server/
lib.rs1use std::{convert::Infallible, fs, path::PathBuf, sync::Arc};
2
3use anyhow::{Result, anyhow};
4use http_body_util::Full;
5use hyper::{Request, Response, body::Bytes, service::service_fn};
6use hyper_util::{
7 rt::{TokioExecutor, TokioIo},
8 server::conn::auto,
9};
10use tokio::{
11 net::TcpListener,
12 sync::{RwLock, broadcast::Receiver},
13};
14use url::Url;
15
16use crate::config::{Config, ImageSource};
17use crate::state::ServerState;
18use crate::termination::Interrupted;
19
20pub mod cache;
21pub mod config;
22mod logging;
23pub mod state;
24pub use logging::init_logging;
25pub mod env;
26pub mod termination;
27
/// File extensions (lower-case, without the leading dot) accepted as images.
///
/// The same list is also matched against the subtype of a `Content-Type`
/// header when an image is fetched from a URL.
pub const ALLOWED_IMAGE_EXTENSIONS: &[&str] = &[
    "jpg", "jpeg", "png", "webp", "gif",
];
29
30pub struct ImageServer {
32 pub config: Config,
33 pub state: Arc<RwLock<ServerState>>,
34}
35
36impl ImageServer {
37 #[must_use]
39 pub fn new() -> Self {
40 Self {
41 config: Config::default(),
42 state: Arc::new(RwLock::new(ServerState::default())),
43 }
44 }
45
46 #[must_use]
48 pub fn with_config(config: Config) -> Self {
49 Self {
50 state: Arc::new(RwLock::new(ServerState::with_config(&config))),
51 config,
52 }
53 }
54
55 pub async fn populate_cache(&self) {
61 log::info!("Populating cache with configured images...");
64
65 for source in &self.config.server.sources {
66 match source {
67 ImageSource::Url(url) => {
68 log::info!("Loading image from URL: {url}");
69 let key = cache::CacheKey::ImageUrl(url.clone());
70 match read_image_from_url(url).await {
72 Ok(image) => {
73 let set_result = self.state.write().await.cache.set(key, image);
74 if let Err(err) = set_result {
75 log::error!("Failed to store image in cache: {err}");
76 }
77 }
78 Err(e) => {
79 log::error!("Failed to read image from URL {url}: {e}");
80 }
81 }
82 }
83 ImageSource::Path(path) if path.is_file() => {
84 let path = path.canonicalize().unwrap_or_else(|_| {
85 log::warn!("Failed to canonicalize path: {}", path.display());
86 path.clone()
87 });
88 if path.extension().is_some_and(|ext| {
89 ALLOWED_IMAGE_EXTENSIONS.contains(&ext.to_string_lossy().as_ref())
90 }) {
91 log::info!("Loading image from file path: {}", path.display());
92 let Ok(image) = read_image_from_path(&path) else {
94 log::error!("Failed to read image file: {}", path.display());
95 continue;
96 };
97 let key = cache::CacheKey::ImagePath(path.clone());
98 let set_result = self.state.write().await.cache.set(key, image);
99 if let Err(err) = set_result {
100 log::error!("Failed to store image in cache: {err}");
101 }
102 } else {
103 log::warn!("Unsupported image file extension: {}", path.display());
104 }
105 }
106 ImageSource::Path(path) if path.is_dir() => {
107 let path = path.canonicalize().unwrap_or_else(|_| {
108 log::warn!("Failed to canonicalize path: {}", path.display());
109 path.clone()
110 });
111
112 log::info!("Loading images from directory: {}", path.display());
113 let mut state = self.state.write().await;
115 walkdir::WalkDir::new(&path)
116 .into_iter()
117 .filter_map(Result::ok)
118 .filter(|e| e.file_type().is_file())
119 .filter(|e| {
120 e.path()
121 .extension()
122 .and_then(|ext| ext.to_str())
123 .is_some_and(|ext| ALLOWED_IMAGE_EXTENSIONS.contains(&ext))
124 })
125 .for_each(|entry| {
126 let path = entry.path().to_path_buf();
127 log::info!("Loading image from file: {}", path.display());
128 match read_image_from_path(&path) {
130 Ok(image) => {
131 let key = cache::CacheKey::ImagePath(path.clone());
132 let set_result = state.cache.set(key, image);
133 if let Err(err) = set_result {
134 log::error!("Failed to store image in cache: {err}");
135 }
136 }
137 Err(e) => {
138 log::error!(
139 "Failed to read image from path {}: {e}",
140 path.display(),
141 );
142 }
143 }
144 });
145 }
146 ImageSource::Path(path) => {
147 log::warn!("Unsupported image path: {}", path.display());
148 }
149 }
150 }
151 }
152
153 pub async fn start(&self, mut interrupt_rx: Receiver<Interrupted>) -> Result<()> {
159 let addr = self.config.socket_addr()?;
160 let listener = TcpListener::bind(addr).await?;
161 log::info!("Server running on http://{addr}");
162 log::debug!("Configuration: {:?}", self.config);
163
164 self.populate_cache().await;
166 if self.state.read().await.cache.size() == 0 {
167 log::error!("No images found in cache, please check your configuration");
168 return Err(anyhow!(
169 "No images found in cache, please check your configuration"
170 ));
171 }
172
173 let executor = auto::Builder::new(TokioExecutor::new());
174
175 loop {
176 let (stream, _) = tokio::select! {
177 stream = listener.accept() => stream?,
178 _ = interrupt_rx.recv() => {
179 log::info!("Received termination signal, shutting down server");
180 break Ok(());
181 }
182 };
183
184 let io = TokioIo::new(stream);
185
186 let state = self.state.clone();
188
189 let service = service_fn(|req| {
190 let value = state.clone();
191 async move { handle_request(req, value).await }
192 });
193
194 if let Err(e) = executor.serve_connection(io, service).await {
196 log::error!("Failed to serve connection: {e}");
197 }
198 }
199 }
200}
201
202impl Default for ImageServer {
203 fn default() -> Self {
204 Self::new()
205 }
206}
207
208pub fn read_image_from_path(path: &PathBuf) -> Result<cache::CacheValue> {
214 if !path.exists() || !path.is_file() {
215 return Err(anyhow!("Image file does not exist: {}", path.display()));
216 }
217 let Some(ext) = path.extension().and_then(|ext| ext.to_str()) else {
218 return Err(anyhow!("Image file has no extension: {}", path.display()));
219 };
220 if !ALLOWED_IMAGE_EXTENSIONS.contains(&ext) {
221 return Err(anyhow!(
222 "Unsupported image file extension: {}",
223 path.display()
224 ));
225 }
226
227 let image_data = fs::read(path).map_err(|e| anyhow!("Failed to read image file: {}", e))?;
228 let content_type = mime_guess::from_path(path)
229 .first()
230 .ok_or_else(|| {
231 anyhow!(
232 "Failed to determine content type for image file: {}",
233 path.display()
234 )
235 })?
236 .to_string();
237 Ok(cache::CacheValue {
238 data: image_data,
239 content_type,
240 })
241}
242
243pub async fn read_image_from_url(url: &Url) -> Result<cache::CacheValue> {
249 let response = reqwest::get(url.as_str())
250 .await
251 .map_err(|e| anyhow!("Failed to fetch image from URL: {}", e))?;
252
253 if !response.status().is_success() {
254 return Err(anyhow!(
255 "Failed to fetch image, status: {}",
256 response.status()
257 ));
258 }
259
260 let content_type = response
261 .headers()
262 .get("Content-Type")
263 .and_then(|v| v.to_str().ok())
264 .ok_or_else(|| anyhow!("Failed to get Content-Type header from response"))?
265 .to_string();
266
267 if !ALLOWED_IMAGE_EXTENSIONS.contains(&content_type.split('/').next_back().unwrap_or("")) {
268 return Err(anyhow!("Unsupported image content type: {}", content_type));
269 }
270
271 let data = response
272 .bytes()
273 .await
274 .map_err(|e| anyhow!("Failed to read image bytes from response: {}", e))?;
275
276 Ok(cache::CacheValue {
277 data: data.to_vec(),
278 content_type,
279 })
280}
281
282pub async fn handle_request(
288 req: Request<hyper::body::Incoming>,
289 state: Arc<RwLock<ServerState>>,
290) -> Result<Response<Full<Bytes>>, Infallible> {
291 match req.uri().path() {
292 "/" => Ok(Response::new(Full::new(Bytes::from(
293 "Welcome to the Random Image Server!",
294 )))),
295 "/health" => Ok(Response::new(Full::new(Bytes::from("OK")))),
296 "/random" => match handle_random_image(state).await {
297 Ok(response) => Ok(response),
298 Err(err) => {
299 log::error!("Failed to get random image: {err}");
300 let mut not_found = Response::new(Full::new(Bytes::from("Not Found")));
301 *not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
302 Ok(not_found)
303 }
304 },
305 "/sequential" => match handle_sequential_image(state).await {
306 Ok(response) => Ok(response),
307 Err(err) => {
308 log::error!("Failed to get sequential image: {err}");
309 let mut not_found = Response::new(Full::new(Bytes::from("Not Found")));
310 *not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
311 Ok(not_found)
312 }
313 },
314 _ => {
315 let mut not_found = Response::new(Full::new(Bytes::from("Not Found")));
316 *not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
317 Ok(not_found)
318 }
319 }
320}
321
322pub async fn handle_random_image(state: Arc<RwLock<ServerState>>) -> Result<Response<Full<Bytes>>> {
328 let state = state.read().await;
329
330 state.cache.get_random().map_or_else(
332 || {
333 Err(anyhow!(
334 "Failed to retrieve a random image, perhaps no images are configured"
335 ))
336 },
337 |image| {
338 let body = Full::new(Bytes::from(image.data));
339 let mut response = Response::new(body);
340 *response.status_mut() = hyper::StatusCode::OK;
341 response
342 .headers_mut()
343 .insert(hyper::header::CONTENT_TYPE, image.content_type.parse()?);
344 Ok(response)
345 },
346 )
347}
348
349pub async fn handle_sequential_image(
355 state: Arc<RwLock<ServerState>>,
356) -> Result<Response<Full<Bytes>>> {
357 let mut state = state.write().await;
358
359 if state.cache.is_empty() {
360 return Err(anyhow!("No image sources configured"));
361 }
362
363 let current_index = state.current_index % state.cache.size();
364 let source = state.cache.keys()[current_index].clone();
365 state.current_index = (current_index + 1) % state.cache.size();
366
367 if let Some(image) = state.cache.get(source.clone()) {
369 let body = Full::new(Bytes::from(image.data));
370 let mut response = Response::new(body);
371 *response.status_mut() = hyper::StatusCode::OK;
372 response
373 .headers_mut()
374 .insert(hyper::header::CONTENT_TYPE, image.content_type.parse()?);
375 Ok(response)
376 } else {
377 state.cache.remove(&source);
378 drop(state);
379 Err(anyhow!("Image not found in cache"))
380 }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use crate::termination::create_termination;
    use pretty_assertions::assert_eq;
    use rstest::rstest;

    // The allow-list contains exactly the five expected extensions.
    #[test]
    fn test_allowed_image_extensions() {
        for extension in ["jpg", "jpeg", "png", "webp", "gif"] {
            assert!(ALLOWED_IMAGE_EXTENSIONS.contains(&extension));
        }
        assert_eq!(ALLOWED_IMAGE_EXTENSIONS.len(), 5);
    }

    // The termination signal is sent before `start` is called, so the server
    // is expected to return `Ok` from its accept loop within the timeout.
    #[rstest]
    #[tokio::test]
    #[timeout(std::time::Duration::from_secs(2))]
    async fn test_start_stop_server() {
        let mut server = ImageServer::default();
        server.config.server.port = 0;
        server.config.server.sources = vec![ImageSource::Path(PathBuf::from("assets"))];

        let (mut terminator, interrupt_rx) = create_termination();
        terminator.terminate(Interrupted::UserInt).unwrap();
        server.start(interrupt_rx).await.unwrap();
    }
}