use crate::connection::{
AsyncConnection, BulkInsert, Connection, ExecutionSummary, ForeignKey, QueryResult, SchemaInfo,
StatementResult,
};
use crate::error::SqlError;
use crate::guard::SizeGuards;
use crate::stream::RowCursor;
/// Blocking [`Connection`] implementation that wraps a boxed
/// [`AsyncConnection`] and drives it on an owned Tokio runtime.
pub struct SyncConnection {
    /// The wrapped asynchronous connection; every trait method delegates to it.
    // NOTE(review): fields are dropped in declaration order, so `inner` is
    // dropped before `rt`. Presumably intentional (the connection goes away
    // while the runtime still exists) — confirm before reordering fields.
    inner: Box<dyn AsyncConnection>,
    /// Size guards consulted by `query` and handed to the cursor built by
    /// `query_cursor`; readable and replaceable through the `Connection` trait.
    guards: SizeGuards,
    /// Runtime used to block on the futures produced by `inner`.
    rt: tokio::runtime::Runtime,
}
impl SyncConnection {
    /// Wraps `inner` so that it can be driven synchronously on `rt`.
    ///
    /// The size guards start out as `SizeGuards::default()`; callers can
    /// replace them afterwards through `Connection::set_size_guards`.
    #[must_use]
    pub(crate) fn new(rt: tokio::runtime::Runtime, inner: Box<dyn AsyncConnection>) -> Self {
        let guards = SizeGuards::default();
        Self { inner, guards, rt }
    }
}
/// Synchronous facade: each method builds the matching future on the wrapped
/// async connection and blocks on it using the owned runtime.
impl Connection for SyncConnection {
    /// Executes `sql` on the wrapped connection and returns its summary.
    fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
        let Self { rt, inner, .. } = self;
        rt.block_on(inner.execute(sql))
    }

    /// Runs `sql` and buffers the complete result set in memory.
    ///
    /// Every row is passed through `SizeGuards::check_row`. When the guards
    /// report `caps_total()`, a running byte total of all rows is kept as well.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying stream and from `check_row`, and
    /// returns `SqlError::BufferTooLarge` as soon as the running byte total
    /// exceeds `max_total_buffered_bytes`.
    fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
        // Copy the guards out first so the async block does not borrow `self`.
        let limits = self.guards;
        let Self { rt, inner, .. } = self;
        rt.block_on(async move {
            use futures_util::stream::StreamExt;

            let (columns, mut stream) = inner.query_stream(sql).await?;
            let mut rows = Vec::new();
            let mut buffered_bytes: usize = 0;
            // Zero-based position of the row currently being examined; it also
            // equals the number of rows already accepted into `rows`.
            let mut position: u64 = 0;

            while let Some(fetched) = stream.next().await {
                let row = fetched?;
                limits.check_row(position, &row, &columns)?;

                if limits.caps_total() {
                    let row_bytes: usize = row.iter().map(crate::value::Value::byte_size).sum();
                    buffered_bytes = buffered_bytes.saturating_add(row_bytes);
                    let cap = limits.max_total_buffered_bytes;
                    if buffered_bytes > cap {
                        // The offending row is not counted: it was never buffered.
                        return Err(SqlError::BufferTooLarge {
                            rows_buffered: position,
                            cap,
                        });
                    }
                }

                rows.push(row);
                position += 1;
            }

            Ok(QueryResult { columns, rows })
        })
    }

    /// Starts `sql` and returns a cursor over its row stream instead of
    /// buffering the rows.
    ///
    /// Only the initial `query_stream` call is awaited here; the cursor is
    /// given the runtime, the stream and a copy of the current size guards.
    fn query_cursor(&mut self, sql: &str) -> Result<RowCursor<'_>, SqlError> {
        let limits = self.guards;
        let runtime = &self.rt;
        let conn = &mut self.inner;
        let (columns, stream) = runtime.block_on(conn.query_stream(sql))?;
        Ok(RowCursor::new(columns, runtime, stream, limits))
    }

    /// Returns a copy of the size guards currently in effect.
    fn size_guards(&self) -> SizeGuards {
        self.guards
    }

    /// Replaces the size guards used by subsequent queries.
    fn set_size_guards(&mut self, guards: SizeGuards) {
        self.guards = guards;
    }

    /// Forwards `sql` to the wrapped connection's `execute_multi` and returns
    /// the per-statement results it yields.
    fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
        let Self { rt, inner, .. } = self;
        rt.block_on(inner.execute_multi(sql))
    }

    /// Blocks on the wrapped connection's `ping`.
    fn ping(&mut self) -> Result<(), SqlError> {
        let Self { rt, inner, .. } = self;
        rt.block_on(inner.ping())
    }

    /// Blocks on the wrapped connection's `list_tables` for the given
    /// optional `schema`.
    fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
        let Self { rt, inner, .. } = self;
        rt.block_on(inner.list_tables(schema))
    }

    /// Blocks on the wrapped connection's `list_schemas`.
    fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
        let Self { rt, inner, .. } = self;
        rt.block_on(inner.list_schemas())
    }

    /// Blocks on the wrapped connection's `describe_table` for `table` in the
    /// given optional `schema`.
    fn describe_table(
        &mut self,
        schema: Option<&str>,
        table: &str,
    ) -> Result<QueryResult, SqlError> {
        let Self { rt, inner, .. } = self;
        rt.block_on(inner.describe_table(schema, table))
    }

    /// Blocks on the wrapped connection's `primary_key` for `table` in the
    /// given optional `schema`.
    fn primary_key(&mut self, schema: Option<&str>, table: &str) -> Result<Vec<String>, SqlError> {
        let Self { rt, inner, .. } = self;
        rt.block_on(inner.primary_key(schema, table))
    }

    /// Blocks on the wrapped connection's `list_foreign_keys` for the given
    /// optional `schema`.
    fn list_foreign_keys(&mut self, schema: Option<&str>) -> Result<Vec<ForeignKey>, SqlError> {
        let Self { rt, inner, .. } = self;
        rt.block_on(inner.list_foreign_keys(schema))
    }

    /// Blocks on the wrapped connection's `bulk_insert_rows` for `target`.
    fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError> {
        let Self { rt, inner, .. } = self;
        rt.block_on(inner.bulk_insert_rows(target))
    }
}