1use crate::{
2 types::{Height, Round},
3 Block,
4};
5use bytes::{Buf, BufMut, Bytes};
6use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt, Write};
7use commonware_resolver::{p2p::Producer, Consumer};
8use commonware_utils::{
9 channel::{mpsc, oneshot},
10 Span,
11};
12use std::{
13 fmt::{Debug, Display},
14 hash::{Hash, Hasher},
15};
16use tracing::error;
17
/// Tag byte identifying a [`Request::Block`]; written first when a request is encoded.
const BLOCK_REQUEST: u8 = 0;
/// Tag byte identifying a [`Request::Finalized`]; written first when a request is encoded.
const FINALIZED_REQUEST: u8 = 1;
/// Tag byte identifying a [`Request::Notarized`]; written first when a request is encoded.
const NOTARIZED_REQUEST: u8 = 2;
22
23pub enum Message<B: Block> {
26 Deliver {
28 key: Request<B>,
30 value: Bytes,
32 response: oneshot::Sender<bool>,
34 },
35 Produce {
37 key: Request<B>,
39 response: oneshot::Sender<Bytes>,
41 },
42}
43
44#[derive(Clone)]
49pub struct Handler<B: Block> {
50 sender: mpsc::Sender<Message<B>>,
51}
52
53impl<B: Block> Handler<B> {
54 pub const fn new(sender: mpsc::Sender<Message<B>>) -> Self {
56 Self { sender }
57 }
58}
59
60impl<B: Block> Consumer for Handler<B> {
61 type Key = Request<B>;
62 type Value = Bytes;
63 type Failure = ();
64
65 async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool {
66 let (response, receiver) = oneshot::channel();
67 if self
68 .sender
69 .send(Message::Deliver {
70 key,
71 value,
72 response,
73 })
74 .await
75 .is_err()
76 {
77 error!("failed to send deliver message to actor: receiver dropped");
78 return false;
79 }
80 receiver.await.unwrap_or(false)
81 }
82
83 async fn failed(&mut self, _: Self::Key, _: Self::Failure) {
84 }
86}
87
88impl<B: Block> Producer for Handler<B> {
89 type Key = Request<B>;
90
91 async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
92 let (response, receiver) = oneshot::channel();
93 if self
94 .sender
95 .send(Message::Produce { key, response })
96 .await
97 .is_err()
98 {
99 error!("failed to send produce message to actor: receiver dropped");
100 }
101 receiver
102 }
103}
104
105#[derive(Clone)]
107pub enum Request<B: Block> {
108 Block(B::Commitment),
109 Finalized { height: Height },
110 Notarized { round: Round },
111}
112
113impl<B: Block> Request<B> {
114 const fn subject(&self) -> u8 {
116 match self {
117 Self::Block(_) => BLOCK_REQUEST,
118 Self::Finalized { .. } => FINALIZED_REQUEST,
119 Self::Notarized { .. } => NOTARIZED_REQUEST,
120 }
121 }
122
123 pub fn predicate(&self) -> impl Fn(&Self) -> bool + Send + 'static {
128 let cloned = self.clone();
129 move |s| match (&cloned, &s) {
130 (Self::Block(_), _) => unreachable!("we should never retain by block"),
131 (Self::Finalized { height: mine }, Self::Finalized { height: theirs }) => {
132 *theirs > *mine
133 }
134 (Self::Finalized { .. }, _) => true,
135 (Self::Notarized { round: mine }, Self::Notarized { round: theirs }) => *theirs > *mine,
136 (Self::Notarized { .. }, _) => true,
137 }
138 }
139}
140
141impl<B: Block> Write for Request<B> {
142 fn write(&self, buf: &mut impl BufMut) {
143 self.subject().write(buf);
144 match self {
145 Self::Block(commitment) => commitment.write(buf),
146 Self::Finalized { height } => height.write(buf),
147 Self::Notarized { round } => round.write(buf),
148 }
149 }
150}
151
152impl<B: Block> Read for Request<B> {
153 type Cfg = ();
154
155 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
156 let request = match u8::read(buf)? {
157 BLOCK_REQUEST => Self::Block(B::Commitment::read(buf)?),
158 FINALIZED_REQUEST => Self::Finalized {
159 height: Height::read(buf)?,
160 },
161 NOTARIZED_REQUEST => Self::Notarized {
162 round: Round::read(buf)?,
163 },
164 i => return Err(CodecError::InvalidEnum(i)),
165 };
166 Ok(request)
167 }
168}
169
170impl<B: Block> EncodeSize for Request<B> {
171 fn encode_size(&self) -> usize {
172 1 + match self {
173 Self::Block(block) => block.encode_size(),
174 Self::Finalized { height } => height.encode_size(),
175 Self::Notarized { round } => round.encode_size(),
176 }
177 }
178}
179
// Opts `Request` into the `Span` trait; no trait items are overridden here.
impl<B: Block> Span for Request<B> {}
181
182impl<B: Block> PartialEq for Request<B> {
183 fn eq(&self, other: &Self) -> bool {
184 match (&self, &other) {
185 (Self::Block(a), Self::Block(b)) => a == b,
186 (Self::Finalized { height: a }, Self::Finalized { height: b }) => a == b,
187 (Self::Notarized { round: a }, Self::Notarized { round: b }) => a == b,
188 _ => false,
189 }
190 }
191}
192
// Marker impl: promotes the hand-written `PartialEq` above to `Eq`.
impl<B: Block> Eq for Request<B> {}
194
195impl<B: Block> Ord for Request<B> {
196 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
197 match (&self, &other) {
198 (Self::Block(a), Self::Block(b)) => a.cmp(b),
199 (Self::Finalized { height: a }, Self::Finalized { height: b }) => a.cmp(b),
200 (Self::Notarized { round: a }, Self::Notarized { round: b }) => a.cmp(b),
201 (a, b) => a.subject().cmp(&b.subject()),
202 }
203 }
204}
205
206impl<B: Block> PartialOrd for Request<B> {
207 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
208 Some(self.cmp(other))
209 }
210}
211
212impl<B: Block> Hash for Request<B> {
213 fn hash<H: Hasher>(&self, state: &mut H) {
214 match self {
215 Self::Block(commitment) => commitment.hash(state),
216 Self::Finalized { height } => height.hash(state),
217 Self::Notarized { round } => round.hash(state),
218 }
219 }
220}
221
222impl<B: Block> Display for Request<B> {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 match self {
225 Self::Block(commitment) => write!(f, "Block({commitment:?})"),
226 Self::Finalized { height } => write!(f, "Finalized({height:?})"),
227 Self::Notarized { round } => write!(f, "Notarized({round:?})"),
228 }
229 }
230}
231
232impl<B: Block> Debug for Request<B> {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 match self {
235 Self::Block(commitment) => write!(f, "Block({commitment:?})"),
236 Self::Finalized { height } => write!(f, "Finalized({height:?})"),
237 Self::Notarized { round } => write!(f, "Notarized({round:?})"),
238 }
239 }
240}
241
#[cfg(feature = "arbitrary")]
impl<B: Block> arbitrary::Arbitrary<'_> for Request<B>
where
    B::Commitment: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Draws a selector in `0..=2` to pick the variant, then generates that
    /// variant's payload from the same unstructured input.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let request = match u.int_in_range(0..=2)? {
            0 => Self::Block(B::Commitment::arbitrary(u)?),
            1 => Self::Finalized {
                height: u.arbitrary::<Height>()?,
            },
            2 => Self::Notarized {
                round: Round::arbitrary(u)?,
            },
            // `int_in_range(0..=2)` cannot return anything else.
            _ => unreachable!(),
        };
        Ok(request)
    }
}
266
#[cfg(test)]
mod tests {
    //! Unit tests for [`Request`]: codec round-trips, hashing, the retention
    //! predicate, and the ordering implementations.

