use std::collections::HashMap;
use std::path::Path;
use serde::{Deserialize, Serialize};
use nodedb_codec::ColumnCodec;
use nodedb_types::timeseries::PartitionMeta;
/// Four-byte marker `NDPK` that `write_packed` appends as the last bytes of a
/// packed file and that `read_footer_from_bytes` checks before parsing the footer.
const MAGIC: &[u8; 4] = b"NDPK";
/// Footer of a packed partition file.
///
/// `write_packed` serializes this struct as JSON and places it after the
/// column data (and optional sparse index), followed by the footer length
/// and the magic bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PackedFooter {
/// Column name -> `(offset, length)` in bytes, with `offset` measured from
/// the start of the file.
pub column_ranges: HashMap<String, (u64, u64)>,
/// `(offset, length)` in bytes of the sparse index, or `None` when the
/// file was written without one.
pub sparse_index_range: Option<(u64, u64)>,
/// Per-column schema entries as supplied to `write_packed`.
pub schema: Vec<PackedColumnSchema>,
/// Partition metadata as supplied to `write_packed`.
pub meta: PartitionMeta,
}
/// Schema entry for one column, stored inside [`PackedFooter`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PackedColumnSchema {
/// Column name.
pub name: String,
/// Column type as a free-form string (the tests in this file use values
/// such as `"timestamp"` and `"float64"`).
pub col_type: String,
/// Codec recorded for this column.
// NOTE(review): presumably the codec the column bytes were encoded with;
// nothing in this file enforces that — confirm against the writer's callers.
pub codec: ColumnCodec,
}
/// Writes a packed partition file to `output_path`.
///
/// File layout, in order:
/// 1. the bytes of every entry in `columns`, concatenated in slice order;
/// 2. the bytes of `sparse_index`, when present;
/// 3. the JSON-serialized [`PackedFooter`];
/// 4. the footer length as a little-endian `u32`;
/// 5. the four magic bytes `NDPK`.
///
/// The footer records an `(offset, length)` pair for each column and for the
/// sparse index so that readers can locate them without scanning the file.
///
/// Returns the total number of bytes written.
///
/// # Errors
///
/// Returns [`PackedError::Io`] when the footer cannot be serialized, when the
/// serialized footer does not fit in the `u32` length field, or when the file
/// cannot be written.
pub fn write_packed(
    output_path: &Path,
    columns: &[(String, Vec<u8>)],
    sparse_index: Option<&[u8]>,
    schema: &[PackedColumnSchema],
    meta: &PartitionMeta,
) -> Result<u64, PackedError> {
    // Size the buffer for the payload up front to avoid repeated regrowth.
    let payload_len = columns.iter().map(|(_, data)| data.len()).sum::<usize>()
        + sparse_index.map_or(0, |idx| idx.len());
    let mut buf: Vec<u8> = Vec::with_capacity(payload_len);
    let mut column_ranges = HashMap::with_capacity(columns.len());

    for (name, data) in columns {
        let offset = buf.len() as u64;
        buf.extend_from_slice(data);
        column_ranges.insert(name.clone(), (offset, data.len() as u64));
    }

    let sparse_index_range = sparse_index.map(|idx_data| {
        let offset = buf.len() as u64;
        buf.extend_from_slice(idx_data);
        (offset, idx_data.len() as u64)
    });

    let footer = PackedFooter {
        column_ranges,
        sparse_index_range,
        schema: schema.to_vec(),
        meta: meta.clone(),
    };
    let footer_json =
        serde_json::to_vec(&footer).map_err(|e| PackedError::Io(format!("footer json: {e}")))?;

    // The trailer stores the footer length in 4 bytes. A plain `as u32` cast
    // would silently truncate an oversized footer and yield an unreadable file,
    // so reject it explicitly instead.
    if footer_json.len() > u32::MAX as usize {
        return Err(PackedError::Io(format!(
            "footer json: {} bytes does not fit in the u32 length field",
            footer_json.len()
        )));
    }
    let footer_len = footer_json.len() as u32;

    buf.reserve(footer_json.len() + 4 + MAGIC.len());
    buf.extend_from_slice(&footer_json);
    buf.extend_from_slice(&footer_len.to_le_bytes());
    buf.extend_from_slice(MAGIC);

    let total_size = buf.len() as u64;
    std::fs::write(output_path, &buf)
        .map_err(|e| PackedError::Io(format!("write {}: {e}", output_path.display())))?;
    Ok(total_size)
}
/// Loads the file at `file_path` into memory and parses its footer.
///
/// # Errors
///
/// Returns [`PackedError::Io`] when the file cannot be read, and whatever
/// `read_footer_from_bytes` returns when the contents are not a valid packed
/// file.
pub fn read_footer(file_path: &Path) -> Result<PackedFooter, PackedError> {
    match std::fs::read(file_path) {
        Ok(bytes) => read_footer_from_bytes(&bytes),
        Err(e) => Err(PackedError::Io(format!(
            "read {}: {e}",
            file_path.display()
        ))),
    }
}
/// Parses the footer out of the complete bytes of a packed file.
///
/// The last 8 bytes of `data` are the trailer: a little-endian `u32` footer
/// length followed by the magic bytes `NDPK`. The JSON footer sits
/// immediately before the trailer.
///
/// # Errors
///
/// Returns [`PackedError::Corrupt`] when `data` is shorter than the trailer,
/// the magic bytes do not match, the stored footer length is larger than the
/// bytes available before the trailer, or the footer is not valid JSON for
/// [`PackedFooter`].
pub fn read_footer_from_bytes(data: &[u8]) -> Result<PackedFooter, PackedError> {
    // Trailer = 4-byte footer length + 4-byte magic.
    const TRAILER_LEN: usize = 8;

    if data.len() < TRAILER_LEN {
        return Err(PackedError::Corrupt("file too small for footer".into()));
    }
    let (body, trailer) = data.split_at(data.len() - TRAILER_LEN);
    let (len_bytes, magic) = trailer.split_at(4);

    if magic != MAGIC {
        return Err(PackedError::Corrupt(format!(
            "invalid magic: expected NDPK, got {:?}",
            magic
        )));
    }

    let footer_len =
        u32::from_le_bytes([len_bytes[0], len_bytes[1], len_bytes[2], len_bytes[3]]) as usize;

    // The length comes from the file and is untrusted. A plain subtraction
    // would underflow before any bounds check could run, so use checked
    // arithmetic and report corruption instead.
    let footer_start = body
        .len()
        .checked_sub(footer_len)
        .ok_or_else(|| PackedError::Corrupt("footer length exceeds file size".into()))?;

    serde_json::from_slice(&body[footer_start..])
        .map_err(|e| PackedError::Corrupt(format!("footer json: {e}")))
}
/// Returns the raw bytes of `column_name` from the packed file at `file_path`,
/// using the byte range recorded in `footer`.
///
/// # Errors
///
/// Returns [`PackedError::Corrupt`] when the column is not listed in the
/// footer or its recorded range does not lie within the file, and
/// [`PackedError::Io`] when the file cannot be read.
pub fn read_column(
    file_path: &Path,
    footer: &PackedFooter,
    column_name: &str,
) -> Result<Vec<u8>, PackedError> {
    let &(offset, length) = footer
        .column_ranges
        .get(column_name)
        .ok_or_else(|| PackedError::Corrupt(format!("column '{column_name}' not in footer")))?;

    // NOTE(review): this loads the whole file to extract one column; a seek +
    // bounded read would avoid that if partition files grow large.
    let data = std::fs::read(file_path).map_err(|e| PackedError::Io(format!("read: {e}")))?;

    // Footer values are untrusted: validate the range in u64 with checked
    // arithmetic so a bogus offset/length can neither overflow the addition
    // nor be truncated by a narrowing cast on 32-bit targets.
    let end = offset
        .checked_add(length)
        .filter(|&end| end <= data.len() as u64)
        .ok_or_else(|| PackedError::Corrupt("column range exceeds file size".into()))?;

    // offset <= end <= data.len(), so both casts are lossless.
    Ok(data[offset as usize..end as usize].to_vec())
}
/// Builds HTTP `Range` header values for the requested columns.
///
/// Returns one `(column_name, "bytes=<first>-<last>")` pair per requested
/// column, in the order given by `columns`; `<last>` is inclusive.
///
/// A column is skipped when it is not present in `footer`, when its recorded
/// length is zero (an inclusive byte range cannot describe zero bytes), or
/// when `offset + length - 1` does not fit in a `u64`.
pub fn http_range_headers(footer: &PackedFooter, columns: &[&str]) -> Vec<(String, String)> {
    columns
        .iter()
        .filter_map(|&name| {
            let &(offset, length) = footer.column_ranges.get(name)?;
            // `length - 1` underflows for empty columns and `offset + ...` can
            // overflow on corrupt footers; both cases yield no header rather
            // than a panic or a reversed range.
            let last = offset.checked_add(length.checked_sub(1)?)?;
            Some((name.to_string(), format!("bytes={offset}-{last}")))
        })
        .collect()
}
/// Errors returned by the packed-partition reader and writer in this file.
///
/// Both variants carry a human-readable message describing the failure.
#[derive(thiserror::Error, Debug)]
pub enum PackedError {
/// Reading or writing the file failed, or the footer could not be
/// serialized while writing.
#[error("packed partition I/O: {0}")]
Io(String),
/// The file contents or footer are not usable: file too small, wrong magic
/// bytes, out-of-bounds footer or column range, unparsable footer JSON, or
/// a requested column missing from the footer.
#[error("packed partition corrupt: {0}")]
Corrupt(String),
}
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_types::timeseries::PartitionState;
    use tempfile::TempDir;

    /// Fixed partition metadata shared by every test in this module.
    fn test_meta() -> PartitionMeta {
        PartitionMeta {
            min_ts: 1000,
            max_ts: 2000,
            row_count: 100,
            size_bytes: 0,
            schema_version: 1,
            state: PartitionState::Sealed,
            interval_ms: 86_400_000,
            last_flushed_wal_lsn: 0,
            column_stats: HashMap::new(),
        }
    }

    /// Builds an in-memory footer containing only the given
    /// `(name, offset, length)` column ranges.
    fn footer_with_ranges(ranges: &[(&str, u64, u64)]) -> PackedFooter {
        PackedFooter {
            column_ranges: ranges
                .iter()
                .map(|&(name, offset, length)| (name.to_string(), (offset, length)))
                .collect(),
            sparse_index_range: None,
            schema: Vec::new(),
            meta: test_meta(),
        }
    }

    /// Columns are laid out back to back and the footer round-trips through disk.
    #[test]
    fn write_and_read_footer() {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("test.ndpk");

        let columns = vec![
            (String::from("timestamp"), vec![1u8, 2, 3, 4, 5]),
            (String::from("value"), vec![10u8, 20, 30]),
        ];
        let schema = vec![
            PackedColumnSchema {
                name: String::from("timestamp"),
                col_type: String::from("timestamp"),
                codec: ColumnCodec::DeltaFastLanesLz4,
            },
            PackedColumnSchema {
                name: String::from("value"),
                col_type: String::from("float64"),
                codec: ColumnCodec::AlpFastLanesLz4,
            },
        ];

        let written = write_packed(&file, &columns, None, &schema, &test_meta()).unwrap();
        assert!(written > 0);

        let footer = read_footer(&file).unwrap();
        assert_eq!(footer.column_ranges.len(), 2);
        assert_eq!(footer.column_ranges.get("timestamp"), Some(&(0, 5)));
        assert_eq!(footer.column_ranges.get("value"), Some(&(5, 3)));
        assert_eq!(footer.meta.row_count, 100);
    }

    /// Each column can be read back individually with its original bytes.
    #[test]
    fn read_single_column() {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("test.ndpk");

        let expected_ts: Vec<u8> = vec![1, 2, 3, 4, 5];
        let expected_val: Vec<u8> = vec![10, 20, 30];
        let columns = vec![
            (String::from("timestamp"), expected_ts.clone()),
            (String::from("value"), expected_val.clone()),
        ];
        write_packed(&file, &columns, None, &[], &test_meta()).unwrap();

        let footer = read_footer(&file).unwrap();
        assert_eq!(
            read_column(&file, &footer, "timestamp").unwrap(),
            expected_ts
        );
        assert_eq!(read_column(&file, &footer, "value").unwrap(), expected_val);
    }

    /// A single projected column yields exactly one inclusive byte range.
    #[test]
    fn http_range_headers_for_projection() {
        let footer = footer_with_ranges(&[
            ("timestamp", 0, 1000),
            ("cpu", 1000, 500),
            ("host", 1500, 200),
        ]);

        let ranges = http_range_headers(&footer, &["cpu"]);
        assert_eq!(
            ranges,
            vec![(String::from("cpu"), String::from("bytes=1000-1499"))]
        );
    }

    /// The sparse index is placed right after the column data.
    #[test]
    fn with_sparse_index() {
        let dir = TempDir::new().unwrap();
        let file = dir.path().join("test.ndpk");

        let sparse: [u8; 4] = [0xDE, 0xAD, 0xBE, 0xEF];
        let columns = vec![(String::from("ts"), vec![1u8, 2, 3])];
        write_packed(&file, &columns, Some(&sparse[..]), &[], &test_meta()).unwrap();

        let footer = read_footer(&file).unwrap();
        assert_eq!(footer.sparse_index_range, Some((3, 4)));
    }

    /// A buffer that does not end in the magic bytes is rejected.
    #[test]
    fn invalid_magic() {
        let zeros = [0u8; 100];
        assert!(read_footer_from_bytes(&zeros).is_err());
    }

    /// Only the requested columns contribute to the number of bytes fetched.
    #[test]
    fn projection_pushdown_skips_columns() {
        let footer = footer_with_ranges(&[
            ("ts", 0, 10000),
            ("cpu", 10000, 5000),
            ("mem", 15000, 5000),
            ("host", 20000, 2000),
        ]);

        let requested = ["ts", "cpu"];
        let headers = http_range_headers(&footer, &requested);
        assert_eq!(headers.len(), 2);

        let mut fetched = 0u64;
        for (name, _) in &headers {
            fetched += footer.column_ranges[name].1;
        }
        assert_eq!(fetched, 15000);
    }
}