use futures::{Stream, TryStream};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, mem};
use self::State::*;
use super::helpers::*;
use crate::http::errors::ReadError;
use crate::http::BodyChunk;
/// Stream adapter that scans an inner stream of body chunks for a multipart boundary.
///
/// Chunks that precede the boundary are yielded as body data; the boundary itself is
/// withheld until it is consumed via `consume_boundary`.
pub struct BoundaryFinder<S>
where
    S: TryStream,
    S::Error: Into<ReadError>,
{
    /// The inner stream of body chunks.
    stream: S,
    /// Current position of the boundary search.
    state: State<S::Ok>,
    /// The exact byte sequence searched for.
    boundary: Box<[u8]>,
}
impl<S> BoundaryFinder<S>
where
    S: TryStream,
    S::Error: Into<ReadError>,
{
    /// Wraps `stream`, searching it for the byte sequence `boundary`.
    ///
    /// The finder starts out in the `Watching` state, i.e. nothing has been read yet.
    pub fn new<B: Into<Vec<u8>>>(stream: S, boundary: B) -> Self {
        let boundary_bytes: Vec<u8> = boundary.into();
        Self {
            stream,
            state: State::Watching,
            boundary: boundary_bytes.into_boxed_slice(),
        }
    }
}
// Overwrites the pinned finder's `state` field: `set_state!(self = NewState)`.
// The receiver is passed in explicitly because `self` cannot be named from
// inside a `macro_rules!` body without being supplied by the caller.
macro_rules! set_state {
    ($this:ident = $next:expr) => {
        *$this.as_mut().state() = $next;
    };
}
impl<S> BoundaryFinder<S>
where
    S: TryStream,
    S::Ok: BodyChunk,
    S::Error: Into<ReadError>,
{
    unsafe_pinned!(stream: S);
    unsafe_unpinned!(state: State<S::Ok>);

    /// Polls for the next chunk of body data that precedes the next boundary.
    ///
    /// Yields:
    /// * `Some(Ok(chunk))` for body data. This may be an empty chunk passed straight
    ///   through from the inner stream.
    /// * `Some(Err(_))` for an inner-stream error, or when a suspected boundary cannot
    ///   be verified.
    /// * `None` once a boundary has been located (state `Found`/`Split`) or the inner
    ///   stream has ended (state `End`).
    ///
    /// Call `consume_boundary` to move past a located boundary.
    pub fn body_chunk(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
    ) -> Poll<Option<Result<S::Ok, ReadError>>> {
        // Unwraps one item from the inner stream. It returns early on error, on
        // end-of-stream (after switching to `End`), and on `Pending`. The second form
        // also restores `$restore` as the state on error or `Pending`.
        macro_rules! try_ready_opt (
            ($try:expr) => (
                match $try {
                    Poll::Ready(Some(Ok(val))) => val,
                    Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
                    Poll::Ready(None) => {
                        set_state!(self = End);
                        return Poll::Ready(None);
                    }
                    Poll::Pending => return Poll::Pending,
                }
            );
            ($try:expr; $restore:expr) => (
                match $try {
                    Poll::Ready(Some(Ok(val))) => val,
                    Poll::Ready(Some(Err(e))) => {
                        set_state!(self = $restore);
                        return Poll::Ready(Some(Err(e.into())));
                    },
                    Poll::Ready(None) => {
                        set_state!(self = End);
                        return Poll::Ready(None);
                    },
                    Poll::Pending => {
                        set_state!(self = $restore);
                        return Poll::Pending;
                    }
                }
            )
        );

        loop {
            // A located boundary must be consumed first; `End` means nothing is left.
            match self.state {
                Found(_) | Split(_, _) | End => return Poll::Ready(None),
                _ => (),
            }

            match mem::replace(self.as_mut().state(), Watching) {
                Watching => {
                    let chunk = try_ready_opt!(self.as_mut().stream().try_poll_next(cx));
                    if chunk.is_empty() {
                        return ready_ok(chunk);
                    }
                    if let Some(chunk) = self.as_mut().check_chunk(chunk) {
                        return ready_ok(chunk);
                    }
                }
                Remainder(rem) => {
                    if let Some(chunk) = self.as_mut().check_chunk(rem) {
                        return ready_ok(chunk);
                    }
                }
                Partial(partial, res) => {
                    let chunk = match self.as_mut().stream().try_poll_next(cx) {
                        Poll::Ready(Some(Ok(chunk))) => chunk,
                        Poll::Ready(Some(Err(e))) => {
                            // Keep the buffered chunk so its data is not lost if the
                            // caller polls again after the error.
                            set_state!(self = Partial(partial, res));
                            return Poll::Ready(Some(Err(e.into())));
                        }
                        Poll::Ready(None) => {
                            set_state!(self = End);
                            return Poll::Ready(Some(fmt_err!(
                                "unable to verify multipart boundary; expected: \"{}\" found: \"{}\"",
                                show_bytes(&self.boundary),
                                show_bytes(partial.as_slice())
                            )));
                        }
                        Poll::Pending => {
                            set_state!(self = Partial(partial, res));
                            return Poll::Pending;
                        }
                    };

                    if !self.is_boundary_prefix(partial.as_slice(), chunk.as_slice(), res) {
                        set_state!(self = Remainder(chunk));
                        return ready_ok(partial);
                    }

                    // The candidate starts at `res.idx` inside `partial`, not at 0. The
                    // number of bytes still missing must therefore count from there.
                    let needed_len = (res.idx + self.boundary_size(res.incl_crlf))
                        .saturating_sub(partial.len());
                    if needed_len > chunk.len() {
                        return Poll::Ready(Some(fmt_err!(
                            "needed {} more bytes to verify boundary, got {}",
                            needed_len,
                            chunk.len()
                        )));
                    }

                    let bnd_start = res.boundary_start();
                    let is_boundary = if bnd_start > partial.len() {
                        // Only the `\r` of the CRLF was in `partial`, so the boundary
                        // proper starts inside `chunk`.
                        self.check_boundary(&chunk.as_slice()[bnd_start - partial.len()..])
                    } else {
                        self.check_boundary_split(&partial.as_slice()[bnd_start..], chunk.as_slice())
                    };

                    if !is_boundary {
                        set_state!(self = Remainder(chunk));
                        return ready_ok(partial);
                    }

                    let ret = if res.incl_crlf {
                        if partial.len() < bnd_start {
                            set_state!(self = Found(chunk.split_into(bnd_start - partial.len()).1));
                            partial.split_into(res.idx).0
                        } else {
                            let (ret, rem) = partial.split_into(res.idx);
                            // Drop the leading CRLF from the boundary's first half.
                            let (_, first) = rem.split_into(2);
                            set_state!(self = Split(first, chunk));
                            ret
                        }
                    } else {
                        let (ret, first) = partial.split_into(res.idx);
                        set_state!(self = Split(first, chunk));
                        ret
                    };

                    return if !ret.is_empty() {
                        ready_ok(ret)
                    } else {
                        Poll::Ready(None)
                    };
                }
                state => unreachable!("invalid state: {:?}", state),
            }
        }
    }

    /// Scans `chunk` for the boundary and updates the state to match.
    ///
    /// * Empty input returns `None` and leaves the state untouched.
    /// * If the complete boundary plus its two trailing bytes is inside `chunk`:
    ///   - the state becomes `Found` with the bytes from the boundary onward (any
    ///     leading CRLF is dropped);
    ///   - the bytes before it are returned, or `None` if there are none.
    /// * If `chunk` only contains the start of a possible boundary, the whole chunk is
    ///   stored as `Partial` and `None` is returned.
    /// * Otherwise the chunk is returned unchanged.
    fn check_chunk(mut self: Pin<&mut Self>, chunk: S::Ok) -> Option<S::Ok> {
        if chunk.is_empty() {
            return None;
        }
        if let Some(res) = self.find_boundary(&chunk) {
            let len = self.boundary_size(res.incl_crlf);
            if chunk.len() < res.idx + len {
                set_state!(self = Partial(chunk, res));
                None
            } else {
                let (ret, bnd) = chunk.split_into(res.idx);
                let bnd = if res.incl_crlf {
                    bnd.split_into(2).1
                } else {
                    bnd
                };
                set_state!(self = Found(bnd));
                if !ret.is_empty() {
                    Some(ret)
                } else {
                    None
                }
            }
        } else {
            Some(chunk)
        }
    }

    /// Looks for the first complete occurrence of the boundary in `chunk`. If there is
    /// none, it looks for a possible partial occurrence at the end of `chunk`.
    fn find_boundary(&self, chunk: &S::Ok) -> Option<SearchResult> {
        twoway::find_bytes(chunk.as_slice(), &self.boundary)
            .map(|idx| check_crlf(chunk.as_slice(), idx))
            .or_else(|| self.partial_find_boundary(chunk))
    }

    /// Checks that the bytes of `first` from `res.idx` onward, followed by `second`,
    /// agree with the expected boundary for as many bytes as are available. When
    /// `res.incl_crlf` is set, the expected boundary is preceded by CRLF.
    fn is_boundary_prefix(&self, first: &[u8], second: &[u8], res: SearchResult) -> bool {
        // Bytes before `res.idx` are ordinary body data and must not be compared.
        let maybe_prefix = first[res.idx..].iter().chain(second);
        if res.incl_crlf {
            maybe_prefix
                .zip(b"\r\n".iter().chain(&*self.boundary))
                .all(|(l, r)| l == r)
        } else {
            maybe_prefix.zip(&*self.boundary).all(|(l, r)| l == r)
        }
    }

    /// Finds a possible boundary cut off at the end of `chunk`. The candidates are:
    /// * a suffix that is a prefix of the boundary;
    /// * a trailing `\r\n`;
    /// * a trailing `\r`.
    fn partial_find_boundary(&self, chunk: &S::Ok) -> Option<SearchResult> {
        let chunk = chunk.as_slice();
        let len = chunk.len();
        partial_rmatch(chunk, &self.boundary)
            .map(|idx| check_crlf(chunk, idx))
            .or_else(|| {
                if len >= 2 && chunk[len - 2..] == *b"\r\n" {
                    Some(SearchResult {
                        idx: len - 2,
                        incl_crlf: true,
                    })
                } else if len >= 1 && chunk[len - 1] == b'\r' {
                    Some(SearchResult {
                        idx: len - 1,
                        incl_crlf: true,
                    })
                } else {
                    None
                }
            })
    }

    /// Returns whether `bytes` starts with the boundary. A match after skipping two
    /// leading bytes also counts; those two bytes are not inspected.
    fn check_boundary(&self, bytes: &[u8]) -> bool {
        (bytes.len() >= 2 && bytes[2..].starts_with(&self.boundary))
            || bytes.starts_with(&self.boundary)
    }

    /// Returns whether `first` followed by the start of `second` spells out the
    /// boundary. `second` must hold enough bytes to complete it.
    fn check_boundary_split(&self, first: &[u8], second: &[u8]) -> bool {
        let check_len = self.boundary.len().saturating_sub(first.len());
        second.len() >= check_len
            && first
                .iter()
                .chain(&second[..check_len])
                .zip(self.boundary.iter())
                .all(|(l, r)| l == r)
    }

    /// Discards any remaining body data, then consumes the located boundary.
    ///
    /// Returns `Ok(true)` if another part follows. Returns `Ok(false)` if the boundary
    /// ends in `--` or the inner stream has ended.
    pub fn consume_boundary(
        mut self: Pin<&mut Self>,
        cx: &mut Context,
    ) -> Poll<Result<bool, ReadError>> {
        while ready!(self.as_mut().body_chunk(cx)?).is_some() {}

        match mem::replace(self.as_mut().state(), Watching) {
            Found(bnd) => self.confirm_boundary(bnd),
            Split(first, second) => self.confirm_boundary_split(first, second),
            End => {
                *self.state() = End;
                ready_ok(false)
            }
            state => unreachable!("invalid state: {:?}", state),
        }
    }

    /// Validates a boundary found whole in `boundary` (leading CRLF already trimmed).
    /// Bytes after the boundary's two trailing bytes are kept as `Remainder`.
    fn confirm_boundary(
        mut self: Pin<&mut Self>,
        boundary: S::Ok,
    ) -> Poll<Result<bool, ReadError>> {
        if boundary.len() < self.boundary_size(false) {
            ret_err!("boundary sequence too short: {}", show_bytes(boundary.as_slice()));
        }
        let (boundary, rem) = boundary.split_into(self.boundary_size(false));
        let boundary = boundary.as_slice();
        debug_assert!(
            !boundary.starts_with(b"\r\n"),
            "leading CRLF should have been trimmed from boundary: {}",
            show_bytes(boundary)
        );
        debug_assert!(
            self.check_boundary(boundary),
            "invalid boundary previous confirmed as valid: {}",
            show_bytes(boundary)
        );
        set_state!(self = if !rem.is_empty() { Remainder(rem) } else { Watching });
        let is_end = check_last_two(boundary);
        if is_end {
            set_state!(self = End);
        }
        ready_ok(!is_end)
    }

    /// Validates a boundary split between `first` (leading CRLF already trimmed) and
    /// the start of `second`. The rest of `second` is kept as `Remainder`.
    fn confirm_boundary_split(
        mut self: Pin<&mut Self>,
        first: S::Ok,
        second: S::Ok,
    ) -> Poll<Result<bool, ReadError>> {
        let first = first.as_slice();
        let check_len = self.boundary_size(false) - first.len();
        if second.len() < check_len {
            ret_err!(
                "split boundary sequence too short: ({}, {})",
                show_bytes(first),
                show_bytes(second.as_slice())
            );
        }
        let (second, rem) = second.split_into(check_len);
        let second = second.as_slice();
        set_state!(self = Remainder(rem));
        debug_assert!(
            !first.starts_with(b"\r\n"),
            "leading CRLF should have been trimmed from first boundary section: {}",
            show_bytes(first)
        );
        debug_assert!(
            self.check_boundary_split(first, second),
            "invalid split boundary previous confirmed as valid: ({}, {})",
            show_bytes(first),
            show_bytes(second)
        );
        let is_end = check_last_two(second);
        if is_end {
            set_state!(self = End);
        }
        ready_ok(!is_end)
    }

    /// Length of the boundary plus its two trailing bytes (`--` or CRLF). When
    /// `incl_crlf` is set, two more bytes are added for a leading CRLF.
    fn boundary_size(&self, incl_crlf: bool) -> usize {
        self.boundary.len() + if incl_crlf { 4 } else { 2 }
    }
}
/// A `BoundaryFinder` is itself a stream of the body chunks that precede the next
/// boundary.
impl<S> Stream for BoundaryFinder<S>
where
    S: TryStream,
    S::Error: Into<ReadError>,
    S::Ok: BodyChunk,
{
    type Item = Result<S::Ok, ReadError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        Self::body_chunk(self, cx)
    }
}
impl<S: TryStream + fmt::Debug> fmt::Debug for BoundaryFinder<S>
where
S::Ok: BodyChunk + fmt::Debug,
S::Error: Into<ReadError>,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BoundaryFinder")
.field("stream", &self.stream)
.field("state", &self.state)
.field("boundary", &self.boundary)
.finish()
}
}
/// Progress of a `BoundaryFinder` through its inner stream.
enum State<C> {
    /// Ready to read the next chunk from the inner stream and scan it.
    Watching,
    /// A chunk whose tail may begin a boundary, plus where that candidate starts.
    /// The next chunk is needed to confirm it.
    Partial(C, SearchResult),
    /// A boundary was found; holds the bytes from the boundary onward, leading CRLF
    /// removed.
    Found(C),
    /// A boundary spans two chunks: its first part (leading CRLF removed) and the
    /// following chunk.
    Split(C, C),
    /// Leftover bytes to scan before reading more from the inner stream.
    Remainder(C),
    /// The inner stream ended, or the closing boundary was consumed.
    End,
}
impl<B: BodyChunk> fmt::Debug for State<B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use self::State::*;
match *self {
Watching => f.write_str("State::Watching"),
Partial(ref bnd, res) => write!(f, "State::Partial({}, {:?})", show_bytes(bnd.as_slice()), res),
Found(ref bnd) => write!(f, "State::Found({})", show_bytes(bnd.as_slice())),
Split(ref first, ref second) => write!(
f,
"State::Split(\"{}\", \"{}\")",
show_bytes(first.as_slice()),
show_bytes(second.as_slice())
),
Remainder(ref rem) => write!(f, "State::Remainder({})", show_bytes(rem.as_slice())),
End => f.write_str("State::End"),
}
}
}
/// Location of a (possibly partial) boundary match within a chunk.
#[derive(Copy, Clone, Debug)]
struct SearchResult {
    // Index where the match begins. When `incl_crlf` is set this points at the
    // CRLF (or lone trailing `\r`) that precedes the boundary.
    idx: usize,
    // Whether the match starts with a CRLF before the boundary proper.
    incl_crlf: bool,
}
impl SearchResult {
    /// Index where the boundary proper begins, i.e. `idx` advanced past the leading
    /// CRLF when one is part of the match.
    fn boundary_start(&self) -> usize {
        let crlf_len = if self.incl_crlf { 2 } else { 0 };
        self.idx + crlf_len
    }
}
/// Builds a `SearchResult` for a boundary match at `idx` in `chunk`. If the two
/// bytes immediately before the match are `\r\n`, the match is widened back over them.
fn check_crlf(chunk: &[u8], idx: usize) -> SearchResult {
    let preceded_by_crlf = idx >= 2 && chunk[idx - 2..idx] == *b"\r\n";
    if preceded_by_crlf {
        SearchResult {
            idx: idx - 2,
            incl_crlf: true,
        }
    } else {
        SearchResult {
            idx,
            incl_crlf: false,
        }
    }
}
/// Returns `true` if `boundary` ends in `--`, marking the closing boundary.
///
/// Otherwise it returns `false`. If the sequence is longer than two bytes and does
/// not end in CRLF, a warning is logged as well.
fn check_last_two(boundary: &[u8]) -> bool {
    if boundary.ends_with(b"--") {
        return true;
    }
    if boundary.len() > 2 && !boundary.ends_with(b"\r\n") {
        tracing::warn!("unexpected bytes after boundary");
    }
    false
}
/// Finds the earliest index `idx` such that `haystack[idx..]` is a non-empty proper
/// prefix of `needle`, i.e. `haystack` ends with the first few bytes of `needle`.
///
/// Only the last `needle.len() - 1` bytes of `haystack` are examined, since a
/// complete occurrence would be found by a full search. Returns `None` if either
/// slice is empty or no such suffix exists.
///
/// Every start position in that window is tried, not just the first occurrence of
/// `needle[0]`. For example, with needle `--boundary` the tail `-x--bou` has to match
/// at `--bou`, even though an earlier `-` fails.
fn partial_rmatch(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    if haystack.is_empty() || needle.is_empty() {
        return None;
    }
    let trim_start = haystack.len().saturating_sub(needle.len() - 1);
    (trim_start..haystack.len()).find(|&idx| needle.starts_with(&haystack[idx..]))
}
// Unit tests driving `BoundaryFinder` over mock streams of byte chunks.
#[cfg(test)]
mod test {
    use super::BoundaryFinder;
    use crate::http::multipart::test_util::*;

