use super::*;
/// Checks that the row summary of a node agrees with its logical view.
///
/// Node 42 receives one full record and two edge deltas. The summary's
/// `has_full`, `delta_count` and `adjacency_degree` must then match what
/// `get_logical_node` reports for the same node.
#[test]
fn node_row_summary_matches_logical_node_shape() {
    let root = temp_dir("node_row_summary_shape");
    let clock = std::sync::Arc::new(crate::core::reactor::DeterministicReactor::new(
        std::time::SystemTime::UNIX_EPOCH,
        912,
    ));
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1024 * 1024,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_reactor(config, clock).unwrap();

    // One full record followed by two deltas, all for node 42.
    put_full_node(&mut store, 42, 2, &[7, 9, 11]).unwrap();
    put_edge_delta(&mut store, &encode_delta(42, 3, b"delta-a")).unwrap();
    put_edge_delta(&mut store, &encode_delta(42, 4, b"delta-b")).unwrap();

    let node = get_logical_node(&mut store, 42).unwrap();
    // The summary lookup is `Result<Option<_>>`-shaped: the row must exist here.
    let row = get_node_row_summary(&mut store, 42).unwrap().unwrap();

    assert_eq!(row.has_full, node.full.is_some());
    assert_eq!(row.delta_count, node.deltas.len());
    assert_eq!(row.adjacency_degree, node.adjacency().len());
}
/// Verifies that a write issued after a read is reflected by the next read.
///
/// Node 9 is written and flushed, then read once (zero deltas expected). After
/// an edge delta is added for the same node, a second read must report exactly
/// one delta. Per the test name, this guards against a stale logical-node cache
/// entry surviving the write.
#[test]
fn logical_node_cache_is_invalidated_on_write() {
    let root = temp_dir("logical_node_cache_invalidate");
    let clock = std::sync::Arc::new(crate::core::reactor::DeterministicReactor::new(
        std::time::SystemTime::UNIX_EPOCH,
        913,
    ));
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1024 * 1024,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_reactor(config, clock).unwrap();

    put_full_node(&mut store, 9, 1, &[1, 2]).unwrap();
    flush(&mut store).unwrap();

    // First read: only the full record exists so far.
    let before_write = get_logical_node(&mut store, 9).unwrap();
    assert_eq!(before_write.deltas.len(), 0);

    // The new delta must be visible to the very next read.
    put_edge_delta(&mut store, &encode_delta(9, 2, b"delta-new")).unwrap();
    let after_write = get_logical_node(&mut store, 9).unwrap();
    assert_eq!(after_write.deltas.len(), 1);
}
/// Verifies that re-reading the same node does not add to `logical_bytes_read`.
///
/// After writing node 77 (full record plus one delta) and flushing, the first
/// `get_logical_node` call must increase the `logical_bytes_read` metric, while
/// an immediately repeated call must leave it unchanged.
#[test]
fn logical_node_cache_reuse_avoids_repeated_read_accounting() {
    let root = temp_dir("logical_node_cache_reuse");
    let clock = std::sync::Arc::new(crate::core::reactor::DeterministicReactor::new(
        std::time::SystemTime::UNIX_EPOCH,
        914,
    ));
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1024 * 1024,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_reactor(config, clock).unwrap();

    put_full_node(&mut store, 77, 1, &[1, 2, 3, 4]).unwrap();
    put_edge_delta(&mut store, &encode_delta(77, 2, b"delta-a")).unwrap();
    flush(&mut store).unwrap();

    let bytes_before = store.metrics.logical_bytes_read;

    // First read has to account for the bytes it loads.
    let first_read = get_logical_node(&mut store, 77).unwrap();
    assert!(first_read.full.is_some());
    let bytes_after_first = store.metrics.logical_bytes_read;
    assert!(bytes_after_first > bytes_before);

    // Second read of the same node must not move the counter.
    let _second_read = get_logical_node(&mut store, 77).unwrap();
    let bytes_after_second = store.metrics.logical_bytes_read;
    assert_eq!(bytes_after_second, bytes_after_first);
}
/// Verifies that both a flush and a compaction force the next read of a node
/// to be accounted in `logical_bytes_read` again.
///
/// Flush phase: node 1 is written, flushed and read once. A delta for a
/// different node (2) is then written and flushed; re-reading node 1 must
/// increase the metric.
///
/// Compaction phase: two more flushes create at least two L0 runs. Node 1 is
/// then read once more *before* compacting so that the read after `compact`
/// measures the effect of compaction itself. Without that priming read the
/// preceding flushes alone would already explain the metric increase and the
/// compaction assertion would pass vacuously.
#[test]
fn logical_node_cache_clears_on_flush_and_compaction() {
    let root = temp_dir("logical_node_cache_clear_flush_compact");
    let clock = std::sync::Arc::new(crate::core::reactor::DeterministicReactor::new(
        std::time::SystemTime::UNIX_EPOCH,
        915,
    ));
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1024 * 1024,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_reactor(config, clock).unwrap();

    // --- Flush phase -------------------------------------------------------
    put_full_node(&mut store, 1, 1, &[2]).unwrap();
    flush(&mut store).unwrap();
    let _ = get_logical_node(&mut store, 1).unwrap();
    let read_after_cache = store.metrics.logical_bytes_read;

    // The write targets node 2, so only the flush can affect node 1's entry.
    put_edge_delta(&mut store, &encode_delta(2, 1, b"x")).unwrap();
    flush(&mut store).unwrap();
    let _ = get_logical_node(&mut store, 1).unwrap();
    let read_after_flush = store.metrics.logical_bytes_read;
    assert!(read_after_flush > read_after_cache);

    // --- Compaction phase --------------------------------------------------
    put_edge_delta(&mut store, &encode_delta(3, 1, b"a")).unwrap();
    flush(&mut store).unwrap();
    put_edge_delta(&mut store, &encode_delta(4, 1, b"b")).unwrap();
    flush(&mut store).unwrap();
    assert!(store.l0_runs.len() >= 2);

    // Prime node 1 again after the last flush; otherwise the read below would
    // be a fresh read regardless of what compaction does.
    let _ = get_logical_node(&mut store, 1).unwrap();

    compact(&mut store, Some(0)).unwrap();
    let before_reget = store.metrics.logical_bytes_read;
    let _ = get_logical_node(&mut store, 1).unwrap();
    let after_reget = store.metrics.logical_bytes_read;
    assert!(after_reget > before_reget);
}
/// Verifies that opening the same store twice fails while the first handle is
/// still alive.
///
/// The second `open_store_with_reactor` call, using identical paths, must
/// return `StorageError::InvalidInput` with a message containing
/// `"already locked"`.
#[test]
fn open_store_rejects_second_handle_when_lock_held() {
    let root = temp_dir("data_lock_exclusive");
    let clock = std::sync::Arc::new(crate::core::reactor::DeterministicReactor::new(
        std::time::SystemTime::UNIX_EPOCH,
        916,
    ));
    // Both opens must target exactly the same on-disk locations.
    let config_for_root = || StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1024 * 1024,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };

    // Kept bound (not `_`) so the first handle stays alive for the whole test.
    let _first = open_store_with_reactor(config_for_root(), clock.clone()).unwrap();

    match open_store_with_reactor(config_for_root(), clock) {
        Ok(_) => panic!("expected second open to fail while lock is held"),
        Err(StorageError::InvalidInput(msg)) => assert!(msg.contains("already locked")),
        Err(other) => panic!("unexpected error: {:?}", other),
    }
}
/// Verifies that a store can be reopened once the previous handle is dropped.
///
/// A handle is opened and dropped immediately; a second open with the same
/// paths must then succeed.
#[test]
fn open_store_lock_releases_after_drop() {
    let root = temp_dir("data_lock_release");
    let clock = std::sync::Arc::new(crate::core::reactor::DeterministicReactor::new(
        std::time::SystemTime::UNIX_EPOCH,
        917,
    ));
    // Both opens must target exactly the same on-disk locations.
    let config_for_root = || StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1024 * 1024,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };

    // Open and release the first handle before attempting the reopen.
    let first = open_store_with_reactor(config_for_root(), clock.clone()).unwrap();
    drop(first);

    let _reopened = open_store_with_reactor(config_for_root(), clock).unwrap();
}
/// Verifies that an explicit `sync` call succeeds when `IR_WAL_SYNC_POLICY`
/// is set to `"manual"`.
///
/// The environment variable is set through `EnvVarGuard` before the store is
/// opened, one edge delta is written, and `sync` must return `Ok`.
#[test]
fn explicit_sync_succeeds_under_manual_policy() {
    // Held for the whole test; presumably restores the variable on drop
    // (NOTE(review): confirm against `EnvVarGuard`).
    let _env = EnvVarGuard::set("IR_WAL_SYNC_POLICY", "manual");

    let root = temp_dir("manual_sync_api");
    let clock = std::sync::Arc::new(crate::core::reactor::DeterministicReactor::new(
        std::time::SystemTime::UNIX_EPOCH,
        918,
    ));
    let config = StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1024 * 1024,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = open_store_with_reactor(config, clock).unwrap();

    put_edge_delta(&mut store, &encode_delta(1, 1, b"a")).unwrap();
    sync(&mut store).unwrap();
}