throttled_reader/lib.rs
1//! This crate provides `ThrottledReader`, a proxy-type for `io::Read` that limits how many times
2//! the underlying reader can be read from. If the read budget is exceeded,
3//! `io::ErrorKind::WouldBlock` is returned instead. This type can be useful to enforce fairness
4//! when reading from many (potentially asynchronous) input streams with highly varying load. If
5//! one stream always has data available, a worker may continue consuming its input forever,
6//! neglecting the other stream.
7//!
8//! # Examples
9//!
10//! ```
11//! # use std::io;
12//! # use std::io::prelude::*;
13//! # use throttled_reader::ThrottledReader;
14//! let mut buf = [0];
15//! let mut stream = ThrottledReader::new(io::empty());
16//!
17//! // initially no limit
18//! assert!(stream.read(&mut buf).is_ok());
19//! assert!(stream.read(&mut buf).is_ok());
20//!
21//! // set a limit
22//! stream.set_limit(2);
23//! assert!(stream.read(&mut buf).is_ok()); // first is allowed through
24//! assert!(stream.read(&mut buf).is_ok()); // second is also allowed through
25//! // but now the limit is reached, and the underlying stream is no longer accessible
26//! assert_eq!(
27//! stream.read(&mut buf).unwrap_err().kind(),
28//! io::ErrorKind::WouldBlock
29//! );
30//!
31//! // we can then unthrottle it again after checking other streams
32//! stream.unthrottle();
33//! assert!(stream.read(&mut buf).is_ok());
34//! assert!(stream.read(&mut buf).is_ok());
35//! ```
36#![deny(missing_docs)]
37use std::io;
38
39/// `ThrottleReader` proxies an `io::Read`, but enforces a budget on how many `read` calls can be
40/// made to the underlying reader.
41#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
42pub struct ThrottledReader<R> {
43 reader: R,
44 read_budget: Option<usize>,
45}
46
47impl<R> ThrottledReader<R> {
48 /// Construct a new throttler that wraps the given reader.
49 ///
50 /// The new `ThrottledReader` initially has no limit.
51 pub fn new(reader: R) -> Self {
52 ThrottledReader {
53 reader,
54 read_budget: None,
55 }
56 }
57
58 /// Set the number of `read` calls allowed to the underlying reader.
59 pub fn set_limit(&mut self, limit: usize) {
60 self.read_budget = Some(limit);
61 }
62
63 /// Remove the limit on how many `read` calls can be issued to the underlying reader.
64 pub fn unthrottle(&mut self) {
65 self.read_budget = None;
66 }
67
68 /// Check how many more `read` calls may be issued to the underlying reader.
69 ///
70 /// Returns `None` if the reader is not currently throttled.
71 pub fn remaining(&self) -> Option<usize> {
72 self.read_budget
73 }
74
75 /// Extract the underlying reader.
76 pub fn into_inner(self) -> R {
77 self.reader
78 }
79}
80
81impl<R> io::Read for ThrottledReader<R>
82where
83 R: io::Read,
84{
85 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
86 match self.read_budget.map(|r| r.checked_sub(1)) {
87 None => {
88 // no limit
89 self.reader.read(buf)
90 }
91 Some(None) => {
92 // past limit
93 Err(io::Error::new(io::ErrorKind::WouldBlock, "read throttled"))
94 }
95 Some(Some(remaining)) => {
96 // above limit
97 self.read_budget = Some(remaining);
98 self.reader.read(buf)
99 }
100 }
101 }
102}
103
104impl<R> From<R> for ThrottledReader<R> {
105 fn from(reader: R) -> Self {
106 ThrottledReader::new(reader)
107 }
108}
109
110impl<R> Default for ThrottledReader<R>
111where
112 R: Default,
113{
114 fn default() -> Self {
115 ThrottledReader {
116 reader: R::default(),
117 read_budget: None,
118 }
119 }
120}
121
122use std::ops::{Deref, DerefMut};
123impl<R> Deref for ThrottledReader<R> {
124 type Target = R;
125 fn deref(&self) -> &Self::Target {
126 &self.reader
127 }
128}
129
130impl<R> DerefMut for ThrottledReader<R> {
131 fn deref_mut(&mut self) -> &mut Self::Target {
132 &mut self.reader
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use super::*;
139 use std::io::prelude::*;
140
141 #[test]
142 fn it_works() {
143 let mut s = ThrottledReader::new(io::empty());
144 // initially no limit
145 assert_eq!(s.read(&mut [0]).unwrap(), 0);
146 assert_eq!(s.read(&mut [0]).unwrap(), 0);
147 assert_eq!(s.read(&mut [0]).unwrap(), 0);
148
149 // set a limit
150 s.set_limit(2);
151 assert_eq!(s.read(&mut [0]).unwrap(), 0); // first is allowed through
152 assert_eq!(s.remaining(), Some(1));
153 assert_eq!(s.read(&mut [0]).unwrap(), 0); // second is allowed through
154 assert_eq!(s.remaining(), Some(0));
155 assert_eq!(
156 s.read(&mut [0]).unwrap_err().kind(),
157 io::ErrorKind::WouldBlock
158 ); // third is *not* allowed
159 assert_eq!(s.remaining(), Some(0));
160 assert_eq!(
161 s.read(&mut [0]).unwrap_err().kind(),
162 io::ErrorKind::WouldBlock
163 ); // obviously neither is fourth
164 assert_eq!(s.remaining(), Some(0));
165
166 // unthrottle again
167 s.unthrottle();
168 assert_eq!(s.read(&mut [0]).unwrap(), 0);
169 assert_eq!(s.read(&mut [0]).unwrap(), 0);
170 assert_eq!(s.read(&mut [0]).unwrap(), 0);
171 }
172}