1use std::{path::PathBuf, sync::Arc, thread};
2
3use crate::{
4 types::{CacheConfig, CacheHandle, CacheRequest, CacheState},
5 CacheError,
6 CacheResult,
7 ImageLoadError,
8};
9use image::{DynamicImage, GenericImageView};
10use tokio::{
11 runtime::Runtime,
12 sync::{mpsc, oneshot, RwLock},
13 time::Instant,
14};
15use tracing::{debug, info, instrument};
16pub struct CacheManager {
17 config: CacheConfig,
18 state: Arc<RwLock<CacheState>>,
19 runtime_handle: Arc<Runtime>,
20 _shutdown_tx: oneshot::Sender<()>,
21}
22
23impl CacheManager {
24 #[instrument(skip(config), fields(max_images = config.max_image_count))]
25 pub fn new(config: CacheConfig) -> CacheHandle {
26 let (request_tx, mut request_rx) = mpsc::unbounded_channel();
27 let (shutdown_tx, shutdown_rx) = oneshot::channel();
28
29 let state = Arc::new(RwLock::new(CacheState::new()));
30
31 thread::spawn(move || {
32 let runtime = Arc::new(
33 tokio::runtime::Builder::new_multi_thread()
34 .worker_threads(config.thread_count)
35 .enable_all()
36 .build()
37 .expect("Failed to create Tokio runtime"),
38 );
39
40 let manager = Arc::new(Self {
41 config,
42 state: state.clone(),
43 runtime_handle: runtime.clone(),
44 _shutdown_tx: shutdown_tx,
45 });
46
47 runtime.block_on(async {
48 let shutdown_future = shutdown_rx;
49 tokio::pin!(shutdown_future);
50
51 loop {
52 tokio::select! {
53 _ = &mut shutdown_future => {
54 debug!("Received shutdown signal");
55 break;
56 }
57 Some(request) = request_rx.recv() => {
58 let manager = manager.clone();
59 match request {
60 CacheRequest::GetImage { path, response_tx } => {
61 runtime.spawn(async move {
62 let result = manager.get_image_internal(path).await;
63 let _ = response_tx.send(result);
64 });
65 }
66 CacheRequest::CacheImage { path, response_tx } => {
67 let manager = Arc::clone(&manager);
68 runtime.spawn(async move {
69 manager.handle_cache_request(path, response_tx).await;
70 });
71 }
72 }
73 }
74 else => break,
75 }
76 }
77 debug!("Cache manager event loop terminated");
78 });
79 });
80
81 CacheHandle::new(request_tx)
82 }
83
84 async fn handle_cache_request(
85 &self,
86 path: PathBuf,
87 response_tx: oneshot::Sender<CacheResult<()>>,
88 ) {
89 let state = Arc::clone(&self.state);
91 let config = self.config.clone();
92 let runtime = self.runtime();
93
94 runtime.spawn(async move {
95 let file_size = tokio::fs::metadata(&path).await.map_err(|e| {
96 CacheError::ImageLoad {
97 path: path.clone(),
98 source: ImageLoadError::Io(e),
99 }
100 })?;
101
102 let _ = response_tx.send(Ok(()));
104
105 let image_data = tokio::fs::read(&path).await.map_err(|e| {
107 CacheError::ImageLoad {
108 path: path.clone(),
109 source: ImageLoadError::Io(e),
110 }
111 })?;
112
113 let decoded_image =
114 image::load_from_memory(&image_data).map_err(|e| {
115 CacheError::ImageLoad {
116 path: path.clone(),
117 source: ImageLoadError::Format(e.to_string()),
118 }
119 })?;
120 let mut state = state.write().await;
122
123 if state.entries.len() >= config.max_image_count {
125 if let Some(oldest_path) = state.lru_list.first().cloned() {
126 info!(
127 path = ?oldest_path,
128 "Evicting least recently used image"
129 );
130 state.entries.remove(&oldest_path);
131 state.lru_list.remove(0);
132 }
133 }
134
135 if let Some(pos) = state.lru_list.iter().position(|p| p == &path) {
137 state.lru_list.remove(pos);
138 }
139 state.lru_list.push(path.clone());
140
141 state.entries.insert(path.clone(), decoded_image);
143
144 debug!(
145 path = ?path,
146 cache_size = state.entries.len(),
147 "Image cached successfully"
148 );
149
150 Ok::<(), CacheError>(())
151 });
152 }
153
154 async fn get_image_internal(
155 &self,
156 path: PathBuf,
157 ) -> CacheResult<Arc<DynamicImage>> {
158 let start_time = Instant::now();
159 debug!(path = ?path, "Image requested from cache");
160
161 let lookup_start = Instant::now();
163 if let Some(image) = self.lookup_image(&path).await {
164 let lookup_duration = lookup_start.elapsed();
165 let total_duration = start_time.elapsed();
166 debug!(
167 path = ?path,
168 lookup_time = ?lookup_duration,
169 total_time = ?total_duration,
170 "Cache hit"
171 );
172 return Ok(image);
173 }
174
175 debug!(path = ?path, "Cache miss, loading from disk");
176
177 let load_start = Instant::now();
179 let image = self.load_and_cache(path.clone()).await?;
180 let load_duration = load_start.elapsed();
181 let total_duration = start_time.elapsed();
182
183 debug!(
184 path = ?path,
185 load_time = ?load_duration,
186 total_time = ?total_duration,
187 "Cache miss handled"
188 );
189 Ok(image)
190 }
191
192 #[instrument(skip(self, path), fields(path = ?path))]
193 pub async fn cache_image(
194 &self,
195 path: PathBuf,
196 ) -> CacheResult<Arc<DynamicImage>> {
197 let file_size = tokio::fs::metadata(&path)
198 .await
199 .map_err(|e| CacheError::ImageLoad {
200 path: path.clone(),
201 source: ImageLoadError::Io(e),
202 })?
203 .len();
204
205 debug!(
206 path = ?path,
207 size = file_size,
208 "Loading image from filesystem"
209 );
210
211 let image_data = tokio::fs::read(&path).await.map_err(|e| {
213 CacheError::ImageLoad {
214 path: path.clone(),
215 source: ImageLoadError::Io(e),
216 }
217 })?;
218
219 let image_data = image::load_from_memory(&image_data).unwrap();
220
221 let mut state = self.state.write().await;
222
223 if state.entries.len() >= self.config.max_image_count {
224 if let Some(oldest_path) = state.lru_list.first().cloned() {
225 info!(
226 path = ?oldest_path,
227 "Evicting least recently used image"
228 );
229 state.entries.remove(&oldest_path);
230 state.lru_list.remove(0);
231 }
232 }
233
234 if let Some(pos) = state.lru_list.iter().position(|p| p == &path) {
236 state.lru_list.remove(pos);
237 }
238 state.lru_list.push(path.clone());
239
240 let image_data = Arc::new(image_data);
242 state
243 .entries
244 .insert(path.clone(), (*image_data).clone());
245
246 debug!(
247 path = ?path,
248 cache_size = state.entries.len(),
249 "Image cached successfully"
250 );
251
252 Ok(image_data)
253 }
254
255 pub fn runtime(&self) -> Arc<Runtime> {
256 self.runtime_handle.clone()
257 }
258
259 pub async fn get_image(
260 &self,
261 path: PathBuf,
262 ) -> CacheResult<Arc<DynamicImage>> {
263 let start_time = std::time::Instant::now();
264 debug!(path = ?path, "Image requested from cache");
265
266 if let Some(image) = self.lookup_image(&path).await {
267 let duration = start_time.elapsed();
268 debug!(path = ?path, duration = ?duration, "Cache hit");
269 return Ok(image);
270 }
271
272 debug!(path = ?path, "Cache miss, loading from disk");
273 let image = self.load_and_cache(path.clone()).await?;
274 let duration = start_time.elapsed();
275 debug!(path = ?path, duration = ?duration, "Total cache miss time");
276 Ok(image)
277 }
278
279 async fn lookup_image(&self, path: &PathBuf) -> Option<Arc<DynamicImage>> {
281 let mut state = self.state.write().await;
282
283 if let Some(image) = state.entries.get(path) {
284 debug!(path = ?path, "Found image in cache");
285 return Some(Arc::new(image.clone()));
286 }
287 self.update_lru(path, &mut state).await;
288
289 debug!(path = ?path, "Image not found in cache");
290 None
291 }
292
293 async fn load_and_cache(
294 &self,
295 path: PathBuf,
296 ) -> CacheResult<Arc<DynamicImage>> {
297 let load_start = Instant::now();
298
299 let read_start = Instant::now();
301 let file_data = tokio::fs::read(&path).await.map_err(|e| {
302 CacheError::ImageLoad {
303 path: path.clone(),
304 source: ImageLoadError::Io(e),
305 }
306 })?;
307 let read_duration = read_start.elapsed();
308
309 let decode_start = Instant::now();
311 let decoded_image =
312 image::load_from_memory(&file_data).map_err(|e| {
313 CacheError::ImageLoad {
314 path: path.clone(),
315 source: ImageLoadError::Format(e.to_string()),
316 }
317 })?;
318 let decode_duration = decode_start.elapsed();
319
320 let dimensions = decoded_image.dimensions();
321 let file_size = file_data.len();
322
323 let cache_start = Instant::now();
325 let mut state = self.state.write().await;
326
327 if state.entries.len() >= self.config.max_image_count {
329 debug!(
330 "Cache full ({}/{}), evicting oldest entry",
331 state.entries.len(),
332 self.config.max_image_count
333 );
334 if let Some(oldest_path) = state.lru_list.first().cloned() {
335 state.entries.remove(&oldest_path);
336 state.lru_list.remove(0);
337 }
338 }
339
340 let image_data = Arc::new(decoded_image);
341 state
342 .entries
343 .insert(path.clone(), (*image_data).clone());
344 state.lru_list.push(path.clone());
345
346 let cache_update_duration = cache_start.elapsed();
347 let total_duration = load_start.elapsed();
348
349 debug!(
350 path = ?path,
351 width = dimensions.0,
352 height = dimensions.1,
353 file_size = file_size,
354 read_time = ?read_duration,
355 decode_time = ?decode_duration,
356 cache_update_time = ?cache_update_duration,
357 total_time = ?total_duration,
358 "Image loaded and cached"
359 );
360
361 Ok(image_data)
362 }
363
364 async fn update_lru(&self, path: &PathBuf, state: &mut CacheState) {
365 if let Some(pos) = state.lru_list.iter().position(|p| p == path) {
366 state.lru_list.remove(pos);
367 }
368 state.lru_list.push(path.clone());
369 debug!(
370 path = ?path,
371 list_size = state.lru_list.len(),
372 "Updated LRU list"
373 );
374 }
375}
376
377impl Drop for CacheManager {
378 fn drop(&mut self) {
379 debug!("CacheManager being dropped, cleaning up resources");
380
381 let state = self.state.try_write();
383 if let Ok(mut state) = state {
384 state.entries.clear();
385 state.lru_list.clear();
386 debug!("Cache entries cleared");
387 }
388 }
389}