commonware_resolver/p2p/
ingress.rs

1use crate::Resolver;
2use commonware_cryptography::PublicKey;
3use commonware_utils::{vec::NonEmptyVec, Span};
4use futures::{channel::mpsc, SinkExt};
5
/// Boxed, sendable predicate over a borrowed key, carried by [`Message::Retain`].
///
/// The `'static` bound is the default for a boxed trait object; it is spelled out
/// here to make the ownership requirement visible at the alias.
type Predicate<K> = Box<dyn Fn(&K) -> bool + Send + 'static>;
7
8/// A request to fetch data for a key, optionally with target peers.
9pub struct FetchRequest<K, P> {
10    /// The key to fetch.
11    pub key: K,
12    /// Target peers to restrict the fetch to.
13    ///
14    /// - `None`: No targeting (or clear existing targeting), try any available peer
15    /// - `Some(peers)`: Only try the specified peers
16    pub targets: Option<NonEmptyVec<P>>,
17}
18
19/// Messages that can be sent to the peer actor.
20pub enum Message<K, P> {
21    /// Initiate fetch requests.
22    Fetch(Vec<FetchRequest<K, P>>),
23
24    /// Cancel a fetch request by key.
25    Cancel { key: K },
26
27    /// Cancel all fetch requests.
28    Clear,
29
30    /// Cancel all fetch requests that do not satisfy the predicate.
31    Retain { predicate: Predicate<K> },
32}
33
34/// A way to send messages to the peer actor.
35#[derive(Clone)]
36pub struct Mailbox<K, P> {
37    /// The channel that delivers messages to the peer actor.
38    sender: mpsc::Sender<Message<K, P>>,
39}
40
41impl<K, P> Mailbox<K, P> {
42    /// Create a new mailbox.
43    pub(super) const fn new(sender: mpsc::Sender<Message<K, P>>) -> Self {
44        Self { sender }
45    }
46}
47
48impl<K: Span, P: PublicKey> Resolver for Mailbox<K, P> {
49    type Key = K;
50    type PublicKey = P;
51
52    /// Send a fetch request to the peer actor.
53    ///
54    /// If a fetch is already in progress for this key, this clears any existing
55    /// targets for that key (the fetch will try any available peer).
56    ///
57    /// Panics if the send fails.
58    async fn fetch(&mut self, key: Self::Key) {
59        self.sender
60            .send(Message::Fetch(vec![FetchRequest { key, targets: None }]))
61            .await
62            .expect("Failed to send fetch");
63    }
64
65    /// Send a fetch request to the peer actor for a batch of keys.
66    ///
67    /// If a fetch is already in progress for any key, this clears any existing
68    /// targets for that key (the fetch will try any available peer).
69    ///
70    /// Panics if the send fails.
71    async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
72        self.sender
73            .send(Message::Fetch(
74                keys.into_iter()
75                    .map(|key| FetchRequest { key, targets: None })
76                    .collect(),
77            ))
78            .await
79            .expect("Failed to send fetch_all");
80    }
81
82    async fn fetch_targeted(&mut self, key: Self::Key, targets: NonEmptyVec<Self::PublicKey>) {
83        self.sender
84            .send(Message::Fetch(vec![FetchRequest {
85                key,
86                targets: Some(targets),
87            }]))
88            .await
89            .expect("Failed to send fetch_targeted");
90    }
91
92    async fn fetch_all_targeted(
93        &mut self,
94        requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
95    ) {
96        self.sender
97            .send(Message::Fetch(
98                requests
99                    .into_iter()
100                    .map(|(key, targets)| FetchRequest {
101                        key,
102                        targets: Some(targets),
103                    })
104                    .collect(),
105            ))
106            .await
107            .expect("Failed to send fetch_all_targeted");
108    }
109
110    /// Send a cancel request to the peer actor.
111    ///
112    /// Panics if the send fails.
113    async fn cancel(&mut self, key: Self::Key) {
114        self.sender
115            .send(Message::Cancel { key })
116            .await
117            .expect("Failed to send cancel_fetch");
118    }
119
120    /// Send a cancel all request to the peer actor.
121    ///
122    /// Panics if the send fails.
123    async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
124        self.sender
125            .send(Message::Retain {
126                predicate: Box::new(predicate),
127            })
128            .await
129            .expect("Failed to send retain");
130    }
131
132    /// Send a clear request to the peer actor.
133    ///
134    /// Panics if the send fails.
135    async fn clear(&mut self) {
136        self.sender
137            .send(Message::Clear)
138            .await
139            .expect("Failed to send cancel_all");
140    }
141}