odbc_api/buffers/columnar.rs
1use std::{
2 collections::HashSet,
3 num::NonZeroUsize,
4 str::{Utf8Error, from_utf8},
5};
6
7use crate::{
8 Error, ResultSetMetadata, RowSetBuffer,
9 columnar_bulk_inserter::BoundInputSlice,
10 cursor::TruncationInfo,
11 fixed_sized::Pod,
12 handles::{CDataMut, Statement, StatementRef},
13 parameter::WithDataType,
14 result_set_metadata::utf8_display_sizes,
15};
16
17use super::{Indicator, TextColumn};
18
19impl<C: ColumnBuffer> ColumnarBuffer<C> {
20 /// Create a new instance from columns with unique indicies. Capacity of the buffer will be the
21 /// minimum capacity of the columns. The constructed buffer is always empty (i.e. the number of
22 /// valid rows is considered to be zero).
23 ///
24 /// You do not want to call this constructor directly unless you want to provide your own buffer
25 /// implentation. Most users of this crate may want to use the constructors like
26 /// [`crate::buffers::ColumnarAnyBuffer::from_descs`] or
27 /// [`crate::buffers::TextRowSet::from_max_str_lens`] instead.
28 pub fn new(columns: Vec<(u16, C)>) -> Self {
29 // Assert capacity
30 let capacity = columns
31 .iter()
32 .map(|(_, col)| col.capacity())
33 .min()
34 .unwrap_or(0);
35
36 // Assert uniqueness of indices
37 let mut indices = HashSet::new();
38 if columns
39 .iter()
40 .any(move |&(col_index, _)| !indices.insert(col_index))
41 {
42 panic!("Column indices must be unique.")
43 }
44
45 unsafe { Self::new_unchecked(capacity, columns) }
46 }
47
48 /// # Safety
49 ///
50 /// * Indices must be unique
51 /// * Columns all must have enough `capacity`.
52 pub unsafe fn new_unchecked(capacity: usize, columns: Vec<(u16, C)>) -> Self {
53 ColumnarBuffer {
54 num_rows: Box::new(0),
55 row_capacity: capacity,
56 columns,
57 }
58 }
59
60 /// Number of valid rows in the buffer.
61 pub fn num_rows(&self) -> usize {
62 *self.num_rows
63 }
64
65 /// Return the number of columns in the row set.
66 pub fn num_cols(&self) -> usize {
67 self.columns.len()
68 }
69}
70
71impl<C: Slice> ColumnarBuffer<C> {
72 /// Use this method to gain read access to the actual column data.
73 ///
74 /// # Parameters
75 ///
76 /// * `buffer_index`: Please note that the buffer index is not identical to the ODBC column
77 /// index. For one it is zero based. It also indexes the buffer bound, and not the columns of
78 /// the output result set. This is important, because not every column needs to be bound. Some
79 /// columns may simply be ignored. That being said, if every column of the output is bound in
80 /// the buffer, in the same order in which they are enumerated in the result set, the
81 /// relationship between column index and buffer index is `buffer_index = column_index - 1`.
82 pub fn column(&self, buffer_index: usize) -> C::Slice<'_> {
83 self.columns[buffer_index].1.slice(*self.num_rows)
84 }
85}
86
87unsafe impl<C> RowSetBuffer for ColumnarBuffer<C>
88where
89 C: ColumnBuffer,
90{
91 fn bind_type(&self) -> usize {
92 0 // Specify columnar binding
93 }
94
95 fn row_array_size(&self) -> usize {
96 self.row_capacity
97 }
98
99 fn mut_num_fetch_rows(&mut self) -> &mut usize {
100 self.num_rows.as_mut()
101 }
102
103 unsafe fn bind_colmuns_to_cursor(&mut self, mut cursor: StatementRef<'_>) -> Result<(), Error> {
104 unsafe {
105 for (col_number, column) in &mut self.columns {
106 cursor.bind_col(*col_number, column).into_result(&cursor)?;
107 }
108 }
109 Ok(())
110 }
111
112 fn find_truncation(&self) -> Option<TruncationInfo> {
113 self.columns
114 .iter()
115 .enumerate()
116 .find_map(|(buffer_index, (_col_index, col_buffer))| {
117 col_buffer
118 .has_truncated_values(*self.num_rows)
119 .map(|indicator| TruncationInfo {
120 indicator: indicator.length(),
121 buffer_index,
122 })
123 })
124 }
125}
126
/// A columnar buffer intended to be bound with [`crate::Cursor::bind_buffer`] in order to obtain
/// results from a cursor.
///
/// Binds to the result set column-wise. This is usually helpful in data engineering or data science
/// tasks. This buffer type can be used in situations where the schema of the queried data is known
/// at compile time, as well as for generic applications which work with a wide range of different
/// data.
///
/// # Example: Fetching results column wise with `ColumnarBuffer`.
///
/// Consider querying a table with two columns `year` and `name`.
///
/// ```no_run
/// use odbc_api::{
///     Environment, Cursor, ConnectionOptions,
///     buffers::{AnySlice, BufferDesc, Item, ColumnarAnyBuffer},
/// };
///
/// let env = Environment::new()?;
///
/// let batch_size = 1000; // Maximum number of rows in each row set
/// let buffer_description = [
///     // We know year to be a Nullable SMALLINT
///     BufferDesc::I16 { nullable: true },
///     // and name to be a required VARCHAR
///     BufferDesc::Text { max_str_len: 255 },
/// ];
///
/// // Creates a columnar buffer fitting the buffer description with the capacity of `batch_size`.
/// let mut buffer = ColumnarAnyBuffer::from_descs(batch_size, buffer_description);
///
/// let mut conn = env.connect(
///     "YourDatabase", "SA", "My@Test@Password1",
///     ConnectionOptions::default(),
/// )?;
/// let query = "SELECT year, name FROM Birthdays;";
/// let params = ();
/// let timeout_sec = None;
/// if let Some(cursor) = conn.execute(query, params, timeout_sec)? {
///     // Bind buffer to cursor. We bind the buffer as a mutable reference here, which makes it
///     // easier to reuse for other queries, but we could have taken ownership.
///     let mut row_set_cursor = cursor.bind_buffer(&mut buffer)?;
///     // Loop over row sets
///     while let Some(row_set) = row_set_cursor.fetch()? {
///         // Process years in row set
///         let year_col = row_set.column(0);
///         for year in i16::as_nullable_slice(year_col)
///             .expect("Year column buffer expected to be nullable Int")
///         {
///             // Iterate over `Option<i16>` with it ..
///         }
///         // Process names in row set
///         let name_col = row_set.column(1);
///         for name in name_col
///             .as_text_view()
///             .expect("Name column buffer expected to be text")
///             .iter()
///         {
///             // Iterate over `Option<&[u8]>` with it ..
///         }
///     }
/// }
/// # Ok::<(), odbc_api::Error>(())
/// ```
///
/// This second example changes two things: we do not know the schema in advance and use the
/// SQL DataType to determine the best fit for the buffers. Also, we want to do everything in a
/// function and return a `Cursor` with an already bound buffer. This approach is best if you have
/// few and very long queries, so the overhead of allocating buffers is negligible and you want to
/// have an easier time with the borrow checker.
///
/// ```no_run
/// use odbc_api::{
///     Connection, BlockCursor, Error, Cursor, Nullability, ResultSetMetadata,
///     buffers::{BufferDesc, ColumnarAnyBuffer},
/// };
///
/// fn get_birthdays<'a>(conn: &'a mut Connection)
///     -> Result<BlockCursor<impl Cursor + 'a, ColumnarAnyBuffer>, Error>
/// {
///     let query = "SELECT year, name FROM Birthdays;";
///     let params = ();
///     let timeout_sec = None;
///     let mut cursor = conn.execute(query, params, timeout_sec)?.unwrap();
///     let mut column_description = Default::default();
///     let buffer_description : Vec<_> = (0..cursor.num_result_cols()?).map(|index| {
///         cursor.describe_col(index as u16 + 1, &mut column_description)?;
///         let nullable = matches!(
///             column_description.nullability,
///             Nullability::Unknown | Nullability::Nullable
///         );
///         let desc = BufferDesc::from_data_type(
///             column_description.data_type,
///             nullable
///         ).unwrap_or(BufferDesc::Text{ max_str_len: 255 });
///         Ok(desc)
///     }).collect::<Result<_, Error>>()?;
///
///     // Row set size of 5000 rows.
///     let buffer = ColumnarAnyBuffer::from_descs(5000, buffer_description);
///     // Bind buffer and take ownership over it.
///     cursor.bind_buffer(buffer)
/// }
/// ```
pub struct ColumnarBuffer<C> {
    /// A mutable pointer to num_rows_fetched is passed to the C-API. It is used to write back the
    /// number of fetched rows. `num_rows` is heap allocated, so the pointer is not invalidated,
    /// even if the `ColumnarBuffer` instance is moved in memory.
    num_rows: Box<usize>,
    /// aka: batch size, row array size
    row_capacity: usize,
    /// Column index and bound buffer
    columns: Vec<(u16, C)>,
}
241
/// Access a safe view of the column buffer.
///
/// After a fetch operation buffers may only be partially filled with data; the rest of the buffer
/// may contain uninitialized values. Also we must not permit any operation which would invalidate
/// the addresses of the buffer. To make reading buffer contents after a fetch safe,
/// [`ColumnBuffer`]s implement this trait to offer safe views.
///
/// # Safety
///
/// Views must not allow access to uninitialized / invalid rows.
pub unsafe trait Slice {
    /// Immutable view on the column data. Used in safe abstractions. User must not be able to
    /// access uninitialized or invalid memory of the buffer through this interface.
    type Slice<'a>
    where
        Self: 'a;

