commonware_resolver/p2p/
ingress.rs

1use crate::Resolver;
2use commonware_cryptography::PublicKey;
3use commonware_utils::{channels::fallible::AsyncFallibleExt, vec::NonEmptyVec, Span};
4use futures::channel::mpsc;
5
/// Boxed key filter carried by [`Message::Retain`].
///
/// The closure is `Send` so the box can be moved through the mailbox channel.
type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
7
8/// A request to fetch data for a key, optionally with target peers.
9pub struct FetchRequest<K, P> {
10    /// The key to fetch.
11    pub key: K,
12    /// Target peers to restrict the fetch to.
13    ///
14    /// - `None`: No targeting (or clear existing targeting), try any available peer
15    /// - `Some(peers)`: Only try the specified peers
16    pub targets: Option<NonEmptyVec<P>>,
17}
18
19/// Messages that can be sent to the peer actor.
20pub enum Message<K, P> {
21    /// Initiate fetch requests.
22    Fetch(Vec<FetchRequest<K, P>>),
23
24    /// Cancel a fetch request by key.
25    Cancel { key: K },
26
27    /// Cancel all fetch requests.
28    Clear,
29
30    /// Cancel all fetch requests that do not satisfy the predicate.
31    Retain { predicate: Predicate<K> },
32}
33
34/// A way to send messages to the peer actor.
35#[derive(Clone)]
36pub struct Mailbox<K, P> {
37    /// The channel that delivers messages to the peer actor.
38    sender: mpsc::Sender<Message<K, P>>,
39}
40
41impl<K, P> Mailbox<K, P> {
42    /// Create a new mailbox.
43    pub(super) const fn new(sender: mpsc::Sender<Message<K, P>>) -> Self {
44        Self { sender }
45    }
46}
47
48impl<K: Span, P: PublicKey> Resolver for Mailbox<K, P> {
49    type Key = K;
50    type PublicKey = P;
51
52    /// Send a fetch request to the peer actor.
53    ///
54    /// If a fetch is already in progress for this key, this clears any existing
55    /// targets for that key (the fetch will try any available peer).
56    ///
57    /// If the engine has shut down, this is a no-op.
58    async fn fetch(&mut self, key: Self::Key) {
59        self.sender
60            .send_lossy(Message::Fetch(vec![FetchRequest { key, targets: None }]))
61            .await;
62    }
63
64    /// Send a fetch request to the peer actor for a batch of keys.
65    ///
66    /// If a fetch is already in progress for any key, this clears any existing
67    /// targets for that key (the fetch will try any available peer).
68    ///
69    /// If the engine has shut down, this is a no-op.
70    async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
71        self.sender
72            .send_lossy(Message::Fetch(
73                keys.into_iter()
74                    .map(|key| FetchRequest { key, targets: None })
75                    .collect(),
76            ))
77            .await;
78    }
79
80    /// Send a targeted fetch request to the peer actor.
81    ///
82    /// If the engine has shut down, this is a no-op.
83    async fn fetch_targeted(&mut self, key: Self::Key, targets: NonEmptyVec<Self::PublicKey>) {
84        self.sender
85            .send_lossy(Message::Fetch(vec![FetchRequest {
86                key,
87                targets: Some(targets),
88            }]))
89            .await;
90    }
91
92    /// Send targeted fetch requests to the peer actor for a batch of keys.
93    ///
94    /// If the engine has shut down, this is a no-op.
95    async fn fetch_all_targeted(
96        &mut self,
97        requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
98    ) {
99        self.sender
100            .send_lossy(Message::Fetch(
101                requests
102                    .into_iter()
103                    .map(|(key, targets)| FetchRequest {
104                        key,
105                        targets: Some(targets),
106                    })
107                    .collect(),
108            ))
109            .await;
110    }
111
112    /// Send a cancel request to the peer actor.
113    ///
114    /// If the engine has shut down, this is a no-op.
115    async fn cancel(&mut self, key: Self::Key) {
116        self.sender.send_lossy(Message::Cancel { key }).await;
117    }
118
119    /// Send a retain request to the peer actor.
120    ///
121    /// If the engine has shut down, this is a no-op.
122    async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
123        self.sender
124            .send_lossy(Message::Retain {
125                predicate: Box::new(predicate),
126            })
127            .await;
128    }
129
130    /// Send a clear request to the peer actor.
131    ///
132    /// If the engine has shut down, this is a no-op.
133    async fn clear(&mut self) {
134        self.sender.send_lossy(Message::Clear).await;
135    }
136}