common/peer/sync/sync_bucket.rs

//! Bucket synchronization job and execution logic
//!
//! This module contains the logic for syncing buckets between peers.

use anyhow::{anyhow, Result};
use uuid::Uuid;

use crate::bucket_log::BucketLogProvider;
use crate::crypto::PublicKey;
use crate::linked_data::Link;
use crate::mount::Manifest;
use crate::peer::Peer;

use super::{DownloadPinsJob, SyncJob};

/// Target peer and state for bucket synchronization
#[derive(Debug, Clone)]
pub struct SyncTarget {
    /// Link to the target bucket state
    pub link: Link,
    /// Height of the target bucket
    pub height: u64,
    /// Public key of the peer to sync from
    pub peer_id: PublicKey,
}

/// Sync bucket job definition
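///
/// # Example
///
/// A minimal sketch of building a job from a sync target a peer advertised to us.
/// The `bucket_id`, `link`, `height`, and `peer_id` bindings are assumed to come
/// from that advertisement and are not shown here.
///
/// ```ignore
/// let job = SyncBucketJob {
///     bucket_id,
///     target: SyncTarget { link, height, peer_id },
/// };
/// ```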
#[derive(Debug, Clone)]
pub struct SyncBucketJob {
    pub bucket_id: Uuid,
    pub target: SyncTarget,
}

/// Execute a bucket sync job
///
/// This is the main entry point for syncing. It handles both cases:
/// - Updating an existing bucket we already have
/// - Cloning a new bucket we don't have yet
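///
/// # Example
///
/// A rough sketch of running the job directly; in practice it is typically
/// dispatched through the peer's sync job machinery. The `peer` and `job`
/// bindings are assumed to exist.
///
/// ```ignore
/// execute(&peer, job).await?;
/// ```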
pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    tracing::info!(
        "Syncing bucket {} from peer {} to link {:?} at height {}",
        job.bucket_id,
        job.target.peer_id.to_hex(),
        job.target.link,
        job.target.height
    );

    let exists: bool = peer.logs().exists(job.bucket_id).await?;

    let common_ancestor = if exists {
        // find a common ancestor between our log and the
        //  link the peer advertised to us
        find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_id).await?
    } else {
        None
    };

    // TODO (amiller68): between finding the common ancestor and downloading the manifest chain
    //  there are redundant operations. We should optimize this.

    // if we know the bucket exists but did not find a common ancestor,
    //  then we have diverged / are not talking about the same bucket.
    // for now just log a warning and do nothing
    if exists && common_ancestor.is_none() {
        tracing::warn!(
            "Bucket {} diverged from peer {:?}",
            job.bucket_id,
            job.target.peer_id
        );
        return Ok(());
    }

    // Determine between which links we should download manifests
    let stop_link = if let Some(ancestor) = common_ancestor {
        Some(ancestor.0)
    } else {
        // No common ancestor - we'll sync everything from the target back to genesis
        tracing::info!(
            "No common ancestor for bucket {}, syncing from genesis",
            job.bucket_id
        );
        None
    };

    // now we know there is a valid list of manifests we should
    //  fetch and apply to our log

    // Download manifest chain from peer (from target back to common ancestor)
    let manifests =
        download_manifest_chain(peer, &job.target.link, stop_link.as_ref(), &job.target.peer_id)
            .await?;

    // TODO (amiller68): maybe there's an optimization here in that we should know
    //  we can exit earlier by virtue of finding a common ancestor which is just
    //  our current head
    if manifests.is_empty() {
        tracing::info!("No new manifests to sync, already up to date");
        return Ok(());
    }

    // Check that we are still included in the shares of the newest manifest.
    //  If we weren't, we wouldn't have gotten the ping, but we might as well
    //  verify.
    if !verify_provenance(peer, &manifests.last().unwrap().0)? {
        tracing::warn!("Provenance verification failed: our key not in bucket shares");
        return Ok(());
    }

    // apply the updates to the bucket
    apply_manifest_chain(peer, job.bucket_id, &manifests).await?;

    Ok(())
}

/// Download a chain of manifests from a peer
///
/// Walks backwards through the manifest chain via `previous` links.
/// Stops when it reaches the `stop_link` (common ancestor) or genesis (no previous).
///
/// Returns manifests in order from oldest to newest, paired with their links.
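///
/// # Example
///
/// A minimal sketch of fetching the chain between an advertised head and a
/// previously found common ancestor. The `peer`, `target_link`, `ancestor`
/// (an `Option<Link>`), and `peer_id` bindings are assumed.
///
/// ```ignore
/// let manifests =
///     download_manifest_chain(&peer, &target_link, ancestor.as_ref(), &peer_id).await?;
/// for (manifest, link) in &manifests {
///     println!("fetched manifest {:?} at height {}", link, manifest.height());
/// }
/// ```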
async fn download_manifest_chain<L>(
    peer: &Peer<L>,
    start_link: &Link,
    stop_link: Option<&Link>,
    // TODO (amiller68): this could use multi-peer download
    peer_id: &PublicKey,
) -> Result<Vec<(Manifest, Link)>>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    tracing::debug!(
        "Downloading manifest chain from {:?}, stop_link: {:?}",
        start_link,
        stop_link
    );

    let mut manifests = Vec::new();
    let mut current_link = start_link.clone();

    // Download manifests walking backwards
    loop {
        // Download the manifest blob from the peer
        peer.blobs()
            .download_hash(current_link.hash(), vec![*peer_id], peer.endpoint())
            .await
            .map_err(|e| {
                anyhow!(
                    "Failed to download manifest {:?} from peer: {}",
                    current_link,
                    e
                )
            })?;

        // Read and decode the manifest
        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;

        // Check if we should stop
        if let Some(stop_link) = stop_link {
            if &current_link == stop_link {
                tracing::debug!("Reached stop link, stopping download");
                break;
            }
        }

        manifests.push((manifest.clone(), current_link.clone()));

        // Check for previous link
        match manifest.previous() {
            Some(prev_link) => {
                current_link = prev_link.clone();
            }
            None => {
                // Reached genesis, stop
                tracing::debug!("Reached genesis manifest, stopping download");
                break;
            }
        }
    }

    // Reverse to get oldest-to-newest order
    manifests.reverse();

    tracing::debug!("Downloaded {} manifests", manifests.len());
    Ok(manifests)
}

/// Find the common ancestor by downloading manifests from a peer
///
/// Starting from `link`, walks backwards through the peer's manifest chain,
/// downloading each manifest and checking if it exists in our local log.
/// Returns the first (most recent) link and height found in our log.
///
/// # Arguments
///
/// * `peer` - The peer instance with access to logs and blobs
/// * `bucket_id` - The bucket to check against our local log
/// * `link` - The starting point on the peer's chain (typically their head)
/// * `peer_id` - The peer to download manifests from
///
/// # Returns
///
/// * `Ok(Some((link, height)))` - Found a common ancestor; returns its link and height
/// * `Ok(None)` - No common ancestor found (reached genesis without an intersection)
/// * `Err(_)` - Download or log access error
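///
/// # Example
///
/// A minimal sketch, assuming a `peer`, a `bucket_id` we already track, and a
/// `head_link` advertised by `peer_id`:
///
/// ```ignore
/// match find_common_ancestor(&peer, bucket_id, &head_link, &peer_id).await? {
///     Some((link, height)) => tracing::debug!("shared history at {:?} (height {})", link, height),
///     None => tracing::debug!("no shared history with this peer"),
/// }
/// ```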
async fn find_common_ancestor<L>(
    peer: &Peer<L>,
    bucket_id: Uuid,
    link: &Link,
    peer_id: &PublicKey,
) -> Result<Option<(Link, u64)>>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    tracing::debug!(
        "Finding common ancestor starting from {:?} with peer {}",
        link,
        peer_id.to_hex()
    );

    let mut current_link = link.clone();
    let mut manifests_checked = 0;

    loop {
        manifests_checked += 1;
        tracing::debug!(
            "Checking manifest {} at link {:?}",
            manifests_checked,
            current_link
        );

        // TODO (amiller68): this should build in memory,
        //  but for now we just download it
        // Download the manifest from the peer
        peer.blobs()
            .download_hash(current_link.hash(), vec![*peer_id], peer.endpoint())
            .await
            .map_err(|e| {
                anyhow!(
                    "Failed to download manifest {:?} from peer: {}",
                    current_link,
                    e
                )
            })?;

        // Read and decode the manifest
        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
        let height = manifest.height();

        // Check if this link exists in our local log
        match peer.logs().has(bucket_id, current_link.clone()).await {
            Ok(heights) if !heights.is_empty() => {
                tracing::info!(
                    "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
                    current_link,
                    height,
                    heights
                );
                return Ok(Some((current_link, height)));
            }
            Ok(_) => {
                // Link not in our log, check previous
                tracing::debug!("Link {:?} not in our log, checking previous", current_link);
            }
            Err(e) => {
                tracing::warn!("Error checking for link in log: {}", e);
                // Continue checking previous links despite the error
            }
        }

        // Move to the previous link
        match manifest.previous() {
            Some(prev_link) => {
                current_link = prev_link.clone();
            }
            None => {
                // Reached genesis without finding a common ancestor
                tracing::debug!(
                    "Reached genesis after checking {} manifests, no common ancestor found",
                    manifests_checked
                );
                return Ok(None);
            }
        }
    }
}

/// Apply a chain of manifests to the log
///
/// Appends each manifest to the log in order, oldest to newest, then dispatches
/// a job to download the pins referenced by the newest manifest.
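///
/// # Example
///
/// A minimal sketch, assuming `manifests` came from `download_manifest_chain`
/// for the same `bucket_id`:
///
/// ```ignore
/// let manifests = download_manifest_chain(&peer, &target_link, None, &peer_id).await?;
/// apply_manifest_chain(&peer, bucket_id, &manifests).await?;
/// ```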
async fn apply_manifest_chain<L>(
    peer: &Peer<L>,
    bucket_id: Uuid,
    manifests: &[(Manifest, Link)],
) -> Result<()>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    tracing::info!("Applying {} manifests to log", manifests.len());

    // Append each manifest to the log, oldest first
    for (manifest, link) in manifests.iter() {
        let previous = manifest.previous().clone();
        let height = manifest.height();

        tracing::info!(
            "Appending manifest to log: height={}, link={:?}, previous={:?}",
            height,
            link,
            previous
        );

        peer.logs()
            .append(
                bucket_id,
                manifest.name().to_string(),
                link.clone(),
                previous,
                height,
            )
            .await
            .map_err(|e| anyhow!("Failed to append manifest at height {}: {}", height, e))?;
    }

    // Schedule a download of the pins referenced by the newest manifest
    if let Some((manifest, _link)) = manifests.last() {
        let pins_link = manifest.pins().clone();
        let peer_ids = manifest
            .shares()
            .iter()
            .map(|share| share.1.principal().identity)
            .collect();
        peer.dispatch(SyncJob::DownloadPins(DownloadPinsJob {
            pins_link,
            peer_ids,
        }))
        .await?;
    }

    tracing::info!("Successfully applied {} manifests to log", manifests.len());
    Ok(())
}

/// Verify that our PublicKey is in the manifest's shares
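///
/// # Example
///
/// A minimal sketch, assuming `manifest` is the newest manifest in a downloaded
/// chain:
///
/// ```ignore
/// if !verify_provenance(&peer, &manifest)? {
///     tracing::warn!("our key is not in this bucket's shares");
/// }
/// ```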
fn verify_provenance<L>(peer: &Peer<L>, manifest: &Manifest) -> Result<bool>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    let our_pub_key = PublicKey::from(*peer.secret().public());
    let our_pub_key_hex = our_pub_key.to_hex();

    // Check if our key is in the shares
    let is_authorized = manifest
        .shares()
        .iter()
        .any(|(key_hex, _share)| key_hex == &our_pub_key_hex);

    tracing::debug!(
        "Provenance check: our_key={}, authorized={}",
        our_pub_key_hex,
        is_authorized
    );

    Ok(is_authorized)
}