odbc_api/buffers/
columnar.rs

1use std::{
2    collections::HashSet,
3    num::NonZeroUsize,
4    str::{Utf8Error, from_utf8},
5};
6
7use crate::{
8    Error, ResultSetMetadata, RowSetBuffer,
9    columnar_bulk_inserter::BoundInputSlice,
10    cursor::TruncationInfo,
11    fixed_sized::Pod,
12    handles::{CDataMut, Statement, StatementRef},
13    parameter::WithDataType,
14    result_set_metadata::utf8_display_sizes,
15};
16
17use super::{Indicator, TextColumn};
18
19impl<C: ColumnBuffer> ColumnarBuffer<C> {
20    /// Create a new instance from columns with unique indicies. Capacity of the buffer will be the
21    /// minimum capacity of the columns. The constructed buffer is always empty (i.e. the number of
22    /// valid rows is considered to be zero).
23    ///
24    /// You do not want to call this constructor directly unless you want to provide your own buffer
25    /// implentation. Most users of this crate may want to use the constructors like
26    /// [`crate::buffers::ColumnarAnyBuffer::from_descs`] or
27    /// [`crate::buffers::TextRowSet::from_max_str_lens`] instead.
28    pub fn new(columns: Vec<(u16, C)>) -> Self {
29        // Assert capacity
30        let capacity = columns
31            .iter()
32            .map(|(_, col)| col.capacity())
33            .min()
34            .unwrap_or(0);
35
36        // Assert uniqueness of indices
37        let mut indices = HashSet::new();
38        if columns
39            .iter()
40            .any(move |&(col_index, _)| !indices.insert(col_index))
41        {
42            panic!("Column indices must be unique.")
43        }
44
45        unsafe { Self::new_unchecked(capacity, columns) }
46    }
47
48    /// # Safety
49    ///
50    /// * Indices must be unique
51    /// * Columns all must have enough `capacity`.
52    pub unsafe fn new_unchecked(capacity: usize, columns: Vec<(u16, C)>) -> Self {
53        ColumnarBuffer {
54            num_rows: Box::new(0),
55            row_capacity: capacity,
56            columns,
57        }
58    }
59
60    /// Number of valid rows in the buffer.
61    pub fn num_rows(&self) -> usize {
62        *self.num_rows
63    }
64
65    /// Return the number of columns in the row set.
66    pub fn num_cols(&self) -> usize {
67        self.columns.len()
68    }
69
70    /// Use this method to gain read access to the actual column data.
71    ///
72    /// # Parameters
73    ///
74    /// * `buffer_index`: Please note that the buffer index is not identical to the ODBC column
75    ///   index. For one it is zero based. It also indexes the buffer bound, and not the columns of
76    ///   the output result set. This is important, because not every column needs to be bound. Some
77    ///   columns may simply be ignored. That being said, if every column of the output is bound in
78    ///   the buffer, in the same order in which they are enumerated in the result set, the
79    ///   relationship between column index and buffer index is `buffer_index = column_index - 1`.
80    pub fn column(&self, buffer_index: usize) -> C::View<'_> {
81        self.columns[buffer_index].1.view(*self.num_rows)
82    }
83}
84
85unsafe impl<C> RowSetBuffer for ColumnarBuffer<C>
86where
87    C: ColumnBuffer,
88{
89    fn bind_type(&self) -> usize {
90        0 // Specify columnar binding
91    }
92
93    fn row_array_size(&self) -> usize {
94        self.row_capacity
95    }
96
97    fn mut_num_fetch_rows(&mut self) -> &mut usize {
98        self.num_rows.as_mut()
99    }
100
101    unsafe fn bind_colmuns_to_cursor(&mut self, mut cursor: StatementRef<'_>) -> Result<(), Error> {
102        unsafe {
103            for (col_number, column) in &mut self.columns {
104                cursor.bind_col(*col_number, column).into_result(&cursor)?;
105            }
106        }
107        Ok(())
108    }
109
110    fn find_truncation(&self) -> Option<TruncationInfo> {
111        self.columns
112            .iter()
113            .enumerate()
114            .find_map(|(buffer_index, (_col_index, col_buffer))| {
115                col_buffer
116                    .has_truncated_values(*self.num_rows)
117                    .map(|indicator| TruncationInfo {
118                        indicator: indicator.length(),
119                        buffer_index,
120                    })
121            })
122    }
123}
124
/// Column-wise buffer for receiving result sets. Bind it to a cursor using
/// [crate::Cursor::bind_buffer] in order to fetch the results of a query in batches.
///
/// Every bound column of the result set gets its own column buffer. Such a layout tends to be
/// convenient for data engineering or data science tasks. The buffer is suitable both for
/// applications which know the schema of the queried data at compile time, and for generic
/// applications which have to cope with a wide range of different data.
///
/// # Example: Fetching results column wise with `ColumnarBuffer`.
///
/// Consider querying a table with two columns `year` and `name`.
///
/// ```no_run
/// use odbc_api::{
///     Environment, Cursor, ConnectionOptions,
///     buffers::{AnySlice, BufferDesc, Item, ColumnarAnyBuffer},
/// };
///
/// let env = Environment::new()?;
///
/// let batch_size = 1000; // Maximum number of rows in each row set
/// let buffer_description = [
///     // We know year to be a Nullable SMALLINT
///     BufferDesc::I16 { nullable: true },
///     // and name to be a required VARCHAR
///     BufferDesc::Text { max_str_len: 255 },
/// ];
///
/// // Creates a columnar buffer fitting the buffer description with the capacity of `batch_size`.
/// let mut buffer = ColumnarAnyBuffer::from_descs(batch_size, buffer_description);
///
/// let mut conn = env.connect(
///     "YourDatabase", "SA", "My@Test@Password1",
///     ConnectionOptions::default(),
/// )?;
/// let query = "SELECT year, name FROM Birthdays;";
/// let params = ();
/// let timeout_sec = None;
/// if let Some(cursor) = conn.execute(query, params, timeout_sec)? {
///     // Bind buffer to cursor. We bind the buffer as a mutable reference here, which makes it
///     // easier to reuse for other queries, but we could have taken ownership.
///     let mut row_set_cursor = cursor.bind_buffer(&mut buffer)?;
///     // Loop over row sets
///     while let Some(row_set) = row_set_cursor.fetch()? {
///         // Process years in row set
///         let year_col = row_set.column(0);
///         for year in i16::as_nullable_slice(year_col)
///             .expect("Year column buffer expected to be nullable Int")
///         {
///             // Iterate over `Option<i16>` with it ..
///         }
///         // Process names in row set
///         let name_col = row_set.column(1);
///         for name in name_col
///             .as_text_view()
///             .expect("Name column buffer expected to be text")
///             .iter()
///         {
///             // Iterate over the optional text values ..
///         }
///     }
/// }
/// # Ok::<(), odbc_api::Error>(())
/// ```
///
/// The second example differs in two ways: The schema is not known in advance, so the SQL data
/// type reported for each column is used to pick the best fitting buffer. Also, everything happens
/// within a function which returns a `Cursor` with the buffer already bound to it. This approach
/// works best if you run few, but very long queries, so the overhead of allocating buffers is
/// negligible and you want to have an easier time with the borrow checker.
///
/// ```no_run
/// use odbc_api::{
///     Connection, BlockCursor, Error, Cursor, Nullability, ResultSetMetadata,
///     buffers::{ AnyBuffer, BufferDesc, ColumnarAnyBuffer, ColumnarBuffer }
/// };
///
/// fn get_birthdays<'a>(conn: &'a mut Connection)
///     -> Result<BlockCursor<impl Cursor + 'a, ColumnarAnyBuffer>, Error>
/// {
///     let query = "SELECT year, name FROM Birthdays;";
///     let params = ();
///     let timeout_sec = None;
///     let mut cursor = conn.execute(query, params, timeout_sec)?.unwrap();
///     let mut column_description = Default::default();
///     let buffer_description : Vec<_> = (0..cursor.num_result_cols()?).map(|index| {
///         cursor.describe_col(index as u16 + 1, &mut column_description)?;
///         let nullable = matches!(
///             column_description.nullability,
///             Nullability::Unknown | Nullability::Nullable
///         );
///         let desc = BufferDesc::from_data_type(
///             column_description.data_type,
///             nullable
///         ).unwrap_or(BufferDesc::Text{ max_str_len: 255 });
///         Ok(desc)
///     }).collect::<Result<_, Error>>()?;
///
///     // Row set size of 5000 rows.
///     let buffer = ColumnarAnyBuffer::from_descs(5000, buffer_description);
///     // Bind buffer and take ownership over it.
///     cursor.bind_buffer(buffer)
/// }
/// ```
pub struct ColumnarBuffer<C> {
    /// Number of valid rows. The C-API is handed a mutable pointer to this value and uses it to
    /// write back the number of rows fetched. The value lives in a heap allocation of its own, so
    /// that pointer stays valid even if the `ColumnarBuffer` instance is moved in memory.
    num_rows: Box<usize>,
    /// Maximum number of rows the buffer can hold. Also known as batch size or row array size.
    row_capacity: usize,
    /// Pairs of column index and the buffer bound to that column.
    columns: Vec<(u16, C)>,
}
239
240/// A buffer for a single column intended to be used together with [`ColumnarBuffer`].
241///
242/// # Safety
243///
244/// Views must not allow access to uninitialized / invalid rows.
245pub unsafe trait ColumnBuffer: CDataMut {
246    /// Immutable view on the column data. Used in safe abstractions. User must not be able to
247    /// access uninitialized or invalid memory of the buffer through this interface.
248    type View<'a>
249    where
250        Self: 'a;
251
252    /// Num rows may not exceed the actual amount of valid num_rows filled by the ODBC API. The
253    /// column buffer does not know how many elements were in the last row group, and therefore can
254    /// not guarantee the accessed element to be valid and in a defined state. It also can not panic
255    /// on accessing an undefined element.
256    fn view(&self, valid_rows: usize) -> Self::View<'_>;
257
258    // /// Fills the column with the default representation of values, between `from` and `to` index.
259    // fn fill_default(&mut self, from: usize, to: usize);
260
261    /// Current capacity of the column
262    fn capacity(&self) -> usize;
263
264    /// `Some` if any value is truncated in the range [0, num_rows).
265    ///
266    /// After fetching data we may want to know if any value has been truncated due to the buffer
267    /// not being able to hold elements of that size. This method checks the indicator buffer
268    /// element wise.
269    fn has_truncated_values(&self, num_rows: usize) -> Option<Indicator>;
270}
271
272unsafe impl<T> ColumnBuffer for WithDataType<T>
273where
274    T: ColumnBuffer,
275{
276    type View<'a>
277        = T::View<'a>
278    where
279        T: 'a;
280
281    fn view(&self, valid_rows: usize) -> T::View<'_> {
282        self.value.view(valid_rows)
283    }
284
285    fn capacity(&self) -> usize {
286        self.value.capacity()
287    }
288
289    fn has_truncated_values(&self, num_rows: usize) -> Option<Indicator> {
290        self.value.has_truncated_values(num_rows)
291    }
292}
293
294unsafe impl<'a, T> BoundInputSlice<'a> for WithDataType<T>
295where
296    T: BoundInputSlice<'a>,
297{
298    type SliceMut = T::SliceMut;
299
300    unsafe fn as_view_mut(
301        &'a mut self,
302        parameter_index: u16,
303        stmt: StatementRef<'a>,
304    ) -> Self::SliceMut {
305        unsafe { self.value.as_view_mut(parameter_index, stmt) }
306    }
307}
308
309/// This row set binds a string buffer to each column, which is large enough to hold the maximum
310/// length string representation for each element in the row set at once.
311///
312/// # Example
313///
314/// ```no_run
315/// //! A program executing a query and printing the result as CSV to standard output. Requires
316/// //! `anyhow` and `csv` crate.
317///
318/// use anyhow::Error;
319/// use odbc_api::{buffers::TextRowSet, Cursor, Environment, ConnectionOptions, ResultSetMetadata};
320/// use std::{
321///     ffi::CStr,
322///     io::{stdout, Write},
323///     path::PathBuf,
324/// };
325///
326/// /// Maximum number of rows fetched with one row set. Fetching batches of rows is usually much
327/// /// faster than fetching individual rows.
328/// const BATCH_SIZE: usize = 5000;
329///
330/// fn main() -> Result<(), Error> {
331///     // Write csv to standard out
332///     let out = stdout();
333///     let mut writer = csv::Writer::from_writer(out);
334///
335///     // We know this is going to be the only ODBC environment in the entire process, so this is
336///     // safe.
337///     let environment = unsafe { Environment::new() }?;
338///
339///     // Connect using a DSN. Alternatively we could have used a connection string
340///     let mut connection = environment.connect(
341///         "DataSourceName",
342///         "Username",
343///         "Password",
344///         ConnectionOptions::default(),
345///     )?;
346///
347///     // Execute a one-off query without any parameters.
348///     let query = "SELECT * FROM TableName";
349///     let params = ();
350///     let timeout_sec = None;
351///     match connection.execute(query, params, timeout_sec)? {
352///         Some(mut cursor) => {
353///             // Write the column names to stdout
354///             let mut headline: Vec<String> = cursor.column_names()?.collect::<Result<_,_>>()?;
355///             writer.write_record(headline)?;
356///
357///             // Use schema in cursor to initialize a text buffer large enough to hold the largest
358///             // possible strings for each column up to an upper limit of 4KiB
359///             let mut buffers = TextRowSet::for_cursor(BATCH_SIZE, &mut cursor, Some(4096))?;
360///             // Bind the buffer to the cursor. It is now being filled with every call to fetch.
361///             let mut row_set_cursor = cursor.bind_buffer(&mut buffers)?;
362///
363///             // Iterate over batches
364///             while let Some(batch) = row_set_cursor.fetch()? {
365///                 // Within a batch, iterate over every row
366///                 for row_index in 0..batch.num_rows() {
367///                     // Within a row iterate over every column
368///                     let record = (0..batch.num_cols()).map(|col_index| {
369///                         batch
370///                             .at(col_index, row_index)
371///                             .unwrap_or(&[])
372///                     });
373///                     // Writes the row as CSV
374///                     writer.write_record(record)?;
375///                 }
376///             }
377///         }
378///         None => {
379///             eprintln!(
380///                 "Query came back empty. No output has been created."
381///             );
382///         }
383///     }
384///
385///     Ok(())
386/// }
387/// ```
388pub type TextRowSet = ColumnarBuffer<TextColumn<u8>>;
389
390impl TextRowSet {
391    /// The resulting text buffer is not in any way tied to the cursor, other than that its buffer
392    /// sizes a tailor fitted to result set the cursor is iterating over.
393    ///
394    /// This method performs fallible buffer allocations, if no upper bound is set, so you may see
395    /// a speedup, by setting an upper bound using `max_str_limit`.
396    ///
397    ///
398    /// # Parameters
399    ///
400    /// * `batch_size`: The maximum number of rows the buffer is able to hold.
401    /// * `cursor`: Used to query the display size for each column of the row set. For character
402    ///   data the length in characters is multiplied by 4 in order to have enough space for 4 byte
403    ///   utf-8 characters. This is a pessimization for some data sources (e.g. SQLite 3) which do
404    ///   interpret the size of a `VARCHAR(5)` column as 5 bytes rather than 5 characters.
405    /// * `max_str_limit`: Some queries make it hard to estimate a sensible upper bound and
406    ///   sometimes drivers are just not that good at it. This argument allows you to specify an
407    ///   upper bound for the length of character data. Any size reported by the driver is capped to
408    ///   this value. In case the upper bound can not inferred by the metadata reported by the
409    ///   driver the element size is set to this upper bound, too.
410    pub fn for_cursor(
411        batch_size: usize,
412        cursor: &mut impl ResultSetMetadata,
413        max_str_limit: Option<usize>,
414    ) -> Result<TextRowSet, Error> {
415        let buffers = utf8_display_sizes(cursor)?
416            .enumerate()
417            .map(|(buffer_index, reported_len)| {
418                let buffer_index = buffer_index as u16;
419                let col_index = buffer_index + 1;
420                let max_str_len = reported_len?;
421                let buffer = if let Some(upper_bound) = max_str_limit {
422                    let max_str_len = max_str_len
423                        .map(NonZeroUsize::get)
424                        .unwrap_or(upper_bound)
425                        .min(upper_bound);
426                    TextColumn::new(batch_size, max_str_len)
427                } else {
428                    let max_str_len = max_str_len.map(NonZeroUsize::get).ok_or(
429                        Error::TooLargeColumnBufferSize {
430                            buffer_index,
431                            num_elements: batch_size,
432                            element_size: usize::MAX,
433                        },
434                    )?;
435                    TextColumn::try_new(batch_size, max_str_len).map_err(|source| {
436                        Error::TooLargeColumnBufferSize {
437                            buffer_index,
438                            num_elements: source.num_elements,
439                            element_size: source.element_size,
440                        }
441                    })?
442                };
443
444                Ok::<_, Error>((col_index, buffer))
445            })
446            .collect::<Result<_, _>>()?;
447        Ok(TextRowSet {
448            row_capacity: batch_size,
449            num_rows: Box::new(0),
450            columns: buffers,
451        })
452    }
453
454    /// Creates a text buffer large enough to hold `batch_size` rows with one column for each item
455    /// `max_str_lengths` of respective size.
456    pub fn from_max_str_lens(
457        row_capacity: usize,
458        max_str_lengths: impl IntoIterator<Item = usize>,
459    ) -> Result<Self, Error> {
460        let buffers = max_str_lengths
461            .into_iter()
462            .enumerate()
463            .map(|(index, max_str_len)| {
464                Ok::<_, Error>((
465                    (index + 1).try_into().unwrap(),
466                    TextColumn::try_new(row_capacity, max_str_len)
467                        .map_err(|source| source.add_context(index.try_into().unwrap()))?,
468                ))
469            })
470            .collect::<Result<_, _>>()?;
471        Ok(TextRowSet {
472            row_capacity,
473            num_rows: Box::new(0),
474            columns: buffers,
475        })
476    }
477
478    /// Access the element at the specified position in the row set.
479    pub fn at(&self, buffer_index: usize, row_index: usize) -> Option<&[u8]> {
480        assert!(row_index < *self.num_rows);
481        self.columns[buffer_index].1.value_at(row_index)
482    }
483
484    /// Access the element at the specified position in the row set.
485    pub fn at_as_str(&self, col_index: usize, row_index: usize) -> Result<Option<&str>, Utf8Error> {
486        self.at(col_index, row_index).map(from_utf8).transpose()
487    }
488
489    /// Indicator value at the specified position. Useful to detect truncation of data.
490    ///
491    /// # Example
492    ///
493    /// ```
494    /// use odbc_api::buffers::{Indicator, TextRowSet};
495    ///
496    /// fn is_truncated(buffer: &TextRowSet, col_index: usize, row_index: usize) -> bool {
497    ///     match buffer.indicator_at(col_index, row_index) {
498    ///         // There is no value, therefore there is no value not fitting in the column buffer.
499    ///         Indicator::Null => false,
500    ///         // The value did not fit into the column buffer, we do not even know, by how much.
501    ///         Indicator::NoTotal => true,
502    ///         Indicator::Length(total_length) => {
503    ///             // If the maximum string length is shorter than the values total length, the
504    ///             // has been truncated to fit into the buffer.
505    ///             buffer.max_len(col_index) < total_length
506    ///         }
507    ///     }
508    /// }
509    /// ```
510    pub fn indicator_at(&self, buf_index: usize, row_index: usize) -> Indicator {
511        assert!(row_index < *self.num_rows);
512        self.columns[buf_index].1.indicator_at(row_index)
513    }
514
515    /// Maximum length in bytes of elements in a column.
516    pub fn max_len(&self, buf_index: usize) -> usize {
517        self.columns[buf_index].1.max_len()
518    }
519}
520
521unsafe impl<T> ColumnBuffer for Vec<T>
522where
523    T: Pod,
524{
525    type View<'a> = &'a [T];
526
527    fn view(&self, valid_rows: usize) -> &[T] {
528        &self[..valid_rows]
529    }
530
531    fn capacity(&self) -> usize {
532        self.len()
533    }
534
535    fn has_truncated_values(&self, _num_rows: usize) -> Option<Indicator> {
536        None
537    }
538}
539
/// Implemented by column buffers which can change their capacity.
///
/// This is mostly of interest if a column buffer is used to insert parameters, rather than to fetch
/// results. Think of an application inserting data from a stream made of row groups of varying
/// size. Whenever it encounters a row group larger than any it has seen before, it may want to
/// grow the parameter buffers, so the entire row group can be sent in one go.
pub trait Resize {
    /// Resize the buffer to the given capacity.
    ///
    /// # Parameters
    ///
    /// * `new_capacity`: The new capacity of the buffer.
    fn resize(&mut self, new_capacity: usize);
}

