1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
#![deny(missing_docs, missing_debug_implementations, missing_copy_implementations,
        trivial_numeric_casts, unstable_features, unused_import_braces, unused_qualifications)]
//! hopper - an unbounded mpsc with bounded memory
//!
//! This module provides a version of the rust standard
//! [mpsc](https://doc.rust-lang.org/std/sync/mpsc/) that is unbounded but
//! consumes a bounded amount of memory. This is done by paging elements to disk
//! at need. The ambition here is to support mpsc style communication without
//! allocating unbounded amounts of memory or dropping inputs on the floor.
//!
//! Hopper is intended to be used in situtations where your system cannot
//! load-shed inputs and _must_ eventually process them. Hopper does page to
//! disk but has the same durabilty guarantees as stdlib mpsc between restarts:
//! none.
//!
//! # Inside Baseball
//!
//! Hopper's channel looks very much like a named pipe in Unix. You supply a
//! name to either `channel_2` or `channel_with_max_bytes_3` and you push bytes
//! in and out. The disk paging adds a complication. The name supplied to the
//! above two functions is used to create a directory under user-supplied
//! `data_dir`. This directory gets filled up with monotonically increasing
//! files.
//!
//! The on-disk structure look like so:
//!
//! ```text
//! data-dir/
//!    sink-name0/
//!       0
//!       1
//!    sink-name1/
//!       0
//! ```
//!
//! You'll notice exports of Sender and Receiver in this module's
//! namespace. These are the structures that back the send and receive side of
//! the named channel. The Senders--there may be multiples of them--are
//! responsible for _creating_ "queue files". In the above,
//! `data-dir/sink-name*/*` are queue files. These files are treated as
//! append-only logs by the Senders. The Receivers trawl through these logs to
//! read the data serialized there.
//!
//! ## Won't this fill up my disk?
//!
//! Maybe! Each Sender has a notion of the maximum bytes that a queue file may
//! consume--which you can set explicitly when creating a channel with
//! `channel_with_explicit_capacity`--and once the Sender has gone over that
//! limit it'll attempt to mark the queue file as read-only and create a new
//! file. The Receiver is programmed to read its current queue file until it
//! reaches EOF and, finding the file is read-only, removes the queue file and
//! moves on to the next.
//!
//! If the Receiver is unable to keep up with the Senders then, oops, your disk
//! will gradually fill up.
//!
//! ## What kind of filesystem options will I need?
//!
//! Hopper is intended to work on any wacky old filesystem with any options,
//! even at high concurrency. As common filesystems do not support interleaving
//! [small atomic
//! writes](https://stackoverflow.com/questions/32851672/is-overwriting-a-small-file-atomic-on-ext4)
//! hopper limits itself to one exclusive Sender or one exclusive Receiver at a
//! time. This potentially limits the concurrency but maintains data
//! integrity. We are open to improvements in this area.
extern crate bincode;
extern crate byteorder;
extern crate flate2;
extern crate serde;

mod receiver;
mod sender;
mod private;
mod deque;

pub use self::receiver::Receiver;
pub use self::sender::Sender;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::{fs, io, mem, sync};
use std::sync::atomic::AtomicUsize;
use std::path::Path;

/// Defines the errors that hopper will bubble up
///
/// Hopper's error story is pretty bare right now. Hopper should be given sole
/// ownership over a directory and assumes such. If you look in the codebase
/// you'll find that there are a number cases where we bail out--to the
/// detriment of your program--where we might be able to recover but assume that
/// if an unkonwn condition _is_ hit it's a result of something foreign tainting
/// hopper's directory.
#[derive(Debug)]
pub enum Error {
    /// The directory given for use does not exist
    NoSuchDirectory,
    /// Stdlib IO Error
    IoError(io::Error),
    /// Could not flush Sender
    NoFlush,
    /// Could not write element because there is no remaining memory or disk
    /// space
    Full,
}

