use anyhow::{anyhow, Result};
use uuid::Uuid;
use crate::bucket_log::BucketLogProvider;
use crate::crypto::PublicKey;
use crate::linked_data::Link;
use crate::mount::Manifest;
use crate::mount::PrincipalRole;
use crate::peer::Peer;
use super::{DownloadPinsJob, ProvenanceError, SyncJob};
/// Outcome of a provenance check performed on a manifest in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ProvenanceResult {
/// The manifest is signed and its author passed every check in `verify_author`.
Valid,
/// Our own public key is not listed in the manifest's shares
/// (returned by `verify_provenance`).
NotAuthorized,
/// The manifest carries no signature; `verify_author` reports this instead of
/// failing, and `execute` accepts it with a warning.
UnsignedLegacy,
}
/// Describes the remote state a bucket should be synchronized to.
#[derive(Debug, Clone)]
pub struct SyncTarget {
/// Link of the manifest to sync to; the chain walk starts here.
pub link: Link,
/// Height reported for the target link (only logged by `execute`).
pub height: u64,
/// Peers that manifests are downloaded from; the first entry is also reported
/// as the sharer when a previously unknown bucket is discovered.
pub peer_ids: Vec<PublicKey>,
}
/// Job payload consumed by `execute`: which bucket to sync and to what target.
#[derive(Debug, Clone)]
pub struct SyncBucketJob {
/// Identifier of the bucket whose log is being synchronized.
pub bucket_id: Uuid,
/// Remote link, height and peers to synchronize against.
pub target: SyncTarget,
}
/// Synchronizes the local log of `job.bucket_id` with the manifest chain that
/// ends at `job.target.link`, downloading manifests from `job.target.peer_ids`.
///
/// The steps are:
/// 1. If the bucket is unknown locally, signal its discovery (best effort) and
///    sync the whole chain from genesis.
/// 2. Otherwise walk the remote chain to find a link already present in our
///    log. If none is found the bucket has diverged and nothing is applied.
/// 3. Download and author-verify every manifest after the common ancestor.
/// 4. Verify provenance of the newest manifest (our key must be in its shares).
/// 5. Append the new manifests to the log.
///
/// # Errors
///
/// Returns an error if the log or blob store fails, a manifest cannot be
/// downloaded or decoded, a manifest in the chain fails author verification,
/// or appending to the log fails. Divergence, an empty chain, and "our key is
/// not in the shares" are logged and reported as `Ok(())`.
pub async fn execute<L>(peer: &Peer<L>, job: SyncBucketJob) -> Result<()>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    let peer_ids_hex: Vec<String> = job.target.peer_ids.iter().map(|p| p.to_hex()).collect();
    tracing::info!(
        "Syncing bucket {} from {} peer(s) {:?} to link {:?} at height {}",
        job.bucket_id,
        job.target.peer_ids.len(),
        peer_ids_hex,
        job.target.link,
        job.target.height
    );

    let exists: bool = peer.logs().exists(job.bucket_id).await?;
    if !exists {
        // Discovery notification is best effort: a failure here must not abort the sync.
        let shared_by = job.target.peer_ids.first().map(|p| p.to_hex());
        if let Err(e) = peer
            .logs()
            .on_new_bucket_discovered(job.bucket_id, shared_by)
            .await
        {
            tracing::warn!(
                "Failed to signal new bucket discovery for {}: {}",
                job.bucket_id,
                e
            );
        }
    }

    let common_ancestor = if exists {
        find_common_ancestor(peer, job.bucket_id, &job.target.link, &job.target.peer_ids).await?
    } else {
        None
    };
    if exists && common_ancestor.is_none() {
        tracing::warn!(
            "Bucket {} diverged from peer(s) {:?}",
            job.bucket_id,
            peer_ids_hex
        );
        return Ok(());
    }

    let stop_link_owned = common_ancestor.as_ref().map(|ancestor| ancestor.0.clone());
    let stop_link = stop_link_owned.as_ref();
    // Only reachable without a stop link when the bucket did not exist locally.
    if stop_link.is_none() {
        tracing::info!(
            "No common ancestor for bucket {}, syncing from genesis",
            job.bucket_id
        );
    }

    // The manifest at the common ancestor is already in our log, so it is the
    // trusted starting point for verifying the first downloaded manifest.
    let trusted_base: Option<Manifest> = if let Some(link) = stop_link {
        Some(peer.blobs().get_cbor(&link.hash()).await?)
    } else {
        None
    };

    let manifests = download_manifest_chain(
        peer,
        &job.target.link,
        stop_link,
        &job.target.peer_ids,
        trusted_base.as_ref(),
    )
    .await?;

    let latest_manifest = match manifests.last() {
        Some((manifest, _)) => manifest,
        None => {
            tracing::info!("No new manifests to sync, already up to date");
            return Ok(());
        }
    };

    // The newest manifest must be checked against its *immediate* predecessor,
    // exactly as the chain walk does. Only when it is the sole new manifest is
    // that predecessor the trusted base (or nothing, for a genesis manifest).
    let latest_previous = manifests
        .iter()
        .rev()
        .nth(1)
        .map(|(manifest, _)| manifest)
        .or(trusted_base.as_ref());

    match verify_provenance(peer, latest_manifest, latest_previous)? {
        ProvenanceResult::Valid => {
            tracing::debug!("Provenance verification passed");
        }
        ProvenanceResult::UnsignedLegacy => {
            tracing::warn!(
                "Accepting unsigned manifest for bucket {} (migration mode)",
                latest_manifest.id()
            );
        }
        ProvenanceResult::NotAuthorized => {
            tracing::warn!("Provenance verification failed: our key not in bucket shares");
            return Ok(());
        }
    }

    apply_manifest_chain(peer, job.bucket_id, &manifests).await?;
    Ok(())
}
/// Downloads the manifest chain ending at `start_link`, walking `previous`
/// links backwards until `stop_link` (exclusive) or a manifest without a
/// predecessor is reached, then verifies the author of every manifest.
///
/// * `start_link` - newest manifest to fetch.
/// * `stop_link` - link at which the walk stops; its manifest is not included.
/// * `peer_ids` - peers the manifest blobs are downloaded from.
/// * `trusted_base` - manifest used as the predecessor when verifying the
///   oldest downloaded manifest (`None` means no predecessor is supplied).
///
/// Returns the manifests paired with their links, ordered oldest first.
///
/// # Errors
///
/// Fails if a manifest cannot be downloaded or decoded, or with
/// `ProvenanceError::InvalidManifestInChain` if any manifest fails
/// `verify_author` against its predecessor.
async fn download_manifest_chain<L>(
    peer: &Peer<L>,
    start_link: &Link,
    stop_link: Option<&Link>,
    peer_ids: &[PublicKey],
    trusted_base: Option<&Manifest>,
) -> Result<Vec<(Manifest, Link)>>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    tracing::debug!(
        "Downloading manifest chain from {:?}, stop_at: {:?}, using {} peer(s)",
        start_link,
        stop_link,
        peer_ids.len()
    );

    let mut manifests = Vec::new();
    let mut current_link = start_link.clone();
    loop {
        // Check the stop condition first: the manifest at the stop link is
        // never part of the result, so fetching and decoding it is wasted work.
        if stop_link == Some(&current_link) {
            tracing::debug!("Reached stop_at link, stopping download");
            break;
        }

        peer.blobs()
            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
            .await
            .map_err(|e| {
                anyhow!(
                    "Failed to download manifest {:?} from peers: {}",
                    current_link,
                    e
                )
            })?;
        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;

        // Capture the predecessor before the manifest is moved into the result.
        let next_link: Option<Link> = manifest.previous().clone();
        manifests.push((manifest, current_link));

        match next_link {
            Some(prev_link) => {
                current_link = prev_link;
            }
            None => {
                tracing::debug!("Reached genesis manifest, stopping download");
                break;
            }
        }
    }

    // The walk collected newest-first; verification and application need oldest-first.
    manifests.reverse();
    tracing::debug!("Downloaded {} manifests", manifests.len());

    // Each manifest is verified against the one directly before it, starting
    // from the trusted base for the oldest downloaded manifest.
    let mut previous: Option<&Manifest> = trusted_base;
    for (manifest, link) in manifests.iter() {
        verify_author(manifest, previous).map_err(|e| ProvenanceError::InvalidManifestInChain {
            link: link.clone(),
            reason: e.to_string(),
        })?;
        previous = Some(manifest);
    }

    Ok(manifests)
}
/// Walks the remote manifest chain backwards from `link` looking for the first
/// link that is already present in our log for `bucket_id`.
///
/// Every visited manifest is downloaded from `peer_ids` and decoded so that
/// its `previous` link can be followed.
///
/// Returns `Ok(Some((link, height)))` for the first link found in our log,
/// where `height` is the height stored in the downloaded manifest, or
/// `Ok(None)` if a manifest without a predecessor is reached first.
///
/// # Errors
///
/// Fails if a manifest cannot be downloaded or decoded. Errors from the log
/// lookup itself are only logged; the walk then continues with the
/// predecessor as if the link had not been found.
async fn find_common_ancestor<L>(
    peer: &Peer<L>,
    bucket_id: Uuid,
    link: &Link,
    peer_ids: &[PublicKey],
) -> Result<Option<(Link, u64)>>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    tracing::debug!(
        "Finding common ancestor starting from {:?} using {} peer(s)",
        link,
        peer_ids.len()
    );

    let mut current_link = link.clone();
    let mut manifests_checked: u64 = 0;
    loop {
        manifests_checked += 1;
        tracing::debug!(
            "Checking manifest {} at link {:?}",
            manifests_checked,
            current_link
        );

        peer.blobs()
            .download_hash(current_link.hash(), peer_ids.to_vec(), peer.endpoint())
            .await
            .map_err(|e| {
                anyhow!(
                    "Failed to download manifest {:?} from peers: {}",
                    current_link,
                    e
                )
            })?;
        let manifest: Manifest = peer.blobs().get_cbor(&current_link.hash()).await?;
        let height = manifest.height();

        match peer.logs().has(bucket_id, current_link.clone()).await {
            Ok(heights) if !heights.is_empty() => {
                tracing::info!(
                    "Found common ancestor at link {:?} with height {} (in our log at heights {:?})",
                    current_link,
                    height,
                    heights
                );
                return Ok(Some((current_link, height)));
            }
            Ok(_) => {
                tracing::debug!("Link {:?} not in our log, checking previous", current_link);
            }
            Err(e) => {
                // Best effort: a failed lookup is treated like "not found".
                tracing::warn!("Error checking for link in log: {}", e);
            }
        }

        match manifest.previous() {
            Some(prev_link) => {
                current_link = prev_link.clone();
            }
            None => {
                tracing::debug!(
                    "Reached genesis after checking {} manifests, no common ancestor found",
                    manifests_checked
                );
                return Ok(None);
            }
        }
    }
}
/// Appends each `(manifest, link)` pair to the log of `bucket_id`, in slice
/// order, and dispatches a pin download job per manifest when the log
/// provider says content should be synced.
///
/// # Errors
///
/// Stops at the first manifest that cannot be appended or whose pin download
/// job cannot be dispatched; manifests appended before that point stay appended.
async fn apply_manifest_chain<L>(
    peer: &Peer<L>,
    bucket_id: Uuid,
    manifests: &[(Manifest, Link)],
) -> Result<()>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    let total = manifests.len();
    tracing::info!("Applying {} manifests to log", total);

    // If the provider cannot answer, fall back to syncing content.
    let download_content = match peer.logs().should_sync_content(bucket_id).await {
        Ok(answer) => answer,
        Err(_) => true,
    };

    for (manifest, link) in manifests {
        let previous = manifest.previous().clone();
        let height = manifest.height();
        let is_published = manifest.is_published();
        tracing::info!(
            "Appending manifest to log: height={}, link={:?}, previous={:?}, published={}",
            height,
            link,
            previous,
            is_published
        );

        let appended = peer
            .logs()
            .append(
                bucket_id,
                manifest.name().to_string(),
                link.clone(),
                previous,
                height,
                is_published,
            )
            .await;
        if let Err(e) = appended {
            return Err(anyhow!(
                "Failed to append manifest at height {}: {}",
                height,
                e
            ));
        }

        if !download_content {
            tracing::info!(
                "Skipping pin download for bucket {} (not active)",
                bucket_id
            );
            continue;
        }

        // Every principal listed in the manifest's shares is a candidate source for the pins.
        let pin_sources = manifest
            .shares()
            .iter()
            .map(|(_, share)| share.principal().identity)
            .collect();
        let pins_job = DownloadPinsJob {
            pins_link: manifest.pins().clone(),
            peer_ids: pin_sources,
        };
        peer.dispatch(SyncJob::DownloadPins(pins_job)).await?;
    }

    tracing::info!("Successfully applied {} manifests to log", total);
    Ok(())
}
/// Checks that `manifest` was authored by a principal allowed to write it.
///
/// * `manifest` - the manifest being checked.
/// * `previous` - the manifest directly before it in the chain, if any. When
///   present, the author is looked up in *its* shares; otherwise the author is
///   looked up in `manifest`'s own shares.
///
/// Returns `ProvenanceResult::UnsignedLegacy` for an unsigned manifest and
/// `ProvenanceResult::Valid` when all checks pass.
///
/// # Errors
///
/// * `InvalidSignature` - the signature does not verify, or verification errored.
/// * `AuthorNotInShares` - the author has no entry in the shares being checked.
/// * `AuthorNotWriter` - the author's role there is not `PrincipalRole::Owner`.
/// * `UnauthorizedShareRemoval` - a share present in `previous` is missing from
///   `manifest` and the author is not an owner in `previous`.
///
/// # Panics
///
/// Panics if `is_signed()` is true but `author()` returns `None`.
fn verify_author(
    manifest: &Manifest,
    previous: Option<&Manifest>,
) -> Result<ProvenanceResult, ProvenanceError> {
    if !manifest.is_signed() {
        tracing::warn!("Received unsigned manifest for bucket {}", manifest.id());
        return Ok(ProvenanceResult::UnsignedLegacy);
    }

    match manifest.verify_signature() {
        Ok(true) => {}
        Ok(false) => return Err(ProvenanceError::InvalidSignature),
        Err(e) => {
            tracing::warn!("Signature verification error: {}", e);
            return Err(ProvenanceError::InvalidSignature);
        }
    }

    let author = manifest.author().expect("is_signed() was true");
    let author_hex = author.to_hex();

    // Authority comes from the predecessor's shares so that an author cannot
    // grant itself access inside the very manifest it is signing.
    let check_shares = previous
        .map(|p| p.shares())
        .unwrap_or_else(|| manifest.shares());
    let author_share = check_shares
        .get(&author_hex)
        .ok_or(ProvenanceError::AuthorNotInShares)?;
    if *author_share.role() != PrincipalRole::Owner {
        return Err(ProvenanceError::AuthorNotWriter);
    }

    if let Some(prev) = previous {
        // A share was removed if any key of the predecessor is absent now.
        let share_removed = prev
            .shares()
            .keys()
            .any(|key| manifest.shares().get(key).is_none());
        if share_removed {
            // Defensive re-check: the owner test above already covers this
            // whenever `previous` is present.
            let author_prev_share = prev
                .shares()
                .get(&author_hex)
                .ok_or(ProvenanceError::UnauthorizedShareRemoval)?;
            if *author_prev_share.role() != PrincipalRole::Owner {
                return Err(ProvenanceError::UnauthorizedShareRemoval);
            }
        }
    }

    tracing::debug!(
        "Author verified: bucket={}, author={}",
        manifest.id(),
        author_hex
    );
    Ok(ProvenanceResult::Valid)
}
/// Verifies that we are a recipient of `manifest` and that its author is valid.
///
/// Returns `ProvenanceResult::NotAuthorized` when our own public key (hex) is
/// not a key in the manifest's shares; otherwise returns whatever
/// `verify_author(manifest, previous)` yields.
///
/// # Errors
///
/// Propagates any `ProvenanceError` raised by `verify_author`.
///
/// # Panics
///
/// Panics if `is_signed()` is true but `author()` returns `None`.
fn verify_provenance<L>(
    peer: &Peer<L>,
    manifest: &Manifest,
    previous: Option<&Manifest>,
) -> Result<ProvenanceResult, ProvenanceError>
where
    L: BucketLogProvider + Clone + Send + Sync + 'static,
    L::Error: std::error::Error + Send + Sync + 'static,
{
    let local_key_hex = PublicKey::from(*peer.secret().public()).to_hex();

    // Membership is checked before authorship: a manifest not shared with us
    // is skipped without validating its author.
    let listed_in_shares = manifest
        .shares()
        .keys()
        .any(|key_hex| key_hex == &local_key_hex);
    if !listed_in_shares {
        return Ok(ProvenanceResult::NotAuthorized);
    }

    let outcome = verify_author(manifest, previous)?;
    if !manifest.is_signed() {
        return Ok(outcome);
    }

    let author = manifest.author().expect("is_signed() was true");
    tracing::debug!(
        "Provenance valid: bucket={}, author={}, our_key={}",
        manifest.id(),
        author.to_hex(),
        local_key_hex
    );
    Ok(outcome)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::{SecretKey, SecretShare};
    use crate::mount::Share;

    /// Builds a fresh, unsigned manifest named "test" from `owner`'s public
    /// key, a default secret share and default links.
    fn create_test_manifest(owner: &SecretKey) -> Manifest {
        let secret_share = SecretShare::default();
        let bucket_id = uuid::Uuid::new_v4();
        Manifest::new(
            bucket_id,
            "test".to_string(),
            owner.public(),
            secret_share,
            Link::default(),
            Link::default(),
            0,
        )
    }

    /// A manifest signed by the key it was created with is accepted.
    #[test]
    fn test_verify_author_valid_owner() {
        let owner_key = SecretKey::generate();
        let mut signed = create_test_manifest(&owner_key);
        signed.sign(&owner_key).unwrap();

        let outcome = verify_author(&signed, None).unwrap();
        assert_eq!(outcome, ProvenanceResult::Valid);
    }

    /// A mirror share is listed but may not author manifests.
    #[test]
    fn test_verify_author_rejects_non_writer() {
        let owner_key = SecretKey::generate();
        let mirror_key = SecretKey::generate();
        let mut signed = create_test_manifest(&owner_key);
        signed.add_share(Share::new_mirror(mirror_key.public()));
        signed.sign(&mirror_key).unwrap();

        let outcome = verify_author(&signed, None);
        assert!(matches!(outcome, Err(ProvenanceError::AuthorNotWriter)));
    }

    /// A signer with no share at all is rejected.
    #[test]
    fn test_verify_author_rejects_unknown_signer() {
        let owner_key = SecretKey::generate();
        let attacker_key = SecretKey::generate();
        let mut signed = create_test_manifest(&owner_key);
        signed.sign(&attacker_key).unwrap();

        let outcome = verify_author(&signed, None);
        assert!(matches!(outcome, Err(ProvenanceError::AuthorNotInShares)));
    }

    /// An unsigned manifest is reported as legacy rather than as an error.
    #[test]
    fn test_verify_author_accepts_unsigned_legacy() {
        let owner_key = SecretKey::generate();
        let unsigned = create_test_manifest(&owner_key);

        let outcome = verify_author(&unsigned, None).unwrap();
        assert_eq!(outcome, ProvenanceResult::UnsignedLegacy);
    }
}