use crate::backend::native::v2::wal::ContiguousRegion;
use std::collections::BTreeMap;
/// Chain length at which `ChainAllocationTrigger` recommends reserving a
/// contiguous region (see `should_trigger_with_observed_count`).
pub const CHAIN_THRESHOLD: usize = 10;
/// A contiguous byte range within the backing file, optionally annotated
/// with the cluster layout written into it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Region {
// Byte offset where the region begins.
pub start_offset: u64,
// Length in bytes; the exclusive end is start_offset + total_size.
pub total_size: u64,
// Number of clusters laid out in this region (0 = none recorded).
pub cluster_count: u32,
// Spacing between consecutive clusters in bytes — presumably the cluster
// size; confirm against the WAL layout (0 = none recorded).
pub stride: u32,
}
impl Region {
    /// A region covering `total_size` bytes starting at `start_offset`,
    /// with no cluster layout recorded.
    pub fn new(start_offset: u64, total_size: u64) -> Self {
        Self {
            start_offset,
            total_size,
            cluster_count: 0,
            stride: 0,
        }
    }

    /// Builder-style setter recording the cluster layout of this region.
    pub fn with_clusters(mut self, cluster_count: u32, stride: u32) -> Self {
        self.cluster_count = cluster_count;
        self.stride = stride;
        self
    }

    /// Exclusive end offset (start + size).
    pub fn end_offset(&self) -> u64 {
        self.start_offset + self.total_size
    }

    /// True when the two half-open byte ranges `[start, end)` intersect.
    pub fn overlaps(&self, other: &Region) -> bool {
        let overlap_start = self.start_offset.max(other.start_offset);
        let overlap_end = self.end_offset().min(other.end_offset());
        overlap_start < overlap_end
    }

    /// Converts into the WAL's region representation (field-for-field copy).
    pub fn to_wal_region(&self) -> ContiguousRegion {
        ContiguousRegion {
            start_offset: self.start_offset,
            total_size: self.total_size,
            cluster_count: self.cluster_count,
            stride: self.stride,
        }
    }

    /// Builds a `Region` from the WAL's region representation.
    pub fn from_wal_region(wal_region: &ContiguousRegion) -> Self {
        Self::new(wal_region.start_offset, wal_region.total_size)
            .with_clusters(wal_region.cluster_count, wal_region.stride)
    }
}
/// Lifecycle record for one contiguous reservation made through
/// `FreeSpaceManager`: tracks which transaction reserved it and whether it
/// was later committed.
#[derive(Debug, Clone)]
pub struct ContiguousAllocation {
// The reserved byte range.
pub region: Region,
// Transaction id that made the reservation (0 when not yet associated).
pub allocated_at_tx: u64,
// Transaction id that committed the reservation; 0 means uncommitted.
pub committed_at_tx: u64,
}
impl ContiguousAllocation {
    /// Creates an uncommitted allocation record for `region` made during
    /// transaction `allocated_at_tx`.
    pub fn new(region: Region, allocated_at_tx: u64) -> Self {
        Self {
            region,
            allocated_at_tx,
            committed_at_tx: 0,
        }
    }

    /// True once `commit` has been called (a txn id of 0 means "never").
    pub fn is_committed(&self) -> bool {
        self.committed_at_tx != 0
    }

    /// Records the committing transaction id.
    pub fn commit(&mut self, tx_id: u64) {
        self.committed_at_tx = tx_id;
    }
}
/// Decides when a chain of cluster writes is long enough to justify
/// reserving one contiguous region up front, and tracks progress through
/// the currently reserved region.
#[derive(Debug, Clone)]
pub struct ChainAllocationTrigger {
// Chain length at or above which a reservation should be made.
threshold: usize,
// The region currently being filled, if any.
current_region: Option<Region>,
// Clusters written into current_region so far; doubles as next index.
clusters_written: usize,
}
impl ChainAllocationTrigger {
    /// Trigger using the default [`CHAIN_THRESHOLD`].
    pub fn new() -> Self {
        Self::with_threshold(CHAIN_THRESHOLD)
    }

    /// Trigger using a caller-supplied threshold.
    pub fn with_threshold(threshold: usize) -> Self {
        Self {
            threshold,
            current_region: None,
            clusters_written: 0,
        }
    }

    /// True when an observed chain is long enough (>= threshold) to warrant
    /// a contiguous reservation.
    pub fn should_trigger_with_observed_count(&self, observed_chain_length: usize) -> bool {
        observed_chain_length >= self.threshold
    }

    /// The configured trigger threshold.
    pub fn threshold(&self) -> usize {
        self.threshold
    }

    /// The active reserved region, if one is installed.
    pub fn region_hint(&self) -> Option<&Region> {
        self.current_region.as_ref()
    }

    /// Installs `region` as the active region and resets the write cursor.
    pub fn set_region(&mut self, region: Region) {
        self.current_region = Some(region);
        self.clusters_written = 0;
    }

    /// Drops the active region and resets the write cursor.
    pub fn clear_region(&mut self) {
        self.current_region = None;
        self.clusters_written = 0;
    }

    /// Index of the next cluster slot to write within the active region.
    pub fn cluster_index(&self) -> u32 {
        self.clusters_written as u32
    }

    /// Advances the write cursor by one cluster.
    pub fn increment_cluster_count(&mut self) {
        self.clusters_written += 1;
    }

    /// Number of clusters written into the active region so far.
    pub fn clusters_written(&self) -> usize {
        self.clusters_written
    }

    /// True when a region is currently installed.
    pub fn has_active_region(&self) -> bool {
        self.current_region.is_some()
    }
}
// Default is equivalent to `new()`: CHAIN_THRESHOLD, no active region.
impl Default for ChainAllocationTrigger {
fn default() -> Self {
Self::new()
}
}
/// Tracks free byte ranges within a fixed-size file plus in-flight
/// contiguous reservations (committed and uncommitted).
#[derive(Debug, Clone)]
pub struct FreeSpaceManager {
// Total size of the backing file in bytes; fixed after construction.
file_size: u64,
// Free blocks keyed by start offset -> size. Adjacent blocks are
// coalesced on insert, so no two entries ever touch.
free_blocks: BTreeMap<u64, u64>,
// Outstanding reservations, both committed and uncommitted.
reserved_regions: Vec<ContiguousAllocation>,
}
impl FreeSpaceManager {
    /// Creates a manager whose entire `file_size`-byte range starts out free.
    pub fn new(file_size: u64) -> Self {
        let mut blocks = BTreeMap::new();
        blocks.insert(0, file_size);
        Self {
            file_size,
            free_blocks: blocks,
            reserved_regions: Vec::new(),
        }
    }

    /// Returns `size` bytes at `offset` to the free pool, coalescing with
    /// any adjacent free blocks.
    pub fn add_free_block(&mut self, offset: u64, size: u64) {
        self.coalesce_free_block(offset, size);
    }

    /// Inserts a free block, merging it with an immediately preceding and/or
    /// following block so the map never holds two adjacent entries.
    fn coalesce_free_block(&mut self, offset: u64, size: u64) {
        let mut merged_offset = offset;
        let mut merged_size = size;
        let mut to_remove = Vec::new();
        // Merge with the predecessor if it ends exactly where we start.
        if let Some((&prev_offset, &prev_size)) = self.free_blocks.range(..offset).next_back() {
            if prev_offset + prev_size == offset {
                merged_offset = prev_offset;
                merged_size += prev_size;
                to_remove.push(prev_offset);
            }
        }
        // Merge with the successor if we end exactly where it starts.
        if let Some((&next_offset, &next_size)) = self.free_blocks.range(offset + size..).next() {
            if offset + size == next_offset {
                merged_size += next_size;
                to_remove.push(next_offset);
            }
        }
        for offset_to_remove in to_remove {
            self.free_blocks.remove(&offset_to_remove);
        }
        self.free_blocks.insert(merged_offset, merged_size);
    }

