commonware_resolver/p2p/
ingress.rs

1use crate::Resolver;
2use commonware_utils::Span;
3use futures::{channel::mpsc, SinkExt};
4
5type Predicate<K> = Box<dyn Fn(&K) -> bool + Send>;
6
7/// Messages that can be sent to the peer actor.
8pub enum Message<K> {
9    /// Initiate a fetch request by key.
10    Fetch { key: K },
11
12    /// Cancel a fetch request by key.
13    Cancel { key: K },
14
15    /// Cancel all fetch requests.
16    Clear,
17
18    /// Cancel all fetch requests that do not satisfy the predicate.
19    Retain { predicate: Predicate<K> },
20}
21
22/// A way to send messages to the peer actor.
23#[derive(Clone)]
24pub struct Mailbox<K> {
25    /// The channel that delivers messages to the peer actor.
26    sender: mpsc::Sender<Message<K>>,
27}
28
29impl<K> Mailbox<K> {
30    /// Create a new mailbox.
31    pub(super) fn new(sender: mpsc::Sender<Message<K>>) -> Self {
32        Self { sender }
33    }
34}
35
36impl<K: Span> Resolver for Mailbox<K> {
37    type Key = K;
38
39    /// Send a fetch request to the peer actor.
40    ///
41    /// Panics if the send fails.
42    async fn fetch(&mut self, key: Self::Key) {
43        self.sender
44            .send(Message::Fetch { key })
45            .await
46            .expect("Failed to send fetch");
47    }
48
49    /// Send a cancel request to the peer actor.
50    ///
51    /// Panics if the send fails.
52    async fn cancel(&mut self, key: Self::Key) {
53        self.sender
54            .send(Message::Cancel { key })
55            .await
56            .expect("Failed to send cancel_fetch");
57    }
58
59    /// Send a cancel all request to the peer actor.
60    ///
61    /// Panics if the send fails.
62    async fn retain(&mut self, predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {
63        self.sender
64            .send(Message::Retain {
65                predicate: Box::new(predicate),
66            })
67            .await
68            .expect("Failed to send retain");
69    }
70
71    /// Send a clear request to the peer actor.
72    ///
73    /// Panics if the send fails.
74    async fn clear(&mut self) {
75        self.sender
76            .send(Message::Clear)
77            .await
78            .expect("Failed to send cancel_all");
79    }
80}