1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::ops::{Deref, Range};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7
8use anyhow::Result;
9use futures::TryStreamExt;
10use futures::future::try_join_all;
11use futures::stream::Stream;
12use tokio::sync::{Mutex, MutexGuard, Notify};
13use uuid::Uuid;
14
15use super::batch::Batch;
16use super::{Key, Val, util};
17use crate::catalog::providers::{
18 ApiProvider, AuthorisationProvider, BucketProvider, CatalogProvider, DatabaseProvider,
19 NamespaceProvider, NodeProvider, RootProvider, TableProvider, UserProvider,
20};
21use crate::catalog::{
22 self, ApiDefinition, ConfigDefinition, DatabaseDefinition, DatabaseId, DefaultConfig, IndexId,
23 NamespaceDefinition, NamespaceId, Record, TableDefinition, TableId,
24};
25use crate::ctx::Context;
26use crate::dbs::node::Node;
27use crate::doc::CursorRecord;
28use crate::err::Error;
29use crate::idx::planner::ScanDirection;
30use crate::key::database::sq::Sq;
31use crate::kvs::cache::tx::TransactionCache;
32use crate::kvs::index::{BatchId, BatchIdsCleanQueue, SharedIndexKey};
33use crate::kvs::scanner::Direction;
34use crate::kvs::sequences::Sequences;
35use crate::kvs::{BoxTimeStamp, BoxTimeStampImpl, KVKey, KVValue, Transactor, cache};
36use crate::val::{RecordId, RecordIdKey, TableName};
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub(crate) enum CachePolicy {
46 ReadWrite,
49 ReadOnly,
52}
53
54pub struct Transaction {
55 local: bool,
57 tr: Transactor,
59 cache: TransactionCache,
61 sequences: Sequences,
63 cf: crate::cf::Writer,
65 async_event_trigger: Arc<Notify>,
67 trigger_async_event: AtomicBool,
69 pending_index_batches: Mutex<HashMap<SharedIndexKey, (BatchId, BatchIdsCleanQueue)>>,
72}
73
74impl Deref for Transaction {
75 type Target = Transactor;
76
77 fn deref(&self) -> &Self::Target {
78 &self.tr
79 }
80}
81
82impl Transaction {
83 pub fn new(
85 local: bool,
86 sequences: Sequences,
87 async_event_trigger: Arc<Notify>,
88 tr: Transactor,
89 ) -> Transaction {
90 Transaction {
91 local,
92 tr,
93 cache: TransactionCache::new(),
94 sequences,
95 cf: crate::cf::Writer::new(),
96 async_event_trigger,
97 trigger_async_event: AtomicBool::new(false),
98 pending_index_batches: Mutex::new(HashMap::new()),
99 }
100 }
101
102 pub(super) async fn lock_pending_index_batches<'a>(
103 &'a self,
104 ) -> MutexGuard<'a, HashMap<SharedIndexKey, (BatchId, BatchIdsCleanQueue)>> {
105 self.pending_index_batches.lock().await
106 }
107
108 pub fn is_local(&self) -> bool {
110 self.local
111 }
112
113 pub fn enclose(self) -> Arc<Transaction> {
115 Arc::new(self)
116 }
117
118 pub fn closed(&self) -> bool {
125 self.tr.closed()
126 }
127
128 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
132 pub async fn cancel(&self) -> Result<()> {
133 self.cf.clear();
135 self.cleanup_index_batches().await;
138 Ok(self.tr.cancel().await.map_err(Error::from)?)
140 }
141
142 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
146 pub async fn commit(&self) -> Result<()> {
147 if let Err(e) = self.store_changes().await {
149 let _ = self.cancel().await;
151 return Err(e);
153 }
154 if let Err(e) = self.tr.commit().await {
156 self.cleanup_index_batches().await;
158 anyhow::bail!(e);
159 }
160 if self.trigger_async_event.load(Ordering::Relaxed) {
161 self.async_event_trigger.notify_one();
163 }
164 Ok(())
165 }
166
167 async fn cleanup_index_batches(&self) {
169 let batches = {
170 let mut pending = self.lock_pending_index_batches().await;
171 std::mem::take(&mut *pending)
172 };
173 for (_, (batch_id, clean_queue)) in batches {
174 clean_queue.lock().await.push(batch_id);
176 }
177 }
178
179 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
181 pub async fn exists<K>(&self, key: &K, version: Option<u64>) -> Result<bool>
182 where
183 K: KVKey + Debug,
184 {
185 let key = key.encode_key()?;
186 Ok(self.tr.exists(key, version).await.map_err(Error::from)?)
187 }
188
189 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
191 pub async fn get<K>(&self, key: &K, version: Option<u64>) -> Result<Option<K::ValueType>>
192 where
193 K: KVKey + Debug,
194 {
195 let key = key.encode_key()?;
196 let val = self.tr.get(key, version).await.map_err(Error::from)?;
197 val.map(K::ValueType::kv_decode_value).transpose()
198 }
199
200 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
202 pub async fn getm<K>(
203 &self,
204 keys: Vec<K>,
205 version: Option<u64>,
206 ) -> Result<Vec<Option<K::ValueType>>>
207 where
208 K: KVKey + Debug,
209 {
210 let keys = keys.iter().map(|k| k.encode_key()).collect::<Result<Vec<_>>>()?;
211 self.tr
212 .getm(keys, version)
213 .await
214 .map_err(Error::from)?
215 .into_iter()
216 .map(|v| match v {
217 Some(v) => K::ValueType::kv_decode_value(v).map(Some),
218 None => Ok(None),
219 })
220 .collect()
221 }
222
223 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
228 pub async fn getp<K>(&self, key: &K) -> Result<Vec<(Key, K::ValueType)>>
229 where
230 K: KVKey + Debug,
231 {
232 let key = key.encode_key()?;
233 self.tr
234 .getp(key)
235 .await
236 .map_err(Error::from)?
237 .into_iter()
238 .map(|(k, v)| Ok((k, K::ValueType::kv_decode_value(v)?)))
239 .collect()
240 }
241
242 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
247 pub async fn getr<K>(
248 &self,
249 rng: Range<K>,
250 version: Option<u64>,
251 ) -> Result<Vec<(Key, K::ValueType)>>
252 where
253 K: KVKey + Debug,
254 {
255 let beg = rng.start.encode_key()?;
256 let end = rng.end.encode_key()?;
257 self.tr
258 .getr(beg..end, version)
259 .await
260 .map_err(Error::from)?
261 .into_iter()
262 .map(|(k, v)| Ok((k, K::ValueType::kv_decode_value(v)?)))
263 .collect()
264 }
265
266 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
277 pub(crate) async fn getm_records(
278 &self,
279 ns: NamespaceId,
280 db: DatabaseId,
281 rids: &[RecordId],
282 version: Option<u64>,
283 cache_policy: CachePolicy,
284 ) -> Result<Vec<Arc<Record>>> {
285 if rids.is_empty() {
286 return Ok(Vec::new());
287 }
288
289 if version.is_some() {
291 let keys: Vec<crate::key::record::RecordKey<'_>> = rids
292 .iter()
293 .map(|rid| crate::key::record::new(ns, db, &rid.table, &rid.key))
294 .collect();
295
296 let values = self.getm(keys, version).await?;
297
298 return values
299 .into_iter()
300 .zip(rids)
301 .map(|(opt_val, rid)| {
302 Ok(match opt_val {
303 Some(mut record) => {
304 record.data.def(rid.clone());
305 record.into_read_only()
306 }
307 None => Arc::new(Default::default()),
308 })
309 })
310 .collect::<Result<Vec<_>, _>>();
311 }
312
313 let mut out: Vec<Option<Arc<Record>>> = vec![None; rids.len()];
315 let mut uncached_rids: Vec<(usize, &RecordId)> = Vec::new();
316
317 for (i, rid) in rids.iter().enumerate() {
318 let qey = cache::tx::Lookup::Record(ns, db, rid.table.as_str(), &rid.key);
319 if let Some(entry) = self.cache.get(&qey) {
320 out[i] = Some(entry.try_into_record()?);
321 } else {
322 uncached_rids.push((i, rid));
323 }
324 }
325
326 if uncached_rids.is_empty() {
328 return out
329 .into_iter()
330 .map(|o| {
331 o.ok_or_else(|| Error::Internal("missing record in multi-get batch".into()))
332 })
333 .collect::<Result<Vec<_>, _>>()
334 .map_err(Into::into);
335 }
336
337 let keys: Vec<crate::key::record::RecordKey<'_>> = uncached_rids
338 .iter()
339 .map(|(_, rid)| crate::key::record::new(ns, db, &rid.table, &rid.key))
340 .collect();
341
342 let values = self.getm(keys, None).await?;
343
344 for (j, opt_val) in values.into_iter().enumerate() {
348 let (i, rid) = uncached_rids[j];
349 let record = match opt_val {
350 Some(mut record) => {
351 record.data.def(rid.clone());
352 let record = record.into_read_only();
353 if matches!(cache_policy, CachePolicy::ReadWrite) {
354 let qey = cache::tx::Lookup::Record(ns, db, rid.table.as_str(), &rid.key);
355 self.cache.insert(qey, cache::tx::Entry::Val(record.clone()));
356 }
357 record
358 }
359 None => Arc::new(Default::default()),
360 };
361 out[i] = Some(record);
362 }
363
364 out.into_iter()
365 .map(|o| o.ok_or_else(|| Error::Internal("missing record in multi-get batch".into())))
366 .collect::<Result<Vec<_>, _>>()
367 .map_err(Into::into)
368 }
369
370 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
372 pub async fn del<K>(&self, key: &K) -> Result<()>
373 where
374 K: KVKey + Debug,
375 {
376 let key = key.encode_key()?;
377 Ok(self.tr.del(key).await.map_err(Error::from)?)
378 }
379
380 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
383 pub async fn delc<K>(&self, key: &K, chk: Option<&K::ValueType>) -> Result<()>
384 where
385 K: KVKey + Debug,
386 {
387 let key = key.encode_key()?;
388 let chk = chk.map(|v| v.kv_encode_value()).transpose()?;
389 Ok(self.tr.delc(key, chk).await.map_err(Error::from)?)
390 }
391
392 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
397 pub async fn delr<K>(&self, rng: Range<K>) -> Result<()>
398 where
399 K: KVKey + Debug,
400 {
401 let beg = rng.start.encode_key()?;
402 let end = rng.end.encode_key()?;
403 Ok(self.tr.delr(beg..end).await.map_err(Error::from)?)
404 }
405
406 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
411 pub async fn delp<K>(&self, key: &K) -> Result<()>
412 where
413 K: KVKey + Debug,
414 {
415 let key = key.encode_key()?;
416 Ok(self.tr.delp(key).await.map_err(Error::from)?)
417 }
418
419 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
421 pub async fn clr<K>(&self, key: &K) -> Result<()>
422 where
423 K: KVKey + Debug,
424 {
425 let key = key.encode_key()?;
426 Ok(self.tr.clr(key).await.map_err(Error::from)?)
427 }
428
429 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
432 pub async fn clrc<K>(&self, key: &K, chk: Option<&K::ValueType>) -> Result<()>
433 where
434 K: KVKey + Debug,
435 {
436 let key = key.encode_key()?;
437 let chk = chk.map(|v| v.kv_encode_value()).transpose()?;
438 Ok(self.tr.clrc(key, chk).await.map_err(Error::from)?)
439 }
440
441 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
446 pub async fn clrr<K>(&self, rng: Range<K>) -> Result<()>
447 where
448 K: KVKey + Debug,
449 {
450 let beg = rng.start.encode_key()?;
451 let end = rng.end.encode_key()?;
452 Ok(self.tr.clrr(beg..end).await.map_err(Error::from)?)
453 }
454
455 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
460 pub async fn clrp<K>(&self, key: &K) -> Result<()>
461 where
462 K: KVKey + Debug,
463 {
464 let key = key.encode_key()?;
465 Ok(self.tr.clrp(key).await.map_err(Error::from)?)
466 }
467
468 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
470 pub async fn set<K>(&self, key: &K, val: &K::ValueType, version: Option<u64>) -> Result<()>
471 where
472 K: KVKey + Debug,
473 {
474 let key = key.encode_key()?;
475 let val = val.kv_encode_value()?;
476 Ok(self.tr.set(key, val, version).await.map_err(Error::from)?)
477 }
478
479 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
481 pub async fn put<K>(&self, key: &K, val: &K::ValueType, version: Option<u64>) -> Result<()>
482 where
483 K: KVKey + Debug,
484 {
485 let key = key.encode_key()?;
486 let val = val.kv_encode_value()?;
487 Ok(self.tr.put(key, val, version).await.map_err(Error::from)?)
488 }
489
490 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
492 pub async fn putc<K>(
493 &self,
494 key: &K,
495 val: &K::ValueType,
496 chk: Option<&K::ValueType>,
497 ) -> Result<()>
498 where
499 K: KVKey + Debug,
500 {
501 let key = key.encode_key()?;
502 let val = val.kv_encode_value()?;
503 let chk = chk.map(|v| v.kv_encode_value()).transpose()?;
504 Ok(self.tr.putc(key, val, chk).await.map_err(Error::from)?)
505 }
506
507 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
509 pub async fn replace<K>(&self, key: &K, val: &K::ValueType) -> Result<()>
510 where
511 K: KVKey + Debug,
512 {
513 let key = key.encode_key()?;
514 let val = val.kv_encode_value()?;
515 Ok(self.tr.replace(key, val).await.map_err(Error::from)?)
516 }
517
518 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
527 pub async fn keys<K>(
528 &self,
529 rng: Range<K>,
530 limit: u32,
531 skip: u32,
532 version: Option<u64>,
533 ) -> Result<Vec<Key>>
534 where
535 K: KVKey + Debug,
536 {
537 let beg = rng.start.encode_key()?;
538 let end = rng.end.encode_key()?;
539 let limit = limit.into();
540 Ok(self.tr.keys(beg..end, limit, skip, version).await.map_err(Error::from)?)
541 }
542
543 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
548 pub async fn keysr<K>(
549 &self,
550 rng: Range<K>,
551 limit: u32,
552 skip: u32,
553 version: Option<u64>,
554 ) -> Result<Vec<Key>>
555 where
556 K: KVKey + Debug,
557 {
558 let beg = rng.start.encode_key()?;
559 let end = rng.end.encode_key()?;
560 let limit = limit.into();
561 Ok(self.tr.keysr(beg..end, limit, skip, version).await.map_err(Error::from)?)
562 }
563
564 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
569 pub async fn scan<K>(
570 &self,
571 rng: Range<K>,
572 limit: u32,
573 skip: u32,
574 version: Option<u64>,
575 ) -> Result<Vec<(Key, Val)>>
576 where
577 K: KVKey + Debug,
578 {
579 let beg = rng.start.encode_key()?;
580 let end = rng.end.encode_key()?;
581 let limit = limit.into();
582 Ok(self.tr.scan(beg..end, limit, skip, version).await.map_err(Error::from)?)
583 }
584
585 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
586 pub async fn scanr<K>(
587 &self,
588 rng: Range<K>,
589 limit: u32,
590 skip: u32,
591 version: Option<u64>,
592 ) -> Result<Vec<(Key, Val)>>
593 where
594 K: KVKey + Debug,
595 {
596 let beg = rng.start.encode_key()?;
597 let end = rng.end.encode_key()?;
598 let limit = limit.into();
599 Ok(self.tr.scanr(beg..end, limit, skip, version).await.map_err(Error::from)?)
600 }
601
602 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
607 pub async fn count<K>(&self, rng: Range<K>, version: Option<u64>) -> Result<usize>
608 where
609 K: KVKey + Debug,
610 {
611 let beg = rng.start.encode_key()?;
612 let end = rng.end.encode_key()?;
613 Ok(self.tr.count(beg..end, version).await.map_err(Error::from)?)
614 }
615
616 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
625 pub async fn batch_keys<K>(
626 &self,
627 rng: Range<K>,
628 batch: u32,
629 version: Option<u64>,
630 ) -> Result<Batch<Key>>
631 where
632 K: KVKey + Debug,
633 {
634 let beg = rng.start.encode_key()?;
635 let end = rng.end.encode_key()?;
636 Ok(self.tr.batch_keys(beg..end, batch, version).await.map_err(Error::from)?)
637 }
638
639 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
644 pub async fn batch_keys_vals<K>(
645 &self,
646 rng: Range<K>,
647 batch: u32,
648 version: Option<u64>,
649 ) -> Result<Batch<(Key, Val)>>
650 where
651 K: KVKey + Debug,
652 {
653 let beg = rng.start.encode_key()?;
654 let end = rng.end.encode_key()?;
655 Ok(self.tr.batch_keys_vals(beg..end, batch, version).await.map_err(Error::from)?)
656 }
657
658 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
669 pub fn stream_keys(
670 &self,
671 rng: Range<Key>,
672 version: Option<u64>,
673 limit: Option<usize>,
674 skip: u32,
675 dir: ScanDirection,
676 ) -> impl Stream<Item = Result<Vec<Key>>> + '_ {
677 self.tr
678 .stream_keys(
679 rng,
680 version,
681 limit,
682 skip,
683 match dir {
684 ScanDirection::Forward => Direction::Forward,
685 ScanDirection::Backward => Direction::Backward,
686 },
687 )
688 .map_err(Error::from)
689 .map_err(Into::into)
690 }
691
692 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
700 pub fn stream_keys_vals(
701 &self,
702 rng: Range<Key>,
703 version: Option<u64>,
704 limit: Option<usize>,
705 skip: u32,
706 dir: ScanDirection,
707 prefetch: bool,
708 ) -> impl Stream<Item = Result<Vec<(Key, Val)>>> + '_ {
709 self.tr
710 .stream_keys_vals(
711 rng,
712 version,
713 limit,
714 skip,
715 match dir {
716 ScanDirection::Forward => Direction::Forward,
717 ScanDirection::Backward => Direction::Backward,
718 },
719 prefetch,
720 )
721 .map_err(Error::from)
722 .map_err(Into::into)
723 }
724
725 pub async fn new_save_point(&self) -> Result<()> {
731 Ok(self.inner.new_save_point().await.map_err(Error::from)?)
732 }
733
734 pub async fn release_last_save_point(&self) -> Result<()> {
736 Ok(self.inner.release_last_save_point().await.map_err(Error::from)?)
737 }
738
739 pub async fn rollback_to_save_point(&self) -> Result<()> {
741 Ok(self.inner.rollback_to_save_point().await.map_err(Error::from)?)
742 }
743
744 pub async fn timestamp(&self) -> Result<BoxTimeStamp> {
750 Ok(self.tr.timestamp().await.map_err(Error::from)?)
751 }
752
753 pub fn timestamp_impl(&self) -> BoxTimeStampImpl {
755 self.tr.timestamp_impl()
756 }
757
758 pub(crate) fn changefeed_buffer_table_change(
764 &self,
765 ns: NamespaceId,
766 db: DatabaseId,
767 tb: &TableName,
768 dt: &TableDefinition,
769 ) {
770 self.cf.changefeed_buffer_table_change(ns, db, tb, dt)
771 }
772
773 #[expect(clippy::too_many_arguments)]
778 pub(crate) fn changefeed_buffer_record_change(
779 &self,
780 ns: NamespaceId,
781 db: DatabaseId,
782 tb: &TableName,
783 id: &RecordId,
784 previous: CursorRecord,
785 current: CursorRecord,
786 store_difference: bool,
787 ) {
788 self.cf.changefeed_buffer_record_change(
789 ns,
790 db,
791 tb,
792 id.clone(),
793 previous,
794 current,
795 store_difference,
796 )
797 }
798
799 pub(crate) async fn store_changes(&self) -> Result<()> {
814 let changes = self.cf.changes()?;
816 if changes.is_empty() {
818 return Ok(());
819 }
820 let buf = &mut [0u8; _];
822 let ts = self.timestamp().await?.encode(buf);
823 let futures = changes.into_iter().map(|(ns, db, tb, value)| async move {
825 let key = crate::key::change::new(ns, db, ts, &tb).encode_key()?;
827 self.tr.set(key, value, None).await.map_err(Error::from)?;
829 Ok::<(), anyhow::Error>(())
831 });
832 try_join_all(futures).await?;
834 Ok(())
836 }
837
838 #[inline]
843 fn set_record_cache(
844 &self,
845 ns: NamespaceId,
846 db: DatabaseId,
847 tb: &TableName,
848 id: &RecordIdKey,
849 record: Arc<Record>,
850 ) {
851 let qey = cache::tx::Lookup::Record(ns, db, tb, id);
853 self.cache.insert(qey, cache::tx::Entry::Val(record));
854 }
855
856 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
858 pub fn clear_cache(&self) {
859 self.cache.clear()
860 }
861
862 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
863 pub async fn compact<K>(&self, prefix_key: Option<K>) -> Result<()>
864 where
865 K: KVKey + Debug,
866 {
867 let rng = match prefix_key {
868 Some(prefix_key) => Some(util::to_prefix_range(prefix_key)?),
869 None => None,
870 };
871 self.tr.inner.compact(rng).await
872 }
873
874 pub(crate) fn trigger_async_event(&self) {
876 self.trigger_async_event.store(true, Ordering::Relaxed);
877 }
878}
879
880#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
881#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
882impl NodeProvider for Transaction {
883 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
885 async fn all_nodes(&self) -> Result<Arc<[Node]>> {
886 let qey = cache::tx::Lookup::Nds;
887 match self.cache.get(&qey) {
888 Some(val) => val.try_into_nds(),
889 None => {
890 let beg = crate::key::root::nd::prefix();
891 let end = crate::key::root::nd::suffix();
892 let val = self.getr(beg..end, None).await?;
893 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
894 let entry = cache::tx::Entry::Nds(val.clone());
895 self.cache.insert(qey, entry);
896 Ok(val)
897 }
898 }
899 }
900
901 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
903 async fn get_node(&self, id: Uuid) -> Result<Arc<Node>> {
904 let qey = cache::tx::Lookup::Nd(id);
905 match self.cache.get(&qey) {
906 Some(val) => val,
907 None => {
908 let key = crate::key::root::nd::new(id);
909 let val = self.get(&key, None).await?.ok_or_else(|| Error::NdNotFound {
910 uuid: id.to_string(),
911 })?;
912 let val = cache::tx::Entry::Any(Arc::new(val));
913 self.cache.insert(qey, val.clone());
914 val
915 }
916 }
917 .try_into_type()
918 }
919}
920
921#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
922#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
923impl RootProvider for Transaction {
924 async fn get_default_config(&self) -> Result<Option<Arc<DefaultConfig>>> {
925 let qey = cache::tx::Lookup::Rcg("default");
926 match self.cache.get(&qey) {
927 Some(val) => val,
928 None => {
929 let key = crate::key::root::root_config::new("default");
930 let Some(val) = self.get(&key, None).await? else {
931 return Ok(None);
932 };
933 let ConfigDefinition::Default(val) = val else {
934 fail!("Expected a default config but found {val:?} instead");
935 };
936 let val = cache::tx::Entry::Any(Arc::new(val));
937 self.cache.insert(qey, val.clone());
938 val
939 }
940 }
941 .try_into_type()
942 .map(Option::Some)
943 }
944
945 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
947 async fn get_root_config(&self, cg: &str) -> Result<Option<Arc<ConfigDefinition>>> {
948 let qey = cache::tx::Lookup::Rcg(cg);
949 match self.cache.get(&qey) {
950 Some(val) => val.try_into_type().map(Option::Some),
951 None => {
952 let key = crate::key::root::root_config::new(cg);
953 if let Some(val) = self.get(&key, None).await? {
954 let val = Arc::new(val);
955 let entr = cache::tx::Entry::Any(val.clone());
956 self.cache.insert(qey, entr);
957 Ok(Some(val))
958 } else {
959 Ok(None)
960 }
961 }
962 }
963 }
964}
965
966#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
967#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
968impl NamespaceProvider for Transaction {
969 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
971 async fn all_ns(&self) -> Result<Arc<[NamespaceDefinition]>> {
972 let qey = cache::tx::Lookup::Nss;
973 match self.cache.get(&qey) {
974 Some(val) => val.try_into_nss(),
975 None => {
976 let beg = crate::key::root::ns::prefix();
977 let end = crate::key::root::ns::suffix();
978 let val = self.getr(beg..end, None).await?;
979 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
980 let entry = cache::tx::Entry::Nss(val.clone());
981 self.cache.insert(qey, entry);
982 Ok(val)
983 }
984 }
985 }
986
987 async fn get_ns_by_name(&self, ns: &str) -> Result<Option<Arc<NamespaceDefinition>>> {
988 let qey = cache::tx::Lookup::NsByName(ns);
989 match self.cache.get(&qey) {
990 Some(val) => val.try_into_type().map(Some),
991 None => {
992 let key = crate::key::root::ns::new(ns);
993 let Some(ns) = self.get(&key, None).await? else {
994 return Ok(None);
995 };
996
997 let ns = Arc::new(ns);
998 let entr = cache::tx::Entry::Any(ns.clone());
999 self.cache.insert(qey, entr);
1000 Ok(Some(ns))
1001 }
1002 }
1003 }
1004
1005 async fn expect_ns_by_name(&self, ns: &str) -> Result<Arc<NamespaceDefinition>> {
1006 match self.get_ns_by_name(ns).await? {
1007 Some(val) => Ok(val),
1008 None => anyhow::bail!(Error::NsNotFound {
1009 name: ns.to_owned(),
1010 }),
1011 }
1012 }
1013
1014 async fn put_ns(&self, ns: NamespaceDefinition) -> Result<Arc<NamespaceDefinition>> {
1015 let key = crate::key::root::ns::new(&ns.name);
1016 self.set(&key, &ns, None).await?;
1017
1018 let list_key = cache::tx::Lookup::Nss;
1020 self.cache.remove(list_key);
1021
1022 let cached_ns = Arc::new(ns.clone());
1024
1025 let entry = cache::tx::Entry::Any(Arc::clone(&cached_ns) as Arc<dyn Any + Send + Sync>);
1026 let qey = cache::tx::Lookup::NsByName(&ns.name);
1027 self.cache.insert(qey, entry);
1028
1029 Ok(cached_ns)
1030 }
1031
1032 async fn get_next_ns_id(&self, ctx: Option<&Context>) -> Result<NamespaceId> {
1033 self.sequences.next_namespace_id(ctx).await
1034 }
1035}
1036
1037#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
1038#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
1039impl DatabaseProvider for Transaction {
1040 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1042 async fn all_db(&self, ns: NamespaceId) -> Result<Arc<[DatabaseDefinition]>> {
1043 let qey = cache::tx::Lookup::Dbs(ns);
1044 match self.cache.get(&qey) {
1045 Some(val) => val.try_into_dbs(),
1046 None => {
1047 let beg = crate::key::namespace::db::prefix(ns)?;
1048 let end = crate::key::namespace::db::suffix(ns)?;
1049 let val = self.getr(beg..end, None).await?;
1050 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1051 let entry = cache::tx::Entry::Dbs(val.clone());
1052 self.cache.insert(qey, entry);
1053 Ok(val)
1054 }
1055 }
1056 }
1057
1058 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1060 async fn get_db_by_name(&self, ns: &str, db: &str) -> Result<Option<Arc<DatabaseDefinition>>> {
1061 let qey = cache::tx::Lookup::DbByName(ns, db);
1062 match self.cache.get(&qey) {
1063 Some(val) => val.try_into_type().map(Some),
1064 None => {
1065 let Some(ns) = self.get_ns_by_name(ns).await? else {
1066 return Ok(None);
1067 };
1068
1069 let key = crate::key::namespace::db::new(ns.namespace_id, db);
1070 let Some(db_def) = self.get(&key, None).await? else {
1071 return Ok(None);
1072 };
1073
1074 let val = Arc::new(db_def);
1075 let entry = cache::tx::Entry::Any(val.clone());
1076 self.cache.insert(qey, entry);
1077 Ok(Some(val))
1078 }
1079 }
1080 }
1081
1082 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self, ctx))]
1085 async fn get_or_add_db_upwards(
1086 &self,
1087 ctx: Option<&Context>,
1088 ns: &str,
1089 db: &str,
1090 upwards: bool,
1091 ) -> Result<Arc<DatabaseDefinition>> {
1092 let qey = cache::tx::Lookup::DbByName(ns, db);
1093 match self.cache.get(&qey) {
1094 Some(val) => {
1096 let t = val.try_into_type()?;
1097 Ok(t)
1098 }
1099 None => {
1101 let db_def = self.get_db_by_name(ns, db).await?;
1102 if let Some(db_def) = db_def {
1103 return Ok(db_def);
1104 }
1105
1106 let ns_def = if upwards {
1107 self.get_or_add_ns(ctx, ns).await?
1108 } else {
1109 match self.get_ns_by_name(ns).await? {
1110 Some(ns_def) => ns_def,
1111 None => {
1112 return Err(Error::NsNotFound {
1113 name: ns.to_owned(),
1114 }
1115 .into());
1116 }
1117 }
1118 };
1119
1120 let db_def = DatabaseDefinition {
1121 namespace_id: ns_def.namespace_id,
1122 database_id: self.get_next_db_id(ctx, ns_def.namespace_id).await?,
1123 name: db.to_string(),
1124 comment: None,
1125 changefeed: None,
1126 strict: false,
1127 };
1128
1129 return self.put_db(&ns_def.name, db_def).await;
1130 }
1131 }
1132 }
1133
1134 async fn get_next_db_id(&self, ctx: Option<&Context>, ns: NamespaceId) -> Result<DatabaseId> {
1135 self.sequences.next_database_id(ctx, ns).await
1136 }
1137
1138 async fn put_db(&self, ns: &str, db: DatabaseDefinition) -> Result<Arc<DatabaseDefinition>> {
1139 let key = crate::key::namespace::db::new(db.namespace_id, &db.name);
1140 self.set(&key, &db, None).await?;
1141
1142 let list_key = cache::tx::Lookup::Dbs(db.namespace_id);
1144 self.cache.remove(list_key);
1145
1146 let cached_db = Arc::new(db.clone());
1148
1149 let entry = cache::tx::Entry::Any(Arc::clone(&cached_db) as Arc<dyn Any + Send + Sync>);
1150 let qey = cache::tx::Lookup::DbByName(ns, &db.name);
1151 self.cache.insert(qey, entry);
1152
1153 Ok(cached_db)
1154 }
1155
1156 async fn del_db(&self, ns: &str, db: &str, expunge: bool) -> Result<Option<()>> {
1157 let Some(db) = self.get_db_by_name(ns, db).await? else {
1158 return Ok(None);
1159 };
1160 let key = crate::key::namespace::db::new(db.namespace_id, &db.name);
1161 let database_root = crate::key::database::all::new(db.namespace_id, db.database_id);
1162 if expunge {
1163 self.clr(&key).await?;
1164 self.clrp(&database_root).await?;
1165 } else {
1166 self.del(&key).await?;
1167 self.delp(&database_root).await?
1168 };
1169
1170 let list_key = cache::tx::Lookup::Dbs(db.namespace_id);
1172 self.cache.remove(list_key);
1173
1174 let db_key = cache::tx::Lookup::DbByName(ns, &db.name);
1176 self.cache.remove(db_key);
1177
1178 Ok(Some(()))
1179 }
1180
1181 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1183 async fn all_db_analyzers(
1184 &self,
1185 ns: NamespaceId,
1186 db: DatabaseId,
1187 ) -> Result<Arc<[catalog::AnalyzerDefinition]>> {
1188 let qey = cache::tx::Lookup::Azs(ns, db);
1189 match self.cache.get(&qey) {
1190 Some(val) => val.try_into_azs(),
1191 None => {
1192 let beg = crate::key::database::az::prefix(ns, db)?;
1193 let end = crate::key::database::az::suffix(ns, db)?;
1194 let val = self.getr(beg..end, None).await?;
1195 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1196 let entry = cache::tx::Entry::Azs(val.clone());
1197 self.cache.insert(qey, entry);
1198 Ok(val)
1199 }
1200 }
1201 }
1202
1203 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1205 async fn all_db_sequences(
1206 &self,
1207 ns: NamespaceId,
1208 db: DatabaseId,
1209 ) -> Result<Arc<[catalog::SequenceDefinition]>> {
1210 let qey = cache::tx::Lookup::Sqs(ns, db);
1211 match self.cache.get(&qey) {
1212 Some(val) => val.try_into_sqs(),
1213 None => {
1214 let beg = crate::key::database::sq::prefix(ns, db)?;
1215 let end = crate::key::database::sq::suffix(ns, db)?;
1216 let val = self.getr(beg..end, None).await?;
1217 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1218 let entry = cache::tx::Entry::Sqs(val.clone());
1219 self.cache.insert(qey, entry);
1220 Ok(val)
1221 }
1222 }
1223 }
1224
1225 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1227 async fn all_db_functions(
1228 &self,
1229 ns: NamespaceId,
1230 db: DatabaseId,
1231 ) -> Result<Arc<[catalog::FunctionDefinition]>> {
1232 let qey = cache::tx::Lookup::Fcs(ns, db);
1233 match self.cache.get(&qey) {
1234 Some(val) => val.try_into_fcs(),
1235 None => {
1236 let beg = crate::key::database::fc::prefix(ns, db)?;
1237 let end = crate::key::database::fc::suffix(ns, db)?;
1238 let val = self.getr(beg..end, None).await?;
1239 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1240 let entry = cache::tx::Entry::Fcs(val.clone());
1241 self.cache.insert(qey, entry);
1242 Ok(val)
1243 }
1244 }
1245 }
1246
1247 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1249 async fn all_db_modules(
1250 &self,
1251 ns: NamespaceId,
1252 db: DatabaseId,
1253 ) -> Result<Arc<[catalog::ModuleDefinition]>> {
1254 let qey = cache::tx::Lookup::Mds(ns, db);
1255 match self.cache.get(&qey) {
1256 Some(val) => val.try_into_mds(),
1257 None => {
1258 let beg = crate::key::database::md::prefix(ns, db)?;
1259 let end = crate::key::database::md::suffix(ns, db)?;
1260 let val = self.getr(beg..end, None).await?;
1261 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1262 let entry = cache::tx::Entry::Mds(val.clone());
1263 self.cache.insert(qey, entry);
1264 Ok(val)
1265 }
1266 }
1267 }
1268
1269 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1271 async fn all_db_params(
1272 &self,
1273 ns: NamespaceId,
1274 db: DatabaseId,
1275 ) -> Result<Arc<[catalog::ParamDefinition]>> {
1276 let qey = cache::tx::Lookup::Pas(ns, db);
1277 match self.cache.get(&qey) {
1278 Some(val) => val.try_into_pas(),
1279 None => {
1280 let beg = crate::key::database::pa::prefix(ns, db)?;
1281 let end = crate::key::database::pa::suffix(ns, db)?;
1282 let val = self.getr(beg..end, None).await?;
1283 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1284 let entry = cache::tx::Entry::Pas(val.clone());
1285 self.cache.insert(qey, entry);
1286 Ok(val)
1287 }
1288 }
1289 }
1290
1291 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1293 async fn all_db_models(
1294 &self,
1295 ns: NamespaceId,
1296 db: DatabaseId,
1297 ) -> Result<Arc<[catalog::MlModelDefinition]>> {
1298 let qey = cache::tx::Lookup::Mls(ns, db);
1299 match self.cache.get(&qey) {
1300 Some(val) => val.try_into_mls(),
1301 None => {
1302 let beg = crate::key::database::ml::prefix(ns, db)?;
1303 let end = crate::key::database::ml::suffix(ns, db)?;
1304 let val = self.getr(beg..end, None).await?;
1305 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1306 let entry = cache::tx::Entry::Mls(val.clone());
1307 self.cache.insert(qey, entry);
1308 Ok(val)
1309 }
1310 }
1311 }
1312
1313 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1315 async fn all_db_configs(
1316 &self,
1317 ns: NamespaceId,
1318 db: DatabaseId,
1319 ) -> Result<Arc<[ConfigDefinition]>> {
1320 let qey = cache::tx::Lookup::Cgs(ns, db);
1321 match self.cache.get(&qey) {
1322 Some(val) => val.try_into_cgs(),
1323 None => {
1324 let beg = crate::key::database::cg::prefix(ns, db)?;
1325 let end = crate::key::database::cg::suffix(ns, db)?;
1326 let val = self.getr(beg..end, None).await?;
1327 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1328 let entry = cache::tx::Entry::Cgs(val.clone());
1329 self.cache.insert(qey, entry);
1330 Ok(val)
1331 }
1332 }
1333 }
1334
1335 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1337 async fn get_db_model(
1338 &self,
1339 ns: NamespaceId,
1340 db: DatabaseId,
1341 ml: &str,
1342 vn: &str,
1343 ) -> Result<Option<Arc<catalog::MlModelDefinition>>> {
1344 let qey = cache::tx::Lookup::Ml(ns, db, ml, vn);
1345 match self.cache.get(&qey) {
1346 Some(val) => val.try_into_type().map(Some),
1347 None => {
1348 let key = crate::key::database::ml::new(ns, db, ml, vn);
1349 let Some(val) = self.get(&key, None).await? else {
1350 return Ok(None);
1351 };
1352 let val = Arc::new(val);
1353 let entry = cache::tx::Entry::Any(val.clone());
1354 self.cache.insert(qey, entry);
1355 Ok(Some(val))
1356 }
1357 }
1358 }
1359
1360 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1362 async fn get_db_analyzer(
1363 &self,
1364 ns: NamespaceId,
1365 db: DatabaseId,
1366 az: &str,
1367 ) -> Result<Arc<catalog::AnalyzerDefinition>> {
1368 let qey = cache::tx::Lookup::Az(ns, db, az);
1369 match self.cache.get(&qey) {
1370 Some(val) => val.try_into_type(),
1371 None => {
1372 let key = crate::key::database::az::new(ns, db, az);
1373 let val = self.get(&key, None).await?.ok_or_else(|| Error::AzNotFound {
1374 name: az.to_owned(),
1375 })?;
1376 let val = Arc::new(val);
1377 let entry = cache::tx::Entry::Any(val.clone());
1378 self.cache.insert(qey, entry);
1379 Ok(val)
1380 }
1381 }
1382 }
1383
1384 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1386 async fn get_db_sequence(
1387 &self,
1388 ns: NamespaceId,
1389 db: DatabaseId,
1390 sq: &str,
1391 ) -> Result<Arc<catalog::SequenceDefinition>> {
1392 let qey = cache::tx::Lookup::Sq(ns, db, sq);
1393 match self.cache.get(&qey) {
1394 Some(val) => val.try_into_type(),
1395 None => {
1396 let key = Sq::new(ns, db, sq);
1397 let val = self.get(&key, None).await?.ok_or_else(|| Error::SeqNotFound {
1398 name: sq.to_owned(),
1399 })?;
1400 let val = Arc::new(val);
1401 let entry = cache::tx::Entry::Any(val.clone());
1402 self.cache.insert(qey, entry);
1403 Ok(val)
1404 }
1405 }
1406 }
1407
1408 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1410 async fn get_db_function(
1411 &self,
1412 ns: NamespaceId,
1413 db: DatabaseId,
1414 fc: &str,
1415 ) -> Result<Arc<catalog::FunctionDefinition>> {
1416 let qey = cache::tx::Lookup::Fc(ns, db, fc);
1417 match self.cache.get(&qey) {
1418 Some(val) => val.try_into_type(),
1419 None => {
1420 let key = crate::key::database::fc::new(ns, db, fc);
1421 let val = self.get(&key, None).await?.ok_or_else(|| Error::FcNotFound {
1422 name: fc.to_owned(),
1423 })?;
1424 let val = Arc::new(val);
1425 let entry = cache::tx::Entry::Any(val.clone());
1426 self.cache.insert(qey, entry);
1427 Ok(val)
1428 }
1429 }
1430 }
1431
1432 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1434 async fn get_db_module(
1435 &self,
1436 ns: NamespaceId,
1437 db: DatabaseId,
1438 md: &str,
1439 ) -> Result<Arc<catalog::ModuleDefinition>> {
1440 let qey = cache::tx::Lookup::Md(ns, db, md);
1441 match self.cache.get(&qey) {
1442 Some(val) => val.try_into_type(),
1443 None => {
1444 let key = crate::key::database::md::new(ns, db, md);
1445 let val = self.get(&key, None).await?.ok_or_else(|| Error::MdNotFound {
1446 name: md.to_owned(),
1447 })?;
1448 let val = Arc::new(val);
1449 let entry = cache::tx::Entry::Any(val.clone());
1450 self.cache.insert(qey, entry);
1451 Ok(val)
1452 }
1453 }
1454 }
1455
1456 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1458 async fn get_db_param(
1459 &self,
1460 ns: NamespaceId,
1461 db: DatabaseId,
1462 pa: &str,
1463 ) -> Result<Arc<catalog::ParamDefinition>> {
1464 let qey = cache::tx::Lookup::Pa(ns, db, pa);
1465 match self.cache.get(&qey) {
1466 Some(val) => val.try_into_type(),
1467 None => {
1468 let key = crate::key::database::pa::new(ns, db, pa);
1469 let val = self.get(&key, None).await?.ok_or_else(|| Error::PaNotFound {
1470 name: pa.to_owned(),
1471 })?;
1472 let val = Arc::new(val);
1473 let entry = cache::tx::Entry::Any(val.clone());
1474 self.cache.insert(qey, entry);
1475 Ok(val)
1476 }
1477 }
1478 }
1479
1480 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1482 async fn get_db_config(
1483 &self,
1484 ns: NamespaceId,
1485 db: DatabaseId,
1486 cg: &str,
1487 ) -> Result<Option<Arc<ConfigDefinition>>> {
1488 let qey = cache::tx::Lookup::Cg(ns, db, cg);
1489 match self.cache.get(&qey) {
1490 Some(val) => val.try_into_type().map(Option::Some),
1491 None => {
1492 let key = crate::key::database::cg::new(ns, db, cg);
1493 if let Some(val) = self.get(&key, None).await? {
1494 let val = Arc::new(val);
1495 let entr = cache::tx::Entry::Any(val.clone());
1496 self.cache.insert(qey, entr);
1497 Ok(Some(val))
1498 } else {
1499 Ok(None)
1500 }
1501 }
1502 }
1503 }
1504
1505 async fn put_db_function(
1506 &self,
1507 ns: NamespaceId,
1508 db: DatabaseId,
1509 fc: &catalog::FunctionDefinition,
1510 ) -> Result<()> {
1511 let key = crate::key::database::fc::new(ns, db, &fc.name);
1512 self.set(&key, fc, None).await?;
1513
1514 let list_key = cache::tx::Lookup::Fcs(ns, db);
1516 self.cache.remove(list_key);
1517
1518 let qey = cache::tx::Lookup::Fc(ns, db, &fc.name);
1520 let entry = cache::tx::Entry::Any(Arc::new(fc.clone()));
1521 self.cache.insert(qey, entry);
1522
1523 Ok(())
1524 }
1525
1526 async fn put_db_module(
1527 &self,
1528 ns: NamespaceId,
1529 db: DatabaseId,
1530 md: &catalog::ModuleDefinition,
1531 ) -> Result<()> {
1532 let name = md.get_storage_name()?;
1533 let key = crate::key::database::md::new(ns, db, &name);
1534 self.set(&key, md, None).await?;
1535
1536 let list_key = cache::tx::Lookup::Mds(ns, db);
1538 self.cache.remove(list_key);
1539
1540 let qey = cache::tx::Lookup::Md(ns, db, &name);
1542 let entry = cache::tx::Entry::Any(Arc::new(md.clone()));
1543 self.cache.insert(qey, entry);
1544
1545 Ok(())
1546 }
1547
1548 async fn put_db_param(
1549 &self,
1550 ns: NamespaceId,
1551 db: DatabaseId,
1552 pa: &catalog::ParamDefinition,
1553 ) -> Result<()> {
1554 let key = crate::key::database::pa::new(ns, db, &pa.name);
1555 self.set(&key, pa, None).await?;
1556
1557 let list_key = cache::tx::Lookup::Pas(ns, db);
1559 self.cache.remove(list_key);
1560
1561 let qey = cache::tx::Lookup::Pa(ns, db, &pa.name);
1563 let entry = cache::tx::Entry::Any(Arc::new(pa.clone()));
1564 self.cache.insert(qey, entry);
1565
1566 Ok(())
1567 }
1568}
1569
1570#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
1571#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
1572impl TableProvider for Transaction {
1573 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1575 async fn all_tb(
1576 &self,
1577 ns: NamespaceId,
1578 db: DatabaseId,
1579 version: Option<u64>,
1580 ) -> Result<Arc<[TableDefinition]>> {
1581 let qey = cache::tx::Lookup::Tbs(ns, db);
1582 match self.cache.get(&qey) {
1583 Some(val) => val.try_into_tbs(),
1584 None => {
1585 let beg = crate::key::database::tb::prefix(ns, db)?;
1586 let end = crate::key::database::tb::suffix(ns, db)?;
1587 let val = self.getr(beg..end, version).await?;
1588 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1589 let entry = cache::tx::Entry::Tbs(val.clone());
1590 self.cache.insert(qey, entry);
1591 Ok(val)
1592 }
1593 }
1594 }
1595
1596 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1598 async fn all_tb_views(
1599 &self,
1600 ns: NamespaceId,
1601 db: DatabaseId,
1602 tb: &TableName,
1603 ) -> Result<Arc<[catalog::TableDefinition]>> {
1604 let qey = cache::tx::Lookup::Fts(ns, db, tb);
1605 match self.cache.get(&qey) {
1606 Some(val) => val.try_into_fts(),
1607 None => {
1608 let beg = crate::key::table::ft::prefix(ns, db, tb)?;
1609 let end = crate::key::table::ft::suffix(ns, db, tb)?;
1610 let val = self.getr(beg..end, None).await?;
1611 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1612 let entry = cache::tx::Entry::Fts(val.clone());
1613 self.cache.insert(qey, entry);
1614 Ok(val)
1615 }
1616 }
1617 }
1618
1619 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self, ctx))]
1622 async fn get_or_add_tb(
1623 &self,
1624 ctx: Option<&Context>,
1625 ns: &str,
1626 db: &str,
1627 tb: &TableName,
1628 ) -> Result<Arc<TableDefinition>> {
1629 let qey = cache::tx::Lookup::TbByName(ns, db, tb);
1630 match self.cache.get(&qey) {
1631 Some(val) => val.try_into_type(),
1633 None => {
1635 let Some(db_def) = self.get_db_by_name(ns, db).await? else {
1636 return Err(anyhow::anyhow!(Error::DbNotFound {
1637 name: db.to_owned(),
1638 }));
1639 };
1640
1641 let table_key =
1642 crate::key::database::tb::new(db_def.namespace_id, db_def.database_id, tb);
1643 if let Some(tb_def) = self.get(&table_key, None).await? {
1644 let cached_tb = Arc::new(tb_def);
1645 let cached_entry =
1646 cache::tx::Entry::Any(Arc::clone(&cached_tb) as Arc<dyn Any + Send + Sync>);
1647 self.cache.insert(qey, cached_entry);
1648 return Ok(cached_tb);
1649 }
1650
1651 if db_def.strict {
1652 return Err(Error::TbNotFound {
1653 name: tb.to_owned(),
1654 }
1655 .into());
1656 }
1657
1658 let tb_def = TableDefinition::new(
1659 db_def.namespace_id,
1660 db_def.database_id,
1661 self.get_next_tb_id(ctx, db_def.namespace_id, db_def.database_id).await?,
1662 tb.clone(),
1663 );
1664 self.put_tb(ns, db, &tb_def).await
1665 }
1666 }
1667 }
1668
1669 async fn get_tb_by_name(
1670 &self,
1671 ns: &str,
1672 db: &str,
1673 tb: &TableName,
1674 ) -> Result<Option<Arc<TableDefinition>>> {
1675 let qey = cache::tx::Lookup::TbByName(ns, db, tb);
1676 match self.cache.get(&qey) {
1677 Some(val) => val.try_into_type().map(Some),
1678 None => {
1679 let Some(db) = self.get_db_by_name(ns, db).await? else {
1680 return Ok(None);
1681 };
1682
1683 let key = crate::key::database::tb::new(db.namespace_id, db.database_id, tb);
1684 let Some(tb) = self.get(&key, None).await? else {
1685 return Ok(None);
1686 };
1687
1688 let tb = Arc::new(tb);
1689 let entr = cache::tx::Entry::Any(tb.clone());
1690 self.cache.insert(qey, entr);
1691 Ok(Some(tb))
1692 }
1693 }
1694 }
1695
1696 async fn put_tb(
1697 &self,
1698 ns: &str,
1699 db: &str,
1700 tb: &TableDefinition,
1701 ) -> Result<Arc<TableDefinition>> {
1702 let key = crate::key::database::tb::new(tb.namespace_id, tb.database_id, &tb.name);
1703 match self.set(&key, tb, None).await {
1704 Ok(_) => {}
1705 Err(e) => {
1706 if matches!(
1707 e.downcast_ref(),
1708 Some(Error::Kvs(crate::kvs::Error::TransactionReadonly))
1709 ) {
1710 return Err(Error::TbNotFound {
1711 name: tb.name.clone(),
1712 }
1713 .into());
1714 }
1715 return Err(e);
1716 }
1717 }
1718
1719 let list_key = cache::tx::Lookup::Tbs(tb.namespace_id, tb.database_id);
1721 self.cache.remove(list_key);
1722
1723 let cached_tb = Arc::new(tb.clone());
1725 let cached_entry =
1726 cache::tx::Entry::Any(Arc::clone(&cached_tb) as Arc<dyn Any + Send + Sync>);
1727
1728 let qey = cache::tx::Lookup::Tb(tb.namespace_id, tb.database_id, &tb.name);
1729 self.cache.insert(qey, cached_entry.clone());
1730
1731 let qey = cache::tx::Lookup::TbByName(ns, db, &tb.name);
1732 self.cache.insert(qey, cached_entry);
1733
1734 Ok(cached_tb)
1735 }
1736
1737 async fn del_tb(&self, ns: &str, db: &str, tb: &TableName) -> Result<()> {
1738 let Some(tb) = self.get_tb_by_name(ns, db, tb).await? else {
1739 return Err(Error::TbNotFound {
1740 name: tb.clone(),
1741 }
1742 .into());
1743 };
1744
1745 let key = crate::key::database::tb::new(tb.namespace_id, tb.database_id, &tb.name);
1746 self.del(&key).await?;
1747
1748 let list_key = cache::tx::Lookup::Tbs(tb.namespace_id, tb.database_id);
1750 self.cache.remove(list_key);
1751
1752 let qey = cache::tx::Lookup::Tb(tb.namespace_id, tb.database_id, &tb.name);
1754 self.cache.remove(qey);
1755 let qey = cache::tx::Lookup::TbByName(ns, db, &tb.name);
1756 self.cache.remove(qey);
1757
1758 Ok(())
1759 }
1760
1761 async fn clr_tb(&self, ns: &str, db: &str, tb: &TableName) -> Result<()> {
1762 let Some(tb) = self.get_tb_by_name(ns, db, tb).await? else {
1763 return Err(Error::TbNotFound {
1764 name: tb.clone(),
1765 }
1766 .into());
1767 };
1768
1769 let key = crate::key::database::tb::new(tb.namespace_id, tb.database_id, &tb.name);
1770 self.clr(&key).await?;
1771
1772 let list_key = cache::tx::Lookup::Tbs(tb.namespace_id, tb.database_id);
1774 self.cache.remove(list_key);
1775
1776 let qey = cache::tx::Lookup::Tb(tb.namespace_id, tb.database_id, &tb.name);
1778 self.cache.remove(qey);
1779 let qey = cache::tx::Lookup::TbByName(ns, db, &tb.name);
1780 self.cache.remove(qey);
1781
1782 Ok(())
1783 }
1784
1785 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1787 async fn all_tb_events(
1788 &self,
1789 ns: NamespaceId,
1790 db: DatabaseId,
1791 tb: &TableName,
1792 ) -> Result<Arc<[catalog::EventDefinition]>> {
1793 let qey = cache::tx::Lookup::Evs(ns, db, tb);
1794 match self.cache.get(&qey) {
1795 Some(val) => val.try_into_evs(),
1796 None => {
1797 let beg = crate::key::table::ev::prefix(ns, db, tb)?;
1798 let end = crate::key::table::ev::suffix(ns, db, tb)?;
1799 let val = self.getr(beg..end, None).await?;
1800 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1801 let entry = cache::tx::Entry::Evs(val.clone());
1802 self.cache.insert(qey, entry);
1803 Ok(val)
1804 }
1805 }
1806 }
1807
1808 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1810 async fn all_tb_fields(
1811 &self,
1812 ns: NamespaceId,
1813 db: DatabaseId,
1814 tb: &TableName,
1815 version: Option<u64>,
1816 ) -> Result<Arc<[catalog::FieldDefinition]>> {
1817 let qey = cache::tx::Lookup::Fds(ns, db, tb);
1818 match self.cache.get(&qey) {
1819 Some(val) => val.try_into_fds(),
1820 None => {
1821 let beg = crate::key::table::fd::prefix(ns, db, tb)?;
1822 let end = crate::key::table::fd::suffix(ns, db, tb)?;
1823 let val = self.getr(beg..end, version).await?;
1824 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1825 let entry = cache::tx::Entry::Fds(val.clone());
1826 self.cache.insert(qey, entry);
1827 Ok(val)
1828 }
1829 }
1830 }
1831
1832 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1834 async fn all_tb_indexes(
1835 &self,
1836 ns: NamespaceId,
1837 db: DatabaseId,
1838 tb: &TableName,
1839 ) -> Result<Arc<[catalog::IndexDefinition]>> {
1840 let qey = cache::tx::Lookup::Ixs(ns, db, tb);
1841 match self.cache.get(&qey) {
1842 Some(val) => val.try_into_ixs(),
1843 None => {
1844 let beg = crate::key::table::ix::prefix(ns, db, tb)?;
1845 let end = crate::key::table::ix::suffix(ns, db, tb)?;
1846 let val = self.getr(beg..end, None).await?;
1847 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1848 let entry = cache::tx::Entry::Ixs(val.clone());
1849 self.cache.insert(qey, entry);
1850 Ok(val)
1851 }
1852 }
1853 }
1854
1855 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1857 async fn all_tb_lives(
1858 &self,
1859 ns: NamespaceId,
1860 db: DatabaseId,
1861 tb: &TableName,
1862 ) -> Result<Arc<[catalog::SubscriptionDefinition]>> {
1863 let qey = cache::tx::Lookup::Lvs(ns, db, tb);
1864 match self.cache.get(&qey) {
1865 Some(val) => val.try_into_lvs(),
1866 None => {
1867 let beg = crate::key::table::lq::prefix(ns, db, tb)?;
1868 let end = crate::key::table::lq::suffix(ns, db, tb)?;
1869 let val = self.getr(beg..end, None).await?;
1870 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1871 let entry = cache::tx::Entry::Lvs(val.clone());
1872 self.cache.insert(qey, entry);
1873 Ok(val)
1874 }
1875 }
1876 }
1877
1878 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1880 async fn get_tb(
1881 &self,
1882 ns: NamespaceId,
1883 db: DatabaseId,
1884 tb: &TableName,
1885 ) -> Result<Option<Arc<TableDefinition>>> {
1886 let qey = cache::tx::Lookup::Tb(ns, db, tb);
1887 match self.cache.get(&qey) {
1888 Some(val) => val.try_into_type().map(Some),
1889 None => {
1890 let key = crate::key::database::tb::new(ns, db, tb);
1891 let Some(val) = self.get(&key, None).await? else {
1892 return Ok(None);
1893 };
1894 let val = Arc::new(val);
1895 let entry = cache::tx::Entry::Any(val.clone());
1896 self.cache.insert(qey, entry);
1897 Ok(Some(val))
1898 }
1899 }
1900 }
1901
1902 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1904 async fn get_tb_event(
1905 &self,
1906 ns: NamespaceId,
1907 db: DatabaseId,
1908 tb: &TableName,
1909 ev: &str,
1910 ) -> Result<Arc<catalog::EventDefinition>> {
1911 let qey = cache::tx::Lookup::Ev(ns, db, tb, ev);
1912 match self.cache.get(&qey) {
1913 Some(val) => val.try_into_type(),
1914 None => {
1915 let key = crate::key::table::ev::new(ns, db, tb, ev);
1916 let val = self.get(&key, None).await?.ok_or_else(|| Error::EvNotFound {
1917 name: ev.to_owned(),
1918 })?;
1919 let val = Arc::new(val);
1920 let entry = cache::tx::Entry::Any(val.clone());
1921 self.cache.insert(qey, entry);
1922 Ok(val)
1923 }
1924 }
1925 }
1926
1927 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1929 async fn get_tb_field(
1930 &self,
1931 ns: NamespaceId,
1932 db: DatabaseId,
1933 tb: &TableName,
1934 fd: &str,
1935 ) -> Result<Option<Arc<catalog::FieldDefinition>>> {
1936 let qey = cache::tx::Lookup::Fd(ns, db, tb, fd);
1937 match self.cache.get(&qey) {
1938 Some(val) => val.try_into_type().map(Some),
1939 None => {
1940 let key = crate::key::table::fd::new(ns, db, tb, fd);
1941 let Some(val) = self.get(&key, None).await? else {
1942 return Ok(None);
1943 };
1944 let val = Arc::new(val);
1945 let entry = cache::tx::Entry::Any(val.clone());
1946 self.cache.insert(qey, entry);
1947 Ok(Some(val))
1948 }
1949 }
1950 }
1951
1952 async fn put_tb_field(
1953 &self,
1954 ns: NamespaceId,
1955 db: DatabaseId,
1956 tb: &TableName,
1957 fd: &catalog::FieldDefinition,
1958 ) -> Result<()> {
1959 let name = fd.name.to_raw_string();
1960 let key = crate::key::table::fd::new(ns, db, tb, &name);
1961 self.set(&key, fd, None).await?;
1962
1963 let list_key = cache::tx::Lookup::Fds(ns, db, tb.as_ref());
1965 self.cache.remove(list_key);
1966
1967 let qey = cache::tx::Lookup::Fd(ns, db, tb, &name);
1969 let entry = cache::tx::Entry::Any(Arc::new(fd.clone()));
1970 self.cache.insert(qey, entry);
1971 Ok(())
1972 }
1973
1974 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1976 async fn get_tb_index(
1977 &self,
1978 ns: NamespaceId,
1979 db: DatabaseId,
1980 tb: &TableName,
1981 ix: &str,
1982 ) -> Result<Option<Arc<catalog::IndexDefinition>>> {
1983 let qey = cache::tx::Lookup::Ix(ns, db, tb, ix);
1984 match self.cache.get(&qey) {
1985 Some(val) => val.try_into_type().map(Some),
1986 None => {
1987 let key = crate::key::table::ix::new(ns, db, tb, ix);
1988 let Some(val) = self.get(&key, None).await? else {
1989 return Ok(None);
1990 };
1991 let val = Arc::new(val);
1992 let entry = cache::tx::Entry::Any(val.clone());
1993 self.cache.insert(qey, entry);
1994 Ok(Some(val))
1995 }
1996 }
1997 }
1998
1999 async fn get_tb_index_by_id(
2000 &self,
2001 ns: NamespaceId,
2002 db: DatabaseId,
2003 tb: &TableName,
2004 ix: IndexId,
2005 ) -> Result<Option<Arc<catalog::IndexDefinition>>> {
2006 let key = crate::key::table::ix::IndexNameLookupKey::new(ns, db, tb, ix);
2007 let Some(index_name) = self.get(&key, None).await? else {
2008 return Ok(None);
2009 };
2010
2011 self.get_tb_index(ns, db, tb, &index_name).await
2012 }
2013
2014 async fn put_tb_index(
2015 &self,
2016 ns: NamespaceId,
2017 db: DatabaseId,
2018 tb: &TableName,
2019 ix: &catalog::IndexDefinition,
2020 ) -> Result<()> {
2021 let key = crate::key::table::ix::new(ns, db, tb, &ix.name);
2022 self.set(&key, ix, None).await?;
2023
2024 let name_lookup_key =
2025 crate::key::table::ix::IndexNameLookupKey::new(ns, db, tb, ix.index_id);
2026 self.set(&name_lookup_key, &ix.name, None).await?;
2027
2028 let list_key = cache::tx::Lookup::Ixs(ns, db, tb.as_ref());
2030 self.cache.remove(list_key);
2031
2032 let qey = cache::tx::Lookup::Ix(ns, db, tb, &ix.name);
2034 let entry = cache::tx::Entry::Any(Arc::new(ix.clone()));
2035 self.cache.insert(qey, entry);
2036 Ok(())
2037 }
2038
2039 async fn del_tb_index(
2040 &self,
2041 ns: NamespaceId,
2042 db: DatabaseId,
2043 tb: &TableName,
2044 ix: &str,
2045 ) -> Result<()> {
2046 let Some(ix) = self.get_tb_index(ns, db, tb, ix).await? else {
2048 return Ok(());
2049 };
2050
2051 let key = crate::key::index::all::new(ns, db, tb, ix.index_id);
2053 self.delp(&key).await?;
2054
2055 let key = crate::key::table::ix::new(ns, db, tb, &ix.name);
2057 self.del(&key).await?;
2058
2059 let list_key = cache::tx::Lookup::Ixs(ns, db, tb.as_ref());
2061 self.cache.remove(list_key);
2062
2063 let index_key = cache::tx::Lookup::Ix(ns, db, tb.as_ref(), &ix.name);
2065 self.cache.remove(index_key);
2066
2067 Ok(())
2068 }
2069
2070 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2074 async fn get_record(
2075 &self,
2076 ns: NamespaceId,
2077 db: DatabaseId,
2078 tb: &TableName,
2079 id: &RecordIdKey,
2080 version: Option<u64>,
2081 ) -> Result<Arc<Record>> {
2082 if version.is_some() {
2084 let key = crate::key::record::new(ns, db, tb, id);
2086 match self.get(&key, version).await? {
2087 Some(mut record) => {
2089 let rid = RecordId {
2091 table: tb.to_owned(),
2092 key: id.clone(),
2093 };
2094 record.data.def(rid);
2095 Ok(record.into_read_only())
2097 }
2098 None => Ok(Arc::new(Default::default())),
2100 }
2101 } else {
2102 let qey = cache::tx::Lookup::Record(ns, db, tb, id);
2103 match self.cache.get(&qey) {
2104 Some(val) => val.try_into_record(),
2106 None => {
2108 let key = crate::key::record::new(ns, db, tb, id);
2110 match self.get(&key, None).await? {
2111 Some(mut record) => {
2113 let rid = RecordId {
2115 table: tb.to_owned(),
2116 key: id.clone(),
2117 };
2118 record.data.def(rid);
2119 let record = record.into_read_only();
2121 let entry = cache::tx::Entry::Val(record.clone());
2122 self.cache.insert(qey, entry);
2123 Ok(record)
2124 }
2125 None => Ok(Arc::new(Default::default())),
2127 }
2128 }
2129 }
2130 }
2131 }
2132
2133 async fn record_exists(
2134 &self,
2135 ns: NamespaceId,
2136 db: DatabaseId,
2137 tb: &TableName,
2138 id: &RecordIdKey,
2139 ) -> Result<bool> {
2140 let key = crate::key::record::new(ns, db, tb, id);
2141 Ok(self.exists(&key, None).await?)
2142 }
2143
2144 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2145 async fn put_record(
2146 &self,
2147 ns: NamespaceId,
2148 db: DatabaseId,
2149 tb: &TableName,
2150 id: &RecordIdKey,
2151 record: Arc<Record>,
2152 version: Option<u64>,
2153 ) -> Result<()> {
2154 let key = crate::key::record::new(ns, db, tb, id);
2155 self.put(&key, &record, version).await?;
2156 self.set_record_cache(ns, db, tb, id, record);
2157 Ok(())
2158 }
2159
2160 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2161 async fn set_record(
2162 &self,
2163 ns: NamespaceId,
2164 db: DatabaseId,
2165 tb: &TableName,
2166 id: &RecordIdKey,
2167 record: Arc<Record>,
2168 version: Option<u64>,
2169 ) -> Result<()> {
2170 let key = crate::key::record::new(ns, db, tb, id);
2172 self.set(&key, &record, version).await?;
2173 self.set_record_cache(ns, db, tb, id, record);
2175 Ok(())
2177 }
2178
2179 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2180 async fn del_record(
2181 &self,
2182 ns: NamespaceId,
2183 db: DatabaseId,
2184 tb: &TableName,
2185 id: &RecordIdKey,
2186 ) -> Result<()> {
2187 let key = crate::key::record::new(ns, db, tb, id);
2189 self.del(&key).await?;
2190 let qey = cache::tx::Lookup::Record(ns, db, tb, id);
2192 self.cache.remove(qey);
2193 Ok(())
2195 }
2196
2197 async fn get_next_tb_id(
2198 &self,
2199 ctx: Option<&Context>,
2200 ns: NamespaceId,
2201 db: DatabaseId,
2202 ) -> Result<TableId> {
2203 self.sequences.next_table_id(ctx, ns, db).await
2204 }
2205}
2206
2207#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
2208#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
2209impl UserProvider for Transaction {
2210 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2212 async fn all_root_users(&self) -> Result<Arc<[catalog::UserDefinition]>> {
2213 let qey = cache::tx::Lookup::Rus;
2214 match self.cache.get(&qey) {
2215 Some(val) => val.try_into_rus(),
2216 None => {
2217 let beg = crate::key::root::us::prefix();
2218 let end = crate::key::root::us::suffix();
2219 let val = self.getr(beg..end, None).await?;
2220 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2221 let entry = cache::tx::Entry::Rus(val.clone());
2222 self.cache.insert(qey, entry);
2223 Ok(val)
2224 }
2225 }
2226 }
2227
2228 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2230 async fn all_ns_users(&self, ns: NamespaceId) -> Result<Arc<[catalog::UserDefinition]>> {
2231 let qey = cache::tx::Lookup::Nus(ns);
2232 match self.cache.get(&qey) {
2233 Some(val) => val.try_into_nus(),
2234 None => {
2235 let beg = crate::key::namespace::us::prefix(ns)?;
2236 let end = crate::key::namespace::us::suffix(ns)?;
2237 let val = self.getr(beg..end, None).await?;
2238 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2239 let entry = cache::tx::Entry::Nus(val.clone());
2240 self.cache.insert(qey, entry);
2241 Ok(val)
2242 }
2243 }
2244 }
2245
2246 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2248 async fn all_db_users(
2249 &self,
2250 ns: NamespaceId,
2251 db: DatabaseId,
2252 ) -> Result<Arc<[catalog::UserDefinition]>> {
2253 let qey = cache::tx::Lookup::Dus(ns, db);
2254 match self.cache.get(&qey) {
2255 Some(val) => val.try_into_dus(),
2256 None => {
2257 let beg = crate::key::database::us::prefix(ns, db)?;
2258 let end = crate::key::database::us::suffix(ns, db)?;
2259 let val = self.getr(beg..end, None).await?;
2260 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2261 let entry = cache::tx::Entry::Dus(val.clone());
2262 self.cache.insert(qey, entry);
2263 Ok(val)
2264 }
2265 }
2266 }
2267
2268 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2270 async fn get_root_user(&self, us: &str) -> Result<Option<Arc<catalog::UserDefinition>>> {
2271 let qey = cache::tx::Lookup::Ru(us);
2272 match self.cache.get(&qey) {
2273 Some(val) => val.try_into_type().map(Some),
2274 None => {
2275 let key = crate::key::root::us::new(us);
2276 let Some(val) = self.get(&key, None).await? else {
2277 return Ok(None);
2278 };
2279 let val = Arc::new(val);
2280 let entry = cache::tx::Entry::Any(val.clone());
2281 self.cache.insert(qey, entry);
2282 Ok(Some(val))
2283 }
2284 }
2285 }
2286
2287 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2289 async fn get_ns_user(
2290 &self,
2291 ns: NamespaceId,
2292 us: &str,
2293 ) -> Result<Option<Arc<catalog::UserDefinition>>> {
2294 let qey = cache::tx::Lookup::Nu(ns, us);
2295 match self.cache.get(&qey) {
2296 Some(val) => val.try_into_type().map(Some),
2297 None => {
2298 let key = crate::key::namespace::us::new(ns, us);
2299 let Some(val) = self.get(&key, None).await? else {
2300 return Ok(None);
2301 };
2302
2303 let val = Arc::new(val);
2304 let entry = cache::tx::Entry::Any(val.clone());
2305 self.cache.insert(qey, entry);
2306 Ok(Some(val))
2307 }
2308 }
2309 }
2310
2311 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2313 async fn get_db_user(
2314 &self,
2315 ns: NamespaceId,
2316 db: DatabaseId,
2317 us: &str,
2318 ) -> Result<Option<Arc<catalog::UserDefinition>>> {
2319 let qey = cache::tx::Lookup::Du(ns, db, us);
2320 match self.cache.get(&qey) {
2321 Some(val) => val.try_into_type().map(Some),
2322 None => {
2323 let key = crate::key::database::us::new(ns, db, us);
2324 let Some(val) = self.get(&key, None).await? else {
2325 return Ok(None);
2326 };
2327
2328 let val = Arc::new(val);
2329 let entry = cache::tx::Entry::Any(val.clone());
2330 self.cache.insert(qey, entry);
2331 Ok(Some(val))
2332 }
2333 }
2334 }
2335
2336 async fn put_root_user(&self, us: &catalog::UserDefinition) -> Result<()> {
2337 let key = crate::key::root::us::new(&us.name);
2338 self.set(&key, us, None).await?;
2339
2340 let list_key = cache::tx::Lookup::Rus;
2342 self.cache.remove(list_key);
2343
2344 let qey = cache::tx::Lookup::Ru(&us.name);
2346 let entry = cache::tx::Entry::Any(Arc::new(us.clone()));
2347 self.cache.insert(qey, entry);
2348
2349 Ok(())
2350 }
2351
2352 async fn put_ns_user(&self, ns: NamespaceId, us: &catalog::UserDefinition) -> Result<()> {
2353 let key = crate::key::namespace::us::new(ns, &us.name);
2354 self.set(&key, us, None).await?;
2355
2356 let list_key = cache::tx::Lookup::Nus(ns);
2358 self.cache.remove(list_key);
2359
2360 let qey = cache::tx::Lookup::Nu(ns, &us.name);
2362 let entry = cache::tx::Entry::Any(Arc::new(us.clone()));
2363 self.cache.insert(qey, entry);
2364
2365 Ok(())
2366 }
2367
2368 async fn put_db_user(
2369 &self,
2370 ns: NamespaceId,
2371 db: DatabaseId,
2372 us: &catalog::UserDefinition,
2373 ) -> Result<()> {
2374 let key = crate::key::database::us::new(ns, db, &us.name);
2375 self.set(&key, us, None).await?;
2376
2377 let list_key = cache::tx::Lookup::Dus(ns, db);
2379 self.cache.remove(list_key);
2380
2381 let qey = cache::tx::Lookup::Du(ns, db, &us.name);
2383 let entry = cache::tx::Entry::Any(Arc::new(us.clone()));
2384 self.cache.insert(qey, entry);
2385
2386 Ok(())
2387 }
2388}
2389
2390#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
2391#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
2392impl AuthorisationProvider for Transaction {
2393 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2395 async fn all_root_accesses(&self) -> Result<Arc<[catalog::AccessDefinition]>> {
2396 let qey = cache::tx::Lookup::Ras;
2397 match self.cache.get(&qey) {
2398 Some(val) => val.try_into_ras(),
2399 None => {
2400 let beg = crate::key::root::ac::prefix();
2401 let end = crate::key::root::ac::suffix();
2402 let val = self.getr(beg..end, None).await?;
2403 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2404 let entry = cache::tx::Entry::Ras(val.clone());
2405 self.cache.insert(qey, entry);
2406 Ok(val)
2407 }
2408 }
2409 }
2410
2411 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2413 async fn all_root_access_grants(&self, ra: &str) -> Result<Arc<[catalog::AccessGrant]>> {
2414 let qey = cache::tx::Lookup::Rgs(ra);
2415 match self.cache.get(&qey) {
2416 Some(val) => val.try_into_rag(),
2417 None => {
2418 let beg = crate::key::root::access::gr::prefix(ra)?;
2419 let end = crate::key::root::access::gr::suffix(ra)?;
2420 let val = self.getr(beg..end, None).await?;
2421 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2422 let entry = cache::tx::Entry::Rag(val.clone());
2423 self.cache.insert(qey, entry);
2424 Ok(val)
2425 }
2426 }
2427 }
2428
2429 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2431 async fn all_ns_accesses(&self, ns: NamespaceId) -> Result<Arc<[catalog::AccessDefinition]>> {
2432 let qey = cache::tx::Lookup::Nas(ns);
2433 match self.cache.get(&qey) {
2434 Some(val) => val.try_into_nas(),
2435 None => {
2436 let beg = crate::key::namespace::ac::prefix(ns)?;
2437 let end = crate::key::namespace::ac::suffix(ns)?;
2438 let val = self.getr(beg..end, None).await?;
2439 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2440 let entry = cache::tx::Entry::Nas(val.clone());
2441 self.cache.insert(qey, entry);
2442 Ok(val)
2443 }
2444 }
2445 }
2446
2447 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2449 async fn all_ns_access_grants(
2450 &self,
2451 ns: NamespaceId,
2452 na: &str,
2453 ) -> Result<Arc<[catalog::AccessGrant]>> {
2454 let qey = cache::tx::Lookup::Ngs(ns, na);
2455 match self.cache.get(&qey) {
2456 Some(val) => val.try_into_nag(),
2457 None => {
2458 let beg = crate::key::namespace::access::gr::prefix(ns, na)?;
2459 let end = crate::key::namespace::access::gr::suffix(ns, na)?;
2460 let val = self.getr(beg..end, None).await?;
2461 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2462 let entry = cache::tx::Entry::Nag(val.clone());
2463 self.cache.insert(qey, entry);
2464 Ok(val)
2465 }
2466 }
2467 }
2468
2469 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2471 async fn all_db_accesses(
2472 &self,
2473 ns: NamespaceId,
2474 db: DatabaseId,
2475 ) -> Result<Arc<[catalog::AccessDefinition]>> {
2476 let qey = cache::tx::Lookup::Das(ns, db);
2477 match self.cache.get(&qey) {
2478 Some(val) => val.try_into_das(),
2479 None => {
2480 let beg = crate::key::database::ac::prefix(ns, db)?;
2481 let end = crate::key::database::ac::suffix(ns, db)?;
2482 let val = self.getr(beg..end, None).await?;
2483 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2484 let entry = cache::tx::Entry::Das(val.clone());
2485 self.cache.insert(qey, entry);
2486 Ok(val)
2487 }
2488 }
2489 }
2490
2491 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2493 async fn all_db_access_grants(
2494 &self,
2495 ns: NamespaceId,
2496 db: DatabaseId,
2497 da: &str,
2498 ) -> Result<Arc<[catalog::AccessGrant]>> {
2499 let qey = cache::tx::Lookup::Dgs(ns, db, da);
2500 match self.cache.get(&qey) {
2501 Some(val) => val.try_into_dag(),
2502 None => {
2503 let beg = crate::key::database::access::gr::prefix(ns, db, da)?;
2504 let end = crate::key::database::access::gr::suffix(ns, db, da)?;
2505 let val = self.getr(beg..end, None).await?;
2506 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2507 let entry = cache::tx::Entry::Dag(val.clone());
2508 self.cache.insert(qey, entry);
2509 Ok(val)
2510 }
2511 }
2512 }
2513
2514 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2516 async fn get_root_access(&self, ra: &str) -> Result<Option<Arc<catalog::AccessDefinition>>> {
2517 let qey = cache::tx::Lookup::Ra(ra);
2518 match self.cache.get(&qey) {
2519 Some(val) => val.try_into_type().map(Some),
2520 None => {
2521 let key = crate::key::root::ac::new(ra);
2522 let Some(val) = self.get(&key, None).await? else {
2523 return Ok(None);
2524 };
2525 let val = Arc::new(val);
2526 let entry = cache::tx::Entry::Any(val.clone());
2527 self.cache.insert(qey, entry);
2528 Ok(Some(val))
2529 }
2530 }
2531 }
2532
2533 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2535 async fn get_root_access_grant(
2536 &self,
2537 ac: &str,
2538 gr: &str,
2539 ) -> Result<Option<Arc<catalog::AccessGrant>>> {
2540 let qey = cache::tx::Lookup::Rg(ac, gr);
2541 match self.cache.get(&qey) {
2542 Some(val) => val.try_into_type().map(Some),
2543 None => {
2544 let key = crate::key::root::access::gr::new(ac, gr);
2545 let Some(val) = self.get(&key, None).await? else {
2546 return Ok(None);
2547 };
2548 let val = Arc::new(val);
2549 let entry = cache::tx::Entry::Any(val.clone());
2550 self.cache.insert(qey, entry);
2551 Ok(Some(val))
2552 }
2553 }
2554 }
2555
2556 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2558 async fn get_ns_access(
2559 &self,
2560 ns: NamespaceId,
2561 na: &str,
2562 ) -> Result<Option<Arc<catalog::AccessDefinition>>> {
2563 let qey = cache::tx::Lookup::Na(ns, na);
2564 match self.cache.get(&qey) {
2565 Some(val) => val.try_into_type().map(Some),
2566 None => {
2567 let key = crate::key::namespace::ac::new(ns, na);
2568 let Some(val) = self.get(&key, None).await? else {
2569 return Ok(None);
2570 };
2571 let val = Arc::new(val);
2572 let entry = cache::tx::Entry::Any(val.clone());
2573 self.cache.insert(qey, entry);
2574 Ok(Some(val))
2575 }
2576 }
2577 }
2578
2579 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2581 async fn get_ns_access_grant(
2582 &self,
2583 ns: NamespaceId,
2584 ac: &str,
2585 gr: &str,
2586 ) -> Result<Option<Arc<catalog::AccessGrant>>> {
2587 let qey = cache::tx::Lookup::Ng(ns, ac, gr);
2588 match self.cache.get(&qey) {
2589 Some(val) => val.try_into_type().map(Some),
2590 None => {
2591 let key = crate::key::namespace::access::gr::new(ns, ac, gr);
2592 let Some(val) = self.get(&key, None).await? else {
2593 return Ok(None);
2594 };
2595 let val = Arc::new(val);
2596 let entry = cache::tx::Entry::Any(val.clone());
2597 self.cache.insert(qey, entry);
2598 Ok(Some(val))
2599 }
2600 }
2601 }
2602
2603 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2605 async fn get_db_access(
2606 &self,
2607 ns: NamespaceId,
2608 db: DatabaseId,
2609 da: &str,
2610 ) -> Result<Option<Arc<catalog::AccessDefinition>>> {
2611 let qey = cache::tx::Lookup::Da(ns, db, da);
2612 match self.cache.get(&qey) {
2613 Some(val) => val.try_into_type().map(Some),
2614 None => {
2615 let key = crate::key::database::ac::new(ns, db, da);
2616 let Some(val) = self.get(&key, None).await? else {
2617 return Ok(None);
2618 };
2619 let val = Arc::new(val);
2620 let entry = cache::tx::Entry::Any(val.clone());
2621 self.cache.insert(qey, entry);
2622 Ok(Some(val))
2623 }
2624 }
2625 }
2626
2627 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2629 async fn get_db_access_grant(
2630 &self,
2631 ns: NamespaceId,
2632 db: DatabaseId,
2633 ac: &str,
2634 gr: &str,
2635 ) -> Result<Option<Arc<catalog::AccessGrant>>> {
2636 let qey = cache::tx::Lookup::Dg(ns, db, ac, gr);
2637 match self.cache.get(&qey) {
2638 Some(val) => val.try_into_type().map(Some),
2639 None => {
2640 let key = crate::key::database::access::gr::new(ns, db, ac, gr);
2641 let Some(val) = self.get(&key, None).await? else {
2642 return Ok(None);
2643 };
2644 let val = Arc::new(val);
2645 let entry = cache::tx::Entry::Any(val.clone());
2646 self.cache.insert(qey, entry);
2647 Ok(Some(val))
2648 }
2649 }
2650 }
2651
2652 async fn del_root_access(&self, ra: &str) -> Result<()> {
2653 let key = crate::key::root::ac::new(ra);
2655 self.del(&key).await?;
2656 let key = crate::key::root::access::all::new(ra);
2658 self.delp(&key).await?;
2659
2660 let list_key = cache::tx::Lookup::Ras;
2662 self.cache.remove(list_key);
2663
2664 let access_key = cache::tx::Lookup::Ra(ra);
2666 self.cache.remove(access_key);
2667 let grants_key = cache::tx::Lookup::Rgs(ra);
2668 self.cache.remove(grants_key);
2669
2670 Ok(())
2671 }
2672
2673 async fn del_ns_access(&self, ns: NamespaceId, na: &str) -> Result<()> {
2674 let key = crate::key::namespace::ac::new(ns, na);
2676 self.del(&key).await?;
2677 let key = crate::key::namespace::access::all::new(ns, na);
2679 self.delp(&key).await?;
2680
2681 let list_key = cache::tx::Lookup::Nas(ns);
2683 self.cache.remove(list_key);
2684
2685 let access_key = cache::tx::Lookup::Na(ns, na);
2687 self.cache.remove(access_key);
2688 let grants_key = cache::tx::Lookup::Ngs(ns, na);
2689 self.cache.remove(grants_key);
2690
2691 Ok(())
2692 }
2693
2694 async fn del_db_access(&self, ns: NamespaceId, db: DatabaseId, da: &str) -> Result<()> {
2695 let key = crate::key::database::ac::new(ns, db, da);
2697 self.del(&key).await?;
2698 let key = crate::key::database::access::all::new(ns, db, da);
2700 self.delp(&key).await?;
2701
2702 let list_key = cache::tx::Lookup::Das(ns, db);
2704 self.cache.remove(list_key);
2705
2706 let access_key = cache::tx::Lookup::Da(ns, db, da);
2708 self.cache.remove(access_key);
2709 let grants_key = cache::tx::Lookup::Dgs(ns, db, da);
2710 self.cache.remove(grants_key);
2711
2712 Ok(())
2713 }
2714}
2715
2716#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
2717#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
2718impl ApiProvider for Transaction {
2719 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2721 async fn all_db_apis(&self, ns: NamespaceId, db: DatabaseId) -> Result<Arc<[ApiDefinition]>> {
2722 let qey = cache::tx::Lookup::Aps(ns, db);
2723 match self.cache.get(&qey) {
2724 Some(val) => val,
2725 None => {
2726 let beg = crate::key::database::ap::prefix(ns, db)?;
2727 let end = crate::key::database::ap::suffix(ns, db)?;
2728 let val = self.getr(beg..end, None).await?;
2729 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2730 let val = cache::tx::Entry::Aps(Arc::clone(&val));
2731 self.cache.insert(qey, val.clone());
2732 val
2733 }
2734 }
2735 .try_into_aps()
2736 }
2737
2738 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2740 async fn get_db_api(
2741 &self,
2742 ns: NamespaceId,
2743 db: DatabaseId,
2744 ap: &str,
2745 ) -> Result<Option<Arc<ApiDefinition>>> {
2746 let qey = cache::tx::Lookup::Ap(ns, db, ap);
2747 match self.cache.get(&qey) {
2748 Some(val) => val.try_into_type().map(Some),
2749 None => {
2750 let key = crate::key::database::ap::new(ns, db, ap);
2751 let Some(val) = self.get(&key, None).await? else {
2752 return Ok(None);
2753 };
2754 let val = Arc::new(val);
2755 let entry = cache::tx::Entry::Any(val.clone());
2756 self.cache.insert(qey, entry);
2757 Ok(Some(val))
2758 }
2759 }
2760 }
2761
2762 async fn put_db_api(&self, ns: NamespaceId, db: DatabaseId, ap: &ApiDefinition) -> Result<()> {
2763 let name = ap.path.to_string();
2764 let key = crate::key::database::ap::new(ns, db, &name);
2765 self.set(&key, ap, None).await?;
2766
2767 let list_key = cache::tx::Lookup::Aps(ns, db);
2769 self.cache.remove(list_key);
2770
2771 let qey = cache::tx::Lookup::Ap(ns, db, &name);
2773 let entry = cache::tx::Entry::Any(Arc::new(ap.clone()));
2774 self.cache.insert(qey, entry);
2775
2776 Ok(())
2777 }
2778}
2779
2780#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
2781#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
2782impl BucketProvider for Transaction {
2783 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2785 async fn all_db_buckets(
2786 &self,
2787 ns: NamespaceId,
2788 db: DatabaseId,
2789 ) -> Result<Arc<[catalog::BucketDefinition]>> {
2790 let qey = cache::tx::Lookup::Bus(ns, db);
2791 match self.cache.get(&qey) {
2792 Some(val) => val.try_into_bus(),
2793 None => {
2794 let beg = crate::key::database::bu::prefix(ns, db)?;
2795 let end = crate::key::database::bu::suffix(ns, db)?;
2796 let val = self.getr(beg..end, None).await?;
2797 let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2798 let entry = cache::tx::Entry::Bus(val.clone());
2799 self.cache.insert(qey, entry);
2800 Ok(val)
2801 }
2802 }
2803 }
2804
2805 #[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2807 async fn get_db_bucket(
2808 &self,
2809 ns: NamespaceId,
2810 db: DatabaseId,
2811 bu: &str,
2812 ) -> Result<Option<Arc<catalog::BucketDefinition>>> {
2813 let qey = cache::tx::Lookup::Bu(ns, db, bu);
2814 match self.cache.get(&qey) {
2815 Some(val) => val.try_into_type().map(Some),
2816 None => {
2817 let key = crate::key::database::bu::new(ns, db, bu);
2818 let Some(val) = self.get(&key, None).await? else {
2819 return Ok(None);
2820 };
2821 let bucket_def = Arc::new(val);
2822 let entr = cache::tx::Entry::Any(bucket_def.clone());
2823 self.cache.insert(qey, entr);
2824 Ok(Some(bucket_def))
2825 }
2826 }
2827 }
2828}
2829
2830impl CatalogProvider for Transaction {}