#![deny(missing_docs, missing_debug_implementations, missing_copy_implementations,
        trivial_numeric_casts, unstable_features, unused_import_braces, unused_qualifications)]
//! hopper - an unbounded mpsc with bounded memory
//!
//! This module provides a version of the Rust standard
//! [mpsc](https://doc.rust-lang.org/std/sync/mpsc/) that is unbounded but
//! consumes a bounded amount of memory. This is done by paging elements to disk
//! at need. The ambition here is to support mpsc style communication without
//! allocating unbounded amounts of memory or dropping inputs on the floor.
//!
//! Hopper is intended to be used in situations where your system cannot
//! load-shed inputs and _must_ eventually process them. Hopper does page to
//! disk but has the same durability guarantees as stdlib mpsc between restarts:
//! none.
//!
//! # Inside Baseball
//!
//! Hopper's channel looks very much like a named pipe in Unix. You supply a
//! name to either `channel` or `channel_with_explicit_capacity` and you push
//! bytes in and out. The disk paging adds a complication. The name supplied to
//! the above two functions is used to create a directory under the
//! user-supplied `data_dir`. This directory gets filled up with files named by
//! monotonically increasing integers.
//!
//! The on-disk structure looks like so:
//!
//! ```text
//! data-dir/
//!    sink-name0/
//!       0
//!       1
//!    sink-name1/
//!       0
//! ```
//!
//! You'll notice exports of Sender and Receiver in this module's
//! namespace. These are the structures that back the send and receive side of
//! the named channel. The Senders--there may be several of them--are
//! responsible for _creating_ "queue files". In the above,
//! `data-dir/sink-name*/*` are queue files. These files are treated as
//! append-only logs by the Senders. The Receivers trawl through these logs to
//! read the data serialized there.
//!
//! ## Won't this fill up my disk?
//!
//! Maybe! Each Sender has a notion of the maximum bytes that a queue file may
//! consume--which you can set explicitly when creating a channel with
//! `channel_with_explicit_capacity`--and once the Sender has gone over that
//! limit it'll attempt to mark the queue file as read-only and create a new
//! file. The Receiver is programmed to read its current queue file until it
//! reaches EOF and, finding the file is read-only, removes the queue file and
//! moves on to the next.
//!
//! If the Receiver is unable to keep up with the Senders then, oops, your disk
//! will gradually fill up.
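/*!
The rollover rule described in this section can be pictured with a small
in-memory model. This is only a sketch, not hopper's implementation:
`QueueFile`, `QueueDir`, `write` and `reap` are hypothetical names, no real
files are touched, and "gone over that limit" is assumed to mean strictly more
bytes than the limit.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for one on-disk queue file.
#[derive(Debug, Default)]
struct QueueFile {
    /// Bytes appended so far.
    bytes: usize,
    /// Set once the Sender has rolled past this file.
    read_only: bool,
}

/// Hypothetical model of one channel's directory of queue files.
#[derive(Debug)]
struct QueueDir {
    max_file_bytes: usize,
    files: VecDeque<QueueFile>,
}

impl QueueDir {
    /// Start with a single empty, writable queue file.
    fn new(max_file_bytes: usize) -> QueueDir {
        let mut files = VecDeque::new();
        files.push_back(QueueFile::default());
        QueueDir { max_file_bytes, files }
    }

    /// Sender side: append `len` bytes, then roll over to a fresh file if the
    /// current one went over the limit.
    fn write(&mut self, len: usize) {
        let limit = self.max_file_bytes;
        let current = self.files.back_mut().expect("a writable file always exists");
        current.bytes += len;
        if current.bytes > limit {
            current.read_only = true;
            self.files.push_back(QueueFile::default());
        }
    }

    /// Receiver side, at EOF of the oldest file: remove it only if it has
    /// been marked read-only. Returns whether a file was removed.
    fn reap(&mut self) -> bool {
        let sealed = self.files.front().map_or(false, |f| f.read_only);
        if sealed {
            self.files.pop_front();
        }
        sealed
    }

    /// Number of queue files the model currently holds.
    fn file_count(&self) -> usize {
        self.files.len()
    }
}
```

With a 10-byte limit, two 4-byte writes stay in the first file. A third write
takes it to 12 bytes, so that file is sealed and a second one is opened. `reap`
then removes the sealed file and refuses to remove the live one. If `write` is
called much more often than `reap`, `file_count` simply keeps growing, which is
the disk filling up.
*/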
//!
//! ## What kind of filesystem options will I need?
//!
//! Hopper is intended to work on any wacky old filesystem with any options,
//! even at high concurrency. As common filesystems do not support interleaving
//! [small atomic
//! writes](https://stackoverflow.com/questions/32851672/is-overwriting-a-small-file-atomic-on-ext4)
//! hopper limits itself to one exclusive Sender or one exclusive Receiver at a
//! time. This potentially limits the concurrency but maintains data
//! integrity. We are open to improvements in this area.
extern crate bincode;
extern crate byteorder;
extern crate flate2;
extern crate serde;

mod receiver;
mod sender;
mod private;
mod deque;

pub use self::receiver::Receiver;
pub use self::sender::Sender;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::{fs, io, mem, sync};
use std::sync::atomic::AtomicUsize;
use std::path::Path;

/// Defines the errors that hopper will bubble up
///
/// Hopper's error story is pretty bare right now. Hopper should be given sole
/// ownership over a directory and assumes such. If you look in the codebase
/// you'll find that there are a number of cases where we bail out--to the
/// detriment of your program--where we might be able to recover but assume that
/// if an unknown condition _is_ hit it's a result of something foreign tainting
/// hopper's directory.
#[derive(Debug)]
pub enum Error {
    /// The directory given for use does not exist
    NoSuchDirectory,
    /// Stdlib IO Error
    IoError(io::Error),
    /// Could not flush Sender
    NoFlush,
    /// Could not write element because there is no remaining memory or disk
    /// space
    Full,
}

/// Create a (Sender, Receiver) pair in a like fashion to
/// [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html)
///
/// This function creates a Sender and Receiver pair with name `name` whose
/// queue files are stored in `data_dir`. The maximum number of bytes that will
/// be stored in-memory is 1 MiB and the maximum size of a queue file will be
/// 256 MiB. The Sender is clonable.
///
/// # Example
/// ```
/// extern crate tempdir;
/// extern crate hopper;
///
/// let dir = tempdir::TempDir::new("hopper").unwrap();
/// let (mut snd, mut rcv) = hopper::channel("example", dir.path()).unwrap();
///
/// snd.send(9).unwrap();
/// assert_eq!(Some(9), rcv.iter().next());
/// ```
pub fn channel<T>(name: &str, data_dir: &Path) -> Result<(Sender<T>, Receiver<T>), Error>
where
    T: Serialize + DeserializeOwned,
{
    // 0x100_000 bytes = 1 MiB in memory; 0x10_000_000 bytes = 256 MiB per queue file.
    channel_with_explicit_capacity(name, data_dir, 0x100_000, 0x10_000_000, usize::max_value())
}

/// Create a (Sender, Receiver) pair in a like fashion to
/// [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html)
///
/// This function creates a Sender and Receiver pair with name `name` whose
/// queue files are stored in `data_dir`. The maximum number of bytes that will
/// be stored in-memory is `max(max_memory_bytes, size_of(T))` and the maximum
/// size of a queue file will be `max(max_disk_bytes, 1 MiB)`. `max_disk_files`
/// sets the total number of concurrent queue files which are allowed to
/// exist. The total on-disk consumption of hopper will then be roughly
/// `max(max_disk_bytes, 1 MiB) * max_disk_files`.
///
/// The Sender is clonable.
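/**
The limits described here can be worked through with a small standalone
sketch. It is not part of hopper's API: `effective_limits` is a hypothetical
helper that repeats the arithmetic this function applies to its arguments. It
assumes `T` is not zero-sized, since the slot count divides by
`size_of::<T>()`, and it treats the on-disk figure as a rough ceiling because
a queue file is only rolled over after it has gone past its limit.

```rust
use std::cmp::max;
use std::mem::size_of;

/// Hypothetical helper: returns
/// `(in-memory slots, bytes per queue file, rough on-disk ceiling)`.
fn effective_limits<T>(
    max_memory_bytes: usize,
    max_disk_bytes: usize,
    max_disk_files: usize,
) -> (usize, usize, usize) {
    // The in-memory queue is sized in elements and always holds at least one.
    // Assumes `T` is not zero-sized; otherwise this division would panic.
    let slots = max(1, max_memory_bytes / size_of::<T>());
    // A queue file is never capped below 1 MiB (0x100_000 bytes).
    let file_bytes = max(0x100_000, max_disk_bytes);
    // Saturate so that a huge file count reads as "no practical ceiling".
    let disk_ceiling = file_bytes.saturating_mul(max_disk_files);
    (slots, file_bytes, disk_ceiling)
}
```

For `u64` elements with `max_memory_bytes = 8`, `max_disk_bytes = 32` and
`max_disk_files = 2`, this gives one in-memory slot, 1 MiB queue files and a
ceiling of about 2 MiB on disk.
*/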
pub fn channel_with_explicit_capacity<T>(
    name: &str,
    data_dir: &Path,
    max_memory_bytes: usize,
    max_disk_bytes: usize,
    max_disk_files: usize,
) -> Result<(Sender<T>, Receiver<T>), Error>
where
    T: Serialize + DeserializeOwned,
{
    let root = data_dir.join(name);
    if !root.is_dir() {
        if let Err(e) = fs::create_dir_all(&root) {
            return Err(Error::IoError(e));
        }
    }
    let sz = mem::size_of::<T>();
    // A queue file is never capped below 1 MiB.
    let max_disk_bytes = ::std::cmp::max(0x100_000, max_disk_bytes);
    // The in-memory limit is counted in elements and is always at least one.
    let total_memory_limit: usize = ::std::cmp::max(1, max_memory_bytes / sz);
    let q: private::Queue<T> = deque::Queue::with_capacity(total_memory_limit);
    if let Err(e) = private::clear_directory(&root) {
        return Err(Error::IoError(e));
    }
    let max_disk_files = sync::Arc::new(AtomicUsize::new(max_disk_files));
    let sender = Sender::new(
        name,
        &root,
        max_disk_bytes,
        q.clone(),
        sync::Arc::clone(&max_disk_files),
    )?;
    let receiver = Receiver::new(&root, q, sync::Arc::clone(&max_disk_files))?;
    Ok((sender, receiver))
}

#[cfg(test)]
mod test {
    extern crate quickcheck;
    extern crate tempdir;

    use self::quickcheck::{QuickCheck, TestResult};
    use super::channel_with_explicit_capacity;
    use std::{mem, thread};

    #[test]
    fn ingress_shedding() {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity::<u64>(
                "round_trip_order_preserved", // name
                dir.path(),                   // data_dir
                8,                            // max_memory_bytes
                32,                           // max_disk_bytes
                2,                            // max_disk_files
            ) {
                let total_elems = 5 * 131082;
                // Magic constant, depends on compression level and what
                // not. May need to do a looser assertion.
                let expected_shed_sends = 383981;
                let mut shed_sends = 0;
                let mut sent_values = Vec::new();
                for i in 0..total_elems {
                    loop {
                        match snd.send(i) {
                            Ok(()) => {
                                sent_values.push(i);
                                break;
                            }
                            Err((r, err)) => {
                                assert_eq!(r, i);
                                match err {
                                    super::Error::Full => {
                                        shed_sends += 1;
                                        break;
                                    }
                                    _ => {
                                        continue;
                                    }
                                }
                            }
                        }
                    }
                }
                assert_eq!(shed_sends, expected_shed_sends);

                let mut received_elements = 0;
                // clear space for one more element
                let mut attempts = 0;
                loop {
                    match rcv.iter().next() {
                        None => {
                            attempts += 1;
                            assert!(attempts < 10_000);
                        }
                        Some(res) => {
                            received_elements += 1;
                            assert_eq!(res, 0);
                            break;
                        }
                    }
                }
                // flush any disk writes
                loop {
                    if snd.flush().is_ok() {
                        break;
                    }
                }
                // pull the rest of the elements
                let mut attempts = 0;
                for i in &sent_values[1..] {
                    loop {
                        match rcv.iter().next() {
                            None => {
                                attempts += 1;
                                assert!(attempts < 10_000);
                            }
                            Some(res) => {
                                received_elements += 1;
                                assert_eq!(*i, res);
                                break;
                            }
                        }
                    }
                }
                assert_eq!(received_elements, sent_values.len());
            }
        }
    }

    fn round_trip_exp(
        in_memory_limit: usize,
        max_bytes: usize,
        max_disk_files: usize,
        total_elems: usize,
    ) -> bool {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity(
                "round_trip_order_preserved",
                dir.path(),
                in_memory_limit,
                max_bytes,
                max_disk_files,
            ) {
                for i in 0..total_elems {
                    loop {
                        if snd.send(i).is_ok() {
                            break;
                        }
                    }
                }
                // clear space for one more element
                let mut attempts = 0;
                loop {
                    match rcv.iter().next() {
                        None => {
                            attempts += 1;
                            assert!(attempts < 10_000);
                        }
                        Some(res) => {
                            assert_eq!(res, 0);
                            break;
                        }
                    }
                }
                // flush any disk writes
                loop {
                    if snd.flush().is_ok() {
                        break;
                    }
                }
                // pull the rest of the elements
                for i in 1..total_elems {
                    let mut attempts = 0;
                    loop {
                        match rcv.iter().next() {
                            None => {
                                attempts += 1;
                                assert!(attempts < 10_000);
                            }
                            Some(res) => {
                                assert_eq!(res, i);
                                break;
                            }
                        }
                    }
                }
            }
        }
        true
    }

    #[test]
    fn round_trip() {
        fn inner(in_memory_limit: usize, max_bytes: usize, total_elems: usize) -> TestResult {
            let sz = mem::size_of::<u64>();
            if (in_memory_limit / sz) == 0 || (max_bytes / sz) == 0 || total_elems == 0 {
                return TestResult::discard();
            }
            let max_disk_files = usize::max_value();
            TestResult::from_bool(round_trip_exp(
                in_memory_limit,
                max_bytes,
                max_disk_files,
                total_elems,
            ))
        }
        QuickCheck::new().quickcheck(inner as fn(usize, usize, usize) -> TestResult);
    }

    fn multi_thread_concurrent_snd_and_rcv_round_trip_exp(
        total_senders: usize,
        in_memory_bytes: usize,
        disk_bytes: usize,
        max_disk_files: usize,
        vals: Vec<u64>,
    ) -> bool {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((snd, mut rcv)) = channel_with_explicit_capacity(
                "tst",
                dir.path(),
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
            ) {
                let chunk_size = vals.len() / total_senders;

                let mut snd_jh = Vec::new();
                let snd_vals = vals.clone();
                for chunk in snd_vals.chunks(chunk_size) {
                    let mut thr_snd = snd.clone();
                    let chunk = chunk.to_vec();
                    snd_jh.push(thread::spawn(move || {
                        let mut queued = Vec::new();
                        for mut ev in chunk {
                            loop {
                                match thr_snd.send(ev) {
                                    Err(res) => {
                                        ev = res.0;
                                    }
                                    Ok(()) => {
                                        break;
                                    }
                                }
                            }
                            queued.push(ev);
                        }
                        let mut attempts = 0;
                        loop {
                            if thr_snd.flush().is_ok() {
                                break;
                            }
                            thread::sleep(::std::time::Duration::from_millis(100));
                            attempts += 1;
                            assert!(attempts < 10);
                        }
                        queued
                    }))
                }

                let expected_total_vals = vals.len();
                let rcv_jh = thread::spawn(move || {
                    let mut collected = Vec::new();
                    let mut rcv_iter = rcv.iter();
                    while collected.len() < expected_total_vals {
                        let mut attempts = 0;
                        loop {
                            match rcv_iter.next() {
                                None => {
                                    attempts += 1;
                                    assert!(attempts < 10_000);
                                }
                                Some(res) => {
                                    collected.push(res);
                                    break;
                                }
                            }
                        }
                    }
                    collected
                });

                let mut snd_vals: Vec<u64> = Vec::new();
                for jh in snd_jh {
                    snd_vals.append(&mut jh.join().expect("snd join failed"));
                }
                let mut rcv_vals = rcv_jh.join().expect("rcv join failed");

                rcv_vals.sort();
                snd_vals.sort();

                assert_eq!(rcv_vals, snd_vals);
            }
        }
        true
    }

    #[test]
    fn explicit_multi_thread_concurrent_snd_and_rcv_round_trip() {
        let total_senders = 10;
        let in_memory_bytes = 50;
        let disk_bytes = 10;
        let max_disk_files = 100;
        let vals = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

        let mut loops = 0;
        loop {
            assert!(multi_thread_concurrent_snd_and_rcv_round_trip_exp(
                total_senders,
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
                vals.clone(),
            ));
            loops += 1;
            if loops > 2_500 {
                break;
            }
            thread::sleep(::std::time::Duration::from_millis(1));
        }
    }

    #[test]
    fn multi_thread_concurrent_snd_and_rcv_round_trip() {
        fn inner(
            total_senders: usize,
            in_memory_bytes: usize,
            disk_bytes: usize,
            max_disk_files: usize,
            vals: Vec<u64>,
        ) -> TestResult {
            let sz = mem::size_of::<u64>();
            if total_senders == 0 || total_senders > 10 || vals.len() == 0
                || (vals.len() < total_senders) || (in_memory_bytes / sz) == 0
                || (disk_bytes / sz) == 0
            {
                return TestResult::discard();
            }
            TestResult::from_bool(multi_thread_concurrent_snd_and_rcv_round_trip_exp(
                total_senders,
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
                vals,
            ))
        }
        QuickCheck::new()
            .quickcheck(inner as fn(usize, usize, usize, usize, Vec<u64>) -> TestResult);
    }

    fn single_sender_single_rcv_round_trip_exp(
        in_memory_bytes: usize,
        disk_bytes: usize,
        max_disk_files: usize,
        total_vals: usize,
    ) -> bool {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity(
                "tst",
                dir.path(),
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
            ) {
                let builder = thread::Builder::new();
                if let Ok(snd_jh) = builder.spawn(move || {
                    for i in 0..total_vals {
                        loop {
                            if snd.send(i).is_ok() {
                                break;
                            }
                        }
                    }
                    let mut attempts = 0;
                    loop {
                        if snd.flush().is_ok() {
                            break;
                        }
                        thread::sleep(::std::time::Duration::from_millis(100));
                        attempts += 1;
                        assert!(attempts < 10);
                    }
                }) {
                    let builder = thread::Builder::new();
                    if let Ok(rcv_jh) = builder.spawn(move || {
                        let mut rcv_iter = rcv.iter();
                        let mut cur = 0;
                        while cur != total_vals {
                            let mut attempts = 0;
                            loop {
                                if let Some(rcvd) = rcv_iter.next() {
                                    debug_assert_eq!(
                                        cur, rcvd,
                                        "FAILED TO GET ALL IN ORDER: {:?}",
                                        rcvd,
                                    );
                                    cur += 1;
                                    break;
                                } else {
                                    attempts += 1;
                                    assert!(attempts < 10_000);
                                }
                            }
                        }
                    }) {
                        snd_jh.join().expect("snd join failed");
                        rcv_jh.join().expect("rcv join failed");
                    }
                }
            }
        }
        true
    }

    #[test]
    fn explicit_single_sender_single_rcv_round_trip() {
        let mut loops = 0;
        loop {
            assert!(single_sender_single_rcv_round_trip_exp(8, 8, 5, 10));
            loops += 1;
            if loops > 2_500 {
                break;
            }
            thread::sleep(::std::time::Duration::from_millis(1));
        }
    }

    #[test]
    fn single_sender_single_rcv_round_trip() {
        // Similar to the multi sender test except now with a single sender we
        // can guarantee order.
        fn inner(
            in_memory_bytes: usize,
            disk_bytes: usize,
            max_disk_files: usize,
            total_vals: usize,
        ) -> TestResult {
            let sz = mem::size_of::<u64>();
            if total_vals == 0 || (in_memory_bytes / sz) == 0 || (disk_bytes / sz) == 0 {
                return TestResult::discard();
            }
            TestResult::from_bool(single_sender_single_rcv_round_trip_exp(
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
                total_vals,
            ))
        }
        QuickCheck::new().quickcheck(inner as fn(usize, usize, usize, usize) -> TestResult);
    }
}