    /// Creates a view on the first `valid_rows` elements of the column.
    ///
    /// `valid_rows` must not exceed the number of rows actually filled by the ODBC API. The column
    /// buffer does not know how many elements were in the last row set, and therefore can not
    /// guarantee the accessed elements to be valid and in a defined state. It also can not panic
    /// on accessing an undefined element.
    fn slice(&self, valid_rows: usize) -> Self::Slice<'_>;
}
265
266/// A buffer for a single column intended to be used together with [`ColumnarBuffer`].
267///
268/// # Safety
269///
270/// Implementations must ensure that:
271///
272/// * Capacity must be correctly reported otherwise data may be written outside its bounds.
273/// * truncation must be correctly reported. Code which reuses the same column buffer for reading
274/// and inserting may rely on this in order to avoid passing values indicators of truncated values
275/// in bulk insertions. This could lead to out of bounds memory access.
276pub unsafe trait ColumnBuffer: CDataMut {
277 /// Current capacity of the column
278 fn capacity(&self) -> usize;
279
280 /// `Some` if any value is truncated in the range [0, num_rows).
281 ///
282 /// After fetching data we may want to know if any value has been truncated due to the buffer
283 /// not being able to hold elements of that size. This method checks the indicator buffer
284 /// element wise.
285 fn has_truncated_values(&self, num_rows: usize) -> Option<Indicator>;
286}
287
288unsafe impl<T> Slice for WithDataType<T>
289where
290 T: Slice,
291{
292 type Slice<'a>
293 = T::Slice<'a>
294 where
295 T: 'a;
296
297 fn slice(&self, valid_rows: usize) -> T::Slice<'_> {
298 self.value.slice(valid_rows)
299 }
300}
301
302unsafe impl<T> ColumnBuffer for WithDataType<T>
303where
304 T: ColumnBuffer,
305{
306 fn capacity(&self) -> usize {
307 self.value.capacity()
308 }
309
310 fn has_truncated_values(&self, num_rows: usize) -> Option<Indicator> {
311 self.value.has_truncated_values(num_rows)
312 }
313}
314
315unsafe impl<'a, T> BoundInputSlice<'a> for WithDataType<T>
316where
317 T: BoundInputSlice<'a>,
318{
319 type SliceMut = T::SliceMut;
320
321 unsafe fn as_view_mut(
322 &'a mut self,
323 parameter_index: u16,
324 stmt: StatementRef<'a>,
325 ) -> Self::SliceMut {
326 unsafe { self.value.as_view_mut(parameter_index, stmt) }
327 }
328}
329
330/// This row set binds a string buffer to each column, which is large enough to hold the maximum
331/// length string representation for each element in the row set at once.
332///
333/// # Example
334///
335/// ```no_run
336/// //! A program executing a query and printing the result as CSV to standard output. Requires
337/// //! `anyhow` and `csv` crate.
338///
339/// use anyhow::Error;
340/// use odbc_api::{buffers::TextRowSet, Cursor, Environment, ConnectionOptions, ResultSetMetadata};
341/// use std::{
342/// ffi::CStr,
343/// io::{stdout, Write},
344/// path::PathBuf,
345/// };
346///
347/// /// Maximum number of rows fetched with one row set. Fetching batches of rows is usually much
348/// /// faster than fetching individual rows.
349/// const BATCH_SIZE: usize = 5000;
350///
351/// fn main() -> Result<(), Error> {
352/// // Write csv to standard out
353/// let out = stdout();
354/// let mut writer = csv::Writer::from_writer(out);
355///
356/// // We know this is going to be the only ODBC environment in the entire process, so this is
357/// // safe.
358/// let environment = unsafe { Environment::new() }?;
359///
360/// // Connect using a DSN. Alternatively we could have used a connection string
361/// let mut connection = environment.connect(
362/// "DataSourceName",
363/// "Username",
364/// "Password",
365/// ConnectionOptions::default(),
366/// )?;
367///
368/// // Execute a one-off query without any parameters.
369/// let query = "SELECT * FROM TableName";
370/// let params = ();
371/// let timeout_sec = None;
372/// match connection.execute(query, params, timeout_sec)? {
373/// Some(mut cursor) => {
374/// // Write the column names to stdout
375/// let mut headline: Vec<String> = cursor.column_names()?.collect::<Result<_,_>>()?;
376/// writer.write_record(headline)?;
377///
378/// // Use schema in cursor to initialize a text buffer large enough to hold the largest
379/// // possible strings for each column up to an upper limit of 4KiB
380/// let mut buffers = TextRowSet::for_cursor(BATCH_SIZE, &mut cursor, Some(4096))?;
381/// // Bind the buffer to the cursor. It is now being filled with every call to fetch.
382/// let mut row_set_cursor = cursor.bind_buffer(&mut buffers)?;
383///
384/// // Iterate over batches
385/// while let Some(batch) = row_set_cursor.fetch()? {
386/// // Within a batch, iterate over every row
387/// for row_index in 0..batch.num_rows() {
388/// // Within a row iterate over every column
389/// let record = (0..batch.num_cols()).map(|col_index| {
390/// batch
391/// .at(col_index, row_index)
392/// .unwrap_or(&[])
393/// });
394/// // Writes the row as CSV
395/// writer.write_record(record)?;
396/// }
397/// }
398/// }
399/// None => {
400/// eprintln!(
401/// "Query came back empty. No output has been created."
402/// );
403/// }
404/// }
405///
406/// Ok(())
407/// }
408/// ```
409pub type TextRowSet = ColumnarBuffer<TextColumn<u8>>;
410
411impl TextRowSet {
412 /// The resulting text buffer is not in any way tied to the cursor, other than that its buffer
413 /// sizes a tailor fitted to result set the cursor is iterating over.
414 ///
415 /// This method performs fallible buffer allocations, if no upper bound is set, so you may see
416 /// a speedup, by setting an upper bound using `max_str_limit`.
417 ///
418 ///
419 /// # Parameters
420 ///
421 /// * `batch_size`: The maximum number of rows the buffer is able to hold.
422 /// * `cursor`: Used to query the display size for each column of the row set. For character
423 /// data the length in characters is multiplied by 4 in order to have enough space for 4 byte
424 /// utf-8 characters. This is a pessimization for some data sources (e.g. SQLite 3) which do
425 /// interpret the size of a `VARCHAR(5)` column as 5 bytes rather than 5 characters.
426 /// * `max_str_limit`: Some queries make it hard to estimate a sensible upper bound and
427 /// sometimes drivers are just not that good at it. This argument allows you to specify an
428 /// upper bound for the length of character data. Any size reported by the driver is capped to
429 /// this value. In case the upper bound can not inferred by the metadata reported by the
430 /// driver the element size is set to this upper bound, too.
431 pub fn for_cursor(
432 batch_size: usize,
433 cursor: &mut impl ResultSetMetadata,
434 max_str_limit: Option<usize>,
435 ) -> Result<TextRowSet, Error> {
436 let buffers = utf8_display_sizes(cursor)?
437 .enumerate()
438 .map(|(buffer_index, reported_len)| {
439 let buffer_index = buffer_index as u16;
440 let col_index = buffer_index + 1;
441 let max_str_len = reported_len?;
442 let buffer = if let Some(upper_bound) = max_str_limit {
443 let max_str_len = max_str_len
444 .map(NonZeroUsize::get)
445 .unwrap_or(upper_bound)
446 .min(upper_bound);
447 TextColumn::new(batch_size, max_str_len)
448 } else {
449 let max_str_len = max_str_len.map(NonZeroUsize::get).ok_or(
450 Error::TooLargeColumnBufferSize {
451 buffer_index,
452 num_elements: batch_size,
453 element_size: usize::MAX,
454 },
455 )?;
456 TextColumn::try_new(batch_size, max_str_len).map_err(|source| {
457 Error::TooLargeColumnBufferSize {
458 buffer_index,
459 num_elements: source.num_elements,
460 element_size: source.element_size,
461 }
462 })?
463 };
464
465 Ok::<_, Error>((col_index, buffer))
466 })
467 .collect::<Result<_, _>>()?;
468 Ok(TextRowSet {
469 row_capacity: batch_size,
470 num_rows: Box::new(0),
471 columns: buffers,
472 })
473 }
474
475 /// Creates a text buffer large enough to hold `batch_size` rows with one column for each item
476 /// `max_str_lengths` of respective size.
477 pub fn from_max_str_lens(
478 row_capacity: usize,
479 max_str_lengths: impl IntoIterator<Item = usize>,
480 ) -> Result<Self, Error> {
481 let buffers = max_str_lengths
482 .into_iter()
483 .enumerate()
484 .map(|(index, max_str_len)| {
485 Ok::<_, Error>((
486 (index + 1).try_into().unwrap(),
487 TextColumn::try_new(row_capacity, max_str_len)
488 .map_err(|source| source.add_context(index.try_into().unwrap()))?,
489 ))
490 })
491 .collect::<Result<_, _>>()?;
492 Ok(TextRowSet {
493 row_capacity,
494 num_rows: Box::new(0),
495 columns: buffers,
496 })
497 }
498
499 /// Access the element at the specified position in the row set.
500 pub fn at(&self, buffer_index: usize, row_index: usize) -> Option<&[u8]> {
501 assert!(row_index < *self.num_rows);
502 self.columns[buffer_index].1.value_at(row_index)
503 }
504
505 /// Access the element at the specified position in the row set.
506 pub fn at_as_str(&self, col_index: usize, row_index: usize) -> Result<Option<&str>, Utf8Error> {
507 self.at(col_index, row_index).map(from_utf8).transpose()
508 }
509
510 /// Indicator value at the specified position. Useful to detect truncation of data.
511 ///
512 /// # Example
513 ///
514 /// ```
515 /// use odbc_api::buffers::{Indicator, TextRowSet};
516 ///
517 /// fn is_truncated(buffer: &TextRowSet, col_index: usize, row_index: usize) -> bool {
518 /// match buffer.indicator_at(col_index, row_index) {
519 /// // There is no value, therefore there is no value not fitting in the column buffer.
520 /// Indicator::Null => false,
521 /// // The value did not fit into the column buffer, we do not even know, by how much.
522 /// Indicator::NoTotal => true,
523 /// Indicator::Length(total_length) => {
524 /// // If the maximum string length is shorter than the values total length, the
525 /// // has been truncated to fit into the buffer.
526 /// buffer.max_len(col_index) < total_length
527 /// }
528 /// }
529 /// }
530 /// ```
531 pub fn indicator_at(&self, buf_index: usize, row_index: usize) -> Indicator {
532 assert!(row_index < *self.num_rows);
533 self.columns[buf_index].1.indicator_at(row_index)
534 }
535
536 /// Maximum length in bytes of elements in a column.
537 pub fn max_len(&self, buf_index: usize) -> usize {
538 self.columns[buf_index].1.max_len()
539 }
540}
541
542unsafe impl<T> ColumnBuffer for Vec<T>
543where
544 T: Pod,
545{
546 fn capacity(&self) -> usize {
547 self.len()
548 }
549
550 fn has_truncated_values(&self, _num_rows: usize) -> Option<Indicator> {
551 None
552 }
553}
554
555unsafe impl<T> Slice for Vec<T>
556where
557 T: Pod,
558{
559 type Slice<'a> = &'a [T];
560
561 fn slice(&self, valid_rows: usize) -> &[T] {
562 &self[..valid_rows]
563 }
564}
565
/// A column buffer whose capacity can be changed after construction.
///
/// This is mostly useful for buffers which bind parameters rather than fetch results. Picture an
/// application inserting data from a stream with row groups of varying size: once it meets a row
/// group larger than any seen before, it can resize its parameter buffers and still send the
/// entire row group in one go.
pub trait Resize {
    /// Changes the capacity of the buffer.
    ///
    /// # Parameters
    ///
    /// * `new_capacity`: Capacity of the buffer after the call.
    fn resize(&mut self, new_capacity: usize);
}
580
581impl<T> Resize for Vec<T>
582where
583 T: Default + Clone,
584{
585 fn resize(&mut self, new_capacity: usize) {
586 Vec::resize(self, new_capacity, T::default());
587 }
588}
589
590impl<T> Resize for WithDataType<T>
591where
592 T: Resize,
593{
594 fn resize(&mut self, new_capacity: usize) {
595 self.value.resize(new_capacity);
596 }
597}
598
#[cfg(test)]
mod tests {

    use super::Resize;
    use crate::buffers::BufferDesc;

    /// Binding the same column index twice must be rejected by the constructor.
    #[test]
    #[should_panic(expected = "Column indices must be unique.")]
    fn assert_unique_column_indices() {
        use crate::buffers::ColumnarAnyBuffer;

        let desc = BufferDesc::I32 { nullable: false };
        let indexed_descs = [(1, desc), (2, desc), (1, desc)];
        ColumnarAnyBuffer::from_descs_and_indices(1, indexed_descs.iter().copied());
    }

    /// Vec's can resize just fine without this library, yet it is important that they implement
    /// the `Resize` trait, so that other generic types know about it.
    #[test]
    fn vec_is_resize() {
        let mut buffer = vec![1, 2];

        Resize::resize(&mut buffer, 4);

        assert_eq!(buffer, [1, 2, 0, 0]);
    }
}
627}