odbc_api/cursor.rs
1mod block_cursor;
2mod concurrent_block_cursor;
3mod polling_cursor;
4
5use log::warn;
6use odbc_sys::HStmt;
7
8use crate::{
9 Error, ResultSetMetadata,
10 buffers::Indicator,
11 error::ExtendResult,
12 handles::{
13 AsStatementRef, CDataMut, DiagnosticStream, SqlResult, State, Statement, StatementRef,
14 },
15 parameter::{Binary, CElement, Text, VarCell, VarKind, WideText},
16};
17
18use std::{
19 mem::{MaybeUninit, size_of},
20 ptr,
21 thread::panicking,
22};
23
24pub use self::{
25 block_cursor::{BlockCursor, BlockCursorIterator},
26 concurrent_block_cursor::ConcurrentBlockCursor,
27 polling_cursor::{BlockCursorPolling, CursorPolling},
28};
29
30/// Cursors are used to process and iterate the result sets returned by executing queries.
31///
32/// # Example: Fetching result in batches
33///
34/// ```rust
35/// use odbc_api::{Cursor, buffers::{BufferDesc, ColumnarAnyBuffer}, Error};
36///
37/// /// Fetches all values from the first column of the cursor as i32 in batches of 100 and stores
38/// /// them in a vector.
39/// fn fetch_all_ints(cursor: impl Cursor) -> Result<Vec<i32>, Error> {
40/// let mut all_ints = Vec::new();
41/// // Batch size determines how many values we fetch at once.
42/// let batch_size = 100;
43/// // We expect the first column to hold INTEGERs (or a type convertible to INTEGER). Use
/// // the metadata on the result set, if you want to investigate the types of the columns at
45/// // runtime.
46/// let description = BufferDesc::I32 { nullable: false };
47/// // This is the buffer we bind to the driver, and repeatedly use to fetch each batch
48/// let buffer = ColumnarAnyBuffer::from_descs(batch_size, [description]);
49/// // Bind buffer to cursor
50/// let mut row_set_buffer = cursor.bind_buffer(buffer)?;
51/// // Fetch data batch by batch
52/// while let Some(batch) = row_set_buffer.fetch()? {
53/// all_ints.extend_from_slice(batch.column(0).as_slice().unwrap())
54/// }
55/// Ok(all_ints)
56/// }
57/// ```
58pub trait Cursor: ResultSetMetadata {
59 /// Advances the cursor to the next row in the result set. This is **Slow**. Bind
60 /// [`crate::buffers`] instead, for good performance.
61 ///
62 /// ⚠ While this method is very convenient due to the fact that the application does not have
63 /// to declare and bind specific buffers, it is also in many situations extremely slow. Concrete
64 /// performance depends on the ODBC driver in question, but it is likely it performs a roundtrip
65 /// to the datasource for each individual row. It is also likely an extra conversion is
66 /// performed then requesting individual fields, since the C buffer type is not known to the
67 /// driver in advance. Consider binding a buffer to the cursor first using
68 /// [`Self::bind_buffer`].
69 ///
70 /// That being said, it is a convenient programming model, as the developer does not need to
71 /// prepare and allocate the buffers beforehand. It is also a good way to retrieve really large
72 /// single values out of a data source (like one large text file). See [`CursorRow::get_text`].
73 fn next_row(&mut self) -> Result<Option<CursorRow<'_>>, Error> {
74 let row_available = unsafe {
75 self.as_stmt_ref()
76 .fetch()
77 .into_result_bool(&self.as_stmt_ref())?
78 };
79 let ret = if row_available {
80 Some(unsafe { CursorRow::new(self.as_stmt_ref()) })
81 } else {
82 None
83 };
84 Ok(ret)
85 }
86
87 /// Binds this cursor to a buffer holding a row set.
88 fn bind_buffer<B>(self, row_set_buffer: B) -> Result<BlockCursor<Self, B>, Error>
89 where
90 Self: Sized,
91 B: RowSetBuffer;
92
93 /// For some datasources it is possible to create more than one result set at once via a call to
94 /// execute. E.g. by calling a stored procedure or executing multiple SQL statements at once.
95 /// This method consumes the current cursor and creates a new one representing the next result
96 /// set should it exist.
97 fn more_results(self) -> Result<Option<Self>, Error>
98 where
99 Self: Sized;
100}
101
/// An individual row of a result set. See [`crate::Cursor::next_row`].
pub struct CursorRow<'s> {
    /// Statement handle borrowed from the cursor; per the constructor's contract it is in a
    /// cursor state, positioned on the row this value represents.
    statement: StatementRef<'s>,
}
106
impl<'s> CursorRow<'s> {
    /// Wraps a statement handle into a row accessor.
    ///
    /// # Safety
    ///
    /// `statement` must be in a cursor state, i.e. a fetch must have succeeded, so field data can
    /// be retrieved from the current row.
    unsafe fn new(statement: StatementRef<'s>) -> Self {
        CursorRow { statement }
    }
}
115
impl CursorRow<'_> {
    /// Fills a suitable target buffer with a field from the current row of the result set. This
    /// method drains the data from the field. It can be called repeatedly if not all the data
    /// fits in the output buffer at once. It should not be called repeatedly to fetch the same
    /// value twice. Column index starts at `1`.
    ///
    /// You can use [`crate::Nullable`] to fetch nullable values.
    ///
    /// # Example
    ///
    /// ```
    /// # use odbc_api::{Cursor, Error, Nullable};
    /// # fn fetch_values_example(cursor: &mut impl Cursor) -> Result<(), Error> {
    /// // Declare nullable value to fetch value into. ODBC values layout is different from Rusts
    /// // option. We can not use `Option<i32>` directly.
    /// let mut field = Nullable::<i32>::null();
    /// // Move cursor to next row
    /// let mut row = cursor.next_row()?.unwrap();
    /// // Fetch first column into field
    /// row.get_data(1, &mut field)?;
    /// // Convert nullable value to Option for convenience
    /// let field = field.into_opt();
    /// if let Some(value) = field {
    ///     println!("Value: {}", value);
    /// } else {
    ///     println!("Value is NULL");
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn get_data(
        &mut self,
        col_or_param_num: u16,
        target: &mut (impl CElement + CDataMut),
    ) -> Result<(), Error> {
        self.statement
            .get_data(col_or_param_num, target)
            .into_result(&self.statement)
            // The state "indicator variable required but not supplied" means the driver wanted
            // to report NULL, but the target cannot represent it. Surface this as a dedicated
            // error variant so callers know to use a nullable target type instead.
            .provide_context_for_diagnostic(|record, function| {
                if record.state == State::INDICATOR_VARIABLE_REQUIRED_BUT_NOT_SUPPLIED {
                    Error::UnableToRepresentNull(record)
                } else {
                    Error::Diagnostics { record, function }
                }
            })
    }

    /// Retrieves arbitrarily large character data from the row and stores it in the buffer.
    /// Column index starts at `1`. The used encoding is, according to the ODBC standard,
    /// determined by your system locale. Ultimately the choice is up to the implementation of
    /// your ODBC driver, which often defaults to always UTF-8.
    ///
    /// # Example
    ///
    /// Retrieve an arbitrary large text file from a database field.
    ///
    /// ```
    /// use odbc_api::{Connection, Error, IntoParameter, Cursor};
    ///
    /// fn get_large_text(name: &str, conn: &mut Connection<'_>) -> Result<Option<String>, Error> {
    ///     let query = "SELECT content FROM LargeFiles WHERE name=?";
    ///     let parameters = &name.into_parameter();
    ///     let timeout_sec = None;
    ///     let mut cursor = conn
    ///         .execute(query, parameters, timeout_sec)?
    ///         .expect("Assume select statement creates cursor");
    ///     if let Some(mut row) = cursor.next_row()? {
    ///         let mut buf = Vec::new();
    ///         row.get_text(1, &mut buf)?;
    ///         let ret = String::from_utf8(buf).unwrap();
    ///         Ok(Some(ret))
    ///     } else {
    ///         Ok(None)
    ///     }
    /// }
    /// ```
    ///
    /// # Return
    ///
    /// `true` indicates that the value has not been `NULL` and the value has been placed in `buf`.
    /// `false` indicates that the value is `NULL`. The buffer is cleared in that case.
    pub fn get_text(&mut self, col_or_param_num: u16, buf: &mut Vec<u8>) -> Result<bool, Error> {
        self.get_variadic::<Text>(col_or_param_num, buf)
    }

    /// Retrieves arbitrarily large character data from the row and stores it in the buffer.
    /// Column index starts at `1`. The used encoding is UTF-16.
    ///
    /// # Return
    ///
    /// `true` indicates that the value has not been `NULL` and the value has been placed in `buf`.
    /// `false` indicates that the value is `NULL`. The buffer is cleared in that case.
    pub fn get_wide_text(
        &mut self,
        col_or_param_num: u16,
        buf: &mut Vec<u16>,
    ) -> Result<bool, Error> {
        self.get_variadic::<WideText>(col_or_param_num, buf)
    }

    /// Retrieves arbitrarily large binary data from the row and stores it in the buffer. Column
    /// index starts at `1`.
    ///
    /// # Return
    ///
    /// `true` indicates that the value has not been `NULL` and the value has been placed in `buf`.
    /// `false` indicates that the value is `NULL`. The buffer is cleared in that case.
    pub fn get_binary(&mut self, col_or_param_num: u16, buf: &mut Vec<u8>) -> Result<bool, Error> {
        self.get_variadic::<Binary>(col_or_param_num, buf)
    }

    /// Shared implementation of [`Self::get_text`], [`Self::get_wide_text`] and
    /// [`Self::get_binary`]. `K` selects the element type and the number of terminating zeroes.
    fn get_variadic<K: VarKind>(
        &mut self,
        col_or_param_num: u16,
        buf: &mut Vec<K::Element>,
    ) -> Result<bool, Error> {
        if buf.capacity() == 0 {
            // User did just provide an empty buffer. So it is fair to assume not much domain
            // knowledge has been used to decide its size. We just default to 256 to increase the
            // chance that we get it done with one allocation. The buffer size being 0 we need at
            // least 1 anyway. If the capacity is not `0` we'll leave the buffer size untouched as
            // we do not want to prevent users from providing better guesses based on domain
            // knowledge.
            // This also implicitly makes sure that we can at least hold one terminating zero.
            buf.reserve(256);
        }
        // Utilize all of the allocated buffer.
        buf.resize(buf.capacity(), K::ZERO);

        // Did we learn how much capacity we need in the last iteration? We use this only to panic
        // on erroneous implementations of get_data and avoid endless looping until we run out of
        // memory.
        let mut remaining_length_known = false;
        // We repeatedly fetch data and add it to the buffer. The buffer length is therefore the
        // accumulated value size. The target always points to the last window in buf which is going
        // to contain the **next** part of the data, whereas buf contains the entire accumulated
        // value so far.
        let mut target =
            VarCell::<&mut [K::Element], K>::from_buffer(buf.as_mut_slice(), Indicator::NoTotal);
        self.get_data(col_or_param_num, &mut target)?;
        while !target.is_complete() {
            // Amount of payload bytes (excluding terminating zeros) fetched with the last call to
            // get_data.
            let fetched = target
                .len_in_bytes()
                .expect("ODBC driver must always report how many bytes were fetched.");
            match target.indicator() {
                // If Null the value would be complete
                Indicator::Null => unreachable!(),
                // We do not know how large the value is. Let's fetch the data with repeated calls
                // to get_data.
                Indicator::NoTotal => {
                    let old_len = buf.len();
                    // Use an exponential strategy for increasing buffer size.
                    buf.resize(old_len * 2, K::ZERO);
                    // The new window starts at the previous terminating zero, which the next
                    // chunk overwrites with payload.
                    let buf_extend = &mut buf[(old_len - K::TERMINATING_ZEROES)..];
                    target = VarCell::<&mut [K::Element], K>::from_buffer(
                        buf_extend,
                        Indicator::NoTotal,
                    );
                }
                // We did not get all of the value in one go, but the data source has been friendly
                // enough to tell us how much is missing.
                Indicator::Length(len) => {
                    if remaining_length_known {
                        panic!(
                            "SQLGetData has been unable to fetch all data, even though the \
                            capacity of the target buffer has been adapted to hold the entire \
                            payload based on the indicator of the last part. You may consider \
                            filing a bug with the ODBC driver you are using."
                        )
                    }
                    remaining_length_known = true;
                    // Amount of bytes missing from the value using get_data, excluding terminating
                    // zero.
                    let still_missing_in_bytes = len - fetched;
                    let still_missing = still_missing_in_bytes / size_of::<K::Element>();
                    let old_len = buf.len();
                    buf.resize(old_len + still_missing, K::ZERO);
                    // As above: window overlaps the previous terminating zero.
                    let buf_extend = &mut buf[(old_len - K::TERMINATING_ZEROES)..];
                    target = VarCell::<&mut [K::Element], K>::from_buffer(
                        buf_extend,
                        Indicator::NoTotal,
                    );
                }
            }
            // Fetch binary data into buffer.
            self.get_data(col_or_param_num, &mut target)?;
        }
        // We did get the complete value, including the terminating zero. Let's resize the buffer to
        // match the retrieved value exactly (excluding terminating zero).
        if let Some(len_in_bytes) = target.indicator().length() {
            // Since the indicator refers to value length without terminating zero, and capacity is
            // including the terminating zero this also implicitly drops the terminating zero at the
            // end of the buffer.
            let shrink_by_bytes = target.capacity_in_bytes() - len_in_bytes;
            let shrink_by_chars = shrink_by_bytes / size_of::<K::Element>();
            buf.resize(buf.len() - shrink_by_chars, K::ZERO);
            Ok(true)
        } else {
            // value is NULL
            buf.clear();
            Ok(false)
        }
    }
}
322
/// Cursors are used to process and iterate the result sets returned by executing queries. Created
/// by either a prepared query or direct execution. Usually utilized through the [`crate::Cursor`]
/// trait.
///
/// Dropping a `CursorImpl` closes the cursor on the driver (see the `Drop` impl); use
/// [`CursorImpl::into_stmt`] to take the statement out without closing it.
#[derive(Debug)]
pub struct CursorImpl<Stmt: AsStatementRef> {
    /// A statement handle in cursor mode.
    statement: Stmt,
}
331
332impl<S> Drop for CursorImpl<S>
333where
334 S: AsStatementRef,
335{
336 fn drop(&mut self) {
337 let mut stmt = self.statement.as_stmt_ref();
338 if let Err(e) = stmt.close_cursor().into_result(&stmt) {
339 // Avoid panicking, if we already have a panic. We don't want to mask the original
340 // error.
341 if !panicking() {
342 panic!("Unexpected error closing cursor: {e:?}")
343 }
344 }
345 }
346}
347
348impl<S> AsStatementRef for CursorImpl<S>
349where
350 S: AsStatementRef,
351{
352 fn as_stmt_ref(&mut self) -> StatementRef<'_> {
353 self.statement.as_stmt_ref()
354 }
355}
356
// Empty impl: `ResultSetMetadata` requires nothing beyond `AsStatementRef`, which is implemented
// above, so all metadata queries are answered through the statement handle.
impl<S> ResultSetMetadata for CursorImpl<S> where S: AsStatementRef {}
358
359impl<S> Cursor for CursorImpl<S>
360where
361 S: AsStatementRef,
362{
363 fn bind_buffer<B>(mut self, mut row_set_buffer: B) -> Result<BlockCursor<Self, B>, Error>
364 where
365 B: RowSetBuffer,
366 {
367 let stmt = self.statement.as_stmt_ref();
368 unsafe {
369 bind_row_set_buffer_to_statement(stmt, &mut row_set_buffer)?;
370 }
371 Ok(BlockCursor::new(row_set_buffer, self))
372 }
373
374 fn more_results(self) -> Result<Option<Self>, Error>
375 where
376 Self: Sized,
377 {
378 // Consume self without calling drop to avoid calling close_cursor.
379 let mut statement = self.into_stmt();
380 let mut stmt = statement.as_stmt_ref();
381
382 let has_another_result = unsafe { stmt.more_results() }.into_result_bool(&stmt)?;
383 let next = if has_another_result {
384 Some(CursorImpl { statement })
385 } else {
386 None
387 };
388 Ok(next)
389 }
390}
391
impl<S> CursorImpl<S>
where
    S: AsStatementRef,
{
    /// Users of this library are encouraged not to call this constructor directly but rather invoke
    /// [`crate::Connection::execute`] or [`crate::Prepared::execute`] to get a cursor and utilize
    /// it using the [`crate::Cursor`] trait. This method is public so users with an understanding
    /// of the raw ODBC C-API have a way to create a cursor, after they left the safety rails of the
    /// Rust type System, in order to implement a use case not covered yet, by the safe abstractions
    /// within this crate.
    ///
    /// # Safety
    ///
    /// `statement` must be in Cursor state, for the invariants of this type to hold.
    pub unsafe fn new(statement: S) -> Self {
        Self { statement }
    }

    /// Deconstructs the `CursorImpl` without calling drop. This is a way to get to the underlying
    /// statement, while preventing a call to close cursor.
    pub fn into_stmt(self) -> S {
        // We want to move `statement` out of self, which would make self partially uninitialized.
        // Wrapping self in `MaybeUninit` also suppresses our `Drop` impl (which would otherwise
        // close the cursor).
        let dont_drop_me = MaybeUninit::new(self);
        let self_ptr = dont_drop_me.as_ptr();

        // Safety: We know `dont_drop_me` is valid at this point so reading the ptr is okay. Since
        // drop never runs on the wrapped value, `statement` is moved out exactly once.
        unsafe { ptr::read(&(*self_ptr).statement) }
    }

    // Raw ODBC statement handle; crate-internal escape hatch for FFI calls.
    pub(crate) fn as_sys(&mut self) -> HStmt {
        self.as_stmt_ref().as_sys()
    }
}
425
/// A Row set buffer binds row, or column wise buffers to a cursor in order to fill them with row
/// sets with each call to fetch.
///
/// # Safety
///
/// Implementers of this trait must ensure that every pointer bound in
/// [`Self::bind_colmuns_to_cursor`] stays valid even if an instance is moved in memory. Bound
/// members should therefore likely be references themselves. To bind stack allocated buffers it
/// is recommended to implement this trait on the reference type instead.
pub unsafe trait RowSetBuffer {
    /// Declares the bind type of the Row set buffer. `0` Means a columnar binding is used. Any non
    /// zero number is interpreted as the size of a single row in a row wise binding style.
    fn bind_type(&self) -> usize;

