mod request;
mod response;
pub use self::request::Request;
pub use self::response::{AccountInfo, Response};
use super::{QUORUM_DENOMINATOR, QUORUM_NUMERATOR};
use ack_manager::Ack;
use data::MAX_IMMUTABLE_DATA_SIZE_IN_BYTES;
use error::{BootstrapResponseError, RoutingError};
use event::Event;
use id::{FullId, PublicId};
use itertools::Itertools;
use lru_time_cache::LruCache;
use maidsafe_utilities::serialisation::{deserialise, serialise};
use peer_manager::SectionMap;
use routing_table::Authority;
use routing_table::{Prefix, VersionedPrefix, Xorable};
use rust_sodium::crypto::{box_, sign};
use sha3::Digest256;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fmt::{self, Debug, Formatter};
use std::iter;
use std::time::Duration;
use tiny_keccak::sha3_256;
use types::MessageId;
use utils;
use xor_name::XorName;
/// Maximum length, in bytes, of the payload carried by a single `UserMessagePart`;
/// `UserMessage::to_parts` picks the part count so that no part exceeds this.
pub const MAX_PART_LEN: usize = 20 * 1024;
/// Upper bound on the number of parts: the maximum immutable data size divided by
/// `MAX_PART_LEN`, plus one.
pub const MAX_PARTS: u32 = ((MAX_IMMUTABLE_DATA_SIZE_IN_BYTES / MAX_PART_LEN as u64) + 1) as u32;
/// Priority value `1`.
// NOTE(review): presumably the priority used for relocation messages; how priority values are
// ordered is not defined in this file - confirm against the users of this constant.
pub const RELOCATE_PRIORITY: u8 = 1;
/// Priority value `2`.
// NOTE(review): presumably the default priority for user messages - confirm.
pub const DEFAULT_PRIORITY: u8 = 2;
/// Priority value `3`.
// NOTE(review): presumably the priority for client `Get` requests - confirm.
pub const CLIENT_GET_PRIORITY: u8 = 3;
/// Top-level message envelope.
///
/// Wraps either a `DirectMessage` or a `HopMessage`. The `Tunnel*` variants additionally carry
/// a source and a destination `PublicId`.
// NOTE(review): the tunnel variants presumably relay a message between `src` and `dst` through
// an intermediate node - confirm against the tunnelling code.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum Message {
/// A `DirectMessage` with no tunnel addressing.
Direct(DirectMessage),
/// A `HopMessage` with no tunnel addressing.
Hop(HopMessage),
/// A `DirectMessage` together with tunnel source and destination ids.
TunnelDirect {
/// The wrapped direct message.
content: DirectMessage,
/// Public id of the tunnel source.
src: PublicId,
/// Public id of the tunnel destination.
dst: PublicId,
},
/// A `HopMessage` together with tunnel source and destination ids.
TunnelHop {
/// The wrapped hop message.
content: HopMessage,
/// Public id of the tunnel source.
src: PublicId,
/// Public id of the tunnel destination.
dst: PublicId,
},
}
impl Message {
    /// Returns the priority of the wrapped message.
    ///
    /// Direct messages (tunnelled or not) report `DirectMessage::priority`; hop messages
    /// (tunnelled or not) report the priority of the signed routing message they carry.
    pub fn priority(&self) -> u8 {
        match *self {
            Message::Direct(ref direct) => direct.priority(),
            Message::TunnelDirect {
                content: ref direct,
                ..
            } => direct.priority(),
            // `SignedMessage::priority` forwards to the routing message it wraps.
            Message::Hop(ref hop) => hop.content.priority(),
            Message::TunnelHop {
                content: ref hop, ..
            } => hop.content.priority(),
        }
    }
}
/// Payload of `Message::Direct` and `Message::TunnelDirect`.
///
/// `Debug` is implemented by hand further down in this file, so it is not derived here.
// NOTE(review): the per-variant notes below are inferred from names and field types only;
// confirm against the code that sends and handles each variant.
#[derive(Serialize, Deserialize)]
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum DirectMessage {
/// A message digest and a signature; this is what `RoutingMessage::to_signature` produces
/// (SHA3-256 of the serialised routing message, and a signature of the same bytes).
MessageSignature(Digest256, sign::Signature),
/// A section list and a signature (presumably of that list - confirm).
SectionListSignature(SectionList, sign::Signature),
/// Bootstrap request carrying a signature.
BootstrapRequest(sign::Signature),
/// Outcome of a bootstrap request: `Ok(())` or a `BootstrapResponseError`.
BootstrapResponse(Result<(), BootstrapResponseError>),
/// Information about a candidate: its old and new public ids, one signature made with each
/// identity, and a client authority.
CandidateInfo {
old_public_id: PublicId,
new_public_id: PublicId,
signature_using_old: sign::Signature,
signature_using_new: sign::Signature,
new_client_auth: Authority<XorName>,
},
/// Tunnel-related message carrying a public id.
TunnelRequest(PublicId),
/// Tunnel-related message carrying a public id.
TunnelSuccess(PublicId),
/// Tunnel-related message carrying a public id.
TunnelSelect(PublicId),
/// Tunnel-related message carrying a public id.
TunnelClosed(PublicId),
/// Tunnel-related message carrying a public id.
TunnelDisconnect(PublicId),
/// Resource proof challenge parameters.
ResourceProof {
seed: Vec<u8>,
target_size: usize,
difficulty: u8,
},
/// One part of a resource proof response. The `Debug` impl prints `part_index + 1`, i.e. the
/// index is treated as zero-based.
ResourceProofResponse {
part_index: usize,
part_count: usize,
proof: Vec<u8>,
leading_zero_bytes: u64,
},
/// Receipt for a resource proof response; carries no data.
ResourceProofResponseReceipt,
/// Rate-limit notification carrying an `Ack`.
ProxyRateLimitExceeded { ack: Ack },
}
impl DirectMessage {
    /// Returns the priority of this message: `9` for `ResourceProofResponse`, `0` for every
    /// other variant.
    pub fn priority(&self) -> u8 {
        if let DirectMessage::ResourceProofResponse { .. } = *self {
            9
        } else {
            0
        }
    }
}
/// A `SignedMessage` plus per-hop data and a hop signature.
///
/// The hop signature covers the serialised `content` only (see `HopMessage::new` and
/// `HopMessage::verify`); `route` and `sent_to` are not part of the signed bytes.
#[derive(Serialize, Deserialize)]
pub struct HopMessage {
/// The wrapped signed message.
pub content: SignedMessage,
/// Route number.
// NOTE(review): presumably selects which route the message is sent along - confirm.
pub route: u8,
/// Set of names.
// NOTE(review): presumably the peers this message has already been sent to - confirm.
pub sent_to: BTreeSet<XorName>,
/// Detached signature of the serialised `content`, made with the key passed to `new`.
signature: sign::Signature,
}
impl HopMessage {
    /// Builds a `HopMessage`, signing the serialised `content` with `signing_key`.
    ///
    /// # Errors
    ///
    /// Returns an error if `content` cannot be serialised.
    pub fn new(
        content: SignedMessage,
        route: u8,
        sent_to: BTreeSet<XorName>,
        signing_key: &sign::SecretKey,
    ) -> Result<HopMessage, RoutingError> {
        let signature = sign::sign_detached(&serialise(&content)?, signing_key);
        Ok(HopMessage {
            content,
            route,
            sent_to,
            signature,
        })
    }

