use crate::error::{ConnectorError as Error, Result};
use crate::row_stream::RowStream;
use crate::Row;
use futures::stream::StreamExt;
use nautilus_core::Value;
use sqlx::pool::PoolConnection;
use sqlx::Pool;
use tokio::sync::mpsc;
/// Capacity of the bounded `mpsc` channel that carries rows from the
/// background query task to the `RowStream` handed back to the caller.
const STREAMING_CHANNEL_CAPACITY: usize = 64;
/// Everything needed to run one streaming query on a background task.
///
/// `Bind` and `Decode` are left unconstrained here; the bounds they must
/// satisfy are stated on the functions that consume this struct.
pub(crate) struct StreamingQuery<DB, Bind, Decode>
where
DB: sqlx::Database,
{
/// Pool from which the connection used for the query is acquired.
pub pool: Pool<DB>,
/// SQL text handed to `sqlx::query`.
pub sql_text: String,
/// Parameter values, bound one by one in this order via `bind`.
pub params: Vec<Value>,
/// Callback that binds a single `Value` onto the query being built.
pub bind: Bind,
/// Callback that converts a raw database row into a `Row`.
pub decode: Decode,
/// Context string attached to database errors raised while streaming.
pub query_context: &'static str,
/// Forwarded to `Query::persistent`, which in sqlx controls whether the
/// prepared statement is kept in the connection's statement cache.
pub persistent: bool,
}
/// Starts `request` on a background Tokio task and returns a [`RowStream`]
/// that yields its results.
///
/// Rows (or errors) travel from the task to the returned stream through a
/// bounded channel of `STREAMING_CHANNEL_CAPACITY` entries, so the task is
/// back-pressured when the consumer reads slowly.
///
/// The spawned task is detached: its `JoinHandle` is discarded and nothing
/// awaits its completion.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub(crate) fn spawn_streaming_query<DB, Bind, Decode>(
    request: StreamingQuery<DB, Bind, Decode>,
) -> RowStream<'static>
where
    DB: sqlx::Database + sqlx::database::HasStatementCache,
    PoolConnection<DB>: Send,
    for<'q> <DB as sqlx::Database>::Arguments<'q>: sqlx::IntoArguments<'q, DB>,
    for<'q> &'q mut <DB as sqlx::Database>::Connection: sqlx::Executor<'q, Database = DB>,
    Bind: for<'q> Fn(
            sqlx::query::Query<'q, DB, <DB as sqlx::Database>::Arguments<'q>>,
            &'q Value,
        ) -> Result<sqlx::query::Query<'q, DB, <DB as sqlx::Database>::Arguments<'q>>>
        + Copy
        + Send
        + 'static,
    Decode: Fn(<DB as sqlx::Database>::Row) -> Result<Row> + Copy + Send + 'static,
{
    let (sender, receiver) = mpsc::channel::<Result<Row>>(STREAMING_CHANNEL_CAPACITY);

    // The worker owns both the request and the sending half of the channel.
    let worker = async move {
        run_streaming_query(request, sender).await;
    };
    tokio::spawn(worker);

    RowStream::from_receiver(receiver)
}
/// Runs a streaming query to completion, forwarding each result to `tx`.
///
/// Steps, in order:
///
/// 1. Acquire a connection from `request.pool`. On failure a connection
///    error is sent to `tx` and the function returns.
/// 2. Build the query from `request.sql_text` and bind every entry of
///    `request.params` in order through `request.bind`. The first binding
///    error is sent to `tx` and the function returns.
/// 3. Fetch rows as a stream. Each row is passed through `request.decode`
///    and the outcome (`Ok` or `Err`) is sent to `tx`; a decode error does
///    not stop the stream.
/// 4. A database error from the stream is wrapped with
///    `request.query_context`, sent to `tx`, and ends the loop.
///
/// If sending fails (the receiving half has been closed or dropped), the
/// remaining rows are still pulled from the stream but are neither decoded
/// nor sent.
///
/// All sends of error values are best-effort: a failure to deliver them is
/// ignored because there is no one left to report to.
async fn run_streaming_query<DB, Bind, Decode>(
    request: StreamingQuery<DB, Bind, Decode>,
    tx: mpsc::Sender<Result<Row>>,
) where
    DB: sqlx::Database + sqlx::database::HasStatementCache,
    PoolConnection<DB>: Send,
    for<'q> <DB as sqlx::Database>::Arguments<'q>: sqlx::IntoArguments<'q, DB>,
    for<'q> &'q mut <DB as sqlx::Database>::Connection: sqlx::Executor<'q, Database = DB>,
    Bind: for<'q> Fn(
            sqlx::query::Query<'q, DB, <DB as sqlx::Database>::Arguments<'q>>,
            &'q Value,
        ) -> Result<sqlx::query::Query<'q, DB, <DB as sqlx::Database>::Arguments<'q>>>
        + Copy,
    Decode: Fn(<DB as sqlx::Database>::Row) -> Result<Row> + Copy,
{
    let StreamingQuery {
        pool,
        sql_text,
        params,
        bind,
        decode,
        query_context,
        persistent,
    } = request;

    let mut conn = match pool.acquire().await {
        Ok(conn) => conn,
        Err(e) => {
            let _ = tx
                .send(Err(Error::connection(e, "Failed to acquire connection")))
                .await;
            return;
        }
    };

    let mut query = sqlx::query(&sql_text).persistent(persistent);
    // Borrow the parameters: the bound query refers to them for as long as
    // it (and the stream created from it) is alive.
    for param in &params {
        match bind(query, param) {
            Ok(bound) => query = bound,
            Err(e) => {
                let _ = tx.send(Err(e)).await;
                return;
            }
        }
    }

    let mut stream = query.fetch(&mut *conn);
    let mut consumer_alive = true;
    while let Some(item) = stream.next().await {
        match item {
            Ok(raw_row) => {
                if !consumer_alive {
                    // Receiver is gone: keep pulling rows without decoding.
                    // NOTE(review): presumably this drains the result set so
                    // the connection goes back to the pool with nothing
                    // pending — confirm this is the intent.
                    continue;
                }
                let decoded = decode(raw_row);
                if tx.send(decoded).await.is_err() {
                    consumer_alive = false;
                }
            }
            Err(e) => {
                if consumer_alive {
                    let _ = tx.send(Err(Error::database(e, query_context))).await;
                }
                break;
            }
        }
    }
}