use std::{
collections::HashSet,
num::NonZeroUsize,
str::{Utf8Error, from_utf8},
};
use crate::{
Error, ResultSetMetadata, RowSetBuffer,
columnar_bulk_inserter::BoundInputSlice,
cursor::TruncationInfo,
fixed_sized::Pod,
handles::{CDataMut, Statement, StatementRef},
parameter::WithDataType,
result_set_metadata::utf8_display_sizes,
};
use super::{Indicator, TextColumn};
impl<C: ColumnBuffer> ColumnarBuffer<C> {
    /// Builds a columnar buffer from `(column index, column buffer)` pairs.
    ///
    /// The row capacity of the result is the smallest capacity reported by any of the columns,
    /// or `0` if `columns` is empty.
    ///
    /// # Panics
    ///
    /// Panics if the same column index appears more than once in `columns`.
    pub fn new(columns: Vec<(u16, C)>) -> Self {
        // The buffer as a whole can never hold more rows than its shortest column.
        let mut smallest: Option<usize> = None;
        for (_, col) in &columns {
            let cap = col.capacity();
            smallest = Some(smallest.map_or(cap, |so_far| so_far.min(cap)));
        }
        let capacity = smallest.unwrap_or(0);

        // `HashSet::insert` returns `false` for a value it already holds, i.e. a duplicate index.
        let mut seen = HashSet::new();
        for (col_index, _) in &columns {
            if !seen.insert(*col_index) {
                panic!("Column indices must be unique.")
            }
        }

        // SAFETY: The indices have just been checked for uniqueness and `capacity` does not
        // exceed the capacity reported by any column.
        unsafe { Self::new_unchecked(capacity, columns) }
    }

    /// Builds a columnar buffer without performing any of the checks done by [`Self::new`].
    ///
    /// The buffer starts out with zero valid rows.
    ///
    /// # Safety
    ///
    /// The caller takes over the checks `new` performs before calling this function: column
    /// indices must be unique and `capacity` must not exceed the capacity of any column.
    pub unsafe fn new_unchecked(capacity: usize, columns: Vec<(u16, C)>) -> Self {
        Self {
            columns,
            row_capacity: capacity,
            num_rows: Box::new(0),
        }
    }

    /// Number of rows currently marked as valid in this buffer.
    pub fn num_rows(&self) -> usize {
        *self.num_rows
    }

    /// Number of column buffers held by this instance.
    pub fn num_cols(&self) -> usize {
        self.columns.len()
    }

