commonware_resolver/p2p/
ingress.rs1use crate::Resolver;
2use commonware_cryptography::PublicKey;
3use commonware_utils::{vec::NonEmptyVec, Span};
4use futures::{channel::mpsc, SinkExt};
5
/// Boxed, `Send`-able predicate evaluated against a borrowed key.
///
/// This is the payload type of [`Message::Retain`]: a heap-allocated closure
/// that takes `&K` and returns `bool`.
type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
7
8pub struct FetchRequest<K, P> {
10 pub key: K,
12 pub targets: Option<NonEmptyVec<P>>,
17}
18
19pub enum Message<K, P> {
21 Fetch(Vec<FetchRequest<K, P>>),
23
24 Cancel { key: K },
26
27 Clear,
29
30 Retain { predicate: Predicate<K> },
32}
33
34#[derive(Clone)]
36pub struct Mailbox<K, P> {
37 sender: mpsc::Sender<Message<K, P>>,
39}
40
41impl<K, P> Mailbox<K, P> {
42 pub(super) const fn new(sender: mpsc::Sender<Message<K, P>>) -> Self {
44 Self { sender }
45 }
46}
47
48impl<K: Span, P: PublicKey> Resolver for Mailbox<K, P> {
49 type Key = K;
50 type PublicKey = P;
51
52 async fn fetch(&mut self, key: Self::Key) {
59 self.sender
60 .send(Message::Fetch(vec![FetchRequest { key, targets: None }]))
61 .await
62 .expect("Failed to send fetch");
63 }
64
65 async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
72 self.sender
73 .send(Message::Fetch(
74 keys.into_iter()
75 .map(|key| FetchRequest { key, targets: None })
76 .collect(),
77 ))
78 .await
79 .expect("Failed to send fetch_all");
80 }
81
82 async fn fetch_targeted(&mut self, key: Self::Key, targets: NonEmptyVec<Self::PublicKey>) {
83 self.sender
84 .send(Message::Fetch(vec![FetchRequest {
85 key,
86 targets: Some(targets),
87 }]))
88 .await
89 .expect("Failed to send fetch_targeted");
90 }
91
92 async fn fetch_all_targeted(
93 &mut self,
94 requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
95 ) {
96 self.sender
97 .send(Message::Fetch(
98 requests
99 .into_iter()
100 .map(|(key, targets)| FetchRequest {
101 key,
102 targets: Some(targets),
103 })
104 .collect(),
105 ))
106 .await
107 .expect("Failed to send fetch_all_targeted");
108 }
109
110 async fn cancel(&mut self, key: Self::Key) {
114 self.sender
115 .send(Message::Cancel { key })
116 .await
117 .expect("Failed to send cancel_fetch");
118 }
119
120 async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
124 self.sender
125 .send(Message::Retain {
126 predicate: Box::new(predicate),
127 })
128 .await
129 .expect("Failed to send retain");
130 }
131
132 async fn clear(&mut self) {
136 self.sender
137 .send(Message::Clear)
138 .await
139 .expect("Failed to send cancel_all");
140 }
141}