1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use service_channel::{mpsc, oneshot};
use std::collections::BTreeMap;
/// A typed pub/sub topic.
pub trait Topic: Ord + Clone + Send + 'static {
type Item: Clone + Send + 'static;
}
/// Bind a topic to a concrete endpoint field on a service.
///
/// This is the key piece that replaces Any/TypeId routing:
/// each topic knows where its endpoint lives on service S.
pub trait RoutedTopic<S>: Topic
where
S: crate::Service,
{
/// Returns this topic's [`TopicEndpoint`] on `service`.
///
/// Implementations should consistently point at the same logical field on `S` so
/// routing matches how the service stores topic state.
fn endpoint(service: &mut S) -> &mut TopicEndpoint<Self>
where
Self: Sized;
}
/// A single-shot broadcast endpoint.
///
/// - each subscribe registers one waiter
/// - each publish wakes all current waiters once
/// - future publishes require future subscribe calls again
pub struct TopicEndpoint<T>
where
T: Topic,
{
once_waiters: BTreeMap<T, Vec<oneshot::Sender<T::Item>>>,
all_waiters: BTreeMap<T, Vec<mpsc::UnboundedSender<T::Item>>>,
}
impl<T> Default for TopicEndpoint<T>
where
T: Topic,
{
/// Empty endpoint with no waiters.
fn default() -> Self {
Self {
once_waiters: BTreeMap::new(),
all_waiters: BTreeMap::new(),
}
}
}
impl<T> TopicEndpoint<T>
where
T: Topic,
{
/// Register one subscriber waiting for the next publication.
pub fn subscribe(&mut self, topic: T, tx: oneshot::Sender<T::Item>) {
self.once_waiters.entry(topic).or_default().push(tx);
}
/// Register a subscriber waiting for all future publications.
pub fn subscribe_all(&mut self, topic: T, tx: mpsc::UnboundedSender<T::Item>) {
self.all_waiters.entry(topic).or_default().push(tx);
}
/// Publish once to all current subscribers, then clear them.
///
/// Subscribers that already dropped are silently skipped.
pub fn publish(&mut self, topic: &T, item: T::Item) {
let waiters = self.once_waiters.remove(topic).unwrap_or_default();
for tx in waiters {
let _ = tx.send(item.clone());
}
let waiters = self.all_waiters.get(topic);
if let Some(waiters) = waiters {
for tx in waiters {
let _ = tx.unbounded_send(item.clone());
}
}
}
// /// Remove all pending subscribers without publishing.
// ///
// /// Dropped [`oneshot::Sender`]s cause the corresponding receivers to see a closed channel.
// pub fn clear(&mut self) {
// self.once_waiters.clear();
// }
// /// Remove all pending subscribers for a specific topic value.
// pub fn clear_topic(&mut self, topic: &T) {
// self.once_waiters.remove(topic);
// }
}