d3_core/channel/
sender.rs1use self::connection::*;
2use super::*;
3use crossbeam::channel::{SendError, SendTimeoutError, TrySendError};
4
5pub struct Sender<T: MachineImpl> {
16 channel_id: usize,
17 clone_count: Arc<AtomicUsize>,
18 connection: ThreadSafeConnection,
19 pub sender: crossbeam::channel::Sender<T>,
20 receiver_machine: WeakShareableMachine,
21}
22
23impl<T> Sender<T>
24where
25 T: MachineImpl,
26{
27 pub fn get_id(&self) -> usize { self.channel_id }
28 pub fn bind(&mut self, recevier_machine: WeakShareableMachine) { self.receiver_machine = recevier_machine; }
29 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { self.sender.try_send(msg) }
30
31 pub fn sender(&self) -> crossbeam::channel::Sender<T> { self.sender.clone() }
32
33 pub fn send(&self, msg: T) -> Result<(), SendError<T>>
34 where
35 T: MachineImpl + MachineImpl<InstructionSet = T> + std::fmt::Debug,
36 {
37 ExecutorData::block_or_continue();
43 match self.sender.try_send(msg) {
46 Ok(()) => {
47 if let Some(machine) = self.receiver_machine.upgrade() {
48 match machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
56 Ok(_) => ExecutorData::schedule(&machine, false),
57 Err(MachineState::New) => (),
59 Err(MachineState::Running) => (),
61 Err(MachineState::Ready) => (),
63 Err(MachineState::SendBlock) => (),
65 Err(state) => {
67 log::error!("chan {} state {:#?} q_len {}", self.channel_id, state, self.sender.len());
68 },
69 }
70 }
71 Ok(())
72 },
73 Err(TrySendError::Full(instruction)) => {
74 if let Some(machine) = self.receiver_machine.upgrade() {
75 log::trace!(
76 "parking sender {} with cmd {:#?} machine {} state {:#?}",
77 self.channel_id,
78 instruction,
79 machine.get_key(),
80 machine.get_state()
81 );
82 }
83 match <T as MachineImpl>::park_sender(
84 self.channel_id,
85 Weak::clone(&self.receiver_machine),
86 self.sender.clone() as crossbeam::channel::Sender<<T as MachineImpl>::InstructionSet>,
87 instruction,
88 ) {
89 Ok(()) => Ok(()),
91 Err(m) => {
93 log::debug!("blocking main thread on send");
94 self.sender.send(m)
95 },
96 }
97 },
98 Err(TrySendError::Disconnected(m)) => Err(SendError(m)),
100 }
101 }
102
103 pub fn send_timeout(&self, msg: T, timeout: std::time::Duration) -> Result<(), SendTimeoutError<T>> {
104 if self.sender.is_full() {
105 log::warn!("Sender: channel is full, send_timeout will block for {:#?}", timeout);
106 }
107 self.sender.send_timeout(msg, timeout)
108 }
109
110 pub fn is_full(&self) -> bool { self.sender.is_full() }
111 pub fn len(&self) -> usize { self.sender.len() }
112 pub fn is_empty(&self) -> bool { self.sender.is_empty() }
113 pub fn capacity(&self) -> Option<usize> { self.sender.capacity() }
114}
115
116impl<T> fmt::Debug for Sender<T>
117where
118 T: MachineImpl,
119{
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.sender.fmt(f) }
121}
122
123impl<T> Drop for Sender<T>
124where
125 T: MachineImpl,
126{
127 fn drop(&mut self) {
128 if 0 != self.clone_count.fetch_sub(1, Ordering::SeqCst) {
129 return;
130 }
131 if let Some(machine) = self.receiver_machine.upgrade() {
132 machine.set_disconnected();
133 match machine.compare_and_exchange_state(MachineState::RecvBlock, MachineState::Ready) {
134 Ok(_) => {
135 ExecutorData::schedule(&machine, true);
136 },
137 Err(MachineState::New) => (),
138 Err(MachineState::Ready) => (),
139 Err(MachineState::Running) => (),
140 Err(MachineState::Dead) => (),
141 Err(state) => panic!("sender drop not expecting receiver state {:#?}", state),
142 }
143 }
144 }
145}
146
147impl<T> Clone for Sender<T>
148where
149 T: MachineImpl,
150{
151 fn clone(&self) -> Self {
152 self.clone_count.fetch_add(1, Ordering::SeqCst);
153 Self {
154 channel_id: self.channel_id,
155 clone_count: Arc::clone(&self.clone_count),
156 connection: Arc::clone(&self.connection),
157 sender: self.sender.clone(),
158 receiver_machine: Weak::clone(&self.receiver_machine),
159 }
160 }
161}
162
163impl<T> PartialEq for Sender<T>
164where
165 T: MachineImpl,
166{
167 fn eq(&self, other: &Self) -> bool { self.channel_id == other.channel_id && self.sender.same_channel(&other.sender) }
168}
169
170impl<T> Eq for Sender<T> where T: MachineImpl {}
171
172pub fn wrap_sender<T>(sender: crossbeam::channel::Sender<T>, channel_id: usize, connection: ThreadSafeConnection) -> Sender<T>
173where
174 T: MachineImpl,
175{
176 log::trace!("creating sender {}", channel_id);
177 Sender::<T> {
178 channel_id,
179 clone_count: Arc::new(AtomicUsize::new(0)),
180 connection,
181 sender,
182 receiver_machine: Weak::new(),
183 }
184}