use crate::Fetch;
use commonware_actor::mailbox::{Overflow, Policy};
use commonware_utils::vec::NonEmptyVec;
use std::collections::VecDeque;
/// Boxed, sendable closure that is given a key and a subscriber by reference
/// and returns a `bool`.
///
/// Carried by [`Message::Retain`]; when applied to queued fetches, a
/// subscriber is kept only if the closure returns `true` for it.
pub type Predicate<K, S> = Box<dyn Fn(&K, &S) -> bool + Send>;
/// Metadata attached to a [`FetchKey`] that can absorb the metadata of
/// another fetch for the same key when the two are coalesced.
pub(crate) trait Metadata {
/// Folds `incoming` into `self`, consuming `incoming`.
fn merge(&mut self, incoming: Self);
}
/// The unit type carries no information, so merging is a no-op.
impl Metadata for () {
/// Does nothing; there is no state to combine.
fn merge(&mut self, _incoming: Self) {}
}
/// Merging rules for optional, non-empty lists:
///
/// - if either side is `None`, the result is `None`;
/// - otherwise the result is the existing list followed by every incoming
///   item that was not already present (order preserved, no duplicates added).
impl<T: Eq> Metadata for Option<NonEmptyVec<T>> {
    fn merge(&mut self, incoming: Self) {
        match incoming {
            // An incoming `None` always wins, regardless of what is stored.
            None => *self = None,
            Some(additions) => {
                // A stored `None` stays `None`; only a stored list is extended.
                if let Some(current) = self.as_mut() {
                    for candidate in additions {
                        if !current.contains(&candidate) {
                            current.push(candidate);
                        }
                    }
                }
            }
        }
    }
}
/// A fetch request for one key on behalf of one or more subscribers, with
/// optional attached metadata (`()` by default).
pub struct FetchKey<K, S, M = ()> {
/// The key being fetched.
pub key: K,
/// Subscribers interested in the key; never empty by construction.
pub subscribers: NonEmptyVec<S>,
/// Extra data carried with the fetch.
pub metadata: M,
}
/// Converts a single-subscriber [`Fetch`] into a [`FetchKey`] without metadata.
impl<K, S> From<Fetch<K, S>> for FetchKey<K, S> {
    fn from(fetch: Fetch<K, S>) -> Self {
        let key = fetch.key;
        // The lone subscriber becomes the only entry in the subscriber list.
        let subscribers = NonEmptyVec::new(fetch.subscriber);
        Self {
            key,
            subscribers,
            metadata: (),
        }
    }
}
/// Messages handled by the overflow policy defined in this file.
pub enum Message<K, S, M = ()> {
/// A batch of fetch requests.
Fetch(Vec<FetchKey<K, S, M>>),
/// A request to keep only the (key, subscriber) pairs accepted by
/// `predicate`.
Retain {
/// Returns `true` for pairs that should be kept.
predicate: Predicate<K, S>,
},
}
/// Overflow buffer for [`Message`]s.
///
/// Retain predicates are kept in arrival order; fetches are kept as a single
/// coalesced list with at most one entry per key.
pub struct Pending<K, S, M = ()> {
// Queued retain predicates, oldest first.
modifications: VecDeque<Predicate<K, S>>,
// Queued fetches, merged by key as they arrive.
fetches: Vec<FetchKey<K, S, M>>,
}
impl<K, S, M> Default for Pending<K, S, M> {
    /// Builds a buffer with no queued predicates and no queued fetches.
    fn default() -> Self {
        let modifications = VecDeque::default();
        let fetches = Vec::default();
        Self {
            modifications,
            fetches,
        }
    }
}
impl<K, S, M> Overflow<Message<K, S, M>> for Pending<K, S, M> {
    /// Returns `true` when neither fetches nor retain predicates are queued.
    fn is_empty(&self) -> bool {
        self.fetches.is_empty() && self.modifications.is_empty()
    }

