mod backend;
mod sum_operand;
mod time;

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::num::NonZeroU32;
use std::ops::Add;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Barrier;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::Weak;
use std::thread::JoinHandle;
use std::time::Duration;

pub use crate::backend::sqlite_retry_loop;
use crate::backend::DequeuedMessage;
use crate::backend::QueueMessageId;
use crate::backend::SqliteBackend;
pub use crate::backend::SqliteBackendError;
use async_stream::try_stream;
use chrono::DateTime;
use chrono::Utc;
use deno_error::JsErrorBox;
use denokv_proto::AtomicWrite;
use denokv_proto::CommitResult;
use denokv_proto::Database;
use denokv_proto::QueueMessageHandle;
use denokv_proto::ReadRange;
use denokv_proto::ReadRangeOutput;
use denokv_proto::SnapshotReadOptions;
use denokv_proto::Versionstamp;
use denokv_proto::WatchKeyOutput;
use futures::future::Either;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use rand::RngCore;
pub use rusqlite::Connection;
use time::utc_now;
use tokio::select;
use tokio::sync::futures::Notified;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::sync::Notify;
use tokio_stream::wrappers::ReceiverStream;

/// Interval at which the queue's bookkeeping of running messages is cleaned
/// up.
const QUEUE_RUNNING_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
/// Interval at which the deadlines of in-flight queue messages are extended.
const QUEUE_MESSAGE_DEADLINE_UPDATE_INTERVAL: Duration = Duration::from_secs(2);
/// Upper bound on how long a worker waits before re-polling the queue for
/// ready messages, even without a dequeue wakeup.
const QUEUE_DEQUEUE_FALLBACK_INTERVAL: Duration = Duration::from_secs(60);
/// Upper bound on how long a worker waits before re-collecting expired
/// entries, even without a known next expiry.
const EXPIRY_FALLBACK_INTERVAL: Duration = Duration::from_secs(60);
/// Random jitter added to queue dequeue deadlines.
const QUEUE_DEQUEUE_JITTER: Duration = Duration::from_millis(100);
/// Random jitter added to expiry collection deadlines.
const EXPIRY_JITTER: Duration = Duration::from_secs(1);

enum SqliteRequest {
  SnapshotRead {
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
    sender: oneshot::Sender<
      Result<(Vec<ReadRangeOutput>, Versionstamp), SqliteBackendError>,
    >,
  },
  AtomicWrite {
    write: AtomicWrite,
    sender: oneshot::Sender<Result<Option<CommitResult>, SqliteBackendError>>,
  },
  QueueDequeueMessage {
    sender: oneshot::Sender<DequeuedMessage>,
  },
  QueueFinishMessage {
    id: QueueMessageId,
    success: bool,
    sender: oneshot::Sender<Result<(), SqliteBackendError>>,
  },
}
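
/// Shared notification hub between `Sqlite` handles and the worker threads:
/// wakes a worker when a queue message may have become ready to dequeue, and
/// fans out per-key versionstamp updates to `watch` subscribers.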
#[derive(Default, Clone)]
pub struct SqliteNotifier(Arc<SqliteNotifierInner>);

#[derive(Default)]
struct SqliteNotifierInner {
  dequeue_notify: Notify,
  key_watchers: RwLock<HashMap<Vec<u8>, watch::Sender<Versionstamp>>>,
}

struct SqliteKeySubscription {
  notifier: Weak<SqliteNotifierInner>,
  key: Option<Vec<u8>>,
  receiver: watch::Receiver<Versionstamp>,
}

impl SqliteNotifier {
  fn schedule_dequeue(&self) {
    self.0.dequeue_notify.notify_one();
  }

  fn notify_key_update(&self, key: &[u8], versionstamp: Versionstamp) {
    let key_watchers = self.0.key_watchers.read().unwrap();
    if let Some(sender) = key_watchers.get(key) {
      sender.send_if_modified(|in_place_versionstamp| {
        if *in_place_versionstamp < versionstamp {
          *in_place_versionstamp = versionstamp;
          true
        } else {
          false
        }
      });
    }
  }

  fn subscribe(&self, key: Vec<u8>) -> SqliteKeySubscription {
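    // Reuse the existing watch channel for this key if there is one;
    // otherwise create a fresh channel starting at the zero versionstamp.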
    let mut key_watchers = self.0.key_watchers.write().unwrap();
    let receiver = match key_watchers.entry(key.clone()) {
      Entry::Occupied(entry) => entry.get().subscribe(),
      Entry::Vacant(entry) => {
        let (sender, receiver) = watch::channel([0; 10]);
        entry.insert(sender);
        receiver
      }
    };
    SqliteKeySubscription {
      notifier: Arc::downgrade(&self.0),
      key: Some(key),
      receiver,
    }
  }
}

impl SqliteKeySubscription {
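  /// Waits until the watched key's versionstamp becomes newer than
  /// `last_read_versionstamp`. Returns `false` if the sender side was
  /// dropped, i.e. the database has gone away.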
  async fn wait_until_updated(
    &mut self,
    last_read_versionstamp: Versionstamp,
  ) -> bool {
    let res = self
      .receiver
      .wait_for(|t| *t > last_read_versionstamp)
      .await;
    res.is_ok()
  }
}

impl Drop for SqliteKeySubscription {
  fn drop(&mut self) {
    if let Some(notifier) = self.notifier.upgrade() {
      let key = self.key.take().unwrap();
      let mut key_watchers = notifier.key_watchers.write().unwrap();
      match key_watchers.entry(key) {
        Entry::Occupied(entry) => {
          if entry.get().receiver_count() == 1 {
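            // This subscription holds the last receiver for the key, so the
            // watch entry can be removed from the map.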
            entry.remove();
          }
        }
        Entry::Vacant(_) => unreachable!("the entry should still exist"),
      }
    }
  }
}

#[derive(Clone)]
pub struct Sqlite {
  shutdown_notify: Arc<Notify>,
  notifier: SqliteNotifier,
  write_worker: SqliteWorker,
  read_workers: Vec<SqliteWorker>,
}

#[derive(Clone)]
struct SqliteWorker {
  request_tx: tokio::sync::mpsc::Sender<SqliteRequest>,
  join_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}

#[derive(Clone, Debug)]
pub struct SqliteConfig {
  /// Total number of worker threads; the first is the write worker and the
  /// rest serve reads. Must be at least 1.
  pub num_workers: usize,
  /// How long the write worker waits to coalesce subsequent atomic writes
  /// into a single batch; `None` disables batching.
  pub batch_timeout: Option<Duration>,
}

#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum SqliteCreateError {
  #[class(type)]
  #[error("num_workers must be at least 1")]
  NumWorkersTooSmall,
  #[class(inherit)]
  #[error(transparent)]
  SqliteBackend(#[from] SqliteBackendError),
  #[class(inherit)]
  #[error(transparent)]
  Other(#[from] JsErrorBox),
}

impl Sqlite {
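  /// Creates a new `Sqlite` store backed by `config.num_workers` OS threads,
  /// each running a single-threaded tokio runtime: worker 0 owns the write
  /// connection and the remaining workers serve reads. `conn_gen` is invoked
  /// once per worker to produce a connection plus the RNG used for
  /// versionstamp generation.
  ///
  /// A minimal usage sketch (assumes `rand::rngs::OsRng`; error handling
  /// elided):
  ///
  /// ```ignore
  /// let db = Sqlite::new(
  ///   || {
  ///     let conn = rusqlite::Connection::open_in_memory().unwrap();
  ///     Ok((conn, Box::new(rand::rngs::OsRng) as Box<dyn RngCore + Send>))
  ///   },
  ///   SqliteNotifier::default(),
  ///   SqliteConfig { num_workers: 2, batch_timeout: None },
  /// )?;
  /// ```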
  pub fn new(
    mut conn_gen: impl FnMut() -> Result<
      (rusqlite::Connection, Box<dyn RngCore + Send>),
      JsErrorBox,
    >,
    notifier: SqliteNotifier,
    config: SqliteConfig,
  ) -> Result<Sqlite, SqliteCreateError> {
    let shutdown_notify = Arc::new(Notify::new());
    let batch_timeout = config.batch_timeout;

    if config.num_workers == 0 {
      return Err(SqliteCreateError::NumWorkersTooSmall);
    }

    let mut write_worker: Option<SqliteWorker> = None;
    let mut read_workers = Vec::with_capacity(config.num_workers - 1);

    // All workers plus the current thread rendezvous on this barrier, so
    // `new` does not return before every worker is ready to serve requests.
    let init_fence = Arc::new(Barrier::new(config.num_workers + 1));

    for i in 0..config.num_workers {
      let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);
      let (conn, versionstamp_rng) = conn_gen()?;
      // Worker 0 is the write worker; all others are read-only.
      let backend =
        SqliteBackend::new(conn, notifier.clone(), versionstamp_rng, i != 0)?;
      let init_fence = init_fence.clone();
      let shutdown_notify = shutdown_notify.clone();
      let join_handle: JoinHandle<()> = std::thread::Builder::new()
        .name(format!("sw-{i}"))
        .spawn(move || {
          tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async move {
              // Register for shutdown notifications before signaling
              // readiness, so no notification can be missed.
              let shutdown_notify = shutdown_notify.notified();
              init_fence.wait();
              sqlite_thread(backend, shutdown_notify, request_rx, batch_timeout)
                .await
            });
        })
        .unwrap();
      let worker = SqliteWorker {
        request_tx,
        join_handle: Arc::new(Mutex::new(Some(join_handle))),
      };
      if i == 0 {
        write_worker = Some(worker);
      } else {
        read_workers.push(worker);
      }
    }

    assert_eq!(read_workers.len(), config.num_workers - 1);

    init_fence.wait();

    Ok(Self {
      shutdown_notify,
      notifier,
      write_worker: write_worker.unwrap(),
      read_workers,
    })
  }
}
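
/// Body of a single worker thread: serves snapshot reads, atomic writes, and
/// queue requests arriving on `request_rx`; on the write worker it also
/// drives the periodic queue cleanup, keepalive, dequeue, and expiry timers
/// (read-only workers never arm the timer).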
async fn sqlite_thread(
  mut backend: SqliteBackend,
  shutdown_notify: Notified<'_>,
  request_rx: tokio::sync::mpsc::Receiver<SqliteRequest>,
  batch_timeout: Option<Duration>,
) {
  let mut shutdown_notify = std::pin::pin!(shutdown_notify);
  let start = utc_now();
  if !backend.readonly {
    if let Err(err) = backend.queue_cleanup() {
      panic!("KV queue cleanup failed: {err}");
    }
  }
  let mut dequeue_channels: VecDeque<oneshot::Sender<DequeuedMessage>> =
    VecDeque::with_capacity(4);
  let queue_next_ready = if backend.readonly {
    None
  } else {
    match backend.queue_next_ready() {
      Ok(queue_next_ready) => queue_next_ready,
      Err(err) => panic!("KV queue_next_ready failed: {err}"),
    }
  };
  let mut queue_dequeue_deadline = compute_deadline_with_max_and_jitter(
    queue_next_ready,
    QUEUE_DEQUEUE_FALLBACK_INTERVAL,
    QUEUE_DEQUEUE_JITTER,
  );
  let mut queue_cleanup_deadline =
    start + duration_with_jitter(QUEUE_RUNNING_CLEANUP_INTERVAL);
  let mut queue_keepalive_deadline =
    start + duration_with_jitter(QUEUE_MESSAGE_DEADLINE_UPDATE_INTERVAL);
  let expiry_next_ready = if backend.readonly {
    None
  } else {
    match backend.collect_expired() {
      Ok(expiry_next_ready) => expiry_next_ready,
      Err(err) => panic!("KV collect expired failed: {err}"),
    }
  };
  let mut expiry_deadline = compute_deadline_with_max_and_jitter(
    expiry_next_ready,
    EXPIRY_FALLBACK_INTERVAL,
    EXPIRY_JITTER,
  );
  let mut request_rx =
    std::pin::pin!(ReceiverStream::new(request_rx).peekable());
  loop {
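    // Each iteration: compute the soonest background deadline, then wait for
    // shutdown, a dequeue wakeup, an incoming request, or the timer to fire.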
    let now = utc_now();
    let mut closest_deadline = queue_cleanup_deadline
      .min(queue_keepalive_deadline)
      .min(expiry_deadline);
    if !dequeue_channels.is_empty() {
      closest_deadline = closest_deadline.min(queue_dequeue_deadline);
    }
    let timer = if backend.readonly {
      Either::Left(futures::future::pending::<()>())
    } else if let Ok(time_to_closest_deadline) =
      closest_deadline.signed_duration_since(now).to_std()
    {
      Either::Right(Either::Left(tokio::time::sleep(time_to_closest_deadline)))
    } else {
      Either::Right(Either::Right(futures::future::ready(())))
    };
    select! {
      biased;
      _ = &mut shutdown_notify => {
        break;
      },
      _ = backend.notifier.0.dequeue_notify.notified(), if !dequeue_channels.is_empty() => {
        let queue_next_ready = match backend.queue_next_ready() {
          Ok(queue_next_ready) => queue_next_ready,
          Err(err) => panic!("KV queue_next_ready failed: {err}"),
        };
        queue_dequeue_deadline = compute_deadline_with_max_and_jitter(queue_next_ready, QUEUE_DEQUEUE_FALLBACK_INTERVAL, QUEUE_DEQUEUE_JITTER);
      },
      req = request_rx.next() => {
        match req {
          Some(SqliteRequest::SnapshotRead { requests, options, sender }) => {
            let result = backend.snapshot_read(requests, options);
            sender.send(result).ok();
          },
          Some(SqliteRequest::AtomicWrite { write, sender }) => {
            let mut batch_writes = vec![write];
            let mut batch_tx = vec![sender];

            // Coalesce further atomic writes that arrive within the batch
            // timeout into a single batched commit.
            if let Some(batch_timeout) = batch_timeout {
              let mut deadline = std::pin::pin!(tokio::time::sleep(batch_timeout));
              while let Either::Left((Some(x), _)) = futures::future::select(request_rx.as_mut().peek(), &mut deadline).await {
                if !matches!(x, SqliteRequest::AtomicWrite { .. }) {
                  break;
                }
                let x = request_rx.next().await.unwrap();
                match x {
                  SqliteRequest::AtomicWrite { write, sender } => {
                    batch_writes.push(write);
                    batch_tx.push(sender);
                  },
                  _ => unreachable!(),
                }
              }
            }

            let result = backend.atomic_write_batched(batch_writes);
            for (result, tx) in result.into_iter().zip(batch_tx.into_iter()) {
              tx.send(result).ok();
            }
          },
          Some(SqliteRequest::QueueDequeueMessage { sender }) => {
            dequeue_channels.push_back(sender);
          },
          Some(SqliteRequest::QueueFinishMessage { id, success, sender }) => {
            let result = backend.queue_finish_message(&id, success);
            sender.send(result).ok();
          },
          None => break,
        }
      },
      _ = timer => {
        if now >= queue_cleanup_deadline {
          queue_cleanup_deadline = now.add(duration_with_jitter(QUEUE_RUNNING_CLEANUP_INTERVAL));
          if let Err(err) = backend.queue_cleanup() {
            panic!("KV queue cleanup failed: {err}");
          }
        }
        if now >= queue_keepalive_deadline {
          queue_keepalive_deadline = now.add(duration_with_jitter(QUEUE_MESSAGE_DEADLINE_UPDATE_INTERVAL));
          if let Err(err) = backend.queue_running_keepalive() {
            panic!("KV queue running keepalive failed: {err}");
          }
        }
        if now >= queue_dequeue_deadline && !dequeue_channels.is_empty() {
          match backend.queue_dequeue_message() {
            Ok((Some(dequeued_message), next_ready)) => {
              let sender = dequeue_channels.pop_front().unwrap();
              sender.send(dequeued_message).ok();
              queue_dequeue_deadline = compute_deadline_with_max_and_jitter(next_ready, QUEUE_DEQUEUE_FALLBACK_INTERVAL, QUEUE_DEQUEUE_JITTER);
            },
            Ok((None, next_ready)) => {
              queue_dequeue_deadline = compute_deadline_with_max_and_jitter(next_ready, QUEUE_DEQUEUE_FALLBACK_INTERVAL, QUEUE_DEQUEUE_JITTER);
            },
            Err(err) => {
              panic!("KV queue dequeue failed: {err}");
            },
          }
        }
        if now >= expiry_deadline {
          match backend.collect_expired() {
            Ok(next_ready) => {
              expiry_deadline = compute_deadline_with_max_and_jitter(next_ready, EXPIRY_FALLBACK_INTERVAL, EXPIRY_JITTER);
            },
            Err(err) => {
              panic!("KV collect expired failed: {err}");
            },
          }
        }
      },
    }
  }
}

fn duration_with_jitter(duration: Duration) -> std::time::Duration {
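  // Scale the duration by a random factor in [0.9, 1.1).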
  let secs = duration.as_secs_f64();
  let secs = secs * (0.9 + rand::random::<f64>() * 0.2);
  std::time::Duration::from_secs_f64(secs)
}

fn duration_with_total_jitter(duration: Duration) -> std::time::Duration {
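  // Scale the duration by a random factor in [0.0, 1.0).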
  let secs = duration.as_secs_f64();
  let secs = secs * rand::random::<f64>();
  std::time::Duration::from_secs_f64(secs)
}
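
/// Computes the next wakeup deadline: the earlier of `next_ready` and
/// `now + max`, plus up to `jitter_duration` of random jitter.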
fn compute_deadline_with_max_and_jitter(
  next_ready: Option<DateTime<Utc>>,
  max: Duration,
  jitter_duration: Duration,
) -> DateTime<Utc> {
  let fallback = utc_now() + max;
  next_ready.unwrap_or(fallback).min(fallback)
    + duration_with_total_jitter(jitter_duration)
}

impl Sqlite {
  pub async fn snapshot_read(
    &self,
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
  ) -> Result<Vec<ReadRangeOutput>, SqliteBackendError> {
    let (sender, receiver) = oneshot::channel();
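
    // Reads can be served by any worker: prefer an idle read worker, then the
    // write worker; if all channels are full, wait for the first free slot.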
    let slot = if let Some(x) = self
      .read_workers
      .iter()
      .find_map(|x| x.request_tx.try_reserve().ok())
    {
      x
    } else if let Ok(x) = self.write_worker.request_tx.try_reserve() {
      x
    } else {
      futures::future::select_all(
        self
          .read_workers
          .iter()
          .chain(std::iter::once(&self.write_worker))
          .map(|x| x.request_tx.reserve().boxed()),
      )
      .await
      .0
      .map_err(|_| SqliteBackendError::DatabaseClosed)?
    };
    slot.send(SqliteRequest::SnapshotRead {
      requests,
      options,
      sender,
    });
    let (ranges, _current_versionstamp) = receiver
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)??;
    Ok(ranges)
  }

  pub async fn atomic_write(
    &self,
    write: AtomicWrite,
  ) -> Result<Option<CommitResult>, SqliteBackendError> {
    let (sender, receiver) = oneshot::channel();
    self
      .write_worker
      .request_tx
      .send(SqliteRequest::AtomicWrite { write, sender })
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)?;
    receiver
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)?
  }
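
  /// Requests the next ready queue message from the write worker, waiting
  /// until one becomes available. Returns `Ok(None)` if the database has
  /// been closed.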
  pub async fn dequeue_next_message(
    &self,
  ) -> Result<Option<SqliteMessageHandle>, SqliteBackendError> {
    let (sender, receiver) = oneshot::channel();
    let req = SqliteRequest::QueueDequeueMessage { sender };
    if self.write_worker.request_tx.send(req).await.is_err() {
      return Ok(None);
    }
    let Ok(dequeued_message) = receiver.await else {
      return Ok(None);
    };
    Ok(Some(SqliteMessageHandle {
      id: dequeued_message.id,
      payload: Some(dequeued_message.payload),
      request_tx: self.write_worker.request_tx.clone(),
    }))
  }
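
  /// Returns a stream that immediately yields the current values of `keys`,
  /// then yields again whenever one of the keys is committed with a newer
  /// versionstamp.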
  pub fn watch(
    &self,
    keys: Vec<Vec<u8>>,
  ) -> Pin<Box<dyn Stream<Item = Result<Vec<WatchKeyOutput>, JsErrorBox>> + Send>>
  {
    let requests = keys
      .iter()
      .map(|key| ReadRange {
        // A single-key range: the exclusive end is the key plus a 0 byte.
        end: key.iter().copied().chain(Some(0)).collect(),
        start: key.clone(),
        limit: NonZeroU32::new(1).unwrap(),
        reverse: false,
      })
      .collect::<Vec<_>>();
    let options = SnapshotReadOptions {
      consistency: denokv_proto::Consistency::Eventual,
    };
    let this = self.clone();
    let stream = try_stream! {
      let mut subscriptions: Vec<SqliteKeySubscription> = Vec::new();
      for key in keys {
        subscriptions.push(this.notifier.subscribe(key));
      }

      loop {
        let (sender, receiver) = oneshot::channel();
        let res = this
          .write_worker
          .request_tx
          .send(SqliteRequest::SnapshotRead {
            requests: requests.clone(),
            options: options.clone(),
            sender,
          })
          .await;
        if res.is_err() {
          return;
        }
        let Some(res) = receiver.await.ok() else {
          return;
        };
        let (ranges, current_versionstamp) = res.map_err(JsErrorBox::from_err)?;
        let mut outputs = Vec::new();
        for range in ranges {
          let entry = range.entries.into_iter().next();
          outputs.push(WatchKeyOutput::Changed { entry });
        }
        yield outputs;

        if subscriptions.is_empty() {
          // With no keys to watch there is nothing further to yield.
          return;
        }
        // Wait until at least one watched key moves past the versionstamp we
        // just read; bail out if the database has been dropped.
        let futures = subscriptions.iter_mut().map(|subscription| {
          Box::pin(subscription.wait_until_updated(current_versionstamp))
        });
        if !futures::future::select_all(futures).await.0 {
          return;
        }
      }
    };
    Box::pin(stream)
  }
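
  /// Shuts down all worker threads and blocks until they have exited.
  /// Panics if called more than once.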
  pub fn close(&self) {
    self.shutdown_notify.notify_waiters();
    for w in std::iter::once(&self.write_worker).chain(self.read_workers.iter())
    {
      w.join_handle
        .lock()
        .unwrap()
        .take()
        .expect("can't close database twice")
        .join()
        .unwrap();
    }
  }
}

#[async_trait::async_trait(?Send)]
impl Database for Sqlite {
  type QMH = SqliteMessageHandle;

  async fn snapshot_read(
    &self,
    requests: Vec<ReadRange>,
    options: SnapshotReadOptions,
  ) -> Result<Vec<ReadRangeOutput>, JsErrorBox> {
    let ranges = Sqlite::snapshot_read(self, requests, options)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(ranges)
  }

  async fn atomic_write(
    &self,
    write: AtomicWrite,
  ) -> Result<Option<CommitResult>, JsErrorBox> {
    let res = Sqlite::atomic_write(self, write)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(res)
  }

  async fn dequeue_next_message(
    &self,
  ) -> Result<Option<Self::QMH>, JsErrorBox> {
    let message_handle = Sqlite::dequeue_next_message(self)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(message_handle)
  }

  fn watch(
    &self,
    keys: Vec<Vec<u8>>,
  ) -> Pin<Box<dyn Stream<Item = Result<Vec<WatchKeyOutput>, JsErrorBox>>>> {
    Sqlite::watch(self, keys)
  }

  fn close(&self) {
    Sqlite::close(self);
  }
}
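
/// Handle to a dequeued queue message: the payload can be taken exactly once,
/// and `finish` reports the processing outcome back to the queue.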
pub struct SqliteMessageHandle {
  id: QueueMessageId,
  payload: Option<Vec<u8>>,
  request_tx: tokio::sync::mpsc::Sender<SqliteRequest>,
}

impl SqliteMessageHandle {
  pub async fn finish(&self, success: bool) -> Result<(), SqliteBackendError> {
    let (sender, receiver) = oneshot::channel();
    self
      .request_tx
      .send(SqliteRequest::QueueFinishMessage {
        id: self.id.clone(),
        success,
        sender,
      })
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)?;
    receiver
      .await
      .map_err(|_| SqliteBackendError::DatabaseClosed)?
  }

  pub async fn take_payload(&mut self) -> Result<Vec<u8>, SqliteBackendError> {
    Ok(self.payload.take().expect("can't take payload twice"))
  }
}

#[async_trait::async_trait(?Send)]
impl QueueMessageHandle for SqliteMessageHandle {
  async fn finish(&self, success: bool) -> Result<(), JsErrorBox> {
    SqliteMessageHandle::finish(self, success)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(())
  }

  async fn take_payload(&mut self) -> Result<Vec<u8>, JsErrorBox> {
    let payload = SqliteMessageHandle::take_payload(self)
      .await
      .map_err(JsErrorBox::from_err)?;
    Ok(payload)
  }
}