// nodedb_cluster/raft_loop/loop_core.rs
1//! `RaftLoop` struct, constructors, top-level run loop, and thin wrappers
2//! over `MultiRaft` proposal APIs. The tick body lives in
3//! [`super::tick`]; the inbound-RPC handler lives in
4//! [`super::handle_rpc`]; the async join orchestration lives in
5//! [`super::join`].
6
7use std::pin::Pin;
8use std::sync::{Arc, Mutex, RwLock};
9use std::time::{Duration, Instant};
10
11use tracing::debug;
12
13use nodedb_raft::message::LogEntry;
14
15use crate::catalog::ClusterCatalog;
16use crate::conf_change::ConfChange;
17use crate::error::Result;
18use crate::forward::{NoopPlanExecutor, PlanExecutor};
19use crate::loop_metrics::LoopMetrics;
20use crate::metadata_group::applier::{MetadataApplier, NoopMetadataApplier};
21use crate::multi_raft::MultiRaft;
22use crate::topology::ClusterTopology;
23use crate::transport::NexarTransport;
24
/// Default tick interval (10ms — fast enough for sub-second elections).
///
/// Matches `ClusterTransportTuning::raft_tick_interval_ms` default.
/// Overridable per-loop via [`RaftLoop::with_tick_interval`].
pub(super) const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(10);
29
/// Callback for applying committed Raft log entries to the state machine.
///
/// Called synchronously during the tick loop. Implementations should be fast
/// (enqueue to SPSC, not perform I/O directly).
pub trait CommitApplier: Send + Sync + 'static {
    /// Apply committed entries for a Raft group.
    ///
    /// Returns the index of the last successfully applied entry.
    /// (The test appliers in this file return 0 for an empty batch —
    /// NOTE(review): presumably production impls do the same; confirm.)
    fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64;
}
40
/// Type-erased async handler for incoming `VShardEnvelope` messages.
///
/// Receives raw envelope bytes, returns response bytes. Set by the main binary
/// to dispatch to the appropriate engine handler (Event Plane, timeseries, etc.).
///
/// The future is boxed (`Pin<Box<dyn Future>>`) so the handler can be stored
/// as a trait object; the `Arc` makes the handler cheaply cloneable into
/// spawned tasks.
pub type VShardEnvelopeHandler = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send>>
        + Send
        + Sync,
