use cqlite_core::{
error::Result as CqliteResult,
platform::Platform,
storage::sstable::statistics_reader::StatisticsReader,
testing::dataset_helpers::{
derive_reference_paths_from_data_db, list_tables, read_jsonl_rows,
resolve_table_to_sstable_path, should_ignore_file, DatasetError,
},
Config,
};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
/// Derives the path of the `-Statistics.db` component that sits next to a
/// `-Data.db` SSTable component.
///
/// Returns `None` when the path has no file name, the file name is not valid
/// UTF-8, the file name does not end with `-Data.db`, or the path has no parent.
fn derive_statistics_from_data(data_db: &std::path::Path) -> Option<PathBuf> {
    let name = data_db.file_name()?.to_str()?;
    // Swap only the trailing component suffix. `str::replace` would also rewrite
    // any earlier `-Data.db` occurrence embedded in the file name.
    let stem = name.strip_suffix("-Data.db")?;
    Some(data_db.parent()?.join(format!("{stem}-Statistics.db")))
}
#[tokio::test]
#[ignore = "nb-format Statistics.db parsing deferred to M2 (Issue #105)"]
async fn test_data_jsonl_vs_statistics_row_counts() -> CqliteResult<()> {
if let Err(DatasetError::MetadataNotFound { .. }) =
cqlite_core::testing::dataset_helpers::load_metadata()
{
println!("Datasets not available; skipping JSONL parity test");
return Ok(());
}
let targets = vec![
("test_basic", "simple_table"),
("test_timeseries", "sensor_data"),
("test_wide_rows", "wide_partition_table"),
("test_collections", "collection_table"),
];
let mut tested = 0usize;
for (keyspace, table) in targets {
let dir = match resolve_table_to_sstable_path(keyspace, table) {
Ok(p) => p,
Err(DatasetError::DatasetNotFound { .. }) => continue,
Err(e) => {
println!("Resolve failed for {}.{}: {}", keyspace, table, e);
continue;
}
};
let mut read = fs::read_dir(&dir).await?;
while let Some(entry) = read.next_entry().await? {
let path = entry.path();
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
if should_ignore_file(name) {
continue;
}
if !name.ends_with("-Data.db") {
continue;
}
let Some((data_jsonl, _stats_txt, _summary_txt)) =
derive_reference_paths_from_data_db(&path)
else {
println!("Could not derive reference paths for {}", path.display());
continue;
};
if !data_jsonl.exists() {
println!("Missing Data JSONL reference: {}", data_jsonl.display());
continue;
}
let Some(stat_path) = derive_statistics_from_data(&path) else {
continue;
};
if !stat_path.exists() {
continue;
}
let config = Config::default();
let platform = Arc::new(Platform::new(&config).await?);
let reader = match StatisticsReader::open(&stat_path, platform.clone()).await {
Ok(r) => r,
Err(e) => {
if e.to_string().contains("Failed to parse Statistics.db")
|| e.to_string().contains("not yet implemented")
|| e.to_string().contains("UnsupportedFormat")
{
println!(
"Skipping nb-format Statistics.db (parsing deferred to M2): {}",
stat_path.display()
);
continue;
}
println!("Open Statistics failed: {}", e);
continue;
}
};
let mut jsonl_count: u64 = 0;
if let Ok(iter) = read_jsonl_rows(&data_jsonl) {
for v in iter {
if let Some(rows) = v.get("rows").and_then(|r| r.as_array()) {
jsonl_count += rows.len() as u64;
}
}
} else {
println!("Failed to read JSONL: {}", data_jsonl.display());
continue;
}
let meta_count = list_tables(Some(keyspace))
.expect("list_tables failed")
.into_iter()
.find(|t| t.table == table)
.map(|t| t.row_count)
.unwrap_or(0);
let our_total = reader.live_row_count().max(reader.row_count());
println!(
"Parity {}.{} [{}]: JSONL={}, meta={}, our_total={}",
keyspace, table, name, jsonl_count, meta_count, our_total
);
assert_eq!(
jsonl_count,
meta_count,
"Reference mismatch for {}: jsonl={} metadata={}",
path.display(),
jsonl_count,
meta_count
);
if our_total != jsonl_count {
println!(
"INFO: Reader count differs for {} (jsonl={} our_total={})",
path.display(),
jsonl_count,
our_total
);
}
tested += 1;
}
}
}
assert!(
tested > 0,
"No SSTables tested (missing references or datasets)"
);
Ok(())
}