Skip to main content

reddb_file/serverless/
mod.rs

1//! Serverless file planning and boot artifact codecs.
2//!
3//! Serverless storage optimizes for cold start, hot restart, and object-store
4//! friendliness. The planner in this module deliberately returns deterministic
5//! artifact names so a runtime can fetch only the bytes required to boot, then
6//! lazily hydrate heavier packs.
7
8use std::fs::{self, File, OpenOptions};
9use std::io::Write;
10use std::path::{Path, PathBuf};
11
12use crate::embedded::{RdbFileError, RdbFileResult};
13
14mod boot;
15mod cache;
16mod extent;
17mod hydrate;
18mod lease;
19mod manifest;
20mod plan;
21mod pointer;
22mod secondary;
23
24pub use boot::*;
25pub use cache::*;
26pub use extent::*;
27pub use hydrate::*;
28pub use lease::*;
29pub use manifest::*;
30pub use plan::*;
31pub use pointer::*;
32pub use secondary::*;
33
34const SERVERLESS_MANIFEST_MAGIC: &[u8; 8] = b"RDPKMNF1";
35const SERVERLESS_BOOT_INDEX_MAGIC: &[u8; 8] = b"RDPKBIX1";
36const SERVERLESS_GENERATION_POINTER_MAGIC: &[u8; 8] = b"RDPKCUR1";
37const SERVERLESS_EXTENT_INDEX_MAGIC: &[u8; 8] = b"RDPKEXT1";
38const SERVERLESS_SECONDARY_INDEX_MAGIC: &[u8; 8] = b"RDPKSIX1";
39const SERVERLESS_ARTIFACT_VERSION: u16 = 1;
40const CHECKSUM_LEN: usize = 4;
41const CONTENT_HASH_LEN: usize = 32;
42const SERVERLESS_CRASH_INJECT_ENV: &str = "REDDB_SERVERLESS_CRASH_AT";
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
45pub enum ServerlessPackKind {
46    Manifest,
47    BootIndex,
48    ExtentIndex,
49    HotSnapshot,
50    WalTail,
51    CollectionData,
52    SecondaryIndex,
53    ColdArchive,
54}
55
56impl ServerlessPackKind {
57    pub const fn as_str(self) -> &'static str {
58        match self {
59            Self::Manifest => "manifest",
60            Self::BootIndex => "boot-index",
61            Self::ExtentIndex => "extent-index",
62            Self::HotSnapshot => "hot-snapshot",
63            Self::WalTail => "wal-tail",
64            Self::CollectionData => "collection-data",
65            Self::SecondaryIndex => "secondary-index",
66            Self::ColdArchive => "cold-archive",
67        }
68    }
69}
70
71impl TryFrom<u8> for ServerlessPackKind {
72    type Error = RdbFileError;
73
74    fn try_from(value: u8) -> RdbFileResult<Self> {
75        match value {
76            1 => Ok(Self::Manifest),
77            2 => Ok(Self::BootIndex),
78            3 => Ok(Self::ExtentIndex),
79            4 => Ok(Self::HotSnapshot),
80            5 => Ok(Self::WalTail),
81            6 => Ok(Self::CollectionData),
82            7 => Ok(Self::SecondaryIndex),
83            8 => Ok(Self::ColdArchive),
84            other => Err(RdbFileError::InvalidOperation(format!(
85                "unknown serverless pack kind {other}"
86            ))),
87        }
88    }
89}
90
91impl From<ServerlessPackKind> for u8 {
92    fn from(value: ServerlessPackKind) -> Self {
93        match value {
94            ServerlessPackKind::Manifest => 1,
95            ServerlessPackKind::BootIndex => 2,
96            ServerlessPackKind::ExtentIndex => 3,
97            ServerlessPackKind::HotSnapshot => 4,
98            ServerlessPackKind::WalTail => 5,
99            ServerlessPackKind::CollectionData => 6,
100            ServerlessPackKind::SecondaryIndex => 7,
101            ServerlessPackKind::ColdArchive => 8,
102        }
103    }
104}
105
106#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
107pub struct ServerlessContentHash(pub [u8; CONTENT_HASH_LEN]);
108
109impl ServerlessContentHash {
110    pub const ZERO: Self = Self([0; CONTENT_HASH_LEN]);
111
112    pub fn from_bytes(bytes: &[u8]) -> Self {
113        Self(*blake3::hash(bytes).as_bytes())
114    }
115
116    pub fn is_zero(self) -> bool {
117        self.0 == [0; CONTENT_HASH_LEN]
118    }
119}
120
121fn kind_for_artifact_path(path: &Path) -> ServerlessPackKind {
122    match path.file_stem().and_then(|stem| stem.to_str()) {
123        Some("manifest") => ServerlessPackKind::Manifest,
124        Some("boot-index") => ServerlessPackKind::BootIndex,
125        Some("extent-index") => ServerlessPackKind::ExtentIndex,
126        Some("hot-snapshot") => ServerlessPackKind::HotSnapshot,
127        Some("wal-tail") => ServerlessPackKind::WalTail,
128        Some("collection-data") => ServerlessPackKind::CollectionData,
129        Some("secondary-index") => ServerlessPackKind::SecondaryIndex,
130        Some("cold-archive") => ServerlessPackKind::ColdArchive,
131        _ => ServerlessPackKind::ColdArchive,
132    }
133}
134
135fn relative_to_generation_dir(path: &Path) -> PathBuf {
136    path.file_name()
137        .map(PathBuf::from)
138        .unwrap_or_else(|| path.to_path_buf())
139}
140
141fn write_bytes(path: impl AsRef<Path>, bytes: &[u8]) -> RdbFileResult<()> {
142    let path = path.as_ref();
143    if let Some(parent) = path.parent() {
144        fs::create_dir_all(parent)?;
145    }
146    let tmp_path = crate::layout::atomic_temp_path(path);
147    {
148        let mut file = OpenOptions::new()
149            .create(true)
150            .truncate(true)
151            .write(true)
152            .open(&tmp_path)?;
153        file.write_all(bytes)?;
154        crash_inject("serverless_pack_after_tmp_write");
155        file.sync_all()?;
156        crash_inject("serverless_pack_after_tmp_sync");
157    }
158    fs::rename(&tmp_path, path)?;
159    crash_inject("serverless_pack_after_rename");
160    if let Some(parent) = path.parent() {
161        if let Ok(dir) = File::open(parent) {
162            let _ = dir.sync_all();
163        }
164    }
165    crash_inject("serverless_pack_after_dir_sync");
166    Ok(())
167}
168
169fn write_current_pointer_bytes(path: impl AsRef<Path>, bytes: &[u8]) -> RdbFileResult<()> {
170    let path = path.as_ref();
171    if let Some(parent) = path.parent() {
172        fs::create_dir_all(parent)?;
173    }
174    let tmp_path = crate::layout::atomic_temp_path(path);
175    {
176        let mut file = OpenOptions::new()
177            .create(true)
178            .truncate(true)
179            .write(true)
180            .open(&tmp_path)?;
181        file.write_all(bytes)?;
182        crash_inject("current_pointer_after_tmp_write");
183        file.sync_all()?;
184        crash_inject("current_pointer_after_tmp_sync");
185    }
186    fs::rename(&tmp_path, path)?;
187    crash_inject("current_pointer_after_rename");
188    if let Some(parent) = path.parent() {
189        if let Ok(dir) = File::open(parent) {
190            let _ = dir.sync_all();
191        }
192    }
193    crash_inject("current_pointer_after_dir_sync");
194    Ok(())
195}
196
197fn crash_inject(point: &str) {
198    if std::env::var(SERVERLESS_CRASH_INJECT_ENV).ok().as_deref() == Some(point) {
199        std::process::exit(173);
200    }
201}
202
203fn verify_checksum(bytes: &[u8]) -> RdbFileResult<()> {
204    let Some(checksum_offset) = bytes.len().checked_sub(CHECKSUM_LEN) else {
205        return Err(RdbFileError::InvalidOperation(
206            "serverless artifact too short".into(),
207        ));
208    };
209    let stored = u32::from_le_bytes(bytes[checksum_offset..].try_into().unwrap());
210    let computed = crc32(&bytes[..checksum_offset]);
211    if stored != computed {
212        return Err(RdbFileError::InvalidOperation(format!(
213            "serverless artifact checksum mismatch: stored {stored:#010x}, computed {computed:#010x}"
214        )));
215    }
216    Ok(())
217}
218
219fn crc32(data: &[u8]) -> u32 {
220    let mut hasher = crc32fast::Hasher::new();
221    hasher.update(data);
222    hasher.finalize()
223}
224
225fn expect_magic(bytes: &[u8], cursor: &mut usize, magic: &[u8]) -> RdbFileResult<()> {
226    let actual = take_bytes(bytes, cursor, magic.len())?;
227    if actual != magic {
228        return Err(RdbFileError::InvalidOperation(
229            "invalid serverless artifact magic".into(),
230        ));
231    }
232    Ok(())
233}
234
235fn put_u16(out: &mut Vec<u8>, value: u16) {
236    out.extend_from_slice(&value.to_le_bytes());
237}
238
239fn put_u32(out: &mut Vec<u8>, value: u32) {
240    out.extend_from_slice(&value.to_le_bytes());
241}
242
243fn put_u64(out: &mut Vec<u8>, value: u64) {
244    out.extend_from_slice(&value.to_le_bytes());
245}
246
247fn put_string(out: &mut Vec<u8>, value: &str) {
248    put_u32(out, value.len() as u32);
249    out.extend_from_slice(value.as_bytes());
250}
251
252fn put_bytes(out: &mut Vec<u8>, value: &[u8]) {
253    put_u32(out, value.len() as u32);
254    out.extend_from_slice(value);
255}
256
257fn put_content_hash(out: &mut Vec<u8>, value: ServerlessContentHash) {
258    out.extend_from_slice(&value.0);
259}
260
261fn take_bytes<'a>(bytes: &'a [u8], cursor: &mut usize, len: usize) -> RdbFileResult<&'a [u8]> {
262    let end = cursor
263        .checked_add(len)
264        .ok_or_else(|| RdbFileError::InvalidOperation("serverless cursor overflow".into()))?;
265    if end > bytes.len().saturating_sub(CHECKSUM_LEN) {
266        return Err(RdbFileError::InvalidOperation(
267            "serverless artifact truncated".into(),
268        ));
269    }
270    let value = &bytes[*cursor..end];
271    *cursor = end;
272    Ok(value)
273}
274
275fn reject_trailing_bytes(bytes: &[u8], cursor: usize) -> RdbFileResult<()> {
276    if cursor != bytes.len().saturating_sub(CHECKSUM_LEN) {
277        return Err(RdbFileError::InvalidOperation(
278            "serverless artifact has trailing bytes".into(),
279        ));
280    }
281    Ok(())
282}
283
284fn take_u8(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u8> {
285    Ok(take_bytes(bytes, cursor, 1)?[0])
286}
287
288fn take_u16(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u16> {
289    Ok(u16::from_le_bytes(
290        take_bytes(bytes, cursor, 2)?.try_into().unwrap(),
291    ))
292}
293
294fn take_u32(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u32> {
295    Ok(u32::from_le_bytes(
296        take_bytes(bytes, cursor, 4)?.try_into().unwrap(),
297    ))
298}
299
300fn take_u64(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<u64> {
301    Ok(u64::from_le_bytes(
302        take_bytes(bytes, cursor, 8)?.try_into().unwrap(),
303    ))
304}
305
306fn take_string(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<String> {
307    let len = take_u32(bytes, cursor)? as usize;
308    let raw = take_bytes(bytes, cursor, len)?;
309    std::str::from_utf8(raw)
310        .map(|value| value.to_string())
311        .map_err(|err| RdbFileError::InvalidOperation(format!("invalid utf-8 string: {err}")))
312}
313
314fn take_vec_bytes(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<Vec<u8>> {
315    let len = take_u32(bytes, cursor)? as usize;
316    Ok(take_bytes(bytes, cursor, len)?.to_vec())
317}
318
319fn take_content_hash(bytes: &[u8], cursor: &mut usize) -> RdbFileResult<ServerlessContentHash> {
320    let raw = take_bytes(bytes, cursor, CONTENT_HASH_LEN)?;
321    let mut hash = [0u8; CONTENT_HASH_LEN];
322    hash.copy_from_slice(raw);
323    Ok(ServerlessContentHash(hash))
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329
330    #[test]
331    fn serverless_paths_are_generation_scoped_and_deterministic() {
332        let plan = ServerlessFilePlan::new("/tmp/reddb", "tenant-a/db", 42);
333        assert_eq!(
334            plan.manifest_path(),
335            PathBuf::from("/tmp/reddb/tenant-a/db/g00000000000000000042/manifest.redpack")
336        );
337        assert_eq!(
338            plan.boot_index_path(),
339            PathBuf::from("/tmp/reddb/tenant-a/db/g00000000000000000042/boot-index.redpack")
340        );
341        assert!(ServerlessFilePlan::is_generation_dir(Path::new(
342            "g00000000000000000042"
343        )));
344        assert!(!ServerlessFilePlan::is_generation_dir(Path::new("g42")));
345    }
346
347    #[test]
348    fn cold_start_fetches_manifest_boot_snapshot_then_wal_tail() {
349        let plan = ServerlessFilePlan::new("/tmp/reddb", "db", 7);
350        let boot = ServerlessBootPlan::cold(&plan);
351        assert_eq!(boot.required_first[0], plan.manifest_path());
352        assert_eq!(boot.required_first[1], plan.boot_index_path());
353        assert_eq!(boot.required_first[2], plan.extent_index_path());
354        assert_eq!(boot.required_first[3], plan.hot_snapshot_path());
355        assert_eq!(boot.required_first[4], plan.wal_tail_path());
356        assert_eq!(boot.lazy_after_open.len(), 3);
357    }
358
359    #[test]
360    fn manifest_round_trips_with_crc_checked_binary_codec() {
361        let mut manifest = ServerlessManifest::new("tenant/db", 11);
362        manifest.push(ServerlessManifestEntry::from_bytes(
363            ServerlessPackKind::WalTail,
364            "wal-tail.redpack",
365            b"wal tail payload",
366        ));
367        manifest.push(ServerlessManifestEntry::from_bytes(
368            ServerlessPackKind::BootIndex,
369            "boot-index.redpack",
370            b"boot index payload",
371        ));
372
373        let encoded = manifest.encode();
374        let decoded = ServerlessManifest::decode(&encoded).expect("decode manifest");
375        assert_eq!(decoded, manifest);
376        assert!(!decoded.entries[0].content_hash.is_zero());
377
378        let mut corrupt = encoded;
379        let last_payload_byte = corrupt.len() - CHECKSUM_LEN - 1;
380        corrupt[last_payload_byte] ^= 0x01;
381        let err = ServerlessManifest::decode(&corrupt).expect_err("checksum catches corruption");
382        assert!(err.to_string().contains("checksum mismatch"), "{err}");
383    }
384
385    #[test]
386    fn boot_index_round_trips_and_preserves_coldstart_order() {
387        let plan = ServerlessFilePlan::new("/tmp/reddb", "db", 9);
388        let index = ServerlessBootIndex::from_plan(&plan);
389
390        assert_eq!(
391            index.required_first(),
392            vec![
393                PathBuf::from("manifest.redpack"),
394                PathBuf::from("boot-index.redpack"),
395                PathBuf::from("extent-index.redpack"),
396                PathBuf::from("hot-snapshot.redpack"),
397                PathBuf::from("wal-tail.redpack"),
398            ]
399        );
400        assert_eq!(
401            index.lazy_after_open(),
402            vec![
403                PathBuf::from("collection-data.redpack"),
404                PathBuf::from("secondary-index.redpack"),
405                PathBuf::from("cold-archive.redpack"),
406            ]
407        );
408
409        let decoded = ServerlessBootIndex::decode(&index.encode()).expect("decode boot index");
410        assert_eq!(decoded, index);
411    }
412
413    #[test]
414    fn collection_data_extent_ref_uses_canonical_pack_path() {
415        let plan = ServerlessFilePlan::new("/tmp/reddb", "db", 9);
416        let payload = b"collection snapshot bytes";
417        let extent = plan
418            .collection_data_extent_ref("events", 12, payload, true)
419            .expect("extent ref");
420
421        assert_eq!(extent.collection, "events");
422        assert_eq!(
423            extent.relative_path,
424            PathBuf::from("collection-data.redpack")
425        );
426        assert_eq!(extent.offset, 12);
427        assert_eq!(extent.bytes, payload.len() as u64);
428        assert!(extent.hot);
429    }
430
431    #[test]
432    fn manifest_rejects_trailing_payload_bytes() {
433        let manifest = ServerlessManifest::new("tenant/db", 11);
434        let mut encoded = manifest.encode();
435        encoded.truncate(encoded.len() - CHECKSUM_LEN);
436        encoded.push(0xAA);
437        let checksum = crc32(&encoded);
438        put_u32(&mut encoded, checksum);
439
440        let err = ServerlessManifest::decode(&encoded).expect_err("trailing bytes rejected");
441        assert!(err.to_string().contains("trailing bytes"), "{err}");
442    }
443
444    #[test]
445    fn generation_pointer_round_trips_and_points_to_immutable_manifest() {
446        let plan = ServerlessFilePlan::new("/tmp/reddb", "tenant/db", 19);
447        let mut manifest = ServerlessManifest::new("tenant/db", 19);
448        manifest.push(ServerlessManifestEntry::from_bytes(
449            ServerlessPackKind::HotSnapshot,
450            "hot-snapshot.redpack",
451            b"snapshot",
452        ));
453
454        let pointer = ServerlessGenerationPointer::from_manifest(&plan, &manifest);
455        assert_eq!(pointer.generation, 19);
456        assert_eq!(
457            pointer.manifest_relative_path,
458            PathBuf::from("g00000000000000000019/manifest.redpack")
459        );
460        assert!(!pointer.manifest_content_hash.is_zero());
461
462        let decoded =
463            ServerlessGenerationPointer::decode(&pointer.encode()).expect("decode pointer");
464        assert_eq!(decoded, pointer);
465    }
466
467    #[test]
468    fn extent_index_finds_key_ranges_and_hot_prefetch_paths() {
469        let mut index = ServerlessExtentIndex::new(21);
470        index.push(
471            ServerlessExtentRef::new(
472                "orders",
473                b"a".to_vec(),
474                b"m".to_vec(),
475                "orders-000.redpack",
476                0,
477                b"orders-a-m",
478                true,
479            )
480            .expect("extent"),
481        );
482        index.push(
483            ServerlessExtentRef::new(
484                "orders",
485                b"m".to_vec(),
486                b"z".to_vec(),
487                "orders-001.redpack",
488                0,
489                b"orders-m-z",
490                false,
491            )
492            .expect("extent"),
493        );
494
495        let matches = index.extents_for_key("orders", b"b");
496        assert_eq!(matches.len(), 1);
497        assert_eq!(
498            matches[0].relative_path,
499            PathBuf::from("orders-000.redpack")
500        );
501        assert_eq!(
502            index.hot_prefetch_paths(),
503            vec![PathBuf::from("orders-000.redpack")]
504        );
505
506        let decoded = ServerlessExtentIndex::decode(&index.encode()).expect("decode extent index");
507        assert_eq!(decoded, index);
508    }
509
510    #[test]
511    fn hydration_plan_uses_only_matching_extents() {
512        let mut index = ServerlessExtentIndex::new(22);
513        index.push(
514            ServerlessExtentRef::new(
515                "orders",
516                b"a".to_vec(),
517                b"m".to_vec(),
518                "orders-000.redpack",
519                64,
520                b"orders-a-m",
521                true,
522            )
523            .expect("extent"),
524        );
525        index.push(
526            ServerlessExtentRef::new(
527                "orders",
528                b"m".to_vec(),
529                b"z".to_vec(),
530                "orders-001.redpack",
531                128,
532                b"orders-m-z",
533                false,
534            )
535            .expect("extent"),
536        );
537
538        let plan = index.hydration_plan_for_key("orders", b"n");
539        assert_eq!(plan.requests.len(), 1);
540        assert_eq!(
541            plan.requests[0].relative_path,
542            PathBuf::from("orders-001.redpack")
543        );
544        assert_eq!(plan.requests[0].offset, 128);
545        assert_eq!(plan.total_bytes(), b"orders-m-z".len() as u64);
546
547        let hot = index.hot_hydration_plan();
548        assert_eq!(hot.requests.len(), 1);
549        assert_eq!(
550            hot.requests[0].relative_path,
551            PathBuf::from("orders-000.redpack")
552        );
553    }
554
555    #[test]
556    fn hydration_plan_for_range_uses_overlapping_extents() {
557        let mut index = ServerlessExtentIndex::new(27);
558        index.push(
559            ServerlessExtentRef::new(
560                "orders",
561                b"a".to_vec(),
562                b"f".to_vec(),
563                "orders-000.redpack",
564                0,
565                b"orders-a-f",
566                true,
567            )
568            .expect("extent"),
569        );
570        index.push(
571            ServerlessExtentRef::new(
572                "orders",
573                b"f".to_vec(),
574                b"p".to_vec(),
575                "orders-001.redpack",
576                64,
577                b"orders-f-p",
578                false,
579            )
580            .expect("extent"),
581        );
582        index.push(
583            ServerlessExtentRef::new(
584                "orders",
585                b"p".to_vec(),
586                b"z".to_vec(),
587                "orders-002.redpack",
588                128,
589                b"orders-p-z",
590                false,
591            )
592            .expect("extent"),
593        );
594        index.push(
595            ServerlessExtentRef::new(
596                "users",
597                b"a".to_vec(),
598                b"z".to_vec(),
599                "users-000.redpack",
600                0,
601                b"users-a-z",
602                false,
603            )
604            .expect("extent"),
605        );
606
607        let plan = index
608            .hydration_plan_for_range("orders", b"e", b"q")
609            .expect("range plan");
610        assert_eq!(plan.requests.len(), 3);
611        assert_eq!(
612            plan.requests
613                .iter()
614                .map(|request| request.relative_path.clone())
615                .collect::<Vec<_>>(),
616            vec![
617                PathBuf::from("orders-000.redpack"),
618                PathBuf::from("orders-001.redpack"),
619                PathBuf::from("orders-002.redpack"),
620            ]
621        );
622
623        let err = index
624            .hydration_plan_for_range("orders", b"q", b"e")
625            .expect_err("invalid range rejected");
626        assert!(err.to_string().contains("range_start"), "{err}");
627    }
628
629    #[test]
630    fn secondary_index_round_trips_and_builds_collection_hydration_plan() {
631        let mut extent_index = ServerlessExtentIndex::new(29);
632        extent_index.push(
633            ServerlessExtentRef::new(
634                "orders",
635                b"a".to_vec(),
636                b"m".to_vec(),
637                "collection-data.redpack",
638                0,
639                b"orders-left",
640                true,
641            )
642            .expect("orders left"),
643        );
644        extent_index.push(
645            ServerlessExtentRef::new(
646                "orders",
647                b"m".to_vec(),
648                b"z".to_vec(),
649                "collection-data.redpack",
650                11,
651                b"orders-right",
652                false,
653            )
654            .expect("orders right"),
655        );
656        extent_index.push(
657            ServerlessExtentRef::new(
658                "users",
659                b"a".to_vec(),
660                b"z".to_vec(),
661                "collection-data.redpack",
662                23,
663                b"users",
664                false,
665            )
666            .expect("users"),
667        );
668
669        let secondary = ServerlessSecondaryIndex::from_extent_index(&extent_index);
670        let decoded =
671            ServerlessSecondaryIndex::decode(&secondary.encode()).expect("decode secondary index");
672        assert_eq!(decoded, secondary);
673
674        let hydration = decoded.hydration_plan_for_collection("orders");
675        assert_eq!(hydration.generation, 29);
676        assert_eq!(hydration.requests.len(), 2);
677        assert_eq!(hydration.total_bytes(), 23);
678        assert!(hydration.requests[0].content_hash != ServerlessContentHash::ZERO);
679    }
680
681    #[test]
682    fn hydrate_local_plan_reads_only_requested_byte_ranges() {
683        let root = temp_root("serverless-hydrate-range");
684        let plan = ServerlessFilePlan::new(&root, "db", 23);
685        let collection_payload = b"aaaabbbbcccc";
686        let mut index = ServerlessExtentIndex::new(23);
687        index.push(
688            ServerlessExtentRef::new(
689                "orders",
690                b"a".to_vec(),
691                b"m".to_vec(),
692                "collection-data.redpack",
693                4,
694                b"bbbb",
695                true,
696            )
697            .expect("extent"),
698        );
699        plan.publish_core_generation(&index, collection_payload, b"secondary")
700            .expect("publish generation");
701
702        let hydration = index.hydration_plan_for_key("orders", b"b");
703        let hydrated = plan
704            .hydrate_local_plan(&hydration)
705            .expect("hydrate local range");
706        assert_eq!(hydrated.len(), 1);
707        assert_eq!(hydrated[0].payload, b"bbbb");
708        assert_eq!(hydrated[0].request.offset, 4);
709        assert_eq!(hydrated[0].request.bytes, 4);
710
711        let _ = std::fs::remove_dir_all(root);
712    }
713
714    #[test]
715    fn hydrate_local_plan_rejects_corrupt_or_out_of_bounds_ranges() {
716        let root = temp_root("serverless-hydrate-corrupt");
717        let plan = ServerlessFilePlan::new(&root, "db", 24);
718        let mut index = ServerlessExtentIndex::new(24);
719        index.push(
720            ServerlessExtentRef::new(
721                "orders",
722                b"a".to_vec(),
723                b"m".to_vec(),
724                "collection-data.redpack",
725                4,
726                b"bbbb",
727                true,
728            )
729            .expect("extent"),
730        );
731        plan.publish_core_generation(&index, b"aaaabbbbcccc", b"secondary")
732            .expect("publish generation");
733        std::fs::write(plan.collection_data_path(), b"aaaaBBBBcccc").expect("corrupt pack");
734
735        let hydration = index.hydration_plan_for_key("orders", b"b");
736        let err = plan
737            .hydrate_local_plan(&hydration)
738            .expect_err("corrupt range rejected");
739        assert!(err.to_string().contains("checksum mismatch"), "{err}");
740
741        let mut out_of_bounds = hydration.clone();
742        out_of_bounds.requests[0].offset = 11;
743        let err = plan
744            .hydrate_local_plan(&out_of_bounds)
745            .expect_err("out of bounds rejected");
746        assert!(err.to_string().contains("exceeds pack"), "{err}");
747
748        let _ = std::fs::remove_dir_all(root);
749    }
750
751    #[test]
752    fn prefetch_hot_extents_hydrates_only_hot_ranges() {
753        let root = temp_root("serverless-hot-prefetch");
754        let plan = ServerlessFilePlan::new(&root, "db", 25);
755        let mut index = ServerlessExtentIndex::new(25);
756        index.push(
757            ServerlessExtentRef::new(
758                "orders",
759                b"a".to_vec(),
760                b"m".to_vec(),
761                "collection-data.redpack",
762                0,
763                b"hot!",
764                true,
765            )
766            .expect("hot extent"),
767        );
768        index.push(
769            ServerlessExtentRef::new(
770                "orders",
771                b"m".to_vec(),
772                b"z".to_vec(),
773                "collection-data.redpack",
774                4,
775                b"cold",
776                false,
777            )
778            .expect("cold extent"),
779        );
780        plan.publish_core_generation(&index, b"hot!cold", b"secondary")
781            .expect("publish generation");
782
783        let hydrated = plan.prefetch_hot_extents(&index).expect("prefetch hot");
784        assert_eq!(hydrated.len(), 1);
785        assert_eq!(hydrated[0].payload, b"hot!");
786
787        let _ = std::fs::remove_dir_all(root);
788    }
789
790    #[test]
791    fn prefetch_hot_extents_cached_populates_only_hot_cache_entries() {
792        let root = temp_root("serverless-hot-prefetch-cache");
793        let plan = ServerlessFilePlan::new(&root, "db", 29);
794        let cache = ServerlessLocalCache::new(root.join("cache"), 29);
795        let mut index = ServerlessExtentIndex::new(29);
796        index.push(
797            ServerlessExtentRef::new(
798                "orders",
799                b"a".to_vec(),
800                b"m".to_vec(),
801                "collection-data.redpack",
802                0,
803                b"hot!",
804                true,
805            )
806            .expect("hot extent"),
807        );
808        index.push(
809            ServerlessExtentRef::new(
810                "orders",
811                b"m".to_vec(),
812                b"z".to_vec(),
813                "collection-data.redpack",
814                4,
815                b"cold",
816                false,
817            )
818            .expect("cold extent"),
819        );
820        plan.publish_core_generation(&index, b"hot!cold", b"secondary")
821            .expect("publish generation");
822
823        let hot_request = index.hydration_plan_for_key("orders", b"b").requests[0].clone();
824        let cold_request = index.hydration_plan_for_key("orders", b"n").requests[0].clone();
825
826        let hydrated = plan
827            .prefetch_hot_extents_cached(&index, &cache)
828            .expect("prefetch hot into cache");
829        assert_eq!(hydrated.len(), 1);
830        assert_eq!(hydrated[0].payload, b"hot!");
831        assert_eq!(
832            std::fs::read(cache.path_for_request(&hot_request)).expect("read hot cache"),
833            b"hot!"
834        );
835        assert!(
836            !cache.path_for_request(&cold_request).exists(),
837            "cold extent should not be prefetched into cache"
838        );
839
840        let _ = std::fs::remove_dir_all(root);
841    }
842
843    #[test]
844    fn hydrate_local_request_cached_validates_and_repairs_corrupt_cache() {
845        let root = temp_root("serverless-hydrate-cache");
846        let plan = ServerlessFilePlan::new(&root, "db", 26);
847        let cache = ServerlessLocalCache::new(root.join("cache"), 26);
848        let mut index = ServerlessExtentIndex::new(26);
849        index.push(
850            ServerlessExtentRef::new(
851                "orders",
852                b"a".to_vec(),
853                b"z".to_vec(),
854                "collection-data.redpack",
855                4,
856                b"bbbb",
857                true,
858            )
859            .expect("extent"),
860        );
861        plan.publish_core_generation(&index, b"aaaabbbbcccc", b"secondary")
862            .expect("publish generation");
863        let request = index.hydration_plan_for_key("orders", b"m").requests[0].clone();
864
865        let first = plan
866            .hydrate_local_request_cached(&request, &cache)
867            .expect("hydrate and cache");
868        assert_eq!(first.payload, b"bbbb");
869        let cache_path = cache.path_for_request(&request);
870        assert_eq!(std::fs::read(&cache_path).expect("read cache"), b"bbbb");
871
872        std::fs::write(&cache_path, b"xxxx").expect("corrupt cache");
873        let repaired = plan
874            .hydrate_local_request_cached(&request, &cache)
875            .expect("repair corrupt cache from pack");
876        assert_eq!(repaired.payload, b"bbbb");
877        assert_eq!(
878            std::fs::read(&cache_path).expect("read repaired cache"),
879            b"bbbb"
880        );
881
882        let _ = std::fs::remove_dir_all(root);
883    }
884
885    #[test]
886    fn hydrate_local_plan_cached_populates_multiple_range_entries() {
887        let root = temp_root("serverless-hydrate-plan-cache");
888        let plan = ServerlessFilePlan::new(&root, "db", 28);
889        let cache = ServerlessLocalCache::new(root.join("cache"), 28);
890        let mut index = ServerlessExtentIndex::new(28);
891        index.push(
892            ServerlessExtentRef::new(
893                "orders",
894                b"a".to_vec(),
895                b"m".to_vec(),
896                "collection-data.redpack",
897                0,
898                b"left",
899                true,
900            )
901            .expect("left extent"),
902        );
903        index.push(
904            ServerlessExtentRef::new(
905                "orders",
906                b"m".to_vec(),
907                b"z".to_vec(),
908                "collection-data.redpack",
909                4,
910                b"right",
911                false,
912            )
913            .expect("right extent"),
914        );
915        plan.publish_core_generation(&index, b"leftright", b"secondary")
916            .expect("publish generation");
917
918        let hydration = index
919            .hydration_plan_for_range("orders", b"b", b"y")
920            .expect("range hydration plan");
921        let hydrated = plan
922            .hydrate_local_plan_cached(&hydration, &cache)
923            .expect("hydrate cached plan");
924        assert_eq!(hydrated.len(), 2);
925        assert_eq!(hydrated[0].payload, b"left");
926        assert_eq!(hydrated[1].payload, b"right");
927        for range in hydrated {
928            assert!(cache.path_for_request(&range.request).exists());
929        }
930
931        let _ = std::fs::remove_dir_all(root);
932    }
933
934    #[test]
935    fn cached_hydration_enforces_max_hot_bytes_after_writes() {
936        let root = temp_root("serverless-hydrate-cache-budget");
937        let plan =
938            ServerlessFilePlan::new(&root, "db", 29).with_cache_policy(ServerlessCachePolicy {
939                max_hot_bytes: 5,
940                ..ServerlessCachePolicy::default()
941            });
942        let cache = ServerlessLocalCache::new(root.join("cache"), 29);
943        let mut index = ServerlessExtentIndex::new(29);
944        index.push(
945            ServerlessExtentRef::new(
946                "orders",
947                b"a".to_vec(),
948                b"m".to_vec(),
949                "collection-data.redpack",
950                0,
951                b"left",
952                true,
953            )
954            .expect("left extent"),
955        );
956        index.push(
957            ServerlessExtentRef::new(
958                "orders",
959                b"m".to_vec(),
960                b"z".to_vec(),
961                "collection-data.redpack",
962                4,
963                b"right",
964                true,
965            )
966            .expect("right extent"),
967        );
968        plan.publish_core_generation(&index, b"leftright", b"secondary")
969            .expect("publish generation");
970
971        let hydration = index
972            .hydration_plan_for_range("orders", b"a", b"z")
973            .expect("range hydration plan");
974        let hydrated = plan
975            .hydrate_local_plan_cached(&hydration, &cache)
976            .expect("hydrate cached plan");
977        assert_eq!(hydrated.len(), 2);
978        assert_eq!(hydrated[0].payload, b"left");
979        assert_eq!(hydrated[1].payload, b"right");
980
981        let entries = cache.cached_entries().expect("cache entries");
982        let cached_bytes: u64 = entries.iter().map(|entry| entry.bytes).sum();
983        assert!(
984            cached_bytes <= 5,
985            "cache should stay within max_hot_bytes, got {cached_bytes}"
986        );
987
988        let _ = std::fs::remove_dir_all(root);
989    }
990
991    #[test]
992    fn cache_eviction_prefers_cold_old_entries() {
993        let entries = vec![
994            ServerlessCacheEntry::new("cold-old.redpack", 100, false, 10),
995            ServerlessCacheEntry::new("hot-old.redpack", 100, true, 1),
996            ServerlessCacheEntry::new("cold-new.redpack", 100, false, 20),
997        ];
998
999        let plan = ServerlessCacheEvictionPlan::plan(&entries, 150);
1000        assert_eq!(
1001            plan.evict,
1002            vec![
1003                PathBuf::from("cold-old.redpack"),
1004                PathBuf::from("cold-new.redpack"),
1005            ]
1006        );
1007        assert_eq!(plan.bytes_after_eviction, 100);
1008    }
1009
1010    #[test]
1011    fn extent_index_writes_and_reads_from_disk() {
1012        let root = temp_root("serverless-extent-index");
1013        let plan = ServerlessFilePlan::new(&root, "db", 5);
1014        let mut index = ServerlessExtentIndex::new(5);
1015        index.push(
1016            ServerlessExtentRef::new(
1017                "events",
1018                b"2026-01".to_vec(),
1019                b"2026-02".to_vec(),
1020                "events-2026-01.redpack",
1021                128,
1022                b"payload",
1023                true,
1024            )
1025            .expect("extent"),
1026        );
1027
1028        index
1029            .write_to_path(plan.extent_index_path())
1030            .expect("write extent index");
1031        assert_eq!(
1032            ServerlessExtentIndex::read_from_path(plan.extent_index_path())
1033                .expect("read extent index"),
1034            index
1035        );
1036        let _ = std::fs::remove_dir_all(root);
1037    }
1038
1039    #[test]
1040    fn manifest_and_boot_index_write_and_read_from_disk() {
1041        let root = temp_root("serverless-manifest");
1042        let plan = ServerlessFilePlan::new(&root, "db", 1);
1043
1044        let mut manifest = ServerlessManifest::new("db", 1);
1045        manifest.push(ServerlessManifestEntry::new(
1046            ServerlessPackKind::Manifest,
1047            "manifest.redpack",
1048            128,
1049            0xCAFE_BABE,
1050        ));
1051        manifest
1052            .write_to_path(plan.manifest_path())
1053            .expect("write manifest");
1054
1055        let boot_index = ServerlessBootIndex::from_plan(&plan);
1056        boot_index
1057            .write_to_path(plan.boot_index_path())
1058            .expect("write boot index");
1059
1060        assert_eq!(
1061            ServerlessManifest::read_from_path(plan.manifest_path()).expect("read manifest"),
1062            manifest
1063        );
1064        assert_eq!(
1065            ServerlessBootIndex::read_from_path(plan.boot_index_path()).expect("read boot index"),
1066            boot_index
1067        );
1068
1069        let _ = std::fs::remove_dir_all(root);
1070    }
1071
1072    #[test]
1073    fn publish_core_generation_writes_required_packs_before_current_pointer() {
1074        let root = temp_root("serverless-publish-core");
1075        let plan = ServerlessFilePlan::new(&root, "db", 12);
1076        let mut extent_index = ServerlessExtentIndex::new(12);
1077        extent_index.push(
1078            ServerlessExtentRef::new(
1079                "events",
1080                b"a".to_vec(),
1081                b"z".to_vec(),
1082                "collection-data.redpack",
1083                0,
1084                b"collection-bytes",
1085                true,
1086            )
1087            .expect("extent"),
1088        );
1089
1090        let pointer = plan
1091            .publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1092            .expect("publish core generation");
1093        assert_eq!(pointer.generation, 12);
1094        assert_eq!(plan.read_current_pointer().expect("read current"), pointer);
1095        assert_eq!(
1096            plan.read_current_pointer_verified()
1097                .expect("read verified current"),
1098            pointer
1099        );
1100        assert!(plan.boot_index_path().exists());
1101        assert!(plan.extent_index_path().exists());
1102        assert!(plan.collection_data_path().exists());
1103        assert!(plan.secondary_index_path().exists());
1104
1105        let manifest =
1106            ServerlessManifest::read_from_path(plan.manifest_path()).expect("read manifest");
1107        plan.validate_complete_generation(&manifest)
1108            .expect("complete generation validates");
1109        assert!(manifest
1110            .entries
1111            .iter()
1112            .any(|entry| entry.kind == ServerlessPackKind::ExtentIndex));
1113        assert!(manifest
1114            .entries
1115            .iter()
1116            .any(|entry| entry.kind == ServerlessPackKind::CollectionData));
1117        assert!(manifest
1118            .entries
1119            .iter()
1120            .any(|entry| entry.kind == ServerlessPackKind::SecondaryIndex));
1121
1122        let _ = std::fs::remove_dir_all(root);
1123    }
1124
1125    #[test]
1126    fn verified_current_pointer_rejects_missing_or_corrupt_generation() {
1127        let root = temp_root("serverless-current-verified");
1128        let plan = ServerlessFilePlan::new(&root, "db", 14);
1129        let mut extent_index = ServerlessExtentIndex::new(14);
1130        extent_index.push(
1131            ServerlessExtentRef::new(
1132                "events",
1133                b"a".to_vec(),
1134                b"z".to_vec(),
1135                "collection-data.redpack",
1136                0,
1137                b"collection-bytes",
1138                true,
1139            )
1140            .expect("extent"),
1141        );
1142        let pointer = plan
1143            .publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1144            .expect("publish complete generation");
1145        assert_eq!(
1146            plan.read_current_pointer_verified()
1147                .expect("verified pointer before corruption"),
1148            pointer
1149        );
1150
1151        std::fs::remove_file(plan.collection_data_path()).expect("remove required pack");
1152        let err = plan
1153            .read_current_pointer_verified()
1154            .expect_err("verified pointer must reject missing required pack");
1155        assert!(
1156            err.to_string().contains("No such file") || err.to_string().contains("not found"),
1157            "{err}"
1158        );
1159
1160        let _ = std::fs::remove_dir_all(root);
1161    }
1162
1163    #[test]
1164    fn verified_current_pointer_rejects_manifest_hash_mismatch() {
1165        let root = temp_root("serverless-current-manifest-hash");
1166        let plan = ServerlessFilePlan::new(&root, "db", 15);
1167        let mut extent_index = ServerlessExtentIndex::new(15);
1168        extent_index.push(
1169            ServerlessExtentRef::new(
1170                "events",
1171                b"a".to_vec(),
1172                b"z".to_vec(),
1173                "collection-data.redpack",
1174                0,
1175                b"collection-bytes",
1176                true,
1177            )
1178            .expect("extent"),
1179        );
1180        plan.publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1181            .expect("publish complete generation");
1182
1183        std::fs::write(plan.manifest_path(), b"corrupt-manifest").expect("corrupt manifest");
1184        let err = plan
1185            .read_current_pointer_verified()
1186            .expect_err("verified pointer must reject manifest hash mismatch");
1187        assert!(
1188            err.to_string().contains("manifest")
1189                && (err.to_string().contains("bytes")
1190                    || err.to_string().contains("checksum")
1191                    || err.to_string().contains("hash")),
1192            "{err}"
1193        );
1194
1195        let _ = std::fs::remove_dir_all(root);
1196    }
1197
1198    #[test]
1199    fn publish_pointer_rejects_incomplete_or_corrupt_generation() {
1200        let root = temp_root("serverless-publish-rejects");
1201        let plan = ServerlessFilePlan::new(&root, "db", 13);
1202        let mut manifest = ServerlessManifest::new("db", 13);
1203        manifest.push(ServerlessManifestEntry::from_bytes(
1204            ServerlessPackKind::BootIndex,
1205            "boot-index.redpack",
1206            b"boot",
1207        ));
1208        manifest
1209            .write_to_path(plan.manifest_path())
1210            .expect("write incomplete manifest");
1211
1212        let err = plan
1213            .publish_generation_pointer(&manifest)
1214            .expect_err("incomplete generation rejected");
1215        assert!(err.to_string().contains("missing required"), "{err}");
1216        assert!(!plan.current_pointer_path().exists());
1217
1218        let mut extent_index = ServerlessExtentIndex::new(13);
1219        extent_index.push(
1220            ServerlessExtentRef::new(
1221                "events",
1222                b"a".to_vec(),
1223                b"z".to_vec(),
1224                "collection-data.redpack",
1225                0,
1226                b"collection-bytes",
1227                true,
1228            )
1229            .expect("extent"),
1230        );
1231        plan.publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1232            .expect("publish complete generation");
1233        std::fs::write(plan.collection_data_path(), b"collection-ByTes")
1234            .expect("corrupt collection pack");
1235        let manifest =
1236            ServerlessManifest::read_from_path(plan.manifest_path()).expect("read manifest");
1237        let err = plan
1238            .publish_generation_pointer(&manifest)
1239            .expect_err("corrupt generation rejected");
1240        assert!(err.to_string().contains("checksum mismatch"), "{err}");
1241
1242        let _ = std::fs::remove_dir_all(root);
1243    }
1244
1245    #[test]
1246    fn incomplete_generation_publish_preserves_existing_current_pointer() {
1247        let root = temp_root("serverless-current-preserved");
1248        let first = ServerlessFilePlan::new(&root, "db", 1);
1249        let mut first_index = ServerlessExtentIndex::new(1);
1250        first_index.push(
1251            ServerlessExtentRef::new(
1252                "events",
1253                b"a".to_vec(),
1254                b"z".to_vec(),
1255                "collection-data.redpack",
1256                0,
1257                b"first",
1258                true,
1259            )
1260            .expect("extent"),
1261        );
1262        let first_pointer = first
1263            .publish_core_generation(&first_index, b"first", b"secondary")
1264            .expect("publish first generation");
1265
1266        let second = ServerlessFilePlan::new(&root, "db", 2);
1267        let mut incomplete = ServerlessManifest::new("db", 2);
1268        incomplete.push(ServerlessManifestEntry::from_bytes(
1269            ServerlessPackKind::BootIndex,
1270            "boot-index.redpack",
1271            b"boot",
1272        ));
1273        incomplete
1274            .write_to_path(second.manifest_path())
1275            .expect("write incomplete manifest");
1276        let err = second
1277            .publish_generation_pointer(&incomplete)
1278            .expect_err("incomplete generation rejected");
1279        assert!(err.to_string().contains("missing required"), "{err}");
1280
1281        assert_eq!(
1282            first.read_current_pointer().expect("read current"),
1283            first_pointer
1284        );
1285
1286        let _ = std::fs::remove_dir_all(root);
1287    }
1288
1289    #[test]
1290    fn file_plan_derives_cache_generation_and_hot_boot_paths() {
1291        let plan = ServerlessFilePlan::for_data_path("/tmp/reddb/main.rdb", 41);
1292        assert_eq!(plan.root, PathBuf::from("/tmp/reddb/main.serverless"));
1293        assert_eq!(plan.namespace, "main");
1294
1295        let next = plan
1296            .for_generation(42)
1297            .with_cache_policy(ServerlessCachePolicy {
1298                keep_boot_index_local: true,
1299                keep_hot_snapshot_local: true,
1300                max_hot_bytes: 4096,
1301            });
1302        assert_eq!(next.root, plan.root);
1303        assert_eq!(next.namespace, plan.namespace);
1304        assert_eq!(next.generation, 42);
1305
1306        let cache = next.local_cache();
1307        assert_eq!(
1308            cache.root,
1309            PathBuf::from("/tmp/reddb/main.serverless/main/cache")
1310        );
1311        assert_eq!(cache.generation, 42);
1312
1313        let hot = ServerlessBootPlan::hot(&next);
1314        assert_eq!(
1315            hot.required_first,
1316            vec![
1317                next.boot_index_path(),
1318                next.hot_snapshot_path(),
1319                next.wal_tail_path(),
1320            ]
1321        );
1322        assert_eq!(
1323            hot.lazy_after_open,
1324            vec![
1325                next.manifest_path(),
1326                next.collection_data_path(),
1327                next.secondary_index_path(),
1328            ]
1329        );
1330    }
1331
1332    #[test]
1333    fn publish_generation_pointer_rejects_manifest_identity_mismatch() {
1334        let root = temp_root("serverless-pointer-identity");
1335        let plan = ServerlessFilePlan::new(&root, "db", 10);
1336
1337        let wrong_namespace = ServerlessManifest::new("other", 10);
1338        let err = plan
1339            .publish_generation_pointer(&wrong_namespace)
1340            .expect_err("namespace mismatch rejected before publish");
1341        assert!(err.to_string().contains("namespace"), "{err}");
1342
1343        let wrong_generation = ServerlessManifest::new("db", 11);
1344        let err = plan
1345            .publish_generation_pointer(&wrong_generation)
1346            .expect_err("generation mismatch rejected before publish");
1347        assert!(err.to_string().contains("generation"), "{err}");
1348
1349        assert!(!plan.current_pointer_path().exists());
1350        let _ = std::fs::remove_dir_all(root);
1351    }
1352
1353    #[test]
1354    fn verified_current_pointer_rejects_pointer_shape_and_manifest_identity() {
1355        let root = temp_root("serverless-current-pointer-shape");
1356        let plan = ServerlessFilePlan::new(&root, "db", 10);
1357        let manifest = ServerlessManifest::new("db", 10);
1358        let manifest_bytes = manifest.encode();
1359
1360        let namespace_mismatch = ServerlessGenerationPointer {
1361            namespace: "other".to_string(),
1362            generation: 10,
1363            manifest_relative_path: PathBuf::from("g00000000000000000010/manifest.redpack"),
1364            manifest_bytes: manifest_bytes.len() as u64,
1365            manifest_checksum: crc32(&manifest_bytes),
1366            manifest_content_hash: ServerlessContentHash::from_bytes(&manifest_bytes),
1367        };
1368        namespace_mismatch
1369            .write_to_path(plan.current_pointer_path())
1370            .expect("write namespace mismatch pointer");
1371        let err = plan
1372            .read_current_pointer_verified()
1373            .expect_err("pointer namespace mismatch rejected");
1374        assert!(err.to_string().contains("namespace"), "{err}");
1375
1376        let path_mismatch = ServerlessGenerationPointer {
1377            namespace: "db".to_string(),
1378            generation: 10,
1379            manifest_relative_path: PathBuf::from("g00000000000000000010/boot-index.redpack"),
1380            manifest_bytes: manifest_bytes.len() as u64,
1381            manifest_checksum: crc32(&manifest_bytes),
1382            manifest_content_hash: ServerlessContentHash::from_bytes(&manifest_bytes),
1383        };
1384        path_mismatch
1385            .write_to_path(plan.current_pointer_path())
1386            .expect("write path mismatch pointer");
1387        let err = plan
1388            .read_current_pointer_verified()
1389            .expect_err("pointer manifest path mismatch rejected");
1390        assert!(err.to_string().contains("manifest path"), "{err}");
1391
1392        let wrong_manifest_namespace = ServerlessManifest::new("other", 10);
1393        wrong_manifest_namespace
1394            .write_to_path(plan.manifest_path())
1395            .expect("write namespace mismatched manifest");
1396        let pointer = ServerlessGenerationPointer::from_manifest(&plan, &wrong_manifest_namespace);
1397        pointer
1398            .write_to_path(plan.current_pointer_path())
1399            .expect("write pointer");
1400        let err = plan
1401            .read_current_pointer_verified()
1402            .expect_err("manifest namespace mismatch rejected");
1403        assert!(err.to_string().contains("manifest namespace"), "{err}");
1404
1405        let wrong_generation_manifest = ServerlessManifest::new("db", 11);
1406        let wrong_generation_bytes = wrong_generation_manifest.encode();
1407        std::fs::write(plan.manifest_path(), &wrong_generation_bytes).expect("write manifest");
1408        let generation_mismatch = ServerlessGenerationPointer {
1409            namespace: "db".to_string(),
1410            generation: 10,
1411            manifest_relative_path: PathBuf::from("g00000000000000000010/manifest.redpack"),
1412            manifest_bytes: wrong_generation_bytes.len() as u64,
1413            manifest_checksum: crc32(&wrong_generation_bytes),
1414            manifest_content_hash: ServerlessContentHash::from_bytes(&wrong_generation_bytes),
1415        };
1416        generation_mismatch
1417            .write_to_path(plan.current_pointer_path())
1418            .expect("write generation mismatch pointer");
1419        let err = plan
1420            .read_current_pointer_verified()
1421            .expect_err("manifest generation mismatch rejected");
1422        assert!(err.to_string().contains("manifest generation"), "{err}");
1423
1424        let _ = std::fs::remove_dir_all(root);
1425    }
1426
1427    #[test]
1428    fn validate_complete_generation_rejects_pack_and_manifest_metadata_mismatch() {
1429        let root = temp_root("serverless-validate-metadata");
1430        let plan = ServerlessFilePlan::new(&root, "db", 16);
1431        let mut extent_index = ServerlessExtentIndex::new(16);
1432        extent_index.push(
1433            ServerlessExtentRef::new(
1434                "events",
1435                b"a".to_vec(),
1436                b"z".to_vec(),
1437                "collection-data.redpack",
1438                0,
1439                b"collection-bytes",
1440                true,
1441            )
1442            .expect("extent"),
1443        );
1444        plan.publish_core_generation(&extent_index, b"collection-bytes", b"secondary-bytes")
1445            .expect("publish complete generation");
1446        let manifest =
1447            ServerlessManifest::read_from_path(plan.manifest_path()).expect("read manifest");
1448
1449        let mut wrong_bytes = manifest.clone();
1450        wrong_bytes.entries[0].bytes += 1;
1451        let err = plan
1452            .validate_complete_generation(&wrong_bytes)
1453            .expect_err("pack byte length mismatch rejected");
1454        assert!(err.to_string().contains("expected"), "{err}");
1455
1456        let mut wrong_hash = manifest.clone();
1457        wrong_hash.entries[0].content_hash = ServerlessContentHash([1; CONTENT_HASH_LEN]);
1458        let err = plan
1459            .validate_complete_generation(&wrong_hash)
1460            .expect_err("pack content hash mismatch rejected");
1461        assert!(err.to_string().contains("content hash mismatch"), "{err}");
1462
1463        let mut manifest_mismatch = manifest.clone();
1464        manifest_mismatch.namespace = "other".to_string();
1465        let err = plan
1466            .validate_complete_generation(&manifest_mismatch)
1467            .expect_err("manifest-on-disk mismatch rejected");
1468        assert!(err.to_string().contains("manifest on disk"), "{err}");
1469
1470        let _ = std::fs::remove_dir_all(root);
1471    }
1472
1473    #[test]
1474    fn writer_lease_json_round_trips_and_preserves_fencing_token() {
1475        let lease = ServerlessWriterLease {
1476            database_key: "main".to_string(),
1477            holder_id: "writer-a".to_string(),
1478            term: 7,
1479            generation: 3,
1480            acquired_at_ms: 100,
1481            expires_at_ms: 200,
1482        };
1483
1484        let decoded = decode_serverless_writer_lease_json(
1485            &encode_serverless_writer_lease_json(&lease).expect("encode lease"),
1486        )
1487        .expect("decode lease");
1488
1489        assert_eq!(decoded, lease);
1490        assert_eq!(decoded.fencing_token(), (7, 3));
1491        assert!(!decoded.is_expired(199));
1492        assert!(decoded.is_expired(200));
1493        assert!(decoded.fenced_by_term(8));
1494    }
1495
1496    #[test]
1497    fn writer_lease_json_decodes_legacy_missing_term_as_base_term() {
1498        let decoded = decode_serverless_writer_lease_json(
1499            br#"{
1500                "database_key": "main",
1501                "holder_id": "writer-a",
1502                "generation": 3,
1503                "acquired_at_ms": 100,
1504                "expires_at_ms": 200
1505            }"#,
1506        )
1507        .expect("decode legacy lease");
1508
1509        assert_eq!(decoded.term, SERVERLESS_WRITER_LEASE_DEFAULT_TERM);
1510    }
1511
1512    #[test]
1513    fn writer_lease_artifact_names_are_deterministic() {
1514        assert_eq!(
1515            serverless_writer_lease_key("leases/", "main"),
1516            "leases/main.lease.json"
1517        );
1518        assert_eq!(
1519            serverless_writer_lease_temp_path("write", 10, 20, 30)
1520                .file_name()
1521                .and_then(|name| name.to_str()),
1522            Some("reddb-lease-write-10-20-30.json")
1523        );
1524    }
1525
1526    #[test]
1527    fn writer_lease_temp_file_round_trips_and_cleans_up() {
1528        let temp = ServerlessWriterLeaseTempFile::with_clock("write", 10, 20, 30);
1529        assert_eq!(
1530            temp.path().file_name().and_then(|name| name.to_str()),
1531            Some("reddb-lease-write-10-20-30.json")
1532        );
1533
1534        temp.write_bytes(b"{\"lease\":true}")
1535            .expect("write lease temp");
1536        assert_eq!(
1537            temp.read_bytes().expect("read lease temp"),
1538            b"{\"lease\":true}"
1539        );
1540        temp.cleanup().expect("cleanup lease temp");
1541        assert!(!temp.path().exists());
1542    }
1543
1544    fn temp_root(label: &str) -> PathBuf {
1545        std::env::temp_dir().join(format!(
1546            "reddb-file-{label}-{}-{}",
1547            std::process::id(),
1548            std::time::SystemTime::now()
1549                .duration_since(std::time::UNIX_EPOCH)
1550                .unwrap()
1551                .as_nanos()
1552        ))
1553    }
1554}