common/peer/
mod.rs

1use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
2use std::path::PathBuf;
3
4use iroh::discovery::pkarr::dht::DhtDiscovery;
5use iroh::{protocol::Router, Endpoint, NodeId};
6use tokio::sync::watch::Receiver as WatchReceiver;
7
8use crate::crypto::SecretKey;
9
10mod blobs_store;
11pub mod jax_protocol;
12
13pub use blobs_store::{BlobsStore, BlobsStoreError};
14pub use jax_protocol::{
15    announce_to_peer, fetch_bucket, ping_peer, BucketStateProvider, JaxProtocol, PingRequest,
16    PingResponse, SyncStatus, JAX_ALPN,
17};
18
19// Re-export iroh types for convenience
20pub use iroh::NodeAddr;
21
22#[derive(Clone, Default)]
23pub struct PeerBuilder {
24    /// the socket addr to expose the peer on
25    ///  if not set, an ephemeral port will be used
26    socket_addr: Option<SocketAddr>,
27    /// the identity of the peer, as a SecretKey
28    secret_key: Option<SecretKey>,
29    // TODO (amiller68): i would like to just inject
30    //  the blobs store, but I think I need it to build the
31    //  router, so that's not possible yet
32    /// the path to the blobs store on the peer's filesystem
33    ///  if not set a temporary directory will be used
34    blobs_store_path: Option<PathBuf>,
35    /// optional state provider for the JAX protocol
36    protocol_state: Option<std::sync::Arc<dyn BucketStateProvider>>,
37}
38
39// TODO (amiller68): proper errors
40impl PeerBuilder {
41    pub fn new() -> Self {
42        PeerBuilder {
43            socket_addr: None,
44            secret_key: None,
45            blobs_store_path: None,
46            protocol_state: None,
47        }
48    }
49
50    pub fn socket_addr(mut self, socket_addr: SocketAddr) -> Self {
51        self.socket_addr = Some(socket_addr);
52        self
53    }
54
55    pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
56        self.secret_key = Some(secret_key);
57        self
58    }
59
60    pub fn blobs_store_path(mut self, path: PathBuf) -> Self {
61        self.blobs_store_path = Some(path);
62        self
63    }
64
65    pub fn protocol_state(mut self, state: std::sync::Arc<dyn BucketStateProvider>) -> Self {
66        self.protocol_state = Some(state);
67        self
68    }
69
70    pub async fn build(self) -> Peer {
71        // set the socket port to unspecified if not set
72        let socket_addr = self
73            .socket_addr
74            .unwrap_or_else(|| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0));
75        // generate a new secret key if not set
76        let secret_key = self.secret_key.unwrap_or_else(SecretKey::generate);
77        // and set the blobs store path to a temporary directory if not set
78        let blobs_store_path = self.blobs_store_path.unwrap_or_else(|| {
79            // Create a temporary directory for the blobs store
80            let temp_dir = tempfile::tempdir().expect("failed to create temporary directory");
81            temp_dir.path().to_path_buf()
82        });
83
84        // now get to building
85
86        // Convert the SocketAddr to a SocketAddrV4
87        let addr = SocketAddrV4::new(
88            socket_addr
89                .ip()
90                .to_string()
91                .parse::<Ipv4Addr>()
92                .expect("failed to parse IP address"),
93            socket_addr.port(),
94        );
95
96        // setup our discovery mechanism for our peer
97        let mainline_discovery = DhtDiscovery::builder()
98            .secret_key(secret_key.0.clone())
99            .build()
100            .expect("failed to build mainline discovery");
101
102        // Create the endpoint with our key and discovery
103        let endpoint = Endpoint::builder()
104            .secret_key(secret_key.0.clone())
105            .discovery(mainline_discovery)
106            .bind_addr_v4(addr)
107            .bind()
108            .await
109            .expect("failed to bind ephemeral endpoint");
110
111        // Create the blob store
112        let blob_store = BlobsStore::load(&blobs_store_path)
113            .await
114            .expect("failed to load blob store");
115
116        Peer {
117            blob_store,
118            secret: secret_key,
119            endpoint,
120            blobs_store_path,
121            protocol_state: self.protocol_state,
122        }
123    }
124}
125
126// TODO (amiller68): this can prolly be simpler /
127//  idk if we need all of this, but it'll work for now
128#[derive(Clone)]
129pub struct Peer {
130    blob_store: BlobsStore,
131    secret: SecretKey,
132    endpoint: Endpoint,
133    blobs_store_path: PathBuf,
134    protocol_state: Option<std::sync::Arc<dyn BucketStateProvider>>,
135}
136
137impl Peer {
138    pub fn builder() -> PeerBuilder {
139        PeerBuilder::default()
140    }
141
142    pub fn id(&self) -> NodeId {
143        *self.secret.public()
144    }
145
146    pub fn secret(&self) -> &SecretKey {
147        &self.secret
148    }
149
150    pub fn blobs(&self) -> &BlobsStore {
151        &self.blob_store
152    }
153
154    pub fn blobs_store_path(&self) -> &PathBuf {
155        &self.blobs_store_path
156    }
157
158    pub fn endpoint(&self) -> &Endpoint {
159        &self.endpoint
160    }
161
162    pub async fn spawn(&self, mut shutdown_rx: WatchReceiver<()>) -> anyhow::Result<()> {
163        // clone the blob store inner for the router
164        let inner_blobs = self.blob_store.inner.clone();
165
166        // Build the router against the endpoint -> to our blobs service
167        //  NOTE (amiller68): if you want to extend our iroh capabilities
168        //   with more protocols and handlers, you'd do so here
169        let mut router_builder =
170            Router::builder(self.endpoint.clone()).accept(iroh_blobs::ALPN, inner_blobs);
171
172        // If we have protocol state, register the JAX protocol
173        if let Some(state) = &self.protocol_state {
174            let jax_protocol = JaxProtocol::new(state.clone());
175            router_builder = router_builder.accept(JAX_ALPN, jax_protocol);
176            tracing::info!("JAX protocol registered");
177        }
178
179        let router = router_builder.spawn();
180
181        // Wait for shutdown signal
182        let _ = shutdown_rx.changed().await;
183
184        router.shutdown().await?;
185        Ok(())
186    }
187}