#[cfg(not(feature = "use-mock-crust"))]
use crust::PeerId;
#[cfg(feature = "use-mock-crust")]
use mock_crust::crust::PeerId;
use lru_time_cache::LruCache;
use maidsafe_utilities;
use maidsafe_utilities::serialisation::{deserialise, serialise};
use sodiumoxide::crypto::{box_, sign};
use sodiumoxide::crypto::hash::sha256;
use std::collections::BTreeMap;
use std::fmt::{self, Debug, Formatter};
use std::time::Duration;
use ack_manager::Ack;
use authority::Authority;
use data::{Data, DataIdentifier};
use error::RoutingError;
use id::{FullId, PublicId};
use types::MessageId;
use utils;
use xor_name::XorName;
/// Upper bound, in bytes, on the payload of a single `MessageContent::UserMessagePart`:
/// `UserMessage::to_parts` splits a serialised message into enough parts to stay within it.
const MAX_PART_LEN: usize = 20 * 1024;
/// Priority value `1`.
/// NOTE(review): presumably meant for relocation messages - confirm at the call sites.
pub const RELOCATE_PRIORITY: u8 = 1;
/// Priority value `2`.
/// NOTE(review): presumably used when no more specific priority applies - confirm.
pub const DEFAULT_PRIORITY: u8 = 2;
/// Priority value `3`.
/// NOTE(review): presumably meant for client `Get` requests - confirm at the call sites.
pub const CLIENT_GET_PRIORITY: u8 = 3;
/// Serialisable envelope around a `DirectMessage` or a `HopMessage`.
///
/// The `Tunnel*` variants carry the same content plus explicit source and destination peer IDs.
/// NOTE(review): presumably those are relayed through an intermediate tunnel node - confirm.
#[derive(Debug, RustcEncodable, RustcDecodable)]
pub enum Message {
/// A `DirectMessage` without additional addressing information.
Direct(DirectMessage),
/// A `HopMessage` without additional addressing information.
Hop(HopMessage),
/// A `DirectMessage` accompanied by a source and a destination peer ID.
TunnelDirect {
/// The wrapped direct message.
content: DirectMessage,
/// Source peer ID.
src: PeerId,
/// Destination peer ID.
dst: PeerId,
},
/// A `HopMessage` accompanied by a source and a destination peer ID.
TunnelHop {
/// The wrapped hop message.
content: HopMessage,
/// Source peer ID.
src: PeerId,
/// Destination peer ID.
dst: PeerId,
},
}
/// Message variants that are wrapped directly in `Message::Direct` / `Message::TunnelDirect`.
///
/// `Debug` is implemented by hand further down in this file; `priority()` is `0` for every
/// variant.
#[derive(RustcEncodable, RustcDecodable)]
pub enum DirectMessage {
/// Carries a public ID together with a quorum size.
BootstrapIdentify {
/// Public ID carried by the message.
public_id: ::id::PublicId,
/// Quorum size carried by the message.
current_quorum_size: usize,
},
/// Carries no data.
/// NOTE(review): presumably the refusal of a bootstrap attempt - confirm.
BootstrapDeny,
/// Carries a serialised public ID, a signature and a restriction flag.
ClientIdentify {
/// Public ID in serialised form.
serialised_public_id: Vec<u8>,
/// Signature accompanying the serialised public ID.
/// NOTE(review): what exactly is signed is decided by the sender, outside this file.
signature: sign::Signature,
/// The `Debug` output labels `true` as "client only" and `false` as "joining node".
client_restriction: bool,
},
/// Carries a serialised public ID and a signature.
NodeIdentify {
/// Public ID in serialised form.
serialised_public_id: Vec<u8>,
/// Signature accompanying the serialised public ID.
/// NOTE(review): what exactly is signed is decided by the sender, outside this file.
signature: sign::Signature,
},
/// Carries the public ID of a node.
NewNode(PublicId),
/// Carries a name.
/// NOTE(review): presumably the peer with which a connection is no longer needed - confirm.
ConnectionUnneeded(XorName),
/// Carries a peer ID. NOTE(review): presumably the peer a tunnel is requested to - confirm.
TunnelRequest(PeerId),
/// Carries a peer ID. NOTE(review): presumably the peer a tunnel was established to - confirm.
TunnelSuccess(PeerId),
/// Carries a peer ID. NOTE(review): presumably the peer whose tunnel was closed - confirm.
TunnelClosed(PeerId),
/// Carries a peer ID. NOTE(review): presumably the peer whose tunnel should be dropped - confirm.
TunnelDisconnect(PeerId),
}
impl DirectMessage {
    /// Returns the priority of this message, which is `0` for every direct message.
    pub fn priority(&self) -> u8 {
        // All variants share the same fixed priority.
        const DIRECT_MESSAGE_PRIORITY: u8 = 0;
        DIRECT_MESSAGE_PRIORITY
    }
}
/// A `SignedMessage` together with a route number, a list of names and a signature over the
/// serialised content.
///
/// `Debug` is implemented by hand and elides `sent_to` and `signature`.
#[derive(RustcEncodable, RustcDecodable)]
pub struct HopMessage {
/// The wrapped signed message.
content: SignedMessage,
/// Route number. NOTE(review): its meaning is defined by the routing logic outside this file.
route: u8,
/// Names stored with the message.
/// NOTE(review): presumably the nodes the message has already been sent to - confirm.
sent_to: Vec<XorName>,
/// Detached signature over the serialised `content`, produced in `HopMessage::new`.
signature: sign::Signature,
}
impl HopMessage {
    /// Builds a hop message around `content`.
    ///
    /// The serialised `content` is signed with `sign_key`; `route` and `sent_to` are stored as
    /// given and are not covered by the signature. Returns an error if `content` cannot be
    /// serialised.
    pub fn new(content: SignedMessage,
               route: u8,
               sent_to: Vec<XorName>,
               sign_key: &sign::SecretKey)
               -> Result<HopMessage, RoutingError> {
        // Sign first, so that the borrow of `content` has ended before it is moved below.
        let signature = {
            let serialised_content = try!(serialise(&content));
            sign::sign_detached(&serialised_content, sign_key)
        };
        Ok(HopMessage {
            content: content,
            route: route,
            sent_to: sent_to,
            signature: signature,
        })
    }

