Skip to main content

commonware_resolver/p2p/
ingress.rs

1use crate::Resolver;
2use commonware_cryptography::PublicKey;
3use commonware_utils::{
4    channel::{fallible::AsyncFallibleExt, mpsc},
5    vec::NonEmptyVec,
6    Span,
7};
8
9type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
10
11/// A request to fetch data for a key, optionally with target peers.
12pub struct FetchRequest<K, P> {
13    /// The key to fetch.
14    pub key: K,
15    /// Target peers to restrict the fetch to.
16    ///
17    /// - `None`: No targeting (or clear existing targeting), try any available peer
18    /// - `Some(peers)`: Only try the specified peers
19    pub targets: Option<NonEmptyVec<P>>,
20}
21
22/// Messages that can be sent to the peer actor.
23pub enum Message<K, P> {
24    /// Initiate fetch requests.
25    Fetch(Vec<FetchRequest<K, P>>),
26
27    /// Cancel a fetch request by key.
28    Cancel { key: K },
29
30    /// Cancel all fetch requests.
31    Clear,
32
33    /// Cancel all fetch requests that do not satisfy the predicate.
34    Retain { predicate: Predicate<K> },
35}
36
37/// A way to send messages to the peer actor.
38#[derive(Clone)]
39pub struct Mailbox<K, P> {
40    /// The channel that delivers messages to the peer actor.
41    sender: mpsc::Sender<Message<K, P>>,
42}
43
44impl<K, P> Mailbox<K, P> {
45    /// Create a new mailbox.
46    pub(super) const fn new(sender: mpsc::Sender<Message<K, P>>) -> Self {
47        Self { sender }
48    }
49}
50
51impl<K: Span, P: PublicKey> Resolver for Mailbox<K, P> {
52    type Key = K;
53    type PublicKey = P;
54
55    /// Send a fetch request to the peer actor.
56    ///
57    /// If a fetch is already in progress for this key, this clears any existing
58    /// targets for that key (the fetch will try any available peer).
59    ///
60    /// If the engine has shut down, this is a no-op.
61    async fn fetch(&mut self, key: Self::Key) {
62        self.sender
63            .send_lossy(Message::Fetch(vec![FetchRequest { key, targets: None }]))
64            .await;
65    }
66
67    /// Send a fetch request to the peer actor for a batch of keys.
68    ///
69    /// If a fetch is already in progress for any key, this clears any existing
70    /// targets for that key (the fetch will try any available peer).
71    ///
72    /// If the engine has shut down, this is a no-op.
73    async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
74        self.sender
75            .send_lossy(Message::Fetch(
76                keys.into_iter()
77                    .map(|key| FetchRequest { key, targets: None })
78                    .collect(),
79            ))
80            .await;
81    }
82
83    /// Send a targeted fetch request to the peer actor.
84    ///
85    /// If the engine has shut down, this is a no-op.
86    async fn fetch_targeted(&mut self, key: Self::Key, targets: NonEmptyVec<Self::PublicKey>) {
87        self.sender
88            .send_lossy(Message::Fetch(vec![FetchRequest {
89                key,
90                targets: Some(targets),
91            }]))
92            .await;
93    }
94
95    /// Send targeted fetch requests to the peer actor for a batch of keys.
96    ///
97    /// If the engine has shut down, this is a no-op.
98    async fn fetch_all_targeted(
99        &mut self,
100        requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
101    ) {
102        self.sender
103            .send_lossy(Message::Fetch(
104                requests
105                    .into_iter()
106                    .map(|(key, targets)| FetchRequest {
107                        key,
108                        targets: Some(targets),
109                    })
110                    .collect(),
111            ))
112            .await;
113    }
114
115    /// Send a cancel request to the peer actor.
116    ///
117    /// If the engine has shut down, this is a no-op.
118    async fn cancel(&mut self, key: Self::Key) {
119        self.sender.send_lossy(Message::Cancel { key }).await;
120    }
121
122    /// Send a retain request to the peer actor.
123    ///
124    /// If the engine has shut down, this is a no-op.
125    async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
126        self.sender
127            .send_lossy(Message::Retain {
128                predicate: Box::new(predicate),
129            })
130            .await;
131    }
132
133    /// Send a clear request to the peer actor.
134    ///
135    /// If the engine has shut down, this is a no-op.
136    async fn clear(&mut self) {
137        self.sender.send_lossy(Message::Clear).await;
138    }
139}