throttled_reader/
lib.rs

1//! This crate provides `ThrottledReader`, a proxy-type for `io::Read` that limits how many times
2//! the underlying reader can be read from. If the read budget is exceeded,
3//! `io::ErrorKind::WouldBlock` is returned instead. This type can be useful to enforce fairness
4//! when reading from many (potentially asynchronous) input streams with highly varying load. If
//! one stream always has data available, a worker may continue consuming its input forever,
//! neglecting the other streams.
7//!
8//! # Examples
9//!
10//! ```
11//! # use std::io;
12//! # use std::io::prelude::*;
13//! # use throttled_reader::ThrottledReader;
14//! let mut buf = [0];
15//! let mut stream = ThrottledReader::new(io::empty());
16//!
17//! // initially no limit
18//! assert!(stream.read(&mut buf).is_ok());
19//! assert!(stream.read(&mut buf).is_ok());
20//!
21//! // set a limit
22//! stream.set_limit(2);
23//! assert!(stream.read(&mut buf).is_ok()); // first is allowed through
24//! assert!(stream.read(&mut buf).is_ok()); // second is also allowed through
25//! // but now the limit is reached, and the underlying stream is no longer accessible
26//! assert_eq!(
27//!     stream.read(&mut buf).unwrap_err().kind(),
28//!     io::ErrorKind::WouldBlock
29//! );
30//!
31//! // we can then unthrottle it again after checking other streams
32//! stream.unthrottle();
33//! assert!(stream.read(&mut buf).is_ok());
34//! assert!(stream.read(&mut buf).is_ok());
35//! ```
36#![deny(missing_docs)]
37use std::io;
38
/// `ThrottledReader` proxies an `io::Read`, but enforces a budget on how many `read` calls can be
/// made to the underlying reader.
///
/// Once the budget is used up, further `read` calls fail with `io::ErrorKind::WouldBlock` until
/// a new budget is set with `set_limit` or the limit is lifted with `unthrottle`.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct ThrottledReader<R> {
    // The wrapped reader that permitted `read` calls are forwarded to.
    reader: R,
    // Number of `read` calls still allowed through; `None` means no limit is in effect.
    read_budget: Option<usize>,
}
46
47impl<R> ThrottledReader<R> {
48    /// Construct a new throttler that wraps the given reader.
49    ///
50    /// The new `ThrottledReader` initially has no limit.
51    pub fn new(reader: R) -> Self {
52        ThrottledReader {
53            reader,
54            read_budget: None,
55        }
56    }
57
58    /// Set the number of `read` calls allowed to the underlying reader.
59    pub fn set_limit(&mut self, limit: usize) {
60        self.read_budget = Some(limit);
61    }
62
63    /// Remove the limit on how many `read` calls can be issued to the underlying reader.
64    pub fn unthrottle(&mut self) {
65        self.read_budget = None;
66    }
67
68    /// Check how many more `read` calls may be issued to the underlying reader.
69    ///
70    /// Returns `None` if the reader is not currently throttled.
71    pub fn remaining(&self) -> Option<usize> {
72        self.read_budget
73    }
74
75    /// Extract the underlying reader.
76    pub fn into_inner(self) -> R {
77        self.reader
78    }
79}
80
81impl<R> io::Read for ThrottledReader<R>
82where
83    R: io::Read,
84{
85    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
86        match self.read_budget.map(|r| r.checked_sub(1)) {
87            None => {
88                // no limit
89                self.reader.read(buf)
90            }
91            Some(None) => {
92                // past limit
93                Err(io::Error::new(io::ErrorKind::WouldBlock, "read throttled"))
94            }
95            Some(Some(remaining)) => {
96                // above limit
97                self.read_budget = Some(remaining);
98                self.reader.read(buf)
99            }
100        }
101    }
102}
103
104impl<R> From<R> for ThrottledReader<R> {
105    fn from(reader: R) -> Self {
106        ThrottledReader::new(reader)
107    }
108}
109
110impl<R> Default for ThrottledReader<R>
111where
112    R: Default,
113{
114    fn default() -> Self {
115        ThrottledReader {
116            reader: R::default(),
117            read_budget: None,
118        }
119    }
120}
121
122use std::ops::{Deref, DerefMut};
123impl<R> Deref for ThrottledReader<R> {
124    type Target = R;
125    fn deref(&self) -> &Self::Target {
126        &self.reader
127    }
128}
129
130impl<R> DerefMut for ThrottledReader<R> {
131    fn deref_mut(&mut self) -> &mut Self::Target {
132        &mut self.reader
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::prelude::*;

    /// Issue one single-byte read and report the byte count or the kind of error returned.
    fn read_once<R: Read>(s: &mut ThrottledReader<R>) -> Result<usize, io::ErrorKind> {
        s.read(&mut [0]).map_err(|e| e.kind())
    }

    #[test]
    fn it_works() {
        let mut s = ThrottledReader::new(io::empty());

        // a fresh reader has no limit
        for _ in 0..3 {
            assert_eq!(read_once(&mut s), Ok(0));
        }

        // with a limit of two, exactly two reads are let through
        s.set_limit(2);
        assert_eq!(read_once(&mut s), Ok(0));
        assert_eq!(s.remaining(), Some(1));
        assert_eq!(read_once(&mut s), Ok(0));
        assert_eq!(s.remaining(), Some(0));

        // the third and fourth reads are refused, and the budget stays at zero
        for _ in 0..2 {
            assert_eq!(read_once(&mut s), Err(io::ErrorKind::WouldBlock));
            assert_eq!(s.remaining(), Some(0));
        }

        // lifting the limit lets reads through again
        s.unthrottle();
        for _ in 0..3 {
            assert_eq!(read_once(&mut s), Ok(0));
        }
    }
}