1use crate::{
2 types::{Height, Round},
3 Block,
4};
5use bytes::{Buf, BufMut, Bytes};
6use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt, Write};
7use commonware_resolver::{p2p::Producer, Consumer};
8use commonware_utils::Span;
9use futures::{
10 channel::{mpsc, oneshot},
11 SinkExt,
12};
13use std::{
14 fmt::{Debug, Display},
15 hash::{Hash, Hasher},
16};
17use tracing::error;
18
/// Subject tag for [`Request::Block`]. The tag is the first byte of an encoded
/// [`Request`] and also orders requests of different variants.
const BLOCK_REQUEST: u8 = 0;
/// Subject tag for [`Request::Finalized`].
const FINALIZED_REQUEST: u8 = 1;
/// Subject tag for [`Request::Notarized`].
const NOTARIZED_REQUEST: u8 = 2;
23
/// Messages a [`Handler`] pushes onto its channel.
pub enum Message<B: Block> {
    /// Emitted by the handler's `Consumer::deliver` implementation.
    Deliver {
        /// The request the delivered bytes belong to.
        key: Request<B>,
        /// The delivered bytes.
        value: Bytes,
        /// Channel on which a `bool` verdict is sent back; the handler returns it
        /// to its caller (or `false` if this sender is dropped without answering).
        // NOTE(review): presumably `true` means the value was accepted — confirm with the receiving actor.
        response: oneshot::Sender<bool>,
    },
    /// Emitted by the handler's `Producer::produce` implementation.
    Produce {
        /// The request to produce bytes for.
        key: Request<B>,
        /// Channel on which the produced bytes are sent back.
        response: oneshot::Sender<Bytes>,
    },
}
44
/// Cloneable handle that turns resolver callbacks into [`Message`]s sent over
/// an mpsc channel.
#[derive(Clone)]
pub struct Handler<B: Block> {
    /// Sending half of the channel on which [`Message`]s are pushed.
    sender: mpsc::Sender<Message<B>>,
}
53
54impl<B: Block> Handler<B> {
55 pub const fn new(sender: mpsc::Sender<Message<B>>) -> Self {
57 Self { sender }
58 }
59}
60
61impl<B: Block> Consumer for Handler<B> {
62 type Key = Request<B>;
63 type Value = Bytes;
64 type Failure = ();
65
66 async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool {
67 let (response, receiver) = oneshot::channel();
68 if self
69 .sender
70 .send(Message::Deliver {
71 key,
72 value,
73 response,
74 })
75 .await
76 .is_err()
77 {
78 error!("failed to send deliver message to actor: receiver dropped");
79 return false;
80 }
81 receiver.await.unwrap_or(false)
82 }
83
84 async fn failed(&mut self, _: Self::Key, _: Self::Failure) {
85 }
87}
88
89impl<B: Block> Producer for Handler<B> {
90 type Key = Request<B>;
91
92 async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
93 let (response, receiver) = oneshot::channel();
94 if self
95 .sender
96 .send(Message::Produce { key, response })
97 .await
98 .is_err()
99 {
100 error!("failed to send produce message to actor: receiver dropped");
101 }
102 receiver
103 }
104}
105
/// Key describing what is being requested; used as the `Key` type of the
/// `Consumer` and `Producer` implementations on [`Handler`].
#[derive(Clone)]
pub enum Request<B: Block> {
    /// Request identified by a block commitment.
    Block(B::Commitment),
    /// Request identified by a height.
    // NOTE(review): presumably the finalization at `height` — confirm.
    Finalized { height: Height },
    /// Request identified by a round.
    // NOTE(review): presumably the notarization for `round` — confirm.
    Notarized { round: Round },
}
113
114impl<B: Block> Request<B> {
115 const fn subject(&self) -> u8 {
117 match self {
118 Self::Block(_) => BLOCK_REQUEST,
119 Self::Finalized { .. } => FINALIZED_REQUEST,
120 Self::Notarized { .. } => NOTARIZED_REQUEST,
121 }
122 }
123
124 pub fn predicate(&self) -> impl Fn(&Self) -> bool + Send + 'static {
129 let cloned = self.clone();
130 move |s| match (&cloned, &s) {
131 (Self::Block(_), _) => unreachable!("we should never retain by block"),
132 (Self::Finalized { height: mine }, Self::Finalized { height: theirs }) => {
133 *theirs > *mine
134 }
135 (Self::Finalized { .. }, _) => true,
136 (Self::Notarized { round: mine }, Self::Notarized { round: theirs }) => *theirs > *mine,
137 (Self::Notarized { .. }, _) => true,
138 }
139 }
140}
141
142impl<B: Block> Write for Request<B> {
143 fn write(&self, buf: &mut impl BufMut) {
144 self.subject().write(buf);
145 match self {
146 Self::Block(commitment) => commitment.write(buf),
147 Self::Finalized { height } => height.write(buf),
148 Self::Notarized { round } => round.write(buf),
149 }
150 }
151}
152
153impl<B: Block> Read for Request<B> {
154 type Cfg = ();
155
156 fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
157 let request = match u8::read(buf)? {
158 BLOCK_REQUEST => Self::Block(B::Commitment::read(buf)?),
159 FINALIZED_REQUEST => Self::Finalized {
160 height: Height::read(buf)?,
161 },
162 NOTARIZED_REQUEST => Self::Notarized {
163 round: Round::read(buf)?,
164 },
165 i => return Err(CodecError::InvalidEnum(i)),
166 };
167 Ok(request)
168 }
169}
170
171impl<B: Block> EncodeSize for Request<B> {
172 fn encode_size(&self) -> usize {
173 1 + match self {
174 Self::Block(block) => block.encode_size(),
175 Self::Finalized { height } => height.encode_size(),
176 Self::Notarized { round } => round.encode_size(),
177 }
178 }
179}
180
// `Span` is implemented with an empty body: no items are defined or overridden here.
// NOTE(review): presumably required so `Request` can be used as the resolver key type — confirm.
impl<B: Block> Span for Request<B> {}
182
183impl<B: Block> PartialEq for Request<B> {
184 fn eq(&self, other: &Self) -> bool {
185 match (&self, &other) {
186 (Self::Block(a), Self::Block(b)) => a == b,
187 (Self::Finalized { height: a }, Self::Finalized { height: b }) => a == b,
188 (Self::Notarized { round: a }, Self::Notarized { round: b }) => a == b,
189 _ => false,
190 }
191 }
192}
193
// Marker impl: declares the manual `PartialEq` for `Request` to be a full equivalence relation.
impl<B: Block> Eq for Request<B> {}
195
196impl<B: Block> Ord for Request<B> {
197 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
198 match (&self, &other) {
199 (Self::Block(a), Self::Block(b)) => a.cmp(b),
200 (Self::Finalized { height: a }, Self::Finalized { height: b }) => a.cmp(b),
201 (Self::Notarized { round: a }, Self::Notarized { round: b }) => a.cmp(b),
202 (a, b) => a.subject().cmp(&b.subject()),
203 }
204 }
205}
206
207impl<B: Block> PartialOrd for Request<B> {
208 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
209 Some(self.cmp(other))
210 }
211}
212
213impl<B: Block> Hash for Request<B> {
214 fn hash<H: Hasher>(&self, state: &mut H) {
215 match self {
216 Self::Block(commitment) => commitment.hash(state),
217 Self::Finalized { height } => height.hash(state),
218 Self::Notarized { round } => round.hash(state),
219 }
220 }
221}
222
223impl<B: Block> Display for Request<B> {
224 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225 match self {
226 Self::Block(commitment) => write!(f, "Block({commitment:?})"),
227 Self::Finalized { height } => write!(f, "Finalized({height:?})"),
228 Self::Notarized { round } => write!(f, "Notarized({round:?})"),
229 }
230 }
231}
232
233impl<B: Block> Debug for Request<B> {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 match self {
236 Self::Block(commitment) => write!(f, "Block({commitment:?})"),
237 Self::Finalized { height } => write!(f, "Finalized({height:?})"),
238 Self::Notarized { round } => write!(f, "Notarized({round:?})"),
239 }
240 }
241}
242
#[cfg(feature = "arbitrary")]
impl<B: Block> arbitrary::Arbitrary<'_> for Request<B>
where
    B::Commitment: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Picks one of the three variants from a value drawn in `0..=2`, then generates
    /// that variant's payload from `u`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let request = match u.int_in_range(0..=2)? {
            0 => Self::Block(B::Commitment::arbitrary(u)?),
            1 => Self::Finalized {
                height: u.arbitrary::<Height>()?,
            },
            2 => Self::Notarized {
                round: Round::arbitrary(u)?,
            },
            // `int_in_range(0..=2)` is asked for a value in 0..=2 only.
            _ => unreachable!(),
        };
        Ok(request)
    }
}
267
#[cfg(test)]
mod tests {
    //! Unit tests for [`Request`]: codec round-trips, hashing, the retention
    //! predicate, and ordering.

