use pyo3::prelude::*;
use pyo3::exceptions::{PyValueError, PyImportError};
use pyo3::types::{PyDict, PyList};
use std::collections::HashSet;
use arrow::pyarrow::{ToPyArrow, FromPyArrow};
use arrow::record_batch::RecordBatch;
use crate::reader;
use crate::writer;
/// Python-facing wrapper around a loaded [`reader::QvdTable`], exposed to Python as `QvdTable`.
#[pyclass(name = "QvdTable")]
pub struct PyQvdTable {
    /// The wrapped table; every Python method on `QvdTable` reads (or, for `normalize`, mutates) it.
    inner: reader::QvdTable,
}
#[pymethods]
impl PyQvdTable {
    /// Load the QVD file at `path`.
    ///
    /// Raises `ValueError` carrying the reader's message if the file cannot be read.
    #[staticmethod]
    fn load(path: &str) -> PyResult<Self> {
        let table =
            reader::read_qvd_file(path).map_err(|e| PyValueError::new_err(e.to_string()))?;
        Ok(PyQvdTable { inner: table })
    }

    /// Write this table to `path` as a QVD file.
    ///
    /// Raises `ValueError` carrying the writer's message on failure.
    fn save(&self, path: &str) -> PyResult<()> {
        writer::write_qvd_file(&self.inner, path).map_err(|e| PyValueError::new_err(e.to_string()))
    }

    /// Table name stored in the QVD header.
    #[getter]
    fn table_name(&self) -> &str {
        &self.inner.header.table_name
    }

    /// Number of rows in the table.
    #[getter]
    fn num_rows(&self) -> usize {
        self.inner.num_rows()
    }

    /// Number of columns in the table.
    #[getter]
    fn num_cols(&self) -> usize {
        self.inner.num_cols()
    }

    /// Column (field) names in header order.
    #[getter]
    fn columns(&self) -> Vec<String> {
        self.inner.header.fields.iter().map(|f| f.field_name.clone()).collect()
    }

    /// Value at (`row`, `col`) as a string, or `None` when the cell has no string form.
    ///
    /// Raises `ValueError` if either index is out of range.
    fn get(&self, row: usize, col: usize) -> PyResult<Option<String>> {
        if row >= self.inner.num_rows() || col >= self.inner.num_cols() {
            return Err(PyValueError::new_err("Index out of bounds"));
        }
        Ok(self.inner.get(row, col).as_string())
    }

    /// Value at `row` in the column named `col_name`, as a string (or `None`).
    ///
    /// Raises `ValueError` if `row` is out of range or the column does not exist.
    fn get_by_name(&self, row: usize, col_name: &str) -> PyResult<Option<String>> {
        // Validate the row here, as `get` does, so a bad row raises ValueError instead of
        // depending on how `QvdTable::get_by_name` treats it (panic or a misleading
        // "column not found").
        if row >= self.inner.num_rows() {
            return Err(PyValueError::new_err("Row index out of bounds"));
        }
        self.inner
            .get_by_name(row, col_name)
            .map(|val| val.as_string())
            .ok_or_else(|| PyValueError::new_err(format!("Column '{}' not found", col_name)))
    }

    /// All values of column `col` as strings (`None` for cells without a string form).
    ///
    /// Raises `ValueError` if `col` is out of range.
    fn column_values(&self, col: usize) -> PyResult<Vec<Option<String>>> {
        if col >= self.inner.num_cols() {
            return Err(PyValueError::new_err("Column index out of bounds"));
        }
        Ok(self.inner.column_strings(col))
    }

    /// All values of the column named `col_name`; raises `ValueError` if it does not exist.
    fn column_values_by_name(&self, col_name: &str) -> PyResult<Vec<Option<String>>> {
        let col = self.column_index(col_name)?;
        Ok(self.inner.column_strings(col))
    }

