use super::ll_conn::DuplexConn;
use super::*;
use crate::message_builder::MarshalledMessage;
use crate::message_builder::MessageType;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::time;
pub struct RpcConn {
signals: VecDeque<MarshalledMessage>,
calls: VecDeque<MarshalledMessage>,
responses: HashMap<u32, MarshalledMessage>,
conn: DuplexConn,
filter: MessageFilter,
}
pub type MessageFilter = Box<dyn Fn(&MarshalledMessage) -> bool + Sync + Send>;
impl RpcConn {
pub fn new(conn: DuplexConn) -> Self {
RpcConn {
signals: VecDeque::new(),
calls: VecDeque::new(),
responses: HashMap::new(),
conn,
filter: Box::new(|_| true),
}
}
pub fn conn(&self) -> &DuplexConn {
&self.conn
}
pub fn conn_mut(&mut self) -> &mut DuplexConn {
&mut self.conn
}
pub fn alloc_serial(&mut self) -> u32 {
self.conn.send.alloc_serial()
}
pub fn session_conn(timeout: Timeout) -> Result<Self> {
let session_path = get_session_bus_path()?;
Self::connect_to_path(session_path, timeout)
}
pub fn system_conn(timeout: Timeout) -> Result<Self> {
let session_path = get_system_bus_path()?;
Self::connect_to_path(session_path, timeout)
}
pub fn connect_to_path(path: UnixAddr, timeout: Timeout) -> Result<Self> {
let con = DuplexConn::connect_to_bus(path, true)?;
let mut con = Self::new(con);
let mut hello = crate::standard_messages::hello();
let serial = con
.send_message(&mut hello)?
.write(timeout)
.map_err(super::ll_conn::force_finish_on_error)?;
con.wait_response(serial, timeout)?;
Ok(con)
}
pub fn set_filter(&mut self, filter: MessageFilter) {
self.filter = filter;
}
pub fn try_get_response(&mut self, serial: u32) -> Option<MarshalledMessage> {
self.responses.remove(&serial)
}
pub fn wait_response(&mut self, serial: u32, timeout: Timeout) -> Result<MarshalledMessage> {
let start_time = time::Instant::now();
loop {
if let Some(msg) = self.try_get_response(serial) {
return Ok(msg);
}
self.refill_once(calc_timeout_left(&start_time, timeout)?)?;
}
}
pub fn try_get_signal(&mut self) -> Option<MarshalledMessage> {
self.signals.pop_front()
}
pub fn wait_signal(&mut self, timeout: Timeout) -> Result<MarshalledMessage> {
let start_time = time::Instant::now();
loop {
if let Some(msg) = self.try_get_signal() {
return Ok(msg);
}
self.refill_once(calc_timeout_left(&start_time, timeout)?)?;
}
}
pub fn try_get_call(&mut self) -> Option<MarshalledMessage> {
self.calls.pop_front()
}
pub fn wait_call(&mut self, timeout: Timeout) -> Result<MarshalledMessage> {
let start_time = time::Instant::now();
loop {
if let Some(msg) = self.try_get_call() {
return Ok(msg);
}
self.refill_once(calc_timeout_left(&start_time, timeout)?)?;
}
}
pub fn send_message<'a>(
&'a mut self,
msg: &'a mut crate::message_builder::MarshalledMessage,
) -> Result<super::ll_conn::SendMessageContext<'a>> {
self.conn.send.send_message(msg)
}
fn insert_message_or_send_error(&mut self, msg: MarshalledMessage) -> Result<()> {
if self.filter.as_ref()(&msg) {
match msg.typ {
MessageType::Call => {
self.calls.push_back(msg);
}
MessageType::Invalid => return Err(Error::UnexpectedMessageTypeReceived),
MessageType::Error => {
self.responses
.insert(msg.dynheader.response_serial.unwrap(), msg);
}
MessageType::Reply => {
self.responses
.insert(msg.dynheader.response_serial.unwrap(), msg);
}
MessageType::Signal => {
self.signals.push_back(msg);
}
}
} else {
match msg.typ {
MessageType::Call => {
let reply = crate::standard_messages::unknown_method(&msg.dynheader);
self.conn
.send
.send_message(&reply)?
.write_all()
.map_err(ll_conn::force_finish_on_error)?;
}
MessageType::Invalid => return Err(Error::UnexpectedMessageTypeReceived),
MessageType::Error => {
}
MessageType::Reply => {
}
MessageType::Signal => {
}
}
}
Ok(())
}
pub fn try_refill_once(&mut self, timeout: Timeout) -> Result<Option<MessageType>> {
let start_time = time::Instant::now();
let msg = self
.conn
.recv
.get_next_message(calc_timeout_left(&start_time, timeout)?)?;
let typ = msg.typ;
self.insert_message_or_send_error(msg)?;
Ok(Some(typ))
}
pub fn refill_once(&mut self, timeout: Timeout) -> Result<MessageType> {
let start_time = time::Instant::now();
loop {
if let Some(typ) = self.try_refill_once(calc_timeout_left(&start_time, timeout)?)? {
break Ok(typ);
}
}
}
pub fn refill_all(&mut self) -> Result<Vec<crate::message_builder::MarshalledMessage>> {
let mut filtered_out = Vec::new();
loop {
let msg = match self.conn.recv.get_next_message(Timeout::Nonblock) {
Err(Error::TimedOut) => break,
Err(e) => return Err(e),
Ok(m) => m,
};
if self.filter.as_ref()(&msg) {
match msg.typ {
MessageType::Call => {
self.calls.push_back(msg);
}
MessageType::Invalid => return Err(Error::UnexpectedMessageTypeReceived),
MessageType::Error => {
self.responses
.insert(msg.dynheader.response_serial.unwrap(), msg);
}
MessageType::Reply => {
self.responses
.insert(msg.dynheader.response_serial.unwrap(), msg);
}
MessageType::Signal => {
self.signals.push_back(msg);
}
}
} else {
match msg.typ {
MessageType::Call => {
let reply = crate::standard_messages::unknown_method(&msg.dynheader);
filtered_out.push(reply);
}
MessageType::Invalid => return Err(Error::UnexpectedMessageTypeReceived),
MessageType::Error => {
}
MessageType::Reply => {
}
MessageType::Signal => {
}
}
}
}
Ok(filtered_out)
}
}