use std::fs;
use tempfile::TempDir;
use tsink::{DataPoint, Row, StorageBuilder};
#[test]
fn test_memory_mapped_bounds_checking() {
let temp_dir = TempDir::new().unwrap();
{
let storage = StorageBuilder::new()
.with_data_path(temp_dir.path())
.build()
.unwrap();
let rows = vec![
Row::new("test_metric", DataPoint::new(1000, 1.0)),
Row::new("test_metric", DataPoint::new(2000, 2.0)),
Row::new("test_metric", DataPoint::new(3000, 3.0)),
];
storage.insert_rows(&rows).unwrap();
storage.close().unwrap();
}
let segments_root = temp_dir
.path()
.join("lane_numeric")
.join("segments")
.join("L0");
let segment_dir = fs::read_dir(segments_root)
.unwrap()
.filter_map(|entry| entry.ok())
.map(|entry| entry.path())
.find(|path| {
path.is_dir()
&& path
.file_name()
.and_then(|s| s.to_str())
.map(|name| name.starts_with("seg-"))
.unwrap_or(false)
})
.expect("expected at least one persisted segment");
let chunks_file = segment_dir.join("chunks.bin");
fs::write(&chunks_file, vec![0u8; 1]).unwrap();
let storage = StorageBuilder::new()
.with_data_path(temp_dir.path())
.build()
.unwrap();
assert!(
storage
.select("test_metric", &[], 0, 4000)
.unwrap()
.is_empty(),
"corrupted startup segment should be quarantined instead of remaining queryable"
);
assert!(
!segment_dir.exists(),
"corrupted startup segment root should be moved out of the visible inventory"
);
let quarantined = segment_dir
.parent()
.unwrap()
.read_dir()
.unwrap()
.filter_map(|entry| entry.ok())
.map(|entry| entry.path())
.filter(|path| {
path.is_dir()
&& path
.file_name()
.and_then(|name| name.to_str())
.is_some_and(|name| name.starts_with(".tmp-tsink-startup-segment-quarantine-"))
})
.collect::<Vec<_>>();
assert_eq!(quarantined.len(), 1);
storage.close().unwrap();
}
#[test]
fn test_large_offset_bounds_check() {
let temp_dir = TempDir::new().unwrap();
let storage = StorageBuilder::new()
.with_data_path(temp_dir.path())
.with_partition_duration(std::time::Duration::from_secs(1))
.build()
.unwrap();
let mut rows = Vec::new();
for i in 0..200 {
rows.push(Row::new(
format!("metric_{}", i % 10),
DataPoint::new(i as i64 * 1000, i as f64),
));
}
storage.insert_rows(&rows).unwrap();
let result = storage.select("metric_0", &[], i64::MIN, i64::MAX);
assert!(result.is_ok());
}
#[test]
fn test_empty_partition_handling() {
let temp_dir = TempDir::new().unwrap();
let data_dir = temp_dir.path().join("p-0-1000");
fs::create_dir_all(&data_dir).unwrap();
fs::write(data_dir.join("data"), []).unwrap();
let meta = r#"{
"min_timestamp": 0,
"max_timestamp": 1000,
"num_data_points": 0,
"metrics": {},
"timestamp_precision": "Nanoseconds",
"created_at": {"secs_since_epoch": 0, "nanos_since_epoch": 0}
}"#;
fs::write(data_dir.join("meta.json"), meta).unwrap();
let storage = StorageBuilder::new()
.with_data_path(temp_dir.path())
.build()
.unwrap();
let result = storage.select("any_metric", &[], 1, 2000);
match result {
Ok(points) => assert_eq!(points.len(), 0),
Err(e) => panic!("Query failed with error: {:?}", e),
}
}
#[test]
fn test_malformed_metadata_handling() {
let temp_dir = TempDir::new().unwrap();
let data_dir = temp_dir.path().join("p-0-1000");
fs::create_dir_all(&data_dir).unwrap();
fs::write(data_dir.join("meta.json"), b"not valid json").unwrap();
fs::write(data_dir.join("data"), vec![0u8; 100]).unwrap();
let storage = StorageBuilder::new()
.with_data_path(temp_dir.path())
.build();
assert!(
storage.is_ok(),
"legacy malformed partition metadata should be ignored"
);
}