    /// View of the valid rows of the column buffer stored at position `buffer_index`.
    ///
    /// `buffer_index` is the zero based position within this buffer, not the column index the
    /// buffer has been paired with.
    ///
    /// # Panics
    ///
    /// Panics if `buffer_index` is not smaller than [`Self::num_cols`].
    pub fn column(&self, buffer_index: usize) -> C::View<'_> {
        let (_, buffer) = &self.columns[buffer_index];
        buffer.view(self.num_rows())
    }
}
// Lets a `ColumnarBuffer` act as the target buffer of a cursor: each column buffer is bound
// under the column index it is paired with.
unsafe impl<C> RowSetBuffer for ColumnarBuffer<C>
where
    C: ColumnBuffer,
{
    /// Always `0`.
    ///
    /// NOTE(review): presumably the ODBC value selecting column-wise binding
    /// (`SQL_BIND_BY_COLUMN`) — confirm against the `RowSetBuffer` documentation.
    fn bind_type(&self) -> usize {
        0
    }

    /// Maximum number of rows a single batch can hold; the row capacity of this buffer.
    fn row_array_size(&self) -> usize {
        self.row_capacity
    }

    /// Mutable access to the heap allocated counter of valid rows.
    fn mut_num_fetch_rows(&mut self) -> &mut usize {
        self.num_rows.as_mut()
    }

    /// Binds every column buffer to `cursor`, using the column index stored next to it.
    ///
    /// Stops at the first column which fails to bind and returns that error.
    ///
    /// # Safety
    ///
    /// The requirements of `bind_col` are passed on to the caller unchanged.
    /// NOTE(review): presumably the bound buffers must stay valid and in place for as long as
    /// `cursor` may write to them — confirm against the `RowSetBuffer` documentation.
    unsafe fn bind_colmuns_to_cursor(&mut self, mut cursor: StatementRef<'_>) -> Result<(), Error> {
        for (col_number, column) in self.columns.iter_mut() {
            // SAFETY: Upholding the contract of `bind_col` is the responsibility of whoever
            // calls this (itself unsafe) function.
            unsafe {
                let outcome = cursor.bind_col(*col_number, column);
                outcome.into_result(&cursor)?;
            }
        }
        Ok(())
    }

    /// Reports the first column, in buffer order, holding a truncated value within the valid
    /// rows. `None` if no column reports truncation.
    fn find_truncation(&self) -> Option<TruncationInfo> {
        for (buffer_index, (_col_index, col_buffer)) in self.columns.iter().enumerate() {
            if let Some(indicator) = col_buffer.has_truncated_values(*self.num_rows) {
                return Some(TruncationInfo {
                    indicator: indicator.length(),
                    buffer_index,
                });
            }
        }
        None
    }
}
/// A set of column buffers which together hold a batch of rows in columnar layout.
///
/// Each column buffer is stored together with the `u16` column index it is bound under.
pub struct ColumnarBuffer<C> {
/// Number of valid rows currently held. `mut_num_fetch_rows` hands out a mutable reference to
/// this value.
/// NOTE(review): presumably boxed so its address stays stable when the `ColumnarBuffer` itself
/// is moved — confirm.
num_rows: Box<usize>,
/// Maximum number of rows a batch can hold; reported by `row_array_size`.
row_capacity: usize,
/// Pairs of column index and the buffer bound to that column. The position within this `Vec`
/// is the "buffer index" used by the accessor methods.
columns: Vec<(u16, C)>,
}
/// A buffer holding the values of a single column, usable as a column of a `ColumnarBuffer`.
///
/// # Safety
///
/// NOTE(review): the contract implementers must uphold is not spelled out in this file.
/// Presumably `view` must only expose rows that hold valid values and `capacity` must not
/// overstate the number of rows the buffer can hold — confirm before implementing.
pub unsafe trait ColumnBuffer: CDataMut {
/// Type used to look at the valid rows of the column; this is what `view` returns.
type View<'a>
where
Self: 'a;
/// Returns a view on the column. `valid_rows` is the number of rows holding valid values;
/// `ColumnarBuffer::column` passes its current number of rows here.
fn view(&self, valid_rows: usize) -> Self::View<'_>;
/// Number of rows this buffer can hold. `ColumnarBuffer::new` uses the minimum over all
/// columns as the row capacity of the whole buffer.
fn capacity(&self) -> usize;
/// Returns `Some` indicator if one of the first `num_rows` values has been truncated, `None`
/// otherwise. `find_truncation` reports the `length()` of that indicator.
fn has_truncated_values(&self, num_rows: usize) -> Option<Indicator>;
}
unsafe impl<T> ColumnBuffer for WithDataType<T>
where
T: ColumnBuffer,
{
type View<'a>
= T::View<'a>
where
T: 'a;
fn view(&self, valid_rows: usize) -> T::View<'_> {
self.value.view(valid_rows)
}
fn capacity(&self) -> usize {
self.value.capacity()
}
fn has_truncated_values(&self, num_rows: usize) -> Option<Indicator> {
self.value.has_truncated_values(num_rows)
}
}
// A `WithDataType` wrapper is usable as a bound input slice whenever the wrapped buffer is; the
// mutable view is produced by the wrapped `value`.
unsafe impl<'a, T> BoundInputSlice<'a> for WithDataType<T>
where
    T: BoundInputSlice<'a>,
{
    /// Same mutable slice type as the wrapped buffer.
    type SliceMut = T::SliceMut;

    /// Hands `parameter_index` and `stmt` to the wrapped buffer and returns its mutable view.
    ///
    /// # Safety
    ///
    /// Identical to the requirements of `as_view_mut` on the wrapped type `T`.
    unsafe fn as_view_mut(
        &'a mut self,
        parameter_index: u16,
        stmt: StatementRef<'a>,
    ) -> Self::SliceMut {
        let inner = &mut self.value;
        // SAFETY: The arguments are passed through untouched, so the guarantees given by our
        // caller are exactly the ones required by the wrapped implementation.
        unsafe { inner.as_view_mut(parameter_index, stmt) }
    }
}
/// A columnar buffer in which every column is a `TextColumn<u8>`, i.e. all values are held as
/// byte strings. `at_as_str` interprets these bytes as UTF-8.
pub type TextRowSet = ColumnarBuffer<TextColumn<u8>>;
impl TextRowSet {
pub fn for_cursor(
batch_size: usize,
cursor: &mut impl ResultSetMetadata,
max_str_limit: Option<usize>,
) -> Result<TextRowSet, Error> {
let buffers = utf8_display_sizes(cursor)?
.enumerate()
.map(|(buffer_index, reported_len)| {
let buffer_index = buffer_index as u16;
let col_index = buffer_index + 1;
let max_str_len = reported_len?;
let buffer = if let Some(upper_bound) = max_str_limit {
let max_str_len = max_str_len
.map(NonZeroUsize::get)
.unwrap_or(upper_bound)
.min(upper_bound);
TextColumn::new(batch_size, max_str_len)
} else {
let max_str_len = max_str_len.map(NonZeroUsize::get).ok_or(
Error::TooLargeColumnBufferSize {
buffer_index,
num_elements: batch_size,
element_size: usize::MAX,
},
)?;
TextColumn::try_new(batch_size, max_str_len).map_err(|source| {
Error::TooLargeColumnBufferSize {
buffer_index,
num_elements: source.num_elements,
element_size: source.element_size,
}
})?
};
Ok::<_, Error>((col_index, buffer))
})
.collect::<Result<_, _>>()?;
Ok(TextRowSet {
row_capacity: batch_size,
num_rows: Box::new(0),
columns: buffers,
})
}
pub fn from_max_str_lens(
row_capacity: usize,
max_str_lengths: impl IntoIterator<Item = usize>,
) -> Result<Self, Error> {
let buffers = max_str_lengths
.into_iter()
.enumerate()
.map(|(index, max_str_len)| {
Ok::<_, Error>((
(index + 1).try_into().unwrap(),
TextColumn::try_new(row_capacity, max_str_len)
.map_err(|source| source.add_context(index.try_into().unwrap()))?,
))
})
.collect::<Result<_, _>>()?;
Ok(TextRowSet {
row_capacity,
num_rows: Box::new(0),
columns: buffers,
})
}
pub fn at(&self, buffer_index: usize, row_index: usize) -> Option<&[u8]> {
assert!(row_index < *self.num_rows);
self.columns[buffer_index].1.value_at(row_index)
}
pub fn at_as_str(&self, col_index: usize, row_index: usize) -> Result<Option<&str>, Utf8Error> {
self.at(col_index, row_index).map(from_utf8).transpose()
}
pub fn indicator_at(&self, buf_index: usize, row_index: usize) -> Indicator {
assert!(row_index < *self.num_rows);
self.columns[buf_index].1.indicator_at(row_index)
}
pub fn max_len(&self, buf_index: usize) -> usize {
self.columns[buf_index].1.max_len()
}
}
// A plain `Vec` of `Pod` elements serves as a column buffer: its length is the row capacity and
// it never reports truncated values.
unsafe impl<T> ColumnBuffer for Vec<T>
where
    T: Pod,
{
    /// The valid rows are exposed as an ordinary slice.
    type View<'a> = &'a [T];

    /// Slice over the first `valid_rows` elements.
    ///
    /// # Panics
    ///
    /// Panics if `valid_rows` exceeds the length of the vector.
    fn view(&self, valid_rows: usize) -> Self::View<'_> {
        &self.as_slice()[..valid_rows]
    }

    /// The capacity in rows is the length of the vector, not its allocated capacity.
    fn capacity(&self) -> usize {
        Vec::len(self)
    }

    /// Always `None`: this implementation never reports truncation.
    fn has_truncated_values(&self, _num_rows: usize) -> Option<Indicator> {
        None
    }
}
/// Buffers whose number of elements can be changed after construction.
pub trait Resize {
    /// Changes the number of elements the buffer holds to `new_capacity`.
    fn resize(&mut self, new_capacity: usize);
}

