use super::{
    metrics, scheme,
    types::{
        Ack, Activity, Chunk, Context, Error, Lock, Node, Parent, Proposal, SequencersProvider,
    },
    AckManager, Config, TipManager,
};
use crate::{
    types::{Epoch, EpochDelta},
    Automaton, Monitor, Relay, Reporter,
};
use commonware_codec::Encode;
use commonware_cryptography::{
    certificate::{Provider, Scheme},
    Digest, PublicKey, Signer,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    buffer::PoolRef,
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
use commonware_utils::futures::Pool as FuturesPool;
use futures::{
    channel::oneshot,
    future::{self, Either},
    pin_mut, StreamExt,
};
use rand::{CryptoRng, Rng};
use std::{
    collections::BTreeMap,
    num::NonZeroUsize,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, warn};

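/// Outcome of a payload verification requested from the automaton, together with the
/// context and payload it was issued for and a timer measuring how long it took.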
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    timer: histogram::Timer<E>,
    context: Context<C>,
    payload: D,
    result: Result<bool, Error>,
}

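/// Engine that proposes and broadcasts new chunks when configured with a sequencer signer,
/// verifies and acknowledges chunks received from other sequencers, recovers certificates
/// from collected acks, and persists accepted nodes to per-sequencer journals.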
pub struct Engine<
    E: Clock + Spawner + Rng + CryptoRng + Storage + Metrics,
    C: Signer,
    S: SequencersProvider<PublicKey = C::PublicKey>,
    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
> {
    context: ContextCell<E>,

    /// Signer used to produce new nodes; `None` if this participant is not a sequencer.
    sequencer_signer: Option<C>,
    sequencers_provider: S,
    validators_provider: P,
    automaton: A,
    relay: R,
    monitor: M,
    reporter: Z,

    /// Namespace used to domain-separate all signatures produced and verified by the engine.
    namespace: Vec<u8>,

    /// How long to wait after broadcasting a node before rebroadcasting it.
    rebroadcast_timeout: Duration,
    /// Deadline of the next scheduled rebroadcast, if one is pending.
    rebroadcast_deadline: Option<SystemTime>,

    /// Number of epochs (below, above) the current epoch for which acks are accepted.
    epoch_bounds: (EpochDelta, EpochDelta),

    /// Maximum number of heights above a sequencer's tip for which acks are accepted.
    height_bound: u64,

    /// Outstanding payload verifications issued to the automaton.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    /// Number of heights stored per journal section.
    journal_heights_per_section: u64,

    /// Buffer size used when replaying a journal.
    journal_replay_buffer: NonZeroUsize,

    /// Buffer size used when writing to a journal.
    journal_write_buffer: NonZeroUsize,

    /// Prefix of each journal's partition name (the sequencer's public key is appended).
    journal_name_prefix: String,

    /// Optional compression level applied to journal entries.
    journal_compression: Option<u8>,

    /// Buffer pool shared by the journals.
    journal_buffer_pool: PoolRef,

    /// One journal per sequencer, storing the nodes accepted from that sequencer.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,

    /// Highest-known node (tip) for each sequencer.
    tip_manager: TipManager<C::PublicKey, P::Scheme, D>,

    /// Acks and recovered certificates for each sequencer's chunks.
    ack_manager: AckManager<C::PublicKey, P::Scheme, D>,

    /// Current epoch, refreshed from the monitor.
    epoch: Epoch,

    /// Whether to send proposals as priority messages.
    priority_proposals: bool,

    /// Whether to send acks as priority messages.
    priority_acks: bool,

    metrics: metrics::Metrics<E>,

    /// Measures the time from proposing a chunk until its certificate is observed.
    propose_timer: Option<histogram::Timer<E>>,
}

impl<
        E: Clock + Spawner + Rng + CryptoRng + Storage + Metrics,
        C: Signer,
        S: SequencersProvider<PublicKey = C::PublicKey>,
        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
        D: Digest,
        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
        R: Relay<Digest = D>,
        Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
        M: Monitor<Index = Epoch>,
    > Engine<E, C, S, P, D, A, R, Z, M>
{
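    /// Creates a new engine from the given runtime context and configuration.
    ///
    /// The engine does not process any messages until [`Self::start`] is called.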
    pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context: ContextCell::new(context),
            sequencer_signer: cfg.sequencer_signer,
            sequencers_provider: cfg.sequencers_provider,
            validators_provider: cfg.validators_provider,
            automaton: cfg.automaton,
            relay: cfg.relay,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            namespace: cfg.namespace,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_name_prefix: cfg.journal_name_prefix,
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
            ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
            epoch: Epoch::zero(),
            priority_proposals: cfg.priority_proposals,
            priority_acks: cfg.priority_acks,
            metrics,
            propose_timer: None,
        }
    }

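    /// Starts the engine, consuming it and returning a handle to the spawned task.
    ///
    /// `chunk_network` is used to send and receive nodes; `ack_network` is used to send
    /// and receive acks.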
    pub fn start(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(chunk_network, ack_network).await)
    }

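    /// Main loop of the engine: proposes new chunks when possible, rebroadcasts the
    /// uncertified tip on a timer, and processes epoch updates, incoming nodes, incoming
    /// acks, and completed payload verifications until shutdown.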
    async fn run(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) {
        let mut node_sender = chunk_network.0;
        let mut node_receiver = chunk_network.1;
        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
        let mut shutdown = self.context.stopped();

        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        if let Some(ref signer) = self.sequencer_signer {
            self.journal_prepare(&signer.public_key()).await;
            if let Err(err) = self.rebroadcast(&mut node_sender).await {
                info!(?err, "initial rebroadcast failed");
            }
        }

        loop {
            if pending.is_none() {
                if let Some(context) = self.should_propose() {
                    let receiver = self.automaton.propose(context.clone()).await;
                    pending = Some((context, receiver));
                }
            }

            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            let propose = match &mut pending {
                Some((_context, receiver)) => Either::Left(receiver),
                None => Either::Right(future::pending()),
            };

            select! {
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                epoch = epoch_updates.next() => {
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;
                    continue;
                },

                _ = rebroadcast => {
                    if let Some(ref signer) = self.sequencer_signer {
                        debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
                        if let Err(err) = self.rebroadcast(&mut node_sender).await {
                            info!(?err, "rebroadcast failed");
                            continue;
                        }
                    }
                },

                receiver = propose => {
                    let (context, _) = pending.take().unwrap();
                    debug!(height = context.height, "propose");

                    let Ok(payload) = receiver else {
                        warn!(?context, "automaton dropped proposal");
                        continue;
                    };

                    if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
                        warn!(?err, ?context, "propose new failed");
                        continue;
                    }
                },

                msg = node_receiver.recv() => {
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);

                    let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
                        Ok(node) => node,
                        Err(err) => {
                            debug!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    let result = match self.validate_node(&node, &sender) {
                        Ok(result) => result,
                        Err(err) => {
                            debug!(?err, ?sender, "node validate failed");
                            continue;
                        }
                    };

                    self.journal_prepare(&sender).await;

                    if let Some(parent_chunk) = result {
                        let parent = node.parent.as_ref().unwrap();
                        self.handle_certificate(&parent_chunk, parent.epoch, parent.certificate.clone()).await;
                    }

                    self.handle_node(&node).await;
                    debug!(?sender, height = node.chunk.height, "node");
                    guard.set(Status::Success);
                },

                msg = ack_receiver.recv() => {
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match msg {
                        Ok(ack) => ack,
                        Err(err) => {
                            debug!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        debug!(?err, ?sender, "ack validate failed");
                        continue;
                    };
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = ack.chunk.height, "ack");
                    guard.set(Status::Success);
                },

                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    drop(timer);
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, "verified returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            debug!(?context, "verified was false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                debug!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },
            }
        }

        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.close().await.expect("unable to close journal");
        }
    }

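    /// Handles a payload that the automaton verified successfully: reports the tip as a
    /// [`Proposal`] to the reporter, signs an ack over it for the current epoch, syncs the
    /// journal, and sends the ack to the validators (and to the sequencer, if it is not
    /// itself a validator).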
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<
            impl Sender<PublicKey = C::PublicKey>,
            Ack<C::PublicKey, P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
            return Err(Error::UnknownScheme(self.epoch));
        };

        let Some(ack) = Ack::sign(
            &self.namespace,
            scheme.as_ref(),
            tip.chunk.clone(),
            self.epoch,
        ) else {
            return Err(Error::NotSigner(self.epoch));
        };

        self.journal_sync(&context.sequencer, context.height).await;

        let recipients = {
            let validators = scheme.participants();
            let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
            if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        self.handle_ack(&ack).await?;

        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }

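    /// Records a certificate for `chunk` at `epoch`. If the certificate is new, stops the
    /// end-to-end propose timer when the chunk is our own and reports a [`Lock`] to the
    /// reporter; otherwise does nothing.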
    async fn handle_certificate(
        &mut self,
        chunk: &Chunk<C::PublicKey, D>,
        epoch: Epoch,
        certificate: <P::Scheme as Scheme>::Certificate,
    ) {
        if !self.ack_manager.add_certificate(
            &chunk.sequencer,
            chunk.height,
            epoch,
            certificate.clone(),
        ) {
            return;
        }

        if let Some(ref signer) = self.sequencer_signer {
            if chunk.sequencer == signer.public_key() {
                self.propose_timer.take();
            }
        }

        self.reporter
            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)))
            .await;
    }

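    /// Adds an ack to the ack manager. If enough acks are now held to recover a
    /// certificate, the recovered certificate is handled as well.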
    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        if let Some(certificate) = self.ack_manager.add_ack(ack, scheme.as_ref()) {
            debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = ack.chunk.height, "recovered certificate");
            self.metrics.certificates.inc();
            self.handle_certificate(&ack.chunk, ack.epoch, certificate)
                .await;
        }

        Ok(())
    }

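    /// Handles a validated node: records it as the sequencer's tip, persisting it to the
    /// journal if it is new, and asks the automaton to verify its payload. The
    /// verification result is consumed later via `pending_verifies`.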
    async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
        let is_new = self.tip_manager.put(node);

        if is_new {
            let _ = self
                .metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .try_set(node.chunk.height);

            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }

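    /// Returns the context at which this participant should propose next, or `None` if it
    /// has no signer configured, is not a sequencer in the current epoch, or is still
    /// waiting on a certificate for its tip. With no tip, the context is at height 0; with
    /// a certified tip, it is at the tip height plus one.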
    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
        let me = self.sequencer_signer.as_ref()?.public_key();

        self.sequencers_provider
            .sequencers(self.epoch)?
            .position(&me)?;

        match self.tip_manager.get(&me) {
            None => Some(Context {
                sequencer: me,
                height: 0,
            }),
            Some(tip) => self
                .ack_manager
                .get_certificate(&me, tip.chunk.height)
                .map(|_| Context {
                    sequencer: me,
                    height: tip.chunk.height.checked_add(1).unwrap(),
                }),
        }
    }

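    /// Signs and broadcasts a new node carrying `payload` at the height given in
    /// `context`, using the certified tip (if any) as the parent, and starts the
    /// end-to-end propose timer.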
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let signer = self
            .sequencer_signer
            .as_mut()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        let mut height = 0;
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            let Some((epoch, certificate)) =
                self.ack_manager.get_certificate(&me, tip.chunk.height)
            else {
                return Err(Error::MissingCertificate);
            };

            height = tip.chunk.height + 1;
            parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
        }

        if context.height != height {
            return Err(Error::ContextHeight);
        }

        let node = Node::sign(&self.namespace, signer, height, payload, parent);

        self.handle_node(&node).await;

        self.journal_sync(&me, height).await;

        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        };

        guard.set(Status::Success);
        Ok(())
    }

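    /// Rebroadcasts this sequencer's tip if it exists and has not yet been certified.
    /// Clears the pending rebroadcast deadline; a successful broadcast schedules the next
    /// one.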
    async fn rebroadcast(
        &mut self,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        self.rebroadcast_deadline = None;

        let signer = self
            .sequencer_signer
            .as_ref()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        if self
            .ack_manager
            .get_certificate(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyCertified);
        }

        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }

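    /// Hands the node's payload to the relay, sends the encoded node to the validators for
    /// `epoch`, and schedules the next rebroadcast.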
    async fn broadcast(
        &mut self,
        node: Node<C::PublicKey, P::Scheme, D>,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        let Some(scheme) = self.validators_provider.scoped(epoch) else {
            return Err(Error::UnknownScheme(epoch));
        };
        let validators = scheme.participants();

        self.relay.broadcast(node.chunk.payload).await;

        node_sender
            .send(
                Recipients::Some(validators.iter().cloned().collect()),
                node.encode().into(),
                self.priority_proposals,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

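    /// Validates a node received from `sender`: the sender must be the node's sequencer
    /// and the chunk and signatures must check out. Returns `Ok(None)` if the node is
    /// already the known tip; otherwise returns the verification result, including the
    /// parent chunk (if any) whose certificate the caller should handle.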
    fn validate_node(
        &mut self,
        node: &Node<C::PublicKey, P::Scheme, D>,
        sender: &C::PublicKey,
    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(None);
            }
        }

        self.validate_chunk(&node.chunk, self.epoch)?;

        node.verify(
            &mut self.context,
            &self.namespace,
            &self.validators_provider,
        )
    }

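    /// Validates an ack received from `sender`: checks the chunk, that the sender matches
    /// the validator index claimed in the attestation, that the ack's epoch and height
    /// fall within the configured bounds, and that its signature is valid.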
    fn validate_ack(
        &self,
        ack: &Ack<C::PublicKey, P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        let participants = scheme.participants();
        let Some(index) = participants.iter().position(|p| p == sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index as u32 != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(0);
            let bound_hi = bound_lo + self.height_bound;
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        if !ack.verify(&self.namespace, scheme.as_ref()) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

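    /// Validates a chunk for `epoch`: the sequencer must be registered for that epoch and
    /// the chunk must not conflict with the known tip (a lower height, or the same height
    /// with a different payload, is rejected).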
    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
        if self
            .sequencers_provider
            .sequencers(epoch)
            .and_then(|s| s.position(&chunk.sequencer))
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

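    /// Returns the journal section for a given height. For example, with
    /// `journal_heights_per_section = 10`, heights 0..=9 map to section 0 and heights
    /// 10..=19 map to section 1.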
    const fn get_journal_section(&self, height: u64) -> u64 {
        height / self.journal_heights_per_section
    }

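    /// Ensures a journal exists for `sequencer`: if none is open yet, initializes one and
    /// replays its contents, recording the highest stored node as the sequencer's tip.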
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        if self.journals.contains_key(sequencer) {
            return;
        }

        let cfg = JournalConfig {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
            self.context.with_label("journal").into_present(),
            cfg,
        )
        .await
        .expect("unable to init journal");

        {
            debug!(?sequencer, "journal replay begin");

            let stream = journal
                .replay(0, 0, self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        self.journals.insert(sequencer.clone(), journal);
    }

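    /// Appends a node to its sequencer's journal in the section derived from its height.
    /// Panics if the journal has not been prepared.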
    async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node)
            .await
            .expect("unable to append to journal");
    }

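    /// Syncs the sequencer's journal at the section containing `height` and then attempts
    /// to prune it at that section, ignoring any pruning error. Panics if the journal has
    /// not been prepared.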
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
        let section = self.get_journal_section(height);

        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        journal.sync(section).await.expect("unable to sync journal");

        let _ = journal.prune(section).await;
    }
}