commonware-resolver 2026.5.0

Resolve data identified by a fixed-length key.
Documentation
use crate::{ingress, Fetch, Resolver, TargetedResolver};
use commonware_actor::{mailbox::Sender, Feedback};
use commonware_cryptography::PublicKey;
use commonware_utils::{vec::NonEmptyVec, Span};

/// A key to fetch data for, optionally with target peers.
pub type FetchKey<K, P, S> = ingress::FetchKey<K, S, Option<NonEmptyVec<P>>>;

/// Messages that can be sent to the peer actor.
pub type Message<K, P, S> = ingress::Message<K, S, Option<NonEmptyVec<P>>>;

fn fetch_key<K, P, S>(fetch: Fetch<K, S>, targets: Option<NonEmptyVec<P>>) -> FetchKey<K, P, S> {
    FetchKey {
        key: fetch.key,
        subscribers: NonEmptyVec::new(fetch.subscriber),
        metadata: targets,
    }
}

/// A way to send messages to the peer actor.
#[derive(Clone)]
pub struct Mailbox<K: Span, P: Eq, S: Eq = ()> {
    /// The channel that delivers messages to the peer actor.
    sender: Sender<Message<K, P, S>>,
}

impl<K: Span, P: Eq, S: Eq> Mailbox<K, P, S> {
    /// Create a new mailbox.
    pub(super) const fn new(sender: Sender<Message<K, P, S>>) -> Self {
        Self { sender }
    }
}

impl<K, P, S> Resolver for Mailbox<K, P, S>
where
    K: Span,
    P: PublicKey,
    S: Clone + Eq + Send + 'static,
{
    type Key = K;
    type Subscriber = S;

    /// Send a fetch to the peer actor.
    ///
    /// If a fetch is already in progress for this key, this clears any existing
    /// targets for that key (the fetch will try any available peer).
    ///
    /// If the engine has shut down, this is a no-op.
    fn fetch<D>(&mut self, key: D) -> Feedback
    where
        D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
    {
        let Fetch { key, subscriber } = key.into();
        self.sender.enqueue(Message::Fetch(vec![FetchKey {
            key,
            subscribers: NonEmptyVec::new(subscriber),
            metadata: None,
        }]))
    }

    /// Send fetches to the peer actor for a batch of keys.
    ///
    /// If a fetch is already in progress for any key, this clears any existing
    /// targets for that key (the fetch will try any available peer).
    ///
    /// If the engine has shut down, this is a no-op.
    fn fetch_all<D>(&mut self, keys: Vec<D>) -> Feedback
    where
        D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
    {
        self.sender.enqueue(Message::Fetch(
            keys.into_iter()
                .map(|key| fetch_key(key.into(), None))
                .collect(),
        ))
    }

    /// Send a retain request to the peer actor.
    ///
    /// If the engine has shut down, this is a no-op.
    fn retain(
        &mut self,
        predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
    ) -> Feedback {
        self.sender.enqueue(Message::Retain {
            predicate: Box::new(predicate),
        })
    }
}

