use std::collections::HashMap;
use std::path::Path;
use serde::{Deserialize, Serialize};
use nodedb_codec::ColumnCodec;
use nodedb_types::timeseries::PartitionMeta;
use nodedb_wal::crypto::WalEncryptionKey;
use super::columnar_segment::encrypt::{decrypt_file, encrypt_file, is_encrypted};
use crate::engine::timeseries::columnar_segment::SegmentError;
/// Magic bytes written as the final four bytes of every packed partition file.
pub const PACKED_MAGIC: [u8; 4] = *b"NDPK";
/// Format version stored in the tail; `read_footer_from_bytes` rejects any other value.
pub const PACKED_VERSION: u16 = 1;
/// Size in bytes of the fixed tail that follows the footer body:
/// footer_len (4) + footer CRC (4) + version (2) + magic (4).
pub const PACKED_TAIL_SIZE: usize = 14;
/// Cursor that consumes a byte slice backwards, from its end toward its start.
///
/// Each successful read returns the `n` bytes immediately before the bytes
/// already consumed and advances the cursor by `n`.
struct TailCursor<'a> {
    /// The full buffer being read.
    data: &'a [u8],
    /// Number of bytes already consumed, counted from the end of `data`.
    /// Invariant: `from_end <= data.len()`.
    from_end: usize,
}

impl<'a> TailCursor<'a> {
    /// Creates a cursor positioned at the very end of `data`.
    fn new(data: &'a [u8]) -> Self {
        Self { data, from_end: 0 }
    }

