use crate::error::Error;
use crate::sqlite::statement::StatementHandle;
use crossbeam_channel::{unbounded, Sender};
use either::Either;
use futures_channel::oneshot;
use std::sync::{Arc, Weak};
use std::thread;
use crate::sqlite::connection::ConnectionHandleRef;
use libsqlite3_sys::{sqlite3_reset, sqlite3_step, SQLITE_DONE, SQLITE_ROW};
use std::future::Future;
/// Handle to a background thread that runs SQLite statement calls
/// (`sqlite3_step`, `sqlite3_reset`) on behalf of async callers.
///
/// Commands are submitted through `tx`; each command carries its own
/// oneshot sender on which the thread reports completion.
pub(crate) struct StatementWorker {
/// Sending half of the unbounded command channel consumed by the worker thread.
tx: Sender<StatementWorkerCommand>,
}
/// Messages understood by the worker thread started in `StatementWorker::new`.
///
/// Statements are passed as `Weak` references, so a queued command does not
/// keep a statement handle alive by itself.
enum StatementWorkerCommand {
/// Call `sqlite3_step` on the statement and report the outcome.
Step {
/// Statement to step; the command is skipped without a reply if it can no
/// longer be upgraded.
statement: Weak<StatementHandle>,
/// Reply channel: `Ok(Either::Right(()))` for `SQLITE_ROW`,
/// `Ok(Either::Left(changes))` for `SQLITE_DONE`, `Err(..)` for any other status.
tx: oneshot::Sender<Result<Either<u64, ()>, Error>>,
},
/// Call `sqlite3_reset` on the statement.
Reset {
/// Statement to reset; nothing is done and no reply is sent if it can no
/// longer be upgraded.
statement: Weak<StatementHandle>,
/// Signalled with `()` once the reset call has been made.
tx: oneshot::Sender<()>,
},
/// Drop the worker's connection reference and stop the worker thread.
Shutdown {
/// Signalled with `()` after the connection reference has been dropped.
tx: oneshot::Sender<()>,
},
}
impl StatementWorker {
pub(crate) fn new(conn: ConnectionHandleRef) -> Self {
let (tx, rx) = unbounded();
thread::spawn(move || {
for cmd in rx {
match cmd {
StatementWorkerCommand::Step { statement, tx } => {
let statement = if let Some(statement) = statement.upgrade() {
statement
} else {
continue;
};
let status = unsafe { sqlite3_step(statement.as_ptr()) };
let result = match status {
SQLITE_ROW => Ok(Either::Right(())),
SQLITE_DONE => Ok(Either::Left(statement.changes())),
_ => Err(statement.last_error().into()),
};
let _ = tx.send(result);
}
StatementWorkerCommand::Reset { statement, tx } => {
if let Some(statement) = statement.upgrade() {
unsafe { sqlite3_reset(statement.as_ptr()) };
let _ = tx.send(());
}
}
StatementWorkerCommand::Shutdown { tx } => {
drop(conn);
let _ = tx.send(());
return;
}
}
}
drop(conn);
});
Self { tx }
}
pub(crate) async fn step(
&mut self,
statement: &Arc<StatementHandle>,
) -> Result<Either<u64, ()>, Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(StatementWorkerCommand::Step {
statement: Arc::downgrade(statement),
tx,
})
.map_err(|_| Error::WorkerCrashed)?;
rx.await.map_err(|_| Error::WorkerCrashed)?
}
pub(crate) fn reset(
&mut self,
statement: &Arc<StatementHandle>,
) -> impl Future<Output = Result<(), Error>> {
let (tx, rx) = oneshot::channel();
let send_res = self
.tx
.send(StatementWorkerCommand::Reset {
statement: Arc::downgrade(statement),
tx,
})
.map_err(|_| Error::WorkerCrashed);
async move {
send_res?;
rx.await.map_err(|_| Error::WorkerCrashed)
}
}
pub(crate) fn shutdown(&mut self) -> impl Future<Output = Result<(), Error>> {
let (tx, rx) = oneshot::channel();
let send_res = self
.tx
.send(StatementWorkerCommand::Shutdown { tx })
.map_err(|_| Error::WorkerCrashed);
async move {
send_res?;
rx.await.map_err(|_| Error::WorkerCrashed)
}
}
}