use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use arc_swap::ArcSwapOption;
use parking_lot::RwLock;
use crate::channel::Channel;
/// Table of registered slots: target id -> shared slot handle, guarded by a
/// `parking_lot` read/write lock.
type Slots<T> = RwLock<HashMap<String, Arc<ProducerSlot<T>>>>;
/// Boxed, thread-safe observer callback. `ProducerSlot::publish` calls it with
/// the slot's target id and a reference to the value just published.
type Observer<T> = Box<dyn Fn(&str, &T) + Send + Sync + 'static>;
/// A named slot that keeps the most recently published value of type `T`.
///
/// The value is stored behind an `ArcSwapOption`, so readers receive an
/// `Arc<T>` snapshot via `latest` and the slot is empty until the first
/// `publish` (or after `clear`).
pub struct ProducerSlot<T> {
/// Identifier this slot was created with; passed to observers on publish.
target_id: String,
/// Latest published value, or `None` when nothing is currently stored.
slot: ArcSwapOption<T>,
/// Observer channel consulted by `publish`; when `None`, publishing only
/// stores the value.
observers: Option<Arc<Channel<Observer<T>>>>,
}
impl<T> ProducerSlot<T> {
    /// Builds an empty slot named `target_id` that is wired to the given
    /// observer channel.
    fn with_observers(target_id: impl Into<String>, observers: Arc<Channel<Observer<T>>>) -> Self {
        let name = target_id.into();
        let empty = ArcSwapOption::const_empty();
        Self {
            target_id: name,
            slot: empty,
            observers: Some(observers),
        }
    }

    /// Returns the identifier this slot was created with.
    pub fn target_id(&self) -> &str {
        &self.target_id
    }

    /// Stores `value` as the slot's latest value and then hands it to the
    /// observer channel, if one is attached.
    ///
    /// The store happens before observers are notified, so the new value is
    /// already visible through [`latest`](Self::latest) when the notification
    /// closure runs.
    pub fn publish(&self, value: T) {
        let shared = Arc::new(value);
        self.slot.store(Some(Arc::clone(&shared)));

        // Without an observer channel there is nobody to notify.
        let Some(channel) = self.observers.as_ref() else {
            return;
        };
        channel.for_each(|notify| notify(&self.target_id, &shared));
    }

    /// Returns a shared handle to the most recently published value, or
    /// `None` if the slot is currently empty.
    pub fn latest(&self) -> Option<Arc<T>> {
        self.slot.load_full()
    }

    /// Empties the slot; subsequent calls to [`latest`](Self::latest) return
    /// `None` until something is published again.
    pub fn clear(&self) {
        self.slot.store(None);
    }
}
/// Registry of [`ProducerSlot`]s keyed by target id, together with one
/// observer channel that every slot created by `register` shares.
///
/// Both fields sit behind `OnceLock`, so `new` can be a `const fn` and the
/// underlying map and channel are only created on first use.
pub struct ProducerRegistry<T: 'static> {
/// Lazily created table of registered slots.
inner: OnceLock<Slots<T>>,
/// Lazily created observer channel handed to each newly registered slot.
observers: OnceLock<Arc<Channel<Observer<T>>>>,
}
impl<T: 'static> ProducerRegistry<T> {
pub const fn new() -> Self {
Self {
inner: OnceLock::new(),
observers: OnceLock::new(),
}
}
fn slots(&self) -> &Slots<T> {
self.inner.get_or_init(|| RwLock::new(HashMap::new()))
}
fn observers(&self) -> &Arc<Channel<Observer<T>>> {
self.observers.get_or_init(|| Arc::new(Channel::new()))
}
pub fn register(&self, target_id: impl Into<String>) -> Arc<ProducerSlot<T>> {
let target_id = target_id.into();
{
let guard = self.slots().read();
if let Some(slot) = guard.get(&target_id) {
return Arc::clone(slot);
}
}
let mut guard = self.slots().write();
if let Some(slot) = guard.get(&target_id) {
return Arc::clone(slot);
}
let slot = Arc::new(ProducerSlot::with_observers(
target_id.clone(),
Arc::clone(self.observers()),
));
guard.insert(target_id, Arc::clone(&slot));
slot
}
pub fn lookup(&self, target_id: &str) -> Option<Arc<ProducerSlot<T>>> {
self.slots().read().get(target_id).map(Arc::clone)
}
pub fn count(&self) -> usize {
self.slots().read().len()
}
pub fn add_observer<F>(&self, cb: F)
where
F: Fn(&str, &T) + Send + Sync + 'static,
{
self.observers().register(Box::new(cb));
}
pub fn observer_count(&self) -> usize {
self.observers().count()
}
}
impl<T: 'static> Default for ProducerRegistry<T> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Publish, overwrite and clear are each observable through `latest`.
    #[test]
    fn published_value_round_trips_through_slot() {
        let registry = ProducerRegistry::<i32>::new();
        let standalone = registry.register("/test/standalone");

        // A freshly registered slot carries its id and holds no value.
        assert_eq!(standalone.target_id(), "/test/standalone");
        assert!(standalone.latest().is_none());

        standalone.publish(42);
        assert_eq!(*standalone.latest().expect("published"), 42);

        standalone.publish(7);
        assert_eq!(*standalone.latest().expect("overwrite"), 7);

        standalone.clear();
        assert!(standalone.latest().is_none());
    }

    /// One registry-level observer receives publishes from every slot.
    #[test]
    fn registry_observers_see_every_publish() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let registry = ProducerRegistry::<i32>::new();
        let sum = Arc::new(AtomicUsize::new(0));
        let sink = Arc::clone(&sum);
        registry.add_observer(move |target, value| {
            assert!(target.starts_with("/Robot/"));
            sink.fetch_add(*value as usize, Ordering::SeqCst);
        });

        let slot_a = registry.register("/Robot/A");
        let slot_b = registry.register("/Robot/B");
        for (slot, value) in [(&slot_a, 3), (&slot_b, 4), (&slot_a, 5)] {
            slot.publish(value);
        }

        // 3 + 4 + 5, accumulated by the single observer.
        assert_eq!(sum.load(Ordering::SeqCst), 12);
        assert_eq!(registry.observer_count(), 1);
    }

    /// Registering the same id twice yields the same slot; distinct ids do not.
    #[test]
    fn registry_is_keyed_by_target_id() {
        let registry = ProducerRegistry::<i32>::new();
        let first = registry.register("/Robot/A");
        let second = registry.register("/Robot/B");
        let first_again = registry.register("/Robot/A");

        assert!(Arc::ptr_eq(&first, &first_again));
        assert!(!Arc::ptr_eq(&first, &second));

        first.publish(1);
        second.publish(2);

        let seen_a = registry.lookup("/Robot/A").unwrap().latest().unwrap();
        let seen_b = registry.lookup("/Robot/B").unwrap().latest().unwrap();
        assert_eq!(*seen_a, 1);
        assert_eq!(*seen_b, 2);
        assert_eq!(registry.count(), 2);
    }

    /// Looking up an id that was never registered yields `None`.
    #[test]
    fn lookup_unregistered_target_returns_none() {
        let registry = ProducerRegistry::<i32>::new();
        let missing = registry.lookup("/Robot/Unregistered");
        assert!(missing.is_none());
    }
}