    use super::*;
    use crate::{
        marshal::mocks::block::Block as TestBlock,
        simplex::types::Context,
        types::{Epoch, View},
    };
    use commonware_codec::{Encode, ReadExt};
    use commonware_cryptography::{
        ed25519::PublicKey,
        sha256::{Digest as Sha256Digest, Sha256},
        Hasher as _,
    };
    use std::collections::BTreeSet;

    type Ctx = Context<Sha256Digest, PublicKey>;
    type B = TestBlock<Sha256Digest, Ctx>;

    #[test]
    fn test_subject_block_encoding() {
        let commitment = Sha256::hash(b"test");
        let request = Request::<B>::Block(commitment);

        let encoded = request.encode();
        // One tag byte followed by the commitment.
        assert_eq!(encoded.len(), 33);
        // The tag for `Block` is 0.
        assert_eq!(encoded[0], 0);

        // Decoding must give back the original request.
        let mut buf = encoded.as_ref();
        let decoded = Request::<B>::read(&mut buf).unwrap();
        assert_eq!(request, decoded);
        assert_eq!(decoded, Request::Block(commitment));
    }

    #[test]
    fn test_subject_finalized_encoding() {
        let height = Height::new(12345);
        let request = Request::<B>::Finalized { height };

        let encoded = request.encode();
        // The tag for `Finalized` is 1.
        assert_eq!(encoded[0], 1);

        // Decoding must give back the original request.
        let mut buf = encoded.as_ref();
        let decoded = Request::<B>::read(&mut buf).unwrap();
        assert_eq!(request, decoded);
        assert_eq!(decoded, Request::Finalized { height });
    }

