use action::Action;
use cache::{Cache, NullCache};
#[cfg(feature = "use-mock-crust")]
use crust::PeerId;
use data::{Data, DataIdentifier};
use error::{InterfaceError, RoutingError};
use event::Event;
use event_stream::{EventStepper, EventStream};
use id::FullId;
#[cfg(feature = "use-mock-crust")]
use id::PublicId;
use messages::{CLIENT_GET_PRIORITY, DEFAULT_PRIORITY, RELOCATE_PRIORITY, Request, Response,
UserMessage};
use outbox::{EventBox, EventBuf};
#[cfg(feature = "use-mock-crust")]
use routing_table::{Prefix, RoutingTable};
use routing_table::Authority;
#[cfg(not(feature = "use-mock-crust"))]
use rust_sodium;
#[cfg(feature = "use-mock-crust")]
use rust_sodium::crypto::sign;
use state_machine::{State, StateMachine};
use states;
#[cfg(feature = "use-mock-crust")]
use std::collections::BTreeMap;
#[cfg(feature = "use-mock-crust")]
use std::fmt::{self, Debug, Formatter};
use std::sync::mpsc::{Receiver, RecvError, Sender, TryRecvError, channel};
use types::{MessageId, RoutingActionSender};
use xor_name::XorName;
/// Builder used to configure a `Node` before it is created with `create`.
///
/// Obtained from `Node::builder()`; each setter consumes the builder and returns the updated one.
pub struct NodeBuilder {
/// Cache handed to the initial state (`states::Node::first` or `states::Bootstrapping::new`).
cache: Box<Cache>,
/// When `true`, the state machine starts via `states::Node::first` instead of bootstrapping.
first: bool,
/// When `true` (and `first` is `false`), creation terminates if crust reports peers on the LAN.
deny_other_local_nodes: bool,
}
impl NodeBuilder {
    /// Replaces the cache that will be handed to the node's initial state.
    pub fn cache(self, cache: Box<Cache>) -> NodeBuilder {
        let mut builder = self;
        builder.cache = cache;
        builder
    }

    /// Sets whether the node should start as the first node (`true`) or go through
    /// bootstrapping (`false`).
    pub fn first(self, first: bool) -> NodeBuilder {
        let mut builder = self;
        builder.first = first;
        builder
    }

    /// Requests that creation terminates when crust reports other peers on the LAN.
    ///
    /// The flag is only consulted when the node is not created as the first node.
    pub fn deny_other_local_nodes(self) -> NodeBuilder {
        let mut builder = self;
        builder.deny_other_local_nodes = true;
        builder
    }

    /// Consumes the builder and creates the `Node`.
    ///
    /// `min_section_size` is forwarded unchanged to the initial state. Events emitted while the
    /// state machine is being set up are collected in the node's event buffer. As written, this
    /// function always returns `Ok`.
    pub fn create(self, min_section_size: usize) -> Result<Node, RoutingError> {
        // Initialise rust_sodium; this call is compiled out for mock-crust builds.
        #[cfg(not(feature = "use-mock-crust"))]
        rust_sodium::init();

        let (result_tx, result_rx) = channel();
        let mut event_buffer = EventBuf::new();
        // Only the state machine is kept; the action sender returned alongside it is discarded.
        let (_, state_machine) = self.make_state_machine(min_section_size, &mut event_buffer);

        Ok(Node {
            interface_result_tx: result_tx,
            interface_result_rx: result_rx,
            machine: state_machine,
            event_buffer: event_buffer,
        })
    }

