routing 0.37.1

A secured storage DHT
// Copyright 2018 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

mod request;
mod response;

pub use self::request::Request;
pub use self::response::{AccountInfo, Response};
use super::{QUORUM_DENOMINATOR, QUORUM_NUMERATOR};
use ack_manager::Ack;
use data::MAX_IMMUTABLE_DATA_SIZE_IN_BYTES;
use error::{BootstrapResponseError, RoutingError};
use event::Event;
use id::{FullId, PublicId};
use itertools::Itertools;
use lru_time_cache::LruCache;
use maidsafe_utilities::serialisation::{deserialise, serialise};
use peer_manager::SectionMap;
use routing_table::Authority;
use routing_table::{Prefix, VersionedPrefix, Xorable};
use rust_sodium::crypto::{box_, sign};
use sha3::Digest256;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fmt::{self, Debug, Formatter};
use std::iter;
use std::time::Duration;
use tiny_keccak::sha3_256;
use types::MessageId;
use utils;
use xor_name::XorName;

/// The maximal length of a user message part, in bytes.
pub const MAX_PART_LEN: usize = 20 * 1024;
/// Upper bound on the number of parts of a user message: the number of whole `MAX_PART_LEN`-sized
/// parts that fit into `MAX_IMMUTABLE_DATA_SIZE_IN_BYTES` (integer division), plus one.
pub const MAX_PARTS: u32 = ((MAX_IMMUTABLE_DATA_SIZE_IN_BYTES / MAX_PART_LEN as u64) + 1) as u32;

// Priority values: as the descriptions below show, a smaller number denotes a higher priority.

/// Get and refresh messages from nodes have a high priority: They relocate data under churn and are
/// critical to prevent data loss.
pub const RELOCATE_PRIORITY: u8 = 1;
/// Other requests have a lower priority: If they fail due to high traffic, the sender retries.
pub const DEFAULT_PRIORITY: u8 = 2;
/// `Get` requests from clients have the lowest priority: If bandwidth is insufficient, the network
/// needs to prioritise maintaining its structure, data and consensus.
pub const CLIENT_GET_PRIORITY: u8 = 3;

/// Wrapper of all messages.
///
/// This is the only type allowed to be sent / received on the network.
///
/// `Direct` and `Hop` carry a `DirectMessage` or a `HopMessage` on their own; the two `Tunnel*`
/// variants carry the same payload types together with the `PublicId`s of sender and receiver.
#[derive(Debug, Serialize, Deserialize)]
// FIXME - See https://maidsafe.atlassian.net/browse/MAID-2026 for info on removing this exclusion.
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum Message {
    /// A message sent between two nodes directly
    Direct(DirectMessage),
    /// A message sent across the network (in transit)
    Hop(HopMessage),
    /// A direct message sent via a tunnel because the nodes could not connect directly
    TunnelDirect {
        /// The wrapped message
        content: DirectMessage,
        /// The sender
        src: PublicId,
        /// The receiver
        dst: PublicId,
    },
    /// A hop message sent via a tunnel because the nodes could not connect directly
    TunnelHop {
        /// The wrapped message
        content: HopMessage,
        /// The sender
        src: PublicId,
        /// The receiver
        dst: PublicId,
    },
}

impl Message {
    /// Returns the priority of the wrapped message.
    ///
    /// Direct payloads (tunnelled or not) report `DirectMessage::priority`; hop payloads
    /// (tunnelled or not) report the priority of the signed message they carry.
    pub fn priority(&self) -> u8 {
        match *self {
            Message::Direct(ref direct) => direct.priority(),
            Message::TunnelDirect {
                content: ref direct,
                ..
            } => direct.priority(),
            Message::Hop(ref hop) => hop.content.priority(),
            Message::TunnelHop {
                content: ref hop, ..
            } => hop.content.priority(),
        }
    }
}

/// Messages sent via a direct connection.
///
/// Allows routing to directly send specific messages between nodes.
///
/// `Debug` is implemented by hand for this type rather than derived.
#[derive(Serialize, Deserialize)]
// FIXME - See https://maidsafe.atlassian.net/browse/MAID-2026 for info on removing this exclusion.
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum DirectMessage {
    /// Sent from members of a section or group message's source authority to the first hop. The
    /// message will only be relayed once enough signatures have been accumulated.
    ///
    /// As built by `RoutingMessage::to_signature`, the first field is the SHA3-256 hash of the
    /// serialised routing message and the second is the signature over those serialised bytes.
    MessageSignature(Digest256, sign::Signature),
    /// A signature for the current `BTreeSet` of section's node names
    SectionListSignature(SectionList, sign::Signature),
    /// Sent from a newly connected client to the bootstrap node to prove that it is the owner of
    /// the client's claimed public ID.
    BootstrapRequest(sign::Signature),
    /// Sent from the bootstrap node to a client in response to `BootstrapRequest`. If `Ok(())`,
    /// bootstrapping is successful; if `Err(..)`, the sender is not available as a bootstrap node
    /// and the `BootstrapResponseError` describes the failure.
    BootstrapResponse(Result<(), BootstrapResponseError>),
    /// Sent from a node which is still joining the network to another node, to allow the latter to
    /// add the former to its routing table.
    CandidateInfo {
        /// `PublicId` from before relocation.
        old_public_id: PublicId,
        /// `PublicId` from after relocation.
        new_public_id: PublicId,
        /// Signature of concatenated `PublicId`s using the pre-relocation key.
        signature_using_old: sign::Signature,
        /// Signature of concatenated `PublicId`s and `signature_using_old` using the
        /// post-relocation key.
        signature_using_new: sign::Signature,
        /// Client authority from after relocation.
        new_client_auth: Authority<XorName>,
    },
    /// Sent from a node that needs a tunnel to be able to connect to the given peer.
    TunnelRequest(PublicId),
    /// Sent as a response to `TunnelRequest` if the node can act as a tunnel.
    TunnelSuccess(PublicId),
    /// Sent as a response to `TunnelSuccess` if the node is selected to act as a tunnel.
    TunnelSelect(PublicId),
    /// Sent from a tunnel node to indicate that the given peer has disconnected.
    TunnelClosed(PublicId),
    /// Sent to a tunnel node to indicate the tunnel is not needed any more.
    TunnelDisconnect(PublicId),
    /// Request a proof to be provided by the joining node.
    ///
    /// This is sent from member of Group Y to the joining node.
    ResourceProof {
        /// seed of proof
        seed: Vec<u8>,
        /// size of the proof
        target_size: usize,
        /// leading zero bits of the hash of the proof
        difficulty: u8,
    },
    /// Provide a proof to the network
    ///
    /// This is sent from the joining node to member of Group Y. It is the only direct message
    /// for which `DirectMessage::priority` returns a non-zero value.
    ResourceProofResponse {
        /// The index of this part of the resource proof (zero-based; the `Debug` output prints
        /// `part_index + 1`).
        part_index: usize,
        /// The total number of parts.
        part_count: usize,
        /// Proof to be presented
        proof: Vec<u8>,
        /// Claimed leading zero bytes to be added to proof's header so that the hash matches
        /// the difficulty requirement
        leading_zero_bytes: u64,
    },
    /// Receipt of a part of a ResourceProofResponse
    ResourceProofResponseReceipt,
    /// Sent from a proxy node to its client to indicate that the client exceeded its rate limit.
    // NOTE(review): `ack` presumably identifies the message that was refused - confirm at the
    // sending site.
    ProxyRateLimitExceeded { ack: Ack },
}

