1use anyhow::{anyhow, Result};
7use litepages::{Checksum, Decoder, Encoder, Header, HeaderFlags, PageNum, PageSize, TXID};
8use std::io::{Read, Write};
9use std::path::Path;
10use std::time::SystemTime;
11
12pub fn encode_snapshot<W: Write>(
14 writer: W,
15 db_path: &Path,
16 page_size: u32,
17 txid: u64,
18) -> Result<()> {
19 let db_data = std::fs::read(db_path)?;
20 let page_size_val = PageSize::new(page_size).map_err(|e| anyhow!("Invalid page size: {}", e))?;
21 let num_pages = db_data.len() / page_size as usize;
22
23 let header = Header {
24 flags: HeaderFlags::COMPRESS_LZ4,
25 page_size: page_size_val,
26 commit: PageNum::new(num_pages as u32).map_err(|e| anyhow!("Invalid page count: {}", e))?,
27 min_txid: TXID::ONE, max_txid: TXID::new(txid).map_err(|e| anyhow!("Invalid TXID: {}", e))?,
29 timestamp: SystemTime::now(),
30 pre_apply_checksum: None,
31 };
32
33 let mut encoder = Encoder::new(writer, &header)?;
34
35 for i in 0..num_pages {
37 let page_num = PageNum::new((i + 1) as u32).map_err(|e| anyhow!("Invalid page num: {}", e))?;
38 let start = i * page_size as usize;
39 let end = start + page_size as usize;
40 let page_data = &db_data[start..end];
41
42 encoder.encode_page(page_num, page_data)?;
43 }
44
45 let checksum = compute_db_checksum(&db_data);
47 encoder.finish(checksum)?;
48
49 Ok(())
50}
51
/// Outcome of [`decode_to_db`]: the decoded LTX header plus the
/// checksum of the database image that was written to disk.
#[derive(Debug)]
pub struct DecodeResult {
    /// Header of the decoded LTX file (flags, page size, TXID range, ...).
    pub header: Header,
    /// Checksum computed over the restored database bytes.
    pub post_apply_checksum: Checksum,
}
58
59pub fn decode_to_db<R: Read>(reader: R, output_path: &Path) -> Result<DecodeResult> {
67 let (mut decoder, header) = Decoder::new(reader)?;
68
69 let skip_checksums = header.flags.contains(HeaderFlags::NO_CHECKSUM);
71
72 if skip_checksums {
73 tracing::debug!(
74 "Skipping checksum verification (NO_CHECKSUM flag set - litestream compatibility)"
75 );
76 }
77
78 let page_size = header.page_size.into_inner() as usize;
79 let num_pages = header.commit.into_inner() as usize;
80
81 let mut db_data = vec![0u8; num_pages * page_size];
82 let mut page_buf = vec![0u8; page_size];
83
84 while let Some(page_num) = decoder.decode_page(&mut page_buf)? {
85 let idx = (page_num.into_inner() - 1) as usize;
86 let start = idx * page_size;
87 db_data[start..start + page_size].copy_from_slice(&page_buf);
88 }
89
90 let trailer = decoder.finish()?;
92
93 std::fs::write(output_path, &db_data)?;
95
96 let actual_checksum = compute_db_checksum(&db_data);
98
99 if !skip_checksums {
101 if trailer.post_apply_checksum != actual_checksum {
102 return Err(anyhow!(
103 "Post-apply checksum mismatch after decode: expected {:016x}, got {:016x}. \
104 This may indicate corruption in the LTX file.",
105 trailer.post_apply_checksum.into_inner(),
106 actual_checksum.into_inner()
107 ));
108 }
109 tracing::debug!(
110 "Post-apply checksum verified: {:016x}",
111 actual_checksum.into_inner()
112 );
113 }
114
115 tracing::debug!(
116 "Decoded snapshot (TXID {}-{})",
117 header.min_txid.into_inner(),
118 header.max_txid.into_inner()
119 );
120
121 Ok(DecodeResult {
122 header,
123 post_apply_checksum: actual_checksum,
124 })
125}
126
/// Outcome of [`apply_ltx_to_db`]: the applied LTX header plus the
/// post-apply checksum (chained or taken from the trailer).
#[derive(Debug)]
pub struct ApplyResult {
    /// Header of the applied LTX file.
    pub header: Header,
    /// Checksum describing the database state after the apply.
    pub post_apply_checksum: Checksum,
}
133
/// Apply an incremental LTX file from `reader` to the database at
/// `db_path` in place, seeking to each page's offset and overwriting it.
///
/// Checksum handling:
/// - If the header has a `pre_apply_checksum`, a [`ChainHasher`] seeded
///   with it folds in every applied page; the chained result is compared
///   against the trailer (unless `NO_CHECKSUM` is set).
/// - The on-disk pre-state is NOT verified against `pre_apply_checksum`
///   here; only the internal chain consistency of the file is checked.
///
/// NOTE(review): pages are written before the trailer check, so a
/// checksum mismatch leaves the database partially modified — callers
/// presumably restore from snapshot on error; confirm.
///
/// # Errors
///
/// Fails if the database cannot be opened for writing, the stream is
/// malformed, or the chained post-apply checksum disagrees with the
/// trailer (when verification is enabled).
pub fn apply_ltx_to_db<R: Read>(reader: R, db_path: &Path) -> Result<ApplyResult> {
    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write as IoWrite};

    let (mut decoder, header) = Decoder::new(reader)?;

    let skip_checksums = header.flags.contains(HeaderFlags::NO_CHECKSUM);

    if skip_checksums {
        tracing::debug!(
            "Skipping checksum verification (NO_CHECKSUM flag set - litestream compatibility)"
        );
    }

    let page_size = header.page_size.into_inner() as usize;
    let mut page_buf = vec![0u8; page_size];

    let mut file = OpenOptions::new()
        .write(true)
        .open(db_path)
        .map_err(|e| anyhow!("Failed to open database for in-place apply: {}", e))?;

    // Only available when the header carries a pre-apply checksum.
    let mut chain_hasher = header.pre_apply_checksum.map(ChainHasher::new);

    while let Some(page_num) = decoder.decode_page(&mut page_buf)? {
        // Pages are 1-indexed; compute the byte offset of this page.
        let offset = (page_num.into_inner() as u64 - 1) * page_size as u64;
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(&page_buf)?;
        if let Some(ref mut hasher) = chain_hasher {
            hasher.update(page_num.into_inner(), &page_buf);
        }
    }

    // Flush to stable storage before reporting success.
    file.sync_all()?;
    drop(file);

    let trailer = decoder.finish()?;

    // NOTE(review): reported count is 0 when no pre-apply checksum was
    // present (no hasher counted the pages) — logging-only inaccuracy.
    let page_count = chain_hasher.as_ref().map(|h| h.page_count()).unwrap_or(0);

    let post_checksum = if !skip_checksums {
        if let Some(expected_pre) = header.pre_apply_checksum {
            tracing::debug!(
                "Pre-apply checksum: {:016x}",
                expected_pre.into_inner()
            );
        }

        if let Some(hasher) = chain_hasher {
            // Verify the chained checksum against the trailer.
            let expected_post = hasher.finish();
            if trailer.post_apply_checksum != expected_post {
                return Err(anyhow!(
                    "Post-apply checksum mismatch: expected {:016x}, got {:016x}. \
                    This may indicate corruption during apply.",
                    trailer.post_apply_checksum.into_inner(),
                    expected_post.into_inner()
                ));
            }
            tracing::debug!(
                "Post-apply checksum verified (chain): {:016x}",
                expected_post.into_inner()
            );
            expected_post
        } else {
            // No pre-apply checksum to chain from: trust the trailer.
            trailer.post_apply_checksum
        }
    } else {
        // NO_CHECKSUM: compute our own post state, from the chain when
        // possible, otherwise by re-reading the whole file.
        if let Some(hasher) = chain_hasher {
            hasher.finish()
        } else {
            compute_checksum_from_file(db_path)?
        }
    };

    tracing::debug!(
        "Applied {} pages in-place (TXID {}-{})",
        page_count,
        header.min_txid.into_inner(),
        header.max_txid.into_inner()
    );

    Ok(ApplyResult {
        header,
        post_apply_checksum: post_checksum,
    })
}
238
239pub fn compute_checksum_from_file(db_path: &Path) -> Result<Checksum> {
241 let data = std::fs::read(db_path)?;
242 Ok(compute_db_checksum(&data))
243}
244
245pub fn encode_wal_changes<W: Write>(
252 writer: W,
253 pages: &[(u32, Vec<u8>)], page_size: u32,
255 min_txid: u64,
256 max_txid: u64,
257 commit_page: u32,
258 pre_checksum: Option<Checksum>,
259 post_checksum: Checksum,
260) -> Result<Checksum> {
261 let page_size_val = PageSize::new(page_size).map_err(|e| anyhow!("Invalid page size: {}", e))?;
262
263 let header = Header {
264 flags: HeaderFlags::COMPRESS_LZ4,
265 page_size: page_size_val,
266 commit: PageNum::new(commit_page).map_err(|e| anyhow!("Invalid commit page: {}", e))?,
267 min_txid: TXID::new(min_txid).map_err(|e| anyhow!("Invalid min TXID: {}", e))?,
268 max_txid: TXID::new(max_txid).map_err(|e| anyhow!("Invalid max TXID: {}", e))?,
269 timestamp: SystemTime::now(),
270 pre_apply_checksum: pre_checksum,
271 };
272
273 let mut encoder = Encoder::new(writer, &header)?;
274
275 let mut indices: Vec<usize> = (0..pages.len()).collect();
277 indices.sort_by_key(|&i| pages[i].0);
278
279 for &i in &indices {
280 let pn = PageNum::new(pages[i].0).map_err(|e| anyhow!("Invalid page num: {}", e))?;
281 encoder.encode_page(pn, &pages[i].1)?;
282 }
283
284 let trailer = encoder.finish(post_checksum)?;
285
286 Ok(trailer.post_apply_checksum)
287}
288
289pub fn chain_checksum(pre: Checksum, pages: &[(u32, Vec<u8>)]) -> Checksum {
294 use sha2::{Digest, Sha256};
295 let mut hasher = Sha256::new();
296 hasher.update(pre.into_inner().to_be_bytes());
297
298 let mut sorted_indices: Vec<usize> = (0..pages.len()).collect();
299 sorted_indices.sort_by_key(|&i| pages[i].0);
300
301 for &i in &sorted_indices {
302 hasher.update(pages[i].0.to_be_bytes());
303 hasher.update(&pages[i].1);
304 }
305
306 let result = hasher.finalize();
307 Checksum::new(u64::from_be_bytes(result[0..8].try_into().unwrap()))
308}
309
/// Incrementally computes the chained post-apply checksum used by
/// incremental LTX files: SHA-256 over the pre-apply checksum followed
/// by each (page number, page data) pair, truncated to 64 bits.
///
/// Pages must be fed in the same order they are encoded/applied; see
/// [`chain_checksum`] for the one-shot equivalent.
pub struct ChainHasher {
    // Running SHA-256 state.
    hasher: sha2::Sha256,
    // Number of pages folded in so far (used for logging/reporting).
    page_count: usize,
}

