use std::io;
use std::sync::{Arc, Mutex, RwLock};
use futures::stream::Stream;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::{Future, IntoFuture};
use serde::Serialize;
use tokio;
use crate::tarantool::dispatch::{
CallbackSender, Dispatch, ERROR_CLIENT_DISCONNECTED, ERROR_DISPATCH_THREAD_IS_DEAD,
};
pub use crate::tarantool::dispatch::{ClientConfig, ClientStatus};
pub use crate::tarantool::packets::{CommandPacket, TarantoolRequest, TarantoolResponse};
pub use crate::tarantool::tools::serialize_to_vec_u8;
pub mod codec;
mod dispatch;
pub mod packets;
mod tools;
impl ClientConfig {
    /// Consumes this configuration and constructs a `Client` from it.
    ///
    /// Equivalent to passing the configuration to `Client::new`.
    pub fn build(self) -> Client {
        let config = self;
        Client::new(config)
    }
}
/// Handle used to submit commands and observe status.
///
/// The type derives `Clone`; every field is either a channel sender or an
/// `Arc`, so a clone refers to the same queue, dispatcher slot, status cell
/// and subscriber list as the value it was cloned from.
#[derive(Clone)]
pub struct Client {
/// Sending half of the unbounded command queue created in `Client::new`;
/// the receiving half is handed to `Dispatch::new`. Each entry pairs a
/// packet with the callback sender used to deliver its response.
command_sender: mpsc::UnboundedSender<(CommandPacket, CallbackSender)>,
/// Holds the not-yet-started `Dispatch`. `send_command` takes the value out
/// and passes it to `tokio::spawn`, leaving `None` behind afterwards.
dispatch: Arc<Mutex<Option<Dispatch>>>,
/// Status cell shared with the `Dispatch` (a clone of this `Arc` is passed to
/// `Dispatch::new`); starts as `ClientStatus::Init` and is read by `get_status`.
status: Arc<RwLock<ClientStatus>>,
/// Senders registered through `subscribe_to_notify_stream`; the list is
/// shared with the `Dispatch` via a clone of this `Arc`.
notify_callbacks: Arc<RwLock<Vec<dispatch::ReconnectNotifySender>>>,
}
impl Client {
    /// Creates a client from `config`.
    ///
    /// A command queue, a status cell (initialised to `ClientStatus::Init`) and
    /// an empty subscriber list are created and shared with a new `Dispatch`.
    /// The `Dispatch` is only stored here; it is handed to `tokio::spawn` later,
    /// from inside the future returned by `send_command`.
    pub fn new(config: ClientConfig) -> Client {
        let (command_sender, command_receiver) = mpsc::unbounded();
        let status = Arc::new(RwLock::new(ClientStatus::Init));
        let notify_callbacks = Arc::new(RwLock::new(Vec::new()));
        let dispatch = Dispatch::new(
            config,
            command_receiver,
            status.clone(),
            notify_callbacks.clone(),
        );
        Client {
            command_sender,
            dispatch: Arc::new(Mutex::new(Some(dispatch))),
            status,
            notify_callbacks,
        }
    }

    /// Returns a clone of the current client status.
    ///
    /// # Panics
    ///
    /// Panics if the status lock is poisoned.
    pub fn get_status(&self) -> ClientStatus {
        self.status.read().unwrap().clone()
    }

    /// Registers a new status subscriber and returns its receiving stream.
    ///
    /// A fresh unbounded channel is created; its sender is appended to the
    /// subscriber list shared with the dispatcher and its receiver is returned.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber-list lock is poisoned.
    pub fn subscribe_to_notify_stream(&self) -> impl Stream<Item = ClientStatus, Error = ()> {
        let (callback_sender, callback_receiver) = mpsc::unbounded();
        self.notify_callbacks.write().unwrap().push(callback_sender);
        callback_receiver
    }

