snarkos_node_bft/primary/proposal_task.rs
1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{CREATE_BATCH_INTERVAL, MAX_BATCH_DELAY, MIN_BATCH_DELAY};
17
18use anyhow::Result;
19use colored::Colorize;
20use futures::future::BoxFuture;
21use snarkvm::{prelude::Network, utilities::flatten_error};
22use std::{marker::PhantomData, sync::Arc};
23use tokio::{
24 sync::watch,
25 time::{Instant, sleep, sleep_until},
26};
27use tracing::{debug, warn};
28
29/// Abstracts over batch-proposal operations, allowing the proposal loop to be tested without a
30/// real primary.
31#[async_trait::async_trait]
32pub(super) trait BatchPropose: Send + Sync {
33 /// Returns the current consensus round.
34 fn current_round(&self) -> u64;
35
36 /// Returns `None` if the node is already synced; otherwise returns a future that resolves
37 /// once sync completes.
38 fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>>;
39
40 /// Returns `true` if the node is currently synced with the network.
41 fn is_synced(&self) -> bool;
42
43 /// Attempts to propose a batch.
44 ///
45 /// Returns `Ok(true)` when a batch was successfully proposed, `Ok(false)` to retry, and
46 /// `Err` on an unexpected error.
47 async fn propose_batch(&self) -> Result<bool>;
48}
49
50/// Manages batch proposal readiness and drives the batch proposal loop.
51///
52/// Holds the readiness state and the logic for the proposal task. The actual task is started by
53/// calling [`Self::run`] inside a spawned future (see [`Primary::start_handlers`]).
54pub struct ProposalTask<N: Network> {
55 inner: Arc<ProposalTaskInner>,
56 _phantom: PhantomData<N>,
57}
58
59/// Manual `Clone` impl so that `N: Clone` is not required.
60impl<N: Network> Clone for ProposalTask<N> {
61 fn clone(&self) -> Self {
62 Self { inner: Arc::clone(&self.inner), _phantom: PhantomData }
63 }
64}
65
66/// The inner state of a [`ProposalTask`], shared via `Arc`.
67struct ProposalTaskInner {
68 /// Tracks whether the primary is ready to propose a new batch.
69 ///
70 /// Initialized to `true` so round 1 can be proposed immediately without an explicit signal.
71 /// Set to `true` by [`ProposalTask::signal`] when a new round starts,
72 /// and reset to `false` after a batch is successfully proposed.
73 ready: watch::Sender<bool>,
74}
75
76impl<N: Network> Default for ProposalTask<N> {
77 fn default() -> Self {
78 let (ready, _) = watch::channel(true);
79 Self { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData }
80 }
81}
82
83impl<N: Network> ProposalTask<N> {
84 /// Signals that the primary is ready to propose a new batch for the current round.
85 ///
86 /// Should be called from [`Primary::try_increment_to_the_next_round`] whenever the primary
87 /// successfully advances to a new round.
88 pub fn signal(&self) {
89 self.inner.ready.send_replace(true);
90 }
91
92 /// Runs the batch proposal loop. This is intended to be spawned as a long-running task.
93 ///
94 /// Each iteration covers one full round (wait → propose → wait for signatures).
95 /// The three stages are implemented as separate methods; see their doc-comments for details.
96 pub(super) async fn run<P: BatchPropose + 'static>(self, primary: P) {
97 let mut ready_rx = self.inner.ready.subscribe();
98
99 loop {
100 let round = primary.current_round();
101 // TODO(kaimast): the round_start time should be based on the timestamp of the
102 // previous batch, not the current wall-clock time.
103 let round_start = Instant::now();
104
105 if !Self::wait_until_proposal_ready(&primary, &mut ready_rx, round, round_start).await {
106 continue; // round changed; restart
107 }
108
109 if !Self::propose(&primary, round).await {
110 continue; // round changed; restart
111 }
112
113 // Reset readiness so the next round waits for an explicit signal.
114 self.inner.ready.send_replace(false);
115
116 Self::wait_for_signatures(&primary, &mut ready_rx, round).await;
117 }
118 }
119
120 /// Stage 1: Wait until conditions are met to propose a batch.
121 ///
122 /// Blocks until sync is complete, MIN_BATCH_DELAY has elapsed since `round_start`, and either
123 /// `signal()` fires (leader cert arrived) or MAX_BATCH_DELAY expires without one.
124 ///
125 /// Returns `true` if ready to propose, `false` if the round changed (caller should restart).
126 async fn wait_until_proposal_ready<P: BatchPropose>(
127 primary: &P,
128 ready_rx: &mut watch::Receiver<bool>,
129 round: u64,
130 round_start: Instant,
131 ) -> bool {
132 loop {
133 if primary.current_round() != round {
134 return false;
135 }
136
137 // A node cannot propose while it is syncing.
138 if let Some(fut) = primary.wait_for_synced_if_syncing() {
139 fut.await;
140 // Re-check round after sync completes.
141 continue;
142 }
143
144 // Enforce the minimum inter-proposal delay.
145 // This is a no-op once the deadline has already passed.
146 sleep_until(round_start + MIN_BATCH_DELAY).await;
147
148 // Wait for a readiness signal, the MAX_BATCH_DELAY deadline, or a short heartbeat
149 // that lets the round-change check at the top of the loop fire regularly.
150 tokio::select! {
151 _ = sleep_until(round_start + MAX_BATCH_DELAY) => {
152 debug!("Did not receive leader certificate within MAX_BATCH_DELAY");
153 return true;
154 },
155 _ = Self::wait_until_ready(ready_rx) => {
156 return true;
157 },
158 _ = sleep(CREATE_BATCH_INTERVAL) => {
159 debug!("Skipping batch proposal for round {round} {}", "(not ready yet)".dimmed());
160 }
161 };
162 }
163 }
164
165 /// Stage 2: Propose a batch.
166 ///
167 /// Calls `propose_batch()` with CREATE_BATCH_INTERVAL retries until it returns `Ok(true)`
168 /// (batch submitted to the network).
169 ///
170 /// Returns `true` if the batch was submitted, `false` if the round changed or the node started
171 /// syncing (caller should restart; Stage 1 will then await sync completion).
172 async fn propose<P: BatchPropose>(primary: &P, round: u64) -> bool {
173 let mut attempt = 1u32;
174 loop {
175 if primary.current_round() != round {
176 return false;
177 }
178
179 // Bail out if sync started mid-Stage-2; otherwise propose_batch may spin at the
180 // CREATE_BATCH_INTERVAL cadence on Ok(false) paths (e.g. previous round has not
181 // reached quorum, not enough connected validators, cached batch rebroadcast).
182 if !primary.is_synced() {
183 return false;
184 }
185
186 if attempt > 1 {
187 sleep(CREATE_BATCH_INTERVAL).await;
188 debug!("Retrying batch proposal for round {round} (attempt #{attempt})");
189 }
190
191 // Note: Do NOT spawn a task around this function call. Proposing a batch is a
192 // critical path, and only one batch needs to be proposed at a time.
193 match primary.propose_batch().await {
194 Ok(true) => return true, // batch submitted; proceed to Stage 3
195 Ok(false) => {} // not ready yet; retry
196 Err(err) => {
197 warn!("{}", flatten_error(err.context("Cannot propose a batch")));
198 }
199 }
200
201 attempt += 1;
202 }
203 }
204
205 /// Stage 3: Wait for the proposed batch to collect enough signatures.
206 ///
207 /// Periodically rebroadcasts the batch to non-signers (via `propose_batch`) at most once per
208 /// MAX_BATCH_DELAY until the round advances. Returns when the round changes or when the node
209 /// starts syncing — in the latter case the outer loop restarts and Stage 1's sync gate takes
210 /// over.
211 async fn wait_for_signatures<P: BatchPropose>(primary: &P, ready_rx: &mut watch::Receiver<bool>, round: u64) {
212 loop {
213 if primary.current_round() != round {
214 return;
215 }
216
217 // Wait for the rebroadcast interval or an explicit round-advance signal,
218 // whichever comes first.
219 tokio::select! {
220 _ = Self::wait_until_ready(ready_rx) => return, // round advanced
221 _ = sleep(MAX_BATCH_DELAY) => {}
222 }
223
224 if primary.current_round() != round {
225 return;
226 }
227
228 // A node cannot rebroadcast its proposed batch while it is syncing — its previous
229 // certificates may be stale and peers won't sign it anyway. Bail out so the outer
230 // loop falls back through Stage 1, which awaits sync completion before proposing.
231 if !primary.is_synced() {
232 return;
233 }
234
235 // Rebroadcast to non-signers (`propose_batch` handles this internally).
236 match primary.propose_batch().await {
237 Ok(_) => {}
238 Err(err) => {
239 warn!("{}", flatten_error(err.context("Cannot rebroadcast a batch")));
240 }
241 }
242 }
243 }
244
245 /// Waits until the readiness watch channel holds `true`. Returns immediately if it already does.
246 ///
247 /// Spurious wakeups (e.g. from a reset to `false`) are handled by re-checking the value in a loop.
248 async fn wait_until_ready(receiver: &mut watch::Receiver<bool>) {
249 loop {
250 // Fetch the `is_ready` value and return if it is true.
251 if *receiver.borrow_and_update() {
252 return;
253 }
254
255 // Block until the `is_value` changed, or the channel is closed.
256 if receiver.changed().await.is_err() {
257 return;
258 }
259 }
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;

    use snarkvm::prelude::MainnetV0;
    use std::{
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicU32, Ordering},
        },
        time::Duration,
    };
    use tokio::sync::Notify;

    /// Builds a [`ProposalTask`] whose readiness flag starts at `ready`.
    ///
    /// `ProposalTask::default()` always starts ready. Tests that need the task to wait for an
    /// explicit `signal()` (or for MAX_BATCH_DELAY) start it not-ready through this helper.
    fn task_with_readiness(ready: bool) -> ProposalTask<MainnetV0> {
        let (ready, _) = watch::channel(ready);
        ProposalTask { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData }
    }

    /// A minimal [`BatchPropose`] implementation for testing.
    ///
    /// Always reports round 1 and synced, and always returns `Ok(true)`. Records how many times
    /// [`BatchPropose::propose_batch`] is called and fires a [`Notify`] on each call.
    struct DummyProposer {
        propose_count: Arc<AtomicU32>,
        proposed_notify: Arc<Notify>,
    }

    #[async_trait::async_trait]
    impl BatchPropose for DummyProposer {
        fn current_round(&self) -> u64 {
            1
        }

        fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
            None
        }

        fn is_synced(&self) -> bool {
            true
        }

        async fn propose_batch(&self) -> Result<bool> {
            self.propose_count.fetch_add(1, Ordering::SeqCst);
            self.proposed_notify.notify_one();

            Ok(true)
        }
    }

    /// A [`BatchPropose`] implementation that returns round 1 on the very first call to
    /// `current_round`, then round 2 for all subsequent calls.
    ///
    /// This simulates the round advancing between the outer-loop capture and the inner-loop
    /// condition check, without any real-time waiting or time mocking.
    struct RoundAdvancingProposer {
        current_round_calls: Arc<AtomicU32>,
        propose_count: Arc<AtomicU32>,
    }

    #[async_trait::async_trait]
    impl BatchPropose for RoundAdvancingProposer {
        fn current_round(&self) -> u64 {
            let n = self.current_round_calls.fetch_add(1, Ordering::SeqCst);
            if n == 0 { 1 } else { 2 }
        }

        fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
            None
        }

        fn is_synced(&self) -> bool {
            true
        }

        async fn propose_batch(&self) -> Result<bool> {
            self.propose_count.fetch_add(1, Ordering::SeqCst);
            Ok(true)
        }
    }

    /// A [`BatchPropose`] implementation that returns `Ok(false)` a fixed number of times before
    /// succeeding.
    struct RetryProposer {
        retries_before_success: u32,
        propose_count: Arc<AtomicU32>,
        proposed_notify: Arc<Notify>,
    }

    #[async_trait::async_trait]
    impl BatchPropose for RetryProposer {
        fn current_round(&self) -> u64 {
            1
        }

        fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
            None
        }

        fn is_synced(&self) -> bool {
            true
        }

        async fn propose_batch(&self) -> Result<bool> {
            let count = self.propose_count.fetch_add(1, Ordering::SeqCst) + 1;
            if count <= self.retries_before_success {
                Ok(false)
            } else {
                self.proposed_notify.notify_one();
                Ok(true)
            }
        }
    }

    /// Signals the proposal task and verifies that `propose_batch` is called on the dummy.
    #[tokio::test]
    async fn test_proposal_task_calls_propose_batch_on_signal() {
        // Start with the task not ready so the initial signal is the trigger.
        let task = task_with_readiness(false);

        let proposed_notify = Arc::new(Notify::new());
        let propose_count = Arc::new(AtomicU32::new(0));

        let proposer = DummyProposer { propose_count: propose_count.clone(), proposed_notify: proposed_notify.clone() };

        tokio::spawn(task.clone().run(proposer));

        // Before signalling, propose_batch should not have been called.
        sleep(Duration::from_millis(50)).await;
        assert_eq!(propose_count.load(Ordering::SeqCst), 0, "propose_batch called before signal");

        // Signal readiness — the proposal loop should wake up and call propose_batch.
        task.signal();

        tokio::time::timeout(Duration::from_secs(5), proposed_notify.notified())
            .await
            .expect("propose_batch was not called within 5 seconds after signal");

        assert!(propose_count.load(Ordering::SeqCst) >= 1, "propose_batch was not called");
    }

    /// When the round advances between iterations, `propose_batch` is not called for the old round.
    ///
    /// `RoundAdvancingProposer` returns round 1 on the first `current_round()` call (outer-loop
    /// capture) and round 2 on every subsequent call. The inner-loop condition therefore fails
    /// immediately — no time mocking needed.
    #[tokio::test]
    async fn test_proposal_task_exits_on_round_advancement() {
        let current_round_calls = Arc::new(AtomicU32::new(0));
        let propose_count = Arc::new(AtomicU32::new(0));
        let proposer = RoundAdvancingProposer {
            current_round_calls: current_round_calls.clone(),
            propose_count: propose_count.clone(),
        };

        // Start not-ready so the task parks in round 2's inner loop without proposing round 1.
        let task = task_with_readiness(false);

        tokio::spawn(task.run(proposer));

        // Yield once: on the (default current-thread) test runtime the spawned task runs through
        // round 1 (Stage 1 exits immediately because current_round() already returns 2) and then
        // parks in round 2's Stage 1, sleeping until MIN_BATCH_DELAY has elapsed.
        tokio::task::yield_now().await;

        // Stage 1 always sleeps for MIN_BATCH_DELAY before it can return, so `propose_count == 0`
        // on its own would hold even without a round check. Also require that the loop re-read
        // the round after bailing: the round-1 capture, the failed Stage 1 check, and the round-2
        // capture.
        assert!(
            current_round_calls.load(Ordering::SeqCst) >= 3,
            "proposal loop did not restart after the round advanced"
        );
        assert_eq!(propose_count.load(Ordering::SeqCst), 0, "propose_batch called despite round advancement");
    }

    /// Tests the following scenario
    ///
    /// 1. A batch was already certified for the current round, so readiness is `false`.
    /// 2. `signal()` is **never** called externally — the BFT cannot advance the round until
    ///    `propose_batch()` is called (which internally checks the leader-certificate timer).
    #[test_log::test(tokio::test)]
    async fn test_proposal_task_advances_without_leader_cert() {
        // Start NOT ready: simulates a batch that was already certified for the round but the
        // round has not yet advanced (the even-round leader cert was missing — e.g. the elected
        // leader was one of the freshly-reset minority validators).
        let task = task_with_readiness(false);

        let proposed_notify = Arc::new(Notify::new());
        let propose_count = Arc::new(AtomicU32::new(0));

        // `DummyProposer` stays on round 1 and returns Ok(true) on every call to
        // propose_batch(), simulating try_advance_to_next_round finding the leader-certificate
        // timer expired and advancing the round without an external signal().
        let proposer = DummyProposer { propose_count: propose_count.clone(), proposed_notify: proposed_notify.clone() };

        // signal() is intentionally never called — the task must retry on its own.
        tokio::spawn(task.run(proposer));

        // Allow enough time for MAX_BATCH_DELAY (2.5 s) to elapse plus the CREATE_BATCH_INTERVAL
        // (250 ms) retry window. Use 10 s to give generous headroom on slow CI machines.
        tokio::time::timeout(Duration::from_secs(10), proposed_notify.notified())
            .await
            .expect("propose_batch was not called");

        assert!(propose_count.load(Ordering::SeqCst) >= 1, "propose_batch should have been called at least once");
    }

    /// After the leader-certificate timer fires (MAX_BATCH_DELAY elapses without an explicit
    /// `signal()`), the task should still retry `propose_batch` when it returns `Ok(false)` and
    /// eventually succeed once it returns `Ok(true)`.
    ///
    /// This models the real primary: when a round is already certified but the round has not yet
    /// advanced (e.g. the elected leader was a freshly-reset minority validator), `propose_batch`
    /// returns `Ok(false)` until `try_advance_to_next_round` can make progress.
    #[test_log::test(tokio::test)]
    async fn test_proposal_task_retries_after_leader_timeout() {
        const RETRIES: u32 = 2;

        // Start NOT ready — no external signal will be sent. The task must wait for
        // MAX_BATCH_DELAY to fire, then retry until propose_batch succeeds.
        let task = task_with_readiness(false);

        let proposed_notify = Arc::new(Notify::new());
        let propose_count = Arc::new(AtomicU32::new(0));
        let proposer = RetryProposer {
            retries_before_success: RETRIES,
            propose_count: propose_count.clone(),
            proposed_notify: proposed_notify.clone(),
        };

        // signal() is intentionally never called — the leader timeout arm must trigger.
        tokio::spawn(task.run(proposer));

        // Allow enough time for MAX_BATCH_DELAY (2.5 s) plus RETRIES × CREATE_BATCH_INTERVAL (250 ms each).
        // Use 10 s to give generous headroom on slow CI machines.
        tokio::time::timeout(Duration::from_secs(10), proposed_notify.notified())
            .await
            .expect("propose_batch did not succeed within 10 seconds after leader timeout");

        // Success takes RETRIES failed attempts plus one successful one. Stage 3 may add
        // rebroadcast calls on top, so only a lower bound can be asserted.
        assert!(propose_count.load(Ordering::SeqCst) > RETRIES, "expected at least {} total attempts", RETRIES + 1);
    }

    /// When `propose_batch` returns `Ok(false)`, the task retries within the same round until
    /// it succeeds.
    #[tokio::test]
    async fn test_proposal_task_retries_on_false() {
        const RETRIES: u32 = 2;

        // Default starts ready, so no signal needed.
        let task = ProposalTask::<MainnetV0>::default();

        let proposed_notify = Arc::new(Notify::new());
        let propose_count = Arc::new(AtomicU32::new(0));
        let proposer = RetryProposer {
            retries_before_success: RETRIES,
            propose_count: propose_count.clone(),
            proposed_notify: proposed_notify.clone(),
        };

        tokio::spawn(task.run(proposer));

        // The task internally waits MIN_BATCH_DELAY before the first attempt; allow up to 10s.
        tokio::time::timeout(Duration::from_secs(10), proposed_notify.notified())
            .await
            .expect("propose_batch did not succeed within 10 seconds");

        // Success takes RETRIES failed attempts plus one successful one. Stage 3 may add
        // rebroadcast calls on top, so only a lower bound can be asserted.
        assert!(propose_count.load(Ordering::SeqCst) > RETRIES, "expected at least {} total attempts", RETRIES + 1);
    }

    /// While the node is syncing, Stage 3 must not rebroadcast the proposed batch — its previous
    /// certificates may be stale and peers will not sign it. Once sync completes, rebroadcast
    /// should resume.
    #[test_log::test(tokio::test)]
    async fn test_proposal_task_pauses_rebroadcast_while_syncing() {
        /// Synced for Stage 1/2. After the first successful `propose_batch`, flips to "syncing"
        /// so Stage 3's rebroadcast loop must pause. The held `sync_release` `Notify` lets the
        /// test resume sync on demand to assert that rebroadcast comes back.
        struct SyncTogglingProposer {
            propose_count: Arc<AtomicU32>,
            proposed_notify: Arc<Notify>,
            is_syncing: Arc<AtomicBool>,
            sync_release: Arc<Notify>,
        }

        #[async_trait::async_trait]
        impl BatchPropose for SyncTogglingProposer {
            fn current_round(&self) -> u64 {
                1
            }

            fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
                if self.is_syncing.load(Ordering::SeqCst) {
                    let release = self.sync_release.clone();
                    Some(Box::pin(async move { release.notified().await }))
                } else {
                    None
                }
            }

            fn is_synced(&self) -> bool {
                !self.is_syncing.load(Ordering::SeqCst)
            }

            async fn propose_batch(&self) -> Result<bool> {
                self.propose_count.fetch_add(1, Ordering::SeqCst);
                self.proposed_notify.notify_one();
                // Transition to syncing once the Stage 2 proposal has gone out.
                self.is_syncing.store(true, Ordering::SeqCst);
                Ok(true)
            }
        }

        // Default starts ready — Stage 1 completes after MIN_BATCH_DELAY without a signal.
        let task = ProposalTask::<MainnetV0>::default();

        let proposed_notify = Arc::new(Notify::new());
        let propose_count = Arc::new(AtomicU32::new(0));
        let is_syncing = Arc::new(AtomicBool::new(false));
        let sync_release = Arc::new(Notify::new());

        let proposer = SyncTogglingProposer {
            propose_count: propose_count.clone(),
            proposed_notify: proposed_notify.clone(),
            is_syncing: is_syncing.clone(),
            sync_release: sync_release.clone(),
        };

        tokio::spawn(task.run(proposer));

        // Wait for Stage 2 to make its single propose call.
        tokio::time::timeout(Duration::from_secs(10), proposed_notify.notified())
            .await
            .expect("Stage 2 did not call propose_batch within 10 seconds");
        assert_eq!(propose_count.load(Ordering::SeqCst), 1, "expected exactly one Stage 2 call");

        // Stage 3 sleeps MAX_BATCH_DELAY before each rebroadcast attempt; wait past that to give
        // the sync gate a chance to fire. Without the gate, propose_count would increment here.
        tokio::time::sleep(MAX_BATCH_DELAY + Duration::from_secs(1)).await;
        assert_eq!(propose_count.load(Ordering::SeqCst), 1, "Stage 3 rebroadcast fired while the node was syncing");

        // Release sync. Stage 3 should resume rebroadcasting after the next MAX_BATCH_DELAY tick.
        is_syncing.store(false, Ordering::SeqCst);
        sync_release.notify_waiters();

        tokio::time::timeout(MAX_BATCH_DELAY + Duration::from_secs(5), proposed_notify.notified())
            .await
            .expect("Stage 3 did not resume rebroadcast after sync completed");
        assert!(propose_count.load(Ordering::SeqCst) >= 2, "expected rebroadcast after sync completed");
    }

    /// Stage 2 retries `propose_batch` every CREATE_BATCH_INTERVAL (250ms) while it returns
    /// `Ok(false)`. If sync starts mid-retry, the loop must bail out so the outer loop can fall
    /// back through Stage 1's sync gate. Otherwise the node spins, calling `propose_batch` four
    /// times per second (which on a real primary triggers the cached-batch rebroadcast).
    #[test_log::test(tokio::test)]
    async fn test_proposal_task_bails_stage_2_when_syncing_starts() {
        /// Always returns `Ok(false)` from `propose_batch`, so Stage 2 enters its retry loop.
        /// The test flips `syncing` to true after a few calls; subsequent calls should stop.
        struct AlwaysFalseProposer {
            propose_count: Arc<AtomicU32>,
            syncing: Arc<AtomicBool>,
            sync_release: Arc<Notify>,
        }

        #[async_trait::async_trait]
        impl BatchPropose for AlwaysFalseProposer {
            fn current_round(&self) -> u64 {
                1
            }

            fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
                if self.syncing.load(Ordering::SeqCst) {
                    let release = self.sync_release.clone();
                    Some(Box::pin(async move { release.notified().await }))
                } else {
                    None
                }
            }

            fn is_synced(&self) -> bool {
                !self.syncing.load(Ordering::SeqCst)
            }

            async fn propose_batch(&self) -> Result<bool> {
                self.propose_count.fetch_add(1, Ordering::SeqCst);
                // Never succeed — Stage 2 will keep retrying every CREATE_BATCH_INTERVAL.
                Ok(false)
            }
        }

        let task = ProposalTask::<MainnetV0>::default();
        let propose_count = Arc::new(AtomicU32::new(0));
        let syncing = Arc::new(AtomicBool::new(false));
        let sync_release = Arc::new(Notify::new());

        let proposer = AlwaysFalseProposer {
            propose_count: propose_count.clone(),
            syncing: syncing.clone(),
            sync_release: sync_release.clone(),
        };

        tokio::spawn(task.run(proposer));

        // Let Stage 2 spin for a few CREATE_BATCH_INTERVAL ticks (250ms each). After MIN_BATCH_DELAY
        // (1s) for Stage 1 to release plus a couple of retry cycles, we should see >= 2 calls.
        tokio::time::sleep(Duration::from_millis(2000)).await;
        let pre_sync_calls = propose_count.load(Ordering::SeqCst);
        assert!(pre_sync_calls >= 2, "Stage 2 should have retried at least twice while synced (got {pre_sync_calls})");

        // Start syncing. Stage 2 must bail out, and Stage 1 must then block on the sync gate.
        syncing.store(true, Ordering::SeqCst);

        // Allow Stage 2 to notice and bail, and Stage 1 to install its sync wait.
        tokio::time::sleep(Duration::from_millis(500)).await;
        let after_bail_calls = propose_count.load(Ordering::SeqCst);

        // From this point, no further propose_batch calls should happen until sync releases.
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert_eq!(
            propose_count.load(Ordering::SeqCst),
            after_bail_calls,
            "propose_batch called while syncing — Stage 2 did not bail (or Stage 1 missed the gate)"
        );

        // Release sync; the outer loop should drive Stage 2 again.
        syncing.store(false, Ordering::SeqCst);
        sync_release.notify_waiters();
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(propose_count.load(Ordering::SeqCst) > after_bail_calls, "Stage 2 did not resume after sync completed");
    }
}