impl ChainHasher {
    /// Seed the hasher with the pre-apply checksum (big-endian bytes).
    pub fn new(pre: Checksum) -> Self {
        use sha2::Digest;
        let mut hasher = sha2::Sha256::new();
        hasher.update(pre.into_inner().to_be_bytes());
        Self { hasher, page_count: 0 }
    }

    /// Fold one page into the chain: the page number (big-endian)
    /// followed by the raw page bytes.
    pub fn update(&mut self, page_num: u32, page_data: &[u8]) {
        use sha2::Digest;
        self.hasher.update(page_num.to_be_bytes());
        self.hasher.update(page_data);
        self.page_count += 1;
    }

    /// Finalize the digest and truncate to the first 8 bytes,
    /// interpreted big-endian.
    pub fn finish(self) -> Checksum {
        use sha2::Digest;
        let result = self.hasher.finalize();
        Checksum::new(u64::from_be_bytes(result[0..8].try_into().unwrap()))
    }

    /// Number of pages hashed so far.
    pub fn page_count(&self) -> usize {
        self.page_count
    }
}
349
350pub fn verify_ltx<R: Read>(reader: R) -> Result<Header> {
353 let (mut decoder, header) = Decoder::new(reader)?;
354
355 let page_size = header.page_size.into_inner() as usize;
356 let mut page_buf = vec![0u8; page_size];
357
358 while decoder.decode_page(&mut page_buf)?.is_some() {
360 }
362
363 decoder.finish()?;
365
366 Ok(header)
367}
368
369pub fn compute_db_checksum(data: &[u8]) -> Checksum {
371 use sha2::{Digest, Sha256};
372 let mut hasher = Sha256::new();
373 hasher.update(data);
374 let result = hasher.finalize();
375 let hash = u64::from_be_bytes(result[0..8].try_into().unwrap());
377 Checksum::new(hash)
378}
379
380
381#[cfg(test)]
382mod tests {
383 use super::*;
384 use tempfile::tempdir;
385
386 #[test]
387 fn test_snapshot_roundtrip_single_page() {
388 let dir = tempdir().unwrap();
389 let db_path = dir.path().join("test.db");
390 let ltx_path = dir.path().join("test.ltx");
391 let restored_path = dir.path().join("restored.db");
392
393 let page_size = 4096u32;
395 let db_data = vec![0x42u8; page_size as usize];
396 std::fs::write(&db_path, &db_data).unwrap();
397
398 let ltx_file = std::fs::File::create(<x_path).unwrap();
400 encode_snapshot(ltx_file, &db_path, page_size, 1).unwrap();
401
402 let ltx_file = std::fs::File::open(<x_path).unwrap();
404 let result = decode_to_db(ltx_file, &restored_path).unwrap();
405
406 let restored_data = std::fs::read(&restored_path).unwrap();
408 assert_eq!(db_data, restored_data);
409 assert_eq!(result.header.page_size.into_inner(), page_size);
410 assert_eq!(result.header.min_txid.into_inner(), 1);
411 assert_eq!(result.header.max_txid.into_inner(), 1);
412 }
413
    /// Round-trips a 10-page database through an in-memory buffer, each
    /// page carrying a distinct fill byte plus its index stamped in the
    /// first four bytes so swapped pages would be detected.
    #[test]
    fn test_snapshot_roundtrip_multiple_pages() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let restored_path = dir.path().join("restored.db");

        let page_size = 4096u32;
        let num_pages = 10;

        let mut db_data = Vec::new();
        for i in 0..num_pages {
            let mut page = vec![(i as u8).wrapping_mul(17); page_size as usize];
            page[0..4].copy_from_slice(&(i as u32).to_be_bytes());
            db_data.extend(page);
        }
        std::fs::write(&db_path, &db_data).unwrap();

        let mut ltx_buffer = Vec::new();
        encode_snapshot(&mut ltx_buffer, &db_path, page_size, 100).unwrap();

        let cursor = std::io::Cursor::new(ltx_buffer);
        let result = decode_to_db(cursor, &restored_path).unwrap();

        let restored_data = std::fs::read(&restored_path).unwrap();
        assert_eq!(db_data.len(), restored_data.len());
        assert_eq!(db_data, restored_data);
        assert_eq!(result.header.commit.into_inner(), num_pages as u32);
        assert_eq!(result.header.max_txid.into_inner(), 100);
    }
