use bytes::{BufMut, Bytes, BytesMut};
use crate::error::{
FdbError, FdbResult, SUBSPACE_PACK_WITH_VERSIONSTAMP_PREFIX_INCOMPLETE,
SUBSPACE_UNPACK_KEY_MISMATCH,
};
use crate::range::Range;
use crate::tuple::Tuple;
/// A raw key prefix together with a flag recording whether that prefix
/// holds an incomplete versionstamp.
///
/// NOTE: the derived `PartialOrd`/`Ord` compare fields in declaration order,
/// so the flag is compared before the prefix bytes. Reordering the fields
/// would change how `Subspace` values sort.
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct Subspace {
/// `true` once a tuple reporting an incomplete versionstamp has been
/// appended through `Subspace::subspace`; consulted by
/// `pack_with_versionstamp` and `range`.
raw_prefix_has_incomplete_versionstamp: bool,
/// Raw bytes that a key must start with for `contains` to accept it.
raw_prefix: Bytes,
}
impl Subspace {
    /// Creates a [`Subspace`] whose raw prefix is exactly `prefix_bytes`.
    ///
    /// The new subspace is always marked as *not* holding an incomplete
    /// versionstamp; the bytes themselves are not inspected.
    pub fn new(prefix_bytes: Bytes) -> Subspace {
        Subspace {
            raw_prefix_has_incomplete_versionstamp: false,
            raw_prefix: prefix_bytes,
        }
    }

    /// Returns a new [`Subspace`] whose prefix is this subspace's prefix
    /// followed by the packed form of `tuple`.
    ///
    /// The result is flagged as holding an incomplete versionstamp when this
    /// subspace is already flagged or when
    /// `tuple.has_incomplete_versionstamp()` returns `true`.
    pub fn subspace(&self, tuple: &Tuple) -> Subspace {
        let raw_prefix_has_incomplete_versionstamp =
            self.raw_prefix_has_incomplete_versionstamp || tuple.has_incomplete_versionstamp();

        let packed_tuple = tuple.pack();

        // Size the buffer once up front instead of letting it grow on each
        // append, and copy straight from the borrowed prefix.
        let mut raw_prefix = BytesMut::with_capacity(self.raw_prefix.len() + packed_tuple.len());
        raw_prefix.put_slice(&self.raw_prefix[..]);
        raw_prefix.put(packed_tuple);

        Subspace {
            raw_prefix_has_incomplete_versionstamp,
            raw_prefix: raw_prefix.freeze(),
        }
    }

    /// Returns `true` when `key` starts with this subspace's raw prefix.
    ///
    /// A key shorter than the prefix is never contained; a key equal to the
    /// prefix is.
    pub fn contains(&self, key: &Bytes) -> bool {
        key.starts_with(&self.raw_prefix[..])
    }

    /// Returns the raw prefix bytes of this subspace.
    pub fn pack(&self) -> Bytes {
        self.raw_prefix.clone()
    }

    /// Packs `tuple` behind this subspace's prefix by delegating to
    /// `Tuple::pack_with_versionstamp`.
    ///
    /// # Errors
    ///
    /// Returns an [`FdbError`] built from
    /// `SUBSPACE_PACK_WITH_VERSIONSTAMP_PREFIX_INCOMPLETE` when the prefix
    /// itself is flagged as holding an incomplete versionstamp. Otherwise any
    /// error produced by `Tuple::pack_with_versionstamp` is returned as is.
    pub fn pack_with_versionstamp(&self, tuple: &Tuple) -> FdbResult<Bytes> {
        if self.raw_prefix_has_incomplete_versionstamp {
            return Err(FdbError::new(
                SUBSPACE_PACK_WITH_VERSIONSTAMP_PREFIX_INCOMPLETE,
            ));
        }

        tuple.pack_with_versionstamp(self.raw_prefix.clone())
    }

    /// Returns the [`Range`] produced by `Tuple::range` for `tuple`, using
    /// this subspace's prefix.
    ///
    /// # Panics
    ///
    /// Panics when the prefix is flagged as holding an incomplete
    /// versionstamp.
    pub fn range(&self, tuple: &Tuple) -> Range {
        if self.raw_prefix_has_incomplete_versionstamp {
            panic!(
                "Cannot create Range value as subspace prefix contains an incomplete versionstamp"
            );
        }

        tuple.range(self.raw_prefix.clone())
    }

