1use super::{
11 metrics, scheme,
12 types::{
13 Ack, Activity, Chunk, ChunkSigner, ChunkVerifier, Context, Error, Lock, Node, Parent,
14 Proposal, SequencersProvider,
15 },
16 AckManager, Config, TipManager,
17};
18use crate::{
19 types::{Epoch, EpochDelta, Height, HeightDelta},
20 Automaton, Monitor, Relay, Reporter,
21};
22use commonware_codec::Encode;
23use commonware_cryptography::{
24 certificate::{Provider, Scheme},
25 Digest, PublicKey, Signer,
26};
27use commonware_macros::select_loop;
28use commonware_p2p::{
29 utils::codec::{wrap, WrappedSender},
30 Receiver, Recipients, Sender,
31};
32use commonware_parallel::Strategy;
33use commonware_runtime::{
34 buffer::paged::CacheRef,
35 spawn_cell,
36 telemetry::metrics::{
37 histogram,
38 status::{CounterExt, GaugeExt, Status},
39 },
40 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
41};
42use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
43use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, ordered::Quorum};
44use futures::{
45 future::{self, Either},
46 pin_mut, StreamExt,
47};
48use rand_core::CryptoRngCore;
49use std::{
50 collections::BTreeMap,
51 num::{NonZeroU64, NonZeroUsize},
52 time::{Duration, SystemTime},
53};
54use tracing::{debug, error, info, warn};
55
/// An in-flight application-verification request: the context/payload pair that
/// was handed to the automaton, plus the outcome once the verification future
/// completes. Produced by `Engine::handle_node` and drained in the engine's
/// main loop via `pending_verifies.next_completed()`.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    // Timer started when verification was requested; explicitly dropped by the
    // main loop once the result is in (presumably recording elapsed time on
    // drop — TODO confirm against `histogram::Timer` semantics).
    timer: histogram::Timer<E>,
    // Sequencer/height context the payload was proposed under.
    context: Context<C>,
    // Digest the automaton was asked to verify.
    payload: D,
    // `Ok(true)` if the automaton accepted the payload, `Ok(false)` if it
    // rejected it, `Err(AppVerifyCanceled)` if the automaton dropped the
    // verification channel.
    result: Result<bool, Error>,
}
63
/// Engine that broadcasts sequencer chunks, collects validator acks, and
/// assembles certificates over them, persisting each sequencer's chain of
/// nodes to a journal for crash recovery.
pub struct Engine<
    E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics,
    C: Signer,
    S: SequencersProvider<PublicKey = C::PublicKey>,
    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    T: Strategy,
