Skip to main content

commonware_resolver/p2p/
ingress.rs

1use crate::{ingress, Fetch, Resolver, TargetedResolver};
2use commonware_actor::{mailbox::Sender, Feedback};
3use commonware_cryptography::PublicKey;
4use commonware_utils::{vec::NonEmptyVec, Span};
5
6/// A key to fetch data for, optionally with target peers.
7pub type FetchKey<K, P, S> = ingress::FetchKey<K, S, Option<NonEmptyVec<P>>>;
8
9/// Messages that can be sent to the peer actor.
10pub type Message<K, P, S> = ingress::Message<K, S, Option<NonEmptyVec<P>>>;
11
12fn fetch_key<K, P, S>(fetch: Fetch<K, S>, targets: Option<NonEmptyVec<P>>) -> FetchKey<K, P, S> {
13    FetchKey {
14        key: fetch.key,
15        subscribers: NonEmptyVec::new(fetch.subscriber),
16        metadata: targets,
17    }
18}
19
20/// A way to send messages to the peer actor.
21#[derive(Clone)]
22pub struct Mailbox<K: Span, P: Eq, S: Eq = ()> {
23    /// The channel that delivers messages to the peer actor.
24    sender: Sender<Message<K, P, S>>,
25}
26
27impl<K: Span, P: Eq, S: Eq> Mailbox<K, P, S> {
28    /// Create a new mailbox.
29    pub(super) const fn new(sender: Sender<Message<K, P, S>>) -> Self {
30        Self { sender }
31    }
32}
33
34impl<K, P, S> Resolver for Mailbox<K, P, S>
35where
36    K: Span,
37    P: PublicKey,
38    S: Clone + Eq + Send + 'static,
39{
40    type Key = K;
41    type Subscriber = S;
42
43    /// Send a fetch to the peer actor.
44    ///
45    /// If a fetch is already in progress for this key, this clears any existing
46    /// targets for that key (the fetch will try any available peer).
47    ///
48    /// If the engine has shut down, this is a no-op.
49    fn fetch<D>(&mut self, key: D) -> Feedback
50    where
51        D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
52    {
53        let Fetch { key, subscriber } = key.into();
54        self.sender.enqueue(Message::Fetch(vec![FetchKey {
55            key,
56            subscribers: NonEmptyVec::new(subscriber),
57            metadata: None,
58        }]))
59    }
60
61    /// Send fetches to the peer actor for a batch of keys.
62    ///
63    /// If a fetch is already in progress for any key, this clears any existing
64    /// targets for that key (the fetch will try any available peer).
65    ///
66    /// If the engine has shut down, this is a no-op.
67    fn fetch_all<D>(&mut self, keys: Vec<D>) -> Feedback
68    where
69        D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
70    {
71        self.sender.enqueue(Message::Fetch(
72            keys.into_iter()
73                .map(|key| fetch_key(key.into(), None))
74                .collect(),
75        ))
76    }
77
78    /// Send a retain request to the peer actor.
79    ///
80    /// If the engine has shut down, this is a no-op.
81    fn retain(
82        &mut self,
83        predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
84    ) -> Feedback {
85        self.sender.enqueue(Message::Retain {
86            predicate: Box::new(predicate),
87        })
88    }
89}
90
91impl<K, P, S> TargetedResolver for Mailbox<K, P, S>
92where
93    K: Span,
94    P: PublicKey,
95    S: Clone + Eq + Send + 'static,
96{
97    type PublicKey = P;
98
99    /// Send a targeted fetch to the peer actor.
100    ///
101    /// If a fetch is already in progress for this key:
102    /// - If the existing fetch has targets, the new targets are added to the set.
103    /// - If the existing fetch has no targets, it remains unrestricted.
104    ///
105    /// To clear targeting and fall back to any peer, call [`fetch`](Self::fetch).
106    ///
107    /// Targets are automatically cleared when the fetch succeeds or is canceled.
108    /// When a peer is blocked for invalid data, only that peer is removed from
109    /// the target set.
110    ///
111    /// If the engine has shut down, this is a no-op.
112    fn fetch_targeted(
113        &mut self,
114        key: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
115        targets: NonEmptyVec<Self::PublicKey>,
116    ) -> Feedback {
117        let Fetch { key, subscriber } = key.into();
118        self.sender.enqueue(Message::Fetch(vec![FetchKey {
119            key,
120            subscribers: NonEmptyVec::new(subscriber),
121            metadata: Some(targets),
122        }]))
123    }
124
125    /// Send targeted fetches to the peer actor for a batch of keys.
126    ///
127    /// If the engine has shut down, this is a no-op.
128    fn fetch_all_targeted<D>(&mut self, keys: Vec<(D, NonEmptyVec<Self::PublicKey>)>) -> Feedback
129    where
130        D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
131    {
132        self.sender.enqueue(Message::Fetch(
133            keys.into_iter()
134                .map(|(key, targets)| fetch_key(key.into(), Some(targets)))
135                .collect(),
136        ))
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use super::*;
143    use commonware_actor::mailbox::{Overflow, Policy};
144
145    type TestMessage = Message<u8, u8, u16>;
146    type TestPending = ingress::Pending<u8, u16, Option<NonEmptyVec<u8>>>;
147
148    fn fetch(key: u8, subscriber: u16, targets: Option<NonEmptyVec<u8>>) -> TestMessage {
149        Message::Fetch(vec![FetchKey {
150            key,
151            subscribers: NonEmptyVec::new(subscriber),
152            metadata: targets,
153        }])
154    }
155
156    fn fetch_with_subscribers(
157        key: u8,
158        subscribers: Vec<u16>,
159        targets: Option<NonEmptyVec<u8>>,
160    ) -> TestMessage {
161        Message::Fetch(vec![FetchKey {
162            key,
163            subscribers: NonEmptyVec::from_unchecked(subscribers),
164            metadata: targets,
165        }])
166    }
167
168    fn subscriber_is(value: u16) -> impl Fn(&u8, &u16) -> bool + Send {
169        move |_, subscriber| *subscriber == value
170    }
171
172    fn targets(values: &[u8]) -> NonEmptyVec<u8> {
173        NonEmptyVec::from_unchecked(values.to_vec())
174    }
175
176    fn drain(pending: &mut TestPending) -> Vec<TestMessage> {
177        let mut messages = Vec::new();
178        Overflow::drain(pending, |message| {
179            messages.push(message);
180            None
181        });
182        messages
183    }
184
185    fn assert_fetch(message: &TestMessage, expected_key: u8, expected_targets: Option<&[u8]>) {
186        let Message::Fetch(keys) = message else {
187            panic!("expected fetch");
188        };
189        assert_eq!(keys.len(), 1);
190        assert_eq!(keys[0].key, expected_key);
191        match (&keys[0].metadata, expected_targets) {
192            (None, None) => {}
193            (Some(actual), Some(expected)) => assert_eq!(&actual[..], expected),
194            _ => panic!("unexpected targets"),
195        }
196    }
197
198    fn assert_fetch_keys(message: &TestMessage, expected: &[u8]) {
199        let Message::Fetch(keys) = message else {
200            panic!("expected fetch");
201        };
202        let actual: Vec<_> = keys.iter().map(|key| key.key).collect();
203        assert_eq!(actual, expected);
204    }
205
206    fn assert_fetch_subscribers(
207        message: &TestMessage,
208        expected_key: u8,
209        expected_subscribers: &[u16],
210    ) {
211        let Message::Fetch(keys) = message else {
212            panic!("expected fetch");
213        };
214        assert_eq!(keys.len(), 1);
215        assert_eq!(keys[0].key, expected_key);
216        assert_eq!(&keys[0].subscribers[..], expected_subscribers);
217    }
218
219    #[test]
220    fn targeted_fetches_for_same_key_are_merged() {
221        let mut pending = TestPending::default();
222
223        Policy::handle(&mut pending, fetch(1, 10, Some(targets(&[2, 3]))));
224        Policy::handle(&mut pending, fetch(1, 11, Some(targets(&[3, 4]))));
225
226        let messages = drain(&mut pending);
227        assert_eq!(messages.len(), 1);
228        assert_fetch(&messages[0], 1, Some(&[2, 3, 4]));
229        assert_fetch_subscribers(&messages[0], 1, &[10, 11]);
230    }
231
232    #[test]
233    fn duplicate_fetches_for_same_key_merge_subscribers() {
234        let mut pending = TestPending::default();
235
236        Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11], None));
237        Policy::handle(&mut pending, fetch_with_subscribers(1, vec![11, 12], None));
238
239        let messages = drain(&mut pending);
240        assert_eq!(messages.len(), 1);
241        assert_fetch_subscribers(&messages[0], 1, &[10, 11, 12]);
242    }
243
244    #[test]
245    fn unrestricted_fetch_dominates_targeted_fetches() {
246        let mut pending = TestPending::default();
247
248        Policy::handle(&mut pending, fetch(1, 10, Some(targets(&[2]))));
249        Policy::handle(&mut pending, fetch(1, 11, None));
250        Policy::handle(&mut pending, fetch(1, 12, Some(targets(&[3]))));
251
252        let messages = drain(&mut pending);
253        assert_eq!(messages.len(), 1);
254        assert_fetch(&messages[0], 1, None);
255    }
256
257    #[test]
258    fn retain_removes_fetches_for_dropped_subscribers() {
259        let mut pending = TestPending::default();
260
261        Policy::handle(&mut pending, fetch(1, 10, None));
262        Policy::handle(&mut pending, fetch(2, 11, None));
263        Policy::handle(
264            &mut pending,
265            Message::Retain {
266                predicate: Box::new(subscriber_is(11)),
267            },
268        );
269
270        let messages = drain(&mut pending);
271        assert_eq!(messages.len(), 2);
272        assert!(matches!(messages[0], Message::Retain { .. }));
273        assert_fetch(&messages[1], 2, None);
274    }
275
276    #[test]
277    fn retain_prunes_pending_fetch_subscribers() {
278        let mut pending = TestPending::default();
279
280        Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11], None));
281        Policy::handle(
282            &mut pending,
283            Message::Retain {
284                predicate: Box::new(subscriber_is(11)),
285            },
286        );
287
288        let messages = drain(&mut pending);
289        assert_eq!(messages.len(), 2);
290        assert!(matches!(messages[0], Message::Retain { .. }));
291        assert_fetch_subscribers(&messages[1], 1, &[11]);
292    }
293
294    #[test]
295    fn retain_drops_pending_fetch_when_all_subscribers_are_dropped() {
296        let mut pending = TestPending::default();
297
298        Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11], None));
299        Policy::handle(
300            &mut pending,
301            Message::Retain {
302                predicate: Box::new(subscriber_is(12)),
303            },
304        );
305
306        let messages = drain(&mut pending);
307        assert_eq!(messages.len(), 1);
308        assert!(matches!(messages[0], Message::Retain { .. }));
309    }
310
311    #[test]
312    fn fetch_after_retain_is_retained_when_subscriber_is_dropped() {
313        let mut pending = TestPending::default();
314
315        Policy::handle(
316            &mut pending,
317            Message::Retain {
318                predicate: Box::new(|_, subscriber| *subscriber != 10),
319            },
320        );
321        Policy::handle(&mut pending, fetch(1, 10, None));
322        Policy::handle(&mut pending, fetch(2, 11, None));
323
324        let messages = drain(&mut pending);
325        assert_eq!(messages.len(), 2);
326        assert!(matches!(messages[0], Message::Retain { .. }));
327        assert_fetch_keys(&messages[1], &[1, 2]);
328    }
329}