commonware_consensus/marshal/ingress/
handler.rs

1use crate::{
2    types::{Height, Round},
3    Block,
4};
5use bytes::{Buf, BufMut, Bytes};
6use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt, Write};
7use commonware_resolver::{p2p::Producer, Consumer};
8use commonware_utils::Span;
9use futures::{
10    channel::{mpsc, oneshot},
11    SinkExt,
12};
13use std::{
14    fmt::{Debug, Display},
15    hash::{Hash, Hasher},
16};
17use tracing::error;
18
/// Subject tag for a [Request::Block] backfill request.
const BLOCK_REQUEST: u8 = 0;
/// Subject tag for a [Request::Finalized] backfill request.
const FINALIZED_REQUEST: u8 = 1;
/// Subject tag for a [Request::Notarized] backfill request.
const NOTARIZED_REQUEST: u8 = 2;
23
24/// Messages sent from the resolver's [Consumer]/[Producer] implementation
25/// to the marshal [Actor](super::super::actor::Actor).
26pub enum Message<B: Block> {
27    /// A request to deliver a value for a given key.
28    Deliver {
29        /// The key of the value being delivered.
30        key: Request<B>,
31        /// The value being delivered.
32        value: Bytes,
33        /// A channel to send the result of the delivery (true for success).
34        response: oneshot::Sender<bool>,
35    },
36    /// A request to produce a value for a given key.
37    Produce {
38        /// The key of the value to produce.
39        key: Request<B>,
40        /// A channel to send the produced value.
41        response: oneshot::Sender<Bytes>,
42    },
43}
44
45/// A handler that forwards requests from the resolver to the marshal actor.
46///
47/// This struct implements the [Consumer] and [Producer] traits from the
48/// resolver, and acts as a bridge to the main actor loop.
49#[derive(Clone)]
50pub struct Handler<B: Block> {
51    sender: mpsc::Sender<Message<B>>,
52}
53
54impl<B: Block> Handler<B> {
55    /// Creates a new handler.
56    pub const fn new(sender: mpsc::Sender<Message<B>>) -> Self {
57        Self { sender }
58    }
59}
60
61impl<B: Block> Consumer for Handler<B> {
62    type Key = Request<B>;
63    type Value = Bytes;
64    type Failure = ();
65
66    async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool {
67        let (response, receiver) = oneshot::channel();
68        if self
69            .sender
70            .send(Message::Deliver {
71                key,
72                value,
73                response,
74            })
75            .await
76            .is_err()
77        {
78            error!("failed to send deliver message to actor: receiver dropped");
79            return false;
80        }
81        receiver.await.unwrap_or(false)
82    }
83
84    async fn failed(&mut self, _: Self::Key, _: Self::Failure) {
85        // We don't need to do anything on failure, the resolver will retry.
86    }
87}
88
89impl<B: Block> Producer for Handler<B> {
90    type Key = Request<B>;
91
92    async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
93        let (response, receiver) = oneshot::channel();
94        if self
95            .sender
96            .send(Message::Produce { key, response })
97            .await
98            .is_err()
99        {
100            error!("failed to send produce message to actor: receiver dropped");
101        }
102        receiver
103    }
104}
105
106/// A request for backfilling data.
107#[derive(Clone)]
108pub enum Request<B: Block> {
109    Block(B::Commitment),
110    Finalized { height: Height },
111    Notarized { round: Round },
112}
113
114impl<B: Block> Request<B> {
115    /// The subject of the request.
116    const fn subject(&self) -> u8 {
117        match self {
118            Self::Block(_) => BLOCK_REQUEST,
119            Self::Finalized { .. } => FINALIZED_REQUEST,
120            Self::Notarized { .. } => NOTARIZED_REQUEST,
121        }
122    }
123
124    /// The predicate to use when pruning subjects related to this subject.
125    ///
126    /// Specifically, any subjects unrelated will be left unmodified. Any related
127    /// subjects will be pruned if they are "less than or equal to" this subject.
128    pub fn predicate(&self) -> impl Fn(&Self) -> bool + Send + 'static {
129        let cloned = self.clone();
130        move |s| match (&cloned, &s) {
131            (Self::Block(_), _) => unreachable!("we should never retain by block"),
132            (Self::Finalized { height: mine }, Self::Finalized { height: theirs }) => {
133                *theirs > *mine
134            }
135            (Self::Finalized { .. }, _) => true,
136            (Self::Notarized { round: mine }, Self::Notarized { round: theirs }) => *theirs > *mine,
137            (Self::Notarized { .. }, _) => true,
138        }
139    }
140}
141
142impl<B: Block> Write for Request<B> {
143    fn write(&self, buf: &mut impl BufMut) {
144        self.subject().write(buf);
145        match self {
146            Self::Block(commitment) => commitment.write(buf),
147            Self::Finalized { height } => height.write(buf),
148            Self::Notarized { round } => round.write(buf),
149        }
150    }
151}
152
153impl<B: Block> Read for Request<B> {
154    type Cfg = ();
155
156    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
157        let request = match u8::read(buf)? {
158            BLOCK_REQUEST => Self::Block(B::Commitment::read(buf)?),
159            FINALIZED_REQUEST => Self::Finalized {
160                height: Height::read(buf)?,
161            },
162            NOTARIZED_REQUEST => Self::Notarized {
163                round: Round::read(buf)?,
164            },
165            i => return Err(CodecError::InvalidEnum(i)),
166        };
167        Ok(request)
168    }
169}
170
171impl<B: Block> EncodeSize for Request<B> {
172    fn encode_size(&self) -> usize {
173        1 + match self {
174            Self::Block(block) => block.encode_size(),
175            Self::Finalized { height } => height.encode_size(),
176            Self::Notarized { round } => round.encode_size(),
177        }
178    }
179}
180
181impl<B: Block> Span for Request<B> {}
182
183impl<B: Block> PartialEq for Request<B> {
184    fn eq(&self, other: &Self) -> bool {
185        match (&self, &other) {
186            (Self::Block(a), Self::Block(b)) => a == b,
187            (Self::Finalized { height: a }, Self::Finalized { height: b }) => a == b,
188            (Self::Notarized { round: a }, Self::Notarized { round: b }) => a == b,
189            _ => false,
190        }
191    }
192}
193
194impl<B: Block> Eq for Request<B> {}
195
196impl<B: Block> Ord for Request<B> {
197    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
198        match (&self, &other) {
199            (Self::Block(a), Self::Block(b)) => a.cmp(b),
200            (Self::Finalized { height: a }, Self::Finalized { height: b }) => a.cmp(b),
201            (Self::Notarized { round: a }, Self::Notarized { round: b }) => a.cmp(b),
202            (a, b) => a.subject().cmp(&b.subject()),
203        }
204    }
205}
206
207impl<B: Block> PartialOrd for Request<B> {
208    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
209        Some(self.cmp(other))
210    }
211}
212
213impl<B: Block> Hash for Request<B> {
214    fn hash<H: Hasher>(&self, state: &mut H) {
215        match self {
216            Self::Block(commitment) => commitment.hash(state),
217            Self::Finalized { height } => height.hash(state),
218            Self::Notarized { round } => round.hash(state),
219        }
220    }
221}
222
223impl<B: Block> Display for Request<B> {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225        match self {
226            Self::Block(commitment) => write!(f, "Block({commitment:?})"),
227            Self::Finalized { height } => write!(f, "Finalized({height:?})"),
228            Self::Notarized { round } => write!(f, "Notarized({round:?})"),
229        }
230    }
231}
232
233impl<B: Block> Debug for Request<B> {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        match self {
236            Self::Block(commitment) => write!(f, "Block({commitment:?})"),
237            Self::Finalized { height } => write!(f, "Finalized({height:?})"),
238            Self::Notarized { round } => write!(f, "Notarized({round:?})"),
239        }
240    }
241}
242
#[cfg(feature = "arbitrary")]
impl<B: Block> arbitrary::Arbitrary<'_> for Request<B>
where
    B::Commitment: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Picks one of the three variants and fills its payload from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let request = match u.int_in_range(0..=2)? {
            0 => Self::Block(B::Commitment::arbitrary(u)?),
            1 => Self::Finalized {
                height: u.arbitrary::<Height>()?,
            },
            2 => Self::Notarized {
                round: Round::arbitrary(u)?,
            },
            // `int_in_range(0..=2)` cannot yield anything else.
            _ => unreachable!(),
        };
        Ok(request)
    }
}
267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        marshal::mocks::block::Block as TestBlock,
        types::{Epoch, View},
    };
    use commonware_codec::{Encode, ReadExt};
    use commonware_cryptography::{
        sha256::{Digest as Sha256Digest, Sha256},
        Hasher as _,
    };
    use std::{
        cmp::Ordering,
        collections::{BTreeSet, HashSet},
    };

    type B = TestBlock<Sha256Digest>;

    /// Epoch used by [notarized_at].
    const EPOCH: u64 = 333;

    /// Builds a finalized request at the given height.
    fn finalized_at(height: u64) -> Request<B> {
        Request::Finalized {
            height: Height::new(height),
        }
    }

    /// Builds a notarized request at the given view in epoch [EPOCH].
    fn notarized_at(view: u64) -> Request<B> {
        Request::Notarized {
            round: Round::new(Epoch::new(EPOCH), View::new(view)),
        }
    }

    /// Decodes a request from its encoded bytes, panicking on failure.
    fn decode(mut encoded: &[u8]) -> Request<B> {
        Request::<B>::read(&mut encoded).unwrap()
    }

    #[test]
    fn test_subject_block_encoding() {
        let commitment = Sha256::hash(b"test");
        let original = Request::<B>::Block(commitment);

        // Encoding: 1 byte for the enum variant + 32 bytes for the commitment.
        let bytes = original.encode();
        assert_eq!(bytes.len(), 33);
        assert_eq!(bytes[0], 0); // Block variant

        // Decoding gives the same request back.
        let restored = decode(bytes.as_ref());
        assert_eq!(original, restored);
        assert_eq!(restored, Request::Block(commitment));
    }

    #[test]
    fn test_subject_finalized_encoding() {
        let height = Height::new(12345);
        let original = Request::<B>::Finalized { height };

        // Encoding starts with the Finalized variant tag.
        let bytes = original.encode();
        assert_eq!(bytes[0], 1);

        // Decoding gives the same request back.
        let restored = decode(bytes.as_ref());
        assert_eq!(original, restored);
        assert_eq!(restored, Request::Finalized { height });
    }

    #[test]
    fn test_subject_notarized_encoding() {
        let round = Round::new(Epoch::new(67890), View::new(12345));
        let original = Request::<B>::Notarized { round };

        // Encoding starts with the Notarized variant tag.
        let bytes = original.encode();
        assert_eq!(bytes[0], 2);

        // Decoding gives the same request back.
        let restored = decode(bytes.as_ref());
        assert_eq!(original, restored);
        assert_eq!(restored, Request::Notarized { round });
    }

    #[test]
    fn test_subject_hash() {
        let mut seen = HashSet::new();
        seen.insert(finalized_at(100));
        assert!(!seen.insert(finalized_at(100))); // Duplicate is rejected
        assert!(seen.insert(finalized_at(200))); // Different value is accepted
    }

    #[test]
    fn test_subject_predicate() {
        let retain = finalized_at(100).predicate();

        assert!(retain(&finalized_at(200))); // Greater height passes
        assert!(retain(&notarized_at(150))); // Different variant (notarized) passes
        assert!(!retain(&finalized_at(100))); // Same height does not pass
    }

    #[test]
    fn test_encode_size() {
        let requests = [
            Request::<B>::Block(Sha256::hash(&[0u8; 32])),
            finalized_at(u64::MAX),
            notarized_at(0),
        ];

        // encode_size must match the actual encoded length
        for request in &requests {
            assert_eq!(request.encode_size(), request.encode().len());
        }
    }

    #[test]
    fn test_request_ord_same_variant() {
        // Blocks are ordered the same way as their commitments
        let first = Sha256::hash(b"test1");
        let second = Sha256::hash(b"test2");
        let block1 = Request::<B>::Block(first);
        let block2 = Request::<B>::Block(second);
        if first < second {
            assert!(block1 < block2);
            assert!(block2 > block1);
        } else {
            assert!(block1 > block2);
            assert!(block2 < block1);
        }

        // Finalized requests are ordered by height
        let (fin1, fin2, fin3) = (finalized_at(100), finalized_at(200), finalized_at(200));
        assert!(fin1 < fin2);
        assert!(fin2 > fin1);
        assert_eq!(fin2.cmp(&fin3), Ordering::Equal);

        // Notarized requests are ordered by view
        let (not1, not2, not3) = (notarized_at(50), notarized_at(150), notarized_at(150));
        assert!(not1 < not2);
        assert!(not2 > not1);
        assert_eq!(not2.cmp(&not3), Ordering::Equal);
    }

    #[test]
    fn test_request_ord_cross_variant() {
        let block = Request::<B>::Block(Sha256::hash(b"test"));
        let finalized = finalized_at(100);
        let notarized = notarized_at(200);

        // Block < Finalized < Notarized, checked in both directions and
        // through both the operators and `cmp`.
        let ascending_pairs = [
            (&block, &finalized),
            (&block, &notarized),
            (&finalized, &notarized),
        ];
        for (smaller, larger) in ascending_pairs {
            assert!(smaller < larger);
            assert!(larger > smaller);
            assert_eq!(smaller.cmp(larger), Ordering::Less);
            assert_eq!(larger.cmp(smaller), Ordering::Greater);
        }
    }

    #[test]
    fn test_request_partial_ord() {
        let block1 = Request::<B>::Block(Sha256::hash(b"test1"));
        let block2 = Request::<B>::Block(Sha256::hash(b"test2"));
        let finalized = finalized_at(100);
        let notarized = notarized_at(200);

        // PartialOrd should always return Some
        assert!(block1.partial_cmp(&block2).is_some());
        assert!(block1.partial_cmp(&finalized).is_some());
        assert!(finalized.partial_cmp(&notarized).is_some());

        // And it should agree with Ord
        assert_eq!(block1.partial_cmp(&finalized), Some(Ordering::Less));
        assert_eq!(finalized.partial_cmp(&notarized), Some(Ordering::Less));
        assert_eq!(notarized.partial_cmp(&block1), Some(Ordering::Greater));
    }

    #[test]
    fn test_request_ord_sorting() {
        let unsorted = vec![
            notarized_at(300),
            Request::<B>::Block(Sha256::hash(b"b")),
            finalized_at(200),
            Request::<B>::Block(Sha256::hash(b"a")),
            notarized_at(250),
            finalized_at(100),
            Request::<B>::Block(Sha256::hash(b"c")),
        ];

        // Sort using BTreeSet (uses Ord)
        let ordered = unsorted.into_iter().collect::<BTreeSet<_>>();
        let sorted: Vec<_> = ordered.into_iter().collect();

        // Expected order: all Blocks first (sorted by commitment), then
        // Finalized (by height), then Notarized (by view)
        assert_eq!(sorted.len(), 7);

        // All blocks come first
        assert!(sorted[..3]
            .iter()
            .all(|request| matches!(request, Request::<B>::Block(_))));

        // Finalized come next, followed by notarized
        assert_eq!(sorted[3], finalized_at(100));
        assert_eq!(sorted[4], finalized_at(200));
        assert_eq!(sorted[5], notarized_at(250));
        assert_eq!(sorted[6], notarized_at(300));
    }

    #[test]
    fn test_request_ord_edge_cases() {
        // Extreme values
        let min_finalized = Request::<B>::Finalized {
            height: Height::zero(),
        };
        let max_finalized = finalized_at(u64::MAX);
        let min_notarized = notarized_at(0);
        let max_notarized = notarized_at(u64::MAX);

        assert!(min_finalized < max_finalized);
        assert!(min_notarized < max_notarized);
        assert!(max_finalized < min_notarized);

        // Self-comparison
        let block = Request::<B>::Block(Sha256::hash(b"self"));
        assert_eq!(block.cmp(&block), Ordering::Equal);
        assert_eq!(min_finalized.cmp(&min_finalized), Ordering::Equal);
        assert_eq!(max_notarized.cmp(&max_notarized), Ordering::Equal);
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Request<B>>,
        }
    }
}