use cache::Cache;
use config_handler::{self, Config};
use error::InternalError;
use kademlia_routing_table::RoutingTable;
use personas::data_manager::DataManager;
#[cfg(feature = "use-mock-crust")]
use personas::data_manager::IdAndVersion;
use personas::maid_manager::MaidManager;
use routing::{Authority, Data, NodeBuilder, Request, Response, XorName};
use rust_sodium;
use std::env;
use std::path::Path;
use std::rc::Rc;
use std::sync::mpsc::{self, Receiver};
/// Name of the directory that is appended to the chunk store root path when a vault is built.
pub const CHUNK_STORE_DIR: &'static str = "safe_vault_chunk_store";
/// Capacity passed to the data manager when the config does not set `max_capacity`.
/// The value is 2 * 1024^3 (presumably a byte count, i.e. 2 GiB; confirm against `DataManager::new`).
const DEFAULT_MAX_CAPACITY: u64 = 2 * 1024 * 1024 * 1024;
// Re-exports of the routing event type and of the routing node type (under the name
// `RoutingNode`).
pub use routing::Event;
pub use routing::Node as RoutingNode;
/// Main struct holding the vault's personas together with its routing node and the channel on
/// which routing events arrive.
pub struct Vault {
/// Persona receiving client `Put` and `GetAccountInfo` requests addressed to a client manager,
/// client-manager refreshes, and `PutSuccess`/`PutFailure` responses.
maid_manager: MaidManager,
/// Persona receiving `Get`, `Put`, `Post`, `Delete`, `Append` and refresh requests as well as
/// `GetSuccess`/`GetFailure` responses; its timeouts are checked after every processed event.
data_manager: DataManager,
/// Shared handle to the routing node; clones of it are given to both personas on construction.
/// Within this file it is only accessed directly by the mock-crust helper methods.
_routing_node: Rc<RoutingNode>,
/// Receiving end of the channel whose sender is handed to the routing node when it is created.
routing_receiver: Receiver<Event>,
}
impl Vault {
pub fn new(first_vault: bool, use_cache: bool) -> Result<Self, InternalError> {
let config = match config_handler::read_config_file() {
Ok(cfg) => cfg,
Err(InternalError::FileHandler(e)) => {
error!("Config file could not be parsed : {:?}", e);
return Err(From::from(e));
}
Err(e) => return Err(From::from(e)),
};
let builder = RoutingNode::builder().first(first_vault).deny_other_local_nodes();
match Self::vault_with_config(builder, use_cache, config.clone()) {
Ok(vault) => Ok(vault),
Err(InternalError::ChunkStore(e)) => {
error!("Incorrect path {:?} for chunk_store_root : {:?}",
config.chunk_store_root,
e);
Err(From::from(e))
}
Err(e) => Err(From::from(e)),
}
}
/// Builds a vault from an explicitly supplied `config` instead of reading the config file.
///
/// Only compiled with the `use-mock-crust` feature. Unlike `new`, the routing node builder is
/// not told to deny other local nodes.
#[cfg(feature = "use-mock-crust")]
pub fn new_with_config(first_vault: bool,
                       use_cache: bool,
                       config: Config)
                       -> Result<Self, InternalError> {
    let builder = RoutingNode::builder().first(first_vault);
    Self::vault_with_config(builder, use_cache, config)
}
fn vault_with_config(builder: NodeBuilder,
use_cache: bool,
config: Config)
-> Result<Self, InternalError> {
rust_sodium::init();
let mut chunk_store_root = match config.chunk_store_root {
Some(path_str) => Path::new(&path_str).to_path_buf(),
None => env::temp_dir(),
};
chunk_store_root.push(CHUNK_STORE_DIR);
let (routing_sender, routing_receiver) = mpsc::channel();
let routing_node = Rc::new(try!(if use_cache {
builder.cache(Box::new(Cache::new())).create(routing_sender)
} else {
builder.create(routing_sender)
}));
Ok(Vault {
maid_manager: MaidManager::new(routing_node.clone()),
data_manager: try!(DataManager::new(routing_node.clone(),
chunk_store_root,
config.max_capacity
.unwrap_or(DEFAULT_MAX_CAPACITY))),
_routing_node: routing_node.clone(),
routing_receiver: routing_receiver,
})
}
/// Runs the event loop, blocking on the routing event channel and processing each event.
///
/// Returns `Ok(false)` once a `RestartRequired` event has been processed and `Ok(true)` once
/// a `Terminate` event has been processed or the channel can no longer deliver events.
/// This function never returns `Err` itself.
pub fn run(&mut self) -> Result<bool, InternalError> {
    loop {
        let event = match self.routing_receiver.recv() {
            Ok(event) => event,
            // `recv` only fails when the sending side has gone away.
            Err(_) => return Ok(true),
        };
        if let Some(terminate) = self.process_event(event) {
            return Ok(terminate);
        }
    }
}
/// Non-blocking counterpart of `run` for mock-crust tests.
///
/// Polls the routing node once, then processes every event currently queued on the channel.
/// Returns `true` if the routing node's `poll` returned `true` or at least one event was
/// processed. The restart/terminate verdict of each event is deliberately ignored.
#[cfg(feature = "use-mock-crust")]
pub fn poll(&mut self) -> bool {
    let mut progressed = self._routing_node.poll();
    loop {
        match self.routing_receiver.try_recv() {
            Ok(event) => {
                let _unused_in_mock = self.process_event(event);
                progressed = true;
            }
            Err(_) => break,
        }
    }
    progressed
}
/// Returns whatever the data manager's `get_stored_names` reports (mock-crust builds only).
#[cfg(feature = "use-mock-crust")]
pub fn get_stored_names(&self) -> Vec<IdAndVersion> {
    let data_manager = &self.data_manager;
    data_manager.get_stored_names()
}
/// Returns the maid manager's `get_put_count` result for `client_name` (mock-crust builds
/// only).
#[cfg(feature = "use-mock-crust")]
pub fn get_maid_manager_put_count(&self, client_name: &XorName) -> Option<u64> {
    let maid_manager = &self.maid_manager;
    maid_manager.get_put_count(client_name)
}
/// Forwards to the routing node's `resend_unacknowledged` and returns its result (mock-crust
/// builds only).
#[cfg(feature = "use-mock-crust")]
pub fn resend_unacknowledged(&self) -> bool {
    let node = &self._routing_node;
    node.resend_unacknowledged()
}
/// Forwards to the routing node's `clear_state` (mock-crust builds only).
#[cfg(feature = "use-mock-crust")]
pub fn clear_state(&self) {
    let node = &self._routing_node;
    node.clear_state()
}
/// Returns the name reported by the routing node (mock-crust builds only).
///
/// The routing node's result is unwrapped with `unwrap_result!`, so a failure to obtain the
/// name is not returned to the caller.
#[cfg(feature = "use-mock-crust")]
pub fn name(&self) -> XorName {
    let node = &self._routing_node;
    unwrap_result!(node.name())
}
/// Returns the routing table reported by the routing node (mock-crust builds only).
#[cfg(feature = "use-mock-crust")]
pub fn routing_table(&self) -> RoutingTable<XorName> {
    let node = &self._routing_node;
    node.routing_table()
}
/// Dispatches a single routing event to the matching handler.
///
/// Returns `Some(false)` for `RestartRequired`, `Some(true)` for `Terminate` and `None` for
/// every other event. A handler error is logged at debug level and otherwise ignored. The
/// data manager's timeouts are checked after every event, whatever its outcome.
fn process_event(&mut self, event: Event) -> Option<bool> {
    // Each arm yields the verdict for the caller together with the handler's result.
    let (verdict, handled) = match event {
        Event::Request { request, src, dst } => (None, self.on_request(request, src, dst)),
        Event::Response { response, src, dst } => {
            (None, self.on_response(response, src, dst))
        }
        Event::NodeAdded(node_added, routing_table) => {
            (None, self.on_node_added(node_added, routing_table))
        }
        Event::NodeLost(node_lost, routing_table) => {
            (None, self.on_node_lost(node_lost, routing_table))
        }
        Event::RestartRequired => {
            warn!("Restarting Vault");
            (Some(false), Ok(()))
        }
        Event::Terminate => (Some(true), Ok(())),
        Event::Connected | Event::Tick => (None, Ok(())),
    };

    if let Err(error) = handled {
        debug!("Failed to handle event: {:?}", error);
    }

    self.data_manager.check_timeouts();
    verdict
}
/// Routes an incoming request to the persona responsible for it.
///
/// The handler is chosen from the combination of source authority, destination authority and
/// request kind. The listed combinations are mutually exclusive, so they are grouped by
/// persona; any combination not listed is rejected.
///
/// # Errors
///
/// Returns `InternalError::UnknownRequestType` carrying the request for an unlisted
/// combination, otherwise whatever the selected handler returns.
fn on_request(&mut self,
              request: Request,
              src: Authority,
              dst: Authority)
              -> Result<(), InternalError> {
    match (src, dst, request) {
        // ---- Requests handled by the maid manager ----
        (src @ Authority::Client { .. },
         dst @ Authority::ClientManager(_),
         Request::Put(payload, message_id)) => {
            self.maid_manager.handle_put(src, dst, payload, message_id)
        }
        (src @ Authority::Client { .. },
         dst @ Authority::ClientManager(_),
         Request::GetAccountInfo(message_id)) => {
            self.maid_manager.handle_get_account_info(src, dst, message_id)
        }
        (Authority::ClientManager(_),
         Authority::ClientManager(_),
         Request::Refresh(serialised, _)) => self.maid_manager.handle_refresh(&serialised),

        // ---- Requests handled by the data manager ----
        (src @ Authority::Client { .. },
         dst @ Authority::NaeManager(_),
         Request::Get(id, message_id)) |
        (src @ Authority::ManagedNode(_),
         dst @ Authority::ManagedNode(_),
         Request::Get(id, message_id)) => {
            self.data_manager.handle_get(src, dst, id, message_id)
        }
        (src @ Authority::ClientManager(_),
         dst @ Authority::NaeManager(_),
         Request::Put(payload, message_id)) => {
            self.data_manager.handle_put(src, dst, payload, message_id)
        }
        (src @ Authority::Client { .. },
         dst @ Authority::NaeManager(_),
         Request::Post(payload, message_id)) => {
            self.data_manager.handle_post(src, dst, payload, message_id)
        }
        // Only structured data is accepted for deletion; other data kinds fall through to
        // the catch-all arm.
        (src @ Authority::Client { .. },
         dst @ Authority::NaeManager(_),
         Request::Delete(Data::Structured(structured), message_id)) => {
            self.data_manager.handle_delete(src, dst, structured, message_id)
        }
        (src @ Authority::Client { .. },
         dst @ Authority::NaeManager(_),
         Request::Append(append_wrapper, message_id)) => {
            self.data_manager.handle_append(src, dst, append_wrapper, message_id)
        }
        (Authority::ManagedNode(sender),
         Authority::ManagedNode(_),
         Request::Refresh(serialised, _)) |
        (Authority::ManagedNode(sender),
         Authority::NaeManager(_),
         Request::Refresh(serialised, _)) => {
            self.data_manager.handle_refresh(sender, &serialised)
        }
        (Authority::NaeManager(_),
         Authority::NaeManager(_),
         Request::Refresh(serialised, _)) => {
            self.data_manager.handle_group_refresh(&serialised)
        }

        // ---- Anything else ----
        (_, _, unhandled) => Err(InternalError::UnknownRequestType(unhandled)),
    }
}
/// Routes an incoming response to the persona responsible for it.
///
/// Put outcomes sent from a NAE manager to a client manager go to the maid manager. Get
/// outcomes exchanged between managed nodes go to the data manager. These combinations are
/// mutually exclusive; anything else is rejected.
///
/// # Errors
///
/// Returns `InternalError::UnknownResponseType` carrying the response for an unlisted
/// combination, otherwise whatever the selected handler returns.
fn on_response(&mut self,
               response: Response,
               src: Authority,
               dst: Authority)
               -> Result<(), InternalError> {
    match (src, dst, response) {
        // ---- Responses handled by the maid manager ----
        (Authority::NaeManager(_),
         Authority::ClientManager(_),
         Response::PutSuccess(stored_id, message_id)) => {
            self.maid_manager.handle_put_success(stored_id, message_id)
        }
        (Authority::NaeManager(_),
         Authority::ClientManager(_),
         Response::PutFailure { id: message_id,
                                external_error_indicator: reason,
                                data_id: failed_id }) => {
            self.maid_manager.handle_put_failure(message_id, failed_id, &reason)
        }

        // ---- Responses handled by the data manager ----
        (Authority::ManagedNode(sender),
         Authority::ManagedNode(_),
         Response::GetSuccess(fetched, _)) => {
            self.data_manager.handle_get_success(sender, fetched)
        }
        (Authority::ManagedNode(sender),
         Authority::ManagedNode(_),
         Response::GetFailure { data_id: missing_id, .. }) => {
            self.data_manager.handle_get_failure(sender, missing_id)
        }

        // ---- Anything else ----
        (_, _, unhandled) => Err(InternalError::UnknownResponseType(unhandled)),
    }
}
/// Informs both personas that a node joined: first the maid manager, then the data manager.
/// Each receives the new node's name and the accompanying routing table. Always returns
/// `Ok(())`.
fn on_node_added(&mut self,
                 node_added: XorName,
                 routing_table: RoutingTable<XorName>)
                 -> Result<(), InternalError> {
    let (joined, table) = (&node_added, &routing_table);
    self.maid_manager.handle_node_added(joined, table);
    self.data_manager.handle_node_added(joined, table);
    Ok(())
}
/// Informs both personas that a node left: first the maid manager, which receives only the
/// lost node's name, then the data manager, which also receives the accompanying routing
/// table. Always returns `Ok(())`.
fn on_node_lost(&mut self,
                node_lost: XorName,
                routing_table: RoutingTable<XorName>)
                -> Result<(), InternalError> {
    let departed = &node_lost;
    self.maid_manager.handle_node_lost(departed);
    self.data_manager.handle_node_lost(departed, &routing_table);
    Ok(())
}
}