reddb_file/serverless/
cache.rs1use super::*;
2use std::time::{SystemTime, UNIX_EPOCH};
3
/// Tuning knobs for the serverless local cache.
///
/// This type only carries the settings; the code that acts on them lives
/// outside this file.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ServerlessCachePolicy {
    /// Flag: keep the boot index on local storage.
    pub keep_boot_index_local: bool,
    /// Flag: keep the hot snapshot on local storage.
    pub keep_hot_snapshot_local: bool,
    /// Byte budget for hot data (256 MiB by default).
    pub max_hot_bytes: u64,
}

impl Default for ServerlessCachePolicy {
    /// Keeps both the boot index and the hot snapshot local, with a 256 MiB budget.
    fn default() -> Self {
        // 256 MiB == 256 * 1024 * 1024 bytes.
        const DEFAULT_MAX_HOT_BYTES: u64 = 256 << 20;
        Self {
            max_hot_bytes: DEFAULT_MAX_HOT_BYTES,
            keep_hot_snapshot_local: true,
            keep_boot_index_local: true,
        }
    }
}

/// One file tracked by the serverless local cache.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ServerlessCacheEntry {
    /// Location of the file relative to the cache directory.
    pub relative_path: PathBuf,
    /// Size of the file in bytes.
    pub bytes: u64,
    /// Hot entries are only evicted after every cold entry.
    pub hot: bool,
    /// Last access time, in milliseconds since the Unix epoch.
    pub last_access_unix_ms: u64,
}

impl ServerlessCacheEntry {
    /// Builds an entry, converting `relative_path` into an owned [`PathBuf`].
    pub fn new(
        relative_path: impl Into<PathBuf>,
        bytes: u64,
        hot: bool,
        last_access_unix_ms: u64,
    ) -> Self {
        let relative_path: PathBuf = relative_path.into();
        Self {
            relative_path,
            bytes,
            hot,
            last_access_unix_ms,
        }
    }
}

