Skip to main content

igc_net/
node.rs

1//! igc-net network node.
2//!
3//! `IgcIrohNode` manages the iroh endpoint, iroh-blobs store, gossip,
4//! and the local flat-file store.  It is the runtime handle passed to
5//! all publish and indexer operations.
6
7use std::path::PathBuf;
8use std::sync::Arc;
9
10use futures::StreamExt;
11use iroh::Endpoint;
12use iroh::EndpointAddr;
13use iroh::address_lookup::memory::MemoryLookup;
14use iroh::endpoint::{Connection, presets};
15use iroh::protocol::{AcceptError, ProtocolHandler, Router};
16use iroh_blobs::store::fs::FsStore;
17use iroh_gossip::api::{Event as GossipEvent, GossipSender};
18use iroh_gossip::net::{GOSSIP_ALPN, Gossip};
19use iroh_gossip::proto::TopicId;
20
21use crate::governance::{
22    GovernanceRecord, GovernanceStore, GovernanceStoreError, PilotAuthDidGossipAnnouncement,
23    PilotAuthDidState, PilotAuthDidSyncRequest, PilotAuthDidSyncResponse,
24    PilotAuthDidWorkflowError, issue_initial_pilot_auth_did_record, rotate_pilot_auth_did_record,
25};
26use crate::id::NodeIdHex;
27use crate::id::PilotId;
28use crate::keys::{
29    MultiPilotKeyStore, PilotCredentialStore, PilotIdentity, PilotKeyStore, PilotKeyStoreError,
30    PilotProfile, PilotPublicIdentityWithProfile,
31};
32use crate::store::{FlatFileStore, StoreError};
33use crate::topic::{announce_topic_id, governance_topic_id, pilot_auth_did_governance_topic_id};
34
/// ALPN identifier of the governance pull-sync protocol.  The router in
/// `IgcIrohNode::start` registers the accept-side handler under it and
/// `request_pilot_auth_did_sync_from_peer_via` dials peers with it.
const GOVERNANCE_SYNC_ALPN: &[u8] = b"igc-net/governance-sync/v1";
/// Size limit (64 KiB) passed to `read_to_end` when the accept side reads a
/// sync request.
const GOVERNANCE_SYNC_MAX_REQUEST_BYTES: usize = 64 * 1024;
/// Size limit (1 MiB) passed to `read_to_end` when the dialling side reads a
/// sync response.
const GOVERNANCE_SYNC_MAX_RESPONSE_BYTES: usize = 1024 * 1024;
38
39// ── Error type ────────────────────────────────────────────────────────────────
40
/// Errors returned by [`IgcIrohNode`] operations and the governance-sync
/// helpers in this module.
///
/// Variants carrying `#[from]` are produced implicitly by `?`; the
/// `String`-carrying variants hold the stringified underlying error.
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
    /// Error reported by the flat-file store.
    #[error("store: {0}")]
    Store(#[from] StoreError),
    /// Error reported by the pilot key / credential stores.
    #[error("pilot keys: {0}")]
    PilotKeys(#[from] PilotKeyStoreError),
    /// Error reported by the governance store.
    #[error("governance: {0}")]
    Governance(#[from] GovernanceStoreError),
    /// Error from the pilot-auth-did issuance / rotation workflow helpers.
    #[error("pilot-auth-did workflow: {0}")]
    PilotAuthDidWorkflow(#[from] PilotAuthDidWorkflowError),
    /// Error from pilot-auth-did sync handling in the governance module.
    #[error("pilot-auth-did sync: {0}")]
    PilotAuthDidSync(#[from] crate::governance::PilotAuthDidSyncError),
    /// Rotation: persisting the new governance record failed after the active
    /// key had been replaced; restoring the previous key succeeded.
    #[error("pilot-auth-did rotation governance persist failed after key replacement: {0}")]
    PilotAuthDidRotationPersistFailed(GovernanceStoreError),
    /// Rotation: persisting the new governance record failed after the active
    /// key had been replaced, and restoring the previous key failed as well.
    #[error(
        "pilot-auth-did rotation governance persist failed after key replacement ({persist}); rollback also failed ({rollback})"
    )]
    PilotAuthDidRotationPersistRollback {
        /// The governance persist failure that triggered the rollback.
        persist: GovernanceStoreError,
        /// The key-store failure hit while restoring the previous key.
        rollback: PilotKeyStoreError,
    },
    /// Plain I/O failure (e.g. creating the blob directory during start-up).
    #[error("I/O: {0}")]
    Io(#[from] std::io::Error),
    /// Binding the iroh endpoint failed.
    #[error("failed to bind iroh endpoint: {0}")]
    EndpointBind(String),
    /// Loading the iroh-blobs filesystem store failed.
    #[error("failed to load iroh blob store: {0}")]
    BlobStoreLoad(String),
    /// Subscribing to the announce gossip topic failed.
    #[error("failed to subscribe to announce topic: {0}")]
    GossipSubscribe(String),
    /// Subscribing to a governance gossip topic failed.  `start()` uses this
    /// variant for both the pilot-auth-did and the normative governance topic.
    #[error("failed to subscribe to pilot-auth-did governance topic: {0}")]
    GovernanceGossipSubscribe(String),
    /// Joining peers on a governance gossip topic failed (used by both
    /// `join_*_gossip_peers` methods).
    #[error("failed to join pilot-auth-did governance topic peers: {0}")]
    GovernanceGossipJoin(String),
    /// Broadcasting on a governance gossip topic failed (used by both
    /// broadcast helpers).
    #[error("failed to broadcast pilot-auth-did governance update: {0}")]
    GovernanceGossipBroadcast(String),
    /// Connection or stream failure while dialling a peer for governance sync.
    #[error("pilot-auth-did network sync transport failed: {0}")]
    GovernanceSyncTransport(String),
    /// JSON (de)serialisation failure.  Because of `#[from]`, any
    /// `serde_json::Error` propagated with `?` in this module lands here,
    /// including gossip payload serialisation in the broadcast helpers.
    #[error("pilot-auth-did network sync JSON: {0}")]
    GovernanceSyncJson(#[from] serde_json::Error),
    /// No IPv4 loopback socket is bound.  NOTE(review): presumably raised by
    /// `loopback_port()`, which is outside this excerpt; confirm.
    #[error("no IPv4 loopback socket is bound for this node")]
    NoLoopbackSocket,
}
83
84// ── IgcIrohNode ───────────────────────────────────────────────────────────────
85
/// Runtime handle for an igc-net node.
///
/// Holds the iroh endpoint, iroh-blobs filesystem store, gossip handler,
/// and the local flat-file store.
pub struct IgcIrohNode {
    /// The bound iroh endpoint; also cloned into the router and gossip.
    pub(crate) endpoint: Endpoint,
    /// iroh-blobs filesystem store, loaded from `data_dir/iroh-blobs`.
    pub(crate) fs_store: FsStore,
    /// Gossip handle spawned on `endpoint`.
    pub(crate) gossip: Gossip,
    /// Local flat-file store rooted at the node's data directory.
    pub(crate) store: Arc<FlatFileStore>,
    /// In-memory address lookup registered with the endpoint; filled through
    /// `add_peer_addr` so peers can be dialled by id alone.
    memory_lookup: MemoryLookup,
    /// Holds the protocol router alive.  `Router` is `#[must_use]` — dropping
    /// it aborts the accept loop for all registered ALPNs.
    _router: Router,
    /// Persistent announce-topic subscription.
    ///
    /// iroh-gossip only tracks HyParView state for a topic once a local
    /// subscriber exists.  Without this subscription, incoming JOIN messages
    /// from remote peers are silently discarded because the per-topic state
    /// map entry is absent.  Keeping the sender alive ensures the topic state
    /// exists from node start-up onwards, so remote peers can join the swarm
    /// before the first `publish()` call.
    ///
    /// Also used by `publish()` to broadcast announcements without creating a
    /// new subscription per call.
    announce_sender: GossipSender,
    /// Persistent pilot-auth-did governance topic sender.
    ///
    /// Local issuance/rotation broadcasts lightweight governance update
    /// announcements on this topic. Receivers then use the pull-sync transport
    /// to fetch any missing records from the delivering peer.
    governance_sender: GossipSender,
    /// Persistent normative governance topic sender.
    governance_record_sender: GossipSender,
    /// Hex identity derived from the endpoint's public key at start-up.
    node_id: NodeIdHex,
    /// Raw bytes of the node's Ed25519 secret key as loaded or generated in
    /// `start()`.  Sensitive material: do not log or expose.
    node_key_bytes: [u8; 32],
    /// Key store holding the registered pilots' identities.
    multi_pilot_keys: MultiPilotKeyStore,
    /// Store for the pilots' local portal credentials (access PINs).
    pilot_credentials: PilotCredentialStore,
    /// Governance record store; clones of it are handed to the sync protocol
    /// handler and the gossip background tasks.
    governance: GovernanceStore,
}
125
/// Accept-side state for the governance pull-sync protocol registered under
/// `GOVERNANCE_SYNC_ALPN`.
#[derive(Debug, Clone)]
struct GovernanceSyncProtocol {
    /// Governance store used to answer incoming sync requests.
    governance: GovernanceStore,
}
130
131impl ProtocolHandler for GovernanceSyncProtocol {
132    async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
133        let (mut send, mut recv) = connection.accept_bi().await?;
134        let request_bytes = recv
135            .read_to_end(GOVERNANCE_SYNC_MAX_REQUEST_BYTES)
136            .await
137            .map_err(AcceptError::from_err)?;
138        let request: PilotAuthDidSyncRequest =
139            serde_json::from_slice(&request_bytes).map_err(AcceptError::from_err)?;
140        let response = self
141            .governance
142            .prepare_pilot_auth_did_sync(&request)
143            .map_err(AcceptError::from_err)?;
144        let response_bytes = serde_json::to_vec(&response).map_err(AcceptError::from_err)?;
145        send.write_all(&response_bytes)
146            .await
147            .map_err(AcceptError::from_err)?;
148        send.finish().map_err(AcceptError::from_err)?;
149        connection.closed().await;
150        Ok(())
151    }
152}
153
154async fn request_pilot_auth_did_sync_from_peer_via(
155    endpoint: &Endpoint,
156    peer: iroh::PublicKey,
157    request: &PilotAuthDidSyncRequest,
158) -> Result<PilotAuthDidSyncResponse, NodeError> {
159    let conn = endpoint
160        .connect(peer, GOVERNANCE_SYNC_ALPN)
161        .await
162        .map_err(|err| NodeError::GovernanceSyncTransport(err.to_string()))?;
163    let (mut send, mut recv) = conn
164        .open_bi()
165        .await
166        .map_err(|err| NodeError::GovernanceSyncTransport(err.to_string()))?;
167    let request_bytes = serde_json::to_vec(request)?;
168    send.write_all(&request_bytes)
169        .await
170        .map_err(|err| NodeError::GovernanceSyncTransport(err.to_string()))?;
171    send.finish()
172        .map_err(|err| NodeError::GovernanceSyncTransport(err.to_string()))?;
173    let response_bytes = recv
174        .read_to_end(GOVERNANCE_SYNC_MAX_RESPONSE_BYTES)
175        .await
176        .map_err(|err| NodeError::GovernanceSyncTransport(err.to_string()))?;
177    let response: PilotAuthDidSyncResponse = serde_json::from_slice(&response_bytes)?;
178    response.validate()?;
179    conn.close(0u32.into(), b"pilot-auth-did-sync-complete");
180    Ok(response)
181}
182
183async fn broadcast_pilot_auth_did_gossip_announcement(
184    sender: &GossipSender,
185    announcement: &PilotAuthDidGossipAnnouncement,
186) -> Result<(), NodeError> {
187    let payload = serde_json::to_vec(announcement)?;
188    sender
189        .broadcast(payload.into())
190        .await
191        .map_err(|err| NodeError::GovernanceGossipBroadcast(err.to_string()))
192}
193
194async fn broadcast_governance_record<T: serde::Serialize>(
195    sender: &GossipSender,
196    record: &T,
197) -> Result<(), NodeError> {
198    let payload = serde_json::to_vec(record)?;
199    sender
200        .broadcast(payload.into())
201        .await
202        .map_err(|err| NodeError::GovernanceGossipBroadcast(err.to_string()))
203}
204
205async fn sync_pilot_auth_did_from_gossip_announcement(
206    endpoint: &Endpoint,
207    governance: &GovernanceStore,
208    governance_sender: &GossipSender,
209    delivered_from: iroh::PublicKey,
210    announcement: &PilotAuthDidGossipAnnouncement,
211) -> Result<usize, NodeError> {
212    let request = governance.build_pilot_auth_did_sync_request(&announcement.pilot_id)?;
213    if request.knows(&announcement.record_id) {
214        return Ok(0);
215    }
216
217    let response =
218        request_pilot_auth_did_sync_from_peer_via(endpoint, delivered_from, &request).await?;
219    let applied = governance.apply_pilot_auth_did_sync(&response)?;
220
221    if applied == 0 {
222        return Ok(0);
223    }
224
225    for record in response
226        .records
227        .iter()
228        .filter(|record| !request.knows(&record.record_id))
229    {
230        let announcement = PilotAuthDidGossipAnnouncement::from_record(record);
231        broadcast_pilot_auth_did_gossip_announcement(governance_sender, &announcement).await?;
232    }
233
234    Ok(applied)
235}
236
237impl IgcIrohNode {
238    /// Build and start a node rooted at `data_dir`.
239    ///
240    /// - Loads or generates the Ed25519 key from `data_dir/node.key`.
241    /// - Initializes the separate `pilot-keys/` directory for pilot identity
242    ///   custody; pilot keys remain distinct from the node transport key.
243    /// - Opens `FlatFileStore` at `data_dir`.
244    /// - Binds an iroh `Endpoint`, starts `iroh-blobs` and `iroh-gossip`.
245    /// - Subscribes to the announce gossip topic so remote peers can join
246    ///   the swarm immediately (HyParView state must exist for this to work).
247    pub async fn start(data_dir: impl Into<PathBuf>) -> Result<Self, NodeError> {
248        let data_dir = data_dir.into();
249
250        // ── Flat-file store ───────────────────────────────────────────────────
251        let store = Arc::new(FlatFileStore::open(data_dir.clone()));
252        store.init().await?;
253
254        // ── Ed25519 key ───────────────────────────────────────────────────────
255        let key_bytes = match store.load_key_bytes()? {
256            Some(b) => b,
257            None => {
258                let mut rng = rand::rng();
259                let secret_key = iroh::SecretKey::generate(&mut rng);
260                let bytes = secret_key.to_bytes();
261                store.save_key_bytes(&bytes)?;
262                bytes
263            }
264        };
265        let secret_key = iroh::SecretKey::from_bytes(&key_bytes);
266        let multi_pilot_keys = MultiPilotKeyStore::for_data_dir(&data_dir);
267        multi_pilot_keys.init()?;
268        let pilot_credentials = PilotCredentialStore::for_data_dir(&data_dir);
269        pilot_credentials.init()?;
270        let governance = GovernanceStore::for_data_dir(&data_dir);
271        governance.init()?;
272
273        // ── iroh Endpoint ─────────────────────────────────────────────────────
274        // `MemoryLookup` allows callers to pre-populate peer addresses before
275        // gossip-bootstrapping, enabling direct loopback connections without
276        // relay infrastructure (used by integration tests).
277        let memory_lookup = MemoryLookup::new();
278        let endpoint = Endpoint::builder(presets::N0)
279            .secret_key(secret_key)
280            .address_lookup(memory_lookup.clone())
281            .bind()
282            .await
283            .map_err(|e| NodeError::EndpointBind(e.to_string()))?;
284
285        let node_id = NodeIdHex::from_public_key(endpoint.id());
286
287        // ── iroh-blobs filesystem store ───────────────────────────────────────
288        let blob_dir = data_dir.join("iroh-blobs");
289        tokio::fs::create_dir_all(&blob_dir).await?;
290        let fs_store = FsStore::load(&blob_dir)
291            .await
292            .map_err(|e| NodeError::BlobStoreLoad(e.to_string()))?;
293
294        // ── iroh-gossip ───────────────────────────────────────────────────────
295        let gossip = Gossip::builder().spawn(endpoint.clone());
296
297        // ── Router: register protocol handlers ────────────────────────────────
298        // `Router` is `#[must_use]` — the accept loop runs as long as the
299        // handle is alive.  It is stored in `IgcIrohNode` so it lives for the
300        // full lifetime of the node.
301        let governance_sync = GovernanceSyncProtocol {
302            governance: governance.clone(),
303        };
304        let router = Router::builder(endpoint.clone())
305            .accept(GOSSIP_ALPN, gossip.clone())
306            .accept(
307                iroh_blobs::ALPN,
308                iroh_blobs::BlobsProtocol::new(&fs_store, None),
309            )
310            .accept(GOVERNANCE_SYNC_ALPN, governance_sync)
311            .spawn();
312
313        // ── Persistent announce-topic subscription ────────────────────────────
314        // Subscribe to the announce topic with no bootstrap peers so the
315        // per-topic HyParView state is created immediately.  Remote indexers
316        // that bootstrap from this node via its PublicKey will then have their
317        // JOIN messages accepted and be added to the active view.  Without this
318        // subscription, incoming JOINs for an unknown topic are silently dropped
319        // by the gossip actor, so the broadcaster would have no known neighbors
320        // when it later calls `publish()`.
321        let announce_topic = TopicId::from_bytes(announce_topic_id());
322        let (announce_sender, mut announce_receiver) = gossip
323            .subscribe(announce_topic, vec![])
324            .await
325            .map_err(|e| NodeError::GossipSubscribe(e.to_string()))?
326            .split();
327
328        // Drain the receiver in the background to prevent backpressure from
329        // filling the event buffer and closing the subscription.
330        tokio::spawn(async move { while announce_receiver.next().await.is_some() {} });
331
332        // ── Persistent pilot-auth-did governance subscription ─────────────────
333        let governance_topic = TopicId::from_bytes(pilot_auth_did_governance_topic_id());
334        let (governance_sender, mut governance_receiver) = gossip
335            .subscribe(governance_topic, vec![])
336            .await
337            .map_err(|e| NodeError::GovernanceGossipSubscribe(e.to_string()))?
338            .split();
339        let governance_sender_task = governance_sender.clone();
340        let governance_store = governance.clone();
341        let governance_endpoint = endpoint.clone();
342        let local_endpoint_id = endpoint.id();
343        tokio::spawn(async move {
344            while let Some(event) = governance_receiver.next().await {
345                match event {
346                    Ok(GossipEvent::Received(message)) => {
347                        if message.delivered_from == local_endpoint_id {
348                            continue;
349                        }
350                        let announcement: PilotAuthDidGossipAnnouncement =
351                            match serde_json::from_slice(&message.content) {
352                                Ok(announcement) => announcement,
353                                Err(err) => {
354                                    tracing::warn!(
355                                        peer = %message.delivered_from,
356                                        error = %err,
357                                        "ignoring invalid pilot-auth-did governance gossip payload"
358                                    );
359                                    continue;
360                                }
361                            };
362                        match sync_pilot_auth_did_from_gossip_announcement(
363                            &governance_endpoint,
364                            &governance_store,
365                            &governance_sender_task,
366                            message.delivered_from,
367                            &announcement,
368                        )
369                        .await
370                        {
371                            Ok(0) => {}
372                            Ok(applied) => tracing::info!(
373                                peer = %message.delivered_from,
374                                pilot_id = %announcement.pilot_id,
375                                record_id = %announcement.record_id,
376                                applied,
377                                "applied pilot-auth-did governance update from gossip"
378                            ),
379                            Err(err) => tracing::warn!(
380                                peer = %message.delivered_from,
381                                pilot_id = %announcement.pilot_id,
382                                record_id = %announcement.record_id,
383                                error = %err,
384                                "failed to apply pilot-auth-did governance update from gossip"
385                            ),
386                        }
387                    }
388                    Ok(GossipEvent::NeighborUp(peer)) => {
389                        tracing::debug!(%peer, "pilot-auth-did governance neighbor up");
390                    }
391                    Ok(GossipEvent::NeighborDown(peer)) => {
392                        tracing::debug!(%peer, "pilot-auth-did governance neighbor down");
393                    }
394                    Ok(GossipEvent::Lagged) => {
395                        tracing::warn!(
396                            "pilot-auth-did governance gossip receiver lagged; some updates may require catch-up"
397                        );
398                    }
399                    Err(err) => {
400                        tracing::warn!(
401                            error = %err,
402                            "pilot-auth-did governance gossip subscription closed"
403                        );
404                        break;
405                    }
406                }
407            }
408        });
409
410        // ── Persistent normative governance subscription ─────────────────────
411        let governance_record_topic = TopicId::from_bytes(governance_topic_id());
412        let (governance_record_sender, mut governance_record_receiver) = gossip
413            .subscribe(governance_record_topic, vec![])
414            .await
415            .map_err(|e| NodeError::GovernanceGossipSubscribe(e.to_string()))?
416            .split();
417        let governance_record_store = governance.clone();
418        let governance_record_local_endpoint_id = endpoint.id();
419        tokio::spawn(async move {
420            while let Some(event) = governance_record_receiver.next().await {
421                match event {
422                    Ok(GossipEvent::Received(message)) => {
423                        if message.delivered_from == governance_record_local_endpoint_id {
424                            continue;
425                        }
426                        let record = match GovernanceRecord::from_slice(&message.content) {
427                            Ok(record) => record,
428                            Err(err) => {
429                                tracing::warn!(
430                                    peer = %message.delivered_from,
431                                    error = %err,
432                                    "ignoring invalid governance gossip payload"
433                                );
434                                continue;
435                            }
436                        };
437                        match governance_record_store.apply_governance_record(&record) {
438                            Ok(true) => tracing::info!(
439                                peer = %message.delivered_from,
440                                "applied governance record from gossip"
441                            ),
442                            Ok(false) => {}
443                            Err(err) => tracing::warn!(
444                                peer = %message.delivered_from,
445                                error = %err,
446                                "ignoring invalid governance gossip payload"
447                            ),
448                        }
449                    }
450                    Ok(GossipEvent::NeighborUp(peer)) => {
451                        tracing::debug!(%peer, "governance neighbor up");
452                    }
453                    Ok(GossipEvent::NeighborDown(peer)) => {
454                        tracing::debug!(%peer, "governance neighbor down");
455                    }
456                    Ok(GossipEvent::Lagged) => {
457                        tracing::warn!(
458                            "governance gossip receiver lagged; some updates may require catch-up"
459                        );
460                    }
461                    Err(err) => {
462                        tracing::warn!(error = %err, "governance gossip subscription closed");
463                        break;
464                    }
465                }
466            }
467        });
468
469        tracing::info!(%node_id, data_dir = %data_dir.display(), "igc-net node started");
470
471        Ok(Self {
472            endpoint,
473            fs_store,
474            gossip,
475            store,
476            memory_lookup,
477            _router: router,
478            announce_sender,
479            governance_sender,
480            governance_record_sender,
481            node_id,
482            node_key_bytes: key_bytes,
483            multi_pilot_keys,
484            pilot_credentials,
485            governance,
486        })
487    }
488
489    /// Gracefully shut down the node (closes endpoint and router).
490    pub async fn close(&self) {
491        self.endpoint.close().await;
492    }
493
    /// The node's stable network identity (hex-encoded Ed25519 public key).
    ///
    /// Returns a borrow of the value computed once in `start()` from the
    /// bound endpoint's id.
    pub fn node_id(&self) -> &NodeIdHex {
        &self.node_id
    }
498
    /// The node's iroh `PublicKey` (EndpointId) — use this for gossip bootstrap
    /// when dialling the node directly via iroh.
    ///
    /// Read from the endpoint on every call; the same id is the source of
    /// the hex value returned by `node_id()`.
    pub fn iroh_node_id(&self) -> iroh::PublicKey {
        self.endpoint.id()
    }
504
    /// The node's current `EndpointAddr` as reported by the iroh endpoint.
    ///
    /// Right after `start()` this typically contains wildcard bind addresses
    /// (`0.0.0.0:PORT`) which are not dialable by remote peers.  For loopback
    /// integration tests use [`loopback_endpoint_addr`] instead.
    ///
    /// The value is queried from the endpoint on each call, not cached.
    pub fn endpoint_addr(&self) -> EndpointAddr {
        self.endpoint.addr()
    }
513
514    /// Build an `EndpointAddr` with a proper `127.0.0.1:PORT` direct address.
515    ///
516    /// Uses the actual bound UDP port from the endpoint and replaces the
517    /// wildcard `0.0.0.0` bind address with the loopback interface.  Pass
518    /// the result to a peer's [`add_peer_addr`] in integration tests so that
519    /// gossip-bootstrap can dial over loopback without relay infrastructure.
520    pub fn loopback_endpoint_addr(&self) -> Result<EndpointAddr, NodeError> {
521        let id = self.endpoint.id();
522        let port = self.loopback_port()?;
523        Ok(EndpointAddr::new(id).with_ip_addr(std::net::SocketAddr::from(([127, 0, 0, 1], port))))
524    }
525
526    /// Return the node's loopback endpoint as a `"node_id@127.0.0.1:port"` string.
527    ///
528    /// Use this to populate a remote peer's address book (via [`add_peer_addr`])
529    /// for direct loopback connections in tests and private networks that don't
530    /// rely on relay-based discovery.
531    pub fn loopback_addr_str(&self) -> Result<String, NodeError> {
532        let port = self.loopback_port()?;
533        Ok(format!("{}@127.0.0.1:{}", self.node_id(), port))
534    }
535
    /// Pre-populate this node's address book with a peer's `EndpointAddr`.
    ///
    /// After calling this, the node can dial the peer by its `EndpointId`
    /// alone (e.g., as a gossip bootstrap peer) using the known direct address
    /// instead of relay-based discovery.
    ///
    /// The address is stored in the `MemoryLookup` that `start()` registered
    /// as the endpoint's address lookup.
    pub fn add_peer_addr(&self, addr: EndpointAddr) {
        self.memory_lookup.add_endpoint_info(addr);
    }
544
545    /// Join known peers on the pilot-auth-did governance gossip topic.
546    ///
547    /// Call this after populating peer addresses for direct/private networks.
548    /// Receiving a governance gossip announcement triggers a pull-sync against
549    /// the delivering peer over the dedicated governance-sync transport.
550    pub async fn join_pilot_auth_did_gossip_peers(
551        &self,
552        peers: Vec<iroh::PublicKey>,
553    ) -> Result<(), NodeError> {
554        let peers = peers
555            .into_iter()
556            .filter(|peer| *peer != self.iroh_node_id())
557            .collect::<Vec<_>>();
558        if peers.is_empty() {
559            return Ok(());
560        }
561        self.governance_sender
562            .join_peers(peers)
563            .await
564            .map_err(|err| NodeError::GovernanceGossipJoin(err.to_string()))
565    }
566
567    /// Join known peers on the normative governance gossip topic.
568    ///
569    /// Call this after populating peer addresses for direct/private networks.
570    /// Received records are validated, persisted idempotently, and then used by
571    /// local governance state resolution.
572    pub async fn join_governance_gossip_peers(
573        &self,
574        peers: Vec<iroh::PublicKey>,
575    ) -> Result<(), NodeError> {
576        let peers = peers
577            .into_iter()
578            .filter(|peer| *peer != self.iroh_node_id())
579            .collect::<Vec<_>>();
580        if peers.is_empty() {
581            return Ok(());
582        }
583        self.governance_record_sender
584            .join_peers(peers)
585            .await
586            .map_err(|err| NodeError::GovernanceGossipJoin(err.to_string()))
587    }
588
    /// The persistent announce-topic sender.
    ///
    /// Use this to broadcast on the announce topic without creating a new
    /// gossip subscription.
    ///
    /// Returns a borrow of the sender half obtained when `start()` subscribed
    /// to the announce topic.
    pub(crate) fn announce_sender(&self) -> &GossipSender {
        &self.announce_sender
    }
596
597    /// Broadcast a full governance record on the normative governance topic.
598    ///
599    /// Callers must persist the record locally before broadcasting so a local
600    /// restart does not lose authority that was already advertised.
601    pub async fn broadcast_governance_record<T: serde::Serialize>(
602        &self,
603        record: &T,
604    ) -> Result<(), NodeError> {
605        broadcast_governance_record(&self.governance_record_sender, record).await
606    }
607
608    async fn broadcast_pilot_auth_did_update(
609        &self,
610        record: &crate::PilotAuthDidRecord,
611    ) -> Result<(), NodeError> {
612        let announcement = PilotAuthDidGossipAnnouncement::from_record(record);
613        broadcast_governance_record(&self.governance_record_sender, record).await?;
614        broadcast_pilot_auth_did_gossip_announcement(&self.governance_sender, &announcement).await
615    }
616
617    /// Access the local flat-file store.
618    pub fn store(&self) -> &FlatFileStore {
619        self.store.as_ref()
620    }
621
622    /// Generate a new registered pilot identity in the multi-pilot key store.
623    pub fn generate_pilot_identity(
624        &self,
625        display_name: impl Into<String>,
626        country: Option<String>,
627    ) -> Result<PilotIdentity, NodeError> {
628        Ok(self
629            .multi_pilot_keys
630            .generate_pilot(display_name, country, &self.node_secret_key())?)
631    }
632
633    /// Register a pilot with a local portal credential and publish its initial auth DID.
634    pub async fn register_pilot_identity(
635        &self,
636        display_name: impl Into<String>,
637        country: Option<String>,
638        access_pin: &str,
639        created_at: impl Into<String>,
640    ) -> Result<PilotIdentity, NodeError> {
641        let identity = self.generate_pilot_identity(display_name, country)?;
642        self.pilot_credentials
643            .set_credential(&identity.pilot_id(), access_pin)?;
644        let record = issue_initial_pilot_auth_did_record(&self.governance, &identity, created_at)?;
645        self.governance.persist_pilot_auth_did_record(&record)?;
646        self.broadcast_pilot_auth_did_update(&record).await?;
647        Ok(identity)
648    }
649
650    /// Load a registered pilot identity by stable `pilot_id`.
651    pub fn load_registered_pilot_identity(
652        &self,
653        pilot_id: &PilotId,
654    ) -> Result<Option<PilotIdentity>, NodeError> {
655        Ok(self
656            .multi_pilot_keys
657            .load_pilot(pilot_id, &self.node_secret_key())?)
658    }
659
660    /// List registered pilots without exposing private key material.
661    pub fn list_registered_pilots(&self) -> Result<Vec<PilotPublicIdentityWithProfile>, NodeError> {
662        Ok(self.multi_pilot_keys.list_pilots(&self.node_secret_key())?)
663    }
664
665    /// Load registered pilot profile metadata.
666    pub fn load_registered_pilot_profile(
667        &self,
668        pilot_id: &PilotId,
669    ) -> Result<Option<PilotProfile>, NodeError> {
670        Ok(self.multi_pilot_keys.load_profile(pilot_id)?)
671    }
672
673    /// Verify a registered pilot's local portal credential.
674    pub fn verify_pilot_credential(
675        &self,
676        pilot_id: &PilotId,
677        access_pin: &str,
678    ) -> Result<bool, NodeError> {
679        Ok(self
680            .pilot_credentials
681            .verify_credential(pilot_id, access_pin)?)
682    }
683
    /// Return the per-pilot store used for pilot-auth-DID rotation.
    ///
    /// Obtained from the multi-pilot key store for `pilot_id`;
    /// `rotate_registered_pilot_auth_did` uses it to generate and swap the
    /// active pilot-auth key.
    pub fn registered_pilot_store(&self, pilot_id: &PilotId) -> PilotKeyStore {
        self.multi_pilot_keys.pilot_store(pilot_id)
    }
688
689    /// Create and persist the initial pilot-auth-did-record for a registered pilot.
690    pub async fn issue_initial_registered_pilot_auth_did_record(
691        &self,
692        pilot_id: &PilotId,
693        created_at: impl Into<String>,
694    ) -> Result<crate::PilotAuthDidRecord, NodeError> {
695        let identity = self
696            .load_registered_pilot_identity(pilot_id)?
697            .ok_or(PilotKeyStoreError::MissingPilotIdentity)?;
698        let record = issue_initial_pilot_auth_did_record(&self.governance, &identity, created_at)?;
699        self.governance.persist_pilot_auth_did_record(&record)?;
700        self.broadcast_pilot_auth_did_update(&record).await?;
701        Ok(record)
702    }
703
704    /// Rotate the active pilot_auth_did key for a registered pilot.
705    pub async fn rotate_registered_pilot_auth_did(
706        &self,
707        pilot_id: &PilotId,
708        created_at: impl Into<String>,
709    ) -> Result<crate::PilotAuthDidRecord, NodeError> {
710        let current_identity = self
711            .load_registered_pilot_identity(pilot_id)?
712            .ok_or(PilotKeyStoreError::MissingPilotIdentity)?;
713        let pilot_store = self.registered_pilot_store(pilot_id);
714        let next_active_pilot_auth_secret_key =
715            pilot_store.generate_next_active_pilot_auth_secret_key(&self.node_secret_key())?;
716        let record = rotate_pilot_auth_did_record(
717            &self.governance,
718            &current_identity,
719            &next_active_pilot_auth_secret_key,
720            created_at,
721        )?;
722        pilot_store.replace_active_pilot_auth(
723            &self.node_secret_key(),
724            &next_active_pilot_auth_secret_key,
725        )?;
726        if let Err(persist_err) = self.governance.persist_pilot_auth_did_record(&record) {
727            match pilot_store.replace_active_pilot_auth(
728                &self.node_secret_key(),
729                &current_identity.active_pilot_auth_secret_key(),
730            ) {
731                Ok(_) => return Err(NodeError::PilotAuthDidRotationPersistFailed(persist_err)),
732                Err(rollback_err) => {
733                    return Err(NodeError::PilotAuthDidRotationPersistRollback {
734                        persist: persist_err,
735                        rollback: rollback_err,
736                    });
737                }
738            }
739        }
740        self.broadcast_pilot_auth_did_update(&record).await?;
741        Ok(record)
742    }
743
744    /// Access the governance store that persists identity governance records.
745    pub fn governance_store(&self) -> &GovernanceStore {
746        &self.governance
747    }
748
749    /// Resolve the current pilot-auth-DID state using local governance history.
750    pub fn resolve_pilot_auth_did_state(
751        &self,
752        pilot_id: &PilotId,
753    ) -> Result<PilotAuthDidState, NodeError> {
754        Ok(self.governance.resolve_pilot_auth_did_state(pilot_id)?)
755    }
756
757    /// Build a pull-style catch-up response for a peer's known pilot-auth-DID history.
758    pub fn prepare_pilot_auth_did_sync(
759        &self,
760        request: &PilotAuthDidSyncRequest,
761    ) -> Result<PilotAuthDidSyncResponse, NodeError> {
762        Ok(self.governance.prepare_pilot_auth_did_sync(request)?)
763    }
764
765    /// Build a pull-style catch-up request from the node's full local pilot-auth-DID history.
766    pub fn build_pilot_auth_did_sync_request(
767        &self,
768        pilot_id: &PilotId,
769    ) -> Result<PilotAuthDidSyncRequest, NodeError> {
770        Ok(self
771            .governance
772            .build_pilot_auth_did_sync_request(pilot_id)?)
773    }
774
775    /// Apply a pulled batch of pilot-auth-DID governance records to local storage.
776    pub fn apply_pilot_auth_did_sync(
777        &self,
778        response: &PilotAuthDidSyncResponse,
779    ) -> Result<usize, NodeError> {
780        Ok(self.governance.apply_pilot_auth_did_sync(response)?)
781    }
782
783    /// Request a peer's catch-up response over the governance-sync transport.
784    pub async fn request_pilot_auth_did_sync_from_peer(
785        &self,
786        peer: iroh::PublicKey,
787        request: &PilotAuthDidSyncRequest,
788    ) -> Result<PilotAuthDidSyncResponse, NodeError> {
789        request_pilot_auth_did_sync_from_peer_via(&self.endpoint, peer, request).await
790    }
791
792    /// Pull pilot-auth-DID governance records for `pilot_id` from a peer and apply them locally.
793    pub async fn sync_pilot_auth_did_from_peer(
794        &self,
795        peer: iroh::PublicKey,
796        pilot_id: &PilotId,
797    ) -> Result<usize, NodeError> {
798        let request = self.build_pilot_auth_did_sync_request(pilot_id)?;
799        let response = self
800            .request_pilot_auth_did_sync_from_peer(peer, &request)
801            .await?;
802        self.apply_pilot_auth_did_sync(&response)
803    }
804
805    /// Resolve a local read-only filesystem path for a BLAKE3-keyed blob.
806    ///
807    /// Returns `Some(path)` when the blob is present in the flat-file store.
808    /// The caller may read the file directly in read-only mode; mutation must
809    /// go through `publish()` or the store's `put()` method.
810    pub fn resolve_path(&self, igc_hash: &str) -> Result<Option<std::path::PathBuf>, StoreError> {
811        self.store.resolve_path(igc_hash)
812    }
813
814    fn loopback_port(&self) -> Result<u16, NodeError> {
815        self.endpoint
816            .bound_sockets()
817            .into_iter()
818            .find_map(|addr| {
819                if addr.is_ipv4() {
820                    Some(addr.port())
821                } else {
822                    None
823                }
824            })
825            .ok_or(NodeError::NoLoopbackSocket)
826    }
827
828    pub(crate) fn node_secret_key(&self) -> iroh::SecretKey {
829        iroh::SecretKey::from_bytes(&self.node_key_bytes)
830    }
831}