use bit_vec::BitVec;
use serde_json::Value;
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
ops::Deref,
sync::{Arc, RwLock},
time::{Duration, SystemTime},
};
use crate::blockchain::{ConsensusConfig, StoredConfiguration, ValidatorKeys};
use crate::crypto::{Hash, PublicKey, SecretKey};
use crate::events::network::ConnectedPeerAddr;
use crate::helpers::{Height, Milliseconds, Round, ValidatorId};
use crate::messages::{
BlockResponse, Connect, Consensus as ConsensusMessage, Precommit, Prevote, Propose,
RawTransaction, Signed,
};
use crate::node::{
connect_list::{ConnectList, PeerAddress},
ConnectInfo,
};
use crate::storage::{KeySetIndex, MapIndex, Patch, Snapshot};
/// Timeout, in milliseconds, used for `RequestData::Propose` requests.
pub const PROPOSE_REQUEST_TIMEOUT: Milliseconds = 100;
/// Timeout, in milliseconds, used for `RequestData::ProposeTransactions` and
/// `RequestData::BlockTransactions` requests.
pub const TRANSACTIONS_REQUEST_TIMEOUT: Milliseconds = 100;
/// Timeout, in milliseconds, used for `RequestData::Prevotes` requests.
pub const PREVOTES_REQUEST_TIMEOUT: Milliseconds = 100;
/// Timeout, in milliseconds, used for `RequestData::Block` requests.
pub const BLOCK_REQUEST_TIMEOUT: Milliseconds = 100;
/// State of the node: keys, configuration, known peers and the consensus
/// data collected for the current height.
#[derive(Debug)]
pub struct State {
/// Validator-specific state; `None` when the node has no validator id.
validator_state: Option<ValidatorState>,
/// The `Connect` message of this node.
our_connect_message: Signed<Connect>,
/// Consensus key pair of this node.
consensus_public_key: PublicKey,
consensus_secret_key: SecretKey,
/// Service key pair of this node.
service_public_key: PublicKey,
service_secret_key: SecretKey,
/// Currently active stored configuration.
config: StoredConfiguration,
/// Connect list shared behind an `Arc<RwLock<..>>`.
connect_list: SharedConnectList,
/// `Connect` messages of known peers, keyed by public key.
peers: HashMap<PublicKey, Signed<Connect>>,
/// Addresses of connected peers, keyed by public key.
connections: HashMap<PublicKey, ConnectedPeerAddr>,
/// Start time recorded for the current height.
height_start_time: SystemTime,
/// Current height.
height: Height,
/// Current round.
round: Round,
/// Round of the current lock; reset to `Round::zero()` on every new height.
locked_round: Round,
/// Hash of the propose the node is locked on, if any.
locked_propose: Option<Hash>,
/// Hash passed to the most recent `new_height` call (or to the constructor).
last_hash: Hash,
/// Known proposes for the current height, keyed by propose hash.
proposes: HashMap<Hash, ProposeState>,
/// Known blocks for the current height, keyed by block hash.
blocks: HashMap<Hash, BlockState>,
/// Prevotes grouped by `(round, propose_hash)`.
prevotes: HashMap<(Round, Hash), Votes<Signed<Prevote>>>,
/// Precommits grouped by `(round, block_hash)`.
precommits: HashMap<(Round, Hash), Votes<Signed<Precommit>>>,
/// Consensus messages queued via `add_queued` and drained by `queued`.
queued: Vec<ConsensusMessage>,
/// Maps an unknown transaction hash to the hashes of proposes that contain it.
unknown_txs: HashMap<Hash, Vec<Hash>>,
/// Maps an unknown propose hash to the `(round, block_hash)` pairs recorded for it.
unknown_proposes_with_precommits: HashMap<Hash, Vec<(Round, Hash)>>,
/// Pending requests and the nodes known to be able to answer them.
requests: HashMap<RequestData, RequestState>,
/// Height recorded for each node via `set_node_height`.
nodes_max_height: BTreeMap<PublicKey, Height>,
/// Highest round observed from each validator at the current height.
validators_rounds: BTreeMap<ValidatorId, Round>,
/// Block response that still waits for unknown transactions, if any.
incomplete_block: Option<IncompleteBlock>,
}
/// State kept only when the node is a validator.
#[derive(Debug, Clone)]
pub struct ValidatorState {
/// Identifier of this validator.
id: ValidatorId,
/// Prevotes recorded for this validator, keyed by round.
our_prevotes: HashMap<Round, Signed<Prevote>>,
/// Precommits recorded for this validator, keyed by round.
our_precommits: HashMap<Round, Signed<Precommit>>,
}
/// Kinds of data that can be requested; used as the key of the pending
/// requests map and to select the request timeout.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RequestData {
/// Request keyed by a single hash (presumably the propose hash).
Propose(Hash),
/// Transactions request keyed by a single hash (presumably the propose hash).
ProposeTransactions(Hash),
/// Transactions request that carries no payload.
BlockTransactions,
/// Prevotes request keyed by a round and a hash.
Prevotes(Round, Hash),
/// Block request keyed by a height.
Block(Height),
}
/// Bookkeeping for a single pending request.
#[derive(Debug)]
struct RequestState {
/// Counter incremented every time a peer is removed from `known_nodes`.
retries: u16,
/// Peers registered for this request.
known_nodes: HashSet<PublicKey>,
}
/// A propose message together with the data tracked for it.
#[derive(Debug)]
pub struct ProposeState {
/// The signed propose message.
propose: Signed<Propose>,
/// Hashes of the propose's transactions that are not yet known.
unknown_txs: HashSet<Hash>,
/// Block hash associated with the propose; `None` until `set_block_hash` is called.
block_hash: Option<Hash>,
/// Flag controlled by `set_saved`; `true` for proposes added with
/// `State::add_self_propose`, `false` for those added with `State::add_propose`.
is_saved: bool,
}
/// Data stored for a block known at the current height.
#[derive(Clone, Debug)]
pub struct BlockState {
/// Hash of the block.
hash: Hash,
/// Patch stored together with the block.
patch: Patch,
/// Hashes of the block's transactions.
txs: Vec<Hash>,
/// Identifier of the validator that proposed the block.
proposer_id: ValidatorId,
}
/// A block response that still references unknown transactions.
#[derive(Clone, Debug)]
pub struct IncompleteBlock {
/// The signed block response message.
msg: Signed<BlockResponse>,
/// Hashes of the message's transactions that are not yet known.
unknown_txs: HashSet<Hash>,
}
/// A vote message that can report which validator it comes from.
pub trait VoteMessage: Clone {
/// Returns the identifier of the validator associated with the message.
fn validator(&self) -> ValidatorId;
}
impl VoteMessage for Signed<Precommit> {
    /// Delegates to the `validator()` method of the dereferenced message.
    fn validator(&self) -> ValidatorId {
        let inner = Deref::deref(self);
        inner.validator()
    }
}
impl VoteMessage for Signed<Prevote> {
    /// Delegates to the `validator()` method of the dereferenced message.
    fn validator(&self) -> ValidatorId {
        let inner = Deref::deref(self);
        inner.validator()
    }
}
/// Collection of vote messages with at most one message per validator.
#[derive(Debug)]
pub struct Votes<T: VoteMessage> {
/// Accepted messages, in insertion order.
messages: Vec<T>,
/// Bit `i` is set once a message from validator `i` has been accepted.
validators: BitVec,
/// Number of accepted messages.
count: usize,
}
impl ValidatorState {
    /// Creates validator state for `id` with no recorded votes.
    pub fn new(id: ValidatorId) -> Self {
        Self {
            id,
            our_prevotes: HashMap::new(),
            our_precommits: HashMap::new(),
        }
    }

