use bytes::BytesMut;
use codec::UviBytes;
use crate::dht_proto as proto;
use crate::record::{self, Record};
use futures::prelude::*;
use asynchronous_codec::Framed;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use prost::Message;
use std::{borrow::Cow, convert::TryFrom, time::Duration};
use std::{io, iter};
use unsigned_varint::codec;
use wasm_timer::Instant;
/// Protocol name that `KademliaProtocolConfig::default()` advertises.
pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0";
/// Maximum packet size that `KademliaProtocolConfig::default()` configures
/// (`16 * 1024`); the value is handed to the varint length-prefix codec via
/// `set_max_len` when a connection is upgraded.
pub const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
/// Connection status of a peer as carried in Kademlia messages.
///
/// Each variant mirrors the variant of the same name in
/// `proto::message::ConnectionType`; conversions in both directions are
/// provided in this module.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
pub enum KadConnectionType {
    /// Counterpart of `proto::message::ConnectionType::NotConnected`.
    NotConnected = 0,
    /// Counterpart of `proto::message::ConnectionType::Connected`.
    Connected = 1,
    /// Counterpart of `proto::message::ConnectionType::CanConnect`.
    CanConnect = 2,
    /// Counterpart of `proto::message::ConnectionType::CannotConnect`.
    CannotConnect = 3,
}
impl From<proto::message::ConnectionType> for KadConnectionType {
    /// Maps a protobuf connection type onto the variant of the same name.
    fn from(raw: proto::message::ConnectionType) -> KadConnectionType {
        use proto::message::ConnectionType as Wire;

        match raw {
            Wire::NotConnected => KadConnectionType::NotConnected,
            Wire::Connected => KadConnectionType::Connected,
            Wire::CanConnect => KadConnectionType::CanConnect,
            Wire::CannotConnect => KadConnectionType::CannotConnect,
        }
    }
}
// Implemented as `From` rather than a hand-written `Into`: the standard
// library's blanket impl still provides `KadConnectionType: Into<...>`, so
// existing `.into()` call sites keep compiling, and `From::from` becomes
// available as well.
impl From<KadConnectionType> for proto::message::ConnectionType {
    /// Maps a [`KadConnectionType`] onto the protobuf variant of the same name.
    fn from(ty: KadConnectionType) -> proto::message::ConnectionType {
        use proto::message::ConnectionType::*;
        match ty {
            KadConnectionType::NotConnected => NotConnected,
            KadConnectionType::Connected => Connected,
            KadConnectionType::CanConnect => CanConnect,
            KadConnectionType::CannotConnect => CannotConnect,
        }
    }
}
/// A peer entry as exchanged in Kademlia messages.
///
/// Decoded from, and encoded to, `proto::message::Peer`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KadPeer {
    /// Identifier of the peer (the protobuf `id` field).
    pub node_id: PeerId,
    /// Multiaddresses listed for the peer (the protobuf `addrs` field).
    pub multiaddrs: Vec<Multiaddr>,
    /// Connection status reported for the peer (the protobuf `connection` field).
    pub connection_ty: KadConnectionType,
}
impl TryFrom<proto::message::Peer> for KadPeer {
    type Error = io::Error;
    fn try_from(peer: proto::message::Peer) -> Result<KadPeer, Self::Error> {
        
        
        let node_id = PeerId::from_bytes(&peer.id)
            .map_err(|_| invalid_data("invalid peer id"))?;
        let mut addrs = Vec::with_capacity(peer.addrs.len());
        for addr in peer.addrs.into_iter() {
            let as_ma = Multiaddr::try_from(addr).map_err(invalid_data)?;
            addrs.push(as_ma);
        }
        debug_assert_eq!(addrs.len(), addrs.capacity());
        let connection_ty = proto::message::ConnectionType::from_i32(peer.connection)
            .ok_or_else(|| invalid_data("unknown connection type"))?
            .into();
        Ok(KadPeer {
            node_id,
            multiaddrs: addrs,
            connection_ty
        })
    }
}
// Implemented as `From` rather than a hand-written `Into`: the blanket impl
// in the standard library keeps `KadPeer: Into<proto::message::Peer>` (and
// therefore `KadPeer::into`) available to existing callers.
impl From<KadPeer> for proto::message::Peer {
    /// Encodes a [`KadPeer`] into its protobuf representation.
    fn from(peer: KadPeer) -> proto::message::Peer {
        proto::message::Peer {
            id: peer.node_id.to_bytes(),
            addrs: peer.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
            connection: {
                // The protobuf field stores the enum as a raw `i32`.
                let ct: proto::message::ConnectionType = peer.connection_ty.into();
                ct as i32
            }
        }
    }
}
/// Configuration for upgrading a connection to the Kademlia protocol.
///
/// Used as an inbound upgrade it yields a `KadInStreamSink`; used as an
/// outbound upgrade it yields a `KadOutStreamSink`.
#[derive(Debug, Clone)]
pub struct KademliaProtocolConfig {
    /// Protocol name advertised through `UpgradeInfo::protocol_info`.
    protocol_name: Cow<'static, [u8]>,
    /// Maximum packet size handed to the length-prefix codec (`set_max_len`).
    max_packet_size: usize,
}
impl KademliaProtocolConfig {
    /// Returns the currently configured protocol name.
    pub fn protocol_name(&self) -> &[u8] {
        &*self.protocol_name
    }

