use xor_name::XorName;
use authority::Authority;
use messages::{RequestMessage, ResponseMessage, RequestContent, ResponseContent};
/// A node that keeps data in an in-memory map and one `u64` record per client name,
/// driven by events received over an `mpsc` channel.
pub struct Node {
/// Routing instance used to send responses, requests and refresh messages, and to stop.
routing: ::routing::Routing,
/// Receiving half of the event channel; drained by `run`.
receiver: ::std::sync::mpsc::Receiver<::event::Event>,
/// Sending half of the same channel; clones are handed out by `new` and `get_sender`.
sender: ::std::sync::mpsc::Sender<::event::Event>,
/// In-memory data store, keyed by the name of the stored data.
db: ::std::collections::BTreeMap<XorName, ::data::Data>,
/// One `u64` record per client name; overwritten when a refresh is handled.
client_accounts: ::std::collections::BTreeMap<XorName, u64>,
/// Starts as `false` and is set to `true` when a `Connected` event is seen;
/// nothing in this file resets it.
connected: bool,
}
impl Node {
/// Builds a node with an empty data store, no client records and `connected == false`.
///
/// The event channel is created first; a clone of its sending half is given to
/// `Routing::new`, while the node keeps both the original sender and the receiver.
/// The result of `Routing::new` is passed through `unwrap_result!`, so a
/// construction failure is not recoverable here (presumably a panic).
pub fn new() -> Node {
    let (event_tx, event_rx) = ::std::sync::mpsc::channel::<::event::Event>();
    Node {
        // Evaluated first, while `event_tx` is still available to be cloned.
        routing: unwrap_result!(::routing::Routing::new(event_tx.clone())),
        receiver: event_rx,
        sender: event_tx,
        db: ::std::collections::BTreeMap::new(),
        client_accounts: ::std::collections::BTreeMap::new(),
        connected: false,
    }
}
/// Blocks on the event channel and dispatches each event to its handler.
///
/// The loop ends when `recv` reports an error or when a `Terminated` event is
/// received (in which case `stop` is called first). `Refresh` and `DoRefresh`
/// events are only acted on for type tag `1`; any other tag is logged as an
/// error and the event is dropped.
pub fn run(&mut self) {
    loop {
        let event = match self.receiver.recv() {
            Ok(event) => event,
            Err(_) => break,
        };
        match event {
            ::event::Event::Request(msg) => self.handle_request(msg),
            ::event::Event::Response(msg) => self.handle_response(msg),
            ::event::Event::Refresh(type_tag, src, vec_of_bytes) => {
                debug!("Received refresh event");
                if type_tag == 1u64 {
                    self.handle_refresh(src, vec_of_bytes);
                } else {
                    error!("Received refresh for tag {:?} from {:?}", type_tag, src);
                }
            }
            ::event::Event::DoRefresh(type_tag, src, cause) => {
                debug!("Received do refresh event");
                if type_tag == 1u64 {
                    self.handle_do_refresh(src, cause);
                } else {
                    error!("Received DoRefresh for tag {:?} from {:?}", type_tag, src);
                }
            }
            ::event::Event::Churn(close_group) => {
                debug!("Received churn event");
                self.handle_churn(close_group)
            }
            ::event::Event::Connected => {
                debug!("Received connected event");
                self.connected = true;
            }
            ::event::Event::Disconnected => debug!("Received disconnected event"),
            ::event::Event::Terminated => {
                debug!("Received terminate event");
                self.stop();
                break;
            }
        }
    }
}
/// Returns a clone of the sending half of the node's event channel, so that
/// events can be delivered to `run` from elsewhere.
pub fn get_sender(&self) -> ::std::sync::mpsc::Sender<::event::Event> {
    ::std::sync::mpsc::Sender::clone(&self.sender)
}
/// Logs that the node is terminating and stops the routing instance.
pub fn stop(&mut self) {
debug!("Node terminating.");
self.routing.stop();
}
/// Dispatches an incoming request according to its content.
///
/// `Get` and `Put` are forwarded to their handlers together with the message's
/// source and destination authorities. `Post` and `Delete` are only logged as
/// unimplemented. Any other content is ignored silently.
fn handle_request(&mut self, msg: RequestMessage) {
    match msg.content {
        RequestContent::Get(data_request) => {
            self.handle_get_request(data_request, msg.src, msg.dst)
        }
        RequestContent::Put(data) => self.handle_put_request(data, msg.src, msg.dst),
        RequestContent::Post(_) => debug!("Node: Post unimplemented."),
        RequestContent::Delete(_) => debug!("Node: Delete unimplemented."),
        _ => (),
    }
}
fn handle_get_request(&mut self,
data_request: ::data::DataRequest,
src: Authority,
dst: Authority) {
match self.db.get(&data_request.name()) {
Some(data) => {
unwrap_result!(self.routing
.send_get_response(src,
dst,
ResponseContent::GetSuccess(data.clone())))
}
None => {
debug!("GetDataRequest failed for {:?}.", data_request.name());
return;
}
}
}
/// Handles a `Put` request, choosing the action from the request's source authority.
///
/// * `NaeManager` source: the data is stored in the local map under its name.
/// * `ClientManager` source: the data is sent on as a new `Put` request, with
///   the same `src` and a destination of `NaeManager(data.name())`.
/// * Any other source: the source is logged and `assert!(false)` fires.
///
/// NOTE(review): dispatch is on `src` and `_dst` is unused. Confirm this is
/// intended rather than dispatching on the authority this node is acting as.
fn handle_put_request(&mut self, data: ::data::Data, src: Authority, _dst: Authority) {
    match src {
        Authority::NaeManager(_) => {
            let name = data.name();
            debug!("Storing: key {:?}, value {:?}", name, data);
            let _ = self.db.insert(name, data);
        }
        Authority::ClientManager(_) => {
            let name = data.name();
            debug!("Sending: key {:?}, value {:?}", name, data);
            unwrap_result!(self.routing.send_put_request(src,
                                                         Authority::NaeManager(name),
                                                         RequestContent::Put(data)));
        }
        _ => {
            debug!("Node: Unexpected src ({:?})", src);
            assert!(false);
        }
    }
}
fn handle_churn(&mut self, our_close_group: Vec<XorName>) {
let mut exit = false;
if our_close_group.len() < ::kademlia_routing_table::group_size() {
if self.connected {
debug!("Close group ({:?}) has fallen below group size {:?}, terminating node",
our_close_group.len(),
::kademlia_routing_table::group_size());
exit = true;
} else {
debug!("Ignoring churn as we are not yet connected.");
return;
}
}
let cause = ::rand::random::<XorName>();
debug!("Handle churn for close group size {:?}",
our_close_group.len());
for (client_name, stored) in &self.client_accounts {
debug!("REFRESH {:?} - {:?}", client_name, stored);
unwrap_result!(self.routing.send_refresh_request(
1u64,
Authority::ClientManager(client_name.clone()),
unwrap_result!(::maidsafe_utilities::serialisation::serialise(&stored)),
cause));
}
if exit {
self.routing.stop();
}
}
/// Merges the refresh payloads received for `src` into the stored client record.
///
/// Each payload is deserialised as a `u64`; payloads that fail to parse are
/// counted and otherwise ignored. The median of the parsed values replaces the
/// record stored for the client when `src` is a `ClientManager` authority; for
/// any other authority the result is only logged.
///
/// If no payload could be parsed (including an empty `vec_of_bytes`), nothing
/// is stored. Previously the median of an empty set (`0`) would overwrite the
/// existing record.
fn handle_refresh(&mut self, src: Authority, vec_of_bytes: Vec<Vec<u8>>) {
    let mut records: Vec<u64> = Vec::with_capacity(vec_of_bytes.len());
    let mut fail_parsing_count = 0usize;
    for bytes in vec_of_bytes {
        match ::maidsafe_utilities::serialisation::deserialise(&bytes) {
            Ok(record) => records.push(record),
            Err(_) => fail_parsing_count += 1usize,
        }
    }

    // Without a single valid record there is no meaningful median; keep the
    // currently stored value instead of replacing it with zero.
    if records.is_empty() {
        debug!("Refresh for {:?}: no valid records (errs {:?}), keeping stored value",
               src,
               fail_parsing_count);
        return;
    }

    let median_value = median(records.clone());
    debug!("Refresh for {:?}: median {:?} from {:?} (errs {:?})",
           src,
           median_value,
           records,
           fail_parsing_count);
    if let Authority::ClientManager(client_name) = src {
        let _ = self.client_accounts.insert(client_name, median_value);
    }
}
/// Re-sends the stored record for a single client in response to a `DoRefresh`.
///
/// Nothing happens unless `src` is a `ClientManager` authority whose client
/// name has a stored record. In that case the record is serialised and sent as
/// a refresh request with type tag `1` and the supplied `cause`.
fn handle_do_refresh(&self, src: Authority, cause: XorName) {
    let client_name = match src {
        Authority::ClientManager(name) => name,
        _ => return,
    };

    if let Some(stored) = self.client_accounts.get(&client_name) {
        debug!("DoRefresh for client {:?} storing {:?} caused by {:?}",
               client_name,
               stored,
               cause);
        let client_authority = Authority::ClientManager(client_name.clone());
        let payload = unwrap_result!(::maidsafe_utilities::serialisation::serialise(&stored));
        unwrap_result!(self.routing.send_refresh_request(1u64,
                                                         client_authority,
                                                         payload,
                                                         cause.clone()));
    }
}
fn handle_response(&mut self, _msg: ResponseMessage) {
unimplemented!()
}
}
/// Returns the median of `values`.
///
/// * An empty input yields `0`.
/// * For an odd number of elements the middle element (after sorting) is returned.
/// * For an even number of elements the result is the mean of the two middle
///   elements, rounded down.
///
/// The mean is computed as `lower + (upper - lower) / 2`, which gives the same
/// result as `(lower + upper) / 2` but cannot overflow `u64`.
pub fn median(mut values: Vec<u64>) -> u64 {
    if values.is_empty() {
        return 0u64;
    }
    values.sort();
    let mid = values.len() / 2;
    if values.len() % 2 == 0 {
        let lower_value = values[mid - 1];
        let upper_value = values[mid];
        // `upper_value >= lower_value` because the vector is sorted.
        lower_value + (upper_value - lower_value) / 2
    } else {
        values[mid]
    }
}