apalis_libsql/
sink.rs

1//! Sink implementation for pushing tasks to libSQL database
2
3use std::{
4    pin::Pin,
5    sync::Arc,
6    task::{Context, Poll},
7};
8
9use futures::{
10    FutureExt, Sink,
11    future::{BoxFuture, Shared},
12};
13use libsql::Database;
14use ulid::Ulid;
15
16use crate::{CompactType, LibsqlError, LibsqlTask, config::Config};
17
/// SQL query to insert a new task
///
/// Positional parameters, in binding order: `?1` job payload, `?2` id,
/// `?3` job_type, `?4` max_attempts, `?5` run_at, `?6` priority, `?7` metadata.
/// Every inserted row starts with `status = 'Pending'` and `attempts = 0`.
const INSERT_SQL: &str = r#"
INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata)
VALUES (?1, ?2, ?3, 'Pending', 0, ?4, ?5, ?6, ?7)
"#;
23
/// Boxed future that performs one batch insert.
///
/// The error is wrapped in `Arc` so the future's output is `Clone`, which is
/// needed to wrap the future in `Shared`.
type FlushFuture = BoxFuture<'static, Result<(), Arc<LibsqlError>>>;
25
/// Sink for pushing tasks to libSQL
///
/// Items handed to `start_send` accumulate in an in-memory buffer and are
/// written to the database as a single batch when the sink is flushed.
///
/// This struct is Unpin because it doesn't contain any self-referential data
pub struct LibsqlSink<Args, Codec> {
    /// Database handle; a connection is opened from it for each batch insert.
    db: &'static Database,
    /// Queue configuration; a clone is moved into each flush future.
    config: Config,
    /// Tasks accepted by `start_send` that have not yet been handed to a flush.
    buffer: Vec<LibsqlTask<CompactType>>,
    /// The in-flight batch insert, present only while a flush is pending.
    flush_future: Option<Shared<FlushFuture>>,
    /// Ties the sink to `Args` and `Codec` without storing values of either type.
    _marker: std::marker::PhantomData<fn() -> (Args, Codec)>,
}
36
37// Manual Debug implementation to avoid requiring Debug on Args and Codec
38impl<Args, Codec> std::fmt::Debug for LibsqlSink<Args, Codec> {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("LibsqlSink")
41            .field("config", &self.config)
42            .field("buffer_len", &self.buffer.len())
43            .finish()
44    }
45}
46
// LibsqlSink is Unpin for every `Args` and `Codec`, because each field is Unpin:
// - &'static Database is Unpin (shared references always are)
// - Config is Unpin (regular struct)
// - Vec is Unpin
// - Option<Shared<BoxFuture>> is Unpin
// - PhantomData with fn() -> X is Unpin
// The Sink implementation relies on this to call `Pin::get_mut`.
impl<Args, Codec> Unpin for LibsqlSink<Args, Codec> {}
54
55impl<Args, Codec> Clone for LibsqlSink<Args, Codec> {
56    /// Creates a new `LibsqlSink` with the same configuration but an empty buffer.
57    ///
58    /// The cloned sink will have:
59    /// - The same database reference and configuration
60    /// - An empty buffer (any buffered tasks in the original are not copied)
61    /// - No active flush future
62    fn clone(&self) -> Self {
63        Self {
64            db: self.db,
65            config: self.config.clone(),
66            buffer: Vec::new(),
67            flush_future: None,
68            _marker: std::marker::PhantomData,
69        }
70    }
71}
72
73impl<Args, Codec> LibsqlSink<Args, Codec> {
74    /// Create a new LibsqlSink
75    #[must_use]
76    pub fn new(db: &'static Database, config: &Config) -> Self {
77        Self {
78            db,
79            config: config.clone(),
80            buffer: Vec::new(),
81            flush_future: None,
82            _marker: std::marker::PhantomData,
83        }
84    }
85}
86
87/// Push a batch of tasks to the database
88pub async fn push_tasks(
89    db: &'static Database,
90    cfg: &Config,
91    buffer: Vec<LibsqlTask<CompactType>>,
92) -> Result<(), Arc<LibsqlError>> {
93    log::debug!("push_tasks called with {} tasks", buffer.len());
94    let conn = db
95        .connect()
96        .map_err(|e| Arc::new(LibsqlError::Database(e)))?;
97
98    // Use transaction for batch insert
99    log::debug!("Starting transaction");
100    conn.execute("BEGIN", libsql::params![])
101        .await
102        .map_err(|e| Arc::new(LibsqlError::Database(e)))?;
103
104    for (i, task) in buffer.iter().enumerate() {
105        log::debug!("Processing task {}", i);
106        let id = task
107            .parts
108            .task_id
109            .map(|id| id.to_string())
110            .unwrap_or_else(|| Ulid::new().to_string());
111        let run_at = task.parts.run_at as i64;
112        let max_attempts = task.parts.ctx.max_attempts();
113        let priority = task.parts.ctx.priority();
114        let args = task.args.clone();
115        let job_type = cfg.queue().to_string();
116        let meta =
117            serde_json::to_string(&task.parts.ctx.meta()).unwrap_or_else(|_| "{}".to_string());
118
119        log::debug!("Executing INSERT with id: {}, job_type: {}", id, job_type);
120        match conn
121            .execute(
122                INSERT_SQL,
123                libsql::params![args, id, job_type, max_attempts, run_at, priority, meta],
124            )
125            .await
126        {
127            Ok(rows_affected) => {
128                log::debug!(
129                    "INSERT executed successfully, rows affected: {}",
130                    rows_affected
131                );
132                if rows_affected != 1 {
133                    log::warn!("INSERT affected {} rows instead of 1", rows_affected);
134                }
135            }
136            Err(e) => {
137                log::error!("INSERT failed: {:?}", e);
138                // Try to rollback
139                if let Err(rollback_err) = conn.execute("ROLLBACK", libsql::params![]).await {
140                    log::error!("Failed to rollback transaction: {:?}", rollback_err);
141                }
142                return Err(Arc::new(LibsqlError::Database(e)));
143            }
144        }
145    }
146
147    log::debug!("Committing transaction");
148    conn.execute("COMMIT", libsql::params![])
149        .await
150        .map_err(|e| Arc::new(LibsqlError::Database(e)))?;
151
152    log::debug!("push_tasks completed successfully");
153    Ok(())
154}
155
156impl<Args, Codec> Sink<LibsqlTask<CompactType>> for LibsqlSink<Args, Codec>
157where
158    Args: Send + Sync + 'static,
159{
160    type Error = LibsqlError;
161
162    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
163        Poll::Ready(Ok(()))
164    }
165
166    fn start_send(self: Pin<&mut Self>, item: LibsqlTask<CompactType>) -> Result<(), Self::Error> {
167        let this = self.get_mut();
168        this.buffer.push(item);
169        Ok(())
170    }
171
172    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
173        let this = self.get_mut();
174
175        // If there's no existing future and buffer is empty, we're done
176        if this.flush_future.is_none() && this.buffer.is_empty() {
177            return Poll::Ready(Ok(()));
178        }
179
180        // Create the future only if we don't have one and there's work to do
181        if this.flush_future.is_none() && !this.buffer.is_empty() {
182            let db = this.db;
183            let config = this.config.clone();
184            let buffer = std::mem::take(&mut this.buffer);
185            let sink_fut = async move { push_tasks(db, &config, buffer).await };
186            this.flush_future = Some((Box::pin(sink_fut) as FlushFuture).shared());
187        }
188
189        // Poll the existing future
190        if let Some(mut fut) = this.flush_future.take() {
191            match fut.poll_unpin(cx) {
192                Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
193                Poll::Ready(Err(e)) => {
194                    Poll::Ready(Err(Arc::try_unwrap(e)
195                        .unwrap_or_else(|arc| LibsqlError::Other(arc.to_string()))))
196                }
197                Poll::Pending => {
198                    this.flush_future = Some(fut);
199                    Poll::Pending
200                }
201            }
202        } else {
203            Poll::Ready(Ok(()))
204        }
205    }
206
207    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
208        self.poll_flush(cx)
209    }
210}