use futures::future;
use tox_packet::dht::packed_node::*;
use crate::dht::server::*;
use crate::state_format::old::*;
use tox_binary_io::*;
use crate::dht::kbucket::*;
use crate::dht::ktree::*;
use failure::Fail;
use nom::{Err, error::ErrorKind as NomErrorKind};
error_kind! {
#[doc = "An error that can occur while serializing/deserializing object."]
#[derive(Debug)]
DeserializeError,
#[doc = "The specific kind of error that can occur."]
#[derive(Clone, Debug, Eq, PartialEq, Fail)]
DeserializeErrorKind {
#[doc = "Error indicates that object can't be parsed."]
#[fail(display = "Deserialize object error: {:?}, data: {:?}", error, data)]
Deserialize {
#[doc = "Parsing error."]
error: nom::Err<(Vec<u8>, NomErrorKind)>,
#[doc = "Object serialized data."]
data: Vec<u8>,
},
}
}
impl DeserializeError {
pub(crate) fn deserialize(e: Err<(&[u8], NomErrorKind)>, data: Vec<u8>) -> DeserializeError {
DeserializeError::from(DeserializeErrorKind::Deserialize { error: e.to_owned(), data })
}
}
#[derive(Clone, Debug)]
pub struct DaemonState;
pub const DHT_STATE_BUFFER_SIZE: usize =
(
(
32 + 19 ) * KBUCKET_DEFAULT_SIZE as usize ) * KBUCKET_MAX_ENTRIES as usize;
impl DaemonState {
pub async fn serialize_old(server: &Server) -> Vec<u8> {
let close_nodes = server.close_nodes.read().await;
let nodes = close_nodes.iter()
.flat_map(|node| node.to_packed_node())
.collect::<Vec<PackedNode>>();
let mut buf = [0u8; DHT_STATE_BUFFER_SIZE];
let (_, buf_len) = DhtState(nodes).to_bytes((&mut buf, 0)).expect("DhtState(nodes).to_bytes has failed");
buf[..buf_len].to_vec()
}
pub async fn deserialize_old(server: &Server, serialized_data: &[u8]) -> Result<(), DeserializeError> {
let nodes = match DhtState::from_bytes(serialized_data) {
Err(error) => {
return Err(DeserializeError::deserialize(error, serialized_data.to_vec()))
},
Ok((_, DhtState(nodes))) => {
nodes
}
};
let nodes_sender = nodes.iter()
.map(|node| server.ping_node(node));
future::join_all(nodes_sender).await;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tox_crypto::*;
use tox_packet::dht::*;
use futures::channel::mpsc;
use futures::StreamExt;
macro_rules! unpack {
($variable:expr, $variant:path) => (
match $variable {
$variant(inner) => inner,
other => panic!("Expected {} but got {:?}", stringify!($variant), other),
}
)
}
#[tokio::test]
async fn daemon_state_serialize_deserialize() {
crypto_init().unwrap();
let (pk, sk) = gen_keypair();
let (tx, rx) = mpsc::channel(1);
let alice = Server::new(tx, pk, sk);
let addr_org = "1.2.3.4:1234".parse().unwrap();
let pk_org = gen_keypair().0;
let pn = PackedNode { pk: pk_org, saddr: addr_org };
alice.close_nodes.write().await.try_add(pn);
let serialized_vec = DaemonState::serialize_old(&alice).await;
DaemonState::deserialize_old(&alice, &serialized_vec).await.unwrap();
let (received, _rx) = rx.into_future().await;
let (packet, addr_to_send) = received.unwrap();
assert_eq!(addr_to_send, addr_org);
let sending_packet = unpack!(packet, Packet::NodesRequest);
assert_eq!(sending_packet.pk, pk);
let serialized_vec = DaemonState::serialize_old(&alice).await;
let serialized_len = serialized_vec.len();
let res = DaemonState::deserialize_old(&alice, &serialized_vec[..serialized_len - 1]).await;
let error = res.err().unwrap();
let mut input = vec![2, 1, 2, 3, 4, 4, 210];
input.extend_from_slice(&pk_org.0[..PUBLICKEYBYTES - 1]);
assert_eq!(*error.kind(), DeserializeErrorKind::Deserialize { error: Err::Error((
input, NomErrorKind::Eof)), data: serialized_vec[..serialized_len - 1].to_vec() });
let serialized_vec = [42; 10];
let res = DaemonState::deserialize_old(&alice, &serialized_vec).await;
let error = res.err().unwrap();
assert_eq!(*error.kind(), DeserializeErrorKind::Deserialize { error: Err::Error((
vec![42; 10], NomErrorKind::Tag)), data: serialized_vec.to_vec() });
alice.close_nodes.write().await.remove(&pk_org);
let serialized_vec = DaemonState::serialize_old(&alice).await;
assert!(DaemonState::deserialize_old(&alice, &serialized_vec).await.is_ok());
}
}