impl DirectMessage {
    /// Returns the priority Crust should use when sending this message: `9` for a
    /// `ResourceProofResponse`, `0` for every other variant.
    pub fn priority(&self) -> u8 {
        if let DirectMessage::ResourceProofResponse { .. } = *self {
            9
        } else {
            0
        }
    }
}

/// An individual hop message that represents a part of the route of a message in transit.
///
/// To relay a `SignedMessage` via another node, the `SignedMessage` is wrapped in a `HopMessage`.
/// The `signature` is from the node that sends this directly to a node in its routing table. To
/// prevent Man-in-the-middle attacks, the `content` is signed by the original sender.
///
/// `Debug` is implemented by hand for this type and elides `sent_to` and `signature`.
#[derive(Serialize, Deserialize)]
pub struct HopMessage {
    /// Wrapped signed message.
    pub content: SignedMessage,
    /// Route number; corresponds to the index of the peer in the section of target peers being
    /// considered for the next hop.
    pub route: u8,
    /// Every node this has already been sent to.
    pub sent_to: BTreeSet<XorName>,
    /// Signature to be validated against the neighbouring sender's public key.
    ///
    /// `HopMessage::new` computes it over the serialised `content` only; `route` and `sent_to`
    /// are not part of the signed bytes.
    signature: sign::Signature,
}

impl HopMessage {
    /// Builds a `HopMessage` around `content` and attaches a detached signature, made with
    /// `signing_key`, over the serialised `content`.
    ///
    /// `route` and `sent_to` are stored as given and are not covered by the signature.
    ///
    /// # Errors
    ///
    /// Fails if `content` cannot be serialised.
    pub fn new(
        content: SignedMessage,
        route: u8,
        sent_to: BTreeSet<XorName>,
        signing_key: &sign::SecretKey,
    ) -> Result<HopMessage, RoutingError> {
        let signature = sign::sign_detached(&serialise(&content)?, signing_key);
        Ok(HopMessage {
            content,
            route,
            sent_to,
            signature,
        })
    }

    /// Checks the hop signature against `verification_key`.
    ///
    /// A successful check only shows that the holder of the matching secret key signed the
    /// serialised `content`; it does not show that the sender is a known node. Establishing that
    /// requires looking up the name associated with `verification_key` in the routing table.
    ///
    /// # Errors
    ///
    /// Fails if `content` cannot be serialised, or with `RoutingError::FailedSignature` if the
    /// signature does not verify.
    pub fn verify(&self, verification_key: &sign::PublicKey) -> Result<(), RoutingError> {
        let payload = serialise(&self.content)?;
        if !sign::verify_detached(&self.signature, &payload, verification_key) {
            return Err(RoutingError::FailedSignature);
        }
        Ok(())
    }
}

/// A list of a section's public IDs.
///
/// The signatures of a neighbouring section are intended to be stored here as well, but that is
/// not implemented yet (see the TODO on the commented-out field below).
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Serialize, Deserialize, Debug)]
pub struct SectionList {
    /// Prefix of the section this list belongs to.
    pub prefix: Prefix<XorName>,
    // TODO(MAID-1677): pub signatures: BTreeSet<(PublicId, sign::Signature)>,
    /// Public IDs of the section's members.
    pub_ids: BTreeSet<PublicId>,
}

impl SectionList {
    /// Create
    pub fn new(prefix: Prefix<XorName>, pub_ids: BTreeSet<PublicId>) -> Self {
        SectionList { prefix, pub_ids }
    }

    /// Create from any object convertable to an iterator
    pub fn from<I: IntoIterator<Item = PublicId>>(prefix: Prefix<XorName>, pub_ids: I) -> Self {
        Self::new(prefix, pub_ids.into_iter().collect())
    }
}

/// Wrapper around a routing message, signed by the originator of the message.
///
/// All signatures are made over the serialised `content` only; `src_sections` is not part of the
/// signed bytes.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Serialize, Deserialize)]
pub struct SignedMessage {
    /// A request or response type message.
    content: RoutingMessage,
    /// Nodes sending the message (those expected to sign it)
    src_sections: Vec<SectionList>,
    // The lists of the sections involved in routing this message, in chronological order.
    // TODO: implement (MAID-1677): sec_lists: Vec<SectionList>,
    /// The IDs and signatures of the source authority's members.
    signatures: BTreeMap<PublicId, sign::Signature>,
}

impl SignedMessage {
    /// Creates a `SignedMessage` with the given `content` and signed by the given `full_id`.
    ///
    /// Requires the list `src_sections` of nodes who should sign this message. The list is sorted
    /// by prefix before it is stored. The new message carries exactly one signature: the one made
    /// with `full_id`'s signing key over the serialised `content`.
    ///
    /// # Errors
    ///
    /// Fails if `content` cannot be serialised.
    pub fn new(
        content: RoutingMessage,
        full_id: &FullId,
        mut src_sections: Vec<SectionList>,
    ) -> Result<SignedMessage, RoutingError> {
        src_sections.sort_by_key(|list| list.prefix);
        let sig = sign::sign_detached(&serialise(&content)?, full_id.signing_private_key());
        Ok(SignedMessage {
            content,
            src_sections,
            signatures: iter::once((*full_id.public_id(), sig)).collect(),
        })
    }

