use crate::remote::RemoteWrapper;
use actix::prelude::*;
use byteorder::{ByteOrder, NetworkEndian};
use bytes::{Buf, BufMut, BytesMut};
use futures::io::Error;
use serde::{Deserialize, Serialize};
use std::io;
use tokio_util::codec::{Decoder, Encoder};
const PREFIX: &[u8] = b"ACTIX/1.0\r\n";
const ENDIAN_LENGTH: usize = 4;
#[derive(Message, Deserialize, Serialize)]
#[rtype(result = "()")]
pub enum ClusterMessage {
    Request(u16, bool),
    Response,
    Message(RemoteWrapper),
    Decline,
}
impl ClusterMessage {
    pub fn split(&self) -> (Vec<u8>, Vec<u8>) {
        (
            match &self {
                Self::Message(wrapper) => wrapper.message_buffer.clone(),
                _ => panic!("split should not be used if not ClusterMessage::Message"),
            },
            flexbuffers::to_vec(self).unwrap(),
        )
    }
    pub fn set_buffer(&mut self, bytes: Vec<u8>) {
        match self {
            Self::Message(ref mut wrapper) => wrapper.message_buffer = bytes,
            _ => panic!("set_buffer should not be used if not ClusterMessage::Message"),
        }
    }
}
pub struct ConnectCodec {
    prefix: bool,
}
impl ConnectCodec {
    pub fn new() -> ConnectCodec {
        ConnectCodec { prefix: false }
    }
}
impl Decoder for ConnectCodec {
    type Item = ClusterMessage;
    type Error = Error;
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        if !self.prefix {
            if src.len() < 11 {
                return Ok(None);
            }
            if &src[..11] == PREFIX {
                let _s = src.split_to(11);
                self.prefix = true;
            } else {
                return Err(io::Error::new(io::ErrorKind::Other, "Prefix mismatch"));
            }
        }
        let size = {
            if src.len() < ENDIAN_LENGTH {
                return Ok(None);
            }
            NetworkEndian::read_u32(src.as_ref()) as usize
        };
        if src.len() >= size + (ENDIAN_LENGTH * 2) {
            src.advance(ENDIAN_LENGTH);
            let header_size = NetworkEndian::read_u32(src.as_ref()) as usize;
            src.advance(ENDIAN_LENGTH);
            if size > header_size {
                let header = src.split_to(header_size);
                let buf = src.split_to(size - header_size);
                let mut cluster_message =
                    flexbuffers::from_slice::<ClusterMessage>(&header).unwrap();
                cluster_message.set_buffer(buf.to_vec());
                Ok(Some(cluster_message))
            } else {
                let buf = src.split_to(size);
                Ok(Some(
                    flexbuffers::from_slice::<ClusterMessage>(&buf).unwrap(),
                ))
            }
        } else {
            Ok(None)
        }
    }
}
impl Encoder<ClusterMessage> for ConnectCodec {
    type Error = Error;
    fn encode(&mut self, item: ClusterMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
        match &item {
            ClusterMessage::Request(_, _) => dst.extend_from_slice(PREFIX),
            ClusterMessage::Response => dst.extend_from_slice(PREFIX),
            ClusterMessage::Message(_) => {
                let (buffer, header) = item.split();
                let buffer_ref: &[u8] = buffer.as_ref();
                let header_ref: &[u8] = header.as_ref();
                dst.reserve(header_ref.len() + buffer_ref.len() + (ENDIAN_LENGTH * 2));
                dst.put_u32((header_ref.len() + buffer_ref.len()) as u32);
                dst.put_u32(header_ref.len() as u32);
                dst.put(header_ref);
                dst.put(buffer_ref);
                return Ok(());
            }
            _ => {}
        }
        let msg = flexbuffers::to_vec(&item).unwrap();
        let msg_ref: &[u8] = msg.as_ref();
        dst.reserve(msg_ref.len() + (ENDIAN_LENGTH * 2));
        dst.put_u32(msg_ref.len() as u32);
        dst.put_u32(msg_ref.len() as u32);
        dst.put(msg_ref);
        Ok(())
    }
}