    /// Return `{column_name: [values...]}` with one list per field, in header order.
    /// Cells without a string form become `None`.
    fn to_dict<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let dict = PyDict::new(py);
        for (col_idx, field) in self.inner.header.fields.iter().enumerate() {
            let values = self.inner.column_strings(col_idx);
            let py_list = PyList::new(py, values.iter().map(|v| v.as_deref()))?;
            dict.set_item(&field.field_name, py_list)?;
        }
        Ok(dict)
    }

    /// String form (`to_string_repr`) of every entry in the symbol table of `col_name`.
    ///
    /// Raises `ValueError` if the column does not exist.
    fn symbols(&self, col_name: &str) -> PyResult<Vec<String>> {
        let col = self.column_index(col_name)?;
        Ok(self.inner.symbols[col].iter().map(|s| s.to_string_repr()).collect())
    }

    /// Number of entries in the symbol table of `col_name`; raises `ValueError` if missing.
    fn num_symbols(&self, col_name: &str) -> PyResult<usize> {
        let col = self.column_index(col_name)?;
        Ok(self.inner.symbols[col].len())
    }

    /// First `n` rows (default 10, capped at the row count) as a list of
    /// `{column_name: value}` dicts.
    #[pyo3(signature = (n=None))]
    fn head<'py>(&self, py: Python<'py>, n: Option<usize>) -> PyResult<Bound<'py, PyList>> {
        let n = n.unwrap_or(10).min(self.inner.num_rows());
        let rows = PyList::empty(py);
        for row in 0..n {
            let dict = PyDict::new(py);
            for (col, field) in self.inner.header.fields.iter().enumerate() {
                dict.set_item(&field.field_name, self.inner.get(row, col).as_string())?;
            }
            rows.append(dict)?;
        }
        Ok(rows)
    }

    /// Build a table from the Parquet file at `path`; raises `ValueError` on failure.
    #[staticmethod]
    fn from_parquet(path: &str) -> PyResult<Self> {
        let table = crate::parquet::read_parquet_to_qvd(path)
            .map_err(|e| PyValueError::new_err(e.to_string()))?;
        Ok(PyQvdTable { inner: table })
    }

    /// Write this table to `path` as Parquet.
    ///
    /// `compression` defaults to `"snappy"`; a name rejected by
    /// `ParquetCompression::parse`, or a write failure, raises `ValueError`.
    #[pyo3(signature = (path, compression=None))]
    fn save_as_parquet(&self, path: &str, compression: Option<&str>) -> PyResult<()> {
        let comp = crate::parquet::ParquetCompression::parse(compression.unwrap_or("snappy"))
            .map_err(|e| PyValueError::new_err(e.to_string()))?;
        crate::parquet::write_qvd_table_to_parquet(&self.inner, path, comp)
            .map_err(|e| PyValueError::new_err(e.to_string()))
    }

    /// Convert to an Arrow record batch and return the arrow crate's `to_pyarrow` object.
    fn to_arrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let batch = crate::parquet::qvd_to_record_batch(&self.inner)
            .map_err(|e| PyValueError::new_err(e.to_string()))?;
        batch.to_pyarrow(py)
    }

    /// Build a table from a Python Arrow record batch.
    ///
    /// `table_name` defaults to `"table"`. Raises `ValueError` if `batch` cannot be read as a
    /// `RecordBatch` or cannot be converted.
    #[staticmethod]
    #[pyo3(signature = (batch, table_name=None))]
    fn from_arrow(batch: &Bound<'_, PyAny>, table_name: Option<&str>) -> PyResult<Self> {
        let batch = RecordBatch::from_pyarrow_bound(batch)
            .map_err(|e| PyValueError::new_err(format!("Invalid RecordBatch: {}", e)))?;
        let table = crate::parquet::record_batch_to_qvd(&batch, table_name.unwrap_or("table"))
            .map_err(|e| PyValueError::new_err(e.to_string()))?;
        Ok(PyQvdTable { inner: table })
    }

    /// Convert to a pandas DataFrame via `pyarrow.Table.to_pandas`.
    ///
    /// Raises `ImportError` if pyarrow is missing and `ValueError` if conversion fails.
    fn to_pandas<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        self.to_pyarrow_table(py)?.call_method0("to_pandas")
    }

    /// Convert to a polars DataFrame via `polars.from_arrow`.
    ///
    /// Raises `ImportError` if polars or pyarrow is missing and `ValueError` if conversion fails.
    fn to_polars<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        // Check for polars before doing any conversion work.
        let pl = py
            .import("polars")
            .map_err(|_| PyImportError::new_err("polars is required: pip install polars"))?;
        let table = self.to_pyarrow_table(py)?;
        pl.call_method1("from_arrow", (table,))
    }

    /// New table with the rows whose `col_name` value is one of `values`.
    ///
    /// Matching is delegated to `QvdTable::filter_by_values`; an empty match yields an empty
    /// table.
    fn filter_by_values(&self, col_name: &str, values: Vec<String>) -> PyResult<PyQvdTable> {
        let refs: Vec<&str> = values.iter().map(String::as_str).collect();
        let matching = self.inner.filter_by_values(col_name, &refs);
        Ok(PyQvdTable { inner: self.inner.subset_rows(&matching) })
    }

    /// New table containing the rows at `row_indices`, in the given order.
    ///
    /// NOTE(review): indices are passed through unchecked; out-of-range handling is up to
    /// `QvdTable::subset_rows`.
    fn subset_rows(&self, row_indices: Vec<usize>) -> PyResult<PyQvdTable> {
        Ok(PyQvdTable { inner: self.inner.subset_rows(&row_indices) })
    }

    /// Call `QvdTable::normalize` on the wrapped table in place.
    fn normalize(&mut self) {
        self.inner.normalize();
    }

    fn __repr__(&self) -> String {
        format!(
            "QvdTable(table='{}', rows={}, cols={})",
            self.inner.header.table_name,
            self.inner.num_rows(),
            self.inner.num_cols()
        )
    }

    /// `len(table)` is the row count.
    fn __len__(&self) -> usize {
        self.inner.num_rows()
    }
}

