use crate::dkg::ModeVersion;
use commonware_codec::{EncodeSize, Read, ReadExt, Write};
use commonware_consensus::types::Epoch as EpochNum;
use commonware_cryptography::{
bls12381::{
dkg::{
Dealer as CryptoDealer, DealerLog, DealerPrivMsg, DealerPubMsg, Info, Logs, Output,
Player as CryptoPlayer, PlayerAck, SignedDealerLog,
},
primitives::{group::Share, variant::Variant},
},
transcript::{Summary, Transcript},
BatchVerifier, PublicKey, Signer,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
buffer::paged::CacheRef, Buf, BufMut, BufferPooler, Clock, Metrics, Storage as RuntimeStorage,
};
use commonware_storage::{
journal::segmented::variable::{Config as SVConfig, Journal as SVJournal},
metadata::{Config as MetadataConfig, Metadata},
};
use commonware_utils::{Faults, NZUsize, NZU16};
use futures::StreamExt;
use rand_core::CryptoRngCore;
use std::{
collections::BTreeMap,
num::{NonZeroU16, NonZeroU32, NonZeroUsize},
};
use tracing::{debug, warn};
const PAGE_SIZE: NonZeroU16 = NZU16!(1 << 12);
const PAGE_CACHE_CAPACITY: NonZeroUsize = NZUsize!(1 << 13);
const WRITE_BUFFER: NonZeroUsize = NZUsize!(1 << 12);
const READ_BUFFER: NonZeroUsize = NZUsize!(1 << 20);
#[derive(Clone)]
pub struct Epoch<V: Variant, P: PublicKey> {
pub round: u64,
pub rng_seed: Summary,
pub output: Option<Output<V, P>>,
pub share: Option<Share>,
}
impl<V: Variant, P: PublicKey> EncodeSize for Epoch<V, P> {
fn encode_size(&self) -> usize {
self.round.encode_size()
+ self.rng_seed.encode_size()
+ self.output.encode_size()
+ self.share.encode_size()
}
}
impl<V: Variant, P: PublicKey> Write for Epoch<V, P> {
fn write(&self, buf: &mut impl BufMut) {
self.round.write(buf);
self.rng_seed.write(buf);
self.output.write(buf);
self.share.write(buf);
}
}
impl<V: Variant, P: PublicKey> Read for Epoch<V, P> {
type Cfg = (NonZeroU32, ModeVersion);
fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
Ok(Self {
round: ReadExt::read(buf)?,
rng_seed: ReadExt::read(buf)?,
output: Read::read_cfg(buf, cfg)?,
share: ReadExt::read(buf)?,
})
}
}
enum Event<V: Variant, P: PublicKey> {
Dealing(P, DealerPubMsg<V>, DealerPrivMsg),
Ack(P, PlayerAck<P>),
Log(P, DealerLog<V, P>),
}
impl<V: Variant, P: PublicKey> EncodeSize for Event<V, P> {
fn encode_size(&self) -> usize {
1 + match self {
Self::Dealing(x0, x1, x2) => x0.encode_size() + x1.encode_size() + x2.encode_size(),
Self::Ack(x0, x1) => x0.encode_size() + x1.encode_size(),
Self::Log(x0, x1) => x0.encode_size() + x1.encode_size(),
}
}
}
impl<V: Variant, P: PublicKey> Write for Event<V, P> {
fn write(&self, buf: &mut impl BufMut) {
match self {
Self::Dealing(x0, x1, x2) => {
0u8.write(buf);
x0.write(buf);
x1.write(buf);
x2.write(buf);
}
Self::Ack(x0, x1) => {
1u8.write(buf);
x0.write(buf);
x1.write(buf);
}
Self::Log(x0, x1) => {
2u8.write(buf);
x0.write(buf);
x1.write(buf);
}
}
}
}
impl<V: Variant, P: PublicKey> Read for Event<V, P> {
type Cfg = NonZeroU32;
fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
let tag = u8::read(buf)?;
match tag {
0 => Ok(Self::Dealing(
ReadExt::read(buf)?,
Read::read_cfg(buf, cfg)?,
ReadExt::read(buf)?,
)),
1 => Ok(Self::Ack(ReadExt::read(buf)?, ReadExt::read(buf)?)),
2 => Ok(Self::Log(ReadExt::read(buf)?, Read::read_cfg(buf, cfg)?)),
other => Err(commonware_codec::Error::InvalidEnum(other)),
}
}
}
struct EpochCache<V: Variant, P: PublicKey> {
dealings: BTreeMap<P, (DealerPubMsg<V>, DealerPrivMsg)>,
acks: BTreeMap<P, PlayerAck<P>>,
logs: BTreeMap<P, DealerLog<V, P>>,
}
impl<V: Variant, P: PublicKey> Default for EpochCache<V, P> {
fn default() -> Self {
Self {
dealings: BTreeMap::new(),
acks: BTreeMap::new(),
logs: BTreeMap::new(),
}
}
}
pub struct Storage<E: BufferPooler + Clock + RuntimeStorage + Metrics, V: Variant, P: PublicKey> {
states: Metadata<E, u64, Epoch<V, P>>,
msgs: SVJournal<E, Event<V, P>>,
current: Option<(EpochNum, Epoch<V, P>)>,
epochs: BTreeMap<EpochNum, EpochCache<V, P>>,
}
impl<E: BufferPooler + Clock + RuntimeStorage + Metrics, V: Variant, P: PublicKey>
Storage<E, V, P>
{
pub async fn init(
context: E,
partition_prefix: &str,
max_read_size: NonZeroU32,
max_supported_mode: ModeVersion,
) -> Self {
let page_cache = CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_CAPACITY);
let states: Metadata<E, u64, Epoch<V, P>> = Metadata::init(
context.with_label("states"),
MetadataConfig {
partition: format!("{partition_prefix}_states"),
codec_config: (max_read_size, max_supported_mode),
},
)
.await
.expect("should be able to init dkg_states metadata");
let msgs = SVJournal::init(
context.with_label("msgs"),
SVConfig {
partition: format!("{partition_prefix}_msgs"),
compression: None,
codec_config: max_read_size,
page_cache,
write_buffer: WRITE_BUFFER,
},
)
.await
.expect("should be able to init dkg_msgs journal");
let current = states.keys().max().map(|&epoch_num| {
let state = states.get(&epoch_num).expect("key must exist").clone();
(EpochNum::new(epoch_num), state)
});
let mut epochs = BTreeMap::<EpochNum, EpochCache<V, P>>::new();
{
let replay = msgs
.replay(0, 0, READ_BUFFER)
.await
.expect("should be able to replay msgs");
futures::pin_mut!(replay);
while let Some(result) = replay.next().await {
let (section, _, _, event) = result.expect("should be able to read msg");
let epoch = EpochNum::new(section);
let cache = epochs.entry(epoch).or_default();
match event {
Event::Dealing(dealer, pub_msg, priv_msg) => {
cache.dealings.insert(dealer, (pub_msg, priv_msg));
}
Event::Ack(player, ack) => {
cache.acks.insert(player, ack);
}
Event::Log(dealer, log) => {
cache.logs.insert(dealer, log);
}
}
}
}
Self {
states,
msgs,
current,
epochs,
}
}
pub fn dealings(&self, epoch: EpochNum) -> Vec<(P, DealerPubMsg<V>, DealerPrivMsg)> {
self.epochs
.get(&epoch)
.map(|cache| {
cache
.dealings
.iter()
.map(|(k, (v1, v2))| (k.clone(), v1.clone(), v2.clone()))
.collect()
})
.unwrap_or_default()
}
pub fn acks(&self, epoch: EpochNum) -> Vec<(P, PlayerAck<P>)> {
self.epochs
.get(&epoch)
.map(|cache| {
cache
.acks
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
})
.unwrap_or_default()
}
pub fn logs(&self, epoch: EpochNum) -> BTreeMap<P, DealerLog<V, P>> {
self.epochs
.get(&epoch)
.map(|cache| cache.logs.clone())
.unwrap_or_default()
}
pub fn has_log(&self, epoch: EpochNum, dealer: &P) -> bool {
self.epochs
.get(&epoch)
.map(|cache| cache.logs.contains_key(dealer))
.unwrap_or(false)
}
pub fn epoch(&self) -> Option<(EpochNum, Epoch<V, P>)> {
self.current.as_ref().map(|(e, s)| (*e, s.clone()))
}
fn get_or_create_epoch(&mut self, epoch: EpochNum) -> &mut EpochCache<V, P> {
self.epochs.entry(epoch).or_default()
}
fn has_cached<K: Ord, T>(
&self,
epoch: EpochNum,
get_map: impl Fn(&EpochCache<V, P>) -> &BTreeMap<K, T>,
key: &K,
) -> bool {
self.epochs
.get(&epoch)
.is_some_and(|cache| get_map(cache).contains_key(key))
}
pub async fn append_dealing(
&mut self,
epoch: EpochNum,
dealer: P,
pub_msg: DealerPubMsg<V>,
priv_msg: DealerPrivMsg,
) -> bool {
if self.has_cached(epoch, |c| &c.dealings, &dealer) {
return false;
}
let section = epoch.get();
self.msgs
.append(
section,
&Event::Dealing(dealer.clone(), pub_msg.clone(), priv_msg.clone()),
)
.await
.expect("should be able to write to msgs");
self.msgs
.sync(section)
.await
.expect("should be able to sync msgs");
self.get_or_create_epoch(epoch)
.dealings
.insert(dealer, (pub_msg, priv_msg));
true
}
pub async fn append_ack(&mut self, epoch: EpochNum, player: P, ack: PlayerAck<P>) -> bool {
if self.has_cached(epoch, |c| &c.acks, &player) {
return false;
}
let section = epoch.get();
self.msgs
.append(section, &Event::Ack(player.clone(), ack.clone()))
.await
.expect("should be able to write to msgs");
self.msgs
.sync(section)
.await
.expect("should be able to sync msgs");
self.get_or_create_epoch(epoch).acks.insert(player, ack);
true
}
pub async fn append_log(&mut self, epoch: EpochNum, dealer: P, log: DealerLog<V, P>) -> bool {
if self.has_cached(epoch, |c| &c.logs, &dealer) {
return false;
}
let section = epoch.get();
self.msgs
.append(section, &Event::Log(dealer.clone(), log.clone()))
.await
.expect("should be able to write to msgs");
self.msgs
.sync(section)
.await
.expect("should be able to sync msgs");
self.get_or_create_epoch(epoch).logs.insert(dealer, log);
true
}
pub async fn set_epoch(&mut self, epoch: EpochNum, state: Epoch<V, P>) {
let epoch_key = epoch.get();
if self.states.put(epoch_key, state.clone()).is_some() {
warn!(%epoch, "overwriting existing epoch state");
}
self.states
.sync()
.await
.expect("should be able to sync state");
self.current = Some((epoch, state));
}
pub async fn prune(&mut self, min: EpochNum) {
let min_epoch = min.get();
self.msgs
.prune(min_epoch)
.await
.expect("should be able to prune msgs");
self.states.retain(|&epoch_key, _| epoch_key >= min_epoch);
self.states
.sync()
.await
.expect("should be able to sync states after prune");
self.epochs.retain(|&epoch, _| epoch >= min);
}
pub fn create_dealer<C: Signer<PublicKey = P>, M: Faults>(
&self,
epoch: EpochNum,
signer: C,
round_info: Info<V, P>,
share: Option<Share>,
rng_seed: Summary,
) -> Option<Dealer<V, C>> {
if self.has_log(epoch, &signer.public_key()) {
return None;
}
let (mut crypto_dealer, pub_msg, priv_msgs) = CryptoDealer::start::<M>(
Transcript::resume(rng_seed).noise(b"dealer-rng"),
round_info,
signer,
share,
)
.expect("should be able to create dealer");
let mut unsent: BTreeMap<P, DealerPrivMsg> = priv_msgs.into_iter().collect();
for (player, ack) in self.acks(epoch) {
if unsent.contains_key(&player)
&& crypto_dealer
.receive_player_ack(player.clone(), ack)
.is_ok()
{
unsent.remove(&player);
debug!(?epoch, ?player, "replayed player ack");
}
}
Some(Dealer::new(Some(crypto_dealer), pub_msg, unsent))
}
pub fn create_player<C: Signer<PublicKey = P>, M: Faults>(
&self,
epoch: EpochNum,
signer: C,
round_info: Info<V, P>,
) -> Option<Player<V, C>> {
let logs = self.logs(epoch);
let dealings = self.dealings(epoch);
let (crypto_player, acks) = CryptoPlayer::resume::<M>(round_info, signer, &logs, dealings)
.expect("should be able to resume player");
for dealer in acks.keys() {
debug!(?epoch, ?dealer, "restored committed dealer message");
}
Some(Player {
player: crypto_player,
acks,
})
}
}
pub struct Dealer<V: Variant, C: Signer> {
dealer: Option<CryptoDealer<V, C>>,
pub_msg: DealerPubMsg<V>,
unsent: BTreeMap<C::PublicKey, DealerPrivMsg>,
finalized: Option<SignedDealerLog<V, C>>,
}
impl<V: Variant, C: Signer> Dealer<V, C> {
pub const fn new(
dealer: Option<CryptoDealer<V, C>>,
pub_msg: DealerPubMsg<V>,
unsent: BTreeMap<C::PublicKey, DealerPrivMsg>,
) -> Self {
Self {
dealer,
pub_msg,
unsent,
finalized: None,
}
}
pub async fn handle<E: BufferPooler + Clock + RuntimeStorage + Metrics>(
&mut self,
storage: &mut Storage<E, V, C::PublicKey>,
epoch: EpochNum,
player: C::PublicKey,
ack: PlayerAck<C::PublicKey>,
) -> bool {
if !self.unsent.contains_key(&player) {
return false;
}
if let Some(ref mut dealer) = self.dealer {
if dealer
.receive_player_ack(player.clone(), ack.clone())
.is_ok()
{
self.unsent.remove(&player);
storage.append_ack(epoch, player, ack).await;
return true;
}
}
false
}
pub fn finalize<M: Faults>(&mut self) {
if self.finalized.is_some() {
return;
}
if let Some(dealer) = self.dealer.take() {
let log = dealer.finalize::<M>();
self.finalized = Some(log);
}
}
pub fn finalized(&self) -> Option<SignedDealerLog<V, C>> {
self.finalized.clone()
}
pub const fn take_finalized(&mut self) -> Option<SignedDealerLog<V, C>> {
self.finalized.take()
}
pub fn shares_to_distribute(
&self,
) -> impl Iterator<Item = (C::PublicKey, DealerPubMsg<V>, DealerPrivMsg)> + '_ {
self.unsent
.iter()
.map(|(player, priv_msg)| (player.clone(), self.pub_msg.clone(), priv_msg.clone()))
}
}
pub struct Player<V: Variant, C: Signer> {
player: CryptoPlayer<V, C>,
acks: BTreeMap<C::PublicKey, PlayerAck<C::PublicKey>>,
}
impl<V: Variant, C: Signer> Player<V, C> {
pub async fn handle<E: BufferPooler + Clock + RuntimeStorage + Metrics, M: Faults>(
&mut self,
storage: &mut Storage<E, V, C::PublicKey>,
epoch: EpochNum,
dealer: C::PublicKey,
pub_msg: DealerPubMsg<V>,
priv_msg: DealerPrivMsg,
) -> Option<PlayerAck<C::PublicKey>> {
if let Some(ack) = self.acks.get(&dealer) {
return Some(ack.clone());
}
if let Some(ack) =
self.player
.dealer_message::<M>(dealer.clone(), pub_msg.clone(), priv_msg.clone())
{
storage
.append_dealing(epoch, dealer.clone(), pub_msg, priv_msg)
.await;
self.acks.insert(dealer, ack.clone());
return Some(ack);
}
None
}
pub fn finalize<M: Faults, B: BatchVerifier<PublicKey = C::PublicKey>>(
self,
rng: &mut impl CryptoRngCore,
logs: Logs<V, C::PublicKey, M>,
strategy: &impl Strategy,
) -> Result<(Output<V, C::PublicKey>, Share), commonware_cryptography::bls12381::dkg::Error>
{
self.player.finalize::<M, B>(rng, logs, strategy)
}
}
#[cfg(test)]
mod tests {
use super::*;
use commonware_codec::Encode;
use commonware_consensus::types::Epoch;
use commonware_cryptography::{
bls12381::{
dkg::Info,
primitives::{group::Scalar, sharing::Mode, variant::MinPk},
},
ed25519, Signer,
};
use commonware_macros::test_traced;
use commonware_math::algebra::{Random, Ring};
use commonware_runtime::{deterministic, Runner};
use commonware_utils::{ordered::Set, test_rng, test_rng_seeded, N3f1};
const TEST_NAMESPACE: &[u8] = b"test_dkg";
fn create_test_signers(n: usize) -> Vec<ed25519::PrivateKey> {
(0..n)
.map(|i| {
let mut rng = test_rng_seeded(i as u64);
ed25519::PrivateKey::random(&mut rng)
})
.collect()
}
fn create_round_info(signers: &[ed25519::PrivateKey]) -> Info<MinPk, ed25519::PublicKey> {
let players = Set::from_iter_dedup(signers.iter().map(|s| s.public_key()));
let dealers = players.clone();
Info::new::<N3f1>(
TEST_NAMESPACE,
0,
None,
Mode::NonZeroCounter,
dealers,
players,
)
.expect("valid info")
}
#[test_traced]
fn test_dealer_handle_returns_false_when_player_not_in_unsent() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let signers = create_test_signers(4);
let round_info = create_round_info(&signers);
let mut storage = Storage::<_, MinPk, _>::init(
context.with_label("storage"),
"test",
NonZeroU32::new(10).unwrap(),
crate::dkg::MAX_SUPPORTED_MODE,
)
.await;
let dealer_signer = signers[0].clone();
let mut rng = test_rng();
let (crypto_dealer, pub_msg, priv_msgs) =
CryptoDealer::<MinPk, _>::start::<N3f1>(&mut rng, round_info, dealer_signer, None)
.expect("valid dealer");
let unsent: BTreeMap<_, _> = priv_msgs.into_iter().collect();
let mut dealer = Dealer::new(Some(crypto_dealer), pub_msg, unsent);
let unknown_player = {
let mut rng = test_rng_seeded(100);
ed25519::PrivateKey::random(&mut rng).public_key()
};
let fake_ack = PlayerAck::read(&mut signers[1].sign(b"ns", b"msg").encode().as_ref())
.expect("valid ack");
let result = dealer
.handle(&mut storage, Epoch::zero(), unknown_player, fake_ack)
.await;
assert!(
!result,
"handle should return false when player not in unsent"
);
});
}
#[test_traced]
fn test_dealer_handle_returns_false_when_crypto_dealer_is_none() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let signers = create_test_signers(4);
let round_info = create_round_info(&signers);
let mut storage = Storage::<_, MinPk, ed25519::PublicKey>::init(
context.with_label("storage"),
"test",
NonZeroU32::new(10).unwrap(),
crate::dkg::MAX_SUPPORTED_MODE,
)
.await;
let dealer_signer = signers[0].clone();
let mut rng = test_rng();
let (_crypto_dealer, pub_msg, priv_msgs) =
CryptoDealer::<MinPk, _>::start::<N3f1>(&mut rng, round_info, dealer_signer, None)
.expect("valid dealer");
let player = signers[1].public_key();
let mut unsent: BTreeMap<_, _> = priv_msgs.into_iter().collect();
unsent.insert(player.clone(), DealerPrivMsg::new(Scalar::one()));
let mut dealer = Dealer::<MinPk, ed25519::PrivateKey>::new(None, pub_msg, unsent);
let sig = signers[1].sign(b"ns", b"msg");
let fake_ack: PlayerAck<ed25519::PublicKey> =
PlayerAck::read(&mut sig.encode().as_ref()).expect("valid ack");
let result = dealer
.handle(&mut storage, Epoch::zero(), player, fake_ack)
.await;
assert!(
!result,
"handle should return false when crypto dealer is None"
);
});
}
#[test_traced]
fn test_dealer_handle_returns_true_for_valid_ack() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let signers = create_test_signers(4);
let round_info = create_round_info(&signers);
let mut storage = Storage::<_, MinPk, _>::init(
context.with_label("storage"),
"test",
NonZeroU32::new(10).unwrap(),
crate::dkg::MAX_SUPPORTED_MODE,
)
.await;
let dealer_signer = signers[0].clone();
let mut rng = test_rng();
let (crypto_dealer, pub_msg, priv_msgs) = CryptoDealer::<MinPk, _>::start::<N3f1>(
&mut rng,
round_info.clone(),
dealer_signer.clone(),
None,
)
.expect("valid dealer");
let unsent: BTreeMap<_, _> = priv_msgs.into_iter().collect();
let mut dealer = Dealer::new(Some(crypto_dealer), pub_msg.clone(), unsent);
let player_signer = signers[1].clone();
let player_pk = player_signer.public_key();
let player_priv_msg = dealer
.shares_to_distribute()
.find(|(p, _, _)| *p == player_pk)
.map(|(_, _, priv_msg)| priv_msg)
.expect("player should have a share");
let mut crypto_player =
CryptoPlayer::new(round_info, player_signer).expect("valid player");
let ack = crypto_player
.dealer_message::<N3f1>(dealer_signer.public_key(), pub_msg, player_priv_msg)
.expect("valid ack");
let result = dealer
.handle(&mut storage, Epoch::zero(), player_pk, ack)
.await;
assert!(result, "handle should return true for valid ack");
});
}
#[test_traced]
fn test_dealer_handle_returns_false_for_duplicate_ack() {
let executor = deterministic::Runner::default();
executor.start(|context| async move {
let signers = create_test_signers(4);
let round_info = create_round_info(&signers);
let mut storage = Storage::<_, MinPk, ed25519::PublicKey>::init(
context.with_label("storage"),
"test",
NonZeroU32::new(10).unwrap(),
crate::dkg::MAX_SUPPORTED_MODE,
)
.await;
let dealer_signer = signers[0].clone();
let mut rng = test_rng();
let (crypto_dealer, pub_msg, priv_msgs) = CryptoDealer::<MinPk, _>::start::<N3f1>(
&mut rng,
round_info.clone(),
dealer_signer.clone(),
None,
)
.expect("valid dealer");
let unsent: BTreeMap<_, _> = priv_msgs.into_iter().collect();
let mut dealer = Dealer::new(Some(crypto_dealer), pub_msg.clone(), unsent);
let player_signer = signers[1].clone();
let player_pk = player_signer.public_key();
let player_priv_msg = dealer
.shares_to_distribute()
.find(|(p, _, _)| *p == player_pk)
.map(|(_, _, priv_msg)| priv_msg)
.expect("player should have a share");
let mut crypto_player =
CryptoPlayer::new(round_info, player_signer).expect("valid player");
let ack = crypto_player
.dealer_message::<N3f1>(dealer_signer.public_key(), pub_msg, player_priv_msg)
.expect("valid ack");
let result = dealer
.handle(&mut storage, Epoch::zero(), player_pk.clone(), ack.clone())
.await;
assert!(result, "first ack should succeed");
let result = dealer
.handle(&mut storage, Epoch::zero(), player_pk, ack)
.await;
assert!(!result, "duplicate ack should return false");
});
}
}