use super::tr::Check;
use super::util;
use crate::cnf::COUNT_BATCH_SIZE;
use crate::cnf::NORMAL_FETCH_SIZE;
use crate::err::Error;
use crate::key::debug::Sprintable;
use crate::kvs::{batch::Batch, Key, KeyEncode, Val, Version};
use crate::vs::VersionStamp;
use std::fmt::Debug;
use std::ops::Range;
#[allow(dead_code)] pub trait Transaction {
fn check_level(&mut self, check: Check);
fn closed(&self) -> bool;
fn writeable(&self) -> bool;
async fn cancel(&mut self) -> Result<(), Error>;
async fn commit(&mut self) -> Result<(), Error>;
async fn exists<K>(&mut self, key: K, version: Option<u64>) -> Result<bool, Error>
where
K: KeyEncode + Sprintable + Debug;
async fn get<K>(&mut self, key: K, version: Option<u64>) -> Result<Option<Val>, Error>
where
K: KeyEncode + Sprintable + Debug;
async fn set<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
V: Into<Val> + Debug;
async fn put<K, V>(&mut self, key: K, val: V, version: Option<u64>) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
V: Into<Val> + Debug;
async fn putc<K, V>(&mut self, key: K, val: V, chk: Option<V>) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
V: Into<Val> + Debug;
async fn del<K>(&mut self, key: K) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug;
async fn delc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
V: Into<Val> + Debug;
async fn keys<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<Key>, Error>
where
K: KeyEncode + Sprintable + Debug;
async fn scan<K>(
&mut self,
rng: Range<K>,
limit: u32,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: KeyEncode + Sprintable + Debug;
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn replace<K, V>(&mut self, key: K, val: V) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
V: Into<Val> + Debug,
{
self.set(key, val, None).await
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn clr<K>(&mut self, key: K) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
{
self.del(key).await
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn clrc<K, V>(&mut self, key: K, chk: Option<V>) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
V: Into<Val> + Debug,
{
self.delc(key, chk).await
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(keys = keys.sprint()))]
async fn getm<K>(&mut self, keys: Vec<K>) -> Result<Vec<Option<Val>>, Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
let mut out = Vec::with_capacity(keys.len());
for key in keys.into_iter() {
if let Some(val) = self.get(key, None).await? {
out.push(Some(val));
} else {
out.push(None);
}
}
Ok(out)
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn getp<K>(&mut self, key: K) -> Result<Vec<(Key, Val)>, Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
let range = util::to_prefix_range(key)?;
self.getr(range, None).await
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn getr<K>(
&mut self,
rng: Range<K>,
version: Option<u64>,
) -> Result<Vec<(Key, Val)>, Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
let mut out = vec![];
let beg: Key = rng.start.encode()?;
let end: Key = rng.end.encode()?;
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch_keys_vals(rng, *NORMAL_FETCH_SIZE, version).await?;
next = res.next;
for v in res.result.into_iter() {
out.push(v);
}
}
Ok(out)
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn delp<K>(&mut self, key: K) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
if !self.writeable() {
return Err(Error::TxReadonly);
}
let range = util::to_prefix_range(key)?;
self.delr(range).await
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn delr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
if !self.writeable() {
return Err(Error::TxReadonly);
}
let beg: Key = rng.start.encode()?;
let end: Key = rng.end.encode()?;
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch_keys(rng, *NORMAL_FETCH_SIZE, None).await?;
next = res.next;
for k in res.result.into_iter() {
self.del(k).await?;
}
}
Ok(())
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn clrp<K>(&mut self, key: K) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
if !self.writeable() {
return Err(Error::TxReadonly);
}
let range = util::to_prefix_range(key)?;
self.clrr(range).await
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn clrr<K>(&mut self, rng: Range<K>) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
if !self.writeable() {
return Err(Error::TxReadonly);
}
let beg: Key = rng.start.encode()?;
let end: Key = rng.end.encode()?;
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch_keys(rng, *NORMAL_FETCH_SIZE, None).await?;
next = res.next;
for k in res.result {
self.clr(k).await?;
}
}
Ok(())
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn count<K>(&mut self, rng: Range<K>) -> Result<usize, Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
let mut len = 0;
let beg: Key = rng.start.encode()?;
let end: Key = rng.end.encode()?;
let mut next = Some(beg..end);
while let Some(rng) = next {
let res = self.batch_keys(rng, *COUNT_BATCH_SIZE, None).await?;
next = res.next;
len += res.result.len();
}
Ok(len)
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn scan_all_versions<K>(
&mut self,
rng: Range<K>,
limit: u32,
) -> Result<Vec<(Key, Val, Version, bool)>, Error>
where
K: KeyEncode + Sprintable + Debug,
{
Err(Error::UnsupportedVersionedQueries)
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn batch_keys<K>(
&mut self,
rng: Range<K>,
batch: u32,
version: Option<u64>,
) -> Result<Batch<Key>, Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
let beg: Key = rng.start.encode()?;
let end: Key = rng.end.encode()?;
let res = self.keys(beg..end.clone(), batch, version).await?;
if res.len() < batch as usize && batch > 0 {
Ok(Batch::<Key>::new(None, res))
} else {
match res.last() {
Some(k) => {
let mut k = k.clone();
util::advance_key(&mut k);
Ok(Batch::<Key>::new(
Some(Range {
start: k,
end,
}),
res,
))
}
None => Ok(Batch::<Key>::new(None, res)),
}
}
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn batch_keys_vals<K>(
&mut self,
rng: Range<K>,
batch: u32,
version: Option<u64>,
) -> Result<Batch<(Key, Val)>, Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
let beg: Key = rng.start.encode()?;
let end: Key = rng.end.encode()?;
let res = self.scan(beg..end.clone(), batch, version).await?;
if res.len() < batch as usize && batch > 0 {
Ok(Batch::<(Key, Val)>::new(None, res))
} else {
match res.last() {
Some((k, _)) => {
let mut k = k.clone();
util::advance_key(&mut k);
Ok(Batch::<(Key, Val)>::new(
Some(Range {
start: k,
end,
}),
res,
))
}
None => Ok(Batch::<(Key, Val)>::new(None, res)),
}
}
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(rng = rng.sprint()))]
async fn batch_keys_vals_versions<K>(
&mut self,
rng: Range<K>,
batch: u32,
) -> Result<Batch<(Key, Val, Version, bool)>, Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
let beg: Key = rng.start.encode()?;
let end: Key = rng.end.encode()?;
let res = self.scan_all_versions(beg..end.clone(), batch).await?;
if res.len() < batch as usize && batch > 0 {
Ok(Batch::<(Key, Val, Version, bool)>::new(None, res))
} else {
match res.last() {
Some((k, _, _, _)) => {
let mut k = k.clone();
util::advance_key(&mut k);
Ok(Batch::<(Key, Val, Version, bool)>::new(
Some(Range {
start: k,
end,
}),
res,
))
}
None => Ok(Batch::<(Key, Val, Version, bool)>::new(None, res)),
}
}
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(key = key.sprint()))]
async fn get_timestamp<K>(&mut self, key: K) -> Result<VersionStamp, Error>
where
K: KeyEncode + Sprintable + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
let key = key.encode()?;
let ver = match self.get(key.clone(), None).await? {
Some(prev) => VersionStamp::from_slice(prev.as_slice())?
.next()
.expect("exhausted all possible timestamps"),
None => VersionStamp::from_u64(1),
};
self.set(key, &ver.as_bytes(), None).await?;
Ok(ver)
}
#[instrument(level = "trace", target = "surrealdb::core::kvs::api", skip(self), fields(ts_key = ts_key.sprint()))]
async fn set_versionstamp<K, V>(
&mut self,
ts_key: K,
prefix: K,
suffix: K,
val: V,
) -> Result<(), Error>
where
K: KeyEncode + Sprintable + Debug,
V: Into<Val> + Debug,
{
if self.closed() {
return Err(Error::TxFinished);
}
if !self.writeable() {
return Err(Error::TxReadonly);
}
let ts = self.get_timestamp(ts_key).await?;
let mut k: Vec<u8> = prefix.encode()?;
k.extend_from_slice(&ts.as_bytes());
suffix.encode_into(&mut k)?;
self.set(k, val, None).await
}
}