Skip to main content

common/peer/sync/
sync_bucket.rs

1//! Bucket synchronization job and execution logic
2//!
3//! This module contains the logic for syncing buckets between peers.
4
5use anyhow::{anyhow, Result};
6use uuid::Uuid;
7
8use crate::bucket_log::BucketLogProvider;
9use crate::crypto::PublicKey;
10use crate::linked_data::Link;
11use crate::mount::Manifest;
12use crate::mount::PrincipalRole;
13use crate::peer::Peer;
14
15use super::{DownloadPinsJob, ProvenanceError, SyncJob};
16
/// Result of provenance verification for a manifest.
///
/// These are the non-error outcomes returned by `verify_author` and
/// `verify_provenance`; hard verification failures are reported separately
/// through `ProvenanceError`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProvenanceResult {
    /// Manifest is valid - properly signed by an authorized writer
    Valid,
    /// Our key is not in the manifest's shares (not an error, just skip)
    NotAuthorized,
    /// Manifest is unsigned (allowed during migration)
    UnsignedLegacy,
}
27
/// Target peer and state for bucket synchronization
///
/// Describes the remote state a sync job should reach and which peers the
/// manifests may be downloaded from.
#[derive(Debug, Clone)]
pub struct SyncTarget {
    /// Link to the target bucket state
    pub link: Link,
    /// Height of the target bucket
    pub height: u64,
    /// Public keys of peers to sync from (in priority order)
    /// First peer is the "preferred" peer that triggered the sync
    pub peer_ids: Vec<PublicKey>,
}
39
/// Sync bucket job definition
///
/// Consumed by `execute`, which brings the local log for `bucket_id` up to
/// the state described by `target`.
#[derive(Debug, Clone)]
pub struct SyncBucketJob {
    /// Identifier of the bucket to synchronize
    pub bucket_id: Uuid,
    /// Remote state to sync towards and the peers to fetch it from
    pub target: SyncTarget,
}
46
47/// Execute a bucket sync job
48///
49/// This is the main entry point for syncing. It handles both cases:
50/// - Updating an existing bucket we already have
51/// - Cloning a new bucket we don't have yet
52pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
53where
54    L: BucketLogProvider + Clone + Send + Sync + 'static,
55    L::Error: std::error::Error + Send + Sync + 'static,
56{
57    let peer_ids_hex: Vec<String> = job.target.peer_ids.iter().map(|p| p.to_hex()).collect();
58    tracing::info!(
59        "Syncing bucket {} from {} peer(s) {:?} to link {:?} at height {}",
60        job.bucket_id,
61        job.target.peer_ids.len(),
62        peer_ids_hex,
63        job.target.link,
64        job.target.height
65    );
66
67    let exists: bool = peer.logs().exists(job.bucket_id).await?;
68
69    let common_ancestor = if exists {
70        // find a common ancestor between our log and the
71        //  link the peer advertised to us
72        find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_ids).await?
73    } else {
74        None
75    };
76
77    // TODO (amiller68): between finding the common ancestor and downloading the manifest chain
78    //  there are redundant operations. We should optimize this.
79
80    // if we know the bucket exists, but we did not find a common ancestor
81    //  then we have diverged / are not talking about the same bucket
82    // for now just log a warning and do nothing
83    if exists && common_ancestor.is_none() {
84        tracing::warn!(
85            "Bucket {} diverged from peer(s) {:?}",
86            job.bucket_id,
87            peer_ids_hex
88        );
89        return Ok(());
90    }
91
92    // Determine between what links we should download manifests for
93    let stop_link_owned = common_ancestor.as_ref().map(|ancestor| ancestor.0.clone());
94    let stop_link = stop_link_owned.as_ref();
95
96    if stop_link.is_none() && common_ancestor.is_none() {
97        // No common ancestor - we'll sync everything from the target back to genesis
98        tracing::info!(
99            "No common ancestor for bucket {}, syncing from genesis",
100            job.bucket_id
101        );
102    }
103
104    // now we know there is a valid list of manifests we should
105    //  fetch and apply to our log
106
107    // Load the common ancestor manifest as our trusted base (if we have one)
108    // This manifest is already in our blobs from find_common_ancestor
109    let trusted_base: Option<Manifest> = if let Some(link) = stop_link {
110        Some(peer.blobs().get_cbor(&link.hash()).await?)
111    } else {
112        None
113    };
114
115    // Download manifest chain from peer (from target back to common ancestor)
116    let manifests = download_manifest_chain(
117        peer,
118        &job.target.link,
119        stop_link,
120        &job.target.peer_ids,
121        trusted_base.as_ref(),
122    )
123    .await?;
124
125    // TODO (amiller68): maybe theres an optimization here in that we should know
126    //  we can exit earlier by virtue of finding a common ancestor which is just
127    //  our current head
128    if manifests.is_empty() {
129        tracing::info!("No new manifests to sync, already up to date");
130        return Ok(());
131    };
132
133    // Verify provenance of the latest manifest
134    // Use our local manifest as the trusted base (previous)
135    let latest_manifest = &manifests.last().unwrap().0;
136    let trusted_base_ref = trusted_base.as_ref();
137    match verify_provenance(peer, latest_manifest, trusted_base_ref)? {
138        ProvenanceResult::Valid => {
139            tracing::debug!("Provenance verification passed");
140        }
141        ProvenanceResult::UnsignedLegacy => {
142            tracing::warn!(
143                "Accepting unsigned manifest for bucket {} (migration mode)",
144                latest_manifest.id()
145            );
146        }
147        ProvenanceResult::NotAuthorized => {
148            tracing::warn!("Provenance verification failed: our key not in bucket shares");
149            return Ok(());
150        }
151    }
152
153    // apply the updates to the bucket
154    apply_manifest_chain(peer, job.bucket_id, &manifests).await?;
155
156    Ok(())
157}
158
159/// Download a chain of manifests from peers and validate provenance
160///
161/// Walks backwards through the manifest chain via `previous` links.
162/// Stops when it reaches `stop_at` link (common ancestor) or genesis (no previous).
163/// Tries multiple peers in order for each download, succeeding on first available.
164///
165/// After downloading, validates each manifest's provenance:
166/// - The first manifest (oldest) is validated against `trusted_base` (if any)
167/// - Each subsequent manifest is validated against its predecessor
168///
169/// Returns manifests in order from oldest to newest with their links.
170async fn download_manifest_chain<L>(
171    peer: &Peer<L>,
172    start_link: &Link,
173    stop_link: Option<&Link>,
174    peer_ids: &[PublicKey],
175    trusted_base: Option<&Manifest>,
176) -> Result<Vec<(Manifest, Link)>>
177where
178    L: BucketLogProvider + Clone + Send + Sync + 'static,
179    L::Error: std::error::Error + Send + Sync + 'static,
180{
181    tracing::debug!(
182        "Downloading manifest chain from {:?}, stop_at: {:?}, using {} peer(s)",
183        start_link,
184        stop_link,
185        peer_ids.len()
186    );
187
188    let mut manifests = Vec::new();
189    let mut current_link = start_link.clone();
190
191    // Download manifests walking backwards
192    loop {
193        // Download the manifest blob from peers
194        peer.blobs()
195            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
196            .await
197            .map_err(|e| {
198                anyhow!(
199                    "Failed to download manifest {:?} from peers: {}",
200                    current_link,
201                    e
202                )
203            })?;
204
205        // Read and decode the manifest
206        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
207
208        // Check if we should stop
209        if let Some(stop_link) = stop_link {
210            if &current_link == stop_link {
211                tracing::debug!("Reached stop_at link, stopping download");
212                break;
213            }
214        }
215
216        manifests.push((manifest.clone(), current_link.clone()));
217
218        // Check for previous link
219        match manifest.previous() {
220            Some(prev_link) => {
221                current_link = prev_link.clone();
222            }
223            None => {
224                // Reached genesis, stop
225                tracing::debug!("Reached genesis manifest, stopping download");
226                break;
227            }
228        }
229    }
230
231    // Reverse to get oldest-to-newest order
232    manifests.reverse();
233
234    tracing::debug!("Downloaded {} manifests", manifests.len());
235
236    // Validate the chain (walking from oldest to newest)
237    // The first manifest is validated against the trusted_base (common ancestor)
238    // NOTE: Chain validation only checks author authorization, not receiver authorization.
239    // The receiver check is done separately on the final manifest in execute().
240    let mut previous: Option<&Manifest> = trusted_base;
241
242    for (manifest, link) in manifests.iter() {
243        verify_author(manifest, previous).map_err(|e| ProvenanceError::InvalidManifestInChain {
244            link: link.clone(),
245            reason: e.to_string(),
246        })?;
247        previous = Some(manifest);
248    }
249
250    Ok(manifests)
251}
252
253/// Find common ancestor by downloading manifests from peers
254///
255/// Starting from `start_link`, walks backwards through the peer's manifest chain,
256/// downloading each manifest and checking if it exists in our local log.
257/// Returns the first (most recent) link and height found in our log.
258/// Tries multiple peers in order for each download, succeeding on first available.
259///
260/// # Arguments
261///
262/// * `peer` - The peer instance with access to logs and blobs
263/// * `bucket_id` - The bucket to check against our local log
264/// * `link` - The starting point on the peer's chain (typically their head)
265/// * `peer_ids` - The peers to download manifests from (in priority order)
266///
267/// # Returns
268///
269/// * `Ok(Some((link, height)))` - Found common ancestor with its link and height
270/// * `Ok(None)` - No common ancestor found (reached genesis without intersection)
271/// * `Err(_)` - Download or log access error
272async fn find_common_ancestor<L>(
273    peer: &Peer<L>,
274    bucket_id: Uuid,
275    link: &Link,
276    peer_ids: &[PublicKey],
277) -> Result<Option<(Link, u64)>>
278where
279    L: BucketLogProvider + Clone + Send + Sync + 'static,
280    L::Error: std::error::Error + Send + Sync + 'static,
281{
282    tracing::debug!(
283        "Finding common ancestor starting from {:?} using {} peer(s)",
284        link,
285        peer_ids.len()
286    );
287
288    let mut current_link = link.clone();
289    let mut manifests_checked = 0;
290
291    loop {
292        manifests_checked += 1;
293        tracing::debug!(
294            "Checking manifest {} at link {:?}",
295            manifests_checked,
296            current_link
297        );
298
299        // TODO (amiller68): this should build in memory
300        //  but for now we just download it
301        // Download the manifest from peers
302        peer.blobs()
303            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
304            .await
305            .map_err(|e| {
306                anyhow!(
307                    "Failed to download manifest {:?} from peers: {}",
308                    current_link,
309                    e
310                )
311            })?;
312
313        // Read and decode the manifest
314        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
315        let height = manifest.height();
316
317        // Check if this link exists in our local log
318        match peer.logs().has(bucket_id, current_link.clone()).await {
319            Ok(heights) if !heights.is_empty() => {
320                tracing::info!(
321                    "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
322                    current_link,
323                    height,
324                    heights
325                );
326                return Ok(Some((current_link, height)));
327            }
328            Ok(_) => {
329                // Link not in our log, check previous
330                tracing::debug!("Link {:?} not in our log, checking previous", current_link);
331            }
332            Err(e) => {
333                tracing::warn!("Error checking for link in log: {}", e);
334                // Continue checking previous links despite error
335            }
336        }
337
338        // Move to previous link
339        match manifest.previous() {
340            Some(prev_link) => {
341                current_link = prev_link.clone();
342            }
343            None => {
344                // Reached genesis without finding common ancestor
345                tracing::debug!(
346                    "Reached genesis after checking {} manifests, no common ancestor found",
347                    manifests_checked
348                );
349                return Ok(None);
350            }
351        }
352    }
353}
354
355/// Apply a chain of manifests to the log
356///
357/// Appends each manifest to the log in order, starting from `start_height`.
358async fn apply_manifest_chain<L>(
359    peer: &Peer<L>,
360    bucket_id: Uuid,
361    manifests: &[(Manifest, Link)],
362) -> Result<()>
363where
364    L: BucketLogProvider + Clone + Send + Sync + 'static,
365    L::Error: std::error::Error + Send + Sync + 'static,
366{
367    tracing::info!("Applying {} manifests to log", manifests.len(),);
368
369    if let Some((_i, (manifest, link))) = manifests.iter().enumerate().next() {
370        let previous = manifest.previous().clone();
371        let height = manifest.height();
372        let is_published = manifest.is_published();
373
374        tracing::info!(
375            "Appending manifest to log: height={}, link={:?}, previous={:?}, published={}",
376            height,
377            link,
378            previous,
379            is_published
380        );
381
382        peer.logs()
383            .append(
384                bucket_id,
385                manifest.name().to_string(),
386                link.clone(),
387                previous,
388                height,
389                is_published,
390            )
391            .await
392            .map_err(|e| anyhow!("Failed to append manifest at height {}: {}", height, e))?;
393
394        let pins_link = manifest.pins().clone();
395        let peer_ids = manifest
396            .shares()
397            .iter()
398            .map(|share| share.1.principal().identity)
399            .collect();
400        return peer
401            .dispatch(SyncJob::DownloadPins(DownloadPinsJob {
402                pins_link,
403                peer_ids,
404            }))
405            .await;
406    }
407
408    tracing::info!("Successfully applied {} manifests to log", manifests.len());
409    Ok(())
410}
411
412/// Verify the author's authorization to create a manifest.
413///
414/// Checks that:
415/// 1. The manifest is properly signed (or unsigned during migration)
416/// 2. The author was in the previous manifest's shares (authorized to make changes)
417/// 3. The author has write permission (Owner role)
418///
419/// This is used for chain validation where we don't yet know if the receiver
420/// is in the final shares.
421///
422/// # Arguments
423///
424/// * `manifest` - The manifest to verify
425/// * `previous` - The previous manifest in the chain (if any). The author must
426///   have been in this manifest's shares to be authorized. Pass `None` for
427///   genesis manifests.
428fn verify_author(
429    manifest: &Manifest,
430    previous: Option<&Manifest>,
431) -> Result<ProvenanceResult, ProvenanceError> {
432    // 1. Check manifest is signed
433    if !manifest.is_signed() {
434        // Allow unsigned for backwards compatibility during migration
435        tracing::warn!("Received unsigned manifest for bucket {}", manifest.id());
436        return Ok(ProvenanceResult::UnsignedLegacy);
437    }
438
439    // 2. Verify signature
440    match manifest.verify_signature() {
441        Ok(true) => {}
442        Ok(false) => return Err(ProvenanceError::InvalidSignature),
443        Err(e) => {
444            tracing::warn!("Signature verification error: {}", e);
445            return Err(ProvenanceError::InvalidSignature);
446        }
447    }
448
449    // 3. Check author was authorized in PREVIOUS manifest (not current!)
450    //    An attacker could add themselves to the current manifest's shares.
451    //    The author must have been in the previous manifest to make this update.
452    let author = manifest.author().expect("is_signed() was true");
453    let author_hex = author.to_hex();
454
455    let check_shares = previous
456        .map(|p| p.shares())
457        .unwrap_or_else(|| manifest.shares());
458
459    let author_share = check_shares
460        .get(&author_hex)
461        .ok_or(ProvenanceError::AuthorNotInShares)?;
462
463    // 4. Check author has write permission (Owner role)
464    if *author_share.role() != PrincipalRole::Owner {
465        return Err(ProvenanceError::AuthorNotWriter);
466    }
467
468    tracing::debug!(
469        "Author verified: bucket={}, author={}",
470        manifest.id(),
471        author_hex
472    );
473
474    Ok(ProvenanceResult::Valid)
475}
476
477/// Verify a manifest's full provenance including receiver authorization.
478///
479/// Checks that:
480/// 1. Our key is in the manifest's shares (we're authorized to receive it)
481/// 2. The manifest is properly signed (or unsigned during migration)
482/// 3. The author was in the previous manifest's shares (authorized to make changes)
483/// 4. The author has write permission (Owner role)
484///
485/// # Arguments
486///
487/// * `peer` - The peer instance (for our public key)
488/// * `manifest` - The manifest to verify
489/// * `previous` - The previous manifest in the chain (if any). The author must
490///   have been in this manifest's shares to be authorized. Pass `None` for
491///   genesis manifests or when using a locally-synced manifest as trusted base.
492fn verify_provenance<L>(
493    peer: &Peer<L>,
494    manifest: &Manifest,
495    previous: Option<&Manifest>,
496) -> Result<ProvenanceResult, ProvenanceError>
497where
498    L: BucketLogProvider + Clone + Send + Sync + 'static,
499    L::Error: std::error::Error + Send + Sync + 'static,
500{
501    let our_pub_key = PublicKey::from(*peer.secret().public());
502    let our_key_hex = our_pub_key.to_hex();
503
504    // 1. Check we're authorized to receive this bucket
505    let we_are_authorized = manifest
506        .shares()
507        .iter()
508        .any(|(key_hex, _)| key_hex == &our_key_hex);
509
510    if !we_are_authorized {
511        return Ok(ProvenanceResult::NotAuthorized);
512    }
513
514    // 2. Verify author (signature + role check)
515    let result = verify_author(manifest, previous)?;
516
517    // Log success with receiver info
518    if manifest.is_signed() {
519        let author = manifest.author().expect("is_signed() was true");
520        tracing::debug!(
521            "Provenance valid: bucket={}, author={}, our_key={}",
522            manifest.id(),
523            author.to_hex(),
524            our_key_hex
525        );
526    }
527
528    Ok(result)
529}
530
/// Unit tests for `verify_author`.
///
/// Every test passes `None` as the previous manifest, so the author is
/// checked against the shares of the manifest under test itself.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::{SecretKey, SecretShare};
    use crate::mount::Share;

    /// Build an unsigned manifest for a fresh random bucket id using
    /// `owner`'s public key.
    // NOTE(review): the meaning of the remaining `Manifest::new` arguments
    // (default share, default links, `0`) is not visible here - confirm
    // against `Manifest::new`.
    fn create_test_manifest(owner: &SecretKey) -> Manifest {
        let share = SecretShare::default();
        Manifest::new(
            uuid::Uuid::new_v4(),
            "test".to_string(),
            owner.public(),
            share,
            Link::default(),
            Link::default(),
            0,
        )
    }

    /// A manifest signed by the key it was created for verifies as `Valid`.
    #[test]
    fn test_verify_author_valid_owner() {
        let owner = SecretKey::generate();
        let mut manifest = create_test_manifest(&owner);
        manifest.sign(&owner).unwrap();

        let result = verify_author(&manifest, None).unwrap();
        assert_eq!(result, ProvenanceResult::Valid);
    }

    /// A signer that only holds a mirror share is rejected as a non-writer.
    #[test]
    fn test_verify_author_rejects_non_writer() {
        let owner = SecretKey::generate();
        let mirror = SecretKey::generate();

        let mut manifest = create_test_manifest(&owner);
        manifest.add_share(Share::new_mirror(mirror.public()));
        manifest.sign(&mirror).unwrap();

        let result = verify_author(&manifest, None);
        assert!(matches!(result, Err(ProvenanceError::AuthorNotWriter)));
    }

    /// A signer that was never added to the manifest's shares is rejected.
    #[test]
    fn test_verify_author_rejects_unknown_signer() {
        let owner = SecretKey::generate();
        let attacker = SecretKey::generate();

        let mut manifest = create_test_manifest(&owner);
        manifest.sign(&attacker).unwrap();

        let result = verify_author(&manifest, None);
        assert!(matches!(result, Err(ProvenanceError::AuthorNotInShares)));
    }

    /// An unsigned manifest is accepted and reported as `UnsignedLegacy`.
    #[test]
    fn test_verify_author_accepts_unsigned_legacy() {
        let owner = SecretKey::generate();
        let manifest = create_test_manifest(&owner);
        // Not signed

        let result = verify_author(&manifest, None).unwrap();
        assert_eq!(result, ProvenanceResult::UnsignedLegacy);
    }
}
594}