use std::sync::Arc;
use crate::core::{Result, Row, Schema};
use super::io;
use super::writer::{FrozenVolume, VolumeBuilder};
/// Build an in-memory [`FrozenVolume`] from `rows` using `schema`.
///
/// Each `(row_id, row)` pair is handed to a [`VolumeBuilder`] in slice order; the builder is
/// pre-sized to `rows.len()` and finalized once every row has been added.
pub fn seal_rows(schema: &Schema, rows: &[(i64, Row)]) -> FrozenVolume {
    let mut vb = VolumeBuilder::with_capacity(schema, rows.len());
    rows.iter().for_each(|(id, row)| {
        vb.add_row(*id, row);
    });
    vb.finish()
}
/// Seal `rows` into a single frozen volume and write it under `volume_dir`.
///
/// Convenience form of [`seal_and_persist_opts`] that always passes `compress = true`.
/// On success returns `(volume, path_written, volume_id)`.
///
/// # Errors
/// Propagates any error from [`seal_and_persist_opts`].
pub fn seal_and_persist(
    schema: &Schema,
    rows: &[(i64, Row)],
    volume_dir: &std::path::Path,
    table_name: &str,
) -> Result<(Arc<FrozenVolume>, std::path::PathBuf, u64)> {
    let compress = true;
    seal_and_persist_opts(schema, rows, volume_dir, table_name, compress)
}
/// Seal `rows` into one frozen volume, write it to `volume_dir`, and return it shared.
///
/// Steps, in order:
/// 1. build the volume with [`seal_rows`];
/// 2. take a fresh id from `io::next_volume_id`;
/// 3. write it with `io::write_volume_to_disk_opts`, forwarding `compress`;
/// 4. attach the store returned by the write to the volume's columns.
///
/// On success returns `(volume, path_written, volume_id)`.
///
/// # Errors
/// Propagates the error from `io::write_volume_to_disk_opts`; in that case no store is
/// attached and the in-memory volume is dropped.
pub fn seal_and_persist_opts(
    schema: &Schema,
    rows: &[(i64, Row)],
    volume_dir: &std::path::Path,
    table_name: &str,
    compress: bool,
) -> Result<(Arc<FrozenVolume>, std::path::PathBuf, u64)> {
    let mut sealed = seal_rows(schema, rows);
    let id = io::next_volume_id();
    let (file_path, compressed) =
        io::write_volume_to_disk_opts(volume_dir, table_name, id, &sealed, compress)?;
    sealed.columns.attach_compressed_store(compressed);
    let shared = Arc::new(sealed);
    Ok((shared, file_path, id))
}
/// Seal `rows` into one or more frozen volumes and persist each under `volume_dir`.
///
/// The per-volume chunk size is `target_rows` rounded *down* to a multiple of the row-group
/// size (65,536), with a floor of one row group. A `target_rows` below 65,536 therefore still
/// yields 65,536-row chunks. When `target_rows == 0`, or `rows` fits in a single chunk, exactly
/// one volume is produced, which is the same as calling [`seal_and_persist_opts`] directly.
/// Otherwise `rows` is split in order into consecutive chunks (the last may be shorter), and each
/// chunk is sealed and written via [`seal_and_persist_opts`].
///
/// Returns one `(volume, path_written, volume_id)` entry per chunk, in input order.
///
/// # Errors
/// Returns the first error produced while persisting a chunk. Before returning, the files of
/// volumes already written by this call are removed best-effort; removal failures are ignored.
pub fn seal_and_persist_multi(
    schema: &Schema,
    rows: &[(i64, Row)],
    volume_dir: &std::path::Path,
    table_name: &str,
    compress: bool,
    target_rows: usize,
) -> Result<Vec<(Arc<FrozenVolume>, std::path::PathBuf, u64)>> {
    // Chunk boundaries are kept aligned to whole row groups.
    const ROW_GROUP_SIZE: usize = 65_536;

    let chunk_size = (target_rows / ROW_GROUP_SIZE).max(1) * ROW_GROUP_SIZE;
    if target_rows == 0 || rows.len() <= chunk_size {
        let single = seal_and_persist_opts(schema, rows, volume_dir, table_name, compress)?;
        return Ok(vec![single]);
    }

    // chunk_size >= ROW_GROUP_SIZE > 0 here, so this is a safe (slightly generous) bound.
    let mut results = Vec::with_capacity(rows.len() / chunk_size + 1);
    for chunk in rows.chunks(chunk_size) {
        match seal_and_persist_opts(schema, chunk, volume_dir, table_name, compress) {
            Ok(entry) => results.push(entry),
            Err(e) => {
                // Avoid leaving orphaned volume files from a partially completed split.
                for (_, path, _) in &results {
                    let _ = std::fs::remove_file(path);
                }
                return Err(e);
            }
        }
    }
    Ok(results)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{DataType, SchemaBuilder, Value};

    /// Sealing three rows preserves their values and order, and column 0 reports as sorted.
    #[test]
    fn test_seal_basic() {
        let schema = SchemaBuilder::new("test")
            .column("id", DataType::Integer, false, true)
            .column("name", DataType::Text, false, false)
            .build();
        let alice = Row::from_values(vec![Value::Integer(1), Value::text("alice")]);
        let bob = Row::from_values(vec![Value::Integer(2), Value::text("bob")]);
        let carol = Row::from_values(vec![Value::Integer(3), Value::text("carol")]);
        let rows = vec![(1, alice), (2, bob), (3, carol)];

        let sealed = seal_rows(&schema, &rows);

        assert_eq!(sealed.meta.row_count, 3);
        assert_eq!(sealed.columns[0].get_i64(0), 1);
        assert_eq!(sealed.columns[1].get_str(2), "carol");
        assert!(sealed.is_sorted(0));
    }

    /// A persisted volume exists on disk and round-trips through `read_volume_from_disk`.
    #[test]
    fn test_seal_and_persist() {
        let tmp = tempfile::tempdir().unwrap();
        let schema = SchemaBuilder::new("test")
            .column("id", DataType::Integer, false, true)
            .column("val", DataType::Float, false, false)
            .build();
        let first = Row::from_values(vec![Value::Integer(1), Value::Float(10.0)]);
        let second = Row::from_values(vec![Value::Integer(2), Value::Float(20.0)]);
        let rows = vec![(1, first), (2, second)];
        let vol_dir = tmp.path().join("volumes");

        let (in_memory, path, _) =
            seal_and_persist(&schema, &rows, &vol_dir, "test_table").unwrap();
        assert_eq!(in_memory.meta.row_count, 2);
        assert!(path.exists());

        let reloaded = io::read_volume_from_disk(&path).unwrap();
        assert_eq!(reloaded.meta.row_count, 2);
        assert_eq!(reloaded.columns[0].get_i64(0), 1);
        assert_eq!(reloaded.meta.stats.sum(1), 30.0);
    }
}