use crate::backend::native::v2::wal::checkpoint::constants::*;
use crate::backend::native::v2::wal::checkpoint::core::CheckpointProgress;
use crate::backend::native::v2::wal::checkpoint::errors::{CheckpointError, CheckpointResult};
use std::io::{Seek, SeekFrom, Write};
pub struct CheckpointWriter;
impl CheckpointWriter {
pub fn write_header<W: Write + Seek>(
writer: &mut W,
lsn_range: (u64, u64),
timestamp: u64,
block_count: u64,
) -> CheckpointResult<()> {
writer
.write_all(CHECKPOINT_MAGIC)
.map_err(|e| CheckpointError::io(format!("Failed to write checkpoint magic: {}", e)))?;
writer
.write_all(&CHECKPOINT_VERSION.to_le_bytes())
.map_err(|e| {
CheckpointError::io(format!("Failed to write checkpoint version: {}", e))
})?;
writer.write_all(&lsn_range.0.to_le_bytes()).map_err(|e| {
CheckpointError::io(format!("Failed to write checkpoint start LSN: {}", e))
})?;
writer.write_all(&lsn_range.1.to_le_bytes()).map_err(|e| {
CheckpointError::io(format!("Failed to write checkpoint end LSN: {}", e))
})?;
writer.write_all(×tamp.to_le_bytes()).map_err(|e| {
CheckpointError::io(format!("Failed to write checkpoint timestamp: {}", e))
})?;
writer.write_all(&block_count.to_le_bytes()).map_err(|e| {
CheckpointError::io(format!("Failed to write checkpoint block count: {}", e))
})?;
Self::write_v2_metadata(writer)?;
Ok(())
}
fn write_v2_metadata<W: Write + Seek>(writer: &mut W) -> CheckpointResult<()> {
let metadata_start = writer
.stream_position()
.map_err(|e| CheckpointError::io(format!("Failed to get metadata position: {}", e)))?;
let v2_version = 2u32; writer
.write_all(&v2_version.to_le_bytes())
.map_err(|e| CheckpointError::io(format!("Failed to write V2 version: {}", e)))?;
writer
.write_all(&v2::V2_GRAPH_BLOCK_SIZE.to_le_bytes())
.map_err(|e| CheckpointError::io(format!("Failed to write V2 block size: {}", e)))?;
writer
.write_all(&v2::V2_CLUSTER_ALIGNMENT.to_le_bytes())
.map_err(|e| {
CheckpointError::io(format!("Failed to write V2 cluster alignment: {}", e))
})?;
let metadata_length_pos = writer.stream_position().map_err(|e| {
CheckpointError::io(format!("Failed to get metadata length position: {}", e))
})?;
writer.write_all(&0u32.to_le_bytes()).map_err(|e| {
CheckpointError::io(format!(
"Failed to write metadata length placeholder: {}",
e
))
})?;
let metadata_end = writer.stream_position().map_err(|e| {
CheckpointError::io(format!("Failed to get metadata end position: {}", e))
})?;
let metadata_length = (metadata_end - metadata_start - 4) as u32;
writer
.seek(SeekFrom::Start(metadata_length_pos))
.map_err(|e| {
CheckpointError::io(format!("Failed to seek to metadata length: {}", e))
})?;
writer
.write_all(&metadata_length.to_le_bytes())
.map_err(|e| CheckpointError::io(format!("Failed to write metadata length: {}", e)))?;
writer.seek(SeekFrom::Start(metadata_end)).map_err(|e| {
CheckpointError::io(format!("Failed to seek back to metadata end: {}", e))
})?;
Ok(())
}
pub fn write_progress<W: Write>(
writer: &mut W,
progress: &CheckpointProgress,
) -> CheckpointResult<()> {
writer
.write_all(PROGRESS_MAGIC)
.map_err(|e| CheckpointError::io(format!("Failed to write progress magic: {}", e)))?;
let elapsed = progress.checkpoint_start.elapsed().as_secs();
writer.write_all(&elapsed.to_le_bytes()).map_err(|e| {
CheckpointError::io(format!("Failed to write progress timestamp: {}", e))
})?;
writer
.write_all(&(progress.completion_percentage as u32).to_le_bytes())
.map_err(|e| {
CheckpointError::io(format!("Failed to write completion percentage: {}", e))
})?;
writer
.write_all(&progress.processed_records.to_le_bytes())
.map_err(|e| {
CheckpointError::io(format!("Failed to write processed records: {}", e))
})?;
writer
.write_all(&progress.flushed_blocks.to_le_bytes())
.map_err(|e| CheckpointError::io(format!("Failed to write flushed blocks: {}", e)))?;
Ok(())
}
pub fn write_completion<W: Write>(
writer: &mut W,
progress: &CheckpointProgress,
) -> CheckpointResult<()> {
writer
.write_all(COMPLETION_MAGIC)
.map_err(|e| CheckpointError::io(format!("Failed to write completion magic: {}", e)))?;
let elapsed = progress.checkpoint_start.elapsed().as_secs();
writer.write_all(&elapsed.to_le_bytes()).map_err(|e| {
CheckpointError::io(format!("Failed to write completion timestamp: {}", e))
})?;
writer
.write_all(&progress.processed_records.to_le_bytes())
.map_err(|e| {
CheckpointError::io(format!("Failed to write final processed records: {}", e))
})?;
writer
.write_all(&progress.flushed_blocks.to_le_bytes())
.map_err(|e| {
CheckpointError::io(format!("Failed to write final flushed blocks: {}", e))
})?;
writer.write_all(&100u32.to_le_bytes()).map_err(|e| {
CheckpointError::io(format!("Failed to write completion status: {}", e))
})?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn test_checkpoint_header_writing() -> CheckpointResult<()> {
let mut buffer = Vec::new();
let mut writer = Cursor::new(&mut buffer);
let lsn_range = (1000, 2000);
let timestamp = 1234567890;
let block_count = 42;
CheckpointWriter::write_header(&mut writer, lsn_range, timestamp, block_count)?;
assert!(!buffer.is_empty());
assert!(buffer.len() > 50);
Ok(())
}
#[test]
fn test_progress_writing() -> CheckpointResult<()> {
let mut buffer = Vec::new();
let mut writer = Cursor::new(&mut buffer);
let progress = CheckpointProgress {
lsn_range: (1000, 2000),
total_records: 100,
processed_records: 50,
flushed_blocks: 25,
completion_percentage: 50.0,
checkpoint_start: std::time::Instant::now(),
};
CheckpointWriter::write_progress(&mut writer, &progress)?;
assert!(!buffer.is_empty());
assert!(buffer.len() > 20);
Ok(())
}
#[test]
fn test_completion_writing() -> CheckpointResult<()> {
let mut buffer = Vec::new();
let mut writer = Cursor::new(&mut buffer);
let progress = CheckpointProgress {
lsn_range: (1000, 2000),
total_records: 100,
processed_records: 100,
flushed_blocks: 50,
completion_percentage: 100.0,
checkpoint_start: std::time::Instant::now(),
};
CheckpointWriter::write_completion(&mut writer, &progress)?;
assert!(!buffer.is_empty());
assert!(buffer.len() > 20);
Ok(())
}
}