    /// Checks the stored signature against the serialised content using `verification_key`.
    ///
    /// Returns `RoutingError::FailedSignature` if the signature does not match, or a
    /// serialisation error if the content cannot be serialised.
    pub fn verify(&self, verification_key: &sign::PublicKey) -> Result<(), RoutingError> {
        let serialised_content = try!(serialise(&self.content));
        if !sign::verify_detached(&self.signature, &serialised_content, verification_key) {
            return Err(RoutingError::FailedSignature);
        }
        Ok(())
    }

    /// Returns the wrapped signed message.
    pub fn content(&self) -> &SignedMessage {
        let HopMessage { ref content, .. } = *self;
        content
    }

    /// Returns the route number.
    pub fn route(&self) -> u8 {
        let HopMessage { route, .. } = *self;
        route
    }

    /// Returns the list of names stored with this message.
    pub fn sent_to(&self) -> &Vec<XorName> {
        let HopMessage { ref sent_to, .. } = *self;
        sent_to
    }
}
/// A `RoutingMessage` bundled with the signer's public ID and a signature covering both.
///
/// `Debug` is implemented by hand and elides the signature.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, RustcEncodable, RustcDecodable)]
pub struct SignedMessage {
/// The routing message that was signed.
content: RoutingMessage,
/// Public ID of the signer; its signing key is the one `check_integrity` verifies against.
public_id: PublicId,
/// Detached signature over the serialised `(content, public_id)` pair.
signature: sign::Signature,
}
impl SignedMessage {
    /// Signs `content` on behalf of `full_id`.
    ///
    /// The signature covers the serialised pair of the message and the public ID of `full_id`.
    /// Returns an error if that pair cannot be serialised.
    pub fn new(content: RoutingMessage, full_id: &FullId) -> Result<SignedMessage, RoutingError> {
        let public_id = *full_id.public_id();
        let signature = {
            let serialised_pair = try!(serialise(&(&content, &public_id)));
            sign::sign_detached(&serialised_pair, full_id.signing_private_key())
        };
        Ok(SignedMessage {
            content: content,
            public_id: public_id,
            signature: signature,
        })
    }

    /// Verifies that the signature matches the content and the embedded public ID.
    ///
    /// Returns `RoutingError::FailedSignature` on mismatch, or a serialisation error if the
    /// signed pair cannot be serialised.
    pub fn check_integrity(&self) -> Result<(), RoutingError> {
        let serialised_pair = try!(serialise(&(&self.content, &self.public_id)));
        let signing_key = self.public_id().signing_public_key();
        if !sign::verify_detached(&self.signature, &serialised_pair, signing_key) {
            return Err(RoutingError::FailedSignature);
        }
        Ok(())
    }

    /// Returns the routing message that was signed.
    pub fn routing_message(&self) -> &RoutingMessage {
        let SignedMessage { ref content, .. } = *self;
        content
    }

    /// Returns the public ID of the signer.
    pub fn public_id(&self) -> &PublicId {
        let SignedMessage { ref public_id, .. } = *self;
        public_id
    }

