Skip to main content

snarkos_node_bft/primary/
proposal_task.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{CREATE_BATCH_INTERVAL, MAX_BATCH_DELAY, MIN_BATCH_DELAY};
17
18use anyhow::Result;
19use colored::Colorize;
20use futures::future::BoxFuture;
21use snarkvm::{prelude::Network, utilities::flatten_error};
22use std::{marker::PhantomData, sync::Arc};
23use tokio::{
24    sync::watch,
25    time::{Instant, sleep, sleep_until},
26};
27use tracing::{debug, warn};
28
29/// Abstracts over batch-proposal operations, allowing the proposal loop to be tested without a
30/// real primary.
31#[async_trait::async_trait]
32pub(super) trait BatchPropose: Send + Sync {
33    /// Returns the current consensus round.
34    fn current_round(&self) -> u64;
35
36    /// Returns `None` if the node is already synced; otherwise returns a future that resolves
37    /// once sync completes.
38    fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>>;
39
40    /// Returns `true` if the node is currently synced with the network.
41    fn is_synced(&self) -> bool;
42
43    /// Attempts to propose a batch.
44    ///
45    /// Returns `Ok(true)` when a batch was successfully proposed, `Ok(false)` to retry, and
46    /// `Err` on an unexpected error.
47    async fn propose_batch(&self) -> Result<bool>;
48}
49
50/// Manages batch proposal readiness and drives the batch proposal loop.
51///
52/// Holds the readiness state and the logic for the proposal task. The actual task is started by
53/// calling [`Self::run`] inside a spawned future (see [`Primary::start_handlers`]).
54pub struct ProposalTask<N: Network> {
55    inner: Arc<ProposalTaskInner>,
56    _phantom: PhantomData<N>,
57}
58
59/// Manual `Clone` impl so that `N: Clone` is not required.
60impl<N: Network> Clone for ProposalTask<N> {
61    fn clone(&self) -> Self {
62        Self { inner: Arc::clone(&self.inner), _phantom: PhantomData }
63    }
64}
65
66/// The inner state of a [`ProposalTask`], shared via `Arc`.
67struct ProposalTaskInner {
68    /// Tracks whether the primary is ready to propose a new batch.
69    ///
70    /// Initialized to `true` so round 1 can be proposed immediately without an explicit signal.
71    /// Set to `true` by [`ProposalTask::signal`] when a new round starts,
72    /// and reset to `false` after a batch is successfully proposed.
73    ready: watch::Sender<bool>,
74}
75
76impl<N: Network> Default for ProposalTask<N> {
77    fn default() -> Self {
78        let (ready, _) = watch::channel(true);
79        Self { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData }
80    }
81}
82
83impl<N: Network> ProposalTask<N> {
84    /// Signals that the primary is ready to propose a new batch for the current round.
85    ///
86    /// Should be called from [`Primary::try_increment_to_the_next_round`] whenever the primary
87    /// successfully advances to a new round.
88    pub fn signal(&self) {
89        self.inner.ready.send_replace(true);
90    }
91
92    /// Runs the batch proposal loop. This is intended to be spawned as a long-running task.
93    ///
94    /// Each iteration covers one full round (wait → propose → wait for signatures).
95    /// The three stages are implemented as separate methods; see their doc-comments for details.
96    pub(super) async fn run<P: BatchPropose + 'static>(self, primary: P) {
97        let mut ready_rx = self.inner.ready.subscribe();
98
99        loop {
100            let round = primary.current_round();
101            // TODO(kaimast): the round_start time should be based on the timestamp of the
102            // previous batch, not the current wall-clock time.
103            let round_start = Instant::now();
104
105            if !Self::wait_until_proposal_ready(&primary, &mut ready_rx, round, round_start).await {
106                continue; // round changed; restart
107            }
108
109            if !Self::propose(&primary, round).await {
110                continue; // round changed; restart
111            }
112
113            // Reset readiness so the next round waits for an explicit signal.
114            self.inner.ready.send_replace(false);
115
116            Self::wait_for_signatures(&primary, &mut ready_rx, round).await;
117        }
118    }
119
120    /// Stage 1: Wait until conditions are met to propose a batch.
121    ///
122    /// Blocks until sync is complete, MIN_BATCH_DELAY has elapsed since `round_start`, and either
123    /// `signal()` fires (leader cert arrived) or MAX_BATCH_DELAY expires without one.
124    ///
125    /// Returns `true` if ready to propose, `false` if the round changed (caller should restart).
126    async fn wait_until_proposal_ready<P: BatchPropose>(
127        primary: &P,
128        ready_rx: &mut watch::Receiver<bool>,
129        round: u64,
130        round_start: Instant,
131    ) -> bool {
132        loop {
133            if primary.current_round() != round {
134                return false;
135            }
136
137            // A node cannot propose while it is syncing.
138            if let Some(fut) = primary.wait_for_synced_if_syncing() {
139                fut.await;
140                // Re-check round after sync completes.
141                continue;
142            }
143
144            // Enforce the minimum inter-proposal delay.
145            // This is a no-op once the deadline has already passed.
146            sleep_until(round_start + MIN_BATCH_DELAY).await;
147
148            // Wait for a readiness signal, the MAX_BATCH_DELAY deadline, or a short heartbeat
149            // that lets the round-change check at the top of the loop fire regularly.
150            tokio::select! {
151                _ = sleep_until(round_start + MAX_BATCH_DELAY) => {
152                    debug!("Did not receive leader certificate within MAX_BATCH_DELAY");
153                    return true;
154                },
155                _ = Self::wait_until_ready(ready_rx) => {
156                    return true;
157                },
158                _ = sleep(CREATE_BATCH_INTERVAL) => {
159                    debug!("Skipping batch proposal for round {round} {}", "(not ready yet)".dimmed());
160                }
161            };
162        }
163    }
164
165    /// Stage 2: Propose a batch.
166    ///
167    /// Calls `propose_batch()` with CREATE_BATCH_INTERVAL retries until it returns `Ok(true)`
168    /// (batch submitted to the network).
169    ///
170    /// Returns `true` if the batch was submitted, `false` if the round changed or the node started
171    /// syncing (caller should restart; Stage 1 will then await sync completion).
172    async fn propose<P: BatchPropose>(primary: &P, round: u64) -> bool {
173        let mut attempt = 1u32;
174        loop {
175            if primary.current_round() != round {
176                return false;
177            }
178
179            // Bail out if sync started mid-Stage-2; otherwise propose_batch may spin at the
180            // CREATE_BATCH_INTERVAL cadence on Ok(false) paths (e.g. previous round has not
181            // reached quorum, not enough connected validators, cached batch rebroadcast).
182            if !primary.is_synced() {
183                return false;
184            }
185
186            if attempt > 1 {
187                sleep(CREATE_BATCH_INTERVAL).await;
188                debug!("Retrying batch proposal for round {round} (attempt #{attempt})");
189            }
190
191            // Note: Do NOT spawn a task around this function call.  Proposing a batch is a
192            // critical path, and only one batch needs to be proposed at a time.
193            match primary.propose_batch().await {
194                Ok(true) => return true, // batch submitted; proceed to Stage 3
195                Ok(false) => {}          // not ready yet; retry
196                Err(err) => {
197                    warn!("{}", flatten_error(err.context("Cannot propose a batch")));
198                }
199            }
200
201            attempt += 1;
202        }
203    }
204
205    /// Stage 3: Wait for the proposed batch to collect enough signatures.
206    ///
207    /// Periodically rebroadcasts the batch to non-signers (via `propose_batch`) at most once per
208    /// MAX_BATCH_DELAY until the round advances. Returns when the round changes or when the node
209    /// starts syncing — in the latter case the outer loop restarts and Stage 1's sync gate takes
210    /// over.
211    async fn wait_for_signatures<P: BatchPropose>(primary: &P, ready_rx: &mut watch::Receiver<bool>, round: u64) {
212        loop {
213            if primary.current_round() != round {
214                return;
215            }
216
217            // Wait for the rebroadcast interval or an explicit round-advance signal,
218            // whichever comes first.
219            tokio::select! {
220                _ = Self::wait_until_ready(ready_rx) => return, // round advanced
221                _ = sleep(MAX_BATCH_DELAY) => {}
222            }
223
224            if primary.current_round() != round {
225                return;
226            }
227
228            // A node cannot rebroadcast its proposed batch while it is syncing — its previous
229            // certificates may be stale and peers won't sign it anyway. Bail out so the outer
230            // loop falls back through Stage 1, which awaits sync completion before proposing.
231            if !primary.is_synced() {
232                return;
233            }
234
235            // Rebroadcast to non-signers (`propose_batch` handles this internally).
236            match primary.propose_batch().await {
237                Ok(_) => {}
238                Err(err) => {
239                    warn!("{}", flatten_error(err.context("Cannot rebroadcast a batch")));
240                }
241            }
242        }
243    }
244
245    /// Waits until the readiness watch channel holds `true`. Returns immediately if it already does.
246    ///
247    /// Spurious wakeups (e.g. from a reset to `false`) are handled by re-checking the value in a loop.
248    async fn wait_until_ready(receiver: &mut watch::Receiver<bool>) {
249        loop {
250            // Fetch the `is_ready` value and return if it is true.
251            if *receiver.borrow_and_update() {
252                return;
253            }
254
255            // Block until the `is_value` changed, or the channel is closed.
256            if receiver.changed().await.is_err() {
257                return;
258            }
259        }
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266
267    use snarkvm::prelude::MainnetV0;
268    use std::{
269        sync::{
270            Arc,
271            atomic::{AtomicBool, AtomicU32, Ordering},
272        },
273        time::Duration,
274    };
275    use tokio::sync::Notify;
276
277    /// A minimal [`BatchPropose`] implementation for testing.
278    ///
279    /// Always reports round 1 and synced. Records how many times [`propose_batch`] is called and
280    /// fires a [`Notify`] on each call.
281    struct DummyProposer {
282        propose_count: Arc<AtomicU32>,
283        proposed_notify: Arc<Notify>,
284    }
285
286    #[async_trait::async_trait]
287    impl BatchPropose for DummyProposer {
288        fn current_round(&self) -> u64 {
289            1
290        }
291
292        fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
293            None
294        }
295
296        fn is_synced(&self) -> bool {
297            true
298        }
299
300        async fn propose_batch(&self) -> Result<bool> {
301            self.propose_count.fetch_add(1, Ordering::SeqCst);
302            self.proposed_notify.notify_one();
303
304            Ok(true)
305        }
306    }
307
308    /// A [`BatchPropose`] implementation that returns round 1 on the very first call to
309    /// `current_round`, then round 2 for all subsequent calls.
310    ///
311    /// This simulates the round advancing between the outer-loop capture and the inner-loop
312    /// condition check, without any real-time waiting or time mocking.
313    struct RoundAdvancingProposer {
314        current_round_calls: Arc<AtomicU32>,
315        propose_count: Arc<AtomicU32>,
316    }
317
318    #[async_trait::async_trait]
319    impl BatchPropose for RoundAdvancingProposer {
320        fn current_round(&self) -> u64 {
321            let n = self.current_round_calls.fetch_add(1, Ordering::SeqCst);
322            if n == 0 { 1 } else { 2 }
323        }
324
325        fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
326            None
327        }
328
329        fn is_synced(&self) -> bool {
330            true
331        }
332
333        async fn propose_batch(&self) -> Result<bool> {
334            self.propose_count.fetch_add(1, Ordering::SeqCst);
335            Ok(true)
336        }
337    }
338
339    /// A [`BatchPropose`] implementation that returns `Ok(false)` a fixed number of times before
340    /// succeeding.
341    struct RetryProposer {
342        retries_before_success: u32,
343        propose_count: Arc<AtomicU32>,
344        proposed_notify: Arc<Notify>,
345    }
346
347    #[async_trait::async_trait]
348    impl BatchPropose for RetryProposer {
349        fn current_round(&self) -> u64 {
350            1
351        }
352
353        fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
354            None
355        }
356
357        fn is_synced(&self) -> bool {
358            true
359        }
360
361        async fn propose_batch(&self) -> Result<bool> {
362            let count = self.propose_count.fetch_add(1, Ordering::SeqCst) + 1;
363            if count <= self.retries_before_success {
364                Ok(false)
365            } else {
366                self.proposed_notify.notify_one();
367                Ok(true)
368            }
369        }
370    }
371
372    /// Signals the proposal task and verifies that `propose_batch` is called on the dummy.
373    #[tokio::test]
374    async fn test_proposal_task_calls_propose_batch_on_signal() {
375        // Start with the task not ready so the initial signal is the trigger.
376        let (ready, _) = watch::channel(false);
377        let task = ProposalTask::<MainnetV0> { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData };
378
379        let proposed_notify = Arc::new(Notify::new());
380        let propose_count = Arc::new(AtomicU32::new(0));
381
382        let proposer = DummyProposer { propose_count: propose_count.clone(), proposed_notify: proposed_notify.clone() };
383
384        let task_for_spawn = task.clone();
385        tokio::spawn(task_for_spawn.run(proposer));
386
387        // Before signalling, propose_batch should not have been called.
388        sleep(Duration::from_millis(50)).await;
389        assert_eq!(propose_count.load(Ordering::SeqCst), 0, "propose_batch called before signal");
390
391        // Signal readiness — the proposal loop should wake up and call propose_batch.
392        task.signal();
393
394        tokio::time::timeout(std::time::Duration::from_secs(5), proposed_notify.notified())
395            .await
396            .expect("propose_batch was not called within 5 seconds after signal");
397
398        assert!(propose_count.load(Ordering::SeqCst) >= 1, "propose_batch was not called");
399    }
400
401    /// When the round advances between iterations, `propose_batch` is not called for the old round.
402    ///
403    /// `RoundAdvancingProposer` returns round 1 on the first `current_round()` call (outer-loop
404    /// capture) and round 2 on every subsequent call. The inner-loop condition therefore fails
405    /// immediately — no time mocking needed.
406    #[tokio::test]
407    async fn test_proposal_task_exits_on_round_advancement() {
408        let propose_count = Arc::new(AtomicU32::new(0));
409        let proposer = RoundAdvancingProposer {
410            current_round_calls: Arc::new(AtomicU32::new(0)),
411            propose_count: propose_count.clone(),
412        };
413
414        // Start not-ready so the task parks in round 2's inner loop without proposing round 1.
415        let (ready, _) = watch::channel(false);
416        let task = ProposalTask::<MainnetV0> { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData };
417
418        tokio::spawn(task.run(proposer));
419
420        // Yield once: the task runs through round 1 (inner loop exits immediately because
421        // current_round() already returns 2) and then parks in round 2's inner loop.
422        tokio::task::yield_now().await;
423
424        assert_eq!(propose_count.load(Ordering::SeqCst), 0, "propose_batch called despite round advancement");
425    }
426
427    /// Tests the following scenario
428    ///
429    ///   1. A batch was already certified for the current round, so readiness is `false`.
430    ///   2. `signal()` is **never** called externally — the BFT cannot advance the round until
431    ///      `propose_batch()` is called (which internally checks the leader-certificate timer).
432    #[test_log::test(tokio::test)]
433    async fn test_proposal_task_advances_without_leader_cert() {
434        // Start NOT ready: simulates a batch that was already certified for the round but the
435        // round has not yet advanced (the even-round leader cert was missing — e.g. the elected
436        // leader was one of the freshly-reset minority validators).
437        let (ready, _) = watch::channel(false);
438        let task = ProposalTask::<MainnetV0> { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData };
439
440        let proposed_notify = Arc::new(Notify::new());
441        let propose_count = Arc::new(AtomicU32::new(0));
442
443        // A proposer that stays on round 1 and returns Ok(true) on every call to
444        // propose_batch(), simulating try_advance_to_next_round finding the leader-certificate
445        // timer expired and advancing the round without an external signal().
446        struct NoSignalProposer {
447            propose_count: Arc<AtomicU32>,
448            proposed_notify: Arc<Notify>,
449        }
450
451        #[async_trait::async_trait]
452        impl BatchPropose for NoSignalProposer {
453            fn current_round(&self) -> u64 {
454                1
455            }
456
457            fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
458                None
459            }
460
461            fn is_synced(&self) -> bool {
462                true
463            }
464
465            async fn propose_batch(&self) -> Result<bool> {
466                self.propose_count.fetch_add(1, Ordering::SeqCst);
467                self.proposed_notify.notify_one();
468                Ok(true)
469            }
470        }
471
472        let proposer =
473            NoSignalProposer { propose_count: propose_count.clone(), proposed_notify: proposed_notify.clone() };
474
475        // signal() is intentionally never called — the task must retry on its own.
476        tokio::spawn(task.run(proposer));
477
478        // Allow enough time for MAX_BATCH_DELAY (2.5 s) to elapse plus the CREATE_BATCH_INTERVAL
479        // (250 ms) retry window. Use 10 s to give generous headroom on slow CI machines.
480        tokio::time::timeout(std::time::Duration::from_secs(10), proposed_notify.notified())
481            .await
482            .expect("propose_batch was not called");
483
484        assert!(propose_count.load(Ordering::SeqCst) >= 1, "propose_batch should have been called at least once");
485    }
486
487    /// After the leader-certificate timer fires (MAX_BATCH_DELAY elapses without an explicit
488    /// `signal()`), the task should still retry `propose_batch` when it returns `Ok(false)` and
489    /// eventually succeed once it returns `Ok(true)`.
490    ///
491    /// This models the real primary: when a round is already certified but the round has not yet
492    /// advanced (e.g. the elected leader was a freshly-reset minority validator), `propose_batch`
493    /// returns `Ok(false)` until `try_advance_to_next_round` can make progress.
494    #[test_log::test(tokio::test)]
495    async fn test_proposal_task_retries_after_leader_timeout() {
496        const RETRIES: u32 = 2;
497
498        // Start NOT ready — no external signal will be sent. The task must wait for
499        // MAX_BATCH_DELAY to fire, then retry until propose_batch succeeds.
500        let (ready, _) = watch::channel(false);
501        let task = ProposalTask::<MainnetV0> { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData };
502
503        let proposed_notify = Arc::new(Notify::new());
504        let propose_count = Arc::new(AtomicU32::new(0));
505        let proposer = RetryProposer {
506            retries_before_success: RETRIES,
507            propose_count: propose_count.clone(),
508            proposed_notify: proposed_notify.clone(),
509        };
510
511        // signal() is intentionally never called — the leader timeout arm must trigger.
512        tokio::spawn(task.run(proposer));
513
514        // Allow enough time for MAX_BATCH_DELAY (2.5 s) plus RETRIES × CREATE_BATCH_INTERVAL (250 ms each).
515        // Use 10 s to give generous headroom on slow CI machines.
516        tokio::time::timeout(std::time::Duration::from_secs(10), proposed_notify.notified())
517            .await
518            .expect("propose_batch did not succeed within 10 seconds after leader timeout");
519
520        // Stage 3 may make additional rebroadcast calls after success, so use >.
521        assert!(propose_count.load(Ordering::SeqCst) > RETRIES, "expected at least {} total attempts", RETRIES + 1);
522    }
523
524    /// When `propose_batch` returns `Ok(false)`, the task retries within the same round until
525    /// it succeeds.
526    #[tokio::test]
527    async fn test_proposal_task_retries_on_false() {
528        const RETRIES: u32 = 2;
529
530        // Default starts ready, so no signal needed.
531        let task = ProposalTask::<MainnetV0>::default();
532
533        let proposed_notify = Arc::new(Notify::new());
534        let propose_count = Arc::new(AtomicU32::new(0));
535        let proposer = RetryProposer {
536            retries_before_success: RETRIES,
537            propose_count: propose_count.clone(),
538            proposed_notify: proposed_notify.clone(),
539        };
540
541        tokio::spawn(task.run(proposer));
542
543        // The task internally waits MIN_BATCH_DELAY before the first attempt; allow up to 10s.
544        tokio::time::timeout(std::time::Duration::from_secs(10), proposed_notify.notified())
545            .await
546            .expect("propose_batch did not succeed within 10 seconds");
547
548        // Stage 3 may make additional rebroadcast calls after success, so use >.
549        assert!(propose_count.load(Ordering::SeqCst) > RETRIES, "expected at least {} total attempts", RETRIES + 1);
550    }
551
552    /// While the node is syncing, Stage 3 must not rebroadcast the proposed batch — its previous
553    /// certificates may be stale and peers will not sign it. Once sync completes, rebroadcast
554    /// should resume.
555    #[test_log::test(tokio::test)]
556    async fn test_proposal_task_pauses_rebroadcast_while_syncing() {
557        /// Synced for Stage 1/2. After the first successful `propose_batch`, flips to "syncing"
558        /// so Stage 3's rebroadcast loop must pause. The held `sync_release` `Notify` lets the
559        /// test resume sync on demand to assert that rebroadcast comes back.
560        struct SyncTogglingProposer {
561            propose_count: Arc<AtomicU32>,
562            proposed_notify: Arc<Notify>,
563            is_syncing: Arc<AtomicBool>,
564            sync_release: Arc<Notify>,
565        }
566
567        #[async_trait::async_trait]
568        impl BatchPropose for SyncTogglingProposer {
569            fn current_round(&self) -> u64 {
570                1
571            }
572
573            fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
574                if self.is_syncing.load(Ordering::SeqCst) {
575                    let release = self.sync_release.clone();
576                    Some(Box::pin(async move { release.notified().await }))
577                } else {
578                    None
579                }
580            }
581
582            fn is_synced(&self) -> bool {
583                !self.is_syncing.load(Ordering::SeqCst)
584            }
585
586            async fn propose_batch(&self) -> Result<bool> {
587                self.propose_count.fetch_add(1, Ordering::SeqCst);
588                self.proposed_notify.notify_one();
589                // Transition to syncing once the Stage 2 proposal has gone out.
590                self.is_syncing.store(true, Ordering::SeqCst);
591                Ok(true)
592            }
593        }
594
595        // Default starts ready — Stage 1 completes after MIN_BATCH_DELAY without a signal.
596        let task = ProposalTask::<MainnetV0>::default();
597
598        let proposed_notify = Arc::new(Notify::new());
599        let propose_count = Arc::new(AtomicU32::new(0));
600        let is_syncing = Arc::new(AtomicBool::new(false));
601        let sync_release = Arc::new(Notify::new());
602
603        let proposer = SyncTogglingProposer {
604            propose_count: propose_count.clone(),
605            proposed_notify: proposed_notify.clone(),
606            is_syncing: is_syncing.clone(),
607            sync_release: sync_release.clone(),
608        };
609
610        tokio::spawn(task.run(proposer));
611
612        // Wait for Stage 2 to make its single propose call.
613        tokio::time::timeout(Duration::from_secs(10), proposed_notify.notified())
614            .await
615            .expect("Stage 2 did not call propose_batch within 10 seconds");
616        assert_eq!(propose_count.load(Ordering::SeqCst), 1, "expected exactly one Stage 2 call");
617
618        // Stage 3 sleeps MAX_BATCH_DELAY before each rebroadcast attempt; wait past that to give
619        // the sync gate a chance to fire. Without the gate, propose_count would increment here.
620        tokio::time::sleep(MAX_BATCH_DELAY + Duration::from_secs(1)).await;
621        assert_eq!(propose_count.load(Ordering::SeqCst), 1, "Stage 3 rebroadcast fired while the node was syncing",);
622
623        // Release sync. Stage 3 should resume rebroadcasting after the next MAX_BATCH_DELAY tick.
624        is_syncing.store(false, Ordering::SeqCst);
625        sync_release.notify_waiters();
626
627        tokio::time::timeout(MAX_BATCH_DELAY + Duration::from_secs(5), proposed_notify.notified())
628            .await
629            .expect("Stage 3 did not resume rebroadcast after sync completed");
630        assert!(propose_count.load(Ordering::SeqCst) >= 2, "expected rebroadcast after sync completed");
631    }
632
633    /// Stage 2 retries `propose_batch` every CREATE_BATCH_INTERVAL (250ms) while it returns
634    /// `Ok(false)`. If sync starts mid-retry, the loop must bail out so the outer loop can fall
635    /// back through Stage 1's sync gate — otherwise the node spins, calling `propose_batch` four
636    /// times per second (which on a real primary triggers the cached-batch rebroadcast).
637    #[test_log::test(tokio::test)]
638    async fn test_proposal_task_bails_stage_2_when_syncing_starts() {
639        /// Always returns `Ok(false)` from `propose_batch`, so Stage 2 enters its retry loop.
640        /// The test flips `syncing` to true after a few calls; subsequent calls should stop.
641        struct AlwaysFalseProposer {
642            propose_count: Arc<AtomicU32>,
643            syncing: Arc<AtomicBool>,
644            sync_release: Arc<Notify>,
645        }
646
647        #[async_trait::async_trait]
648        impl BatchPropose for AlwaysFalseProposer {
649            fn current_round(&self) -> u64 {
650                1
651            }
652
653            fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
654                if self.syncing.load(Ordering::SeqCst) {
655                    let release = self.sync_release.clone();
656                    Some(Box::pin(async move { release.notified().await }))
657                } else {
658                    None
659                }
660            }
661
662            fn is_synced(&self) -> bool {
663                !self.syncing.load(Ordering::SeqCst)
664            }
665
666            async fn propose_batch(&self) -> Result<bool> {
667                self.propose_count.fetch_add(1, Ordering::SeqCst);
668                // Never succeed — Stage 2 will keep retrying every CREATE_BATCH_INTERVAL.
669                Ok(false)
670            }
671        }
672
673        let task = ProposalTask::<MainnetV0>::default();
674        let propose_count = Arc::new(AtomicU32::new(0));
675        let syncing = Arc::new(AtomicBool::new(false));
676        let sync_release = Arc::new(Notify::new());
677
678        let proposer = AlwaysFalseProposer {
679            propose_count: propose_count.clone(),
680            syncing: syncing.clone(),
681            sync_release: sync_release.clone(),
682        };
683
684        tokio::spawn(task.run(proposer));
685
686        // Let Stage 2 spin for a few CREATE_BATCH_INTERVAL ticks (250ms each). After MIN_BATCH_DELAY
687        // (1s) for Stage 1 to release plus a couple of retry cycles, we should see >= 2 calls.
688        tokio::time::sleep(Duration::from_millis(2000)).await;
689        let pre_sync_calls = propose_count.load(Ordering::SeqCst);
690        assert!(pre_sync_calls >= 2, "Stage 2 should have retried at least twice while synced (got {pre_sync_calls})");
691
692        // Start syncing. Stage 2 must bail out, and Stage 1 must then block on the sync gate.
693        syncing.store(true, Ordering::SeqCst);
694
695        // Allow Stage 2 to notice and bail, and Stage 1 to install its sync wait.
696        tokio::time::sleep(Duration::from_millis(500)).await;
697        let after_bail_calls = propose_count.load(Ordering::SeqCst);
698
699        // From this point, no further propose_batch calls should happen until sync releases.
700        tokio::time::sleep(Duration::from_secs(2)).await;
701        assert_eq!(
702            propose_count.load(Ordering::SeqCst),
703            after_bail_calls,
704            "propose_batch called while syncing — Stage 2 did not bail (or Stage 1 missed the gate)",
705        );
706
707        // Release sync; the outer loop should drive Stage 2 again.
708        syncing.store(false, Ordering::SeqCst);
709        sync_release.notify_waiters();
710        tokio::time::sleep(Duration::from_secs(2)).await;
711        assert!(propose_count.load(Ordering::SeqCst) > after_bail_calls, "Stage 2 did not resume after sync completed",);
712    }
713}