    /// Confirms the signatures without modifying them.
    ///
    /// # Errors
    ///
    /// * Fails if `content` cannot be serialised.
    /// * `RoutingError::FailedSignature` if `find_invalid_sigs` reports any signature.
    /// * `RoutingError::NotEnoughSignatures` if `has_enough_sigs` is not satisfied.
    // TODO (MAID-1677): verify the sending SectionLists via each hop's signed lists
    pub fn check_integrity(&self, min_section_size: usize) -> Result<(), RoutingError> {
        let signed_bytes = serialise(&self.content)?;
        if !self.find_invalid_sigs(signed_bytes).is_empty() {
            return Err(RoutingError::FailedSignature);
        }
        if !self.has_enough_sigs(min_section_size) {
            return Err(RoutingError::NotEnoughSignatures);
        }
        Ok(())
    }

    /// Returns whether the message is signed by the given public ID.
    ///
    /// This only checks that a signature is stored for `pub_id`; it does not verify it.
    pub fn signed_by(&self, pub_id: &PublicId) -> bool {
        self.signatures.contains_key(pub_id)
    }

    /// Returns the number of nodes in the source authority, i.e. the total number of public IDs
    /// over all `src_sections` lists.
    pub fn src_size(&self) -> usize {
        self.src_sections.iter().map(|sl| sl.pub_ids.len()).sum()
    }

    /// Adds the given signature without validating it cryptographically.
    ///
    /// The signature is only stored if the source authority `is_multiple()` and `pub_id` is a
    /// member of any of the `src_sections` lists. A signature already stored for `pub_id` is
    /// replaced.
    pub fn add_signature(&mut self, pub_id: PublicId, sig: sign::Signature) {
        if self.content.src.is_multiple() && self.is_sender(&pub_id) {
            let _ = self.signatures.insert(pub_id, sig);
        }
    }

    /// Adds all signatures from the given message, without validating them.
    ///
    /// Nothing is added unless the source authority `is_multiple()`. Unlike `add_signature`, this
    /// does not check that the signers are listed in `src_sections`; `check_fully_signed` removes
    /// such entries later.
    pub fn add_signatures(&mut self, msg: SignedMessage) {
        if self.content.src.is_multiple() {
            self.signatures.extend(msg.signatures);
        }
    }

    /// Returns the routing message without cloning it.
    pub fn into_routing_message(self) -> RoutingMessage {
        self.content
    }

    /// The routing message that was signed.
    pub fn routing_message(&self) -> &RoutingMessage {
        &self.content
    }

    /// The priority Crust should send this message with.
    pub fn priority(&self) -> u8 {
        self.content.priority()
    }

    /// Returns whether there are enough signatures from the sender.
    ///
    /// Takes `&mut self` because signatures reported by `find_invalid_sigs` are removed as a side
    /// effect. Returns `false` early, without removing anything, if there are not enough
    /// signatures to begin with or if `content` cannot be serialised.
    pub fn check_fully_signed(&mut self, min_section_size: usize) -> bool {
        // Cheap count first: no point verifying signatures if there are too few anyway.
        if !self.has_enough_sigs(min_section_size) {
            return false;
        }

        // Remove invalid signatures, then check again that we have enough.
        // We also check (again) that all messages are from valid senders, because the message
        // may have been sent from another node, and we cannot trust that that node correctly
        // controlled which signatures were added.
        // TODO (MAID-1677): we also need to check that the src_sections list corresponds to the
        // section(s) at some point in recent history; i.e. that it was valid; but we shouldn't
        // force it to match our own because our routing table may have changed since.

        let signed_bytes = match serialise(&self.content) {
            Ok(serialised) => serialised,
            Err(error) => {
                warn!("Failed to serialise {:?}: {:?}", self, error);
                return false;
            }
        };
        for invalid_signature in &self.find_invalid_sigs(signed_bytes) {
            let _ = self.signatures.remove(invalid_signature);
        }

        self.has_enough_sigs(min_section_size)
    }

    // Returns true iff `pub_id` is in any of the lists in `self.src_sections`.
    fn is_sender(&self, pub_id: &PublicId) -> bool {
        self.src_sections
            .iter()
            .any(|list| list.pub_ids.contains(pub_id))
    }

    // Returns a list of all invalid signatures (not from an expected key or not cryptographically
    // valid). `signed_bytes` must be the serialised `self.content`.
    //
    // For a `Client` source the only expected signer is the client itself; for every other source
    // the signer must be listed in `self.src_sections`.
    fn find_invalid_sigs(&self, signed_bytes: Vec<u8>) -> Vec<PublicId> {
        let invalid = self
            .signatures
            .iter()
            .filter_map(|(pub_id, sig)| {
                // Remove if not in sending nodes or signature is invalid:
                let is_valid = if let Authority::Client { ref client_id, .. } = self.content.src {
                    client_id == pub_id
                        && sign::verify_detached(sig, &signed_bytes, client_id.signing_public_key())
                } else {
                    self.is_sender(pub_id)
                        && sign::verify_detached(sig, &signed_bytes, pub_id.signing_public_key())
                };
                if is_valid {
                    None
                } else {
                    Some(*pub_id)
                }
            }).collect_vec();
        if !invalid.is_empty() {
            debug!("{:?}: invalid signatures: {:?}", self, invalid);
        }
        invalid
    }

    // Returns true if there are enough signatures (note that this method does not verify the
    // signatures, it only counts them; it also does not verify `self.src_sections`).
    //
    // For multi-node sources "enough" means strictly more than
    // `QUORUM_NUMERATOR / QUORUM_DENOMINATOR` of the relevant set of senders.
    fn has_enough_sigs(&self, min_section_size: usize) -> bool {
        use Authority::*;
        match self.content.src {
            ClientManager(_) | NaeManager(_) | NodeManager(_) => {
                // Only the first `min_section_size` sender names, ordered by `cmp_distance`
                // relative to the source name (presumably XOR-closest first), are eligible.
                // Note: there should be exactly one source section, but we use safe code:
                let valid_names: HashSet<_> = self
                    .src_sections
                    .iter()
                    .flat_map(|list| list.pub_ids.iter().map(PublicId::name))
                    .sorted_by(|lhs, rhs| self.content.src.name().cmp_distance(lhs, rhs))
                    .into_iter()
                    .take(min_section_size)
                    .collect();
                // Count only the signatures that come from eligible names.
                let valid_sigs = self
                    .signatures
                    .keys()
                    .filter(|pub_id| valid_names.contains(pub_id.name()))
                    .count();
                // TODO: we should consider replacing valid_names.len() with
                // cmp::min(routing_table.len(), min_section_size)
                // (or just min_section_size, but in that case we will not be able to handle user
                // messages during boot-up).
                valid_sigs * QUORUM_DENOMINATOR > valid_names.len() * QUORUM_NUMERATOR
            }
            Section(_) => {
                // Every stored signature is counted here, whether or not its signer is listed in
                // `src_sections`; the quorum is taken over all listed senders.
                // Note: there should be exactly one source section, but we use safe code:
                let num_sending = self
                    .src_sections
                    .iter()
                    .fold(0, |count, list| count + list.pub_ids.len());
                let valid_sigs = self.signatures.len();
                valid_sigs * QUORUM_DENOMINATOR > num_sending * QUORUM_NUMERATOR
            }
            PrefixSection(_) => {
                // Each section must have enough signatures:
                self.src_sections.iter().all(|list| {
                    let valid_sigs = self
                        .signatures
                        .keys()
                        .filter(|pub_id| list.pub_ids.contains(pub_id))
                        .count();
                    valid_sigs * QUORUM_DENOMINATOR > list.pub_ids.len() * QUORUM_NUMERATOR
                })
            }
            // A single-node source must carry exactly one signature.
            ManagedNode(_) | Client { .. } => self.signatures.len() == 1,
        }
    }
}

