1use super::{
11 metrics, scheme,
12 types::{
13 Ack, Activity, Chunk, ChunkSigner, ChunkVerifier, Context, Error, Lock, Node, Parent,
14 Proposal, SequencersProvider,
15 },
16 AckManager, Config, TipManager,
17};
18use crate::{
19 types::{Epoch, EpochDelta, Height, HeightDelta},
20 Automaton, Monitor, Relay, Reporter,
21};
22use commonware_codec::Encode;
23use commonware_cryptography::{
24 certificate::{Provider, Scheme},
25 Digest, PublicKey, Signer,
26};
27use commonware_macros::select;
28use commonware_p2p::{
29 utils::codec::{wrap, WrappedSender},
30 Receiver, Recipients, Sender,
31};
32use commonware_parallel::Strategy;
33use commonware_runtime::{
34 buffer::PoolRef,
35 spawn_cell,
36 telemetry::metrics::{
37 histogram,
38 status::{CounterExt, GaugeExt, Status},
39 },
40 Clock, ContextCell, Handle, Metrics, Spawner, Storage,
41};
42use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
43use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum};
44use futures::{
45 channel::oneshot,
46 future::{self, Either},
47 pin_mut, StreamExt,
48};
49use rand_core::CryptoRngCore;
50use std::{
51 collections::BTreeMap,
52 num::{NonZeroU64, NonZeroUsize},
53 time::{Duration, SystemTime},
54};
55use tracing::{debug, error, info, warn};
56
/// Outcome of an in-flight application-level verification of a chunk payload.
///
/// Created in `handle_node` when a verification future is pushed into the
/// engine's `pending_verifies` pool, and consumed by the main loop in `run`
/// once the automaton answers.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    // Started when verification was requested; explicitly dropped in `run`
    // to end the measurement (presumably recorded on drop — per
    // `histogram::Timer` semantics).
    timer: histogram::Timer<E>,
    // The (sequencer, height) pair the verification was issued for.
    context: Context<C>,
    // The payload digest that was verified.
    payload: D,
    // `Ok(true)` = verified, `Ok(false)` = rejected, `Err` = automaton
    // dropped/canceled the verification.
    result: Result<bool, Error>,
}
64
/// Engine that broadcasts sequencer chunks, collects validator acks, and
/// recovers certificates over a peer-to-peer network, persisting its own
/// nodes (and those of peers) in per-sequencer journals.
pub struct Engine<
    E: Clock + Spawner + CryptoRngCore + Storage + Metrics,
    C: Signer,
    S: SequencersProvider<PublicKey = C::PublicKey>,
    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    T: Strategy,
