#![deny(missing_debug_implementations)]
use std::path::Path;
use std::sync::Arc;
pub use spg_embedded::{
ColumnSchema, DataType, Database, EngineError, ParsedStatement, QueryResult, Statement, Value,
};
pub use spg_engine::CatalogSnapshot;
use tokio::sync::RwLock;
use tokio::task::JoinError;
/// Collapses the nested result produced by awaiting a `spawn_blocking` task
/// whose closure itself returns `Result<T, EngineError>`.
trait FlattenBlockingExt<T> {
    /// Merges the join outcome and the closure outcome into one `Result`.
    ///
    /// * A completed task yields the closure's own result unchanged.
    /// * A cancelled task yields `Err(EngineError::Cancelled)`.
    /// * Any other join failure is treated as a panic inside the task and is
    ///   re-raised on the caller via `std::panic::resume_unwind`.
    fn flatten_blocking(self) -> Result<T, EngineError>;
}

impl<T> FlattenBlockingExt<T> for Result<Result<T, EngineError>, JoinError> {
    fn flatten_blocking(self) -> Result<T, EngineError> {
        // Peel off the success case first; everything below deals with a join failure.
        let join_err = match self {
            Ok(outcome) => return outcome,
            Err(err) => err,
        };
        if join_err.is_cancelled() {
            Err(EngineError::Cancelled)
        } else {
            // Not cancelled, so the task panicked: propagate the original payload.
            std::panic::resume_unwind(join_err.into_panic())
        }
    }
}
/// Unwraps the result of awaiting a `spawn_blocking` task whose closure is
/// infallible, for callers that have no error channel to report a join failure.
trait UnwrapBlockingExt<T> {
    /// Returns the task's value.
    ///
    /// # Panics
    ///
    /// Panics with a fixed message if the task was cancelled, and re-raises the
    /// task's own panic payload if the task panicked.
    fn unwrap_blocking(self) -> T;
}

