use {
super::{Config, Event, PeerEntry, SignedPeerEntry},
crate::{
NetworkId,
network::{LocalNode, PeerId},
},
chrono::Utc,
std::sync::Arc,
};
/// Collection of peer entries known to the local node.
///
/// Entries live in two separate ordered maps keyed by [`PeerId`]: one for
/// signed entries (which also holds the local node's own entry, inserted by
/// `new`) and one for unsigned entries.
#[derive(Debug, Clone)]
pub struct Catalog {
/// Shared configuration; `purge_after` is read from it to compute the
/// staleness cutoff used by the accessors and by purging.
config: Arc<Config>,
/// Identifier of the local peer.
local_id: PeerId,
/// Network this catalog belongs to; entries from other networks are
/// refused on insertion.
network_id: NetworkId,
/// Signed entries, including the local peer's own entry.
signed: im::OrdMap<PeerId, SignedPeerEntry>,
/// Unsigned entries for peers that have no signed entry stored.
unsigned: im::OrdMap<PeerId, PeerEntry>,
}
impl Catalog {
pub const fn local_id(&self) -> PeerId {
self.local_id
}
pub const fn network_id(&self) -> NetworkId {
self.network_id
}
/// Iterates over all remote peers: signed entries first, then unsigned ones.
///
/// The local peer is skipped, as is every entry (signed or unsigned) whose
/// `updated_at` is older than `Utc::now() - config.purge_after`.
pub fn peers(&self) -> impl DoubleEndedIterator<Item = &PeerEntry> {
    let cutoff = Utc::now() - self.config.purge_after;
    let local_id = self.local_id;
    let signed = self.signed.values().map(|entry| entry.as_ref());

    signed
        .chain(self.unsigned.values())
        .filter(move |peer| *peer.id() != local_id && peer.updated_at() >= cutoff)
}
/// Iterates over the signed entries of remote peers.
///
/// The local peer and entries older than the purge cutoff
/// (`Utc::now() - config.purge_after`) are left out.
pub fn signed_peers(
    &self,
) -> impl DoubleEndedIterator<Item = &SignedPeerEntry> {
    let cutoff = Utc::now() - self.config.purge_after;
    let local_id = self.local_id;

    self.signed.values().filter(move |entry: &&SignedPeerEntry| {
        let is_remote = *entry.id() != local_id;
        is_remote && entry.updated_at() >= cutoff
    })
}
/// Iterates over the unsigned entries, skipping any that carries the local
/// peer id.
///
/// No staleness filtering is applied to the entries yielded here.
pub fn unsigned_peers(&self) -> impl DoubleEndedIterator<Item = &PeerEntry> {
    let local_id = self.local_id;
    let unsigned = self.unsigned.values();

    unsigned.filter(move |entry: &&PeerEntry| *entry.id() != local_id)
}
/// Iterates over the catalog without excluding the local peer.
///
/// Signed entries come first and are limited to those updated at or after
/// `Utc::now() - config.purge_after`; all unsigned entries follow unfiltered.
pub fn iter(&self) -> impl DoubleEndedIterator<Item = &PeerEntry> {
    let cutoff = Utc::now() - self.config.purge_after;

    let fresh_signed = self
        .signed
        .values()
        .map(|entry| entry.as_ref())
        .filter(move |entry: &&PeerEntry| entry.updated_at() >= cutoff);

    fresh_signed.chain(self.unsigned.values())
}
/// Iterates over the signed entries that are not stale, without excluding
/// the local peer.
///
/// An entry is kept when its `updated_at` is at or after
/// `Utc::now() - config.purge_after`.
pub fn iter_signed(
    &self,
) -> impl DoubleEndedIterator<Item = &SignedPeerEntry> {
    let cutoff = Utc::now() - self.config.purge_after;
    let signed = self.signed.values();

    signed.filter(move |entry| entry.updated_at() >= cutoff)
}
/// Looks up a peer by id.
///
/// A signed entry is returned when one exists and was updated at or after
/// `Utc::now() - config.purge_after`; otherwise the unsigned map is
/// consulted. Returns `None` when neither lookup succeeds.
pub fn get(&self, peer_id: &PeerId) -> Option<&PeerEntry> {
    let cutoff = Utc::now() - self.config.purge_after;

    match self.signed.get(peer_id) {
        Some(signed) if signed.updated_at() >= cutoff => Some(signed.as_ref()),
        // Missing or stale signed entry: fall back to the unsigned map.
        _ => self.unsigned.get(peer_id),
    }
}
/// Looks up the signed entry for `peer_id`.
///
/// Returns `None` when there is no such entry or when it was last updated
/// before `Utc::now() - config.purge_after`.
pub fn get_signed(&self, peer_id: &PeerId) -> Option<&SignedPeerEntry> {
    let cutoff = Utc::now() - self.config.purge_after;
    let entry = self.signed.get(peer_id)?;

    (entry.updated_at() >= cutoff).then_some(entry)
}
/// Returns the signed entry of the local peer.
///
/// The lookup is expected to always succeed: `new` inserts the local entry,
/// `remove` and `remove_signed` refuse to delete it, `purge_stale_entries`
/// skips it, and `clear` re-inserts it. No staleness check is applied.
pub fn local(&self) -> &SignedPeerEntry {
// The lint expectation is attached to the expression that can panic.
#[expect(clippy::missing_panics_doc)]
self
.signed
.get(&self.local_id)
.expect("local peer entry always exists")
}
pub fn signed_peers_count(&self) -> usize {
self.signed_peers().count()
}
/// Counts the entries yielded by `iter`, not counting the local peer.
///
/// The local entry is excluded by comparing ids rather than by subtracting
/// one from the total: `iter` drops signed entries older than the purge
/// cutoff, so a local entry that has not been refreshed recently is not
/// yielded at all, and a blind `- 1` would then under-count the remote
/// peers by one.
pub fn peers_count(&self) -> usize {
    self
        .iter()
        .filter(|peer| *peer.id() != self.local_id)
        .count()
}
}
impl Catalog {
/// Stores an unsigned entry, replacing any unsigned entry already held for
/// the same peer.
///
/// The entry is refused (returning `false`) when it describes the local
/// peer, belongs to a different network, or when a signed entry for the
/// same peer is already stored. Returns `true` when the entry was stored.
pub(super) fn insert_unsigned(&mut self, entry: PeerEntry) -> bool {
    let refused = entry.id() == &self.local_id
        || entry.network_id() != self.network_id
        || self.signed.contains_key(entry.id());

    if refused {
        return false;
    }

    self.unsigned.insert(*entry.id(), entry);
    true
}
pub(super) fn remove_unsigned(
&mut self,
peer_id: &PeerId,
) -> Option<PeerEntry> {
self.unsigned.remove(peer_id)
}
/// Drops every unsigned entry.
///
/// Returns `true` when at least one entry was present before clearing.
pub(super) fn clear_unsigned(&mut self) -> bool {
    let had_entries = !self.unsigned.is_empty();
    self.unsigned.clear();
    had_entries
}
}
/// Outcome of `Catalog::upsert_signed`.
///
/// Borrowed variants reference the entry as stored in the catalog.
pub enum UpsertResult<'a> {
/// No signed entry existed for the peer; the incoming entry was stored.
New(&'a SignedPeerEntry),
/// The incoming entry was newer than the stored one and replaced it.
Updated(&'a SignedPeerEntry),
/// The incoming entry was not newer than the stored one and was not
/// stored.
Rejected {
/// The incoming entry that was turned down.
rejected: Box<SignedPeerEntry>,
/// The entry that remains in the catalog.
existing: &'a SignedPeerEntry,
},
/// The incoming entry's `updated_at` was older than the purge cutoff.
Outdated(Box<SignedPeerEntry>),
/// The incoming entry belongs to another network; carries that network's
/// id.
DifferentNetwork(NetworkId),
}
impl UpsertResult<'_> {
pub const fn is_new(&self) -> bool {
matches!(self, UpsertResult::New(_))
}
pub const fn is_updated(&self) -> bool {
matches!(self, UpsertResult::Updated(_))
}
pub const fn is_ok(&self) -> bool {
self.is_new() || self.is_updated()
}
}
impl Catalog {
/// Builds a catalog that contains only the local node's signed entry.
///
/// The local entry is created from the node's network id and address,
/// tagged with `config.tags`, and signed with the node's secret key.
/// Panics if signing fails.
pub(super) fn new(local: &LocalNode, config: &Arc<Config>) -> Self {
    let local_id = local.id();
    let network_id = *local.network_id();

    let local_entry = PeerEntry::new(network_id, local.addr())
        .add_tags(config.tags.clone())
        .sign(local.secret_key())
        .expect("signing local peer entry failed.");

    let mut signed = im::OrdMap::new();
    signed.insert(local_id, local_entry);

    Self {
        config: Arc::clone(config),
        local_id,
        network_id,
        signed,
        unsigned: im::OrdMap::new(),
    }
}
/// Inserts a signed entry or replaces the stored one when the incoming
/// entry is newer.
///
/// Checks run in this order:
/// 1. An entry from another network yields `DifferentNetwork`.
/// 2. An entry updated before `Utc::now() - config.purge_after` yields
///    `Outdated`.
/// 3. Any unsigned entry for the same peer is dropped.
/// 4. With no stored signed entry the incoming one is stored (`New`); with
///    a stored one it replaces it only if `is_newer_than` says so
///    (`Updated`), otherwise the stored entry is kept (`Rejected`).
pub(super) fn upsert_signed(
    &mut self,
    entry: SignedPeerEntry,
) -> UpsertResult<'_> {
    if entry.network_id() != self.network_id {
        return UpsertResult::DifferentNetwork(*entry.network_id());
    }

