// surrealdb_core/kvs/tx.rs
1use std::any::Any;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::ops::{Deref, Range};
5use std::sync::Arc;
6use std::sync::atomic::{AtomicBool, Ordering};
7
8use anyhow::Result;
9use futures::TryStreamExt;
10use futures::future::try_join_all;
11use futures::stream::Stream;
12use tokio::sync::{Mutex, MutexGuard, Notify};
13use uuid::Uuid;
14
15use super::batch::Batch;
16use super::{Key, Val, util};
17use crate::catalog::providers::{
18	ApiProvider, AuthorisationProvider, BucketProvider, CatalogProvider, DatabaseProvider,
19	NamespaceProvider, NodeProvider, RootProvider, TableProvider, UserProvider,
20};
21use crate::catalog::{
22	self, ApiDefinition, ConfigDefinition, DatabaseDefinition, DatabaseId, DefaultConfig, IndexId,
23	NamespaceDefinition, NamespaceId, Record, TableDefinition, TableId,
24};
25use crate::ctx::Context;
26use crate::dbs::node::Node;
27use crate::doc::CursorRecord;
28use crate::err::Error;
29use crate::idx::planner::ScanDirection;
30use crate::key::database::sq::Sq;
31use crate::kvs::cache::tx::TransactionCache;
32use crate::kvs::index::{BatchId, BatchIdsCleanQueue, SharedIndexKey};
33use crate::kvs::scanner::Direction;
34use crate::kvs::sequences::Sequences;
35use crate::kvs::{BoxTimeStamp, BoxTimeStampImpl, KVKey, KVValue, Transactor, cache};
36use crate::val::{RecordId, RecordIdKey, TableName};
37
/// Controls whether `getm_records` populates the transaction cache on miss.
///
/// Point lookups and graph traversals benefit from caching (records are
/// likely re-accessed within the same transaction). Large sequential scans
/// (index range scans, full-text hits) read each record once, so populating
/// the cache wastes time and evicts useful entries.
///
/// Note that both variants still *read* from the cache; they differ only in
/// whether a miss is written back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CachePolicy {
	/// Check cache on read **and** populate on miss.
	/// Use for point lookups, graph traversal, KNN, and unique-index equality.
	ReadWrite,
	/// Check cache on read but **skip** population on miss.
	/// Use for index range scans, non-unique equality scans, and full-text scans.
	ReadOnly,
}
53
/// A transaction over the underlying key-value store, combining the raw
/// [`Transactor`] with a per-transaction query cache, sequences, and a
/// changefeed buffer.
pub struct Transaction {
	/// Is this a local datastore transaction?
	local: bool,
	/// The underlying transactor
	tr: Transactor,
	/// The query cache for this store
	cache: TransactionCache,
	/// The sequences for this store
	sequences: Sequences,
	/// The changefeed buffer
	cf: crate::cf::Writer,
	/// Async event trigger, notified after a successful commit when requested
	async_event_trigger: Arc<Notify>,
	/// Do we have to trigger async events after the commit?
	trigger_async_event: AtomicBool,
	/// Per index, track the pending append batch for cleanup after rollback (cancel or failed
	/// commit).
	pending_index_batches: Mutex<HashMap<SharedIndexKey, (BatchId, BatchIdsCleanQueue)>>,
}
73
74impl Deref for Transaction {
75	type Target = Transactor;
76
77	fn deref(&self) -> &Self::Target {
78		&self.tr
79	}
80}
81
82impl Transaction {
83	/// Create a new query store
84	pub fn new(
85		local: bool,
86		sequences: Sequences,
87		async_event_trigger: Arc<Notify>,
88		tr: Transactor,
89	) -> Transaction {
90		Transaction {
91			local,
92			tr,
93			cache: TransactionCache::new(),
94			sequences,
95			cf: crate::cf::Writer::new(),
96			async_event_trigger,
97			trigger_async_event: AtomicBool::new(false),
98			pending_index_batches: Mutex::new(HashMap::new()),
99		}
100	}
101
	/// Acquire the async lock over the map of pending index append batches.
	///
	/// The map tracks, per index, the batch scheduled for cleanup should the
	/// transaction be rolled back (cancel or failed commit).
	pub(super) async fn lock_pending_index_batches<'a>(
		&'a self,
	) -> MutexGuard<'a, HashMap<SharedIndexKey, (BatchId, BatchIdsCleanQueue)>> {
		self.pending_index_batches.lock().await
	}
107
	/// Check if the transaction is local or remote.
	///
	/// Returns `true` when this transaction was opened against a local
	/// datastore.
	pub fn is_local(&self) -> bool {
		self.local
	}
112
	/// Enclose this transaction in an [`Arc`], allowing it to be shared
	/// across tasks.
	pub fn enclose(self) -> Arc<Transaction> {
		Arc::new(self)
	}
117
	/// Check if the transaction is finished.
	///
	/// If the transaction has been cancelled or committed,
	/// then this function will return [`true`], and any further
	/// calls to functions on this transaction will result
	/// in a [`crate::kvs::Error::TransactionFinished`] error.
	///
	/// This simply delegates to the underlying transactor's state.
	pub fn closed(&self) -> bool {
		self.tr.closed()
	}
127
128	/// Cancel a transaction.
129	///
130	/// This reverses all changes made within the transaction.
131	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
132	pub async fn cancel(&self) -> Result<()> {
133		// Clear any buffered changefeed entries
134		self.cf.clear();
135		// Enqueue pending index batches for deferred cleanup after rollback (cancel or failed
136		// commit).
137		self.cleanup_index_batches().await;
138		// Cancel the transaction
139		Ok(self.tr.cancel().await.map_err(Error::from)?)
140	}
141
	/// Commit a transaction.
	///
	/// This attempts to commit all changes made within the transaction.
	/// Buffered changefeed entries are persisted first; if that fails, the
	/// whole transaction is cancelled. If the commit itself fails, pending
	/// index batches are queued for cleanup. The async-event notification is
	/// only sent after a successful commit.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
	pub async fn commit(&self) -> Result<()> {
		// Store any buffered changefeed entries
		if let Err(e) = self.store_changes().await {
			// Cancel the transaction if failure
			let _ = self.cancel().await;
			// Return the error
			return Err(e);
		}
		// Commit the transaction
		if let Err(e) = self.tr.commit().await {
			// Enqueue pending index batches for deferred cleanup after commit failure.
			self.cleanup_index_batches().await;
			anyhow::bail!(e);
		}
		if self.trigger_async_event.load(Ordering::Relaxed) {
			// Notify after commit so queued events are visible to workers.
			self.async_event_trigger.notify_one();
		}
		Ok(())
	}
166
167	/// Enqueue pending index batches for deferred cleanup after rollback (cancel or failed commit).
168	async fn cleanup_index_batches(&self) {
169		let batches = {
170			let mut pending = self.lock_pending_index_batches().await;
171			std::mem::take(&mut *pending)
172		};
173		for (_, (batch_id, clean_queue)) in batches {
174			// Enqueue batch ids for cleaning
175			clean_queue.lock().await.push(batch_id);
176		}
177	}
178
179	/// Check if a key exists in the datastore.
180	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
181	pub async fn exists<K>(&self, key: &K, version: Option<u64>) -> Result<bool>
182	where
183		K: KVKey + Debug,
184	{
185		let key = key.encode_key()?;
186		Ok(self.tr.exists(key, version).await.map_err(Error::from)?)
187	}
188
189	/// Fetch a key from the datastore.
190	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
191	pub async fn get<K>(&self, key: &K, version: Option<u64>) -> Result<Option<K::ValueType>>
192	where
193		K: KVKey + Debug,
194	{
195		let key = key.encode_key()?;
196		let val = self.tr.get(key, version).await.map_err(Error::from)?;
197		val.map(K::ValueType::kv_decode_value).transpose()
198	}
199
200	/// Retrieve a batch set of keys from the datastore.
201	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
202	pub async fn getm<K>(
203		&self,
204		keys: Vec<K>,
205		version: Option<u64>,
206	) -> Result<Vec<Option<K::ValueType>>>
207	where
208		K: KVKey + Debug,
209	{
210		let keys = keys.iter().map(|k| k.encode_key()).collect::<Result<Vec<_>>>()?;
211		self.tr
212			.getm(keys, version)
213			.await
214			.map_err(Error::from)?
215			.into_iter()
216			.map(|v| match v {
217				Some(v) => K::ValueType::kv_decode_value(v).map(Some),
218				None => Ok(None),
219			})
220			.collect()
221	}
222
223	/// Retrieve a specific prefix of keys from the datastore.
224	///
225	/// This function fetches key-value pairs from the underlying datastore in
226	/// grouped batches.
227	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
228	pub async fn getp<K>(&self, key: &K) -> Result<Vec<(Key, K::ValueType)>>
229	where
230		K: KVKey + Debug,
231	{
232		let key = key.encode_key()?;
233		self.tr
234			.getp(key)
235			.await
236			.map_err(Error::from)?
237			.into_iter()
238			.map(|(k, v)| Ok((k, K::ValueType::kv_decode_value(v)?)))
239			.collect()
240	}
241
242	/// Retrieve a specific range of keys from the datastore.
243	///
244	/// This function fetches key-value pairs from the underlying datastore in
245	/// grouped batches.
246	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
247	pub async fn getr<K>(
248		&self,
249		rng: Range<K>,
250		version: Option<u64>,
251	) -> Result<Vec<(Key, K::ValueType)>>
252	where
253		K: KVKey + Debug,
254	{
255		let beg = rng.start.encode_key()?;
256		let end = rng.end.encode_key()?;
257		self.tr
258			.getr(beg..end, version)
259			.await
260			.map_err(Error::from)?
261			.into_iter()
262			.map(|(k, v)| Ok((k, K::ValueType::kv_decode_value(v)?)))
263			.collect()
264	}
265
	/// Fetch many records by ID in a single batch, with cache awareness.
	///
	/// For each record ID, checks the transaction cache first. Cache misses are
	/// fetched in one batch via the store's multi-get, then results are merged
	/// and returned in the same order as `rids`.
	///
	/// When `cache_policy` is [`CachePolicy::ReadWrite`], newly fetched records
	/// are inserted into the cache. When [`CachePolicy::ReadOnly`], fetched
	/// records are returned but **not** cached, avoiding eviction churn during
	/// large sequential scans.
	///
	/// Records that do not exist in the datastore are returned as default
	/// (empty) records rather than being omitted, so the output length always
	/// equals `rids.len()`.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	pub(crate) async fn getm_records(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		rids: &[RecordId],
		version: Option<u64>,
		cache_policy: CachePolicy,
	) -> Result<Vec<Arc<Record>>> {
		if rids.is_empty() {
			return Ok(Vec::new());
		}

		// Cache is not versioned — bypass it entirely for historical reads
		if version.is_some() {
			let keys: Vec<crate::key::record::RecordKey<'_>> = rids
				.iter()
				.map(|rid| crate::key::record::new(ns, db, &rid.table, &rid.key))
				.collect();

			let values = self.getm(keys, version).await?;

			return values
				.into_iter()
				.zip(rids)
				.map(|(opt_val, rid)| {
					Ok(match opt_val {
						Some(mut record) => {
							// Attach the record id before freezing the record
							record.data.def(rid.clone());
							record.into_read_only()
						}
						// Missing records surface as empty default records
						None => Arc::new(Default::default()),
					})
				})
				.collect::<Result<Vec<_>, _>>();
		}

		// Phase 1: check cache, collect hits and indices of misses
		let mut out: Vec<Option<Arc<Record>>> = vec![None; rids.len()];
		let mut uncached_rids: Vec<(usize, &RecordId)> = Vec::new();

		for (i, rid) in rids.iter().enumerate() {
			let qey = cache::tx::Lookup::Record(ns, db, rid.table.as_str(), &rid.key);
			if let Some(entry) = self.cache.get(&qey) {
				out[i] = Some(entry.try_into_record()?);
			} else {
				uncached_rids.push((i, rid));
			}
		}

		// Phase 2: batch fetch uncached keys
		if uncached_rids.is_empty() {
			// Every id was a cache hit, so all slots are filled; the error
			// below is an internal invariant check, not an expected path.
			return out
				.into_iter()
				.map(|o| {
					o.ok_or_else(|| Error::Internal("missing record in multi-get batch".into()))
				})
				.collect::<Result<Vec<_>, _>>()
				.map_err(Into::into);
		}

		let keys: Vec<crate::key::record::RecordKey<'_>> = uncached_rids
			.iter()
			.map(|(_, rid)| crate::key::record::new(ns, db, &rid.table, &rid.key))
			.collect();

		let values = self.getm(keys, None).await?;

		// Phase 3: post-process fetched records and merge into output.
		// Only populate the cache when the caller requests ReadWrite; ReadOnly
		// avoids eviction churn during large sequential scans.
		for (j, opt_val) in values.into_iter().enumerate() {
			// `values` is parallel to `uncached_rids`, so `j` maps back to
			// the original output slot `i`.
			let (i, rid) = uncached_rids[j];
			let record = match opt_val {
				Some(mut record) => {
					record.data.def(rid.clone());
					let record = record.into_read_only();
					if matches!(cache_policy, CachePolicy::ReadWrite) {
						let qey = cache::tx::Lookup::Record(ns, db, rid.table.as_str(), &rid.key);
						self.cache.insert(qey, cache::tx::Entry::Val(record.clone()));
					}
					record
				}
				None => Arc::new(Default::default()),
			};
			out[i] = Some(record);
		}

		out.into_iter()
			.map(|o| o.ok_or_else(|| Error::Internal("missing record in multi-get batch".into())))
			.collect::<Result<Vec<_>, _>>()
			.map_err(Into::into)
	}
