1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
use hyper::{Body, Error};
use std::collections::VecDeque;
use futures::stream::StreamExt;
/// Wraps `body` in a [`Lines`] reader.
///
/// The returned reader starts with an empty byte buffer and with its
/// end-of-body flag cleared; nothing is read from `body` here.
pub fn lines(body: Body) -> Lines {
    let buffer = VecDeque::new();
    Lines {
        body,
        buffer,
        done: false,
    }
}
/// Reader that splits the bytes of a `Body` into `\n`-separated lines.
pub struct Lines {
    /// Source of byte chunks.
    body: Body,
    /// Bytes received from `body` that have not yet been handed out as a line.
    buffer: VecDeque<u8>,
    /// Set once `body` has reported that it has no more chunks.
    done: bool,
}
impl Lines
{
pub async fn next(&mut self) -> Option<Result<Vec<u8>, Error>>
{
while !self.done || self.buffer.len()>0
{
let nlpos = self.buffer.iter().enumerate().find(|&(_,&a)| a==b'\n')
.map(|(i, _)| i);
if let Some(nlpos) = nlpos
{
let next_line = self.buffer.drain(0 ..= nlpos).take(nlpos).collect();
return Some(Ok(next_line));
}
else if self.done
{
let next_line = self.buffer.drain(..).collect();
return Some(Ok(next_line));
}
if let Some(chunk) = self.body.next().await
{
if let Err(e) = chunk
{
return Some(Err(e));
}
let chunk = chunk.unwrap();
self.buffer.extend( chunk.iter() );
}
else
{
self.done = true;
}
}
None
}
}