use super::routing::ShardKey;
use libdictenstein::persistent_artrie::eviction::{EvictionConfig, EvictionStats};
use libdictenstein::persistent_artrie::wal::SyncHandle;
use libdictenstein::persistent_artrie::wal_managed::WalManaged;
use libdictenstein::persistent_artrie::{DocumentTransaction, PersistentARTrie, SharedARTrie};
use libdictenstein::EvictableARTrie;
use liblevenshtein::dictionary::Dictionary;
use parking_lot::{Condvar, Mutex};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
/// Errors reported by shard operations.
///
/// Shard-scoped variants carry the shard key rendered as a `String`; `Open`
/// carries the filesystem path instead.
#[derive(Error, Debug)]
pub enum ShardError {
/// Creating or opening the shard at `path` failed.
#[error("Failed to create/open shard at {path}: {message}")]
Open {
/// Location of the shard on disk.
path: PathBuf,
/// Description of the underlying failure.
message: String,
},
/// A read against the shard failed.
#[error("Read failed for shard {shard_key}: {message}")]
Read {
shard_key: String,
message: String,
},
/// A write (including transaction begin/commit/abort) against the shard failed.
#[error("Write failed for shard {shard_key}: {message}")]
Write {
shard_key: String,
message: String,
},
/// Checkpointing or persisting checkpoint metadata failed.
#[error("Checkpoint failed for shard {shard_key}: {message}")]
Checkpoint {
shard_key: String,
message: String,
},
/// The shard is locked by another worker.
#[error("Shard {shard_key} is locked by worker {holder}")]
Locked {
shard_key: String,
/// Identifier of the worker reported as holding the lock.
holder: usize,
},
/// A write token presented for the shard was not valid.
#[error("Invalid write token for shard {shard_key}")]
InvalidToken {
shard_key: String,
},
/// A tracked or asynchronous sync failed.
#[error("Sync failed for shard {shard_key}: {message}")]
Sync {
shard_key: String,
message: String,
},
/// Waiting for a sync to finish did not succeed within the allotted time.
#[error("Sync timed out for shard {shard_key}")]
SyncTimeout {
shard_key: String,
},
}
/// Convenience alias for results whose error type is [`ShardError`].
pub type ShardResult<T> = Result<T, ShardError>;
/// Sync lifecycle state of a shard, represented as a single byte (`repr(u8)`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ShardSyncState {
    /// No unsynced changes are being tracked.
    Clean = 0,
    /// Changes have been recorded since the last sync.
    Dirty = 1,
    /// A sync is currently in flight.
    Syncing = 2,
    /// The most recent sync attempt failed.
    SyncFailed = 3,
}

impl ShardSyncState {
    /// Decodes a raw discriminant back into a state.
    ///
    /// Bytes that do not correspond to a variant decode to [`ShardSyncState::Dirty`].
    fn from_u8(value: u8) -> Self {
        // Indexed by discriminant; the order must mirror the `repr(u8)` values above.
        const BY_DISCRIMINANT: [ShardSyncState; 4] = [
            ShardSyncState::Clean,
            ShardSyncState::Dirty,
            ShardSyncState::Syncing,
            ShardSyncState::SyncFailed,
        ];
        BY_DISCRIMINANT
            .get(usize::from(value))
            .copied()
            .unwrap_or(Self::Dirty)
    }
}
/// Tracks the sync lifecycle of one shard: an atomic state byte, a
/// mutex/condvar pair used to wait for an in-flight sync to finish, the LSN
/// recorded by the last successful sync, and the last failure message.
pub struct ShardSyncCoordinator {
/// Current [`ShardSyncState`], stored as its `u8` discriminant.
state: AtomicU8,
/// Completion flag plus condvar; the flag starts `true` and is cleared while a sync runs.
sync_complete: Arc<(Mutex<bool>, Condvar)>,
/// LSN passed to the most recent `complete_sync` call (0 initially).
last_synced_lsn: AtomicU64,
/// Message from the most recent `fail_sync`; cleared by `complete_sync`.
last_error: Mutex<Option<String>>,
}
impl Default for ShardSyncCoordinator {
/// Equivalent to [`ShardSyncCoordinator::new`].
fn default() -> Self {
Self::new()
}
}
impl ShardSyncCoordinator {
/// Creates a coordinator in the `Clean` state with no sync in flight,
/// a last-synced LSN of 0 and no recorded error.
pub fn new() -> Self {
    // The completion flag starts out `true`: nothing is running, so waiters must not block.
    let idle_signal = (Mutex::new(true), Condvar::new());
    let initial_state = ShardSyncState::Clean as u8;
    Self {
        state: AtomicU8::new(initial_state),
        sync_complete: Arc::new(idle_signal),
        last_synced_lsn: AtomicU64::new(0),
        last_error: Mutex::new(None),
    }
}
/// Returns the current sync state (acquire load of the state byte).
pub fn state(&self) -> ShardSyncState {
    let raw = self.state.load(Ordering::Acquire);
    ShardSyncState::from_u8(raw)
}
/// Moves the state from `Clean` to `Dirty`.
///
/// Any other state (`Dirty`, `Syncing`, `SyncFailed`) is left untouched.
pub fn mark_dirty(&self) {
    let clean = ShardSyncState::Clean as u8;
    let dirty = ShardSyncState::Dirty as u8;
    // `fetch_update` runs the compare-and-swap retry loop and stops as soon as
    // the closure observes a value other than `Clean`.
    let _ = self
        .state
        .fetch_update(Ordering::AcqRel, Ordering::Acquire, |raw| {
            if raw == clean {
                Some(dirty)
            } else {
                None
            }
        });
}
/// Attempts to claim the shard for a sync by moving `Dirty` to `Syncing`.
///
/// Returns `true` when this caller performed the transition; in that case the
/// completion flag is cleared so that `wait_for_sync` blocks until
/// `complete_sync` or `fail_sync` is called. Returns `false` (and changes
/// nothing) when the state was not `Dirty`.
pub fn try_start_sync(&self) -> bool {
    let dirty = ShardSyncState::Dirty as u8;
    let syncing = ShardSyncState::Syncing as u8;
    let (lock, _) = &*self.sync_complete;
    // Take the completion lock *before* changing the state. Otherwise a thread
    // that sees `Syncing` and immediately calls `wait_for_sync` could still
    // read the stale `completed == true` flag and return before the sync ran.
    let mut completed = lock.lock();
    let claimed = self
        .state
        .compare_exchange(dirty, syncing, Ordering::AcqRel, Ordering::Acquire)
        .is_ok();
    if claimed {
        *completed = false;
    }
    claimed
}
/// Records a successful sync: stores `new_lsn`, clears the last error, sets the
/// state to `Clean`, and wakes every thread blocked in `wait_for_sync`.
///
/// NOTE(review): the state is overwritten unconditionally, and `mark_dirty` is a
/// no-op while the state is `Syncing`, so a write that lands mid-sync leaves the
/// coordinator `Clean` here — confirm callers exclude writers during a sync.
pub fn complete_sync(&self, new_lsn: u64) {
    self.last_synced_lsn.store(new_lsn, Ordering::Release);
    // Statement-scoped guard: the error lock is released before the state changes.
    *self.last_error.lock() = None;

    let clean = ShardSyncState::Clean as u8;
    self.state.store(clean, Ordering::Release);

    let (flag, waiters) = &*self.sync_complete;
    let mut done = flag.lock();
    *done = true;
    waiters.notify_all();
}
/// Records a failed sync: stores the error message, sets the state to
/// `SyncFailed`, and wakes every thread blocked in `wait_for_sync`.
pub fn fail_sync(&self, error: impl Into<String>) {
    let message = error.into();
    // Statement-scoped guard: the error lock is released before the state changes.
    *self.last_error.lock() = Some(message);

    let failed = ShardSyncState::SyncFailed as u8;
    self.state.store(failed, Ordering::Release);

    // The attempt is over even though it failed, so waiters are released too.
    let (flag, waiters) = &*self.sync_complete;
    let mut done = flag.lock();
    *done = true;
    waiters.notify_all();
}
/// Re-arms a failed shard by moving `SyncFailed` back to `Dirty`.
///
/// Returns `true` if the transition happened, `false` if the state was not
/// `SyncFailed`.
pub fn retry_sync(&self) -> bool {
    let from = ShardSyncState::SyncFailed as u8;
    let to = ShardSyncState::Dirty as u8;
    let outcome = self
        .state
        .compare_exchange(from, to, Ordering::AcqRel, Ordering::Acquire);
    outcome.is_ok()
}
/// Returns `true` while the state is `Syncing`.
pub fn is_syncing(&self) -> bool {
    matches!(self.state(), ShardSyncState::Syncing)
}
/// Returns `true` while the state is `Dirty`.
pub fn is_dirty(&self) -> bool {
    matches!(self.state(), ShardSyncState::Dirty)
}
/// Returns `true` while the state is `SyncFailed`.
pub fn is_sync_failed(&self) -> bool {
    matches!(self.state(), ShardSyncState::SyncFailed)
}
/// Blocks until no sync is in flight, or until `timeout` has elapsed.
///
/// Returns `Ok(())` as soon as the completion flag is set (immediately if no
/// sync is running) and `Err(())` if the flag is still clear when the timeout
/// expires. Both `complete_sync` and `fail_sync` set the flag, so `Ok` does not
/// imply the sync succeeded; check `state()` / `last_error()` for that.
pub fn wait_for_sync(&self, timeout: Duration) -> Result<(), ()> {
    let (lock, cvar) = &*self.sync_complete;
    let mut completed = lock.lock();
    // A single fixed deadline keeps the total wait bounded by `timeout` even if
    // we are woken several times. `None` means the deadline is not representable
    // (enormous timeout), which is treated as "wait without a time limit".
    let deadline = std::time::Instant::now().checked_add(timeout);
    while !*completed {
        match deadline {
            Some(deadline) => {
                if cvar.wait_until(&mut completed, deadline).timed_out() {
                    // Completion can race with the timeout; trust the flag, not the timer.
                    return if *completed { Ok(()) } else { Err(()) };
                }
                // Woken but the flag is clear again (a newer sync started):
                // keep waiting for the remaining time instead of reporting a timeout.
            }
            None => cvar.wait(&mut completed),
        }
    }
    Ok(())
}
/// Returns the LSN recorded by the most recent `complete_sync` (0 if none yet).
pub fn last_synced_lsn(&self) -> u64 {
    let lsn = &self.last_synced_lsn;
    lsn.load(Ordering::Acquire)
}
/// Returns a copy of the message stored by the most recent `fail_sync`, or
/// `None` if no failure is recorded (a later `complete_sync` clears it).
pub fn last_error(&self) -> Option<String> {
    let slot = self.last_error.lock();
    (*slot).clone()
}
}
/// In-memory progress bookkeeping for a shard import.
#[derive(Clone, Debug, Default)]
pub struct ShardCheckpointState {
/// Prefixes whose import has been committed; saved to and reloaded from the trie.
pub completed_prefixes: HashSet<String>,
/// Prefix currently being processed, if any. Not written by `save_checkpoint_state`.
pub current_prefix: Option<String>,
/// Running total of n-grams processed; saved to and reloaded from the trie.
pub ngrams_processed: u64,
/// NOTE(review): never assigned in the code visible here — confirm whether it is still used.
pub last_checkpoint_lsn: u64,
}
/// Relaxed atomic counters describing activity on a shard.
#[derive(Debug, Default)]
pub struct ShardStats {
    /// Number of entries the shard is believed to hold.
    pub entry_count: AtomicU64,
    /// Number of write operations recorded.
    pub write_count: AtomicU64,
    /// Number of read operations recorded.
    pub read_count: AtomicU64,
    /// Accumulated lock wait time, in microseconds.
    pub lock_wait_us: AtomicU64,
}

