1use super::*;
2
/// Describes where the packs of one serverless generation live on disk.
///
/// Pack files are addressed as `<root>/<namespace>/g<generation>/<kind>.redpack`,
/// where the generation is zero-padded to 20 digits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessFilePlan {
    /// Base directory; the namespace directory is joined directly beneath it.
    pub root: PathBuf,
    /// Namespace name, used as the directory component under `root`.
    pub namespace: String,
    /// Generation number; rendered as `g` plus 20 zero-padded digits in paths.
    pub generation: u64,
    /// Cache policy consulted when building the cold and hot start orders.
    pub cache_policy: ServerlessCachePolicy,
}
10
11impl ServerlessFilePlan {
12 pub fn new(root: impl Into<PathBuf>, namespace: impl Into<String>, generation: u64) -> Self {
13 Self {
14 root: root.into(),
15 namespace: namespace.into(),
16 generation,
17 cache_policy: ServerlessCachePolicy::default(),
18 }
19 }
20
21 pub fn for_data_path(data_path: impl AsRef<Path>, generation: u64) -> Self {
22 let data_path = data_path.as_ref();
23 Self::new(
24 crate::layout::serverless_root(data_path),
25 crate::layout::serverless_namespace(data_path),
26 generation,
27 )
28 }
29
30 pub fn with_cache_policy(mut self, policy: ServerlessCachePolicy) -> Self {
31 self.cache_policy = policy;
32 self
33 }
34
35 pub fn for_generation(&self, generation: u64) -> Self {
36 Self::new(self.root.clone(), self.namespace.clone(), generation)
37 .with_cache_policy(self.cache_policy.clone())
38 }
39
40 pub fn local_cache(&self) -> ServerlessLocalCache {
41 ServerlessLocalCache::new(
42 crate::layout::serverless_cache_root(&self.root, &self.namespace),
43 self.generation,
44 )
45 }
46
47 pub fn artifact_path(&self, kind: ServerlessPackKind) -> PathBuf {
48 self.root
49 .join(&self.namespace)
50 .join(format!("g{:020}", self.generation))
51 .join(format!("{}.redpack", kind.as_str()))
52 }
53
54 pub fn generation_dir(&self) -> PathBuf {
55 self.root
56 .join(&self.namespace)
57 .join(format!("g{:020}", self.generation))
58 }
59
60 pub fn manifest_path(&self) -> PathBuf {
61 self.artifact_path(ServerlessPackKind::Manifest)
62 }
63
64 pub fn boot_index_path(&self) -> PathBuf {
65 self.artifact_path(ServerlessPackKind::BootIndex)
66 }
67
68 pub fn extent_index_path(&self) -> PathBuf {
69 self.artifact_path(ServerlessPackKind::ExtentIndex)
70 }
71
72 pub fn collection_data_path(&self) -> PathBuf {
73 self.artifact_path(ServerlessPackKind::CollectionData)
74 }
75
76 pub fn collection_data_extent_ref(
77 &self,
78 collection: impl Into<String>,
79 offset: u64,
80 payload: &[u8],
81 hot: bool,
82 ) -> RdbFileResult<ServerlessExtentRef> {
83 ServerlessExtentRef::new(
84 collection,
85 Vec::<u8>::new(),
86 Vec::<u8>::new(),
87 relative_to_generation_dir(&self.collection_data_path()),
88 offset,
89 payload,
90 hot,
91 )
92 }
93
94 pub fn secondary_index_path(&self) -> PathBuf {
95 self.artifact_path(ServerlessPackKind::SecondaryIndex)
96 }
97
98 pub fn current_pointer_path(&self) -> PathBuf {
99 self.root.join(&self.namespace).join("CURRENT.redptr")
100 }
101
102 pub fn publish_generation_pointer(
103 &self,
104 manifest: &ServerlessManifest,
105 ) -> RdbFileResult<ServerlessGenerationPointer> {
106 if manifest.namespace != self.namespace {
107 return Err(RdbFileError::InvalidOperation(format!(
108 "manifest namespace {} does not match plan namespace {}",
109 manifest.namespace, self.namespace
110 )));
111 }
112 if manifest.generation != self.generation {
113 return Err(RdbFileError::InvalidOperation(format!(
114 "manifest generation {} does not match plan generation {}",
115 manifest.generation, self.generation
116 )));
117 }
118 self.validate_complete_generation(manifest)?;
119 let pointer = ServerlessGenerationPointer::from_manifest(self, manifest);
120 pointer.write_to_path(self.current_pointer_path())?;
121 Ok(pointer)
122 }
123
124 pub fn read_current_pointer(&self) -> RdbFileResult<ServerlessGenerationPointer> {
125 ServerlessGenerationPointer::read_from_path(self.current_pointer_path())
126 }
127
128 pub fn read_current_pointer_verified(&self) -> RdbFileResult<ServerlessGenerationPointer> {
129 let pointer = self.read_current_pointer()?;
130 if pointer.namespace != self.namespace {
131 return Err(RdbFileError::InvalidOperation(format!(
132 "current pointer namespace {} does not match plan namespace {}",
133 pointer.namespace, self.namespace
134 )));
135 }
136
137 let expected_manifest_relative_path =
138 PathBuf::from(format!("g{:020}/manifest.redpack", pointer.generation));
139 validate_generation_relative_path(&pointer.manifest_relative_path)?;
140 if pointer.manifest_relative_path != expected_manifest_relative_path {
141 return Err(RdbFileError::InvalidOperation(format!(
142 "current pointer manifest path {} does not match expected {}",
143 pointer.manifest_relative_path.display(),
144 expected_manifest_relative_path.display()
145 )));
146 }
147
148 let generation_plan = ServerlessFilePlan::new(
149 self.root.clone(),
150 self.namespace.clone(),
151 pointer.generation,
152 )
153 .with_cache_policy(self.cache_policy.clone());
154 let manifest_path = self
155 .root
156 .join(&self.namespace)
157 .join(&pointer.manifest_relative_path);
158 let manifest_bytes = fs::read(&manifest_path)?;
159 if manifest_bytes.len() as u64 != pointer.manifest_bytes {
160 return Err(RdbFileError::InvalidOperation(format!(
161 "current pointer manifest has {} bytes, expected {}",
162 manifest_bytes.len(),
163 pointer.manifest_bytes
164 )));
165 }
166 let computed_crc = crc32(&manifest_bytes);
167 if computed_crc != pointer.manifest_checksum {
168 return Err(RdbFileError::InvalidOperation(format!(
169 "current pointer manifest checksum mismatch: stored {:#010x}, computed {computed_crc:#010x}",
170 pointer.manifest_checksum
171 )));
172 }
173 let computed_hash = ServerlessContentHash::from_bytes(&manifest_bytes);
174 if computed_hash != pointer.manifest_content_hash {
175 return Err(RdbFileError::InvalidOperation(
176 "current pointer manifest content hash mismatch".into(),
177 ));
178 }
179
180 let manifest = ServerlessManifest::decode(&manifest_bytes)?;
181 if manifest.namespace != pointer.namespace {
182 return Err(RdbFileError::InvalidOperation(format!(
183 "current pointer namespace {} does not match manifest namespace {}",
184 pointer.namespace, manifest.namespace
185 )));
186 }
187 if manifest.generation != pointer.generation {
188 return Err(RdbFileError::InvalidOperation(format!(
189 "current pointer generation {} does not match manifest generation {}",
190 pointer.generation, manifest.generation
191 )));
192 }
193 generation_plan.validate_complete_generation(&manifest)?;
194 Ok(pointer)
195 }
196
197 pub fn wal_tail_path(&self) -> PathBuf {
198 self.artifact_path(ServerlessPackKind::WalTail)
199 }
200
201 pub fn hot_snapshot_path(&self) -> PathBuf {
202 self.artifact_path(ServerlessPackKind::HotSnapshot)
203 }
204
205 pub fn publish_core_generation(
206 &self,
207 extent_index: &ServerlessExtentIndex,
208 collection_data: &[u8],
209 secondary_index: &[u8],
210 ) -> RdbFileResult<ServerlessGenerationPointer> {
211 if extent_index.generation != self.generation {
212 return Err(RdbFileError::InvalidOperation(format!(
213 "extent index generation {} does not match plan generation {}",
214 extent_index.generation, self.generation
215 )));
216 }
217
218 let mut manifest = ServerlessManifest::new(&self.namespace, self.generation);
219 let boot_index = ServerlessBootIndex::from_plan(self);
220 let boot_index_bytes = boot_index.encode();
221 let extent_index_bytes = extent_index.encode();
222 let empty_pack: &[u8] = &[];
223 let packs = [
224 (
225 ServerlessPackKind::BootIndex,
226 relative_to_generation_dir(&self.boot_index_path()),
227 boot_index_bytes.as_slice(),
228 ),
229 (
230 ServerlessPackKind::ExtentIndex,
231 relative_to_generation_dir(&self.extent_index_path()),
232 extent_index_bytes.as_slice(),
233 ),
234 (
235 ServerlessPackKind::HotSnapshot,
236 relative_to_generation_dir(&self.hot_snapshot_path()),
237 empty_pack,
238 ),
239 (
240 ServerlessPackKind::WalTail,
241 relative_to_generation_dir(&self.wal_tail_path()),
242 empty_pack,
243 ),
244 (
245 ServerlessPackKind::CollectionData,
246 relative_to_generation_dir(&self.collection_data_path()),
247 collection_data,
248 ),
249 (
250 ServerlessPackKind::SecondaryIndex,
251 relative_to_generation_dir(&self.secondary_index_path()),
252 secondary_index,
253 ),
254 (
255 ServerlessPackKind::ColdArchive,
256 relative_to_generation_dir(&self.artifact_path(ServerlessPackKind::ColdArchive)),
257 empty_pack,
258 ),
259 ];
260
261 for (kind, relative_path, payload) in packs {
262 write_bytes(self.generation_dir().join(&relative_path), payload)?;
263 manifest.push(ServerlessManifestEntry::from_bytes(
264 kind,
265 relative_path,
266 payload,
267 ));
268 }
269 manifest.write_to_path(self.manifest_path())?;
270 self.publish_generation_pointer(&manifest)
271 }
272
273 pub fn validate_complete_generation(&self, manifest: &ServerlessManifest) -> RdbFileResult<()> {
274 let required = [
275 ServerlessPackKind::BootIndex,
276 ServerlessPackKind::ExtentIndex,
277 ServerlessPackKind::HotSnapshot,
278 ServerlessPackKind::WalTail,
279 ServerlessPackKind::CollectionData,
280 ServerlessPackKind::SecondaryIndex,
281 ];
282 for required_kind in required {
283 if !manifest
284 .entries
285 .iter()
286 .any(|entry| entry.kind == required_kind)
287 {
288 return Err(RdbFileError::InvalidOperation(format!(
289 "serverless generation {} is missing required {} pack",
290 self.generation,
291 required_kind.as_str()
292 )));
293 }
294 }
295
296 for entry in &manifest.entries {
297 validate_generation_relative_path(&entry.relative_path)?;
298 let payload = fs::read(self.generation_dir().join(&entry.relative_path))?;
299 if payload.len() as u64 != entry.bytes {
300 return Err(RdbFileError::InvalidOperation(format!(
301 "serverless pack {} has {} bytes, expected {}",
302 entry.relative_path.display(),
303 payload.len(),
304 entry.bytes
305 )));
306 }
307 let computed_crc = crc32(&payload);
308 if computed_crc != entry.checksum {
309 return Err(RdbFileError::InvalidOperation(format!(
310 "serverless pack {} checksum mismatch: stored {:#010x}, computed {computed_crc:#010x}",
311 entry.relative_path.display(),
312 entry.checksum
313 )));
314 }
315 let computed_hash = ServerlessContentHash::from_bytes(&payload);
316 if !entry.content_hash.is_zero() && computed_hash != entry.content_hash {
317 return Err(RdbFileError::InvalidOperation(format!(
318 "serverless pack {} content hash mismatch",
319 entry.relative_path.display()
320 )));
321 }
322 }
323
324 let manifest_bytes = fs::read(self.manifest_path())?;
325 let encoded = manifest.encode();
326 if manifest_bytes != encoded {
327 return Err(RdbFileError::InvalidOperation(
328 "serverless manifest on disk does not match publish manifest".into(),
329 ));
330 }
331 Ok(())
332 }
333
334 pub fn cold_start_order(&self) -> Vec<PathBuf> {
335 let mut order = vec![self.manifest_path(), self.boot_index_path()];
336 order.push(self.extent_index_path());
337 if self.cache_policy.keep_hot_snapshot_local {
338 order.push(self.hot_snapshot_path());
339 }
340 order.push(self.wal_tail_path());
341 order
342 }
343
344 pub fn hot_start_order(&self) -> Vec<PathBuf> {
345 let mut order = Vec::new();
346 if self.cache_policy.keep_boot_index_local {
347 order.push(self.boot_index_path());
348 }
349 if self.cache_policy.keep_hot_snapshot_local {
350 order.push(self.hot_snapshot_path());
351 }
352 order.push(self.wal_tail_path());
353 order
354 }
355
356 pub fn is_generation_dir(path: &Path) -> bool {
357 path.file_name()
358 .and_then(|name| name.to_str())
359 .map(|name| {
360 name.len() == 21
361 && name.starts_with('g')
362 && name[1..].chars().all(|c| c.is_ascii_digit())
363 })
364 .unwrap_or(false)
365 }
366}
367
368fn validate_generation_relative_path(path: &Path) -> RdbFileResult<()> {
369 if path.is_absolute() {
370 return Err(RdbFileError::InvalidOperation(
371 "serverless pack path must be relative".into(),
372 ));
373 }
374 if path
375 .components()
376 .any(|component| matches!(component, std::path::Component::ParentDir))
377 {
378 return Err(RdbFileError::InvalidOperation(
379 "serverless pack path must not contain parent components".into(),
380 ));
381 }
382 Ok(())
383}
384
/// Splits a generation's pack paths into two ordered groups for start-up.
///
/// NOTE(review): the field names suggest the first group is loaded before
/// open and the second afterwards; the consumer is not visible here — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerlessBootPlan {
    /// Paths taken from the plan's cold or hot start order.
    pub required_first: Vec<PathBuf>,
    /// Remaining pack paths listed by the constructor.
    pub lazy_after_open: Vec<PathBuf>,
}
390
391impl ServerlessBootPlan {
392 pub fn cold(plan: &ServerlessFilePlan) -> Self {
393 Self {
394 required_first: plan.cold_start_order(),
395 lazy_after_open: vec![
396 plan.artifact_path(ServerlessPackKind::CollectionData),
397 plan.artifact_path(ServerlessPackKind::SecondaryIndex),
398 plan.artifact_path(ServerlessPackKind::ColdArchive),
399 ],
400 }
401 }
402
403 pub fn hot(plan: &ServerlessFilePlan) -> Self {
404 Self {
405 required_first: plan.hot_start_order(),
406 lazy_after_open: vec![
407 plan.artifact_path(ServerlessPackKind::Manifest),
408 plan.artifact_path(ServerlessPackKind::CollectionData),
409 plan.artifact_path(ServerlessPackKind::SecondaryIndex),
410 ],
411 }
412 }
413}