    /// Returns the priority of the wrapped routing message.
    pub fn priority(&self) -> u8 {
        self.routing_message().priority()
    }
}
/// A message content together with its source and destination authorities.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, Debug, RustcEncodable, RustcDecodable)]
pub struct RoutingMessage {
/// Source authority of the message.
pub src: Authority,
/// Destination authority of the message.
pub dst: Authority,
/// The content being routed.
pub content: MessageContent,
}
impl RoutingMessage {
    /// Builds the acknowledgement for `msg`.
    ///
    /// The result is sent from `src` to the source of `msg`, and carries the `Ack` computed from
    /// `msg` along with the priority of `msg`. Fails if the `Ack` cannot be computed.
    pub fn ack_from(msg: &RoutingMessage, src: Authority) -> Result<Self, RoutingError> {
        let ack = try!(Ack::compute(msg));
        let ack_content = MessageContent::Ack(ack, msg.priority());
        Ok(RoutingMessage {
            src: src,
            dst: msg.src.clone(),
            content: ack_content,
        })
    }

    /// Returns the priority of the content.
    pub fn priority(&self) -> u8 {
        let RoutingMessage { ref content, .. } = *self;
        content.priority()
    }

    /// Returns a copy of this message in which bulky content is replaced by its hash.
    ///
    /// For `GetNodeNameResponse`, `GetCloseGroupResponse` and `UserMessagePart` the content
    /// becomes a `GroupMessageHash` holding the SHA-256 digest of the whole serialised message
    /// and its priority. Any other content is cloned unchanged. Source and destination are
    /// always preserved. Fails only if the message cannot be serialised.
    pub fn to_grp_msg_hash(&self) -> Result<RoutingMessage, RoutingError> {
        let replace_with_hash = match self.content {
            MessageContent::GetNodeNameResponse { .. } |
            MessageContent::GetCloseGroupResponse { .. } |
            MessageContent::UserMessagePart { .. } => true,
            _ => false,
        };
        let content = if replace_with_hash {
            let serialised_self = try!(serialise(self));
            let digest = sha256::hash(&serialised_self);
            MessageContent::GroupMessageHash(digest, self.priority())
        } else {
            self.content.clone()
        };
        Ok(RoutingMessage {
            src: self.src.clone(),
            dst: self.dst.clone(),
            content: content,
        })
    }
}
/// The content carried by a `RoutingMessage`.
///
/// `Debug` is implemented by hand further down in this file. Only `Ack`, `GroupMessageHash` and
/// `UserMessagePart` carry their own priority; `priority()` returns `0` for all other variants.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, RustcEncodable, RustcDecodable)]
pub enum MessageContent {
/// Carries a public ID and a message ID.
/// NOTE(review): presumably a request to be assigned a node name - confirm.
GetNodeName {
/// Public ID carried by the request.
current_id: PublicId,
/// ID of this message.
message_id: MessageId,
},
/// Carries a public ID, an authority and a message ID.
/// NOTE(review): presumably tells a group to expect a joining node - confirm.
ExpectCloseNode {
/// Public ID carried by the message.
expect_id: PublicId,
/// Authority carried by the message.
client_auth: Authority,
/// ID of this message.
message_id: MessageId,
},
/// Carries only a message ID.
/// NOTE(review): presumably a request for the close group of the destination - confirm.
GetCloseGroup(MessageId),
/// Carries encrypted connection information along with a nonce and a public ID.
ConnectionInfo {
/// Connection information in encrypted form.
encrypted_connection_info: Vec<u8>,
/// Nonce bytes; the array length is `box_::NONCEBYTES`.
nonce_bytes: [u8; box_::NONCEBYTES],
/// Public ID carried by the message.
/// NOTE(review): presumably the sender's, needed for decryption - confirm.
public_id: PublicId,
},
/// Carries a public ID, a list of public IDs and a message ID.
/// Replaced by a `GroupMessageHash` in `RoutingMessage::to_grp_msg_hash`.
GetNodeNameResponse {
/// Public ID carried by the response.
relocated_id: PublicId,
/// Public IDs carried by the response.
close_group_ids: Vec<PublicId>,
/// ID of this message.
message_id: MessageId,
},
/// Carries a list of public IDs and a message ID.
/// Replaced by a `GroupMessageHash` in `RoutingMessage::to_grp_msg_hash`.
GetCloseGroupResponse {
/// Public IDs carried by the response.
close_group_ids: Vec<PublicId>,
/// ID of this message.
message_id: MessageId,
},
/// An acknowledgement together with a priority; built by `RoutingMessage::ack_from` from the
/// acknowledged message and that message's priority.
Ack(Ack, u8),
/// SHA-256 digest of a serialised routing message together with that message's priority;
/// built by `RoutingMessage::to_grp_msg_hash`.
GroupMessageHash(sha256::Digest, u8),
/// One part of a serialised `UserMessage`, as produced by `UserMessage::to_parts`.
UserMessagePart {
/// `big_endian_sip_hash` of the complete `UserMessage`.
hash: u64,
/// Total number of parts the message was split into.
part_count: u32,
/// Zero-based index of this part.
part_index: u32,
/// Priority passed to `UserMessage::to_parts`.
priority: u8,
/// Whether the complete user message reported itself as cacheable.
cacheable: bool,
/// This part's slice of the serialised user message.
payload: Vec<u8>,
},
}
impl MessageContent {
    /// Returns the priority of this content.
    ///
    /// `Ack`, `GroupMessageHash` and `UserMessagePart` report the priority they carry; every
    /// other variant has priority `0`.
    pub fn priority(&self) -> u8 {
        match *self {
            MessageContent::Ack(_, carried) => carried,
            MessageContent::GroupMessageHash(_, carried) => carried,
            MessageContent::UserMessagePart { priority: carried, .. } => carried,
            MessageContent::GetNodeName { .. } |
            MessageContent::ExpectCloseNode { .. } |
            MessageContent::GetCloseGroup(..) |
            MessageContent::ConnectionInfo { .. } |
            MessageContent::GetNodeNameResponse { .. } |
            MessageContent::GetCloseGroupResponse { .. } => 0,
        }
    }
}
impl Debug for DirectMessage {
    /// Writes a compact description of the message; signatures and serialised IDs are omitted.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        match *self {
            DirectMessage::BootstrapIdentify { ref public_id, current_quorum_size } => {
                write!(f,
                       "BootstrapIdentify {{ {:?}, {:?} }}",
                       public_id,
                       current_quorum_size)
            }
            DirectMessage::BootstrapDeny => f.write_str("BootstrapDeny"),
            DirectMessage::ClientIdentify { client_restriction, .. } => {
                // Only the restriction flag is shown, as a human-readable label.
                let label = if client_restriction {
                    "ClientIdentify (client only)"
                } else {
                    "ClientIdentify (joining node)"
                };
                f.write_str(label)
            }
            DirectMessage::NodeIdentify { .. } => write!(f, "NodeIdentify {{ .. }}"),
            DirectMessage::NewNode(ref id) => write!(f, "NewNode({:?})", id),
            DirectMessage::ConnectionUnneeded(ref peer_name) => {
                write!(f, "ConnectionUnneeded({:?})", peer_name)
            }
            DirectMessage::TunnelRequest(ref peer) => write!(f, "TunnelRequest({:?})", peer),
            DirectMessage::TunnelSuccess(ref peer) => write!(f, "TunnelSuccess({:?})", peer),
            DirectMessage::TunnelClosed(ref peer) => write!(f, "TunnelClosed({:?})", peer),
            DirectMessage::TunnelDisconnect(ref peer) => {
                write!(f, "TunnelDisconnect({:?})", peer)
            }
        }
    }
}
impl Debug for HopMessage {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter,
"HopMessage {{ content: {:?}, route: {}, sent_to: .., signature: .. }}",
self.content,
self.route)
}
}
impl Debug for SignedMessage {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter,
"SignedMessage {{ content: {:?}, public_id: {:?}, signature: .. }}",
self.content,
self.public_id)
}
}
impl Debug for MessageContent {
    /// Writes a compact description of the content; bulky payloads are never printed.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        match *self {
            MessageContent::GetNodeName { ref current_id, ref message_id } => {
                write!(f, "GetNodeName {{ {:?}, {:?} }}", current_id, message_id)
            }
            MessageContent::ExpectCloseNode { ref expect_id, ref client_auth, ref message_id } => {
                write!(f,
                       "ExpectCloseNode {{ {:?}, {:?}, {:?} }}",
                       expect_id,
                       client_auth,
                       message_id)
            }
            MessageContent::GetCloseGroup(ref msg_id) => {
                write!(f, "GetCloseGroup({:?})", msg_id)
            }
            MessageContent::ConnectionInfo { .. } => write!(f, "ConnectionInfo {{ .. }}"),
            MessageContent::GetNodeNameResponse { ref relocated_id,
                                                  ref close_group_ids,
                                                  ref message_id } => {
                // The close group is printed before the relocated ID.
                write!(f,
                       "GetNodeNameResponse {{ {:?}, {:?}, {:?} }}",
                       close_group_ids,
                       relocated_id,
                       message_id)
            }
            MessageContent::GetCloseGroupResponse { ref close_group_ids, ref message_id } => {
                write!(f,
                       "GetCloseGroupResponse {{ {:?}, {:?} }}",
                       close_group_ids,
                       message_id)
            }
            MessageContent::Ack(ref ack, priority) => write!(f, "Ack({}, {})", ack, priority),
            MessageContent::GroupMessageHash(ref digest, priority) => {
                let digest_text = utils::format_binary_array(&digest.0);
                write!(f, "GroupMessageHash({}, {})", digest_text, priority)
            }
            MessageContent::UserMessagePart { hash, part_count, part_index, priority, .. } => {
                // Parts are displayed one-based, e.g. "1/3" for the first of three.
                let displayed_index = part_index + 1;
                write!(f,
                       "UserMessagePart {{ {}/{}, priority: {} {:x}}}",
                       displayed_index,
                       part_count,
                       priority,
                       hash)
            }
        }
    }
}
/// A request or a response; this is the unit that `to_parts` splits into
/// `MessageContent::UserMessagePart`s and `from_parts` reassembles.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Debug, Hash, RustcEncodable, RustcDecodable)]
pub enum UserMessage {
/// Wraps a `Request`.
Request(Request),
/// Wraps a `Response`.
Response(Response),
}
impl UserMessage {
    /// Splits this message into `UserMessagePart`s, each tagged with `priority`.
    ///
    /// The serialised message is cut into the smallest number of parts that keeps every payload
    /// within `MAX_PART_LEN` bytes, and the bytes are spread as evenly as possible over them.
    /// Every part carries the hash of the complete message, the part count, its own zero-based
    /// index and the cacheable flag. Fails if the message cannot be serialised.
    pub fn to_parts(&self, priority: u8) -> Result<Vec<MessageContent>, RoutingError> {
        let hash = maidsafe_utilities::big_endian_sip_hash(self);
        let payload = try!(serialise(self));
        let len = payload.len();
        // Ceiling division: number of parts needed so that none exceeds `MAX_PART_LEN`.
        let part_count = (len + MAX_PART_LEN - 1) / MAX_PART_LEN;
        let cacheable = self.is_cacheable();
        let mut parts = Vec::with_capacity(part_count);
        for index in 0..part_count {
            // Part boundaries are proportional to the index, which balances the part sizes.
            let start = index * len / part_count;
            let end = (index + 1) * len / part_count;
            parts.push(MessageContent::UserMessagePart {
                hash: hash,
                part_count: part_count as u32,
                part_index: index as u32,
                priority: priority,
                cacheable: cacheable,
                payload: payload[start..end].to_vec(),
            });
        }
        Ok(parts)
    }