    /// Strips this subspace's prefix from `key` and decodes the remainder
    /// with `Tuple::from_bytes`.
    ///
    /// # Errors
    ///
    /// Returns an [`FdbError`] built from `SUBSPACE_UNPACK_KEY_MISMATCH` when
    /// `key` does not start with the prefix. Otherwise any error produced by
    /// `Tuple::from_bytes` is returned as is.
    pub fn unpack(&self, key: &Bytes) -> FdbResult<Tuple> {
        if !self.contains(key) {
            return Err(FdbError::new(SUBSPACE_UNPACK_KEY_MISMATCH));
        }

        // `contains` guarantees the key is at least as long as the prefix,
        // so this slice cannot go out of bounds.
        Tuple::from_bytes(key.slice(self.raw_prefix.len()..))
    }
}
#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};

    use crate::error::{
        FdbError, SUBSPACE_PACK_WITH_VERSIONSTAMP_PREFIX_INCOMPLETE, SUBSPACE_UNPACK_KEY_MISMATCH,
        TUPLE_PACK_WITH_VERSIONSTAMP_MULTIPLE_FOUND,
    };
    use crate::range::Range;
    use crate::tuple::{Tuple, Versionstamp};

    use super::Subspace;

    /// Builds a tuple holding a single string element.
    fn string_tuple(value: &str) -> Tuple {
        let mut t = Tuple::new();
        t.add_string(value.to_string());
        t
    }

    /// Builds a tuple holding only `Versionstamp::incomplete(0)`.
    fn incomplete_versionstamp_tuple() -> Tuple {
        let mut t = Tuple::new();
        t.add_versionstamp(Versionstamp::incomplete(0));
        t
    }

    /// Builds a subspace directly from a static byte prefix.
    fn prefixed(prefix: &'static [u8]) -> Subspace {
        Subspace::new(Bytes::from_static(prefix))
    }

    /// `Subspace::new` stores the prefix verbatim and leaves the flag unset.
    #[test]
    fn new() {
        let s = prefixed(b"prefix");

        // Separate assertions so a failure names the condition that broke.
        assert!(!s.raw_prefix_has_incomplete_versionstamp);
        assert_eq!(s.raw_prefix, Bytes::from_static(b"prefix"));
    }

    /// `Subspace::subspace` appends the packed tuple and propagates the
    /// incomplete-versionstamp flag from the tuple.
    #[test]
    fn subspace() {
        let t = string_tuple("hello");
        let s = Subspace::new(Bytes::new()).subspace(&t);
        assert!(!s.raw_prefix_has_incomplete_versionstamp);
        assert_eq!(s.raw_prefix, t.pack());

        let t = incomplete_versionstamp_tuple();
        let s = Subspace::new(Bytes::new()).subspace(&t);
        assert!(s.raw_prefix_has_incomplete_versionstamp);
        assert_eq!(s.raw_prefix, t.pack());
    }

    /// `Subspace::contains` accepts only keys that start with the prefix.
    #[test]
    fn contains() {
        let s = prefixed(b"prefix");

        // Prefix "p" plus an empty tuple: presumably shorter than "prefix".
        let empty = Tuple::new();
        assert!(!s.contains(&prefixed(b"p").subspace(&empty).pack()));

        let t = string_tuple("hello");
        assert!(!s.contains(&prefixed(b"wrong_prefix").subspace(&t).pack()));
        assert!(s.contains(&prefixed(b"prefix_plus_garbage").subspace(&t).pack()));
        assert!(s.contains(&prefixed(b"prefix").subspace(&t).pack()));
    }

    /// `Subspace::pack` returns the prefix followed by the packed tuple.
    #[test]
    fn pack() {
        let t = string_tuple("hello");
        let s = prefixed(b"prefix");

        let expected: Bytes = {
            let mut b = BytesMut::new();
            b.put(&b"prefix"[..]);
            b.put(t.pack());
            b.into()
        };

        assert_eq!(s.subspace(&t).pack(), expected);
    }

    /// `Subspace::pack_with_versionstamp` rejects a flagged prefix and
    /// otherwise matches `Tuple::pack_with_versionstamp`, errors included.
    #[test]
    fn pack_with_versionstamp() {
        // The prefix itself carries an incomplete versionstamp.
        let s = Subspace::new(Bytes::new()).subspace(&incomplete_versionstamp_tuple());
        assert_eq!(
            s.pack_with_versionstamp(&string_tuple("hello")),
            Err(FdbError::new(
                SUBSPACE_PACK_WITH_VERSIONSTAMP_PREFIX_INCOMPLETE
            ))
        );

        // Plain prefix: the result must equal what the tuple produces itself.
        let mut t = string_tuple("foo");
        t.add_versionstamp(Versionstamp::incomplete(0));
        let s = prefixed(b"prefix");
        assert_eq!(
            s.pack_with_versionstamp(&t),
            t.pack_with_versionstamp(Bytes::from_static(b"prefix"))
        );

        // Two incomplete versionstamps (one nested): the tuple's own error
        // must surface unchanged.
        let mut t = Tuple::new();
        t.add_null();
        t.add_versionstamp(Versionstamp::incomplete(0));
        t.add_tuple({
            let mut nested = string_tuple("foo");
            nested.add_versionstamp(Versionstamp::incomplete(1));
            nested
        });
        assert_eq!(
            s.pack_with_versionstamp(&t),
            Err(FdbError::new(TUPLE_PACK_WITH_VERSIONSTAMP_MULTIPLE_FOUND))
        );
    }

    /// `Subspace::range` panics on a flagged prefix and otherwise returns the
    /// tuple's range under the prefix.
    #[test]
    fn range() {
        let s = Subspace::new(Bytes::new()).subspace(&incomplete_versionstamp_tuple());
        assert!(std::panic::catch_unwind(|| {
            s.range(&string_tuple("should_panic"));
        })
        .is_err());

        let s = prefixed(b"prefix");
        let mut t = Tuple::new();
        t.add_bytes(Bytes::from_static(b"foo"));
        assert_eq!(
            s.range(&t),
            Range::new(
                Bytes::from_static(b"prefix\x01foo\x00\x00"),
                Bytes::from_static(b"prefix\x01foo\x00\xFF")
            )
        );
    }

    /// `Subspace::unpack` rejects keys outside the subspace and decodes the
    /// remainder of keys inside it.
    #[test]
    fn unpack() {
        let s = prefixed(b"prefix");

        let foreign_key = prefixed(b"wrong_prefix")
            .subspace(&string_tuple("hello"))
            .pack();
        assert_eq!(
            s.unpack(&foreign_key),
            Err(FdbError::new(SUBSPACE_UNPACK_KEY_MISMATCH))
        );

        let own_key = prefixed(b"prefix").subspace(&string_tuple("hello")).pack();
        assert_eq!(s.unpack(&own_key), Ok(string_tuple("hello")));
    }
}