use std::collections::HashMap;
use std::path::Path;
use crate::backend::native::NativeBackendError;
use crate::backend::native::NativeResult;
use crate::backend::native::v3::index::IndexPage;
#[derive(Debug)]
pub struct WriteBatch {
dirty_pages: HashMap<u64, IndexPage>,
committed: bool,
}
impl WriteBatch {
pub fn new() -> Self {
Self {
dirty_pages: HashMap::new(),
committed: false,
}
}
pub fn stage_page(&mut self, page: IndexPage) -> NativeResult<()> {
if self.committed {
return Err(NativeBackendError::InvalidOperation {
context: "Cannot stage page to already-committed batch".to_string(),
});
}
let page_id = page.page_id();
self.dirty_pages.insert(page_id, page);
Ok(())
}
pub fn len(&self) -> usize {
self.dirty_pages.len()
}
pub fn is_empty(&self) -> bool {
self.dirty_pages.is_empty()
}
pub fn is_committed(&self) -> bool {
self.committed
}
pub fn commit(mut self, db_path: &Path) -> NativeResult<()> {
if self.committed {
return Err(NativeBackendError::InvalidOperation {
context: "Batch already committed".to_string(),
});
}
if self.dirty_pages.is_empty() {
return Err(NativeBackendError::InvalidOperation {
context: "Cannot commit empty batch".to_string(),
});
}
self.write_pages_to_disk(db_path)?;
self.committed = true;
Ok(())
}
fn write_pages_to_disk(&self, db_path: &Path) -> NativeResult<()> {
use crate::backend::native::v3::constants::{DEFAULT_PAGE_SIZE, V3_HEADER_SIZE};
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom, Write};
let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(db_path)
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to open db for batch write: {}", db_path.display()),
source: e,
})?;
for (page_id, page) in &self.dirty_pages {
if *page_id == 0 {
continue;
}
let offset = V3_HEADER_SIZE + (page_id - 1) * DEFAULT_PAGE_SIZE;
let page_bytes = page.pack()?;
file.seek(SeekFrom::Start(offset))
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to seek to page {}", page_id),
source: e,
})?;
file.write_all(&page_bytes)
.map_err(|e| NativeBackendError::IoError {
context: format!("Failed to write page {}", page_id),
source: e,
})?;
}
file.sync_data().map_err(|e| NativeBackendError::IoError {
context: "Failed to sync batch write".to_string(),
source: e,
})?;
Ok(())
}
#[cfg(test)]
pub fn get_page(&self, page_id: u64) -> Option<&IndexPage> {
self.dirty_pages.get(&page_id)
}
}
impl Default for WriteBatch {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend::native::v3::index::IndexPage;
use tempfile::TempDir;
fn create_test_db() -> (TempDir, std::path::PathBuf) {
let temp = TempDir::new().unwrap();
let db_path = temp.path().join("test.graph");
use crate::backend::native::v3::header::PersistentHeaderV3;
use std::fs::File;
use std::io::Write;
let header = PersistentHeaderV3::new_v3();
let header_bytes = header.to_bytes();
let mut file = File::create(&db_path).unwrap();
file.write_all(&header_bytes).unwrap();
file.set_len(4096 * 10).unwrap();
(temp, db_path)
}
#[test]
fn test_write_batch_new_is_empty() {
let batch = WriteBatch::new();
assert!(batch.is_empty());
assert_eq!(batch.len(), 0);
assert!(!batch.is_committed());
}
#[test]
fn test_stage_page_increases_count() {
let mut batch = WriteBatch::new();
let page = IndexPage::new_leaf(1);
batch.stage_page(page).unwrap();
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());
}
#[test]
fn test_stage_same_page_twice_overwrites() {
let mut batch = WriteBatch::new();
let page1 = IndexPage::new_leaf(1);
let page2 = IndexPage::new_leaf(1);
batch.stage_page(page1).unwrap();
batch.stage_page(page2.clone()).unwrap();
assert_eq!(batch.len(), 1);
assert_eq!(batch.get_page(1).unwrap().page_id(), 1);
}
#[test]
fn test_cannot_commit_empty_batch() {
let (_temp, db_path) = create_test_db();
let batch = WriteBatch::new();
let result = batch.commit(&db_path);
assert!(result.is_err());
}
#[test]
fn test_commit_multiple_pages() {
let (_temp, db_path) = create_test_db();
let mut batch = WriteBatch::new();
for i in 1..=5 {
let page = IndexPage::new_leaf(i);
batch.stage_page(page).unwrap();
}
assert_eq!(batch.len(), 5);
batch.commit(&db_path).unwrap();
}
#[test]
fn test_commit_skips_page_zero() {
let (_temp, db_path) = create_test_db();
let mut batch = WriteBatch::new();
let page0 = IndexPage::new_leaf(0);
let page1 = IndexPage::new_leaf(1);
batch.stage_page(page0).unwrap();
batch.stage_page(page1).unwrap();
batch.commit(&db_path).unwrap();
}
}