use crate::stateful::{
db::{Anchor, DatabaseSet},
Application,
};
use commonware_codec::{EncodeSize, Error, FixedSize, Read, ReadExt, Write};
use commonware_consensus::{
marshal::{
core::{CommitmentFallback, Mailbox as MarshalMailbox, Variant},
Identifier,
},
simplex::types::Finalization,
types::Height,
CertifiableBlock, Heightable, Roundable,
};
use commonware_cryptography::{certificate::Scheme, Digest, Digestible};
use commonware_runtime::{Buf, BufMut, Clock, Metrics, Spawner, Storage};
use commonware_storage::metadata::{self, Metadata};
use commonware_utils::{fixed_bytes, sequence::FixedBytes};
use rand::Rng;
mod actor;
pub(crate) use actor::{Config, Syncer};
mod mailbox;
pub(crate) use mailbox::Mailbox;
mod plan;
pub use plan::SyncPlan;
const SYNC_METADATA_SUFFIX: &str = "state_sync_metadata";
const SYNC_STATE_KEY: FixedBytes<1> = fixed_bytes!("C0");
type BlockDigest<A, E> = <<A as Application<E>>::Block as Digestible>::Digest;
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct FloorMarker<C>
where
C: Digest,
{
height: Height,
commitment: C,
}
impl<C> FloorMarker<C>
where
C: Digest,
{
pub(crate) const fn new(height: Height, commitment: C) -> Self {
Self { height, commitment }
}
pub(crate) fn ensure_not_behind(&self, selected: &Self) {
assert!(
selected.height >= self.height,
"selected state sync floor cannot move behind the persisted in-progress floor",
);
if selected.height == self.height {
assert!(
selected.commitment == self.commitment,
"selected state sync floor conflicts with the persisted in-progress floor",
);
}
}
}
impl<C> Write for FloorMarker<C>
where
C: Digest,
{
fn write(&self, writer: &mut impl BufMut) {
self.height.write(writer);
self.commitment.write(writer);
}
}
impl<C> EncodeSize for FloorMarker<C>
where
C: Digest,
{
fn encode_size(&self) -> usize {
self.height.encode_size() + self.commitment.encode_size()
}
}
impl<C> Read for FloorMarker<C>
where
C: Digest,
{
type Cfg = ();
fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, Error> {
Ok(Self {
height: Height::read(reader)?,
commitment: C::read_cfg(reader, &())?,
})
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum SyncState<C>
where
C: Digest,
{
InProgress(FloorMarker<C>),
Complete(Height),
}
impl<C> SyncState<C>
where
C: Digest,
{
pub(crate) const fn sync_height(&self) -> Option<Height> {
match self {
Self::InProgress(_) => None,
Self::Complete(height) => Some(*height),
}
}
}
impl<C> Write for SyncState<C>
where
C: Digest,
{
fn write(&self, writer: &mut impl BufMut) {
match self {
Self::InProgress(floor) => {
0u8.write(writer);
floor.write(writer);
}
Self::Complete(height) => {
1u8.write(writer);
height.write(writer);
}
}
}
}
impl<C> EncodeSize for SyncState<C>
where
C: Digest,
{
fn encode_size(&self) -> usize {
u8::SIZE
+ match self {
Self::InProgress(floor) => floor.encode_size(),
Self::Complete(height) => height.encode_size(),
}
}
}
impl<C> Read for SyncState<C>
where
C: Digest,
{
type Cfg = ();
fn read_cfg(reader: &mut impl Buf, _: &()) -> Result<Self, Error> {
match u8::read(reader)? {
0 => Ok(Self::InProgress(FloorMarker::<C>::read(reader)?)),
1 => Ok(Self::Complete(Height::read(reader)?)),
n => Err(Error::InvalidEnum(n)),
}
}
}
pub struct SyncResult<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
pub databases: A::Databases,
pub anchor: Anchor<BlockDigest<A, E>>,
}
impl<E, A> Clone for SyncResult<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
fn clone(&self) -> Self {
Self {
databases: self.databases.clone(),
anchor: self.anchor,
}
}
}
pub(crate) struct ResolvedFloor<E, A, C>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
C: Digest,
{
pub anchor: Anchor<BlockDigest<A, E>>,
pub targets: <A::Databases as DatabaseSet<E>>::SyncTargets,
pub marker: FloorMarker<C>,
}
pub(crate) struct StateSyncMetadata<E, C>
where
E: Storage + Clock + Metrics,
C: Digest,
{
partition_prefix: String,
metadata: Metadata<E, FixedBytes<1>, SyncState<C>>,
}
impl<E, C> StateSyncMetadata<E, C>
where
E: Storage + Clock + Metrics,
C: Digest,
{
pub(crate) async fn init(context: &E, partition_prefix: impl AsRef<str>) -> Self {
let partition_prefix = partition_prefix.as_ref().to_string();
let metadata = Metadata::init(
context.child("metadata"),
metadata::Config {
partition: format!("{partition_prefix}{SYNC_METADATA_SUFFIX}"),
codec_config: (),
},
)
.await
.expect("failed to load sync metadata");
Self {
partition_prefix,
metadata,
}
}
pub(crate) const fn partition_prefix(&self) -> &str {
self.partition_prefix.as_str()
}
pub(crate) fn sync_height(&self) -> Option<Height> {
self.metadata
.get(&SYNC_STATE_KEY)
.map(SyncState::sync_height)
.unwrap_or_default()
}
pub(crate) fn in_progress(&self) -> bool {
matches!(
self.metadata.get(&SYNC_STATE_KEY),
Some(SyncState::InProgress(_))
)
}
pub(crate) async fn begin_sync(&mut self, floor: FloorMarker<C>) {
match self.metadata.get(&SYNC_STATE_KEY) {
Some(SyncState::InProgress(existing)) => {
existing.ensure_not_behind(&floor);
}
Some(SyncState::Complete(_)) => {
panic!("completed state sync cannot be marked in-progress");
}
None => {}
}
self.metadata
.put_sync(SYNC_STATE_KEY, SyncState::InProgress(floor))
.await
.expect("failed to set state sync state to in-progress");
}
pub(crate) async fn set_complete(&mut self, height: Height) {
match self.metadata.get(&SYNC_STATE_KEY) {
Some(SyncState::InProgress(floor)) => {
assert!(
height >= floor.height,
"completed state sync height cannot be behind the in-progress floor",
);
}
Some(SyncState::Complete(existing)) => {
assert!(
height >= *existing,
"completed state sync height cannot move backward",
);
}
None => {}
}
self.metadata
.put_sync(SYNC_STATE_KEY, SyncState::<C>::Complete(height))
.await
.expect("failed to set state sync state to complete");
}
}
pub(crate) async fn resolve_state_sync_floor<E, A, S, V>(
marshal: &MarshalMailbox<S, V>,
finalization: &Finalization<S, V::Commitment>,
) -> ResolvedFloor<E, A, V::Commitment>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
S: Scheme,
V: Variant<ApplicationBlock = A::Block>,
{
let floor = {
let block = marshal
.subscribe_by_commitment(finalization.proposal.payload, CommitmentFallback::Wait)
.await
.expect("marshal must yield floor block");
V::into_inner(block)
};
ResolvedFloor {
anchor: Anchor::from(&floor),
targets: A::sync_targets(&floor),
marker: FloorMarker::new(floor.height(), finalization.proposal.payload),
}
}
pub(crate) struct StartupResult<E, A>
where
E: Rng + Spawner + Metrics + Clock,
A: Application<E>,
{
pub sync: SyncResult<E, A>,
pub skip_finalized_until: Option<Height>,
}
pub(crate) async fn init_databases_from_marshal<E, A, S, V>(
context: &E,
marshal: &MarshalMailbox<S, V>,
db_config: <A::Databases as DatabaseSet<E>>::Config,
mut sync_metadata: StateSyncMetadata<E, V::Commitment>,
) -> StartupResult<E, A>
where
E: Rng + Storage + Spawner + Clock + Metrics,
A: Application<E>,
S: Scheme,
V: Variant<ApplicationBlock = A::Block>,
{
let sync_height = sync_metadata.sync_height();
let processed_height = marshal.get_processed_height().await;
let skip_finalized_until = match (sync_height, processed_height) {
(Some(sync_height), Some(processed_height)) if processed_height < sync_height => {
Some(sync_height)
}
(Some(sync_height), None) => Some(sync_height),
_ => None,
};
let marshal_floor = sync_height
.into_iter()
.chain(processed_height)
.max()
.unwrap_or_else(Height::zero);
let floor_block = {
let marshal_block = marshal
.get_block(Identifier::Height(marshal_floor))
.await
.expect("marshal must return floor block");
V::into_inner(marshal_block)
};
let databases = A::Databases::init(context.child("db_set"), db_config).await;
let processed_targets = A::sync_targets(&floor_block);
if databases.committed_targets().await != processed_targets {
databases.rewind_to_targets(processed_targets.clone()).await;
let rewound_targets = databases.committed_targets().await;
assert!(
rewound_targets == processed_targets,
"databases must be consistent with marshal floor after rewind"
);
}
sync_metadata.set_complete(floor_block.height()).await;
let anchor = Anchor {
height: floor_block.height(),
round: floor_block.context().round(),
digest: floor_block.digest(),
};
StartupResult {
sync: SyncResult { databases, anchor },
skip_finalized_until,
}
}