    /// Builds the state machine together with its action sender.
    ///
    /// A fresh `FullId` is generated, and the initial state is chosen as follows:
    ///
    /// * `first` set: `states::Node::first`, or `State::Terminated` if that yields `None`;
    /// * otherwise, unless other local nodes are denied and crust reports peers on the LAN:
    ///   `states::Bootstrapping::new`, or `State::Terminated` if that yields `None`;
    /// * otherwise: an error is logged, `Event::Terminate` is sent and the state is
    ///   `State::Terminated`.
    #[cfg_attr(rustfmt, rustfmt_skip)]
    fn make_state_machine(self,
                          min_section_size: usize,
                          outbox: &mut EventBox)
                          -> (RoutingActionSender, StateMachine) {
        let full_id = FullId::new();

        // The closure must stay inline so that its parameter types are inferred from
        // `StateMachine::new`.
        StateMachine::new(move |crust_service, timer, outbox2| {
            if self.first {
                states::Node::first(self.cache,
                                    crust_service,
                                    full_id,
                                    min_section_size,
                                    timer)
                    .map_or(State::Terminated, State::Node)
            } else if !self.deny_other_local_nodes || !crust_service.has_peers_on_lan() {
                // The LAN is only probed when other local nodes are denied.
                match states::Bootstrapping::new(self.cache,
                                                 false,
                                                 crust_service,
                                                 full_id,
                                                 min_section_size,
                                                 timer) {
                    Some(state) => State::Bootstrapping(state),
                    None => State::Terminated,
                }
            } else {
                error!("Bootstrapping({:?}) More than 1 routing node found on LAN. \
                        Currently this is not supported",
                       full_id.public_id().name());
                outbox2.send_event(Event::Terminate);
                State::Terminated
            }
        },
                          outbox)
    }
}
/// A routing node: wraps the state machine and exposes methods for sending requests and
/// responses, plus the `EventStepper` interface for retrieving events.
pub struct Node {
/// Sender that is cloned into every `Action::NodeSendMessage` as its `result_tx`.
interface_result_tx: Sender<Result<(), InterfaceError>>,
/// Receiving end on which `send_action` waits for the result of the submitted action.
interface_result_rx: Receiver<Result<(), InterfaceError>>,
/// The state machine this node drives.
machine: StateMachine,
/// Buffer passed to the state machine as its event outbox; drained through `pop_item`.
event_buffer: EventBuf,
}
impl Node {
    /// Returns a `NodeBuilder` preset with a `NullCache`, `first` disabled and other local nodes
    /// allowed.
    pub fn builder() -> NodeBuilder {
        NodeBuilder {
            first: false,
            deny_other_local_nodes: false,
            cache: Box::new(NullCache),
        }
    }

    /// Sends a `Get` request for `data_request` from `src` to `dst` with `RELOCATE_PRIORITY`.
    pub fn send_get_request(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data_request: DataIdentifier,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let request = Request::Get(data_request, id);
        self.send_action(src, dst, UserMessage::Request(request), RELOCATE_PRIORITY)
    }

    /// Sends a `Put` request carrying `data` from `src` to `dst` with `DEFAULT_PRIORITY`.
    pub fn send_put_request(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data: Data,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let request = Request::Put(data, id);
        self.send_action(src, dst, UserMessage::Request(request), DEFAULT_PRIORITY)
    }

    /// Sends a `Post` request carrying `data` from `src` to `dst` with `DEFAULT_PRIORITY`.
    pub fn send_post_request(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data: Data,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let request = Request::Post(data, id);
        self.send_action(src, dst, UserMessage::Request(request), DEFAULT_PRIORITY)
    }

    /// Sends a `Delete` request carrying `data` from `src` to `dst` with `DEFAULT_PRIORITY`.
    pub fn send_delete_request(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data: Data,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let request = Request::Delete(data, id);
        self.send_action(src, dst, UserMessage::Request(request), DEFAULT_PRIORITY)
    }

    /// Sends a `GetSuccess` response carrying `data` from `src` to `dst`.
    ///
    /// The priority is `CLIENT_GET_PRIORITY` when `dst` is a client, `RELOCATE_PRIORITY`
    /// otherwise.
    pub fn send_get_success(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data: Data,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let priority = Self::get_response_priority(&dst);
        let response = Response::GetSuccess(data, id);
        self.send_action(src, dst, UserMessage::Response(response), priority)
    }

    /// Sends a `GetFailure` response for `data_id` from `src` to `dst`.
    ///
    /// The priority is `CLIENT_GET_PRIORITY` when `dst` is a client, `RELOCATE_PRIORITY`
    /// otherwise.
    pub fn send_get_failure(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data_id: DataIdentifier,
        external_error_indicator: Vec<u8>,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let priority = Self::get_response_priority(&dst);
        let response = Response::GetFailure {
            external_error_indicator: external_error_indicator,
            data_id: data_id,
            id: id,
        };
        self.send_action(src, dst, UserMessage::Response(response), priority)
    }

    /// Sends a `PutSuccess` response for `name` from `src` to `dst` with `DEFAULT_PRIORITY`.
    pub fn send_put_success(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        name: DataIdentifier,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::PutSuccess(name, id);
        self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
    }

    /// Sends a `PutFailure` response for `data_id` from `src` to `dst` with `DEFAULT_PRIORITY`.
    pub fn send_put_failure(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data_id: DataIdentifier,
        external_error_indicator: Vec<u8>,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::PutFailure {
            external_error_indicator: external_error_indicator,
            data_id: data_id,
            id: id,
        };
        self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
    }