impl<T> UnwrapBlockingExt<T> for Result<T, JoinError> {
    fn unwrap_blocking(self) -> T {
        let join_err = match self {
            Ok(value) => return value,
            Err(err) => err,
        };
        if join_err.is_cancelled() {
            panic!("spg-embedded-tokio: snapshot helper cancelled during runtime shutdown");
        }
        // Not cancelled, so the task panicked: propagate the original payload.
        std::panic::resume_unwind(join_err.into_panic())
    }
}
/// Async handle to a [`Database`] shared behind an `Arc<tokio::sync::RwLock<_>>`.
///
/// Cloning the handle clones the `Arc`, so every clone refers to the same
/// underlying database and lock.
#[derive(Debug, Clone)]
pub struct AsyncDatabase {
/// Shared, lock-protected database instance.
inner: Arc<RwLock<Database>>,
}
/// Reference-counted handle to a prepared [`Statement`].
///
/// Cloning the handle clones the `Arc`; the statement itself is shared.
#[derive(Debug, Clone)]
pub struct AsyncStatement {
/// The shared prepared statement.
inner: Arc<crate::Statement>,
}
/// Returns a new `Arc` pointing at the statement wrapped by `stmt`.
///
/// Only the reference count is bumped; the statement is not copied. The item
/// is hidden from the rendered documentation.
#[doc(hidden)]
#[must_use]
pub fn async_statement_inner(stmt: &AsyncStatement) -> Arc<crate::Statement> {
    let shared = &stmt.inner;
    Arc::clone(shared)
}
impl AsyncDatabase {
#[must_use]
pub fn open_in_memory() -> Self {
Self {
inner: Arc::new(RwLock::new(Database::open_in_memory())),
}
}
pub async fn open_path<P: AsRef<Path>>(path: P) -> Result<Self, EngineError> {
let path = path.as_ref().to_path_buf();
let db = tokio::task::spawn_blocking(move || Database::open_path(path))
.await
.flatten_blocking()?;
Ok(Self {
inner: Arc::new(RwLock::new(db)),
})
}
pub async fn execute(&self, sql: &str) -> Result<QueryResult, EngineError> {
let inner = Arc::clone(&self.inner);
let sql = sql.to_string();
tokio::task::spawn_blocking(move || {
let (result, ticket) = {
let mut guard = inner.blocking_write();
guard.execute_buffered(&sql)?
}; if let Some(t) = ticket {
t.wait()?; }
Ok(result)
})
.await
.flatten_blocking()
}
pub async fn execute_script(&self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
let inner = Arc::clone(&self.inner);
let sql = sql.to_string();
tokio::task::spawn_blocking(move || {
let mut guard = inner.blocking_write();
guard.execute_script(&sql)
})
.await
.flatten_blocking()
}
pub async fn query(&self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
let inner = Arc::clone(&self.inner);
let sql = sql.to_string();
tokio::task::spawn_blocking(move || {
let mut guard = inner.blocking_write();
guard.query(&sql)
})
.await
.flatten_blocking()
}
pub async fn prepare(&self, sql: &str) -> Result<AsyncStatement, EngineError> {
let inner = Arc::clone(&self.inner);
let sql = sql.to_string();
tokio::task::spawn_blocking(move || {
let mut guard = inner.blocking_write();
guard.prepare(&sql).map(|stmt| AsyncStatement {
inner: Arc::new(stmt),
})
})
.await
.flatten_blocking()
}
pub async fn describe(
&self,
sql: &str,
) -> Result<(Vec<u32>, Vec<spg_embedded::ColumnSchema>), EngineError> {
let inner = Arc::clone(&self.inner);
let sql = sql.to_string();
tokio::task::spawn_blocking(move || {
let mut guard = inner.blocking_write();
guard.describe(&sql)
})
.await
.flatten_blocking()
}
pub async fn execute_prepared(
&self,
stmt: &AsyncStatement,
params: Vec<Value>,
) -> Result<QueryResult, EngineError> {
let inner = Arc::clone(&self.inner);
let stmt_inner = Arc::clone(&stmt.inner);
tokio::task::spawn_blocking(move || {
let (result, ticket) = {
let mut guard = inner.blocking_write();
guard.execute_prepared_buffered(&stmt_inner, ¶ms)?
};
if let Some(t) = ticket {
t.wait()?;
}
Ok(result)
})
.await
.flatten_blocking()
}
pub async fn query_prepared(
&self,
stmt: &AsyncStatement,
params: Vec<Value>,
) -> Result<Vec<Vec<Value>>, EngineError> {
let inner = Arc::clone(&self.inner);
let stmt_inner = Arc::clone(&stmt.inner);
tokio::task::spawn_blocking(move || {
let mut guard = inner.blocking_write();
guard.query_prepared(&stmt_inner, ¶ms)
})
.await
.flatten_blocking()
}
pub async fn query_with_columns(
&self,
sql: &str,
) -> Result<(Vec<spg_embedded::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
let inner = Arc::clone(&self.inner);
let sql = sql.to_string();
tokio::task::spawn_blocking(move || {
let mut guard = inner.blocking_write();
guard.query_with_columns(&sql)
})
.await
.flatten_blocking()
}
pub async fn query_prepared_with_columns(
&self,
stmt: &AsyncStatement,
params: Vec<Value>,
) -> Result<(Vec<spg_embedded::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
let inner = Arc::clone(&self.inner);
let stmt_inner = Arc::clone(&stmt.inner);
tokio::task::spawn_blocking(move || {
let mut guard = inner.blocking_write();
guard.query_prepared_with_columns(&stmt_inner, ¶ms)
})
.await
.flatten_blocking()
}
pub async fn checkpoint(&self) -> Result<(), EngineError> {
let inner = Arc::clone(&self.inner);
tokio::task::spawn_blocking(move || {
let mut guard = inner.blocking_write();
guard.checkpoint()
})
.await
.flatten_blocking()
}
pub async fn clone_snapshot_inline(&self) -> CatalogSnapshot {
let guard = self.inner.read().await;
guard.engine().clone_snapshot()
}
pub async fn read_handle(&self) -> AsyncReadHandle {
let inner = Arc::clone(&self.inner);
let snapshot = tokio::task::spawn_blocking(move || {
let guard = inner.blocking_read();
guard.engine().clone_snapshot()
})
.await
.unwrap_blocking();
AsyncReadHandle {
db: Arc::clone(&self.inner),
snapshot,
}
}
}
/// Read-side handle that pairs a shared database with a stored
/// [`CatalogSnapshot`].
#[derive(Debug)]
pub struct AsyncReadHandle {
/// Shared database the snapshot was taken from; read again by `refresh`.
db: Arc<RwLock<Database>>,
/// Snapshot that this handle's query methods run against.
snapshot: CatalogSnapshot,
}
impl AsyncReadHandle {
    /// Runs read-only `sql` against the stored snapshot on Tokio's blocking
    /// pool via `Engine::execute_readonly_on_snapshot`.
    ///
    /// The database lock is not taken; the query sees the catalog as of the
    /// snapshot currently held by this handle.
    ///
    /// # Errors
    ///
    /// Returns the engine's error, or `EngineError::Cancelled` if the blocking
    /// task was cancelled.
    pub async fn query(&self, sql: &str) -> Result<QueryResult, EngineError> {
        // Clone so the `'static` blocking closure owns its own snapshot.
        let snapshot = self.snapshot.clone();
        let sql = sql.to_string();
        tokio::task::spawn_blocking(move || {
            spg_engine::Engine::execute_readonly_on_snapshot(&snapshot, &sql)
        })
        .await
        .flatten_blocking()
    }

