1use std::{
6 marker::PhantomData,
7 ops::{Deref, DerefMut},
8 sync::{atomic::Ordering, Weak},
9};
10
11use log::warn;
12
13use super::SubscriberInner;
14
15pub struct PublisherToken<'a>(pub(super) PhantomData<&'a ()>);
18
19pub trait Subscription {
21 type Item;
22
23 fn push(&mut self, value: Self::Item, token: PublisherToken) -> bool;
28
29 fn map<F, O>(self, map: F) -> Map<Self, F, O>
31 where
32 Self: Sized,
33 F: FnMut(O) -> Self::Item,
34 {
35 Map {
36 inner: self,
37 map,
38 _phantom: PhantomData,
39 }
40 }
41
42 fn filter_map<F, O>(self, map: F) -> FilterMap<Self, F, O>
46 where
47 Self: Sized,
48 F: FnMut(O) -> Self::Item,
49 {
50 FilterMap {
51 inner: self,
52 map,
53 _phantom: PhantomData,
54 }
55 }
56
57 fn boxed(self) -> BoxedSubscription<Self::Item>
59 where
60 Self: Sized + Send + 'static,
61 {
62 Box::new(self)
63 }
64
65 #[must_use]
98 fn set_name(mut self, name: impl Into<String>) -> Self
99 where
100 Self: Sized,
101 {
102 self.set_name_mut(name.into().into_boxed_str());
103 self
104 }
105
106 fn set_name_mut(&mut self, name: Box<str>);
108
109 fn increment_publishers(&self, token: PublisherToken);
112
113 fn decrement_publishers(&self, token: PublisherToken);
116}
117
118pub struct DirectSubscription<T> {
123 pub(super) sub: Weak<SubscriberInner<T>>,
124 pub(super) name: Option<Box<str>>,
125 pub(super) lag: usize,
126}
127
128impl<T> Clone for DirectSubscription<T> {
129 fn clone(&self) -> Self {
130 Self {
131 sub: self.sub.clone(),
132 name: self.name.clone(),
133 lag: self.lag.clone(),
134 }
135 }
136}
137
138impl<T> Subscription for DirectSubscription<T> {
139 type Item = T;
140
141 fn push(&mut self, value: Self::Item, _token: PublisherToken) -> bool {
142 if let Some(sub) = self.sub.upgrade() {
143 if sub.queue.force_push(value).is_some() {
144 self.lag += 1;
145 if let Some(name) = &self.name {
146 warn!(target: "publishers", "{name} lagging by {} messages", self.lag);
147 }
148 } else {
149 self.lag = 0;
150 sub.notify.notify_one();
151 }
152 true
153 } else {
154 false
155 }
156 }
157
158 fn set_name_mut(&mut self, name: Box<str>)
159 where
160 Self: Sized,
161 {
162 self.name = Some(name);
163 }
164
165 fn increment_publishers(&self, _token: PublisherToken) {
166 if let Some(sub) = self.sub.upgrade() {
167 sub.pub_count.fetch_add(1, Ordering::AcqRel);
168 }
169 }
170
171 fn decrement_publishers(&self, _token: PublisherToken) {
172 if let Some(sub) = self.sub.upgrade() {
173 sub.pub_count.fetch_sub(1, Ordering::AcqRel);
174 sub.notify.notify_one();
175 }
176 }
177}
178
179pub struct Map<I, F, O> {
180 inner: I,
181 map: F,
182 _phantom: PhantomData<O>,
183}
184
185impl<O, I, F> Subscription for Map<I, F, O>
186where
187 I: Subscription,
188 F: FnMut(O) -> I::Item,
189{
190 type Item = O;
191
192 fn push(&mut self, value: Self::Item, token: PublisherToken) -> bool {
193 self.inner.push((self.map)(value), token)
194 }
195
196 fn set_name_mut(&mut self, name: Box<str>) {
197 self.inner.set_name_mut(name);
198 }
199
200 fn increment_publishers(&self, token: PublisherToken) {
201 self.inner.increment_publishers(token);
202 }
203
204 fn decrement_publishers(&self, token: PublisherToken) {
205 self.inner.decrement_publishers(token);
206 }
207}
208
209impl<I: Clone, F: Clone, O> Clone for Map<I, F, O> {
210 fn clone(&self) -> Self {
211 Self {
212 inner: self.inner.clone(),
213 map: self.map.clone(),
214 _phantom: PhantomData,
215 }
216 }
217}
218
219pub struct FilterMap<I, F, O> {
220 inner: I,
221 map: F,
222 _phantom: PhantomData<O>,
223}
224
225impl<O, I, F> Subscription for FilterMap<I, F, O>
226where
227 I: Subscription,
228 F: FnMut(O) -> Option<I::Item>,
229{
230 type Item = O;
231
232 fn push(&mut self, value: Self::Item, token: PublisherToken) -> bool {
233 if let Some(value) = (self.map)(value) {
234 self.inner.push(value, token)
235 } else {
236 true
237 }
238 }
239
240 fn set_name_mut(&mut self, name: Box<str>) {
241 self.inner.set_name_mut(name);
242 }
243
244 fn increment_publishers(&self, token: PublisherToken) {
245 self.inner.increment_publishers(token);
246 }
247
248 fn decrement_publishers(&self, token: PublisherToken) {
249 self.inner.decrement_publishers(token);
250 }
251}
252
253impl<I: Clone, F: Clone, O> Clone for FilterMap<I, F, O> {
254 fn clone(&self) -> Self {
255 Self {
256 inner: self.inner.clone(),
257 map: self.map.clone(),
258 _phantom: PhantomData,
259 }
260 }
261}
262
263pub type BoxedSubscription<T> = Box<dyn Subscription<Item = T> + Send>;
264
265impl<T> Subscription for BoxedSubscription<T> {
266 type Item = T;
267
268 fn push(&mut self, value: Self::Item, token: PublisherToken) -> bool {
269 self.deref_mut().push(value, token)
270 }
271
272 fn set_name_mut(&mut self, name: Box<str>) {
273 self.deref_mut().set_name_mut(name);
274 }
275
276 fn increment_publishers(&self, token: PublisherToken) {
277 self.deref().increment_publishers(token);
278 }
279
280 fn decrement_publishers(&self, token: PublisherToken) {
281 self.deref().decrement_publishers(token);
282 }
283}