use super::vault::{self, Vault};
use crate::config_handler::{get_config, Config};
use crate::{
client::SafeKey,
network_event::{NetworkEvent, NetworkTx},
CoreError,
};
use futures::lock::Mutex;
use lazy_static::lazy_static;
use log::trace;
use quic_p2p::{self, Config as QuicP2pConfig};
use safe_nd::{Coins, Message, PublicId, PublicKey, Request, RequestType, Response};
use std::collections::HashSet;
use std::env;
use std::sync::Arc;
use xor_name::XorName;
lazy_static! {
static ref VAULT: Arc<Mutex<Vault>> = Arc::new(Mutex::new(Vault::new(get_config())));
}
pub type RequestHookFn = dyn FnMut(&Request) -> Option<Response> + Send + Sync + 'static;
pub type ResponseHookFn = dyn FnMut(Response) -> Response + Send + Sync + 'static;
#[allow(unused)]
#[derive(Clone)]
pub struct ConnectionManager {
vault: Arc<Mutex<Vault>>,
request_hook: Option<Arc<RequestHookFn>>,
response_hook: Option<Arc<ResponseHookFn>>,
groups: Arc<Mutex<HashSet<PublicId>>>,
net_tx: NetworkTx,
}
impl ConnectionManager {
pub fn new(_config: QuicP2pConfig, net_tx: &NetworkTx) -> Result<Self, CoreError> {
Ok(Self {
vault: clone_vault(),
request_hook: None,
response_hook: None,
groups: Arc::new(Mutex::new(HashSet::default())),
net_tx: net_tx.clone(),
})
}
pub fn new_with_vault(vault_config: Config, net_tx: &NetworkTx) -> Result<Self, CoreError> {
Ok(Self {
vault: Arc::new(Mutex::new(Vault::new(vault_config))),
request_hook: None,
response_hook: None,
groups: Arc::new(Mutex::new(HashSet::default())),
net_tx: net_tx.clone(),
})
}
pub async fn has_connection_to(&self, pub_id: &PublicId) -> bool {
self.groups.lock().await.contains(&pub_id)
}
pub async fn send(&mut self, pub_id: &PublicId, msg: &Message) -> Result<Response, CoreError> {
#[cfg(any(feature = "testing", test))]
{
if let Some(resp) = self.intercept_request(msg.clone()) {
return Ok(resp);
}
}
let msg: Message = {
let writing = match msg {
Message::Request { request, .. } => {
let req_type = request.get_type();
req_type == RequestType::Mutation || req_type == RequestType::Transaction
}
_ => false,
};
let mut vault = vault::lock(&self.vault, writing).await;
vault.process_request(pub_id.clone(), &msg)?
};
if let Message::Response { response, .. } = msg {
Ok(response)
} else {
Err(CoreError::Unexpected(
"Logic error: Vault error returned invalid response".to_string(),
))
}
}
pub async fn bootstrap(&mut self, full_id: SafeKey) -> Result<(), CoreError> {
let _ = self.groups.lock().await.insert(full_id.public_id());
Ok(())
}
pub fn restart_network(&mut self) {
}
pub async fn disconnect(&mut self, pub_id: &PublicId) -> Result<(), CoreError> {
let mut groups = self.groups.lock().await;
let _ = groups.remove(pub_id);
if groups.is_empty() {
trace!("Disconnected from the network; sending the notification.");
let _ = self.net_tx.unbounded_send(NetworkEvent::Disconnected);
}
Ok(())
}
pub async fn allocate_test_coins(
&self,
coin_balance_name: &XorName,
amount: Coins,
) -> Result<(), safe_nd::Error> {
let mut vault = vault::lock(&self.vault, true).await;
vault.mock_increment_balance(coin_balance_name, amount)
}
pub async fn create_balance(&self, owner: PublicKey, amount: Coins) {
let mut vault = vault::lock(&self.vault, true).await;
vault.mock_create_balance(owner, amount);
}
pub async fn simulate_disconnect(&self) {
let mut groups = self.groups.lock().await;
trace!("Simulating disconnect. Connected groups: {:?}", groups);
if !groups.is_empty() {
trace!("Disconnecting everyone");
groups.clear();
let _ = self.net_tx.unbounded_send(NetworkEvent::Disconnected);
}
}
pub fn set_simulate_timeout(&mut self, _enable: bool) {
unimplemented!()
}
pub fn set_network_limits(&mut self, _max_ops_count: Option<u64>) {
unimplemented!()
}
}
#[cfg(any(feature = "testing", test))]
impl ConnectionManager {
fn intercept_request(&mut self, message: Message) -> Option<Response> {
if let Message::Request { request, .. } = message {
if let Some(hook) = Arc::get_mut(self.request_hook.as_mut()?) {
return hook(&request);
}
}
None
}
pub fn set_request_hook<F>(&mut self, hook: F)
where
F: FnMut(&Request) -> Option<Response> + Send + Sync + 'static,
{
let hook: Arc<RequestHookFn> = Arc::new(hook);
self.request_hook = Some(hook);
}
pub fn set_response_hook<F>(&mut self, hook: F)
where
F: FnMut(Response) -> Response + 'static + Send + Sync,
{
let hook: Arc<ResponseHookFn> = Arc::new(hook);
self.response_hook = Some(hook);
}
pub fn remove_request_hook(&mut self) {
self.request_hook = None;
}
}
pub fn clone_vault() -> Arc<Mutex<Vault>> {
VAULT.clone()
}
pub fn unlimited_coins(config: &Config) -> bool {
match env::var("SAFE_MOCK_UNLIMITED_COINS") {
Ok(_) => true,
Err(_) => match config.dev {
Some(ref dev) => dev.mock_unlimited_coins,
None => false,
},
}
}