    #[test]
    fn test_subject_notarized_encoding() {
        let round = Round::new(Epoch::new(67890), View::new(12345));
        let request = Request::<B>::Notarized { round };

        let encoded = request.encode();
        // The tag for `Notarized` is 2.
        assert_eq!(encoded[0], 2);

        // Decoding must give back the original request.
        let mut buf = encoded.as_ref();
        let decoded = Request::<B>::read(&mut buf).unwrap();
        assert_eq!(request, decoded);
        assert_eq!(decoded, Request::Notarized { round });
    }

    #[test]
    fn test_subject_hash() {
        use std::collections::HashSet;

        let r1 = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let r2 = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let r3 = Request::<B>::Finalized {
            height: Height::new(200),
        };

        let mut set = HashSet::new();
        set.insert(r1);
        // Equal to `r1`, so it is already present.
        assert!(!set.insert(r2));
        // Different height, so it is new.
        assert!(set.insert(r3));
    }

    #[test]
    fn test_subject_predicate() {
        let r1 = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let r2 = Request::<B>::Finalized {
            height: Height::new(200),
        };
        let r3 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(150)),
        };

        let predicate = r1.predicate();
        // Same variant with a strictly greater height passes.
        assert!(predicate(&r2));
        // A different variant always passes.
        assert!(predicate(&r3));

        // Same variant with an equal height does not pass.
        let r1_same = Request::<B>::Finalized {
            height: Height::new(100),
        };
        assert!(!predicate(&r1_same));
    }

    #[test]
    fn test_encode_size() {
        let commitment = Sha256::hash(&[0u8; 32]);
        let r1 = Request::<B>::Block(commitment);
        let r2 = Request::<B>::Finalized {
            height: Height::new(u64::MAX),
        };
        let r3 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(0)),
        };

        // The reported size must match the number of bytes actually encoded.
        assert_eq!(r1.encode_size(), r1.encode().len());
        assert_eq!(r2.encode_size(), r2.encode().len());
        assert_eq!(r3.encode_size(), r3.encode().len());
    }

    #[test]
    fn test_request_ord_same_variant() {
        // Block requests order by commitment.
        let commitment1 = Sha256::hash(b"test1");
        let commitment2 = Sha256::hash(b"test2");
        let block1 = Request::<B>::Block(commitment1);
        let block2 = Request::<B>::Block(commitment2);

        // The relative order of the two digests is not assumed; the requests
        // must simply agree with it.
        if commitment1 < commitment2 {
            assert!(block1 < block2);
            assert!(block2 > block1);
        } else {
            assert!(block1 > block2);
            assert!(block2 < block1);
        }

        // Finalized requests order by height.
        let fin1 = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let fin2 = Request::<B>::Finalized {
            height: Height::new(200),
        };
        let fin3 = Request::<B>::Finalized {
            height: Height::new(200),
        };

        assert!(fin1 < fin2);
        assert!(fin2 > fin1);
        assert_eq!(fin2.cmp(&fin3), std::cmp::Ordering::Equal);

        // Notarized requests order by round.
        let not1 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(50)),
        };
        let not2 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(150)),
        };
        let not3 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(150)),
        };

        assert!(not1 < not2);
        assert!(not2 > not1);
        assert_eq!(not2.cmp(&not3), std::cmp::Ordering::Equal);
    }

    #[test]
    fn test_request_ord_cross_variant() {
        let commitment = Sha256::hash(b"test");
        let block = Request::<B>::Block(commitment);
        let finalized = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let notarized = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(200)),
        };

        // Across variants the order is Block < Finalized < Notarized.
        assert!(block < finalized);
        assert!(block < notarized);
        assert!(finalized < notarized);

        assert!(finalized > block);
        assert!(notarized > block);
        assert!(notarized > finalized);

        // `cmp` must agree with the comparison operators.
        assert_eq!(block.cmp(&finalized), std::cmp::Ordering::Less);
        assert_eq!(block.cmp(&notarized), std::cmp::Ordering::Less);
        assert_eq!(finalized.cmp(&notarized), std::cmp::Ordering::Less);
        assert_eq!(finalized.cmp(&block), std::cmp::Ordering::Greater);
        assert_eq!(notarized.cmp(&block), std::cmp::Ordering::Greater);
        assert_eq!(notarized.cmp(&finalized), std::cmp::Ordering::Greater);
    }

    #[test]
    fn test_request_partial_ord() {
        let commitment1 = Sha256::hash(b"test1");
        let commitment2 = Sha256::hash(b"test2");
        let block1 = Request::<B>::Block(commitment1);
        let block2 = Request::<B>::Block(commitment2);
        let finalized = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let notarized = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(200)),
        };

        // `partial_cmp` always yields an ordering.
        assert!(block1.partial_cmp(&block2).is_some());
        assert!(block1.partial_cmp(&finalized).is_some());
        assert!(finalized.partial_cmp(&notarized).is_some());

        // ...and that ordering matches `cmp`.
        assert_eq!(
            block1.partial_cmp(&finalized),
            Some(std::cmp::Ordering::Less)
        );
        assert_eq!(
            finalized.partial_cmp(&notarized),
            Some(std::cmp::Ordering::Less)
        );
        assert_eq!(
            notarized.partial_cmp(&block1),
            Some(std::cmp::Ordering::Greater)
        );
    }

    #[test]
    fn test_request_ord_sorting() {
        let commitment1 = Sha256::hash(b"a");
        let commitment2 = Sha256::hash(b"b");
        let commitment3 = Sha256::hash(b"c");

        let requests = vec![
            Request::<B>::Notarized {
                round: Round::new(Epoch::new(333), View::new(300)),
            },
            Request::<B>::Block(commitment2),
            Request::<B>::Finalized {
                height: Height::new(200),
            },
            Request::<B>::Block(commitment1),
            Request::<B>::Notarized {
                round: Round::new(Epoch::new(333), View::new(250)),
            },
            Request::<B>::Finalized {
                height: Height::new(100),
            },
            Request::<B>::Block(commitment3),
        ];

        // Sort by passing the requests through a `BTreeSet`.
        let sorted: Vec<_> = requests
            .into_iter()
            .collect::<BTreeSet<_>>()
            .into_iter()
            .collect();

        // All seven requests are distinct, so none are dropped.
        assert_eq!(sorted.len(), 7);

        // Blocks come first.
        assert!(matches!(sorted[0], Request::<B>::Block(_)));
        assert!(matches!(sorted[1], Request::<B>::Block(_)));
        assert!(matches!(sorted[2], Request::<B>::Block(_)));

        // Then finalized requests by ascending height.
        assert_eq!(
            sorted[3],
            Request::<B>::Finalized {
                height: Height::new(100)
            }
        );
        assert_eq!(
            sorted[4],
            Request::<B>::Finalized {
                height: Height::new(200)
            }
        );

        // Then notarized requests by ascending round.
        assert_eq!(
            sorted[5],
            Request::<B>::Notarized {
                round: Round::new(Epoch::new(333), View::new(250))
            }
        );
        assert_eq!(
            sorted[6],
            Request::<B>::Notarized {
                round: Round::new(Epoch::new(333), View::new(300))
            }
        );
    }

    #[test]
    fn test_request_ord_edge_cases() {
        // Extreme payload values.
        let min_finalized = Request::<B>::Finalized {
            height: Height::zero(),
        };
        let max_finalized = Request::<B>::Finalized {
            height: Height::new(u64::MAX),
        };
        let min_notarized = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(0)),
        };
        let max_notarized = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(u64::MAX)),
        };

        assert!(min_finalized < max_finalized);
        assert!(min_notarized < max_notarized);
        // The variant decides before the payload does.
        assert!(max_finalized < min_notarized);

        // Every request compares equal to itself.
        let commitment = Sha256::hash(b"self");
        let block = Request::<B>::Block(commitment);
        assert_eq!(block.cmp(&block), std::cmp::Ordering::Equal);
        assert_eq!(min_finalized.cmp(&min_finalized), std::cmp::Ordering::Equal);
        assert_eq!(max_notarized.cmp(&max_notarized), std::cmp::Ordering::Equal);
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Request<B>>,
        }
    }
}