1use super::{
11 metrics, scheme,
12 types::{
13 Ack, Activity, Chunk, ChunkSigner, ChunkVerifier, Context, Error, Lock, Node, Parent,
14 Proposal, SequencersProvider,
15 },
16 AckManager, Config, TipManager,
17};
18use crate::{
19 types::{Epoch, EpochDelta, Height, HeightDelta},
20 Automaton, Monitor, Relay, Reporter,
21};
22use commonware_codec::Encode;
23use commonware_cryptography::{
24 certificate::{Provider, Scheme},
25 Digest, PublicKey, Signer,
26};
27use commonware_macros::select_loop;
28use commonware_p2p::{
29 utils::codec::{wrap, WrappedSender},
30 Receiver, Recipients, Sender,
31};
32use commonware_parallel::Strategy;
33use commonware_runtime::{
34 buffer::paged::CacheRef,
35 spawn_cell,
36 telemetry::metrics::{
37 histogram,
38 status::{CounterExt, GaugeExt, Status},
39 },
40 Clock, ContextCell, Handle, Metrics, Spawner, Storage,
41};
42use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
43use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, ordered::Quorum};
44use futures::{
45 future::{self, Either},
46 pin_mut, StreamExt,
47};
48use rand_core::CryptoRngCore;
49use std::{
50 collections::BTreeMap,
51 num::{NonZeroU64, NonZeroUsize},
52 time::{Duration, SystemTime},
53};
54use tracing::{debug, error, info, warn};
55
/// Outcome of an asynchronous application-level payload verification.
///
/// Bundles the original request context with the automaton's answer so the
/// main loop can update metrics and, on success, ack the chunk.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    // Measures verification latency; recorded when dropped by the main loop.
    timer: histogram::Timer<E>,
    // Sequencer and height the verification was issued for.
    context: Context<C>,
    // Payload digest that was submitted for verification.
    payload: D,
    // `Ok(true)` if the automaton accepted the payload, `Ok(false)` if it
    // rejected it, `Err(AppVerifyCanceled)` if the automaton dropped the
    // response channel.
    result: Result<bool, Error>,
}
63
/// Engine driving the ordered-broadcast protocol: proposes chunks when acting
/// as a sequencer, validates and acks chunks from other sequencers, and
/// aggregates acks into certificates.
pub struct Engine<
    E: Clock + Spawner + CryptoRngCore + Storage + Metrics,
    C: Signer,
    S: SequencersProvider<PublicKey = C::PublicKey>,
    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    T: Strategy,
