//! Peer internals (`common/peer/peer_inner.rs`): the [`Peer`] handle and its
//! bucket/sync operations.
use std::net::SocketAddr;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use iroh::{Endpoint, NodeId};
use uuid::Uuid;

use crate::bucket_log::BucketLogProvider;
use crate::crypto::{PublicKey, SecretKey};
use crate::linked_data::Link;
use crate::mount::{Mount, MountError};

use super::sync::{PingPeerJob, SyncJob, SyncProvider};

pub use super::blobs_store::BlobsStore;
17
/// Overview of a peer's state, generic over a bucket log provider.
///
/// Provides everything that a peer needs in order to load data,
/// interact with peers, and manage buckets.
#[derive(Debug)]
pub struct Peer<L: BucketLogProvider> {
    /// Backend for reading and appending bucket logs.
    log_provider: L,
    /// Socket address this peer is bound to.
    socket_address: SocketAddr,
    /// Content-addressed blob storage (manifests are stored here as CBOR).
    blobs_store: BlobsStore,
    /// This peer's identity/secret key.
    secret_key: SecretKey,
    /// iroh endpoint used to reach other peers; also the source of our NodeId.
    endpoint: Endpoint,
    /// Strategy for executing sync jobs (immediately, queued, etc.).
    sync_provider: Arc<dyn SyncProvider<L>>,
}
30
31impl<L: BucketLogProvider> Clone for Peer<L>
32where
33    L: Clone,
34{
35    fn clone(&self) -> Self {
36        Self {
37            log_provider: self.log_provider.clone(),
38            socket_address: self.socket_address,
39            blobs_store: self.blobs_store.clone(),
40            secret_key: self.secret_key.clone(),
41            endpoint: self.endpoint.clone(),
42            sync_provider: self.sync_provider.clone(),
43        }
44    }
45}
46
47impl<L: BucketLogProvider> Peer<L> {
48    pub(super) fn new(
49        log_provider: L,
50        socket_address: SocketAddr,
51        blobs_store: BlobsStore,
52        secret_key: SecretKey,
53        endpoint: Endpoint,
54        sync_provider: Arc<dyn SyncProvider<L>>,
55    ) -> Peer<L> {
56        Self {
57            log_provider,
58            socket_address,
59            blobs_store,
60            secret_key,
61            endpoint,
62            sync_provider,
63        }
64    }
65
    /// Borrow the bucket log provider.
    ///
    /// NOTE(review): identical to [`Peer::log_provider`] — presumably one is a
    /// legacy alias; consider consolidating.
    pub fn logs(&self) -> &L {
        &self.log_provider
    }

    /// Borrow the content-addressed blobs store.
    pub fn blobs(&self) -> &BlobsStore {
        &self.blobs_store
    }

    /// Borrow the iroh endpoint used for peer connections.
    pub fn endpoint(&self) -> &Endpoint {
        &self.endpoint
    }

    /// Borrow the bucket log provider (same as [`Peer::logs`]).
    pub fn log_provider(&self) -> &L {
        &self.log_provider
    }

    /// Borrow this peer's secret key.
    pub fn secret(&self) -> &SecretKey {
        &self.secret_key
    }

    /// Borrow the socket address this peer is bound to.
    pub fn socket(&self) -> &SocketAddr {
        &self.socket_address
    }

    /// This peer's node id, derived from the endpoint.
    pub fn id(&self) -> NodeId {
        self.endpoint.node_id()
    }
93
    // ========================================
    // Sync Operations (dispatch to backend)
    // ========================================

    /// Dispatch a sync job to the sync provider
    ///
    /// The provider decides when/where this executes (immediately, queued, etc.)
    ///
    /// # Errors
    ///
    /// Propagates whatever error the configured [`SyncProvider`] returns.
    pub async fn dispatch(&self, job: SyncJob) -> Result<()>
    where
        L::Error: std::error::Error + Send + Sync + 'static,
    {
        // `self` is handed to the provider so the job can use this peer's
        // endpoint, blobs store, and log provider while executing.
        self.sync_provider.execute(self, job).await
    }
107
108    /// Ping all peers in a bucket's shares
109    ///
110    /// Dispatches ping jobs to all peers listed in the bucket's current
111    /// manifest shares (except ourselves).
112    pub async fn ping(&self, bucket_id: Uuid) -> Result<()>
113    where
114        L::Error: std::error::Error + Send + Sync + 'static,
115    {
116        // Get current head link
117        let (head_link, _) = self
118            .logs()
119            .head(bucket_id, None)
120            .await
121            .map_err(|e| anyhow!("Failed to get head for bucket {}: {}", bucket_id, e))?;
122
123        // Load manifest from blobs store
124        let manifest: crate::mount::Manifest = self
125            .blobs()
126            .get_cbor(&head_link.hash())
127            .await
128            .map_err(|e| anyhow!("Failed to load manifest: {}", e))?;
129
130        // Extract our own key to skip ourselves
131        let our_key = crate::crypto::PublicKey::from(*self.secret().public()).to_hex();
132
133        // For each peer in shares, dispatch a ping job
134        for peer_key_hex in manifest.shares().keys() {
135            if peer_key_hex == &our_key {
136                continue; // Skip ourselves
137            }
138
139            let peer_id = crate::crypto::PublicKey::from_hex(peer_key_hex)
140                .map_err(|e| anyhow!("Invalid peer key in shares: {}", e))?;
141
142            // Dispatch ping job
143            if let Err(e) = self
144                .dispatch(SyncJob::PingPeer(PingPeerJob { bucket_id, peer_id }))
145                .await
146            {
147                tracing::warn!(
148                    "Failed to dispatch ping to peer {} for bucket {}: {}",
149                    peer_key_hex,
150                    bucket_id,
151                    e
152                );
153            }
154        }
155
156        Ok(())
157    }
158
    /// Ping all peers for a bucket and collect their responses
    ///
    /// Returns a map of peer public key hex to their ping reply status.
    /// This waits for all pings to complete before returning. Peers whose
    /// share key fails to parse, whose ping fails, or whose task panics are
    /// logged and omitted from the result map rather than failing the call.
    ///
    /// # Arguments
    ///
    /// * `bucket_id` - The bucket to ping peers for
    /// * `timeout` - Optional timeout duration for the entire operation
    pub async fn ping_and_collect(
        &self,
        bucket_id: Uuid,
        timeout: Option<std::time::Duration>,
    ) -> Result<std::collections::HashMap<String, crate::peer::protocol::PingReplyStatus>>
    where
        L::Error: std::error::Error + Send + Sync + 'static,
    {
        use crate::peer::protocol::bidirectional::BidirectionalHandler;
        use crate::peer::protocol::{Ping, PingMessage};

        // Get current head link and height; both are echoed in each ping so
        // the remote peer can compare against its own view of the bucket.
        let (head_link, head_height) = self
            .logs()
            .head(bucket_id, None)
            .await
            .map_err(|e| anyhow!("Failed to get head for bucket {}: {}", bucket_id, e))?;

        // Load manifest from blobs store (stored as CBOR at the head link).
        let manifest: crate::mount::Manifest = self
            .blobs()
            .get_cbor(&head_link.hash())
            .await
            .map_err(|e| anyhow!("Failed to load manifest: {}", e))?;

        // Extract our own key to skip ourselves
        let our_key = crate::crypto::PublicKey::from(*self.secret().public()).to_hex();

        // Collect owned copies of the peer keys so the spawned tasks below
        // don't borrow from the manifest.
        let peer_keys: Vec<_> = manifest
            .shares()
            .keys()
            .filter(|key| *key != &our_key)
            .cloned()
            .collect();

        // Ping all peers concurrently: one spawned task per peer.
        let mut tasks = Vec::new();

        for peer_key_hex in peer_keys {
            // A malformed key is logged and skipped; it must not block the
            // remaining peers.
            let peer_id = match crate::crypto::PublicKey::from_hex(&peer_key_hex) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!("Invalid peer key {}: {}", peer_key_hex, e);
                    continue;
                }
            };

            let ping = PingMessage {
                bucket_id,
                link: head_link.clone(),
                height: head_height,
            };

            // Each task owns a clone of the peer handle and the key string.
            let peer = self.clone();
            let key = peer_key_hex.clone();

            tasks.push(tokio::spawn(async move {
                let result = Ping::send::<L>(&peer, &peer_id, ping).await;
                (key, result)
            }));
        }

        // Collect results with optional timeout. Individual failures are
        // logged and dropped from the map rather than aborting collection.
        let collect_future = async {
            let mut results: std::collections::HashMap<
                String,
                crate::peer::protocol::PingReplyStatus,
            > = std::collections::HashMap::new();
            for task in tasks {
                match task.await {
                    Ok((key, Ok(reply))) => {
                        results.insert(key, reply.status);
                    }
                    Ok((key, Err(e))) => {
                        tracing::warn!("Failed to ping peer {}: {}", key, e);
                    }
                    Err(e) => {
                        tracing::warn!("Task panicked: {}", e);
                    }
                }
            }
            Ok(results)
        };

        // Apply timeout if specified. NOTE(review): on timeout the spawned
        // ping tasks are not aborted — they keep running detached until they
        // finish on their own; confirm that is acceptable.
        if let Some(timeout_duration) = timeout {
            match tokio::time::timeout(timeout_duration, collect_future).await {
                Ok(result) => result,
                Err(_) => Err(anyhow!(
                    "Ping collection timed out after {:?}",
                    timeout_duration
                )),
            }
        } else {
            collect_future.await
        }
    }