    /// Prepares `sql` against the stored snapshot via
    /// `Database::prepare_on_snapshot` and wraps the statement in an
    /// [`AsyncStatement`].
    ///
    /// # Errors
    ///
    /// Returns the error from `prepare_on_snapshot`, or
    /// `EngineError::Cancelled` if the blocking task was cancelled.
    pub async fn prepare(&self, sql: &str) -> Result<AsyncStatement, EngineError> {
        let snapshot = self.snapshot.clone();
        let sql = sql.to_string();
        tokio::task::spawn_blocking(move || {
            Database::prepare_on_snapshot(&snapshot, &sql).map(|stmt| AsyncStatement {
                inner: Arc::new(stmt),
            })
        })
        .await
        .flatten_blocking()
    }

    /// Executes a prepared statement with `params` against the stored snapshot
    /// via `Database::execute_prepared_on_snapshot`.
    ///
    /// # Errors
    ///
    /// Returns the error from `execute_prepared_on_snapshot`, or
    /// `EngineError::Cancelled` if the blocking task was cancelled.
    pub async fn execute_prepared(
        &self,
        stmt: &AsyncStatement,
        params: Vec<Value>,
    ) -> Result<QueryResult, EngineError> {
        let snapshot = self.snapshot.clone();
        let stmt_inner = Arc::clone(&stmt.inner);
        tokio::task::spawn_blocking(move || {
            Database::execute_prepared_on_snapshot(&snapshot, &stmt_inner, &params)
        })
        .await
        .flatten_blocking()
    }

    /// Forwards `sql` to `Database::describe_on_snapshot` for the stored
    /// snapshot.
    ///
    /// NOTE(review): the `Vec<u32>` half of the result presumably holds
    /// parameter type identifiers; confirm against `spg_embedded`.
    ///
    /// # Errors
    ///
    /// Returns the error from `describe_on_snapshot`, or
    /// `EngineError::Cancelled` if the blocking task was cancelled.
    pub async fn describe(
        &self,
        sql: &str,
    ) -> Result<(Vec<u32>, Vec<spg_embedded::ColumnSchema>), EngineError> {
        let snapshot = self.snapshot.clone();
        let sql = sql.to_string();
        tokio::task::spawn_blocking(move || Database::describe_on_snapshot(&snapshot, &sql))
            .await
            .flatten_blocking()
    }

    /// Replaces the stored snapshot with a fresh clone of the database's
    /// current catalog, taken under `blocking_read` on the blocking pool.
    ///
    /// # Panics
    ///
    /// Panics if the blocking task is cancelled, and re-raises any panic that
    /// occurred inside it.
    pub async fn refresh(&mut self) {
        let inner = Arc::clone(&self.db);
        let new_snapshot = tokio::task::spawn_blocking(move || {
            let guard = inner.blocking_read();
            guard.engine().clone_snapshot()
        })
        .await
        .unwrap_blocking();
        self.snapshot = new_snapshot;
    }
}