/// A routing message with source and destination authorities.
///
/// The message's priority is taken from `content` (see `RoutingMessage::priority`).
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Debug, Serialize, Deserialize)]
pub struct RoutingMessage {
    /// Source authority
    pub src: Authority<XorName>,
    /// Destination authority
    pub dst: Authority<XorName>,
    /// The message content
    pub content: MessageContent,
}

impl RoutingMessage {
    /// Builds the acknowledgement for `msg`.
    ///
    /// The ack is sent from `src` to the source of `msg` and carries the `Ack` computed from
    /// `msg` together with the priority of `msg`.
    ///
    /// # Errors
    ///
    /// Fails if `Ack::compute` fails for `msg`.
    pub fn ack_from(msg: &RoutingMessage, src: Authority<XorName>) -> Result<Self, RoutingError> {
        let ack = Ack::compute(msg)?;
        let content = MessageContent::Ack(ack, msg.priority());
        Ok(RoutingMessage {
            src,
            dst: msg.src,
            content,
        })
    }

    /// Returns the priority Crust should send this message with, as reported by its content.
    pub fn priority(&self) -> u8 {
        self.content.priority()
    }

    /// Builds the `DirectMessage::MessageSignature` for this message: the SHA3-256 hash of the
    /// serialised message paired with a detached signature, made with `signing_key`, over the
    /// same serialised bytes.
    ///
    /// # Errors
    ///
    /// Fails if the message cannot be serialised.
    pub fn to_signature(
        &self,
        signing_key: &sign::SecretKey,
    ) -> Result<DirectMessage, RoutingError> {
        let bytes = serialise(self)?;
        let digest = sha3_256(&bytes);
        Ok(DirectMessage::MessageSignature(
            digest,
            sign::sign_detached(&bytes, signing_key),
        ))
    }
}