> {
    /// Runtime context used for spawning, sleeping, and storage access.
    context: ContextCell<E>,

    /// Signer used to produce our own chunks; `None` if this node is not a
    /// sequencer.
    sequencer_signer: Option<ChunkSigner<C>>,
    /// Source of the sequencer set for each epoch.
    sequencers_provider: S,
    /// Source of the validator signing scheme for each epoch.
    validators_provider: P,
    /// Application hook used to propose and verify payloads.
    automaton: A,
    /// Broadcasts raw payloads to peers (separately from node messages).
    relay: R,
    /// Source of epoch updates.
    monitor: M,
    /// Sink for activity notifications ([`Activity::Tip`] / [`Activity::Lock`]).
    reporter: Z,
    /// Parallelism strategy passed to signature verification/recovery.
    strategy: T,

    /// Verifier used when checking received chunks.
    chunk_verifier: ChunkVerifier,

    /// How long to wait before re-announcing our tip when no certificate has
    /// been recovered for it.
    rebroadcast_timeout: Duration,
    /// Deadline of the next scheduled rebroadcast, if armed.
    rebroadcast_deadline: Option<SystemTime>,

    /// Accepted window of ack epochs relative to the current epoch:
    /// `(delta behind, delta ahead)`.
    epoch_bounds: (EpochDelta, EpochDelta),

    /// Maximum height above a sequencer's tip for which acks are accepted.
    height_bound: HeightDelta,

    /// In-flight application verifications of received chunks.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    /// Number of heights stored per journal section.
    journal_heights_per_section: NonZeroU64,

    /// Buffer size used when replaying a journal at startup.
    journal_replay_buffer: NonZeroUsize,

    /// Buffer size used when writing journal entries.
    journal_write_buffer: NonZeroUsize,

    /// Prefix for per-sequencer journal partition names.
    journal_name_prefix: String,

    /// Optional compression level for journal entries.
    journal_compression: Option<u8>,

    /// Shared buffer pool for journal I/O.
    journal_buffer_pool: PoolRef,

    /// One journal per sequencer, holding that sequencer's nodes.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,

    /// Highest known node per sequencer (installed on replay and on receipt).
    tip_manager: TipManager<C::PublicKey, P::Scheme, D>,

    /// Collects acks per chunk and the certificates recovered from them.
    ack_manager: AckManager<C::PublicKey, P::Scheme, D>,

    /// Current epoch, initialized from and refreshed by the monitor.
    epoch: Epoch,

    /// Whether node broadcasts are sent as priority messages.
    priority_proposals: bool,

    /// Whether acks are sent as priority messages.
    priority_acks: bool,

    /// Engine metrics.
    metrics: metrics::Metrics<E>,

    /// End-to-end timer for our own in-flight proposal; taken (dropped) once
    /// its certificate is recovered.
    propose_timer: Option<histogram::Timer<E>>,
}
203
204impl<
205 E: Clock + Spawner + CryptoRngCore + Storage + Metrics,
206 C: Signer,
207 S: SequencersProvider<PublicKey = C::PublicKey>,
208 P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
209 D: Digest,
210 A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
211 R: Relay<Digest = D>,
212 Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
213 M: Monitor<Index = Epoch>,
214 T: Strategy,
215 > Engine<E, C, S, P, D, A, R, Z, M, T>
216{
217 pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M, T>) -> Self {
219 let metrics = metrics::Metrics::init(context.clone());
221
222 Self {
223 context: ContextCell::new(context),
224 sequencer_signer: cfg.sequencer_signer,
225 sequencers_provider: cfg.sequencers_provider,
226 validators_provider: cfg.validators_provider,
227 automaton: cfg.automaton,
228 relay: cfg.relay,
229 reporter: cfg.reporter,
230 monitor: cfg.monitor,
231 strategy: cfg.strategy,
232 chunk_verifier: cfg.chunk_verifier,
233 rebroadcast_timeout: cfg.rebroadcast_timeout,
234 rebroadcast_deadline: None,
235 epoch_bounds: cfg.epoch_bounds,
236 height_bound: cfg.height_bound,
237 pending_verifies: FuturesPool::default(),
238 journal_heights_per_section: cfg.journal_heights_per_section,
239 journal_replay_buffer: cfg.journal_replay_buffer,
240 journal_write_buffer: cfg.journal_write_buffer,
241 journal_name_prefix: cfg.journal_name_prefix,
242 journal_compression: cfg.journal_compression,
243 journal_buffer_pool: cfg.journal_buffer_pool,
244 journals: BTreeMap::new(),
245 tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
246 ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
247 epoch: Epoch::zero(),
248 priority_proposals: cfg.priority_proposals,
249 priority_acks: cfg.priority_acks,
250 metrics,
251 propose_timer: None,
252 }
253 }
254
    /// Starts the engine, consuming `self` and returning a handle to the
    /// spawned task.
    ///
    /// `chunk_network` carries sequencer nodes (chunks); `ack_network`
    /// carries validator acks.
    pub fn start(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(chunk_network, ack_network).await)
    }