>;
50
/// Raft event loop coordinator.
///
/// Owns the MultiRaft state (behind `Arc<Mutex>`) and drives it via periodic
/// ticks. Implements [`crate::transport::RaftRpcHandler`] (in
/// [`super::handle_rpc`]) so it can be passed directly to
/// [`NexarTransport::serve`] for incoming RPC dispatch.
///
/// The `F: RequestForwarder` generic parameter was removed in C-δ.6 when the
/// SQL-string forwarding path was retired. Cross-node SQL routing now goes
/// through `gateway.execute / ExecuteRequest` (C-β path).
pub struct RaftLoop<A: CommitApplier, P: PlanExecutor = NoopPlanExecutor> {
    /// This node's id, captured from `MultiRaft::node_id()` at construction.
    pub(super) node_id: u64,
    /// Shared Raft state; also handed out via [`Self::multi_raft_handle`]
    /// and cloned into detached tasks (see `shutdown_watch` below).
    pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
    /// Peer transport for outbound RPCs (e.g. metadata-propose forwarding).
    pub(super) transport: Arc<NexarTransport>,
    /// Local view of cluster membership; read to resolve peer addresses.
    pub(super) topology: Arc<RwLock<ClusterTopology>>,
    /// Applies committed Raft log entries (see [`CommitApplier`]).
    pub(super) applier: A,
    /// Applies committed entries from the metadata Raft group (group 0).
    pub(super) metadata_applier: Arc<dyn MetadataApplier>,
    /// Executes incoming `ExecuteRequest` RPCs without SQL re-planning.
    pub(super) plan_executor: Arc<P>,
    /// Period of the tick loop in [`Self::run`]; defaults to
    /// [`DEFAULT_TICK_INTERVAL`].
    pub(super) tick_interval: Duration,
    /// Optional handler for incoming VShardEnvelope messages.
    /// Set when the Event Plane or other subsystems need cross-node messaging.
    pub(super) vshard_handler: Option<VShardEnvelopeHandler>,
    /// Optional catalog handle for persisting topology/routing updates
    /// from the join flow. When `None`, persistence is skipped — useful
    /// for unit tests that don't care about durability.
    pub(super) catalog: Option<Arc<ClusterCatalog>>,
    /// Cooperative shutdown signal observed by every detached
    /// `tokio::spawn` task in [`super::tick`]. `run()` flips it on
    /// its own shutdown, and [`Self::begin_shutdown`] provides a
    /// direct entry point for test harnesses that abort the run /
    /// serve handles and need the spawned tasks to drop their
    /// `Arc<Mutex<MultiRaft>>` clones immediately so the per-group
    /// redb log files can release their in-process locks.
    ///
    /// Using `watch::Sender` here rather than a raw `AtomicBool` +
    /// `Notify` pair gives us two properties at once: the latest
    /// value is visible to every newly-subscribed receiver (no
    /// missed-notification race when a new detached task is
    /// spawned just after `begin_shutdown`), and awaiting
    /// `receiver.changed()` is cancellable inside `tokio::select!`.
    pub(super) shutdown_watch: tokio::sync::watch::Sender<bool>,
    /// Standardized loop observations. Updated inside `run()` after
    /// every `do_tick`. Register via
    /// [`Self::loop_metrics`] with the cluster registry.
    pub(super) loop_metrics: Arc<LoopMetrics>,
    /// Boot-time readiness signal. Flipped to `true` by
    /// [`super::tick::do_tick`] after the first tick completes phase
    /// 4 (apply committed entries) — i.e. once Raft has driven at
    /// least one no-op or replayed entries past the persisted
    /// applied watermark on this node.
    ///
    /// The host crate's `start_raft` returns `subscribe_ready()` to
    /// `main.rs`, which awaits it before binding any client-facing
    /// listener. This guarantees the first SQL DDL the operator
    /// runs after process start cannot race against an
    /// uninitialized metadata raft group, which previously surfaced
    /// as `metadata propose: not leader` under fast restart loops.
    pub(super) ready_watch: tokio::sync::watch::Sender<bool>,
}
112
113impl<A: CommitApplier> RaftLoop<A> {
114    pub fn new(
115        multi_raft: MultiRaft,
116        transport: Arc<NexarTransport>,
117        topology: Arc<RwLock<ClusterTopology>>,
118        applier: A,
119    ) -> Self {
120        let node_id = multi_raft.node_id();
121        let (shutdown_watch, _) = tokio::sync::watch::channel(false);
122        let (ready_watch, _) = tokio::sync::watch::channel(false);
123        Self {
124            node_id,
125            multi_raft: Arc::new(Mutex::new(multi_raft)),
126            transport,
127            topology,
128            applier,
129            metadata_applier: Arc::new(NoopMetadataApplier),
130            plan_executor: Arc::new(NoopPlanExecutor),
131            tick_interval: DEFAULT_TICK_INTERVAL,
132            vshard_handler: None,
133            catalog: None,
134            shutdown_watch,
135            ready_watch,
136            loop_metrics: LoopMetrics::new("raft_tick_loop"),
137        }
138    }
139}
140
141impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
142    /// Install a custom plan executor (for cluster mode — C-β path).
143    pub fn with_plan_executor<P2: PlanExecutor>(self, executor: Arc<P2>) -> RaftLoop<A, P2> {
144        RaftLoop {
145            node_id: self.node_id,
146            multi_raft: self.multi_raft,
147            transport: self.transport,
148            topology: self.topology,
149            applier: self.applier,
150            metadata_applier: self.metadata_applier,
151            plan_executor: executor,
152            tick_interval: self.tick_interval,
153            vshard_handler: self.vshard_handler,
154            catalog: self.catalog,
155            shutdown_watch: self.shutdown_watch,
156            ready_watch: self.ready_watch,
157            loop_metrics: self.loop_metrics,
158        }
159    }
160
161    /// Shared handle to this loop's standardized metrics.
162    pub fn loop_metrics(&self) -> Arc<LoopMetrics> {
163        Arc::clone(&self.loop_metrics)
164    }
165
166    /// Count of Raft groups currently mounted on this node — used to
167    /// render the `raft_tick_loop_pending_groups` gauge.
168    pub fn pending_groups(&self) -> usize {
169        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
170        mr.group_count()
171    }
172
173    /// Signal cooperative shutdown to every detached task spawned
174    /// inside [`super::tick::do_tick`].
175    ///
176    /// This is the entry point for test harnesses that want to
177    /// tear down a `RaftLoop` without waiting for the external
178    /// `run()` shutdown watch channel to propagate. In production
179    /// the same signal is emitted automatically by `run()` when
180    /// its external shutdown receiver fires.
181    ///
182    /// Idempotent: calling this multiple times is a no-op after
183    /// the first.
184    pub fn begin_shutdown(&self) {
185        let _ = self.shutdown_watch.send(true);
186    }
187
    /// Subscribe to the boot-time readiness signal.
    ///
    /// The returned receiver starts at `false` and flips to `true`
    /// exactly once, after the first [`super::tick::do_tick`]
    /// completes phase 4 (apply committed entries). Used by the
    /// host crate to gate client-facing listener startup until the
    /// metadata raft group has produced its first applied entry.
    ///
    /// Late subscribers still observe the latest value (`watch`
    /// semantics), so there is no missed-notification race.
    pub fn subscribe_ready(&self) -> tokio::sync::watch::Receiver<bool> {
        self.ready_watch.subscribe()
    }
