d3_core/channel/
sender.rs

1use self::connection::*;
2use super::*;
3use crossbeam::channel::{SendError, SendTimeoutError, TrySendError};
4
/// Wrap the crossbeam sender to allow the executor to handle a block.
/// This requires that a send which would block parks the send. It
/// also requires that, prior to sending, a check is made to determine
/// if the sender is already blocked. What makes this work is that the
/// repeat send is bound to the executor. Consequently, TLS data can
/// be inspected to determine if we need to not complete the send.
///
/// Otherwise, the Sender is a wrapper around the Crossbeam sender. It
/// intentionally limits the surface of the sender. Much of this
/// is just boilerplate wrapping.
pub struct Sender<T: MachineImpl> {
    /// Identifier of the channel; returned by `get_id`, compared by `eq`
    /// and included in log messages.
    channel_id: usize,
    /// Counter shared by every clone of this sender. It starts at 0 in
    /// `wrap_sender`, is incremented by `clone` and decremented by `drop`;
    /// the drop that observes 0 is the last handle going away.
    clone_count: Arc<AtomicUsize>,
    /// Connection handle supplied to `wrap_sender` and carried along by clones.
    /// It is not read anywhere in this file.
    connection: ThreadSafeConnection,
    /// The wrapped crossbeam sender that actually moves the messages.
    pub sender: crossbeam::channel::Sender<T>,
    /// Weak reference to the machine receiving from this channel. It is an
    /// empty `Weak` until `bind` is called.
    receiver_machine: WeakShareableMachine,
}
22
23impl<T> Sender<T>
24where
25    T: MachineImpl,
26{
27    pub fn get_id(&self) -> usize { self.channel_id }
28    pub fn bind(&mut self, recevier_machine: WeakShareableMachine) { self.receiver_machine = recevier_machine; }
29    pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { self.sender.try_send(msg) }
30
31    pub fn sender(&self) -> crossbeam::channel::Sender<T> { self.sender.clone() }
32
33    pub fn send(&self, msg: T) -> Result<(), SendError<T>>
34    where
35        T: MachineImpl + MachineImpl<InstructionSet = T> + std::fmt::Debug,
36    {
37        // Blocking on send is a pretty ugly process; the first block is allowed to complete,
38        // as if it had not blocked. However, the machine state becomes SendBlock. If a subsequent
39        // send is attempted while SendBlock'd, the executor will pause until the prior send completes.
40
41        // if already SendBlock'd this won't return until the send completes
42        ExecutorData::block_or_continue();
43        // now, that the sender is not blocked, it should be running, try to complete the send
44        // and schedule the receiver if needed
45        match self.sender.try_send(msg) {
46            Ok(()) => {
47                if let Some(machine) = self.receiver_machine.upgrade() {
48                    // log::debug!(
49                    // "chan {} send machine {} state {:#?} q_len {}",
50                    // self.channel_id,
51                    // machine.get_key(),
52                    // machine.get_state(),
53                    // self.sender.len()
54                    // );
55                    match machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
56                        Ok(_) => ExecutorData::schedule(&machine, false),
57                        // new machines will be scheduled when assigned, scheduling here would be bad
58                        Err(MachineState::New) => (),
59                        // already running is perfection
60                        Err(MachineState::Running) => (),
61                        // ready should already be scheduled
62                        Err(MachineState::Ready) => (),
63                        // send block will clear and schedule
64                        Err(MachineState::SendBlock) => (),
65                        // anything else we need to decide what to do, so log it
66                        Err(state) => {
67                            log::error!("chan {} state {:#?} q_len {}", self.channel_id, state, self.sender.len());
68                        },
69                    }
70                }
71                Ok(())
72            },
73            Err(TrySendError::Full(instruction)) => {
74                if let Some(machine) = self.receiver_machine.upgrade() {
75                    log::trace!(
76                        "parking sender {} with cmd {:#?} machine {} state {:#?}",
77                        self.channel_id,
78                        instruction,
79                        machine.get_key(),
80                        machine.get_state()
81                    );
82                }
83                match <T as MachineImpl>::park_sender(
84                    self.channel_id,
85                    Weak::clone(&self.receiver_machine),
86                    self.sender.clone() as crossbeam::channel::Sender<<T as MachineImpl>::InstructionSet>,
87                    instruction,
88                ) {
89                    // parked, the machine should now have a state of SendBlock
90                    Ok(()) => Ok(()),
91                    // not parked ,due to it being the main thread, just send and let main block
92                    Err(m) => {
93                        log::debug!("blocking main thread on send");
94                        self.sender.send(m)
95                    },
96                }
97            },
98            // on disconnect, return to the caller with an error
99            Err(TrySendError::Disconnected(m)) => Err(SendError(m)),
100        }
101    }
102
103    pub fn send_timeout(&self, msg: T, timeout: std::time::Duration) -> Result<(), SendTimeoutError<T>> {
104        if self.sender.is_full() {
105            log::warn!("Sender: channel is full, send_timeout will block for {:#?}", timeout);
106        }
107        self.sender.send_timeout(msg, timeout)
108    }
109
110    pub fn is_full(&self) -> bool { self.sender.is_full() }
111    pub fn len(&self) -> usize { self.sender.len() }
112    pub fn is_empty(&self) -> bool { self.sender.is_empty() }
113    pub fn capacity(&self) -> Option<usize> { self.sender.capacity() }
114}
115
116impl<T> fmt::Debug for Sender<T>
117where
118    T: MachineImpl,
119{
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.sender.fmt(f) }
121}
122
123impl<T> Drop for Sender<T>
124where
125    T: MachineImpl,
126{
127    fn drop(&mut self) {
128        if 0 != self.clone_count.fetch_sub(1, Ordering::SeqCst) {
129            return;
130        }
131        if let Some(machine) = self.receiver_machine.upgrade() {
132            machine.set_disconnected();
133            match machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
134                Ok(_) => {
135                    ExecutorData::schedule(&machine, true);
136                },
137                Err(MachineState::New) => (),
138                Err(MachineState::Ready) => (),
139                Err(MachineState::Running) => (),
140                Err(MachineState::Dead) => (),
141                Err(state) => panic!("sender drop not expecting receiver state {:#?}", state),
142            }
143        }
144    }
145}
146
147impl<T> Clone for Sender<T>
148where
149    T: MachineImpl,
150{
151    fn clone(&self) -> Self {
152        self.clone_count.fetch_add(1, Ordering::SeqCst);
153        Self {
154            channel_id: self.channel_id,
155            clone_count: Arc::clone(&self.clone_count),
156            connection: Arc::clone(&self.connection),
157            sender: self.sender.clone(),
158            receiver_machine: Weak::clone(&self.receiver_machine),
159        }
160    }
161}
162
163impl<T> PartialEq for Sender<T>
164where
165    T: MachineImpl,
166{
167    fn eq(&self, other: &Self) -> bool { self.channel_id == other.channel_id && self.sender.same_channel(&other.sender) }
168}
169
// Marker impl: the `PartialEq` comparison (channel id plus same underlying channel)
// is treated as a full equivalence relation.
impl<T> Eq for Sender<T> where T: MachineImpl {}
171
172pub fn wrap_sender<T>(sender: crossbeam::channel::Sender<T>, channel_id: usize, connection: ThreadSafeConnection) -> Sender<T>
173where
174    T: MachineImpl,
175{
176    log::trace!("creating sender {}", channel_id);
177    Sender::<T> {
178        channel_id,
179        clone_count: Arc::new(AtomicUsize::new(0)),
180        connection,
181        sender,
182        receiver_machine: Weak::new(),
183    }
184}