1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
//! Create an iterator of lines from a hyper::Body

use hyper::{Body, error::Error};
use std::collections::VecDeque;

use futures::stream::Stream;

pub fn lines(body: Body) -> Lines
{
  Lines
  {
    body: body.wait(),
    buffer: VecDeque::new(),
    done: false,
  }
}

pub struct Lines
{
  body: futures::stream::Wait<Body>,
  buffer: VecDeque<u8>,
  done: bool,
}

impl Iterator for Lines
{
  type Item = Result<Vec<u8>, Error>;

  fn next(&mut self) -> Option<Self::Item>
  {
    while !self.done || self.buffer.len()>0
    {
      // check if we already have a nl
      let nlpos = self.buffer.iter().enumerate().find(|&(_,&a)| a==b'\n')
        .map(|(i, _)| i);
      if let Some(nlpos) = nlpos
      {
        let next_line = self.buffer.drain(0 ..= nlpos).take(nlpos).collect();
        return Some(Ok(next_line));
      }
      else if self.done
      {
        // no new line, but we're at the end of the object
        let next_line = self.buffer.drain(..).collect();
        return Some(Ok(next_line));
      }

      // get more data
      if let Some(chunk) = self.body.next()
      {
        if let Err(e) = chunk
        {
          return Some(Err(e));
        }
        let chunk = chunk.unwrap();
        self.buffer.extend( chunk.iter() );
      }
      else
      {
        self.done = true;
      }
    }

    None
  }
}