use std::pin::Pin;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use tracing::debug;

use nodedb_raft::message::LogEntry;

use crate::catalog::ClusterCatalog;
use crate::conf_change::ConfChange;
use crate::error::Result;
use crate::forward::RequestForwarder;
use crate::metadata_group::applier::{MetadataApplier, NoopMetadataApplier};
use crate::multi_raft::MultiRaft;
use crate::topology::ClusterTopology;
use crate::transport::NexarTransport;

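/// Default cadence at which [`RaftLoop::run`] ticks each Raft group; override
/// per loop with [`RaftLoop::with_tick_interval`].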
pub(super) const DEFAULT_TICK_INTERVAL: Duration = Duration::from_millis(10);

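/// State-machine hook invoked when a data group commits log entries.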
pub trait CommitApplier: Send + Sync + 'static {
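    /// Applies `entries` committed by `group_id` and returns the highest log
    /// index applied (0 when `entries` is empty).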
    fn apply_committed(&self, group_id: u64, entries: &[LogEntry]) -> u64;
}

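/// Boxed async callback for vshard envelopes: takes the encoded request bytes
/// and resolves to the encoded response bytes (or an error). Presumably
/// invoked from the transport serving path; the caller lives outside this file.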
pub type VShardEnvelopeHandler = Arc<
    dyn Fn(Vec<u8>) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<u8>>> + Send>>
        + Send
        + Sync,
>;

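/// Drives all local Raft groups from a single task: ticks election and
/// heartbeat timers, exchanges messages via the [`NexarTransport`], and hands
/// committed entries to the configured appliers.
///
/// Build with [`RaftLoop::new`] or [`RaftLoop::with_forwarder`], customize
/// through the `with_*` builders, then spawn [`RaftLoop::run`].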
pub struct RaftLoop<A: CommitApplier, F: RequestForwarder = crate::forward::NoopForwarder> {
    pub(super) node_id: u64,
    pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
    pub(super) transport: Arc<NexarTransport>,
    pub(super) topology: Arc<RwLock<ClusterTopology>>,
    pub(super) applier: A,
    pub(super) metadata_applier: Arc<dyn MetadataApplier>,
    pub(super) forwarder: Arc<F>,
    pub(super) tick_interval: Duration,
    pub(super) vshard_handler: Option<VShardEnvelopeHandler>,
    pub(super) catalog: Option<Arc<ClusterCatalog>>,
    pub(super) shutdown_watch: tokio::sync::watch::Sender<bool>,
    pub(super) ready_watch: tokio::sync::watch::Sender<bool>,
}

impl<A: CommitApplier> RaftLoop<A> {
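    /// Builds a loop with the default no-op forwarder and metadata applier;
    /// swap those via the `with_*` builders before spawning [`RaftLoop::run`].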
    pub fn new(
        multi_raft: MultiRaft,
        transport: Arc<NexarTransport>,
        topology: Arc<RwLock<ClusterTopology>>,
        applier: A,
    ) -> Self {
        let node_id = multi_raft.node_id();
        let (shutdown_watch, _) = tokio::sync::watch::channel(false);
        let (ready_watch, _) = tokio::sync::watch::channel(false);
        Self {
            node_id,
            multi_raft: Arc::new(Mutex::new(multi_raft)),
            transport,
            topology,
            applier,
            metadata_applier: Arc::new(NoopMetadataApplier),
            forwarder: Arc::new(crate::forward::NoopForwarder),
            tick_interval: DEFAULT_TICK_INTERVAL,
            vshard_handler: None,
            catalog: None,
            shutdown_watch,
            ready_watch,
        }
    }
}

impl<A: CommitApplier, F: RequestForwarder> RaftLoop<A, F> {
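    /// Like [`RaftLoop::new`], but with an explicit [`RequestForwarder`]
    /// instead of the no-op default.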
    pub fn with_forwarder(
        multi_raft: MultiRaft,
        transport: Arc<NexarTransport>,
        topology: Arc<RwLock<ClusterTopology>>,
        applier: A,
        forwarder: Arc<F>,
    ) -> Self {
        let node_id = multi_raft.node_id();
        let (shutdown_watch, _) = tokio::sync::watch::channel(false);
        let (ready_watch, _) = tokio::sync::watch::channel(false);
        Self {
            node_id,
            multi_raft: Arc::new(Mutex::new(multi_raft)),
            transport,
            topology,
            applier,
            metadata_applier: Arc::new(NoopMetadataApplier),
            forwarder,
            tick_interval: DEFAULT_TICK_INTERVAL,
            vshard_handler: None,
            catalog: None,
            shutdown_watch,
            ready_watch,
        }
    }

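    /// Broadcasts shutdown on the internal watch channel. Idempotent; a send
    /// error (no subscribers) is deliberately ignored.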
    pub fn begin_shutdown(&self) {
        let _ = self.shutdown_watch.send(true);
    }

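    /// Subscribes to the readiness flag. Something is expected to flip
    /// `ready_watch` to `true` once startup completes; that write is not part
    /// of this file.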
    pub fn subscribe_ready(&self) -> tokio::sync::watch::Receiver<bool> {
        self.ready_watch.subscribe()
    }

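    /// Installs the callback used to service incoming vshard envelopes; see
    /// [`VShardEnvelopeHandler`].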
    pub fn with_vshard_handler(mut self, handler: VShardEnvelopeHandler) -> Self {
        self.vshard_handler = Some(handler);
        self
    }

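    /// Replaces the default [`NoopMetadataApplier`] used for metadata-group
    /// commits.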
    pub fn with_metadata_applier(mut self, applier: Arc<dyn MetadataApplier>) -> Self {
        self.metadata_applier = applier;
        self
    }

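    /// Overrides the tick cadence (defaults to [`DEFAULT_TICK_INTERVAL`]).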
    pub fn with_tick_interval(mut self, interval: Duration) -> Self {
        self.tick_interval = interval;
        self
    }

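    /// Attaches the shared [`ClusterCatalog`]; the field's consumers live
    /// outside this file.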
    pub fn with_catalog(mut self, catalog: Arc<ClusterCatalog>) -> Self {
        self.catalog = Some(catalog);
        self
    }

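    /// This node's Raft node ID.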
    pub fn node_id(&self) -> u64 {
        self.node_id
    }

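    /// Ticks every group at `tick_interval` until `shutdown` observes `true`
    /// (or its sender is dropped), then fans the signal out through
    /// [`RaftLoop::begin_shutdown`].
    ///
    /// Typical wiring, mirroring the tests below (`raft_loop` is an
    /// illustrative `Arc<RaftLoop<_>>` binding):
    ///
    /// ```ignore
    /// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    /// let rl = raft_loop.clone();
    /// let handle = tokio::spawn(async move { rl.run(shutdown_rx).await });
    /// // ... later, to stop the loop:
    /// shutdown_tx.send(true).unwrap();
    /// ```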
    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
        let mut interval = tokio::time::interval(self.tick_interval);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    self.do_tick();
                }
                changed = shutdown.changed() => {
                    // A dropped sender can never signal again; treat it like a
                    // shutdown rather than busy-looping on a closed channel.
                    if changed.is_err() || *shutdown.borrow() {
                        debug!("raft loop shutting down");
                        self.begin_shutdown();
                        break;
                    }
                }
            }
        }
    }

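    /// Proposes `data` to the Raft group that owns `vshard_id`. Returns the
    /// `(group_id, log_index)` assigned to the entry.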
    pub fn propose(&self, vshard_id: u16, data: Vec<u8>) -> Result<(u64, u64)> {
        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.propose(vshard_id, data)
    }

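    /// Proposes `data` directly to the metadata group; errs with `NotLeader`
    /// when this node does not currently lead it.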
    pub fn propose_to_metadata_group(&self, data: Vec<u8>) -> Result<u64> {
        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.propose_to_group(crate::metadata_group::METADATA_GROUP_ID, data)
    }

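    /// Proposes to the metadata group, forwarding to the current leader when
    /// this node is a follower. Fails with `NotLeader` when no hint is
    /// available, or when the hint points back at this node (a stale view
    /// that retrying locally cannot fix).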
    pub async fn propose_to_metadata_group_via_leader(&self, data: Vec<u8>) -> Result<u64> {
        match self.propose_to_metadata_group(data.clone()) {
            Ok(idx) => Ok(idx),
            Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader {
                leader_hint,
            })) => {
                let Some(leader_id) = leader_hint else {
                    return Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader { leader_hint: None },
                    ));
                };
                if leader_id == self.node_id {
                    return Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader {
                            leader_hint: Some(leader_id),
                        },
                    ));
                }
                self.forward_metadata_propose(leader_id, data).await
            }
            Err(other) => Err(other),
        }
    }

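    /// Looks up `leader_id` in the local topology, registers its address with
    /// the transport, and relays the propose over RPC.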
    async fn forward_metadata_propose(&self, leader_id: u64, data: Vec<u8>) -> Result<u64> {
        // Resolve and register the leader's address in a scoped block so the
        // topology read guard is dropped before the await below.
        {
            let topo = self.topology.read().unwrap_or_else(|p| p.into_inner());
            let Some(node) = topo.get_node(leader_id) else {
                return Err(crate::error::ClusterError::Transport {
                    detail: format!(
                        "metadata propose forward: leader {leader_id} not in local topology"
                    ),
                });
            };
            let Some(addr) = node.socket_addr() else {
                return Err(crate::error::ClusterError::Transport {
                    detail: format!(
                        "metadata propose forward: leader {leader_id} has unparseable addr {:?}",
                        node.addr
                    ),
                });
            };
            self.transport.register_peer(leader_id, addr);
        }

        let req = crate::rpc_codec::RaftRpc::MetadataProposeRequest(
            crate::rpc_codec::MetadataProposeRequest { bytes: data },
        );
        let resp = self.transport.send_rpc(leader_id, req).await?;
        match resp {
            crate::rpc_codec::RaftRpc::MetadataProposeResponse(r) => {
                if r.success {
                    Ok(r.log_index)
                } else if let Some(hint) = r.leader_hint {
                    Err(crate::error::ClusterError::Raft(
                        nodedb_raft::RaftError::NotLeader {
                            leader_hint: Some(hint),
                        },
                    ))
                } else {
                    Err(crate::error::ClusterError::Transport {
                        detail: format!("metadata propose forward failed: {}", r.error_message),
                    })
                }
            }
            other => Err(crate::error::ClusterError::Transport {
                detail: format!("metadata propose forward: unexpected response variant {other:?}"),
            }),
        }
    }

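    /// Returns a clone of the shared [`MultiRaft`] handle.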
    pub fn multi_raft_handle(&self) -> Arc<Mutex<crate::multi_raft::MultiRaft>> {
        self.multi_raft.clone()
    }

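    /// Snapshots the status of every local group under the `MultiRaft` lock.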
    pub fn group_statuses(&self) -> Vec<crate::multi_raft::GroupStatus> {
        let mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.group_statuses()
    }

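    /// Proposes a membership change to `group_id`; returns the
    /// `(group_id, log_index)` assigned to the entry.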
    pub fn propose_conf_change(&self, group_id: u64, change: &ConfChange) -> Result<(u64, u64)> {
        let mut mr = self.multi_raft.lock().unwrap_or_else(|p| p.into_inner());
        mr.propose_conf_change(group_id, change)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::routing::RoutingTable;
    use nodedb_types::config::tuning::ClusterTransportTuning;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

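    /// Applier that counts applied entries; its metadata twin below shares
    /// the same atomic counter.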
    pub(crate) struct CountingApplier {
        applied: Arc<AtomicU64>,
    }

    impl CountingApplier {
        pub(crate) fn new() -> Self {
            Self {
                applied: Arc::new(AtomicU64::new(0)),
            }
        }

        pub(crate) fn count(&self) -> u64 {
            self.applied.load(Ordering::Relaxed)
        }

        pub(crate) fn metadata_applier(&self) -> Arc<CountingMetadataApplier> {
            Arc::new(CountingMetadataApplier {
                applied: self.applied.clone(),
            })
        }
    }

    impl CommitApplier for CountingApplier {
        fn apply_committed(&self, _group_id: u64, entries: &[LogEntry]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|e| e.index).unwrap_or(0)
        }
    }

    pub(crate) struct CountingMetadataApplier {
        applied: Arc<AtomicU64>,
    }

    impl MetadataApplier for CountingMetadataApplier {
        fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
            self.applied
                .fetch_add(entries.len() as u64, Ordering::Relaxed);
            entries.last().map(|(idx, _)| *idx).unwrap_or(0)
        }
    }

    fn make_transport(node_id: u64) -> Arc<NexarTransport> {
        Arc::new(NexarTransport::new(node_id, "127.0.0.1:0".parse().unwrap()).unwrap())
    }

    #[tokio::test]
    async fn single_node_raft_loop_commits() {
        let dir = tempfile::tempdir().unwrap();
        let transport = make_transport(1);
        let rt = RoutingTable::uniform(1, &[1], 1);
        let mut mr = MultiRaft::new(1, rt, dir.path().to_path_buf());
        mr.add_group(0, vec![]).unwrap();

        // Expire the election deadline so the lone node campaigns immediately.
        for node in mr.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let applier = CountingApplier::new();
        let meta = applier.metadata_applier();
        let topo = Arc::new(RwLock::new(ClusterTopology::new()));
        let raft_loop =
            Arc::new(RaftLoop::new(mr, transport, topo, applier).with_metadata_applier(meta));

        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let rl = raft_loop.clone();
        let run_handle = tokio::spawn(async move {
            rl.run(shutdown_rx).await;
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 1,
            "expected at least 1 applied entry (no-op), got {}",
            raft_loop.applier.count()
        );

        let (_gid, idx) = raft_loop.propose(0, b"hello".to_vec()).unwrap();
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(
            raft_loop.applier.count() >= 2,
            "expected at least 2 applied entries, got {}",
            raft_loop.applier.count()
        );

        shutdown_tx.send(true).unwrap();
        run_handle.abort();
    }

    #[tokio::test]
    async fn three_node_election_over_quic() {
        let t1 = make_transport(1);
        let t2 = make_transport(2);
        let t3 = make_transport(3);

        t1.register_peer(2, t2.local_addr());
        t1.register_peer(3, t3.local_addr());
        t2.register_peer(1, t1.local_addr());
        t2.register_peer(3, t3.local_addr());
        t3.register_peer(1, t1.local_addr());
        t3.register_peer(2, t2.local_addr());

        let rt = RoutingTable::uniform(1, &[1, 2, 3], 3);

        let dir1 = tempfile::tempdir().unwrap();
        let mut mr1 = MultiRaft::new(1, rt.clone(), dir1.path().to_path_buf());
        mr1.add_group(0, vec![2, 3]).unwrap();
        // Only node 1 gets an expired election deadline, so it campaigns first
        // while nodes 2 and 3 keep their (long) default timeouts.
        for node in mr1.groups_mut().values_mut() {
            node.election_deadline_override(Instant::now() - Duration::from_millis(1));
        }

        let transport_tuning = ClusterTransportTuning::default();
        let election_timeout_min = Duration::from_secs(transport_tuning.election_timeout_min_secs);
        let election_timeout_max = Duration::from_secs(transport_tuning.election_timeout_max_secs);

        let dir2 = tempfile::tempdir().unwrap();
        let mut mr2 = MultiRaft::new(2, rt.clone(), dir2.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr2.add_group(0, vec![1, 3]).unwrap();

        let dir3 = tempfile::tempdir().unwrap();
        let mut mr3 = MultiRaft::new(3, rt.clone(), dir3.path().to_path_buf())
            .with_election_timeout(election_timeout_min, election_timeout_max);
        mr3.add_group(0, vec![1, 2]).unwrap();

        let a1 = CountingApplier::new();
        let m1 = a1.metadata_applier();
        let a2 = CountingApplier::new();
        let m2 = a2.metadata_applier();
        let a3 = CountingApplier::new();
        let m3 = a3.metadata_applier();

        let topo1 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo2 = Arc::new(RwLock::new(ClusterTopology::new()));
        let topo3 = Arc::new(RwLock::new(ClusterTopology::new()));

        let rl1 = Arc::new(RaftLoop::new(mr1, t1.clone(), topo1, a1).with_metadata_applier(m1));
        let rl2 = Arc::new(RaftLoop::new(mr2, t2.clone(), topo2, a2).with_metadata_applier(m2));
        let rl3 = Arc::new(RaftLoop::new(mr3, t3.clone(), topo3, a3).with_metadata_applier(m3));

        let (shutdown_tx, _) = tokio::sync::watch::channel(false);

        let rl2_h = rl2.clone();
        let sr2 = shutdown_tx.subscribe();
        tokio::spawn(async move { t2.serve(rl2_h, sr2).await });

        let rl3_h = rl3.clone();
        let sr3 = shutdown_tx.subscribe();
        tokio::spawn(async move { t3.serve(rl3_h, sr3).await });

        let rl1_r = rl1.clone();
        let sr1 = shutdown_tx.subscribe();
        tokio::spawn(async move { rl1_r.run(sr1).await });

        let rl2_r = rl2.clone();
        let sr2r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl2_r.run(sr2r).await });

        let rl3_r = rl3.clone();
        let sr3r = shutdown_tx.subscribe();
        tokio::spawn(async move { rl3_r.run(sr3r).await });

        let rl1_h = rl1.clone();
        let sr1h = shutdown_tx.subscribe();
        tokio::spawn(async move { t1.serve(rl1_h, sr1h).await });

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(
            rl1.applier.count() >= 1,
            "node 1 should have committed at least the no-op, got {}",
            rl1.applier.count()
        );

        let (_gid, idx) = rl1.propose(0, b"distributed-cmd".to_vec()).unwrap();
        assert!(idx >= 2);

        tokio::time::sleep(Duration::from_millis(200)).await;

        assert!(
            rl1.applier.count() >= 2,
            "node 1: expected >= 2 applied, got {}",
            rl1.applier.count()
        );

        assert!(
            rl2.applier.count() >= 1,
            "node 2: expected >= 1 applied, got {}",
            rl2.applier.count()
        );
        assert!(
            rl3.applier.count() >= 1,
            "node 3: expected >= 1 applied, got {}",
            rl3.applier.count()
        );

        shutdown_tx.send(true).unwrap();
    }
}