Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use quick_xml::events::{BytesEnd, BytesStart, Event};
use quick_xml::name::QName;
use quick_xml::{Error, NsReader, Writer};
use std::io::Cursor;

use classy::hl::BodyStream;
use classy::hl::BodyStreamAsyncReader;
use log::debug;
use log::Level::Debug;
use tokio::io::BufReader;
use tokio_util::compat::FuturesAsyncReadCompatExt;

use crate::AttributesBinding;

#[derive(Debug, Clone)]
pub struct StreamSolver {
    partial: bool,
    is_soap: bool,
}

impl StreamSolver {
    fn new(partial: bool) -> Self {
        Self {
            partial,
            is_soap: false,
        }
    }

    pub fn for_source(source: Option<&str>) -> Self {
        let Some(source) = source else {
            return StreamSolver::new(false);
        };

        // false positive is there is a var or a header named payload
        let payload: Vec<_> = source.match_indices("payload").map(|item| item.0).collect();
        let header: Vec<_> = source
            .match_indices("payload.Envelope.Header")
            .map(|item| item.0)
            .collect();

        StreamSolver::new(payload.eq(&header))
    }

    pub fn bind_attributes(&mut self, binding: &dyn AttributesBinding) {
        self.is_soap = binding.extract_header("soapaction").is_some()
            | binding
                .extract_header("content-type")
                .unwrap_or_default()
                .contains("action");
    }

    pub async fn bind_body_stream(&mut self, mut body: BodyStream<'_>) -> String {
        if self.partial && self.is_soap {
            self.partial_body(body).await
        } else {
            String::from_utf8_lossy(body.collect().await.bytes()).to_string()
        }
    }

    async fn partial_body(&mut self, stream: BodyStream<'_>) -> String {
        match self.do_partial_body(stream).await {
            Ok(body) => {
                if log::log_enabled!(Debug) {
                    debug!("Processed soap body: ");
                    let mut buffer = String::new();
                    for char in body.chars() {
                        buffer.push(char);
                        if buffer.len() >= 100 {
                            debug!("{buffer}");
                            buffer.clear();
                        }
                    }
                    debug!("{buffer}");
                }

                body
            }
            Err(err) => {
                debug!("Unexpected error while parsing soap body {err}");
                String::new()
            }
        }
    }

    async fn do_partial_body(&mut self, stream: BodyStream<'_>) -> Result<String, Error> {
        let mut reader =
            NsReader::from_reader(BufReader::new(BodyStreamAsyncReader::new(stream).compat()));
        let mut buff = Vec::new();
        let mut writer = Writer::new(Cursor::new(Vec::new()));
        let mut envelope_qname: Option<Vec<u8>> = None;
        loop {
            let event = reader.read_event_into_async(&mut buff).await?;
            match &event {
                Event::Start(start) => {
                    debug!("XML start: {:?}", start.name());
                    if envelope_qname.is_none() && is_named(start, "Envelope") {
                        debug!("Envelope detected on payload: {:?}", start.name());
                        envelope_qname = Some(qname(start).to_vec());
                    } else if envelope_qname.is_some()
                        && same_prefix(envelope_qname.as_ref().unwrap(), start)
                        && is_named(start, "Body")
                    {
                        debug!("Body tag detected, rest of the payload will be ignored.");
                        writer.write_event(Event::End(BytesEnd::from(QName(
                            envelope_qname.as_ref().unwrap().as_slice(),
                        ))))?;
                        return Ok(write(writer));
                    }
                }
                Event::Eof => {
                    debug!("XML eof");
                    return Ok(write(writer));
                }

                Event::End(end) => {
                    debug!("XML end: {:?}", end.name());
                }
                Event::Text(_) => {
                    debug!("XML text");
                }
                _ => {}
            }
            writer.write_event(event)?
        }
    }
}

fn qname<'a>(start: &'a BytesStart) -> &'a [u8] {
    start.name().into_inner()
}

fn is_named(start: &BytesStart, name: &str) -> bool {
    start.name().local_name().into_inner().eq(name.as_bytes())
}

fn same_prefix(old: &Vec<u8>, start: &BytesStart) -> bool {
    QName(old.as_slice()).prefix().eq(&start.name().prefix())
}

fn write(writer: Writer<Cursor<Vec<u8>>>) -> String {
    String::from_utf8_lossy(writer.into_inner().into_inner().as_slice()).to_string()
}

#[cfg(test)]
mod test {
    use crate::context::stream::StreamSolver;

    #[test]
    pub fn no_source() {
        let solver = StreamSolver::for_source(None);
        assert!(!solver.partial);
    }

    #[test]
    pub fn partial() {
        let solver = StreamSolver::for_source(Some("#[payload.Envelope.Header]"));
        assert!(solver.partial);
    }

    #[test]
    pub fn complete() {
        let solver = StreamSolver::for_source(Some("#[payload + payload.Envelope.Header]"));
        assert!(!solver.partial);
    }
}