commonware_resolver/p2p/ingress.rs
1use crate::Resolver;
2use commonware_cryptography::PublicKey;
3use commonware_utils::{
4 channel::{fallible::AsyncFallibleExt, mpsc},
5 vec::NonEmptyVec,
6 Span,
7};
8
9type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
10
11/// A request to fetch data for a key, optionally with target peers.
12pub struct FetchRequest<K, P> {
13 /// The key to fetch.
14 pub key: K,
15 /// Target peers to restrict the fetch to.
16 ///
17 /// - `None`: No targeting (or clear existing targeting), try any available peer
18 /// - `Some(peers)`: Only try the specified peers
19 pub targets: Option<NonEmptyVec<P>>,
20}
21
22/// Messages that can be sent to the peer actor.
23pub enum Message<K, P> {
24 /// Initiate fetch requests.
25 Fetch(Vec<FetchRequest<K, P>>),
26
27 /// Cancel a fetch request by key.
28 Cancel { key: K },
29
30 /// Cancel all fetch requests.
31 Clear,
32
33 /// Cancel all fetch requests that do not satisfy the predicate.
34 Retain { predicate: Predicate<K> },
35}
36
37/// A way to send messages to the peer actor.
38#[derive(Clone)]
39pub struct Mailbox<K, P> {
40 /// The channel that delivers messages to the peer actor.
41 sender: mpsc::Sender<Message<K, P>>,
42}
43
44impl<K, P> Mailbox<K, P> {
45 /// Create a new mailbox.
46 pub(super) const fn new(sender: mpsc::Sender<Message<K, P>>) -> Self {
47 Self { sender }
48 }
49}
50
51impl<K: Span, P: PublicKey> Resolver for Mailbox<K, P> {
52 type Key = K;
53 type PublicKey = P;
54
55 /// Send a fetch request to the peer actor.
56 ///
57 /// If a fetch is already in progress for this key, this clears any existing
58 /// targets for that key (the fetch will try any available peer).
59 ///
60 /// If the engine has shut down, this is a no-op.
61 async fn fetch(&mut self, key: Self::Key) {
62 self.sender
63 .send_lossy(Message::Fetch(vec![FetchRequest { key, targets: None }]))
64 .await;
65 }
66
67 /// Send a fetch request to the peer actor for a batch of keys.
68 ///
69 /// If a fetch is already in progress for any key, this clears any existing
70 /// targets for that key (the fetch will try any available peer).
71 ///
72 /// If the engine has shut down, this is a no-op.
73 async fn fetch_all(&mut self, keys: Vec<Self::Key>) {
74 self.sender
75 .send_lossy(Message::Fetch(
76 keys.into_iter()
77 .map(|key| FetchRequest { key, targets: None })
78 .collect(),
79 ))
80 .await;
81 }
82
83 /// Send a targeted fetch request to the peer actor.
84 ///
85 /// If the engine has shut down, this is a no-op.
86 async fn fetch_targeted(&mut self, key: Self::Key, targets: NonEmptyVec<Self::PublicKey>) {
87 self.sender
88 .send_lossy(Message::Fetch(vec![FetchRequest {
89 key,
90 targets: Some(targets),
91 }]))
92 .await;
93 }
94
95 /// Send targeted fetch requests to the peer actor for a batch of keys.
96 ///
97 /// If the engine has shut down, this is a no-op.
98 async fn fetch_all_targeted(
99 &mut self,
100 requests: Vec<(Self::Key, NonEmptyVec<Self::PublicKey>)>,
101 ) {
102 self.sender
103 .send_lossy(Message::Fetch(
104 requests
105 .into_iter()
106 .map(|(key, targets)| FetchRequest {
107 key,
108 targets: Some(targets),
109 })
110 .collect(),
111 ))
112 .await;
113 }
114
115 /// Send a cancel request to the peer actor.
116 ///
117 /// If the engine has shut down, this is a no-op.
118 async fn cancel(&mut self, key: Self::Key) {
119 self.sender.send_lossy(Message::Cancel { key }).await;
120 }
121
122 /// Send a retain request to the peer actor.
123 ///
124 /// If the engine has shut down, this is a no-op.
125 async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
126 self.sender
127 .send_lossy(Message::Retain {
128 predicate: Box::new(predicate),
129 })
130 .await;
131 }
132
133 /// Send a clear request to the peer actor.
134 ///
135 /// If the engine has shut down, this is a no-op.
136 async fn clear(&mut self) {
137 self.sender.send_lossy(Message::Clear).await;
138 }
139}