    use super::*;
    use crate::{
        marshal::mocks::block::Block as TestBlock,
        types::{Epoch, View},
    };
    use commonware_codec::{Encode, ReadExt};
    use commonware_cryptography::{
        sha256::{Digest as Sha256Digest, Sha256},
        Hasher as _,
    };
    use std::collections::BTreeSet;

    type B = TestBlock<Sha256Digest>;

    #[test]
    fn test_subject_block_encoding() {
        let commitment = Sha256::hash(b"test");
        let request = Request::<B>::Block(commitment);

        let encoded = request.encode();
        // 1 tag byte + 32-byte digest.
        assert_eq!(encoded.len(), 33);
        // Block tag.
        assert_eq!(encoded[0], 0);

        // Round-trip.
        let mut buf = encoded.as_ref();
        let decoded = Request::<B>::read(&mut buf).unwrap();
        assert_eq!(request, decoded);
        assert_eq!(decoded, Request::Block(commitment));
    }

    #[test]
    fn test_subject_finalized_encoding() {
        let height = Height::new(12345);
        let request = Request::<B>::Finalized { height };

        let encoded = request.encode();
        // Finalized tag.
        assert_eq!(encoded[0], 1);

        // Round-trip.
        let mut buf = encoded.as_ref();
        let decoded = Request::<B>::read(&mut buf).unwrap();
        assert_eq!(request, decoded);
        assert_eq!(decoded, Request::Finalized { height });
    }