369
370	/// Delete a key from the datastore.
371	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
372	pub async fn del<K>(&self, key: &K) -> Result<()>
373	where
374		K: KVKey + Debug,
375	{
376		let key = key.encode_key()?;
377		Ok(self.tr.del(key).await.map_err(Error::from)?)
378	}
379
380	/// Delete a key from the datastore if the current value matches a
381	/// condition.
382	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
383	pub async fn delc<K>(&self, key: &K, chk: Option<&K::ValueType>) -> Result<()>
384	where
385		K: KVKey + Debug,
386	{
387		let key = key.encode_key()?;
388		let chk = chk.map(|v| v.kv_encode_value()).transpose()?;
389		Ok(self.tr.delc(key, chk).await.map_err(Error::from)?)
390	}
391
392	/// Delete a range of keys from the datastore.
393	///
394	/// This function deletes entries from the underlying datastore in grouped
395	/// batches.
396	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
397	pub async fn delr<K>(&self, rng: Range<K>) -> Result<()>
398	where
399		K: KVKey + Debug,
400	{
401		let beg = rng.start.encode_key()?;
402		let end = rng.end.encode_key()?;
403		Ok(self.tr.delr(beg..end).await.map_err(Error::from)?)
404	}
405
406	/// Delete a prefix of keys from the datastore.
407	///
408	/// This function deletes entries from the underlying datastore in grouped
409	/// batches.
410	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
411	pub async fn delp<K>(&self, key: &K) -> Result<()>
412	where
413		K: KVKey + Debug,
414	{
415		let key = key.encode_key()?;
416		Ok(self.tr.delp(key).await.map_err(Error::from)?)
417	}
418
419	/// Delete all versions of a key from the datastore.
420	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
421	pub async fn clr<K>(&self, key: &K) -> Result<()>
422	where
423		K: KVKey + Debug,
424	{
425		let key = key.encode_key()?;
426		Ok(self.tr.clr(key).await.map_err(Error::from)?)
427	}
428
429	/// Delete all versions of a key from the datastore if the current value
430	/// matches a condition.
431	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
432	pub async fn clrc<K>(&self, key: &K, chk: Option<&K::ValueType>) -> Result<()>
433	where
434		K: KVKey + Debug,
435	{
436		let key = key.encode_key()?;
437		let chk = chk.map(|v| v.kv_encode_value()).transpose()?;
438		Ok(self.tr.clrc(key, chk).await.map_err(Error::from)?)
439	}
440
441	/// Delete all versions of a range of keys from the datastore.
442	///
443	/// This function deletes entries from the underlying datastore in grouped
444	/// batches.
445	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
446	pub async fn clrr<K>(&self, rng: Range<K>) -> Result<()>
447	where
448		K: KVKey + Debug,
449	{
450		let beg = rng.start.encode_key()?;
451		let end = rng.end.encode_key()?;
452		Ok(self.tr.clrr(beg..end).await.map_err(Error::from)?)
453	}
454
455	/// Delete all versions of a prefix of keys from the datastore.
456	///
457	/// This function deletes entries from the underlying datastore in grouped
458	/// batches.
459	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
460	pub async fn clrp<K>(&self, key: &K) -> Result<()>
461	where
462		K: KVKey + Debug,
463	{
464		let key = key.encode_key()?;
465		Ok(self.tr.clrp(key).await.map_err(Error::from)?)
466	}
467
468	/// Insert or update a key in the datastore.
469	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
470	pub async fn set<K>(&self, key: &K, val: &K::ValueType, version: Option<u64>) -> Result<()>
471	where
472		K: KVKey + Debug,
473	{
474		let key = key.encode_key()?;
475		let val = val.kv_encode_value()?;
476		Ok(self.tr.set(key, val, version).await.map_err(Error::from)?)
477	}
478
479	/// Insert a key if it doesn't exist in the datastore.
480	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
481	pub async fn put<K>(&self, key: &K, val: &K::ValueType, version: Option<u64>) -> Result<()>
482	where
483		K: KVKey + Debug,
484	{
485		let key = key.encode_key()?;
486		let val = val.kv_encode_value()?;
487		Ok(self.tr.put(key, val, version).await.map_err(Error::from)?)
488	}
489
490	/// Update a key in the datastore if the current value matches a condition.
491	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
492	pub async fn putc<K>(
493		&self,
494		key: &K,
495		val: &K::ValueType,
496		chk: Option<&K::ValueType>,
497	) -> Result<()>
498	where
499		K: KVKey + Debug,
500	{
501		let key = key.encode_key()?;
502		let val = val.kv_encode_value()?;
503		let chk = chk.map(|v| v.kv_encode_value()).transpose()?;
504		Ok(self.tr.putc(key, val, chk).await.map_err(Error::from)?)
505	}
506
507	/// Insert or replace a key in the datastore.
508	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
509	pub async fn replace<K>(&self, key: &K, val: &K::ValueType) -> Result<()>
510	where
511		K: KVKey + Debug,
512	{
513		let key = key.encode_key()?;
514		let val = val.kv_encode_value()?;
515		Ok(self.tr.replace(key, val).await.map_err(Error::from)?)
516	}
517
518	// --------------------------------------------------
519	// Range functions
520	// --------------------------------------------------
521
522	/// Retrieve a specific range of keys from the datastore.
523	///
524	/// This function fetches the full range of keys, in a single request to the
525	/// underlying datastore.
526	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
527	pub async fn keys<K>(
528		&self,
529		rng: Range<K>,
530		limit: u32,
531		skip: u32,
532		version: Option<u64>,
533	) -> Result<Vec<Key>>
534	where
535		K: KVKey + Debug,
536	{
537		let beg = rng.start.encode_key()?;
538		let end = rng.end.encode_key()?;
539		let limit = limit.into();
540		Ok(self.tr.keys(beg..end, limit, skip, version).await.map_err(Error::from)?)
541	}
542
543	/// Retrieve a specific range of keys from the datastore in reverse order.
544	///
545	/// This function fetches the full range of keys, in a single request to the
546	/// underlying datastore.
547	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
548	pub async fn keysr<K>(
549		&self,
550		rng: Range<K>,
551		limit: u32,
552		skip: u32,
553		version: Option<u64>,
554	) -> Result<Vec<Key>>
555	where
556		K: KVKey + Debug,
557	{
558		let beg = rng.start.encode_key()?;
559		let end = rng.end.encode_key()?;
560		let limit = limit.into();
561		Ok(self.tr.keysr(beg..end, limit, skip, version).await.map_err(Error::from)?)
562	}
563
564	/// Retrieve a specific range of keys from the datastore.
565	///
566	/// This function fetches the full range of key-value pairs, in a single
567	/// request to the underlying datastore.
568	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
569	pub async fn scan<K>(
570		&self,
571		rng: Range<K>,
572		limit: u32,
573		skip: u32,
574		version: Option<u64>,
575	) -> Result<Vec<(Key, Val)>>
576	where
577		K: KVKey + Debug,
578	{
579		let beg = rng.start.encode_key()?;
580		let end = rng.end.encode_key()?;
581		let limit = limit.into();
582		Ok(self.tr.scan(beg..end, limit, skip, version).await.map_err(Error::from)?)
583	}
584
	/// Retrieve a specific range of key-value pairs from the datastore in
	/// reverse order.
	///
	/// This function fetches the full range of key-value pairs, in a single
	/// request to the underlying datastore.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
	pub async fn scanr<K>(
		&self,
		rng: Range<K>,
		limit: u32,
		skip: u32,
		version: Option<u64>,
	) -> Result<Vec<(Key, Val)>>
	where
		K: KVKey + Debug,
	{
		let beg = rng.start.encode_key()?;
		let end = rng.end.encode_key()?;
		let limit = limit.into();
		Ok(self.tr.scanr(beg..end, limit, skip, version).await.map_err(Error::from)?)
	}