    /// Sends a `PostSuccess` response for `name` from `src` to `dst` with `DEFAULT_PRIORITY`.
    pub fn send_post_success(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        name: DataIdentifier,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::PostSuccess(name, id);
        self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
    }

    /// Sends a `PostFailure` response for `data_id` from `src` to `dst` with `DEFAULT_PRIORITY`.
    pub fn send_post_failure(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data_id: DataIdentifier,
        external_error_indicator: Vec<u8>,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::PostFailure {
            external_error_indicator: external_error_indicator,
            data_id: data_id,
            id: id,
        };
        self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
    }

    /// Sends a `DeleteSuccess` response for `name` from `src` to `dst` with `DEFAULT_PRIORITY`.
    pub fn send_delete_success(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        name: DataIdentifier,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::DeleteSuccess(name, id);
        self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
    }

    /// Sends a `DeleteFailure` response for `data_id` from `src` to `dst` with
    /// `DEFAULT_PRIORITY`.
    pub fn send_delete_failure(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data_id: DataIdentifier,
        external_error_indicator: Vec<u8>,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::DeleteFailure {
            external_error_indicator: external_error_indicator,
            data_id: data_id,
            id: id,
        };
        self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
    }

    /// Sends an `AppendSuccess` response for `name` from `src` to `dst` with `DEFAULT_PRIORITY`.
    pub fn send_append_success(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        name: DataIdentifier,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::AppendSuccess(name, id);
        self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
    }

    /// Sends an `AppendFailure` response for `data_id` from `src` to `dst` with
    /// `DEFAULT_PRIORITY`.
    pub fn send_append_failure(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data_id: DataIdentifier,
        external_error_indicator: Vec<u8>,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::AppendFailure {
            external_error_indicator: external_error_indicator,
            data_id: data_id,
            id: id,
        };
        self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
    }

    /// Sends a `GetAccountInfoSuccess` response reporting `data_stored` and `space_available`
    /// from `src` to `dst` with `CLIENT_GET_PRIORITY`.
    pub fn send_get_account_info_success(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        data_stored: u64,
        space_available: u64,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::GetAccountInfoSuccess {
            space_available: space_available,
            data_stored: data_stored,
            id: id,
        };
        self.send_action(src, dst, UserMessage::Response(response), CLIENT_GET_PRIORITY)
    }

    /// Sends a `GetAccountInfoFailure` response from `src` to `dst` with `CLIENT_GET_PRIORITY`.
    pub fn send_get_account_info_failure(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        external_error_indicator: Vec<u8>,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let response = Response::GetAccountInfoFailure {
            external_error_indicator: external_error_indicator,
            id: id,
        };
        self.send_action(src, dst, UserMessage::Response(response), CLIENT_GET_PRIORITY)
    }

    /// Sends a `Refresh` request carrying `content` from `src` to `dst` with
    /// `RELOCATE_PRIORITY`.
    pub fn send_refresh_request(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        content: Vec<u8>,
        id: MessageId,
    ) -> Result<(), InterfaceError> {
        let request = Request::Refresh(content, id);
        self.send_action(src, dst, UserMessage::Request(request), RELOCATE_PRIORITY)
    }

    /// Delegates to the state machine's `close_group` for `name` and `count`.
    ///
    /// NOTE(review): presumably yields up to `count` names closest to `name`; the exact meaning
    /// of `None` is defined by `StateMachine::close_group` - confirm there.
    pub fn close_group(&self, name: XorName, count: usize) -> Option<Vec<XorName>> {
        self.machine.close_group(name, count)
    }

    /// Returns the name reported by the state machine, or `RoutingError::Terminated` when it
    /// reports none.
    pub fn name(&self) -> Result<XorName, RoutingError> {
        match self.machine.name() {
            Some(name) => Ok(name),
            None => Err(RoutingError::Terminated),
        }
    }

    /// Chooses the priority of a `Get` response: `CLIENT_GET_PRIORITY` when the destination is a
    /// client, `RELOCATE_PRIORITY` otherwise.
    fn get_response_priority(dst: &Authority<XorName>) -> u8 {
        if dst.is_client() {
            CLIENT_GET_PRIORITY
        } else {
            RELOCATE_PRIORITY
        }
    }

