pub use crate::tarantool::packets::{CommandPacket, TarantoolRequest, TarantoolResponse, TarantoolSqlResponse};
pub use crate::tarantool::tools::{serialize_to_vec_u8, serialize_array};
use futures_channel::mpsc;
use futures_channel::oneshot;
use serde::Serialize;
use std::io;
use std::sync::{Arc, Mutex, RwLock};
pub mod codec;
mod dispatch;
pub mod packets;
mod tools;
use crate::tarantool::dispatch::{
CallbackSender, Dispatch, ERROR_CLIENT_DISCONNECTED, ERROR_DISPATCH_THREAD_IS_DEAD,
};
pub use crate::tarantool::dispatch::{ClientConfig, ClientStatus};
use rmpv::Value;
impl ClientConfig {
    /// Consumes this configuration and returns a [`Client`] created from it.
    ///
    /// Shorthand for `Client::new(config)`.
    pub fn build(self) -> Client {
        let config = self;
        Client::new(config)
    }
}
/// Iterator kind passed to `Client::select`.
///
/// The explicit discriminant of each variant is the integer that is put on the
/// wire (`select` sends `iterator as i32`), so the numeric values must not be
/// changed or reordered.
///
/// NOTE(review): the variant names appear to mirror Tarantool's index iterator
/// types — see the Tarantool documentation for their exact matching rules.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IteratorType {
    EQ = 0,
    REQ = 1,
    ALL = 2,
    LT = 3,
    LE = 4,
    GE = 5,
    GT = 6,
    BitsAllSet = 7,
    BitsAnySet = 8,
    BitsAllNotSet = 9,
    OVERLAPS = 10,
    NEIGHBOR = 11,
}
/// Handle used to send requests to the server.
///
/// The handle is `Clone`; all clones share the same command channel, dispatcher
/// slot, status cell and subscriber list through the `Arc`s below.
#[derive(Clone)]
pub struct Client {
    /// Sending half of the channel on which `send_command` queues each request
    /// together with the one-shot sender used to deliver its response.
    command_sender: mpsc::UnboundedSender<(CommandPacket, CallbackSender)>,
    /// Dispatcher that has not been started yet. `send_command` takes it out of
    /// the slot and spawns it, leaving `None` behind.
    dispatch: Arc<Mutex<Option<Dispatch>>>,
    /// Current client status, shared with the dispatcher and read by `get_status`.
    status: Arc<RwLock<ClientStatus>>,
    /// Senders registered through `subscribe_to_notify_stream`, shared with the
    /// dispatcher.
    notify_callbacks: Arc<Mutex<Vec<dispatch::ReconnectNotifySender>>>,
}
/// Builder-style parameter binding for prepared requests.
///
/// Implementors only have to provide `bind_raw`; every other method serializes
/// its argument and forwards the resulting bytes to `bind_raw`. Each method
/// consumes `self` and returns it again, so calls can be chained with `?`.
pub trait ExecWithParamaters {
    /// Binds one parameter that is already serialized to bytes.
    ///
    /// # Errors
    /// Defined by the implementor.
    fn bind_raw(self, param: Vec<u8>) -> io::Result<Self>
    where
        Self: Sized;

    /// Binds `param` by reference under the given `name`.
    ///
    /// A `':'` is prepended to `name`, and the parameter is sent as a
    /// one-element map from that prefixed name to the serialized value.
    ///
    /// # Errors
    /// Returns an error if serializing the value or the map fails, or if
    /// `bind_raw` fails.
    fn bind_named_ref<T: Serialize, T1: Into<String>>(self, name: T1, param: &T) -> io::Result<Self>
    where
        Self: Sized,
    {
        let prefixed_name = format!(":{}", name.into());
        let value = tools::serialize_to_vec_u8(param)?;
        let named = tools::serialize_one_element_map(prefixed_name, value)?;
        self.bind_raw(named)
    }

    /// Binds a positional parameter by reference.
    ///
    /// # Errors
    /// Returns an error if serializing `param` fails or if `bind_raw` fails.
    fn bind_ref<T: Serialize>(self, param: &T) -> io::Result<Self>
    where
        Self: Sized,
    {
        let value = tools::serialize_to_vec_u8(param)?;
        self.bind_raw(value)
    }

    /// Binds an owned value under the given `name`; see `bind_named_ref`.
    ///
    /// # Errors
    /// Same as `bind_named_ref`.
    fn bind_named<T, T1>(self, name: T1, param: T) -> io::Result<Self>
    where
        T: Serialize,
        Self: Sized,
        T1: Into<String>,
    {
        self.bind_named_ref(name, &param)
    }