    /// Returns the `n` bytes that precede the already-consumed region, or
    /// `None` (leaving the cursor untouched) when fewer than `n` bytes remain.
    ///
    /// The returned slice is in forward (file) order.
    fn read_back(&mut self, n: usize) -> Option<&'a [u8]> {
        // Checked arithmetic: an oversized `n` must yield `None`, never an
        // overflow panic or a wrapped value that slips past the bounds check.
        let new_from_end = self.from_end.checked_add(n)?;
        let start = self.data.len().checked_sub(new_from_end)?;
        // Cannot underflow thanks to the `from_end <= data.len()` invariant.
        let end = self.data.len() - self.from_end;
        self.from_end = new_from_end;
        Some(&self.data[start..end])
    }

    /// Reads the preceding four bytes as a little-endian `u32`.
    fn read_back_u32_le(&mut self) -> Option<u32> {
        let b = self.read_back(4)?;
        Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
    }

    /// Reads the preceding two bytes as a little-endian `u16`.
    fn read_back_u16_le(&mut self) -> Option<u16> {
        let b = self.read_back(2)?;
        Some(u16::from_le_bytes([b[0], b[1]]))
    }

    /// Total number of bytes consumed so far, counted from the end of the buffer.
    fn consumed(&self) -> usize {
        self.from_end
    }
}
/// Footer describing the contents of a packed partition file.
///
/// `write_packed` encodes this with `zerompk` and stores it after the payload;
/// `read_footer_from_bytes` decodes it after verifying the tail.
#[derive(
Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub struct PackedFooter {
/// Column name -> `(offset, length)` in bytes. `write_packed` computes these
/// against the payload as assembled before any encryption is applied.
pub column_ranges: HashMap<String, (u64, u64)>,
/// `(offset, length)` of the sparse index, which `write_packed` appends after
/// the column data; `None` when no sparse index was supplied.
pub sparse_index_range: Option<(u64, u64)>,
/// Per-column schema entries, copied from the caller-supplied slice.
pub schema: Vec<PackedColumnSchema>,
/// Partition metadata, cloned from the caller-supplied value.
pub meta: PartitionMeta,
/// `true` when the payload was written with a KEK (i.e. passed through
/// `encrypt_file`). Under serde it defaults to `false` when absent.
#[serde(default)]
pub payload_encrypted: bool,
}
/// Schema entry for one column, stored in [`PackedFooter::schema`].
#[derive(
Debug, Clone, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
)]
pub struct PackedColumnSchema {
/// Column name.
pub name: String,
/// Column type, carried as a free-form string (the tests in this file use
/// values such as "timestamp" and "float64").
pub col_type: String,
/// Codec associated with the column.
pub codec: ColumnCodec,
}
/// Errors produced while writing or reading packed partition files.
#[derive(thiserror::Error, Debug)]
pub enum PackedError {
/// Filesystem read/write failure, or a footer encode failure on the write path.
#[error("packed partition I/O: {0}")]
Io(String),
/// The bytes do not form a valid packed partition (bad magic, bad lengths,
/// undecodable footer, out-of-range column, ...).
#[error("packed partition corrupt: {0}")]
Corrupt(String),
/// The tail carries a version other than `PACKED_VERSION`.
#[error("unsupported packed partition format version: {version}")]
UnsupportedVersion { version: u16 },
/// The CRC stored in the tail does not match the CRC computed over the footer body.
#[error("packed partition footer CRC mismatch: stored={stored:#010x} calc={calc:#010x}")]
InvalidFooterCrc { stored: u32, calc: u32 },
/// The footer marks the payload as encrypted but the caller supplied no KEK.
#[error("packed partition payload is encrypted (SEGT) but no KEK was provided")]
MissingKek,
/// The caller supplied a KEK but the footer marks the payload as not encrypted.
#[error("packed partition KEK provided but payload is not encrypted")]
UnexpectedPlaintext,
/// Payload encryption failed; carries the underlying message.
#[error("packed partition payload encryption failed: {0}")]
EncryptionFailed(String),
/// Payload decryption failed; carries the underlying message.
#[error("packed partition payload decryption failed: {0}")]
DecryptionFailed(String),
}
/// Maps each columnar-segment error onto the packed-partition variant of the
/// same name, carrying any message through unchanged.
impl From<SegmentError> for PackedError {
    fn from(e: SegmentError) -> Self {
        match e {
            // Variants that carry a message.
            SegmentError::Io(msg) => Self::Io(msg),
            SegmentError::Corrupt(msg) => Self::Corrupt(msg),
            SegmentError::EncryptionFailed(msg) => Self::EncryptionFailed(msg),
            SegmentError::DecryptionFailed(msg) => Self::DecryptionFailed(msg),
            // Unit variants describing a key/payload mismatch.
            SegmentError::MissingKek => Self::MissingKek,
            SegmentError::UnexpectedPlaintext => Self::UnexpectedPlaintext,
        }
    }
}
pub fn write_packed(
output_path: &Path,
columns: &[(String, Vec<u8>)], sparse_index: Option<&[u8]>,
schema: &[PackedColumnSchema],
meta: &PartitionMeta,
kek: Option<&WalEncryptionKey>,
) -> Result<u64, PackedError> {
let mut payload = Vec::new();
let mut column_ranges = HashMap::new();
for (name, data) in columns {
let offset = payload.len() as u64;
payload.extend_from_slice(data);
column_ranges.insert(name.clone(), (offset, data.len() as u64));
}
let sparse_index_range = sparse_index.map(|idx_data| {
let offset = payload.len() as u64;
payload.extend_from_slice(idx_data);
(offset, idx_data.len() as u64)
});
let payload_encrypted = kek.is_some();
let payload_on_disk: Vec<u8> = if let Some(key) = kek {
encrypt_file(key, &payload).map_err(PackedError::from)?
} else {
payload
};
let footer = PackedFooter {
column_ranges,
sparse_index_range,
schema: schema.to_vec(),
meta: meta.clone(),
payload_encrypted,
};
let footer_body = zerompk::to_msgpack_vec(&footer)
.map_err(|e| PackedError::Io(format!("footer encode: {e}")))?;
let mut buf = Vec::with_capacity(payload_on_disk.len() + footer_body.len() + PACKED_TAIL_SIZE);
buf.extend_from_slice(&payload_on_disk);
buf.extend_from_slice(&footer_body);
let footer_len = footer_body.len() as u32;
let crc = crc32c::crc32c(&footer_body);
buf.extend_from_slice(&footer_len.to_le_bytes());
buf.extend_from_slice(&crc.to_le_bytes());
buf.extend_from_slice(&PACKED_VERSION.to_le_bytes());
buf.extend_from_slice(&PACKED_MAGIC);
let total_size = buf.len() as u64;
std::fs::write(output_path, &buf)
.map_err(|e| PackedError::Io(format!("write {}: {e}", output_path.display())))?;
Ok(total_size)
}
/// Loads the file at `file_path` into memory and decodes its packed footer.
///
/// # Errors
///
/// Returns [`PackedError::Io`] when the file cannot be read; otherwise
/// forwards whatever `read_footer_from_bytes` reports.
pub fn read_footer(file_path: &Path) -> Result<PackedFooter, PackedError> {
    match std::fs::read(file_path) {
        Ok(bytes) => read_footer_from_bytes(&bytes),
        Err(e) => Err(PackedError::Io(format!(
            "read {}: {e}",
            file_path.display()
        ))),
    }
}
/// Decodes the packed footer from the bytes of a complete packed partition file.
///
/// The fixed tail is parsed backwards from the end of `data`: magic, version,
/// stored footer CRC, then footer length. The footer body sits immediately
/// before the tail and is CRC-checked before being decoded.
///
/// # Errors
///
/// * [`PackedError::Corrupt`] when `data` is shorter than the tail, the magic
///   is wrong, the footer length exceeds the bytes available, or the footer
///   body fails to decode.
/// * [`PackedError::UnsupportedVersion`] when the version is not `PACKED_VERSION`.
/// * [`PackedError::InvalidFooterCrc`] when the stored and computed CRCs differ.
pub fn read_footer_from_bytes(data: &[u8]) -> Result<PackedFooter, PackedError> {
    /// Error for a tail read that comes up short. The length guard below
    /// makes this unreachable, but a parser should report rather than panic.
    fn truncated_tail() -> PackedError {
        PackedError::Corrupt("packed tail truncated".into())
    }

    if data.len() < PACKED_TAIL_SIZE {
        return Err(PackedError::Corrupt(format!(
            "file too small for packed tail: {} bytes",
            data.len()
        )));
    }

    let mut tail = TailCursor::new(data);

    let magic = tail.read_back(4).ok_or_else(truncated_tail)?;
    if magic != PACKED_MAGIC {
        return Err(PackedError::Corrupt(format!(
            "invalid magic: expected NDPK, got {magic:?}"
        )));
    }

    let version = tail.read_back_u16_le().ok_or_else(truncated_tail)?;
    if version != PACKED_VERSION {
        return Err(PackedError::UnsupportedVersion { version });
    }

    let crc_stored = tail.read_back_u32_le().ok_or_else(truncated_tail)?;
    let footer_len = tail.read_back_u32_le().ok_or_else(truncated_tail)? as usize;

    // Everything before the tail is payload followed by the footer body.
    let tail_offset = data.len() - tail.consumed();
    if footer_len > tail_offset {
        return Err(PackedError::Corrupt(format!(
            "footer_len {footer_len} exceeds available bytes {tail_offset}"
        )));
    }
    let body_start = tail_offset - footer_len;
    let footer_body = &data[body_start..tail_offset];

    let crc_calc = crc32c::crc32c(footer_body);
    if crc_stored != crc_calc {
        return Err(PackedError::InvalidFooterCrc {
            stored: crc_stored,
            calc: crc_calc,
        });
    }

    zerompk::from_msgpack(footer_body)
        .map_err(|e| PackedError::Corrupt(format!("footer body decode: {e}")))
}
/// Reads the bytes of a single column out of the packed file at `file_path`.
///
/// The column's range is looked up in `footer` first, so an unknown column is
/// reported without touching the filesystem. The whole file is then read and
/// handed to `read_column_from_bytes`, which handles decryption with `kek`.
///
/// # Errors
///
/// * [`PackedError::Corrupt`] when `column_name` is not present in the footer.
/// * [`PackedError::Io`] when the file cannot be read.
/// * Any error reported by `read_column_from_bytes`.
pub fn read_column(
    file_path: &Path,
    footer: &PackedFooter,
    column_name: &str,
    kek: Option<&WalEncryptionKey>,
) -> Result<Vec<u8>, PackedError> {
    let (offset, length) = match footer.column_ranges.get(column_name) {
        Some(&range) => range,
        None => {
            return Err(PackedError::Corrupt(format!(
                "column '{column_name}' not in footer"
            )));
        }
    };
    let file_bytes = match std::fs::read(file_path) {
        Ok(bytes) => bytes,
        Err(e) => return Err(PackedError::Io(format!("read: {e}"))),
    };
    read_column_from_bytes(&file_bytes, footer, offset, length, kek)
}
fn read_column_from_bytes(
data: &[u8],
footer: &PackedFooter,
offset: u64,
length: u64,
kek: Option<&WalEncryptionKey>,
) -> Result<Vec<u8>, PackedError> {
let tail_size = PACKED_TAIL_SIZE;
if data.len() < tail_size {
return Err(PackedError::Corrupt("file too small".into()));
}
let footer_len_bytes: [u8; 4] = data[data.len() - tail_size..data.len() - tail_size + 4]
.try_into()
.map_err(|_| PackedError::Corrupt("footer_len read failed".into()))?;
let footer_body_len = u32::from_le_bytes(footer_len_bytes) as usize;
let payload_end = data.len() - tail_size - footer_body_len;
let payload_region = &data[..payload_end];
let plaintext: Vec<u8> = if footer.payload_encrypted {
match kek {
Some(key) => {
if !is_encrypted(payload_region)
.map_err(|e| PackedError::Corrupt(format!("sniff: {e}")))?
{
return Err(PackedError::Corrupt(
"footer says payload_encrypted but no SEGT preamble found".into(),
));
}
decrypt_file(key, payload_region).map_err(PackedError::from)?
}
None => return Err(PackedError::MissingKek),
}
} else {
match kek {
None => payload_region.to_vec(),
Some(_) => {
return Err(PackedError::UnexpectedPlaintext);
}
}
};
let start = offset as usize;
let end = start + length as usize;
if end > plaintext.len() {
return Err(PackedError::Corrupt(
"column range exceeds payload size".into(),
));
}
Ok(plaintext[start..end].to_vec())
}
/// Builds `(column name, Range header value)` pairs for the requested columns.
///
/// Each value has the form `bytes=<first>-<last>` with an inclusive last byte,
/// computed from the column's `(offset, length)` entry in the footer. Output
/// order follows `columns`.
///
/// A requested column is skipped when:
/// * it is not present in `footer.column_ranges`;
/// * its length is zero — an inclusive byte range cannot describe zero bytes
///   (this previously underflowed or produced an inverted range);
/// * `offset + length - 1` does not fit in a `u64`.
///
/// NOTE(review): footer offsets are computed against the payload before
/// encryption, and this function does not consult `footer.payload_encrypted`;
/// confirm callers only use these ranges for unencrypted files.
pub fn http_range_headers(footer: &PackedFooter, columns: &[&str]) -> Vec<(String, String)> {
    columns
        .iter()
        .filter_map(|&name| {
            let &(offset, length) = footer.column_ranges.get(name)?;
            if length == 0 {
                return None;
            }
            // `length >= 1`, so `length - 1` cannot underflow.
            let end = offset.checked_add(length - 1)?;
            Some((name.to_string(), format!("bytes={offset}-{end}")))
        })
        .collect()
}
// Unit tests for the packed partition format: footer round-trips, column
// reads, HTTP range generation, tail validation and payload encryption.
#[cfg(test)]
mod tests {
use super::*;
use nodedb_types::timeseries::PartitionState;
use tempfile::TempDir;
// Fixed partition metadata fixture (sealed, 100 rows) shared by the tests.
fn test_meta() -> PartitionMeta {
PartitionMeta {
min_ts: 1000,
max_ts: 2000,
row_count: 100,
size_bytes: 0,
schema_version: 1,
state: PartitionState::Sealed,
interval_ms: 86_400_000,
last_flushed_wal_lsn: 0,
column_stats: HashMap::new(),
max_system_ts: 0,
}
}
// Deterministic key built from 32 bytes of 0x37, used by the encryption tests.
fn test_kek() -> WalEncryptionKey {
WalEncryptionKey::from_bytes(&[0x37u8; 32]).unwrap()
}
// Writes two unencrypted columns and checks the footer records their
// back-to-back ranges, the metadata row count and the encryption flag.
#[test]
fn write_and_read_footer() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("test.ndpk");
let columns = vec![
("timestamp".to_string(), vec![1u8, 2, 3, 4, 5]),
("value".to_string(), vec![10, 20, 30]),
];
let schema = vec![
PackedColumnSchema {
name: "timestamp".into(),
col_type: "timestamp".into(),
codec: ColumnCodec::DeltaFastLanesLz4,
},
PackedColumnSchema {
name: "value".into(),
col_type: "float64".into(),
codec: ColumnCodec::AlpFastLanesLz4,
},
];
let size = write_packed(&path, &columns, None, &schema, &test_meta(), None).unwrap();
assert!(size > 0);
let footer = read_footer(&path).unwrap();
assert_eq!(footer.column_ranges.len(), 2);
assert_eq!(footer.column_ranges["timestamp"], (0, 5));
assert_eq!(footer.column_ranges["value"], (5, 3));
assert_eq!(footer.meta.row_count, 100);
assert!(!footer.payload_encrypted);
}
// Round-trips each column individually through `read_column` without a KEK.
#[test]
fn read_single_column() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("test.ndpk");
let ts_data = vec![1u8, 2, 3, 4, 5];
let val_data = vec![10u8, 20, 30];
let columns = vec![
("timestamp".to_string(), ts_data.clone()),
("value".to_string(), val_data.clone()),
];
write_packed(&path, &columns, None, &[], &test_meta(), None).unwrap();
let footer = read_footer(&path).unwrap();
let ts_bytes = read_column(&path, &footer, "timestamp", None).unwrap();
assert_eq!(ts_bytes, ts_data);
let val_bytes = read_column(&path, &footer, "value", None).unwrap();
assert_eq!(val_bytes, val_data);
}
// A single requested column yields one header with an inclusive end offset
// (offset 1000, length 500 -> "bytes=1000-1499").
#[test]
fn http_range_headers_for_projection() {
let mut column_ranges = HashMap::new();
column_ranges.insert("timestamp".to_string(), (0u64, 1000));
column_ranges.insert("cpu".to_string(), (1000, 500));
column_ranges.insert("host".to_string(), (1500, 200));
let footer = PackedFooter {
column_ranges,
sparse_index_range: None,
schema: vec![],
meta: test_meta(),
payload_encrypted: false,
};
let ranges = http_range_headers(&footer, &["cpu"]);
assert_eq!(ranges.len(), 1);
assert_eq!(ranges[0].0, "cpu");
assert_eq!(ranges[0].1, "bytes=1000-1499");
}
// The sparse index is appended after the column data, so its offset equals
// the total column length (3) and its length equals the index size (4).
#[test]
fn with_sparse_index() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("test.ndpk");
let sparse_data = vec![0xDE, 0xAD, 0xBE, 0xEF];
let columns = vec![("ts".to_string(), vec![1, 2, 3])];
write_packed(&path, &columns, Some(&sparse_data), &[], &test_meta(), None).unwrap();
let footer = read_footer(&path).unwrap();
assert!(footer.sparse_index_range.is_some());
let (offset, length) = footer.sparse_index_range.unwrap();
assert_eq!(offset, 3); assert_eq!(length, 4);
}
// A buffer of 100 zero bytes is long enough for a tail but does not end in
// "NDPK", so footer parsing must fail.
#[test]
fn invalid_magic() {
let data = vec![0u8; 100]; assert!(read_footer_from_bytes(&data).is_err());
}
// Requesting two of four columns yields two headers whose combined length
// (10000 + 5000) excludes the columns that were not requested.
#[test]
fn projection_pushdown_skips_columns() {
let mut ranges = HashMap::new();
ranges.insert("ts".to_string(), (0u64, 10000));
ranges.insert("cpu".to_string(), (10000, 5000));
ranges.insert("mem".to_string(), (15000, 5000));
ranges.insert("host".to_string(), (20000, 2000));
let footer = PackedFooter {
column_ranges: ranges,
sparse_index_range: None,
schema: vec![],
meta: test_meta(),
payload_encrypted: false,
};
let headers = http_range_headers(&footer, &["ts", "cpu"]);
assert_eq!(headers.len(), 2);
let fetched: u64 = headers
.iter()
.map(|(name, _)| footer.column_ranges[name].1)
.sum();
assert_eq!(fetched, 15000);
}
// Pins the on-disk tail layout by hand-parsing it from the end of the file:
// magic (last 4 bytes), version (2), stored CRC (4), footer length (4).
#[test]
fn packed_golden_tail_bytes() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("golden.ndpk");
let columns = vec![("ts".to_string(), vec![1u8, 2, 3])];
write_packed(&path, &columns, None, &[], &test_meta(), None).unwrap();
let data = std::fs::read(&path).unwrap();
let n = data.len();
assert_eq!(&data[n - 4..], b"NDPK");
let version = u16::from_le_bytes([data[n - 6], data[n - 5]]);
assert_eq!(version, PACKED_VERSION);
let crc_stored = u32::from_le_bytes([data[n - 10], data[n - 9], data[n - 8], data[n - 7]]);
let footer_len =
u32::from_le_bytes([data[n - 14], data[n - 13], data[n - 12], data[n - 11]]) as usize;
// The footer body sits immediately before the 14-byte tail.
let body_end = n - PACKED_TAIL_SIZE;
let body_start = body_end - footer_len;
let footer_body = &data[body_start..body_end];
let crc_calc = crc32c::crc32c(footer_body);
assert_eq!(
crc_stored, crc_calc,
"golden: stored CRC does not match re-computed CRC over body"
);
let footer: PackedFooter = zerompk::from_msgpack(footer_body).unwrap();
assert_eq!(footer.column_ranges["ts"], (0, 3));
}
// Overwrites the version field in the tail with 2 and expects
// `UnsupportedVersion { version: 2 }`.
#[test]
fn packed_rejects_unsupported_version() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("v2.ndpk");
let columns = vec![("ts".to_string(), vec![1u8])];
write_packed(&path, &columns, None, &[], &test_meta(), None).unwrap();
let mut data = std::fs::read(&path).unwrap();
let n = data.len();
let v2 = 2u16.to_le_bytes();
data[n - 6] = v2[0];
data[n - 5] = v2[1];
let err = read_footer_from_bytes(&data).unwrap_err();
assert!(
matches!(err, PackedError::UnsupportedVersion { version: 2 }),
"expected UnsupportedVersion {{version: 2}}, got {err:?}"
);
}
// Flips the first byte of the stored CRC and expects `InvalidFooterCrc`.
#[test]
fn packed_rejects_bad_crc() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("bad_crc.ndpk");
let columns = vec![("ts".to_string(), vec![1u8])];
write_packed(&path, &columns, None, &[], &test_meta(), None).unwrap();
let mut data = std::fs::read(&path).unwrap();
let n = data.len();
data[n - 10] ^= 0xFF;
let err = read_footer_from_bytes(&data).unwrap_err();
assert!(
matches!(err, PackedError::InvalidFooterCrc { .. }),
"expected InvalidFooterCrc, got {err:?}"
);
}
// Imports used only by the encryption tests below.
use nodedb_wal::crypto::SEGMENT_ENVELOPE_PREAMBLE_SIZE as SEGT_PREAMBLE_SIZE;
use super::super::columnar_segment::encrypt::SEGT_MAGIC;
// With a KEK, the footer stays readable without the key and reports
// `payload_encrypted`, while the payload starts with the SEGT magic and is
// longer than the envelope preamble.
#[test]
fn packed_partition_payload_encrypted_footer_plaintext() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("enc.ndpk");
let kek = test_kek();
let columns = vec![
("timestamp".to_string(), vec![1u8, 2, 3, 4]),
("value".to_string(), vec![10u8, 20, 30, 40]),
];
write_packed(&path, &columns, None, &[], &test_meta(), Some(&kek)).unwrap();
let data = std::fs::read(&path).unwrap();
let n = data.len();
assert_eq!(&data[n - 4..], b"NDPK");
let footer = read_footer(&path).unwrap();
assert!(footer.payload_encrypted);
assert_eq!(footer.column_ranges.len(), 2);
// footer_len is the first field of the tail.
let footer_len = {
let bytes: [u8; 4] = data[n - PACKED_TAIL_SIZE..n - PACKED_TAIL_SIZE + 4]
.try_into()
.unwrap();
u32::from_le_bytes(bytes) as usize
};
let payload_end = n - PACKED_TAIL_SIZE - footer_len;
assert_eq!(&data[..4], &SEGT_MAGIC, "payload must start with SEGT");
assert!(
payload_end > SEGT_PREAMBLE_SIZE,
"payload must include preamble + ciphertext"
);
}
// Columns written with a KEK read back unchanged when the same KEK is supplied.
#[test]
fn packed_partition_read_column_decrypts() {
let tmp = TempDir::new().unwrap();
let path = tmp.path().join("enc.ndpk");
let kek = test_kek();
let ts_data = vec![0xAAu8, 0xBB, 0xCC];
let val_data = vec![0x11u8, 0x22];
let columns = vec![
("timestamp".to_string(), ts_data.clone()),
("value".to_string(), val_data.clone()),
];
write_packed(&path, &columns, None, &[], &test_meta(), Some(&kek)).unwrap();
let footer = read_footer(&path).unwrap();
assert!(footer.payload_encrypted);
let ts_read = read_column(&path, &footer, "timestamp", Some(&kek)).unwrap();
assert_eq!(ts_read, ts_data);
let val_read = read_column(&path, &footer, "value", Some(&kek)).unwrap();
assert_eq!(val_read, val_data);
}
}