1use std::collections::BTreeMap;
72
73use async_graphql::{ComplexObject, SimpleObject};
74use custom_debug_derive::Debug;
75use futures::future::Either;
76use linera_base::{
77 crypto::{AccountPublicKey, ValidatorSecretKey},
78 data_types::{Blob, BlockHeight, Round, Timestamp},
79 ensure,
80 identifiers::{AccountOwner, BlobId, ChainId},
81 ownership::ChainOwnership,
82};
83use linera_execution::{committee::Epoch, ExecutionRuntimeContext};
84use linera_views::{
85 context::Context,
86 map_view::MapView,
87 register_view::RegisterView,
88 views::{ClonableView, View, ViewError},
89};
90use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng};
91use rand_distr::{Distribution, WeightedAliasIndex};
92use serde::{Deserialize, Serialize};
93
94use crate::{
95 block::{Block, ConfirmedBlock, Timeout, ValidatedBlock},
96 data_types::{BlockProposal, LiteVote, ProposedBlock, Vote},
97 types::{TimeoutCertificate, ValidatedBlockCertificate},
98 ChainError,
99};
100
101#[derive(Eq, PartialEq)]
103pub enum Outcome {
104 Accept,
105 Skip,
106}
107
108pub type ValidatedOrConfirmedVote<'a> = Either<&'a Vote<ValidatedBlock>, &'a Vote<ConfirmedBlock>>;
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
114#[cfg_attr(with_testing, derive(Eq, PartialEq))]
115pub enum LockingBlock {
116 Fast(BlockProposal),
118 Regular(ValidatedBlockCertificate),
120}
121
122impl LockingBlock {
123 pub fn round(&self) -> Round {
126 match self {
127 Self::Fast(_) => Round::Fast,
128 Self::Regular(certificate) => certificate.round,
129 }
130 }
131
132 pub fn chain_id(&self) -> ChainId {
133 match self {
134 Self::Fast(proposal) => proposal.content.block.chain_id,
135 Self::Regular(certificate) => certificate.value().chain_id(),
136 }
137 }
138}
139
140#[derive(Debug, View, ClonableView, SimpleObject)]
142#[graphql(complex)]
143pub struct ChainManager<C>
144where
145 C: Clone + Context + Send + Sync + 'static,
146{
147 pub ownership: RegisterView<C, ChainOwnership>,
149 pub seed: RegisterView<C, u64>,
151 #[graphql(skip)] pub distribution: RegisterView<C, Option<WeightedAliasIndex<u64>>>,
154 #[graphql(skip)] pub fallback_distribution: RegisterView<C, Option<WeightedAliasIndex<u64>>>,
157 #[graphql(skip)]
160 pub proposed: RegisterView<C, Option<BlockProposal>>,
161 pub proposed_blobs: MapView<C, BlobId, Blob>,
163 #[graphql(skip)]
166 pub locking_block: RegisterView<C, Option<LockingBlock>>,
167 pub locking_blobs: MapView<C, BlobId, Blob>,
169 #[graphql(skip)]
171 pub timeout: RegisterView<C, Option<TimeoutCertificate>>,
172 #[graphql(skip)]
174 pub confirmed_vote: RegisterView<C, Option<Vote<ConfirmedBlock>>>,
175 #[graphql(skip)]
177 pub validated_vote: RegisterView<C, Option<Vote<ValidatedBlock>>>,
178 #[graphql(skip)]
180 pub timeout_vote: RegisterView<C, Option<Vote<Timeout>>>,
181 #[graphql(skip)]
183 pub fallback_vote: RegisterView<C, Option<Vote<Timeout>>>,
184 pub round_timeout: RegisterView<C, Option<Timestamp>>,
186 #[graphql(skip)]
193 pub current_round: RegisterView<C, Round>,
194 pub fallback_owners: RegisterView<C, BTreeMap<AccountOwner, u64>>,
196}
197
198#[ComplexObject]
199impl<C> ChainManager<C>
200where
201 C: Context + Clone + Send + Sync + 'static,
202{
203 #[graphql(derived(name = "current_round"))]
210 async fn _current_round(&self) -> Round {
211 self.current_round()
212 }
213}
214
215impl<C> ChainManager<C>
216where
217 C: Context + Clone + Send + Sync + 'static,
218{
219 pub fn reset<'a>(
221 &mut self,
222 ownership: ChainOwnership,
223 height: BlockHeight,
224 local_time: Timestamp,
225 fallback_owners: impl Iterator<Item = (AccountPublicKey, u64)> + 'a,
226 ) -> Result<(), ChainError> {
227 let distribution = if !ownership.owners.is_empty() {
228 let weights = ownership.owners.values().copied().collect();
229 Some(WeightedAliasIndex::new(weights)?)
230 } else {
231 None
232 };
233 let fallback_owners = fallback_owners
234 .map(|(pub_key, weight)| (AccountOwner::from(pub_key), weight))
235 .collect::<BTreeMap<_, _>>();
236 let fallback_distribution = if !fallback_owners.is_empty() {
237 let weights = fallback_owners.values().copied().collect();
238 Some(WeightedAliasIndex::new(weights)?)
239 } else {
240 None
241 };
242
243 let current_round = ownership.first_round();
244 let round_duration = ownership.round_timeout(current_round);
245 let round_timeout = round_duration.map(|rd| local_time.saturating_add(rd));
246
247 self.clear();
248 self.seed.set(height.0);
249 self.ownership.set(ownership);
250 self.distribution.set(distribution);
251 self.fallback_distribution.set(fallback_distribution);
252 self.fallback_owners.set(fallback_owners);
253 self.current_round.set(current_round);
254 self.round_timeout.set(round_timeout);
255 Ok(())
256 }
257
258 pub fn confirmed_vote(&self) -> Option<&Vote<ConfirmedBlock>> {
260 self.confirmed_vote.get().as_ref()
261 }
262
263 pub fn validated_vote(&self) -> Option<&Vote<ValidatedBlock>> {
265 self.validated_vote.get().as_ref()
266 }
267
268 pub fn timeout_vote(&self) -> Option<&Vote<Timeout>> {
270 self.timeout_vote.get().as_ref()
271 }
272
273 pub fn fallback_vote(&self) -> Option<&Vote<Timeout>> {
275 self.fallback_vote.get().as_ref()
276 }
277
278 pub fn current_round(&self) -> Round {
285 *self.current_round.get()
286 }
287
288 pub fn check_proposed_block(&self, proposal: &BlockProposal) -> Result<Outcome, ChainError> {
290 let new_block = &proposal.content.block;
291 let new_round = proposal.content.round;
292 if let Some(old_proposal) = self.proposed.get() {
293 if old_proposal.content == proposal.content {
294 return Ok(Outcome::Skip); }
296 }
297 ensure!(
299 new_block.height < BlockHeight::MAX,
300 ChainError::InvalidBlockHeight
301 );
302 let current_round = self.current_round();
303 match new_round {
304 Round::Fast => {}
307 Round::MultiLeader(_) | Round::SingleLeader(0) => {
308 ensure!(
311 self.is_super(&proposal.public_key.into()) || !current_round.is_fast(),
312 ChainError::WrongRound(current_round)
313 );
314 ensure!(
316 new_round >= current_round,
317 ChainError::InsufficientRound(new_round)
318 );
319 }
320 Round::SingleLeader(_) | Round::Validator(_) => {
321 ensure!(
323 new_round == current_round,
324 ChainError::WrongRound(current_round)
325 );
326 }
327 }
328 if let Some(vote) = self.validated_vote() {
330 ensure!(
331 new_round > vote.round,
332 ChainError::InsufficientRoundStrict(vote.round)
333 );
334 }
335 if let Some(locking_block) = self.locking_block.get() {
337 ensure!(
338 locking_block.round() < new_round,
339 ChainError::MustBeNewerThanLockingBlock(new_block.height, locking_block.round())
340 );
341 }
342 if let Some(vote) = self.confirmed_vote() {
345 ensure!(
346 if let Some(validated_cert) = proposal.validated_block_certificate.as_ref() {
347 vote.round <= validated_cert.round
348 } else {
349 vote.round.is_fast() && vote.value().matches_proposed_block(new_block)
350 },
351 ChainError::HasIncompatibleConfirmedVote(new_block.height, vote.round)
352 );
353 }
354 Ok(Outcome::Accept)
355 }
356
357 pub fn vote_timeout(
359 &mut self,
360 chain_id: ChainId,
361 height: BlockHeight,
362 epoch: Epoch,
363 key_pair: Option<&ValidatorSecretKey>,
364 local_time: Timestamp,
365 ) -> bool {
366 let Some(key_pair) = key_pair else {
367 return false; };
369 let Some(round_timeout) = *self.round_timeout.get() else {
370 return false; };
372 if local_time < round_timeout || self.ownership.get().owners.is_empty() {
373 return false; }
375 let current_round = self.current_round();
376 if let Some(vote) = self.timeout_vote.get() {
377 if vote.round == current_round {
378 return false; }
380 }
381 let value = Timeout::new(chain_id, height, epoch);
382 self.timeout_vote
383 .set(Some(Vote::new(value, current_round, key_pair)));
384 true
385 }
386
387 pub fn vote_fallback(
392 &mut self,
393 chain_id: ChainId,
394 height: BlockHeight,
395 epoch: Epoch,
396 key_pair: Option<&ValidatorSecretKey>,
397 ) -> bool {
398 let Some(key_pair) = key_pair else {
399 return false; };
401 if self.fallback_vote.get().is_some() || self.current_round() >= Round::Validator(0) {
402 return false; }
404 let value = Timeout::new(chain_id, height, epoch);
405 let last_regular_round = Round::SingleLeader(u32::MAX);
406 self.fallback_vote
407 .set(Some(Vote::new(value, last_regular_round, key_pair)));
408 true
409 }
410
411 pub fn check_validated_block(
413 &self,
414 certificate: &ValidatedBlockCertificate,
415 ) -> Result<Outcome, ChainError> {
416 let new_block = certificate.block();
417 let new_round = certificate.round;
418 if let Some(Vote { value, round, .. }) = self.confirmed_vote.get() {
419 if value.block() == new_block && *round == new_round {
420 return Ok(Outcome::Skip); }
422 }
423
424 if let Some(Vote { round, .. }) = self.validated_vote.get() {
426 ensure!(new_round >= *round, ChainError::InsufficientRound(*round))
427 }
428
429 if let Some(locking) = self.locking_block.get() {
430 if let LockingBlock::Regular(locking_cert) = locking {
431 if locking_cert.hash() == certificate.hash() && locking.round() == new_round {
432 return Ok(Outcome::Skip); }
434 }
435 ensure!(
436 new_round > locking.round(),
437 ChainError::InsufficientRoundStrict(locking.round())
438 );
439 }
440 Ok(Outcome::Accept)
441 }
442
443 pub fn create_vote(
445 &mut self,
446 proposal: BlockProposal,
447 block: Block,
448 key_pair: Option<&ValidatorSecretKey>,
449 local_time: Timestamp,
450 blobs: BTreeMap<BlobId, Blob>,
451 ) -> Result<Option<ValidatedOrConfirmedVote>, ChainError> {
452 let round = proposal.content.round;
453
454 if let Some(lite_cert) = &proposal.validated_block_certificate {
456 if self
457 .locking_block
458 .get()
459 .as_ref()
460 .is_none_or(|locking| locking.round() < lite_cert.round)
461 {
462 let value = ValidatedBlock::new(block.clone());
463 if let Some(certificate) = lite_cert.clone().with_value(value) {
464 self.update_locking(LockingBlock::Regular(certificate), blobs.clone())?;
465 }
466 }
467 } else if round.is_fast() && self.locking_block.get().is_none() {
468 self.update_locking(LockingBlock::Fast(proposal.clone()), blobs.clone())?;
470 }
471
472 self.update_proposed(proposal.clone(), blobs)?;
474 self.update_current_round(local_time);
475
476 let Some(key_pair) = key_pair else {
477 return Ok(None);
479 };
480
481 if round.is_fast() {
483 self.validated_vote.set(None);
484 let value = ConfirmedBlock::new(block);
485 let vote = Vote::new(value, round, key_pair);
486 Ok(Some(Either::Right(
487 self.confirmed_vote.get_mut().insert(vote),
488 )))
489 } else {
490 let value = ValidatedBlock::new(block);
491 let vote = Vote::new(value, round, key_pair);
492 Ok(Some(Either::Left(
493 self.validated_vote.get_mut().insert(vote),
494 )))
495 }
496 }
497
498 pub fn create_final_vote(
500 &mut self,
501 validated: ValidatedBlockCertificate,
502 key_pair: Option<&ValidatorSecretKey>,
503 local_time: Timestamp,
504 blobs: BTreeMap<BlobId, Blob>,
505 ) -> Result<(), ViewError> {
506 let round = validated.round;
507 let confirmed_block = ConfirmedBlock::new(validated.inner().block().clone());
508 self.update_locking(LockingBlock::Regular(validated), blobs)?;
509 self.update_current_round(local_time);
510 if let Some(key_pair) = key_pair {
511 if self.current_round() != round {
512 return Ok(()); }
514 let vote = Vote::new(confirmed_block, round, key_pair);
516 self.confirmed_vote.set(Some(vote));
518 self.validated_vote.set(None);
519 }
520 Ok(())
521 }
522
523 pub async fn pending_blob(&self, blob_id: &BlobId) -> Result<Option<Blob>, ViewError> {
525 if let Some(blob) = self.proposed_blobs.get(blob_id).await? {
526 return Ok(Some(blob));
527 }
528 self.locking_blobs.get(blob_id).await
529 }
530
531 fn update_current_round(&mut self, local_time: Timestamp) {
535 let current_round = self
536 .timeout
537 .get()
538 .iter()
539 .map(|certificate| {
540 self.ownership
541 .get()
542 .next_round(certificate.round)
543 .unwrap_or(Round::Validator(u32::MAX))
544 })
545 .chain(self.locking_block.get().as_ref().map(LockingBlock::round))
546 .chain(
547 self.proposed
548 .get()
549 .iter()
550 .map(|proposal| proposal.content.round),
551 )
552 .max()
553 .unwrap_or_default()
554 .max(self.ownership.get().first_round());
555 if current_round <= self.current_round() {
556 return;
557 }
558 let round_duration = self.ownership.get().round_timeout(current_round);
559 self.round_timeout
560 .set(round_duration.map(|rd| local_time.saturating_add(rd)));
561 self.current_round.set(current_round);
562 }
563
564 pub fn handle_timeout_certificate(
567 &mut self,
568 certificate: TimeoutCertificate,
569 local_time: Timestamp,
570 ) {
571 let round = certificate.round;
572 if let Some(known_certificate) = self.timeout.get() {
573 if known_certificate.round >= round {
574 return;
575 }
576 }
577 self.timeout.set(Some(certificate));
578 self.update_current_round(local_time);
579 }
580
581 pub fn verify_owner(&self, proposal: &BlockProposal) -> bool {
584 let owner = &proposal.public_key.into();
585 if self.ownership.get().super_owners.contains(owner) {
586 return true;
587 }
588 match proposal.content.round {
589 Round::Fast => {
590 false }
592 Round::MultiLeader(_) => {
593 let ownership = self.ownership.get();
594 ownership.open_multi_leader_rounds || ownership.owners.contains_key(owner)
596 }
597 Round::SingleLeader(r) => {
598 let Some(index) = self.round_leader_index(r) else {
599 return false;
600 };
601 self.ownership.get().owners.keys().nth(index) == Some(owner)
602 }
603 Round::Validator(r) => {
604 let Some(index) = self.fallback_round_leader_index(r) else {
605 return false;
606 };
607 self.fallback_owners.get().keys().nth(index) == Some(owner)
608 }
609 }
610 }
611
612 fn round_leader(&self, round: Round) -> Option<&AccountOwner> {
615 match round {
616 Round::SingleLeader(r) => {
617 let index = self.round_leader_index(r)?;
618 self.ownership.get().owners.keys().nth(index)
619 }
620 Round::Validator(r) => {
621 let index = self.fallback_round_leader_index(r)?;
622 self.fallback_owners.get().keys().nth(index)
623 }
624 Round::Fast | Round::MultiLeader(_) => None,
625 }
626 }
627
628 fn round_leader_index(&self, round: u32) -> Option<usize> {
630 let seed = u64::from(round)
631 .rotate_left(32)
632 .wrapping_add(*self.seed.get());
633 let mut rng = ChaCha8Rng::seed_from_u64(seed);
634 Some(self.distribution.get().as_ref()?.sample(&mut rng))
635 }
636
637 fn fallback_round_leader_index(&self, round: u32) -> Option<usize> {
640 let seed = u64::from(round)
641 .rotate_left(32)
642 .wrapping_add(*self.seed.get());
643 let mut rng = ChaCha8Rng::seed_from_u64(seed);
644 Some(self.fallback_distribution.get().as_ref()?.sample(&mut rng))
645 }
646
647 fn is_super(&self, owner: &AccountOwner) -> bool {
649 self.ownership.get().super_owners.contains(owner)
650 }
651
652 fn update_proposed(
654 &mut self,
655 proposal: BlockProposal,
656 blobs: BTreeMap<BlobId, Blob>,
657 ) -> Result<(), ViewError> {
658 if let Some(old_proposal) = self.proposed.get() {
659 if old_proposal.content.round >= proposal.content.round {
660 return Ok(());
661 }
662 }
663 self.proposed.set(Some(proposal));
664 self.proposed_blobs.clear();
665 for (blob_id, blob) in blobs {
666 self.proposed_blobs.insert(&blob_id, blob)?;
667 }
668 Ok(())
669 }
670
671 fn update_locking(
673 &mut self,
674 locking: LockingBlock,
675 blobs: BTreeMap<BlobId, Blob>,
676 ) -> Result<(), ViewError> {
677 if let Some(old_locked) = self.locking_block.get() {
678 if old_locked.round() >= locking.round() {
679 return Ok(());
680 }
681 }
682 self.locking_block.set(Some(locking));
683 self.locking_blobs.clear();
684 for (blob_id, blob) in blobs {
685 self.locking_blobs.insert(&blob_id, blob)?;
686 }
687 Ok(())
688 }
689}
690
691#[derive(Default, Clone, Debug, Serialize, Deserialize)]
693#[cfg_attr(with_testing, derive(Eq, PartialEq))]
694pub struct ChainManagerInfo {
695 pub ownership: ChainOwnership,
697 #[debug(skip_if = Option::is_none)]
699 pub requested_proposed: Option<Box<BlockProposal>>,
700 #[debug(skip_if = Option::is_none)]
703 pub requested_locking: Option<Box<LockingBlock>>,
704 #[debug(skip_if = Option::is_none)]
706 pub timeout: Option<Box<TimeoutCertificate>>,
707 #[debug(skip_if = Option::is_none)]
709 pub pending: Option<LiteVote>,
710 #[debug(skip_if = Option::is_none)]
712 pub timeout_vote: Option<LiteVote>,
713 #[debug(skip_if = Option::is_none)]
715 pub fallback_vote: Option<LiteVote>,
716 #[debug(skip_if = Option::is_none)]
718 pub requested_confirmed: Option<Box<ConfirmedBlock>>,
719 #[debug(skip_if = Option::is_none)]
721 pub requested_validated: Option<Box<ValidatedBlock>>,
722 pub current_round: Round,
724 #[debug(skip_if = Option::is_none)]
727 pub leader: Option<AccountOwner>,
728 #[debug(skip_if = Option::is_none)]
730 pub round_timeout: Option<Timestamp>,
731}
732
733impl<C> From<&ChainManager<C>> for ChainManagerInfo
734where
735 C: Context + Clone + Send + Sync + 'static,
736{
737 fn from(manager: &ChainManager<C>) -> Self {
738 let current_round = manager.current_round();
739 let pending = match (manager.confirmed_vote.get(), manager.validated_vote.get()) {
740 (None, None) => None,
741 (Some(confirmed_vote), Some(validated_vote))
742 if validated_vote.round > confirmed_vote.round =>
743 {
744 Some(validated_vote.lite())
745 }
746 (Some(vote), _) => Some(vote.lite()),
747 (None, Some(vote)) => Some(vote.lite()),
748 };
749 ChainManagerInfo {
750 ownership: manager.ownership.get().clone(),
751 requested_proposed: None,
752 requested_locking: None,
753 timeout: manager.timeout.get().clone().map(Box::new),
754 pending,
755 timeout_vote: manager.timeout_vote.get().as_ref().map(Vote::lite),
756 fallback_vote: manager.fallback_vote.get().as_ref().map(Vote::lite),
757 requested_confirmed: None,
758 requested_validated: None,
759 current_round,
760 leader: manager.round_leader(current_round).cloned(),
761 round_timeout: *manager.round_timeout.get(),
762 }
763 }
764}
765
766impl ChainManagerInfo {
767 pub fn add_values<C>(&mut self, manager: &ChainManager<C>)
769 where
770 C: Context + Clone + Send + Sync + 'static,
771 C::Extra: ExecutionRuntimeContext,
772 {
773 self.requested_proposed = manager.proposed.get().clone().map(Box::new);
774 self.requested_locking = manager.locking_block.get().clone().map(Box::new);
775 self.requested_confirmed = manager
776 .confirmed_vote
777 .get()
778 .as_ref()
779 .map(|vote| Box::new(vote.value.clone()));
780 self.requested_validated = manager
781 .validated_vote
782 .get()
783 .as_ref()
784 .map(|vote| Box::new(vote.value.clone()));
785 }
786
787 pub fn can_propose(&self, identity: &AccountOwner, round: Round) -> bool {
790 match round {
791 Round::Fast => self.ownership.super_owners.contains(identity),
792 Round::MultiLeader(_) => true,
793 Round::SingleLeader(_) | Round::Validator(_) => self.leader.as_ref() == Some(identity),
794 }
795 }
796
797 pub fn already_handled_proposal(&self, round: Round, proposed_block: &ProposedBlock) -> bool {
799 self.requested_proposed.as_ref().is_some_and(|proposal| {
800 proposal.content.round == round && *proposed_block == proposal.content.block
801 })
802 }
803
804 pub fn has_locking_block_in_current_round(&self) -> bool {
806 self.requested_locking
807 .as_ref()
808 .is_some_and(|locking| locking.round() == self.current_round)
809 }
810}