common/peer/
mod.rs

1use iroh::protocol::Router;
2use tokio::sync::watch::Receiver as WatchReceiver;
3
4mod blobs_store;
5mod peer_builder;
6mod peer_inner;
7mod protocol;
8pub mod sync;
9
10pub use blobs_store::{BlobsStore, BlobsStoreError};
11pub use protocol::{PingReplyStatus, ALPN};
12pub use sync::{SyncJob, SyncProvider, SyncTarget};
13
14pub use iroh::NodeAddr;
15
16pub use peer_builder::PeerBuilder;
17pub use peer_inner::Peer;
18
19/// Spawn the peer with protocol router
20///
21/// This starts the iroh protocol router for handling incoming connections.
22/// The peer's sync provider is responsible for managing its own background workers.
23///
24/// # Arguments
25///
26/// * `peer` - The peer instance to run
27/// * `shutdown_rx` - Watch receiver for shutdown signal
28pub async fn spawn<L>(peer: Peer<L>, mut shutdown_rx: WatchReceiver<()>) -> Result<(), PeerError>
29where
30    L: crate::bucket_log::BucketLogProvider + Clone + Send + Sync + std::fmt::Debug + 'static,
31    L::Error: std::fmt::Display + std::error::Error + Send + Sync + 'static,
32{
33    let node_id = peer.id();
34    tracing::info!(peer_id = %node_id, "Starting peer");
35
36    // Extract what we need for the router
37    let inner_blobs = peer.blobs().inner.clone();
38    let endpoint = peer.endpoint().clone();
39    let peer_for_router = peer.clone();
40
41    // Build the protocol router with iroh-blobs and our custom protocol
42    let router_builder = Router::builder(endpoint)
43        .accept(iroh_blobs::ALPN, inner_blobs)
44        .accept(ALPN, peer_for_router);
45
46    let router = router_builder.spawn();
47
48    tracing::info!(peer_id = %node_id, "Peer protocol router started");
49
50    // Wait for shutdown signal
51    let _ = shutdown_rx.changed().await;
52    tracing::info!(peer_id = %node_id, "Shutdown signal received, stopping peer");
53
54    // Shutdown the router (this closes the endpoint and stops accepting connections)
55    router
56        .shutdown()
57        .await
58        .map_err(|e| PeerError::RouterShutdown(e.into()))?;
59
60    tracing::info!(peer_id = %node_id, "Peer stopped");
61    Ok(())
62}
63
64#[derive(Debug, thiserror::Error)]
65pub enum PeerError {
66    #[error("failed to shutdown router: {0}")]
67    RouterShutdown(anyhow::Error),
68}