1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use crate::error::SysError;
use crate::shim::{self, SelectFd};
use rustix::io::retry_on_intr;
use rustix::pipe;
use std::io::{Error, Read};
use std::os::fd::{AsFd, OwnedFd};
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Operating mode of the reader.
///
/// Stored behind a mutex in the reader and re-read by the read loop before
/// every wait, so a change made from another thread is picked up after a
/// wake-up.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// Reads wait at most this long for data.
    Timeout(Duration),
    /// Reads wait for data without a time limit.
    NoTimeout,
    /// Reader was closed.
    Closed,
}
/// Allows reading from an fd in one thread while another thread interrupts
/// the read or changes the read timeout.
///
/// An internal pipe is used for wake-ups: the controlling thread writes a
/// byte to it, and the reading thread waits on both the pipe and the data fd.
pub struct InterruptibleReader<Fd: AsFd> {
// Current mode (timeout / no timeout / closed); guarded by a mutex because
// it is written by the controlling thread and read by the reading thread.
mode: Mutex<Mode>,
// Descriptor the payload data is read from.
fd: Fd,
// Read end of the wake-up pipe; waited on together with `fd`.
pipe_rd: OwnedFd,
// Write end of the wake-up pipe; written by `close()` and `set_timeout()`.
pipe_wr: OwnedFd,
}
impl<Fd: AsFd> InterruptibleReader<Fd> {
/// Construct new reader.
/// Gains ownership of the fd.
pub fn open(fd: Fd) -> Result<Self, SysError> {
let (pipe_rd, pipe_wr) = match retry_on_intr(|| pipe::pipe()) {
Ok(fds) => fds,
Err(err) => return Err(SysError("pipe()", err)),
};
Ok(InterruptibleReader {
mode: Mutex::new(Mode::NoTimeout),
fd,
pipe_rd,
pipe_wr,
})
}
/// Close reader.
/// Will wake up and abort ongoing reads.
pub fn close(&self) -> Result<(), SysError> {
{
// update mode
let mut locked_mode = self.mode.lock().unwrap();
if *locked_mode == Mode::Closed {
return Ok(());
}
*locked_mode = Mode::Closed;
}
// wake up and abort blocked read
if let Err(err) = shim::write(self.pipe_wr.as_fd(), &[0u8]) {
return Err(SysError("write(pipe)", err));
}
Ok(())
}
/// Set read timeout.
/// Will wake up and restart ongoing reads.
pub fn set_timeout(&self, duration: Duration) -> Result<(), SysError> {
{
// update mode
let mut locked_mode = self.mode.lock().unwrap();
if *locked_mode == Mode::Closed {
return Ok(());
}
*locked_mode = Mode::Timeout(duration);
}
// wake up and restart blocked read
if let Err(err) = shim::write(self.pipe_wr.as_fd(), &[0u8]) {
return Err(SysError("write(pipe)", err));
}
Ok(())
}
/// Construct blocking reader.
/// Waits until there is *some* data, OR reader is closed, OR read timeout
/// is set and expires.
pub fn blocking_reader(self: &Arc<Self>) -> ArcTimeoutReader<Fd> {
ArcTimeoutReader(Arc::clone(self))
}
/// Invoked by ArcTimeoutReader::read().
fn read_imp(&self, buf: &mut [u8]) -> Result<usize, Error> {
loop {
// re-read mode
let timeout = {
let locked_mode = self.mode.lock().unwrap();
match *locked_mode {
// read with timeout
Mode::Timeout(d) => Some(d),
// read without timeout
Mode::NoTimeout => None,
// closeed, return EOF
Mode::Closed => {
return Ok(0);
}
}
};
// wait until descriptor is ready or timeout expires
let mut pipe_fd = SelectFd {
fd: self.pipe_rd.as_fd(),
ready: false,
};
let mut data_fd = SelectFd {
fd: self.fd.as_fd(),
ready: false,
};
shim::select(&mut [&mut pipe_fd, &mut data_fd], timeout)?;
if pipe_fd.ready {
// wake up from set_timeout() or close()
// drain bytes from pipe
_ = shim::read(self.pipe_rd.as_fd(), &mut [0u8; 128]);
}
if data_fd.ready {
// data from file
break;
}
if !pipe_fd.ready && !data_fd.ready && timeout.is_some() {
// timeout expired, return EOF
return Ok(0);
}
}
// if we're here, there is new data in file
match shim::read(self.fd.as_fd(), buf) {
Ok(n) => Ok(n),
Err(err) => Err(Error::new(err.kind(), err)),
}
}
}
/// Wrapper for `Arc<InterruptibleReader>` that implements the `Read` trait.
///
/// Holds a shared handle to the reader, so the same reader can still be
/// closed or have its timeout changed through other `Arc` clones.
pub struct ArcTimeoutReader<Fd: AsFd>(Arc<InterruptibleReader<Fd>>);
impl<Fd: AsFd> Read for ArcTimeoutReader<Fd> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
self.0.read_imp(buf)
}
}