use MIN_SECTION_SIZE;
use action::Action;
use cache::{Cache, NullCache};
use client_error::ClientError;
use config_handler::{self, Config};
use data::{EntryAction, ImmutableData, MutableData, PermissionSet, User, Value};
use error::{InterfaceError, RoutingError};
use event::Event;
use event_stream::{EventStepper, EventStream};
use id::{FullId, PublicId};
use messages::{AccountInfo, CLIENT_GET_PRIORITY, DEFAULT_PRIORITY, RELOCATE_PRIORITY, Request,
Response, UserMessage};
use outbox::{EventBox, EventBuf};
use routing_table::{Authority, RoutingTable};
#[cfg(feature = "use-mock-crust")]
use routing_table::Prefix;
#[cfg(not(feature = "use-mock-crust"))]
use rust_sodium;
use rust_sodium::crypto::sign;
use state_machine::{State, StateMachine};
use states::{self, Bootstrapping, BootstrappingTargetState};
use std::collections::{BTreeMap, BTreeSet};
#[cfg(feature = "use-mock-crust")]
use std::fmt::{self, Debug, Formatter};
#[cfg(feature = "use-mock-crust")]
use std::net::IpAddr;
use std::sync::mpsc::{Receiver, RecvError, Sender, TryRecvError, channel};
use types::{MessageId, RoutingActionSender};
use xor_name::XorName;
macro_rules! impl_request {
($method:ident, $message:ident { $($pname:ident : $ptype:ty),*, }, $priority:expr) => {
#[allow(missing_docs)]
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn $method(&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
$($pname: $ptype),*)
-> Result<(), InterfaceError> {
let msg = UserMessage::Request(Request::$message {
$($pname: $pname),*,
});
self.send_action(src, dst, msg, $priority)
}
};
($method:ident, $message:ident { $($pname:ident : $ptype:ty),* }, $priority:expr) => {
impl_request!($method, $message { $($pname:$ptype),*, }, $priority);
};
}
macro_rules! impl_response {
($method:ident, $message:ident, $payload:ty, $priority:expr) => {
#[allow(missing_docs)]
pub fn $method(&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
res: Result<$payload, ClientError>,
msg_id: MessageId)
-> Result<(), InterfaceError> {
let msg = UserMessage::Response(Response::$message {
res: res,
msg_id: msg_id,
});
self.send_action(src, dst, msg, $priority)
}
};
}
pub struct NodeBuilder {
cache: Box<Cache>,
first: bool,
config: Option<Config>,
}
impl NodeBuilder {
pub fn cache(self, cache: Box<Cache>) -> NodeBuilder {
NodeBuilder { cache, ..self }
}
pub fn first(self, first: bool) -> NodeBuilder {
NodeBuilder { first, ..self }
}
pub fn config(self, config: Config) -> NodeBuilder {
NodeBuilder {
config: Some(config),
..self
}
}
pub fn create(self) -> Result<Node, RoutingError> {
#[cfg(not(feature = "use-mock-crust"))]
let _ = rust_sodium::init();
let mut ev_buffer = EventBuf::new();
let (_, machine) = self.make_state_machine(&mut ev_buffer);
let (tx, rx) = channel();
Ok(Node {
interface_result_tx: tx,
interface_result_rx: rx,
machine: machine,
event_buffer: ev_buffer,
})
}
fn make_state_machine(self, outbox: &mut EventBox) -> (RoutingActionSender, StateMachine) {
let full_id = FullId::new();
let pub_id = *full_id.public_id();
let config = self.config.unwrap_or_else(config_handler::get_config);
let dev_config = config.dev.unwrap_or_default();
let min_section_size = dev_config.min_section_size.unwrap_or(MIN_SECTION_SIZE);
StateMachine::new(
move |action_sender, crust_service, timer, outbox2| if self.first {
if let Some(state) = states::Node::first(
action_sender,
self.cache,
crust_service,
full_id,
min_section_size,
timer,
)
{
State::Node(state)
} else {
State::Terminated
}
} else if !dev_config.allow_multiple_lan_nodes && crust_service.has_peers_on_lan() {
error!("More than one routing node found on LAN. Currently this is not supported.");
outbox2.send_event(Event::Terminate);
State::Terminated
} else {
Bootstrapping::new(
action_sender,
self.cache,
BootstrappingTargetState::JoiningNode,
crust_service,
full_id,
min_section_size,
timer,
).map_or(State::Terminated, State::Bootstrapping)
},
pub_id,
None,
outbox,
)
}
}
pub struct Node {
interface_result_tx: Sender<Result<(), InterfaceError>>,
interface_result_rx: Receiver<Result<(), InterfaceError>>,
machine: StateMachine,
event_buffer: EventBuf,
}
impl Node {
pub fn builder() -> NodeBuilder {
NodeBuilder {
cache: Box::new(NullCache),
first: false,
config: None,
}
}
impl_request!(
send_get_idata_request,
GetIData {
name: XorName,
msg_id: MessageId,
},
RELOCATE_PRIORITY
);
impl_request!(
send_put_idata_request,
PutIData {
data: ImmutableData,
msg_id: MessageId,
},
DEFAULT_PRIORITY
);
impl_request!(
send_get_mdata_request,
GetMData {
name: XorName,
tag: u64,
msg_id: MessageId,
},
RELOCATE_PRIORITY
);
impl_request!(
send_put_mdata_request,
PutMData {
data: MutableData,
msg_id: MessageId,
requester: sign::PublicKey,
},
DEFAULT_PRIORITY
);
impl_request!(send_mutate_mdata_entries_request,
MutateMDataEntries {
name: XorName,
tag: u64,
actions: BTreeMap<Vec<u8>, EntryAction>,
msg_id: MessageId,
requester: sign::PublicKey,
},
DEFAULT_PRIORITY);
impl_request!(send_get_mdata_shell_request,
GetMDataShell {
name: XorName,
tag: u64,
msg_id: MessageId,
},
RELOCATE_PRIORITY);
impl_request!(send_get_mdata_value_request,
GetMDataValue {
name: XorName,
tag: u64,
key: Vec<u8>,
msg_id: MessageId,
},
RELOCATE_PRIORITY);
impl_request!(send_set_mdata_user_permissions_request,
SetMDataUserPermissions {
name: XorName,
tag: u64,
user: User,
permissions: PermissionSet,
version: u64,
msg_id: MessageId,
requester: sign::PublicKey,
}, DEFAULT_PRIORITY);
impl_request!(send_del_mdata_user_permissions_request,
DelMDataUserPermissions {
name: XorName,
tag: u64,
user: User,
version: u64,
msg_id: MessageId,
requester: sign::PublicKey,
}, DEFAULT_PRIORITY);
impl_request!(send_change_mdata_owner_request,
ChangeMDataOwner {
name: XorName,
tag: u64,
new_owners: BTreeSet<sign::PublicKey>,
version: u64,
msg_id: MessageId,
}, DEFAULT_PRIORITY);
pub fn send_refresh_request(
&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
content: Vec<u8>,
msg_id: MessageId,
) -> Result<(), InterfaceError> {
let msg = UserMessage::Request(Request::Refresh(content, msg_id));
self.send_action(src, dst, msg, RELOCATE_PRIORITY)
}
impl_response!(send_get_account_info_response,
GetAccountInfo,
AccountInfo,
CLIENT_GET_PRIORITY);
pub fn send_get_idata_response(
&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
res: Result<ImmutableData, ClientError>,
msg_id: MessageId,
) -> Result<(), InterfaceError> {
let msg = UserMessage::Response(Response::GetIData {
res: res,
msg_id: msg_id,
});
let priority = relocate_priority(&dst);
self.send_action(src, dst, msg, priority)
}
impl_response!(send_put_idata_response, PutIData, (), DEFAULT_PRIORITY);
pub fn send_get_mdata_response(
&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
res: Result<MutableData, ClientError>,
msg_id: MessageId,
) -> Result<(), InterfaceError> {
let msg = UserMessage::Response(Response::GetMData {
res: res,
msg_id: msg_id,
});
let priority = relocate_priority(&dst);
self.send_action(src, dst, msg, priority)
}
impl_response!(send_put_mdata_response, PutMData, (), DEFAULT_PRIORITY);
impl_response!(send_get_mdata_version_response,
GetMDataVersion,
u64,
CLIENT_GET_PRIORITY);
pub fn send_get_mdata_shell_response(
&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
res: Result<MutableData, ClientError>,
msg_id: MessageId,
) -> Result<(), InterfaceError> {
let msg = UserMessage::Response(Response::GetMDataShell {
res: res,
msg_id: msg_id,
});
let priority = relocate_priority(&dst);
self.send_action(src, dst, msg, priority)
}
impl_response!(send_list_mdata_entries_response,
ListMDataEntries,
BTreeMap<Vec<u8>, Value>,
CLIENT_GET_PRIORITY);
impl_response!(send_list_mdata_keys_response,
ListMDataKeys,
BTreeSet<Vec<u8>>,
CLIENT_GET_PRIORITY);
impl_response!(send_list_mdata_values_response,
ListMDataValues,
Vec<Value>,
CLIENT_GET_PRIORITY);
pub fn send_get_mdata_value_response(
&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
res: Result<Value, ClientError>,
msg_id: MessageId,
) -> Result<(), InterfaceError> {
let msg = UserMessage::Response(Response::GetMDataValue {
res: res,
msg_id: msg_id,
});
let priority = relocate_priority(&dst);
self.send_action(src, dst, msg, priority)
}
impl_response!(send_mutate_mdata_entries_response,
MutateMDataEntries,
(),
DEFAULT_PRIORITY);
impl_response!(send_list_mdata_permissions_response,
ListMDataPermissions,
BTreeMap<User, PermissionSet>,
CLIENT_GET_PRIORITY);
impl_response!(send_list_mdata_user_permissions_response,
ListMDataUserPermissions,
PermissionSet,
CLIENT_GET_PRIORITY);
impl_response!(send_set_mdata_user_permissions_response,
SetMDataUserPermissions,
(),
DEFAULT_PRIORITY);
impl_response!(send_list_auth_keys_and_version_response,
ListAuthKeysAndVersion,
(BTreeSet<sign::PublicKey>, u64),
CLIENT_GET_PRIORITY);
impl_response!(send_ins_auth_key_response,
InsAuthKey,
(),
DEFAULT_PRIORITY);
impl_response!(send_del_auth_key_response,
DelAuthKey,
(),
DEFAULT_PRIORITY);
impl_response!(send_del_mdata_user_permissions_response,
DelMDataUserPermissions,
(),
DEFAULT_PRIORITY);
impl_response!(send_change_mdata_owner_response,
ChangeMDataOwner,
(),
DEFAULT_PRIORITY);
pub fn close_group(&self, name: XorName, count: usize) -> Option<Vec<XorName>> {
self.machine.close_group(name, count)
}
pub fn id(&self) -> Result<PublicId, RoutingError> {
self.machine.id().ok_or(RoutingError::Terminated)
}
pub fn routing_table(&self) -> Result<&RoutingTable<XorName>, RoutingError> {
self.machine.routing_table().ok_or(RoutingError::Terminated)
}
pub fn min_section_size(&self) -> usize {
self.machine.min_section_size()
}
fn send_action(
&mut self,
src: Authority<XorName>,
dst: Authority<XorName>,
user_msg: UserMessage,
priority: u8,
) -> Result<(), InterfaceError> {
let _ = self.poll();
let action = Action::NodeSendMessage {
src: src,
dst: dst,
content: user_msg,
priority: priority,
result_tx: self.interface_result_tx.clone(),
};
let transition = self.machine.current_mut().handle_action(
action,
&mut self.event_buffer,
);
self.machine.apply_transition(
transition,
&mut self.event_buffer,
);
self.interface_result_rx.recv()?
}
}
impl EventStepper for Node {
type Item = Event;
fn produce_events(&mut self) -> Result<(), RecvError> {
self.machine.step(&mut self.event_buffer)
}
fn try_produce_events(&mut self) -> Result<(), TryRecvError> {
self.machine.try_step(&mut self.event_buffer)
}
fn pop_item(&mut self) -> Option<Event> {
self.event_buffer.take_first()
}
}
#[cfg(feature = "use-mock-crust")]
impl Node {
pub fn purge_invalid_rt_entry(&mut self) {
self.machine.current_mut().purge_invalid_rt_entry()
}
pub fn has_tunnel_clients(&self, client_1: PublicId, client_2: PublicId) -> bool {
self.machine.current().has_tunnel_clients(
client_1,
client_2,
)
}
pub fn section_list_signatures(
&self,
prefix: Prefix<XorName>,
) -> Option<BTreeMap<PublicId, sign::Signature>> {
self.machine.current().section_list_signatures(prefix)
}
pub fn get_banned_client_ips(&self) -> BTreeSet<IpAddr> {
self.machine.current().get_banned_client_ips()
}
pub fn is_node(&self) -> bool {
if let State::Node(..) = *self.machine.current() {
true
} else {
false
}
}
pub fn set_next_relocation_dst(&mut self, dst: XorName) {
self.machine.current_mut().set_next_relocation_dst(
Some(dst),
)
}
pub fn set_next_relocation_interval(&mut self, interval: (XorName, XorName)) {
self.machine.current_mut().set_next_relocation_interval(
interval,
)
}
pub fn clear_next_relocation_dst(&mut self) {
self.machine.current_mut().set_next_relocation_dst(None)
}
pub fn has_unnormalised_routing_conn(&self, excludes: &BTreeSet<XorName>) -> bool {
self.machine.current().has_unnormalised_routing_conn(
excludes,
)
}
pub fn get_user_msg_parts_count(&self) -> u64 {
self.machine.current().get_user_msg_parts_count()
}
pub fn get_clients_usage(&self) -> BTreeMap<IpAddr, u64> {
unwrap!(self.machine.current().get_clients_usage())
}
}
#[cfg(feature = "use-mock-crust")]
impl Debug for Node {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
self.machine.fmt(formatter)
}
}
impl Drop for Node {
fn drop(&mut self) {
let _ = self.machine.current_mut().handle_action(
Action::Terminate,
&mut self.event_buffer,
);
let _ = self.event_buffer.take_all();
}
}
fn relocate_priority(dst: &Authority<XorName>) -> u8 {
if dst.is_client() {
CLIENT_GET_PRIORITY
} else {
RELOCATE_PRIORITY
}
}