use aerospike::operations;
use aerospike::operations::lists;
use aerospike::*;
use crate::common;
use aerospike::{Expiration, ReadTouchTTL};
use aerospike_rt::sleep;
use aerospike_rt::time::{Duration, Instant};
// Issues a very large batch of writes under a tiny timeout budget and checks
// that the call comes back in less than twice the configured total timeout.
// The outcome of the batch itself (success or error) is deliberately ignored.
#[aerospike_macro::test]
async fn batch_operate_timeout() {
    let client = common::client().await;
    let namespace: &str = common::namespace();
    let set_name = &common::rand_str(10);

    // Parallel batch with no retries; the assertion at the end interprets the
    // total timeout value as milliseconds.
    let mut policy = BatchPolicy::default();
    policy.concurrency = Concurrency::Parallel;
    policy.base_policy.total_timeout = 10;
    policy.base_policy.socket_timeout = 10;
    policy.base_policy.max_retries = 0;
    policy.base_policy.sleep_between_retries = 0;

    let key = as_key!(namespace, set_name, 1);
    let bin_a = as_bin!("a", "a value");
    let bin_b = as_bin!("b", "another value");
    let bin_c = as_bin!("c", 42);
    let puts = vec![
        operations::put(&bin_a),
        operations::put(&bin_b),
        operations::put(&bin_c),
    ];

    // 10000 identical write operations against the same key.
    let write_policy = BatchWritePolicy::default();
    let ops: Vec<_> = (0..10000)
        .map(|_| BatchOperation::write(&write_policy, key.clone(), puts.clone()))
        .collect();

    let started = Instant::now();
    // Keep the outcome bound (rather than discarding with `_`) so it is not
    // dropped before the elapsed time has been sampled.
    let _outcome = client.batch(&policy, &ops).await;
    let elapsed = started.elapsed();

    let upper_bound = Duration::from_millis((policy.total_timeout() * 2) as u64);
    assert!(elapsed < upper_bound);
}
// Walks one set of keys through the batch API in five phases:
//   1. batch-write three records,
//   2. batch-read them with different bin selections (plus a never-written key),
//   3. batch-delete all four keys,
//   4. batch-read again and expect no records,
//   5. batch-invoke a registered Lua UDF, including a call to a missing function.
// Results are consumed strictly in request order.
#[aerospike_macro::test]
async fn batch_operate_read() {
    let client = common::client().await;
    let namespace: &str = common::namespace();
    let set_name = &common::rand_str(10);

    let mut bpolicy = BatchPolicy::default();
    bpolicy.concurrency = Concurrency::Parallel;

    // Register the Lua module used by the UDF phase and wait for the task.
    let admin_policy = AdminPolicy::default();
    let lua_source = r#"
function echo(rec, val)
return val
end
"#;
    let registration = client
        .register_udf(
            &admin_policy,
            lua_source.as_bytes(),
            "test_udf.lua",
            UDFLang::Lua,
        )
        .await
        .unwrap();
    registration.wait_till_complete(None).await.unwrap();

    // Keys 1..=3 get written below; key -1 is never written.
    let key1 = as_key!(namespace, set_name, 1);
    let key2 = as_key!(namespace, set_name, 2);
    let key3 = as_key!(namespace, set_name, 3);
    let key4 = as_key!(namespace, set_name, -1);

    let bin_a = as_bin!("a", "a value");
    let bin_b = as_bin!("b", "another value");
    let bin_c = as_bin!("c", 42);

    let put_all = vec![
        operations::put(&bin_a),
        operations::put(&bin_b),
        operations::put(&bin_c),
    ];
    let read_ops = vec![
        operations::get_bin(&bin_a.name),
        operations::get_bin(&bin_b.name),
        operations::get_header(),
    ];

    let bpr = BatchReadPolicy::default();
    let bpw = BatchWritePolicy::default();
    let bpd = BatchDeletePolicy::default();
    let bpu = BatchUDFPolicy::default();

    // Phase 1: write the three records; results must echo the keys in order.
    let write_batch = vec![
        BatchOperation::write(&bpw, key1.clone(), put_all.clone()),
        BatchOperation::write(&bpw, key2.clone(), put_all.clone()),
        BatchOperation::write(&bpw, key3.clone(), put_all.clone()),
    ];
    let mut written = client
        .batch(&bpolicy, &write_batch)
        .await
        .unwrap()
        .into_iter();
    for expected in [&key1, &key2, &key3] {
        assert_eq!(written.next().unwrap().key, *expected);
    }

    // Phase 2: read back with a single selected bin, all bins, no bins,
    // an explicit operation list, and finally the never-written key.
    let read_batch = vec![
        BatchOperation::read(&bpr, key1.clone(), Bins::from(["a"])),
        BatchOperation::read(&bpr, key2.clone(), Bins::All),
        BatchOperation::read(&bpr, key3.clone(), Bins::None),
        BatchOperation::read_ops(&bpr, key3.clone(), read_ops),
        BatchOperation::read(&bpr, key4.clone(), Bins::None),
    ];
    let mut reads = client
        .batch(&bpolicy, &read_batch)
        .await
        .unwrap()
        .into_iter();

    let rec = reads.next().unwrap();
    assert_eq!(rec.key, key1);
    assert_eq!(rec.record.unwrap().bins.keys().count(), 1);

    let rec = reads.next().unwrap();
    assert_eq!(rec.key, key2);
    assert_eq!(rec.record.unwrap().bins.keys().count(), 3);

    let rec = reads.next().unwrap();
    assert_eq!(rec.key, key3);
    assert_eq!(rec.record.unwrap().bins.keys().count(), 0);

    // For the operation-list read and the missing key only the key is checked.
    for expected in [&key3, &key4] {
        assert_eq!(reads.next().unwrap().key, *expected);
    }

    // Phase 3: delete everything, including the key that never existed.
    let delete_batch = vec![
        BatchOperation::delete(&bpd, key1.clone()),
        BatchOperation::delete(&bpd, key2.clone()),
        BatchOperation::delete(&bpd, key3.clone()),
        BatchOperation::delete(&bpd, key4.clone()),
    ];
    let mut deleted = client
        .batch(&bpolicy, &delete_batch)
        .await
        .unwrap()
        .into_iter();
    for expected in [&key1, &key2, &key3] {
        assert_eq!(deleted.next().unwrap().key, *expected);
    }
    let rec = deleted.next().unwrap();
    assert_eq!(rec.key, key4);
    assert!(rec.record.is_none());

    // Phase 4: none of the keys should yield a record any more.
    let reread_batch = vec![
        BatchOperation::read(&bpr, key1.clone(), Bins::None),
        BatchOperation::read(&bpr, key2.clone(), Bins::None),
        BatchOperation::read(&bpr, key3.clone(), Bins::None),
        BatchOperation::read(&bpr, key4.clone(), Bins::None),
    ];
    let mut gone = client
        .batch(&bpolicy, &reread_batch)
        .await
        .unwrap()
        .into_iter();
    for expected in [&key1, &key2, &key3, &key4] {
        let rec = gone.next().unwrap();
        assert_eq!(rec.key, *expected);
        assert!(rec.record.is_none());
    }

    // Phase 5: call `echo` with one integer argument per key, then call a
    // function name that the registered module does not define.
    let udf_batch = vec![
        BatchOperation::udf(&bpu, key1.clone(), "test_udf", "echo", Some(vec![as_val!(1)])),
        BatchOperation::udf(&bpu, key2.clone(), "test_udf", "echo", Some(vec![as_val!(2)])),
        BatchOperation::udf(&bpu, key3.clone(), "test_udf", "echo", Some(vec![as_val!(3)])),
        BatchOperation::udf(
            &bpu,
            key4.clone(),
            "test_udf",
            "echo_not_exists",
            Some(vec![as_val!(4)]),
        ),
    ];
    let mut invoked = client
        .batch(&bpolicy, &udf_batch)
        .await
        .unwrap()
        .into_iter();

    // Each successful call is expected to hand its argument back under "SUCCESS".
    for (expected_key, echoed) in [
        (&key1, as_val!(1)),
        (&key2, as_val!(2)),
        (&key3, as_val!(3)),
    ] {
        let rec = invoked.next().unwrap();
        assert_eq!(rec.key, *expected_key);
        assert_eq!(rec.record.unwrap().bins.get("SUCCESS"), Some(&echoed));
    }

    // The unknown function is reported through the result code and a
    // "FAILURE" bin on the returned record.
    let rec = invoked.next().unwrap();
    assert_eq!(rec.key, key4);
    assert_eq!(rec.result_code, Some(ResultCode::UdfBadResponse));
    assert_eq!(
        rec.record.unwrap().bins.get("FAILURE"),
        Some(&as_val!("function not found"))
    );

    client.close().await.unwrap();
}
// Runs two list read operations against the same bin in a single batch read
// and expects both results to come back together as a `Value::MultiResult`
// stored under that bin name.
#[aerospike_macro::test]
async fn batch_operate_read_multi_op_single_bin() {
    let client = common::client().await;
    let namespace: &str = common::namespace();
    let set_name = &common::rand_str(10);

    let mut bpolicy = BatchPolicy::default();
    bpolicy.concurrency = Concurrency::Parallel;

    let key = as_key!(namespace, set_name, common::rand_str(10));

    // Seed the record with a three-element list in bin "lbin".
    let wp = WritePolicy::default();
    let seed_bins = vec![as_bin!("lbin", Value::List(as_values!(111, 222, 333)))];
    client
        .put(&wp, &key, &seed_bins)
        .await
        .expect("put failed.");

    // Two operations on the same bin: the list size, then the element at
    // index -1 returned by value.
    let brp = BatchReadPolicy::default();
    let list_ops = vec![
        lists::size("lbin"),
        lists::get_by_index("lbin", -1, lists::ListReturnType::Values),
    ];
    let request = vec![BatchOperation::read_ops(&brp, key.clone(), list_ops)];

    let first = client
        .batch(&bpolicy, &request)
        .await
        .unwrap()
        .into_iter()
        .next()
        .unwrap();

    assert!(first.result_code == Some(ResultCode::Ok));

    let expected = Value::MultiResult(as_values!(3, 333));
    assert!(first.record.unwrap().bins.get("lbin") == Some(&expected));
}
// Checks the effect of `read_touch_ttl` on batch reads. Two records are
// written with a 10 second expiration; key1 is then read with
// `ReadTouchTTL::Percent(80)` and key2 with `ReadTouchTTL::DontReset`.
// The assertions expect key1 to outlive its original expiration and key2
// not to, and finally both to be gone.
// NOTE(review): the exact server-side meaning of `Percent(80)` is not visible
// here - confirm against the `ReadTouchTTL` documentation.
//
// Approximate timeline (driven purely by the sleeps below):
//   t = 0s   write both keys
//   t = 8s   first read  -> both Ok
//   t = 11s  second read -> key1 Ok, key2 not found
//   t = 19s  third read  -> both not found
#[aerospike_macro::test]
async fn batch_operate_read_touch_ttl() {
    let client = common::client().await;
    let namespace: &str = common::namespace();
    let set_name = &common::rand_str(10);

    let mut bpolicy = BatchPolicy::default();
    bpolicy.concurrency = Concurrency::Parallel;

    let key1 = as_key!(namespace, set_name, 88888);
    let key2 = as_key!(namespace, set_name, 88889);

    // Every read in this test selects just bin "a".
    let only_a = || Bins::Some(vec!["a".into()]);

    // Write both records with a 10 second expiration.
    let mut write_policy = BatchWritePolicy::default();
    write_policy.expiration = Expiration::Seconds(10);
    let bin = as_bin!("a", 1);
    let writes = vec![
        BatchOperation::write(&write_policy, key1.clone(), vec![operations::put(&bin)]),
        BatchOperation::write(&write_policy, key2.clone(), vec![operations::put(&bin)]),
    ];
    client.batch(&bpolicy, &writes).await.unwrap();

    sleep(Duration::from_secs(8)).await;

    // First read: key1 uses Percent(80), key2 uses DontReset.
    let mut brp1 = BatchReadPolicy::default();
    brp1.read_touch_ttl = ReadTouchTTL::Percent(80);
    let mut brp2 = BatchReadPolicy::default();
    brp2.read_touch_ttl = ReadTouchTTL::DontReset;

    let touching_reads = vec![
        BatchOperation::read(&brp1, key1.clone(), only_a()),
        BatchOperation::read(&brp2, key2.clone(), only_a()),
    ];
    let recs = client.batch(&bpolicy, &touching_reads).await.unwrap();
    assert!(recs[0].result_code == Some(ResultCode::Ok));
    assert!(recs[1].result_code == Some(ResultCode::Ok));

    sleep(Duration::from_secs(3)).await;

    // From here on neither read policy resets the TTL.
    brp1.read_touch_ttl = ReadTouchTTL::DontReset;
    brp2.read_touch_ttl = ReadTouchTTL::DontReset;

    let plain_reads = vec![
        BatchOperation::read(&brp1, key1.clone(), only_a()),
        BatchOperation::read(&brp2, key2.clone(), only_a()),
    ];
    let recs = client.batch(&bpolicy, &plain_reads).await.unwrap();
    assert!(recs[0].result_code == Some(ResultCode::Ok));
    assert!(recs[1].result_code == Some(ResultCode::KeyNotFoundError));

    sleep(Duration::from_secs(8)).await;

    // Same read list again; by now both keys are expected to be missing.
    let recs = client.batch(&bpolicy, &plain_reads).await.unwrap();
    assert!(recs[0].result_code == Some(ResultCode::KeyNotFoundError));
    assert!(recs[1].result_code == Some(ResultCode::KeyNotFoundError));
}