use crate::{ingress, Fetch, Resolver, TargetedResolver};
use commonware_actor::{mailbox::Sender, Feedback};
use commonware_cryptography::PublicKey;
use commonware_utils::{vec::NonEmptyVec, Span};
/// A single fetch entry whose metadata slot holds the optional, non-empty list
/// of target peers (`P`) for the request.
///
/// Note the parameter order: this alias takes `<K, P, S>` while the underlying
/// `ingress::FetchKey` takes the subscriber type second and the metadata third.
pub type FetchKey<K, P, S> = ingress::FetchKey<K, S, Option<NonEmptyVec<P>>>;
/// Message accepted by the [`Mailbox`] sender, specialised so that per-fetch
/// metadata is an optional, non-empty list of target peers (`P`).
///
/// Within this file it is built as either `Message::Fetch(Vec<FetchKey>)` or
/// `Message::Retain { predicate }`.
pub type Message<K, P, S> = ingress::Message<K, S, Option<NonEmptyVec<P>>>;
fn fetch_key<K, P, S>(fetch: Fetch<K, S>, targets: Option<NonEmptyVec<P>>) -> FetchKey<K, P, S> {
FetchKey {
key: fetch.key,
subscribers: NonEmptyVec::new(fetch.subscriber),
metadata: targets,
}
}
/// Handle that enqueues fetch and retain [`Message`]s onto a mailbox [`Sender`].
///
/// Type parameters: `K` is the key being fetched, `P` identifies target peers,
/// and `S` identifies subscribers (defaults to `()`).
#[derive(Clone)]
pub struct Mailbox<K, P, S = ()>
where
    K: Span,
    P: Eq,
    S: Eq,
{
    /// Sending half used to enqueue every request issued through this handle.
    sender: Sender<Message<K, P, S>>,
}
impl<K: Span, P: Eq, S: Eq> Mailbox<K, P, S> {
pub(super) const fn new(sender: Sender<Message<K, P, S>>) -> Self {
Self { sender }
}
}
impl<K, P, S> Resolver for Mailbox<K, P, S>
where
K: Span,
P: PublicKey,
S: Clone + Eq + Send + 'static,
{
type Key = K;
type Subscriber = S;
fn fetch<D>(&mut self, key: D) -> Feedback
where
D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
{
let Fetch { key, subscriber } = key.into();
self.sender.enqueue(Message::Fetch(vec![FetchKey {
key,
subscribers: NonEmptyVec::new(subscriber),
metadata: None,
}]))
}
fn fetch_all<D>(&mut self, keys: Vec<D>) -> Feedback
where
D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
{
self.sender.enqueue(Message::Fetch(
keys.into_iter()
.map(|key| fetch_key(key.into(), None))
.collect(),
))
}
fn retain(
&mut self,
predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
) -> Feedback {
self.sender.enqueue(Message::Retain {
predicate: Box::new(predicate),
})
}
}
impl<K, P, S> TargetedResolver for Mailbox<K, P, S>
where
K: Span,
P: PublicKey,
S: Clone + Eq + Send + 'static,
{
type PublicKey = P;
fn fetch_targeted(
&mut self,
key: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
targets: NonEmptyVec<Self::PublicKey>,
) -> Feedback {
let Fetch { key, subscriber } = key.into();
self.sender.enqueue(Message::Fetch(vec![FetchKey {
key,
subscribers: NonEmptyVec::new(subscriber),
metadata: Some(targets),
}]))
}
fn fetch_all_targeted<D>(&mut self, keys: Vec<(D, NonEmptyVec<Self::PublicKey>)>) -> Feedback
where
D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
{
self.sender.enqueue(Message::Fetch(
keys.into_iter()
.map(|(key, targets)| fetch_key(key.into(), Some(targets)))
.collect(),
))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_actor::mailbox::{Overflow, Policy};

    /// Message type exercised here: `u8` keys, `u8` peers, `u16` subscribers.
    type TestMessage = Message<u8, u8, u16>;
    /// Pending state that `Policy::handle` fills and `Overflow::drain` empties.
    type TestPending = ingress::Pending<u8, u16, Option<NonEmptyVec<u8>>>;
    /// A single entry carried inside `TestMessage::Fetch`.
    type TestFetchKey = FetchKey<u8, u8, u16>;

    /// Builds a one-entry `Fetch` message from already-assembled parts.
    fn fetch_message(
        key: u8,
        subscribers: NonEmptyVec<u16>,
        targets: Option<NonEmptyVec<u8>>,
    ) -> TestMessage {
        let entry = FetchKey {
            key,
            subscribers,
            metadata: targets,
        };
        Message::Fetch(vec![entry])
    }

    /// Builds a one-entry `Fetch` message with a single subscriber.
    fn fetch(key: u8, subscriber: u16, targets: Option<NonEmptyVec<u8>>) -> TestMessage {
        fetch_message(key, NonEmptyVec::new(subscriber), targets)
    }

    /// Builds a one-entry `Fetch` message with the given subscriber list.
    fn fetch_with_subscribers(
        key: u8,
        subscribers: Vec<u16>,
        targets: Option<NonEmptyVec<u8>>,
    ) -> TestMessage {
        fetch_message(key, NonEmptyVec::from_unchecked(subscribers), targets)
    }

    /// Builds a `Retain` message that boxes `keep` as its predicate.
    fn retain_message(keep: impl Fn(&u8, &u16) -> bool + Send + 'static) -> TestMessage {
        Message::Retain {
            predicate: Box::new(keep),
        }
    }

    /// Predicate that is true only for the subscriber equal to `value`.
    fn subscriber_is(value: u16) -> impl Fn(&u8, &u16) -> bool + Send {
        move |_, candidate| *candidate == value
    }

    /// Builds a target list from a slice of peers.
    fn targets(values: &[u8]) -> NonEmptyVec<u8> {
        NonEmptyVec::from_unchecked(Vec::from(values))
    }

    /// Drains `pending`, collecting every message it yields in order.
    fn drain(pending: &mut TestPending) -> Vec<TestMessage> {
        let mut drained = Vec::new();
        Overflow::drain(pending, |next| {
            drained.push(next);
            None
        });
        drained
    }

    /// Returns the only entry of a `Fetch` message.
    ///
    /// Panics if `message` is not a `Fetch` or does not hold exactly one entry.
    fn sole_entry(message: &TestMessage) -> &TestFetchKey {
        let Message::Fetch(entries) = message else {
            panic!("expected fetch");
        };
        assert_eq!(entries.len(), 1);
        &entries[0]
    }

    /// Asserts `message` is a one-entry fetch for `expected_key` whose targets
    /// match `expected_targets` (`None` on both sides also matches).
    fn assert_fetch(message: &TestMessage, expected_key: u8, expected_targets: Option<&[u8]>) {
        let entry = sole_entry(message);
        assert_eq!(entry.key, expected_key);
        match (entry.metadata.as_ref(), expected_targets) {
            (Some(actual), Some(expected)) => assert_eq!(&actual[..], expected),
            (None, None) => {}
            _ => panic!("unexpected targets"),
        }
    }

    /// Asserts `message` is a fetch whose entries carry exactly the `expected`
    /// keys, in order.
    fn assert_fetch_keys(message: &TestMessage, expected: &[u8]) {
        let Message::Fetch(entries) = message else {
            panic!("expected fetch");
        };
        let actual: Vec<_> = entries.iter().map(|entry| entry.key).collect();
        assert_eq!(actual, expected);
    }

    /// Asserts `message` is a one-entry fetch for `expected_key` with exactly
    /// the `expected_subscribers`, in order.
    fn assert_fetch_subscribers(
        message: &TestMessage,
        expected_key: u8,
        expected_subscribers: &[u16],
    ) {
        let entry = sole_entry(message);
        assert_eq!(entry.key, expected_key);
        assert_eq!(&entry.subscribers[..], expected_subscribers);
    }

    /// Two targeted fetches for one key drain as a single entry with the union
    /// of targets and both subscribers.
    #[test]
    fn targeted_fetches_for_same_key_are_merged() {
        let mut queue = TestPending::default();
        Policy::handle(&mut queue, fetch(1, 10, Some(targets(&[2, 3]))));
        Policy::handle(&mut queue, fetch(1, 11, Some(targets(&[3, 4]))));

        let drained = drain(&mut queue);
        assert_eq!(drained.len(), 1);
        assert_fetch(&drained[0], 1, Some(&[2, 3, 4]));
        assert_fetch_subscribers(&drained[0], 1, &[10, 11]);
    }

    /// Overlapping subscriber lists for one key are merged without duplicates.
    #[test]
    fn duplicate_fetches_for_same_key_merge_subscribers() {
        let mut queue = TestPending::default();
        Policy::handle(&mut queue, fetch_with_subscribers(1, vec![10, 11], None));
        Policy::handle(&mut queue, fetch_with_subscribers(1, vec![11, 12], None));

        let drained = drain(&mut queue);
        assert_eq!(drained.len(), 1);
        assert_fetch_subscribers(&drained[0], 1, &[10, 11, 12]);
    }

    /// A fetch without targets for a key wins over targeted fetches for that
    /// key, regardless of arrival order.
    #[test]
    fn unrestricted_fetch_dominates_targeted_fetches() {
        let mut queue = TestPending::default();
        Policy::handle(&mut queue, fetch(1, 10, Some(targets(&[2]))));
        Policy::handle(&mut queue, fetch(1, 11, None));
        Policy::handle(&mut queue, fetch(1, 12, Some(targets(&[3]))));

        let drained = drain(&mut queue);
        assert_eq!(drained.len(), 1);
        assert_fetch(&drained[0], 1, None);
    }

    /// A retain drops already-pending fetches whose subscriber fails the
    /// predicate and drains ahead of the surviving fetch.
    #[test]
    fn retain_removes_fetches_for_dropped_subscribers() {
        let mut queue = TestPending::default();
        Policy::handle(&mut queue, fetch(1, 10, None));
        Policy::handle(&mut queue, fetch(2, 11, None));
        Policy::handle(&mut queue, retain_message(subscriber_is(11)));

        let drained = drain(&mut queue);
        assert_eq!(drained.len(), 2);
        assert!(matches!(drained[0], Message::Retain { .. }));
        assert_fetch(&drained[1], 2, None);
    }

    /// A retain removes only the failing subscribers from a pending fetch.
    #[test]
    fn retain_prunes_pending_fetch_subscribers() {
        let mut queue = TestPending::default();
        Policy::handle(&mut queue, fetch_with_subscribers(1, vec![10, 11], None));
        Policy::handle(&mut queue, retain_message(subscriber_is(11)));

        let drained = drain(&mut queue);
        assert_eq!(drained.len(), 2);
        assert!(matches!(drained[0], Message::Retain { .. }));
        assert_fetch_subscribers(&drained[1], 1, &[11]);
    }

    /// A pending fetch disappears entirely once no subscriber passes the retain.
    #[test]
    fn retain_drops_pending_fetch_when_all_subscribers_are_dropped() {
        let mut queue = TestPending::default();
        Policy::handle(&mut queue, fetch_with_subscribers(1, vec![10, 11], None));
        Policy::handle(&mut queue, retain_message(subscriber_is(12)));

        let drained = drain(&mut queue);
        assert_eq!(drained.len(), 1);
        assert!(matches!(drained[0], Message::Retain { .. }));
    }

    /// Fetches handled after a retain are kept even when the earlier predicate
    /// would reject their subscriber.
    #[test]
    fn fetch_after_retain_is_retained_when_subscriber_is_dropped() {
        let mut queue = TestPending::default();
        Policy::handle(&mut queue, retain_message(|_, subscriber| *subscriber != 10));
        Policy::handle(&mut queue, fetch(1, 10, None));
        Policy::handle(&mut queue, fetch(2, 11, None));

        let drained = drain(&mut queue);
        assert_eq!(drained.len(), 2);
        assert!(matches!(drained[0], Message::Retain { .. }));
        assert_fetch_keys(&drained[1], &[1, 2]);
    }
}