    /// Queues `req` for the dispatcher and returns a future for its response.
    ///
    /// The packet is pushed onto the command queue immediately, at call time.
    /// Everything else happens when the returned future is polled:
    ///
    /// * if the dispatcher has not been started yet, it is taken out of its
    ///   slot and passed to `tokio::spawn`;
    /// * the future then waits on the one-shot callback channel and yields the
    ///   result delivered through it.
    ///
    /// # Errors
    ///
    /// * `ERROR_DISPATCH_THREAD_IS_DEAD` if the packet could not be queued.
    /// * `ERROR_CLIENT_DISCONNECTED` if the callback sender is dropped without
    ///   delivering a response.
    /// * Any error delivered through the callback channel itself.
    ///
    /// # Panics
    ///
    /// Panics (when polled) if the dispatcher mutex is poisoned.
    pub fn send_command(
        &self,
        req: CommandPacket,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error> {
        let dispatch = self.dispatch.clone();
        let (callback_sender, callback_receiver) = oneshot::channel();
        let send_res = self.command_sender.unbounded_send((req, callback_sender));
        send_res
            .into_future()
            .map_err(|_e| io::Error::new(io::ErrorKind::Other, ERROR_DISPATCH_THREAD_IS_DEAD))
            .and_then(move |_r| {
                // Take the dispatcher in its own statement so the mutex guard is
                // dropped before `tokio::spawn` runs instead of being held for
                // the whole `if let` body.
                let pending_dispatch = dispatch.lock().unwrap().take();
                if let Some(extracted_dispatch) = pending_dispatch {
                    debug!("spawn coroutine!");
                    tokio::spawn(extracted_dispatch);
                }
                callback_receiver
                    .into_future()
                    .map_err(|_e| io::Error::new(io::ErrorKind::Other, ERROR_CLIENT_DISCONNECTED))
            })
            // The callback channel carries a `Result`; flatten it into the future.
            .and_then(|r| r)
    }

    /// Sends a freshly built packet, or reports the build failure via the future.
    ///
    /// On `Ok` the packet is passed to `send_command` right away (so queuing
    /// stays eager, exactly as when calling `send_command` directly). On `Err`
    /// nothing is queued and the returned future resolves to that error instead
    /// of panicking in the caller.
    ///
    /// NOTE(review): assumes the `CommandPacket` constructors return
    /// `io::Result<CommandPacket>` — confirm against `packets`.
    fn send_built(
        &self,
        built: io::Result<CommandPacket>,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error> {
        built
            .map(|packet| self.send_command(packet))
            .into_future()
            .and_then(|in_flight| in_flight)
    }

    /// Builds a packet with `CommandPacket::call(function, params)` and sends it.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn call_fn<T>(
        &self,
        function: &str,
        params: &T,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T: Serialize,
    {
        self.send_built(CommandPacket::call(function, params))
    }

    /// Like `call_fn`, with the single parameter wrapped as the tuple `(param1,)`.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn call_fn1<T1>(
        &self,
        function: &str,
        param1: &T1,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T1: Serialize,
    {
        self.send_built(CommandPacket::call(function, &(param1,)))
    }

    /// Like `call_fn`, with the parameters passed as the tuple `(param1, param2)`.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn call_fn2<T1, T2>(
        &self,
        function: &str,
        param1: &T1,
        param2: &T2,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T1: Serialize,
        T2: Serialize,
    {
        self.send_built(CommandPacket::call(function, &(param1, param2)))
    }

    /// Like `call_fn`, with the parameters passed as a 3-tuple of references.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn call_fn3<T1, T2, T3>(
        &self,
        function: &str,
        param1: &T1,
        param2: &T2,
        param3: &T3,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T1: Serialize,
        T2: Serialize,
        T3: Serialize,
    {
        self.send_built(CommandPacket::call(function, &(param1, param2, param3)))
    }

    /// Like `call_fn`, with the parameters passed as a 4-tuple of references.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn call_fn4<T1, T2, T3, T4>(
        &self,
        function: &str,
        param1: &T1,
        param2: &T2,
        param3: &T3,
        param4: &T4,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T1: Serialize,
        T2: Serialize,
        T3: Serialize,
        T4: Serialize,
    {
        self.send_built(CommandPacket::call(
            function,
            &(param1, param2, param3, param4),
        ))
    }

    /// Like `call_fn`, with the parameters passed as a 5-tuple of references.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn call_fn5<T1, T2, T3, T4, T5>(
        &self,
        function: &str,
        param1: &T1,
        param2: &T2,
        param3: &T3,
        param4: &T4,
        param5: &T5,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T1: Serialize,
        T2: Serialize,
        T3: Serialize,
        T4: Serialize,
        T5: Serialize,
    {
        self.send_built(CommandPacket::call(
            function,
            &(param1, param2, param3, param4, param5),
        ))
    }

    /// Builds a packet with `CommandPacket::select` and sends it.
    ///
    /// All arguments are forwarded unchanged, in the order
    /// `space, index, key, offset, limit, iterator`.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn select<T>(
        &self,
        space: i32,
        index: i32,
        key: &T,
        offset: i32,
        limit: i32,
        iterator: i32,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T: Serialize,
    {
        self.send_built(CommandPacket::select(
            space, index, key, offset, limit, iterator,
        ))
    }

    /// Builds a packet with `CommandPacket::insert(space, tuple)` and sends it.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn insert<T>(
        &self,
        space: i32,
        tuple: &T,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T: Serialize,
    {
        self.send_built(CommandPacket::insert(space, tuple))
    }

    /// Builds a packet with `CommandPacket::replace(space, tuple)` and sends it.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn replace<T>(
        &self,
        space: i32,
        tuple: &T,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T: Serialize,
    {
        self.send_built(CommandPacket::replace(space, tuple))
    }

    /// Builds a packet with `CommandPacket::replace_raw(space, tuple_raw)` and
    /// sends it; `tuple_raw` is passed through as the given bytes.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn replace_raw(
        &self,
        space: i32,
        tuple_raw: Vec<u8>,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error> {
        self.send_built(CommandPacket::replace_raw(space, tuple_raw))
    }

    /// Builds a packet with `CommandPacket::update(space, key, args)` and sends it.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn update<T, T2>(
        &self,
        space: i32,
        key: &T2,
        args: &T,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T: Serialize,
        T2: Serialize,
    {
        self.send_built(CommandPacket::update(space, key, args))
    }

    /// Builds a packet with `CommandPacket::upsert(space, key, def, args)` and
    /// sends it.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn upsert<T, T2, T3>(
        &self,
        space: i32,
        key: &T2,
        def: &T3,
        args: &T,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T: Serialize,
        T2: Serialize,
        T3: Serialize,
    {
        self.send_built(CommandPacket::upsert(space, key, def, args))
    }

    /// Builds a packet with `CommandPacket::delete(space, key)` and sends it.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn delete<T>(
        &self,
        space: i32,
        key: &T,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T: Serialize,
    {
        self.send_built(CommandPacket::delete(space, key))
    }

    /// Builds a packet with `CommandPacket::eval(expression, args)` and sends it.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn eval<T>(
        &self,
        expression: String,
        args: &T,
    ) -> impl Future<Item = TarantoolResponse, Error = io::Error>
    where
        T: Serialize,
    {
        self.send_built(CommandPacket::eval(expression, args))
    }

    /// Builds a packet with `CommandPacket::ping()` and sends it.
    ///
    /// # Errors
    ///
    /// The future fails if the packet cannot be built or if `send_command` fails.
    #[inline]
    pub fn ping(&self) -> impl Future<Item = TarantoolResponse, Error = io::Error> {
        self.send_built(CommandPacket::ping())
    }
}