    // A stream with no data has no boundary, so `consume_boundary` reports `false`.
    #[test]
    fn test_empty_stream() {
        let finder = BoundaryFinder::new(mock_stream(&[]), BOUNDARY);
        pin_mut!(finder);
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), false);
    }

    // A single CRLF-terminated boundary is consumed (`true`). Then the exhausted
    // stream yields `false`.
    #[test]
    fn test_one_boundary() {
        let finder = BoundaryFinder::new(mock_stream(&[b"--boundary\r\n"]), BOUNDARY);
        pin_mut!(finder);
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), true);
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), false);
    }

    // A stream that ends partway through a boundary produces an error.
    #[test]
    fn test_one_incomplete_boundary() {
        let finder = BoundaryFinder::new(mock_stream(&[b"--bound"]), BOUNDARY);
        pin_mut!(finder);
        let result = until_ready!(|cx| finder.as_mut().consume_boundary(cx));
        assert_eq!(result.is_err(), true);
    }

    // The opening boundary and its CRLF arrive in separate chunks. The field between
    // it and the closing boundary has no body data.
    #[test]
    fn test_one_empty_field() {
        let finder = BoundaryFinder::new(
            mock_stream(&[b"--boundary", b"\r\n", b"\r\n", b"--boundary--"]),
            BOUNDARY,
        );
        pin_mut!(finder);
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), true);
        ready_assert_eq_none!(|cx| finder.as_mut().body_chunk(cx));
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), false);
    }

    // Body data between two boundaries is yielded as one chunk, without the CRLF
    // that precedes the closing boundary.
    #[test]
    fn test_one_nonempty_field() {
        let finder = BoundaryFinder::new(
            mock_stream(&[b"--boundary", b"\r\n", b"field data", b"\r\n", b"--boundary--"]),
            BOUNDARY,
        );
        pin_mut!(finder);
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), true);
        ready_assert_some_ok_eq!(|cx| finder.as_mut().body_chunk(cx), &b"field data"[..]);
        ready_assert_eq_none!(|cx| finder.as_mut().body_chunk(cx));
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), false);
    }

    // Two consecutive empty fields. The middle boundary arrives whole, CRLF
    // included, in a single chunk.
    #[test]
    fn test_two_empty_fields() {
        let finder = BoundaryFinder::new(
            mock_stream(&[b"--boundary", b"\r\n", b"\r\n--boundary\r\n", b"\r\n", b"--boundary--"]),
            BOUNDARY,
        );
        pin_mut!(finder);
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), true);
        ready_assert_eq_none!(|cx| finder.as_mut().body_chunk(cx));
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), true);
        ready_assert_eq_none!(|cx| finder.as_mut().body_chunk(cx));
        ready_assert_ok_eq!(|cx| finder.as_mut().consume_boundary(cx), false);
    }
}