    /// The batch size for bulk cursors, if retrieving many rows at once.
    fn row_array_size(&self) -> usize;

    /// Mutable reference to the number of fetched rows.
    ///
    /// # Safety
    ///
    /// Implementations of this method must take care that the returned reference stays valid, even
    /// if `self` should be moved.
    fn mut_num_fetch_rows(&mut self) -> &mut usize;

    /// Binds the buffer either column or row wise to the cursor.
    ///
    /// NOTE(review): the name carries a historic misspelling of "columns"; renaming it would be a
    /// breaking change for implementors of this public trait.
    ///
    /// # Safety
    ///
    /// It's the implementation's responsibility to ensure that all bound buffers are valid until
    /// unbound or the statement handle is deleted.
    unsafe fn bind_colmuns_to_cursor(&mut self, cursor: StatementRef<'_>) -> Result<(), Error>;

    /// Find an indicator larger than the maximum element size of the buffer.
    fn find_truncation(&self) -> Option<TruncationInfo>;
}
462
/// Returned by [`RowSetBuffer::find_truncation`]. Contains information about the truncation found.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct TruncationInfo {
    /// Length of the untruncated value if known, `None` if the driver did not report a total
    /// length for the truncated value.
    pub indicator: Option<usize>,
    /// Zero based buffer index of the column in which the truncation occurred.
    pub buffer_index: usize,
}
471
// Forwarding implementation: allows a `&mut T` to be used wherever an owned row set buffer is
// expected. The safety contract holds because every call is delegated to the referenced `T`,
// whose bound pointers are unaffected by moves of the reference itself.
unsafe impl<T: RowSetBuffer> RowSetBuffer for &mut T {
    fn bind_type(&self) -> usize {
        (**self).bind_type()
    }