> {
    // Runtime context (clock, spawner, storage, metrics, RNG).
    context: ContextCell<E>,
    // Signer used when this node acts as a sequencer; `None` when this node
    // only validates.
    sequencer_signer: Option<ChunkSigner<C>>,
    // Source of the sequencer set for each epoch.
    sequencers_provider: S,
    // Source of the validator signing scheme for each epoch.
    validators_provider: P,
    // Application hook used to propose and verify payloads.
    automaton: A,
    // Relay used to disseminate payloads alongside node broadcasts.
    relay: R,
    // Source of epoch updates.
    monitor: M,
    // Sink for tips and certified locks.
    reporter: Z,
    // Parallelism strategy for signature verification/recovery.
    strategy: T,

    // Verifier for incoming chunk signatures.
    chunk_verifier: ChunkVerifier,

    // How long to wait before rebroadcasting an uncertified tip.
    rebroadcast_timeout: Duration,
    // When the next rebroadcast should fire; `None` when disarmed.
    rebroadcast_deadline: Option<SystemTime>,

    // Accepted ack epoch range relative to the current epoch, as
    // (lookback, lookahead) deltas.
    epoch_bounds: (EpochDelta, EpochDelta),

    // Maximum height an ack may be ahead of the sequencer's local tip.
    height_bound: HeightDelta,

    // In-flight application verification requests.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    // Number of heights stored per journal section.
    journal_heights_per_section: NonZeroU64,

    // Buffer size used when replaying a journal on startup.
    journal_replay_buffer: NonZeroUsize,

    // Buffer size used when writing journal entries.
    journal_write_buffer: NonZeroUsize,

    // Prefix for per-sequencer journal partition names.
    journal_name_prefix: String,

    // Optional compression level for journal entries.
    journal_compression: Option<u8>,

    // Page cache shared by all journals.
    journal_page_cache: CacheRef,

    // One journal per sequencer, opened lazily via `journal_prepare`.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,

    // Highest-known node (tip) per sequencer.
    tip_manager: TipManager<C::PublicKey, P::Scheme, D>,

    // Ack aggregation and certificate recovery state.
    ack_manager: AckManager<C::PublicKey, P::Scheme, D>,

    // Current epoch, refreshed from the monitor.
    epoch: Epoch,

    // Whether node broadcasts are sent with priority.
    priority_proposals: bool,

    // Whether acks are sent with priority.
    priority_acks: bool,

    // Engine metrics.
    metrics: metrics::Metrics<E>,

    // Timer measuring end-to-end latency of our own outstanding proposal;
    // started in `propose`, cleared when our chunk is certified.
    propose_timer: Option<histogram::Timer<E>>,
}
202
203impl<
204 E: Clock + Spawner + CryptoRngCore + Storage + Metrics,
205 C: Signer,
206 S: SequencersProvider<PublicKey = C::PublicKey>,
207 P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
208 D: Digest,
209 A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
210 R: Relay<Digest = D>,
211 Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
212 M: Monitor<Index = Epoch>,
213 T: Strategy,
214 > Engine<E, C, S, P, D, A, R, Z, M, T>
215{
216 pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M, T>) -> Self {
218 let metrics = metrics::Metrics::init(context.clone());
220
221 Self {
222 context: ContextCell::new(context),
223 sequencer_signer: cfg.sequencer_signer,
224 sequencers_provider: cfg.sequencers_provider,
225 validators_provider: cfg.validators_provider,
226 automaton: cfg.automaton,
227 relay: cfg.relay,
228 reporter: cfg.reporter,
229 monitor: cfg.monitor,
230 strategy: cfg.strategy,
231 chunk_verifier: cfg.chunk_verifier,
232 rebroadcast_timeout: cfg.rebroadcast_timeout,
233 rebroadcast_deadline: None,
234 epoch_bounds: cfg.epoch_bounds,
235 height_bound: cfg.height_bound,
236 pending_verifies: FuturesPool::default(),
237 journal_heights_per_section: cfg.journal_heights_per_section,
238 journal_replay_buffer: cfg.journal_replay_buffer,
239 journal_write_buffer: cfg.journal_write_buffer,
240 journal_name_prefix: cfg.journal_name_prefix,
241 journal_compression: cfg.journal_compression,
242 journal_page_cache: cfg.journal_page_cache,
243 journals: BTreeMap::new(),
244 tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
245 ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
246 epoch: Epoch::zero(),
247 priority_proposals: cfg.priority_proposals,
248 priority_acks: cfg.priority_acks,
249 metrics,
250 propose_timer: None,
251 }
252 }
253
    /// Starts the engine on the given chunk and ack networks.
    ///
    /// Spawns the main loop on the engine's context; the returned handle
    /// resolves when the loop exits.
    pub fn start(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(chunk_network, ack_network).await)
    }
