Skip to main content

common/peer/sync/
sync_bucket.rs

1//! Bucket synchronization job and execution logic
2//!
3//! This module contains the logic for syncing buckets between peers.
4
5use anyhow::{anyhow, Result};
6use uuid::Uuid;
7
8use crate::bucket_log::BucketLogProvider;
9use crate::crypto::PublicKey;
10use crate::linked_data::Link;
11use crate::mount::Manifest;
12use crate::mount::PrincipalRole;
13use crate::peer::Peer;
14
15use super::{DownloadPinsJob, ProvenanceError, SyncJob};
16
/// Outcome of checking where a manifest came from.
///
/// Produced by `verify_author` / `verify_provenance`. Hard failures (bad signature,
/// unauthorized author, ...) are reported through `ProvenanceError` instead.
#[derive(Clone, Debug, Eq, PartialEq)]
enum ProvenanceResult {
    /// Signed, and the signer was an authorized writer
    Valid,
    /// Our own key does not appear in the manifest's shares; the sync is skipped rather than failed
    NotAuthorized,
    /// No signature present; tolerated for backwards compatibility during migration
    UnsignedLegacy,
}
27
28/// Target peer and state for bucket synchronization
29#[derive(Debug, Clone)]
30pub struct SyncTarget {
31    /// Link to the target bucket state
32    pub link: Link,
33    /// Height of the target bucket
34    pub height: u64,
35    /// Public keys of peers to sync from (in priority order)
36    /// First peer is the "preferred" peer that triggered the sync
37    pub peer_ids: Vec<PublicKey>,
38}
39
40/// Sync bucket job definition
41#[derive(Debug, Clone)]
42pub struct SyncBucketJob {
43    pub bucket_id: Uuid,
44    pub target: SyncTarget,
45}
46
47/// Execute a bucket sync job
48///
49/// This is the main entry point for syncing. It handles both cases:
50/// - Updating an existing bucket we already have
51/// - Cloning a new bucket we don't have yet
52pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
53where
54    L: BucketLogProvider + Clone + Send + Sync + 'static,
55    L::Error: std::error::Error + Send + Sync + 'static,
56{
57    let peer_ids_hex: Vec<String> = job.target.peer_ids.iter().map(|p| p.to_hex()).collect();
58    tracing::info!(
59        "Syncing bucket {} from {} peer(s) {:?} to link {:?} at height {}",
60        job.bucket_id,
61        job.target.peer_ids.len(),
62        peer_ids_hex,
63        job.target.link,
64        job.target.height
65    );
66
67    let exists: bool = peer.logs().exists(job.bucket_id).await?;
68
69    // Signal new bucket discovery so the daemon can set pending status
70    if !exists {
71        let shared_by = job.target.peer_ids.first().map(|p| p.to_hex());
72        if let Err(e) = peer
73            .logs()
74            .on_new_bucket_discovered(job.bucket_id, shared_by)
75            .await
76        {
77            tracing::warn!(
78                "Failed to signal new bucket discovery for {}: {}",
79                job.bucket_id,
80                e
81            );
82        }
83    }
84
85    let common_ancestor = if exists {
86        // find a common ancestor between our log and the
87        //  link the peer advertised to us
88        find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_ids).await?
89    } else {
90        None
91    };
92
93    // TODO (amiller68): between finding the common ancestor and downloading the manifest chain
94    //  there are redundant operations. We should optimize this.
95
96    // if we know the bucket exists, but we did not find a common ancestor
97    //  then we have diverged / are not talking about the same bucket
98    // for now just log a warning and do nothing
99    if exists && common_ancestor.is_none() {
100        tracing::warn!(
101            "Bucket {} diverged from peer(s) {:?}",
102            job.bucket_id,
103            peer_ids_hex
104        );
105        return Ok(());
106    }
107
108    // Determine between what links we should download manifests for
109    let stop_link_owned = common_ancestor.as_ref().map(|ancestor| ancestor.0.clone());
110    let stop_link = stop_link_owned.as_ref();
111
112    if stop_link.is_none() && common_ancestor.is_none() {
113        // No common ancestor - we'll sync everything from the target back to genesis
114        tracing::info!(
115            "No common ancestor for bucket {}, syncing from genesis",
116            job.bucket_id
117        );
118    }
119
120    // now we know there is a valid list of manifests we should
121    //  fetch and apply to our log
122
123    // Load the common ancestor manifest as our trusted base (if we have one)
124    // This manifest is already in our blobs from find_common_ancestor
125    let trusted_base: Option<Manifest> = if let Some(link) = stop_link {
126        Some(peer.blobs().get_cbor(&link.hash()).await?)
127    } else {
128        None
129    };
130
131    // Download manifest chain from peer (from target back to common ancestor)
132    let manifests = download_manifest_chain(
133        peer,
134        &job.target.link,
135        stop_link,
136        &job.target.peer_ids,
137        trusted_base.as_ref(),
138    )
139    .await?;
140
141    // TODO (amiller68): maybe theres an optimization here in that we should know
142    //  we can exit earlier by virtue of finding a common ancestor which is just
143    //  our current head
144    if manifests.is_empty() {
145        tracing::info!("No new manifests to sync, already up to date");
146        return Ok(());
147    };
148
149    // Verify provenance of the latest manifest
150    // Use our local manifest as the trusted base (previous)
151    let latest_manifest = &manifests.last().unwrap().0;
152    let trusted_base_ref = trusted_base.as_ref();
153    match verify_provenance(peer, latest_manifest, trusted_base_ref)? {
154        ProvenanceResult::Valid => {
155            tracing::debug!("Provenance verification passed");
156        }
157        ProvenanceResult::UnsignedLegacy => {
158            tracing::warn!(
159                "Accepting unsigned manifest for bucket {} (migration mode)",
160                latest_manifest.id()
161            );
162        }
163        ProvenanceResult::NotAuthorized => {
164            tracing::warn!("Provenance verification failed: our key not in bucket shares");
165            return Ok(());
166        }
167    }
168
169    // apply the updates to the bucket
170    apply_manifest_chain(peer, job.bucket_id, &manifests).await?;
171
172    Ok(())
173}
174
175/// Download a chain of manifests from peers and validate provenance
176///
177/// Walks backwards through the manifest chain via `previous` links.
178/// Stops when it reaches `stop_at` link (common ancestor) or genesis (no previous).
179/// Tries multiple peers in order for each download, succeeding on first available.
180///
181/// After downloading, validates each manifest's provenance:
182/// - The first manifest (oldest) is validated against `trusted_base` (if any)
183/// - Each subsequent manifest is validated against its predecessor
184///
185/// Returns manifests in order from oldest to newest with their links.
186async fn download_manifest_chain<L>(
187    peer: &Peer<L>,
188    start_link: &Link,
189    stop_link: Option<&Link>,
190    peer_ids: &[PublicKey],
191    trusted_base: Option<&Manifest>,
192) -> Result<Vec<(Manifest, Link)>>
193where
194    L: BucketLogProvider + Clone + Send + Sync + 'static,
195    L::Error: std::error::Error + Send + Sync + 'static,
196{
197    tracing::debug!(
198        "Downloading manifest chain from {:?}, stop_at: {:?}, using {} peer(s)",
199        start_link,
200        stop_link,
201        peer_ids.len()
202    );
203
204    let mut manifests = Vec::new();
205    let mut current_link = start_link.clone();
206
207    // Download manifests walking backwards
208    loop {
209        // Download the manifest blob from peers
210        peer.blobs()
211            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
212            .await
213            .map_err(|e| {
214                anyhow!(
215                    "Failed to download manifest {:?} from peers: {}",
216                    current_link,
217                    e
218                )
219            })?;
220
221        // Read and decode the manifest
222        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
223
224        // Check if we should stop
225        if let Some(stop_link) = stop_link {
226            if &current_link == stop_link {
227                tracing::debug!("Reached stop_at link, stopping download");
228                break;
229            }
230        }
231
232        manifests.push((manifest.clone(), current_link.clone()));
233
234        // Check for previous link
235        match manifest.previous() {
236            Some(prev_link) => {
237                current_link = prev_link.clone();
238            }
239            None => {
240                // Reached genesis, stop
241                tracing::debug!("Reached genesis manifest, stopping download");
242                break;
243            }
244        }
245    }
246
247    // Reverse to get oldest-to-newest order
248    manifests.reverse();
249
250    tracing::debug!("Downloaded {} manifests", manifests.len());
251
252    // Validate the chain (walking from oldest to newest)
253    // The first manifest is validated against the trusted_base (common ancestor)
254    // NOTE: Chain validation only checks author authorization, not receiver authorization.
255    // The receiver check is done separately on the final manifest in execute().
256    let mut previous: Option<&Manifest> = trusted_base;
257
258    for (manifest, link) in manifests.iter() {
259        verify_author(manifest, previous).map_err(|e| ProvenanceError::InvalidManifestInChain {
260            link: link.clone(),
261            reason: e.to_string(),
262        })?;
263        previous = Some(manifest);
264    }
265
266    Ok(manifests)
267}
268
269/// Find common ancestor by downloading manifests from peers
270///
271/// Starting from `start_link`, walks backwards through the peer's manifest chain,
272/// downloading each manifest and checking if it exists in our local log.
273/// Returns the first (most recent) link and height found in our log.
274/// Tries multiple peers in order for each download, succeeding on first available.
275///
276/// # Arguments
277///
278/// * `peer` - The peer instance with access to logs and blobs
279/// * `bucket_id` - The bucket to check against our local log
280/// * `link` - The starting point on the peer's chain (typically their head)
281/// * `peer_ids` - The peers to download manifests from (in priority order)
282///
283/// # Returns
284///
285/// * `Ok(Some((link, height)))` - Found common ancestor with its link and height
286/// * `Ok(None)` - No common ancestor found (reached genesis without intersection)
287/// * `Err(_)` - Download or log access error
288async fn find_common_ancestor<L>(
289    peer: &Peer<L>,
290    bucket_id: Uuid,
291    link: &Link,
292    peer_ids: &[PublicKey],
293) -> Result<Option<(Link, u64)>>
294where
295    L: BucketLogProvider + Clone + Send + Sync + 'static,
296    L::Error: std::error::Error + Send + Sync + 'static,
297{
298    tracing::debug!(
299        "Finding common ancestor starting from {:?} using {} peer(s)",
300        link,
301        peer_ids.len()
302    );
303
304    let mut current_link = link.clone();
305    let mut manifests_checked = 0;
306
307    loop {
308        manifests_checked += 1;
309        tracing::debug!(
310            "Checking manifest {} at link {:?}",
311            manifests_checked,
312            current_link
313        );
314
315        // TODO (amiller68): this should build in memory
316        //  but for now we just download it
317        // Download the manifest from peers
318        peer.blobs()
319            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
320            .await
321            .map_err(|e| {
322                anyhow!(
323                    "Failed to download manifest {:?} from peers: {}",
324                    current_link,
325                    e
326                )
327            })?;
328
329        // Read and decode the manifest
330        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
331        let height = manifest.height();
332
333        // Check if this link exists in our local log
334        match peer.logs().has(bucket_id, current_link.clone()).await {
335            Ok(heights) if !heights.is_empty() => {
336                tracing::info!(
337                    "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
338                    current_link,
339                    height,
340                    heights
341                );
342                return Ok(Some((current_link, height)));
343            }
344            Ok(_) => {
345                // Link not in our log, check previous
346                tracing::debug!("Link {:?} not in our log, checking previous", current_link);
347            }
348            Err(e) => {
349                tracing::warn!("Error checking for link in log: {}", e);
350                // Continue checking previous links despite error
351            }
352        }
353
354        // Move to previous link
355        match manifest.previous() {
356            Some(prev_link) => {
357                current_link = prev_link.clone();
358            }
359            None => {
360                // Reached genesis without finding common ancestor
361                tracing::debug!(
362                    "Reached genesis after checking {} manifests, no common ancestor found",
363                    manifests_checked
364                );
365                return Ok(None);
366            }
367        }
368    }
369}
370
371/// Apply a chain of manifests to the log
372///
373/// Appends each manifest to the log in order, starting from `start_height`.
374async fn apply_manifest_chain<L>(
375    peer: &Peer<L>,
376    bucket_id: Uuid,
377    manifests: &[(Manifest, Link)],
378) -> Result<()>
379where
380    L: BucketLogProvider + Clone + Send + Sync + 'static,
381    L::Error: std::error::Error + Send + Sync + 'static,
382{
383    tracing::info!("Applying {} manifests to log", manifests.len());
384
385    // Check if we should download content for this bucket
386    let should_sync = peer
387        .logs()
388        .should_sync_content(bucket_id)
389        .await
390        .unwrap_or(true);
391
392    for (manifest, link) in manifests.iter() {
393        let previous = manifest.previous().clone();
394        let height = manifest.height();
395        let is_published = manifest.is_published();
396
397        tracing::info!(
398            "Appending manifest to log: height={}, link={:?}, previous={:?}, published={}",
399            height,
400            link,
401            previous,
402            is_published
403        );
404
405        peer.logs()
406            .append(
407                bucket_id,
408                manifest.name().to_string(),
409                link.clone(),
410                previous,
411                height,
412                is_published,
413            )
414            .await
415            .map_err(|e| anyhow!("Failed to append manifest at height {}: {}", height, e))?;
416
417        // Only download pins/blobs for active buckets
418        if should_sync {
419            let pins_link = manifest.pins().clone();
420            let peer_ids = manifest
421                .shares()
422                .iter()
423                .map(|share| share.1.principal().identity)
424                .collect();
425            peer.dispatch(SyncJob::DownloadPins(DownloadPinsJob {
426                pins_link,
427                peer_ids,
428            }))
429            .await?;
430        } else {
431            tracing::info!(
432                "Skipping pin download for bucket {} (not active)",
433                bucket_id
434            );
435        }
436    }
437
438    tracing::info!("Successfully applied {} manifests to log", manifests.len());
439    Ok(())
440}
441
442/// Verify the author's authorization to create a manifest.
443///
444/// Checks that:
445/// 1. The manifest is properly signed (or unsigned during migration)
446/// 2. The author was in the previous manifest's shares (authorized to make changes)
447/// 3. The author has write permission (Owner role)
448///
449/// This is used for chain validation where we don't yet know if the receiver
450/// is in the final shares.
451///
452/// # Arguments
453///
454/// * `manifest` - The manifest to verify
455/// * `previous` - The previous manifest in the chain (if any). The author must
456///   have been in this manifest's shares to be authorized. Pass `None` for
457///   genesis manifests.
458fn verify_author(
459    manifest: &Manifest,
460    previous: Option<&Manifest>,
461) -> Result<ProvenanceResult, ProvenanceError> {
462    // 1. Check manifest is signed
463    if !manifest.is_signed() {
464        // Allow unsigned for backwards compatibility during migration
465        tracing::warn!("Received unsigned manifest for bucket {}", manifest.id());
466        return Ok(ProvenanceResult::UnsignedLegacy);
467    }
468
469    // 2. Verify signature
470    match manifest.verify_signature() {
471        Ok(true) => {}
472        Ok(false) => return Err(ProvenanceError::InvalidSignature),
473        Err(e) => {
474            tracing::warn!("Signature verification error: {}", e);
475            return Err(ProvenanceError::InvalidSignature);
476        }
477    }
478
479    // 3. Check author was authorized in PREVIOUS manifest (not current!)
480    //    An attacker could add themselves to the current manifest's shares.
481    //    The author must have been in the previous manifest to make this update.
482    let author = manifest.author().expect("is_signed() was true");
483    let author_hex = author.to_hex();
484
485    let check_shares = previous
486        .map(|p| p.shares())
487        .unwrap_or_else(|| manifest.shares());
488
489    let author_share = check_shares
490        .get(&author_hex)
491        .ok_or(ProvenanceError::AuthorNotInShares)?;
492
493    // 4. Check author has write permission (Owner role)
494    if *author_share.role() != PrincipalRole::Owner {
495        return Err(ProvenanceError::AuthorNotWriter);
496    }
497
498    // 5. If shares were removed, verify the author was an owner in the previous manifest
499    if let Some(prev) = previous {
500        let prev_keys: std::collections::HashSet<&String> = prev.shares().keys().collect();
501        let current_keys: std::collections::HashSet<&String> = manifest.shares().keys().collect();
502
503        let removed_keys: Vec<&&String> = prev_keys.difference(&current_keys).collect();
504
505        if !removed_keys.is_empty() {
506            // Author must have been an owner in the previous manifest to remove shares
507            let author_prev_share = prev
508                .shares()
509                .get(&author_hex)
510                .ok_or(ProvenanceError::UnauthorizedShareRemoval)?;
511            if *author_prev_share.role() != PrincipalRole::Owner {
512                return Err(ProvenanceError::UnauthorizedShareRemoval);
513            }
514        }
515    }
516
517    tracing::debug!(
518        "Author verified: bucket={}, author={}",
519        manifest.id(),
520        author_hex
521    );
522
523    Ok(ProvenanceResult::Valid)
524}
525
526/// Verify a manifest's full provenance including receiver authorization.
527///
528/// Checks that:
529/// 1. Our key is in the manifest's shares (we're authorized to receive it)
530/// 2. The manifest is properly signed (or unsigned during migration)
531/// 3. The author was in the previous manifest's shares (authorized to make changes)
532/// 4. The author has write permission (Owner role)
533///
534/// # Arguments
535///
536/// * `peer` - The peer instance (for our public key)
537/// * `manifest` - The manifest to verify
538/// * `previous` - The previous manifest in the chain (if any). The author must
539///   have been in this manifest's shares to be authorized. Pass `None` for
540///   genesis manifests or when using a locally-synced manifest as trusted base.
541fn verify_provenance<L>(
542    peer: &Peer<L>,
543    manifest: &Manifest,
544    previous: Option<&Manifest>,
545) -> Result<ProvenanceResult, ProvenanceError>
546where
547    L: BucketLogProvider + Clone + Send + Sync + 'static,
548    L::Error: std::error::Error + Send + Sync + 'static,
549{
550    let our_pub_key = PublicKey::from(*peer.secret().public());
551    let our_key_hex = our_pub_key.to_hex();
552
553    // 1. Check we're authorized to receive this bucket
554    let we_are_authorized = manifest
555        .shares()
556        .iter()
557        .any(|(key_hex, _)| key_hex == &our_key_hex);
558
559    if !we_are_authorized {
560        return Ok(ProvenanceResult::NotAuthorized);
561    }
562
563    // 2. Verify author (signature + role check)
564    let result = verify_author(manifest, previous)?;
565
566    // Log success with receiver info
567    if manifest.is_signed() {
568        let author = manifest.author().expect("is_signed() was true");
569        tracing::debug!(
570            "Provenance valid: bucket={}, author={}, our_key={}",
571            manifest.id(),
572            author.to_hex(),
573            our_key_hex
574        );
575    }
576
577    Ok(result)
578}
579
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::{SecretKey, SecretShare};
    use crate::mount::Share;

    /// Build an unsigned manifest whose shares contain only `owner`.
    fn create_test_manifest(owner: &SecretKey) -> Manifest {
        let share = SecretShare::default();
        Manifest::new(
            uuid::Uuid::new_v4(),
            "test".to_string(),
            owner.public(),
            share,
            Link::default(),
            Link::default(),
            0,
        )
    }

    #[test]
    fn test_verify_author_valid_owner() {
        let owner = SecretKey::generate();
        let mut manifest = create_test_manifest(&owner);
        manifest.sign(&owner).unwrap();

        let result = verify_author(&manifest, None).unwrap();
        assert_eq!(result, ProvenanceResult::Valid);
    }

    #[test]
    fn test_verify_author_rejects_non_writer() {
        let owner = SecretKey::generate();
        let mirror = SecretKey::generate();

        let mut manifest = create_test_manifest(&owner);
        manifest.add_share(Share::new_mirror(mirror.public()));
        manifest.sign(&mirror).unwrap();

        let result = verify_author(&manifest, None);
        assert!(matches!(result, Err(ProvenanceError::AuthorNotWriter)));
    }

    #[test]
    fn test_verify_author_rejects_unknown_signer() {
        let owner = SecretKey::generate();
        let attacker = SecretKey::generate();

        let mut manifest = create_test_manifest(&owner);
        manifest.sign(&attacker).unwrap();

        let result = verify_author(&manifest, None);
        assert!(matches!(result, Err(ProvenanceError::AuthorNotInShares)));
    }

    #[test]
    fn test_verify_author_accepts_unsigned_legacy() {
        let owner = SecretKey::generate();
        let manifest = create_test_manifest(&owner);
        // Not signed

        let result = verify_author(&manifest, None).unwrap();
        assert_eq!(result, ProvenanceResult::UnsignedLegacy);
    }

    #[test]
    fn test_verify_author_valid_owner_with_previous() {
        let owner = SecretKey::generate();
        let previous = create_test_manifest(&owner);
        let mut manifest = create_test_manifest(&owner);
        manifest.sign(&owner).unwrap();

        let result = verify_author(&manifest, Some(&previous)).unwrap();
        assert_eq!(result, ProvenanceResult::Valid);
    }

    #[test]
    fn test_verify_author_rejects_self_added_author() {
        // The attacker is an Owner in the *new* manifest's shares only. Authority must
        // come from the previous manifest, so this must be rejected.
        let owner = SecretKey::generate();
        let attacker = SecretKey::generate();

        let previous = create_test_manifest(&owner);
        let mut manifest = create_test_manifest(&attacker);
        manifest.sign(&attacker).unwrap();

        // Sanity check: without a predecessor the manifest is self-consistent
        assert_eq!(
            verify_author(&manifest, None).unwrap(),
            ProvenanceResult::Valid
        );

        let result = verify_author(&manifest, Some(&previous));
        assert!(matches!(result, Err(ProvenanceError::AuthorNotInShares)));
    }

    #[test]
    fn test_verify_author_rejects_mirror_in_previous() {
        let owner = SecretKey::generate();
        let mirror = SecretKey::generate();

        let mut previous = create_test_manifest(&owner);
        previous.add_share(Share::new_mirror(mirror.public()));

        let mut manifest = create_test_manifest(&owner);
        manifest.add_share(Share::new_mirror(mirror.public()));
        manifest.sign(&mirror).unwrap();

        let result = verify_author(&manifest, Some(&previous));
        assert!(matches!(result, Err(ProvenanceError::AuthorNotWriter)));
    }
}