1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//! IO binding module for the `efflux` crate.
//!
//! Provides lifecycles for Hadoop Streaming IO, to allow the rest
//! of this crate to be a little more ignorant of how inputs flow.
use bytelines::*;
use std::io::{self, BufReader};

use crate::context::Context;

/// Lifecycle trait to allow hooking into IO streams.
///
/// This will be implemented by all stages of MapReduce (e.g. to
/// appropriately handle buffering for the reduction stage). All
/// trait methods default to noop, as they're all optional.
///
/// When driven by `run_lifecycle`, the hooks fire in a fixed order:
/// `on_start` once, `on_entry` once per input read from stdin, and
/// finally `on_end` once. The same `Context` is handed to every hook.
pub trait Lifecycle {
    /// Startup hook for the IO stream.
    ///
    /// Fired once, before any input has been read. The default
    /// implementation does nothing.
    fn on_start(&mut self, _ctx: &mut Context) {}

    /// Entry hook for the IO stream to handle input values.
    ///
    /// Fired once for each input read from the stream; `_input` is the
    /// borrowed raw bytes of that input. The default implementation
    /// does nothing.
    fn on_entry(&mut self, _input: &[u8], _ctx: &mut Context) {}

    /// Finalization hook for the IO stream.
    ///
    /// Fired once, after reading from the stream has stopped. The
    /// default implementation does nothing.
    fn on_end(&mut self, _ctx: &mut Context) {}
}

/// Executes an IO `Lifecycle` against `io::stdin`.
///
/// The lifecycle is driven in three phases against a freshly created
/// `Context`: `on_start` fires once before any input is read, `on_entry`
/// fires once for every input read from stdin, and `on_end` fires once
/// after reading has stopped.
///
/// Reading stops either when stdin is exhausted or when a read fails. A
/// failed read is reported on stderr rather than being silently treated
/// as end of input; `on_end` is still fired afterwards so that work done
/// for the inputs already consumed can be finalized.
pub fn run_lifecycle<L>(mut lifecycle: L)
where
    L: Lifecycle,
{
    // lock stdin once up front, rather than on every read
    let stdin = io::stdin();
    let stdin_lock = stdin.lock();

    // create a job context
    let mut ctx = Context::new();

    // fire the startup hooks
    lifecycle.on_start(&mut ctx);

    // create a line reader used to avoid vec allocations
    //
    // NOTE(review): `StdinLock` already implements `BufRead`, so this extra
    // `BufReader` layer looks redundant; it is kept here to leave the
    // module's imports and buffering behaviour untouched.
    let mut lines = BufReader::new(stdin_lock).byte_lines();

    // read all inputs from stdin, and fire the entry hooks
    loop {
        match lines.next() {
            Some(Ok(input)) => lifecycle.on_entry(input, &mut ctx),
            Some(Err(err)) => {
                // surface the failure instead of masking it as a clean EOF
                eprintln!("efflux: failed to read input from stdin: {}", err);
                break;
            }
            None => break,
        }
    }

    // fire the finalization hooks
    lifecycle.on_end(&mut ctx);
}