efflux/
mapper.rs

//! Exposed structures based on the mapping stage.
//!
//! This module offers the `Mapper` trait, whose sane defaults make it easy
//! for a developer to create a mapping stage. Also offered is the
//! `MapperLifecycle` binding for use as an IO stage.
6use crate::context::{Context, Offset};
7use crate::io::Lifecycle;
8
9/// Trait to represent the mapping stage of MapReduce.
10///
11/// All trait methods have sane defaults to match the Hadoop MapReduce
12/// implementation, allowing the developer to pick and choose what they
13/// customize without having to write a large amount of boilerplate.
14pub trait Mapper {
15    /// Setup handler for the current `Mapper`.
16    fn setup(&mut self, _ctx: &mut Context) {}
17
18    /// Mapping handler for the current `Mapper`.
19    ///
20    /// The default implementation is to simply emit each key/value pair as they
21    /// are received, without any changes. As such, this is where most developers
22    /// will immediately begin to change things.
23    fn map(&mut self, key: usize, value: &[u8], ctx: &mut Context) {
24        ctx.write(key.to_string().as_bytes(), value);
25    }
26
27    /// Cleanup handler for the current `Mapper`.
28    fn cleanup(&mut self, _ctx: &mut Context) {}
29}
30
31/// Enables raw functions to act as `Mapper` types.
32impl<M> Mapper for M
33where
34    M: FnMut(usize, &[u8], &mut Context),
35{
36    /// Mapping handler by passing through the values to the inner closure.
37    #[inline]
38    fn map(&mut self, key: usize, value: &[u8], ctx: &mut Context) {
39        self(key, value, ctx)
40    }
41}
42
43/// Lifecycle structure to represent a mapping.
44pub(crate) struct MapperLifecycle<M>
45where
46    M: Mapper,
47{
48    mapper: M,
49}
50
51/// Basic creation for `MapperLifecycle`
52impl<M> MapperLifecycle<M>
53where
54    M: Mapper,
55{
56    /// Constructs a new `MapperLifecycle` instance.
57    pub(crate) fn new(mapper: M) -> Self {
58        Self { mapper }
59    }
60}
61
62/// `Lifecycle` implementation for the mapping stage.
63impl<M> Lifecycle for MapperLifecycle<M>
64where
65    M: Mapper,
66{
67    /// Creates all required state for the lifecycle.
68    #[inline]
69    fn on_start(&mut self, ctx: &mut Context) {
70        ctx.insert(Offset::new());
71        self.mapper.setup(ctx);
72    }
73
74    /// Passes each entry through to the mapper as a value, with the current
75    /// byte offset being provided as the key (this follows the implementation
76    /// provided in the Hadoop MapReduce Java interfaces, but it's unclear as
77    /// to whether this is the desired default behaviour here).
78    #[inline]
79    fn on_entry(&mut self, input: &[u8], ctx: &mut Context) {
80        let offset = {
81            // grabs the offset from the context, and shifts the offset
82            ctx.get_mut::<Offset>().unwrap().shift(input.len() + 2)
83        };
84
85        self.mapper.map(offset, input, ctx);
86    }
87
88    /// Finalizes the lifecycle by calling cleanup.
89    #[inline]
90    fn on_end(&mut self, ctx: &mut Context) {
91        self.mapper.cleanup(ctx);
92    }
93}
94
95#[cfg(test)]
96mod tests {
97    use super::*;
98    use crate::context::Contextual;
99    use crate::io::Lifecycle;
100
101    #[test]
102    fn test_mapper_lifecycle() {
103        let mut ctx = Context::new();
104        let mut mapper = MapperLifecycle::new(TestMapper);
105
106        mapper.on_start(&mut ctx);
107
108        {
109            let mut vet = |input: &[u8], expected: usize| {
110                mapper.on_entry(input, &mut ctx);
111
112                let pair = ctx.get::<TestPair>();
113
114                assert!(pair.is_some());
115
116                let pair = pair.unwrap();
117
118                assert_eq!(pair.0, expected);
119                assert_eq!(pair.1, input);
120            };
121
122            vet(b"first_input_line", 18);
123            vet(b"second_input_line", 37);
124            vet(b"third_input_line", 55);
125        }
126
127        mapper.on_end(&mut ctx);
128    }
129
130    struct TestPair(usize, Vec<u8>);
131
132    impl Contextual for TestPair {}
133
134    struct TestMapper;
135
136    impl Mapper for TestMapper {
137        fn map(&mut self, key: usize, val: &[u8], ctx: &mut Context) {
138            ctx.insert(TestPair(key, val.to_vec()));
139        }
140    }
141}