    /// Checks the hop signature against `verification_key`.
    ///
    /// # Errors
    ///
    /// Returns `RoutingError::FailedSignature` if the signature does not match the serialised
    /// `content`, or a serialisation error if `content` cannot be serialised.
    pub fn verify(&self, verification_key: &sign::PublicKey) -> Result<(), RoutingError> {
        let signed_bytes = serialise(&self.content)?;
        if !sign::verify_detached(&self.signature, &signed_bytes, verification_key) {
            return Err(RoutingError::FailedSignature);
        }
        Ok(())
    }
}
/// A prefix together with a set of public ids.
// NOTE(review): presumably the members of the section identified by `prefix` - confirm.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Serialize, Deserialize, Debug)]
pub struct SectionList {
/// The prefix this list is associated with; `SignedMessage::new` sorts its lists by it.
pub prefix: Prefix<XorName>,
/// The public ids in the list; `SignedMessage` uses them to decide who counts as a sender.
pub_ids: BTreeSet<PublicId>,
}
impl SectionList {
pub fn new(prefix: Prefix<XorName>, pub_ids: BTreeSet<PublicId>) -> Self {
SectionList { prefix, pub_ids }
}
pub fn from<I: IntoIterator<Item = PublicId>>(prefix: Prefix<XorName>, pub_ids: I) -> Self {
Self::new(prefix, pub_ids.into_iter().collect())
}
}
/// A `RoutingMessage` together with the lists of sending nodes and the signatures collected
/// so far.
///
/// `Debug` is implemented by hand further down in this file.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Serialize, Deserialize)]
pub struct SignedMessage {
/// The routing message; signatures are made over its serialised form.
content: RoutingMessage,
/// Lists of nodes treated as senders, sorted by prefix in `SignedMessage::new`.
src_sections: Vec<SectionList>,
/// Signatures of the serialised `content`, keyed by the signer's public id.
signatures: BTreeMap<PublicId, sign::Signature>,
}
impl SignedMessage {
    /// Creates a `SignedMessage` holding `content`, the given source section lists (sorted by
    /// prefix) and a single signature of the serialised content made with `full_id`.
    ///
    /// # Errors
    ///
    /// Returns an error if `content` cannot be serialised.
    pub fn new(
        content: RoutingMessage,
        full_id: &FullId,
        mut src_sections: Vec<SectionList>,
    ) -> Result<SignedMessage, RoutingError> {
        src_sections.sort_by_key(|list| list.prefix);
        let sig = sign::sign_detached(&serialise(&content)?, full_id.signing_private_key());
        Ok(SignedMessage {
            content,
            src_sections,
            signatures: iter::once((*full_id.public_id(), sig)).collect(),
        })
    }

