use super::MIN_SECTION_SIZE;
use lru_time_cache::LruCache;
use maidsafe_utilities::serialisation::{deserialise, serialise};
use routing::{Authority, Data, DataIdentifier, Event, EventStream, MessageId, Node, Prefix, Request,
Response, XorName};
use std::collections::HashMap;
use std::time::Duration;
/// An example routing node that keeps client accounts and data purely in memory.
///
/// It answers get/put requests, forwards puts from the `ClientManager` authority to the
/// `NaeManager` authority, and re-sends its stored state as refresh messages on churn.
pub struct ExampleNode {
/// The routing node whose events drive `run` and through which all messages are sent.
node: Node,
/// Data held by this node, keyed by identifier; filled by puts handled as `NaeManager`
/// and by incoming refresh messages.
db: HashMap<DataIdentifier, Data>,
/// Per-client `u64` value keyed by client name; only written from incoming refresh
/// messages in this file. NOTE(review): the meaning of the `u64` is not visible here.
client_accounts: HashMap<XorName, u64>,
/// Put requests forwarded while acting as `ClientManager`, keyed by message id. The value
/// is the `(dst, src)` pair of the original request, used as `(src, dst)` when the matching
/// `PutSuccess` is relayed back. Entries expire after the duration set in `new`.
put_request_cache: LruCache<MessageId, (Authority<XorName>, Authority<XorName>)>,
}
impl ExampleNode {
pub fn new(first: bool) -> ExampleNode {
let node = unwrap!(Node::builder().first(first).create(MIN_SECTION_SIZE));
ExampleNode {
node: node,
db: HashMap::new(),
client_accounts: HashMap::new(),
put_request_cache: LruCache::with_expiry_duration(Duration::from_secs(60 * 10)),
}
}
/// Runs the event loop, handling events raised by the routing node.
///
/// The loop ends when `next_ev` returns an error or when an `Event::Terminate` is
/// received. Requests and responses are dispatched to their handlers; churn events
/// (`NodeAdded`, `NodeLost`, `SectionMerge`) cause the stored accounts and data to be
/// re-sent via `send_refresh`, and `SectionSplit` prunes entries outside the new prefix.
///
/// # Panics
///
/// Panics (through `unwrap!`) if the node cannot be re-created after `RestartRequired`,
/// if the node name is unavailable during a merge, or if a handler panics.
pub fn run(&mut self) {
    while let Ok(event) = self.node.next_ev() {
        match event {
            Event::Request { request, src, dst } => self.handle_request(request, src, dst),
            Event::Response { response, src, dst } => self.handle_response(response, src, dst),
            Event::NodeAdded(name, _routing_table) => {
                trace!("{} Received NodeAdded event {:?}",
                       self.get_debug_name(),
                       name);
                self.handle_node_added(name);
            }
            Event::NodeLost(name, _routing_table) => {
                trace!("{} Received NodeLost event {:?}",
                       self.get_debug_name(),
                       name);
                // A lost node is churn just like an added node or a merge: re-send
                // everything we hold, using a message id derived from the lost node.
                self.send_refresh(MessageId::from_lost_node(name));
            }
            Event::Connected => {
                trace!("{} Received connected event", self.get_debug_name());
            }
            Event::Terminate => {
                info!("{} Received Terminate event", self.get_debug_name());
                break;
            }
            Event::RestartRequired => {
                info!("{} Received RestartRequired event", self.get_debug_name());
                // Replace the node with a freshly built one (builder defaults, i.e. the
                // `first` setting from `new` is not re-applied).
                self.node = unwrap!(Node::builder().create(MIN_SECTION_SIZE));
            }
            Event::SectionSplit(prefix) => {
                trace!("{} Received SectionSplit event {:?}",
                       self.get_debug_name(),
                       prefix);
                self.handle_split(prefix);
            }
            Event::SectionMerge(prefix) => {
                trace!("{} Received SectionMerge event {:?}",
                       self.get_debug_name(),
                       prefix);
                // Build a prefix one bit longer than the merged one around our own name
                // and use its lower bound as the seed for the refresh message id.
                let pfx = Prefix::new(prefix.bit_count() + 1, unwrap!(self.node.name()));
                self.send_refresh(MessageId::from_lost_node(pfx.lower_bound()));
            }
            event => {
                trace!("{} Received {:?} event", self.get_debug_name(), event);
            }
        }
    }
}
/// Dispatches an incoming request to the matching handler.
///
/// `Get`, `Put` and `Refresh` are serviced; `Post`, `Delete`, `GetAccountInfo` and
/// `Append` are only reported with a warning.
fn handle_request(&mut self,
                  request: Request,
                  src: Authority<XorName>,
                  dst: Authority<XorName>) {
    match request {
        // Requests this example node actually services.
        Request::Get(data_id, msg_id) => self.handle_get_request(data_id, msg_id, src, dst),
        Request::Put(data, msg_id) => self.handle_put_request(data, msg_id, src, dst),
        Request::Refresh(payload, msg_id) => self.handle_refresh(payload, msg_id),

        // Everything below is acknowledged in the log and otherwise ignored.
        Request::Post(..) => {
            warn!("{:?} ExampleNode: Post unimplemented.",
                  self.get_debug_name());
        }
        Request::Delete(..) => {
            warn!("{:?} ExampleNode: Delete unimplemented.",
                  self.get_debug_name());
        }
        Request::GetAccountInfo(..) => {
            warn!("{:?} ExampleNode: GetAccountInfo unimplemented.",
                  self.get_debug_name());
        }
        Request::Append(..) => {
            warn!("{:?} ExampleNode: Append unimplemented.",
                  self.get_debug_name());
        }
    }
}
/// Relays a `PutSuccess` received as `ClientManager` back to the original requester.
///
/// The authorities to reply with are looked up (and removed) in `put_request_cache` by
/// message id; if no entry is found nothing is sent.
///
/// # Panics
///
/// Panics on any response other than `PutSuccess` addressed to a `ClientManager`
/// authority, and (through `unwrap!`) if sending the reply fails.
fn handle_response(&mut self,
                   response: Response,
                   _src: Authority<XorName>,
                   dst: Authority<XorName>) {
    // Only one response/authority combination is expected here.
    let (data_id, msg_id) = match (response, dst) {
        (Response::PutSuccess(data_id, msg_id), Authority::ClientManager(_)) => {
            (data_id, msg_id)
        }
        _ => unreachable!(),
    };

    if let Some((reply_src, reply_dst)) = self.put_request_cache.remove(&msg_id) {
        unwrap!(self.node.send_put_success(reply_src, reply_dst, data_id, msg_id));
    }
}
/// Answers a get request from a client addressed to a `NaeManager` authority.
///
/// Replies with a clone of the stored data when `data_id` is present in `db`, otherwise
/// with a get failure carrying the text "Data not found".
///
/// # Panics
///
/// Panics if `src`/`dst` are not a `Client`/`NaeManager` pair, and (through `unwrap!`)
/// if sending the reply fails.
fn handle_get_request(&mut self,
                      data_id: DataIdentifier,
                      id: MessageId,
                      src: Authority<XorName>,
                      dst: Authority<XorName>) {
    // Validate the authority pair up front; the reply goes from the manager to the client.
    let (client, manager) = match (src, dst) {
        (client @ Authority::Client { .. }, manager @ Authority::NaeManager(_)) => {
            (client, manager)
        }
        (src, dst) => unreachable!("Wrong Src and Dest Authority {:?} - {:?}", src, dst),
    };

    match self.db.get(&data_id) {
        Some(data) => unwrap!(self.node.send_get_success(manager, client, data.clone(), id)),
        None => {
            trace!("{:?} GetDataRequest failed for {:?}.",
                   self.get_debug_name(),
                   data_id.name());
            let text = "Data not found".to_owned().into_bytes();
            unwrap!(self.node.send_get_failure(manager, client, data_id, text, id));
        }
    }
}
/// Handles a put request according to the authority it was addressed to.
///
/// * `NaeManager`: acknowledges the put to `src` (a send error is ignored) and stores
///   the data in `db`.
/// * `ClientManager`: forwards the put to the `NaeManager` of the data's name and
///   remembers `(dst, src)` under the message id so the eventual `PutSuccess` can be
///   relayed back by `handle_response`.
///
/// # Panics
///
/// Panics for any other destination authority, and (through `unwrap!`) if forwarding
/// the put request fails.
fn handle_put_request(&mut self,
                      data: Data,
                      id: MessageId,
                      src: Authority<XorName>,
                      dst: Authority<XorName>) {
    match dst {
        Authority::NaeManager(_) => {
            trace!("{:?} Storing : key {:?}, value {:?}",
                   self.get_debug_name(),
                   data.name(),
                   data);
            let data_id = data.identifier();
            // Best effort: a failed acknowledgement does not prevent storing the data.
            let _ = self.node.send_put_success(dst, src, data_id, id);
            let _ = self.db.insert(data_id, data);
        }
        Authority::ClientManager(_) => {
            trace!("{:?} Put Request: Updating ClientManager: key {:?}, value {:?}",
                   self.get_debug_name(),
                   data.name(),
                   data);
            // We (the client manager) become the source of the forwarded request.
            let nae_manager = Authority::NaeManager(*data.name());
            unwrap!(self.node.send_put_request(dst, nae_manager, data, id));

            let previous = self.put_request_cache.insert(id, (dst, src));
            if previous.is_some() {
                warn!("Overwrote message {:?} in put_request_cache.", id);
            }
        }
        _ => unreachable!("ExampleNode: Unexpected dst ({:?})", dst),
    }
}
/// Re-sends all stored accounts and data after a node has been added, using a message
/// id derived from the new node's name.
fn handle_node_added(&mut self, name: XorName) {
    let refresh_id = MessageId::from_added_node(name);
    self.send_refresh(refresh_id);
}
/// Drops every client account and data item whose name is not matched by `prefix`.
///
/// Keys to delete are collected first because the maps cannot be modified while they
/// are being iterated.
fn handle_split(&mut self, prefix: Prefix<XorName>) {
    // Client accounts first.
    let mut stale_clients = Vec::new();
    for client_name in self.client_accounts.keys() {
        if !prefix.matches(client_name) {
            stale_clients.push(*client_name);
        }
    }
    for client_name in &stale_clients {
        let _ = self.client_accounts.remove(client_name);
    }

    // Then the stored data.
    let mut stale_data = Vec::new();
    for data_id in self.db.keys() {
        if !prefix.matches(data_id.name()) {
            stale_data.push(*data_id);
        }
    }
    for data_id in &stale_data {
        let _ = self.db.remove(data_id);
    }
}
/// Sends one refresh request per stored client account and per stored data item.
///
/// Each request carries a serialised `RefreshContent`, uses the same authority as source
/// and destination (`ClientManager` of the client name, or `NaeManager` of the data
/// name), and shares the message id `id`.
///
/// # Panics
///
/// Panics (through `unwrap!`) if serialising a payload or sending a request fails.
fn send_refresh(&mut self, id: MessageId) {
    for (&client_name, &stored) in self.client_accounts.iter() {
        let payload = RefreshContent::Client {
            client_name: client_name,
            data: stored,
        };
        let bytes = unwrap!(serialise(&payload));
        let authority = Authority::ClientManager(client_name);
        unwrap!(self.node.send_refresh_request(authority, authority, bytes, id));
    }

    for (&data_id, data) in self.db.iter() {
        let payload = RefreshContent::NaeManager {
            data_id: data_id,
            data: data.clone(),
        };
        let bytes = unwrap!(serialise(&payload));
        let authority = Authority::NaeManager(*data.name());
        unwrap!(self.node.send_refresh_request(authority, authority, bytes, id));
    }
}
fn handle_refresh(&mut self, content: Vec<u8>, _id: MessageId) {
match unwrap!(deserialise(&content)) {
RefreshContent::Client { client_name, data } => {
trace!("{:?} handle_refresh for ClientManager. client - {:?}",
self.get_debug_name(),
client_name);
let _ = self.client_accounts.insert(client_name, data);
}
RefreshContent::NaeManager { data_id, data } => {
trace!("{:?} handle_refresh for NaeManager. data - {:?}",
self.get_debug_name(),
data_id);
let _ = self.db.insert(data_id, data);
}
}
}
/// Returns a label for log messages: `Node(<name>)`, or `Node(unknown)` (after logging
/// an error) when the node's name cannot be obtained.
fn get_debug_name(&self) -> String {
    let name = match self.node.name() {
        Ok(name) => name,
        Err(err) => {
            error!("Could not get node name - {:?}", err);
            return "Node(unknown)".to_owned();
        }
    };
    format!("Node({:?})", name)
}
}
/// Payload of a refresh request: serialised in `send_refresh` and deserialised in
/// `handle_refresh`.
#[derive(RustcEncodable, RustcDecodable)]
// The two variants carry differently sized payloads; the clippy lint about that is silenced.
#[cfg_attr(feature="cargo-clippy", allow(large_enum_variant))]
enum RefreshContent {
/// One entry of `client_accounts`: the client's name and its stored `u64` value.
Client { client_name: XorName, data: u64 },
/// One entry of `db`: the data identifier and the data itself.
NaeManager { data_id: DataIdentifier, data: Data },
}