    let cutoff = Utc::now() - self.config.purge_after;
    if entry.updated_at() < cutoff {
        return UpsertResult::Outdated(Box::new(entry));
    }

    let peer_id = *entry.id();
    self.unsigned.remove(&peer_id);

    let replaces_existing = match self.signed.get(&peer_id) {
        None => false,
        Some(current) if entry.is_newer_than(current) => true,
        Some(_) => {
            // Look the stored entry up again so the returned borrow is
            // created on this path only.
            return UpsertResult::Rejected {
                rejected: Box::new(entry),
                existing: self.signed.get(&peer_id).expect("entry exists"),
            };
        }
    };

    self.signed.insert(peer_id, entry);
    let stored = self.signed.get(&peer_id).expect("entry exists");

    if replaces_existing {
        UpsertResult::Updated(stored)
    } else {
        UpsertResult::New(stored)
    }
}
/// Removes the entry stored for `peer_id`, signed or unsigned.
///
/// The local peer is never removed (`None` is returned for it). A signed
/// entry takes precedence and is returned converted to a `PeerEntry`;
/// otherwise the unsigned entry, if any, is removed and returned.
#[expect(unused)]
pub(super) fn remove(&mut self, peer_id: &PeerId) -> Option<PeerEntry> {
    if peer_id == &self.local_id {
        return None;
    }

    match self.signed.remove(peer_id) {
        Some(signed) => Some(signed.into()),
        None => self.unsigned.remove(peer_id),
    }
}
/// Removes and returns the signed entry stored for `peer_id`.
///
/// Returns `None` for the local peer (which is never removed) and when no
/// signed entry exists for the id.
pub(super) fn remove_signed(
    &mut self,
    peer_id: &PeerId,
) -> Option<SignedPeerEntry> {
    if peer_id == &self.local_id {
        None
    } else {
        self.signed.remove(peer_id)
    }
}
/// Empties the catalog except for the local peer's signed entry.
///
/// All unsigned entries and all other signed entries are discarded. Panics
/// if the local entry is missing from the signed map.
#[expect(unused)]
pub(super) fn clear(&mut self) {
    let local_id = self.local_id;
    let local_entry = self
        .signed
        .get(&local_id)
        .cloned()
        .expect("local peer entry always exists");

    self.unsigned.clear();
    self.signed.clear();
    self.signed.insert(local_id, local_entry);
}
/// Removes stale signed entries and returns them as unsigned `PeerEntry`
/// values.
///
/// A signed entry is stale when its `updated_at` is older than
/// `Utc::now() - config.purge_after`. The local peer's entry is never
/// purged, and the unsigned map is not examined.
pub(super) fn purge_stale_entries(
    &mut self,
) -> impl Iterator<Item = PeerEntry> + 'static {
    let cutoff = Utc::now() - self.config.purge_after;
    let local_id = self.local_id;

    // First pass: collect owned copies so the map can be mutated afterwards.
    let mut purged: Vec<PeerEntry> = Vec::new();
    for (peer_id, entry) in self.signed.iter() {
        if *peer_id != local_id && entry.updated_at() < cutoff {
            let unsigned: PeerEntry = entry.into();
            purged.push(unsigned);
        }
    }

    // Second pass: drop the collected peers from the signed map.
    for stale in &purged {
        self.signed.remove(stale.id());
    }

    purged.into_iter()
}
/// Upserts a batch of signed entries and reports what changed.
///
/// Entries carrying the local peer id are ignored. Each remaining entry
/// goes through `upsert_signed`; a `New` result produces
/// `Event::PeerDiscovered`, an `Updated` result produces
/// `Event::PeerUpdated`, and every other result produces no event. Events
/// are returned in the order the entries were processed.
pub(super) fn extend_signed(
    &mut self,
    entries: impl Iterator<Item = SignedPeerEntry>,
) -> impl Iterator<Item = Event> {
    // Copied up front so the filter does not borrow `self` during upserts.
    let local_id = self.local_id;
    let mut events = Vec::new();

    for signed in entries.filter(|candidate| candidate.id() != &local_id) {
        let event = match self.upsert_signed(signed) {
            UpsertResult::New(entry) => {
                Event::PeerDiscovered(entry.clone().into_unsigned())
            }
            UpsertResult::Updated(entry) => {
                Event::PeerUpdated(entry.clone().into_unsigned())
            }
            _ => continue,
        };
        events.push(event);
    }

    events.into_iter()
}
}