use bytes::{BufMut, BytesMut};
use futures::{Poll, Stream};
use std::{
cmp,
io::{self, BufRead},
usize,
};
use tokio::{
codec::{Decoder, Encoder},
io::AsyncRead,
};
#[derive(Debug)]
pub struct LossyLines<A> {
io: A,
buffer: Vec<u8>,
}
pub fn lossy_lines<A>(a: A) -> LossyLines<A>
where
A: AsyncRead + BufRead,
{
LossyLines {
io: a,
buffer: Vec::new(),
}
}
impl<A> Stream for LossyLines<A>
where
A: AsyncRead + BufRead,
{
type Item = String;
type Error = ::std::io::Error;
fn poll(&mut self) -> Poll<Option<String>, ::std::io::Error> {
let n = match self.io.read_until(b'\n', &mut self.buffer) {
Ok(t) => t,
Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {
return Ok(::futures::Async::NotReady);
}
Err(e) => return Err(e),
};
if n == 0 && self.buffer.is_empty() {
Ok(None.into())
} else {
while self.buffer.ends_with(&[b'\r']) || self.buffer.ends_with(&[b'\n']) {
self.buffer.pop();
}
let line = String::from_utf8_lossy(&self.buffer).into();
self.buffer.clear();
Ok(Some(line).into())
}
}
}
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct LossyLinesCodec {
next_index: usize,
max_length: usize,
is_discarding: bool,
}
impl LossyLinesCodec {
pub fn new() -> LossyLinesCodec {
LossyLinesCodec {
next_index: 0,
max_length: usize::MAX,
is_discarding: false,
}
}
fn discard(&mut self, newline_offset: Option<usize>, read_to: usize, buf: &mut BytesMut) {
let discard_to = if let Some(offset) = newline_offset {
self.is_discarding = false;
offset + self.next_index + 1
} else {
read_to
};
buf.advance(discard_to);
self.next_index = 0;
}
}
fn without_carriage_return(s: &[u8]) -> &[u8] {
if let Some(&b'\r') = s.last() {
&s[..s.len() - 1]
} else {
s
}
}
impl Decoder for LossyLinesCodec {
type Item = String;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
loop {
let read_to = cmp::min(self.max_length.saturating_add(1), buf.len());
let newline_offset = buf[self.next_index..read_to]
.iter()
.position(|b| *b == b'\n');
if self.is_discarding {
self.discard(newline_offset, read_to, buf);
} else {
return if let Some(offset) = newline_offset {
let newline_index = offset + self.next_index;
self.next_index = 0;
let line = buf.split_to(newline_index + 1);
let line = &line[..line.len() - 1];
let line = without_carriage_return(line);
let line = String::from_utf8_lossy(line);
Ok(Some(line.to_string()))
} else if buf.len() > self.max_length {
self.is_discarding = true;
Err(io::Error::new(
io::ErrorKind::Other,
"line length limit exceeded",
))
} else {
self.next_index = read_to;
Ok(None)
};
}
}
}
fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
Ok(match self.decode(buf)? {
Some(frame) => Some(frame),
None => {
if buf.is_empty() || buf == &b"\r"[..] {
None
} else {
let line = buf.take();
let line = without_carriage_return(&line);
let line = String::from_utf8_lossy(line);
self.next_index = 0;
Some(line.to_string())
}
}
})
}
}
impl Encoder for LossyLinesCodec {
type Item = String;
type Error = io::Error;
fn encode(&mut self, line: String, buf: &mut BytesMut) -> Result<(), io::Error> {
buf.reserve(line.len() + 1);
buf.put(line);
buf.put_u8(b'\n');
Ok(())
}
}