common/peer/
peer_inner.rs

1use crate::crypto::{PublicKey, SecretKey};
2
3use std::net::SocketAddr;
4use std::sync::Arc;
5
6use anyhow::{anyhow, Result};
7use iroh::{Endpoint, NodeId};
8use uuid::Uuid;
9
10pub use super::blobs_store::BlobsStore;
11
12use crate::bucket_log::BucketLogProvider;
13use crate::linked_data::Link;
14use crate::mount::{Mount, MountError};
15
16use super::sync::{PingPeerJob, SyncJob, SyncProvider};
17
18/// Overview of a peer's state, generic over a bucket log provider.
19///  Provides everything that a peer needs in order to
20///  load data, interact with peers, and manage buckets.
21#[derive(Debug)]
22pub struct Peer<L: BucketLogProvider> {
23    log_provider: L,
24    socket_address: SocketAddr,
25    blobs_store: BlobsStore,
26    secret_key: SecretKey,
27    endpoint: Endpoint,
28    sync_provider: Arc<dyn SyncProvider<L>>,
29}
30
31impl<L: BucketLogProvider> Clone for Peer<L>
32where
33    L: Clone,
34{
35    fn clone(&self) -> Self {
36        Self {
37            log_provider: self.log_provider.clone(),
38            socket_address: self.socket_address,
39            blobs_store: self.blobs_store.clone(),
40            secret_key: self.secret_key.clone(),
41            endpoint: self.endpoint.clone(),
42            sync_provider: self.sync_provider.clone(),
43        }
44    }
45}
46
47impl<L: BucketLogProvider> Peer<L> {
48    pub(super) fn new(
49        log_provider: L,
50        socket_address: SocketAddr,
51        blobs_store: BlobsStore,
52        secret_key: SecretKey,
53        endpoint: Endpoint,
54        sync_provider: Arc<dyn SyncProvider<L>>,
55    ) -> Peer<L> {
56        Self {
57            log_provider,
58            socket_address,
59            blobs_store,
60            secret_key,
61            endpoint,
62            sync_provider,
63        }
64    }
65
66    pub fn logs(&self) -> &L {
67        &self.log_provider
68    }
69
70    pub fn blobs(&self) -> &BlobsStore {
71        &self.blobs_store
72    }
73
74    pub fn endpoint(&self) -> &Endpoint {
75        &self.endpoint
76    }
77
78    pub fn log_provider(&self) -> &L {
79        &self.log_provider
80    }
81
82    pub fn secret(&self) -> &SecretKey {
83        &self.secret_key
84    }
85
86    pub fn socket(&self) -> &SocketAddr {
87        &self.socket_address
88    }
89
90    pub fn id(&self) -> NodeId {
91        self.endpoint.node_id()
92    }
93
94    // ========================================
95    // Sync Operations (dispatch to backend)
96    // ========================================
97
98    /// Dispatch a sync job to the sync provider
99    ///
100    /// The provider decides when/where this executes (immediately, queued, etc.)
101    pub async fn dispatch(&self, job: SyncJob) -> Result<()>
102    where
103        L::Error: std::error::Error + Send + Sync + 'static,
104    {
105        self.sync_provider.execute(self, job).await
106    }
107
108    /// Ping all peers in a bucket's shares
109    ///
110    /// Dispatches ping jobs to all peers listed in the bucket's current
111    /// manifest shares (except ourselves).
112    pub async fn ping(&self, bucket_id: Uuid) -> Result<()>
113    where
114        L::Error: std::error::Error + Send + Sync + 'static,
115    {
116        // Get current head link
117        let (head_link, _) = self
118            .logs()
119            .head(bucket_id, None)
120            .await
121            .map_err(|e| anyhow!("Failed to get head for bucket {}: {}", bucket_id, e))?;
122
123        // Load manifest from blobs store
124        let manifest: crate::mount::Manifest = self
125            .blobs()
126            .get_cbor(&head_link.hash())
127            .await
128            .map_err(|e| anyhow!("Failed to load manifest: {}", e))?;
129
130        // Extract our own key to skip ourselves
131        let our_key = crate::crypto::PublicKey::from(*self.secret().public()).to_hex();
132
133        // For each peer in shares, dispatch a ping job
134        for peer_key_hex in manifest.shares().keys() {
135            if peer_key_hex == &our_key {
136                continue; // Skip ourselves
137            }
138
139            let peer_id = crate::crypto::PublicKey::from_hex(peer_key_hex)
140                .map_err(|e| anyhow!("Invalid peer key in shares: {}", e))?;
141
142            // Dispatch ping job
143            if let Err(e) = self
144                .dispatch(SyncJob::PingPeer(PingPeerJob { bucket_id, peer_id }))
145                .await
146            {
147                tracing::warn!(
148                    "Failed to dispatch ping to peer {} for bucket {}: {}",
149                    peer_key_hex,
150                    bucket_id,
151                    e
152                );
153            }
154        }
155
156        Ok(())
157    }
158
159    /// Ping all peers for a bucket and collect their responses
160    ///
161    /// Returns a map of peer public key hex to their ping reply status.
162    /// This waits for all pings to complete before returning.
163    ///
164    /// # Arguments
165    ///
166    /// * `bucket_id` - The bucket to ping peers for
167    /// * `timeout` - Optional timeout duration for the entire operation
168    pub async fn ping_and_collect(
169        &self,
170        bucket_id: Uuid,
171        timeout: Option<std::time::Duration>,
172    ) -> Result<std::collections::HashMap<String, crate::peer::protocol::PingReplyStatus>>
173    where
174        L::Error: std::error::Error + Send + Sync + 'static,
175    {
176        use crate::peer::protocol::bidirectional::BidirectionalHandler;
177        use crate::peer::protocol::{Ping, PingMessage};
178
179        // Get current head link
180        let (head_link, head_height) = self
181            .logs()
182            .head(bucket_id, None)
183            .await
184            .map_err(|e| anyhow!("Failed to get head for bucket {}: {}", bucket_id, e))?;
185
186        // Load manifest from blobs store
187        let manifest: crate::mount::Manifest = self
188            .blobs()
189            .get_cbor(&head_link.hash())
190            .await
191            .map_err(|e| anyhow!("Failed to load manifest: {}", e))?;
192
193        // Extract our own key to skip ourselves
194        let our_key = crate::crypto::PublicKey::from(*self.secret().public()).to_hex();
195
196        // Collect all peer keys
197        let peer_keys: Vec<_> = manifest
198            .shares()
199            .keys()
200            .filter(|key| *key != &our_key)
201            .cloned()
202            .collect();
203
204        // Ping all peers concurrently
205        let mut tasks = Vec::new();
206
207        for peer_key_hex in peer_keys {
208            let peer_id = match crate::crypto::PublicKey::from_hex(&peer_key_hex) {
209                Ok(id) => id,
210                Err(e) => {
211                    tracing::warn!("Invalid peer key {}: {}", peer_key_hex, e);
212                    continue;
213                }
214            };
215
216            let ping = PingMessage {
217                bucket_id,
218                link: head_link.clone(),
219                height: head_height,
220            };
221
222            let peer = self.clone();
223            let key = peer_key_hex.clone();
224
225            tasks.push(tokio::spawn(async move {
226                let result = Ping::send::<L>(&peer, &peer_id, ping).await;
227                (key, result)
228            }));
229        }
230
231        // Collect results with optional timeout
232        let collect_future = async {
233            let mut results: std::collections::HashMap<
234                String,
235                crate::peer::protocol::PingReplyStatus,
236            > = std::collections::HashMap::new();
237            for task in tasks {
238                match task.await {
239                    Ok((key, Ok(reply))) => {
240                        results.insert(key, reply.status);
241                    }
242                    Ok((key, Err(e))) => {
243                        tracing::warn!("Failed to ping peer {}: {}", key, e);
244                    }
245                    Err(e) => {
246                        tracing::warn!("Task panicked: {}", e);
247                    }
248                }
249            }
250            Ok(results)
251        };
252
253        // Apply timeout if specified
254        if let Some(timeout_duration) = timeout {
255            match tokio::time::timeout(timeout_duration, collect_future).await {
256                Ok(result) => result,
257                Err(_) => Err(anyhow!(
258                    "Ping collection timed out after {:?}",
259                    timeout_duration
260                )),
261            }
262        } else {
263            collect_future.await
264        }
265    }
266
267    /// Load mount at the current head of a bucket
268    ///
269    /// # Arguments
270    ///
271    /// * `bucket_id` - The UUID of the bucket to load
272    ///
273    /// # Returns
274    ///
275    /// The Mount at the current head of the bucket's log
276    ///
277    /// # Errors
278    ///
279    /// Returns error if:
280    /// - Bucket not found in log
281    /// - Failed to load mount from blobs
282    pub async fn mount(&self, bucket_id: Uuid) -> Result<Mount, MountError> {
283        // Get current head link from log
284        let (link, _height) = self
285            .log_provider
286            .head(bucket_id, None)
287            .await
288            .map_err(|e| MountError::Default(anyhow!("Failed to get current head: {}", e)))?;
289
290        // Load mount at that link (height is read from manifest)
291        Mount::load(&link, &self.secret_key, &self.blobs_store).await
292    }
293
294    /// Save a mount and append it to the bucket's log
295    ///
296    /// This method:
297    /// 1. Saves the mount to blobs, getting a new link
298    /// 2. Appends the new link to the bucket's log
299    /// 3. Dispatches sync jobs to notify peers
300    ///
301    /// # Arguments
302    ///
303    /// * `bucket_id` - The UUID of the bucket
304    /// * `name` - The name of the bucket (for log metadata)
305    /// * `mount` - The mount to save
306    ///
307    /// # Returns
308    ///
309    /// The new link where the mount was saved
310    ///
311    /// # Errors
312    ///
313    /// Returns error if:
314    /// - Failed to save mount to blobs
315    /// - Failed to append to log
316    pub async fn save_mount(&self, mount: &Mount) -> Result<Link, MountError>
317    where
318        L::Error: std::error::Error + Send + Sync + 'static,
319    {
320        // Get our own public key to exclude from notifications
321        let our_public_key = self.secret_key.public();
322        tracing::info!("SAVE_MOUNT: Our public key: {}", our_public_key.to_hex());
323
324        let inner_mount = mount.inner().await;
325        let manifest = inner_mount.manifest();
326
327        let bucket_id = *manifest.id();
328        let name = manifest.name().to_string();
329
330        // Get shares from the mount manifest
331        let (link, previous_link, height) = mount.save(self.blobs()).await?;
332        let inner = mount.inner().await;
333        let shares = inner.manifest().shares();
334        tracing::info!("SAVE_MOUNT: Found {} shares in manifest", shares.len());
335
336        // Append to log
337        self.log_provider
338            .append(bucket_id, name, link.clone(), Some(previous_link), height)
339            .await
340            .map_err(|e| MountError::Default(anyhow!("Failed to append to log: {}", e)))?;
341
342        // Dispatch ping jobs for each peer (except ourselves)
343        let mut notified_count = 0;
344        for (peer_key_hex, _share) in shares.iter() {
345            tracing::info!("SAVE_MOUNT: Checking share for peer: {}", peer_key_hex);
346
347            // Parse the peer's public key
348            if let Ok(peer_public_key) = PublicKey::from_hex(peer_key_hex) {
349                // Skip ourselves
350                if peer_public_key == our_public_key {
351                    tracing::info!("SAVE_MOUNT: Skipping ourselves: {}", peer_key_hex);
352                    continue;
353                }
354
355                tracing::info!(
356                    "SAVE_MOUNT: Dispatching PingPeer job for bucket {} to peer {}",
357                    bucket_id,
358                    peer_key_hex
359                );
360                // Dispatch a ping job for this peer
361                // Ignore errors - if we can't notify a peer, they'll catch up on their next ping
362                if let Err(e) = self
363                    .dispatch(SyncJob::PingPeer(PingPeerJob {
364                        bucket_id,
365                        peer_id: peer_public_key,
366                    }))
367                    .await
368                {
369                    tracing::warn!("Failed to dispatch ping: {}", e);
370                }
371                notified_count += 1;
372            } else {
373                tracing::warn!(
374                    "SAVE_MOUNT: Failed to parse peer public key: {}",
375                    peer_key_hex
376                );
377            }
378        }
379
380        tracing::info!(
381            "dispatched {} PingPeer jobs for bucket {}",
382            notified_count,
383            bucket_id
384        );
385
386        Ok(link)
387    }
388}