/// Create a (Sender, Reciever) pair in a like fashion to
/// [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html)
///
/// This function creates a Sender and Receiver pair with name `name` whose
/// queue files are stored in `data_dir`. The maximum number of bytes that will
/// be stored in-memory is 1Mb and the maximum size of a queue file will be
/// 100Mb. The Sender is clonable.
///
/// # Example
/// ```
/// extern crate tempdir;
/// extern crate hopper;
///
/// let dir = tempdir::TempDir::new("hopper").unwrap();
/// let (mut snd, mut rcv) = hopper::channel("example", dir.path()).unwrap();
///
/// snd.send(9);
/// assert_eq!(Some(9), rcv.iter().next());
/// ```
pub fn channel<T>(name: &str, data_dir: &Path) -> Result<(Sender<T>, Receiver<T>), Error>
where
    T: Serialize + DeserializeOwned,
{
    channel_with_explicit_capacity(name, data_dir, 0x100_000, 0x10_000_000, usize::max_value())
}

/// Create a (Sender, Reciever) pair in a like fashion to
/// [`std::sync::mpsc::channel`](https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html)
///
/// This function creates a Sender and Receiver pair with name `name` whose
/// queue files are stored in `data_dir`. The maximum number of bytes that will
/// be stored in-memory are `max(max_memory_bytes, size_of(T))` and the maximum
/// size of a queue file will be `max(max_disk_bytes, 1Mb)`. `max_disk_files`
/// sets the total number of concurrent queue files which are allowed to
/// exist. The total on-disk consumption of hopper will then be
/// `max(max_memory_bytes, size_of(T)) * max_disk_files`.
///
/// The Sender is clonable.
pub fn channel_with_explicit_capacity<T>(
    name: &str,
    data_dir: &Path,
    max_memory_bytes: usize,
    max_disk_bytes: usize,
    max_disk_files: usize,
) -> Result<(Sender<T>, Receiver<T>), Error>
where
    T: Serialize + DeserializeOwned,
{
    let root = data_dir.join(name);
    if !root.is_dir() {
        match fs::create_dir_all(root.clone()) {
            Ok(()) => {}
            Err(e) => {
                return Err(Error::IoError(e));
            }
        }
    }
    let sz = mem::size_of::<T>();
    let max_disk_bytes = ::std::cmp::max(0x100_000, max_disk_bytes);
    let total_memory_limit: usize = ::std::cmp::max(1, max_memory_bytes / sz);
    let q: private::Queue<T> = deque::Queue::with_capacity(total_memory_limit);
    if let Err(e) = private::clear_directory(&root) {
        return Err(Error::IoError(e));
    }
    let max_disk_files = sync::Arc::new(AtomicUsize::new(max_disk_files));
    let sender = Sender::new(
        name,
        &root,
        max_disk_bytes,
        q.clone(),
        sync::Arc::clone(&max_disk_files),
    )?;
    let receiver = Receiver::new(&root, q, sync::Arc::clone(&max_disk_files))?;
    Ok((sender, receiver))
}

#[cfg(test)]
mod test {
    extern crate quickcheck;
    extern crate tempdir;

    use self::quickcheck::{QuickCheck, TestResult};
    use super::channel_with_explicit_capacity;
    use std::{mem, thread};

