crosstalk/lib.rs
1#![doc(html_root_url = "https://docs.rs/crosstalk/1.0")]
2#![doc = include_str!("../README.md")]
3// --------------------------------------------------
4// external
5// --------------------------------------------------
6use std::sync::Arc;
7use tokio::sync::Mutex;
8use std::collections::HashMap;
9use tokio::sync::broadcast::{
10 Sender as TokioSender,
11 Receiver as TokioReceiver,
12};
13
14// --------------------------------------------------
15// local
16// --------------------------------------------------
17pub use crosstalk_macros::init;
18pub use crosstalk_macros::AsTopic;
19
20// --------------------------------------------------
21// re-exports
22// --------------------------------------------------
23pub mod __macro_exports {
24 pub use tokio::runtime;
25 pub use tokio::sync::broadcast;
26
27 #[inline(always)]
28 /// Downcasts a [`Box`] into a type `T`
29 ///
30 /// # Arguments
31 ///
32 /// * `buf` - the [`Box`] to downcast
33 ///
34 /// # Examples
35 ///
36 /// ```
37 /// use crosstalk::__macro_exports::downcast;
38 ///
39 /// let mut buf = Box::new(5) as Box<dyn std::any::Any + 'static>;
40 /// assert_eq!(downcast::<i32>(buf, crosstalk::Error::PublisherMismatch("foo", "bar")).unwrap(), 5);
41 /// ```
42 pub fn downcast<T>(buf: Box<dyn std::any::Any + 'static>, on_error: crate::Error) -> Result<T, crate::Error>
43 where
44 T: 'static,
45 {
46 match buf.downcast::<T>() {
47 Ok(t) => Ok(*t),
48 Err(_) => Err(on_error),
49 }
50 }
51}
52
/// A trait to bound an enum as a [`CrosstalkTopic`]
///
/// A topic must be copyable, comparable for equality, and hashable.
pub trait CrosstalkTopic: Copy + Clone + Eq + PartialEq + std::hash::Hash {}
55
/// A trait to bound a datatype as a [`CrosstalkData`]
///
/// Data must be cloneable, sendable across threads, and `'static`.
pub trait CrosstalkData: Clone + Send + 'static {}
/// Blanket [`CrosstalkData`] implementation for every type
/// that satisfies the trait's own bounds
impl<T> CrosstalkData for T where T: Clone + Send + 'static {}
60
#[derive(Copy, Clone, Debug)]
/// [`crosstalk`](crate) errors
///
/// Each variant carries two type names. Their [`std::fmt::Display`]
/// rendering labels the first as `(cast)` and the second as `(expected)`.
pub enum Error {
    /// A publisher was requested with a mismatching data type
    PublisherMismatch(&'static str, &'static str),
    /// A subscriber was requested with a mismatching data type
    SubscriberMismatch(&'static str, &'static str),
}
/// [`crosstalk::Error`](crate::Error) implementation of [`std::error::Error`]
impl std::error::Error for Error {}
/// [`crosstalk::Error`](crate::Error) implementation of [`std::fmt::Display`]
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::PublisherMismatch(cast, expected) => {
                write!(f, "Publisher type mismatch: {} (cast) != {} (expected)", cast, expected)
            }
            Self::SubscriberMismatch(cast, expected) => {
                write!(f, "Subscriber type mismatch: {} (cast) != {} (expected)", cast, expected)
            }
        }
    }
}
78
79/// A trait to define a [`CrosstalkPubSub`]
80///
81/// This is used to implement the [`CrosstalkPubSub`] trait
82/// using the [`crosstalk_macros::init!`] macro
83/// for the [`ImplementedBoundedNode`] struct
84///
85/// This is not meant to be used directly, and is automatically
86/// implemented when calling [`crosstalk_macros::init!`]
87pub trait CrosstalkPubSub<T> {
88 fn publisher<D: CrosstalkData>(&mut self, topic: T) -> Result<Publisher<D, T>, crate::Error>;
89
90 fn subscriber<D: CrosstalkData>(&mut self, topic: T) -> Result<Subscriber<D, T>, crate::Error>;
91
92 #[allow(clippy::type_complexity)]
93 fn pubsub<D: CrosstalkData>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), crate::Error>;
94}
95
96#[derive(Clone)]
97/// A [`BoundedNode`] is a node to spawn publishers and
98/// subscribers on, where the size of each buffer is
99/// fixed.
100///
101/// # Attributes
102///
103/// * `node` - the node to spawn publishers and subscribers on
104/// * `size` - the size of each buffer
105///
106/// # Type Parameters
107///
108/// * `T` - the topic enum name
109///
110/// # Examples
111///
112/// ```
113/// use crosstalk::AsTopic;
114///
115/// #[derive(AsTopic)]
116/// enum House {
117/// Bedroom,
118/// LivingRoom,
119/// Kitchen,
120/// Bathroom,
121/// }
122///
123/// crosstalk::init! {
124/// House::Bedroom => String,
125/// House::LivingRoom => String,
126/// House::Kitchen => Vec<f32>,
127/// House::Bathroom => u8,
128/// }
129///
130/// let mut node = crosstalk::BoundedNode::<House>::new(10);
131/// let (pub0, mut sub0) = node.pubsub_blocking(House::Bedroom).unwrap();
132/// let (pub1, mut sub1) = node.pubsub_blocking(House::Bedroom).unwrap();
133///
134/// pub0.write("Hello".to_string());
135/// pub0.write("World".to_string());
136/// pub1.write("Foo".to_string());
137/// pub1.write("Bar".to_string());
138///
139/// assert_eq!(sub1.try_read().unwrap(), "Hello");
140/// assert_eq!(sub1.try_read().unwrap(), "World");
141/// assert_eq!(sub1.try_read().unwrap(), "Foo");
142/// assert_eq!(sub1.try_read().unwrap(), "Bar");
143///
144/// assert_eq!(sub0.try_read().unwrap(), "Hello");
145/// assert_eq!(sub0.try_read().unwrap(), "World");
146/// assert_eq!(sub0.try_read().unwrap(), "Foo");
147/// assert_eq!(sub0.try_read().unwrap(), "Bar");
148/// ```
pub struct BoundedNode<T> {
    /// Shared, mutex-guarded handle to the inner node. Because the
    /// handle is an [`Arc`], clones of a [`BoundedNode`] refer to the
    /// same inner node.
    pub node: Arc<Mutex<ImplementedBoundedNode<T>>>,
    /// The size of each buffer, as passed to [`BoundedNode::new`]
    pub size: usize,
}
153/// [`BoundedNode`] implementation
154///
/// This holds an [`Arc<Mutex<ImplementedBoundedNode<T>>>`], which
/// references the inner node that implements the [`CrosstalkPubSub`] trait.
157impl<T> BoundedNode<T>
158where
159 T: CrosstalkTopic,
160 ImplementedBoundedNode<T>: CrosstalkPubSub<T>,
161{
162 #[inline(always)]
163 /// Creates a new [`BoundedNode`]
164 ///
165 /// # Arguments
166 ///
167 /// * `size` - the size of each buffer
168 ///
169 /// # Panics
170 ///
171 /// Panics if `size` is 0. This is intentional because of two reasons:
172 ///
173 /// 1. No buffer can have a size of 0.
174 /// 2. A [`tokio::sync::broadcast::Sender`] cannot be created with a size of 0,
175 /// and therefore, a [`BoundedNode`] could potentially be created without
176 /// error but then calling [`BoundedNode::publisher`] or [`BoundedNode::subscriber`]
177 /// would result in a panic later on.
178 ///
179 /// # Examples
180 ///
181 /// ```
182 /// use crosstalk::AsTopic;
183 ///
184 /// #[derive(AsTopic)]
185 /// enum House {
186 /// Bedroom,
187 /// LivingRoom,
188 /// Kitchen,
189 /// Bathroom,
190 /// }
191 ///
192 /// crosstalk::init! {
193 /// House::Bedroom => String,
194 /// House::LivingRoom => String,
195 /// House::Kitchen => Vec<f32>,
196 /// House::Bathroom => u8,
197 /// }
198 ///
199 /// fn main() {
200 /// let node = crosstalk::BoundedNode::<House>::new(10);
201 /// let moved_node = node.clone();
202 /// std::thread::spawn(move || another_thread(moved_node));
203 /// assert_eq!(node.size, 10);
204 /// }
205 ///
206 /// fn another_thread(mut node: crosstalk::BoundedNode<House>) {
207 /// assert_eq!(node.size, 10);
208 /// }
209 /// ```
210 pub fn new(size: usize) -> Self {
211 if size == 0 {
212 panic!("Size must be greater than 0. Attempting to make `tokio::sync::broadcast::channels` later will result in a panic.");
213 }
214 Self {
215 node: Arc::new(Mutex::new(ImplementedBoundedNode::<T>::new(size))),
216 size,
217 }
218 }
219
220 #[inline(always)]
221 /// Creates a new publisher for the given topic `T`
222 ///
223 /// # Arguments
224 ///
225 /// * `topic` - the topic to create a publisher for
226 ///
227 /// # Returns
228 ///
229 /// A publisher for the topic `T`
230 ///
231 /// # Examples
232 ///
233 /// ```
234 /// use crosstalk::AsTopic;
235 ///
236 /// #[derive(AsTopic)]
237 /// enum House {
238 /// Bedroom,
239 /// LivingRoom,
240 /// Kitchen,
241 /// Bathroom,
242 /// }
243 ///
244 /// crosstalk::init! {
245 /// House::Bedroom => String,
246 /// House::LivingRoom => String,
247 /// House::Kitchen => Vec<f32>,
248 /// House::Bathroom => u8,
249 /// }
250 ///
251 /// #[tokio::main]
252 /// async fn main() {
253 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
254 /// assert!(node.publisher::<String>(House::Bedroom).await.is_ok());
255 /// }
256 /// ```
257 pub async fn publisher<D: CrosstalkData>(&mut self, topic: T) -> Result<Publisher<D, T>, crate::Error> {
258 self.node.lock().await.publisher(topic)
259 }
260
261 #[inline(always)]
262 /// Creates a new publisher for the given topic `T`
263 ///
264 /// # Arguments
265 ///
266 /// * `topic` - the topic to create a publisher for
267 ///
268 /// # Returns
269 ///
270 /// A publisher for the topic `T`
271 ///
272 /// # Examples
273 ///
274 /// ```
275 /// use crosstalk::AsTopic;
276 ///
277 /// #[derive(AsTopic)]
278 /// enum House {
279 /// Bedroom,
280 /// LivingRoom,
281 /// Kitchen,
282 /// Bathroom,
283 /// }
284 ///
285 /// crosstalk::init! {
286 /// House::Bedroom => String,
287 /// House::LivingRoom => String,
288 /// House::Kitchen => Vec<f32>,
289 /// House::Bathroom => u8,
290 /// }
291 ///
292 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
293 /// assert!(node.publisher_blocking::<String>(House::Bedroom).is_ok());
294 /// ```
295 pub fn publisher_blocking<D: CrosstalkData>(&mut self, topic: T) -> Result<Publisher<D, T>, crate::Error> {
296 self.node.blocking_lock().publisher(topic)
297 }
298
299 #[inline(always)]
300 /// Creates a new subscriber for the given topic `T`
301 ///
302 /// # Arguments
303 ///
304 /// * `topic` - the topic to create a subscriber for
305 ///
306 /// # Returns
307 ///
308 /// A subscriber for the topic `T`
309 ///
310 /// # Examples
311 ///
312 /// ```
313 /// use crosstalk::AsTopic;
314 ///
315 /// #[derive(AsTopic)]
316 /// enum House {
317 /// Bedroom,
318 /// LivingRoom,
319 /// Kitchen,
320 /// Bathroom,
321 /// }
322 ///
323 /// crosstalk::init! {
324 /// House::Bedroom => String,
325 /// House::LivingRoom => String,
326 /// House::Kitchen => Vec<f32>,
327 /// House::Bathroom => u8,
328 /// }
329 ///
330 /// #[tokio::main]
331 /// async fn main() {
332 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
333 /// assert!(node.subscriber::<String>(House::Bedroom).await.is_ok());
334 /// }
335 /// ```
336 pub async fn subscriber<D: CrosstalkData>(&mut self, topic: T) -> Result<Subscriber<D, T>, crate::Error> {
337 self.node.lock().await.subscriber(topic)
338 }
339
340 #[inline(always)]
341 /// Creates a new subscriber for the given topic `T`
342 ///
343 /// # Arguments
344 ///
345 /// * `topic` - the topic to create a subscriber for
346 ///
347 /// # Returns
348 ///
349 /// A subscriber for the topic `T`
350 ///
351 /// # Examples
352 ///
353 /// ```
354 /// use crosstalk::AsTopic;
355 ///
356 /// #[derive(AsTopic)]
357 /// enum House {
358 /// Bedroom,
359 /// LivingRoom,
360 /// Kitchen,
361 /// Bathroom,
362 /// }
363 ///
364 /// crosstalk::init! {
365 /// House::Bedroom => String,
366 /// House::LivingRoom => String,
367 /// House::Kitchen => Vec<f32>,
368 /// House::Bathroom => u8,
369 /// }
370 ///
371 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
372 /// assert!(node.subscriber_blocking::<String>(House::Bedroom).is_ok());
373 /// ```
374 pub fn subscriber_blocking<D: CrosstalkData>(&mut self, topic: T) -> Result<Subscriber<D, T>, crate::Error> {
375 self.node.blocking_lock().subscriber(topic)
376 }
377
378 #[inline(always)]
379 /// Creates a new publisher and subscriber for the given topic `T`
380 ///
381 /// # Arguments
382 ///
383 /// * `topic` - the topic to create a publisher and subscriber for
384 ///
385 /// # Returns
386 ///
387 /// A publisher and subscriber for the topic `T`
388 ///
389 /// # Examples
390 ///
391 /// ```
392 /// use crosstalk::AsTopic;
393 ///
394 /// #[derive(AsTopic)]
395 /// enum House {
396 /// Bedroom,
397 /// LivingRoom,
398 /// Kitchen,
399 /// Bathroom,
400 /// }
401 ///
402 /// crosstalk::init! {
403 /// House::Bedroom => String,
404 /// House::LivingRoom => String,
405 /// House::Kitchen => Vec<f32>,
406 /// House::Bathroom => u8,
407 /// }
408 ///
409 /// #[tokio::main]
410 /// async fn main() {
411 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
412 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).await.unwrap();
413 /// publisher.write("hello".to_string());
414 /// assert_eq!(subscriber.try_read().unwrap(), "hello");
415 /// }
416 /// ```
417 ///
418 pub async fn pubsub<D: CrosstalkData>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), crate::Error> {
419 self.node.lock().await.pubsub(topic)
420 }
421
422 #[inline(always)]
423 #[allow(clippy::type_complexity)]
424 /// Creates a new publisher and subscriber for the given topic `T`
425 ///
426 /// # Arguments
427 ///
428 /// * `topic` - the topic to create a publisher and subscriber for
429 ///
430 /// # Returns
431 ///
432 /// A publisher and subscriber for the topic `T`
433 ///
434 /// # Examples
435 ///
436 /// ```
437 /// use crosstalk::AsTopic;
438 ///
439 /// #[derive(AsTopic)]
440 /// enum House {
441 /// Bedroom,
442 /// LivingRoom,
443 /// Kitchen,
444 /// Bathroom,
445 /// }
446 ///
447 /// crosstalk::init! {
448 /// House::Bedroom => String,
449 /// House::LivingRoom => String,
450 /// House::Kitchen => Vec<f32>,
451 /// House::Bathroom => u8,
452 /// }
453 ///
454 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
455 /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
456 /// publisher.write("hello".to_string());
457 /// assert_eq!(subscriber.try_read().unwrap(), "hello");
458 /// ```
459 ///
460 pub fn pubsub_blocking<D: CrosstalkData>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), crate::Error> {
461 self.node.blocking_lock().pubsub(topic)
462 }
463}
464
/// The inner implementation of the node,
/// which implements the [`CrosstalkPubSub`] trait
467///
468/// This is auto-generated by the [`crosstalk_macros::init!`] macro
469///
470/// # Attributes
471///
472/// * `senders` - the senders of the node
473/// * `size` - the size of each buffer
474pub struct ImplementedBoundedNode<T> {
475 pub senders: HashMap<T, Box<dyn std::any::Any + 'static>>,
476 pub size: usize,
477}
478
479/// [`ImplementedBoundedNode`] implementation of [`Send`]
480unsafe impl<T> Send for ImplementedBoundedNode<T> {}
481/// [`ImplementedBoundedNode`] implementation of [`Sync`]
482unsafe impl<T> Sync for ImplementedBoundedNode<T> {}
483
484/// [`ImplementedBoundedNode`] implementation
485impl<T> ImplementedBoundedNode<T>
486where
487 T: CrosstalkTopic,
488{
489 /// See [`BoundedNode::new`]
490 ///
491 /// # Arguments
492 ///
493 /// * `size` - the size of each buffer
494 pub fn new(size: usize) -> Self {
495 Self {
496 senders: HashMap::new(),
497 size,
498 }
499 }
500}
501
502#[derive(Clone)]
503/// A `crosstalk` [`Publisher`]
504///
505/// # Attributes
506///
507/// * `topic` - the topic of the publisher
508/// * `buf` - the buffer which broadcasts the data
509///
510/// # Type Parameters
511///
512/// * `T` - the topic of the publisher
513/// * `D` - the data type of the publisher
514///
515/// This is not meant to be used directly, please
516/// use the [`crosstalk_macros::init!`] macro instead
517/// and produce a [`Publisher`] with [`BoundedNode::publisher`]
518/// or [`BoundedNode::pubsub`]
pub struct Publisher<D, T> {
    /// The topic of the publisher
    pub topic: T,
    /// The broadcast sender that [`Publisher::write`] sends samples through
    buf: TokioSender<D>,
}
523/// [`Publisher`] implementation
524impl<D, T> Publisher<D, T> {
525 #[inline(always)]
526 /// See [`BoundedNode::publisher`]
527 pub fn new(topic: T, buf: TokioSender<D>) -> Self {
528 Self { topic, buf }
529 }
530
531 #[inline(always)]
532 /// Publishes data to a topic, broadcasting it to all subscribers
533 ///
534 /// # Arguments
535 ///
536 /// * `sample` - the sample to publish
537 ///
538 /// # Examples
539 ///
540 /// ```
541 /// use crosstalk::AsTopic;
542 ///
543 /// #[derive(AsTopic)]
544 /// enum House {
545 /// Bedroom,
546 /// LivingRoom,
547 /// Kitchen,
548 /// Bathroom,
549 /// }
550 ///
551 /// crosstalk::init! {
552 /// House::Bedroom => String,
553 /// House::LivingRoom => String,
554 /// House::Kitchen => Vec<f32>,
555 /// House::Bathroom => u8,
556 /// }
557 ///
558 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
559 /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
560 /// publisher.write("hello".to_string());
561 /// std::thread::spawn(move || {
562 /// assert_eq!(subscriber.try_read().unwrap(), "hello");
563 /// });
564 /// ```
565 pub fn write(&self, sample: D) {
566 let _ = self.buf.send(sample);
567 }
568}
569
570/// A `crosstalk` [`Subscriber`]
571///
572/// # Attributes
573///
574/// * `topic` - the topic of the subscriber
575/// * `rcvr` - the receiver of the subscriber
576/// * `sndr` - the sender for the topic. This is used to spawn multiple receivers upon [`Subscriber::clone`]
577///
578/// # Type Parameters
579///
580/// * `T` - the topic of the subscriber
581/// * `D` - the data type of the subscriber
582///
583/// This is not meant to be used directly, please
584/// use the [`crosstalk_macros::init!`] macro instead
585/// and produce a [`Subscriber`] with [`BoundedNode::subscriber`]
586/// or [`BoundedNode::pubsub`]
pub struct Subscriber<D, T> {
    /// The topic of the subscriber
    pub topic: T,
    /// The receiver that all read methods pull samples from
    rcvr: Receiver<D>,
    /// Shared handle to the topic's sender, kept so that
    /// [`Subscriber::clone`] can subscribe a new receiver
    sndr: Arc<TokioSender<D>>,
}
592/// [`Subscriber`] implementation
593impl<D: Clone, T: Clone> Subscriber<D, T> {
594 #[inline(always)]
595 /// See [`BoundedNode::subscriber`]
596 pub fn new(
597 topic: T,
598 rcvr: Option<TokioReceiver<D>>,
599 sndr: Arc<TokioSender<D>>,
600 ) -> Self {
601 Self {
602 topic,
603 rcvr: Receiver::new(rcvr.unwrap_or(sndr.subscribe())),
604 sndr: sndr.clone(),
605 }
606 }
607
608 #[inline(always)]
609 /// Asynchronous blocking read from the [`TokioReceiver`]
610 ///
611 /// The sequential equivalent to this function is [`Subscriber::read_blocking`]
612 /// which can be used outside of an asynchronous context
613 ///
614 /// # Examples
615 ///
616 /// ```
617 /// use crosstalk::AsTopic;
618 ///
619 /// #[derive(AsTopic)]
620 /// enum House {
621 /// Bedroom,
622 /// LivingRoom,
623 /// Kitchen,
624 /// Bathroom,
625 /// }
626 ///
627 /// crosstalk::init! {
628 /// House::Bedroom => String,
629 /// House::LivingRoom => String,
630 /// House::Kitchen => Vec<f32>,
631 /// House::Bathroom => u8,
632 /// }
633 ///
634 /// #[tokio::main]
635 /// async fn main() {
636 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
637 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).await.unwrap();
638 /// publisher.write("hello".to_string());
639 /// assert_eq!(subscriber.read().await, Some("hello".to_string()));
640 /// }
641 /// ```
642 pub async fn read(&mut self) -> Option<D> {
643 self.rcvr.read().await
644 }
645
646 #[inline(always)]
647 /// Non-blocking read from [`TokioReceiver`]
648 /// Upon immediate failure, [`None`] will be returned
649 ///
650 /// Difference between this function and [`Subscriber::try_read_raw`]
651 /// is that this function continuously loops upon [`tokio`] error of
652 /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`], looping
653 /// until a valid message is received OR the buffer is determined to be
654 /// empty
655 ///
656 /// [`Subscriber::try_read_raw`] will return [`None`] upon
657 /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`], which
658 /// can cause some unexpected behavior
659 ///
660 /// # Examples
661 ///
662 /// ```
663 /// use crosstalk::AsTopic;
664 ///
665 /// #[derive(AsTopic)]
666 /// enum House {
667 /// Bedroom,
668 /// LivingRoom,
669 /// Kitchen,
670 /// Bathroom,
671 /// }
672 ///
673 /// crosstalk::init! {
674 /// House::Bedroom => String,
675 /// House::LivingRoom => String,
676 /// House::Kitchen => Vec<f32>,
677 /// House::Bathroom => u8,
678 /// }
679 ///
680 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
681 /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
682 /// publisher.write("hello".to_string());
683 /// assert_eq!(subscriber.try_read(), Some(String::from("hello")));
684 /// assert_eq!(subscriber.try_read(), None);
685 /// ```
686 pub fn try_read(&mut self) -> Option<D> {
687 self.rcvr.try_read()
688 }
689
690 #[inline(always)]
691 /// Non-blocking read from [`TokioReceiver`], returning
692 /// [`None`] if there are no messages available or if
693 /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`] occurs.
694 ///
695 /// This function can cause some unexpected behavior. It is recommended
696 /// to use [`Subscriber::try_read`] instead.
697 ///
698 /// # Examples
699 ///
700 /// ```
701 /// use crosstalk::AsTopic;
702 ///
703 /// #[derive(AsTopic)]
704 /// enum House {
705 /// Bedroom,
706 /// LivingRoom,
707 /// Kitchen,
708 /// Bathroom,
709 /// }
710 ///
711 /// crosstalk::init! {
712 /// House::Bedroom => String,
713 /// House::LivingRoom => String,
714 /// House::Kitchen => Vec<f32>,
715 /// House::Bathroom => u8,
716 /// }
717 ///
718 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
719 /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
720 /// publisher.write("hello".to_string());
721 /// assert_eq!(subscriber.try_read_raw(), Some(String::from("hello")));
722 /// assert_eq!(subscriber.try_read_raw(), None);
723 /// ```
724 pub fn try_read_raw(&mut self) -> Option<D> {
725 self.rcvr.try_read_raw()
726 }
727
728 #[inline(always)]
729 /// Sequential blocking read from [`TokioReceiver`]
730 ///
731 /// The asynchronous equivalent to this function is [`Subscriber::read`],
732 /// which must be used in an asynchronous context with `.await`
733 ///
734 /// # Examples
735 ///
736 /// ```
737 /// use crosstalk::AsTopic;
738 ///
739 /// #[derive(AsTopic)]
740 /// enum House {
741 /// Bedroom,
742 /// LivingRoom,
743 /// Kitchen,
744 /// Bathroom,
745 /// }
746 ///
747 /// crosstalk::init! {
748 /// House::Bedroom => String,
749 /// House::LivingRoom => String,
750 /// House::Kitchen => Vec<f32>,
751 /// House::Bathroom => u8,
752 /// }
753 ///
754 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
755 /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
756 /// publisher.write("hello".to_string());
757 /// assert_eq!(subscriber.read_blocking(), Some("hello".to_string()));
758 /// ```
759 pub fn read_blocking(&mut self) -> Option<D> {
760 self.rcvr.read_blocking()
761 }
762
763 #[inline(always)]
764 /// Asynchronous non-blocking read from [`TokioReceiver`]
765 /// with a given timeout. After the timeout if there are no messages,
766 /// returns [`None`].
767 ///
768 /// # Examples
769 ///
770 /// ```
771 /// use crosstalk::AsTopic;
772 ///
773 /// #[derive(AsTopic)]
774 /// enum House {
775 /// Bedroom,
776 /// LivingRoom,
777 /// Kitchen,
778 /// Bathroom,
779 /// }
780 ///
781 /// crosstalk::init! {
782 /// House::Bedroom => String,
783 /// House::LivingRoom => String,
784 /// House::Kitchen => Vec<f32>,
785 /// House::Bathroom => u8,
786 /// }
787 ///
788 /// #[tokio::main]
789 /// async fn main() {
790 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
791 /// let (publisher, mut subscriber) = node.pubsub(House::Bedroom).await.unwrap();
792 /// publisher.write("hello".to_string());
793 /// assert_eq!(subscriber.read_timeout(std::time::Duration::from_millis(100)).await, Some("hello".to_string()));
794 /// }
795 /// ```
796 pub async fn read_timeout(&mut self, timeout: std::time::Duration) -> Option<D> {
797 self.rcvr.read_timeout(timeout).await
798 }
799}
800/// [`Subscriber`] implementation of [`Clone`]
801impl<D: Clone, T: Clone> Clone for Subscriber<D, T> {
802 #[inline(always)]
803 /// Clones a [`Subscriber`]
804 ///
805 /// # Examples
806 ///
807 /// ```
808 /// use crosstalk::AsTopic;
809 ///
810 /// #[derive(AsTopic)]
811 /// enum House {
812 /// Bedroom,
813 /// LivingRoom,
814 /// Kitchen,
815 /// Bathroom,
816 /// }
817 ///
818 /// crosstalk::init! {
819 /// House::Bedroom => String,
820 /// House::LivingRoom => String,
821 /// House::Kitchen => Vec<f32>,
822 /// House::Bathroom => u8,
823 /// }
824 ///
825 /// let mut node = crosstalk::BoundedNode::<House>::new(10);
826 /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
827 /// let mut subscriber_2 = subscriber.clone();
828 /// publisher.write("hello".to_string());
829 /// assert_eq!(subscriber.try_read().unwrap(), "hello");
830 /// assert_eq!(subscriber_2.try_read().unwrap(), "hello");
831 /// ```
832 fn clone(&self) -> Self {
833 Self {
834 topic: self.topic.clone(),
835 rcvr: Receiver::new(self.sndr.subscribe()),
836 sndr: self.sndr.clone(),
837 }
838 }
839}
840
841/// Receiver
842///
843/// Define a receiver for subscribing messages
844///
845/// Reads from [`TokioReceiver`]
struct Receiver<D> {
    /// The wrapped broadcast receiver that every read method pulls from
    buf: TokioReceiver<D>,
}
849/// [`Receiver`] implementation
850impl<D: Clone> Receiver<D>{
851 #[inline(always)]
852 /// Constructs a new [`Receiver`]
853 pub fn new(
854 buf: TokioReceiver<D>,
855 ) -> Self {
856 Self { buf }
857 }
858
859 /// Reads from [`TokioReceiver`]
860 ///
861 /// This struct/function is not meant to be used directly,
862 /// rather through the [`Subscriber`] struct with [`Subscriber::read`]
863 async fn read(&mut self) -> Option<D> {
864 loop {
865 match self.buf.recv().await {
866 Ok(res) => return Some(res),
867 Err(e) => match e {
868 tokio::sync::broadcast::error::RecvError::Lagged(_) => { continue; }
869
870 #[cfg(not(any(feature = "log", feature = "tracing")))]
871 _ => return None,
872
873 #[cfg(any(feature = "log", feature = "tracing"))]
874 _ => {
875 #[cfg(feature = "log")]
876 log::error!("{}", e);
877 #[cfg(feature = "tracing")]
878 tracing::error!("{}", e);
879 return None
880 }
881 }
882 }
883 }
884 }
885
886 /// Reads from [`TokioReceiver`]
887 ///
888 /// This struct/function is not meant to be used directly,
889 /// rather through the [`Subscriber`] struct with [`Subscriber::try_read`]
890 fn try_read(&mut self) -> Option<D> {
891 loop {
892 match self.buf.try_recv() {
893 Ok(d) => return Some(d),
894 Err(e) => {
895 match e {
896 tokio::sync::broadcast::error::TryRecvError::Lagged(_) => { continue; },
897
898 #[cfg(not(any(feature = "log", feature = "tracing")))]
899 _ => return None,
900
901 #[cfg(any(feature = "log", feature = "tracing"))]
902 _ => {
903 #[cfg(feature = "log")]
904 log::error!("{}", e);
905 #[cfg(feature = "tracing")]
906 tracing::error!("{}", e);
907 return None
908 },
909 }
910 },
911 }
912 }
913 }
914
915 /// Reads from [`TokioReceiver`]
916 ///
917 /// This struct/function is not meant to be used directly,
918 /// rather through the [`Subscriber`] struct with [`Subscriber::try_read_raw`]
919 fn try_read_raw(&mut self) -> Option<D> {
920 match self.buf.try_recv() {
921 Ok(d) => Some(d),
922
923 #[cfg(not(any(feature = "log", feature = "tracing")))]
924 Err(_) => None,
925
926 #[cfg(any(feature = "log", feature = "tracing"))]
927 Err(e) => {
928 #[cfg(feature = "log")]
929 log::error!("{}", e);
930 #[cfg(feature = "tracing")]
931 tracing::error!("{}", e);
932 None
933 },
934 }
935 }
936
937 /// Reads from [`TokioReceiver`]
938 ///
939 /// This struct/function is not meant to be used directly,
940 /// rather through the [`Subscriber`] struct with [`Subscriber::read_blocking`]
941 fn read_blocking(&mut self) -> Option<D> {
942 loop {
943 match self.buf.blocking_recv() {
944 Ok(res) => return Some(res),
945 Err(e) => match e {
946 tokio::sync::broadcast::error::RecvError::Lagged(_) => { continue; }
947
948 #[cfg(not(any(feature = "log", feature = "tracing")))]
949 _ => return None,
950
951 #[cfg(any(feature = "log", feature = "tracing"))]
952 _ => {
953 #[cfg(feature = "log")]
954 log::error!("{}", e);
955 #[cfg(feature = "tracing")]
956 tracing::error!("{}", e);
957 return None
958 },
959 }
960 }
961 }
962 }
963
964 /// Reads from [`TokioReceiver`]
965 ///
966 /// This struct/function is not meant to be used directly,
967 /// rather through the [`Subscriber`] struct with [`Subscriber::read_timeout`]
968 async fn read_timeout(&mut self, timeout: std::time::Duration) -> Option<D> {
969 match tokio::runtime::Handle::try_current() {
970 Ok(_) => {
971 match tokio::time::timeout(timeout, self.buf.recv()).await {
972 Ok(res) => {
973 match res {
974 Ok(res) => Some(res),
975
976 #[cfg(not(any(feature = "log", feature = "tracing")))]
977 Err(_) => None,
978
979 #[cfg(any(feature = "log", feature = "tracing"))]
980 Err(e) => {
981 #[cfg(feature = "log")]
982 log::error!("{}", e);
983 #[cfg(feature = "tracing")]
984 tracing::error!("{}", e);
985 None
986 },
987 }
988 },
989
990 #[cfg(not(any(feature = "log", feature = "tracing")))]
991 Err(_) => None,
992
993 #[cfg(any(feature = "log", feature = "tracing"))]
994 Err(e) => {
995 #[cfg(feature = "log")]
996 log::error!("{}", e);
997 #[cfg(feature = "tracing")]
998 tracing::error!("{}", e);
999 None
1000 },
1001 }
1002 },
1003
1004 #[cfg(not(any(feature = "log", feature = "tracing")))]
1005 Err(_) => None,
1006
1007 #[cfg(any(feature = "log", feature = "tracing"))]
1008 Err(e) => {
1009 #[cfg(feature = "log")]
1010 log::error!("{}", e);
1011 #[cfg(feature = "tracing")]
1012 tracing::error!("{}", e);
1013 None
1014 },
1015 }
1016 }
1017}
1018
1019// --------------------------------------------------
1020// for testing
1021// --------------------------------------------------
1022#[allow(unused_imports)]
1023use crosstalk_macros::init_test;
1024#[allow(unused_imports)]
1025use crosstalk_macros::AsTopicTest;
1026
1027#[cfg(test)]
1028mod tests {
1029 use super::*;
1030
1031 #[derive(AsTopicTest)]
1032 enum TestTopic {
1033 A,
1034 B,
1035 C,
1036 }
1037 super::init_test! {
1038 TestTopic::A => String,
1039 TestTopic::B => bool,
1040 TestTopic::C => i32,
1041 }
1042
1043 #[derive(AsTopicTest)]
1044 enum AnotherTestTopic {
1045 Foo,
1046 Bar,
1047 }
1048 super::init_test! {
1049 AnotherTestTopic::Foo => Vec<String>,
1050 AnotherTestTopic::Bar => Vec<bool>,
1051 }
1052
1053 #[test]
1054 fn test_single_pubsub_blocking() {
1055 let mut node = BoundedNode::<TestTopic>::new(10);
1056 let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();
1057 publisher.write("test".to_string());
1058 assert_eq!(subscriber.try_read().unwrap(), "test");
1059 }
1060
1061 #[test]
1062 fn test_multiple_subscribers_blocking() {
1063 let mut node = BoundedNode::<TestTopic>::new(10);
1064 let (publisher, mut sub1) = node.pubsub_blocking(TestTopic::A).unwrap();
1065 let mut sub2 = node.subscriber_blocking::<String>(TestTopic::A).unwrap();
1066
1067 publisher.write("hello".to_string());
1068 assert_eq!(sub1.try_read().unwrap(), "hello");
1069 assert_eq!(sub2.try_read().unwrap(), "hello");
1070 }
1071
1072 #[test]
1073 fn test_cross_topic_isolation() {
1074 let mut node = BoundedNode::<TestTopic>::new(10);
1075 let (pub_a, mut sub_a) = node.pubsub_blocking(TestTopic::A).unwrap();
1076 let (pub_b, mut sub_b) = node.pubsub_blocking(TestTopic::B).unwrap();
1077
1078 pub_a.write("string".to_string());
1079 pub_b.write(true);
1080
1081 assert_eq!(sub_a.try_read().unwrap(), "string");
1082 assert!(sub_b.try_read().unwrap());
1083 assert!(sub_a.try_read().is_none());
1084 assert!(sub_b.try_read().is_none());
1085 }
1086
1087 #[test]
1088 fn test_multiple_threads_blocking() {
1089 let mut node = BoundedNode::<TestTopic>::new(10);
1090 let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();
1091
1092 let handle = std::thread::spawn(move || {
1093 publisher.write("threaded".to_string());
1094 });
1095
1096 handle.join().unwrap();
1097 assert_eq!(subscriber.try_read().unwrap(), "threaded");
1098 }
1099
1100 #[tokio::test]
1101 async fn test_async_pubsub_single_runtime() {
1102 let mut node = BoundedNode::<TestTopic>::new(10);
1103 let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();
1104 publisher.write("async".to_string());
1105 assert_eq!(subscriber.read().await.unwrap(), "async");
1106 }
1107
#[test]
/// Writes the integers `0..100` to `TestTopic::C` on a node created with
/// capacity 100, then checks they are read back in the same order and that
/// nothing is left afterwards.
fn test_high_volume_blocking() {
    let mut node = BoundedNode::<TestTopic>::new(100);
    let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::C).unwrap();

    // Fill the topic before reading anything back.
    (0..100).for_each(|value| {
        publisher.write(value);
    });

    // Messages must come out in the order they were written.
    for expected in 0..100 {
        let got = subscriber.try_read().unwrap();
        assert_eq!(got, expected);
    }

    let leftover = subscriber.try_read();
    assert!(leftover.is_none());
}
1122
#[test]
/// Clones a subscriber before anything is published and checks that both the
/// original and the clone receive the same single message.
fn test_cloned_subscribers() {
    let mut node = BoundedNode::<TestTopic>::new(10);
    let (publisher, mut original) = node.pubsub_blocking(TestTopic::A).unwrap();
    let mut duplicate = original.clone();

    publisher.write(String::from("clone"));

    let from_original = original.try_read().unwrap();
    assert_eq!(from_original, "clone");
    let from_duplicate = duplicate.try_read().unwrap();
    assert_eq!(from_duplicate, "clone");
}
1133
#[test]
/// Writes three messages to a node created with capacity 2 before reading.
/// The test expects only the two most recent messages (`msg2`, `msg3`) to be
/// readable, followed by `None`.
fn test_buffer_overflow_handling() {
    let mut node = BoundedNode::<TestTopic>::new(2);
    let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();

    // One more write than the configured capacity.
    for text in ["msg1", "msg2", "msg3"].iter() {
        publisher.write(text.to_string());
    }

    // The oldest message is expected to be gone; the rest arrive in order.
    for expected in ["msg2", "msg3"].iter() {
        let got = subscriber.try_read().unwrap();
        assert_eq!(got, *expected);
    }

    let leftover = subscriber.try_read();
    assert!(leftover.is_none());
}
1147
#[test]
/// Requests an `i32` publisher and an `i32` subscriber on `TestTopic::A`
/// (the other tests in this module use `String` on that topic) and expects
/// `Error::PublisherMismatch` and `Error::SubscriberMismatch` respectively.
fn test_type_mismatch_errors() {
    let mut node = BoundedNode::<TestTopic>::new(10);

    // Wrong payload type on the publishing side.
    let wrong_publisher = node.publisher_blocking::<i32>(TestTopic::A);
    let publisher_rejected = matches!(wrong_publisher, Err(Error::PublisherMismatch(..)));
    assert!(publisher_rejected);

    // Wrong payload type on the subscribing side.
    let wrong_subscriber = node.subscriber_blocking::<i32>(TestTopic::A);
    let subscriber_rejected = matches!(wrong_subscriber, Err(Error::SubscriberMismatch(..)));
    assert!(subscriber_rejected);
}
1158
1159 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1160 async fn test_multiple_async_runtimes() {
1161 let mut node = BoundedNode::<TestTopic>::new(10);
1162 let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();
1163
1164 let handle = tokio::spawn(async move {
1165 publisher.write("async".to_string());
1166 });
1167
1168 handle.await.unwrap();
1169 assert_eq!(subscriber.read().await.unwrap(), "async");
1170 }
1171
#[test]
/// Creates the publisher/subscriber pair with the blocking API, writes from
/// inside a manually created tokio runtime via `block_on`, and reads the
/// message back with the non-async `try_read`.
fn test_mixed_async_blocking() {
    let mut node = BoundedNode::<TestTopic>::new(10);
    let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();

    // The runtime is kept alive until the end of the test.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let write_in_async_context = async {
        publisher.write(String::from("mixed"));
    };
    runtime.block_on(write_in_async_context);

    let received = subscriber.try_read().unwrap();
    assert_eq!(received, "mixed");
}
1184
#[test]
/// Sends vector payloads through `AnotherTestTopic`: a `Vec<String>` on `Foo`
/// and a `Vec<bool>` on `Bar`, and checks each arrives unchanged.
fn test_complex_data_types() {
    let mut node = BoundedNode::<AnotherTestTopic>::new(10);
    let (strings_pub, mut strings_sub) = node.pubsub_blocking(AnotherTestTopic::Foo).unwrap();
    let (flags_pub, mut flags_sub) = node.pubsub_blocking(AnotherTestTopic::Bar).unwrap();

    let strings = vec![String::from("a"), String::from("b")];
    strings_pub.write(strings);
    let flags = vec![true, false];
    flags_pub.write(flags);

    let got_strings = strings_sub.try_read().unwrap();
    assert_eq!(got_strings, vec!["a", "b"]);
    let got_flags = flags_sub.try_read().unwrap();
    assert_eq!(got_flags, vec![true, false]);
}
1197
#[test]
/// Two publishers on `TestTopic::C` write from separate threads (`0..50` and
/// `50..100`) into a node created with capacity 100. After both threads have
/// finished, the subscriber must have received every value exactly once.
fn test_concurrent_publishers() {
    let mut node = BoundedNode::<TestTopic>::new(100);
    let (pub1, mut sub) = node.pubsub_blocking(TestTopic::C).unwrap();
    let pub2 = node.publisher_blocking::<i32>(TestTopic::C).unwrap();

    let handle1 = std::thread::spawn(move || {
        for i in 0..50 {
            pub1.write(i);
        }
    });

    let handle2 = std::thread::spawn(move || {
        for i in 50..100 {
            pub2.write(i);
        }
    });

    handle1.join().unwrap();
    handle2.join().unwrap();

    // Drain everything that is currently readable.
    let mut received: Vec<i32> = Vec::new();
    while let Some(msg) = sub.try_read() {
        received.push(msg);
    }
    assert_eq!(received.len(), 100);

    // A length check alone would accept duplicated or corrupted payloads.
    // The interleaving between the two threads is unspecified, so compare
    // the sorted values against the full expected range.
    received.sort_unstable();
    assert_eq!(received, (0..100).collect::<Vec<i32>>());
}
1225
1226 #[tokio::test]
1227 async fn test_async_subscriber_cloning() {
1228 let mut node = BoundedNode::<TestTopic>::new(10);
1229 let (publisher, mut sub1) = node.pubsub(TestTopic::A).await.unwrap();
1230 let mut sub2 = sub1.clone();
1231
1232 publisher.write("async_clone".to_string());
1233 assert_eq!(sub1.read().await.unwrap(), "async_clone");
1234 assert_eq!(sub2.read().await.unwrap(), "async_clone");
1235 }
1236
#[test]
/// Drops the publisher without writing anything and expects `try_read` on the
/// subscriber to return `None`.
fn test_dropped_publisher_behavior() {
    let mut node = BoundedNode::<TestTopic>::new(10);
    let (publisher, mut subscriber) = node.pubsub_blocking::<String>(TestTopic::A).unwrap();

    {
        // Moving the publisher into this scope drops it at the closing brace.
        let _dropped = publisher;
    }

    let pending = subscriber.try_read();
    assert!(pending.is_none());
}
1244
1245 #[tokio::test]
1246 async fn test_multiple_async_publishers() {
1247 const LOOP_COUNT: usize = 10;
1248 const BUFFER_SIZE: usize = 10;
1249 let mut node = BoundedNode::<TestTopic>::new(BUFFER_SIZE);
1250 let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();
1251 let publisher_1 = publisher.clone();
1252 let publisher_2 = publisher.clone();
1253
1254 let task1 = tokio::spawn({
1255 async move {
1256 for _ in 0..LOOP_COUNT {
1257 publisher_1.write("task1".to_string());
1258 }
1259 }
1260 });
1261
1262 let task2 = tokio::spawn({
1263 async move {
1264 for _ in 0..LOOP_COUNT {
1265 publisher_2.write("task2".to_string());
1266 }
1267 }
1268 });
1269
1270 let _ = tokio::join!(task1, task2);
1271
1272 let mut task1_count = 0;
1273 let mut task2_count = 0;
1274 for _ in 0..BUFFER_SIZE {
1275 let msg = subscriber.read().await.unwrap();
1276 if msg == "task1" { task1_count += 1; }
1277 if msg == "task2" { task2_count += 1; }
1278 }
1279 assert_eq!(task1_count + task2_count, BUFFER_SIZE);
1280 }
1281
#[test]
/// Starts a writer thread that sleeps for 500 ms before publishing, and
/// checks that `read_blocking` on the test thread returns that delayed
/// message.
fn test_blocking_read_with_delay() {
    let mut node = BoundedNode::<TestTopic>::new(10);
    let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();

    let writer = std::thread::spawn(move || {
        std::thread::sleep(std::time::Duration::from_millis(500));
        publisher.write("delayed".to_string());
    });

    let received = subscriber.read_blocking();

    // Join the writer (as the other threaded tests in this module do) so a
    // panic inside it is reported rather than lost with a detached thread.
    writer.join().unwrap();

    assert_eq!(received.unwrap(), "delayed");
}
1294
1295 #[tokio::test]
1296 async fn test_read_timeout_behavior() {
1297 let mut node = BoundedNode::<TestTopic>::new(10);
1298 let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();
1299
1300 let timeout = std::time::Duration::from_millis(100);
1301 assert!(subscriber.read_timeout(timeout).await.is_none());
1302
1303 publisher.write("timeout_test".to_string());
1304 assert_eq!(subscriber.read_timeout(timeout).await.unwrap(), "timeout_test");
1305 }
1306
#[test]
/// Publishes on `AnotherTestTopic::Foo` and `AnotherTestTopic::Bar` from two
/// separate threads and, once both threads have finished, checks that each
/// subscriber received its own topic's payload.
fn test_multiple_topics_concurrently() {
    let mut node = BoundedNode::<AnotherTestTopic>::new(10);
    let (strings_pub, mut strings_sub) = node.pubsub_blocking(AnotherTestTopic::Foo).unwrap();
    let (flags_pub, mut flags_sub) = node.pubsub_blocking(AnotherTestTopic::Bar).unwrap();

    let strings_writer = std::thread::spawn(move || {
        strings_pub.write(vec![String::from("thread")]);
    });
    let flags_writer = std::thread::spawn(move || {
        flags_pub.write(vec![true]);
    });

    // Wait for the writers in spawn order; a panicked writer fails the test.
    for writer in vec![strings_writer, flags_writer] {
        writer.join().unwrap();
    }

    let got_strings = strings_sub.try_read().unwrap();
    assert_eq!(got_strings, vec!["thread"]);
    let got_flags = flags_sub.try_read().unwrap();
    assert_eq!(got_flags, vec![true]);
}
1327
#[test]
#[should_panic]
/// Constructing a `BoundedNode` with capacity 0 is expected to panic.
fn test_zero_capacity_node() {
    // The value is discarded immediately; only the construction matters.
    drop(BoundedNode::<TestTopic>::new(0));
}
1333
1334 #[tokio::test]
1335 async fn test_async_unbounded_messaging() {
1336 let mut node = BoundedNode::<TestTopic>::new(1000);
1337 let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();
1338
1339 let messages = vec!["msg1", "msg2", "msg3", "msg4", "msg5"];
1340 for msg in &messages {
1341 publisher.write(msg.to_string());
1342 }
1343
1344 for expected in messages {
1345 assert_eq!(subscriber.read().await.unwrap(), expected);
1346 }
1347 }
1348
#[test]
/// Writes `msg0`..`msg4` to a node created with capacity 2 before reading.
/// The test expects draining the subscriber with `try_read` to yield only the
/// last two messages, `msg3` and `msg4`.
fn test_error_handling_lagged_messages() {
    let mut node = BoundedNode::<TestTopic>::new(2);
    let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();

    (0..5).map(|index| format!("msg{}", index)).for_each(|text| {
        publisher.write(text);
    });

    // Keep reading until `try_read` reports that nothing is available.
    let received: Vec<String> = std::iter::from_fn(|| subscriber.try_read()).collect();
    assert_eq!(received, vec!["msg3", "msg4"]);
}
1364}