278
    /// Main event loop.
    ///
    /// Drives proposal issuance, tip rebroadcasts, epoch refreshes, inbound
    /// node and ack processing, and completed payload verifications until the
    /// runtime signals shutdown (or a network receiver fails), then flushes
    /// all journals.
    async fn run(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) {
        let mut node_sender = chunk_network.0;
        let mut node_receiver = chunk_network.1;
        // Wrap the ack channel so acks are encoded/decoded automatically.
        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
        let mut shutdown = self.context.stopped();

        // Outstanding proposal request to the automaton, if any.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Initialize the epoch from the monitor and subscribe to updates.
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // If we are a sequencer, restore our journal (installing our tip)
        // and re-announce that tip.
        if let Some(ref signer) = self.sequencer_signer {
            self.journal_prepare(&signer.public_key()).await;
            if let Err(err) = self.rebroadcast(&mut node_sender).await {
                // Non-fatal: e.g. there may be nothing to rebroadcast yet.
                info!(?err, "initial rebroadcast failed");
            }
        }

        loop {
            // Ask the automaton for a new payload whenever we are allowed to
            // build and no request is outstanding.
            if pending.is_none() {
                if let Some(context) = self.should_propose() {
                    let receiver = self.automaton.propose(context.clone()).await;
                    pending = Some((context, receiver));
                }
            }

            // Arm the rebroadcast timer only when a deadline is set.
            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            // Wait on the automaton's answer only when a request is pending.
            let propose = match &mut pending {
                Some((_context, receiver)) => Either::Left(receiver),
                None => Either::Right(futures::future::pending()),
            };

            select! {
                // Runtime shutdown: exit the loop and flush journals below.
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Epoch update from the monitor.
                epoch = epoch_updates.next() => {
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                    // Epochs must be monotonically non-decreasing.
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;
                    continue;
                },

                // Rebroadcast deadline fired: re-announce our current tip.
                _ = rebroadcast => {
                    if let Some(ref signer) = self.sequencer_signer {
                        debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
                        if let Err(err) = self.rebroadcast(&mut node_sender).await {
                            info!(?err, "rebroadcast failed");
                            continue;
                        }
                    }
                },

                // The automaton answered (or dropped) our proposal request.
                receiver = propose => {
                    let (context, _) = pending.take().unwrap();
                    debug!(height = %context.height, "propose");

                    let Ok(payload) = receiver else {
                        warn!(?context, "automaton dropped proposal");
                        continue;
                    };

                    if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
                        warn!(?err, ?context, "propose new failed");
                        continue;
                    }
                },

                // Inbound node (chunk) from a sequencer.
                msg = node_receiver.recv() => {
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            // Receiver failure is unrecoverable: stop the engine.
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    // Counted as `Invalid` unless explicitly upgraded below.
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);

                    let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
                        Ok(node) => node,
                        Err(err) => {
                            debug!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    // `Some(parent_chunk)` means the node carried a parent
                    // certificate that should be processed.
                    let result = match self.validate_node(&node, &sender) {
                        Ok(result) => result,
                        Err(err) => {
                            debug!(?err, ?sender, "node validate failed");
                            continue;
                        }
                    };

                    // Ensure the sequencer's journal exists before persisting.
                    self.journal_prepare(&sender).await;

                    if let Some(parent_chunk) = result {
                        let parent = node.parent.as_ref().unwrap();
                        self.handle_certificate(&parent_chunk, parent.epoch, parent.certificate.clone()).await;
                    }

                    self.handle_node(&node).await;
                    debug!(?sender, height = %node.chunk.height, "node");
                    guard.set(Status::Success);
                },

                // Inbound ack from a validator.
                msg = ack_receiver.recv() => {
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match msg {
                        Ok(ack) => ack,
                        Err(err) => {
                            debug!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        debug!(?err, ?sender, "ack validate failed");
                        continue;
                    };
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack");
                    guard.set(Status::Success);
                },

                // An application verification completed.
                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    // Drop the timer to end the verification-duration
                    // measurement before handling the outcome.
                    drop(timer);
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, "verified returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            debug!(?context, "verified was false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                debug!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },
            }
        }

        // Shutdown: cancel outstanding verifications and flush every journal.
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.sync_all().await.expect("unable to sync journal");
        }
    }
