use std::collections::BTreeSet;
use std::pin::Pin;
use std::sync::Arc;
use futures_util::{FutureExt, Stream, StreamExt};
use iroh::address_lookup::Error as AddressLookupError;
use iroh::address_lookup::Item as AddressLookupItem;
use iroh::address_lookup::{AddressLookup, EndpointData, EndpointInfo};
use p2panda_core::SigningKey;
use p2panda_store::address_book::NodeInfo as _;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{Instrument, error, info_span, trace, warn};
use crate::address_book::AddressBook;
use crate::addrs::{NodeTransportInfo, UnsignedTransportInfo};
use crate::utils::{from_verifying_key, to_verifying_key};
/// Address lookup that is backed by the local address book.
///
/// `publish` writes this node's own signed transport info into the address book and `resolve`
/// streams the address book's node info for a requested endpoint.
pub struct AddressBookDiscovery {
/// Key used to sign our own transport info and to derive our own verifying key.
signing_key: SigningKey,
/// Handle to the address book that is read and updated by `publish` and watched by `resolve`.
address_book: AddressBook,
/// Semaphore acquired by every background task spawned in `publish` before it touches the
/// address book (`new` creates it with a single permit).
semaphore: Arc<Semaphore>,
}
impl std::fmt::Debug for AddressBookDiscovery {
    /// Prints only the type name; none of the fields (signing key, address book handle,
    /// semaphore) are included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits exactly the struct name, so the name is
        // written directly.
        f.write_str("AddressBookDiscovery")
    }
}
/// Provenance label passed to every lookup item and lookup error created by
/// `AddressBookDiscovery::resolve`.
const PROVENANCE: &str = "address_book";
impl AddressBookDiscovery {
    /// Creates an address lookup which signs with `signing_key` and reads from / writes to
    /// `address_book`.
    pub fn new(signing_key: SigningKey, address_book: AddressBook) -> Self {
        // Exactly one permit: only one `publish` background task can hold it at a time.
        let semaphore = Arc::new(Semaphore::new(1));

        Self {
            signing_key,
            address_book,
            semaphore,
        }
    }
}
impl AddressLookup for AddressBookDiscovery {
    /// Stores our own, freshly signed transport info in the address book.
    ///
    /// The update runs in a detached Tokio task, so this method returns immediately and every
    /// failure is only logged. Each task first acquires a permit from the shared semaphore
    /// (created with a single permit in `new`), so reading the previous transport info and
    /// inserting the new one is not interleaved with another `publish` task.
    ///
    /// Nothing is written when the new addresses equal the previously stored ones.
    ///
    /// # Panics
    ///
    /// `tokio::task::spawn` panics when this is called outside of a Tokio runtime.
    fn publish(&self, data: &EndpointData) {
        let signing_key = self.signing_key.clone();
        let verifying_key = signing_key.verifying_key();
        let data = data.to_owned();
        let semaphore = Arc::clone(&self.semaphore);
        let address_book = self.address_book.clone();

        tokio::task::spawn(async move {
            // The permit is held until the task ends, covering the whole read-modify-write below.
            let Ok(_permit) = semaphore.acquire().await else {
                error!("failed acquiring semaphore permit");
                return;
            };

            // Look up what we currently have stored about ourselves (if anything).
            let Ok(node_info) = address_book.node_info(verifying_key).await else {
                error!("failed getting own transport info from address book");
                return;
            };
            let previous_transport_info = node_info.and_then(|info| info.transports());

            // Build the new transport info from the published addresses (or an empty one when
            // there are none), derive its timestamp from the previous info and sign it.
            let Ok(transport_info) = if data.has_addrs() {
                UnsignedTransportInfo::from_addrs([iroh::EndpointAddr {
                    id: from_verifying_key(verifying_key),
                    addrs: BTreeSet::from_iter(data.addrs().cloned()),
                }
                .into()])
            } else {
                UnsignedTransportInfo::new()
            }
            .increment_timestamp(previous_transport_info.as_ref())
            .sign(&signing_key) else {
                error!("failed signing own transport info");
                return;
            };

            // Skip the write when the addresses did not change.
            if let Some(previous) = previous_transport_info
                && transport_info.addresses() == previous.addresses()
            {
                return;
            }

            // `transport_info` is not needed afterwards, so it is moved instead of cloned.
            if let Err(err) = address_book
                .insert_transport_info(verifying_key, transport_info.into())
                .await
            {
                warn!("could not update address book with own transport info: {err:#?}");
            }
        });
    }

    /// Resolves `endpoint_id` by watching its node info in the address book.
    ///
    /// Always returns `Some`. The returned stream yields:
    ///
    /// - `Ok(item)` for every event whose node info converts into an `iroh::EndpointAddr`,
    /// - `Err(..)` for every event whose node info reports `is_stale()`,
    /// - a single `Err(..)` when the subscription to the address book could not be established.
    ///
    /// Events without node info, or with node info that does not convert into an endpoint
    /// address, are skipped.
    fn resolve(
        &self,
        endpoint_id: iroh::EndpointId,
    ) -> Option<BoxStream<Result<AddressLookupItem, AddressLookupError>>> {
        let span = info_span!("resolve", endpoint_id = %endpoint_id.fmt_short());
        trace!(parent: &span, "try to resolve endpoint id");

        let address_book = self.address_book.clone();

        // The subscription is set up lazily inside a future which is flattened into a stream
        // below, since `resolve` itself is not async.
        let stream = async move {
            let subscription = address_book
                .watch_node_info(to_verifying_key(endpoint_id), false)
                .await
                .map_err(|_| {
                    AddressLookupError::from_err_any(
                        PROVENANCE,
                        "address book actor did not respond with subscription",
                    )
                });

            match subscription {
                Ok(rx) => UnboundedReceiverStream::new(rx)
                    .filter_map(|event| async {
                        // NOTE(review): an event without a value presumably means the node info
                        // was removed from the address book - confirm against the event type.
                        let Some(node_info) = event.value else {
                            return None;
                        };

                        if node_info.is_stale() {
                            return Some(Err(AddressLookupError::from_err_any(
                                PROVENANCE,
                                "node is marked as stale",
                            )));
                        }

                        match iroh::EndpointAddr::try_from(node_info) {
                            Ok(endpoint_addr) => {
                                let info = EndpointInfo::from(endpoint_addr);
                                Some(Ok(AddressLookupItem::new(info, PROVENANCE, None)))
                            }
                            // NOTE(review): presumably no iroh transport info was available for
                            // this node; the event is skipped without reporting an error.
                            Err(_) => None,
                        }
                    })
                    .boxed(),
                Err(err) => {
                    warn!("failed resolving address due to actor error: {err:#?}");
                    futures_util::stream::once(async { Err(err) }).boxed()
                }
            }
        }
        .instrument(span);

        Some(Box::pin(stream.flatten_stream()))
    }
}
/// Pinned, boxed stream trait object yielding items of type `T`; it is `Send` and `'static`.
type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;