1use std::pin::Pin;
8use std::sync::{Arc, Mutex, RwLock};
9use std::time::Duration;
10
11use tracing::debug;
12
13use nodedb_raft::message::LogEntry;
14
15use crate::catalog::ClusterCatalog;
16use crate::conf_change::ConfChange;
17use crate::error::Result;
18use crate::forward::{NoopPlanExecutor, PlanExecutor};
19use crate::metadata_group::applier::{MetadataApplier, NoopMetadataApplier};
20use crate::multi_raft::MultiRaft;
21use crate::topology::ClusterTopology;
22use crate::transport::NexarTransport;
23
/// Default cadence at which `RaftLoop::run` drives raft ticks (10 ms);
/// override per-loop with `with_tick_interval`.
pub(super) const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(10);
28
/// Callback invoked by the raft loop with newly committed entries for a
/// data raft group. Implementations must be shareable across threads.
pub trait CommitApplier: Send + Sync + 'static {
    /// Applies the committed `entries` belonging to `group_id` and returns
    /// the applied log index (implementations in this file return the last
    /// entry's index, or 0 for an empty batch).
    fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64;
}
39
/// Shared async callback for handling an incoming vshard envelope: takes the
/// raw request bytes and resolves to the raw response bytes. Boxed + pinned
/// so it can be stored and invoked from any task.
pub type VShardEnvelopeHandler = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send>>
        + Send
        + Sync,