277
    /// Main event loop: drives proposing, rebroadcasting, epoch refreshes,
    /// inbound node/ack processing, and completion of pending payload
    /// verifications.
    ///
    /// Exits on runtime shutdown or when a network receiver fails; on exit
    /// all in-flight verifications are canceled and every journal is synced.
    async fn run(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) {
        let mut node_sender = chunk_network.0;
        let mut node_receiver = chunk_network.1;
        // Wrap the ack network with a codec for `Ack` messages.
        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);

        // Outstanding proposal request to the automaton, if any.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Adopt the latest epoch and subscribe to future updates.
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // If we are a sequencer, recover our own journal (restoring our tip)
        // and rebroadcast it best-effort before processing events.
        if let Some(ref signer) = self.sequencer_signer {
            self.journal_prepare(&signer.public_key()).await;
            if let Err(err) = self.rebroadcast(&mut node_sender).await {
                // Expected when there is nothing to rebroadcast or the tip is
                // already certified.
                info!(?err, "initial rebroadcast failed");
            }
        }

        select_loop! {
            self.context,
            on_start => {
                // Kick off a proposal whenever we are eligible and none is
                // already outstanding.
                if pending.is_none() {
                    if let Some(context) = self.should_propose() {
                        let receiver = self.automaton.propose(context.clone()).await;
                        pending = Some((context, receiver));
                    }
                }

                // Arm the rebroadcast timer only when a deadline is set;
                // otherwise park on a never-ready future.
                let rebroadcast = match self.rebroadcast_deadline {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
                // Poll the outstanding proposal response, if any.
                let propose = match &mut pending {
                    Some((_context, receiver)) => Either::Left(receiver),
                    None => Either::Right(futures::future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Epoch refresh from the monitor; epochs must never regress.
            Some(epoch) = epoch_updates.recv() else {
                error!("epoch subscription failed");
                break;
            } => {
                debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                assert!(epoch >= self.epoch);
                self.epoch = epoch;
                continue;
            },

            // Rebroadcast our tip when the deadline fires.
            _ = rebroadcast => {
                if let Some(ref signer) = self.sequencer_signer {
                    debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                }
            },

            // The automaton answered (or dropped) our proposal request.
            receiver = propose => {
                // `propose` only resolves when `pending` is `Some`.
                let (context, _) = pending.take().unwrap();
                debug!(height = %context.height, "propose");

                let Ok(payload) = receiver else {
                    warn!(?context, "automaton dropped proposal");
                    continue;
                };

                if let Err(err) = self
                    .propose(context.clone(), payload, &mut node_sender)
                    .await
                {
                    warn!(?err, ?context, "propose new failed");
                    continue;
                }
            },

            // Incoming node (chunk) from a sequencer.
            msg = node_receiver.recv() => {
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "node receiver failed");
                        break;
                    }
                };
                // Defaults to `Invalid` until the node fully processes.
                let mut guard = self.metrics.nodes.guard(Status::Invalid);

                // Decode with provider-aware certificate decoding.
                let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
                    Ok(node) => node,
                    Err(err) => {
                        debug!(?err, ?sender, "node decode failed");
                        continue;
                    }
                };
                // `Some(parent_chunk)` means the node carries a parent
                // certificate that still needs processing.
                let result = match self.validate_node(&node, &sender) {
                    Ok(result) => result,
                    Err(err) => {
                        debug!(?err, ?sender, "node validate failed");
                        continue;
                    }
                };

                // Ensure the sequencer's journal exists before any writes.
                self.journal_prepare(&sender).await;

                // Process the parent certificate before the node itself.
                if let Some(parent_chunk) = result {
                    let parent = node.parent.as_ref().unwrap();
                    self.handle_certificate(
                        &parent_chunk,
                        parent.epoch,
                        parent.certificate.clone(),
                    )
                    .await;
                }

                // Update the tip and dispatch payload verification.
                self.handle_node(&node).await;
                debug!(?sender, height = %node.chunk.height, "node");
                guard.set(Status::Success);
            },

            // Incoming ack from a validator.
            msg = ack_receiver.recv() => {
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        warn!(?err, "ack receiver failed");
                        break;
                    }
                };
                let mut guard = self.metrics.acks.guard(Status::Invalid);
                // The wrapped receiver yields a decode result per message.
                let ack = match msg {
                    Ok(ack) => ack,
                    Err(err) => {
                        debug!(?err, ?sender, "ack decode failed");
                        continue;
                    }
                };
                if let Err(err) = self.validate_ack(&ack, &sender) {
                    debug!(?err, ?sender, "ack validate failed");
                    continue;
                };
                if let Err(err) = self.handle_ack(&ack).await {
                    debug!(?err, ?sender, "ack handle failed");
                    guard.set(Status::Failure);
                    continue;
                }
                debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack");
                guard.set(Status::Success);
            },

            // A pending application verification completed.
            verify = self.pending_verifies.next_completed() => {
                let Verify {
                    timer,
                    context,
                    payload,
                    result,
                } = verify;
                // Record the verification latency.
                drop(timer);
                match result {
                    Err(err) => {
                        warn!(?err, ?context, "verified returned error");
                        self.metrics.verify.inc(Status::Dropped);
                    }
                    Ok(false) => {
                        debug!(?context, "verified was false");
                        self.metrics.verify.inc(Status::Failure);
                    }
                    Ok(true) => {
                        debug!(?context, "verified");
                        self.metrics.verify.inc(Status::Success);
                        if let Err(err) = self
                            .handle_app_verified(&context, &payload, &mut ack_sender)
                            .await
                        {
                            debug!(?err, ?context, ?payload, "verified handle failed");
                        }
                    }
                }
            },
        }

        // Shutdown: cancel in-flight verifications and flush every journal.
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.sync_all().await.expect("unable to sync journal");
        }
    }