    /// Reserves `bytes` of contiguous space whose start is aligned to
    /// `alignment` (must be a power of two).
    ///
    /// Worst-fit policy: the largest free block that can still hold `bytes`
    /// after aligning its start is chosen. Returns `None` when nothing fits
    /// or when the fragmentation guard rejects the request. The reservation
    /// starts uncommitted; pair it with `commit_contiguous` or
    /// `rollback_contiguous`.
    pub fn try_reserve_contiguous(&mut self, bytes: u64, alignment: u64) -> Option<Region> {
        // Largest candidate block (ties resolve to the highest offset, same
        // as the previous sort-ascending-take-last implementation).
        let (block_offset, block_size, aligned_offset) = self
            .free_blocks
            .iter()
            .filter_map(|(&offset, &size)| {
                let aligned_offset = Self::align_up(offset, alignment);
                if aligned_offset >= offset + size {
                    // Alignment padding swallowed the whole block.
                    return None;
                }
                let usable_size = size - (aligned_offset - offset);
                if usable_size >= bytes {
                    Some((offset, size, aligned_offset))
                } else {
                    None
                }
            })
            .max_by_key(|&(_, size, _)| size)?;
        if self.would_cause_excessive_fragmentation(block_offset, bytes) {
            return None;
        }
        let region = Region::new(aligned_offset, bytes);
        // Track as an uncommitted reservation; `allocated_at_tx` is filled
        // in later by the WAL-aware wrapper.
        self.reserved_regions
            .push(ContiguousAllocation::new(region.clone(), 0));
        // Split the chosen block: remove it, then re-insert any leading
        // (alignment padding) and trailing remainders.
        self.free_blocks.remove(&block_offset);
        if aligned_offset > block_offset {
            self.free_blocks
                .insert(block_offset, aligned_offset - block_offset);
        }
        let trailing_offset = aligned_offset + bytes;
        let trailing_size = (block_offset + block_size) - trailing_offset;
        if trailing_size > 0 {
            self.free_blocks.insert(trailing_offset, trailing_size);
        }
        Some(region)
    }

    /// True when carving `size` bytes out of the largest free block would
    /// consume strictly more than 3/4 of it, which would leave the pool
    /// badly fragmented.
    ///
    /// Fix: the previous implementation used `free_blocks.keys().last()`,
    /// which yields the block at the *highest offset* (BTreeMap keys are
    /// offsets), not the largest block — so the guard compared against the
    /// wrong block whenever the pool had more than one free block.
    fn would_cause_excessive_fragmentation(&self, offset: u64, size: u64) -> bool {
        if let Some((&largest_offset, &largest_size)) =
            self.free_blocks.iter().max_by_key(|&(_, &size)| size)
        {
            if offset == largest_offset {
                return size > (largest_size * 3) / 4;
            }
        }
        false
    }

    /// Rounds `addr` up to the next multiple of `alignment` (power of two).
    fn align_up(addr: u64, alignment: u64) -> u64 {
        debug_assert!(alignment.is_power_of_two(), "alignment must be power of 2");
        (addr + alignment - 1) & !(alignment - 1)
    }

    /// Total number of free bytes across all blocks.
    pub fn total_free(&self) -> u64 {
        self.free_blocks.values().sum()
    }

    /// Size of the single largest free block (0 when the pool is empty).
    pub fn largest_contiguous_free(&self) -> u64 {
        self.free_blocks.values().copied().max().unwrap_or(0)
    }

    /// Number of disjoint free blocks.
    pub fn free_block_count(&self) -> usize {
        self.free_blocks.len()
    }

    /// All outstanding reservations, committed and uncommitted.
    pub fn reserved_regions(&self) -> &[ContiguousAllocation] {
        &self.reserved_regions
    }

    /// True when `region` overlaps any outstanding reservation.
    pub fn is_region_reserved(&self, region: &Region) -> bool {
        self.reserved_regions
            .iter()
            .any(|r| r.region.overlaps(region))
    }

    /// Marks the reservation starting at `region.start_offset` as committed
    /// under `tx_id`.
    ///
    /// # Errors
    /// `FreeSpaceError::RegionNotFound` when no reservation with that start
    /// offset exists (e.g. it was already rolled back).
    pub fn commit_contiguous(&mut self, region: &Region, tx_id: u64) -> Result<(), FreeSpaceError> {
        if let Some(allocation) = self
            .reserved_regions
            .iter_mut()
            .find(|r| r.region.start_offset == region.start_offset)
        {
            allocation.commit(tx_id);
            Ok(())
        } else {
            Err(FreeSpaceError::RegionNotFound)
        }
    }

    /// Cancels the reservation starting at `region.start_offset`, returning
    /// its bytes to the free pool. Idempotent: a second rollback of the same
    /// region is a no-op. Note that this intentionally also rolls back
    /// *committed* reservations (see test_commit_then_rollback_fails).
    pub fn rollback_contiguous(&mut self, region: &Region) {
        let was_reserved = self
            .reserved_regions
            .iter()
            .any(|r| r.region.start_offset == region.start_offset);
        if was_reserved {
            self.reserved_regions
                .retain(|r| r.region.start_offset != region.start_offset);
            self.add_free_block(region.start_offset, region.total_size);
        }
    }

    /// Drops the tracking entry for a committed region without freeing its
    /// bytes (the space stays allocated in the file).
    pub fn remove_committed_region(&mut self, region: &Region) {
        self.reserved_regions
            .retain(|r| r.region.start_offset != region.start_offset);
    }

    /// Size of the backing file in bytes.
    pub fn file_size(&self) -> u64 {
        self.file_size
    }

    // Test-only helper: forget all reservations without freeing bytes.
    #[cfg(test)]
    pub fn clear_reserved(&mut self) {
        self.reserved_regions.clear();
    }

    /// Applies a pre-digested WAL recovery state: frees allocations that
    /// never committed and re-frees regions the WAL recorded as freed.
    pub fn recover_from_wal(&mut self, wal_state: &WalRecoveryState) {
        for allocation in &wal_state.uncommitted_allocations {
            if allocation.committed_at_tx == 0 {
                self.add_free_block(allocation.region.start_offset, allocation.region.total_size);
            }
        }
        for freed_region in &wal_state.freed_regions {
            self.add_free_block(freed_region.start_offset, freed_region.total_size);
        }
    }

    /// Replays raw WAL records: allocations whose transaction committed are
    /// re-registered as committed reservations; rolled-back or uncommitted
    /// allocations are returned to the free pool.
    pub fn recover_from_wal_records(
        &mut self,
        wal_records: &[crate::backend::native::v2::wal::V2WALRecord],
    ) {
        use crate::backend::native::v2::wal::V2WALRecord;
        // First pass: collect commit/rollback outcomes so the second pass
        // can classify each allocation.
        let mut committed_txns: std::collections::HashSet<u64> = std::collections::HashSet::new();
        let mut rolled_back_regions: std::collections::HashSet<u64> =
            std::collections::HashSet::new();
        for record in wal_records {
            match record {
                V2WALRecord::CommitContiguous { txn_id, .. } => {
                    committed_txns.insert(*txn_id);
                }
                V2WALRecord::RollbackContiguous { region } => {
                    // Rollbacks are keyed by start offset only.
                    rolled_back_regions.insert(region.start_offset);
                }
                _ => {}
            }
        }
        // Second pass: resolve each allocation record.
        for record in wal_records {
            match record {
                V2WALRecord::AllocateContiguous { txn_id, region, .. } => {
                    let wal_region = region;
                    let region = Region::from_wal_region(wal_region);
                    // Fix: `®ion` (mis-encoded `&region`) restored here.
                    if rolled_back_regions.contains(&region.start_offset) {
                        self.add_free_block(region.start_offset, region.total_size);
                        continue;
                    }
                    if committed_txns.contains(txn_id) {
                        self.reserved_regions.push(ContiguousAllocation {
                            region: region.clone(),
                            allocated_at_tx: *txn_id,
                            committed_at_tx: *txn_id,
                        });
                    } else {
                        self.add_free_block(region.start_offset, region.total_size);
                    }
                }
                _ => {}
            }
        }
    }

