random_image_server/
lib.rs

1use std::{convert::Infallible, fs, path::PathBuf, sync::Arc};
2
3use anyhow::{Result, anyhow};
4use http_body_util::Full;
5use hyper::{Request, Response, body::Bytes, service::service_fn};
6use hyper_util::{
7    rt::{TokioExecutor, TokioIo},
8    server::conn::auto,
9};
10use tokio::{
11    net::TcpListener,
12    sync::{RwLock, broadcast::Receiver},
13};
14use url::Url;
15
16use crate::config::{Config, ImageSource};
17use crate::state::ServerState;
18use crate::termination::Interrupted;
19
20pub mod cache;
21pub mod config;
22mod logging;
23pub mod state;
24pub use logging::init_logging;
25pub mod env;
26pub mod termination;
27
/// Lower-case image identifiers the server accepts.
///
/// Within this file the list is matched against the extension of image files
/// loaded from disk and against the subtype of the `Content-Type` header of
/// images fetched from a URL.
pub const ALLOWED_IMAGE_EXTENSIONS: &[&str] = &["jpg", "jpeg", "png", "webp", "gif"];
29
30/// The main server structure
31pub struct ImageServer {
32    pub config: Config,
33    pub state: Arc<RwLock<ServerState>>,
34}
35
36impl ImageServer {
37    /// Create a new `ImageServer` instance with default configuration
38    #[must_use]
39    pub fn new() -> Self {
40        Self {
41            config: Config::default(),
42            state: Arc::new(RwLock::new(ServerState::default())),
43        }
44    }
45
46    /// Create a new `ImageServer` instance with custom configuration
47    #[must_use]
48    pub fn with_config(config: Config) -> Self {
49        Self {
50            state: Arc::new(RwLock::new(ServerState::with_config(&config))),
51            config,
52        }
53    }
54
55    /// Populate the cache with the configured images
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if the image file does not exist, is not a file, or has an unsupported extension.
60    pub async fn populate_cache(&self) {
61        // This method can be implemented to load images from configured sources
62        // and populate the cache. For now, it is a placeholder.
63        log::info!("Populating cache with configured images...");
64
65        for source in &self.config.server.sources {
66            match source {
67                ImageSource::Url(url) => {
68                    log::info!("Loading image from URL: {url}");
69                    let key = cache::CacheKey::ImageUrl(url.clone());
70                    // fetch the image from the URL and store it in the cache
71                    match read_image_from_url(url).await {
72                        Ok(image) => {
73                            let set_result = self.state.write().await.cache.set(key, image);
74                            if let Err(err) = set_result {
75                                log::error!("Failed to store image in cache: {err}");
76                            }
77                        }
78                        Err(e) => {
79                            log::error!("Failed to read image from URL {url}: {e}");
80                        }
81                    }
82                }
83                ImageSource::Path(path) if path.is_file() => {
84                    let path = path.canonicalize().unwrap_or_else(|_| {
85                        log::warn!("Failed to canonicalize path: {}", path.display());
86                        path.clone()
87                    });
88                    if path.extension().is_some_and(|ext| {
89                        ALLOWED_IMAGE_EXTENSIONS.contains(&ext.to_string_lossy().as_ref())
90                    }) {
91                        log::info!("Loading image from file path: {}", path.display());
92                        // read the image file from the path and store it in the cache
93                        let Ok(image) = read_image_from_path(&path) else {
94                            log::error!("Failed to read image file: {}", path.display());
95                            continue;
96                        };
97                        let key = cache::CacheKey::ImagePath(path.clone());
98                        let set_result = self.state.write().await.cache.set(key, image);
99                        if let Err(err) = set_result {
100                            log::error!("Failed to store image in cache: {err}");
101                        }
102                    } else {
103                        log::warn!("Unsupported image file extension: {}", path.display());
104                    }
105                }
106                ImageSource::Path(path) if path.is_dir() => {
107                    let path = path.canonicalize().unwrap_or_else(|_| {
108                        log::warn!("Failed to canonicalize path: {}", path.display());
109                        path.clone()
110                    });
111
112                    log::info!("Loading images from directory: {}", path.display());
113                    // Read all image files in the directory and store them in the cache
114                    let mut state = self.state.write().await;
115                    walkdir::WalkDir::new(&path)
116                        .into_iter()
117                        .filter_map(Result::ok)
118                        .filter(|e| e.file_type().is_file())
119                        .filter(|e| {
120                            e.path()
121                                .extension()
122                                .and_then(|ext| ext.to_str())
123                                .is_some_and(|ext| ALLOWED_IMAGE_EXTENSIONS.contains(&ext))
124                        })
125                        .for_each(|entry| {
126                            let path = entry.path().to_path_buf();
127                            log::info!("Loading image from file: {}", path.display());
128                            // read the image file and store it in the cache
129                            match read_image_from_path(&path) {
130                                Ok(image) => {
131                                    let key = cache::CacheKey::ImagePath(path.clone());
132                                    let set_result = state.cache.set(key, image);
133                                    if let Err(err) = set_result {
134                                        log::error!("Failed to store image in cache: {err}");
135                                    }
136                                }
137                                Err(e) => {
138                                    log::error!(
139                                        "Failed to read image from path {}: {e}",
140                                        path.display(),
141                                    );
142                                }
143                            }
144                        });
145                }
146                ImageSource::Path(path) => {
147                    log::warn!("Unsupported image path: {}", path.display());
148                }
149            }
150        }
151    }
152
153    /// Start the server
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the server fails to start or encounters an unexpected error.
158    pub async fn start(&self, mut interrupt_rx: Receiver<Interrupted>) -> Result<()> {
159        let addr = self.config.socket_addr()?;
160        let listener = TcpListener::bind(addr).await?;
161        log::info!("Server running on http://{addr}");
162        log::debug!("Configuration: {:?}", self.config);
163
164        // Populate the cache with images from configured sources
165        self.populate_cache().await;
166        if self.state.read().await.cache.size() == 0 {
167            log::error!("No images found in cache, please check your configuration");
168            return Err(anyhow!(
169                "No images found in cache, please check your configuration"
170            ));
171        }
172
173        let executor = auto::Builder::new(TokioExecutor::new());
174
175        loop {
176            let (stream, _) = tokio::select! {
177                stream = listener.accept() => stream?,
178                _ = interrupt_rx.recv() => {
179                    log::info!("Received termination signal, shutting down server");
180                    break Ok(());
181                }
182            };
183
184            let io = TokioIo::new(stream);
185
186            // Clone state for the handler
187            let state = self.state.clone();
188
189            let service = service_fn(|req| {
190                let value = state.clone();
191                async move { handle_request(req, value).await }
192            });
193
194            // Spawn a new task to handle the connection
195            if let Err(e) = executor.serve_connection(io, service).await {
196                log::error!("Failed to serve connection: {e}");
197            }
198        }
199    }
200}
201
202impl Default for ImageServer {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
208/// Read an image file from the given path and return it as a `CacheValue`
209///
210/// # Errors
211///
212/// Returns an error if the file does not exist, is not a file, or has an unsupported extension.
213pub fn read_image_from_path(path: &PathBuf) -> Result<cache::CacheValue> {
214    if !path.exists() || !path.is_file() {
215        return Err(anyhow!("Image file does not exist: {}", path.display()));
216    }
217    let Some(ext) = path.extension().and_then(|ext| ext.to_str()) else {
218        return Err(anyhow!("Image file has no extension: {}", path.display()));
219    };
220    if !ALLOWED_IMAGE_EXTENSIONS.contains(&ext) {
221        return Err(anyhow!(
222            "Unsupported image file extension: {}",
223            path.display()
224        ));
225    }
226
227    let image_data = fs::read(path).map_err(|e| anyhow!("Failed to read image file: {}", e))?;
228    let content_type = mime_guess::from_path(path)
229        .first()
230        .ok_or_else(|| {
231            anyhow!(
232                "Failed to determine content type for image file: {}",
233                path.display()
234            )
235        })?
236        .to_string();
237    Ok(cache::CacheValue {
238        data: image_data,
239        content_type,
240    })
241}
242
243/// Fetch an image from a URL and return it as a `CacheValue`
244///
245/// # Errors
246///
247/// Returns an error if the image cannot be fetched or if the content type is unsupported.
248pub async fn read_image_from_url(url: &Url) -> Result<cache::CacheValue> {
249    let response = reqwest::get(url.as_str())
250        .await
251        .map_err(|e| anyhow!("Failed to fetch image from URL: {}", e))?;
252
253    if !response.status().is_success() {
254        return Err(anyhow!(
255            "Failed to fetch image, status: {}",
256            response.status()
257        ));
258    }
259
260    let content_type = response
261        .headers()
262        .get("Content-Type")
263        .and_then(|v| v.to_str().ok())
264        .ok_or_else(|| anyhow!("Failed to get Content-Type header from response"))?
265        .to_string();
266
267    if !ALLOWED_IMAGE_EXTENSIONS.contains(&content_type.split('/').next_back().unwrap_or("")) {
268        return Err(anyhow!("Unsupported image content type: {}", content_type));
269    }
270
271    let data = response
272        .bytes()
273        .await
274        .map_err(|e| anyhow!("Failed to read image bytes from response: {}", e))?;
275
276    Ok(cache::CacheValue {
277        data: data.to_vec(),
278        content_type,
279    })
280}
281
282/// Handle incoming HTTP requests
283///
284/// # Errors
285///
286/// should be Infallible
287pub async fn handle_request(
288    req: Request<hyper::body::Incoming>,
289    state: Arc<RwLock<ServerState>>,
290) -> Result<Response<Full<Bytes>>, Infallible> {
291    match req.uri().path() {
292        "/" => Ok(Response::new(Full::new(Bytes::from(
293            "Welcome to the Random Image Server!",
294        )))),
295        "/health" => Ok(Response::new(Full::new(Bytes::from("OK")))),
296        "/random" => match handle_random_image(state).await {
297            Ok(response) => Ok(response),
298            Err(err) => {
299                log::error!("Failed to get random image: {err}");
300                let mut not_found = Response::new(Full::new(Bytes::from("Not Found")));
301                *not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
302                Ok(not_found)
303            }
304        },
305        "/sequential" => match handle_sequential_image(state).await {
306            Ok(response) => Ok(response),
307            Err(err) => {
308                log::error!("Failed to get sequential image: {err}");
309                let mut not_found = Response::new(Full::new(Bytes::from("Not Found")));
310                *not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
311                Ok(not_found)
312            }
313        },
314        _ => {
315            let mut not_found = Response::new(Full::new(Bytes::from("Not Found")));
316            *not_found.status_mut() = hyper::StatusCode::NOT_FOUND;
317            Ok(not_found)
318        }
319    }
320}
321
322/// Handle random image serving
323///
324/// # Errors
325///
326/// Returns an error if no images are configured or if the image cannot be found in the cache.
327pub async fn handle_random_image(state: Arc<RwLock<ServerState>>) -> Result<Response<Full<Bytes>>> {
328    let state = state.read().await;
329
330    // get a random image from the cache
331    state.cache.get_random().map_or_else(
332        || {
333            Err(anyhow!(
334                "Failed to retrieve a random image, perhaps no images are configured"
335            ))
336        },
337        |image| {
338            let body = Full::new(Bytes::from(image.data));
339            let mut response = Response::new(body);
340            *response.status_mut() = hyper::StatusCode::OK;
341            response
342                .headers_mut()
343                .insert(hyper::header::CONTENT_TYPE, image.content_type.parse()?);
344            Ok(response)
345        },
346    )
347}
348
349/// Handle sequential image serving
350///
351/// # Errors
352///
353/// Returns an error if no images are configured or if the image cannot be found in the cache.
354pub async fn handle_sequential_image(
355    state: Arc<RwLock<ServerState>>,
356) -> Result<Response<Full<Bytes>>> {
357    let mut state = state.write().await;
358
359    if state.cache.is_empty() {
360        return Err(anyhow!("No image sources configured"));
361    }
362
363    let current_index = state.current_index % state.cache.size();
364    let source = state.cache.keys()[current_index].clone();
365    state.current_index = (current_index + 1) % state.cache.size();
366
367    // Fetch the image from the cache or source
368    if let Some(image) = state.cache.get(source.clone()) {
369        let body = Full::new(Bytes::from(image.data));
370        let mut response = Response::new(body);
371        *response.status_mut() = hyper::StatusCode::OK;
372        response
373            .headers_mut()
374            .insert(hyper::header::CONTENT_TYPE, image.content_type.parse()?);
375        Ok(response)
376    } else {
377        state.cache.remove(&source);
378        drop(state);
379        Err(anyhow!("Image not found in cache"))
380    }
381}
382
#[cfg(test)]
mod tests {
    use super::*;
    use crate::termination::create_termination;
    use pretty_assertions::assert_eq;
    use rstest::rstest;

    /// The allow-list contains exactly the five expected extensions.
    #[test]
    fn test_allowed_image_extensions() {
        for expected in ["jpg", "jpeg", "png", "webp", "gif"] {
            assert!(ALLOWED_IMAGE_EXTENSIONS.contains(&expected));
        }
        assert_eq!(ALLOWED_IMAGE_EXTENSIONS.len(), 5);
    }

    /// A server whose termination signal is already pending starts and then
    /// shuts down cleanly within the timeout.
    ///
    /// Relies on an `assets` directory (relative to the working directory)
    /// that yields at least one cacheable image; otherwise `start` returns the
    /// empty-cache error and the final `unwrap` panics.
    #[rstest]
    #[tokio::test]
    #[timeout(std::time::Duration::from_secs(2))]
    async fn test_start_stop_server() {
        let mut server = ImageServer::default();
        // Port 0 lets the OS pick any free port.
        server.config.server.port = 0;
        server.config.server.sources = vec![ImageSource::Path(PathBuf::from("assets"))];

        // Queue the interrupt before starting so the accept loop exits right away.
        let (mut terminator, interrupt_rx) = create_termination();
        terminator.terminate(Interrupted::UserInt).unwrap();

        server.start(interrupt_rx).await.unwrap();
    }
}