1use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
2use std::path::PathBuf;
3
4use iroh::discovery::pkarr::dht::DhtDiscovery;
5use iroh::{protocol::Router, Endpoint, NodeId};
6use tokio::sync::watch::Receiver as WatchReceiver;
7
8use crate::crypto::SecretKey;
9
10mod blobs_store;
11pub mod jax_protocol;
12
13pub use blobs_store::{BlobsStore, BlobsStoreError};
14pub use jax_protocol::{
15 announce_to_peer, fetch_bucket, ping_peer, BucketStateProvider, JaxProtocol, PingRequest,
16 PingResponse, SyncStatus, JAX_ALPN,
17};
18
19pub use iroh::NodeAddr;
21
22#[derive(Clone, Default)]
23pub struct PeerBuilder {
24 socket_addr: Option<SocketAddr>,
27 secret_key: Option<SecretKey>,
29 blobs_store_path: Option<PathBuf>,
35 protocol_state: Option<std::sync::Arc<dyn BucketStateProvider>>,
37}
38
39impl PeerBuilder {
41 pub fn new() -> Self {
42 PeerBuilder {
43 socket_addr: None,
44 secret_key: None,
45 blobs_store_path: None,
46 protocol_state: None,
47 }
48 }
49
50 pub fn socket_addr(mut self, socket_addr: SocketAddr) -> Self {
51 self.socket_addr = Some(socket_addr);
52 self
53 }
54
55 pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
56 self.secret_key = Some(secret_key);
57 self
58 }
59
60 pub fn blobs_store_path(mut self, path: PathBuf) -> Self {
61 self.blobs_store_path = Some(path);
62 self
63 }
64
65 pub fn protocol_state(mut self, state: std::sync::Arc<dyn BucketStateProvider>) -> Self {
66 self.protocol_state = Some(state);
67 self
68 }
69
70 pub async fn build(self) -> Peer {
71 let socket_addr = self
73 .socket_addr
74 .unwrap_or_else(|| SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0));
75 let secret_key = self.secret_key.unwrap_or_else(SecretKey::generate);
77 let blobs_store_path = self.blobs_store_path.unwrap_or_else(|| {
79 let temp_dir = tempfile::tempdir().expect("failed to create temporary directory");
81 temp_dir.path().to_path_buf()
82 });
83
84 let addr = SocketAddrV4::new(
88 socket_addr
89 .ip()
90 .to_string()
91 .parse::<Ipv4Addr>()
92 .expect("failed to parse IP address"),
93 socket_addr.port(),
94 );
95
96 let mainline_discovery = DhtDiscovery::builder()
98 .secret_key(secret_key.0.clone())
99 .build()
100 .expect("failed to build mainline discovery");
101
102 let endpoint = Endpoint::builder()
104 .secret_key(secret_key.0.clone())
105 .discovery(mainline_discovery)
106 .bind_addr_v4(addr)
107 .bind()
108 .await
109 .expect("failed to bind ephemeral endpoint");
110
111 let blob_store = BlobsStore::load(&blobs_store_path)
113 .await
114 .expect("failed to load blob store");
115
116 Peer {
117 blob_store,
118 secret: secret_key,
119 endpoint,
120 blobs_store_path,
121 protocol_state: self.protocol_state,
122 }
123 }
124}
125
126#[derive(Clone)]
129pub struct Peer {
130 blob_store: BlobsStore,
131 secret: SecretKey,
132 endpoint: Endpoint,
133 blobs_store_path: PathBuf,
134 protocol_state: Option<std::sync::Arc<dyn BucketStateProvider>>,
135}
136
137impl Peer {
138 pub fn builder() -> PeerBuilder {
139 PeerBuilder::default()
140 }
141
142 pub fn id(&self) -> NodeId {
143 *self.secret.public()
144 }
145
146 pub fn secret(&self) -> &SecretKey {
147 &self.secret
148 }
149
150 pub fn blobs(&self) -> &BlobsStore {
151 &self.blob_store
152 }
153
154 pub fn blobs_store_path(&self) -> &PathBuf {
155 &self.blobs_store_path
156 }
157
158 pub fn endpoint(&self) -> &Endpoint {
159 &self.endpoint
160 }
161
162 pub async fn spawn(&self, mut shutdown_rx: WatchReceiver<()>) -> anyhow::Result<()> {
163 let inner_blobs = self.blob_store.inner.clone();
165
166 let mut router_builder =
170 Router::builder(self.endpoint.clone()).accept(iroh_blobs::ALPN, inner_blobs);
171
172 if let Some(state) = &self.protocol_state {
174 let jax_protocol = JaxProtocol::new(state.clone());
175 router_builder = router_builder.accept(JAX_ALPN, jax_protocol);
176 tracing::info!("JAX protocol registered");
177 }
178
179 let router = router_builder.spawn();
180
181 let _ = shutdown_rx.changed().await;
183
184 router.shutdown().await?;
185 Ok(())
186 }
187}