use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::{from_utf8, FromStr};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use leveldb::database::batch::{Batch, WriteBatch};
use leveldb::database::{Database, DatabaseReader};
use leveldb::iterator::{Iterable, LevelDBIterator};
use leveldb::options::{Options, ReadOptions, WriteOptions};
use leveldb::snapshots::Snapshots;
use rand::Rng;
use serde::{Deserialize, Serialize};
use crate::error::{DataError, DbError, EncodingError, ParsingError};
use crate::peer_storage::{PeerStorage, PeerStorageError, PeerStorageNotFoundError};
use crate::utils::now_as_duration;
const NEW_PEER_LAST_ATTEMPT_MAX: u64 = 1 << 30;
const U64_LENGTH: usize = std::mem::size_of::<u64>();
pub struct PeerStorageDisk {
db: Database,
connected_peers: Mutex<HashMap<SocketAddr, bool>>,
}
impl PeerStorageDisk {
pub fn new(db_path: PathBuf) -> Result<Arc<Self>, PeerStorageError> {
let mut options = Options::new();
options.create_if_missing = true;
let db = Database::open(&db_path, &options).map_err(|err| DbError::Open(db_path, err))?;
Ok(Arc::new(PeerStorageDisk {
db,
connected_peers: Mutex::new(HashMap::new()),
}))
}
fn get_peer_info(
addr: SocketAddr,
db: &impl DatabaseReader,
) -> Result<Option<PeerInfo>, PeerStorageError> {
let key = compute_peer_key(addr);
let Some(encoded) = db
.get_u8(&ReadOptions::new(), &key)
.map_err(DbError::Read)?
else {
return Ok(None);
};
let info = decode_peer_info(encoded)?;
Ok(Some(info))
}
fn delete_peer(
&self,
addr: SocketAddr,
last_attempt: Duration,
last_success: Duration,
) -> Result<(), PeerStorageError> {
let peer_key = compute_peer_key(addr);
let attempt_key = compute_last_attempt_time_key(last_attempt, Some(addr));
let success_key = compute_last_success_time_key(last_success, Some(addr));
let batch = WriteBatch::new();
batch.delete_u8(&peer_key);
batch.delete_u8(&attempt_key);
batch.delete_u8(&success_key);
self.db
.write(&WriteOptions::new(), &batch)
.map_err(|err| PeerStorageError::Db(DbError::Write(err)))
}
fn get_connected_peers(&self) -> HashMap<SocketAddr, bool> {
let mut connected_peers = HashMap::new();
for (addr, _) in self.connected_peers.lock().unwrap().iter() {
connected_peers.insert(*addr, true);
}
connected_peers
}
}
impl PeerStorage for PeerStorageDisk {
fn store(&self, addr: SocketAddr) -> Result<bool, PeerStorageError> {
if Self::get_peer_info(addr, &self.db)?.is_some() {
return Ok(false);
};
let mut rng = rand::rng();
let info = PeerInfo {
first_seen: now_as_duration(),
last_success: Duration::ZERO,
last_attempt: Duration::from_secs(rng.random_range(0..NEW_PEER_LAST_ATTEMPT_MAX)),
};
let batch = WriteBatch::new();
info.write_to_batch(addr, &batch)?;
let attempt_key = compute_last_attempt_time_key(info.last_attempt, Some(addr));
batch.put_u8(&attempt_key, &[0x1]);
self.db
.write(&WriteOptions::new(), &batch)
.map_err(DbError::Write)?;
Ok(true)
}
fn get(&self, count: usize) -> Result<Vec<SocketAddr>, PeerStorageError> {
let start_key = compute_last_attempt_time_key(Duration::ZERO, None);
let end_key = compute_last_attempt_time_key(now_as_duration(), None);
let mut addrs = Vec::new();
let connected_peers = self.get_connected_peers();
let snapshot = self.db.snapshot();
let iter = snapshot
.keys_iter(&ReadOptions::new())
.from(&start_key)
.to(&end_key);
for key in iter {
let (_when, addr) = decode_last_attempt_time_key(&key)?;
if connected_peers.contains_key(&addr) {
continue;
}
if let Some(info) = Self::get_peer_info(addr, &snapshot)? {
if !info.should_retry() {
continue;
}
}
addrs.push(addr);
if addrs.len() == count {
break;
}
}
Ok(addrs)
}
fn get_since(&self, count: usize, when: Duration) -> Result<Vec<SocketAddr>, PeerStorageError> {
let start_key = compute_last_success_time_key(when, None);
let end_key = compute_last_success_time_key(now_as_duration(), None);
let mut addrs = Vec::new();
let snapshot = self.db.snapshot();
let iter = snapshot
.keys_iter(&ReadOptions::new())
.from(&start_key)
.to(&end_key)
.reverse();
for key in iter {
let (_when, addr) = decode_last_success_time_key(&key)?;
addrs.push(addr);
if addrs.len() == count {
break;
}
}
Ok(addrs)
}
fn delete(&self, addr: SocketAddr) -> Result<(), PeerStorageError> {
let Some(info) = Self::get_peer_info(addr, &self.db)? else {
return Err(PeerStorageNotFoundError::PeerInfo(addr).into());
};
self.delete_peer(addr, info.last_attempt, info.last_success)
}
fn on_connect_attempt(&self, addr: SocketAddr) -> Result<(), PeerStorageError> {
let Some(mut info) = Self::get_peer_info(addr, &self.db)? else {
return Err(PeerStorageNotFoundError::PeerInfo(addr).into());
};
let batch = WriteBatch::new();
let attempt_key_old = compute_last_attempt_time_key(info.last_attempt, Some(addr));
batch.delete_u8(&attempt_key_old);
info.last_attempt = now_as_duration();
info.write_to_batch(addr, &batch)?;
let attempt_key_new = compute_last_attempt_time_key(info.last_attempt, Some(addr));
batch.put(&attempt_key_new, &[0x1]);
self.db
.write(&WriteOptions::new(), &batch)
.map_err(|err| PeerStorageError::Db(DbError::WriteBatch(err)))
}
fn on_connect_success(&self, addr: SocketAddr) -> Result<(), PeerStorageError> {
let Some(mut info) = Self::get_peer_info(addr, &self.db)? else {
return Err(PeerStorageNotFoundError::PeerInfo(addr).into());
};
let batch = WriteBatch::new();
let success_key_old = compute_last_success_time_key(info.last_success, Some(addr));
batch.delete_u8(&success_key_old);
info.last_success = now_as_duration();
info.write_to_batch(addr, &batch)?;
let success_key_new = compute_last_success_time_key(info.last_success, Some(addr));
batch.put_u8(&success_key_new, &[0x1]);
self.db
.write(&WriteOptions::new(), &batch)
.map_err(DbError::Write)?;
let mut connected_peers = self.connected_peers.lock().unwrap();
connected_peers.insert(addr, true);
Ok(())
}
fn on_connect_failure(&self, addr: SocketAddr) -> Result<(), PeerStorageError> {
let Some(info) = Self::get_peer_info(addr, &self.db)? else {
return Err(PeerStorageNotFoundError::PeerInfo(addr).into());
};
if info.should_delete() {
return self.delete_peer(addr, info.last_attempt, info.last_success);
}
Ok(())
}
fn on_disconnect(&self, addr: SocketAddr) -> Result<(), PeerStorageError> {
let mut connected_peers = self.connected_peers.lock().unwrap();
match connected_peers.remove(&addr) {
Some(_) => Ok(()),
None => Err(PeerStorageNotFoundError::Peer(addr).into()),
}
}
}
#[derive(Deserialize, Serialize)]
struct PeerInfo {
first_seen: Duration,
last_attempt: Duration,
last_success: Duration,
}
impl PeerInfo {
pub fn should_retry(&self) -> bool {
if self.last_attempt.as_secs() < NEW_PEER_LAST_ATTEMPT_MAX {
return true;
}
let last_seen = if self.last_success == Duration::ZERO {
self.first_seen
} else {
self.last_success
};
let now = now_as_duration();
let hours_since_last_seen = (now - last_seen).as_secs() / (60 * 60);
let minutes_since_last_attempt = (now - self.last_attempt).as_secs() / 60;
let hours_since_last_attempt = minutes_since_last_attempt / 60;
if hours_since_last_seen == 0 {
return minutes_since_last_attempt > 10;
}
let retry_interval = (hours_since_last_seen as f64).sqrt().ceil() as u64;
hours_since_last_attempt > retry_interval
}
fn should_delete(&self) -> bool {
if self.last_success == Duration::ZERO {
return true;
}
let week = Duration::from_secs(7 * 24 * 60 * 60);
let now = now_as_duration();
now - self.last_success > week
}
fn write_to_batch(&self, addr: SocketAddr, batch: &WriteBatch) -> Result<(), PeerStorageError> {
let key = compute_peer_key(addr);
let encoded = encode_peer_info(self)?;
batch.put_u8(&key, &encoded);
Ok(())
}
}
const PEER_PREFIX: u8 = b'p';
const PEER_LAST_ATTEMPT_TIME_PREFIX: u8 = b'a';
const PEER_LAST_SUCCESS_TIME_PREFIX: u8 = b's';
const PREFIX_LENGTH: usize = 1;
fn compute_peer_key(addr: SocketAddr) -> Vec<u8> {
let mut key = Vec::new();
key.push(PEER_PREFIX);
key.extend_from_slice(addr.to_string().as_bytes());
key
}
fn compute_last_attempt_time_key(when: Duration, addr: Option<SocketAddr>) -> Vec<u8> {
let mut key = Vec::new();
key.push(PEER_LAST_ATTEMPT_TIME_PREFIX);
key.extend_from_slice(&(when.as_secs()).to_be_bytes());
if let Some(addr) = addr {
key.extend_from_slice(addr.to_string().as_bytes());
}
key
}
fn decode_last_attempt_time_key(key: &[u8]) -> Result<(u64, SocketAddr), PeerStorageError> {
let when = u64::from_be_bytes(
key[PREFIX_LENGTH..][..U64_LENGTH]
.try_into()
.map_err(DataError::U64)?,
);
let addr_str = from_utf8(&key[PREFIX_LENGTH + U64_LENGTH..]).map_err(DataError::String)?;
let addr = SocketAddr::from_str(addr_str).map_err(ParsingError::IpAddress)?;
Ok((when, addr))
}
fn compute_last_success_time_key(when: Duration, addr: Option<SocketAddr>) -> Vec<u8> {
let mut key = Vec::new();
key.push(PEER_LAST_SUCCESS_TIME_PREFIX);
key.extend_from_slice(&(when.as_secs()).to_be_bytes());
if let Some(addr) = addr {
key.extend_from_slice(addr.to_string().as_bytes());
}
key
}
fn decode_last_success_time_key(key: &[u8]) -> Result<(Duration, SocketAddr), PeerStorageError> {
let when = u64::from_be_bytes(
key[PREFIX_LENGTH..][..U64_LENGTH]
.try_into()
.map_err(DataError::U64)?,
);
let who_str = from_utf8(&key[PREFIX_LENGTH + U64_LENGTH..]).map_err(DataError::String)?;
let who = SocketAddr::from_str(who_str).map_err(ParsingError::IpAddress)?;
Ok((Duration::from_secs(when), who))
}
fn encode_peer_info(info: &PeerInfo) -> Result<Vec<u8>, PeerStorageError> {
let encode = bincode::serde::encode_to_vec(info, bincode::config::legacy())
.map_err(|e| EncodingError::BincodeEncode(Box::new(e)))?;
Ok(encode)
}
fn decode_peer_info(encoded: Vec<u8>) -> Result<PeerInfo, PeerStorageError> {
let (decode, _) =
bincode::serde::decode_from_slice::<PeerInfo, _>(&encoded, bincode::config::legacy())
.map_err(|e| EncodingError::BincodeDecode(Box::new(e)))?;
Ok(decode)
}
#[cfg(test)]
mod test {
use faster_hex::hex_string;
use super::*;
use crate::peer::PEER_ADDR_SELF;
#[test]
fn test_compute_last_attempt_key() {
let when = Duration::from_secs(123456789);
let addr = PEER_ADDR_SELF;
let key = compute_last_attempt_time_key(when, Some(addr));
assert_eq!(key[0], PEER_LAST_ATTEMPT_TIME_PREFIX);
assert_eq!(key[1..][..U64_LENGTH], when.as_secs().to_be_bytes());
assert_eq!(key[1 + U64_LENGTH..], addr.to_string().as_bytes()[..]);
}
#[test]
fn test_decode_last_attempt_time_key() {
let when = Duration::from_secs(123456789);
let addr = PEER_ADDR_SELF;
let key = compute_last_attempt_time_key(when, Some(addr));
let result = decode_last_attempt_time_key(&key).unwrap();
assert_eq!(result.0, when.as_secs());
assert_eq!(result.1, addr);
}
#[test]
fn test_compute_last_attempt_time_key_start() {
let key = compute_last_attempt_time_key(Duration::ZERO, None);
assert_eq!(hex_string(&key), "610000000000000000");
}
#[test]
fn test_compute_last_attempt_time_key_end() {
let key = compute_last_attempt_time_key(Duration::MAX, None);
assert_eq!(hex_string(&key), "61ffffffffffffffff");
}
#[test]
fn test_compute_last_success_time_key() {
let when = Duration::from_secs(123456789);
let addr = PEER_ADDR_SELF;
let key = compute_last_success_time_key(when, Some(addr));
assert_eq!(key[0], PEER_LAST_SUCCESS_TIME_PREFIX);
assert_eq!(key[1..][..U64_LENGTH], when.as_secs().to_be_bytes());
assert_eq!(key[1 + U64_LENGTH..], addr.to_string().as_bytes()[..]);
}
#[test]
fn test_decode_last_success_time_key() {
let when = Duration::from_secs(123456789);
let key = compute_last_success_time_key(when, Some(PEER_ADDR_SELF));
let result = decode_last_success_time_key(&key).unwrap();
assert_eq!(result.0, when);
assert_eq!(result.1, PEER_ADDR_SELF);
}
#[test]
fn test_compute_peer_key() {
let addr = PEER_ADDR_SELF;
let key = compute_peer_key(addr);
assert_eq!(key[0], PEER_PREFIX);
assert_eq!(key[1..], addr.to_string().as_bytes()[..]);
}
}