use crate::types::{Blake3Hash, CheckOutcome, OperationEvent, ProfileId, Receipt, Verdict, canonical_bytes};
use crate::chain::{genesis_hash, FORMAT_VERSION};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::thread;
pub const DEFAULT_SHARD_SIZE: usize = 100_000;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReceiptShard {
pub receipt_id: Blake3Hash,
pub shard_index: usize,
pub start_seq: u64,
pub end_seq: u64,
pub prev_chain_hash: Blake3Hash,
pub shard_chain_hash: Blake3Hash,
pub events: Vec<OperationEvent>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReceiptManifest {
pub sharding_version: String,
pub receipt_id: Blake3Hash,
pub total_events: u64,
pub shard_count: usize,
pub shard_size: usize,
pub final_chain_hash: Blake3Hash,
}
pub trait KademliaDHT: Send + Sync {
fn put_shard(&self, shard: ReceiptShard) -> Result<(), String>;
fn get_shard(&self, receipt_id: &Blake3Hash, index: usize) -> Result<ReceiptShard, String>;
fn put_manifest(&self, manifest: ReceiptManifest) -> Result<(), String>;
fn get_manifest(&self, receipt_id: &Blake3Hash) -> Result<ReceiptManifest, String>;
}
#[derive(Debug, thiserror::Error)]
pub enum ShardingError {
#[error("DHT error: {0}")]
Dht(String),
#[error("Chain integrity failure at shard {index}: expected {expected}, found {found}")]
ChainMismatch { index: usize, expected: String, found: String },
#[error("Continuity failure at shard {index}: expected seq {expected}, found {found}")]
SeqMismatch { index: usize, expected: u64, found: u64 },
#[error("Shard boundary mismatch between {index} and {next}")]
BoundaryMismatch { index: usize, next: usize },
#[error("Verification failed: {0}")]
Failure(String),
}
pub struct DistributedVerifier {
dht: Arc<dyn KademliaDHT>,
}
impl DistributedVerifier {
pub fn new(dht: Arc<dyn KademliaDHT>) -> Self {
Self { dht }
}
pub fn verify_distributed(&self, receipt_id: &Blake3Hash) -> Result<Verdict, ShardingError> {
let manifest = self.dht.get_manifest(receipt_id).map_err(ShardingError::Dht)?;
if manifest.sharding_version != "1000x/v1" {
return Ok(self.failed_verdict("check_format", "Unsupported sharding version"));
}
let shard_count = manifest.shard_count;
let outcomes = Arc::new(Mutex::new(BTreeMap::new()));
let mut handles = Vec::new();
for i in 0..shard_count {
let dht = Arc::clone(&self.dht);
let rid = receipt_id.clone();
let outcomes = Arc::clone(&outcomes);
handles.push(thread::spawn(move || {
let shard = dht.get_shard(&rid, i).map_err(|e| ShardingError::Dht(e))?;
let shard_outcome = verify_shard(&shard);
let mut lock = outcomes.lock().map_err(|_| ShardingError::Failure("Mutex poisoned".to_string()))?;
lock.insert(i, (shard, shard_outcome));
Ok::<(), ShardingError>(())
}));
}
for handle in handles {
handle.join().map_err(|_| ShardingError::Failure("Thread panicked".to_string()))??;
}
let lock = outcomes.lock().map_err(|_| ShardingError::Failure("Mutex poisoned".to_string()))?;
let mut final_outcomes = Vec::new();
let mut current_hash = genesis_hash();
let mut current_seq = 0u64;
for i in 0..shard_count {
let (shard, shard_verdict) = lock.get(&i).ok_or_else(|| ShardingError::Failure(format!("Missing shard {} in results", i)))?;
if shard.prev_chain_hash != current_hash {
return Err(ShardingError::BoundaryMismatch { index: i.saturating_sub(1), next: i });
}
if shard.start_seq != current_seq {
return Err(ShardingError::SeqMismatch { index: i, expected: current_seq, found: shard.start_seq });
}
if !shard_verdict.accepted {
return Ok(shard_verdict.clone());
}
current_hash = shard.shard_chain_hash.clone();
current_seq = shard.end_seq + 1;
final_outcomes.extend(shard_verdict.outcomes.clone());
}
if current_hash != manifest.final_chain_hash {
return Err(ShardingError::ChainMismatch {
index: shard_count - 1,
expected: manifest.final_chain_hash.to_string(),
found: current_hash.to_string()
});
}
Ok(Verdict {
accepted: true,
profile: ProfileId::CoreV1,
outcomes: final_outcomes,
reason: "All distributed shards passed 7-stage verification".to_string(),
})
}
fn failed_verdict(&self, stage: &str, detail: &str) -> Verdict {
Verdict {
accepted: false,
profile: ProfileId::CoreV1,
outcomes: vec![CheckOutcome {
stage: stage.to_string(),
passed: false,
detail: detail.to_string(),
}],
reason: format!("{}: {}", stage, detail),
}
}
}
fn verify_shard(shard: &ReceiptShard) -> Verdict {
let mut acc = shard.prev_chain_hash.clone();
for event in &shard.events {
acc = match fold_event_internal(&acc, event) {
Ok(h) => h,
Err(e) => return failed_shard_verdict("chain_integrity", &e),
};
}
if acc != shard.shard_chain_hash {
return failed_shard_verdict("chain_integrity", "Recomputed shard hash mismatch");
}
for (i, event) in shard.events.iter().enumerate() {
let expected_seq = shard.start_seq + i as u64;
if event.seq != expected_seq {
return failed_shard_verdict("continuity", &format!("Expected seq {}, found {}", expected_seq, event.seq));
}
if event.payload_commitment.as_hex().len() != 64 {
return failed_shard_verdict("verify_commitments", "Malformed commitment");
}
}
Verdict {
accepted: true,
profile: ProfileId::CoreV1,
outcomes: vec![], reason: "Shard passed local checks".to_string(),
}
}
fn failed_shard_verdict(stage: &str, detail: &str) -> Verdict {
Verdict {
accepted: false,
profile: ProfileId::CoreV1,
outcomes: vec![CheckOutcome {
stage: stage.to_string(),
passed: false,
detail: detail.to_string(),
}],
reason: detail.to_string(),
}
}
fn fold_event_internal(prev: &Blake3Hash, event: &OperationEvent) -> Result<Blake3Hash, String> {
let event_bytes = canonical_bytes(event).map_err(|e| e.to_string())?;
let mut buf = Vec::with_capacity(prev.as_hex().len() + event_bytes.len());
buf.extend_from_slice(prev.as_hex().as_bytes());
buf.extend_from_slice(&event_bytes);
Ok(Blake3Hash::from_bytes(&buf))
}
pub fn shard_receipt(receipt: Receipt, shard_size: usize) -> Result<(ReceiptManifest, Vec<ReceiptShard>), crate::error::AffidavitError> {
let shard_count = (receipt.events.len() + shard_size - 1) / shard_size;
let mut shards = Vec::with_capacity(shard_count);
let mut current_hash = genesis_hash();
let events = receipt.events;
for i in 0..shard_count {
let start = i * shard_size;
let end = std::cmp::min(start + shard_size, events.len());
let shard_events = events[start..end].to_vec();
let prev_hash = current_hash.clone();
for event in &shard_events {
current_hash = fold_event_internal(¤t_hash, event).map_err(crate::error::AffidavitError::ContentAddressing)?;
}
shards.push(ReceiptShard {
receipt_id: receipt.chain_hash.clone(),
shard_index: i,
start_seq: start as u64,
end_seq: if end > 0 { (end - 1) as u64 } else { 0 },
prev_chain_hash: prev_hash,
shard_chain_hash: current_hash.clone(),
events: shard_events,
});
}
let manifest = ReceiptManifest {
sharding_version: "1000x/v1".to_string(),
receipt_id: receipt.chain_hash.clone(),
total_events: events.len() as u64,
shard_count,
shard_size,
final_chain_hash: receipt.chain_hash,
};
Ok((manifest, shards))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::chain::ChainAssembler;
use crate::types::ObjectRef;
fn test_event(seq: u64) -> OperationEvent {
OperationEvent {
id: format!("e{}", seq),
seq,
event_type: "test".to_string(),
objects: vec![ObjectRef { id: "o".to_string(), obj_type: "t".to_string(), qualifier: None }],
payload_commitment: Blake3Hash::from_bytes(b"payload"),
}
}
#[test]
fn test_distributed_verification_flow() -> Result<(), Box<dyn std::error::Error>> {
let mut asm = ChainAssembler::new();
for i in 0..10 {
asm.append(test_event(i as u64))?;
}
let receipt = asm.finalize();
let receipt_id = receipt.chain_hash.clone();
let (manifest, shards) = shard_receipt(receipt, 3)?;
struct MockDht {
manifests: Mutex<BTreeMap<Blake3Hash, ReceiptManifest>>,
shards: Mutex<BTreeMap<(Blake3Hash, usize), ReceiptShard>>,
}
impl KademliaDHT for MockDht {
fn put_shard(&self, shard: ReceiptShard) -> Result<(), String> {
self.shards.lock().map_err(|_| "Poisoned".to_string())?.insert((shard.receipt_id.clone(), shard.shard_index), shard);
Ok(())
}
fn get_shard(&self, receipt_id: &Blake3Hash, index: usize) -> Result<ReceiptShard, String> {
self.shards.lock().map_err(|_| "Poisoned".to_string())?.get(&(receipt_id.clone(), index)).cloned().ok_or("Not found".to_string())
}
fn put_manifest(&self, manifest: ReceiptManifest) -> Result<(), String> {
self.manifests.lock().map_err(|_| "Poisoned".to_string())?.insert(manifest.receipt_id.clone(), manifest);
Ok(())
}
fn get_manifest(&self, receipt_id: &Blake3Hash) -> Result<ReceiptManifest, String> {
self.manifests.lock().map_err(|_| "Poisoned".to_string())?.get(receipt_id).cloned().ok_or("Not found".to_string())
}
}
let dht = Arc::new(MockDht {
manifests: Mutex::new(BTreeMap::new()),
shards: Mutex::new(BTreeMap::new()),
});
dht.put_manifest(manifest).map_err(|e| e.to_string())?;
for shard in shards {
dht.put_shard(shard).map_err(|e| e.to_string())?;
}
let verifier = DistributedVerifier::new(dht);
let result = verifier.verify_distributed(&receipt_id)?;
assert!(result.accepted);
assert_eq!(result.reason, "All distributed shards passed 7-stage verification");
Ok(())
}
}