    /// Replaces the protocol name advertised during protocol negotiation.
    pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
        let name: Cow<'static, [u8]> = name.into();
        self.protocol_name = name;
    }

    /// Replaces the maximum packet size applied to the length-prefix codec.
    pub fn set_max_packet_size(&mut self, size: usize) {
        self.max_packet_size = size;
    }
}
impl Default for KademliaProtocolConfig {
    /// Builds a configuration from `DEFAULT_PROTO_NAME` and
    /// `DEFAULT_MAX_PACKET_SIZE`.
    fn default() -> Self {
        let protocol_name = Cow::Borrowed(DEFAULT_PROTO_NAME);
        let max_packet_size = DEFAULT_MAX_PACKET_SIZE;
        Self { protocol_name, max_packet_size }
    }
}
impl UpgradeInfo for KademliaProtocolConfig {
    type Info = Cow<'static, [u8]>;
    type InfoIter = iter::Once<Self::Info>;

    /// Advertises exactly one protocol: the configured protocol name.
    fn protocol_info(&self) -> Self::InfoIter {
        let name = self.protocol_name.clone();
        iter::once(name)
    }
}
impl<C> InboundUpgrade<C> for KademliaProtocolConfig
where
    C: AsyncRead + AsyncWrite + Unpin,
{
    type Output = KadInStreamSink<C>;
    type Future = future::Ready<Result<Self::Output, io::Error>>;
    type Error = io::Error;
    fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
        let mut codec = UviBytes::default();
        codec.set_max_len(self.max_packet_size);
        future::ok(
            Framed::new(incoming, codec)
                .err_into()
                .with::<_, _, fn(_) -> _, _>(|response| {
                    let proto_struct = resp_msg_to_proto(response);
                    let mut buf = Vec::with_capacity(proto_struct.encoded_len());
                    proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
                    future::ready(Ok(io::Cursor::new(buf)))
                })
                .and_then::<_, fn(_) -> _>(|bytes| {
                    let request = match proto::Message::decode(bytes) {
                        Ok(r) => r,
                        Err(err) => return future::ready(Err(err.into()))
                    };
                    future::ready(proto_to_req_msg(request))
                }),
        )
    }
}
impl<C> OutboundUpgrade<C> for KademliaProtocolConfig
where
    C: AsyncRead + AsyncWrite + Unpin,
{
    type Output = KadOutStreamSink<C>;
    type Future = future::Ready<Result<Self::Output, io::Error>>;
    type Error = io::Error;
    fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
        let mut codec = UviBytes::default();
        codec.set_max_len(self.max_packet_size);
        future::ok(
            Framed::new(incoming, codec)
                .err_into()
                .with::<_, _, fn(_) -> _, _>(|request| {
                    let proto_struct = req_msg_to_proto(request);
                    let mut buf = Vec::with_capacity(proto_struct.encoded_len());
                    proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
                    future::ready(Ok(io::Cursor::new(buf)))
                })
                .and_then::<_, fn(_) -> _>(|bytes| {
                    let response = match proto::Message::decode(bytes) {
                        Ok(r) => r,
                        Err(err) => return future::ready(Err(err.into()))
                    };
                    future::ready(proto_to_resp_msg(response))
                }),
        )
    }
}
/// Output of the inbound upgrade: a sink of `KadResponseMsg` and a stream of
/// `KadRequestMsg`.
pub type KadInStreamSink<S> = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;
/// Output of the outbound upgrade: a sink of `KadRequestMsg` and a stream of
/// `KadResponseMsg`.
pub type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;
/// Concrete combinator type shared by both directions.
///
/// `A` is the message type accepted by the sink side and `B` the message type
/// produced by the stream side. The conversion steps are stored as plain `fn`
/// pointers, which is what lets this type be named.
pub type KadStreamSink<S, A, B> = stream::AndThen<
    sink::With<
        stream::ErrInto<Framed<S, UviBytes<io::Cursor<Vec<u8>>>>, io::Error>,
        io::Cursor<Vec<u8>>,
        A,
        future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
        fn(A) -> future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
    >,
    future::Ready<Result<B, io::Error>>,
    fn(BytesMut) -> future::Ready<Result<B, io::Error>>,
