1use std::fs::{self, File, OpenOptions};
9use std::io::Write;
10use std::path::{Path, PathBuf};
11
12use crate::embedded::{RdbFileError, RdbFileResult};
13
14mod boot;
15mod cache;
16mod extent;
17mod hydrate;
18mod lease;
19mod manifest;
20mod plan;
21mod pointer;
22mod secondary;
23
24pub use boot::*;
25pub use cache::*;
26pub use extent::*;
27pub use hydrate::*;
28pub use lease::*;
29pub use manifest::*;
30pub use plan::*;
31pub use pointer::*;
32pub use secondary::*;
33
// Eight-byte magic tags, one per artifact family. The names suggest which
// submodule codec consumes each tag; confirm against `manifest`, `boot`,
// `pointer`, `extent` and `secondary` before relying on that.
const SERVERLESS_MANIFEST_MAGIC: &[u8; 8] = b"RDPKMNF1";
const SERVERLESS_BOOT_INDEX_MAGIC: &[u8; 8] = b"RDPKBIX1";
const SERVERLESS_GENERATION_POINTER_MAGIC: &[u8; 8] = b"RDPKCUR1";
const SERVERLESS_EXTENT_INDEX_MAGIC: &[u8; 8] = b"RDPKEXT1";
const SERVERLESS_SECONDARY_INDEX_MAGIC: &[u8; 8] = b"RDPKSIX1";

/// Format version number for serverless artifacts (not referenced in this
/// file; presumably written by the submodule encoders — TODO confirm).
const SERVERLESS_ARTIFACT_VERSION: u16 = 1;

/// Width in bytes of the trailing checksum: one little-endian `u32` CRC.
const CHECKSUM_LEN: usize = std::mem::size_of::<u32>();

/// Width in bytes of a [`ServerlessContentHash`] digest.
const CONTENT_HASH_LEN: usize = 32;

/// Environment variable naming the crash-injection point at which
/// `crash_inject` terminates the process.
const SERVERLESS_CRASH_INJECT_ENV: &str = "REDDB_SERVERLESS_CRASH_AT";
43
/// The kinds of pack artifact that make up a serverless generation.
///
/// Each variant has a one-byte tag (see the `TryFrom<u8>` / `From` impls) and
/// a kebab-case label (see `as_str`).
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ServerlessPackKind {
    /// Tag `1`, label `"manifest"`.
    Manifest,
    /// Tag `2`, label `"boot-index"`.
    BootIndex,
    /// Tag `3`, label `"extent-index"`.
    ExtentIndex,
    /// Tag `4`, label `"hot-snapshot"`.
    HotSnapshot,
    /// Tag `5`, label `"wal-tail"`.
    WalTail,
    /// Tag `6`, label `"collection-data"`.
    CollectionData,
    /// Tag `7`, label `"secondary-index"`.
    SecondaryIndex,
    /// Tag `8`, label `"cold-archive"`.
    ColdArchive,
}
55
56impl ServerlessPackKind {
57 pub const fn as_str(self) -> &'static str {
58 match self {
59 Self::Manifest => "manifest",
60 Self::BootIndex => "boot-index",
61 Self::ExtentIndex => "extent-index",
62 Self::HotSnapshot => "hot-snapshot",
63 Self::WalTail => "wal-tail",
64 Self::CollectionData => "collection-data",
65 Self::SecondaryIndex => "secondary-index",
66 Self::ColdArchive => "cold-archive",
67 }
68 }
69}
70
71impl TryFrom<u8> for ServerlessPackKind {
72 type Error = RdbFileError;
73
74 fn try_from(value: u8) -> RdbFileResult<Self> {
75 match value {
76 1 => Ok(Self::Manifest),
77 2 => Ok(Self::BootIndex),
78 3 => Ok(Self::ExtentIndex),
79 4 => Ok(Self::HotSnapshot),
80 5 => Ok(Self::WalTail),
81 6 => Ok(Self::CollectionData),
82 7 => Ok(Self::SecondaryIndex),
83 8 => Ok(Self::ColdArchive),
84 other => Err(RdbFileError::InvalidOperation(format!(
85 "unknown serverless pack kind {other}"
86 ))),
87 }
88 }
89}
90
91impl From<ServerlessPackKind> for u8 {
92 fn from(value: ServerlessPackKind) -> Self {
93 match value {
94 ServerlessPackKind::Manifest => 1,
95 ServerlessPackKind::BootIndex => 2,
96 ServerlessPackKind::ExtentIndex => 3,
97 ServerlessPackKind::HotSnapshot => 4,
98 ServerlessPackKind::WalTail => 5,
99 ServerlessPackKind::CollectionData => 6,
100 ServerlessPackKind::SecondaryIndex => 7,
101 ServerlessPackKind::ColdArchive => 8,
102 }
103 }
104}
105
106#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
107pub struct ServerlessContentHash(pub [u8; CONTENT_HASH_LEN]);
108
109impl ServerlessContentHash {
110 pub const ZERO: Self = Self([0; CONTENT_HASH_LEN]);
111
112 pub fn from_bytes(bytes: &[u8]) -> Self {
113 Self(*blake3::hash(bytes).as_bytes())
114 }
115
116 pub fn is_zero(self) -> bool {
117 self.0 == [0; CONTENT_HASH_LEN]
118 }
119}
120
121fn kind_for_artifact_path(path: &Path) -> ServerlessPackKind {
122 match path.file_stem().and_then(|stem| stem.to_str()) {
123 Some("manifest") => ServerlessPackKind::Manifest,
124 Some("boot-index") => ServerlessPackKind::BootIndex,
125 Some("extent-index") => ServerlessPackKind::ExtentIndex,
126 Some("hot-snapshot") => ServerlessPackKind::HotSnapshot,
127 Some("wal-tail") => ServerlessPackKind::WalTail,
128 Some("collection-data") => ServerlessPackKind::CollectionData,
129 Some("secondary-index") => ServerlessPackKind::SecondaryIndex,
130 Some("cold-archive") => ServerlessPackKind::ColdArchive,
131 _ => ServerlessPackKind::ColdArchive,
132 }
133}
134
/// Reduces `path` to its final component.
///
/// All leading directories are dropped. When the path has no final file-name
/// component (for example it is empty or ends in `..`), the path is returned
/// unchanged.
fn relative_to_generation_dir(path: &Path) -> PathBuf {
    match path.file_name() {
        Some(name) => PathBuf::from(name),
        None => path.to_path_buf(),
    }
}
140
141fn write_bytes(path: impl AsRef<Path>, bytes: &[u8]) -> RdbFileResult<()> {
142 let path = path.as_ref();
143 if let Some(parent) = path.parent() {
144 fs::create_dir_all(parent)?;
145 }
146 let tmp_path = crate::layout::atomic_temp_path(path);
147 {
148 let mut file = OpenOptions::new()
149 .create(true)
150 .truncate(true)
151 .write(true)
152 .open(&tmp_path)?;
153 file.write_all(bytes)?;
154 crash_inject("serverless_pack_after_tmp_write");
155 file.sync_all()?;
156 crash_inject("serverless_pack_after_tmp_sync");
157 }
158 fs::rename(&tmp_path, path)?;
159 crash_inject("serverless_pack_after_rename");
160 if let Some(parent) = path.parent() {
161 if let Ok(dir) = File::open(parent) {
162 let _ = dir.sync_all();
163 }
164 }
165 crash_inject("serverless_pack_after_dir_sync");
166 Ok(())
167}
168
169fn write_current_pointer_bytes(path: impl AsRef<Path>, bytes: &[u8]) -> RdbFileResult<()> {
170 let path = path.as_ref();
171 if let Some(parent) = path.parent() {
172 fs::create_dir_all(parent)?;
173 }
174 let tmp_path = crate::layout::atomic_temp_path(path);
175 {
176 let mut file = OpenOptions::new()
177 .create(true)
178 .truncate(true)
179 .write(true)
180 .open(&tmp_path)?;
181 file.write_all(bytes)?;
182 crash_inject("current_pointer_after_tmp_write");
183 file.sync_all()?;
184 crash_inject("current_pointer_after_tmp_sync");
185 }
186 fs::rename(&tmp_path, path)?;
187 crash_inject("current_pointer_after_rename");
188 if let Some(parent) = path.parent() {
189 if let Ok(dir) = File::open(parent) {
190 let _ = dir.sync_all();
191 }
192 }
193 crash_inject("current_pointer_after_dir_sync");
194 Ok(())
195}
196
197fn crash_inject(point: &str) {
198 if std::env::var(SERVERLESS_CRASH_INJECT_ENV).ok().as_deref() == Some(point) {
199 std::process::exit(173);
200 }
201}
202
203fn verify_checksum(bytes: &[u8]) -> RdbFileResult<()> {
204 let Some(checksum_offset) = bytes.len().checked_sub(CHECKSUM_LEN) else {
205 return Err(RdbFileError::InvalidOperation(
206 "serverless artifact too short".into(),
207 ));
208 };
209 let stored = u32::from_le_bytes(bytes[checksum_offset..].try_into().unwrap());
210 let computed = crc32(&bytes[..checksum_offset]);
211 if stored != computed {
212 return Err(RdbFileError::InvalidOperation(format!(
213 "serverless artifact checksum mismatch: stored {stored:#010x}, computed {computed:#010x}"
214 )));
215 }
216 Ok(())
217}
218
219fn crc32(data: &[u8]) -> u32 {
220 let mut hasher = crc32fast::Hasher::new();
221 hasher.update(data);
222 hasher.finalize()
223}
224
225fn expect_magic(bytes: &[u8], cursor: &mut usize, magic: &[u8]) -> RdbFileResult<()> {
226 let actual = take_bytes(bytes, cursor, magic.len())?;
227 if actual != magic {
228 return Err(RdbFileError::InvalidOperation(
229 "invalid serverless artifact magic".into(),
230 ));
231 }
232 Ok(())
233}
234
/// Appends `value` to `out` as two little-endian bytes.
fn put_u16(out: &mut Vec<u8>, value: u16) {
    out.extend(value.to_le_bytes());
}
238
/// Appends `value` to `out` as four little-endian bytes.
fn put_u32(out: &mut Vec<u8>, value: u32) {
    out.extend(value.to_le_bytes());
}
242
/// Appends `value` to `out` as eight little-endian bytes.
fn put_u64(out: &mut Vec<u8>, value: u64) {
    out.extend(value.to_le_bytes());
}
246
/// Appends `value` to `out` as a little-endian `u32` byte length followed by
/// its UTF-8 bytes.
///
/// # Panics
/// Panics if `value` is longer than `u32::MAX` bytes; see [`put_bytes`].
fn put_string(out: &mut Vec<u8>, value: &str) {
    put_bytes(out, value.as_bytes());
}