502
    /// Handles a successful application verification of `payload` for
    /// `context`: reports the tip, signs an ack, syncs the journal, processes
    /// our own ack locally, then broadcasts the ack.
    ///
    /// Errors when the tip has moved on or no longer matches the payload,
    /// the current epoch's scheme is unknown, or this node cannot sign acks.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<
            impl Sender<PublicKey = C::PublicKey>,
            Ack<C::PublicKey, P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        // The verification must still correspond to the sequencer's current
        // tip (it may have advanced while verification was in flight).
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Report the verified tip.
        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        // Sign an ack with the current epoch's scheme.
        let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
            return Err(Error::UnknownScheme(self.epoch));
        };

        let Some(ack) = Ack::sign(scheme.as_ref(), tip.chunk.clone(), self.epoch) else {
            return Err(Error::NotSigner(self.epoch));
        };

        // Durably sync the journal before sending the ack.
        self.journal_sync(&context.sequencer, context.height).await;

        // Send to every validator in the epoch, plus the sequencer itself if
        // it is not one of them.
        let recipients = {
            let validators = scheme.participants();
            let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
            if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Process our own ack locally first (may complete a certificate).
        self.handle_ack(&ack).await?;

        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }
579
    /// Handles a certificate for `chunk` at `epoch`.
    ///
    /// No-op when the certificate is already known. Otherwise, if the chunk
    /// is our own, the end-to-end propose timer is stopped, and the certified
    /// lock is reported.
    async fn handle_certificate(
        &mut self,
        chunk: &Chunk<C::PublicKey, D>,
        epoch: Epoch,
        certificate: <P::Scheme as Scheme>::Certificate,
    ) {
        // Deduplicate: only continue if this certificate is new.
        if !self.ack_manager.add_certificate(
            &chunk.sequencer,
            chunk.height,
            epoch,
            certificate.clone(),
        ) {
            return;
        }

        // Our own chunk was certified: dropping the timer records the
        // end-to-end latency.
        if let Some(ref signer) = self.sequencer_signer {
            if chunk.sequencer == signer.public_key() {
                self.propose_timer.take();
            }
        }

        // Report the certified lock.
        self.reporter
            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)))
            .await;
    }
