1use std::fs::OpenOptions;
2use std::io::{Seek, SeekFrom, Write};
3use std::path::Path;
4
5use crate::error::ChangesetError;
6use crate::physical::{self, PhysicalChangeset};
7
8pub fn apply_physical(
15 db_path: &Path,
16 changeset: &PhysicalChangeset,
17 expected_prev_checksum: u64,
18) -> Result<u64, ChangesetError> {
19 physical::verify_chain(expected_prev_checksum, changeset)?;
21
22 if changeset.pages.is_empty() {
24 return Ok(changeset.checksum);
25 }
26
27 let page_size = changeset.header.page_size as u64;
28
29 let mut file = OpenOptions::new()
31 .write(true)
32 .create(true)
33 .open(db_path)
34 .map_err(ChangesetError::Io)?;
35
36 for page in &changeset.pages {
37 let offset = page.page_id.to_u64() * page_size;
38 file.seek(SeekFrom::Start(offset))
39 .map_err(ChangesetError::Io)?;
40 file.write_all(&page.data)
41 .map_err(ChangesetError::Io)?;
42 }
43
44 file.sync_all().map_err(ChangesetError::Io)?;
45
46 Ok(changeset.checksum)
47}
48
#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical::{PageEntry, PageId, PageIdSize, PhysicalChangeset};
    use tempfile::NamedTempFile;

    /// Builds a page addressed by a 32-bit id whose payload is `len` copies of `fill`.
    fn page_u32(id: u32, fill: u8, len: usize) -> PageEntry {
        let data = vec![fill; len];
        PageEntry { page_id: PageId::U32(id), data }
    }

    /// Builds a page addressed by a 64-bit id whose payload is `len` copies of `fill`.
    fn page_u64(id: u64, fill: u8, len: usize) -> PageEntry {
        let data = vec![fill; len];
        PageEntry { page_id: PageId::U64(id), data }
    }

    /// Reads the entire file at `path`, panicking on any I/O error.
    fn db_bytes(path: &Path) -> Vec<u8> {
        std::fs::read(path).unwrap()
    }

    /// Asserts that `bytes[start..start + len]` consists solely of `fill`.
    fn assert_filled(bytes: &[u8], start: usize, len: usize, fill: u8) {
        assert_eq!(&bytes[start..start + len], vec![fill; len].as_slice());
    }

    /// Two pages land at their page-aligned offsets and the checksum is returned.
    #[test]
    fn test_apply_single_changeset() {
        let db = NamedTempFile::new().unwrap();
        let pages = vec![page_u64(0, 0xAA, 64), page_u64(1, 0xBB, 128)];
        let changeset = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, pages);

        let returned = apply_physical(db.path(), &changeset, 0).unwrap();
        assert_eq!(returned, changeset.checksum);

        let bytes = db_bytes(db.path());
        assert_filled(&bytes, 0, 64, 0xAA);
        assert_filled(&bytes, 262144, 128, 0xBB);
    }

    /// A second changeset chained onto the first applies and yields a new checksum.
    #[test]
    fn test_apply_chain() {
        let db = NamedTempFile::new().unwrap();

        let first =
            PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
        let first_checksum = apply_physical(db.path(), &first, 0).unwrap();

        let second = PhysicalChangeset::new(
            2,
            first_checksum,
            PageIdSize::U64,
            262144,
            vec![page_u64(1, 0xBB, 64)],
        );
        let second_checksum = apply_physical(db.path(), &second, first_checksum).unwrap();

        assert_ne!(first_checksum, second_checksum);
    }

    /// Re-applying the same page id replaces the bytes written earlier.
    #[test]
    fn test_apply_overwrites_page() {
        let db = NamedTempFile::new().unwrap();

        let first =
            PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
        let first_checksum = apply_physical(db.path(), &first, 0).unwrap();

        let second = PhysicalChangeset::new(
            2,
            first_checksum,
            PageIdSize::U64,
            262144,
            vec![page_u64(0, 0xBB, 64)],
        );
        apply_physical(db.path(), &second, first_checksum).unwrap();

        let bytes = db_bytes(db.path());
        assert_filled(&bytes, 0, 64, 0xBB);
    }

    /// A changeset without pages still succeeds and reports its checksum.
    #[test]
    fn test_apply_empty_changeset() {
        let db = NamedTempFile::new().unwrap();
        let changeset = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![]);

        let returned = apply_physical(db.path(), &changeset, 0).unwrap();

        assert_eq!(returned, changeset.checksum);
    }

    /// A mismatched previous checksum is rejected and the file stays empty.
    #[test]
    fn test_apply_bad_checksum_no_write() {
        let db = NamedTempFile::new().unwrap();
        let changeset =
            PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);

        let outcome = apply_physical(db.path(), &changeset, 999);

        assert!(matches!(outcome, Err(ChangesetError::ChainBroken { .. })));
        assert!(db_bytes(db.path()).is_empty());
    }

    /// Applying to a path that does not exist yet creates the file.
    #[test]
    fn test_apply_creates_file() {
        let scratch = tempfile::tempdir().unwrap();
        let target = scratch.path().join("new.duckdb");
        assert!(!target.exists());

        let changeset =
            PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 32)]);
        apply_physical(&target, &changeset, 0).unwrap();

        assert!(target.exists());
    }

    /// Writing a page far past the current end grows the file to reach it.
    #[test]
    fn test_apply_extends_file() {
        let db = NamedTempFile::new().unwrap();
        let changeset =
            PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(10, 0xFF, 64)]);
        apply_physical(db.path(), &changeset, 0).unwrap();

        let bytes = db_bytes(db.path());
        let start: usize = 10 * 262144;
        assert!(bytes.len() >= start + 64);
        assert_filled(&bytes, start, 64, 0xFF);
    }

    /// 32-bit page ids with 4 KiB pages are placed at `id * 4096`.
    #[test]
    fn test_apply_u32_pages_4kb() {
        let db = NamedTempFile::new().unwrap();
        let pages = vec![page_u32(1, 0xAA, 4096), page_u32(2, 0xBB, 4096)];
        let changeset = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, pages);
        apply_physical(db.path(), &changeset, 0).unwrap();

        let bytes = db_bytes(db.path());
        assert_filled(&bytes, 4096, 4096, 0xAA);
        assert_filled(&bytes, 8192, 4096, 0xBB);
    }

    /// A payload shorter than the page size is written as-is, without padding.
    #[test]
    fn test_apply_partial_page() {
        let db = NamedTempFile::new().unwrap();
        let changeset =
            PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![page_u32(1, 0xDD, 1000)]);
        apply_physical(db.path(), &changeset, 0).unwrap();

        let bytes = db_bytes(db.path());
        assert_eq!(bytes.len(), 4096 + 1000);
    }
}