mod call;
mod connection;
mod connection_id;
use super::{client_id::BYTE_LENGTH, AlignmentContext, ClientId, RpcKind, RpcProtocol};
use crate::{conf::Configuration, fs::common_configuration_keys};
use anyhow::Error;
use atomic::Atomic;
use call::Call;
use connection::Connection;
pub use connection_id::ConnectionId;
use std::{
cell::RefCell,
io::{Read, Write},
net::TcpStream,
rc::Rc,
sync::{atomic::Ordering, Arc},
};
static CALL_ID_COUNTER: Atomic<i32> = Atomic::new(0);
thread_local! {
static CALL_ID: RefCell<Option<i32>> = RefCell::new(None);
static RETRY_COUNT: RefCell<Option<i32>> = RefCell::new(None);
static EXTERNAL_CALL_HANDLER: RefCell<Option<String>> = RefCell::new(None);
}
pub struct Client {
_value_class: String,
_conf: Configuration,
_connection_timeout: i32,
_fallback_allowed: bool,
_bind_to_wild_card_address: bool,
client_id: [u8; BYTE_LENGTH],
_max_async_calls: i32,
}
impl Client {
pub fn get_timeout(_conf: &Configuration) -> i32 {
3000
}
fn _get_call_id() -> anyhow::Result<i32> {
Ok(CALL_ID
.try_with(|x| *x.borrow())?
.unwrap_or_else(|| Self::next_call_id()))
}
fn take_call_id() -> anyhow::Result<i32> {
Ok(CALL_ID
.try_with(|x| x.take())?
.unwrap_or_else(|| Self::next_call_id()))
}
fn get_retry_count() -> anyhow::Result<i32> {
Ok(RETRY_COUNT.try_with(|x| *x.borrow())?.unwrap_or_default())
}
fn get_external_handler() -> anyhow::Result<Option<String>> {
Ok(EXTERNAL_CALL_HANDLER.try_with(|x| (*x.borrow()).to_owned())?)
}
fn get_ping_interval(conf: &Configuration) -> anyhow::Result<i32> {
conf.get_int(
common_configuration_keys::IPC_PING_INTERVAL_KEY,
common_configuration_keys::IPC_PING_INTERVAL_DEFAULT,
)
}
fn create_call(&self, rpc_kind: &RpcKind, rpc_request: Rc<Vec<u8>>) -> anyhow::Result<Call> {
Call::new(rpc_kind, rpc_request)
}
pub fn new(value_class: &str, conf: &Configuration) -> anyhow::Result<Self> {
let connection_timeout = conf.get_int(
common_configuration_keys::IPC_CLIENT_CONNECT_TIMEOUT_KEY,
common_configuration_keys::IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT,
)?;
let fallback_allowed = conf.get_bool(
common_configuration_keys::IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
common_configuration_keys::IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT,
);
let bind_to_wild_card_address = conf.get_bool(
common_configuration_keys::IPC_CLIENT_BIND_WILDCARD_ADDR_KEY,
common_configuration_keys::IPC_CLIENT_BIND_WILDCARD_ADDR_DEFAULT,
);
let max_async_calls = conf.get_int(
common_configuration_keys::IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
common_configuration_keys::IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT,
)?;
Ok(Self {
_value_class: value_class.to_owned(),
_conf: conf.to_owned(),
_connection_timeout: connection_timeout,
_fallback_allowed: fallback_allowed,
_bind_to_wild_card_address: bind_to_wild_card_address,
client_id: ClientId::get_client_id(),
_max_async_calls: max_async_calls,
})
}
pub fn call<T: RpcProtocol>(
&self,
rpc_kind: &RpcKind,
rpc_request: Rc<Vec<u8>>,
remote_id: Rc<ConnectionId>,
service_class: u8,
fallback_to_simple_auth: Option<Arc<Atomic<bool>>>,
alignment_context: Option<Rc<dyn AlignmentContext>>,
) -> anyhow::Result<Vec<u8>> {
let mut call = self.create_call(rpc_kind, rpc_request)?;
call.set_alignment_context(alignment_context);
let call = Rc::new(call);
let mut connection = self.get_connection::<T>(
remote_id,
Rc::clone(&call),
service_class,
fallback_to_simple_auth,
)?;
connection.send_rpc_request(Rc::clone(&call))?;
let res: Vec<u8> = self.get_rpc_response(Rc::clone(&call), &mut connection, 0)?;
Ok(res)
}
fn get_connection<'a, T: RpcProtocol>(
&'a self,
remote_id: Rc<ConnectionId>,
call: Rc<Call>,
service_class: u8,
fallback_to_simple_auth: Option<Arc<Atomic<bool>>>,
) -> anyhow::Result<Connection<'a, T>> {
let mut connection = Connection::new(self, remote_id, service_class)?;
connection.add_call(call);
connection.setup_iostreams(fallback_to_simple_auth)?;
Ok(connection)
}
fn get_rpc_response<T: RpcProtocol>(
&self,
_call: Rc<Call>,
connection: &mut Connection<T>,
_timeout: i64,
) -> anyhow::Result<Vec<u8>> {
Ok(connection.ipc_streams.read_response()?)
}
fn next_call_id() -> i32 {
CALL_ID_COUNTER.fetch_add(1, Ordering::SeqCst) & 0x7FFFFFFF
}
}
struct IpcStreams {
inner: TcpStream,
max_response_length: i32,
first_response: bool,
}
impl IpcStreams {
fn new(inner: TcpStream, max_response_length: i32) -> Self {
IpcStreams {
inner,
max_response_length,
first_response: true,
}
}
fn read_i32(&mut self) -> anyhow::Result<i32> {
let mut buf = [0; 4];
self.inner.read_exact(&mut buf)?;
Ok(i32::from_be_bytes(buf))
}
fn read_response(&mut self) -> anyhow::Result<Vec<u8>> {
let length = self.read_i32()?;
if self.first_response {
self.first_response = false;
if length == -1 {
self.read_i32()?;
return Err(Error::msg("Remote Exception"));
}
}
if length <= 0 {
return Err(Error::msg("RPC response has invalid length"));
}
if self.max_response_length > 0 && length > self.max_response_length {
return Err(Error::msg("RPC response exceeds maximum data length"));
}
let mut buf = vec![0; length as usize];
self.inner.read_exact(&mut buf)?;
Ok(buf)
}
fn send_request(&mut self, buf: &[u8]) -> anyhow::Result<usize> {
Ok(self.inner.write(buf)?)
}
fn _flush(&mut self) -> anyhow::Result<()> {
Ok(self.inner.flush()?)
}
}