igc_net/node.rs
1//! igc-net network node.
2//!
3//! `IgcIrohNode` manages the iroh endpoint, iroh-blobs store, gossip,
4//! and the local flat-file store. It is the runtime handle passed to
5//! all publish and indexer operations.
6
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use futures::StreamExt;
11use iroh::Endpoint;
12use iroh::EndpointAddr;
13use iroh::address_lookup::memory::MemoryLookup;
14use iroh::endpoint::presets;
15use iroh::protocol::Router;
16use iroh_blobs::store::fs::FsStore;
17use iroh_gossip::api::GossipSender;
18use iroh_gossip::net::{GOSSIP_ALPN, Gossip};
19use iroh_gossip::proto::TopicId;
20
21use crate::id::NodeIdHex;
22use crate::store::{FlatFileStore, StoreError};
23use crate::topic::announce_topic_id;
24
25// ── Error type ────────────────────────────────────────────────────────────────
26
27#[derive(Debug, thiserror::Error)]
28pub enum NodeError {
29 #[error("store: {0}")]
30 Store(#[from] StoreError),
31 #[error("I/O: {0}")]
32 Io(#[from] std::io::Error),
33 #[error("failed to bind iroh endpoint: {0}")]
34 EndpointBind(String),
35 #[error("failed to load iroh blob store: {0}")]
36 BlobStoreLoad(String),
37 #[error("failed to subscribe to announce topic: {0}")]
38 GossipSubscribe(String),
39 #[error("no IPv4 loopback socket is bound for this node")]
40 NoLoopbackSocket,
41}
42
43// ── IgcIrohNode ───────────────────────────────────────────────────────────────
44
45/// Runtime handle for an igc-net node.
46///
47/// Holds the iroh endpoint, iroh-blobs filesystem store, gossip handler,
48/// and the local flat-file store.
49pub struct IgcIrohNode {
50 pub(crate) endpoint: Endpoint,
51 pub(crate) fs_store: FsStore,
52 pub(crate) gossip: Gossip,
53 pub(crate) store: Arc<FlatFileStore>,
54 memory_lookup: MemoryLookup,
55 /// Holds the protocol router alive. `Router` is `#[must_use]` — dropping
56 /// it aborts the accept loop for all registered ALPNs.
57 _router: Router,
58 /// Persistent announce-topic subscription.
59 ///
60 /// iroh-gossip only tracks HyParView state for a topic once a local
61 /// subscriber exists. Without this subscription, incoming JOIN messages
62 /// from remote peers are silently discarded because the per-topic state
63 /// map entry is absent. Keeping the sender alive ensures the topic state
64 /// exists from node start-up onwards, so remote peers can join the swarm
65 /// before the first `publish()` call.
66 ///
67 /// Also used by `publish()` to broadcast announcements without creating a
68 /// new subscription per call.
69 announce_sender: GossipSender,
70 node_id: NodeIdHex,
71}
72
73impl IgcIrohNode {
74 /// Build and start a node rooted at `data_dir`.
75 ///
76 /// - Loads or generates the Ed25519 key from `data_dir/node.key`.
77 /// - Opens `FlatFileStore` at `data_dir`.
78 /// - Binds an iroh `Endpoint`, starts `iroh-blobs` and `iroh-gossip`.
79 /// - Subscribes to the announce gossip topic so remote peers can join
80 /// the swarm immediately (HyParView state must exist for this to work).
81 pub async fn start(data_dir: impl Into<PathBuf>) -> Result<Self, NodeError> {
82 let data_dir = data_dir.into();
83
84 // ── Flat-file store ───────────────────────────────────────────────────
85 let store = Arc::new(FlatFileStore::open(data_dir.clone()));
86 store.init().await?;
87
88 // ── Ed25519 key ───────────────────────────────────────────────────────
89 let key_bytes = match store.load_key_bytes()? {
90 Some(b) => b,
91 None => {
92 let mut rng = rand::rng();
93 let secret_key = iroh::SecretKey::generate(&mut rng);
94 let bytes = secret_key.to_bytes();
95 store.save_key_bytes(&bytes)?;
96 bytes
97 }
98 };
99 let secret_key = iroh::SecretKey::from_bytes(&key_bytes);
100
101 // ── iroh Endpoint ─────────────────────────────────────────────────────
102 // `MemoryLookup` allows callers to pre-populate peer addresses before
103 // gossip-bootstrapping, enabling direct loopback connections without
104 // relay infrastructure (used by integration tests).
105 let memory_lookup = MemoryLookup::new();
106 let endpoint = Endpoint::builder(presets::N0)
107 .secret_key(secret_key)
108 .address_lookup(memory_lookup.clone())
109 .bind()
110 .await
111 .map_err(|e| NodeError::EndpointBind(e.to_string()))?;
112
113 let node_id = NodeIdHex::from_public_key(endpoint.id());
114
115 // ── iroh-blobs filesystem store ───────────────────────────────────────
116 let blob_dir = data_dir.join("iroh-blobs");
117 tokio::fs::create_dir_all(&blob_dir).await?;
118 let fs_store = FsStore::load(&blob_dir)
119 .await
120 .map_err(|e| NodeError::BlobStoreLoad(e.to_string()))?;
121
122 // ── iroh-gossip ───────────────────────────────────────────────────────
123 let gossip = Gossip::builder().spawn(endpoint.clone());
124
125 // ── Router: register protocol handlers ────────────────────────────────
126 // `Router` is `#[must_use]` — the accept loop runs as long as the
127 // handle is alive. It is stored in `IgcIrohNode` so it lives for the
128 // full lifetime of the node.
129 let router = Router::builder(endpoint.clone())
130 .accept(GOSSIP_ALPN, gossip.clone())
131 .accept(
132 iroh_blobs::ALPN,
133 iroh_blobs::BlobsProtocol::new(&fs_store, None),
134 )
135 .spawn();
136
137 // ── Persistent announce-topic subscription ────────────────────────────
138 // Subscribe to the announce topic with no bootstrap peers so the
139 // per-topic HyParView state is created immediately. Remote indexers
140 // that bootstrap from this node via its PublicKey will then have their
141 // JOIN messages accepted and be added to the active view. Without this
142 // subscription, incoming JOINs for an unknown topic are silently dropped
143 // by the gossip actor, so the broadcaster would have no known neighbors
144 // when it later calls `publish()`.
145 let announce_topic = TopicId::from_bytes(announce_topic_id());
146 let (announce_sender, mut announce_receiver) = gossip
147 .subscribe(announce_topic, vec![])
148 .await
149 .map_err(|e| NodeError::GossipSubscribe(e.to_string()))?
150 .split();
151
152 // Drain the receiver in the background to prevent backpressure from
153 // filling the event buffer and closing the subscription.
154 tokio::spawn(async move { while announce_receiver.next().await.is_some() {} });
155
156 tracing::info!(%node_id, data_dir = %data_dir.display(), "igc-net node started");
157
158 Ok(Self {
159 endpoint,
160 fs_store,
161 gossip,
162 store,
163 memory_lookup,
164 _router: router,
165 announce_sender,
166 node_id,
167 })
168 }
169
170 /// Gracefully shut down the node (closes endpoint and router).
171 pub async fn close(&self) {
172 self.endpoint.close().await;
173 }
174
175 /// The node's stable network identity (hex-encoded Ed25519 public key).
176 pub fn node_id(&self) -> &NodeIdHex {
177 &self.node_id
178 }
179
180 /// The node's iroh `PublicKey` (EndpointId) — use this for gossip bootstrap
181 /// when dialling the node directly via iroh.
182 pub fn iroh_node_id(&self) -> iroh::PublicKey {
183 self.endpoint.id()
184 }
185
186 /// The node's current `EndpointAddr` as reported by the iroh endpoint.
187 ///
188 /// Right after `start()` this typically contains wildcard bind addresses
189 /// (`0.0.0.0:PORT`) which are not dialable by remote peers. For loopback
190 /// integration tests use [`loopback_endpoint_addr`] instead.
191 pub fn endpoint_addr(&self) -> EndpointAddr {
192 self.endpoint.addr()
193 }
194
195 /// Build an `EndpointAddr` with a proper `127.0.0.1:PORT` direct address.
196 ///
197 /// Uses the actual bound UDP port from the endpoint and replaces the
198 /// wildcard `0.0.0.0` bind address with the loopback interface. Pass
199 /// the result to a peer's [`add_peer_addr`] in integration tests so that
200 /// gossip-bootstrap can dial over loopback without relay infrastructure.
201 pub fn loopback_endpoint_addr(&self) -> Result<EndpointAddr, NodeError> {
202 let id = self.endpoint.id();
203 let port = self.loopback_port()?;
204 Ok(EndpointAddr::new(id).with_ip_addr(std::net::SocketAddr::from((
205 [127, 0, 0, 1],
206 port,
207 ))))
208 }
209
210 /// Return the node's loopback endpoint as a `"node_id@127.0.0.1:port"` string.
211 ///
212 /// Use this to populate a remote peer's address book (via [`add_peer_addr`])
213 /// for direct loopback connections in tests and private networks that don't
214 /// rely on relay-based discovery.
215 pub fn loopback_addr_str(&self) -> Result<String, NodeError> {
216 let port = self.loopback_port()?;
217 Ok(format!("{}@127.0.0.1:{}", self.node_id(), port))
218 }
219
220 /// Pre-populate this node's address book with a peer's `EndpointAddr`.
221 ///
222 /// After calling this, the node can dial the peer by its `EndpointId`
223 /// alone (e.g., as a gossip bootstrap peer) using the known direct address
224 /// instead of relay-based discovery.
225 pub fn add_peer_addr(&self, addr: EndpointAddr) {
226 self.memory_lookup.add_endpoint_info(addr);
227 }
228
229 /// The persistent announce-topic sender.
230 ///
231 /// Use this to broadcast on the announce topic without creating a new
232 /// gossip subscription.
233 pub(crate) fn announce_sender(&self) -> &GossipSender {
234 &self.announce_sender
235 }
236
237 /// Access the local flat-file store.
238 pub fn store(&self) -> &FlatFileStore {
239 self.store.as_ref()
240 }
241
242 /// Resolve a local read-only filesystem path for a BLAKE3-keyed blob.
243 ///
244 /// Returns `Some(path)` when the blob is present in the flat-file store.
245 /// The caller may read the file directly in read-only mode; mutation must
246 /// go through `publish()` or the store's `put()` method.
247 pub fn resolve_path(&self, igc_hash: &str) -> Result<Option<std::path::PathBuf>, StoreError> {
248 self.store.resolve_path(igc_hash)
249 }
250
251 fn loopback_port(&self) -> Result<u16, NodeError> {
252 self.endpoint
253 .bound_sockets()
254 .into_iter()
255 .find_map(|addr| if addr.is_ipv4() { Some(addr.port()) } else { None })
256 .ok_or(NodeError::NoLoopbackSocket)
257 }
258}