> {
    // Runtime context (clock, spawner, storage, metrics, RNG).
    context: ContextCell<E>,

    // Present iff this node can act as a sequencer; used to sign and
    // (re)broadcast our own chunks. `None` means ack/validate-only operation.
    sequencer_signer: Option<ChunkSigner<C>>,
    // Source of the sequencer set for a given epoch.
    sequencers_provider: S,
    // Source of the validator signing scheme for a given epoch.
    validators_provider: P,
    // Application hooks: proposal/verification of payloads.
    automaton: A,
    // Broadcasts raw payloads alongside the signed node.
    relay: R,
    // Supplies the current epoch and a stream of epoch updates.
    monitor: M,
    // Receives `Tip`/`Lock` activity notifications.
    reporter: Z,
    // Parallelization strategy passed to signature verification/aggregation.
    strategy: T,

    // Verifier for chunk signatures on incoming nodes.
    chunk_verifier: ChunkVerifier,

    // How long to wait before re-sending an uncertified tip.
    rebroadcast_timeout: Duration,
    // When set, the next scheduled rebroadcast; cleared on each attempt.
    rebroadcast_deadline: Option<SystemTime>,

    // Acks are accepted only for epochs in
    // [epoch - bounds.0, epoch + bounds.1] (saturating).
    epoch_bounds: (EpochDelta, EpochDelta),

    // Acks are accepted only up to this many heights above the known tip.
    height_bound: HeightDelta,

    // Outstanding application-verification futures for received nodes.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    // Journal layout: heights per section (section = height / this value).
    journal_heights_per_section: NonZeroU64,

    // Buffer size used when replaying a journal at startup.
    journal_replay_buffer: NonZeroUsize,

    // Write buffer size for journal appends.
    journal_write_buffer: NonZeroUsize,

    // Prefix for per-sequencer journal partition names.
    journal_name_prefix: String,

    // Optional compression level for journal entries.
    journal_compression: Option<u8>,

    // Shared page cache handle for journal storage.
    journal_page_cache: CacheRef,

    // One journal per sequencer, lazily initialized by `journal_prepare`.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,

    // Highest known node per sequencer.
    tip_manager: TipManager<C::PublicKey, P::Scheme, D>,

    // Collected acks and recovered certificates per sequencer/height.
    ack_manager: AckManager<C::PublicKey, P::Scheme, D>,

    // Current epoch, kept in sync with `monitor` updates (monotonically
    // non-decreasing; asserted in the main loop).
    epoch: Epoch,

    // Whether node broadcasts are sent with priority.
    priority_proposals: bool,

    // Whether ack broadcasts are sent with priority.
    priority_acks: bool,

    // Engine metrics (counters, gauges, histograms).
    metrics: metrics::Metrics<E>,

    // End-to-end timer for our own proposal: started at `propose`, taken when
    // a certificate for our chunk is observed in `handle_certificate`.
    propose_timer: Option<histogram::Timer<E>>,
}
202
impl<
    E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics,
    C: Signer,
    S: SequencersProvider<PublicKey = C::PublicKey>,
    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    T: Strategy,
    > Engine<E, C, S, P, D, A, R, Z, M, T>
{
    /// Creates a new engine from the given runtime context and configuration.
    ///
    /// No work is performed until [`Self::start`] is called: journals are
    /// empty, the epoch is zero (refreshed from the monitor at startup), and
    /// no proposal is pending.
    pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M, T>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context: ContextCell::new(context),
            sequencer_signer: cfg.sequencer_signer,
            sequencers_provider: cfg.sequencers_provider,
            validators_provider: cfg.validators_provider,
            automaton: cfg.automaton,
            relay: cfg.relay,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            strategy: cfg.strategy,
            chunk_verifier: cfg.chunk_verifier,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_name_prefix: cfg.journal_name_prefix,
            journal_compression: cfg.journal_compression,
            journal_page_cache: cfg.journal_page_cache,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
            ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
            epoch: Epoch::zero(),
            priority_proposals: cfg.priority_proposals,
            priority_acks: cfg.priority_acks,
            metrics,
            propose_timer: None,
        }
    }

    /// Spawns the engine's main loop ([`Self::run`]) on the runtime context,
    /// consuming `self`.
    ///
    /// `chunk_network` carries signed nodes; `ack_network` carries validator
    /// acks. Returns a handle to the spawned task.
    pub fn start(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(chunk_network, ack_network).await)
    }

    /// Main event loop: drives proposals, rebroadcasts, inbound node and ack
    /// processing, epoch refreshes, and application-verification completions
    /// until the runtime stops or a network channel fails.
    ///
    /// On exit, cancels outstanding verifications and syncs every journal.
    async fn run(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) {
        let mut node_sender = chunk_network.0;
        let mut node_receiver = chunk_network.1;
        // Wrap the ack channel with codec support so acks are decoded on
        // receipt and encoded on send.
        let (mut ack_sender, mut ack_receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            ack_network.0,
            ack_network.1,
        );

        // Outstanding proposal request to the automaton: the context we asked
        // for plus the channel on which the payload digest will arrive.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Seed the current epoch and subscribe to future updates.
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // If we are a sequencer, recover our own journal and immediately try
        // to re-send our tip (fails benignly when there is nothing to send or
        // the tip is already certified).
        if let Some(ref signer) = self.sequencer_signer {
            self.journal_prepare(&signer.public_key()).await;
            if let Err(err) = self.rebroadcast(&mut node_sender).await {
                info!(?err, "initial rebroadcast failed");
            }
        }

        select_loop! {
            self.context,
            // NOTE(review): assumes `select_loop!`'s `on_start` runs before
            // each iteration, building the `rebroadcast`/`propose` futures
            // raced below — confirm against the macro's documentation.
            on_start => {
                // Kick off a new proposal request when none is outstanding and
                // we are eligible to propose.
                if pending.is_none() {
                    if let Some(context) = self.should_propose() {
                        let receiver = self.automaton.propose(context.clone()).await;
                        pending = Some((context, receiver));
                    }
                }

                // Arm the rebroadcast timer only when a deadline is set.
                let rebroadcast = match self.rebroadcast_deadline {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
                // Poll the proposal channel only when a request is pending.
                let propose = match &mut pending {
                    Some((_context, receiver)) => Either::Left(receiver),
                    None => Either::Right(futures::future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Epoch refresh from the monitor; epochs must never go backwards.
            Some(epoch) = epoch_updates.recv() else {
                error!("epoch subscription failed");
                break;
            } => {
                debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                assert!(epoch >= self.epoch);
                self.epoch = epoch;
                continue;
            },

            // Rebroadcast deadline hit: re-send our tip (sequencers only).
            _ = rebroadcast => {
                if let Some(ref signer) = self.sequencer_signer {
                    debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                }
            },

            // The automaton produced (or dropped) the payload for our pending
            // proposal.
            receiver = propose => {
                // `propose` only resolves when `pending` is Some (see
                // `on_start`), so this take/unwrap cannot fail.
                let (context, _) = pending.take().unwrap();
                debug!(height = %context.height, "propose");

                let Ok(payload) = receiver else {
                    warn!(?context, "automaton dropped proposal");
                    continue;
                };

                if let Err(err) = self
                    .propose(context.clone(), payload, &mut node_sender)
                    .await
                {
                    warn!(?err, ?context, "propose new failed");
                    continue;
                }
            },

            // Inbound node (chunk + optional parent certificate) from a peer.
            msg = node_receiver.recv() => {
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        // Channel failure is fatal for the loop.
                        error!(?err, "node receiver failed");
                        break;
                    }
                };
                // Guard defaults to Invalid; any early `continue` below counts
                // the message as invalid when the guard drops.
                let mut guard = self.metrics.nodes.guard(Status::Invalid);

                let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
                    Ok(node) => node,
                    Err(err) => {
                        debug!(?err, ?sender, "node decode failed");
                        continue;
                    }
                };
                let result = match self.validate_node(&node, &sender) {
                    Ok(result) => result,
                    Err(err) => {
                        debug!(?err, ?sender, "node validate failed");
                        continue;
                    }
                };

                // Ensure this sequencer's journal exists before persisting.
                self.journal_prepare(&sender).await;

                // `validate_node` returned the parent chunk: record the parent
                // certificate carried by this node.
                if let Some(parent_chunk) = result {
                    // Safe: a parent chunk implies the node carries a parent.
                    let parent = node.parent.as_ref().unwrap();
                    self.handle_certificate(
                        &parent_chunk,
                        parent.epoch,
                        parent.certificate.clone(),
                    )
                    .await;
                }

                self.handle_node(&node).await;
                debug!(?sender, height = %node.chunk.height, "node");
                guard.set(Status::Success);
            },

            // Inbound ack from a validator.
            msg = ack_receiver.recv() => {
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        // Channel failure is fatal for the loop.
                        warn!(?err, "ack receiver failed");
                        break;
                    }
                };
                let mut guard = self.metrics.acks.guard(Status::Invalid);
                // Inner result is the codec decode from the wrapped receiver.
                let ack = match msg {
                    Ok(ack) => ack,
                    Err(err) => {
                        debug!(?err, ?sender, "ack decode failed");
                        continue;
                    }
                };
                if let Err(err) = self.validate_ack(&ack, &sender) {
                    debug!(?err, ?sender, "ack validate failed");
                    continue;
                };
                if let Err(err) = self.handle_ack(&ack).await {
                    debug!(?err, ?sender, "ack handle failed");
                    guard.set(Status::Failure);
                    continue;
                }
                debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack");
                guard.set(Status::Success);
            },

            // An application-verification future completed.
            verify = self.pending_verifies.next_completed() => {
                let Verify {
                    timer,
                    context,
                    payload,
                    result,
                } = verify;
                // Explicitly drop the timer now that verification finished.
                drop(timer);
                match result {
                    Err(err) => {
                        warn!(?err, ?context, "verified returned error");
                        self.metrics.verify.inc(Status::Dropped);
                    }
                    Ok(false) => {
                        debug!(?context, "verified was false");
                        self.metrics.verify.inc(Status::Failure);
                    }
                    Ok(true) => {
                        debug!(?context, "verified");
                        self.metrics.verify.inc(Status::Success);
                        // Payload accepted: report the tip and broadcast our
                        // ack for it.
                        if let Err(err) = self
                            .handle_app_verified(&context, &payload, &mut ack_sender)
                            .await
                        {
                            debug!(?err, ?context, ?payload, "verified handle failed");
                        }
                    }
                }
            },
        }

        // Shutdown: abandon in-flight verifications, then flush every journal.
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.sync_all().await.expect("unable to sync journal");
        }
    }

    /// Handles a successful application verification of `payload` at
    /// `context`: reports the tip, signs an ack for the current epoch,
    /// persists the journal, processes our own ack locally, and broadcasts the
    /// ack to the epoch's validators (plus the sequencer if it is not one).
    ///
    /// # Errors
    ///
    /// Fails when the tip has moved (different height or payload than what was
    /// verified), the epoch has no known scheme, we are not a signer in the
    /// epoch, or the ack cannot be sent.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<
            impl Sender<PublicKey = C::PublicKey>,
            Ack<C::PublicKey, P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        // The verification result is only meaningful against the current tip.
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Tell the reporter we consider this chunk a valid tip.
        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
            return Err(Error::UnknownScheme(self.epoch));
        };

        // `Ack::sign` returns None when we hold no signing share for the
        // epoch's scheme.
        let Some(ack) = Ack::sign(scheme.as_ref(), tip.chunk.clone(), self.epoch) else {
            return Err(Error::NotSigner(self.epoch));
        };

        // Persist before telling anyone we acked.
        self.journal_sync(&context.sequencer, context.height).await;

        // Send to all validators; include the sequencer as well if it is not
        // itself a validator so it learns about our ack.
        let recipients = {
            let validators = scheme.participants();
            let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
            if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Count our own ack toward the certificate first.
        self.handle_ack(&ack).await?;

        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }

    /// Records a certificate for `chunk` at `epoch`.
    ///
    /// Returns early when `add_certificate` reports nothing new. If the
    /// certified chunk is our own, the end-to-end propose timer is taken
    /// (observing the propose-to-certificate latency). Reports a `Lock` to the
    /// reporter for every newly recorded certificate.
    async fn handle_certificate(
        &mut self,
        chunk: &Chunk<C::PublicKey, D>,
        epoch: Epoch,
        certificate: <P::Scheme as Scheme>::Certificate,
    ) {
        if !self.ack_manager.add_certificate(
            &chunk.sequencer,
            chunk.height,
            epoch,
            certificate.clone(),
        ) {
            return;
        }

        // Our own chunk got certified: stop the e2e timer started in
        // `propose`.
        if let Some(ref signer) = self.sequencer_signer {
            if chunk.sequencer == signer.public_key() {
                self.propose_timer.take();
            }
        }

        self.reporter
            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)))
            .await;
    }

    /// Adds an ack to the ack manager; if that completes a certificate, routes
    /// it through [`Self::handle_certificate`].
    ///
    /// # Errors
    ///
    /// Fails when the ack's epoch has no known validator scheme.
    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        // `add_ack` yields a certificate once enough acks are collected.
        if let Some(certificate) = self
            .ack_manager
            .add_ack(ack, scheme.as_ref(), &self.strategy)
        {
            debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "recovered certificate");
            self.metrics.certificates.inc();
            self.handle_certificate(&ack.chunk, ack.epoch, certificate)
                .await;
        }

        Ok(())
    }

    /// Processes a (validated) node: updates the tip, persists new tips to the
    /// journal, and kicks off asynchronous application verification of the
    /// payload.
    async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
        let is_new = self.tip_manager.put(node);

        if is_new {
            // Track the latest height seen per sequencer; failure to set the
            // gauge is deliberately ignored.
            let _ = self
                .metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .try_set(node.chunk.height.get());

            // Persist before any ack can be derived from this node.
            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Ask the automaton to verify the payload; the result is harvested by
        // the main loop via `pending_verifies`.
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            // A dropped receiver is surfaced as `AppVerifyCanceled`.
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }

    /// Returns the context for our next proposal, or `None` when we should not
    /// propose: we have no signer, we are not a sequencer in the current
    /// epoch, or our current tip is not yet certified.
    ///
    /// With no tip at all, proposing starts at height zero.
    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
        let me = self.sequencer_signer.as_ref()?.public_key();

        // Must be in the current epoch's sequencer set.
        self.sequencers_provider
            .sequencers(self.epoch)?
            .position(&me)?;

        match self.tip_manager.get(&me) {
            None => Some(Context {
                sequencer: me,
                height: Height::zero(),
            }),
            // Only advance once the previous height has a certificate.
            Some(tip) => self
                .ack_manager
                .get_certificate(&me, tip.chunk.height)
                .map(|_| Context {
                    sequencer: me,
                    height: tip.chunk.height.next(),
                }),
        }
    }

    /// Builds, signs, persists, and broadcasts a new node for `payload` at
    /// `context`.
    ///
    /// # Errors
    ///
    /// Fails when we are not a sequencer in the current epoch, `context` does
    /// not match our identity or expected next height, the previous tip lacks
    /// a certificate, or broadcasting fails.
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        // Guard defaults to Dropped; early `?` returns leave it so.
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let signer = self
            .sequencer_signer
            .as_mut()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        // Derive the next height and parent link from our current tip (if
        // any); the tip must already be certified.
        let mut height = Height::zero();
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            let Some((epoch, certificate)) =
                self.ack_manager.get_certificate(&me, tip.chunk.height)
            else {
                return Err(Error::MissingCertificate);
            };

            height = tip.chunk.height.next();
            parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
        }

        // The context handed to the automaton must still be current.
        if context.height != height {
            return Err(Error::ContextHeight);
        }

        let node = Node::sign(signer, height, payload, parent);

        // Adopt our own node (updates tip, persists, starts verification).
        self.handle_node(&node).await;

        self.journal_sync(&me, height).await;

        // Start the propose-to-certificate latency timer; taken in
        // `handle_certificate` when our chunk is certified.
        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        };

        guard.set(Status::Success);
        Ok(())
    }

    /// Re-sends our current tip to the epoch's validators.
    ///
    /// Clears any pending rebroadcast deadline first (a new one is set by
    /// `broadcast` on success).
    ///
    /// # Errors
    ///
    /// Fails when we are not a sequencer in the current epoch, we have no tip,
    /// the tip is already certified, or sending fails.
    async fn rebroadcast(
        &mut self,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        self.rebroadcast_deadline = None;

        let signer = self
            .sequencer_signer
            .as_ref()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // A certified tip needs no rebroadcast.
        if self
            .ack_manager
            .get_certificate(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyCertified);
        }

        // Mark Failure before sending so a `?` error from `broadcast` leaves
        // the guard recording a failed attempt rather than a drop.
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }

    /// Broadcasts `node` to the validators of `epoch`: the payload goes out
    /// via the relay, the encoded node via the chunk network. On success,
    /// schedules the next rebroadcast deadline.
    ///
    /// # Errors
    ///
    /// Fails when the epoch has no known scheme or the network send fails.
    async fn broadcast(
        &mut self,
        node: Node<C::PublicKey, P::Scheme, D>,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        let Some(scheme) = self.validators_provider.scoped(epoch) else {
            return Err(Error::UnknownScheme(epoch));
        };
        let validators = scheme.participants();

        // Publish the raw payload so validators can fetch/serve it.
        self.relay.broadcast(node.chunk.payload).await;

        node_sender
            .send(
                Recipients::Some(validators.iter().cloned().collect()),
                node.encode(),
                self.priority_proposals,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Re-send again after the timeout unless a certificate arrives first.
        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

    /// Validates an inbound node from `sender`.
    ///
    /// Returns `Ok(None)` when the node is exactly our current tip (duplicate;
    /// nothing further to do). Otherwise delegates signature verification to
    /// `Node::verify`, whose `Ok(Some(chunk))` result is — per the caller in
    /// `run` — the parent chunk whose certificate should be recorded.
    ///
    /// # Errors
    ///
    /// Fails when the sender is not the chunk's sequencer, the chunk fails
    /// [`Self::validate_chunk`], or signature verification fails.
    fn validate_node(
        &mut self,
        node: &Node<C::PublicKey, P::Scheme, D>,
        sender: &C::PublicKey,
    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
        // Sequencers may only send their own nodes.
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Exact duplicate of the known tip: skip re-verification.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(None);
            }
        }

        self.validate_chunk(&node.chunk, self.epoch)?;

        node.verify(
            &mut self.context,
            &self.chunk_verifier,
            &self.validators_provider,
            &self.strategy,
        )
    }

    /// Validates an inbound ack from `sender`: chunk validity, known epoch
    /// scheme, attestation signer identity, epoch window, height window, and
    /// finally the signature itself (checked last, as it is the most
    /// expensive step).
    ///
    /// # Errors
    ///
    /// Returns the specific validation failure; see the checks below.
    fn validate_ack(
        &mut self,
        ack: &Ack<C::PublicKey, P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        // The network-level sender must be the validator the attestation
        // claims to be signed by.
        let participants = scheme.participants();
        let Some(index) = participants.index(sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        // Epoch window: [epoch - lo, epoch + hi], saturating at the bounds.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Height window: [tip height (or zero), tip height + height_bound].
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(Height::zero());
            let bound_hi = bound_lo.saturating_add(self.height_bound);
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        if !ack.verify(&mut self.context, scheme.as_ref(), &self.strategy) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    /// Validates a chunk against `epoch`'s sequencer set and our known tip
    /// for that sequencer.
    ///
    /// # Errors
    ///
    /// Fails when the sequencer is unknown in `epoch`, the height is below the
    /// tip, or the height equals the tip but the payload differs (conflicting
    /// chunk at the same height).
    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
        if self
            .sequencers_provider
            .sequencers(epoch)
            .and_then(|s| s.position(&chunk.sequencer))
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Same height must carry the same payload.
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                // Heights above the tip are handled elsewhere.
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    /// Maps a height to its journal section (integer division by the
    /// configured heights-per-section).
    const fn get_journal_section(&self, height: Height) -> u64 {
        height.get() / self.journal_heights_per_section.get()
    }

    /// Lazily initializes and replays the journal for `sequencer`.
    ///
    /// No-op if the journal already exists. Otherwise opens the journal,
    /// replays all persisted nodes, and installs the highest-height node as
    /// the sequencer's tip (asserting the tip manager did not already know a
    /// tip for a sequencer whose journal we just opened).
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        if self.journals.contains_key(sequencer) {
            return;
        }

        let cfg = JournalConfig {
            // One partition per sequencer, namespaced by the configured prefix.
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            page_cache: self.journal_page_cache.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
            self.context
                .with_label("journal")
                .with_attribute("sequencer", sequencer)
                .into_present(),
            cfg,
        )
        .await
        .expect("unable to init journal");

        {
            debug!(?sequencer, "journal replay begin");

            // Replay everything (presumably from section 0, offset 0 — TODO
            // confirm the first two `replay` arguments against the journal
            // API) and keep only the highest node.
            let stream = journal
                .replay(0, 0, self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            if let Some(node) = tip.take() {
                // A freshly opened journal implies no prior in-memory tip.
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        self.journals.insert(sequencer.clone(), journal);
    }

    /// Appends `node` to its sequencer's journal in the section derived from
    /// its height.
    ///
    /// # Panics
    ///
    /// Panics if the journal was not previously created via
    /// [`Self::journal_prepare`] or the append fails.
    async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, &node)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs `sequencer`'s journal up to the section for `height`, then
    /// best-effort prunes (errors from pruning are deliberately ignored).
    ///
    /// # Panics
    ///
    /// Panics if the journal does not exist or the sync fails.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: Height) {
        let section = self.get_journal_section(height);

        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        journal.sync(section).await.expect("unable to sync journal");

        // Prune relative to the current section; presumably removes older
        // sections — TODO confirm `prune` semantics. Failures are ignored.
        let _ = journal.prune(section).await;
    }
}