1#[cfg(feature = "cache")]
4use dashmap::DashMap;
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::time::{Duration, Instant};
10
11#[cfg(feature = "async")]
12use tokio::sync::RwLock;
13
14use bytes::Bytes;
15
16use super::CacheConfig;
17use super::metadata::{
18 CacheEntry, CacheKey, CacheStats, DiskCacheMetadata, LevelStats, SpatialInfo, TileCoord,
19};
20use crate::error::{CacheError, CloudError, Result};
21
22#[cfg(feature = "cache")]
24pub struct SpatialCache {
25 storage: Arc<DashMap<CacheKey, CacheEntry>>,
27 spatial_index: Arc<DashMap<(i64, i64), Vec<CacheKey>>>,
29 cell_size: f64,
31 current_size: Arc<AtomicUsize>,
33 config: CacheConfig,
35 stats: CacheStats,
37}
38
39#[cfg(feature = "cache")]
40impl SpatialCache {
41 pub fn new(config: CacheConfig, cell_size: f64) -> Self {
43 Self {
44 storage: Arc::new(DashMap::new()),
45 spatial_index: Arc::new(DashMap::new()),
46 cell_size,
47 current_size: Arc::new(AtomicUsize::new(0)),
48 config,
49 stats: CacheStats::default(),
50 }
51 }
52
53 fn get_cell(&self, x: f64, y: f64) -> (i64, i64) {
55 let cell_x = (x / self.cell_size).floor() as i64;
56 let cell_y = (y / self.cell_size).floor() as i64;
57 (cell_x, cell_y)
58 }
59
60 pub async fn get_by_bounds(
62 &self,
63 bounds: (f64, f64, f64, f64),
64 ) -> Result<Vec<(CacheKey, Bytes)>> {
65 let (min_x, min_y, max_x, max_y) = bounds;
66 let query_bounds = SpatialInfo::new(bounds);
67
68 let min_cell = self.get_cell(min_x, min_y);
69 let max_cell = self.get_cell(max_x, max_y);
70
71 let mut results = Vec::new();
72
73 for cx in min_cell.0..=max_cell.0 {
74 for cy in min_cell.1..=max_cell.1 {
75 if let Some(keys) = self.spatial_index.get(&(cx, cy)) {
76 for key in keys.iter() {
77 if let Some(entry) = self.storage.get(key) {
78 if let Some(ref spatial) = entry.spatial_info {
79 if spatial.intersects(&query_bounds) && !entry.is_expired() {
80 results.push((key.clone(), entry.data.clone()));
81 }
82 }
83 }
84 }
85 }
86 }
87 }
88
89 self.stats
90 .hits
91 .fetch_add(results.len() as u64, Ordering::Relaxed);
92 Ok(results)
93 }
94
95 pub async fn put(
97 &self,
98 key: CacheKey,
99 data: Bytes,
100 spatial_info: SpatialInfo,
101 ttl: Option<Duration>,
102 ) -> Result<()> {
103 let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
104 let mut e = CacheEntry::with_ttl(data, false, ttl_duration);
105 e.spatial_info = Some(spatial_info.clone());
106 e
107 } else {
108 CacheEntry::with_spatial_info(data, false, spatial_info.clone())
109 };
110
111 let entry_size = entry.size;
112
113 while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
115 && !self.storage.is_empty()
116 {
117 self.evict_oldest().await;
118 }
119
120 let (min_x, min_y, max_x, max_y) = spatial_info.bounds;
122 let min_cell = self.get_cell(min_x, min_y);
123 let max_cell = self.get_cell(max_x, max_y);
124
125 for cx in min_cell.0..=max_cell.0 {
126 for cy in min_cell.1..=max_cell.1 {
127 self.spatial_index
128 .entry((cx, cy))
129 .or_default()
130 .push(key.clone());
131 }
132 }
133
134 if let Some(old) = self.storage.insert(key, entry) {
135 self.current_size.fetch_sub(old.size, Ordering::SeqCst);
136 }
137 self.current_size.fetch_add(entry_size, Ordering::SeqCst);
138 self.stats.writes.fetch_add(1, Ordering::Relaxed);
139
140 Ok(())
141 }
142
143 async fn evict_oldest(&self) {
145 let mut oldest_key: Option<String> = None;
146 let mut oldest_time = Instant::now();
147
148 for entry in self.storage.iter() {
149 if entry.created_at < oldest_time {
150 oldest_time = entry.created_at;
151 oldest_key = Some(entry.key().clone());
152 }
153 }
154
155 if let Some(key) = oldest_key {
156 if let Some((_, entry)) = self.storage.remove(&key) {
157 self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
158 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
159 }
160 }
161 }
162
163 pub async fn clear(&self) -> Result<()> {
165 self.storage.clear();
166 self.spatial_index.clear();
167 self.current_size.store(0, Ordering::SeqCst);
168 Ok(())
169 }
170}
171
172#[cfg(feature = "cache")]
174pub struct TileCache {
175 storage: Arc<DashMap<TileCoord, CacheEntry>>,
177 level_stats: Arc<DashMap<u8, LevelStats>>,
179 current_size: Arc<AtomicUsize>,
181 config: CacheConfig,
183 stats: CacheStats,
185}
186
187#[cfg(feature = "cache")]
188impl TileCache {
189 pub fn new(config: CacheConfig) -> Self {
191 Self {
192 storage: Arc::new(DashMap::new()),
193 level_stats: Arc::new(DashMap::new()),
194 current_size: Arc::new(AtomicUsize::new(0)),
195 config,
196 stats: CacheStats::default(),
197 }
198 }
199
200 pub async fn get(&self, coord: &TileCoord) -> Result<Bytes> {
202 if let Some(mut entry) = self.storage.get_mut(coord) {
203 if entry.is_expired() {
204 drop(entry);
205 self.remove(coord).await?;
206 return Err(CloudError::Cache(CacheError::Miss {
207 key: coord.to_cache_key("tile"),
208 }));
209 }
210
211 entry.record_access();
212
213 if let Some(level_stat) = self.level_stats.get(&coord.z) {
214 level_stat.hits.fetch_add(1, Ordering::Relaxed);
215 }
216
217 self.stats.hits.fetch_add(1, Ordering::Relaxed);
218 Ok(entry.data.clone())
219 } else {
220 self.stats.misses.fetch_add(1, Ordering::Relaxed);
221 Err(CloudError::Cache(CacheError::Miss {
222 key: coord.to_cache_key("tile"),
223 }))
224 }
225 }
226
227 pub async fn put(&self, coord: TileCoord, data: Bytes, ttl: Option<Duration>) -> Result<()> {
229 let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
230 CacheEntry::with_ttl(data, false, ttl_duration)
231 } else {
232 CacheEntry::new(data, false)
233 };
234
235 let entry_size = entry.size;
236
237 while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
239 && !self.storage.is_empty()
240 {
241 self.evict_tile().await;
242 }
243
244 let level_stat = self.level_stats.entry(coord.z).or_default();
246 level_stat.tile_count.fetch_add(1, Ordering::SeqCst);
247 level_stat
248 .total_size
249 .fetch_add(entry_size, Ordering::SeqCst);
250
251 if let Some(old) = self.storage.insert(coord, entry) {
252 self.current_size.fetch_sub(old.size, Ordering::SeqCst);
253 }
254 self.current_size.fetch_add(entry_size, Ordering::SeqCst);
255 self.stats.writes.fetch_add(1, Ordering::Relaxed);
256
257 Ok(())
258 }
259
260 async fn evict_tile(&self) {
262 let mut max_level = 0u8;
264 for entry in self.level_stats.iter() {
265 if *entry.key() > max_level && entry.tile_count.load(Ordering::SeqCst) > 0 {
266 max_level = *entry.key();
267 }
268 }
269
270 let mut key_to_remove: Option<TileCoord> = None;
272 for entry in self.storage.iter() {
273 if entry.key().z == max_level {
274 key_to_remove = Some(entry.key().clone());
275 break;
276 }
277 }
278
279 if let Some(coord) = key_to_remove {
280 if let Some((_, entry)) = self.storage.remove(&coord) {
281 self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
282
283 if let Some(level_stat) = self.level_stats.get(&coord.z) {
284 level_stat.tile_count.fetch_sub(1, Ordering::SeqCst);
285 level_stat
286 .total_size
287 .fetch_sub(entry.size, Ordering::SeqCst);
288 }
289
290 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
291 }
292 }
293 }
294
295 pub async fn remove(&self, coord: &TileCoord) -> Result<()> {
297 if let Some((_, entry)) = self.storage.remove(coord) {
298 self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
299
300 if let Some(level_stat) = self.level_stats.get(&coord.z) {
301 level_stat.tile_count.fetch_sub(1, Ordering::SeqCst);
302 level_stat
303 .total_size
304 .fetch_sub(entry.size, Ordering::SeqCst);
305 }
306 }
307 Ok(())
308 }
309
310 pub fn get_prefetch_targets(&self, coord: &TileCoord) -> Vec<TileCoord> {
312 let mut targets = Vec::new();
313
314 let x = coord.x;
316 let y = coord.y;
317 let z = coord.z;
318
319 let offsets: [(i32, i32); 8] = [
320 (-1, -1),
321 (0, -1),
322 (1, -1),
323 (-1, 0),
324 (1, 0),
325 (-1, 1),
326 (0, 1),
327 (1, 1),
328 ];
329
330 for (dx, dy) in offsets {
331 let nx = x as i64 + dx as i64;
332 let ny = y as i64 + dy as i64;
333 if nx >= 0 && ny >= 0 {
334 targets.push(TileCoord::new(z, nx as u32, ny as u32));
335 }
336 }
337
338 if let Some(parent) = coord.parent() {
340 targets.push(parent);
341 }
342
343 targets
344 }
345
346 pub async fn clear(&self) -> Result<()> {
348 self.storage.clear();
349 self.level_stats.clear();
350 self.current_size.store(0, Ordering::SeqCst);
351 Ok(())
352 }
353
354 #[must_use]
356 pub fn stats(&self) -> &CacheStats {
357 &self.stats
358 }
359}
360
361pub struct PersistentDiskCache {
363 cache_dir: PathBuf,
365 metadata: Arc<RwLock<HashMap<CacheKey, DiskCacheMetadata>>>,
367 current_size: Arc<AtomicUsize>,
369 config: CacheConfig,
371 stats: CacheStats,
373}
374
375impl PersistentDiskCache {
376 pub fn new(config: CacheConfig) -> Result<Self> {
378 let cache_dir = config.cache_dir.clone().ok_or_else(|| {
379 CloudError::Cache(CacheError::WriteError {
380 message: "Cache directory not configured".to_string(),
381 })
382 })?;
383
384 std::fs::create_dir_all(&cache_dir).map_err(|e| {
385 CloudError::Cache(CacheError::WriteError {
386 message: format!("Failed to create cache directory: {e}"),
387 })
388 })?;
389
390 let mut cache = Self {
391 cache_dir,
392 metadata: Arc::new(RwLock::new(HashMap::new())),
393 current_size: Arc::new(AtomicUsize::new(0)),
394 config,
395 stats: CacheStats::default(),
396 };
397
398 cache.load_metadata_blocking()?;
400
401 Ok(cache)
402 }
403
404 fn load_metadata_blocking(&mut self) -> Result<()> {
406 let metadata_path = self.cache_dir.join("metadata.json");
407 if metadata_path.exists() {
408 let content = std::fs::read_to_string(&metadata_path).map_err(|e| {
409 CloudError::Cache(CacheError::ReadError {
410 message: format!("Failed to read metadata: {e}"),
411 })
412 })?;
413
414 let metadata: HashMap<CacheKey, DiskCacheMetadata> = serde_json::from_str(&content)
415 .map_err(|e| {
416 CloudError::Cache(CacheError::ReadError {
417 message: format!("Failed to parse metadata: {e}"),
418 })
419 })?;
420
421 let total_size: usize = metadata.values().map(|m| m.size).sum();
422 self.current_size.store(total_size, Ordering::SeqCst);
423 *self.metadata.blocking_write() = metadata;
424 }
425 Ok(())
426 }
427
428 async fn save_metadata(&self) -> Result<()> {
430 let metadata_path = self.cache_dir.join("metadata.json");
431 let metadata = self.metadata.read().await;
432 let content = serde_json::to_string_pretty(&*metadata).map_err(|e| {
433 CloudError::Cache(CacheError::WriteError {
434 message: format!("Failed to serialize metadata: {e}"),
435 })
436 })?;
437
438 tokio::fs::write(&metadata_path, content)
439 .await
440 .map_err(|e| {
441 CloudError::Cache(CacheError::WriteError {
442 message: format!("Failed to write metadata: {e}"),
443 })
444 })?;
445
446 Ok(())
447 }
448
449 fn get_path(&self, key: &CacheKey) -> PathBuf {
451 use sha2::{Digest, Sha256};
452 let mut hasher = Sha256::new();
453 hasher.update(key.as_bytes());
454 let hash = hasher.finalize();
455 let filename = format!("{:x}", hash);
456 self.cache_dir.join(&filename[..2]).join(filename)
457 }
458
459 pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
461 let metadata = self.metadata.read().await;
462
463 if let Some(meta) = metadata.get(key) {
464 if let Some(expires_at_ms) = meta.expires_at_ms {
466 let now_ms = std::time::SystemTime::now()
467 .duration_since(std::time::UNIX_EPOCH)
468 .map(|d| d.as_millis() as u64)
469 .unwrap_or(0);
470
471 if now_ms >= expires_at_ms {
472 drop(metadata);
473 self.remove(key).await?;
474 return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
475 }
476 }
477
478 let path = self.get_path(key);
479 let data = tokio::fs::read(&path)
480 .await
481 .map_err(|_| CloudError::Cache(CacheError::Miss { key: key.clone() }))?;
482
483 self.stats.hits.fetch_add(1, Ordering::Relaxed);
484 Ok(Bytes::from(data))
485 } else {
486 self.stats.misses.fetch_add(1, Ordering::Relaxed);
487 Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
488 }
489 }
490
491 pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
493 let path = self.get_path(&key);
494
495 if let Some(parent) = path.parent() {
497 tokio::fs::create_dir_all(parent).await.map_err(|e| {
498 CloudError::Cache(CacheError::WriteError {
499 message: format!("Failed to create directory: {e}"),
500 })
501 })?;
502 }
503
504 let size = data.len();
505
506 while self.current_size.load(Ordering::SeqCst) + size > self.config.max_disk_size {
508 self.evict_oldest().await?;
509 }
510
511 tokio::fs::write(&path, &data).await.map_err(|e| {
513 CloudError::Cache(CacheError::WriteError {
514 message: format!("Failed to write file: {e}"),
515 })
516 })?;
517
518 let now_ms = std::time::SystemTime::now()
520 .duration_since(std::time::UNIX_EPOCH)
521 .map(|d| d.as_millis() as u64)
522 .unwrap_or(0);
523
524 let expires_at_ms = ttl
525 .or(self.config.default_ttl)
526 .map(|d| now_ms + d.as_millis() as u64);
527
528 let meta = DiskCacheMetadata {
529 path: path
530 .strip_prefix(&self.cache_dir)
531 .unwrap_or(&path)
532 .to_string_lossy()
533 .to_string(),
534 size,
535 created_at_ms: now_ms,
536 expires_at_ms,
537 access_count: 1,
538 compressed: false,
539 };
540
541 {
542 let mut metadata = self.metadata.write().await;
543 if let Some(old) = metadata.insert(key, meta) {
544 self.current_size.fetch_sub(old.size, Ordering::SeqCst);
545 }
546 }
547
548 self.current_size.fetch_add(size, Ordering::SeqCst);
549 self.stats.writes.fetch_add(1, Ordering::Relaxed);
550
551 if self.stats.writes.load(Ordering::Relaxed) % 10 == 0 {
553 self.save_metadata().await?;
554 }
555
556 Ok(())
557 }
558
559 async fn evict_oldest(&self) -> Result<()> {
561 let mut oldest_key: Option<String> = None;
562 let mut oldest_time = u64::MAX;
563
564 {
565 let metadata = self.metadata.read().await;
566 for (key, meta) in metadata.iter() {
567 if meta.created_at_ms < oldest_time {
568 oldest_time = meta.created_at_ms;
569 oldest_key = Some(key.clone());
570 }
571 }
572 }
573
574 if let Some(key) = oldest_key {
575 self.remove(&key).await?;
576 }
577
578 Ok(())
579 }
580
581 pub async fn remove(&self, key: &CacheKey) -> Result<()> {
583 let path = self.get_path(key);
584 tokio::fs::remove_file(&path).await.ok();
585
586 let mut metadata = self.metadata.write().await;
587 if let Some(meta) = metadata.remove(key) {
588 self.current_size.fetch_sub(meta.size, Ordering::SeqCst);
589 self.stats.evictions.fetch_add(1, Ordering::Relaxed);
590 }
591
592 Ok(())
593 }
594
595 pub async fn clear(&self) -> Result<()> {
597 let metadata = self.metadata.read().await;
598 for key in metadata.keys() {
599 let path = self.get_path(key);
600 tokio::fs::remove_file(&path).await.ok();
601 }
602 drop(metadata);
603
604 self.metadata.write().await.clear();
605 self.current_size.store(0, Ordering::SeqCst);
606
607 self.save_metadata().await?;
608
609 Ok(())
610 }
611
612 #[must_use]
614 pub fn stats(&self) -> &CacheStats {
615 &self.stats
616 }
617}