pub struct Node {
routing: ::routing::Routing,
receiver: ::std::sync::mpsc::Receiver<::event::Event>,
sender: ::std::sync::mpsc::Sender<::event::Event>,
db: ::std::collections::BTreeMap<::NameType, ::data::Data>,
client_accounts: ::std::collections::BTreeMap<::NameType, u64>,
connected: bool,
}
impl Node {
pub fn new() -> Node {
let (sender, receiver) = ::std::sync::mpsc::channel::<::event::Event>();
let routing = ::routing::Routing::new(sender.clone());
Node {
routing: routing,
receiver: receiver,
sender: sender,
db: ::std::collections::BTreeMap::new(),
client_accounts: ::std::collections::BTreeMap::new(),
connected: false,
}
}
pub fn run(&mut self) {
while let Ok(event) = self.receiver.recv() {
debug!("Node: Received event {:?}", event);
match event {
::event::Event::Request{ request, our_authority, from_authority, response_token } =>
self.handle_request(request, our_authority, from_authority, response_token),
::event::Event::Response{ response, our_authority, from_authority } => {
debug!("Received response event");
self.handle_response(response, our_authority, from_authority)
},
::event::Event::Refresh(type_tag, our_authority, vec_of_bytes) => {
debug!("Received refresh event");
if type_tag != 1u64 { error!("Received refresh for tag {:?} from {:?}",
type_tag, our_authority); continue; };
self.handle_refresh(our_authority, vec_of_bytes);
},
::event::Event::DoRefresh(type_tag, our_authority, cause) => {
debug!("Received do refresh event");
if type_tag != 1u64 { error!("Received DoRefresh for tag {:?} from {:?}",
type_tag, our_authority); continue; };
self.handle_do_refresh(our_authority, cause);
},
::event::Event::Churn(close_group, cause) => {
debug!("Received churn event");
self.handle_churn(close_group, cause)
},
::event::Event::Bootstrapped => debug!("Received bootstraped event"),
::event::Event::Connected => {
debug!("Received connected event");
self.connected = true;
},
::event::Event::Disconnected => debug!("Received disconnected event"),
::event::Event::FailedRequest{ request, our_authority, location, interface_error } => {
debug!("Received failed request event");
self.handle_failed_request(request, our_authority, location, interface_error)
},
::event::Event::FailedResponse{ response, our_authority, location, interface_error } => {
debug!("Received failed response event");
self.handle_failed_response(response, our_authority, location, interface_error)
},
::event::Event::Terminated => {
debug!("Received terminate event");
self.stop();
break
},
};
}
}
pub fn get_sender(&self) -> ::std::sync::mpsc::Sender<::event::Event> {
self.sender.clone()
}
pub fn stop(&mut self) {
debug!("Node terminating.");
self.routing.stop();
}
fn handle_request(&mut self, request: ::ExternalRequest,
our_authority: ::authority::Authority,
from_authority: ::authority::Authority,
response_token: Option<::SignedToken>) {
match request {
::ExternalRequest::Get(data_request, _) => {
self.handle_get_request(data_request, our_authority, from_authority, response_token);
},
::ExternalRequest::Put(data) => {
self.handle_put_request(data, our_authority, from_authority, response_token);
},
::ExternalRequest::Post(_) => {
debug!("Node: Post unimplemented.");
},
::ExternalRequest::Delete(_) => {
debug!("Node: Delete unimplemented.");
},
}
}
fn handle_get_request(&mut self, data_request: ::data::DataRequest,
our_authority: ::authority::Authority,
from_authority: ::authority::Authority,
response_token: Option<::SignedToken>) {
let data = match self.db.get(&data_request.name()) {
Some(data) => data.clone(),
None => {
debug!("GetDataRequest failed for {:?}.", data_request.name());
return
}
};
self.routing.get_response(our_authority, from_authority, data, data_request, response_token);
}
fn handle_put_request(&mut self, data: ::data::Data,
our_authority: ::authority::Authority,
_from_authority: ::authority::Authority,
_response_token: Option<::SignedToken>) {
match our_authority {
::authority::Authority::NaeManager(_) => {
debug!("Storing: key {:?}, value {:?}", data.name(), data);
let _ = self.db.insert(data.name(), data);
},
::authority::Authority::ClientManager(_) => {
debug!("Sending: key {:?}, value {:?}", data.name(), data);
self.routing.put_request(
our_authority, ::authority::Authority::NaeManager(data.name()), data);
},
_ => {
debug!("Node: Unexpected our_authority ({:?})", our_authority);
assert!(false);
}
}
}
fn handle_churn(&mut self, our_close_group: Vec<::NameType>, cause: ::NameType) {
let mut exit = false;
if our_close_group.len() < ::types::GROUP_SIZE {
if self.connected {
debug!("Close group ({:?}) has fallen below group size {:?}, terminating node",
our_close_group.len(), ::types::GROUP_SIZE);
exit = true;
} else {
debug!("Ignoring churn as we are not yet connected.");
return;
}
}
debug!("Handle churn for close group size {:?}", our_close_group.len());
for (client_name, stored) in self.client_accounts.iter() {
debug!("REFRESH {:?} - {:?}", client_name, stored);
self.routing.refresh_request(1u64,
::authority::Authority::ClientManager(client_name.clone()),
::utils::encode(&stored).unwrap(), cause.clone());
}
if exit { self.routing.stop(); };
}
fn handle_refresh(&mut self, our_authority: ::authority::Authority, vec_of_bytes: Vec<Vec<u8>>) {
let mut records : Vec<u64> = Vec::new();
let mut fail_parsing_count = 0usize;
for bytes in vec_of_bytes {
match ::utils::decode(&bytes) {
Ok(record) => records.push(record),
Err(_) => fail_parsing_count += 1usize,
};
}
let median = median(records.clone());
debug!("Refresh for {:?}: median {:?} from {:?} (errs {:?})", our_authority, median,
records, fail_parsing_count);
match our_authority {
::authority::Authority::ClientManager(client_name) => {
let _ = self.client_accounts.insert(client_name, median);
},
_ => {},
};
}
fn handle_do_refresh(&self, our_authority: ::authority::Authority, cause: ::NameType) {
match our_authority {
::authority::Authority::ClientManager(client_name) => {
match self.client_accounts.get(&client_name) {
Some(stored) => {
debug!("DoRefresh for client {:?} storing {:?} caused by {:?}",
client_name, stored, cause);
self.routing.refresh_request(1u64,
::authority::Authority::ClientManager(client_name.clone()),
::utils::encode(&stored).unwrap(), cause.clone());
},
None => {},
};
},
_ => {},
};
}
fn handle_response(&mut self, _response: ::ExternalResponse,
_our_authority: ::authority::Authority,
_from_authority: ::authority::Authority,) {
unimplemented!();
}
fn handle_failed_request(&mut self, _request: ::ExternalRequest,
_our_authority: Option<::authority::Authority>,
_location: ::authority::Authority,
_interface_error: ::error::InterfaceError) {
unimplemented!();
}
fn handle_failed_response(&mut self, _response: ::ExternalResponse,
_our_authority: Option<::authority::Authority>,
_location: ::authority::Authority,
_interface_error: ::error::InterfaceError) {
unimplemented!();
}
}
pub fn median(mut values: Vec<u64>) -> u64 {
match values.len() {
0 => 0u64,
1 => values[0],
len if len % 2 == 0 => {
values.sort();
let lower_value = values[(len / 2) - 1];
let upper_value = values[len / 2];
(lower_value + upper_value) / 2
}
len => {
values.sort();
values[len / 2]
}
}
}