Skip to main content

common/peer/sync/
sync_bucket.rs

1//! Bucket synchronization job and execution logic
2//!
3//! This module contains the logic for syncing buckets between peers.
4
5use anyhow::{anyhow, Result};
6use uuid::Uuid;
7
8use crate::bucket_log::BucketLogProvider;
9use crate::crypto::PublicKey;
10use crate::linked_data::Link;
11use crate::mount::Manifest;
12use crate::mount::PrincipalRole;
13use crate::peer::Peer;
14
15use super::{DownloadPinsJob, ProvenanceError, SyncJob};
16
/// Non-error outcome of checking where a manifest came from.
///
/// Hard failures (bad signature, unauthorized author, ...) are reported as a
/// `ProvenanceError` instead; this enum only distinguishes the accepted/skipped cases.
#[derive(Clone, Debug, Eq, PartialEq)]
enum ProvenanceResult {
    /// The signature checks out and the author held write access.
    Valid,
    /// The manifest's shares do not include our key; the sync is skipped rather
    /// than treated as an error.
    NotAuthorized,
    /// The manifest carries no signature; tolerated while unsigned manifests are
    /// still being migrated.
    UnsignedLegacy,
}
27
28/// Target peer and state for bucket synchronization
29#[derive(Debug, Clone)]
30pub struct SyncTarget {
31    /// Link to the target bucket state
32    pub link: Link,
33    /// Height of the target bucket
34    pub height: u64,
35    /// Public keys of peers to sync from (in priority order)
36    /// First peer is the "preferred" peer that triggered the sync
37    pub peer_ids: Vec<PublicKey>,
38}
39
40/// Sync bucket job definition
41#[derive(Debug, Clone)]
42pub struct SyncBucketJob {
43    pub bucket_id: Uuid,
44    pub target: SyncTarget,
45}
46
47/// Execute a bucket sync job
48///
49/// This is the main entry point for syncing. It handles both cases:
50/// - Updating an existing bucket we already have
51/// - Cloning a new bucket we don't have yet
52pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
53where
54    L: BucketLogProvider + Clone + Send + Sync + 'static,
55    L::Error: std::error::Error + Send + Sync + 'static,
56{
57    let peer_ids_hex: Vec<String> = job.target.peer_ids.iter().map(|p| p.to_hex()).collect();
58    tracing::info!(
59        "Syncing bucket {} from {} peer(s) {:?} to link {:?} at height {}",
60        job.bucket_id,
61        job.target.peer_ids.len(),
62        peer_ids_hex,
63        job.target.link,
64        job.target.height
65    );
66
67    let exists: bool = peer.logs().exists(job.bucket_id).await?;
68
69    let common_ancestor = if exists {
70        // find a common ancestor between our log and the
71        //  link the peer advertised to us
72        find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_ids).await?
73    } else {
74        None
75    };
76
77    // TODO (amiller68): between finding the common ancestor and downloading the manifest chain
78    //  there are redundant operations. We should optimize this.
79
80    // if we know the bucket exists, but we did not find a common ancestor
81    //  then we have diverged / are not talking about the same bucket
82    // for now just log a warning and do nothing
83    if exists && common_ancestor.is_none() {
84        tracing::warn!(
85            "Bucket {} diverged from peer(s) {:?}",
86            job.bucket_id,
87            peer_ids_hex
88        );
89        return Ok(());
90    }
91
92    // Determine between what links we should download manifests for
93    let stop_link_owned = common_ancestor.as_ref().map(|ancestor| ancestor.0.clone());
94    let stop_link = stop_link_owned.as_ref();
95
96    if stop_link.is_none() && common_ancestor.is_none() {
97        // No common ancestor - we'll sync everything from the target back to genesis
98        tracing::info!(
99            "No common ancestor for bucket {}, syncing from genesis",
100            job.bucket_id
101        );
102    }
103
104    // now we know there is a valid list of manifests we should
105    //  fetch and apply to our log
106
107    // Load the common ancestor manifest as our trusted base (if we have one)
108    // This manifest is already in our blobs from find_common_ancestor
109    let trusted_base: Option<Manifest> = if let Some(link) = stop_link {
110        Some(peer.blobs().get_cbor(&link.hash()).await?)
111    } else {
112        None
113    };
114
115    // Download manifest chain from peer (from target back to common ancestor)
116    let manifests = download_manifest_chain(
117        peer,
118        &job.target.link,
119        stop_link,
120        &job.target.peer_ids,
121        trusted_base.as_ref(),
122    )
123    .await?;
124
125    // TODO (amiller68): maybe theres an optimization here in that we should know
126    //  we can exit earlier by virtue of finding a common ancestor which is just
127    //  our current head
128    if manifests.is_empty() {
129        tracing::info!("No new manifests to sync, already up to date");
130        return Ok(());
131    };
132
133    // Verify provenance of the latest manifest
134    // Use our local manifest as the trusted base (previous)
135    let latest_manifest = &manifests.last().unwrap().0;
136    let trusted_base_ref = trusted_base.as_ref();
137    match verify_provenance(peer, latest_manifest, trusted_base_ref)? {
138        ProvenanceResult::Valid => {
139            tracing::debug!("Provenance verification passed");
140        }
141        ProvenanceResult::UnsignedLegacy => {
142            tracing::warn!(
143                "Accepting unsigned manifest for bucket {} (migration mode)",
144                latest_manifest.id()
145            );
146        }
147        ProvenanceResult::NotAuthorized => {
148            tracing::warn!("Provenance verification failed: our key not in bucket shares");
149            return Ok(());
150        }
151    }
152
153    // apply the updates to the bucket
154    apply_manifest_chain(peer, job.bucket_id, &manifests).await?;
155
156    Ok(())
157}
158
159/// Download a chain of manifests from peers and validate provenance
160///
161/// Walks backwards through the manifest chain via `previous` links.
162/// Stops when it reaches `stop_at` link (common ancestor) or genesis (no previous).
163/// Tries multiple peers in order for each download, succeeding on first available.
164///
165/// After downloading, validates each manifest's provenance:
166/// - The first manifest (oldest) is validated against `trusted_base` (if any)
167/// - Each subsequent manifest is validated against its predecessor
168///
169/// Returns manifests in order from oldest to newest with their links.
170async fn download_manifest_chain<L>(
171    peer: &Peer<L>,
172    start_link: &Link,
173    stop_link: Option<&Link>,
174    peer_ids: &[PublicKey],
175    trusted_base: Option<&Manifest>,
176) -> Result<Vec<(Manifest, Link)>>
177where
178    L: BucketLogProvider + Clone + Send + Sync + 'static,
179    L::Error: std::error::Error + Send + Sync + 'static,
180{
181    tracing::debug!(
182        "Downloading manifest chain from {:?}, stop_at: {:?}, using {} peer(s)",
183        start_link,
184        stop_link,
185        peer_ids.len()
186    );
187
188    let mut manifests = Vec::new();
189    let mut current_link = start_link.clone();
190
191    // Download manifests walking backwards
192    loop {
193        // Download the manifest blob from peers
194        peer.blobs()
195            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
196            .await
197            .map_err(|e| {
198                anyhow!(
199                    "Failed to download manifest {:?} from peers: {}",
200                    current_link,
201                    e
202                )
203            })?;
204
205        // Read and decode the manifest
206        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
207
208        // Check if we should stop
209        if let Some(stop_link) = stop_link {
210            if &current_link == stop_link {
211                tracing::debug!("Reached stop_at link, stopping download");
212                break;
213            }
214        }
215
216        manifests.push((manifest.clone(), current_link.clone()));
217
218        // Check for previous link
219        match manifest.previous() {
220            Some(prev_link) => {
221                current_link = prev_link.clone();
222            }
223            None => {
224                // Reached genesis, stop
225                tracing::debug!("Reached genesis manifest, stopping download");
226                break;
227            }
228        }
229    }
230
231    // Reverse to get oldest-to-newest order
232    manifests.reverse();
233
234    tracing::debug!("Downloaded {} manifests", manifests.len());
235
236    // Validate the chain (walking from oldest to newest)
237    // The first manifest is validated against the trusted_base (common ancestor)
238    // NOTE: Chain validation only checks author authorization, not receiver authorization.
239    // The receiver check is done separately on the final manifest in execute().
240    let mut previous: Option<&Manifest> = trusted_base;
241
242    for (manifest, link) in manifests.iter() {
243        verify_author(manifest, previous).map_err(|e| ProvenanceError::InvalidManifestInChain {
244            link: link.clone(),
245            reason: e.to_string(),
246        })?;
247        previous = Some(manifest);
248    }
249
250    Ok(manifests)
251}
252
253/// Find common ancestor by downloading manifests from peers
254///
255/// Starting from `start_link`, walks backwards through the peer's manifest chain,
256/// downloading each manifest and checking if it exists in our local log.
257/// Returns the first (most recent) link and height found in our log.
258/// Tries multiple peers in order for each download, succeeding on first available.
259///
260/// # Arguments
261///
262/// * `peer` - The peer instance with access to logs and blobs
263/// * `bucket_id` - The bucket to check against our local log
264/// * `link` - The starting point on the peer's chain (typically their head)
265/// * `peer_ids` - The peers to download manifests from (in priority order)
266///
267/// # Returns
268///
269/// * `Ok(Some((link, height)))` - Found common ancestor with its link and height
270/// * `Ok(None)` - No common ancestor found (reached genesis without intersection)
271/// * `Err(_)` - Download or log access error
272async fn find_common_ancestor<L>(
273    peer: &Peer<L>,
274    bucket_id: Uuid,
275    link: &Link,
276    peer_ids: &[PublicKey],
277) -> Result<Option<(Link, u64)>>
278where
279    L: BucketLogProvider + Clone + Send + Sync + 'static,
280    L::Error: std::error::Error + Send + Sync + 'static,
281{
282    tracing::debug!(
283        "Finding common ancestor starting from {:?} using {} peer(s)",
284        link,
285        peer_ids.len()
286    );
287
288    let mut current_link = link.clone();
289    let mut manifests_checked = 0;
290
291    loop {
292        manifests_checked += 1;
293        tracing::debug!(
294            "Checking manifest {} at link {:?}",
295            manifests_checked,
296            current_link
297        );
298
299        // TODO (amiller68): this should build in memory
300        //  but for now we just download it
301        // Download the manifest from peers
302        peer.blobs()
303            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
304            .await
305            .map_err(|e| {
306                anyhow!(
307                    "Failed to download manifest {:?} from peers: {}",
308                    current_link,
309                    e
310                )
311            })?;
312
313        // Read and decode the manifest
314        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
315        let height = manifest.height();
316
317        // Check if this link exists in our local log
318        match peer.logs().has(bucket_id, current_link.clone()).await {
319            Ok(heights) if !heights.is_empty() => {
320                tracing::info!(
321                    "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
322                    current_link,
323                    height,
324                    heights
325                );
326                return Ok(Some((current_link, height)));
327            }
328            Ok(_) => {
329                // Link not in our log, check previous
330                tracing::debug!("Link {:?} not in our log, checking previous", current_link);
331            }
332            Err(e) => {
333                tracing::warn!("Error checking for link in log: {}", e);
334                // Continue checking previous links despite error
335            }
336        }
337
338        // Move to previous link
339        match manifest.previous() {
340            Some(prev_link) => {
341                current_link = prev_link.clone();
342            }
343            None => {
344                // Reached genesis without finding common ancestor
345                tracing::debug!(
346                    "Reached genesis after checking {} manifests, no common ancestor found",
347                    manifests_checked
348                );
349                return Ok(None);
350            }
351        }
352    }
353}
354
355/// Apply a chain of manifests to the log
356///
357/// Appends each manifest to the log in order, starting from `start_height`.
358async fn apply_manifest_chain<L>(
359    peer: &Peer<L>,
360    bucket_id: Uuid,
361    manifests: &[(Manifest, Link)],
362) -> Result<()>
363where
364    L: BucketLogProvider + Clone + Send + Sync + 'static,
365    L::Error: std::error::Error + Send + Sync + 'static,
366{
367    tracing::info!("Applying {} manifests to log", manifests.len(),);
368
369    if let Some((_i, (manifest, link))) = manifests.iter().enumerate().next() {
370        let previous = manifest.previous().clone();
371        let height = manifest.height();
372        let is_published = manifest.is_published();
373
374        tracing::info!(
375            "Appending manifest to log: height={}, link={:?}, previous={:?}, published={}",
376            height,
377            link,
378            previous,
379            is_published
380        );
381
382        peer.logs()
383            .append(
384                bucket_id,
385                manifest.name().to_string(),
386                link.clone(),
387                previous,
388                height,
389                is_published,
390            )
391            .await
392            .map_err(|e| anyhow!("Failed to append manifest at height {}: {}", height, e))?;
393
394        let pins_link = manifest.pins().clone();
395        let peer_ids = manifest
396            .shares()
397            .iter()
398            .map(|share| share.1.principal().identity)
399            .collect();
400        return peer
401            .dispatch(SyncJob::DownloadPins(DownloadPinsJob {
402                pins_link,
403                peer_ids,
404            }))
405            .await;
406    }
407
408    tracing::info!("Successfully applied {} manifests to log", manifests.len());
409    Ok(())
410}
411
412/// Verify the author's authorization to create a manifest.
413///
414/// Checks that:
415/// 1. The manifest is properly signed (or unsigned during migration)
416/// 2. The author was in the previous manifest's shares (authorized to make changes)
417/// 3. The author has write permission (Owner role)
418///
419/// This is used for chain validation where we don't yet know if the receiver
420/// is in the final shares.
421///
422/// # Arguments
423///
424/// * `manifest` - The manifest to verify
425/// * `previous` - The previous manifest in the chain (if any). The author must
426///   have been in this manifest's shares to be authorized. Pass `None` for
427///   genesis manifests.
428fn verify_author(
429    manifest: &Manifest,
430    previous: Option<&Manifest>,
431) -> Result<ProvenanceResult, ProvenanceError> {
432    // 1. Check manifest is signed
433    if !manifest.is_signed() {
434        // Allow unsigned for backwards compatibility during migration
435        tracing::warn!("Received unsigned manifest for bucket {}", manifest.id());
436        return Ok(ProvenanceResult::UnsignedLegacy);
437    }
438
439    // 2. Verify signature
440    match manifest.verify_signature() {
441        Ok(true) => {}
442        Ok(false) => return Err(ProvenanceError::InvalidSignature),
443        Err(e) => {
444            tracing::warn!("Signature verification error: {}", e);
445            return Err(ProvenanceError::InvalidSignature);
446        }
447    }
448
449    // 3. Check author was authorized in PREVIOUS manifest (not current!)
450    //    An attacker could add themselves to the current manifest's shares.
451    //    The author must have been in the previous manifest to make this update.
452    let author = manifest.author().expect("is_signed() was true");
453    let author_hex = author.to_hex();
454
455    let check_shares = previous
456        .map(|p| p.shares())
457        .unwrap_or_else(|| manifest.shares());
458
459    let author_share = check_shares
460        .get(&author_hex)
461        .ok_or(ProvenanceError::AuthorNotInShares)?;
462
463    // 4. Check author has write permission (Owner role)
464    if *author_share.role() != PrincipalRole::Owner {
465        return Err(ProvenanceError::AuthorNotWriter);
466    }
467
468    // 5. If shares were removed, verify the author was an owner in the previous manifest
469    if let Some(prev) = previous {
470        let prev_keys: std::collections::HashSet<&String> = prev.shares().keys().collect();
471        let current_keys: std::collections::HashSet<&String> = manifest.shares().keys().collect();
472
473        let removed_keys: Vec<&&String> = prev_keys.difference(&current_keys).collect();
474
475        if !removed_keys.is_empty() {
476            // Author must have been an owner in the previous manifest to remove shares
477            let author_prev_share = prev
478                .shares()
479                .get(&author_hex)
480                .ok_or(ProvenanceError::UnauthorizedShareRemoval)?;
481            if *author_prev_share.role() != PrincipalRole::Owner {
482                return Err(ProvenanceError::UnauthorizedShareRemoval);
483            }
484        }
485    }
486
487    tracing::debug!(
488        "Author verified: bucket={}, author={}",
489        manifest.id(),
490        author_hex
491    );
492
493    Ok(ProvenanceResult::Valid)
494}
495
496/// Verify a manifest's full provenance including receiver authorization.
497///
498/// Checks that:
499/// 1. Our key is in the manifest's shares (we're authorized to receive it)
500/// 2. The manifest is properly signed (or unsigned during migration)
501/// 3. The author was in the previous manifest's shares (authorized to make changes)
502/// 4. The author has write permission (Owner role)
503///
504/// # Arguments
505///
506/// * `peer` - The peer instance (for our public key)
507/// * `manifest` - The manifest to verify
508/// * `previous` - The previous manifest in the chain (if any). The author must
509///   have been in this manifest's shares to be authorized. Pass `None` for
510///   genesis manifests or when using a locally-synced manifest as trusted base.
511fn verify_provenance<L>(
512    peer: &Peer<L>,
513    manifest: &Manifest,
514    previous: Option<&Manifest>,
515) -> Result<ProvenanceResult, ProvenanceError>
516where
517    L: BucketLogProvider + Clone + Send + Sync + 'static,
518    L::Error: std::error::Error + Send + Sync + 'static,
519{
520    let our_pub_key = PublicKey::from(*peer.secret().public());
521    let our_key_hex = our_pub_key.to_hex();
522
523    // 1. Check we're authorized to receive this bucket
524    let we_are_authorized = manifest
525        .shares()
526        .iter()
527        .any(|(key_hex, _)| key_hex == &our_key_hex);
528
529    if !we_are_authorized {
530        return Ok(ProvenanceResult::NotAuthorized);
531    }
532
533    // 2. Verify author (signature + role check)
534    let result = verify_author(manifest, previous)?;
535
536    // Log success with receiver info
537    if manifest.is_signed() {
538        let author = manifest.author().expect("is_signed() was true");
539        tracing::debug!(
540            "Provenance valid: bucket={}, author={}, our_key={}",
541            manifest.id(),
542            author.to_hex(),
543            our_key_hex
544        );
545    }
546
547    Ok(result)
548}
549
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::{SecretKey, SecretShare};
    use crate::mount::Share;

    /// Build an unsigned manifest for a fresh bucket whose shares contain `owner`.
    fn create_test_manifest(owner: &SecretKey) -> Manifest {
        let share = SecretShare::default();
        Manifest::new(
            uuid::Uuid::new_v4(),
            "test".to_string(),
            owner.public(),
            share,
            Link::default(),
            Link::default(),
            0,
        )
    }

    #[test]
    fn test_verify_author_valid_owner() {
        let owner = SecretKey::generate();
        let mut manifest = create_test_manifest(&owner);
        manifest.sign(&owner).unwrap();

        let result = verify_author(&manifest, None).unwrap();
        assert_eq!(result, ProvenanceResult::Valid);
    }

    #[test]
    fn test_verify_author_rejects_non_writer() {
        let owner = SecretKey::generate();
        let mirror = SecretKey::generate();

        let mut manifest = create_test_manifest(&owner);
        manifest.add_share(Share::new_mirror(mirror.public()));
        manifest.sign(&mirror).unwrap();

        let result = verify_author(&manifest, None);
        assert!(matches!(result, Err(ProvenanceError::AuthorNotWriter)));
    }

    #[test]
    fn test_verify_author_rejects_unknown_signer() {
        let owner = SecretKey::generate();
        let attacker = SecretKey::generate();

        let mut manifest = create_test_manifest(&owner);
        manifest.sign(&attacker).unwrap();

        let result = verify_author(&manifest, None);
        assert!(matches!(result, Err(ProvenanceError::AuthorNotInShares)));
    }

    #[test]
    fn test_verify_author_accepts_unsigned_legacy() {
        let owner = SecretKey::generate();
        let manifest = create_test_manifest(&owner);
        // Not signed

        let result = verify_author(&manifest, None).unwrap();
        assert_eq!(result, ProvenanceResult::UnsignedLegacy);
    }

    #[test]
    fn test_verify_author_uses_previous_shares() {
        // The attacker is an owner of the manifest they forged, but was never in the
        // previous manifest's shares, so they must be rejected.
        let owner = SecretKey::generate();
        let attacker = SecretKey::generate();

        let previous = create_test_manifest(&owner);
        let mut forged = create_test_manifest(&attacker);
        forged.sign(&attacker).unwrap();

        let result = verify_author(&forged, Some(&previous));
        assert!(matches!(result, Err(ProvenanceError::AuthorNotInShares)));
    }

    #[test]
    fn test_verify_author_valid_against_previous() {
        let owner = SecretKey::generate();

        let previous = create_test_manifest(&owner);
        let mut next = create_test_manifest(&owner);
        next.sign(&owner).unwrap();

        let result = verify_author(&next, Some(&previous)).unwrap();
        assert_eq!(result, ProvenanceResult::Valid);
    }

    #[test]
    fn test_verify_author_owner_may_remove_share() {
        // `previous` shares the bucket with a mirror; `next` drops it. The author was an
        // owner in `previous`, so the removal is allowed.
        let owner = SecretKey::generate();
        let mirror = SecretKey::generate();

        let mut previous = create_test_manifest(&owner);
        previous.add_share(Share::new_mirror(mirror.public()));

        let mut next = create_test_manifest(&owner);
        next.sign(&owner).unwrap();

        let result = verify_author(&next, Some(&previous)).unwrap();
        assert_eq!(result, ProvenanceResult::Valid);
    }
}