use kademlia_routing_table::RoutingTable;
use lru_time_cache::LruCache;
use routing::{Authority, Data, DataIdentifier, Event, MessageId, Node, Request, Response, XorName};
use maidsafe_utilities::serialisation::{deserialise, serialise};
use std::collections::HashMap;
use std::mem;
use std::sync::mpsc;
use std::time::Duration;
/// Example node: a routing `Node` plus the in-memory state kept by this example.
///
/// Events produced by the routing layer arrive on `receiver` and are processed in `run`.
pub struct ExampleNode {
/// Routing node handle; used to send requests/responses and to query this node's name.
node: Node,
/// Receiving end of the event channel that `run` blocks on.
receiver: mpsc::Receiver<Event>,
/// Sending end of the same channel; a clone is passed to every `Node` that gets created.
sender: mpsc::Sender<Event>,
/// Stored data, keyed by the data's name.
db: HashMap<XorName, Data>,
/// One `u64` per client name. In this file it is only written by `handle_refresh` and read by
/// `send_refresh`. NOTE(review): the meaning of the `u64` is not visible here — confirm.
client_accounts: HashMap<XorName, u64>,
/// Puts that were forwarded onwards while acting as `ClientManager`, keyed by message id.
/// The value is the `(src, dst)` pair used to send the put success back once the matching
/// `PutSuccess` response arrives. Entries expire after the duration configured in `new`.
put_request_cache: LruCache<MessageId, (Authority, Authority)>,
}
impl ExampleNode {
/// Creates an `ExampleNode` with empty stores and a freshly built routing `Node`.
///
/// `first` is forwarded unchanged to the node builder. The node is handed a clone of the
/// event sender, so its events end up on the receiver that `run` reads from.
///
/// Node creation goes through `unwrap_result!`, so a failure there is not returned to the
/// caller.
pub fn new(first: bool) -> ExampleNode {
    let (event_tx, event_rx) = mpsc::channel::<Event>();
    let routing_node = unwrap_result!(Node::builder().first(first).create(event_tx.clone()));

    // Forwarded put requests are remembered for ten minutes.
    let cache_lifetime = Duration::from_secs(60 * 10);

    ExampleNode {
        node: routing_node,
        receiver: event_rx,
        sender: event_tx,
        db: HashMap::new(),
        client_accounts: HashMap::new(),
        put_request_cache: LruCache::with_expiry_duration(cache_lifetime),
    }
}
/// Event loop: handles events from the channel until `Terminate` is received or the
/// receiver returns an error.
///
/// * `Request` / `Response` are passed to `handle_request` / `handle_response`.
/// * `NodeAdded` / `NodeLost` are passed to `handle_node_added` / `handle_node_lost`.
/// * `RestartRequired` replaces `self.node` with a newly built node.
/// * Every other event is only logged.
pub fn run(&mut self) {
    loop {
        let event = match self.receiver.recv() {
            Ok(event) => event,
            Err(_) => break,
        };

        match event {
            Event::Terminate => {
                info!("{} Received Terminate event", self.get_debug_name());
                break;
            }
            Event::RestartRequired => {
                info!("{} Received RestartRequired event", self.get_debug_name());
                // The replacement node reports on the same channel as the old one.
                let replacement = unwrap_result!(Node::builder().create(self.sender.clone()));
                let _ = mem::replace(&mut self.node, replacement);
            }
            Event::Connected => {
                trace!("{} Received connected event", self.get_debug_name());
            }
            Event::NodeAdded(name, routing_table) => {
                trace!("{} Received NodeAdded event {:?}",
                       self.get_debug_name(),
                       name);
                self.handle_node_added(name, routing_table);
            }
            Event::NodeLost(name, routing_table) => {
                trace!("{} Received NodeLost event {:?}",
                       self.get_debug_name(),
                       name);
                self.handle_node_lost(name, routing_table);
            }
            Event::Request { request, src, dst } => {
                self.handle_request(request, src, dst);
            }
            Event::Response { response, src, dst } => {
                self.handle_response(response, src, dst);
            }
            other => {
                trace!("{} Received {:?} event", self.get_debug_name(), other);
            }
        }
    }
}
/// Dispatches an incoming request by kind.
///
/// `Get`, `Put` and `Refresh` are passed on to their handlers; `Post`, `Delete` and
/// `GetAccountInfo` are not implemented and only produce a warning.
fn handle_request(&mut self, request: Request, src: Authority, dst: Authority) {
    match request {
        // Implemented request kinds.
        Request::Get(data_id, id) => self.handle_get_request(data_id, id, src, dst),
        Request::Put(data, id) => self.handle_put_request(data, id, src, dst),
        Request::Refresh(content, id) => self.handle_refresh(content, id),

        // Request kinds this example does not support.
        Request::Post(..) => {
            warn!("{:?} ExampleNode: Post unimplemented.", self.get_debug_name());
        }
        Request::Delete(..) => {
            warn!("{:?} ExampleNode: Delete unimplemented.", self.get_debug_name());
        }
        Request::GetAccountInfo(..) => {
            warn!("{:?} ExampleNode: GetAccountInfo unimplemented.", self.get_debug_name());
        }
    }
}
/// Handles a response delivered to this node.
///
/// The only expected combination is a `PutSuccess` addressed to a `ClientManager`
/// authority. If the message id is still in `put_request_cache`, the cached authority pair
/// is used to send the put success onwards; if the entry is gone, nothing is sent.
///
/// Any other response/destination combination hits `unreachable!()`.
fn handle_response(&mut self, response: Response, _src: Authority, dst: Authority) {
    match (response, dst) {
        (Response::PutSuccess(data_id, id), Authority::ClientManager(_)) => {
            let cached = self.put_request_cache.remove(&id);
            if let Some((reply_src, reply_dst)) = cached {
                unwrap_result!(self.node.send_put_success(reply_src, reply_dst, data_id, id));
            }
        }
        _ => unreachable!(),
    }
}
/// Answers a `Get` request sent by a client to an `NaeManager` authority.
///
/// If `db` holds an entry under the requested name, a copy of it is sent back as a get
/// success; otherwise a get failure carrying the text "Data not found" is sent. In both
/// cases the reply goes from the request's `dst` to its `src`.
///
/// Any other source/destination authority combination hits `unreachable!`.
fn handle_get_request(&mut self,
                      data_id: DataIdentifier,
                      id: MessageId,
                      src: Authority,
                      dst: Authority) {
    // Only client -> NaeManager gets are expected here.
    let (client, manager) = match (src, dst) {
        (client @ Authority::Client { .. }, manager @ Authority::NaeManager(_)) => {
            (client, manager)
        }
        (src, dst) => unreachable!("Wrong Src and Dest Authority {:?} - {:?}", src, dst),
    };

    let stored = self.db.get(data_id.name()).cloned();
    match stored {
        Some(data) => {
            unwrap_result!(self.node.send_get_success(manager, client, data, id));
        }
        None => {
            trace!("{:?} GetDataRequest failed for {:?}.",
                   self.get_debug_name(),
                   data_id.name());
            let text = "Data not found".to_owned().into_bytes();
            unwrap_result!(self.node.send_get_failure(manager, client, data_id, text, id));
        }
    }
}
/// Handles a `Put` request according to the authority it was addressed to.
///
/// * `NaeManager`: a put success is sent back to `src` and the data is stored in `db`
///   under its name. A failure to send the success is logged as a warning rather than
///   silently discarded; the data is stored either way.
/// * `ClientManager`: the request is forwarded to `NaeManager(<data name>)` with this
///   `ClientManager` authority as the source, and `(dst, src)` is remembered in
///   `put_request_cache` under the message id so that `handle_response` can reply once
///   the matching `PutSuccess` comes back.
///
/// Any other destination authority hits `unreachable!`.
fn handle_put_request(&mut self, data: Data, id: MessageId, src: Authority, dst: Authority) {
    match dst {
        Authority::NaeManager(_) => {
            trace!("{:?} Storing : key {:?}, value {:?}",
                   self.get_debug_name(),
                   data.name(),
                   data);
            // NOTE(review): the identifier is always built as `Plain`, whatever `data`
            // actually is — confirm this is intended.
            let data_id = DataIdentifier::Plain(*data.name());
            let sent = self.node.send_put_success(dst, src, data_id, id);
            if let Err(err) = sent {
                // Best effort: report the failure but still keep the data.
                warn!("{:?} Failed to send PutSuccess for {:?}: {:?}",
                      self.get_debug_name(),
                      data.name(),
                      err);
            }
            let _ = self.db.insert(*data.name(), data);
        }
        Authority::ClientManager(_) => {
            trace!("{:?} Put Request: Updating ClientManager: key {:?}, value {:?}",
                   self.get_debug_name(),
                   data.name(),
                   data);
            {
                // Forward the put: this ClientManager becomes the source and the
                // NaeManager for the data's name becomes the destination.
                let src = dst.clone();
                let dst = Authority::NaeManager(*data.name());
                unwrap_result!(self.node.send_put_request(src, dst, data, id));
            }
            // Stored as (reply source, reply destination) for `handle_response`.
            if self.put_request_cache.insert(id, (dst, src)).is_some() {
                warn!("Overwrote message {:?} in put_request_cache.", id);
            }
        }
        _ => unreachable!("ExampleNode: Unexpected dst ({:?})", dst),
    }
}
/// Reacts to a node joining by sending refresh requests for everything held locally.
///
/// The refresh message id is derived from the added node's name; the routing table
/// argument is not used.
fn handle_node_added(&mut self, name: XorName, _routing_table: RoutingTable<XorName>) {
    let refresh_id = MessageId::from_added_node(name);
    self.send_refresh(refresh_id);
}
/// Reacts to a node leaving by sending refresh requests for everything held locally.
///
/// The refresh message id is derived from the lost node's name; the routing table
/// argument is not used.
fn handle_node_lost(&mut self, name: XorName, _routing_table: RoutingTable<XorName>) {
    let refresh_id = MessageId::from_lost_node(name);
    self.send_refresh(refresh_id);
}
fn send_refresh(&mut self, id: MessageId) {
for (client_name, stored) in &self.client_accounts {
let refresh_content = RefreshContent::Client {
client_name: *client_name,
data: *stored,
};
let content = unwrap_result!(serialise(&refresh_content));
unwrap_result!(self.node
.send_refresh_request(Authority::ClientManager(*client_name),
Authority::ClientManager(*client_name),
content,
id));
}
for (data_name, data) in &self.db {
let refresh_content = RefreshContent::NaeManager {
data_name: *data_name,
data: data.clone(),
};
let content = unwrap_result!(serialise(&refresh_content));
unwrap_result!(self.node
.send_refresh_request(Authority::NaeManager(*data_name),
Authority::NaeManager(*data_name),
content,
id));
}
}
fn handle_refresh(&mut self, content: Vec<u8>, _id: MessageId) {
match unwrap_result!(deserialise(&content)) {
RefreshContent::Client { client_name, data } => {
trace!("{:?} handle_refresh for ClientManager. client - {:?}",
self.get_debug_name(),
client_name);
let _ = self.client_accounts.insert(client_name, data);
}
RefreshContent::NaeManager { data_name, data } => {
trace!("{:?} handle_refresh for NaeManager. data - {:?}",
self.get_debug_name(),
data_name);
let _ = self.db.insert(data_name, data);
}
}
}
/// Returns the label used in log messages: `Node(<name>)`, with the name formatted via
/// `Debug`.
///
/// # Panics
///
/// If the routing node cannot report its name, the error is logged and then the function
/// panics.
fn get_debug_name(&self) -> String {
    let name = match self.node.name() {
        Err(err) => {
            error!("Could not get node name - {:?}", err);
            panic!("Could not get node name - {:?}", err);
        }
        Ok(name) => name,
    };
    format!("Node({:?})", name)
}
}
/// Payload carried by refresh requests: serialised in `send_refresh` and decoded again in
/// `handle_refresh`.
#[derive(RustcEncodable, RustcDecodable)]
enum RefreshContent {
/// One entry of `client_accounts`.
Client {
/// Name of the client the entry belongs to.
client_name: XorName,
/// The `u64` stored for that client.
data: u64,
},
/// One entry of `db`.
NaeManager {
/// Name under which the data is stored.
data_name: XorName,
/// The stored data itself.
data: Data,
},
}