1pub mod config;
4pub mod dynamic;
5mod interface;
6pub mod remote;
7pub mod sqlite;
8
9use std::borrow::Cow;
10use std::cell::RefCell;
11use std::num::NonZeroU32;
12use std::rc::Rc;
13use std::sync::Arc;
14use std::time::Duration;
15
16use base64::prelude::BASE64_URL_SAFE;
17use base64::Engine;
18use boxed_error::Boxed;
19use chrono::DateTime;
20use chrono::Utc;
21use deno_core::futures::StreamExt;
22use deno_core::op2;
23use deno_core::serde_v8::AnyValue;
24use deno_core::serde_v8::BigInt;
25use deno_core::AsyncRefCell;
26use deno_core::ByteString;
27use deno_core::CancelFuture;
28use deno_core::CancelHandle;
29use deno_core::JsBuffer;
30use deno_core::OpState;
31use deno_core::RcRef;
32use deno_core::Resource;
33use deno_core::ResourceId;
34use deno_core::ToJsBuffer;
35use deno_error::JsErrorBox;
36use deno_error::JsErrorClass;
37use deno_features::FeatureChecker;
38use denokv_proto::decode_key;
39use denokv_proto::encode_key;
40use denokv_proto::AtomicWrite;
41use denokv_proto::Check;
42use denokv_proto::Consistency;
43use denokv_proto::Database;
44use denokv_proto::Enqueue;
45use denokv_proto::Key;
46use denokv_proto::KeyPart;
47use denokv_proto::KvEntry;
48use denokv_proto::KvValue;
49use denokv_proto::Mutation;
50use denokv_proto::MutationKind;
51use denokv_proto::QueueMessageHandle;
52use denokv_proto::ReadRange;
53use denokv_proto::SnapshotReadOptions;
54use denokv_proto::WatchKeyOutput;
55use denokv_proto::WatchStream;
56use log::debug;
57use serde::Deserialize;
58use serde::Serialize;
59
60pub use crate::config::*;
61pub use crate::interface::*;
62
63pub const UNSTABLE_FEATURE_NAME: &str = "kv";
64
65deno_core::extension!(deno_kv,
66 deps = [ deno_console, deno_web ],
67 parameters = [ DBH: DatabaseHandler ],
68 ops = [
69 op_kv_database_open<DBH>,
70 op_kv_snapshot_read<DBH>,
71 op_kv_atomic_write<DBH>,
72 op_kv_encode_cursor,
73 op_kv_dequeue_next_message<DBH>,
74 op_kv_finish_dequeued_message<DBH>,
75 op_kv_watch<DBH>,
76 op_kv_watch_next,
77 ],
78 esm = [ "01_db.ts" ],
79 options = {
80 handler: DBH,
81 config: KvConfig,
82 },
83 state = |state, options| {
84 state.put(Rc::new(options.config));
85 state.put(Rc::new(options.handler));
86 }
87);
88
89struct DatabaseResource<DB: Database + 'static> {
90 db: DB,
91 cancel_handle: Rc<CancelHandle>,
92}
93
94impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
95 fn name(&self) -> Cow<str> {
96 "database".into()
97 }
98
99 fn close(self: Rc<Self>) {
100 self.db.close();
101 self.cancel_handle.cancel();
102 }
103}
104
105struct DatabaseWatcherResource {
106 stream: AsyncRefCell<WatchStream>,
107 db_cancel_handle: Rc<CancelHandle>,
108 cancel_handle: Rc<CancelHandle>,
109}
110
111impl Resource for DatabaseWatcherResource {
112 fn name(&self) -> Cow<str> {
113 "databaseWatcher".into()
114 }
115
116 fn close(self: Rc<Self>) {
117 self.cancel_handle.cancel()
118 }
119}
120
121#[derive(Debug, Boxed, deno_error::JsError)]
122pub struct KvError(pub Box<KvErrorKind>);
123
124#[derive(Debug, thiserror::Error, deno_error::JsError)]
125pub enum KvErrorKind {
126 #[class(inherit)]
127 #[error(transparent)]
128 DatabaseHandler(JsErrorBox),
129 #[class(inherit)]
130 #[error(transparent)]
131 Resource(#[from] deno_core::error::ResourceError),
132 #[class(type)]
133 #[error("Too many ranges (max {0})")]
134 TooManyRanges(usize),
135 #[class(type)]
136 #[error("Too many entries (max {0})")]
137 TooManyEntries(usize),
138 #[class(type)]
139 #[error("Too many checks (max {0})")]
140 TooManyChecks(usize),
141 #[class(type)]
142 #[error("Too many mutations (max {0})")]
143 TooManyMutations(usize),
144 #[class(type)]
145 #[error("Too many keys (max {0})")]
146 TooManyKeys(usize),
147 #[class(type)]
148 #[error("limit must be greater than 0")]
149 InvalidLimit,
150 #[class(type)]
151 #[error("Invalid boundary key")]
152 InvalidBoundaryKey,
153 #[class(type)]
154 #[error("Key too large for read (max {0} bytes)")]
155 KeyTooLargeToRead(usize),
156 #[class(type)]
157 #[error("Key too large for write (max {0} bytes)")]
158 KeyTooLargeToWrite(usize),
159 #[class(type)]
160 #[error("Total mutation size too large (max {0} bytes)")]
161 TotalMutationTooLarge(usize),
162 #[class(type)]
163 #[error("Total key size too large (max {0} bytes)")]
164 TotalKeyTooLarge(usize),
165 #[class(inherit)]
166 #[error(transparent)]
167 Kv(JsErrorBox),
168 #[class(inherit)]
169 #[error(transparent)]
170 Io(#[from] std::io::Error),
171 #[class(type)]
172 #[error("Queue message not found")]
173 QueueMessageNotFound,
174 #[class(type)]
175 #[error("Start key is not in the keyspace defined by prefix")]
176 StartKeyNotInKeyspace,
177 #[class(type)]
178 #[error("End key is not in the keyspace defined by prefix")]
179 EndKeyNotInKeyspace,
180 #[class(type)]
181 #[error("Start key is greater than end key")]
182 StartKeyGreaterThanEndKey,
183 #[class(inherit)]
184 #[error("Invalid check")]
185 InvalidCheck(#[source] KvCheckError),
186 #[class(inherit)]
187 #[error("Invalid mutation")]
188 InvalidMutation(#[source] KvMutationError),
189 #[class(inherit)]
190 #[error("Invalid enqueue")]
191 InvalidEnqueue(#[source] std::io::Error),
192 #[class(type)]
193 #[error("key cannot be empty")]
194 EmptyKey,
195 #[class(type)]
196 #[error("Value too large (max {0} bytes)")]
197 ValueTooLarge(usize),
198 #[class(type)]
199 #[error("enqueue payload too large (max {0} bytes)")]
200 EnqueuePayloadTooLarge(usize),
201 #[class(type)]
202 #[error("invalid cursor")]
203 InvalidCursor,
204 #[class(type)]
205 #[error("cursor out of bounds")]
206 CursorOutOfBounds,
207 #[class(type)]
208 #[error("Invalid range")]
209 InvalidRange,
210}
211
212#[op2(async, stack_trace)]
213#[smi]
214async fn op_kv_database_open<DBH>(
215 state: Rc<RefCell<OpState>>,
216 #[string] path: Option<String>,
217) -> Result<ResourceId, KvError>
218where
219 DBH: DatabaseHandler + 'static,
220{
221 let handler = {
222 let state = state.borrow();
223 state
224 .borrow::<Arc<FeatureChecker>>()
225 .check_or_exit(UNSTABLE_FEATURE_NAME, "Deno.openKv");
226 state.borrow::<Rc<DBH>>().clone()
227 };
228 let db = handler
229 .open(state.clone(), path)
230 .await
231 .map_err(KvErrorKind::DatabaseHandler)?;
232 let rid = state.borrow_mut().resource_table.add(DatabaseResource {
233 db,
234 cancel_handle: CancelHandle::new_rc(),
235 });
236 Ok(rid)
237}
238
239type KvKey = Vec<AnyValue>;
240
241fn key_part_from_v8(value: AnyValue) -> KeyPart {
242 match value {
243 AnyValue::Bool(false) => KeyPart::False,
244 AnyValue::Bool(true) => KeyPart::True,
245 AnyValue::Number(n) => KeyPart::Float(n),
246 AnyValue::BigInt(n) => KeyPart::Int(n),
247 AnyValue::String(s) => KeyPart::String(s),
248 AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
249 AnyValue::RustBuffer(_) => unreachable!(),
250 }
251}
252
253fn key_part_to_v8(value: KeyPart) -> AnyValue {
254 match value {
255 KeyPart::False => AnyValue::Bool(false),
256 KeyPart::True => AnyValue::Bool(true),
257 KeyPart::Float(n) => AnyValue::Number(n),
258 KeyPart::Int(n) => AnyValue::BigInt(n),
259 KeyPart::String(s) => AnyValue::String(s),
260 KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
261 }
262}
263
264#[derive(Debug, Deserialize)]
265#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
266enum FromV8Value {
267 V8(JsBuffer),
268 Bytes(JsBuffer),
269 U64(BigInt),
270}
271
272#[derive(Debug, Serialize)]
273#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
274enum ToV8Value {
275 V8(ToJsBuffer),
276 Bytes(ToJsBuffer),
277 U64(BigInt),
278}
279
280impl TryFrom<FromV8Value> for KvValue {
281 type Error = num_bigint::TryFromBigIntError<num_bigint::BigInt>;
282 fn try_from(value: FromV8Value) -> Result<Self, Self::Error> {
283 Ok(match value {
284 FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()),
285 FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()),
286 FromV8Value::U64(n) => {
287 KvValue::U64(num_bigint::BigInt::from(n).try_into()?)
288 }
289 })
290 }
291}
292
293impl From<KvValue> for ToV8Value {
294 fn from(value: KvValue) -> Self {
295 match value {
296 KvValue::V8(buf) => ToV8Value::V8(buf.into()),
297 KvValue::Bytes(buf) => ToV8Value::Bytes(buf.into()),
298 KvValue::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
299 }
300 }
301}
302
303#[derive(Serialize)]
304struct ToV8KvEntry {
305 key: KvKey,
306 value: ToV8Value,
307 versionstamp: ByteString,
308}
309
310impl TryFrom<KvEntry> for ToV8KvEntry {
311 type Error = std::io::Error;
312 fn try_from(entry: KvEntry) -> Result<Self, Self::Error> {
313 Ok(ToV8KvEntry {
314 key: decode_key(&entry.key)?
315 .0
316 .into_iter()
317 .map(key_part_to_v8)
318 .collect(),
319 value: entry.value.into(),
320 versionstamp: faster_hex::hex_string(&entry.versionstamp).into(),
321 })
322 }
323}
324
325#[derive(Deserialize, Serialize)]
326#[serde(rename_all = "camelCase")]
327enum V8Consistency {
328 Strong,
329 Eventual,
330}
331
332impl From<V8Consistency> for Consistency {
333 fn from(value: V8Consistency) -> Self {
334 match value {
335 V8Consistency::Strong => Consistency::Strong,
336 V8Consistency::Eventual => Consistency::Eventual,
337 }
338 }
339}
340
341type SnapshotReadRange = (
343 Option<KvKey>,
344 Option<KvKey>,
345 Option<KvKey>,
346 u32,
347 bool,
348 Option<ByteString>,
349);
350
351#[op2(async)]
352#[serde]
353async fn op_kv_snapshot_read<DBH>(
354 state: Rc<RefCell<OpState>>,
355 #[smi] rid: ResourceId,
356 #[serde] ranges: Vec<SnapshotReadRange>,
357 #[serde] consistency: V8Consistency,
358) -> Result<Vec<Vec<ToV8KvEntry>>, KvError>
359where
360 DBH: DatabaseHandler + 'static,
361{
362 let db = {
363 let state = state.borrow();
364 let resource = state
365 .resource_table
366 .get::<DatabaseResource<DBH::DB>>(rid)
367 .map_err(KvErrorKind::Resource)?;
368 resource.db.clone()
369 };
370
371 let config = {
372 let state = state.borrow();
373 state.borrow::<Rc<KvConfig>>().clone()
374 };
375
376 if ranges.len() > config.max_read_ranges {
377 return Err(KvErrorKind::TooManyRanges(config.max_read_ranges).into_box());
378 }
379
380 let mut total_entries = 0usize;
381
382 let read_ranges = ranges
383 .into_iter()
384 .map(|(prefix, start, end, limit, reverse, cursor)| {
385 let selector = RawSelector::from_tuple(prefix, start, end)?;
386
387 let (start, end) =
388 decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?;
389 check_read_key_size(&start, &config)?;
390 check_read_key_size(&end, &config)?;
391
392 total_entries += limit as usize;
393 Ok(ReadRange {
394 start,
395 end,
396 limit: NonZeroU32::new(limit).ok_or(KvErrorKind::InvalidLimit)?,
397 reverse,
398 })
399 })
400 .collect::<Result<Vec<_>, KvError>>()?;
401
402 if total_entries > config.max_read_entries {
403 return Err(
404 KvErrorKind::TooManyEntries(config.max_read_entries).into_box(),
405 );
406 }
407
408 let opts = SnapshotReadOptions {
409 consistency: consistency.into(),
410 };
411 let output_ranges = db
412 .snapshot_read(read_ranges, opts)
413 .await
414 .map_err(KvErrorKind::Kv)?;
415 let output_ranges = output_ranges
416 .into_iter()
417 .map(|x| {
418 x.entries
419 .into_iter()
420 .map(TryInto::try_into)
421 .collect::<Result<Vec<_>, std::io::Error>>()
422 })
423 .collect::<Result<Vec<_>, std::io::Error>>()?;
424 Ok(output_ranges)
425}
426
427struct QueueMessageResource<QPH: QueueMessageHandle + 'static> {
428 handle: QPH,
429}
430
431impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
432 fn name(&self) -> Cow<str> {
433 "queueMessage".into()
434 }
435}
436
437#[op2(async)]
438#[serde]
439async fn op_kv_dequeue_next_message<DBH>(
440 state: Rc<RefCell<OpState>>,
441 #[smi] rid: ResourceId,
442) -> Result<Option<(ToJsBuffer, ResourceId)>, KvError>
443where
444 DBH: DatabaseHandler + 'static,
445{
446 let db = {
447 let state = state.borrow();
448 let resource =
449 match state.resource_table.get::<DatabaseResource<DBH::DB>>(rid) {
450 Ok(resource) => resource,
451 Err(err) => {
452 if err.get_class() == "BadResource" {
453 return Ok(None);
454 } else {
455 return Err(KvErrorKind::Resource(err).into_box());
456 }
457 }
458 };
459 resource.db.clone()
460 };
461
462 let Some(mut handle) =
463 db.dequeue_next_message().await.map_err(KvErrorKind::Kv)?
464 else {
465 return Ok(None);
466 };
467 let payload = handle.take_payload().await.map_err(KvErrorKind::Kv)?.into();
468 let handle_rid = {
469 let mut state = state.borrow_mut();
470 state.resource_table.add(QueueMessageResource { handle })
471 };
472 Ok(Some((payload, handle_rid)))
473}
474
475#[op2]
476#[smi]
477fn op_kv_watch<DBH>(
478 state: &mut OpState,
479 #[smi] rid: ResourceId,
480 #[serde] keys: Vec<KvKey>,
481) -> Result<ResourceId, KvError>
482where
483 DBH: DatabaseHandler + 'static,
484{
485 let resource = state
486 .resource_table
487 .get::<DatabaseResource<DBH::DB>>(rid)
488 .map_err(KvErrorKind::Resource)?;
489 let config = state.borrow::<Rc<KvConfig>>().clone();
490
491 if keys.len() > config.max_watched_keys {
492 return Err(KvErrorKind::TooManyKeys(config.max_watched_keys).into_box());
493 }
494
495 let keys: Vec<Vec<u8>> = keys
496 .into_iter()
497 .map(encode_v8_key)
498 .collect::<std::io::Result<_>>()?;
499
500 for k in &keys {
501 check_read_key_size(k, &config)?;
502 }
503
504 let stream = resource.db.watch(keys);
505
506 let rid = state.resource_table.add(DatabaseWatcherResource {
507 stream: AsyncRefCell::new(stream),
508 db_cancel_handle: resource.cancel_handle.clone(),
509 cancel_handle: CancelHandle::new_rc(),
510 });
511
512 Ok(rid)
513}
514
515#[derive(Serialize)]
516#[serde(rename_all = "camelCase", untagged)]
517enum WatchEntry {
518 Changed(Option<ToV8KvEntry>),
519 Unchanged,
520}
521
522#[op2(async)]
523#[serde]
524async fn op_kv_watch_next(
525 state: Rc<RefCell<OpState>>,
526 #[smi] rid: ResourceId,
527) -> Result<Option<Vec<WatchEntry>>, KvError> {
528 let resource = {
529 let state = state.borrow();
530 let resource = state
531 .resource_table
532 .get::<DatabaseWatcherResource>(rid)
533 .map_err(KvErrorKind::Resource)?;
534 resource.clone()
535 };
536
537 let db_cancel_handle = resource.db_cancel_handle.clone();
538 let cancel_handle = resource.cancel_handle.clone();
539 let stream = RcRef::map(resource, |r| &r.stream)
540 .borrow_mut()
541 .or_cancel(db_cancel_handle.clone())
542 .or_cancel(cancel_handle.clone())
543 .await;
544 let Ok(Ok(mut stream)) = stream else {
545 return Ok(None);
546 };
547
548 let Ok(Ok(Some(res))) = stream
551 .next()
552 .or_cancel(db_cancel_handle)
553 .or_cancel(cancel_handle)
554 .await
555 else {
556 return Ok(None);
557 };
558
559 let entries = res.map_err(KvErrorKind::Kv)?;
560 let entries = entries
561 .into_iter()
562 .map(|entry| {
563 Ok(match entry {
564 WatchKeyOutput::Changed { entry } => {
565 WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?)
566 }
567 WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
568 })
569 })
570 .collect::<Result<_, KvError>>()?;
571
572 Ok(Some(entries))
573}
574
575#[op2(async)]
576async fn op_kv_finish_dequeued_message<DBH>(
577 state: Rc<RefCell<OpState>>,
578 #[smi] handle_rid: ResourceId,
579 success: bool,
580) -> Result<(), KvError>
581where
582 DBH: DatabaseHandler + 'static,
583{
584 let handle = {
585 let mut state = state.borrow_mut();
586 let handle = state
587 .resource_table
588 .take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid)
589 .map_err(|_| KvErrorKind::QueueMessageNotFound)?;
590 Rc::try_unwrap(handle)
591 .map_err(|_| KvErrorKind::QueueMessageNotFound)?
592 .handle
593 };
594 if let Err(err) = handle.finish(success).await {
597 debug!("Failed to finish dequeued message: {}", err);
598 };
599 Ok(())
600}
601
602#[derive(Debug, thiserror::Error, deno_error::JsError)]
603pub enum KvCheckError {
604 #[class(type)]
605 #[error("invalid versionstamp")]
606 InvalidVersionstamp,
607 #[class(inherit)]
608 #[error(transparent)]
609 Io(std::io::Error),
610}
611
612type V8KvCheck = (KvKey, Option<ByteString>);
613
614fn check_from_v8(value: V8KvCheck) -> Result<Check, KvCheckError> {
615 let versionstamp = match value.1 {
616 Some(data) => {
617 let mut out = [0u8; 10];
618 if data.len() != out.len() * 2 {
619 return Err(KvCheckError::InvalidVersionstamp);
620 }
621 faster_hex::hex_decode(&data, &mut out)
622 .map_err(|_| KvCheckError::InvalidVersionstamp)?;
623 Some(out)
624 }
625 None => None,
626 };
627 Ok(Check {
628 key: encode_v8_key(value.0).map_err(KvCheckError::Io)?,
629 versionstamp,
630 })
631}
632
633#[derive(Debug, thiserror::Error, deno_error::JsError)]
634pub enum KvMutationError {
635 #[class(generic)]
636 #[error(transparent)]
637 BigInt(#[from] num_bigint::TryFromBigIntError<num_bigint::BigInt>),
638 #[class(inherit)]
639 #[error(transparent)]
640 Io(
641 #[from]
642 #[inherit]
643 std::io::Error,
644 ),
645 #[class(type)]
646 #[error("Invalid mutation '{0}' with value")]
647 InvalidMutationWithValue(String),
648 #[class(type)]
649 #[error("Invalid mutation '{0}' without value")]
650 InvalidMutationWithoutValue(String),
651}
652
653type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
654
655fn mutation_from_v8(
656 (value, current_timstamp): (V8KvMutation, DateTime<Utc>),
657) -> Result<Mutation, KvMutationError> {
658 let key = encode_v8_key(value.0)?;
659 let kind = match (value.1.as_str(), value.2) {
660 ("set", Some(value)) => MutationKind::Set(value.try_into()?),
661 ("delete", None) => MutationKind::Delete,
662 ("sum", Some(value)) => MutationKind::Sum {
663 value: value.try_into()?,
664 min_v8: vec![],
665 max_v8: vec![],
666 clamp: false,
667 },
668 ("min", Some(value)) => MutationKind::Min(value.try_into()?),
669 ("max", Some(value)) => MutationKind::Max(value.try_into()?),
670 ("setSuffixVersionstampedKey", Some(value)) => {
671 MutationKind::SetSuffixVersionstampedKey(value.try_into()?)
672 }
673 (op, Some(_)) => {
674 return Err(KvMutationError::InvalidMutationWithValue(op.to_string()))
675 }
676 (op, None) => {
677 return Err(KvMutationError::InvalidMutationWithoutValue(op.to_string()))
678 }
679 };
680 Ok(Mutation {
681 key,
682 kind,
683 expire_at: value
684 .3
685 .map(|expire_in| current_timstamp + Duration::from_millis(expire_in)),
686 })
687}
688
689type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);
690
691fn enqueue_from_v8(
692 value: V8Enqueue,
693 current_timestamp: DateTime<Utc>,
694) -> Result<Enqueue, std::io::Error> {
695 Ok(Enqueue {
696 payload: value.0.to_vec(),
697 deadline: current_timestamp
698 + chrono::Duration::milliseconds(value.1 as i64),
699 keys_if_undelivered: value
700 .2
701 .into_iter()
702 .map(encode_v8_key)
703 .collect::<std::io::Result<_>>()?,
704 backoff_schedule: value.3,
705 })
706}
707
708fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
709 encode_key(&Key(key.into_iter().map(key_part_from_v8).collect()))
710}
711
712enum RawSelector {
713 Prefixed {
714 prefix: Vec<u8>,
715 start: Option<Vec<u8>>,
716 end: Option<Vec<u8>>,
717 },
718 Range {
719 start: Vec<u8>,
720 end: Vec<u8>,
721 },
722}
723
724impl RawSelector {
725 fn from_tuple(
726 prefix: Option<KvKey>,
727 start: Option<KvKey>,
728 end: Option<KvKey>,
729 ) -> Result<Self, KvError> {
730 let prefix = prefix.map(encode_v8_key).transpose()?;
731 let start = start.map(encode_v8_key).transpose()?;
732 let end = end.map(encode_v8_key).transpose()?;
733
734 match (prefix, start, end) {
735 (Some(prefix), None, None) => Ok(Self::Prefixed {
736 prefix,
737 start: None,
738 end: None,
739 }),
740 (Some(prefix), Some(start), None) => {
741 if !start.starts_with(&prefix) || start.len() == prefix.len() {
742 return Err(KvErrorKind::StartKeyNotInKeyspace.into_box());
743 }
744 Ok(Self::Prefixed {
745 prefix,
746 start: Some(start),
747 end: None,
748 })
749 }
750 (Some(prefix), None, Some(end)) => {
751 if !end.starts_with(&prefix) || end.len() == prefix.len() {
752 return Err(KvErrorKind::EndKeyNotInKeyspace.into_box());
753 }
754 Ok(Self::Prefixed {
755 prefix,
756 start: None,
757 end: Some(end),
758 })
759 }
760 (None, Some(start), Some(end)) => {
761 if start > end {
762 return Err(KvErrorKind::StartKeyGreaterThanEndKey.into_box());
763 }
764 Ok(Self::Range { start, end })
765 }
766 (None, Some(start), None) => {
767 let end = start.iter().copied().chain(Some(0)).collect();
768 Ok(Self::Range { start, end })
769 }
770 _ => Err(KvErrorKind::InvalidRange.into_box()),
771 }
772 }
773
774 fn start(&self) -> Option<&[u8]> {
775 match self {
776 Self::Prefixed { start, .. } => start.as_deref(),
777 Self::Range { start, .. } => Some(start),
778 }
779 }
780
781 fn end(&self) -> Option<&[u8]> {
782 match self {
783 Self::Prefixed { end, .. } => end.as_deref(),
784 Self::Range { end, .. } => Some(end),
785 }
786 }
787
788 fn common_prefix(&self) -> &[u8] {
789 match self {
790 Self::Prefixed { prefix, .. } => prefix,
791 Self::Range { start, end } => common_prefix_for_bytes(start, end),
792 }
793 }
794
795 fn range_start_key(&self) -> Vec<u8> {
796 match self {
797 Self::Prefixed {
798 start: Some(start), ..
799 } => start.clone(),
800 Self::Range { start, .. } => start.clone(),
801 Self::Prefixed { prefix, .. } => {
802 prefix.iter().copied().chain(Some(0)).collect()
803 }
804 }
805 }
806
807 fn range_end_key(&self) -> Vec<u8> {
808 match self {
809 Self::Prefixed { end: Some(end), .. } => end.clone(),
810 Self::Range { end, .. } => end.clone(),
811 Self::Prefixed { prefix, .. } => {
812 prefix.iter().copied().chain(Some(0xff)).collect()
813 }
814 }
815 }
816}
817
818fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] {
819 let mut i = 0;
820 while i < a.len() && i < b.len() && a[i] == b[i] {
821 i += 1;
822 }
823 &a[..i]
824}
825
826fn encode_cursor(
827 selector: &RawSelector,
828 boundary_key: &[u8],
829) -> Result<String, KvError> {
830 let common_prefix = selector.common_prefix();
831 if !boundary_key.starts_with(common_prefix) {
832 return Err(KvErrorKind::InvalidBoundaryKey.into_box());
833 }
834 Ok(BASE64_URL_SAFE.encode(&boundary_key[common_prefix.len()..]))
835}
836
837fn decode_selector_and_cursor(
838 selector: &RawSelector,
839 reverse: bool,
840 cursor: Option<&ByteString>,
841) -> Result<(Vec<u8>, Vec<u8>), KvError> {
842 let Some(cursor) = cursor else {
843 return Ok((selector.range_start_key(), selector.range_end_key()));
844 };
845
846 let common_prefix = selector.common_prefix();
847 let cursor = BASE64_URL_SAFE
848 .decode(cursor)
849 .map_err(|_| KvErrorKind::InvalidCursor)?;
850
851 let first_key: Vec<u8>;
852 let last_key: Vec<u8>;
853
854 if reverse {
855 first_key = selector.range_start_key();
856 last_key = common_prefix
857 .iter()
858 .copied()
859 .chain(cursor.iter().copied())
860 .collect();
861 } else {
862 first_key = common_prefix
863 .iter()
864 .copied()
865 .chain(cursor.iter().copied())
866 .chain(Some(0))
867 .collect();
868 last_key = selector.range_end_key();
869 }
870
871 if let Some(start) = selector.start() {
873 if &first_key[..] < start {
874 return Err(KvErrorKind::CursorOutOfBounds.into_box());
875 }
876 }
877
878 if let Some(end) = selector.end() {
879 if &last_key[..] > end {
880 return Err(KvErrorKind::CursorOutOfBounds.into_box());
881 }
882 }
883
884 Ok((first_key, last_key))
885}
886
887#[op2(async)]
888#[string]
889async fn op_kv_atomic_write<DBH>(
890 state: Rc<RefCell<OpState>>,
891 #[smi] rid: ResourceId,
892 #[serde] checks: Vec<V8KvCheck>,
893 #[serde] mutations: Vec<V8KvMutation>,
894 #[serde] enqueues: Vec<V8Enqueue>,
895) -> Result<Option<String>, KvError>
896where
897 DBH: DatabaseHandler + 'static,
898{
899 let current_timestamp = chrono::Utc::now();
900 let db = {
901 let state = state.borrow();
902 let resource = state
903 .resource_table
904 .get::<DatabaseResource<DBH::DB>>(rid)
905 .map_err(KvErrorKind::Resource)?;
906 resource.db.clone()
907 };
908
909 let config = {
910 let state = state.borrow();
911 state.borrow::<Rc<KvConfig>>().clone()
912 };
913
914 if checks.len() > config.max_checks {
915 return Err(KvErrorKind::TooManyChecks(config.max_checks).into_box());
916 }
917
918 if mutations.len() + enqueues.len() > config.max_mutations {
919 return Err(KvErrorKind::TooManyMutations(config.max_mutations).into_box());
920 }
921
922 let checks = checks
923 .into_iter()
924 .map(check_from_v8)
925 .collect::<Result<Vec<Check>, KvCheckError>>()
926 .map_err(KvErrorKind::InvalidCheck)?;
927 let mutations = mutations
928 .into_iter()
929 .map(|mutation| mutation_from_v8((mutation, current_timestamp)))
930 .collect::<Result<Vec<Mutation>, KvMutationError>>()
931 .map_err(KvErrorKind::InvalidMutation)?;
932 let enqueues = enqueues
933 .into_iter()
934 .map(|e| enqueue_from_v8(e, current_timestamp))
935 .collect::<Result<Vec<Enqueue>, std::io::Error>>()
936 .map_err(KvErrorKind::InvalidEnqueue)?;
937
938 let mut total_payload_size = 0usize;
939 let mut total_key_size = 0usize;
940
941 for key in checks
942 .iter()
943 .map(|c| &c.key)
944 .chain(mutations.iter().map(|m| &m.key))
945 {
946 if key.is_empty() {
947 return Err(KvErrorKind::EmptyKey.into_box());
948 }
949
950 total_payload_size += check_write_key_size(key, &config)?;
951 }
952
953 for (key, value) in mutations
954 .iter()
955 .flat_map(|m| m.kind.value().map(|x| (&m.key, x)))
956 {
957 let key_size = check_write_key_size(key, &config)?;
958 total_payload_size += check_value_size(value, &config)? + key_size;
959 total_key_size += key_size;
960 }
961
962 for enqueue in &enqueues {
963 total_payload_size +=
964 check_enqueue_payload_size(&enqueue.payload, &config)?;
965 if let Some(schedule) = enqueue.backoff_schedule.as_ref() {
966 total_payload_size += 4 * schedule.len();
967 }
968 }
969
970 if total_payload_size > config.max_total_mutation_size_bytes {
971 return Err(
972 KvErrorKind::TotalMutationTooLarge(config.max_total_mutation_size_bytes)
973 .into_box(),
974 );
975 }
976
977 if total_key_size > config.max_total_key_size_bytes {
978 return Err(
979 KvErrorKind::TotalKeyTooLarge(config.max_total_key_size_bytes).into_box(),
980 );
981 }
982
983 let atomic_write = AtomicWrite {
984 checks,
985 mutations,
986 enqueues,
987 };
988
989 let result = db
990 .atomic_write(atomic_write)
991 .await
992 .map_err(KvErrorKind::Kv)?;
993
994 Ok(result.map(|res| faster_hex::hex_string(&res.versionstamp)))
995}
996
997type EncodeCursorRangeSelector = (Option<KvKey>, Option<KvKey>, Option<KvKey>);
999
1000#[op2]
1001#[string]
1002fn op_kv_encode_cursor(
1003 #[serde] (prefix, start, end): EncodeCursorRangeSelector,
1004 #[serde] boundary_key: KvKey,
1005) -> Result<String, KvError> {
1006 let selector = RawSelector::from_tuple(prefix, start, end)?;
1007 let boundary_key = encode_v8_key(boundary_key)?;
1008 let cursor = encode_cursor(&selector, &boundary_key)?;
1009 Ok(cursor)
1010}
1011
1012fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), KvError> {
1013 if key.len() > config.max_read_key_size_bytes {
1014 Err(
1015 KvErrorKind::KeyTooLargeToRead(config.max_read_key_size_bytes).into_box(),
1016 )
1017 } else {
1018 Ok(())
1019 }
1020}
1021
1022fn check_write_key_size(
1023 key: &[u8],
1024 config: &KvConfig,
1025) -> Result<usize, KvError> {
1026 if key.len() > config.max_write_key_size_bytes {
1027 Err(
1028 KvErrorKind::KeyTooLargeToWrite(config.max_write_key_size_bytes)
1029 .into_box(),
1030 )
1031 } else {
1032 Ok(key.len())
1033 }
1034}
1035
1036fn check_value_size(
1037 value: &KvValue,
1038 config: &KvConfig,
1039) -> Result<usize, KvError> {
1040 let payload = match value {
1041 KvValue::Bytes(x) => x,
1042 KvValue::V8(x) => x,
1043 KvValue::U64(_) => return Ok(8),
1044 };
1045
1046 if payload.len() > config.max_value_size_bytes {
1047 Err(KvErrorKind::ValueTooLarge(config.max_value_size_bytes).into_box())
1048 } else {
1049 Ok(payload.len())
1050 }
1051}
1052
1053fn check_enqueue_payload_size(
1054 payload: &[u8],
1055 config: &KvConfig,
1056) -> Result<usize, KvError> {
1057 if payload.len() > config.max_value_size_bytes {
1058 Err(
1059 KvErrorKind::EnqueuePayloadTooLarge(config.max_value_size_bytes)
1060 .into_box(),
1061 )
1062 } else {
1063 Ok(payload.len())
1064 }
1065}