/// Rust-only helpers. They are kept out of `#[pymethods]` so they are not exposed to Python.
impl PyQvdTable {
    /// Header position of the column named `col_name`, or `ValueError` if it does not exist.
    fn column_index(&self, col_name: &str) -> PyResult<usize> {
        self.inner
            .header
            .fields
            .iter()
            .position(|f| f.field_name == col_name)
            .ok_or_else(|| PyValueError::new_err(format!("Column '{}' not found", col_name)))
    }

    /// Convert the wrapped table to a `pyarrow.Table` built from a single record batch.
    fn to_pyarrow_table<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        // Import pyarrow first. The arrow crate's `to_pyarrow` imports pyarrow itself, so
        // checking afterwards would let its own, less helpful error surface instead of this
        // message, and only after the conversion work was already done.
        let pa = py
            .import("pyarrow")
            .map_err(|_| PyImportError::new_err("pyarrow is required: pip install pyarrow"))?;
        let batch = crate::parquet::qvd_to_record_batch(&self.inner)
            .map_err(|e| PyValueError::new_err(e.to_string()))?;
        let pyarrow_batch = batch.to_pyarrow(py)?;
        pa.getattr("Table")?.call_method1("from_batches", (vec![pyarrow_batch],))
    }
}
/// Set of string values used for membership tests, exposed to Python as `ExistsIndex`.
#[pyclass(name = "ExistsIndex")]
pub struct PyExistsIndex {
    /// Values to test against: either a column's symbols (as `to_string_repr` strings) or an
    /// explicit list passed to `from_values`.
    values: HashSet<String>,
    /// Name of the source column, or `"<values>"` for `from_values`; shown by `__repr__`.
    col_name: String,
}
#[pymethods]
impl PyExistsIndex {
    /// Build an index holding the string form of every symbol in column `col_name` of `table`.
    ///
    /// Raises `ValueError` if `table` has no such column.
    #[new]
    fn new(table: &PyQvdTable, col_name: &str) -> PyResult<Self> {
        let qvd = &table.inner;
        let col = match qvd.header.fields.iter().position(|field| field.field_name == col_name) {
            Some(position) => position,
            None => {
                return Err(PyValueError::new_err(format!("Column '{}' not found", col_name)))
            }
        };
        let column_symbols = &qvd.symbols[col];
        let mut values = HashSet::with_capacity(column_symbols.len());
        values.extend(column_symbols.iter().map(|sym| sym.to_string_repr()));
        Ok(Self { values, col_name: col_name.to_owned() })
    }

    /// Build an index directly from a list of strings (duplicates collapse).
    #[staticmethod]
    fn from_values(values: Vec<String>) -> Self {
        Self {
            values: values.into_iter().collect(),
            col_name: String::from("<values>"),
        }
    }

    /// True if `value` is in the index.
    fn exists(&self, value: &str) -> bool {
        self.values.contains(value)
    }

