1use anyhow::{anyhow, Result};
6use uuid::Uuid;
7
8use crate::bucket_log::BucketLogProvider;
9use crate::crypto::PublicKey;
10use crate::linked_data::Link;
11use crate::mount::Manifest;
12use crate::mount::PrincipalRole;
13use crate::peer::Peer;
14
15use super::{DownloadPinsJob, ProvenanceError, SyncJob};
16
/// Non-error outcomes of a manifest provenance check.
///
/// Hard failures (bad signature, unauthorized author, ...) are reported
/// through `ProvenanceError` instead of a variant of this enum.
#[derive(Clone, Debug, Eq, PartialEq)]
enum ProvenanceResult {
    /// The manifest is signed, the signature verified, and the author holds
    /// the `Owner` role in the share set it was checked against.
    Valid,
    /// This peer's own public key is not listed in the manifest's shares.
    NotAuthorized,
    /// The manifest carries no signature at all; callers accept it with a
    /// warning instead of rejecting it.
    UnsignedLegacy,
}
27
28#[derive(Debug, Clone)]
30pub struct SyncTarget {
31 pub link: Link,
33 pub height: u64,
35 pub peer_ids: Vec<PublicKey>,
38}
39
40#[derive(Debug, Clone)]
42pub struct SyncBucketJob {
43 pub bucket_id: Uuid,
44 pub target: SyncTarget,
45}
46
47pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
53where
54 L: BucketLogProvider + Clone + Send + Sync + 'static,
55 L::Error: std::error::Error + Send + Sync + 'static,
56{
57 let peer_ids_hex: Vec<String> = job.target.peer_ids.iter().map(|p| p.to_hex()).collect();
58 tracing::info!(
59 "Syncing bucket {} from {} peer(s) {:?} to link {:?} at height {}",
60 job.bucket_id,
61 job.target.peer_ids.len(),
62 peer_ids_hex,
63 job.target.link,
64 job.target.height
65 );
66
67 let exists: bool = peer.logs().exists(job.bucket_id).await?;
68
69 let common_ancestor = if exists {
70 find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_ids).await?
73 } else {
74 None
75 };
76
77 if exists && common_ancestor.is_none() {
84 tracing::warn!(
85 "Bucket {} diverged from peer(s) {:?}",
86 job.bucket_id,
87 peer_ids_hex
88 );
89 return Ok(());
90 }
91
92 let stop_link_owned = common_ancestor.as_ref().map(|ancestor| ancestor.0.clone());
94 let stop_link = stop_link_owned.as_ref();
95
96 if stop_link.is_none() && common_ancestor.is_none() {
97 tracing::info!(
99 "No common ancestor for bucket {}, syncing from genesis",
100 job.bucket_id
101 );
102 }
103
104 let trusted_base: Option<Manifest> = if let Some(link) = stop_link {
110 Some(peer.blobs().get_cbor(&link.hash()).await?)
111 } else {
112 None
113 };
114
115 let manifests = download_manifest_chain(
117 peer,
118 &job.target.link,
119 stop_link,
120 &job.target.peer_ids,
121 trusted_base.as_ref(),
122 )
123 .await?;
124
125 if manifests.is_empty() {
129 tracing::info!("No new manifests to sync, already up to date");
130 return Ok(());
131 };
132
133 let latest_manifest = &manifests.last().unwrap().0;
136 let trusted_base_ref = trusted_base.as_ref();
137 match verify_provenance(peer, latest_manifest, trusted_base_ref)? {
138 ProvenanceResult::Valid => {
139 tracing::debug!("Provenance verification passed");
140 }
141 ProvenanceResult::UnsignedLegacy => {
142 tracing::warn!(
143 "Accepting unsigned manifest for bucket {} (migration mode)",
144 latest_manifest.id()
145 );
146 }
147 ProvenanceResult::NotAuthorized => {
148 tracing::warn!("Provenance verification failed: our key not in bucket shares");
149 return Ok(());
150 }
151 }
152
153 apply_manifest_chain(peer, job.bucket_id, &manifests).await?;
155
156 Ok(())
157}
158
159async fn download_manifest_chain<L>(
171 peer: &Peer<L>,
172 start_link: &Link,
173 stop_link: Option<&Link>,
174 peer_ids: &[PublicKey],
175 trusted_base: Option<&Manifest>,
176) -> Result<Vec<(Manifest, Link)>>
177where
178 L: BucketLogProvider + Clone + Send + Sync + 'static,
179 L::Error: std::error::Error + Send + Sync + 'static,
180{
181 tracing::debug!(
182 "Downloading manifest chain from {:?}, stop_at: {:?}, using {} peer(s)",
183 start_link,
184 stop_link,
185 peer_ids.len()
186 );
187
188 let mut manifests = Vec::new();
189 let mut current_link = start_link.clone();
190
191 loop {
193 peer.blobs()
195 .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
196 .await
197 .map_err(|e| {
198 anyhow!(
199 "Failed to download manifest {:?} from peers: {}",
200 current_link,
201 e
202 )
203 })?;
204
205 let manifest: Manifest = peer.blobs().get_cbor(¤t_link.hash()).await?;
207
208 if let Some(stop_link) = stop_link {
210 if ¤t_link == stop_link {
211 tracing::debug!("Reached stop_at link, stopping download");
212 break;
213 }
214 }
215
216 manifests.push((manifest.clone(), current_link.clone()));
217
218 match manifest.previous() {
220 Some(prev_link) => {
221 current_link = prev_link.clone();
222 }
223 None => {
224 tracing::debug!("Reached genesis manifest, stopping download");
226 break;
227 }
228 }
229 }
230
231 manifests.reverse();
233
234 tracing::debug!("Downloaded {} manifests", manifests.len());
235
236 let mut previous: Option<&Manifest> = trusted_base;
241
242 for (manifest, link) in manifests.iter() {
243 verify_author(manifest, previous).map_err(|e| ProvenanceError::InvalidManifestInChain {
244 link: link.clone(),
245 reason: e.to_string(),
246 })?;
247 previous = Some(manifest);
248 }
249
250 Ok(manifests)
251}
252
253async fn find_common_ancestor<L>(
273 peer: &Peer<L>,
274 bucket_id: Uuid,
275 link: &Link,
276 peer_ids: &[PublicKey],
277) -> Result<Option<(Link, u64)>>
278where
279 L: BucketLogProvider + Clone + Send + Sync + 'static,
280 L::Error: std::error::Error + Send + Sync + 'static,
281{
282 tracing::debug!(
283 "Finding common ancestor starting from {:?} using {} peer(s)",
284 link,
285 peer_ids.len()
286 );
287
288 let mut current_link = link.clone();
289 let mut manifests_checked = 0;
290
291 loop {
292 manifests_checked += 1;
293 tracing::debug!(
294 "Checking manifest {} at link {:?}",
295 manifests_checked,
296 current_link
297 );
298
299 peer.blobs()
303 .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
304 .await
305 .map_err(|e| {
306 anyhow!(
307 "Failed to download manifest {:?} from peers: {}",
308 current_link,
309 e
310 )
311 })?;
312
313 let manifest: Manifest = peer.blobs().get_cbor(¤t_link.hash()).await?;
315 let height = manifest.height();
316
317 match peer.logs().has(bucket_id, current_link.clone()).await {
319 Ok(heights) if !heights.is_empty() => {
320 tracing::info!(
321 "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
322 current_link,
323 height,
324 heights
325 );
326 return Ok(Some((current_link, height)));
327 }
328 Ok(_) => {
329 tracing::debug!("Link {:?} not in our log, checking previous", current_link);
331 }
332 Err(e) => {
333 tracing::warn!("Error checking for link in log: {}", e);
334 }
336 }
337
338 match manifest.previous() {
340 Some(prev_link) => {
341 current_link = prev_link.clone();
342 }
343 None => {
344 tracing::debug!(
346 "Reached genesis after checking {} manifests, no common ancestor found",
347 manifests_checked
348 );
349 return Ok(None);
350 }
351 }
352 }
353}
354
355async fn apply_manifest_chain<L>(
359 peer: &Peer<L>,
360 bucket_id: Uuid,
361 manifests: &[(Manifest, Link)],
362) -> Result<()>
363where
364 L: BucketLogProvider + Clone + Send + Sync + 'static,
365 L::Error: std::error::Error + Send + Sync + 'static,
366{
367 tracing::info!("Applying {} manifests to log", manifests.len(),);
368
369 if let Some((_i, (manifest, link))) = manifests.iter().enumerate().next() {
370 let previous = manifest.previous().clone();
371 let height = manifest.height();
372 let is_published = manifest.is_published();
373
374 tracing::info!(
375 "Appending manifest to log: height={}, link={:?}, previous={:?}, published={}",
376 height,
377 link,
378 previous,
379 is_published
380 );
381
382 peer.logs()
383 .append(
384 bucket_id,
385 manifest.name().to_string(),
386 link.clone(),
387 previous,
388 height,
389 is_published,
390 )
391 .await
392 .map_err(|e| anyhow!("Failed to append manifest at height {}: {}", height, e))?;
393
394 let pins_link = manifest.pins().clone();
395 let peer_ids = manifest
396 .shares()
397 .iter()
398 .map(|share| share.1.principal().identity)
399 .collect();
400 return peer
401 .dispatch(SyncJob::DownloadPins(DownloadPinsJob {
402 pins_link,
403 peer_ids,
404 }))
405 .await;
406 }
407
408 tracing::info!("Successfully applied {} manifests to log", manifests.len());
409 Ok(())
410}
411
412fn verify_author(
429 manifest: &Manifest,
430 previous: Option<&Manifest>,
431) -> Result<ProvenanceResult, ProvenanceError> {
432 if !manifest.is_signed() {
434 tracing::warn!("Received unsigned manifest for bucket {}", manifest.id());
436 return Ok(ProvenanceResult::UnsignedLegacy);
437 }
438
439 match manifest.verify_signature() {
441 Ok(true) => {}
442 Ok(false) => return Err(ProvenanceError::InvalidSignature),
443 Err(e) => {
444 tracing::warn!("Signature verification error: {}", e);
445 return Err(ProvenanceError::InvalidSignature);
446 }
447 }
448
449 let author = manifest.author().expect("is_signed() was true");
453 let author_hex = author.to_hex();
454
455 let check_shares = previous
456 .map(|p| p.shares())
457 .unwrap_or_else(|| manifest.shares());
458
459 let author_share = check_shares
460 .get(&author_hex)
461 .ok_or(ProvenanceError::AuthorNotInShares)?;
462
463 if *author_share.role() != PrincipalRole::Owner {
465 return Err(ProvenanceError::AuthorNotWriter);
466 }
467
468 if let Some(prev) = previous {
470 let prev_keys: std::collections::HashSet<&String> = prev.shares().keys().collect();
471 let current_keys: std::collections::HashSet<&String> = manifest.shares().keys().collect();
472
473 let removed_keys: Vec<&&String> = prev_keys.difference(¤t_keys).collect();
474
475 if !removed_keys.is_empty() {
476 let author_prev_share = prev
478 .shares()
479 .get(&author_hex)
480 .ok_or(ProvenanceError::UnauthorizedShareRemoval)?;
481 if *author_prev_share.role() != PrincipalRole::Owner {
482 return Err(ProvenanceError::UnauthorizedShareRemoval);
483 }
484 }
485 }
486
487 tracing::debug!(
488 "Author verified: bucket={}, author={}",
489 manifest.id(),
490 author_hex
491 );
492
493 Ok(ProvenanceResult::Valid)
494}
495
496fn verify_provenance<L>(
512 peer: &Peer<L>,
513 manifest: &Manifest,
514 previous: Option<&Manifest>,
515) -> Result<ProvenanceResult, ProvenanceError>
516where
517 L: BucketLogProvider + Clone + Send + Sync + 'static,
518 L::Error: std::error::Error + Send + Sync + 'static,
519{
520 let our_pub_key = PublicKey::from(*peer.secret().public());
521 let our_key_hex = our_pub_key.to_hex();
522
523 let we_are_authorized = manifest
525 .shares()
526 .iter()
527 .any(|(key_hex, _)| key_hex == &our_key_hex);
528
529 if !we_are_authorized {
530 return Ok(ProvenanceResult::NotAuthorized);
531 }
532
533 let result = verify_author(manifest, previous)?;
535
536 if manifest.is_signed() {
538 let author = manifest.author().expect("is_signed() was true");
539 tracing::debug!(
540 "Provenance valid: bucket={}, author={}, our_key={}",
541 manifest.id(),
542 author.to_hex(),
543 our_key_hex
544 );
545 }
546
547 Ok(result)
548}
549
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::{SecretKey, SecretShare};
    use crate::mount::Share;

    /// Builds an unsigned height-0 manifest named "test" for a fresh bucket id,
    /// created for `owner`'s public key with default share secret and links.
    fn create_test_manifest(owner: &SecretKey) -> Manifest {
        Manifest::new(
            uuid::Uuid::new_v4(),
            "test".to_string(),
            owner.public(),
            SecretShare::default(),
            Link::default(),
            Link::default(),
            0,
        )
    }

    /// A manifest signed by the key it was created for is valid.
    #[test]
    fn test_verify_author_valid_owner() {
        let owner_key = SecretKey::generate();
        let mut signed = create_test_manifest(&owner_key);
        signed.sign(&owner_key).unwrap();

        let outcome = verify_author(&signed, None).unwrap();
        assert_eq!(outcome, ProvenanceResult::Valid);
    }

    /// A mirror share is present in the manifest but may not author it.
    #[test]
    fn test_verify_author_rejects_non_writer() {
        let owner_key = SecretKey::generate();
        let mirror_key = SecretKey::generate();

        let mut signed = create_test_manifest(&owner_key);
        signed.add_share(Share::new_mirror(mirror_key.public()));
        signed.sign(&mirror_key).unwrap();

        let outcome = verify_author(&signed, None);
        assert!(matches!(outcome, Err(ProvenanceError::AuthorNotWriter)));
    }

    /// A signer with no share in the manifest is rejected.
    #[test]
    fn test_verify_author_rejects_unknown_signer() {
        let owner_key = SecretKey::generate();
        let attacker_key = SecretKey::generate();

        let mut signed = create_test_manifest(&owner_key);
        signed.sign(&attacker_key).unwrap();

        let outcome = verify_author(&signed, None);
        assert!(matches!(outcome, Err(ProvenanceError::AuthorNotInShares)));
    }

    /// A manifest that was never signed is accepted as legacy.
    #[test]
    fn test_verify_author_accepts_unsigned_legacy() {
        let owner_key = SecretKey::generate();
        let unsigned = create_test_manifest(&owner_key);

        let outcome = verify_author(&unsigned, None).unwrap();
        assert_eq!(outcome, ProvenanceResult::UnsignedLegacy);
    }
}
613}