1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
//! Runtime channel for use in communicating between the coordinator and a
//! worker thread.
use std::io::{self, Read, Write};
use crossbeam_channel as crossbeam;
use mio::{unix, Interest, Registry, Token};
/// Data sent across the pipe half of the channel to create a `mio::Event` for
/// the receiving end.
///
/// The receiving end only drains these bytes, it never inspects them, so the
/// content itself carries no meaning.
const WAKE: &[u8] = b"WAKE";
/// Create a new communication channel.
///
/// The channel is made up of two parts: a Unix pipe, used to wake up the
/// receiving end, and an unbounded crossbeam channel, which carries the
/// messages themselves.
///
/// # Errors
///
/// Returns an error if the Unix pipe can't be created.
pub(crate) fn new<T>() -> io::Result<(Sender<T>, Receiver<T>)> {
    // The pipe is created first as it's the only fallible step.
    let (pipe_tx, pipe_rx) = unix::pipe::new()?;
    let (chan_tx, chan_rx) = crossbeam::unbounded();
    Ok((
        Sender {
            channel: chan_tx,
            pipe: pipe_tx,
        },
        Receiver {
            channel: chan_rx,
            pipe: pipe_rx,
        },
    ))
}
/// Sending end of the communication channel.
///
/// Created together with its [`Receiver`] by [`new`].
#[derive(Debug)]
pub(crate) struct Sender<T> {
/// Channel that carries the actual messages.
channel: crossbeam::Sender<T>,
/// Writing end of the Unix pipe, `WAKE` is written to it after a message is
/// put onto `channel` to generate an `mio::Event` for the receiving end.
pipe: unix::pipe::Sender,
}
impl<T> Sender<T> {
/// Try to send a message onto the channel.
pub(crate) fn try_send(&self, msg: T) -> io::Result<()> {
self.channel
.try_send(msg)
.map_err(|_| io::Error::new(io::ErrorKind::NotConnected, "failed to send message"))?;
// Generate an `mio::Event` for the receiving end.
loop {
match (&self.pipe).write(WAKE) {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"failed to send into channel pipe",
))
}
Ok(..) => return Ok(()),
// Can't do too much here.
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(()),
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
Err(err) => return Err(err),
}
}
}
/// Register the sending end of the Unix pipe of this channel.
pub(super) fn register(&mut self, registry: &Registry, token: Token) -> io::Result<()> {
registry.register(&mut self.pipe, token, Interest::WRITABLE)
}
}
/// Receiving end of the communication channel.
///
/// Created together with its [`Sender`] by [`new`].
#[derive(Debug)]
pub(crate) struct Receiver<T> {
/// Channel from which the actual messages are received.
channel: crossbeam::Receiver<T>,
/// Reading end of the Unix pipe, it is emptied once `channel` has no more
/// messages so that a later write by the sending end is noticed again.
pipe: unix::pipe::Receiver,
}
impl<T> Receiver<T> {
/// Try to receive a message from the channel.
pub(super) fn try_recv(&mut self) -> io::Result<Option<T>> {
if let Ok(msg) = self.channel.try_recv() {
Ok(Some(msg))
} else {
// If the channel is empty this will likely be the last call in a
// while, so we'll empty the pipe to ensure we'll get another
// notification once the coordinator sends us another message.
let mut buf = [0; 24]; // Fits 6 messages.
loop {
match self.pipe.read(&mut buf) {
Ok(n) if n < buf.len() => break,
// Didn't empty it.
Ok(..) => continue,
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => break,
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
Err(err) => return Err(err),
}
}
// Try one last time in case the coordinator send a message in
// between the time we last checked and we emptied the pipe above
// (for which we won't get another event as we just emptied the
// pipe).
Ok(self.channel.try_recv().ok())
}
}
/// Register the receiving end of the Unix pipe of this channel.
pub(super) fn register(&mut self, registry: &Registry, token: Token) -> io::Result<()> {
registry.register(&mut self.pipe, token, Interest::READABLE)
}
}