impl ShardStats {
    /// Counts one write.
    pub fn record_write(&self) {
        Self::bump(&self.write_count, 1);
    }

    /// Counts one read.
    pub fn record_read(&self) {
        Self::bump(&self.read_count, 1);
    }

    /// Adds `micros` to the accumulated lock wait time.
    pub fn record_lock_wait(&self, micros: u64) {
        Self::bump(&self.lock_wait_us, micros);
    }

    /// Overwrites the entry count with `count`.
    pub fn set_entry_count(&self, count: u64) {
        let entries = &self.entry_count;
        entries.store(count, Ordering::Relaxed);
    }

    /// Increases the entry count by `delta`.
    pub fn add_entries(&self, delta: u64) {
        Self::bump(&self.entry_count, delta);
    }

    /// Adds `by` to `counter` with relaxed ordering, discarding the previous value.
    fn bump(counter: &AtomicU64, by: u64) {
        counter.fetch_add(by, Ordering::Relaxed);
    }
}
/// Handle to an asynchronous WAL sync of one shard; wraps the trie's
/// [`SyncHandle`] and remembers which shard it belongs to for error reporting.
pub struct ShardSyncHandle {
    inner: SyncHandle,
    shard_key: ShardKey,
}

impl ShardSyncHandle {
    /// Reports whether the underlying sync handle considers itself synced.
    pub fn is_synced(&self) -> bool {
        let handle = &self.inner;
        handle.is_synced()
    }

    /// Consumes the handle and waits on the underlying sync handle.
    ///
    /// # Errors
    /// Returns [`ShardError::Sync`] tagged with this shard's key if the wait fails.
    pub fn wait(self) -> ShardResult<()> {
        let Self { inner, shard_key } = self;
        match inner.wait() {
            Ok(()) => Ok(()),
            Err(e) => Err(ShardError::Sync {
                shard_key: shard_key.to_string(),
                message: format!("Async sync wait failed: {}", e),
            }),
        }
    }

    /// Waits on the underlying sync handle for at most `timeout`, forwarding its
    /// boolean result.
    ///
    /// # Errors
    /// Returns [`ShardError::Sync`] tagged with this shard's key if the wait fails.
    pub fn wait_timeout(&self, timeout: Duration) -> ShardResult<bool> {
        let outcome = self.inner.wait_timeout(timeout);
        outcome.map_err(|e| ShardError::Sync {
            shard_key: self.shard_key.to_string(),
            message: format!("Async sync wait_timeout failed: {}", e),
        })
    }

    /// Key of the shard this sync belongs to.
    pub fn shard_key(&self) -> &ShardKey {
        &self.shard_key
    }

