use std::io::{BufRead, Read, Seek, SeekFrom};
use crate::error::{QvdError, QvdResult};
use crate::header::{parse_xml_header, QvdTableHeader};
use crate::index::read_field_index;
use crate::symbol::read_symbols;
use crate::value::{QvdSymbol, QvdValue};
/// Chunked reader over a QVD source.
///
/// `open` parses the XML header and loads every field's symbol table into
/// memory; the row data is then fetched on demand, a chunk of rows at a time.
pub struct QvdStreamReader<R: Read + Seek + BufRead> {
// Underlying byte source; every row read seeks to an absolute offset in it.
reader: R,
/// Table header parsed from the XML section at the start of the source.
pub header: QvdTableHeader,
/// Symbol tables, one inner `Vec` per entry of `header.fields`, in field order.
pub symbols: Vec<Vec<QvdSymbol>>,
// Byte offset of the first byte after the NUL-terminated XML header.
binary_start: u64,
// Index of the next row that a chunk read will return.
current_row: usize,
}
/// A batch of consecutive rows produced by `QvdStreamReader::next_chunk`,
/// stored column by column.
pub struct QvdChunk {
/// One inner `Vec` per header field, each holding `num_rows` values.
pub columns: Vec<Vec<QvdValue>>,
/// Number of rows contained in this chunk.
pub num_rows: usize,
/// Index, within the whole table, of the first row of this chunk.
pub start_row: usize,
}
impl<R: Read + Seek + BufRead> QvdStreamReader<R> {
pub fn open(mut reader: R) -> QvdResult<Self> {
let mut xml_bytes = Vec::new();
reader.read_until(0, &mut xml_bytes)?;
let binary_start = xml_bytes.len() as u64;
if xml_bytes.last() == Some(&0) {
xml_bytes.pop();
}
let xml_string = String::from_utf8(xml_bytes)
.map_err(|e| QvdError::Format(format!("XML header is not valid UTF-8: {}", e)))?;
let header = parse_xml_header(&xml_string)?;
let symbol_section_size = header.offset;
let mut symbol_buf = vec![0u8; symbol_section_size];
reader.seek(SeekFrom::Start(binary_start))?;
reader.read_exact(&mut symbol_buf)?;
let mut symbols = Vec::with_capacity(header.fields.len());
for field in &header.fields {
let field_symbols = read_symbols(&symbol_buf, field)?;
symbols.push(field_symbols);
}
Ok(QvdStreamReader {
reader,
header,
symbols,
binary_start,
current_row: 0,
})
}
/// Returns the number of records declared in the table header.
pub fn total_rows(&self) -> usize {
    let header = &self.header;
    header.no_of_records
}
/// Returns how many rows have not been read yet.
///
/// The result is clamped at zero if the current position is at or past the
/// declared record count.
pub fn remaining_rows(&self) -> usize {
    let already_read = self.current_row;
    self.total_rows().saturating_sub(already_read)
}
/// Reads the next batch of at most `chunk_size` rows and resolves each cell
/// to a value.
///
/// Returns `Ok(None)` once every row has been consumed. A negative field
/// index, or one beyond the end of the column's symbol table, yields
/// `QvdValue::Null`; every other index yields a clone of the referenced symbol.
///
/// Note that a `chunk_size` of 0 produces an empty chunk and does not advance
/// the reader.
///
/// # Errors
///
/// Returns an error if seeking or reading the row data fails; the current
/// row position is left unchanged in that case.
pub fn next_chunk(&mut self, chunk_size: usize) -> QvdResult<Option<QvdChunk>> {
    if self.current_row >= self.header.no_of_records {
        return Ok(None);
    }

    let start_row = self.current_row;
    let num_rows = chunk_size.min(self.remaining_rows());
    let row_width = self.header.record_byte_size;

    // Rows live after the symbol section, which spans `header.offset` bytes
    // from the start of the binary part.
    let table_start = self.binary_start + self.header.offset as u64;
    let chunk_offset = table_start + (start_row as u64 * row_width as u64);
    self.reader.seek(SeekFrom::Start(chunk_offset))?;

    let mut raw = vec![0u8; num_rows * row_width];
    self.reader.read_exact(&mut raw)?;

    let fields = &self.header.fields;
    let mut columns: Vec<Vec<QvdValue>> = std::iter::repeat_with(|| Vec::with_capacity(num_rows))
        .take(fields.len())
        .collect();

    // Plain index arithmetic on purpose: `row_width` may be 0, in which case
    // every record is an empty slice.
    for row in 0..num_rows {
        let begin = row * row_width;
        let record = &raw[begin..begin + row_width];

        for (col_idx, (field, column)) in fields.iter().zip(columns.iter_mut()).enumerate() {
            let symbol_idx = read_field_index(record, field);
            let resolved = if symbol_idx >= 0 {
                self.symbols[col_idx].get(symbol_idx as usize).cloned()
            } else {
                None
            };
            column.push(resolved.map_or(QvdValue::Null, QvdValue::Symbol));
        }
    }

    self.current_row = start_row + num_rows;
    Ok(Some(QvdChunk {
        columns,
        num_rows,
        start_row,
    }))
}
/// Reads the next batch of at most `chunk_size` rows and returns the raw
/// per-field indices, without resolving them against the symbol tables.
///
/// On success the tuple is `(columns, num_rows, start_row)`: one index vector
/// per header field, the number of rows read, and the table-wide index of the
/// first row in the batch. Returns `Ok(None)` once every row has been consumed.
///
/// Note that a `chunk_size` of 0 produces an empty batch and does not advance
/// the reader.
///
/// # Errors
///
/// Returns an error if seeking or reading the row data fails; the current
/// row position is left unchanged in that case.
#[allow(clippy::type_complexity)]
pub fn next_chunk_indices(&mut self, chunk_size: usize) -> QvdResult<Option<(Vec<Vec<i64>>, usize, usize)>> {
    if self.remaining_rows() == 0 {
        return Ok(None);
    }

    let first_row = self.current_row;
    let row_count = self.remaining_rows().min(chunk_size);
    let stride = self.header.record_byte_size;

    let chunk_offset = self.binary_start
        + self.header.offset as u64
        + (first_row as u64 * stride as u64);
    self.reader.seek(SeekFrom::Start(chunk_offset))?;

    let mut raw = vec![0u8; row_count * stride];
    self.reader.read_exact(&mut raw)?;

    let fields = &self.header.fields;
    let mut columns: Vec<Vec<i64>> = std::iter::repeat_with(|| Vec::with_capacity(row_count))
        .take(fields.len())
        .collect();

    // Plain index arithmetic on purpose: `stride` may be 0, in which case
    // every record is an empty slice.
    for row in 0..row_count {
        let record = &raw[row * stride..(row + 1) * stride];
        for (column, field) in columns.iter_mut().zip(fields) {
            column.push(read_field_index(record, field));
        }
    }

    self.current_row += row_count;
    Ok(Some((columns, row_count, first_row)))
}
pub fn read_filtered(
&mut self,
filter_col: &str,
exists_index: &crate::exists::ExistsIndex,
select_cols: Option<&[&str]>,
chunk_size: usize,
) -> QvdResult<crate::reader::QvdTable> {
let filter_col_idx = self.header.fields.iter()
.position(|f| f.field_name == filter_col)
.ok_or_else(|| QvdError::Format(format!("Column '{}' not found", filter_col)))?;
let output_col_indices: Vec<usize> = match select_cols {
Some(names) => {
let mut indices = Vec::with_capacity(names.len());
for name in names {
let idx = self.header.fields.iter()
.position(|f| f.field_name == *name)
.ok_or_else(|| QvdError::Format(format!("Column '{}' not found", name)))?;
indices.push(idx);
}
indices
}
None => (0..self.header.fields.len()).collect(),
};
let symbol_matches: Vec<bool> = self.symbols[filter_col_idx]
.iter()
.map(|sym| exists_index.exists(&sym.to_string_repr()))
.collect();
let num_output_cols = output_col_indices.len();
let filter_field = &self.header.fields[filter_col_idx];
let record_byte_size = self.header.record_byte_size;
let output_fields: Vec<crate::header::QvdFieldHeader> = output_col_indices.iter()
.map(|&ci| self.header.fields[ci].clone())
.collect();
let filter_in_output = output_col_indices.iter().position(|&ci| ci == filter_col_idx);
let mut result_indices: Vec<Vec<i64>> = (0..num_output_cols)
.map(|_| Vec::new())
.collect();
let mut total_matched: usize = 0;
while self.current_row < self.header.no_of_records {
let rows_to_read = chunk_size.min(self.remaining_rows());
let index_file_offset = self.binary_start
+ self.header.offset as u64
+ (self.current_row as u64 * record_byte_size as u64);
self.reader.seek(SeekFrom::Start(index_file_offset))?;
let buf_size = rows_to_read * record_byte_size;
let mut buf = vec![0u8; buf_size];
self.reader.read_exact(&mut buf)?;
for row in 0..rows_to_read {
let row_start = row * record_byte_size;
let record = &buf[row_start..row_start + record_byte_size];
let filter_idx = read_field_index(record, filter_field);
let matches = if filter_idx >= 0 {
let si = filter_idx as usize;
si < symbol_matches.len() && symbol_matches[si]
} else {
false
};
if matches {
for (out_idx, field) in output_fields.iter().enumerate() {
if Some(out_idx) == filter_in_output {
result_indices[out_idx].push(filter_idx);
} else {
let idx = read_field_index(record, field);
result_indices[out_idx].push(idx);
}
}
total_matched += 1;
}
}
self.current_row += rows_to_read;
}
let mut header = self.header.clone();
header.fields = output_fields;
header.no_of_records = total_matched;
header.offset = 0;
header.length = 0;
let mut new_symbols: Vec<Vec<QvdSymbol>> = Vec::with_capacity(num_output_cols);
for (out_idx, field) in header.fields.iter_mut().enumerate() {
let orig_col_idx = output_col_indices[out_idx];
let old_syms = &self.symbols[orig_col_idx];
let col_indices = &mut result_indices[out_idx];
let mut used = vec![false; old_syms.len()];
for &idx in col_indices.iter() {
if idx >= 0 && (idx as usize) < old_syms.len() {
used[idx as usize] = true;
}
}
let mut old_to_new: Vec<i64> = vec![-1; old_syms.len()];
let mut compacted: Vec<QvdSymbol> = Vec::new();
for (old_idx, &is_used) in used.iter().enumerate() {
if is_used {
old_to_new[old_idx] = compacted.len() as i64;
compacted.push(old_syms[old_idx].clone());
}
}
let num_new_symbols = compacted.len();
for idx in col_indices.iter_mut() {
if *idx < 0 || (*idx as usize) >= old_syms.len() {
*idx = num_new_symbols as i64; } else {
*idx = old_to_new[*idx as usize];
}
}
field.no_of_symbols = num_new_symbols;
field.bias = 0;
field.bit_width = if num_new_symbols <= 1 { 0 } else { crate::index::bits_needed(num_new_symbols + 1) };
new_symbols.push(compacted);
}
let mut sortable: Vec<(usize, usize)> = header.fields.iter().enumerate()
.filter(|(_, f)| f.bit_width > 0)
.map(|(i, f)| (i, f.bit_width))
.collect();
sortable.sort_by(|a, b| b.1.cmp(&a.1));
let mut current_bit_offset = 0usize;
for (idx, _) in &sortable {
header.fields[*idx].bit_offset = current_bit_offset;
current_bit_offset += header.fields[*idx].bit_width;
}
for f in &mut header.fields {
if f.bit_width == 0 { f.bit_offset = 0; }
}
let total_bits = current_bit_offset;
header.record_byte_size = if total_bits == 0 { 0 } else { total_bits.div_ceil(8) };
Ok(crate::reader::QvdTable {
header,
symbols: new_symbols,
row_indices: result_indices,
raw_xml: Vec::new(),
raw_binary: Vec::new(),
})
}
pub fn reset(&mut self) -> QvdResult<()> {
self.current_row = 0;
Ok(())
}
/// Returns the name of every field in the header, in header order.
pub fn column_names(&self) -> Vec<&str> {
    let fields = &self.header.fields;
    let mut names = Vec::with_capacity(fields.len());
    for field in fields {
        names.push(field.field_name.as_str());
    }
    names
}
}
/// Opens the file at `path` behind a `BufReader` and hands it to
/// `QvdStreamReader::open`.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or if
/// `QvdStreamReader::open` fails.
pub fn open_qvd_stream(path: &str) -> QvdResult<QvdStreamReader<std::io::BufReader<std::fs::File>>> {
    let buffered = std::io::BufReader::new(std::fs::File::open(path)?);
    QvdStreamReader::open(buffered)
}