    /// Confirms that every attached signature is valid and from a legitimate signer, and that
    /// there are enough signatures for the source authority.
    ///
    /// # Errors
    ///
    /// * `RoutingError::FailedSignature` if any signature is invalid or from a non-sender.
    /// * `RoutingError::NotEnoughSignatures` if the quorum requirement is not met.
    /// * A serialisation error if the content cannot be serialised.
    pub fn check_integrity(&self, min_section_size: usize) -> Result<(), RoutingError> {
        let signed_bytes = serialise(&self.content)?;
        if !self.find_invalid_sigs(&signed_bytes).is_empty() {
            return Err(RoutingError::FailedSignature);
        }
        if !self.has_enough_sigs(min_section_size) {
            return Err(RoutingError::NotEnoughSignatures);
        }
        Ok(())
    }

    /// Returns whether a signature keyed by `pub_id` is attached (its validity is not checked).
    pub fn signed_by(&self, pub_id: &PublicId) -> bool {
        self.signatures.contains_key(pub_id)
    }

    /// Returns the total number of public ids across all source section lists.
    pub fn src_size(&self) -> usize {
        self.src_sections.iter().map(|sl| sl.pub_ids.len()).sum()
    }

    /// Attaches `sig` under `pub_id`, but only if the source authority is a multiple-node
    /// authority and `pub_id` is listed in the source sections. The signature itself is not
    /// verified here.
    pub fn add_signature(&mut self, pub_id: PublicId, sig: sign::Signature) {
        if self.content.src.is_multiple() && self.is_sender(&pub_id) {
            let _ = self.signatures.insert(pub_id, sig);
        }
    }

    /// Merges every signature from `msg` into this message if the source authority is a
    /// multiple-node authority. Neither the signers nor the signatures are validated here;
    /// `check_fully_signed` / `check_integrity` do that later.
    pub fn add_signatures(&mut self, msg: SignedMessage) {
        if self.content.src.is_multiple() {
            self.signatures.extend(msg.signatures);
        }
    }

    /// Consumes the signed message and returns the routing message it wraps.
    pub fn into_routing_message(self) -> RoutingMessage {
        self.content
    }

    /// Returns a reference to the wrapped routing message.
    pub fn routing_message(&self) -> &RoutingMessage {
        &self.content
    }

    /// Returns the priority of the wrapped routing message.
    pub fn priority(&self) -> u8 {
        self.content.priority()
    }

    /// Returns whether the message carries enough *valid* signatures.
    ///
    /// If the raw signature count is already insufficient this returns `false` immediately.
    /// Otherwise all invalid signatures (bad signature or non-sender) are removed from the
    /// message, and the quorum check is repeated on what remains. A serialisation failure is
    /// logged and reported as `false`.
    pub fn check_fully_signed(&mut self, min_section_size: usize) -> bool {
        if !self.has_enough_sigs(min_section_size) {
            return false;
        }
        let signed_bytes = match serialise(&self.content) {
            Ok(serialised) => serialised,
            Err(error) => {
                warn!("Failed to serialise {:?}: {:?}", self, error);
                return false;
            }
        };
        for invalid_signature in &self.find_invalid_sigs(&signed_bytes) {
            let _ = self.signatures.remove(invalid_signature);
        }
        self.has_enough_sigs(min_section_size)
    }

    /// Returns whether `pub_id` appears in any of the source section lists.
    fn is_sender(&self, pub_id: &PublicId) -> bool {
        self.src_sections
            .iter()
            .any(|list| list.pub_ids.contains(pub_id))
    }

    /// Returns the public ids whose attached signature must be rejected.
    ///
    /// For a `Client` source, only the client's own id with a valid signature is accepted.
    /// For any other source, the signer must be listed in the source sections and the
    /// signature must verify against `signed_bytes`. The bytes are only read, so they are
    /// borrowed rather than taken by value.
    fn find_invalid_sigs(&self, signed_bytes: &[u8]) -> Vec<PublicId> {
        let invalid = self
            .signatures
            .iter()
            .filter_map(|(pub_id, sig)| {
                let is_valid = if let Authority::Client { ref client_id, .. } = self.content.src {
                    client_id == pub_id
                        && sign::verify_detached(sig, signed_bytes, client_id.signing_public_key())
                } else {
                    self.is_sender(pub_id)
                        && sign::verify_detached(sig, signed_bytes, pub_id.signing_public_key())
                };
                if is_valid {
                    None
                } else {
                    Some(*pub_id)
                }
            }).collect_vec();
        if !invalid.is_empty() {
            debug!("{:?}: invalid signatures: {:?}", self, invalid);
        }
        invalid
    }

