deno_kv/
lib.rs

1// Copyright 2018-2025 the Deno authors. MIT license.
2
3pub mod config;
4pub mod dynamic;
5mod interface;
6pub mod remote;
7pub mod sqlite;
8
9use std::borrow::Cow;
10use std::cell::RefCell;
11use std::num::NonZeroU32;
12use std::rc::Rc;
13use std::sync::Arc;
14use std::time::Duration;
15
16use base64::prelude::BASE64_URL_SAFE;
17use base64::Engine;
18use boxed_error::Boxed;
19use chrono::DateTime;
20use chrono::Utc;
21use deno_core::futures::StreamExt;
22use deno_core::op2;
23use deno_core::serde_v8::AnyValue;
24use deno_core::serde_v8::BigInt;
25use deno_core::AsyncRefCell;
26use deno_core::ByteString;
27use deno_core::CancelFuture;
28use deno_core::CancelHandle;
29use deno_core::JsBuffer;
30use deno_core::OpState;
31use deno_core::RcRef;
32use deno_core::Resource;
33use deno_core::ResourceId;
34use deno_core::ToJsBuffer;
35use deno_error::JsErrorBox;
36use deno_error::JsErrorClass;
37use deno_features::FeatureChecker;
38use denokv_proto::decode_key;
39use denokv_proto::encode_key;
40use denokv_proto::AtomicWrite;
41use denokv_proto::Check;
42use denokv_proto::Consistency;
43use denokv_proto::Database;
44use denokv_proto::Enqueue;
45use denokv_proto::Key;
46use denokv_proto::KeyPart;
47use denokv_proto::KvEntry;
48use denokv_proto::KvValue;
49use denokv_proto::Mutation;
50use denokv_proto::MutationKind;
51use denokv_proto::QueueMessageHandle;
52use denokv_proto::ReadRange;
53use denokv_proto::SnapshotReadOptions;
54use denokv_proto::WatchKeyOutput;
55use denokv_proto::WatchStream;
56use log::debug;
57use serde::Deserialize;
58use serde::Serialize;
59
60pub use crate::config::*;
61pub use crate::interface::*;
62
63pub const UNSTABLE_FEATURE_NAME: &str = "kv";
64
65deno_core::extension!(deno_kv,
66  deps = [ deno_console, deno_web ],
67  parameters = [ DBH: DatabaseHandler ],
68  ops = [
69    op_kv_database_open<DBH>,
70    op_kv_snapshot_read<DBH>,
71    op_kv_atomic_write<DBH>,
72    op_kv_encode_cursor,
73    op_kv_dequeue_next_message<DBH>,
74    op_kv_finish_dequeued_message<DBH>,
75    op_kv_watch<DBH>,
76    op_kv_watch_next,
77  ],
78  esm = [ "01_db.ts" ],
79  options = {
80    handler: DBH,
81    config: KvConfig,
82  },
83  state = |state, options| {
84    state.put(Rc::new(options.config));
85    state.put(Rc::new(options.handler));
86  }
87);
88
89struct DatabaseResource<DB: Database + 'static> {
90  db: DB,
91  cancel_handle: Rc<CancelHandle>,
92}
93
94impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
95  fn name(&self) -> Cow<str> {
96    "database".into()
97  }
98
99  fn close(self: Rc<Self>) {
100    self.db.close();
101    self.cancel_handle.cancel();
102  }
103}
104
105struct DatabaseWatcherResource {
106  stream: AsyncRefCell<WatchStream>,
107  db_cancel_handle: Rc<CancelHandle>,
108  cancel_handle: Rc<CancelHandle>,
109}
110
111impl Resource for DatabaseWatcherResource {
112  fn name(&self) -> Cow<str> {
113    "databaseWatcher".into()
114  }
115
116  fn close(self: Rc<Self>) {
117    self.cancel_handle.cancel()
118  }
119}
120
121#[derive(Debug, Boxed, deno_error::JsError)]
122pub struct KvError(pub Box<KvErrorKind>);
123
124#[derive(Debug, thiserror::Error, deno_error::JsError)]
125pub enum KvErrorKind {
126  #[class(inherit)]
127  #[error(transparent)]
128  DatabaseHandler(JsErrorBox),
129  #[class(inherit)]
130  #[error(transparent)]
131  Resource(#[from] deno_core::error::ResourceError),
132  #[class(type)]
133  #[error("Too many ranges (max {0})")]
134  TooManyRanges(usize),
135  #[class(type)]
136  #[error("Too many entries (max {0})")]
137  TooManyEntries(usize),
138  #[class(type)]
139  #[error("Too many checks (max {0})")]
140  TooManyChecks(usize),
141  #[class(type)]
142  #[error("Too many mutations (max {0})")]
143  TooManyMutations(usize),
144  #[class(type)]
145  #[error("Too many keys (max {0})")]
146  TooManyKeys(usize),
147  #[class(type)]
148  #[error("limit must be greater than 0")]
149  InvalidLimit,
150  #[class(type)]
151  #[error("Invalid boundary key")]
152  InvalidBoundaryKey,
153  #[class(type)]
154  #[error("Key too large for read (max {0} bytes)")]
155  KeyTooLargeToRead(usize),
156  #[class(type)]
157  #[error("Key too large for write (max {0} bytes)")]
158  KeyTooLargeToWrite(usize),
159  #[class(type)]
160  #[error("Total mutation size too large (max {0} bytes)")]
161  TotalMutationTooLarge(usize),
162  #[class(type)]
163  #[error("Total key size too large (max {0} bytes)")]
164  TotalKeyTooLarge(usize),
165  #[class(inherit)]
166  #[error(transparent)]
167  Kv(JsErrorBox),
168  #[class(inherit)]
169  #[error(transparent)]
170  Io(#[from] std::io::Error),
171  #[class(type)]
172  #[error("Queue message not found")]
173  QueueMessageNotFound,
174  #[class(type)]
175  #[error("Start key is not in the keyspace defined by prefix")]
176  StartKeyNotInKeyspace,
177  #[class(type)]
178  #[error("End key is not in the keyspace defined by prefix")]
179  EndKeyNotInKeyspace,
180  #[class(type)]
181  #[error("Start key is greater than end key")]
182  StartKeyGreaterThanEndKey,
183  #[class(inherit)]
184  #[error("Invalid check")]
185  InvalidCheck(#[source] KvCheckError),
186  #[class(inherit)]
187  #[error("Invalid mutation")]
188  InvalidMutation(#[source] KvMutationError),
189  #[class(inherit)]
190  #[error("Invalid enqueue")]
191  InvalidEnqueue(#[source] std::io::Error),
192  #[class(type)]
193  #[error("key cannot be empty")]
194  EmptyKey,
195  #[class(type)]
196  #[error("Value too large (max {0} bytes)")]
197  ValueTooLarge(usize),
198  #[class(type)]
199  #[error("enqueue payload too large (max {0} bytes)")]
200  EnqueuePayloadTooLarge(usize),
201  #[class(type)]
202  #[error("invalid cursor")]
203  InvalidCursor,
204  #[class(type)]
205  #[error("cursor out of bounds")]
206  CursorOutOfBounds,
207  #[class(type)]
208  #[error("Invalid range")]
209  InvalidRange,
210}
211
212#[op2(async, stack_trace)]
213#[smi]
214async fn op_kv_database_open<DBH>(
215  state: Rc<RefCell<OpState>>,
216  #[string] path: Option<String>,
217) -> Result<ResourceId, KvError>
218where
219  DBH: DatabaseHandler + 'static,
220{
221  let handler = {
222    let state = state.borrow();
223    state
224      .borrow::<Arc<FeatureChecker>>()
225      .check_or_exit(UNSTABLE_FEATURE_NAME, "Deno.openKv");
226    state.borrow::<Rc<DBH>>().clone()
227  };
228  let db = handler
229    .open(state.clone(), path)
230    .await
231    .map_err(KvErrorKind::DatabaseHandler)?;
232  let rid = state.borrow_mut().resource_table.add(DatabaseResource {
233    db,
234    cancel_handle: CancelHandle::new_rc(),
235  });
236  Ok(rid)
237}
238
239type KvKey = Vec<AnyValue>;
240
241fn key_part_from_v8(value: AnyValue) -> KeyPart {
242  match value {
243    AnyValue::Bool(false) => KeyPart::False,
244    AnyValue::Bool(true) => KeyPart::True,
245    AnyValue::Number(n) => KeyPart::Float(n),
246    AnyValue::BigInt(n) => KeyPart::Int(n),
247    AnyValue::String(s) => KeyPart::String(s),
248    AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
249    AnyValue::RustBuffer(_) => unreachable!(),
250  }
251}
252
253fn key_part_to_v8(value: KeyPart) -> AnyValue {
254  match value {
255    KeyPart::False => AnyValue::Bool(false),
256    KeyPart::True => AnyValue::Bool(true),
257    KeyPart::Float(n) => AnyValue::Number(n),
258    KeyPart::Int(n) => AnyValue::BigInt(n),
259    KeyPart::String(s) => AnyValue::String(s),
260    KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
261  }
262}
263
264#[derive(Debug, Deserialize)]
265#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
266enum FromV8Value {
267  V8(JsBuffer),
268  Bytes(JsBuffer),
269  U64(BigInt),
270}
271
272#[derive(Debug, Serialize)]
273#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
274enum ToV8Value {
275  V8(ToJsBuffer),
276  Bytes(ToJsBuffer),
277  U64(BigInt),
278}
279
280impl TryFrom<FromV8Value> for KvValue {
281  type Error = num_bigint::TryFromBigIntError<num_bigint::BigInt>;
282  fn try_from(value: FromV8Value) -> Result<Self, Self::Error> {
283    Ok(match value {
284      FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()),
285      FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()),
286      FromV8Value::U64(n) => {
287        KvValue::U64(num_bigint::BigInt::from(n).try_into()?)
288      }
289    })
290  }
291}
292
293impl From<KvValue> for ToV8Value {
294  fn from(value: KvValue) -> Self {
295    match value {
296      KvValue::V8(buf) => ToV8Value::V8(buf.into()),
297      KvValue::Bytes(buf) => ToV8Value::Bytes(buf.into()),
298      KvValue::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
299    }
300  }
301}
302
303#[derive(Serialize)]
304struct ToV8KvEntry {
305  key: KvKey,
306  value: ToV8Value,
307  versionstamp: ByteString,
308}
309
310impl TryFrom<KvEntry> for ToV8KvEntry {
311  type Error = std::io::Error;
312  fn try_from(entry: KvEntry) -> Result<Self, Self::Error> {
313    Ok(ToV8KvEntry {
314      key: decode_key(&entry.key)?
315        .0
316        .into_iter()
317        .map(key_part_to_v8)
318        .collect(),
319      value: entry.value.into(),
320      versionstamp: faster_hex::hex_string(&entry.versionstamp).into(),
321    })
322  }
323}
324
325#[derive(Deserialize, Serialize)]
326#[serde(rename_all = "camelCase")]
327enum V8Consistency {
328  Strong,
329  Eventual,
330}
331
332impl From<V8Consistency> for Consistency {
333  fn from(value: V8Consistency) -> Self {
334    match value {
335      V8Consistency::Strong => Consistency::Strong,
336      V8Consistency::Eventual => Consistency::Eventual,
337    }
338  }
339}
340
341// (prefix, start, end, limit, reverse, cursor)
342type SnapshotReadRange = (
343  Option<KvKey>,
344  Option<KvKey>,
345  Option<KvKey>,
346  u32,
347  bool,
348  Option<ByteString>,
349);
350
351#[op2(async)]
352#[serde]
353async fn op_kv_snapshot_read<DBH>(
354  state: Rc<RefCell<OpState>>,
355  #[smi] rid: ResourceId,
356  #[serde] ranges: Vec<SnapshotReadRange>,
357  #[serde] consistency: V8Consistency,
358) -> Result<Vec<Vec<ToV8KvEntry>>, KvError>
359where
360  DBH: DatabaseHandler + 'static,
361{
362  let db = {
363    let state = state.borrow();
364    let resource = state
365      .resource_table
366      .get::<DatabaseResource<DBH::DB>>(rid)
367      .map_err(KvErrorKind::Resource)?;
368    resource.db.clone()
369  };
370
371  let config = {
372    let state = state.borrow();
373    state.borrow::<Rc<KvConfig>>().clone()
374  };
375
376  if ranges.len() > config.max_read_ranges {
377    return Err(KvErrorKind::TooManyRanges(config.max_read_ranges).into_box());
378  }
379
380  let mut total_entries = 0usize;
381
382  let read_ranges = ranges
383    .into_iter()
384    .map(|(prefix, start, end, limit, reverse, cursor)| {
385      let selector = RawSelector::from_tuple(prefix, start, end)?;
386
387      let (start, end) =
388        decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?;
389      check_read_key_size(&start, &config)?;
390      check_read_key_size(&end, &config)?;
391
392      total_entries += limit as usize;
393      Ok(ReadRange {
394        start,
395        end,
396        limit: NonZeroU32::new(limit).ok_or(KvErrorKind::InvalidLimit)?,
397        reverse,
398      })
399    })
400    .collect::<Result<Vec<_>, KvError>>()?;
401
402  if total_entries > config.max_read_entries {
403    return Err(
404      KvErrorKind::TooManyEntries(config.max_read_entries).into_box(),
405    );
406  }
407
408  let opts = SnapshotReadOptions {
409    consistency: consistency.into(),
410  };
411  let output_ranges = db
412    .snapshot_read(read_ranges, opts)
413    .await
414    .map_err(KvErrorKind::Kv)?;
415  let output_ranges = output_ranges
416    .into_iter()
417    .map(|x| {
418      x.entries
419        .into_iter()
420        .map(TryInto::try_into)
421        .collect::<Result<Vec<_>, std::io::Error>>()
422    })
423    .collect::<Result<Vec<_>, std::io::Error>>()?;
424  Ok(output_ranges)
425}
426
427struct QueueMessageResource<QPH: QueueMessageHandle + 'static> {
428  handle: QPH,
429}
430
431impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
432  fn name(&self) -> Cow<str> {
433    "queueMessage".into()
434  }
435}
436
437#[op2(async)]
438#[serde]
439async fn op_kv_dequeue_next_message<DBH>(
440  state: Rc<RefCell<OpState>>,
441  #[smi] rid: ResourceId,
442) -> Result<Option<(ToJsBuffer, ResourceId)>, KvError>
443where
444  DBH: DatabaseHandler + 'static,
445{
446  let db = {
447    let state = state.borrow();
448    let resource =
449      match state.resource_table.get::<DatabaseResource<DBH::DB>>(rid) {
450        Ok(resource) => resource,
451        Err(err) => {
452          if err.get_class() == "BadResource" {
453            return Ok(None);
454          } else {
455            return Err(KvErrorKind::Resource(err).into_box());
456          }
457        }
458      };
459    resource.db.clone()
460  };
461
462  let Some(mut handle) =
463    db.dequeue_next_message().await.map_err(KvErrorKind::Kv)?
464  else {
465    return Ok(None);
466  };
467  let payload = handle.take_payload().await.map_err(KvErrorKind::Kv)?.into();
468  let handle_rid = {
469    let mut state = state.borrow_mut();
470    state.resource_table.add(QueueMessageResource { handle })
471  };
472  Ok(Some((payload, handle_rid)))
473}
474
475#[op2]
476#[smi]
477fn op_kv_watch<DBH>(
478  state: &mut OpState,
479  #[smi] rid: ResourceId,
480  #[serde] keys: Vec<KvKey>,
481) -> Result<ResourceId, KvError>
482where
483  DBH: DatabaseHandler + 'static,
484{
485  let resource = state
486    .resource_table
487    .get::<DatabaseResource<DBH::DB>>(rid)
488    .map_err(KvErrorKind::Resource)?;
489  let config = state.borrow::<Rc<KvConfig>>().clone();
490
491  if keys.len() > config.max_watched_keys {
492    return Err(KvErrorKind::TooManyKeys(config.max_watched_keys).into_box());
493  }
494
495  let keys: Vec<Vec<u8>> = keys
496    .into_iter()
497    .map(encode_v8_key)
498    .collect::<std::io::Result<_>>()?;
499
500  for k in &keys {
501    check_read_key_size(k, &config)?;
502  }
503
504  let stream = resource.db.watch(keys);
505
506  let rid = state.resource_table.add(DatabaseWatcherResource {
507    stream: AsyncRefCell::new(stream),
508    db_cancel_handle: resource.cancel_handle.clone(),
509    cancel_handle: CancelHandle::new_rc(),
510  });
511
512  Ok(rid)
513}
514
515#[derive(Serialize)]
516#[serde(rename_all = "camelCase", untagged)]
517enum WatchEntry {
518  Changed(Option<ToV8KvEntry>),
519  Unchanged,
520}
521
522#[op2(async)]
523#[serde]
524async fn op_kv_watch_next(
525  state: Rc<RefCell<OpState>>,
526  #[smi] rid: ResourceId,
527) -> Result<Option<Vec<WatchEntry>>, KvError> {
528  let resource = {
529    let state = state.borrow();
530    let resource = state
531      .resource_table
532      .get::<DatabaseWatcherResource>(rid)
533      .map_err(KvErrorKind::Resource)?;
534    resource.clone()
535  };
536
537  let db_cancel_handle = resource.db_cancel_handle.clone();
538  let cancel_handle = resource.cancel_handle.clone();
539  let stream = RcRef::map(resource, |r| &r.stream)
540    .borrow_mut()
541    .or_cancel(db_cancel_handle.clone())
542    .or_cancel(cancel_handle.clone())
543    .await;
544  let Ok(Ok(mut stream)) = stream else {
545    return Ok(None);
546  };
547
548  // We hold a strong reference to `resource`, so we can't rely on the stream
549  // being dropped when the db connection is closed
550  let Ok(Ok(Some(res))) = stream
551    .next()
552    .or_cancel(db_cancel_handle)
553    .or_cancel(cancel_handle)
554    .await
555  else {
556    return Ok(None);
557  };
558
559  let entries = res.map_err(KvErrorKind::Kv)?;
560  let entries = entries
561    .into_iter()
562    .map(|entry| {
563      Ok(match entry {
564        WatchKeyOutput::Changed { entry } => {
565          WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?)
566        }
567        WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
568      })
569    })
570    .collect::<Result<_, KvError>>()?;
571
572  Ok(Some(entries))
573}
574
575#[op2(async)]
576async fn op_kv_finish_dequeued_message<DBH>(
577  state: Rc<RefCell<OpState>>,
578  #[smi] handle_rid: ResourceId,
579  success: bool,
580) -> Result<(), KvError>
581where
582  DBH: DatabaseHandler + 'static,
583{
584  let handle = {
585    let mut state = state.borrow_mut();
586    let handle = state
587      .resource_table
588      .take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid)
589      .map_err(|_| KvErrorKind::QueueMessageNotFound)?;
590    Rc::try_unwrap(handle)
591      .map_err(|_| KvErrorKind::QueueMessageNotFound)?
592      .handle
593  };
594  // if we fail to finish the message, there is not much we can do and the
595  // message will be retried anyway, so we just ignore the error
596  if let Err(err) = handle.finish(success).await {
597    debug!("Failed to finish dequeued message: {}", err);
598  };
599  Ok(())
600}
601
602#[derive(Debug, thiserror::Error, deno_error::JsError)]
603pub enum KvCheckError {
604  #[class(type)]
605  #[error("invalid versionstamp")]
606  InvalidVersionstamp,
607  #[class(inherit)]
608  #[error(transparent)]
609  Io(std::io::Error),
610}
611
612type V8KvCheck = (KvKey, Option<ByteString>);
613
614fn check_from_v8(value: V8KvCheck) -> Result<Check, KvCheckError> {
615  let versionstamp = match value.1 {
616    Some(data) => {
617      let mut out = [0u8; 10];
618      if data.len() != out.len() * 2 {
619        return Err(KvCheckError::InvalidVersionstamp);
620      }
621      faster_hex::hex_decode(&data, &mut out)
622        .map_err(|_| KvCheckError::InvalidVersionstamp)?;
623      Some(out)
624    }
625    None => None,
626  };
627  Ok(Check {
628    key: encode_v8_key(value.0).map_err(KvCheckError::Io)?,
629    versionstamp,
630  })
631}
632
633#[derive(Debug, thiserror::Error, deno_error::JsError)]
634pub enum KvMutationError {
635  #[class(generic)]
636  #[error(transparent)]
637  BigInt(#[from] num_bigint::TryFromBigIntError<num_bigint::BigInt>),
638  #[class(inherit)]
639  #[error(transparent)]
640  Io(
641    #[from]
642    #[inherit]
643    std::io::Error,
644  ),
645  #[class(type)]
646  #[error("Invalid mutation '{0}' with value")]
647  InvalidMutationWithValue(String),
648  #[class(type)]
649  #[error("Invalid mutation '{0}' without value")]
650  InvalidMutationWithoutValue(String),
651}
652
653type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
654
655fn mutation_from_v8(
656  (value, current_timstamp): (V8KvMutation, DateTime<Utc>),
657) -> Result<Mutation, KvMutationError> {
658  let key = encode_v8_key(value.0)?;
659  let kind = match (value.1.as_str(), value.2) {
660    ("set", Some(value)) => MutationKind::Set(value.try_into()?),
661    ("delete", None) => MutationKind::Delete,
662    ("sum", Some(value)) => MutationKind::Sum {
663      value: value.try_into()?,
664      min_v8: vec![],
665      max_v8: vec![],
666      clamp: false,
667    },
668    ("min", Some(value)) => MutationKind::Min(value.try_into()?),
669    ("max", Some(value)) => MutationKind::Max(value.try_into()?),
670    ("setSuffixVersionstampedKey", Some(value)) => {
671      MutationKind::SetSuffixVersionstampedKey(value.try_into()?)
672    }
673    (op, Some(_)) => {
674      return Err(KvMutationError::InvalidMutationWithValue(op.to_string()))
675    }
676    (op, None) => {
677      return Err(KvMutationError::InvalidMutationWithoutValue(op.to_string()))
678    }
679  };
680  Ok(Mutation {
681    key,
682    kind,
683    expire_at: value
684      .3
685      .map(|expire_in| current_timstamp + Duration::from_millis(expire_in)),
686  })
687}
688
689type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);
690
691fn enqueue_from_v8(
692  value: V8Enqueue,
693  current_timestamp: DateTime<Utc>,
694) -> Result<Enqueue, std::io::Error> {
695  Ok(Enqueue {
696    payload: value.0.to_vec(),
697    deadline: current_timestamp
698      + chrono::Duration::milliseconds(value.1 as i64),
699    keys_if_undelivered: value
700      .2
701      .into_iter()
702      .map(encode_v8_key)
703      .collect::<std::io::Result<_>>()?,
704    backoff_schedule: value.3,
705  })
706}
707
708fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
709  encode_key(&Key(key.into_iter().map(key_part_from_v8).collect()))
710}
711
712enum RawSelector {
713  Prefixed {
714    prefix: Vec<u8>,
715    start: Option<Vec<u8>>,
716    end: Option<Vec<u8>>,
717  },
718  Range {
719    start: Vec<u8>,
720    end: Vec<u8>,
721  },
722}
723
724impl RawSelector {
725  fn from_tuple(
726    prefix: Option<KvKey>,
727    start: Option<KvKey>,
728    end: Option<KvKey>,
729  ) -> Result<Self, KvError> {
730    let prefix = prefix.map(encode_v8_key).transpose()?;
731    let start = start.map(encode_v8_key).transpose()?;
732    let end = end.map(encode_v8_key).transpose()?;
733
734    match (prefix, start, end) {
735      (Some(prefix), None, None) => Ok(Self::Prefixed {
736        prefix,
737        start: None,
738        end: None,
739      }),
740      (Some(prefix), Some(start), None) => {
741        if !start.starts_with(&prefix) || start.len() == prefix.len() {
742          return Err(KvErrorKind::StartKeyNotInKeyspace.into_box());
743        }
744        Ok(Self::Prefixed {
745          prefix,
746          start: Some(start),
747          end: None,
748        })
749      }
750      (Some(prefix), None, Some(end)) => {
751        if !end.starts_with(&prefix) || end.len() == prefix.len() {
752          return Err(KvErrorKind::EndKeyNotInKeyspace.into_box());
753        }
754        Ok(Self::Prefixed {
755          prefix,
756          start: None,
757          end: Some(end),
758        })
759      }
760      (None, Some(start), Some(end)) => {
761        if start > end {
762          return Err(KvErrorKind::StartKeyGreaterThanEndKey.into_box());
763        }
764        Ok(Self::Range { start, end })
765      }
766      (None, Some(start), None) => {
767        let end = start.iter().copied().chain(Some(0)).collect();
768        Ok(Self::Range { start, end })
769      }
770      _ => Err(KvErrorKind::InvalidRange.into_box()),
771    }
772  }
773
774  fn start(&self) -> Option<&[u8]> {
775    match self {
776      Self::Prefixed { start, .. } => start.as_deref(),
777      Self::Range { start, .. } => Some(start),
778    }
779  }
780
781  fn end(&self) -> Option<&[u8]> {
782    match self {
783      Self::Prefixed { end, .. } => end.as_deref(),
784      Self::Range { end, .. } => Some(end),
785    }
786  }
787
788  fn common_prefix(&self) -> &[u8] {
789    match self {
790      Self::Prefixed { prefix, .. } => prefix,
791      Self::Range { start, end } => common_prefix_for_bytes(start, end),
792    }
793  }
794
795  fn range_start_key(&self) -> Vec<u8> {
796    match self {
797      Self::Prefixed {
798        start: Some(start), ..
799      } => start.clone(),
800      Self::Range { start, .. } => start.clone(),
801      Self::Prefixed { prefix, .. } => {
802        prefix.iter().copied().chain(Some(0)).collect()
803      }
804    }
805  }
806
807  fn range_end_key(&self) -> Vec<u8> {
808    match self {
809      Self::Prefixed { end: Some(end), .. } => end.clone(),
810      Self::Range { end, .. } => end.clone(),
811      Self::Prefixed { prefix, .. } => {
812        prefix.iter().copied().chain(Some(0xff)).collect()
813      }
814    }
815  }
816}
817
818fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] {
819  let mut i = 0;
820  while i < a.len() && i < b.len() && a[i] == b[i] {
821    i += 1;
822  }
823  &a[..i]
824}
825
826fn encode_cursor(
827  selector: &RawSelector,
828  boundary_key: &[u8],
829) -> Result<String, KvError> {
830  let common_prefix = selector.common_prefix();
831  if !boundary_key.starts_with(common_prefix) {
832    return Err(KvErrorKind::InvalidBoundaryKey.into_box());
833  }
834  Ok(BASE64_URL_SAFE.encode(&boundary_key[common_prefix.len()..]))
835}
836
837fn decode_selector_and_cursor(
838  selector: &RawSelector,
839  reverse: bool,
840  cursor: Option<&ByteString>,
841) -> Result<(Vec<u8>, Vec<u8>), KvError> {
842  let Some(cursor) = cursor else {
843    return Ok((selector.range_start_key(), selector.range_end_key()));
844  };
845
846  let common_prefix = selector.common_prefix();
847  let cursor = BASE64_URL_SAFE
848    .decode(cursor)
849    .map_err(|_| KvErrorKind::InvalidCursor)?;
850
851  let first_key: Vec<u8>;
852  let last_key: Vec<u8>;
853
854  if reverse {
855    first_key = selector.range_start_key();
856    last_key = common_prefix
857      .iter()
858      .copied()
859      .chain(cursor.iter().copied())
860      .collect();
861  } else {
862    first_key = common_prefix
863      .iter()
864      .copied()
865      .chain(cursor.iter().copied())
866      .chain(Some(0))
867      .collect();
868    last_key = selector.range_end_key();
869  }
870
871  // Defend against out-of-bounds reading
872  if let Some(start) = selector.start() {
873    if &first_key[..] < start {
874      return Err(KvErrorKind::CursorOutOfBounds.into_box());
875    }
876  }
877
878  if let Some(end) = selector.end() {
879    if &last_key[..] > end {
880      return Err(KvErrorKind::CursorOutOfBounds.into_box());
881    }
882  }
883
884  Ok((first_key, last_key))
885}
886
887#[op2(async)]
888#[string]
889async fn op_kv_atomic_write<DBH>(
890  state: Rc<RefCell<OpState>>,
891  #[smi] rid: ResourceId,
892  #[serde] checks: Vec<V8KvCheck>,
893  #[serde] mutations: Vec<V8KvMutation>,
894  #[serde] enqueues: Vec<V8Enqueue>,
895) -> Result<Option<String>, KvError>
896where
897  DBH: DatabaseHandler + 'static,
898{
899  let current_timestamp = chrono::Utc::now();
900  let db = {
901    let state = state.borrow();
902    let resource = state
903      .resource_table
904      .get::<DatabaseResource<DBH::DB>>(rid)
905      .map_err(KvErrorKind::Resource)?;
906    resource.db.clone()
907  };
908
909  let config = {
910    let state = state.borrow();
911    state.borrow::<Rc<KvConfig>>().clone()
912  };
913
914  if checks.len() > config.max_checks {
915    return Err(KvErrorKind::TooManyChecks(config.max_checks).into_box());
916  }
917
918  if mutations.len() + enqueues.len() > config.max_mutations {
919    return Err(KvErrorKind::TooManyMutations(config.max_mutations).into_box());
920  }
921
922  let checks = checks
923    .into_iter()
924    .map(check_from_v8)
925    .collect::<Result<Vec<Check>, KvCheckError>>()
926    .map_err(KvErrorKind::InvalidCheck)?;
927  let mutations = mutations
928    .into_iter()
929    .map(|mutation| mutation_from_v8((mutation, current_timestamp)))
930    .collect::<Result<Vec<Mutation>, KvMutationError>>()
931    .map_err(KvErrorKind::InvalidMutation)?;
932  let enqueues = enqueues
933    .into_iter()
934    .map(|e| enqueue_from_v8(e, current_timestamp))
935    .collect::<Result<Vec<Enqueue>, std::io::Error>>()
936    .map_err(KvErrorKind::InvalidEnqueue)?;
937
938  let mut total_payload_size = 0usize;
939  let mut total_key_size = 0usize;
940
941  for key in checks
942    .iter()
943    .map(|c| &c.key)
944    .chain(mutations.iter().map(|m| &m.key))
945  {
946    if key.is_empty() {
947      return Err(KvErrorKind::EmptyKey.into_box());
948    }
949
950    total_payload_size += check_write_key_size(key, &config)?;
951  }
952
953  for (key, value) in mutations
954    .iter()
955    .flat_map(|m| m.kind.value().map(|x| (&m.key, x)))
956  {
957    let key_size = check_write_key_size(key, &config)?;
958    total_payload_size += check_value_size(value, &config)? + key_size;
959    total_key_size += key_size;
960  }
961
962  for enqueue in &enqueues {
963    total_payload_size +=
964      check_enqueue_payload_size(&enqueue.payload, &config)?;
965    if let Some(schedule) = enqueue.backoff_schedule.as_ref() {
966      total_payload_size += 4 * schedule.len();
967    }
968  }
969
970  if total_payload_size > config.max_total_mutation_size_bytes {
971    return Err(
972      KvErrorKind::TotalMutationTooLarge(config.max_total_mutation_size_bytes)
973        .into_box(),
974    );
975  }
976
977  if total_key_size > config.max_total_key_size_bytes {
978    return Err(
979      KvErrorKind::TotalKeyTooLarge(config.max_total_key_size_bytes).into_box(),
980    );
981  }
982
983  let atomic_write = AtomicWrite {
984    checks,
985    mutations,
986    enqueues,
987  };
988
989  let result = db
990    .atomic_write(atomic_write)
991    .await
992    .map_err(KvErrorKind::Kv)?;
993
994  Ok(result.map(|res| faster_hex::hex_string(&res.versionstamp)))
995}
996
997// (prefix, start, end)
998type EncodeCursorRangeSelector = (Option<KvKey>, Option<KvKey>, Option<KvKey>);
999
1000#[op2]
1001#[string]
1002fn op_kv_encode_cursor(
1003  #[serde] (prefix, start, end): EncodeCursorRangeSelector,
1004  #[serde] boundary_key: KvKey,
1005) -> Result<String, KvError> {
1006  let selector = RawSelector::from_tuple(prefix, start, end)?;
1007  let boundary_key = encode_v8_key(boundary_key)?;
1008  let cursor = encode_cursor(&selector, &boundary_key)?;
1009  Ok(cursor)
1010}
1011
1012fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), KvError> {
1013  if key.len() > config.max_read_key_size_bytes {
1014    Err(
1015      KvErrorKind::KeyTooLargeToRead(config.max_read_key_size_bytes).into_box(),
1016    )
1017  } else {
1018    Ok(())
1019  }
1020}
1021
1022fn check_write_key_size(
1023  key: &[u8],
1024  config: &KvConfig,
1025) -> Result<usize, KvError> {
1026  if key.len() > config.max_write_key_size_bytes {
1027    Err(
1028      KvErrorKind::KeyTooLargeToWrite(config.max_write_key_size_bytes)
1029        .into_box(),
1030    )
1031  } else {
1032    Ok(key.len())
1033  }
1034}
1035
1036fn check_value_size(
1037  value: &KvValue,
1038  config: &KvConfig,
1039) -> Result<usize, KvError> {
1040  let payload = match value {
1041    KvValue::Bytes(x) => x,
1042    KvValue::V8(x) => x,
1043    KvValue::U64(_) => return Ok(8),
1044  };
1045
1046  if payload.len() > config.max_value_size_bytes {
1047    Err(KvErrorKind::ValueTooLarge(config.max_value_size_bytes).into_box())
1048  } else {
1049    Ok(payload.len())
1050  }
1051}
1052
1053fn check_enqueue_payload_size(
1054  payload: &[u8],
1055  config: &KvConfig,
1056) -> Result<usize, KvError> {
1057  if payload.len() > config.max_value_size_bytes {
1058    Err(
1059      KvErrorKind::EnqueuePayloadTooLarge(config.max_value_size_bytes)
1060        .into_box(),
1061    )
1062  } else {
1063    Ok(payload.len())
1064  }
1065}