    /// Binds an owned positional value; see `bind_ref`.
    ///
    /// # Errors
    /// Same as `bind_ref`.
    fn bind<T>(self, param: T) -> io::Result<Self>
    where
        T: Serialize,
        Self: Sized,
    {
        self.bind_ref(&param)
    }

    /// Binds a positional nil value (`Value::Nil`).
    ///
    /// # Errors
    /// Same as `bind_ref`.
    fn bind_null(self) -> io::Result<Self>
    where
        Self: Sized,
    {
        self.bind_ref(&Value::Nil)
    }

    /// Binds a nil value (`Value::Nil`) under the given `name`.
    ///
    /// # Errors
    /// Same as `bind_named_ref`.
    fn bind_named_null<T1>(self, name: T1) -> io::Result<Self>
    where
        Self: Sized,
        T1: Into<String>,
    {
        self.bind_named_ref(name, &Value::Nil)
    }
}
/// SQL statement together with the parameters bound to it so far.
///
/// Created by `Client::prepare_sql`; parameters are added through the
/// `ExecWithParamaters` methods and the statement is sent by `execute`.
pub struct PreparedSql {
    /// Client clone used to send the request.
    client: Client,
    /// SQL text to execute.
    sql: String,
    /// Bound parameters in binding order, each one already serialized.
    params: Vec<Vec<u8>>,
}
/// Function call together with the parameters bound to it so far.
///
/// Created by `Client::prepare_fn_call`; parameters are added through the
/// `ExecWithParamaters` methods and the call is sent by `execute`.
pub struct PreparedFunctionCall {
    /// Client clone used to send the request.
    client: Client,
    /// Name of the function to call.
    function: String,
    /// Bound parameters in binding order, each one already serialized.
    params: Vec<Vec<u8>>,
}
impl Client {
    /// Creates a client from `config`.
    ///
    /// The dispatcher is only constructed here; it is spawned by the first
    /// call to `send_command`. The initial status is `ClientStatus::Init`.
    pub fn new(config: ClientConfig) -> Client {
        let (command_sender, command_receiver) = mpsc::unbounded();
        let status = Arc::new(RwLock::new(ClientStatus::Init));
        let notify_callbacks = Arc::new(Mutex::new(Vec::new()));
        let dispatch = Dispatch::new(
            config,
            command_receiver,
            status.clone(),
            notify_callbacks.clone(),
        );
        Client {
            command_sender,
            dispatch: Arc::new(Mutex::new(Some(dispatch))),
            status,
            notify_callbacks,
        }
    }

    /// Starts building an SQL request; bind parameters with the
    /// `ExecWithParamaters` methods and send it with `PreparedSql::execute`.
    pub fn prepare_sql<T>(&self, sql: T) -> PreparedSql
    where
        T: Into<String>,
    {
        PreparedSql {
            client: self.clone(),
            sql: sql.into(),
            params: vec![],
        }
    }

    /// Starts building a function call; bind parameters with the
    /// `ExecWithParamaters` methods and send it with
    /// `PreparedFunctionCall::execute`.
    pub fn prepare_fn_call<T>(&self, function_name: T) -> PreparedFunctionCall
    where
        T: Into<String>,
    {
        PreparedFunctionCall {
            client: self.clone(),
            function: function_name.into(),
            params: vec![],
        }
    }

    /// Returns a copy of the current client status.
    ///
    /// # Panics
    /// Panics if the status lock is poisoned.
    pub fn get_status(&self) -> ClientStatus {
        self.status.read().unwrap().clone()
    }

    /// Registers a new subscriber and returns the receiving half of its
    /// channel of `ClientStatus` values.
    ///
    /// # Panics
    /// Panics if the subscriber-list lock is poisoned.
    pub fn subscribe_to_notify_stream(&self) -> mpsc::UnboundedReceiver<ClientStatus> {
        let (callback_sender, callback_receiver) = mpsc::unbounded();
        self.notify_callbacks.lock().unwrap().push(callback_sender);
        callback_receiver
    }

