use std::io;
use futures_util::{Stream, StreamExt};
use http::{Response, StatusCode};
use xml::EmitterConfig;
use xml::common::XmlVersion;
use xml::writer::EventWriter;
use xml::writer::XmlEvent as XmlWEvent;
use crate::DavError;
use crate::async_stream::AsyncStream;
use crate::body::Body;
use crate::davpath::DavPath;
use crate::util::MemBuffer;
/// Sender type from `crate::async_stream`, specialised to `(DavPath, StatusCode)`
/// items with `DavError` as its second type parameter (presumably the error
/// type of the associated stream; confirm against `async_stream`).
type Sender = crate::async_stream::Sender<(DavPath, StatusCode), DavError>;
/// Newtype around a [`Sender`]; its `add_status` method sends
/// `(path, status code)` pairs through the wrapped sender.
pub(crate) struct MultiError(Sender);
impl MultiError {
pub fn new(sender: Sender) -> MultiError {
MultiError(sender)
}
pub async fn add_status<'a>(
&'a mut self,
path: &'a DavPath,
status: impl Into<DavError> + 'static,
) -> Result<(), futures_channel::mpsc::SendError> {
let status = status.into().statuscode();
self.0.send((path.clone(), status)).await;
Ok(())
}
}
/// XML event writer that emits into a `MemBuffer`.
///
/// The `'a` lifetime parameter does not appear in the aliased type.
type XmlWriter<'a> = EventWriter<MemBuffer>;
fn write_elem<'b, S>(xw: &mut XmlWriter, name: S, text: &str) -> Result<(), DavError>
where
S: Into<xml::name::Name<'b>>,
{
let n = name.into();
xw.write(XmlWEvent::start_element(n))?;
if !text.is_empty() {
xw.write(XmlWEvent::characters(text))?;
}
xw.write(XmlWEvent::end_element())?;
Ok(())
}
fn write_response(w: &mut XmlWriter, path: &DavPath, sc: StatusCode) -> Result<(), DavError> {
w.write(XmlWEvent::start_element("D:response"))?;
let p = path.with_prefix().as_url_string();
write_elem(w, "D:href", &p)?;
write_elem(w, "D:status", &format!("HTTP/1.1 {sc}"))?;
w.write(XmlWEvent::end_element())?;
Ok(())
}
/// Turns a stream of per-path statuses into an HTTP response.
///
/// Behaviour, in order:
///
/// * If `status_stream` yields nothing, a debug message is logged and
///   `DavError::ChanError` is returned.
/// * If the first or second item read here is an `Err`, that error is returned.
/// * If the first item's path equals `req_path` and the stream then ends, the
///   result is a plain response with that item's status and an empty body.
/// * Otherwise the result is a `207 Multi-Status` response whose body is
///   streamed XML: a `D:multistatus` root with one `D:response` per item.
///   `204 No Content` statuses are written as `200 OK` inside the body.
///
/// An `Err` item met while the body is being streamed ends the body with an
/// `io::Error`, as does any XML writer failure.
pub(crate) async fn multi_error<S>(
    req_path: DavPath,
    status_stream: S,
) -> Result<Response<Body>, DavError>
where
    S: Stream<Item = Result<(DavPath, StatusCode), DavError>> + Send + 'static,
{
    let mut status_stream = Box::pin(status_stream);

    // Read the first item up front: it decides between a plain response and a
    // multi-status one.
    let (path, status) = match status_stream.next().await {
        Some(Ok(first)) => first,
        Some(Err(e)) => return Err(e),
        None => {
            debug!("multi_error: empty status_stream");
            return Err(DavError::ChanError);
        }
    };

    // Items already pulled off the stream that still have to be replayed into
    // the multi-status body.
    let replay: Vec<Result<(DavPath, StatusCode), DavError>> = if path == req_path {
        match status_stream.next().await {
            Some(Ok(second)) => vec![Ok((path, status)), Ok(second)],
            Some(Err(e)) => return Err(e),
            None => {
                // A single status for the request path itself: no XML needed.
                let plain = Response::builder()
                    .status(status)
                    .body(Body::empty())
                    .unwrap();
                return Ok(plain);
            }
        }
    } else {
        vec![Ok((path, status))]
    };

    let body = AsyncStream::new(|mut tx| async move {
        let config = EmitterConfig {
            perform_indent: true,
            ..EmitterConfig::default()
        };
        let mut xw = EventWriter::new_with_config(MemBuffer::new(), config);

        // XML declaration plus the opening root element, sent as the first chunk.
        let prologue = XmlWEvent::StartDocument {
            version: XmlVersion::Version10,
            encoding: Some("utf-8"),
            standalone: None,
        };
        xw.write(prologue).map_err(DavError::from)?;
        xw.write(XmlWEvent::start_element("D:multistatus").ns("D", "DAV:"))
            .map_err(DavError::from)?;
        let chunk = xw.inner_mut().take();
        tx.send(chunk).await;

        // Replayed items first, then whatever is still left in the stream;
        // each item becomes one chunk.
        let mut pending = futures_util::stream::iter(replay).chain(status_stream);
        while let Some(entry) = pending.next().await {
            let (entry_path, entry_status) = entry?;
            let reported = if entry_status == StatusCode::NO_CONTENT {
                StatusCode::OK
            } else {
                entry_status
            };
            write_response(&mut xw, &entry_path, reported)?;
            let chunk = xw.inner_mut().take();
            tx.send(chunk).await;
        }

        // Close the root element and send the final chunk.
        xw.write(XmlWEvent::end_element()).map_err(DavError::from)?;
        let chunk = xw.inner_mut().take();
        tx.send(chunk).await;
        Ok::<_, io::Error>(())
    });

    Ok(Response::builder()
        .header("content-type", "application/xml; charset=utf-8")
        .status(StatusCode::MULTI_STATUS)
        .body(Body::from(body))
        .unwrap())
}