use crate::core::error::Result;
use crate::core::id::{EdgeId, NodeId, VersionId};
use crate::core::version::{EdgeVersion, FastHashMap, NodeVersion};
use crate::storage::redb_cold_storage::RedbColdStorage;
use crate::storage::wal::LSN;
use crate::storage::wal::flush_coordinator::FlushCoordinator;
use quick_cache::sync::Cache;
use std::collections::HashMap;
use std::panic;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(feature = "config-toml")]
use serde::{Deserialize, Serialize};
// How often the background worker wakes to re-check the `running` flag,
// bounding how long `stop()`/`Drop` wait for the sleep loop to notice shutdown.
const SHUTDOWN_CHECK_INTERVAL: Duration = Duration::from_millis(50);
// Capacity of the access-time cache used for LRU-ordered candidate sorting.
const MAX_ACCESS_ENTRIES: usize = 1_000_000;
// Upper bound on how long `Drop` waits for the worker to acknowledge shutdown.
const DROP_TIMEOUT: Duration = Duration::from_secs(5);
/// Tunable policy controlling when and how hot versions migrate to cold storage.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "config-toml", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "config-toml", serde(default))]
pub struct MigrationPolicy {
    /// Minimum age a version must reach before it becomes a migration candidate.
    pub age_threshold: Duration,
    /// Memory usage (bytes) above which migration is triggered regardless of age.
    pub memory_threshold_bytes: usize,
    /// Number of most-recent versions per entity that always stay hot.
    pub min_hot_versions: usize,
    /// Maximum versions written to cold storage in one batch.
    pub batch_size: usize,
    /// Interval between background worker cycles.
    pub run_interval: Duration,
    /// Master switch; when false, migration is a no-op.
    pub enabled: bool,
    /// When true, least-recently-used versions are migrated first.
    pub enable_lru: bool,
}

impl Default for MigrationPolicy {
    /// One-week age threshold, 1 GiB memory budget, age-ordered migration.
    fn default() -> Self {
        Self {
            age_threshold: Duration::from_secs(604_800), // 7 days
            memory_threshold_bytes: 1024 * 1024 * 1024,  // 1 GiB
            min_hot_versions: 1,
            batch_size: 1000,
            run_interval: Duration::from_secs(60),
            enabled: true,
            enable_lru: false,
        }
    }
}

impl MigrationPolicy {
    /// Entry point for fluent construction of a custom policy.
    pub fn builder() -> MigrationPolicyBuilder {
        MigrationPolicyBuilder::default()
    }

    /// Preset that migrates early and often: 1-day threshold, 512 MiB budget,
    /// larger batches, LRU ordering.
    pub fn aggressive() -> Self {
        Self {
            age_threshold: Duration::from_secs(86_400), // 1 day
            memory_threshold_bytes: 512 * 1024 * 1024,
            batch_size: 2000,
            run_interval: Duration::from_secs(30),
            enable_lru: true,
            ..Self::default()
        }
    }

    /// Preset that keeps data hot longer: 30-day threshold, 4 GiB budget,
    /// five hot versions retained per entity.
    pub fn conservative() -> Self {
        Self {
            age_threshold: Duration::from_secs(30 * 24 * 60 * 60),
            memory_threshold_bytes: 4 * 1024 * 1024 * 1024,
            min_hot_versions: 5,
            batch_size: 500,
            run_interval: Duration::from_secs(300),
            ..Self::default()
        }
    }

    /// Default policy with migration switched off entirely.
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            ..Self::default()
        }
    }
}

/// Fluent builder for [`MigrationPolicy`], seeded with the default policy.
#[derive(Debug, Default)]
pub struct MigrationPolicyBuilder {
    policy: MigrationPolicy,
}

impl MigrationPolicyBuilder {
    /// Creates a builder starting from [`MigrationPolicy::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the minimum age before a version may migrate.
    pub fn age_threshold(mut self, duration: Duration) -> Self {
        self.policy.age_threshold = duration;
        self
    }

    /// Sets the memory-pressure trigger threshold in bytes.
    pub fn memory_threshold_bytes(mut self, bytes: usize) -> Self {
        self.policy.memory_threshold_bytes = bytes;
        self
    }

    /// Sets how many recent versions per entity are never migrated.
    pub fn min_hot_versions(mut self, count: usize) -> Self {
        self.policy.min_hot_versions = count;
        self
    }

    /// Sets the per-write batch size.
    pub fn batch_size(mut self, size: usize) -> Self {
        self.policy.batch_size = size;
        self
    }

    /// Sets the background worker cycle interval.
    pub fn run_interval(mut self, interval: Duration) -> Self {
        self.policy.run_interval = interval;
        self
    }

    /// Enables or disables migration as a whole.
    pub fn enabled(mut self, enabled: bool) -> Self {
        self.policy.enabled = enabled;
        self
    }

    /// Enables or disables LRU-ordered candidate selection.
    pub fn enable_lru_migration(mut self, enable: bool) -> Self {
        self.policy.enable_lru = enable;
        self
    }

    /// Finishes the build and returns the configured policy.
    pub fn build(self) -> MigrationPolicy {
        self.policy
    }
}
/// A single version proposed for migration from hot memory to cold storage.
#[derive(Debug, Clone)]
pub struct MigrationCandidate {
    /// Identifier of the candidate version.
    pub version_id: VersionId,
    /// True for a node version, false for an edge version.
    pub is_node: bool,
    /// Age of the version at identification time, derived from its
    /// transaction-time start versus the current wall clock.
    pub age: Duration,
    /// Approximate in-memory size in bytes, used for accounting.
    pub estimated_size: usize,
}
/// Point-in-time snapshot of a migration run, delivered through
/// `MigrationCallback::on_progress`.
#[derive(Debug, Clone, Default)]
pub struct MigrationProgress {
    /// Total number of versions scheduled for this run.
    pub total_versions: usize,
    /// Versions migrated so far.
    pub migrated_versions: usize,
    /// Bytes accounted for the versions migrated so far.
    pub bytes_migrated: u64,
    /// 1-based index of the most recently completed batch.
    pub current_batch: usize,
    /// Expected number of batches for the run.
    pub total_batches: usize,
    /// Wall-clock time elapsed since the run started.
    pub elapsed: Duration,
}

impl MigrationProgress {
    /// Completion percentage in `[0, 100]`; an empty run counts as 100%.
    pub fn percentage(&self) -> f64 {
        match self.total_versions {
            0 => 100.0,
            total => (self.migrated_versions as f64 / total as f64) * 100.0,
        }
    }

    /// True once every scheduled version has been migrated.
    pub fn is_complete(&self) -> bool {
        self.migrated_versions >= self.total_versions
    }

    /// Throughput in versions per second (0 when no time has elapsed).
    pub fn versions_per_second(&self) -> f64 {
        Self::rate(self.migrated_versions as f64, self.elapsed)
    }

    /// Throughput in bytes per second (0 when no time has elapsed).
    pub fn bytes_per_second(&self) -> f64 {
        Self::rate(self.bytes_migrated as f64, self.elapsed)
    }

    /// Rough remaining time, extrapolated from current throughput; zero
    /// whenever the rate is unknown.
    pub fn estimated_remaining(&self) -> Duration {
        let vps = self.versions_per_second();
        if vps <= 0.0 {
            return Duration::ZERO;
        }
        let remaining = self.total_versions.saturating_sub(self.migrated_versions);
        Duration::from_secs_f64(remaining as f64 / vps)
    }

    // Shared helper: count divided by elapsed seconds, 0 for a zero interval.
    fn rate(count: f64, elapsed: Duration) -> f64 {
        let secs = elapsed.as_secs_f64();
        if secs > 0.0 { count / secs } else { 0.0 }
    }
}
/// Cumulative counters describing migration activity.
#[derive(Debug, Clone, Default)]
pub struct MigrationStats {
    pub node_versions_migrated: u64,
    pub edge_versions_migrated: u64,
    pub bytes_migrated: u64,
    pub runs_completed: u64,
    pub errors: u64,
    /// Duration of the most recent run (zero in snapshots that do not track it).
    pub last_run_duration: Duration,
    /// Completion time of the most recent run, if recorded.
    pub last_run_time: Option<Instant>,
}

impl MigrationStats {
    /// Combined node+edge throughput of the last run, in versions per second;
    /// 0 when no run duration has been recorded.
    pub fn versions_per_second(&self) -> f64 {
        let total = self.node_versions_migrated + self.edge_versions_migrated;
        let secs = self.last_run_duration.as_secs_f64();
        if secs <= 0.0 {
            return 0.0;
        }
        total as f64 / secs
    }
}
/// Outcome of a single `MigrationService::migrate_batch_with_lsn` call.
#[derive(Debug, Clone)]
pub struct MigrationWithLsnResult {
    /// Node versions written to cold storage.
    pub nodes_migrated: usize,
    /// Edge versions written to cold storage.
    pub edges_migrated: usize,
    /// WAL segments truncated after the write (0 without a flush coordinator).
    pub segments_truncated: usize,
    /// Durable LSN reported by cold storage after the write, if available.
    pub flushed_lsn: Option<LSN>,
}

impl MigrationWithLsnResult {
    /// True when at least one node or edge version was migrated.
    pub fn has_migrations(&self) -> bool {
        !(self.nodes_migrated == 0 && self.edges_migrated == 0)
    }

    /// Combined count of migrated node and edge versions.
    pub fn total_migrated(&self) -> usize {
        self.nodes_migrated + self.edges_migrated
    }
}
/// Lock-free counterpart of [`MigrationStats`], safe to update from
/// concurrent migration operations.
#[derive(Debug, Default)]
pub struct AtomicMigrationStats {
    node_versions_migrated: AtomicU64,
    edge_versions_migrated: AtomicU64,
    bytes_migrated: AtomicU64,
    runs_completed: AtomicU64,
    errors: AtomicU64,
}

