use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
use crate::persistent_artrie::buffer_manager::BufferManager;
use crate::persistent_artrie::dict_impl::DurabilityPolicy;
use crate::persistent_artrie::disk_manager::DiskManager;
use crate::persistent_artrie::error::{PersistentARTrieError, Result};
use crate::persistent_artrie::recovery::RecoveryReport;
use crate::persistent_artrie::wal::WalConfig;
use crate::persistent_artrie::wal_managed::{create_async_wal, open_or_create_async_wal};
use crate::persistent_artrie_char::arena_manager::ArenaManager;
use super::dict_impl::PersistentVocabARTrie;
use super::types::{VocabTrieFileHeader, DEFAULT_VOCAB_BUFFER_POOL_SIZE, VOCAB_HEADER_VERSION_V2};
impl PersistentVocabARTrie {
pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
Self::create_with_start_index(path, 0)
}
pub fn create_with_start_index<P: AsRef<Path>>(path: P, start_index: u64) -> Result<Self> {
let path = path.as_ref().to_path_buf();
if path.exists() {
return Err(PersistentARTrieError::CorruptedFile {
reason: format!("File already exists: {}", path.display()),
});
}
let disk_manager = DiskManager::create(&path)?;
let buffer_manager = BufferManager::new(disk_manager, DEFAULT_VOCAB_BUFFER_POOL_SIZE);
let buffer_manager = Arc::new(RwLock::new(buffer_manager));
let arena_manager = ArenaManager::with_buffer_manager(Arc::clone(&buffer_manager));
let arena_manager = Arc::new(RwLock::new(arena_manager));
{
let bm = buffer_manager.write();
let dm = bm.storage();
let mut header = VocabTrieFileHeader::with_start_index(start_index);
header.version = VOCAB_HEADER_VERSION_V2;
dm.write_header_bytes(&header.to_bytes_with_checksum())?;
dm.sync()?;
}
let wal_path = path.with_extension("vocab.wal");
let wal_config = WalConfig::default();
let wal_writer = create_async_wal(&wal_path, &path).map_err(|e| {
PersistentARTrieError::io_error(
"create WAL",
wal_path.to_string_lossy(),
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
let mut trie = Self {
path,
entry_count: AtomicUsize::new(0),
start_index,
next_index: AtomicU64::new(start_index),
dirty: AtomicBool::new(false),
wal_writer: Some(Arc::new(wal_writer)),
wal_config,
next_lsn: AtomicU64::new(1), synced_lsn: AtomicU64::new(0),
durability_policy: DurabilityPolicy::default(),
arena_manager: Some(arena_manager),
buffer_manager: Some(buffer_manager),
eviction_coordinator: None,
lockfree_root: None,
lockfree_cache: None,
cas_retries: AtomicU64::new(0),
commit_seq: AtomicU64::new(0),
committed_watermark:
crate::persistent_artrie_core::committed_watermark::CommittedWatermark::new(0),
epoch_manager: Arc::new(
crate::persistent_artrie_core::concurrency::EpochManager::new(),
),
reverse_term_map: None,
};
<Self as crate::persistent_artrie_core::overlay::flip::LockFreeOverlay<
crate::persistent_artrie_core::key_encoding::CharKey,
_,
_,
>>::install_overlay_on_create(trie)
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref();
if !path.exists() {
return Err(PersistentARTrieError::io_error(
"open vocab trie",
path.to_string_lossy(),
std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
));
}
let (trie, _) = Self::open_with_recovery(path)?;
Ok(trie)
}
fn open_snapshot<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref().to_path_buf();
if !path.exists() {
return Err(PersistentARTrieError::io_error(
"open vocab trie",
path.to_string_lossy(),
std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
));
}
let disk_manager = DiskManager::open_without_validation(&path)?;
let header = crate::persistent_vocab_artrie::header::read_vocab_header(&disk_manager)?;
header.validate()?;
if header.version != VOCAB_HEADER_VERSION_V2 {
return Err(PersistentARTrieError::CorruptedFile {
reason: format!(
"legacy v1 owned vocabulary format (version {}) is no longer supported; \
rebuild the vocabulary with the overlay format",
header.version
),
});
}
let buffer_manager = BufferManager::new(disk_manager, DEFAULT_VOCAB_BUFFER_POOL_SIZE);
let buffer_manager = Arc::new(RwLock::new(buffer_manager));
let arena_manager = ArenaManager::with_buffer_manager(Arc::clone(&buffer_manager));
let arena_manager = Arc::new(RwLock::new(arena_manager));
if header.block_count > 1 {
let mut am = arena_manager.write();
am.clear_for_loading();
for block_id in 1..header.block_count {
am.load_arena(block_id)?;
}
let arena_count = am.arena_count();
if arena_count > 0 {
am.set_active_arena(arena_count - 1);
}
}
let wal_path = path.with_extension("vocab.wal");
let wal_config = WalConfig::default();
let (wal_writer, next_lsn) = {
let wal = open_or_create_async_wal(&wal_path, &path).map_err(|e| {
PersistentARTrieError::io_error(
"open WAL",
wal_path.to_string_lossy(),
std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
)
})?;
let min_lsn = header.checkpoint_lsn + 1;
wal.set_min_lsn(min_lsn);
let lsn = wal.current_lsn();
(Some(Arc::new(wal)), lsn)
};
let mut trie = Self {
path,
entry_count: AtomicUsize::new(header.entry_count as usize),
start_index: header.start_index,
next_index: AtomicU64::new(header.next_index),
dirty: AtomicBool::new(false),
wal_writer,
wal_config,
next_lsn: AtomicU64::new(next_lsn),
synced_lsn: AtomicU64::new(header.checkpoint_lsn),
durability_policy: DurabilityPolicy::default(),
arena_manager: Some(arena_manager),
buffer_manager: Some(buffer_manager),
eviction_coordinator: None,
lockfree_root: None,
lockfree_cache: None,
cas_retries: AtomicU64::new(0),
commit_seq: AtomicU64::new(0),
committed_watermark:
crate::persistent_artrie_core::committed_watermark::CommittedWatermark::new(0),
epoch_manager: Arc::new(
crate::persistent_artrie_core::concurrency::EpochManager::new(),
),
reverse_term_map: None,
};
trie.reestablish_overlay_from_image(header.root_ptr)?;
Ok(trie)
}
pub fn open_with_recovery<P: AsRef<Path>>(path: P) -> Result<(Self, RecoveryReport)> {
let path = path.as_ref().to_path_buf();
if !path.exists() {
let trie = Self::create(&path)?;
let report = RecoveryReport::created_new();
return Ok((trie, report));
}
let mut trie = Self::open_snapshot(&path)?;
let wal_path = path.with_extension("vocab.wal");
let mut records_replayed = 0;
let mut inserts_replayed = 0;
let checkpoint_lsn = trie.synced_lsn.load(Ordering::Acquire);
if wal_path.exists() {
let (seen, applied) =
trie.replay_wal_into_overlay_rank_aware(&wal_path, checkpoint_lsn)?;
records_replayed = seen;
inserts_replayed = applied;
}
if records_replayed > 0 {
trie.dirty.store(true, Ordering::Release);
}
let report = if records_replayed > 0 {
RecoveryReport::rebuild_from_wal(
path.clone(),
"WAL replay for vocabulary trie".to_string(),
records_replayed as u64,
inserts_replayed as u64,
Vec::new(),
0, )
} else {
RecoveryReport::normal()
};
Ok((trie, report))
}
}