601
602	/// Count the total number of keys within a range in the datastore.
603	///
604	/// This function fetches the total count, in batches, with multiple
605	/// requests to the underlying datastore.
606	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
607	pub async fn count<K>(&self, rng: Range<K>, version: Option<u64>) -> Result<usize>
608	where
609		K: KVKey + Debug,
610	{
611		let beg = rng.start.encode_key()?;
612		let end = rng.end.encode_key()?;
613		Ok(self.tr.count(beg..end, version).await.map_err(Error::from)?)
614	}
615
616	// --------------------------------------------------
617	// Batch functions
618	// --------------------------------------------------
619
620	/// Retrieve a batched scan over a specific range of keys in the datastore.
621	///
622	/// This function fetches the keys in batches, with multiple requests to the
623	/// underlying datastore.
624	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
625	pub async fn batch_keys<K>(
626		&self,
627		rng: Range<K>,
628		batch: u32,
629		version: Option<u64>,
630	) -> Result<Batch<Key>>
631	where
632		K: KVKey + Debug,
633	{
634		let beg = rng.start.encode_key()?;
635		let end = rng.end.encode_key()?;
636		Ok(self.tr.batch_keys(beg..end, batch, version).await.map_err(Error::from)?)
637	}
638
639	/// Retrieve a batched scan over a specific range of keys in the datastore.
640	///
641	/// This function fetches the key-value pairs in batches, with multiple
642	/// requests to the underlying datastore.
643	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
644	pub async fn batch_keys_vals<K>(
645		&self,
646		rng: Range<K>,
647		batch: u32,
648		version: Option<u64>,
649	) -> Result<Batch<(Key, Val)>>
650	where
651		K: KVKey + Debug,
652	{
653		let beg = rng.start.encode_key()?;
654		let end = rng.end.encode_key()?;
655		Ok(self.tr.batch_keys_vals(beg..end, batch, version).await.map_err(Error::from)?)
656	}
657
658	// --------------------------------------------------
659	// Stream functions
660	// --------------------------------------------------
661
662	/// Retrieve a stream of key batches over a specific range in the datastore.
663	///
664	/// This function returns a stream that yields batches of keys. The scanner:
665	/// - Fetches an initial batch of up to 100 items
666	/// - Fetches subsequent batches of up to 16 MiB (local) or 4 MiB (remote)
667	/// - Prefetches the next batch while the current batch is being processed
668	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
669	pub fn stream_keys(
670		&self,
671		rng: Range<Key>,
672		version: Option<u64>,
673		limit: Option<usize>,
674		skip: u32,
675		dir: ScanDirection,
676	) -> impl Stream<Item = Result<Vec<Key>>> + '_ {
677		self.tr
678			.stream_keys(
679				rng,
680				version,
681				limit,
682				skip,
683				match dir {
684					ScanDirection::Forward => Direction::Forward,
685					ScanDirection::Backward => Direction::Backward,
686				},
687			)
688			.map_err(Error::from)
689			.map_err(Into::into)
690	}
691
692	/// Retrieve a stream of key-value batches over a specific range in the datastore.
693	///
694	/// This function returns a stream that yields batches of key-value pairs. The scanner:
695	/// - Fetches an initial batch of up to 100 items (or 500 when `prefetch` is enabled)
696	/// - Fetches subsequent batches of up to 16 MiB (local) or 4 MiB (remote)
697	/// - When `prefetch` is true, prefetches the next batch while the current batch is being
698	///   processed, and uses a larger initial batch size (500 items)
699	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
700	pub fn stream_keys_vals(
701		&self,
702		rng: Range<Key>,
703		version: Option<u64>,
704		limit: Option<usize>,
705		skip: u32,
706		dir: ScanDirection,
707		prefetch: bool,
708	) -> impl Stream<Item = Result<Vec<(Key, Val)>>> + '_ {
709		self.tr
710			.stream_keys_vals(
711				rng,
712				version,
713				limit,
714				skip,
715				match dir {
716					ScanDirection::Forward => Direction::Forward,
717					ScanDirection::Backward => Direction::Backward,
718				},
719				prefetch,
720			)
721			.map_err(Error::from)
722			.map_err(Into::into)
723	}
724
725	// --------------------------------------------------
726	// Savepoint functions
727	// --------------------------------------------------
728
729	/// Set a new save point on the transaction.
730	pub async fn new_save_point(&self) -> Result<()> {
731		Ok(self.inner.new_save_point().await.map_err(Error::from)?)
732	}
733
734	/// Release the last save point.
735	pub async fn release_last_save_point(&self) -> Result<()> {
736		Ok(self.inner.release_last_save_point().await.map_err(Error::from)?)
737	}
738
739	/// Rollback to the last save point.
740	pub async fn rollback_to_save_point(&self) -> Result<()> {
741		Ok(self.inner.rollback_to_save_point().await.map_err(Error::from)?)
742	}
743
744	// --------------------------------------------------
745	// Timestamp functions
746	// --------------------------------------------------
747
	/// Get the current monotonic timestamp from the underlying transactor.
	pub async fn timestamp(&self) -> Result<BoxTimeStamp> {
		Ok(self.tr.timestamp().await.map_err(Error::from)?)
	}
752
	/// Returns the implementation of timestamp that this transaction uses.
	///
	/// This simply delegates to the underlying transactor.
	pub fn timestamp_impl(&self) -> BoxTimeStampImpl {
		self.tr.timestamp_impl()
	}
757
758	// --------------------------------------------------
759	// Changefeed functions
760	// --------------------------------------------------
761
	/// Records the table (re)definition in the changefeed buffer, if enabled.
	pub(crate) fn changefeed_buffer_table_change(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		tb: &TableName,
		dt: &TableDefinition,
	) {
		self.cf.changefeed_buffer_table_change(ns, db, tb, dt)
	}
772
	/// Records a record-level change in the changefeed buffer, if enabled.
	///
	/// To actually persist the record changes into the underlying kvs,
	/// you must call the `store_changes` function and then commit the
	/// transaction.
	#[expect(clippy::too_many_arguments)]
	pub(crate) fn changefeed_buffer_record_change(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		tb: &TableName,
		id: &RecordId,
		previous: CursorRecord,
		current: CursorRecord,
		store_difference: bool,
	) {
		self.cf.changefeed_buffer_record_change(
			ns,
			db,
			tb,
			id.clone(),
			previous,
			current,
			store_difference,
		)
	}
798
	/// Completes the changefeed recording for this transaction.
	///
	/// This function writes all buffered changefeed entries to the datastore
	/// with the current transaction timestamp. Every change must be recorded by
	/// calling this struct's `changefeed_buffer_record_change` function beforehand.
	/// If there were no preceding calls for this transaction, this function
	/// will do nothing.
	///
	/// This function should be called only after all the changes have been made to
	/// the transaction. Otherwise, changes are missed in the change feed.
	///
	/// This function should be called immediately before calling the commit function
	/// to ensure the timestamp reflects the actual commit time.
	pub(crate) async fn store_changes(&self) -> Result<()> {
		// Get the changes from the changefeed
		let changes = self.cf.changes()?;
		// For zero-length changes, return early
		if changes.is_empty() {
			return Ok(());
		}
		// Get the current transaction timestamp.
		// The array length is inferred (`[0u8; _]`) from the slice size the
		// timestamp `encode` implementation expects.
		let buf = &mut [0u8; _];
		let ts = self.timestamp().await?.encode(buf);
		// Collect all changefeed write operations as futures
		let futures = changes.into_iter().map(|(ns, db, tb, value)| async move {
			// Create the changefeed key with the current timestamp
			let key = crate::key::change::new(ns, db, ts, &tb).encode_key()?;
			// Write the changefeed entry using the raw transactor API
			self.tr.set(key, value, None).await.map_err(Error::from)?;
			// Everything succeeded
			Ok::<(), anyhow::Error>(())
		});
		// Execute all write operations concurrently
		try_join_all(futures).await?;
		// All good
		Ok(())
	}
837
838	// --------------------------------------------------
839	// Cache functions
840	// --------------------------------------------------
841
	/// Insert a record into the transaction cache under its
	/// namespace/database/table/id lookup key.
	#[inline]
	fn set_record_cache(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		tb: &TableName,
		id: &RecordIdKey,
		record: Arc<Record>,
	) {
		// Set the value in the cache
		let qey = cache::tx::Lookup::Record(ns, db, tb, id);
		self.cache.insert(qey, cache::tx::Entry::Val(record));
	}
855
	/// Clears all keys from the transaction cache.
	///
	/// Subsequent reads will fetch from the datastore again.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	pub fn clear_cache(&self) {
		self.cache.clear()
	}
861
862	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip_all)]
863	pub async fn compact<K>(&self, prefix_key: Option<K>) -> Result<()>
864	where
865		K: KVKey + Debug,
866	{
867		let rng = match prefix_key {
868			Some(prefix_key) => Some(util::to_prefix_range(prefix_key)?),
869			None => None,
870		};
871		self.tr.inner.compact(rng).await
872	}
873
	/// Mark this transaction to wake the async event processor after commit.
	///
	/// The notification itself is sent at the end of a successful `commit`.
	pub(crate) fn trigger_async_event(&self) {
		self.trigger_async_event.store(true, Ordering::Relaxed);
	}
