hopper/lib.rs
1#![deny(missing_docs, missing_debug_implementations, missing_copy_implementations,
2 trivial_numeric_casts, unstable_features, unused_import_braces, unused_qualifications)]
3//! hopper - an unbounded mpsc with bounded memory
4//!
5//! This module provides a version of the rust standard
6//! [mpsc](https://doc.rust-lang.org/std/sync/mpsc/) that is unbounded but
7//! consumes a bounded amount of memory. This is done by paging elements to disk
8//! at need. The ambition here is to support mpsc style communication without
9//! allocating unbounded amounts of memory or dropping inputs on the floor.
10//!
//! Hopper is intended to be used in situations where your system cannot
//! load-shed inputs and _must_ eventually process them. Hopper does page to
//! disk but has the same durability guarantees as stdlib mpsc between restarts:
//! none.
15//!
16//! # Inside Baseball
17//!
//! Hopper's channel looks very much like a named pipe in Unix. You supply a
//! name to either `channel` or `channel_with_explicit_capacity` and you push
//! bytes in and out. The disk paging adds a complication. The name supplied to
//! the above two functions is used to create a directory under the
//! user-supplied `data_dir`. This directory gets filled up with files whose
//! names are monotonically increasing.
24//!
//! The on-disk structure looks like so:
//!
//! ```text
//! data-dir/
//!    sink-name0/
//!       0
//!       1
//!    sink-name1/
//!       0
//! ```
35//!
36//! You'll notice exports of Sender and Receiver in this module's
37//! namespace. These are the structures that back the send and receive side of
38//! the named channel. The Senders--there may be multiples of them--are
39//! responsible for _creating_ "queue files". In the above,
40//! `data-dir/sink-name*/*` are queue files. These files are treated as
41//! append-only logs by the Senders. The Receivers trawl through these logs to
42//! read the data serialized there.
43//!
44//! ## Won't this fill up my disk?
45//!
46//! Maybe! Each Sender has a notion of the maximum bytes that a queue file may
47//! consume--which you can set explicitly when creating a channel with
48//! `channel_with_explicit_capacity`--and once the Sender has gone over that
49//! limit it'll attempt to mark the queue file as read-only and create a new
50//! file. The Receiver is programmed to read its current queue file until it
51//! reaches EOF and, finding the file is read-only, removes the queue file and
52//! moves on to the next.
53//!
54//! If the Receiver is unable to keep up with the Senders then, oops, your disk
55//! will gradually fill up.
56//!
57//! ## What kind of filesystem options will I need?
58//!
59//! Hopper is intended to work on any wacky old filesystem with any options,
60//! even at high concurrency. As common filesystems do not support interleaving
61//! [small atomic
62//! writes](https://stackoverflow.com/questions/32851672/is-overwriting-a-small-file-atomic-on-ext4)
63//! hopper limits itself to one exclusive Sender or one exclusive Receiver at a
64//! time. This potentially limits the concurrency but maintains data
65//! integrity. We are open to improvements in this area.
66extern crate bincode;
67extern crate byteorder;
68extern crate flate2;
69extern crate serde;
70
71mod receiver;
72mod sender;
73mod private;
74mod deque;
75
76pub use self::receiver::Receiver;
77pub use self::sender::Sender;
78use serde::Serialize;
79use serde::de::DeserializeOwned;
80use std::{fs, io, mem, sync};
81use std::sync::atomic::AtomicUsize;
82use std::path::Path;
83
/// Defines the errors that hopper will bubble up
///
/// Hopper's error story is pretty bare right now. Hopper should be given sole
/// ownership over a directory and assumes such. If you look in the codebase
/// you'll find that there are a number of cases where we bail out--to the
/// detriment of your program--where we might be able to recover but assume that
/// if an unknown condition _is_ hit it's a result of something foreign tainting
/// hopper's directory.
#[derive(Debug)]
pub enum Error {
    /// The directory given for use does not exist
    NoSuchDirectory,
    /// Stdlib IO Error
    IoError(io::Error),
    /// Could not flush Sender
    NoFlush,
    /// Could not write element because there is no remaining memory or disk
    /// space
    Full,
}
104
105/// Create a (Sender, Reciever) pair in a like fashion to
106/// [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html)
107///
108/// This function creates a Sender and Receiver pair with name `name` whose
109/// queue files are stored in `data_dir`. The maximum number of bytes that will
110/// be stored in-memory is 1Mb and the maximum size of a queue file will be
111/// 100Mb. The Sender is clonable.
112///
113/// # Example
114/// ```
115/// extern crate tempdir;
116/// extern crate hopper;
117///
118/// let dir = tempdir::TempDir::new("hopper").unwrap();
119/// let (mut snd, mut rcv) = hopper::channel("example", dir.path()).unwrap();
120///
121/// snd.send(9);
122/// assert_eq!(Some(9), rcv.iter().next());
123/// ```
124pub fn channel<T>(name: &str, data_dir: &Path) -> Result<(Sender<T>, Receiver<T>), Error>
125where
126 T: Serialize + DeserializeOwned,
127{
128 channel_with_explicit_capacity(name, data_dir, 0x100_000, 0x10_000_000, usize::max_value())
129}
130
131/// Create a (Sender, Reciever) pair in a like fashion to
132/// [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html)
133///
134/// This function creates a Sender and Receiver pair with name `name` whose
135/// queue files are stored in `data_dir`. The maximum number of bytes that will
136/// be stored in-memory are `max(max_memory_bytes, size_of(T))` and the maximum
137/// size of a queue file will be `max(max_disk_bytes, 1Mb)`. `max_disk_files`
138/// sets the total number of concurrent queue files which are allowed to
139/// exist. The total on-disk consumption of hopper will then be
140/// `max(max_memory_bytes, size_of(T)) * max_disk_files`.
141///
142/// The Sender is clonable.
143pub fn channel_with_explicit_capacity<T>(
144 name: &str,
145 data_dir: &Path,
146 max_memory_bytes: usize,
147 max_disk_bytes: usize,
148 max_disk_files: usize,
149) -> Result<(Sender<T>, Receiver<T>), Error>
150where
151 T: Serialize + DeserializeOwned,
152{
153 let root = data_dir.join(name);
154 if !root.is_dir() {
155 match fs::create_dir_all(root.clone()) {
156 Ok(()) => {}
157 Err(e) => {
158 return Err(Error::IoError(e));
159 }
160 }
161 }
162 let sz = mem::size_of::<T>();
163 let max_disk_bytes = ::std::cmp::max(0x100_000, max_disk_bytes);
164 let total_memory_limit: usize = ::std::cmp::max(1, max_memory_bytes / sz);
165 let q: private::Queue<T> = deque::Queue::with_capacity(total_memory_limit);
166 if let Err(e) = private::clear_directory(&root) {
167 return Err(Error::IoError(e));
168 }
169 let max_disk_files = sync::Arc::new(AtomicUsize::new(max_disk_files));
170 let sender = Sender::new(
171 name,
172 &root,
173 max_disk_bytes,
174 q.clone(),
175 sync::Arc::clone(&max_disk_files),
176 )?;
177 let receiver = Receiver::new(&root, q, sync::Arc::clone(&max_disk_files))?;
178 Ok((sender, receiver))
179}
180
181#[cfg(test)]
182mod test {
183 extern crate quickcheck;
184 extern crate tempdir;
185
186 use self::quickcheck::{QuickCheck, TestResult};
187 use super::channel_with_explicit_capacity;
188 use std::{mem, thread};
189
190 #[test]
191 fn ingress_shedding() {
192 if let Ok(dir) = tempdir::TempDir::new("hopper") {
193 if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity::<u64>(
194 "round_trip_order_preserved", // name
195 dir.path(), // data_dir
196 8, // max_memory_bytes
197 32, // max_disk_bytes
198 2, // max_disk_files
199 ) {
200 let total_elems = 5 * 131082;
201 // Magic constant, depends on compression level and what
202 // not. May need to do a looser assertion.
203 let expected_shed_sends = 383981;
204 let mut shed_sends = 0;
205 let mut sent_values = Vec::new();
206 for i in 0..total_elems {
207 loop {
208 match snd.send(i) {
209 Ok(()) => {
210 sent_values.push(i);
211 break;
212 }
213 Err((r, err)) => {
214 assert_eq!(r, i);
215 match err {
216 super::Error::Full => {
217 shed_sends += 1;
218 break;
219 }
220 _ => {
221 continue;
222 }
223 }
224 }
225 }
226 }
227 }
228 assert_eq!(shed_sends, expected_shed_sends);
229
230 let mut received_elements = 0;
231 // clear space for one more element
232 let mut attempts = 0;
233 loop {
234 match rcv.iter().next() {
235 None => {
236 attempts += 1;
237 assert!(attempts < 10_000);
238 }
239 Some(res) => {
240 received_elements += 1;
241 assert_eq!(res, 0);
242 break;
243 }
244 }
245 }
246 // flush any disk writes
247 loop {
248 if snd.flush().is_ok() {
249 break;
250 }
251 }
252 // pull the rest of the elements
253 let mut attempts = 0;
254 for i in &sent_values[1..] {
255 loop {
256 match rcv.iter().next() {
257 None => {
258 attempts += 1;
259 assert!(attempts < 10_000);
260 }
261 Some(res) => {
262 received_elements += 1;
263 assert_eq!(*i, res);
264 break;
265 }
266 }
267 }
268 }
269 assert_eq!(received_elements, sent_values.len());
270 }
271 }
272 }
273
274 fn round_trip_exp(
275 in_memory_limit: usize,
276 max_bytes: usize,
277 max_disk_files: usize,
278 total_elems: usize,
279 ) -> bool {
280 if let Ok(dir) = tempdir::TempDir::new("hopper") {
281 if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity(
282 "round_trip_order_preserved",
283 dir.path(),
284 in_memory_limit,
285 max_bytes,
286 max_disk_files,
287 ) {
288 for i in 0..total_elems {
289 loop {
290 if snd.send(i).is_ok() {
291 break;
292 }
293 }
294 }
295 // clear space for one more element
296 let mut attempts = 0;
297 loop {
298 match rcv.iter().next() {
299 None => {
300 attempts += 1;
301 assert!(attempts < 10_000);
302 }
303 Some(res) => {
304 assert_eq!(res, 0);
305 break;
306 }
307 }
308 }
309 // flush any disk writes
310 loop {
311 if snd.flush().is_ok() {
312 break;
313 }
314 }
315 // pull the rest of the elements
316 for i in 1..total_elems {
317 let mut attempts = 0;
318 loop {
319 match rcv.iter().next() {
320 None => {
321 attempts += 1;
322 assert!(attempts < 10_000);
323 }
324 Some(res) => {
325 assert_eq!(res, i);
326 break;
327 }
328 }
329 }
330 }
331 }
332 }
333 true
334 }
335
336 #[test]
337 fn round_trip() {
338 fn inner(in_memory_limit: usize, max_bytes: usize, total_elems: usize) -> TestResult {
339 let sz = mem::size_of::<u64>();
340 if (in_memory_limit / sz) == 0 || (max_bytes / sz) == 0 || total_elems == 0 {
341 return TestResult::discard();
342 }
343 let max_disk_files = usize::max_value();
344 TestResult::from_bool(round_trip_exp(
345 in_memory_limit,
346 max_bytes,
347 max_disk_files,
348 total_elems,
349 ))
350 }
351 QuickCheck::new().quickcheck(inner as fn(usize, usize, usize) -> TestResult);
352 }
353
354 fn multi_thread_concurrent_snd_and_rcv_round_trip_exp(
355 total_senders: usize,
356 in_memory_bytes: usize,
357 disk_bytes: usize,
358 max_disk_files: usize,
359 vals: Vec<u64>,
360 ) -> bool {
361 if let Ok(dir) = tempdir::TempDir::new("hopper") {
362 if let Ok((snd, mut rcv)) = channel_with_explicit_capacity(
363 "tst",
364 dir.path(),
365 in_memory_bytes,
366 disk_bytes,
367 max_disk_files,
368 ) {
369 let chunk_size = vals.len() / total_senders;
370
371 let mut snd_jh = Vec::new();
372 let snd_vals = vals.clone();
373 for chunk in snd_vals.chunks(chunk_size) {
374 let mut thr_snd = snd.clone();
375 let chunk = chunk.to_vec();
376 snd_jh.push(thread::spawn(move || {
377 let mut queued = Vec::new();
378 for mut ev in chunk {
379 loop {
380 match thr_snd.send(ev) {
381 Err(res) => {
382 ev = res.0;
383 }
384 Ok(()) => {
385 break;
386 }
387 }
388 }
389 queued.push(ev);
390 }
391 let mut attempts = 0;
392 loop {
393 if thr_snd.flush().is_ok() {
394 break;
395 }
396 thread::sleep(::std::time::Duration::from_millis(100));
397 attempts += 1;
398 assert!(attempts < 10);
399 }
400 queued
401 }))
402 }
403
404 let expected_total_vals = vals.len();
405 let rcv_jh = thread::spawn(move || {
406 let mut collected = Vec::new();
407 let mut rcv_iter = rcv.iter();
408 while collected.len() < expected_total_vals {
409 let mut attempts = 0;
410 loop {
411 match rcv_iter.next() {
412 None => {
413 attempts += 1;
414 assert!(attempts < 10_000);
415 }
416 Some(res) => {
417 collected.push(res);
418 break;
419 }
420 }
421 }
422 }
423 collected
424 });
425
426 let mut snd_vals: Vec<u64> = Vec::new();
427 for jh in snd_jh {
428 snd_vals.append(&mut jh.join().expect("snd join failed"));
429 }
430 let mut rcv_vals = rcv_jh.join().expect("rcv join failed");
431
432 rcv_vals.sort();
433 snd_vals.sort();
434
435 assert_eq!(rcv_vals, snd_vals);
436 }
437 }
438 true
439 }
440
441 #[test]
442 fn explicit_multi_thread_concurrent_snd_and_rcv_round_trip() {
443 let total_senders = 10;
444 let in_memory_bytes = 50;
445 let disk_bytes = 10;
446 let max_disk_files = 100;
447 let vals = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
448
449 let mut loops = 0;
450 loop {
451 assert!(multi_thread_concurrent_snd_and_rcv_round_trip_exp(
452 total_senders,
453 in_memory_bytes,
454 disk_bytes,
455 max_disk_files,
456 vals.clone(),
457 ));
458 loops += 1;
459 if loops > 2_500 {
460 break;
461 }
462 thread::sleep(::std::time::Duration::from_millis(1));
463 }
464 }
465
466 #[test]
467 fn multi_thread_concurrent_snd_and_rcv_round_trip() {
468 fn inner(
469 total_senders: usize,
470 in_memory_bytes: usize,
471 disk_bytes: usize,
472 max_disk_files: usize,
473 vals: Vec<u64>,
474 ) -> TestResult {
475 let sz = mem::size_of::<u64>();
476 if total_senders == 0 || total_senders > 10 || vals.len() == 0
477 || (vals.len() < total_senders) || (in_memory_bytes / sz) == 0
478 || (disk_bytes / sz) == 0
479 {
480 return TestResult::discard();
481 }
482 TestResult::from_bool(multi_thread_concurrent_snd_and_rcv_round_trip_exp(
483 total_senders,
484 in_memory_bytes,
485 disk_bytes,
486 max_disk_files,
487 vals,
488 ))
489 }
490 QuickCheck::new()
491 .quickcheck(inner as fn(usize, usize, usize, usize, Vec<u64>) -> TestResult);
492 }
493
494 fn single_sender_single_rcv_round_trip_exp(
495 in_memory_bytes: usize,
496 disk_bytes: usize,
497 max_disk_files: usize,
498 total_vals: usize,
499 ) -> bool {
500 if let Ok(dir) = tempdir::TempDir::new("hopper") {
501 if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity(
502 "tst",
503 dir.path(),
504 in_memory_bytes,
505 disk_bytes,
506 max_disk_files,
507 ) {
508 let builder = thread::Builder::new();
509 if let Ok(snd_jh) = builder.spawn(move || {
510 for i in 0..total_vals {
511 loop {
512 if snd.send(i).is_ok() {
513 break;
514 }
515 }
516 }
517 let mut attempts = 0;
518 loop {
519 if snd.flush().is_ok() {
520 break;
521 }
522 thread::sleep(::std::time::Duration::from_millis(100));
523 attempts += 1;
524 assert!(attempts < 10);
525 }
526 }) {
527 let builder = thread::Builder::new();
528 if let Ok(rcv_jh) = builder.spawn(move || {
529 let mut rcv_iter = rcv.iter();
530 let mut cur = 0;
531 while cur != total_vals {
532 let mut attempts = 0;
533 loop {
534 if let Some(rcvd) = rcv_iter.next() {
535 debug_assert_eq!(
536 cur, rcvd,
537 "FAILED TO GET ALL IN ORDER: {:?}",
538 rcvd,
539 );
540 cur += 1;
541 break;
542 } else {
543 attempts += 1;
544 assert!(attempts < 10_000);
545 }
546 }
547 }
548 }) {
549 snd_jh.join().expect("snd join failed");
550 rcv_jh.join().expect("rcv join failed");
551 }
552 }
553 }
554 }
555 true
556 }
557
558 #[test]
559 fn explicit_single_sender_single_rcv_round_trip() {
560 let mut loops = 0;
561 loop {
562 assert!(single_sender_single_rcv_round_trip_exp(8, 8, 5, 10));
563 loops += 1;
564 if loops > 2_500 {
565 break;
566 }
567 thread::sleep(::std::time::Duration::from_millis(1));
568 }
569 }
570
571 #[test]
572 fn single_sender_single_rcv_round_trip() {
573 // Similar to the multi sender test except now with a single sender we
574 // can guarantee order.
575 fn inner(
576 in_memory_bytes: usize,
577 disk_bytes: usize,
578 max_disk_files: usize,
579 total_vals: usize,
580 ) -> TestResult {
581 let sz = mem::size_of::<u64>();
582 if total_vals == 0 || (in_memory_bytes / sz) == 0 || (disk_bytes / sz) == 0 {
583 return TestResult::discard();
584 }
585 TestResult::from_bool(single_sender_single_rcv_round_trip_exp(
586 in_memory_bytes,
587 disk_bytes,
588 max_disk_files,
589 total_vals,
590 ))
591 }
592 QuickCheck::new().quickcheck(inner as fn(usize, usize, usize, usize) -> TestResult);
593 }
594
595}