denokv_sqlite/
lib.rs

// Copyright 2023 the Deno authors. All rights reserved. MIT license.

mod backend;
mod sum_operand;
mod time;

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::num::NonZeroU32;
use std::ops::Add;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Barrier;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::Weak;
use std::thread::JoinHandle;
use std::time::Duration;

pub use crate::backend::sqlite_retry_loop;
use crate::backend::DequeuedMessage;
use crate::backend::QueueMessageId;
use crate::backend::SqliteBackend;
pub use crate::backend::SqliteBackendError;
use async_stream::try_stream;
use chrono::DateTime;
use chrono::Utc;
use deno_error::JsErrorBox;
use denokv_proto::AtomicWrite;
use denokv_proto::CommitResult;
use denokv_proto::Database;
use denokv_proto::QueueMessageHandle;
use denokv_proto::ReadRange;
use denokv_proto::ReadRangeOutput;
use denokv_proto::SnapshotReadOptions;
use denokv_proto::Versionstamp;
use denokv_proto::WatchKeyOutput;
use futures::future::Either;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use rand::RngCore;
pub use rusqlite::Connection;
use time::utc_now;
use tokio::select;
use tokio::sync::futures::Notified;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::sync::Notify;
use tokio_stream::wrappers::ReceiverStream;

/// The interval at which stale entries (whose deadline has passed) are
/// cleaned up from the queue_running table.
const QUEUE_RUNNING_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
/// The interval at which the deadline of messages being processed in this
/// instance is updated in the queue_running table.
const QUEUE_MESSAGE_DEADLINE_UPDATE_INTERVAL: Duration = Duration::from_secs(2);
/// The minimum interval at which the queue is checked for the next message
/// to process, irrespective of any known deadlines.
const QUEUE_DEQUEUE_FALLBACK_INTERVAL: Duration = Duration::from_secs(60);
/// The minimum interval at which we try to process expired messages,
/// irrespective of any known deadlines.
const EXPIRY_FALLBACK_INTERVAL: Duration = Duration::from_secs(60);
/// The jitter applied to dequeueing of messages.
const QUEUE_DEQUEUE_JITTER: Duration = Duration::from_millis(100);
/// The jitter applied to expiry of messages.
const EXPIRY_JITTER: Duration = Duration::from_secs(1);

enum SqliteRequest {
  SnapshotRead {
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
    sender: oneshot::Sender<
      Result<(Vec<ReadRangeOutput>, Versionstamp), SqliteBackendError>,
    >,
  },
  AtomicWrite {
    write: AtomicWrite,
    sender: oneshot::Sender<Result<Option<CommitResult>, SqliteBackendError>>,
  },
  QueueDequeueMessage {
    sender: oneshot::Sender<DequeuedMessage>,
  },
  QueueFinishMessage {
    id: QueueMessageId,
    success: bool,
    sender: oneshot::Sender<Result<(), SqliteBackendError>>,
  },
}

// A structure shared across threads in the same process that lets multiple
// Sqlite structs, each using its own connection to the same underlying
// database, notify each other about KV queue and KV watch related events.
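//
// A minimal sharing sketch (illustrative only; the connection generator and
// config are assumed to be defined elsewhere):
//
//   let notifier = SqliteNotifier::default();
//   let db_a = Sqlite::new(make_conn, notifier.clone(), config.clone())?;
//   let db_b = Sqlite::new(make_conn, notifier, config)?;
//   // Writes through `db_a` now wake watchers and queue waiters on `db_b`.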
#[derive(Default, Clone)]
pub struct SqliteNotifier(Arc<SqliteNotifierInner>);

#[derive(Default)]
struct SqliteNotifierInner {
  dequeue_notify: Notify,
  key_watchers: RwLock<HashMap<Vec<u8>, watch::Sender<Versionstamp>>>,
}

struct SqliteKeySubscription {
  notifier: Weak<SqliteNotifierInner>,
  key: Option<Vec<u8>>,
  receiver: watch::Receiver<Versionstamp>,
}

impl SqliteNotifier {
  fn schedule_dequeue(&self) {
    self.0.dequeue_notify.notify_one();
  }

  fn notify_key_update(&self, key: &[u8], versionstamp: Versionstamp) {
    let key_watchers = self.0.key_watchers.read().unwrap();
    if let Some(sender) = key_watchers.get(key) {
      sender.send_if_modified(|in_place_versionstamp| {
        if *in_place_versionstamp < versionstamp {
          *in_place_versionstamp = versionstamp;
          true
        } else {
          false
        }
      });
    }
  }

  /// Subscribe to a given key. The returned subscription can be used to get
  /// notified whenever the key is updated past a given versionstamp.
  fn subscribe(&self, key: Vec<u8>) -> SqliteKeySubscription {
    let mut key_watchers = self.0.key_watchers.write().unwrap();
    let receiver = match key_watchers.entry(key.clone()) {
      Entry::Occupied(entry) => entry.get().subscribe(),
      Entry::Vacant(entry) => {
        let (sender, receiver) = watch::channel([0; 10]);
        entry.insert(sender);
        receiver
      }
    };
    SqliteKeySubscription {
      notifier: Arc::downgrade(&self.0),
      key: Some(key),
      receiver,
    }
  }
}

impl SqliteKeySubscription {
  /// Wait until the key has been updated since the given versionstamp.
  ///
  /// Returns false if the database is closing. Returns true if the key has
  /// been updated.
  async fn wait_until_updated(
    &mut self,
    last_read_versionstamp: Versionstamp,
  ) -> bool {
    let res = self
      .receiver
      .wait_for(|t| *t > last_read_versionstamp)
      .await;
    res.is_ok() // Err(_) means the database is closing.
  }
}

impl Drop for SqliteKeySubscription {
  fn drop(&mut self) {
    if let Some(notifier) = self.notifier.upgrade() {
      let key = self.key.take().unwrap();
      let mut key_watchers = notifier.key_watchers.write().unwrap();
      match key_watchers.entry(key) {
        Entry::Occupied(entry) => {
          // If there is only one subscriber left (this struct), then remove
          // the entry from the map.
          if entry.get().receiver_count() == 1 {
            entry.remove();
          }
        }
        Entry::Vacant(_) => unreachable!("the entry should still exist"),
      }
    }
  }
}

#[derive(Clone)]
pub struct Sqlite {
  shutdown_notify: Arc<Notify>,
  notifier: SqliteNotifier,
  write_worker: SqliteWorker,
  read_workers: Vec<SqliteWorker>,
}

#[derive(Clone)]
struct SqliteWorker {
  request_tx: tokio::sync::mpsc::Sender<SqliteRequest>,
  join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}

#[derive(Clone, Debug)]
pub struct SqliteConfig {
  /// Total number of worker threads. Must be at least 1; the first worker
  /// handles writes, and any additional workers serve reads.
  pub num_workers: usize,
  /// If set, the write worker waits up to this long to coalesce consecutive
  /// atomic writes into a single batched commit.
  pub batch_timeout: Option<Duration>,
}

#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum SqliteCreateError {
  #[class(type)]
  #[error("num_workers must be at least 1")]
  NumWorkersTooSmall,
  #[class(inherit)]
  #[error(transparent)]
  SqliteBackend(#[from] SqliteBackendError),
  #[class(inherit)]
  #[error(transparent)]
  Other(#[from] JsErrorBox),
}

impl Sqlite {
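  /// Creates a new `Sqlite` handle backed by `config.num_workers` dedicated
  /// worker threads, each with its own connection produced by `conn_gen`.
  ///
  /// A minimal construction sketch (illustrative only; assumes an in-memory
  /// database and `rand::rngs::OsRng` as the versionstamp RNG, and elides
  /// real error handling):
  ///
  /// ```ignore
  /// let db = Sqlite::new(
  ///   || {
  ///     let conn = rusqlite::Connection::open_in_memory().unwrap();
  ///     Ok((conn, Box::new(rand::rngs::OsRng) as Box<dyn rand::RngCore + Send>))
  ///   },
  ///   SqliteNotifier::default(),
  ///   SqliteConfig { num_workers: 1, batch_timeout: None },
  /// )?;
  /// ```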
  pub fn new(
    mut conn_gen: impl FnMut() -> Result<
      (rusqlite::Connection, Box<dyn RngCore + Send>),
      JsErrorBox,
    >,
    notifier: SqliteNotifier,
    config: SqliteConfig,
  ) -> Result<Sqlite, SqliteCreateError> {
    let shutdown_notify = Arc::new(Notify::new());
    let batch_timeout = config.batch_timeout;

    if config.num_workers == 0 {
      return Err(SqliteCreateError::NumWorkersTooSmall);
    }

    let mut write_worker: Option<SqliteWorker> = None;
    let mut read_workers = Vec::with_capacity(config.num_workers - 1);

    // This fence blocks the current thread until every worker has obtained
    // its `Notified` future, ensuring no race is possible during a shutdown
    // `notify_waiters`.
    let init_fence = Arc::new(Barrier::new(config.num_workers + 1));

    for i in 0..config.num_workers {
      let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);
      let (conn, versionstamp_rng) = conn_gen()?;
      let backend =
        SqliteBackend::new(conn, notifier.clone(), versionstamp_rng, i != 0)?;
      let init_fence = init_fence.clone();
      let shutdown_notify = shutdown_notify.clone();
      let join_handle: JoinHandle<()> = std::thread::Builder::new()
        .name(format!("sw-{i}"))
        .spawn(move || {
          tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async move {
              // Obtain the `Notified` future before releasing the fence, so
              // a shutdown `notify_waiters` cannot fire before this worker
              // is listening.
              let shutdown_notify = shutdown_notify.notified();
              init_fence.wait();
              sqlite_thread(backend, shutdown_notify, request_rx, batch_timeout)
                .await
            });
        })
        .unwrap();
      let worker = SqliteWorker {
        request_tx,
        join_handle: Arc::new(Mutex::new(Some(join_handle))),
      };
      if i == 0 {
        write_worker = Some(worker);
      } else {
        read_workers.push(worker);
      }
    }

    assert_eq!(read_workers.len(), config.num_workers - 1);

    init_fence.wait();

    Ok(Self {
      shutdown_notify,
      notifier,
      write_worker: write_worker.unwrap(),
      read_workers,
    })
  }
}

async fn sqlite_thread(
  mut backend: SqliteBackend,
  shutdown_notify: Notified<'_>,
  request_rx: tokio::sync::mpsc::Receiver<SqliteRequest>,
  batch_timeout: Option<Duration>,
) {
  let mut shutdown_notify = std::pin::pin!(shutdown_notify);
  let start = utc_now();
  if !backend.readonly {
    if let Err(err) = backend.queue_cleanup() {
      panic!("KV queue cleanup failed: {err}");
    }
  }
  let mut dequeue_channels: VecDeque<oneshot::Sender<DequeuedMessage>> =
    VecDeque::with_capacity(4);
  let queue_next_ready = if backend.readonly {
    None
  } else {
    match backend.queue_next_ready() {
      Ok(queue_next_ready) => queue_next_ready,
      Err(err) => panic!("KV queue_next_ready failed: {err}"),
    }
  };
  let mut queue_dequeue_deadline = compute_deadline_with_max_and_jitter(
    queue_next_ready,
    QUEUE_DEQUEUE_FALLBACK_INTERVAL,
    QUEUE_DEQUEUE_JITTER,
  );
  let mut queue_cleanup_deadline =
    start + duration_with_jitter(QUEUE_RUNNING_CLEANUP_INTERVAL);
  let mut queue_keepalive_deadline =
    start + duration_with_jitter(QUEUE_MESSAGE_DEADLINE_UPDATE_INTERVAL);
  let expiry_next_ready = if backend.readonly {
    None
  } else {
    match backend.collect_expired() {
      Ok(expiry_next_ready) => expiry_next_ready,
      Err(err) => panic!("KV collect expired failed: {err}"),
    }
  };
  let mut expiry_deadline = compute_deadline_with_max_and_jitter(
    expiry_next_ready,
    EXPIRY_FALLBACK_INTERVAL,
    EXPIRY_JITTER,
  );
  let mut request_rx =
    std::pin::pin!(ReceiverStream::new(request_rx).peekable());
  loop {
    let now = utc_now();
    let mut closest_deadline = queue_cleanup_deadline
      .min(queue_keepalive_deadline)
      .min(expiry_deadline);
    if !dequeue_channels.is_empty() {
      closest_deadline = closest_deadline.min(queue_dequeue_deadline);
    }
    let timer = if backend.readonly {
      Either::Left(futures::future::pending::<()>())
    } else if let Ok(time_to_closest_deadline) =
      closest_deadline.signed_duration_since(now).to_std()
    {
      Either::Right(Either::Left(tokio::time::sleep(time_to_closest_deadline)))
    } else {
      Either::Right(Either::Right(futures::future::ready(())))
    };
    select! {
      biased;
      _ = &mut shutdown_notify => {
        break;
      },
      _ = backend.notifier.0.dequeue_notify.notified(), if !dequeue_channels.is_empty() => {
        let queue_next_ready = match backend.queue_next_ready() {
          Ok(queue_next_ready) => queue_next_ready,
          Err(err) => panic!("KV queue_next_ready failed: {err}"),
        };
        queue_dequeue_deadline = compute_deadline_with_max_and_jitter(queue_next_ready, QUEUE_DEQUEUE_FALLBACK_INTERVAL, QUEUE_DEQUEUE_JITTER);
      },
      req = request_rx.next() => {
        match req {
          Some(SqliteRequest::SnapshotRead { requests, options, sender }) => {
            let result = backend.snapshot_read(requests, options);
            sender.send(result).ok(); // ignore error if receiver is gone
          },
          Some(SqliteRequest::AtomicWrite { write, sender }) => {
            let mut batch_writes = vec![write];
            let mut batch_tx = vec![sender];

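            // Coalesce any immediately-following atomic writes (for up to
            // `batch_timeout`) so they can be committed as one batch.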
            if let Some(batch_timeout) = batch_timeout {
              let mut deadline = std::pin::pin!(tokio::time::sleep(batch_timeout));
              while let Either::Left((Some(x), _)) = futures::future::select(request_rx.as_mut().peek(), &mut deadline).await {
                if !matches!(x, SqliteRequest::AtomicWrite { .. }) {
                  break;
                }
                let x = request_rx.next().await.unwrap();
                match x {
                  SqliteRequest::AtomicWrite { write, sender } => {
                    batch_writes.push(write);
                    batch_tx.push(sender);
                  },
                  _ => unreachable!(),
                }
              }
            }

            let result = backend.atomic_write_batched(batch_writes);
            for (result, tx) in result.into_iter().zip(batch_tx.into_iter()) {
              tx.send(result).ok(); // ignore error if receiver is gone
            }
          },
          Some(SqliteRequest::QueueDequeueMessage { sender }) => {
            dequeue_channels.push_back(sender);
          },
          Some(SqliteRequest::QueueFinishMessage { id, success, sender }) => {
            let result = backend.queue_finish_message(&id, success);
            sender.send(result).ok(); // ignore error if receiver is gone
          },
          None => break,
        }
      },
      _ = timer => {
        if now >= queue_cleanup_deadline {
          queue_cleanup_deadline = now.add(duration_with_jitter(QUEUE_RUNNING_CLEANUP_INTERVAL));
          if let Err(err) = backend.queue_cleanup() {
            panic!("KV queue cleanup failed: {err}");
          }
        }
        if now >= queue_keepalive_deadline {
          queue_keepalive_deadline = now.add(duration_with_jitter(QUEUE_MESSAGE_DEADLINE_UPDATE_INTERVAL));
          if let Err(err) = backend.queue_running_keepalive() {
            panic!("KV queue running keepalive failed: {err}");
          }
        }
        if now >= queue_dequeue_deadline && !dequeue_channels.is_empty() {
          match backend.queue_dequeue_message() {
            Ok((Some(dequeued_message), next_ready)) => {
              let sender = dequeue_channels.pop_front().unwrap();
              sender.send(dequeued_message).ok(); // ignore error if receiver is gone
              queue_dequeue_deadline = compute_deadline_with_max_and_jitter(next_ready, QUEUE_DEQUEUE_FALLBACK_INTERVAL, QUEUE_DEQUEUE_JITTER);
            },
            Ok((None, next_ready)) => {
              queue_dequeue_deadline = compute_deadline_with_max_and_jitter(next_ready, QUEUE_DEQUEUE_FALLBACK_INTERVAL, QUEUE_DEQUEUE_JITTER);
            },
            Err(err) => {
              panic!("KV queue dequeue failed: {err}");
            },
          }
        }
        if now >= expiry_deadline {
          match backend.collect_expired() {
            Ok(next_ready) => {
              expiry_deadline = compute_deadline_with_max_and_jitter(next_ready, EXPIRY_FALLBACK_INTERVAL, EXPIRY_JITTER);
            },
            Err(err) => {
              panic!("KV collect expired failed: {err}");
            },
          }
        }
      },
    }
  }
}

// Applies jitter to the duration: +/- ~10%.
fn duration_with_jitter(duration: Duration) -> std::time::Duration {
  let secs = duration.as_secs_f64();
  let secs = secs * (0.9 + rand::random::<f64>() * 0.2);
  std::time::Duration::from_secs_f64(secs)
}

// Scales the duration by a uniformly random factor in [0, 1).
fn duration_with_total_jitter(duration: Duration) -> std::time::Duration {
  let secs = duration.as_secs_f64();
  let secs = secs * rand::random::<f64>();
  std::time::Duration::from_secs_f64(secs)
}
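
// Worked example for the function below (illustrative): with `max` = 60s
// and `jitter_duration` = 100ms, a `next_ready` 10s from now yields a
// deadline in [now+10s, now+10.1s), while `next_ready = None` falls back
// to [now+60s, now+60.1s).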
fn compute_deadline_with_max_and_jitter(
  next_ready: Option<DateTime<Utc>>,
  max: Duration,
  jitter_duration: Duration,
) -> DateTime<Utc> {
  let fallback = utc_now() + max;
  next_ready.unwrap_or(fallback).min(fallback)
    + duration_with_total_jitter(jitter_duration)
}

impl Sqlite {
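  /// A minimal read sketch (illustrative only; the key encoding is elided):
  ///
  /// ```ignore
  /// let outputs = db
  ///   .snapshot_read(
  ///     vec![ReadRange {
  ///       start: b"a".to_vec(),
  ///       end: b"z".to_vec(),
  ///       limit: NonZeroU32::new(10).unwrap(),
  ///       reverse: false,
  ///     }],
  ///     SnapshotReadOptions {
  ///       consistency: denokv_proto::Consistency::Strong,
  ///     },
  ///   )
  ///   .await?;
  /// ```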
  pub async fn snapshot_read(
    &self,
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
  ) -> Result<Vec<ReadRangeOutput>, SqliteBackendError> {
    let (sender, receiver) = oneshot::channel();

    let slot = if let Some(x) = self
      .read_workers
      .iter()
      .find_map(|x| x.request_tx.try_reserve().ok())
    {
      x
    } else if let Ok(x) = self.write_worker.request_tx.try_reserve() {
      x
    } else {
      futures::future::select_all(
        self
          .read_workers
          .iter()
          .chain(std::iter::once(&self.write_worker))
          .map(|x| x.request_tx.reserve().boxed()),
      )
      .await
      .0
      .map_err(|_| SqliteBackendError::DatabaseClosed)?
    };
    slot.send(SqliteRequest::SnapshotRead {
      requests,
      options,
      sender,
    });
    let (ranges, _current_versionstamp) = receiver
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)??;
    Ok(ranges)
  }

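  /// A minimal commit sketch (illustrative only; construction of `write` is
  /// elided):
  ///
  /// ```ignore
  /// match db.atomic_write(write).await? {
  ///   Some(commit) => { /* committed; `commit` carries the new versionstamp */ }
  ///   None => { /* a check failed; nothing was written */ }
  /// }
  /// ```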
  pub async fn atomic_write(
    &self,
    write: AtomicWrite,
  ) -> Result<Option<CommitResult>, SqliteBackendError> {
    let (sender, receiver) = oneshot::channel();
    self
      .write_worker
      .request_tx
      .send(SqliteRequest::AtomicWrite { write, sender })
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)?;
    receiver
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)?
  }

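  /// A minimal queue-consumer sketch (illustrative only). `Ok(None)` is
  /// returned when the database is closing:
  ///
  /// ```ignore
  /// while let Some(mut handle) = db.dequeue_next_message().await? {
  ///   let payload = handle.take_payload().await?;
  ///   // ... process `payload` ...
  ///   handle.finish(true).await?;
  /// }
  /// ```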
  pub async fn dequeue_next_message(
    &self,
  ) -> Result<Option<SqliteMessageHandle>, SqliteBackendError> {
    let (sender, receiver) = oneshot::channel();
    let req = SqliteRequest::QueueDequeueMessage { sender };
    if self.write_worker.request_tx.send(req).await.is_err() {
      return Ok(None);
    }
    let Ok(dequeued_message) = receiver.await else {
      return Ok(None);
    };
    Ok(Some(SqliteMessageHandle {
      id: dequeued_message.id,
      payload: Some(dequeued_message.payload),
      request_tx: self.write_worker.request_tx.clone(),
    }))
  }

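  /// A minimal watch sketch (illustrative only):
  ///
  /// ```ignore
  /// use futures::StreamExt;
  ///
  /// let mut stream = db.watch(vec![b"counter".to_vec()]);
  /// while let Some(outputs) = stream.next().await {
  ///   for output in outputs? {
  ///     // Each output reports the watched key's latest entry (if any).
  ///   }
  /// }
  /// ```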
  pub fn watch(
    &self,
    keys: Vec<Vec<u8>>,
  ) -> Pin<Box<dyn Stream<Item = Result<Vec<WatchKeyOutput>, JsErrorBox>> + Send>>
  {
    let requests = keys
      .iter()
      .map(|key| ReadRange {
        end: key.iter().copied().chain(Some(0)).collect(),
        start: key.clone(),
        limit: NonZeroU32::new(1).unwrap(),
        reverse: false,
      })
      .collect::<Vec<_>>();
    let options = SnapshotReadOptions {
      consistency: denokv_proto::Consistency::Eventual,
    };
    let this = self.clone();
    let stream = try_stream! {
      let mut subscriptions: Vec<SqliteKeySubscription> = Vec::new();
      for key in keys {
        subscriptions.push(this.notifier.subscribe(key));
      }

      loop {
        let (sender, receiver) = oneshot::channel();
        let res = this
          .write_worker
          .request_tx
          .send(SqliteRequest::SnapshotRead {
            requests: requests.clone(),
            options: options.clone(),
            sender,
          })
          .await;
        if res.is_err() {
          return; // database is closing
        }
        let Some(res) = receiver.await.ok() else {
          return; // database is closing
        };
        let (ranges, current_versionstamp) = res.map_err(JsErrorBox::from_err)?;
        let mut outputs = Vec::new();
        for range in ranges {
          let entry = range.entries.into_iter().next();
          outputs.push(WatchKeyOutput::Changed { entry });
        }
        yield outputs;

        // If subscriptions is empty, `future::select_all` will panic
        if subscriptions.is_empty() {
          return;
        }
        let futures = subscriptions.iter_mut().map(|subscription| {
          Box::pin(subscription.wait_until_updated(current_versionstamp))
        });
        if !futures::future::select_all(futures).await.0 {
          return; // database is closing
        }
      }
    };
    Box::pin(stream)
  }

  pub fn close(&self) {
    self.shutdown_notify.notify_waiters();
    for w in std::iter::once(&self.write_worker).chain(self.read_workers.iter())
    {
      w.join_handle
        .lock()
        .unwrap()
        .take()
        .expect("can't close database twice")
        .join()
        .unwrap();
    }
  }
}

#[async_trait::async_trait(?Send)]
impl Database for Sqlite {
  type QMH = SqliteMessageHandle;

  async fn snapshot_read(
    &self,
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
  ) -> Result<Vec<ReadRangeOutput>, JsErrorBox> {
    let ranges = Sqlite::snapshot_read(self, requests, options)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(ranges)
  }

  async fn atomic_write(
    &self,
    write: AtomicWrite,
  ) -> Result<Option<CommitResult>, JsErrorBox> {
    let res = Sqlite::atomic_write(self, write)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(res)
  }

  async fn dequeue_next_message(
    &self,
  ) -> Result<Option<Self::QMH>, JsErrorBox> {
    let message_handle = Sqlite::dequeue_next_message(self)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(message_handle)
  }

  fn watch(
    &self,
    keys: Vec<Vec<u8>>,
  ) -> Pin<Box<dyn Stream<Item = Result<Vec<WatchKeyOutput>, JsErrorBox>>>> {
    Sqlite::watch(self, keys)
  }

  fn close(&self) {
    Sqlite::close(self);
  }
}

pub struct SqliteMessageHandle {
  id: QueueMessageId,
  payload: Option<Vec<u8>>,
  request_tx: tokio::sync::mpsc::Sender<SqliteRequest>,
}

impl SqliteMessageHandle {
  pub async fn finish(&self, success: bool) -> Result<(), SqliteBackendError> {
    let (sender, receiver) = oneshot::channel();
    self
      .request_tx
      .send(SqliteRequest::QueueFinishMessage {
        id: self.id.clone(),
        success,
        sender,
      })
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)?;
    receiver
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)?
  }

  pub async fn take_payload(&mut self) -> Result<Vec<u8>, SqliteBackendError> {
    Ok(self.payload.take().expect("can't take payload twice"))
  }
}

#[async_trait::async_trait(?Send)]
impl QueueMessageHandle for SqliteMessageHandle {
  async fn finish(&self, success: bool) -> Result<(), JsErrorBox> {
    SqliteMessageHandle::finish(self, success)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(())
  }

  async fn take_payload(&mut self) -> Result<Vec<u8>, JsErrorBox> {
    let payload = SqliteMessageHandle::take_payload(self)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(payload)
  }
}