use std::{
cmp::min,
collections::HashSet,
str::{from_utf8, Utf8Error},
};
use crate::{
columnar_bulk_inserter::BoundInputSlice,
fixed_sized::Pod,
handles::{CDataMut, Statement, StatementRef},
parameter::WithDataType,
result_set_metadata::utf8_display_sizes,
Error, ResultSetMetadata, RowSetBuffer,
};
use super::{Indicator, TextColumn};
impl<C: ColumnBuffer> ColumnarBuffer<C> {
    /// Creates a columnar buffer from pairs of column number and column buffer.
    ///
    /// The row capacity of the resulting buffer is the smallest capacity reported by any of the
    /// column buffers, or `0` if `columns` is empty.
    ///
    /// # Panics
    ///
    /// Panics with `"Column indices must be unique."` if the same column number appears more
    /// than once in `columns`.
    pub fn new(columns: Vec<(u16, C)>) -> Self {
        // The buffer as a whole can only hold as many rows as its smallest column.
        let capacity = columns
            .iter()
            .map(|(_, buffer)| buffer.capacity())
            .min()
            .unwrap_or_default();

        // `insert` returns `false` for a value already in the set, so `all` stops at the first
        // duplicated column number.
        let mut seen = HashSet::with_capacity(columns.len());
        let all_unique = columns.iter().all(|(col_index, _)| seen.insert(*col_index));
        assert!(all_unique, "Column indices must be unique.");

        // SAFETY: The column numbers have just been verified to be unique and `capacity` is the
        // minimum over the capacities of all columns, so no column is smaller than `capacity`.
        unsafe { Self::new_unchecked(capacity, columns) }
    }

    /// Creates a columnar buffer without validating the arguments.
    ///
    /// In contrast to [`Self::new`], the column numbers are not checked for uniqueness and
    /// `capacity` is stored as given instead of being derived from the columns. The row counter
    /// starts at zero.
    ///
    /// # Safety
    ///
    /// The caller is expected to guarantee what [`Self::new`] establishes before delegating
    /// here:
    ///
    /// * every column number in `columns` occurs only once, and
    /// * `capacity` does not exceed the capacity of any of the column buffers.
    pub unsafe fn new_unchecked(capacity: usize, columns: Vec<(u16, C)>) -> Self {
        Self {
            num_rows: Box::new(0),
            row_capacity: capacity,
            columns,
        }
    }

    /// Current value of the row counter. It is `0` directly after construction.
    pub fn num_rows(&self) -> usize {
        *self.num_rows
    }

    /// Number of column buffers held by this instance.
    pub fn num_cols(&self) -> usize {
        self.columns.len()
    }

    /// View on the column buffer at position `buffer_index`, restricted to the current number of
    /// rows.
    ///
    /// `buffer_index` is the zero based position of the buffer in the list passed at
    /// construction. It is not the column number the buffer is paired with.
    ///
    /// # Panics
    ///
    /// Panics if `buffer_index` is not smaller than [`Self::num_cols`].
    pub fn column(&self, buffer_index: usize) -> C::View<'_> {
        let (_, buffer) = &self.columns[buffer_index];
        buffer.view(self.num_rows())
    }
}
unsafe impl<C: ColumnBuffer> RowSetBuffer for ColumnarBuffer<C> {
    /// Always `0`.
    ///
    /// NOTE(review): presumably this selects column-wise binding (`SQL_BIND_BY_COLUMN` is `0` in
    /// ODBC) -- confirm against the `RowSetBuffer` contract.
    fn bind_type(&self) -> usize {
        0
    }

    /// The row capacity which has been fixed when the buffer was constructed.
    fn row_array_size(&self) -> usize {
        self.row_capacity
    }

    /// Mutable access to the heap allocated row counter which is also read by
    /// `ColumnarBuffer::num_rows`.
    fn mut_num_fetch_rows(&mut self) -> &mut usize {
        &mut *self.num_rows
    }