/// The routing message types
///
/// # The bootstrap process
///
///
/// ## Bootstrapping a client
///
/// A newly created `Core`, A, starts in `Disconnected` state and tries to establish a connection to
/// any node B of the network via Crust. When successful, i.e. when receiving an `OnConnect` event,
/// it moves to the `Bootstrapping` state.
///
/// A now sends a `BootstrapRequest` message to B, containing the signature of A's public ID. B
/// responds with a `BootstrapResponse`, indicating success or failure. Once it receives that, A
/// goes into the `Client` state and uses B as its proxy to the network.
///
/// A can now exchange messages with any `Authority`. This completes the bootstrap process for
/// clients.
///
///
/// ## Becoming a node
///
/// If A wants to become a full routing node (`client_restriction == false`), it needs to relocate,
/// i. e. change its name to a value chosen by the network, and then add its peers to its routing
/// table and get added to their routing tables.
///
///
/// ### Relocating on the network
///
/// Once in `JoiningNode` state, A sends a `Relocate` request to the `NaeManager` section authority
/// X of A's current name. X computes a target destination Y to which A should relocate and sends
/// that section's `NaeManager`s an `ExpectCandidate` containing A's current public ID. Each member
/// of Y caches A's public ID and sends `AcceptAsCandidate` to its own section. Once Y receives
/// `AcceptAsCandidate`, it sends a `RelocateResponse` back to A, which includes an address space
/// range into which A should relocate and also the public IDs of the members of Y. A then
/// disconnects from the network and reconnects with a new ID which falls within the specified
/// address range. After connecting to the members of Y, it begins the resource proof process. Upon
/// successful completion, A is regarded as a full node and connects to all neighbouring sections'
/// peers.
///
///
/// ### Connecting to the matching section
///
/// To the `ManagedNode` for each public ID it receives from members of Y, A sends its
/// `ConnectionInfo`. It also caches the ID.
///
/// For each `ConnectionInfo` that a node Z receives from A, it decides whether it wants A in its
/// routing table. If yes, and if A's ID is in its ID cache, Z sends its own `ConnectionInfo` back
/// to A and also attempts to connect to A via Crust. A does the same, once it receives the
/// `ConnectionInfo`.
///
///
/// ### Resource Proof Evaluation to approve
/// When nodes Z of section Y receive `CandidateInfo` from A, they reply with a `ResourceProof`
/// request. Node A needs to answer these requests (resolving a hashing challenge) with
/// `ResourceProofResponse`. Members of Y will send out `CandidateApproval` messages to vote for the
/// approval in their section. Once the vote succeeds, the members of Y send `NodeApproval` to A and
/// add it into their routing table. When A receives the `NodeApproval` message, it adds the members
/// of Y to its routing table.
///
// NOTE: `Ord`/`PartialOrd` are derived, so values compare by variant declaration order first;
// reordering variants changes that ordering (and possibly the serialised form - confirm against
// the serialiser before moving anything). `Debug` is implemented by hand.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Serialize, Deserialize)]
// FIXME - See https://maidsafe.atlassian.net/browse/MAID-2026 for info on removing this exclusion.
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum MessageContent {
    // ---------- Internal ------------
    /// Ask the network to relocate you.
    ///
    /// This is sent by a joining node to its `NaeManager`s with the intent to become a full routing
    /// node with a new ID in an address range chosen by the `NaeManager`s.
    Relocate {
        /// The message's unique identifier.
        message_id: MessageId,
    },
    /// Notify a joining node's `NaeManager` so that it sends a `RelocateResponse`.
    ExpectCandidate {
        /// The joining node's current public ID.
        old_public_id: PublicId,
        /// The joining node's current authority.
        old_client_auth: Authority<XorName>,
        /// The message's unique identifier.
        message_id: MessageId,
    },
    /// Send our Crust connection info encrypted to a node we wish to connect to and for which we
    /// have the keys.
    ConnectionInfoRequest {
        /// Encrypted Crust connection info.
        encrypted_conn_info: Vec<u8>,
        /// Nonce used to provide a salt in the encrypted message.
        nonce: [u8; box_::NONCEBYTES],
        /// The sender's public ID.
        pub_id: PublicId,
        /// The message's unique identifier.
        msg_id: MessageId,
    },
    /// Respond to a `ConnectionInfoRequest` with our Crust connection info encrypted to the
    /// requester.
    ConnectionInfoResponse {
        /// Encrypted Crust connection info.
        encrypted_conn_info: Vec<u8>,
        /// Nonce used to provide a salt in the encrypted message.
        nonce: [u8; box_::NONCEBYTES],
        /// The sender's public ID.
        pub_id: PublicId,
        /// The message's unique identifier.
        msg_id: MessageId,
    },
    /// Reply with the address range into which the joining node should move.
    RelocateResponse {
        /// The interval into which the joining node should join.
        target_interval: (XorName, XorName),
        /// The section that the joining node shall connect to: its prefix and its members.
        section: (Prefix<XorName>, BTreeSet<PublicId>),
        /// The message's unique identifier.
        message_id: MessageId,
    },
    /// Sent to notify neighbours and own members when our section's member list changed (for now,
    /// only when new nodes join).
    SectionUpdate {
        /// Section prefix and version. Included because this message is sent to both the section's
        /// own members and neighbouring sections.
        versioned_prefix: VersionedPrefix<XorName>,
        /// Members of the section
        members: BTreeSet<PublicId>,
    },
    /// Sent to all connected peers when our own section splits
    // NOTE(review): the `Debug` impl labels the second field `joining_node`, so it is presumably
    // the name of the node whose joining triggered the split - confirm.
    SectionSplit(VersionedPrefix<XorName>, XorName),
    /// Sent amongst members of a newly-merged section to allow synchronisation of their routing
    /// tables before notifying other connected peers of the merge.
    ///
    /// The source and destination authorities are both `PrefixSection` types, conveying the
    /// section sending this merge message and the target prefix of the merge respectively.
    OwnSectionMerge(SectionMap),
    /// Sent by members of a newly-merged section to peers outwith the merged section to notify them
    /// of the merge.
    ///
    /// The source authority is a `PrefixSection` conveying the section which just merged. The
    /// first field is the set of members of the section, and the second is the section version.
    OtherSectionMerge(BTreeSet<PublicId>, u64),
    /// Acknowledge receipt of any message except an `Ack`. It contains the hash of the
    /// received message and the priority.
    ///
    /// The second field is the value returned by `MessageContent::priority` for this variant.
    Ack(Ack, u8),
    /// Part of a user-facing message
    UserMessagePart {
        /// The hash of this user message.
        hash: Digest256,
        /// The unique message ID of this user message.
        msg_id: MessageId,
        /// The number of parts.
        part_count: u32,
        /// The index of this part.
        part_index: u32,
        /// The message priority; returned by `MessageContent::priority` for this variant.
        priority: u8,
        /// Is the message cacheable?
        cacheable: bool,
        /// The `part_index`-th part of the serialised user message.
        payload: Vec<u8>,
    },
    /// Confirm with section that the candidate is about to resource prove.
    ///
    /// Sent from the `NaeManager` to the `NaeManager`.
    AcceptAsCandidate {
        /// The joining node's current public ID.
        old_public_id: PublicId,
        /// The joining node's current authority.
        old_client_auth: Authority<XorName>,
        /// The interval into which the joining node should join.
        target_interval: (XorName, XorName),
        /// The message's unique identifier.
        message_id: MessageId,
    },
    /// Sent among Group Y to vote to accept a joining node.
    CandidateApproval {
        /// The joining node's new public ID (presumably the post-relocation ID, as for
        /// `DirectMessage::CandidateInfo::new_public_id` - confirm).
        new_public_id: PublicId,
        /// Client authority of the candidate.
        new_client_auth: Authority<XorName>,
        /// The `PublicId`s of all routing table contacts shared by the nodes in our section.
        sections: SectionMap,
    },
    /// Approves the joining node as a routing node.
    ///
    /// Sent from Group Y to the joining node.
    NodeApproval {
        /// The routing table shared by the nodes in our group, including the `PublicId`s of our
        /// contacts.
        sections: SectionMap,
    },
}

impl MessageContent {
    /// Returns the priority Crust should send this message with.
    ///
    /// `Ack` and `UserMessagePart` report the priority they carry; every other variant reports
    /// `0`.
    pub fn priority(&self) -> u8 {
        match *self {
            MessageContent::Ack(_, carried) => carried,
            MessageContent::UserMessagePart {
                priority: carried, ..
            } => carried,
            _ => 0,
        }
    }
}

impl Debug for DirectMessage {
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        use self::DirectMessage::*;
        match *self {
            MessageSignature(ref digest, _) => write!(
                formatter,
                "MessageSignature ({}, ..)",
                utils::format_binary_array(&digest)
            ),
            SectionListSignature(ref sec_list, _) => {
                write!(formatter, "SectionListSignature({:?}, ..)", sec_list.prefix)
            }
            BootstrapRequest(_) => write!(formatter, "BootstrapRequest"),
            BootstrapResponse(ref result) => write!(formatter, "BootstrapResponse({:?})", result),
            CandidateInfo { .. } => write!(formatter, "CandidateInfo {{ .. }}"),
            TunnelRequest(pub_id) => write!(formatter, "TunnelRequest({:?})", pub_id),
            TunnelSuccess(pub_id) => write!(formatter, "TunnelSuccess({:?})", pub_id),
            TunnelSelect(pub_id) => write!(formatter, "TunnelSelect({:?})", pub_id),
            TunnelClosed(pub_id) => write!(formatter, "TunnelClosed({:?})", pub_id),
            TunnelDisconnect(pub_id) => write!(formatter, "TunnelDisconnect({:?})", pub_id),
            ResourceProof {
                ref seed,
                ref target_size,
                ref difficulty,
            } => write!(
                formatter,
                "ResourceProof {{ seed: {:?}, target_size: {:?}, difficulty: {:?} }}",
                seed, target_size, difficulty
            ),
            ResourceProofResponse {
                part_index,
                part_count,
                ref proof,
                leading_zero_bytes,
            } => write!(
                formatter,
                "ResourceProofResponse {{ part {}/{}, proof_len: {:?}, leading_zero_bytes: \
                 {:?} }}",
                part_index + 1,
                part_count,
                proof.len(),
                leading_zero_bytes
            ),
            ResourceProofResponseReceipt => write!(formatter, "ResourceProofResponseReceipt"),
            ProxyRateLimitExceeded { ref ack } => {
                write!(formatter, "ProxyRateLimitExceeded({:?})", ack)
            }
        }
    }
}