    /// Submits `user_msg` to the current state as an `Action::NodeSendMessage` and waits for the
    /// result reported on the interface result channel.
    fn send_action(
        &mut self,
        src: Authority<XorName>,
        dst: Authority<XorName>,
        user_msg: UserMessage,
        priority: u8,
    ) -> Result<(), InterfaceError> {
        // Poll before handing the action over (presumably to let the state machine process
        // anything outstanding - `poll` is not defined in this file).
        self.poll();

        let result_tx = self.interface_result_tx.clone();
        let action = Action::NodeSendMessage {
            result_tx: result_tx,
            priority: priority,
            content: user_msg,
            dst: dst,
            src: src,
        };

        let transition = {
            let state = self.machine.current_mut();
            state.handle_action(action, &mut self.event_buffer)
        };
        self.machine.apply_transition(transition, &mut self.event_buffer);

        // The outer `Result` is the channel receive, the inner one is the action's own outcome.
        self.receive_action_result(&self.interface_result_rx).and_then(|outcome| outcome)
    }

    /// Blocks on `rx` for the next value, converting a receive error into an `InterfaceError`.
    fn receive_action_result<T>(&self, rx: &Receiver<T>) -> Result<T, InterfaceError> {
        rx.recv().map_err(InterfaceError::from)
    }
}
impl EventStepper for Node {
    type Item = Event;

    /// Runs one `step` of the state machine, collecting produced events in the event buffer.
    fn produce_events(&mut self) -> Result<(), RecvError> {
        let buffer = &mut self.event_buffer;
        self.machine.step(buffer)
    }

    /// Runs one `try_step` of the state machine, collecting produced events in the event buffer.
    fn try_produce_events(&mut self) -> Result<(), TryRecvError> {
        let buffer = &mut self.event_buffer;
        self.machine.try_step(buffer)
    }

    /// Takes the first buffered event, if there is one.
    fn pop_item(&mut self) -> Option<Self::Item> {
        let buffer = &mut self.event_buffer;
        buffer.take_first()
    }
}
#[cfg(feature = "use-mock-crust")]
impl Node {
    /// Asks the current state to resend its unacknowledged messages and returns what the state
    /// reports.
    ///
    /// NOTE(review): presumably `true` means something was resent - confirm in the state
    /// implementation.
    pub fn resend_unacknowledged(&mut self) -> bool {
        let state = self.machine.current_mut();
        state.resend_unacknowledged()
    }

    /// Returns whether the current state reports unacknowledged messages.
    ///
    /// This is a read-only query, so it only needs a shared borrow, like the other accessors in
    /// this block.
    pub fn has_unacknowledged(&self) -> bool {
        let state = self.machine.current();
        state.has_unacknowledged()
    }

    /// Returns a clone of the current state's routing table, or `None` if the state has none.
    pub fn routing_table(&self) -> Option<RoutingTable<XorName>> {
        let state = self.machine.current();
        state.routing_table().cloned()
    }

    /// Asks the current state whether it has a tunnel for the pair `client_1`, `client_2`.
    pub fn has_tunnel_clients(&self, client_1: PeerId, client_2: PeerId) -> bool {
        let state = self.machine.current();
        state.has_tunnel_clients(client_1, client_2)
    }

    /// Forwards `clear_state` to the current state.
    pub fn clear_state(&mut self) {
        let state = self.machine.current_mut();
        state.clear_state();
    }

    /// Returns the signatures the current state holds for the section list of `prefix`, if any.
    pub fn section_list_signatures(
        &self,
        prefix: Prefix<XorName>,
    ) -> Option<BTreeMap<PublicId, sign::Signature>> {
        let state = self.machine.current();
        state.section_list_signatures(prefix)
    }

    /// Returns `true` exactly when the state machine is currently in `State::Node`.
    pub fn is_node(&self) -> bool {
        match *self.machine.current() {
            State::Node(..) => true,
            _ => false,
        }
    }

    /// Sets the next node name on the current state to `relocation_name`.
    pub fn set_next_node_name(&mut self, relocation_name: XorName) {
        let state = self.machine.current_mut();
        state.set_next_node_name(Some(relocation_name))
    }

    /// Clears the next node name on the current state.
    pub fn clear_next_node_name(&mut self) {
        let state = self.machine.current_mut();
        state.set_next_node_name(None)
    }
}
#[cfg(feature = "use-mock-crust")]
impl Debug for Node {
    /// Formats the node by delegating to the state machine's own formatting, so formatter
    /// flags are passed through unchanged.
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        Debug::fmt(&self.machine, formatter)
    }
}
impl Drop for Node {
    /// Polls once more, sends `Action::Terminate` to the current state and empties the event
    /// buffer.
    fn drop(&mut self) {
        self.poll();
        {
            let state = self.machine.current_mut();
            // The transition returned for the terminate action is deliberately not applied.
            let _ = state.handle_action(Action::Terminate, &mut self.event_buffer);
        }
        // Any events still buffered are discarded.
        let _ = self.event_buffer.take_all();
    }
}