use crate::behaviour::peer_store::store::Store;
use crate::prelude::swarm::derive_prelude::ConnectionEstablished;
use crate::prelude::swarm::{ConnectionClosed, FromSwarm, NewExternalAddrOfPeer};
use futures::future::{Ready, ready};
use futures::{FutureExt, StreamExt};
use futures_timer::Delay;
use indexmap::{IndexMap, IndexSet};
use libp2p::swarm::ConnectionId;
use libp2p::{Multiaddr, PeerId};
use pollable_map::futures::FutureMap;
use std::task::{Context, Poll};
#[derive(Default)]
pub struct MemoryStore {
peers: IndexMap<PeerId, IndexSet<Multiaddr>>,
connections: IndexMap<PeerId, IndexMap<Multiaddr, IndexSet<ConnectionId>>>,
persistent: IndexSet<PeerId>,
timer: FutureMap<(PeerId, Multiaddr), Delay>,
}
impl FromIterator<(PeerId, Multiaddr)> for MemoryStore {
fn from_iter<T: IntoIterator<Item = (PeerId, Multiaddr)>>(iter: T) -> Self {
let mut store = Self::default();
for (peer_id, addr) in iter {
store.persistent.insert(peer_id);
store.peers.entry(peer_id).or_default().insert(addr);
}
store
}
}
#[allow(refining_impl_trait)]
impl Store for MemoryStore {
type Event = ();
fn insert(&mut self, peer_id: PeerId, address: Multiaddr) -> Ready<std::io::Result<()>> {
self.persistent.insert(peer_id);
self.timer.remove(&(peer_id, address.clone()));
let result = match self.peers.entry(peer_id).or_default().insert(address) {
true => Ok(()),
false => Err(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
"address already exists",
)),
};
ready(result)
}
fn remove(&mut self, peer_id: &PeerId) -> Ready<std::io::Result<Vec<Multiaddr>>> {
let result = {
let list = self.peers.shift_remove(peer_id);
match list {
Some(list) => {
self.persistent.shift_remove(peer_id);
Ok(Vec::from_iter(list))
}
None => Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"peer not found",
)),
}
};
ready(result)
}
fn remove_address(
&mut self,
peer_id: &PeerId,
address: &Multiaddr,
) -> Ready<std::io::Result<()>> {
let Some(list) = self.peers.get_mut(peer_id) else {
return ready(Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"peer not found",
)));
};
if !list.shift_remove(address) {
return ready(Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"address not found",
)));
}
if list.is_empty() {
self.peers.shift_remove(peer_id);
self.persistent.shift_remove(peer_id);
}
ready(Ok(()))
}
fn address(&self, peer_id: &PeerId) -> Ready<std::io::Result<Vec<Multiaddr>>> {
let Some(addrs) = self.peers.get(peer_id).cloned() else {
return ready(Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"peer not found",
)));
};
ready(Ok(Vec::from_iter(addrs)))
}
fn list_all(&self) -> Ready<std::io::Result<Vec<(PeerId, Vec<Multiaddr>)>>> {
let list = self
.peers
.iter()
.map(|(peer_id, list)| {
let list = Vec::from_iter(list.clone());
(*peer_id, list)
})
.collect::<Vec<_>>();
ready(Ok(list))
}
fn in_memory_address(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.peers
.get(peer_id)
.cloned()
.map(Vec::from_iter)
.unwrap_or_default()
}
fn on_swarm_event(&mut self, event: &FromSwarm) {
match event {
FromSwarm::NewExternalAddrOfPeer(NewExternalAddrOfPeer { peer_id, addr }) => {
self.peers
.entry(*peer_id)
.or_default()
.insert(Multiaddr::clone(addr));
self.persistent.insert(*peer_id);
}
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
endpoint,
failed_addresses: _,
..
}) => {
let remote_addr = endpoint.get_remote_address();
self.connections
.entry(*peer_id)
.or_default()
.entry(remote_addr.clone())
.or_default()
.insert(*connection_id);
self.peers
.entry(*peer_id)
.or_default()
.insert(remote_addr.clone());
}
FromSwarm::ConnectionClosed(ConnectionClosed {
connection_id,
peer_id,
endpoint,
..
}) => {
let remote_addr = endpoint.get_remote_address();
let Some(connections) = self.connections.get_mut(peer_id) else {
return;
};
let Some(list) = connections.get_mut(remote_addr) else {
return;
};
list.shift_remove(connection_id);
if !list.is_empty() {
return;
}
connections.shift_remove(remote_addr);
if !connections.is_empty() {
return;
}
self.connections.shift_remove(peer_id);
if !self.persistent.contains(peer_id) {
self.timer.insert(
(*peer_id, remote_addr.clone()),
Delay::new(std::time::Duration::from_secs(60)),
);
}
self.connections.shrink_to_fit();
}
_ => {}
}
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Event> {
while let Poll::Ready(Some(((peer_id, addr), _))) = self.timer.poll_next_unpin(cx) {
if let Err(e) = self
.remove_address(&peer_id, &addr)
.now_or_never()
.expect("future ready")
{
tracing::error!(%peer_id, %addr, error = %e, "failed to remove address from store");
}
}
Poll::Pending
}
}