impl Debug for HopMessage {
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        write!(
            formatter,
            "HopMessage {{ content: {:?}, route: {}, sent_to: .., signature: .. }}",
            self.content, self.route
        )
    }
}

impl Debug for SignedMessage {
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        write!(
            formatter,
            "SignedMessage {{ content: {:?}, sending nodes: {:?}, signatures: {:?} }}",
            self.content,
            self.src_sections,
            self.signatures.keys().collect_vec()
        )
    }
}

impl Debug for MessageContent {
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        use self::MessageContent::*;
        match *self {
            Relocate { ref message_id } => write!(formatter, "Relocate {{ {:?} }}", message_id),
            ExpectCandidate {
                ref old_public_id,
                ref old_client_auth,
                ref message_id,
            } => write!(
                formatter,
                "ExpectCandidate {{ {:?}, {:?}, {:?} }}",
                old_public_id, old_client_auth, message_id
            ),
            ConnectionInfoRequest {
                ref pub_id,
                ref msg_id,
                ..
            } => write!(
                formatter,
                "ConnectionInfoRequest {{ {:?}, {:?}, .. }}",
                pub_id, msg_id
            ),
            ConnectionInfoResponse {
                ref pub_id,
                ref msg_id,
                ..
            } => write!(
                formatter,
                "ConnectionInfoResponse {{ {:?}, {:?}, .. }}",
                pub_id, msg_id
            ),
            RelocateResponse {
                ref target_interval,
                ref section,
                ref message_id,
            } => write!(
                formatter,
                "RelocateResponse {{ {:?}, {:?}, {:?} }}",
                target_interval, section, message_id
            ),
            SectionUpdate {
                ref versioned_prefix,
                ref members,
            } => write!(
                formatter,
                "SectionUpdate {{ {:?}, {:?} }}",
                versioned_prefix, members
            ),
            SectionSplit(ref ver_pfx, ref joining_node) => {
                write!(formatter, "SectionSplit({:?}, {:?})", ver_pfx, joining_node)
            }
            OwnSectionMerge(ref sections) => write!(formatter, "OwnSectionMerge({:?})", sections),
            OtherSectionMerge(ref section, ref version) => {
                write!(formatter, "OtherSectionMerge({:?}, {:?})", section, version)
            }
            Ack(ack, priority) => write!(formatter, "Ack({:?}, {})", ack, priority),
            UserMessagePart {
                hash,
                part_count,
                part_index,
                priority,
                cacheable,
                ..
            } => write!(
                formatter,
                "UserMessagePart {{ {}/{}, priority: {}, cacheable: {}, \
                 {:02x}{:02x}{:02x}.. }}",
                part_index + 1,
                part_count,
                priority,
                cacheable,
                hash[0],
                hash[1],
                hash[2]
            ),
            AcceptAsCandidate {
                ref old_public_id,
                ref old_client_auth,
                ref target_interval,
                ref message_id,
            } => write!(
                formatter,
                "AcceptAsCandidate {{ {:?}, {:?}, {:?}, {:?} }}",
                old_public_id, old_client_auth, target_interval, message_id
            ),
            CandidateApproval {
                ref new_public_id,
                ref new_client_auth,
                ref sections,
            } => write!(
                formatter,
                "CandidateApproval {{ new: {:?}, client: {:?}, sections: {:?} }}",
                new_public_id, new_client_auth, sections
            ),
            NodeApproval { ref sections } => write!(formatter, "NodeApproval {{ {:?} }}", sections),
        }
    }
}

#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Debug, Hash, Serialize, Deserialize)]
/// A user-visible message: a `Request` or `Response`.
///
/// A `UserMessage` is not sent as a whole: `to_parts` serialises it and splits the bytes into
/// `MessageContent::UserMessagePart`s, and `from_parts` reassembles and deserialises them.
///
/// Note that the variant order is significant for the derived `Ord`/`PartialOrd` impls.
pub enum UserMessage {
    /// A user-visible request message.
    Request(Request),
    /// A user-visible response message.
    Response(Response),
}

impl UserMessage {
    /// Splits up the message into smaller `MessageContent` parts, which can individually be sent
    /// and routed, and then be put back together by the receiver.
    ///
    /// The message is serialised and the resulting bytes are divided into
    /// `ceil(len / MAX_PART_LEN)` consecutive slices of near-equal length, so that no slice is
    /// longer than `MAX_PART_LEN`. Every part carries the SHA3-256 hash of the complete serialised
    /// payload, the message ID, the total part count, its own zero-based index, the given
    /// `priority` and the message's cacheable flag.
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be serialised.
    pub fn to_parts(&self, priority: u8) -> Result<Vec<MessageContent>, RoutingError> {
        let payload = serialise(self)?;
        let hash = sha3_256(&payload);
        let msg_id = *self.message_id();
        // The flag is the same for every part, so compute it once instead of once per part.
        let cacheable = self.is_cacheable();
        let len = payload.len();
        // Ceiling division: the smallest number of parts such that none exceeds `MAX_PART_LEN`.
        let part_count = (len + MAX_PART_LEN - 1) / MAX_PART_LEN;

        Ok((0..part_count)
            .map(|i| MessageContent::UserMessagePart {
                hash,
                msg_id,
                part_count: part_count as u32,
                part_index: i as u32,
                cacheable,
                // Boundaries are computed proportionally so the parts differ in length by at most
                // one byte and together cover the payload exactly once.
                payload: payload[(i * len / part_count)..((i + 1) * len / part_count)].to_vec(),
                priority,
            }).collect())
    }

    /// Puts the given parts of a serialised message together and verifies that it matches the
    /// given hash code. If it does, returns the `UserMessage`.
    ///
    /// `parts` must yield the payloads in ascending part order; they are concatenated as given.
    ///
    /// # Errors
    ///
    /// Returns `RoutingError::HashMismatch` if the concatenated payload does not hash to `hash`,
    /// or a deserialisation error if the (hash-verified) payload is not a valid `UserMessage`.
    pub fn from_parts<'a, I: Iterator<Item = &'a Vec<u8>>>(
        hash: Digest256,
        parts: I,
    ) -> Result<UserMessage, RoutingError> {
        let mut payload = Vec::new();
        for part in parts {
            payload.extend_from_slice(part);
        }
        // Check integrity before parsing, so that bytes which do not match the expected hash are
        // never handed to the deserialiser.
        if hash != sha3_256(&payload) {
            return Err(RoutingError::HashMismatch);
        }
        Ok(deserialise(&payload[..])?)
    }

