use quick_xml::events::{BytesEnd, BytesStart, Event};
use quick_xml::name::QName;
use quick_xml::{Error, NsReader, Writer};
use std::io::Cursor;
use classy::hl::BodyStream;
use classy::hl::BodyStreamAsyncReader;
use log::debug;
use log::Level::Debug;
use tokio::io::BufReader;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use crate::AttributesBinding;
/// Decides how a request body stream is turned into a `String`: either read completely, or
/// (for SOAP requests whose source only references the envelope header) read only up to the
/// SOAP `Body` element.
#[derive(Debug, Clone)]
pub struct StreamSolver {
/// `true` when every `payload` occurrence in the source given to `for_source` is the start
/// of `payload.Envelope.Header`; always `false` when no source was given.
partial: bool,
/// `true` once `bind_attributes` has seen a `soapaction` header or a `content-type` header
/// containing `action`; starts as `false`.
is_soap: bool,
}
impl StreamSolver {
fn new(partial: bool) -> Self {
Self {
partial,
is_soap: false,
}
}
pub fn for_source(source: Option<&str>) -> Self {
let Some(source) = source else {
return StreamSolver::new(false);
};
let payload: Vec<_> = source.match_indices("payload").map(|item| item.0).collect();
let header: Vec<_> = source
.match_indices("payload.Envelope.Header")
.map(|item| item.0)
.collect();
StreamSolver::new(payload.eq(&header))
}
pub fn bind_attributes(&mut self, binding: &dyn AttributesBinding) {
self.is_soap = binding.extract_header("soapaction").is_some()
| binding
.extract_header("content-type")
.unwrap_or_default()
.contains("action");
}
pub async fn bind_body_stream(&mut self, mut body: BodyStream<'_>) -> String {
if self.partial && self.is_soap {
self.partial_body(body).await
} else {
String::from_utf8_lossy(body.collect().await.bytes()).to_string()
}
}
async fn partial_body(&mut self, stream: BodyStream<'_>) -> String {
match self.do_partial_body(stream).await {
Ok(body) => {
if log::log_enabled!(Debug) {
debug!("Processed soap body: ");
let mut buffer = String::new();
for char in body.chars() {
buffer.push(char);
if buffer.len() >= 100 {
debug!("{buffer}");
buffer.clear();
}
}
debug!("{buffer}");
}
body
}
Err(err) => {
debug!("Unexpected error while parsing soap body {err}");
String::new()
}
}
}
async fn do_partial_body(&mut self, stream: BodyStream<'_>) -> Result<String, Error> {
let mut reader =
NsReader::from_reader(BufReader::new(BodyStreamAsyncReader::new(stream).compat()));
let mut buff = Vec::new();
let mut writer = Writer::new(Cursor::new(Vec::new()));
let mut envelope_qname: Option<Vec<u8>> = None;
loop {
let event = reader.read_event_into_async(&mut buff).await?;
match &event {
Event::Start(start) => {
debug!("XML start: {:?}", start.name());
if envelope_qname.is_none() && is_named(start, "Envelope") {
debug!("Envelope detected on payload: {:?}", start.name());
envelope_qname = Some(qname(start).to_vec());
} else if envelope_qname.is_some()
&& same_prefix(envelope_qname.as_ref().unwrap(), start)
&& is_named(start, "Body")
{
debug!("Body tag detected, rest of the payload will be ignored.");
writer.write_event(Event::End(BytesEnd::from(QName(
envelope_qname.as_ref().unwrap().as_slice(),
))))?;
return Ok(write(writer));
}
}
Event::Eof => {
debug!("XML eof");
return Ok(write(writer));
}
Event::End(end) => {
debug!("XML end: {:?}", end.name());
}
Event::Text(_) => {
debug!("XML text");
}
_ => {}
}
writer.write_event(event)?
}
}
}
fn qname<'a>(start: &'a BytesStart) -> &'a [u8] {
start.name().into_inner()
}
/// Tells whether the local name of `start` (the qualified name without its prefix) is
/// exactly `name`, compared byte by byte.
fn is_named(start: &BytesStart, name: &str) -> bool {
    let tag = start.name();
    let local = tag.local_name().into_inner();
    local == name.as_bytes()
}
/// Tells whether the qualified name stored in `old` and the name of `start` carry the same
/// prefix. Two names without a prefix are considered equal.
///
/// `old` is taken as a byte slice; callers holding a `&Vec<u8>` can keep passing it as is.
fn same_prefix(old: &[u8], start: &BytesStart) -> bool {
    QName(old).prefix() == start.name().prefix()
}
/// Consumes the writer and returns everything written to it as a `String`, replacing
/// invalid UTF-8 sequences with the replacement character.
fn write(writer: Writer<Cursor<Vec<u8>>>) -> String {
    let bytes = writer.into_inner().into_inner();
    String::from_utf8_lossy(&bytes).into_owned()
}
#[cfg(test)]
mod test {
    use crate::context::stream::StreamSolver;

    /// Without a source the solver is not partial.
    #[test]
    pub fn no_source() {
        assert!(!StreamSolver::for_source(None).partial);
    }

    /// A source that only references the envelope header makes the solver partial.
    #[test]
    pub fn partial() {
        assert!(StreamSolver::for_source(Some("#[payload.Envelope.Header]")).partial);
    }

    /// A source that also references the bare payload makes the solver not partial.
    #[test]
    pub fn complete() {
        assert!(!StreamSolver::for_source(Some("#[payload + payload.Envelope.Header]")).partial);
    }
}