odbc_api/columnar_bulk_inserter.rs
use std::cmp::min;

use crate::{
    CursorImpl, Error,
    buffers::{ColumnBuffer, Resize, TextColumn},
    execute::execute,
    handles::{AsStatementRef, HasDataType, Statement, StatementRef},
};

/// Can be used to execute a statement with bulk array parameters. Contrary to its name any
/// statement with parameters can be executed, not only `INSERT`; however, inserting large amounts
/// of data in batches is the primary intended use case.
///
/// Binding new buffers is quite expensive in ODBC, so the parameter buffers are reused for each
/// batch (this way the pointers bound to the statement stay valid). Rather than binding
/// user-defined buffers, we copy each batch of data into the buffers that are already bound.
/// Often the data needs to be transformed anyway, so the copy is no real overhead. Once the
/// buffers are filled with a batch, we send the data.
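///
/// # Example
///
/// A minimal sketch of the batching workflow described above, assuming a hypothetical table
/// `Users` with a single text column `name` and an arbitrary source of batches. The buffers are
/// bound once and reused: fill them, set the number of valid rows, execute, then clear and fill
/// the next batch.
///
/// ```no_run
/// use odbc_api::{Connection, Error, buffers::BufferDesc};
///
/// fn insert_names(
///     conn: &Connection,
///     batches: impl Iterator<Item = Vec<String>>,
/// ) -> Result<(), Error> {
///     // Sketch only: assumes a table `Users` with a single VARCHAR column `name`.
///     let prepared = conn.prepare("INSERT INTO Users (name) VALUES (?)")?;
///     // Buffer layout must match the placeholders of the statement.
///     let buffer_description = [BufferDesc::Text { max_str_len: 255 }];
///     // Maximum number of rows sent to the database per roundtrip.
///     let capacity = 1000;
///     let mut prebound = prepared.into_column_inserter(capacity, buffer_description)?;
///     for batch in batches {
///         assert!(batch.len() <= capacity);
///         prebound.set_num_rows(batch.len());
///         let mut col = prebound
///             .column_mut(0)
///             .as_text_view()
///             .expect("We know the name column to hold text.");
///         for (index, name) in batch.iter().enumerate() {
///             col.set_cell(index, Some(name.as_bytes()));
///         }
///         // Send the batch to the database.
///         prebound.execute()?;
///         // Reset the number of valid rows before filling the next batch.
///         prebound.clear();
///     }
///     Ok(())
/// }
/// ```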
pub struct ColumnarBulkInserter<S, C> {
    // We maintain the invariant that the parameters are bound to the statement and that the
    // parameter set size reflects the number of valid rows in the batch.
    statement: S,
    /// The number of values within the buffers that should be inserted in the next roundtrip. The
    /// capacity of the buffers must be equal to or larger than this value. The individual buffers
    /// just hold values. They have no concept of how many values they hold within their capacity.
    /// We maintain only this one value here for all the column buffers.
    parameter_set_size: usize,
    capacity: usize,
    /// We maintain the invariant that none of these buffers is truncated. I.e. indicator values in
    /// these buffers do not specify values that are larger than the maximum element length of the
    /// buffer. Some drivers have safeguards against this, but others would just take the length of
    /// the indicator value to read the value. This would mean reading "random" memory and
    /// interpreting it as the value. That is the kind of thing we try to avoid in a safe
    /// abstraction.
    parameters: Vec<C>,
}

impl<S, C> ColumnarBulkInserter<S, C>
where
    S: AsStatementRef,
{
    /// Users are not encouraged to call this directly.
    ///
    /// # Safety
    ///
    /// * Statement is expected to be a prepared statement.
    /// * Parameters must all be valid for insertion. An example of an invalid parameter would be
    ///   a text buffer with a cell whose indicator value exceeds the maximum element length. This
    ///   can happen when truncation occurs while writing into a buffer.
    pub unsafe fn new(
        mut statement: S,
        parameters: Vec<C>,
        mapping: impl InputParameterMapping,
    ) -> Result<Self, Error>
    where
        C: ColumnBuffer + HasDataType + Send,
    {
        let stmt = statement.as_stmt_ref();
        bind_parameter_buffers_to_statement(&parameters, mapping, stmt)?;
        let capacity = parameters
            .iter()
            .map(|col| col.capacity())
            .min()
            .unwrap_or(0);
        Ok(Self {
            statement,
            parameter_set_size: 0,
            capacity,
            parameters,
        })
    }

    /// Execute the prepared statement, with the parameters bound.
    pub fn execute(&mut self) -> Result<Option<CursorImpl<StatementRef<'_>>>, Error> {
        let mut stmt = self.statement.as_stmt_ref();
        unsafe {
            if self.parameter_set_size == 0 {
                // A batch size of 0 will not execute anything, same as for execute on connection
                // or prepared.
                Ok(None)
            } else {
                // We set the parameter set size before each execution in order to adequately
                // handle batches of different sizes when inserting into the database.
                stmt.set_paramset_size(self.parameter_set_size);
                execute(stmt, None)
            }
        }
    }

    /// Sets the number of rows in the buffer to zero.
    pub fn clear(&mut self) {
        self.parameter_set_size = 0;
    }

    /// Number of valid rows in the buffer.
    pub fn num_rows(&self) -> usize {
        self.parameter_set_size
    }

    /// Set the number of valid rows in the buffer. Must not be larger than the batch size. If the
    /// specified number is larger than the number of valid rows currently held by the buffer, the
    /// additional rows will just hold the values previously assigned to them. Therefore, if
    /// extending the number of valid rows, users should take care to assign values to these rows.
    /// However, even if not assigned, it is always guaranteed that every cell is valid for
    /// insertion and will not cause out of bounds access down in the ODBC driver. Therefore this
    /// method is safe. You can set the number of valid rows before or after filling values into
    /// the buffer, but you must do so before executing the query.
    pub fn set_num_rows(&mut self, num_rows: usize) {
        if num_rows > self.capacity {
            panic!(
                "Columnar buffer may not be resized to a value higher than the maximum number of \
                rows initially specified in the constructor."
            );
        }
        self.parameter_set_size = num_rows;
    }

    /// Use this method to gain write access to the actual column data.
    ///
    /// # Parameters
    ///
    /// * `buffer_index`: Please note that the buffer index is not identical to the ODBC column
    ///   index. For one it is zero based. It also indexes the buffers bound, and not the columns
    ///   of the output result set. This is important, because not every column needs to be bound.
    ///   Some columns may simply be ignored. That being said, if every column of the output is
    ///   bound in the buffer, in the same order in which they are enumerated in the result set,
    ///   the relationship between column index and buffer index is
    ///   `buffer_index = column_index - 1`.
    ///
    /// # Example
    ///
    /// This method is intended to be called if using [`ColumnarBulkInserter`] for column wise bulk
    /// inserts.
    ///
    /// ```no_run
    /// use odbc_api::{Connection, Error, buffers::BufferDesc};
    ///
    /// fn insert_birth_years(conn: &Connection, names: &[&str], years: &[i16])
    ///     -> Result<(), Error>
    /// {
    ///     // All columns must have equal length.
    ///     assert_eq!(names.len(), years.len());
    ///     // Prepare the insert statement
    ///     let prepared = conn.prepare("INSERT INTO Birthdays (name, year) VALUES (?, ?)")?;
    ///     // Create a columnar buffer which fits the input parameters.
    ///     let buffer_description = [
    ///         BufferDesc::Text { max_str_len: 255 },
    ///         BufferDesc::I16 { nullable: false },
    ///     ];
    ///     // Here we do everything in one batch. So the capacity is the number of rows in the
    ///     // input.
    ///     let capacity = names.len();
    ///     let mut prebound = prepared.into_column_inserter(capacity, buffer_description)?;
    ///     // Set number of input rows in the current batch.
    ///     prebound.set_num_rows(names.len());
    ///     // Fill the buffer with values column by column
    ///
    ///     // Fill names
    ///     let mut col = prebound
    ///         .column_mut(0)
    ///         .as_text_view()
    ///         .expect("We know the name column to hold text.");
    ///     for (index, name) in names.iter().map(|s| Some(s.as_bytes())).enumerate() {
    ///         col.set_cell(index, name);
    ///     }
    ///
    ///     // Fill birth years
    ///     let mut col = prebound
    ///         .column_mut(1)
    ///         .as_slice::<i16>()
    ///         .expect("We know the year column to hold i16.");
    ///     col.copy_from_slice(years);
    ///
    ///     // Execute the prepared statement with the bound array parameters, sending the values
    ///     // to the database.
    ///     prebound.execute()?;
    ///     Ok(())
    /// }
    /// ```
    pub fn column_mut<'a>(&'a mut self, buffer_index: usize) -> C::SliceMut
    where
        C: BoundInputSlice<'a>,
    {
        unsafe {
            self.parameters[buffer_index]
                .as_view_mut((buffer_index + 1) as u16, self.statement.as_stmt_ref())
        }
    }

    /// Maximum number of rows the buffer can hold at once.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Resize the buffers to the new capacity. It would be hard to maintain the invariants in case
    /// of an error, which is why this method destroys `self` in case something goes wrong.
    ///
    /// Valid rows in the buffer will be preserved. If the new capacity is smaller than the
    /// parameter set size (the number of valid rows in the buffer), then these values will be
    /// dropped.
    ///
    /// You may want to make use of this method in case your program flow reads from a data source
    /// with varying batch sizes and you want to insert each batch in one roundtrip. If you do not
    /// know the maximum batch size in advance, you may need to resize the buffers on the fly.
    ///
    /// # Parameters
    ///
    /// * `new_capacity`: The new capacity of the buffers. Must be at least `1`. May be smaller or
    ///   larger than the current capacity.
    /// * `mapping`: The mapping of the input parameters to the column buffers. This should be
    ///   equal to the mapping used to create the `ColumnarBulkInserter` in the first place. If a
    ///   mapping has not been specified explicitly, you can use the [`InOrder`] mapping.
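    ///
    /// # Example
    ///
    /// A sketch of growing the buffers on the fly for a source with unknown batch sizes. The
    /// table `Users`, the batch source, the crate root import path for [`InOrder`] and the
    /// assumption that the buffers created by `into_column_inserter` support [`Resize`] are all
    /// illustrative assumptions, not guarantees of this API.
    ///
    /// ```no_run
    /// use odbc_api::{Connection, Error, InOrder, buffers::BufferDesc};
    ///
    /// fn insert_batches(
    ///     conn: &Connection,
    ///     batches: impl Iterator<Item = Vec<String>>,
    /// ) -> Result<(), Error> {
    ///     // Sketch only: assumes a table `Users` with a single VARCHAR column `name` and that
    ///     // `InOrder` is importable from the crate root.
    ///     let prepared = conn.prepare("INSERT INTO Users (name) VALUES (?)")?;
    ///     let desc = [BufferDesc::Text { max_str_len: 255 }];
    ///     // Start with a small capacity and grow as needed.
    ///     let mut prebound = prepared.into_column_inserter(1, desc)?;
    ///     for batch in batches {
    ///         if batch.len() > prebound.capacity() {
    ///             // One placeholder, bound in order.
    ///             prebound = prebound.resize(batch.len(), InOrder::new(1))?;
    ///         }
    ///         prebound.set_num_rows(batch.len());
    ///         let mut col = prebound.column_mut(0).as_text_view().unwrap();
    ///         for (index, name) in batch.iter().enumerate() {
    ///             col.set_cell(index, Some(name.as_bytes()));
    ///         }
    ///         prebound.execute()?;
    ///         prebound.clear();
    ///     }
    ///     Ok(())
    /// }
    /// ```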
    pub fn resize(
        mut self,
        new_capacity: usize,
        mapping: impl InputParameterMapping,
    ) -> Result<Self, Error>
    where
        C: ColumnBuffer + HasDataType + Resize + Send,
    {
        assert!(new_capacity > 0, "New capacity must be at least 1.");
        // Resize the buffers
        for column in &mut self.parameters {
            column.resize(new_capacity);
        }
        self.capacity = new_capacity;
        self.parameter_set_size = min(self.parameter_set_size, new_capacity);
        // Now the pointers bound to the statement may be invalid, so we need to rebind them.
        let stmt = self.statement.as_stmt_ref();
        bind_parameter_buffers_to_statement(&self.parameters, mapping, stmt)?;
        Ok(self)
    }
}

/// Binds the column buffers to the statement, based on the mapping provided. It ensures that only
/// the provided parameter buffers are bound to the statement. Any previously bound buffers will
/// no longer be bound. In case of an error no buffers will be bound to the statement.
fn bind_parameter_buffers_to_statement<C>(
    parameters: &[C],
    mapping: impl InputParameterMapping,
    mut stmt: StatementRef<'_>,
) -> Result<(), Error>
where
    C: ColumnBuffer + HasDataType + Send,
{
    stmt.reset_parameters();
    let parameter_indices = 1..(mapping.num_parameters() as u16 + 1);
    for parameter_index in parameter_indices {
        let column_index = mapping.parameter_index_to_column_index(parameter_index);
        let column_buffer = &parameters[column_index];
        if let Err(error) =
            unsafe { stmt.bind_input_parameter(parameter_index, column_buffer) }.into_result(&stmt)
        {
            // An early return here is risky. We actually did bind some parameters already. We
            // cannot guarantee that the bound pointers stay valid in case of an error since
            // `Self` is never constructed. We would get away with this if we took ownership of
            // the statement and it were destroyed should the constructor not succeed. However,
            // the columnar bulk inserter can also be instantiated with borrowed statements. This
            // is why we reset the parameters on error.
            stmt.reset_parameters();
            return Err(error);
        }
    }
    Ok(())
}

/// You can obtain a mutable slice of a column buffer which allows you to change its contents.
///
/// # Safety
///
/// * If any operations have been performed which would invalidate the pointers bound to the
///   statement, the slice must use the statement handle to rebind the column, at the end of its
///   lifetime (at the latest).
/// * All values must be complete. I.e. none of the values must be truncated.
pub unsafe trait BoundInputSlice<'a> {
    /// Intended to allow for modifying buffer contents, while leaving the bound parameter buffers
    /// valid.
    type SliceMut;

    /// Obtain a mutable view on a parameter buffer in order to change the parameter value(s)
    /// submitted when executing the statement.
    ///
    /// # Safety
    ///
    /// * The statement must be the statement the column buffer is bound to. The index must be the
    ///   parameter index it is bound at.
    /// * All values must be complete. I.e. none of the values must be truncated.
    unsafe fn as_view_mut(
        &'a mut self,
        parameter_index: u16,
        stmt: StatementRef<'a>,
    ) -> Self::SliceMut;
}

impl<S> ColumnarBulkInserter<S, TextColumn<u8>> {
    /// Takes one element from the iterator for each internal column buffer and appends it to the
    /// end of the buffer. Should a cell of the row be too large for the associated column buffer,
    /// the column buffer will be reallocated with `1.2` times its size, and rebound to the
    /// statement.
    ///
    /// This method panics when trying to insert elements beyond the batch size. It will also
    /// panic if the row does not contain at least one item for each internal column buffer.
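    ///
    /// # Example
    ///
    /// A minimal sketch of row wise appending, assuming an inserter which has been created with
    /// two text columns (e.g. for an `INSERT` statement with two placeholders). The import paths
    /// and the two-column layout are assumptions made for illustration.
    ///
    /// ```no_run
    /// use odbc_api::{ColumnarBulkInserter, Error, buffers::TextColumn, handles::AsStatementRef};
    ///
    /// fn append_rows<S: AsStatementRef>(
    ///     inserter: &mut ColumnarBulkInserter<S, TextColumn<u8>>,
    ///     rows: &[(&str, &str)],
    /// ) -> Result<(), Error> {
    ///     // Sketch only: assumes two bound text columns and that `rows.len()` does not exceed
    ///     // the remaining capacity of the inserter.
    ///     for (name, city) in rows {
    ///         // Each row must yield one element per bound column buffer, in order.
    ///         inserter.append([Some(name.as_bytes()), Some(city.as_bytes())].into_iter())?;
    ///     }
    ///     // Send all appended rows to the database in one roundtrip.
    ///     inserter.execute()?;
    ///     Ok(())
    /// }
    /// ```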
    pub fn append<'b>(
        &mut self,
        mut row: impl Iterator<Item = Option<&'b [u8]>>,
    ) -> Result<(), Error>
    where
        S: AsStatementRef,
    {
        if self.capacity == self.parameter_set_size {
            panic!("Trying to insert elements into TextRowSet beyond batch size.")
        }

        let mut col_index = 1;
        for column in &mut self.parameters {
            let text = row.next().expect(
                "Row passed to TextRowSet::append must contain one element for each column.",
            );
            if let Some(text) = text {
                unsafe {
                    column
                        .as_view_mut(col_index, self.statement.as_stmt_ref())
                        .ensure_max_element_length(text.len(), self.parameter_set_size)?;
                }
                column.set_value(self.parameter_set_size, Some(text));
            } else {
                column.set_value(self.parameter_set_size, None);
            }
            col_index += 1;
        }

        self.parameter_set_size += 1;

        Ok(())
    }
}

/// Governs how indices of bound buffers map to the indices of the parameters. If the order of the
/// input buffers matches the order of the placeholders in the SQL statement, then you can just use
/// [`InOrder`] as the mapping.
///
/// When using array input parameters to determine the values of placeholders in an SQL statement
/// to be executed, the indices of the placeholders and the column buffers may differ. For starters
/// the column buffer indices are zero based, whereas the parameter indices are one based. On top
/// of that, more complex mappings can emerge if the same input buffer should be reused to fill in
/// for multiple placeholders, e.g. in case the same value appears in the query twice.
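///
/// # Example
///
/// A sketch of a custom mapping which binds the same (zero based) column buffer to two (one
/// based) placeholders, e.g. for a statement in which the same value should appear twice. The
/// crate root import path is an assumption made for illustration.
///
/// ```
/// use odbc_api::InputParameterMapping;
///
/// /// Both placeholders are filled from column buffer `0`.
/// struct SameValueTwice;
///
/// impl InputParameterMapping for SameValueTwice {
///     fn parameter_index_to_column_index(&self, _parameter_index: u16) -> usize {
///         0
///     }
///
///     fn num_parameters(&self) -> usize {
///         2
///     }
/// }
/// ```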
pub trait InputParameterMapping {
    /// Maps a one based parameter (placeholder) index to the zero based index of the column
    /// buffer that supplies its values.
    fn parameter_index_to_column_index(&self, parameter_index: u16) -> usize;

    /// Number of placeholders in the SQL statement.
    fn num_parameters(&self) -> usize;
}

/// An implementation of [`InputParameterMapping`] that should be used if the order of the column
/// buffers for the array input parameters matches the order of the placeholders in the SQL
/// statement.
pub struct InOrder {
    /// Corresponds to the number of placeholders in the SQL statement.
    number_of_parameters: usize,
}

impl InOrder {
    /// Creates a new [`InOrder`] mapping for the given number of parameters. The number of
    /// parameters is the number of placeholders (`?`) in the SQL statement.
    pub fn new(number_of_parameters: usize) -> Self {
        Self {
            number_of_parameters,
        }
    }
}

impl InputParameterMapping for InOrder {
    fn parameter_index_to_column_index(&self, parameter_index: u16) -> usize {
        debug_assert_ne!(0, parameter_index, "Parameter index must be one based.");
        // Each parameter matches exactly one column. Also the column index is zero based, but
        // parameter indices are one based.
        (parameter_index - 1) as usize
    }

    fn num_parameters(&self) -> usize {
        self.number_of_parameters
    }
}