csvroll 0.1.0

A low-level, non-FSM, zero-copy CSV parser.
use super::parser::{Parser, Index,};
use std::cmp::Ordering;
use std::ops::Range;
use std::error::Error;
use std::io;

pub struct Group<R> {
    parser: Parser<R>,
    key_idx: Vec<usize>,
    first_rec: Range<usize>,
    rec: Range<usize>,
    group: Range<usize>,
    is_buf_full: bool,
}

impl<R: io::Read> Group<R> {
    pub fn init(mut parser: Parser<R>, key_idx: Vec<usize>) -> Result<Self, Box<Error>> {
        let is_buf_full = parser.parse()?;
        let first_rec: Range<usize>;
        let rec: Range<usize>;
        let group: Range<usize>;

        {
            let (_, struct_idx) = parser.output();

            match struct_idx.records().first() {
                Some(&re) => {
                    first_rec = 0..re;
                    rec = first_rec.clone();
                    group = 0..1;
                }
                None => {
                    first_rec = 0..0;
                    rec = first_rec.clone();
                    group = 0..0;
                }
            }
        }

        Ok(Self {
            parser ,
            key_idx ,
            first_rec ,
            rec ,
            group ,
            is_buf_full ,
        })
    }
    
    #[inline]
    pub fn next_group(&mut self) -> Result<Range<usize>, Box<Error>> {
        loop {
            {
                let (buf, struct_idx) = self.parser.output();
                let fields = struct_idx.fields();
                for &re in &struct_idx.records()[self.group.end..] {
                    let rec = self.rec.end..re;
                    match cmp_records(
                        buf,
                        &fields[self.rec.end..re],
                        &fields[self.first_rec.clone()],
                        &self.key_idx)? {

                        Ordering::Less => {
                            return Err("The records are not sorted in ascending order".into());
                        }
                        Ordering::Greater => {
                            let g = self.group.clone();
                            self.first_rec = rec.clone();
                            self.rec = rec.clone();
                            self.group = self.group.end..(self.group.end + 1);
                            return Ok(g);
                        }
                        Ordering::Equal => {
                            self.rec = rec.clone();
                            self.group.end += 1;
                        }
                    }
                }
            }

            if self.is_buf_full {
                let field_offset = self.first_rec.start;
                let rec_offset = self.group.start;
                self.first_rec = (self.first_rec.start - field_offset)..(self.first_rec.end - field_offset);
                self.rec = (self.rec.start - field_offset)..(self.rec.end - field_offset);
                self.group = (self.group.start - rec_offset)..(self.group.end - rec_offset);
                self.parser.consume(rec_offset);
                self.is_buf_full = self.parser.parse()?;
            } else {
                let g = self.group.clone();
                self.group = self.group.end..self.group.end;

                return Ok(g);
            }
        }
    }

    #[inline]
    pub fn buf_index(&self) -> (&[u8], &Index) {
        self.parser.output()
    }
}

#[inline]
fn cmp_records(
    buf: &[u8],
    rec_0: &[Range<usize>],
    rec_1: &[Range<usize>],
    key_idx: &[usize],
) -> Result<Ordering, Box<Error>> {

    for &k in key_idx {
        let f0 = match rec_0.get(k) {
            Some(f) => f.clone(),
            None => return Err("The first record is shorter than key fields".into()),
        };
        let f1 = match rec_1.get(k) {
            Some(f) => f.clone(),
            None => return Err("The second record is shorter than key fields".into()),
        };
        
        match buf[f0].cmp(&buf[f1]) {
            Ordering::Less => return Ok(Ordering::Less),
            Ordering::Greater => return Ok(Ordering::Greater),
            Ordering::Equal => continue,
        }
    }

    Ok(Ordering::Equal)
}

    

#[cfg(test)]
mod tests {
    use super::*;
    use rollbuf::RollBuf;
    use ::index_builder::IndexBuilder;

    #[test]
    fn test_group() {
        struct TestCase {
            input: String,
            buf_len: usize,
            key_idx: Vec<usize>,
            want: Vec<(String, Index, Range<usize>)>,
        }

        let test_cases = vec![
            TestCase {
                input: "a,0\na,1\nb,0\nc,0".to_owned(),
                buf_len: 24,
                key_idx: vec![0],
                want: vec![
                   ("a,0\na,1\nb,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11, 12..13, 14..15], vec![2, 4, 6, 8]),
                    0..2,),
                   ("a,0\na,1\nb,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11, 12..13, 14..15], vec![2, 4, 6, 8]),
                    2..3,),
                   ("a,0\na,1\nb,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11, 12..13, 14..15], vec![2, 4, 6, 8]),
                    3..4,),
                   ("a,0\na,1\nb,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11, 12..13, 14..15], vec![2, 4, 6, 8]),
                    4..4,),
                ],
            },
            TestCase {
                input: "a,0\na,1\nb,0\nc,0".to_owned(),
                buf_len: 11,
                key_idx: vec![0],
                want: vec![
                   ("a,0\na,1\nb,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11, 12..13, 14..15], vec![2, 4, 6, 8]),
                    0..2,),
                   ("a,0\na,1\nb,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11, 12..13, 14..15], vec![2, 4, 6, 8]),
                    2..3,),
                   ("a,0\na,1\nb,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11, 12..13, 14..15], vec![2, 4, 6, 8]),
                    3..4,),
                   ("a,0\na,1\nb,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11, 12..13, 14..15], vec![2, 4, 6, 8]),
                    4..4,),
                ],
            },
            TestCase {
                input: "a,0\na,1\nb,0\nc,0".to_owned(),
                buf_len: 12,
                key_idx: vec![0],
                want: vec![
                   ("a,0\na,1\nb,0\n".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11], vec![2, 4, 6]),
                    0..2,),
                   ("b,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7], vec![2, 4]),
                    0..1,),
                   ("b,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7], vec![2, 4]),
                    1..2,),
                   ("b,0\nc,0".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7], vec![2, 4]),
                    2..2,),
                ],
            },
            TestCase {
                input: "a,0\na,1\nb,0\nb,1".to_owned(),
                buf_len: 12,
                key_idx: vec![0],
                want: vec![
                   ("a,0\na,1\nb,0\n".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7, 8..9, 10..11], vec![2, 4, 6]),
                    0..2,),
                   ("b,0\nb,1".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7], vec![2, 4]),
                    0..2,),
                   ("b,0\nb,1".to_owned(),
                    Index::from_parts(vec![0..1, 2..3, 4..5, 6..7], vec![2, 4]),
                    2..2,),
                ],
            },
        ];

        for (i, t) in test_cases.into_iter().enumerate() {
            println!("test case: {}", i);
            let TestCase { input, buf_len, key_idx, want } = t;
            let buf = RollBuf::with_capacity(buf_len, input.as_bytes());
            let idx_builder = IndexBuilder::new(b',', b'\n');
            let parser = Parser::from_parts(buf, idx_builder);
            let mut group = Group::init(parser, key_idx).unwrap();

            for w in want {
                let recs = group.next_group().unwrap();
                let (buf, idx) = group.buf_index();
                assert_eq!((buf, idx, recs), (w.0.as_bytes(), &w.1, w.2));
            }
        }
    }
}