use std::{
mem::swap,
sync::mpsc::{Receiver, SyncSender, sync_channel},
thread::{self, JoinHandle},
};
use crate::{BlockCursor, Cursor, Error};
use super::RowSetBuffer;
pub struct ConcurrentBlockCursor<C, B> {
send_buffer: SyncSender<B>,
receive_batch: Receiver<B>,
fetch_thread: Option<JoinHandle<Result<C, Error>>>,
cursor: Option<C>,
}
impl<C, B> ConcurrentBlockCursor<C, B>
where
C: Cursor + Send + 'static,
B: RowSetBuffer + Send + 'static,
{
pub fn from_block_cursor(block_cursor: BlockCursor<C, B>) -> Self {
let (send_buffer, receive_buffer) = sync_channel(1);
let (send_batch, receive_batch) = sync_channel(1);
let fetch_thread = thread::spawn(move || {
let mut block_cursor = block_cursor;
loop {
match block_cursor.fetch_with_truncation_check(true) {
Ok(Some(_batch)) => (),
Ok(None) => {
break block_cursor
.unbind()
.map(|(undbound_cursor, _buffer)| undbound_cursor);
}
Err(odbc_error) => {
drop(send_batch);
break Err(odbc_error);
}
}
let (cursor, buffer) = block_cursor.unbind()?;
if send_batch.send(buffer).is_err() {
break Ok(cursor);
}
match receive_buffer.recv() {
Err(_) => {
break Ok(cursor);
}
Ok(next_buffer) => {
block_cursor = cursor.bind_buffer(next_buffer).unwrap();
}
}
}
});
Self {
send_buffer,
receive_batch,
fetch_thread: Some(fetch_thread),
cursor: None,
}
}
pub fn into_cursor(self) -> Result<C, Error> {
drop(self.receive_batch);
drop(self.send_buffer);
if let Some(cursor) = self.cursor {
Ok(cursor)
} else {
self.fetch_thread.unwrap().join().unwrap()
}
}
}
impl<C, B> ConcurrentBlockCursor<C, B> {
pub fn fetch(&mut self) -> Result<Option<B>, Error> {
match self.receive_batch.recv() {
Ok(batch) => Ok(Some(batch)),
Err(_receive_error) => {
if let Some(join_handle) = self.fetch_thread.take() {
self.cursor = Some(join_handle.join().unwrap()?);
Ok(None)
} else {
Ok(None)
}
}
}
}
pub fn fill(&mut self, buffer: B) {
let _ = self.send_buffer.send(buffer);
}
pub fn fetch_into(&mut self, buffer: &mut B) -> Result<bool, Error> {
if let Some(mut batch) = self.fetch()? {
swap(buffer, &mut batch);
self.fill(batch);
Ok(true)
} else {
Ok(false)
}
}
}