878}
879
880#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
881#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
882impl NodeProvider for Transaction {
883	/// Retrieve all nodes belonging to this cluster.
884	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
885	async fn all_nodes(&self) -> Result<Arc<[Node]>> {
886		let qey = cache::tx::Lookup::Nds;
887		match self.cache.get(&qey) {
888			Some(val) => val.try_into_nds(),
889			None => {
890				let beg = crate::key::root::nd::prefix();
891				let end = crate::key::root::nd::suffix();
892				let val = self.getr(beg..end, None).await?;
893				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
894				let entry = cache::tx::Entry::Nds(val.clone());
895				self.cache.insert(qey, entry);
896				Ok(val)
897			}
898		}
899	}
900
901	/// Retrieve a specific node in the cluster.
902	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
903	async fn get_node(&self, id: Uuid) -> Result<Arc<Node>> {
904		let qey = cache::tx::Lookup::Nd(id);
905		match self.cache.get(&qey) {
906			Some(val) => val,
907			None => {
908				let key = crate::key::root::nd::new(id);
909				let val = self.get(&key, None).await?.ok_or_else(|| Error::NdNotFound {
910					uuid: id.to_string(),
911				})?;
912				let val = cache::tx::Entry::Any(Arc::new(val));
913				self.cache.insert(qey, val.clone());
914				val
915			}
916		}
917		.try_into_type()
918	}
919}
920
921#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
922#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
923impl RootProvider for Transaction {
924	async fn get_default_config(&self) -> Result<Option<Arc<DefaultConfig>>> {
925		let qey = cache::tx::Lookup::Rcg("default");
926		match self.cache.get(&qey) {
927			Some(val) => val,
928			None => {
929				let key = crate::key::root::root_config::new("default");
930				let Some(val) = self.get(&key, None).await? else {
931					return Ok(None);
932				};
933				let ConfigDefinition::Default(val) = val else {
934					fail!("Expected a default config but found {val:?} instead");
935				};
936				let val = cache::tx::Entry::Any(Arc::new(val));
937				self.cache.insert(qey, val.clone());
938				val
939			}
940		}
941		.try_into_type()
942		.map(Option::Some)
943	}
944
945	/// Retrieve a specific config definition from the root.
946	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
947	async fn get_root_config(&self, cg: &str) -> Result<Option<Arc<ConfigDefinition>>> {
948		let qey = cache::tx::Lookup::Rcg(cg);
949		match self.cache.get(&qey) {
950			Some(val) => val.try_into_type().map(Option::Some),
951			None => {
952				let key = crate::key::root::root_config::new(cg);
953				if let Some(val) = self.get(&key, None).await? {
954					let val = Arc::new(val);
955					let entr = cache::tx::Entry::Any(val.clone());
956					self.cache.insert(qey, entr);
957					Ok(Some(val))
958				} else {
959					Ok(None)
960				}
961			}
962		}
963	}
964}
965
966#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
967#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
968impl NamespaceProvider for Transaction {
969	/// Retrieve all namespace definitions in a datastore.
970	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
971	async fn all_ns(&self) -> Result<Arc<[NamespaceDefinition]>> {
972		let qey = cache::tx::Lookup::Nss;
973		match self.cache.get(&qey) {
974			Some(val) => val.try_into_nss(),
975			None => {
976				let beg = crate::key::root::ns::prefix();
977				let end = crate::key::root::ns::suffix();
978				let val = self.getr(beg..end, None).await?;
979				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
980				let entry = cache::tx::Entry::Nss(val.clone());
981				self.cache.insert(qey, entry);
982				Ok(val)
983			}
984		}
985	}
986
987	async fn get_ns_by_name(&self, ns: &str) -> Result<Option<Arc<NamespaceDefinition>>> {
988		let qey = cache::tx::Lookup::NsByName(ns);
989		match self.cache.get(&qey) {
990			Some(val) => val.try_into_type().map(Some),
991			None => {
992				let key = crate::key::root::ns::new(ns);
993				let Some(ns) = self.get(&key, None).await? else {
994					return Ok(None);
995				};
996
997				let ns = Arc::new(ns);
998				let entr = cache::tx::Entry::Any(ns.clone());
999				self.cache.insert(qey, entr);
1000				Ok(Some(ns))
1001			}
1002		}
1003	}
1004
1005	async fn expect_ns_by_name(&self, ns: &str) -> Result<Arc<NamespaceDefinition>> {
1006		match self.get_ns_by_name(ns).await? {
1007			Some(val) => Ok(val),
1008			None => anyhow::bail!(Error::NsNotFound {
1009				name: ns.to_owned(),
1010			}),
1011		}
1012	}
1013
1014	async fn put_ns(&self, ns: NamespaceDefinition) -> Result<Arc<NamespaceDefinition>> {
1015		let key = crate::key::root::ns::new(&ns.name);
1016		self.set(&key, &ns, None).await?;
1017
1018		// Invalidate the cached list of all namespaces
1019		let list_key = cache::tx::Lookup::Nss;
1020		self.cache.remove(list_key);
1021
1022		// Populate cache
1023		let cached_ns = Arc::new(ns.clone());
1024
1025		let entry = cache::tx::Entry::Any(Arc::clone(&cached_ns) as Arc<dyn Any + Send + Sync>);
1026		let qey = cache::tx::Lookup::NsByName(&ns.name);
1027		self.cache.insert(qey, entry);
1028
1029		Ok(cached_ns)
1030	}
1031
1032	async fn get_next_ns_id(&self, ctx: Option<&Context>) -> Result<NamespaceId> {
1033		self.sequences.next_namespace_id(ctx).await
1034	}
1035}
1036
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
impl DatabaseProvider for Transaction {
	/// Retrieve all database definitions for a specific namespace.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db(&self, ns: NamespaceId) -> Result<Arc<[DatabaseDefinition]>> {
		let qey = cache::tx::Lookup::Dbs(ns);
		match self.cache.get(&qey) {
			// Cache hit: downcast the stored list of definitions.
			Some(val) => val.try_into_dbs(),
			// Cache miss: scan the key range, then cache the decoded list.
			None => {
				let beg = crate::key::namespace::db::prefix(ns)?;
				let end = crate::key::namespace::db::suffix(ns)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				let entry = cache::tx::Entry::Dbs(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve a specific database definition.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_by_name(&self, ns: &str, db: &str) -> Result<Option<Arc<DatabaseDefinition>>> {
		let qey = cache::tx::Lookup::DbByName(ns, db);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_type().map(Some),
			None => {
				// Resolve the namespace first to obtain its numeric id.
				let Some(ns) = self.get_ns_by_name(ns).await? else {
					return Ok(None);
				};

				let key = crate::key::namespace::db::new(ns.namespace_id, db);
				let Some(db_def) = self.get(&key, None).await? else {
					return Ok(None);
				};

				let val = Arc::new(db_def);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Get or add a database with a default configuration, only if we are in
	/// dynamic mode.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self, ctx))]
	async fn get_or_add_db_upwards(
		&self,
		ctx: Option<&Context>,
		ns: &str,
		db: &str,
		upwards: bool,
	) -> Result<Arc<DatabaseDefinition>> {
		let qey = cache::tx::Lookup::DbByName(ns, db);
		// NOTE(review): this outer cache check duplicates the lookup performed
		// inside get_db_by_name below (same DbByName key); the Some arm looks
		// redundant — confirm before simplifying.
		match self.cache.get(&qey) {
			// The entry is in the cache
			Some(val) => {
				let t = val.try_into_type()?;
				Ok(t)
			}
			// The entry is not in the cache
			None => {
				let db_def = self.get_db_by_name(ns, db).await?;
				if let Some(db_def) = db_def {
					return Ok(db_def);
				}

				// Resolve (or, when `upwards` is set, create) the namespace.
				let ns_def = if upwards {
					self.get_or_add_ns(ctx, ns).await?
				} else {
					match self.get_ns_by_name(ns).await? {
						Some(ns_def) => ns_def,
						None => {
							return Err(Error::NsNotFound {
								name: ns.to_owned(),
							}
							.into());
						}
					}
				};

				// Create a new database definition with a freshly allocated id.
				let db_def = DatabaseDefinition {
					namespace_id: ns_def.namespace_id,
					database_id: self.get_next_db_id(ctx, ns_def.namespace_id).await?,
					name: db.to_string(),
					comment: None,
					changefeed: None,
					strict: false,
				};

				return self.put_db(&ns_def.name, db_def).await;
			}
		}
	}

	/// Allocate the next available database id for the given namespace.
	async fn get_next_db_id(&self, ctx: Option<&Context>, ns: NamespaceId) -> Result<DatabaseId> {
		self.sequences.next_database_id(ctx, ns).await
	}

	/// Persist a database definition and refresh the per-transaction caches.
	async fn put_db(&self, ns: &str, db: DatabaseDefinition) -> Result<Arc<DatabaseDefinition>> {
		let key = crate::key::namespace::db::new(db.namespace_id, &db.name);
		self.set(&key, &db, None).await?;

		// Invalidate the cached list of all databases for this namespace
		let list_key = cache::tx::Lookup::Dbs(db.namespace_id);
		self.cache.remove(list_key);

		// Populate cache
		let cached_db = Arc::new(db.clone());

		let entry = cache::tx::Entry::Any(Arc::clone(&cached_db) as Arc<dyn Any + Send + Sync>);
		let qey = cache::tx::Lookup::DbByName(ns, &db.name);
		self.cache.insert(qey, entry);

		Ok(cached_db)
	}

	/// Delete a database definition and everything stored under its root key.
	///
	/// When `expunge` is true the keys are cleared via `clr`/`clrp`; otherwise
	/// the normal `del`/`delp` path is used. Returns `Ok(None)` when the
	/// database does not exist.
	async fn del_db(&self, ns: &str, db: &str, expunge: bool) -> Result<Option<()>> {
		let Some(db) = self.get_db_by_name(ns, db).await? else {
			return Ok(None);
		};
		let key = crate::key::namespace::db::new(db.namespace_id, &db.name);
		let database_root = crate::key::database::all::new(db.namespace_id, db.database_id);
		if expunge {
			self.clr(&key).await?;
			self.clrp(&database_root).await?;
		} else {
			self.del(&key).await?;
			self.delp(&database_root).await?
		};

		// Invalidate the cached list of all databases for this namespace
		let list_key = cache::tx::Lookup::Dbs(db.namespace_id);
		self.cache.remove(list_key);

		// Invalidate the cached database entry
		let db_key = cache::tx::Lookup::DbByName(ns, &db.name);
		self.cache.remove(db_key);

		Ok(Some(()))
	}

	/// Retrieve all analyzer definitions for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_analyzers(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<Arc<[catalog::AnalyzerDefinition]>> {
		let qey = cache::tx::Lookup::Azs(ns, db);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_azs(),
			None => {
				let beg = crate::key::database::az::prefix(ns, db)?;
				let end = crate::key::database::az::suffix(ns, db)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				let entry = cache::tx::Entry::Azs(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all sequences definitions for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_sequences(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<Arc<[catalog::SequenceDefinition]>> {
		let qey = cache::tx::Lookup::Sqs(ns, db);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_sqs(),
			None => {
				let beg = crate::key::database::sq::prefix(ns, db)?;
				let end = crate::key::database::sq::suffix(ns, db)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				let entry = cache::tx::Entry::Sqs(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all function definitions for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_functions(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<Arc<[catalog::FunctionDefinition]>> {
		let qey = cache::tx::Lookup::Fcs(ns, db);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_fcs(),
			None => {
				let beg = crate::key::database::fc::prefix(ns, db)?;
				let end = crate::key::database::fc::suffix(ns, db)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				let entry = cache::tx::Entry::Fcs(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all module definitions for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_modules(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<Arc<[catalog::ModuleDefinition]>> {
		let qey = cache::tx::Lookup::Mds(ns, db);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_mds(),
			None => {
				let beg = crate::key::database::md::prefix(ns, db)?;
				let end = crate::key::database::md::suffix(ns, db)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				let entry = cache::tx::Entry::Mds(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all param definitions for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_params(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<Arc<[catalog::ParamDefinition]>> {
		let qey = cache::tx::Lookup::Pas(ns, db);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_pas(),
			None => {
				let beg = crate::key::database::pa::prefix(ns, db)?;
				let end = crate::key::database::pa::suffix(ns, db)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				let entry = cache::tx::Entry::Pas(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all model definitions for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_models(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<Arc<[catalog::MlModelDefinition]>> {
		let qey = cache::tx::Lookup::Mls(ns, db);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_mls(),
			None => {
				let beg = crate::key::database::ml::prefix(ns, db)?;
				let end = crate::key::database::ml::suffix(ns, db)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				let entry = cache::tx::Entry::Mls(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all config definitions for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_configs(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<Arc<[ConfigDefinition]>> {
		let qey = cache::tx::Lookup::Cgs(ns, db);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_cgs(),
			None => {
				let beg = crate::key::database::cg::prefix(ns, db)?;
				let end = crate::key::database::cg::suffix(ns, db)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				let entry = cache::tx::Entry::Cgs(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve a specific model definition from a database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_model(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		ml: &str,
		vn: &str,
	) -> Result<Option<Arc<catalog::MlModelDefinition>>> {
		let qey = cache::tx::Lookup::Ml(ns, db, ml, vn);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_type().map(Some),
			None => {
				let key = crate::key::database::ml::new(ns, db, ml, vn);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Retrieve a specific analyzer definition from a database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_analyzer(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		az: &str,
	) -> Result<Arc<catalog::AnalyzerDefinition>> {
		let qey = cache::tx::Lookup::Az(ns, db, az);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_type(),
			None => {
				let key = crate::key::database::az::new(ns, db, az);
				let val = self.get(&key, None).await?.ok_or_else(|| Error::AzNotFound {
					name: az.to_owned(),
				})?;
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve a specific sequence definition from a database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_sequence(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		sq: &str,
	) -> Result<Arc<catalog::SequenceDefinition>> {
		let qey = cache::tx::Lookup::Sq(ns, db, sq);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_type(),
			None => {
				let key = Sq::new(ns, db, sq);
				let val = self.get(&key, None).await?.ok_or_else(|| Error::SeqNotFound {
					name: sq.to_owned(),
				})?;
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve a specific function definition from a database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_function(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		fc: &str,
	) -> Result<Arc<catalog::FunctionDefinition>> {
		let qey = cache::tx::Lookup::Fc(ns, db, fc);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_type(),
			None => {
				let key = crate::key::database::fc::new(ns, db, fc);
				let val = self.get(&key, None).await?.ok_or_else(|| Error::FcNotFound {
					name: fc.to_owned(),
				})?;
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve a specific module definition from a database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_module(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		md: &str,
	) -> Result<Arc<catalog::ModuleDefinition>> {
		let qey = cache::tx::Lookup::Md(ns, db, md);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_type(),
			None => {
				let key = crate::key::database::md::new(ns, db, md);
				let val = self.get(&key, None).await?.ok_or_else(|| Error::MdNotFound {
					name: md.to_owned(),
				})?;
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve a specific param definition from a database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_param(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		pa: &str,
	) -> Result<Arc<catalog::ParamDefinition>> {
		let qey = cache::tx::Lookup::Pa(ns, db, pa);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_type(),
			None => {
				let key = crate::key::database::pa::new(ns, db, pa);
				let val = self.get(&key, None).await?.ok_or_else(|| Error::PaNotFound {
					name: pa.to_owned(),
				})?;
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve a specific config definition from a database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_config(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		cg: &str,
	) -> Result<Option<Arc<ConfigDefinition>>> {
		let qey = cache::tx::Lookup::Cg(ns, db, cg);
		match self.cache.get(&qey) {
			Some(val) => val.try_into_type().map(Option::Some),
			None => {
				let key = crate::key::database::cg::new(ns, db, cg);
				if let Some(val) = self.get(&key, None).await? {
					let val = Arc::new(val);
					let entr = cache::tx::Entry::Any(val.clone());
					self.cache.insert(qey, entr);
					Ok(Some(val))
				} else {
					Ok(None)
				}
			}
		}
	}

	/// Persist a function definition and refresh the per-transaction caches.
	async fn put_db_function(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		fc: &catalog::FunctionDefinition,
	) -> Result<()> {
		let key = crate::key::database::fc::new(ns, db, &fc.name);
		self.set(&key, fc, None).await?;

		// Invalidate the cached list of all functions for this database
		let list_key = cache::tx::Lookup::Fcs(ns, db);
		self.cache.remove(list_key);

		// Set the entry in the cache
		let qey = cache::tx::Lookup::Fc(ns, db, &fc.name);
		let entry = cache::tx::Entry::Any(Arc::new(fc.clone()));
		self.cache.insert(qey, entry);

		Ok(())
	}

	/// Persist a module definition and refresh the per-transaction caches.
	async fn put_db_module(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		md: &catalog::ModuleDefinition,
	) -> Result<()> {
		// Modules are keyed by their storage name, which may differ from a
		// plain `name` field (derived via get_storage_name).
		let name = md.get_storage_name()?;
		let key = crate::key::database::md::new(ns, db, &name);
		self.set(&key, md, None).await?;

		// Invalidate the cached list of all modules for this database
		let list_key = cache::tx::Lookup::Mds(ns, db);
		self.cache.remove(list_key);

		// Set the entry in the cache
		let qey = cache::tx::Lookup::Md(ns, db, &name);
		let entry = cache::tx::Entry::Any(Arc::new(md.clone()));
		self.cache.insert(qey, entry);

		Ok(())
	}

	/// Persist a param definition and refresh the per-transaction caches.
	async fn put_db_param(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		pa: &catalog::ParamDefinition,
	) -> Result<()> {
		let key = crate::key::database::pa::new(ns, db, &pa.name);
		self.set(&key, pa, None).await?;

		// Invalidate the cached list of all params for this database
		let list_key = cache::tx::Lookup::Pas(ns, db);
		self.cache.remove(list_key);

		// Set the entry in the cache
		let qey = cache::tx::Lookup::Pa(ns, db, &pa.name);
		let entry = cache::tx::Entry::Any(Arc::new(pa.clone()));
		self.cache.insert(qey, entry);

		Ok(())
	}
}
1569
1570#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
1571#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
1572impl TableProvider for Transaction {
1573	/// Retrieve all table definitions for a specific database.
1574	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1575	async fn all_tb(
1576		&self,
1577		ns: NamespaceId,
1578		db: DatabaseId,
1579		version: Option<u64>,
1580	) -> Result<Arc<[TableDefinition]>> {
1581		let qey = cache::tx::Lookup::Tbs(ns, db);
1582		match self.cache.get(&qey) {
1583			Some(val) => val.try_into_tbs(),
1584			None => {
1585				let beg = crate::key::database::tb::prefix(ns, db)?;
1586				let end = crate::key::database::tb::suffix(ns, db)?;
1587				let val = self.getr(beg..end, version).await?;
1588				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1589				let entry = cache::tx::Entry::Tbs(val.clone());
1590				self.cache.insert(qey, entry);
1591				Ok(val)
1592			}
1593		}
1594	}
1595
1596	/// Retrieve all view definitions for a specific table.
1597	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1598	async fn all_tb_views(
1599		&self,
1600		ns: NamespaceId,
1601		db: DatabaseId,
1602		tb: &TableName,
1603	) -> Result<Arc<[catalog::TableDefinition]>> {
1604		let qey = cache::tx::Lookup::Fts(ns, db, tb);
1605		match self.cache.get(&qey) {
1606			Some(val) => val.try_into_fts(),
1607			None => {
1608				let beg = crate::key::table::ft::prefix(ns, db, tb)?;
1609				let end = crate::key::table::ft::suffix(ns, db, tb)?;
1610				let val = self.getr(beg..end, None).await?;
1611				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1612				let entry = cache::tx::Entry::Fts(val.clone());
1613				self.cache.insert(qey, entry);
1614				Ok(val)
1615			}
1616		}
1617	}
1618
	/// Get or add a table with a default configuration, only if we are in
	/// dynamic mode.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self, ctx))]
	async fn get_or_add_tb(
		&self,
		ctx: Option<&Context>,
		ns: &str,
		db: &str,
		tb: &TableName,
	) -> Result<Arc<TableDefinition>> {
		let qey = cache::tx::Lookup::TbByName(ns, db, tb);
		match self.cache.get(&qey) {
			// The entry is in the cache
			Some(val) => val.try_into_type(),
			// The entry is not in the cache
			None => {
				// The database must already exist before the table lookup.
				let Some(db_def) = self.get_db_by_name(ns, db).await? else {
					return Err(anyhow::anyhow!(Error::DbNotFound {
						name: db.to_owned(),
					}));
				};

				// Try to read an existing table definition directly.
				let table_key =
					crate::key::database::tb::new(db_def.namespace_id, db_def.database_id, tb);
				if let Some(tb_def) = self.get(&table_key, None).await? {
					// Found: cache it by name and return.
					// NOTE(review): only the TbByName entry is populated here,
					// whereas put_tb also fills Lookup::Tb — confirm the
					// asymmetry is intended.
					let cached_tb = Arc::new(tb_def);
					let cached_entry =
						cache::tx::Entry::Any(Arc::clone(&cached_tb) as Arc<dyn Any + Send + Sync>);
					self.cache.insert(qey, cached_entry);
					return Ok(cached_tb);
				}

				// In strict mode tables are never created implicitly.
				if db_def.strict {
					return Err(Error::TbNotFound {
						name: tb.to_owned(),
					}
					.into());
				}

				// Dynamic mode: create the table with a fresh id and persist it.
				let tb_def = TableDefinition::new(
					db_def.namespace_id,
					db_def.database_id,
					self.get_next_tb_id(ctx, db_def.namespace_id, db_def.database_id).await?,
					tb.clone(),
				);
				self.put_tb(ns, db, &tb_def).await
			}
		}
	}
1668
1669	async fn get_tb_by_name(
1670		&self,
1671		ns: &str,
1672		db: &str,
1673		tb: &TableName,
1674	) -> Result<Option<Arc<TableDefinition>>> {
1675		let qey = cache::tx::Lookup::TbByName(ns, db, tb);
1676		match self.cache.get(&qey) {
1677			Some(val) => val.try_into_type().map(Some),
1678			None => {
1679				let Some(db) = self.get_db_by_name(ns, db).await? else {
1680					return Ok(None);
1681				};
1682
1683				let key = crate::key::database::tb::new(db.namespace_id, db.database_id, tb);
1684				let Some(tb) = self.get(&key, None).await? else {
1685					return Ok(None);
1686				};
1687
1688				let tb = Arc::new(tb);
1689				let entr = cache::tx::Entry::Any(tb.clone());
1690				self.cache.insert(qey, entr);
1691				Ok(Some(tb))
1692			}
1693		}
1694	}
1695
	/// Persist a table definition and refresh the per-transaction caches.
	///
	/// Returns the stored definition wrapped in an `Arc`.
	async fn put_tb(
		&self,
		ns: &str,
		db: &str,
		tb: &TableDefinition,
	) -> Result<Arc<TableDefinition>> {
		let key = crate::key::database::tb::new(tb.namespace_id, tb.database_id, &tb.name);
		match self.set(&key, tb, None).await {
			Ok(_) => {}
			Err(e) => {
				// A read-only transaction cannot create the table; surface
				// this as "table not found" so callers see a consistent error.
				if matches!(
					e.downcast_ref(),
					Some(Error::Kvs(crate::kvs::Error::TransactionReadonly))
				) {
					return Err(Error::TbNotFound {
						name: tb.name.clone(),
					}
					.into());
				}
				return Err(e);
			}
		}

		// Invalidate the cached list of all tables for this database
		let list_key = cache::tx::Lookup::Tbs(tb.namespace_id, tb.database_id);
		self.cache.remove(list_key);

		// Populate cache
		let cached_tb = Arc::new(tb.clone());
		let cached_entry =
			cache::tx::Entry::Any(Arc::clone(&cached_tb) as Arc<dyn Any + Send + Sync>);

		// Cache under both the id-based and the name-based lookup keys.
		let qey = cache::tx::Lookup::Tb(tb.namespace_id, tb.database_id, &tb.name);
		self.cache.insert(qey, cached_entry.clone());

		let qey = cache::tx::Lookup::TbByName(ns, db, &tb.name);
		self.cache.insert(qey, cached_entry);

		Ok(cached_tb)
	}
1736
1737	async fn del_tb(&self, ns: &str, db: &str, tb: &TableName) -> Result<()> {
1738		let Some(tb) = self.get_tb_by_name(ns, db, tb).await? else {
1739			return Err(Error::TbNotFound {
1740				name: tb.clone(),
1741			}
1742			.into());
1743		};
1744
1745		let key = crate::key::database::tb::new(tb.namespace_id, tb.database_id, &tb.name);
1746		self.del(&key).await?;
1747
1748		// Invalidate the cached list of all tables for this database
1749		let list_key = cache::tx::Lookup::Tbs(tb.namespace_id, tb.database_id);
1750		self.cache.remove(list_key);
1751
1752		// Clear the cache
1753		let qey = cache::tx::Lookup::Tb(tb.namespace_id, tb.database_id, &tb.name);
1754		self.cache.remove(qey);
1755		let qey = cache::tx::Lookup::TbByName(ns, db, &tb.name);
1756		self.cache.remove(qey);
1757
1758		Ok(())
1759	}
1760
1761	async fn clr_tb(&self, ns: &str, db: &str, tb: &TableName) -> Result<()> {
1762		let Some(tb) = self.get_tb_by_name(ns, db, tb).await? else {
1763			return Err(Error::TbNotFound {
1764				name: tb.clone(),
1765			}
1766			.into());
1767		};
1768
1769		let key = crate::key::database::tb::new(tb.namespace_id, tb.database_id, &tb.name);
1770		self.clr(&key).await?;
1771
1772		// Invalidate the cached list of all tables for this database
1773		let list_key = cache::tx::Lookup::Tbs(tb.namespace_id, tb.database_id);
1774		self.cache.remove(list_key);
1775
1776		// Clear the cache
1777		let qey = cache::tx::Lookup::Tb(tb.namespace_id, tb.database_id, &tb.name);
1778		self.cache.remove(qey);
1779		let qey = cache::tx::Lookup::TbByName(ns, db, &tb.name);
1780		self.cache.remove(qey);
1781
1782		Ok(())
1783	}
1784
1785	/// Retrieve all event definitions for a specific table.
1786	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1787	async fn all_tb_events(
1788		&self,
1789		ns: NamespaceId,
1790		db: DatabaseId,
1791		tb: &TableName,
1792	) -> Result<Arc<[catalog::EventDefinition]>> {
1793		let qey = cache::tx::Lookup::Evs(ns, db, tb);
1794		match self.cache.get(&qey) {
1795			Some(val) => val.try_into_evs(),
1796			None => {
1797				let beg = crate::key::table::ev::prefix(ns, db, tb)?;
1798				let end = crate::key::table::ev::suffix(ns, db, tb)?;
1799				let val = self.getr(beg..end, None).await?;
1800				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1801				let entry = cache::tx::Entry::Evs(val.clone());
1802				self.cache.insert(qey, entry);
1803				Ok(val)
1804			}
1805		}
1806	}
1807
1808	/// Retrieve all field definitions for a specific table.
1809	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1810	async fn all_tb_fields(
1811		&self,
1812		ns: NamespaceId,
1813		db: DatabaseId,
1814		tb: &TableName,
1815		version: Option<u64>,
1816	) -> Result<Arc<[catalog::FieldDefinition]>> {
1817		let qey = cache::tx::Lookup::Fds(ns, db, tb);
1818		match self.cache.get(&qey) {
1819			Some(val) => val.try_into_fds(),
1820			None => {
1821				let beg = crate::key::table::fd::prefix(ns, db, tb)?;
1822				let end = crate::key::table::fd::suffix(ns, db, tb)?;
1823				let val = self.getr(beg..end, version).await?;
1824				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1825				let entry = cache::tx::Entry::Fds(val.clone());
1826				self.cache.insert(qey, entry);
1827				Ok(val)
1828			}
1829		}
1830	}
1831
1832	/// Retrieve all index definitions for a specific table.
1833	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1834	async fn all_tb_indexes(
1835		&self,
1836		ns: NamespaceId,
1837		db: DatabaseId,
1838		tb: &TableName,
1839	) -> Result<Arc<[catalog::IndexDefinition]>> {
1840		let qey = cache::tx::Lookup::Ixs(ns, db, tb);
1841		match self.cache.get(&qey) {
1842			Some(val) => val.try_into_ixs(),
1843			None => {
1844				let beg = crate::key::table::ix::prefix(ns, db, tb)?;
1845				let end = crate::key::table::ix::suffix(ns, db, tb)?;
1846				let val = self.getr(beg..end, None).await?;
1847				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1848				let entry = cache::tx::Entry::Ixs(val.clone());
1849				self.cache.insert(qey, entry);
1850				Ok(val)
1851			}
1852		}
1853	}
1854
1855	/// Retrieve all live definitions for a specific table.
1856	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1857	async fn all_tb_lives(
1858		&self,
1859		ns: NamespaceId,
1860		db: DatabaseId,
1861		tb: &TableName,
1862	) -> Result<Arc<[catalog::SubscriptionDefinition]>> {
1863		let qey = cache::tx::Lookup::Lvs(ns, db, tb);
1864		match self.cache.get(&qey) {
1865			Some(val) => val.try_into_lvs(),
1866			None => {
1867				let beg = crate::key::table::lq::prefix(ns, db, tb)?;
1868				let end = crate::key::table::lq::suffix(ns, db, tb)?;
1869				let val = self.getr(beg..end, None).await?;
1870				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
1871				let entry = cache::tx::Entry::Lvs(val.clone());
1872				self.cache.insert(qey, entry);
1873				Ok(val)
1874			}
1875		}
1876	}
1877
1878	/// Retrieve a specific table definition.
1879	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1880	async fn get_tb(
1881		&self,
1882		ns: NamespaceId,
1883		db: DatabaseId,
1884		tb: &TableName,
1885	) -> Result<Option<Arc<TableDefinition>>> {
1886		let qey = cache::tx::Lookup::Tb(ns, db, tb);
1887		match self.cache.get(&qey) {
1888			Some(val) => val.try_into_type().map(Some),
1889			None => {
1890				let key = crate::key::database::tb::new(ns, db, tb);
1891				let Some(val) = self.get(&key, None).await? else {
1892					return Ok(None);
1893				};
1894				let val = Arc::new(val);
1895				let entry = cache::tx::Entry::Any(val.clone());
1896				self.cache.insert(qey, entry);
1897				Ok(Some(val))
1898			}
1899		}
1900	}
1901
1902	/// Retrieve an event for a table.
1903	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1904	async fn get_tb_event(
1905		&self,
1906		ns: NamespaceId,
1907		db: DatabaseId,
1908		tb: &TableName,
1909		ev: &str,
1910	) -> Result<Arc<catalog::EventDefinition>> {
1911		let qey = cache::tx::Lookup::Ev(ns, db, tb, ev);
1912		match self.cache.get(&qey) {
1913			Some(val) => val.try_into_type(),
1914			None => {
1915				let key = crate::key::table::ev::new(ns, db, tb, ev);
1916				let val = self.get(&key, None).await?.ok_or_else(|| Error::EvNotFound {
1917					name: ev.to_owned(),
1918				})?;
1919				let val = Arc::new(val);
1920				let entry = cache::tx::Entry::Any(val.clone());
1921				self.cache.insert(qey, entry);
1922				Ok(val)
1923			}
1924		}
1925	}
1926
1927	/// Retrieve a field for a table.
1928	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1929	async fn get_tb_field(
1930		&self,
1931		ns: NamespaceId,
1932		db: DatabaseId,
1933		tb: &TableName,
1934		fd: &str,
1935	) -> Result<Option<Arc<catalog::FieldDefinition>>> {
1936		let qey = cache::tx::Lookup::Fd(ns, db, tb, fd);
1937		match self.cache.get(&qey) {
1938			Some(val) => val.try_into_type().map(Some),
1939			None => {
1940				let key = crate::key::table::fd::new(ns, db, tb, fd);
1941				let Some(val) = self.get(&key, None).await? else {
1942					return Ok(None);
1943				};
1944				let val = Arc::new(val);
1945				let entry = cache::tx::Entry::Any(val.clone());
1946				self.cache.insert(qey, entry);
1947				Ok(Some(val))
1948			}
1949		}
1950	}
1951
1952	async fn put_tb_field(
1953		&self,
1954		ns: NamespaceId,
1955		db: DatabaseId,
1956		tb: &TableName,
1957		fd: &catalog::FieldDefinition,
1958	) -> Result<()> {
1959		let name = fd.name.to_raw_string();
1960		let key = crate::key::table::fd::new(ns, db, tb, &name);
1961		self.set(&key, fd, None).await?;
1962
1963		// Invalidate the cached list of all fields for this table
1964		let list_key = cache::tx::Lookup::Fds(ns, db, tb.as_ref());
1965		self.cache.remove(list_key);
1966
1967		// Set the entry in the cache
1968		let qey = cache::tx::Lookup::Fd(ns, db, tb, &name);
1969		let entry = cache::tx::Entry::Any(Arc::new(fd.clone()));
1970		self.cache.insert(qey, entry);
1971		Ok(())
1972	}
1973
1974	/// Retrieve an index for a table.
1975	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
1976	async fn get_tb_index(
1977		&self,
1978		ns: NamespaceId,
1979		db: DatabaseId,
1980		tb: &TableName,
1981		ix: &str,
1982	) -> Result<Option<Arc<catalog::IndexDefinition>>> {
1983		let qey = cache::tx::Lookup::Ix(ns, db, tb, ix);
1984		match self.cache.get(&qey) {
1985			Some(val) => val.try_into_type().map(Some),
1986			None => {
1987				let key = crate::key::table::ix::new(ns, db, tb, ix);
1988				let Some(val) = self.get(&key, None).await? else {
1989					return Ok(None);
1990				};
1991				let val = Arc::new(val);
1992				let entry = cache::tx::Entry::Any(val.clone());
1993				self.cache.insert(qey, entry);
1994				Ok(Some(val))
1995			}
1996		}
1997	}
1998
1999	async fn get_tb_index_by_id(
2000		&self,
2001		ns: NamespaceId,
2002		db: DatabaseId,
2003		tb: &TableName,
2004		ix: IndexId,
2005	) -> Result<Option<Arc<catalog::IndexDefinition>>> {
2006		let key = crate::key::table::ix::IndexNameLookupKey::new(ns, db, tb, ix);
2007		let Some(index_name) = self.get(&key, None).await? else {
2008			return Ok(None);
2009		};
2010
2011		self.get_tb_index(ns, db, tb, &index_name).await
2012	}
2013
2014	async fn put_tb_index(
2015		&self,
2016		ns: NamespaceId,
2017		db: DatabaseId,
2018		tb: &TableName,
2019		ix: &catalog::IndexDefinition,
2020	) -> Result<()> {
2021		let key = crate::key::table::ix::new(ns, db, tb, &ix.name);
2022		self.set(&key, ix, None).await?;
2023
2024		let name_lookup_key =
2025			crate::key::table::ix::IndexNameLookupKey::new(ns, db, tb, ix.index_id);
2026		self.set(&name_lookup_key, &ix.name, None).await?;
2027
2028		// Invalidate the cached list of all indexes for this table
2029		let list_key = cache::tx::Lookup::Ixs(ns, db, tb.as_ref());
2030		self.cache.remove(list_key);
2031
2032		// Set the entry in the cache
2033		let qey = cache::tx::Lookup::Ix(ns, db, tb, &ix.name);
2034		let entry = cache::tx::Entry::Any(Arc::new(ix.clone()));
2035		self.cache.insert(qey, entry);
2036		Ok(())
2037	}
2038
2039	async fn del_tb_index(
2040		&self,
2041		ns: NamespaceId,
2042		db: DatabaseId,
2043		tb: &TableName,
2044		ix: &str,
2045	) -> Result<()> {
2046		// Get the index definition
2047		let Some(ix) = self.get_tb_index(ns, db, tb, ix).await? else {
2048			return Ok(());
2049		};
2050
2051		// Remove the index data
2052		let key = crate::key::index::all::new(ns, db, tb, ix.index_id);
2053		self.delp(&key).await?;
2054
2055		// Delete the definition
2056		let key = crate::key::table::ix::new(ns, db, tb, &ix.name);
2057		self.del(&key).await?;
2058
2059		// Invalidate the cached list of all indexes for this table
2060		let list_key = cache::tx::Lookup::Ixs(ns, db, tb.as_ref());
2061		self.cache.remove(list_key);
2062
2063		// Invalidate the cached index entry
2064		let index_key = cache::tx::Lookup::Ix(ns, db, tb.as_ref(), &ix.name);
2065		self.cache.remove(index_key);
2066
2067		Ok(())
2068	}
2069
2070	/// Fetch a specific record value.
2071	///
2072	/// This function will return a new default initialized record if non exists.
2073	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2074	async fn get_record(
2075		&self,
2076		ns: NamespaceId,
2077		db: DatabaseId,
2078		tb: &TableName,
2079		id: &RecordIdKey,
2080		version: Option<u64>,
2081	) -> Result<Arc<Record>> {
2082		// Cache is not versioned
2083		if version.is_some() {
2084			// Fetch the record from the datastore
2085			let key = crate::key::record::new(ns, db, tb, id);
2086			match self.get(&key, version).await? {
2087				// The value exists in the datastore
2088				Some(mut record) => {
2089					// Inject the id field into the document
2090					let rid = RecordId {
2091						table: tb.to_owned(),
2092						key: id.clone(),
2093					};
2094					record.data.def(rid);
2095					// Convert to read-only format for better sharing and performance
2096					Ok(record.into_read_only())
2097				}
2098				// The value is not in the datastore
2099				None => Ok(Arc::new(Default::default())),
2100			}
2101		} else {
2102			let qey = cache::tx::Lookup::Record(ns, db, tb, id);
2103			match self.cache.get(&qey) {
2104				// The entry is in the cache
2105				Some(val) => val.try_into_record(),
2106				// The entry is not in the cache
2107				None => {
2108					// Fetch the record from the datastore
2109					let key = crate::key::record::new(ns, db, tb, id);
2110					match self.get(&key, None).await? {
2111						// The value exists in the datastore
2112						Some(mut record) => {
2113							// Inject the id field into the document
2114							let rid = RecordId {
2115								table: tb.to_owned(),
2116								key: id.clone(),
2117							};
2118							record.data.def(rid);
2119							// Convert to read-only format for better sharing and performance
2120							let record = record.into_read_only();
2121							let entry = cache::tx::Entry::Val(record.clone());
2122							self.cache.insert(qey, entry);
2123							Ok(record)
2124						}
2125						// The value is not in the datastore
2126						None => Ok(Arc::new(Default::default())),
2127					}
2128				}
2129			}
2130		}
2131	}
2132
2133	async fn record_exists(
2134		&self,
2135		ns: NamespaceId,
2136		db: DatabaseId,
2137		tb: &TableName,
2138		id: &RecordIdKey,
2139	) -> Result<bool> {
2140		let key = crate::key::record::new(ns, db, tb, id);
2141		Ok(self.exists(&key, None).await?)
2142	}
2143
2144	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2145	async fn put_record(
2146		&self,
2147		ns: NamespaceId,
2148		db: DatabaseId,
2149		tb: &TableName,
2150		id: &RecordIdKey,
2151		record: Arc<Record>,
2152		version: Option<u64>,
2153	) -> Result<()> {
2154		let key = crate::key::record::new(ns, db, tb, id);
2155		self.put(&key, &record, version).await?;
2156		self.set_record_cache(ns, db, tb, id, record);
2157		Ok(())
2158	}
2159
2160	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2161	async fn set_record(
2162		&self,
2163		ns: NamespaceId,
2164		db: DatabaseId,
2165		tb: &TableName,
2166		id: &RecordIdKey,
2167		record: Arc<Record>,
2168		version: Option<u64>,
2169	) -> Result<()> {
2170		// Set the value in the datastore
2171		let key = crate::key::record::new(ns, db, tb, id);
2172		self.set(&key, &record, version).await?;
2173		// Set the value in the cache
2174		self.set_record_cache(ns, db, tb, id, record);
2175		// Return nothing
2176		Ok(())
2177	}
2178
2179	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2180	async fn del_record(
2181		&self,
2182		ns: NamespaceId,
2183		db: DatabaseId,
2184		tb: &TableName,
2185		id: &RecordIdKey,
2186	) -> Result<()> {
2187		// Delete the value in the datastore
2188		let key = crate::key::record::new(ns, db, tb, id);
2189		self.del(&key).await?;
2190		// Clear the value from the cache
2191		let qey = cache::tx::Lookup::Record(ns, db, tb, id);
2192		self.cache.remove(qey);
2193		// Return nothing
2194		Ok(())
2195	}
2196
	/// Allocate the next unique table id for the given database.
	///
	/// Pure delegation: the optional context is passed straight through to
	/// the sequence allocator.
	async fn get_next_tb_id(
		&self,
		ctx: Option<&Context>,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<TableId> {
		self.sequences.next_table_id(ctx, ns, db).await
	}
2205}
2206
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
impl UserProvider for Transaction {
	/// Retrieve all ROOT level users in a datastore.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_root_users(&self) -> Result<Arc<[catalog::UserDefinition]>> {
		let qey = cache::tx::Lookup::Rus;
		match self.cache.get(&qey) {
			// Serve the cached list read earlier in this transaction
			Some(val) => val.try_into_rus(),
			// Not cached: scan the full range of root user keys
			None => {
				let beg = crate::key::root::us::prefix();
				let end = crate::key::root::us::suffix();
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				// Remember the decoded list for subsequent lookups
				let entry = cache::tx::Entry::Rus(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all namespace user definitions for a specific namespace.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_ns_users(&self, ns: NamespaceId) -> Result<Arc<[catalog::UserDefinition]>> {
		let qey = cache::tx::Lookup::Nus(ns);
		match self.cache.get(&qey) {
			// Serve the cached list read earlier in this transaction
			Some(val) => val.try_into_nus(),
			// Not cached: scan the full range of namespace user keys
			None => {
				let beg = crate::key::namespace::us::prefix(ns)?;
				let end = crate::key::namespace::us::suffix(ns)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				// Remember the decoded list for subsequent lookups
				let entry = cache::tx::Entry::Nus(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all database user definitions for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_users(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<Arc<[catalog::UserDefinition]>> {
		let qey = cache::tx::Lookup::Dus(ns, db);
		match self.cache.get(&qey) {
			// Serve the cached list read earlier in this transaction
			Some(val) => val.try_into_dus(),
			// Not cached: scan the full range of database user keys
			None => {
				let beg = crate::key::database::us::prefix(ns, db)?;
				let end = crate::key::database::us::suffix(ns, db)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				// Remember the decoded list for subsequent lookups
				let entry = cache::tx::Entry::Dus(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve a specific root user definition.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_root_user(&self, us: &str) -> Result<Option<Arc<catalog::UserDefinition>>> {
		let qey = cache::tx::Lookup::Ru(us);
		match self.cache.get(&qey) {
			// Serve the cached definition when available
			Some(val) => val.try_into_type().map(Some),
			// Not cached: read the definition key from the datastore
			None => {
				let key = crate::key::root::us::new(us);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};
				// Cache the definition for the remainder of the transaction
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Retrieve a specific namespace user definition.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_ns_user(
		&self,
		ns: NamespaceId,
		us: &str,
	) -> Result<Option<Arc<catalog::UserDefinition>>> {
		let qey = cache::tx::Lookup::Nu(ns, us);
		match self.cache.get(&qey) {
			// Serve the cached definition when available
			Some(val) => val.try_into_type().map(Some),
			// Not cached: read the definition key from the datastore
			None => {
				let key = crate::key::namespace::us::new(ns, us);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};

				// Cache the definition for the remainder of the transaction
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Retrieve a specific user definition from a database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_user(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		us: &str,
	) -> Result<Option<Arc<catalog::UserDefinition>>> {
		let qey = cache::tx::Lookup::Du(ns, db, us);
		match self.cache.get(&qey) {
			// Serve the cached definition when available
			Some(val) => val.try_into_type().map(Some),
			// Not cached: read the definition key from the datastore
			None => {
				let key = crate::key::database::us::new(ns, db, us);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};

				// Cache the definition for the remainder of the transaction
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Create or update a ROOT level user definition, keeping the cache coherent.
	async fn put_root_user(&self, us: &catalog::UserDefinition) -> Result<()> {
		let key = crate::key::root::us::new(&us.name);
		self.set(&key, us, None).await?;

		// Invalidate the cached list of all root users
		let list_key = cache::tx::Lookup::Rus;
		self.cache.remove(list_key);

		// Set the entry in the cache
		let qey = cache::tx::Lookup::Ru(&us.name);
		let entry = cache::tx::Entry::Any(Arc::new(us.clone()));
		self.cache.insert(qey, entry);

		Ok(())
	}

	/// Create or update a namespace user definition, keeping the cache coherent.
	async fn put_ns_user(&self, ns: NamespaceId, us: &catalog::UserDefinition) -> Result<()> {
		let key = crate::key::namespace::us::new(ns, &us.name);
		self.set(&key, us, None).await?;

		// Invalidate the cached list of all namespace users
		let list_key = cache::tx::Lookup::Nus(ns);
		self.cache.remove(list_key);

		// Set the entry in the cache
		let qey = cache::tx::Lookup::Nu(ns, &us.name);
		let entry = cache::tx::Entry::Any(Arc::new(us.clone()));
		self.cache.insert(qey, entry);

		Ok(())
	}

	/// Create or update a database user definition, keeping the cache coherent.
	async fn put_db_user(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		us: &catalog::UserDefinition,
	) -> Result<()> {
		let key = crate::key::database::us::new(ns, db, &us.name);
		self.set(&key, us, None).await?;

		// Invalidate the cached list of all database users
		let list_key = cache::tx::Lookup::Dus(ns, db);
		self.cache.remove(list_key);

		// Set the entry in the cache
		let qey = cache::tx::Lookup::Du(ns, db, &us.name);
		let entry = cache::tx::Entry::Any(Arc::new(us.clone()));
		self.cache.insert(qey, entry);

		Ok(())
	}
}
2389
#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
impl AuthorisationProvider for Transaction {
	/// Retrieve all ROOT level accesses in a datastore.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_root_accesses(&self) -> Result<Arc<[catalog::AccessDefinition]>> {
		let qey = cache::tx::Lookup::Ras;
		match self.cache.get(&qey) {
			// Serve the cached list read earlier in this transaction
			Some(val) => val.try_into_ras(),
			// Not cached: scan the full range of root access keys
			None => {
				let beg = crate::key::root::ac::prefix();
				let end = crate::key::root::ac::suffix();
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				// Remember the decoded list for subsequent lookups
				let entry = cache::tx::Entry::Ras(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all root access grants in a datastore.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_root_access_grants(&self, ra: &str) -> Result<Arc<[catalog::AccessGrant]>> {
		let qey = cache::tx::Lookup::Rgs(ra);
		match self.cache.get(&qey) {
			// Serve the cached list read earlier in this transaction
			Some(val) => val.try_into_rag(),
			// Not cached: scan the full range of grant keys for this access
			None => {
				let beg = crate::key::root::access::gr::prefix(ra)?;
				let end = crate::key::root::access::gr::suffix(ra)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				// Remember the decoded list for subsequent lookups
				let entry = cache::tx::Entry::Rag(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all namespace access definitions for a specific namespace.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_ns_accesses(&self, ns: NamespaceId) -> Result<Arc<[catalog::AccessDefinition]>> {
		let qey = cache::tx::Lookup::Nas(ns);
		match self.cache.get(&qey) {
			// Serve the cached list read earlier in this transaction
			Some(val) => val.try_into_nas(),
			// Not cached: scan the full range of namespace access keys
			None => {
				let beg = crate::key::namespace::ac::prefix(ns)?;
				let end = crate::key::namespace::ac::suffix(ns)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				// Remember the decoded list for subsequent lookups
				let entry = cache::tx::Entry::Nas(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all namespace access grants for a specific namespace.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_ns_access_grants(
		&self,
		ns: NamespaceId,
		na: &str,
	) -> Result<Arc<[catalog::AccessGrant]>> {
		let qey = cache::tx::Lookup::Ngs(ns, na);
		match self.cache.get(&qey) {
			// Serve the cached list read earlier in this transaction
			Some(val) => val.try_into_nag(),
			// Not cached: scan the full range of grant keys for this access
			None => {
				let beg = crate::key::namespace::access::gr::prefix(ns, na)?;
				let end = crate::key::namespace::access::gr::suffix(ns, na)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				// Remember the decoded list for subsequent lookups
				let entry = cache::tx::Entry::Nag(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all database access definitions for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_accesses(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
	) -> Result<Arc<[catalog::AccessDefinition]>> {
		let qey = cache::tx::Lookup::Das(ns, db);
		match self.cache.get(&qey) {
			// Serve the cached list read earlier in this transaction
			Some(val) => val.try_into_das(),
			// Not cached: scan the full range of database access keys
			None => {
				let beg = crate::key::database::ac::prefix(ns, db)?;
				let end = crate::key::database::ac::suffix(ns, db)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				// Remember the decoded list for subsequent lookups
				let entry = cache::tx::Entry::Das(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve all database access grants for a specific database.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn all_db_access_grants(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		da: &str,
	) -> Result<Arc<[catalog::AccessGrant]>> {
		let qey = cache::tx::Lookup::Dgs(ns, db, da);
		match self.cache.get(&qey) {
			// Serve the cached list read earlier in this transaction
			Some(val) => val.try_into_dag(),
			// Not cached: scan the full range of grant keys for this access
			None => {
				let beg = crate::key::database::access::gr::prefix(ns, db, da)?;
				let end = crate::key::database::access::gr::suffix(ns, db, da)?;
				let val = self.getr(beg..end, None).await?;
				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
				// Remember the decoded list for subsequent lookups
				let entry = cache::tx::Entry::Dag(val.clone());
				self.cache.insert(qey, entry);
				Ok(val)
			}
		}
	}

	/// Retrieve a specific root access definition.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_root_access(&self, ra: &str) -> Result<Option<Arc<catalog::AccessDefinition>>> {
		let qey = cache::tx::Lookup::Ra(ra);
		match self.cache.get(&qey) {
			// Serve the cached definition when available
			Some(val) => val.try_into_type().map(Some),
			// Not cached: read the definition key from the datastore
			None => {
				let key = crate::key::root::ac::new(ra);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};
				// Cache the definition for the remainder of the transaction
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Retrieve a specific root access grant.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_root_access_grant(
		&self,
		ac: &str,
		gr: &str,
	) -> Result<Option<Arc<catalog::AccessGrant>>> {
		let qey = cache::tx::Lookup::Rg(ac, gr);
		match self.cache.get(&qey) {
			// Serve the cached grant when available
			Some(val) => val.try_into_type().map(Some),
			// Not cached: read the grant key from the datastore
			None => {
				let key = crate::key::root::access::gr::new(ac, gr);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};
				// Cache the grant for the remainder of the transaction
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Retrieve a specific namespace access definition.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_ns_access(
		&self,
		ns: NamespaceId,
		na: &str,
	) -> Result<Option<Arc<catalog::AccessDefinition>>> {
		let qey = cache::tx::Lookup::Na(ns, na);
		match self.cache.get(&qey) {
			// Serve the cached definition when available
			Some(val) => val.try_into_type().map(Some),
			// Not cached: read the definition key from the datastore
			None => {
				let key = crate::key::namespace::ac::new(ns, na);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};
				// Cache the definition for the remainder of the transaction
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Retrieve a specific namespace access grant.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_ns_access_grant(
		&self,
		ns: NamespaceId,
		ac: &str,
		gr: &str,
	) -> Result<Option<Arc<catalog::AccessGrant>>> {
		let qey = cache::tx::Lookup::Ng(ns, ac, gr);
		match self.cache.get(&qey) {
			// Serve the cached grant when available
			Some(val) => val.try_into_type().map(Some),
			// Not cached: read the grant key from the datastore
			None => {
				let key = crate::key::namespace::access::gr::new(ns, ac, gr);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};
				// Cache the grant for the remainder of the transaction
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Retrieve a specific database access definition.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_access(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		da: &str,
	) -> Result<Option<Arc<catalog::AccessDefinition>>> {
		let qey = cache::tx::Lookup::Da(ns, db, da);
		match self.cache.get(&qey) {
			// Serve the cached definition when available
			Some(val) => val.try_into_type().map(Some),
			// Not cached: read the definition key from the datastore
			None => {
				let key = crate::key::database::ac::new(ns, db, da);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};
				// Cache the definition for the remainder of the transaction
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Retrieve a specific database access grant.
	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
	async fn get_db_access_grant(
		&self,
		ns: NamespaceId,
		db: DatabaseId,
		ac: &str,
		gr: &str,
	) -> Result<Option<Arc<catalog::AccessGrant>>> {
		let qey = cache::tx::Lookup::Dg(ns, db, ac, gr);
		match self.cache.get(&qey) {
			// Serve the cached grant when available
			Some(val) => val.try_into_type().map(Some),
			// Not cached: read the grant key from the datastore
			None => {
				let key = crate::key::database::access::gr::new(ns, db, ac, gr);
				let Some(val) = self.get(&key, None).await? else {
					return Ok(None);
				};
				// Cache the grant for the remainder of the transaction
				let val = Arc::new(val);
				let entry = cache::tx::Entry::Any(val.clone());
				self.cache.insert(qey, entry);
				Ok(Some(val))
			}
		}
	}

	/// Delete a root access definition along with all of its grants,
	/// keeping the cache coherent.
	async fn del_root_access(&self, ra: &str) -> Result<()> {
		// Delete the definition
		let key = crate::key::root::ac::new(ra);
		self.del(&key).await?;
		// Delete any associated data including access grants.
		let key = crate::key::root::access::all::new(ra);
		self.delp(&key).await?;

		// Invalidate the cached list of all root accesses
		let list_key = cache::tx::Lookup::Ras;
		self.cache.remove(list_key);

		// Invalidate the cached access entry and grants
		let access_key = cache::tx::Lookup::Ra(ra);
		self.cache.remove(access_key);
		let grants_key = cache::tx::Lookup::Rgs(ra);
		self.cache.remove(grants_key);

		Ok(())
	}

	/// Delete a namespace access definition along with all of its grants,
	/// keeping the cache coherent.
	async fn del_ns_access(&self, ns: NamespaceId, na: &str) -> Result<()> {
		// Delete the definition
		let key = crate::key::namespace::ac::new(ns, na);
		self.del(&key).await?;
		// Delete any associated data including access grants.
		let key = crate::key::namespace::access::all::new(ns, na);
		self.delp(&key).await?;

		// Invalidate the cached list of all namespace accesses
		let list_key = cache::tx::Lookup::Nas(ns);
		self.cache.remove(list_key);

		// Invalidate the cached access entry and grants
		let access_key = cache::tx::Lookup::Na(ns, na);
		self.cache.remove(access_key);
		let grants_key = cache::tx::Lookup::Ngs(ns, na);
		self.cache.remove(grants_key);

		Ok(())
	}

	/// Delete a database access definition along with all of its grants,
	/// keeping the cache coherent.
	async fn del_db_access(&self, ns: NamespaceId, db: DatabaseId, da: &str) -> Result<()> {
		// Delete the definition
		let key = crate::key::database::ac::new(ns, db, da);
		self.del(&key).await?;
		// Delete any associated data including access grants.
		let key = crate::key::database::access::all::new(ns, db, da);
		self.delp(&key).await?;

		// Invalidate the cached list of all database accesses
		let list_key = cache::tx::Lookup::Das(ns, db);
		self.cache.remove(list_key);

		// Invalidate the cached access entry and grants
		let access_key = cache::tx::Lookup::Da(ns, db, da);
		self.cache.remove(access_key);
		let grants_key = cache::tx::Lookup::Dgs(ns, db, da);
		self.cache.remove(grants_key);

		Ok(())
	}
}
2715
2716#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
2717#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
2718impl ApiProvider for Transaction {
2719	/// Retrieve all api definitions for a specific database.
2720	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2721	async fn all_db_apis(&self, ns: NamespaceId, db: DatabaseId) -> Result<Arc<[ApiDefinition]>> {
2722		let qey = cache::tx::Lookup::Aps(ns, db);
2723		match self.cache.get(&qey) {
2724			Some(val) => val,
2725			None => {
2726				let beg = crate::key::database::ap::prefix(ns, db)?;
2727				let end = crate::key::database::ap::suffix(ns, db)?;
2728				let val = self.getr(beg..end, None).await?;
2729				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2730				let val = cache::tx::Entry::Aps(Arc::clone(&val));
2731				self.cache.insert(qey, val.clone());
2732				val
2733			}
2734		}
2735		.try_into_aps()
2736	}
2737
2738	/// Retrieve a specific api definition.
2739	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2740	async fn get_db_api(
2741		&self,
2742		ns: NamespaceId,
2743		db: DatabaseId,
2744		ap: &str,
2745	) -> Result<Option<Arc<ApiDefinition>>> {
2746		let qey = cache::tx::Lookup::Ap(ns, db, ap);
2747		match self.cache.get(&qey) {
2748			Some(val) => val.try_into_type().map(Some),
2749			None => {
2750				let key = crate::key::database::ap::new(ns, db, ap);
2751				let Some(val) = self.get(&key, None).await? else {
2752					return Ok(None);
2753				};
2754				let val = Arc::new(val);
2755				let entry = cache::tx::Entry::Any(val.clone());
2756				self.cache.insert(qey, entry);
2757				Ok(Some(val))
2758			}
2759		}
2760	}
2761
2762	async fn put_db_api(&self, ns: NamespaceId, db: DatabaseId, ap: &ApiDefinition) -> Result<()> {
2763		let name = ap.path.to_string();
2764		let key = crate::key::database::ap::new(ns, db, &name);
2765		self.set(&key, ap, None).await?;
2766
2767		// Invalidate the cached list of all APIs for this database
2768		let list_key = cache::tx::Lookup::Aps(ns, db);
2769		self.cache.remove(list_key);
2770
2771		// Set the entry in the cache
2772		let qey = cache::tx::Lookup::Ap(ns, db, &name);
2773		let entry = cache::tx::Entry::Any(Arc::new(ap.clone()));
2774		self.cache.insert(qey, entry);
2775
2776		Ok(())
2777	}
2778}
2779
2780#[cfg_attr(target_family = "wasm", async_trait::async_trait(?Send))]
2781#[cfg_attr(not(target_family = "wasm"), async_trait::async_trait)]
2782impl BucketProvider for Transaction {
2783	/// Retrieve all bucket definitions for a specific database.
2784	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2785	async fn all_db_buckets(
2786		&self,
2787		ns: NamespaceId,
2788		db: DatabaseId,
2789	) -> Result<Arc<[catalog::BucketDefinition]>> {
2790		let qey = cache::tx::Lookup::Bus(ns, db);
2791		match self.cache.get(&qey) {
2792			Some(val) => val.try_into_bus(),
2793			None => {
2794				let beg = crate::key::database::bu::prefix(ns, db)?;
2795				let end = crate::key::database::bu::suffix(ns, db)?;
2796				let val = self.getr(beg..end, None).await?;
2797				let val = util::deserialize_cache(val.iter().map(|x| x.1.as_slice()))?;
2798				let entry = cache::tx::Entry::Bus(val.clone());
2799				self.cache.insert(qey, entry);
2800				Ok(val)
2801			}
2802		}
2803	}
2804
2805	/// Retrieve a specific bucket definition from a database.
2806	#[instrument(level = "trace", target = "surrealdb::core::kvs::tx", skip(self))]
2807	async fn get_db_bucket(
2808		&self,
2809		ns: NamespaceId,
2810		db: DatabaseId,
2811		bu: &str,
2812	) -> Result<Option<Arc<catalog::BucketDefinition>>> {
2813		let qey = cache::tx::Lookup::Bu(ns, db, bu);
2814		match self.cache.get(&qey) {
2815			Some(val) => val.try_into_type().map(Some),
2816			None => {
2817				let key = crate::key::database::bu::new(ns, db, bu);
2818				let Some(val) = self.get(&key, None).await? else {
2819					return Ok(None);
2820				};
2821				let bucket_def = Arc::new(val);
2822				let entr = cache::tx::Entry::Any(bucket_def.clone());
2823				self.cache.insert(qey, entr);
2824				Ok(Some(bucket_def))
2825			}
2826		}
2827	}
2828}
2829
// Marker implementation: `CatalogProvider` requires no additional methods
// here — presumably an umbrella over the provider traits implemented above
// (see `catalog::providers` for the trait definition to confirm).
impl CatalogProvider for Transaction {}