use std::sync::{Arc, Barrier};
use std::thread;
use auths_core::crypto::said::{compute_next_commitment, compute_said};
use auths_id::keri::KeriSequence;
use auths_id::keri::event::{Event, IcpEvent, IxnEvent};
use auths_id::keri::seal::Seal;
use auths_id::keri::types::{Prefix, Said};
use auths_id::keri::validate::{finalize_icp_event, serialize_for_signing};
use auths_id::storage::registry::backend::RegistryBackend;
use auths_storage::git::{GitRegistryBackend, RegistryConfig};
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use ring::rand::SystemRandom;
use ring::signature::{Ed25519KeyPair, KeyPair};
use tempfile::TempDir;
fn make_signed_icp() -> (Event, Prefix, Ed25519KeyPair) {
let rng = SystemRandom::new();
let pkcs8 = Ed25519KeyPair::generate_pkcs8(&rng).unwrap();
let keypair = Ed25519KeyPair::from_pkcs8(pkcs8.as_ref()).unwrap();
let key_encoded = format!("D{}", URL_SAFE_NO_PAD.encode(keypair.public_key().as_ref()));
let next_pkcs8 = Ed25519KeyPair::generate_pkcs8(&rng).unwrap();
let next_keypair = Ed25519KeyPair::from_pkcs8(next_pkcs8.as_ref()).unwrap();
let next_commitment = compute_next_commitment(next_keypair.public_key().as_ref());
let icp = IcpEvent {
v: "KERI10JSON".to_string(),
d: Said::default(),
i: Prefix::default(),
s: KeriSequence::new(0),
kt: "1".to_string(),
k: vec![key_encoded],
nt: "1".to_string(),
n: vec![next_commitment],
bt: "0".to_string(),
b: vec![],
a: vec![],
x: String::new(),
};
let mut finalized = finalize_icp_event(icp).unwrap();
let canonical = serialize_for_signing(&Event::Icp(finalized.clone())).unwrap();
let sig = keypair.sign(&canonical);
finalized.x = URL_SAFE_NO_PAD.encode(sig.as_ref());
let prefix = finalized.i.clone();
(Event::Icp(finalized), prefix, keypair)
}
fn make_signed_ixn(prefix: &Prefix, seq: u64, prev_said: &str, keypair: &Ed25519KeyPair) -> Event {
let mut ixn = IxnEvent {
v: "KERI10JSON".to_string(),
d: Said::default(),
i: prefix.clone(),
s: KeriSequence::new(seq),
p: Said::new_unchecked(prev_said.to_string()),
a: vec![Seal::device_attestation("EConcurrent")],
x: String::new(),
};
let json = serde_json::to_vec(&Event::Ixn(ixn.clone())).unwrap();
ixn.d = compute_said(&json);
let canonical = serialize_for_signing(&Event::Ixn(ixn.clone())).unwrap();
let sig = keypair.sign(&canonical);
ixn.x = URL_SAFE_NO_PAD.encode(sig.as_ref());
Event::Ixn(ixn)
}
#[test]
fn concurrent_writes_to_different_identities() {
let dir = TempDir::new().unwrap();
let backend =
GitRegistryBackend::from_config_unchecked(RegistryConfig::single_tenant(dir.path()));
backend.init_if_needed().unwrap();
let path = dir.path().to_path_buf();
let thread_count = 10;
let barrier = Arc::new(Barrier::new(thread_count));
let handles: Vec<_> = (0..thread_count)
.map(|_| {
let path = path.clone();
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
let backend =
GitRegistryBackend::from_config_unchecked(RegistryConfig::single_tenant(&path));
let (icp, prefix, _keypair) = make_signed_icp();
barrier.wait();
for _ in 0..20 {
match backend.append_event(&prefix, &icp) {
Ok(()) => return Ok(()),
Err(e) => {
let msg = format!("{}", e);
if msg.contains("Concurrent modification")
|| msg.contains("already exists")
{
thread::sleep(std::time::Duration::from_millis(10));
continue;
}
return Err(e);
}
}
}
Ok(())
})
})
.collect();
let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
let successes = results.iter().filter(|r| r.is_ok()).count();
assert_eq!(
successes, thread_count,
"All {} threads should eventually succeed for different identities, got {} successes",
thread_count, successes
);
}
#[test]
fn concurrent_writes_to_same_identity() {
let dir = TempDir::new().unwrap();
let backend =
GitRegistryBackend::from_config_unchecked(RegistryConfig::single_tenant(dir.path()));
backend.init_if_needed().unwrap();
let (icp, prefix, keypair) = make_signed_icp();
let icp_said = icp.said().to_string();
backend.append_event(&prefix, &icp).unwrap();
let path = dir.path().to_path_buf();
let thread_count = 5;
let barrier = Arc::new(Barrier::new(thread_count));
let handles: Vec<_> = (0..thread_count)
.map(|_| {
let path = path.clone();
let barrier = Arc::clone(&barrier);
let prefix = prefix.clone();
let ixn = make_signed_ixn(&prefix, 1, &icp_said, &keypair);
thread::spawn(move || {
let backend =
GitRegistryBackend::from_config_unchecked(RegistryConfig::single_tenant(&path));
barrier.wait();
backend.append_event(&prefix, &ixn)
})
})
.collect();
let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
let successes = results.iter().filter(|r| r.is_ok()).count();
assert_eq!(
successes, 1,
"Exactly 1 thread should succeed for same identity seq 1, got {} successes",
successes
);
}