use std::{collections::HashMap, error::Error as StdError, fmt::Write};
use reifydb_core::{
common::CommitVersion,
delta::Delta,
encoded::{
key::{EncodedKey, EncodedKeyRange},
row::EncodedRow,
},
event::EventBus,
interface::store::{EntryKind, MultiVersionCommit, MultiVersionContains, MultiVersionGet, MultiVersionRow},
util::encoding::{
binary::decode_binary,
format::{Formatter, raw::Raw},
},
};
use reifydb_runtime::{
actor::system::ActorSystem,
context::clock::Clock,
pool::{PoolConfig, Pools},
};
use reifydb_store_multi::{
buffer::tier::MultiBufferTier,
config::{BufferConfig, MultiStoreConfig, PersistentConfig},
store::{
StandardMultiStore,
router::classify_key,
version::{VersionedGetResult, get_at_version},
},
tier::TierStorage,
};
use reifydb_testing::testscript;
use reifydb_type::{cow_vec, util::cowvec::CowVec};
use testscript::command::Command;
pub struct Runner {
pub store: StandardMultiStore,
pub version: CommitVersion,
pub auto_flush: bool,
}
impl Runner {
#[allow(dead_code)]
pub fn new(storage: MultiBufferTier) -> Self {
let pools = Pools::new(PoolConfig::default());
let actor_system = ActorSystem::new(pools, Clock::Real);
let store = StandardMultiStore::new(MultiStoreConfig {
buffer: Some(BufferConfig {
storage,
}),
persistent: None,
retention: Default::default(),
merge_config: Default::default(),
event_bus: EventBus::new(&actor_system),
actor_system,
clock: Clock::Real,
})
.unwrap();
Self::from_store(store)
}
#[allow(dead_code)]
#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
pub fn sqlite_unbuffered(persistent: PersistentConfig) -> Self {
let pools = Pools::new(PoolConfig::default());
let actor_system = ActorSystem::new(pools, Clock::Real);
let event_bus = EventBus::new(&actor_system);
let store = StandardMultiStore::new(MultiStoreConfig::sqlite_unbuffered(
persistent,
actor_system,
Clock::Real,
event_bus,
))
.unwrap();
Self::from_store(store)
}
#[allow(dead_code)]
pub fn from_store(store: StandardMultiStore) -> Self {
Self {
store,
version: CommitVersion(0),
auto_flush: true,
}
}
#[allow(dead_code)]
pub fn from_store_no_auto_flush(store: StandardMultiStore) -> Self {
Self {
store,
version: CommitVersion(0),
auto_flush: false,
}
}
#[inline]
fn maybe_flush(&self) {
if self.auto_flush {
self.store.flush_pending_blocking();
}
}
}
impl testscript::runner::Runner for Runner {
fn run(&mut self, command: &Command) -> Result<String, Box<dyn StdError>> {
let mut output = String::new();
match command.name.as_str() {
"get" => {
let mut args = command.consume_args();
let key =
EncodedKey::new(decode_binary(&args.next_pos().ok_or("key not given")?.value));
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
let value = self.store.get(&key, version)?.map(|sv: MultiVersionRow| sv.row.to_vec());
writeln!(output, "{}", Raw::key_maybe_value(&key, value))?;
}
"contains" => {
let mut args = command.consume_args();
let key =
EncodedKey::new(decode_binary(&args.next_pos().ok_or("key not given")?.value));
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
let contains = self.store.contains(&key, version)?;
writeln!(output, "{} => {}", Raw::key(&key), contains)?;
}
"scan" => {
let mut args = command.consume_args();
let reverse = args.lookup_parse("reverse")?.unwrap_or(false);
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
if !reverse {
let items: Vec<_> = self
.store
.range(EncodedKeyRange::all(), version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
} else {
let items: Vec<_> = self
.store
.range_rev(EncodedKeyRange::all(), version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
};
}
"range" => {
let mut args = command.consume_args();
let reverse = args.lookup_parse("reverse")?.unwrap_or(false);
let range = EncodedKeyRange::parse(
args.next_pos().map(|a| a.value.as_str()).unwrap_or(".."),
);
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
if !reverse {
let items: Vec<_> = self
.store
.range(range, version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
} else {
let items: Vec<_> = self
.store
.range_rev(range, version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
};
}
"prefix" => {
let mut args = command.consume_args();
let reverse = args.lookup_parse("reverse")?.unwrap_or(false);
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
let prefix = EncodedKey::new(decode_binary(
&args.next_pos().ok_or("prefix not given")?.value,
));
args.reject_rest()?;
let range = EncodedKeyRange::prefix(prefix.as_slice());
if !reverse {
let items: Vec<_> = self
.store
.range(range, version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
} else {
let items: Vec<_> = self
.store
.range_rev(range, version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
};
}
"set" => {
let mut args = command.consume_args();
let kv = args.next_key().ok_or("key=value not given")?.clone();
let key = EncodedKey::new(decode_binary(&kv.key.unwrap()));
let row = EncodedRow(CowVec::new(decode_binary(&kv.value)));
let version = if let Some(v) = args.lookup_parse("version")? {
v
} else {
self.version.0 += 1;
self.version
};
args.reject_rest()?;
self.store.commit(
cow_vec![
(Delta::Set {
key,
row
})
],
version,
)?;
self.maybe_flush();
}
"remove" => {
let mut args = command.consume_args();
let key =
EncodedKey::new(decode_binary(&args.next_pos().ok_or("key not given")?.value));
let version = if let Some(v) = args.lookup_parse("version")? {
v
} else {
self.version.0 += 1;
self.version
};
args.reject_rest()?;
self.store.commit(
cow_vec![
(Delta::Remove {
key
})
],
version,
)?;
self.maybe_flush();
}
"unset" => {
let mut args = command.consume_args();
let kv = args.next_key().ok_or("key=value not given")?.clone();
let key = EncodedKey::new(decode_binary(&kv.key.unwrap()));
let row = EncodedRow(CowVec::new(decode_binary(&kv.value)));
let version = if let Some(v) = args.lookup_parse("version")? {
v
} else {
self.version.0 += 1;
self.version
};
args.reject_rest()?;
self.store.commit(
cow_vec![
(Delta::Unset {
key,
row
})
],
version,
)?;
self.maybe_flush();
}
"flush" => {
self.store.flush_pending_blocking();
writeln!(output, "ok")?;
}
"buffer_get" => {
let mut args = command.consume_args();
let key =
EncodedKey::new(decode_binary(&args.next_pos().ok_or("key not given")?.value));
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
let buffer = self.store.buffer().ok_or("buffer tier not configured")?;
let table = classify_key(&key);
let value = match get_at_version(buffer, table, key.as_ref(), version)? {
VersionedGetResult::Value {
value,
..
} => Some(value.to_vec()),
VersionedGetResult::Tombstone => None,
VersionedGetResult::NotFound => None,
};
writeln!(output, "{}", Raw::key_maybe_value(&key, value))?;
}
"persistent_get" => {
let mut args = command.consume_args();
let key =
EncodedKey::new(decode_binary(&args.next_pos().ok_or("key not given")?.value));
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
let persistent = self.store.persistent().ok_or("persistent tier not configured")?;
let table = classify_key(&key);
let value = persistent.get(table, key.as_ref(), version)?.map(|v| v.to_vec());
writeln!(output, "{}", Raw::key_maybe_value(&key, value))?;
}
"persistent_set" => {
let mut args = command.consume_args();
let kv = args.next_key().ok_or("key=value not given")?.clone();
let key = EncodedKey::new(decode_binary(&kv.key.unwrap()));
let value_bytes = decode_binary(&kv.value);
let version = if let Some(v) = args.lookup_parse("version")? {
CommitVersion(v)
} else {
self.version.0 += 1;
self.version
};
args.reject_rest()?;
let persistent = self.store.persistent().ok_or("persistent tier not configured")?;
let table = classify_key(&key);
let mut batches: HashMap<EntryKind, Vec<(EncodedKey, Option<CowVec<u8>>)>> =
HashMap::new();
batches.entry(table).or_default().push((key, Some(CowVec::new(value_bytes))));
persistent.set(version, batches)?;
}
name => {
return Err(format!("invalid command {name}").into());
}
}
Ok(output)
}
}
fn print<I: Iterator<Item = MultiVersionRow>>(output: &mut String, iter: I) {
for item in iter {
let fmtkv = Raw::key_value(&item.key, item.row.as_slice());
writeln!(output, "{fmtkv}").unwrap();
}
}