    /// Returns whether the number of attached signatures satisfies the rule for the source
    /// authority. Signatures are only counted here, not verified.
    ///
    /// A quorum means strictly more than `QUORUM_NUMERATOR / QUORUM_DENOMINATOR` of the
    /// relevant group.
    fn has_enough_sigs(&self, min_section_size: usize) -> bool {
        use Authority::*;
        match self.content.src {
            ClientManager(_) | NaeManager(_) | NodeManager(_) => {
                // The relevant group is the first `min_section_size` listed names when ordered
                // by `cmp_distance` relative to the source authority's name.
                let valid_names: HashSet<_> = self
                    .src_sections
                    .iter()
                    .flat_map(|list| list.pub_ids.iter().map(PublicId::name))
                    .sorted_by(|lhs, rhs| self.content.src.name().cmp_distance(lhs, rhs))
                    .into_iter()
                    .take(min_section_size)
                    .collect();
                let valid_sigs = self
                    .signatures
                    .keys()
                    .filter(|pub_id| valid_names.contains(pub_id.name()))
                    .count();
                valid_sigs * QUORUM_DENOMINATOR > valid_names.len() * QUORUM_NUMERATOR
            }
            Section(_) => {
                // A quorum of all listed senders, taken over every source section together.
                let num_sending = self.src_size();
                let valid_sigs = self.signatures.len();
                valid_sigs * QUORUM_DENOMINATOR > num_sending * QUORUM_NUMERATOR
            }
            PrefixSection(_) => {
                // Every source section must reach a quorum on its own.
                self.src_sections.iter().all(|list| {
                    let valid_sigs = self
                        .signatures
                        .keys()
                        .filter(|pub_id| list.pub_ids.contains(pub_id))
                        .count();
                    valid_sigs * QUORUM_DENOMINATOR > list.pub_ids.len() * QUORUM_NUMERATOR
                })
            }
            // Single-node sources carry exactly one signature.
            ManagedNode(_) | Client { .. } => self.signatures.len() == 1,
        }
    }
}
/// A message with a source authority, a destination authority and a content payload.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Debug, Serialize, Deserialize)]
pub struct RoutingMessage {
/// Source authority.
pub src: Authority<XorName>,
/// Destination authority.
pub dst: Authority<XorName>,
/// The message payload.
pub content: MessageContent,
}
impl RoutingMessage {
    /// Builds the acknowledgement for `msg`: it is addressed to `msg`'s source, sent from
    /// `src`, and carries the `Ack` computed from `msg` along with `msg`'s priority.
    ///
    /// # Errors
    ///
    /// Propagates any error from `Ack::compute`.
    pub fn ack_from(msg: &RoutingMessage, src: Authority<XorName>) -> Result<Self, RoutingError> {
        let ack = Ack::compute(msg)?;
        let content = MessageContent::Ack(ack, msg.priority());
        Ok(RoutingMessage {
            src,
            dst: msg.src,
            content,
        })
    }

    /// Returns the priority of the message content.
    pub fn priority(&self) -> u8 {
        self.content.priority()
    }