/// For a `Vec` the capacity in the sense of [`Resize`] is its length. Growing fills the new
/// elements with `T::default()`, shrinking truncates.
impl<T> Resize for Vec<T>
where
    T: Default + Clone,
{
    fn resize(&mut self, new_capacity: usize) {
        let fill_value = T::default();
        // Fully qualified, so the inherent `Vec::resize` is called rather than this trait method.
        Vec::resize(self, new_capacity, fill_value);
    }
}
563
#[cfg(test)]
mod tests {

    use super::Resize;
    use crate::buffers::{BufferDesc, ColumnarAnyBuffer};

    /// Binding the same column index more than once must be rejected with a panic.
    #[test]
    #[should_panic(expected = "Column indices must be unique.")]
    fn assert_unique_column_indices() {
        let desc = BufferDesc::I32 { nullable: false };
        // Column index `1` appears twice.
        let descs_with_indices = [(1, desc), (2, desc), (1, desc)];
        ColumnarAnyBuffer::from_descs_and_indices(1, descs_with_indices.iter().copied());
    }

    /// A `Vec` is perfectly able to resize without this library. What matters is that it
    /// implements the `Resize` trait, so generic code can rely on it.
    #[test]
    fn vec_is_resize() {
        let mut column = vec![1, 2];

        Resize::resize(&mut column, 4);

        // Existing elements are kept, new ones are filled with the default value.
        assert_eq!(column, [1, 2, 0, 0]);
    }
}