impl AtomicMigrationStats {
    /// Creates zeroed counters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Copies the counters into a plain [`MigrationStats`]. Timing fields are
    /// not tracked atomically, so they come back as zero / `None`.
    pub fn snapshot(&self) -> MigrationStats {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        MigrationStats {
            node_versions_migrated: read(&self.node_versions_migrated),
            edge_versions_migrated: read(&self.edge_versions_migrated),
            bytes_migrated: read(&self.bytes_migrated),
            runs_completed: read(&self.runs_completed),
            errors: read(&self.errors),
            last_run_duration: Duration::ZERO,
            last_run_time: None,
        }
    }
}
/// Hooks invoked during migration. All methods have no-op defaults, so
/// implementors override only what they need.
pub trait MigrationCallback: Send + Sync {
    /// Returning `false` excludes this node version from migration.
    fn before_node_migration(&self, _version: &NodeVersion) -> bool {
        true
    }
    /// Returning `false` excludes this edge version from migration.
    fn before_edge_migration(&self, _version: &EdgeVersion) -> bool {
        true
    }
    /// Called after each batch is persisted, with the node/edge counts written.
    fn after_batch(&self, _node_count: usize, _edge_count: usize) {}
    /// Hook for reporting a migration error message.
    fn on_error(&self, _error: &str) {}
    /// Called with a progress snapshot after each batch and at run completion.
    fn on_progress(&self, _progress: &MigrationProgress) {}
}
/// No-op callback used when the caller supplies none; all trait defaults
/// apply, so every version is accepted for migration.
pub struct DefaultMigrationCallback;
impl MigrationCallback for DefaultMigrationCallback {}
/// Moves old versions from hot in-memory storage into redb-backed cold
/// storage, driven either on demand via the `migrate_*` methods or with a
/// background worker thread started by `start()`.
pub struct MigrationService {
    // Destination for migrated versions.
    cold_storage: Arc<RedbColdStorage>,
    // Thresholds and knobs controlling candidate selection and batching.
    policy: MigrationPolicy,
    // Lock-free migration counters.
    stats: Arc<AtomicMigrationStats>,
    // True while the background worker should keep running.
    running: Arc<AtomicBool>,
    // User hooks consulted before/after each migration step.
    callback: Arc<dyn MigrationCallback>,
    // Bounded cache of last-access times, consulted when `enable_lru` is set.
    access_times: Arc<Cache<VersionId, Instant>>,
    // Join handle of the background worker, if one was started.
    worker_handle: Mutex<Option<JoinHandle<()>>>,
    // (shutdown_done, generation) guarded by a mutex + condvar; workers signal
    // completion here so stop()/Drop can wait for the right generation.
    shutdown_complete: Arc<(Mutex<(bool, u64)>, Condvar)>,
    // Monotonic start generation; guards against stale shutdown signals when
    // the service is started and stopped repeatedly.
    generation: Arc<AtomicU64>,
    // Optional WAL flush coordinator used to truncate segments after flushes.
    flush_coordinator: Option<Arc<FlushCoordinator>>,
}
impl MigrationService {
    /// Creates a service with the default (accept-everything) callback.
    pub fn new(cold_storage: Arc<RedbColdStorage>, policy: MigrationPolicy) -> Self {
        // Delegate so the field wiring lives in exactly one place.
        Self::with_callback(cold_storage, policy, Arc::new(DefaultMigrationCallback))
    }

    /// Creates a service that reports through the supplied callback.
    pub fn with_callback(
        cold_storage: Arc<RedbColdStorage>,
        policy: MigrationPolicy,
        callback: Arc<dyn MigrationCallback>,
    ) -> Self {
        Self {
            cold_storage,
            policy,
            stats: Arc::new(AtomicMigrationStats::new()),
            running: Arc::new(AtomicBool::new(false)),
            callback,
            access_times: Arc::new(Cache::new(MAX_ACCESS_ENTRIES)),
            worker_handle: Mutex::new(None),
            // (true, 0): no worker exists yet, so shutdown is trivially done.
            shutdown_complete: Arc::new((Mutex::new((true, 0)), Condvar::new())),
            generation: Arc::new(AtomicU64::new(0)),
            flush_coordinator: None,
        }
    }

    /// Attaches a WAL flush coordinator; `migrate_batch_with_lsn` will then
    /// truncate WAL segments after cold storage reports a durable LSN.
    pub fn set_flush_coordinator(&mut self, coordinator: Arc<FlushCoordinator>) {
        self.flush_coordinator = Some(coordinator);
    }

    /// Returns the attached flush coordinator, if any.
    pub fn flush_coordinator(&self) -> Option<&Arc<FlushCoordinator>> {
        self.flush_coordinator.as_ref()
    }

    /// Starts the background worker thread. Calling while already running is
    /// a no-op.
    pub fn start(&self) {
        // Only the caller that flips `running` false -> true spawns a worker.
        if self
            .running
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
            .is_err()
        {
            return;
        }
        // Bump the generation so shutdown signals from older workers are ignored.
        let current_gen = self.generation.fetch_add(1, Ordering::SeqCst) + 1;
        {
            let (lock, _) = &*self.shutdown_complete;
            let mut state = lock
                .lock()
                .expect("MigrationService shutdown lock poisoned in start()");
            // Shutdown is now pending for this generation.
            *state = (false, current_gen);
        }
        let running = self.running.clone();
        let policy = self.policy.clone();
        let shutdown_complete = self.shutdown_complete.clone();
        let handle = thread::spawn(move || {
            Self::worker_loop(running, policy, shutdown_complete, current_gen);
        });
        let mut handle_guard = self
            .worker_handle
            .lock()
            .expect("MigrationService worker_handle lock poisoned in start()");
        *handle_guard = Some(handle);
    }

