#![allow(deprecated)]
use std::io::Read;
use std::io;
use std::fmt::{Debug, self};
use flate2::{
    Compression,
    bufread::{GzEncoder, GzDecoder},
};
use fluvio_protocol::{Encoder, Decoder};
use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
/// Deprecated bundle of a SmartModule's WASM payload, its kind and its extra
/// parameters.
///
/// It has the same three fields as [`SmartModuleInvocation`], except that the
/// WASM is carried as a [`SmartModuleWasmCompressed`] instead of a
/// [`SmartModuleInvocationWasm`]. The deprecation note below points callers
/// at `SmartModuleInvocation`.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
#[deprecated(
    since = "0.10.0",
    note = "will be removed in the next version. Use SmartModuleInvocation instead "
)]
pub(crate) struct LegacySmartModulePayload {
    /// WASM payload in its legacy representation.
    pub wasm: SmartModuleWasmCompressed,
    /// Kind of SmartModule, together with any kind-specific data.
    pub kind: SmartModuleKind,
    /// Extra parameters; the type is defined in `fluvio_smartmodule`.
    pub params: SmartModuleExtraParams,
}
/// Describes one SmartModule invocation: which WASM to use, what kind of
/// SmartModule it is, and any extra parameters.
///
/// `Default` yields the default of every field; the type is encodable and
/// decodable through the derived `Encoder`/`Decoder` implementations.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleInvocation {
    /// Where the WASM comes from (a string reference or an inline payload).
    pub wasm: SmartModuleInvocationWasm,
    /// Kind of SmartModule, together with any kind-specific data.
    pub kind: SmartModuleKind,
    /// Extra parameters; the type is defined in `fluvio_smartmodule`.
    pub params: SmartModuleExtraParams,
}
/// The WASM part of a [`SmartModuleInvocation`].
///
/// `Debug` and `Default` are implemented by hand further down in this file
/// rather than derived.
#[derive(Clone, Encoder, Decoder)]
pub enum SmartModuleInvocationWasm {
    /// A SmartModule referenced by a string instead of shipped inline.
    /// NOTE(review): presumably the name of an already registered module —
    /// confirm against callers.
    #[fluvio(tag = 0)]
    Predefined(String),
    /// A WASM payload shipped inline. `adhoc_from_bytes` stores the bytes
    /// gzip-compressed and `into_raw` gunzips them, so the content is
    /// expected to be gzip data.
    #[fluvio(tag = 1)]
    AdHoc(Vec<u8>),
}
impl SmartModuleInvocationWasm {
    /// Gzip-compresses `bytes` and wraps the result in [`Self::AdHoc`].
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error the compression step reports.
    pub fn adhoc_from_bytes(bytes: &[u8]) -> io::Result<Self> {
        zip(bytes).map(Self::AdHoc)
    }

