use std::{error::Error as StdError, fmt::Write, path::Path};
use reifydb_core::{
common::CommitVersion,
delta::Delta,
encoded::{
key::{EncodedKey, EncodedKeyRange},
row::EncodedRow,
},
interface::store::{MultiVersionCommit, MultiVersionContains, MultiVersionGet, MultiVersionRow},
util::encoding::{
binary::decode_binary,
format::{Formatter, raw::Raw},
},
};
use reifydb_runtime::{
actor::system::ActorSystem,
context::clock::Clock,
pool::{PoolConfig, Pools},
};
use reifydb_store_multi::{
config::{HotConfig, MultiStoreConfig},
hot::storage::HotStorage,
store::StandardMultiStore,
};
use reifydb_testing::{tempdir::temp_dir, testscript, testscript::command::Command};
use reifydb_type::cow_vec;
use test_each_file::test_each_path;
use testscript::runner::run_path;
// Run every script under `crates/store-multi/tests/scripts/multi` twice:
// once through `test_memory` (tests named under `store_multi_memory`) and once
// through `test_sqlite` (tests named under `store_multi_sqlite`).
test_each_path! { in "crates/store-multi/tests/scripts/multi" as store_multi_memory => test_memory }
test_each_path! { in "crates/store-multi/tests/scripts/multi" as store_multi_sqlite => test_sqlite }
/// Runs the script at `path` against a store whose hot tier is `HotStorage::memory()`.
///
/// # Panics
/// Panics with "test failed" when `run_path` returns an error.
fn test_memory(path: &Path) {
    let mut runner = Runner::new(HotStorage::memory());
    let outcome = run_path(&mut runner, path);
    outcome.expect("test failed")
}
/// Runs the script at `path` against a store whose hot tier is
/// `HotStorage::sqlite_in_memory()`.
///
/// The run happens inside a `temp_dir` callback, but the directory path handed
/// to the callback is not used: the SQLite storage is created in memory.
///
/// # Panics
/// Panics with "test failed" when the `temp_dir` call returns an error.
fn test_sqlite(path: &Path) {
    let outcome = temp_dir(|_scratch_dir| {
        let mut runner = Runner::new(HotStorage::sqlite_in_memory());
        run_path(&mut runner, path)
    });
    outcome.expect("test failed")
}
/// Testscript runner that executes store commands against a `StandardMultiStore`.
pub struct Runner {
/// Store that every command reads from or commits to.
store: StandardMultiStore,
/// Current commit version: starts at 0, is incremented by write commands
/// that carry no explicit `version`, and is the default version for reads.
version: CommitVersion,
}
impl Runner {
fn new(storage: HotStorage) -> Self {
let pools = Pools::new(PoolConfig::default());
let actor_system = ActorSystem::new(pools, Clock::Real);
let store = StandardMultiStore::new(MultiStoreConfig {
hot: Some(HotConfig {
storage,
}),
warm: None,
cold: None,
retention: Default::default(),
merge_config: Default::default(),
event_bus: reifydb_core::event::EventBus::new(&actor_system),
actor_system,
clock: Clock::Real,
})
.unwrap();
Self {
store,
version: CommitVersion(0),
}
}
}
impl testscript::runner::Runner for Runner {
fn run(&mut self, command: &Command) -> Result<String, Box<dyn StdError>> {
let mut output = String::new();
match command.name.as_str() {
"get" => {
let mut args = command.consume_args();
let key = EncodedKey(decode_binary(&args.next_pos().ok_or("key not given")?.value));
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
let value = self.store.get(&key, version)?.map(|sv: MultiVersionRow| sv.row.to_vec());
writeln!(output, "{}", Raw::key_maybe_value(&key, value))?;
}
"contains" => {
let mut args = command.consume_args();
let key = EncodedKey(decode_binary(&args.next_pos().ok_or("key not given")?.value));
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
let contains = self.store.contains(&key, version)?;
writeln!(output, "{} => {}", Raw::key(&key), contains)?;
}
"scan" => {
let mut args = command.consume_args();
let reverse = args.lookup_parse("reverse")?.unwrap_or(false);
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
if !reverse {
let items: Vec<_> = self
.store
.range(EncodedKeyRange::all(), version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
} else {
let items: Vec<_> = self
.store
.range_rev(EncodedKeyRange::all(), version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
};
}
"range" => {
let mut args = command.consume_args();
let reverse = args.lookup_parse("reverse")?.unwrap_or(false);
let range = EncodedKeyRange::parse(
args.next_pos().map(|a| a.value.as_str()).unwrap_or(".."),
);
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
args.reject_rest()?;
if !reverse {
let items: Vec<_> = self
.store
.range(range, version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
} else {
let items: Vec<_> = self
.store
.range_rev(range, version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
};
}
"prefix" => {
let mut args = command.consume_args();
let reverse = args.lookup_parse("reverse")?.unwrap_or(false);
let version = CommitVersion(args.lookup_parse("version")?.unwrap_or(self.version.0));
let prefix =
EncodedKey(decode_binary(&args.next_pos().ok_or("prefix not given")?.value));
args.reject_rest()?;
let range = EncodedKeyRange::prefix(&prefix.0);
if !reverse {
let items: Vec<_> = self
.store
.range(range, version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
} else {
let items: Vec<_> = self
.store
.range_rev(range, version, 1024)
.collect::<Result<Vec<_>, _>>()?;
print(&mut output, items.into_iter())
};
}
"set" => {
let mut args = command.consume_args();
let kv = args.next_key().ok_or("key=value not given")?.clone();
let key = EncodedKey(decode_binary(&kv.key.unwrap()));
let row = EncodedRow(decode_binary(&kv.value));
let version = if let Some(v) = args.lookup_parse("version")? {
v
} else {
self.version.0 += 1;
self.version
};
args.reject_rest()?;
self.store.commit(
cow_vec![
(Delta::Set {
key,
row
})
],
version,
)?;
}
"remove" => {
let mut args = command.consume_args();
let key = EncodedKey(decode_binary(&args.next_pos().ok_or("key not given")?.value));
let version = if let Some(v) = args.lookup_parse("version")? {
v
} else {
self.version.0 += 1;
self.version
};
args.reject_rest()?;
self.store.commit(
cow_vec![
(Delta::Remove {
key
})
],
version,
)?
}
"unset" => {
let mut args = command.consume_args();
let kv = args.next_key().ok_or("key=value not given")?.clone();
let key = EncodedKey(decode_binary(&kv.key.unwrap()));
let row = EncodedRow(decode_binary(&kv.value));
let version = if let Some(v) = args.lookup_parse("version")? {
v
} else {
self.version.0 += 1;
self.version
};
args.reject_rest()?;
self.store.commit(
cow_vec![
(Delta::Unset {
key,
row
})
],
version,
)?;
}
name => {
return Err(format!("invalid command {name}").into());
}
}
Ok(output)
}
}
/// Appends one line per row to `output`, each rendered with `Raw::key_value`
/// from the row's key and row bytes.
///
/// # Panics
/// Panics if formatting into `output` fails.
fn print<I>(output: &mut String, iter: I)
where
    I: Iterator<Item = MultiVersionRow>,
{
    iter.for_each(|entry| {
        let line = Raw::key_value(&entry.key, entry.row.as_slice());
        writeln!(output, "{line}").unwrap();
    });
}