use std::{
    borrow::Cow,
    cmp::min,
    convert::TryFrom,
    fmt,
    io::{self, BufRead, BufReader},
};
use saturating::Saturating as S;
#[allow(unused)]
use crate::binlog::EventStreamReader;
use super::BinlogEventHeader;
use crate::{
    binlog::{
        consts::{
            BinlogVersion, EventType, TransactionPayloadCompressionType, TransactionPayloadFields,
        },
        BinlogCtx, BinlogEvent, BinlogStruct,
    },
    io::{BufMutExt, ParseBuf, ReadMysqlExt},
    misc::raw::{bytes::EofBytes, int::*, RawBytes},
    proto::{MyDeserialize, MySerialize},
};
#[derive(Debug)]
pub struct TransactionPayloadReader<'a> {
    inner: TransactionPayloadInner<'a>,
}
impl<'a> TransactionPayloadReader<'a> {
    pub fn new_uncompressed(data: &'a [u8]) -> Self {
        Self {
            inner: TransactionPayloadInner::Uncompressed(data),
        }
    }
    pub fn new_zstd(data: &'a [u8]) -> io::Result<Self> {
        let decoder = zstd::Decoder::with_buffer(data)?;
        Ok(Self {
            inner: TransactionPayloadInner::ZstdCompressed(BufReader::new(decoder)),
        })
    }
    pub fn has_data_left(&mut self) -> io::Result<bool> {
        self.inner.has_data_left()
    }
}
impl io::Read for TransactionPayloadReader<'_> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf)
    }
}
impl io::BufRead for TransactionPayloadReader<'_> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.inner.fill_buf()
    }
    fn consume(&mut self, amt: usize) {
        self.inner.consume(amt)
    }
}
enum TransactionPayloadInner<'a> {
    Uncompressed(&'a [u8]),
    ZstdCompressed(BufReader<zstd::Decoder<'a, &'a [u8]>>),
}
impl TransactionPayloadInner<'_> {
    fn has_data_left(&mut self) -> io::Result<bool> {
        match self {
            TransactionPayloadInner::Uncompressed(x) => Ok(!x.is_empty()),
            TransactionPayloadInner::ZstdCompressed(ref mut x) => {
                x.fill_buf().map(|b| !b.is_empty())
            }
        }
    }
}
impl io::BufRead for TransactionPayloadInner<'_> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        match self {
            TransactionPayloadInner::Uncompressed(ref mut x) => x.fill_buf(),
            TransactionPayloadInner::ZstdCompressed(ref mut x) => x.fill_buf(),
        }
    }
    fn consume(&mut self, amt: usize) {
        match self {
            TransactionPayloadInner::Uncompressed(ref mut x) => x.consume(amt),
            TransactionPayloadInner::ZstdCompressed(ref mut x) => x.consume(amt),
        }
    }
}
impl io::Read for TransactionPayloadInner<'_> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            TransactionPayloadInner::Uncompressed(ref mut x) => x.read(buf),
            TransactionPayloadInner::ZstdCompressed(ref mut x) => x.read(buf),
        }
    }
}
impl fmt::Debug for TransactionPayloadInner<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Uncompressed(arg0) => f.debug_tuple("Uncompressed").field(arg0).finish(),
            Self::ZstdCompressed(_) => f.debug_tuple("ZstdCompressed").field(&"..").finish(),
        }
    }
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct TransactionPayloadEvent<'a> {
    payload_size: RawInt<LeU64>,
    algorithm: TransactionPayloadCompressionType,
    uncompressed_size: RawInt<LeU64>,
    payload: RawBytes<'a, EofBytes>,
    header_size: usize,
}
impl<'a> TransactionPayloadEvent<'a> {
    pub fn new(
        payload_size: u64,
        algorithm: TransactionPayloadCompressionType,
        uncompressed_size: u64,
        payload: impl Into<Cow<'a, [u8]>>,
    ) -> Self {
        Self {
            payload_size: RawInt::new(payload_size),
            algorithm,
            uncompressed_size: RawInt::new(uncompressed_size),
            payload: RawBytes::new(payload),
            header_size: 0,
        }
    }
    pub fn with_payload_size(mut self, payload_size: u64) -> Self {
        self.payload_size = RawInt::new(payload_size);
        self
    }
    pub fn with_algorithm(mut self, algorithm: TransactionPayloadCompressionType) -> Self {
        self.algorithm = algorithm;
        self
    }
    pub fn with_uncompressed_size(mut self, uncompressed_size: u64) -> Self {
        self.uncompressed_size = RawInt::new(uncompressed_size);
        self
    }
    pub fn with_payload(mut self, payload: impl Into<Cow<'a, [u8]>>) -> Self {
        self.payload = RawBytes::new(payload);
        self
    }
    pub fn payload_size(&self) -> u64 {
        self.payload_size.0
    }
    pub fn payload_raw(&'a self) -> &'a [u8] {
        self.payload.as_bytes()
    }
    pub fn decompressed(&self) -> io::Result<TransactionPayloadReader<'_>> {
        if self.algorithm == TransactionPayloadCompressionType::NONE {
            return Ok(TransactionPayloadReader::new_uncompressed(
                self.payload_raw(),
            ));
        }
        return TransactionPayloadReader::new_zstd(self.payload_raw());
    }
    pub fn danger_decompress(self) -> Vec<u8> {
        if self.algorithm == TransactionPayloadCompressionType::NONE {
            return self.payload_raw().to_vec();
        }
        let mut decode_buf = vec![0_u8; self.uncompressed_size.0 as usize];
        match zstd::stream::copy_decode(self.payload.as_bytes(), &mut decode_buf[..]) {
            Ok(_) => {}
            Err(_) => {
                return Vec::new();
            }
        };
        decode_buf
    }
    pub fn algorithm(&self) -> TransactionPayloadCompressionType {
        self.algorithm
    }
    pub fn uncompressed_size(&self) -> u64 {
        self.uncompressed_size.0
    }
    pub fn into_owned(self) -> TransactionPayloadEvent<'static> {
        TransactionPayloadEvent {
            payload_size: self.payload_size,
            algorithm: self.algorithm,
            uncompressed_size: self.uncompressed_size,
            payload: self.payload.into_owned(),
            header_size: self.header_size,
        }
    }
}
impl<'de> MyDeserialize<'de> for TransactionPayloadEvent<'de> {
    const SIZE: Option<usize> = None;
    type Ctx = BinlogCtx<'de>;
    fn deserialize(_ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
        let mut ob = Self {
            payload_size: RawInt::new(0),
            algorithm: TransactionPayloadCompressionType::NONE,
            uncompressed_size: RawInt::new(0),
            payload: RawBytes::from("".as_bytes()),
            header_size: 0,
        };
        let mut have_payload_size = false;
        let mut have_compression_type = false;
        let original_buf_size = buf.len();
        while !buf.is_empty() {
            let field_type = buf.read_lenenc_int()?;
            match TransactionPayloadFields::try_from(field_type) {
                Ok(TransactionPayloadFields::OTW_PAYLOAD_HEADER_END_MARK) => {
                    if !have_payload_size || !have_compression_type {
                        Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            "Missing field in payload header",
                        ))?;
                    }
                    if ob.payload_size.0 as usize > buf.len() {
                        Err(io::Error::new(
                            io::ErrorKind::InvalidData,
                            format!(
                                "Payload size is bigger than the remaining buffer: {} > {}",
                                ob.payload_size.0,
                                buf.len()
                            ),
                        ))?;
                    }
                    ob.header_size = original_buf_size - ob.payload_size.0 as usize;
                    let mut payload_buf: ParseBuf = buf.parse(ob.payload_size.0 as usize)?;
                    ob.payload = RawBytes::from(payload_buf.eat_all());
                    break;
                }
                Ok(TransactionPayloadFields::OTW_PAYLOAD_SIZE_FIELD) => {
                    let _length = buf.read_lenenc_int()?;
                    let val = buf.read_lenenc_int()?;
                    ob.payload_size = RawInt::new(val);
                    have_payload_size = true;
                    continue;
                }
                Ok(TransactionPayloadFields::OTW_PAYLOAD_COMPRESSION_TYPE_FIELD) => {
                    let _length = buf.read_lenenc_int()?;
                    let val = buf.read_lenenc_int()?;
                    ob.algorithm = TransactionPayloadCompressionType::try_from(val).unwrap();
                    have_compression_type = true;
                    continue;
                }
                Ok(TransactionPayloadFields::OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD) => {
                    let _length = buf.read_lenenc_int()?;
                    let val = buf.read_lenenc_int()?;
                    ob.uncompressed_size = RawInt::new(val);
                    continue;
                }
                Err(_) => {
                    let length = buf.eat_lenenc_int();
                    buf.skip(length as usize);
                    continue;
                }
            };
        }
        Ok(ob)
    }
}
impl MySerialize for TransactionPayloadEvent<'_> {
    fn serialize(&self, buf: &mut Vec<u8>) {
        buf.put_lenenc_int(TransactionPayloadFields::OTW_PAYLOAD_COMPRESSION_TYPE_FIELD as u64);
        buf.put_lenenc_int(crate::misc::lenenc_int_len(self.algorithm as u64));
        buf.put_lenenc_int(self.algorithm as u64);
        if self.algorithm != TransactionPayloadCompressionType::NONE {
            buf.put_lenenc_int(
                TransactionPayloadFields::OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD as u64,
            );
            buf.put_lenenc_int(crate::misc::lenenc_int_len(self.uncompressed_size.0));
            buf.put_lenenc_int(self.uncompressed_size.0);
        }
        buf.put_lenenc_int(TransactionPayloadFields::OTW_PAYLOAD_SIZE_FIELD as u64);
        buf.put_lenenc_int(crate::misc::lenenc_int_len(self.payload_size.0));
        buf.put_lenenc_int(self.payload_size.0);
        buf.put_lenenc_int(TransactionPayloadFields::OTW_PAYLOAD_HEADER_END_MARK as u64);
        self.payload.serialize(&mut *buf);
    }
}
impl<'a> BinlogEvent<'a> for TransactionPayloadEvent<'a> {
    const EVENT_TYPE: EventType = EventType::TRANSACTION_PAYLOAD_EVENT;
}
impl<'a> BinlogStruct<'a> for TransactionPayloadEvent<'a> {
    fn len(&self, _version: BinlogVersion) -> usize {
        let mut len = S(self.header_size);
        len += S(self.payload.0.len());
        min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
    }
}