    /// Binds every column buffer to `cursor`, using the column number it is paired with.
    ///
    /// Binding stops at the first column for which `bind_col` does not translate into `Ok`; that
    /// error is returned and the remaining columns are not bound.
    unsafe fn bind_colmuns_to_cursor(&mut self, mut cursor: StatementRef<'_>) -> Result<(), Error> {
        for (number, buffer) in self.columns.iter_mut() {
            let outcome = cursor.bind_col(*number, buffer);
            outcome.into_result(&cursor)?;
        }
        Ok(())
    }
}
/// A row set buffer which stores its data column by column.
///
/// Every element of `columns` pairs a column number with the buffer of type `C` holding the
/// values of that column.
pub struct ColumnarBuffer<C> {
/// Row counter. Initialized with `0` by the constructors, handed out mutably through
/// `mut_num_fetch_rows` and used to limit the views returned by `column`.
///
/// NOTE(review): presumably boxed so that the address of the counter stays stable if the
/// `ColumnarBuffer` itself is moved -- confirm.
num_rows: Box<usize>,
/// Maximum number of rows the buffer is set up for. Reported via `row_array_size`.
row_capacity: usize,
/// Pairs of column number and the buffer for that column. `ColumnarBuffer::new` enforces that
/// the column numbers are unique.
columns: Vec<(u16, C)>,
}
/// A buffer for the values of a single column, usable as an element of `ColumnarBuffer`.
///
/// # Safety
///
/// NOTE(review): the obligations of implementers are not spelled out in this file. Presumably
/// `capacity` must never report more rows than the memory exposed through `CDataMut` can hold --
/// confirm before adding new implementations.
pub unsafe trait ColumnBuffer: CDataMut {
/// Type of the read-only view handed out by `view`. It may borrow from the buffer.
type View<'a>
where
Self: 'a;
/// Returns a view on the buffer. `valid_rows` is the number of rows the view should cover;
/// `ColumnarBuffer::column` passes its current row count here.
fn view(&self, valid_rows: usize) -> Self::View<'_>;
/// Resets the rows in the range `from..to` to a default value. In the `Vec<T>` implementation
/// `to` is exclusive and each element is overwritten with `Default::default()`.
fn fill_default(&mut self, from: usize, to: usize);
/// Number of rows the buffer can hold. `ColumnarBuffer::new` uses the minimum over all columns
/// as its row capacity.
fn capacity(&self) -> usize;
}
/// `WithDataType<T>` behaves exactly like the column buffer it wraps: every method forwards to
/// the `value` field.
unsafe impl<T: ColumnBuffer> ColumnBuffer for WithDataType<T> {
    type View<'a> = T::View<'a> where T: 'a;