>;
/// A Kademlia request, in its decoded form.
///
/// Each variant corresponds to one `proto::message::MessageType` on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KadRequestMsg {
    /// Ping request; carries no payload.
    Ping,

    /// `FindNode` request.
    FindNode {
        /// Raw key sent in the message's `key` field.
        key: Vec<u8>,
    },

    /// `GetProviders` request.
    GetProviders {
        /// Record key sent in the message's `key` field.
        key: record::Key,
    },

    /// `AddProvider` request.
    AddProvider {
        /// Record key sent in the message's `key` field.
        key: record::Key,
        /// Provider sent as the single entry of the message's `provider_peers`.
        provider: KadPeer,
    },

    /// `GetValue` request.
    GetValue {
        /// Record key sent in the message's `key` field.
        key: record::Key,
    },

    /// `PutValue` request.
    PutValue {
        /// Record sent in the message's `record` field.
        record: Record,
    }
}
/// A Kademlia response, in its decoded form.
///
/// Each variant corresponds to one `proto::message::MessageType` on the wire.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KadResponseMsg {
    /// Ping response; encoded with the `Ping` message type and no payload.
    Pong,

    /// `FindNode` response.
    FindNode {
        /// Peers carried in the message's `closer_peers` field.
        closer_peers: Vec<KadPeer>,
    },

    /// `GetProviders` response.
    GetProviders {
        /// Peers carried in the message's `closer_peers` field.
        closer_peers: Vec<KadPeer>,
        /// Peers carried in the message's `provider_peers` field.
        provider_peers: Vec<KadPeer>,
    },

    /// `GetValue` response.
    GetValue {
        /// Record carried in the message's `record` field, if one is present.
        record: Option<Record>,
        /// Peers carried in the message's `closer_peers` field.
        closer_peers: Vec<KadPeer>,
    },

    /// `PutValue` response.
    PutValue {
        /// Key carried in the message's `key` field.
        key: record::Key,
        /// Value carried in the message's record.
        value: Vec<u8>,
    },
}
fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
    match kad_msg {
        KadRequestMsg::Ping => proto::Message {
            r#type: proto::message::MessageType::Ping as i32,
            .. proto::Message::default()
        },
        KadRequestMsg::FindNode { key } => proto::Message {
            r#type: proto::message::MessageType::FindNode as i32,
            key,
            cluster_level_raw: 10,
            .. proto::Message::default()
        },
        KadRequestMsg::GetProviders { key } => proto::Message {
            r#type: proto::message::MessageType::GetProviders as i32,
            key: key.to_vec(),
            cluster_level_raw: 10,
            .. proto::Message::default()
        },
        KadRequestMsg::AddProvider { key, provider } => proto::Message {
            r#type: proto::message::MessageType::AddProvider as i32,
            cluster_level_raw: 10,
            key: key.to_vec(),
            provider_peers: vec![provider.into()],
            .. proto::Message::default()
        },
        KadRequestMsg::GetValue { key } => proto::Message {
            r#type: proto::message::MessageType::GetValue as i32,
            cluster_level_raw: 10,
            key: key.to_vec(),
            .. proto::Message::default()
        },
        KadRequestMsg::PutValue { record } => proto::Message {
            r#type: proto::message::MessageType::PutValue as i32,
            record: Some(record_to_proto(record)),
            .. proto::Message::default()
        }
    }
}
fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
    match kad_msg {
        KadResponseMsg::Pong => proto::Message {
            r#type: proto::message::MessageType::Ping as i32,
            .. proto::Message::default()
        },
        KadResponseMsg::FindNode { closer_peers } => proto::Message {
            r#type: proto::message::MessageType::FindNode as i32,
            cluster_level_raw: 9,
            closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
            .. proto::Message::default()
        },
        KadResponseMsg::GetProviders { closer_peers, provider_peers } => proto::Message {
            r#type: proto::message::MessageType::GetProviders as i32,
            cluster_level_raw: 9,
            closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
            provider_peers: provider_peers.into_iter().map(KadPeer::into).collect(),
            .. proto::Message::default()
        },
        KadResponseMsg::GetValue { record, closer_peers } => proto::Message {
            r#type: proto::message::MessageType::GetValue as i32,
            cluster_level_raw: 9,
            closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
            record: record.map(record_to_proto),
            .. proto::Message::default()
        },
        KadResponseMsg::PutValue { key, value } => proto::Message {
            r#type: proto::message::MessageType::PutValue as i32,
            key: key.to_vec(),
            record: Some(proto::Record {
                key: key.to_vec(),
                value,
                .. proto::Record::default()
            }),
            .. proto::Message::default()
        }
    }
}
/// Decodes a protobuf message received on an inbound substream into a
/// [`KadRequestMsg`].
///
/// Returns an `InvalidData` error if the message type is unknown, if a
/// `PutValue` record cannot be decoded, or if an `AddProvider` message has no
/// peer that decodes successfully.
fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
    use proto::message::MessageType;

    let msg_type = match MessageType::from_i32(message.r#type) {
        Some(ty) => ty,
        None => {
            return Err(invalid_data(format!("unknown message type: {}", message.r#type)))
        }
    };

    let request = match msg_type {
        MessageType::Ping => KadRequestMsg::Ping,
        MessageType::PutValue => {
            // A missing record is decoded as the default (empty) record.
            let record = record_from_proto(message.record.unwrap_or_default())?;
            KadRequestMsg::PutValue { record }
        }
        MessageType::GetValue => KadRequestMsg::GetValue {
            key: record::Key::from(message.key),
        },
        MessageType::FindNode => KadRequestMsg::FindNode { key: message.key },
        MessageType::GetProviders => KadRequestMsg::GetProviders {
            key: record::Key::from(message.key),
        },
        MessageType::AddProvider => {
            // Peers that fail to decode are skipped; the first peer that
            // decodes successfully is taken as the provider.
            let provider = message
                .provider_peers
                .into_iter()
                .filter_map(|peer| KadPeer::try_from(peer).ok())
                .next()
                .ok_or_else(|| invalid_data("AddProvider message with no valid peer."))?;
            KadRequestMsg::AddProvider {
                key: record::Key::from(message.key),
                provider,
            }
        }
    };
    Ok(request)
}
fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
    let msg_type = proto::message::MessageType::from_i32(message.r#type)
        .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
    match msg_type {
        proto::message::MessageType::Ping => Ok(KadResponseMsg::Pong),
        proto::message::MessageType::GetValue => {
            let record =
                if let Some(r) = message.record {
                    Some(record_from_proto(r)?)
                } else {
                    None
                };
            let closer_peers = message.closer_peers.into_iter()
                .filter_map(|peer| KadPeer::try_from(peer).ok())
                .collect();
            Ok(KadResponseMsg::GetValue { record, closer_peers })
        }
        proto::message::MessageType::FindNode => {
            let closer_peers = message.closer_peers.into_iter()
                .filter_map(|peer| KadPeer::try_from(peer).ok())
                .collect();
            Ok(KadResponseMsg::FindNode { closer_peers })
        }
        proto::message::MessageType::GetProviders => {
            let closer_peers = message.closer_peers.into_iter()
                .filter_map(|peer| KadPeer::try_from(peer).ok())
                .collect();
            let provider_peers = message.provider_peers.into_iter()
                .filter_map(|peer| KadPeer::try_from(peer).ok())
                .collect();
            Ok(KadResponseMsg::GetProviders {
                closer_peers,
                provider_peers,
            })
        }
        proto::message::MessageType::PutValue => {
            let key = record::Key::from(message.key);
            let rec = message.record.ok_or_else(|| {
                invalid_data("received PutValue message with no record")
            })?;
            Ok(KadResponseMsg::PutValue {
                key,
                value: rec.value
            })
        }
        proto::message::MessageType::AddProvider =>
            Err(invalid_data("received an unexpected AddProvider message"))
    }
}
/// Decodes a protobuf record into a [`Record`].
///
/// An empty `publisher` field decodes to `None`; a `ttl` of zero decodes to
/// a record without an expiry. Returns an `InvalidData` error if a non-empty
/// publisher is not a valid peer id.
fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
    let publisher = if record.publisher.is_empty() {
        None
    } else {
        match PeerId::from_bytes(&record.publisher) {
            Ok(id) => Some(id),
            Err(_) => return Err(invalid_data("Invalid publisher peer ID.")),
        }
    };

    // The TTL is relative, so the expiry is anchored at the time of decoding.
    let expires = match record.ttl {
        0 => None,
        secs => Some(Instant::now() + Duration::from_secs(u64::from(secs))),
    };

    Ok(Record {
        key: record::Key::from(record.key),
        value: record.value,
        publisher,
        expires,
    })
}
fn record_to_proto(record: Record) -> proto::Record {
    proto::Record {
        key: record.key.to_vec(),
        value: record.value,
        publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(),
        ttl: record.expires
            .map(|t| {
                let now = Instant::now();
                if t > now {
                    (t - now).as_secs() as u32
                } else {
                    1 
                }
            })
            .unwrap_or(0),
        time_received: String::new()
    }
}
/// Wraps `e` in an [`io::Error`] of kind [`io::ErrorKind::InvalidData`].
fn invalid_data<E>(e: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let source: Box<dyn std::error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::InvalidData, source)
}
#[cfg(test)]
mod tests {
    // This module currently contains no tests.
}