use crate::{Delivery, Span};
use commonware_utils::channel::{fallible::FallibleExt, mpsc, oneshot};
use std::{collections::HashMap, marker::PhantomData};
/// A consumer that forwards delivered `(key, value)` pairs over an unbounded
/// channel, optionally validating them against a table of expected values.
///
/// `R` is the key type, `V` the value type and `S` the subscriber type (which
/// defaults to `()`).
#[derive(Clone)]
pub struct Consumer<R: Span, V, S = ()> {
/// Sending half of the channel on which accepted `(key, value)` pairs are
/// forwarded by `deliver`.
sender: mpsc::UnboundedSender<(R, V)>,
/// Values expected per key. `deliver` rejects a value whose key is present
/// here but whose stored value differs; keys absent from the map are
/// accepted unconditionally.
expected: HashMap<R, V>,
/// Zero-sized marker tying the subscriber type `S` to this struct; no value
/// of type `S` is stored.
_subscriber: PhantomData<S>,
}
impl<R: Span, V: Clone + PartialEq, S> Consumer<R, V, S> {
    /// Builds a consumer with an empty expectation table.
    ///
    /// Returns the consumer together with the receiving half of the unbounded
    /// channel that the consumer's sender writes to.
    pub fn new() -> (Self, mpsc::UnboundedReceiver<(R, V)>) {
        let (tx, rx) = mpsc::unbounded_channel();
        let consumer = Self {
            _subscriber: PhantomData,
            expected: HashMap::new(),
            sender: tx,
        };
        (consumer, rx)
    }

    /// Registers `v` as the expected value for key `k`, overwriting any value
    /// previously registered for that key.
    pub fn add_expected(&mut self, k: R, v: V) {
        // The previously stored value (if any) is intentionally discarded.
        let _previous = self.expected.insert(k, v);
    }

    /// Removes the expectation registered for `k`, returning its value if one
    /// was present.
    pub fn pop_expected(&mut self, k: &R) -> Option<V> {
        self.expected.remove_entry(k).map(|(_, value)| value)
    }
}
impl<R: Span, V: Clone + PartialEq> Consumer<R, V, ()> {
    /// Builds a consumer whose channel has no receiver.
    ///
    /// The receiving half is dropped as soon as the channel is created, so
    /// nothing forwarded by this consumer can ever be read back. The
    /// expectation table starts out empty.
    pub fn dummy() -> Self {
        // The `_` pattern does not bind the receiver, so it is dropped here.
        let (tx, _) = mpsc::unbounded_channel();
        Self {
            _subscriber: PhantomData,
            expected: HashMap::new(),
            sender: tx,
        }
    }
}
impl<R, V, S> crate::Consumer for Consumer<R, V, S>
where
    R: Span,
    V: Clone + PartialEq + Send + 'static,
    S: Clone + Eq + Send + 'static,
{
    type Key = R;
    type Value = V;
    type Subscriber = S;

    /// Validates `value` against the expectation table and, if accepted,
    /// forwards `(key, value)` on the consumer's channel.
    ///
    /// A value is accepted when no expectation is registered for the
    /// delivery's key, or when the registered expectation equals `value`.
    /// The returned receiver is resolved with the acceptance verdict before
    /// this function returns.
    fn deliver(
        &mut self,
        delivery: Delivery<Self::Key, Self::Subscriber>,
        value: Self::Value,
    ) -> oneshot::Receiver<bool> {
        let key = delivery.key;

        // Keys without a registered expectation are always accepted.
        let accepted = match self.expected.get(&key) {
            Some(wanted) => *wanted == value,
            None => true,
        };

        if accepted {
            self.sender.send_lossy((key, value));
        }

        // Report the verdict; the send result is deliberately ignored.
        let (verdict_tx, verdict_rx) = oneshot::channel();
        let _ = verdict_tx.send(accepted);
        verdict_rx
    }
}