    /// Forwards to the `view` of the wrapped buffer.
    fn view(&self, valid_rows: usize) -> Self::View<'_> {
        <T as ColumnBuffer>::view(&self.value, valid_rows)
    }

    /// Forwards to the `fill_default` of the wrapped buffer.
    fn fill_default(&mut self, from: usize, to: usize) {
        <T as ColumnBuffer>::fill_default(&mut self.value, from, to)
    }

    /// Forwards to the `capacity` of the wrapped buffer.
    fn capacity(&self) -> usize {
        <T as ColumnBuffer>::capacity(&self.value)
    }
}
/// `WithDataType<T>` can be used as a bound input slice whenever the wrapped `T` can; the
/// mutable view is the one of the wrapped value.
unsafe impl<'a, T: BoundInputSlice<'a>> BoundInputSlice<'a> for WithDataType<T> {
    type SliceMut = T::SliceMut;

    /// Forwards `parameter_index` and `stmt` unchanged to `as_view_mut` of the wrapped value.
    ///
    /// # Safety
    ///
    /// The same requirements apply as for `T::as_view_mut`, since this call only delegates.
    unsafe fn as_view_mut(
        &'a mut self,
        parameter_index: u16,
        stmt: StatementRef<'a>,
    ) -> Self::SliceMut {
        <T as BoundInputSlice<'a>>::as_view_mut(&mut self.value, parameter_index, stmt)
    }
}
/// A `ColumnarBuffer` in which every column is a `TextColumn<u8>`, i.e. all values are held as
/// byte strings (see `at`) which can be interpreted as UTF-8 via `at_as_str`.
pub type TextRowSet = ColumnarBuffer<TextColumn<u8>>;
impl TextRowSet {
pub fn for_cursor(
batch_size: usize,
cursor: &mut impl ResultSetMetadata,
max_str_limit: Option<usize>,
) -> Result<TextRowSet, Error> {
let buffers = utf8_display_sizes(cursor)?
.enumerate()
.map(|(buffer_index, reported_len)| {
let buffer_index = buffer_index as u16;
let col_index = buffer_index + 1;
let max_str_len = reported_len?;
let buffer = if let Some(upper_bound) = max_str_limit {
let max_str_len = if max_str_len == 0 {
upper_bound
} else {
min(max_str_len, upper_bound)
};
TextColumn::new(batch_size, max_str_len)
} else {
TextColumn::try_new(batch_size, max_str_len).map_err(|source| {
Error::TooLargeColumnBufferSize {
buffer_index,
num_elements: source.num_elements,
element_size: source.element_size,
}
})?
};
Ok((col_index, buffer))
})
.collect::<Result<_, _>>()?;
Ok(TextRowSet {
row_capacity: batch_size,
num_rows: Box::new(0),
columns: buffers,
})
}
pub fn from_max_str_lens(
row_capacity: usize,
max_str_lengths: impl IntoIterator<Item = usize>,
) -> Result<Self, Error> {
let buffers = max_str_lengths
.into_iter()
.enumerate()
.map(|(index, max_str_len)| {
Ok((
(index + 1).try_into().unwrap(),
TextColumn::try_new(row_capacity, max_str_len)
.map_err(|source| source.add_context(index.try_into().unwrap()))?,
))
})
.collect::<Result<_, _>>()?;
Ok(TextRowSet {
row_capacity,
num_rows: Box::new(0),
columns: buffers,
})
}
pub fn at(&self, buffer_index: usize, row_index: usize) -> Option<&[u8]> {
assert!(row_index < *self.num_rows as usize);
self.columns[buffer_index].1.value_at(row_index)
}
pub fn at_as_str(&self, col_index: usize, row_index: usize) -> Result<Option<&str>, Utf8Error> {
self.at(col_index, row_index).map(from_utf8).transpose()
}
pub fn indicator_at(&self, buf_index: usize, row_index: usize) -> Indicator {
assert!(row_index < *self.num_rows as usize);
self.columns[buf_index].1.indicator_at(row_index)
}
pub fn max_len(&self, buf_index: usize) -> usize {
self.columns[buf_index].1.max_len()
}
}
/// A plain vector of `Pod` values used as a column buffer. Its rows are the elements of the
/// vector.
unsafe impl<T: Pod> ColumnBuffer for Vec<T> {
    type View<'a> = &'a [T];

    /// The first `valid_rows` elements of the vector.
    ///
    /// # Panics
    ///
    /// Panics if `valid_rows` exceeds the length of the vector.
    fn view(&self, valid_rows: usize) -> &[T] {
        &self.as_slice()[..valid_rows]
    }

    /// Overwrites the elements in `from..to` (`to` exclusive) with the default value of `T`.
    ///
    /// # Panics
    ///
    /// Panics if `from..to` is not a valid range within the vector.
    fn fill_default(&mut self, from: usize, to: usize) {
        self[from..to]
            .iter_mut()
            .for_each(|slot| *slot = T::default());
    }

    /// The number of elements in the vector. This is its length, not the allocated capacity
    /// reported by the inherent `Vec::capacity`.
    fn capacity(&self) -> usize {
        self.as_slice().len()
    }
}
#[cfg(test)]
mod tests {
    use crate::buffers::{BufferDesc, ColumnarAnyBuffer};

    /// Building a buffer in which column number `1` is used twice must panic with the
    /// uniqueness message.
    #[test]
    #[should_panic(expected = "Column indices must be unique.")]
    fn assert_unique_column_indices() {
        let desc = BufferDesc::I32 { nullable: false };
        let duplicated = vec![(1, desc), (2, desc), (1, desc)];
        ColumnarAnyBuffer::from_descs_and_indices(1, duplicated.into_iter());
    }
}