1use std::{path::PathBuf, sync::Arc, thread};
2
3use crate::{
4 types::{CacheConfig, CacheHandle, CacheRequest, CacheState},
5 CacheError, CacheResult, ImageLoadError,
6};
7use image::{DynamicImage, GenericImageView};
8use tokio::{
9 runtime::Runtime,
10 sync::{mpsc, oneshot, RwLock},
11 time::Instant,
12};
13use tracing::{debug, info, instrument};
14pub struct CacheManager {
15 config: CacheConfig,
16 state: Arc<RwLock<CacheState>>,
17 runtime_handle: Arc<Runtime>,
18 _shutdown_tx: oneshot::Sender<()>,
19}
20
21impl CacheManager {
22 #[instrument(skip(config), fields(max_images = config.max_image_count))]
23 pub fn new(config: CacheConfig) -> CacheHandle {
24 let (request_tx, mut request_rx) = mpsc::unbounded_channel();
25 let (shutdown_tx, shutdown_rx) = oneshot::channel();
26
27 let state = Arc::new(RwLock::new(CacheState::new()));
28
29 thread::spawn(move || {
30 let runtime = Arc::new(
31 tokio::runtime::Builder::new_multi_thread()
32 .worker_threads(config.thread_count)
33 .enable_all()
34 .build()
35 .expect("Failed to create Tokio runtime"),
36 );
37
38 let manager = Arc::new(Self {
39 config,
40 state: state.clone(),
41 runtime_handle: runtime.clone(),
42 _shutdown_tx: shutdown_tx,
43 });
44
45 runtime.block_on(async {
46 let shutdown_future = shutdown_rx;
47 tokio::pin!(shutdown_future);
48
49 loop {
50 tokio::select! {
51 _ = &mut shutdown_future => {
52 debug!("Received shutdown signal");
53 break;
54 }
55 Some(request) = request_rx.recv() => {
56 let manager = manager.clone();
57 match request {
58 CacheRequest::GetImage { path, response_tx } => {
59 runtime.spawn(async move {
60 let result = manager.get_image_internal(path).await;
61 let _ = response_tx.send(result);
62 });
63 }
64 CacheRequest::CacheImage { path, response_tx } => {
65 let manager = Arc::clone(&manager);
66 runtime.spawn(async move {
67 manager.handle_cache_request(path, response_tx).await;
68 });
69 }
70 }
71 }
72 else => break,
73 }
74 }
75 debug!("Cache manager event loop terminated");
76 });
77 });
78
79 CacheHandle::new(request_tx)
80 }
81
82 async fn handle_cache_request(
83 &self,
84 path: PathBuf,
85 response_tx: oneshot::Sender<CacheResult<()>>,
86 ) {
87 let state = Arc::clone(&self.state);
89 let config = self.config.clone();
90 let runtime = self.runtime();
91
92 runtime.spawn(async move {
93 let _file_size = tokio::fs::metadata(&path).await.map_err(|e| {
95 CacheError::ImageLoad {
96 path: path.clone(),
97 source: ImageLoadError::Io(e),
98 }
99 })?;
100
101 let _ = response_tx.send(Ok(()));
103
104 let image_data = tokio::fs::read(&path).await.map_err(|e| {
106 CacheError::ImageLoad {
107 path: path.clone(),
108 source: ImageLoadError::Io(e),
109 }
110 })?;
111
112 let decoded_image =
113 image::load_from_memory(&image_data).map_err(|e| {
114 CacheError::ImageLoad {
115 path: path.clone(),
116 source: ImageLoadError::Format(e.to_string()),
117 }
118 })?;
119 let mut state = state.write().await;
121
122 if state.entries.len() >= config.max_image_count {
124 if let Some(oldest_path) = state.lru_list.first().cloned() {
125 info!(
126 path = ?oldest_path,
127 "Evicting least recently used image"
128 );
129 state.entries.remove(&oldest_path);
130 state.lru_list.remove(0);
131 }
132 }
133
134 if let Some(pos) = state.lru_list.iter().position(|p| p == &path) {
136 state.lru_list.remove(pos);
137 }
138 state.lru_list.push(path.clone());
139
140 state.entries.insert(path.clone(), decoded_image);
142
143 debug!(
144 path = ?path,
145 cache_size = state.entries.len(),
146 "Image cached successfully"
147 );
148
149 Ok::<(), CacheError>(())
150 });
151 }
152
153 async fn get_image_internal(
154 &self,
155 path: PathBuf,
156 ) -> CacheResult<Arc<DynamicImage>> {
157 let start_time = Instant::now();
158 debug!(path = ?path, "Image requested from cache");
159
160 let lookup_start = Instant::now();
162 if let Some(image) = self.lookup_image(&path).await {
163 let lookup_duration = lookup_start.elapsed();
164 let total_duration = start_time.elapsed();
165 debug!(
166 path = ?path,
167 lookup_time = ?lookup_duration,
168 total_time = ?total_duration,
169 "Cache hit"
170 );
171 return Ok(image);
172 }
173
174 debug!(path = ?path, "Cache miss, loading from disk");
175
176 let load_start = Instant::now();
178 let image = self.load_and_cache(path.clone()).await?;
179 let load_duration = load_start.elapsed();
180 let total_duration = start_time.elapsed();
181
182 debug!(
183 path = ?path,
184 load_time = ?load_duration,
185 total_time = ?total_duration,
186 "Cache miss handled"
187 );
188 Ok(image)
189 }
190
191 #[instrument(skip(self, path), fields(path = ?path))]
192 pub async fn cache_image(
193 &self,
194 path: PathBuf,
195 ) -> CacheResult<Arc<DynamicImage>> {
196 let file_size = tokio::fs::metadata(&path)
197 .await
198 .map_err(|e| CacheError::ImageLoad {
199 path: path.clone(),
200 source: ImageLoadError::Io(e),
201 })?
202 .len();
203
204 debug!(
205 path = ?path,
206 size = file_size,
207 "Loading image from filesystem"
208 );
209
210 let image_data = tokio::fs::read(&path).await.map_err(|e| {
212 CacheError::ImageLoad {
213 path: path.clone(),
214 source: ImageLoadError::Io(e),
215 }
216 })?;
217
218 let image_data = image::load_from_memory(&image_data).unwrap();
219
220 let mut state = self.state.write().await;
221
222 if state.entries.len() >= self.config.max_image_count {
223 if let Some(oldest_path) = state.lru_list.first().cloned() {
224 info!(
225 path = ?oldest_path,
226 "Evicting least recently used image"
227 );
228 state.entries.remove(&oldest_path);
229 state.lru_list.remove(0);
230 }
231 }
232
233 if let Some(pos) = state.lru_list.iter().position(|p| p == &path) {
235 state.lru_list.remove(pos);
236 }
237 state.lru_list.push(path.clone());
238
239 let image_data = Arc::new(image_data);
241 state
242 .entries
243 .insert(path.clone(), (*image_data).clone());
244
245 debug!(
246 path = ?path,
247 cache_size = state.entries.len(),
248 "Image cached successfully"
249 );
250
251 Ok(image_data)
252 }
253
254 pub fn runtime(&self) -> Arc<Runtime> {
255 self.runtime_handle.clone()
256 }
257
258 pub async fn get_image(
259 &self,
260 path: PathBuf,
261 ) -> CacheResult<Arc<DynamicImage>> {
262 let start_time = std::time::Instant::now();
263 debug!(path = ?path, "Image requested from cache");
264
265 if let Some(image) = self.lookup_image(&path).await {
266 let duration = start_time.elapsed();
267 debug!(path = ?path, duration = ?duration, "Cache hit");
268 return Ok(image);
269 }
270
271 debug!(path = ?path, "Cache miss, loading from disk");
272 let image = self.load_and_cache(path.clone()).await?;
273 let duration = start_time.elapsed();
274 debug!(path = ?path, duration = ?duration, "Total cache miss time");
275 Ok(image)
276 }
277
278 async fn lookup_image(&self, path: &PathBuf) -> Option<Arc<DynamicImage>> {
280 let mut state = self.state.write().await;
281
282 if let Some(image) = state.entries.get(path) {
283 debug!(path = ?path, "Found image in cache");
284 return Some(Arc::new(image.clone()));
285 }
286 self.update_lru(path, &mut state).await;
287
288 debug!(path = ?path, "Image not found in cache");
289 None
290 }
291
292 async fn load_and_cache(
293 &self,
294 path: PathBuf,
295 ) -> CacheResult<Arc<DynamicImage>> {
296 let load_start = Instant::now();
297
298 let read_start = Instant::now();
300 let file_data = tokio::fs::read(&path).await.map_err(|e| {
301 CacheError::ImageLoad {
302 path: path.clone(),
303 source: ImageLoadError::Io(e),
304 }
305 })?;
306 let read_duration = read_start.elapsed();
307
308 let decode_start = Instant::now();
310 let decoded_image =
311 image::load_from_memory(&file_data).map_err(|e| {
312 CacheError::ImageLoad {
313 path: path.clone(),
314 source: ImageLoadError::Format(e.to_string()),
315 }
316 })?;
317 let decode_duration = decode_start.elapsed();
318
319 let dimensions = decoded_image.dimensions();
320 let file_size = file_data.len();
321
322 let cache_start = Instant::now();
324 let mut state = self.state.write().await;
325
326 if state.entries.len() >= self.config.max_image_count {
328 debug!(
329 "Cache full ({}/{}), evicting oldest entry",
330 state.entries.len(),
331 self.config.max_image_count
332 );
333 if let Some(oldest_path) = state.lru_list.first().cloned() {
334 state.entries.remove(&oldest_path);
335 state.lru_list.remove(0);
336 }
337 }
338
339 let image_data = Arc::new(decoded_image);
340 state
341 .entries
342 .insert(path.clone(), (*image_data).clone());
343 state.lru_list.push(path.clone());
344
345 let cache_update_duration = cache_start.elapsed();
346 let total_duration = load_start.elapsed();
347
348 debug!(
349 path = ?path,
350 width = dimensions.0,
351 height = dimensions.1,
352 file_size = file_size,
353 read_time = ?read_duration,
354 decode_time = ?decode_duration,
355 cache_update_time = ?cache_update_duration,
356 total_time = ?total_duration,
357 "Image loaded and cached"
358 );
359
360 Ok(image_data)
361 }
362
363 async fn update_lru(&self, path: &PathBuf, state: &mut CacheState) {
364 if let Some(pos) = state.lru_list.iter().position(|p| p == path) {
365 state.lru_list.remove(pos);
366 }
367 state.lru_list.push(path.clone());
368 debug!(
369 path = ?path,
370 list_size = state.lru_list.len(),
371 "Updated LRU list"
372 );
373 }
374}
375
376impl Drop for CacheManager {
377 fn drop(&mut self) {
378 debug!("CacheManager being dropped, cleaning up resources");
379
380 let state = self.state.try_write();
382 if let Ok(mut state) = state {
383 state.entries.clear();
384 state.lru_list.clear();
385 debug!("Cache entries cleared");
386 }
387 }
388}