1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use self::connection::*;
use super::*;
use crossbeam::channel::{SendError, SendTimeoutError, TrySendError};
pub struct Sender<T: MachineImpl> {
channel_id: usize,
connection: ThreadSafeConnection,
pub sender: crossbeam::channel::Sender<T>,
}
impl<T> Sender<T>
where
T: MachineImpl,
{
pub fn get_id(&self) -> usize { self.channel_id }
pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { self.sender.try_send(msg) }
pub fn sender(&self) -> crossbeam::channel::Sender<T> { self.sender.clone() }
pub fn send(&self, msg: T) -> Result<(), SendError<T>>
where
T: MachineImpl + MachineImpl<InstructionSet = T> + std::fmt::Debug,
{
ExecutorData::block_or_continue();
match self.sender.try_send(msg) {
Ok(()) => Ok(()),
Err(TrySendError::Full(instruction)) => {
log::debug!("parking sender {} with cmd {:#?}", self.channel_id, instruction);
match <T as MachineImpl>::park_sender(
self.channel_id,
self.sender.clone() as crossbeam::channel::Sender<<T as MachineImpl>::InstructionSet>,
instruction,
) {
Ok(()) => Ok(()),
Err(m) => {
log::warn!("blocking main thread on send");
self.sender.send(m)
},
}
},
Err(TrySendError::Disconnected(m)) => Err(SendError(m)),
}
}
pub fn send_timeout(&self, msg: T, timeout: std::time::Duration) -> Result<(), SendTimeoutError<T>> {
if self.sender.is_full() {
log::warn!("Sender: channel is full, send_timeout will block for {:#?}", timeout);
}
self.sender.send_timeout(msg, timeout)
}
pub fn is_full(&self) -> bool { self.sender.is_full() }
pub fn len(&self) -> usize { self.sender.len() }
pub fn is_empty(&self) -> bool { self.sender.is_empty() }
pub fn capacity(&self) -> Option<usize> { self.sender.capacity() }
}
impl<T> fmt::Debug for Sender<T>
where
T: MachineImpl,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.sender.fmt(f) }
}
impl<T> Clone for Sender<T>
where
T: MachineImpl,
{
fn clone(&self) -> Self {
Self {
channel_id: self.channel_id,
connection: self.connection.clone(),
sender: self.sender.clone(),
}
}
}
impl<T> PartialEq for Sender<T>
where
T: MachineImpl,
{
fn eq(&self, other: &Self) -> bool {
self.channel_id == other.channel_id && self.sender.same_channel(&other.sender)
}
}
impl<T> Eq for Sender<T> where T: MachineImpl {}
pub fn wrap_sender<T>(
sender: crossbeam::channel::Sender<T>,
channel_id: usize,
connection: ThreadSafeConnection,
) -> Sender<T>
where
T: MachineImpl,
{
Sender::<T> {
channel_id,
connection,
sender,
}
}