    /// Membership result for each element of `values`, in order.
    fn exists_many(&self, values: Vec<String>) -> Vec<bool> {
        values.iter().map(|candidate| self.exists(candidate)).collect()
    }

    /// Number of distinct values held.
    fn __len__(&self) -> usize {
        self.values.len()
    }

    /// Supports `value in index`; same test as `exists`.
    fn __contains__(&self, value: &str) -> bool {
        self.exists(value)
    }

    fn __repr__(&self) -> String {
        let count = self.values.len();
        format!("ExistsIndex(field='{}', values={})", self.col_name, count)
    }
}
/// Return the indices of rows in `table` whose `col_name` value is present in `index`.
///
/// Membership is decided once per symbol, by comparing the symbol's `to_string_repr()` with the
/// index. Each row then does a table lookup. A row never matches if its stored symbol index is
/// negative (presumably a null/missing marker; TODO confirm) or lies past the end of the symbol
/// table. Indices are returned in ascending order.
///
/// Raises `ValueError` if `col_name` is not a column of `table`.
#[pyfunction]
fn filter_exists(table: &PyQvdTable, col_name: &str, index: &PyExistsIndex) -> PyResult<Vec<usize>> {
    let qvd = &table.inner;
    let col_idx = qvd
        .header
        .fields
        .iter()
        .position(|field| field.field_name == col_name)
        .ok_or_else(|| PyValueError::new_err(format!("Column '{}' not found", col_name)))?;

    let symbol_hit: Vec<bool> = qvd.symbols[col_idx]
        .iter()
        .map(|sym| index.values.contains(&sym.to_string_repr()))
        .collect();

    let column_rows = &qvd.row_indices[col_idx];
    let matches: Vec<usize> = (0..qvd.num_rows())
        .filter(|&row| {
            let raw = column_rows[row];
            if raw < 0 {
                return false;
            }
            symbol_hit.get(raw as usize).copied().unwrap_or(false)
        })
        .collect();
    Ok(matches)
}
/// Module-level shortcut for `QvdTable.load(path)`.
#[pyfunction]
fn read_qvd(path: &str) -> PyResult<PyQvdTable> {
    let loaded = PyQvdTable::load(path)?;
    Ok(loaded)
}
#[pyfunction]
fn read_qvd_to_arrow<'py>(py: Python<'py>, path: &str) -> PyResult<Bound<'py, PyAny>> {
let table = reader::read_qvd_file(path)
.map_err(|e| PyValueError::new_err(format!("{}", e)))?;
let batch = crate::parquet::qvd_to_record_batch(&table)
.map_err(|e| PyValueError::new_err(format!("{}", e)))?;
batch.to_pyarrow(py)
}
/// Load the QVD file at `path` and return it as a pandas DataFrame (see `QvdTable.to_pandas`).
#[pyfunction]
fn read_qvd_to_pandas<'py>(py: Python<'py>, path: &str) -> PyResult<Bound<'py, PyAny>> {
    PyQvdTable::load(path)?.to_pandas(py)
}
/// Load the QVD file at `path` and return it as a polars DataFrame (see `QvdTable.to_polars`).
#[pyfunction]
fn read_qvd_to_polars<'py>(py: Python<'py>, path: &str) -> PyResult<Bound<'py, PyAny>> {
    PyQvdTable::load(path)?.to_polars(py)
}
/// Convert the Parquet file at `parquet_path` into a QVD file at `qvd_path`.
///
/// Raises `ValueError` with the underlying message on failure.
#[pyfunction]
fn convert_parquet_to_qvd(parquet_path: &str, qvd_path: &str) -> PyResult<()> {
    match crate::parquet::convert_parquet_to_qvd(parquet_path, qvd_path) {
        Ok(()) => Ok(()),
        Err(err) => Err(PyValueError::new_err(err.to_string())),
    }
}
/// Convert the QVD file at `qvd_path` into a Parquet file at `parquet_path`.
///
/// `compression` defaults to `"snappy"`. An unrecognised codec name or a conversion failure
/// raises `ValueError`.
#[pyfunction]
#[pyo3(signature = (qvd_path, parquet_path, compression=None))]
fn convert_qvd_to_parquet(qvd_path: &str, parquet_path: &str, compression: Option<&str>) -> PyResult<()> {
    let codec_name = compression.unwrap_or("snappy");
    let codec = match crate::parquet::ParquetCompression::parse(codec_name) {
        Ok(parsed) => parsed,
        Err(err) => return Err(PyValueError::new_err(err.to_string())),
    };
    crate::parquet::convert_qvd_to_parquet(qvd_path, parquet_path, codec)
        .map_err(|err| PyValueError::new_err(err.to_string()))
}
/// Stream the QVD file at `path` through `read_filtered` and return the result as a `QvdTable`.
///
/// The filter presumably keeps rows whose `filter_col` value is in `index`. `select` optionally
/// names the columns to keep. `chunk_size` (default 65536) is forwarded to the stream reader.
/// Raises `ValueError` if the file cannot be opened or the filtered read fails.
#[pyfunction]
#[pyo3(signature = (path, filter_col, index, select=None, chunk_size=None))]
fn read_qvd_filtered(
    path: &str,
    filter_col: &str,
    index: &PyExistsIndex,
    select: Option<Vec<String>>,
    chunk_size: Option<usize>,
) -> PyResult<PyQvdTable> {
    const DEFAULT_CHUNK_SIZE: usize = 65536;

    let lookup_values: Vec<&str> = index.values.iter().map(String::as_str).collect();
    let rust_index = crate::exists::ExistsIndex::from_values(&lookup_values);

    let mut stream = match crate::streaming::open_qvd_stream(path) {
        Ok(opened) => opened,
        Err(err) => return Err(PyValueError::new_err(err.to_string())),
    };

    let selected: Option<Vec<&str>> = select
        .as_ref()
        .map(|names| names.iter().map(String::as_str).collect());
    let chunk = chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);

    let inner = stream
        .read_filtered(filter_col, &rust_index, selected.as_deref(), chunk)
        .map_err(|err| PyValueError::new_err(err.to_string()))?;
    Ok(PyQvdTable { inner })
}
/// Read the QVD file at `path` and register it on `conn` under `table_name`.
///
/// The file is converted to a single-batch `pyarrow.Table`, which is passed to
/// `conn.register(table_name, table)`. `conn` is expected to be a DuckDB connection, though any
/// object with such a `register` method works.
///
/// Raises `ImportError` if pyarrow is missing and `ValueError` if the file cannot be read or
/// converted. Exceptions from `conn.register` propagate unchanged.
#[pyfunction]
fn register_duckdb<'py>(conn: &Bound<'py, PyAny>, table_name: &str, path: &str) -> PyResult<()> {
    let py = conn.py();
    // Check for pyarrow before reading the file. Otherwise the whole file is loaded first, and
    // `to_pyarrow` (which imports pyarrow itself) fails before this friendlier message is reached.
    let pa = py
        .import("pyarrow")
        .map_err(|_| PyImportError::new_err("pyarrow is required: pip install pyarrow"))?;
    let table = reader::read_qvd_file(path).map_err(|e| PyValueError::new_err(e.to_string()))?;
    let batch = crate::parquet::qvd_to_record_batch(&table)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    let pyarrow_batch = batch.to_pyarrow(py)?;
    let arrow_table = pa
        .getattr("Table")?
        .call_method1("from_batches", (vec![pyarrow_batch],))?;
    conn.call_method1("register", (table_name, arrow_table))?;
    Ok(())
}
/// Stream the QVD file at `path` through a filtered read and register the result on `conn`.
///
/// The filtered read uses `filter_col`, `index`, the optional `select` column list and
/// `chunk_size` (default 65536), the same as `read_qvd_filtered`. The result is converted to a
/// single-batch `pyarrow.Table` and passed to `conn.register(table_name, table)`.
///
/// Raises `ImportError` if pyarrow is missing and `ValueError` if opening, reading or
/// conversion fails. Exceptions from `conn.register` propagate unchanged.
#[pyfunction]
#[pyo3(signature = (conn, table_name, path, filter_col, index, select=None, chunk_size=None))]
fn register_duckdb_filtered<'py>(
    conn: &Bound<'py, PyAny>,
    table_name: &str,
    path: &str,
    filter_col: &str,
    index: &PyExistsIndex,
    select: Option<Vec<String>>,
    chunk_size: Option<usize>,
) -> PyResult<()> {
    let py = conn.py();
    // Check for pyarrow before the (potentially long) streamed read. `to_pyarrow` would
    // otherwise fail with its own import error only after all that work was done.
    let pa = py
        .import("pyarrow")
        .map_err(|_| PyImportError::new_err("pyarrow is required: pip install pyarrow"))?;
    let lookup_values: Vec<&str> = index.values.iter().map(String::as_str).collect();
    let rust_index = crate::exists::ExistsIndex::from_values(&lookup_values);
    let mut stream = crate::streaming::open_qvd_stream(path)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    let select_refs: Option<Vec<&str>> =
        select.as_ref().map(|v| v.iter().map(String::as_str).collect());
    let chunk = chunk_size.unwrap_or(65536);
    let qvd_table = stream
        .read_filtered(filter_col, &rust_index, select_refs.as_deref(), chunk)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    let batch = crate::parquet::qvd_to_record_batch(&qvd_table)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    let pyarrow_batch = batch.to_pyarrow(py)?;
    let arrow_table = pa
        .getattr("Table")?
        .call_method1("from_batches", (vec![pyarrow_batch],))?;
    conn.call_method1("register", (table_name, arrow_table))?;
    Ok(())
}
/// Register every matching QVD file under one or more folders on `conn`.
///
/// `folder_paths` is a single path string or a list of them. `recursive` (default `False`)
/// also scans subdirectories. `glob` (default `"*.qvd"`) filters file names case-insensitively.
/// Files larger than `max_file_size_mb` (default 500) are skipped. Each file is registered
/// under its file stem.
///
/// Per-file and per-directory problems do not raise. They are emitted as Python warnings via
/// `warnings.warn`, as is any table name that was registered more than once (e.g. `sales.qvd`
/// in two folders).
///
/// Returns the sorted, de-duplicated list of registered table names. Raises `ImportError` if
/// pyarrow is missing and `ValueError` if `folder_paths` has the wrong type.
#[pyfunction]
#[pyo3(signature = (conn, folder_paths, recursive=None, glob=None, max_file_size_mb=None))]
fn register_duckdb_folder<'py>(
    conn: &Bound<'py, PyAny>,
    folder_paths: &Bound<'py, PyAny>,
    recursive: Option<bool>,
    glob: Option<&str>,
    max_file_size_mb: Option<u64>,
) -> PyResult<Vec<String>> {
    let py = conn.py();
    let pa = py
        .import("pyarrow")
        .map_err(|_| PyImportError::new_err("pyarrow is required: pip install pyarrow"))?;
    let pa_table_cls = pa.getattr("Table")?;
    let recursive = recursive.unwrap_or(false);
    let glob_pattern = glob.unwrap_or("*.qvd");
    // Saturate so a huge limit means "effectively unlimited". Plain multiplication would wrap
    // to a small value in release builds and panic in debug builds.
    let max_size = max_file_size_mb.unwrap_or(500).saturating_mul(1_048_576);
    let paths: Vec<String> = if let Ok(s) = folder_paths.extract::<String>() {
        vec![s]
    } else if let Ok(list) = folder_paths.extract::<Vec<String>>() {
        list
    } else {
        return Err(PyValueError::new_err("folder_paths must be a string or list of strings"));
    };

    let mut registered = Vec::new();
    let mut errors = Vec::new();
    for folder in &paths {
        collect_qvd_files(
            std::path::Path::new(folder),
            recursive,
            glob_pattern,
            max_size,
            py,
            conn,
            &pa_table_cls,
            &mut registered,
            &mut errors,
        )?;
    }

    registered.sort();
    // Files sharing a stem (or a folder listed twice) register under the same table name.
    // Report each such name once, and return it once.
    let mut duplicates: Vec<String> = registered
        .windows(2)
        .filter(|pair| pair[0] == pair[1])
        .map(|pair| pair[0].clone())
        .collect();
    duplicates.dedup();
    errors.extend(
        duplicates
            .into_iter()
            .map(|name| format!("Table name '{}' was registered more than once", name)),
    );
    registered.dedup();

    if !errors.is_empty() {
        let py_warnings = py.import("warnings")?;
        for err in &errors {
            py_warnings.call_method1("warn", (err.as_str(),))?;
        }
    }
    Ok(registered)
}
fn collect_qvd_files<'py>(
dir: &std::path::Path,
recursive: bool,
glob_pattern: &str,
max_size: u64,
py: Python<'py>,
conn: &Bound<'py, PyAny>,
pa_table_cls: &Bound<'py, PyAny>,
registered: &mut Vec<String>,
errors: &mut Vec<String>,
) -> PyResult<()> {
let entries = match std::fs::read_dir(dir) {
Ok(e) => e,
Err(e) => {
errors.push(format!("Cannot read '{}': {}", dir.display(), e));
return Ok(());
}
};
for entry in entries.filter_map(|e| e.ok()) {
let path = entry.path();
if path.is_dir() && recursive {
collect_qvd_files(&path, recursive, glob_pattern, max_size, py, conn, pa_table_cls, registered, errors)?;
continue;
}
if !path.is_file() { continue; }
let name_lower = path.file_name()
.map(|n| n.to_string_lossy().to_lowercase())
.unwrap_or_default();
if !name_lower.ends_with(".qvd") { continue; }
if glob_pattern != "*.qvd" {
let pattern_lower = glob_pattern.to_lowercase();
let matches = if let Some(star_pos) = pattern_lower.find('*') {
let prefix = &pattern_lower[..star_pos];
let suffix = &pattern_lower[star_pos + 1..];
if prefix.is_empty() && suffix.is_empty() {
true
} else if prefix.is_empty() && suffix.starts_with('.') {
name_lower.ends_with(suffix)
} else if prefix.is_empty() {
let inner = suffix.strip_suffix(".qvd").or(suffix.strip_suffix('*')).unwrap_or(suffix);
name_lower.contains(inner)
} else if suffix.is_empty() || suffix == ".qvd" {
name_lower.starts_with(prefix)
} else {
name_lower.starts_with(prefix) && name_lower.ends_with(suffix)
}
} else {
name_lower == pattern_lower
};
if !matches { continue; }
}
let size = entry.metadata().map(|m| m.len()).unwrap_or(0);
if size > max_size {
errors.push(format!("Skipped '{}': {} MB exceeds limit", path.display(), size / 1_048_576));
continue;
}
let table_name = path.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string();
let path_str = match path.to_str() {
Some(s) => s,
None => { errors.push(format!("Invalid path: {}", path.display())); continue; }
};
let qvd_table = match reader::read_qvd_file(path_str) {
Ok(t) => t,
Err(e) => { errors.push(format!("{}: {}", table_name, e)); continue; }
};
let batch = match crate::parquet::qvd_to_record_batch(&qvd_table) {
Ok(b) => b,
Err(e) => { errors.push(format!("{}: {}", table_name, e)); continue; }
};
let pyarrow_batch = match batch.to_pyarrow(py) {
Ok(b) => b,
Err(e) => { errors.push(format!("{}: {}", table_name, e)); continue; }
};
let arrow_table = match pa_table_cls.call_method1("from_batches", (vec![pyarrow_batch],)) {
Ok(t) => t,
Err(e) => { errors.push(format!("{}: {}", table_name, e)); continue; }
};
match conn.call_method1("register", (&table_name, arrow_table)) {
Ok(_) => registered.push(table_name),
Err(e) => errors.push(format!("{}: {}", table_name, e)),
}
}
Ok(())
}
#[pymodule]
fn qvd(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyQvdTable>()?;
m.add_class::<PyExistsIndex>()?;
m.add_function(wrap_pyfunction!(read_qvd, m)?)?;
m.add_function(wrap_pyfunction!(filter_exists, m)?)?;
m.add_function(wrap_pyfunction!(convert_parquet_to_qvd, m)?)?;
m.add_function(wrap_pyfunction!(convert_qvd_to_parquet, m)?)?;
m.add_function(wrap_pyfunction!(read_qvd_to_arrow, m)?)?;
m.add_function(wrap_pyfunction!(read_qvd_to_pandas, m)?)?;
m.add_function(wrap_pyfunction!(read_qvd_to_polars, m)?)?;
m.add_function(wrap_pyfunction!(read_qvd_filtered, m)?)?;
m.add_function(wrap_pyfunction!(register_duckdb, m)?)?;
m.add_function(wrap_pyfunction!(register_duckdb_filtered, m)?)?;
m.add_function(wrap_pyfunction!(register_duckdb_folder, m)?)?;
Ok(())
}