use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures::{
FutureExt, Sink,
future::{BoxFuture, Shared},
};
use sqlx::SqlitePool;
use ulid::Ulid;
use crate::{CompactType, Config, SqliteStorage, SqliteTask};
// Boxed, `'static` future used for a flush. The error is wrapped in `Arc`
// so the future's output is `Clone`, which `futures::future::Shared` requires
// of the future it wraps (`sqlx::Error` itself is not `Clone`).
type FlushFuture = BoxFuture<'static, Result<(), Arc<sqlx::Error>>>;
/// State for a buffering task sink backed by a SQLite pool.
///
/// Holds the tasks waiting to be written plus the (optional) future of a
/// write that is currently in progress. `Args` and `Codec` are carried only
/// at the type level via `PhantomData`.
///
/// NOTE(review): within this file the `buffer` / `flush_future` fields are
/// driven through a `sink` field of `SqliteStorage` — presumably that field is
/// a `SqliteSink`; confirm against the `SqliteStorage` definition.
#[pin_project::pin_project]
#[derive(Debug)]
pub struct SqliteSink<Args, Compact, Codec> {
/// Connection pool handle (cloned from the pool passed to `new`).
pool: SqlitePool,
/// Configuration (cloned from the config passed to `new`).
config: Config,
/// Tasks accumulated in memory and not yet handed to a flush; starts empty.
buffer: Vec<SqliteTask<Compact>>,
/// Shared handle to a flush future, or `None` when no flush is stored.
/// It is `None` right after construction and after `clone()`.
#[pin]
flush_future: Option<Shared<FlushFuture>>,
/// Zero-sized marker tying the otherwise unused `Args` and `Codec`
/// type parameters to the struct.
_marker: std::marker::PhantomData<(Args, Codec)>,
}
/// Cloning yields a *fresh* sink that shares the pool and configuration but
/// starts with an empty buffer and no flush in progress: pending tasks and any
/// in-flight flush stay with the original.
impl<Args, Compact, Codec> Clone for SqliteSink<Args, Compact, Codec> {
    fn clone(&self) -> Self {
        // `new` clones the pool and config and initialises every other field
        // to its empty state, which is exactly the clone semantics we want.
        Self::new(&self.pool, &self.config)
    }
}
/// Writes every task in `buffer` to the database inside a single transaction.
///
/// For each task the `queries/task/sink.sql` statement is executed with the
/// task's arguments, id, queue name, max attempts, run-at time, priority and
/// JSON-serialised metadata. Tasks without an id get a freshly generated ULID.
/// If the metadata cannot be serialised, an empty string is stored instead
/// (best-effort; the error is not surfaced).
///
/// An empty `buffer` is a no-op: no connection is acquired and no transaction
/// is opened.
///
/// # Errors
///
/// Returns the underlying [`sqlx::Error`] (wrapped in an [`Arc`]) if the
/// transaction cannot be started, any insert fails, or the commit fails. On
/// failure the transaction is dropped without being committed.
pub async fn push_tasks(
    pool: SqlitePool,
    cfg: Config,
    buffer: Vec<SqliteTask<CompactType>>,
) -> Result<(), Arc<sqlx::Error>> {
    if buffer.is_empty() {
        return Ok(());
    }

    // The queue name is the same for every task in the batch; format it once.
    let job_type = cfg.queue().to_string();

    let mut tx = pool.begin().await?;
    for task in buffer {
        // Only generate a new ULID when the task does not already carry an id.
        let id = task
            .parts
            .task_id
            .map(|id| id.to_string())
            .unwrap_or_else(|| Ulid::new().to_string());
        let run_at = task.parts.run_at as i64;
        let max_attempts = task.parts.ctx.max_attempts();
        let priority = task.parts.ctx.priority();
        let args = task.args;
        let meta = serde_json::to_string(&task.parts.ctx.meta()).unwrap_or_default();
        sqlx::query_file!(
            "queries/task/sink.sql",
            args,
            id,
            job_type,
            max_attempts,
            run_at,
            priority,
            meta
        )
        .execute(&mut *tx)
        .await?;
    }
    tx.commit().await?;
    Ok(())
}
impl<Args, Compact, Codec> SqliteSink<Args, Compact, Codec> {
    /// Creates an empty sink that writes through `pool` using `config`.
    ///
    /// Both arguments are cloned; the returned sink has nothing buffered and
    /// no flush in progress.
    #[must_use]
    pub fn new(pool: &SqlitePool, config: &Config) -> Self {
        let pool = pool.clone();
        let config = config.clone();
        Self {
            pool,
            config,
            buffer: Vec::new(),
            _marker: std::marker::PhantomData,
            flush_future: None,
        }
    }
}
impl<Args, Encode, Fetcher> Sink<SqliteTask<CompactType>> for SqliteStorage<Args, Encode, Fetcher>
where
Args: Send + Sync + 'static,
{
type Error = sqlx::Error;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: SqliteTask<CompactType>) -> Result<(), Self::Error> {
self.project().sink.buffer.push(item);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
if this.sink.flush_future.is_none() && this.sink.buffer.is_empty() {
return Poll::Ready(Ok(()));
}
if this.sink.flush_future.is_none() && !this.sink.buffer.is_empty() {
let pool = this.pool.clone();
let config = this.config.clone();
let buffer = std::mem::take(&mut this.sink.buffer);
let sink_fut = push_tasks(pool, config, buffer);
this.sink.flush_future = Some((Box::pin(sink_fut) as FlushFuture).shared());
}
if let Some(mut fut) = this.sink.flush_future.take() {
match fut.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
Poll::Ready(Err(Arc::into_inner(e).unwrap()))
}
Poll::Pending => {
this.sink.flush_future = Some(fut);
Poll::Pending
}
}
} else {
Poll::Ready(Ok(()))
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
}