448
    /// Snapshots must round-trip for each power-of-two page size from
    /// 512 to 32768 bytes.
    #[test]
    fn test_snapshot_various_page_sizes() {
        let dir = tempdir().unwrap();

        for page_size in [512u32, 1024, 2048, 4096, 8192, 16384, 32768] {
            let db_path = dir.path().join(format!("test_{}.db", page_size));
            let restored_path = dir.path().join(format!("restored_{}.db", page_size));

            // Three pages with distinct fill bytes.
            let db_data: Vec<u8> = (0..3)
                .flat_map(|i| vec![(i * 50) as u8; page_size as usize])
                .collect();
            std::fs::write(&db_path, &db_data).unwrap();

            let mut ltx_buffer = Vec::new();
            encode_snapshot(&mut ltx_buffer, &db_path, page_size, 1).unwrap();

            let cursor = std::io::Cursor::new(ltx_buffer);
            let result = decode_to_db(cursor, &restored_path).unwrap();

            let restored_data = std::fs::read(&restored_path).unwrap();
            assert_eq!(
                db_data, restored_data,
                "Mismatch for page_size={}",
                page_size
            );
            assert_eq!(result.header.page_size.into_inner(), page_size);
        }
    }
478
    /// Every byte value 0..=255 must survive the compress/decompress
    /// round trip unchanged.
    #[test]
    fn test_snapshot_preserves_binary_data() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("binary.db");
        let restored_path = dir.path().join("restored.db");

        let page_size = 4096u32;

        // Fill four pages with a rolling 0..=255 byte pattern.
        let mut db_data = Vec::new();
        for page_num in 0..4 {
            let mut page = vec![0u8; page_size as usize];
            for (i, byte) in page.iter_mut().enumerate() {
                *byte = ((page_num * 256 + i) % 256) as u8;
            }
            db_data.extend(page);
        }
        std::fs::write(&db_path, &db_data).unwrap();

        let mut ltx_buffer = Vec::new();
        encode_snapshot(&mut ltx_buffer, &db_path, page_size, 50).unwrap();

        let cursor = std::io::Cursor::new(ltx_buffer);
        decode_to_db(cursor, &restored_path).unwrap();

        let restored_data = std::fs::read(&restored_path).unwrap();

        // Byte-for-byte comparison with an informative failure message.
        for (i, (orig, rest)) in db_data.iter().zip(restored_data.iter()).enumerate() {
            assert_eq!(
                orig, rest,
                "Byte mismatch at offset {}: expected 0x{:02x}, got 0x{:02x}",
                i, orig, rest
            );
        }
    }