    /// Produces a `DirectMessage::MessageSignature` for this message: the SHA3-256 digest of
    /// its serialised form and a detached signature of the same bytes made with `signing_key`.
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be serialised.
    pub fn to_signature(
        &self,
        signing_key: &sign::SecretKey,
    ) -> Result<DirectMessage, RoutingError> {
        let bytes = serialise(self)?;
        let digest = sha3_256(&bytes);
        let signature = sign::sign_detached(&bytes, signing_key);
        Ok(DirectMessage::MessageSignature(digest, signature))
    }
}
/// The payload of a `RoutingMessage`.
///
/// `Debug` is implemented by hand further down in this file.
// NOTE(review): variant notes that are not backed by code in this file are deliberately brief;
// consult the handlers of each variant for the protocol semantics.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Serialize, Deserialize)]
#[cfg_attr(feature = "cargo-clippy", allow(large_enum_variant))]
pub enum MessageContent {
/// Relocation message identified by `message_id`.
Relocate {
message_id: MessageId,
},
/// Carries a candidate's old public id and old client authority.
ExpectCandidate {
old_public_id: PublicId,
old_client_auth: Authority<XorName>,
message_id: MessageId,
},
/// Connection info request: encrypted connection info, the nonce, a public id and a message
/// id.
ConnectionInfoRequest {
encrypted_conn_info: Vec<u8>,
nonce: [u8; box_::NONCEBYTES],
pub_id: PublicId,
msg_id: MessageId,
},
/// Connection info response; same fields as `ConnectionInfoRequest`.
ConnectionInfoResponse {
encrypted_conn_info: Vec<u8>,
nonce: [u8; box_::NONCEBYTES],
pub_id: PublicId,
msg_id: MessageId,
},
/// Response to a relocation: a target name interval and a section (prefix plus members).
RelocateResponse {
target_interval: (XorName, XorName),
section: (Prefix<XorName>, BTreeSet<PublicId>),
message_id: MessageId,
},
/// A versioned prefix and a set of member ids.
SectionUpdate {
versioned_prefix: VersionedPrefix<XorName>,
members: BTreeSet<PublicId>,
},
/// A versioned prefix and a name (the `Debug` impl calls the name `joining_node`).
SectionSplit(VersionedPrefix<XorName>, XorName),
/// Merge message carrying a `SectionMap`.
OwnSectionMerge(SectionMap),
/// Merge message carrying a set of public ids and a `u64` (the `Debug` impl calls it
/// `version`).
OtherSectionMerge(BTreeSet<PublicId>, u64),
/// An acknowledgement and the priority it is sent with (see `MessageContent::priority`).
Ack(Ack, u8),
/// One part of a `UserMessage` that was split by `UserMessage::to_parts`.
UserMessagePart {
/// SHA3-256 digest of the complete serialised user message.
hash: Digest256,
/// Message id of the user message.
msg_id: MessageId,
/// Total number of parts the user message was split into.
part_count: u32,
/// Zero-based index of this part.
part_index: u32,
/// Priority of the message; returned by `MessageContent::priority`.
priority: u8,
/// Whether the user message reported itself as cacheable.
cacheable: bool,
/// This part's slice of the serialised user message.
payload: Vec<u8>,
},
/// Carries a candidate's old public id, old client authority and a target name interval.
AcceptAsCandidate {
old_public_id: PublicId,
old_client_auth: Authority<XorName>,
target_interval: (XorName, XorName),
message_id: MessageId,
},
/// Carries a candidate's new public id, new client authority and a `SectionMap`.
CandidateApproval {
new_public_id: PublicId,
new_client_auth: Authority<XorName>,
sections: SectionMap,
},
/// Carries a `SectionMap`.
NodeApproval {
sections: SectionMap,
},
}
impl MessageContent {
    /// Returns the priority stored in an `Ack` or a `UserMessagePart`; every other variant has
    /// priority `0`.
    pub fn priority(&self) -> u8 {
        match *self {
            MessageContent::Ack(_, priority) => priority,
            MessageContent::UserMessagePart { priority, .. } => priority,
            _ => 0,
        }
    }
}
impl Debug for DirectMessage {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
use self::DirectMessage::*;
match *self {
MessageSignature(ref digest, _) => write!(
formatter,
"MessageSignature ({}, ..)",
utils::format_binary_array(&digest)
),
SectionListSignature(ref sec_list, _) => {
write!(formatter, "SectionListSignature({:?}, ..)", sec_list.prefix)
}
BootstrapRequest(_) => write!(formatter, "BootstrapRequest"),
BootstrapResponse(ref result) => write!(formatter, "BootstrapResponse({:?})", result),
CandidateInfo { .. } => write!(formatter, "CandidateInfo {{ .. }}"),
TunnelRequest(pub_id) => write!(formatter, "TunnelRequest({:?})", pub_id),
TunnelSuccess(pub_id) => write!(formatter, "TunnelSuccess({:?})", pub_id),
TunnelSelect(pub_id) => write!(formatter, "TunnelSelect({:?})", pub_id),
TunnelClosed(pub_id) => write!(formatter, "TunnelClosed({:?})", pub_id),
TunnelDisconnect(pub_id) => write!(formatter, "TunnelDisconnect({:?})", pub_id),
ResourceProof {
ref seed,
ref target_size,
ref difficulty,
} => write!(
formatter,
"ResourceProof {{ seed: {:?}, target_size: {:?}, difficulty: {:?} }}",
seed, target_size, difficulty
),
ResourceProofResponse {
part_index,
part_count,
ref proof,
leading_zero_bytes,
} => write!(
formatter,
"ResourceProofResponse {{ part {}/{}, proof_len: {:?}, leading_zero_bytes: \
{:?} }}",
part_index + 1,
part_count,
proof.len(),
leading_zero_bytes
),
ResourceProofResponseReceipt => write!(formatter, "ResourceProofResponseReceipt"),
ProxyRateLimitExceeded { ref ack } => {
write!(formatter, "ProxyRateLimitExceeded({:?})", ack)
}
}
}
}
impl Debug for HopMessage {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(
formatter,
"HopMessage {{ content: {:?}, route: {}, sent_to: .., signature: .. }}",
self.content, self.route
)
}
}
impl Debug for SignedMessage {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(
formatter,
"SignedMessage {{ content: {:?}, sending nodes: {:?}, signatures: {:?} }}",
self.content,
self.src_sections,
self.signatures.keys().collect_vec()
)
}
}
impl Debug for MessageContent {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
use self::MessageContent::*;
match *self {
Relocate { ref message_id } => write!(formatter, "Relocate {{ {:?} }}", message_id),
ExpectCandidate {
ref old_public_id,
ref old_client_auth,
ref message_id,
} => write!(
formatter,
"ExpectCandidate {{ {:?}, {:?}, {:?} }}",
old_public_id, old_client_auth, message_id
),
ConnectionInfoRequest {
ref pub_id,
ref msg_id,
..
} => write!(
formatter,
"ConnectionInfoRequest {{ {:?}, {:?}, .. }}",
pub_id, msg_id
),
ConnectionInfoResponse {
ref pub_id,
ref msg_id,
..
} => write!(
formatter,
"ConnectionInfoResponse {{ {:?}, {:?}, .. }}",
pub_id, msg_id
),
RelocateResponse {
ref target_interval,
ref section,
ref message_id,
} => write!(
formatter,
"RelocateResponse {{ {:?}, {:?}, {:?} }}",
target_interval, section, message_id
),
SectionUpdate {
ref versioned_prefix,
ref members,
} => write!(
formatter,
"SectionUpdate {{ {:?}, {:?} }}",
versioned_prefix, members
),
SectionSplit(ref ver_pfx, ref joining_node) => {
write!(formatter, "SectionSplit({:?}, {:?})", ver_pfx, joining_node)
}
OwnSectionMerge(ref sections) => write!(formatter, "OwnSectionMerge({:?})", sections),
OtherSectionMerge(ref section, ref version) => {
write!(formatter, "OtherSectionMerge({:?}, {:?})", section, version)
}
Ack(ack, priority) => write!(formatter, "Ack({:?}, {})", ack, priority),
UserMessagePart {
hash,
part_count,
part_index,
priority,
cacheable,
..
} => write!(
formatter,
"UserMessagePart {{ {}/{}, priority: {}, cacheable: {}, \
{:02x}{:02x}{:02x}.. }}",
part_index + 1,
part_count,
priority,
cacheable,
hash[0],
hash[1],
hash[2]
),
AcceptAsCandidate {
ref old_public_id,
ref old_client_auth,
ref target_interval,
ref message_id,
} => write!(
formatter,
"AcceptAsCandidate {{ {:?}, {:?}, {:?}, {:?} }}",
old_public_id, old_client_auth, target_interval, message_id
),
CandidateApproval {
ref new_public_id,
ref new_client_auth,
ref sections,
} => write!(
formatter,
"CandidateApproval {{ new: {:?}, client: {:?}, sections: {:?} }}",
new_public_id, new_client_auth, sections
),
NodeApproval { ref sections } => write!(formatter, "NodeApproval {{ {:?} }}", sections),
}
}
}
/// A user-level message: either a `Request` or a `Response`.
///
/// It is split into `MessageContent::UserMessagePart`s by `to_parts` and rebuilt from the part
/// payloads by `from_parts`.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum UserMessage {
/// A request.
Request(Request),
/// A response.
Response(Response),
}
impl UserMessage {
    /// Serialises the message and splits it into `UserMessagePart`s of at most `MAX_PART_LEN`
    /// payload bytes each.
    ///
    /// Every part carries the SHA3-256 digest of the full serialised message, the message id,
    /// the total part count, its own zero-based index, the cacheable flag and `priority`.
    ///
    /// # Errors
    ///
    /// Returns an error if the message cannot be serialised.
    pub fn to_parts(&self, priority: u8) -> Result<Vec<MessageContent>, RoutingError> {
        let payload = serialise(self)?;
        let hash = sha3_256(&payload);
        let msg_id = *self.message_id();
        // The flag is the same for every part, so compute it once.
        let cacheable = self.is_cacheable();
        let len = payload.len();
        // Ceiling division: the smallest part count that keeps every part within the limit.
        let part_count = (len + MAX_PART_LEN - 1) / MAX_PART_LEN;
        Ok((0..part_count)
            .map(|i| MessageContent::UserMessagePart {
                hash,
                msg_id,
                part_count: part_count as u32,
                part_index: i as u32,
                cacheable,
                // Spread the bytes as evenly as possible over the parts.
                payload: payload[(i * len / part_count)..((i + 1) * len / part_count)].to_vec(),
                priority,
            }).collect())
    }

