commonware_resolver/p2p/ingress.rs
1use crate::Resolver;
2use commonware_cryptography::PublicKey;
3use commonware_utils::{channels::fallible::AsyncFallibleExt, vec::NonEmptyVec, Span};
4use futures::channel::mpsc;
5
/// Heap-allocated, sendable test over a borrowed key.
///
/// Carried by [`Message::Retain`] so the caller's closure can be moved into a
/// message without naming its concrete type.
type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
7
8/// A request to fetch data for a key, optionally with target peers.
9pub struct FetchRequest<K, P> {
10 /// The key to fetch.
11 pub key: K,
12 /// Target peers to restrict the fetch to.
13 ///
14 /// - `None`: No targeting (or clear existing targeting), try any available peer
15 /// - `Some(peers)`: Only try the specified peers
16 pub targets: Option<NonEmptyVec<P>>,
17}
18
19/// Messages that can be sent to the peer actor.
20pub enum Message<K, P> {
21 /// Initiate fetch requests.
22 Fetch(Vec<FetchRequest<K, P>>),
23
24 /// Cancel a fetch request by key.
25 Cancel { key: K },
26
27 /// Cancel all fetch requests.
28 Clear,
29
30 /// Cancel all fetch requests that do not satisfy the predicate.
31 Retain { predicate: Predicate<K> },
32}
33
34/// A way to send messages to the peer actor.
35#[derive(Clone)]
36pub struct Mailbox<K, P> {
37 /// The channel that delivers messages to the peer actor.
38 sender: mpsc::Sender<Message<K, P>>,
39}
40
41impl<K, P> Mailbox<K, P> {
42 /// Create a new mailbox.
43 pub(super) const fn new(sender: mpsc::Sender<Message<K, P>>) -> Self {
44 Self { sender }
45 }
46}
47
48impl<K: Span, P: PublicKey> Resolver for Mailbox<K, P> {
49 type Key = K;
50 type PublicKey = P;
51
52 /// Send a fetch request to the peer actor.
53 ///
54 /// If a fetch is already in progress for this key, this clears any existing
55 /// targets for that key (the fetch will try any available peer).
56 ///
57 /// If the engine has shut down, this is a no-op.
58 async fn fetch(&mut self, key: Self::Key) {
59 self.sender
60 .send_lossy(Message::Fetch(vec![FetchRequest { key, targets: None }]))
61 .await;
62 }
63
64 /// Send a fetch request to the peer actor for a batch of keys.
65 ///
66 /// If a fetch is already in progress for any key, this clears any existing
67 /// targets for that key (the fetch will try any available peer).
68 ///
69 /// If the engine has shut down, this is a no-op.
70 async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
71 self.sender
72 .send_lossy(Message::Fetch(
73 keys.into_iter()
74 .map(|key| FetchRequest { key, targets: None })
75 .collect(),
76 ))
77 .await;
78 }
79
80 /// Send a targeted fetch request to the peer actor.
81 ///
82 /// If the engine has shut down, this is a no-op.
83 async fn fetch_targeted(&mut self, key: Self::Key, targets: NonEmptyVec<Self::PublicKey>) {
84 self.sender
85 .send_lossy(Message::Fetch(vec![FetchRequest {
86 key,
87 targets: Some(targets),
88 }]))
89 .await;
90 }
91
92 /// Send targeted fetch requests to the peer actor for a batch of keys.
93 ///
94 /// If the engine has shut down, this is a no-op.
95 async fn fetch_all_targeted(
96 &mut self,
97 requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
98 ) {
99 self.sender
100 .send_lossy(Message::Fetch(
101 requests
102 .into_iter()
103 .map(|(key, targets)| FetchRequest {
104 key,
105 targets: Some(targets),
106 })
107 .collect(),
108 ))
109 .await;
110 }
111
112 /// Send a cancel request to the peer actor.
113 ///
114 /// If the engine has shut down, this is a no-op.
115 async fn cancel(&mut self, key: Self::Key) {
116 self.sender.send_lossy(Message::Cancel { key }).await;
117 }
118
119 /// Send a retain request to the peer actor.
120 ///
121 /// If the engine has shut down, this is a no-op.
122 async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
123 self.sender
124 .send_lossy(Message::Retain {
125 predicate: Box::new(predicate),
126 })
127 .await;
128 }
129
130 /// Send a clear request to the peer actor.
131 ///
132 /// If the engine has shut down, this is a no-op.
133 async fn clear(&mut self) {
134 self.sender.send_lossy(Message::Clear).await;
135 }
136}