    /// Returns an event indicating that this message was received with the given source and
    /// destination authorities.
    pub fn into_event(self, src: Authority<XorName>, dst: Authority<XorName>) -> Event {
        match self {
            UserMessage::Request(request) => Event::Request { request, src, dst },
            UserMessage::Response(response) => Event::Response { response, src, dst },
        }
    }

    /// The unique message ID of this `UserMessage`.
    pub fn message_id(&self) -> &MessageId {
        match *self {
            UserMessage::Request(ref request) => request.message_id(),
            UserMessage::Response(ref response) => response.message_id(),
        }
    }

    /// Returns the cacheable flag of the wrapped request or response, as reported by its own
    /// `is_cacheable` method.
    fn is_cacheable(&self) -> bool {
        match *self {
            UserMessage::Request(ref request) => request.is_cacheable(),
            UserMessage::Response(ref response) => response.is_cacheable(),
        }
    }
}

/// This assembles `UserMessage`s from `UserMessagePart`s.
/// It maps `(hash, part_count)` of an incoming `UserMessage` to the map containing
/// all `UserMessagePart`s that have already arrived, by `part_index`.
///
/// The parts of one message are kept in a `BTreeMap`, so iterating over its values yields the
/// payloads in ascending `part_index` order. The outer map is an `LruCache` constructed with an
/// expiry duration (see `with_expiry_duration`).
pub struct UserMessageCache(LruCache<(Digest256, u32), BTreeMap<u32, Vec<u8>>>);

impl UserMessageCache {
    /// Creates an empty cache backed by an `LruCache` with the given expiry `duration`.
    pub fn with_expiry_duration(duration: Duration) -> Self {
        UserMessageCache(LruCache::with_expiry_duration(duration))
    }

    /// Adds the given one to the cache of received message parts, returning a `UserMessage` if the
    /// given part was the last missing piece of it.
    ///
    /// Returns `None` if
    /// - `part_index` is not smaller than `part_count` (the part is ignored and not cached),
    /// - parts of the message are still missing, or
    /// - all parts are present but reassembly fails (hash mismatch or deserialisation error), in
    ///   which case the entry is removed from the cache.
    ///
    /// A part that arrives twice replaces the previously stored payload for that index.
    pub fn add(
        &mut self,
        hash: Digest256,
        part_count: u32,
        part_index: u32,
        payload: Vec<u8>,
    ) -> Option<UserMessage> {
        // A part whose index lies outside `0..part_count` can never belong to a complete message.
        // Caching it could bring the entry up to `part_count` elements with the wrong set of
        // indices, triggering a failed reassembly that throws away the valid parts collected so
        // far. This also rejects `part_count == 0`, which could never complete.
        if part_index >= part_count {
            debug!(
                "Ignoring UserMessagePart with out-of-range index {} (part count {}) and hash \
                 {:02x}{:02x}{:02x}..",
                part_index, part_count, hash[0], hash[1], hash[2]
            );
            return None;
        }

        {
            let entry = self
                .0
                .entry((hash, part_count))
                .or_insert_with(BTreeMap::new);
            if entry.insert(part_index, payload).is_some() {
                debug!(
                    "Duplicate UserMessagePart {}/{} with hash {:02x}{:02x}{:02x}.. \
                     added to cache.",
                    part_index + 1,
                    part_count,
                    hash[0],
                    hash[1],
                    hash[2]
                );
            }

            // All stored indices are distinct and below `part_count`, so the message is complete
            // exactly when the number of stored parts equals `part_count`.
            if entry.len() != part_count as usize {
                return None;
            }
        }

        // The message is complete: take it out of the cache whether or not reassembly succeeds.
        self.0.remove(&(hash, part_count)).and_then(|part_map| {
            match UserMessage::from_parts(hash, part_map.values()) {
                Ok(user_msg) => Some(user_msg),
                Err(error) => {
                    debug!(
                        "Failed to assemble UserMessage from {} parts with hash \
                         {:02x}{:02x}{:02x}..: {:?}",
                        part_count, hash[0], hash[1], hash[2], error
                    );
                    None
                }
            }
        })
    }
}

// Unit tests for message signing, hop-message verification and the splitting/reassembly of
// `UserMessage`s.
#[cfg(test)]
mod tests {
    use super::*;
    use data::ImmutableData;
    use id::FullId;
    use maidsafe_utilities::serialisation::serialise;
    use rand;
    use routing_table::{Authority, Prefix};
    use rust_sodium::crypto::sign;
    use std::collections::BTreeSet;
    use std::iter;
    use tiny_keccak::sha3_256;
    use types::MessageId;
    use xor_name::XorName;

    // A message signed by its sending client passes `check_integrity`; after swapping the
    // signature map for one produced by an unrelated ID, `check_integrity` fails.
    #[test]
    fn signed_message_check_integrity() {
        let min_section_size = 1000;
        let name: XorName = rand::random();
        let full_id = FullId::new();
        let routing_message = RoutingMessage {
            src: Authority::Client {
                client_id: *full_id.public_id(),
                proxy_node_name: name,
            },
            dst: Authority::ClientManager(name),
            content: MessageContent::SectionSplit(Prefix::new(0, name).with_version(0), name),
        };
        // No source sections are supplied for this message.
        let senders = iter::empty().collect();
        let signed_message_result = SignedMessage::new(routing_message.clone(), &full_id, senders);

        let mut signed_message = unwrap!(signed_message_result);

        // A freshly created message holds exactly one signature: that of the creating ID.
        assert_eq!(routing_message, *signed_message.routing_message());
        assert_eq!(1, signed_message.signatures.len());
        assert_eq!(
            Some(full_id.public_id()),
            signed_message.signatures.keys().next()
        );

        unwrap!(signed_message.check_integrity(min_section_size));

        // Replace the signature map with a single signature made by a different, new ID.
        let full_id = FullId::new();
        let bytes_to_sign = unwrap!(serialise(&(&routing_message, full_id.public_id())));
        let signature = sign::sign_detached(&bytes_to_sign, full_id.signing_private_key());

        signed_message.signatures = iter::once((*full_id.public_id(), signature)).collect();

        // Invalid because it's not signed by the sender:
        assert!(signed_message.check_integrity(min_section_size).is_err());
        // However, the signature itself should be valid:
        assert!(signed_message.has_enough_sigs(min_section_size));
    }

