use {
crate::{
Tag,
UniqueId,
discovery::{
announce::Announce,
catalog::UpsertResult,
ping::Ping,
worker::WorkerCommand,
},
network::{LocalNode, PeerId, link::Protocol},
primitives::{IntoIterOrSingle, Short},
tickets::Ticket,
},
core::fmt,
iroh::{
EndpointAddr,
endpoint::Connection,
protocol::{AcceptError, ProtocolHandler, RouterBuilder},
},
std::sync::Arc,
sync::CatalogSync,
tokio::sync::{broadcast, mpsc::UnboundedSender, watch},
worker::{Handle, WorkerLoop},
};
mod announce;
mod catalog;
mod config;
mod dht;
mod entry;
mod error;
mod event;
pub(crate) mod ping;
mod sync;
mod worker;
pub use {
announce::Event as AnnounceEvent,
catalog::Catalog,
config::{Config, ConfigBuilder, ConfigBuilderError, IntoConfig},
entry::{PeerEntry, PeerEntryVersion, SignedPeerEntry},
error::Error,
event::Event,
};
#[derive(Clone)]
pub struct Discovery(Arc<Handle>);
impl Discovery {
pub async fn dial<V>(&self, peers: impl IntoIterOrSingle<EndpointAddr, V>) {
self.0.dial(peers.iterator().into_iter()).await;
}
pub fn me(&self) -> SignedPeerEntry {
self.catalog().local().clone()
}
pub fn catalog(&self) -> Catalog {
self.0.catalog.borrow().clone()
}
pub fn catalog_watch(&self) -> watch::Receiver<Catalog> {
self.0.catalog.subscribe()
}
pub fn events(&self) -> broadcast::Receiver<Event> {
self.0.events.resubscribe()
}
pub fn sync_with(
&self,
peer_addr: impl Into<EndpointAddr>,
) -> impl Future<Output = Result<(), Error>> + Send + Sync + 'static {
self.0.sync_with(peer_addr)
}
pub fn feed(&self, entry: SignedPeerEntry) -> bool {
self.0.catalog.send_if_modified(|catalog| {
matches!(
catalog.upsert_signed(entry),
UpsertResult::New(_) | UpsertResult::Updated(_)
)
})
}
pub fn insert(&self, entry: impl Into<PeerEntry>) -> bool {
self
.0
.catalog
.send_if_modified(|catalog| catalog.insert_unsigned(entry.into()))
}
pub fn remove(&self, peer_id: PeerId) -> bool {
self
.0
.catalog
.send_if_modified(|catalog| catalog.remove_unsigned(&peer_id).is_some())
}
pub fn clear_unsigned(&self) -> bool {
self
.0
.catalog
.send_if_modified(|catalog| catalog.clear_unsigned())
}
pub fn add_tags<V>(&self, tags: impl IntoIterOrSingle<Tag, V>) {
let tags: Vec<_> = tags.iterator().into_iter().collect();
self.update_local_entry(|entry| entry.add_tags(tags));
}
pub fn remove_tags<V>(&self, tags: impl IntoIterOrSingle<Tag, V>) {
let tags: Vec<_> = tags.iterator().into_iter().collect();
self.update_local_entry(|entry| entry.remove_tags(tags));
}
pub fn add_ticket(&self, ticket: Ticket) {
self.update_local_entry(|entry| entry.add_ticket(ticket));
}
pub fn remove_ticket(&self, id: UniqueId) {
self.update_local_entry(move |entry| entry.remove_ticket(id));
}
pub fn remove_tickets_of(&self, class: UniqueId) {
self.update_local_entry(move |entry| entry.remove_tickets_of(class));
}
}
impl Discovery {
pub(crate) fn new(local: LocalNode, config: Config) -> Self {
Self(WorkerLoop::spawn(local, config))
}
}
impl Discovery {
pub(crate) fn update_local_entry(
&self,
update: impl FnOnce(PeerEntry) -> PeerEntry + Send + 'static,
) {
self.0.catalog.send_modify(|catalog| {
let local_entry = catalog.local().clone();
let updated_entry = update(local_entry.into());
let signed_updated_entry = updated_entry
.increment_version()
.sign(self.0.local.secret_key())
.expect("signing updated local peer entry failed.");
assert!(
catalog.upsert_signed(signed_updated_entry).is_ok(),
"local peer info versioning error. this is a bug."
);
});
}
}
impl crate::network::ProtocolProvider for Discovery {
fn install(&self, protocols: RouterBuilder) -> RouterBuilder {
let announce = Acceptor {
name: Announce::ALPN,
variant_fn: WorkerCommand::AcceptAnnounce,
tx: self.0.commands.clone(),
};
let catalog_sync = Acceptor {
name: CatalogSync::ALPN,
variant_fn: WorkerCommand::AcceptCatalogSync,
tx: self.0.commands.clone(),
};
let ping = Acceptor {
name: Ping::ALPN,
variant_fn: WorkerCommand::AcceptPing,
tx: self.0.commands.clone(),
};
protocols
.accept(announce.name, announce)
.accept(catalog_sync.name, catalog_sync)
.accept(ping.name, ping)
}
}
impl fmt::Debug for Discovery {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Discovery(network_id={})",
Short(self.0.local.network_id())
)
}
}
struct Acceptor {
name: &'static [u8],
variant_fn: fn(Connection) -> WorkerCommand,
tx: UnboundedSender<WorkerCommand>,
}
impl core::fmt::Debug for Acceptor {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "Discovery({})", unsafe {
str::from_utf8_unchecked(self.name)
})
}
}
impl ProtocolHandler for Acceptor {
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
let command = (self.variant_fn)(connection);
let _ = self.tx.send(command);
Ok(())
}
}