use crate::context::{Context, Delimiters};
use crate::io::Lifecycle;
/// A reduction stage that receives one key together with all values grouped
/// under it.
///
/// Every method has a default body, so an implementor only overrides the
/// hooks it needs.
pub trait Reducer {
    /// Hook invoked by `ReducerLifecycle::on_start`; does nothing by default.
    fn setup(&mut self, _ctx: &mut Context) {}

    /// Handles a single `key` and the `values` collected for it.
    ///
    /// The default implementation is an identity reduction: each value is
    /// written back to `ctx` paired with the unchanged key, in input order.
    fn reduce(&mut self, key: &[u8], values: &[&[u8]], ctx: &mut Context) {
        values.iter().for_each(|value| {
            ctx.write(key, value);
        });
    }

    /// Hook invoked by `ReducerLifecycle::on_end` after the last `reduce`
    /// call; does nothing by default.
    fn cleanup(&mut self, _ctx: &mut Context) {}
}
/// Lets any closure with the `reduce` signature be used directly as a
/// `Reducer`.
///
/// Only `reduce` is overridden; `setup` and `cleanup` keep the trait's
/// default no-op bodies.
impl<F: FnMut(&[u8], &[&[u8]], &mut Context)> Reducer for F {
    /// Forwards the key, values and context straight to the wrapped closure.
    #[inline]
    fn reduce(&mut self, key: &[u8], values: &[&[u8]], ctx: &mut Context) {
        (*self)(key, values, ctx);
    }
}
/// Adapter that drives a `Reducer` from a stream of raw entries, buffering
/// the values of the current key until a different key shows up.
pub(crate) struct ReducerLifecycle<R: Reducer> {
    /// `false` until the first entry has been seen; flipped to `true` once.
    on: bool,
    /// Key of the group currently being accumulated.
    key: Vec<u8>,
    /// Owned copies of the values gathered so far for `key`.
    values: Vec<Vec<u8>>,
    /// The user-supplied reducer that receives each completed group.
    reducer: R,
}
impl<R> ReducerLifecycle<R>
where
R: Reducer,
{
pub(crate) fn new(reducer: R) -> Self {
Self {
reducer,
on: false,
key: Vec::new(),
values: Vec::new(),
}
}
}
impl<R> ReducerLifecycle<R>
where
    R: Reducer,
{
    /// Hands the currently buffered key and its values to the reducer.
    ///
    /// The buffers are left untouched; callers decide whether to reset them.
    fn flush_group(&mut self, ctx: &mut Context) {
        // The reducer takes borrowed slices, so build a view over the owned buffers.
        let values: Vec<&[u8]> = self.values.iter().map(Vec::as_slice).collect();
        self.reducer.reduce(&self.key, &values, ctx);
    }
}

impl<R> Lifecycle for ReducerLifecycle<R>
where
    R: Reducer,
{
    /// Runs the reducer's `setup` hook.
    #[inline]
    fn on_start(&mut self, ctx: &mut Context) {
        self.reducer.setup(ctx);
    }

    /// Consumes one raw entry, splitting it into key and value and grouping
    /// it with the preceding entries when the key is unchanged.
    ///
    /// The entry is split at the position reported by `twoway::find_bytes`
    /// for the input delimiter; an entry without the delimiter is treated as
    /// a key with an empty value. When the key differs from the buffered one,
    /// the buffered group is passed to the reducer before the new group is
    /// started. Only *consecutive* equal keys are grouped.
    ///
    /// # Panics
    ///
    /// Panics if the context holds no `Delimiters` value.
    fn on_entry(&mut self, input: &[u8], ctx: &mut Context) {
        let (key, value) = {
            let delim = ctx
                .get::<Delimiters>()
                .expect("Context must hold Delimiters before reducer entries are processed");
            let sep = delim.input();
            match twoway::find_bytes(input, sep) {
                // Guard kept from the original: it rejects a match reported at
                // the very end of the input (presumably only possible with an
                // empty delimiter — TODO confirm).
                Some(n) if n < input.len() => (&input[..n], &input[n + sep.len()..]),
                _ => (input, &b""[..]),
            }
        };

        let same_group = self.on && self.key == key;
        if !same_group {
            if self.on {
                // Key changed: emit the finished group before starting the next.
                self.flush_group(ctx);
                self.values.clear();
            }
            self.on = true;
            self.key.clear();
            self.key.extend_from_slice(key);
        }
        self.values.push(value.to_vec());
    }

    /// Emits the final buffered group, if any entry was ever seen, and then
    /// runs the reducer's `cleanup` hook.
    ///
    /// When no entry arrived at all, `reduce` is skipped so the reducer is
    /// never handed a phantom group with an empty key and no values.
    #[inline]
    fn on_end(&mut self, ctx: &mut Context) {
        if self.on {
            self.flush_group(ctx);
        }
        self.reducer.cleanup(ctx);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::Contextual;
    use crate::io::Lifecycle;

    /// Key and owned values captured from a single `reduce` call.
    struct TestPair(Vec<u8>, Vec<Vec<u8>>);

    impl Contextual for TestPair {}

    /// Reducer that records every group it receives by inserting a
    /// `TestPair` into the context. The assertions below rely on a later
    /// insert being the one returned by `ctx.get::<TestPair>()`.
    struct TestReducer;

    impl Reducer for TestReducer {
        fn reduce(&mut self, key: &[u8], values: &[&[u8]], ctx: &mut Context) {
            let stored = values.iter().map(|value| value.to_vec()).collect();
            ctx.insert(TestPair(key.to_vec(), stored));
        }
    }

    /// Two groups of three values: the first group is emitted when the key
    /// changes, the second only once `on_end` runs.
    #[test]
    fn test_reducer_lifecycle() {
        let mut ctx = Context::new();
        let mut reducer = ReducerLifecycle::new(TestReducer);

        reducer.on_start(&mut ctx);

        {
            let entries = [
                &b"first\tone"[..],
                b"first\ttwo",
                b"first\tthree",
                b"second\tone",
                b"second\ttwo",
                b"second\tthree",
            ];
            for entry in &entries {
                reducer.on_entry(entry, &mut ctx);
            }

            // Only the first group has been flushed at this point.
            let pair = ctx
                .get::<TestPair>()
                .expect("first group should have been reduced");
            assert_eq!(pair.0, b"first");
            assert_eq!(pair.1, vec![&b"one"[..], b"two", b"three"]);
        }

        reducer.on_end(&mut ctx);

        let pair = ctx
            .get::<TestPair>()
            .expect("second group should have been reduced");
        assert_eq!(pair.0, b"second");
        assert_eq!(pair.1, vec![&b"one"[..], b"two", b"three"]);
    }

    /// An entry without a delimiter and an entry with a trailing delimiter
    /// both produce an empty value under the same key.
    #[test]
    fn test_reducer_empty_values() {
        let mut ctx = Context::new();
        let mut reducer = ReducerLifecycle::new(TestReducer);

        reducer.on_start(&mut ctx);
        reducer.on_entry(b"key", &mut ctx);
        reducer.on_entry(b"key\t", &mut ctx);
        reducer.on_end(&mut ctx);

        let pair = ctx
            .get::<TestPair>()
            .expect("the single group should have been reduced");
        assert_eq!(pair.0, b"key");
        assert_eq!(pair.1, vec![b"", b""]);
    }
}