Skip to main content

reddb_file/serverless/
cache.rs

1use super::*;
2use std::time::{SystemTime, UNIX_EPOCH};
3
/// Tuning knobs for the serverless local cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessCachePolicy {
    /// Policy flag for keeping the boot index on local storage. Defaults to `true`.
    /// (Not read by the code in this file.)
    pub keep_boot_index_local: bool,
    /// Policy flag for keeping the hot snapshot on local storage. Defaults to `true`.
    /// (Not read by the code in this file.)
    pub keep_hot_snapshot_local: bool,
    /// Byte budget for hot cached data; defaults to 256 MiB.
    /// NOTE(review): presumably fed to `ServerlessLocalCache::enforce_max_bytes` by callers — confirm.
    pub max_hot_bytes: u64,
}

impl Default for ServerlessCachePolicy {
    /// Keeps both the boot index and the hot snapshot local, with a 256 MiB hot budget.
    fn default() -> Self {
        const MIB: u64 = 1 << 20;
        let max_hot_bytes = 256 * MIB;
        Self {
            max_hot_bytes,
            keep_hot_snapshot_local: true,
            keep_boot_index_local: true,
        }
    }
}
20
/// One file in the local serverless cache, as seen by the eviction planner.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessCacheEntry {
    /// Path of the cached file relative to the cache directory.
    pub relative_path: PathBuf,
    /// Size of the cached file in bytes.
    pub bytes: u64,
    /// Hot entries are only evicted after every cold entry has been considered.
    pub hot: bool,
    /// Last access time in milliseconds since the Unix epoch; older entries are evicted first.
    pub last_access_unix_ms: u64,
}

impl ServerlessCacheEntry {
    /// Builds an entry from its parts; `relative_path` accepts anything convertible to `PathBuf`.
    pub fn new(
        relative_path: impl Into<PathBuf>,
        bytes: u64,
        hot: bool,
        last_access_unix_ms: u64,
    ) -> Self {
        Self {
            relative_path: relative_path.into(),
            bytes,
            hot,
            last_access_unix_ms,
        }
    }
}

/// Outcome of [`ServerlessCacheEvictionPlan::plan`]: what to delete and what remains.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessCacheEvictionPlan {
    /// Relative paths to evict, in eviction order.
    pub evict: Vec<PathBuf>,
    /// Total bytes left once every path in `evict` has been removed.
    pub bytes_after_eviction: u64,
}

