use crate::UtilesResult;
use crate::sqlite::{AsyncSqliteConn, RusqliteResult};
use rusqlite::Connection;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, warn};
pub fn sqlite_query_tokio_receiver<T, F, C>(
mbt: &C,
query_override: &str,
row_mapper: F,
) -> UtilesResult<tokio::sync::mpsc::Receiver<T>>
where
F: Fn(&rusqlite::Row) -> RusqliteResult<T> + Send + Sync + 'static,
T: Send + 'static,
C: AsyncSqliteConn + Clone + 'static,
{
let (tx, rx) = tokio::sync::mpsc::channel::<T>(100);
let query = query_override.to_string();
let mbt_clone = mbt.clone();
tokio::spawn(async move {
let result = mbt_clone
.conn(move |conn: &Connection| -> RusqliteResult<()> {
let mut stmt = conn.prepare(&query)?;
let rows_iter = stmt.query_map([], |row| {
let item = row_mapper(row)?;
if let Err(e) = tx.blocking_send(item) {
warn!("channel send error: {:?}", e);
}
Ok(())
})?;
for row_result in rows_iter {
if let Err(e) = row_result {
error!("row error: {:?}", e);
}
}
Ok(())
})
.await;
if let Err(e) = result {
error!("make_stream_rx: DB error: {:?}", e);
}
});
Ok(rx)
}
pub fn sqlite_query_tokio_receiver_stream<T, F, C>(
mbt: &C,
query_override: &str,
row_mapper: F,
) -> UtilesResult<ReceiverStream<T>>
where
F: Fn(&rusqlite::Row) -> RusqliteResult<T> + Send + Sync + 'static,
T: Send + 'static,
C: AsyncSqliteConn + Clone + 'static,
{
let tokio_rx = sqlite_query_tokio_receiver(mbt, query_override, row_mapper)?;
Ok(ReceiverStream::new(tokio_rx))
}