impl<K, P, S> TargetedResolver for Mailbox<K, P, S>
where
    K: Span,
    P: PublicKey,
    S: Clone + Eq + Send + 'static,
{
    type PublicKey = P;

    /// Send a targeted fetch to the peer actor.
    ///
    /// If a fetch is already in progress for this key:
    /// - If the existing fetch has targets, the new targets are added to the set.
    /// - If the existing fetch has no targets, it remains unrestricted.
    ///
    /// To clear targeting and fall back to any peer, call [`fetch`](Self::fetch).
    ///
    /// Targets are automatically cleared when the fetch succeeds or is canceled.
    /// When a peer is blocked for invalid data, only that peer is removed from
    /// the target set.
    ///
    /// If the engine has shut down, this is a no-op.
    fn fetch_targeted(
        &mut self,
        key: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
        targets: NonEmptyVec<Self::PublicKey>,
    ) -> Feedback {
        let Fetch { key, subscriber } = key.into();
        self.sender.enqueue(Message::Fetch(vec![FetchKey {
            key,
            subscribers: NonEmptyVec::new(subscriber),
            metadata: Some(targets),
        }]))
    }

    /// Send targeted fetches to the peer actor for a batch of keys.
    ///
    /// If the engine has shut down, this is a no-op.
    fn fetch_all_targeted<D>(&mut self, keys: Vec<(D, NonEmptyVec<Self::PublicKey>)>) -> Feedback
    where
        D: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
    {
        self.sender.enqueue(Message::Fetch(
            keys.into_iter()
                .map(|(key, targets)| fetch_key(key.into(), Some(targets)))
                .collect(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_actor::mailbox::{Overflow, Policy};

    type TestMessage = Message<u8, u8, u16>;
    type TestPending = ingress::Pending<u8, u16, Option<NonEmptyVec<u8>>>;

    fn fetch(key: u8, subscriber: u16, targets: Option<NonEmptyVec<u8>>) -> TestMessage {
        Message::Fetch(vec![FetchKey {
            key,
            subscribers: NonEmptyVec::new(subscriber),
            metadata: targets,
        }])
    }

    fn fetch_with_subscribers(
        key: u8,
        subscribers: Vec<u16>,
        targets: Option<NonEmptyVec<u8>>,
    ) -> TestMessage {
        Message::Fetch(vec![FetchKey {
            key,
            subscribers: NonEmptyVec::from_unchecked(subscribers),
            metadata: targets,
        }])
    }

    fn subscriber_is(value: u16) -> impl Fn(&u8, &u16) -> bool + Send {
        move |_, subscriber| *subscriber == value
    }

    fn targets(values: &[u8]) -> NonEmptyVec<u8> {
        NonEmptyVec::from_unchecked(values.to_vec())
    }

    fn drain(pending: &mut TestPending) -> Vec<TestMessage> {
        let mut messages = Vec::new();
        Overflow::drain(pending, |message| {
            messages.push(message);
            None
        });
        messages
    }

    fn assert_fetch(message: &TestMessage, expected_key: u8, expected_targets: Option<&[u8]>) {
        let Message::Fetch(keys) = message else {
            panic!("expected fetch");
        };
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0].key, expected_key);
        match (&keys[0].metadata, expected_targets) {
            (None, None) => {}
            (Some(actual), Some(expected)) => assert_eq!(&actual[..], expected),
            _ => panic!("unexpected targets"),
        }
    }

    fn assert_fetch_keys(message: &TestMessage, expected: &[u8]) {
        let Message::Fetch(keys) = message else {
            panic!("expected fetch");
        };
        let actual: Vec<_> = keys.iter().map(|key| key.key).collect();
        assert_eq!(actual, expected);
    }

    fn assert_fetch_subscribers(
        message: &TestMessage,
        expected_key: u8,
        expected_subscribers: &[u16],
    ) {
        let Message::Fetch(keys) = message else {
            panic!("expected fetch");
        };
        assert_eq!(keys.len(), 1);
        assert_eq!(keys[0].key, expected_key);
        assert_eq!(&keys[0].subscribers[..], expected_subscribers);
    }

    #[test]
    fn targeted_fetches_for_same_key_are_merged() {
        let mut pending = TestPending::default();

        Policy::handle(&mut pending, fetch(1, 10, Some(targets(&[2, 3]))));
        Policy::handle(&mut pending, fetch(1, 11, Some(targets(&[3, 4]))));

        let messages = drain(&mut pending);
        assert_eq!(messages.len(), 1);
        assert_fetch(&messages[0], 1, Some(&[2, 3, 4]));
        assert_fetch_subscribers(&messages[0], 1, &[10, 11]);
    }

    #[test]
    fn duplicate_fetches_for_same_key_merge_subscribers() {
        let mut pending = TestPending::default();

        Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11], None));
        Policy::handle(&mut pending, fetch_with_subscribers(1, vec![11, 12], None));

        let messages = drain(&mut pending);
        assert_eq!(messages.len(), 1);
        assert_fetch_subscribers(&messages[0], 1, &[10, 11, 12]);
    }

    #[test]
    fn unrestricted_fetch_dominates_targeted_fetches() {
        let mut pending = TestPending::default();

        Policy::handle(&mut pending, fetch(1, 10, Some(targets(&[2]))));
        Policy::handle(&mut pending, fetch(1, 11, None));
        Policy::handle(&mut pending, fetch(1, 12, Some(targets(&[3]))));

        let messages = drain(&mut pending);
        assert_eq!(messages.len(), 1);
        assert_fetch(&messages[0], 1, None);
    }

    #[test]
    fn retain_removes_fetches_for_dropped_subscribers() {
        let mut pending = TestPending::default();

        Policy::handle(&mut pending, fetch(1, 10, None));
        Policy::handle(&mut pending, fetch(2, 11, None));
        Policy::handle(
            &mut pending,
            Message::Retain {
                predicate: Box::new(subscriber_is(11)),
            },
        );

        let messages = drain(&mut pending);
        assert_eq!(messages.len(), 2);
        assert!(matches!(messages[0], Message::Retain { .. }));
        assert_fetch(&messages[1], 2, None);
    }

    #[test]
    fn retain_prunes_pending_fetch_subscribers() {
        let mut pending = TestPending::default();

        Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11], None));
        Policy::handle(
            &mut pending,
            Message::Retain {
                predicate: Box::new(subscriber_is(11)),
            },
        );

        let messages = drain(&mut pending);
        assert_eq!(messages.len(), 2);
        assert!(matches!(messages[0], Message::Retain { .. }));
        assert_fetch_subscribers(&messages[1], 1, &[11]);
    }

    #[test]
    fn retain_drops_pending_fetch_when_all_subscribers_are_dropped() {
        let mut pending = TestPending::default();

        Policy::handle(&mut pending, fetch_with_subscribers(1, vec![10, 11], None));
        Policy::handle(
            &mut pending,
            Message::Retain {
                predicate: Box::new(subscriber_is(12)),
            },
        );

        let messages = drain(&mut pending);
        assert_eq!(messages.len(), 1);
        assert!(matches!(messages[0], Message::Retain { .. }));
    }

    #[test]
    fn fetch_after_retain_is_retained_when_subscriber_is_dropped() {
        let mut pending = TestPending::default();

        Policy::handle(
            &mut pending,
            Message::Retain {
                predicate: Box::new(|_, subscriber| *subscriber != 10),
            },
        );
        Policy::handle(&mut pending, fetch(1, 10, None));
        Policy::handle(&mut pending, fetch(2, 11, None));

        let messages = drain(&mut pending);
        assert_eq!(messages.len(), 2);
        assert!(matches!(messages[0], Message::Retain { .. }));
        assert_fetch_keys(&messages[1], &[1, 2]);
    }
}