use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Context;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::commands::ResultExt;
use crate::commands::{queue, Call, CommandDictionary, CommandError, CommandResult, Endpoint};
use crate::fmt::format_size;
use crate::idb::idb_csv_loader::IdbCsvLoader;
use crate::idb::idb_json_loader::IdbJsonLoader;
use crate::idb::idb_yaml_loader::IdbYamlLoader;
use crate::idb::idb_yaml_set_loader::IdbYamlSetLoader;
use crate::idb::set::Set;
use crate::idb::table::Table;
use crate::ig::docs::{Element, Query};
use crate::platform::Platform;
use crate::repository::Repository;
use crate::request::Request;
use crate::response::Response;
use crate::spawn;
pub mod idb_csv_loader;
pub mod idb_json_loader;
pub mod idb_yaml_loader;
pub mod idb_yaml_set_loader;
pub mod set;
pub mod table;
pub mod trie;
/// Administrative commands which modify the tables and sets held by the database.
///
/// Commands are submitted via [`Database::perform`] and applied by `handle_admin`.
pub enum DatabaseCommand {
/// Stores the given table under the given name, replacing a table already stored there.
CreateTable(String, Table),
/// Removes the table with the given name.
DropTable(String),
/// Stores a set. The fields are, in order: the source, the name of the set and the set itself.
CreateSet(String, String, Set),
/// Removes all sets which were stored for the given source.
DropSets(String),
}
/// Handle used to submit [`DatabaseCommand`]s to the database actor.
pub struct Database {
// Sending side of the channel on which the actor receives administrative commands.
sender: Sender<DatabaseCommand>,
}
impl Database {
    /// Enqueues the given administrative command for the database actor.
    ///
    /// # Errors
    /// Fails if the command could not be placed into the channel towards the actor.
    pub async fn perform(&self, command: DatabaseCommand) -> anyhow::Result<()> {
        self.sender
            .send(command)
            .await
            .map_err(|_| anyhow::anyhow!("Failed to enqueue administrative database command."))
    }
}
/// Upper bound for the number of rows which `emit_results` outputs for a single call.
const MAX_RESULTS: usize = 1000;
/// Enumerates the commands served by this module.
///
/// The numeric value of each variant is used as the token when registering a command in
/// `install` and is mapped back via `from_usize` when a call is handled.
#[derive(FromPrimitive)]
enum Commands {
ShowTables,
Query,
IQuery,
Lookup,
ILookup,
Search,
ISearch,
Scan,
IScan,
ShowSets,
Contains,
IndexOf,
Len,
Cardinality,
}
/// Installs the database into the given platform.
///
/// Registers the [`Database`] handle, starts the actor, registers all `IDB.*` commands if a
/// [`CommandDictionary`] is present and registers the IDB loaders if a [`Repository`] is
/// present.
pub fn install(platform: Arc<Platform>) {
    let (cmd_queue, cmd_endpoint) = queue();
    let (admin_sender, admin_receiver) = tokio::sync::mpsc::channel(16);

    platform.register::<Database>(Arc::new(Database {
        sender: admin_sender,
    }));
    actor(cmd_endpoint, admin_receiver);

    if let Some(commands) = platform.find::<CommandDictionary>() {
        // Command name and the token under which the actor recognizes it.
        let registrations = [
            ("IDB.SHOW_TABLES", Commands::ShowTables as usize),
            ("IDB.LOOKUP", Commands::Lookup as usize),
            ("IDB.ILOOKUP", Commands::ILookup as usize),
            ("IDB.QUERY", Commands::Query as usize),
            ("IDB.IQUERY", Commands::IQuery as usize),
            ("IDB.SEARCH", Commands::Search as usize),
            ("IDB.ISEARCH", Commands::ISearch as usize),
            ("IDB.SCAN", Commands::Scan as usize),
            ("IDB.ISCAN", Commands::IScan as usize),
            ("IDB.SHOW_SETS", Commands::ShowSets as usize),
            ("IDB.CONTAINS", Commands::Contains as usize),
            ("IDB.INDEX_OF", Commands::IndexOf as usize),
            ("IDB.CARDINALITY", Commands::Cardinality as usize),
            ("IDB.LEN", Commands::Len as usize),
        ];

        for &(name, token) in registrations.iter() {
            commands.register_command(name, cmd_queue.clone(), token);
        }
    }

    if let Some(repository) = platform.find::<Repository>() {
        let yaml_loader = Arc::new(IdbYamlLoader::new(platform.clone()));
        repository.register_loader("idb-yaml".to_owned(), yaml_loader);

        let json_loader = Arc::new(IdbJsonLoader::new(platform.clone()));
        repository.register_loader("idb-json".to_owned(), json_loader);

        let csv_loader = Arc::new(IdbCsvLoader::new(platform.clone()));
        repository.register_loader("idb-csv".to_owned(), csv_loader);

        let yaml_set_loader = Arc::new(IdbYamlSetLoader::new(platform.clone()));
        repository.register_loader("idb-yaml-sets".to_owned(), yaml_set_loader);
    }
}
/// Starts the background task which owns all tables and sets.
///
/// The task loops over two inputs: calls received via `endpoint` are answered by
/// `handle_call` using read-only access to the maps, and administrative commands received
/// via `admin_receiver` are applied by `handle_admin` with mutable access. The task ends as
/// soon as either `recv()` returns `None`.
fn actor(mut endpoint: Endpoint, mut admin_receiver: Receiver<DatabaseCommand>) {
spawn!(async move {
// Both maps are local to this task and only reachable through the two channels.
let mut tables = HashMap::new();
let mut sets = HashMap::new();
loop {
tokio::select! {
call = endpoint.recv() => match call {
Some(call) => handle_call(call, &tables, &sets).await,
None => return
},
cmd = admin_receiver.recv() => match cmd {
Some(cmd) => handle_admin(cmd, &mut tables, &mut sets),
None => return
}
}
}
});
}
/// Dispatches an incoming call based on its token.
///
/// The two `SHOW` commands are answered directly, set commands are passed to
/// `handle_set_call` and every other token is passed to `handle_table_call`.
async fn handle_call(
    mut call: Call,
    tables: &HashMap<String, Arc<Table>>,
    sets: &HashMap<String, (Arc<Set>, String)>,
) {
    match Commands::from_usize(call.token) {
        Some(Commands::ShowTables) => {
            let result = show_tables_command(&mut call, tables);
            result.complete(call)
        }
        Some(Commands::ShowSets) => {
            let result = show_sets_command(&mut call, sets);
            result.complete(call)
        }
        Some(Commands::Contains) | Some(Commands::IndexOf) | Some(Commands::Cardinality) => {
            handle_set_call(call, sets).await
        }
        _ => handle_table_call(call, tables).await,
    }
}
fn show_tables_command(call: &mut Call, database: &HashMap<String, Arc<Table>>) -> CommandResult {
if call.request.parameter_count() == 0 {
let mut result = String::new();
result += format!(
"{:<40} {:>10} {:>10} {:>12} {:>10}\n",
"Name", "Num Rows", "Queries", "Scan Qrys", "Scans"
)
.as_str();
result += crate::response::SEPARATOR;
for (name, table) in database {
result += format!(
"{:<40} {:>10} {:>10} {:>12} {:>10}\n",
name,
table.len(),
table.num_queries(),
table.num_scan_queries(),
table.num_scans()
)
.as_str();
}
result += crate::response::SEPARATOR;
call.response.bulk(result)?;
} else if "raw" == call.request.str_parameter(0).unwrap_or("") {
call.response.array(database.len() as i32)?;
for (name, table) in database {
call.response.array(5)?;
call.response.bulk(name)?;
call.response.number(table.len() as i64)?;
call.response.number(table.num_queries() as i64)?;
call.response.number(table.num_scan_queries() as i64)?;
call.response.number(table.num_scans() as i64)?;
}
} else {
let table_name = call.request.str_parameter(0).unwrap_or("");
for (name, table) in database {
if name.as_str() == table_name {
call.response.array(6)?;
call.response.bulk(name)?;
call.response.number(table.len() as i64)?;
call.response.number(table.allocated_memory() as i64)?;
call.response.number(table.num_queries() as i64)?;
call.response.number(table.num_scan_queries() as i64)?;
call.response.number(table.num_scans() as i64)?;
}
}
}
Ok(())
}
/// Reports the sets known to the database.
///
/// Without parameters, a human readable overview is emitted as a single bulk string. If any
/// parameter is present, one array of five values (source, name, number of elements,
/// allocated memory, queries) is emitted per set.
fn show_sets_command(
    call: &mut Call,
    database: &HashMap<String, (Arc<Set>, String)>,
) -> CommandResult {
    if call.request.parameter_count() != 0 {
        call.response.array(database.len() as i32)?;
        for (name, (set, source)) in database {
            call.response.array(5)?;
            call.response.bulk(source)?;
            call.response.bulk(name)?;
            call.response.number(set.len() as i64)?;
            call.response.number(set.allocated_memory() as i64)?;
            call.response.number(set.num_queries() as i64)?;
        }

        return Ok(());
    }

    let mut report = format!(
        "{:<40} {:<20} {:>10} {:>12} {:>10}\n",
        "Source", "Name", "Num Elements", "Memory", "Queries"
    );
    report.push_str(crate::response::SEPARATOR);
    for (name, (set, source)) in database {
        let line = format!(
            "{:<40} {:<20} {:>10} {:>12} {:>10}\n",
            source,
            name,
            set.len(),
            format_size(set.allocated_memory()),
            set.num_queries()
        );
        report.push_str(&line);
    }
    report.push_str(crate::response::SEPARATOR);
    call.response.bulk(report)?;

    Ok(())
}
/// Handles a call which operates on a table.
///
/// The first parameter names the table. If it is missing or unknown, the call is completed
/// with a client error. Otherwise the command selected by the token of the call is executed
/// in a separately spawned task, using a shared reference to the table.
async fn handle_table_call(mut call: Call, database: &HashMap<String, Arc<Table>>) {
    let table_name = match call.request.str_parameter(0) {
        Ok(name) => name,
        Err(_) => {
            call.complete(Err(CommandError::ClientError(anyhow::anyhow!(
                "Missing table name as first parameter!"
            ))));
            return;
        }
    };

    let table = match database.get(table_name) {
        Some(table) => table.clone(),
        None => {
            // The name borrows from the call, which is consumed when completing it.
            let table_name = table_name.to_owned();
            call.complete(Err(CommandError::ClientError(anyhow::anyhow!(
                "Unknown table: {}",
                table_name
            ))));
            return;
        }
    };

    spawn!(async move {
        let outcome = match Commands::from_usize(call.token) {
            Some(Commands::Lookup) => execute_query(&mut call, table, false, false, true),
            Some(Commands::ILookup) => execute_query(&mut call, table, true, false, true),
            Some(Commands::Query) => execute_query(&mut call, table, false, true, true),
            Some(Commands::IQuery) => execute_query(&mut call, table, true, true, true),
            Some(Commands::Search) => execute_query(&mut call, table, false, true, false),
            Some(Commands::ISearch) => execute_query(&mut call, table, true, true, false),
            Some(Commands::Scan) => execute_scan(&mut call, table, false),
            Some(Commands::IScan) => execute_scan(&mut call, table, true),
            Some(Commands::Len) => execute_table_len(&mut call, table),
            _ => {
                call.handle_unknown_token();
                return;
            }
        };

        outcome.complete(call);
    });
}
/// Executes a query against the given table and emits the matching rows.
///
/// Parameters are read in this order, starting at index 1: the primary and fallback language
/// (only if `translate` is set), skip and limit (only if `read_limits` is set, otherwise no
/// rows are skipped and the limit is 1), the query, the value and finally the fields to
/// output. `exact` is passed on to the table as is.
fn execute_query(
    call: &mut Call,
    table: Arc<Table>,
    translate: bool,
    read_limits: bool,
    exact: bool,
) -> CommandResult {
    let mut next_parameter = 1;
    let (primary_lang, fallback_lang) = parse_langs(call, &table, translate, &mut next_parameter)?;
    let (skip, limit) = match read_limits {
        true => parse_limits(call, &mut next_parameter)?,
        false => (0, 1),
    };

    let query = call
        .request
        .str_parameter(next_parameter)
        .context("Missing query parameter.")?;
    let value = call
        .request
        .str_parameter(next_parameter + 1)
        .context("Missing value parameter.")?;
    next_parameter += 2;

    let i18n = I18nContext {
        primary_lang,
        fallback_lang,
        default_lang: table.default_lang_query(),
    };
    let mut rows = table.query(query, value, exact)?.skip(skip as usize);

    emit_results(
        &call.request,
        &mut call.response,
        &table,
        next_parameter,
        &mut rows,
        limit as usize,
        i18n,
    )?;

    Ok(())
}
/// Bundles the language queries which `emit_element` tries in order (primary, fallback,
/// default) when it outputs an object in translated form.
struct I18nContext<'a> {
// Query for the primary language; if absent, objects are emitted as JSON instead.
primary_lang: Option<Query>,
// Query for the fallback language; `parse_langs` leaves this empty if it equals the primary one.
fallback_lang: Option<Query>,
// Query for the default language as provided by the table.
default_lang: &'a Query,
}
/// Scans the whole table and emits its rows.
///
/// Parameters are read starting at index 1: the primary and fallback language (only if
/// `translate` is set), optionally followed by skip and limit and the fields to output. If
/// no parameters remain after the languages, skip and limit both default to 0.
fn execute_scan(call: &mut Call, table: Arc<Table>, translate: bool) -> CommandResult {
    let mut next_parameter = 1;
    let (primary_lang, fallback_lang) = parse_langs(call, &table, translate, &mut next_parameter)?;

    let has_more_parameters = next_parameter != call.request.parameter_count();
    let (skip, limit) = if has_more_parameters {
        parse_limits(call, &mut next_parameter)?
    } else {
        (0, 0)
    };

    let i18n = I18nContext {
        primary_lang,
        fallback_lang,
        default_lang: table.default_lang_query(),
    };
    let mut rows = table.table_scan().skip(skip as usize);

    emit_results(
        &call.request,
        &mut call.response,
        &table,
        next_parameter,
        &mut rows,
        limit as usize,
        i18n,
    )?;

    Ok(())
}
/// Reads the primary and fallback language from the call if `translate` is set.
///
/// Both languages are read from the two parameters starting at `parameter_index`, which is
/// advanced by two. Each language is compiled into a query using the given table. If both
/// languages are equal, no fallback query is returned. Without `translate`, nothing is read
/// and `parameter_index` is left untouched.
fn parse_langs(
    call: &Call,
    table: &Arc<Table>,
    translate: bool,
    parameter_index: &mut usize,
) -> anyhow::Result<(Option<Query>, Option<Query>)> {
    if !translate {
        return Ok((None, None));
    }

    let first_index = *parameter_index;
    *parameter_index += 2;

    let primary_lang = call
        .request
        .str_parameter(first_index)
        .context("Missing primary language as parameter.")?;
    let fallback_lang = call
        .request
        .str_parameter(first_index + 1)
        .context("Missing fallback language as parameter.")?;

    let primary_query = Some(table.compile(primary_lang));
    let fallback_query = if primary_lang == fallback_lang {
        None
    } else {
        Some(table.compile(fallback_lang))
    };

    Ok((primary_query, fallback_query))
}
/// Reads the skip and limit values from the two parameters starting at `parameter_index`.
///
/// `parameter_index` is advanced by two. Returns the pair `(skip, limit)`.
///
/// # Errors
/// Fails if either parameter is missing or not a valid integer, or if skip is negative.
fn parse_limits(call: &Call, parameter_index: &mut usize) -> anyhow::Result<(i32, i32)> {
    let skip_index = *parameter_index;
    *parameter_index += 2;

    let skip = call
        .request
        .int_parameter(skip_index)
        .context("Missing or invalid skip parameter.")?;
    let limit = call
        .request
        .int_parameter(skip_index + 1)
        .context("Missing or invalid limit parameter.")?;

    // The callers convert skip using `as usize`. A negative value would wrap around to a
    // huge number and silently skip every row, so it is rejected here instead.
    if skip < 0 {
        return Err(anyhow::anyhow!(
            "The skip parameter must not be negative (was {}).",
            skip
        ));
    }

    Ok((skip, limit))
}
/// Writes the rows provided by `iter` into the response.
///
/// If no parameters remain at `parameter_index`, only the number of rows is emitted.
/// Otherwise each remaining parameter is compiled into a query and, for at most
/// `limit` rows (capped by `MAX_RESULTS`), one array containing the result of each query is
/// emitted. Queries which select the root node are emitted without translation.
fn emit_results<'a, I>(
    request: &Request,
    response: &mut Response,
    table: &Arc<Table>,
    parameter_index: usize,
    iter: &'a mut I,
    limit: usize,
    i18n: I18nContext,
) -> anyhow::Result<()>
where
    I: Iterator<Item = Element<'a>>,
{
    if request.parameter_count() == parameter_index {
        response.number(iter.count() as i64)?;
        return Ok(());
    }

    // `take` stops right at the limit, so no row beyond it is fetched from the iterator.
    let max_results = limit.min(MAX_RESULTS);
    let results = iter.take(max_results).collect::<Vec<_>>();

    let queries = (parameter_index..request.parameter_count())
        .map(|index| table.compile(request.str_parameter(index).unwrap_or(".")))
        .collect::<Vec<Query>>();

    response.array(results.len() as i32)?;
    for row in results {
        // One column per requested field.
        response.array(queries.len() as i32)?;
        for query in &queries {
            emit_element(
                query.execute(row),
                response,
                &i18n,
                query.is_root_node_query(),
            )?;
        }
    }

    Ok(())
}
/// Writes a single element into the response.
///
/// Strings, integers and booleans are emitted as such and lists are emitted as arrays of
/// their children. Objects are either translated (if a primary language is present and
/// `suppress_i18n` is not set) by trying the primary, fallback and default language in this
/// order, or emitted as a JSON string. Everything else, as well as an object for which no
/// translation is found, yields an empty string.
fn emit_element(
    element: Element,
    response: &mut Response,
    i18n: &I18nContext,
    suppress_i18n: bool,
) -> anyhow::Result<()> {
    if let Some(text) = element.as_str() {
        response.bulk(text)?;
        return Ok(());
    }
    if let Some(number) = element.as_int() {
        response.number(number)?;
        return Ok(());
    }
    if let Some(flag) = element.try_as_bool() {
        response.boolean(flag)?;
        return Ok(());
    }
    if element.is_list() {
        response.array(element.len() as i32)?;
        for child in element.iter() {
            emit_element(child, response, i18n, false)?;
        }
        return Ok(());
    }
    if !element.is_object() {
        response.empty_string()?;
        return Ok(());
    }

    let translate = !suppress_i18n && i18n.primary_lang.is_some();
    if translate {
        // Short-circuits on the first language which provides a value.
        let emitted = emit_translated(element, i18n.primary_lang.as_ref(), response, i18n)?
            || emit_translated(element, i18n.fallback_lang.as_ref(), response, i18n)?
            || emit_translated(element, Some(i18n.default_lang), response, i18n)?;
        if !emitted {
            response.empty_string()?;
        }
    } else {
        response.bulk(to_json(element, i18n).to_string())?;
    }

    Ok(())
}
/// Emits the translation of the given element for the given language, if there is one.
///
/// Returns `true` if the language query yielded a non-empty element which was written to
/// the response and `false` if no language was given or nothing was found.
fn emit_translated(
    element: Element,
    lang: Option<&Query>,
    response: &mut Response,
    i18n: &I18nContext,
) -> anyhow::Result<bool> {
    match lang {
        None => Ok(false),
        Some(lang_query) => {
            let translated = lang_query.execute(element);
            if translated.is_empty() {
                Ok(false)
            } else {
                emit_element(translated, response, i18n, false)?;
                Ok(true)
            }
        }
    }
}
/// Converts the given element into a JSON value.
///
/// Strings, integers and booleans become the matching JSON scalars, lists become arrays and
/// objects are converted by `hash_to_json`. Everything else becomes `null`.
fn to_json(element: Element, i18n: &I18nContext) -> serde_json::value::Value {
    if let Some(text) = element.as_str() {
        return serde_json::json!(text);
    }
    if let Some(number) = element.as_int() {
        return serde_json::json!(number);
    }
    if let Some(flag) = element.try_as_bool() {
        return serde_json::json!(flag);
    }
    if element.is_list() {
        // Collecting values directly yields a JSON array.
        return element
            .iter()
            .map(|child| to_json(child, i18n))
            .collect::<serde_json::value::Value>();
    }

    if element.is_object() {
        hash_to_json(element, i18n)
    } else {
        serde_json::value::Value::Null
    }
}
/// Converts an object element into a JSON object by converting each of its entries.
fn hash_to_json(element: Element, i18n: &I18nContext) -> serde_json::value::Value {
    let converted = element
        .entries()
        .into_iter()
        .map(|(key, child)| (key, to_json(child, i18n)))
        .collect::<HashMap<_, _>>();

    serde_json::json!(converted)
}
/// Emits the number of rows of the given table.
fn execute_table_len(call: &mut Call, table: Arc<Table>) -> CommandResult {
    let num_rows = table.len() as i64;
    call.response.number(num_rows)?;

    Ok(())
}
/// Handles a call which operates on a set.
///
/// The first parameter names the set. If it is missing or unknown, the call is completed
/// with a client error. Otherwise the command selected by the token of the call is executed
/// in a separately spawned task, using a shared reference to the set.
async fn handle_set_call(mut call: Call, database: &HashMap<String, (Arc<Set>, String)>) {
    let set_name = if let Ok(name) = call.request.str_parameter(0) {
        name
    } else {
        call.complete(Err(CommandError::ClientError(anyhow::anyhow!(
            "Missing set name as first parameter!"
        ))));
        return;
    };

    // Only the reference to the set is cloned. The source is not needed here, so copying
    // its string for every call would be wasted work.
    let set = match database.get(set_name) {
        Some((set, _source)) => Arc::clone(set),
        None => {
            // The name borrows from the call, which is consumed when completing it.
            let set_name = set_name.to_owned();
            call.complete(Err(CommandError::ClientError(anyhow::anyhow!(
                "Unknown set: {}",
                set_name
            ))));
            return;
        }
    };

    spawn!(async move {
        let token = call.token;
        match Commands::from_usize(token) {
            Some(Commands::Contains) => set_contains_command(&mut call, set).complete(call),
            Some(Commands::Cardinality) => set_cardinality_command(&mut call, set).complete(call),
            Some(Commands::IndexOf) => set_index_of_command(&mut call, set).complete(call),
            _ => call.handle_unknown_token(),
        }
    });
}
/// Checks whether the given set contains the given values.
///
/// With exactly one value (two parameters in total), a single boolean is emitted. Otherwise
/// an array with one boolean per value is emitted.
fn set_contains_command(call: &mut Call, set: Arc<Set>) -> CommandResult {
    let parameter_count = call.request.parameter_count();

    if parameter_count == 2 {
        let value = call.request.str_parameter(1)?;
        call.response.boolean(set.contains(value))?;
        return Ok(());
    }

    call.response.array((parameter_count - 1) as i32)?;
    for index in 1..parameter_count {
        let value = call.request.str_parameter(index)?;
        call.response.boolean(set.contains(value))?;
    }

    Ok(())
}
/// Determines the index of the given values within the given set.
///
/// With exactly one value (two parameters in total), a single number is emitted. Otherwise
/// an array with one number per value is emitted.
fn set_index_of_command(call: &mut Call, set: Arc<Set>) -> CommandResult {
    let parameter_count = call.request.parameter_count();

    if parameter_count == 2 {
        let value = call.request.str_parameter(1)?;
        call.response.number(set.index_of(value) as i64)?;
        return Ok(());
    }

    call.response.array((parameter_count - 1) as i32)?;
    for index in 1..parameter_count {
        let value = call.request.str_parameter(index)?;
        call.response.number(set.index_of(value) as i64)?;
    }

    Ok(())
}
/// Emits the number of elements of the given set.
fn set_cardinality_command(call: &mut Call, set: Arc<Set>) -> CommandResult {
    let num_elements = set.len() as i64;
    call.response.number(num_elements)?;

    Ok(())
}
/// Applies an administrative command to the tables and sets of the database.
///
/// Creating a table or set replaces an entry already stored under the same name. Dropping
/// sets removes every set which was stored for the given source.
fn handle_admin(
    command: DatabaseCommand,
    tables: &mut HashMap<String, Arc<Table>>,
    sets: &mut HashMap<String, (Arc<Set>, String)>,
) {
    match command {
        DatabaseCommand::CreateTable(table_name, new_table) => {
            log::info!(
                "New or updated table: {} ({} rows, {})",
                &table_name,
                new_table.len(),
                format_size(new_table.allocated_memory())
            );
            let _ = tables.insert(table_name, Arc::new(new_table));
        }
        DatabaseCommand::DropTable(table_name) => {
            log::info!("Dropping table: {}...", &table_name);
            let _ = tables.remove(&table_name);
        }
        DatabaseCommand::CreateSet(set_source, set_name, new_set) => {
            log::info!(
                "New or updated set: {} ({} elements, {}, Source: {})",
                &set_name,
                new_set.len(),
                format_size(new_set.allocated_memory()),
                set_source
            );
            let _ = sets.insert(set_name, (Arc::new(new_set), set_source));
        }
        DatabaseCommand::DropSets(set_source) => {
            log::info!("Dropping sets of: {}...", &set_source);
            sets.retain(|_, (_, source_of_set)| *source_of_set != set_source);
        }
    };
}
#[cfg(test)]
mod tests {
use crate::builder::Builder;
use crate::config::Config;
use crate::idb::set::Set;
use crate::idb::table::{IndexType, Table};
use crate::idb::{install, Database, DatabaseCommand};
use crate::ig::docs::Doc;
use crate::ig::yaml::list_to_doc;
use crate::platform::Platform;
use crate::server::Server;
use crate::testing::{query_redis_async, test_async};
use std::sync::Arc;
use tokio::time::Duration;
use yaml_rust::YamlLoader;
// Builds the example data used by the table test from an inline YAML string which
// contains three documents (codes "D", "A" and "X").
fn create_example_dataset() -> Doc {
let input = r#"
code: "D"
iso:
two: "de"
three: "deu"
name:
de: "Deutschland"
en: "Germany"
---
code: "A"
iso:
two: "at"
three: "aut"
name:
de: "Österreich"
en: "Austria"
xx: "Test"
---
code: "X"
name: Test
"#;
let rows = YamlLoader::load_from_str(input).unwrap();
// NOTE(review): the closure presumably decides which rows are accepted - confirm against list_to_doc.
list_to_doc(rows.as_slice(), |_| true).unwrap()
}
// Creates a table via the administrative API, runs the table commands against it through
// the server and finally drops the table again.
#[test]
fn integration_test_for_tables() {
// The guard is held for the whole test, as the test uses shared resources.
log::info!("Acquiring shared resources...");
let _guard = crate::testing::SHARED_TEST_RESOURCES.lock().unwrap();
log::info!("Successfully acquired shared resources.");
test_async(async {
let (platform, database) = setup_environment().await;
let dataset = create_example_dataset();
let table = Table::new(
dataset,
vec![
IndexType::lookup("code"),
IndexType::lookup("iso.two"),
IndexType::fulltext("name"),
],
)
.unwrap();
database
.perform(DatabaseCommand::CreateTable("countries".to_string(), table))
.await
.unwrap();
// The command is only enqueued above, so wait before querying the table.
tokio::time::sleep(Duration::from_secs(2)).await;
// A scan without further parameters reports the number of rows.
assert_eq!(
query_redis_async(|con| redis::cmd("IDB.SCAN").arg("countries").query::<i32>(con))
.await
.unwrap(),
3
);
// Lookup of a single field.
let result = query_redis_async(|con| {
redis::cmd("IDB.LOOKUP")
.arg("countries")
.arg("code")
.arg("D")
.arg("iso.two")
.query::<Vec<Vec<String>>>(con)
})
.await
.unwrap();
assert_eq!(result[0][0], "de");
let result =
query_redis_async(|con| redis::cmd("IDB.LEN").arg("countries").query::<i32>(con))
.await
.unwrap();
assert_eq!(result, 3);
// Lookup of several fields at once.
let result = query_redis_async(|con| {
redis::cmd("IDB.LOOKUP")
.arg("countries")
.arg("code")
.arg("A")
.arg("iso.two")
.arg("iso.three")
.query::<Vec<Vec<(String, String)>>>(con)
})
.await
.unwrap();
assert_eq!(result[0][0].0, "at");
assert_eq!(result[0][0].1, "aut");
// Lookup via the fulltext index on "name" using a lowercase value.
let result = query_redis_async(|con| {
redis::cmd("IDB.LOOKUP")
.arg("countries")
.arg("name")
.arg("austria")
.arg("iso.two")
.arg("iso.three")
.query::<Vec<Vec<(String, String)>>>(con)
})
.await
.unwrap();
assert_eq!(result[0][0].0, "at");
assert_eq!(result[0][0].1, "aut");
// A lowercase value does not match on the lookup index on "code".
let result = query_redis_async(|con| {
redis::cmd("IDB.LOOKUP")
.arg("countries")
.arg("code")
.arg("d")
.arg("iso.two")
.query::<Vec<Vec<String>>>(con)
})
.await
.unwrap();
assert!(result.is_empty());
// Selecting "." yields the whole row as JSON.
let result = query_redis_async(|con| {
redis::cmd("IDB.LOOKUP")
.arg("countries")
.arg("code")
.arg("X")
.arg(".")
.query::<Vec<Vec<String>>>(con)
})
.await
.unwrap();
assert_eq!(result[0][0], "{\"code\":\"X\",\"name\":\"Test\"}");
// Without translation, an object is emitted as JSON.
let result = query_redis_async(|con| {
redis::cmd("IDB.LOOKUP")
.arg("countries")
.arg("code")
.arg("A")
.arg("name")
.query::<Vec<Vec<String>>>(con)
})
.await
.unwrap();
assert_eq!(
result[0][0],
"{\"de\":\"Österreich\",\"en\":\"Austria\",\"xx\":\"Test\"}"
);
// No translation is available for the requested language, so an empty string is expected.
let result = query_redis_async(|con| {
redis::cmd("IDB.ILOOKUP")
.arg("countries")
.arg("uu")
.arg("uu")
.arg("code")
.arg("D")
.arg("name")
.query::<Vec<Vec<String>>>(con)
})
.await
.unwrap();
assert_eq!(result[0][0], "");
// The primary language is used for "name", whereas "name.de" is selected directly.
let result = query_redis_async(|con| {
redis::cmd("IDB.ILOOKUP")
.arg("countries")
.arg("en")
.arg("de")
.arg("code")
.arg("A")
.arg("name")
.arg("name.de")
.query::<Vec<Vec<(String, String)>>>(con)
})
.await
.unwrap();
assert_eq!(result[0][0].0, "Austria");
assert_eq!(result[0][0].1, "Österreich");
// Neither primary nor fallback language are present, so the "xx" entry is expected.
let result = query_redis_async(|con| {
redis::cmd("IDB.ILOOKUP")
.arg("countries")
.arg("it")
.arg("it")
.arg("code")
.arg("A")
.arg("name")
.arg("name.de")
.query::<Vec<Vec<(String, String)>>>(con)
})
.await
.unwrap();
assert_eq!(result[0][0].0, "Test");
assert_eq!(result[0][0].1, "Österreich");
// Selecting "." yields the untranslated row as JSON, even if languages are given.
let result = query_redis_async(|con| {
redis::cmd("IDB.ILOOKUP")
.arg("countries")
.arg("de")
.arg("de")
.arg("code")
.arg("A")
.arg(".")
.query::<Vec<Vec<String>>>(con)
})
.await
.unwrap();
assert_eq!(result[0][0], "{\"code\":\"A\",\"iso\":{\"three\":\"aut\",\"two\":\"at\"},\"name\":{\"de\":\"Österreich\",\"en\":\"Austria\",\"xx\":\"Test\"}}");
// Search with skip 0 and limit 2 using a prefix of the name.
let result = query_redis_async(|con| {
redis::cmd("IDB.SEARCH")
.arg("countries")
.arg("0")
.arg("2")
.arg("*")
.arg("deutsch")
.arg("code")
.arg("name.de")
.query::<Vec<Vec<(String, String)>>>(con)
})
.await
.unwrap();
assert_eq!(result[0][0].0, "D");
assert_eq!(result[0][0].1, "Deutschland");
database
.perform(DatabaseCommand::DropTable("countries".to_string()))
.await
.unwrap();
// Again, wait until the enqueued command has been applied.
tokio::time::sleep(Duration::from_secs(2)).await;
// After dropping the table, the query must no longer yield a result.
assert_eq!(
query_redis_async(|con| redis::cmd("IDB.SCAN")
.arg("countries")
.query::<Vec<i32>>(con))
.await
.is_none(),
true
);
platform.terminate()
});
}
// Builds a platform with the database installed, loads a config which sets the server
// port to 1503 and waits for the server to be forked.
// Returns the platform along with the database handle registered by `install`.
async fn setup_environment() -> (Arc<Platform>, Arc<Database>) {
let platform = Builder::new().enable_all().build().await;
install(platform.clone());
platform
.require::<Config>()
.load_from_string(
r#"
server:
port: 1503
"#,
None,
)
.unwrap();
Server::fork_and_await(&platform.require::<Server>()).await;
(platform.clone(), platform.require::<Database>())
}
// Creates a set via the administrative API, runs the set commands against it through the
// server and finally drops all sets of its source again.
#[test]
fn integration_test_for_sets() {
// The guard is held for the whole test, as the test uses shared resources.
log::info!("Acquiring shared resources...");
let _guard = crate::testing::SHARED_TEST_RESOURCES.lock().unwrap();
log::info!("Successfully acquired shared resources.");
test_async(async {
let (platform, database) = setup_environment().await;
let mut set = Set::default();
set.add("A".to_owned());
set.add("B".to_owned());
set.add("C".to_owned());
// The set is named "test_set" and stored for the source "test".
database
.perform(DatabaseCommand::CreateSet(
"test".to_string(),
"test_set".to_string(),
set,
))
.await
.unwrap();
// The command is only enqueued above, so wait before querying the set.
tokio::time::sleep(Duration::from_secs(2)).await;
assert_eq!(
query_redis_async(|con| redis::cmd("IDB.CONTAINS")
.arg("test_set")
.arg("A")
.query::<i32>(con))
.await
.unwrap(),
1
);
assert_eq!(
query_redis_async(|con| redis::cmd("IDB.CONTAINS")
.arg("test_set")
.arg("X")
.query::<i32>(con))
.await
.unwrap(),
0
);
// "B" was added second and is expected at index 2.
assert_eq!(
query_redis_async(|con| redis::cmd("IDB.INDEX_OF")
.arg("test_set")
.arg("B")
.query::<i32>(con))
.await
.unwrap(),
2
);
// An unknown value is expected to yield 0.
assert_eq!(
query_redis_async(|con| redis::cmd("IDB.INDEX_OF")
.arg("test_set")
.arg("X")
.query::<i32>(con))
.await
.unwrap(),
0
);
assert_eq!(
query_redis_async(|con| redis::cmd("IDB.CARDINALITY")
.arg("test_set")
.query::<i32>(con))
.await
.unwrap(),
3
);
database
.perform(DatabaseCommand::DropSets("test".to_string()))
.await
.unwrap();
// Again, wait until the enqueued command has been applied.
tokio::time::sleep(Duration::from_secs(2)).await;
// After dropping the sets of the source, the query must no longer yield a result.
assert_eq!(
query_redis_async(|con| redis::cmd("IDB.CONTAINS")
.arg("test_set")
.arg("A")
.query::<Vec<i32>>(con))
.await
.is_none(),
true
);
platform.terminate()
});
}
}