1use rand::Rng;
2use std::collections::HashSet;
3use std::sync::Arc;
4use std::time::Duration;
5use tokio::time::{interval, timeout};
6use tracing::{debug, error, info, warn};
7
8use rabia_core::{
9 messages::{
10 DecisionMessage, HeartBeatMessage, MessageType, NewBatchMessage, ProposeMessage,
11 ProtocolMessage, SyncRequestMessage, SyncResponseMessage, VoteRound1Message,
12 VoteRound2Message,
13 },
14 network::{ClusterConfig, NetworkEventHandler, NetworkTransport},
15 persistence::PersistenceLayer,
16 state_machine::StateMachine,
17 BatchId, CommandBatch, NodeId, PhaseId, RabiaError, Result, StateValue, Validator,
18};
19
20use crate::{
21 network::TcpNetwork, CommandRequest, EngineCommand, EngineCommandReceiver, EngineState,
22 LeaderSelector, RabiaConfig,
23};
24
/// Core Rabia consensus engine, generic over the replicated state machine,
/// the network transport, and the persistence layer.
pub struct RabiaEngine<SM, NT, PL>
where
    SM: StateMachine + 'static,
    NT: NetworkTransport + 'static,
    PL: PersistenceLayer + 'static,
{
    /// Identity of this node within the cluster.
    node_id: NodeId,
    /// Engine tuning knobs (intervals, history limits, optional RNG seed).
    config: RabiaConfig,
    /// Static cluster topology; retained but currently unused at runtime.
    #[allow(dead_code)]
    cluster_config: ClusterConfig,
    /// Replicated state machine; locked for apply/snapshot/restore calls.
    state_machine: Arc<tokio::sync::Mutex<SM>>,
    /// Transport used to send, broadcast, and receive protocol messages.
    network: Arc<tokio::sync::Mutex<NT>>,
    /// Durable storage for serialized engine state and snapshots.
    persistence: Arc<PL>,
    /// Shared consensus state: phases, votes, pending batches, sync buffers.
    engine_state: Arc<EngineState>,
    /// Channel on which external callers submit `EngineCommand`s.
    command_rx: EngineCommandReceiver,
    /// RNG driving the randomized voting rules; seedable for determinism.
    rng: rand::rngs::StdRng,
    /// Tracks the current leader for the known cluster membership.
    leader_selector: LeaderSelector,
}
43
44impl<SM, NT, PL> RabiaEngine<SM, NT, PL>
45where
46 SM: StateMachine + 'static,
47 NT: NetworkTransport + 'static,
48 PL: PersistenceLayer + 'static,
49{
50 pub fn new(
51 node_id: NodeId,
52 config: RabiaConfig,
53 cluster_config: ClusterConfig,
54 state_machine: SM,
55 network: NT,
56 persistence: PL,
57 command_rx: EngineCommandReceiver,
58 ) -> Self {
59 let rng = match config.randomization_seed {
60 Some(seed) => rand::SeedableRng::seed_from_u64(seed),
61 None => rand::SeedableRng::from_entropy(),
62 };
63
64 let leader_selector = LeaderSelector::with_cluster(cluster_config.all_nodes.clone());
65
66 Self {
67 node_id,
68 config: config.clone(),
69 cluster_config: cluster_config.clone(),
70 state_machine: Arc::new(tokio::sync::Mutex::new(state_machine)),
71 network: Arc::new(tokio::sync::Mutex::new(network)),
72 persistence: Arc::new(persistence),
73 engine_state: Arc::new(EngineState::new(cluster_config.quorum_size)),
74 command_rx,
75 rng,
76 leader_selector,
77 }
78 }
79}
80
81impl<SM, PL> RabiaEngine<SM, TcpNetwork, PL>
82where
83 SM: StateMachine + 'static,
84 PL: PersistenceLayer + 'static,
85{
86 pub async fn new_with_tcp(
88 node_id: NodeId,
89 config: RabiaConfig,
90 cluster_config: ClusterConfig,
91 state_machine: SM,
92 persistence: PL,
93 command_rx: EngineCommandReceiver,
94 ) -> Result<Self> {
95 let network = TcpNetwork::new(node_id, config.network_config.clone()).await?;
97
98 let rng = match config.randomization_seed {
99 Some(seed) => rand::SeedableRng::seed_from_u64(seed),
100 None => rand::SeedableRng::from_entropy(),
101 };
102
103 let leader_selector = LeaderSelector::with_cluster(cluster_config.all_nodes.clone());
104
105 Ok(Self {
106 node_id,
107 config: config.clone(),
108 cluster_config: cluster_config.clone(),
109 state_machine: Arc::new(tokio::sync::Mutex::new(state_machine)),
110 network: Arc::new(tokio::sync::Mutex::new(network)),
111 persistence: Arc::new(persistence),
112 engine_state: Arc::new(EngineState::new(cluster_config.quorum_size)),
113 command_rx,
114 rng,
115 leader_selector,
116 })
117 }
118}
119
120impl<SM, NT, PL> RabiaEngine<SM, NT, PL>
121where
122 SM: StateMachine + 'static,
123 NT: NetworkTransport + 'static,
124 PL: PersistenceLayer + 'static,
125{
    /// Returns the currently selected leader, if any node holds leadership.
    pub fn get_leader(&self) -> Option<NodeId> {
        self.leader_selector.get_leader()
    }
130
    /// Returns `true` when this node is the current leader.
    pub fn is_leader(&self) -> bool {
        self.leader_selector.is_leader(self.node_id)
    }
135
    /// Returns detailed leadership information from the leader selector.
    pub fn get_leadership_info(&self) -> crate::LeadershipInfo {
        self.leader_selector.get_leadership_info()
    }
140
141 pub fn update_cluster_membership(&mut self, nodes: HashSet<NodeId>) -> Option<NodeId> {
143 let new_leader = self.leader_selector.update_cluster_view(nodes.clone());
144
145 self.engine_state.update_active_nodes(nodes);
147
148 if let Some(leader) = new_leader {
149 info!("Leadership changed to node {}", leader);
150 }
151
152 new_leader
153 }
154
155 async fn save_state(&self) -> Result<()> {
157 let current_phase = self.engine_state.current_phase();
158 let last_committed_phase = self.engine_state.last_committed_phase();
159
160 let snapshot = {
162 let sm = self.state_machine.lock().await;
163 Some(sm.create_snapshot().await?)
164 };
165
166 let engine_state = rabia_core::persistence::EngineState::new(
167 current_phase,
168 last_committed_phase,
169 snapshot,
170 );
171
172 let state_bytes = engine_state.to_bytes()?;
173 self.persistence.save_state(&state_bytes).await?;
174
175 debug!(
176 "Saved engine state: phase {}, committed {}",
177 current_phase.value(),
178 last_committed_phase.value()
179 );
180
181 Ok(())
182 }
183
    /// Main event loop: drains inbound protocol messages, then services one
    /// command or timer tick per iteration until the command channel closes.
    pub async fn run(mut self) -> Result<()> {
        info!("Starting Rabia consensus engine for node {}", self.node_id);

        let mut cleanup_interval = interval(self.config.cleanup_interval);
        let mut heartbeat_interval = interval(self.config.heartbeat_interval);
        let mut message_buffer = Vec::new();

        self.initialize().await?;

        loop {
            // Poll the network first so protocol traffic is handled ahead of
            // local commands and timers.
            if let Err(e) = self.receive_messages(&mut message_buffer).await {
                // NOTE(review): matching on the error string is brittle —
                // presumably "No messages available" just marks an empty poll.
                if !e.to_string().contains("No messages available") {
                    error!("Error receiving messages: {}", e);
                }
            } else {
                for (from, message) in message_buffer.drain(..) {
                    if let Err(e) = self.handle_message(from, message).await {
                        error!("Error handling message from {}: {}", from, e);
                    }
                }
            }

            tokio::select! {
                command_opt = self.command_rx.recv() => {
                    if let Some(command) = command_opt {
                        // NOTE(review): handler errors (including the Err
                        // produced by EngineCommand::Shutdown) are only
                        // logged — they do not break this loop. Confirm that
                        // shutdown is intended to work this way.
                        if let Err(e) = self.handle_command(command).await {
                            error!("Error handling command: {}", e);
                        }
                    } else {
                        // All command senders dropped: clean shutdown.
                        break Ok(());
                    }
                }

                _ = cleanup_interval.tick() => {
                    self.cleanup_old_state().await;
                }

                _ = heartbeat_interval.tick() => {
                    if let Err(e) = self.send_heartbeat().await {
                        warn!("Failed to send heartbeat: {}", e);
                    }
                }

                // Short sleep keeps the loop cycling so the message poll at
                // the top runs even when no command or timer fires.
                _ = tokio::time::sleep(Duration::from_millis(1)) => {}
            }
        }
    }
237
    /// Restores any persisted engine state (phase counters plus an optional
    /// state-machine snapshot) and seeds the active-node set from the network.
    async fn initialize(&mut self) -> Result<()> {
        if let Some(persisted_data) = self.persistence.load_state().await? {
            info!("Restoring state from persistence");

            let persisted_state =
                rabia_core::persistence::EngineState::from_bytes(&persisted_data)?;

            // Release-store so concurrent readers of these atomics observe
            // the restored values.
            self.engine_state.current_phase.store(
                persisted_state.current_phase.value(),
                std::sync::atomic::Ordering::Release,
            );
            self.engine_state.last_committed_phase.store(
                persisted_state.last_committed_phase.value(),
                std::sync::atomic::Ordering::Release,
            );

            if let Some(snapshot) = persisted_state.snapshot {
                let mut sm = self.state_machine.lock().await;
                sm.restore_snapshot(&snapshot).await?;
            }
        }

        // Start with whatever peers the transport already reports connected.
        let connected_nodes = self.network.lock().await.get_connected_nodes().await?;
        self.engine_state.update_active_nodes(connected_nodes);

        info!("Engine initialized successfully");
        Ok(())
    }
270
    /// Dispatches a single engine command received from the command channel.
    async fn handle_command(&mut self, command: EngineCommand) -> Result<()> {
        match command {
            EngineCommand::ProcessBatch(request) => self.process_batch_request(request).await,
            EngineCommand::Shutdown => {
                info!("Shutting down consensus engine");
                // Shutdown is signaled by returning an error to the caller.
                // NOTE(review): `run` only logs handler errors, so this may
                // not actually stop the loop — confirm intended behavior.
                Err(RabiaError::internal("Shutdown requested"))
            }
            EngineCommand::ForcePhaseAdvance => self.advance_to_next_phase().await,
            EngineCommand::TriggerSync => self.initiate_sync().await,
            EngineCommand::GetStatistics(tx) => {
                let stats = self.engine_state.get_statistics();
                // The requester may have gone away; ignore send failure.
                let _ = tx.send(stats);
                Ok(())
            }
        }
    }
287
    /// Accepts a client batch for consensus, rejecting it immediately when no
    /// quorum is available.
    async fn process_batch_request(&mut self, request: CommandRequest) -> Result<()> {
        if !self.engine_state.has_quorum() {
            // Fail fast: without a quorum the batch could never be decided.
            // The requester may have dropped its receiver; ignore send errors.
            let _ = request
                .response_tx
                .send(Err(RabiaError::QuorumNotAvailable {
                    current: self.engine_state.get_active_nodes().len(),
                    required: self.engine_state.quorum_size,
                }));
            return Ok(());
        }

        let batch_id = self
            .engine_state
            .add_pending_batch(request.batch.clone(), self.node_id);

        // NOTE(review): on this success path `request.response_tx` is dropped
        // without a reply — presumably the caller is answered elsewhere after
        // commit; confirm.
        self.propose_batch(batch_id, request.batch).await?;

        Ok(())
    }
311
    /// Opens a new phase proposing the given batch and broadcasts the
    /// proposal to the rest of the cluster.
    async fn propose_batch(&mut self, batch_id: BatchId, batch: CommandBatch) -> Result<()> {
        let phase_id = self.engine_state.advance_phase();

        debug!("Proposing batch {} in phase {}", batch_id, phase_id);

        // The proposer always proposes V1, i.e. "accept this batch".
        let proposed_value = StateValue::V1;
        // Record our own proposal locally before telling anyone else.
        self.engine_state.update_phase(phase_id, |phase| {
            phase.batch_id = Some(batch_id);
            phase.proposed_value = Some(proposed_value);
            phase.batch = Some(batch.clone());
        })?;

        let proposal = ProposeMessage {
            phase_id,
            batch_id,
            value: proposed_value,
            batch: Some(batch),
        };

        let message = ProtocolMessage::propose(self.node_id, proposal);
        // Some(self.node_id) presumably excludes this node from the
        // broadcast — the same pattern is used throughout this file.
        self.network
            .lock()
            .await
            .broadcast(message, Some(self.node_id))
            .await?;

        Ok(())
    }
348
    /// Validates an inbound protocol message and dispatches it to the
    /// matching type-specific handler.
    async fn handle_message(&mut self, from: NodeId, message: ProtocolMessage) -> Result<()> {
        if let Err(e) = message.validate() {
            warn!("Received invalid message from {}: {}", from, e);
            return Err(e);
        }

        // Reject spoofed messages: the claimed sender must match the
        // transport-level source.
        if message.from != from {
            warn!(
                "Message claims to be from {} but received from {}",
                message.from, from
            );
            return Err(RabiaError::network("Message source mismatch"));
        }

        match message.message_type {
            MessageType::Propose(propose) => self.handle_propose(from, propose).await,
            MessageType::VoteRound1(vote) => self.handle_vote_round1(from, vote).await,
            MessageType::VoteRound2(vote) => self.handle_vote_round2(from, vote).await,
            MessageType::Decision(decision) => self.handle_decision(from, decision).await,
            MessageType::SyncRequest(request) => self.handle_sync_request(from, request).await,
            MessageType::SyncResponse(response) => self.handle_sync_response(from, response).await,
            MessageType::NewBatch(new_batch) => self.handle_new_batch(from, new_batch).await,
            MessageType::HeartBeat(heartbeat) => self.handle_heartbeat(from, heartbeat).await,
            MessageType::QuorumNotification(_) => {
                // Quorum notifications carry nothing this engine acts on here.
                Ok(())
            }
        }
    }
380
    /// Handles a peer's proposal: records the batch, computes this node's
    /// round-1 vote, and replies directly to the proposer.
    async fn handle_propose(&mut self, from: NodeId, propose: ProposeMessage) -> Result<()> {
        if !self.engine_state.has_quorum() {
            // Silently ignore proposals while we lack a quorum.
            return Ok(());
        }

        debug!(
            "Received proposal from {} for phase {}",
            from, propose.phase_id
        );

        if let Some(batch) = &propose.batch {
            self.engine_state.add_pending_batch(batch.clone(), from);
        }

        // The vote must be computed BEFORE the phase update below: it looks
        // at the phase's pre-existing proposed_value.
        let vote = self.determine_round1_vote(&propose).await;

        self.engine_state.update_phase(propose.phase_id, |phase| {
            phase.batch_id = Some(propose.batch_id);
            // Never overwrite a value/batch we already recorded for the phase.
            if phase.proposed_value.is_none() {
                phase.proposed_value = Some(propose.value);
            }
            if phase.batch.is_none() {
                phase.batch = propose.batch.clone();
            }
        })?;

        let vote_msg = VoteRound1Message {
            phase_id: propose.phase_id,
            batch_id: propose.batch_id,
            vote,
            voter_id: self.node_id,
        };

        // Round-1 votes go back to the proposer only, not broadcast.
        let message = ProtocolMessage::vote_round1(self.node_id, from, vote_msg);
        self.network.lock().await.send_to(from, message).await?;

        Ok(())
    }
423
424 async fn determine_round1_vote(&mut self, propose: &ProposeMessage) -> StateValue {
425 let phase = self.engine_state.get_phase(&propose.phase_id);
430
431 match phase {
432 Some(existing_phase) => {
433 if let Some(existing_value) = &existing_phase.proposed_value {
435 if *existing_value == propose.value {
436 propose.value
438 } else {
439 StateValue::VQuestion
441 }
442 } else {
443 self.randomized_vote(&propose.value)
445 }
446 }
447 None => {
448 self.randomized_vote(&propose.value)
450 }
451 }
452 }
453
454 fn randomized_vote(&mut self, proposed_value: &StateValue) -> StateValue {
455 match proposed_value {
459 StateValue::V0 => {
460 if self.rng.gen_bool(0.7) {
462 StateValue::V0
463 } else {
464 StateValue::VQuestion
465 }
466 }
467 StateValue::V1 => {
468 if self.rng.gen_bool(0.8) {
471 StateValue::V1
472 } else {
473 StateValue::VQuestion
474 }
475 }
476 StateValue::VQuestion => {
477 StateValue::VQuestion
479 }
480 }
481 }
482
    /// Records a peer's round-1 vote and, once enough votes are in, moves
    /// this node into round 2 of the phase.
    async fn handle_vote_round1(&mut self, from: NodeId, vote: VoteRound1Message) -> Result<()> {
        debug!(
            "Received round 1 vote from {} for phase {}",
            from, vote.phase_id
        );

        self.engine_state.update_phase(vote.phase_id, |phase| {
            phase.add_round1_vote(from, vote.vote);
        })?;

        if let Some(phase) = self.engine_state.get_phase(&vote.phase_id) {
            if let Some(majority_vote) = phase.has_round1_majority(self.engine_state.quorum_size) {
                // A value reached a round-1 majority: carry it into round 2.
                self.proceed_to_round2(vote.phase_id, majority_vote, phase.round1_votes)
                    .await?;
            } else if phase.round1_votes.len() >= self.engine_state.quorum_size {
                // A quorum voted but no value won: enter round 2 undecided.
                self.proceed_to_round2(vote.phase_id, StateValue::VQuestion, phase.round1_votes)
                    .await?;
            }
        }

        Ok(())
    }
510
    /// Casts this node's round-2 vote derived from the round-1 outcome and
    /// broadcasts it, with the supporting round-1 votes, to the cluster.
    async fn proceed_to_round2(
        &mut self,
        phase_id: PhaseId,
        round1_result: StateValue,
        round1_votes: std::collections::HashMap<NodeId, StateValue>,
    ) -> Result<()> {
        debug!(
            "Proceeding to round 2 for phase {} with result {:?}",
            phase_id, round1_result
        );

        // A concrete round-1 majority is carried forward unchanged; an
        // undecided result falls back to the randomized tie-break rule.
        let round2_vote = match round1_result {
            StateValue::V0 => {
                StateValue::V0
            }
            StateValue::V1 => {
                StateValue::V1
            }
            StateValue::VQuestion => {
                self.determine_round2_vote_for_question(&round1_votes)
            }
        };

        // Record our own round-2 vote locally before broadcasting it.
        self.engine_state.update_phase(phase_id, |phase| {
            phase.add_round2_vote(self.node_id, round2_vote);
        })?;

        let vote_msg = VoteRound2Message {
            phase_id,
            // Falls back to a default batch id when the phase is unknown.
            batch_id: self
                .engine_state
                .get_phase(&phase_id)
                .and_then(|p| p.batch_id)
                .unwrap_or_default(),
            vote: round2_vote,
            voter_id: self.node_id,
            round1_votes,
        };

        let message = ProtocolMessage::vote_round2(self.node_id, self.node_id, vote_msg);
        self.network
            .lock()
            .await
            .broadcast(message, Some(self.node_id))
            .await?;

        Ok(())
    }
566
567 fn determine_round2_vote_for_question(
568 &mut self,
569 round1_votes: &std::collections::HashMap<NodeId, StateValue>,
570 ) -> StateValue {
571 let v0_count = round1_votes
576 .values()
577 .filter(|&v| *v == StateValue::V0)
578 .count();
579 let v1_count = round1_votes
580 .values()
581 .filter(|&v| *v == StateValue::V1)
582 .count();
583
584 match v1_count.cmp(&v0_count) {
585 std::cmp::Ordering::Greater => {
586 if self.rng.gen_bool(0.9) {
588 StateValue::V1
589 } else {
590 StateValue::V0
591 }
592 }
593 std::cmp::Ordering::Less => {
594 if self.rng.gen_bool(0.9) {
596 StateValue::V0
597 } else {
598 StateValue::V1
599 }
600 }
601 std::cmp::Ordering::Equal => {
602 if self.rng.gen_bool(0.8) {
605 StateValue::V1
606 } else {
607 StateValue::V0
608 }
609 }
610 }
611 }
612
    /// Records a peer's round-2 vote and finalizes the phase once a round-2
    /// majority agrees on a value.
    async fn handle_vote_round2(&mut self, from: NodeId, vote: VoteRound2Message) -> Result<()> {
        debug!(
            "Received round 2 vote from {} for phase {}",
            from, vote.phase_id
        );

        self.engine_state.update_phase(vote.phase_id, |phase| {
            phase.add_round2_vote(from, vote.vote);
        })?;

        if let Some(phase) = self.engine_state.get_phase(&vote.phase_id) {
            if let Some(decision) = phase.has_round2_majority(self.engine_state.quorum_size) {
                self.make_decision(vote.phase_id, decision).await?;
            }
        }

        Ok(())
    }
633
    /// Finalizes a phase: records the decision, applies and commits the batch
    /// when the decision is V1, persists state, and broadcasts the outcome.
    async fn make_decision(&mut self, phase_id: PhaseId, decision: StateValue) -> Result<()> {
        info!("Decision reached for phase {}: {:?}", phase_id, decision);

        self.engine_state.update_phase(phase_id, |phase| {
            phase.set_decision(decision);
        })?;

        // V1 means "the batch was accepted": apply it, then commit the phase.
        if decision == StateValue::V1 {
            if let Some(phase) = self.engine_state.get_phase(&phase_id) {
                if let Some(batch) = &phase.batch {
                    self.apply_batch(batch).await?;
                    if let Err(e) = self.engine_state.commit_phase(phase_id) {
                        error!("Failed to commit phase {}: {}", phase_id, e);
                        return Err(e);
                    }

                    // Persistence failure after commit is logged, not fatal.
                    if let Err(e) = self.save_state().await {
                        warn!("Failed to save state after commit: {}", e);
                    }
                }
            }
        }

        // Re-fetch the phase for the broadcast payload; by now it must exist.
        let phase = self.engine_state.get_phase(&phase_id).ok_or_else(|| {
            RabiaError::internal(format!(
                "Phase {} not found for decision broadcast",
                phase_id
            ))
        })?;
        let decision_msg = DecisionMessage {
            phase_id,
            batch_id: phase.batch_id.unwrap_or_default(),
            decision,
            batch: phase.batch,
        };

        let message = ProtocolMessage::decision(self.node_id, decision_msg);
        self.network
            .lock()
            .await
            .broadcast(message, Some(self.node_id))
            .await?;

        Ok(())
    }
683
684 async fn apply_batch(&mut self, batch: &CommandBatch) -> Result<()> {
685 debug!(
686 "Applying batch {} with {} commands",
687 batch.id,
688 batch.commands.len()
689 );
690
691 let results = {
693 let mut sm = self.state_machine.lock().await;
694 sm.apply_commands(&batch.commands).await?
695 }; self.engine_state.remove_pending_batch(&batch.id);
699
700 info!(
701 "Successfully applied batch {} with {} results",
702 batch.id,
703 results.len()
704 );
705 Ok(())
706 }
707
    /// Applies a decision broadcast by another node, committing the batch if
    /// it decided V1 and we have not already committed that phase.
    async fn handle_decision(&mut self, _from: NodeId, decision: DecisionMessage) -> Result<()> {
        debug!(
            "Received decision for phase {}: {:?}",
            decision.phase_id, decision.decision
        );

        self.engine_state.update_phase(decision.phase_id, |phase| {
            phase.set_decision(decision.decision);
            // Backfill the batch payload if we never saw the proposal.
            if phase.batch.is_none() {
                phase.batch = decision.batch.clone();
            }
        })?;

        if decision.decision == StateValue::V1 {
            if let Some(batch) = &decision.batch {
                let last_committed = self.engine_state.last_committed_phase();
                // Guard against double-applying a phase we already committed.
                if decision.phase_id > last_committed {
                    self.apply_batch(batch).await?;
                    if let Err(e) = self.engine_state.commit_phase(decision.phase_id) {
                        error!(
                            "Failed to commit phase {} from decision: {}",
                            decision.phase_id, e
                        );
                        return Err(e);
                    }

                    // Persistence failure after commit is logged, not fatal.
                    if let Err(e) = self.save_state().await {
                        warn!("Failed to save state after commit: {}", e);
                    }
                }
            }
        }

        Ok(())
    }
747
    /// Answers a peer's sync request with our phase and state version, plus a
    /// full state-machine snapshot when we are ahead of the requester.
    async fn handle_sync_request(
        &mut self,
        from: NodeId,
        request: SyncRequestMessage,
    ) -> Result<()> {
        debug!(
            "Received sync request from {} (phase: {})",
            from, request.requester_phase
        );

        let current_phase = self.engine_state.current_phase();
        let state_version = self.engine_state.get_state_version();

        // Only pay the snapshot cost when the requester is actually behind.
        let snapshot = if current_phase > request.requester_phase {
            let sm = self.state_machine.lock().await;
            Some(sm.create_snapshot().await?)
        } else {
            None
        };

        let response = SyncResponseMessage {
            responder_phase: current_phase,
            responder_state_version: state_version,
            state_snapshot: snapshot,
            // TODO(review): pending batches and committed phase history are
            // never populated — confirm peers do not rely on these fields.
            pending_batches: Vec::new(),
            committed_phases: Vec::new(),
        };

        let message = ProtocolMessage::sync_response(self.node_id, from, response);
        self.network.lock().await.send_to(from, message).await?;

        Ok(())
    }
783
    /// Buffers a peer's sync response and resolves the sync once a quorum of
    /// responses has been collected.
    async fn handle_sync_response(
        &mut self,
        from: NodeId,
        response: SyncResponseMessage,
    ) -> Result<()> {
        debug!(
            "Received sync response from {} (phase: {})",
            from, response.responder_phase
        );

        self.engine_state.add_sync_response(from, response);

        let sync_responses = self.engine_state.get_sync_responses();
        if sync_responses.len() >= self.engine_state.quorum_size {
            self.resolve_sync(sync_responses).await?;
        }

        Ok(())
    }
805
806 async fn resolve_sync(
807 &mut self,
808 responses: std::collections::HashMap<NodeId, SyncResponseMessage>,
809 ) -> Result<()> {
810 info!("Resolving sync with {} responses", responses.len());
811
812 let latest_response = responses
814 .values()
815 .max_by_key(|r| r.responder_phase.value())
816 .cloned();
817
818 if let Some(latest) = latest_response {
819 let current_phase = self.engine_state.current_phase();
820
821 if latest.responder_phase > current_phase {
822 info!(
823 "Syncing to phase {} from phase {}",
824 latest.responder_phase, current_phase
825 );
826
827 self.engine_state.current_phase.store(
829 latest.responder_phase.value(),
830 std::sync::atomic::Ordering::Release,
831 );
832
833 if let Some(snapshot) = latest.state_snapshot {
835 let mut sm = self.state_machine.lock().await;
836 sm.restore_snapshot(&snapshot).await?;
837 }
838 }
839 }
840
841 self.engine_state.clear_sync_responses();
843 Ok(())
844 }
845
    /// Queues a batch forwarded by a peer, credited to its original proposer.
    async fn handle_new_batch(&mut self, from: NodeId, new_batch: NewBatchMessage) -> Result<()> {
        debug!("Received new batch from {}", from);

        self.engine_state
            .add_pending_batch(new_batch.batch, new_batch.originator);

        Ok(())
    }
855
    /// Heartbeat handler; currently a no-op — the payload is ignored.
    async fn handle_heartbeat(
        &mut self,
        _from: NodeId,
        _heartbeat: HeartBeatMessage,
    ) -> Result<()> {
        // NOTE(review): the peer's phase info in the heartbeat is discarded;
        // it could be used to detect lag and trigger a sync.
        Ok(())
    }
865
    /// Broadcasts this node's liveness and phase progress to all peers.
    async fn send_heartbeat(&mut self) -> Result<()> {
        let heartbeat = HeartBeatMessage {
            current_phase: self.engine_state.current_phase(),
            last_committed_phase: self.engine_state.last_committed_phase(),
            active: self.engine_state.is_active(),
        };

        // Destination `None` presumably marks this as a broadcast message.
        let message = ProtocolMessage::new(self.node_id, None, MessageType::HeartBeat(heartbeat));

        self.network
            .lock()
            .await
            .broadcast(message, Some(self.node_id))
            .await?;
        Ok(())
    }
882
    /// Forcibly bumps the phase counter (triggered by
    /// `EngineCommand::ForcePhaseAdvance`).
    async fn advance_to_next_phase(&mut self) -> Result<()> {
        let new_phase = self.engine_state.advance_phase();
        info!("Advanced to phase {}", new_phase);
        Ok(())
    }
888
    /// Sends a state-sync request to every other active node.
    async fn initiate_sync(&mut self) -> Result<()> {
        info!("Initiating synchronization");

        let request = SyncRequestMessage {
            requester_phase: self.engine_state.current_phase(),
            requester_state_version: self.engine_state.get_state_version(),
        };

        let active_nodes = self.engine_state.get_active_nodes();
        for node_id in active_nodes {
            if node_id != self.node_id {
                let message = ProtocolMessage::sync_request(self.node_id, node_id, request.clone());
                // The network lock is re-acquired per send rather than held
                // across the whole loop.
                self.network.lock().await.send_to(node_id, message).await?;
            }
        }

        Ok(())
    }
908
909 async fn cleanup_old_state(&mut self) {
910 let removed_phases = self
911 .engine_state
912 .cleanup_old_phases(self.config.max_phase_history);
913 let removed_batches = self.engine_state.cleanup_old_pending_batches(300); if removed_phases > 0 || removed_batches > 0 {
916 debug!(
917 "Cleaned up {} old phases and {} old batches",
918 removed_phases, removed_batches
919 );
920 }
921 }
922
    /// Polls the transport for inbound messages, pushing up to 11 messages
    /// into `buffer` per call (one initial poll plus a bounded drain).
    async fn receive_messages(&self, buffer: &mut Vec<(NodeId, ProtocolMessage)>) -> Result<()> {
        let mut network = self.network.lock().await;

        // Wait briefly for a first message; timing out is not an error.
        match timeout(Duration::from_millis(10), network.receive()).await {
            Ok(Ok((from, message))) => {
                buffer.push((from, message));

                // Opportunistically drain a few more queued messages, bounded
                // so a busy peer cannot starve the main loop.
                for _ in 0..10 {
                    match timeout(Duration::from_millis(1), network.receive()).await {
                        Ok(Ok((from, message))) => buffer.push((from, message)),
                        _ => break,
                    }
                }
            }
            Ok(Err(e)) => return Err(e),
            Err(_) => {
                // Timeout: no messages available this cycle.
            }
        }

        Ok(())
    }
948}
949
#[async_trait::async_trait]
impl<SM, NT, PL> NetworkEventHandler for RabiaEngine<SM, NT, PL>
where
    SM: StateMachine + 'static,
    NT: NetworkTransport + 'static,
    PL: PersistenceLayer + 'static,
{
    /// Logs node arrivals. Membership is updated only by the partition and
    /// quorum callbacks below, not here.
    async fn on_node_connected(&self, node_id: NodeId) {
        info!("Node {} connected", node_id);
    }

    /// Logs node departures; no state change here.
    async fn on_node_disconnected(&self, node_id: NodeId) {
        warn!("Node {} disconnected", node_id);
    }

    /// Narrows the active-node set to the reachable side of a partition.
    async fn on_network_partition(&self, active_nodes: HashSet<NodeId>) {
        warn!(
            "Network partition detected, {} active nodes",
            active_nodes.len()
        );
        self.engine_state.update_active_nodes(active_nodes.clone());

        // NOTE(review): the leader selector is only read here, not updated
        // with the new membership — confirm whether it should be.
        let current_leader = self.leader_selector.get_leader();
        info!("Current leader after partition: {:?}", current_leader);
    }

    /// Marks the engine inactive so consensus operations stop.
    async fn on_quorum_lost(&self) {
        error!("Quorum lost - stopping consensus operations");
        self.engine_state.set_active(false);
    }

    /// Re-activates the engine with the restored membership.
    async fn on_quorum_restored(&self, active_nodes: HashSet<NodeId>) {
        info!("Quorum restored with {} nodes", active_nodes.len());
        self.engine_state.update_active_nodes(active_nodes.clone());
        self.engine_state.set_active(true);

        let current_leader = self.leader_selector.get_leader();
        info!("Current leader after quorum restore: {:?}", current_leader);
    }
}