use std::cmp::Ordering;
use std::fmt::{self, Debug, Formatter};
use std::ptr::NonNull;
use futures_core::future::BoxFuture;
use futures_intrusive::sync::MutexGuard;
use futures_util::future;
use libsqlite3_sys::sqlite3;
pub(crate) use handle::{ConnectionHandle, ConnectionHandleRaw};
use crate::common::StatementCache;
use crate::connection::{Connection, LogSettings};
use crate::error::Error;
use crate::sqlite::connection::establish::EstablishParams;
use crate::sqlite::connection::worker::ConnectionWorker;
use crate::sqlite::statement::VirtualStatement;
use crate::sqlite::{Sqlite, SqliteConnectOptions};
use crate::transaction::Transaction;
pub(crate) mod collation;
pub(crate) mod describe;
pub(crate) mod establish;
pub(crate) mod execute;
mod executor;
mod explain;
mod handle;
mod worker;
/// A connection to a SQLite database.
///
/// Implements [`Connection`] with [`Sqlite`] as its database type. Most of the
/// operations defined in this file are forwarded to the [`ConnectionWorker`].
pub struct SqliteConnection {
/// Worker that connection operations are delegated to.
pub(crate) worker: ConnectionWorker,
/// Copied from `SqliteConnectOptions::row_channel_size` when the connection
/// is established.
/// NOTE(review): presumably the capacity of the channel used to stream result
/// rows; it is not consumed in this file, so confirm at the use site.
pub(crate) row_channel_size: usize,
}
/// Guarded access to the connection state, obtained from
/// [`SqliteConnection::lock_handle`].
///
/// Owns the mutex guard over [`ConnectionState`] for as long as this value
/// lives.
pub struct LockedSqliteHandle<'a> {
/// Mutex guard giving access to the connection state.
pub(crate) guard: MutexGuard<'a, ConnectionState>,
}
/// State stored behind the mutex that [`LockedSqliteHandle`] guards.
///
/// The `Drop` impl for this type clears `statements` before the fields
/// themselves are dropped.
pub(crate) struct ConnectionState {
/// The SQLite connection handle.
pub(crate) handle: ConnectionHandle,
/// NOTE(review): presumably the nesting depth of currently open transactions;
/// it is neither read nor written in this file — confirm in the transaction code.
pub(crate) transaction_depth: usize,
/// Prepared statements belonging to this connection.
pub(crate) statements: Statements,
/// Logging configuration. Not read in this file.
log_settings: LogSettings,
}
/// Prepared statements held by a connection.
pub(crate) struct Statements {
/// Statements requested as persistent, keyed by their query string.
cached: StatementCache<VirtualStatement>,
/// The most recently prepared non-cached statement, if any. It is replaced
/// each time another non-cached statement is requested.
temp: Option<VirtualStatement>,
}
impl SqliteConnection {
    /// Opens a new connection described by `options`.
    ///
    /// # Errors
    ///
    /// Fails if `options` cannot be converted into [`EstablishParams`] or if
    /// the worker cannot establish the connection.
    pub(crate) async fn establish(options: &SqliteConnectOptions) -> Result<Self, Error> {
        let worker = ConnectionWorker::establish(EstablishParams::from_options(options)?).await?;
        let row_channel_size = options.row_channel_size;

        Ok(Self {
            worker,
            row_channel_size,
        })
    }

    /// Returns the raw `sqlite3` pointer held by the worker.
    ///
    /// No lock is taken here, which is why this method is deprecated. Use
    /// [`lock_handle`](Self::lock_handle) and
    /// [`LockedSqliteHandle::as_raw_handle`] to get the pointer through a guard.
    #[deprecated = "Unsynchronized access is unsafe. See documentation for details."]
    pub fn as_raw_handle(&mut self) -> *mut sqlite3 {
        self.worker.handle_raw.as_ptr()
    }

    /// Asks the worker to register the collation `name`, ordered by `compare`.
    ///
    /// Per the deprecation note, registration completes asynchronously. Use
    /// [`lock_handle`](Self::lock_handle) and
    /// [`LockedSqliteHandle::create_collation`] to register through the guard.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the worker reports.
    #[deprecated = "Completes asynchronously. See documentation for details."]
    pub fn create_collation(
        &mut self,
        name: &str,
        compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
    ) -> Result<(), Error> {
        let worker = &mut self.worker;
        worker.create_collation(name, compare)
    }

    /// Waits for the worker to hand over the connection state and returns it
    /// wrapped in a [`LockedSqliteHandle`].
    ///
    /// # Errors
    ///
    /// Propagates the error from the worker's `unlock_db` call.
    pub async fn lock_handle(&mut self) -> Result<LockedSqliteHandle<'_>, Error> {
        let state_guard = self.worker.unlock_db().await?;
        let locked = LockedSqliteHandle { guard: state_guard };

