1use crate::crypto::{PublicKey, SecretKey};
2
3use std::net::SocketAddr;
4use std::sync::Arc;
5
6use anyhow::{anyhow, Result};
7use iroh::{Endpoint, NodeId};
8use uuid::Uuid;
9
10pub use super::blobs_store::BlobsStore;
11
12use crate::bucket_log::BucketLogProvider;
13use crate::linked_data::Link;
14use crate::mount::{Mount, MountError};
15
16use super::sync::{PingPeerJob, SyncJob, SyncProvider};
17
18#[derive(Debug)]
22pub struct Peer<L: BucketLogProvider> {
23 log_provider: L,
24 socket_address: SocketAddr,
25 blobs_store: BlobsStore,
26 secret_key: SecretKey,
27 endpoint: Endpoint,
28 sync_provider: Arc<dyn SyncProvider<L>>,
29}
30
31impl<L: BucketLogProvider> Clone for Peer<L>
32where
33 L: Clone,
34{
35 fn clone(&self) -> Self {
36 Self {
37 log_provider: self.log_provider.clone(),
38 socket_address: self.socket_address,
39 blobs_store: self.blobs_store.clone(),
40 secret_key: self.secret_key.clone(),
41 endpoint: self.endpoint.clone(),
42 sync_provider: self.sync_provider.clone(),
43 }
44 }
45}
46
47impl<L: BucketLogProvider> Peer<L> {
48 pub(super) fn new(
49 log_provider: L,
50 socket_address: SocketAddr,
51 blobs_store: BlobsStore,
52 secret_key: SecretKey,
53 endpoint: Endpoint,
54 sync_provider: Arc<dyn SyncProvider<L>>,
55 ) -> Peer<L> {
56 Self {
57 log_provider,
58 socket_address,
59 blobs_store,
60 secret_key,
61 endpoint,
62 sync_provider,
63 }
64 }
65
66 pub fn logs(&self) -> &L {
67 &self.log_provider
68 }
69
70 pub fn blobs(&self) -> &BlobsStore {
71 &self.blobs_store
72 }
73
74 pub fn endpoint(&self) -> &Endpoint {
75 &self.endpoint
76 }
77
78 pub fn log_provider(&self) -> &L {
79 &self.log_provider
80 }
81
82 pub fn secret(&self) -> &SecretKey {
83 &self.secret_key
84 }
85
86 pub fn socket(&self) -> &SocketAddr {
87 &self.socket_address
88 }
89
90 pub fn id(&self) -> NodeId {
91 self.endpoint.node_id()
92 }
93
94 pub async fn dispatch(&self, job: SyncJob) -> Result<()>
102 where
103 L::Error: std::error::Error + Send + Sync + 'static,
104 {
105 self.sync_provider.execute(self, job).await
106 }
107
108 pub async fn ping(&self, bucket_id: Uuid) -> Result<()>
113 where
114 L::Error: std::error::Error + Send + Sync + 'static,
115 {
116 let (head_link, _) = self
118 .logs()
119 .head(bucket_id, None)
120 .await
121 .map_err(|e| anyhow!("Failed to get head for bucket {}: {}", bucket_id, e))?;
122
123 let manifest: crate::mount::Manifest = self
125 .blobs()
126 .get_cbor(&head_link.hash())
127 .await
128 .map_err(|e| anyhow!("Failed to load manifest: {}", e))?;
129
130 let our_key = crate::crypto::PublicKey::from(*self.secret().public()).to_hex();
132
133 for peer_key_hex in manifest.shares().keys() {
135 if peer_key_hex == &our_key {
136 continue; }
138
139 let peer_id = crate::crypto::PublicKey::from_hex(peer_key_hex)
140 .map_err(|e| anyhow!("Invalid peer key in shares: {}", e))?;
141
142 if let Err(e) = self
144 .dispatch(SyncJob::PingPeer(PingPeerJob { bucket_id, peer_id }))
145 .await
146 {
147 tracing::warn!(
148 "Failed to dispatch ping to peer {} for bucket {}: {}",
149 peer_key_hex,
150 bucket_id,
151 e
152 );
153 }
154 }
155
156 Ok(())
157 }
158
159 pub async fn ping_and_collect(
169 &self,
170 bucket_id: Uuid,
171 timeout: Option<std::time::Duration>,
172 ) -> Result<std::collections::HashMap<String, crate::peer::protocol::PingReplyStatus>>
173 where
174 L::Error: std::error::Error + Send + Sync + 'static,
175 {
176 use crate::peer::protocol::bidirectional::BidirectionalHandler;
177 use crate::peer::protocol::{Ping, PingMessage};
178
179 let (head_link, head_height) = self
181 .logs()
182 .head(bucket_id, None)
183 .await
184 .map_err(|e| anyhow!("Failed to get head for bucket {}: {}", bucket_id, e))?;
185
186 let manifest: crate::mount::Manifest = self
188 .blobs()
189 .get_cbor(&head_link.hash())
190 .await
191 .map_err(|e| anyhow!("Failed to load manifest: {}", e))?;
192
193 let our_key = crate::crypto::PublicKey::from(*self.secret().public()).to_hex();
195
196 let peer_keys: Vec<_> = manifest
198 .shares()
199 .keys()
200 .filter(|key| *key != &our_key)
201 .cloned()
202 .collect();
203
204 let mut tasks = Vec::new();
206
207 for peer_key_hex in peer_keys {
208 let peer_id = match crate::crypto::PublicKey::from_hex(&peer_key_hex) {
209 Ok(id) => id,
210 Err(e) => {
211 tracing::warn!("Invalid peer key {}: {}", peer_key_hex, e);
212 continue;
213 }
214 };
215
216 let ping = PingMessage {
217 bucket_id,
218 link: head_link.clone(),
219 height: head_height,
220 };
221
222 let peer = self.clone();
223 let key = peer_key_hex.clone();
224
225 tasks.push(tokio::spawn(async move {
226 let result = Ping::send::<L>(&peer, &peer_id, ping).await;
227 (key, result)
228 }));
229 }
230
231 let collect_future = async {
233 let mut results: std::collections::HashMap<
234 String,
235 crate::peer::protocol::PingReplyStatus,
236 > = std::collections::HashMap::new();
237 for task in tasks {
238 match task.await {
239 Ok((key, Ok(reply))) => {
240 results.insert(key, reply.status);
241 }
242 Ok((key, Err(e))) => {
243 tracing::warn!("Failed to ping peer {}: {}", key, e);
244 }
245 Err(e) => {
246 tracing::warn!("Task panicked: {}", e);
247 }
248 }
249 }
250 Ok(results)
251 };
252
253 if let Some(timeout_duration) = timeout {
255 match tokio::time::timeout(timeout_duration, collect_future).await {
256 Ok(result) => result,
257 Err(_) => Err(anyhow!(
258 "Ping collection timed out after {:?}",
259 timeout_duration
260 )),
261 }
262 } else {
263 collect_future.await
264 }
265 }
266
267 pub async fn mount(&self, bucket_id: Uuid) -> Result<Mount, MountError> {
283 let (link, _height) = self
285 .log_provider
286 .head(bucket_id, None)
287 .await
288 .map_err(|e| MountError::Default(anyhow!("Failed to get current head: {}", e)))?;
289
290 Mount::load(&link, &self.secret_key, &self.blobs_store).await
292 }
293
294 pub async fn save_mount(&self, mount: &Mount) -> Result<Link, MountError>
317 where
318 L::Error: std::error::Error + Send + Sync + 'static,
319 {
320 let our_public_key = self.secret_key.public();
322 tracing::info!("SAVE_MOUNT: Our public key: {}", our_public_key.to_hex());
323
324 let inner_mount = mount.inner().await;
325 let manifest = inner_mount.manifest();
326
327 let bucket_id = *manifest.id();
328 let name = manifest.name().to_string();
329
330 let (link, previous_link, height) = mount.save(self.blobs()).await?;
332 let inner = mount.inner().await;
333 let shares = inner.manifest().shares();
334 tracing::info!("SAVE_MOUNT: Found {} shares in manifest", shares.len());
335
336 self.log_provider
338 .append(bucket_id, name, link.clone(), Some(previous_link), height)
339 .await
340 .map_err(|e| MountError::Default(anyhow!("Failed to append to log: {}", e)))?;
341
342 let mut notified_count = 0;
344 for (peer_key_hex, _share) in shares.iter() {
345 tracing::info!("SAVE_MOUNT: Checking share for peer: {}", peer_key_hex);
346
347 if let Ok(peer_public_key) = PublicKey::from_hex(peer_key_hex) {
349 if peer_public_key == our_public_key {
351 tracing::info!("SAVE_MOUNT: Skipping ourselves: {}", peer_key_hex);
352 continue;
353 }
354
355 tracing::info!(
356 "SAVE_MOUNT: Dispatching PingPeer job for bucket {} to peer {}",
357 bucket_id,
358 peer_key_hex
359 );
360 if let Err(e) = self
363 .dispatch(SyncJob::PingPeer(PingPeerJob {
364 bucket_id,
365 peer_id: peer_public_key,
366 }))
367 .await
368 {
369 tracing::warn!("Failed to dispatch ping: {}", e);
370 }
371 notified_count += 1;
372 } else {
373 tracing::warn!(
374 "SAVE_MOUNT: Failed to parse peer public key: {}",
375 peer_key_hex
376 );
377 }
378 }
379
380 tracing::info!(
381 "dispatched {} PingPeer jobs for bucket {}",
382 notified_count,
383 bucket_id
384 );
385
386 Ok(link)
387 }
388}