>;
49
/// Drives one node's raft groups: periodic ticks, proposal routing, and
/// leader forwarding. `A` applies committed data-group entries; `P` executes
/// forwarded plans (defaults to a no-op executor).
pub struct RaftLoop<A: CommitApplier, P: PlanExecutor = NoopPlanExecutor> {
    // This node's raft id, captured from `MultiRaft` at construction.
    pub(super) node_id: u64,
    // All raft groups hosted on this node; locked per operation.
    pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
    // Peer transport used for RPCs and leader forwarding.
    pub(super) transport: Arc<NexarTransport>,
    // Cluster membership/addressing, consulted when forwarding to a leader.
    pub(super) topology: Arc<RwLock<ClusterTopology>>,
    // Applies committed data-group entries.
    pub(super) applier: A,
    // Applies committed metadata-group entries (no-op by default).
    pub(super) metadata_applier: Arc<dyn MetadataApplier>,
    // Executes forwarded plans (no-op by default).
    pub(super) plan_executor: Arc<P>,
    // Tick cadence for `run` (defaults to DEFAULT_TICK_INTERVAL).
    pub(super) tick_interval: Duration,
    // Optional async handler for incoming vshard envelopes.
    pub(super) vshard_handler: Option<VShardEnvelopeHandler>,
    // Optional cluster catalog attachment.
    pub(super) catalog: Option<Arc<ClusterCatalog>>,
    // Broadcast flipped to `true` by `begin_shutdown`.
    pub(super) shutdown_watch: tokio::sync::watch::Sender<bool>,
    // Broadcast flipped to `true` when the loop becomes ready; see
    // `subscribe_ready`.
    pub(super) ready_watch: tokio::sync::watch::Sender<bool>,
}
107
108impl<A: CommitApplier> RaftLoop<A> {
109 pub fn new(
110 multi_raft: MultiRaft,
111 transport: Arc<NexarTransport>,
112 topology: Arc<RwLock<ClusterTopology>>,
113 applier: A,
114 ) -> Self {
115 let node_id = multi_raft.node_id();
116 let (shutdown_watch, _) = tokio::sync::watch::channel(false);
117 let (ready_watch, _) = tokio::sync::watch::channel(false);
118 Self {
119 node_id,
120 multi_raft: Arc::new(Mutex::new(multi_raft)),
121 transport,
122 topology,
123 applier,
124 metadata_applier: Arc::new(NoopMetadataApplier),
125 plan_executor: Arc::new(NoopPlanExecutor),
126 tick_interval: DEFAULT_TICK_INTERVAL,
127 vshard_handler: None,
128 catalog: None,
129 shutdown_watch,
130 ready_watch,
131 }
132 }
133}
134
135impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
136 pub fn with_plan_executor<P2: PlanExecutor>(self, executor: Arc<P2>) -> RaftLoop<A, P2> {
138 RaftLoop {
139 node_id: self.node_id,
140 multi_raft: self.multi_raft,
141 transport: self.transport,
142 topology: self.topology,
143 applier: self.applier,
144 metadata_applier: self.metadata_applier,
145 plan_executor: executor,
146 tick_interval: self.tick_interval,
147 vshard_handler: self.vshard_handler,
148 catalog: self.catalog,
149 shutdown_watch: self.shutdown_watch,
150 ready_watch: self.ready_watch,
151 }
152 }
153
154 pub fn begin_shutdown(&self) {
166 let _ = self.shutdown_watch.send(true);
167 }
168
169 pub fn subscribe_ready(&self) -> tokio::sync::watch::Receiver<bool> {
177 self.ready_watch.subscribe()
178 }
179
180 pub fn with_vshard_handler(mut self, handler: VShardEnvelopeHandler) -> Self {
182 self.vshard_handler = Some(handler);
183 self
184 }
185
186 pub fn with_metadata_applier(mut self, applier: Arc<dyn MetadataApplier>) -> Self {
193 self.metadata_applier = applier;
194 self
195 }
196
197 pub fn with_tick_interval(mut self, interval: Duration) -> Self {
198 self.tick_interval = interval;
199 self
200 }
201
202 pub fn with_catalog(mut self, catalog: Arc<ClusterCatalog>) -> Self {
205 self.catalog = Some(catalog);
206 self
207 }
208
209 pub fn node_id(&self) -> u64 {
211 self.node_id
212 }
213
214 pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
225 let mut interval = tokio::time::interval(self.tick_interval);
226 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
227
228 loop {
229 tokio::select! {
230 _ = interval.tick() => {
231 self.do_tick();
232 }
233 _ = shutdown.changed() => {
234 if *shutdown.borrow() {
235 debug!("raft loop shutting down");
236 self.begin_shutdown();
237 break;
238 }
239 }
240 }
241 }
242 }
243
244 pub fn propose(&self, vshard_id: u16, data: Vec<u8>) -> Result<(u64, u64)> {
248 let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
249 mr.propose(vshard_id, data)
250 }
251
252 pub fn propose_to_metadata_group(&self, data: Vec<u8>) -> Result<u64> {
261 let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
262 mr.propose_to_group(crate::metadata_group::METADATA_GROUP_ID, data)
263 }
264
265 pub async fn propose_to_metadata_group_via_leader(&self, data: Vec<u8>) -> Result<u64> {
286 match self.propose_to_metadata_group(data.clone()) {
288 Ok(idx) => Ok(idx),
289 Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
290 leader_hint,
291 })) => {
292 let Some(leader_id) = leader_hint else {
293 return Err(crate::error::ClusterError::Raft(
294 nodedb_raft::RaftError::NotLeader { leader_hint: None },
295 ));
296 };
297 if leader_id == self.node_id {
298 return Err(crate::error::ClusterError::Raft(
303 nodedb_raft::RaftError::NotLeader {
304 leader_hint: Some(leader_id),
305 },
306 ));
307 }
308 self.forward_metadata_propose(leader_id, data).await
310 }
311 Err(other) => Err(other),
312 }
313 }
314
315 async fn forward_metadata_propose(&self, leader_id: u64, data: Vec<u8>) -> Result<u64> {
319 {
326 let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
327 let Some(node) = topo.get_node(leader_id) else {
328 return Err(crate::error::ClusterError::Transport {
329 detail: format!(
330 "metadata propose forward: leader {leader_id} not in local topology"
331 ),
332 });
333 };
334 let Some(addr) = node.socket_addr() else {
335 return Err(crate::error::ClusterError::Transport {
336 detail: format!(
337 "metadata propose forward: leader {leader_id} has unparseable addr {:?}",
338 node.addr
339 ),
340 });
341 };
342 self.transport.register_peer(leader_id, addr);
344 }
345
346 let req = crate::rpc_codec::RaftRpc::MetadataProposeRequest(
347 crate::rpc_codec::MetadataProposeRequest { bytes: data },
348 );
349 let resp = self.transport.send_rpc(leader_id, req).await?;
350 match resp {
351 crate::rpc_codec::RaftRpc::MetadataProposeResponse(r) => {
352 if r.success {
353 Ok(r.log_index)
354 } else if let Some(hint) = r.leader_hint {
355 Err(crate::error::ClusterError::Raft(
360 nodedb_raft::RaftError::NotLeader {
361 leader_hint: Some(hint),
362 },
363 ))
364 } else {
365 Err(crate::error::ClusterError::Transport {
366 detail: format!("metadata propose forward failed: {}", r.error_message),
367 })
368 }
369 }
370 other => Err(crate::error::ClusterError::Transport {
371 detail: format!("metadata propose forward: unexpected response variant {other:?}"),
372 }),
373 }
374 }
375
376 pub fn multi_raft_handle(&self) -> Arc<Mutex<crate::multi_raft::MultiRaft>> {
381 self.multi_raft.clone()
382 }
383
384 pub fn group_statuses(&self) -> Vec<crate::multi_raft::GroupStatus> {
386 let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
387 mr.group_statuses()
388 }
389
390 pub fn propose_conf_change(&self, group_id: u64, change: &ConfChange) -> Result<(u64, u64)> {
394 let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
395 mr.propose_conf_change(group_id, change)
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::RoutingTable;
    use nodedb_types::config::tuning::ClusterTransportTuning;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    /// Test applier that counts applied entries via a shared atomic.
    pub(crate) struct CountingApplier {
        // Shared with `CountingMetadataApplier` so one counter covers both
        // data-group and metadata-group commits.
        applied: Arc<AtomicU64>,
    }

    impl CountingApplier {
        pub(crate) fn new() -> Self {
            Self {
                applied: Arc::new(AtomicU64::new(0)),
            }
        }

        /// Total entries applied so far (data + metadata combined).
        pub(crate) fn count(&self) -> u64 {
            self.applied.load(Ordering::Relaxed)
        }

        /// Builds a metadata applier wired to the same counter.
        pub(crate) fn metadata_applier(&self) -> Arc<CountingMetadataApplier> {
            Arc::new(CountingMetadataApplier {
                applied: self.applied.clone(),
            })
        }
    }

    impl CommitApplier for CountingApplier {
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            // Report the last entry's index, or 0 for an empty batch.
            entries.last().map(|e| e.index).unwrap_or(0)
        }
    }

    /// Metadata-group counterpart of `CountingApplier`; shares its counter.
    pub(crate) struct CountingMetadataApplier {
        applied: Arc<AtomicU64>,
    }

    impl MetadataApplier for CountingMetadataApplier {
        fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|(idx, _)| *idx).unwrap_or(0)
        }
    }

    /// Transport bound to an ephemeral localhost port for isolated tests.
    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(NexarTransport::new(node_id, "127.0.0.1:0".parse().unwrap()).unwrap())
    }

    /// A single-node group should elect itself and commit proposals while the
    /// loop runs. NOTE(review): relies on fixed sleeps — could be flaky under
    /// heavy load; consider polling with a deadline instead.
    #[tokio::test]
    async fn single_node_raft_loop_commits() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        // Expire the election deadline so the node campaigns on first tick.
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let applier = CountingApplier::new();
        let meta = applier.metadata_applier();
        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop =
            Arc::new(RaftLoop::new(mr, transport, topo, applier).with_metadata_applier(meta));

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let rl = raft_loop.clone();
        let run_handle = tokio::spawn(async move {
            rl.run(shutdown_rx).await;
        });

        // Give the loop time to elect a leader and commit the no-op entry.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 1,
            "expected at least 1 applied entry (no-op), got {}",
            raft_loop.applier.count()
        );

        let (_gid, idx) = raft_loop.propose(0, b"hello".to_vec()).unwrap();
        // Index 1 is the leader's no-op, so the proposal lands at >= 2.
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 2,
            "expected at least 2 applied entries, got {}",
            raft_loop.applier.count()
        );

        shutdown_tx.send(true).unwrap();
        run_handle.abort();
    }

    /// Three nodes over real QUIC transports: node 1 is biased to win the
    /// first election (expired deadline), the others get default timeouts.
    /// Verifies leader election, replication, and commit on all nodes.
    #[tokio::test]
    async fn three_node_election_over_quic() {
        let t1 = make_transport(1);
        let t2 = make_transport(2);
        let t3 = make_transport(3);

        // Full-mesh peer registration between the three transports.
        t1.register_peer(2, t2.local_addr());
        t1.register_peer(3, t3.local_addr());
        t2.register_peer(1, t1.local_addr());
        t2.register_peer(3, t3.local_addr());
        t3.register_peer(1, t1.local_addr());
        t3.register_peer(2, t2.local_addr());

        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);

        let dir1 = tempfile::tempdir().unwrap();
        let mut mr1 = MultiRaft::new(1, rt.clone(), dir1.path().to_path_buf());
        mr1.add_group(0, vec![2, 3]).unwrap();
        // Node 1 campaigns immediately; the others wait out their timeouts,
        // making node 1 the expected first leader.
        for node in mr1.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let transport_tuning = ClusterTransportTuning::default();
        let election_timeout_min = Duration::from_secs(transport_tuning.election_timeout_min_secs);
        let election_timeout_max = Duration::from_secs(transport_tuning.election_timeout_max_secs);

        let dir2 = tempfile::tempdir().unwrap();
        let mut mr2 = MultiRaft::new(2, rt.clone(), dir2.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr2.add_group(0, vec![1, 3]).unwrap();

        let dir3 = tempfile::tempdir().unwrap();
        let mut mr3 = MultiRaft::new(3, rt.clone(), dir3.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr3.add_group(0, vec![1, 2]).unwrap();

        let a1 = CountingApplier::new();
        let m1 = a1.metadata_applier();
        let a2 = CountingApplier::new();
        let m2 = a2.metadata_applier();
        let a3 = CountingApplier::new();
        let m3 = a3.metadata_applier();

        let topo1 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo2 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo3 = Arc::new(RwLock::new(ClusterTopology::new()));

        let rl1 = Arc::new(RaftLoop::new(mr1, t1.clone(), topo1, a1).with_metadata_applier(m1));
        let rl2 = Arc::new(RaftLoop::new(mr2, t2.clone(), topo2, a2).with_metadata_applier(m2));
        let rl3 = Arc::new(RaftLoop::new(mr3, t3.clone(), topo3, a3).with_metadata_applier(m3));

        let (shutdown_tx, _) = tokio::sync::watch::channel(false);

        // Each node runs two tasks: the transport server and the raft loop.
        let rl2_h = rl2.clone();
        let sr2 = shutdown_tx.subscribe();
        tokio::spawn(async move { t2.serve(rl2_h, sr2).await });

        let rl3_h = rl3.clone();
        let sr3 = shutdown_tx.subscribe();
        tokio::spawn(async move { t3.serve(rl3_h, sr3).await });

        let rl1_r = rl1.clone();
        let sr1 = shutdown_tx.subscribe();
        tokio::spawn(async move { rl1_r.run(sr1).await });

        let rl2_r = rl2.clone();
        let sr2r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl2_r.run(sr2r).await });

        let rl3_r = rl3.clone();
        let sr3r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl3_r.run(sr3r).await });

        let rl1_h = rl1.clone();
        let sr1h = shutdown_tx.subscribe();
        tokio::spawn(async move { t1.serve(rl1_h, sr1h).await });

        // Allow time for QUIC handshakes, election, and no-op commit.
        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(
            rl1.applier.count() >= 1,
            "node 1 should have committed at least the no-op, got {}",
            rl1.applier.count()
        );

        let (_gid, idx) = rl1.propose(0, b"distributed-cmd".to_vec()).unwrap();
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(
            rl1.applier.count() >= 2,
            "node 1: expected >= 2 applied, got {}",
            rl1.applier.count()
        );

        // Followers may lag the leader by one apply; require only >= 1.
        assert!(
            rl2.applier.count() >= 1,
            "node 2: expected >= 1 applied, got {}",
            rl2.applier.count()
        );
        assert!(
            rl3.applier.count() >= 1,
            "node 3: expected >= 1 applied, got {}",
            rl3.applier.count()
        );

        shutdown_tx.send(true).unwrap();
    }
}