odbc_api/cursor/polling_cursor.rs
1use crate::{
2 Error,
3 handles::{AsStatementRef, Statement, StatementRef},
4 sleep::{Sleep, wait_for},
5};
6
7use super::{
8 RowSetBuffer, bind_row_set_buffer_to_statement, error_handling_for_fetch,
9 unbind_buffer_from_cursor,
10};
11
12use std::thread::panicking;
13
14/// The asynchronous sibiling of [`super::CursorImpl`]. Use this to fetch results in asynchronous
15/// code.
16///
17/// Like [`super::CursorImpl`] this is an ODBC statement handle in cursor state. However unlike its
18/// synchronous sibling this statement handle is in asynchronous polling mode.
19pub struct CursorPolling<Stmt: AsStatementRef> {
20 /// A statement handle in cursor state with asynchronous mode enabled.
21 statement: Stmt,
22}
23
24impl<S> CursorPolling<S>
25where
26 S: AsStatementRef,
27{
28 /// Users of this library are encouraged not to call this constructor directly. This method is
29 /// pubilc so users with an understanding of the raw ODBC C-API have a way to create an
30 /// asynchronous cursor, after they left the safety rails of the Rust type System, in order to
31 /// implement a use case not covered yet, by the safe abstractions within this crate.
32 ///
33 /// # Safety
34 ///
35 /// `statement` must be in Cursor state, for the invariants of this type to hold. Preferable
36 /// `statement` should also have asynchrous mode enabled, otherwise constructing a synchronous
37 /// [`super::CursorImpl`] is more suitable.
38 pub unsafe fn new(statement: S) -> Self {
39 Self { statement }
40 }
41
42 /// Binds this cursor to a buffer holding a row set.
43 pub fn bind_buffer<B>(
44 mut self,
45 mut row_set_buffer: B,
46 ) -> Result<BlockCursorPolling<Self, B>, Error>
47 where
48 B: RowSetBuffer,
49 {
50 let stmt = self.statement.as_stmt_ref();
51 unsafe {
52 bind_row_set_buffer_to_statement(stmt, &mut row_set_buffer)?;
53 }
54 Ok(BlockCursorPolling::new(row_set_buffer, self))
55 }
56}
57
58impl<S> AsStatementRef for CursorPolling<S>
59where
60 S: AsStatementRef,
61{
62 fn as_stmt_ref(&mut self) -> StatementRef<'_> {
63 self.statement.as_stmt_ref()
64 }
65}
66
67impl<S> Drop for CursorPolling<S>
68where
69 S: AsStatementRef,
70{
71 fn drop(&mut self) {
72 let mut stmt = self.statement.as_stmt_ref();
73 if let Err(e) = stmt.close_cursor().into_result(&stmt) {
74 // Avoid panicking, if we already have a panic. We don't want to mask the original
75 // error.
76 if !panicking() {
77 panic!("Unexpected error closing cursor: {e:?}")
78 }
79 }
80 }
81}
82
83/// Asynchronously iterates in blocks (called row sets) over a result set, filling a buffers with
84/// a lot of rows at once, instead of iterating the result set row by row. This is usually much
85/// faster. Asynchronous sibiling of [`super::BlockCursor`].
86pub struct BlockCursorPolling<C, B>
87where
88 C: AsStatementRef,
89{
90 buffer: B,
91 cursor: C,
92}
93
94impl<C, B> BlockCursorPolling<C, B>
95where
96 C: AsStatementRef,
97{
98 fn new(buffer: B, cursor: C) -> Self {
99 Self { buffer, cursor }
100 }
101
102 /// Fills the bound buffer with the next row set.
103 ///
104 /// # Return
105 ///
106 /// `None` if the result set is empty and all row sets have been extracted. `Some` with a
107 /// reference to the internal buffer otherwise.
108 pub async fn fetch(&mut self, sleep: impl Sleep) -> Result<Option<&B>, Error>
109 where
110 B: RowSetBuffer,
111 {
112 self.fetch_with_truncation_check(false, sleep).await
113 }
114
115 /// Fills the bound buffer with the next row set. Should `error_for_truncation` be `true`and any
116 /// diagnostic indicate truncation of a value an error is returned.
117 ///
118 /// # Return
119 ///
120 /// `None` if the result set is empty and all row sets have been extracted. `Some` with a
121 /// reference to the internal buffer otherwise.
122 ///
123 /// Call this method to find out whether there are any truncated values in the batch, without
124 /// inspecting all its rows and columns.
125 pub async fn fetch_with_truncation_check(
126 &mut self,
127 error_for_truncation: bool,
128 mut sleep: impl Sleep,
129 ) -> Result<Option<&B>, Error>
130 where
131 B: RowSetBuffer,
132 {
133 let mut stmt = self.cursor.as_stmt_ref();
134 let result = unsafe { wait_for(|| stmt.fetch(), &mut sleep).await };
135 let has_row = error_handling_for_fetch(result, stmt, &self.buffer, error_for_truncation)?;
136 Ok(has_row.then_some(&self.buffer))
137 }
138}
139
140impl<C, B> Drop for BlockCursorPolling<C, B>
141where
142 C: AsStatementRef,
143{
144 fn drop(&mut self) {
145 if let Err(e) = unbind_buffer_from_cursor(&mut self.cursor) {
146 // Avoid panicking, if we already have a panic. We don't want to mask the original
147 // error.
148 if !panicking() {
149 panic!("Unexpected error unbinding columns: {e:?}")
150 }
151 }
152 }
153}