1use super::{
11 metrics, scheme,
12 types::{
13 Ack, Activity, Chunk, ChunkSigner, ChunkVerifier, Context, Error, Lock, Node, Parent,
14 Proposal, SequencersProvider,
15 },
16 AckManager, Config, TipManager,
17};
18use crate::{
19 types::{Epoch, EpochDelta, Height, HeightDelta},
20 Automaton, Monitor, Relay, Reporter,
21};
22use commonware_codec::Encode;
23use commonware_cryptography::{
24 certificate::{Provider, Scheme},
25 Digest, PublicKey, Signer,
26};
27use commonware_macros::select_loop;
28use commonware_p2p::{
29 utils::codec::{wrap, WrappedSender},
30 Receiver, Recipients, Sender,
31};
32use commonware_parallel::Strategy;
33use commonware_runtime::{
34 buffer::paged::CacheRef,
35 spawn_cell,
36 telemetry::metrics::{histogram, status::Status, GaugeExt},
37 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
38};
39use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
40use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, ordered::Quorum};
41use futures::{
42 future::{self, Either},
43 pin_mut, StreamExt,
44};
45use rand_core::CryptoRngCore;
46use std::{
47 collections::BTreeMap,
48 num::{NonZeroU64, NonZeroUsize},
49 time::{Duration, SystemTime},
50};
51use tracing::{debug, error, info, warn};
52
53struct Verify<C: PublicKey, D: Digest> {
55 timer: histogram::Timer,
56 context: Context<C>,
57 payload: D,
58 result: Result<bool, Error>,
59}
60
61pub struct Engine<
63 E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics,
64 C: Signer,
65 S: SequencersProvider<PublicKey = C::PublicKey>,
66 P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
67 D: Digest,
68 A: Automaton<Context = Context<C::PublicKey>, Digest = D>,
69 R: Relay<Digest = D, PublicKey = C::PublicKey, Plan = ()>,
70 Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
71 M: Monitor<Index = Epoch>,
72 T: Strategy,
73> {
74 context: ContextCell<E>,
78 sequencer_signer: Option<ChunkSigner<C>>,
79 sequencers_provider: S,
80 validators_provider: P,
81 automaton: A,
82 relay: R,
83 monitor: M,
84 reporter: Z,
85 strategy: T,
86
87 chunk_verifier: ChunkVerifier,
93
94 rebroadcast_timeout: Duration,
100 rebroadcast_deadline: Option<SystemTime>,
101
102 epoch_bounds: (EpochDelta, EpochDelta),
114
115 height_bound: HeightDelta,
121
122 pending_verifies: FuturesPool<Verify<C::PublicKey, D>>,
134
135 journal_heights_per_section: NonZeroU64,
141
142 journal_replay_buffer: NonZeroUsize,
144
145 journal_write_buffer: NonZeroUsize,
147
148 journal_name_prefix: String,
151
152 journal_compression: Option<u8>,
154
155 journal_page_cache: CacheRef,
157
158 #[allow(clippy::type_complexity)]
160 journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,
161
162 tip_manager: TipManager<C::PublicKey, P::Scheme, D>,
171
172 ack_manager: AckManager<C::PublicKey, P::Scheme, D>,
175
176 epoch: Epoch,
178
179 priority_proposals: bool,
185
186 priority_acks: bool,
188
189 metrics: metrics::Metrics<C::PublicKey>,
195
196 propose_timer: Option<histogram::Timer>,
198}
199
200impl<
201 E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics,
202 C: Signer,
203 S: SequencersProvider<PublicKey = C::PublicKey>,
204 P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
205 D: Digest,
206 A: Automaton<Context = Context<C::PublicKey>, Digest = D>,
207 R: Relay<Digest = D, PublicKey = C::PublicKey, Plan = ()>,
208 Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
209 M: Monitor<Index = Epoch>,
210 T: Strategy,
211 > Engine<E, C, S, P, D, A, R, Z, M, T>
212{
213 pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M, T>) -> Self {
215 let metrics = metrics::Metrics::init(&context);
216
217 Self {
218 context: ContextCell::new(context),
219 sequencer_signer: cfg.sequencer_signer,
220 sequencers_provider: cfg.sequencers_provider,
221 validators_provider: cfg.validators_provider,
222 automaton: cfg.automaton,
223 relay: cfg.relay,
224 reporter: cfg.reporter,
225 monitor: cfg.monitor,
226 strategy: cfg.strategy,
227 chunk_verifier: cfg.chunk_verifier,
228 rebroadcast_timeout: cfg.rebroadcast_timeout,
229 rebroadcast_deadline: None,
230 epoch_bounds: cfg.epoch_bounds,
231 height_bound: cfg.height_bound,
232 pending_verifies: FuturesPool::default(),
233 journal_heights_per_section: cfg.journal_heights_per_section,
234 journal_replay_buffer: cfg.journal_replay_buffer,
235 journal_write_buffer: cfg.journal_write_buffer,
236 journal_name_prefix: cfg.journal_name_prefix,
237 journal_compression: cfg.journal_compression,
238 journal_page_cache: cfg.journal_page_cache,
239 journals: BTreeMap::new(),
240 tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
241 ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
242 epoch: Epoch::zero(),
243 priority_proposals: cfg.priority_proposals,
244 priority_acks: cfg.priority_acks,
245 metrics,
246 propose_timer: None,
247 }
248 }
249
250 pub fn start(
261 mut self,
262 chunk_network: (
263 impl Sender<PublicKey = C::PublicKey>,
264 impl Receiver<PublicKey = C::PublicKey>,
265 ),
266 ack_network: (
267 impl Sender<PublicKey = C::PublicKey>,
268 impl Receiver<PublicKey = C::PublicKey>,
269 ),
270 ) -> Handle<()> {
271 spawn_cell!(self.context, self.run(chunk_network, ack_network))
272 }
273
274 async fn run(
276 mut self,
277 chunk_network: (
278 impl Sender<PublicKey = C::PublicKey>,
279 impl Receiver<PublicKey = C::PublicKey>,
280 ),
281 ack_network: (
282 impl Sender<PublicKey = C::PublicKey>,
283 impl Receiver<PublicKey = C::PublicKey>,
284 ),
285 ) {
286 let mut node_sender = chunk_network.0;
287 let mut node_receiver = chunk_network.1;
288 let (mut ack_sender, mut ack_receiver) = wrap(
289 (),
290 self.context.network_buffer_pool().clone(),
291 ack_network.0,
292 ack_network.1,
293 );
294
295 let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;
297
298 let (latest, mut epoch_updates) = self.monitor.subscribe().await;
300 self.epoch = latest;
301
302 if let Some(ref signer) = self.sequencer_signer {
305 self.journal_prepare(&signer.public_key()).await;
306 if let Err(err) = self.rebroadcast(&mut node_sender) {
307 info!(?err, "initial rebroadcast failed");
309 }
310 }
311
312 select_loop! {
313 self.context,
314 on_start => {
315 if pending.is_none() {
317 if let Some(context) = self.should_propose() {
318 let receiver = self.automaton.propose(context.clone()).await;
319 pending = Some((context, receiver));
320 }
321 }
322
323 let rebroadcast = match self.rebroadcast_deadline {
327 Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
328 None => Either::Right(future::pending()),
329 };
330 let propose = match &mut pending {
331 Some((_context, receiver)) => Either::Left(receiver),
332 None => Either::Right(futures::future::pending()),
333 };
334 },
335 on_stopped => {
336 debug!("shutdown");
337 },
338 Some(epoch) = epoch_updates.recv() else {
340 error!("epoch subscription failed");
341 break;
342 } => {
343 debug!(current = %self.epoch, new = %epoch, "refresh epoch");
345 assert!(epoch >= self.epoch);
346 self.epoch = epoch;
347 continue;
348 },
349
350 _ = rebroadcast => {
352 if let Some(ref signer) = self.sequencer_signer {
353 debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
354 if let Err(err) = self.rebroadcast(&mut node_sender) {
355 info!(?err, "rebroadcast failed");
356 continue;
357 }
358 }
359 },
360
361 receiver = propose => {
363 let (context, _) = pending.take().unwrap();
365 debug!(height = %context.height, "propose");
366
367 let Ok(payload) = receiver else {
369 warn!(?context, "automaton dropped proposal");
370 continue;
371 };
372
373 if let Err(err) = self
375 .propose(context.clone(), payload, &mut node_sender)
376 .await
377 {
378 warn!(?err, ?context, "propose new failed");
379 continue;
380 }
381 },
382
383 msg = node_receiver.recv() => {
385 let (sender, msg) = match msg {
387 Ok(r) => r,
388 Err(err) => {
389 error!(?err, "node receiver failed");
390 break;
391 }
392 };
393 let mut guard = self.metrics.nodes.guard(Status::Invalid);
394
395 let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
397 Ok(node) => node,
398 Err(err) => {
399 debug!(?err, ?sender, "node decode failed");
400 continue;
401 }
402 };
403 let result = match self.validate_node(&node, &sender) {
404 Ok(result) => result,
405 Err(err) => {
406 debug!(?err, ?sender, "node validate failed");
407 continue;
408 }
409 };
410
411 self.journal_prepare(&sender).await;
413
414 if let Some(parent_chunk) = result {
416 let parent = node.parent.as_ref().unwrap();
417 self.handle_certificate(
418 &parent_chunk,
419 parent.epoch,
420 parent.certificate.clone(),
421 );
422 }
423
424 self.handle_node(&node).await;
429 debug!(?sender, height = %node.chunk.height, "node");
430 guard.set(Status::Success);
431 },
432
433 msg = ack_receiver.recv() => {
435 let (sender, msg) = match msg {
437 Ok(r) => r,
438 Err(err) => {
439 warn!(?err, "ack receiver failed");
440 break;
441 }
442 };
443 let mut guard = self.metrics.acks.guard(Status::Invalid);
444 let ack = match msg {
445 Ok(ack) => ack,
446 Err(err) => {
447 debug!(?err, ?sender, "ack decode failed");
448 continue;
449 }
450 };
451 if let Err(err) = self.validate_ack(&ack, &sender) {
452 debug!(?err, ?sender, "ack validate failed");
453 continue;
454 };
455 if let Err(err) = self.handle_ack(&ack) {
456 debug!(?err, ?sender, "ack handle failed");
457 guard.set(Status::Failure);
458 continue;
459 }
460 debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack");
461 guard.set(Status::Success);
462 },
463
464 verify = self.pending_verifies.next_completed() => {
466 let Verify {
467 timer,
468 context,
469 payload,
470 result,
471 } = verify;
472 match result {
473 Err(err) => {
474 warn!(?err, ?context, "verified returned error");
475 self.metrics.verify.inc(Status::Dropped);
476 }
477 Ok(false) => {
478 timer.observe(self.context.as_ref());
479 debug!(?context, "verified was false");
480 self.metrics.verify.inc(Status::Failure);
481 }
482 Ok(true) => {
483 timer.observe(self.context.as_ref());
484 debug!(?context, "verified");
485 self.metrics.verify.inc(Status::Success);
486 if let Err(err) = self
487 .handle_app_verified(&context, &payload, &mut ack_sender)
488 .await
489 {
490 debug!(?err, ?context, ?payload, "verified handle failed");
491 }
492 }
493 }
494 },
495 }
496
497 self.pending_verifies.cancel_all();
499 while let Some((_, journal)) = self.journals.pop_first() {
500 journal.sync_all().await.expect("unable to sync journal");
501 }
502 }
503
504 async fn handle_app_verified(
513 &mut self,
514 context: &Context<C::PublicKey>,
515 payload: &D,
516 ack_sender: &mut WrappedSender<
517 impl Sender<PublicKey = C::PublicKey>,
518 Ack<C::PublicKey, P::Scheme, D>,
519 >,
520 ) -> Result<(), Error> {
521 let Some(tip) = self.tip_manager.get(&context.sequencer) else {
523 return Err(Error::AppVerifiedNoTip);
524 };
525
526 if tip.chunk.height != context.height {
528 return Err(Error::AppVerifiedHeightMismatch);
529 }
530
531 if tip.chunk.payload != *payload {
533 return Err(Error::AppVerifiedPayloadMismatch);
534 }
535
536 self.reporter.report(Activity::Tip(Proposal::new(
538 tip.chunk.clone(),
539 tip.signature.clone(),
540 )));
541
542 let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
544 return Err(Error::UnknownScheme(self.epoch));
545 };
546
547 let Some(ack) = Ack::sign(scheme.as_ref(), tip.chunk.clone(), self.epoch) else {
549 return Err(Error::NotSigner(self.epoch));
550 };
551
552 self.journal_sync(&context.sequencer, context.height).await;
555
556 let recipients = {
559 let validators = scheme.participants();
560 let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
561 if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
562 recipients.push(tip.chunk.sequencer.clone());
563 }
564 recipients
565 };
566
567 self.handle_ack(&ack)?;
569
570 ack_sender.send(Recipients::Some(recipients), ack, self.priority_acks);
572
573 Ok(())
574 }
575
576 fn handle_certificate(
582 &mut self,
583 chunk: &Chunk<C::PublicKey, D>,
584 epoch: Epoch,
585 certificate: <P::Scheme as Scheme>::Certificate,
586 ) {
587 if !self.ack_manager.add_certificate(
589 &chunk.sequencer,
590 chunk.height,
591 epoch,
592 certificate.clone(),
593 ) {
594 return;
595 }
596
597 if let Some(ref signer) = self.sequencer_signer {
599 if chunk.sequencer == signer.public_key() {
600 if let Some(timer) = self.propose_timer.take() {
601 timer.observe(self.context.as_ref());
602 }
603 }
604 }
605
606 self.reporter
608 .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)));
609 }
610
611 fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
616 let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
618 return Err(Error::UnknownScheme(ack.epoch));
619 };
620
621 if let Some(certificate) = self
623 .ack_manager
624 .add_ack(ack, scheme.as_ref(), &self.strategy)
625 {
626 debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "recovered certificate");
627 self.metrics.certificates.inc();
628 self.handle_certificate(&ack.chunk, ack.epoch, certificate);
629 }
630
631 Ok(())
632 }
633
634 async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
638 let is_new = self.tip_manager.put(node);
640
641 if is_new {
643 let _ = self
645 .metrics
646 .sequencer_heights
647 .get_or_create_by(&node.chunk.sequencer)
648 .try_set(node.chunk.height.get());
649
650 self.journal_append(node.clone()).await;
654 self.journal_sync(&node.chunk.sequencer, node.chunk.height)
655 .await;
656 }
657
658 let context = Context {
660 sequencer: node.chunk.sequencer.clone(),
661 height: node.chunk.height,
662 };
663 let payload = node.chunk.payload;
664 let mut automaton = self.automaton.clone();
665 let timer = self.metrics.verify_duration.timer(self.context.as_ref());
666 self.pending_verifies.push(async move {
667 let receiver = automaton.verify(context.clone(), payload).await;
668 let result = receiver.await.map_err(Error::AppVerifyCanceled);
669 Verify {
670 timer,
671 context,
672 payload,
673 result,
674 }
675 });
676 }
677
678 fn should_propose(&self) -> Option<Context<C::PublicKey>> {
686 let me = self.sequencer_signer.as_ref()?.public_key();
688
689 self.sequencers_provider
691 .sequencers(self.epoch)?
692 .position(&me)?;
693
694 match self.tip_manager.get(&me) {
696 None => Some(Context {
697 sequencer: me,
698 height: Height::zero(),
699 }),
700 Some(tip) => self
701 .ack_manager
702 .get_certificate(&me, tip.chunk.height)
703 .map(|_| Context {
704 sequencer: me,
705 height: tip.chunk.height.next(),
706 }),
707 }
708 }
709
710 async fn propose(
715 &mut self,
716 context: Context<C::PublicKey>,
717 payload: D,
718 node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
719 ) -> Result<(), Error> {
720 let mut guard = self.metrics.propose.guard(Status::Dropped);
721 let signer = self
722 .sequencer_signer
723 .as_mut()
724 .ok_or(Error::IAmNotASequencer(self.epoch))?;
725 let me = signer.public_key();
726
727 if context.sequencer != me {
729 return Err(Error::ContextSequencer);
730 }
731
732 self.sequencers_provider
734 .sequencers(self.epoch)
735 .and_then(|s| s.position(&me))
736 .ok_or(Error::IAmNotASequencer(self.epoch))?;
737
738 let mut height = Height::zero();
740 let mut parent = None;
741 if let Some(tip) = self.tip_manager.get(&me) {
742 let Some((epoch, certificate)) =
744 self.ack_manager.get_certificate(&me, tip.chunk.height)
745 else {
746 return Err(Error::MissingCertificate);
747 };
748
749 height = tip.chunk.height.next();
751 parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
752 }
753
754 if context.height != height {
756 return Err(Error::ContextHeight);
757 }
758
759 let node = Node::sign(signer, height, payload, parent);
761
762 self.handle_node(&node).await;
764
765 self.journal_sync(&me, height).await;
768
769 self.propose_timer = Some(self.metrics.e2e_duration.timer(self.context.as_ref()));
771
772 if let Err(err) = self.broadcast(node, node_sender, self.epoch) {
774 guard.set(Status::Failure);
775 return Err(err);
776 };
777
778 guard.set(Status::Success);
780 Ok(())
781 }
782
783 fn rebroadcast(
790 &mut self,
791 node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
792 ) -> Result<(), Error> {
793 let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);
794
795 self.rebroadcast_deadline = None;
797
798 let signer = self
800 .sequencer_signer
801 .as_ref()
802 .ok_or(Error::IAmNotASequencer(self.epoch))?;
803 let me = signer.public_key();
804
805 self.sequencers_provider
807 .sequencers(self.epoch)
808 .and_then(|s| s.position(&me))
809 .ok_or(Error::IAmNotASequencer(self.epoch))?;
810
811 let Some(tip) = self.tip_manager.get(&me) else {
813 return Err(Error::NothingToRebroadcast);
814 };
815
816 if self
818 .ack_manager
819 .get_certificate(&me, tip.chunk.height)
820 .is_some()
821 {
822 return Err(Error::AlreadyCertified);
823 }
824
825 guard.set(Status::Failure);
827 self.broadcast(tip, node_sender, self.epoch)?;
828 guard.set(Status::Success);
829 Ok(())
830 }
831
832 fn broadcast(
834 &mut self,
835 node: Node<C::PublicKey, P::Scheme, D>,
836 node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
837 epoch: Epoch,
838 ) -> Result<(), Error> {
839 let Some(scheme) = self.validators_provider.scoped(epoch) else {
841 return Err(Error::UnknownScheme(epoch));
842 };
843 let validators = scheme.participants();
844
845 let _ = self.relay.broadcast(node.chunk.payload, ());
847
848 node_sender.send(
850 Recipients::Some(validators.iter().cloned().collect()),
851 node.encode(),
852 self.priority_proposals,
853 );
854
855 self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
857
858 Ok(())
859 }
860
861 fn validate_node(
871 &mut self,
872 node: &Node<C::PublicKey, P::Scheme, D>,
873 sender: &C::PublicKey,
874 ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
875 if node.chunk.sequencer != *sender {
877 return Err(Error::PeerMismatch);
878 }
879
880 if let Some(tip) = self.tip_manager.get(sender) {
883 if tip == *node {
884 return Ok(None);
885 }
886 }
887
888 self.validate_chunk(&node.chunk, self.epoch)?;
890
891 node.verify(
893 self.context.as_mut(),
894 &self.chunk_verifier,
895 &self.validators_provider,
896 &self.strategy,
897 )
898 }
899
900 fn validate_ack(
905 &mut self,
906 ack: &Ack<C::PublicKey, P::Scheme, D>,
907 sender: &<P::Scheme as Scheme>::PublicKey,
908 ) -> Result<(), Error> {
909 self.validate_chunk(&ack.chunk, ack.epoch)?;
911
912 let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
914 return Err(Error::UnknownScheme(ack.epoch));
915 };
916
917 let participants = scheme.participants();
919 let Some(index) = participants.index(sender) else {
920 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
921 };
922 if index != ack.attestation.signer {
923 return Err(Error::PeerMismatch);
924 }
925
926 {
928 let (eb_lo, eb_hi) = self.epoch_bounds;
929 let bound_lo = self.epoch.saturating_sub(eb_lo);
930 let bound_hi = self.epoch.saturating_add(eb_hi);
931 if ack.epoch < bound_lo || ack.epoch > bound_hi {
932 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
933 }
934 }
935
936 {
938 let bound_lo = self
939 .tip_manager
940 .get(&ack.chunk.sequencer)
941 .map(|t| t.chunk.height)
942 .unwrap_or(Height::zero());
943 let bound_hi = bound_lo.saturating_add(self.height_bound);
944 if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
945 return Err(Error::AckHeightOutsideBounds(
946 ack.chunk.height,
947 bound_lo,
948 bound_hi,
949 ));
950 }
951 }
952
953 if !ack.verify(self.context.as_mut(), scheme.as_ref(), &self.strategy) {
955 return Err(Error::InvalidAckSignature);
956 }
957
958 Ok(())
959 }
960
961 fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
966 if self
968 .sequencers_provider
969 .sequencers(epoch)
970 .and_then(|s| s.position(&chunk.sequencer))
971 .is_none()
972 {
973 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
974 }
975
976 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
978 match chunk.height.cmp(&tip.chunk.height) {
980 std::cmp::Ordering::Less => {
981 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
982 }
983 std::cmp::Ordering::Equal => {
984 if tip.chunk.payload != chunk.payload {
986 return Err(Error::ChunkMismatch(
987 chunk.sequencer.to_string(),
988 chunk.height,
989 ));
990 }
991 }
992 std::cmp::Ordering::Greater => {}
993 }
994 }
995
996 Ok(())
997 }
998
999 const fn get_journal_section(&self, height: Height) -> u64 {
1005 height.get() / self.journal_heights_per_section.get()
1006 }
1007
1008 async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
1012 if self.journals.contains_key(sequencer) {
1014 return;
1015 }
1016
1017 let cfg = JournalConfig {
1019 partition: format!("{}{}", &self.journal_name_prefix, sequencer),
1020 compression: self.journal_compression,
1021 codec_config: P::Scheme::certificate_codec_config_unbounded(),
1022 page_cache: self.journal_page_cache.clone(),
1023 write_buffer: self.journal_write_buffer,
1024 };
1025 let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
1026 self.context
1027 .child("journal")
1028 .with_attribute("sequencer", sequencer),
1029 cfg,
1030 )
1031 .await
1032 .expect("unable to init journal");
1033
1034 {
1036 debug!(?sequencer, "journal replay begin");
1037
1038 let stream = journal
1040 .replay(0, 0, self.journal_replay_buffer)
1041 .await
1042 .expect("unable to replay journal");
1043 pin_mut!(stream);
1044
1045 let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
1048 let mut num_items = 0;
1049 while let Some(msg) = stream.next().await {
1050 let (_, _, _, node) = msg.expect("unable to read from journal");
1051 num_items += 1;
1052 let height = node.chunk.height;
1053 match tip {
1054 None => {
1055 tip = Some(node);
1056 }
1057 Some(ref t) => {
1058 if height > t.chunk.height {
1059 tip = Some(node);
1060 }
1061 }
1062 }
1063 }
1064
1065 if let Some(node) = tip.take() {
1068 let is_new = self.tip_manager.put(&node);
1069 assert!(is_new);
1070 }
1071
1072 debug!(?sequencer, ?num_items, "journal replay end");
1073 }
1074
1075 self.journals.insert(sequencer.clone(), journal);
1077 }
1078
1079 async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
1084 let section = self.get_journal_section(node.chunk.height);
1085 self.journals
1086 .get_mut(&node.chunk.sequencer)
1087 .expect("journal does not exist")
1088 .append(section, &node)
1089 .await
1090 .expect("unable to append to journal");
1091 }
1092
1093 async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: Height) {
1095 let section = self.get_journal_section(height);
1096
1097 let journal = self
1099 .journals
1100 .get_mut(sequencer)
1101 .expect("journal does not exist");
1102
1103 journal.sync(section).await.expect("unable to sync journal");
1105
1106 let _ = journal.prune(section).await;
1108 }
1109}