Skip to main content

commonware_consensus/marshal/core/
mailbox.rs

1use super::Variant;
2use crate::{
3    marshal::{
4        ancestry::{AncestorStream, Ancestry, BlockProvider},
5        Identifier,
6    },
7    simplex::types::{Activity, Finalization, Notarization},
8    types::{Height, Round},
9    Reporter,
10};
11use commonware_actor::{
12    mailbox::{Overflow, Policy, Sender},
13    Feedback,
14};
15use commonware_cryptography::{certificate::Scheme, Digestible};
16use commonware_p2p::Recipients;
17use commonware_runtime::{telemetry::metrics::histogram::Timed, Clock};
18use commonware_utils::{channel::oneshot, vec::NonEmptyVec};
19use std::{
20    collections::{btree_map::Entry, BTreeMap, VecDeque},
21    sync::Arc,
22};
23
24/// Messages sent to the marshal [Actor](super::Actor).
25///
26/// These messages are sent from the consensus engine and other parts of the
27/// system to drive the state of the marshal.
28pub(crate) enum Message<S: Scheme, V: Variant> {
29    /// A request to retrieve the `(height, digest)` of a block by its identifier.
30    /// The block must be finalized; returns `None` if the block is not finalized.
31    GetInfo {
32        /// The identifier of the block to get the information of.
33        identifier: Identifier<<V::Block as Digestible>::Digest>,
34        /// A channel to send the retrieved `(height, digest)`.
35        response: oneshot::Sender<Option<(Height, <V::Block as Digestible>::Digest)>>,
36    },
37    /// A request to retrieve a block by its identifier.
38    ///
39    /// Requesting by [Identifier::Height] or [Identifier::Latest] will only return finalized
40    /// blocks, whereas requesting by [Identifier::Digest] may return non-finalized
41    /// or even unverified blocks.
42    GetBlock {
43        /// The identifier of the block to retrieve.
44        identifier: Identifier<<V::Block as Digestible>::Digest>,
45        /// A channel to send the retrieved block.
46        response: oneshot::Sender<Option<V::Block>>,
47    },
48    /// A request to retrieve a finalization by height.
49    GetFinalization {
50        /// The height of the finalization to retrieve.
51        height: Height,
52        /// A channel to send the retrieved finalization.
53        response: oneshot::Sender<Option<Finalization<S, V::Commitment>>>,
54    },
55    /// A request to retrieve the latest processed height.
56    GetProcessedHeight {
57        /// A channel to send the latest processed height.
58        response: oneshot::Sender<Option<Height>>,
59    },
60    /// A hint that a finalized block may be available at a given height.
61    ///
62    /// This triggers a network fetch if the finalization is not available locally.
63    /// This is fire-and-forget: the finalization will be stored in marshal and
64    /// delivered via the normal finalization flow when available.
65    ///
66    /// The height must be covered by both the epocher and the provider. If the
67    /// epocher cannot map the height to an epoch, or the provider cannot supply
68    /// a scheme for that epoch, the hint is silently dropped.
69    ///
70    /// Targets are required because this is typically called when a peer claims to
71    /// be ahead. If a target returns invalid data, the resolver will block them.
72    /// Sending this message multiple times with different targets adds to the
73    /// target set.
74    HintFinalized {
75        /// The height of the finalization to fetch.
76        height: Height,
77        /// Target peers to fetch from. Added to any existing targets for this height.
78        targets: NonEmptyVec<S::PublicKey>,
79    },
80    /// A request to subscribe to a block by its digest.
81    SubscribeByDigest {
82        /// The digest of the block to retrieve.
83        digest: <V::Block as Digestible>::Digest,
84        /// How marshal should behave if the block is missing locally.
85        fallback: DigestFallback,
86        /// A channel to send the retrieved block.
87        response: oneshot::Sender<V::Block>,
88    },
89    /// A request to subscribe to a block by its commitment.
90    SubscribeByCommitment {
91        /// The commitment of the block to retrieve.
92        commitment: V::Commitment,
93        /// How marshal should behave if the block is missing locally.
94        fallback: CommitmentFallback,
95        /// A channel to send the retrieved block.
96        response: oneshot::Sender<V::Block>,
97    },
98    /// A hint to fetch a notarized block by round without adding another local subscriber.
99    ///
100    /// `commitment` is used as a locality check: if the block is already
101    /// available locally, the fetch is skipped.
102    HintNotarized {
103        /// The notarized round to request.
104        round: Round,
105        /// The commitment used to short-circuit if the block is already local.
106        commitment: V::Commitment,
107    },
108    /// A request to retrieve the verified block previously persisted for `round`.
109    GetVerified {
110        /// The round to query.
111        round: Round,
112        /// A channel to send the retrieved block, if any.
113        response: oneshot::Sender<Option<V::Block>>,
114    },
115    /// A request to forward a block to a set of recipients.
116    Forward {
117        /// The round in which the block was proposed.
118        round: Round,
119        /// The commitment of the block to forward.
120        commitment: V::Commitment,
121        /// The recipients to forward the block to.
122        recipients: Recipients<S::PublicKey>,
123    },
124    /// A notification that a block has been locally proposed by this node.
125    Proposed {
126        /// The round in which the block was proposed.
127        round: Round,
128        /// The proposed block.
129        block: V::Block,
130        /// A channel signaled once the block is durably stored.
131        ack: Option<oneshot::Sender<()>>,
132    },
133    /// A notification that a block has been verified by the application.
134    Verified {
135        /// The round in which the block was verified.
136        round: Round,
137        /// The verified block.
138        block: V::Block,
139        /// A channel signaled once the block is durably stored.
140        ack: Option<oneshot::Sender<()>>,
141    },
142    /// A notification that a block has been certified by the application.
143    Certified {
144        /// The round in which the block was certified.
145        round: Round,
146        /// The certified block.
147        block: V::Block,
148        /// A channel signaled once the block is durably stored.
149        ack: Option<oneshot::Sender<()>>,
150    },
151    /// Attempts to set the sync starting point from a finalized commitment.
152    ///
153    /// If the verified finalization advances marshal's current floor, marshal
154    /// anchors on its block, prunes below it, then syncs and delivers blocks
155    /// starting at the floor height. Stale or superseded floors may be ignored.
156    ///
157    /// To prune data without changing the sync starting point, use
158    /// [Message::Prune] instead.
159    SetFloor {
160        /// The candidate floor finalization, verified by the actor before use.
161        finalization: Finalization<S, V::Commitment>,
162    },
163    /// Requests pruning finalized blocks and certificates below the given height.
164    ///
165    /// Unlike [Message::SetFloor], this does not affect the sync starting
166    /// point. Requests above marshal's current floor are ignored.
167    Prune {
168        /// The minimum height to keep (blocks below this are pruned).
169        height: Height,
170    },
171    /// A notarization from the consensus engine.
172    Notarization {
173        /// The notarization.
174        notarization: Notarization<S, V::Commitment>,
175    },
176    /// A finalization from the consensus engine.
177    Finalization {
178        /// The finalization.
179        finalization: Finalization<S, V::Commitment>,
180    },
181}
182
183/// How a digest-keyed block subscription should behave when the block is missing locally.
184#[derive(Clone, Copy, Debug, Eq, PartialEq)]
185pub enum DigestFallback {
186    /// Wait for local availability only.
187    Wait,
188    /// Request the notarized proposal for `round` from peers.
189    ///
190    /// Use this only when the caller has a trusted round for the digest. Digest-keyed
191    /// subscriptions intentionally cannot request exact commitment fetches.
192    FetchByRound { round: Round },
193}
194
195impl From<DigestFallback> for CommitmentFallback {
196    fn from(fallback: DigestFallback) -> Self {
197        match fallback {
198            DigestFallback::Wait => Self::Wait,
199            DigestFallback::FetchByRound { round } => Self::FetchByRound { round },
200        }
201    }
202}
203
204/// How a commitment-keyed block subscription should behave when the block is missing locally.
205#[derive(Clone, Copy, Debug, Eq, PartialEq)]
206pub enum CommitmentFallback {
207    /// Wait for local availability only.
208    ///
209    /// Use this for pending candidate proposal data before notarization.
210    Wait,
211    /// Request the notarized proposal for `round` from peers.
212    ///
213    /// Use this when the caller knows a trusted notarized or certified round and
214    /// commitment but not the proposal height, such as proposal construction,
215    /// verification of a known child, or certification of a notarized candidate. Do not infer
216    /// height from the finalized tip or another block: proposals may build on
217    /// a certified parent that is not finalized locally yet, and an unverified
218    /// child may lie about its height.
219    ///
220    /// The returned block is heightable once decoded, but that is too late for
221    /// the in-flight resolver key or pruning bound.
222    FetchByRound { round: Round },
223    /// Request the exact commitment from peers and prune the request at
224    /// `height`.
225    ///
226    /// Use this only when no certified parent round is available and the caller
227    /// has a locally validated pruning bound, such as repairing a finalized gap
228    /// or walking an accepted ancestry stream. Do not use it for a candidate's
229    /// immediate parent when the consensus context supplies the parent round.
230    ///
231    /// The height is not sent to peers. It is a local pruning hint for request
232    /// retention, not part of response validity: a fetched block is delivered
233    /// if its commitment matches, and certified storage uses the decoded block
234    /// height.
235    FetchByCommitment { height: Height },
236}
237
238impl<S: Scheme, V: Variant> Message<S, V> {
239    fn stale(&self, current: Option<Height>) -> bool {
240        match self {
241            // Height-targeted reads below the floor can never be served
242            Self::GetInfo {
243                identifier: Identifier::Height(height),
244                ..
245            }
246            | Self::GetBlock {
247                identifier: Identifier::Height(height),
248                ..
249            }
250            | Self::GetFinalization { height, .. } => Some(*height) < current,
251            // Hints only inform the actor about heights strictly above the floor
252            Self::HintFinalized { height, .. } => Some(*height) <= current,
253            // Durability acks cannot be dropped: callers depend on them
254            Self::Proposed { .. } | Self::Verified { .. } | Self::Certified { .. } => false,
255            // Digest and latest lookups are not bound to a specific height
256            Self::GetBlock {
257                identifier: Identifier::Digest(_) | Identifier::Latest,
258                ..
259            }
260            | Self::GetInfo {
261                identifier: Identifier::Digest(_) | Identifier::Latest,
262                ..
263            }
264            | Self::GetProcessedHeight { .. } => false,
265            Self::HintNotarized { .. } => false,
266            Self::SubscribeByDigest { .. }
267            | Self::SubscribeByCommitment { .. }
268            | Self::GetVerified { .. }
269            | Self::Forward { .. }
270            | Self::SetFloor { .. }
271            | Self::Prune { .. }
272            | Self::Notarization { .. }
273            | Self::Finalization { .. } => false,
274        }
275    }
276
277    pub(crate) fn response_closed(&self) -> bool {
278        match self {
279            Self::GetInfo { response, .. } => response.is_closed(),
280            Self::GetBlock { response, .. } | Self::GetVerified { response, .. } => {
281                response.is_closed()
282            }
283            Self::GetFinalization { response, .. } => response.is_closed(),
284            Self::GetProcessedHeight { response } => response.is_closed(),
285            Self::SubscribeByDigest { response, .. }
286            | Self::SubscribeByCommitment { response, .. } => response.is_closed(),
287            Self::HintNotarized { .. } => false,
288            Self::HintFinalized { .. }
289            | Self::Forward { .. }
290            | Self::Proposed { .. }
291            | Self::Verified { .. }
292            | Self::Certified { .. }
293            | Self::SetFloor { .. }
294            | Self::Prune { .. }
295            | Self::Notarization { .. }
296            | Self::Finalization { .. } => false,
297        }
298    }
299}
300
301pub(crate) struct Pending<S: Scheme, V: Variant> {
302    floor: Option<Finalization<S, V::Commitment>>,
303    prune: Option<Height>,
304    hints: BTreeMap<Height, NonEmptyVec<S::PublicKey>>,
305    messages: VecDeque<PendingMessage<S, V>>,
306}
307
308enum PendingMessage<S: Scheme, V: Variant> {
309    Message(Message<S, V>),
310    HintFinalized(Height),
311}
312
313impl<S: Scheme, V: Variant> Default for Pending<S, V> {
314    fn default() -> Self {
315        Self {
316            floor: None,
317            prune: None,
318            hints: BTreeMap::new(),
319            messages: VecDeque::new(),
320        }
321    }
322}
323
324impl<S: Scheme, V: Variant> Pending<S, V> {
325    // Only prune advances are usable for height staleness checks. A pending
326    // floor finalization does not carry the block height until the block is decoded.
327    const fn height(&self) -> Option<Height> {
328        self.prune
329    }
330
331    fn retain(&mut self) {
332        let current = self.height();
333        self.hints.retain(|height, _| Some(*height) > current);
334
335        let hints = &self.hints;
336        self.messages.retain(|message| match message {
337            PendingMessage::Message(message) => {
338                !message.response_closed() && !message.stale(current)
339            }
340            PendingMessage::HintFinalized(height) => hints.contains_key(height),
341        });
342    }
343
344    fn set_floor(&mut self, finalization: Finalization<S, V::Commitment>) {
345        let round = finalization.round();
346        if self
347            .floor
348            .as_ref()
349            .is_some_and(|floor| floor.round() >= round)
350        {
351            return;
352        }
353
354        self.floor = Some(finalization);
355    }
356
357    fn prune(&mut self, height: Height) {
358        let current = self.height();
359        let prune = Some(height);
360        if self.prune >= prune {
361            return;
362        }
363
364        self.prune = self.prune.max(prune);
365        if self.height() > current {
366            self.retain();
367        }
368    }
369
370    fn extend_hint_targets(
371        pending: &mut NonEmptyVec<S::PublicKey>,
372        targets: NonEmptyVec<S::PublicKey>,
373    ) {
374        for target in targets {
375            if !pending.contains(&target) {
376                pending.push(target);
377            }
378        }
379    }
380
381    fn hint_finalized(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
382        // The finalized height is already covered by the floor or prune point.
383        let current = self.height();
384        if current.is_some_and(|current| height <= current) {
385            return;
386        }
387
388        match self.hints.entry(height) {
389            Entry::Vacant(entry) => {
390                entry.insert(targets);
391                self.messages
392                    .push_back(PendingMessage::HintFinalized(height));
393            }
394            Entry::Occupied(mut entry) => {
395                Self::extend_hint_targets(entry.get_mut(), targets);
396            }
397        }
398    }
399
400    fn restore_hint(&mut self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
401        match self.hints.entry(height) {
402            Entry::Vacant(entry) => {
403                entry.insert(targets);
404            }
405            Entry::Occupied(mut entry) => {
406                Self::extend_hint_targets(entry.get_mut(), targets);
407            }
408        }
409        self.messages
410            .push_front(PendingMessage::HintFinalized(height));
411    }
412
413    fn drain_one<F>(&mut self, message: Message<S, V>, push: &mut F) -> bool
414    where
415        F: FnMut(Message<S, V>) -> Option<Message<S, V>>,
416    {
417        // Receiver accepted; the message is consumed
418        let Some(message) = push(message) else {
419            return true;
420        };
421
422        // Receiver rejected; restore so the next drain retries from the same point
423        match message {
424            Message::SetFloor { finalization } => self.set_floor(finalization),
425            Message::Prune { height } => self.prune(height),
426            Message::HintFinalized { height, targets } => self.restore_hint(height, targets),
427            message => self.messages.push_front(PendingMessage::Message(message)),
428        }
429        false
430    }
431}
432
433impl<S: Scheme, V: Variant> Overflow<Message<S, V>> for Pending<S, V> {
434    fn is_empty(&self) -> bool {
435        self.floor.is_none()
436            && self.prune.is_none()
437            && self.hints.is_empty()
438            && self.messages.is_empty()
439    }
440
441    fn drain<F>(&mut self, mut push: F)
442    where
443        F: FnMut(Message<S, V>) -> Option<Message<S, V>>,
444    {
445        // Drain floor and prune first so the actor advances its floor before
446        // it sees the height-bounded reads that follow
447        if let Some(finalization) = self.floor.take() {
448            if !self.drain_one(Message::SetFloor { finalization }, &mut push) {
449                return;
450            }
451        }
452        if let Some(height) = self.prune.take() {
453            if !self.drain_one(Message::Prune { height }, &mut push) {
454                return;
455            }
456        }
457
458        // Drain the remaining queued messages in FIFO order
459        while let Some(pending) = self.messages.pop_front() {
460            match pending {
461                PendingMessage::Message(message) => {
462                    if message.response_closed() {
463                        continue;
464                    }
465                    if !self.drain_one(message, &mut push) {
466                        break;
467                    }
468                }
469                PendingMessage::HintFinalized(hint_height) => {
470                    let Some(targets) = self.hints.remove(&hint_height) else {
471                        continue;
472                    };
473                    let message = Message::HintFinalized {
474                        height: hint_height,
475                        targets,
476                    };
477                    if !self.drain_one(message, &mut push) {
478                        break;
479                    }
480                }
481            }
482        }
483    }
484}
485
486impl<S: Scheme, V: Variant> Policy for Message<S, V> {
487    type Overflow = Pending<S, V>;
488
489    fn handle(overflow: &mut Self::Overflow, message: Self) {
490        // A closed responder cannot be served
491        if message.response_closed() {
492            return;
493        }
494        match message {
495            // Coalesce hints: a single entry per height with a unioned target set
496            Self::HintFinalized { height, targets } => {
497                overflow.hint_finalized(height, targets);
498            }
499            // Floors collapse to the highest round seen; prune collapses to
500            // the highest height seen.
501            Self::SetFloor { finalization } => {
502                overflow.set_floor(finalization);
503            }
504            Self::Prune { height } => {
505                overflow.prune(height);
506            }
507            // Queue if the new message is still useful
508            message => {
509                if message.stale(overflow.height()) {
510                    return;
511                }
512                overflow
513                    .messages
514                    .push_back(PendingMessage::Message(message));
515            }
516        }
517    }
518}
519
520/// A mailbox for sending messages to the marshal [Actor](super::Actor).
521#[derive(Clone)]
522pub struct Mailbox<S: Scheme, V: Variant> {
523    sender: Sender<Message<S, V>>,
524}
525
526impl<S: Scheme, V: Variant> Mailbox<S, V> {
527    /// Creates a new mailbox.
528    pub(crate) const fn new(sender: Sender<Message<S, V>>) -> Self {
529        Self { sender }
530    }
531
532    /// Create an ancestor stream that fetches missing parents by commitment.
533    ///
534    /// This stream is always a fetching stream. Callers must only use it after
535    /// they already have a block that is safe to verify, certify, build on, or
536    /// repair from. From that point, every parent walked by the stream is part of
537    /// a certified ancestry chain, and the stream can derive each missing
538    /// parent's height from its child before issuing a height-bound request.
539    ///
540    /// Do not use this to wait for pending candidate proposal data.
541    pub(crate) fn ancestor_stream<I, C>(
542        &self,
543        clock: Arc<C>,
544        initial: I,
545        fetch_duration: Timed,
546    ) -> impl Ancestry<V::ApplicationBlock> + use<S, V, I, C>
547    where
548        Self: BlockProvider<Block = V::ApplicationBlock>,
549        I: IntoIterator<Item = V::Block>,
550        C: Clock,
551    {
552        AncestorStream::new(
553            clock,
554            self.clone(),
555            initial.into_iter().map(V::into_inner),
556            fetch_duration,
557        )
558    }
559
560    /// Retrieve `(height, digest)` for a finalized block by height, digest, or latest.
561    pub async fn get_info(
562        &self,
563        identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
564    ) -> Option<(Height, <V::Block as Digestible>::Digest)> {
565        let identifier = identifier.into();
566        let (response, receiver) = oneshot::channel();
567        let _ = self.sender.enqueue(Message::GetInfo {
568            identifier,
569            response,
570        });
571        receiver.await.ok().flatten()
572    }
573
574    /// A best-effort attempt to retrieve a given block from local
575    /// storage. It is not an indication to go fetch the block from the network.
576    pub async fn get_block(
577        &self,
578        identifier: impl Into<Identifier<<V::Block as Digestible>::Digest>>,
579    ) -> Option<V::Block> {
580        let identifier = identifier.into();
581        let (response, receiver) = oneshot::channel();
582        let _ = self.sender.enqueue(Message::GetBlock {
583            identifier,
584            response,
585        });
586        receiver.await.ok().flatten()
587    }
588
589    /// A best-effort attempt to retrieve a given [Finalization] from local
590    /// storage. It is not an indication to go fetch the [Finalization] from the network.
591    pub async fn get_finalization(&self, height: Height) -> Option<Finalization<S, V::Commitment>> {
592        let (response, receiver) = oneshot::channel();
593        let _ = self
594            .sender
595            .enqueue(Message::GetFinalization { height, response });
596        receiver.await.ok().flatten()
597    }
598
599    /// Retrieve the latest processed height.
600    pub async fn get_processed_height(&self) -> Option<Height> {
601        let (response, receiver) = oneshot::channel();
602        let _ = self
603            .sender
604            .enqueue(Message::GetProcessedHeight { response });
605        receiver.await.ok().flatten()
606    }
607
608    /// Hints that a finalized block may be available at the given height.
609    ///
610    /// This method will request the finalization from the network via the resolver
611    /// if it is not available locally.
612    ///
613    /// Targets are required because this is typically called when a peer claims to be
614    /// ahead. By targeting only those peers, we limit who we ask. If a target returns
615    /// invalid data, they will be blocked by the resolver. If targets don't respond
616    /// or return "no data", they effectively rate-limit themselves.
617    ///
618    /// Calling this multiple times for the same height with different targets will
619    /// add to the target set if there is an ongoing fetch, allowing more peers to be tried.
620    ///
621    /// This is fire-and-forget: the finalization will be stored in marshal and delivered
622    /// via the normal finalization flow when available.
623    ///
624    /// The height must be covered by both the epocher and the provider. If the
625    /// epocher cannot map the height to an epoch, or the provider cannot supply
626    /// a scheme for that epoch, the hint is silently dropped.
627    pub fn hint_finalized(&self, height: Height, targets: NonEmptyVec<S::PublicKey>) {
628        let _ = self
629            .sender
630            .enqueue(Message::HintFinalized { height, targets });
631    }
632
633    /// Subscribe to a block by its digest.
634    ///
635    /// If the block is found available locally, the block will be returned immediately.
636    ///
637    /// If the block is not available locally, the subscription will be registered and the caller
638    /// will be notified when the block is available. If the block is not finalized, it's possible
639    /// that it may never become available.
640    ///
641    /// The `fallback` parameter controls whether marshal also asks peers for the missing block.
642    /// Digest-keyed subscriptions only support waiting locally or fetching by round.
643    ///
644    /// The oneshot receiver should be dropped to cancel the subscription.
645    pub fn subscribe_by_digest(
646        &self,
647        digest: <V::Block as Digestible>::Digest,
648        fallback: DigestFallback,
649    ) -> oneshot::Receiver<V::Block> {
650        let (tx, rx) = oneshot::channel();
651        let _ = self.sender.enqueue(Message::SubscribeByDigest {
652            digest,
653            fallback,
654            response: tx,
655        });
656        rx
657    }
658
659    /// Subscribe to a block by its commitment.
660    ///
661    /// If the block is found available locally, the block will be returned immediately.
662    ///
663    /// If the block is not available locally, the subscription will be registered and the caller
664    /// will be notified when the block is available. If the block is not finalized, it's possible
665    /// that it may never become available.
666    ///
667    /// The `fallback` parameter controls whether marshal also asks peers for the missing block.
668    ///
669    /// The oneshot receiver should be dropped to cancel the subscription.
670    pub fn subscribe_by_commitment(
671        &self,
672        commitment: V::Commitment,
673        fallback: CommitmentFallback,
674    ) -> oneshot::Receiver<V::Block> {
675        let (tx, rx) = oneshot::channel();
676        let _ = self.sender.enqueue(Message::SubscribeByCommitment {
677            fallback,
678            commitment,
679            response: tx,
680        });
681        rx
682    }
683
684    /// Hint that peers may have the block notarized at `round`.
685    ///
686    /// This issues a round-bound resolver request without registering a new
687    /// block subscriber. The `commitment` is only used to skip the request when
688    /// the block is already available locally.
689    ///
690    /// This is useful when a local-only waiter already exists and later
691    /// certification makes a network fetch by notarized round valid.
692    pub fn hint_notarized(&self, round: Round, commitment: V::Commitment) {
693        let _ = self
694            .sender
695            .enqueue(Message::HintNotarized { round, commitment });
696    }
697
698    /// Returns a stream over the ancestry of a given block, leading up to genesis.
699    ///
700    /// This stream may fetch missing parents because callers should only request
701    /// ancestry for data they already have locally and are willing to build on,
702    /// verify, certify, or repair from. It is not a candidate fetch path.
703    ///
704    /// If the starting block is not found, `None` is returned.
705    pub async fn ancestry<C>(
706        &self,
707        clock: Arc<C>,
708        (fallback, start_digest): (DigestFallback, <V::Block as Digestible>::Digest),
709        fetch_duration: Timed,
710    ) -> Option<impl Ancestry<V::ApplicationBlock> + use<S, V, C>>
711    where
712        Self: BlockProvider<Block = V::ApplicationBlock>,
713        C: Clock,
714    {
715        let receiver = self.subscribe_by_digest(start_digest, fallback);
716        receiver
717            .await
718            .ok()
719            .map(|block| self.ancestor_stream(clock, [block], fetch_duration))
720    }
721
722    /// Returns the verified block previously persisted for `round`, if any.
723    pub async fn get_verified(&self, round: Round) -> Option<V::Block> {
724        let (response, receiver) = oneshot::channel();
725        let _ = self
726            .sender
727            .enqueue(Message::GetVerified { round, response });
728        receiver.await.ok().flatten()
729    }
730
731    /// Notifies the actor that a block has been locally proposed.
732    ///
733    /// Returns after the block is durably persisted.
734    #[must_use = "callers must consider block durability before proceeding"]
735    pub async fn proposed(&self, round: Round, block: V::Block) -> bool {
736        let (ack, receiver) = oneshot::channel();
737        let _ = self.sender.enqueue(Message::Proposed {
738            round,
739            block,
740            ack: Some(ack),
741        });
742        receiver.await.is_ok()
743    }
744
745    /// Notifies the actor that a block has been verified.
746    ///
747    /// Returns after the block is durably persisted.
748    #[must_use = "callers must consider block durability before proceeding"]
749    pub async fn verified(&self, round: Round, block: V::Block) -> bool {
750        let (ack, receiver) = oneshot::channel();
751        let _ = self.sender.enqueue(Message::Verified {
752            round,
753            block,
754            ack: Some(ack),
755        });
756        receiver.await.is_ok()
757    }
758
759    /// Notifies the actor that a block has been certified.
760    ///
761    /// Returns after the block is durably persisted.
762    #[must_use = "callers must consider block durability before proceeding"]
763    pub async fn certified(&self, round: Round, block: V::Block) -> bool {
764        let (ack, receiver) = oneshot::channel();
765        let _ = self.sender.enqueue(Message::Certified {
766            round,
767            block,
768            ack: Some(ack),
769        });
770        receiver.await.is_ok()
771    }
772
773    /// Attempts to set the sync starting point from a finalized commitment.
774    ///
775    /// If the verified finalization advances marshal's current floor, marshal
776    /// anchors on its block, prunes below it, then syncs and delivers blocks
777    /// starting at the floor height. Stale or superseded floors may be ignored.
778    ///
779    /// To prune data without changing the sync starting point, use
780    /// [Self::prune] instead.
781    /// Use [`crate::marshal::Config::start`] to provide the startup anchor.
782    pub fn set_floor(&self, finalization: Finalization<S, V::Commitment>) {
783        let _ = self.sender.enqueue(Message::SetFloor { finalization });
784    }
785
786    /// Requests pruning finalized blocks and certificates below the given height.
787    ///
788    /// Unlike [Self::set_floor], this does not affect the sync starting point.
789    /// Requests above marshal's current floor are ignored.
790    pub fn prune(&self, height: Height) {
791        let _ = self.sender.enqueue(Message::Prune { height });
792    }
793
794    /// Forward a block to a set of recipients.
795    pub fn forward(
796        &self,
797        round: Round,
798        commitment: V::Commitment,
799        recipients: Recipients<S::PublicKey>,
800    ) -> Feedback {
801        self.sender.enqueue(Message::Forward {
802            round,
803            commitment,
804            recipients,
805        })
806    }
807}
808
809impl<S: Scheme, V: Variant> Reporter for Mailbox<S, V> {
810    type Activity = Activity<S, V::Commitment>;
811
812    fn report(&mut self, activity: Self::Activity) -> Feedback {
813        let message = match activity {
814            Activity::Notarization(notarization) => Message::Notarization { notarization },
815            Activity::Finalization(finalization) => Message::Finalization { finalization },
816            _ => return Feedback::Ok,
817        };
818        self.sender.enqueue(message)
819    }
820}
821
822#[cfg(test)]
823mod tests {
824    use super::*;
825    use crate::{
826        marshal::{mocks::harness, standard::Standard},
827        simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Proposal},
828        types::{Epoch, View},
829        Heightable,
830    };
831    use commonware_cryptography::{
832        certificate::mocks::Fixture, ed25519::PrivateKey, Digest as _, Signer as _,
833    };
834    use commonware_utils::{channel::oneshot::error::TryRecvError, test_rng_seeded};
835
836    type TestMessage = Message<harness::S, Standard<harness::B>>;
837    type TestPending = Pending<harness::S, Standard<harness::B>>;
838
839    fn public_key(seed: u64) -> harness::K {
840        PrivateKey::from_seed(seed).public_key()
841    }
842
843    fn round(height: u64) -> Round {
844        Round::new(Epoch::zero(), View::new(height))
845    }
846
847    fn block(height: u64) -> harness::B {
848        harness::make_raw_block(harness::D::EMPTY, Height::new(height), height)
849    }
850
851    fn commitment(height: u64) -> harness::D {
852        <Standard<harness::B> as Variant>::commitment(&block(height))
853    }
854
855    fn finalization(height: u64) -> Finalization<harness::S, harness::D> {
856        let mut rng = test_rng_seeded(height);
857        let Fixture { schemes, .. } = bls12381_threshold_vrf::fixture::<harness::V, _>(
858            &mut rng,
859            harness::NAMESPACE,
860            harness::NUM_VALIDATORS,
861        );
862        let proposal = Proposal::new(round(height), View::zero(), commitment(height));
863        <harness::StandardHarness as harness::TestHarness>::make_finalization(
864            proposal,
865            &schemes,
866            harness::QUORUM,
867        )
868    }
869
870    fn get_info(height: u64) -> (TestMessage, oneshot::Receiver<Option<(Height, harness::D)>>) {
871        let (response, receiver) = oneshot::channel();
872        (
873            TestMessage::GetInfo {
874                identifier: Identifier::Height(Height::new(height)),
875                response,
876            },
877            receiver,
878        )
879    }
880
881    fn proposed(height: u64) -> (TestMessage, oneshot::Receiver<()>) {
882        let (ack, receiver) = oneshot::channel();
883        (
884            TestMessage::Proposed {
885                round: round(height),
886                block: block(height),
887                ack: Some(ack),
888            },
889            receiver,
890        )
891    }
892
893    fn verified(height: u64) -> (TestMessage, oneshot::Receiver<()>) {
894        let (ack, receiver) = oneshot::channel();
895        (
896            TestMessage::Verified {
897                round: round(height),
898                block: block(height),
899                ack: Some(ack),
900            },
901            receiver,
902        )
903    }
904
905    fn certified(height: u64) -> (TestMessage, oneshot::Receiver<()>) {
906        let (ack, receiver) = oneshot::channel();
907        (
908            TestMessage::Certified {
909                round: round(height),
910                block: block(height),
911                ack: Some(ack),
912            },
913            receiver,
914        )
915    }
916
917    fn get_block(height: u64) -> (TestMessage, oneshot::Receiver<Option<harness::B>>) {
918        let (response, receiver) = oneshot::channel();
919        (
920            TestMessage::GetBlock {
921                identifier: Identifier::Height(Height::new(height)),
922                response,
923            },
924            receiver,
925        )
926    }
927
928    fn get_finalization(
929        height: u64,
930    ) -> (
931        TestMessage,
932        oneshot::Receiver<Option<Finalization<harness::S, harness::D>>>,
933    ) {
934        let (response, receiver) = oneshot::channel();
935        (
936            TestMessage::GetFinalization {
937                height: Height::new(height),
938                response,
939            },
940            receiver,
941        )
942    }
943
944    fn subscribe_by_digest(height: u64) -> (TestMessage, oneshot::Receiver<harness::B>) {
945        let (response, receiver) = oneshot::channel();
946        (
947            TestMessage::SubscribeByDigest {
948                digest: block(height).digest(),
949                fallback: DigestFallback::FetchByRound {
950                    round: round(height),
951                },
952                response,
953            },
954            receiver,
955        )
956    }
957
958    fn subscribe_by_commitment_message(
959        height: u64,
960        fallback: CommitmentFallback,
961    ) -> (TestMessage, oneshot::Receiver<harness::B>) {
962        let (response, receiver) = oneshot::channel();
963        (
964            TestMessage::SubscribeByCommitment {
965                commitment: commitment(height),
966                fallback,
967                response,
968            },
969            receiver,
970        )
971    }
972
973    fn hint_finalized(height: u64, target: harness::K) -> TestMessage {
974        TestMessage::HintFinalized {
975            height: Height::new(height),
976            targets: NonEmptyVec::new(target),
977        }
978    }
979
980    fn set_floor(height: u64) -> TestMessage {
981        TestMessage::SetFloor {
982            finalization: finalization(height),
983        }
984    }
985
986    fn prune(height: u64) -> TestMessage {
987        TestMessage::Prune {
988            height: Height::new(height),
989        }
990    }
991
992    fn pending() -> TestPending {
993        TestPending::default()
994    }
995
996    fn drain(overflow: &mut TestPending) -> VecDeque<TestMessage> {
997        let mut drained = VecDeque::new();
998        overflow.drain(|message| {
999            drained.push_back(message);
1000            None
1001        });
1002        drained
1003    }
1004
1005    fn has_get_info(overflow: &TestPending, height: u64) -> bool {
1006        overflow.messages.iter().any(|message| {
1007            matches!(
1008                message,
1009                PendingMessage::Message(TestMessage::GetInfo {
1010                    identifier: Identifier::Height(found),
1011                    response,
1012                    ..
1013                }) if *found == Height::new(height) && !response.is_closed()
1014            )
1015        })
1016    }
1017
1018    fn has_get_block(overflow: &TestPending, height: u64) -> bool {
1019        overflow.messages.iter().any(|message| {
1020            matches!(
1021                message,
1022                PendingMessage::Message(TestMessage::GetBlock {
1023                    identifier: Identifier::Height(found),
1024                    response,
1025                    ..
1026                }) if *found == Height::new(height) && !response.is_closed()
1027            )
1028        })
1029    }
1030
1031    fn has_get_finalization(overflow: &TestPending, height: u64) -> bool {
1032        overflow.messages.iter().any(|message| {
1033            matches!(
1034                message,
1035                PendingMessage::Message(TestMessage::GetFinalization {
1036                    height: found,
1037                    response,
1038                }) if *found == Height::new(height) && !response.is_closed()
1039            )
1040        })
1041    }
1042
1043    fn hint_targets(overflow: &TestPending, height: u64) -> Option<&NonEmptyVec<harness::K>> {
1044        overflow.hints.get(&Height::new(height))
1045    }
1046
1047    fn has_block_message(overflow: &TestPending, height: u64) -> bool {
1048        overflow.messages.iter().any(|message| {
1049            matches!(
1050                message,
1051                PendingMessage::Message(
1052                    TestMessage::Proposed { block, .. }
1053                        | TestMessage::Verified { block, .. }
1054                        | TestMessage::Certified { block, .. }
1055                )
1056                    if block.height() == Height::new(height)
1057            )
1058        })
1059    }
1060
1061    fn has_prune(overflow: &TestPending, height: u64) -> bool {
1062        overflow.prune == Some(Height::new(height))
1063    }
1064
1065    fn has_subscription(overflow: &TestPending, height: u64) -> bool {
1066        let expected_digest = block(height).digest();
1067        let expected_commitment = commitment(height);
1068        overflow.messages.iter().any(|message| {
1069            matches!(
1070                message,
1071                PendingMessage::Message(TestMessage::SubscribeByDigest { digest, response, .. })
1072                    if *digest == expected_digest && !response.is_closed()
1073            ) || matches!(
1074                message,
1075                PendingMessage::Message(TestMessage::SubscribeByCommitment {
1076                    commitment,
1077                    response,
1078                    ..
1079                }) if *commitment == expected_commitment && !response.is_closed()
1080            )
1081        })
1082    }
1083
1084    #[test]
1085    fn policy_coalesces_hint_targets() {
1086        let mut overflow = pending();
1087        let first = public_key(1);
1088        let second = public_key(2);
1089
1090        <TestMessage as Policy>::handle(&mut overflow, hint_finalized(10, first.clone()));
1091        <TestMessage as Policy>::handle(&mut overflow, hint_finalized(10, first.clone()));
1092        <TestMessage as Policy>::handle(&mut overflow, hint_finalized(10, second.clone()));
1093
1094        assert_eq!(overflow.messages.len(), 1);
1095        let targets = hint_targets(&overflow, 10).expect("expected hint");
1096        assert_eq!(targets.len().get(), 2);
1097        assert!(targets.contains(&first));
1098        assert!(targets.contains(&second));
1099    }
1100
1101    #[test]
1102    fn policy_preserves_commitment_subscription_fallbacks() {
1103        let mut overflow = pending();
1104
1105        let (wait, _wait_rx) = subscribe_by_commitment_message(1, CommitmentFallback::Wait);
1106        let (by_round, _by_round_rx) = subscribe_by_commitment_message(
1107            2,
1108            CommitmentFallback::FetchByRound { round: round(2) },
1109        );
1110        let (by_commitment, _by_commitment_rx) = subscribe_by_commitment_message(
1111            3,
1112            CommitmentFallback::FetchByCommitment {
1113                height: Height::new(3),
1114            },
1115        );
1116
1117        <TestMessage as Policy>::handle(&mut overflow, wait);
1118        <TestMessage as Policy>::handle(&mut overflow, by_round);
1119        <TestMessage as Policy>::handle(&mut overflow, by_commitment);
1120
1121        let drained = drain(&mut overflow);
1122        assert_eq!(drained.len(), 3);
1123        assert!(matches!(
1124            &drained[0],
1125            TestMessage::SubscribeByCommitment {
1126                fallback: CommitmentFallback::Wait,
1127                ..
1128            }
1129        ));
1130        assert!(matches!(
1131            &drained[1],
1132            TestMessage::SubscribeByCommitment {
1133                fallback: CommitmentFallback::FetchByRound { round: found },
1134                ..
1135            } if *found == round(2)
1136        ));
1137        assert!(matches!(
1138            &drained[2],
1139            TestMessage::SubscribeByCommitment {
1140                fallback: CommitmentFallback::FetchByCommitment { height },
1141                ..
1142            } if *height == Height::new(3)
1143        ));
1144    }
1145
1146    #[test]
1147    fn policy_handles_closed_subscriptions() {
1148        let mut overflow = pending();
1149
1150        let (pending_closed, pending_closed_rx) = subscribe_by_digest(1);
1151        drop(pending_closed_rx);
1152        overflow
1153            .messages
1154            .push_back(PendingMessage::Message(pending_closed));
1155
1156        let (pending_open, mut pending_open_rx) = subscribe_by_commitment_message(
1157            2,
1158            CommitmentFallback::FetchByRound { round: round(2) },
1159        );
1160        overflow
1161            .messages
1162            .push_back(PendingMessage::Message(pending_open));
1163
1164        let (current_closed, current_closed_rx) = subscribe_by_digest(3);
1165        drop(current_closed_rx);
1166        <TestMessage as Policy>::handle(&mut overflow, current_closed);
1167
1168        assert!(!has_subscription(&overflow, 1));
1169        assert!(has_subscription(&overflow, 2));
1170        assert!(!has_subscription(&overflow, 3));
1171        assert!(matches!(
1172            pending_open_rx.try_recv(),
1173            Err(TryRecvError::Empty)
1174        ));
1175    }
1176
1177    #[test]
1178    fn policy_handles_closed_responses() {
1179        let mut overflow = pending();
1180
1181        let (pending_closed, pending_closed_rx) = get_block(1);
1182        drop(pending_closed_rx);
1183        overflow
1184            .messages
1185            .push_back(PendingMessage::Message(pending_closed));
1186
1187        let (pending_open, mut pending_open_rx) = get_info(2);
1188        overflow
1189            .messages
1190            .push_back(PendingMessage::Message(pending_open));
1191
1192        let (current_closed, current_closed_rx) = get_finalization(3);
1193        drop(current_closed_rx);
1194        <TestMessage as Policy>::handle(&mut overflow, current_closed);
1195
1196        assert!(!has_get_block(&overflow, 1));
1197        assert!(has_get_info(&overflow, 2));
1198        assert!(!has_get_finalization(&overflow, 3));
1199        assert!(matches!(
1200            pending_open_rx.try_recv(),
1201            Err(TryRecvError::Empty)
1202        ));
1203    }
1204
1205    #[test]
1206    fn policy_drain_stops_after_returned_response_closes() {
1207        let mut overflow = pending();
1208        let (first, first_rx) = get_block(1);
1209        let (second, mut second_rx) = get_info(2);
1210        overflow.messages.push_back(PendingMessage::Message(first));
1211        overflow.messages.push_back(PendingMessage::Message(second));
1212
1213        let mut first_rx = Some(first_rx);
1214        let mut attempts = 0;
1215        overflow.drain(|message| {
1216            attempts += 1;
1217            drop(first_rx.take());
1218            Some(message)
1219        });
1220        assert_eq!(attempts, 1);
1221
1222        let drained = drain(&mut overflow);
1223        assert_eq!(drained.len(), 1);
1224        assert!(matches!(
1225            &drained[0],
1226            TestMessage::GetInfo {
1227                identifier: Identifier::Height(height),
1228                response,
1229            } if *height == Height::new(2) && !response.is_closed()
1230        ));
1231        assert!(matches!(second_rx.try_recv(), Err(TryRecvError::Empty)));
1232    }
1233
1234    #[test]
1235    fn policy_keeps_coalesced_hints_in_fifo_position() {
1236        let mut overflow = pending();
1237        let first = public_key(1);
1238        let second = public_key(2);
1239        let (get_block_9, _get_block_9_rx) = get_block(9);
1240        let (get_info_11, _get_info_11_rx) = get_info(11);
1241
1242        <TestMessage as Policy>::handle(&mut overflow, get_block_9);
1243        <TestMessage as Policy>::handle(&mut overflow, hint_finalized(10, first.clone()));
1244        <TestMessage as Policy>::handle(&mut overflow, get_info_11);
1245        <TestMessage as Policy>::handle(&mut overflow, hint_finalized(10, second.clone()));
1246
1247        let drained = drain(&mut overflow);
1248        assert_eq!(drained.len(), 3);
1249        assert!(matches!(
1250            &drained[0],
1251            TestMessage::GetBlock {
1252                identifier: Identifier::Height(height),
1253                ..
1254            } if *height == Height::new(9)
1255        ));
1256        assert!(matches!(
1257            &drained[2],
1258            TestMessage::GetInfo {
1259                identifier: Identifier::Height(height),
1260                ..
1261            } if *height == Height::new(11)
1262        ));
1263        let TestMessage::HintFinalized { height, targets } = &drained[1] else {
1264            panic!("expected hint");
1265        };
1266        assert_eq!(*height, Height::new(10));
1267        assert_eq!(targets.len().get(), 2);
1268        assert!(targets.contains(&first));
1269        assert!(targets.contains(&second));
1270    }
1271
1272    #[test]
1273    fn policy_keeps_highest_floor_and_prune() {
1274        let mut overflow = pending();
1275
1276        <TestMessage as Policy>::handle(&mut overflow, set_floor(5));
1277        <TestMessage as Policy>::handle(&mut overflow, set_floor(3));
1278        <TestMessage as Policy>::handle(&mut overflow, set_floor(8));
1279        <TestMessage as Policy>::handle(&mut overflow, prune(4));
1280        <TestMessage as Policy>::handle(&mut overflow, prune(2));
1281        <TestMessage as Policy>::handle(&mut overflow, prune(7));
1282
1283        assert_eq!(
1284            overflow.floor.as_ref().map(Finalization::round),
1285            Some(round(8))
1286        );
1287        assert_eq!(overflow.prune, Some(Height::new(7)));
1288        assert!(overflow.messages.is_empty());
1289
1290        let drained = drain(&mut overflow);
1291        assert_eq!(drained.len(), 2);
1292        assert!(matches!(
1293            &drained[0],
1294            TestMessage::SetFloor { finalization } if finalization.round() == round(8)
1295        ));
1296        assert!(matches!(
1297            &drained[1],
1298            TestMessage::Prune { height } if *height == Height::new(7)
1299        ));
1300    }
1301
1302    #[test]
1303    fn policy_replaces_floor_and_prune_and_drops_stale_pending_on_drain() {
1304        let mut overflow = pending();
1305
1306        overflow.floor = Some(finalization(5));
1307        let (get_info_4, _get_info_4_rx) = get_info(4);
1308        let (get_block_7, _get_block_7_rx) = get_block(7);
1309        let (get_block_8, _get_block_8_rx) = get_block(8);
1310        overflow
1311            .messages
1312            .push_back(PendingMessage::Message(get_info_4));
1313        overflow
1314            .messages
1315            .push_back(PendingMessage::Message(get_block_7));
1316        overflow.hint_finalized(Height::new(8), NonEmptyVec::new(public_key(1)));
1317        overflow
1318            .messages
1319            .push_back(PendingMessage::Message(get_block_8));
1320        <TestMessage as Policy>::handle(&mut overflow, set_floor(8));
1321        <TestMessage as Policy>::handle(&mut overflow, prune(8));
1322        assert_eq!(
1323            overflow.floor.as_ref().map(Finalization::round),
1324            Some(round(8))
1325        );
1326        assert_eq!(overflow.messages.len(), 1);
1327        assert!(!has_get_info(&overflow, 4));
1328        assert!(!has_get_block(&overflow, 7));
1329        assert!(has_get_block(&overflow, 8));
1330        assert!(hint_targets(&overflow, 8).is_none());
1331        let drained = drain(&mut overflow);
1332        assert_eq!(drained.len(), 3);
1333        assert!(matches!(
1334            &drained[0],
1335            TestMessage::SetFloor { finalization } if finalization.round() == round(8)
1336        ));
1337        assert!(matches!(
1338            &drained[1],
1339            TestMessage::Prune { height } if *height == Height::new(8)
1340        ));
1341        assert!(matches!(
1342            &drained[2],
1343            TestMessage::GetBlock {
1344                identifier: Identifier::Height(height),
1345                ..
1346            } if *height == Height::new(8)
1347        ));
1348
1349        let mut overflow = pending();
1350        overflow.prune = Some(Height::new(5));
1351        let (get_finalization_4, _get_finalization_4_rx) = get_finalization(4);
1352        let (get_block_6, _get_block_6_rx) = get_block(6);
1353        let (get_block_7, _get_block_7_rx) = get_block(7);
1354        overflow
1355            .messages
1356            .push_back(PendingMessage::Message(get_finalization_4));
1357        overflow
1358            .messages
1359            .push_back(PendingMessage::Message(get_block_6));
1360        overflow.hint_finalized(Height::new(6), NonEmptyVec::new(public_key(2)));
1361        overflow
1362            .messages
1363            .push_back(PendingMessage::Message(get_block_7));
1364        <TestMessage as Policy>::handle(&mut overflow, prune(7));
1365        assert_eq!(overflow.prune, Some(Height::new(7)));
1366        assert_eq!(overflow.messages.len(), 1);
1367        assert!(!has_get_finalization(&overflow, 4));
1368        assert!(!has_get_block(&overflow, 6));
1369        assert!(has_get_block(&overflow, 7));
1370        assert!(hint_targets(&overflow, 6).is_none());
1371        let drained = drain(&mut overflow);
1372        assert_eq!(drained.len(), 2);
1373        assert!(matches!(
1374            &drained[0],
1375            TestMessage::Prune { height } if *height == Height::new(7)
1376        ));
1377        assert!(matches!(
1378            &drained[1],
1379            TestMessage::GetBlock {
1380                identifier: Identifier::Height(height),
1381                ..
1382            } if *height == Height::new(7)
1383        ));
1384    }
1385
1386    #[test]
1387    fn policy_prune_drops_closed_pending() {
1388        let mut overflow = pending();
1389        let (closed_message, closed_rx) = get_block(8);
1390        drop(closed_rx);
1391        let (open_message, mut open_rx) = get_block(8);
1392
1393        overflow
1394            .messages
1395            .push_back(PendingMessage::Message(closed_message));
1396        overflow
1397            .messages
1398            .push_back(PendingMessage::Message(open_message));
1399
1400        <TestMessage as Policy>::handle(&mut overflow, prune(7));
1401        assert_eq!(overflow.messages.len(), 1);
1402        assert!(has_get_block(&overflow, 8));
1403        assert!(matches!(open_rx.try_recv(), Err(TryRecvError::Empty)));
1404
1405        let mut overflow = pending();
1406        let (closed_message, closed_rx) = get_finalization(8);
1407        drop(closed_rx);
1408        let (open_message, mut open_rx) = get_finalization(8);
1409
1410        overflow
1411            .messages
1412            .push_back(PendingMessage::Message(closed_message));
1413        overflow
1414            .messages
1415            .push_back(PendingMessage::Message(open_message));
1416
1417        <TestMessage as Policy>::handle(&mut overflow, prune(7));
1418        assert_eq!(overflow.messages.len(), 1);
1419        assert!(has_get_finalization(&overflow, 8));
1420        assert!(matches!(open_rx.try_recv(), Err(TryRecvError::Empty)));
1421    }
1422
1423    #[test]
1424    fn policy_skips_retain_when_prune_height_does_not_increase() {
1425        let mut overflow = pending();
1426        <TestMessage as Policy>::handle(&mut overflow, prune(10));
1427
1428        let (closed_message, closed_rx) = get_block(11);
1429        drop(closed_rx);
1430        overflow
1431            .messages
1432            .push_back(PendingMessage::Message(closed_message));
1433
1434        <TestMessage as Policy>::handle(&mut overflow, set_floor(9));
1435        assert_eq!(overflow.messages.len(), 1);
1436
1437        <TestMessage as Policy>::handle(&mut overflow, prune(9));
1438        assert_eq!(overflow.messages.len(), 1);
1439
1440        <TestMessage as Policy>::handle(&mut overflow, prune(12));
1441        assert!(overflow.messages.is_empty());
1442    }
1443
1444    #[test]
1445    fn policy_drops_stale_requests_against_pending_floor_and_prune() {
1446        let mut overflow = pending();
1447        let (get_info_4, _get_info_4_rx) = get_info(4);
1448        let (get_info_5, _get_info_5_rx) = get_info(5);
1449        let (get_info_6, _get_info_6_rx) = get_info(6);
1450        let (get_info_7, _get_info_7_rx) = get_info(7);
1451        let (get_block_4, _get_block_4_rx) = get_block(4);
1452        let (get_block_5, _get_block_5_rx) = get_block(5);
1453        let (get_block_6, _get_block_6_rx) = get_block(6);
1454        let (get_block_7, _get_block_7_rx) = get_block(7);
1455        let (get_finalization_4, _get_finalization_4_rx) = get_finalization(4);
1456        let (get_finalization_6, _get_finalization_6_rx) = get_finalization(6);
1457
1458        <TestMessage as Policy>::handle(&mut overflow, set_floor(5));
1459        <TestMessage as Policy>::handle(&mut overflow, get_info_4);
1460        <TestMessage as Policy>::handle(&mut overflow, get_info_5);
1461        <TestMessage as Policy>::handle(&mut overflow, get_block_4);
1462        <TestMessage as Policy>::handle(&mut overflow, get_block_5);
1463        <TestMessage as Policy>::handle(&mut overflow, get_finalization_4);
1464        <TestMessage as Policy>::handle(&mut overflow, hint_finalized(5, public_key(1)));
1465        <TestMessage as Policy>::handle(&mut overflow, hint_finalized(6, public_key(2)));
1466
1467        <TestMessage as Policy>::handle(&mut overflow, prune(7));
1468        assert!(has_prune(&overflow, 7));
1469        <TestMessage as Policy>::handle(&mut overflow, get_info_6);
1470        <TestMessage as Policy>::handle(&mut overflow, get_finalization_6);
1471        assert!(!has_get_finalization(&overflow, 6));
1472        <TestMessage as Policy>::handle(&mut overflow, get_block_6);
1473        <TestMessage as Policy>::handle(&mut overflow, get_info_7);
1474        assert!(has_get_info(&overflow, 7));
1475        <TestMessage as Policy>::handle(&mut overflow, get_block_7);
1476        assert!(has_get_block(&overflow, 7));
1477
1478        let drained = drain(&mut overflow);
1479        assert_eq!(drained.len(), 4);
1480        assert!(matches!(
1481            &drained[0],
1482            TestMessage::SetFloor { finalization } if finalization.round() == round(5)
1483        ));
1484        assert!(matches!(
1485            &drained[1],
1486            TestMessage::Prune { height } if *height == Height::new(7)
1487        ));
1488        assert!(matches!(
1489            &drained[2],
1490            TestMessage::GetInfo {
1491                identifier: Identifier::Height(height),
1492                ..
1493            } if *height == Height::new(7)
1494        ));
1495        assert!(matches!(
1496            &drained[3],
1497            TestMessage::GetBlock {
1498                identifier: Identifier::Height(height),
1499                ..
1500            } if *height == Height::new(7)
1501        ));
1502    }
1503
1504    #[test]
1505    fn policy_keeps_block_messages_and_waiters() {
1506        let mut overflow = pending();
1507
1508        let (proposed_message, mut proposed_ack) = proposed(4);
1509        let (verified_message, mut verified_ack) = verified(6);
1510        let (certified_message, mut certified_ack) = certified(8);
1511        overflow
1512            .messages
1513            .push_back(PendingMessage::Message(proposed_message));
1514        overflow
1515            .messages
1516            .push_back(PendingMessage::Message(verified_message));
1517        overflow
1518            .messages
1519            .push_back(PendingMessage::Message(certified_message));
1520
1521        <TestMessage as Policy>::handle(&mut overflow, set_floor(7));
1522        assert!(has_block_message(&overflow, 4));
1523        assert!(has_block_message(&overflow, 6));
1524        assert!(has_block_message(&overflow, 8));
1525        assert!(matches!(proposed_ack.try_recv(), Err(TryRecvError::Empty)));
1526        assert!(matches!(verified_ack.try_recv(), Err(TryRecvError::Empty)));
1527        assert!(matches!(certified_ack.try_recv(), Err(TryRecvError::Empty)));
1528
1529        <TestMessage as Policy>::handle(&mut overflow, prune(9));
1530        assert!(has_block_message(&overflow, 8));
1531        assert!(matches!(certified_ack.try_recv(), Err(TryRecvError::Empty)));
1532
1533        let (stale, mut stale_ack) = proposed(8);
1534        <TestMessage as Policy>::handle(&mut overflow, stale);
1535        assert!(has_block_message(&overflow, 8));
1536        assert!(matches!(stale_ack.try_recv(), Err(TryRecvError::Empty)));
1537
1538        let (current, mut current_ack) = verified(9);
1539        <TestMessage as Policy>::handle(&mut overflow, current);
1540        assert!(has_block_message(&overflow, 9));
1541        assert!(matches!(current_ack.try_recv(), Err(TryRecvError::Empty)));
1542
1543        let drained = drain(&mut overflow);
1544        assert!(matches!(drained[0], TestMessage::SetFloor { .. }));
1545        assert!(matches!(drained[1], TestMessage::Prune { .. }));
1546    }
1547}