613
614 async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
619 let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
621 return Err(Error::UnknownScheme(ack.epoch));
622 };
623
624 if let Some(certificate) = self
626 .ack_manager
627 .add_ack(ack, scheme.as_ref(), &self.strategy)
628 {
629 debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "recovered certificate");
630 self.metrics.certificates.inc();
631 self.handle_certificate(&ack.chunk, ack.epoch, certificate)
632 .await;
633 }
634
635 Ok(())
636 }
637
    /// Handles a validated node: updates the tip, persists new tips to the
    /// journal, and dispatches payload verification to the automaton.
    async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
        // `put` reports whether this node advances (or establishes) the tip.
        let is_new = self.tip_manager.put(node);

        if is_new {
            // Update the per-sequencer height gauge (best-effort).
            let _ = self
                .metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .try_set(node.chunk.height.get());

            // Persist the new tip and sync its journal section.
            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Ask the automaton to verify the payload; the completed `Verify` is
        // collected via `pending_verifies` in the main loop.
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }
681
682 fn should_propose(&self) -> Option<Context<C::PublicKey>> {
690 let me = self.sequencer_signer.as_ref()?.public_key();
692
693 self.sequencers_provider
695 .sequencers(self.epoch)?
696 .position(&me)?;
697
698 match self.tip_manager.get(&me) {
700 None => Some(Context {
701 sequencer: me,
702 height: Height::zero(),
703 }),
704 Some(tip) => self
705 .ack_manager
706 .get_certificate(&me, tip.chunk.height)
707 .map(|_| Context {
708 sequencer: me,
709 height: tip.chunk.height.next(),
710 }),
711 }
712 }
713
    /// Builds, persists, and broadcasts a new node for `context` carrying
    /// `payload`.
    ///
    /// Errors when this node is not a sequencer in the current epoch, when
    /// the context disagrees with local state, or when the previous tip lacks
    /// a certificate.
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        // Guard defaults to `Dropped` if we exit before the broadcast step.
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let signer = self
            .sequencer_signer
            .as_mut()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        // The automaton's context must be for us.
        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        // We must be in the current epoch's sequencer set.
        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        // Derive the next height and parent from our tip (if any); we can
        // only build on a tip that has already been certified.
        let mut height = Height::zero();
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            let Some((epoch, certificate)) =
                self.ack_manager.get_certificate(&me, tip.chunk.height)
            else {
                return Err(Error::MissingCertificate);
            };

            height = tip.chunk.height.next();
            parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
        }

        // The automaton's context must match the derived height.
        if context.height != height {
            return Err(Error::ContextHeight);
        }

        // Sign the new node.
        let node = Node::sign(signer, height, payload, parent);

        // Process the node locally (tip update, journal append, dispatch of
        // our own payload verification).
        self.handle_node(&node).await;

        // Durably persist before broadcasting.
        self.journal_sync(&me, height).await;

        // Start the end-to-end timer; stopped when our chunk is certified.
        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        // Broadcast to the epoch's validators.
        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        };

        guard.set(Status::Success);
        Ok(())
    }
786
    /// Rebroadcasts our current tip to the epoch's validators.
    ///
    /// Clears the rebroadcast deadline first (a successful `broadcast`
    /// re-arms it). Errors when we are not a sequencer, have no tip, or the
    /// tip is already certified.
    async fn rebroadcast(
        &mut self,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Disarm the timer regardless of outcome.
        self.rebroadcast_deadline = None;

        // Must be a sequencer in the current epoch.
        let signer = self
            .sequencer_signer
            .as_ref()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        // There must be a tip to rebroadcast.
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Nothing to do once the tip is certified.
        if self
            .ack_manager
            .get_certificate(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyCertified);
        }

        // Mark failure up-front so an error propagated out of `broadcast`
        // via `?` is recorded when the guard drops.
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }
835
836 async fn broadcast(
838 &mut self,
839 node: Node<C::PublicKey, P::Scheme, D>,
840 node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
841 epoch: Epoch,
842 ) -> Result<(), Error> {
843 let Some(scheme) = self.validators_provider.scoped(epoch) else {
845 return Err(Error::UnknownScheme(epoch));
846 };
847 let validators = scheme.participants();
848
849 self.relay.broadcast(node.chunk.payload).await;
851
852 node_sender
854 .send(
855 Recipients::Some(validators.iter().cloned().collect()),
856 node.encode(),
857 self.priority_proposals,
858 )
859 .await
860 .map_err(|_| Error::BroadcastFailed)?;
861
862 self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
864
865 Ok(())
866 }
867
    /// Validates an incoming node from `sender`.
    ///
    /// Returns `Ok(Some(parent_chunk))` when the node verifies and carries a
    /// parent whose certificate should be processed, `Ok(None)` when the node
    /// is an exact duplicate of the known tip, and `Err` when it is invalid.
    fn validate_node(
        &mut self,
        node: &Node<C::PublicKey, P::Scheme, D>,
        sender: &C::PublicKey,
    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
        // A node must be delivered by its own sequencer.
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Exact duplicates of the current tip need no further work.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(None);
            }
        }

        // Chunk-level checks against the current epoch and local tip.
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Cryptographic verification of the node (and its parent certificate,
        // when present).
        node.verify(
            &mut self.context,
            &self.chunk_verifier,
            &self.validators_provider,
            &self.strategy,
        )
    }
