ferrite_cache/
manager.rs

1use std::{path::PathBuf, sync::Arc, thread};
2
3use crate::{
4    types::{CacheConfig, CacheHandle, CacheRequest, CacheState},
5    CacheError, CacheResult, ImageLoadError,
6};
7use image::{DynamicImage, GenericImageView};
8use tokio::{
9    runtime::Runtime,
10    sync::{mpsc, oneshot, RwLock},
11    time::Instant,
12};
13use tracing::{debug, info, instrument};
14pub struct CacheManager {
15    config: CacheConfig,
16    state: Arc<RwLock<CacheState>>,
17    runtime_handle: Arc<Runtime>,
18    _shutdown_tx: oneshot::Sender<()>,
19}
20
21impl CacheManager {
22    #[instrument(skip(config), fields(max_images = config.max_image_count))]
23    pub fn new(config: CacheConfig) -> CacheHandle {
24        let (request_tx, mut request_rx) = mpsc::unbounded_channel();
25        let (shutdown_tx, shutdown_rx) = oneshot::channel();
26
27        let state = Arc::new(RwLock::new(CacheState::new()));
28
29        thread::spawn(move || {
30            let runtime = Arc::new(
31                tokio::runtime::Builder::new_multi_thread()
32                    .worker_threads(config.thread_count)
33                    .enable_all()
34                    .build()
35                    .expect("Failed to create Tokio runtime"),
36            );
37
38            let manager = Arc::new(Self {
39                config,
40                state: state.clone(),
41                runtime_handle: runtime.clone(),
42                _shutdown_tx: shutdown_tx,
43            });
44
45            runtime.block_on(async {
46                let shutdown_future = shutdown_rx;
47                tokio::pin!(shutdown_future);
48
49                loop {
50                    tokio::select! {
51                        _ = &mut shutdown_future => {
52                            debug!("Received shutdown signal");
53                            break;
54                        }
55                        Some(request) = request_rx.recv() => {
56                            let manager = manager.clone();
57                            match request {
58                                CacheRequest::GetImage { path, response_tx } => {
59                                    runtime.spawn(async move {
60                                        let result = manager.get_image_internal(path).await;
61                                        let _ = response_tx.send(result);
62                                    });
63                                }
64                                CacheRequest::CacheImage { path, response_tx } => {
65        let manager = Arc::clone(&manager);
66        runtime.spawn(async move {
67            manager.handle_cache_request(path, response_tx).await;
68        });
69    }
70                            }
71                        }
72                        else => break,
73                    }
74                }
75                debug!("Cache manager event loop terminated");
76            });
77        });
78
79        CacheHandle::new(request_tx)
80    }
81
82    async fn handle_cache_request(
83        &self,
84        path: PathBuf,
85        response_tx: oneshot::Sender<CacheResult<()>>,
86    ) {
87        // Clone what we need before spawning
88        let state = Arc::clone(&self.state);
89        let config = self.config.clone();
90        let runtime = self.runtime();
91
92        runtime.spawn(async move {
93            // TODO: Unused
94            let _file_size = tokio::fs::metadata(&path).await.map_err(|e| {
95                CacheError::ImageLoad {
96                    path: path.clone(),
97                    source: ImageLoadError::Io(e),
98                }
99            })?;
100
101            // Respond immediately with acknowledgment
102            let _ = response_tx.send(Ok(()));
103
104            // Continue loading in background
105            let image_data = tokio::fs::read(&path).await.map_err(|e| {
106                CacheError::ImageLoad {
107                    path: path.clone(),
108                    source: ImageLoadError::Io(e),
109                }
110            })?;
111
112            let decoded_image =
113                image::load_from_memory(&image_data).map_err(|e| {
114                    CacheError::ImageLoad {
115                        path: path.clone(),
116                        source: ImageLoadError::Format(e.to_string()),
117                    }
118                })?;
119            // Update cache state
120            let mut state = state.write().await;
121
122            // Check if we need to evict images to make space
123            if state.entries.len() >= config.max_image_count {
124                if let Some(oldest_path) = state.lru_list.first().cloned() {
125                    info!(
126                        path = ?oldest_path,
127                        "Evicting least recently used image"
128                    );
129                    state.entries.remove(&oldest_path);
130                    state.lru_list.remove(0);
131                }
132            }
133
134            // Update LRU list
135            if let Some(pos) = state.lru_list.iter().position(|p| p == &path) {
136                state.lru_list.remove(pos);
137            }
138            state.lru_list.push(path.clone());
139
140            // Store the image data
141            state.entries.insert(path.clone(), decoded_image);
142
143            debug!(
144                path = ?path,
145                cache_size = state.entries.len(),
146                "Image cached successfully"
147            );
148
149            Ok::<(), CacheError>(())
150        });
151    }
152
153    async fn get_image_internal(
154        &self,
155        path: PathBuf,
156    ) -> CacheResult<Arc<DynamicImage>> {
157        let start_time = Instant::now();
158        debug!(path = ?path, "Image requested from cache");
159
160        // Track cache lookup time
161        let lookup_start = Instant::now();
162        if let Some(image) = self.lookup_image(&path).await {
163            let lookup_duration = lookup_start.elapsed();
164            let total_duration = start_time.elapsed();
165            debug!(
166                path = ?path,
167                lookup_time = ?lookup_duration,
168                total_time = ?total_duration,
169                "Cache hit"
170            );
171            return Ok(image);
172        }
173
174        debug!(path = ?path, "Cache miss, loading from disk");
175
176        // Track disk load time
177        let load_start = Instant::now();
178        let image = self.load_and_cache(path.clone()).await?;
179        let load_duration = load_start.elapsed();
180        let total_duration = start_time.elapsed();
181
182        debug!(
183            path = ?path,
184            load_time = ?load_duration,
185            total_time = ?total_duration,
186            "Cache miss handled"
187        );
188        Ok(image)
189    }
190
191    #[instrument(skip(self, path), fields(path = ?path))]
192    pub async fn cache_image(
193        &self,
194        path: PathBuf,
195    ) -> CacheResult<Arc<DynamicImage>> {
196        let file_size = tokio::fs::metadata(&path)
197            .await
198            .map_err(|e| CacheError::ImageLoad {
199                path: path.clone(),
200                source: ImageLoadError::Io(e),
201            })?
202            .len();
203
204        debug!(
205            path = ?path,
206            size = file_size,
207            "Loading image from filesystem"
208        );
209
210        // Read the file contents using tokio's async file IO
211        let image_data = tokio::fs::read(&path).await.map_err(|e| {
212            CacheError::ImageLoad {
213                path: path.clone(),
214                source: ImageLoadError::Io(e),
215            }
216        })?;
217
218        let image_data = image::load_from_memory(&image_data).unwrap();
219
220        let mut state = self.state.write().await;
221
222        if state.entries.len() >= self.config.max_image_count {
223            if let Some(oldest_path) = state.lru_list.first().cloned() {
224                info!(
225                    path = ?oldest_path,
226                    "Evicting least recently used image"
227                );
228                state.entries.remove(&oldest_path);
229                state.lru_list.remove(0);
230            }
231        }
232
233        // Update LRU list - remove if exists and add to end
234        if let Some(pos) = state.lru_list.iter().position(|p| p == &path) {
235            state.lru_list.remove(pos);
236        }
237        state.lru_list.push(path.clone());
238
239        // Store the image data
240        let image_data = Arc::new(image_data);
241        state
242            .entries
243            .insert(path.clone(), (*image_data).clone());
244
245        debug!(
246            path = ?path,
247            cache_size = state.entries.len(),
248            "Image cached successfully"
249        );
250
251        Ok(image_data)
252    }
253
254    pub fn runtime(&self) -> Arc<Runtime> {
255        self.runtime_handle.clone()
256    }
257
258    pub async fn get_image(
259        &self,
260        path: PathBuf,
261    ) -> CacheResult<Arc<DynamicImage>> {
262        let start_time = std::time::Instant::now();
263        debug!(path = ?path, "Image requested from cache");
264
265        if let Some(image) = self.lookup_image(&path).await {
266            let duration = start_time.elapsed();
267            debug!(path = ?path, duration = ?duration, "Cache hit");
268            return Ok(image);
269        }
270
271        debug!(path = ?path, "Cache miss, loading from disk");
272        let image = self.load_and_cache(path.clone()).await?;
273        let duration = start_time.elapsed();
274        debug!(path = ?path, duration = ?duration, "Total cache miss time");
275        Ok(image)
276    }
277
278    // Shit code
279    async fn lookup_image(&self, path: &PathBuf) -> Option<Arc<DynamicImage>> {
280        let mut state = self.state.write().await;
281
282        if let Some(image) = state.entries.get(path) {
283            debug!(path = ?path, "Found image in cache");
284            return Some(Arc::new(image.clone()));
285        }
286        self.update_lru(path, &mut state).await;
287
288        debug!(path = ?path, "Image not found in cache");
289        None
290    }
291
292    async fn load_and_cache(
293        &self,
294        path: PathBuf,
295    ) -> CacheResult<Arc<DynamicImage>> {
296        let load_start = Instant::now();
297
298        // Track file read time
299        let read_start = Instant::now();
300        let file_data = tokio::fs::read(&path).await.map_err(|e| {
301            CacheError::ImageLoad {
302                path: path.clone(),
303                source: ImageLoadError::Io(e),
304            }
305        })?;
306        let read_duration = read_start.elapsed();
307
308        // Track decode time
309        let decode_start = Instant::now();
310        let decoded_image =
311            image::load_from_memory(&file_data).map_err(|e| {
312                CacheError::ImageLoad {
313                    path: path.clone(),
314                    source: ImageLoadError::Format(e.to_string()),
315                }
316            })?;
317        let decode_duration = decode_start.elapsed();
318
319        let dimensions = decoded_image.dimensions();
320        let file_size = file_data.len();
321
322        // Update cache state
323        let cache_start = Instant::now();
324        let mut state = self.state.write().await;
325
326        // Handle eviction if needed
327        if state.entries.len() >= self.config.max_image_count {
328            debug!(
329                "Cache full ({}/{}), evicting oldest entry",
330                state.entries.len(),
331                self.config.max_image_count
332            );
333            if let Some(oldest_path) = state.lru_list.first().cloned() {
334                state.entries.remove(&oldest_path);
335                state.lru_list.remove(0);
336            }
337        }
338
339        let image_data = Arc::new(decoded_image);
340        state
341            .entries
342            .insert(path.clone(), (*image_data).clone());
343        state.lru_list.push(path.clone());
344
345        let cache_update_duration = cache_start.elapsed();
346        let total_duration = load_start.elapsed();
347
348        debug!(
349            path = ?path,
350            width = dimensions.0,
351            height = dimensions.1,
352            file_size = file_size,
353            read_time = ?read_duration,
354            decode_time = ?decode_duration,
355            cache_update_time = ?cache_update_duration,
356            total_time = ?total_duration,
357            "Image loaded and cached"
358        );
359
360        Ok(image_data)
361    }
362
363    async fn update_lru(&self, path: &PathBuf, state: &mut CacheState) {
364        if let Some(pos) = state.lru_list.iter().position(|p| p == path) {
365            state.lru_list.remove(pos);
366        }
367        state.lru_list.push(path.clone());
368        debug!(
369            path = ?path,
370            list_size = state.lru_list.len(),
371            "Updated LRU list"
372        );
373    }
374}
375
376impl Drop for CacheManager {
377    fn drop(&mut self) {
378        debug!("CacheManager being dropped, cleaning up resources");
379
380        // Clear cache entries
381        let state = self.state.try_write();
382        if let Ok(mut state) = state {
383            state.entries.clear();
384            state.lru_list.clear();
385            debug!("Cache entries cleared");
386        }
387    }
388}