oxigdal_cloud/cache/backends.rs

//! Specialized cache backends (Spatial, Tile, Persistent Disk)

#[cfg(feature = "cache")]
use dashmap::DashMap;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

#[cfg(feature = "async")]
use tokio::sync::RwLock;

use bytes::Bytes;

use super::CacheConfig;
use super::metadata::{
    CacheEntry, CacheKey, CacheStats, DiskCacheMetadata, LevelStats, SpatialInfo, TileCoord,
};
use crate::error::{CacheError, CloudError, Result};

/// Spatial-aware cache for geospatial data
#[cfg(feature = "cache")]
pub struct SpatialCache {
    /// Main storage
    storage: Arc<DashMap<CacheKey, CacheEntry>>,
    /// Spatial index: maps grid cells to keys
    spatial_index: Arc<DashMap<(i64, i64), Vec<CacheKey>>>,
    /// Grid cell size
    cell_size: f64,
    /// Current size
    current_size: Arc<AtomicUsize>,
    /// Configuration
    config: CacheConfig,
    /// Statistics
    stats: CacheStats,
}

#[cfg(feature = "cache")]
impl SpatialCache {
    /// Creates a new spatial cache
    pub fn new(config: CacheConfig, cell_size: f64) -> Self {
        Self {
            storage: Arc::new(DashMap::new()),
            spatial_index: Arc::new(DashMap::new()),
            cell_size,
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Gets cell coordinates for a point
    fn get_cell(&self, x: f64, y: f64) -> (i64, i64) {
        let cell_x = (x / self.cell_size).floor() as i64;
        let cell_y = (y / self.cell_size).floor() as i64;
        (cell_x, cell_y)
    }
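
    // Worked example of the grid mapping, for cell_size = 10.0:
    //   get_cell(12.3, 47.0)  == (1, 4)
    //   get_cell(-0.1, -10.0) == (-1, -1)   (floor() keeps negatives in the right cell)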

    /// Gets entries intersecting a bounding box
    pub async fn get_by_bounds(
        &self,
        bounds: (f64, f64, f64, f64),
    ) -> Result<Vec<(CacheKey, Bytes)>> {
        let (min_x, min_y, max_x, max_y) = bounds;
        let query_bounds = SpatialInfo::new(bounds);

        let min_cell = self.get_cell(min_x, min_y);
        let max_cell = self.get_cell(max_x, max_y);

        let mut results = Vec::new();
        // An entry spanning several grid cells is indexed in each of them;
        // track seen keys so each entry is returned at most once.
        let mut seen = std::collections::HashSet::new();

        for cx in min_cell.0..=max_cell.0 {
            for cy in min_cell.1..=max_cell.1 {
                if let Some(keys) = self.spatial_index.get(&(cx, cy)) {
                    for key in keys.iter() {
                        if !seen.insert(key.clone()) {
                            continue;
                        }
                        if let Some(entry) = self.storage.get(key) {
                            if let Some(ref spatial) = entry.spatial_info {
                                if spatial.intersects(&query_bounds) && !entry.is_expired() {
                                    results.push((key.clone(), entry.data.clone()));
                                }
                            }
                        }
                    }
                }
            }
        }

        self.stats
            .hits
            .fetch_add(results.len() as u64, Ordering::Relaxed);
        Ok(results)
    }

    /// Puts an entry with spatial info
    pub async fn put(
        &self,
        key: CacheKey,
        data: Bytes,
        spatial_info: SpatialInfo,
        ttl: Option<Duration>,
    ) -> Result<()> {
        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            let mut e = CacheEntry::with_ttl(data, false, ttl_duration);
            e.spatial_info = Some(spatial_info.clone());
            e
        } else {
            CacheEntry::with_spatial_info(data, false, spatial_info.clone())
        };

        let entry_size = entry.size;

        // Evict if necessary
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
            && !self.storage.is_empty()
        {
            self.evict_oldest().await;
        }

        // Index in spatial grid
        let (min_x, min_y, max_x, max_y) = spatial_info.bounds;
        let min_cell = self.get_cell(min_x, min_y);
        let max_cell = self.get_cell(max_x, max_y);

        for cx in min_cell.0..=max_cell.0 {
            for cy in min_cell.1..=max_cell.1 {
                let mut keys = self.spatial_index.entry((cx, cy)).or_default();
                // Don't index the same key twice when an existing entry is overwritten.
                if !keys.contains(&key) {
                    keys.push(key.clone());
                }
            }
        }

        if let Some(old) = self.storage.insert(key, entry) {
            self.current_size.fetch_sub(old.size, Ordering::SeqCst);
        }
        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Evicts the oldest entry.
    ///
    /// The evicted key is deliberately left in `spatial_index`: `get_by_bounds`
    /// validates every indexed key against `storage`, so stale references are
    /// skipped (the index is only compacted by `clear`).
    async fn evict_oldest(&self) {
        let mut oldest_key: Option<CacheKey> = None;
        let mut oldest_time = Instant::now();

        for entry in self.storage.iter() {
            if entry.created_at < oldest_time {
                oldest_time = entry.created_at;
                oldest_key = Some(entry.key().clone());
            }
        }

        if let Some(key) = oldest_key {
            if let Some((_, entry)) = self.storage.remove(&key) {
                self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Clears the cache
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.spatial_index.clear();
        self.current_size.store(0, Ordering::SeqCst);
        Ok(())
    }
}
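
// A minimal usage sketch (test-only, not part of the public API). It assumes
// `CacheConfig` implements `Default` and that tokio's "macros" feature is
// enabled; the key literal relies on `CacheKey` being a `String` alias, as the
// eviction code above does.
#[cfg(all(test, feature = "cache"))]
mod spatial_cache_example {
    use super::*;

    #[tokio::test]
    async fn put_then_query_by_bounds() {
        // 1-degree grid cells.
        let cache = SpatialCache::new(CacheConfig::default(), 1.0);

        // Store an entry covering lon 10..11, lat 50..51, with no per-entry TTL.
        let key: CacheKey = "scene/42".to_string();
        cache
            .put(
                key.clone(),
                Bytes::from_static(b"raster bytes"),
                SpatialInfo::new((10.0, 50.0, 11.0, 51.0)),
                None,
            )
            .await
            .unwrap();

        // Any query box intersecting the stored bounds returns the entry once.
        let hits = cache.get_by_bounds((10.5, 50.5, 10.6, 50.6)).await.unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].0, key);
    }
}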

/// Tile-based cache for COG and tile pyramids
#[cfg(feature = "cache")]
pub struct TileCache {
    /// Main storage
    storage: Arc<DashMap<TileCoord, CacheEntry>>,
    /// Level statistics
    level_stats: Arc<DashMap<u8, LevelStats>>,
    /// Current size
    current_size: Arc<AtomicUsize>,
    /// Configuration
    config: CacheConfig,
    /// Statistics
    stats: CacheStats,
}

#[cfg(feature = "cache")]
impl TileCache {
    /// Creates a new tile cache
    pub fn new(config: CacheConfig) -> Self {
        Self {
            storage: Arc::new(DashMap::new()),
            level_stats: Arc::new(DashMap::new()),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        }
    }

    /// Gets a tile
    pub async fn get(&self, coord: &TileCoord) -> Result<Bytes> {
        if let Some(mut entry) = self.storage.get_mut(coord) {
            if entry.is_expired() {
                drop(entry);
                self.remove(coord).await?;
                return Err(CloudError::Cache(CacheError::Miss {
                    key: coord.to_cache_key("tile"),
                }));
            }

            entry.record_access();

            if let Some(level_stat) = self.level_stats.get(&coord.z) {
                level_stat.hits.fetch_add(1, Ordering::Relaxed);
            }

            self.stats.hits.fetch_add(1, Ordering::Relaxed);
            Ok(entry.data.clone())
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            Err(CloudError::Cache(CacheError::Miss {
                key: coord.to_cache_key("tile"),
            }))
        }
    }

    /// Puts a tile
    pub async fn put(&self, coord: TileCoord, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let entry = if let Some(ttl_duration) = ttl.or(self.config.default_ttl) {
            CacheEntry::with_ttl(data, false, ttl_duration)
        } else {
            CacheEntry::new(data, false)
        };

        let entry_size = entry.size;

        // Evict if necessary
        while self.current_size.load(Ordering::SeqCst) + entry_size > self.config.max_memory_size
            && !self.storage.is_empty()
        {
            self.evict_tile().await;
        }

        // Update level stats; drop the guard before touching the map again.
        let z = coord.z;
        let level_stat = self.level_stats.entry(z).or_default();
        level_stat.tile_count.fetch_add(1, Ordering::SeqCst);
        level_stat
            .total_size
            .fetch_add(entry_size, Ordering::SeqCst);
        drop(level_stat);

        if let Some(old) = self.storage.insert(coord, entry) {
            self.current_size.fetch_sub(old.size, Ordering::SeqCst);
            // Replacing an existing tile: undo the double count at this level.
            if let Some(level_stat) = self.level_stats.get(&z) {
                level_stat.tile_count.fetch_sub(1, Ordering::SeqCst);
                level_stat
                    .total_size
                    .fetch_sub(old.size, Ordering::SeqCst);
            }
        }
        self.current_size.fetch_add(entry_size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        Ok(())
    }

    /// Evicts a tile, preferring the highest populated zoom level
    /// (more tiles, and they are the cheapest to refetch)
    async fn evict_tile(&self) {
        let mut max_level = 0u8;
        for entry in self.level_stats.iter() {
            if *entry.key() > max_level && entry.tile_count.load(Ordering::SeqCst) > 0 {
                max_level = *entry.key();
            }
        }

        // Find an entry at this level; fall back to any entry if the level
        // stats are stale, so the eviction loop in `put` always makes progress.
        let mut key_to_remove: Option<TileCoord> = None;
        for entry in self.storage.iter() {
            if entry.key().z == max_level {
                key_to_remove = Some(entry.key().clone());
                break;
            }
        }
        if key_to_remove.is_none() {
            key_to_remove = self.storage.iter().next().map(|e| e.key().clone());
        }

        if let Some(coord) = key_to_remove {
            if let Some((_, entry)) = self.storage.remove(&coord) {
                self.current_size.fetch_sub(entry.size, Ordering::SeqCst);

                if let Some(level_stat) = self.level_stats.get(&coord.z) {
                    level_stat.tile_count.fetch_sub(1, Ordering::SeqCst);
                    level_stat
                        .total_size
                        .fetch_sub(entry.size, Ordering::SeqCst);
                }

                self.stats.evictions.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Removes a tile
    pub async fn remove(&self, coord: &TileCoord) -> Result<()> {
        if let Some((_, entry)) = self.storage.remove(coord) {
            self.current_size.fetch_sub(entry.size, Ordering::SeqCst);

            if let Some(level_stat) = self.level_stats.get(&coord.z) {
                level_stat.tile_count.fetch_sub(1, Ordering::SeqCst);
                level_stat
                    .total_size
                    .fetch_sub(entry.size, Ordering::SeqCst);
            }
        }
        Ok(())
    }

    /// Computes prefetch targets: the eight neighbours at the same level,
    /// plus the parent tile (this does not fetch anything itself)
    pub fn get_prefetch_targets(&self, coord: &TileCoord) -> Vec<TileCoord> {
        let mut targets = Vec::new();

        // Adjacent tiles at same level
        let x = coord.x;
        let y = coord.y;
        let z = coord.z;

        let offsets: [(i32, i32); 8] = [
            (-1, -1),
            (0, -1),
            (1, -1),
            (-1, 0),
            (1, 0),
            (-1, 1),
            (0, 1),
            (1, 1),
        ];

        for (dx, dy) in offsets {
            let nx = x as i64 + dx as i64;
            let ny = y as i64 + dy as i64;
            if nx >= 0 && ny >= 0 {
                targets.push(TileCoord::new(z, nx as u32, ny as u32));
            }
        }

        // Parent tile
        if let Some(parent) = coord.parent() {
            targets.push(parent);
        }

        targets
    }

    /// Clears the cache
    pub async fn clear(&self) -> Result<()> {
        self.storage.clear();
        self.level_stats.clear();
        self.current_size.store(0, Ordering::SeqCst);
        Ok(())
    }

    /// Returns cache statistics
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}
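
// Sketch of typical use (test-only). Assumes `CacheConfig: Default` and
// tokio's "macros" feature; `TileCoord: Clone` matches its use in
// `evict_tile` above.
#[cfg(all(test, feature = "cache"))]
mod tile_cache_example {
    use super::*;

    #[tokio::test]
    async fn put_get_and_prefetch() {
        let cache = TileCache::new(CacheConfig::default());
        let coord = TileCoord::new(3, 2, 5);

        cache
            .put(coord.clone(), Bytes::from_static(b"tile"), None)
            .await
            .unwrap();
        assert_eq!(cache.get(&coord).await.unwrap(), Bytes::from_static(b"tile"));

        // Eight in-range neighbours plus the parent tile (z = 3 has one).
        let targets = cache.get_prefetch_targets(&coord);
        assert_eq!(targets.len(), 9);
    }
}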

/// Persistent disk cache with metadata
#[cfg(feature = "async")]
pub struct PersistentDiskCache {
    /// Cache directory
    cache_dir: PathBuf,
    /// Metadata storage
    metadata: Arc<RwLock<HashMap<CacheKey, DiskCacheMetadata>>>,
    /// Current size
    current_size: Arc<AtomicUsize>,
    /// Configuration
    config: CacheConfig,
    /// Statistics
    stats: CacheStats,
}

#[cfg(feature = "async")]
impl PersistentDiskCache {
    /// Creates a new persistent disk cache.
    ///
    /// Call this outside an async runtime: when prior metadata exists on
    /// disk, loading it uses `RwLock::blocking_write`, which panics inside
    /// a Tokio context.
    pub fn new(config: CacheConfig) -> Result<Self> {
        let cache_dir = config.cache_dir.clone().ok_or_else(|| {
            CloudError::Cache(CacheError::WriteError {
                message: "Cache directory not configured".to_string(),
            })
        })?;

        std::fs::create_dir_all(&cache_dir).map_err(|e| {
            CloudError::Cache(CacheError::WriteError {
                message: format!("Failed to create cache directory: {e}"),
            })
        })?;

        let mut cache = Self {
            cache_dir,
            metadata: Arc::new(RwLock::new(HashMap::new())),
            current_size: Arc::new(AtomicUsize::new(0)),
            config,
            stats: CacheStats::default(),
        };

        // Load existing metadata
        cache.load_metadata_blocking()?;

        Ok(cache)
    }

    /// Loads metadata from disk
    fn load_metadata_blocking(&mut self) -> Result<()> {
        let metadata_path = self.cache_dir.join("metadata.json");
        if metadata_path.exists() {
            let content = std::fs::read_to_string(&metadata_path).map_err(|e| {
                CloudError::Cache(CacheError::ReadError {
                    message: format!("Failed to read metadata: {e}"),
                })
            })?;

            let metadata: HashMap<CacheKey, DiskCacheMetadata> = serde_json::from_str(&content)
                .map_err(|e| {
                    CloudError::Cache(CacheError::ReadError {
                        message: format!("Failed to parse metadata: {e}"),
                    })
                })?;

            let total_size: usize = metadata.values().map(|m| m.size).sum();
            self.current_size.store(total_size, Ordering::SeqCst);
            *self.metadata.blocking_write() = metadata;
        }
        Ok(())
    }
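
    // Example metadata.json shape (assuming DiskCacheMetadata keeps serde's
    // default field names):
    //   {
    //     "tiles/3/2/5": {
    //       "path": "ab/ab34…",
    //       "size": 1024,
    //       "created_at_ms": 1700000000000,
    //       "expires_at_ms": null,
    //       "access_count": 1,
    //       "compressed": false
    //     }
    //   }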

    /// Saves metadata to disk
    async fn save_metadata(&self) -> Result<()> {
        let metadata_path = self.cache_dir.join("metadata.json");
        let metadata = self.metadata.read().await;
        let content = serde_json::to_string_pretty(&*metadata).map_err(|e| {
            CloudError::Cache(CacheError::WriteError {
                message: format!("Failed to serialize metadata: {e}"),
            })
        })?;

        tokio::fs::write(&metadata_path, content)
            .await
            .map_err(|e| {
                CloudError::Cache(CacheError::WriteError {
                    message: format!("Failed to write metadata: {e}"),
                })
            })?;

        Ok(())
    }

    /// Gets the file path for a cache key
    fn get_path(&self, key: &CacheKey) -> PathBuf {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(key.as_bytes());
        let hash = hasher.finalize();
        let filename = format!("{:x}", hash);
        self.cache_dir.join(&filename[..2]).join(filename)
    }
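
    // Layout example: a key hashing to "ab34…" is stored at
    // `<cache_dir>/ab/ab34…`, fanning files out over 256 subdirectories so no
    // single directory grows too large.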

    /// Gets an entry from disk
    pub async fn get(&self, key: &CacheKey) -> Result<Bytes> {
        let metadata = self.metadata.read().await;

        if let Some(meta) = metadata.get(key) {
            // Check expiration
            if let Some(expires_at_ms) = meta.expires_at_ms {
                let now_ms = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .map(|d| d.as_millis() as u64)
                    .unwrap_or(0);

                if now_ms >= expires_at_ms {
                    drop(metadata);
                    self.remove(key).await?;
                    return Err(CloudError::Cache(CacheError::Miss { key: key.clone() }));
                }
            }

            let path = self.get_path(key);
            let data = tokio::fs::read(&path)
                .await
                .map_err(|_| CloudError::Cache(CacheError::Miss { key: key.clone() }))?;

            self.stats.hits.fetch_add(1, Ordering::Relaxed);
            Ok(Bytes::from(data))
        } else {
            self.stats.misses.fetch_add(1, Ordering::Relaxed);
            Err(CloudError::Cache(CacheError::Miss { key: key.clone() }))
        }
    }

    /// Puts an entry to disk
    pub async fn put(&self, key: CacheKey, data: Bytes, ttl: Option<Duration>) -> Result<()> {
        let path = self.get_path(&key);

        // Create parent directory
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await.map_err(|e| {
                CloudError::Cache(CacheError::WriteError {
                    message: format!("Failed to create directory: {e}"),
                })
            })?;
        }

        let size = data.len();

        // Evict if necessary; stop once the cache is empty so a single
        // oversized entry cannot spin this loop forever.
        while self.current_size.load(Ordering::SeqCst) + size > self.config.max_disk_size {
            if self.metadata.read().await.is_empty() {
                break;
            }
            self.evict_oldest().await?;
        }

        // Write data
        tokio::fs::write(&path, &data).await.map_err(|e| {
            CloudError::Cache(CacheError::WriteError {
                message: format!("Failed to write file: {e}"),
            })
        })?;

        // Update metadata
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        let expires_at_ms = ttl
            .or(self.config.default_ttl)
            .map(|d| now_ms + d.as_millis() as u64);
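        // e.g. a 60 s TTL written at now_ms = 1_700_000_000_000 expires at
        // 1_700_000_060_000 (60_000 ms later).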

        let meta = DiskCacheMetadata {
            path: path
                .strip_prefix(&self.cache_dir)
                .unwrap_or(&path)
                .to_string_lossy()
                .to_string(),
            size,
            created_at_ms: now_ms,
            expires_at_ms,
            access_count: 1,
            compressed: false,
        };

        {
            let mut metadata = self.metadata.write().await;
            if let Some(old) = metadata.insert(key, meta) {
                self.current_size.fetch_sub(old.size, Ordering::SeqCst);
            }
        }

        self.current_size.fetch_add(size, Ordering::SeqCst);
        self.stats.writes.fetch_add(1, Ordering::Relaxed);

        // Persist metadata every 10th write to bound loss on crash
        if self.stats.writes.load(Ordering::Relaxed) % 10 == 0 {
            self.save_metadata().await?;
        }

        Ok(())
    }

    /// Evicts the oldest entry
    async fn evict_oldest(&self) -> Result<()> {
        let mut oldest_key: Option<CacheKey> = None;
        let mut oldest_time = u64::MAX;

        {
            let metadata = self.metadata.read().await;
            for (key, meta) in metadata.iter() {
                if meta.created_at_ms < oldest_time {
                    oldest_time = meta.created_at_ms;
                    oldest_key = Some(key.clone());
                }
            }
        }

        if let Some(key) = oldest_key {
            self.remove(&key).await?;
        }

        Ok(())
    }

    /// Removes an entry
    pub async fn remove(&self, key: &CacheKey) -> Result<()> {
        let path = self.get_path(key);
        tokio::fs::remove_file(&path).await.ok();

        let mut metadata = self.metadata.write().await;
        if let Some(meta) = metadata.remove(key) {
            self.current_size.fetch_sub(meta.size, Ordering::SeqCst);
            self.stats.evictions.fetch_add(1, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Clears the cache
    pub async fn clear(&self) -> Result<()> {
        let metadata = self.metadata.read().await;
        for key in metadata.keys() {
            let path = self.get_path(key);
            tokio::fs::remove_file(&path).await.ok();
        }
        drop(metadata);

        self.metadata.write().await.clear();
        self.current_size.store(0, Ordering::SeqCst);

        self.save_metadata().await?;

        Ok(())
    }

    /// Returns cache statistics
    #[must_use]
    pub fn stats(&self) -> &CacheStats {
        &self.stats
    }
}
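
// Sketch of a disk-cache round trip (test-only). Assumes `CacheConfig`
// implements `Default`; `new` is called before entering the runtime because
// it may use `blocking_write` (see the doc comment on `new`).
#[cfg(all(test, feature = "async"))]
mod disk_cache_example {
    use super::*;

    #[test]
    fn disk_round_trip() {
        let config = CacheConfig {
            cache_dir: Some(std::env::temp_dir().join("oxigdal_disk_cache_example")),
            ..CacheConfig::default()
        };
        let cache = PersistentDiskCache::new(config).unwrap();

        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            let key: CacheKey = "cog/band1/overview".to_string();
            cache
                .put(key.clone(), Bytes::from_static(b"block"), None)
                .await
                .unwrap();
            assert_eq!(cache.get(&key).await.unwrap(), Bytes::from_static(b"block"));
            cache.clear().await.unwrap();
        });
    }
}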