Skip to main content

igc_net/
node.rs

1//! igc-net network node.
2//!
3//! `IgcIrohNode` manages the iroh endpoint, iroh-blobs store, gossip,
4//! and the local flat-file store.  It is the runtime handle passed to
5//! all publish and indexer operations.
6
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use futures::StreamExt;
11use iroh::Endpoint;
12use iroh::EndpointAddr;
13use iroh::address_lookup::memory::MemoryLookup;
14use iroh::endpoint::presets;
15use iroh::protocol::Router;
16use iroh_blobs::store::fs::FsStore;
17use iroh_gossip::api::GossipSender;
18use iroh_gossip::net::{GOSSIP_ALPN, Gossip};
19use iroh_gossip::proto::TopicId;
20
21use crate::id::NodeIdHex;
22use crate::store::{FlatFileStore, StoreError};
23use crate::topic::announce_topic_id;
24
25// ── Error type ────────────────────────────────────────────────────────────────
26
27#[derive(Debug, thiserror::Error)]
28pub enum NodeError {
29    #[error("store: {0}")]
30    Store(#[from] StoreError),
31    #[error("I/O: {0}")]
32    Io(#[from] std::io::Error),
33    #[error("failed to bind iroh endpoint: {0}")]
34    EndpointBind(String),
35    #[error("failed to load iroh blob store: {0}")]
36    BlobStoreLoad(String),
37    #[error("failed to subscribe to announce topic: {0}")]
38    GossipSubscribe(String),
39    #[error("no IPv4 loopback socket is bound for this node")]
40    NoLoopbackSocket,
41}
42
43// ── IgcIrohNode ───────────────────────────────────────────────────────────────
44
45/// Runtime handle for an igc-net node.
46///
47/// Holds the iroh endpoint, iroh-blobs filesystem store, gossip handler,
48/// and the local flat-file store.
49pub struct IgcIrohNode {
50    pub(crate) endpoint: Endpoint,
51    pub(crate) fs_store: FsStore,
52    pub(crate) gossip: Gossip,
53    pub(crate) store: Arc<FlatFileStore>,
54    memory_lookup: MemoryLookup,
55    /// Holds the protocol router alive.  `Router` is `#[must_use]` — dropping
56    /// it aborts the accept loop for all registered ALPNs.
57    _router: Router,
58    /// Persistent announce-topic subscription.
59    ///
60    /// iroh-gossip only tracks HyParView state for a topic once a local
61    /// subscriber exists.  Without this subscription, incoming JOIN messages
62    /// from remote peers are silently discarded because the per-topic state
63    /// map entry is absent.  Keeping the sender alive ensures the topic state
64    /// exists from node start-up onwards, so remote peers can join the swarm
65    /// before the first `publish()` call.
66    ///
67    /// Also used by `publish()` to broadcast announcements without creating a
68    /// new subscription per call.
69    announce_sender: GossipSender,
70    node_id: NodeIdHex,
71}
72
73impl IgcIrohNode {
74    /// Build and start a node rooted at `data_dir`.
75    ///
76    /// - Loads or generates the Ed25519 key from `data_dir/node.key`.
77    /// - Opens `FlatFileStore` at `data_dir`.
78    /// - Binds an iroh `Endpoint`, starts `iroh-blobs` and `iroh-gossip`.
79    /// - Subscribes to the announce gossip topic so remote peers can join
80    ///   the swarm immediately (HyParView state must exist for this to work).
81    pub async fn start(data_dir: impl Into<PathBuf>) -> Result<Self, NodeError> {
82        let data_dir = data_dir.into();
83
84        // ── Flat-file store ───────────────────────────────────────────────────
85        let store = Arc::new(FlatFileStore::open(data_dir.clone()));
86        store.init().await?;
87
88        // ── Ed25519 key ───────────────────────────────────────────────────────
89        let key_bytes = match store.load_key_bytes()? {
90            Some(b) => b,
91            None => {
92                let mut rng = rand::rng();
93                let secret_key = iroh::SecretKey::generate(&mut rng);
94                let bytes = secret_key.to_bytes();
95                store.save_key_bytes(&bytes)?;
96                bytes
97            }
98        };
99        let secret_key = iroh::SecretKey::from_bytes(&key_bytes);
100
101        // ── iroh Endpoint ─────────────────────────────────────────────────────
102        // `MemoryLookup` allows callers to pre-populate peer addresses before
103        // gossip-bootstrapping, enabling direct loopback connections without
104        // relay infrastructure (used by integration tests).
105        let memory_lookup = MemoryLookup::new();
106        let endpoint = Endpoint::builder(presets::N0)
107            .secret_key(secret_key)
108            .address_lookup(memory_lookup.clone())
109            .bind()
110            .await
111            .map_err(|e| NodeError::EndpointBind(e.to_string()))?;
112
113        let node_id = NodeIdHex::from_public_key(endpoint.id());
114
115        // ── iroh-blobs filesystem store ───────────────────────────────────────
116        let blob_dir = data_dir.join("iroh-blobs");
117        tokio::fs::create_dir_all(&blob_dir).await?;
118        let fs_store = FsStore::load(&blob_dir)
119            .await
120            .map_err(|e| NodeError::BlobStoreLoad(e.to_string()))?;
121
122        // ── iroh-gossip ───────────────────────────────────────────────────────
123        let gossip = Gossip::builder().spawn(endpoint.clone());
124
125        // ── Router: register protocol handlers ────────────────────────────────
126        // `Router` is `#[must_use]` — the accept loop runs as long as the
127        // handle is alive.  It is stored in `IgcIrohNode` so it lives for the
128        // full lifetime of the node.
129        let router = Router::builder(endpoint.clone())
130            .accept(GOSSIP_ALPN, gossip.clone())
131            .accept(
132                iroh_blobs::ALPN,
133                iroh_blobs::BlobsProtocol::new(&fs_store, None),
134            )
135            .spawn();
136
137        // ── Persistent announce-topic subscription ────────────────────────────
138        // Subscribe to the announce topic with no bootstrap peers so the
139        // per-topic HyParView state is created immediately.  Remote indexers
140        // that bootstrap from this node via its PublicKey will then have their
141        // JOIN messages accepted and be added to the active view.  Without this
142        // subscription, incoming JOINs for an unknown topic are silently dropped
143        // by the gossip actor, so the broadcaster would have no known neighbors
144        // when it later calls `publish()`.
145        let announce_topic = TopicId::from_bytes(announce_topic_id());
146        let (announce_sender, mut announce_receiver) = gossip
147            .subscribe(announce_topic, vec![])
148            .await
149            .map_err(|e| NodeError::GossipSubscribe(e.to_string()))?
150            .split();
151
152        // Drain the receiver in the background to prevent backpressure from
153        // filling the event buffer and closing the subscription.
154        tokio::spawn(async move { while announce_receiver.next().await.is_some() {} });
155
156        tracing::info!(%node_id, data_dir = %data_dir.display(), "igc-net node started");
157
158        Ok(Self {
159            endpoint,
160            fs_store,
161            gossip,
162            store,
163            memory_lookup,
164            _router: router,
165            announce_sender,
166            node_id,
167        })
168    }
169
170    /// Gracefully shut down the node (closes endpoint and router).
171    pub async fn close(&self) {
172        self.endpoint.close().await;
173    }
174
175    /// The node's stable network identity (hex-encoded Ed25519 public key).
176    pub fn node_id(&self) -> &NodeIdHex {
177        &self.node_id
178    }
179
180    /// The node's iroh `PublicKey` (EndpointId) — use this for gossip bootstrap
181    /// when dialling the node directly via iroh.
182    pub fn iroh_node_id(&self) -> iroh::PublicKey {
183        self.endpoint.id()
184    }
185
186    /// The node's current `EndpointAddr` as reported by the iroh endpoint.
187    ///
188    /// Right after `start()` this typically contains wildcard bind addresses
189    /// (`0.0.0.0:PORT`) which are not dialable by remote peers.  For loopback
190    /// integration tests use [`loopback_endpoint_addr`] instead.
191    pub fn endpoint_addr(&self) -> EndpointAddr {
192        self.endpoint.addr()
193    }
194
195    /// Build an `EndpointAddr` with a proper `127.0.0.1:PORT` direct address.
196    ///
197    /// Uses the actual bound UDP port from the endpoint and replaces the
198    /// wildcard `0.0.0.0` bind address with the loopback interface.  Pass
199    /// the result to a peer's [`add_peer_addr`] in integration tests so that
200    /// gossip-bootstrap can dial over loopback without relay infrastructure.
201    pub fn loopback_endpoint_addr(&self) -> Result<EndpointAddr, NodeError> {
202        let id = self.endpoint.id();
203        let port = self.loopback_port()?;
204        Ok(EndpointAddr::new(id).with_ip_addr(std::net::SocketAddr::from((
205            [127, 0, 0, 1],
206            port,
207        ))))
208    }
209
210    /// Return the node's loopback endpoint as a `"node_id@127.0.0.1:port"` string.
211    ///
212    /// Use this to populate a remote peer's address book (via [`add_peer_addr`])
213    /// for direct loopback connections in tests and private networks that don't
214    /// rely on relay-based discovery.
215    pub fn loopback_addr_str(&self) -> Result<String, NodeError> {
216        let port = self.loopback_port()?;
217        Ok(format!("{}@127.0.0.1:{}", self.node_id(), port))
218    }
219
220    /// Pre-populate this node's address book with a peer's `EndpointAddr`.
221    ///
222    /// After calling this, the node can dial the peer by its `EndpointId`
223    /// alone (e.g., as a gossip bootstrap peer) using the known direct address
224    /// instead of relay-based discovery.
225    pub fn add_peer_addr(&self, addr: EndpointAddr) {
226        self.memory_lookup.add_endpoint_info(addr);
227    }
228
229    /// The persistent announce-topic sender.
230    ///
231    /// Use this to broadcast on the announce topic without creating a new
232    /// gossip subscription.
233    pub(crate) fn announce_sender(&self) -> &GossipSender {
234        &self.announce_sender
235    }
236
237    /// Access the local flat-file store.
238    pub fn store(&self) -> &FlatFileStore {
239        self.store.as_ref()
240    }
241
242    /// Resolve a local read-only filesystem path for a BLAKE3-keyed blob.
243    ///
244    /// Returns `Some(path)` when the blob is present in the flat-file store.
245    /// The caller may read the file directly in read-only mode; mutation must
246    /// go through `publish()` or the store's `put()` method.
247    pub fn resolve_path(&self, igc_hash: &str) -> Result<Option<std::path::PathBuf>, StoreError> {
248        self.store.resolve_path(igc_hash)
249    }
250
251    fn loopback_port(&self) -> Result<u16, NodeError> {
252        self.endpoint
253            .bound_sockets()
254            .into_iter()
255            .find_map(|addr| if addr.is_ipv4() { Some(addr.port()) } else { None })
256            .ok_or(NodeError::NoLoopbackSocket)
257    }
258}