198
199    /// Set a handler for incoming VShardEnvelope messages.
200    pub fn with_vshard_handler(mut self, handler: VShardEnvelopeHandler) -> Self {
201        self.vshard_handler = Some(handler);
202        self
203    }
204
205    /// Install the metadata applier used for group-0 commits.
206    ///
207    /// The host crate (nodedb) calls this with a production applier that
208    /// wraps an in-memory `MetadataCache` and additionally persists to
209    /// redb / broadcasts catalog change events. The default
210    /// [`NoopMetadataApplier`] is kept only for tests that don't care.
211    pub fn with_metadata_applier(mut self, applier: Arc<dyn MetadataApplier>) -> Self {
212        self.metadata_applier = applier;
213        self
214    }
215
216    pub fn with_tick_interval(mut self, interval: Duration) -> Self {
217        self.tick_interval = interval;
218        self
219    }
220
221    /// Attach a cluster catalog — used by the join flow to persist the
222    /// updated topology + routing after a conf-change commits.
223    pub fn with_catalog(mut self, catalog: Arc<ClusterCatalog>) -> Self {
224        self.catalog = Some(catalog);
225        self
226    }
227
    /// This node's id (exposed for handlers and tests).
    ///
    /// Captured from `MultiRaft::node_id()` in [`RaftLoop::new`].
    pub fn node_id(&self) -> u64 {
        self.node_id
    }
