use std::fmt::{self, Debug};
use std::io::{Error, ErrorKind, Result};
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use futures_core::stream::{FusedStream, Stream};
use try_lock::TryLock;
use super::plain_futures03::{self, Read};
use crate::headers::RawHeaders;
pub struct FormData<S> {
inner: Arc<TryLock<Option<plain_futures03::FormData<S>>>>,
}
pub struct Part<S> {
headers: RawHeaders,
inner: Option<Arc<TryLock<Option<plain_futures03::FormData<S>>>>>,
}
impl<S> FormData<S> {
pub fn new(stream: S, boundary: &str) -> Self {
let inner_form = plain_futures03::FormData::new(stream, boundary);
Self {
inner: Arc::new(TryLock::new(Some(inner_form))),
}
}
}
impl<S> Stream for FormData<S>
where
S: Stream<Item = Result<Bytes>> + Unpin,
{
type Item = Result<Part<S>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
match Arc::get_mut(&mut self.inner) {
Some(_) => {
}
None => {
let inner = match self.inner.try_lock() {
Some(mut inner) => mem::take(&mut *inner),
None => {
cx.waker().wake_by_ref();
return Poll::Pending;
}
};
self.inner = Arc::new(TryLock::new(inner));
}
};
let mut inner = self.inner.try_lock().expect("TryLock was mem::forgotten");
let inner = inner.as_mut().expect("inner should never be None");
match Pin::new(inner).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Ok(Read::NewPart { headers }))) => {
let inner = Arc::clone(&self.inner);
Poll::Ready(Some(Ok(Part {
headers,
inner: Some(inner),
})))
}
Poll::Ready(Some(Ok(Read::Part(_)))) | Poll::Ready(Some(Ok(Read::PartEof))) => {
cx.waker().wake_by_ref();
Poll::Pending
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
Poll::Ready(None) => Poll::Ready(None),
}
}
}
impl<S> FusedStream for FormData<S>
where
S: Stream<Item = Result<Bytes>> + Unpin,
{
fn is_terminated(&self) -> bool {
match self.inner.try_lock() {
Some(inner) => match &*inner {
Some(inner) => inner.is_terminated(),
None => false,
},
None => false,
}
}
}
impl<S> Debug for FormData<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FormData").finish()
}
}
impl<S> Part<S> {
pub fn raw_headers(&self) -> &RawHeaders {
&self.headers
}
}
impl<S> Stream for Part<S>
where
S: Stream<Item = Result<Bytes>> + Unpin,
{
type Item = Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner_arc = match &self.inner {
Some(inner_arc) => inner_arc,
None => {
return Poll::Ready(None);
}
};
let mut inner_ = match inner_arc.try_lock() {
Some(inner) => inner,
None => {
return Poll::Ready(Some(Err(Error::new(
ErrorKind::Other,
"Tried to poll data from the not last Part",
))));
}
};
let inner = match &mut *inner_ {
Some(inner) => inner,
None => {
drop(inner_);
self.inner = None;
return Poll::Ready(Some(Err(Error::new(
ErrorKind::Other,
"Tried to poll data from the not last Part",
))));
}
};
match Pin::new(inner).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Ok(Read::Part(bytes)))) => Poll::Ready(Some(Ok(bytes))),
Poll::Ready(Some(Ok(Read::PartEof))) | Poll::Ready(None) => {
drop(inner_);
self.inner = None;
Poll::Ready(None)
}
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
Poll::Ready(Some(Ok(Read::NewPart { .. }))) => unreachable!(),
}
}
}
impl<S> FusedStream for Part<S>
where
S: Stream<Item = Result<Bytes>> + Unpin,
{
fn is_terminated(&self) -> bool {
self.inner.is_none()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn assertions() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
fn assert_unpin<T: Unpin>() {}
struct PerfectStream;
impl Stream for PerfectStream {
type Item = Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
Poll::Pending
}
}
assert_send::<FormData<PerfectStream>>();
assert_sync::<FormData<PerfectStream>>();
assert_unpin::<FormData<PerfectStream>>();
assert_send::<Part<PerfectStream>>();
assert_sync::<Part<PerfectStream>>();
assert_unpin::<Part<PerfectStream>>();
}
}