#![cfg(feature = "io-uring-backend")]
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use crate::persistent_artrie::adaptive_pool::CacheStats;
use crate::persistent_artrie::block_storage::BlockStorage;
use crate::persistent_artrie::buffer_manager::BufferManager;
use crate::persistent_artrie::concurrency::{EpochManager, OptimisticVersion, RetryStats};
use crate::persistent_artrie::dict_impl::DurabilityPolicy;
use crate::persistent_artrie::error::{PersistentARTrieError, Result};
use crate::persistent_artrie::wal::{WalConfig, WalReader, WalRecord};
use crate::persistent_artrie::wal_managed::{create_async_wal, open_or_create_async_wal};
use crate::sync_compat::RwLock;
use crate::value::DictionaryValue;
use super::arena_manager::ArenaManager;
use super::DEFAULT_CHAR_BUFFER_POOL_SIZE;
impl<V: DictionaryValue>
super::PersistentARTrieChar<V, crate::persistent_artrie::IoUringDiskManager>
{
fn install_overlay_on_create(self) -> Result<Self> {
<Self as crate::persistent_artrie_core::overlay::flip::LockFreeOverlay<
crate::persistent_artrie_core::key_encoding::CharKey,
_,
_,
>>::install_overlay_on_create(self)
}
/// Creates a new trie at `path` using the io_uring disk manager.
///
/// The data file is created through `IoUringDiskManager::create`, wrapped in a
/// buffer manager sized by `DEFAULT_CHAR_BUFFER_POOL_SIZE`, and paired with an
/// async WAL whose path is `path` with its extension replaced by `wal`.
/// All counters start from their initial values (`len` 0, `next_lsn` 1,
/// committed watermark 0, `commit_seq` 0) and the finished value is handed to
/// `install_overlay_on_create`.
///
/// # Errors
///
/// Propagates the error from `IoUringDiskManager::create`, reports a failed WAL
/// creation as `PersistentARTrieError::WalError` (with the debug rendering of
/// the underlying error as `reason`), and propagates any error from
/// `install_overlay_on_create`.
pub fn create_with_io_uring<P: AsRef<Path>>(path: P) -> Result<Self> {
    use crate::persistent_artrie::prefetch::Prefetcher;
    use crate::persistent_artrie::IoUringDiskManager;
    use crate::persistent_artrie_core::shared_access::AtomicEnumCell;
    use std::collections::BTreeMap;
    use std::marker::PhantomData;
    use std::sync::atomic::AtomicU64;

    let data_path = path.as_ref();

    // Disk manager -> buffer manager -> shared handle, built in one expression.
    let pool = Arc::new(RwLock::new(BufferManager::new(
        IoUringDiskManager::create(data_path)?,
        DEFAULT_CHAR_BUFFER_POOL_SIZE,
    )));

    // The WAL lives next to the data file, differing only by extension.
    let wal_file = data_path.with_extension("wal");
    let wal = create_async_wal(&wal_file, data_path)
        .map(Arc::new)
        .map_err(|err| PersistentARTrieError::WalError {
            reason: format!("{:?}", err),
        })?;

    // The arena manager shares the same buffer manager handle.
    let arenas = Arc::new(RwLock::new(ArenaManager::with_buffer_manager(
        Arc::clone(&pool),
    )));

    let fresh = Self {
        len: AtomicUsize::new(0),
        dirty: AtomicBool::new(false),
        buffer_manager: Some(pool),
        wal_writer: Some(wal),
        wal_config: WalConfig::default(),
        next_lsn: AtomicU64::new(1),
        committed_watermark: super::committed_watermark::CommittedWatermark::new(0),
        checkpoint_lock: Arc::new(parking_lot::Mutex::new(())),
        merge_lock: Arc::new(parking_lot::Mutex::new(())),
        file_path: Some(data_path.to_path_buf()),
        arena_manager: Some(arenas),
        version: OptimisticVersion::new(),
        epoch_manager: Arc::new(EpochManager::new()),
        structural_generation: AtomicU64::new(0),
        retry_stats: RetryStats::new(),
        #[cfg(feature = "group-commit")]
        group_commit: std::sync::Mutex::new(None),
        memory_monitor: std::sync::Mutex::new(None),
        cache_stats: CacheStats::default(),
        checkpoint_manager: std::sync::Mutex::new(None),
        durability_policy: AtomicEnumCell::new(DurabilityPolicy::default()),
        eviction_coordinator: std::sync::Mutex::new(None),
        prefetcher: Prefetcher::new(),
        _phantom: PhantomData,
        lockfree_root: None,
        commit_seq: AtomicU64::new(0),
        commit_seq_by_data_lsn: std::sync::Mutex::new(BTreeMap::new()),
        lockfree_cache: None,
        cas_retries: AtomicU64::new(0),
    };

    Self::install_overlay_on_create(fresh)
}
/// Opens an existing trie at `path` using the io_uring disk manager.
///
/// Steps, in the order they run:
/// 1. Open the data file and read its root pointer and entry count.
/// 2. If the sibling WAL file (`path` with extension `wal`) exists, scan it to
///    find the highest LSN, the highest checkpoint LSN, and the highest
///    `CommitRank` generation.
/// 3. Open or create the async WAL writer and build the arena manager.
/// 4. Compute the recovered frontier that seeds the committed watermark.
/// 5. Build the trie value, seed `commit_seq`, and pick a reopen path based on
///    the WAL header's rank regime and `USE_F5_REOPEN_LOADER`.
///
/// # Errors
///
/// Propagates errors from `IoUringDiskManager::open`, `root_ptr` and
/// `entry_count`; reports WAL reader/writer failures as
/// `PersistentARTrieError::WalError`; and propagates errors from
/// `convert_owned_to_overlay_on_reopen`, `load_root_immutable` and
/// `reconcile_and_drain_overlay`.
pub fn open_with_io_uring<P: AsRef<Path>>(path: P) -> Result<Self> {
use crate::persistent_artrie::IoUringDiskManager;
let path = path.as_ref();
let disk_manager = IoUringDiskManager::open(path)?;
let root_ptr = disk_manager.root_ptr()?;
// NOTE(review): the entry count is read (and its error propagated) but the
// value is discarded; `len` below starts at 0. Presumably the reopen path
// recomputes the length — confirm.
let _entry_count = disk_manager.entry_count()?;
let buffer_manager = BufferManager::new(disk_manager, DEFAULT_CHAR_BUFFER_POOL_SIZE);
let buffer_manager = Arc::new(RwLock::new(buffer_manager));
let wal_path = path.with_extension("wal");
// Scan the WAL before the writer is opened. With no WAL file the defaults
// are: no records, next LSN 1, checkpoint LSN 0, commit-seq seed 0.
let (recovered_ops, next_lsn, checkpoint_lsn, commit_seq_seed) = if wal_path.exists() {
let mut reader =
WalReader::new(&wal_path).map_err(|e| PersistentARTrieError::WalError {
reason: format!("{:?}", e),
})?;
let mut records = Vec::new();
let mut max_lsn = 0u64;
let mut checkpoint_lsn = 0u64;
let mut max_commit_seq_gen = 0u64;
while let Some(result) = reader.next_record() {
match result {
Ok((lsn, record)) => {
max_lsn = max_lsn.max(lsn);
// Track the largest checkpoint LSN seen in any Checkpoint record.
if let WalRecord::Checkpoint {
checkpoint_lsn: cp_lsn,
..
} = &record
{
checkpoint_lsn = checkpoint_lsn.max(*cp_lsn);
}
// Track the largest generation seen in any CommitRank record.
if let WalRecord::CommitRank { generation, .. } = &record {
max_commit_seq_gen = max_commit_seq_gen.max(*generation);
}
records.push((lsn, record));
}
// The first read error ends the scan without being reported; records read
// so far are kept. NOTE(review): presumably tolerates a truncated or
// corrupt tail — confirm this is intended.
Err(_) => break,
}
}
let next_lsn = max_lsn + 1;
// The header's commit-seq floor (0 if the header cannot be read) is combined
// with the scanned maximum; the larger value becomes the seed.
let floor = WalReader::read_header(&wal_path)
.map(|h| h.commit_seq_floor)
.unwrap_or(0);
let commit_seq_seed = floor.max(max_commit_seq_gen);
(records, next_lsn, checkpoint_lsn, commit_seq_seed)
} else {
(Vec::new(), 1, 0, 0)
};
let wal_writer = open_or_create_async_wal(&wal_path, path).map_err(|e| {
PersistentARTrieError::WalError {
reason: format!("{:?}", e),
}
})?;
let wal_writer = Arc::new(wal_writer);
let arena_manager = ArenaManager::with_buffer_manager(Arc::clone(&buffer_manager));
let arena_manager = Arc::new(RwLock::new(arena_manager));
// Recovered frontier = max(highest LSN across the collected WAL segments,
// next_lsn - 1). If segment collection fails or yields no maximum, this falls
// back to next_lsn - 1 alone.
let recovered_frontier = {
let archive_config_for_base = WalConfig::default();
let full_max = wal_writer
.collect_wal_segments(&archive_config_for_base)
.ok()
.and_then(|segments| {
crate::persistent_artrie::wal::AsyncWalWriter::max_lsn_in_segments(&segments)
});
full_max
.unwrap_or_else(|| next_lsn.saturating_sub(1))
.max(next_lsn.saturating_sub(1))
};
let mut inner = Self {
len: AtomicUsize::new(0),
dirty: AtomicBool::new(false),
buffer_manager: Some(buffer_manager.clone()),
wal_writer: Some(wal_writer),
wal_config: WalConfig::default(),
next_lsn: std::sync::atomic::AtomicU64::new(next_lsn),
committed_watermark: super::committed_watermark::CommittedWatermark::new(
recovered_frontier,
),
checkpoint_lock: std::sync::Arc::new(parking_lot::Mutex::new(())),
merge_lock: std::sync::Arc::new(parking_lot::Mutex::new(())),
file_path: Some(path.to_path_buf()),
arena_manager: Some(arena_manager),
version: OptimisticVersion::new(),
epoch_manager: Arc::new(EpochManager::new()),
structural_generation: std::sync::atomic::AtomicU64::new(0),
retry_stats: RetryStats::new(),
#[cfg(feature = "group-commit")]
group_commit: std::sync::Mutex::new(None),
memory_monitor: std::sync::Mutex::new(None),
cache_stats: CacheStats::default(),
checkpoint_manager: std::sync::Mutex::new(None),
durability_policy: crate::persistent_artrie_core::shared_access::AtomicEnumCell::new(
DurabilityPolicy::default(),
),
eviction_coordinator: std::sync::Mutex::new(None),
prefetcher: crate::persistent_artrie::prefetch::Prefetcher::new(),
_phantom: std::marker::PhantomData,
lockfree_root: None,
commit_seq: std::sync::atomic::AtomicU64::new(0),
commit_seq_by_data_lsn: std::sync::Mutex::new(std::collections::BTreeMap::new()),
lockfree_cache: None,
cas_retries: std::sync::atomic::AtomicU64::new(0),
};
// Seed the commit sequence from the WAL scan; `inner` is not shared yet.
inner
.commit_seq
.store(commit_seq_seed, std::sync::atomic::Ordering::Release);
use crate::persistent_artrie_core::key_encoding::CharKey;
use crate::persistent_artrie_core::overlay::flip::LockFreeOverlay;
// The header is read again here, after the writer has opened or created the
// WAL file. An unreadable header is treated as the `Owned` regime.
let rank_regime = WalReader::read_header(&wal_path)
.map(|h| h.regime())
.unwrap_or(crate::persistent_artrie_core::wal::RankRegime::Owned);
// Both reopen paths are gated on the trait constant `USE_F5_REOPEN_LOADER`;
// the regime then selects between them.
let use_f5 = <Self as LockFreeOverlay<
CharKey,
V,
crate::persistent_artrie::IoUringDiskManager,
>>::USE_F5_REOPEN_LOADER
&& rank_regime == crate::persistent_artrie_core::wal::RankRegime::Overlay;
let convert_owned = <Self as LockFreeOverlay<
CharKey,
V,
crate::persistent_artrie::IoUringDiskManager,
>>::USE_F5_REOPEN_LOADER
&& rank_regime == crate::persistent_artrie_core::wal::RankRegime::Owned;
if convert_owned {
// The records collected during the scan are not applied on this path.
let _ = recovered_ops;
let archive_config = WalConfig::default();
inner.convert_owned_to_overlay_on_reopen(
root_ptr,
root_ptr != 0,
checkpoint_lsn,
&archive_config,
)?;
if let Some(ref arena_manager) = inner.arena_manager {
arena_manager.write().ensure_valid();
}
} else if use_f5 {
let (_lc, image_loaded) = inner.load_root_immutable(&buffer_manager, root_ptr)?;
if let Some(ref arena_manager) = inner.arena_manager {
arena_manager.write().ensure_valid();
}
// The records collected during the scan are not applied on this path either.
let _ = recovered_ops;
let archive_config = WalConfig::default();
// The image counts as loaded only when the root pointer is non-zero and the
// loader reported success; otherwise the checkpoint LSN passed on is 0.
let effective_loaded = (root_ptr != 0) && image_loaded;
let _applied = inner.reconcile_and_drain_overlay(
&archive_config,
effective_loaded,
if effective_loaded { checkpoint_lsn } else { 0 },
)?;
}
// NOTE(review): if neither condition holds, `inner` is returned as built
// above, with no root load and no WAL records applied — confirm this
// fallthrough is intended.
Ok(inner)
}
}