266
267    /// Load mount at the current head of a bucket
268    ///
269    /// # Arguments
270    ///
271    /// * `bucket_id` - The UUID of the bucket to load
272    ///
273    /// # Returns
274    ///
275    /// The Mount at the current head of the bucket's log
276    ///
277    /// # Errors
278    ///
279    /// Returns error if:
280    /// - Bucket not found in log
281    /// - Failed to load mount from blobs
282    pub async fn mount(&self, bucket_id: Uuid) -> Result<Mount, MountError> {
283        // Get current head link from log
284        let (link, _height) = self
285            .log_provider
286            .head(bucket_id, None)
287            .await
288            .map_err(|e| MountError::Default(anyhow!("Failed to get current head: {}", e)))?;
289
290        // Load mount at that link (height is read from manifest)
291        Mount::load(&link, &self.secret_key, &self.blobs_store).await
292    }
293
294    /// Load mount for reading based on the peer's role in the bucket.
295    ///
296    /// This method determines the appropriate version to load based on the peer's role:
297    /// - **Owners** see HEAD (latest state, including unpublished changes)
298    /// - **Mirrors** (or unknown roles) see the latest_published version
299    ///
300    /// This ensures that mirrors only see content that has been explicitly published
301    /// to them, while owners always see the most recent state.
302    ///
303    /// # Arguments
304    ///
305    /// * `bucket_id` - The UUID of the bucket to load
306    ///
307    /// # Returns
308    ///
309    /// The Mount at the appropriate version for this peer's role
310    ///
311    /// # Errors
312    ///
313    /// Returns error if:
314    /// - Bucket not found in log
315    /// - No published version available (for mirrors)
316    /// - Failed to load mount from blobs
317    pub async fn mount_for_read(&self, bucket_id: Uuid) -> Result<Mount, MountError> {
318        use crate::mount::PrincipalRole;
319
320        // Get current head link from log
321        let (head_link, _) = self
322            .log_provider
323            .head(bucket_id, None)
324            .await
325            .map_err(|e| MountError::Default(anyhow!("Failed to get current head: {}", e)))?;
326
327        // Check our role from the HEAD manifest
328        let our_role = Mount::load_manifest(&head_link, &self.blobs_store)
329            .await
330            .ok()
331            .and_then(|m| {
332                m.get_share(&self.secret_key.public())
333                    .map(|s| s.role().clone())
334            });
335
336        match our_role {
337            Some(PrincipalRole::Owner) => {
338                // Owners see HEAD (latest state)
339                self.mount(bucket_id).await
340            }
341            _ => {
342                // Mirrors (or unknown role) see latest_published
343                let (link, _) = self
344                    .log_provider
345                    .latest_published(bucket_id)
346                    .await
347                    .map_err(|e| {
348                        MountError::Default(anyhow!("Failed to get latest published: {}", e))
349                    })?
350                    .ok_or_else(|| {
351                        MountError::Default(anyhow!("No published version available"))
352                    })?;
353                Mount::load(&link, &self.secret_key, &self.blobs_store).await
354            }
355        }
356    }
357
358    /// Save a mount and append it to the bucket's log
359    ///
360    /// This method:
361    /// 1. Saves the mount to blobs, getting a new link
362    /// 2. Appends the new link to the bucket's log
363    /// 3. Dispatches sync jobs to notify peers
364    ///
365    /// # Arguments
366    ///
367    /// * `mount` - The mount to save
368    /// * `publish` - If true, publish the bucket (expose the secret so mirrors can decrypt)
369    ///
370    /// # Returns
371    ///
372    /// The new link where the mount was saved
373    ///
374    /// # Errors
375    ///
376    /// Returns error if:
377    /// - Failed to save mount to blobs
378    /// - Failed to append to log
379    pub async fn save_mount(&self, mount: &Mount, publish: bool) -> Result<Link, MountError>
380    where
381        L::Error: std::error::Error + Send + Sync + 'static,
382    {
383        // Get our own public key to exclude from notifications
384        let our_public_key = self.secret_key.public();
385        tracing::info!("SAVE_MOUNT: Our public key: {}", our_public_key.to_hex());
386
387        let inner_mount = mount.inner().await;
388        let manifest = inner_mount.manifest();
389
390        let bucket_id = *manifest.id();
391        let name = manifest.name().to_string();
392
393        // Get shares from the mount manifest
394        let (link, previous_link, height) = mount.save(self.blobs(), publish).await?;
395        let inner = mount.inner().await;
396        let manifest = inner.manifest();
397        let shares = manifest.shares();
398        let is_published = manifest.is_published();
399        tracing::info!("SAVE_MOUNT: Found {} shares in manifest", shares.len());
400
401        // Append to log
402        self.log_provider
403            .append(
404                bucket_id,
405                name,
406                link.clone(),
407                Some(previous_link),
408                height,
409                is_published,
410            )
411            .await
412            .map_err(|e| MountError::Default(anyhow!("Failed to append to log: {}", e)))?;
413
414        // Dispatch ping jobs for each peer (except ourselves)
415        let mut notified_count = 0;
416        for (peer_key_hex, _share) in shares.iter() {
417            tracing::info!("SAVE_MOUNT: Checking share for peer: {}", peer_key_hex);
418
419            // Parse the peer's public key
420            if let Ok(peer_public_key) = PublicKey::from_hex(peer_key_hex) {
421                // Skip ourselves
422                if peer_public_key == our_public_key {
423                    tracing::info!("SAVE_MOUNT: Skipping ourselves: {}", peer_key_hex);
424                    continue;
425                }
426
427                tracing::info!(
428                    "SAVE_MOUNT: Dispatching PingPeer job for bucket {} to peer {}",
429                    bucket_id,
430                    peer_key_hex
431                );
432                // Dispatch a ping job for this peer
433                // Ignore errors - if we can't notify a peer, they'll catch up on their next ping
434                if let Err(e) = self
435                    .dispatch(SyncJob::PingPeer(PingPeerJob {
436                        bucket_id,
437                        peer_id: peer_public_key,
438                    }))
439                    .await
440                {
441                    tracing::warn!("Failed to dispatch ping: {}", e);
442                }
443                notified_count += 1;
444            } else {
445                tracing::warn!(
446                    "SAVE_MOUNT: Failed to parse peer public key: {}",
447                    peer_key_hex
448                );
449            }
450        }
451
452        tracing::info!(
453            "dispatched {} PingPeer jobs for bucket {}",
454            notified_count,
455            bucket_id
456        );
457
458        Ok(link)
459    }
460}