    // Exercises `add_signature` and `check_fully_signed` on a message whose source section
    // consists of three known IDs, using valid, invalid and irrelevant signatures.
    #[test]
    fn msg_signatures() {
        let min_section_size = 8;

        let full_id_0 = FullId::new();
        let prefix = Prefix::new(0, *full_id_0.public_id().name());
        let full_id_1 = FullId::new();
        let full_id_2 = FullId::new();
        let irrelevant_full_id = FullId::new();
        let data_bytes: Vec<u8> = (0..10).map(|i| i as u8).collect();
        let data = ImmutableData::new(data_bytes);
        let user_msg = UserMessage::Request(Request::PutIData {
            data,
            msg_id: MessageId::new(),
        });
        // The small payload is expected to fit into a single part.
        let parts = unwrap!(user_msg.to_parts(1));
        assert_eq!(1, parts.len());
        let part = parts[0].clone();
        let name: XorName = rand::random();
        let routing_message = RoutingMessage {
            src: Authority::ClientManager(name),
            dst: Authority::ClientManager(name),
            content: part,
        };

        // The sending section contains IDs 0, 1 and 2, but not `irrelevant_full_id`.
        let src_sections = vec![SectionList::from(
            prefix,
            vec![
                *full_id_0.public_id(),
                *full_id_1.public_id(),
                *full_id_2.public_id(),
            ],
        )];
        let mut signed_msg = unwrap!(SignedMessage::new(
            routing_message,
            &full_id_0,
            src_sections,
        ));
        assert_eq!(signed_msg.signatures.len(), 1);

        // Try to add a signature which will not correspond to an ID from the sending nodes.
        let irrelevant_sig = match unwrap!(
            signed_msg
                .routing_message()
                .to_signature(irrelevant_full_id.signing_private_key(),)
        ) {
            DirectMessage::MessageSignature(_, sig) => {
                signed_msg.add_signature(*irrelevant_full_id.public_id(), sig);
                sig
            }
            msg => panic!("Unexpected message: {:?}", msg),
        };
        // The irrelevant signature must not have been stored.
        assert_eq!(signed_msg.signatures.len(), 1);
        assert!(
            !signed_msg
                .signatures
                .contains_key(irrelevant_full_id.public_id(),)
        );
        assert!(!signed_msg.check_fully_signed(min_section_size));

        // Add a valid signature for ID 1 and an invalid one for ID 2
        match unwrap!(
            signed_msg
                .routing_message()
                .to_signature(full_id_1.signing_private_key(),)
        ) {
            DirectMessage::MessageSignature(hash, sig) => {
                // The hash in the signature message is the SHA3-256 of the serialised routing
                // message.
                let serialised_msg = unwrap!(serialise(signed_msg.routing_message()));
                assert_eq!(hash, sha3_256(&serialised_msg));
                signed_msg.add_signature(*full_id_1.public_id(), sig);
            }
            msg => panic!("Unexpected message: {:?}", msg),
        }
        let bad_sig = sign::Signature([0; sign::SIGNATUREBYTES]);
        signed_msg.add_signature(*full_id_2.public_id(), bad_sig);
        // Both signatures are stored at first; the bad one is only dropped by the check below.
        assert_eq!(signed_msg.signatures.len(), 3);
        assert!(signed_msg.check_fully_signed(min_section_size));

        // Check the bad signature got removed (by check_fully_signed) properly.
        assert_eq!(signed_msg.signatures.len(), 2);
        assert!(!signed_msg.signatures.contains_key(full_id_2.public_id()));

        // Check an irrelevant signature can't be added.
        signed_msg.add_signature(*irrelevant_full_id.public_id(), irrelevant_sig);
        assert_eq!(signed_msg.signatures.len(), 2);
        assert!(
            !signed_msg
                .signatures
                .contains_key(irrelevant_full_id.public_id(),)
        );
    }

    // A `HopMessage` verifies against the public key matching the secret key it was created
    // with, and fails to verify against any other key.
    #[test]
    fn hop_message_verify() {
        let name: XorName = rand::random();
        let routing_message = RoutingMessage {
            src: Authority::ClientManager(name),
            dst: Authority::ClientManager(name),
            content: MessageContent::SectionSplit(Prefix::new(0, name).with_version(1), name),
        };
        let full_id = FullId::new();
        let senders = iter::empty().collect();
        let signed_message_result = SignedMessage::new(routing_message.clone(), &full_id, senders);
        let signed_message = unwrap!(signed_message_result);

        // The hop message is signed with a fresh key pair, independent of `full_id`.
        let (public_signing_key, secret_signing_key) = sign::gen_keypair();
        let hop_message_result = HopMessage::new(
            signed_message.clone(),
            0,
            BTreeSet::new(),
            &secret_signing_key,
        );

        let hop_message = unwrap!(hop_message_result);

        assert_eq!(signed_message, hop_message.content);

        assert!(hop_message.verify(&public_signing_key).is_ok());

        // A different, newly generated public key must not verify the hop message.
        let (public_signing_key, _) = sign::gen_keypair();
        assert!(hop_message.verify(&public_signing_key).is_err());
    }

    // Splits a message into parts, checks the metadata of every part and reassembles the
    // original message from the part payloads.
    #[test]
    fn user_message_parts() {
        // The data alone fills two maximum-size parts; the test expects the serialised message
        // to be split into three parts.
        let data_bytes: Vec<u8> = (0..(MAX_PART_LEN * 2)).map(|i| i as u8).collect();
        let data = ImmutableData::new(data_bytes);
        let user_msg = UserMessage::Request(Request::PutIData {
            data,
            msg_id: MessageId::new(),
        });
        let msg_hash = sha3_256(&unwrap!(serialise(&user_msg)));
        let parts = unwrap!(user_msg.to_parts(42));
        assert_eq!(parts.len(), 3);
        // Validate each part's header fields and keep only the payloads, in order.
        let payloads: Vec<Vec<u8>> = parts
            .into_iter()
            .enumerate()
            .map(|(i, msg)| match msg {
                MessageContent::UserMessagePart {
                    hash,
                    msg_id,
                    part_count,
                    part_index,
                    payload,
                    priority,
                    cacheable,
                } => {
                    assert_eq!(msg_hash, hash);
                    assert_eq!(user_msg.message_id(), &msg_id);
                    assert_eq!(3, part_count);
                    assert_eq!(i, part_index as usize);
                    assert_eq!(42, priority);
                    assert!(!cacheable);
                    payload
                }
                msg => panic!("Unexpected message {:?}", msg),
            }).collect();
        let deserialised_user_msg = unwrap!(UserMessage::from_parts(msg_hash, payloads.iter()));
        assert_eq!(user_msg, deserialised_user_msg);
    }
}