nodedb-cluster 0.2.1

Distributed coordination layer for NodeDB — vShards, QUIC transport, and replication
// SPDX-License-Identifier: BUSL-1.1

//! `RaftLoop` struct, constructors, top-level run loop, and thin wrappers
//! over `MultiRaft` proposal APIs. The tick body lives in
//! [`super::tick`]; the inbound-RPC handler lives in
//! [`super::handle_rpc`]; the async join orchestration lives in
//! [`super::join`].

use std::pin::Pin;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

use tracing::debug;

use nodedb_raft::message::LogEntry;

use crate::applied_watcher::GroupAppliedWatchers;
use crate::catalog::ClusterCatalog;
use crate::error::Result;
use crate::forward::{NoopPlanExecutor, PlanExecutor};
use crate::loop_metrics::LoopMetrics;
use crate::metadata_group::applier::{MetadataApplier, NoopMetadataApplier};
use crate::multi_raft::MultiRaft;
use crate::topology::ClusterTopology;
use crate::transport::NexarTransport;

/// Default tick interval (10ms — fast enough for sub-second elections).
///
/// Matches `ClusterTransportTuning::raft_tick_interval_ms` default.
pub(super) const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(10);

/// Callback for applying committed Raft log entries to the state machine.
///
/// Called synchronously during the tick loop. Implementations should be
/// fast: enqueue to an SPSC queue or channel rather than performing I/O
/// directly.
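///
/// A minimal sketch of an implementation that hands committed entries off
/// to a worker task (`ChannelApplier` and its channel are illustrative,
/// not part of this crate):
///
/// ```ignore
/// struct ChannelApplier {
///     tx: tokio::sync::mpsc::UnboundedSender<(u64, u64)>, // (group_id, last index)
/// }
///
/// impl CommitApplier for ChannelApplier {
///     fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64 {
///         let last = entries.last().map(|e| e.index).unwrap_or(0);
///         // Enqueue only; a worker task performs the actual state-machine work.
///         let _ = self.tx.send((group_id, last));
///         last
///     }
/// }
/// ```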
pub trait CommitApplier: Send + Sync + 'static {
    /// Apply committed entries for a Raft group.
    ///
    /// Returns the index of the last successfully applied entry.
    fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64;
}

/// Hook for quarantine integration on the Raft snapshot receive path.
///
/// `nodedb-cluster` cannot depend on `nodedb` (circular), so the host crate
/// (`nodedb`) supplies an implementation backed by its `QuarantineRegistry`.
/// Cluster-only tests leave the field `None`, which skips all quarantine
/// accounting.
///
/// All methods take `(group_id, last_included_index)` as the snapshot identity.
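///
/// A minimal in-memory sketch of the two-strike policy described on the
/// methods below (the production hook in `nodedb` persists strikes via its
/// `QuarantineRegistry`; `InMemoryHook` here is illustrative only):
///
/// ```ignore
/// use std::collections::HashMap;
/// use std::sync::Mutex;
///
/// #[derive(Default)]
/// struct InMemoryHook {
///     strikes: Mutex<HashMap<(u64, u64), u8>>,
/// }
///
/// impl SnapshotQuarantineHook for InMemoryHook {
///     fn is_quarantined(&self, g: u64, i: u64) -> bool {
///         self.strikes.lock().unwrap().get(&(g, i)).copied().unwrap_or(0) >= 2
///     }
///     fn record_success(&self, g: u64, i: u64) {
///         // A good decode clears any transient strike.
///         self.strikes.lock().unwrap().remove(&(g, i));
///     }
///     fn record_failure(&self, g: u64, i: u64, _error: &str) -> bool {
///         let mut s = self.strikes.lock().unwrap();
///         let n = s.entry((g, i)).or_insert(0);
///         *n += 1;
///         *n == 2 // true exactly when this strike quarantines the segment
///     }
/// }
/// ```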
pub trait SnapshotQuarantineHook: Send + Sync + 'static {
    /// Returns `true` if the chunk identified by `(group_id, index)` is
    /// already in the quarantined state and should be rejected immediately
    /// without attempting to decode it.
    fn is_quarantined(&self, group_id: u64, last_included_index: u64) -> bool;

    /// Called after a successful decode — resets the strike counter so a
    /// single transient CRC error is not held against a healthy peer.
    fn record_success(&self, group_id: u64, last_included_index: u64);

    /// Called on a CRC-class decode failure.
    ///
    /// Returns `true` when the segment has just been quarantined (second
    /// consecutive failure), and `false` on the first strike (caller should
    /// surface the framing error and allow the peer to retry).
    fn record_failure(&self, group_id: u64, last_included_index: u64, error: &str) -> bool;
}

/// Type-erased async handler for incoming `VShardEnvelope` messages.
///
/// Receives raw envelope bytes, returns response bytes. Set by the main binary
/// to dispatch to the appropriate engine handler (Event Plane, timeseries, etc.).
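///
/// A sketch of constructing one from an async closure (the echo body is
/// illustrative; a real handler decodes and dispatches the envelope):
///
/// ```ignore
/// let handler: VShardEnvelopeHandler = Arc::new(|bytes: Vec<u8>| {
///     Box::pin(async move {
///         // Decode the envelope and route it here; this sketch echoes it back.
///         Ok(bytes)
///     })
/// });
/// ```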
pub type VShardEnvelopeHandler = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send>>
        + Send
        + Sync,
>;

/// Raft event loop coordinator.
///
/// Owns the MultiRaft state (behind `Arc<Mutex>`) and drives it via periodic
/// ticks. Implements [`crate::transport::RaftRpcHandler`] (in
/// [`super::handle_rpc`]) so it can be passed directly to
/// [`NexarTransport::serve`] for incoming RPC dispatch.
///
/// The `F: RequestForwarder` generic parameter was removed in C-δ.6 when the
/// SQL-string forwarding path was retired. Cross-node SQL routing now goes
/// through `gateway.execute / ExecuteRequest` (C-β path).
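///
/// A construction-and-run sketch, assuming `multi_raft`, `transport`,
/// `topology`, `applier`, and `metadata_applier` are built as in the
/// tests at the bottom of this module:
///
/// ```ignore
/// let raft_loop = Arc::new(
///     RaftLoop::new(multi_raft, Arc::clone(&transport), topology, applier)
///         .with_metadata_applier(metadata_applier),
/// );
/// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
///
/// // Serve inbound RPCs and drive the tick loop as separate tasks.
/// tokio::spawn({
///     let rl = Arc::clone(&raft_loop);
///     let rx = shutdown_tx.subscribe();
///     async move { transport.serve(rl, rx).await }
/// });
/// raft_loop.run(shutdown_rx).await;
/// ```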
pub struct RaftLoop<A: CommitApplier, P: PlanExecutor = NoopPlanExecutor> {
    pub(super) node_id: u64,
    pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
    pub(super) transport: Arc<NexarTransport>,
    pub(super) topology: Arc<RwLock<ClusterTopology>>,
    pub(super) applier: A,
    /// Applies committed entries from the metadata Raft group (group 0).
    pub(super) metadata_applier: Arc<dyn MetadataApplier>,
    /// Executes incoming `ExecuteRequest` RPCs without SQL re-planning.
    pub(super) plan_executor: Arc<P>,
    pub(super) tick_interval: Duration,
    /// Optional handler for incoming VShardEnvelope messages.
    /// Set when the Event Plane or other subsystems need cross-node messaging.
    pub(super) vshard_handler: Option<VShardEnvelopeHandler>,
    /// Optional catalog handle for persisting topology/routing updates
    /// from the join flow. When `None`, persistence is skipped — useful
    /// for unit tests that don't care about durability.
    pub(super) catalog: Option<Arc<ClusterCatalog>>,
    /// Cooperative shutdown signal observed by every detached
    /// `tokio::spawn` task in [`super::tick`]. `run()` flips it on
    /// its own shutdown, and [`Self::begin_shutdown`] provides a
    /// direct entry point for test harnesses that abort the run /
    /// serve handles and need the spawned tasks to drop their
    /// `Arc<Mutex<MultiRaft>>` clones immediately so the per-group
    /// redb log files can release their in-process locks.
    ///
    /// Using `watch::Sender` here rather than a raw `AtomicBool` +
    /// `Notify` pair gives us two properties at once: the latest
    /// value is visible to every newly-subscribed receiver (no
    /// missed-notification race when a new detached task is
    /// spawned just after `begin_shutdown`), and awaiting
    /// `receiver.changed()` is cancellable inside `tokio::select!`.
    pub(super) shutdown_watch: tokio::sync::watch::Sender<bool>,
    /// Standardized loop observations, updated inside `run()` after
    /// every `do_tick`. Obtain a shared handle via [`Self::loop_metrics`]
    /// to register with the cluster metrics registry.
    pub(super) loop_metrics: Arc<LoopMetrics>,
    /// Boot-time readiness signal. Flipped to `true` by
    /// [`super::tick::do_tick`] after the first tick completes phase
    /// 4 (apply committed entries) — i.e. once Raft has driven at
    /// least one no-op or replayed entries past the persisted
    /// applied watermark on this node.
    ///
    /// The host crate's `start_raft` returns `subscribe_ready()` to
    /// `main.rs`, which awaits it before binding any client-facing
    /// listener. This guarantees the first SQL DDL the operator
    /// runs after process start cannot race against an
    /// uninitialized metadata raft group, which previously surfaced
    /// as `metadata propose: not leader` under fast restart loops.
    pub(super) ready_watch: tokio::sync::watch::Sender<bool>,
    /// Per-Raft-group apply watermark watchers. Bumped from
    /// [`super::tick::do_tick`] after applies and from
    /// [`super::handle_rpc`] after snapshot installs. The host crate
    /// shares this Arc with `SharedState` so proposers, lease
    /// renewals, and consistent reads can wait on the apply
    /// watermark of the *specific* group whose proposal they made.
    pub(super) group_watchers: Arc<GroupAppliedWatchers>,
    /// Tracks whether this node was the metadata-group leader on the
    /// previous tick. Used to detect false→true edges so the cluster
    /// epoch (see [`crate::cluster_epoch`]) can be bumped exactly once
    /// per leadership acquisition. `AtomicBool` because [`super::tick::do_tick`]
    /// runs against `&self`.
    pub(super) prev_metadata_leader: std::sync::atomic::AtomicBool,

    /// Optional quarantine hook for the snapshot receive path.
    ///
    /// When set (by the `nodedb` binary via `with_snapshot_quarantine_hook`),
    /// the `InstallSnapshotRequest` handler checks whether the incoming chunk
    /// is already quarantined, records successes to clear transient strikes, and
    /// records consecutive failures to quarantine persistently.
    ///
    /// Cluster-only tests leave this as `None`, which disables all quarantine
    /// accounting for snapshot chunks.
    pub(super) snapshot_quarantine_hook: Option<Arc<dyn SnapshotQuarantineHook>>,

    /// In-progress partial snapshot receives, keyed by `group_id`.
    ///
    /// Each entry tracks the `.partial` file, running CRC, and expected next
    /// byte offset for a follower that is currently receiving a chunked snapshot.
    /// Entries are created on `offset == 0`, updated on each chunk, and removed
    /// on finalization or offset regression.
    pub(super) partial_snapshots: Arc<crate::install_snapshot::PartialSnapshotMap>,

    /// Data directory for persistent partial-snapshot files and the
    /// `recv_snapshots/` subdirectory. `None` in tests that don't exercise
    /// the disk path.
    pub(super) data_dir: Option<std::path::PathBuf>,

    /// Snapshot chunk size for the sender path (bytes).
    pub(super) snapshot_chunk_bytes: u64,

    /// Orphan partial-snapshot max age for the GC sweeper (seconds).
    pub(super) orphan_partial_max_age_secs: u64,
}

impl<A: CommitApplier> RaftLoop<A> {
    pub fn new(
        multi_raft: MultiRaft,
        transport: Arc<NexarTransport>,
        topology: Arc<RwLock<ClusterTopology>>,
        applier: A,
    ) -> Self {
        let node_id = multi_raft.node_id();
        let (shutdown_watch, _) = tokio::sync::watch::channel(false);
        let (ready_watch, _) = tokio::sync::watch::channel(false);
        Self {
            node_id,
            multi_raft: Arc::new(Mutex::new(multi_raft)),
            transport,
            topology,
            applier,
            metadata_applier: Arc::new(NoopMetadataApplier),
            plan_executor: Arc::new(NoopPlanExecutor),
            tick_interval: DEFAULT_TICK_INTERVAL,
            vshard_handler: None,
            catalog: None,
            shutdown_watch,
            ready_watch,
            loop_metrics: LoopMetrics::new("raft_tick_loop"),
            group_watchers: Arc::new(GroupAppliedWatchers::new()),
            prev_metadata_leader: std::sync::atomic::AtomicBool::new(false),
            snapshot_quarantine_hook: None,
            partial_snapshots: Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
            data_dir: None,
            snapshot_chunk_bytes: 4 * 1024 * 1024,
            orphan_partial_max_age_secs: 300,
        }
    }
}

impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
    /// Install the snapshot quarantine hook (mutable setter variant).
    ///
    /// Prefer `with_snapshot_quarantine_hook` on the builder chain unless you
    /// need to set the hook after construction.
    pub fn set_snapshot_quarantine_hook(&mut self, hook: Arc<dyn SnapshotQuarantineHook>) {
        self.snapshot_quarantine_hook = Some(hook);
    }

    /// Install a custom plan executor (for cluster mode — C-β path).
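    ///
    /// This is a type-changing builder step (`RaftLoop<A, P>` becomes
    /// `RaftLoop<A, P2>`), which is why it rebuilds the struct field by
    /// field rather than mutating in place. A usage sketch, where
    /// `MyPlanExecutor` is a hypothetical `PlanExecutor` implementation:
    ///
    /// ```ignore
    /// let raft_loop = RaftLoop::new(mr, transport, topology, applier)
    ///     .with_plan_executor(Arc::new(MyPlanExecutor::new()));
    /// ```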
    pub fn with_plan_executor<P2: PlanExecutor>(self, executor: Arc<P2>) -> RaftLoop<A, P2> {
        RaftLoop {
            node_id: self.node_id,
            multi_raft: self.multi_raft,
            transport: self.transport,
            topology: self.topology,
            applier: self.applier,
            metadata_applier: self.metadata_applier,
            plan_executor: executor,
            tick_interval: self.tick_interval,
            vshard_handler: self.vshard_handler,
            catalog: self.catalog,
            shutdown_watch: self.shutdown_watch,
            ready_watch: self.ready_watch,
            loop_metrics: self.loop_metrics,
            group_watchers: self.group_watchers,
            prev_metadata_leader: self.prev_metadata_leader,
            snapshot_quarantine_hook: self.snapshot_quarantine_hook,
            partial_snapshots: self.partial_snapshots,
            data_dir: self.data_dir,
            snapshot_chunk_bytes: self.snapshot_chunk_bytes,
            orphan_partial_max_age_secs: self.orphan_partial_max_age_secs,
        }
    }

    /// Replace the per-group apply watcher registry.
    ///
    /// The host crate calls this with the same `Arc` it stores on
    /// `SharedState` so proposers and consistent-read paths share
    /// one registry with the tick loop's bump points. Defaults to a
    /// fresh empty registry when not set.
    pub fn with_group_watchers(mut self, watchers: Arc<GroupAppliedWatchers>) -> Self {
        self.group_watchers = watchers;
        self
    }

    /// Attach the snapshot quarantine hook (builder chain variant).
    ///
    /// The supplied implementation is called by the `InstallSnapshotRequest`
    /// handler to check for, record, and short-circuit quarantined chunks.
    pub fn with_snapshot_quarantine_hook(mut self, hook: Arc<dyn SnapshotQuarantineHook>) -> Self {
        self.snapshot_quarantine_hook = Some(hook);
        self
    }

    /// Set the data directory for partial-snapshot persistence and GC.
    ///
    /// When set, the `InstallSnapshotRequest` handler writes chunks to
    /// `<data_dir>/recv_snapshots/<group_id>.partial` and the GC sweeper
    /// removes stale partials on startup. When `None` (the default, used by
    /// unit tests), disk writes are skipped — the receiver operates in-memory
    /// only with empty chunk data.
    pub fn with_data_dir(mut self, data_dir: std::path::PathBuf) -> Self {
        self.data_dir = Some(data_dir);
        self
    }

    /// Override the snapshot chunk byte size (default: 4 MiB).
    pub fn with_snapshot_chunk_bytes(mut self, chunk_bytes: u64) -> Self {
        self.snapshot_chunk_bytes = chunk_bytes;
        self
    }

    /// Override the orphan-partial max age for the GC sweeper (default: 300 s).
    pub fn with_orphan_partial_max_age_secs(mut self, secs: u64) -> Self {
        self.orphan_partial_max_age_secs = secs;
        self
    }

    /// Shared handle to the per-group apply watcher registry.
    pub fn group_watchers(&self) -> Arc<GroupAppliedWatchers> {
        Arc::clone(&self.group_watchers)
    }

    /// Shared handle to this loop's standardized metrics.
    pub fn loop_metrics(&self) -> Arc<LoopMetrics> {
        Arc::clone(&self.loop_metrics)
    }

    /// Count of Raft groups currently mounted on this node — used to
    /// render the `raft_tick_loop_pending_groups` gauge.
    pub fn pending_groups(&self) -> usize {
        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.group_count()
    }

    /// Signal cooperative shutdown to every detached task spawned
    /// inside [`super::tick::do_tick`].
    ///
    /// This is the entry point for test harnesses that want to
    /// tear down a `RaftLoop` without waiting for the external
    /// `run()` shutdown watch channel to propagate. In production
    /// the same signal is emitted automatically by `run()` when
    /// its external shutdown receiver fires.
    ///
    /// Idempotent: calling this multiple times is a no-op after
    /// the first.
    pub fn begin_shutdown(&self) {
        let _ = self.shutdown_watch.send(true);
    }

    /// Subscribe to the boot-time readiness signal.
    ///
    /// The returned receiver starts at `false` and flips to `true`
    /// exactly once, after the first [`super::tick::do_tick`]
    /// completes phase 4 (apply committed entries). Used by the
    /// host crate to gate client-facing listener startup until the
    /// metadata raft group has produced its first applied entry.
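    ///
    /// A usage sketch mirroring the host crate's startup gate:
    ///
    /// ```ignore
    /// let mut ready = raft_loop.subscribe_ready();
    /// while !*ready.borrow() {
    ///     // Err here means the RaftLoop (sender side) was dropped.
    ///     ready.changed().await.expect("raft loop dropped before ready");
    /// }
    /// // Safe to bind client-facing listeners now.
    /// ```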
    pub fn subscribe_ready(&self) -> tokio::sync::watch::Receiver<bool> {
        self.ready_watch.subscribe()
    }

    /// Set a handler for incoming VShardEnvelope messages.
    pub fn with_vshard_handler(mut self, handler: VShardEnvelopeHandler) -> Self {
        self.vshard_handler = Some(handler);
        self
    }

    /// Install the metadata applier used for group-0 commits.
    ///
    /// The host crate (nodedb) calls this with a production applier that
    /// wraps an in-memory `MetadataCache`, persists to redb, and
    /// broadcasts catalog change events. The default
    /// [`NoopMetadataApplier`] is kept only for tests that don't exercise
    /// metadata application.
    pub fn with_metadata_applier(mut self, applier: Arc<dyn MetadataApplier>) -> Self {
        self.metadata_applier = applier;
        self
    }

    /// Override the tick interval (default: [`DEFAULT_TICK_INTERVAL`], 10 ms).
    pub fn with_tick_interval(mut self, interval: Duration) -> Self {
        self.tick_interval = interval;
        self
    }

    /// Attach a cluster catalog — used by the join flow to persist the
    /// updated topology + routing after a conf-change commits.
    pub fn with_catalog(mut self, catalog: Arc<ClusterCatalog>) -> Self {
        // Seed the local cluster-epoch high-water mark from the catalog
        // on attach, so the value emitted in the very first outbound
        // RPC reflects whatever the previous incarnation persisted. A
        // failure to load is treated as a fresh catalog (epoch 0); a
        // genuine catalog read error would have surfaced from earlier
        // catalog operations on this same handle.
        if let Err(e) = crate::cluster_epoch::init_local_cluster_epoch_from_catalog(&catalog) {
            tracing::warn!(error = %e, "failed to load persisted cluster_epoch; defaulting to 0");
        }
        self.catalog = Some(catalog);
        self
    }

    /// This node's id (exposed for handlers and tests).
    pub fn node_id(&self) -> u64 {
        self.node_id
    }

    /// Run the event loop until shutdown.
    ///
    /// This drives Raft elections, heartbeats, and message dispatch.
    /// Call [`NexarTransport::serve`] separately with `Arc<Self>` as the handler.
    ///
    /// When the externally-supplied `shutdown` receiver fires,
    /// the loop also propagates the signal to the internal
    /// cooperative-shutdown channel so every detached task
    /// spawned inside `do_tick` exits promptly and drops its
    /// `Arc<Mutex<MultiRaft>>` clone.
    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
        let mut interval = tokio::time::interval(self.tick_interval);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        self.loop_metrics.set_up(true);

        // Startup GC sweep: remove orphaned partial-snapshot files from
        // previous runs that did not complete.
        if let Some(ref dir) = self.data_dir {
            match crate::install_snapshot::gc::sweep_orphans(dir, self.orphan_partial_max_age_secs)
            {
                Ok((removed, errs)) => {
                    if removed > 0 {
                        tracing::info!(removed, "startup: removed orphaned partial snapshot files");
                    }
                    for e in errs {
                        tracing::warn!(error = %e, "startup: partial snapshot GC error");
                    }
                }
                Err(e) => {
                    tracing::warn!(error = %e, "startup: failed to sweep partial snapshot directory");
                }
            }
        }

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let started = Instant::now();
                    self.do_tick();
                    self.loop_metrics.observe(started.elapsed());
                }
                _ = shutdown.changed() => {
                    if *shutdown.borrow() {
                        debug!("raft loop shutting down");
                        self.begin_shutdown();
                        break;
                    }
                }
            }
        }
        self.loop_metrics.set_up(false);
    }

    /// Returns the inner multi-raft handle. Exposed for tests and for
    /// the host crate's metadata proposer so it can hold a second
    /// reference to the same underlying mutex without pulling the
    /// whole raft loop into the caller's lifetime.
    pub fn multi_raft_handle(&self) -> Arc<Mutex<crate::multi_raft::MultiRaft>> {
        self.multi_raft.clone()
    }

    /// Snapshot all Raft group states for observability (SHOW RAFT GROUPS).
    pub fn group_statuses(&self) -> Vec<crate::multi_raft::GroupStatus> {
        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.group_statuses()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::RoutingTable;
    use nodedb_types::config::tuning::ClusterTransportTuning;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    /// Test applier that counts applied entries across both data and
    /// metadata groups. The metadata-group variant ([`CountingMetadataApplier`])
    /// increments the same counter so tests that propose against group 0
    /// (the metadata group) still see the count move.
    pub(crate) struct CountingApplier {
        applied: Arc<AtomicU64>,
    }

    impl CountingApplier {
        pub(crate) fn new() -> Self {
            Self {
                applied: Arc::new(AtomicU64::new(0)),
            }
        }

        pub(crate) fn count(&self) -> u64 {
            self.applied.load(Ordering::Relaxed)
        }

        pub(crate) fn metadata_applier(&self) -> Arc<CountingMetadataApplier> {
            Arc::new(CountingMetadataApplier {
                applied: self.applied.clone(),
            })
        }
    }

    impl CommitApplier for CountingApplier {
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|e| e.index).unwrap_or(0)
        }
    }

    pub(crate) struct CountingMetadataApplier {
        applied: Arc<AtomicU64>,
    }

    impl MetadataApplier for CountingMetadataApplier {
        fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|(idx, _)| *idx).unwrap_or(0)
        }
    }

    /// Helper: create a transport on an ephemeral port.
    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(
            NexarTransport::new(
                node_id,
                "127.0.0.1:0".parse().unwrap(),
                crate::transport::credentials::TransportCredentials::Insecure,
            )
            .unwrap(),
        )
    }

    #[tokio::test]
    async fn single_node_raft_loop_commits() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        // uniform(1, ...) creates metadata group 0 + data group 1.
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        // Add both the metadata group (0) and the data group (1).
        mr.add_group(0, vec![]).unwrap();
        mr.add_group(1, vec![]).unwrap();

        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let applier = CountingApplier::new();
        let meta = applier.metadata_applier();
        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop =
            Arc::new(RaftLoop::new(mr, transport, topo, applier).with_metadata_applier(meta));

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let rl = raft_loop.clone();
        let run_handle = tokio::spawn(async move {
            rl.run(shutdown_rx).await;
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 1,
            "expected at least 1 applied entry (no-op), got {}",
            raft_loop.applier.count()
        );

        // vshard 0 maps to data group 1 (not metadata group 0).
        let (_gid, idx) = raft_loop.propose(0, b"hello".to_vec()).unwrap();
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 2,
            "expected at least 2 applied entries, got {}",
            raft_loop.applier.count()
        );

        shutdown_tx.send(true).unwrap();
        run_handle.abort();
    }

    #[tokio::test]
    async fn three_node_election_over_quic() {
        let t1 = make_transport(1);
        let t2 = make_transport(2);
        let t3 = make_transport(3);

        t1.register_peer(2, t2.local_addr());
        t1.register_peer(3, t3.local_addr());
        t2.register_peer(1, t1.local_addr());
        t2.register_peer(3, t3.local_addr());
        t3.register_peer(1, t1.local_addr());
        t3.register_peer(2, t2.local_addr());

        // uniform(1, ...) creates metadata group 0 + data group 1.
        // Both are added to every MultiRaft so vshard proposals (group 1)
        // and metadata proposals (group 0) both work.
        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);

        let dir1 = tempfile::tempdir().unwrap();
        let mut mr1 = MultiRaft::new(1, rt.clone(), dir1.path().to_path_buf());
        mr1.add_group(0, vec![2, 3]).unwrap();
        mr1.add_group(1, vec![2, 3]).unwrap();
        for node in mr1.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let transport_tuning = ClusterTransportTuning::default();
        let election_timeout_min =
            Duration::from_millis(transport_tuning.effective_election_timeout_min_ms());
        let election_timeout_max =
            Duration::from_millis(transport_tuning.effective_election_timeout_max_ms());

        let dir2 = tempfile::tempdir().unwrap();
        let mut mr2 = MultiRaft::new(2, rt.clone(), dir2.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr2.add_group(0, vec![1, 3]).unwrap();
        mr2.add_group(1, vec![1, 3]).unwrap();

        let dir3 = tempfile::tempdir().unwrap();
        let mut mr3 = MultiRaft::new(3, rt.clone(), dir3.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr3.add_group(0, vec![1, 2]).unwrap();
        mr3.add_group(1, vec![1, 2]).unwrap();

        let a1 = CountingApplier::new();
        let m1 = a1.metadata_applier();
        let a2 = CountingApplier::new();
        let m2 = a2.metadata_applier();
        let a3 = CountingApplier::new();
        let m3 = a3.metadata_applier();

        let topo1 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo2 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo3 = Arc::new(RwLock::new(ClusterTopology::new()));

        let rl1 = Arc::new(RaftLoop::new(mr1, t1.clone(), topo1, a1).with_metadata_applier(m1));
        let rl2 = Arc::new(RaftLoop::new(mr2, t2.clone(), topo2, a2).with_metadata_applier(m2));
        let rl3 = Arc::new(RaftLoop::new(mr3, t3.clone(), topo3, a3).with_metadata_applier(m3));

        let (shutdown_tx, _) = tokio::sync::watch::channel(false);

        let rl2_h = rl2.clone();
        let sr2 = shutdown_tx.subscribe();
        tokio::spawn(async move { t2.serve(rl2_h, sr2).await });

        let rl3_h = rl3.clone();
        let sr3 = shutdown_tx.subscribe();
        tokio::spawn(async move { t3.serve(rl3_h, sr3).await });

        let rl1_r = rl1.clone();
        let sr1 = shutdown_tx.subscribe();
        tokio::spawn(async move { rl1_r.run(sr1).await });

        let rl2_r = rl2.clone();
        let sr2r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl2_r.run(sr2r).await });

        let rl3_r = rl3.clone();
        let sr3r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl3_r.run(sr3r).await });

        let rl1_h = rl1.clone();
        let sr1h = shutdown_tx.subscribe();
        tokio::spawn(async move { t1.serve(rl1_h, sr1h).await });

        // Poll until node 1 commits at least the no-op (election done).
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            if rl1.applier.count() >= 1 {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "node 1 should have committed at least the no-op, got {}",
                rl1.applier.count()
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        let (_gid, idx) = rl1.propose(0, b"distributed-cmd".to_vec()).unwrap();
        assert!(idx >= 2);

        // Poll until all nodes replicate the proposed command.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            if rl1.applier.count() >= 2 && rl2.applier.count() >= 1 && rl3.applier.count() >= 1 {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "replication timed out: n1={}, n2={}, n3={}",
                rl1.applier.count(),
                rl2.applier.count(),
                rl3.applier.count()
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        shutdown_tx.send(true).unwrap();
    }
}