    /// Consumes the value and returns the gunzipped payload.
    ///
    /// # Errors
    ///
    /// * [`io::ErrorKind::InvalidData`] when the value is not
    ///   [`Self::AdHoc`], since there are no bytes to hand back.
    /// * Any I/O error reported while decompressing the payload.
    pub fn into_raw(self) -> io::Result<Vec<u8>> {
        if let Self::AdHoc(compressed) = self {
            return unzip(&compressed);
        }

        Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "unable to represent as raw data",
        ))
    }
}
impl Default for SmartModuleInvocationWasm {
    /// The default value is an [`AdHoc`](Self::AdHoc) payload with no bytes.
    fn default() -> Self {
        let no_bytes: Vec<u8> = Vec::new();
        Self::AdHoc(no_bytes)
    }
}
impl Debug for SmartModuleInvocationWasm {
    /// Debug output that never dumps the payload itself.
    ///
    /// * `AdHoc` is rendered as a tuple named `Adhoc` whose single field is
    ///   the string `"<len> bytes"`.
    /// * `Predefined` is rendered as the word `Predefined` immediately
    ///   followed by the contained string (no separator is emitted).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AdHoc(payload) => {
                // Only the size is shown; the bytes themselves are omitted.
                let size_summary = format!("{} bytes", payload.len());
                let mut tuple = f.debug_tuple("Adhoc");
                tuple.field(&size_summary);
                tuple.finish()
            }
            Self::Predefined(module) => write!(f, "Predefined{module}"),
        }
    }
}
/// Kind of SmartModule, together with any data specific to that kind.
///
/// The `tag` on each variant is the discriminant used by the derived codec:
/// the tests in this file show `Filter` encoding to the single byte `0x00`
/// and the byte `0x01` decoding to `Map`.
///
/// `min_version` / `max_version` refer to version constants defined outside
/// this section. NOTE(review): presumably they bound the protocol versions in
/// which a variant may be encoded or decoded — confirm against the
/// `fluvio_protocol` derive documentation.
#[derive(Debug, Clone, Encoder, Decoder, Default)]
pub enum SmartModuleKind {
    /// Default kind; displayed as `filter`.
    #[default]
    #[fluvio(tag = 0)]
    Filter,
    /// Displayed as `map`.
    #[fluvio(tag = 1)]
    Map,
    /// Displayed as `array_map`.
    #[fluvio(tag = 2)]
    #[fluvio(min_version = ARRAY_MAP_WASM_API)]
    ArrayMap,
    /// Displayed as `aggregate`. Carries an accumulator as raw bytes
    /// (presumably the initial accumulator value — TODO confirm).
    #[fluvio(tag = 3)]
    Aggregate { accumulator: Vec<u8> },
    /// Displayed as `filter_map`.
    #[fluvio(tag = 4)]
    #[fluvio(min_version = ARRAY_MAP_WASM_API)]
    FilterMap,
    /// Displayed as `join`. Carries a single string
    /// (presumably the name of the thing joined against — TODO confirm).
    #[fluvio(tag = 5)]
    #[fluvio(min_version = SMART_MODULE_API, max_version = CHAIN_SMARTMODULE_API)]
    Join(String),
    /// Displayed as `join_stream`. Carries a topic and a derived-stream
    /// string.
    #[fluvio(tag = 6)]
    #[fluvio(min_version = SMART_MODULE_API, max_version = CHAIN_SMARTMODULE_API)]
    JoinStream {
        topic: String,
        derivedstream: String,
    },
    /// Displayed as `smartmodule`. Carries a [`SmartModuleContextData`].
    #[fluvio(tag = 7)]
    #[fluvio(min_version = GENERIC_SMARTMODULE_API)]
    Generic(SmartModuleContextData),
}
impl std::fmt::Display for SmartModuleKind {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        let name = match self {
            SmartModuleKind::Filter => "filter",
            SmartModuleKind::Map => "map",
            SmartModuleKind::ArrayMap => "array_map",
            SmartModuleKind::Aggregate { .. } => "aggregate",
            SmartModuleKind::FilterMap => "filter_map",
            SmartModuleKind::Join(..) => "join",
            SmartModuleKind::JoinStream { .. } => "join_stream",
            SmartModuleKind::Generic(..) => "smartmodule",
        };
        out.write_str(name)
    }
}
/// Context data carried by [`SmartModuleKind::Generic`].
///
/// Apart from `None`, the variants have the same shape as the data-carrying
/// variants of [`SmartModuleKind`] (`Aggregate`, `Join`, `JoinStream`).
#[derive(Debug, Clone, Encoder, Decoder, Default)]
pub enum SmartModuleContextData {
    /// No context data; this is the default.
    #[default]
    #[fluvio(tag = 0)]
    None,
    /// An accumulator as raw bytes
    /// (presumably the initial accumulator value — TODO confirm).
    #[fluvio(tag = 1)]
    Aggregate { accumulator: Vec<u8> },
    /// A single string
    /// (presumably the name of the thing joined against — TODO confirm).
    #[fluvio(tag = 2)]
    Join(String),
    /// A topic and a derived-stream string.
    #[fluvio(tag = 3)]
    JoinStream {
        topic: String,
        derivedstream: String,
    },
}
/// Deprecated representation of a WASM module payload, used by
/// [`LegacySmartModulePayload`].
///
/// The default value (implemented below) is `Raw` with an empty byte vector.
#[deprecated(
    since = "0.10.0",
    note = "will be removed in the next version. Use SmartModuleInvocationWasm instead"
)]
#[derive(Clone, Encoder, Decoder, Debug)]
pub enum SmartModuleWasmCompressed {
    /// WASM bytes as-is (presumably uncompressed — TODO confirm with callers).
    #[fluvio(tag = 0)]
    Raw(Vec<u8>),
    /// WASM bytes that are presumably gzip-compressed, judging by the name;
    /// nothing in this section reads them. Carries `min_version = 14`.
    #[fluvio(tag = 1)]
    #[fluvio(min_version = 14)]
    Gzip(Vec<u8>),
    }