    /// Worker entry point: runs the sleep loop, then signals shutdown
    /// completion for its own generation, re-raising any panic afterwards.
    fn worker_loop(
        running: Arc<AtomicBool>,
        policy: MigrationPolicy,
        shutdown_complete: Arc<(Mutex<(bool, u64)>, Condvar)>,
        generation: u64,
    ) {
        // Catch panics so shutdown is still signalled if the loop dies;
        // the payload is re-raised below after waiters are released.
        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            Self::worker_loop_inner(&running, &policy);
        }));
        let (lock, cvar) = &*shutdown_complete;
        let mut state = lock
            .lock()
            .expect("MigrationService shutdown lock poisoned in worker_loop");
        // Only acknowledge if no newer start() has superseded this worker.
        if state.1 == generation {
            state.0 = true;
            cvar.notify_all();
        }
        if let Err(panic_payload) = result {
            running.store(false, Ordering::SeqCst);
            panic::resume_unwind(panic_payload);
        }
    }

    /// Interruptible sleep loop: waits `run_interval` per cycle, polling the
    /// `running` flag every `SHUTDOWN_CHECK_INTERVAL` so shutdown is prompt.
    /// NOTE(review): no migration work is performed here — migrations are
    /// currently driven externally through the `migrate_*` methods.
    fn worker_loop_inner(running: &AtomicBool, policy: &MigrationPolicy) {
        while running.load(Ordering::SeqCst) {
            let mut remaining = policy.run_interval;
            while remaining > Duration::ZERO && running.load(Ordering::SeqCst) {
                let sleep_time = remaining.min(SHUTDOWN_CHECK_INTERVAL);
                thread::sleep(sleep_time);
                remaining = remaining.saturating_sub(sleep_time);
            }
            if !running.load(Ordering::SeqCst) {
                break;
            }
        }
    }

    /// Stops the background worker and blocks until it has exited.
    /// Calling while not running is a no-op.
    pub fn stop(&self) {
        if !self.running.swap(false, Ordering::SeqCst) {
            return;
        }
        let current_gen = self.generation.load(Ordering::SeqCst);
        let (lock, cvar) = &*self.shutdown_complete;
        let mut state = lock
            .lock()
            .expect("MigrationService shutdown lock poisoned in stop()");
        // Wait for this generation's worker to acknowledge; a higher
        // generation means a newer start() superseded us, so stop waiting.
        while !state.0 || state.1 != current_gen {
            if state.1 > current_gen {
                break;
            }
            state = cvar
                .wait(state)
                .expect("MigrationService shutdown condvar wait failed");
        }
        let mut handle_guard = self
            .worker_handle
            .lock()
            .expect("MigrationService worker_handle lock poisoned in stop()");
        if let Some(handle) = handle_guard.take() {
            let _ = handle.join();
        }
    }

    /// Returns true when a migration run is warranted: the policy is enabled
    /// and either memory exceeds the threshold or any old versions exist.
    pub fn should_trigger_migration(
        &self,
        current_memory_bytes: usize,
        old_version_count: usize,
    ) -> bool {
        if !self.policy.enabled {
            return false;
        }
        if current_memory_bytes > self.policy.memory_threshold_bytes {
            return true;
        }
        if old_version_count > 0 {
            return true;
        }
        false
    }

    /// Records "now" as the last access time of a version (for LRU ordering).
    pub fn record_access(&self, version_id: VersionId) {
        self.access_times.insert(version_id, Instant::now());
    }

    /// Returns the last recorded access time of a version, if still cached.
    pub fn get_last_access(&self, version_id: VersionId) -> Option<Instant> {
        self.access_times.get(&version_id)
    }

    /// Drops access-time entries for versions that left hot storage.
    pub fn clear_access(&self, version_ids: &[VersionId]) {
        for id in version_ids {
            self.access_times.remove(id);
        }
    }

    /// Orders candidates by migration priority. With LRU enabled:
    /// never-accessed first, then least-recently-accessed; ties between two
    /// never-accessed entries fall back to oldest-first. Without LRU:
    /// strictly oldest-first.
    fn sort_candidates_by_policy(&self, candidates: &mut [MigrationCandidate]) {
        if self.policy.enable_lru {
            // Snapshot access times once so the comparator stays consistent
            // even if the cache changes while sorting.
            let candidate_access_times: HashMap<VersionId, Option<Instant>> = candidates
                .iter()
                .map(|c| (c.version_id, self.access_times.get(&c.version_id)))
                .collect();
            candidates.sort_by(|a, b| {
                let a_access = candidate_access_times.get(&a.version_id).and_then(|&t| t);
                let b_access = candidate_access_times.get(&b.version_id).and_then(|&t| t);
                match (a_access, b_access) {
                    (Some(a_time), Some(b_time)) => a_time.cmp(&b_time),
                    (None, Some(_)) => std::cmp::Ordering::Less,
                    (Some(_), None) => std::cmp::Ordering::Greater,
                    (None, None) => b.age.cmp(&a.age),
                }
            });
        } else {
            candidates.sort_by_key(|candidate| std::cmp::Reverse(candidate.age));
        }
    }

    /// Returns the active policy.
    pub fn policy(&self) -> &MigrationPolicy {
        &self.policy
    }

    /// Returns a snapshot of the cumulative migration counters.
    pub fn stats(&self) -> MigrationStats {
        self.stats.snapshot()
    }

    /// True while the background worker is (supposed to be) running.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Persists the given node versions to cold storage in policy-sized
    /// batches, reporting progress after every batch.
    ///
    /// Versions rejected by `before_node_migration` are skipped. Returns the
    /// number of versions actually written; `Ok(0)` when the policy is
    /// disabled.
    pub fn migrate_node_versions(&self, versions: &[NodeVersion]) -> Result<usize> {
        if !self.policy.enabled {
            return Ok(0);
        }
        let start_time = Instant::now();
        let total_versions = versions.len();
        // Upper bound on batches; callback filtering can reduce the actual
        // count, so the final progress report uses the real batch count.
        let total_batches = total_versions.div_ceil(self.policy.batch_size);
        let mut migrated = 0;
        let mut batch = Vec::with_capacity(self.policy.batch_size.min(versions.len()));
        let mut total_bytes = 0u64;
        let mut current_batch = 0;
        for version in versions {
            if !self.callback.before_node_migration(version) {
                continue;
            }
            total_bytes += version.estimated_size() as u64;
            batch.push(version.clone());
            if batch.len() >= self.policy.batch_size {
                self.cold_storage.store_node_versions_batch(&batch)?;
                migrated += batch.len();
                current_batch += 1;
                self.callback.after_batch(batch.len(), 0);
                let progress = MigrationProgress {
                    total_versions,
                    migrated_versions: migrated,
                    bytes_migrated: total_bytes,
                    current_batch,
                    total_batches,
                    elapsed: start_time.elapsed(),
                };
                self.callback.on_progress(&progress);
                batch.clear();
            }
        }
        // Flush the final partial batch, if any.
        if !batch.is_empty() {
            self.cold_storage.store_node_versions_batch(&batch)?;
            migrated += batch.len();
            current_batch += 1;
            self.callback.after_batch(batch.len(), 0);
            let progress = MigrationProgress {
                total_versions,
                migrated_versions: migrated,
                bytes_migrated: total_bytes,
                current_batch,
                // Report the actual number of batches executed.
                total_batches: current_batch,
                elapsed: start_time.elapsed(),
            };
            self.callback.on_progress(&progress);
        }
        self.stats
            .node_versions_migrated
            .fetch_add(migrated as u64, Ordering::Relaxed);
        self.stats
            .bytes_migrated
            .fetch_add(total_bytes, Ordering::Relaxed);
        Ok(migrated)
    }

    /// Persists the given edge versions to cold storage in policy-sized
    /// batches, reporting progress after every batch.
    ///
    /// Versions rejected by `before_edge_migration` are skipped. Returns the
    /// number of versions actually written; `Ok(0)` when the policy is
    /// disabled.
    pub fn migrate_edge_versions(&self, versions: &[EdgeVersion]) -> Result<usize> {
        if !self.policy.enabled {
            return Ok(0);
        }
        let start_time = Instant::now();
        let total_versions = versions.len();
        // Upper bound; see migrate_node_versions for why the final report
        // uses the actual batch count instead.
        let total_batches = total_versions.div_ceil(self.policy.batch_size);
        let mut migrated = 0;
        let mut batch = Vec::with_capacity(self.policy.batch_size.min(versions.len()));
        let mut total_bytes = 0u64;
        let mut current_batch = 0;
        for version in versions {
            if !self.callback.before_edge_migration(version) {
                continue;
            }
            total_bytes += version.estimated_size() as u64;
            batch.push(version.clone());
            if batch.len() >= self.policy.batch_size {
                self.cold_storage.store_edge_versions_batch(&batch)?;
                migrated += batch.len();
                current_batch += 1;
                self.callback.after_batch(0, batch.len());
                let progress = MigrationProgress {
                    total_versions,
                    migrated_versions: migrated,
                    bytes_migrated: total_bytes,
                    current_batch,
                    total_batches,
                    elapsed: start_time.elapsed(),
                };
                self.callback.on_progress(&progress);
                batch.clear();
            }
        }
        // Flush the final partial batch, if any.
        if !batch.is_empty() {
            self.cold_storage.store_edge_versions_batch(&batch)?;
            migrated += batch.len();
            current_batch += 1;
            self.callback.after_batch(0, batch.len());
            let progress = MigrationProgress {
                total_versions,
                migrated_versions: migrated,
                bytes_migrated: total_bytes,
                current_batch,
                total_batches: current_batch,
                elapsed: start_time.elapsed(),
            };
            self.callback.on_progress(&progress);
        }
        self.stats
            .edge_versions_migrated
            .fetch_add(migrated as u64, Ordering::Relaxed);
        self.stats
            .bytes_migrated
            .fetch_add(total_bytes, Ordering::Relaxed);
        Ok(migrated)
    }

    /// Migrates nodes and edges in a single cold-storage write tagged with
    /// `lsn`, then — if a flush coordinator is attached and cold storage
    /// reports a durable LSN — truncates WAL segments up to that LSN.
    ///
    /// Versions rejected by the callbacks are skipped. Returns counts of
    /// migrated versions, truncated segments, and the reported flushed LSN.
    pub fn migrate_batch_with_lsn(
        &self,
        nodes: &[NodeVersion],
        edges: &[EdgeVersion],
        lsn: LSN,
    ) -> Result<MigrationWithLsnResult> {
        if !self.policy.enabled {
            return Ok(MigrationWithLsnResult {
                nodes_migrated: 0,
                edges_migrated: 0,
                segments_truncated: 0,
                flushed_lsn: None,
            });
        }
        let start_time = Instant::now();
        // Let the callback veto individual versions before the atomic write.
        let filtered_nodes: Vec<_> = nodes
            .iter()
            .filter(|v| self.callback.before_node_migration(v))
            .cloned()
            .collect();
        let filtered_edges: Vec<_> = edges
            .iter()
            .filter(|v| self.callback.before_edge_migration(v))
            .cloned()
            .collect();
        let node_bytes: u64 = filtered_nodes
            .iter()
            .map(|v| v.estimated_size() as u64)
            .sum();
        let edge_bytes: u64 = filtered_edges
            .iter()
            .map(|v| v.estimated_size() as u64)
            .sum();
        let total_bytes = node_bytes + edge_bytes;
        self.cold_storage
            .store_batch_with_lsn(&filtered_nodes, &filtered_edges, lsn)?;
        let nodes_migrated = filtered_nodes.len();
        let edges_migrated = filtered_edges.len();
        self.callback.after_batch(nodes_migrated, edges_migrated);
        let flushed_lsn_value = self.cold_storage.get_flushed_lsn()?;
        // Once the data is durably in cold storage, WAL segments at or below
        // the flushed LSN are no longer needed for recovery.
        let segments_truncated = if let Some(coordinator) = &self.flush_coordinator {
            if let Some(flushed_lsn) = flushed_lsn_value {
                debug_assert!(
                    flushed_lsn.0 >= lsn.0,
                    "Flushed LSN ({}) should be >= stored LSN ({})",
                    flushed_lsn.0,
                    lsn.0
                );
                coordinator.truncate_to_lsn(flushed_lsn)?
            } else {
                0
            }
        } else {
            0
        };
        self.stats
            .node_versions_migrated
            .fetch_add(nodes_migrated as u64, Ordering::Relaxed);
        self.stats
            .edge_versions_migrated
            .fetch_add(edges_migrated as u64, Ordering::Relaxed);
        self.stats
            .bytes_migrated
            .fetch_add(total_bytes, Ordering::Relaxed);
        // total_versions counts the pre-filter input, so callback-skipped
        // versions surface as "incomplete" in the progress snapshot.
        let progress = MigrationProgress {
            total_versions: nodes.len() + edges.len(),
            migrated_versions: nodes_migrated + edges_migrated,
            bytes_migrated: total_bytes,
            current_batch: 1,
            total_batches: 1,
            elapsed: start_time.elapsed(),
        };
        self.callback.on_progress(&progress);
        Ok(MigrationWithLsnResult {
            nodes_migrated,
            edges_migrated,
            segments_truncated,
            flushed_lsn: flushed_lsn_value,
        })
    }

    /// Selects node versions eligible for migration: non-head versions whose
    /// transaction-time age meets `age_threshold`, sorted by policy priority,
    /// keeping at least `min_hot_versions` per node in hot storage.
    pub fn identify_node_candidates(
        &self,
        versions: &FastHashMap<VersionId, NodeVersion>,
        head_versions: &FastHashMap<NodeId, VersionId>,
        version_counts: &FastHashMap<NodeId, usize>,
        _current_time: Instant,
    ) -> Vec<MigrationCandidate> {
        let mut candidates_per_node: FastHashMap<NodeId, usize> = FastHashMap::default();
        let mut all_candidates = Vec::with_capacity(versions.len());
        // Age is computed against wall-clock transaction time, not `Instant`.
        let current_wallclock_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        for (version_id, version) in versions {
            // Never migrate the current head version of a node.
            if let Some(&head_id) = head_versions.get(&version.node_id)
                && *version_id == head_id
            {
                continue;
            }
            let tx_start_ms = version.temporal.transaction_time().start().wallclock();
            // Clamp to zero in case of clock skew (future-dated versions).
            let age_ms = (current_wallclock_ms - tx_start_ms).max(0) as u64;
            let age = Duration::from_millis(age_ms);
            if age >= self.policy.age_threshold {
                all_candidates.push(MigrationCandidate {
                    version_id: *version_id,
                    is_node: true,
                    age,
                    estimated_size: version.estimated_size(),
                });
            }
        }
        self.sort_candidates_by_policy(&mut all_candidates);
        // Enforce min_hot_versions per node, taking candidates in priority order.
        let mut final_candidates = Vec::with_capacity(all_candidates.len());
        for candidate in all_candidates {
            let node_id = versions.get(&candidate.version_id).map(|v| v.node_id);
            if let Some(node_id) = node_id {
                let count = version_counts.get(&node_id).copied().unwrap_or(0);
                let already_selected = candidates_per_node.get(&node_id).copied().unwrap_or(0);
                let max_migrate = count.saturating_sub(self.policy.min_hot_versions);
                if already_selected < max_migrate {
                    *candidates_per_node.entry(node_id).or_insert(0) += 1;
                    final_candidates.push(candidate);
                }
            }
        }
        final_candidates
    }

    /// Selects edge versions eligible for migration; mirrors
    /// [`Self::identify_node_candidates`] for edges.
    pub fn identify_edge_candidates(
        &self,
        versions: &FastHashMap<VersionId, EdgeVersion>,
        head_versions: &FastHashMap<EdgeId, VersionId>,
        version_counts: &FastHashMap<EdgeId, usize>,
        _current_time: Instant,
    ) -> Vec<MigrationCandidate> {
        let mut candidates_per_edge: FastHashMap<EdgeId, usize> = FastHashMap::default();
        let mut all_candidates = Vec::with_capacity(versions.len());
        // Age is computed against wall-clock transaction time, not `Instant`.
        let current_wallclock_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        for (version_id, version) in versions {
            // Never migrate the current head version of an edge.
            if let Some(&head_id) = head_versions.get(&version.edge_id)
                && *version_id == head_id
            {
                continue;
            }
            let tx_start_ms = version.temporal.transaction_time().start().wallclock();
            // Clamp to zero in case of clock skew (future-dated versions).
            let age_ms = (current_wallclock_ms - tx_start_ms).max(0) as u64;
            let age = Duration::from_millis(age_ms);
            if age >= self.policy.age_threshold {
                all_candidates.push(MigrationCandidate {
                    version_id: *version_id,
                    is_node: false,
                    age,
                    estimated_size: version.estimated_size(),
                });
            }
        }
        self.sort_candidates_by_policy(&mut all_candidates);
        // Enforce min_hot_versions per edge, taking candidates in priority order.
        let mut final_candidates = Vec::with_capacity(all_candidates.len());
        for candidate in all_candidates {
            let edge_id = versions.get(&candidate.version_id).map(|v| v.edge_id);
            if let Some(edge_id) = edge_id {
                let count = version_counts.get(&edge_id).copied().unwrap_or(0);
                let already_selected = candidates_per_edge.get(&edge_id).copied().unwrap_or(0);
                let max_migrate = count.saturating_sub(self.policy.min_hot_versions);
                if already_selected < max_migrate {
                    *candidates_per_edge.entry(edge_id).or_insert(0) += 1;
                    final_candidates.push(candidate);
                }
            }
        }
        final_candidates
    }
}
impl Drop for MigrationService {
    /// Best-effort shutdown: signals the worker to stop, waits up to
    /// `DROP_TIMEOUT` for it to acknowledge, then joins the thread.
    /// Unlike `stop()`, this never blocks indefinitely on the condvar.
    fn drop(&mut self) {
        // If the worker was never started (or already stopped), nothing to do.
        if !self.running.swap(false, Ordering::SeqCst) {
            return;
        }
        let current_gen = self.generation.load(Ordering::SeqCst);
        let (lock, cvar) = &*self.shutdown_complete;
        // A poisoned lock is ignored here — in Drop we prefer proceeding to
        // join over panicking.
        if let Ok(mut state) = lock.lock() {
            let deadline = Instant::now() + DROP_TIMEOUT;
            // Wait for this generation's worker to acknowledge; a higher
            // generation means a newer start() superseded us.
            while !state.0 || state.1 != current_gen {
                if state.1 > current_gen {
                    break;
                }
                let now = Instant::now();
                if now >= deadline {
                    break;
                }
                let timeout = deadline - now;
                let result = cvar.wait_timeout(state, timeout);
                match result {
                    Ok((new_state, timeout_result)) => {
                        state = new_state;
                        if timeout_result.timed_out() {
                            break;
                        }
                    }
                    Err(_) => {
                        // Condvar/lock poisoned; give up waiting and join below.
                        break;
                    }
                }
            }
        }
        // Join regardless of how the wait ended; with `running` cleared the
        // worker's sleep loop exits on its next shutdown check.
        if let Ok(mut handle_guard) = self.worker_handle.lock()
            && let Some(handle) = handle_guard.take()
        {
            let _ = handle.join();
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::id::{EdgeId, NodeId};
use crate::core::interning::GLOBAL_INTERNER;
use crate::core::property::PropertyMapBuilder;
use crate::core::temporal::BiTemporalInterval;
use crate::core::version::EdgeVersion;
use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
use std::sync::atomic::AtomicUsize;
use std::thread;
use tempfile::tempdir;
fn create_test_node_version(id: u64, node_id: u64) -> NodeVersion {
let properties = PropertyMapBuilder::new()
.insert("name", "Test")
.insert("age", 30i64)
.build();
NodeVersion::new_anchor(
VersionId::new(id).unwrap(),
NodeId::new(node_id).unwrap(),
BiTemporalInterval::current(1000.into()),
GLOBAL_INTERNER.intern("Person").unwrap(),
properties,
)
}
fn create_cold_storage() -> Arc<RedbColdStorage> {
let temp_dir = tempdir().unwrap();
let path = temp_dir.path().join("cold.redb");
std::mem::forget(temp_dir);
Arc::new(RedbColdStorage::new(path, RedbConfig::new()).unwrap())
}
#[test]
fn test_default_policy() {
let policy = MigrationPolicy::default();
assert_eq!(policy.age_threshold, Duration::from_secs(7 * 24 * 60 * 60));
assert_eq!(policy.memory_threshold_bytes, 1024 * 1024 * 1024);
assert_eq!(policy.min_hot_versions, 1);
assert_eq!(policy.batch_size, 1000);
assert!(policy.enabled);
}
#[test]
fn test_aggressive_policy() {
let policy = MigrationPolicy::aggressive();
assert_eq!(policy.age_threshold, Duration::from_secs(24 * 60 * 60));
assert_eq!(policy.memory_threshold_bytes, 512 * 1024 * 1024);
assert_eq!(policy.batch_size, 2000);
}
#[test]
fn test_conservative_policy() {
let policy = MigrationPolicy::conservative();
assert_eq!(policy.age_threshold, Duration::from_secs(30 * 24 * 60 * 60));
assert_eq!(policy.min_hot_versions, 5);
}
#[test]
fn test_disabled_policy() {
let policy = MigrationPolicy::disabled();
assert!(!policy.enabled);
}
#[test]
fn test_policy_builder() {
let policy = MigrationPolicy::builder()
.age_threshold(Duration::from_secs(86400))
.memory_threshold_bytes(2 * 1024 * 1024 * 1024)
.min_hot_versions(3)
.batch_size(500)
.run_interval(Duration::from_secs(120))
.enabled(true)
.build();
assert_eq!(policy.age_threshold, Duration::from_secs(86400));
assert_eq!(policy.memory_threshold_bytes, 2 * 1024 * 1024 * 1024);
assert_eq!(policy.min_hot_versions, 3);
assert_eq!(policy.batch_size, 500);
assert_eq!(policy.run_interval, Duration::from_secs(120));
}
#[test]
fn test_migration_service_creation() {
let cold = create_cold_storage();
let policy = MigrationPolicy::default();
let service = MigrationService::new(cold, policy);
assert!(!service.is_running());
assert_eq!(service.stats().node_versions_migrated, 0);
}
#[test]
fn test_migrate_node_versions() {
let cold = create_cold_storage();
let policy = MigrationPolicy::default();
let service = MigrationService::new(cold.clone(), policy);
let versions: Vec<NodeVersion> =
(1..=10).map(|i| create_test_node_version(i, 100)).collect();
let migrated = service.migrate_node_versions(&versions).unwrap();
assert_eq!(migrated, 10);
let stats = service.stats();
assert_eq!(stats.node_versions_migrated, 10);
for version in &versions {
assert!(cold.contains_node_version(version.id).unwrap());
}
}
#[test]
fn test_migrate_disabled() {
let cold = create_cold_storage();
let policy = MigrationPolicy::disabled();
let service = MigrationService::new(cold.clone(), policy);
let versions: Vec<NodeVersion> =
(1..=10).map(|i| create_test_node_version(i, 100)).collect();
let migrated = service.migrate_node_versions(&versions).unwrap();
assert_eq!(migrated, 0);
for version in &versions {
assert!(!cold.contains_node_version(version.id).unwrap());
}
}
#[test]
fn test_migration_batching() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder().batch_size(3).build();
let service = MigrationService::new(cold.clone(), policy);
let versions: Vec<NodeVersion> =
(1..=10).map(|i| create_test_node_version(i, 100)).collect();
let migrated = service.migrate_node_versions(&versions).unwrap();
assert_eq!(migrated, 10);
for version in &versions {
assert!(cold.contains_node_version(version.id).unwrap());
}
}
#[test]
fn test_identify_candidates_respects_min_hot_versions() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.min_hot_versions(2)
.age_threshold(Duration::ZERO) .build();
let service = MigrationService::new(cold, policy);
let mut versions = FastHashMap::default();
let mut heads = FastHashMap::default();
let mut counts = FastHashMap::default();
let node_id = NodeId::new(100).unwrap();
for i in 1..=3 {
let v = create_test_node_version(i, 100);
versions.insert(v.id, v);
}
heads.insert(node_id, VersionId::new(3).unwrap()); counts.insert(node_id, 3);
let candidates =
service.identify_node_candidates(&versions, &heads, &counts, Instant::now());
assert_eq!(candidates.len(), 1);
}
#[test]
fn test_identify_candidates_skips_head() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.min_hot_versions(1)
.age_threshold(Duration::ZERO)
.build();
let service = MigrationService::new(cold, policy);
let mut versions = FastHashMap::default();
let mut heads = FastHashMap::default();
let mut counts = FastHashMap::default();
let node_id = NodeId::new(100).unwrap();
for i in 1..=3 {
let v = create_test_node_version(i, 100);
versions.insert(v.id, v);
}
heads.insert(node_id, VersionId::new(3).unwrap());
counts.insert(node_id, 3);
let candidates =
service.identify_node_candidates(&versions, &heads, &counts, Instant::now());
assert!(!candidates.iter().any(|c| c.version_id.as_u64() == 3));
}
#[test]
fn test_migration_stats_throughput() {
let stats = MigrationStats {
node_versions_migrated: 1000,
edge_versions_migrated: 500,
bytes_migrated: 1_000_000,
runs_completed: 10,
errors: 0,
last_run_duration: Duration::from_secs(10),
last_run_time: Some(Instant::now()),
};
assert!((stats.versions_per_second() - 150.0).abs() < 0.1);
}
struct FilteringCallback {
skip_version_ids: Vec<u64>,
}
impl MigrationCallback for FilteringCallback {
fn before_node_migration(&self, version: &NodeVersion) -> bool {
!self.skip_version_ids.contains(&version.id.as_u64())
}
}
#[test]
fn test_migration_callback_filtering() {
let cold = create_cold_storage();
let policy = MigrationPolicy::default();
let callback = Arc::new(FilteringCallback {
skip_version_ids: vec![2, 4, 6, 8, 10],
});
let service = MigrationService::with_callback(cold.clone(), policy, callback);
let versions: Vec<NodeVersion> =
(1..=10).map(|i| create_test_node_version(i, 100)).collect();
let migrated = service.migrate_node_versions(&versions).unwrap();
assert_eq!(migrated, 5);
assert!(
cold.contains_node_version(VersionId::new(1).unwrap())
.unwrap()
);
assert!(
!cold
.contains_node_version(VersionId::new(2).unwrap())
.unwrap()
);
assert!(
cold.contains_node_version(VersionId::new(3).unwrap())
.unwrap()
);
}
#[test]
fn test_background_worker_starts_and_stops() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.run_interval(Duration::from_millis(50))
.enabled(true)
.build();
let service = Arc::new(MigrationService::new(cold, policy));
assert!(!service.is_running());
service.start();
assert!(service.is_running());
thread::sleep(Duration::from_millis(100));
service.stop();
assert!(!service.is_running());
}
#[test]
fn test_graceful_shutdown_waits_for_inflight() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.run_interval(Duration::from_millis(50))
.batch_size(10)
.enabled(true)
.build();
let batches_completed = Arc::new(AtomicUsize::new(0));
let batches_completed_clone = batches_completed.clone();
struct BatchTracker {
completed: Arc<AtomicUsize>,
}
impl MigrationCallback for BatchTracker {
fn after_batch(&self, node_count: usize, edge_count: usize) {
if node_count > 0 || edge_count > 0 {
self.completed.fetch_add(1, Ordering::SeqCst);
}
}
}
let callback = Arc::new(BatchTracker {
completed: batches_completed_clone,
});
let service = Arc::new(MigrationService::with_callback(cold, policy, callback));
service.start();
thread::sleep(Duration::from_millis(100));
let stop_start = Instant::now();
service.stop();
let stop_duration = stop_start.elapsed();
assert!(stop_duration < Duration::from_secs(5));
assert!(!service.is_running());
}
#[test]
fn test_multiple_start_stop_cycles() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.run_interval(Duration::from_millis(50))
.build();
let service = Arc::new(MigrationService::new(cold, policy));
for _ in 0..3 {
assert!(!service.is_running());
service.start();
assert!(service.is_running());
thread::sleep(Duration::from_millis(50));
service.stop();
assert!(!service.is_running());
}
}
#[test]
fn test_double_start_is_noop() {
let cold = create_cold_storage();
let policy = MigrationPolicy::default();
let service = Arc::new(MigrationService::new(cold, policy));
service.start();
assert!(service.is_running());
service.start();
assert!(service.is_running());
service.stop();
assert!(!service.is_running());
}
#[test]
fn test_double_stop_is_noop() {
let cold = create_cold_storage();
let policy = MigrationPolicy::default();
let service = Arc::new(MigrationService::new(cold, policy));
service.start();
service.stop();
assert!(!service.is_running());
service.stop();
assert!(!service.is_running());
}
#[test]
fn test_memory_pressure_trigger_enabled() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.memory_threshold_bytes(1000) .age_threshold(Duration::ZERO)
.min_hot_versions(1)
.build();
let service = MigrationService::new(cold, policy);
assert!(service.should_trigger_migration(2000, 0)); assert!(!service.should_trigger_migration(500, 0)); }
#[test]
fn test_combined_triggers() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.memory_threshold_bytes(1000)
.age_threshold(Duration::from_secs(3600))
.build();
let service = MigrationService::new(cold, policy);
assert!(service.should_trigger_migration(2000, 0)); assert!(service.should_trigger_migration(500, 10)); assert!(!service.should_trigger_migration(500, 0)); }
#[test]
fn test_access_tracking_records_access() {
let cold = create_cold_storage();
let policy = MigrationPolicy::default();
let service = MigrationService::new(cold, policy);
let version_id = VersionId::new(1).unwrap();
service.record_access(version_id);
let last_access = service.get_last_access(version_id);
assert!(last_access.is_some());
}
#[test]
// Verifies that access timestamps are recorded in order: a version touched
// earlier must report a strictly smaller last-access time than one touched
// later. The 10ms sleeps keep the recorded timestamps distinguishable.
fn test_lru_candidates_prioritized() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.age_threshold(Duration::ZERO) .min_hot_versions(1)
.build();
let service = MigrationService::new(cold, policy);
let v1 = VersionId::new(1).unwrap();
let v2 = VersionId::new(2).unwrap();
let v3 = VersionId::new(3).unwrap();
// Touch v1, v2, v3 in sequence with small gaps between accesses.
service.record_access(v1);
thread::sleep(Duration::from_millis(10));
service.record_access(v2);
thread::sleep(Duration::from_millis(10));
service.record_access(v3);
let v1_access = service.get_last_access(v1).unwrap();
let v3_access = service.get_last_access(v3).unwrap();
// v1 was accessed first, so its timestamp must be the older (smaller) one.
assert!(v1_access < v3_access);
}
#[test]
fn test_identify_candidates_with_lru() {
    // With LRU migration enabled, the least-recently-accessed candidate (v1)
    // must be ordered before a more recently accessed one (v3).
    let cold = create_cold_storage();
    let policy = MigrationPolicy::builder()
        .age_threshold(Duration::ZERO)
        .min_hot_versions(1)
        .enable_lru_migration(true)
        .build();
    let service = MigrationService::new(cold, policy);
    let mut versions = FastHashMap::default();
    let mut heads = FastHashMap::default();
    let mut counts = FastHashMap::default();
    let node1 = NodeId::new(100).unwrap();
    let node2 = NodeId::new(200).unwrap();
    let v1 = create_test_node_version(1, 100);
    let v2 = create_test_node_version(2, 100);
    let v3 = create_test_node_version(3, 200);
    let v4 = create_test_node_version(4, 200);
    versions.insert(v1.id, v1.clone());
    versions.insert(v2.id, v2.clone());
    versions.insert(v3.id, v3.clone());
    versions.insert(v4.id, v4.clone());
    // Versions 2 and 4 are heads and therefore never candidates.
    heads.insert(node1, VersionId::new(2).unwrap());
    heads.insert(node2, VersionId::new(4).unwrap());
    counts.insert(node1, 2);
    counts.insert(node2, 2);
    // v1 is accessed strictly before v3, making v1 the colder version.
    service.record_access(v1.id);
    thread::sleep(Duration::from_millis(10));
    service.record_access(v3.id);
    let candidates =
        service.identify_node_candidates(&versions, &heads, &counts, Instant::now());
    assert_eq!(candidates.len(), 2);
    // BUGFIX: these assertions were previously wrapped in `if candidates.len()
    // >= 2` / `if let` guards and could be skipped silently, making the test
    // vacuous. They are now unconditional so a missing candidate fails loudly.
    let v1_pos = candidates
        .iter()
        .position(|c| c.version_id.as_u64() == 1)
        .expect("v1 should be a migration candidate");
    let v3_pos = candidates
        .iter()
        .position(|c| c.version_id.as_u64() == 3)
        .expect("v3 should be a migration candidate");
    assert!(v1_pos < v3_pos, "LRU should prioritize v1 over v3");
}
#[test]
// A batch size of 5 over 12 versions forces multiple batches, so the callback
// should observe several progress updates, the last of which is complete.
fn test_progress_tracking_callback() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder().batch_size(5).build();
let progress_updates = Arc::new(std::sync::Mutex::new(Vec::new()));
let progress_clone = progress_updates.clone();
// Local callback that appends every progress snapshot it receives.
struct ProgressTracker {
updates: Arc<std::sync::Mutex<Vec<MigrationProgress>>>,
}
impl MigrationCallback for ProgressTracker {
fn on_progress(&self, progress: &MigrationProgress) {
self.updates.lock().unwrap().push(progress.clone());
}
}
let callback = Arc::new(ProgressTracker {
updates: progress_clone,
});
let service = MigrationService::with_callback(cold, policy, callback);
let versions: Vec<NodeVersion> =
(1..=12).map(|i| create_test_node_version(i, 100)).collect();
let migrated = service.migrate_node_versions(&versions).unwrap();
assert_eq!(migrated, 12);
let updates = progress_updates.lock().unwrap();
assert!(!updates.is_empty());
// The final update must report the full 12/12 and mark completion.
if let Some(final_progress) = updates.last() {
assert_eq!(final_progress.total_versions, 12);
assert_eq!(final_progress.migrated_versions, 12);
assert!(final_progress.is_complete());
}
}
#[test]
fn test_progress_percentage() {
    // Halfway through: 50 of 100 versions → 50% and not yet complete.
    let halfway = MigrationProgress {
        total_versions: 100,
        migrated_versions: 50,
        bytes_migrated: 1000,
        current_batch: 5,
        total_batches: 10,
        elapsed: Duration::from_secs(5),
    };
    assert!((halfway.percentage() - 50.0).abs() < 0.01);
    assert!(!halfway.is_complete());
    // Fully migrated: 100 of 100 → 100% and complete.
    let done = MigrationProgress {
        migrated_versions: 100,
        bytes_migrated: 2000,
        current_batch: 10,
        elapsed: Duration::from_secs(10),
        ..halfway
    };
    assert!((done.percentage() - 100.0).abs() < 0.01);
    assert!(done.is_complete());
}
#[test]
// 50 versions and 1,000,000 bytes over 5 seconds should report
// 10 versions/sec and 200,000 bytes/sec respectively.
fn test_progress_throughput() {
let progress = MigrationProgress {
total_versions: 100,
migrated_versions: 50,
bytes_migrated: 1_000_000,
current_batch: 5,
total_batches: 10,
elapsed: Duration::from_secs(5),
};
assert!((progress.versions_per_second() - 10.0).abs() < 0.01);
assert!((progress.bytes_per_second() - 200_000.0).abs() < 0.01);
}
#[test]
fn test_migration_run_stats_updated() {
    // A successful migration run must be reflected in the service statistics.
    let storage = create_cold_storage();
    let svc = MigrationService::new(storage, MigrationPolicy::default());
    let batch: Vec<NodeVersion> = (1..=50)
        .map(|i| create_test_node_version(i, 100))
        .collect();
    svc.migrate_node_versions(&batch).unwrap();
    let stats = svc.stats();
    assert_eq!(stats.node_versions_migrated, 50);
    assert!(stats.bytes_migrated > 0);
}
#[test]
fn test_service_handles_empty_migration() {
    // Migrating an empty slice is a no-op and leaves the counters at zero.
    let svc = MigrationService::new(create_cold_storage(), MigrationPolicy::default());
    assert_eq!(svc.migrate_node_versions(&[]).unwrap(), 0);
    assert_eq!(svc.stats().node_versions_migrated, 0);
}
/// Builds an anchor `EdgeVersion` with a fixed `KNOWS` label, endpoints 1→2,
/// a single `weight` property, and a current bitemporal interval at t=1000.
fn create_test_edge_version(id: u64, edge_id: u64) -> EdgeVersion {
    let props = PropertyMapBuilder::new().insert("weight", 1.5f64).build();
    let label = GLOBAL_INTERNER.intern("KNOWS").unwrap();
    EdgeVersion::new_anchor(
        VersionId::new(id).unwrap(),
        EdgeId::new(edge_id).unwrap(),
        BiTemporalInterval::current(1000.into()),
        label,
        NodeId::new(1).unwrap(),
        NodeId::new(2).unwrap(),
        props,
    )
}
/// Builds an anchor `EdgeVersion` like `create_test_edge_version`, but with
/// both temporal dimensions anchored at the supplied millisecond timestamp.
fn create_test_edge_version_with_timestamp(id: u64, edge_id: u64, ts_ms: i64) -> EdgeVersion {
    use crate::core::temporal::TimeRange;
    let props = PropertyMapBuilder::new().insert("weight", 1.5f64).build();
    let range = TimeRange::from(ts_ms.into());
    EdgeVersion::new_anchor(
        VersionId::new(id).unwrap(),
        EdgeId::new(edge_id).unwrap(),
        // Both intervals start from the same ts_ms-based range.
        BiTemporalInterval::new(range, range),
        GLOBAL_INTERNER.intern("KNOWS").unwrap(),
        NodeId::new(1).unwrap(),
        NodeId::new(2).unwrap(),
        props,
    )
}
#[test]
fn test_migrate_edge_versions() {
    // Edge versions are migrated, counted in stats, and persisted in cold storage.
    let cold = create_cold_storage();
    let service = MigrationService::new(cold.clone(), MigrationPolicy::default());
    let batch: Vec<EdgeVersion> = (1..=10)
        .map(|i| create_test_edge_version(i, 200))
        .collect();
    assert_eq!(service.migrate_edge_versions(&batch).unwrap(), 10);
    assert_eq!(service.stats().edge_versions_migrated, 10);
    for v in &batch {
        assert!(cold.contains_edge_version(v.id).unwrap());
    }
}
#[test]
fn test_migrate_edge_versions_disabled() {
    // With a disabled policy nothing is migrated or written to cold storage.
    let cold = create_cold_storage();
    let service = MigrationService::new(cold.clone(), MigrationPolicy::disabled());
    let batch: Vec<EdgeVersion> = (1..=10)
        .map(|i| create_test_edge_version(i, 200))
        .collect();
    assert_eq!(service.migrate_edge_versions(&batch).unwrap(), 0);
    for v in &batch {
        assert!(!cold.contains_edge_version(v.id).unwrap());
    }
}
#[test]
fn test_migrate_edge_versions_batching() {
    // A batch size of 3 over 10 versions forces partial final batches; all
    // versions must still end up in cold storage.
    let cold = create_cold_storage();
    let service = MigrationService::new(cold.clone(), MigrationPolicy::builder().batch_size(3).build());
    let batch: Vec<EdgeVersion> = (1..=10)
        .map(|i| create_test_edge_version(i, 200))
        .collect();
    assert_eq!(service.migrate_edge_versions(&batch).unwrap(), 10);
    for v in &batch {
        assert!(cold.contains_edge_version(v.id).unwrap());
    }
}
#[test]
// With min_hot_versions = 2, an edge with 3 versions must keep its 2 newest
// in hot storage, leaving exactly one version eligible for migration.
fn test_identify_edge_candidates_respects_min_hot_versions() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.min_hot_versions(2)
.age_threshold(Duration::ZERO) .build();
let service = MigrationService::new(cold, policy);
let mut versions = FastHashMap::default();
let mut heads = FastHashMap::default();
let mut counts = FastHashMap::default();
let edge_id = EdgeId::new(200).unwrap();
// Three versions for a single edge; version 3 is the head.
for i in 1..=3 {
let v = create_test_edge_version(i, 200);
versions.insert(v.id, v);
}
heads.insert(edge_id, VersionId::new(3).unwrap()); counts.insert(edge_id, 3);
let candidates =
service.identify_edge_candidates(&versions, &heads, &counts, Instant::now());
// 3 versions - 2 hot = 1 candidate.
assert_eq!(candidates.len(), 1);
}
#[test]
// The head version of an edge chain must never be a migration candidate,
// even when age and min_hot_versions would otherwise allow it.
fn test_identify_edge_candidates_skips_head() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.min_hot_versions(1)
.age_threshold(Duration::ZERO)
.build();
let service = MigrationService::new(cold, policy);
let mut versions = FastHashMap::default();
let mut heads = FastHashMap::default();
let mut counts = FastHashMap::default();
let edge_id = EdgeId::new(200).unwrap();
for i in 1..=3 {
let v = create_test_edge_version(i, 200);
versions.insert(v.id, v);
}
// Version 3 is marked as the head of the chain.
heads.insert(edge_id, VersionId::new(3).unwrap());
counts.insert(edge_id, 3);
let candidates =
service.identify_edge_candidates(&versions, &heads, &counts, Instant::now());
// The head (id 3) must not appear among the candidates.
assert!(!candidates.iter().any(|c| c.version_id.as_u64() == 3));
}
#[test]
// With a 1-hour age threshold: a 2-hour-old version qualifies, a
// 30-minute-old one does not, and the current-time head is skipped anyway.
fn test_identify_edge_candidates_respects_age_threshold() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.min_hot_versions(1)
.age_threshold(Duration::from_secs(3600))
.build();
let service = MigrationService::new(cold, policy);
let mut versions = FastHashMap::default();
let mut heads = FastHashMap::default();
let mut counts = FastHashMap::default();
let edge_id = EdgeId::new(200).unwrap();
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// v1: two hours old — beyond the 1h threshold, so eligible.
let old_ts = now_ms - (2 * 60 * 60 * 1000);
let v1 = create_test_edge_version_with_timestamp(1, 200, old_ts);
versions.insert(v1.id, v1);
// v2: thirty minutes old — inside the threshold, so ineligible.
let recent_ts = now_ms - (30 * 60 * 1000);
let v2 = create_test_edge_version_with_timestamp(2, 200, recent_ts);
versions.insert(v2.id, v2);
// v3: brand new and also the head.
let v3 = create_test_edge_version_with_timestamp(3, 200, now_ms);
versions.insert(v3.id, v3);
heads.insert(edge_id, VersionId::new(3).unwrap());
counts.insert(edge_id, 3);
let candidates =
service.identify_edge_candidates(&versions, &heads, &counts, Instant::now());
// Only the 2-hour-old v1 should be returned.
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].version_id.as_u64(), 1);
}
// Test callback that vetoes migration of specific edge versions and records
// the (node_count, edge_count) pair reported after each batch.
struct EdgeFilteringCallback {
// Edge-version ids for which before_edge_migration() returns false.
skip_version_ids: Vec<u64>,
// (node_count, edge_count) pairs captured from after_batch().
batch_counts: std::sync::Mutex<Vec<(usize, usize)>>,
}
impl EdgeFilteringCallback {
    /// Creates a callback that rejects migration of the given edge-version ids.
    fn new(skip_ids: Vec<u64>) -> Self {
        Self {
            batch_counts: std::sync::Mutex::new(Vec::new()),
            skip_version_ids: skip_ids,
        }
    }
}
impl MigrationCallback for EdgeFilteringCallback {
    /// Allows migration only for versions whose id is not in the skip list.
    fn before_edge_migration(&self, version: &EdgeVersion) -> bool {
        let id = version.id.as_u64();
        !self.skip_version_ids.contains(&id)
    }
    /// Records the per-batch node/edge counts for later inspection.
    fn after_batch(&self, node_count: usize, edge_count: usize) {
        let mut counts = self.batch_counts.lock().unwrap();
        counts.push((node_count, edge_count));
    }
}
#[test]
// The callback skips the even-numbered versions, so only the 5 odd ones are
// migrated; batch reports must show zero node migrations throughout.
fn test_edge_migration_callback_filtering() {
let cold = create_cold_storage();
let policy = MigrationPolicy::default();
let callback = Arc::new(EdgeFilteringCallback::new(vec![2, 4, 6, 8, 10]));
let service = MigrationService::with_callback(cold.clone(), policy, callback.clone());
let versions: Vec<EdgeVersion> =
(1..=10).map(|i| create_test_edge_version(i, 200)).collect();
let migrated = service.migrate_edge_versions(&versions).unwrap();
// 10 versions minus 5 skipped even ids.
assert_eq!(migrated, 5);
assert!(
cold.contains_edge_version(VersionId::new(1).unwrap())
.unwrap()
);
// Version 2 was vetoed by the callback and must be absent.
assert!(
!cold
.contains_edge_version(VersionId::new(2).unwrap())
.unwrap()
);
assert!(
cold.contains_edge_version(VersionId::new(3).unwrap())
.unwrap()
);
let batches = callback.batch_counts.lock().unwrap();
assert!(!batches.is_empty());
// Only edges were migrated, so every batch reports zero nodes.
for (node_count, _edge_count) in batches.iter() {
assert_eq!(*node_count, 0);
}
}
#[test]
// A zero-duration last run must report 0 versions/sec rather than dividing
// by zero (or producing infinity/NaN).
fn test_migration_stats_throughput_zero_duration() {
let stats = MigrationStats {
node_versions_migrated: 1000,
edge_versions_migrated: 500,
bytes_migrated: 1_000_000,
runs_completed: 10,
errors: 0,
last_run_duration: Duration::ZERO,
last_run_time: Some(Instant::now()),
};
assert_eq!(stats.versions_per_second(), 0.0);
}
#[test]
fn test_migration_stats_default() {
    // Default stats start entirely zeroed with no recorded run.
    let s = MigrationStats::default();
    assert_eq!(
        (s.node_versions_migrated, s.edge_versions_migrated, s.bytes_migrated),
        (0, 0, 0)
    );
    assert_eq!((s.runs_completed, s.errors), (0, 0));
    assert_eq!(s.last_run_duration, Duration::ZERO);
    assert!(s.last_run_time.is_none());
}
#[test]
fn test_atomic_migration_stats_new() {
    // A freshly constructed atomic stats block snapshots to all-zero counters.
    let snap = AtomicMigrationStats::new().snapshot();
    assert_eq!(
        (
            snap.node_versions_migrated,
            snap.edge_versions_migrated,
            snap.bytes_migrated,
            snap.runs_completed,
            snap.errors
        ),
        (0, 0, 0, 0, 0)
    );
}
#[test]
// snapshot() must reflect the exact values stored into each atomic counter.
fn test_atomic_migration_stats_snapshot() {
let stats = AtomicMigrationStats::new();
// Seed each counter with a distinct value to catch swapped-field bugs.
stats.node_versions_migrated.store(100, Ordering::Relaxed);
stats.edge_versions_migrated.store(50, Ordering::Relaxed);
stats.bytes_migrated.store(10000, Ordering::Relaxed);
stats.runs_completed.store(5, Ordering::Relaxed);
stats.errors.store(2, Ordering::Relaxed);
let snapshot = stats.snapshot();
assert_eq!(snapshot.node_versions_migrated, 100);
assert_eq!(snapshot.edge_versions_migrated, 50);
assert_eq!(snapshot.bytes_migrated, 10000);
assert_eq!(snapshot.runs_completed, 5);
assert_eq!(snapshot.errors, 2);
}
#[test]
fn test_service_policy_getter() {
    // policy() must expose the exact policy the service was constructed with.
    let policy = MigrationPolicy::builder()
        .age_threshold(Duration::from_secs(123456))
        .min_hot_versions(7)
        .build();
    let svc = MigrationService::new(create_cold_storage(), policy);
    let seen = svc.policy();
    assert_eq!(seen.age_threshold, Duration::from_secs(123456));
    assert_eq!(seen.min_hot_versions, 7);
}
#[test]
fn test_migrate_empty_edge_versions() {
    // Migrating an empty edge slice is a no-op with zero recorded stats.
    let svc = MigrationService::new(create_cold_storage(), MigrationPolicy::default());
    let none: Vec<EdgeVersion> = Vec::new();
    assert_eq!(svc.migrate_edge_versions(&none).unwrap(), 0);
    assert_eq!(svc.stats().edge_versions_migrated, 0);
}
#[test]
fn test_identify_edge_candidates_empty_versions() {
    // No versions in → no candidates out.
    let policy = MigrationPolicy::builder()
        .age_threshold(Duration::ZERO)
        .build();
    let svc = MigrationService::new(create_cold_storage(), policy);
    let versions: FastHashMap<VersionId, EdgeVersion> = FastHashMap::default();
    let heads: FastHashMap<EdgeId, VersionId> = FastHashMap::default();
    let counts: FastHashMap<EdgeId, usize> = FastHashMap::default();
    let candidates = svc.identify_edge_candidates(&versions, &heads, &counts, Instant::now());
    assert!(candidates.is_empty());
}
#[test]
// The counts map is deliberately left empty: with no per-node version count
// recorded, no candidates should be produced even though versions exist.
fn test_identify_candidates_version_count_zero() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.min_hot_versions(1)
.age_threshold(Duration::ZERO)
.build();
let service = MigrationService::new(cold, policy);
let mut versions = FastHashMap::default();
let mut heads = FastHashMap::default();
// NOTE: intentionally never populated.
let counts = FastHashMap::default();
let node_id = NodeId::new(100).unwrap();
for i in 1..=3 {
let v = create_test_node_version(i, 100);
versions.insert(v.id, v);
}
heads.insert(node_id, VersionId::new(3).unwrap());
let candidates =
service.identify_node_candidates(&versions, &heads, &counts, Instant::now());
assert!(candidates.is_empty());
}
#[test]
fn test_migration_candidate_debug_and_clone() {
    // Clone must copy every field; Debug output must name the type.
    let original = MigrationCandidate {
        version_id: VersionId::new(1).unwrap(),
        is_node: true,
        age: Duration::from_secs(3600),
        estimated_size: 1024,
    };
    let copy = original.clone();
    assert_eq!(copy.version_id, original.version_id);
    assert_eq!(copy.is_node, original.is_node);
    assert_eq!(copy.age, original.age);
    assert_eq!(copy.estimated_size, original.estimated_size);
    assert!(format!("{:?}", original).contains("MigrationCandidate"));
}
#[test]
fn test_conservative_policy_values() {
    // Conservative preset: long retention, high memory ceiling, small batches,
    // infrequent runs — and still enabled.
    let p = MigrationPolicy::conservative();
    assert_eq!(p.age_threshold, Duration::from_secs(30 * 24 * 60 * 60)); // 30 days
    assert_eq!(p.memory_threshold_bytes, 4 * 1024 * 1024 * 1024); // 4 GiB
    assert_eq!((p.min_hot_versions, p.batch_size), (5, 500));
    assert_eq!(p.run_interval, Duration::from_secs(300));
    assert!(p.enabled);
}
#[test]
fn test_aggressive_policy_values() {
    // Aggressive preset: short retention, low memory ceiling, big batches,
    // frequent runs, and LRU-based eviction turned on.
    let p = MigrationPolicy::aggressive();
    assert_eq!(p.age_threshold, Duration::from_secs(24 * 60 * 60)); // 1 day
    assert_eq!(p.memory_threshold_bytes, 512 * 1024 * 1024); // 512 MiB
    assert_eq!((p.min_hot_versions, p.batch_size), (1, 2000));
    assert_eq!(p.run_interval, Duration::from_secs(30));
    assert!(p.enabled);
    assert!(p.enable_lru);
}
#[test]
fn test_default_policy_run_interval() {
    // The default policy runs the migration loop once per minute.
    assert_eq!(
        MigrationPolicy::default().run_interval,
        Duration::from_secs(60)
    );
}
#[test]
fn test_policy_builder_default() {
    // A default-constructed builder yields the default 7-day age threshold.
    let policy = MigrationPolicyBuilder::default().build();
    assert_eq!(policy.age_threshold, Duration::from_secs(7 * 24 * 60 * 60));
}
#[test]
// Three nodes with three versions each: keeping the head of every chain out
// of the results leaves 2 candidates per node, 6 in total.
fn test_identify_candidates_multiple_nodes() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.min_hot_versions(1)
.age_threshold(Duration::ZERO)
.build();
let service = MigrationService::new(cold, policy);
let mut versions = FastHashMap::default();
let mut heads = FastHashMap::default();
let mut counts = FastHashMap::default();
for node_num in [100u64, 101, 102] {
let node_id = NodeId::new(node_num).unwrap();
// Version ids are node_num*10 + {1,2,3}; *10+3 is the head.
for i in 1..=3 {
let version_id = node_num * 10 + i;
let v = create_test_node_version(version_id, node_num);
versions.insert(v.id, v);
}
heads.insert(node_id, VersionId::new(node_num * 10 + 3).unwrap());
counts.insert(node_id, 3);
}
let candidates =
service.identify_node_candidates(&versions, &heads, &counts, Instant::now());
// 3 nodes × (3 versions − 1 head) = 6 candidates.
assert_eq!(candidates.len(), 6);
}
#[test]
// Mirror of the multi-node test for edges: three edges with three versions
// each yield 6 candidates once each chain's head is excluded.
fn test_identify_edge_candidates_multiple_edges() {
let cold = create_cold_storage();
let policy = MigrationPolicy::builder()
.min_hot_versions(1)
.age_threshold(Duration::ZERO)
.build();
let service = MigrationService::new(cold, policy);
let mut versions = FastHashMap::default();
let mut heads = FastHashMap::default();
let mut counts = FastHashMap::default();
for edge_num in [200u64, 201, 202] {
let edge_id = EdgeId::new(edge_num).unwrap();
// Version ids are edge_num*10 + {1,2,3}; *10+3 is the head.
for i in 1..=3 {
let version_id = edge_num * 10 + i;
let v = create_test_edge_version(version_id, edge_num);
versions.insert(v.id, v);
}
heads.insert(edge_id, VersionId::new(edge_num * 10 + 3).unwrap());
counts.insert(edge_id, 3);
}
let candidates =
service.identify_edge_candidates(&versions, &heads, &counts, Instant::now());
// 3 edges × (3 versions − 1 head) = 6 candidates.
assert_eq!(candidates.len(), 6);
}
#[test]
// 50 of 100 versions done in 5 seconds → roughly 5 more seconds expected.
// The assertion allows a 4-6s band to absorb rounding in the estimate.
fn test_progress_estimated_remaining() {
let progress = MigrationProgress {
total_versions: 100,
migrated_versions: 50,
bytes_migrated: 1000,
current_batch: 5,
total_batches: 10,
elapsed: Duration::from_secs(5),
};
let remaining = progress.estimated_remaining();
assert!(remaining.as_secs() <= 6 && remaining.as_secs() >= 4);
}
#[test]
// With zero elapsed time all rate metrics must report 0 (no division by
// zero) and no remaining-time estimate is projected.
fn test_progress_zero_elapsed() {
let progress = MigrationProgress {
total_versions: 100,
migrated_versions: 0,
bytes_migrated: 0,
current_batch: 0,
total_batches: 10,
elapsed: Duration::ZERO,
};
assert_eq!(progress.versions_per_second(), 0.0);
assert_eq!(progress.bytes_per_second(), 0.0);
assert_eq!(progress.estimated_remaining(), Duration::ZERO);
}
#[test]
// An empty migration (0 of 0 versions) counts as 100% done and complete
// rather than 0% or NaN.
fn test_progress_empty_total() {
let progress = MigrationProgress {
total_versions: 0,
migrated_versions: 0,
bytes_migrated: 0,
current_batch: 0,
total_batches: 0,
elapsed: Duration::from_secs(1),
};
assert_eq!(progress.percentage(), 100.0);
assert!(progress.is_complete());
}
#[test]
fn test_clear_access_tracking() {
    // clear_access() must drop exactly the listed ids and leave the rest intact.
    let svc = MigrationService::new(create_cold_storage(), MigrationPolicy::default());
    let ids: Vec<VersionId> = (1..=3).map(|i| VersionId::new(i).unwrap()).collect();
    for &id in &ids {
        svc.record_access(id);
    }
    for &id in &ids {
        assert!(svc.get_last_access(id).is_some());
    }
    // Clear the first two; only the third survives.
    svc.clear_access(&ids[..2]);
    assert!(svc.get_last_access(ids[0]).is_none());
    assert!(svc.get_last_access(ids[1]).is_none());
    assert!(svc.get_last_access(ids[2]).is_some());
}
#[test]
// A successful batch migration must persist the supplied LSN into cold
// storage and echo it back in the result.
fn test_migration_updates_flushed_lsn() {
use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
use tempfile::tempdir;
let temp_dir = tempdir().unwrap();
let cold = Arc::new(
RedbColdStorage::new(
temp_dir.path().join("cold.redb"),
RedbConfig::new().compression(crate::storage::CompressionAlgorithm::None),
)
.unwrap(),
);
let policy = MigrationPolicy::default();
let service = MigrationService::new(cold.clone(), policy);
let nodes: Vec<NodeVersion> = (1..=5).map(|i| create_test_node_version(i, 100)).collect();
let edges: Vec<EdgeVersion> = (10..=12)
.map(|i| create_test_edge_version(i, 200))
.collect();
let lsn = LSN(1000);
// Fresh storage starts with no flushed LSN.
assert!(cold.get_flushed_lsn().unwrap().is_none());
let result = service.migrate_batch_with_lsn(&nodes, &edges, lsn).unwrap();
assert_eq!(result.nodes_migrated, 5);
assert_eq!(result.edges_migrated, 3);
// The LSN must appear both in the result and in cold storage.
assert_eq!(result.flushed_lsn, Some(lsn));
assert_eq!(cold.get_flushed_lsn().unwrap(), Some(lsn));
}
#[test]
// With a flush coordinator attached, a successful batch migration still
// persists the LSN; no WAL segments are truncated here (the coordinator's
// WAL directory is empty), so segments_truncated stays 0.
fn test_migration_with_coordinator_set() {
use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
use crate::storage::wal::flush_coordinator::FlushCoordinatorConfig;
use tempfile::tempdir;
let temp_dir = tempdir().unwrap();
let wal_dir = temp_dir.path().join("wal");
std::fs::create_dir_all(&wal_dir).unwrap();
let cold = Arc::new(
RedbColdStorage::new(
temp_dir.path().join("cold.redb"),
RedbConfig::new().compression(crate::storage::CompressionAlgorithm::None),
)
.unwrap(),
);
// Small segment size so segments could roll over if the WAL were written.
let config = FlushCoordinatorConfig {
wal_dir: wal_dir.clone(),
segment_size: 1024,
..Default::default()
};
let coordinator = Arc::new(FlushCoordinator::new(config).unwrap());
let policy = MigrationPolicy::default();
let mut service = MigrationService::new(cold.clone(), policy);
service.set_flush_coordinator(coordinator.clone());
assert!(service.flush_coordinator().is_some());
let nodes: Vec<NodeVersion> = (1..=3).map(|i| create_test_node_version(i, 100)).collect();
let lsn = LSN(25);
let result = service.migrate_batch_with_lsn(&nodes, &[], lsn).unwrap();
assert_eq!(result.nodes_migrated, 3);
assert_eq!(result.flushed_lsn, Some(lsn));
// No WAL data was written, so nothing should be truncated.
assert_eq!(result.segments_truncated, 0);
for node in &nodes {
assert!(cold.contains_node_version(node.id).unwrap());
}
assert_eq!(cold.get_flushed_lsn().unwrap(), Some(lsn));
}
#[test]
// If the cold-storage write fails, migrate_batch_with_lsn must surface the
// error (and, per the test name, must not truncate the WAL behind it).
fn test_migration_failure_does_not_truncate_wal() {
use crate::storage::wal::flush_coordinator::FlushCoordinatorConfig;
let temp_dir = tempdir().unwrap();
let wal_dir = temp_dir.path().join("wal");
std::fs::create_dir_all(&wal_dir).unwrap();
let config = FlushCoordinatorConfig {
wal_dir: wal_dir.clone(),
segment_size: 1024,
..Default::default()
};
let coordinator = Arc::new(FlushCoordinator::new(config).unwrap());
let db_path = temp_dir.path().join("cold.redb");
let cold = Arc::new(RedbColdStorage::new(&db_path, RedbConfig::new()).unwrap());
// Force every write to fail (test hook on the cold storage).
cold.set_fail_writes(true);
let policy = MigrationPolicy::default();
let mut service = MigrationService::new(cold.clone(), policy);
service.set_flush_coordinator(coordinator.clone());
let nodes: Vec<NodeVersion> = (1..=3).map(|i| create_test_node_version(i, 100)).collect();
let result = service.migrate_batch_with_lsn(&nodes, &[], LSN(5));
assert!(result.is_err());
// The failure occurred during the write, not before it was attempted.
assert!(cold.was_write_attempted());
}
#[test]
// A disabled policy must make migrate_batch_with_lsn a complete no-op:
// nothing migrated, nothing truncated, and no LSN persisted.
fn test_migration_with_lsn_disabled_policy() {
use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
use tempfile::tempdir;
let temp_dir = tempdir().unwrap();
let cold = Arc::new(
RedbColdStorage::new(
temp_dir.path().join("cold.redb"),
RedbConfig::new().compression(crate::storage::CompressionAlgorithm::None),
)
.unwrap(),
);
let policy = MigrationPolicy::disabled();
let service = MigrationService::new(cold.clone(), policy);
let nodes: Vec<NodeVersion> = (1..=5).map(|i| create_test_node_version(i, 100)).collect();
let lsn = LSN(1000);
let result = service.migrate_batch_with_lsn(&nodes, &[], lsn).unwrap();
assert_eq!(result.nodes_migrated, 0);
assert_eq!(result.edges_migrated, 0);
assert_eq!(result.segments_truncated, 0);
assert!(result.flushed_lsn.is_none());
// Cold storage must remain untouched.
for node in &nodes {
assert!(!cold.contains_node_version(node.id).unwrap());
}
}
#[test]
fn test_migration_with_lsn_result_helpers() {
    // has_migrations() and total_migrated() must reflect node + edge counters.
    let populated = MigrationWithLsnResult {
        nodes_migrated: 5,
        edges_migrated: 3,
        segments_truncated: 2,
        flushed_lsn: Some(LSN(100)),
    };
    assert!(populated.has_migrations());
    assert_eq!(populated.total_migrated(), 8);
    // An all-zero result reports no migrations at all.
    let empty = MigrationWithLsnResult {
        nodes_migrated: 0,
        edges_migrated: 0,
        segments_truncated: 0,
        flushed_lsn: None,
    };
    assert!(!empty.has_migrations());
    assert_eq!(empty.total_migrated(), 0);
}
#[test]
// The coordinator accessor starts empty and reflects set_flush_coordinator().
fn test_set_and_get_flush_coordinator() {
use crate::storage::wal::flush_coordinator::FlushCoordinatorConfig;
let temp_dir = tempdir().unwrap();
let config = FlushCoordinatorConfig {
wal_dir: temp_dir.path().to_path_buf(),
..Default::default()
};
let coordinator = Arc::new(FlushCoordinator::new(config).unwrap());
let cold = create_cold_storage();
let policy = MigrationPolicy::default();
let mut service = MigrationService::new(cold, policy);
// No coordinator until one is explicitly attached.
assert!(service.flush_coordinator().is_none());
service.set_flush_coordinator(coordinator.clone());
assert!(service.flush_coordinator().is_some());
}
#[test]
// Consecutive flushes with increasing LSNs must each report and persist
// the most recent LSN, not a stale earlier one.
fn test_truncation_uses_actual_flushed_lsn() {
use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
use crate::storage::wal::flush_coordinator::FlushCoordinatorConfig;
use tempfile::tempdir;
let temp_dir = tempdir().unwrap();
let config = FlushCoordinatorConfig {
wal_dir: temp_dir.path().to_path_buf(),
..Default::default()
};
let coordinator = Arc::new(FlushCoordinator::new(config).unwrap());
let db_path = temp_dir.path().join("test.redb");
let cold = Arc::new(RedbColdStorage::new(&db_path, RedbConfig::new()).unwrap());
let policy = MigrationPolicy::default();
let mut service = MigrationService::new(cold.clone(), policy);
service.set_flush_coordinator(coordinator.clone());
// Empty batches are fine here — only the LSN bookkeeping is under test.
let result = service.migrate_batch_with_lsn(&[], &[], LSN(100)).unwrap();
assert_eq!(result.flushed_lsn, Some(LSN(100)));
let result = service.migrate_batch_with_lsn(&[], &[], LSN(200)).unwrap();
assert_eq!(result.flushed_lsn, Some(LSN(200)));
// Cold storage ends up holding the latest LSN.
assert_eq!(cold.get_flushed_lsn().unwrap(), Some(LSN(200)));
}
#[test]
fn test_no_truncation_without_coordinator() {
    // Without a flush coordinator the LSN is still persisted to cold storage,
    // but no WAL segments can be truncated.
    use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
    use tempfile::tempdir;
    let dir = tempdir().unwrap();
    let cold = Arc::new(
        RedbColdStorage::new(dir.path().join("test.redb"), RedbConfig::new()).unwrap(),
    );
    let service = MigrationService::new(cold.clone(), MigrationPolicy::default());
    let result = service.migrate_batch_with_lsn(&[], &[], LSN(100)).unwrap();
    assert_eq!(result.segments_truncated, 0);
    assert_eq!(cold.get_flushed_lsn().unwrap(), Some(LSN(100)));
}
#[test]
// Invariant check across a sequence of monotonically increasing LSNs: after
// every flush, the result LSN and the cold-storage LSN must agree exactly.
fn test_lsn_invariant_maintained() {
use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
use crate::storage::wal::flush_coordinator::FlushCoordinatorConfig;
use tempfile::tempdir;
let temp_dir = tempdir().unwrap();
let config = FlushCoordinatorConfig {
wal_dir: temp_dir.path().to_path_buf(),
..Default::default()
};
let coordinator = Arc::new(FlushCoordinator::new(config).unwrap());
let db_path = temp_dir.path().join("test.redb");
let cold = Arc::new(RedbColdStorage::new(&db_path, RedbConfig::new()).unwrap());
let policy = MigrationPolicy::default();
let mut service = MigrationService::new(cold.clone(), policy);
service.set_flush_coordinator(coordinator.clone());
let lsns = vec![LSN(100), LSN(200), LSN(300), LSN(400), LSN(500)];
for lsn in lsns {
let result = service.migrate_batch_with_lsn(&[], &[], lsn).unwrap();
let cold_lsn = cold.get_flushed_lsn().unwrap();
assert_eq!(
cold_lsn,
Some(lsn),
"Cold storage should have LSN {:?}",
lsn
);
assert_eq!(
result.flushed_lsn, cold_lsn,
"Result LSN should match cold storage LSN"
);
}
// After the loop the highest LSN is the one that sticks.
assert_eq!(cold.get_flushed_lsn().unwrap(), Some(LSN(500)));
}
}