use crate::common;
use futures::stream::StreamExt;
use crate::src::count_results;
use aerospike::expressions::*;
use aerospike::query::PartitionFilter;
use aerospike::ParticleType;
use aerospike::*;
use std::sync::Arc;
const EXPECTED: usize = 100;
/// Writes `no_records` records into a randomly named set and returns that set's name.
///
/// Record `i` is keyed by the integer `i` and carries six bins:
/// `bin` (the integer `i`), `bin2` (`i` rendered as a string), `bin3` (`i / 3` as a float),
/// `bin4` (the bytes of `"blob<i>"`), `bin5` (the list `["a", "b", i]`) and
/// `bin6` (the map `{"a": "test", "b": i}`).
///
/// Panics if any delete or put fails.
async fn create_test_set(client: &Client, no_records: usize) -> String {
    let namespace = common::namespace();
    let set_name = common::rand_str(10);
    let wpolicy = WritePolicy::new(0, Expiration::Seconds(60));

    for i in 0..no_records as i64 {
        let key = as_key!(namespace, &set_name, i);
        let blob_text = format!("{}{}", "blob", i);
        let bins = vec![
            as_bin!("bin", i),
            as_bin!("bin2", i.to_string()),
            as_bin!("bin3", i as f64 / 3.0),
            as_bin!("bin4", blob_text.as_bytes()),
            as_bin!("bin5", as_list!("a", "b", i)),
            as_bin!("bin6", as_map!("a" => "test", "b" => i)),
        ];

        // Remove any record left under this key before writing the new one.
        client.delete(&wpolicy, &key).await.unwrap();
        client.put(&wpolicy, &key, &bins).await.unwrap();
    }

    set_name
}
/// Checks the comparison expressions (`eq`, `ne`, `lt`, `le`, `gt`, `ge`) against the
/// integer bin `bin`, which holds the values `0..EXPECTED` (one per record).
#[aerospike_macro::test]
async fn expression_compare() {
    let client = common::client().await;
    let set_name = create_test_set(&client, EXPECTED).await;

    // bin == 1
    let rs = test_filter(
        &client,
        eq(int_bin("bin".to_string()), int_val(1)),
        &set_name,
    )
    .await;
    let count = count_results(rs).await;
    assert_eq!(count, 1, "EQ Test Failed");

    // bin != 1
    let rs = test_filter(
        &client,
        ne(int_bin("bin".to_string()), int_val(1)),
        &set_name,
    )
    .await;
    let count = count_results(rs).await;
    assert_eq!(count, 99, "NE Test Failed");

    // bin < 10 -> 0..=9
    let rs = test_filter(
        &client,
        lt(int_bin("bin".to_string()), int_val(10)),
        &set_name,
    )
    .await;
    let count = count_results(rs).await;
    assert_eq!(count, 10, "LT Test Failed");

    // bin <= 10 -> 0..=10
    let rs = test_filter(
        &client,
        le(int_bin("bin".to_string()), int_val(10)),
        &set_name,
    )
    .await;
    let count = count_results(rs).await;
    assert_eq!(count, 11, "LE Test Failed");

    // bin > 1 -> 2..=99
    let rs = test_filter(
        &client,
        gt(int_bin("bin".to_string()), int_val(1)),
        &set_name,
    )
    .await;
    let count = count_results(rs).await;
    assert_eq!(count, 98, "GT Test Failed");

    // bin >= 1 -> 1..=99
    let rs = test_filter(
        &client,
        ge(int_bin("bin".to_string()), int_val(1)),
        &set_name,
    )
    .await;
    let count = count_results(rs).await;
    // Distinct label so a failure here is not mistaken for the GT check above.
    assert_eq!(count, 99, "GE Test Failed");

    client.close().await.unwrap();
}
/// Checks the logical combinators `and`, `or` and `not` over the seeded test set.
#[aerospike_macro::test]
async fn expression_condition() {
    let client = common::client().await;
    let set_name = create_test_set(&client, EXPECTED).await;

    // Both the integer bin and its string rendering must identify record 1.
    let both = and(vec![
        eq(int_bin("bin".to_owned()), int_val(1)),
        eq(string_bin("bin2".to_owned()), string_val("1".to_owned())),
    ]);
    let matched = count_results(test_filter(&client, both, &set_name).await).await;
    assert_eq!(matched, 1, "AND Test Failed");

    // Either of two distinct values.
    let either = or(vec![
        eq(int_bin("bin".to_owned()), int_val(1)),
        eq(int_bin("bin".to_owned()), int_val(3)),
    ]);
    let matched = count_results(test_filter(&client, either, &set_name).await).await;
    assert_eq!(matched, 2, "OR Test Failed");

    // Everything except record 1.
    let negated = not(eq(int_bin("bin".to_owned()), int_val(1)));
    let matched = count_results(test_filter(&client, negated, &set_name).await).await;
    assert_eq!(matched, 99, "NOT Test Failed");

    client.close().await.unwrap();
}
/// Checks equality filters on each typed bin accessor (int, string, float, blob)
/// and the `bin_type` expression.
#[aerospike_macro::test]
async fn expression_data_types() {
    let client = common::client().await;
    let set_name = create_test_set(&client, EXPECTED).await;

    let int_filter = eq(int_bin("bin".to_owned()), int_val(1));
    let matched = count_results(test_filter(&client, int_filter, &set_name).await).await;
    assert_eq!(matched, 1, "INT Test Failed");

    let string_filter = eq(string_bin("bin2".to_owned()), string_val("1".to_owned()));
    let matched = count_results(test_filter(&client, string_filter, &set_name).await).await;
    assert_eq!(matched, 1, "STRING Test Failed");

    // bin3 is i / 3, so exactly 2.0 only for i = 6.
    let float_filter = eq(float_bin("bin3".to_owned()), float_val(2.0));
    let matched = count_results(test_filter(&client, float_filter, &set_name).await).await;
    assert_eq!(matched, 1, "FLOAT Test Failed");

    // bin4 holds the bytes of "blob<i>"; look for record 5.
    let wanted_blob = format!("{}{}", "blob", 5).into_bytes();
    let blob_filter = eq(blob_bin("bin4".to_owned()), blob_val(wanted_blob));
    let matched = count_results(test_filter(&client, blob_filter, &set_name).await).await;
    assert_eq!(matched, 1, "BLOB Test Failed");

    // Every record has a non-null `bin`.
    let type_filter = ne(
        bin_type("bin".to_owned()),
        int_val(ParticleType::NULL as i64),
    );
    let matched = count_results(test_filter(&client, type_filter, &set_name).await).await;
    assert_eq!(matched, 100, "BIN TYPE Test Failed");

    client.close().await.unwrap();
}
#[aerospike_macro::test]
fn expression_aero_5_6() {
let client = common::client().await;
let set_name = create_test_set(&client, EXPECTED).await;
let rs = test_filter(
&client,
eq(
num_add(vec![int_bin("bin".to_string()), int_val(10)]),
int_val(20),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "NUM_ADD Test Failed");
let rs = test_filter(
&client,
eq(
num_sub(vec![int_bin("bin".to_string()), int_val(10)]),
int_val(20),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "NUM_SUB Test Failed");
let rs = test_filter(
&client,
eq(
num_mul(vec![int_bin("bin".to_string()), int_val(10)]),
int_val(20),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "NUM_MUL Test Failed");
let rs = test_filter(
&client,
gt(
num_div(vec![int_bin("bin".to_string()), int_val(5)]),
int_val(10),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 45, "NUM_DIV Test Failed");
let rs = test_filter(
&client,
eq(
num_pow(float_bin("bin3".to_string()), float_val(2.0)),
float_val(4.0),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "NUM_POW Test Failed");
let rs = test_filter(
&client,
eq(
num_log(float_bin("bin3".to_string()), float_val(2.0)),
float_val(4.0),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "NUM_LOG Test Failed");
let rs = test_filter(
&client,
eq(num_mod(int_bin("bin".to_string()), int_val(10)), int_val(0)),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 10, "NUM_MOD Test Failed");
let rs = test_filter(
&client,
eq(num_abs(int_bin("bin".to_string())), int_val(1)),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "NUM_ABS Test Failed");
let rs = test_filter(
&client,
eq(num_floor(float_bin("bin3".to_string())), float_val(2.0)),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 3, "NUM_FLOOR Test Failed");
let rs = test_filter(
&client,
eq(num_ceil(float_bin("bin3".to_string())), float_val(2.0)),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 3, "NUM_CEIL Test Failed");
let rs = test_filter(
&client,
eq(to_int(float_bin("bin3".to_string())), int_val(2)),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 3, "TO_INT Test Failed");
let rs = test_filter(
&client,
eq(to_float(int_bin("bin".to_string())), float_val(2.0)),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "TO_FLOAT Test Failed");
let rs = test_filter(
&client,
eq(
int_and(vec![int_bin("bin".to_string()), int_val(0xff)]),
int_val(0x11),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "INT_AND Test Failed");
let rs = test_filter(
&client,
eq(
int_xor(vec![int_bin("bin".to_string()), int_val(10)]),
int_val(16),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "INT_XOR Test Failed");
let rs = test_filter(
&client,
eq(int_not(int_bin("bin".to_string())), int_val(-50)),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 1, "INT_NOT Test Failed");
let rs = test_filter(
&client,
gt(
int_lshift(int_bin("bin".to_string()), int_val(8)),
int_val(0xff),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 99, "INT_LSHIFT Test Failed");
let rs = test_filter(
&client,
gt(
int_rshift(int_bin("bin".to_string()), int_val(1)),
int_val(0x2a),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 14, "INT_RSHIFT Test Failed");
let rs = test_filter(
&client,
gt(
int_arshift(int_bin("bin".to_string()), int_val(1)),
int_val(0x2a),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 14, "INT_ARSHIFT Test Failed");
let rs = test_filter(
&client,
eq(int_count(int_bin("bin".to_string())), int_val(3)),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 32, "INT_COUNT Test Failed");
let rs = test_filter(
&client,
gt(
int_lscan(int_bin("bin".to_string()), bool_val(true)),
int_val(60),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 7, "INT_LSCAN Test Failed");
let rs = test_filter(
&client,
gt(
int_rscan(int_bin("bin".to_string()), bool_val(true)),
int_val(60),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 87, "INT_RSCAN Test Failed");
let rs = test_filter(
&client,
eq(
min(vec![int_bin("bin".to_string()), int_val(10)]),
int_val(10),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 90, "MIN Test Failed");
let rs = test_filter(
&client,
eq(
max(vec![int_bin("bin".to_string()), int_val(10)]),
int_val(10),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 11, "MAX Test Failed");
let rs = test_filter(
&client,
gt(
cond(vec![
eq(num_mod(int_bin("bin".to_string()), int_val(2)), int_val(0)),
num_add(vec![int_bin("bin".to_string()), int_val(100)]),
gt(num_mod(int_bin("bin".to_string()), int_val(2)), int_val(0)),
num_add(vec![int_bin("bin".to_string()), int_val(10)]),
int_val(-1),
]),
int_val(100),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 54, "COND Test Failed");
let rs = test_filter(
&client,
exp_let(vec![
def("x".to_string(), int_bin("bin".to_string())),
and(vec![
lt(int_val(5), var("x".to_string())),
lt(var("x".to_string()), int_val(10)),
]),
]),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 4, "LET/DEF/VAR Test Failed");
client.close().await.unwrap();
}
#[aerospike_macro::test]
fn expression_rec_ops() {
let client = common::client().await;
let set_name = create_test_set(&client, EXPECTED).await;
let rs = test_filter(&client, le(record_size(), int_val(0)), &set_name).await;
let mut count = count_results(rs).await;
if count == 0 {
let rs = test_filter(&client, le(record_size(), int_val(2000)), &set_name).await;
count = count_results(rs).await;
}
assert_eq!(count, 100, "DEVICE SIZE Test Failed");
let rs = test_filter(&client, gt(last_update(), int_val(15000)), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count, 100, "LAST UPDATE Test Failed");
let rs = test_filter(&client, gt(since_update(), int_val(10)), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count, 100, "SINCE UPDATE Test Failed");
let rs = test_filter(&client, ge(void_time(), int_val(50)), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count, 100, "VOID TIME Test Failed");
let rs = test_filter(&client, ge(ttl(), int_val(0)), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count, 100, "TTL Test Failed");
let rs = test_filter(&client, not(is_tombstone()), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count, 100, "TOMBSTONE Test Failed");
let rs = test_filter(
&client,
eq(expressions::set_name(), string_val(set_name.clone())),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 100, "SET NAME Test Failed");
let rs = test_filter(&client, bin_exists("bin4".to_string()), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count, 100, "BIN EXISTS Test Failed");
let rs = test_filter(&client, eq(digest_modulo(3), int_val(1)), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count > 0 && count < 100, true, "DIGEST MODULO Test Failed");
let rs = test_filter(&client, eq(key(ExpType::INT), int_val(50)), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count, 0, "KEY Test Failed");
let rs = test_filter(&client, key_exists(), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count, 0, "KEY EXISTS Test Failed");
let rs = test_filter(&client, eq(nil(), nil()), &set_name).await;
let count = count_results(rs).await;
assert_eq!(count, 100, "NIL Test Failed");
let rs = test_filter(
&client,
regex_compare(
"[1-5]".to_string(),
RegexFlag::ICASE as i64,
string_bin("bin2".to_string()),
),
&set_name,
)
.await;
let count = count_results(rs).await;
assert_eq!(count, 75, "REGEX Test Failed");
client.close().await.unwrap();
}
/// Regression test: a query filtered by `geo_compare` with a `geo_val` operand must run
/// without returning an error.
///
/// Writes one record with a GeoJSON point bin, queries the set with an `AeroCircle` region
/// as the filter, and unwraps every streamed result. The number of results is not asserted.
#[aerospike_macro::test]
async fn test_geo_val_bug() {
    let client = common::client().await;
    let namespace = common::namespace();
    let set_name = common::rand_str(10);

    let key = as_key!(namespace, &set_name, "p1");
    let bins = [as_bin!(
        "point",
        as_geo!(r#"{"type":"Point","coordinates":[-122.0,37.5]}"#)
    )];
    client
        .put(&WritePolicy::default(), &key, &bins)
        .await
        .unwrap();

    let circle = r#"{"type":"AeroCircle","coordinates":[[-122.0,37.5],1000]}"#;
    let expr = geo_compare(geo_bin("point".into()), geo_val(circle.into()));
    let mut qp = QueryPolicy::default();
    qp.base_policy.filter_expression = Some(expr);

    let rs = client
        .query(
            &qp,
            PartitionFilter::all(),
            Statement::new(namespace, &set_name, Bins::All),
        )
        .await
        .unwrap();

    // Drain the stream; any per-record error fails the test.
    let mut stream = rs.into_stream();
    while let Some(res) = stream.next().await {
        res.unwrap();
    }

    // Release the client like every other test in this file does.
    client.close().await.unwrap();
}
/// Checks that a filter expression set on a command's policy gates that command.
///
/// For each single-record command the test first sets a filter that does not match the
/// target record and expects an error, then sets a matching filter and expects success.
/// Query and batch reads are checked with one filter each.
///
/// Record `i` holds `bin == i`, so `eq(bin, 15)` matches none of the keys used after the
/// DELETE section.
#[aerospike_macro::test]
async fn expression_commands() {
    let client = common::client().await;
    let namespace = common::namespace();
    let set_name = common::rand_str(10);

    let mut wpolicy = WritePolicy::default();
    let mut rpolicy = ReadPolicy::default();
    let mut spolicy = QueryPolicy::default();
    let mut bpolicy = BatchPolicy::default();

    // Seed the set; no filter is installed on `wpolicy` yet.
    for i in 0..EXPECTED as i64 {
        let key = as_key!(namespace, &set_name, i);
        let bins = vec![as_bin!("bin", i)];
        client.delete(&wpolicy, &key).await.unwrap();
        client.put(&wpolicy, &key, &bins).await.unwrap();
    }

    // DELETE
    let key = as_key!(namespace, &set_name, 15);
    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(16)));
    let test = client.delete(&wpolicy, &key).await;
    assert!(test.is_err(), "DELETE EXP Err Test Failed");

    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15)));
    let test = client.delete(&wpolicy, &key).await;
    assert!(test.is_ok(), "DELETE EXP Ok Test Failed");

    // PUT
    let key = as_key!(namespace, &set_name, 25);
    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15)));
    let test = client.put(&wpolicy, &key, &[as_bin!("bin", 26)]).await;
    assert!(test.is_err(), "PUT Err Test Failed");

    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(25)));
    let test = client.put(&wpolicy, &key, &[as_bin!("bin", 26)]).await;
    assert!(test.is_ok(), "PUT Ok Test Failed");

    // GET
    let key = as_key!(namespace, &set_name, 35);
    rpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15)));
    let test = client.get(&rpolicy, &key, Bins::All).await;
    assert!(test.is_err(), "GET Err Test Failed");

    rpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(35)));
    let test = client.get(&rpolicy, &key, Bins::All).await;
    assert!(test.is_ok(), "GET Ok Test Failed");

    // EXISTS
    // The filter goes on `rpolicy`, the policy actually passed to `exists`, so the error
    // case does not depend on the filter left over from the GET section.
    let key = as_key!(namespace, &set_name, 45);
    rpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15)));
    let test = client.exists(&rpolicy, &key).await;
    assert!(test.is_err(), "EXISTS Err Test Failed");

    rpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(45)));
    let test = client.exists(&rpolicy, &key).await;
    assert!(test.is_ok(), "EXISTS Ok Test Failed");

    // APPEND
    // NOTE(review): this section is labelled APPEND but calls `add` with a string value -
    // confirm whether `append` was intended.
    let key = as_key!(namespace, &set_name, 55);
    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15)));
    let test = client
        .add(&wpolicy, &key, &[as_bin!("test55", "test")])
        .await;
    assert!(test.is_err(), "APPEND Err Test Failed");

    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(55)));
    let test = client
        .add(&wpolicy, &key, &[as_bin!("test55", "test")])
        .await;
    assert!(test.is_ok(), "APPEND Ok Test Failed");

    // PREPEND (same record as the section above)
    let key = as_key!(namespace, &set_name, 55);
    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15)));
    let test = client
        .prepend(&wpolicy, &key, &[as_bin!("test55", "test")])
        .await;
    assert!(test.is_err(), "PREPEND Err Test Failed");

    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(55)));
    let test = client
        .prepend(&wpolicy, &key, &[as_bin!("test55", "test")])
        .await;
    assert!(test.is_ok(), "PREPEND Ok Test Failed");

    // TOUCH
    let key = as_key!(namespace, &set_name, 65);
    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15)));
    let test = client.touch(&wpolicy, &key).await;
    assert!(test.is_err(), "TOUCH Err Test Failed");

    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(65)));
    let test = client.touch(&wpolicy, &key).await;
    assert!(test.is_ok(), "TOUCH Ok Test Failed");

    // QUERY: exactly one record has bin == 75.
    spolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(75)));
    let pf = PartitionFilter::all();
    let stmt = Statement::new(namespace, &set_name, Bins::All);
    match client.query(&spolicy, pf, stmt).await {
        Ok(records) => {
            let mut records = records.into_stream();
            let mut count = 0;
            while let Some(record) = records.next().await {
                match record {
                    Ok(_) => count += 1,
                    Err(err) => panic!("Error executing scan: {}", err),
                }
            }
            assert_eq!(count, 1, "SCAN Test Failed");
        }
        // A query that cannot start must fail the test rather than pass silently.
        Err(err) => panic!("Failed to execute scan: {}", err),
    }

    // OPERATE
    let bin = as_bin!("test85", 85);
    let ops = vec![operations::add(&bin)];

    let key = as_key!(namespace, &set_name, 85);
    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(15)));
    let op = client.operate(&wpolicy, &key, &ops).await;
    assert!(op.is_err(), "OPERATE Err Test Failed");

    let key = as_key!(namespace, &set_name, 85);
    wpolicy.base_policy.filter_expression = Some(eq(int_bin("bin".to_string()), int_val(85)));
    let op = client.operate(&wpolicy, &key, &ops).await;
    assert!(op.is_ok(), "OPERATE Ok Test Failed");

    // BATCH GET: keys 85..90 all satisfy bin > 84, so every entry must carry a record.
    let bpr = BatchReadPolicy::default();
    let mut batch_ops = vec![];
    for i in 85..90 {
        let key = as_key!(namespace, &set_name, i);
        let bo = BatchOperation::read(&bpr, key, Bins::All);
        batch_ops.push(bo);
    }
    bpolicy.base_policy.filter_expression = Some(gt(int_bin("bin".to_string()), int_val(84)));
    match client.batch(&bpolicy, &batch_ops).await {
        Ok(results) => {
            for result in results {
                assert!(result.record.is_some(), "BATCH GET Ok Test Failed");
            }
        }
        Err(err) => panic!("Error executing batch request: {}", err),
    }

    client.close().await.unwrap();
}
/// Runs a query over every partition of `set_name` (in the test namespace, all bins) with
/// `filter` installed as the policy's filter expression, and returns the recordset.
///
/// Panics if the query cannot be started.
async fn test_filter(client: &Client, filter: Expression, set_name: &str) -> Arc<Recordset> {
    let mut policy = QueryPolicy::default();
    policy.base_policy.filter_expression = Some(filter);

    let stmt = Statement::new(common::namespace(), set_name, Bins::All);

    client
        .query(&policy, PartitionFilter::all(), stmt)
        .await
        .unwrap()
}
/// Checks that an expression serialized with `base64()` and rebuilt with `from_base64`
/// filters a query the same way the original expression would.
#[aerospike_macro::test]
async fn expression_from_base64_query() {
    let client = common::client().await;
    let set_name = create_test_set(&client, EXPECTED).await;

    // Single-record equality filter, round-tripped through base64.
    let encoded = eq(int_bin("bin".to_owned()), int_val(1))
        .base64()
        .unwrap();
    let decoded = from_base64(&encoded).unwrap();
    let matched = count_results(test_filter(&client, decoded, &set_name).await).await;
    assert_eq!(matched, 1, "from_base64 single match test failed");

    // Half-open range 10 <= bin < 20, round-tripped through base64.
    let range = and(vec![
        ge(int_bin("bin".to_owned()), int_val(10)),
        lt(int_bin("bin".to_owned()), int_val(20)),
    ]);
    let encoded = range.base64().unwrap();
    let decoded = from_base64(&encoded).unwrap();
    let matched = count_results(test_filter(&client, decoded, &set_name).await).await;
    assert_eq!(matched, 10, "from_base64 range query test failed");

    client.close().await.unwrap();
}