232
233    /// Run the event loop until shutdown.
234    ///
235    /// This drives Raft elections, heartbeats, and message dispatch.
236    /// Call [`NexarTransport::serve`] separately with `Arc<Self>` as the handler.
237    ///
238    /// When the externally-supplied `shutdown` receiver fires,
239    /// the loop also propagates the signal to the internal
240    /// cooperative-shutdown channel so every detached task
241    /// spawned inside `do_tick` exits promptly and drops its
242    /// `Arc<Mutex<MultiRaft>>` clone.
243    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
244        let mut interval = tokio::time::interval(self.tick_interval);
245        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
246        self.loop_metrics.set_up(true);
247
248        loop {
249            tokio::select! {
250                _ = interval.tick() => {
251                    let started = Instant::now();
252                    self.do_tick();
253                    self.loop_metrics.observe(started.elapsed());
254                }
255                _ = shutdown.changed() => {
256                    if *shutdown.borrow() {
257                        debug!("raft loop shutting down");
258                        self.begin_shutdown();
259                        break;
260                    }
261                }
262            }
263        }
264        self.loop_metrics.set_up(false);
265    }
266
267    /// Propose a command to the Raft group owning the given vShard.
268    ///
269    /// Returns `(group_id, log_index)` on success.
270    pub fn propose(&self, vshard_id: u16, data: Vec<u8>) -> Result<(u64, u64)> {
271        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
272        mr.propose(vshard_id, data)
273    }
274
275    /// Propose a command directly to the metadata Raft group (group 0).
276    ///
277    /// Used by the host crate's metadata proposer and by integration
278    /// tests that exercise the replicated-catalog path without a
279    /// pgwire client. Fails with `ClusterError::GroupNotFound` if
280    /// group 0 does not exist on this node, and with
281    /// `ClusterError::Raft(NotLeader)` if this node is not the
282    /// current leader of group 0.
283    pub fn propose_to_metadata_group(&self, data: Vec<u8>) -> Result<u64> {
284        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
285        mr.propose_to_group(crate::metadata_group::METADATA_GROUP_ID, data)
286    }
287
288    /// Propose to the metadata Raft group, transparently forwarding
289    /// to the current leader if this node is not it.
290    ///
291    /// Tries a local propose first. On
292    /// `ClusterError::Raft(NotLeader { leader_hint })`, looks up the
293    /// hinted leader's address in cluster topology and sends a
294    /// [`crate::rpc_codec::MetadataProposeRequest`] over QUIC. The
295    /// receiving leader applies the proposal locally and returns
296    /// the log index.
297    ///
298    /// On `NotLeader { leader_hint: None }` (election in progress,
299    /// no observed leader yet) the call returns the original
300    /// `NotLeader` error so the caller can decide whether to retry.
301    /// We deliberately do not implement a wait-and-retry loop here
302    /// because the caller (the host-side proposer) may have a
303    /// shorter deadline than any reasonable retry budget.
304    ///
305    /// The leader-side path through this function is identical to
306    /// the bare `propose_to_metadata_group` — the only extra cost is
307    /// an `is_leader_locally` check before the local propose.
308    pub async fn propose_to_metadata_group_via_leader(&self, data: Vec<u8>) -> Result<u64> {
309        // Phase 1: try local propose.
310        match self.propose_to_metadata_group(data.clone()) {
311            Ok(idx) => Ok(idx),
312            Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
313                leader_hint,
314            })) => {
315                let Some(leader_id) = leader_hint else {
316                    return Err(crate::error::ClusterError::Raft(
317                        nodedb_raft::RaftError::NotLeader { leader_hint: None },
318                    ));
319                };
320                if leader_id == self.node_id {
321                    // Should not happen — local propose said we
322                    // weren't leader but the hint points at us. Fall
323                    // through to the original error so the caller
324                    // sees the contradiction.
325                    return Err(crate::error::ClusterError::Raft(
326                        nodedb_raft::RaftError::NotLeader {
327                            leader_hint: Some(leader_id),
328                        },
329                    ));
330                }
331                // Phase 2: forward to the hinted leader.
332                self.forward_metadata_propose(leader_id, data).await
333            }
334            Err(other) => Err(other),
335        }
336    }
337
    /// Send a `MetadataProposeRequest` to `leader_id`. Looks up the
    /// leader's listen address via the local topology snapshot and
    /// dispatches through the existing peer transport.
    ///
    /// Error paths: leader unknown to local topology, unparseable
    /// address, transport failure, non-success response (with or
    /// without a fresher leader hint), or an unexpected RPC variant.
    async fn forward_metadata_propose(&self, leader_id: u64, data: Vec<u8>) -> Result<u64> {
        // Resolve and register the leader's address with the
        // transport so `send_rpc` has a destination. Topology is
        // updated by the membership / health subsystem; if the
        // leader isn't in our local topology yet we fail loudly so
        // the caller can fall back to its own retry policy rather
        // than silently dropping the proposal.
        //
        // NOTE: this inner block is load-bearing — it drops the std
        // `RwLock` read guard on `topology` before the `.await`
        // below, so the lock is never held across a suspension point.
        {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            let Some(node) = topo.get_node(leader_id) else {
                return Err(crate::error::ClusterError::Transport {
                    detail: format!(
                        "metadata propose forward: leader {leader_id} not in local topology"
                    ),
                });
            };
            let Some(addr) = node.socket_addr() else {
                return Err(crate::error::ClusterError::Transport {
                    detail: format!(
                        "metadata propose forward: leader {leader_id} has unparseable addr {:?}",
                        node.addr
                    ),
                });
            };
            // Idempotent: register_peer overwrites any prior mapping.
            self.transport.register_peer(leader_id, addr);
        }

        let req = crate::rpc_codec::RaftRpc::MetadataProposeRequest(
            crate::rpc_codec::MetadataProposeRequest { bytes: data },
        );
        let resp = self.transport.send_rpc(leader_id, req).await?;
        match resp {
            crate::rpc_codec::RaftRpc::MetadataProposeResponse(r) => {
                if r.success {
                    Ok(r.log_index)
                } else if let Some(hint) = r.leader_hint {
                    // The receiving node was also not the leader
                    // (rare: leader changed between our local check
                    // and the forwarded RPC). Surface as NotLeader
                    // so the caller's normal retry path runs.
                    Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader {
                            leader_hint: Some(hint),
                        },
                    ))
                } else {
                    Err(crate::error::ClusterError::Transport {
                        detail: format!("metadata propose forward failed: {}", r.error_message),
                    })
                }
            }
            other => Err(crate::error::ClusterError::Transport {
                detail: format!("metadata propose forward: unexpected response variant {other:?}"),
            }),
        }
    }
