nodedb_cluster/raft_loop/loop_core.rs

// SPDX-License-Identifier: BUSL-1.1

//! `RaftLoop` struct, constructors, top-level run loop, and thin wrappers
//! over `MultiRaft` proposal APIs. The tick body lives in
//! [`super::tick`]; the inbound-RPC handler lives in
//! [`super::handle_rpc`]; the async join orchestration lives in
//! [`super::join`].

use std::pin::Pin;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

use tracing::debug;

use nodedb_raft::message::LogEntry;

use crate::applied_watcher::GroupAppliedWatchers;
use crate::catalog::ClusterCatalog;
use crate::error::Result;
use crate::forward::{NoopPlanExecutor, PlanExecutor};
use crate::loop_metrics::LoopMetrics;
use crate::metadata_group::applier::{MetadataApplier, NoopMetadataApplier};
use crate::multi_raft::MultiRaft;
use crate::topology::ClusterTopology;
use crate::transport::NexarTransport;

/// Default tick interval (10ms — fast enough for sub-second elections).
///
/// Matches `ClusterTransportTuning::raft_tick_interval_ms` default.
pub(super) const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(10);

/// Callback for applying committed Raft log entries to the state machine.
///
/// Called synchronously during the tick loop. Implementations should be fast:
/// enqueue work onto an SPSC queue rather than performing I/O inline.
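///
/// A minimal sketch of the enqueue pattern (the `ChannelApplier` type and
/// its hand-off queue are illustrative assumptions, not part of this crate):
///
/// ```ignore
/// struct ChannelApplier {
///     // Hypothetical hand-off queue to a dedicated apply worker task.
///     tx: tokio::sync::mpsc::UnboundedSender<(u64, u64)>,
/// }
///
/// impl CommitApplier for ChannelApplier {
///     fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64 {
///         let last = entries.last().map(|e| e.index).unwrap_or(0);
///         // Enqueue only; the worker performs the real (possibly slow) apply.
///         let _ = self.tx.send((group_id, last));
///         last
///     }
/// }
/// ```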
pub trait CommitApplier: Send + Sync + 'static {
    /// Apply committed entries for a Raft group.
    ///
    /// Returns the index of the last successfully applied entry.
    fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64;
}

/// Hook for quarantine integration on the Raft snapshot receive path.
///
/// `nodedb-cluster` cannot depend on `nodedb` (circular), so the host crate
/// (`nodedb`) supplies an implementation backed by its `QuarantineRegistry`.
/// Cluster-only tests leave the field `None`, which skips all quarantine
/// accounting.
///
/// All methods take `(group_id, last_included_index)` as the snapshot identity.
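///
/// A minimal in-memory sketch of the two-strike contract (the `StrikeHook`
/// type is illustrative; the production hook is backed by the host crate's
/// `QuarantineRegistry`):
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::Mutex;
///
/// #[derive(Default)]
/// struct StrikeHook {
///     // (group_id, last_included_index) -> consecutive CRC failures.
///     strikes: Mutex<HashMap<(u64, u64), u32>>,
/// }
///
/// impl SnapshotQuarantineHook for StrikeHook {
///     fn is_quarantined(&self, g: u64, idx: u64) -> bool {
///         self.strikes.lock().unwrap().get(&(g, idx)).copied().unwrap_or(0) >= 2
///     }
///     fn record_success(&self, g: u64, idx: u64) {
///         // A clean decode clears any earlier transient strike.
///         self.strikes.lock().unwrap().remove(&(g, idx));
///     }
///     fn record_failure(&self, g: u64, idx: u64, _error: &str) -> bool {
///         let mut m = self.strikes.lock().unwrap();
///         let n = m.entry((g, idx)).or_insert(0);
///         *n += 1;
///         *n == 2 // `true` exactly when the second strike quarantines
///     }
/// }
/// ```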
pub trait SnapshotQuarantineHook: Send + Sync + 'static {
    /// Returns `true` if the chunk identified by `(group_id,
    /// last_included_index)` is already in the quarantined state and should
    /// be rejected immediately without attempting to decode it.
    fn is_quarantined(&self, group_id: u64, last_included_index: u64) -> bool;

    /// Called after a successful decode — resets the strike counter so a
    /// single transient CRC error is not held against a healthy peer.
    fn record_success(&self, group_id: u64, last_included_index: u64);

    /// Called on a CRC-class decode failure.
    ///
    /// Returns `true` when the segment has just been quarantined (second
    /// consecutive failure), and `false` on the first strike (caller should
    /// surface the framing error and allow the peer to retry).
    fn record_failure(&self, group_id: u64, last_included_index: u64, error: &str) -> bool;
}

/// Type-erased async handler for incoming `VShardEnvelope` messages.
///
/// Receives raw envelope bytes, returns response bytes. Set by the main binary
/// to dispatch to the appropriate engine handler (Event Plane, timeseries, etc.).
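///
/// A closure coerces into this alias; the echo body below is purely
/// illustrative:
///
/// ```ignore
/// let echo: VShardEnvelopeHandler = Arc::new(|bytes: Vec<u8>| {
///     // Decode the envelope, dispatch to the engine, encode the reply.
///     Box::pin(async move { Ok(bytes) })
///         as Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send>>
/// });
/// ```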
pub type VShardEnvelopeHandler = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send>>
        + Send
        + Sync,
>;

/// Raft event loop coordinator.
///
/// Owns the MultiRaft state (behind `Arc<Mutex>`) and drives it via periodic
/// ticks. Implements [`crate::transport::RaftRpcHandler`] (in
/// [`super::handle_rpc`]) so it can be passed directly to
/// [`NexarTransport::serve`] for incoming RPC dispatch.
///
/// The `F: RequestForwarder` generic parameter was removed in C-δ.6 when the
/// SQL-string forwarding path was retired. Cross-node SQL routing now goes
/// through `gateway.execute / ExecuteRequest` (C-β path).
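///
/// Typical wiring (a sketch; the shutdown receivers come from a single
/// `watch::channel(false)` as in the tests below):
///
/// ```ignore
/// let raft_loop = Arc::new(
///     RaftLoop::new(multi_raft, transport.clone(), topology, applier)
///         .with_metadata_applier(meta_applier),
/// );
/// // Tick driving and inbound-RPC serving run as two separate tasks.
/// let rl = raft_loop.clone();
/// tokio::spawn(async move { rl.run(shutdown_rx).await });
/// tokio::spawn(async move { transport.serve(raft_loop, shutdown_rx2).await });
/// ```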
pub struct RaftLoop<A: CommitApplier, P: PlanExecutor = NoopPlanExecutor> {
    pub(super) node_id: u64,
    pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
    pub(super) transport: Arc<NexarTransport>,
    pub(super) topology: Arc<RwLock<ClusterTopology>>,
    pub(super) applier: A,
    /// Applies committed entries from the metadata Raft group (group 0).
    pub(super) metadata_applier: Arc<dyn MetadataApplier>,
    /// Executes incoming `ExecuteRequest` RPCs without SQL re-planning.
    pub(super) plan_executor: Arc<P>,
    pub(super) tick_interval: Duration,
    /// Optional handler for incoming VShardEnvelope messages.
    /// Set when the Event Plane or other subsystems need cross-node messaging.
    pub(super) vshard_handler: Option<VShardEnvelopeHandler>,
    /// Optional catalog handle for persisting topology/routing updates
    /// from the join flow. When `None`, persistence is skipped — useful
    /// for unit tests that don't care about durability.
    pub(super) catalog: Option<Arc<ClusterCatalog>>,
    /// Cooperative shutdown signal observed by every detached
    /// `tokio::spawn` task in [`super::tick`]. `run()` flips it on
    /// its own shutdown, and [`Self::begin_shutdown`] provides a
    /// direct entry point for test harnesses that abort the run /
    /// serve handles and need the spawned tasks to drop their
    /// `Arc<Mutex<MultiRaft>>` clones immediately so the per-group
    /// redb log files can release their in-process locks.
    ///
    /// Using `watch::Sender` here rather than a raw `AtomicBool` +
    /// `Notify` pair gives us two properties at once: the latest
    /// value is visible to every newly-subscribed receiver (no
    /// missed-notification race when a new detached task is
    /// spawned just after `begin_shutdown`), and awaiting
    /// `receiver.changed()` is cancellable inside `tokio::select!`.
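    ///
    /// A detached task typically observes it like this (sketch;
    /// `do_work()` is a placeholder):
    ///
    /// ```ignore
    /// let mut rx = shutdown_tx.subscribe();
    /// if *rx.borrow() { return; } // already shut down; no notification missed
    /// tokio::select! {
    ///     _ = rx.changed() => return, // cooperative shutdown signalled
    ///     _ = do_work() => { /* normal completion */ }
    /// }
    /// ```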
    pub(super) shutdown_watch: tokio::sync::watch::Sender<bool>,
    /// Standardized loop observations. Updated inside `run()` after
    /// every `do_tick`. Register via
    /// [`Self::loop_metrics`] with the cluster registry.
    pub(super) loop_metrics: Arc<LoopMetrics>,
    /// Boot-time readiness signal. Flipped to `true` by
    /// [`super::tick::do_tick`] after the first tick completes phase
    /// 4 (apply committed entries) — i.e. once Raft has driven at
    /// least one no-op or replayed entries past the persisted
    /// applied watermark on this node.
    ///
    /// The host crate's `start_raft` returns `subscribe_ready()` to
    /// `main.rs`, which awaits it before binding any client-facing
    /// listener. This guarantees the first SQL DDL the operator
    /// runs after process start cannot race against an
    /// uninitialized metadata raft group, which previously surfaced
    /// as `metadata propose: not leader` under fast restart loops.
    pub(super) ready_watch: tokio::sync::watch::Sender<bool>,
    /// Per-Raft-group apply watermark watchers. Bumped from
    /// [`super::tick::do_tick`] after applies and from
    /// [`super::handle_rpc`] after snapshot installs. The host crate
    /// shares this Arc with `SharedState` so proposers, lease
    /// renewals, and consistent reads can wait on the apply
    /// watermark of the *specific* group whose proposal they made.
    pub(super) group_watchers: Arc<GroupAppliedWatchers>,
    /// Tracks whether this node was the metadata-group leader on the
    /// previous tick. Used to detect false→true edges so the cluster
    /// epoch (see [`crate::cluster_epoch`]) can be bumped exactly once
    /// per leadership acquisition. `AtomicBool` because [`super::tick::do_tick`]
    /// runs against `&self`.
    pub(super) prev_metadata_leader: std::sync::atomic::AtomicBool,

    /// Optional quarantine hook for the snapshot receive path.
    ///
    /// When set (by the `nodedb` binary via `with_snapshot_quarantine_hook`),
    /// the `InstallSnapshotRequest` handler checks whether the incoming chunk
    /// is already quarantined, records successes to clear transient strikes, and
    /// records consecutive failures to quarantine persistently.
    ///
    /// Cluster-only tests leave this as `None`, which disables all quarantine
    /// accounting for snapshot chunks.
    pub(super) snapshot_quarantine_hook: Option<Arc<dyn SnapshotQuarantineHook>>,

    /// In-progress partial snapshot receives, keyed by `group_id`.
    ///
    /// Each entry tracks the `.partial` file, running CRC, and expected next
    /// byte offset for a follower that is currently receiving a chunked snapshot.
    /// Entries are created on `offset == 0`, updated on each chunk, and removed
    /// on finalization or offset regression.
    pub(super) partial_snapshots: Arc<crate::install_snapshot::PartialSnapshotMap>,

    /// Data directory for persistent partial-snapshot files and the
    /// `recv_snapshots/` subdirectory. `None` in tests that don't exercise
    /// the disk path.
    pub(super) data_dir: Option<std::path::PathBuf>,

    /// Snapshot chunk size for the sender path (bytes).
    pub(super) snapshot_chunk_bytes: u64,

    /// Orphan partial-snapshot max age for the GC sweeper (seconds).
    pub(super) orphan_partial_max_age_secs: u64,
}

impl<A: CommitApplier> RaftLoop<A> {
    pub fn new(
        multi_raft: MultiRaft,
        transport: Arc<NexarTransport>,
        topology: Arc<RwLock<ClusterTopology>>,
        applier: A,
    ) -> Self {
        let node_id = multi_raft.node_id();
        let (shutdown_watch, _) = tokio::sync::watch::channel(false);
        let (ready_watch, _) = tokio::sync::watch::channel(false);
        Self {
            node_id,
            multi_raft: Arc::new(Mutex::new(multi_raft)),
            transport,
            topology,
            applier,
            metadata_applier: Arc::new(NoopMetadataApplier),
            plan_executor: Arc::new(NoopPlanExecutor),
            tick_interval: DEFAULT_TICK_INTERVAL,
            vshard_handler: None,
            catalog: None,
            shutdown_watch,
            ready_watch,
            loop_metrics: LoopMetrics::new("raft_tick_loop"),
            group_watchers: Arc::new(GroupAppliedWatchers::new()),
            prev_metadata_leader: std::sync::atomic::AtomicBool::new(false),
            snapshot_quarantine_hook: None,
            partial_snapshots: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
            data_dir: None,
            snapshot_chunk_bytes: 4 * 1024 * 1024,
            orphan_partial_max_age_secs: 300,
        }
    }
}

impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
    /// Install the snapshot quarantine hook (mutable setter variant).
    ///
    /// Prefer `with_snapshot_quarantine_hook` on the builder chain unless you
    /// need to set the hook after construction.
    pub fn set_snapshot_quarantine_hook(&mut self, hook: Arc<dyn SnapshotQuarantineHook>) {
        self.snapshot_quarantine_hook = Some(hook);
    }

    /// Install a custom plan executor (for cluster mode — C-β path).
    pub fn with_plan_executor<P2: PlanExecutor>(self, executor: Arc<P2>) -> RaftLoop<A, P2> {
        RaftLoop {
            node_id: self.node_id,
            multi_raft: self.multi_raft,
            transport: self.transport,
            topology: self.topology,
            applier: self.applier,
            metadata_applier: self.metadata_applier,
            plan_executor: executor,
            tick_interval: self.tick_interval,
            vshard_handler: self.vshard_handler,
            catalog: self.catalog,
            shutdown_watch: self.shutdown_watch,
            ready_watch: self.ready_watch,
            loop_metrics: self.loop_metrics,
            group_watchers: self.group_watchers,
            prev_metadata_leader: self.prev_metadata_leader,
            snapshot_quarantine_hook: self.snapshot_quarantine_hook,
            partial_snapshots: self.partial_snapshots,
            data_dir: self.data_dir,
            snapshot_chunk_bytes: self.snapshot_chunk_bytes,
            orphan_partial_max_age_secs: self.orphan_partial_max_age_secs,
        }
    }

    /// Replace the per-group apply watcher registry.
    ///
    /// The host crate calls this with the same `Arc` it stores on
    /// `SharedState` so proposers and consistent-read paths share
    /// one registry with the tick loop's bump points. Defaults to a
    /// fresh empty registry when not set.
    pub fn with_group_watchers(mut self, watchers: Arc<GroupAppliedWatchers>) -> Self {
        self.group_watchers = watchers;
        self
    }

    /// Attach the snapshot quarantine hook (builder chain variant).
    ///
    /// The supplied implementation is called by the `InstallSnapshotRequest`
    /// handler to check for, record, and short-circuit quarantined chunks.
    pub fn with_snapshot_quarantine_hook(mut self, hook: Arc<dyn SnapshotQuarantineHook>) -> Self {
        self.snapshot_quarantine_hook = Some(hook);
        self
    }

    /// Set the data directory for partial-snapshot persistence and GC.
    ///
    /// When set, the `InstallSnapshotRequest` handler writes chunks to
    /// `<data_dir>/recv_snapshots/<group_id>.partial` and the GC sweeper
    /// removes stale partials on startup. When `None` (the default, used by
    /// unit tests), disk writes are skipped — the receiver operates in-memory
    /// only with empty chunk data.
    pub fn with_data_dir(mut self, data_dir: std::path::PathBuf) -> Self {
        self.data_dir = Some(data_dir);
        self
    }

    /// Override the snapshot chunk byte size (default: 4 MiB).
    pub fn with_snapshot_chunk_bytes(mut self, chunk_bytes: u64) -> Self {
        self.snapshot_chunk_bytes = chunk_bytes;
        self
    }

    /// Override the orphan-partial max age for the GC sweeper (default: 300 s).
    pub fn with_orphan_partial_max_age_secs(mut self, secs: u64) -> Self {
        self.orphan_partial_max_age_secs = secs;
        self
    }

    /// Shared handle to the per-group apply watcher registry.
    pub fn group_watchers(&self) -> Arc<GroupAppliedWatchers> {
        Arc::clone(&self.group_watchers)
    }

    /// Shared handle to this loop's standardized metrics.
    pub fn loop_metrics(&self) -> Arc<LoopMetrics> {
        Arc::clone(&self.loop_metrics)
    }

    /// Count of Raft groups currently mounted on this node — used to
    /// render the `raft_tick_loop_pending_groups` gauge.
    pub fn pending_groups(&self) -> usize {
        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.group_count()
    }

    /// Signal cooperative shutdown to every detached task spawned
    /// inside [`super::tick::do_tick`].
    ///
    /// This is the entry point for test harnesses that want to
    /// tear down a `RaftLoop` without waiting for the external
    /// `run()` shutdown watch channel to propagate. In production
    /// the same signal is emitted automatically by `run()` when
    /// its external shutdown receiver fires.
    ///
    /// Idempotent: calling this multiple times is a no-op after
    /// the first.
    pub fn begin_shutdown(&self) {
        let _ = self.shutdown_watch.send(true);
    }

    /// Subscribe to the boot-time readiness signal.
    ///
    /// The returned receiver starts at `false` and flips to `true`
    /// exactly once, after the first [`super::tick::do_tick`]
    /// completes phase 4 (apply committed entries). Used by the
    /// host crate to gate client-facing listener startup until the
    /// metadata raft group has produced its first applied entry.
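    ///
    /// Typical host-side gating (sketch; `bind_listener` is a placeholder):
    ///
    /// ```ignore
    /// let mut ready = raft_loop.subscribe_ready();
    /// // `wait_for` resolves once the watched value satisfies the predicate.
    /// ready.wait_for(|ready| *ready).await?;
    /// bind_listener().await;
    /// ```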
    pub fn subscribe_ready(&self) -> tokio::sync::watch::Receiver<bool> {
        self.ready_watch.subscribe()
    }

    /// Set a handler for incoming VShardEnvelope messages.
    pub fn with_vshard_handler(mut self, handler: VShardEnvelopeHandler) -> Self {
        self.vshard_handler = Some(handler);
        self
    }

    /// Install the metadata applier used for group-0 commits.
    ///
    /// The host crate (nodedb) calls this with a production applier that
    /// wraps an in-memory `MetadataCache` and additionally persists to
    /// redb / broadcasts catalog change events. The default
    /// [`NoopMetadataApplier`] is kept only for tests that don't care.
    pub fn with_metadata_applier(mut self, applier: Arc<dyn MetadataApplier>) -> Self {
        self.metadata_applier = applier;
        self
    }

    pub fn with_tick_interval(mut self, interval: Duration) -> Self {
        self.tick_interval = interval;
        self
    }

    /// Attach a cluster catalog — used by the join flow to persist the
    /// updated topology + routing after a conf-change commits.
    pub fn with_catalog(mut self, catalog: Arc<ClusterCatalog>) -> Self {
        // Seed the local cluster-epoch high-water mark from the catalog
        // on attach, so the value emitted in the very first outbound
        // RPC reflects whatever the previous incarnation persisted. A
        // failure to load is treated as a fresh catalog (epoch 0); a
        // genuine catalog read error would have surfaced from earlier
        // catalog operations on this same handle.
        if let Err(e) = crate::cluster_epoch::init_local_cluster_epoch_from_catalog(&catalog) {
            tracing::warn!(error = %e, "failed to load persisted cluster_epoch; defaulting to 0");
        }
        self.catalog = Some(catalog);
        self
    }

    /// This node's id (exposed for handlers and tests).
    pub fn node_id(&self) -> u64 {
        self.node_id
    }

    /// Run the event loop until shutdown.
    ///
    /// This drives Raft elections, heartbeats, and message dispatch.
    /// Call [`NexarTransport::serve`] separately with `Arc<Self>` as the handler.
    ///
    /// When the externally-supplied `shutdown` receiver fires,
    /// the loop also propagates the signal to the internal
    /// cooperative-shutdown channel so every detached task
    /// spawned inside `do_tick` exits promptly and drops its
    /// `Arc<Mutex<MultiRaft>>` clone.
    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
        let mut interval = tokio::time::interval(self.tick_interval);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        self.loop_metrics.set_up(true);

        // Startup GC sweep: remove orphaned partial-snapshot files from
        // previous runs that did not complete.
        if let Some(ref dir) = self.data_dir {
            match crate::install_snapshot::gc::sweep_orphans(dir, self.orphan_partial_max_age_secs)
            {
                Ok((removed, errs)) => {
                    if removed > 0 {
                        tracing::info!(removed, "startup: removed orphaned partial snapshot files");
                    }
                    for e in errs {
                        tracing::warn!(error = %e, "startup: partial snapshot GC error");
                    }
                }
                Err(e) => {
                    tracing::warn!(error = %e, "startup: failed to sweep partial snapshot directory");
                }
            }
        }

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let started = Instant::now();
                    self.do_tick();
                    self.loop_metrics.observe(started.elapsed());
                }
                _ = shutdown.changed() => {
                    if *shutdown.borrow() {
                        debug!("raft loop shutting down");
                        self.begin_shutdown();
                        break;
                    }
                }
            }
        }
        self.loop_metrics.set_up(false);
    }

    /// Returns the inner multi-raft handle. Exposed for tests and for
    /// the host crate's metadata proposer so it can hold a second
    /// reference to the same underlying mutex without pulling the
    /// whole raft loop into the caller's lifetime.
    pub fn multi_raft_handle(&self) -> Arc<Mutex<crate::multi_raft::MultiRaft>> {
        self.multi_raft.clone()
    }

    /// Snapshot all Raft group states for observability (SHOW RAFT GROUPS).
    pub fn group_statuses(&self) -> Vec<crate::multi_raft::GroupStatus> {
        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.group_statuses()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::RoutingTable;
    use nodedb_types::config::tuning::ClusterTransportTuning;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    /// Test applier that counts applied entries across both data and
    /// metadata groups. The metadata-group variant ([`CountingMetadataApplier`])
    /// increments the same counter so tests that propose against group 0
    /// (the metadata group) still see the count move.
    pub(crate) struct CountingApplier {
        applied: Arc<AtomicU64>,
    }

    impl CountingApplier {
        pub(crate) fn new() -> Self {
            Self {
                applied: Arc::new(AtomicU64::new(0)),
            }
        }

        pub(crate) fn count(&self) -> u64 {
            self.applied.load(Ordering::Relaxed)
        }

        pub(crate) fn metadata_applier(&self) -> Arc<CountingMetadataApplier> {
            Arc::new(CountingMetadataApplier {
                applied: self.applied.clone(),
            })
        }
    }

    impl CommitApplier for CountingApplier {
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|e| e.index).unwrap_or(0)
        }
    }

    pub(crate) struct CountingMetadataApplier {
        applied: Arc<AtomicU64>,
    }

    impl MetadataApplier for CountingMetadataApplier {
        fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|(idx, _)| *idx).unwrap_or(0)
        }
    }

    /// Helper: create a transport on an ephemeral port.
    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(
            NexarTransport::new(
                node_id,
                "127.0.0.1:0".parse().unwrap(),
                crate::transport::credentials::TransportCredentials::Insecure,
            )
            .unwrap(),
        )
    }

    #[tokio::test]
    async fn single_node_raft_loop_commits() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        // uniform(1, ...) creates metadata group 0 + data group 1.
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        // Add both the metadata group (0) and the data group (1).
        mr.add_group(0, vec![]).unwrap();
        mr.add_group(1, vec![]).unwrap();

        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let applier = CountingApplier::new();
        let meta = applier.metadata_applier();
        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop =
            Arc::new(RaftLoop::new(mr, transport, topo, applier).with_metadata_applier(meta));

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let rl = raft_loop.clone();
        let run_handle = tokio::spawn(async move {
            rl.run(shutdown_rx).await;
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 1,
            "expected at least 1 applied entry (no-op), got {}",
            raft_loop.applier.count()
        );

        // vshard 0 maps to data group 1 (not metadata group 0).
        let (_gid, idx) = raft_loop.propose(0, b"hello".to_vec()).unwrap();
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 2,
            "expected at least 2 applied entries, got {}",
            raft_loop.applier.count()
        );

        shutdown_tx.send(true).unwrap();
        run_handle.abort();
    }

    #[tokio::test]
    async fn three_node_election_over_quic() {
        let t1 = make_transport(1);
        let t2 = make_transport(2);
        let t3 = make_transport(3);

        t1.register_peer(2, t2.local_addr());
        t1.register_peer(3, t3.local_addr());
        t2.register_peer(1, t1.local_addr());
        t2.register_peer(3, t3.local_addr());
        t3.register_peer(1, t1.local_addr());
        t3.register_peer(2, t2.local_addr());

        // uniform(1, ...) creates metadata group 0 + data group 1.
        // Both are added to every MultiRaft so vshard proposals (group 1)
        // and metadata proposals (group 0) both work.
        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);

        let dir1 = tempfile::tempdir().unwrap();
        let mut mr1 = MultiRaft::new(1, rt.clone(), dir1.path().to_path_buf());
        mr1.add_group(0, vec![2, 3]).unwrap();
        mr1.add_group(1, vec![2, 3]).unwrap();
        for node in mr1.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let transport_tuning = ClusterTransportTuning::default();
        let election_timeout_min =
            Duration::from_millis(transport_tuning.effective_election_timeout_min_ms());
        let election_timeout_max =
            Duration::from_millis(transport_tuning.effective_election_timeout_max_ms());

        let dir2 = tempfile::tempdir().unwrap();
        let mut mr2 = MultiRaft::new(2, rt.clone(), dir2.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr2.add_group(0, vec![1, 3]).unwrap();
        mr2.add_group(1, vec![1, 3]).unwrap();

        let dir3 = tempfile::tempdir().unwrap();
        let mut mr3 = MultiRaft::new(3, rt.clone(), dir3.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr3.add_group(0, vec![1, 2]).unwrap();
        mr3.add_group(1, vec![1, 2]).unwrap();

        let a1 = CountingApplier::new();
        let m1 = a1.metadata_applier();
        let a2 = CountingApplier::new();
        let m2 = a2.metadata_applier();
        let a3 = CountingApplier::new();
        let m3 = a3.metadata_applier();

        let topo1 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo2 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo3 = Arc::new(RwLock::new(ClusterTopology::new()));

        let rl1 = Arc::new(RaftLoop::new(mr1, t1.clone(), topo1, a1).with_metadata_applier(m1));
        let rl2 = Arc::new(RaftLoop::new(mr2, t2.clone(), topo2, a2).with_metadata_applier(m2));
        let rl3 = Arc::new(RaftLoop::new(mr3, t3.clone(), topo3, a3).with_metadata_applier(m3));

        let (shutdown_tx, _) = tokio::sync::watch::channel(false);

        let rl2_h = rl2.clone();
        let sr2 = shutdown_tx.subscribe();
        tokio::spawn(async move { t2.serve(rl2_h, sr2).await });

        let rl3_h = rl3.clone();
        let sr3 = shutdown_tx.subscribe();
        tokio::spawn(async move { t3.serve(rl3_h, sr3).await });

        let rl1_r = rl1.clone();
        let sr1 = shutdown_tx.subscribe();
        tokio::spawn(async move { rl1_r.run(sr1).await });

        let rl2_r = rl2.clone();
        let sr2r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl2_r.run(sr2r).await });

        let rl3_r = rl3.clone();
        let sr3r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl3_r.run(sr3r).await });

        let rl1_h = rl1.clone();
        let sr1h = shutdown_tx.subscribe();
        tokio::spawn(async move { t1.serve(rl1_h, sr1h).await });

        // Poll until node 1 commits at least the no-op (election done).
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            if rl1.applier.count() >= 1 {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "node 1 should have committed at least the no-op, got {}",
                rl1.applier.count()
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        let (_gid, idx) = rl1.propose(0, b"distributed-cmd".to_vec()).unwrap();
        assert!(idx >= 2);

        // Poll until all nodes replicate the proposed command.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            if rl1.applier.count() >= 2 && rl2.applier.count() >= 1 && rl3.applier.count() >= 1 {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "replication timed out: n1={}, n2={}, n3={}",
                rl1.applier.count(),
                rl2.applier.count(),
                rl3.applier.count()
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        shutdown_tx.send(true).unwrap();
    }
}
689}