odbc_api/
columnar_bulk_inserter.rs

1use crate::{
2    CursorImpl, Error,
3    buffers::{ColumnBuffer, TextColumn},
4    execute::execute,
5    handles::{AsStatementRef, HasDataType, Statement, StatementRef},
6};
7
/// Can be used to execute a statement with bulk array parameters.
///
/// Contrary to its name, any statement with parameters can be executed, not only `INSERT`.
/// However, inserting large amounts of data in batches is the primary intended use case.
///
/// Binding new buffers is quite expensive in ODBC, so the parameter buffers are reused for each
/// batch (so the pointers bound to the statement stay valid). Each batch of data is therefore
/// copied into the buffers already bound, rather than binding user defined buffers. Often the data
/// needs to be transformed anyway, so the copy is no actual overhead. Once the buffers are filled
/// with a batch, the data is sent.
pub struct ColumnarBulkInserter<S, C> {
    // We maintain the invariant that the parameters are bound to the statement and that the
    // parameter set size reflects the number of valid rows in the batch.
    statement: S,
    /// Number of valid rows in the current batch. Never larger than `capacity`.
    parameter_set_size: usize,
    /// Maximum number of rows a single batch can hold.
    capacity: usize,
    /// We maintain the invariant that none of these buffers is truncated.
    parameters: Vec<C>,
}
26
27impl<S, C> ColumnarBulkInserter<S, C>
28where
29    S: AsStatementRef,
30{
31    /// Users are not encouraged to call this directly.
32    ///
33    /// # Safety
34    ///
35    /// * Statement is expected to be a perpared statement.
36    /// * Parameters must all be valid for insertion. An example for an invalid parameter would be
37    ///   a text buffer with a cell those indiactor value exceeds the maximum element length. This
38    ///   can happen after when truncation occurs then writing into a buffer.
39    pub unsafe fn new(mut statement: S, parameters: Vec<C>) -> Result<Self, Error>
40    where
41        C: ColumnBuffer + HasDataType,
42    {
43        let mut stmt = statement.as_stmt_ref();
44        stmt.reset_parameters();
45        let mut parameter_number = 1;
46        // Bind buffers to statement.
47        for column in &parameters {
48            if let Err(error) =
49                unsafe { stmt.bind_input_parameter(parameter_number, column) }.into_result(&stmt)
50            {
51                // This early return using `?` is risky. We actually did bind some parameters
52                // already. We cannot guarantee that the bound pointers stay valid in case of an
53                // error since `Self` is never constructed. We would away with this, if we took
54                // ownership of the statement and it is destroyed should the constructor not
55                // succeed. However columnar bulk inserter can also be instantiated with borrowed
56                // statements. This is why we reset the parameters on error.
57                stmt.reset_parameters();
58                return Err(error);
59            }
60            parameter_number += 1;
61        }
62        let capacity = parameters
63            .iter()
64            .map(|col| col.capacity())
65            .min()
66            .unwrap_or(0);
67        Ok(Self {
68            statement,
69            parameter_set_size: 0,
70            capacity,
71            parameters,
72        })
73    }
74
75    /// Execute the prepared statement, with the parameters bound
76    pub fn execute(&mut self) -> Result<Option<CursorImpl<StatementRef<'_>>>, Error> {
77        let mut stmt = self.statement.as_stmt_ref();
78        unsafe {
79            if self.parameter_set_size == 0 {
80                // A batch size of 0 will not execute anything, same as for execute on connection or
81                // prepared.
82                Ok(None)
83            } else {
84                // We reset the parameter set size, in order to adequatly handle batches of
85                // different size then inserting into the database.
86                stmt.set_paramset_size(self.parameter_set_size);
87                execute(stmt, None)
88            }
89        }
90    }
91
92    /// Sets the number of rows in the buffer to zero.
93    pub fn clear(&mut self) {
94        self.parameter_set_size = 0;
95    }
96
97    /// Number of valid rows in the buffer
98    pub fn num_rows(&self) -> usize {
99        self.parameter_set_size
100    }
101
102    /// Set number of valid rows in the buffer. Must not be larger than the batch size. If the
103    /// specified number than the number of valid rows currently held by the buffer additional they
104    /// will just hold the value previously assigned to them. Therfore if extending the number of
105    /// valid rows users should take care to assign values to these rows. However, even if not
106    /// assigend it is always guaranteed that every cell is valid for insertion and will not cause
107    /// out of bounds access down in the ODBC driver. Therefore this method is safe. You can set
108    /// the number of valid rows before or after filling values into the buffer, but you must do so
109    /// before executing the query.
110    pub fn set_num_rows(&mut self, num_rows: usize) {
111        if num_rows > self.capacity {
112            panic!(
113                "Columnar buffer may not be resized to a value higher than the maximum number of \
114                rows initially specified in the constructor."
115            );
116        }
117        self.parameter_set_size = num_rows;
118    }
119
120    /// Use this method to gain write access to the actual column data.
121    ///
122    /// # Parameters
123    ///
124    /// * `buffer_index`: Please note that the buffer index is not identical to the ODBC column
125    ///   index. For one it is zero based. It also indexes the buffer bound, and not the columns of
126    ///   the output result set. This is important, because not every column needs to be bound. Some
127    ///   columns may simply be ignored. That being said, if every column of the output is bound in
128    ///   the buffer, in the same order in which they are enumerated in the result set, the
129    ///   relationship between column index and buffer index is `buffer_index = column_index - 1`.
130    ///
131    /// # Example
132    ///
133    /// This method is intended to be called if using [`ColumnarBulkInserter`] for column wise bulk
134    /// inserts.
135    ///
136    /// ```no_run
137    /// use odbc_api::{Connection, Error, buffers::BufferDesc};
138    ///
139    /// fn insert_birth_years(conn: &Connection, names: &[&str], years: &[i16])
140    ///     -> Result<(), Error>
141    /// {
142    ///
143    ///     // All columns must have equal length.
144    ///     assert_eq!(names.len(), years.len());
145    ///     // Prepare the insert statement
146    ///     let prepared = conn.prepare("INSERT INTO Birthdays (name, year) VALUES (?, ?)")?;
147    ///     // Create a columnar buffer which fits the input parameters.
148    ///     let buffer_description = [
149    ///         BufferDesc::Text { max_str_len: 255 },
150    ///         BufferDesc::I16 { nullable: false },
151    ///     ];
152    ///     // Here we do everything in one batch. So the capacity is the number of input
153    ///     // parameters.
154    ///     let capacity = names.len();
155    ///     let mut prebound = prepared.into_column_inserter(capacity, buffer_description)?;
156    ///     // Set number of input rows in the current batch.
157    ///     prebound.set_num_rows(names.len());
158    ///     // Fill the buffer with values column by column
159    ///
160    ///     // Fill names
161    ///     let mut col = prebound
162    ///         .column_mut(0)
163    ///         .as_text_view()
164    ///         .expect("We know the name column to hold text.");
165    ///     for (index, name) in names.iter().map(|s| Some(s.as_bytes())).enumerate() {
166    ///         col.set_cell(index, name);
167    ///     }
168    ///
169    ///     // Fill birth years
170    ///     let mut col = prebound
171    ///         .column_mut(1)
172    ///         .as_slice::<i16>()
173    ///         .expect("We know the year column to hold i16.");
174    ///     col.copy_from_slice(years);
175    ///
176    ///     // Execute the prepared statment with the bound array parameters. Sending the values to
177    ///     // the database.
178    ///     prebound.execute()?;
179    ///     Ok(())
180    /// }
181    /// ```
182    pub fn column_mut<'a>(&'a mut self, buffer_index: usize) -> C::SliceMut
183    where
184        C: BoundInputSlice<'a>,
185    {
186        unsafe {
187            self.parameters[buffer_index]
188                .as_view_mut((buffer_index + 1) as u16, self.statement.as_stmt_ref())
189        }
190    }
191
192    /// Maximum number of rows the buffer can hold at once.
193    pub fn capacity(&self) -> usize {
194        self.capacity
195    }
196}
197
198/// You can obtain a mutable slice of a column buffer which allows you to change its contents.
199///
200/// # Safety
201///
202/// * If any operations have been performed which would invalidate the pointers bound to the
203///   statement, the slice must use the statement handle to rebind the column, at the end of its
204///   lifetime (at the latest).
205/// * All values must be complete. I.e. none of the values must be truncated.
206pub unsafe trait BoundInputSlice<'a> {
207    /// Intended to allow for modifying buffer contents, while leaving the bound parameter buffers
208    /// valid.
209    type SliceMut;
210
211    /// Obtain a mutable view on a parameter buffer in order to change the parameter value(s)
212    /// submitted when executing the statement.
213    ///
214    /// # Safety
215    ///
216    /// * The statement must be the statment the column buffer is bound to. The index must be the
217    ///   parameter index it is bound at.
218    /// * All values must be complete. I.e. none of the values must be truncated.
219    unsafe fn as_view_mut(
220        &'a mut self,
221        parameter_index: u16,
222        stmt: StatementRef<'a>,
223    ) -> Self::SliceMut;
224}
225
226impl<S> ColumnarBulkInserter<S, TextColumn<u8>> {
227    /// Takes one element from the iterator for each internal column buffer and appends it to the
228    /// end of the buffer. Should a cell of the row be too large for the associated column buffer,
229    /// the column buffer will be reallocated with `1.2` times its size, and rebound to the
230    /// statement.
231    ///
232    /// This method panics if it is tried to insert elements beyond batch size. It will also panic
233    /// if row does not contain at least one item for each internal column buffer.
234    pub fn append<'b>(
235        &mut self,
236        mut row: impl Iterator<Item = Option<&'b [u8]>>,
237    ) -> Result<(), Error>
238    where
239        S: AsStatementRef,
240    {
241        if self.capacity == self.parameter_set_size {
242            panic!("Trying to insert elements into TextRowSet beyond batch size.")
243        }
244
245        let mut col_index = 1;
246        for column in &mut self.parameters {
247            let text = row.next().expect(
248                "Row passed to TextRowSet::append must contain one element for each column.",
249            );
250            if let Some(text) = text {
251                unsafe {
252                    column
253                        .as_view_mut(col_index, self.statement.as_stmt_ref())
254                        .ensure_max_element_length(text.len(), self.parameter_set_size)?;
255                }
256                column.set_value(self.parameter_set_size, Some(text));
257            } else {
258                column.set_value(self.parameter_set_size, None);
259            }
260            col_index += 1;
261        }
262
263        self.parameter_set_size += 1;
264
265        Ok(())
266    }
267}