1use crate::{ingress, Fetch, Resolver, TargetedResolver};
2use commonware_actor::{mailbox::Sender, Feedback};
3use commonware_cryptography::PublicKey;
4use commonware_utils::{vec::NonEmptyVec, Span};
5
6pub type FetchKey<K, P, S> = ingress::FetchKey<K, S, Option<NonEmptyVec<P>>>;
8
9pub type Message<K, P, S> = ingress::Message<K, S, Option<NonEmptyVec<P>>>;
11
12fn fetch_key<K, P, S>(fetch: Fetch<K, S>, targets: Option<NonEmptyVec<P>>) -> FetchKey<K, P, S> {
13 FetchKey {
14 key: fetch.key,
15 subscribers: NonEmptyVec::new(fetch.subscriber),
16 metadata: targets,
17 }
18}
19
20#[derive(Clone)]
22pub struct Mailbox<K: Span, P: Eq, S: Eq = ()> {
23 sender: Sender<Message<K, P, S>>,
25}
26
27impl<K: Span, P: Eq, S: Eq> Mailbox<K, P, S> {
28 pub(super) const fn new(sender: Sender<Message<K, P, S>>) -> Self {
30 Self { sender }
31 }
32}
33
34impl<K, P, S> Resolver for Mailbox<K, P, S>
35where
36 K: Span,
37 P: PublicKey,
38 S: Clone + Eq + Send + 'static,
39{
40 type Key = K;
41 type Subscriber = S;
42
43 fn fetch<D>(&mut self, key: D) -> Feedback
50 where
51 D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
52 {
53 let Fetch { key, subscriber } = key.into();
54 self.sender.enqueue(Message::Fetch(vec![FetchKey {
55 key,
56 subscribers: NonEmptyVec::new(subscriber),
57 metadata: None,
58 }]))
59 }
60
61 fn fetch_all<D>(&mut self, keys: Vec<D>) -> Feedback
68 where
69 D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
70 {
71 self.sender.enqueue(Message::Fetch(
72 keys.into_iter()
73 .map(|key| fetch_key(key.into(), None))
74 .collect(),
75 ))
76 }
77
78 fn retain(
82 &mut self,
83 predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
84 ) -> Feedback {
85 self.sender.enqueue(Message::Retain {
86 predicate: Box::new(predicate),
87 })
88 }
89}
90
91impl<K, P, S> TargetedResolver for Mailbox<K, P, S>
92where
93 K: Span,
94 P: PublicKey,
95 S: Clone + Eq + Send + 'static,
96{
97 type PublicKey = P;
98
99 fn fetch_targeted(
113 &mut self,
114 key: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
115 targets: NonEmptyVec<Self::PublicKey>,
116 ) -> Feedback {
117 let Fetch { key, subscriber } = key.into();
118 self.sender.enqueue(Message::Fetch(vec![FetchKey {
119 key,
120 subscribers: NonEmptyVec::new(subscriber),
121 metadata: Some(targets),
122 }]))
123 }
124
125 fn fetch_all_targeted<D>(&mut self, keys: Vec<(D, NonEmptyVec<Self::PublicKey>)>) -> Feedback
129 where
130 D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
131 {
132 self.sender.enqueue(Message::Fetch(
133 keys.into_iter()
134 .map(|(key, targets)| fetch_key(key.into(), Some(targets)))
135 .collect(),
136 ))
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_actor::mailbox::{Overflow, Policy};

    type TestMessage = Message<u8, u8, u16>;
    type TestPending = ingress::Pending<u8, u16, Option<NonEmptyVec<u8>>>;
    type TestFetchKey = FetchKey<u8, u8, u16>;

    /// Fetch message holding one entry with a single subscriber.
    fn fetch(key: u8, subscriber: u16, targets: Option<NonEmptyVec<u8>>) -> TestMessage {
        let entry = TestFetchKey {
            key,
            subscribers: NonEmptyVec::new(subscriber),
            metadata: targets,
        };
        Message::Fetch(vec![entry])
    }

    /// Fetch message holding one entry with the given subscriber list.
    fn fetch_with_subscribers(
        key: u8,
        subscribers: Vec<u16>,
        targets: Option<NonEmptyVec<u8>>,
    ) -> TestMessage {
        let entry = TestFetchKey {
            key,
            subscribers: NonEmptyVec::from_unchecked(subscribers),
            metadata: targets,
        };
        Message::Fetch(vec![entry])
    }

    /// Retain message wrapping `predicate`.
    fn retain(predicate: impl Fn(&u8, &u16) -> bool + Send + 'static) -> TestMessage {
        Message::Retain {
            predicate: Box::new(predicate),
        }
    }

    /// Predicate that is true only for the subscriber equal to `value`.
    fn subscriber_is(value: u16) -> impl Fn(&u8, &u16) -> bool + Send {
        move |_key, subscriber| value == *subscriber
    }

    /// Target list built from a slice of peers.
    fn targets(values: &[u8]) -> NonEmptyVec<u8> {
        NonEmptyVec::from_unchecked(Vec::from(values))
    }

    /// Collects every message the pending state hands out while draining.
    fn drain(pending: &mut TestPending) -> Vec<TestMessage> {
        let mut drained = Vec::new();
        Overflow::drain(pending, |message| {
            drained.push(message);
            None
        });
        drained
    }

    /// Feeds `messages`, in order, into a fresh pending state and returns
    /// what draining it yields.
    fn handle_all(messages: Vec<TestMessage>) -> Vec<TestMessage> {
        let mut pending = TestPending::default();
        for message in messages {
            Policy::handle(&mut pending, message);
        }
        drain(&mut pending)
    }

    /// Entries of a fetch message; panics for any other message.
    fn fetch_entries(message: &TestMessage) -> &[TestFetchKey] {
        match message {
            Message::Fetch(entries) => entries.as_slice(),
            _ => panic!("expected fetch"),
        }
    }

    /// The sole entry of a fetch message; asserts there is exactly one.
    fn only_entry(message: &TestMessage) -> &TestFetchKey {
        let entries = fetch_entries(message);
        assert_eq!(entries.len(), 1);
        &entries[0]
    }

    fn assert_fetch(message: &TestMessage, expected_key: u8, expected_targets: Option<&[u8]>) {
        let entry = only_entry(message);
        assert_eq!(entry.key, expected_key);
        match (entry.metadata.as_ref(), expected_targets) {
            (Some(actual), Some(expected)) => assert_eq!(&actual[..], expected),
            (None, None) => {}
            _ => panic!("unexpected targets"),
        }
    }

    fn assert_fetch_keys(message: &TestMessage, expected: &[u8]) {
        let actual: Vec<_> = fetch_entries(message)
            .iter()
            .map(|entry| entry.key)
            .collect();
        assert_eq!(actual, expected);
    }

    fn assert_fetch_subscribers(
        message: &TestMessage,
        expected_key: u8,
        expected_subscribers: &[u16],
    ) {
        let entry = only_entry(message);
        assert_eq!(entry.key, expected_key);
        assert_eq!(&entry.subscribers[..], expected_subscribers);
    }

    #[test]
    fn targeted_fetches_for_same_key_are_merged() {
        let messages = handle_all(vec![
            fetch(1, 10, Some(targets(&[2, 3]))),
            fetch(1, 11, Some(targets(&[3, 4]))),
        ]);

        assert_eq!(messages.len(), 1);
        let merged = &messages[0];
        assert_fetch(merged, 1, Some(&[2, 3, 4]));
        assert_fetch_subscribers(merged, 1, &[10, 11]);
    }

    #[test]
    fn duplicate_fetches_for_same_key_merge_subscribers() {
        let messages = handle_all(vec![
            fetch_with_subscribers(1, vec![10, 11], None),
            fetch_with_subscribers(1, vec![11, 12], None),
        ]);

        assert_eq!(messages.len(), 1);
        assert_fetch_subscribers(&messages[0], 1, &[10, 11, 12]);
    }

    #[test]
    fn unrestricted_fetch_dominates_targeted_fetches() {
        let messages = handle_all(vec![
            fetch(1, 10, Some(targets(&[2]))),
            fetch(1, 11, None),
            fetch(1, 12, Some(targets(&[3]))),
        ]);

        assert_eq!(messages.len(), 1);
        assert_fetch(&messages[0], 1, None);
    }

    #[test]
    fn retain_removes_fetches_for_dropped_subscribers() {
        let messages = handle_all(vec![
            fetch(1, 10, None),
            fetch(2, 11, None),
            retain(subscriber_is(11)),
        ]);

        assert_eq!(messages.len(), 2);
        assert!(matches!(messages[0], Message::Retain { .. }));
        assert_fetch(&messages[1], 2, None);
    }

    #[test]
    fn retain_prunes_pending_fetch_subscribers() {
        let messages = handle_all(vec![
            fetch_with_subscribers(1, vec![10, 11], None),
            retain(subscriber_is(11)),
        ]);

        assert_eq!(messages.len(), 2);
        assert!(matches!(messages[0], Message::Retain { .. }));
        assert_fetch_subscribers(&messages[1], 1, &[11]);
    }

    #[test]
    fn retain_drops_pending_fetch_when_all_subscribers_are_dropped() {
        let messages = handle_all(vec![
            fetch_with_subscribers(1, vec![10, 11], None),
            retain(subscriber_is(12)),
        ]);

        assert_eq!(messages.len(), 1);
        assert!(matches!(messages[0], Message::Retain { .. }));
    }

    #[test]
    fn fetch_after_retain_is_retained_when_subscriber_is_dropped() {
        let messages = handle_all(vec![
            retain(|_, subscriber| *subscriber != 10),
            fetch(1, 10, None),
            fetch(2, 11, None),
        ]);

        assert_eq!(messages.len(), 2);
        assert!(matches!(messages[0], Message::Retain { .. }));
        assert_fetch_keys(&messages[1], &[1, 2]);
    }
}