Skip to main content

interrupt_read/
lib.rs

//! An interruptible [`Read`]er
2//!
//! This crate provides the [`InterruptReader`], which can have its
//! `read` operations interrupted by an [`Interruptor`]. Both are
//! acquired from the [`pair`] function, which returns a pair backed by
//! [`mpsc`] channels.
//!
//! When [`Interruptor::interrupt`] is called, the `InterruptReader`
//! will return an error of kind [`ErrorKind::Other`] with a payload of
//! [`InterruptReceived`] (you can check for that using the
//! [`is_interrupt`] function). Otherwise, it will act like any normal
//! `Read` struct.
13//!
14//! When an interrupt is received, _the underlying data is not lost_,
15//! it still exists, and if you call a reading function again, it will
16//! be retrieved, unless another interrupt is sent before that.
17//!
18//! Some things to note about this crate:
19//!
20//! - It functions by spawning a separate thread, which will actually
21//!   read from the original `Read`er, so keep that in mind.
22//! - There is some (light) overhead over the read operations.
23//! - You should _not_ wrap this struct in a [`BufReader`] since the
24//!   struct already has its own internal buffer.
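//! - This reader doesn't assume that `Ok(0)` is the end of input, and
//!   the spawned thread only terminates after the [`InterruptReader`]
//!   is dropped (or unwrapped with [`InterruptReader::into_inner`]) and
//!   its current read on the underlying reader has returned.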
28//!
29//! # Note
30//!
//! The reason why the `InterruptReader` returns [`ErrorKind::Other`],
//! rather than [`ErrorKind::Interrupted`], is that the latter error is
//! ignored by functions like [`BufRead::read_line`] and
//! [`BufRead::read_until`], which is probably not what you want to
//! happen.
36//!
37//! [`BufReader`]: std::io::BufReader
38//! [`ErrorKind::Other`]: std::io::ErrorKind::Other
39//! [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
40//! [`interrupt_reader::pair`]: pair
41use std::{
42    io::{BufRead, Cursor, Error, Read, Take},
43    sync::mpsc,
44    thread::JoinHandle,
45};
46
47/// Returns a pair of an [`InterruptReader`] and an [`Interruptor`].
48///
49/// When you call any of the reading methods of `InterruptReader`, the
50/// current thread will block, being unblocked only if:
51///
52/// - The underlying [`Read`]er has more bytes or returned an
53///   [`Error`].
54/// - The [`Interruptor::interrupt`] function was called.
55///
56/// In the former case, it works just like a regular read, giving an
57/// [`std::io::Result`], depending on the operation.
58/// If the latter happens, however, an [`Error`] of type
59/// [`ErrorKind::Other`] with a payload of [`InterruptReceived`],
60/// meaning that reading operations have been interrupted for some
61/// user defined reason.
62///
63/// You can check if an [`std::io::Error`] is of this type by
64/// calling the [`is_interrupt`] function.
65///
66/// If the channel was interrupted this way, further reads will work
67/// just fine, until another interrupt comes through, creating a
68/// read/interrupt cycle.
69///
70/// Behind the scenes, this is done through channels and a spawned
71/// thread, but no timeout is used, all operations are blocking.
72///
73/// [`Error`]: std::io::Error
74/// [`ErrorKind::Other`]: std::io::ErrorKind::Other
75pub fn pair<R: Read + Send + 'static>(mut reader: R) -> (InterruptReader<R>, Interruptor) {
76    let (event_tx, event_rx) = mpsc::channel();
77    let (buffer_tx, buffer_rx) = mpsc::channel();
78
79    let join_handle = std::thread::spawn({
80        let event_tx = event_tx.clone();
81        move || {
82            // Same capacity as BufReader
83            let mut buf = vec![0; 8 * 1024];
84
85            loop {
86                match reader.read(&mut buf) {
87                    Ok(num_bytes) => {
88                        // This means the InterruptReader has been dropped, so no more reading
89                        // will be done.
90                        let event = Event::Buf(std::mem::take(&mut buf), num_bytes);
91                        if event_tx.send(event).is_err() {
92                            break reader;
93                        }
94
95                        buf = match buffer_rx.recv() {
96                            Ok(buf) => buf,
97                            // Same as before.
98                            Err(_) => break reader,
99                        }
100                    }
101                    Err(err) => {
102                        if event_tx.send(Event::Err(err)).is_err() {
103                            break reader;
104                        }
105                    }
106                }
107            }
108        }
109    });
110
111    let interrupt_reader = InterruptReader {
112        cursor: None,
113        buffer_tx,
114        event_rx,
115        join_handle,
116    };
117    let interruptor = Interruptor(event_tx);
118
119    (interrupt_reader, interruptor)
120}
121
122/// An interruptable, buffered [`Read`]er.
123///
124/// This reader is created by wrapping a `Read` struct in the
125/// [`interrupt_read::pair`] function, which also returns an
126/// [`Interruptor`], which is capable of sending interrupt signals,
127/// which make any `read` operations on the `InterruptReader` return
128/// an error of kind [`ErrorKind::Other`], with a payload of
129/// [`InterruptReceived`].
130///
131/// When an interrupt is received, _the underlying data is not lost_,
132/// it still exists, and if you call a reading function again, it will
133/// be retrieved, unless another interrupt is sent before that.
134///
135/// You can check if an [`std::io::Error`] is of this type by
136/// calling the [`is_interrupt`] function.
137///
138/// # Examples
139///
140/// One potential application of this struct is if you want to stop a
141/// thread that is reading from the stdout of a child process without
142/// necessarily terminating said childrop_:
143///
144/// ```rust
145/// use std::{
146///     io::{BufRead, ErrorKind},
147///     process::{Child, Command, Stdio},
148///     time::Duration,
149/// };
150///
151/// use interrupt_read::{is_interrupt, pair};
152///
153/// struct ChildKiller(Child);
154/// impl Drop for ChildKiller {
155///     fn drop(&mut self) {
156///         _ = self.0.kill();
157///     }
158/// }
159///
160/// # match main() {
161/// #     Ok(()) => {}
162/// #     Err(err) => panic!("{err}")
163/// # }
164/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
165/// // Prints "hello\n" every second forever.
166/// let mut child = Command::new("bash")
167///     .args(["-c", r#"while true; do echo "hello"; sleep 1; done"#])
168///     .stdout(Stdio::piped())
169///     .spawn()
170///     .unwrap();
171///
172/// let (mut stdout, interruptor) = pair(child.stdout.take().unwrap());
173/// let _child_killer = ChildKiller(child);
174///
175/// let join_handle = std::thread::spawn(move || {
176///     let mut string = String::new();
177///     loop {
178///         match stdout.read_line(&mut string) {
179///             Ok(0) => break Ok(string),
180///             Ok(_) => {}
181///             Err(err) if is_interrupt(&err) => {
182///                 break Ok(string);
183///             }
184///             Err(err) => break Err(err),
185///         }
186///     }
187/// });
188///
189/// std::thread::sleep(Duration::new(3, 1_000_000));
190///
191/// interruptor.interrupt()?;
192///
193/// let result = join_handle.join().unwrap()?;
194///
195/// assert_eq!(result, "hello\nhello\nhello\n");
196///
197/// Ok(())
198/// # }
199/// ```
200///
201/// [`interrupt_read::pair`]: pair
202/// [`ErrorKind::Other`]: std::io::ErrorKind::Other
203#[derive(Debug)]
204pub struct InterruptReader<R> {
205    cursor: Option<Take<Cursor<Vec<u8>>>>,
206    buffer_tx: mpsc::Sender<Vec<u8>>,
207    event_rx: mpsc::Receiver<Event>,
208    join_handle: JoinHandle<R>,
209}
210
211impl<R: Read> InterruptReader<R> {
212    /// Unwraps this `InterruptReader`, returning the underlying
213    /// reader.
214    ///
215    /// Note that any leftover data in the internal buffer is lost.
216    /// Therefore, a following read from the underlying reader may
217    /// lead to data loss.
218    ///
219    /// This may return [`Err`] if the underlying joined thread has
220    /// panicked, probably because the [`Read`]er has done so.
221    pub fn into_inner(self) -> std::thread::Result<R> {
222        let Self { buffer_tx, event_rx, join_handle, .. } = self;
223        drop((event_rx, buffer_tx));
224        join_handle.join()
225    }
226}
227
228impl<R: Read> Read for InterruptReader<R> {
229    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
230        if let Some(cursor) = self.cursor.as_mut() {
231            deal_with_interrupt(&self.event_rx)?;
232
233            match cursor.read(buf) {
234                Ok(0) => {
235                    let buffer = self.cursor.take().unwrap().into_inner().into_inner();
236                    match self.buffer_tx.send(buffer) {
237                        Ok(()) => self.read(buf),
238                        // Now we handle that.
239                        Err(_) => Ok(0),
240                    }
241                }
242                Ok(num_bytes) => Ok(num_bytes),
243                Err(_) => unreachable!("Afaik, this shouldn't happen if T is Vec<u8>"),
244            }
245        } else {
246            match self.event_rx.recv() {
247                Ok(Event::Buf(buffer, len)) => {
248                    self.cursor = Some(Cursor::new(buffer).take(len as u64));
249                    if len == 0 { Ok(0) } else { self.read(buf) }
250                }
251                Ok(Event::Err(err)) => Err(err),
252                Ok(Event::Interrupt) => Err(interrupt_error()),
253                Err(_) => Ok(0),
254            }
255        }
256    }
257}
258
259impl<R: Read> BufRead for InterruptReader<R> {
260    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
261        if let Some(cursor) = self.cursor.as_mut() {
262            deal_with_interrupt(&self.event_rx)?;
263
264            let (addr, len) = {
265                let buf = cursor.fill_buf()?;
266                ((buf as *const [u8]).addr(), buf.len())
267            };
268
269            if len == 0 {
270                let buffer = self.cursor.take().unwrap().into_inner().into_inner();
271                match self.buffer_tx.send(buffer) {
272                    Ok(()) => self.fill_buf(),
273                    Err(_) => Ok(&[]),
274                }
275            } else {
276                let buffer = self.cursor.as_ref().unwrap().get_ref().get_ref();
277                let buf_addr = (buffer.as_slice() as *const [u8]).addr();
278
279                // First time the borrow checker actually forced me to do something
280                // inconvenient, instead of the safe alternative.
281                Ok(&buffer[addr - buf_addr..(addr - buf_addr) + len])
282            }
283        } else {
284            match self.event_rx.recv() {
285                Ok(Event::Buf(buffer, len)) => {
286                    self.cursor = Some(Cursor::new(buffer).take(len as u64));
287                    if len == 0 { Ok(&[]) } else { self.fill_buf() }
288                }
289                Ok(Event::Err(err)) => Err(err),
290                Ok(Event::Interrupt) => Err(interrupt_error()),
291                Err(_) => Ok(&[]),
292            }
293        }
294    }
295
296    fn consume(&mut self, amount: usize) {
297        if let Some(cursor) = self.cursor.as_mut() {
298            cursor.consume(amount);
299        }
300    }
301}
302
303/// An interruptor for an [`InterruptReader`].
304///
305/// This struct serves the purpose of interrupting any of the [`Read`]
306/// or [`BufRead`] functions being performend on the `InterruptReader`
307///
308/// If it is dropped, the `InterruptReader` will no longer be able to
309/// be interrupted.
310#[derive(Debug, Clone)]
311pub struct Interruptor(mpsc::Sender<Event>);
312
313impl Interruptor {
314    /// Interrupts the [`InterruptReader`]
315    ///
316    /// This will send an interrupt event to the reader, which makes
317    /// the next `read` operation return [`Err`], with an
318    /// [`ErrorKind::Other`] with a payload of [`InterruptReceived`].
319    ///
320    /// You can check if an [`std::io::Error`] is of this type by
321    /// calling the [`is_interrupt`] function.
322    ///
323    /// Subsequent `read` operations proceed as normal.
324    ///
325    /// [`ErrorKind::Other`]: std::io::ErrorKind::Other
326    pub fn interrupt(&self) -> Result<(), InterruptSendError> {
327        self.0
328            .send(Event::Interrupt)
329            .map_err(|_| InterruptSendError)
330    }
331}
332
/// The error returned by [`Interruptor::interrupt`] when the interrupt
/// could not be delivered.
///
/// This means that the receiving [`InterruptReader`] has been dropped
/// (or unwrapped with [`InterruptReader::into_inner`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct InterruptSendError;

impl std::fmt::Display for InterruptSendError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("InterruptReader has been dropped")
    }
}

impl std::error::Error for InterruptSendError {}
347
/// Indicates that an [`Interruptor`] has called
/// [`Interruptor::interrupt`], causing a read operation to be
/// interrupted.
///
/// This is the payload of the [`std::io::Error`] (of kind
/// [`ErrorKind::Other`]) returned by the interrupted read. Use
/// [`is_interrupt`] to recognize such errors.
///
/// [`ErrorKind::Other`]: std::io::ErrorKind::Other
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct InterruptReceived;

impl std::fmt::Display for InterruptReceived {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Interruptor has interrupted")
    }
}

impl std::error::Error for InterruptReceived {}
361
/// A message delivered to an [`InterruptReader`] through its event channel.
#[derive(Debug)]
enum Event {
    /// A buffer lent by the worker thread, together with the number of
    /// bytes (from the start) that the last read filled.
    Buf(Vec<u8>, usize),
    /// An error returned by the underlying reader.
    Err(Error),
    /// Sent by [`Interruptor::interrupt`].
    Interrupt,
}
368
369/// Wether the error in question originated from an [`Interruptor`]
370/// calling [`Interruptor::interrupt`].
371///
372/// This just checks if the error is of type [`InterruptReceived`].
373///
374/// # Examples
375///
376/// ```
377/// use std::io::{BufRead, Read, Result};
378///
379/// use interrupt_read::{InterruptReader, is_interrupt};
380///
381/// // Read until either `Ok(0)` or an interrupt occurred.
382/// fn interrupt_read_loop(mut reader: InterruptReader<impl Read>) -> Result<String> {
383///     let mut string = String::new();
384///     loop {
385///         match reader.read_line(&mut string) {
386///             Ok(0) => break Ok(string),
387///             Ok(_) => {}
388///             Err(err) if is_interrupt(&err) => break Ok(string),
389///             Err(err) => break Err(err),
390///         }
391///     }
392/// }
393/// ```
394pub fn is_interrupt(err: &Error) -> bool {
395    err.get_ref()
396        .is_some_and(|err| err.is::<InterruptReceived>())
397}
398
399fn interrupt_error() -> Error {
400    Error::other(InterruptReceived)
401}
402
403fn deal_with_interrupt(event_rx: &mpsc::Receiver<Event>) -> std::io::Result<()> {
404    match event_rx.try_recv() {
405        Ok(Event::Interrupt) => Err(interrupt_error()),
406        Ok(_) => unreachable!("This should not be possible"),
407        // The channel was dropped, but no need to handle that right now.
408        Err(_) => Ok(()),
409    }
410}