Skip to main content

interrupt_read/
lib.rs

1//! An interruptable [`Read`]er
2//!
//! This crate provides the [`InterruptReader`], which can have its
//! `read` operations interrupted by an [`Interruptor`]. Both are
//! obtained from the [`pair`] function, which connects them through
//! [`std::sync::mpsc`] channels.
//!
//! When [`Interruptor::interrupt`] is called, the `InterruptReader`
//! will return an error of kind [`ErrorKind::Other`] with a payload of
//! [`InterruptReceived`] (you can check for that using the
//! [`is_interrupt`] function). Otherwise, it will act like any normal
//! `Read` struct.
13//!
14//! Some things to note about this crate:
15//!
16//! - It functions by spawning a separate thread, which will actually
17//!   read from the original `Read`er, so keep that in mind.
18//! - There is some (light) overhead over the read operations.
19//! - You should _not_ wrap this struct in a [`BufReader`] since the
20//!   struct already has its own internal buffer.
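//! - This reader doesn't assume that `Ok(0)` is the end of input, and
//!   the spawned thread keeps running until the [`InterruptReader`] is
//!   dropped (or unwrapped with [`InterruptReader::into_inner`]); even
//!   then it only stops once its current read on the underlying reader
//!   has returned.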
24//!
25//! # Note
26//!
//! Interrupts are reported with [`ErrorKind::Other`] rather than
//! [`ErrorKind::Interrupted`] because the latter error is silently
//! retried by functions like [`BufRead::read_line`] and
//! [`BufRead::read_until`], which is probably not what you want to
//! happen.
32//!
33//! [`BufReader`]: std::io::BufReader
34//! [`ErrorKind::Other`]: std::io::ErrorKind::Other
35//! [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
36use std::{
37    io::{BufRead, Cursor, Error, Read, Take},
38    sync::mpsc,
39    thread::JoinHandle,
40};
41
42/// Returns a pair of an [`InterruptReader`] and an [`Interruptor`].
43///
44/// When you call any of the reading methods of `InterruptReader`, the
45/// current thread will block, being unblocked only if:
46///
47/// - The underlying [`Read`]er has more bytes or returned an
48///   [`Error`].
49/// - The [`Interruptor::interrupt`] function was called.
50///
51/// In the former case, it works just like a regular read, giving an
52/// [`std::io::Result`], depending on the operation.
53/// If the latter happens, however, an [`Error`] of type
54/// [`ErrorKind::Other`] with a payload of [`InterruptReceived`],
55/// meaning that reading operations have been interrupted for some
56/// user defined reason.
57///
58/// You can check if an [`std::io::Error`] is of this type by
59/// calling the [`is_interrupt`] function.
60///
61/// If the channel was interrupted this way, further reads will work
62/// just fine, until another interrupt comes through, creating a
63/// read/interrupt cycle.
64///
65/// Behind the scenes, this is done through channels and a spawned
66/// thread, but no timeout is used, all operations are blocking.
67///
68/// [`Error`]: std::io::Error
69/// [`ErrorKind::Other`]: std::io::ErrorKind::Other
70pub fn pair<R: Read + Send + 'static>(mut reader: R) -> (InterruptReader<R>, Interruptor) {
71    let (event_tx, event_rx) = mpsc::channel();
72    let (buffer_tx, buffer_rx) = mpsc::channel();
73
74    let join_handle = std::thread::spawn({
75        let event_tx = event_tx.clone();
76        move || {
77            // Same capacity as BufReader
78            let mut buf = vec![0; 8 * 1024];
79
80            loop {
81                match reader.read(&mut buf) {
82                    Ok(num_bytes) => {
83                        // This means the InterruptReader has been dropped, so no more reading
84                        // will be done.
85                        let event = Event::Buf(std::mem::take(&mut buf), num_bytes);
86                        if event_tx.send(event).is_err() {
87                            break reader;
88                        }
89
90                        buf = match buffer_rx.recv() {
91                            Ok(buf) => buf,
92                            // Same as before.
93                            Err(_) => break reader,
94                        }
95                    }
96                    Err(err) => {
97                        if event_tx.send(Event::Err(err)).is_err() {
98                            break reader;
99                        }
100                    }
101                }
102            }
103        }
104    });
105
106    let interrupt_reader = InterruptReader {
107        cursor: None,
108        buffer_tx,
109        event_rx,
110        join_handle,
111    };
112    let interruptor = Interruptor(event_tx);
113
114    (interrupt_reader, interruptor)
115}
116
117/// An interruptable, buffered [`Read`]er.
118///
119/// This reader is created by wrapping a `Read` struct in the
120/// [`interrupt_read::pair`] function, which also returns an
121/// [`Interruptor`], which is capable of sending interrupt signals,
122/// which make any `read` operations on the `InterruptReader` return
123/// an error of kind [`ErrorKind::Other`], with a payload of
124/// [`InterruptReceived`].
125///
126/// You can check if an [`std::io::Error`] is of this type by
127/// calling the [`is_interrupt`] function.
128///
129/// # Examples
130///
131/// One potential application of this struct is if you want to stop a
132/// thread that is reading from the stdout of a child process without
133/// necessarily terminating said childrop_:
134///
135/// ```rust
136/// use std::{
137///     io::{BufRead, ErrorKind},
138///     process::{Child, Command, Stdio},
139///     time::Duration,
140/// };
141///
142/// use interrupt_read::{is_interrupt, pair};
143///
144/// struct ChildKiller(Child);
145/// impl Drop for ChildKiller {
146///     fn drop(&mut self) {
147///         _ = self.0.kill();
148///     }
149/// }
150///
151/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
152/// // Prints "hello\n" every second forever.
153/// let mut child = Command::new("bash")
154///     .args(["-c", r#"while true; do echo "hello"; sleep 1; done"#])
155///     .stdout(Stdio::piped())
156///     .spawn()
157///     .unwrap();
158///
159/// let (mut stdout, interruptor) = pair(child.stdout.take().unwrap());
160/// let _child_killer = ChildKiller(child);
161///
162/// let join_handle = std::thread::spawn(move || {
163///     let mut string = String::new();
164///     loop {
165///         match stdout.read_line(&mut string) {
166///             Ok(0) => break Ok(string),
167///             Ok(_) => {}
168///             Err(err) if is_interrupt(&err) => {
169///                 break Ok(string);
170///             }
171///             Err(err) => break Err(err),
172///         }
173///     }
174/// });
175///
176/// std::thread::sleep(Duration::new(3, 1_000_000));
177///
178/// interruptor.interrupt()?;
179///
180/// let result = join_handle.join().unwrap()?;
181///
182/// assert_eq!(result, "hello\nhello\nhello\n");
183///
184/// Ok(())
185/// # }
186/// # match main() {
187/// #     Ok(()) => {}
188/// #     Err(err) => panic!("{err}")
189/// # }
190/// ```
191///
192/// [`interrupt_read::pair`]: pair
193/// [`ErrorKind::Other`]: std::io::ErrorKind::Other
194#[derive(Debug)]
195pub struct InterruptReader<R> {
196    cursor: Option<Take<Cursor<Vec<u8>>>>,
197    buffer_tx: mpsc::Sender<Vec<u8>>,
198    event_rx: mpsc::Receiver<Event>,
199    join_handle: JoinHandle<R>,
200}
201
202impl<R: Read> InterruptReader<R> {
203    /// Unwraps this `InterruptReader`, returning the underlying
204    /// reader.
205    ///
206    /// Note that any leftover data in the internal buffer is lost.
207    /// Therefore, a following read from the underlying reader may
208    /// lead to data loss.
209    ///
210    /// This may return [`Err`] if the underlying joined thread has
211    /// panicked, probably because the [`Read`]er has done so.
212    pub fn into_inner(self) -> std::thread::Result<R> {
213        let Self { buffer_tx, event_rx, join_handle, .. } = self;
214        drop(event_rx);
215        drop(buffer_tx);
216        join_handle.join()
217    }
218}
219
220impl<R: Read> Read for InterruptReader<R> {
221    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
222        if let Some(cursor) = self.cursor.as_mut() {
223            deal_with_interrupt(&self.event_rx)?;
224
225            match cursor.read(buf) {
226                Ok(0) => {
227                    let buffer = self.cursor.take().unwrap().into_inner().into_inner();
228                    match self.buffer_tx.send(buffer) {
229                        Ok(()) => self.read(buf),
230                        // Now we handle that.
231                        Err(_) => Ok(0),
232                    }
233                }
234                Ok(num_bytes) => Ok(num_bytes),
235                Err(_) => unreachable!("Afaik, this shouldn't happen if T is Vec<u8>"),
236            }
237        } else {
238            match self.event_rx.recv() {
239                Ok(Event::Buf(buffer, len)) => {
240                    self.cursor = Some(Cursor::new(buffer).take(len as u64));
241                    if len == 0 { Ok(0) } else { self.read(buf) }
242                }
243                Ok(Event::Err(err)) => Err(err),
244                Ok(Event::Interrupt) => Err(interrupt_error()),
245                Err(_) => Ok(0),
246            }
247        }
248    }
249}
250
251impl<R: Read> BufRead for InterruptReader<R> {
252    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
253        if let Some(cursor) = self.cursor.as_mut() {
254            deal_with_interrupt(&self.event_rx)?;
255
256            let (addr, len) = {
257                let buf = cursor.fill_buf()?;
258                ((buf as *const [u8]).addr(), buf.len())
259            };
260
261            if len == 0 {
262                let buffer = self.cursor.take().unwrap().into_inner().into_inner();
263                match self.buffer_tx.send(buffer) {
264                    Ok(()) => self.fill_buf(),
265                    Err(_) => Ok(&[]),
266                }
267            } else {
268                let buffer = self.cursor.as_ref().unwrap().get_ref().get_ref();
269                let buf_addr = (buffer.as_slice() as *const [u8]).addr();
270
271                // First time the borrow checker actually forced me to do something
272                // inconvenient, instead of the safe alternative.
273                Ok(&buffer[addr - buf_addr..(addr - buf_addr) + len])
274            }
275        } else {
276            match self.event_rx.recv() {
277                Ok(Event::Buf(buffer, len)) => {
278                    self.cursor = Some(Cursor::new(buffer).take(len as u64));
279                    if len == 0 { Ok(&[]) } else { self.fill_buf() }
280                }
281                Ok(Event::Err(err)) => Err(err),
282                Ok(Event::Interrupt) => Err(interrupt_error()),
283                Err(_) => Ok(&[]),
284            }
285        }
286    }
287
288    fn consume(&mut self, amount: usize) {
289        if let Some(cursor) = self.cursor.as_mut() {
290            cursor.consume(amount);
291        }
292    }
293}
294
295/// An interruptor for an [`InterruptReader`].
296///
297/// This struct serves the purpose of interrupting any of the [`Read`]
298/// or [`BufRead`] functions being performend on the `InterruptReader`
299///
300/// If it is dropped, the `InterruptReader` will no longer be able to
301/// be interrupted.
302#[derive(Debug, Clone)]
303pub struct Interruptor(mpsc::Sender<Event>);
304
305impl Interruptor {
306    /// Interrupts the [`InterruptReader`]
307    ///
308    /// This will send an interrupt event to the reader, which makes
309    /// the next `read` operation return [`Err`], with an
310    /// [`ErrorKind::Other`] with a payload of [`InterruptReceived`].
311    ///
312    /// You can check if an [`std::io::Error`] is of this type by
313    /// calling the [`is_interrupt`] function.
314    ///
315    /// Subsequent `read` operations proceed as normal.
316    ///
317    /// [`ErrorKind::Other`]: std::io::ErrorKind::Other
318    pub fn interrupt(&self) -> Result<(), InterruptSendError> {
319        self.0
320            .send(Event::Interrupt)
321            .map_err(|_| InterruptSendError)
322    }
323}
324
/// Error returned by [`Interruptor::interrupt`].
///
/// It means the [`InterruptReader`] that the interrupt was meant for
/// has been dropped, so nothing is left to receive it.
#[derive(Debug, Clone, Copy)]
pub struct InterruptSendError;

impl std::fmt::Display for InterruptSendError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "InterruptReader has been dropped")
    }
}

impl std::error::Error for InterruptSendError {}
339
/// Payload of the [`std::io::Error`] that a read returns after an
/// [`Interruptor`] has called [`Interruptor::interrupt`].
///
/// Use [`is_interrupt`] to check whether an error carries it.
#[derive(Debug, Clone, Copy)]
pub struct InterruptReceived;

impl std::fmt::Display for InterruptReceived {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "Interruptor has interrupted")
    }
}

impl std::error::Error for InterruptReceived {}
353
/// A message delivered to an [`InterruptReader`] over its event channel.
#[derive(Debug)]
enum Event {
    /// Sent by an [`Interruptor`]: makes a read fail with an interrupt
    /// error.
    Interrupt,
    /// A chunk read by the background thread, together with how many
    /// bytes of the buffer were filled.
    Buf(Vec<u8>, usize),
    /// An error returned by the underlying reader, forwarded as-is.
    Err(std::io::Error),
}
360
361/// Wether the error in question originated from an [`Interruptor`]
362/// calling [`Interruptor::interrupt`].
363///
364/// This just checks if the error is of type [`InterruptReceived`]..
365pub fn is_interrupt(err: &Error) -> bool {
366    err.get_ref()
367        .is_some_and(|err| err.is::<InterruptReceived>())
368}
369
370fn interrupt_error() -> Error {
371    Error::other(InterruptReceived)
372}
373
374fn deal_with_interrupt(event_rx: &mpsc::Receiver<Event>) -> std::io::Result<()> {
375    match event_rx.try_recv() {
376        Ok(Event::Interrupt) => Err(interrupt_error()),
377        Ok(_) => unreachable!("This should not be possible"),
378        // The channel was dropped, but no need to handle that right now.
379        Err(_) => Ok(()),
380    }
381}