impl Default for SmartModuleWasmCompressed {
    /// The default value is [`Raw`](Self::Raw) with an empty byte vector.
    fn default() -> Self {
        let no_bytes: Vec<u8> = Vec::new();
        Self::Raw(no_bytes)
    }
}
/// Gzip-compresses `raw` with the default compression level and returns the
/// compressed bytes.
///
/// # Errors
///
/// Returns any I/O error raised while reading from the encoder.
fn zip(raw: &[u8]) -> io::Result<Vec<u8>> {
    // The input length is only an initial capacity; the vector grows or
    // stays partially unused depending on how the data compresses.
    let mut gzipped = Vec::with_capacity(raw.len());
    let mut source = GzEncoder::new(raw, Compression::default());
    source.read_to_end(&mut gzipped).map(|_| gzipped)
}
/// Gunzips `compressed` and returns the decompressed bytes.
///
/// # Errors
///
/// Returns any I/O error raised while reading from the decoder.
fn unzip(compressed: &[u8]) -> io::Result<Vec<u8>> {
    // The compressed length is only an initial capacity; the vector grows
    // as needed while the output is read.
    let mut inflated = Vec::with_capacity(compressed.len());
    let mut source = GzDecoder::new(compressed);
    source.read_to_end(&mut inflated).map(|_| inflated)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Filter` has tag 0, so at version 0 it must encode to exactly one
    /// `0x00` byte.
    #[test]
    fn test_encode_smartmodulekind() {
        let kind = SmartModuleKind::Filter;
        let mut encoded: Vec<u8> = Vec::new();

        kind.encode(&mut encoded, 0).expect("should encode");

        assert_eq!(encoded, vec![0x00u8]);
    }

    /// A single `0x01` byte must decode to `Map` (tag 1) at version 0.
    #[test]
    fn test_decode_smartmodulekind() {
        let mut src = io::Cursor::new(vec![0x01u8]);
        let mut kind = SmartModuleKind::default();

        kind.decode(&mut src, 0).expect("should decode");

        assert!(matches!(kind, SmartModuleKind::Map));
    }

    /// `adhoc_from_bytes` must produce an `AdHoc` payload that gunzips back
    /// to the original input.
    #[test]
    fn test_gzip_smartmoduleinvocationwasm() {
        let original: Vec<u8> = vec![0xde, 0xad, 0xbe, 0xef];
        let wasm = SmartModuleInvocationWasm::adhoc_from_bytes(&original).expect("should encode");

        match wasm {
            SmartModuleInvocationWasm::AdHoc(gzipped) => {
                let restored = unzip(&gzipped).expect("should decompress");
                assert_eq!(restored, original);
            }
            SmartModuleInvocationWasm::Predefined(_) => panic!("not adhoc"),
        }
    }
}