use bytes::BytesMut;
use futures::{Async, Stream};
use header::XStreamError;
use hyper::Chunk;
use hyper::header::{Header, Raw};
use response::{Error, ErrorKind};
use serde::Deserialize;
use serde_json;
use std::cmp;
use std::io::{self, Read};
use std::marker::PhantomData;
use tokio_io::AsyncRead;
use tokio_io::codec::Decoder;
/// A decoder that turns newline-delimited JSON into values of type `T`.
///
/// No `T` is stored; the type parameter only selects what each line is
/// deserialized into.
pub struct JsonLineDecoder<T> {
    /// When `true`, a line that fails to parse as JSON is additionally checked
    /// for an `XStreamError` header before the parse error is reported.
    parse_stream_error: bool,

    /// Zero-sized marker tying the decoder to its output type.
    ty: PhantomData<T>,
}

impl<T> JsonLineDecoder<T> {
    /// Creates a decoder.
    ///
    /// `parse_stream_error` controls whether lines that are not valid JSON are
    /// inspected for an `XStreamError` header.
    #[inline]
    pub fn new(parse_stream_error: bool) -> Self {
        let ty = PhantomData;

        Self {
            parse_stream_error: parse_stream_error,
            ty: ty,
        }
    }
}
impl<T> Decoder for JsonLineDecoder<T>
where
for<'de> T: Deserialize<'de>,
{
type Item = T;
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let nl_index = src.iter().position(|b| *b == b'\n');
if let Some(pos) = nl_index {
let slice = src.split_to(pos + 1);
let slice = &slice[..slice.len() - 1];
match serde_json::from_slice(slice) {
Ok(json) => Ok(json),
Err(e) => {
if self.parse_stream_error {
match slice.iter().position(|&x| x == b':') {
Some(colon)
if &slice[..colon] == XStreamError::header_name().as_bytes() => {
let raw = Raw::from(&slice[colon + 2..]);
match XStreamError::parse_header(&raw) {
Ok(stream_error) => Err(
ErrorKind::StreamError(stream_error.error)
.into(),
),
Err(_) => Err(e.into()),
}
}
_ => Err(e.into()),
}
} else {
Err(e.into())
}
}
}
} else {
Ok(None)
}
}
}
/// A decoder that splits a byte stream into lines of text.
pub struct LineDecoder;

impl Decoder for LineDecoder {
    type Item = String;
    type Error = Error;

    /// Removes the first `\n`-terminated line from `src` and returns it as a
    /// `String` without the trailing `\n`.
    ///
    /// Returns `Ok(None)` and leaves `src` untouched when it contains no `\n`.
    /// Only the `\n` is stripped (a preceding `\r` is kept), and the bytes are
    /// converted with `String::from_utf8_lossy`.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match src.iter().position(|&byte| byte == b'\n') {
            None => Ok(None),
            Some(newline) => {
                // Take the line with its terminator out of the buffer; the
                // text is everything before the newline.
                let line = src.split_to(newline + 1);
                let text = String::from_utf8_lossy(&line[..newline]).into_owned();

                Ok(Some(text))
            }
        }
    }
}
/// Buffering state of a `StreamReader`.
enum ReadState {
    /// A chunk is buffered. The `usize` is the offset within the chunk of the
    /// next byte to hand out.
    Ready(Chunk, usize),
    /// No chunk is buffered; the underlying stream is polled on the next read.
    NotReady,
}
/// Adapts a stream of `Chunk`s into a byte reader.
pub struct StreamReader<S> {
    /// The wrapped stream of chunks.
    stream: S,

    /// The chunk currently being read from, if any.
    state: ReadState,
}

impl<S> StreamReader<S>
where
    S: Stream<Item = Chunk, Error = Error>,
{
    /// Wraps `stream` in a reader. The reader starts with no buffered chunk.
    #[inline]
    pub fn new(stream: S) -> Self {
        let state = ReadState::NotReady;

        Self { stream, state }
    }
}
impl<S> Read for StreamReader<S>
where
    S: Stream<Item = Chunk, Error = Error>,
{
    /// Copies bytes from the current chunk into `buf`, polling the stream for
    /// a new chunk when none is buffered.
    ///
    /// At most one chunk is drawn from per call. `Ok(0)` is returned only when
    /// the stream has ended or `buf` is empty. Zero-length chunks are skipped,
    /// because reporting them as a zero-byte read would look like end-of-file
    /// to the caller while the stream is still live.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::WouldBlock` when the stream is not ready.
    /// * `io::ErrorKind::Other`, carrying the stream error's description, when
    ///   the stream fails.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            let ret = match self.state {
                ReadState::Ready(ref mut chunk, ref mut pos) => {
                    let chunk_start = *pos;
                    let len = cmp::min(buf.len(), chunk.len() - chunk_start);
                    let chunk_end = chunk_start + len;

                    buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);
                    *pos += len;

                    if *pos != chunk.len() {
                        // Part of the chunk is still unread; keep it buffered.
                        return Ok(len);
                    }

                    len
                }
                ReadState::NotReady => match self.stream.poll() {
                    Ok(Async::Ready(Some(chunk))) => {
                        // Drop empty chunks and poll again: buffering one
                        // would make this call return `Ok(0)`, i.e. a
                        // premature end-of-file.
                        if chunk.len() > 0 {
                            self.state = ReadState::Ready(chunk, 0);
                        }
                        continue;
                    }
                    Ok(Async::Ready(None)) => return Ok(0),
                    Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
                    Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.description())),
                },
            };

            // The chunk was fully consumed. The state is reset here, outside
            // the match, because the match arm borrows `self.state`.
            self.state = ReadState::NotReady;

            return Ok(ret);
        }
    }
}
impl<S> AsyncRead for StreamReader<S>
where
S: Stream<Item = Chunk, Error = Error>,
{
}