use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use futures::stream::StreamExt;
use crate::common;
use aerospike::query::{Filter, PartitionFilter};
use aerospike::Task;
use aerospike::*;
use aerospike_rt::time::{Duration, Instant};
const EXPECTED: usize = 1000;
async fn create_test_set(client: &Client, no_records: usize) -> String {
let namespace = common::namespace();
let set_name = common::rand_str(10);
let wpolicy = WritePolicy::default();
let apolicy = AdminPolicy::default();
for i in 0..no_records as i64 {
let key = as_key!(namespace, &set_name, i);
let wbin1 = as_bin!("bin", i);
let wbin2 = as_bin!("bin2", "hello");
let wbin3 = as_bin!("extra", "extra");
let bins = vec![wbin1, wbin2, wbin3];
client.delete(&wpolicy, &key).await.unwrap();
client.put(&wpolicy, &key, &bins).await.unwrap();
}
let task = client
.create_index_on_bin(
&apolicy,
namespace,
&set_name,
"bin",
&format!("{}_{}_{}", namespace, set_name, "bin"),
IndexType::Numeric,
CollectionIndexType::Default,
None,
)
.await
.expect("Failed to create index");
task.wait_till_complete(None).await.unwrap();
set_name
}
#[aerospike_macro::test]
async fn query_timeout() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let mut qpolicy = QueryPolicy::default();
qpolicy.base_policy.total_timeout = 5;
qpolicy.base_policy.socket_timeout = 5;
let statement = Statement::new(namespace, &set_name, Bins::All);
let pf = PartitionFilter::all();
let start = Instant::now();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut rs = rs.into_stream();
let mut timed_out = false;
while let Some(res) = rs.next().await {
match res {
Ok(_) => (),
Err(Error::Timeout(_)) => timed_out = true,
Err(err) => panic!("{:?}", err),
}
}
let duration = start.elapsed();
let expected_duration = Duration::from_millis((qpolicy.total_timeout() * 2) as u64);
assert!(duration < expected_duration);
assert_eq!(timed_out, true);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_single_consumer_no_setname() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = "";
let mut qpolicy = QueryPolicy::default();
qpolicy.expected_duration = QueryDuration::Short;
let statement = Statement::new(namespace, &set_name, Bins::All);
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(_) => {
count += 1;
}
Err(err) => panic!("{:?}", err),
}
}
assert!(count > 0);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_single_consumer() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let mut qpolicy = QueryPolicy::default();
qpolicy.expected_duration = QueryDuration::Short;
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::equal("bin", 1));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
assert_eq!(rec.bins["bin"], as_val!(1));
count += 1;
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 1);
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::range("bin", 0, 9));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
let v: i64 = rec.bins["bin"].clone().into();
assert!(v >= 0);
assert!(v < 10);
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 10);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_single_consumer_with_cursor() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let mut qpolicy = QueryPolicy::default();
qpolicy.expected_duration = QueryDuration::Short;
let mut pf = PartitionFilter::all();
let mut count = 0;
while !pf.done() {
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::equal("bin", 1));
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
assert_eq!(rec.bins["bin"], as_val!(1));
count += 1;
}
Err(err) => panic!("{:?}", err),
}
}
pf = rs.partition_filter().await.unwrap();
}
assert_eq!(count, 1);
let mut pf = PartitionFilter::all();
let mut iter = 0;
count = 0;
qpolicy.max_records = 1;
while !pf.done() {
iter += 1;
let mut statement = Statement::new(namespace, &set_name, Bins::Some(vec!["bin".into()]));
statement.add_filter(Filter::range("bin", 0, 9));
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
let v: i64 = rec.bins["bin"].clone().into();
assert!(v >= 0);
assert!(v < 10);
}
Err(err) => panic!("{:?}", err),
}
}
pf = rs.partition_filter().await.unwrap();
}
assert_eq!(count, 10);
assert_eq!(iter, 11);
let mut pf = PartitionFilter::all();
qpolicy.max_records = (EXPECTED / 3) as u64;
iter = 0;
count = 0;
while !pf.done() {
iter += 1;
let statement = Statement::new(namespace, &set_name, Bins::Some(vec!["bin".into()]));
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
rec.bins.get("bin").unwrap(); }
Err(err) => panic!("{:?}", err),
}
}
pf = rs.partition_filter().await.unwrap();
}
assert_eq!(count, EXPECTED);
assert_eq!(iter, 4);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_single_consumer_rps() {
let client = common::client().await;
if client.nodes().len() != 1 {
return;
}
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let mut qpolicy = QueryPolicy::default();
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::range("bin", 0, (EXPECTED / 3) as i64));
qpolicy.records_per_second = 3;
let start_time = Instant::now();
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
let v: i64 = rec.bins["bin"].clone().into();
assert!(v >= 0);
assert!(v <= (EXPECTED / 3) as i64);
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, EXPECTED / 3 + 1);
let duration = Instant::now() - start_time;
assert!(duration.as_millis() > 3000);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_nobins() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let qpolicy = QueryPolicy::default();
let mut statement = Statement::new(namespace, &set_name, Bins::None);
statement.add_filter(Filter::range("bin", 0, 9));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
assert!(rec.generation > 0);
assert_eq!(0, rec.bins.len());
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 10);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_some_bins() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let qpolicy = QueryPolicy::default();
let mut statement = Statement::new(namespace, &set_name, Bins::Some(vec!["bin".into()]));
statement.add_filter(Filter::range("bin", 0, 9));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
assert!(rec.generation > 0);
assert_eq!(1, rec.bins.len());
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 10);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_multi_consumer() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let qpolicy = QueryPolicy::default();
let mut statement = Statement::new(namespace, &set_name, Bins::All);
let f = Filter::range("bin", 0, 9);
statement.add_filter(f);
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let count = Arc::new(AtomicUsize::new(0));
let mut threads = vec![];
for _ in 0..8 {
let count = count.clone();
let rs = rs.clone();
threads.push(aerospike_rt::spawn(async move {
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count.fetch_add(1, Ordering::Relaxed);
let v: i64 = rec.bins["bin"].clone().into();
assert!(v >= 0);
assert!(v < 10);
}
Err(err) => panic!("{:?}", err),
}
}
}));
}
futures::future::join_all(threads).await;
assert_eq!(count.load(Ordering::Relaxed), 10);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_large_i64() {
const SET: &str = "large_i64";
const BIN: &str = "val";
let client = Arc::new(common::client().await);
let value = Value::from(i64::max_value());
let key = Key::new(common::namespace(), SET, value.clone()).unwrap();
let wpolicy = WritePolicy::default();
let apolicy = AdminPolicy::default();
let res = client
.put(&wpolicy, &key, &[aerospike::Bin::new(BIN.into(), value)])
.await;
assert!(res.is_ok());
let mut qpolicy = aerospike::QueryPolicy::new();
let bin_name = aerospike::expressions::int_bin(BIN.into());
let bin_val = aerospike::expressions::int_val(i64::max_value());
qpolicy
.base_policy
.filter_expression
.replace(aerospike::expressions::eq(bin_name, bin_val));
let stmt = aerospike::Statement::new(common::namespace(), SET, aerospike::Bins::All);
let pf = PartitionFilter::all();
let recordset = client.query(&qpolicy, pf, stmt).await.unwrap();
let mut recordset = recordset.into_stream();
while let Some(r) = recordset.next().await {
assert!(r.is_ok());
let int = r.unwrap().bins.remove(BIN).unwrap();
assert_eq!(int, Value::Int(i64::max_value()));
}
let _ = client.truncate(&apolicy, common::namespace(), SET, 0).await;
}
#[aerospike_macro::test]
async fn test_query_geo_within_geojson_region() {
let namespace: &str = common::namespace();
let set_name = &common::rand_str(10);
let bin_name = "geo_bin";
let client = Arc::new(common::client().await);
let apolicy = AdminPolicy::default();
let task = client
.create_index_on_bin(
&apolicy,
namespace,
set_name,
bin_name,
&format!("{}_{}_{}", namespace, set_name, bin_name),
IndexType::Geo2DSphere,
CollectionIndexType::Default,
None,
)
.await
.expect("Failed to create index");
task.wait_till_complete(None).await.unwrap();
let wp = WritePolicy::default();
let key1 = as_key!(namespace, set_name, "point1");
client
.put(
&wp,
&key1,
&vec![as_bin!(
bin_name,
as_geo!(r#"{"type": "Point", "coordinates": [-122.0, 37.5]}"#)
)],
)
.await
.unwrap();
let key2 = as_key!(namespace, set_name, "point2");
client
.put(
&wp,
&key2,
&vec![as_bin!(
bin_name,
as_geo!(r#"{"type": "Point", "coordinates": [-121.5, 37.5]}"#)
)],
)
.await
.unwrap();
let key3 = as_key!(namespace, set_name, "point3");
client
.put(
&wp,
&key3,
&vec![as_bin!(
bin_name,
as_geo!(r#"{"type": "Point", "coordinates": [-120.0, 37.5]}"#)
)],
)
.await
.unwrap();
let region_str = r#"{
"type": "Polygon",
"coordinates": [[[-122.500000, 37.000000],
[-121.000000, 37.000000],
[-121.000000, 38.080000],
[-122.500000, 38.080000],
[-122.500000, 37.000000]]]
}"#;
let predicate = Filter::geo_within_region(bin_name, region_str);
let qpolicy = QueryPolicy::default();
let mut stmt = aerospike::Statement::new(namespace, set_name, aerospike::Bins::All);
stmt.add_filter(predicate);
let pf = PartitionFilter::all();
let mut rs = client
.query(&qpolicy, pf, stmt)
.await
.unwrap()
.into_stream();
let mut count = 0;
while let Some(r) = rs.next().await {
assert!(r.is_ok());
count += 1;
}
assert!(count == 2);
let _ = client.truncate(&apolicy, namespace, set_name, 0).await;
}
#[aerospike_macro::test]
async fn query_filter_with_specific_bins() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let qpolicy = QueryPolicy::default();
let mut statement = Statement::new(
namespace,
&set_name,
Bins::Some(vec!["bin".into(), "bin2".into()]),
);
statement.add_filter(Filter::range("bin", 0, 9));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
assert_eq!(rec.bins.len(), 2, "expected 2 bins, got {:?}", rec.bins);
assert!(rec.bins.contains_key("bin"), "missing 'bin'");
assert!(rec.bins.contains_key("bin2"), "missing 'bin2'");
assert!(
!rec.bins.contains_key("extra"),
"'extra' should not be returned"
);
let v: i64 = rec.bins["bin"].clone().into();
assert!(v >= 0 && v < 10);
assert_eq!(rec.bins["bin2"], as_val!("hello"));
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 10);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_filter_with_index_name() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let qpolicy = QueryPolicy::default();
let index_name = format!("{}_{}_{}", namespace, set_name, "bin");
let mut statement = Statement::new(
namespace,
&set_name,
Bins::Some(vec!["bin".into(), "bin2".into()]),
);
let filter = Filter::range_by_index(&index_name, 0, 9);
statement.add_filter(filter);
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
assert_eq!(rec.bins.len(), 2, "expected 2 bins, got {:?}", rec.bins);
assert!(rec.bins.contains_key("bin"), "missing 'bin'");
assert!(rec.bins.contains_key("bin2"), "missing 'bin2'");
let v: i64 = rec.bins["bin"].clone().into();
assert!(v >= 0 && v < 10);
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 10);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_include_bin_data_false() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let mut qpolicy = QueryPolicy::default();
qpolicy.include_bin_data = false;
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::range("bin", 0, 9));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
assert!(rec.generation > 0);
assert_eq!(rec.bins.len(), 0, "expected no bins, got {:?}", rec.bins);
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 10);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_scan_with_specific_bins() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, 50).await;
let qpolicy = QueryPolicy::default();
let statement = Statement::new(
namespace,
&set_name,
Bins::Some(vec!["bin".into(), "bin2".into()]),
);
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
assert_eq!(rec.bins.len(), 2, "expected 2 bins, got {:?}", rec.bins);
assert!(rec.bins.contains_key("bin"), "missing 'bin'");
assert!(rec.bins.contains_key("bin2"), "missing 'bin2'");
assert!(
!rec.bins.contains_key("extra"),
"'extra' should not be returned"
);
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 50);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_long_relax_ap_duration() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let mut qpolicy = QueryPolicy::default();
qpolicy.expected_duration = QueryDuration::LongRelaxAP;
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::range("bin", 0, 9));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
let v: i64 = rec.bins["bin"].clone().into();
assert!(v >= 0 && v < 10);
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 10);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_operate_write() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let wpolicy = WritePolicy::default();
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::range("bin", 0, 99));
let ops = vec![operations::add(&as_bin!("bin", 100))];
let task = client
.query_operate(&wpolicy, statement, &ops)
.await
.expect("query_operate failed");
task.wait_till_complete(Some(Duration::from_secs(30)))
.await
.expect("task did not complete");
let rpolicy = ReadPolicy::default();
for i in 0..100_i64 {
let key = as_key!(namespace, &set_name, i);
let rec = client.get(&rpolicy, &key, Bins::All).await.unwrap();
let val: i64 = rec.bins["bin"].clone().into();
assert_eq!(val, i + 100, "record {i} was not updated correctly");
}
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_operate_scan_all() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, 50).await;
let wpolicy = WritePolicy::default();
let statement = Statement::new(namespace, &set_name, Bins::All);
let ops = vec![operations::put(&as_bin!("new_bin", 999))];
let task = client
.query_operate(&wpolicy, statement, &ops)
.await
.expect("query_operate scan failed");
task.wait_till_complete(Some(Duration::from_secs(30)))
.await
.expect("task did not complete");
let rpolicy = ReadPolicy::default();
for i in 0..50_i64 {
let key = as_key!(namespace, &set_name, i);
let rec = client.get(&rpolicy, &key, Bins::All).await.unwrap();
let val: i64 = rec.bins["new_bin"].clone().into();
assert_eq!(val, 999, "record {i} missing new_bin");
}
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_filter_equal_by_index() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_test_set(&client, EXPECTED).await;
let qpolicy = QueryPolicy::default();
let index_name = format!("{}_{}_{}", namespace, set_name, "bin");
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::equal_by_index(&index_name, 5_i64));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
let v: i64 = rec.bins["bin"].clone().into();
assert_eq!(v, 5);
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 1);
client.close().await.unwrap();
}
async fn create_list_test_set(client: &Client) -> String {
let namespace = common::namespace();
let set_name = common::rand_str(10);
let wpolicy = WritePolicy::default();
let apolicy = AdminPolicy::default();
let list_policy = aerospike::operations::lists::ListPolicy::default();
for i in 0..20_i64 {
let key = as_key!(namespace, &set_name, i);
let ops = vec![operations::lists::append_items(
&list_policy,
"list_bin",
vec![as_val!(i), as_val!(i + 1), as_val!(i + 2)],
)];
client.operate(&wpolicy, &key, &ops).await.unwrap();
}
let idx_name = format!("{}_{}_list_bin", namespace, set_name);
let task = client
.create_index_on_bin(
&apolicy,
namespace,
&set_name,
"list_bin",
&idx_name,
IndexType::Numeric,
CollectionIndexType::List,
None,
)
.await
.expect("Failed to create list index");
task.wait_till_complete(None).await.unwrap();
set_name
}
#[aerospike_macro::test]
async fn query_filter_contains_list() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_list_test_set(&client).await;
let qpolicy = QueryPolicy::default();
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::contains(
"list_bin",
1_i64,
CollectionIndexType::List,
));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(_) => count += 1,
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 2);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_filter_contains_range_list() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = create_list_test_set(&client).await;
let qpolicy = QueryPolicy::default();
let mut statement = Statement::new(namespace, &set_name, Bins::All);
statement.add_filter(Filter::contains_range(
"list_bin",
0_i64,
1_i64,
CollectionIndexType::List,
));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, statement).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(_) => count += 1,
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 2);
client.close().await.unwrap();
}
async fn create_geo_test_set(client: &Client) -> String {
let namespace = common::namespace();
let set_name = common::rand_str(10);
let bin_name = "geo_bin";
let apolicy = AdminPolicy::default();
let wp = WritePolicy::default();
let task = client
.create_index_on_bin(
&apolicy,
namespace,
&set_name,
bin_name,
&format!("{}_{}_{}", namespace, set_name, bin_name),
IndexType::Geo2DSphere,
CollectionIndexType::Default,
None,
)
.await
.expect("Failed to create geo index");
task.wait_till_complete(None).await.unwrap();
let key1 = as_key!(namespace, &set_name, "close1");
client
.put(
&wp,
&key1,
&vec![as_bin!(
bin_name,
as_geo!(r#"{"type": "Point", "coordinates": [-122.0, 37.5]}"#)
)],
)
.await
.unwrap();
let key2 = as_key!(namespace, &set_name, "close2");
client
.put(
&wp,
&key2,
&vec![as_bin!(
bin_name,
as_geo!(r#"{"type": "Point", "coordinates": [-122.1, 37.5]}"#)
)],
)
.await
.unwrap();
let key3 = as_key!(namespace, &set_name, "far");
client
.put(
&wp,
&key3,
&vec![as_bin!(
bin_name,
as_geo!(r#"{"type": "Point", "coordinates": [-73.9, 40.7]}"#)
)],
)
.await
.unwrap();
set_name
}
#[aerospike_macro::test]
async fn query_filter_geo_within_radius() {
let client = Arc::new(common::client().await);
let namespace = common::namespace();
let set_name = create_geo_test_set(&client).await;
let qpolicy = QueryPolicy::default();
let mut stmt = Statement::new(namespace, &set_name, Bins::All);
stmt.add_filter(Filter::geo_within_radius("geo_bin", -122.0, 37.5, 50000.0));
let pf = PartitionFilter::all();
let mut rs = client
.query(&qpolicy, pf, stmt)
.await
.unwrap()
.into_stream();
let mut count = 0;
while let Some(r) = rs.next().await {
assert!(r.is_ok());
count += 1;
}
assert_eq!(count, 2);
let apolicy = AdminPolicy::default();
let _ = client.truncate(&apolicy, namespace, &set_name, 0).await;
}
#[aerospike_macro::test]
async fn query_filter_geo_contains() {
let namespace = common::namespace();
let set_name = &common::rand_str(10);
let bin_name = "region_bin";
let client = Arc::new(common::client().await);
let apolicy = AdminPolicy::default();
let wp = WritePolicy::default();
let task = client
.create_index_on_bin(
&apolicy,
namespace,
set_name,
bin_name,
&format!("{}_{}_{}", namespace, set_name, bin_name),
IndexType::Geo2DSphere,
CollectionIndexType::Default,
None,
)
.await
.expect("Failed to create geo index");
task.wait_till_complete(None).await.unwrap();
let key1 = as_key!(namespace, set_name, "region1");
client
.put(
&wp,
&key1,
&vec![as_bin!(
bin_name,
as_geo!(
r#"{
"type": "Polygon",
"coordinates": [[[-123.0, 37.0], [-121.0, 37.0],
[-121.0, 38.0], [-123.0, 38.0],
[-123.0, 37.0]]]
}"#
)
)],
)
.await
.unwrap();
let key2 = as_key!(namespace, set_name, "region2");
client
.put(
&wp,
&key2,
&vec![as_bin!(
bin_name,
as_geo!(
r#"{
"type": "Polygon",
"coordinates": [[[-74.0, 40.0], [-73.0, 40.0],
[-73.0, 41.0], [-74.0, 41.0],
[-74.0, 40.0]]]
}"#
)
)],
)
.await
.unwrap();
let point = r#"{"type": "Point", "coordinates": [-122.0, 37.5]}"#;
let mut stmt = Statement::new(namespace, set_name, Bins::All);
stmt.add_filter(Filter::geo_contains(bin_name, point));
let pf = PartitionFilter::all();
let mut rs = client
.query(&QueryPolicy::default(), pf, stmt)
.await
.unwrap()
.into_stream();
let mut count = 0;
while let Some(r) = rs.next().await {
assert!(r.is_ok());
count += 1;
}
assert_eq!(count, 1);
let _ = client.truncate(&apolicy, namespace, set_name, 0).await;
}
#[aerospike_macro::test]
async fn query_filter_with_expression_builder() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = common::rand_str(10);
let apolicy = AdminPolicy::default();
let wpolicy = WritePolicy::default();
for i in 0..50_i64 {
let key = as_key!(namespace, &set_name, i);
let bins = vec![as_bin!("a", i)];
client.put(&wpolicy, &key, &bins).await.unwrap();
}
let exp = aerospike::expressions::int_bin("a".to_string());
let idx_name = format!("{}_{}_exp_a", namespace, set_name);
let task = client
.create_index_using_expression(
&apolicy,
namespace,
&set_name,
&idx_name,
IndexType::Numeric,
CollectionIndexType::Default,
&exp,
)
.await
.expect("Failed to create expression index");
task.wait_till_complete(None).await.unwrap();
let mut stmt = Statement::new(namespace, &set_name, Bins::All);
stmt.add_filter(Filter::range("a", 0_i64, 9_i64).expression(exp));
let qpolicy = QueryPolicy::default();
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, stmt).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
let v: i64 = rec.bins["a"].clone().into();
assert!(v >= 0 && v <= 9);
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 10);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_filter_with_context_builder() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = common::rand_str(10);
let apolicy = AdminPolicy::default();
let wpolicy = WritePolicy::default();
let bin_name = "nested";
for i in 0..20_i64 {
let key = as_key!(namespace, &set_name, i);
let list_val = as_list!(i);
let bins = vec![as_bin!(bin_name, list_val)];
client.put(&wpolicy, &key, &bins).await.unwrap();
}
use aerospike::operations::cdt_context::ctx_list_index;
let ctx = vec![ctx_list_index(0)];
let idx_name = format!("{}_{}_nested_ctx", namespace, set_name);
let task = client
.create_index_on_bin(
&apolicy,
namespace,
&set_name,
bin_name,
&idx_name,
IndexType::Numeric,
CollectionIndexType::Default,
Some(&ctx),
)
.await
.expect("Failed to create context index");
task.wait_till_complete(None).await.unwrap();
let mut stmt = Statement::new(namespace, &set_name, Bins::All);
stmt.add_filter(Filter::range(bin_name, 0_i64, 4_i64).context(vec![ctx_list_index(0)]));
let qpolicy = QueryPolicy::default();
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, stmt).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(_) => count += 1,
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 5);
client.close().await.unwrap();
}
#[aerospike_macro::test]
async fn query_filter_expression_with_policy_filter() {
let client = common::client().await;
let namespace = common::namespace();
let set_name = common::rand_str(10);
let apolicy = AdminPolicy::default();
let wpolicy = WritePolicy::default();
for i in 0..50_i64 {
let key = as_key!(namespace, &set_name, i);
let bins = vec![as_bin!("a", i), as_bin!("b", i % 2)];
client.put(&wpolicy, &key, &bins).await.unwrap();
}
let idx_exp = aerospike::expressions::int_bin("a".to_string());
let idx_name = format!("{}_{}_exp_ab", namespace, set_name);
let task = client
.create_index_using_expression(
&apolicy,
namespace,
&set_name,
&idx_name,
IndexType::Numeric,
CollectionIndexType::Default,
&idx_exp,
)
.await
.expect("Failed to create expression index");
task.wait_till_complete(None).await.unwrap();
let mut qpolicy = QueryPolicy::default();
qpolicy
.base_policy
.filter_expression
.replace(aerospike::expressions::eq(
aerospike::expressions::int_bin("b".to_string()),
aerospike::expressions::int_val(0),
));
let mut stmt = Statement::new(namespace, &set_name, Bins::All);
stmt.add_filter(Filter::range("a", 0_i64, 9_i64).expression(idx_exp));
let pf = PartitionFilter::all();
let rs = client.query(&qpolicy, pf, stmt).await.unwrap();
let mut count = 0;
let mut rs = rs.into_stream();
while let Some(res) = rs.next().await {
match res {
Ok(rec) => {
count += 1;
let a: i64 = rec.bins["a"].clone().into();
let b: i64 = rec.bins["b"].clone().into();
assert!(a >= 0 && a <= 9, "a={} out of index range", a);
assert_eq!(b, 0, "post-filter should exclude odd records, a={}", a);
}
Err(err) => panic!("{:?}", err),
}
}
assert_eq!(count, 5);
client.close().await.unwrap();
}