// odbc_api/cursor/concurrent_block_cursor.rs

1use std::{
2    mem::swap,
3    sync::mpsc::{Receiver, SyncSender, sync_channel},
4    thread::{self, JoinHandle},
5};
6
7use crate::{BlockCursor, Cursor, Error};
8
9use super::RowSetBuffer;
10
11/// A wrapper around block cursors which fetches data in a dedicated system thread. Intended to
12/// fetch data batch by batch while the application processes the batch last fetched. Works best
13/// with a double buffer strategy using two fetch buffers.
14///
15/// # Example
16///
17/// ```no_run
18/// use odbc_api::{
19///     environment, buffers::{ColumnarAnyBuffer, BufferDesc}, Cursor, ConcurrentBlockCursor
20/// };
21///
22/// // We want to use the ODBC environment from another system thread without scope => Therefore it
23/// // needs to be static.
24/// let env = environment()?;
25///
26/// let conn = env.connect_with_connection_string(
27///     "Driver={ODBC Driver 18 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;",
28///     Default::default())?;
29///
30/// let query = "SELECT * FROM very_big_table";
31/// let params = ();
32/// let timeout_sec = None;
33/// // We use `into_cursor` to create a statement handle, which also owns the connection and has a
34/// // static lifetime. This way we can send it to another thread safely.
35/// let cursor = conn.into_cursor(query, params, timeout_sec)?.unwrap();
36///
37/// // Batch size and buffer description. Here we assume there is only one integer column
38/// let buffer_a = ColumnarAnyBuffer::from_descs(1000, [BufferDesc::I32 { nullable: false }]);
39/// let mut buffer_b = ColumnarAnyBuffer::from_descs(1000, [BufferDesc::I32 { nullable: false }]);
40/// // And now we have a sendable block cursor with static lifetime
41/// let block_cursor = cursor.bind_buffer(buffer_a)?;
42///
43/// let mut cbc = ConcurrentBlockCursor::from_block_cursor(block_cursor);
44/// while cbc.fetch_into(&mut buffer_b)? {
45///     // Proccess batch in buffer b asynchronously to fetching it
46/// }
47///
48/// # Ok::<_, odbc_api::Error>(())
49/// ```
50pub struct ConcurrentBlockCursor<C, B> {
51    /// In order to avoid reallocating buffers over and over again, we use this channel to send the
52    /// buffers back to the fetch thread after we copied their contents into arrow arrays.
53    send_buffer: SyncSender<B>,
54    /// Receives filled batches from the fetch thread. Once the source is empty or if an error
55    /// occurs its associated sender is dropped, and receiving batches will return an error (which
56    /// we expect during normal operation and cleanup, and is not forwarded to the user).
57    receive_batch: Receiver<B>,
58    /// We join with the fetch thread if we stop receiving batches (i.e. receive_batch.recv()
59    /// returns an error) or `into_cursor` is called. `None` if the thread has already been joined.
60    /// In this case either an error has been reported to the user, or the cursor is stored in
61    /// `cursor`.
62    fetch_thread: Option<JoinHandle<Result<C, Error>>>,
63    /// Only `Some`, if the cursor has been consumed succesfully and `fetch_thread` has been
64    /// joined. Can only be `Some` if `fetch_thread` is `None`. If both `fetch_thread` and
65    /// `cursor` are `None`, it is implied that `fetch_thread` returned an error joining.
66    cursor: Option<C>,
67}
68
69impl<C, B> ConcurrentBlockCursor<C, B>
70where
71    C: Cursor + Send + 'static,
72    B: RowSetBuffer + Send + 'static,
73{
74    /// Construct a new concurrent block cursor.
75    ///
76    /// # Parameters
77    ///
78    /// * `block_cursor`: Taking a BlockCursor instead of a Cursor allows for better resource
79    ///   stealing if constructing starting from a sequential Cursor, as we do not need to undbind
80    ///   and bind the cursor.
81    pub fn from_block_cursor(block_cursor: BlockCursor<C, B>) -> Self {
82        let (send_buffer, receive_buffer) = sync_channel(1);
83        let (send_batch, receive_batch) = sync_channel(1);
84
85        let fetch_thread = thread::spawn(move || {
86            let mut block_cursor = block_cursor;
87            loop {
88                match block_cursor.fetch_with_truncation_check(true) {
89                    Ok(Some(_batch)) => (),
90                    Ok(None) => {
91                        break block_cursor
92                            .unbind()
93                            .map(|(undbound_cursor, _buffer)| undbound_cursor);
94                    }
95                    Err(odbc_error) => {
96                        drop(send_batch);
97                        break Err(odbc_error);
98                    }
99                }
100                // There has been another row group fetched by the cursor. We unbind the buffers so
101                // we can pass ownership of it to the application and bind a new buffer to the
102                // cursor in order to start fetching the next batch.
103                let (cursor, buffer) = block_cursor.unbind()?;
104                if send_batch.send(buffer).is_err() {
105                    // Should the main thread stop receiving buffers, this thread should
106                    // also stop fetching batches.
107                    break Ok(cursor);
108                }
109                // Wait for the application thread to give us a buffer to fill.
110                match receive_buffer.recv() {
111                    Err(_) => {
112                        // Application thread dropped sender and does not want more buffers to be
113                        // filled. Let's stop this thread and return the cursor
114                        break Ok(cursor);
115                    }
116                    Ok(next_buffer) => {
117                        block_cursor = cursor.bind_buffer(next_buffer).unwrap();
118                    }
119                }
120            }
121        });
122
123        Self {
124            send_buffer,
125            receive_batch,
126            fetch_thread: Some(fetch_thread),
127            cursor: None,
128        }
129    }
130
131    /// Join fetch thread and yield the cursor back.
132    pub fn into_cursor(self) -> Result<C, Error> {
133        drop(self.receive_batch);
134        // Dropping the send buffer is necessary to avoid deadlocks, in case there would not be any
135        // buffer in the channel waiting for the fetch thread. Since we consume the cursor here, it
136        // is also impossible for the application to send another buffer.
137        drop(self.send_buffer);
138        if let Some(cursor) = self.cursor {
139            Ok(cursor)
140        } else {
141            self.fetch_thread.unwrap().join().unwrap()
142        }
143    }
144}
145
146impl<C, B> ConcurrentBlockCursor<C, B> {
147    /// Receive the current batch and take ownership of its buffer. `None` if the cursor is already
148    /// consumed, or had an error previously. This method blocks until a new batch available. In
149    /// order for new batches available new buffers must be send to the thread in order for it to
150    /// fill them. So calling fetch repeatedly without calling [`Self::fill`] in between may
151    /// deadlock.
152    pub fn fetch(&mut self) -> Result<Option<B>, Error> {
153        match self.receive_batch.recv() {
154            // We successfully fetched a batch from the database.
155            Ok(batch) => Ok(Some(batch)),
156            // Fetch thread stopped sending batches. Either because we consumed the result set
157            // completly or we hit an error.
158            Err(_receive_error) => {
159                if let Some(join_handle) = self.fetch_thread.take() {
160                    // If there has been an error returning the batch, or unbinding the buffer `?`
161                    // will raise it.
162                    self.cursor = Some(join_handle.join().unwrap()?);
163                    // We ran out of batches in the result set. End the stream.
164                    Ok(None)
165                } else {
166                    // This only happen if this method is called after it returned either `false` or
167                    // `Err` once. Let us treat this scenario like a result set which is consumed
168                    // completly.
169                    Ok(None)
170                }
171            }
172        }
173    }
174
175    /// Send a buffer to the thread fetching in order for it to be filled and to be retrieved later
176    /// using either `fetch`, or `fetch_into`.
177    pub fn fill(&mut self, buffer: B) {
178        let _ = self.send_buffer.send(buffer);
179    }
180
181    /// Fetches values from the ODBC datasource into buffer. Values are streamed batch by batch in
182    /// order to avoid reallocation of the buffers used for tranistion. This call blocks until a new
183    /// batch is ready. This method combines both [`Self::fetch`] and [`Self::fill`].
184    ///
185    /// # Parameters
186    ///
187    /// * `buffer`: A columnar any buffer which can bind to the cursor wrapped by this instance.
188    ///   After the method call the reference will not point to the same instance which had been
189    ///   passed into the function call, but to the one which was bound to the cursor in order to
190    ///   fetch the last batch. The buffer passed into this method, is then used to fetch the next
191    ///   batch. As such this method is ideal to implement concurrent fetching using two buffers.
192    ///   One which is written to, and one that is read, which flip their roles between batches.
193    ///   Also called double buffering.
194    ///
195    /// # Return
196    ///
197    /// * `true`: Fetched a batch from the data source. The contents of that batch are now in
198    ///   `buffer`.
199    /// * `false`: No batch could be fetched. The result set is consumed completly.
200    pub fn fetch_into(&mut self, buffer: &mut B) -> Result<bool, Error> {
201        if let Some(mut batch) = self.fetch()? {
202            swap(buffer, &mut batch);
203            self.fill(batch);
204            Ok(true)
205        } else {
206            Ok(false)
207        }
208    }
209}