1use anyhow::{anyhow, Result};
6use uuid::Uuid;
7
8use crate::bucket_log::BucketLogProvider;
9use crate::crypto::PublicKey;
10use crate::linked_data::Link;
11use crate::mount::Manifest;
12use crate::mount::PrincipalRole;
13use crate::peer::Peer;
14
15use super::{DownloadPinsJob, ProvenanceError, SyncJob};
16
/// Outcome of a provenance check on a manifest.
#[derive(Clone, Debug, Eq, PartialEq)]
enum ProvenanceResult {
    /// The manifest is signed and its author passed every authorization check.
    Valid,
    /// Our own public key does not appear in the manifest's shares.
    NotAuthorized,
    /// The manifest carries no signature; callers accept it with a warning
    /// (migration mode).
    UnsignedLegacy,
}
27
28#[derive(Debug, Clone)]
30pub struct SyncTarget {
31 pub link: Link,
33 pub height: u64,
35 pub peer_ids: Vec<PublicKey>,
38}
39
40#[derive(Debug, Clone)]
42pub struct SyncBucketJob {
43 pub bucket_id: Uuid,
44 pub target: SyncTarget,
45}
46
47pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
53where
54 L: BucketLogProvider + Clone + Send + Sync + 'static,
55 L::Error: std::error::Error + Send + Sync + 'static,
56{
57 let peer_ids_hex: Vec<String> = job.target.peer_ids.iter().map(|p| p.to_hex()).collect();
58 tracing::info!(
59 "Syncing bucket {} from {} peer(s) {:?} to link {:?} at height {}",
60 job.bucket_id,
61 job.target.peer_ids.len(),
62 peer_ids_hex,
63 job.target.link,
64 job.target.height
65 );
66
67 let exists: bool = peer.logs().exists(job.bucket_id).await?;
68
69 if !exists {
71 let shared_by = job.target.peer_ids.first().map(|p| p.to_hex());
72 if let Err(e) = peer
73 .logs()
74 .on_new_bucket_discovered(job.bucket_id, shared_by)
75 .await
76 {
77 tracing::warn!(
78 "Failed to signal new bucket discovery for {}: {}",
79 job.bucket_id,
80 e
81 );
82 }
83 }
84
85 let common_ancestor = if exists {
86 find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_ids).await?
89 } else {
90 None
91 };
92
93 if exists && common_ancestor.is_none() {
100 tracing::warn!(
101 "Bucket {} diverged from peer(s) {:?}",
102 job.bucket_id,
103 peer_ids_hex
104 );
105 return Ok(());
106 }
107
108 let stop_link_owned = common_ancestor.as_ref().map(|ancestor| ancestor.0.clone());
110 let stop_link = stop_link_owned.as_ref();
111
112 if stop_link.is_none() && common_ancestor.is_none() {
113 tracing::info!(
115 "No common ancestor for bucket {}, syncing from genesis",
116 job.bucket_id
117 );
118 }
119
120 let trusted_base: Option<Manifest> = if let Some(link) = stop_link {
126 Some(peer.blobs().get_cbor(&link.hash()).await?)
127 } else {
128 None
129 };
130
131 let manifests = download_manifest_chain(
133 peer,
134 &job.target.link,
135 stop_link,
136 &job.target.peer_ids,
137 trusted_base.as_ref(),
138 )
139 .await?;
140
141 if manifests.is_empty() {
145 tracing::info!("No new manifests to sync, already up to date");
146 return Ok(());
147 };
148
149 let latest_manifest = &manifests.last().unwrap().0;
152 let trusted_base_ref = trusted_base.as_ref();
153 match verify_provenance(peer, latest_manifest, trusted_base_ref)? {
154 ProvenanceResult::Valid => {
155 tracing::debug!("Provenance verification passed");
156 }
157 ProvenanceResult::UnsignedLegacy => {
158 tracing::warn!(
159 "Accepting unsigned manifest for bucket {} (migration mode)",
160 latest_manifest.id()
161 );
162 }
163 ProvenanceResult::NotAuthorized => {
164 tracing::warn!("Provenance verification failed: our key not in bucket shares");
165 return Ok(());
166 }
167 }
168
169 apply_manifest_chain(peer, job.bucket_id, &manifests).await?;
171
172 Ok(())
173}
174
175async fn download_manifest_chain<L>(
187 peer: &Peer<L>,
188 start_link: &Link,
189 stop_link: Option<&Link>,
190 peer_ids: &[PublicKey],
191 trusted_base: Option<&Manifest>,
192) -> Result<Vec<(Manifest, Link)>>
193where
194 L: BucketLogProvider + Clone + Send + Sync + 'static,
195 L::Error: std::error::Error + Send + Sync + 'static,
196{
197 tracing::debug!(
198 "Downloading manifest chain from {:?}, stop_at: {:?}, using {} peer(s)",
199 start_link,
200 stop_link,
201 peer_ids.len()
202 );
203
204 let mut manifests = Vec::new();
205 let mut current_link = start_link.clone();
206
207 loop {
209 peer.blobs()
211 .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
212 .await
213 .map_err(|e| {
214 anyhow!(
215 "Failed to download manifest {:?} from peers: {}",
216 current_link,
217 e
218 )
219 })?;
220
221 let manifest: Manifest = peer.blobs().get_cbor(¤t_link.hash()).await?;
223
224 if let Some(stop_link) = stop_link {
226 if ¤t_link == stop_link {
227 tracing::debug!("Reached stop_at link, stopping download");
228 break;
229 }
230 }
231
232 manifests.push((manifest.clone(), current_link.clone()));
233
234 match manifest.previous() {
236 Some(prev_link) => {
237 current_link = prev_link.clone();
238 }
239 None => {
240 tracing::debug!("Reached genesis manifest, stopping download");
242 break;
243 }
244 }
245 }
246
247 manifests.reverse();
249
250 tracing::debug!("Downloaded {} manifests", manifests.len());
251
252 let mut previous: Option<&Manifest> = trusted_base;
257
258 for (manifest, link) in manifests.iter() {
259 verify_author(manifest, previous).map_err(|e| ProvenanceError::InvalidManifestInChain {
260 link: link.clone(),
261 reason: e.to_string(),
262 })?;
263 previous = Some(manifest);
264 }
265
266 Ok(manifests)
267}
268
269async fn find_common_ancestor<L>(
289 peer: &Peer<L>,
290 bucket_id: Uuid,
291 link: &Link,
292 peer_ids: &[PublicKey],
293) -> Result<Option<(Link, u64)>>
294where
295 L: BucketLogProvider + Clone + Send + Sync + 'static,
296 L::Error: std::error::Error + Send + Sync + 'static,
297{
298 tracing::debug!(
299 "Finding common ancestor starting from {:?} using {} peer(s)",
300 link,
301 peer_ids.len()
302 );
303
304 let mut current_link = link.clone();
305 let mut manifests_checked = 0;
306
307 loop {
308 manifests_checked += 1;
309 tracing::debug!(
310 "Checking manifest {} at link {:?}",
311 manifests_checked,
312 current_link
313 );
314
315 peer.blobs()
319 .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
320 .await
321 .map_err(|e| {
322 anyhow!(
323 "Failed to download manifest {:?} from peers: {}",
324 current_link,
325 e
326 )
327 })?;
328
329 let manifest: Manifest = peer.blobs().get_cbor(¤t_link.hash()).await?;
331 let height = manifest.height();
332
333 match peer.logs().has(bucket_id, current_link.clone()).await {
335 Ok(heights) if !heights.is_empty() => {
336 tracing::info!(
337 "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
338 current_link,
339 height,
340 heights
341 );
342 return Ok(Some((current_link, height)));
343 }
344 Ok(_) => {
345 tracing::debug!("Link {:?} not in our log, checking previous", current_link);
347 }
348 Err(e) => {
349 tracing::warn!("Error checking for link in log: {}", e);
350 }
352 }
353
354 match manifest.previous() {
356 Some(prev_link) => {
357 current_link = prev_link.clone();
358 }
359 None => {
360 tracing::debug!(
362 "Reached genesis after checking {} manifests, no common ancestor found",
363 manifests_checked
364 );
365 return Ok(None);
366 }
367 }
368 }
369}
370
371async fn apply_manifest_chain<L>(
375 peer: &Peer<L>,
376 bucket_id: Uuid,
377 manifests: &[(Manifest, Link)],
378) -> Result<()>
379where
380 L: BucketLogProvider + Clone + Send + Sync + 'static,
381 L::Error: std::error::Error + Send + Sync + 'static,
382{
383 tracing::info!("Applying {} manifests to log", manifests.len());
384
385 let should_sync = peer
387 .logs()
388 .should_sync_content(bucket_id)
389 .await
390 .unwrap_or(true);
391
392 for (manifest, link) in manifests.iter() {
393 let previous = manifest.previous().clone();
394 let height = manifest.height();
395 let is_published = manifest.is_published();
396
397 tracing::info!(
398 "Appending manifest to log: height={}, link={:?}, previous={:?}, published={}",
399 height,
400 link,
401 previous,
402 is_published
403 );
404
405 peer.logs()
406 .append(
407 bucket_id,
408 manifest.name().to_string(),
409 link.clone(),
410 previous,
411 height,
412 is_published,
413 )
414 .await
415 .map_err(|e| anyhow!("Failed to append manifest at height {}: {}", height, e))?;
416
417 if should_sync {
419 let pins_link = manifest.pins().clone();
420 let peer_ids = manifest
421 .shares()
422 .iter()
423 .map(|share| share.1.principal().identity)
424 .collect();
425 peer.dispatch(SyncJob::DownloadPins(DownloadPinsJob {
426 pins_link,
427 peer_ids,
428 }))
429 .await?;
430 } else {
431 tracing::info!(
432 "Skipping pin download for bucket {} (not active)",
433 bucket_id
434 );
435 }
436 }
437
438 tracing::info!("Successfully applied {} manifests to log", manifests.len());
439 Ok(())
440}
441
442fn verify_author(
459 manifest: &Manifest,
460 previous: Option<&Manifest>,
461) -> Result<ProvenanceResult, ProvenanceError> {
462 if !manifest.is_signed() {
464 tracing::warn!("Received unsigned manifest for bucket {}", manifest.id());
466 return Ok(ProvenanceResult::UnsignedLegacy);
467 }
468
469 match manifest.verify_signature() {
471 Ok(true) => {}
472 Ok(false) => return Err(ProvenanceError::InvalidSignature),
473 Err(e) => {
474 tracing::warn!("Signature verification error: {}", e);
475 return Err(ProvenanceError::InvalidSignature);
476 }
477 }
478
479 let author = manifest.author().expect("is_signed() was true");
483 let author_hex = author.to_hex();
484
485 let check_shares = previous
486 .map(|p| p.shares())
487 .unwrap_or_else(|| manifest.shares());
488
489 let author_share = check_shares
490 .get(&author_hex)
491 .ok_or(ProvenanceError::AuthorNotInShares)?;
492
493 if *author_share.role() != PrincipalRole::Owner {
495 return Err(ProvenanceError::AuthorNotWriter);
496 }
497
498 if let Some(prev) = previous {
500 let prev_keys: std::collections::HashSet<&String> = prev.shares().keys().collect();
501 let current_keys: std::collections::HashSet<&String> = manifest.shares().keys().collect();
502
503 let removed_keys: Vec<&&String> = prev_keys.difference(¤t_keys).collect();
504
505 if !removed_keys.is_empty() {
506 let author_prev_share = prev
508 .shares()
509 .get(&author_hex)
510 .ok_or(ProvenanceError::UnauthorizedShareRemoval)?;
511 if *author_prev_share.role() != PrincipalRole::Owner {
512 return Err(ProvenanceError::UnauthorizedShareRemoval);
513 }
514 }
515 }
516
517 tracing::debug!(
518 "Author verified: bucket={}, author={}",
519 manifest.id(),
520 author_hex
521 );
522
523 Ok(ProvenanceResult::Valid)
524}
525
526fn verify_provenance<L>(
542 peer: &Peer<L>,
543 manifest: &Manifest,
544 previous: Option<&Manifest>,
545) -> Result<ProvenanceResult, ProvenanceError>
546where
547 L: BucketLogProvider + Clone + Send + Sync + 'static,
548 L::Error: std::error::Error + Send + Sync + 'static,
549{
550 let our_pub_key = PublicKey::from(*peer.secret().public());
551 let our_key_hex = our_pub_key.to_hex();
552
553 let we_are_authorized = manifest
555 .shares()
556 .iter()
557 .any(|(key_hex, _)| key_hex == &our_key_hex);
558
559 if !we_are_authorized {
560 return Ok(ProvenanceResult::NotAuthorized);
561 }
562
563 let result = verify_author(manifest, previous)?;
565
566 if manifest.is_signed() {
568 let author = manifest.author().expect("is_signed() was true");
569 tracing::debug!(
570 "Provenance valid: bucket={}, author={}, our_key={}",
571 manifest.id(),
572 author.to_hex(),
573 our_key_hex
574 );
575 }
576
577 Ok(result)
578}
579
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::{SecretKey, SecretShare};
    use crate::mount::Share;

    /// Builds an unsigned manifest named "test" with a random bucket id,
    /// constructed from `owner`'s public key, a default secret share and
    /// default links.
    fn manifest_for(owner: &SecretKey) -> Manifest {
        Manifest::new(
            uuid::Uuid::new_v4(),
            String::from("test"),
            owner.public(),
            SecretShare::default(),
            Link::default(),
            Link::default(),
            0,
        )
    }

    /// A manifest signed by its owner verifies as `Valid`.
    #[test]
    fn test_verify_author_valid_owner() {
        let owner_key = SecretKey::generate();
        let mut signed = manifest_for(&owner_key);
        signed.sign(&owner_key).unwrap();

        let outcome = verify_author(&signed, None).unwrap();

        assert_eq!(outcome, ProvenanceResult::Valid);
    }

    /// A mirror share is listed but is not an owner, so its signature is
    /// rejected as `AuthorNotWriter`.
    #[test]
    fn test_verify_author_rejects_non_writer() {
        let owner_key = SecretKey::generate();
        let mirror_key = SecretKey::generate();

        let mut signed = manifest_for(&owner_key);
        signed.add_share(Share::new_mirror(mirror_key.public()));
        signed.sign(&mirror_key).unwrap();

        let outcome = verify_author(&signed, None);

        assert!(matches!(outcome, Err(ProvenanceError::AuthorNotWriter)));
    }

    /// A signer with no share at all is rejected as `AuthorNotInShares`.
    #[test]
    fn test_verify_author_rejects_unknown_signer() {
        let owner_key = SecretKey::generate();
        let attacker_key = SecretKey::generate();

        let mut signed = manifest_for(&owner_key);
        signed.sign(&attacker_key).unwrap();

        let outcome = verify_author(&signed, None);

        assert!(matches!(outcome, Err(ProvenanceError::AuthorNotInShares)));
    }

    /// A manifest that was never signed is accepted as `UnsignedLegacy`.
    #[test]
    fn test_verify_author_accepts_unsigned_legacy() {
        let owner_key = SecretKey::generate();
        let unsigned = manifest_for(&owner_key);

        let outcome = verify_author(&unsigned, None).unwrap();

        assert_eq!(outcome, ProvenanceResult::UnsignedLegacy);
    }
}