    /// Returns the validator identifier.
    pub fn id(&self) -> ValidatorId {
        self.id
    }

    /// Replaces the validator identifier, keeping the recorded votes.
    pub fn set_validator_id(&mut self, id: ValidatorId) {
        self.id = id;
    }

    /// Returns `true` if a prevote is recorded for `round`.
    pub fn have_prevote(&self, round: Round) -> bool {
        self.our_prevotes.contains_key(&round)
    }

    /// Drops all recorded prevotes and precommits.
    pub fn clear(&mut self) {
        self.our_prevotes.clear();
        self.our_precommits.clear();
    }
}
impl<T: VoteMessage> Votes<T> {
    /// Creates an empty collection sized for `validators_len` validators.
    pub fn new(validators_len: usize) -> Self {
        Self {
            count: 0,
            messages: Vec::new(),
            validators: BitVec::from_elem(validators_len, false),
        }
    }

    /// Stores `message` unless a message from the same validator is already present.
    ///
    /// # Panics
    ///
    /// Panics if the validator index of the message is outside the bit vector.
    pub fn insert(&mut self, message: T) {
        let voter: usize = message.validator().into();
        if self.validators[voter] {
            // This validator has already voted; duplicates are ignored.
            return;
        }
        self.validators.set(voter, true);
        self.count += 1;
        self.messages.push(message);
    }

    /// Returns the bit vector of validators whose messages were accepted.
    pub fn validators(&self) -> &BitVec {
        &self.validators
    }

    /// Returns the number of accepted messages.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Returns the accepted messages in insertion order.
    pub fn messages(&self) -> &Vec<T> {
        &self.messages
    }
}
impl RequestData {
    /// Returns the timeout for this kind of request, built from the
    /// corresponding `*_REQUEST_TIMEOUT` constant.
    pub fn timeout(&self) -> Duration {
        let millis = match self {
            RequestData::Propose(..) => PROPOSE_REQUEST_TIMEOUT,
            RequestData::ProposeTransactions(..) => TRANSACTIONS_REQUEST_TIMEOUT,
            RequestData::BlockTransactions => TRANSACTIONS_REQUEST_TIMEOUT,
            RequestData::Prevotes(..) => PREVOTES_REQUEST_TIMEOUT,
            RequestData::Block(..) => BLOCK_REQUEST_TIMEOUT,
        };
        Duration::from_millis(millis)
    }
}
impl RequestState {
    /// Creates a state with no retries and no known nodes.
    fn new() -> Self {
        Self {
            retries: 0,
            known_nodes: HashSet::new(),
        }
    }

    /// Registers `peer` as a node known for this request.
    fn insert(&mut self, peer: PublicKey) {
        self.known_nodes.insert(peer);
    }

