Skip to main content

hadb_changeset/
apply.rs

1use std::fs::OpenOptions;
2use std::io::{Seek, SeekFrom, Write};
3use std::path::Path;
4
5use crate::error::ChangesetError;
6use crate::physical::{self, PhysicalChangeset};
7
8/// Apply a physical changeset to a local database file.
9///
10/// For each page entry, writes data at `page_id * page_size` via positioned writes.
11/// Verifies the checksum chain before writing anything (fail-fast).
12///
13/// Returns the changeset's checksum (for chaining to the next changeset).
14pub fn apply_physical(
15    db_path: &Path,
16    changeset: &PhysicalChangeset,
17    expected_prev_checksum: u64,
18) -> Result<u64, ChangesetError> {
19    // Verify checksum chain before writing anything
20    physical::verify_chain(expected_prev_checksum, changeset)?;
21
22    // Empty changeset (no dirty pages) -- valid, just return checksum
23    if changeset.pages.is_empty() {
24        return Ok(changeset.checksum);
25    }
26
27    let page_size = changeset.header.page_size as u64;
28
29    // Open file for writing. Create if it doesn't exist (initial restore).
30    let mut file = OpenOptions::new()
31        .write(true)
32        .create(true)
33        .open(db_path)
34        .map_err(ChangesetError::Io)?;
35
36    for page in &changeset.pages {
37        let offset = page.page_id.to_u64() * page_size;
38        file.seek(SeekFrom::Start(offset))
39            .map_err(ChangesetError::Io)?;
40        file.write_all(&page.data)
41            .map_err(ChangesetError::Io)?;
42    }
43
44    file.sync_all().map_err(ChangesetError::Io)?;
45
46    Ok(changeset.checksum)
47}
48
#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical::{PageEntry, PageId, PageIdSize, PhysicalChangeset};
    use tempfile::NamedTempFile;

    /// Page entry with a 32-bit page id whose data is `len` copies of `fill`.
    fn page_u32(id: u32, fill: u8, len: usize) -> PageEntry {
        PageEntry { page_id: PageId::U32(id), data: vec![fill; len] }
    }

    /// Page entry with a 64-bit page id whose data is `len` copies of `fill`.
    fn page_u64(id: u64, fill: u8, len: usize) -> PageEntry {
        PageEntry { page_id: PageId::U64(id), data: vec![fill; len] }
    }

    #[test]
    fn test_apply_single_changeset() {
        let tmp = NamedTempFile::new().unwrap();
        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![
            page_u64(0, 0xAA, 64),
            page_u64(1, 0xBB, 128),
        ]);
        let checksum = apply_physical(tmp.path(), &cs, 0).unwrap();
        assert_eq!(checksum, cs.checksum);

        let contents = std::fs::read(tmp.path()).unwrap();
        assert_eq!(contents.len(), 262144 + 128);
        assert_eq!(&contents[0..64], &[0xAAu8; 64]);
        // The unwritten tail of page 0 reads back as zeros.
        assert!(contents[64..262144].iter().all(|&b| b == 0));
        assert_eq!(&contents[262144..262144 + 128], &[0xBBu8; 128]);
    }

    #[test]
    fn test_apply_chain() {
        let tmp = NamedTempFile::new().unwrap();
        let cs1 = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
        let ck1 = apply_physical(tmp.path(), &cs1, 0).unwrap();

        let cs2 = PhysicalChangeset::new(2, ck1, PageIdSize::U64, 262144, vec![page_u64(1, 0xBB, 64)]);
        let ck2 = apply_physical(tmp.path(), &cs2, ck1).unwrap();
        assert_ne!(ck1, ck2);

        // Both links of the chain must be reflected on disk.
        let contents = std::fs::read(tmp.path()).unwrap();
        assert_eq!(&contents[0..64], &[0xAAu8; 64]);
        assert_eq!(&contents[262144..262144 + 64], &[0xBBu8; 64]);
    }

    #[test]
    fn test_apply_broken_link_keeps_previous_state() {
        let tmp = NamedTempFile::new().unwrap();
        let cs1 = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
        let ck1 = apply_physical(tmp.path(), &cs1, 0).unwrap();
        let before = std::fs::read(tmp.path()).unwrap();

        // cs2 links to ck1, but the caller expects a different predecessor.
        let cs2 = PhysicalChangeset::new(2, ck1, PageIdSize::U64, 262144, vec![page_u64(0, 0xBB, 64)]);
        let err = apply_physical(tmp.path(), &cs2, ck1.wrapping_add(1)).unwrap_err();
        assert!(matches!(err, ChangesetError::ChainBroken { .. }));
        assert_eq!(std::fs::read(tmp.path()).unwrap(), before);
    }

    #[test]
    fn test_apply_overwrites_page() {
        let tmp = NamedTempFile::new().unwrap();
        let cs1 = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
        let ck1 = apply_physical(tmp.path(), &cs1, 0).unwrap();

        let cs2 = PhysicalChangeset::new(2, ck1, PageIdSize::U64, 262144, vec![page_u64(0, 0xBB, 64)]);
        apply_physical(tmp.path(), &cs2, ck1).unwrap();

        let contents = std::fs::read(tmp.path()).unwrap();
        assert_eq!(&contents[0..64], &[0xBBu8; 64]);
    }

    #[test]
    fn test_apply_empty_changeset() {
        let tmp = NamedTempFile::new().unwrap();
        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![]);
        let checksum = apply_physical(tmp.path(), &cs, 0).unwrap();
        assert_eq!(checksum, cs.checksum);
        // No dirty pages: the existing file is left untouched.
        assert!(std::fs::read(tmp.path()).unwrap().is_empty());

        // ...and a missing file is not created.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.duckdb");
        apply_physical(&path, &cs, 0).unwrap();
        assert!(!path.exists());
    }

    #[test]
    fn test_apply_bad_checksum_no_write() {
        let tmp = NamedTempFile::new().unwrap();
        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
        let err = apply_physical(tmp.path(), &cs, 999).unwrap_err();
        assert!(matches!(err, ChangesetError::ChainBroken { .. }));
        assert!(std::fs::read(tmp.path()).unwrap().is_empty());
    }

    #[test]
    fn test_apply_creates_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("new.duckdb");
        assert!(!path.exists());
        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 32)]);
        apply_physical(&path, &cs, 0).unwrap();
        assert!(path.exists());
        assert_eq!(std::fs::read(&path).unwrap(), vec![0xAAu8; 32]);
    }

    #[test]
    fn test_apply_extends_file() {
        let tmp = NamedTempFile::new().unwrap();
        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(10, 0xFF, 64)]);
        apply_physical(tmp.path(), &cs, 0).unwrap();
        let contents = std::fs::read(tmp.path()).unwrap();
        let offset = 10 * 262144;
        assert!(contents.len() >= offset + 64);
        assert_eq!(&contents[offset..offset + 64], &[0xFFu8; 64]);
    }

    #[test]
    fn test_apply_u32_pages_4kb() {
        let tmp = NamedTempFile::new().unwrap();
        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![
            page_u32(1, 0xAA, 4096),
            page_u32(2, 0xBB, 4096),
        ]);
        apply_physical(tmp.path(), &cs, 0).unwrap();
        let contents = std::fs::read(tmp.path()).unwrap();
        assert_eq!(&contents[4096..4096 + 4096], &[0xAAu8; 4096]);
        assert_eq!(&contents[8192..8192 + 4096], &[0xBBu8; 4096]);
    }

    #[test]
    fn test_apply_partial_page() {
        let tmp = NamedTempFile::new().unwrap();
        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![page_u32(1, 0xDD, 1000)]);
        apply_physical(tmp.path(), &cs, 0).unwrap();
        let contents = std::fs::read(tmp.path()).unwrap();
        assert_eq!(contents.len(), 4096 + 1000);
        // Page 0 was never written, so it reads back as zeros.
        assert!(contents[..4096].iter().all(|&b| b == 0));
        assert_eq!(&contents[4096..], &[0xDDu8; 1000]);
    }
}