use std::path::PathBuf;
use std::sync::Arc;
use futures::StreamExt;
use iroh::Endpoint;
use iroh::EndpointAddr;
use iroh::address_lookup::memory::MemoryLookup;
use iroh::endpoint::presets;
use iroh::protocol::Router;
use iroh_blobs::store::fs::FsStore;
use iroh_gossip::api::GossipSender;
use iroh_gossip::net::{GOSSIP_ALPN, Gossip};
use iroh_gossip::proto::TopicId;
use crate::id::NodeIdHex;
use crate::store::{FlatFileStore, StoreError};
use crate::topic::announce_topic_id;
/// Errors produced while starting or querying an [`IgcIrohNode`].
///
/// The `String`-carrying variants hold the `to_string()` rendering of the
/// underlying error rather than the error value itself.
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
/// An error reported by the flat-file store (converted via `From<StoreError>`).
#[error("store: {0}")]
Store(#[from] StoreError),
/// A standard I/O error (converted via `From<std::io::Error>`), e.g. from
/// creating the blob directory during startup.
#[error("I/O: {0}")]
Io(#[from] std::io::Error),
/// Binding the iroh endpoint failed; the payload is the stringified cause.
#[error("failed to bind iroh endpoint: {0}")]
EndpointBind(String),
/// Loading the on-disk iroh blob store failed; the payload is the stringified cause.
#[error("failed to load iroh blob store: {0}")]
BlobStoreLoad(String),
/// Subscribing to the announce gossip topic failed; the payload is the stringified cause.
#[error("failed to subscribe to announce topic: {0}")]
GossipSubscribe(String),
/// None of the endpoint's bound sockets is an IPv4 socket, so no loopback
/// address can be derived for this node.
#[error("no IPv4 loopback socket is bound for this node")]
NoLoopbackSocket,
}
/// A running igc-net node: an iroh endpoint together with its blob store,
/// gossip handle, flat-file store and announce-topic sender.
///
/// Constructed by `IgcIrohNode::start`.
pub struct IgcIrohNode {
/// The bound iroh endpoint; its id is the source of `node_id`.
pub(crate) endpoint: Endpoint,
/// Blob store loaded from the `iroh-blobs` directory under the data dir.
pub(crate) fs_store: FsStore,
/// Gossip handle spawned on `endpoint`.
pub(crate) gossip: Gossip,
/// Shared flat-file store rooted at the node's data directory.
pub(crate) store: Arc<FlatFileStore>,
/// In-memory address lookup registered with the endpoint at build time;
/// `add_peer_addr` inserts peer addresses into it.
memory_lookup: MemoryLookup,
/// Router accepting the gossip and blobs ALPNs. Never read in this file;
/// presumably held only so the router is not dropped — TODO confirm.
_router: Router,
/// Sender half of the announce-topic subscription.
announce_sender: GossipSender,
/// Hex-form node id derived from the endpoint's public key.
node_id: NodeIdHex,
}
impl IgcIrohNode {
    /// Boots a node whose persistent state lives under `data_dir`.
    ///
    /// Steps, in order:
    /// 1. open and initialise the [`FlatFileStore`] rooted at `data_dir`;
    /// 2. load the stored secret-key bytes, or generate a fresh key and
    ///    persist its bytes when none are stored;
    /// 3. bind an iroh [`Endpoint`] with that key and an in-memory address lookup;
    /// 4. create `<data_dir>/iroh-blobs` if needed and load the blob store from it;
    /// 5. spawn gossip and a [`Router`] accepting the gossip and blobs ALPNs;
    /// 6. subscribe to the announce topic with an empty peer list and spawn a
    ///    background task that drains the receiving half.
    ///
    /// # Errors
    ///
    /// * [`NodeError::Store`] / [`NodeError::Io`] — propagated from the
    ///   flat-file store calls and from creating the blob directory.
    /// * [`NodeError::EndpointBind`] — the endpoint could not be bound.
    /// * [`NodeError::BlobStoreLoad`] — the blob store could not be loaded.
    /// * [`NodeError::GossipSubscribe`] — the announce subscription failed.
    pub async fn start(data_dir: impl Into<PathBuf>) -> Result<Self, NodeError> {
        let data_dir: PathBuf = data_dir.into();

        let store = Arc::new(FlatFileStore::open(data_dir.clone()));
        store.init().await?;

        // Reuse the persisted key when there is one; otherwise mint a key and
        // save it before it is used.
        let key_bytes = if let Some(persisted) = store.load_key_bytes()? {
            persisted
        } else {
            let mut rng = rand::rng();
            let generated = iroh::SecretKey::generate(&mut rng).to_bytes();
            store.save_key_bytes(&generated)?;
            generated
        };
        let secret_key = iroh::SecretKey::from_bytes(&key_bytes);

        // The lookup is cloned into the endpoint and also kept on the node so
        // `add_peer_addr` can feed it later.
        let memory_lookup = MemoryLookup::new();
        let endpoint_builder = Endpoint::builder(presets::N0)
            .secret_key(secret_key)
            .address_lookup(memory_lookup.clone());
        let endpoint = match endpoint_builder.bind().await {
            Ok(bound) => bound,
            Err(err) => return Err(NodeError::EndpointBind(err.to_string())),
        };
        let node_id = NodeIdHex::from_public_key(endpoint.id());

        let blob_dir = data_dir.join("iroh-blobs");
        tokio::fs::create_dir_all(&blob_dir).await?;
        let fs_store = match FsStore::load(&blob_dir).await {
            Ok(loaded) => loaded,
            Err(err) => return Err(NodeError::BlobStoreLoad(err.to_string())),
        };

        let gossip = Gossip::builder().spawn(endpoint.clone());
        let blobs_protocol = iroh_blobs::BlobsProtocol::new(&fs_store, None);
        let router = Router::builder(endpoint.clone())
            .accept(GOSSIP_ALPN, gossip.clone())
            .accept(iroh_blobs::ALPN, blobs_protocol)
            .spawn();

        let announce_topic = TopicId::from_bytes(announce_topic_id());
        let subscription = match gossip.subscribe(announce_topic, vec![]).await {
            Ok(topic) => topic,
            Err(err) => return Err(NodeError::GossipSubscribe(err.to_string())),
        };
        let (announce_sender, mut announce_receiver) = subscription.split();

        // Every item from the receiver is discarded; the task finishes once
        // the stream yields `None`.
        tokio::spawn(async move {
            loop {
                if announce_receiver.next().await.is_none() {
                    break;
                }
            }
        });

        tracing::info!(%node_id, data_dir = %data_dir.display(), "igc-net node started");

        Ok(Self {
            endpoint,
            fs_store,
            gossip,
            store,
            memory_lookup,
            _router: router,
            announce_sender,
            node_id,
        })
    }

    /// Closes the underlying iroh endpoint and waits for the close to finish.
    pub async fn close(&self) {
        self.endpoint.close().await;
    }

    /// Returns this node's id in hex form, as computed at startup.
    pub fn node_id(&self) -> &NodeIdHex {
        &self.node_id
    }

    /// Returns the endpoint's iroh public key.
    pub fn iroh_node_id(&self) -> iroh::PublicKey {
        self.endpoint.id()
    }

    /// Returns the address information currently reported by the endpoint.
    pub fn endpoint_addr(&self) -> EndpointAddr {
        self.endpoint.addr()
    }

    /// Builds an [`EndpointAddr`] for this node that points at `127.0.0.1`
    /// on the port of the node's first IPv4 bound socket.
    ///
    /// # Errors
    ///
    /// [`NodeError::NoLoopbackSocket`] when no IPv4 socket is bound.
    pub fn loopback_endpoint_addr(&self) -> Result<EndpointAddr, NodeError> {
        let id = self.endpoint.id();
        let port = self.loopback_port()?;
        let loopback = std::net::SocketAddr::new(
            std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
            port,
        );
        Ok(EndpointAddr::new(id).with_ip_addr(loopback))
    }

    /// Formats the loopback address as `<node_id>@127.0.0.1:<port>`.
    ///
    /// # Errors
    ///
    /// [`NodeError::NoLoopbackSocket`] when no IPv4 socket is bound.
    pub fn loopback_addr_str(&self) -> Result<String, NodeError> {
        self.loopback_port()
            .map(|port| format!("{}@127.0.0.1:{}", self.node_id(), port))
    }

    /// Registers a peer's address with the in-memory lookup used by the endpoint.
    pub fn add_peer_addr(&self, addr: EndpointAddr) {
        self.memory_lookup.add_endpoint_info(addr);
    }

    /// Borrows the sender half of the announce-topic subscription.
    pub(crate) fn announce_sender(&self) -> &GossipSender {
        &self.announce_sender
    }

    /// Borrows the node's flat-file store.
    pub fn store(&self) -> &FlatFileStore {
        &self.store
    }

    /// Delegates to the flat-file store's `resolve_path` for `igc_hash`.
    pub fn resolve_path(&self, igc_hash: &str) -> Result<Option<std::path::PathBuf>, StoreError> {
        self.store.resolve_path(igc_hash)
    }

    /// Returns the port of the first IPv4 address among the endpoint's bound sockets.
    ///
    /// Only the address family is checked; the socket is not verified to be
    /// bound to a loopback address.
    fn loopback_port(&self) -> Result<u16, NodeError> {
        for addr in self.endpoint.bound_sockets() {
            if addr.is_ipv4() {
                return Ok(addr.port());
            }
        }
        Err(NodeError::NoLoopbackSocket)
    }
}