    /// Counts a retry and forgets `peer`.
    fn remove(&mut self, peer: &PublicKey) {
        // Saturate instead of overflowing: a plain `+= 1` on `u16` panics in
        // debug builds (and wraps in release) once the counter reaches its maximum.
        self.retries = self.retries.saturating_add(1);
        self.known_nodes.remove(peer);
    }

    /// Returns `true` if no nodes are known for this request.
    fn is_empty(&self) -> bool {
        self.known_nodes.is_empty()
    }

    /// Returns one of the known nodes, if any, without removing it.
    fn peek(&self) -> Option<PublicKey> {
        self.known_nodes.iter().next().cloned()
    }
}
impl ProposeState {
    /// Returns the hash of the propose message.
    pub fn hash(&self) -> Hash {
        self.message().hash()
    }

    /// Returns the block hash associated with the propose, if it has been set.
    pub fn block_hash(&self) -> Option<Hash> {
        self.block_hash
    }

    /// Associates `block_hash` with the propose.
    pub fn set_block_hash(&mut self, block_hash: Hash) {
        self.block_hash = Some(block_hash);
    }

    /// Returns the signed propose message.
    pub fn message(&self) -> &Signed<Propose> {
        &self.propose
    }

    /// Returns the hashes of the transactions that are still unknown.
    pub fn unknown_txs(&self) -> &HashSet<Hash> {
        &self.unknown_txs
    }

    /// Returns `true` if at least one transaction is still unknown.
    pub fn has_unknown_txs(&self) -> bool {
        !self.unknown_txs().is_empty()
    }

    /// Returns the value of the `is_saved` flag.
    pub fn is_saved(&self) -> bool {
        self.is_saved
    }

    /// Sets the `is_saved` flag.
    pub fn set_saved(&mut self, saved: bool) {
        self.is_saved = saved;
    }
}
impl BlockState {
pub fn new(hash: Hash, patch: Patch, txs: Vec<Hash>, proposer_id: ValidatorId) -> Self {
Self {
hash,
patch,
txs,
proposer_id,
}
}
pub fn hash(&self) -> Hash {
self.hash
}
pub fn patch(&self) -> &Patch {
&self.patch
}
pub fn txs(&self) -> &Vec<Hash> {
&self.txs
}
pub fn proposer_id(&self) -> ValidatorId {
self.proposer_id
}
}
impl IncompleteBlock {
    /// Returns the signed block response message.
    pub fn message(&self) -> &Signed<BlockResponse> {
        &self.msg
    }

    /// Returns the hashes of the transactions that are still unknown.
    pub fn unknown_txs(&self) -> &HashSet<Hash> {
        &self.unknown_txs
    }

    /// Returns `true` if at least one transaction is still unknown.
    pub fn has_unknown_txs(&self) -> bool {
        !self.unknown_txs().is_empty()
    }
}
/// A `ConnectList` wrapped in `Arc<RwLock<..>>`; clones share the same list.
#[derive(Clone, Debug, Default)]
pub struct SharedConnectList {
/// The shared, lock-protected connect list.
inner: Arc<RwLock<ConnectList>>,
}
impl SharedConnectList {
    /// Wraps `connect_list` into a shareable handle.
    pub fn from_connect_list(connect_list: ConnectList) -> Self {
        let inner = Arc::new(RwLock::new(connect_list));
        Self { inner }
    }

