use std::pin::Pin;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant};

use tracing::debug;

use nodedb_raft::message::LogEntry;

use crate::catalog::ClusterCatalog;
use crate::conf_change::ConfChange;
use crate::error::Result;
use crate::forward::{NoopPlanExecutor, PlanExecutor};
use crate::loop_metrics::LoopMetrics;
use crate::metadata_group::applier::{MetadataApplier, NoopMetadataApplier};
use crate::multi_raft::MultiRaft;
use crate::topology::ClusterTopology;
use crate::transport::NexarTransport;

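/// Default interval between raft tick iterations.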
pub(super) const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(10);

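/// Applies entries committed by a raft group to the host state machine.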
pub trait CommitApplier: Send + Sync + 'static {
    /// Applies the committed `entries` for `group_id` and returns the highest
    /// log index that was applied.
    fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64;
}

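/// Shared async callback that receives an encoded vshard envelope and resolves
/// to the encoded response bytes.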
pub type VShardEnvelopeHandler = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send>>
        + Send
        + Sync,
>;

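/// Drives the node's raft groups: ticks them on a fixed interval and hands
/// committed entries to the configured appliers.
///
/// A minimal usage sketch (construction of the dependencies is assumed to
/// happen elsewhere):
///
/// ```ignore
/// let raft_loop = RaftLoop::new(multi_raft, transport, topology, applier)
///     .with_tick_interval(Duration::from_millis(5));
/// let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
/// raft_loop.run(shutdown_rx).await;
/// ```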
pub struct RaftLoop<A: CommitApplier, P: PlanExecutor = NoopPlanExecutor> {
    /// Id of the local node.
    pub(super) node_id: u64,
    /// All raft groups hosted by this node.
    pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
    /// Transport used to reach peer nodes.
    pub(super) transport: Arc<NexarTransport>,
    /// Shared view of the cluster topology.
    pub(super) topology: Arc<RwLock<ClusterTopology>>,
    /// Applier for entries committed by data groups.
    pub(super) applier: A,
    /// Applier for entries committed by the metadata group.
    pub(super) metadata_applier: Arc<dyn MetadataApplier>,
    /// Executor for forwarded plans.
    pub(super) plan_executor: Arc<P>,
    /// Interval between tick iterations.
    pub(super) tick_interval: Duration,
    /// Optional handler for forwarded vshard envelopes.
    pub(super) vshard_handler: Option<VShardEnvelopeHandler>,
    /// Optional cluster catalog.
    pub(super) catalog: Option<Arc<ClusterCatalog>>,
    /// Broadcasts `true` when the loop begins shutting down.
    pub(super) shutdown_watch: tokio::sync::watch::Sender<bool>,
    /// Latency and liveness metrics for the tick loop.
    pub(super) loop_metrics: Arc<LoopMetrics>,
    /// Broadcasts loop readiness to subscribers.
    pub(super) ready_watch: tokio::sync::watch::Sender<bool>,
}

impl<A: CommitApplier> RaftLoop<A> {
    /// Creates a loop over `multi_raft` with a no-op metadata applier, a no-op
    /// plan executor, and the default tick interval.
    pub fn new(
        multi_raft: MultiRaft,
        transport: Arc<NexarTransport>,
        topology: Arc<RwLock<ClusterTopology>>,
        applier: A,
    ) -> Self {
        let node_id = multi_raft.node_id();
        let (shutdown_watch, _) = tokio::sync::watch::channel(false);
        let (ready_watch, _) = tokio::sync::watch::channel(false);
        Self {
            node_id,
            multi_raft: Arc::new(Mutex::new(multi_raft)),
            transport,
            topology,
            applier,
            metadata_applier: Arc::new(NoopMetadataApplier),
            plan_executor: Arc::new(NoopPlanExecutor),
            tick_interval: DEFAULT_TICK_INTERVAL,
            vshard_handler: None,
            catalog: None,
            shutdown_watch,
            ready_watch,
            loop_metrics: LoopMetrics::new("raft_tick_loop"),
        }
    }
}

impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
    /// Rebuilds the loop with `executor` as the plan executor.
    pub fn with_plan_executor<P2: PlanExecutor>(self, executor: Arc<P2>) -> RaftLoop<A, P2> {
        RaftLoop {
            node_id: self.node_id,
            multi_raft: self.multi_raft,
            transport: self.transport,
            topology: self.topology,
            applier: self.applier,
            metadata_applier: self.metadata_applier,
            plan_executor: executor,
            tick_interval: self.tick_interval,
            vshard_handler: self.vshard_handler,
            catalog: self.catalog,
            shutdown_watch: self.shutdown_watch,
            ready_watch: self.ready_watch,
            loop_metrics: self.loop_metrics,
        }
    }

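    /// Returns a shared handle to the tick loop's metrics.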
    pub fn loop_metrics(&self) -> Arc<LoopMetrics> {
        Arc::clone(&self.loop_metrics)
    }

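    /// Returns the number of raft groups currently registered with the
    /// underlying `MultiRaft`.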
    pub fn pending_groups(&self) -> usize {
        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.group_count()
    }

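    /// Broadcasts the shutdown signal to every task watching `shutdown_watch`.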
    pub fn begin_shutdown(&self) {
        let _ = self.shutdown_watch.send(true);
    }

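    /// Subscribes to the readiness watch channel.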
    pub fn subscribe_ready(&self) -> tokio::sync::watch::Receiver<bool> {
        self.ready_watch.subscribe()
    }

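    /// Installs the handler invoked for forwarded vshard envelopes.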
    pub fn with_vshard_handler(mut self, handler: VShardEnvelopeHandler) -> Self {
        self.vshard_handler = Some(handler);
        self
    }

    /// Replaces the default no-op metadata applier.
    pub fn with_metadata_applier(mut self, applier: Arc<dyn MetadataApplier>) -> Self {
        self.metadata_applier = applier;
        self
    }

    /// Overrides the default tick interval.
    pub fn with_tick_interval(mut self, interval: Duration) -> Self {
        self.tick_interval = interval;
        self
    }

    /// Attaches the cluster catalog.
    pub fn with_catalog(mut self, catalog: Arc<ClusterCatalog>) -> Self {
        self.catalog = Some(catalog);
        self
    }

    /// Returns the id of the local node.
    pub fn node_id(&self) -> u64 {
        self.node_id
    }

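    /// Runs the tick loop until `shutdown` observes `true`. Each iteration
    /// ticks every group and records the elapsed time in the loop metrics;
    /// shutdown is also propagated through `begin_shutdown`.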
    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
        let mut interval = tokio::time::interval(self.tick_interval);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        self.loop_metrics.set_up(true);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let started = Instant::now();
                    self.do_tick();
                    self.loop_metrics.observe(started.elapsed());
                }
                _ = shutdown.changed() => {
                    if *shutdown.borrow() {
                        debug!("raft loop shutting down");
                        self.begin_shutdown();
                        break;
                    }
                }
            }
        }
        self.loop_metrics.set_up(false);
    }

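    /// Proposes `data` to the raft group that owns `vshard_id`, returning the
    /// `(group_id, log_index)` pair on success.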
    pub fn propose(&self, vshard_id: u16, data: Vec<u8>) -> Result<(u64, u64)> {
        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.propose(vshard_id, data)
    }

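    /// Proposes `data` to the metadata group on this node, returning the log
    /// index of the appended entry.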
    pub fn propose_to_metadata_group(&self, data: Vec<u8>) -> Result<u64> {
        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.propose_to_group(crate::metadata_group::METADATA_GROUP_ID, data)
    }

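    /// Proposes `data` to the metadata group, forwarding to the current leader
    /// when this node is not the leader. Fails with `NotLeader` when no leader
    /// is known or when the hint points back at this node.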
    pub async fn propose_to_metadata_group_via_leader(&self, data: Vec<u8>) -> Result<u64> {
        match self.propose_to_metadata_group(data.clone()) {
            Ok(idx) => Ok(idx),
            Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
                leader_hint,
            })) => {
                let Some(leader_id) = leader_hint else {
                    return Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader { leader_hint: None },
                    ));
                };
                if leader_id == self.node_id {
                    return Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader {
                            leader_hint: Some(leader_id),
                        },
                    ));
                }
                self.forward_metadata_propose(leader_id, data).await
            }
            Err(other) => Err(other),
        }
    }

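    /// Sends a `MetadataProposeRequest` to `leader_id` over the transport and
    /// translates the response into a log index or an error.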
    async fn forward_metadata_propose(&self, leader_id: u64, data: Vec<u8>) -> Result<u64> {
        // Resolve and register the leader's address inside a block so the
        // topology read guard is dropped before the await below.
        {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            let Some(node) = topo.get_node(leader_id) else {
                return Err(crate::error::ClusterError::Transport {
                    detail: format!(
                        "metadata propose forward: leader {leader_id} not in local topology"
                    ),
                });
            };
            let Some(addr) = node.socket_addr() else {
                return Err(crate::error::ClusterError::Transport {
                    detail: format!(
                        "metadata propose forward: leader {leader_id} has unparseable addr {:?}",
                        node.addr
                    ),
                });
            };
            self.transport.register_peer(leader_id, addr);
        }

        let req = crate::rpc_codec::RaftRpc::MetadataProposeRequest(
            crate::rpc_codec::MetadataProposeRequest { bytes: data },
        );
        let resp = self.transport.send_rpc(leader_id, req).await?;
        match resp {
            crate::rpc_codec::RaftRpc::MetadataProposeResponse(r) => {
                if r.success {
                    Ok(r.log_index)
                } else if let Some(hint) = r.leader_hint {
                    Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader {
                            leader_hint: Some(hint),
                        },
                    ))
                } else {
                    Err(crate::error::ClusterError::Transport {
                        detail: format!("metadata propose forward failed: {}", r.error_message),
                    })
                }
            }
            other => Err(crate::error::ClusterError::Transport {
                detail: format!("metadata propose forward: unexpected response variant {other:?}"),
            }),
        }
    }

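    /// Returns a clone of the shared `MultiRaft` handle.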
    pub fn multi_raft_handle(&self) -> Arc<Mutex<crate::multi_raft::MultiRaft>> {
        self.multi_raft.clone()
    }

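    /// Returns the current status of every local raft group.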
    pub fn group_statuses(&self) -> Vec<crate::multi_raft::GroupStatus> {
        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.group_statuses()
    }

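    /// Proposes a membership change to `group_id`.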
    pub fn propose_conf_change(&self, group_id: u64, change: &ConfChange) -> Result<(u64, u64)> {
        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.propose_conf_change(group_id, change)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::RoutingTable;
    use nodedb_types::config::tuning::ClusterTransportTuning;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

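    /// Test applier that counts every committed entry it is handed.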
    pub(crate) struct CountingApplier {
        applied: Arc<AtomicU64>,
    }

    impl CountingApplier {
        pub(crate) fn new() -> Self {
            Self {
                applied: Arc::new(AtomicU64::new(0)),
            }
        }

        pub(crate) fn count(&self) -> u64 {
            self.applied.load(Ordering::Relaxed)
        }

        pub(crate) fn metadata_applier(&self) -> Arc<CountingMetadataApplier> {
            Arc::new(CountingMetadataApplier {
                applied: self.applied.clone(),
            })
        }
    }

    impl CommitApplier for CountingApplier {
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|e| e.index).unwrap_or(0)
        }
    }

    pub(crate) struct CountingMetadataApplier {
        applied: Arc<AtomicU64>,
    }

    impl MetadataApplier for CountingMetadataApplier {
        fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|(idx, _)| *idx).unwrap_or(0)
        }
    }

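    /// Builds an insecure transport bound to an ephemeral localhost port.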
    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(
            NexarTransport::new(
                node_id,
                "127.0.0.1:0".parse().unwrap(),
                crate::transport::credentials::TransportCredentials::Insecure,
            )
            .unwrap(),
        )
    }

    #[tokio::test]
    async fn single_node_raft_loop_commits() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        // Force an immediate election so the single node becomes leader right away.
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let applier = CountingApplier::new();
        let meta = applier.metadata_applier();
        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop =
            Arc::new(RaftLoop::new(mr, transport, topo, applier).with_metadata_applier(meta));

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let rl = raft_loop.clone();
        let run_handle = tokio::spawn(async move {
            rl.run(shutdown_rx).await;
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        // The leader's election no-op should have been applied by now.
        assert!(
            raft_loop.applier.count() >= 1,
            "expected at least 1 applied entry (no-op), got {}",
            raft_loop.applier.count()
        );

        let (_gid, idx) = raft_loop.propose(0, b"hello".to_vec()).unwrap();
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 2,
            "expected at least 2 applied entries, got {}",
            raft_loop.applier.count()
        );

        shutdown_tx.send(true).unwrap();
        run_handle.abort();
    }

    #[tokio::test]
    async fn three_node_election_over_quic() {
        let t1 = make_transport(1);
        let t2 = make_transport(2);
        let t3 = make_transport(3);

        // Wire the three transports together.
        t1.register_peer(2, t2.local_addr());
        t1.register_peer(3, t3.local_addr());
        t2.register_peer(1, t1.local_addr());
        t2.register_peer(3, t3.local_addr());
        t3.register_peer(1, t1.local_addr());
        t3.register_peer(2, t2.local_addr());

        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);

        // Only node 1 has its election deadline forced into the past, so it
        // campaigns immediately.
        let dir1 = tempfile::tempdir().unwrap();
        let mut mr1 = MultiRaft::new(1, rt.clone(), dir1.path().to_path_buf());
        mr1.add_group(0, vec![2, 3]).unwrap();
        for node in mr1.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        // Nodes 2 and 3 keep the default election timeouts from the transport tuning.
        let transport_tuning = ClusterTransportTuning::default();
        let election_timeout_min = Duration::from_secs(transport_tuning.election_timeout_min_secs);
        let election_timeout_max = Duration::from_secs(transport_tuning.election_timeout_max_secs);

        let dir2 = tempfile::tempdir().unwrap();
        let mut mr2 = MultiRaft::new(2, rt.clone(), dir2.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr2.add_group(0, vec![1, 3]).unwrap();

        let dir3 = tempfile::tempdir().unwrap();
        let mut mr3 = MultiRaft::new(3, rt.clone(), dir3.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr3.add_group(0, vec![1, 2]).unwrap();

        let a1 = CountingApplier::new();
        let m1 = a1.metadata_applier();
        let a2 = CountingApplier::new();
        let m2 = a2.metadata_applier();
        let a3 = CountingApplier::new();
        let m3 = a3.metadata_applier();

        let topo1 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo2 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo3 = Arc::new(RwLock::new(ClusterTopology::new()));

        let rl1 = Arc::new(RaftLoop::new(mr1, t1.clone(), topo1, a1).with_metadata_applier(m1));
        let rl2 = Arc::new(RaftLoop::new(mr2, t2.clone(), topo2, a2).with_metadata_applier(m2));
        let rl3 = Arc::new(RaftLoop::new(mr3, t3.clone(), topo3, a3).with_metadata_applier(m3));

        let (shutdown_tx, _) = tokio::sync::watch::channel(false);

        // Spawn the transport servers and tick loops for all three nodes.
        let rl2_h = rl2.clone();
        let sr2 = shutdown_tx.subscribe();
        tokio::spawn(async move { t2.serve(rl2_h, sr2).await });

        let rl3_h = rl3.clone();
        let sr3 = shutdown_tx.subscribe();
        tokio::spawn(async move { t3.serve(rl3_h, sr3).await });

        let rl1_r = rl1.clone();
        let sr1 = shutdown_tx.subscribe();
        tokio::spawn(async move { rl1_r.run(sr1).await });

        let rl2_r = rl2.clone();
        let sr2r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl2_r.run(sr2r).await });

        let rl3_r = rl3.clone();
        let sr3r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl3_r.run(sr3r).await });

        let rl1_h = rl1.clone();
        let sr1h = shutdown_tx.subscribe();
        tokio::spawn(async move { t1.serve(rl1_h, sr1h).await });

        // Wait until node 1 commits its election no-op.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            if rl1.applier.count() >= 1 {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "node 1 should have committed at least the no-op, got {}",
                rl1.applier.count()
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        let (_gid, idx) = rl1.propose(0, b"distributed-cmd".to_vec()).unwrap();
        assert!(idx >= 2);

        // Wait for the proposal to replicate to all three nodes.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        loop {
            if rl1.applier.count() >= 2 && rl2.applier.count() >= 1 && rl3.applier.count() >= 1 {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "replication timed out: n1={}, n2={}, n3={}",
                rl1.applier.count(),
                rl2.applier.count(),
                rl3.applier.count()
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        shutdown_tx.send(true).unwrap();
    }
}