495
    /// Handles a successful application verification of `payload` for
    /// `context`: reports the tip, signs an [`Ack`], persists the node, and
    /// broadcasts the ack.
    ///
    /// # Errors
    ///
    /// Fails if the tip moved while verification was in flight (no tip, or a
    /// height/payload mismatch), if the current epoch has no known scheme, if
    /// we are not a signer in that scheme, or if sending the ack fails.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<
            impl Sender<PublicKey = C::PublicKey>,
            Ack<C::PublicKey, P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        // The tip may have changed while verification was in flight.
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Report the verified tip to the application.
        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
            return Err(Error::UnknownScheme(self.epoch));
        };

        // Sign the ack; fails if we are not a signer in the current epoch.
        let Some(ack) = Ack::sign(scheme.as_ref(), tip.chunk.clone(), self.epoch) else {
            return Err(Error::NotSigner(self.epoch));
        };

        // Persist the node before acknowledging it.
        self.journal_sync(&context.sequencer, context.height).await;

        // Send to all validators, plus the sequencer if it is not one itself.
        let recipients = {
            let validators = scheme.participants();
            let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
            if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Process our own ack first (it may complete a quorum locally).
        self.handle_ack(&ack).await?;

        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }
572
573 async fn handle_certificate(
579 &mut self,
580 chunk: &Chunk<C::PublicKey, D>,
581 epoch: Epoch,
582 certificate: <P::Scheme as Scheme>::Certificate,
583 ) {
584 if !self.ack_manager.add_certificate(
586 &chunk.sequencer,
587 chunk.height,
588 epoch,
589 certificate.clone(),
590 ) {
591 return;
592 }
593
594 if let Some(ref signer) = self.sequencer_signer {
596 if chunk.sequencer == signer.public_key() {
597 self.propose_timer.take();
598 }
599 }
600
601 self.reporter
603 .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)))
604 .await;
605 }
606
607 async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
612 let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
614 return Err(Error::UnknownScheme(ack.epoch));
615 };
616
617 if let Some(certificate) = self
619 .ack_manager
620 .add_ack(ack, scheme.as_ref(), &self.strategy)
621 {
622 debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "recovered certificate");
623 self.metrics.certificates.inc();
624 self.handle_certificate(&ack.chunk, ack.epoch, certificate)
625 .await;
626 }
627
628 Ok(())
629 }
630
631 async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
635 let is_new = self.tip_manager.put(node);
637
638 if is_new {
640 let _ = self
642 .metrics
643 .sequencer_heights
644 .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
645 .try_set(node.chunk.height.get());
646
647 self.journal_append(node.clone()).await;
651 self.journal_sync(&node.chunk.sequencer, node.chunk.height)
652 .await;
653 }
654
655 let context = Context {
657 sequencer: node.chunk.sequencer.clone(),
658 height: node.chunk.height,
659 };
660 let payload = node.chunk.payload;
661 let mut automaton = self.automaton.clone();
662 let timer = self.metrics.verify_duration.timer();
663 self.pending_verifies.push(async move {
664 let receiver = automaton.verify(context.clone(), payload).await;
665 let result = receiver.await.map_err(Error::AppVerifyCanceled);
666 Verify {
667 timer,
668 context,
669 payload,
670 result,
671 }
672 });
673 }
674
675 fn should_propose(&self) -> Option<Context<C::PublicKey>> {
683 let me = self.sequencer_signer.as_ref()?.public_key();
685
686 self.sequencers_provider
688 .sequencers(self.epoch)?
689 .position(&me)?;
690
691 match self.tip_manager.get(&me) {
693 None => Some(Context {
694 sequencer: me,
695 height: Height::zero(),
696 }),
697 Some(tip) => self
698 .ack_manager
699 .get_certificate(&me, tip.chunk.height)
700 .map(|_| Context {
701 sequencer: me,
702 height: tip.chunk.height.next(),
703 }),
704 }
705 }
706
    /// Signs and broadcasts a new chunk with `payload` at the height given by
    /// `context`, building on our certified tip (if any).
    ///
    /// # Errors
    ///
    /// Fails if this node is not a sequencer in the current epoch, if the
    /// context does not match our identity or the expected height, or if the
    /// current tip has no certificate (required as the parent).
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        // Counted as `Dropped` unless explicitly set below.
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let signer = self
            .sequencer_signer
            .as_mut()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        // The automaton's context must be for our own identity.
        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        // We must be an active sequencer in the current epoch.
        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        // Derive the next height and parent reference from our tip (if any).
        let mut height = Height::zero();
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            // The tip must be certified before we can build on it.
            let Some((epoch, certificate)) =
                self.ack_manager.get_certificate(&me, tip.chunk.height)
            else {
                return Err(Error::MissingCertificate);
            };

            height = tip.chunk.height.next();
            parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
        }

        // The context is stale if our tip advanced since it was issued.
        if context.height != height {
            return Err(Error::ContextHeight);
        }

        let node = Node::sign(signer, height, payload, parent);

        // Treat our own node like any received node: update tip, persist,
        // and kick off verification.
        self.handle_node(&node).await;

        self.journal_sync(&me, height).await;

        // Start the end-to-end timer (finished when a certificate for this
        // chunk is recovered).
        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        };

        guard.set(Status::Success);
        Ok(())
    }
