crosstalk/lib.rs
1#![doc(html_root_url = "https://docs.rs/crosstalk/0.3")]
2#![doc = include_str!("../README.md")]
3// --------------------------------------------------
4// external
5// --------------------------------------------------
6use std::{
7 sync::{
8 Arc,
9 Mutex,
10 },
11 any::Any,
12 hash::Hash,
13 collections::HashMap,
14};
15
16// --------------------------------------------------
17// internal
18// --------------------------------------------------
19pub mod external;
20pub use crosstalk_macros::init;
21pub use crosstalk_macros::AsTopic;
22
23#[derive(Clone)]
24/// A [`BoundedNode`] is a node to spawn publishers and
25/// subscribers on, where the size of each buffer is
26/// fixed.
27///
28/// # Attributes
29///
30/// * `node` - the node to spawn publishers and subscribers on
31/// * `size` - the size of each buffer
32///
33/// # Type Parameters
34///
35/// * `T` - the topic enum name
36///
37/// # Examples
38///
39/// ```
40/// use crosstalk::AsTopic;
41///
42/// #[derive(AsTopic)]
43/// enum House {
44/// Bedroom,
45/// LivingRoom,
46/// Kitchen,
47/// Bathroom,
48/// }
49///
50/// crosstalk::init! {
51/// House::Bedroom => String,
52/// House::LivingRoom => String,
53/// House::Kitchen => Vec<f32>,
54/// House::Bathroom => u8,
55/// }
56///
57/// fn main() {
58/// let mut node = crosstalk::BoundedNode::<House>::new(10);
59/// let (pub0, mut sub0) = node.pubsub(House::Bedroom).unwrap();
60/// let (pub1, mut sub1) = node.pubsub(House::Bedroom).unwrap();
61///
62/// pub0.write("Hello".to_string());
63/// pub0.write("World".to_string());
64/// pub1.write("Foo".to_string());
65/// pub1.write("Bar".to_string());
66///
67/// assert_eq!(sub1.try_read().unwrap(), "Hello");
68/// assert_eq!(sub1.try_read().unwrap(), "World");
69/// assert_eq!(sub1.try_read().unwrap(), "Foo");
70/// assert_eq!(sub1.try_read().unwrap(), "Bar");
71///
72/// assert_eq!(sub0.try_read().unwrap(), "Hello");
73/// assert_eq!(sub0.try_read().unwrap(), "World");
74/// assert_eq!(sub0.try_read().unwrap(), "Foo");
75/// assert_eq!(sub0.try_read().unwrap(), "Bar");
76/// }
77/// ```
78pub struct BoundedNode<T> {
79 pub node: Arc<Mutex<ImplementedBoundedNode<T>>>,
80 pub size: usize,
81}
82/// [`BoundedNode`] implementation
83///
84/// This holds an [`Arc<Mutex<ImplementedBoundedNode<T>>>`], which
85/// references the true (private) node that implements the [`AsTopic`] trait.
86impl<T> BoundedNode<T>
87where
88 T: CrosstalkTopic,
89 ImplementedBoundedNode<T>: CrosstalkPubSub<T>,
90{
91 #[inline]
92 /// Creates a new [`BoundedNode`]
93 ///
94 /// # Arguments
95 ///
96 /// * `size` - the size of each buffer
97 ///
98 /// # Examples
99 ///
100 /// ```
101 /// use crosstalk::AsTopic;
102 ///
103 /// #[derive(AsTopic)]
104 /// enum House {
105 /// Bedroom,
106 /// LivingRoom,
107 /// Kitchen,
108 /// Bathroom,
109 /// }
110 ///
111 /// crosstalk::init! {
112 /// House::Bedroom => String,
113 /// House::LivingRoom => String,
114 /// House::Kitchen => Vec<f32>,
115 /// House::Bathroom => u8,
116 /// }
117 ///
118 /// fn main() {
119 /// let node = crosstalk::BoundedNode::<House>::new(10);
120 /// let moved_node = node.clone();
121 /// std::thread::spawn(move || another_thread(moved_node));
122 /// assert_eq!(node.size, 10);
123 /// }
124 ///
125 /// fn another_thread(mut node: crosstalk::BoundedNode<House>) {
126 /// assert_eq!(node.size, 10);
127 /// }
128 /// ```
129 pub fn new(size: usize) -> Self {
130 Self {
131 node: Arc::new(Mutex::new(ImplementedBoundedNode::<T>::new(size.clone()))),
132 size,
133 }
134 }
135
136 #[inline]
137 #[deprecated(since = "0.3.3", note = "This function will be replaced by `publisher_blocking` in crosstalk v1.0. `publisher` will become async. Both `publisher_*` functions will return a `Result` with return error type `crosstalk::Error`")]
138 /// Creates a new publisher for the given topic `T`
139 ///
140 /// # Arguments
141 ///
142 /// * `topic` - the topic to create a publisher for
143 ///
144 /// # Returns
145 ///
146 /// A publisher for the topic `T`
147 ///
148 /// # Panics
149 ///
150 /// Will panic if the datatype of the topic `T` initialized
151 /// using [`crosstalk_macros::init`] does not match the topic `T`.
152 ///
153 /// Note that the ***linter will not catch this error until runtime***!
154 ///
155 /// # Examples
156 ///
157 /// ```
158 /// use crosstalk::AsTopic;
159 ///
160 /// #[derive(AsTopic)]
161 /// enum House {
162 /// Bedroom,
163 /// LivingRoom,
164 /// Kitchen,
165 /// Bathroom,
166 /// }
167 ///
168 /// crosstalk::init! {
169 /// House::Bedroom => String,
170 /// House::LivingRoom => String,
171 /// House::Kitchen => Vec<f32>,
172 /// House::Bathroom => u8,
173 /// }
174 ///
175 /// fn main() {
176 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
177 /// assert!(node.publisher::<String>(House::Bedroom).is_ok());
178 /// }
179 /// ```
180 pub fn publisher<D: 'static>(&mut self, topic: T) -> Result<Publisher<D, T>, Box<dyn std::error::Error>> {
181 let mut n = self.node.lock().unwrap();
182 n.publisher(topic)
183 }
184
185 #[inline]
186 #[deprecated(since = "0.3.3", note = "This function will be replaced by `subscriber_blocking` in crosstalk v1.0. `subscriber` will become async. Both `subscriber_*` functions will return a `Result` with return error type `crosstalk::Error`")]
187 /// Creates a new subscriber for the given topic `T`
188 ///
189 /// # Arguments
190 ///
191 /// * `topic` - the topic to create a subscriber for
192 ///
193 /// # Returns
194 ///
195 /// A subscriber for the topic `T`
196 ///
197 /// # Panics
198 ///
199 /// Will panic if the datatype of the topic `T` initialized
200 /// using [`crosstalk_macros::init`] does not match the topic `T`.
201 ///
202 /// Note that the ***linter will not catch this error until runtime***!
203 ///
204 /// # Examples
205 ///
206 /// ```
207 /// use crosstalk::AsTopic;
208 ///
209 /// #[derive(AsTopic)]
210 /// enum House {
211 /// Bedroom,
212 /// LivingRoom,
213 /// Kitchen,
214 /// Bathroom,
215 /// }
216 ///
217 /// crosstalk::init! {
218 /// House::Bedroom => String,
219 /// House::LivingRoom => String,
220 /// House::Kitchen => Vec<f32>,
221 /// House::Bathroom => u8,
222 /// }
223 ///
224 /// fn main() {
225 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
226 /// assert!(node.subscriber::<String>(House::Bedroom).is_ok());
227 /// }
228 /// ```
229 pub fn subscriber<D: Clone + Send + 'static>(&mut self, topic: T) -> Result<Subscriber<D, T>, Box<dyn std::error::Error>> {
230 let mut n = self.node.lock().unwrap();
231 n.subscriber(topic)
232 }
233
234 #[inline]
235 #[deprecated(since = "0.3.3", note = "This function will be replaced by `pubsub_blocking` in crosstalk v1.0. `pubsub` will become async. Both `pubsub_*` functions will return a `Result` with return error type `crosstalk::Error`")]
236 /// Creates a new publisher and subscriber for the given topic `T`
237 ///
238 /// # Arguments
239 ///
240 /// * `topic` - the topic to create a publisher and subscriber for
241 ///
242 /// # Returns
243 ///
244 /// A publisher and subscriber for the topic `T`
245 ///
246 /// # Examples
247 ///
248 /// ```
249 /// use crosstalk::AsTopic;
250 ///
251 /// #[derive(AsTopic)]
252 /// enum House {
253 /// Bedroom,
254 /// LivingRoom,
255 /// Kitchen,
256 /// Bathroom,
257 /// }
258 ///
259 /// crosstalk::init! {
260 /// House::Bedroom => String,
261 /// House::LivingRoom => String,
262 /// House::Kitchen => Vec<f32>,
263 /// House::Bathroom => u8,
264 /// }
265 ///
266 /// fn main() {
267 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
268 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
269 /// publisher.write("hello".to_string());
270 /// assert_eq!(subscriber.try_read().unwrap(), "hello");
271 /// }
272 /// ```
273 ///
274 pub fn pubsub<D: Clone + Send + 'static>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), Box<dyn std::error::Error>> {
275 let mut n = self.node.lock().unwrap();
276 n.pubsub(topic)
277 }
278
279 #[inline]
280 #[deprecated(since = "0.3.3", note = "This function will be removed in crosstalk v1.0. Dropping the publisher will achieve the same result")]
281 /// Deletes a publisher
282 ///
283 /// # Arguments
284 ///
285 /// * `publisher` - the publisher to delete
286 ///
287 /// # Examples
288 ///
289 /// ```
290 /// use crosstalk::AsTopic;
291 ///
292 /// #[derive(AsTopic)]
293 /// enum House {
294 /// Bedroom,
295 /// LivingRoom,
296 /// Kitchen,
297 /// Bathroom,
298 /// }
299 ///
300 /// crosstalk::init! {
301 /// House::Bedroom => String,
302 /// House::LivingRoom => String,
303 /// House::Kitchen => Vec<f32>,
304 /// House::Bathroom => u8,
305 /// }
306 ///
307 /// fn main() {
308 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
309 /// let publisher = node.publisher::<String>(House::Bedroom).unwrap();
310 /// node.delete_publisher(publisher);
311 /// assert!(true);
312 /// }
313 /// ```
314 pub fn delete_publisher<D: 'static>(&mut self, _publisher: Publisher<D, T>) {
315 let n = self.node.lock().unwrap();
316 n.delete_publisher(_publisher)
317 }
318
319 #[inline]
320 #[deprecated(since = "0.3.3", note = "This function will be removed in crosstalk v1.0. Dropping the subscriber will achieve the same result")]
321 /// Deletes a subscriber
322 ///
323 /// # Arguments
324 ///
325 /// * `subscriber` - the subscriber to delete
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use crosstalk::AsTopic;
331 ///
332 /// #[derive(AsTopic)]
333 /// enum House {
334 /// Bedroom,
335 /// LivingRoom,
336 /// Kitchen,
337 /// Bathroom,
338 /// }
339 ///
340 /// crosstalk::init! {
341 /// House::Bedroom => String,
342 /// House::LivingRoom => String,
343 /// House::Kitchen => Vec<f32>,
344 /// House::Bathroom => u8,
345 /// }
346 ///
347 /// fn main() {
348 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
349 /// let subscriber = node.subscriber::<String>(House::Bedroom).unwrap();
350 /// node.delete_subscriber(subscriber);
351 /// assert!(true);
352 /// }
353 /// ```
354 pub fn delete_subscriber<D: Clone + Send + 'static>(&mut self, subscriber: Subscriber<D, T>) {
355 let n = self.node.lock().unwrap();
356 n.delete_subscriber(subscriber)
357 }
358}
359
360/// The inner implementation of the node,
361/// which implements the [`AsTopic`] trait
362///
363/// This is auto-generated by the [`crosstalk_macros::init!`] macro
364///
365/// # Attributes
366///
367/// * `senders` - the senders of the node
368/// * `size` - the size of each buffer
369pub struct ImplementedBoundedNode<T> {
370 pub senders: HashMap<T, Box<dyn Any + 'static>>,
371 pub size: usize,
372}
373
374/// [`ImplementedBoundedNode`] implementation of [`Send`]
375unsafe impl<T> Send for ImplementedBoundedNode<T> {}
376/// [`ImplementedBoundedNode`] implementation of [`Sync`]
377unsafe impl<T> Sync for ImplementedBoundedNode<T> {}
378
379/// [`ImplementedBoundedNode`] implementation
380impl<T> ImplementedBoundedNode<T>
381where
382 T: CrosstalkTopic,
383{
384 /// See [`BoundedNode::new`]
385 ///
386 /// # Arguments
387 ///
388 /// * `size` - the size of each buffer
389 pub fn new(size: usize) -> Self {
390 Self {
391 senders: HashMap::new(),
392 size: size,
393 }
394 }
395}
396
397#[derive(Clone)]
398/// A `crosstalk` [`Publisher`]
399///
400/// # Attributes
401///
402/// * `topic` - the topic of the publisher
403/// * `buf` - the buffer which broadcasts the data
404///
405/// # Type Parameters
406///
407/// * `T` - the topic of the publisher
408/// * `D` - the data type of the publisher
409///
410/// This is not meant to be used directly, please
411/// use the [`crosstalk_macros::init!`] macro instead
412/// and produce a [`Publisher`] with [`BoundedNode::publisher`]
413/// or [`BoundedNode::pubsub`]
414pub struct Publisher<D, T> {
415 pub topic: T,
416 buf: tokio::sync::broadcast::Sender<D>,
417}
418/// Implements [`Publisher`]
419impl<D, T> Publisher<D, T> {
420 #[inline]
421 /// See [`BoundedNode::publisher`]
422 pub fn new(buf: tokio::sync::broadcast::Sender<D>, topic: T) -> Self {
423 Self { buf, topic }
424 }
425
426 #[inline]
427 /// Publishes data to a topic, broadcasting it to all subscribers
428 ///
429 /// # Arguments
430 ///
431 /// * `sample` - the sample to publish
432 ///
433 /// # Examples
434 ///
435 /// ```
436 /// use crosstalk::AsTopic;
437 ///
438 /// #[derive(AsTopic)]
439 /// enum House {
440 /// Bedroom,
441 /// LivingRoom,
442 /// Kitchen,
443 /// Bathroom,
444 /// }
445 ///
446 /// crosstalk::init! {
447 /// House::Bedroom => String,
448 /// House::LivingRoom => String,
449 /// House::Kitchen => Vec<f32>,
450 /// House::Bathroom => u8,
451 /// }
452 ///
453 /// fn main() {
454 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
455 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
456 /// publisher.write("hello".to_string());
457 /// std::thread::spawn(move || {
458 /// assert_eq!(subscriber.try_read().unwrap(), "hello");
459 /// });
460 /// }
461 /// ```
462 pub fn write(&self, sample: D) {
463 let _ = self.buf.send(sample);
464 }
465}
466
467/// A `crosstalk` [`Subscriber`]
468///
469/// # Attributes
470///
471/// * `topic` - the topic of the subscriber
472/// * `rcvr` - the receiver of the subscriber
473/// * `sndr` - the sender for the topic. This is used to spawn multiple receivers upon [`Subscriber::clone`]
474///
475/// # Type Parameters
476///
477/// * `T` - the topic of the subscriber
478/// * `D` - the data type of the subscriber
479///
480/// This is not meant to be used directly, please
481/// use the [`crosstalk_macros::init!`] macro instead
482/// and produce a [`Subscriber`] with [`BoundedNode::subscriber`]
483/// or [`BoundedNode::pubsub`]
484pub struct Subscriber<D, T> {
485 pub topic: T,
486 rcvr: Receiver<D>,
487 sndr: Arc<tokio::sync::broadcast::Sender<D>>,
488}
489/// [`Subscriber`] implementation
490impl<D: Clone, T: Clone> Subscriber<D, T> {
491 #[inline]
492 /// See [`BoundedNode::subscriber`]
493 pub fn new(
494 topic: T,
495 rcvr: Option<tokio::sync::broadcast::Receiver<D>>,
496 sndr: Arc<tokio::sync::broadcast::Sender<D>>,
497 ) -> Self {
498 Self {
499 topic: topic,
500 rcvr: Receiver::new(rcvr.unwrap_or(sndr.subscribe())),
501 sndr: sndr.clone(),
502 }
503 }
504
505 #[inline]
506 /// Clones a [`Subscriber`]
507 ///
508 /// # Examples
509 ///
510 /// ```
511 /// use crosstalk::AsTopic;
512 ///
513 /// #[derive(AsTopic)]
514 /// enum House {
515 /// Bedroom,
516 /// LivingRoom,
517 /// Kitchen,
518 /// Bathroom,
519 /// }
520 ///
521 /// crosstalk::init! {
522 /// House::Bedroom => String,
523 /// House::LivingRoom => String,
524 /// House::Kitchen => Vec<f32>,
525 /// House::Bathroom => u8,
526 /// }
527 ///
528 /// fn main() {
529 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
530 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
531 /// let mut subscriber_2 = subscriber.clone();
532 /// publisher.write("hello".to_string());
533 /// assert_eq!(subscriber.try_read().unwrap(), "hello");
534 /// assert_eq!(subscriber_2.try_read().unwrap(), "hello");
535 /// }
536 /// ```
537 pub fn clone(&self) -> Self {
538 Self {
539 topic: self.topic.clone(),
540 rcvr: Receiver::new(self.sndr.subscribe()),
541 sndr: self.sndr.clone(),
542 }
543 }
544
545 /// Asynchronous blocking read from the [`tokio::sync::broadcast::Receiver`]
546 ///
547 /// The sequential equivalent to this function is [`Subscriber::read_blocking`]
548 /// which can be used outside of an asynchronous context
549 ///
550 /// # Examples
551 ///
552 /// ```
553 /// use crosstalk::AsTopic;
554 ///
555 /// #[derive(AsTopic)]
556 /// enum House {
557 /// Bedroom,
558 /// LivingRoom,
559 /// Kitchen,
560 /// Bathroom,
561 /// }
562 ///
563 /// crosstalk::init! {
564 /// House::Bedroom => String,
565 /// House::LivingRoom => String,
566 /// House::Kitchen => Vec<f32>,
567 /// House::Bathroom => u8,
568 /// }
569 ///
570 /// #[tokio::main]
571 /// async fn main() {
572 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
573 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
574 /// publisher.write("hello".to_string());
575 /// assert_eq!(subscriber.read().await, Some("hello".to_string()));
576 /// }
577 /// ```
578 pub async fn read(&mut self) -> Option<D> {
579 self.rcvr.read().await
580 }
581
582 #[inline]
583 /// Non-blocking read from [`tokio::sync::broadcast::Receiver`]
584 /// Upon immediate failure, [`None`] will be returned
585 ///
586 /// Difference between this function and [`Subscriber::try_read_raw`]
587 /// is that this function continuously loops upon [`tokio`] error of
588 /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`], looping
589 /// until a valid message is received OR the buffer is determined to be
590 /// empty
591 ///
592 /// [`Subscriber::try_read_raw`] will return [`None`] upon
593 /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`], which
594 /// can cause some unexpected behavior
595 ///
596 /// # Examples
597 ///
598 /// ```
599 /// use crosstalk::AsTopic;
600 ///
601 /// #[derive(AsTopic)]
602 /// enum House {
603 /// Bedroom,
604 /// LivingRoom,
605 /// Kitchen,
606 /// Bathroom,
607 /// }
608 ///
609 /// crosstalk::init! {
610 /// House::Bedroom => String,
611 /// House::LivingRoom => String,
612 /// House::Kitchen => Vec<f32>,
613 /// House::Bathroom => u8,
614 /// }
615 ///
616 /// fn main() {
617 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
618 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
619 /// publisher.write("hello".to_string());
620 /// assert_eq!(subscriber.try_read(), Some(String::from("hello")));
621 /// assert_eq!(subscriber.try_read(), None);
622 /// }
623 /// ```
624 pub fn try_read(&mut self) -> Option<D> {
625 self.rcvr.try_read()
626 }
627
628 #[inline]
629 /// Non-blocking read from [`tokio::sync::broadcast::Receiver`], returning
630 /// [`None`] if there are no messages available or if
631 /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`] occurs.
632 ///
633 /// This function can cause some unexpected behavior. It is recommended
634 /// to use [`Subscriber::try_read`] instead.
635 ///
636 /// # Examples
637 ///
638 /// ```
639 /// use crosstalk::AsTopic;
640 ///
641 /// #[derive(AsTopic)]
642 /// enum House {
643 /// Bedroom,
644 /// LivingRoom,
645 /// Kitchen,
646 /// Bathroom,
647 /// }
648 ///
649 /// crosstalk::init! {
650 /// House::Bedroom => String,
651 /// House::LivingRoom => String,
652 /// House::Kitchen => Vec<f32>,
653 /// House::Bathroom => u8,
654 /// }
655 ///
656 /// fn main() {
657 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
658 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
659 /// publisher.write("hello".to_string());
660 /// assert_eq!(subscriber.try_read_raw(), Some(String::from("hello")));
661 /// assert_eq!(subscriber.try_read_raw(), None);
662 /// }
663 /// ```
664 pub fn try_read_raw(&mut self) -> Option<D> {
665 self.rcvr.try_read_raw()
666 }
667
668 #[inline]
669 /// Sequential blocking read from [`tokio::sync::broadcast::Receiver`]
670 ///
671 /// The asynchronous equivalent to this function is [`Subscriber::read`],
672 /// which must be used in an asynchronous context with `.await`
673 ///
674 /// # Examples
675 ///
676 /// ```
677 /// use crosstalk::AsTopic;
678 ///
679 /// #[derive(AsTopic)]
680 /// enum House {
681 /// Bedroom,
682 /// LivingRoom,
683 /// Kitchen,
684 /// Bathroom,
685 /// }
686 ///
687 /// crosstalk::init! {
688 /// House::Bedroom => String,
689 /// House::LivingRoom => String,
690 /// House::Kitchen => Vec<f32>,
691 /// House::Bathroom => u8,
692 /// }
693 ///
694 /// fn main() {
695 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
696 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
697 /// publisher.write("hello".to_string());
698 /// assert_eq!(subscriber.read_blocking(), Some("hello".to_string()));
699 /// }
700 /// ```
701 pub fn read_blocking(&mut self) -> Option<D> {
702 self.rcvr.read_blocking()
703 }
704
705 #[inline]
706 /// Asynchronous non-blocking read from [`tokio::sync::broadcast::Receiver`]
707 /// with a given timeout. After the timeout if there are no messages,
708 /// returns [`None`].
709 ///
710 /// # Examples
711 ///
712 /// ```
713 /// use crosstalk::AsTopic;
714 ///
715 /// #[derive(AsTopic)]
716 /// enum House {
717 /// Bedroom,
718 /// LivingRoom,
719 /// Kitchen,
720 /// Bathroom,
721 /// }
722 ///
723 /// crosstalk::init! {
724 /// House::Bedroom => String,
725 /// House::LivingRoom => String,
726 /// House::Kitchen => Vec<f32>,
727 /// House::Bathroom => u8,
728 /// }
729 ///
730 /// #[tokio::main]
731 /// async fn main() {
732 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
733 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
734 /// publisher.write("hello".to_string());
735 /// assert_eq!(subscriber.read_timeout(std::time::Duration::from_millis(100)).await, Some("hello".to_string()));
736 /// }
737 /// ```
738 pub async fn read_timeout(&mut self, timeout: std::time::Duration) -> Option<D> {
739 self.rcvr.read_timeout(timeout).await
740 }
741}
742
743
744/// Receiver
745///
746/// Define a receiver for subscribing messages
747///
748/// Reads from [`tokio::sync::broadcast::Receiver`]
749pub struct Receiver<D> {
750 buf: tokio::sync::broadcast::Receiver<D>,
751}
752/// [`Receiver`] implementation
753impl<D> Receiver<D>
754where
755 D: Clone
756{
757 #[inline]
758 pub fn new(
759 buf: tokio::sync::broadcast::Receiver<D>,
760 ) -> Self {
761 Self { buf }
762 }
763
764 #[inline]
765 /// Reads from [`tokio::sync::broadcast::Receiver`]
766 ///
767 /// This struct/function is not meant to be used directly,
768 /// rather through the [`Subscriber`] struct with [`Subscriber::read`]
769 pub async fn read(&mut self) -> Option<D> {
770 loop {
771 match self.buf.recv().await {
772 Ok(res) => return Some(res),
773 Err(e) => match e {
774 tokio::sync::broadcast::error::RecvError::Lagged(_) => { continue; }
775
776 #[cfg(not(any(feature = "log", feature = "tracing")))]
777 _ => return None,
778
779 #[cfg(any(feature = "log", feature = "tracing"))]
780 _ => {
781 #[cfg(feature = "log")]
782 log::error!("{}", e);
783 #[cfg(feature = "tracing")]
784 tracing::error!("{}", e);
785 return None
786 }
787 }
788 }
789 }
790 }
791
792 #[inline]
793 /// Reads from [`tokio::sync::broadcast::Receiver`]
794 ///
795 /// This struct/function is not meant to be used directly,
796 /// rather through the [`Subscriber`] struct with [`Subscriber::try_read`]
797 pub fn try_read(&mut self) -> Option<D> {
798 loop {
799 match self.buf.try_recv() {
800 Ok(d) => return Some(d),
801 Err(e) => {
802 match e {
803 tokio::sync::broadcast::error::TryRecvError::Lagged(_) => { continue; },
804
805 #[cfg(not(any(feature = "log", feature = "tracing")))]
806 _ => return None,
807
808 #[cfg(any(feature = "log", feature = "tracing"))]
809 _ => {
810 #[cfg(feature = "log")]
811 log::error!("{}", e);
812 #[cfg(feature = "tracing")]
813 tracing::error!("{}", e);
814 return None
815 },
816 }
817 },
818 }
819 }
820 }
821
822 #[inline]
823 /// Reads from [`tokio::sync::broadcast::Receiver`]
824 ///
825 /// This struct/function is not meant to be used directly,
826 /// rather through the [`Subscriber`] struct with [`Subscriber::try_read_raw`]
827 pub fn try_read_raw(&mut self) -> Option<D> {
828 match self.buf.try_recv() {
829 Ok(d) => Some(d),
830
831 #[cfg(not(any(feature = "log", feature = "tracing")))]
832 Err(_) => None,
833
834 #[cfg(any(feature = "log", feature = "tracing"))]
835 Err(e) => {
836 #[cfg(feature = "log")]
837 log::error!("{}", e);
838 #[cfg(feature = "tracing")]
839 tracing::error!("{}", e);
840 None
841 },
842 }
843 }
844
845 #[inline]
846 /// Reads from [`tokio::sync::broadcast::Receiver`]
847 ///
848 /// This struct/function is not meant to be used directly,
849 /// rather through the [`Subscriber`] struct with [`Subscriber::read_blocking`]
850 pub fn read_blocking(&mut self) -> Option<D> {
851 loop {
852 match self.buf.blocking_recv() {
853 Ok(res) => return Some(res),
854 Err(e) => match e {
855 tokio::sync::broadcast::error::RecvError::Lagged(_) => { continue; }
856
857 #[cfg(not(any(feature = "log", feature = "tracing")))]
858 _ => return None,
859
860 #[cfg(any(feature = "log", feature = "tracing"))]
861 _ => {
862 #[cfg(feature = "log")]
863 log::error!("{}", e);
864 #[cfg(feature = "tracing")]
865 tracing::error!("{}", e);
866 return None
867 },
868 }
869 }
870 }
871 }
872
873 #[inline]
874 /// Reads from [`tokio::sync::broadcast::Receiver`]
875 ///
876 /// This struct/function is not meant to be used directly,
877 /// rather through the [`Subscriber`] struct with [`Subscriber::read_timeout`]
878 pub async fn read_timeout(&mut self, timeout: std::time::Duration) -> Option<D> {
879 match tokio::runtime::Handle::try_current() {
880 Ok(_) => {
881 match tokio::time::timeout(timeout, self.buf.recv()).await {
882 Ok(res) => {
883 match res {
884 Ok(res) => Some(res),
885
886 #[cfg(not(any(feature = "log", feature = "tracing")))]
887 Err(_) => None,
888
889 #[cfg(any(feature = "log", feature = "tracing"))]
890 Err(e) => {
891 #[cfg(feature = "log")]
892 log::error!("{}", e);
893 #[cfg(feature = "tracing")]
894 tracing::error!("{}", e);
895 None
896 },
897 }
898 },
899
900 #[cfg(not(any(feature = "log", feature = "tracing")))]
901 Err(_) => None,
902
903 #[cfg(any(feature = "log", feature = "tracing"))]
904 Err(e) => {
905 #[cfg(feature = "log")]
906 log::error!("{}", e);
907 #[cfg(feature = "tracing")]
908 tracing::error!("{}", e);
909 None
910 },
911 }
912 },
913
914 #[cfg(not(any(feature = "log", feature = "tracing")))]
915 Err(_) => None,
916
917 #[cfg(any(feature = "log", feature = "tracing"))]
918 Err(e) => {
919 #[cfg(feature = "log")]
920 log::error!("{}", e);
921 #[cfg(feature = "tracing")]
922 tracing::error!("{}", e);
923 None
924 },
925 }
926 }
927}
928
929#[derive(Debug)]
930/// [`crosstalk`](crate) errors
931pub enum Error {
932 PublisherMismatch(String, String),
933 SubscriberMismatch(String, String),
934}
935impl std::error::Error for Error {}
936impl std::fmt::Display for Error {
937 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
938 match self {
939 Error::PublisherMismatch(input, output) => write!(f, "Publisher type mismatch: {} (cast) != {} (expected)", input, output),
940 Error::SubscriberMismatch(input, output) => write!(f, "Subscriber type mismatch: {} (cast) != {} (expected)", input, output),
941 }
942 }
943}
944
945/// A trait to define a [`CrosstalkTopic`]
946pub trait CrosstalkTopic: Eq + Hash + Copy + Clone + PartialEq {}
947/// A trait to define a [`CrosstalkPubSub`]
948///
949/// This is used to implement the [`CrosstalkPubSub`] trait
950/// using the [`crosstalk_macros::init!`] macro
951/// for the [`ImplementedBoundedNode`] struct
952///
953/// This is not meant to be used directly, and is automatically
954/// implemented when calling [`crosstalk_macros::init!`]
955pub trait CrosstalkPubSub<T> {
956 fn publisher<D: 'static>(&mut self, topic: T) -> Result<Publisher<D, T>, Box<dyn std::error::Error>>;
957 fn subscriber<D: Clone + Send + 'static>(&mut self, topic: T) -> Result<Subscriber<D, T>, Box<dyn std::error::Error>>;
958 fn pubsub<D: Clone + Send + 'static>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), Box<dyn std::error::Error>>;
959 #[deprecated(since = "0.3.3", note = "This function will be removed in crosstalk v1.0. Dropping the publisher will achieve the same result")]
960 fn delete_publisher<D: 'static>(&self, publisher: Publisher<D, T>);
961 #[deprecated(since = "0.3.3", note = "This function will be removed in crosstalk v1.0. Dropping the subscriber will achieve the same result")]
962 fn delete_subscriber<D: Clone + Send + 'static>(&self, subscriber: Subscriber<D, T>);
963}
964
965#[inline]
966/// Downcasts a [`Box`] into a type `T`
967///
968/// # Arguments
969///
970/// * `buf` - the [`Box`] to downcast
971///
972/// # Examples
973///
974/// ```
975/// use crosstalk::downcast;
976///
977/// let mut buf = Box::new(5) as Box<dyn std::any::Any + 'static>;
978/// assert_eq!(downcast::<i32>(buf).unwrap(), 5);
979/// ```
980pub fn downcast<T>(buf: Box<dyn Any + 'static>) -> Result<T, Box<dyn Any>>
981where
982 T: 'static,
983{
984 match buf.downcast::<T>() {
985 Ok(t) => Ok(*t),
986 Err(e) => Err(e),
987 }
988}