use self::connection::*;
use super::*;
use crossbeam::channel::{SendError, SendTimeoutError, TrySendError};
pub struct Sender<T: MachineImpl> {
channel_id: usize,
clone_count: Arc<AtomicUsize>,
connection: ThreadSafeConnection,
pub sender: crossbeam::channel::Sender<T>,
receiver_machine: WeakShareableMachine,
}
impl<T> Sender<T>
where
T: MachineImpl,
{
pub fn get_id(&self) -> usize { self.channel_id }
pub fn bind(&mut self, recevier_machine: WeakShareableMachine) { self.receiver_machine = recevier_machine; }
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { self.sender.try_send(msg) }
pub fn sender(&self) -> crossbeam::channel::Sender<T> { self.sender.clone() }
pub fn send(&self, msg: T) -> Result<(), SendError<T>>
where
T: MachineImpl + MachineImpl<InstructionSet = T> + std::fmt::Debug,
{
ExecutorData::block_or_continue();
match self.sender.try_send(msg) {
Ok(()) => {
if let Some(machine) = self.receiver_machine.upgrade() {
match machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
Ok(_) => ExecutorData::schedule(&machine, false),
Err(MachineState::New) => (),
Err(MachineState::Running) => (),
Err(MachineState::Ready) => (),
Err(MachineState::SendBlock) => (),
Err(state) => {
log::error!("chan {} state {:#?} q_len {}", self.channel_id, state, self.sender.len());
},
}
}
Ok(())
},
Err(TrySendError::Full(instruction)) => {
if let Some(machine) = self.receiver_machine.upgrade() {
log::trace!(
"parking sender {} with cmd {:#?} machine {} state {:#?}",
self.channel_id,
instruction,
machine.get_key(),
machine.get_state()
);
}
match <T as MachineImpl>::park_sender(
self.channel_id,
Weak::clone(&self.receiver_machine),
self.sender.clone() as crossbeam::channel::Sender<<T as MachineImpl>::InstructionSet>,
instruction,
) {
Ok(()) => Ok(()),
Err(m) => {
log::debug!("blocking main thread on send");
self.sender.send(m)
},
}
},
Err(TrySendError::Disconnected(m)) => Err(SendError(m)),
}
}
pub fn send_timeout(&self, msg: T, timeout: std::time::Duration) -> Result<(), SendTimeoutError<T>> {
if self.sender.is_full() {
log::warn!("Sender: channel is full, send_timeout will block for {:#?}", timeout);
}
self.sender.send_timeout(msg, timeout)
}
pub fn is_full(&self) -> bool { self.sender.is_full() }
pub fn len(&self) -> usize { self.sender.len() }
pub fn is_empty(&self) -> bool { self.sender.is_empty() }
pub fn capacity(&self) -> Option<usize> { self.sender.capacity() }
}
impl<T> fmt::Debug for Sender<T>
where
T: MachineImpl,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.sender.fmt(f) }
}
impl<T> Drop for Sender<T>
where
T: MachineImpl,
{
fn drop(&mut self) {
if 0 != self.clone_count.fetch_sub(1, Ordering::SeqCst) {
return;
}
if let Some(machine) = self.receiver_machine.upgrade() {
machine.set_disconnected();
match machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
Ok(_) => {
ExecutorData::schedule(&machine, true);
},
Err(MachineState::New) => (),
Err(MachineState::Ready) => (),
Err(MachineState::Running) => (),
Err(MachineState::Dead) => (),
Err(state) => panic!("sender drop not expecting receiver state {:#?}", state),
}
}
}
}
impl<T> Clone for Sender<T>
where
T: MachineImpl,
{
fn clone(&self) -> Self {
self.clone_count.fetch_add(1, Ordering::SeqCst);
Self {
channel_id: self.channel_id,
clone_count: Arc::clone(&self.clone_count),
connection: Arc::clone(&self.connection),
sender: self.sender.clone(),
receiver_machine: Weak::clone(&self.receiver_machine),
}
}
}
impl<T> PartialEq for Sender<T>
where
T: MachineImpl,
{
fn eq(&self, other: &Self) -> bool { self.channel_id == other.channel_id && self.sender.same_channel(&other.sender) }
}
impl<T> Eq for Sender<T> where T: MachineImpl {}
pub fn wrap_sender<T>(sender: crossbeam::channel::Sender<T>, channel_id: usize, connection: ThreadSafeConnection) -> Sender<T>
where
T: MachineImpl,
{
log::trace!("creating sender {}", channel_id);
Sender::<T> {
channel_id,
clone_count: Arc::new(AtomicUsize::new(0)),
connection,
sender,
receiver_machine: Weak::new(),
}
}