Skip to main content

commonware_glue/stateful/actor/syncer/
plan.rs

1use super::StateSyncMetadata;
2use commonware_consensus::{
3    marshal::{core::Variant, Start},
4    simplex::types::Finalization,
5    types::Height,
6};
7use commonware_cryptography::certificate::Scheme;
8use commonware_runtime::{Clock, Metrics, Storage};
9
10/// Startup plan that determines whether one-time peer state sync may still run.
11///
12/// Construction is two-phase so the caller can avoid fetching a finalized
13/// floor from peers when state sync has already completed:
14///
15/// 1. [`SyncPlan::init`] reads the durable state sync state.
16/// 2. If [`SyncPlan::may_state_sync`] returns `true`, the caller fetches a
17///    finalized floor and attaches it via [`SyncPlan::with_floor`]. Otherwise
18///    the caller skips floor selection entirely.
19///
20/// The plan owns the opened metadata store and is later consumed by
21/// [`Stateful`](crate::stateful::Stateful), so startup does not reopen the same
22/// metadata partition from multiple places.
23///
24/// Once state sync completes, this node never performs peer state sync
25/// again. Future startups must recover from the later of that synced height
26/// and marshal's processed height instead.
27pub struct SyncPlan<E, S, V>
28where
29    E: Clock + Metrics + Storage,
30    S: Scheme,
31    V: Variant,
32{
33    sync_metadata: StateSyncMetadata<E, V::Commitment>,
34    floor: Option<Finalization<S, V::Commitment>>,
35}
36
37impl<E, S, V> SyncPlan<E, S, V>
38where
39    E: Clock + Metrics + Storage,
40    S: Scheme,
41    V: Variant,
42{
43    /// Load the durable state sync metadata for this partition prefix.
44    ///
45    /// # Panics
46    ///
47    /// Panics if the metadata store cannot be opened. A node that cannot
48    /// determine whether state sync already completed cannot safely choose a
49    /// startup path.
50    pub async fn init(context: &E, partition_prefix: impl AsRef<str>) -> Self {
51        let sync_metadata =
52            StateSyncMetadata::<E, V::Commitment>::init(context, partition_prefix).await;
53        Self {
54            sync_metadata,
55            floor: None,
56        }
57    }
58
59    /// Returns whether state sync can still run on this node.
60    ///
61    /// When `false`, the caller should skip floor selection: any floor passed
62    /// to [`SyncPlan::with_floor`] would be ignored. The node already has a
63    /// durable completed state sync height, so future boots must recover from that
64    /// height or marshal's processed height instead of running peer state sync again.
65    ///
66    /// When `true`, the caller can optionally attach a finalized floor via
67    /// [`SyncPlan::with_floor`]. If a floor is not attached, the node will
68    /// attempt to sync from genesis via marshal unless it is resuming an
69    /// interrupted state sync.
70    pub fn may_state_sync(&self) -> bool {
71        self.sync_metadata.sync_height().is_none()
72    }
73
74    /// Returns the durable completed state sync height, if one has been stored.
75    pub fn sync_height(&self) -> Option<Height> {
76        self.sync_metadata.sync_height()
77    }
78
79    /// Returns the partition prefix to use for state sync metadata storage.
80    pub const fn partition_prefix(&self) -> &str {
81        self.sync_metadata.partition_prefix()
82    }
83
84    /// Returns a reference to the finalized floor attached to this plan, if any.
85    pub const fn floor(&self) -> Option<&Finalization<S, V::Commitment>> {
86        self.floor.as_ref()
87    }
88
89    /// Attach a finalized floor to state sync from.
90    ///
91    /// Has no effect if state sync has already completed.
92    #[must_use]
93    pub fn with_floor(mut self, floor: Finalization<S, V::Commitment>) -> Self {
94        if !self.may_state_sync() {
95            return self;
96        }
97
98        self.floor = Some(floor);
99        self
100    }
101
102    /// Returns marshal's startup anchor for this plan.
103    ///
104    /// If a finalized floor was attached, marshal starts from that floor.
105    /// Otherwise marshal starts from genesis and relies on its own durable
106    /// progress to override that anchor when available.
107    pub fn marshal_start<B>(&self, genesis: B) -> Start<S, V::Commitment, B> {
108        self.floor
109            .clone()
110            .map_or_else(|| Start::Genesis(genesis), Start::Floor)
111    }
112
113    /// Returns whether startup must attach a new state sync floor.
114    ///
115    /// This is `true` after a previous process crashed while state sync was
116    /// in progress. In that case [`Self::may_state_sync`] is also `true`, but
117    /// starting from marshal/genesis is not allowed because partially synced
118    /// database state must be reopened through the state-sync path.
119    pub fn requires_state_sync_floor(&self) -> bool {
120        self.sync_metadata.in_progress()
121    }
122
123    /// Consumes this plan and returns its durable state-sync metadata handle.
124    pub(crate) fn into_sync_metadata(self) -> StateSyncMetadata<E, V::Commitment> {
125        self.sync_metadata
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use super::SyncPlan;
132    use crate::stateful::{
133        actor::syncer::{FloorMarker, StateSyncMetadata},
134        tests::mocks::{TestScheme, TestVariant},
135    };
136    use commonware_consensus::types::Height;
137    use commonware_cryptography::sha256::{Digest as Sha256Digest, Sha256};
138    use commonware_runtime::{deterministic, Runner as _};
139
140    #[test]
141    fn stored_sync_height_disables_state_sync() {
142        deterministic::Runner::default().start(|context| async move {
143            let partition_prefix = "stored_sync_height";
144
145            let plan =
146                SyncPlan::<_, TestScheme, TestVariant>::init(&context, partition_prefix).await;
147            assert!(plan.may_state_sync());
148            assert_eq!(plan.sync_height(), None);
149            drop(plan);
150
151            let mut metadata =
152                StateSyncMetadata::<_, Sha256Digest>::init(&context, partition_prefix).await;
153            metadata.set_complete(Height::new(7)).await;
154            drop(metadata);
155
156            let plan =
157                SyncPlan::<_, TestScheme, TestVariant>::init(&context, partition_prefix).await;
158            assert!(!plan.may_state_sync());
159            assert_eq!(plan.sync_height(), Some(Height::new(7)));
160            assert!(plan.floor().is_none());
161        });
162    }
163
164    #[test]
165    #[should_panic(expected = "completed state sync cannot be marked in-progress")]
166    fn completed_sync_cannot_be_marked_in_progress() {
167        deterministic::Runner::default().start(|context| async move {
168            let partition_prefix = "completed_sync_cannot_be_marked_in_progress";
169            let mut metadata =
170                StateSyncMetadata::<_, Sha256Digest>::init(&context, partition_prefix).await;
171            metadata.set_complete(Height::new(7)).await;
172            metadata
173                .begin_sync(FloorMarker::new(Height::new(8), Sha256::fill(8)))
174                .await;
175        });
176    }
177
178    #[test]
179    #[should_panic(expected = "completed state sync height cannot move backward")]
180    fn complete_height_cannot_move_backward() {
181        deterministic::Runner::default().start(|context| async move {
182            let partition_prefix = "complete_height_cannot_move_backward";
183            let mut metadata =
184                StateSyncMetadata::<_, Sha256Digest>::init(&context, partition_prefix).await;
185            metadata.set_complete(Height::new(7)).await;
186            metadata.set_complete(Height::new(6)).await;
187        });
188    }
189
190    #[test]
191    #[should_panic(expected = "completed state sync height cannot be behind the in-progress floor")]
192    fn complete_height_cannot_be_behind_in_progress_floor() {
193        deterministic::Runner::default().start(|context| async move {
194            let partition_prefix = "complete_height_cannot_be_behind_in_progress_floor";
195            let mut metadata =
196                StateSyncMetadata::<_, Sha256Digest>::init(&context, partition_prefix).await;
197            metadata
198                .begin_sync(FloorMarker::new(Height::new(7), Sha256::fill(7)))
199                .await;
200            metadata.set_complete(Height::new(6)).await;
201        });
202    }
203
204    #[test]
205    fn in_progress_sync_requires_compatible_floor() {
206        deterministic::Runner::default().start(|context| async move {
207            let partition_prefix = "in_progress_sync_requires_compatible_floor";
208            let stored = FloorMarker::new(Height::new(7), Sha256::fill(7));
209            let mut metadata =
210                StateSyncMetadata::<_, Sha256Digest>::init(&context, partition_prefix).await;
211            metadata.begin_sync(stored.clone()).await;
212            drop(metadata);
213
214            let mut plan =
215                SyncPlan::<_, TestScheme, TestVariant>::init(&context, partition_prefix).await;
216            assert!(plan.may_state_sync());
217            assert!(plan.requires_state_sync_floor());
218            plan.sync_metadata.begin_sync(stored).await;
219            plan.sync_metadata
220                .begin_sync(FloorMarker::new(Height::new(9), Sha256::fill(9)))
221                .await;
222        });
223    }
224
225    #[test]
226    #[should_panic(
227        expected = "selected state sync floor cannot move behind the persisted in-progress floor"
228    )]
229    fn in_progress_sync_panics_for_backward_floor() {
230        let stored = FloorMarker::new(Height::new(7), Sha256::fill(7));
231        stored.ensure_not_behind(&FloorMarker::new(Height::new(6), Sha256::fill(6)));
232    }
233
234    #[test]
235    #[should_panic(
236        expected = "selected state sync floor conflicts with the persisted in-progress floor"
237    )]
238    fn in_progress_sync_panics_for_conflicting_floor() {
239        let stored = FloorMarker::new(Height::new(7), Sha256::fill(7));
240        stored.ensure_not_behind(&FloorMarker::new(Height::new(7), Sha256::fill(8)));
241    }
242}