779
    /// Rebroadcasts our current tip to the validators of the current epoch.
    ///
    /// Clears the rebroadcast deadline; `broadcast` re-arms it on success.
    ///
    /// # Errors
    ///
    /// Fails if we are not a sequencer in the current epoch, if there is no
    /// tip to rebroadcast, or if the tip is already certified (in which case
    /// rebroadcasting is unnecessary).
    async fn rebroadcast(
        &mut self,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Clear any armed deadline; `broadcast` sets a fresh one on success.
        self.rebroadcast_deadline = None;

        let signer = self
            .sequencer_signer
            .as_ref()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        // We must be an active sequencer in the current epoch.
        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // No need to rebroadcast once the tip has a certificate.
        if self
            .ack_manager
            .get_certificate(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyCertified);
        }

        // Pre-set `Failure` so an early return from `broadcast` (via `?`)
        // is reported as a failure; overwritten with `Success` after.
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }
828
829 async fn broadcast(
831 &mut self,
832 node: Node<C::PublicKey, P::Scheme, D>,
833 node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
834 epoch: Epoch,
835 ) -> Result<(), Error> {
836 let Some(scheme) = self.validators_provider.scoped(epoch) else {
838 return Err(Error::UnknownScheme(epoch));
839 };
840 let validators = scheme.participants();
841
842 self.relay.broadcast(node.chunk.payload).await;
844
845 node_sender
847 .send(
848 Recipients::Some(validators.iter().cloned().collect()),
849 node.encode(),
850 self.priority_proposals,
851 )
852 .await
853 .map_err(|_| Error::BroadcastFailed)?;
854
855 self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
857
858 Ok(())
859 }
860
861 fn validate_node(
871 &mut self,
872 node: &Node<C::PublicKey, P::Scheme, D>,
873 sender: &C::PublicKey,
874 ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
875 if node.chunk.sequencer != *sender {
877 return Err(Error::PeerMismatch);
878 }
879
880 if let Some(tip) = self.tip_manager.get(sender) {
883 if tip == *node {
884 return Ok(None);
885 }
886 }
887
888 self.validate_chunk(&node.chunk, self.epoch)?;
890
891 node.verify(
893 &mut self.context,
894 &self.chunk_verifier,
895 &self.validators_provider,
896 &self.strategy,
897 )
898 }
899
    /// Validates an incoming ack from `sender`.
    ///
    /// Checks, in order: the referenced chunk, that a scheme exists for the
    /// ack's epoch, that the sender is a participant and the attestation is
    /// attributed to that same participant, that the ack's epoch and height
    /// fall within the configured acceptance windows, and finally the
    /// attestation signature.
    fn validate_ack(
        &mut self,
        ack: &Ack<C::PublicKey, P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        // The sender must be a participant, and the attestation must claim
        // that same participant index (no impersonation).
        let participants = scheme.participants();
        let Some(index) = participants.index(sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        // The ack's epoch must fall within the window around our epoch.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // The ack's height must fall within the window above the sequencer's
        // tip (height zero if no tip is known).
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(Height::zero());
            let bound_hi = bound_lo.saturating_add(self.height_bound);
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Signature verification last — the most expensive check.
        if !ack.verify(&mut self.context, scheme.as_ref(), &self.strategy) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }
960
961 fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
966 if self
968 .sequencers_provider
969 .sequencers(epoch)
970 .and_then(|s| s.position(&chunk.sequencer))
971 .is_none()
972 {
973 return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
974 }
975
976 if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
978 match chunk.height.cmp(&tip.chunk.height) {
980 std::cmp::Ordering::Less => {
981 return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
982 }
983 std::cmp::Ordering::Equal => {
984 if tip.chunk.payload != chunk.payload {
986 return Err(Error::ChunkMismatch(
987 chunk.sequencer.to_string(),
988 chunk.height,
989 ));
990 }
991 }
992 std::cmp::Ordering::Greater => {}
993 }
994 }
995
996 Ok(())
997 }
998
    /// Maps a chunk height to its journal section: a fixed number of heights
    /// (`journal_heights_per_section`) share each section.
    const fn get_journal_section(&self, height: Height) -> u64 {
        height.get() / self.journal_heights_per_section.get()
    }
