#![allow(dead_code)]
#![allow(unused_must_use)]
use crate::ReadSeek;
use rand;
use rocket::futures::io::Cursor;
use rocket::futures::AsyncWriteExt;
use rocket::tokio::io::ReadBuf;
use rocket::tokio::runtime::Handle;
use rocket::{
futures::executor::block_on,
tokio::io::{AsyncRead, AsyncSeek, AsyncSeekExt},
};
use std::io::SeekFrom;
use std::pin::Pin;
use std::task::{Context, Poll};
pub type Ranges = Vec<(u64, u64)>;
fn random_boundary() -> String {
use rand::RngCore;
let mut x = [0 as u8; 30];
rand::thread_rng().fill_bytes(&mut x);
(&x[..])
.iter()
.map(|&x| format!("{:x}", x))
.fold(String::new(), |mut a, x| {
a.push_str(x.as_str());
a
})
}
pub struct MultipartReader<'a> {
stream_len: u64,
stream: Pin<Box<dyn ReadSeek + 'a>>,
ranges: Ranges,
idx: usize,
pub boundary: String,
content_type: String,
buffer: Cursor<Vec<u8>>,
wrote_boundary_header: bool,
}
impl<'a> MultipartReader<'a> {
pub fn new<T>(
stream: T,
stream_len: u64,
content_type: impl Into<String>,
ranges: Vec<(u64, u64)>,
) -> MultipartReader<'a>
where
T: AsyncRead + AsyncSeek + Send + 'a,
{
let stream = Box::pin(stream);
return MultipartReader {
stream_len,
stream,
ranges,
idx: 0,
boundary: random_boundary(),
content_type: content_type.into(),
buffer: Cursor::new(Vec::new()),
wrote_boundary_header: false,
};
}
fn write_boundary(&mut self) -> std::io::Result<()> {
let handle = Handle::current();
handle.enter();
let boundary = format!("\r\n--{}\r\n", self.boundary);
block_on(self.buffer.write_all(boundary.as_bytes()))
}
fn write_boundary_header(
&mut self,
header: impl AsRef<str>,
value: impl AsRef<str>,
) -> std::io::Result<()> {
let handle = Handle::current();
handle.enter();
let header = format!("{}: {}\r\n", header.as_ref(), value.as_ref());
block_on(self.buffer.write_all(header.as_bytes()))
}
fn write_boundary_end(&mut self) -> std::io::Result<()> {
let handle = Handle::current();
handle.enter();
block_on(self.buffer.write_all("\r\n".as_bytes()))
}
fn write_boundary_closer(&mut self) -> std::io::Result<()> {
let handle = Handle::current();
handle.enter();
block_on(
self.buffer
.write_all(format!("\r\n--{}--\r\n\r\n", &self.boundary).as_bytes()),
)
}
fn clear_buffer(&mut self) {
let handle = Handle::current();
handle.enter();
block_on(rocket::futures::AsyncSeekExt::seek(
&mut self.buffer,
SeekFrom::Start(0),
))
.unwrap();
self.buffer.get_mut().truncate(0);
}
}
impl<'a> AsyncRead for MultipartReader<'a> {
fn poll_read(
mut self: Pin<&mut MultipartReader<'a>>,
_cs: &mut Context,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), std::io::Error>> {
let handle = Handle::current();
handle.enter();
let mut c = 0;
let bufsize = self.buffer.get_ref().len() as u64;
if bufsize > 0 {
if self.buffer.position() < bufsize - 1 {
c = block_on(rocket::futures::AsyncReadExt::read(
&mut self.buffer,
buf.initialized_mut(),
))
.unwrap();
}
if self.buffer.position() >= bufsize - 1 {
self.clear_buffer();
}
}
if c >= buf.initialized().len() {
return Poll::Ready(Ok(()));
}
if self.idx >= self.ranges.len() {
if self.idx == self.ranges.len() {
self.write_boundary_closer()?;
block_on(rocket::futures::AsyncSeekExt::seek(
&mut self.buffer,
SeekFrom::Start(0),
))
.unwrap();
self.idx = self.idx + 1;
return match block_on(rocket::tokio::io::AsyncReadExt::read(
&mut self,
&mut buf.initialized_mut()[c..],
)) {
Ok(_) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(e)),
};
}
return Poll::Ready(Ok(()));
}
let (start, end) = self.ranges[self.idx];
let current_position = match block_on(self.stream.stream_position()) {
Ok(pos) => pos,
Err(e) => return Poll::Ready(Err(e)),
};
if !self.wrote_boundary_header {
match block_on(self.stream.seek(SeekFrom::Start(start))) {
Ok(_) => (),
Err(e) => return Poll::Ready(Err(e)),
};
self.write_boundary()?;
let stream_len = self.stream_len;
self.write_boundary_header(
"Content-Range",
format!("bytes {}-{}/{}", start, end, stream_len).as_str(),
)?;
let content_type = self.content_type.clone();
self.write_boundary_header("Content-Type", content_type)?;
self.write_boundary_end()?;
self.wrote_boundary_header = true;
block_on(rocket::futures::AsyncSeekExt::seek(
&mut self.buffer,
SeekFrom::Start(0),
))
.unwrap();
return match block_on(rocket::tokio::io::AsyncReadExt::read(
&mut self,
&mut buf.initialized_mut()[c..],
)) {
Ok(_) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(e)),
};
}
let remaining = (end + 1 - current_position) as usize;
if buf.initialized().len() - c >= remaining {
match block_on(rocket::tokio::io::AsyncReadExt::read_exact(
&mut self.stream,
&mut buf.initialized_mut()[c..remaining + c],
)) {
Ok(_) => (),
Err(e) => return Poll::Ready(Err(e)),
};
self.idx = self.idx + 1;
self.wrote_boundary_header = false;
return Poll::Ready(Ok(()));
}
match block_on(rocket::tokio::io::AsyncReadExt::read(
&mut self.stream,
&mut buf.initialized_mut()[c..],
)) {
Ok(_) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(e)),
}
}
}