fedimint_server/consensus/engine.rs

use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use aleph_bft::Keychain as KeychainTrait;
use anyhow::{Context as _, anyhow, bail};
use async_channel::Receiver;
use fedimint_api_client::api::{DynGlobalApi, FederationApiExt, PeerError};
use fedimint_api_client::query::FilterMap;
use fedimint_core::config::P2PMessage;
use fedimint_core::core::{DynOutput, MODULE_INSTANCE_ID_GLOBAL};
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::Decodable;
use fedimint_core::endpoint_constants::AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT;
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::audit::Audit;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiRequestErased, SerdeModuleEncoding};
use fedimint_core::net::peers::DynP2PConnections;
use fedimint_core::runtime::spawn;
use fedimint_core::session_outcome::{
    AcceptedItem, SchnorrSignature, SessionOutcome, SignedSessionOutcome,
};
use fedimint_core::task::{TaskGroup, TaskHandle, sleep};
use fedimint_core::timing::TimeReporter;
use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
use fedimint_core::{NumPeers, NumPeersExt, PeerId, timing};
use fedimint_server_core::{ServerModuleRegistry, ServerModuleRegistryExt};
use futures::StreamExt;
use rand::Rng;
use tokio::sync::watch;
use tracing::{Level, debug, error, info, instrument, trace, warn};

use crate::LOG_CONSENSUS;
use crate::config::ServerConfig;
use crate::consensus::aleph_bft::backup::{BackupReader, BackupWriter};
use crate::consensus::aleph_bft::data_provider::{DataProvider, UnitData};
use crate::consensus::aleph_bft::finalization_handler::{FinalizationHandler, OrderedUnit};
use crate::consensus::aleph_bft::keychain::Keychain;
use crate::consensus::aleph_bft::network::Network;
use crate::consensus::aleph_bft::spawner::Spawner;
use crate::consensus::aleph_bft::to_node_index;
use crate::consensus::db::{
    AcceptedItemKey, AcceptedItemPrefix, AcceptedTransactionKey, AlephUnitsPrefix,
    SignedSessionOutcomeKey, SignedSessionOutcomePrefix,
};
use crate::consensus::debug::{DebugConsensusItem, DebugConsensusItemCompact};
use crate::consensus::transaction::{TxProcessingMode, process_transaction_with_dbtx};
use crate::metrics::{
    CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS,
    CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS, CONSENSUS_ITEMS_PROCESSED_TOTAL,
    CONSENSUS_ORDERING_LATENCY_SECONDS, CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX,
    CONSENSUS_SESSION_COUNT,
};

// The name of the directory where the database checkpoints are stored.
const DB_CHECKPOINTS_DIR: &str = "db_checkpoints";

/// Runs the main server consensus loop
pub struct ConsensusEngine {
    pub modules: ServerModuleRegistry,
    pub db: Database,
    pub federation_api: DynGlobalApi,
    pub cfg: ServerConfig,
    pub submission_receiver: Receiver<ConsensusItem>,
    pub shutdown_receiver: watch::Receiver<Option<u64>>,
    pub connections: DynP2PConnections<P2PMessage>,
    pub ci_status_senders: BTreeMap<PeerId, watch::Sender<Option<u64>>>,
    pub ord_latency_sender: watch::Sender<Option<Duration>>,
    pub task_group: TaskGroup,
    pub data_dir: PathBuf,
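    /// How many database checkpoints to retain; `checkpoint_database` skips
    /// checkpointing entirely when this is set to zero.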
    pub db_checkpoint_retention: u64,
}

impl ConsensusEngine {
    fn num_peers(&self) -> NumPeers {
        self.cfg.consensus.broadcast_public_keys.to_num_peers()
    }

    fn identity(&self) -> PeerId {
        self.cfg.local.identity
    }

    #[instrument(target = LOG_CONSENSUS, name = "run", skip_all, fields(id=%self.cfg.local.identity))]
    pub async fn run(self) -> anyhow::Result<()> {
        if self.num_peers().total() == 1 {
            self.run_single_guardian(self.task_group.make_handle())
                .await
        } else {
            self.run_consensus(self.task_group.make_handle()).await
        }
    }

    pub async fn run_single_guardian(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        assert_eq!(self.num_peers(), NumPeers::from(1));

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let mut item_index = self.pending_accepted_items().await.len() as u64;

            let session_start_time = std::time::Instant::now();

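            // With a single guardian there is no atomic broadcast to run: items are
            // pulled straight from the submission queue and the session is closed
            // after roughly one minute.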
            while let Ok(item) = self.submission_receiver.recv().await {
                if self
                    .process_consensus_item(session_index, item_index, item, self.identity())
                    .await
                    .is_ok()
                {
                    item_index += 1;
                }

                // we rely on the module consensus items to notice the timeout
                if session_start_time.elapsed() > Duration::from_secs(60) {
                    break;
                }
            }

            let session_outcome = SessionOutcome {
                items: self.pending_accepted_items().await,
            };

            let header = session_outcome.header(session_index);
            let signature = Keychain::new(&self.cfg).sign(&header);
            let signatures = BTreeMap::from_iter([(self.identity(), signature)]);

            self.complete_session(
                session_index,
                SignedSessionOutcome {
                    session_outcome,
                    signatures,
                },
            )
            .await;

            self.checkpoint_database(session_index);

            info!(target: LOG_CONSENSUS, "Session {session_index} completed");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_consensus(&self, task_handle: TaskHandle) -> anyhow::Result<()> {
        // We need at least four peers to run the atomic broadcast
        assert!(self.num_peers().total() >= 4);

        self.initialize_checkpoint_directory(self.get_finished_session_count().await)?;

        while !task_handle.is_shutting_down() {
            let session_index = self.get_finished_session_count().await;

            CONSENSUS_SESSION_COUNT.set(session_index as i64);

            let is_recovery = self.is_recovery().await;
            info!(
                target: LOG_CONSENSUS,
                ?session_index,
                is_recovery,
                "Starting consensus session"
            );

            self.run_session(self.connections.clone(), session_index)
                .await?;

            info!(target: LOG_CONSENSUS, ?session_index, "Completed consensus session");

            if Some(session_index) == self.shutdown_receiver.borrow().to_owned() {
                info!(target: LOG_CONSENSUS, "Initiating shutdown, waiting for peers to complete the session...");

                sleep(Duration::from_secs(60)).await;

                break;
            }
        }

        info!(target: LOG_CONSENSUS, "Consensus task shut down");

        Ok(())
    }

    pub async fn run_session(
        &self,
        connections: DynP2PConnections<P2PMessage>,
        session_index: u64,
    ) -> anyhow::Result<()> {
        // In order to bound a session's RAM consumption we need to bound its number
        // of units and therefore its number of rounds. Since we use a session to
        // create a naive secp256k1 threshold signature for the header of the session
        // outcome, we have to guarantee that an attacker cannot exhaust our memory
        // by preventing the creation of a threshold signature, thereby keeping the
        // session open indefinitely. Hence, after a certain round index, we increase
        // the delay between rounds exponentially such that the end of the Aleph BFT
        // session would only be reached after a minimum of 10 years. In case of such
        // an attack the broadcast stops ordering any items until the attack subsides,
        // as no items are ordered while the signatures are collected. The maximum RAM
        // consumption of the Aleph BFT broadcast instance is therefore bounded by:
        //
        // self.keychain.peer_count()
        //      * (broadcast_rounds_per_session + EXP_SLOWDOWN_ROUNDS)
        //      * ALEPH_BFT_UNIT_BYTE_LIMIT

        const EXP_SLOWDOWN_ROUNDS: u16 = 1000;
        const BASE: f64 = 1.02;
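
        // Rough arithmetic: 1.02^1000 is about 4 * 10^8, so by the last slowdown
        // round a base round delay of, say, 100 ms has grown by eight orders of
        // magnitude, and the cumulative tail delay comes to decades, comfortably
        // beyond the 10-year bound passed to `aleph_bft::create_config` below.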

        let rounds_per_session = self.cfg.consensus.broadcast_rounds_per_session;
        let round_delay = f64::from(self.cfg.local.broadcast_round_delay_ms);

        let mut delay_config = aleph_bft::default_delay_config();

        delay_config.unit_creation_delay = Arc::new(move |round_index| {
            let delay = if round_index == 0 {
                0.0
            } else {
                round_delay
                    * BASE.powf(round_index.saturating_sub(rounds_per_session as usize) as f64)
                    * rand::thread_rng().gen_range(0.5..=1.5)
            };

            Duration::from_millis(delay.round() as u64)
        });

        let config = aleph_bft::create_config(
            self.num_peers().total().into(),
            self.identity().to_usize().into(),
            session_index,
            self.cfg
                .consensus
                .broadcast_rounds_per_session
                .checked_add(EXP_SLOWDOWN_ROUNDS)
                .expect("Rounds per session exceed the maximum of u16::MAX - EXP_SLOWDOWN_ROUNDS"),
            delay_config,
            Duration::from_secs(10 * 365 * 24 * 60 * 60),
        )
        .expect("The exponential slowdown exceeds 10 years");

        // we can use an unbounded channel here since the number and size of units
        // ordered in a single aleph session are bounded as described above
        let (unit_data_sender, unit_data_receiver) = async_channel::unbounded();
        let (signature_sender, signature_receiver) = watch::channel(None);
        let (timestamp_sender, timestamp_receiver) = async_channel::unbounded();
        let (terminator_sender, terminator_receiver) = futures::channel::oneshot::channel();

        let aleph_handle = spawn(
            "aleph run session",
            aleph_bft::run_session(
                config,
                aleph_bft::LocalIO::new(
                    DataProvider::new(
                        self.submission_receiver.clone(),
                        signature_receiver,
                        timestamp_sender,
                        self.is_recovery().await,
                    ),
                    FinalizationHandler::new(unit_data_sender),
                    BackupWriter::new(self.db.clone()).await,
                    BackupReader::new(self.db.clone()),
                ),
                Network::new(connections),
                Keychain::new(&self.cfg),
                Spawner::new(self.task_group.make_subgroup()),
                aleph_bft::Terminator::create_root(terminator_receiver, "Terminator"),
            ),
        );

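        // Reset the ordering-latency estimate at the start of every session.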
        self.ord_latency_sender.send_replace(None);

        let signed_session_outcome = self
            .complete_signed_session_outcome(
                session_index,
                unit_data_receiver,
                signature_sender,
                timestamp_receiver,
            )
            .await?;

        info!(target: LOG_CONSENSUS, ?session_index, "Terminating Aleph BFT session");

        // We can terminate the session instead of waiting for other peers to complete
        // it since they can always download the signed session outcome from us
        terminator_sender.send(()).ok();
        aleph_handle.await.ok();

        // This method removes the backup of the current session from the database
        // and therefore has to be called after we have waited for the session to
        // shut down, or we risk write-write conflicts with the UnitSaver
        self.complete_session(session_index, signed_session_outcome)
            .await;

        self.checkpoint_database(session_index);

        Ok(())
    }

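    /// A mid-session crash leaves AlephBFT unit backups in the database; their
    /// presence indicates that we are recovering and will replay the session.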
    async fn is_recovery(&self) -> bool {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AlephUnitsPrefix)
            .await
            .next()
            .await
            .is_some()
    }

    pub async fn complete_signed_session_outcome(
        &self,
        session_index: u64,
        ordered_unit_receiver: Receiver<OrderedUnit>,
        signature_sender: watch::Sender<Option<SchnorrSignature>>,
        timestamp_receiver: Receiver<Instant>,
    ) -> anyhow::Result<SignedSessionOutcome> {
        // It is guaranteed that aleph bft will always replay all previously processed
        // items from the current session from index zero
        let mut item_index = 0;

        let mut request_signed_session_outcome = Box::pin(async {
            self.request_signed_session_outcome(&self.federation_api, session_index)
                .await
        });

        // We build a session outcome out of the ordered batches until either we have
        // processed broadcast_rounds_per_session rounds or a threshold signed
        // session outcome is obtained from our peers
        loop {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit.with_context(|| format!("AlephBFT task exited prematurely. session_idx: {session_index}, item_idx: {item_index}"));
                    let ordered_unit = match ordered_unit {
                        Ok(o) => o,
                        Err(err) => {
                            // Chances are that AlephBFT is gone because everything is shutting down.
                            //
                            // If that's the case, yielding here will simply never return, as our task
                            // will not get executed anymore. This saves us from returning some "canceled"
                            // state upwards and from reporting an error that wasn't caused by this
                            // task.
                            while self.task_group.is_shutting_down() {
                                info!(target: LOG_CONSENSUS, ?session_index, "Shutdown detected");
                                sleep(Duration::from_millis(100)).await;
                            }
                            // Otherwise, just return the error upwards
                            return Err(err);
                        },
                    };

                    if ordered_unit.round >= self.cfg.consensus.broadcast_rounds_per_session {
                        info!(
                            target: LOG_CONSENSUS,
                            ?session_index,
                            "Reached Aleph BFT round limit, stopping item collection"
                        );
                        break;
                    }

                    if let Some(UnitData::Batch(bytes)) = ordered_unit.data {
                        if ordered_unit.creator == self.identity() {
                            match timestamp_receiver.try_recv() {
                                Ok(timestamp) => {
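                                    // Exponentially weighted moving average: each new
                                    // ordering-latency sample contributes 10% to the
                                    // published estimate.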
                                    let latency = match *self.ord_latency_sender.borrow() {
                                        Some(latency) => (9 * latency + timestamp.elapsed()) / 10,
                                        None => timestamp.elapsed()
                                    };

                                    self.ord_latency_sender.send_replace(Some(latency));

                                    CONSENSUS_ORDERING_LATENCY_SECONDS.observe(timestamp.elapsed().as_secs_f64());
                                }
                                Err(err) => {
                                    debug!(target: LOG_CONSENSUS, err = %err.fmt_compact(), "Missing submission timestamp. This is normal in recovery");
                                }
                            }
                        }

                        match Vec::<ConsensusItem>::consensus_decode_whole(&bytes, &self.decoders()) {
                            Ok(items) => {
                                for item in items {
                                    if let Ok(()) = self.process_consensus_item(
                                        session_index,
                                        item_index,
                                        item.clone(),
                                        ordered_unit.creator
                                    ).await {
                                        item_index += 1;
                                    }
                                }
                            }
                            Err(err) => {
                                warn!(
                                    target: LOG_CONSENSUS,
                                    %session_index,
                                    peer = %ordered_unit.creator,
                                    err = %err.fmt_compact(),
                                    "Failed to decode consensus items from peer"
                                );
                            }
                        }
                    }
                },
                signed_session_outcome = &mut request_signed_session_outcome => {
                    info!(
                        target: LOG_CONSENSUS,
                        ?session_index,
417                        "Recovered signed session outcome from peers while collecting signatures"
                    );

                    let pending_accepted_items = self.pending_accepted_items().await;

                    // this panics if we have more accepted items than the signed session outcome contains
                    let (processed, unprocessed) = signed_session_outcome
                        .session_outcome
                        .items
                        .split_at(pending_accepted_items.len());

                    info!(
                        target: LOG_CONSENSUS,
                        ?session_index,
                        processed = %processed.len(),
                        unprocessed = %unprocessed.len(),
                        "Processing remaining items..."
                    );

                    assert!(
                        processed.iter().eq(pending_accepted_items.iter()),
                        "Consensus Failure: pending accepted items disagree with federation consensus"
                    );

                    for (accepted_item, item_index) in unprocessed.iter().zip(processed.len()..) {
                        if let Err(err) = self.process_consensus_item(
                            session_index,
                            item_index as u64,
                            accepted_item.item.clone(),
                            accepted_item.peer
                        ).await {
                            panic!(
                                "Consensus Failure: rejected item accepted by federation consensus: {accepted_item:?}, items: {}+{}, session_idx: {session_index}, item_idx: {item_index}, err: {err}",
                                processed.len(),
                                unprocessed.len(),
                            );
                        }
                    }

                    return Ok(signed_session_outcome);
                }
            }
        }

        let items = self.pending_accepted_items().await;

        assert_eq!(item_index, items.len() as u64);

        info!(target: LOG_CONSENSUS, ?session_index, ?item_index, "Processed all items for session");

        let session_outcome = SessionOutcome { items };

        let header = session_outcome.header(session_index);

        info!(
            target: LOG_CONSENSUS,
            ?session_index,
            header = %hex::encode(header),
            "Signing session header..."
        );

        let keychain = Keychain::new(&self.cfg);

        // We send our own signature to the data provider to be submitted to the atomic
        // broadcast and collected by our peers
        #[allow(clippy::disallowed_methods)]
        signature_sender.send(Some(keychain.sign(&header)))?;

        let mut signatures = BTreeMap::new();

        let items_dump = tokio::sync::OnceCell::new();

        // We collect the ordered signatures until we either obtain a threshold
        // signature or a signed session outcome arrives from our peers
        while signatures.len() < self.num_peers().threshold() {
            tokio::select! {
                ordered_unit = ordered_unit_receiver.recv() => {
                    let ordered_unit = ordered_unit?;

                    if let Some(UnitData::Signature(signature)) = ordered_unit.data {
                        info!(
                            target: LOG_CONSENSUS,
                            ?session_index,
                            peer = %ordered_unit.creator,
                            "Collected signature from peer, verifying..."
                        );

                        if keychain.verify(&header, &signature, to_node_index(ordered_unit.creator)) {
                            signatures.insert(ordered_unit.creator, signature);
                        } else {
                            error!(
                                target: LOG_CONSENSUS,
                                ?session_index,
                                peer = %ordered_unit.creator,
                                "Consensus Failure: invalid header signature from peer"
                            );

                            items_dump.get_or_init(|| async {
                                for (idx, item) in session_outcome.items.iter().enumerate() {
                                    info!(target: LOG_CONSENSUS, idx, item = %DebugConsensusItemCompact(item), "Item");
                                }
                            }).await;
                        }
                    }
                }
                signed_session_outcome = &mut request_signed_session_outcome => {
                    info!(
                        target: LOG_CONSENSUS,
                        ?session_index,
                        "Recovered signed session outcome from peers while collecting signatures"
                    );

                    assert_eq!(
                        header,
                        signed_session_outcome.session_outcome.header(session_index),
                        "Consensus Failure: header disagrees with federation consensus"
                    );

                    return Ok(signed_session_outcome);
                }
            }
        }

        info!(
            target: LOG_CONSENSUS,
            ?session_index,
            "Successfully collected threshold of signatures via the atomic broadcast"
        );

        Ok(SignedSessionOutcome {
            session_outcome,
            signatures,
        })
    }

    fn decoders(&self) -> ModuleDecoderRegistry {
        self.modules.decoder_registry()
    }

    pub async fn pending_accepted_items(&self) -> Vec<AcceptedItem> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&AcceptedItemPrefix)
            .await
            .map(|entry| entry.1)
            .collect()
            .await
    }

    pub async fn complete_session(
        &self,
        session_index: u64,
        signed_session_outcome: SignedSessionOutcome,
    ) {
        let mut dbtx = self.db.begin_transaction().await;

        dbtx.remove_by_prefix(&AlephUnitsPrefix).await;

        dbtx.remove_by_prefix(&AcceptedItemPrefix).await;

        if dbtx
            .insert_entry(
                &SignedSessionOutcomeKey(session_index),
                &signed_session_outcome,
            )
            .await
            .is_some()
        {
            panic!("We tried to overwrite a signed session outcome");
        }

        dbtx.commit_tx_result()
            .await
            .expect("This is the only place where we write to this key");
    }

    /// Returns the full path where the database checkpoints are stored.
    fn db_checkpoints_dir(&self) -> PathBuf {
        self.data_dir.join(DB_CHECKPOINTS_DIR)
    }

    /// Creates the directory within the data directory for storing the database
    /// checkpoints, or deletes checkpoints older than
    /// `current_session - db_checkpoint_retention`.
    fn initialize_checkpoint_directory(&self, current_session: u64) -> anyhow::Result<()> {
        let checkpoint_dir = self.db_checkpoints_dir();

        if checkpoint_dir.exists() {
            debug!(
                target: LOG_CONSENSUS,
                ?current_session,
                "Removing database checkpoints up to `current_session`"
            );

            for checkpoint in fs::read_dir(checkpoint_dir)?.flatten() {
                // Validate that the directory is a session index
                if let Ok(file_name) = checkpoint.file_name().into_string()
                    && let Ok(session) = file_name.parse::<u64>()
                    && current_session >= self.db_checkpoint_retention
                    && session < current_session - self.db_checkpoint_retention
                {
                    fs::remove_dir_all(checkpoint.path())?;
                }
            }
        } else {
            fs::create_dir_all(&checkpoint_dir)?;
        }

        Ok(())
    }

    /// Creates a backup of the database in the checkpoint directory. These
    /// checkpoints can be used to restore the database in case the
    /// federation falls out of consensus (recommended for experts only).
    fn checkpoint_database(&self, session_index: u64) {
        // If `checkpoint_retention` has been turned off, don't checkpoint the database
        // at all.
        if self.db_checkpoint_retention == 0 {
            return;
        }

        let checkpoint_dir = self.db_checkpoints_dir();
        let session_checkpoint_dir = checkpoint_dir.join(format!("{session_index}"));

        {
            let _timing /* logs on drop */ = timing::TimeReporter::new("database-checkpoint").level(Level::TRACE);
            match self.db.checkpoint(&session_checkpoint_dir) {
                Ok(()) => {
                    debug!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, "Created db checkpoint");
                }
                Err(err) => {
                    warn!(target: LOG_CONSENSUS, ?session_checkpoint_dir, ?session_index, err = %err.fmt_compact_anyhow(), "Could not create db checkpoint");
                }
            }
        }

        {
            // Check if any old checkpoints need to be cleaned up
            let _timing /* logs on drop */ = timing::TimeReporter::new("remove-database-checkpoint").level(Level::TRACE);
            if let Err(err) = self.delete_old_database_checkpoint(session_index, &checkpoint_dir) {
                warn!(target: LOG_CONSENSUS, err = %err.fmt_compact_anyhow(), "Could not delete old checkpoints");
            }
        }
    }

    /// Deletes the database checkpoint directory for session
    /// `session_index - db_checkpoint_retention`, if it exists.
    fn delete_old_database_checkpoint(
        &self,
        session_index: u64,
        checkpoint_dir: &Path,
    ) -> anyhow::Result<()> {
        if self.db_checkpoint_retention > session_index {
            return Ok(());
        }

        let delete_session_index = session_index - self.db_checkpoint_retention;
        let checkpoint_to_delete = checkpoint_dir.join(delete_session_index.to_string());
        if checkpoint_to_delete.exists() {
            fs::remove_dir_all(checkpoint_to_delete)?;
        }

        Ok(())
    }

    #[instrument(target = LOG_CONSENSUS, skip(self, item), level = "info")]
    pub async fn process_consensus_item(
        &self,
        session_index: u64,
        item_index: u64,
        item: ConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        let _timing /* logs on drop */ = timing::TimeReporter::new("process_consensus_item").level(Level::TRACE);

        let timing_prom = CONSENSUS_ITEM_PROCESSING_DURATION_SECONDS
            .with_label_values(&[&peer.to_usize().to_string()])
            .start_timer();

        trace!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processing consensus item"
        );

        self.ci_status_senders
            .get(&peer)
            .expect("No ci status sender for peer")
            .send_replace(Some(session_index));

        CONSENSUS_PEER_CONTRIBUTION_SESSION_IDX
            .with_label_values(&[
                &self.cfg.local.identity.to_usize().to_string(),
                &peer.to_usize().to_string(),
            ])
            .set(session_index as i64);

        let mut dbtx = self.db.begin_transaction().await;

        dbtx.ignore_uncommitted();

        // When we recover from a mid-session crash AlephBFT will replay the units that
        // were already processed before the crash. We therefore skip all consensus
        // items until we have seen every previously accepted item again.
        if let Some(existing_item) = dbtx
            .get_value(&AcceptedItemKey(item_index.to_owned()))
            .await
        {
            if existing_item.item == item && existing_item.peer == peer {
                return Ok(());
            }

            bail!(
                "Item was discarded previously: existing: {existing_item:?} {}, current: {item:?}, {peer}",
                existing_item.peer
            );
        }

        self.process_consensus_item_with_db_transaction(&mut dbtx.to_ref_nc(), item.clone(), peer)
            .await
            .inspect_err(|err| {
                // Rejected items are very common, so only trace level
                trace!(
                    target: LOG_CONSENSUS,
                    %peer,
                    item = ?DebugConsensusItem(&item),
                    err = %err.fmt_compact_anyhow(),
                    "Rejected consensus item"
                );
            })?;

        // After this point we have to commit the database transaction since the
        // item has been fully processed without errors
        dbtx.warn_uncommitted();

        dbtx.insert_entry(
            &AcceptedItemKey(item_index),
            &AcceptedItem {
                item: item.clone(),
                peer,
            },
        )
        .await;

        debug!(
            target: LOG_CONSENSUS,
            %peer,
            item = ?DebugConsensusItem(&item),
            "Processed consensus item"
        );
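        // After every accepted item, re-run the audit across all modules to ensure
        // the federation's balance sheet has not gone negative.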
        let mut audit = Audit::default();

        for (module_instance_id, kind, module) in self.modules.iter_modules() {
            let _module_audit_timing =
                TimeReporter::new(format!("audit module {module_instance_id}")).level(Level::TRACE);

            let timing_prom = CONSENSUS_ITEM_PROCESSING_MODULE_AUDIT_DURATION_SECONDS
                .with_label_values(&[
                    MODULE_INSTANCE_ID_GLOBAL.to_string().as_str(),
                    kind.as_str(),
                ])
                .start_timer();

            module
                .audit(
                    &mut dbtx
                        .to_ref_with_prefix_module_id(module_instance_id)
                        .0
                        .into_nc(),
                    &mut audit,
                    module_instance_id,
                )
                .await;

            timing_prom.observe_duration();
        }

        assert!(
            audit
                .net_assets()
                .expect("Overflow while checking balance sheet")
                .milli_sat
                >= 0,
            "Balance sheet of the fed has gone negative, this should never happen! {audit}"
        );

        dbtx.commit_tx_result()
            .await
            .expect("Committing consensus epoch failed");

        CONSENSUS_ITEMS_PROCESSED_TOTAL
            .with_label_values(&[&peer.to_usize().to_string()])
            .inc();

        timing_prom.observe_duration();

        Ok(())
    }

    async fn process_consensus_item_with_db_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        consensus_item: ConsensusItem,
        peer_id: PeerId,
    ) -> anyhow::Result<()> {
        // We rely on decoding rejecting any unknown module instance ids to avoid
        // a peer-triggered panic here
        self.decoders().assert_reject_mode();

        match consensus_item {
            ConsensusItem::Module(module_item) => {
                let instance_id = module_item.module_instance_id();

                let module_dbtx = &mut dbtx.to_ref_with_prefix_module_id(instance_id).0;

                self.modules
                    .get_expect(instance_id)
                    .process_consensus_item(module_dbtx, &module_item, peer_id)
                    .await
            }
            ConsensusItem::Transaction(transaction) => {
                let txid = transaction.tx_hash();
                if dbtx
                    .get_value(&AcceptedTransactionKey(txid))
                    .await
                    .is_some()
                {
                    debug!(
                        target: LOG_CONSENSUS,
                        %txid,
                        "Transaction already accepted"
                    );
                    bail!("Transaction is already accepted");
                }

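                // Record the module instance id of every output so it can be stored
                // under the accepted transaction key below.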
                let modules_ids = transaction
                    .outputs
                    .iter()
                    .map(DynOutput::module_instance_id)
                    .collect::<Vec<_>>();

                process_transaction_with_dbtx(
                    self.modules.clone(),
                    dbtx,
                    &transaction,
                    self.cfg.consensus.version,
                    TxProcessingMode::Consensus,
                )
                .await
                .map_err(|error| anyhow!(error.to_string()))?;

                debug!(target: LOG_CONSENSUS, %txid, "Transaction accepted");
                dbtx.insert_entry(&AcceptedTransactionKey(txid), &modules_ids)
                    .await;

                Ok(())
            }
            ConsensusItem::Default { variant, .. } => {
                warn!(
                    target: LOG_CONSENSUS,
                    "Minor consensus version mismatch: unexpected consensus item type: {variant}"
                );

                panic!("Unexpected consensus item type: {variant}")
            }
        }
    }

    async fn request_signed_session_outcome(
        &self,
        federation_api: &DynGlobalApi,
        index: u64,
    ) -> SignedSessionOutcome {
        let decoders = self.decoders();
        let keychain = Keychain::new(&self.cfg);
        let threshold = self.num_peers().threshold();

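        // Accept a response only if it carries exactly `threshold` signatures and
        // every one of them is a valid signature over the header of the returned
        // session outcome.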
        let filter_map = move |response: SerdeModuleEncoding<SignedSessionOutcome>| {
            let signed_session_outcome = response
                .try_into_inner(&decoders)
                .map_err(|x| PeerError::ResponseDeserialization(x.into()))?;
            let header = signed_session_outcome.session_outcome.header(index);
            if signed_session_outcome.signatures.len() == threshold
                && signed_session_outcome
                    .signatures
                    .iter()
                    .all(|(peer_id, sig)| keychain.verify(&header, sig, to_node_index(*peer_id)))
            {
                Ok(signed_session_outcome)
            } else {
                Err(PeerError::InvalidResponse(anyhow!("Invalid signatures")))
            }
        };

        let mut backoff = fedimint_core::util::backoff_util::api_networking_backoff();
        loop {
            let result = federation_api
                .request_with_strategy(
                    FilterMap::new(filter_map.clone()),
                    AWAIT_SIGNED_SESSION_OUTCOME_ENDPOINT.to_string(),
                    ApiRequestErased::new(index),
                )
                .await;

            match result {
                Ok(signed_session_outcome) => return signed_session_outcome,
                Err(error) => {
                    error.report_if_unusual("Requesting Session Outcome");
                }
            }

            sleep(backoff.next().expect("infinite retries")).await;
        }
    }

    /// Returns the number of sessions already saved in the database. This count
    /// **does not** include the currently running session.
    async fn get_finished_session_count(&self) -> u64 {
        get_finished_session_count_static(&mut self.db.begin_transaction_nc().await).await
    }
}

pub async fn get_finished_session_count_static(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
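    // Sessions are indexed from zero, so the highest stored session index plus one
    // equals the number of finished sessions; zero if none have completed yet.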
    dbtx.find_by_prefix_sorted_descending(&SignedSessionOutcomePrefix)
        .await
        .next()
        .await
        .map_or(0, |entry| (entry.0.0) + 1)
}