    /// Rebuilds a `UserMessage` by concatenating `parts` in iteration order.
    ///
    /// The digest of the concatenated payload is checked against `hash` *before* the payload
    /// is deserialised, so bytes that fail the integrity check are never parsed.
    ///
    /// # Errors
    ///
    /// * `RoutingError::HashMismatch` if the payload's SHA3-256 digest differs from `hash`.
    /// * A deserialisation error if the verified payload is not a valid `UserMessage`.
    pub fn from_parts<'a, I: Iterator<Item = &'a Vec<u8>>>(
        hash: Digest256,
        parts: I,
    ) -> Result<UserMessage, RoutingError> {
        let mut payload = Vec::new();
        for part in parts {
            payload.extend_from_slice(part);
        }
        if hash != sha3_256(&payload) {
            return Err(RoutingError::HashMismatch);
        }
        Ok(deserialise(&payload[..])?)
    }

    /// Converts the message into the matching `Event` (`Request` or `Response`) with the given
    /// source and destination authorities.
    pub fn into_event(self, src: Authority<XorName>, dst: Authority<XorName>) -> Event {
        match self {
            UserMessage::Request(request) => Event::Request { request, src, dst },
            UserMessage::Response(response) => Event::Response { response, src, dst },
        }
    }

    /// Returns the message id of the wrapped request or response.
    pub fn message_id(&self) -> &MessageId {
        match *self {
            UserMessage::Request(ref request) => request.message_id(),
            UserMessage::Response(ref response) => response.message_id(),
        }
    }