515
516 #[test]
517 fn test_incremental_ltx_encoding_with_checksum() {
518 let page_size = 4096u32;
521
522 let pages: Vec<(u32, Vec<u8>)> = vec![
524 (1, vec![0xAA; page_size as usize]),
525 (2, vec![0xBB; page_size as usize]),
526 (3, vec![0xCC; page_size as usize]),
527 ];
528
529 let pre_checksum = Checksum::new(0x123456789ABCDEF0);
531 let expected_post = Checksum::new(0xFEDCBA9876543210);
532
533 let mut ltx_buffer = Vec::new();
534 let checksum = encode_wal_changes(
535 &mut ltx_buffer,
536 &pages,
537 page_size,
538 10, 12, 3, Some(pre_checksum),
542 expected_post,
543 )
544 .unwrap();
545
546 assert_eq!(checksum.into_inner(), expected_post.into_inner());
548
549 assert!(!ltx_buffer.is_empty());
551 assert!(ltx_buffer.len() > 100); }
553
554 #[test]
555 fn test_incremental_ltx_format_rules() {
556 let page_size = 1024u32;
560 let pre_checksum = Checksum::new(0x123456789ABCDEF0);
561
562 let pages: Vec<(u32, Vec<u8>)> = vec![
564 (1, vec![0x11; page_size as usize]),
565 (2, vec![0x22; page_size as usize]),
566 ];
567
568 let expected_post = Checksum::new(0xABCDEF1234567890);
569
570 let mut ltx_buffer = Vec::new();
571 let result = encode_wal_changes(
572 &mut ltx_buffer,
573 &pages,
574 page_size,
575 10, 11, 2,
578 Some(pre_checksum),
579 expected_post,
580 );
581 assert!(
582 result.is_ok(),
583 "Incremental with pre_checksum should succeed: {:?}",
584 result.err()
585 );
586
587 let mut ltx_buffer2 = Vec::new();
589 let result2 = encode_wal_changes(
590 &mut ltx_buffer2,
591 &pages,
592 page_size,
593 10, 11,
595 2,
596 None, expected_post,
598 );
599 assert!(
600 result2.is_err(),
601 "Incremental without pre_checksum should fail"
602 );
603
604 let mut ltx_buffer3 = Vec::new();
606 let result3 = encode_wal_changes(
607 &mut ltx_buffer3,
608 &pages,
609 page_size,
610 1, 2,
612 2,
613 None, expected_post,
615 );
616 assert!(
617 result3.is_ok(),
618 "Snapshot without pre_checksum should succeed: {:?}",
619 result3.err()
620 );
621 }
622
    /// The max TXID must round-trip faithfully across a wide range of
    /// values, up to u32::MAX.
    #[test]
    fn test_txid_ranges() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let restored_path = dir.path().join("restored.db");

        let page_size = 4096u32;
        let db_data = vec![0x42u8; page_size as usize];
        std::fs::write(&db_path, &db_data).unwrap();

        for txid in [1u64, 100, 1000, 999999, u32::MAX as u64] {
            let mut ltx_buffer = Vec::new();
            encode_snapshot(&mut ltx_buffer, &db_path, page_size, txid).unwrap();

            let cursor = std::io::Cursor::new(ltx_buffer);
            let result = decode_to_db(cursor, &restored_path).unwrap();

            assert_eq!(result.header.max_txid.into_inner(), txid);
        }
    }
