1use crate::cell_delta_wal::{
55 CELL_DELTA_CHECKSUM_SIZE, CELL_DELTA_HEADER_SIZE, CellDeltaWalFrame, CellOp,
56};
57use fsqlite_error::Result;
58use fsqlite_types::{CommitSeq, PageNumber, TxnId};
59use tracing::{debug, trace};
60
61#[derive(Debug, Clone)]
70pub struct MixedFrameSubmission {
71 pub full_page_frames: Vec<FullPageFrame>,
74
75 pub cell_delta_frames: Vec<CellDeltaWalFrame>,
77
78 pub txn_id: TxnId,
80
81 pub commit_seq: CommitSeq,
83}
84
85#[derive(Debug, Clone)]
87pub struct FullPageFrame {
88 pub page_number: PageNumber,
90 pub page_data: Vec<u8>,
92 pub db_size_if_commit: u32,
94}
95
96impl MixedFrameSubmission {
97 #[must_use]
99 pub fn new(txn_id: TxnId, commit_seq: CommitSeq) -> Self {
100 Self {
101 full_page_frames: Vec::new(),
102 cell_delta_frames: Vec::new(),
103 txn_id,
104 commit_seq,
105 }
106 }
107
108 #[must_use]
110 pub fn total_frame_count(&self) -> usize {
111 self.full_page_frames.len() + self.cell_delta_frames.len()
112 }
113
114 #[must_use]
116 pub fn has_cell_deltas(&self) -> bool {
117 !self.cell_delta_frames.is_empty()
118 }
119
120 #[must_use]
122 pub fn has_full_pages(&self) -> bool {
123 !self.full_page_frames.is_empty()
124 }
125
126 #[must_use]
128 pub fn is_cell_only(&self) -> bool {
129 self.full_page_frames.is_empty() && !self.cell_delta_frames.is_empty()
130 }
131
132 pub fn add_full_page(&mut self, page_number: PageNumber, page_data: Vec<u8>) {
134 self.full_page_frames.push(FullPageFrame {
135 page_number,
136 page_data,
137 db_size_if_commit: 0,
138 });
139 }
140
141 pub fn add_cell_delta(&mut self, frame: CellDeltaWalFrame) {
143 self.cell_delta_frames.push(frame);
144 }
145
146 pub fn mark_commit(&mut self, db_size: u32) {
151 if let Some(last) = self.full_page_frames.last_mut() {
152 last.db_size_if_commit = db_size;
153 }
154 }
157
158 #[must_use]
162 pub fn estimated_size(&self, page_size: usize) -> usize {
163 let full_page_size = self
164 .full_page_frames
165 .len()
166 .saturating_mul(24usize.saturating_add(page_size));
167 let cell_delta_size = self.cell_delta_frames.iter().fold(0usize, |acc, f| {
168 acc.saturating_add(
169 CELL_DELTA_HEADER_SIZE
170 .saturating_add(f.cell_data.len())
171 .saturating_add(CELL_DELTA_CHECKSUM_SIZE),
172 )
173 });
174 full_page_size.saturating_add(cell_delta_size)
175 }
176}
177
178pub fn build_cell_delta_frames<I>(
197 deltas: I,
198 commit_seq: CommitSeq,
199 txn_id: TxnId,
200) -> Vec<CellDeltaWalFrame>
201where
202 I: Iterator<Item = CellDeltaDescriptor>,
203{
204 let (lower, _) = deltas.size_hint();
205 let mut frames = Vec::with_capacity(lower);
206
207 for desc in deltas {
208 let frame = CellDeltaWalFrame::new(
209 desc.page_number,
210 desc.cell_key_digest,
211 desc.op,
212 commit_seq,
213 txn_id,
214 desc.cell_data,
215 );
216
217 trace!(
218 pgno = desc.page_number.get(),
219 op = ?desc.op,
220 commit_seq = commit_seq.get(),
221 txn_id = txn_id.get(),
222 data_len = frame.cell_data.len(),
223 "cell_delta_frame_built"
224 );
225
226 frames.push(frame);
227 }
228
229 debug!(
230 frame_count = frames.len(),
231 commit_seq = commit_seq.get(),
232 txn_id = txn_id.get(),
233 "cell_delta_frames_extracted"
234 );
235
236 frames
237}
238
239#[derive(Debug, Clone)]
241pub struct CellDeltaDescriptor {
242 pub page_number: PageNumber,
244 pub cell_key_digest: [u8; 16],
246 pub op: CellOp,
248 pub cell_data: Vec<u8>,
250}
251
252impl CellDeltaDescriptor {
253 #[must_use]
255 pub fn new(
256 page_number: PageNumber,
257 cell_key_digest: [u8; 16],
258 op: CellOp,
259 cell_data: Vec<u8>,
260 ) -> Self {
261 Self {
262 page_number,
263 cell_key_digest,
264 op,
265 cell_data,
266 }
267 }
268
269 #[must_use]
271 pub fn insert(page_number: PageNumber, cell_key_digest: [u8; 16], cell_data: Vec<u8>) -> Self {
272 Self::new(page_number, cell_key_digest, CellOp::Insert, cell_data)
273 }
274
275 #[must_use]
277 pub fn update(page_number: PageNumber, cell_key_digest: [u8; 16], cell_data: Vec<u8>) -> Self {
278 Self::new(page_number, cell_key_digest, CellOp::Update, cell_data)
279 }
280
281 #[must_use]
283 pub fn delete(page_number: PageNumber, cell_key_digest: [u8; 16]) -> Self {
284 Self::new(page_number, cell_key_digest, CellOp::Delete, Vec::new())
285 }
286}
287
288pub fn serialize_mixed_frames(
302 submission: &MixedFrameSubmission,
303 page_size: usize,
304) -> Result<Vec<u8>> {
305 let estimated_size = submission.estimated_size(page_size);
306 let mut buf = Vec::new();
307
308 for frame in &submission.cell_delta_frames {
310 let serialized = frame.serialize()?;
311 buf.extend_from_slice(&serialized);
312 }
313
314 debug!(
324 cell_delta_bytes = buf.len(),
325 full_page_count = submission.full_page_frames.len(),
326 total_estimated = estimated_size,
327 "mixed_frames_serialized"
328 );
329
330 Ok(buf)
331}
332
/// Size accounting for a mixed commit, as produced by
/// [`MixedCommitStats::calculate`]. The default value is all zeros.
#[derive(Debug, Clone, Default)]
pub struct MixedCommitStats {
    /// Number of full-page frames in the submission.
    pub full_page_frames: u64,
    /// Number of cell-delta frames in the submission.
    pub cell_delta_frames: u64,
    /// Bytes attributed to full-page frames (frame overhead plus page size each).
    pub full_page_bytes: u64,
    /// Bytes attributed to cell-delta frames (header, payload and checksum each).
    pub cell_delta_bytes: u64,
    /// Bytes saved versus writing every cell delta as a full-page frame.
    pub bytes_saved: u64,
}
351
352impl MixedCommitStats {
353 #[must_use]
355 pub fn calculate(submission: &MixedFrameSubmission, page_size: usize) -> Self {
356 let full_page_count = submission.full_page_frames.len() as u64;
357 let cell_delta_count = submission.cell_delta_frames.len() as u64;
358 let bytes_per_full_page =
359 24u64.saturating_add(u64::try_from(page_size).unwrap_or(u64::MAX));
360
361 let full_page_bytes = full_page_count.saturating_mul(bytes_per_full_page);
362 let cell_delta_bytes = submission.cell_delta_frames.iter().fold(0u64, |acc, f| {
363 acc.saturating_add(
364 u64::try_from(CELL_DELTA_HEADER_SIZE)
365 .unwrap_or(u64::MAX)
366 .saturating_add(u64::try_from(f.cell_data.len()).unwrap_or(u64::MAX))
367 .saturating_add(u64::try_from(CELL_DELTA_CHECKSUM_SIZE).unwrap_or(u64::MAX)),
368 )
369 });
370
371 let hypothetical_full_page_bytes = cell_delta_count.saturating_mul(bytes_per_full_page);
373 let bytes_saved = hypothetical_full_page_bytes.saturating_sub(cell_delta_bytes);
374
375 Self {
376 full_page_frames: full_page_count,
377 cell_delta_frames: cell_delta_count,
378 full_page_bytes,
379 cell_delta_bytes,
380 bytes_saved,
381 }
382 }
383
384 #[must_use]
386 pub fn compression_ratio(&self, page_size: usize) -> f64 {
387 let bytes_per_full_page =
388 24u64.saturating_add(u64::try_from(page_size).unwrap_or(u64::MAX));
389 let hypothetical = self
390 .full_page_frames
391 .saturating_add(self.cell_delta_frames)
392 .saturating_mul(bytes_per_full_page);
393 if hypothetical == 0 {
394 return 1.0;
395 }
396 self.full_page_bytes.saturating_add(self.cell_delta_bytes) as f64 / hypothetical as f64
397 }
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Transaction id shared by the tests.
    fn test_txn_id() -> TxnId {
        TxnId::new(42).unwrap()
    }

    /// Page number shared by the tests.
    fn test_page_number() -> PageNumber {
        PageNumber::new(10).unwrap()
    }

    /// Key digest shared by the tests.
    fn test_key_digest() -> [u8; 16] {
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
    }

    /// Empty submission for the shared transaction id at `seq`.
    fn submission_at(seq: CommitSeq) -> MixedFrameSubmission {
        MixedFrameSubmission::new(test_txn_id(), seq)
    }

    /// Insert frame on the shared page/digest/transaction carrying `payload`.
    fn insert_frame(seq: CommitSeq, payload: Vec<u8>) -> CellDeltaWalFrame {
        CellDeltaWalFrame::new(
            test_page_number(),
            test_key_digest(),
            CellOp::Insert,
            seq,
            test_txn_id(),
            payload,
        )
    }

    // ------------------------------------------------------------------
    // MixedFrameSubmission
    // ------------------------------------------------------------------

    #[test]
    fn test_mixed_frame_submission_creation() {
        let mut sub = submission_at(CommitSeq::new(100));
        assert_eq!(sub.total_frame_count(), 0);
        assert!(!sub.has_cell_deltas());
        assert!(!sub.has_full_pages());

        sub.add_cell_delta(insert_frame(CommitSeq::new(100), vec![1, 2, 3]));

        assert_eq!(sub.total_frame_count(), 1);
        assert!(sub.has_cell_deltas());
        assert!(sub.is_cell_only());

        sub.add_full_page(test_page_number(), vec![0u8; 4096]);
        assert_eq!(sub.total_frame_count(), 2);
        assert!(sub.has_full_pages());
        assert!(!sub.is_cell_only());
    }

    #[test]
    fn test_cell_delta_descriptor() {
        let inserted =
            CellDeltaDescriptor::insert(test_page_number(), test_key_digest(), vec![1, 2, 3]);
        assert_eq!(inserted.page_number, test_page_number());
        assert_eq!(inserted.op, CellOp::Insert);
        assert_eq!(inserted.cell_data, vec![1, 2, 3]);

        let deleted = CellDeltaDescriptor::delete(test_page_number(), test_key_digest());
        assert_eq!(deleted.op, CellOp::Delete);
        assert!(deleted.cell_data.is_empty());
    }

    #[test]
    fn test_build_cell_delta_frames() {
        let descriptors = vec![
            CellDeltaDescriptor::insert(PageNumber::new(10).unwrap(), [1; 16], vec![0xAA; 50]),
            CellDeltaDescriptor::update(PageNumber::new(11).unwrap(), [2; 16], vec![0xBB; 100]),
            CellDeltaDescriptor::delete(PageNumber::new(12).unwrap(), [3; 16]),
        ];

        let frames =
            build_cell_delta_frames(descriptors.into_iter(), CommitSeq::new(200), test_txn_id());

        assert_eq!(frames.len(), 3);
        assert_eq!(frames[0].page_number, PageNumber::new(10).unwrap());
        assert_eq!(frames[0].op, CellOp::Insert);
        assert_eq!(frames[1].op, CellOp::Update);
        assert_eq!(frames[2].op, CellOp::Delete);
        assert!(frames[2].cell_data.is_empty());
    }

    #[test]
    fn test_serialize_mixed_frames() {
        let mut sub = submission_at(CommitSeq::new(100));
        sub.add_cell_delta(insert_frame(CommitSeq::new(100), vec![1, 2, 3, 4, 5]));

        let bytes = serialize_mixed_frames(&sub, 4096).unwrap();

        assert!(!bytes.is_empty());
        // 49 bytes of fixed per-frame overhead + 5 payload bytes.
        assert_eq!(bytes.len(), 54);

        // The buffer must round-trip through the frame decoder.
        let decoded = CellDeltaWalFrame::deserialize(&bytes).unwrap();
        assert_eq!(decoded.page_number, test_page_number());
        assert_eq!(decoded.cell_data, vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_mixed_commit_stats() {
        let mut sub = submission_at(CommitSeq::new(100));

        // Two cell deltas of 100 payload bytes each, on distinct pages.
        for i in 0..2 {
            sub.add_cell_delta(CellDeltaWalFrame::new(
                PageNumber::new(10 + i).unwrap(),
                [i as u8; 16],
                CellOp::Insert,
                CommitSeq::new(100),
                test_txn_id(),
                vec![0u8; 100],
            ));
        }

        // Plus one full page.
        sub.add_full_page(PageNumber::new(20).unwrap(), vec![0u8; 4096]);

        let stats = MixedCommitStats::calculate(&sub, 4096);

        assert_eq!(stats.full_page_frames, 1);
        assert_eq!(stats.cell_delta_frames, 2);

        // 2 * (49 overhead + 100 payload)
        assert_eq!(stats.cell_delta_bytes, 298);

        // 24 + 4096
        assert_eq!(stats.full_page_bytes, 4120);

        // 2 * 4120 hypothetical full pages - 298 actual delta bytes
        assert_eq!(stats.bytes_saved, 7942);

        let ratio = stats.compression_ratio(4096);
        assert!(
            ratio < 1.0,
            "compression ratio should be < 1.0, got {ratio}"
        );
    }

    #[test]
    fn test_mixed_commit_stats_saturate_for_pathological_page_size() {
        let mut sub = submission_at(CommitSeq::new(100));
        sub.add_full_page(test_page_number(), Vec::new());
        sub.add_cell_delta(insert_frame(CommitSeq::new(100), vec![1]));

        let stats = MixedCommitStats::calculate(&sub, usize::MAX);
        let bytes_per_full_page =
            24u64.saturating_add(u64::try_from(usize::MAX).unwrap_or(u64::MAX));

        assert_eq!(stats.full_page_bytes, bytes_per_full_page);
        assert!(stats.bytes_saved <= bytes_per_full_page);
        assert!(stats.compression_ratio(usize::MAX).is_finite());
    }

    #[test]
    fn test_estimated_size() {
        let mut sub = submission_at(CommitSeq::new(100));

        // One cell delta with 50 payload bytes ...
        sub.add_cell_delta(insert_frame(CommitSeq::new(100), vec![0u8; 50]));

        // ... and one full page.
        sub.add_full_page(test_page_number(), vec![0u8; 4096]);

        // (24 + 4096) + (49 + 50)
        let estimated = sub.estimated_size(4096);
        assert_eq!(estimated, 4219);
    }

    #[test]
    fn test_serialize_mixed_frames_rejects_oversized_cell_delta_without_preallocation() {
        let mut sub = submission_at(CommitSeq::new(100));
        sub.add_cell_delta(insert_frame(
            CommitSeq::new(100),
            vec![0u8; crate::cell_delta_wal::CELL_DELTA_MAX_DATA_SIZE + 1],
        ));

        assert!(serialize_mixed_frames(&sub, 4096).is_err());
    }

    #[test]
    fn test_mark_commit() {
        let mut sub = submission_at(CommitSeq::new(100));

        sub.add_full_page(PageNumber::new(10).unwrap(), vec![0u8; 4096]);
        sub.add_full_page(PageNumber::new(11).unwrap(), vec![0u8; 4096]);

        // Nothing is marked until `mark_commit` is called.
        assert_eq!(sub.full_page_frames[0].db_size_if_commit, 0);
        assert_eq!(sub.full_page_frames[1].db_size_if_commit, 0);

        sub.mark_commit(100);

        // Only the last frame carries the database size.
        assert_eq!(sub.full_page_frames[0].db_size_if_commit, 0);
        assert_eq!(sub.full_page_frames[1].db_size_if_commit, 100);
    }

    #[test]
    fn test_compression_ratio_zero_frames_returns_one() {
        let sub = submission_at(CommitSeq::new(1));
        let stats = MixedCommitStats::calculate(&sub, 4096);
        assert_eq!(stats.full_page_frames, 0);
        assert_eq!(stats.cell_delta_frames, 0);
        assert!((stats.compression_ratio(4096) - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_mixed_commit_stats_default_all_zero() {
        let stats = MixedCommitStats::default();
        assert_eq!(stats.full_page_frames, 0);
        assert_eq!(stats.cell_delta_frames, 0);
        assert_eq!(stats.full_page_bytes, 0);
        assert_eq!(stats.cell_delta_bytes, 0);
        assert_eq!(stats.bytes_saved, 0);
    }

    #[test]
    fn test_build_cell_delta_frames_empty_iterator() {
        let frames = build_cell_delta_frames(std::iter::empty(), CommitSeq::new(1), test_txn_id());
        assert!(frames.is_empty());
    }

    #[test]
    fn test_cell_delta_descriptor_update_factory() {
        let updated =
            CellDeltaDescriptor::update(test_page_number(), test_key_digest(), vec![0xCC; 50]);
        assert_eq!(updated.op, CellOp::Update);
        assert_eq!(updated.cell_data.len(), 50);
        assert_eq!(updated.page_number, test_page_number());
    }

    #[test]
    fn test_mark_commit_on_empty_full_pages_is_noop() {
        let mut sub = submission_at(CommitSeq::new(1));
        sub.add_cell_delta(insert_frame(CommitSeq::new(1), vec![1]));
        sub.mark_commit(50);
        assert!(sub.full_page_frames.is_empty());
    }

    #[test]
    fn test_serialize_mixed_frames_empty_submission() {
        let sub = submission_at(CommitSeq::new(1));
        let bytes = serialize_mixed_frames(&sub, 4096).unwrap();
        assert!(bytes.is_empty());
    }

    #[test]
    fn test_cell_only_commit() {
        let mut sub = submission_at(CommitSeq::new(100));
        sub.add_cell_delta(insert_frame(CommitSeq::new(100), vec![1, 2, 3]));

        assert!(sub.is_cell_only());
        assert!(sub.has_cell_deltas());
        assert!(!sub.has_full_pages());
    }

    #[test]
    fn test_estimated_size_empty_returns_zero() {
        let sub = submission_at(CommitSeq::new(1));
        assert_eq!(sub.estimated_size(4096), 0);
        assert_eq!(sub.estimated_size(0), 0);
    }

    #[test]
    fn test_full_page_frame_fields_and_debug() {
        let frame = FullPageFrame {
            page_number: test_page_number(),
            page_data: vec![0xAB; 4096],
            db_size_if_commit: 55,
        };
        assert_eq!(frame.page_number, test_page_number());
        assert_eq!(frame.page_data.len(), 4096);
        assert_eq!(frame.db_size_if_commit, 55);

        let copy = frame.clone();
        assert_eq!(copy.page_number, frame.page_number);
        assert_eq!(copy.db_size_if_commit, frame.db_size_if_commit);

        let rendered = format!("{frame:?}");
        assert!(rendered.contains("FullPageFrame"));
    }

    #[test]
    fn test_build_cell_delta_frames_preserves_key_digest() {
        let digest_a = [0xAA; 16];
        let digest_b = [0xBB; 16];
        let descriptors = vec![
            CellDeltaDescriptor::insert(PageNumber::new(5).unwrap(), digest_a, vec![1, 2]),
            CellDeltaDescriptor::delete(PageNumber::new(6).unwrap(), digest_b),
        ];
        let frames =
            build_cell_delta_frames(descriptors.into_iter(), CommitSeq::new(10), test_txn_id());
        assert_eq!(frames[0].cell_key_digest, digest_a);
        assert_eq!(frames[1].cell_key_digest, digest_b);
    }

    // ------------------------------------------------------------------
    // Derived trait smoke tests
    // ------------------------------------------------------------------

    #[test]
    fn mixed_frame_submission_debug_and_clone() {
        let mut sub = submission_at(CommitSeq::new(7));
        sub.add_full_page(test_page_number(), vec![0u8; 64]);

        let rendered = format!("{sub:?}");
        assert!(rendered.contains("MixedFrameSubmission"));

        let copy = sub.clone();
        assert_eq!(copy.txn_id, test_txn_id());
        assert_eq!(copy.commit_seq, CommitSeq::new(7));
        assert_eq!(copy.full_page_frames.len(), 1);
    }

    #[test]
    fn cell_delta_descriptor_debug_and_clone() {
        let desc =
            CellDeltaDescriptor::insert(test_page_number(), test_key_digest(), vec![9, 8, 7]);

        let rendered = format!("{desc:?}");
        assert!(rendered.contains("CellDeltaDescriptor"));

        let copy = desc.clone();
        assert_eq!(copy.page_number, test_page_number());
        assert_eq!(copy.cell_data, vec![9, 8, 7]);
        assert_eq!(copy.cell_key_digest, test_key_digest());
    }

    #[test]
    fn mixed_commit_stats_debug_and_clone() {
        let mut sub = submission_at(CommitSeq::new(1));
        sub.add_full_page(test_page_number(), vec![0u8; 4096]);
        let stats = MixedCommitStats::calculate(&sub, 4096);

        let rendered = format!("{stats:?}");
        assert!(rendered.contains("MixedCommitStats"));

        let copy = stats.clone();
        assert_eq!(copy.full_page_frames, stats.full_page_frames);
        assert_eq!(copy.full_page_bytes, stats.full_page_bytes);
    }

    #[test]
    fn new_submission_stores_txn_id_and_commit_seq() {
        let txn = TxnId::new(999).unwrap();
        let seq = CommitSeq::new(555);
        let sub = MixedFrameSubmission::new(txn, seq);
        assert_eq!(sub.txn_id, txn);
        assert_eq!(sub.commit_seq, seq);
        assert!(sub.full_page_frames.is_empty());
        assert!(sub.cell_delta_frames.is_empty());
    }

    #[test]
    fn test_compression_ratio_cell_only_below_one() {
        let mut sub = submission_at(CommitSeq::new(1));
        sub.add_cell_delta(insert_frame(CommitSeq::new(1), vec![0u8; 80]));

        let stats = MixedCommitStats::calculate(&sub, 4096);
        assert_eq!(stats.full_page_frames, 0);
        assert_eq!(stats.cell_delta_frames, 1);

        let ratio = stats.compression_ratio(4096);
        assert!(ratio < 1.0, "cell-only ratio should be < 1.0, got {ratio}");
        assert!(ratio > 0.0, "ratio should be positive, got {ratio}");
    }
}