    /// Reassembles a message from the payloads of its parts, given in order.
    ///
    /// The payloads are concatenated and deserialised. Returns `RoutingError::HashMismatch` if
    /// the hash of the resulting message differs from `hash`, or a deserialisation error if the
    /// bytes do not form a valid message.
    pub fn from_parts<'a, I: Iterator<Item = &'a Vec<u8>>>(hash: u64,
                                                           parts: I)
                                                           -> Result<UserMessage, RoutingError> {
        let payload = parts.fold(Vec::new(), |mut joined, part| {
            joined.extend_from_slice(part);
            joined
        });
        let user_msg: UserMessage = try!(deserialise(&payload[..]));
        if maidsafe_utilities::big_endian_sip_hash(&user_msg) == hash {
            Ok(user_msg)
        } else {
            Err(RoutingError::HashMismatch)
        }
    }

    /// Returns whether the wrapped request or response reports itself as cacheable.
    fn is_cacheable(&self) -> bool {
        match *self {
            UserMessage::Request(ref inner) => inner.is_cacheable(),
            UserMessage::Response(ref inner) => inner.is_cacheable(),
        }
    }
}
/// Request variants; every variant carries a `MessageId`.
///
/// `Debug` is implemented by hand further down in this file.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, RustcEncodable, RustcDecodable)]
pub enum Request {
/// Carries raw bytes. Priority `2`.
Refresh(Vec<u8>, MessageId),
/// Carries a data identifier. Priority `3`; cacheable when the identifier is
/// `DataIdentifier::Immutable`.
Get(DataIdentifier, MessageId),
/// Carries data. Priority `4` for `Data::Structured`, `5` otherwise.
Put(Data, MessageId),
/// Carries data. Priority `4` for `Data::Structured`, `5` otherwise.
Post(Data, MessageId),
/// Carries data. Priority `4` for `Data::Structured`, `5` otherwise.
Delete(Data, MessageId),
/// Carries only the message ID. Priority `3`.
GetAccountInfo(MessageId),
}
/// Response variants; every variant carries a `MessageId`.
///
/// `Debug` is implemented by hand further down in this file. `GetSuccess` has priority `4` for
/// `Data::Structured` and `5` otherwise; every other variant has priority `3`.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Hash, RustcEncodable, RustcDecodable)]
pub enum Response {
/// Carries data. Cacheable when the data is `Data::Immutable`.
GetSuccess(Data, MessageId),
/// Carries a data identifier.
PutSuccess(DataIdentifier, MessageId),
/// Carries a data identifier.
PostSuccess(DataIdentifier, MessageId),
/// Carries a data identifier.
DeleteSuccess(DataIdentifier, MessageId),
/// Carries two `u64` account figures.
GetAccountInfoSuccess {
/// ID of this message.
id: MessageId,
/// NOTE(review): presumably the amount of data already stored; unit not visible here.
data_stored: u64,
/// NOTE(review): presumably the remaining space; unit not visible here.
space_available: u64,
},
/// Carries a data identifier and an opaque error indicator.
GetFailure {
/// ID of this message.
id: MessageId,
/// Identifier of the data concerned.
data_id: DataIdentifier,
/// Opaque error bytes; their format is not defined in this file.
external_error_indicator: Vec<u8>,
},
/// Carries a data identifier and an opaque error indicator.
PutFailure {
/// ID of this message.
id: MessageId,
/// Identifier of the data concerned.
data_id: DataIdentifier,
/// Opaque error bytes; their format is not defined in this file.
external_error_indicator: Vec<u8>,
},
/// Carries a data identifier and an opaque error indicator.
PostFailure {
/// ID of this message.
id: MessageId,
/// Identifier of the data concerned.
data_id: DataIdentifier,
/// Opaque error bytes; their format is not defined in this file.
external_error_indicator: Vec<u8>,
},
/// Carries a data identifier and an opaque error indicator.
DeleteFailure {
/// ID of this message.
id: MessageId,
/// Identifier of the data concerned.
data_id: DataIdentifier,
/// Opaque error bytes; their format is not defined in this file.
external_error_indicator: Vec<u8>,
},
/// Carries an opaque error indicator.
GetAccountInfoFailure {
/// ID of this message.
id: MessageId,
/// Opaque error bytes; their format is not defined in this file.
external_error_indicator: Vec<u8>,
},
}
impl Request {
    /// Returns the priority of this request.
    ///
    /// `Refresh` is `2`; `Get` and `GetAccountInfo` are `3`; `Put`, `Post` and `Delete` are `4`
    /// when they carry structured data and `5` for any other data.
    pub fn priority(&self) -> u8 {
        // Requests without a data payload have a fixed priority; the rest depend on the data.
        let carried_data = match *self {
            Request::Refresh(..) => return 2,
            Request::Get(..) |
            Request::GetAccountInfo(..) => return 3,
            Request::Put(ref data, _) |
            Request::Post(ref data, _) |
            Request::Delete(ref data, _) => data,
        };
        if let Data::Structured(..) = *carried_data {
            4
        } else {
            5
        }
    }