644
645 #[test]
646 fn test_checksum_computation() {
647 let data1 = b"hello world";
649 let data2 = b"hello world";
650 let data3 = b"hello worlD"; let cs1 = compute_db_checksum(data1);
653 let cs2 = compute_db_checksum(data2);
654 let cs3 = compute_db_checksum(data3);
655
656 assert_eq!(cs1.into_inner(), cs2.into_inner());
657 assert_ne!(cs1.into_inner(), cs3.into_inner());
658 }
659
660
661 #[test]
662 fn test_large_database() {
663 let dir = tempdir().unwrap();
664 let db_path = dir.path().join("large.db");
665 let restored_path = dir.path().join("restored.db");
666
667 let page_size = 4096u32;
668 let num_pages = 100; let mut db_data = Vec::with_capacity(num_pages * page_size as usize);
672 for i in 0..num_pages {
673 let pattern = (i as u8).wrapping_mul(37);
674 let mut page = vec![pattern; page_size as usize];
675 let page_num_bytes = (i as u32).to_le_bytes();
677 page[0..4].copy_from_slice(&page_num_bytes);
678 db_data.extend(page);
679 }
680 std::fs::write(&db_path, &db_data).unwrap();
681
682 let mut ltx_buffer = Vec::new();
683 encode_snapshot(&mut ltx_buffer, &db_path, page_size, 1000).unwrap();
684
685 assert!(
687 ltx_buffer.len() < db_data.len(),
688 "LTX ({}) should be smaller than raw DB ({}) due to compression",
689 ltx_buffer.len(),
690 db_data.len()
691 );
692
693 let cursor = std::io::Cursor::new(ltx_buffer);
694 decode_to_db(cursor, &restored_path).unwrap();
695
696 let restored_data = std::fs::read(&restored_path).unwrap();
697 assert_eq!(db_data, restored_data);
698 }
699
    /// Encoding into a `Vec<u8>` (any `Write` impl) works the same as
    /// encoding into a file.
    #[test]
    fn test_encode_to_memory_buffer() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let restored_path = dir.path().join("restored.db");

        let page_size = 4096u32;
        let db_data = vec![0x42u8; page_size as usize * 5];
        std::fs::write(&db_path, &db_data).unwrap();

        let mut buffer: Vec<u8> = Vec::new();
        encode_snapshot(&mut buffer, &db_path, page_size, 1).unwrap();

        let cursor = std::io::Cursor::new(buffer);
        decode_to_db(cursor, &restored_path).unwrap();

        let restored_data = std::fs::read(&restored_path).unwrap();
        assert_eq!(db_data, restored_data);
    }
721
    /// Applying an incremental touching pages 2 and 4 must rewrite
    /// exactly those pages and leave pages 1, 3 and 5 untouched.
    #[test]
    fn test_apply_ltx_in_place_basic() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let page_size = 4096u32;
        let num_pages = 5;

        let db_data = vec![0x00u8; (page_size as usize) * num_pages];
        std::fs::write(&db_path, &db_data).unwrap();

        let pages: Vec<(u32, Vec<u8>)> = vec![
            (2, vec![0xAA; page_size as usize]),
            (4, vec![0xBB; page_size as usize]),
        ];

        let pre_checksum = compute_checksum_from_file(&db_path).unwrap();

        // Self-consistent chain checksum so apply-time verification passes.
        let expected_post = chain_checksum(pre_checksum, &pages);

        let mut ltx_buffer = Vec::new();
        encode_wal_changes(
            &mut ltx_buffer,
            &pages,
            page_size,
            2, 3, num_pages as u32,
            Some(pre_checksum),
            expected_post,
        )
        .unwrap();

        let cursor = std::io::Cursor::new(ltx_buffer);
        let result = apply_ltx_to_db(cursor, &db_path).unwrap();

        let result_data = std::fs::read(&db_path).unwrap();

        // Page 1: untouched.
        assert_eq!(&result_data[0..page_size as usize], &vec![0x00u8; page_size as usize][..]);
        // Page 2: rewritten to 0xAA.
        let page2_start = page_size as usize;
        assert_eq!(&result_data[page2_start..page2_start + page_size as usize], &vec![0xAAu8; page_size as usize][..]);
        // Page 3: untouched.
        let page3_start = 2 * page_size as usize;
        assert_eq!(&result_data[page3_start..page3_start + page_size as usize], &vec![0x00u8; page_size as usize][..]);
        // Page 4: rewritten to 0xBB.
        let page4_start = 3 * page_size as usize;
        assert_eq!(&result_data[page4_start..page4_start + page_size as usize], &vec![0xBBu8; page_size as usize][..]);
        // Page 5: untouched.
        let page5_start = 4 * page_size as usize;
        assert_eq!(&result_data[page5_start..page5_start + page_size as usize], &vec![0x00u8; page_size as usize][..]);

        assert_eq!(result.header.min_txid.into_inner(), 2);
        assert_eq!(result.header.max_txid.into_inner(), 3);
    }
783
    /// Applying a single-page incremental must not disturb the three
    /// surrounding pages.
    #[test]
    fn test_apply_ltx_in_place_preserves_other_data() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let page_size = 4096u32;

        // Four pages filled with 0, 10, 20, 30 respectively.
        let mut db_data = Vec::new();
        for i in 0..4u8 {
            db_data.extend(vec![i * 10; page_size as usize]);
        }
        std::fs::write(&db_path, &db_data).unwrap();

        let pages: Vec<(u32, Vec<u8>)> = vec![
            (3, vec![0xFF; page_size as usize]),
        ];

        let pre_checksum = compute_checksum_from_file(&db_path).unwrap();

        let expected_post = chain_checksum(pre_checksum, &pages);

        let mut ltx_buffer = Vec::new();
        encode_wal_changes(&mut ltx_buffer, &pages, page_size, 10, 11, 4, Some(pre_checksum), expected_post).unwrap();

        let cursor = std::io::Cursor::new(ltx_buffer);
        apply_ltx_to_db(cursor, &db_path).unwrap();

        let result_data = std::fs::read(&db_path).unwrap();

        // Only page 3 changed; the rest keep their fill bytes.
        assert_eq!(&result_data[0..page_size as usize], &vec![0u8; page_size as usize][..]);
        assert_eq!(&result_data[page_size as usize..2 * page_size as usize], &vec![10u8; page_size as usize][..]);
        assert_eq!(&result_data[2 * page_size as usize..3 * page_size as usize], &vec![0xFFu8; page_size as usize][..]);
        assert_eq!(&result_data[3 * page_size as usize..4 * page_size as usize], &vec![30u8; page_size as usize][..]);
    }