1007
    /// Ensures a journal exists for `sequencer`, initializing it and replaying
    /// its contents on first use.
    ///
    /// Replay scans every stored node and installs the one with the highest
    /// height as the sequencer's tip.
    ///
    /// # Panics
    ///
    /// Panics if the journal cannot be initialized, replayed, or read, or if
    /// the tip manager already holds a tip for this sequencer.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Already initialized: nothing to do.
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Each sequencer gets its own partition: prefix + public key.
        let cfg = JournalConfig {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
            self.context.with_label("journal").into_present(),
            cfg,
        )
        .await
        .expect("unable to init journal");

        {
            debug!(?sequencer, "journal replay begin");

            // Replay from the start of the journal.
            let stream = journal
                .replay(0, 0, self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Track the highest-height node seen; it becomes the tip.
            let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            if let Some(node) = tip.take() {
                // The tip manager must not already know this sequencer.
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        self.journals.insert(sequencer.clone(), journal);
    }
1076
1077 async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
1082 let section = self.get_journal_section(node.chunk.height);
1083 self.journals
1084 .get_mut(&node.chunk.sequencer)
1085 .expect("journal does not exist")
1086 .append(section, node)
1087 .await
1088 .expect("unable to append to journal");
1089 }
1090
    /// Syncs the journal of `sequencer` through the section containing
    /// `height`, then prunes older data (best-effort).
    ///
    /// # Panics
    ///
    /// Panics if the journal does not exist or the sync fails.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: Height) {
        let section = self.get_journal_section(height);

        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Durably persist everything through this section.
        journal.sync(section).await.expect("unable to sync journal");

        // Pruning is best-effort: failures are deliberately ignored.
        let _ = journal.prune(section).await;
    }
1107}