1use anyhow::{anyhow, Result};
6use uuid::Uuid;
7
8use crate::bucket_log::BucketLogProvider;
9use crate::crypto::PublicKey;
10use crate::linked_data::Link;
11use crate::mount::Manifest;
12use crate::mount::PrincipalRole;
13use crate::peer::Peer;
14
15use super::{DownloadPinsJob, ProvenanceError, SyncJob};
16
/// Non-error outcome of a provenance check on a manifest.
///
/// Hard failures (bad signature, author missing from the shares, author not
/// an owner) are reported as `ProvenanceError` values rather than as variants
/// of this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProvenanceResult {
    /// The manifest is signed, its signature verified, and its author holds
    /// the `Owner` role in the share set it was checked against.
    Valid,
    /// Our own public key is not present in the manifest's shares.
    NotAuthorized,
    /// The manifest carries no signature at all; callers currently accept
    /// this and log it as migration mode.
    UnsignedLegacy,
}
27
28#[derive(Debug, Clone)]
30pub struct SyncTarget {
31 pub link: Link,
33 pub height: u64,
35 pub peer_ids: Vec<PublicKey>,
38}
39
40#[derive(Debug, Clone)]
42pub struct SyncBucketJob {
43 pub bucket_id: Uuid,
44 pub target: SyncTarget,
45}
46
47pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
53where
54 L: BucketLogProvider + Clone + Send + Sync + 'static,
55 L::Error: std::error::Error + Send + Sync + 'static,
56{
57 let peer_ids_hex: Vec<String> = job.target.peer_ids.iter().map(|p| p.to_hex()).collect();
58 tracing::info!(
59 "Syncing bucket {} from {} peer(s) {:?} to link {:?} at height {}",
60 job.bucket_id,
61 job.target.peer_ids.len(),
62 peer_ids_hex,
63 job.target.link,
64 job.target.height
65 );
66
67 let exists: bool = peer.logs().exists(job.bucket_id).await?;
68
69 let common_ancestor = if exists {
70 find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_ids).await?
73 } else {
74 None
75 };
76
77 if exists && common_ancestor.is_none() {
84 tracing::warn!(
85 "Bucket {} diverged from peer(s) {:?}",
86 job.bucket_id,
87 peer_ids_hex
88 );
89 return Ok(());
90 }
91
92 let stop_link_owned = common_ancestor.as_ref().map(|ancestor| ancestor.0.clone());
94 let stop_link = stop_link_owned.as_ref();
95
96 if stop_link.is_none() && common_ancestor.is_none() {
97 tracing::info!(
99 "No common ancestor for bucket {}, syncing from genesis",
100 job.bucket_id
101 );
102 }
103
104 let trusted_base: Option<Manifest> = if let Some(link) = stop_link {
110 Some(peer.blobs().get_cbor(&link.hash()).await?)
111 } else {
112 None
113 };
114
115 let manifests = download_manifest_chain(
117 peer,
118 &job.target.link,
119 stop_link,
120 &job.target.peer_ids,
121 trusted_base.as_ref(),
122 )
123 .await?;
124
125 if manifests.is_empty() {
129 tracing::info!("No new manifests to sync, already up to date");
130 return Ok(());
131 };
132
133 let latest_manifest = &manifests.last().unwrap().0;
136 let trusted_base_ref = trusted_base.as_ref();
137 match verify_provenance(peer, latest_manifest, trusted_base_ref)? {
138 ProvenanceResult::Valid => {
139 tracing::debug!("Provenance verification passed");
140 }
141 ProvenanceResult::UnsignedLegacy => {
142 tracing::warn!(
143 "Accepting unsigned manifest for bucket {} (migration mode)",
144 latest_manifest.id()
145 );
146 }
147 ProvenanceResult::NotAuthorized => {
148 tracing::warn!("Provenance verification failed: our key not in bucket shares");
149 return Ok(());
150 }
151 }
152
153 apply_manifest_chain(peer, job.bucket_id, &manifests).await?;
155
156 Ok(())
157}
158
159async fn download_manifest_chain<L>(
171 peer: &Peer<L>,
172 start_link: &Link,
173 stop_link: Option<&Link>,
174 peer_ids: &[PublicKey],
175 trusted_base: Option<&Manifest>,
176) -> Result<Vec<(Manifest, Link)>>
177where
178 L: BucketLogProvider + Clone + Send + Sync + 'static,
179 L::Error: std::error::Error + Send + Sync + 'static,
180{
181 tracing::debug!(
182 "Downloading manifest chain from {:?}, stop_at: {:?}, using {} peer(s)",
183 start_link,
184 stop_link,
185 peer_ids.len()
186 );
187
188 let mut manifests = Vec::new();
189 let mut current_link = start_link.clone();
190
191 loop {
193 peer.blobs()
195 .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
196 .await
197 .map_err(|e| {
198 anyhow!(
199 "Failed to download manifest {:?} from peers: {}",
200 current_link,
201 e
202 )
203 })?;
204
205 let manifest: Manifest = peer.blobs().get_cbor(¤t_link.hash()).await?;
207
208 if let Some(stop_link) = stop_link {
210 if ¤t_link == stop_link {
211 tracing::debug!("Reached stop_at link, stopping download");
212 break;
213 }
214 }
215
216 manifests.push((manifest.clone(), current_link.clone()));
217
218 match manifest.previous() {
220 Some(prev_link) => {
221 current_link = prev_link.clone();
222 }
223 None => {
224 tracing::debug!("Reached genesis manifest, stopping download");
226 break;
227 }
228 }
229 }
230
231 manifests.reverse();
233
234 tracing::debug!("Downloaded {} manifests", manifests.len());
235
236 let mut previous: Option<&Manifest> = trusted_base;
241
242 for (manifest, link) in manifests.iter() {
243 verify_author(manifest, previous).map_err(|e| ProvenanceError::InvalidManifestInChain {
244 link: link.clone(),
245 reason: e.to_string(),
246 })?;
247 previous = Some(manifest);
248 }
249
250 Ok(manifests)
251}
252
253async fn find_common_ancestor<L>(
273 peer: &Peer<L>,
274 bucket_id: Uuid,
275 link: &Link,
276 peer_ids: &[PublicKey],
277) -> Result<Option<(Link, u64)>>
278where
279 L: BucketLogProvider + Clone + Send + Sync + 'static,
280 L::Error: std::error::Error + Send + Sync + 'static,
281{
282 tracing::debug!(
283 "Finding common ancestor starting from {:?} using {} peer(s)",
284 link,
285 peer_ids.len()
286 );
287
288 let mut current_link = link.clone();
289 let mut manifests_checked = 0;
290
291 loop {
292 manifests_checked += 1;
293 tracing::debug!(
294 "Checking manifest {} at link {:?}",
295 manifests_checked,
296 current_link
297 );
298
299 peer.blobs()
303 .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
304 .await
305 .map_err(|e| {
306 anyhow!(
307 "Failed to download manifest {:?} from peers: {}",
308 current_link,
309 e
310 )
311 })?;
312
313 let manifest: Manifest = peer.blobs().get_cbor(¤t_link.hash()).await?;
315 let height = manifest.height();
316
317 match peer.logs().has(bucket_id, current_link.clone()).await {
319 Ok(heights) if !heights.is_empty() => {
320 tracing::info!(
321 "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
322 current_link,
323 height,
324 heights
325 );
326 return Ok(Some((current_link, height)));
327 }
328 Ok(_) => {
329 tracing::debug!("Link {:?} not in our log, checking previous", current_link);
331 }
332 Err(e) => {
333 tracing::warn!("Error checking for link in log: {}", e);
334 }
336 }
337
338 match manifest.previous() {
340 Some(prev_link) => {
341 current_link = prev_link.clone();
342 }
343 None => {
344 tracing::debug!(
346 "Reached genesis after checking {} manifests, no common ancestor found",
347 manifests_checked
348 );
349 return Ok(None);
350 }
351 }
352 }
353}
354
355async fn apply_manifest_chain<L>(
359 peer: &Peer<L>,
360 bucket_id: Uuid,
361 manifests: &[(Manifest, Link)],
362) -> Result<()>
363where
364 L: BucketLogProvider + Clone + Send + Sync + 'static,
365 L::Error: std::error::Error + Send + Sync + 'static,
366{
367 tracing::info!("Applying {} manifests to log", manifests.len(),);
368
369 if let Some((_i, (manifest, link))) = manifests.iter().enumerate().next() {
370 let previous = manifest.previous().clone();
371 let height = manifest.height();
372 let is_published = manifest.is_published();
373
374 tracing::info!(
375 "Appending manifest to log: height={}, link={:?}, previous={:?}, published={}",
376 height,
377 link,
378 previous,
379 is_published
380 );
381
382 peer.logs()
383 .append(
384 bucket_id,
385 manifest.name().to_string(),
386 link.clone(),
387 previous,
388 height,
389 is_published,
390 )
391 .await
392 .map_err(|e| anyhow!("Failed to append manifest at height {}: {}", height, e))?;
393
394 let pins_link = manifest.pins().clone();
395 let peer_ids = manifest
396 .shares()
397 .iter()
398 .map(|share| share.1.principal().identity)
399 .collect();
400 return peer
401 .dispatch(SyncJob::DownloadPins(DownloadPinsJob {
402 pins_link,
403 peer_ids,
404 }))
405 .await;
406 }
407
408 tracing::info!("Successfully applied {} manifests to log", manifests.len());
409 Ok(())
410}
411
412fn verify_author(
429 manifest: &Manifest,
430 previous: Option<&Manifest>,
431) -> Result<ProvenanceResult, ProvenanceError> {
432 if !manifest.is_signed() {
434 tracing::warn!("Received unsigned manifest for bucket {}", manifest.id());
436 return Ok(ProvenanceResult::UnsignedLegacy);
437 }
438
439 match manifest.verify_signature() {
441 Ok(true) => {}
442 Ok(false) => return Err(ProvenanceError::InvalidSignature),
443 Err(e) => {
444 tracing::warn!("Signature verification error: {}", e);
445 return Err(ProvenanceError::InvalidSignature);
446 }
447 }
448
449 let author = manifest.author().expect("is_signed() was true");
453 let author_hex = author.to_hex();
454
455 let check_shares = previous
456 .map(|p| p.shares())
457 .unwrap_or_else(|| manifest.shares());
458
459 let author_share = check_shares
460 .get(&author_hex)
461 .ok_or(ProvenanceError::AuthorNotInShares)?;
462
463 if *author_share.role() != PrincipalRole::Owner {
465 return Err(ProvenanceError::AuthorNotWriter);
466 }
467
468 tracing::debug!(
469 "Author verified: bucket={}, author={}",
470 manifest.id(),
471 author_hex
472 );
473
474 Ok(ProvenanceResult::Valid)
475}
476
477fn verify_provenance<L>(
493 peer: &Peer<L>,
494 manifest: &Manifest,
495 previous: Option<&Manifest>,
496) -> Result<ProvenanceResult, ProvenanceError>
497where
498 L: BucketLogProvider + Clone + Send + Sync + 'static,
499 L::Error: std::error::Error + Send + Sync + 'static,
500{
501 let our_pub_key = PublicKey::from(*peer.secret().public());
502 let our_key_hex = our_pub_key.to_hex();
503
504 let we_are_authorized = manifest
506 .shares()
507 .iter()
508 .any(|(key_hex, _)| key_hex == &our_key_hex);
509
510 if !we_are_authorized {
511 return Ok(ProvenanceResult::NotAuthorized);
512 }
513
514 let result = verify_author(manifest, previous)?;
516
517 if manifest.is_signed() {
519 let author = manifest.author().expect("is_signed() was true");
520 tracing::debug!(
521 "Provenance valid: bucket={}, author={}, our_key={}",
522 manifest.id(),
523 author.to_hex(),
524 our_key_hex
525 );
526 }
527
528 Ok(result)
529}
530
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::{SecretKey, SecretShare};
    use crate::mount::Share;

    /// Build an unsigned manifest named "test" with a fresh bucket id, a
    /// default secret share, default links and a final argument of 0, created
    /// for the public key of `owner`.
    fn create_test_manifest(owner: &SecretKey) -> Manifest {
        let share = SecretShare::default();
        Manifest::new(
            uuid::Uuid::new_v4(),
            "test".to_string(),
            owner.public(),
            share,
            Link::default(),
            Link::default(),
            0,
        )
    }

    /// A manifest signed by the key it was created for verifies as `Valid`.
    #[test]
    fn test_verify_author_valid_owner() {
        let owner_key = SecretKey::generate();
        let mut signed = create_test_manifest(&owner_key);
        signed.sign(&owner_key).unwrap();

        let outcome = verify_author(&signed, None).unwrap();
        assert_eq!(outcome, ProvenanceResult::Valid);
    }

    /// A signer that only holds a mirror share is rejected as a non-writer.
    #[test]
    fn test_verify_author_rejects_non_writer() {
        let owner_key = SecretKey::generate();
        let mirror_key = SecretKey::generate();

        let mut signed = create_test_manifest(&owner_key);
        signed.add_share(Share::new_mirror(mirror_key.public()));
        signed.sign(&mirror_key).unwrap();

        let outcome = verify_author(&signed, None);
        assert!(matches!(outcome, Err(ProvenanceError::AuthorNotWriter)));
    }

    /// A signer with no share entry at all is rejected.
    #[test]
    fn test_verify_author_rejects_unknown_signer() {
        let owner_key = SecretKey::generate();
        let attacker_key = SecretKey::generate();

        let mut signed = create_test_manifest(&owner_key);
        signed.sign(&attacker_key).unwrap();

        let outcome = verify_author(&signed, None);
        assert!(matches!(outcome, Err(ProvenanceError::AuthorNotInShares)));
    }

    /// A manifest that was never signed is reported as `UnsignedLegacy`.
    #[test]
    fn test_verify_author_accepts_unsigned_legacy() {
        let owner_key = SecretKey::generate();
        let unsigned = create_test_manifest(&owner_key);

        let outcome = verify_author(&unsigned, None).unwrap();
        assert_eq!(outcome, ProvenanceResult::UnsignedLegacy);
    }
}