#[cfg(not(feature = "use-mock-crust"))]
use maidsafe_utilities::thread::RaiiThreadJoiner;
use sodiumoxide;
#[cfg(feature = "use-mock-crust")]
use std::cell::RefCell;
use std::sync::mpsc::{Receiver, Sender, channel};
use action::Action;
use authority::Authority;
use core::{Core, Role};
use data::{Data, DataIdentifier};
use error::{InterfaceError, RoutingError};
use event::Event;
use messages::{UserMessage, Request, Response, RELOCATE_PRIORITY, DEFAULT_PRIORITY,
CLIENT_GET_PRIORITY};
use xor_name::XorName;
use types::MessageId;
/// Shorthand for a result that carries no value on success and a `RoutingError` on failure.
type RoutingResult = Result<(), RoutingError>;
/// Handle through which callers submit user messages and queries to a `Core`.
///
/// Every public method wraps its arguments in an `Action`, pushes it through `action_sender`
/// and then waits for the corresponding result on a channel.
pub struct Node {
// Sending half handed (cloned) to the core with every `Action::NodeSendMessage`, so the
// outcome of the send can be reported back.
interface_result_tx: Sender<Result<(), InterfaceError>>,
// Receiving half on which `send_action` waits for that outcome.
interface_result_rx: Receiver<Result<(), InterfaceError>>,
// Channel used to submit `Action`s to the core.
action_sender: ::types::RoutingActionSender,
// Mock-crust builds only: the core is owned by the node itself and accessed through
// `borrow_mut()` by `poll`, `resend_unacknowledged` and `clear_state`.
#[cfg(feature = "use-mock-crust")]
core: RefCell<Core>,
// Non-mock builds only: joiner for the "Node thread" spawned in `new`.
// NOTE(review): presumably joins that thread when the `Node` is dropped - confirm against
// `maidsafe_utilities::thread::RaiiThreadJoiner`.
#[cfg(not(feature = "use-mock-crust"))]
_raii_joiner: ::maidsafe_utilities::thread::RaiiThreadJoiner,
}
impl Node {
/// Creates a new `Node` whose `Core` runs on a dedicated thread named "Node thread".
///
/// `event_sender` is handed to the core. `first_node` selects `Role::FirstNode` instead of
/// `Role::Node`.
///
/// As written this function always returns `Ok`.
#[cfg(not(feature = "use-mock-crust"))]
pub fn new(event_sender: Sender<Event>, first_node: bool) -> Result<Node, RoutingError> {
    sodiumoxide::init();

    let (interface_result_tx, interface_result_rx) = channel();
    let (action_sender, mut core) = Core::new(event_sender,
                                              if first_node {
                                                  Role::FirstNode
                                              } else {
                                                  Role::Node
                                              },
                                              None);

    // The core is moved into its own thread; the joiner is stored in the node.
    let core_thread = RaiiThreadJoiner::new(thread!("Node thread", move || {
        core.run();
    }));

    Ok(Node {
        interface_result_tx: interface_result_tx,
        interface_result_rx: interface_result_rx,
        action_sender: action_sender,
        _raii_joiner: core_thread,
    })
}
/// Creates a new `Node` for mock-crust builds.
///
/// No thread is spawned here: the `Core` is stored inside the node in a `RefCell`.
/// `first_node` selects `Role::FirstNode` instead of `Role::Node`.
///
/// As written this function always returns `Ok`.
#[cfg(feature = "use-mock-crust")]
pub fn new(event_sender: Sender<Event>, first_node: bool) -> Result<Node, RoutingError> {
    sodiumoxide::init();

    let (interface_result_tx, interface_result_rx) = channel();
    let (action_sender, core) = Core::new(event_sender,
                                          if first_node {
                                              Role::FirstNode
                                          } else {
                                              Role::Node
                                          },
                                          None);

    Ok(Node {
        interface_result_tx: interface_result_tx,
        interface_result_rx: interface_result_rx,
        action_sender: action_sender,
        core: RefCell::new(core),
    })
}
/// Mock-crust only: forwards to `Core::poll` and returns its result.
#[cfg(feature = "use-mock-crust")]
pub fn poll(&self) -> bool {
    let mut core = self.core.borrow_mut();
    core.poll()
}
/// Mock-crust only: forwards to `Core::resend_unacknowledged` and returns its result.
#[cfg(feature = "use-mock-crust")]
pub fn resend_unacknowledged(&self) -> bool {
    let mut core = self.core.borrow_mut();
    core.resend_unacknowledged()
}
/// Mock-crust only: forwards to `Core::clear_state`.
#[cfg(feature = "use-mock-crust")]
pub fn clear_state(&self) {
    let mut core = self.core.borrow_mut();
    core.clear_state()
}
/// Sends a `Get` request for `data_request`, tagged with `id`, from `src` to `dst`.
///
/// The message is submitted with `RELOCATE_PRIORITY`. Returns the outcome reported for the
/// send, or an `InterfaceError` if the action could not be submitted or no result arrived.
pub fn send_get_request(
    &self,
    src: Authority,
    dst: Authority,
    data_request: DataIdentifier,
    id: MessageId
) -> Result<(), InterfaceError> {
    let request = Request::Get(data_request, id);
    self.send_action(src, dst, UserMessage::Request(request), RELOCATE_PRIORITY)
}
/// Sends a `Put` request carrying `data`, tagged with `id`, from `src` to `dst`.
///
/// The message is submitted with `DEFAULT_PRIORITY`. Returns the outcome reported for the
/// send, or an `InterfaceError` if the action could not be submitted or no result arrived.
pub fn send_put_request(
    &self,
    src: Authority,
    dst: Authority,
    data: Data,
    id: MessageId
) -> Result<(), InterfaceError> {
    let request = Request::Put(data, id);
    self.send_action(src, dst, UserMessage::Request(request), DEFAULT_PRIORITY)
}
/// Sends a `Post` request carrying `data`, tagged with `id`, from `src` to `dst`.
///
/// The message is submitted with `DEFAULT_PRIORITY`. Returns the outcome reported for the
/// send, or an `InterfaceError` if the action could not be submitted or no result arrived.
pub fn send_post_request(
    &self,
    src: Authority,
    dst: Authority,
    data: Data,
    id: MessageId
) -> Result<(), InterfaceError> {
    let request = Request::Post(data, id);
    self.send_action(src, dst, UserMessage::Request(request), DEFAULT_PRIORITY)
}
/// Sends a `Delete` request carrying `data`, tagged with `id`, from `src` to `dst`.
///
/// The message is submitted with `DEFAULT_PRIORITY`. Returns the outcome reported for the
/// send, or an `InterfaceError` if the action could not be submitted or no result arrived.
pub fn send_delete_request(
    &self,
    src: Authority,
    dst: Authority,
    data: Data,
    id: MessageId
) -> Result<(), InterfaceError> {
    let request = Request::Delete(data, id);
    self.send_action(src, dst, UserMessage::Request(request), DEFAULT_PRIORITY)
}
/// Sends a `GetSuccess` response carrying `data`, tagged with `id`, from `src` to `dst`.
///
/// The priority depends on the destination: `CLIENT_GET_PRIORITY` when `dst` is an
/// `Authority::Client`, `RELOCATE_PRIORITY` otherwise.
pub fn send_get_success(
    &self,
    src: Authority,
    dst: Authority,
    data: Data,
    id: MessageId
) -> Result<(), InterfaceError> {
    // The patterns bind nothing, so `dst` is still available for the call below.
    let priority = match dst {
        Authority::Client { .. } => CLIENT_GET_PRIORITY,
        _ => RELOCATE_PRIORITY,
    };
    let response = Response::GetSuccess(data, id);
    self.send_action(src, dst, UserMessage::Response(response), priority)
}
/// Sends a `GetFailure` response for `data_id`, tagged with `id`, from `src` to `dst`.
///
/// `external_error_indicator` is placed in the response unchanged. The priority depends on
/// the destination: `CLIENT_GET_PRIORITY` when `dst` is an `Authority::Client`,
/// `RELOCATE_PRIORITY` otherwise.
pub fn send_get_failure(
    &self,
    src: Authority,
    dst: Authority,
    data_id: DataIdentifier,
    external_error_indicator: Vec<u8>,
    id: MessageId
) -> Result<(), InterfaceError> {
    // The patterns bind nothing, so `dst` is still available for the call below.
    let priority = match dst {
        Authority::Client { .. } => CLIENT_GET_PRIORITY,
        _ => RELOCATE_PRIORITY,
    };
    let response = Response::GetFailure {
        id: id,
        data_id: data_id,
        external_error_indicator: external_error_indicator,
    };
    self.send_action(src, dst, UserMessage::Response(response), priority)
}
/// Sends a `PutSuccess` response for `name`, tagged with `id`, from `src` to `dst`.
///
/// The message is submitted with `DEFAULT_PRIORITY`.
pub fn send_put_success(
    &self,
    src: Authority,
    dst: Authority,
    name: DataIdentifier,
    id: MessageId
) -> Result<(), InterfaceError> {
    let response = Response::PutSuccess(name, id);
    self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
}
/// Sends a `PutFailure` response for `data_id`, tagged with `id`, from `src` to `dst`.
///
/// `external_error_indicator` is placed in the response unchanged. The message is submitted
/// with `DEFAULT_PRIORITY`.
pub fn send_put_failure(
    &self,
    src: Authority,
    dst: Authority,
    data_id: DataIdentifier,
    external_error_indicator: Vec<u8>,
    id: MessageId
) -> Result<(), InterfaceError> {
    let response = Response::PutFailure {
        id: id,
        data_id: data_id,
        external_error_indicator: external_error_indicator,
    };
    self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
}
/// Sends a `PostSuccess` response for `name`, tagged with `id`, from `src` to `dst`.
///
/// The message is submitted with `DEFAULT_PRIORITY`.
pub fn send_post_success(
    &self,
    src: Authority,
    dst: Authority,
    name: DataIdentifier,
    id: MessageId
) -> Result<(), InterfaceError> {
    let response = Response::PostSuccess(name, id);
    self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
}
/// Sends a `PostFailure` response for `data_id`, tagged with `id`, from `src` to `dst`.
///
/// `external_error_indicator` is placed in the response unchanged. The message is submitted
/// with `DEFAULT_PRIORITY`.
pub fn send_post_failure(
    &self,
    src: Authority,
    dst: Authority,
    data_id: DataIdentifier,
    external_error_indicator: Vec<u8>,
    id: MessageId
) -> Result<(), InterfaceError> {
    let response = Response::PostFailure {
        id: id,
        data_id: data_id,
        external_error_indicator: external_error_indicator,
    };
    self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
}
/// Sends a `DeleteSuccess` response for `name`, tagged with `id`, from `src` to `dst`.
///
/// The message is submitted with `DEFAULT_PRIORITY`.
pub fn send_delete_success(
    &self,
    src: Authority,
    dst: Authority,
    name: DataIdentifier,
    id: MessageId
) -> Result<(), InterfaceError> {
    let response = Response::DeleteSuccess(name, id);
    self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
}
/// Sends a `DeleteFailure` response for `data_id`, tagged with `id`, from `src` to `dst`.
///
/// `external_error_indicator` is placed in the response unchanged. The message is submitted
/// with `DEFAULT_PRIORITY`.
pub fn send_delete_failure(
    &self,
    src: Authority,
    dst: Authority,
    data_id: DataIdentifier,
    external_error_indicator: Vec<u8>,
    id: MessageId
) -> Result<(), InterfaceError> {
    let response = Response::DeleteFailure {
        id: id,
        data_id: data_id,
        external_error_indicator: external_error_indicator,
    };
    self.send_action(src, dst, UserMessage::Response(response), DEFAULT_PRIORITY)
}
/// Sends a `GetAccountInfoSuccess` response, tagged with `id`, from `src` to `dst`.
///
/// `data_stored` and `space_available` are copied into the response as given. The message
/// is submitted with `CLIENT_GET_PRIORITY`.
pub fn send_get_account_info_success(
    &self,
    src: Authority,
    dst: Authority,
    data_stored: u64,
    space_available: u64,
    id: MessageId
) -> Result<(), InterfaceError> {
    let response = Response::GetAccountInfoSuccess {
        id: id,
        data_stored: data_stored,
        space_available: space_available,
    };
    self.send_action(src, dst, UserMessage::Response(response), CLIENT_GET_PRIORITY)
}
/// Sends a `GetAccountInfoFailure` response, tagged with `id`, from `src` to `dst`.
///
/// `external_error_indicator` is placed in the response unchanged. The message is submitted
/// with `CLIENT_GET_PRIORITY`.
pub fn send_get_account_info_failure(
    &self,
    src: Authority,
    dst: Authority,
    external_error_indicator: Vec<u8>,
    id: MessageId
) -> Result<(), InterfaceError> {
    let response = Response::GetAccountInfoFailure {
        id: id,
        external_error_indicator: external_error_indicator,
    };
    self.send_action(src, dst, UserMessage::Response(response), CLIENT_GET_PRIORITY)
}
/// Sends a `Refresh` request carrying the raw bytes `content`, tagged with `id`, from `src`
/// to `dst`.
///
/// The message is submitted with `RELOCATE_PRIORITY`.
pub fn send_refresh_request(
    &self,
    src: Authority,
    dst: Authority,
    content: Vec<u8>,
    id: MessageId
) -> Result<(), InterfaceError> {
    let request = Request::Refresh(content, id);
    self.send_action(src, dst, UserMessage::Request(request), RELOCATE_PRIORITY)
}
pub fn close_group(&self, name: XorName) -> Result<Option<Vec<XorName>>, InterfaceError> {
let (result_tx, result_rx) = channel();
try!(self.action_sender.send(Action::CloseGroup {
name: name,
result_tx: result_tx,
}));
self.receive_action_result(&result_rx)
}
pub fn name(&self) -> Result<XorName, InterfaceError> {
let (result_tx, result_rx) = channel();
try!(self.action_sender.send(Action::Name { result_tx: result_tx }));
self.receive_action_result(&result_rx)
}
pub fn quorum_size(&self) -> Result<usize, InterfaceError> {
let (result_tx, result_rx) = channel();
try!(self.action_sender.send(Action::QuorumSize { result_tx: result_tx }));
self.receive_action_result(&result_rx)
}
fn send_action(&self,
src: Authority,
dst: Authority,
user_msg: UserMessage,
priority: u8)
-> Result<(), InterfaceError> {
try!(self.action_sender.send(Action::NodeSendMessage {
src: src,
dst: dst,
content: user_msg,
priority: priority,
result_tx: self.interface_result_tx.clone(),
}));
try!(self.receive_action_result(&self.interface_result_rx))
}
/// Blocks on `rx` until a value arrives and returns it.
///
/// A receive error is converted into an `InterfaceError` through its `From` implementation.
#[cfg(not(feature = "use-mock-crust"))]
fn receive_action_result<T>(&self, rx: &Receiver<T>) -> Result<T, InterfaceError> {
    rx.recv().map_err(From::from)
}
/// Mock-crust variant: calls `poll` repeatedly until it returns `false`, then receives a
/// value from `rx` and returns it.
///
/// A receive error is converted into an `InterfaceError` through its `From` implementation.
#[cfg(feature = "use-mock-crust")]
fn receive_action_result<T>(&self, rx: &Receiver<T>) -> Result<T, InterfaceError> {
    loop {
        if !self.poll() {
            break;
        }
    }
    rx.recv().map_err(From::from)
}
}
impl Drop for Node {
    /// Sends `Action::Terminate` to the core when the node is dropped.
    ///
    /// A failure to send is logged with `error!` and otherwise ignored.
    fn drop(&mut self) {
        match self.action_sender.send(Action::Terminate) {
            Ok(_) => {}
            Err(err) => {
                error!("Error {:?} sending event Core", err);
            }
        }
    }
}