    /// Checks the core invariant: no reservation may overlap a free block.
    ///
    /// # Errors
    /// `FreeSpaceError::InconsistentState` naming the offending pair.
    pub fn validate_consistency(&self) -> Result<(), FreeSpaceError> {
        for allocation in &self.reserved_regions {
            for (&free_offset, &free_size) in &self.free_blocks {
                let free_region = Region::new(free_offset, free_size);
                if allocation.region.overlaps(&free_region) {
                    return Err(FreeSpaceError::InconsistentState {
                        details: format!(
                            "Reserved region [{}, {}] overlaps with free block [{}, {}]",
                            allocation.region.start_offset,
                            allocation.region.end_offset(),
                            free_offset,
                            free_offset + free_size
                        ),
                    });
                }
            }
        }
        Ok(())
    }

    /// Like `validate_consistency`, but only for *committed* reservations —
    /// used after WAL replay to detect divergence.
    ///
    /// # Errors
    /// `FreeSpaceError::InconsistentState` naming the offending pair.
    pub fn validate_recovery(&self) -> Result<(), FreeSpaceError> {
        for allocation in &self.reserved_regions {
            if allocation.committed_at_tx > 0 {
                for (&free_offset, &free_size) in &self.free_blocks {
                    let free_region = Region::new(free_offset, free_size);
                    if allocation.region.overlaps(&free_region) {
                        return Err(FreeSpaceError::InconsistentState {
                            details: format!(
                                "WAL replay divergence: committed region at [{}, {}] (txn {}) overlaps with free block [{}, {}]",
                                allocation.region.start_offset,
                                allocation.region.end_offset(),
                                allocation.committed_at_tx,
                                free_offset,
                                free_offset + free_size
                            ),
                        });
                    }
                }
            }
        }
        Ok(())
    }

