1use std::{
4 pin::Pin,
5 sync::Arc,
6 task::{Context, Poll},
7};
8
9use futures::{
10 FutureExt, Sink,
11 future::{BoxFuture, Shared},
12};
13use libsql::Database;
14use ulid::Ulid;
15
16use crate::{CompactType, LibsqlError, LibsqlTask, config::Config};
17
18const INSERT_SQL: &str = r#"
20INSERT INTO Jobs (job, id, job_type, status, attempts, max_attempts, run_at, priority, metadata)
21VALUES (?1, ?2, ?3, 'Pending', 0, ?4, ?5, ?6, ?7)
22"#;
23
24type FlushFuture = BoxFuture<'static, Result<(), Arc<LibsqlError>>>;
25
26pub struct LibsqlSink<Args, Codec> {
30 db: &'static Database,
31 config: Config,
32 buffer: Vec<LibsqlTask<CompactType>>,
33 flush_future: Option<Shared<FlushFuture>>,
34 _marker: std::marker::PhantomData<fn() -> (Args, Codec)>,
35}
36
37impl<Args, Codec> std::fmt::Debug for LibsqlSink<Args, Codec> {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("LibsqlSink")
41 .field("config", &self.config)
42 .field("buffer_len", &self.buffer.len())
43 .finish()
44 }
45}
46
47impl<Args, Codec> Unpin for LibsqlSink<Args, Codec> {}
54
55impl<Args, Codec> Clone for LibsqlSink<Args, Codec> {
56 fn clone(&self) -> Self {
63 Self {
64 db: self.db,
65 config: self.config.clone(),
66 buffer: Vec::new(),
67 flush_future: None,
68 _marker: std::marker::PhantomData,
69 }
70 }
71}
72
73impl<Args, Codec> LibsqlSink<Args, Codec> {
74 #[must_use]
76 pub fn new(db: &'static Database, config: &Config) -> Self {
77 Self {
78 db,
79 config: config.clone(),
80 buffer: Vec::new(),
81 flush_future: None,
82 _marker: std::marker::PhantomData,
83 }
84 }
85}
86
87pub async fn push_tasks(
89 db: &'static Database,
90 cfg: &Config,
91 buffer: Vec<LibsqlTask<CompactType>>,
92) -> Result<(), Arc<LibsqlError>> {
93 log::debug!("push_tasks called with {} tasks", buffer.len());
94 let conn = db
95 .connect()
96 .map_err(|e| Arc::new(LibsqlError::Database(e)))?;
97
98 log::debug!("Starting transaction");
100 conn.execute("BEGIN", libsql::params![])
101 .await
102 .map_err(|e| Arc::new(LibsqlError::Database(e)))?;
103
104 for (i, task) in buffer.iter().enumerate() {
105 log::debug!("Processing task {}", i);
106 let id = task
107 .parts
108 .task_id
109 .map(|id| id.to_string())
110 .unwrap_or_else(|| Ulid::new().to_string());
111 let run_at = task.parts.run_at as i64;
112 let max_attempts = task.parts.ctx.max_attempts();
113 let priority = task.parts.ctx.priority();
114 let args = task.args.clone();
115 let job_type = cfg.queue().to_string();
116 let meta =
117 serde_json::to_string(&task.parts.ctx.meta()).unwrap_or_else(|_| "{}".to_string());
118
119 log::debug!("Executing INSERT with id: {}, job_type: {}", id, job_type);
120 match conn
121 .execute(
122 INSERT_SQL,
123 libsql::params![args, id, job_type, max_attempts, run_at, priority, meta],
124 )
125 .await
126 {
127 Ok(rows_affected) => {
128 log::debug!(
129 "INSERT executed successfully, rows affected: {}",
130 rows_affected
131 );
132 if rows_affected != 1 {
133 log::warn!("INSERT affected {} rows instead of 1", rows_affected);
134 }
135 }
136 Err(e) => {
137 log::error!("INSERT failed: {:?}", e);
138 if let Err(rollback_err) = conn.execute("ROLLBACK", libsql::params![]).await {
140 log::error!("Failed to rollback transaction: {:?}", rollback_err);
141 }
142 return Err(Arc::new(LibsqlError::Database(e)));
143 }
144 }
145 }
146
147 log::debug!("Committing transaction");
148 conn.execute("COMMIT", libsql::params![])
149 .await
150 .map_err(|e| Arc::new(LibsqlError::Database(e)))?;
151
152 log::debug!("push_tasks completed successfully");
153 Ok(())
154}
155
156impl<Args, Codec> Sink<LibsqlTask<CompactType>> for LibsqlSink<Args, Codec>
157where
158 Args: Send + Sync + 'static,
159{
160 type Error = LibsqlError;
161
162 fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
163 Poll::Ready(Ok(()))
164 }
165
166 fn start_send(self: Pin<&mut Self>, item: LibsqlTask<CompactType>) -> Result<(), Self::Error> {
167 let this = self.get_mut();
168 this.buffer.push(item);
169 Ok(())
170 }
171
172 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
173 let this = self.get_mut();
174
175 if this.flush_future.is_none() && this.buffer.is_empty() {
177 return Poll::Ready(Ok(()));
178 }
179
180 if this.flush_future.is_none() && !this.buffer.is_empty() {
182 let db = this.db;
183 let config = this.config.clone();
184 let buffer = std::mem::take(&mut this.buffer);
185 let sink_fut = async move { push_tasks(db, &config, buffer).await };
186 this.flush_future = Some((Box::pin(sink_fut) as FlushFuture).shared());
187 }
188
189 if let Some(mut fut) = this.flush_future.take() {
191 match fut.poll_unpin(cx) {
192 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
193 Poll::Ready(Err(e)) => {
194 Poll::Ready(Err(Arc::try_unwrap(e)
195 .unwrap_or_else(|arc| LibsqlError::Other(arc.to_string()))))
196 }
197 Poll::Pending => {
198 this.flush_future = Some(fut);
199 Poll::Pending
200 }
201 }
202 } else {
203 Poll::Ready(Ok(()))
204 }
205 }
206
207 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
208 self.poll_flush(cx)
209 }
210}