use std::io::{self, Read, Write, Cursor, BufReader};
use std::path::Path;
use std::fs::File;
use std::time::Duration;
use std::mem::transmute;
use super::data_stream::{DataStream, StreamReader, kill_stream};
use ext::ReadExt;
use http::hyper::h1::HttpReader;
use http::hyper::buffer;
use http::hyper::h1::HttpReader::*;
use http::hyper::net::{HttpStream, NetworkStream};
pub type BodyReader<'a, 'b> =
self::HttpReader<&'a mut self::buffer::BufReader<&'b mut NetworkStream>>;
pub struct Data {
buffer: Vec<u8>,
is_done: bool,
stream: StreamReader,
position: usize,
capacity: usize,
}
impl Data {
pub fn open(mut self) -> DataStream {
let mut buffer = vec![];
let mut stream = EmptyReader(self.stream.get_ref().clone());
::std::mem::swap(&mut buffer, &mut self.buffer);
::std::mem::swap(&mut stream, &mut self.stream);
let mut cursor = Cursor::new(buffer);
cursor.set_position(self.position as u64);
let network = stream.get_ref().clone();
let buf = cursor.take((self.capacity - self.position) as u64);
let stream = buf.chain(BufReader::new(stream));
DataStream::new(stream, network)
}
pub(crate) fn from_hyp(mut h_body: BodyReader) -> Result<Data, &'static str> {
let mut stream = match h_body.get_ref().get_ref()
.downcast_ref::<HttpStream>() {
Some(s) => {
let owned_stream = s.clone();
let buf_len = h_body.get_ref().get_buf().len() as u64;
match h_body {
SizedReader(_, n) => SizedReader(owned_stream, n - buf_len),
EofReader(_) => EofReader(owned_stream),
EmptyReader(_) => EmptyReader(owned_stream),
ChunkedReader(_, n) =>
ChunkedReader(owned_stream, n.map(|k| k - buf_len)),
}
},
None => return Err("Stream is not an HTTP stream!"),
};
stream.get_mut().set_read_timeout(Some(Duration::from_secs(5))).unwrap();
let (vec, pos, cap) = h_body.get_mut().take_buf();
Ok(Data::new(vec, pos, cap, stream))
}
#[inline(always)]
pub fn peek(&self) -> &[u8] {
&self.buffer[self.position..self.capacity]
}
#[inline(always)]
pub fn peek_complete(&self) -> bool {
self.is_done
}
#[inline(always)]
pub fn stream_to<W: Write>(self, writer: &mut W) -> io::Result<u64> {
io::copy(&mut self.open(), writer)
}
#[inline(always)]
pub fn stream_to_file<P: AsRef<Path>>(self, path: P) -> io::Result<u64> {
io::copy(&mut self.open(), &mut File::create(path)?)
}
pub(crate) fn new(mut buf: Vec<u8>,
pos: usize,
mut cap: usize,
mut stream: StreamReader)
-> Data {
const PEEK_BYTES: usize = 4096;
if buf.len() < PEEK_BYTES {
trace_!("Resizing peek buffer from {} to {}.", buf.len(), PEEK_BYTES);
buf.resize(PEEK_BYTES, 0);
}
trace!("Init buffer cap: {}", cap);
let eof = match stream.read_max(&mut buf[cap..]) {
Ok(n) => {
trace_!("Filled peek buf with {} bytes.", n);
cap += n;
cap < buf.len()
}
Err(e) => {
error_!("Failed to read into peek buffer: {:?}.", e);
false
},
};
trace_!("Peek buffer size: {}, remaining: {}", buf.len(), buf.len() - cap);
Data {
buffer: buf,
stream: stream,
is_done: eof,
position: pos,
capacity: cap,
}
}
}
impl Drop for Data {
fn drop(&mut self) {
unsafe {
let stream: &mut StreamReader = transmute(self.stream.by_ref());
kill_stream(stream, self.stream.get_mut());
}
}
}