odbc_api/columnar_bulk_inserter.rs
1use crate::{
2 CursorImpl, Error,
3 buffers::{ColumnBuffer, TextColumn},
4 execute::execute,
5 handles::{AsStatementRef, HasDataType, Statement, StatementRef},
6};
7
8/// Can be used to execute a statement with bulk array paramters. Contrary to its name any statement
9/// with parameters can be executed, not only `INSERT` however inserting large amounts of data in
10/// batches is the primary intended use case.
11///
12/// Binding new buffers is quite expensive in ODBC, so the parameter buffers are reused for each
13/// batch (so the pointers bound to the statment stay valid). So we copy each batch of data into the
14/// buffers already bound first rather than binding user defined buffer. Often the data might need
15/// to be transformed anyway, so the copy is no actual overhead. Once the buffers are filled with a
16/// batch, we send the data.
17pub struct ColumnarBulkInserter<S, C> {
18 // We maintain the invariant that the parameters are bound to the statement that parameter set
19 // size reflects the number of valid rows in the batch.
20 statement: S,
21 parameter_set_size: usize,
22 capacity: usize,
23 /// We maintain the invariant that none of these buffers is truncated.
24 parameters: Vec<C>,
25}
26
27impl<S, C> ColumnarBulkInserter<S, C>
28where
29 S: AsStatementRef,
30{
31 /// Users are not encouraged to call this directly.
32 ///
33 /// # Safety
34 ///
35 /// * Statement is expected to be a perpared statement.
36 /// * Parameters must all be valid for insertion. An example for an invalid parameter would be
37 /// a text buffer with a cell those indiactor value exceeds the maximum element length. This
38 /// can happen after when truncation occurs then writing into a buffer.
39 pub unsafe fn new(mut statement: S, parameters: Vec<C>) -> Result<Self, Error>
40 where
41 C: ColumnBuffer + HasDataType,
42 {
43 let mut stmt = statement.as_stmt_ref();
44 stmt.reset_parameters();
45 let mut parameter_number = 1;
46 // Bind buffers to statement.
47 for column in ¶meters {
48 if let Err(error) =
49 unsafe { stmt.bind_input_parameter(parameter_number, column) }.into_result(&stmt)
50 {
51 // This early return using `?` is risky. We actually did bind some parameters
52 // already. We cannot guarantee that the bound pointers stay valid in case of an
53 // error since `Self` is never constructed. We would away with this, if we took
54 // ownership of the statement and it is destroyed should the constructor not
55 // succeed. However columnar bulk inserter can also be instantiated with borrowed
56 // statements. This is why we reset the parameters on error.
57 stmt.reset_parameters();
58 return Err(error);
59 }
60 parameter_number += 1;
61 }
62 let capacity = parameters
63 .iter()
64 .map(|col| col.capacity())
65 .min()
66 .unwrap_or(0);
67 Ok(Self {
68 statement,
69 parameter_set_size: 0,
70 capacity,
71 parameters,
72 })
73 }
74
75 /// Execute the prepared statement, with the parameters bound
76 pub fn execute(&mut self) -> Result<Option<CursorImpl<StatementRef<'_>>>, Error> {
77 let mut stmt = self.statement.as_stmt_ref();
78 unsafe {
79 if self.parameter_set_size == 0 {
80 // A batch size of 0 will not execute anything, same as for execute on connection or
81 // prepared.
82 Ok(None)
83 } else {
84 // We reset the parameter set size, in order to adequatly handle batches of
85 // different size then inserting into the database.
86 stmt.set_paramset_size(self.parameter_set_size);
87 execute(stmt, None)
88 }
89 }
90 }
91
92 /// Sets the number of rows in the buffer to zero.
93 pub fn clear(&mut self) {
94 self.parameter_set_size = 0;
95 }
96
97 /// Number of valid rows in the buffer
98 pub fn num_rows(&self) -> usize {
99 self.parameter_set_size
100 }
101
102 /// Set number of valid rows in the buffer. Must not be larger than the batch size. If the
103 /// specified number than the number of valid rows currently held by the buffer additional they
104 /// will just hold the value previously assigned to them. Therfore if extending the number of
105 /// valid rows users should take care to assign values to these rows. However, even if not
106 /// assigend it is always guaranteed that every cell is valid for insertion and will not cause
107 /// out of bounds access down in the ODBC driver. Therefore this method is safe. You can set
108 /// the number of valid rows before or after filling values into the buffer, but you must do so
109 /// before executing the query.
110 pub fn set_num_rows(&mut self, num_rows: usize) {
111 if num_rows > self.capacity {
112 panic!(
113 "Columnar buffer may not be resized to a value higher than the maximum number of \
114 rows initially specified in the constructor."
115 );
116 }
117 self.parameter_set_size = num_rows;
118 }
119
120 /// Use this method to gain write access to the actual column data.
121 ///
122 /// # Parameters
123 ///
124 /// * `buffer_index`: Please note that the buffer index is not identical to the ODBC column
125 /// index. For one it is zero based. It also indexes the buffer bound, and not the columns of
126 /// the output result set. This is important, because not every column needs to be bound. Some
127 /// columns may simply be ignored. That being said, if every column of the output is bound in
128 /// the buffer, in the same order in which they are enumerated in the result set, the
129 /// relationship between column index and buffer index is `buffer_index = column_index - 1`.
130 ///
131 /// # Example
132 ///
133 /// This method is intended to be called if using [`ColumnarBulkInserter`] for column wise bulk
134 /// inserts.
135 ///
136 /// ```no_run
137 /// use odbc_api::{Connection, Error, buffers::BufferDesc};
138 ///
139 /// fn insert_birth_years(conn: &Connection, names: &[&str], years: &[i16])
140 /// -> Result<(), Error>
141 /// {
142 ///
143 /// // All columns must have equal length.
144 /// assert_eq!(names.len(), years.len());
145 /// // Prepare the insert statement
146 /// let prepared = conn.prepare("INSERT INTO Birthdays (name, year) VALUES (?, ?)")?;
147 /// // Create a columnar buffer which fits the input parameters.
148 /// let buffer_description = [
149 /// BufferDesc::Text { max_str_len: 255 },
150 /// BufferDesc::I16 { nullable: false },
151 /// ];
152 /// // Here we do everything in one batch. So the capacity is the number of input
153 /// // parameters.
154 /// let capacity = names.len();
155 /// let mut prebound = prepared.into_column_inserter(capacity, buffer_description)?;
156 /// // Set number of input rows in the current batch.
157 /// prebound.set_num_rows(names.len());
158 /// // Fill the buffer with values column by column
159 ///
160 /// // Fill names
161 /// let mut col = prebound
162 /// .column_mut(0)
163 /// .as_text_view()
164 /// .expect("We know the name column to hold text.");
165 /// for (index, name) in names.iter().map(|s| Some(s.as_bytes())).enumerate() {
166 /// col.set_cell(index, name);
167 /// }
168 ///
169 /// // Fill birth years
170 /// let mut col = prebound
171 /// .column_mut(1)
172 /// .as_slice::<i16>()
173 /// .expect("We know the year column to hold i16.");
174 /// col.copy_from_slice(years);
175 ///
176 /// // Execute the prepared statment with the bound array parameters. Sending the values to
177 /// // the database.
178 /// prebound.execute()?;
179 /// Ok(())
180 /// }
181 /// ```
182 pub fn column_mut<'a>(&'a mut self, buffer_index: usize) -> C::SliceMut
183 where
184 C: BoundInputSlice<'a>,
185 {
186 unsafe {
187 self.parameters[buffer_index]
188 .as_view_mut((buffer_index + 1) as u16, self.statement.as_stmt_ref())
189 }
190 }
191
192 /// Maximum number of rows the buffer can hold at once.
193 pub fn capacity(&self) -> usize {
194 self.capacity
195 }
196}
197
198/// You can obtain a mutable slice of a column buffer which allows you to change its contents.
199///
200/// # Safety
201///
202/// * If any operations have been performed which would invalidate the pointers bound to the
203/// statement, the slice must use the statement handle to rebind the column, at the end of its
204/// lifetime (at the latest).
205/// * All values must be complete. I.e. none of the values must be truncated.
206pub unsafe trait BoundInputSlice<'a> {
207 /// Intended to allow for modifying buffer contents, while leaving the bound parameter buffers
208 /// valid.
209 type SliceMut;
210
211 /// Obtain a mutable view on a parameter buffer in order to change the parameter value(s)
212 /// submitted when executing the statement.
213 ///
214 /// # Safety
215 ///
216 /// * The statement must be the statment the column buffer is bound to. The index must be the
217 /// parameter index it is bound at.
218 /// * All values must be complete. I.e. none of the values must be truncated.
219 unsafe fn as_view_mut(
220 &'a mut self,
221 parameter_index: u16,
222 stmt: StatementRef<'a>,
223 ) -> Self::SliceMut;
224}
225
226impl<S> ColumnarBulkInserter<S, TextColumn<u8>> {
227 /// Takes one element from the iterator for each internal column buffer and appends it to the
228 /// end of the buffer. Should a cell of the row be too large for the associated column buffer,
229 /// the column buffer will be reallocated with `1.2` times its size, and rebound to the
230 /// statement.
231 ///
232 /// This method panics if it is tried to insert elements beyond batch size. It will also panic
233 /// if row does not contain at least one item for each internal column buffer.
234 pub fn append<'b>(
235 &mut self,
236 mut row: impl Iterator<Item = Option<&'b [u8]>>,
237 ) -> Result<(), Error>
238 where
239 S: AsStatementRef,
240 {
241 if self.capacity == self.parameter_set_size {
242 panic!("Trying to insert elements into TextRowSet beyond batch size.")
243 }
244
245 let mut col_index = 1;
246 for column in &mut self.parameters {
247 let text = row.next().expect(
248 "Row passed to TextRowSet::append must contain one element for each column.",
249 );
250 if let Some(text) = text {
251 unsafe {
252 column
253 .as_view_mut(col_index, self.statement.as_stmt_ref())
254 .ensure_max_element_length(text.len(), self.parameter_set_size)?;
255 }
256 column.set_value(self.parameter_set_size, Some(text));
257 } else {
258 column.set_value(self.parameter_set_size, None);
259 }
260 col_index += 1;
261 }
262
263 self.parameter_set_size += 1;
264
265 Ok(())
266 }
267}