    /// Returns `true` if the connect list allows the peer with `public_key`.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub fn is_peer_allowed(&self, public_key: &PublicKey) -> bool {
        self.inner
            .read()
            .expect("ConnectList read lock")
            .is_peer_allowed(public_key)
    }

    /// Returns a `ConnectInfo` (public key and address) for every peer in the list.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub fn peers(&self) -> Vec<ConnectInfo> {
        let connect_list = self.inner.read().expect("ConnectList read lock");
        connect_list
            .peers
            .iter()
            .map(|(public_key, peer)| ConnectInfo {
                public_key: *public_key,
                address: peer.address.clone(),
            })
            .collect()
    }

    /// Forwards `public_key` and `address` to `ConnectList::update_peer`
    /// under the write lock.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub fn update_peer(&mut self, public_key: &PublicKey, address: String) {
        self.inner
            .write()
            .expect("ConnectList write lock")
            .update_peer(public_key, address);
    }

    /// Returns a copy of the address stored for `public_key`, if any.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub fn find_address_by_key(&self, public_key: &PublicKey) -> Option<PeerAddress> {
        let guard = self.inner.read().expect("ConnectList read lock");
        let address = guard.find_address_by_pubkey(public_key);
        address.cloned()
    }
}
impl State {
/// Creates the node state.
///
/// Validator state is created only when `validator_id` is `Some`. The
/// round and the locked round start at `Round::zero()`, and all per-height
/// collections start empty.
#[cfg_attr(feature = "cargo-clippy", allow(clippy::too_many_arguments))]
pub fn new(
    validator_id: Option<ValidatorId>,
    consensus_public_key: PublicKey,
    consensus_secret_key: SecretKey,
    service_public_key: PublicKey,
    service_secret_key: SecretKey,
    connect_list: ConnectList,
    stored: StoredConfiguration,
    connect: Signed<Connect>,
    peers: HashMap<PublicKey, Signed<Connect>>,
    last_hash: Hash,
    last_height: Height,
    height_start_time: SystemTime,
) -> Self {
    let validator_state = validator_id.map(ValidatorState::new);
    let connect_list = SharedConnectList::from_connect_list(connect_list);

    // Fields are listed in declaration order.
    Self {
        validator_state,
        our_connect_message: connect,
        consensus_public_key,
        consensus_secret_key,
        service_public_key,
        service_secret_key,
        config: stored,
        connect_list,
        peers,
        connections: HashMap::new(),
        height_start_time,
        height: last_height,
        round: Round::zero(),
        locked_round: Round::zero(),
        locked_propose: None,
        last_hash,
        proposes: HashMap::new(),
        blocks: HashMap::new(),
        prevotes: HashMap::new(),
        precommits: HashMap::new(),
        queued: Vec::new(),
        unknown_txs: HashMap::new(),
        unknown_proposes_with_precommits: HashMap::new(),
        requests: HashMap::new(),
        nodes_max_height: BTreeMap::new(),
        validators_rounds: BTreeMap::new(),
        incomplete_block: None,
    }
}
/// Returns the validator state; `None` if the node has no validator id.
pub fn validator_state(&self) -> &Option<ValidatorState> {
&self.validator_state
}
pub fn validator_id(&self) -> Option<ValidatorId> {
self.validator_state.as_ref().map(|s| s.id())
}
/// Updates the validator id.
///
/// With `Some(id)` the existing validator state (and its recorded votes) is
/// kept and only its id is replaced; if there was no state, a fresh one is
/// created. With `None` the validator state is dropped.
pub fn renew_validator_id(&mut self, id: Option<ValidatorId>) {
    let previous = self.validator_state.take();
    self.validator_state = match id {
        Some(id) => {
            let mut state = previous.unwrap_or_else(|| ValidatorState::new(id));
            state.set_validator_id(id);
            Some(state)
        }
        None => None,
    };
}
/// Returns `true` if the node has validator state.
pub fn is_validator(&self) -> bool {
    self.validator_state.is_some()
}
pub fn is_leader(&self) -> bool {
self.validator_state()
.as_ref()
.map_or(false, |validator| self.leader(self.round()) == validator.id)
}
/// Returns a clone of the shared connect list handle.
pub fn connect_list(&self) -> SharedConnectList {
self.connect_list.clone()
}
/// Returns the validator keys from the current configuration.
pub fn validators(&self) -> &[ValidatorKeys] {
&self.config.validator_keys
}
/// Returns the current stored configuration.
pub fn config(&self) -> &StoredConfiguration {
&self.config
}
/// Returns the id of the validator whose consensus key equals `peer`,
/// i.e. its position in the validator keys list, or `None` if there is no
/// such validator.
pub fn find_validator(&self, peer: PublicKey) -> Option<ValidatorId> {
    let index = self
        .validators()
        .iter()
        .position(|keys| keys.consensus_key == peer)?;
    Some(ValidatorId(index as u16))
}
/// Returns the consensus part of the current configuration.
pub fn consensus_config(&self) -> &ConsensusConfig {
&self.config.consensus
}
/// Returns the services part of the current configuration.
pub fn services_config(&self) -> &BTreeMap<String, Value> {
&self.config.services
}
/// Replaces the stored configuration with `config`.
///
/// Does nothing when `config` equals the current configuration. Otherwise
/// the validator id is re-derived from the position of this node's
/// consensus key in the new validator keys list before the configuration
/// is stored.
pub fn update_config(&mut self, config: StoredConfiguration) {
    if self.config == config {
        return;
    }
    trace!("Updating node config={:#?}", config);

    let own_key = self.consensus_public_key;
    let position = config
        .validator_keys
        .iter()
        .position(|keys| keys.consensus_key == own_key);
    self.renew_validator_id(position.map(|index| ValidatorId(index as u16)));
    trace!("Validator={:#?}", self.validator_state());

    self.config = config;
}
/// Stores `msg` as the `Connect` message of `pubkey`, replacing any previous one.
///
/// Returns `true` if no message was stored for this key before.
pub fn add_peer(&mut self, pubkey: PublicKey, msg: Signed<Connect>) -> bool {
self.peers.insert(pubkey, msg).is_none()
}
/// Records `address` as the connection address of `pubkey`, replacing any previous one.
pub fn add_connection(&mut self, pubkey: PublicKey, address: ConnectedPeerAddr) {
self.connections.insert(pubkey, address);
}
/// Removes both the connection and the stored `Connect` message of `key`.
///
/// Returns the removed `Connect` message, or `None` if no message was
/// stored for this key. The connection entry is removed in either case.
pub fn remove_peer_with_pubkey(&mut self, key: &PublicKey) -> Option<Signed<Connect>> {
    self.connections.remove(key);
    // `HashMap::remove` already returns `Option<V>`, so it is returned as is.
    self.peers.remove(key)
}
/// Returns `true` if `pubkey` is the consensus key of one of the validators
/// in the current configuration.
pub fn peer_is_validator(&self, pubkey: &PublicKey) -> bool {
    let validator_keys = &self.config.validator_keys;
    validator_keys
        .iter()
        .any(|keys| keys.consensus_key == *pubkey)
}
/// Returns `true` if the connect list allows the peer with `pubkey`.
pub fn peer_in_connect_list(&self, pubkey: &PublicKey) -> bool {
self.connect_list.is_peer_allowed(pubkey)
}
/// Returns the `Connect` messages of known peers, keyed by public key.
pub fn peers(&self) -> &HashMap<PublicKey, Signed<Connect>> {
&self.peers
}
/// Returns the recorded connection addresses, keyed by public key.
pub fn connections(&self) -> &HashMap<PublicKey, ConnectedPeerAddr> {
&self.connections
}
/// Returns the consensus key of the validator with the given id, or `None`
/// if the id is out of range of the validator keys list.
pub fn consensus_public_key_of(&self, id: ValidatorId) -> Option<PublicKey> {
    let index: usize = id.into();
    let keys = self.validators().get(index)?;
    Some(keys.consensus_key)
}
/// Returns the consensus public key of this node.
pub fn consensus_public_key(&self) -> &PublicKey {
&self.consensus_public_key
}
/// Returns the consensus secret key of this node.
pub fn consensus_secret_key(&self) -> &SecretKey {
&self.consensus_secret_key
}
/// Returns the service public key of this node.
pub fn service_public_key(&self) -> &PublicKey {
&self.service_public_key
}
/// Returns the service secret key of this node.
pub fn service_secret_key(&self) -> &SecretKey {
&self.service_secret_key
}
/// Returns the leader for `round` at the current height, computed as
/// `(height + round) % validators_count`.
///
/// # Panics
///
/// Panics if the validator list is empty (remainder by zero).
pub fn leader(&self, round: Round) -> ValidatorId {
    let height: u64 = self.height().into();
    let round: u64 = round.into();
    let validators_count = self.validators().len() as u64;
    let index = (height + round) % validators_count;
    ValidatorId(index as u16)
}
/// Records that validator `id` has been seen at `round`.
///
/// Returns `None` when `round` is not greater than the round already
/// recorded for this validator (an entry with `Round::zero()` is created if
/// the validator was not tracked yet), or when rounds are known for too few
/// validators. Otherwise the recorded rounds are ordered from highest to
/// lowest and the one at index `max_byzantine_count` is returned if it is
/// greater than the current round of this node.
pub fn update_validator_round(&mut self, id: ValidatorId, round: Round) -> Option<Round> {
    {
        let known_round = self.validators_rounds.entry(id).or_insert_with(Round::zero);
        if round <= *known_round {
            trace!(
                "Received a message from a lower round than we know already,\
message_round = {},\
known_round = {}.",
                round,
                known_round
            );
            return None;
        }
        *known_round = round;
    }

    let max_byzantine_count = (self.validators().len() + 2) / 3 - 1;
    if self.validators_rounds.len() <= max_byzantine_count {
        trace!("Count of validators, lower then max byzantine count.");
        return None;
    }

    // Highest rounds first.
    let mut rounds: Vec<Round> = self.validators_rounds.values().cloned().collect();
    rounds.sort_unstable_by(|a, b| b.cmp(a));

    let candidate = rounds[max_byzantine_count];
    if candidate > self.round {
        Some(candidate)
    } else {
        None
    }
}
/// Returns the height recorded for the node with `key`, or `Height::zero()`
/// if nothing is recorded.
pub fn node_height(&self, key: &PublicKey) -> Height {
    self.nodes_max_height
        .get(key)
        .cloned()
        .unwrap_or_else(Height::zero)
}
pub fn set_node_height(&mut self, key: PublicKey, height: Height) {
*self
.nodes_max_height
.entry(key)
.or_insert_with(Height::zero) = height;
}
/// Returns the keys of all nodes whose recorded height is greater than the
/// current height of this node, in key order.
pub fn nodes_with_bigger_height(&self) -> Vec<&PublicKey> {
    let current = self.height();
    let mut ahead = Vec::new();
    for (key, node_height) in &self.nodes_max_height {
        if *node_height > current {
            ahead.push(key);
        }
    }
    ahead
}
/// Returns the byzantine majority count for the current validator list.
pub fn majority_count(&self) -> usize {
    let total = self.validators().len();
    Self::byzantine_majority_count(total)
}
/// Returns `total * 2 / 3 + 1` (integer division): the number of votes
/// needed out of `total`.
pub fn byzantine_majority_count(total: usize) -> usize {
    let two_thirds = total * 2 / 3;
    two_thirds + 1
}
/// Returns the current height.
pub fn height(&self) -> Height {
self.height
}
/// Returns the start time recorded for the current height.
pub fn height_start_time(&self) -> SystemTime {
self.height_start_time
}
/// Returns the current round.
pub fn round(&self) -> Round {
self.round
}
/// Returns the hash stored by the constructor or by the latest `new_height` call.
pub fn last_hash(&self) -> &Hash {
&self.last_hash
}
/// Locks the node on the propose `hash` at `round`.
///
/// # Panics
///
/// Panics with "Incorrect lock" unless `round` is greater than the current
/// locked round.
pub fn lock(&mut self, round: Round, hash: Hash) {
    assert!(self.locked_round < round, "Incorrect lock");
    self.locked_propose = Some(hash);
    self.locked_round = round;
}
/// Returns the round of the current lock.
pub fn locked_round(&self) -> Round {
self.locked_round
}
/// Returns the hash of the propose the node is locked on, if any.
pub fn locked_propose(&self) -> Option<Hash> {
self.locked_propose
}
/// Returns a mutable reference to the propose state stored under `hash`, if any.
pub fn propose_mut(&mut self, hash: &Hash) -> Option<&mut ProposeState> {
self.proposes.get_mut(hash)
}
/// Returns the propose state stored under `hash`, if any.
pub fn propose(&self, hash: &Hash) -> Option<&ProposeState> {
self.proposes.get(hash)
}
/// Returns the block state stored under `hash`, if any.
pub fn block(&self, hash: &Hash) -> Option<&BlockState> {
self.blocks.get(hash)
}
/// Sets the current round to `round`.
pub fn jump_round(&mut self, round: Round) {
self.round = round;
}
/// Increments the current round.
pub fn new_round(&mut self) {
self.round.increment();
}
/// Returns the incomplete block, if one is stored.
pub fn incomplete_block(&self) -> Option<&IncompleteBlock> {
self.incomplete_block.as_ref()
}
/// Moves the state to the next height.
///
/// Increments the height, stores `height_start_time` and `block_hash`
/// (as the last hash), resets the round to `Round::first()`, drops the lock
/// and clears all data collected for the previous height: blocks, proposes,
/// unknown transactions, unknown proposes with precommits, votes, validator
/// rounds, our own votes, pending requests and the incomplete block.
///
/// Peers, connections, recorded node heights and the message queue are
/// left untouched.
pub fn new_height(&mut self, block_hash: &Hash, height_start_time: SystemTime) {
    self.height.increment();
    self.height_start_time = height_start_time;
    self.round = Round::first();
    self.locked_round = Round::zero();
    self.locked_propose = None;
    self.last_hash = *block_hash;

    self.blocks.clear();
    self.proposes.clear();
    // Entries here point at proposes of the finished height, which have just
    // been dropped; without clearing, the map would grow without bound.
    self.unknown_txs.clear();
    self.unknown_proposes_with_precommits.clear();
    self.prevotes.clear();
    self.precommits.clear();
    self.validators_rounds.clear();
    if let Some(ref mut validator_state) = self.validator_state {
        validator_state.clear();
    }
    self.requests.clear();
    self.incomplete_block = None;
}
/// Takes all queued consensus messages, leaving the queue empty.
pub fn queued(&mut self) -> Vec<ConsensusMessage> {
    ::std::mem::replace(&mut self.queued, Vec::new())
}
/// Appends `msg` to the queue of consensus messages.
pub fn add_queued(&mut self, msg: ConsensusMessage) {
self.queued.push(msg);
}
/// Marks `tx_hash` as known in every stored propose.
///
/// Returns `(propose_hash, round)` for every propose that has no unknown
/// transactions after the removal. This includes proposes that had none
/// before the call.
pub fn check_incomplete_proposes(&mut self, tx_hash: Hash) -> Vec<(Hash, Round)> {
    self.proposes
        .iter_mut()
        .filter_map(|(propose_hash, propose_state)| {
            propose_state.unknown_txs.remove(&tx_hash);
            if propose_state.unknown_txs.is_empty() {
                Some((*propose_hash, propose_state.message().round()))
            } else {
                None
            }
        })
        .collect()
}
/// Marks `tx_hash` as known in the incomplete block, if one is stored.
///
/// Returns a clone of the incomplete block when it has no unknown
/// transactions left; otherwise (or when no incomplete block is stored)
/// returns `None`. The stored block itself is not removed.
pub fn remove_unknown_transaction(&mut self, tx_hash: Hash) -> Option<IncompleteBlock> {
    let incomplete_block = self.incomplete_block.as_mut()?;
    incomplete_block.unknown_txs.remove(&tx_hash);
    if incomplete_block.unknown_txs.is_empty() {
        Some(incomplete_block.clone())
    } else {
        None
    }
}
/// Returns the prevotes collected for `(round, propose_hash)`; an empty
/// slice if none were collected.
pub fn prevotes(&self, round: Round, propose_hash: Hash) -> &[Signed<Prevote>] {
    match self.prevotes.get(&(round, propose_hash)) {
        Some(votes) => votes.messages().as_slice(),
        None => &[],
    }
}
/// Returns the precommits stored under the key `(round, propose_hash)`; an
/// empty slice if none were collected.
pub fn precommits(&self, round: Round, propose_hash: Hash) -> &[Signed<Precommit>] {
    match self.precommits.get(&(round, propose_hash)) {
        Some(votes) => votes.messages().as_slice(),
        None => &[],
    }
}
/// Returns `true` if this validator has a prevote recorded for `propose_round`.
///
/// # Panics
///
/// Panics if the node has no validator state.
pub fn have_prevote(&self, propose_round: Round) -> bool {
    match *self.validator_state() {
        Some(ref validator_state) => validator_state.have_prevote(propose_round),
        None => panic!("called have_prevote for auditor node"),
    }
}
/// Stores a propose created by this node and returns its hash.
///
/// The propose is stored with no unknown transactions, no block hash and
/// the `is_saved` flag set; an existing entry with the same hash is replaced.
/// In debug builds, asserts that the node has validator state.
pub fn add_self_propose(&mut self, msg: Signed<Propose>) -> Hash {
    debug_assert!(self.validator_state().is_some());
    let propose_hash = msg.hash();
    let propose_state = ProposeState {
        propose: msg,
        unknown_txs: HashSet::new(),
        block_hash: None,
        is_saved: true,
    };
    self.proposes.insert(propose_hash, propose_state);
    propose_hash
}
/// Stores a propose received from another node.
///
/// Every transaction hash of the propose that is absent from `transactions`
/// is recorded as unknown, both in the new propose state and in the
/// state-wide `unknown_txs` map.
///
/// # Errors
///
/// Returns an error if a propose with the same hash is already stored, or
/// if one of its transactions is present in `transactions` but not in
/// `transaction_pool`. Nothing is stored in either case.
pub fn add_propose<S: AsRef<dyn Snapshot>>(
    &mut self,
    msg: Signed<Propose>,
    transactions: &MapIndex<S, Hash, Signed<RawTransaction>>,
    transaction_pool: &KeySetIndex<S, Hash>,
) -> Result<&ProposeState, failure::Error> {
    let propose_hash = msg.hash();
    let slot = match self.proposes.entry(propose_hash) {
        Entry::Occupied(..) => bail!("Propose already found"),
        Entry::Vacant(slot) => slot,
    };

    let mut unknown_txs = HashSet::new();
    for hash in msg.transactions() {
        if transactions.get(hash).is_none() {
            unknown_txs.insert(*hash);
        } else if !transaction_pool.contains(hash) {
            bail!(
                "Received propose with already \
committed transaction"
            )
        }
    }

    // Remember which propose is waiting for each unknown transaction.
    for tx in &unknown_txs {
        let waiting = self.unknown_txs.entry(*tx).or_insert_with(Vec::new);
        waiting.push(propose_hash);
    }

    let propose_state = ProposeState {
        propose: msg,
        unknown_txs,
        block_hash: None,
        is_saved: false,
    };
    Ok(slot.insert(propose_state))
}
pub fn add_block(
&mut self,
block_hash: Hash,
patch: Patch,
txs: Vec<Hash>,
proposer_id: ValidatorId,
) -> Option<&BlockState> {
match self.blocks.entry(block_hash) {
Entry::Occupied(..) => None,
Entry::Vacant(e) => Some(e.insert(BlockState {
hash: block_hash,
patch,
txs,
proposer_id,
})),
}
}
/// Stores `msg` as the incomplete block and returns a reference to it.
///
/// Every transaction hash of the message that is absent from `txs` is
/// recorded as unknown.
///
/// # Panics
///
/// Panics if an incomplete block is already stored, or if one of the
/// transactions is present in `txs` but not in `txs_pool`.
pub fn create_incomplete_block<S: AsRef<dyn Snapshot>>(
    &mut self,
    msg: &Signed<BlockResponse>,
    txs: &MapIndex<S, Hash, Signed<RawTransaction>>,
    txs_pool: &KeySetIndex<S, Hash>,
) -> &IncompleteBlock {
    assert!(self.incomplete_block().is_none());

    let mut unknown_txs = HashSet::new();
    for hash in msg.transactions() {
        if txs.get(hash).is_none() {
            unknown_txs.insert(*hash);
        } else if !txs_pool.contains(hash) {
            panic!(
                "Received block with already \
committed transaction"
            )
        }
    }

    let incomplete_block = IncompleteBlock {
        msg: msg.clone(),
        unknown_txs,
    };
    self.incomplete_block = Some(incomplete_block);
    self.incomplete_block.as_ref().unwrap()
}
/// Adds a prevote to the votes collected for its `(round, propose_hash)`.
///
/// If the prevote comes from this validator, it is also recorded as our
/// prevote for its round. Returns `true` if the number of prevotes for the
/// key has reached the majority count.
///
/// # Panics
///
/// Panics if this validator already has a different prevote for the same round.
pub fn add_prevote(&mut self, msg: Signed<Prevote>) -> bool {
    let majority_count = self.majority_count();
    let validators_len = self.validators().len();

    if let Some(ref mut validator_state) = self.validator_state {
        if validator_state.id == msg.validator() {
            let previous = validator_state
                .our_prevotes
                .insert(msg.round(), msg.clone());
            if let Some(other) = previous {
                if other != msg {
                    panic!(
                        "Trying to send different prevotes for the same round: \
old = {:?}, new = {:?}",
                        other, msg
                    );
                }
            }
        }
    }

    let key = (msg.round(), *msg.propose_hash());
    let votes = self
        .prevotes
        .entry(key)
        .or_insert_with(|| Votes::new(validators_len));
    votes.insert(msg);
    votes.count() >= majority_count
}
/// Returns `true` if the prevotes collected for `(round, propose_hash)`
/// have reached the majority count.
pub fn has_majority_prevotes(&self, round: Round, propose_hash: Hash) -> bool {
    self.prevotes
        .get(&(round, propose_hash))
        .map_or(false, |votes| votes.count() >= self.majority_count())
}
/// Returns a bit vector marking the validators whose prevotes are known for
/// `(round, propose_hash)`; all bits are unset if no prevotes were collected.
pub fn known_prevotes(&self, round: Round, propose_hash: &Hash) -> BitVec {
    match self.prevotes.get(&(round, *propose_hash)) {
        Some(votes) => votes.validators().clone(),
        None => BitVec::from_elem(self.validators().len(), false),
    }
}
/// Returns a bit vector marking the validators whose precommits are known
/// under the key `(round, propose_hash)`; all bits are unset if no
/// precommits were collected.
pub fn known_precommits(&self, round: Round, propose_hash: &Hash) -> BitVec {
    match self.precommits.get(&(round, *propose_hash)) {
        Some(votes) => votes.validators().clone(),
        None => BitVec::from_elem(self.validators().len(), false),
    }
}
/// Adds a precommit to the votes collected for its `(round, block_hash)`.
///
/// If the precommit comes from this validator, it is also recorded as our
/// precommit for its round. Returns `true` if the number of precommits for
/// the key has reached the majority count.
///
/// # Panics
///
/// Panics if this validator already has a precommit for the same round with
/// a different propose hash.
pub fn add_precommit(&mut self, msg: Signed<Precommit>) -> bool {
    let majority_count = self.majority_count();
    let validators_len = self.validators().len();

    if let Some(ref mut validator_state) = self.validator_state {
        if validator_state.id == msg.validator() {
            let previous = validator_state
                .our_precommits
                .insert(msg.round(), msg.clone());
            if let Some(other) = previous {
                if other.propose_hash() != msg.propose_hash() {
                    panic!(
                        "Trying to send different precommits for same round, old={:?}, \
new={:?}",
                        other, msg
                    );
                }
            }
        }
    }

    let key = (msg.round(), *msg.block_hash());
    let votes = self
        .precommits
        .entry(key)
        .or_insert_with(|| Votes::new(validators_len));
    votes.insert(msg);
    votes.count() >= majority_count
}
/// Records `(round, block_hash)` for the unknown propose `propose_hash`.
pub fn add_unknown_propose_with_precommits(
    &mut self,
    round: Round,
    propose_hash: Hash,
    block_hash: Hash,
) {
    let pending = self
        .unknown_proposes_with_precommits
        .entry(propose_hash)
        .or_insert_with(Vec::new);
    pending.push((round, block_hash));
}
/// Removes and returns the `(round, block_hash)` pairs recorded for
/// `propose_hash`; returns an empty vector if nothing was recorded.
pub fn take_unknown_propose_with_precommits(
    &mut self,
    propose_hash: &Hash,
) -> Vec<(Round, Hash)> {
    let recorded = self.unknown_proposes_with_precommits.remove(propose_hash);
    recorded.unwrap_or_else(Vec::new)
}
/// Returns `true` if the precommits collected for `(round, block_hash)`
/// have reached the majority count.
pub fn has_majority_precommits(&self, round: Round, block_hash: Hash) -> bool {
    self.precommits
        .get(&(round, block_hash))
        .map_or(false, |votes| votes.count() >= self.majority_count())
}
/// Returns `true` if, for any round produced by
/// `locked_round.next().iter_to(round.next())`, this validator has a prevote
/// whose propose hash differs from the locked propose.
///
/// # Panics
///
/// Panics if the node has no validator state and the range of rounds is
/// not empty.
pub fn have_incompatible_prevotes(&self) -> bool {
    for round in self.locked_round.next().iter_to(self.round.next()) {
        let validator_state = match self.validator_state {
            Some(ref validator_state) => validator_state,
            None => unreachable!(),
        };
        let incompatible = validator_state
            .our_prevotes
            .get(&round)
            .map_or(false, |msg| Some(*msg.propose_hash()) != self.locked_propose);
        if incompatible {
            return true;
        }
    }
    false
}
/// Registers `peer` as a node known for the request `data`.
///
/// Returns `true` if no nodes were registered for this request before the
/// call (including the case where the request entry has just been created).
pub fn request(&mut self, data: RequestData, peer: PublicKey) -> bool {
    let request_state = self.requests.entry(data).or_insert_with(RequestState::new);
    let had_no_nodes = request_state.is_empty();
    request_state.insert(peer);
    had_no_nodes
}
/// Picks a node to query for the request `data`.
///
/// Returns `None` if the request is not tracked. If `peer` is given, it is
/// first removed from the nodes known for the request (which also counts a
/// retry). One of the remaining nodes is then returned; when none remain,
/// the request entry is dropped and `None` is returned.
pub fn retry(&mut self, data: &RequestData, peer: Option<PublicKey>) -> Option<PublicKey> {
    let next = {
        let request_state = self.requests.get_mut(data)?;
        if let Some(peer) = peer {
            request_state.remove(&peer);
        }
        request_state.peek()
    };

    if next.is_none() {
        self.requests.remove(data);
    }
    next
}
/// Stops tracking the request `data` and returns the nodes that were known
/// for it; returns an empty set if the request was not tracked.
pub fn remove_request(&mut self, data: &RequestData) -> HashSet<PublicKey> {
    self.requests
        .remove(data)
        .map_or_else(HashSet::new, |request_state| request_state.known_nodes)
}
/// Returns the `Connect` message of this node.
pub fn our_connect_message(&self) -> &Signed<Connect> {
&self.our_connect_message
}
/// Replaces the `Connect` message of this node with `msg`.
pub fn set_our_connect_message(&mut self, msg: Signed<Connect>) {
self.our_connect_message = msg;
}
/// Adds `peer` to the shared connect list under its write lock.
///
/// # Panics
///
/// Panics if the lock is poisoned.
pub fn add_peer_to_connect_list(&mut self, peer: ConnectInfo) {
    let shared = &self.connect_list.inner;
    let mut connect_list = shared.write().expect("ConnectList write lock");
    connect_list.add(peer);
}
}