use crate::packet::{Packet, PacketView, SlotId};
use crate::queue::Queue;
use crate::transport::{TransportError, TransportRecv, TransportSend};
use crate::Config;
use crossbeam::channel::RecvTimeoutError::{Disconnected, Timeout};
use crossbeam::channel::{bounded, Receiver, Sender};
use std::sync::Arc;
use std::thread;
use std::time;
/// One in-flight call "lane": a slot identifier paired with the channel on which the
/// response for that slot is delivered.
#[derive(Debug)]
struct CallSlot {
/// Identifier stamped onto the outgoing request packet (via `into_request()`) in `Client::call`.
id: SlotId,
/// Receiving end of the one-element channel created for this slot in `Client::new`;
/// `Client::call` blocks on it to obtain the response.
response: Receiver<Result<Packet, TransportError>>,
}
/// Call-issuing side of a connection: sends request packets over a transport and waits for
/// the matching responses, which a dedicated receiver thread routes to per-slot channels.
///
/// `shutdown` must be called before the value is dropped; `Drop` asserts this.
#[derive(Debug)]
pub struct Client {
/// Supplies `name` (receiver thread name), `call_slots` (number of slots) and
/// `call_timeout` (used when acquiring a slot and when sending).
config: Config,
/// Queue of currently free call slots; a slot is popped for each call and pushed back
/// once its response has been received.
call_slots: Arc<Queue<CallSlot>>,
/// Transport used to send request packet buffers.
transport_send: Arc<dyn TransportSend>,
/// Handle of the thread running `receive_loop`; taken and joined in `shutdown`, so it is
/// `None` afterwards.
receiver_thread: Option<thread::JoinHandle<()>>,
/// Receives a single `()` from the receiver thread after `receive_loop` has returned.
joined_event_receiver: Receiver<()>,
}
impl Client {
    /// Builds a client and starts its receiver thread.
    ///
    /// One `CallSlot` is created per `config.call_slots`. Each slot keeps the receiving end of
    /// a one-element channel, while the sending ends are handed to the receiver thread. That
    /// thread is named `"[<config.name>] client"`, runs `receive_loop` over `transport_recv`,
    /// and signals on an internal channel once the loop has returned.
    ///
    /// # Panics
    ///
    /// Panics if a slot cannot be pushed into the slot queue or if the receiver thread cannot
    /// be spawned.
    pub fn new(config: Config, transport_send: Arc<dyn TransportSend>, transport_recv: Box<dyn TransportRecv>) -> Self {
        let (joined_event_sender, joined_event_receiver) = bounded(1);
        let callslot_size = SlotId::new(config.call_slots as u32);
        let call_slots = Arc::new(Queue::new(callslot_size.as_usize()));
        // Sender `i` delivers responses to the slot whose id is `i`.
        let mut to_slot_receivers = Vec::with_capacity(callslot_size.as_usize());
        for i in 0..callslot_size.as_raw() {
            let (send_to_slot_recv, recv_for_slot) = bounded(1);
            call_slots
                .push(CallSlot {
                    id: SlotId::new(i),
                    response: recv_for_slot,
                })
                .expect("Client does not call close");
            to_slot_receivers.push(send_to_slot_recv);
        }
        // Cloned up front because `config` is moved into the struct below.
        let name = config.name.clone();
        Client {
            config,
            call_slots,
            transport_send,
            receiver_thread: Some(
                thread::Builder::new()
                    .name(format!("[{}] client", name))
                    .spawn(move || {
                        receive_loop(transport_recv, to_slot_receivers);
                        // Tells `shutdown` that the loop has ended and the thread can be joined.
                        joined_event_sender.send(()).unwrap();
                    })
                    .unwrap(),
            ),
            joined_event_receiver,
        }
    }

    /// Sends `packet` as a request and blocks until the response for its slot arrives.
    ///
    /// A free slot is taken from the slot queue (waiting with `config.call_timeout`), its id is
    /// written into an owned copy of the packet, and the packet buffer is sent over the
    /// transport. The slot is returned to the queue once a response has been received.
    ///
    /// # Panics
    ///
    /// Panics if no slot can be obtained, if sending fails, if the slot's response channel is
    /// disconnected, if the slot cannot be returned to the queue, or if the delivered response
    /// is an `Err`.
    pub fn call(&self, packet: PacketView) -> Packet {
        let slot = self.call_slots.pop(self.config.call_timeout).expect("Too many calls on port");
        let packet = {
            let mut packet = packet.to_owned();
            packet.set_slot(slot.id.into_request());
            packet
        };
        self.transport_send.send(packet.buffer(), self.config.call_timeout).unwrap();
        let response_packet = slot.response.recv().expect(
            "counterparty send is managed by client. \n\
             This error might be due to drop after disconnection of the two remote-trait-object contexts. \n\
             Please consider disable_garbage_collection() or explicit drop for the imported services.",
        );
        // Hand the slot back before unwrapping so it is reusable even if the response is an error.
        self.call_slots.push(slot).expect("Client does not close the queue");
        response_packet.unwrap()
    }

    /// Waits for the receiver thread to finish and joins it.
    ///
    /// The thread is given 100 ms to report that `receive_loop` has returned. Calling
    /// `shutdown` again after a successful shutdown does nothing.
    ///
    /// # Panics
    ///
    /// Panics if the thread does not report completion within 100 ms, if its signalling
    /// channel is disconnected without a signal, or if joining the thread fails.
    pub fn shutdown(&mut self) {
        // Already shut down: the join signal was consumed and its sender dropped, so waiting
        // again would yield `Disconnected` and wrongly blame a receiver-thread panic.
        if self.receiver_thread.is_none() {
            return;
        }
        match self.joined_event_receiver.recv_timeout(time::Duration::from_millis(100)) {
            Err(Timeout) => {
                panic!(
                    "There may be a deadlock or misuse of Client. Call Client::shutdown after Multiplexer::shutdown"
                );
            }
            Err(Disconnected) => {
                panic!("Maybe receive_loop thread panics");
            }
            Ok(_) => {}
        }
        self.receiver_thread.take().unwrap().join().unwrap();
    }
}
impl Drop for Client {
fn drop(&mut self) {
assert!(self.receiver_thread.is_none(), "Please call shutdown");
}
}
/// Body of the receiver thread: reads buffers from `transport_recv` and forwards each one, as a
/// `Packet`, to the sender at the index given by the packet's slot id.
///
/// The loop ends, and the function returns, on the first receive error of any kind
/// (`TransportError::Termination` included).
///
/// # Panics
///
/// Panics if the slot index is outside `to_slot_receivers` or if forwarding to the slot's
/// channel fails.
fn receive_loop(
    transport_recv: Box<dyn TransportRecv>,
    to_slot_receivers: Vec<Sender<Result<Packet, TransportError>>>,
) {
    // `recv(None)` is called without a timeout value; every `Err` terminates the loop.
    while let Ok(buffer) = transport_recv.recv(None) {
        let response = Packet::new_from_buffer(buffer);
        let index = response.view().slot().as_usize();
        let slot_sender = &to_slot_receivers[index];
        slot_sender
            .send(Ok(response))
            .expect("Slot receivers are managed in Client. Client must be dropped after this thread");
    }
}