824
    /// File checksums are stable for identical content and change when
    /// the file content changes.
    #[test]
    fn test_compute_checksum_from_file() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let data = vec![0x42u8; 4096];
        std::fs::write(&db_path, &data).unwrap();

        let checksum1 = compute_checksum_from_file(&db_path).unwrap();
        let checksum2 = compute_checksum_from_file(&db_path).unwrap();

        assert_eq!(checksum1.into_inner(), checksum2.into_inner());

        // Rewrite with different content; the checksum must move.
        std::fs::write(&db_path, vec![0x43u8; 4096]).unwrap();
        let checksum3 = compute_checksum_from_file(&db_path).unwrap();
        assert_ne!(checksum1.into_inner(), checksum3.into_inner());
    }
844
845 #[test]
846 fn test_apply_ltx_chain_simulation() {
847 let dir = tempdir().unwrap();
849 let db_path = dir.path().join("test.db");
850
851 let page_size = 4096u32;
852 let num_pages = 3;
853
854 let initial_data: Vec<u8> = (0..num_pages)
856 .flat_map(|i| vec![(i as u8) * 10; page_size as usize])
857 .collect();
858 std::fs::write(&db_path, &initial_data).unwrap();
859
860 let mut snapshot_buffer = Vec::new();
862 encode_snapshot(&mut snapshot_buffer, &db_path, page_size, 1).unwrap();
863
864 let pre_checksum1 = compute_checksum_from_file(&db_path).unwrap();
866 let pages1: Vec<(u32, Vec<u8>)> = vec![(1, vec![0xAA; page_size as usize])];
867 let expected_post1 = chain_checksum(pre_checksum1, &pages1);
868
869 let mut inc1_buffer = Vec::new();
870 let post_checksum1 = encode_wal_changes(
871 &mut inc1_buffer,
872 &pages1,
873 page_size,
874 2, 2,
875 num_pages as u32,
876 Some(pre_checksum1),
877 expected_post1,
878 ).unwrap();
879
880 let cursor1 = std::io::Cursor::new(inc1_buffer);
882 let result1 = apply_ltx_to_db(cursor1, &db_path).unwrap();
883
884 let pre_checksum2 = result1.post_apply_checksum;
886
887 let pages2: Vec<(u32, Vec<u8>)> = vec![(2, vec![0xBB; page_size as usize])];
889 let expected_post2 = chain_checksum(pre_checksum2, &pages2);
890
891 let mut inc2_buffer = Vec::new();
892 encode_wal_changes(
893 &mut inc2_buffer,
894 &pages2,
895 page_size,
896 3, 3,
897 num_pages as u32,
898 Some(pre_checksum2),
899 expected_post2,
900 ).unwrap();
901
902 let cursor2 = std::io::Cursor::new(inc2_buffer);
904 apply_ltx_to_db(cursor2, &db_path).unwrap();
905
906 let final_data = std::fs::read(&db_path).unwrap();
908 assert_eq!(&final_data[0..page_size as usize], &vec![0xAAu8; page_size as usize][..]); assert_eq!(&final_data[page_size as usize..2 * page_size as usize], &vec![0xBBu8; page_size as usize][..]); assert_eq!(&final_data[2 * page_size as usize..3 * page_size as usize], &vec![20u8; page_size as usize][..]); }
912
    /// An LTX whose pre/post checksums are wrong for this database but
    /// internally consistent with each other still applies: apply only
    /// verifies the file's own chain, not the on-disk pre-state.
    #[test]
    fn test_apply_ltx_post_checksum_mismatch_via_wrong_pre() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let page_size = 4096u32;

        let initial_data = vec![0x00u8; page_size as usize * 3];
        std::fs::write(&db_path, &initial_data).unwrap();

        let pages: Vec<(u32, Vec<u8>)> = vec![(1, vec![0xAA; page_size as usize])];
        let wrong_pre_checksum = Checksum::new(0xDEADBEEF);
        // Post checksum derived from the wrong pre, so the chain matches.
        let wrong_post = chain_checksum(wrong_pre_checksum, &pages);

        let mut ltx_buffer = Vec::new();
        encode_wal_changes(
            &mut ltx_buffer,
            &pages,
            page_size,
            2, 2,
            3,
            Some(wrong_pre_checksum),
            wrong_post,
        ).unwrap();

        let cursor = std::io::Cursor::new(ltx_buffer);
        let result = apply_ltx_to_db(cursor, &db_path);
        assert!(result.is_ok(), "Self-consistent LTX should apply successfully");
    }