398
399    /// Returns the inner multi-raft handle. Exposed for tests and for
400    /// the host crate's metadata proposer so it can hold a second
401    /// reference to the same underlying mutex without pulling the
402    /// whole raft loop into the caller's lifetime.
403    pub fn multi_raft_handle(&self) -> Arc<Mutex<crate::multi_raft::MultiRaft>> {
404        self.multi_raft.clone()
405    }
406
407    /// Snapshot all Raft group states for observability (SHOW RAFT GROUPS).
408    pub fn group_statuses(&self) -> Vec<crate::multi_raft::GroupStatus> {
409        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
410        mr.group_statuses()
411    }
412
413    /// Propose a configuration change to a Raft group.
414    ///
415    /// Returns `(group_id, log_index)` on success.
416    pub fn propose_conf_change(&self, group_id: u64, change: &ConfChange) -> Result<(u64, u64)> {
417        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
418        mr.propose_conf_change(group_id, change)
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425    use crate::routing::RoutingTable;
426    use nodedb_types::config::tuning::ClusterTransportTuning;
427    use std::sync::atomic::{AtomicU64, Ordering};
428    use std::time::Instant;
429
430    /// Test applier that counts applied entries across both data and
431    /// metadata groups. The metadata-group variant ([`CountingMetadataApplier`])
432    /// increments the same counter so tests that propose against group 0
433    /// (the metadata group) still see the count move.
434    pub(crate) struct CountingApplier {
435        applied: Arc<AtomicU64>,
436    }
437
438    impl CountingApplier {
439        pub(crate) fn new() -> Self {
440            Self {
441                applied: Arc::new(AtomicU64::new(0)),
442            }
443        }
444
445        pub(crate) fn count(&self) -> u64 {
446            self.applied.load(Ordering::Relaxed)
447        }
448
449        pub(crate) fn metadata_applier(&self) -> Arc<CountingMetadataApplier> {
450            Arc::new(CountingMetadataApplier {
451                applied: self.applied.clone(),
452            })
453        }
454    }
455
456    impl CommitApplier for CountingApplier {
457        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
458            self.applied
459                .fetch_add(entries.len() as u64, Ordering::Relaxed);
460            entries.last().map(|e| e.index).unwrap_or(0)
461        }
462    }
463
464    pub(crate) struct CountingMetadataApplier {
465        applied: Arc<AtomicU64>,
466    }
467
468    impl MetadataApplier for CountingMetadataApplier {
469        fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
470            self.applied
471                .fetch_add(entries.len() as u64, Ordering::Relaxed);
472            entries.last().map(|(idx, _)| *idx).unwrap_or(0)
473        }
474    }
475
476    /// Helper: create a transport on an ephemeral port.
477    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
478        Arc::new(
479            NexarTransport::new(
480                node_id,
481                "127.0.0.1:0".parse().unwrap(),
482                crate::transport::credentials::TransportCredentials::Insecure,
483            )
484            .unwrap(),
485        )
486    }
487
    /// A single-node group should elect itself, commit the leader no-op,
    /// then commit a user proposal — all observable via the counting applier.
    #[tokio::test]
    async fn single_node_raft_loop_commits() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        // Expire every election deadline so the node campaigns on the
        // very first tick instead of waiting out a randomized timeout.
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let applier = CountingApplier::new();
        let meta = applier.metadata_applier();
        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop =
            Arc::new(RaftLoop::new(mr, transport, topo, applier).with_metadata_applier(meta));

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let rl = raft_loop.clone();
        let run_handle = tokio::spawn(async move {
            rl.run(shutdown_rx).await;
        });

        // 50ms = several 10ms ticks; enough for the self-election + no-op.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 1,
            "expected at least 1 applied entry (no-op), got {}",
            raft_loop.applier.count()
        );

        // The no-op occupies index 1, so a user proposal lands at >= 2.
        let (_gid, idx) = raft_loop.propose(0, b"hello".to_vec()).unwrap();
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 2,
            "expected at least 2 applied entries, got {}",
            raft_loop.applier.count()
        );

        shutdown_tx.send(true).unwrap();
        run_handle.abort();
    }