        Ok(locked)
    }
}
impl Debug for SqliteConnection {
    /// Formats the connection as a struct showing the row channel size and the
    /// current number of cached statements.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("SqliteConnection");
        out.field("row_channel_size", &self.row_channel_size);
        out.field("cached_statements_size", &self.cached_statements_size());
        out.finish()
    }
}
impl Connection for SqliteConnection {
    type Database = Sqlite;
    type Options = SqliteConnectOptions;

    /// Requests a worker shutdown, drops the connection, then waits for the
    /// shutdown to finish and returns its result.
    fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
        Box::pin(async move {
            // The shutdown must be requested while `self` is still alive; the
            // connection is then dropped *before* the shutdown is awaited.
            let worker_stopped = self.worker.shutdown();
            drop(self);
            worker_stopped.await
        })
    }

    /// Drops the connection without waiting for the worker; always succeeds.
    fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
        Box::pin(async move {
            drop(self);
            Ok(())
        })
    }

    /// Forwards a ping to the worker.
    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
        let pong = self.worker.ping();
        Box::pin(pong)
    }

    /// Starts a transaction on this connection.
    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
    where
        Self: Sized,
    {
        Transaction::begin(self)
    }

    /// Reads the cached-statement count from the atomic counter in the
    /// worker's shared state.
    fn cached_statements_size(&self) -> usize {
        let counter = &self.worker.shared.cached_statements_size;
        counter.load(std::sync::atomic::Ordering::Acquire)
    }

    /// Asks the worker to clear its statement cache and waits for completion.
    fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
        Box::pin(async move {
            let cleared = self.worker.clear_cache().await;
            cleared?;
            Ok(())
        })
    }

    /// Nothing is buffered at this level, so flushing succeeds immediately.
    #[doc(hidden)]
    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
        let done = future::ok(());
        Box::pin(done)
    }

    /// Always `false`: this connection never asks to be flushed.
    #[doc(hidden)]
    fn should_flush(&self) -> bool {
        false
    }
}
impl LockedSqliteHandle<'_> {
    /// Returns the non-null `sqlite3` pointer of the guarded connection handle.
    pub fn as_raw_handle(&mut self) -> NonNull<sqlite3> {
        self.guard.handle.as_non_null_ptr()
    }

    /// Registers the collation `name` on the guarded connection handle, with
    /// `compare` as the ordering function.
    ///
    /// # Errors
    ///
    /// Propagates the error from `collation::create_collation`.
    pub fn create_collation(
        &mut self,
        name: &str,
        compare: impl Fn(&str, &str) -> Ordering + Send + Sync + 'static,
    ) -> Result<(), Error> {
        let handle = &mut self.guard.handle;
        collation::create_collation(handle, name, compare)
    }
}
impl Drop for ConnectionState {
    /// Clears every prepared statement before the fields are dropped.
    ///
    /// NOTE(review): presumably this ensures statements are finalized before
    /// the connection handle (declared ahead of `statements`, so dropped first
    /// by default) is closed — confirm against `ConnectionHandle`'s `Drop`.
    fn drop(&mut self) {
        let statements = &mut self.statements;
        statements.clear();
    }
}
impl Statements {
    /// Creates an empty set of statements whose cache is built with `capacity`.
    fn new(capacity: usize) -> Self {
        let cached = StatementCache::new(capacity);

        Self { cached, temp: None }
    }

    /// Returns a prepared statement for `query`.
    ///
    /// * If `persistent` is `false`, or the cache is disabled, a fresh
    ///   statement is created and stored in the temporary slot, replacing
    ///   whatever was there.
    /// * Otherwise the statement comes from the cache. It is created and
    ///   inserted on first use; a statement that was already cached is reset
    ///   before being returned.
    ///
    /// # Errors
    ///
    /// Propagates failures from `VirtualStatement::new` and from resetting an
    /// already-cached statement.
    fn get(&mut self, query: &str, persistent: bool) -> Result<&mut VirtualStatement, Error> {
        let use_cache = persistent && self.cached.is_enabled();

        if !use_cache {
            let fresh = VirtualStatement::new(query, false)?;
            return Ok(self.temp.insert(fresh));
        }

        let was_cached = self.cached.contains_key(query);

        if !was_cached {
            let fresh = VirtualStatement::new(query, true)?;
            self.cached.insert(query, fresh);
        }

        // The entry is either pre-existing or was inserted just above.
        let entry = self.cached.get_mut(query).unwrap();

        // Only a statement that was already cached is reset; a freshly
        // created one is returned as-is.
        if was_cached {
            entry.reset()?;
        }

        Ok(entry)
    }

    /// Number of statements currently held in the cache; the temporary slot
    /// is not counted.
    fn len(&self) -> usize {
        let cache = &self.cached;
        cache.len()
    }

    /// Empties the cache and then discards the temporary statement.
    fn clear(&mut self) {
        self.cached.clear();
        drop(self.temp.take());
    }
}