949
950 #[test]
951 fn test_apply_ltx_post_checksum_mismatch() {
952 let dir = tempdir().unwrap();
954 let db_path = dir.path().join("test.db");
955 let page_size = 4096u32;
956
957 let initial_data = vec![0x00u8; page_size as usize * 3];
959 std::fs::write(&db_path, &initial_data).unwrap();
960
961 let pre_checksum = compute_checksum_from_file(&db_path).unwrap();
962
963 let pages: Vec<(u32, Vec<u8>)> = vec![(1, vec![0xAA; page_size as usize])];
965
966 let wrong_post_checksum = Checksum::new(0xBADC0FFEE);
968
969 let mut ltx_buffer = Vec::new();
970 encode_wal_changes(
971 &mut ltx_buffer,
972 &pages,
973 page_size,
974 2, 2,
975 3,
976 Some(pre_checksum),
977 wrong_post_checksum, ).unwrap();
979
980 let cursor = std::io::Cursor::new(ltx_buffer);
982 let result = apply_ltx_to_db(cursor, &db_path);
983
984 assert!(result.is_err(), "Should detect post-apply checksum mismatch");
985 let err_msg = result.unwrap_err().to_string();
986 assert!(err_msg.contains("Post-apply checksum mismatch"), "Error should mention post-apply mismatch: {}", err_msg);
987 assert!(err_msg.contains(&format!("{:016x}", wrong_post_checksum.into_inner())), "Error should show expected checksum");
988 }
989
    /// Builds a chain of three incrementals and shows that skipping the
    /// middle one breaks the checksum chain, while applying all three in
    /// order keeps each post checksum equal to the next file's pre.
    #[test]
    fn test_apply_ltx_out_of_order() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let page_size = 4096u32;

        let initial_data = vec![0x00u8; page_size as usize * 3];
        std::fs::write(&db_path, &initial_data).unwrap();

        let checksum0 = compute_checksum_from_file(&db_path).unwrap();

        // Chain: buf1 (pre = checksum0), buf2 (pre = post1), buf3 (pre = post2).
        let pages1: Vec<(u32, Vec<u8>)> = vec![(1, vec![0xAA; page_size as usize])];
        let post1 = chain_checksum(checksum0, &pages1);
        let mut buf1 = Vec::new();
        encode_wal_changes(&mut buf1, &pages1, page_size, 2, 2, 3, Some(checksum0), post1).unwrap();

        let pages2: Vec<(u32, Vec<u8>)> = vec![(2, vec![0xBB; page_size as usize])];
        let post2 = chain_checksum(post1, &pages2);
        let mut buf2 = Vec::new();
        encode_wal_changes(&mut buf2, &pages2, page_size, 3, 3, 3, Some(post1), post2).unwrap();

        let pages3: Vec<(u32, Vec<u8>)> = vec![(3, vec![0xCC; page_size as usize])];
        let post3 = chain_checksum(post2, &pages3);
        let mut buf3 = Vec::new();
        encode_wal_changes(&mut buf3, &pages3, page_size, 4, 4, 3, Some(post2), post3).unwrap();

        // Re-applying buf1 succeeds: apply checks only the file's own chain.
        apply_ltx_to_db(std::io::Cursor::new(&buf1), &db_path).unwrap();
        let result1 = apply_ltx_to_db(std::io::Cursor::new(&buf1), &db_path).unwrap();

        // Skipping buf2: buf3 still applies, but the chain link is broken.
        let result3 = apply_ltx_to_db(std::io::Cursor::new(&buf3), &db_path).unwrap();

        let buf3_pre = result3.header.pre_apply_checksum.unwrap();
        assert_ne!(
            result1.post_apply_checksum, buf3_pre,
            "Chain should be broken when skipping an incremental"
        );

        // Reset and apply all three in order: the chain holds link by link.
        std::fs::write(&db_path, &initial_data).unwrap();
        let r1 = apply_ltx_to_db(std::io::Cursor::new(&buf1), &db_path).unwrap();
        let r2 = apply_ltx_to_db(std::io::Cursor::new(&buf2), &db_path).unwrap();
        let r3 = apply_ltx_to_db(std::io::Cursor::new(&buf3), &db_path).unwrap();

        assert_eq!(r1.post_apply_checksum.into_inner(), r2.header.pre_apply_checksum.unwrap().into_inner());
        assert_eq!(r2.post_apply_checksum.into_inner(), r3.header.pre_apply_checksum.unwrap().into_inner());
    }
1045
    /// `decode_to_db` must report a post-apply checksum matching both
    /// the restored file and the original database.
    #[test]
    fn test_decode_to_db_post_checksum_verification() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let restored_path = dir.path().join("restored.db");
        let page_size = 4096u32;

        // Three pages with distinct fill bytes.
        let mut db_data = Vec::new();
        for i in 0..3u8 {
            db_data.extend(vec![i * 42; page_size as usize]);
        }
        std::fs::write(&db_path, &db_data).unwrap();

        let expected_checksum = compute_checksum_from_file(&db_path).unwrap();

        let mut snapshot_buffer = Vec::new();
        encode_snapshot(&mut snapshot_buffer, &db_path, page_size, 1).unwrap();

        let cursor = std::io::Cursor::new(&snapshot_buffer);
        let result = decode_to_db(cursor, &restored_path);
        assert!(result.is_ok(), "Valid snapshot should decode successfully");

        let decoded_result = result.unwrap();
        let actual_checksum = compute_checksum_from_file(&restored_path).unwrap();

        assert_eq!(
            decoded_result.post_apply_checksum.into_inner(),
            actual_checksum.into_inner(),
            "post_apply_checksum in result should match actual restored file checksum"
        );

        assert_eq!(
            actual_checksum.into_inner(),
            expected_checksum.into_inner(),
            "Restored file should match original"
        );

        let restored_data = std::fs::read(&restored_path).unwrap();
        assert_eq!(restored_data, db_data, "Restored data should match original exactly");
    }