    /// Seconds since the Unix epoch, or 0 when the system clock reads
    /// earlier than the epoch. Shared by the allocate-record builders.
    fn current_unix_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0)
    }

    /// Reserves contiguous space and logs an `AllocateContiguous` WAL record
    /// via `log_wal`. If logging fails the reservation is rolled back and
    /// `None` is returned; on success the reservation is tagged with
    /// `txn_id`.
    pub fn try_reserve_contiguous_with_wal<F>(
        &mut self,
        bytes: u64,
        alignment: u64,
        txn_id: u64,
        mut log_wal: F,
    ) -> Option<Region>
    where
        F: FnMut(
            crate::backend::native::v2::wal::V2WALRecord,
        ) -> Result<(), crate::backend::native::types::NativeBackendError>,
    {
        let region = self.try_reserve_contiguous(bytes, alignment)?;
        let wal_record = crate::backend::native::v2::wal::V2WALRecord::AllocateContiguous {
            txn_id,
            region: region.to_wal_region(),
            timestamp: Self::current_unix_timestamp(),
        };
        if let Err(_e) = log_wal(wal_record) {
            // WAL-first discipline: no durable record, no reservation.
            // Fix: `®ion` (mis-encoded `&region`) restored here.
            self.rollback_contiguous(&region);
            return None;
        }
        if let Some(allocation) = self
            .reserved_regions
            .iter_mut()
            .find(|r| r.region.start_offset == region.start_offset)
        {
            allocation.allocated_at_tx = txn_id;
        }
        Some(region)
    }

    /// Logs a `CommitContiguous` WAL record, then commits the reservation.
    ///
    /// # Errors
    /// `FreeSpaceError::InconsistentState` when WAL logging fails;
    /// `FreeSpaceError::RegionNotFound` when the reservation is missing.
    pub fn commit_contiguous_with_wal<F>(
        &mut self,
        region: &Region,
        txn_id: u64,
        mut log_wal: F,
    ) -> Result<(), FreeSpaceError>
    where
        F: FnMut(
            crate::backend::native::v2::wal::V2WALRecord,
        ) -> Result<(), crate::backend::native::types::NativeBackendError>,
    {
        let wal_record = crate::backend::native::v2::wal::V2WALRecord::CommitContiguous {
            txn_id,
            region: region.to_wal_region(),
        };
        log_wal(wal_record).map_err(|e| FreeSpaceError::InconsistentState {
            details: format!("WAL logging failed for commit: {}", e),
        })?;
        self.commit_contiguous(region, txn_id)
    }

    /// Logs a `RollbackContiguous` WAL record (best-effort; a logging error
    /// is ignored) and rolls back the reservation. No-op when the region is
    /// not reserved.
    pub fn rollback_contiguous_with_wal<F>(&mut self, region: &Region, mut log_wal: F)
    where
        F: FnMut(
            crate::backend::native::v2::wal::V2WALRecord,
        ) -> Result<(), crate::backend::native::types::NativeBackendError>,
    {
        let was_reserved = self
            .reserved_regions
            .iter()
            .any(|r| r.region.start_offset == region.start_offset);
        if was_reserved {
            let wal_record = crate::backend::native::v2::wal::V2WALRecord::RollbackContiguous {
                region: region.to_wal_region(),
            };
            let _ = log_wal(wal_record);
            self.rollback_contiguous(region);
        }
    }

    /// Builds (but does not log) an `AllocateContiguous` record for `region`.
    pub fn create_allocate_wal_record(
        &self,
        region: &Region,
        txn_id: u64,
    ) -> crate::backend::native::v2::wal::V2WALRecord {
        crate::backend::native::v2::wal::V2WALRecord::AllocateContiguous {
            txn_id,
            region: region.to_wal_region(),
            timestamp: Self::current_unix_timestamp(),
        }
    }

    /// Builds (but does not log) a `CommitContiguous` record for `region`.
    pub fn create_commit_wal_record(
        &self,
        region: &Region,
        txn_id: u64,
    ) -> crate::backend::native::v2::wal::V2WALRecord {
        crate::backend::native::v2::wal::V2WALRecord::CommitContiguous {
            txn_id,
            region: region.to_wal_region(),
        }
    }

    /// Builds (but does not log) a `RollbackContiguous` record for `region`.
    pub fn create_rollback_wal_record(
        &self,
        region: &Region,
    ) -> crate::backend::native::v2::wal::V2WALRecord {
        crate::backend::native::v2::wal::V2WALRecord::RollbackContiguous {
            region: region.to_wal_region(),
        }
    }
}
/// Pre-digested WAL replay result consumed by
/// `FreeSpaceManager::recover_from_wal`.
#[derive(Debug, Clone, Default)]
pub struct WalRecoveryState {
// Allocations seen in the WAL; entries with committed_at_tx == 0 are
// freed during recovery.
pub uncommitted_allocations: Vec<ContiguousAllocation>,
// Regions the WAL recorded as freed; returned to the pool on recovery.
pub freed_regions: Vec<Region>,
}
impl WalRecoveryState {
// Empty recovery state (no allocations, no freed regions).
pub fn new() -> Self {
Self::default()
}
// Records an allocation replayed from the WAL.
pub fn add_allocation(&mut self, allocation: ContiguousAllocation) {
self.uncommitted_allocations.push(allocation);
}
// Records a region the WAL says was freed.
pub fn add_freed_region(&mut self, region: Region) {
self.freed_regions.push(region);
}
}
/// Errors reported by `FreeSpaceManager` operations.
#[derive(Debug, thiserror::Error)]
pub enum FreeSpaceError {
// Commit/rollback referenced a region with no matching reservation.
#[error("Region not found in reserved list")]
RegionNotFound,
// NOTE(review): declared but not constructed in this file — presumably
// used by callers elsewhere; confirm before removing.
#[error("Insufficient contiguous free space: needed {needed}, found {found}")]
InsufficientSpace { needed: u64, found: u64 },
// NOTE(review): declared but not constructed in this file.
#[error("Alignment not satisfied: offset {offset} for alignment {alignment}")]
AlignmentNotSatisfied { offset: u64, alignment: u64 },
// Invariant violation (reservation/free-block overlap, WAL divergence,
// or WAL logging failure during commit).
#[error("Free space manager inconsistent: {details}")]
InconsistentState { details: String },
}
impl Default for FreeSpaceManager {
fn default() -> Self {
// Default backing size is 1 MiB — a convenience default; real callers
// should construct with the actual file size via `new`.
Self::new(1024 * 1024) }
}
#[cfg(test)]
mod tests {
use super::*;
// A fresh manager exposes the whole file as a single free block.
#[test]
fn test_free_space_manager_new() {
let fsm = FreeSpaceManager::new(1_000_000);
assert_eq!(fsm.file_size(), 1_000_000);
assert_eq!(fsm.total_free(), 1_000_000);
assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
assert_eq!(fsm.free_block_count(), 1);
}
// A small request against a large pool succeeds and starts at offset 0.
#[test]
fn test_reserve_contiguous_sufficient_space() {
let mut fsm = FreeSpaceManager::new(1_000_000);
let region = fsm.try_reserve_contiguous(10 * 4096, 4096);
assert!(region.is_some());
assert_eq!(region.unwrap().start_offset, 0);
}
// A request larger than the whole pool fails.
#[test]
fn test_reserve_contiguous_insufficient_space() {
let mut fsm = FreeSpaceManager::new(10_000);
let region = fsm.try_reserve_contiguous(100 * 4096, 4096);
assert!(region.is_none());
}
// The three adjacent 4 KiB blocks coalesce into one 12 KiB block; a
// 10 KiB request is rejected (fragmentation guard: > 3/4 of the largest
// block) while a 3 KiB request succeeds.
#[test]
fn test_reserve_contiguous_fragmented() {
let mut fsm = FreeSpaceManager::new(1_000_000);
fsm.clear_reserved();
fsm.free_blocks.clear();
fsm.add_free_block(0, 4096); fsm.add_free_block(4096, 4096); fsm.add_free_block(8192, 4096);
let region = fsm.try_reserve_contiguous(10 * 1024, 1024);
assert!(region.is_none());
let region = fsm.try_reserve_contiguous(3 * 1024, 1024);
assert!(region.is_some());
}
// A block starting at an unaligned offset yields an aligned reservation.
#[test]
fn test_reserve_contiguous_alignment() {
let mut fsm = FreeSpaceManager::new(1_000_000);
fsm.free_blocks.clear();
fsm.add_free_block(100, 100_000);
let region = fsm.try_reserve_contiguous(4096, 4096);
assert!(region.is_some());
assert_eq!(region.unwrap().start_offset, 4096);
}
// Reserving from the front leaves only the trailing remainder free.
#[test]
fn test_reserve_creates_leading_trailing_free() {
let mut fsm = FreeSpaceManager::new(1_000_000);
let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
assert_eq!(region.start_offset, 0);
assert_eq!(fsm.largest_contiguous_free(), 1_000_000 - 4096);
}
// Adjacent free blocks merge into a single entry on insert.
#[test]
fn test_coalesce_free_blocks() {
let mut fsm = FreeSpaceManager::new(100_000);
fsm.free_blocks.clear();
fsm.add_free_block(0, 4096);
fsm.add_free_block(4096, 4096);
fsm.add_free_block(8192, 4096);
assert_eq!(fsm.free_block_count(), 1);
assert_eq!(fsm.largest_contiguous_free(), 12 * 1024);
}
// Rolling back a reservation restores the pool to its original size.
// Fix: mis-encoded `®ion` restored to `&region` throughout this group.
#[test]
fn test_rollback_returns_to_free_pool() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    assert_eq!(fsm.largest_contiguous_free(), 1_000_000 - 4096);
    fsm.rollback_contiguous(&region);
    assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
}
// Committing marks the tracked reservation as committed.
#[test]
fn test_commit_contiguous() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    let result = fsm.commit_contiguous(&region, 100);
    assert!(result.is_ok());
    assert!(fsm.is_region_reserved(&region));
    assert!(fsm.reserved_regions()[0].is_committed());
}
// Half-open interval overlap: touching ranges do not overlap.
#[test]
fn test_region_overlaps() {
    let r1 = Region::new(0, 1000);
    let r2 = Region::new(500, 1000);
    let r3 = Region::new(2000, 1000);
    assert!(r1.overlaps(&r2));
    assert!(!r1.overlaps(&r3));
    assert!(!r2.overlaps(&r3));
}
// is_region_reserved reports overlap with tracked reservations only.
// Fix: mis-encoded `®ion` restored to `&region`.
#[test]
fn test_is_region_reserved() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    assert!(fsm.is_region_reserved(&region));
    let other_region = Region::new(99999, 4096);
    assert!(!fsm.is_region_reserved(&other_region));
}
// NOTE(review): this test has no assertions — 750_000 is exactly 3/4 of
// the 1 MB pool and the guard rejects only strictly-greater requests, so
// it is unclear whether Some or None was intended here. TODO add asserts.
#[test]
fn test_fragmentation_prevention() {
let mut fsm = FreeSpaceManager::new(1_000_000);
let region = fsm.try_reserve_contiguous(750_000, 1);
}
// with_clusters records the cluster layout on the region.
#[test]
fn test_region_with_clusters() {
let region = Region::new(0, 4096).with_clusters(10, 409);
assert_eq!(region.cluster_count, 10);
assert_eq!(region.stride, 409);
}
// An allocation starts uncommitted and records the committing txn id.
#[test]
fn test_contiguous_allocation_commit() {
let region = Region::new(0, 4096);
let mut alloc = ContiguousAllocation::new(region, 100);
assert!(!alloc.is_committed());
alloc.commit(100);
assert!(alloc.is_committed());
assert_eq!(alloc.committed_at_tx, 100);
}
// align_up rounds to the next multiple of a power-of-two alignment.
#[test]
fn test_align_up() {
assert_eq!(FreeSpaceManager::align_up(0, 4096), 0);
assert_eq!(FreeSpaceManager::align_up(1, 4096), 4096);
assert_eq!(FreeSpaceManager::align_up(4096, 4096), 4096);
assert_eq!(FreeSpaceManager::align_up(4097, 4096), 8192);
assert_eq!(FreeSpaceManager::align_up(100, 4096), 4096);
}
// Rollback also releases *committed* reservations — this pins that
// behavior. Fix: mis-encoded `®ion` restored to `&region` throughout.
#[test]
fn test_commit_then_rollback_fails() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    fsm.commit_contiguous(&region, 100).unwrap();
    assert!(fsm.reserved_regions()[0].is_committed());
    fsm.rollback_contiguous(&region);
    assert!(!fsm.is_region_reserved(&region));
    assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
}
// Committing after rollback reports RegionNotFound.
#[test]
fn test_rollback_then_commit_fails() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    fsm.rollback_contiguous(&region);
    let result = fsm.commit_contiguous(&region, 100);
    assert!(matches!(result, Err(FreeSpaceError::RegionNotFound)));
}
// The same space can be re-reserved once its rollback completes.
#[test]
fn test_region_can_be_reserved_after_rollback() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region1 = fsm.try_reserve_contiguous(10 * 4096, 4096).unwrap();
    fsm.rollback_contiguous(&region1);
    let region2 = fsm.try_reserve_contiguous(10 * 4096, 4096);
    assert!(region2.is_some());
    assert_eq!(region2.unwrap().start_offset, 0);
}
// Commit/rollback of one reservation does not affect another.
#[test]
fn test_multiple_reservations_independent() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region1 = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    let region2 = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    fsm.commit_contiguous(&region1, 100).unwrap();
    fsm.rollback_contiguous(&region2);
    assert!(fsm.is_region_reserved(&region1));
    assert!(!fsm.is_region_reserved(&region2));
}
// Recovery frees an allocation whose committed_at_tx is 0.
#[test]
fn test_recover_from_wal_uncommitted_rolled_back() {
let mut fsm = FreeSpaceManager::new(1_000_000);
let region = fsm.try_reserve_contiguous(10 * 4096, 4096).unwrap();
assert_eq!(fsm.largest_contiguous_free(), 1_000_000 - 10 * 4096);
let mut wal_state = WalRecoveryState::new();
wal_state.add_allocation(ContiguousAllocation::new(region.clone(), 50));
fsm.recover_from_wal(&wal_state);
assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
}
// Recovery leaves committed allocations allocated (space stays used).
#[test]
fn test_recover_from_wal_committed_preserved() {
let mut fsm = FreeSpaceManager::new(1_000_000);
let region = fsm.try_reserve_contiguous(10 * 4096, 4096).unwrap();
assert_eq!(fsm.largest_contiguous_free(), 1_000_000 - 10 * 4096);
let mut wal_state = WalRecoveryState::new();
let mut allocation = ContiguousAllocation::new(region.clone(), 50);
allocation.commit(50); wal_state.add_allocation(allocation);
fsm.recover_from_wal(&wal_state);
assert!(fsm.largest_contiguous_free() < 1_000_000);
}
// Freed regions from the WAL are re-inserted and coalesce with neighbors.
#[test]
fn test_recover_from_wal_freed_regions_restored() {
let mut fsm = FreeSpaceManager::new(1_000_000);
fsm.free_blocks.clear();
fsm.add_free_block(0, 4096);
fsm.add_free_block(8192, 4096);
let mut wal_state = WalRecoveryState::new();
wal_state.add_freed_region(Region::new(4096, 4096));
fsm.recover_from_wal(&wal_state);
assert_eq!(fsm.free_block_count(), 1);
assert_eq!(fsm.largest_contiguous_free(), 12 * 1024);
}
// A normal reserve leaves the manager internally consistent.
#[test]
fn test_validate_consistency_success() {
let mut fsm = FreeSpaceManager::new(1_000_000);
let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
assert!(fsm.validate_consistency().is_ok());
}
// Forcing a free block over a reservation is detected as corruption.
#[test]
fn test_validate_consistency_with_corruption() {
let mut fsm = FreeSpaceManager::new(1_000_000);
let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
fsm.free_blocks.insert(0, 1000);
assert!(fsm.validate_consistency().is_err());
}
// Reserving then rolling back everything restores the full pool and
// leaves no stale tracking entries.
// Fix: mis-encoded `®ions`/`®ion` restored to `&regions`/`&region`.
#[test]
fn test_no_memory_leak_from_reservations() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let mut regions = Vec::new();
    for _ in 0..10 {
        if let Some(region) = fsm.try_reserve_contiguous(4096, 4096) {
            regions.push(region);
        }
    }
    for region in &regions {
        fsm.rollback_contiguous(region);
    }
    assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
    assert_eq!(fsm.reserved_regions().len(), 0);
}
// A committed reservation stays tracked with its txn id.
#[test]
fn test_commit_permanently_allocates() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    fsm.commit_contiguous(&region, 100).unwrap();
    assert!(fsm.is_region_reserved(&region));
    assert!(fsm.reserved_regions()[0].is_committed());
    assert_eq!(fsm.reserved_regions()[0].committed_at_tx, 100);
}
// remove_committed_region drops tracking without freeing the bytes.
#[test]
fn test_remove_committed_region() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    fsm.commit_contiguous(&region, 100).unwrap();
    assert!(fsm.is_region_reserved(&region));
    fsm.remove_committed_region(&region);
    assert!(!fsm.is_region_reserved(&region));
}
// WalRecoveryState accumulates allocations and freed regions.
#[test]
fn test_wal_recovery_state_builder() {
    let region = Region::new(0, 4096);
    let allocation = ContiguousAllocation::new(region.clone(), 100);
    let mut wal_state = WalRecoveryState::new();
    wal_state.add_allocation(allocation);
    wal_state.add_freed_region(Region::new(8192, 4096));
    assert_eq!(wal_state.uncommitted_allocations.len(), 1);
    assert_eq!(wal_state.freed_regions.len(), 1);
}
// A second rollback of the same region is a no-op (no double-free).
// Fix: mis-encoded `®ion` restored to `&region` throughout this group.
#[test]
fn test_rollback_is_idempotent() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    fsm.rollback_contiguous(&region);
    assert!(!fsm.is_region_reserved(&region));
    fsm.rollback_contiguous(&region);
    assert!(!fsm.is_region_reserved(&region));
    assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
}
// committed_at_tx transitions 0 -> txn id exactly on commit.
#[test]
fn test_commit_lifecycle_tracked() {
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(4096, 4096).unwrap();
    assert!(!fsm.reserved_regions()[0].is_committed());
    assert_eq!(fsm.reserved_regions()[0].committed_at_tx, 0);
    fsm.commit_contiguous(&region, 100).unwrap();
    assert!(fsm.reserved_regions()[0].is_committed());
    assert_eq!(fsm.reserved_regions()[0].committed_at_tx, 100);
}
// A new trigger uses CHAIN_THRESHOLD and starts with no region.
#[test]
fn test_chain_allocation_trigger_new() {
let trigger = ChainAllocationTrigger::new();
assert_eq!(trigger.threshold(), CHAIN_THRESHOLD);
assert_eq!(trigger.threshold(), 10);
assert!(!trigger.has_active_region());
assert_eq!(trigger.clusters_written(), 0);
assert!(trigger.region_hint().is_none());
}
// Default::default() matches new().
#[test]
fn test_chain_allocation_trigger_default() {
let trigger = ChainAllocationTrigger::default();
assert_eq!(trigger.threshold(), CHAIN_THRESHOLD);
assert!(!trigger.has_active_region());
}
// with_threshold stores the custom threshold.
#[test]
fn test_chain_allocation_trigger_with_custom_threshold() {
let trigger = ChainAllocationTrigger::with_threshold(5);
assert_eq!(trigger.threshold(), 5);
assert!(!trigger.has_active_region());
}
// Counts below the threshold never trigger.
#[test]
fn test_chain_allocation_trigger_below_threshold() {
let trigger = ChainAllocationTrigger::new();
assert!(!trigger.should_trigger_with_observed_count(5));
assert!(!trigger.should_trigger_with_observed_count(9));
}
// The threshold itself and anything above it trigger (>= comparison).
#[test]
fn test_chain_allocation_trigger_at_threshold() {
let trigger = ChainAllocationTrigger::new();
assert!(trigger.should_trigger_with_observed_count(10));
assert!(trigger.should_trigger_with_observed_count(15));
}
// The >= boundary holds for a custom threshold too.
#[test]
fn test_chain_allocation_trigger_custom_threshold() {
let trigger = ChainAllocationTrigger::with_threshold(5);
assert!(!trigger.should_trigger_with_observed_count(4));
assert!(trigger.should_trigger_with_observed_count(5));
assert!(trigger.should_trigger_with_observed_count(10));
}
// set_region installs the region and zeroes the write cursor.
#[test]
fn test_chain_allocation_trigger_set_and_get_region() {
let mut trigger = ChainAllocationTrigger::new();
assert!(!trigger.has_active_region());
let region = Region::new(1000, 40960).with_clusters(10, 4096);
trigger.set_region(region.clone());
assert!(trigger.has_active_region());
assert_eq!(trigger.clusters_written(), 0);
assert_eq!(trigger.cluster_index(), 0);
let hint = trigger.region_hint();
assert!(hint.is_some());
let hint_region = hint.unwrap();
assert_eq!(hint_region.start_offset, 1000);
assert_eq!(hint_region.total_size, 40960);
}
// clear_region drops the region and resets the cursor.
#[test]
fn test_chain_allocation_trigger_clear_region() {
let mut trigger = ChainAllocationTrigger::new();
let region = Region::new(1000, 40960).with_clusters(10, 4096);
trigger.set_region(region);
assert!(trigger.has_active_region());
trigger.clear_region();
assert!(!trigger.has_active_region());
assert!(trigger.region_hint().is_none());
assert_eq!(trigger.clusters_written(), 0);
}
// Each increment advances both the count and the next cluster index.
#[test]
fn test_chain_allocation_trigger_increment_cluster_count() {
let mut trigger = ChainAllocationTrigger::new();
let region = Region::new(1000, 40960).with_clusters(10, 4096);
trigger.set_region(region);
assert_eq!(trigger.clusters_written(), 0);
assert_eq!(trigger.cluster_index(), 0);
trigger.increment_cluster_count();
assert_eq!(trigger.clusters_written(), 1);
assert_eq!(trigger.cluster_index(), 1);
trigger.increment_cluster_count();
trigger.increment_cluster_count();
assert_eq!(trigger.clusters_written(), 3);
assert_eq!(trigger.cluster_index(), 3);
}
// Installing a new region resets the cursor accumulated on the old one.
#[test]
fn test_chain_allocation_trigger_region_reset_on_set() {
let mut trigger = ChainAllocationTrigger::new();
let region1 = Region::new(1000, 40960).with_clusters(10, 4096);
trigger.set_region(region1);
trigger.increment_cluster_count();
trigger.increment_cluster_count();
assert_eq!(trigger.clusters_written(), 2);
let region2 = Region::new(50000, 40960).with_clusters(10, 4096);
trigger.set_region(region2);
assert_eq!(trigger.clusters_written(), 0);
assert_eq!(trigger.cluster_index(), 0);
}
// Pins the constant's value.
#[test]
fn test_chain_allocation_trigger_threshold_constant() {
assert_eq!(CHAIN_THRESHOLD, 10);
}
// End-to-end: trigger fires, a region is reserved and installed, clusters
// are written, then the region is cleared.
#[test]
fn test_chain_allocation_trigger_lifecycle() {
let mut trigger = ChainAllocationTrigger::new();
let mut fsm = FreeSpaceManager::new(1_000_000);
assert!(!trigger.has_active_region());
let observed_count = 15;
if trigger.should_trigger_with_observed_count(observed_count) {
let total_bytes = observed_count as u64 * 4096;
if let Some(region) = fsm.try_reserve_contiguous(total_bytes, 4096) {
trigger.set_region(region);
}
}
assert!(trigger.has_active_region());
for i in 0..5 {
assert_eq!(trigger.cluster_index(), i);
trigger.increment_cluster_count();
}
assert_eq!(trigger.clusters_written(), 5);
trigger.clear_region();
assert!(!trigger.has_active_region());
assert_eq!(trigger.clusters_written(), 0);
}
// A short chain must not cause any reservation.
#[test]
fn test_chain_allocation_trigger_no_reservation_for_small_chain() {
let mut trigger = ChainAllocationTrigger::new();
let mut fsm = FreeSpaceManager::new(1_000_000);
let observed_count = 5;
if trigger.should_trigger_with_observed_count(observed_count) {
panic!("Should not trigger for chain below threshold");
}
assert!(!trigger.has_active_region());
}
// Exact >= boundary at the default threshold.
#[test]
fn test_threshold_gated_activation_exactly_at_boundary() {
let trigger = ChainAllocationTrigger::new();
assert!(trigger.should_trigger_with_observed_count(10));
assert!(!trigger.should_trigger_with_observed_count(9));
}
// new() wires CHAIN_THRESHOLD through to threshold().
#[test]
fn test_threshold_gated_activation_with_chain_threshold_constant() {
let trigger = ChainAllocationTrigger::new();
assert_eq!(trigger.threshold(), CHAIN_THRESHOLD);
assert_eq!(CHAIN_THRESHOLD, 10);
}
// The >= boundary holds across a range of thresholds.
#[test]
fn test_threshold_gated_activation_multiple_thresholds() {
for threshold in [1, 5, 10, 20, 50, 100] {
let trigger = ChainAllocationTrigger::with_threshold(threshold);
assert!(trigger.should_trigger_with_observed_count(threshold));
if threshold > 1 {
assert!(!trigger.should_trigger_with_observed_count(threshold - 1));
}
assert!(trigger.should_trigger_with_observed_count(threshold + 1));
}
}
// A zero threshold triggers for every count, including zero.
#[test]
fn test_threshold_gated_activation_with_zero_threshold() {
let trigger = ChainAllocationTrigger::with_threshold(0);
assert!(trigger.should_trigger_with_observed_count(0));
assert!(trigger.should_trigger_with_observed_count(1));
}
// Large thresholds gate correctly as well.
#[test]
fn test_threshold_gated_activation_large_threshold() {
let trigger = ChainAllocationTrigger::with_threshold(1000);
assert!(!trigger.should_trigger_with_observed_count(100));
assert!(!trigger.should_trigger_with_observed_count(500));
assert!(trigger.should_trigger_with_observed_count(1000));
assert!(trigger.should_trigger_with_observed_count(2000));
}
// Below-threshold chains reserve nothing; at-threshold chains do.
#[test]
fn test_threshold_gated_activation_with_free_space_manager() {
let mut trigger = ChainAllocationTrigger::new();
let mut fsm = FreeSpaceManager::new(1_000_000);
let observed_count = 5;
if trigger.should_trigger_with_observed_count(observed_count) {
let total_bytes = observed_count as u64 * 4096;
if let Some(region) = fsm.try_reserve_contiguous(total_bytes, 4096) {
trigger.set_region(region);
}
}
assert!(!trigger.has_active_region());
let observed_count = 10;
if trigger.should_trigger_with_observed_count(observed_count) {
let total_bytes = observed_count as u64 * 4096;
if let Some(region) = fsm.try_reserve_contiguous(total_bytes, 4096) {
trigger.set_region(region);
}
}
assert!(trigger.has_active_region());
}
// Counts 1..=9 never reach the threshold, so free space is untouched.
#[test]
fn test_threshold_gated_activation_conserves_free_space() {
let mut trigger = ChainAllocationTrigger::new();
let mut fsm = FreeSpaceManager::new(100_000);
let initial_free = fsm.largest_contiguous_free();
for i in 1..=9 {
if trigger.should_trigger_with_observed_count(i) {
let total_bytes = i as u64 * 4096;
if let Some(region) = fsm.try_reserve_contiguous(total_bytes, 4096) {
trigger.set_region(region);
}
}
}
assert_eq!(fsm.largest_contiguous_free(), initial_free);
assert!(!trigger.has_active_region());
}
// Repeated short chains leave block count and reservations unchanged.
#[test]
fn test_threshold_gated_activation_prevents_fragmentation() {
let mut trigger = ChainAllocationTrigger::new();
let mut fsm = FreeSpaceManager::new(1_000_000);
let initial_block_count = fsm.free_block_count();
for _chain_num in 0..9 {
let chain_len = 5;
if trigger.should_trigger_with_observed_count(chain_len) {
panic!("Should not trigger for chain length 5");
}
}
assert_eq!(fsm.free_block_count(), initial_block_count);
assert_eq!(fsm.reserved_regions().len(), 0);
}
use crate::backend::native::types::NativeBackendError;
// Region -> ContiguousRegion copies every field.
#[test]
fn test_region_to_wal_region_conversion() {
let region = Region::new(1000, 4096).with_clusters(10, 4096);
let wal_region = region.to_wal_region();
assert_eq!(wal_region.start_offset, 1000);
assert_eq!(wal_region.total_size, 4096);
assert_eq!(wal_region.cluster_count, 10);
assert_eq!(wal_region.stride, 4096);
}
// ContiguousRegion -> Region copies every field.
#[test]
fn test_region_from_wal_region_conversion() {
let wal_region = ContiguousRegion {
start_offset: 2000,
total_size: 8192,
cluster_count: 20,
stride: 4096,
};
let region = Region::from_wal_region(&wal_region);
assert_eq!(region.start_offset, 2000);
assert_eq!(region.total_size, 8192);
assert_eq!(region.cluster_count, 20);
assert_eq!(region.stride, 4096);
}
// Region -> wal -> Region round-trip is lossless.
#[test]
fn test_region_roundtrip_conversion() {
let original = Region::new(5000, 16384).with_clusters(30, 4096);
let wal_region = original.to_wal_region();
let converted = Region::from_wal_region(&wal_region);
assert_eq!(original.start_offset, converted.start_offset);
assert_eq!(original.total_size, converted.total_size);
assert_eq!(original.cluster_count, converted.cluster_count);
assert_eq!(original.stride, converted.stride);
}
#[test]
fn test_create_allocate_wal_record() {
    // An allocate record must carry the transaction id, the region
    // geometry, and a non-zero timestamp.
    let fsm = FreeSpaceManager::new(1_000_000);
    let region = Region::new(1000, 4096).with_clusters(10, 4096);
    // BUG FIX: the argument previously read `®ion` (mojibake for
    // `&region`), which does not compile.
    let wal_record = fsm.create_allocate_wal_record(&region, 100);
    match wal_record {
        crate::backend::native::v2::wal::V2WALRecord::AllocateContiguous {
            txn_id,
            region: wal_region,
            timestamp,
        } => {
            assert_eq!(txn_id, 100);
            assert_eq!(wal_region.start_offset, 1000);
            assert_eq!(wal_region.total_size, 4096);
            assert!(timestamp > 0);
        }
        _ => panic!("Expected AllocateContiguous record"),
    }
}
#[test]
fn test_create_commit_wal_record() {
    // A commit record must carry the transaction id and the region
    // geometry being committed.
    let fsm = FreeSpaceManager::new(1_000_000);
    let region = Region::new(2000, 8192).with_clusters(20, 4096);
    // BUG FIX: the argument previously read `®ion` (mojibake for
    // `&region`), which does not compile.
    let wal_record = fsm.create_commit_wal_record(&region, 200);
    match wal_record {
        crate::backend::native::v2::wal::V2WALRecord::CommitContiguous {
            txn_id,
            region: wal_region,
        } => {
            assert_eq!(txn_id, 200);
            assert_eq!(wal_region.start_offset, 2000);
            assert_eq!(wal_region.total_size, 8192);
        }
        _ => panic!("Expected CommitContiguous record"),
    }
}
#[test]
fn test_create_rollback_wal_record() {
    // A rollback record carries only the region geometry (no txn id
    // field in the variant).
    let fsm = FreeSpaceManager::new(1_000_000);
    let region = Region::new(3000, 4096).with_clusters(10, 4096);
    // BUG FIX: the argument previously read `®ion` (mojibake for
    // `&region`), which does not compile.
    let wal_record = fsm.create_rollback_wal_record(&region);
    match wal_record {
        crate::backend::native::v2::wal::V2WALRecord::RollbackContiguous {
            region: wal_region,
        } => {
            assert_eq!(wal_region.start_offset, 3000);
            assert_eq!(wal_region.total_size, 4096);
        }
        _ => panic!("Expected RollbackContiguous record"),
    }
}
#[test]
fn test_try_reserve_contiguous_with_wal_success() {
    // A successful reservation emits exactly one AllocateContiguous
    // record and leaves the returned region reserved in the manager.
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let mut records = Vec::new();
    let reserved = fsm.try_reserve_contiguous_with_wal(10 * 4096, 4096, 100, |rec| {
        records.push(rec.clone());
        Ok(())
    });
    assert!(reserved.is_some());
    assert_eq!(records.len(), 1);
    match &records[0] {
        crate::backend::native::v2::wal::V2WALRecord::AllocateContiguous { txn_id, .. } => {
            assert_eq!(*txn_id, 100);
        }
        _ => panic!("Expected AllocateContiguous record"),
    }
    let region = reserved.unwrap();
    assert!(fsm.is_region_reserved(&region));
}
#[test]
fn test_try_reserve_contiguous_with_wal_insufficient_space() {
    // When the request cannot possibly fit, no region is returned and
    // the WAL callback is never invoked.
    let mut fsm = FreeSpaceManager::new(10_000);
    let mut records = Vec::new();
    let outcome = fsm.try_reserve_contiguous_with_wal(100 * 4096, 4096, 100, |rec| {
        records.push(rec);
        Ok(())
    });
    assert!(outcome.is_none());
    assert!(records.is_empty());
}
#[test]
fn test_try_reserve_contiguous_with_wal_logging_failure_rollback() {
    // If the WAL callback fails, the tentative reservation must be
    // undone so the full capacity stays contiguous and free.
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let outcome = fsm.try_reserve_contiguous_with_wal(10 * 4096, 4096, 100, |_| {
        Err(NativeBackendError::CorruptStringTable {
            reason: "WAL logging failed".to_string(),
        })
    });
    assert!(outcome.is_none());
    assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
}
#[test]
fn test_commit_contiguous_with_wal_success() {
    // Committing a reserved region emits one CommitContiguous record
    // and marks the tracked allocation as committed.
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(10 * 4096, 4096).unwrap();
    let mut wal_records = Vec::new();
    // BUG FIX: the argument previously read `®ion` (mojibake for
    // `&region`), which does not compile.
    let result = fsm.commit_contiguous_with_wal(&region, 100, |wal_record| {
        wal_records.push(wal_record);
        Ok(())
    });
    assert!(result.is_ok());
    assert_eq!(wal_records.len(), 1);
    match &wal_records[0] {
        crate::backend::native::v2::wal::V2WALRecord::CommitContiguous { txn_id, .. } => {
            assert_eq!(*txn_id, 100);
        }
        _ => panic!("Expected CommitContiguous record"),
    }
    assert!(fsm.reserved_regions()[0].is_committed());
}
#[test]
fn test_commit_contiguous_with_wal_not_found() {
    // Committing a region that was never reserved must fail with
    // RegionNotFound. The test still expects the WAL callback to have
    // been invoked once despite the error.
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = Region::new(1000, 4096);
    let mut wal_records = Vec::new();
    // BUG FIX: the argument previously read `®ion` (mojibake for
    // `&region`), which does not compile.
    let result = fsm.commit_contiguous_with_wal(&region, 100, |wal_record| {
        wal_records.push(wal_record);
        Ok(())
    });
    assert!(matches!(result, Err(FreeSpaceError::RegionNotFound)));
    assert_eq!(wal_records.len(), 1);
}
#[test]
fn test_rollback_contiguous_with_wal_success() {
    // Rolling back a reserved region emits one RollbackContiguous
    // record and returns the space to the free pool.
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(10 * 4096, 4096).unwrap();
    let mut wal_records = Vec::new();
    // BUG FIX: every `®ion` (mojibake for `&region`) in this test has
    // been replaced with `&region`; the original did not compile.
    fsm.rollback_contiguous_with_wal(&region, |wal_record| {
        wal_records.push(wal_record);
        Ok(())
    });
    assert_eq!(wal_records.len(), 1);
    match &wal_records[0] {
        crate::backend::native::v2::wal::V2WALRecord::RollbackContiguous { .. } => {}
        _ => panic!("Expected RollbackContiguous record"),
    }
    assert!(!fsm.is_region_reserved(&region));
    assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
}
#[test]
fn test_rollback_contiguous_with_wal_idempotent() {
    // A second rollback of the same region is a no-op: no additional
    // WAL record is emitted and free-space accounting stays consistent.
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let region = fsm.try_reserve_contiguous(10 * 4096, 4096).unwrap();
    let mut wal_records = Vec::new();
    // BUG FIX: every `®ion` (mojibake for `&region`) in this test has
    // been replaced with `&region`; the original did not compile.
    fsm.rollback_contiguous_with_wal(&region, |wal_record| {
        wal_records.push(wal_record);
        Ok(())
    });
    assert_eq!(wal_records.len(), 1);
    assert!(!fsm.is_region_reserved(&region));
    let mut wal_records2 = Vec::new();
    fsm.rollback_contiguous_with_wal(&region, |wal_record| {
        wal_records2.push(wal_record);
        Ok(())
    });
    assert_eq!(wal_records2.len(), 0);
    assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
}
#[test]
fn test_wal_logging_full_lifecycle() {
    // Reserve-then-commit must produce exactly two WAL records, in
    // order: AllocateContiguous followed by CommitContiguous.
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let mut wal_records = Vec::new();
    let region = fsm.try_reserve_contiguous_with_wal(10 * 4096, 4096, 100, |wal_record| {
        wal_records.push(wal_record);
        Ok(())
    });
    assert!(region.is_some());
    let r = region.unwrap();
    assert_eq!(wal_records.len(), 1);
    // BUG FIX: the commit's Result was previously discarded, so a
    // failed commit could pass unnoticed; assert it succeeded.
    let commit_result = fsm.commit_contiguous_with_wal(&r, 100, |wal_record| {
        wal_records.push(wal_record);
        Ok(())
    });
    assert!(commit_result.is_ok());
    assert_eq!(wal_records.len(), 2);
    match &wal_records[0] {
        crate::backend::native::v2::wal::V2WALRecord::AllocateContiguous { .. } => {}
        _ => panic!("Expected AllocateContiguous"),
    }
    match &wal_records[1] {
        crate::backend::native::v2::wal::V2WALRecord::CommitContiguous { .. } => {}
        _ => panic!("Expected CommitContiguous"),
    }
}
use crate::backend::native::v2::wal::{ContiguousRegion, V2WALRecord};
#[test]
fn test_wal_replay_committed_allocation_preserved() {
    // Replaying an allocate/commit pair must leave the allocation
    // tracked and marked committed under the committing txn id.
    fn region() -> ContiguousRegion {
        ContiguousRegion {
            start_offset: 0,
            total_size: 10 * 4096,
            cluster_count: 10,
            stride: 4096,
        }
    }
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let records = vec![
        V2WALRecord::AllocateContiguous {
            txn_id: 100,
            region: region(),
            timestamp: 1000,
        },
        V2WALRecord::CommitContiguous {
            txn_id: 100,
            region: region(),
        },
    ];
    fsm.recover_from_wal_records(&records);
    assert_eq!(fsm.reserved_regions().len(), 1);
    assert!(fsm.reserved_regions()[0].is_committed());
    assert_eq!(fsm.reserved_regions()[0].committed_at_tx, 100);
}
#[test]
fn test_wal_replay_uncommitted_allocation_rolled_back() {
    // An allocation with no matching commit record must be discarded
    // during replay, returning its space to the free pool.
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let records = vec![V2WALRecord::AllocateContiguous {
        txn_id: 100,
        region: ContiguousRegion {
            start_offset: 0,
            total_size: 10 * 4096,
            cluster_count: 10,
            stride: 4096,
        },
        timestamp: 1000,
    }];
    fsm.recover_from_wal_records(&records);
    assert!(fsm.reserved_regions().is_empty());
    assert!(fsm.largest_contiguous_free() >= 10 * 4096);
}
#[test]
fn test_wal_replay_explicit_rollback() {
    // An allocation followed by an explicit rollback record must leave
    // nothing reserved after replay.
    fn region() -> ContiguousRegion {
        ContiguousRegion {
            start_offset: 0,
            total_size: 10 * 4096,
            cluster_count: 10,
            stride: 4096,
        }
    }
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let records = vec![
        V2WALRecord::AllocateContiguous {
            txn_id: 100,
            region: region(),
            timestamp: 1000,
        },
        V2WALRecord::RollbackContiguous { region: region() },
    ];
    fsm.recover_from_wal_records(&records);
    assert!(fsm.reserved_regions().is_empty());
    assert!(fsm.largest_contiguous_free() >= 10 * 4096);
}
#[test]
fn test_wal_replay_multiple_transactions() {
    // A committed transaction survives replay; a trailing uncommitted
    // transaction does not.
    fn region_at(start_offset: u64) -> ContiguousRegion {
        ContiguousRegion {
            start_offset,
            total_size: 10 * 4096,
            cluster_count: 10,
            stride: 4096,
        }
    }
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let records = vec![
        V2WALRecord::AllocateContiguous {
            txn_id: 100,
            region: region_at(0),
            timestamp: 1000,
        },
        V2WALRecord::CommitContiguous {
            txn_id: 100,
            region: region_at(0),
        },
        V2WALRecord::AllocateContiguous {
            txn_id: 200,
            region: region_at(10 * 4096),
            timestamp: 2000,
        },
    ];
    fsm.recover_from_wal_records(&records);
    assert_eq!(fsm.reserved_regions().len(), 1);
    assert_eq!(fsm.reserved_regions()[0].committed_at_tx, 100);
}
#[test]
fn test_wal_replay_fail_fast_on_divergence() {
    // After replaying a committed allocation, force the free list to
    // claim the very same bytes; validate_recovery must flag the
    // divergence instead of accepting the inconsistent state.
    fn region() -> ContiguousRegion {
        ContiguousRegion {
            start_offset: 0,
            total_size: 10 * 4096,
            cluster_count: 10,
            stride: 4096,
        }
    }
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let records = vec![
        V2WALRecord::AllocateContiguous {
            txn_id: 100,
            region: region(),
            timestamp: 1000,
        },
        V2WALRecord::CommitContiguous {
            txn_id: 100,
            region: region(),
        },
    ];
    fsm.recover_from_wal_records(&records);
    // Deliberately corrupt the free list so it overlaps the committed
    // region.
    fsm.free_blocks.clear();
    fsm.add_free_block(0, 10 * 4096);
    assert!(fsm.validate_recovery().is_err());
}
#[test]
fn test_wal_replay_valid_state_passes_validation() {
    // A free list that exactly complements the committed region must
    // pass post-recovery validation.
    fn region() -> ContiguousRegion {
        ContiguousRegion {
            start_offset: 0,
            total_size: 10 * 4096,
            cluster_count: 10,
            stride: 4096,
        }
    }
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let records = vec![
        V2WALRecord::AllocateContiguous {
            txn_id: 100,
            region: region(),
            timestamp: 1000,
        },
        V2WALRecord::CommitContiguous {
            txn_id: 100,
            region: region(),
        },
    ];
    fsm.recover_from_wal_records(&records);
    // Rebuild the free list to cover exactly the bytes outside the
    // committed region.
    fsm.free_blocks.clear();
    fsm.add_free_block(10 * 4096, 1_000_000 - 10 * 4096);
    assert!(fsm.validate_recovery().is_ok());
}
#[test]
fn test_wal_replay_empty_wal() {
    // Recovering from an empty WAL is a no-op: nothing reserved, all
    // capacity still contiguous and free.
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let records: Vec<_> = Vec::new();
    fsm.recover_from_wal_records(&records);
    assert!(fsm.reserved_regions().is_empty());
    assert_eq!(fsm.largest_contiguous_free(), 1_000_000);
}
#[test]
fn test_wal_replay_idempotent() {
    // Replaying the same WAL a second time must not lose any reserved
    // regions established by the first pass.
    fn region() -> ContiguousRegion {
        ContiguousRegion {
            start_offset: 0,
            total_size: 10 * 4096,
            cluster_count: 10,
            stride: 4096,
        }
    }
    let mut fsm = FreeSpaceManager::new(1_000_000);
    let records = vec![
        V2WALRecord::AllocateContiguous {
            txn_id: 100,
            region: region(),
            timestamp: 1000,
        },
        V2WALRecord::CommitContiguous {
            txn_id: 100,
            region: region(),
        },
    ];
    fsm.recover_from_wal_records(&records);
    let first_pass_count = fsm.reserved_regions().len();
    fsm.recover_from_wal_records(&records);
    assert!(fsm.reserved_regions().len() >= first_pass_count);
}
}