    /// Returns whether the wrapped request or response reports itself as cacheable.
    fn is_cacheable(&self) -> bool {
        match *self {
            UserMessage::Request(ref request) => request.is_cacheable(),
            UserMessage::Response(ref response) => response.is_cacheable(),
        }
    }
}
/// Cache of partially received user messages.
///
/// Entries are keyed by `(hash, part_count)` and map each received part index to that part's
/// payload.
pub struct UserMessageCache(LruCache<(Digest256, u32), BTreeMap<u32, Vec<u8>>>);
impl UserMessageCache {
    /// Creates a cache whose underlying `LruCache` is built with the given expiry duration.
    pub fn with_expiry_duration(duration: Duration) -> Self {
        let cache = LruCache::with_expiry_duration(duration);
        UserMessageCache(cache)
    }

    /// Stores one part of a user message.
    ///
    /// Returns `None` while fewer than `part_count` distinct part indices are stored for
    /// `(hash, part_count)`. Once that many are present, the entry is removed from the cache
    /// and the message is rebuilt from the payloads in ascending index order; `None` is also
    /// returned if rebuilding fails. A part that replaces an already stored index is logged
    /// as a duplicate.
    pub fn add(
        &mut self,
        hash: Digest256,
        part_count: u32,
        part_index: u32,
        payload: Vec<u8>,
    ) -> Option<UserMessage> {
        let key = (hash, part_count);
        // Scoped so the mutable borrow of the cache ends before the entry is removed below.
        let is_complete = {
            let parts = self.0.entry(key).or_insert_with(BTreeMap::new);
            let replaced = parts.insert(part_index, payload);
            if replaced.is_some() {
                debug!(
                    "Duplicate UserMessagePart {}/{} with hash {:02x}{:02x}{:02x}.. \
                     added to cache.",
                    part_index + 1,
                    part_count,
                    hash[0],
                    hash[1],
                    hash[2]
                );
            }
            parts.len() == part_count as usize
        };
        if !is_complete {
            return None;
        }
        match self.0.remove(&key) {
            Some(part_map) => UserMessage::from_parts(hash, part_map.values()).ok(),
            None => None,
        }
    }
}
// Unit tests for message signing, hop verification and user message splitting.
#[cfg(test)]
mod tests {
use super::*;
use data::ImmutableData;
use id::FullId;
use maidsafe_utilities::serialisation::serialise;
use rand;
use routing_table::{Authority, Prefix};
use rust_sodium::crypto::sign;
use std::collections::BTreeSet;
use std::iter;
use tiny_keccak::sha3_256;
use types::MessageId;
use xor_name::XorName;
// A message from a `Client` source passes `check_integrity` when signed by that client and
// fails once its signature is replaced by one from a different identity.
#[test]
fn signed_message_check_integrity() {
let min_section_size = 1000;
let name: XorName = rand::random();
let full_id = FullId::new();
let routing_message = RoutingMessage {
src: Authority::Client {
client_id: *full_id.public_id(),
proxy_node_name: name,
},
dst: Authority::ClientManager(name),
content: MessageContent::SectionSplit(Prefix::new(0, name).with_version(0), name),
};
let senders = iter::empty().collect();
let signed_message_result = SignedMessage::new(routing_message.clone(), &full_id, senders);
let mut signed_message = unwrap!(signed_message_result);
// A freshly created message wraps the routing message and holds only the creator's signature.
assert_eq!(routing_message, *signed_message.routing_message());
assert_eq!(1, signed_message.signatures.len());
assert_eq!(
Some(full_id.public_id()),
signed_message.signatures.keys().next()
);
unwrap!(signed_message.check_integrity(min_section_size));
// Replace the signature with one made by a new identity over different bytes.
let full_id = FullId::new();
let bytes_to_sign = unwrap!(serialise(&(&routing_message, full_id.public_id())));
let signature = sign::sign_detached(&bytes_to_sign, full_id.signing_private_key());
signed_message.signatures = iter::once((*full_id.public_id(), signature)).collect();
// The integrity check rejects it, although the signature count alone (exactly one for a
// client source) still satisfies `has_enough_sigs`.
assert!(signed_message.check_integrity(min_section_size).is_err());
assert!(signed_message.has_enough_sigs(min_section_size));
}
// Signatures from non-senders are never added, invalid signatures are removed by
// `check_fully_signed`, and a quorum of valid sender signatures makes the message fully signed.
#[test]
fn msg_signatures() {
let min_section_size = 8;
let full_id_0 = FullId::new();
let prefix = Prefix::new(0, *full_id_0.public_id().name());
let full_id_1 = FullId::new();
let full_id_2 = FullId::new();
let irrelevant_full_id = FullId::new();
let data_bytes: Vec<u8> = (0..10).map(|i| i as u8).collect();
let data = ImmutableData::new(data_bytes);
let user_msg = UserMessage::Request(Request::PutIData {
data,
msg_id: MessageId::new(),
});
let parts = unwrap!(user_msg.to_parts(1));
assert_eq!(1, parts.len());
let part = parts[0].clone();
let name: XorName = rand::random();
let routing_message = RoutingMessage {
src: Authority::ClientManager(name),
dst: Authority::ClientManager(name),
content: part,
};
// Only ids 0, 1 and 2 are listed as senders; `irrelevant_full_id` is not.
let src_sections = vec![SectionList::from(
prefix,
vec![
*full_id_0.public_id(),
*full_id_1.public_id(),
*full_id_2.public_id(),
],
)];
let mut signed_msg = unwrap!(SignedMessage::new(
routing_message,
&full_id_0,
src_sections,
));
assert_eq!(signed_msg.signatures.len(), 1);
// Try to add a signature from a node that is not a sender; it must be ignored.
let irrelevant_sig = match unwrap!(
signed_msg
.routing_message()
.to_signature(irrelevant_full_id.signing_private_key(),)
) {
DirectMessage::MessageSignature(_, sig) => {
signed_msg.add_signature(*irrelevant_full_id.public_id(), sig);
sig
}
msg => panic!("Unexpected message: {:?}", msg),
};
assert_eq!(signed_msg.signatures.len(), 1);
assert!(
!signed_msg
.signatures
.contains_key(irrelevant_full_id.public_id(),)
);
// One signature out of three senders is not yet enough.
assert!(!signed_msg.check_fully_signed(min_section_size));
// Add a valid signature from sender 1 and check the digest produced alongside it.
match unwrap!(
signed_msg
.routing_message()
.to_signature(full_id_1.signing_private_key(),)
) {
DirectMessage::MessageSignature(hash, sig) => {
let serialised_msg = unwrap!(serialise(signed_msg.routing_message()));
assert_eq!(hash, sha3_256(&serialised_msg));
signed_msg.add_signature(*full_id_1.public_id(), sig);
}
msg => panic!("Unexpected message: {:?}", msg),
}
// Add an all-zero (invalid) signature for sender 2; it is accepted at first because
// `add_signature` does not verify signatures.
let bad_sig = sign::Signature([0; sign::SIGNATUREBYTES]);
signed_msg.add_signature(*full_id_2.public_id(), bad_sig);
assert_eq!(signed_msg.signatures.len(), 3);
// `check_fully_signed` strips the invalid signature and still finds enough valid ones.
assert!(signed_msg.check_fully_signed(min_section_size));
assert_eq!(signed_msg.signatures.len(), 2);
assert!(!signed_msg.signatures.contains_key(full_id_2.public_id()));
// The non-sender's signature is still rejected afterwards.
signed_msg.add_signature(*irrelevant_full_id.public_id(), irrelevant_sig);
assert_eq!(signed_msg.signatures.len(), 2);
assert!(
!signed_msg
.signatures
.contains_key(irrelevant_full_id.public_id(),)
);
}
// A hop message verifies against the key pair it was signed with and fails against another.
#[test]
fn hop_message_verify() {
let name: XorName = rand::random();
let routing_message = RoutingMessage {
src: Authority::ClientManager(name),
dst: Authority::ClientManager(name),
content: MessageContent::SectionSplit(Prefix::new(0, name).with_version(1), name),
};
let full_id = FullId::new();
let senders = iter::empty().collect();
let signed_message_result = SignedMessage::new(routing_message.clone(), &full_id, senders);
let signed_message = unwrap!(signed_message_result);
let (public_signing_key, secret_signing_key) = sign::gen_keypair();
let hop_message_result = HopMessage::new(
signed_message.clone(),
0,
BTreeSet::new(),
&secret_signing_key,
);
let hop_message = unwrap!(hop_message_result);
assert_eq!(signed_message, hop_message.content);
assert!(hop_message.verify(&public_signing_key).is_ok());
// A freshly generated, unrelated public key must not verify the hop signature.
let (public_signing_key, _) = sign::gen_keypair();
assert!(hop_message.verify(&public_signing_key).is_err());
}
// A user message just over two parts in size splits into three parts with consistent metadata
// and is rebuilt unchanged from their payloads.
#[test]
fn user_message_parts() {
let data_bytes: Vec<u8> = (0..(MAX_PART_LEN * 2)).map(|i| i as u8).collect();
let data = ImmutableData::new(data_bytes);
let user_msg = UserMessage::Request(Request::PutIData {
data,
msg_id: MessageId::new(),
});
let msg_hash = sha3_256(&unwrap!(serialise(&user_msg)));
let parts = unwrap!(user_msg.to_parts(42));
// The serialised message is longer than its raw data, so two parts are not enough.
assert_eq!(parts.len(), 3);
let payloads: Vec<Vec<u8>> = parts
.into_iter()
.enumerate()
.map(|(i, msg)| match msg {
MessageContent::UserMessagePart {
hash,
msg_id,
part_count,
part_index,
payload,
priority,
cacheable,
} => {
assert_eq!(msg_hash, hash);
assert_eq!(user_msg.message_id(), &msg_id);
assert_eq!(3, part_count);
assert_eq!(i, part_index as usize);
assert_eq!(42, priority);
assert!(!cacheable);
payload
}
msg => panic!("Unexpected message {:?}", msg),
}).collect();
let deserialised_user_msg = unwrap!(UserMessage::from_parts(msg_hash, payloads.iter()));
assert_eq!(user_msg, deserialised_user_msg);
}
}