impl ServerlessCacheEvictionPlan {
    /// Chooses entries to evict so that the remaining total is at most `max_bytes`.
    ///
    /// Candidates are ordered cold before hot, then least recently accessed first, then by
    /// `relative_path`. The path tie-break makes the plan independent of input order when
    /// access timestamps collide. Entries are taken from the front of that order until the
    /// remaining total fits the budget. Evicting everything always satisfies the budget, so
    /// `bytes_after_eviction <= max_bytes` holds on return.
    pub fn plan(entries: &[ServerlessCacheEntry], max_bytes: u64) -> Self {
        // Accumulate in u128: a sum of u64 sizes cannot overflow it, whereas a u64 sum
        // panics (debug) or wraps (release) on pathological inputs.
        let mut remaining: u128 = entries.iter().map(|entry| u128::from(entry.bytes)).sum();
        let budget = u128::from(max_bytes);

        let mut candidates: Vec<&ServerlessCacheEntry> = entries.iter().collect();
        candidates.sort_by(|a, b| {
            (a.hot, a.last_access_unix_ms, &a.relative_path).cmp(&(
                b.hot,
                b.last_access_unix_ms,
                &b.relative_path,
            ))
        });

        let mut evict = Vec::new();
        for entry in candidates {
            if remaining <= budget {
                break;
            }
            evict.push(entry.relative_path.clone());
            // `remaining` still includes `entry.bytes`, so this cannot underflow.
            remaining -= u128::from(entry.bytes);
        }

        Self {
            evict,
            // The loop stops with `remaining <= budget` (a u64 value) or after evicting
            // everything (`remaining == 0`), so this clamp never alters the value.
            bytes_after_eviction: remaining.min(u128::from(u64::MAX)) as u64,
        }
    }
}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct ServerlessLocalCache {
73    pub root: PathBuf,
74    pub generation: u64,
75}
76
77impl ServerlessLocalCache {
78    pub fn new(root: impl Into<PathBuf>, generation: u64) -> Self {
79        Self {
80            root: root.into(),
81            generation,
82        }
83    }
84
85    pub fn cache_dir(&self) -> PathBuf {
86        self.root.join(format!("g{:020}", self.generation))
87    }
88
89    pub fn path_for_request(&self, request: &ServerlessHydrationRequest) -> PathBuf {
90        self.cache_dir()
91            .join(format!("{}.redcache", hydration_cache_key(request)))
92    }
93
94    pub fn write_hydrated_range(&self, range: &ServerlessHydratedRange) -> RdbFileResult<PathBuf> {
95        range.request.validate_payload(&range.payload)?;
96        let path = self.path_for_request(&range.request);
97        write_bytes(&path, &range.payload)?;
98        Ok(path)
99    }
100
101    pub fn read_hydrated_range(
102        &self,
103        request: &ServerlessHydrationRequest,
104    ) -> RdbFileResult<ServerlessHydratedRange> {
105        let path = self.path_for_request(request);
106        let payload = fs::read(&path)?;
107        request.validate_payload(&payload)?;
108        write_bytes(&path, &payload)?;
109        Ok(ServerlessHydratedRange {
110            request: request.clone(),
111            payload,
112        })
113    }
114
115    pub fn remove_hydrated_range(&self, request: &ServerlessHydrationRequest) -> RdbFileResult<()> {
116        let path = self.path_for_request(request);
117        match fs::remove_file(path) {
118            Ok(()) => Ok(()),
119            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
120            Err(err) => Err(err.into()),
121        }
122    }
123
124    pub fn cached_entries(&self) -> RdbFileResult<Vec<ServerlessCacheEntry>> {
125        let cache_dir = self.cache_dir();
126        let entries = match fs::read_dir(&cache_dir) {
127            Ok(entries) => entries,
128            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
129            Err(err) => return Err(err.into()),
130        };
131        let mut cached = Vec::new();
132        for entry in entries {
133            let entry = entry?;
134            let path = entry.path();
135            if !path.is_file() || path.extension().and_then(|ext| ext.to_str()) != Some("redcache")
136            {
137                continue;
138            }
139            let Some(file_name) = path.file_name() else {
140                continue;
141            };
142            let metadata = entry.metadata()?;
143            cached.push(ServerlessCacheEntry::new(
144                PathBuf::from(file_name),
145                metadata.len(),
146                true,
147                metadata
148                    .modified()
149                    .ok()
150                    .and_then(system_time_to_unix_ms)
151                    .unwrap_or(0),
152            ));
153        }
154        Ok(cached)
155    }
156
157    pub fn enforce_max_bytes(&self, max_bytes: u64) -> RdbFileResult<ServerlessCacheEvictionPlan> {
158        let entries = self.cached_entries()?;
159        let plan = ServerlessCacheEvictionPlan::plan(&entries, max_bytes);
160        for relative_path in &plan.evict {
161            validate_cache_relative_path(relative_path)?;
162            let path = self.cache_dir().join(relative_path);
163            match fs::remove_file(path) {
164                Ok(()) => {}
165                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
166                Err(err) => return Err(err.into()),
167            }
168        }
169        Ok(plan)
170    }
171}
172
/// Converts `time` to whole milliseconds since the Unix epoch (sub-millisecond parts truncate).
///
/// Returns `None` when `time` is before the epoch or the count does not fit in a `u64`.
fn system_time_to_unix_ms(time: SystemTime) -> Option<u64> {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).ok(),
        Err(_) => None,
    }
}
177
178fn validate_cache_relative_path(path: &Path) -> RdbFileResult<()> {
179    if path.is_absolute() {
180        return Err(RdbFileError::InvalidOperation(
181            "serverless cache path must be relative".into(),
182        ));
183    }
184    let mut components = path.components();
185    if !matches!(components.next(), Some(std::path::Component::Normal(_)))
186        || components.next().is_some()
187    {
188        return Err(RdbFileError::InvalidOperation(
189            "serverless cache path must be a file name".into(),
190        ));
191    }
192    Ok(())
193}
194
195fn hydration_cache_key(request: &ServerlessHydrationRequest) -> String {
196    let mut hasher = blake3::Hasher::new();
197    hasher.update(request.relative_path.to_string_lossy().as_bytes());
198    hasher.update(&[0]);
199    hasher.update(&request.offset.to_le_bytes());
200    hasher.update(&request.bytes.to_le_bytes());
201    hasher.update(&request.checksum.to_le_bytes());
202    hasher.update(&request.content_hash.0);
203    hex_bytes(hasher.finalize().as_bytes())
204}
205
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte (high nibble first).
fn hex_bytes(bytes: &[u8]) -> String {
    let mut encoded = String::with_capacity(bytes.len() * 2);
    encoded.extend(
        bytes
            .iter()
            .flat_map(|&byte| [byte >> 4, byte & 0x0f])
            // `from_digit` yields lowercase `a`-`f`; a nibble is always < 16, so it cannot fail.
            .map(|nibble| char::from_digit(u32::from(nibble), 16).expect("nibble is always < 16")),
    );
    encoded
}