    fn row_array_size(&self) -> usize {
        (**self).row_array_size()
    }

    fn mut_num_fetch_rows(&mut self) -> &mut usize {
        (*self).mut_num_fetch_rows()
    }

    unsafe fn bind_colmuns_to_cursor(&mut self, cursor: StatementRef<'_>) -> Result<(), Error> {
        unsafe { (*self).bind_colmuns_to_cursor(cursor) }
    }

    fn find_truncation(&self) -> Option<TruncationInfo> {
        (**self).find_truncation()
    }
}
493
494/// Binds a row set buffer to a statment. Implementation is shared between synchronous and
495/// asynchronous cursors.
496unsafe fn bind_row_set_buffer_to_statement(
497 mut stmt: StatementRef<'_>,
498 row_set_buffer: &mut impl RowSetBuffer,
499) -> Result<(), Error> {
500 unsafe {
501 stmt.set_row_bind_type(row_set_buffer.bind_type())
502 .into_result(&stmt)?;
503 let size = row_set_buffer.row_array_size();
504 let sql_result = stmt.set_row_array_size(size);
505
506 // Search for "option value changed". A QODBC driver reported "option value changed", yet
507 // set the value to `723477590136`. We want to panic if something like this happens.
508 //
509 // See: <https://github.com/pacman82/odbc-api/discussions/742#discussioncomment-13887516>
510 let mut diagnostic_stream = DiagnosticStream::new(&stmt);
511 // We just rememeber that we have seen "option value changed", before asking for the array
512 // size, in order to not mess with other diagnostic records.
513 let mut option_value_changed = false;
514 while let Some(record) = diagnostic_stream.next() {
515 warn!("{record}");
516 if record.state == State::OPTION_VALUE_CHANGED {
517 option_value_changed = true;
518 }
519 }
520 if option_value_changed {
521 // Now rejecting a too large buffer size is save, but not if the value is something
522 // even larger after. Zero is also suspicious.
523 let actual_size = stmt.row_array_size().into_result(&stmt)?;
524 warn!(
525 "Row array size set by the driver to: {actual_size}. Desired size had been: {size}"
526 );
527 if actual_size > size || actual_size == 0 {
528 panic!(
529 "Your ODBC buffer changed the array size for bulk fetchin in an unsound way. \
530 To prevent undefined behavior the application must panic. You can try \
531 different batch sizes for bulk fetching, or report a bug with your ODBC driver \
532 provider. This behavior has been observed with QODBC drivers. If you are using \
533 one try fetching row by row rather than the faster bulk fetch."
534 )
535 }
536 }
537
538 sql_result
539 // We already logged diagnostic records then we were looking for Option value changed
540 .into_result_without_logging(&stmt)
541 // SAP anywhere has been seen to return with an "invalid attribute" error instead of
542 // a success with "option value changed" info. Let us map invalid attributes during
543 // setting row set array size to something more precise.
544 .provide_context_for_diagnostic(|record, function| {
545 if record.state == State::INVALID_ATTRIBUTE_VALUE {
546 Error::InvalidRowArraySize { record, size }
547 } else {
548 Error::Diagnostics { record, function }
549 }
550 })?;
551 stmt.set_num_rows_fetched(row_set_buffer.mut_num_fetch_rows())
552 .into_result(&stmt)?;
553 row_set_buffer.bind_colmuns_to_cursor(stmt)?;
554 Ok(())
555 }
556}
557
558/// Error handling for bulk fetching is shared between synchronous and asynchronous usecase.
559fn error_handling_for_fetch(
560 result: SqlResult<()>,
561 mut stmt: StatementRef,
562 buffer: &impl RowSetBuffer,
563 error_for_truncation: bool,
564) -> Result<bool, Error> {
565 // Only check for truncation if a) the user indicated that he wants to error instead of just
566 // ignoring it and if there is at least one diagnostic record. ODBC standard requires a
567 // diagnostic record to be there in case of truncation. Sadly we can not rely on this particular
568 // record to be there, as the driver could generate a large amount of diagnostic records,
569 // while we are limited in the amount we can check. The second check serves as an optimization
570 // for the happy path.
571 if error_for_truncation && result == SqlResult::SuccessWithInfo(()) {
572 if let Some(TruncationInfo {
573 indicator,
574 buffer_index,
575 }) = buffer.find_truncation()
576 {
577 return Err(Error::TooLargeValueForBuffer {
578 indicator,
579 buffer_index,
580 });
581 }
582 }
583
584 let has_row = result
585 .on_success(|| true)
586 .on_no_data(|| false)
587 .into_result(&stmt.as_stmt_ref())
588 // Oracle's ODBC driver does not support 64Bit integers. Furthermore, it does not
589 // tell it to the user when binding parameters, but rather now then we fetch
590 // results. The error code returned is `HY004` rather than `HY003` which should
591 // be used to indicate invalid buffer types.
592 .provide_context_for_diagnostic(|record, function| {
593 if record.state == State::INVALID_SQL_DATA_TYPE {
594 Error::OracleOdbcDriverDoesNotSupport64Bit(record)
595 } else {
596 Error::Diagnostics { record, function }
597 }
598 })?;
599 Ok(has_row)
600}
601
602/// Unbinds buffer and num_rows_fetched from the cursor. This implementation is shared between
603/// unbind and the drop handler, and the synchronous and asynchronous variant.
604fn unbind_buffer_from_cursor(cursor: &mut impl AsStatementRef) -> Result<(), Error> {
605 // Now that we have cursor out of block cursor, we need to unbind the buffer.
606 let mut stmt = cursor.as_stmt_ref();
607 stmt.unbind_cols().into_result(&stmt)?;
608 stmt.unset_num_rows_fetched().into_result(&stmt)?;
609 Ok(())
610}