// File: common/peer/sync/sync_bucket.rs

1//! Bucket synchronization job and execution logic
2//!
3//! This module contains the logic for syncing buckets between peers.
4
5use anyhow::{anyhow, Result};
6use uuid::Uuid;
7
8use crate::bucket_log::BucketLogProvider;
9use crate::crypto::PublicKey;
10use crate::linked_data::Link;
11use crate::mount::Manifest;
12use crate::mount::PrincipalRole;
13use crate::peer::Peer;
14
15use super::{DownloadPinsJob, ProvenanceError, SyncJob};
16
/// Result of provenance verification for a manifest.
///
/// Only the non-error outcomes are modelled here; hard verification failures
/// (bad signature, unauthorized author, ...) are reported as `ProvenanceError`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProvenanceResult {
    /// Manifest is valid - properly signed by an authorized writer
    /// (returned by `verify_author` once signature and Owner role are confirmed)
    Valid,
    /// Our key is not in the manifest's shares (not an error, just skip)
    /// (returned by `verify_provenance` before any author checks run)
    NotAuthorized,
    /// Manifest is unsigned (allowed during migration)
    /// (returned by `verify_author` when `is_signed()` is false; no author
    /// or role checks are performed in that case)
    UnsignedLegacy,
}
27
/// Target peer and state for bucket synchronization
///
/// Describes the remote bucket state a sync job should bring the local log
/// up to, and which peers the manifests can be fetched from.
#[derive(Debug, Clone)]
pub struct SyncTarget {
    /// Link to the target bucket state
    /// (the manifest link the chain walk starts from)
    pub link: Link,
    /// Height of the target bucket
    pub height: u64,
    /// Public keys of peers to sync from (in priority order)
    /// First peer is the "preferred" peer that triggered the sync
    pub peer_ids: Vec<PublicKey>,
}
39
/// Sync bucket job definition
///
/// Input to [`execute`]: identifies the bucket to sync and the remote state
/// to sync it towards.
#[derive(Debug, Clone)]
pub struct SyncBucketJob {
    /// Identifier of the bucket whose log should be synced
    pub bucket_id: Uuid,
    /// Remote state (link, height, source peers) to sync towards
    pub target: SyncTarget,
}
46
47/// Execute a bucket sync job
48///
49/// This is the main entry point for syncing. It handles both cases:
50/// - Updating an existing bucket we already have
51/// - Cloning a new bucket we don't have yet
52pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
53where
54    L: BucketLogProvider + Clone + Send + Sync + 'static,
55    L::Error: std::error::Error + Send + Sync + 'static,
56{
57    let peer_ids_hex: Vec<String> = job.target.peer_ids.iter().map(|p| p.to_hex()).collect();
58    tracing::info!(
59        "Syncing bucket {} from {} peer(s) {:?} to link {:?} at height {}",
60        job.bucket_id,
61        job.target.peer_ids.len(),
62        peer_ids_hex,
63        job.target.link,
64        job.target.height
65    );
66
67    let exists: bool = peer.logs().exists(job.bucket_id).await?;
68
69    let common_ancestor = if exists {
70        // find a common ancestor between our log and the
71        //  link the peer advertised to us
72        find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_ids).await?
73    } else {
74        None
75    };
76
77    // TODO (amiller68): between finding the common ancestor and downloading the manifest chain
78    //  there are redundant operations. We should optimize this.
79
80    // if we know the bucket exists, but we did not find a common ancestor
81    //  then we have diverged / are not talking about the same bucket
82    // for now just log a warning and do nothing
83    if exists && common_ancestor.is_none() {
84        tracing::warn!(
85            "Bucket {} diverged from peer(s) {:?}",
86            job.bucket_id,
87            peer_ids_hex
88        );
89        return Ok(());
90    }
91
92    // Determine between what links we should download manifests for
93    let stop_link_owned = common_ancestor.as_ref().map(|ancestor| ancestor.0.clone());
94    let stop_link = stop_link_owned.as_ref();
95
96    if stop_link.is_none() && common_ancestor.is_none() {
97        // No common ancestor - we'll sync everything from the target back to genesis
98        tracing::info!(
99            "No common ancestor for bucket {}, syncing from genesis",
100            job.bucket_id
101        );
102    }
103
104    // now we know there is a valid list of manifests we should
105    //  fetch and apply to our log
106
107    // Load the common ancestor manifest as our trusted base (if we have one)
108    // This manifest is already in our blobs from find_common_ancestor
109    let trusted_base: Option<Manifest> = if let Some(link) = stop_link {
110        Some(peer.blobs().get_cbor(&link.hash()).await?)
111    } else {
112        None
113    };
114
115    // Download manifest chain from peer (from target back to common ancestor)
116    let manifests = download_manifest_chain(
117        peer,
118        &job.target.link,
119        stop_link,
120        &job.target.peer_ids,
121        trusted_base.as_ref(),
122    )
123    .await?;
124
125    // TODO (amiller68): maybe theres an optimization here in that we should know
126    //  we can exit earlier by virtue of finding a common ancestor which is just
127    //  our current head
128    if manifests.is_empty() {
129        tracing::info!("No new manifests to sync, already up to date");
130        return Ok(());
131    };
132
133    // Verify provenance of the latest manifest
134    // Use our local manifest as the trusted base (previous)
135    let latest_manifest = &manifests.last().unwrap().0;
136    let trusted_base_ref = trusted_base.as_ref();
137    match verify_provenance(peer, latest_manifest, trusted_base_ref)? {
138        ProvenanceResult::Valid => {
139            tracing::debug!("Provenance verification passed");
140        }
141        ProvenanceResult::UnsignedLegacy => {
142            tracing::warn!(
143                "Accepting unsigned manifest for bucket {} (migration mode)",
144                latest_manifest.id()
145            );
146        }
147        ProvenanceResult::NotAuthorized => {
148            tracing::warn!("Provenance verification failed: our key not in bucket shares");
149            return Ok(());
150        }
151    }
152
153    // apply the updates to the bucket
154    apply_manifest_chain(peer, job.bucket_id, &manifests).await?;
155
156    Ok(())
157}
158
159/// Download a chain of manifests from peers and validate provenance
160///
161/// Walks backwards through the manifest chain via `previous` links.
162/// Stops when it reaches `stop_at` link (common ancestor) or genesis (no previous).
163/// Tries multiple peers in order for each download, succeeding on first available.
164///
165/// After downloading, validates each manifest's provenance:
166/// - The first manifest (oldest) is validated against `trusted_base` (if any)
167/// - Each subsequent manifest is validated against its predecessor
168///
169/// Returns manifests in order from oldest to newest with their links.
170async fn download_manifest_chain<L>(
171    peer: &Peer<L>,
172    start_link: &Link,
173    stop_link: Option<&Link>,
174    peer_ids: &[PublicKey],
175    trusted_base: Option<&Manifest>,
176) -> Result<Vec<(Manifest, Link)>>
177where
178    L: BucketLogProvider + Clone + Send + Sync + 'static,
179    L::Error: std::error::Error + Send + Sync + 'static,
180{
181    tracing::debug!(
182        "Downloading manifest chain from {:?}, stop_at: {:?}, using {} peer(s)",
183        start_link,
184        stop_link,
185        peer_ids.len()
186    );
187
188    let mut manifests = Vec::new();
189    let mut current_link = start_link.clone();
190
191    // Download manifests walking backwards
192    loop {
193        // Download the manifest blob from peers
194        peer.blobs()
195            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
196            .await
197            .map_err(|e| {
198                anyhow!(
199                    "Failed to download manifest {:?} from peers: {}",
200                    current_link,
201                    e
202                )
203            })?;
204
205        // Read and decode the manifest
206        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
207
208        // Check if we should stop
209        if let Some(stop_link) = stop_link {
210            if &current_link == stop_link {
211                tracing::debug!("Reached stop_at link, stopping download");
212                break;
213            }
214        }
215
216        manifests.push((manifest.clone(), current_link.clone()));
217
218        // Check for previous link
219        match manifest.previous() {
220            Some(prev_link) => {
221                current_link = prev_link.clone();
222            }
223            None => {
224                // Reached genesis, stop
225                tracing::debug!("Reached genesis manifest, stopping download");
226                break;
227            }
228        }
229    }
230
231    // Reverse to get oldest-to-newest order
232    manifests.reverse();
233
234    tracing::debug!("Downloaded {} manifests", manifests.len());
235
236    // Validate the chain (walking from oldest to newest)
237    // The first manifest is validated against the trusted_base (common ancestor)
238    // NOTE: Chain validation only checks author authorization, not receiver authorization.
239    // The receiver check is done separately on the final manifest in execute().
240    let mut previous: Option<&Manifest> = trusted_base;
241
242    for (manifest, link) in manifests.iter() {
243        verify_author(manifest, previous).map_err(|e| ProvenanceError::InvalidManifestInChain {
244            link: link.clone(),
245            reason: e.to_string(),
246        })?;
247        previous = Some(manifest);
248    }
249
250    Ok(manifests)
251}
252
253/// Find common ancestor by downloading manifests from peers
254///
255/// Starting from `start_link`, walks backwards through the peer's manifest chain,
256/// downloading each manifest and checking if it exists in our local log.
257/// Returns the first (most recent) link and height found in our log.
258/// Tries multiple peers in order for each download, succeeding on first available.
259///
260/// # Arguments
261///
262/// * `peer` - The peer instance with access to logs and blobs
263/// * `bucket_id` - The bucket to check against our local log
264/// * `link` - The starting point on the peer's chain (typically their head)
265/// * `peer_ids` - The peers to download manifests from (in priority order)
266///
267/// # Returns
268///
269/// * `Ok(Some((link, height)))` - Found common ancestor with its link and height
270/// * `Ok(None)` - No common ancestor found (reached genesis without intersection)
271/// * `Err(_)` - Download or log access error
272async fn find_common_ancestor<L>(
273    peer: &Peer<L>,
274    bucket_id: Uuid,
275    link: &Link,
276    peer_ids: &[PublicKey],
277) -> Result<Option<(Link, u64)>>
278where
279    L: BucketLogProvider + Clone + Send + Sync + 'static,
280    L::Error: std::error::Error + Send + Sync + 'static,
281{
282    tracing::debug!(
283        "Finding common ancestor starting from {:?} using {} peer(s)",
284        link,
285        peer_ids.len()
286    );
287
288    let mut current_link = link.clone();
289    let mut manifests_checked = 0;
290
291    loop {
292        manifests_checked += 1;
293        tracing::debug!(
294            "Checking manifest {} at link {:?}",
295            manifests_checked,
296            current_link
297        );
298
299        // TODO (amiller68): this should build in memory
300        //  but for now we just download it
301        // Download the manifest from peers
302        peer.blobs()
303            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
304            .await
305            .map_err(|e| {
306                anyhow!(
307                    "Failed to download manifest {:?} from peers: {}",
308                    current_link,
309                    e
310                )
311            })?;
312
313        // Read and decode the manifest
314        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
315        let height = manifest.height();
316
317        // Check if this link exists in our local log
318        match peer.logs().has(bucket_id, current_link.clone()).await {
319            Ok(heights) if !heights.is_empty() => {
320                tracing::info!(
321                    "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
322                    current_link,
323                    height,
324                    heights
325                );
326                return Ok(Some((current_link, height)));
327            }
328            Ok(_) => {
329                // Link not in our log, check previous
330                tracing::debug!("Link {:?} not in our log, checking previous", current_link);
331            }
332            Err(e) => {
333                tracing::warn!("Error checking for link in log: {}", e);
334                // Continue checking previous links despite error
335            }
336        }
337
338        // Move to previous link
339        match manifest.previous() {
340            Some(prev_link) => {
341                current_link = prev_link.clone();
342            }
343            None => {
344                // Reached genesis without finding common ancestor
345                tracing::debug!(
346                    "Reached genesis after checking {} manifests, no common ancestor found",
347                    manifests_checked
348                );
349                return Ok(None);
350            }
351        }
352    }
353}
354
355/// Apply a chain of manifests to the log
356///
357/// Appends each manifest to the log in order, starting from `start_height`.
358async fn apply_manifest_chain<L>(
359    peer: &Peer<L>,
360    bucket_id: Uuid,
361    manifests: &[(Manifest, Link)],
362) -> Result<()>
363where
364    L: BucketLogProvider + Clone + Send + Sync + 'static,
365    L::Error: std::error::Error + Send + Sync + 'static,
366{
367    tracing::info!("Applying {} manifests to log", manifests.len());
368
369    for (manifest, link) in manifests.iter() {
370        let previous = manifest.previous().clone();
371        let height = manifest.height();
372        let is_published = manifest.is_published();
373
374        tracing::info!(
375            "Appending manifest to log: height={}, link={:?}, previous={:?}, published={}",
376            height,
377            link,
378            previous,
379            is_published
380        );
381
382        peer.logs()
383            .append(
384                bucket_id,
385                manifest.name().to_string(),
386                link.clone(),
387                previous,
388                height,
389                is_published,
390            )
391            .await
392            .map_err(|e| anyhow!("Failed to append manifest at height {}: {}", height, e))?;
393
394        let pins_link = manifest.pins().clone();
395        let peer_ids = manifest
396            .shares()
397            .iter()
398            .map(|share| share.1.principal().identity)
399            .collect();
400        peer.dispatch(SyncJob::DownloadPins(DownloadPinsJob {
401            pins_link,
402            peer_ids,
403        }))
404        .await?;
405    }
406
407    tracing::info!("Successfully applied {} manifests to log", manifests.len());
408    Ok(())
409}
410
411/// Verify the author's authorization to create a manifest.
412///
413/// Checks that:
414/// 1. The manifest is properly signed (or unsigned during migration)
415/// 2. The author was in the previous manifest's shares (authorized to make changes)
416/// 3. The author has write permission (Owner role)
417///
418/// This is used for chain validation where we don't yet know if the receiver
419/// is in the final shares.
420///
421/// # Arguments
422///
423/// * `manifest` - The manifest to verify
424/// * `previous` - The previous manifest in the chain (if any). The author must
425///   have been in this manifest's shares to be authorized. Pass `None` for
426///   genesis manifests.
427fn verify_author(
428    manifest: &Manifest,
429    previous: Option<&Manifest>,
430) -> Result<ProvenanceResult, ProvenanceError> {
431    // 1. Check manifest is signed
432    if !manifest.is_signed() {
433        // Allow unsigned for backwards compatibility during migration
434        tracing::warn!("Received unsigned manifest for bucket {}", manifest.id());
435        return Ok(ProvenanceResult::UnsignedLegacy);
436    }
437
438    // 2. Verify signature
439    match manifest.verify_signature() {
440        Ok(true) => {}
441        Ok(false) => return Err(ProvenanceError::InvalidSignature),
442        Err(e) => {
443            tracing::warn!("Signature verification error: {}", e);
444            return Err(ProvenanceError::InvalidSignature);
445        }
446    }
447
448    // 3. Check author was authorized in PREVIOUS manifest (not current!)
449    //    An attacker could add themselves to the current manifest's shares.
450    //    The author must have been in the previous manifest to make this update.
451    let author = manifest.author().expect("is_signed() was true");
452    let author_hex = author.to_hex();
453
454    let check_shares = previous
455        .map(|p| p.shares())
456        .unwrap_or_else(|| manifest.shares());
457
458    let author_share = check_shares
459        .get(&author_hex)
460        .ok_or(ProvenanceError::AuthorNotInShares)?;
461
462    // 4. Check author has write permission (Owner role)
463    if *author_share.role() != PrincipalRole::Owner {
464        return Err(ProvenanceError::AuthorNotWriter);
465    }
466
467    // 5. If shares were removed, verify the author was an owner in the previous manifest
468    if let Some(prev) = previous {
469        let prev_keys: std::collections::HashSet<&String> = prev.shares().keys().collect();
470        let current_keys: std::collections::HashSet<&String> = manifest.shares().keys().collect();
471
472        let removed_keys: Vec<&&String> = prev_keys.difference(&current_keys).collect();
473
474        if !removed_keys.is_empty() {
475            // Author must have been an owner in the previous manifest to remove shares
476            let author_prev_share = prev
477                .shares()
478                .get(&author_hex)
479                .ok_or(ProvenanceError::UnauthorizedShareRemoval)?;
480            if *author_prev_share.role() != PrincipalRole::Owner {
481                return Err(ProvenanceError::UnauthorizedShareRemoval);
482            }
483        }
484    }
485
486    tracing::debug!(
487        "Author verified: bucket={}, author={}",
488        manifest.id(),
489        author_hex
490    );
491
492    Ok(ProvenanceResult::Valid)
493}
494
495/// Verify a manifest's full provenance including receiver authorization.
496///
497/// Checks that:
498/// 1. Our key is in the manifest's shares (we're authorized to receive it)
499/// 2. The manifest is properly signed (or unsigned during migration)
500/// 3. The author was in the previous manifest's shares (authorized to make changes)
501/// 4. The author has write permission (Owner role)
502///
503/// # Arguments
504///
505/// * `peer` - The peer instance (for our public key)
506/// * `manifest` - The manifest to verify
507/// * `previous` - The previous manifest in the chain (if any). The author must
508///   have been in this manifest's shares to be authorized. Pass `None` for
509///   genesis manifests or when using a locally-synced manifest as trusted base.
510fn verify_provenance<L>(
511    peer: &Peer<L>,
512    manifest: &Manifest,
513    previous: Option<&Manifest>,
514) -> Result<ProvenanceResult, ProvenanceError>
515where
516    L: BucketLogProvider + Clone + Send + Sync + 'static,
517    L::Error: std::error::Error + Send + Sync + 'static,
518{
519    let our_pub_key = PublicKey::from(*peer.secret().public());
520    let our_key_hex = our_pub_key.to_hex();
521
522    // 1. Check we're authorized to receive this bucket
523    let we_are_authorized = manifest
524        .shares()
525        .iter()
526        .any(|(key_hex, _)| key_hex == &our_key_hex);
527
528    if !we_are_authorized {
529        return Ok(ProvenanceResult::NotAuthorized);
530    }
531
532    // 2. Verify author (signature + role check)
533    let result = verify_author(manifest, previous)?;
534
535    // Log success with receiver info
536    if manifest.is_signed() {
537        let author = manifest.author().expect("is_signed() was true");
538        tracing::debug!(
539            "Provenance valid: bucket={}, author={}, our_key={}",
540            manifest.id(),
541            author.to_hex(),
542            our_key_hex
543        );
544    }
545
546    Ok(result)
547}
548
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::{SecretKey, SecretShare};
    use crate::mount::Share;

    /// Build an unsigned height-0 manifest owned by `owner`, with default
    /// share secret and default links.
    fn create_test_manifest(owner: &SecretKey) -> Manifest {
        Manifest::new(
            uuid::Uuid::new_v4(),
            "test".to_string(),
            owner.public(),
            SecretShare::default(),
            Link::default(),
            Link::default(),
            0,
        )
    }

    /// A genesis manifest signed by its owner verifies as `Valid`.
    #[test]
    fn test_verify_author_valid_owner() {
        let owner_key = SecretKey::generate();
        let mut signed = create_test_manifest(&owner_key);
        signed.sign(&owner_key).unwrap();

        let outcome = verify_author(&signed, None).unwrap();

        assert_eq!(outcome, ProvenanceResult::Valid);
    }

    /// A mirror is in the shares but is not an Owner, so it may not author.
    #[test]
    fn test_verify_author_rejects_non_writer() {
        let owner_key = SecretKey::generate();
        let mirror_key = SecretKey::generate();

        let mut signed = create_test_manifest(&owner_key);
        signed.add_share(Share::new_mirror(mirror_key.public()));
        signed.sign(&mirror_key).unwrap();

        assert!(matches!(
            verify_author(&signed, None),
            Err(ProvenanceError::AuthorNotWriter)
        ));
    }

    /// A signer that appears nowhere in the shares is rejected.
    #[test]
    fn test_verify_author_rejects_unknown_signer() {
        let owner_key = SecretKey::generate();
        let attacker_key = SecretKey::generate();

        let mut signed = create_test_manifest(&owner_key);
        signed.sign(&attacker_key).unwrap();

        assert!(matches!(
            verify_author(&signed, None),
            Err(ProvenanceError::AuthorNotInShares)
        ));
    }

    /// Unsigned manifests are tolerated and flagged as legacy.
    #[test]
    fn test_verify_author_accepts_unsigned_legacy() {
        let owner_key = SecretKey::generate();
        // Deliberately left unsigned
        let unsigned = create_test_manifest(&owner_key);

        let outcome = verify_author(&unsigned, None).unwrap();

        assert_eq!(outcome, ProvenanceResult::UnsignedLegacy);
    }
}
612}