    /// Target LSN reported by the underlying sync handle.
    pub fn target_lsn(&self) -> u64 {
        let handle = &self.inner;
        handle.target_lsn()
    }
}
/// One shard of the n-gram store: a persistent trie mapping byte keys to `u64`
/// counts, together with its checkpoint bookkeeping, statistics and sync state.
pub struct ShardHandle {
/// Key identifying this shard.
key: ShardKey,
/// Backing trie holding n-gram counts and the checkpoint metadata entries.
trie: SharedARTrie<u64>,
/// Filesystem location the trie was created at or opened from.
path: PathBuf,
/// In-memory import progress; loaded on `open`, written by the save/persist helpers.
checkpoint_state: ShardCheckpointState,
/// Activity counters for this shard.
stats: ShardStats,
/// Dirty/syncing state machine used by `sync_tracked` and `wait_for_sync`.
sync_coordinator: ShardSyncCoordinator,
/// New keys added via `increment_lockfree` since the counter was last reset to 0
/// by a successful `sync`, `sync_tracked`, `flush_lockfree` or `checkpoint`.
lockfree_entries: AtomicU64,
}
impl ShardHandle {
/// Key prefix under which checkpoint metadata is stored inside the trie itself.
const CHECKPOINT_PREFIX: &'static str = "\x00__shard_ckpt__:";
/// Byte form of `CHECKPOINT_PREFIX`, used to filter metadata keys out of `iter_with_counts`.
const CHECKPOINT_PREFIX_BYTES: &'static [u8] = b"\x00__shard_ckpt__:";
/// Creates a brand-new shard backed by a trie at `path`.
///
/// The returned handle starts with empty checkpoint state, zeroed statistics
/// and a `Clean` sync coordinator.
///
/// # Errors
/// Returns [`ShardError::Open`] if the trie cannot be created.
pub fn create(key: ShardKey, path: impl AsRef<Path>) -> ShardResult<Self> {
    let path = path.as_ref().to_path_buf();
    let trie = match PersistentARTrie::create_with_slot_tracking(&path) {
        Ok(trie) => trie,
        Err(e) => {
            return Err(ShardError::Open {
                path: path.clone(),
                message: e.to_string(),
            });
        }
    };
    let handle = Self {
        key,
        trie: Arc::new(trie),
        path,
        checkpoint_state: ShardCheckpointState::default(),
        stats: ShardStats::default(),
        sync_coordinator: ShardSyncCoordinator::new(),
        lockfree_entries: AtomicU64::new(0),
    };
    Ok(handle)
}
/// Opens an existing shard at `path`, running the trie's recovery on the way.
///
/// After opening, the persisted checkpoint state is loaded and the entry-count
/// statistic is seeded from the trie's length (0 if the length is unavailable).
///
/// NOTE(review): the trie length presumably also counts the checkpoint metadata
/// keys that `iter_with_counts` filters out — confirm whether `len()` should.
///
/// # Errors
/// Returns [`ShardError::Open`] if the trie cannot be opened, or whatever
/// `load_checkpoint_state` reports.
pub fn open(key: ShardKey, path: impl AsRef<Path>) -> ShardResult<Self> {
    let path = path.as_ref().to_path_buf();
    let opened = PersistentARTrie::open_with_recovery_and_slot_tracking(&path);
    let (trie, recovery_report) = match opened {
        Ok(parts) => parts,
        Err(e) => {
            let message = format!(
                "Failed to open shard at {:?}. If this shard was created with an older format \
                 (PersistentARTrieChar), it must be re-imported. Error: {}",
                path, e
            );
            return Err(ShardError::Open {
                path: path.clone(),
                message,
            });
        }
    };

    if recovery_report.mode.recovered() {
        log::info!(
            "Shard {} recovered from crash: {:?}, {} records replayed",
            key,
            recovery_report.mode,
            recovery_report.records_replayed
        );
    }

    let mut handle = Self {
        key,
        trie: Arc::new(trie),
        path,
        checkpoint_state: ShardCheckpointState::default(),
        stats: ShardStats::default(),
        sync_coordinator: ShardSyncCoordinator::new(),
        lockfree_entries: AtomicU64::new(0),
    };
    handle.load_checkpoint_state()?;

    let entry_count = handle.trie.len().unwrap_or(0) as u64;
    handle.stats.set_entry_count(entry_count);
    Ok(handle)
}
/// Opens the shard at `path` if something exists there, otherwise creates it.
///
/// # Errors
/// Propagates the error from [`ShardHandle::open`] or [`ShardHandle::create`].
pub fn open_or_create(key: ShardKey, path: impl AsRef<Path>) -> ShardResult<Self> {
    let path = path.as_ref();
    if !path.exists() {
        return Self::create(key, path);
    }
    Self::open(key, path)
}
/// Enables eviction on the backing trie when a configuration is supplied;
/// `None` is a no-op.
///
/// # Errors
/// Returns [`ShardError::Open`] (carrying this shard's path) if the trie
/// refuses the configuration.
pub fn arm_eviction(&self, config: Option<EvictionConfig>) -> ShardResult<()> {
    match config {
        None => Ok(()),
        Some(config) => match self.trie.enable_eviction(config) {
            Ok(_) => Ok(()),
            Err(e) => Err(ShardError::Open {
                path: self.path.clone(),
                message: format!("failed to enable overlay eviction: {e}"),
            }),
        },
    }
}
/// Returns the eviction statistics reported by the backing trie.
pub fn eviction_stats(&self) -> EvictionStats {
self.trie.eviction_stats()
}
/// Key identifying this shard.
pub fn key(&self) -> &ShardKey {
&self.key
}
/// Filesystem path this shard was created at or opened from.
pub fn path(&self) -> &Path {
&self.path
}
pub fn len(&self) -> usize {
self.stats
.entry_count
.load(std::sync::atomic::Ordering::Relaxed) as usize
}
/// Returns `true` when [`ShardHandle::len`] reports zero entries.
pub fn is_empty(&self) -> bool {
    matches!(self.len(), 0)
}
/// Number of new keys added through `increment_lockfree` since the counter was
/// last reset by a successful sync, flush or checkpoint.
pub fn lockfree_entry_count(&self) -> u64 {
    let pending = &self.lockfree_entries;
    pending.load(Ordering::Relaxed)
}
/// Borrow of the in-memory checkpoint bookkeeping.
pub fn checkpoint_state(&self) -> &ShardCheckpointState {
&self.checkpoint_state
}
/// Borrow of this shard's activity counters.
pub fn stats(&self) -> &ShardStats {
&self.stats
}
/// Adds `count` to the value stored under `ngram` and marks the shard dirty.
///
/// Returns `Ok(true)` when the key had no value before this call, in which case
/// the entry-count statistic and the lock-free entry counter are both bumped.
///
/// NOTE(review): the existence check and the increment are separate steps, so
/// two threads racing on the same new key may both report it as new — confirm
/// whether callers serialize writes per key.
pub fn increment_lockfree(&self, ngram: &[u8], count: u64) -> ShardResult<bool> {
    let was_new = matches!(self.trie.get_value_bytes(ngram), None);
    self.trie.increment_cas(ngram, count);
    self.stats.record_write();
    if was_new {
        self.stats.add_entries(1);
        self.lockfree_entries.fetch_add(1, Ordering::Relaxed);
    }
    self.sync_coordinator.mark_dirty();
    Ok(was_new)
}
/// Looks up the count stored under `ngram`, recording one read.
///
/// A missing key and a stored count of 0 both yield `None`.
pub fn get(&self, ngram: &[u8]) -> Option<u64> {
    self.stats.record_read();
    self.trie
        .get_value_bytes(ngram)
        .filter(|&total| total != 0)
}
/// Reports whether the backing trie contains `ngram`. Does not record a read.
pub fn contains(&self, ngram: &[u8]) -> bool {
self.trie.contains_bytes(ngram)
}
/// Collects every `(key, count)` pair in the shard, skipping the checkpoint
/// metadata entries stored under `CHECKPOINT_PREFIX_BYTES`.
///
/// Returns an empty vector when the trie yields no iterator.
pub fn iter_with_counts(&self) -> ShardResult<Vec<(Vec<u8>, u64)>> {
    let entries: Vec<(Vec<u8>, u64)> = self
        .trie
        .iter_prefix_with_values(b"")
        .map(|pairs| {
            pairs
                .filter(|(k, _)| !k.starts_with(Self::CHECKPOINT_PREFIX_BYTES))
                .collect()
        })
        .unwrap_or_default();
    Ok(entries)
}
/// Checkpoints the backing trie and resets the lock-free entry counter.
///
/// Unlike `sync_tracked`, this does not touch the sync coordinator.
///
/// # Errors
/// Returns [`ShardError::Checkpoint`] if the trie checkpoint fails; the counter
/// is left unchanged in that case.
pub fn sync(&self) -> ShardResult<()> {
    if let Err(e) = self.trie.checkpoint() {
        return Err(ShardError::Checkpoint {
            shard_key: self.key.to_string(),
            message: format!("sync failed: {}", e),
        });
    }
    self.lockfree_entries.store(0, Ordering::Relaxed);
    Ok(())
}
/// Checkpoints the trie under control of the sync coordinator.
///
/// Returns `Ok(false)` without doing any work when the coordinator is not in
/// the `Dirty` state. Otherwise runs the checkpoint and returns `Ok(true)` on
/// success, recording the trie's synced LSN (0 if unavailable).
///
/// # Errors
/// Returns [`ShardError::Sync`] if the checkpoint fails; the coordinator is
/// moved to `SyncFailed` with the same message.
pub fn sync_tracked(&self) -> ShardResult<bool> {
    if !self.sync_coordinator.try_start_sync() {
        return Ok(false);
    }

    if let Err(e) = self.trie.checkpoint() {
        let error_msg = format!("sync failed: {}", e);
        self.sync_coordinator.fail_sync(&error_msg);
        return Err(ShardError::Sync {
            shard_key: self.key.to_string(),
            message: error_msg,
        });
    }

    self.lockfree_entries.store(0, Ordering::Relaxed);
    let lsn = self.trie.synced_lsn().unwrap_or(0);
    self.sync_coordinator.complete_sync(lsn);
    Ok(true)
}
/// Forwards to the sync coordinator's `mark_dirty`.
pub fn mark_dirty(&self) {
self.sync_coordinator.mark_dirty();
}
/// Returns `true` while the sync coordinator is in the `Syncing` state.
pub fn is_syncing(&self) -> bool {
self.sync_coordinator.is_syncing()
}
/// Returns `true` while the sync coordinator is in the `Dirty` state.
pub fn is_dirty(&self) -> bool {
self.sync_coordinator.is_dirty()
}
/// Current state of the sync coordinator.
pub fn sync_state(&self) -> ShardSyncState {
self.sync_coordinator.state()
}
/// Borrow of the sync coordinator, for callers that drive the state machine directly.
pub fn sync_coordinator(&self) -> &ShardSyncCoordinator {
&self.sync_coordinator
}
/// Waits on the sync coordinator for at most `timeout`.
///
/// # Errors
/// Returns [`ShardError::SyncTimeout`] for this shard when the coordinator's
/// wait reports an error.
pub fn wait_for_sync(&self, timeout: Duration) -> ShardResult<()> {
    match self.sync_coordinator.wait_for_sync(timeout) {
        Ok(()) => Ok(()),
        Err(()) => Err(ShardError::SyncTimeout {
            shard_key: self.key.to_string(),
        }),
    }
}
/// Asks the trie to start an asynchronous WAL sync.
///
/// Returns `Ok(None)` when the trie hands back no handle, otherwise a
/// [`ShardSyncHandle`] tagged with this shard's key.
///
/// # Errors
/// Returns [`ShardError::Sync`] if the sync cannot be initiated.
pub fn sync_async(&self) -> ShardResult<Option<ShardSyncHandle>> {
    let pending = match self.trie.wal_sync_async() {
        Ok(pending) => pending,
        Err(e) => {
            return Err(ShardError::Sync {
                shard_key: self.key.to_string(),
                message: format!("Failed to initiate async sync: {}", e),
            });
        }
    };
    let wrapped = pending.map(|inner| ShardSyncHandle {
        inner,
        shard_key: self.key.clone(),
    });
    Ok(wrapped)
}
/// Current LSN as reported by the backing trie.
pub fn current_lsn(&self) -> u64 {
self.trie.current_lsn()
}
/// Synced LSN as reported by the backing trie, if it has one.
pub fn synced_lsn(&self) -> Option<u64> {
self.trie.synced_lsn()
}
/// Checkpoints the backing trie and resets the lock-free entry counter.
///
/// # Errors
/// Returns [`ShardError::Checkpoint`] if the trie checkpoint fails; the counter
/// is left unchanged in that case.
pub fn flush_lockfree(&self) -> ShardResult<()> {
    if let Err(e) = self.trie.checkpoint() {
        return Err(ShardError::Checkpoint {
            shard_key: self.key.to_string(),
            message: format!("flush_lockfree failed: {}", e),
        });
    }
    self.lockfree_entries.store(0, Ordering::Relaxed);
    Ok(())
}
/// Full checkpoint: writes the checkpoint metadata into the trie, runs the
/// trie's sequential flush, checkpoints it, and resets the lock-free counter.
///
/// # Errors
/// Returns [`ShardError::Checkpoint`] from whichever step fails first; later
/// steps are skipped.
pub fn checkpoint(&self) -> ShardResult<()> {
    self.save_checkpoint_state()?;

    let key = &self.key;
    // Builds the error lazily so the key is only stringified on failure.
    let checkpoint_error = |message: String| ShardError::Checkpoint {
        shard_key: key.to_string(),
        message,
    };

    self.trie
        .flush_sequential()
        .map_err(|e| checkpoint_error(format!("flush_sequential failed: {}", e)))?;
    self.trie
        .checkpoint()
        .map_err(|e| checkpoint_error(e.to_string()))?;

    self.lockfree_entries.store(0, Ordering::Relaxed);
    Ok(())
}
/// Marks `prefix` as completed, clears the current prefix, and persists the
/// checkpoint state.
///
/// # Errors
/// Propagates the error from `persist_checkpoint_state`; the in-memory state
/// has already been updated at that point.
pub fn complete_prefix(&mut self, prefix: &str) -> ShardResult<()> {
    let state = &mut self.checkpoint_state;
    state.completed_prefixes.insert(prefix.to_owned());
    state.current_prefix = None;
    self.persist_checkpoint_state()
}
/// Sets (or clears, with `None`) the prefix currently being processed.
/// Only the in-memory state changes.
pub fn set_current_prefix(&mut self, prefix: Option<&str>) {
    let owned = prefix.map(str::to_owned);
    self.checkpoint_state.current_prefix = owned;
}
/// Adds `count` to the in-memory total of processed n-grams.
pub fn add_ngrams_processed(&mut self, count: u64) {
    let total = &mut self.checkpoint_state.ngrams_processed;
    *total += count;
}
/// Rebuilds the in-memory checkpoint state from the metadata entries stored in
/// the trie.
///
/// Restores `ngrams_processed` (when present) and every completed prefix found
/// under `<CHECKPOINT_PREFIX>prefix:`. Prefix keys whose suffix is not valid
/// UTF-8 are skipped. Always returns `Ok(())`.
fn load_checkpoint_state(&mut self) -> ShardResult<()> {
    let ngrams_key = format!("{}ngrams_processed", Self::CHECKPOINT_PREFIX);
    if let Some(value) = self.trie.get_value_bytes(ngrams_key.as_bytes()) {
        self.checkpoint_state.ngrams_processed = value;
    }

    let marker = format!("{}prefix:", Self::CHECKPOINT_PREFIX);
    let marker = marker.as_bytes();
    if let Some(entries) = self.trie.iter_prefix_with_values(marker) {
        // The part of each key after the marker is the completed prefix itself.
        let restored = entries.filter_map(|(key, _)| {
            let suffix = key.strip_prefix(marker)?;
            std::str::from_utf8(suffix).ok().map(str::to_owned)
        });
        self.checkpoint_state.completed_prefixes.extend(restored);
    }

    log::debug!(
        "Shard {}: loaded {} completed prefixes, {} ngrams processed",
        self.key,
        self.checkpoint_state.completed_prefixes.len(),
        self.checkpoint_state.ngrams_processed
    );
    Ok(())
}
/// Writes the checkpoint metadata into the trie (without syncing it).
///
/// Stores, in order: the processed n-gram total, the number of completed
/// prefixes, and one marker entry (value 1) per completed prefix.
///
/// # Errors
/// Returns [`ShardError::Checkpoint`] at the first failed upsert; entries
/// written before the failure remain in the trie.
fn save_checkpoint_state(&self) -> ShardResult<()> {
    let state = &self.checkpoint_state;
    // Builds the error lazily so the key is only stringified on failure.
    let fail = |message: String| ShardError::Checkpoint {
        shard_key: self.key.to_string(),
        message,
    };

    let ngrams_key = format!("{}ngrams_processed", Self::CHECKPOINT_PREFIX);
    self.trie
        .upsert_bytes(ngrams_key.as_bytes(), state.ngrams_processed)
        .map_err(|e| fail(format!("failed to save ngrams_processed: {}", e)))?;

    let completed_key = format!("{}completed", Self::CHECKPOINT_PREFIX);
    let completed_total = state.completed_prefixes.len() as u64;
    self.trie
        .upsert_bytes(completed_key.as_bytes(), completed_total)
        .map_err(|e| fail(format!("failed to save completed count: {}", e)))?;

    state.completed_prefixes.iter().try_for_each(|prefix| {
        let prefix_key = format!("{}prefix:{}", Self::CHECKPOINT_PREFIX, prefix);
        self.trie
            .upsert_bytes(prefix_key.as_bytes(), 1)
            .map(|_| ())
            .map_err(|e| fail(format!("failed to save prefix {}: {}", prefix, e)))
    })
}
/// Writes the checkpoint metadata into the trie and then syncs the trie.
///
/// # Errors
/// Returns [`ShardError::Checkpoint`] if saving the metadata or the trie sync fails.
pub fn persist_checkpoint_state(&self) -> ShardResult<()> {
    self.save_checkpoint_state()?;
    match self.trie.sync() {
        Ok(()) => Ok(()),
        Err(e) => Err(ShardError::Checkpoint {
            shard_key: self.key.to_string(),
            message: format!("failed to sync checkpoint state: {}", e),
        }),
    }
}
/// Opens a document transaction on the trie for `prefix` (document id
/// `prefix:<prefix>`) and wraps it in a [`PrefixTransaction`] with a zero
/// n-gram count.
///
/// # Errors
/// Returns [`ShardError::Write`] if the trie cannot begin the transaction.
pub fn begin_prefix(&self, prefix: &str) -> ShardResult<PrefixTransaction<u64>> {
    let document_id = format!("prefix:{}", prefix);
    match self.trie.begin_document(&document_id) {
        Ok(tx) => Ok(PrefixTransaction {
            prefix: prefix.to_string(),
            tx,
            ngram_count: 0,
        }),
        Err(e) => Err(ShardError::Write {
            shard_key: self.key.to_string(),
            message: format!("Failed to begin transaction for prefix '{}': {}", prefix, e),
        }),
    }
}
pub fn tx_insert(&self, tx: &mut PrefixTransaction<u64>, ngram: &[u8], count: u64) {
self.trie.tx_insert_bytes(&mut tx.tx, ngram, Some(count));
tx.ngram_count += 1;
}
pub fn commit_prefix(&mut self, tx: PrefixTransaction<u64>) -> ShardResult<usize> {
let ngram_count = tx.ngram_count;
let prefix = tx.prefix.clone();
let inserted = self
.trie
.commit_document(tx.tx)
.map_err(|e| ShardError::Write {
shard_key: self.key.to_string(),
message: format!(
"Failed to commit transaction for prefix '{}': {}",
prefix, e
),
})?;
self.stats.add_entries(inserted as u64);
self.stats.record_write();
self.checkpoint_state
.completed_prefixes
.insert(prefix.clone());
self.checkpoint_state.current_prefix = None;
self.persist_checkpoint_state()?;
self.sync_coordinator.mark_dirty();
log::trace!(
"Shard {}: committed prefix '{}' with {} n-grams ({} newly inserted)",
self.key,
prefix,
ngram_count,
inserted
);
Ok(ngram_count)
}
pub fn commit_chunk(&mut self, tx: PrefixTransaction<u64>) -> ShardResult<usize> {
let ngram_count = tx.ngram_count;
let prefix = tx.prefix.clone();
let inserted = self
.trie
.commit_document(tx.tx)
.map_err(|e| ShardError::Write {
shard_key: self.key.to_string(),
message: format!("Failed to commit chunk for prefix '{}': {}", prefix, e),
})?;
self.stats.add_entries(inserted as u64);
self.stats.record_write();
self.sync_coordinator.mark_dirty();
log::trace!(
"Shard {}: committed chunk for prefix '{}' with {} n-grams ({} newly inserted)",
self.key,
prefix,
ngram_count,
inserted
);
Ok(ngram_count)
}
pub fn abort_prefix(&self, tx: PrefixTransaction<u64>) -> ShardResult<()> {
let prefix = tx.prefix.clone();
self.trie
.abort_document(tx.tx)
.map_err(|e| ShardError::Write {
shard_key: self.key.to_string(),
message: format!("Failed to abort transaction for prefix '{}': {}", prefix, e),
})?;
log::trace!(
"Shard {}: aborted prefix '{}' transaction ({} n-grams discarded)",
self.key,
prefix,
tx.ngram_count
);
Ok(())
}
}
/// An open document transaction for one prefix, created by `ShardHandle::begin_prefix`.
pub struct PrefixTransaction<V: liblevenshtein::dictionary::DictionaryValue> {
/// Prefix this transaction belongs to.
pub prefix: String,
/// Underlying trie transaction; consumed on commit or abort.
tx: DocumentTransaction<V>,
/// Number of n-grams buffered so far via `ShardHandle::tx_insert`.
pub ngram_count: usize,
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
/// A first increment reports the key as new; a second one accumulates into the
/// same entry and leaves the entry count at one.
#[test]
fn test_shard_create_and_write() {
    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let shard =
        ShardHandle::create(ShardKey::new("th"), &shard_path).expect("Failed to create shard");

    let first = shard
        .increment_lockfree(b"the|quick", 5)
        .expect("Failed to increment");
    assert!(first);

    let second = shard
        .increment_lockfree(b"the|quick", 3)
        .expect("Failed to increment");
    assert!(!second);

    assert_eq!(shard.get(b"the|quick"), Some(8));
    assert_eq!(shard.len(), 1);
}
/// Data written and synced through one handle is visible after reopening the shard.
#[test]
fn test_shard_persistence() {
    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let key = ShardKey::new("th");

    // Inner scope: the writing handle is dropped before the shard is reopened.
    {
        let writer =
            ShardHandle::create(key.clone(), &shard_path).expect("Failed to create shard");
        writer.increment_lockfree(b"the|quick", 10).unwrap();
        writer.sync().unwrap();
    }

    let reopened = ShardHandle::open(key, &shard_path).expect("Failed to open shard");
    assert_eq!(reopened.get(b"the|quick"), Some(10));
}
/// `open_or_create` creates the shard when nothing exists at the path yet.
#[test]
fn test_open_or_create_new_shard() {
    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("new_shard.artrie");
    assert!(!shard_path.exists());

    let shard = ShardHandle::open_or_create(ShardKey::new("ab"), &shard_path)
        .expect("Failed to open_or_create shard");
    assert!(shard_path.exists());

    shard.increment_lockfree(b"apple|pie", 5).unwrap();
    shard.sync().unwrap();
    assert_eq!(shard.get(b"apple|pie"), Some(5));
}
/// `open_or_create` opens an existing shard and sees its previously synced data.
#[test]
fn test_open_or_create_existing_shard() {
    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("existing_shard.artrie");
    let key = ShardKey::new("cd");

    // Seed the shard, then drop the handle before reopening.
    {
        let seed =
            ShardHandle::create(key.clone(), &shard_path).expect("Failed to create shard");
        seed.increment_lockfree(b"cat|dog", 7).unwrap();
        seed.sync().unwrap();
    }

    let reopened = ShardHandle::open_or_create(key, &shard_path)
        .expect("Failed to open_or_create existing shard");
    assert_eq!(reopened.get(b"cat|dog"), Some(7));
}
/// Walks the happy path of the coordinator: Clean -> Dirty -> Syncing -> Clean.
#[test]
fn test_sync_state_machine() {
    let sync = ShardSyncCoordinator::new();

    // Fresh coordinator.
    assert_eq!(sync.state(), ShardSyncState::Clean);
    assert!(!sync.is_syncing());
    assert!(!sync.is_dirty());

    // First mark_dirty flips the state.
    sync.mark_dirty();
    assert_eq!(sync.state(), ShardSyncState::Dirty);
    assert!(sync.is_dirty());
    assert!(!sync.is_syncing());

    // A repeated mark_dirty changes nothing.
    sync.mark_dirty();
    assert_eq!(sync.state(), ShardSyncState::Dirty);

    // Only one caller can claim the sync.
    assert!(sync.try_start_sync());
    assert_eq!(sync.state(), ShardSyncState::Syncing);
    assert!(sync.is_syncing());
    assert!(!sync.is_dirty());
    assert!(!sync.try_start_sync());

    // Completion returns to Clean and records the LSN.
    sync.complete_sync(100);
    assert_eq!(sync.state(), ShardSyncState::Clean);
    assert!(!sync.is_syncing());
    assert_eq!(sync.last_synced_lsn(), 100);
}
/// A failed sync records its error, can be re-armed with `retry_sync`, and then
/// completes normally.
#[test]
fn test_sync_state_failure() {
    let sync = ShardSyncCoordinator::new();
    sync.mark_dirty();
    assert!(sync.try_start_sync());
    assert!(sync.is_syncing());

    sync.fail_sync("disk full");
    assert_eq!(sync.state(), ShardSyncState::SyncFailed);
    assert!(sync.is_sync_failed());
    assert!(!sync.is_syncing());
    assert_eq!(sync.last_error(), Some("disk full".to_string()));

    assert!(sync.retry_sync());
    assert_eq!(sync.state(), ShardSyncState::Dirty);
    assert!(sync.is_dirty());

    assert!(sync.try_start_sync());
    sync.complete_sync(200);
    assert_eq!(sync.state(), ShardSyncState::Clean);
}
/// A write marks the shard dirty; `sync_tracked` cleans it and is a no-op afterwards.
#[test]
fn test_sync_tracked_marks_dirty() {
    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let shard =
        ShardHandle::create(ShardKey::new("th"), &shard_path).expect("Failed to create shard");
    assert_eq!(shard.sync_state(), ShardSyncState::Clean);

    shard
        .increment_lockfree(b"the|quick", 5)
        .expect("Failed to increment");
    assert_eq!(shard.sync_state(), ShardSyncState::Dirty);

    let synced = shard.sync_tracked().expect("sync_tracked failed");
    assert!(synced);
    assert_eq!(shard.sync_state(), ShardSyncState::Clean);

    let synced_again = shard.sync_tracked().expect("sync_tracked failed");
    assert!(!synced_again);
}
/// A waiter is released when another thread completes the in-flight sync.
#[test]
fn test_sync_coordinator_wait() {
    use std::thread;

    let sync = Arc::new(ShardSyncCoordinator::new());
    sync.mark_dirty();
    assert!(sync.try_start_sync());

    let finisher = {
        let sync = Arc::clone(&sync);
        thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            sync.complete_sync(42);
        })
    };

    let outcome = sync.wait_for_sync(Duration::from_millis(200));
    assert!(outcome.is_ok());
    assert_eq!(sync.state(), ShardSyncState::Clean);
    finisher.join().expect("Thread panicked");
}
/// Waiting on a sync that never finishes times out and leaves the state `Syncing`.
#[test]
fn test_sync_coordinator_timeout() {
    let sync = ShardSyncCoordinator::new();
    sync.mark_dirty();
    assert!(sync.try_start_sync());

    let outcome = sync.wait_for_sync(Duration::from_millis(10));
    assert!(outcome.is_err());
    assert!(sync.is_syncing());
}
/// `commit_prefix` marks the prefix completed in memory and the marker survives
/// a reopen, together with the committed data.
#[test]
fn test_commit_prefix_updates_checkpoint_state() {
    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let key = ShardKey::new("th");

    // Phase 1: commit through a fresh handle, then drop it.
    {
        let mut shard =
            ShardHandle::create(key.clone(), &shard_path).expect("Failed to create shard");
        let mut tx = shard.begin_prefix("th").expect("Failed to begin prefix");
        shard.tx_insert(&mut tx, b"the|quick", 10);
        shard.tx_insert(&mut tx, b"the|brown", 5);
        assert!(
            shard.checkpoint_state().completed_prefixes.is_empty(),
            "Checkpoint state should be empty before commit"
        );

        let committed = shard.commit_prefix(tx).expect("Failed to commit prefix");
        assert_eq!(committed, 2, "Should have committed 2 n-grams");

        let state = shard.checkpoint_state();
        assert!(
            state.completed_prefixes.contains("th"),
            "Checkpoint state should contain 'th' after commit"
        );
        assert!(
            state.current_prefix.is_none(),
            "Current prefix should be None after commit"
        );
    }

    // Phase 2: reopen and verify both metadata and data.
    let reopened = ShardHandle::open(key, &shard_path).expect("Failed to open shard");
    assert!(
        reopened.checkpoint_state().completed_prefixes.contains("th"),
        "Checkpoint state should persist 'th' across reopen - this is the fix!"
    );
    assert_eq!(reopened.get(b"the|quick"), Some(10));
    assert_eq!(reopened.get(b"the|brown"), Some(5));
}
/// Two prefixes committed one after the other are both recorded and both
/// survive a reopen.
#[test]
fn test_commit_prefix_multiple_prefixes() {
    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let key = ShardKey::new("th");

    {
        let mut shard =
            ShardHandle::create(key.clone(), &shard_path).expect("Failed to create shard");

        let mut first = shard.begin_prefix("th").expect("Failed to begin prefix");
        shard.tx_insert(&mut first, b"the|quick", 10);
        shard.commit_prefix(first).expect("Failed to commit prefix");

        let mut second = shard.begin_prefix("ti").expect("Failed to begin prefix");
        shard.tx_insert(&mut second, b"time|flies", 3);
        shard.commit_prefix(second).expect("Failed to commit prefix");

        let completed = &shard.checkpoint_state().completed_prefixes;
        assert!(completed.contains("th"));
        assert!(completed.contains("ti"));
    }

    let reopened = ShardHandle::open(key, &shard_path).expect("Failed to open shard");
    let completed = &reopened.checkpoint_state().completed_prefixes;
    assert!(completed.contains("th"));
    assert!(completed.contains("ti"));
    assert_eq!(completed.len(), 2);
}
/// Aborting a transaction leaves no completed prefix and no data behind.
#[test]
fn test_abort_prefix_does_not_update_checkpoint() {
    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let shard =
        ShardHandle::create(ShardKey::new("th"), &shard_path).expect("Failed to create shard");

    let mut tx = shard.begin_prefix("th").expect("Failed to begin prefix");
    shard.tx_insert(&mut tx, b"the|quick", 10);
    shard.abort_prefix(tx).expect("Failed to abort prefix");

    assert!(
        shard.checkpoint_state().completed_prefixes.is_empty(),
        "Aborted prefix should not appear in checkpoint state"
    );
    assert_eq!(shard.get(b"the|quick"), None);
}
/// Each new key written through `increment_lockfree` bumps the lock-free counter.
#[test]
fn test_lockfree_entry_count_increments_on_increment() {
    let tmp = TempDir::new().expect("tempdir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let shard = ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create");
    assert_eq!(shard.lockfree_entry_count(), 0);

    let ngrams: [&[u8]; 3] = [b"the|quick", b"the|brown", b"the|fox"];
    for ngram in ngrams {
        shard.increment_lockfree(ngram, 1).unwrap();
    }
    assert_eq!(shard.lockfree_entry_count(), 3);
}
/// `sync()` resets the lock-free counter to zero.
#[test]
fn test_lockfree_entry_count_resets_on_sync() {
    let tmp = TempDir::new().expect("tempdir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let shard = ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create");

    let ngrams: [&[u8]; 2] = [b"the|quick", b"the|brown"];
    for ngram in ngrams {
        shard.increment_lockfree(ngram, 1).unwrap();
    }
    assert_eq!(shard.lockfree_entry_count(), 2);

    shard.sync().expect("sync");
    assert_eq!(
        shard.lockfree_entry_count(),
        0,
        "sync() merges lock-free overlay into persistent and should reset the counter"
    );
}
/// `flush_lockfree()` resets the lock-free counter to zero.
#[test]
fn test_lockfree_entry_count_resets_on_flush_lockfree() {
    let tmp = TempDir::new().expect("tempdir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let shard = ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create");

    let ngrams: [&[u8]; 3] = [b"the|quick", b"the|brown", b"the|fox"];
    for ngram in ngrams {
        shard.increment_lockfree(ngram, 1).unwrap();
    }
    assert_eq!(shard.lockfree_entry_count(), 3);

    shard.flush_lockfree().expect("flush_lockfree");
    assert_eq!(
        shard.lockfree_entry_count(),
        0,
        "flush_lockfree() should reset the counter"
    );
}
/// `checkpoint()` resets the lock-free counter to zero.
#[test]
fn test_lockfree_entry_count_resets_on_checkpoint() {
    let tmp = TempDir::new().expect("tempdir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let shard = ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create");

    shard.increment_lockfree(b"the|quick", 1).unwrap();
    assert_eq!(shard.lockfree_entry_count(), 1);

    shard.checkpoint().expect("checkpoint");
    assert_eq!(
        shard.lockfree_entry_count(),
        0,
        "checkpoint() should reset the counter"
    );
}
/// `commit_chunk` makes the buffered n-grams readable but does not mark the
/// prefix as completed.
#[test]
fn test_commit_chunk_persists_data() {
    let tmp = TempDir::new().expect("tempdir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let mut shard = ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create");

    // Expected contents: the|word0..the|word4 with values 100..104.
    let expected: Vec<(String, u64)> = (0..5u64)
        .map(|n| (format!("the|word{}", n), 100 + n))
        .collect();

    let mut tx = shard.begin_prefix("th").expect("begin_prefix");
    for (word, value) in &expected {
        shard.tx_insert(&mut tx, word.as_bytes(), *value);
    }
    let inserted = shard.commit_chunk(tx).expect("commit_chunk");
    assert_eq!(inserted, 5, "should commit all 5 buffered n-grams");

    for (word, value) in &expected {
        assert_eq!(shard.get(word.as_bytes()), Some(*value));
    }
    assert!(
        !shard.checkpoint_state().completed_prefixes.contains("th"),
        "commit_chunk must NOT mark the prefix as complete — that's commit_prefix's job"
    );
}
/// A chunk commit followed by a final `commit_prefix` keeps all data, marks the
/// prefix completed, and both survive a reopen.
#[test]
fn test_commit_chunk_then_commit_prefix_marks_complete() {
    let tmp = TempDir::new().expect("tempdir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let key = ShardKey::new("th");
    let word = |n: u64| format!("the|word{}", n);

    {
        let mut shard = ShardHandle::create(key.clone(), &shard_path).expect("create");

        // First chunk: words 0..5.
        let mut chunk = shard.begin_prefix("th").expect("begin_prefix");
        for n in 0..5u64 {
            shard.tx_insert(&mut chunk, word(n).as_bytes(), 100 + n);
        }
        shard.commit_chunk(chunk).expect("commit_chunk 1");

        // Final chunk: words 5..8, committed as the end of the prefix.
        let mut last = shard.begin_prefix("th").expect("begin_prefix");
        for n in 5..8u64 {
            shard.tx_insert(&mut last, word(n).as_bytes(), 100 + n);
        }
        let inserted = shard.commit_prefix(last).expect("commit_prefix");
        assert_eq!(inserted, 3);

        for n in 0..8u64 {
            assert_eq!(shard.get(word(n).as_bytes()), Some(100 + n));
        }
        assert!(
            shard.checkpoint_state().completed_prefixes.contains("th"),
            "commit_prefix on the final chunk should mark the prefix complete"
        );
    }

    let reopened = ShardHandle::open(key, &shard_path).expect("open");
    assert!(reopened.checkpoint_state().completed_prefixes.contains("th"));
    for n in 0..8u64 {
        assert_eq!(reopened.get(word(n).as_bytes()), Some(100 + n));
    }
}
/// Commits 200 keys of the form `0x01 ++ varint(i + 2)` in one transaction and
/// checks that every one is readable afterwards. The key range crosses the
/// one-byte/two-byte varint boundary at 128.
///
/// NOTE(review): despite the name, the transaction is committed with
/// `commit_prefix`, not `commit_chunk`.
#[test]
fn test_commit_chunk_handles_large_batch_and_varint_boundary() {
    let tmp = TempDir::new().expect("tempdir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let mut shard = ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create");

    /// LEB128-style encoding: 7 payload bits per byte, high bit set on all but the last.
    fn encode_varint(mut v: u64, out: &mut Vec<u8>) {
        while v >= 0x80 {
            out.push((v & 0x7f) as u8 | 0x80);
            v >>= 7;
        }
        out.push(v as u8);
    }

    let keys: Vec<Vec<u8>> = (0..200u64)
        .map(|i| {
            let mut key = vec![0x01u8];
            encode_varint(i + 2, &mut key);
            key
        })
        .collect();

    let mut tx = shard.begin_prefix("th").expect("begin_prefix");
    for (i, key) in keys.iter().enumerate() {
        shard.tx_insert(&mut tx, key, 1000 + i as u64);
    }
    let inserted = shard.commit_prefix(tx).expect("commit_prefix");
    assert_eq!(inserted, 200);

    let mut missing = Vec::new();
    for (i, key) in keys.iter().enumerate() {
        if shard.get(key) != Some(1000 + i as u64) {
            missing.push((i, key.clone(), shard.get(key)));
        }
    }
    assert!(
        missing.is_empty(),
        "shard-level: missing {} keys: first: {:?}",
        missing.len(),
        &missing[..missing.len().min(3)]
    );
}
/// Committing the same key/value twice via `commit_chunk` leaves the value unchanged.
#[test]
fn test_commit_chunk_set_semantics_idempotent() {
    let tmp = TempDir::new().expect("tempdir");
    let shard_path = tmp.path().join("test_shard.artrie");
    let mut shard = ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create");

    let mut first = shard.begin_prefix("th").expect("begin_prefix");
    shard.tx_insert(&mut first, b"the|fox", 10);
    shard.commit_chunk(first).expect("commit_chunk 1");
    assert_eq!(shard.get(b"the|fox"), Some(10));

    let mut repeat = shard.begin_prefix("th").expect("begin_prefix");
    shard.tx_insert(&mut repeat, b"the|fox", 10);
    shard.commit_chunk(repeat).expect("commit_chunk 2");
    assert_eq!(
        shard.get(b"the|fox"),
        Some(10),
        "commit_chunk uses SET semantics — re-inserting the same value must not double it"
    );
}
/// With a 4 KiB resident budget, two checkpoints after 2000 writes must evict
/// at least one node, and every key must still read back its value.
#[test]
fn test_overlay_eviction_is_lossless_and_observable() {
    const N: u64 = 2000;

    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("evict_shard.artrie");
    let shard = ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create");
    let eviction = EvictionConfig {
        resident_budget_bytes: Some(4096),
        ..EvictionConfig::without_memory_monitor()
    };
    shard.arm_eviction(Some(eviction)).expect("arm eviction");

    for i in 0..N {
        let ngram = format!("th|w{:05}", i);
        shard
            .increment_lockfree(ngram.as_bytes(), i + 1)
            .expect("increment");
    }
    shard.checkpoint().expect("checkpoint 1");
    shard.checkpoint().expect("checkpoint 2");

    let stats = shard.eviction_stats();
    assert!(
        stats.nodes_evicted > 0,
        "budget eviction should have reclaimed cold overlay nodes (nodes_evicted={})",
        stats.nodes_evicted
    );

    for i in 0..N {
        assert_eq!(
            shard.get(format!("th|w{:05}", i).as_bytes()),
            Some(i + 1),
            "evicted key th|w{:05} must fault back losslessly",
            i
        );
    }
}
/// With a 1 MiB resident budget and 50 000 keys, two checkpoints must evict at
/// least 30 000 nodes while every key stays readable.
///
/// NOTE(review): the test asserts on `nodes_evicted` only; it does not compare
/// `resident_bytes` against the budget.
#[test]
fn test_overlay_eviction_bounds_resident_to_budget() {
    const BUDGET: usize = 1024 * 1024;
    const N: u64 = 50_000;

    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("evict_budget.artrie");
    let shard = ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create");
    let eviction = EvictionConfig {
        resident_budget_bytes: Some(BUDGET),
        ..EvictionConfig::without_memory_monitor()
    };
    shard.arm_eviction(Some(eviction)).expect("arm eviction");

    for i in 0..N {
        let ngram = format!("th|term{:06}", i);
        shard
            .increment_lockfree(ngram.as_bytes(), 1)
            .expect("increment");
    }
    shard.checkpoint().expect("checkpoint 1");
    shard.checkpoint().expect("checkpoint 2");

    let stats = shard.eviction_stats();
    assert!(
        stats.nodes_evicted >= 30_000,
        "budget eviction must reclaim the bulk of the {}-key overlay \
         (nodes_evicted={}, registry resident_bytes={})",
        N,
        stats.nodes_evicted,
        stats.resident_bytes
    );

    for i in 0..N {
        assert_eq!(
            shard.get(format!("th|term{:06}", i).as_bytes()),
            Some(1),
            "key th|term{:06} lost under budget eviction",
            i
        );
    }
}
/// Four writer threads and one checkpointing thread run against a shard with a
/// 4 KiB eviction budget; afterwards every written key must still read back 1.
#[test]
fn test_overlay_eviction_under_concurrent_writers() {
    const WRITERS: u64 = 4;
    const PER_WRITER: u64 = 500;

    let tmp = TempDir::new().expect("Failed to create temp dir");
    let shard_path = tmp.path().join("evict_concurrent.artrie");
    let shard = Arc::new(ShardHandle::create(ShardKey::new("th"), &shard_path).expect("create"));
    let eviction = EvictionConfig {
        resident_budget_bytes: Some(4096),
        ..EvictionConfig::without_memory_monitor()
    };
    shard.arm_eviction(Some(eviction)).expect("arm eviction");

    let mut writer_threads = Vec::new();
    for w in 0..WRITERS {
        let shard = Arc::clone(&shard);
        writer_threads.push(std::thread::spawn(move || {
            for i in 0..PER_WRITER {
                shard
                    .increment_lockfree(format!("th|w{}_{:04}", w, i).as_bytes(), 1)
                    .expect("concurrent increment");
            }
        }));
    }

    // Checkpoints run concurrently with the writers above.
    let checkpoint_thread = {
        let shard = Arc::clone(&shard);
        std::thread::spawn(move || {
            for _ in 0..4 {
                shard.checkpoint().expect("concurrent checkpoint");
            }
        })
    };

    for handle in writer_threads {
        handle.join().expect("writer thread panicked");
    }
    checkpoint_thread.join().expect("checkpoint thread panicked");
    shard.checkpoint().expect("final checkpoint");

    for w in 0..WRITERS {
        for i in 0..PER_WRITER {
            let key = format!("th|w{}_{:04}", w, i);
            assert_eq!(
                shard.get(key.as_bytes()),
                Some(1),
                "write {} lost under concurrent budget eviction",
                key
            );
        }
    }
}
}