Skip to main content

commonware_consensus/marshal/ingress/
handler.rs

1use crate::{
2    types::{Height, Round},
3    Block,
4};
5use bytes::{Buf, BufMut, Bytes};
6use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt, Write};
7use commonware_resolver::{p2p::Producer, Consumer};
8use commonware_utils::{
9    channel::{mpsc, oneshot},
10    Span,
11};
12use std::{
13    fmt::{Debug, Display},
14    hash::{Hash, Hasher},
15};
16use tracing::error;
17
18/// The subject of a backfill request.
19const BLOCK_REQUEST: u8 = 0;
20const FINALIZED_REQUEST: u8 = 1;
21const NOTARIZED_REQUEST: u8 = 2;
22
23/// Messages sent from the resolver's [Consumer]/[Producer] implementation
24/// to the marshal [Actor](super::super::actor::Actor).
25pub enum Message<B: Block> {
26    /// A request to deliver a value for a given key.
27    Deliver {
28        /// The key of the value being delivered.
29        key: Request<B>,
30        /// The value being delivered.
31        value: Bytes,
32        /// A channel to send the result of the delivery (true for success).
33        response: oneshot::Sender<bool>,
34    },
35    /// A request to produce a value for a given key.
36    Produce {
37        /// The key of the value to produce.
38        key: Request<B>,
39        /// A channel to send the produced value.
40        response: oneshot::Sender<Bytes>,
41    },
42}
43
44/// A handler that forwards requests from the resolver to the marshal actor.
45///
46/// This struct implements the [Consumer] and [Producer] traits from the
47/// resolver, and acts as a bridge to the main actor loop.
48#[derive(Clone)]
49pub struct Handler<B: Block> {
50    sender: mpsc::Sender<Message<B>>,
51}
52
53impl<B: Block> Handler<B> {
54    /// Creates a new handler.
55    pub const fn new(sender: mpsc::Sender<Message<B>>) -> Self {
56        Self { sender }
57    }
58}
59
60impl<B: Block> Consumer for Handler<B> {
61    type Key = Request<B>;
62    type Value = Bytes;
63    type Failure = ();
64
65    async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool {
66        let (response, receiver) = oneshot::channel();
67        if self
68            .sender
69            .send(Message::Deliver {
70                key,
71                value,
72                response,
73            })
74            .await
75            .is_err()
76        {
77            error!("failed to send deliver message to actor: receiver dropped");
78            return false;
79        }
80        receiver.await.unwrap_or(false)
81    }
82
83    async fn failed(&mut self, _: Self::Key, _: Self::Failure) {
84        // We don't need to do anything on failure, the resolver will retry.
85    }
86}
87
88impl<B: Block> Producer for Handler<B> {
89    type Key = Request<B>;
90
91    async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
92        let (response, receiver) = oneshot::channel();
93        if self
94            .sender
95            .send(Message::Produce { key, response })
96            .await
97            .is_err()
98        {
99            error!("failed to send produce message to actor: receiver dropped");
100        }
101        receiver
102    }
103}
104
105/// A request for backfilling data.
106#[derive(Clone)]
107pub enum Request<B: Block> {
108    Block(B::Commitment),
109    Finalized { height: Height },
110    Notarized { round: Round },
111}
112
113impl<B: Block> Request<B> {
114    /// The subject of the request.
115    const fn subject(&self) -> u8 {
116        match self {
117            Self::Block(_) => BLOCK_REQUEST,
118            Self::Finalized { .. } => FINALIZED_REQUEST,
119            Self::Notarized { .. } => NOTARIZED_REQUEST,
120        }
121    }
122
123    /// The predicate to use when pruning subjects related to this subject.
124    ///
125    /// Specifically, any subjects unrelated will be left unmodified. Any related
126    /// subjects will be pruned if they are "less than or equal to" this subject.
127    pub fn predicate(&self) -> impl Fn(&Self) -> bool + Send + 'static {
128        let cloned = self.clone();
129        move |s| match (&cloned, &s) {
130            (Self::Block(_), _) => unreachable!("we should never retain by block"),
131            (Self::Finalized { height: mine }, Self::Finalized { height: theirs }) => {
132                *theirs > *mine
133            }
134            (Self::Finalized { .. }, _) => true,
135            (Self::Notarized { round: mine }, Self::Notarized { round: theirs }) => *theirs > *mine,
136            (Self::Notarized { .. }, _) => true,
137        }
138    }
139}
140
141impl<B: Block> Write for Request<B> {
142    fn write(&self, buf: &mut impl BufMut) {
143        self.subject().write(buf);
144        match self {
145            Self::Block(commitment) => commitment.write(buf),
146            Self::Finalized { height } => height.write(buf),
147            Self::Notarized { round } => round.write(buf),
148        }
149    }
150}
151
152impl<B: Block> Read for Request<B> {
153    type Cfg = ();
154
155    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
156        let request = match u8::read(buf)? {
157            BLOCK_REQUEST => Self::Block(B::Commitment::read(buf)?),
158            FINALIZED_REQUEST => Self::Finalized {
159                height: Height::read(buf)?,
160            },
161            NOTARIZED_REQUEST => Self::Notarized {
162                round: Round::read(buf)?,
163            },
164            i => return Err(CodecError::InvalidEnum(i)),
165        };
166        Ok(request)
167    }
168}
169
170impl<B: Block> EncodeSize for Request<B> {
171    fn encode_size(&self) -> usize {
172        1 + match self {
173            Self::Block(block) => block.encode_size(),
174            Self::Finalized { height } => height.encode_size(),
175            Self::Notarized { round } => round.encode_size(),
176        }
177    }
178}
179
180impl<B: Block> Span for Request<B> {}
181
182impl<B: Block> PartialEq for Request<B> {
183    fn eq(&self, other: &Self) -> bool {
184        match (&self, &other) {
185            (Self::Block(a), Self::Block(b)) => a == b,
186            (Self::Finalized { height: a }, Self::Finalized { height: b }) => a == b,
187            (Self::Notarized { round: a }, Self::Notarized { round: b }) => a == b,
188            _ => false,
189        }
190    }
191}
192
193impl<B: Block> Eq for Request<B> {}
194
195impl<B: Block> Ord for Request<B> {
196    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
197        match (&self, &other) {
198            (Self::Block(a), Self::Block(b)) => a.cmp(b),
199            (Self::Finalized { height: a }, Self::Finalized { height: b }) => a.cmp(b),
200            (Self::Notarized { round: a }, Self::Notarized { round: b }) => a.cmp(b),
201            (a, b) => a.subject().cmp(&b.subject()),
202        }
203    }
204}
205
206impl<B: Block> PartialOrd for Request<B> {
207    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
208        Some(self.cmp(other))
209    }
210}
211
212impl<B: Block> Hash for Request<B> {
213    fn hash<H: Hasher>(&self, state: &mut H) {
214        match self {
215            Self::Block(commitment) => commitment.hash(state),
216            Self::Finalized { height } => height.hash(state),
217            Self::Notarized { round } => round.hash(state),
218        }
219    }
220}
221
222impl<B: Block> Display for Request<B> {
223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        match self {
225            Self::Block(commitment) => write!(f, "Block({commitment:?})"),
226            Self::Finalized { height } => write!(f, "Finalized({height:?})"),
227            Self::Notarized { round } => write!(f, "Notarized({round:?})"),
228        }
229    }
230}
231
232impl<B: Block> Debug for Request<B> {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        match self {
235            Self::Block(commitment) => write!(f, "Block({commitment:?})"),
236            Self::Finalized { height } => write!(f, "Finalized({height:?})"),
237            Self::Notarized { round } => write!(f, "Notarized({round:?})"),
238        }
239    }
240}
241
242#[cfg(feature = "arbitrary")]
243impl<B: Block> arbitrary::Arbitrary<'_> for Request<B>
244where
245    B::Commitment: for<'a> arbitrary::Arbitrary<'a>,
246{
247    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
248        let choice = u.int_in_range(0..=2)?;
249        match choice {
250            0 => {
251                let commitment = B::Commitment::arbitrary(u)?;
252                Ok(Self::Block(commitment))
253            }
254            1 => {
255                let height = u.arbitrary::<Height>()?;
256                Ok(Self::Finalized { height })
257            }
258            2 => {
259                let round = Round::arbitrary(u)?;
260                Ok(Self::Notarized { round })
261            }
262            _ => unreachable!(),
263        }
264    }
265}
266
267#[cfg(test)]
268mod tests {
269    use super::*;
270    use crate::{
271        marshal::mocks::block::Block as TestBlock,
272        simplex::types::Context,
273        types::{Epoch, View},
274    };
275    use commonware_codec::{Encode, ReadExt};
276    use commonware_cryptography::{
277        ed25519::PublicKey,
278        sha256::{Digest as Sha256Digest, Sha256},
279        Hasher as _,
280    };
281    use std::collections::BTreeSet;
282
283    type Ctx = Context<Sha256Digest, PublicKey>;
284    type B = TestBlock<Sha256Digest, Ctx>;
285
286    #[test]
287    fn test_subject_block_encoding() {
288        let commitment = Sha256::hash(b"test");
289        let request = Request::<B>::Block(commitment);
290
291        // Test encoding
292        let encoded = request.encode();
293        assert_eq!(encoded.len(), 33); // 1 byte for enum variant + 32 bytes for commitment
294        assert_eq!(encoded[0], 0); // Block variant
295
296        // Test decoding
297        let mut buf = encoded.as_ref();
298        let decoded = Request::<B>::read(&mut buf).unwrap();
299        assert_eq!(request, decoded);
300        assert_eq!(decoded, Request::Block(commitment));
301    }
302
303    #[test]
304    fn test_subject_finalized_encoding() {
305        let height = Height::new(12345);
306        let request = Request::<B>::Finalized { height };
307
308        // Test encoding
309        let encoded = request.encode();
310        assert_eq!(encoded[0], 1); // Finalized variant
311
312        // Test decoding
313        let mut buf = encoded.as_ref();
314        let decoded = Request::<B>::read(&mut buf).unwrap();
315        assert_eq!(request, decoded);
316        assert_eq!(decoded, Request::Finalized { height });
317    }
318
319    #[test]
320    fn test_subject_notarized_encoding() {
321        let round = Round::new(Epoch::new(67890), View::new(12345));
322        let request = Request::<B>::Notarized { round };
323
324        // Test encoding
325        let encoded = request.encode();
326        assert_eq!(encoded[0], 2); // Notarized variant
327
328        // Test decoding
329        let mut buf = encoded.as_ref();
330        let decoded = Request::<B>::read(&mut buf).unwrap();
331        assert_eq!(request, decoded);
332        assert_eq!(decoded, Request::Notarized { round });
333    }
334
335    #[test]
336    fn test_subject_hash() {
337        use std::collections::HashSet;
338
339        let r1 = Request::<B>::Finalized {
340            height: Height::new(100),
341        };
342        let r2 = Request::<B>::Finalized {
343            height: Height::new(100),
344        };
345        let r3 = Request::<B>::Finalized {
346            height: Height::new(200),
347        };
348
349        let mut set = HashSet::new();
350        set.insert(r1);
351        assert!(!set.insert(r2)); // Should not insert duplicate
352        assert!(set.insert(r3)); // Should insert different value
353    }
354
355    #[test]
356    fn test_subject_predicate() {
357        let r1 = Request::<B>::Finalized {
358            height: Height::new(100),
359        };
360        let r2 = Request::<B>::Finalized {
361            height: Height::new(200),
362        };
363        let r3 = Request::<B>::Notarized {
364            round: Round::new(Epoch::new(333), View::new(150)),
365        };
366
367        let predicate = r1.predicate();
368        assert!(predicate(&r2)); // r2.height > r1.height
369        assert!(predicate(&r3)); // Different variant (notarized)
370
371        let r1_same = Request::<B>::Finalized {
372            height: Height::new(100),
373        };
374        assert!(!predicate(&r1_same)); // Same height, should not pass
375    }
376
377    #[test]
378    fn test_encode_size() {
379        let commitment = Sha256::hash(&[0u8; 32]);
380        let r1 = Request::<B>::Block(commitment);
381        let r2 = Request::<B>::Finalized {
382            height: Height::new(u64::MAX),
383        };
384        let r3 = Request::<B>::Notarized {
385            round: Round::new(Epoch::new(333), View::new(0)),
386        };
387
388        // Verify encode_size matches actual encoded length
389        assert_eq!(r1.encode_size(), r1.encode().len());
390        assert_eq!(r2.encode_size(), r2.encode().len());
391        assert_eq!(r3.encode_size(), r3.encode().len());
392    }
393
394    #[test]
395    fn test_request_ord_same_variant() {
396        // Test ordering within the same variant
397        let commitment1 = Sha256::hash(b"test1");
398        let commitment2 = Sha256::hash(b"test2");
399        let block1 = Request::<B>::Block(commitment1);
400        let block2 = Request::<B>::Block(commitment2);
401
402        // Block ordering depends on commitment ordering
403        if commitment1 < commitment2 {
404            assert!(block1 < block2);
405            assert!(block2 > block1);
406        } else {
407            assert!(block1 > block2);
408            assert!(block2 < block1);
409        }
410
411        // Finalized ordering by height
412        let fin1 = Request::<B>::Finalized {
413            height: Height::new(100),
414        };
415        let fin2 = Request::<B>::Finalized {
416            height: Height::new(200),
417        };
418        let fin3 = Request::<B>::Finalized {
419            height: Height::new(200),
420        };
421
422        assert!(fin1 < fin2);
423        assert!(fin2 > fin1);
424        assert_eq!(fin2.cmp(&fin3), std::cmp::Ordering::Equal);
425
426        // Notarized ordering by view
427        let not1 = Request::<B>::Notarized {
428            round: Round::new(Epoch::new(333), View::new(50)),
429        };
430        let not2 = Request::<B>::Notarized {
431            round: Round::new(Epoch::new(333), View::new(150)),
432        };
433        let not3 = Request::<B>::Notarized {
434            round: Round::new(Epoch::new(333), View::new(150)),
435        };
436
437        assert!(not1 < not2);
438        assert!(not2 > not1);
439        assert_eq!(not2.cmp(&not3), std::cmp::Ordering::Equal);
440    }
441
442    #[test]
443    fn test_request_ord_cross_variant() {
444        let commitment = Sha256::hash(b"test");
445        let block = Request::<B>::Block(commitment);
446        let finalized = Request::<B>::Finalized {
447            height: Height::new(100),
448        };
449        let notarized = Request::<B>::Notarized {
450            round: Round::new(Epoch::new(333), View::new(200)),
451        };
452
453        // Block < Finalized < Notarized
454        assert!(block < finalized);
455        assert!(block < notarized);
456        assert!(finalized < notarized);
457
458        assert!(finalized > block);
459        assert!(notarized > block);
460        assert!(notarized > finalized);
461
462        // Test all combinations
463        assert_eq!(block.cmp(&finalized), std::cmp::Ordering::Less);
464        assert_eq!(block.cmp(&notarized), std::cmp::Ordering::Less);
465        assert_eq!(finalized.cmp(&notarized), std::cmp::Ordering::Less);
466        assert_eq!(finalized.cmp(&block), std::cmp::Ordering::Greater);
467        assert_eq!(notarized.cmp(&block), std::cmp::Ordering::Greater);
468        assert_eq!(notarized.cmp(&finalized), std::cmp::Ordering::Greater);
469    }
470
471    #[test]
472    fn test_request_partial_ord() {
473        let commitment1 = Sha256::hash(b"test1");
474        let commitment2 = Sha256::hash(b"test2");
475        let block1 = Request::<B>::Block(commitment1);
476        let block2 = Request::<B>::Block(commitment2);
477        let finalized = Request::<B>::Finalized {
478            height: Height::new(100),
479        };
480        let notarized = Request::<B>::Notarized {
481            round: Round::new(Epoch::new(333), View::new(200)),
482        };
483
484        // PartialOrd should always return Some
485        assert!(block1.partial_cmp(&block2).is_some());
486        assert!(block1.partial_cmp(&finalized).is_some());
487        assert!(finalized.partial_cmp(&notarized).is_some());
488
489        // Verify consistency with Ord
490        assert_eq!(
491            block1.partial_cmp(&finalized),
492            Some(std::cmp::Ordering::Less)
493        );
494        assert_eq!(
495            finalized.partial_cmp(&notarized),
496            Some(std::cmp::Ordering::Less)
497        );
498        assert_eq!(
499            notarized.partial_cmp(&block1),
500            Some(std::cmp::Ordering::Greater)
501        );
502    }
503
504    #[test]
505    fn test_request_ord_sorting() {
506        let commitment1 = Sha256::hash(b"a");
507        let commitment2 = Sha256::hash(b"b");
508        let commitment3 = Sha256::hash(b"c");
509
510        let requests = vec![
511            Request::<B>::Notarized {
512                round: Round::new(Epoch::new(333), View::new(300)),
513            },
514            Request::<B>::Block(commitment2),
515            Request::<B>::Finalized {
516                height: Height::new(200),
517            },
518            Request::<B>::Block(commitment1),
519            Request::<B>::Notarized {
520                round: Round::new(Epoch::new(333), View::new(250)),
521            },
522            Request::<B>::Finalized {
523                height: Height::new(100),
524            },
525            Request::<B>::Block(commitment3),
526        ];
527
528        // Sort using BTreeSet (uses Ord)
529        let sorted: Vec<_> = requests
530            .into_iter()
531            .collect::<BTreeSet<_>>()
532            .into_iter()
533            .collect();
534
535        // Verify order: all Blocks first (sorted by commitment), then Finalized (by height), then Notarized (by view)
536        assert_eq!(sorted.len(), 7);
537
538        // Check that all blocks come first
539        assert!(matches!(sorted[0], Request::<B>::Block(_)));
540        assert!(matches!(sorted[1], Request::<B>::Block(_)));
541        assert!(matches!(sorted[2], Request::<B>::Block(_)));
542
543        // Check that finalized come next
544        assert_eq!(
545            sorted[3],
546            Request::<B>::Finalized {
547                height: Height::new(100)
548            }
549        );
550        assert_eq!(
551            sorted[4],
552            Request::<B>::Finalized {
553                height: Height::new(200)
554            }
555        );
556
557        // Check that notarized come last
558        assert_eq!(
559            sorted[5],
560            Request::<B>::Notarized {
561                round: Round::new(Epoch::new(333), View::new(250))
562            }
563        );
564        assert_eq!(
565            sorted[6],
566            Request::<B>::Notarized {
567                round: Round::new(Epoch::new(333), View::new(300))
568            }
569        );
570    }
571
572    #[test]
573    fn test_request_ord_edge_cases() {
574        // Test with extreme values
575        let min_finalized = Request::<B>::Finalized {
576            height: Height::zero(),
577        };
578        let max_finalized = Request::<B>::Finalized {
579            height: Height::new(u64::MAX),
580        };
581        let min_notarized = Request::<B>::Notarized {
582            round: Round::new(Epoch::new(333), View::new(0)),
583        };
584        let max_notarized = Request::<B>::Notarized {
585            round: Round::new(Epoch::new(333), View::new(u64::MAX)),
586        };
587
588        assert!(min_finalized < max_finalized);
589        assert!(min_notarized < max_notarized);
590        assert!(max_finalized < min_notarized);
591
592        // Test self-comparison
593        let commitment = Sha256::hash(b"self");
594        let block = Request::<B>::Block(commitment);
595        assert_eq!(block.cmp(&block), std::cmp::Ordering::Equal);
596        assert_eq!(min_finalized.cmp(&min_finalized), std::cmp::Ordering::Equal);
597        assert_eq!(max_notarized.cmp(&max_notarized), std::cmp::Ordering::Equal);
598    }
599
600    #[cfg(feature = "arbitrary")]
601    mod conformance {
602        use super::*;
603        use commonware_codec::conformance::CodecConformance;
604
605        commonware_conformance::conformance_tests! {
606            CodecConformance<Request<B>>,
607        }
608    }
609}