    /// Returns `true` only for a `Get` request whose identifier is `DataIdentifier::Immutable`.
    pub fn is_cacheable(&self) -> bool {
        match *self {
            Request::Get(DataIdentifier::Immutable(..), _) => true,
            _ => false,
        }
    }
}
impl Response {
    /// Returns the priority of this response.
    ///
    /// `GetSuccess` is `4` when it carries structured data and `5` for any other data; every
    /// other response is `3`.
    pub fn priority(&self) -> u8 {
        match *self {
            Response::GetSuccess(Data::Structured(..), _) => 4,
            Response::GetSuccess(..) => 5,
            Response::PutSuccess(..) |
            Response::PostSuccess(..) |
            Response::DeleteSuccess(..) |
            Response::GetAccountInfoSuccess { .. } |
            Response::GetFailure { .. } |
            Response::PutFailure { .. } |
            Response::PostFailure { .. } |
            Response::DeleteFailure { .. } |
            Response::GetAccountInfoFailure { .. } => 3,
        }
    }

    /// Returns `true` only for a `GetSuccess` response whose data is `Data::Immutable`.
    pub fn is_cacheable(&self) -> bool {
        match *self {
            Response::GetSuccess(Data::Immutable(..), _) => true,
            _ => false,
        }
    }
}
impl Debug for Request {
    /// Writes the variant name followed by its payload and message ID; the raw bytes of a
    /// `Refresh` are rendered through `utils::format_binary_array`.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        match *self {
            Request::Refresh(ref bytes, ref msg_id) => {
                let bytes_text = utils::format_binary_array(bytes);
                write!(f, "Refresh({}, {:?})", bytes_text, msg_id)
            }
            Request::Get(ref data_id, ref msg_id) => {
                write!(f, "Get({:?}, {:?})", data_id, msg_id)
            }
            Request::Put(ref payload, ref msg_id) => {
                write!(f, "Put({:?}, {:?})", payload, msg_id)
            }
            Request::Post(ref payload, ref msg_id) => {
                write!(f, "Post({:?}, {:?})", payload, msg_id)
            }
            Request::Delete(ref payload, ref msg_id) => {
                write!(f, "Delete({:?}, {:?})", payload, msg_id)
            }
            Request::GetAccountInfo(ref msg_id) => write!(f, "GetAccountInfo({:?})", msg_id),
        }
    }
}
impl Debug for Response {
    /// Writes the variant name with its identifying fields; account figures and error
    /// indicators are elided as `..`.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        match *self {
            Response::GetSuccess(ref payload, ref msg_id) => {
                write!(f, "GetSuccess({:?}, {:?})", payload, msg_id)
            }
            Response::PutSuccess(ref data_id, ref msg_id) => {
                write!(f, "PutSuccess({:?}, {:?})", data_id, msg_id)
            }
            Response::PostSuccess(ref data_id, ref msg_id) => {
                write!(f, "PostSuccess({:?}, {:?})", data_id, msg_id)
            }
            Response::DeleteSuccess(ref data_id, ref msg_id) => {
                write!(f, "DeleteSuccess({:?}, {:?})", data_id, msg_id)
            }
            Response::GetAccountInfoSuccess { id: ref msg_id, .. } => {
                write!(f, "GetAccountInfoSuccess {{ {:?}, .. }}", msg_id)
            }
            Response::GetFailure { id: ref msg_id, ref data_id, .. } => {
                write!(f, "GetFailure {{ {:?}, {:?}, .. }}", msg_id, data_id)
            }
            Response::PutFailure { id: ref msg_id, ref data_id, .. } => {
                write!(f, "PutFailure {{ {:?}, {:?}, .. }}", msg_id, data_id)
            }
            Response::PostFailure { id: ref msg_id, ref data_id, .. } => {
                write!(f, "PostFailure {{ {:?}, {:?}, .. }}", msg_id, data_id)
            }
            Response::DeleteFailure { id: ref msg_id, ref data_id, .. } => {
                write!(f, "DeleteFailure {{ {:?}, {:?}, .. }}", msg_id, data_id)
            }
            Response::GetAccountInfoFailure { id: ref msg_id, .. } => {
                write!(f, "GetAccountInfoFailure {{ {:?}, .. }}", msg_id)
            }
        }
    }
}
/// Cache of partially received user messages, backed by an `LruCache`.
///
/// Entries are keyed by `(hash, part_count)`; each entry maps a part index to that part's
/// payload.
pub struct UserMessageCache(LruCache<(u64, u32), BTreeMap<u32, Vec<u8>>>);
impl UserMessageCache {
    /// Creates an empty cache whose underlying `LruCache` is built with the given expiry
    /// `duration`.
    pub fn with_expiry_duration(duration: Duration) -> Self {
        UserMessageCache(LruCache::with_expiry_duration(duration))
    }

