use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{Context as _, anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerError};
use fedimint_api_client::query::FilterMap;
use fedimint_core::config::P2PMessage;
use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::net::peers::DynP2PConnections;
use fedimint_core::runtime::spawn;
use fedimint_core::session_outcome::{
    AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
};
use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
use fedimint_core::timing::TimeReporter;
use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
use futures::StreamExt;
use rand::Rng;
use tokio::sync::watch;
use tracing::{Level, debug, error, info, instrument, trace, warn};

use crate::LOG_CONSENSUS;
use crate::config::ServerConfig;
use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
use crate::consensus::aleph_bft::keychain::Keychain;
use crate::consensus::aleph_bft::network::Network;
use crate::consensus::aleph_bft::spawner::Spawner;
use crate::consensus::aleph_bft::to_node_index;
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
use crate::metrics::{
    CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
    CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
    CONSENSUS_SESSION_COUNT,
};

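/// Subdirectory of the data directory in which database checkpoints are
/// stored.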
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";

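/// The consensus engine runs one Aleph BFT session after another, processes
/// the items ordered by the atomic broadcast and records the signed outcome of
/// every completed session in the database.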
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub connections: DynP2PConnections<P2PMessage>,
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    pub ord_latency_sender: watch::Sender<Option<Duration>>,
    pub task_group: TaskGroup,
    pub data_dir: PathBuf,
    pub db_checkpoint_retention: u64,
}

impl ConsensusEngine {
    fn num_peers(&self) -> NumPeers {
        self.cfg.consensus.broadcast_public_keys.to_num_peers()
    }

    fn identity(&self) -> PeerId {
        self.cfg.local.identity
    }

    #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.num_peers().total() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.task_group.make_handle()).await
        }
    }

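    /// A federation with a single guardian does not need atomic broadcast:
    /// items are processed straight from the submission channel and a session
    /// is closed after it has been running for roughly a minute.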
    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.num_peers(), NumPeers::from(1));

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = std::time::Instant::now();

            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(session_index, item_index, item, self.identity())
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = Keychain::new(&self.cfg).sign(&header);
            let signatures = BTreeMap::from_iter([(self.identity(), signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            self.checkpoint_database(session_index);

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

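    /// Runs Aleph BFT sessions until a shutdown is requested, giving the other
    /// peers a final minute to complete the last session before exiting.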
    pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert!(self.num_peers().total() >= 4);

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let is_recovery = self.is_recovery().await;
            info!(
                target: LOG_CONSENSUS,
                ?session_index,
                is_recovery,
                "Starting consensus session"
            );

            self.run_session(self.connections.clone(), session_index)
                .await?;

            info!(target: LOG_CONSENSUS, ?session_index, "Completed consensus session");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_session(
        &self,
        connections: DynP2PConnections<P2PMessage>,
        session_index: u64,
    ) -> anyhow::Result<()> {
        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
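        // Once the configured rounds of regular unit creation are exhausted,
        // every further round is delayed exponentially by a factor of BASE,
        // which keeps the broadcast alive while the signatures for the session
        // outcome are collected, yet bounds the total number of rounds per
        // session via EXP_SLOWDOWN_ROUNDS.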
        const BASE: f64 = 1.02;

        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);

        let mut delay_config = aleph_bft::default_delay_config();

        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });

        let config = aleph_bft::create_config(
            self.num_peers().total().into(),
            self.identity().to_usize().into(),
            session_index,
            self.cfg
                .consensus
                .broadcast_rounds_per_session
                .checked_add(EXP_SLOWDOWN_ROUNDS)
                .expect("Rounds per session exceed the maximum of u16::MAX - EXP_SLOWDOWN_ROUNDS"),
            delay_config,
            Duration::from_secs(10 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown exceeds 10 years");

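        // These channels connect the session loop to the Aleph BFT task's
        // local IO: the finalization handler sends ordered units through
        // unit_data_sender, the data provider receives our header signature
        // through signature_receiver and reports submission timestamps for
        // latency tracking through timestamp_sender.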
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(
                        self.submission_receiver.clone(),
                        signature_receiver,
                        timestamp_sender,
                        self.is_recovery().await,
                    ),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()).await,
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(connections),
                Keychain::new(&self.cfg),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

        self.ord_latency_sender.send_replace(None);

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                unit_data_receiver,
                signature_sender,
                timestamp_receiver,
            )
            .await?;

        info!(target: LOG_CONSENSUS, ?session_index, "Terminating Aleph BFT session");

        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        self.complete_session(session_index, signed_session_outcome)
            .await;

        self.checkpoint_database(session_index);

        Ok(())
    }

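    /// We are in recovery if Aleph BFT units of an unfinished session are
    /// still backed up in the database.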
    async fn is_recovery(&self) -> bool {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AlephUnitsPrefix)
            .await
            .next()
            .await
            .is_some()
    }

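    /// Assembles the current session's outcome from the ordered units and
    /// collects a threshold of signatures over its header. If another peer has
    /// already completed the session, its signed outcome is downloaded from
    /// the federation instead and any items we have not processed yet are
    /// replayed from it.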
    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        ordered_unit_receiver: Receiver<OrderedUnit>,
        signature_sender: watch::Sender<Option<SchnorrSignature>>,
        timestamp_receiver: Receiver<Instant>,
    ) -> anyhow::Result<SignedSessionOutcome> {
        let mut item_index = 0;

        let mut request_signed_session_outcome = Box::pin(async {
            self.request_signed_session_outcome(&self.federation_api, session_index)
                .await
        });

        loop {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit.with_context(|| format!("AlephBFT task exited prematurely. session_idx: {session_index}, item_idx: {item_index}"));
                    let ordered_unit = match ordered_unit {
                        Ok(o) => o,
                        Err(err) => {
                            while self.task_group.is_shutting_down() {
                                info!(target: LOG_CONSENSUS, ?session_index, "Shutdown detected");
                                sleep(Duration::from_millis(100)).await;
                            }
                            return Err(err);
                        },
                    };

                    if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
                        info!(
                            target: LOG_CONSENSUS,
                            ?session_index,
                            "Reached Aleph BFT round limit, stopping item collection"
                        );
                        break;
                    }

                    if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
                        if ordered_unit.creator == self.identity() {
                            match timestamp_receiver.try_recv() {
                                Ok(timestamp) => {
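                                    // Exponentially weighted moving average:
                                    // keep 90% of the previous latency
                                    // estimate and blend in 10% of the newest
                                    // sample.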
                                    let latency = match *self.ord_latency_sender.borrow() {
                                        Some(latency) => (9 * latency + timestamp.elapsed()) / 10,
                                        None => timestamp.elapsed(),
                                    };

                                    self.ord_latency_sender.send_replace(Some(latency));

                                    CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
                                }
                                Err(err) => {
                                    debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
                                }
                            }
                        }

                        match Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
                            Ok(items) => {
                                for item in items {
                                    if let Ok(()) = self.process_consensus_item(
                                        session_index,
                                        item_index,
                                        item.clone(),
                                        ordered_unit.creator
                                    ).await {
                                        item_index += 1;
                                    }
                                }
                            }
                            Err(err) => {
                                warn!(
                                    target: LOG_CONSENSUS,
                                    %session_index,
                                    peer = %ordered_unit.creator,
                                    err = %err.fmt_compact(),
                                    "Failed to decode consensus items from peer"
                                );
                            }
                        }
                    }
                },
                signed_session_outcome = &mut request_signed_session_outcome => {
                    info!(
                        target: LOG_CONSENSUS,
                        ?session_index,
                        "Recovered signed session outcome from peers while collecting items"
                    );

                    let pending_accepted_items = self.pending_accepted_items().await;

                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    info!(
                        target: LOG_CONSENSUS,
                        ?session_index,
                        processed = %processed.len(),
                        unprocessed = %unprocessed.len(),
                        "Processing remaining items..."
                    );

                    assert!(
                        processed.iter().eq(pending_accepted_items.iter()),
                        "Consensus Failure: pending accepted items disagree with federation consensus"
                    );

                    for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                        if let Err(err) = self.process_consensus_item(
                            session_index,
                            item_index as u64,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await {
                            panic!(
                                "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                                processed.len(),
                                unprocessed.len(),
                            );
                        }
                    }

                    return Ok(signed_session_outcome);
                }
            }
        }

        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        info!(target: LOG_CONSENSUS, ?session_index, ?item_index, "Processed all items for session");

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        info!(
            target: LOG_CONSENSUS,
            ?session_index,
            header = %hex::encode(header),
            "Signing session header..."
        );

        let keychain = Keychain::new(&self.cfg);

        #[allow(clippy::disallowed_methods)]
        signature_sender.send(Some(keychain.sign(&header)))?;

        let mut signatures = BTreeMap::new();

        let items_dump = tokio::sync::OnceCell::new();

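        // Collect the header signatures from the ordered units until a
        // threshold of peers has signed; a complete signed outcome received
        // from a peer short-circuits the collection.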
489 while signatures.len() < self.num_peers().threshold() {
492 tokio::select! {
493 ordered_unit = ordered_unit_receiver.recv() => {
494 let ordered_unit = ordered_unit?;
495
496 if let Some(UnitData::Signature(signature)) = ordered_unit.data {
497 info!(
498 target: LOG_CONSENSUS,
499 ?session_index,
500 peer = %ordered_unit.creator,
501 "Collected signature from peer, verifying..."
502 );
503
504 if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)){
505 signatures.insert(ordered_unit.creator, signature);
506 } else {
507 error!(
508 target: LOG_CONSENSUS,
509 ?session_index,
510 peer = %ordered_unit.creator,
511 "Consensus Failure: invalid header signature from peer"
512 );
513
514 items_dump.get_or_init(|| async {
515 for (idx, item) in session_outcome.items.iter().enumerate() {
516 info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
517 }
518 }).await;
519 }
520 }
521 }
522 signed_session_outcome = &mut request_signed_session_outcome => {
523 info!(
524 target: LOG_CONSENSUS,
525 ?session_index,
526 "Recovered signed session outcome from peers while collecting signatures"
527 );
528
529
530
531 assert_eq!(
532 header,
533 signed_session_outcome.session_outcome.header(session_index),
534 "Consensus Failure: header disagrees with federation consensus"
535 );
536
537 return Ok(signed_session_outcome);
538 }
539 }
540 }
541
542 info!(
543 target: LOG_CONSENSUS,
544 ?session_index,
545 "Successfully collected threshold of signatures via the atomic broadcast"
546 );
547
548 Ok(SignedSessionOutcome {
549 session_outcome,
550 signatures,
551 })
552 }
553
    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

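    /// Returns the items that have already been accepted in the current,
    /// still unfinished session, in the order of their acceptance.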
    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

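    /// Prunes the session's Aleph BFT backup units and accepted items and
    /// stores the signed session outcome in a single database transaction.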
    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

    fn db_checkpoints_dir(&self) -> PathBuf {
        self.data_dir.join(DB_CHECKPOINTS_DIR)
    }

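    /// Creates the checkpoint directory if it does not exist yet; otherwise
    /// removes all checkpoints that have fallen out of the retention window
    /// relative to `current_session`.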
    fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
        let checkpoint_dir = self.db_checkpoints_dir();

        if checkpoint_dir.exists() {
            debug!(
                target: LOG_CONSENSUS,
                ?current_session,
                "Removing database checkpoints outside the retention window"
            );

            for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
                if let Ok(file_name) = checkpoint.file_name().into_string()
                    && let Ok(session) = file_name.parse::<u64>()
                    && current_session >= self.db_checkpoint_retention
                    && session < current_session - self.db_checkpoint_retention
                {
                    fs::remove_dir_all(checkpoint.path())?;
                }
            }
        } else {
            fs::create_dir_all(&checkpoint_dir)?;
        }

        Ok(())
    }

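    /// Creates a database checkpoint for the completed session and deletes the
    /// checkpoint that has fallen out of the retention window. A retention of
    /// zero disables checkpointing entirely.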
    fn checkpoint_database(&self, session_index: u64) {
        if self.db_checkpoint_retention == 0 {
            return;
        }

        let checkpoint_dir = self.db_checkpoints_dir();
        let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));

        {
            let _timing = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
            match self.db.checkpoint(&session_checkpoint_dir) {
                Ok(()) => {
                    debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
                }
                Err(err) => {
                    warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, err = %err.fmt_compact_anyhow(), "Could not create db checkpoint");
                }
            }
        }

        {
            let _timing =
                timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
            if let Err(err) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
                warn!(target: LOG_CONSENSUS, err = %err.fmt_compact_anyhow(), "Could not delete old checkpoints");
            }
        }
    }

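    /// Deletes the checkpoint that is `db_checkpoint_retention` sessions
    /// behind `session_index`, if it exists.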
    fn delete_old_database_checkpoint(
        &self,
        session_index: u64,
        checkpoint_dir: &Path,
    ) -> anyhow::Result<()> {
        if self.db_checkpoint_retention > session_index {
            return Ok(());
        }

        let delete_session_index = session_index - self.db_checkpoint_retention;
        let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
        if checkpoint_to_delete.exists() {
            fs::remove_dir_all(checkpoint_to_delete)?;
        }

        Ok(())
    }

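    /// Processes a single consensus item in its own database transaction.
    /// Processing is idempotent: an item that is identical to the one already
    /// accepted at this index succeeds as a no-op, while a conflicting item is
    /// an error. After every accepted item the module audit asserts that the
    /// federation's balance sheet has not gone negative.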
    #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let _timing = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);

        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[&peer.to_usize().to_string()])
            .start_timer();

        trace!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processing consensus item"
        );

        self.ci_status_senders
            .get(&peer)
            .expect("No ci status sender for peer")
            .send_replace(Some(session_index));

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[
                &self.cfg.local.identity.to_usize().to_string(),
                &peer.to_usize().to_string(),
            ])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

        dbtx.ignore_uncommitted();

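        // If this item index was already decided, either the item is identical
        // to the accepted one and has to be a no-op, for example when a
        // session is replayed during recovery, or it conflicts and has to be
        // rejected.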
        if let Some(existing_item) = dbtx
            .get_value(&AcceptedItemKey(item_index))
            .await
        {
            if existing_item.item == item && existing_item.peer == peer {
                return Ok(());
            }

            bail!(
                "Item conflicts with the item already accepted at index {item_index}: existing: {existing_item:?} from peer {}, current: {item:?} from peer {peer}",
                existing_item.peer
            );
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await
            .inspect_err(|err| {
                trace!(
                    target: LOG_CONSENSUS,
                    %peer,
                    item = ?DebugConsensusItem(&item),
                    err = %err.fmt_compact_anyhow(),
                    "Rejected consensus item"
                );
            })?;

        dbtx.warn_uncommitted();

        dbtx.insert_entry(
            &AcceptedItemKey(item_index),
            &AcceptedItem {
                item: item.clone(),
                peer,
            },
        )
        .await;

        debug!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processed consensus item"
        );

        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[
                    MODULE_INSTANCE_ID_GLOBAL.to_string().as_str(),
                    kind.as_str(),
                ])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;

            timing_prom.observe_duration();
        }

        assert!(
            audit
                .net_assets()
                .expect("Overflow while checking balance sheet")
                .milli_sat
                >= 0,
            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
        );

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[&peer.to_usize().to_string()])
            .inc();

        timing_prom.observe_duration();

        Ok(())
    }

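    /// Applies a consensus item to the supplied database transaction, either
    /// by dispatching it to the module it belongs to or by processing it as a
    /// transaction.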
    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
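        // The decoders have to be in reject mode so that an item of an unknown
        // module kind fails to decode instead of being silently ignored;
        // otherwise peers could disagree on which items were accepted.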
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();

                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, &module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(
                        target: LOG_CONSENSUS,
                        %txid,
                        "Transaction already accepted"
                    );
                    bail!("Transaction is already accepted");
                }

                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(DynOutput::module_instance_id)
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(
                    self.modules.clone(),
                    dbtx,
                    &transaction,
                    self.cfg.consensus.version,
                    TxProcessingMode::Consensus,
                )
                .await
                .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
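            // An unknown item variant means that at least one peer is running
            // a newer minor consensus version which introduced a new item
            // type. We halt rather than risk diverging from the peers that
            // understand it.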
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );

                panic!("Unexpected consensus item type: {variant}")
            }
        }
    }

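    /// Requests the signed outcome of the session with the given index from
    /// the federation, retrying with backoff until a response carrying a valid
    /// threshold of signatures is received.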
    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let decoders = self.decoders();
        let keychain = Keychain::new(&self.cfg);
        let threshold = self.num_peers().threshold();

        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
            let signed_session_outcome = response
                .try_into_inner(&decoders)
                .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
            let header = signed_session_outcome.session_outcome.header(index);
            if signed_session_outcome.signatures.len() == threshold
                && signed_session_outcome
                    .signatures
                    .iter()
                    .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
            {
                Ok(signed_session_outcome)
            } else {
                Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
            }
        };

        let mut backoff = fedimint_core::util::backoff_util::api_networking_backoff();
        loop {
            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone()),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(error) => {
                    error.report_if_unusual("Requesting Session Outcome");
                }
            }

            sleep(backoff.next().expect("infinite retries")).await;
        }
    }

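    /// Returns the number of sessions this guardian has already completed.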
    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

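/// The number of finished sessions equals the index of the most recent signed
/// session outcome plus one, since session indices start at zero.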
pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map_or(0, |entry| (entry.0.0) + 1)
}