impl<T> Resize for Vec<T>
where
    T: Default + Clone,
{
    /// Resizes the vector to `new_capacity` elements. Existing elements are kept; when growing,
    /// new slots are filled with `T::default()`, when shrinking, the tail is dropped.
    fn resize(&mut self, new_capacity: usize) {
        // Call the inherent method by path so it is obvious this is not a recursive call into
        // the trait method of the same name.
        let fill_value = T::default();
        Vec::resize(self, new_capacity, fill_value);
    }
}
impl<T> Resize for WithDataType<T>
where
T: Resize,
{
fn resize(&mut self, new_capacity: usize) {
self.value.resize(new_capacity);
}
}
#[cfg(test)]
mod tests {
    use super::Resize;
    use crate::buffers::{BufferDesc, ColumnarAnyBuffer};

    /// Building a buffer in which the same column index occurs twice must panic.
    #[test]
    #[should_panic(expected = "Column indices must be unique.")]
    fn assert_unique_column_indices() {
        let bd = BufferDesc::I32 { nullable: false };
        // Column index `1` is used for both the first and the last entry.
        let descs_and_indices = [(1, bd), (2, bd), (1, bd)];
        ColumnarAnyBuffer::from_descs_and_indices(1, descs_and_indices.iter().cloned());
    }

    /// Growing a `Vec` through the `Resize` trait keeps existing values and fills new slots with
    /// the default value.
    #[test]
    fn vec_is_resize() {
        let mut my_int_column_buffer = vec![1, 2];
        Resize::resize(&mut my_int_column_buffer, 4);
        assert_eq!(my_int_column_buffer[..4], [1, 2, 0, 0]);
    }
}