use chrono::Utc;
use num::FromPrimitive;
use rand::thread_rng;
use crate::mwc_core::ser::{self, DeserializationMode, Readable, Reader, Writeable, Writer};
use crate::types::{Capabilities, PeerAddr, ReasonForBan};
use mwc_store::{self, option_to_not_found, to_key, Error};
use mwc_util::secp::rand::Rng;
const DB_NAME: &str = "peerV2";
const STORE_SUBPATH: &str = "peers";
const PEER_PREFIX: u8 = b'P';
enum_from_primitive! {
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum State {
Healthy = 0,
Banned = 1,
Defunct = 2,
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerData {
pub addr: PeerAddr,
pub capabilities: Capabilities,
pub user_agent: String,
pub flags: State,
pub last_banned: i64,
pub ban_reason: ReasonForBan,
pub last_connected: i64,
}
impl Writeable for PeerData {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
if self.user_agent.len() > 10_000 {
return Err(ser::Error::TooLargeWriteErr(format!(
"Unreasonable long User Agent. UA length is {}",
self.user_agent.len()
)));
}
self.addr.write(writer)?;
ser_multiwrite!(
writer,
[write_u32, self.capabilities.bits()],
[write_bytes, &self.user_agent],
[write_u8, self.flags as u8],
[write_i64, self.last_banned],
[write_i32, self.ban_reason as i32],
[write_i64, self.last_connected]
);
Ok(())
}
}
impl Readable for PeerData {
fn read<R: Reader>(reader: &mut R) -> Result<PeerData, ser::Error> {
let addr = PeerAddr::read(reader)?;
let capab = reader.read_u32()?;
let ua = reader.read_bytes_len_prefix()?;
let (fl, lb, br) = ser_multiread!(reader, read_u8, read_i64, read_i32);
let lc = reader.read_i64();
let last_connected = match lc {
Err(_) => Utc::now().timestamp(),
Ok(lc) => lc,
};
let user_agent = String::from_utf8(ua)
.map_err(|e| ser::Error::CorruptedData(format!("Fail to read user agent, {}", e)))?;
let capabilities = Capabilities::from_bits_truncate(capab);
let ban_reason = ReasonForBan::from_i32(br).ok_or(ser::Error::CorruptedData(
"Unable to read PeerData ban reason".to_string(),
))?;
match State::from_u8(fl) {
Some(flags) => Ok(PeerData {
addr,
capabilities,
user_agent,
flags: flags,
last_banned: lb,
ban_reason,
last_connected,
}),
None => Err(ser::Error::CorruptedData(
"Unable to read PeerData State".to_string(),
)),
}
}
}
pub struct PeerStore {
db: mwc_store::Store,
}
impl PeerStore {
pub fn new(db_root: &str) -> Result<PeerStore, Error> {
let db = mwc_store::Store::new(db_root, Some(DB_NAME), Some(STORE_SUBPATH), None)?;
Ok(PeerStore { db: db })
}
pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> {
debug!("save_peer: {:?} marked {:?}", p.addr, p.flags);
let batch = self.db.batch_write()?;
batch.put_ser(&peer_key(&p.addr)[..], p)?;
batch.commit()
}
pub fn save_peers(&self, p: Vec<PeerData>) -> Result<(), Error> {
let batch = self.db.batch_write()?;
for pd in p {
debug!("save_peers: {:?} marked {:?}", pd.addr, pd.flags);
batch.put_ser(&peer_key(&pd.addr)[..], &pd)?;
}
batch.commit()
}
pub fn get_peer(&self, peer_addr: &PeerAddr) -> Result<PeerData, Error> {
option_to_not_found(self.db.get_ser(&peer_key(peer_addr)[..], None), || {
format!("Peer at address: {}", peer_addr)
})
}
pub fn exists_peer(&self, peer_addr: &PeerAddr) -> Result<bool, Error> {
self.db.exists(&peer_key(peer_addr)[..])
}
pub fn delete_peer(&self, peer_addr: &PeerAddr) -> Result<(), Error> {
let batch = self.db.batch_write()?;
batch.delete(&peer_key(peer_addr)[..])?;
batch.commit()
}
pub fn find_peers(&self, state: State, cap: Capabilities) -> Result<Vec<PeerData>, Error> {
let mut peers = self
.peers_iter()?
.filter(|p| {
p.flags == state
&& (p.capabilities == Capabilities::UNKNOWN || p.capabilities.contains(cap))
})
.collect::<Vec<_>>();
let peers_num = peers.len();
if peers_num > 1 {
peers.sort_by_key(|p| -p.last_connected);
let mut rng = thread_rng();
for i1 in (1..peers_num).step_by(2) {
if i1 + 2 < peers_num {
let i2 = rng.gen_range(i1 + 1, peers_num);
peers.swap(i1, i2);
}
}
}
Ok(peers)
}
pub fn peers_iter(&self) -> Result<impl Iterator<Item = PeerData>, Error> {
let key = to_key(PEER_PREFIX, "");
let protocol_version = self.db.protocol_version();
self.db.iter(&key, move |_, mut v| {
ser::deserialize(&mut v, protocol_version, DeserializationMode::default())
.map_err(From::from)
})
}
pub fn all_peers(&self) -> Result<Vec<PeerData>, Error> {
let peers: Vec<PeerData> = self.peers_iter()?.collect();
Ok(peers)
}
pub fn update_state(&self, peer_addr: &PeerAddr, new_state: State) -> Result<(), Error> {
let batch = self.db.batch_write()?;
let mut peer = option_to_not_found(
batch.get_ser::<PeerData>(&peer_key(peer_addr)[..], None),
|| format!("Peer at address: {}", peer_addr),
)?;
if peer.flags != new_state {
debug!(
"Changing peer {:?} state form {:?} to {:?}",
peer_addr, peer.flags, new_state
);
}
peer.flags = new_state;
if new_state == State::Banned {
peer.last_banned = Utc::now().timestamp();
}
batch.put_ser(&peer_key(peer_addr)[..], &peer)?;
batch.commit()
}
pub fn delete_peers<F>(&self, predicate: F) -> Result<(), Error>
where
F: Fn(&PeerData) -> bool,
{
let mut to_remove = vec![];
for x in self.peers_iter()? {
if predicate(&x) {
to_remove.push(x)
}
}
if !to_remove.is_empty() {
let batch = self.db.batch_write()?;
for peer in to_remove {
batch.delete(&peer_key(&peer.addr)[..])?;
}
batch.commit()?;
}
Ok(())
}
}
fn peer_key(peer_addr: &PeerAddr) -> Vec<u8> {
to_key(PEER_PREFIX, peer_addr.as_key())
}