535
    /// Three nodes over real (loopback) QUIC transports: node 1 is forced
    /// to campaign first, wins the election, and replicates a proposal to
    /// both followers.
    #[tokio::test]
    async fn three_node_election_over_quic() {
        let t1 = make_transport(1);
        let t2 = make_transport(2);
        let t3 = make_transport(3);

        // Full mesh: every node knows every other node's address.
        t1.register_peer(2, t2.local_addr());
        t1.register_peer(3, t3.local_addr());
        t2.register_peer(1, t1.local_addr());
        t2.register_peer(3, t3.local_addr());
        t3.register_peer(1, t1.local_addr());
        t3.register_peer(2, t2.local_addr());

        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);

        // Node 1: election deadline expired so it campaigns immediately.
        let dir1 = tempfile::tempdir().unwrap();
        let mut mr1 = MultiRaft::new(1, rt.clone(), dir1.path().to_path_buf());
        mr1.add_group(0, vec![2, 3]).unwrap();
        for node in mr1.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        // Nodes 2 and 3 get the default (long) election timeouts so they
        // stay followers and don't race node 1's campaign.
        let transport_tuning = ClusterTransportTuning::default();
        let election_timeout_min = Duration::from_secs(transport_tuning.election_timeout_min_secs);
        let election_timeout_max = Duration::from_secs(transport_tuning.election_timeout_max_secs);

        let dir2 = tempfile::tempdir().unwrap();
        let mut mr2 = MultiRaft::new(2, rt.clone(), dir2.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr2.add_group(0, vec![1, 3]).unwrap();

        let dir3 = tempfile::tempdir().unwrap();
        let mut mr3 = MultiRaft::new(3, rt.clone(), dir3.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr3.add_group(0, vec![1, 2]).unwrap();

        let a1 = CountingApplier::new();
        let m1 = a1.metadata_applier();
        let a2 = CountingApplier::new();
        let m2 = a2.metadata_applier();
        let a3 = CountingApplier::new();
        let m3 = a3.metadata_applier();

        let topo1 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo2 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo3 = Arc::new(RwLock::new(ClusterTopology::new()));

        let rl1 = Arc::new(RaftLoop::new(mr1, t1.clone(), topo1, a1).with_metadata_applier(m1));
        let rl2 = Arc::new(RaftLoop::new(mr2, t2.clone(), topo2, a2).with_metadata_applier(m2));
        let rl3 = Arc::new(RaftLoop::new(mr3, t3.clone(), topo3, a3).with_metadata_applier(m3));

        // Initial receiver discarded; each task subscribes its own below.
        let (shutdown_tx, _) = tokio::sync::watch::channel(false);

        // Spawn each node's RPC server (`serve`) and tick loop (`run`).
        let rl2_h = rl2.clone();
        let sr2 = shutdown_tx.subscribe();
        tokio::spawn(async move { t2.serve(rl2_h, sr2).await });

        let rl3_h = rl3.clone();
        let sr3 = shutdown_tx.subscribe();
        tokio::spawn(async move { t3.serve(rl3_h, sr3).await });

        let rl1_r = rl1.clone();
        let sr1 = shutdown_tx.subscribe();
        tokio::spawn(async move { rl1_r.run(sr1).await });

        let rl2_r = rl2.clone();
        let sr2r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl2_r.run(sr2r).await });

        let rl3_r = rl3.clone();
        let sr3r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl3_r.run(sr3r).await });

        let rl1_h = rl1.clone();
        let sr1h = shutdown_tx.subscribe();
        tokio::spawn(async move { t1.serve(rl1_h, sr1h).await });

        // Poll until node 1 commits at least the no-op (election done).
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            if rl1.applier.count() >= 1 {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "node 1 should have committed at least the no-op, got {}",
                rl1.applier.count()
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        let (_gid, idx) = rl1.propose(0, b"distributed-cmd".to_vec()).unwrap();
        assert!(idx >= 2);

        // Poll until all nodes replicate the proposed command.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            if rl1.applier.count() >= 2 && rl2.applier.count() >= 1 && rl3.applier.count() >= 1 {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "replication timed out: n1={}, n2={}, n3={}",
                rl1.applier.count(),
                rl2.applier.count(),
                rl3.applier.count()
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        shutdown_tx.send(true).unwrap();
    }
648}