1092
1093 #[test]
1098 fn test_no_checksum_flag_decode() {
1099 use litepages::Encoder;
1101
1102 let dir = tempdir().unwrap();
1103 let db_path = dir.path().join("test.db");
1104 let restored_path = dir.path().join("restored.db");
1105 let page_size = 4096u32;
1106
1107 let db_data = vec![0x42u8; page_size as usize * 3];
1109 std::fs::write(&db_path, &db_data).unwrap();
1110
1111 let header = Header {
1113 flags: HeaderFlags::NO_CHECKSUM | HeaderFlags::COMPRESS_LZ4,
1114 page_size: PageSize::new(page_size).unwrap(),
1115 commit: PageNum::new(3).unwrap(),
1116 min_txid: TXID::ONE,
1117 max_txid: TXID::ONE,
1118 timestamp: SystemTime::now(),
1119 pre_apply_checksum: None,
1120 };
1121
1122 let mut ltx_buffer = Vec::new();
1123 let mut encoder = Encoder::new(&mut ltx_buffer, &header).unwrap();
1124
1125 for i in 0..3u32 {
1127 encoder.encode_page(PageNum::new(i + 1).unwrap(), &db_data[(i as usize * page_size as usize)..(i as usize + 1) * page_size as usize]).unwrap();
1128 }
1129
1130 encoder.finish(Checksum::new(0)).unwrap();
1132
1133 let cursor = std::io::Cursor::new(<x_buffer);
1135 let result = decode_to_db(cursor, &restored_path);
1136
1137 assert!(result.is_ok(), "Should decode successfully with NO_CHECKSUM flag");
1138 let decode_result = result.unwrap();
1139
1140 assert!(decode_result.post_apply_checksum.into_inner() != 0, "Should compute actual checksum even with NO_CHECKSUM");
1143
1144 let restored_data = std::fs::read(&restored_path).unwrap();
1146 assert_eq!(restored_data, db_data, "Data should be restored correctly even with NO_CHECKSUM");
1147
1148 let expected_checksum = compute_db_checksum(&db_data);
1150 assert_eq!(decode_result.post_apply_checksum.into_inner(), expected_checksum.into_inner());
1151 }
1152
1153 #[test]
1154 fn test_no_checksum_flag_apply() {
1155 use litepages::Encoder;
1157
1158 let dir = tempdir().unwrap();
1159 let db_path = dir.path().join("test.db");
1160 let page_size = 4096u32;
1161
1162 let initial_data = vec![0x00u8; page_size as usize * 3];
1164 std::fs::write(&db_path, &initial_data).unwrap();
1165
1166 let header = Header {
1169 flags: HeaderFlags::NO_CHECKSUM | HeaderFlags::COMPRESS_LZ4,
1170 page_size: PageSize::new(page_size).unwrap(),
1171 commit: PageNum::new(3).unwrap(),
1172 min_txid: TXID::new(2).unwrap(), max_txid: TXID::new(2).unwrap(),
1174 timestamp: SystemTime::now(),
1175 pre_apply_checksum: Some(Checksum::new(0)), };
1177
1178 let mut ltx_buffer = Vec::new();
1179 let mut encoder = Encoder::new(&mut ltx_buffer, &header).unwrap();
1180
1181 let modified_page = vec![0xAAu8; page_size as usize];
1183 encoder.encode_page(PageNum::new(2).unwrap(), &modified_page).unwrap();
1184
1185 encoder.finish(Checksum::new(0)).unwrap();
1187
1188 let cursor = std::io::Cursor::new(<x_buffer);
1190 let result = apply_ltx_to_db(cursor, &db_path);
1191
1192 assert!(result.is_ok(), "Should apply successfully with NO_CHECKSUM flag");
1193 let apply_result = result.unwrap();
1194
1195 assert!(apply_result.post_apply_checksum.into_inner() != 0, "Should compute actual checksum even with NO_CHECKSUM");
1198
1199 let result_data = std::fs::read(&db_path).unwrap();
1201 assert_eq!(&result_data[page_size as usize..2 * page_size as usize], &modified_page[..]);
1202 assert_eq!(&result_data[0..page_size as usize], &vec![0x00u8; page_size as usize][..]);
1204 assert_eq!(&result_data[2 * page_size as usize..3 * page_size as usize], &vec![0x00u8; page_size as usize][..]);
1205
1206 assert!(apply_result.post_apply_checksum.into_inner() != 0, "Should compute a chain checksum");
1208 }
1209
1210 #[test]
1211 fn test_no_checksum_flag_skips_verification() {
1212 use litepages::Encoder;
1214
1215 let dir = tempdir().unwrap();
1216 let db_path = dir.path().join("test.db");
1217 let page_size = 4096u32;
1218
1219 let db_data = vec![0x42u8; page_size as usize];
1221 std::fs::write(&db_path, &db_data).unwrap();
1222
1223 let actual_checksum = compute_checksum_from_file(&db_path).unwrap();
1226 let wrong_checksum = Checksum::new(0xDEADBEEF); assert_ne!(wrong_checksum.into_inner(), actual_checksum.into_inner(), "Checksums should be different");
1229
1230 let header = Header {
1231 flags: HeaderFlags::NO_CHECKSUM | HeaderFlags::COMPRESS_LZ4,
1232 page_size: PageSize::new(page_size).unwrap(),
1233 commit: PageNum::new(1).unwrap(),
1234 min_txid: TXID::new(2).unwrap(),
1235 max_txid: TXID::new(2).unwrap(),
1236 timestamp: SystemTime::now(),
1237 pre_apply_checksum: Some(wrong_checksum), };
1239
1240 let mut ltx_buffer = Vec::new();
1241 let mut encoder = Encoder::new(&mut ltx_buffer, &header).unwrap();
1242 encoder.encode_page(PageNum::new(1).unwrap(), &vec![0x99u8; page_size as usize]).unwrap();
1243 encoder.finish(Checksum::new(0xBADC0FFEE)).unwrap(); let cursor = std::io::Cursor::new(<x_buffer);
1247 let result = apply_ltx_to_db(cursor, &db_path);
1248
1249 assert!(result.is_ok(), "Should succeed with wrong checksums when NO_CHECKSUM is set");
1250 }
1251}
1252