906
    /// Validates an incoming ack from `sender`.
    ///
    /// Checks the chunk, that `sender` is a validator in the ack's epoch and
    /// matches the attestation's signer index, that the ack's epoch and
    /// height fall within the configured bounds, and finally the signature.
    fn validate_ack(
        &mut self,
        ack: &Ack<C::PublicKey, P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        // The sender must be a validator in the ack's epoch, and the
        // attestation must be signed under the sender's own index.
        let participants = scheme.participants();
        let Some(index) = participants.index(sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        // The ack's epoch must be within `epoch_bounds` of our epoch.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // The ack's height must be within `height_bound` of the sequencer's
        // known tip (or of zero when no tip is known).
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(Height::zero());
            let bound_hi = bound_lo.saturating_add(self.height_bound);
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Signature verification is performed last, after the structural
        // checks above.
        if !ack.verify(&mut self.context, scheme.as_ref(), &self.strategy) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }
967
968 fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
973 if self
975 .sequencers_provider
976 .sequencers(epoch)
977 .and_then(|s| s.position(&chunk.sequencer))
978 .is_none()
979 {
980 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
981 }
982
983 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
985 match chunk.height.cmp(&tip.chunk.height) {
987 std::cmp::Ordering::Less => {
988 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
989 }
990 std::cmp::Ordering::Equal => {
991 if tip.chunk.payload != chunk.payload {
993 return Err(Error::ChunkMismatch(
994 chunk.sequencer.to_string(),
995 chunk.height,
996 ));
997 }
998 }
999 std::cmp::Ordering::Greater => {}
1000 }
1001 }
1002
1003 Ok(())
1004 }
1005
    /// Maps a chunk height to its journal section
    /// (`height / journal_heights_per_section`).
    const fn get_journal_section(&self, height: Height) -> u64 {
        height.get() / self.journal_heights_per_section.get()
    }
1014
    /// Ensures a journal exists for `sequencer`, initializing it and
    /// replaying its contents to recover the sequencer's tip if needed.
    ///
    /// Panics if the journal cannot be initialized, replayed, or read.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Already open: nothing to do.
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Open (or create) the per-sequencer journal partition.
        let cfg = JournalConfig {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            page_cache: self.journal_page_cache.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
            self.context
                .with_label("journal")
                .with_attribute("sequencer", sequencer)
                .into_present(),
            cfg,
        )
        .await
        .expect("unable to init journal");

        // Replay the journal to find the highest stored node and install it
        // as the sequencer's tip.
        {
            debug!(?sequencer, "journal replay begin");

            let stream = journal
                .replay(0, 0, self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Track the highest-height node seen during replay.
            let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // The tip manager has no entry for this sequencer yet (the
            // journal was just opened), so the replayed tip must be new.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        self.journals.insert(sequencer.clone(), journal);
    }
1086
1087 async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
1092 let section = self.get_journal_section(node.chunk.height);
1093 self.journals
1094 .get_mut(&node.chunk.sequencer)
1095 .expect("journal does not exist")
1096 .append(section, node)
1097 .await
1098 .expect("unable to append to journal");
1099 }
1100
1101 async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: Height) {
1103 let section = self.get_journal_section(height);
1104
1105 let journal = self
1107 .journals
1108 .get_mut(sequencer)
1109 .expect("journal does not exist");
1110
1111 journal.sync(section).await.expect("unable to sync journal");
1113
1114 let _ = journal.prune(section).await;
1116 }
1117}