    #[test]
    fn ingress_shedding() {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity::<u64>(
                "round_trip_order_preserved", // name
                dir.path(),                   // data_dir
                8,                            // max_memory_bytes
                32,                           // max_disk_bytes
                2,                            // max_disk_files
            ) {
                let total_elems = 5 * 131082;
                // Magic constant, depends on compression level and what
                // not. May need to do a looser assertion.
                let expected_shed_sends = 383981;
                let mut shed_sends = 0;
                let mut sent_values = Vec::new();
                for i in 0..total_elems {
                    loop {
                        match snd.send(i) {
                            Ok(()) => {
                                sent_values.push(i);
                                break;
                            }
                            Err((r, err)) => {
                                assert_eq!(r, i);
                                match err {
                                    super::Error::Full => {
                                        shed_sends += 1;
                                        break;
                                    }
                                    _ => {
                                        continue;
                                    }
                                }
                            }
                        }
                    }
                }
                assert_eq!(shed_sends, expected_shed_sends);

                let mut received_elements = 0;
                // clear space for one more element
                let mut attempts = 0;
                loop {
                    match rcv.iter().next() {
                        None => {
                            attempts += 1;
                            assert!(attempts < 10_000);
                        }
                        Some(res) => {
                            received_elements += 1;
                            assert_eq!(res, 0);
                            break;
                        }
                    }
                }
                // flush any disk writes
                loop {
                    if snd.flush().is_ok() {
                        break;
                    }
                }
                // pull the rest of the elements
                let mut attempts = 0;
                for i in &sent_values[1..] {
                    loop {
                        match rcv.iter().next() {
                            None => {
                                attempts += 1;
                                assert!(attempts < 10_000);
                            }
                            Some(res) => {
                                received_elements += 1;
                                assert_eq!(*i, res);
                                break;
                            }
                        }
                    }
                }
                assert_eq!(received_elements, sent_values.len());
            }
        }
    }

    fn round_trip_exp(
        in_memory_limit: usize,
        max_bytes: usize,
        max_disk_files: usize,
        total_elems: usize,
    ) -> bool {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity(
                "round_trip_order_preserved",
                dir.path(),
                in_memory_limit,
                max_bytes,
                max_disk_files,
            ) {
                for i in 0..total_elems {
                    loop {
                        if snd.send(i).is_ok() {
                            break;
                        }
                    }
                }
                // clear space for one more element
                let mut attempts = 0;
                loop {
                    match rcv.iter().next() {
                        None => {
                            attempts += 1;
                            assert!(attempts < 10_000);
                        }
                        Some(res) => {
                            assert_eq!(res, 0);
                            break;
                        }
                    }
                }
                // flush any disk writes
                loop {
                    if snd.flush().is_ok() {
                        break;
                    }
                }
                // pull the rest of the elements
                for i in 1..total_elems {
                    let mut attempts = 0;
                    loop {
                        match rcv.iter().next() {
                            None => {
                                attempts += 1;
                                assert!(attempts < 10_000);
                            }
                            Some(res) => {
                                assert_eq!(res, i);
                                break;
                            }
                        }
                    }
                }
            }
        }
        true
    }

    #[test]
    fn round_trip() {
        fn inner(in_memory_limit: usize, max_bytes: usize, total_elems: usize) -> TestResult {
            let sz = mem::size_of::<u64>();
            if (in_memory_limit / sz) == 0 || (max_bytes / sz) == 0 || total_elems == 0 {
                return TestResult::discard();
            }
            let max_disk_files = usize::max_value();
            TestResult::from_bool(round_trip_exp(
                in_memory_limit,
                max_bytes,
                max_disk_files,
                total_elems,
            ))
        }
        QuickCheck::new().quickcheck(inner as fn(usize, usize, usize) -> TestResult);
    }

    fn multi_thread_concurrent_snd_and_rcv_round_trip_exp(
        total_senders: usize,
        in_memory_bytes: usize,
        disk_bytes: usize,
        max_disk_files: usize,
        vals: Vec<u64>,
    ) -> bool {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((snd, mut rcv)) = channel_with_explicit_capacity(
                "tst",
                dir.path(),
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
            ) {
                let chunk_size = vals.len() / total_senders;

                let mut snd_jh = Vec::new();
                let snd_vals = vals.clone();
                for chunk in snd_vals.chunks(chunk_size) {
                    let mut thr_snd = snd.clone();
                    let chunk = chunk.to_vec();
                    snd_jh.push(thread::spawn(move || {
                        let mut queued = Vec::new();
                        for mut ev in chunk {
                            loop {
                                match thr_snd.send(ev) {
                                    Err(res) => {
                                        ev = res.0;
                                    }
                                    Ok(()) => {
                                        break;
                                    }
                                }
                            }
                            queued.push(ev);
                        }
                        let mut attempts = 0;
                        loop {
                            if thr_snd.flush().is_ok() {
                                break;
                            }
                            thread::sleep(::std::time::Duration::from_millis(100));
                            attempts += 1;
                            assert!(attempts < 10);
                        }
                        queued
                    }))
                }

                let expected_total_vals = vals.len();
                let rcv_jh = thread::spawn(move || {
                    let mut collected = Vec::new();
                    let mut rcv_iter = rcv.iter();
                    while collected.len() < expected_total_vals {
                        let mut attempts = 0;
                        loop {
                            match rcv_iter.next() {
                                None => {
                                    attempts += 1;
                                    assert!(attempts < 10_000);
                                }
                                Some(res) => {
                                    collected.push(res);
                                    break;
                                }
                            }
                        }
                    }
                    collected
                });

                let mut snd_vals: Vec<u64> = Vec::new();
                for jh in snd_jh {
                    snd_vals.append(&mut jh.join().expect("snd join failed"));
                }
                let mut rcv_vals = rcv_jh.join().expect("rcv join failed");

                rcv_vals.sort();
                snd_vals.sort();

                assert_eq!(rcv_vals, snd_vals);
            }
        }
        true
    }

    #[test]
    fn explicit_multi_thread_concurrent_snd_and_rcv_round_trip() {
        let total_senders = 10;
        let in_memory_bytes = 50;
        let disk_bytes = 10;
        let max_disk_files = 100;
        let vals = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];

        let mut loops = 0;
        loop {
            assert!(multi_thread_concurrent_snd_and_rcv_round_trip_exp(
                total_senders,
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
                vals.clone(),
            ));
            loops += 1;
            if loops > 2_500 {
                break;
            }
            thread::sleep(::std::time::Duration::from_millis(1));
        }
    }

    #[test]
    fn multi_thread_concurrent_snd_and_rcv_round_trip() {
        fn inner(
            total_senders: usize,
            in_memory_bytes: usize,
            disk_bytes: usize,
            max_disk_files: usize,
            vals: Vec<u64>,
        ) -> TestResult {
            let sz = mem::size_of::<u64>();
            if total_senders == 0 || total_senders > 10 || vals.len() == 0
                || (vals.len() < total_senders) || (in_memory_bytes / sz) == 0
                || (disk_bytes / sz) == 0
            {
                return TestResult::discard();
            }
            TestResult::from_bool(multi_thread_concurrent_snd_and_rcv_round_trip_exp(
                total_senders,
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
                vals,
            ))
        }
        QuickCheck::new()
            .quickcheck(inner as fn(usize, usize, usize, usize, Vec<u64>) -> TestResult);
    }

    fn single_sender_single_rcv_round_trip_exp(
        in_memory_bytes: usize,
        disk_bytes: usize,
        max_disk_files: usize,
        total_vals: usize,
    ) -> bool {
        if let Ok(dir) = tempdir::TempDir::new("hopper") {
            if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity(
                "tst",
                dir.path(),
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
            ) {
                let builder = thread::Builder::new();
                if let Ok(snd_jh) = builder.spawn(move || {
                    for i in 0..total_vals {
                        loop {
                            if snd.send(i).is_ok() {
                                break;
                            }
                        }
                    }
                    let mut attempts = 0;
                    loop {
                        if snd.flush().is_ok() {
                            break;
                        }
                        thread::sleep(::std::time::Duration::from_millis(100));
                        attempts += 1;
                        assert!(attempts < 10);
                    }
                }) {
                    let builder = thread::Builder::new();
                    if let Ok(rcv_jh) = builder.spawn(move || {
                        let mut rcv_iter = rcv.iter();
                        let mut cur = 0;
                        while cur != total_vals {
                            let mut attempts = 0;
                            loop {
                                if let Some(rcvd) = rcv_iter.next() {
                                    debug_assert_eq!(
                                        cur, rcvd,
                                        "FAILED TO GET ALL IN ORDER: {:?}",
                                        rcvd,
                                    );
                                    cur += 1;
                                    break;
                                } else {
                                    attempts += 1;
                                    assert!(attempts < 10_000);
                                }
                            }
                        }
                    }) {
                        snd_jh.join().expect("snd join failed");
                        rcv_jh.join().expect("rcv join failed");
                    }
                }
            }
        }
        true
    }

    #[test]
    fn explicit_single_sender_single_rcv_round_trip() {
        let mut loops = 0;
        loop {
            assert!(single_sender_single_rcv_round_trip_exp(8, 8, 5, 10));
            loops += 1;
            if loops > 2_500 {
                break;
            }
            thread::sleep(::std::time::Duration::from_millis(1));
        }
    }

    #[test]
    fn single_sender_single_rcv_round_trip() {
        // Similar to the multi sender test except now with a single sender we
        // can guarantee order.
        fn inner(
            in_memory_bytes: usize,
            disk_bytes: usize,
            max_disk_files: usize,
            total_vals: usize,
        ) -> TestResult {
            let sz = mem::size_of::<u64>();
            if total_vals == 0 || (in_memory_bytes / sz) == 0 || (disk_bytes / sz) == 0 {
                return TestResult::discard();
            }
            TestResult::from_bool(single_sender_single_rcv_round_trip_exp(
                in_memory_bytes,
                disk_bytes,
                max_disk_files,
                total_vals,
            ))
        }
        QuickCheck::new().quickcheck(inner as fn(usize, usize, usize, usize) -> TestResult);
    }

}