/// Appends `value` to `out` as a little-endian `u32` byte length followed by
/// the bytes themselves.
///
/// # Panics
/// Panics if `value` is longer than `u32::MAX` bytes. The length prefix is
/// only 32 bits wide; truncating it would emit an artifact whose length field
/// disagrees with its payload, so an oversized field is treated as a caller
/// bug instead of being silently corrupted.
fn put_bytes(out: &mut Vec<u8>, value: &[u8]) {
    let len = u32::try_from(value.len())
        .expect("serverless length-prefixed field exceeds u32::MAX bytes");
    out.extend_from_slice(&len.to_le_bytes());
    out.extend_from_slice(value);
}
256
257fn put_content_hash(out: &mut Vec<u8>, value: ServerlessContentHash) {
258 out.extend_from_slice(&value.0);
259}
260
261fn take_bytes<'a>(bytes: &'a [u8], cursor: &mut usize, len: usize) -> RdbFileResult<&'a [u8]> {
262 let end = cursor
263 .checked_add(len)
264 .ok_or_else(|| RdbFileError::InvalidOperation("serverless cursor overflow".into()))?;
265 if end > bytes.len().saturating_sub(CHECKSUM_LEN) {
266 return Err(RdbFileError::InvalidOperation(
267 "serverless artifact truncated".into(),
268 ));
269 }
270 let value = &bytes[*cursor..end];
271 *cursor = end;
272 Ok(value)
273}
274
275fn reject_trailing_bytes(bytes: &[u8], cursor: usize) -> RdbFileResult<()> {
276 if cursor != bytes.len().saturating_sub(CHECKSUM_LEN) {
277 return Err(RdbFileError::InvalidOperation(
278 "serverless artifact has trailing bytes".into(),
279 ));
280 }
281 Ok(())
282}
283
284fn take_u8(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u8> {
285 Ok(take_bytes(bytes, cursor, 1)?[0])
286}
287
288fn take_u16(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u16> {
289 Ok(u16::from_le_bytes(
290 take_bytes(bytes, cursor, 2)?.try_into().unwrap(),
291 ))
292}
293
294fn take_u32(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u32> {
295 Ok(u32::from_le_bytes(
296 take_bytes(bytes, cursor, 4)?.try_into().unwrap(),
297 ))
298}
299
300fn take_u64(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u64> {
301 Ok(u64::from_le_bytes(
302 take_bytes(bytes, cursor, 8)?.try_into().unwrap(),
303 ))
304}
305
306fn take_string(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<String> {
307 let len = take_u32(bytes, cursor)? as usize;
308 let raw = take_bytes(bytes, cursor, len)?;
309 std::str::from_utf8(raw)
310 .map(|value| value.to_string())
311 .map_err(|err| RdbFileError::InvalidOperation(format!("invalid utf-8 string: {err}")))
312}
313
314fn take_vec_bytes(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<Vec<u8>> {
315 let len = take_u32(bytes, cursor)? as usize;
316 Ok(take_bytes(bytes, cursor, len)?.to_vec())
317}
318
319fn take_content_hash(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<ServerlessContentHash> {
320 let raw = take_bytes(bytes, cursor, CONTENT_HASH_LEN)?;
321 let mut hash = [0u8; CONTENT_HASH_LEN];
322 hash.copy_from_slice(raw);
323 Ok(ServerlessContentHash(hash))
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329
330 #[test]
331 fn serverless_paths_are_generation_scoped_and_deterministic() {
332 let plan = ServerlessFilePlan::new("/tmp/reddb", "tenant-a/db", 42);
333 assert_eq!(
334 plan.manifest_path(),
335 PathBuf::from("/tmp/reddb/tenant-a/db/g00000000000000000042/manifest.redpack")
336 );
337 assert_eq!(
338 plan.boot_index_path(),
339 PathBuf::from("/tmp/reddb/tenant-a/db/g00000000000000000042/boot-index.redpack")
340 );
341 assert!(ServerlessFilePlan::is_generation_dir(Path::new(
342 "g00000000000000000042"
343 )));
344 assert!(!ServerlessFilePlan::is_generation_dir(Path::new("g42")));
345 }
346
347 #[test]
348 fn cold_start_fetches_manifest_boot_snapshot_then_wal_tail() {
349 let plan = ServerlessFilePlan::new("/tmp/reddb", "db", 7);
350 let boot = ServerlessBootPlan::cold(&plan);
351 assert_eq!(boot.required_first[0], plan.manifest_path());
352 assert_eq!(boot.required_first[1], plan.boot_index_path());
353 assert_eq!(boot.required_first[2], plan.extent_index_path());
354 assert_eq!(boot.required_first[3], plan.hot_snapshot_path());
355 assert_eq!(boot.required_first[4], plan.wal_tail_path());
356 assert_eq!(boot.lazy_after_open.len(), 3);
357 }
358
359 #[test]
360 fn manifest_round_trips_with_crc_checked_binary_codec() {
361 let mut manifest = ServerlessManifest::new("tenant/db", 11);
362 manifest.push(ServerlessManifestEntry::from_bytes(
363 ServerlessPackKind::WalTail,
364 "wal-tail.redpack",
365 b"wal tail payload",
366 ));
367 manifest.push(ServerlessManifestEntry::from_bytes(
368 ServerlessPackKind::BootIndex,
369 "boot-index.redpack",
370 b"boot index payload",
371 ));
372
373 let encoded = manifest.encode();
374 let decoded = ServerlessManifest::decode(&encoded).expect("decode manifest");
375 assert_eq!(decoded, manifest);
376 assert!(!decoded.entries[0].content_hash.is_zero());
377
378 let mut corrupt = encoded;
379 let last_payload_byte = corrupt.len() - CHECKSUM_LEN - 1;
380 corrupt[last_payload_byte] ^= 0x01;
381 let err = ServerlessManifest::decode(&corrupt).expect_err("checksum catches corruption");
382 assert!(err.to_string().contains("checksum mismatch"), "{err}");
383 }
384
385 #[test]
386 fn boot_index_round_trips_and_preserves_coldstart_order() {
387 let plan = ServerlessFilePlan::new("/tmp/reddb", "db", 9);
388 let index = ServerlessBootIndex::from_plan(&plan);
389
390 assert_eq!(
391 index.required_first(),
392 vec![
393 PathBuf::from("manifest.redpack"),
394 PathBuf::from("boot-index.redpack"),
395 PathBuf::from("extent-index.redpack"),
396 PathBuf::from("hot-snapshot.redpack"),
397 PathBuf::from("wal-tail.redpack"),
398 ]
399 );
400 assert_eq!(
401 index.lazy_after_open(),
402 vec![
403 PathBuf::from("collection-data.redpack"),
404 PathBuf::from("secondary-index.redpack"),
405 PathBuf::from("cold-archive.redpack"),
406 ]
407 );
408
409 let decoded = ServerlessBootIndex::decode(&index.encode()).expect("decode boot index");
410 assert_eq!(decoded, index);
411 }
412
413 #[test]
414 fn collection_data_extent_ref_uses_canonical_pack_path() {
415 let plan = ServerlessFilePlan::new("/tmp/reddb", "db", 9);
416 let payload = b"collection snapshot bytes";
417 let extent = plan
418 .collection_data_extent_ref("events", 12, payload, true)
419 .expect("extent ref");
420
421 assert_eq!(extent.collection, "events");
422 assert_eq!(
423 extent.relative_path,
424 PathBuf::from("collection-data.redpack")
425 );
426 assert_eq!(extent.offset, 12);
427 assert_eq!(extent.bytes, payload.len() as u64);
428 assert!(extent.hot);
429 }
430
431 #[test]
432 fn manifest_rejects_trailing_payload_bytes() {
433 let manifest = ServerlessManifest::new("tenant/db", 11);
434 let mut encoded = manifest.encode();
435 encoded.truncate(encoded.len() - CHECKSUM_LEN);
436 encoded.push(0xAA);
437 let checksum = crc32(&encoded);
438 put_u32(&mut encoded, checksum);
439
440 let err = ServerlessManifest::decode(&encoded).expect_err("trailing bytes rejected");
441 assert!(err.to_string().contains("trailing bytes"), "{err}");
442 }
443
444 #[test]
445 fn generation_pointer_round_trips_and_points_to_immutable_manifest() {
446 let plan = ServerlessFilePlan::new("/tmp/reddb", "tenant/db", 19);
447 let mut manifest = ServerlessManifest::new("tenant/db", 19);
448 manifest.push(ServerlessManifestEntry::from_bytes(
449 ServerlessPackKind::HotSnapshot,
450 "hot-snapshot.redpack",
451 b"snapshot",
452 ));
453
454 let pointer = ServerlessGenerationPointer::from_manifest(&plan, &manifest);
455 assert_eq!(pointer.generation, 19);
456 assert_eq!(
457 pointer.manifest_relative_path,
458 PathBuf::from("g00000000000000000019/manifest.redpack")
459 );
460 assert!(!pointer.manifest_content_hash.is_zero());
461
462 let decoded =
463 ServerlessGenerationPointer::decode(&pointer.encode()).expect("decode pointer");
464 assert_eq!(decoded, pointer);
465 }
466
467 #[test]
468 fn extent_index_finds_key_ranges_and_hot_prefetch_paths() {
469 let mut index = ServerlessExtentIndex::new(21);
470 index.push(
471 ServerlessExtentRef::new(
472 "orders",
473 b"a".to_vec(),
474 b"m".to_vec(),
475 "orders-000.redpack",
476 0,
477 b"orders-a-m",
478 true,
479 )
480 .expect("extent"),
481 );
482 index.push(
483 ServerlessExtentRef::new(
484 "orders",
485 b"m".to_vec(),
486 b"z".to_vec(),
487 "orders-001.redpack",
488 0,
489 b"orders-m-z",
490 false,
491 )
492 .expect("extent"),
493 );
494
495 let matches = index.extents_for_key("orders", b"b");
496 assert_eq!(matches.len(), 1);
497 assert_eq!(
498 matches[0].relative_path,
499 PathBuf::from("orders-000.redpack")
500 );
501 assert_eq!(
502 index.hot_prefetch_paths(),
503 vec![PathBuf::from("orders-000.redpack")]
504 );
505
506 let decoded = ServerlessExtentIndex::decode(&index.encode()).expect("decode extent index");
507 assert_eq!(decoded, index);
508 }
509
510 #[test]
511 fn hydration_plan_uses_only_matching_extents() {
512 let mut index = ServerlessExtentIndex::new(22);
513 index.push(
514 ServerlessExtentRef::new(
515 "orders",
516 b"a".to_vec(),
517 b"m".to_vec(),
518 "orders-000.redpack",
519 64,
520 b"orders-a-m",
521 true,
522 )
523 .expect("extent"),
524 );
525 index.push(
526 ServerlessExtentRef::new(
527 "orders",
528 b"m".to_vec(),
529 b"z".to_vec(),
530 "orders-001.redpack",
531 128,
532 b"orders-m-z",
533 false,
534 )
535 .expect("extent"),
536 );
537
538 let plan = index.hydration_plan_for_key("orders", b"n");
539 assert_eq!(plan.requests.len(), 1);
540 assert_eq!(
541 plan.requests[0].relative_path,
542 PathBuf::from("orders-001.redpack")
543 );
544 assert_eq!(plan.requests[0].offset, 128);
545 assert_eq!(plan.total_bytes(), b"orders-m-z".len() as u64);
546
547 let hot = index.hot_hydration_plan();
548 assert_eq!(hot.requests.len(), 1);
549 assert_eq!(
550 hot.requests[0].relative_path,
551 PathBuf::from("orders-000.redpack")
552 );
553 }
554
555 #[test]
556 fn hydration_plan_for_range_uses_overlapping_extents() {
557 let mut index = ServerlessExtentIndex::new(27);
558 index.push(
559 ServerlessExtentRef::new(
560 "orders",
561 b"a".to_vec(),
562 b"f".to_vec(),
563 "orders-000.redpack",
564 0,
565 b"orders-a-f",
566 true,
567 )
568 .expect("extent"),
569 );
570 index.push(
571 ServerlessExtentRef::new(
572 "orders",
573 b"f".to_vec(),
574 b"p".to_vec(),
575 "orders-001.redpack",
576 64,
577 b"orders-f-p",
578 false,
579 )
580 .expect("extent"),
581 );
582 index.push(
583 ServerlessExtentRef::new(
584 "orders",
585 b"p".to_vec(),
586 b"z".to_vec(),
587 "orders-002.redpack",
588 128,
589 b"orders-p-z",
590 false,
591 )
592 .expect("extent"),
593 );
594 index.push(
595 ServerlessExtentRef::new(
596 "users",
597 b"a".to_vec(),
598 b"z".to_vec(),
599 "users-000.redpack",
600 0,
601 b"users-a-z",
602 false,
603 )
604 .expect("extent"),
605 );
606
607 let plan = index
608 .hydration_plan_for_range("orders", b"e", b"q")
609 .expect("range plan");
610 assert_eq!(plan.requests.len(), 3);
611 assert_eq!(
612 plan.requests
613 .iter()
614 .map(|request| request.relative_path.clone())
615 .collect::<Vec<_>>(),
616 vec![
617 PathBuf::from("orders-000.redpack"),
618 PathBuf::from("orders-001.redpack"),
619 PathBuf::from("orders-002.redpack"),
620 ]
621 );
622
623 let err = index
624 .hydration_plan_for_range("orders", b"q", b"e")
625 .expect_err("invalid range rejected");
626 assert!(err.to_string().contains("range_start"), "{err}");
627 }
628
629 #[test]
630 fn secondary_index_round_trips_and_builds_collection_hydration_plan() {
631 let mut extent_index = ServerlessExtentIndex::new(29);
632 extent_index.push(
633 ServerlessExtentRef::new(
634 "orders",
635 b"a".to_vec(),
636 b"m".to_vec(),
637 "collection-data.redpack",
638 0,
639 b"orders-left",
640 true,
641 )
642 .expect("orders left"),
643 );
644 extent_index.push(
645 ServerlessExtentRef::new(
646 "orders",
647 b"m".to_vec(),
648 b"z".to_vec(),
649 "collection-data.redpack",
650 11,
651 b"orders-right",
652 false,
653 )
654 .expect("orders right"),
655 );
656 extent_index.push(
657 ServerlessExtentRef::new(
658 "users",
659 b"a".to_vec(),
660 b"z".to_vec(),
661 "collection-data.redpack",
662 23,
663 b"users",
664 false,
665 )
666 .expect("users"),
667 );
668
669 let secondary = ServerlessSecondaryIndex::from_extent_index(&extent_index);
670 let decoded =
671 ServerlessSecondaryIndex::decode(&secondary.encode()).expect("decode secondary index");
672 assert_eq!(decoded, secondary);
673
674 let hydration = decoded.hydration_plan_for_collection("orders");
675 assert_eq!(hydration.generation, 29);
676 assert_eq!(hydration.requests.len(), 2);
677 assert_eq!(hydration.total_bytes(), 23);
678 assert!(hydration.requests[0].content_hash != ServerlessContentHash::ZERO);
679 }
680
681 #[test]
682 fn hydrate_local_plan_reads_only_requested_byte_ranges() {
683 let root = temp_root("serverless-hydrate-range");
684 let plan = ServerlessFilePlan::new(&root, "db", 23);
685 let collection_payload = b"aaaabbbbcccc";
686 let mut index = ServerlessExtentIndex::new(23);
687 index.push(
688 ServerlessExtentRef::new(
689 "orders",
690 b"a".to_vec(),
691 b"m".to_vec(),
692 "collection-data.redpack",
693 4,
694 b"bbbb",
695 true,
696 )
697 .expect("extent"),
698 );
699 plan.publish_core_generation(&index, collection_payload, b"secondary")
700 .expect("publish generation");
701
702 let hydration = index.hydration_plan_for_key("orders", b"b");
703 let hydrated = plan
704 .hydrate_local_plan(&hydration)
705 .expect("hydrate local range");
706 assert_eq!(hydrated.len(), 1);
707 assert_eq!(hydrated[0].payload, b"bbbb");
708 assert_eq!(hydrated[0].request.offset, 4);
709 assert_eq!(hydrated[0].request.bytes, 4);
710
711 let _ = std::fs::remove_dir_all(root);
712 }
713
714 #[test]
715 fn hydrate_local_plan_rejects_corrupt_or_out_of_bounds_ranges() {
716 let root = temp_root("serverless-hydrate-corrupt");
717 let plan = ServerlessFilePlan::new(&root, "db", 24);
718 let mut index = ServerlessExtentIndex::new(24);
719 index.push(
720 ServerlessExtentRef::new(
721 "orders",
722 b"a".to_vec(),
723 b"m".to_vec(),
724 "collection-data.redpack",
725 4,
726 b"bbbb",
727 true,
728 )
729 .expect("extent"),
730 );
731 plan.publish_core_generation(&index, b"aaaabbbbcccc", b"secondary")
732 .expect("publish generation");
733 std::fs::write(plan.collection_data_path(), b"aaaaBBBBcccc").expect("corrupt pack");
734
735 let hydration = index.hydration_plan_for_key("orders", b"b");
736 let err = plan
737 .hydrate_local_plan(&hydration)
738 .expect_err("corrupt range rejected");
739 assert!(err.to_string().contains("checksum mismatch"), "{err}");
740
741 let mut out_of_bounds = hydration.clone();
742 out_of_bounds.requests[0].offset = 11;
743 let err = plan
744 .hydrate_local_plan(&out_of_bounds)
745 .expect_err("out of bounds rejected");
746 assert!(err.to_string().contains("exceeds pack"), "{err}");
747
748 let _ = std::fs::remove_dir_all(root);
749 }
750
751 #[test]
752 fn prefetch_hot_extents_hydrates_only_hot_ranges() {
753 let root = temp_root("serverless-hot-prefetch");
754 let plan = ServerlessFilePlan::new(&root, "db", 25);
755 let mut index = ServerlessExtentIndex::new(25);
756 index.push(
757 ServerlessExtentRef::new(
758 "orders",
759 b"a".to_vec(),
760 b"m".to_vec(),
761 "collection-data.redpack",
762 0,
763 b"hot!",
764 true,
765 )
766 .expect("hot extent"),
767 );
768 index.push(
769 ServerlessExtentRef::new(
770 "orders",
771 b"m".to_vec(),
772 b"z".to_vec(),
773 "collection-data.redpack",
774 4,
775 b"cold",
776 false,
777 )
778 .expect("cold extent"),
779 );
780 plan.publish_core_generation(&index, b"hot!cold", b"secondary")
781 .expect("publish generation");
782
783 let hydrated = plan.prefetch_hot_extents(&index).expect("prefetch hot");
784 assert_eq!(hydrated.len(), 1);
785 assert_eq!(hydrated[0].payload, b"hot!");
786
787 let _ = std::fs::remove_dir_all(root);
788 }
789
790 #[test]
791 fn prefetch_hot_extents_cached_populates_only_hot_cache_entries() {
792 let root = temp_root("serverless-hot-prefetch-cache");
793 let plan = ServerlessFilePlan::new(&root, "db", 29);
794 let cache = ServerlessLocalCache::new(root.join("cache"), 29);
795 let mut index = ServerlessExtentIndex::new(29);
796 index.push(
797 ServerlessExtentRef::new(
798 "orders",
799 b"a".to_vec(),
800 b"m".to_vec(),
801 "collection-data.redpack",
802 0,
803 b"hot!",
804 true,
805 )
806 .expect("hot extent"),
807 );
808 index.push(
809 ServerlessExtentRef::new(
810 "orders",
811 b"m".to_vec(),
812 b"z".to_vec(),
813 "collection-data.redpack",
814 4,
815 b"cold",
816 false,
817 )
818 .expect("cold extent"),
819 );
820 plan.publish_core_generation(&index, b"hot!cold", b"secondary")
821 .expect("publish generation");
822
823 let hot_request = index.hydration_plan_for_key("orders", b"b").requests[0].clone();
824 let cold_request = index.hydration_plan_for_key("orders", b"n").requests[0].clone();
825
826 let hydrated = plan
827 .prefetch_hot_extents_cached(&index, &cache)
828 .expect("prefetch hot into cache");
829 assert_eq!(hydrated.len(), 1);
830 assert_eq!(hydrated[0].payload, b"hot!");
831 assert_eq!(
832 std::fs::read(cache.path_for_request(&hot_request)).expect("read hot cache"),
833 b"hot!"
834 );
835 assert!(
836 !cache.path_for_request(&cold_request).exists(),
837 "cold extent should not be prefetched into cache"
838 );
839
840 let _ = std::fs::remove_dir_all(root);
841 }
842
843 #[test]
844 fn hydrate_local_request_cached_validates_and_repairs_corrupt_cache() {
845 let root = temp_root("serverless-hydrate-cache");
846 let plan = ServerlessFilePlan::new(&root, "db", 26);
847 let cache = ServerlessLocalCache::new(root.join("cache"), 26);
848 let mut index = ServerlessExtentIndex::new(26);
849 index.push(
850 ServerlessExtentRef::new(
851 "orders",
852 b"a".to_vec(),
853 b"z".to_vec(),
854 "collection-data.redpack",
855 4,
856 b"bbbb",
857 true,
858 )
859 .expect("extent"),
860 );
861 plan.publish_core_generation(&index, b"aaaabbbbcccc", b"secondary")
862 .expect("publish generation");
863 let request = index.hydration_plan_for_key("orders", b"m").requests[0].clone();
864
865 let first = plan
866 .hydrate_local_request_cached(&request, &cache)
867 .expect("hydrate and cache");
868 assert_eq!(first.payload, b"bbbb");
869 let cache_path = cache.path_for_request(&request);
870 assert_eq!(std::fs::read(&cache_path).expect("read cache"), b"bbbb");
871
872 std::fs::write(&cache_path, b"xxxx").expect("corrupt cache");
873 let repaired = plan
874 .hydrate_local_request_cached(&request, &cache)
875 .expect("repair corrupt cache from pack");
876 assert_eq!(repaired.payload, b"bbbb");
877 assert_eq!(
878 std::fs::read(&cache_path).expect("read repaired cache"),
879 b"bbbb"
880 );
881
882 let _ = std::fs::remove_dir_all(root);
883 }
884
885 #[test]
886 fn hydrate_local_plan_cached_populates_multiple_range_entries() {
887 let root = temp_root("serverless-hydrate-plan-cache");
888 let plan = ServerlessFilePlan::new(&root, "db", 28);
889 let cache = ServerlessLocalCache::new(root.join("cache"), 28);
890 let mut index = ServerlessExtentIndex::new(28);
891 index.push(
892 ServerlessExtentRef::new(
893 "orders",
894 b"a".to_vec(),
895 b"m".to_vec(),
896 "collection-data.redpack",
897 0,
898 b"left",
899 true,
900 )
901 .expect("left extent"),
902 );
903 index.push(
904 ServerlessExtentRef::new(
905 "orders",
906 b"m".to_vec(),
907 b"z".to_vec(),
908 "collection-data.redpack",
909 4,
910 b"right",
911 false,
912 )
913 .expect("right extent"),
914 );
915 plan.publish_core_generation(&index, b"leftright", b"secondary")
916 .expect("publish generation");
917
918 let hydration = index
919 .hydration_plan_for_range("orders", b"b", b"y")
920 .expect("range hydration plan");
921 let hydrated = plan
922 .hydrate_local_plan_cached(&hydration, &cache)
923 .expect("hydrate cached plan");
924 assert_eq!(hydrated.len(), 2);
925 assert_eq!(hydrated[0].payload, b"left");
926 assert_eq!(hydrated[1].payload, b"right");
927 for range in hydrated {
928 assert!(cache.path_for_request(&range.request).exists());
929 }
930
931 let _ = std::fs::remove_dir_all(root);
932 }
933
934 #[test]
935 fn cached_hydration_enforces_max_hot_bytes_after_writes() {
936 let root = temp_root("serverless-hydrate-cache-budget");
937 let plan =
938 ServerlessFilePlan::new(&root, "db", 29).with_cache_policy(ServerlessCachePolicy {
939 max_hot_bytes: 5,
940 ..ServerlessCachePolicy::default()
941 });
942 let cache = ServerlessLocalCache::new(root.join("cache"), 29);
943 let mut index = ServerlessExtentIndex::new(29);
944 index.push(
945 ServerlessExtentRef::new(
946 "orders",
947 b"a".to_vec(),
948 b"m".to_vec(),
949 "collection-data.redpack",
950 0,
951 b"left",
952 true,
953 )
954 .expect("left extent"),
955 );
956 index.push(
957 ServerlessExtentRef::new(
958 "orders",
959 b"m".to_vec(),
960 b"z".to_vec(),
961 "collection-data.redpack",
962 4,
963 b"right",
964 true,
965 )
966 .expect("right extent"),
967 );
968 plan.publish_core_generation(&index, b"leftright", b"secondary")
969 .expect("publish generation");
970
971 let hydration = index
972 .hydration_plan_for_range("orders", b"a", b"z")
973 .expect("range hydration plan");
974 let hydrated = plan
975 .hydrate_local_plan_cached(&hydration, &cache)
976 .expect("hydrate cached plan");
977 assert_eq!(hydrated.len(), 2);
978 assert_eq!(hydrated[0].payload, b"left");
979 assert_eq!(hydrated[1].payload, b"right");
980
981 let entries = cache.cached_entries().expect("cache entries");
982 let cached_bytes: u64 = entries.iter().map(|entry| entry.bytes).sum();
983 assert!(
984 cached_bytes <= 5,
985 "cache should stay within max_hot_bytes, got {cached_bytes}"
986 );
987
988 let _ = std::fs::remove_dir_all(root);
989 }
990
991 #[test]
992 fn cache_eviction_prefers_cold_old_entries() {
993 let entries = vec![
994 ServerlessCacheEntry::new("cold-old.redpack", 100, false, 10),
995 ServerlessCacheEntry::new("hot-old.redpack", 100, true, 1),
996 ServerlessCacheEntry::new("cold-new.redpack", 100, false, 20),
997 ];
998
999 let plan = ServerlessCacheEvictionPlan::plan(&entries, 150);
1000 assert_eq!(
1001 plan.evict,
1002 vec![
1003 PathBuf::from("cold-old.redpack"),
1004 PathBuf::from("cold-new.redpack"),
1005 ]
1006 );
1007 assert_eq!(plan.bytes_after_eviction, 100);
1008 }
1009
1010 #[test]
1011 fn extent_index_writes_and_reads_from_disk() {
1012 let root = temp_root("serverless-extent-index");
1013 let plan = ServerlessFilePlan::new(&root, "db", 5);
1014 let mut index = ServerlessExtentIndex::new(5);
1015 index.push(
1016 ServerlessExtentRef::new(
1017 "events",
1018 b"2026-01".to_vec(),
1019 b"2026-02".to_vec(),
1020 "events-2026-01.redpack",
1021 128,
1022 b"payload",
1023 true,
1024 )
1025 .expect("extent"),
1026 );
1027
1028 index
1029 .write_to_path(plan.extent_index_path())
1030 .expect("write extent index");
1031 assert_eq!(
1032 ServerlessExtentIndex::read_from_path(plan.extent_index_path())
1033 .expect("read extent index"),
1034 index
1035 );
1036 let _ = std::fs::remove_dir_all(root);
1037 }
1038
1039 #[test]
1040 fn manifest_and_boot_index_write_and_read_from_disk() {
1041 let root = temp_root("serverless-manifest");
1042 let plan = ServerlessFilePlan::new(&root, "db", 1);
1043
1044 let mut manifest = ServerlessManifest::new("db", 1);
1045 manifest.push(ServerlessManifestEntry::new(
1046 ServerlessPackKind::Manifest,
1047 "manifest.redpack",
1048 128,
1049 0xCAFE_BABE,
1050 ));
1051 manifest
1052 .write_to_path(plan.manifest_path())
1053 .expect("write manifest");
1054
1055 let boot_index = ServerlessBootIndex::from_plan(&plan);
1056 boot_index
1057 .write_to_path(plan.boot_index_path())
1058 .expect("write boot index");
1059
1060 assert_eq!(
1061 ServerlessManifest::read_from_path(plan.manifest_path()).expect("read manifest"),
1062 manifest
1063 );
1064 assert_eq!(
1065 ServerlessBootIndex::read_from_path(plan.boot_index_path()).expect("read boot index"),
1066 boot_index
1067 );
1068
1069 let _ = std::fs::remove_dir_all(root);
1070 }
1071
1072 #[test]
1073 fn publish_core_generation_writes_required_packs_before_current_pointer() {
1074 let root = temp_root("serverless-publish-core");
1075 let plan = ServerlessFilePlan::new(&root, "db", 12);
1076 let mut extent_index = ServerlessExtentIndex::new(12);
1077 extent_index.push(
1078 ServerlessExtentRef::new(
1079 "events",
1080 b"a".to_vec(),
1081 b"z".to_vec(),
1082 "collection-data.redpack",
1083 0,
1084 b"collection-bytes",
1085 true,
1086 )
1087 .expect("extent"),
1088 );
1089
1090 let pointer = plan
1091 .publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1092 .expect("publish core generation");
1093 assert_eq!(pointer.generation, 12);
1094 assert_eq!(plan.read_current_pointer().expect("read current"), pointer);
1095 assert_eq!(
1096 plan.read_current_pointer_verified()
1097 .expect("read verified current"),
1098 pointer
1099 );
1100 assert!(plan.boot_index_path().exists());
1101 assert!(plan.extent_index_path().exists());
1102 assert!(plan.collection_data_path().exists());
1103 assert!(plan.secondary_index_path().exists());
1104
1105 let manifest =
1106 ServerlessManifest::read_from_path(plan.manifest_path()).expect("read manifest");
1107 plan.validate_complete_generation(&manifest)
1108 .expect("complete generation validates");
1109 assert!(manifest
1110 .entries
1111 .iter()
1112 .any(|entry| entry.kind == ServerlessPackKind::ExtentIndex));
1113 assert!(manifest
1114 .entries
1115 .iter()
1116 .any(|entry| entry.kind == ServerlessPackKind::CollectionData));
1117 assert!(manifest
1118 .entries
1119 .iter()
1120 .any(|entry| entry.kind == ServerlessPackKind::SecondaryIndex));
1121
1122 let _ = std::fs::remove_dir_all(root);
1123 }
1124
1125 #[test]
1126 fn verified_current_pointer_rejects_missing_or_corrupt_generation() {
1127 let root = temp_root("serverless-current-verified");
1128 let plan = ServerlessFilePlan::new(&root, "db", 14);
1129 let mut extent_index = ServerlessExtentIndex::new(14);
1130 extent_index.push(
1131 ServerlessExtentRef::new(
1132 "events",
1133 b"a".to_vec(),
1134 b"z".to_vec(),
1135 "collection-data.redpack",
1136 0,
1137 b"collection-bytes",
1138 true,
1139 )
1140 .expect("extent"),
1141 );
1142 let pointer = plan
1143 .publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1144 .expect("publish complete generation");
1145 assert_eq!(
1146 plan.read_current_pointer_verified()
1147 .expect("verified pointer before corruption"),
1148 pointer
1149 );
1150
1151 std::fs::remove_file(plan.collection_data_path()).expect("remove required pack");
1152 let err = plan
1153 .read_current_pointer_verified()
1154 .expect_err("verified pointer must reject missing required pack");
1155 assert!(
1156 err.to_string().contains("No such file") || err.to_string().contains("not found"),
1157 "{err}"
1158 );
1159
1160 let _ = std::fs::remove_dir_all(root);
1161 }
1162
1163 #[test]
1164 fn verified_current_pointer_rejects_manifest_hash_mismatch() {
1165 let root = temp_root("serverless-current-manifest-hash");
1166 let plan = ServerlessFilePlan::new(&root, "db", 15);
1167 let mut extent_index = ServerlessExtentIndex::new(15);
1168 extent_index.push(
1169 ServerlessExtentRef::new(
1170 "events",
1171 b"a".to_vec(),
1172 b"z".to_vec(),
1173 "collection-data.redpack",
1174 0,
1175 b"collection-bytes",
1176 true,
1177 )
1178 .expect("extent"),
1179 );
1180 plan.publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1181 .expect("publish complete generation");
1182
1183 std::fs::write(plan.manifest_path(), b"corrupt-manifest").expect("corrupt manifest");
1184 let err = plan
1185 .read_current_pointer_verified()
1186 .expect_err("verified pointer must reject manifest hash mismatch");
1187 assert!(
1188 err.to_string().contains("manifest")
1189 && (err.to_string().contains("bytes")
1190 || err.to_string().contains("checksum")
1191 || err.to_string().contains("hash")),
1192 "{err}"
1193 );
1194
1195 let _ = std::fs::remove_dir_all(root);
1196 }
1197
1198 #[test]
1199 fn publish_pointer_rejects_incomplete_or_corrupt_generation() {
1200 let root = temp_root("serverless-publish-rejects");
1201 let plan = ServerlessFilePlan::new(&root, "db", 13);
1202 let mut manifest = ServerlessManifest::new("db", 13);
1203 manifest.push(ServerlessManifestEntry::from_bytes(
1204 ServerlessPackKind::BootIndex,
1205 "boot-index.redpack",
1206 b"boot",
1207 ));
1208 manifest
1209 .write_to_path(plan.manifest_path())
1210 .expect("write incomplete manifest");
1211
1212 let err = plan
1213 .publish_generation_pointer(&manifest)
1214 .expect_err("incomplete generation rejected");
1215 assert!(err.to_string().contains("missing required"), "{err}");
1216 assert!(!plan.current_pointer_path().exists());
1217
1218 let mut extent_index = ServerlessExtentIndex::new(13);
1219 extent_index.push(
1220 ServerlessExtentRef::new(
1221 "events",
1222 b"a".to_vec(),
1223 b"z".to_vec(),
1224 "collection-data.redpack",
1225 0,
1226 b"collection-bytes",
1227 true,
1228 )
1229 .expect("extent"),
1230 );
1231 plan.publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1232 .expect("publish complete generation");
1233 std::fs::write(plan.collection_data_path(), b"collection-ByTes")
1234 .expect("corrupt collection pack");
1235 let manifest =
1236 ServerlessManifest::read_from_path(plan.manifest_path()).expect("read manifest");
1237 let err = plan
1238 .publish_generation_pointer(&manifest)
1239 .expect_err("corrupt generation rejected");
1240 assert!(err.to_string().contains("checksum mismatch"), "{err}");
1241
1242 let _ = std::fs::remove_dir_all(root);
1243 }
1244
1245 #[test]
1246 fn incomplete_generation_publish_preserves_existing_current_pointer() {
1247 let root = temp_root("serverless-current-preserved");
1248 let first = ServerlessFilePlan::new(&root, "db", 1);
1249 let mut first_index = ServerlessExtentIndex::new(1);
1250 first_index.push(
1251 ServerlessExtentRef::new(
1252 "events",
1253 b"a".to_vec(),
1254 b"z".to_vec(),
1255 "collection-data.redpack",
1256 0,
1257 b"first",
1258 true,
1259 )
1260 .expect("extent"),
1261 );
1262 let first_pointer = first
1263 .publish_core_generation(&first_index, b"first", b"secondary")
1264 .expect("publish first generation");
1265
1266 let second = ServerlessFilePlan::new(&root, "db", 2);
1267 let mut incomplete = ServerlessManifest::new("db", 2);
1268 incomplete.push(ServerlessManifestEntry::from_bytes(
1269 ServerlessPackKind::BootIndex,
1270 "boot-index.redpack",
1271 b"boot",
1272 ));
1273 incomplete
1274 .write_to_path(second.manifest_path())
1275 .expect("write incomplete manifest");
1276 let err = second
1277 .publish_generation_pointer(&incomplete)
1278 .expect_err("incomplete generation rejected");
1279 assert!(err.to_string().contains("missing required"), "{err}");
1280
1281 assert_eq!(
1282 first.read_current_pointer().expect("read current"),
1283 first_pointer
1284 );
1285
1286 let _ = std::fs::remove_dir_all(root);
1287 }
1288
1289 #[test]
1290 fn file_plan_derives_cache_generation_and_hot_boot_paths() {
1291 let plan = ServerlessFilePlan::for_data_path("/tmp/reddb/main.rdb", 41);
1292 assert_eq!(plan.root, PathBuf::from("/tmp/reddb/main.serverless"));
1293 assert_eq!(plan.namespace, "main");
1294
1295 let next = plan
1296 .for_generation(42)
1297 .with_cache_policy(ServerlessCachePolicy {
1298 keep_boot_index_local: true,
1299 keep_hot_snapshot_local: true,
1300 max_hot_bytes: 4096,
1301 });
1302 assert_eq!(next.root, plan.root);
1303 assert_eq!(next.namespace, plan.namespace);
1304 assert_eq!(next.generation, 42);
1305
1306 let cache = next.local_cache();
1307 assert_eq!(
1308 cache.root,
1309 PathBuf::from("/tmp/reddb/main.serverless/main/cache")
1310 );
1311 assert_eq!(cache.generation, 42);
1312
1313 let hot = ServerlessBootPlan::hot(&next);
1314 assert_eq!(
1315 hot.required_first,
1316 vec![
1317 next.boot_index_path(),
1318 next.hot_snapshot_path(),
1319 next.wal_tail_path(),
1320 ]
1321 );
1322 assert_eq!(
1323 hot.lazy_after_open,
1324 vec![
1325 next.manifest_path(),
1326 next.collection_data_path(),
1327 next.secondary_index_path(),
1328 ]
1329 );
1330 }
1331
1332 #[test]
1333 fn publish_generation_pointer_rejects_manifest_identity_mismatch() {
1334 let root = temp_root("serverless-pointer-identity");
1335 let plan = ServerlessFilePlan::new(&root, "db", 10);
1336
1337 let wrong_namespace = ServerlessManifest::new("other", 10);
1338 let err = plan
1339 .publish_generation_pointer(&wrong_namespace)
1340 .expect_err("namespace mismatch rejected before publish");
1341 assert!(err.to_string().contains("namespace"), "{err}");
1342
1343 let wrong_generation = ServerlessManifest::new("db", 11);
1344 let err = plan
1345 .publish_generation_pointer(&wrong_generation)
1346 .expect_err("generation mismatch rejected before publish");
1347 assert!(err.to_string().contains("generation"), "{err}");
1348
1349 assert!(!plan.current_pointer_path().exists());
1350 let _ = std::fs::remove_dir_all(root);
1351 }
1352
1353 #[test]
1354 fn verified_current_pointer_rejects_pointer_shape_and_manifest_identity() {
1355 let root = temp_root("serverless-current-pointer-shape");
1356 let plan = ServerlessFilePlan::new(&root, "db", 10);
1357 let manifest = ServerlessManifest::new("db", 10);
1358 let manifest_bytes = manifest.encode();
1359
1360 let namespace_mismatch = ServerlessGenerationPointer {
1361 namespace: "other".to_string(),
1362 generation: 10,
1363 manifest_relative_path: PathBuf::from("g00000000000000000010/manifest.redpack"),
1364 manifest_bytes: manifest_bytes.len() as u64,
1365 manifest_checksum: crc32(&manifest_bytes),
1366 manifest_content_hash: ServerlessContentHash::from_bytes(&manifest_bytes),
1367 };
1368 namespace_mismatch
1369 .write_to_path(plan.current_pointer_path())
1370 .expect("write namespace mismatch pointer");
1371 let err = plan
1372 .read_current_pointer_verified()
1373 .expect_err("pointer namespace mismatch rejected");
1374 assert!(err.to_string().contains("namespace"), "{err}");
1375
1376 let path_mismatch = ServerlessGenerationPointer {
1377 namespace: "db".to_string(),
1378 generation: 10,
1379 manifest_relative_path: PathBuf::from("g00000000000000000010/boot-index.redpack"),
1380 manifest_bytes: manifest_bytes.len() as u64,
1381 manifest_checksum: crc32(&manifest_bytes),
1382 manifest_content_hash: ServerlessContentHash::from_bytes(&manifest_bytes),
1383 };
1384 path_mismatch
1385 .write_to_path(plan.current_pointer_path())
1386 .expect("write path mismatch pointer");
1387 let err = plan
1388 .read_current_pointer_verified()
1389 .expect_err("pointer manifest path mismatch rejected");
1390 assert!(err.to_string().contains("manifest path"), "{err}");
1391
1392 let wrong_manifest_namespace = ServerlessManifest::new("other", 10);
1393 wrong_manifest_namespace
1394 .write_to_path(plan.manifest_path())
1395 .expect("write namespace mismatched manifest");
1396 let pointer = ServerlessGenerationPointer::from_manifest(&plan, &wrong_manifest_namespace);
1397 pointer
1398 .write_to_path(plan.current_pointer_path())
1399 .expect("write pointer");
1400 let err = plan
1401 .read_current_pointer_verified()
1402 .expect_err("manifest namespace mismatch rejected");
1403 assert!(err.to_string().contains("manifest namespace"), "{err}");
1404
1405 let wrong_generation_manifest = ServerlessManifest::new("db", 11);
1406 let wrong_generation_bytes = wrong_generation_manifest.encode();
1407 std::fs::write(plan.manifest_path(), &wrong_generation_bytes).expect("write manifest");
1408 let generation_mismatch = ServerlessGenerationPointer {
1409 namespace: "db".to_string(),
1410 generation: 10,
1411 manifest_relative_path: PathBuf::from("g00000000000000000010/manifest.redpack"),
1412 manifest_bytes: wrong_generation_bytes.len() as u64,
1413 manifest_checksum: crc32(&wrong_generation_bytes),
1414 manifest_content_hash: ServerlessContentHash::from_bytes(&wrong_generation_bytes),
1415 };
1416 generation_mismatch
1417 .write_to_path(plan.current_pointer_path())
1418 .expect("write generation mismatch pointer");
1419 let err = plan
1420 .read_current_pointer_verified()
1421 .expect_err("manifest generation mismatch rejected");
1422 assert!(err.to_string().contains("manifest generation"), "{err}");
1423
1424 let _ = std::fs::remove_dir_all(root);
1425 }
1426
1427 #[test]
1428 fn validate_complete_generation_rejects_pack_and_manifest_metadata_mismatch() {
1429 let root = temp_root("serverless-validate-metadata");
1430 let plan = ServerlessFilePlan::new(&root, "db", 16);
1431 let mut extent_index = ServerlessExtentIndex::new(16);
1432 extent_index.push(
1433 ServerlessExtentRef::new(
1434 "events",
1435 b"a".to_vec(),
1436 b"z".to_vec(),
1437 "collection-data.redpack",
1438 0,
1439 b"collection-bytes",
1440 true,
1441 )
1442 .expect("extent"),
1443 );
1444 plan.publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1445 .expect("publish complete generation");
1446 let manifest =
1447 ServerlessManifest::read_from_path(plan.manifest_path()).expect("read manifest");
1448
1449 let mut wrong_bytes = manifest.clone();
1450 wrong_bytes.entries[0].bytes += 1;
1451 let err = plan
1452 .validate_complete_generation(&wrong_bytes)
1453 .expect_err("pack byte length mismatch rejected");
1454 assert!(err.to_string().contains("expected"), "{err}");
1455
1456 let mut wrong_hash = manifest.clone();
1457 wrong_hash.entries[0].content_hash = ServerlessContentHash([1; CONTENT_HASH_LEN]);
1458 let err = plan
1459 .validate_complete_generation(&wrong_hash)
1460 .expect_err("pack content hash mismatch rejected");
1461 assert!(err.to_string().contains("content hash mismatch"), "{err}");
1462
1463 let mut manifest_mismatch = manifest.clone();
1464 manifest_mismatch.namespace = "other".to_string();
1465 let err = plan
1466 .validate_complete_generation(&manifest_mismatch)
1467 .expect_err("manifest-on-disk mismatch rejected");
1468 assert!(err.to_string().contains("manifest on disk"), "{err}");
1469
1470 let _ = std::fs::remove_dir_all(root);
1471 }
1472
1473 #[test]
1474 fn writer_lease_json_round_trips_and_preserves_fencing_token() {
1475 let lease = ServerlessWriterLease {
1476 database_key: "main".to_string(),
1477 holder_id: "writer-a".to_string(),
1478 term: 7,
1479 generation: 3,
1480 acquired_at_ms: 100,
1481 expires_at_ms: 200,
1482 };
1483
1484 let decoded = decode_serverless_writer_lease_json(
1485 &encode_serverless_writer_lease_json(&lease).expect("encode lease"),
1486 )
1487 .expect("decode lease");
1488
1489 assert_eq!(decoded, lease);
1490 assert_eq!(decoded.fencing_token(), (7, 3));
1491 assert!(!decoded.is_expired(199));
1492 assert!(decoded.is_expired(200));
1493 assert!(decoded.fenced_by_term(8));
1494 }
1495
1496 #[test]
1497 fn writer_lease_json_decodes_legacy_missing_term_as_base_term() {
1498 let decoded = decode_serverless_writer_lease_json(
1499 br#"{
1500 "database_key": "main",
1501 "holder_id": "writer-a",
1502 "generation": 3,
1503 "acquired_at_ms": 100,
1504 "expires_at_ms": 200
1505 }"#,
1506 )
1507 .expect("decode legacy lease");
1508
1509 assert_eq!(decoded.term, SERVERLESS_WRITER_LEASE_DEFAULT_TERM);
1510 }
1511
1512 #[test]
1513 fn writer_lease_artifact_names_are_deterministic() {
1514 assert_eq!(
1515 serverless_writer_lease_key("leases/", "main"),
1516 "leases/main.lease.json"
1517 );
1518 assert_eq!(
1519 serverless_writer_lease_temp_path("write", 10, 20, 30)
1520 .file_name()
1521 .and_then(|name| name.to_str()),
1522 Some("reddb-lease-write-10-20-30.json")
1523 );
1524 }
1525
1526 #[test]
1527 fn writer_lease_temp_file_round_trips_and_cleans_up() {
1528 let temp = ServerlessWriterLeaseTempFile::with_clock("write", 10, 20, 30);
1529 assert_eq!(
1530 temp.path().file_name().and_then(|name| name.to_str()),
1531 Some("reddb-lease-write-10-20-30.json")
1532 );
1533
1534 temp.write_bytes(b"{\"lease\":true}")
1535 .expect("write lease temp");
1536 assert_eq!(
1537 temp.read_bytes().expect("read lease temp"),
1538 b"{\"lease\":true}"
1539 );
1540 temp.cleanup().expect("cleanup lease temp");
1541 assert!(!temp.path().exists());
1542 }
1543
1544 fn temp_root(label: &str) -> PathBuf {
1545 std::env::temp_dir().join(format!(
1546 "reddb-file-{label}-{}-{}",
1547 std::process::id(),
1548 std::time::SystemTime::now()
1549 .duration_since(std::time::UNIX_EPOCH)
1550 .unwrap()
1551 .as_nanos()
1552 ))
1553 }
1554}