    #[test]
    fn test_subject_notarized_encoding() {
        let round = Round::new(Epoch::new(67890), View::new(12345));
        let request = Request::<B>::Notarized { round };

        let encoded = request.encode();
        // Notarized tag.
        assert_eq!(encoded[0], 2);

        // Round-trip.
        let mut buf = encoded.as_ref();
        let decoded = Request::<B>::read(&mut buf).unwrap();
        assert_eq!(request, decoded);
        assert_eq!(decoded, Request::Notarized { round });
    }

    #[test]
    fn test_subject_hash() {
        use std::collections::HashSet;

        let r1 = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let r2 = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let r3 = Request::<B>::Finalized {
            height: Height::new(200),
        };

        let mut set = HashSet::new();
        set.insert(r1);
        // Equal request: already present.
        assert!(!set.insert(r2));
        // Different height: newly inserted.
        assert!(set.insert(r3));
    }

    #[test]
    fn test_subject_predicate() {
        let r1 = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let r2 = Request::<B>::Finalized {
            height: Height::new(200),
        };
        let r3 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(150)),
        };

        let predicate = r1.predicate();
        // Same variant, strictly greater height.
        assert!(predicate(&r2));
        // Different variant.
        assert!(predicate(&r3));

        // Same variant, same height.
        let r1_same = Request::<B>::Finalized {
            height: Height::new(100),
        };
        assert!(!predicate(&r1_same));
    }

    #[test]
    fn test_encode_size() {
        let commitment = Sha256::hash(&[0u8; 32]);
        let r1 = Request::<B>::Block(commitment);
        let r2 = Request::<B>::Finalized {
            height: Height::new(u64::MAX),
        };
        let r3 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(0)),
        };

        // The reported size must match the actual encoding length.
        assert_eq!(r1.encode_size(), r1.encode().len());
        assert_eq!(r2.encode_size(), r2.encode().len());
        assert_eq!(r3.encode_size(), r3.encode().len());
    }

    #[test]
    fn test_request_ord_same_variant() {
        // Block requests order by commitment.
        let commitment1 = Sha256::hash(b"test1");
        let commitment2 = Sha256::hash(b"test2");
        let block1 = Request::<B>::Block(commitment1);
        let block2 = Request::<B>::Block(commitment2);

        if commitment1 < commitment2 {
            assert!(block1 < block2);
            assert!(block2 > block1);
        } else {
            assert!(block1 > block2);
            assert!(block2 < block1);
        }

        // Finalized requests order by height.
        let fin1 = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let fin2 = Request::<B>::Finalized {
            height: Height::new(200),
        };
        let fin3 = Request::<B>::Finalized {
            height: Height::new(200),
        };

        assert!(fin1 < fin2);
        assert!(fin2 > fin1);
        assert_eq!(fin2.cmp(&fin3), std::cmp::Ordering::Equal);

        // Notarized requests order by round.
        let not1 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(50)),
        };
        let not2 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(150)),
        };
        let not3 = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(150)),
        };

        assert!(not1 < not2);
        assert!(not2 > not1);
        assert_eq!(not2.cmp(&not3), std::cmp::Ordering::Equal);
    }

    #[test]
    fn test_request_ord_cross_variant() {
        let commitment = Sha256::hash(b"test");
        let block = Request::<B>::Block(commitment);
        let finalized = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let notarized = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(200)),
        };

        // Block < Finalized < Notarized.
        assert!(block < finalized);
        assert!(block < notarized);
        assert!(finalized < notarized);

        assert!(finalized > block);
        assert!(notarized > block);
        assert!(notarized > finalized);

        // The same relations through `cmp`.
        assert_eq!(block.cmp(&finalized), std::cmp::Ordering::Less);
        assert_eq!(block.cmp(&notarized), std::cmp::Ordering::Less);
        assert_eq!(finalized.cmp(&notarized), std::cmp::Ordering::Less);
        assert_eq!(finalized.cmp(&block), std::cmp::Ordering::Greater);
        assert_eq!(notarized.cmp(&block), std::cmp::Ordering::Greater);
        assert_eq!(notarized.cmp(&finalized), std::cmp::Ordering::Greater);
    }

    #[test]
    fn test_request_partial_ord() {
        let commitment1 = Sha256::hash(b"test1");
        let commitment2 = Sha256::hash(b"test2");
        let block1 = Request::<B>::Block(commitment1);
        let block2 = Request::<B>::Block(commitment2);
        let finalized = Request::<B>::Finalized {
            height: Height::new(100),
        };
        let notarized = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(200)),
        };

        // `partial_cmp` always yields an ordering.
        assert!(block1.partial_cmp(&block2).is_some());
        assert!(block1.partial_cmp(&finalized).is_some());
        assert!(finalized.partial_cmp(&notarized).is_some());

        // And it agrees with the cross-variant order.
        assert_eq!(
            block1.partial_cmp(&finalized),
            Some(std::cmp::Ordering::Less)
        );
        assert_eq!(
            finalized.partial_cmp(&notarized),
            Some(std::cmp::Ordering::Less)
        );
        assert_eq!(
            notarized.partial_cmp(&block1),
            Some(std::cmp::Ordering::Greater)
        );
    }

    #[test]
    fn test_request_ord_sorting() {
        let commitment1 = Sha256::hash(b"a");
        let commitment2 = Sha256::hash(b"b");
        let commitment3 = Sha256::hash(b"c");

        let requests = vec![
            Request::<B>::Notarized {
                round: Round::new(Epoch::new(333), View::new(300)),
            },
            Request::<B>::Block(commitment2),
            Request::<B>::Finalized {
                height: Height::new(200),
            },
            Request::<B>::Block(commitment1),
            Request::<B>::Notarized {
                round: Round::new(Epoch::new(333), View::new(250)),
            },
            Request::<B>::Finalized {
                height: Height::new(100),
            },
            Request::<B>::Block(commitment3),
        ];

        // Sort by passing through a BTreeSet.
        let sorted: Vec<_> = requests
            .into_iter()
            .collect::<BTreeSet<_>>()
            .into_iter()
            .collect();

        // All seven requests are distinct.
        assert_eq!(sorted.len(), 7);

        // Blocks come first.
        assert!(matches!(sorted[0], Request::<B>::Block(_)));
        assert!(matches!(sorted[1], Request::<B>::Block(_)));
        assert!(matches!(sorted[2], Request::<B>::Block(_)));

        // Then finalized requests, by height.
        assert_eq!(
            sorted[3],
            Request::<B>::Finalized {
                height: Height::new(100)
            }
        );
        assert_eq!(
            sorted[4],
            Request::<B>::Finalized {
                height: Height::new(200)
            }
        );

        // Then notarized requests, by round.
        assert_eq!(
            sorted[5],
            Request::<B>::Notarized {
                round: Round::new(Epoch::new(333), View::new(250))
            }
        );
        assert_eq!(
            sorted[6],
            Request::<B>::Notarized {
                round: Round::new(Epoch::new(333), View::new(300))
            }
        );
    }

    #[test]
    fn test_request_ord_edge_cases() {
        // Extreme payload values.
        let min_finalized = Request::<B>::Finalized {
            height: Height::zero(),
        };
        let max_finalized = Request::<B>::Finalized {
            height: Height::new(u64::MAX),
        };
        let min_notarized = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(0)),
        };
        let max_notarized = Request::<B>::Notarized {
            round: Round::new(Epoch::new(333), View::new(u64::MAX)),
        };

        assert!(min_finalized < max_finalized);
        assert!(min_notarized < max_notarized);
        // The variant decides before the payload does.
        assert!(max_finalized < min_notarized);

        // Comparing a request with itself is always `Equal`.
        let commitment = Sha256::hash(b"self");
        let block = Request::<B>::Block(commitment);
        assert_eq!(block.cmp(&block), std::cmp::Ordering::Equal);
        assert_eq!(min_finalized.cmp(&min_finalized), std::cmp::Ordering::Equal);
        assert_eq!(max_notarized.cmp(&max_notarized), std::cmp::Ordering::Equal);
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Request<B>>,
        }
    }
}