    /// Sends a prepared packet and waits for its response.
    ///
    /// The first call takes the dispatcher out of its slot and spawns it on
    /// the tokio runtime; later calls find the slot empty and skip this step.
    ///
    /// # Errors
    /// * `ERROR_DISPATCH_THREAD_IS_DEAD` if the request cannot be queued
    ///   because the receiving side of the command channel is gone.
    /// * `ERROR_CLIENT_DISCONNECTED` if the response sender is dropped before
    ///   a response is delivered.
    /// * Any error delivered as the response itself.
    ///
    /// # Panics
    /// Panics if the dispatcher-slot lock is poisoned.
    pub async fn send_command(&self, req: CommandPacket) -> io::Result<TarantoolResponse> {
        // Take the dispatcher in its own statement so the mutex guard is
        // released before anything is spawned.
        let pending_dispatch = self.dispatch.lock().unwrap().take();
        if let Some(mut extracted_dispatch) = pending_dispatch {
            debug!("spawn coroutine!");
            tokio::spawn(async move {
                extracted_dispatch.run().await;
            });
        }

        let (callback_sender, callback_receiver) = oneshot::channel();
        if self
            .command_sender
            .unbounded_send((req, callback_sender))
            .is_err()
        {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                ERROR_DISPATCH_THREAD_IS_DEAD,
            ));
        }

        match callback_receiver.await {
            Ok(res) => res,
            Err(_) => Err(io::Error::new(
                io::ErrorKind::Other,
                ERROR_CLIENT_DISCONNECTED,
            )),
        }
    }

    /// Calls `function` with `params` as its argument list.
    ///
    /// # Errors
    /// Returns an error if `CommandPacket::call` cannot build the packet, or
    /// any error from `send_command`.
    #[inline(always)]
    pub async fn call_fn<T>(&self, function: &str, params: &T) -> io::Result<TarantoolResponse>
    where
        T: Serialize,
    {
        self.send_command(CommandPacket::call(function, params)?)
            .await
    }

    /// Calls `function` with one argument.
    ///
    /// # Errors
    /// Same as `call_fn`.
    #[inline(always)]
    pub async fn call_fn1<T1>(&self, function: &str, param1: &T1) -> io::Result<TarantoolResponse>
    where
        T1: Serialize,
    {
        self.send_command(CommandPacket::call(function, &(param1,))?)
            .await
    }

    /// Calls `function` with two arguments.
    ///
    /// # Errors
    /// Same as `call_fn`.
    #[inline(always)]
    pub async fn call_fn2<T1, T2>(
        &self,
        function: &str,
        param1: &T1,
        param2: &T2,
    ) -> io::Result<TarantoolResponse>
    where
        T1: Serialize,
        T2: Serialize,
    {
        self.send_command(CommandPacket::call(function, &(param1, param2))?)
            .await
    }

    /// Calls `function` with three arguments.
    ///
    /// # Errors
    /// Same as `call_fn`.
    #[inline(always)]
    pub async fn call_fn3<T1, T2, T3>(
        &self,
        function: &str,
        param1: &T1,
        param2: &T2,
        param3: &T3,
    ) -> io::Result<TarantoolResponse>
    where
        T1: Serialize,
        T2: Serialize,
        T3: Serialize,
    {
        self.send_command(CommandPacket::call(function, &(param1, param2, param3))?)
            .await
    }

    /// Calls `function` with four arguments.
    ///
    /// # Errors
    /// Same as `call_fn`.
    #[inline(always)]
    pub async fn call_fn4<T1, T2, T3, T4>(
        &self,
        function: &str,
        param1: &T1,
        param2: &T2,
        param3: &T3,
        param4: &T4,
    ) -> io::Result<TarantoolResponse>
    where
        T1: Serialize,
        T2: Serialize,
        T3: Serialize,
        T4: Serialize,
    {
        self.send_command(CommandPacket::call(
            function,
            &(param1, param2, param3, param4),
        )?)
        .await
    }

    /// Calls `function` with five arguments.
    ///
    /// # Errors
    /// Same as `call_fn`.
    #[inline(always)]
    pub async fn call_fn5<T1, T2, T3, T4, T5>(
        &self,
        function: &str,
        param1: &T1,
        param2: &T2,
        param3: &T3,
        param4: &T4,
        param5: &T5,
    ) -> io::Result<TarantoolResponse>
    where
        T1: Serialize,
        T2: Serialize,
        T3: Serialize,
        T4: Serialize,
        T5: Serialize,
    {
        self.send_command(CommandPacket::call(
            function,
            &(param1, param2, param3, param4, param5),
        )?)
        .await
    }

    /// Sends a select request built by `CommandPacket::select`.
    ///
    /// `iterator` is sent as its integer discriminant.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn select<T>(
        &self,
        space: i32,
        index: i32,
        key: &T,
        offset: i32,
        limit: i32,
        iterator: IteratorType,
    ) -> io::Result<TarantoolResponse>
    where
        T: Serialize,
    {
        let packet = CommandPacket::select(space, index, key, offset, limit, iterator as i32)?;
        self.send_command(packet).await
    }

    /// Sends an insert request built by `CommandPacket::insert`.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn insert<T>(&self, space: i32, tuple: &T) -> io::Result<TarantoolResponse>
    where
        T: Serialize,
    {
        self.send_command(CommandPacket::insert(space, tuple)?)
            .await
    }

    /// Sends a replace request built by `CommandPacket::replace`.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn replace<T>(&self, space: i32, tuple: &T) -> io::Result<TarantoolResponse>
    where
        T: Serialize,
    {
        self.send_command(CommandPacket::replace(space, tuple)?)
            .await
    }

    /// Sends a replace request whose tuple is supplied as raw bytes.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn replace_raw(
        &self,
        space: i32,
        tuple_raw: Vec<u8>,
    ) -> io::Result<TarantoolResponse> {
        self.send_command(CommandPacket::replace_raw(space, tuple_raw)?)
            .await
    }

    /// Sends an update request built by `CommandPacket::update`.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn update<T, T2>(
        &self,
        space: i32,
        key: &T2,
        args: &T,
    ) -> io::Result<TarantoolResponse>
    where
        T: Serialize,
        T2: Serialize,
    {
        self.send_command(CommandPacket::update(space, key, args)?)
            .await
    }

    /// Sends an upsert request built by `CommandPacket::upsert`.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn upsert<T, T2, T3>(
        &self,
        space: i32,
        key: &T2,
        def: &T3,
        args: &T,
    ) -> io::Result<TarantoolResponse>
    where
        T: Serialize,
        T2: Serialize,
        T3: Serialize,
    {
        self.send_command(CommandPacket::upsert(space, key, def, args)?)
            .await
    }

    /// Sends a delete request built by `CommandPacket::delete`.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn delete<T>(&self, space: i32, key: &T) -> io::Result<TarantoolResponse>
    where
        T: Serialize,
    {
        self.send_command(CommandPacket::delete(space, key)?)
            .await
    }

    /// Sends an eval request for `expression` with `args`.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn eval<T, T1>(&self, expression: T1, args: &T) -> io::Result<TarantoolResponse>
    where
        T: Serialize,
        T1: Into<String>,
    {
        self.send_command(CommandPacket::eval(expression.into(), args)?)
            .await
    }

    /// Sends an SQL request for `sql` with `args`.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn exec_sql<T, T1>(&self, sql: T1, args: &T) -> io::Result<TarantoolResponse>
    where
        T: Serialize,
        T1: Into<String>,
    {
        let sql_text = sql.into();
        self.send_command(CommandPacket::exec_sql(sql_text.as_str(), args)?)
            .await
    }

    /// Sends a ping request.
    ///
    /// # Errors
    /// Returns an error if the packet cannot be built, or any error from
    /// `send_command`.
    #[inline(always)]
    pub async fn ping(&self) -> io::Result<TarantoolResponse> {
        self.send_command(CommandPacket::ping()?).await
    }
}
impl ExecWithParamaters for PreparedSql {
    /// Appends the serialized parameter to the statement's parameter list.
    /// This implementation never returns an error.
    fn bind_raw(self, param: Vec<u8>) -> io::Result<PreparedSql> {
        let mut statement = self;
        statement.params.push(param);
        Ok(statement)
    }
}
impl PreparedSql {
    /// Sends the statement with all bound parameters and converts the
    /// response into a `TarantoolSqlResponse`.
    ///
    /// # Errors
    /// Returns an error if the bound parameters cannot be packed into one
    /// array, if the packet cannot be built, or any error from
    /// `Client::send_command`.
    #[inline(always)]
    pub async fn execute(self) -> io::Result<TarantoolSqlResponse> {
        let params = serialize_array(&self.params)?;
        // Propagate packet-construction failures instead of panicking.
        let packet = CommandPacket::exec_sql_raw(self.sql.as_str(), params)?;
        self.client
            .send_command(packet)
            .await
            .map(|val| val.into())
    }
}
impl ExecWithParamaters for PreparedFunctionCall {
    /// Appends the serialized parameter to the call's argument list.
    /// This implementation never returns an error.
    fn bind_raw(self, param: Vec<u8>) -> io::Result<PreparedFunctionCall> {
        let mut call = self;
        call.params.push(param);
        Ok(call)
    }
}
impl PreparedFunctionCall {
    /// Sends the function call with all bound parameters.
    ///
    /// # Errors
    /// Returns an error if the bound parameters cannot be packed into one
    /// array, if the packet cannot be built, or any error from
    /// `Client::send_command`.
    #[inline(always)]
    pub async fn execute(self) -> io::Result<TarantoolResponse> {
        let params = serialize_array(&self.params)?;
        // Propagate packet-construction failures instead of panicking.
        let packet = CommandPacket::call_raw(self.function.as_str(), params)?;
        self.client.send_command(packet).await
    }
}