ferrite_cache/
manager.rs

1use std::{path::PathBuf, sync::Arc, thread};
2
3use crate::{
4    types::{CacheConfig, CacheHandle, CacheRequest, CacheState},
5    CacheError,
6    CacheResult,
7    ImageLoadError,
8};
9use image::{DynamicImage, GenericImageView};
10use tokio::{
11    runtime::Runtime,
12    sync::{mpsc, oneshot, RwLock},
13    time::Instant,
14};
15use tracing::{debug, info, instrument};
16pub struct CacheManager {
17    config:         CacheConfig,
18    state:          Arc<RwLock<CacheState>>,
19    runtime_handle: Arc<Runtime>,
20    _shutdown_tx:   oneshot::Sender<()>,
21}
22
23impl CacheManager {
24    #[instrument(skip(config), fields(max_images = config.max_image_count))]
25    pub fn new(config: CacheConfig) -> CacheHandle {
26        let (request_tx, mut request_rx) = mpsc::unbounded_channel();
27        let (shutdown_tx, shutdown_rx) = oneshot::channel();
28
29        let state = Arc::new(RwLock::new(CacheState::new()));
30
31        thread::spawn(move || {
32            let runtime = Arc::new(
33                tokio::runtime::Builder::new_multi_thread()
34                    .worker_threads(config.thread_count)
35                    .enable_all()
36                    .build()
37                    .expect("Failed to create Tokio runtime"),
38            );
39
40            let manager = Arc::new(Self {
41                config,
42                state: state.clone(),
43                runtime_handle: runtime.clone(),
44                _shutdown_tx: shutdown_tx,
45            });
46
47            runtime.block_on(async {
48                let shutdown_future = shutdown_rx;
49                tokio::pin!(shutdown_future);
50
51                loop {
52                    tokio::select! {
53                        _ = &mut shutdown_future => {
54                            debug!("Received shutdown signal");
55                            break;
56                        }
57                        Some(request) = request_rx.recv() => {
58                            let manager = manager.clone();
59                            match request {
60                                CacheRequest::GetImage { path, response_tx } => {
61                                    runtime.spawn(async move {
62                                        let result = manager.get_image_internal(path).await;
63                                        let _ = response_tx.send(result);
64                                    });
65                                }
66                                CacheRequest::CacheImage { path, response_tx } => {
67        let manager = Arc::clone(&manager);
68        runtime.spawn(async move {
69            manager.handle_cache_request(path, response_tx).await;
70        });
71    }
72                            }
73                        }
74                        else => break,
75                    }
76                }
77                debug!("Cache manager event loop terminated");
78            });
79        });
80
81        CacheHandle::new(request_tx)
82    }
83
84    async fn handle_cache_request(
85        &self,
86        path: PathBuf,
87        response_tx: oneshot::Sender<CacheResult<()>>,
88    ) {
89        // Clone what we need before spawning
90        let state = Arc::clone(&self.state);
91        let config = self.config.clone();
92        let runtime = self.runtime();
93
94        runtime.spawn(async move {
95            let file_size = tokio::fs::metadata(&path).await.map_err(|e| {
96                CacheError::ImageLoad {
97                    path:   path.clone(),
98                    source: ImageLoadError::Io(e),
99                }
100            })?;
101
102            // Respond immediately with acknowledgment
103            let _ = response_tx.send(Ok(()));
104
105            // Continue loading in background
106            let image_data = tokio::fs::read(&path).await.map_err(|e| {
107                CacheError::ImageLoad {
108                    path:   path.clone(),
109                    source: ImageLoadError::Io(e),
110                }
111            })?;
112
113            let decoded_image =
114                image::load_from_memory(&image_data).map_err(|e| {
115                    CacheError::ImageLoad {
116                        path:   path.clone(),
117                        source: ImageLoadError::Format(e.to_string()),
118                    }
119                })?;
120            // Update cache state
121            let mut state = state.write().await;
122
123            // Check if we need to evict images to make space
124            if state.entries.len() >= config.max_image_count {
125                if let Some(oldest_path) = state.lru_list.first().cloned() {
126                    info!(
127                        path = ?oldest_path,
128                        "Evicting least recently used image"
129                    );
130                    state.entries.remove(&oldest_path);
131                    state.lru_list.remove(0);
132                }
133            }
134
135            // Update LRU list
136            if let Some(pos) = state.lru_list.iter().position(|p| p == &path) {
137                state.lru_list.remove(pos);
138            }
139            state.lru_list.push(path.clone());
140
141            // Store the image data
142            state.entries.insert(path.clone(), decoded_image);
143
144            debug!(
145                path = ?path,
146                cache_size = state.entries.len(),
147                "Image cached successfully"
148            );
149
150            Ok::<(), CacheError>(())
151        });
152    }
153
154    async fn get_image_internal(
155        &self,
156        path: PathBuf,
157    ) -> CacheResult<Arc<DynamicImage>> {
158        let start_time = Instant::now();
159        debug!(path = ?path, "Image requested from cache");
160
161        // Track cache lookup time
162        let lookup_start = Instant::now();
163        if let Some(image) = self.lookup_image(&path).await {
164            let lookup_duration = lookup_start.elapsed();
165            let total_duration = start_time.elapsed();
166            debug!(
167                path = ?path,
168                lookup_time = ?lookup_duration,
169                total_time = ?total_duration,
170                "Cache hit"
171            );
172            return Ok(image);
173        }
174
175        debug!(path = ?path, "Cache miss, loading from disk");
176
177        // Track disk load time
178        let load_start = Instant::now();
179        let image = self.load_and_cache(path.clone()).await?;
180        let load_duration = load_start.elapsed();
181        let total_duration = start_time.elapsed();
182
183        debug!(
184            path = ?path,
185            load_time = ?load_duration,
186            total_time = ?total_duration,
187            "Cache miss handled"
188        );
189        Ok(image)
190    }
191
192    #[instrument(skip(self, path), fields(path = ?path))]
193    pub async fn cache_image(
194        &self,
195        path: PathBuf,
196    ) -> CacheResult<Arc<DynamicImage>> {
197        let file_size = tokio::fs::metadata(&path)
198            .await
199            .map_err(|e| CacheError::ImageLoad {
200                path:   path.clone(),
201                source: ImageLoadError::Io(e),
202            })?
203            .len();
204
205        debug!(
206            path = ?path,
207            size = file_size,
208            "Loading image from filesystem"
209        );
210
211        // Read the file contents using tokio's async file IO
212        let image_data = tokio::fs::read(&path).await.map_err(|e| {
213            CacheError::ImageLoad {
214                path:   path.clone(),
215                source: ImageLoadError::Io(e),
216            }
217        })?;
218
219        let image_data = image::load_from_memory(&image_data).unwrap();
220
221        let mut state = self.state.write().await;
222
223        if state.entries.len() >= self.config.max_image_count {
224            if let Some(oldest_path) = state.lru_list.first().cloned() {
225                info!(
226                    path = ?oldest_path,
227                    "Evicting least recently used image"
228                );
229                state.entries.remove(&oldest_path);
230                state.lru_list.remove(0);
231            }
232        }
233
234        // Update LRU list - remove if exists and add to end
235        if let Some(pos) = state.lru_list.iter().position(|p| p == &path) {
236            state.lru_list.remove(pos);
237        }
238        state.lru_list.push(path.clone());
239
240        // Store the image data
241        let image_data = Arc::new(image_data);
242        state
243            .entries
244            .insert(path.clone(), (*image_data).clone());
245
246        debug!(
247            path = ?path,
248            cache_size = state.entries.len(),
249            "Image cached successfully"
250        );
251
252        Ok(image_data)
253    }
254
255    pub fn runtime(&self) -> Arc<Runtime> {
256        self.runtime_handle.clone()
257    }
258
259    pub async fn get_image(
260        &self,
261        path: PathBuf,
262    ) -> CacheResult<Arc<DynamicImage>> {
263        let start_time = std::time::Instant::now();
264        debug!(path = ?path, "Image requested from cache");
265
266        if let Some(image) = self.lookup_image(&path).await {
267            let duration = start_time.elapsed();
268            debug!(path = ?path, duration = ?duration, "Cache hit");
269            return Ok(image);
270        }
271
272        debug!(path = ?path, "Cache miss, loading from disk");
273        let image = self.load_and_cache(path.clone()).await?;
274        let duration = start_time.elapsed();
275        debug!(path = ?path, duration = ?duration, "Total cache miss time");
276        Ok(image)
277    }
278
279    // Shit code
280    async fn lookup_image(&self, path: &PathBuf) -> Option<Arc<DynamicImage>> {
281        let mut state = self.state.write().await;
282
283        if let Some(image) = state.entries.get(path) {
284            debug!(path = ?path, "Found image in cache");
285            return Some(Arc::new(image.clone()));
286        }
287        self.update_lru(path, &mut state).await;
288
289        debug!(path = ?path, "Image not found in cache");
290        None
291    }
292
293    async fn load_and_cache(
294        &self,
295        path: PathBuf,
296    ) -> CacheResult<Arc<DynamicImage>> {
297        let load_start = Instant::now();
298
299        // Track file read time
300        let read_start = Instant::now();
301        let file_data = tokio::fs::read(&path).await.map_err(|e| {
302            CacheError::ImageLoad {
303                path:   path.clone(),
304                source: ImageLoadError::Io(e),
305            }
306        })?;
307        let read_duration = read_start.elapsed();
308
309        // Track decode time
310        let decode_start = Instant::now();
311        let decoded_image =
312            image::load_from_memory(&file_data).map_err(|e| {
313                CacheError::ImageLoad {
314                    path:   path.clone(),
315                    source: ImageLoadError::Format(e.to_string()),
316                }
317            })?;
318        let decode_duration = decode_start.elapsed();
319
320        let dimensions = decoded_image.dimensions();
321        let file_size = file_data.len();
322
323        // Update cache state
324        let cache_start = Instant::now();
325        let mut state = self.state.write().await;
326
327        // Handle eviction if needed
328        if state.entries.len() >= self.config.max_image_count {
329            debug!(
330                "Cache full ({}/{}), evicting oldest entry",
331                state.entries.len(),
332                self.config.max_image_count
333            );
334            if let Some(oldest_path) = state.lru_list.first().cloned() {
335                state.entries.remove(&oldest_path);
336                state.lru_list.remove(0);
337            }
338        }
339
340        let image_data = Arc::new(decoded_image);
341        state
342            .entries
343            .insert(path.clone(), (*image_data).clone());
344        state.lru_list.push(path.clone());
345
346        let cache_update_duration = cache_start.elapsed();
347        let total_duration = load_start.elapsed();
348
349        debug!(
350            path = ?path,
351            width = dimensions.0,
352            height = dimensions.1,
353            file_size = file_size,
354            read_time = ?read_duration,
355            decode_time = ?decode_duration,
356            cache_update_time = ?cache_update_duration,
357            total_time = ?total_duration,
358            "Image loaded and cached"
359        );
360
361        Ok(image_data)
362    }
363
364    async fn update_lru(&self, path: &PathBuf, state: &mut CacheState) {
365        if let Some(pos) = state.lru_list.iter().position(|p| p == path) {
366            state.lru_list.remove(pos);
367        }
368        state.lru_list.push(path.clone());
369        debug!(
370            path = ?path,
371            list_size = state.lru_list.len(),
372            "Updated LRU list"
373        );
374    }
375}
376
377impl Drop for CacheManager {
378    fn drop(&mut self) {
379        debug!("CacheManager being dropped, cleaning up resources");
380
381        // Clear cache entries
382        let state = self.state.try_write();
383        if let Ok(mut state) = state {
384            state.entries.clear();
385            state.lru_list.clear();
386            debug!("Cache entries cleared");
387        }
388    }
389}