45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct ServerlessCacheEvictionPlan {
47 pub evict: Vec<PathBuf>,
48 pub bytes_after_eviction: u64,
49}
50
51impl ServerlessCacheEvictionPlan {
52 pub fn plan(entries: &[ServerlessCacheEntry], max_bytes: u64) -> Self {
53 let mut total: u64 = entries.iter().map(|entry| entry.bytes).sum();
54 let mut candidates: Vec<&ServerlessCacheEntry> = entries.iter().collect();
55 candidates.sort_by_key(|entry| (entry.hot, entry.last_access_unix_ms));
56 let mut evict = Vec::new();
57 for entry in candidates {
58 if total <= max_bytes {
59 break;
60 }
61 evict.push(entry.relative_path.clone());
62 total = total.saturating_sub(entry.bytes);
63 }
64 Self {
65 evict,
66 bytes_after_eviction: total,
67 }
68 }
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct ServerlessLocalCache {
73 pub root: PathBuf,
74 pub generation: u64,
75}
76
77impl ServerlessLocalCache {
78 pub fn new(root: impl Into<PathBuf>, generation: u64) -> Self {
79 Self {
80 root: root.into(),
81 generation,
82 }
83 }
84
85 pub fn cache_dir(&self) -> PathBuf {
86 self.root.join(format!("g{:020}", self.generation))
87 }
88
89 pub fn path_for_request(&self, request: &ServerlessHydrationRequest) -> PathBuf {
90 self.cache_dir()
91 .join(format!("{}.redcache", hydration_cache_key(request)))
92 }
93
94 pub fn write_hydrated_range(&self, range: &ServerlessHydratedRange) -> RdbFileResult<PathBuf> {
95 range.request.validate_payload(&range.payload)?;
96 let path = self.path_for_request(&range.request);
97 write_bytes(&path, &range.payload)?;
98 Ok(path)
99 }
100
101 pub fn read_hydrated_range(
102 &self,
103 request: &ServerlessHydrationRequest,
104 ) -> RdbFileResult<ServerlessHydratedRange> {
105 let path = self.path_for_request(request);
106 let payload = fs::read(&path)?;
107 request.validate_payload(&payload)?;
108 write_bytes(&path, &payload)?;
109 Ok(ServerlessHydratedRange {
110 request: request.clone(),
111 payload,
112 })
113 }
114
115 pub fn remove_hydrated_range(&self, request: &ServerlessHydrationRequest) -> RdbFileResult<()> {
116 let path = self.path_for_request(request);
117 match fs::remove_file(path) {
118 Ok(()) => Ok(()),
119 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
120 Err(err) => Err(err.into()),
121 }
122 }
123
124 pub fn cached_entries(&self) -> RdbFileResult<Vec<ServerlessCacheEntry>> {
125 let cache_dir = self.cache_dir();
126 let entries = match fs::read_dir(&cache_dir) {
127 Ok(entries) => entries,
128 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
129 Err(err) => return Err(err.into()),
130 };
131 let mut cached = Vec::new();
132 for entry in entries {
133 let entry = entry?;
134 let path = entry.path();
135 if !path.is_file() || path.extension().and_then(|ext| ext.to_str()) != Some("redcache")
136 {
137 continue;
138 }
139 let Some(file_name) = path.file_name() else {
140 continue;
141 };
142 let metadata = entry.metadata()?;
143 cached.push(ServerlessCacheEntry::new(
144 PathBuf::from(file_name),
145 metadata.len(),
146 true,
147 metadata
148 .modified()
149 .ok()
150 .and_then(system_time_to_unix_ms)
151 .unwrap_or(0),
152 ));
153 }
154 Ok(cached)
155 }
156
157 pub fn enforce_max_bytes(&self, max_bytes: u64) -> RdbFileResult<ServerlessCacheEvictionPlan> {
158 let entries = self.cached_entries()?;
159 let plan = ServerlessCacheEvictionPlan::plan(&entries, max_bytes);
160 for relative_path in &plan.evict {
161 validate_cache_relative_path(relative_path)?;
162 let path = self.cache_dir().join(relative_path);
163 match fs::remove_file(path) {
164 Ok(()) => {}
165 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
166 Err(err) => return Err(err.into()),
167 }
168 }
169 Ok(plan)
170 }
171}
172
/// Converts `time` to whole milliseconds since the Unix epoch.
///
/// Returns `None` for times before the epoch, or when the millisecond count
/// does not fit in a `u64`.
fn system_time_to_unix_ms(time: SystemTime) -> Option<u64> {
    time.duration_since(UNIX_EPOCH)
        .ok()
        .and_then(|since_epoch| u64::try_from(since_epoch.as_millis()).ok())
}

178fn validate_cache_relative_path(path: &Path) -> RdbFileResult<()> {
179 if path.is_absolute() {
180 return Err(RdbFileError::InvalidOperation(
181 "serverless cache path must be relative".into(),
182 ));
183 }
184 let mut components = path.components();
185 if !matches!(components.next(), Some(std::path::Component::Normal(_)))
186 || components.next().is_some()
187 {
188 return Err(RdbFileError::InvalidOperation(
189 "serverless cache path must be a file name".into(),
190 ));
191 }
192 Ok(())
193}
194
195fn hydration_cache_key(request: &ServerlessHydrationRequest) -> String {
196 let mut hasher = blake3::Hasher::new();
197 hasher.update(request.relative_path.to_string_lossy().as_bytes());
198 hasher.update(&[0]);
199 hasher.update(&request.offset.to_le_bytes());
200 hasher.update(&request.bytes.to_le_bytes());
201 hasher.update(&request.checksum.to_le_bytes());
202 hasher.update(&request.content_hash.0);
203 hex_bytes(hasher.finalize().as_bytes())
204}
205
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte
/// (high nibble first).
fn hex_bytes(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut encoded, &byte| {
            encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
            encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
            encoded
        })
}
214}