    /// Stores one part of a user message and returns the complete message once all parts of it
    /// have been collected.
    ///
    /// * `hash` - hash of the complete user message, as carried by every part.
    /// * `part_count` - total number of parts of the message.
    /// * `part_index` - zero-based index of this part; must be smaller than `part_count`.
    /// * `payload` - this part's bytes.
    ///
    /// Returns `None` while parts are still missing, when `part_index` is out of range, or when
    /// the collected parts do not deserialise to a message matching `hash`. Once the last
    /// missing part arrives, the entry is removed from the cache whether or not reassembly
    /// succeeds. A part with an index that is already cached replaces the earlier payload.
    pub fn add(&mut self,
               hash: u64,
               part_count: u32,
               part_index: u32,
               payload: Vec<u8>)
               -> Option<UserMessage> {
        // A part outside `0..part_count` cannot belong to the message. Storing it would count
        // towards completion and make the cache attempt (and fail) reassembly with the wrong
        // set of parts, discarding the valid ones already collected. This also rejects
        // `part_count == 0`, whose entry could never complete.
        if part_index >= part_count {
            return None;
        }
        {
            // Inner scope: the mutable borrow of the entry must end before `remove` below.
            let entry = self.0.entry((hash, part_count)).or_insert_with(BTreeMap::new);
            let _ = entry.insert(part_index, payload);
            if entry.len() != part_count as usize {
                return None;
            }
        }
        // All parts present: the `BTreeMap` yields the payloads in index order.
        self.0
            .remove(&(hash, part_count))
            .and_then(|part_map| UserMessage::from_parts(hash, part_map.values()).ok())
    }
}
// Unit tests for message signing, group-message hashing and user-message splitting.
#[cfg(test)]
mod test {
extern crate rand;
use super::*;
use authority::Authority;
use data::Data;
use id::FullId;
use immutable_data::ImmutableData;
use maidsafe_utilities;
use maidsafe_utilities::serialisation::serialise;
use sodiumoxide::crypto::sign;
use types::MessageId;
use xor_name::XorName;
// A freshly signed message passes `check_integrity`; swapping in a signature made by a
// different identity makes it fail.
#[test]
fn signed_message_check_integrity() {
let name: XorName = rand::random();
let routing_message = RoutingMessage {
src: Authority::ClientManager(name),
dst: Authority::ClientManager(name),
content: MessageContent::GetCloseGroup(MessageId::zero()),
};
let full_id = FullId::new();
let signed_message_result = SignedMessage::new(routing_message.clone(), &full_id);
assert!(signed_message_result.is_ok());
let mut signed_message = unwrap_result!(signed_message_result);
assert_eq!(routing_message, *signed_message.routing_message());
assert_eq!(full_id.public_id(), signed_message.public_id());
let check_integrity_result = signed_message.check_integrity();
assert!(check_integrity_result.is_ok());
// Sign the same routing message with a second, unrelated identity and overwrite the signature.
let full_id = FullId::new();
let bytes_to_sign = unwrap_result!(serialise(&(&routing_message, full_id.public_id())));
let signature = sign::sign_detached(&bytes_to_sign, full_id.signing_private_key());
signed_message.signature = signature;
// The message still holds the first identity's public ID, so verification must now fail.
let check_integrity_result = signed_message.check_integrity();
assert!(check_integrity_result.is_err());
}
// `to_grp_msg_hash` replaces a `UserMessagePart` by a `GroupMessageHash` and leaves hash
// messages and other content unchanged.
#[test]
fn grp_msg_hash() {
let data_bytes: Vec<u8> = (0..10).map(|i| i as u8).collect();
let data = Data::Immutable(ImmutableData::new(data_bytes));
let user_msg = UserMessage::Request(Request::Put(data, MessageId::new()));
let parts = unwrap_result!(user_msg.to_parts(1));
// The small payload is expected to fit into a single part.
assert_eq!(1, parts.len());
let part = parts[0].clone();
let name: XorName = rand::random();
let routing_message = RoutingMessage {
src: Authority::ClientManager(name),
dst: Authority::ClientManager(name),
content: part,
};
let hash_msg = unwrap_result!(routing_message.to_grp_msg_hash());
match hash_msg.content {
MessageContent::GroupMessageHash(..) => (),
_ => panic!("Wrong content for hashed message: {:?}", hash_msg),
}
// Hashing an already hashed message must be a no-op.
assert_eq!(hash_msg, unwrap_result!(hash_msg.to_grp_msg_hash()));
let non_hash_routing_msg = RoutingMessage {
src: Authority::ClientManager(name),
dst: Authority::ClientManager(name),
content: MessageContent::GetCloseGroup(MessageId::zero()),
};
// Content that is not one of the hashed variants is returned unchanged.
assert_eq!(non_hash_routing_msg,
unwrap_result!(non_hash_routing_msg.to_grp_msg_hash()));
}
// A hop message verifies with the public key matching the signing key and fails with another key.
#[test]
fn hop_message_verify() {
let name: XorName = rand::random();
let routing_message = RoutingMessage {
src: Authority::ClientManager(name),
dst: Authority::ClientManager(name),
content: MessageContent::GetCloseGroup(MessageId::zero()),
};
let full_id = FullId::new();
let signed_message_result = SignedMessage::new(routing_message.clone(), &full_id);
assert!(signed_message_result.is_ok());
let signed_message = unwrap_result!(signed_message_result);
let (public_signing_key, secret_signing_key) = sign::gen_keypair();
let hop_message_result =
HopMessage::new(signed_message.clone(), 0, vec![], &secret_signing_key);
assert!(hop_message_result.is_ok());
let hop_message = unwrap_result!(hop_message_result);
assert_eq!(signed_message, *hop_message.content());
let verify_result = hop_message.verify(&public_signing_key);
assert!(verify_result.is_ok());
// A freshly generated, unrelated public key must not verify the signature.
let (public_signing_key, _) = sign::gen_keypair();
let verify_result = hop_message.verify(&public_signing_key);
assert!(verify_result.is_err());
}
// A message with `2 * MAX_PART_LEN` data bytes is split into three parts that carry the right
// metadata and reassemble to the original message.
#[test]
fn user_message_parts() {
let data_bytes: Vec<u8> = (0..(super::MAX_PART_LEN * 2)).map(|i| i as u8).collect();
let data = Data::Immutable(ImmutableData::new(data_bytes));
let user_msg = UserMessage::Request(Request::Put(data, MessageId::new()));
let msg_hash = maidsafe_utilities::big_endian_sip_hash(&user_msg);
let parts = unwrap_result!(user_msg.to_parts(42));
assert_eq!(parts.len(), 3);
// Check the metadata of every part and collect the payloads in order.
let payloads: Vec<Vec<u8>> = parts.into_iter()
.enumerate()
.map(|(i, msg)| match msg {
MessageContent::UserMessagePart { hash,
part_count,
part_index,
payload,
priority,
cacheable } => {
assert_eq!(msg_hash, hash);
assert_eq!(3, part_count);
assert_eq!(i, part_index as usize);
assert_eq!(42, priority);
// A `Put` request is not cacheable.
assert!(!cacheable);
payload
}
msg => panic!("Unexpected message {:?}", msg),
})
.collect();
let deserialised_user_msg = unwrap_result!(UserMessage::from_parts(msg_hash,
payloads.iter()));
assert_eq!(user_msg, deserialised_user_msg);
}
}