nodedb_cluster/raft_loop/loop_core.rs

//! `RaftLoop` struct, constructors, top-level run loop, and thin wrappers
//! over `MultiRaft` proposal APIs. The tick body lives in
//! [`super::tick`]; the inbound-RPC handler lives in
//! [`super::handle_rpc`]; the async join orchestration lives in
//! [`super::join`].

use std::pin::Pin;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use tracing::debug;

use nodedb_raft::message::LogEntry;

use crate::catalog::ClusterCatalog;
use crate::conf_change::ConfChange;
use crate::error::Result;
use crate::forward::{NoopPlanExecutor, PlanExecutor};
use crate::metadata_group::applier::{MetadataApplier, NoopMetadataApplier};
use crate::multi_raft::MultiRaft;
use crate::topology::ClusterTopology;
use crate::transport::NexarTransport;

/// Default tick interval (10ms — fast enough for sub-second elections).
///
/// Matches `ClusterTransportTuning::raft_tick_interval_ms` default.
pub(super) const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(10);

/// Callback for applying committed Raft log entries to the state machine.
///
/// Called synchronously during the tick loop, so implementations should be
/// fast: enqueue to an SPSC queue or channel rather than performing I/O
/// directly.
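///
/// A minimal sketch of a conforming applier (hypothetical; the real engine
/// appliers live in the host crate, and `LogEntry: Clone` is assumed here).
/// It hands entries to a worker thread and reports the last index it
/// accepted:
///
/// ```ignore
/// struct ChannelApplier {
///     tx: std::sync::mpsc::SyncSender<(u64, LogEntry)>,
/// }
///
/// impl CommitApplier for ChannelApplier {
///     fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64 {
///         let mut last_applied = 0;
///         for entry in entries {
///             // Stop at the first send failure so the returned watermark
///             // never runs ahead of what the worker will actually apply.
///             if self.tx.try_send((group_id, entry.clone())).is_err() {
///                 break;
///             }
///             last_applied = entry.index;
///         }
///         last_applied
///     }
/// }
/// ```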
pub trait CommitApplier: Send + Sync + 'static {
    /// Apply committed entries for a Raft group.
    ///
    /// Returns the index of the last successfully applied entry.
    fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64;
}

/// Type-erased async handler for incoming `VShardEnvelope` messages.
///
/// Receives raw envelope bytes, returns response bytes. Set by the main binary
/// to dispatch to the appropriate engine handler (Event Plane, timeseries, etc.).
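///
/// Constructing one from an async closure (sketch; the decode/dispatch body
/// is hypothetical, shown here as an echo):
///
/// ```ignore
/// let handler: VShardEnvelopeHandler = Arc::new(|bytes: Vec<u8>| {
///     Box::pin(async move {
///         // Decode the envelope, route it to the owning engine, and
///         // return the encoded response.
///         Ok(bytes)
///     })
/// });
/// ```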
pub type VShardEnvelopeHandler = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send>>
        + Send
        + Sync,
>;

/// Raft event loop coordinator.
///
/// Owns the MultiRaft state (behind `Arc<Mutex>`) and drives it via periodic
/// ticks. Implements [`crate::transport::RaftRpcHandler`] (in
/// [`super::handle_rpc`]) so it can be passed directly to
/// [`NexarTransport::serve`] for incoming RPC dispatch.
///
/// The `F: RequestForwarder` generic parameter was removed in C-δ.6 when the
/// SQL-string forwarding path was retired. Cross-node SQL routing now goes
/// through `gateway.execute` / `ExecuteRequest` (the C-β path).
pub struct RaftLoop<A: CommitApplier, P: PlanExecutor = NoopPlanExecutor> {
    pub(super) node_id: u64,
    pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
    pub(super) transport: Arc<NexarTransport>,
    pub(super) topology: Arc<RwLock<ClusterTopology>>,
    pub(super) applier: A,
    /// Applies committed entries from the metadata Raft group (group 0).
    pub(super) metadata_applier: Arc<dyn MetadataApplier>,
    /// Executes incoming `ExecuteRequest` RPCs without SQL re-planning.
    pub(super) plan_executor: Arc<P>,
    pub(super) tick_interval: Duration,
    /// Optional handler for incoming VShardEnvelope messages.
    /// Set when the Event Plane or other subsystems need cross-node messaging.
    pub(super) vshard_handler: Option<VShardEnvelopeHandler>,
    /// Optional catalog handle for persisting topology/routing updates
    /// from the join flow. When `None`, persistence is skipped — useful
    /// for unit tests that don't care about durability.
    pub(super) catalog: Option<Arc<ClusterCatalog>>,
    /// Cooperative shutdown signal observed by every detached
    /// `tokio::spawn` task in [`super::tick`]. `run()` flips it on
    /// its own shutdown, and [`Self::begin_shutdown`] provides a
    /// direct entry point for test harnesses that abort the run /
    /// serve handles and need the spawned tasks to drop their
    /// `Arc<Mutex<MultiRaft>>` clones immediately so the per-group
    /// redb log files can release their in-process locks.
    ///
    /// Using `watch::Sender` here rather than a raw `AtomicBool` +
    /// `Notify` pair gives us two properties at once: the latest
    /// value is visible to every newly-subscribed receiver (no
    /// missed-notification race when a new detached task is
    /// spawned just after `begin_shutdown`), and awaiting
    /// `receiver.changed()` is cancellable inside `tokio::select!`.
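    ///
    /// The detached-task pattern this enables (sketch; `do_work` is
    /// hypothetical):
    ///
    /// ```ignore
    /// let mut rx = shutdown_watch.subscribe();
    /// tokio::spawn(async move {
    ///     if *rx.borrow() {
    ///         return; // already shut down when we were spawned
    ///     }
    ///     tokio::select! {
    ///         _ = rx.changed() => {} // cooperative, cancellable exit
    ///         _ = do_work() => {}
    ///     }
    /// });
    /// ```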
    pub(super) shutdown_watch: tokio::sync::watch::Sender<bool>,
    /// Boot-time readiness signal. Flipped to `true` by
    /// [`super::tick::do_tick`] after the first tick completes phase
    /// 4 (apply committed entries) — i.e. once Raft has driven at
    /// least one no-op or replayed entries past the persisted
    /// applied watermark on this node.
    ///
    /// The host crate's `start_raft` returns `subscribe_ready()` to
    /// `main.rs`, which awaits it before binding any client-facing
    /// listener. This guarantees the first SQL DDL the operator
    /// runs after process start cannot race against an
    /// uninitialized metadata raft group, which previously surfaced
    /// as `metadata propose: not leader` under fast restart loops.
    pub(super) ready_watch: tokio::sync::watch::Sender<bool>,
}

impl<A: CommitApplier> RaftLoop<A> {
    pub fn new(
        multi_raft: MultiRaft,
        transport: Arc<NexarTransport>,
        topology: Arc<RwLock<ClusterTopology>>,
        applier: A,
    ) -> Self {
        let node_id = multi_raft.node_id();
        let (shutdown_watch, _) = tokio::sync::watch::channel(false);
        let (ready_watch, _) = tokio::sync::watch::channel(false);
        Self {
            node_id,
            multi_raft: Arc::new(Mutex::new(multi_raft)),
            transport,
            topology,
            applier,
            metadata_applier: Arc::new(NoopMetadataApplier),
            plan_executor: Arc::new(NoopPlanExecutor),
            tick_interval: DEFAULT_TICK_INTERVAL,
            vshard_handler: None,
            catalog: None,
            shutdown_watch,
            ready_watch,
        }
    }
}

impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
    /// Install a custom plan executor (for cluster mode — C-β path).
    pub fn with_plan_executor<P2: PlanExecutor>(self, executor: Arc<P2>) -> RaftLoop<A, P2> {
        RaftLoop {
            node_id: self.node_id,
            multi_raft: self.multi_raft,
            transport: self.transport,
            topology: self.topology,
            applier: self.applier,
            metadata_applier: self.metadata_applier,
            plan_executor: executor,
            tick_interval: self.tick_interval,
            vshard_handler: self.vshard_handler,
            catalog: self.catalog,
            shutdown_watch: self.shutdown_watch,
            ready_watch: self.ready_watch,
        }
    }

    /// Signal cooperative shutdown to every detached task spawned
    /// inside [`super::tick::do_tick`].
    ///
    /// This is the entry point for test harnesses that want to
    /// tear down a `RaftLoop` without waiting for the external
    /// `run()` shutdown watch channel to propagate. In production
    /// the same signal is emitted automatically by `run()` when
    /// its external shutdown receiver fires.
    ///
    /// Idempotent: calling this multiple times is a no-op after
    /// the first.
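    ///
    /// Harness teardown order (sketch; the handle names are hypothetical):
    ///
    /// ```ignore
    /// raft_loop.begin_shutdown(); // detached tick tasks exit cooperatively
    /// run_handle.abort();         // stop the run() loop
    /// serve_handle.abort();       // stop the transport acceptor
    /// drop(raft_loop);            // last Arc<Mutex<MultiRaft>> clone gone;
    ///                             // per-group redb logs release their locks
    /// ```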
    pub fn begin_shutdown(&self) {
        let _ = self.shutdown_watch.send(true);
    }

    /// Subscribe to the boot-time readiness signal.
    ///
    /// The returned receiver starts at `false` and flips to `true`
    /// exactly once, after the first [`super::tick::do_tick`]
    /// completes phase 4 (apply committed entries). Used by the
    /// host crate to gate client-facing listener startup until the
    /// metadata raft group has produced its first applied entry.
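    ///
    /// Host-side gate (sketch; `bind_client_listener` is hypothetical):
    ///
    /// ```ignore
    /// let mut ready = raft_loop.subscribe_ready();
    /// while !*ready.borrow() {
    ///     ready.changed().await.expect("raft loop dropped before ready");
    /// }
    /// bind_client_listener().await;
    /// ```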
    pub fn subscribe_ready(&self) -> tokio::sync::watch::Receiver<bool> {
        self.ready_watch.subscribe()
    }

    /// Set a handler for incoming VShardEnvelope messages.
    pub fn with_vshard_handler(mut self, handler: VShardEnvelopeHandler) -> Self {
        self.vshard_handler = Some(handler);
        self
    }

    /// Install the metadata applier used for group-0 commits.
    ///
    /// The host crate (nodedb) calls this with a production applier that
    /// wraps an in-memory `MetadataCache` and additionally persists to
    /// redb / broadcasts catalog change events. The default
    /// [`NoopMetadataApplier`] is kept only for tests that don't care.
    pub fn with_metadata_applier(mut self, applier: Arc<dyn MetadataApplier>) -> Self {
        self.metadata_applier = applier;
        self
    }

    pub fn with_tick_interval(mut self, interval: Duration) -> Self {
        self.tick_interval = interval;
        self
    }

    /// Attach a cluster catalog — used by the join flow to persist the
    /// updated topology + routing after a conf-change commits.
    pub fn with_catalog(mut self, catalog: Arc<ClusterCatalog>) -> Self {
        self.catalog = Some(catalog);
        self
    }

    /// This node's id (exposed for handlers and tests).
    pub fn node_id(&self) -> u64 {
        self.node_id
    }

    /// Run the event loop until shutdown.
    ///
    /// This drives Raft elections, heartbeats, and message dispatch.
    /// Call [`NexarTransport::serve`] separately with `Arc<Self>` as the handler.
    ///
    /// When the externally-supplied `shutdown` receiver fires,
    /// the loop also propagates the signal to the internal
    /// cooperative-shutdown channel so every detached task
    /// spawned inside `do_tick` exits promptly and drops its
    /// `Arc<Mutex<MultiRaft>>` clone.
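    ///
    /// Typical startup wiring, mirroring the tests below (sketch):
    ///
    /// ```ignore
    /// let raft_loop = Arc::new(raft_loop);
    /// let (shutdown_tx, _) = tokio::sync::watch::channel(false);
    ///
    /// let rl = raft_loop.clone();
    /// let rx = shutdown_tx.subscribe();
    /// tokio::spawn(async move { rl.run(rx).await });
    ///
    /// let handler = raft_loop.clone();
    /// let rx = shutdown_tx.subscribe();
    /// tokio::spawn(async move { transport.serve(handler, rx).await });
    /// ```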
    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
        let mut interval = tokio::time::interval(self.tick_interval);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    self.do_tick();
                }
                res = shutdown.changed() => {
                    // A closed channel (sender dropped) also means shutdown;
                    // without this check the arm would fire repeatedly on
                    // the immediate `Err` and spin.
                    if res.is_err() || *shutdown.borrow() {
                        debug!("raft loop shutting down");
                        self.begin_shutdown();
                        break;
                    }
                }
            }
        }
    }

    /// Propose a command to the Raft group owning the given vShard.
    ///
    /// Returns `(group_id, log_index)` on success.
    pub fn propose(&self, vshard_id: u16, data: Vec<u8>) -> Result<(u64, u64)> {
        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.propose(vshard_id, data)
    }

    /// Propose a command directly to the metadata Raft group (group 0).
    ///
    /// Used by the host crate's metadata proposer and by integration
    /// tests that exercise the replicated-catalog path without a
    /// pgwire client. Fails with `ClusterError::GroupNotFound` if
    /// group 0 does not exist on this node, and with
    /// `ClusterError::Raft(NotLeader)` if this node is not the
    /// current leader of group 0.
    pub fn propose_to_metadata_group(&self, data: Vec<u8>) -> Result<u64> {
        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.propose_to_group(crate::metadata_group::METADATA_GROUP_ID, data)
    }

    /// Propose to the metadata Raft group, transparently forwarding
    /// to the current leader if this node is not it.
    ///
    /// Tries a local propose first. On
    /// `ClusterError::Raft(NotLeader { leader_hint })`, looks up the
    /// hinted leader's address in cluster topology and sends a
    /// [`crate::rpc_codec::MetadataProposeRequest`] over QUIC. The
    /// receiving leader applies the proposal locally and returns
    /// the log index.
    ///
    /// On `NotLeader { leader_hint: None }` (election in progress,
    /// no observed leader yet) the call returns the original
    /// `NotLeader` error so the caller can decide whether to retry.
    /// We deliberately do not implement a wait-and-retry loop here
    /// because the caller (the host-side proposer) may have a
    /// shorter deadline than any reasonable retry budget.
    ///
    /// The leader-side path through this function is identical to
    /// the bare `propose_to_metadata_group` — the only extra cost is
    /// an `is_leader_locally` check before the local propose.
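    ///
    /// Caller-side retry sketch (hypothetical; real deadlines belong to the
    /// host-side proposer, and the retry budget here is made up):
    ///
    /// ```ignore
    /// let mut attempts = 0;
    /// let idx = loop {
    ///     match raft_loop.propose_to_metadata_group_via_leader(bytes.clone()).await {
    ///         Ok(idx) => break idx,
    ///         // No observed leader yet: back off briefly and retry.
    ///         Err(ClusterError::Raft(RaftError::NotLeader { leader_hint: None }))
    ///             if attempts < 5 =>
    ///         {
    ///             attempts += 1;
    ///             tokio::time::sleep(Duration::from_millis(50)).await;
    ///         }
    ///         Err(e) => return Err(e),
    ///     }
    /// };
    /// ```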
    pub async fn propose_to_metadata_group_via_leader(&self, data: Vec<u8>) -> Result<u64> {
        // Phase 1: try local propose.
        match self.propose_to_metadata_group(data.clone()) {
            Ok(idx) => Ok(idx),
            Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
                leader_hint,
            })) => {
                let Some(leader_id) = leader_hint else {
                    return Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader { leader_hint: None },
                    ));
                };
                if leader_id == self.node_id {
                    // Should not happen — local propose said we
                    // weren't leader but the hint points at us. Fall
                    // through to the original error so the caller
                    // sees the contradiction.
                    return Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader {
                            leader_hint: Some(leader_id),
                        },
                    ));
                }
                // Phase 2: forward to the hinted leader.
                self.forward_metadata_propose(leader_id, data).await
            }
            Err(other) => Err(other),
        }
    }

    /// Send a `MetadataProposeRequest` to `leader_id`. Looks up the
    /// leader's listen address via the local topology snapshot and
    /// dispatches through the existing peer transport.
    async fn forward_metadata_propose(&self, leader_id: u64, data: Vec<u8>) -> Result<u64> {
        // Resolve and register the leader's address with the
        // transport so `send_rpc` has a destination. Topology is
        // updated by the membership / health subsystem; if the
        // leader isn't in our local topology yet we fail loudly so
        // the caller can fall back to its own retry policy rather
        // than silently dropping the proposal.
        {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            let Some(node) = topo.get_node(leader_id) else {
                return Err(crate::error::ClusterError::Transport {
                    detail: format!(
                        "metadata propose forward: leader {leader_id} not in local topology"
                    ),
                });
            };
            let Some(addr) = node.socket_addr() else {
                return Err(crate::error::ClusterError::Transport {
                    detail: format!(
                        "metadata propose forward: leader {leader_id} has unparseable addr {:?}",
                        node.addr
                    ),
                });
            };
            // Idempotent: register_peer overwrites any prior mapping.
            self.transport.register_peer(leader_id, addr);
        }

        let req = crate::rpc_codec::RaftRpc::MetadataProposeRequest(
            crate::rpc_codec::MetadataProposeRequest { bytes: data },
        );
        let resp = self.transport.send_rpc(leader_id, req).await?;
        match resp {
            crate::rpc_codec::RaftRpc::MetadataProposeResponse(r) => {
                if r.success {
                    Ok(r.log_index)
                } else if let Some(hint) = r.leader_hint {
                    // The receiving node was also not the leader
                    // (rare: leader changed between our local check
                    // and the forwarded RPC). Surface as NotLeader
                    // so the caller's normal retry path runs.
                    Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader {
                            leader_hint: Some(hint),
                        },
                    ))
                } else {
                    Err(crate::error::ClusterError::Transport {
                        detail: format!("metadata propose forward failed: {}", r.error_message),
                    })
                }
            }
            other => Err(crate::error::ClusterError::Transport {
                detail: format!("metadata propose forward: unexpected response variant {other:?}"),
            }),
        }
    }

    /// Returns the inner multi-raft handle. Exposed for tests and for
    /// the host crate's metadata proposer so it can hold a second
    /// reference to the same underlying mutex without pulling the
    /// whole raft loop into the caller's lifetime.
    pub fn multi_raft_handle(&self) -> Arc<Mutex<crate::multi_raft::MultiRaft>> {
        self.multi_raft.clone()
    }

    /// Snapshot all Raft group states for observability (SHOW RAFT GROUPS).
    pub fn group_statuses(&self) -> Vec<crate::multi_raft::GroupStatus> {
        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.group_statuses()
    }

    /// Propose a configuration change to a Raft group.
    ///
    /// Returns `(group_id, log_index)` on success.
    pub fn propose_conf_change(&self, group_id: u64, change: &ConfChange) -> Result<(u64, u64)> {
        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.propose_conf_change(group_id, change)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::RoutingTable;
    use nodedb_types::config::tuning::ClusterTransportTuning;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    /// Test applier that counts applied entries across both data and
    /// metadata groups. The metadata-group variant ([`CountingMetadataApplier`])
    /// increments the same counter so tests that propose against group 0
    /// (the metadata group) still see the count move.
    pub(crate) struct CountingApplier {
        applied: Arc<AtomicU64>,
    }

    impl CountingApplier {
        pub(crate) fn new() -> Self {
            Self {
                applied: Arc::new(AtomicU64::new(0)),
            }
        }

        pub(crate) fn count(&self) -> u64 {
            self.applied.load(Ordering::Relaxed)
        }

        pub(crate) fn metadata_applier(&self) -> Arc<CountingMetadataApplier> {
            Arc::new(CountingMetadataApplier {
                applied: self.applied.clone(),
            })
        }
    }

    impl CommitApplier for CountingApplier {
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|e| e.index).unwrap_or(0)
        }
    }

    pub(crate) struct CountingMetadataApplier {
        applied: Arc<AtomicU64>,
    }

    impl MetadataApplier for CountingMetadataApplier {
        fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|(idx, _)| *idx).unwrap_or(0)
        }
    }

    /// Helper: create a transport on an ephemeral port.
    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(NexarTransport::new(node_id, "127.0.0.1:0".parse().unwrap()).unwrap())
    }

    #[tokio::test]
    async fn single_node_raft_loop_commits() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let applier = CountingApplier::new();
        let meta = applier.metadata_applier();
        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop =
            Arc::new(RaftLoop::new(mr, transport, topo, applier).with_metadata_applier(meta));

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let rl = raft_loop.clone();
        let run_handle = tokio::spawn(async move {
            rl.run(shutdown_rx).await;
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 1,
            "expected at least 1 applied entry (no-op), got {}",
            raft_loop.applier.count()
        );

        let (_gid, idx) = raft_loop.propose(0, b"hello".to_vec()).unwrap();
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 2,
            "expected at least 2 applied entries, got {}",
            raft_loop.applier.count()
        );

        shutdown_tx.send(true).unwrap();
        run_handle.abort();
    }

    #[tokio::test]
    async fn three_node_election_over_quic() {
        let t1 = make_transport(1);
        let t2 = make_transport(2);
        let t3 = make_transport(3);

        t1.register_peer(2, t2.local_addr());
        t1.register_peer(3, t3.local_addr());
        t2.register_peer(1, t1.local_addr());
        t2.register_peer(3, t3.local_addr());
        t3.register_peer(1, t1.local_addr());
        t3.register_peer(2, t2.local_addr());

        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);

        let dir1 = tempfile::tempdir().unwrap();
        let mut mr1 = MultiRaft::new(1, rt.clone(), dir1.path().to_path_buf());
        mr1.add_group(0, vec![2, 3]).unwrap();
        for node in mr1.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let transport_tuning = ClusterTransportTuning::default();
        let election_timeout_min = Duration::from_secs(transport_tuning.election_timeout_min_secs);
        let election_timeout_max = Duration::from_secs(transport_tuning.election_timeout_max_secs);

        let dir2 = tempfile::tempdir().unwrap();
        let mut mr2 = MultiRaft::new(2, rt.clone(), dir2.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr2.add_group(0, vec![1, 3]).unwrap();

        let dir3 = tempfile::tempdir().unwrap();
        let mut mr3 = MultiRaft::new(3, rt.clone(), dir3.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr3.add_group(0, vec![1, 2]).unwrap();

        let a1 = CountingApplier::new();
        let m1 = a1.metadata_applier();
        let a2 = CountingApplier::new();
        let m2 = a2.metadata_applier();
        let a3 = CountingApplier::new();
        let m3 = a3.metadata_applier();

        let topo1 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo2 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo3 = Arc::new(RwLock::new(ClusterTopology::new()));

        let rl1 = Arc::new(RaftLoop::new(mr1, t1.clone(), topo1, a1).with_metadata_applier(m1));
        let rl2 = Arc::new(RaftLoop::new(mr2, t2.clone(), topo2, a2).with_metadata_applier(m2));
        let rl3 = Arc::new(RaftLoop::new(mr3, t3.clone(), topo3, a3).with_metadata_applier(m3));

        let (shutdown_tx, _) = tokio::sync::watch::channel(false);

        let rl2_h = rl2.clone();
        let sr2 = shutdown_tx.subscribe();
        tokio::spawn(async move { t2.serve(rl2_h, sr2).await });

        let rl3_h = rl3.clone();
        let sr3 = shutdown_tx.subscribe();
        tokio::spawn(async move { t3.serve(rl3_h, sr3).await });

        let rl1_r = rl1.clone();
        let sr1 = shutdown_tx.subscribe();
        tokio::spawn(async move { rl1_r.run(sr1).await });

        let rl2_r = rl2.clone();
        let sr2r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl2_r.run(sr2r).await });

        let rl3_r = rl3.clone();
        let sr3r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl3_r.run(sr3r).await });

        let rl1_h = rl1.clone();
        let sr1h = shutdown_tx.subscribe();
        tokio::spawn(async move { t1.serve(rl1_h, sr1h).await });

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(
            rl1.applier.count() >= 1,
            "node 1 should have committed at least the no-op, got {}",
            rl1.applier.count()
        );

        let (_gid, idx) = rl1.propose(0, b"distributed-cmd".to_vec()).unwrap();
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(
            rl1.applier.count() >= 2,
            "node 1: expected >= 2 applied, got {}",
            rl1.applier.count()
        );

        assert!(
            rl2.applier.count() >= 1,
            "node 2: expected >= 1 applied, got {}",
            rl2.applier.count()
        );
        assert!(
            rl3.applier.count() >= 1,
            "node 3: expected >= 1 applied, got {}",
            rl3.applier.count()
        );

        shutdown_tx.send(true).unwrap();
    }
}