interrupt_read/lib.rs
1//! An interruptable [`Read`]er
2//!
3//! This crate provides the [`InterruptReader`], which can have its
4//! `read` operations interrupted by an [`Interruptor`]. They are
5//! acquired from the [`interrupt_reader::pair`] function, which
6//! returns an [`mpsc`] channel backed pair.
7//!
8//! When [`Interruptor::interrupt`] is called, the `InterruptReader`
9//! will return an erro of kind [`ErrorKind::Interrupted`]. Otherwise,
10//! it will act like any normal `Read` struct.
11//!
12//! Some things to note about this crate:
13//!
14//! - It functions by spawning a separate thread, which will actually
15//! read from the original `Read`er, so keep that in mind.
16//! - There is some (light) overhead over the read operations.
17//! - You should _not_ wrap this struct in a [`BufReader`] since the
18//! struct already has its own internal buffer.
19//! - This reader doesn't assume that `Ok(0)` is the end of input, and
20//! the spawned thread will only terminate if the
21//! [`InterruptReader`] is dropped.
22//!
23//! [`BufReader`]: std::io::BufReader
24use std::{
25 io::{BufRead, Cursor, Error, ErrorKind, Read, Take},
26 sync::mpsc,
27 thread::JoinHandle,
28};
29
30/// Returns a pair of an [`InterruptReader`] and an [`Interruptor`].
31///
32/// When you call any of the reading methods of `InterruptReader`, the
33/// current thread will block, being unblocked only if:
34///
35/// - The underlying [`Read`]er has more bytes or returned an
36/// [`Error`].
37/// - The [`Interruptor::interrupt`] function was called.
38///
39/// In the former case, it works just like a regular read, giving an
40/// [`std::io::Result`], depending on the operation.
41/// If the latter happens, however, an [`Error`] of type
42/// [`ErrorKind::Interrupted`] will be received, meaning that reading
43/// operations have been interrupted for some user defined reason.
44///
45/// If the channel was interrupted this way, further reads will work
46/// just fine, until another interrupt comes through, creating a
47/// read/interrupt cycle.
48///
49/// Behind the scenes, this is done through channels and a spawned
50/// thread, but no timeout is used, all operations are blocking.
51///
52/// [`Error`]: std::io::Error
53/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
54pub fn pair<R: Read + Send + 'static>(mut reader: R) -> (InterruptReader<R>, Interruptor) {
55 let (event_tx, event_rx) = mpsc::channel();
56 let (buffer_tx, buffer_rx) = mpsc::channel();
57
58 let join_handle = std::thread::spawn({
59 let event_tx = event_tx.clone();
60 move || {
61 // Same capacity as BufReader
62 let mut buf = vec![0; 8 * 1024];
63
64 loop {
65 match reader.read(&mut buf) {
66 Ok(num_bytes) => {
67 // This means the InterruptReader has been dropped, so no more reading
68 // will be done.
69 let event = Event::Buf(std::mem::take(&mut buf), num_bytes);
70 if event_tx.send(event).is_err() {
71 break reader;
72 }
73
74 buf = match buffer_rx.recv() {
75 Ok(buf) => buf,
76 // Same as before.
77 Err(_) => break reader,
78 }
79 }
80 Err(err) => {
81 if event_tx.send(Event::Err(err)).is_err() {
82 break reader;
83 }
84 }
85 }
86 }
87 }
88 });
89
90 let interrupt_reader = InterruptReader {
91 cursor: None,
92 buffer_tx,
93 event_rx,
94 join_handle,
95 };
96 let interruptor = Interruptor(event_tx);
97
98 (interrupt_reader, interruptor)
99}
100
101#[derive(Debug)]
102pub struct InterruptReader<R> {
103 cursor: Option<Take<Cursor<Vec<u8>>>>,
104 buffer_tx: mpsc::Sender<Vec<u8>>,
105 event_rx: mpsc::Receiver<Event>,
106 join_handle: JoinHandle<R>,
107}
108
109impl<R: Read> InterruptReader<R> {
110 /// Unwraps this `InterruptReader`, returning the underlying
111 /// reader.
112 ///
113 /// Note that any leftover data in the internal buffer is lost.
114 /// Therefore, a following read from the underlying reader may
115 /// lead to data loss.
116 ///
117 /// This may return [`Err`] if the underlying joined thread has
118 /// panicked, probably because the [`Read`]er has done so.
119 pub fn into_inner(self) -> std::thread::Result<R> {
120 let Self { buffer_tx, event_rx, join_handle, .. } = self;
121 drop(event_rx);
122 drop(buffer_tx);
123 join_handle.join()
124 }
125}
126
127impl<R: Read> Read for InterruptReader<R> {
128 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
129 if let Some(cursor) = self.cursor.as_mut() {
130 deal_with_interrupt(&self.event_rx)?;
131
132 match cursor.read(buf) {
133 Ok(0) => {
134 let buffer = self.cursor.take().unwrap().into_inner().into_inner();
135 match self.buffer_tx.send(buffer) {
136 Ok(()) => self.read(buf),
137 // Now we handle that.
138 Err(_) => Ok(0),
139 }
140 }
141 Ok(num_bytes) => Ok(num_bytes),
142 Err(_) => unreachable!("Afaik, this shouldn't happen if T is Vec<u8>"),
143 }
144 } else {
145 match self.event_rx.recv() {
146 Ok(Event::Buf(buffer, len)) => {
147 self.cursor = Some(Cursor::new(buffer).take(len as u64));
148 if len == 0 { Ok(0) } else { self.read(buf) }
149 }
150 Ok(Event::Err(err)) => Err(err),
151 Ok(Event::Interrupt) => Err(interrupt_error()),
152 Err(_) => Ok(0),
153 }
154 }
155 }
156}
157
158impl<R: Read> BufRead for InterruptReader<R> {
159 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
160 if let Some(cursor) = self.cursor.as_mut() {
161 deal_with_interrupt(&self.event_rx)?;
162
163 let (addr, len) = {
164 let buf = cursor.fill_buf()?;
165 ((buf as *const [u8]).addr(), buf.len())
166 };
167
168 if len == 0 {
169 let buffer = self.cursor.take().unwrap().into_inner().into_inner();
170 match self.buffer_tx.send(buffer) {
171 Ok(()) => self.fill_buf(),
172 Err(_) => Ok(&[]),
173 }
174 } else {
175 let buffer = self.cursor.as_ref().unwrap().get_ref().get_ref();
176 let buf_addr = (buffer.as_slice() as *const [u8]).addr();
177
178 // First time the borrow checker actually forced me to do something
179 // inconvenient, instead of the safe alternative.
180 Ok(&buffer[addr - buf_addr..(addr - buf_addr) + len])
181 }
182 } else {
183 match self.event_rx.recv() {
184 Ok(Event::Buf(buffer, len)) => {
185 self.cursor = Some(Cursor::new(buffer).take(len as u64));
186 if len == 0 { Ok(&[]) } else { self.fill_buf() }
187 }
188 Ok(Event::Err(err)) => Err(err),
189 Ok(Event::Interrupt) => Err(interrupt_error()),
190 Err(_) => Ok(&[]),
191 }
192 }
193 }
194
195 fn consume(&mut self, amount: usize) {
196 if let Some(cursor) = self.cursor.as_mut() {
197 cursor.consume(amount);
198 }
199 }
200}
201
202/// An interruptor for an [`InterruptReader`].
203///
204/// This struct serves the purpose of interrupting any of the [`Read`]
205/// or [`ReadBuf`] functions being performend on the `InterruptReader`
206///
207/// If it is dropped, the `InterruptReader` will no longer be able to
208/// be interrupted.
209#[derive(Debug, Clone)]
210pub struct Interruptor(mpsc::Sender<Event>);
211
212impl Interruptor {
213 /// Interrupts the [`InterruptReader`]
214 ///
215 /// This will send an interrupt event to the reader, which makes
216 /// the next `read` operation return [`Err`], with an
217 /// [`ErrorKind::Interrupted`].
218 ///
219 /// Subsequent `read` operations proceed as normal.
220 pub fn interrupt(&self) -> Result<(), InterruptError> {
221 self.0.send(Event::Interrupt).map_err(|_| InterruptError)
222 }
223}
224
225/// An error ocurred while calling [`Interruptor::interrupt`].
226///
227/// This means that the receiving [`InterruptReader`] has been
228/// dropped.
229#[derive(Debug, Clone, Copy)]
230pub struct InterruptError;
231
232impl std::fmt::Display for InterruptError {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.write_str("InterruptError")
235 }
236}
237
238impl std::error::Error for InterruptError {}
239
240#[derive(Debug)]
241enum Event {
242 Buf(Vec<u8>, usize),
243 Err(std::io::Error),
244 Interrupt,
245}
246
247fn interrupt_error() -> Error {
248 Error::new(
249 ErrorKind::Interrupted,
250 "An Interruptor has interrupted this operation.",
251 )
252}
253
254fn deal_with_interrupt(event_rx: &mpsc::Receiver<Event>) -> std::io::Result<()> {
255 match event_rx.try_recv() {
256 Ok(Event::Interrupt) => Err(interrupt_error()),
257 Ok(_) => unreachable!("This should not be possible"),
258 // The channel was dropped, but no need to handle that right now.
259 Err(_) => Ok(()),
260 }
261}