    /// Hands queued messages to `push`: every retain predicate first (oldest
    /// first), then all queued fetches as one `Message::Fetch` batch.
    ///
    /// If `push` hands a message back, that message is re-queued at the front
    /// and draining stops immediately.
    fn drain<F>(&mut self, mut push: F)
    where
        F: FnMut(Message<K, S, M>) -> Option<Message<K, S, M>>,
    {
        while let Some(predicate) = self.modifications.pop_front() {
            let Some(rejected) = push(Message::Retain { predicate }) else {
                continue;
            };
            self.push_front(rejected);
            return;
        }

        if self.fetches.is_empty() {
            return;
        }

        // Move the whole batch out so the buffer is left empty on success.
        let batch = std::mem::take(&mut self.fetches);
        if let Some(rejected) = push(Message::Fetch(batch)) {
            self.push_front(rejected);
        }
    }
}
impl<K, S, M> Pending<K, S, M> {
    /// Puts `message` back at the head of the matching queue: a retain
    /// predicate goes to the front of `modifications`, a fetch batch is
    /// inserted ahead of any fetches already queued.
    fn push_front(&mut self, message: Message<K, S, M>) {
        match message {
            Message::Retain { predicate } => self.modifications.push_front(predicate),
            Message::Fetch(returned) => {
                // Replacing the empty range at index 0 inserts `returned`
                // before the existing entries; the insertion happens when the
                // splice iterator is dropped.
                drop(self.fetches.splice(0..0, returned));
            }
        }
    }
}
fn retain_fetch<K, S, M>(
mut fetch: FetchKey<K, S, M>,
predicate: &(dyn Fn(&K, &S) -> bool + Send),
) -> Option<FetchKey<K, S, M>> {
let mut subscribers = fetch.subscribers.into_vec();
subscribers.retain(|subscriber| predicate(&fetch.key, subscriber));
fetch.subscribers = NonEmptyVec::try_from(subscribers).ok()?;
Some(fetch)
}
/// Appends each subscriber from `incoming` that `existing` does not already
/// contain, preserving the order in which they are encountered.
fn merge_subscribers<S: Eq>(existing: &mut NonEmptyVec<S>, incoming: NonEmptyVec<S>) {
    incoming.into_iter().for_each(|candidate| {
        if !existing.contains(&candidate) {
            existing.push(candidate);
        }
    });
}
impl<K, S, M> Policy for Message<K, S, M>
where
    K: Clone + Eq,
    S: Eq,
    M: Metadata,
{
    type Overflow = Pending<K, S, M>;

    /// Folds `message` into the overflow buffer.
    ///
    /// - `Retain`: prunes the fetches already queued with the predicate
    ///   (dropping any fetch left without subscribers), then queues the
    ///   predicate itself.
    /// - `Fetch`: each entry is merged into the queued fetch with the same
    ///   key (subscribers and metadata), or appended if no such fetch exists.
    fn handle(overflow: &mut Pending<K, S, M>, message: Self) {
        match message {
            Self::Retain { predicate } => {
                let queued = std::mem::take(&mut overflow.fetches);
                overflow.fetches = queued
                    .into_iter()
                    .filter_map(|entry| retain_fetch(entry, predicate.as_ref()))
                    .collect();
                overflow.modifications.push_back(predicate);
            }
            Self::Fetch(fetches) => {
                for fetch in fetches {
                    // Find the first queued fetch for this key, if any.
                    let slot = overflow
                        .fetches
                        .iter()
                        .position(|queued| queued.key == fetch.key);
                    match slot {
                        Some(index) => {
                            let queued = &mut overflow.fetches[index];
                            merge_subscribers(&mut queued.subscribers, fetch.subscribers);
                            queued.metadata.merge(fetch.metadata);
                        }
                        None => overflow.fetches.push(fetch),
                    }
                }
            }
        }
    }
}
// Unit tests for the overflow policy: coalescing of fetches, pruning by
// retain predicates, drain ordering, and metadata merging.
#[cfg(test)]
mod tests {
use super::*;
use commonware_utils::non_empty_vec;
// Metadata-free aliases used by most tests.
type TestMessage = Message<u8, u16>;
type TestPending = Pending<u8, u16>;
// Builds a `Fetch` message with one key and one subscriber.
fn fetch(key: u8, subscriber: u16) -> TestMessage {
Message::Fetch(vec![FetchKey {
key,
subscribers: NonEmptyVec::new(subscriber),
metadata: (),
}])
}
// Builds a `Fetch` message with one key and several subscribers.
// Callers in this module always pass a non-empty `subscribers` list.
fn fetch_with_subscribers(key: u8, subscribers: Vec<u16>) -> TestMessage {
Message::Fetch(vec![FetchKey {
key,
subscribers: NonEmptyVec::from_unchecked(subscribers),
metadata: (),
}])
}
// Builds a single-subscriber `Fetch` message carrying optional list metadata.
fn fetch_with_metadata(
key: u8,
subscriber: u16,
metadata: Option<NonEmptyVec<u8>>,
) -> Message<u8, u16, Option<NonEmptyVec<u8>>> {
Message::Fetch(vec![FetchKey {
key,
subscribers: NonEmptyVec::new(subscriber),
metadata,
}])
}
// Predicate that accepts only the subscriber equal to `value`, for any key.
fn subscriber_is(value: u16) -> impl Fn(&u8, &u16) -> bool + Send {
move |_, subscriber| *subscriber == value
}
// Drains `pending` with a sink that accepts everything and returns the
// messages in the order they were emitted.
fn drain(pending: &mut TestPending) -> Vec<TestMessage> {
let mut messages = Vec::new();
Overflow::drain(pending, |message| {
messages.push(message);
None
});
messages
}
// Asserts that `message` is a `Fetch` with exactly one entry whose key and
// subscriber list match the expectations.
fn assert_fetch_subscribers(
message: &TestMessage,
expected_key: u8,
expected_subscribers: &[u16],
) {
let Message::Fetch(fetches) = message else {
panic!("expected fetch");
};
assert_eq!(fetches.len(), 1);
assert_eq!(fetches[0].key, expected_key);
assert_eq!(&fetches[0].subscribers[..], expected_subscribers);
}
// Two fetches for the same key collapse into one with the union of
// subscribers, in first-seen order.
#[test]
fn duplicate_fetches_for_same_key_merge_subscribers() {
let mut pending = TestPending::default();
Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11]));
Policy::handle(&mut pending, fetch_with_subscribers(1, vec![11, 12]));
let messages = drain(&mut pending);
assert_eq!(messages.len(), 1);
assert_fetch_subscribers(&messages[0], 1, &[10, 11, 12]);
}
// A retain removes rejected subscribers from an already-queued fetch.
#[test]
fn retain_prunes_pending_fetch_subscribers() {
let mut pending = TestPending::default();
Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11]));
Policy::handle(
&mut pending,
Message::Retain {
predicate: Box::new(subscriber_is(11)),
},
);
let messages = drain(&mut pending);
assert_eq!(messages.len(), 2);
assert!(matches!(messages[0], Message::Retain { .. }));
assert_fetch_subscribers(&messages[1], 1, &[11]);
}
// A queued fetch whose subscribers are all rejected is dropped entirely,
// leaving only the retain message to drain.
#[test]
fn retain_drops_pending_fetch_when_all_subscribers_are_dropped() {
let mut pending = TestPending::default();
Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11]));
Policy::handle(
&mut pending,
Message::Retain {
predicate: Box::new(subscriber_is(12)),
},
);
let messages = drain(&mut pending);
assert_eq!(messages.len(), 1);
assert!(matches!(messages[0], Message::Retain { .. }));
}
// Retain messages are emitted before the fetch batch even when the fetch
// was queued first.
#[test]
fn retain_messages_drain_before_fetches() {
let mut pending = TestPending::default();
Policy::handle(&mut pending, fetch(1, 10));
Policy::handle(
&mut pending,
Message::Retain {
predicate: Box::new(|_, _| true),
},
);
let messages = drain(&mut pending);
assert_eq!(messages.len(), 2);
assert!(matches!(messages[0], Message::Retain { .. }));
assert_fetch_subscribers(&messages[1], 1, &[10]);
}
// `From<Fetch>` yields a key with exactly the one original subscriber.
#[test]
fn from_fetch_creates_single_subscriber_fetch_key() {
let fetch = Fetch {
key: 7,
subscriber: 8,
};
let key = FetchKey::from(fetch);
assert_eq!(key.key, 7);
assert_eq!(key.subscribers, non_empty_vec![8]);
}
// List metadata from duplicate fetches is unioned without duplicates.
#[test]
fn duplicate_fetches_merge_metadata() {
let mut pending = Pending::default();
Policy::handle(
&mut pending,
fetch_with_metadata(1, 10, Some(non_empty_vec![2, 3])),
);
Policy::handle(
&mut pending,
fetch_with_metadata(1, 11, Some(non_empty_vec![3, 4])),
);
let mut messages = Vec::new();
Overflow::drain(&mut pending, |message| {
messages.push(message);
None
});
assert_eq!(messages.len(), 1);
let Message::Fetch(fetches) = messages.pop().unwrap() else {
panic!("expected fetch");
};
assert_eq!(fetches.len(), 1);
assert_eq!(fetches[0].metadata, Some(non_empty_vec![2, 3, 4]));
}
// Once `None` metadata has been merged in, later `Some` metadata for the
// same key does not bring a list back.
#[test]
fn unrestricted_metadata_dominates_duplicate_fetches() {
let mut pending = Pending::default();
Policy::handle(
&mut pending,
fetch_with_metadata(1, 10, Some(non_empty_vec![2])),
);
Policy::handle(&mut pending, fetch_with_metadata(1, 11, None));
Policy::handle(
&mut pending,
fetch_with_metadata(1, 12, Some(non_empty_vec![3])),
);
let mut messages = Vec::new();
Overflow::drain(&mut pending, |message| {
messages.push(message);
None
});
assert_eq!(messages.len(), 1);
let Message::Fetch(fetches) = messages.pop().unwrap() else {
panic!("expected fetch");
};
assert_eq!(fetches.len(), 1);
assert!(fetches[0].metadata.is_none());
}
}