use std::pin::Pin;
use std::str;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::vec::Vec;
use crate::expressions::Expression;
use aerospike_core::errors::Result;
use aerospike_core::operations::{CdtContext, Operation};
use aerospike_core::query::PartitionFilter;
use aerospike_core::DropIndexTask;
use aerospike_core::UdfRemoveTask;
use aerospike_core::{
AdminPolicy, BatchOperation, BatchPolicy, BatchRecord, Bin, Bins, ClientPolicy,
CollectionIndexType, ExecuteTask, IndexTask, IndexType, Key, Node, Privilege, QueryPolicy,
ReadPolicy, Record, Recordset, RegisterTask, Role, Statement, ToHosts, UDFLang, User, Value,
WritePolicy,
};
use futures::executor::block_on;
use futures::task::noop_waker;
use futures::Stream;
pub struct BatchStream {
inner: Pin<Box<dyn Stream<Item = (usize, BatchRecord)> + Send>>,
}
impl Iterator for BatchStream {
type Item = (usize, BatchRecord);
fn next(&mut self) -> Option<Self::Item> {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
loop {
match self.inner.as_mut().poll_next(&mut cx) {
Poll::Ready(item) => return item, Poll::Pending => std::thread::yield_now(),
}
}
}
}
pub struct Client {
async_client: aerospike_core::Client,
}
unsafe impl Send for Client {}
unsafe impl Sync for Client {}
impl Client {
pub fn new(policy: &ClientPolicy, hosts: &(dyn ToHosts + Send + Sync)) -> Result<Self> {
let client = block_on(aerospike_core::Client::new(policy, hosts))?;
Ok(Client {
async_client: client,
})
}
pub fn close(&self) -> Result<()> {
block_on(self.async_client.close())?;
Ok(())
}
pub fn is_connected(&self) -> bool {
self.async_client.is_connected()
}
pub fn node_names(&self) -> Vec<String> {
self.async_client.node_names()
}
pub fn get_node(&self, name: &str) -> Result<Arc<Node>> {
self.async_client.get_node(name)
}
pub fn nodes(&self) -> Vec<Arc<Node>> {
self.async_client.nodes()
}
pub fn get<T>(&self, policy: &ReadPolicy, key: &Key, bins: T) -> Result<Record>
where
T: Into<Bins> + Send + Sync + 'static,
{
block_on(self.async_client.get(policy, key, bins))
}
pub fn batch(
&self,
policy: &BatchPolicy,
batch_records: &[BatchOperation],
) -> Result<Vec<BatchRecord>> {
block_on(self.async_client.batch(policy, batch_records))
}
pub fn batch_stream(
&self,
policy: &BatchPolicy,
ops: Vec<BatchOperation>,
) -> Result<BatchStream> {
let stream = block_on(self.async_client.batch_stream(policy, ops))?;
Ok(BatchStream {
inner: Box::pin(stream),
})
}
pub fn put<'a>(&self, policy: &'a WritePolicy, key: &'a Key, bins: &'a [Bin]) -> Result<()> {
block_on(self.async_client.put(policy, key, bins))
}
pub fn add<'a>(&self, policy: &'a WritePolicy, key: &'a Key, bins: &'a [Bin]) -> Result<()> {
block_on(self.async_client.add(policy, key, bins))
}
pub fn append<'a>(&self, policy: &'a WritePolicy, key: &'a Key, bins: &'a [Bin]) -> Result<()> {
block_on(self.async_client.append(policy, key, bins))
}
pub fn prepend<'a>(
&self,
policy: &'a WritePolicy,
key: &'a Key,
bins: &'a [Bin],
) -> Result<()> {
block_on(self.async_client.prepend(policy, key, bins))
}
pub fn delete(&self, policy: &WritePolicy, key: &Key) -> Result<bool> {
block_on(self.async_client.delete(policy, key))
}
pub fn touch(&self, policy: &WritePolicy, key: &Key) -> Result<()> {
block_on(self.async_client.touch(policy, key))
}
pub fn exists(&self, policy: &ReadPolicy, key: &Key) -> Result<bool> {
block_on(self.async_client.exists(policy, key))
}
pub fn operate(&self, policy: &WritePolicy, key: &Key, ops: &[Operation]) -> Result<Record> {
block_on(self.async_client.operate(policy, key, ops))
}
pub fn register_udf(
&self,
policy: &AdminPolicy,
udf_body: &[u8],
server_path: &str,
language: UDFLang,
) -> Result<RegisterTask> {
block_on(
self.async_client
.register_udf(policy, udf_body, server_path, language),
)
}
pub fn register_udf_from_file(
&self,
policy: &AdminPolicy,
client_path: &str,
server_path: &str,
language: UDFLang,
) -> Result<RegisterTask> {
block_on(self.async_client.register_udf_from_file(
policy,
client_path,
server_path,
language,
))
}
pub fn remove_udf(&self, policy: &AdminPolicy, server_path: &str) -> Result<UdfRemoveTask> {
block_on(self.async_client.remove_udf(policy, server_path))
}
pub fn execute_udf(
&self,
policy: &WritePolicy,
key: &Key,
server_path: &str,
function_name: &str,
args: Option<&[Value]>,
) -> Result<Option<Value>> {
block_on(
self.async_client
.execute_udf(policy, key, server_path, function_name, args),
)
}
pub fn query(
&self,
policy: &QueryPolicy,
partition_filter: PartitionFilter,
statement: Statement,
) -> Result<Arc<Recordset>> {
block_on(self.async_client.query(policy, partition_filter, statement))
}
pub fn query_operate(
&self,
write_policy: &WritePolicy,
statement: Statement,
operations: &[Operation],
) -> Result<ExecuteTask> {
block_on(
self.async_client
.query_operate(write_policy, statement, operations),
)
}
pub fn query_execute_udf(
&self,
write_policy: &WritePolicy,
statement: Statement,
package_name: &str,
function_name: &str,
args: Option<&[Value]>,
) -> Result<ExecuteTask> {
block_on(self.async_client.query_execute_udf(
write_policy,
statement,
package_name,
function_name,
args,
))
}
pub async fn set_xdr_filter(
&self,
policy: &AdminPolicy,
datacenter: &str,
namespace: &str,
filter_expression: Option<&Expression>,
) -> Result<()> {
block_on(
self.async_client
.set_xdr_filter(policy, datacenter, namespace, filter_expression),
)
}
pub fn truncate(
&self,
policy: &AdminPolicy,
namespace: &str,
set_name: &str,
before_nanos: i64,
) -> Result<()> {
block_on(
self.async_client
.truncate(policy, namespace, set_name, before_nanos),
)
}
pub async fn create_index_on_bin(
&self,
policy: &AdminPolicy,
namespace: &str,
set_name: &str,
bin_name: &str,
index_name: &str,
index_type: IndexType,
collection_index_type: CollectionIndexType,
ctx: Option<&[CdtContext]>,
) -> Result<IndexTask> {
block_on(self.async_client.create_index_on_bin(
policy,
namespace,
set_name,
bin_name,
index_name,
index_type,
collection_index_type,
ctx,
))
}
pub async fn create_index_using_expression(
&self,
policy: &AdminPolicy,
namespace: &str,
set_name: &str,
index_name: &str,
index_type: IndexType,
collection_index_type: CollectionIndexType,
expression: &Expression,
) -> Result<IndexTask> {
block_on(self.async_client.create_index_using_expression(
policy,
namespace,
set_name,
index_name,
index_type,
collection_index_type,
expression,
))
}
pub fn drop_index(
&self,
policy: &AdminPolicy,
namespace: &str,
set_name: &str,
index_name: &str,
) -> Result<DropIndexTask> {
block_on(
self.async_client
.drop_index(policy, namespace, set_name, index_name),
)
}
pub async fn create_user(
&self,
policy: &AdminPolicy,
user: &str,
password: &str,
roles: &[&str],
) -> Result<()> {
block_on(self.async_client.create_user(policy, user, password, roles))
}
pub async fn drop_user(&self, policy: &AdminPolicy, user: &str) -> Result<()> {
block_on(self.async_client.drop_user(policy, user))
}
pub async fn change_password(
&self,
policy: &AdminPolicy,
user: &str,
password: &str,
) -> Result<()> {
block_on(self.async_client.change_password(policy, user, password))
}
pub async fn grant_roles(
&self,
policy: &AdminPolicy,
user: &str,
roles: &[&str],
) -> Result<()> {
block_on(self.async_client.grant_roles(policy, user, roles))
}
pub async fn revoke_roles(
&self,
policy: &AdminPolicy,
user: &str,
roles: &[&str],
) -> Result<()> {
block_on(self.async_client.revoke_roles(policy, user, roles))
}
pub async fn query_users(&self, policy: &AdminPolicy, user: Option<&str>) -> Result<Vec<User>> {
block_on(self.async_client.query_users(policy, user))
}
pub async fn create_role(
&self,
policy: &AdminPolicy,
role_name: &str,
privileges: &[Privilege],
allowlist: &[&str],
read_quota: u32,
write_quota: u32,
) -> Result<()> {
block_on(self.async_client.create_role(
policy,
role_name,
privileges,
allowlist,
read_quota,
write_quota,
))
}
pub async fn query_roles(&self, policy: &AdminPolicy, role: Option<&str>) -> Result<Vec<Role>> {
block_on(self.async_client.query_roles(policy, role))
}
pub async fn drop_role(&self, policy: &AdminPolicy, role_name: &str) -> Result<()> {
block_on(self.async_client.drop_role(policy, role_name))
}
pub async fn grant_privileges(
&self,
policy: &AdminPolicy,
role_name: &str,
privileges: &[Privilege],
) -> Result<()> {
block_on(
self.async_client
.grant_privileges(policy, role_name, privileges),
)
}
pub async fn revoke_privileges(
&self,
policy: &AdminPolicy,
role_name: &str,
privileges: &[Privilege],
) -> Result<()> {
block_on(
self.async_client
.revoke_privileges(policy, role_name, privileges),
)
}
pub async fn set_allowlist(
&self,
policy: &AdminPolicy,
role_name: &str,
allowlist: &[&str],
) -> Result<()> {
block_on(
self.async_client
.set_allowlist(policy, role_name, allowlist),
)
}
pub async fn set_quotas(
&self,
policy: &AdminPolicy,
role_name: &str,
read_quota: u32,
write_quota: u32,
) -> Result<()> {
block_on(
self.async_client
.set_quotas(policy, role_name, read_quota, write_quota),
)
}
}