Skip to main content

interrupt_read/
lib.rs

1//! An interruptable [`Read`]er
2//!
3//! This crate provides the [`InterruptReader`], which can have its
4//! `read` operations interrupted by an [`Interruptor`]. They are
5//! acquired from the [`interrupt_reader::pair`] function, which
6//! returns an [`mpsc`] channel backed pair.
7//!
8//! When [`Interruptor::interrupt`] is called, the `InterruptReader`
9//! will return an erro of kind [`ErrorKind::Interrupted`]. Otherwise,
10//! it will act like any normal `Read` struct.
11//!
12//! Some things to note about this crate:
13//!
14//! - It functions by spawning a separate thread, which will actually
15//!   read from the original `Read`er, so keep that in mind.
16//! - There is some (light) overhead over the read operations.
17//! - You should _not_ wrap this struct in a [`BufReader`] since the
18//!   struct already has its own internal buffer.
19//! - This reader doesn't assume that `Ok(0)` is the end of input, and
20//!   the spawned thread will only terminate if the
21//!   [`InterruptReader`] is dropped.
22//!
23//! [`BufReader`]: std::io::BufReader
24use std::{
25    io::{BufRead, Cursor, Error, ErrorKind, Read, Take},
26    sync::mpsc,
27    thread::JoinHandle,
28};
29
30/// Returns a pair of an [`InterruptReader`] and an [`Interruptor`].
31///
32/// When you call any of the reading methods of `InterruptReader`, the
33/// current thread will block, being unblocked only if:
34///
35/// - The underlying [`Read`]er has more bytes or returned an
36///   [`Error`].
37/// - The [`Interruptor::interrupt`] function was called.
38///
39/// In the former case, it works just like a regular read, giving an
40/// [`std::io::Result`], depending on the operation.
41/// If the latter happens, however, an [`Error`] of type
42/// [`ErrorKind::Interrupted`] will be received, meaning that reading
43/// operations have been interrupted for some user defined reason.
44///
45/// If the channel was interrupted this way, further reads will work
46/// just fine, until another interrupt comes through, creating a
47/// read/interrupt cycle.
48///
49/// Behind the scenes, this is done through channels and a spawned
50/// thread, but no timeout is used, all operations are blocking.
51///
52/// [`Error`]: std::io::Error
53/// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
54pub fn pair<R: Read + Send + 'static>(mut reader: R) -> (InterruptReader<R>, Interruptor) {
55    let (event_tx, event_rx) = mpsc::channel();
56    let (buffer_tx, buffer_rx) = mpsc::channel();
57
58    let join_handle = std::thread::spawn({
59        let event_tx = event_tx.clone();
60        move || {
61            // Same capacity as BufReader
62            let mut buf = vec![0; 8 * 1024];
63
64            loop {
65                match reader.read(&mut buf) {
66                    Ok(num_bytes) => {
67                        // This means the InterruptReader has been dropped, so no more reading
68                        // will be done.
69                        let event = Event::Buf(std::mem::take(&mut buf), num_bytes);
70                        if event_tx.send(event).is_err() {
71                            break reader;
72                        }
73
74                        buf = match buffer_rx.recv() {
75                            Ok(buf) => buf,
76                            // Same as before.
77                            Err(_) => break reader,
78                        }
79                    }
80                    Err(err) => {
81                        if event_tx.send(Event::Err(err)).is_err() {
82                            break reader;
83                        }
84                    }
85                }
86            }
87        }
88    });
89
90    let interrupt_reader = InterruptReader {
91        cursor: None,
92        buffer_tx,
93        event_rx,
94        join_handle,
95    };
96    let interruptor = Interruptor(event_tx);
97
98    (interrupt_reader, interruptor)
99}
100
101#[derive(Debug)]
102pub struct InterruptReader<R> {
103    cursor: Option<Take<Cursor<Vec<u8>>>>,
104    buffer_tx: mpsc::Sender<Vec<u8>>,
105    event_rx: mpsc::Receiver<Event>,
106    join_handle: JoinHandle<R>,
107}
108
109impl<R: Read> InterruptReader<R> {
110    /// Unwraps this `InterruptReader`, returning the underlying
111    /// reader.
112    ///
113    /// Note that any leftover data in the internal buffer is lost.
114    /// Therefore, a following read from the underlying reader may
115    /// lead to data loss.
116    ///
117    /// This may return [`Err`] if the underlying joined thread has
118    /// panicked, probably because the [`Read`]er has done so.
119    pub fn into_inner(self) -> std::thread::Result<R> {
120        let Self { buffer_tx, event_rx, join_handle, .. } = self;
121        drop(event_rx);
122        drop(buffer_tx);
123        join_handle.join()
124    }
125}
126
127impl<R: Read> Read for InterruptReader<R> {
128    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
129        if let Some(cursor) = self.cursor.as_mut() {
130            deal_with_interrupt(&self.event_rx)?;
131
132            match cursor.read(buf) {
133                Ok(0) => {
134                    let buffer = self.cursor.take().unwrap().into_inner().into_inner();
135                    match self.buffer_tx.send(buffer) {
136                        Ok(()) => self.read(buf),
137                        // Now we handle that.
138                        Err(_) => Ok(0),
139                    }
140                }
141                Ok(num_bytes) => Ok(num_bytes),
142                Err(_) => unreachable!("Afaik, this shouldn't happen if T is Vec<u8>"),
143            }
144        } else {
145            match self.event_rx.recv() {
146                Ok(Event::Buf(buffer, len)) => {
147                    self.cursor = Some(Cursor::new(buffer).take(len as u64));
148                    if len == 0 { Ok(0) } else { self.read(buf) }
149                }
150                Ok(Event::Err(err)) => Err(err),
151                Ok(Event::Interrupt) => Err(interrupt_error()),
152                Err(_) => Ok(0),
153            }
154        }
155    }
156}
157
158impl<R: Read> BufRead for InterruptReader<R> {
159    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
160        if let Some(cursor) = self.cursor.as_mut() {
161            deal_with_interrupt(&self.event_rx)?;
162
163            let (addr, len) = {
164                let buf = cursor.fill_buf()?;
165                ((buf as *const [u8]).addr(), buf.len())
166            };
167
168            if len == 0 {
169                let buffer = self.cursor.take().unwrap().into_inner().into_inner();
170                match self.buffer_tx.send(buffer) {
171                    Ok(()) => self.fill_buf(),
172                    Err(_) => Ok(&[]),
173                }
174            } else {
175                let buffer = self.cursor.as_ref().unwrap().get_ref().get_ref();
176                let buf_addr = (buffer.as_slice() as *const [u8]).addr();
177
178                // First time the borrow checker actually forced me to do something
179                // inconvenient, instead of the safe alternative.
180                Ok(&buffer[addr - buf_addr..(addr - buf_addr) + len])
181            }
182        } else {
183            match self.event_rx.recv() {
184                Ok(Event::Buf(buffer, len)) => {
185                    self.cursor = Some(Cursor::new(buffer).take(len as u64));
186                    if len == 0 { Ok(&[]) } else { self.fill_buf() }
187                }
188                Ok(Event::Err(err)) => Err(err),
189                Ok(Event::Interrupt) => Err(interrupt_error()),
190                Err(_) => Ok(&[]),
191            }
192        }
193    }
194
195    fn consume(&mut self, amount: usize) {
196        if let Some(cursor) = self.cursor.as_mut() {
197            cursor.consume(amount);
198        }
199    }
200}
201
202/// An interruptor for an [`InterruptReader`].
203///
204/// This struct serves the purpose of interrupting any of the [`Read`]
205/// or [`ReadBuf`] functions being performend on the `InterruptReader`
206///
207/// If it is dropped, the `InterruptReader` will no longer be able to
208/// be interrupted.
209#[derive(Debug, Clone)]
210pub struct Interruptor(mpsc::Sender<Event>);
211
212impl Interruptor {
213    /// Interrupts the [`InterruptReader`]
214    ///
215    /// This will send an interrupt event to the reader, which makes
216    /// the next `read` operation return [`Err`], with an
217    /// [`ErrorKind::Interrupted`].
218    ///
219    /// Subsequent `read` operations proceed as normal.
220    pub fn interrupt(&self) -> Result<(), InterruptError> {
221        self.0.send(Event::Interrupt).map_err(|_| InterruptError)
222    }
223}
224
225/// An error ocurred while calling [`Interruptor::interrupt`].
226///
227/// This means that the receiving [`InterruptReader`] has been
228/// dropped.
229#[derive(Debug, Clone, Copy)]
230pub struct InterruptError;
231
232impl std::fmt::Display for InterruptError {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.write_str("InterruptError")
235    }
236}
237
238impl std::error::Error for InterruptError {}
239
240#[derive(Debug)]
241enum Event {
242    Buf(Vec<u8>, usize),
243    Err(std::io::Error),
244    Interrupt,
245}
246
247fn interrupt_error() -> Error {
248    Error::new(
249        ErrorKind::Interrupted,
250        "An Interruptor has interrupted this operation.",
251    )
252}
253
254fn deal_with_interrupt(event_rx: &mpsc::Receiver<Event>) -> std::io::Result<()> {
255    match event_rx.try_recv() {
256        Ok(Event::Interrupt) => Err(interrupt_error()),
257        Ok(_) => unreachable!("This should not be possible"),
258        // The channel was dropped, but no need to handle that right now.
259        Err(_) => Ok(()),
260    }
261}