crosstalk/
lib.rs

1#![doc(html_root_url = "https://docs.rs/crosstalk/1.0")]
2#![doc = include_str!("../README.md")]
3// --------------------------------------------------
4// external
5// --------------------------------------------------
6use std::sync::Arc;
7use tokio::sync::Mutex;
8use std::collections::HashMap;
9use tokio::sync::broadcast::{
10    Sender as TokioSender,
11    Receiver as TokioReceiver,
12};
13
14// --------------------------------------------------
15// local
16// --------------------------------------------------
17pub use crosstalk_macros::init;
18pub use crosstalk_macros::AsTopic;
19
20// --------------------------------------------------
21// re-exports
22// --------------------------------------------------
23pub mod __macro_exports {
24    pub use tokio::runtime;
25    pub use tokio::sync::broadcast;
26
27    #[inline(always)]
28    /// Downcasts a [`Box`] into a type `T`
29    /// 
30    /// # Arguments
31    /// 
32    /// * `buf` - the [`Box`] to downcast
33    /// 
34    /// # Examples
35    /// 
36    /// ```
37    /// use crosstalk::__macro_exports::downcast;
38    /// 
39    /// let mut buf = Box::new(5) as Box<dyn std::any::Any + 'static>;
40    /// assert_eq!(downcast::<i32>(buf, crosstalk::Error::PublisherMismatch("foo", "bar")).unwrap(), 5);
41    /// ```
42    pub fn downcast<T>(buf: Box<dyn std::any::Any + 'static>, on_error: crate::Error) -> Result<T, crate::Error>
43    where
44        T: 'static,
45    {
46        match buf.downcast::<T>() {
47            Ok(t) => Ok(*t),
48            Err(_) => Err(on_error),
49        }
50    }
51}
52
/// Marker trait that bounds an enum so it can act as a [`CrosstalkTopic`]
///
/// Implementors must be copyable, comparable for equality and hashable.
pub trait CrosstalkTopic: Copy + Clone + Eq + PartialEq + std::hash::Hash {}
55
/// Marker trait that bounds a datatype as [`CrosstalkData`]
pub trait CrosstalkData: Clone + Send + 'static {}
/// Blanket [`CrosstalkData`] implementation for every `Clone + Send + 'static` type
impl<T> CrosstalkData for T where T: Clone + Send + 'static {}
60
/// [`crosstalk`](crate) errors
///
/// Both variants carry two type names, in the order they are printed by the
/// [`std::fmt::Display`] implementation: the type used in the cast, then the
/// type that was expected.
#[derive(Copy, Clone, Debug)]
pub enum Error {
    /// Publisher type mismatch: `(cast, expected)`
    PublisherMismatch(&'static str, &'static str),
    /// Subscriber type mismatch: `(cast, expected)`
    SubscriberMismatch(&'static str, &'static str),
}
/// [`crosstalk::Error`](crate::Error) implementation of [`std::error::Error`]
impl std::error::Error for Error {}
/// [`crosstalk::Error`](crate::Error) implementation of [`std::fmt::Display`]
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `Error` is `Copy`, so the variants can be matched by value.
        match *self {
            Self::PublisherMismatch(cast, expected) => {
                write!(f, "Publisher type mismatch: {} (cast) != {} (expected)", cast, expected)
            }
            Self::SubscriberMismatch(cast, expected) => {
                write!(f, "Subscriber type mismatch: {} (cast) != {} (expected)", cast, expected)
            }
        }
    }
}
78
79/// A trait to define a [`CrosstalkPubSub`]
80/// 
81/// This is used to implement the [`CrosstalkPubSub`] trait
82/// using the [`crosstalk_macros::init!`] macro
83/// for the [`ImplementedBoundedNode`] struct
84/// 
85/// This is not meant to be used directly, and is automatically
86/// implemented when calling [`crosstalk_macros::init!`]
87pub trait CrosstalkPubSub<T> {
88    fn publisher<D: CrosstalkData>(&mut self, topic: T) -> Result<Publisher<D, T>, crate::Error>;
89    
90    fn subscriber<D: CrosstalkData>(&mut self, topic: T) -> Result<Subscriber<D, T>, crate::Error>;
91    
92    #[allow(clippy::type_complexity)]
93    fn pubsub<D: CrosstalkData>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), crate::Error>;
94}
95
96#[derive(Clone)]
97/// A [`BoundedNode`] is a node to spawn publishers and
98/// subscribers on, where the size of each buffer is
99/// fixed.
100/// 
101/// # Attributes
102/// 
103/// * `node` - the node to spawn publishers and subscribers on
104/// * `size` - the size of each buffer
105/// 
106/// # Type Parameters
107/// 
108/// * `T` - the topic enum name
109/// 
110/// # Examples
111/// 
112/// ```
113/// use crosstalk::AsTopic;
114/// 
115/// #[derive(AsTopic)]
116/// enum House {
117///     Bedroom,
118///     LivingRoom,
119///     Kitchen,
120///     Bathroom,
121/// }
122/// 
123/// crosstalk::init! {
124///     House::Bedroom => String,
125///     House::LivingRoom => String,
126///     House::Kitchen => Vec<f32>,
127///     House::Bathroom => u8,
128/// }
129/// 
130/// let mut node = crosstalk::BoundedNode::<House>::new(10);
131/// let (pub0, mut sub0) = node.pubsub_blocking(House::Bedroom).unwrap();
132/// let (pub1, mut sub1) = node.pubsub_blocking(House::Bedroom).unwrap();
133/// 
134/// pub0.write("Hello".to_string());
135/// pub0.write("World".to_string());
136/// pub1.write("Foo".to_string());
137/// pub1.write("Bar".to_string());
138/// 
139/// assert_eq!(sub1.try_read().unwrap(), "Hello");
140/// assert_eq!(sub1.try_read().unwrap(), "World");
141/// assert_eq!(sub1.try_read().unwrap(), "Foo");
142/// assert_eq!(sub1.try_read().unwrap(), "Bar");
143/// 
144/// assert_eq!(sub0.try_read().unwrap(), "Hello");
145/// assert_eq!(sub0.try_read().unwrap(), "World");
146/// assert_eq!(sub0.try_read().unwrap(), "Foo");
147/// assert_eq!(sub0.try_read().unwrap(), "Bar");
148/// ```
149pub struct BoundedNode<T> {
150    pub node: Arc<Mutex<ImplementedBoundedNode<T>>>,
151    pub size: usize,
152}
/// [`BoundedNode`] implementation
///
/// This holds an `Arc<Mutex<ImplementedBoundedNode<T>>>`, which references
/// the inner [`ImplementedBoundedNode`] that implements the [`CrosstalkPubSub`] trait.
157impl<T> BoundedNode<T> 
158where
159    T: CrosstalkTopic,
160    ImplementedBoundedNode<T>: CrosstalkPubSub<T>,
161{
162    #[inline(always)]
163    /// Creates a new [`BoundedNode`]
164    /// 
165    /// # Arguments
166    /// 
167    /// * `size` - the size of each buffer
168    /// 
169    /// # Panics
170    /// 
171    /// Panics if `size` is 0. This is intentional because of two reasons:
172    /// 
173    /// 1. No buffer can have a size of 0.
174    /// 2. A [`tokio::sync::broadcast::Sender`] cannot be created with a size of 0,
175    ///    and therefore, a [`BoundedNode`] could potentially be created without
176    ///    error but then calling [`BoundedNode::publisher`] or [`BoundedNode::subscriber`]
177    ///    would result in a panic later on.
178    /// 
179    /// # Examples
180    /// 
181    /// ```
182    /// use crosstalk::AsTopic;
183    /// 
184    /// #[derive(AsTopic)]
185    /// enum House {
186    ///     Bedroom,
187    ///     LivingRoom,
188    ///     Kitchen,
189    ///     Bathroom,
190    /// }
191    /// 
192    /// crosstalk::init! {
193    ///     House::Bedroom => String,
194    ///     House::LivingRoom => String,
195    ///     House::Kitchen => Vec<f32>,
196    ///     House::Bathroom => u8,
197    /// }
198    /// 
199    /// fn main() {
200    ///     let node = crosstalk::BoundedNode::<House>::new(10);
201    ///     let moved_node = node.clone();
202    ///     std::thread::spawn(move || another_thread(moved_node));
203    ///     assert_eq!(node.size, 10);
204    /// }
205    /// 
206    /// fn another_thread(mut node: crosstalk::BoundedNode<House>) {
207    ///     assert_eq!(node.size, 10);
208    /// }
209    /// ```
210    pub fn new(size: usize) -> Self {
211        if size == 0 {
212            panic!("Size must be greater than 0. Attempting to make `tokio::sync::broadcast::channels` later will result in a panic.");
213        }
214        Self {
215            node: Arc::new(Mutex::new(ImplementedBoundedNode::<T>::new(size))),
216            size,
217        }
218    }
219
220    #[inline(always)]
221    /// Creates a new publisher for the given topic `T`
222    /// 
223    /// # Arguments
224    /// 
225    /// * `topic` - the topic to create a publisher for
226    /// 
227    /// # Returns
228    /// 
229    /// A publisher for the topic `T`
230    /// 
231    /// # Examples
232    /// 
233    /// ```
234    /// use crosstalk::AsTopic;
235    /// 
236    /// #[derive(AsTopic)]
237    /// enum House {
238    ///     Bedroom,
239    ///     LivingRoom,
240    ///     Kitchen,
241    ///     Bathroom,
242    /// }
243    /// 
244    /// crosstalk::init! {
245    ///     House::Bedroom => String,
246    ///     House::LivingRoom => String,
247    ///     House::Kitchen => Vec<f32>,
248    ///     House::Bathroom => u8,
249    /// }
250    /// 
251    /// #[tokio::main]
252    /// async fn main() {
253    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
254    ///     assert!(node.publisher::<String>(House::Bedroom).await.is_ok());
255    /// }
256    /// ```
257    pub async fn publisher<D: CrosstalkData>(&mut self, topic: T) -> Result<Publisher<D, T>, crate::Error> {
258        self.node.lock().await.publisher(topic)
259    }
260
261    #[inline(always)]
262    /// Creates a new publisher for the given topic `T`
263    /// 
264    /// # Arguments
265    /// 
266    /// * `topic` - the topic to create a publisher for
267    /// 
268    /// # Returns
269    /// 
270    /// A publisher for the topic `T`
271    /// 
272    /// # Examples
273    /// 
274    /// ```
275    /// use crosstalk::AsTopic;
276    /// 
277    /// #[derive(AsTopic)]
278    /// enum House {
279    ///     Bedroom,
280    ///     LivingRoom,
281    ///     Kitchen,
282    ///     Bathroom,
283    /// }
284    /// 
285    /// crosstalk::init! {
286    ///     House::Bedroom => String,
287    ///     House::LivingRoom => String,
288    ///     House::Kitchen => Vec<f32>,
289    ///     House::Bathroom => u8,
290    /// }
291    /// 
292    /// let mut node = crosstalk::BoundedNode::<House>::new(10);
293    /// assert!(node.publisher_blocking::<String>(House::Bedroom).is_ok());
294    /// ```
295    pub fn publisher_blocking<D: CrosstalkData>(&mut self, topic: T) -> Result<Publisher<D, T>, crate::Error> {
296        self.node.blocking_lock().publisher(topic)
297    }
298
299    #[inline(always)]
300    /// Creates a new subscriber for the given topic `T`
301    /// 
302    /// # Arguments
303    /// 
304    /// * `topic` - the topic to create a subscriber for
305    /// 
306    /// # Returns
307    /// 
308    /// A subscriber for the topic `T`
309    /// 
310    /// # Examples
311    /// 
312    /// ```
313    /// use crosstalk::AsTopic;
314    /// 
315    /// #[derive(AsTopic)]
316    /// enum House {
317    ///     Bedroom,
318    ///     LivingRoom,
319    ///     Kitchen,
320    ///     Bathroom,
321    /// }
322    /// 
323    /// crosstalk::init! {
324    ///     House::Bedroom => String,
325    ///     House::LivingRoom => String,
326    ///     House::Kitchen => Vec<f32>,
327    ///     House::Bathroom => u8,
328    /// }
329    /// 
330    /// #[tokio::main]
331    /// async fn main() {
332    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
333    ///     assert!(node.subscriber::<String>(House::Bedroom).await.is_ok());
334    /// }
335    /// ```
336    pub async fn subscriber<D: CrosstalkData>(&mut self, topic: T) -> Result<Subscriber<D, T>, crate::Error> {
337        self.node.lock().await.subscriber(topic)
338    }
339
340    #[inline(always)]
341    /// Creates a new subscriber for the given topic `T`
342    /// 
343    /// # Arguments
344    /// 
345    /// * `topic` - the topic to create a subscriber for
346    /// 
347    /// # Returns
348    /// 
349    /// A subscriber for the topic `T`
350    /// 
351    /// # Examples
352    /// 
353    /// ```
354    /// use crosstalk::AsTopic;
355    /// 
356    /// #[derive(AsTopic)]
357    /// enum House {
358    ///     Bedroom,
359    ///     LivingRoom,
360    ///     Kitchen,
361    ///     Bathroom,
362    /// }
363    /// 
364    /// crosstalk::init! {
365    ///     House::Bedroom => String,
366    ///     House::LivingRoom => String,
367    ///     House::Kitchen => Vec<f32>,
368    ///     House::Bathroom => u8,
369    /// }
370    /// 
371    /// let mut node = crosstalk::BoundedNode::<House>::new(10);
372    /// assert!(node.subscriber_blocking::<String>(House::Bedroom).is_ok());
373    /// ```
374    pub fn subscriber_blocking<D: CrosstalkData>(&mut self, topic: T) -> Result<Subscriber<D, T>, crate::Error> {
375        self.node.blocking_lock().subscriber(topic)
376    }
377
378    #[inline(always)]
379    /// Creates a new publisher and subscriber for the given topic `T`
380    /// 
381    /// # Arguments
382    /// 
383    /// * `topic` - the topic to create a publisher and subscriber for
384    /// 
385    /// # Returns
386    /// 
387    /// A publisher and subscriber for the topic `T`
388    /// 
389    /// # Examples
390    /// 
391    /// ```
392    /// use crosstalk::AsTopic;
393    /// 
394    /// #[derive(AsTopic)]
395    /// enum House {
396    ///     Bedroom,
397    ///     LivingRoom,
398    ///     Kitchen,
399    ///     Bathroom,
400    /// }
401    /// 
402    /// crosstalk::init! {
403    ///     House::Bedroom => String,
404    ///     House::LivingRoom => String,
405    ///     House::Kitchen => Vec<f32>,
406    ///     House::Bathroom => u8,
407    /// }
408    /// 
409    /// #[tokio::main]
410    /// async fn main() {
411    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
412    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).await.unwrap();
413    ///     publisher.write("hello".to_string());
414    ///     assert_eq!(subscriber.try_read().unwrap(), "hello");
415    /// }
416    /// ```
417    /// 
418    pub async fn pubsub<D: CrosstalkData>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), crate::Error> {
419        self.node.lock().await.pubsub(topic)
420    }
421
422    #[inline(always)]
423    #[allow(clippy::type_complexity)]
424    /// Creates a new publisher and subscriber for the given topic `T`
425    /// 
426    /// # Arguments
427    /// 
428    /// * `topic` - the topic to create a publisher and subscriber for
429    /// 
430    /// # Returns
431    /// 
432    /// A publisher and subscriber for the topic `T`
433    /// 
434    /// # Examples
435    /// 
436    /// ```
437    /// use crosstalk::AsTopic;
438    /// 
439    /// #[derive(AsTopic)]
440    /// enum House {
441    ///     Bedroom,
442    ///     LivingRoom,
443    ///     Kitchen,
444    ///     Bathroom,
445    /// }
446    /// 
447    /// crosstalk::init! {
448    ///     House::Bedroom => String,
449    ///     House::LivingRoom => String,
450    ///     House::Kitchen => Vec<f32>,
451    ///     House::Bathroom => u8,
452    /// }
453    /// 
454    /// let mut node = crosstalk::BoundedNode::<House>::new(10);
455    /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
456    /// publisher.write("hello".to_string());
457    /// assert_eq!(subscriber.try_read().unwrap(), "hello");
458    /// ```
459    /// 
460    pub fn pubsub_blocking<D: CrosstalkData>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), crate::Error> {
461        self.node.blocking_lock().pubsub(topic)
462    }
463}
464
/// The inner node that [`BoundedNode`] wraps in an `Arc<Mutex<_>>`
///
/// Its [`CrosstalkPubSub`] implementation is generated by the
/// [`crosstalk_macros::init!`] macro
///
/// # Attributes
///
/// * `senders` - the senders of the node, keyed by topic and stored type-erased
/// * `size` - the size of each buffer
pub struct ImplementedBoundedNode<T> {
    pub senders: HashMap<T, Box<dyn std::any::Any + 'static>>,
    pub size: usize,
}

// SAFETY: `senders` stores `Box<dyn Any>`, which the compiler cannot prove to
// be `Send`/`Sync`, hence the manual impls. They are only sound if every boxed
// value is itself thread-safe.
// NOTE(review): presumably the `init!`-generated code only ever inserts
// `tokio::sync::broadcast::Sender<D>` values with `D: CrosstalkData` (and so
// `D: Send`) — confirm. `senders` is a public field, so nothing in this type
// enforces that invariant.
// The `T: Send` / `T: Sync` bounds keep these impls from promising more than
// the topic keys stored in the map can deliver.
/// [`ImplementedBoundedNode`] implementation of [`Send`]
unsafe impl<T: Send> Send for ImplementedBoundedNode<T> {}
/// [`ImplementedBoundedNode`] implementation of [`Sync`]
unsafe impl<T: Sync> Sync for ImplementedBoundedNode<T> {}
483
484/// [`ImplementedBoundedNode`] implementation 
485impl<T> ImplementedBoundedNode<T>
486where
487    T: CrosstalkTopic,
488{
489    /// See [`BoundedNode::new`]
490    /// 
491    /// # Arguments
492    /// 
493    /// * `size` - the size of each buffer
494    pub fn new(size: usize) -> Self {
495        Self {
496            senders: HashMap::new(),
497            size,
498        }
499    }
500}
501
502#[derive(Clone)]
503/// A `crosstalk` [`Publisher`]
504/// 
505/// # Attributes
506/// 
507/// * `topic` - the topic of the publisher
508/// * `buf` - the buffer which broadcasts the data
509/// 
510/// # Type Parameters
511/// 
512/// * `T` - the topic of the publisher
513/// * `D` - the data type of the publisher
514/// 
515/// This is not meant to be used directly, please
516/// use the [`crosstalk_macros::init!`] macro instead
517/// and produce a [`Publisher`] with [`BoundedNode::publisher`]
518/// or [`BoundedNode::pubsub`]
519pub struct Publisher<D, T> {
520    pub topic: T,
521    buf: TokioSender<D>,
522}
523/// [`Publisher`] implementation
524impl<D, T> Publisher<D, T> {
525    #[inline(always)]
526    /// See [`BoundedNode::publisher`]
527    pub fn new(topic: T, buf: TokioSender<D>) -> Self {
528        Self { topic, buf }
529    }
530
531    #[inline(always)]
532    /// Publishes data to a topic, broadcasting it to all subscribers
533    /// 
534    /// # Arguments
535    /// 
536    /// * `sample` - the sample to publish
537    /// 
538    /// # Examples
539    /// 
540    /// ```
541    /// use crosstalk::AsTopic;
542    /// 
543    /// #[derive(AsTopic)]
544    /// enum House {
545    ///     Bedroom,
546    ///     LivingRoom,
547    ///     Kitchen,
548    ///     Bathroom,
549    /// }
550    /// 
551    /// crosstalk::init! {
552    ///     House::Bedroom => String,
553    ///     House::LivingRoom => String,
554    ///     House::Kitchen => Vec<f32>,
555    ///     House::Bathroom => u8,
556    /// }
557    /// 
558    /// let mut node = crosstalk::BoundedNode::<House>::new(10);
559    /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
560    /// publisher.write("hello".to_string());
561    /// std::thread::spawn(move || {
562    ///     assert_eq!(subscriber.try_read().unwrap(), "hello");
563    /// });
564    /// ```
565    pub fn write(&self, sample: D) {
566        let _ = self.buf.send(sample);
567    }
568}
569
570/// A `crosstalk` [`Subscriber`]
571/// 
572/// # Attributes
573/// 
574/// * `topic` - the topic of the subscriber
575/// * `rcvr` - the receiver of the subscriber
576/// * `sndr` - the sender for the topic. This is used to spawn multiple receivers upon [`Subscriber::clone`]
577/// 
578/// # Type Parameters
579/// 
580/// * `T` - the topic of the subscriber
581/// * `D` - the data type of the subscriber
582/// 
583/// This is not meant to be used directly, please
584/// use the [`crosstalk_macros::init!`] macro instead
585/// and produce a [`Subscriber`] with [`BoundedNode::subscriber`]
586/// or [`BoundedNode::pubsub`]
587pub struct Subscriber<D, T> {
588    pub topic: T,
589    rcvr: Receiver<D>,
590    sndr: Arc<TokioSender<D>>,
591}
592/// [`Subscriber`] implementation 
593impl<D: Clone, T: Clone> Subscriber<D, T> {
594    #[inline(always)]
595    /// See [`BoundedNode::subscriber`]
596    pub fn new(
597        topic: T,
598        rcvr: Option<TokioReceiver<D>>,
599        sndr: Arc<TokioSender<D>>,
600    ) -> Self {
601        Self {
602            topic,
603            rcvr: Receiver::new(rcvr.unwrap_or(sndr.subscribe())),
604            sndr: sndr.clone(),
605        }
606    }
607
608    #[inline(always)]
609    /// Asynchronous blocking read from the [`TokioReceiver`]
610    /// 
611    /// The sequential equivalent to this function is [`Subscriber::read_blocking`]
612    /// which can be used outside of an asynchronous context
613    /// 
614    /// # Examples
615    /// 
616    /// ```
617    /// use crosstalk::AsTopic;
618    /// 
619    /// #[derive(AsTopic)]
620    /// enum House {
621    ///     Bedroom,
622    ///     LivingRoom,
623    ///     Kitchen,
624    ///     Bathroom,
625    /// }
626    /// 
627    /// crosstalk::init! {
628    ///     House::Bedroom => String,
629    ///     House::LivingRoom => String,
630    ///     House::Kitchen => Vec<f32>,
631    ///     House::Bathroom => u8,
632    /// }
633    /// 
634    /// #[tokio::main]
635    /// async fn main() {
636    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
637    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).await.unwrap();
638    ///     publisher.write("hello".to_string());
639    ///     assert_eq!(subscriber.read().await, Some("hello".to_string()));
640    /// }
641    /// ```
642    pub async fn read(&mut self) -> Option<D> {
643        self.rcvr.read().await
644    }
645    
646    #[inline(always)]
647    /// Non-blocking read from [`TokioReceiver`]
648    /// Upon immediate failure, [`None`] will be returned
649    /// 
650    /// Difference between this function and [`Subscriber::try_read_raw`]
651    /// is that this function continuously loops upon [`tokio`] error of
652    /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`], looping
653    /// until a valid message is received OR the buffer is determined to be
654    /// empty
655    /// 
656    /// [`Subscriber::try_read_raw`] will return [`None`] upon
657    /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`], which
658    /// can cause some unexpected behavior
659    /// 
660    /// # Examples
661    /// 
662    /// ```
663    /// use crosstalk::AsTopic;
664    /// 
665    /// #[derive(AsTopic)]
666    /// enum House {
667    ///     Bedroom,
668    ///     LivingRoom,
669    ///     Kitchen,
670    ///     Bathroom,
671    /// }
672    /// 
673    /// crosstalk::init! {
674    ///     House::Bedroom => String,
675    ///     House::LivingRoom => String,
676    ///     House::Kitchen => Vec<f32>,
677    ///     House::Bathroom => u8,
678    /// }
679    /// 
680    /// let mut node = crosstalk::BoundedNode::<House>::new(10);
681    /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
682    /// publisher.write("hello".to_string());
683    /// assert_eq!(subscriber.try_read(), Some(String::from("hello")));
684    /// assert_eq!(subscriber.try_read(), None);
685    /// ```
686    pub fn try_read(&mut self) -> Option<D> {
687        self.rcvr.try_read()
688    }
689
690    #[inline(always)]
691    /// Non-blocking read from [`TokioReceiver`], returning
692    /// [`None`] if there are no messages available or if 
693    /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`] occurs.
694    /// 
695    /// This function can cause some unexpected behavior. It is recommended
696    /// to use [`Subscriber::try_read`] instead.
697    /// 
698    /// # Examples
699    /// 
700    /// ```
701    /// use crosstalk::AsTopic;
702    /// 
703    /// #[derive(AsTopic)]
704    /// enum House {
705    ///     Bedroom,
706    ///     LivingRoom,
707    ///     Kitchen,
708    ///     Bathroom,
709    /// }
710    /// 
711    /// crosstalk::init! {
712    ///     House::Bedroom => String,
713    ///     House::LivingRoom => String,
714    ///     House::Kitchen => Vec<f32>,
715    ///     House::Bathroom => u8,
716    /// }
717    /// 
718    /// let mut node = crosstalk::BoundedNode::<House>::new(10);
719    /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
720    /// publisher.write("hello".to_string());
721    /// assert_eq!(subscriber.try_read_raw(), Some(String::from("hello")));
722    /// assert_eq!(subscriber.try_read_raw(), None);
723    /// ```
724    pub fn try_read_raw(&mut self) -> Option<D> {
725        self.rcvr.try_read_raw()
726    }
727    
728    #[inline(always)]
729    /// Sequential blocking read from [`TokioReceiver`]
730    /// 
731    /// The asynchronous equivalent to this function is [`Subscriber::read`], 
732    /// which must be used in an asynchronous context with `.await`
733    /// 
734    /// # Examples
735    /// 
736    /// ```
737    /// use crosstalk::AsTopic;
738    /// 
739    /// #[derive(AsTopic)]
740    /// enum House {
741    ///     Bedroom,
742    ///     LivingRoom,
743    ///     Kitchen,
744    ///     Bathroom,
745    /// }
746    /// 
747    /// crosstalk::init! {
748    ///     House::Bedroom => String,
749    ///     House::LivingRoom => String,
750    ///     House::Kitchen => Vec<f32>,
751    ///     House::Bathroom => u8,
752    /// }
753    /// 
754    /// let mut node = crosstalk::BoundedNode::<House>::new(10);
755    /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
756    /// publisher.write("hello".to_string());
757    /// assert_eq!(subscriber.read_blocking(), Some("hello".to_string()));
758    /// ```
759    pub fn read_blocking(&mut self) -> Option<D> {
760        self.rcvr.read_blocking()
761    }
762    
763    #[inline(always)]
764    /// Asynchronous non-blocking read from [`TokioReceiver`]
765    /// with a given timeout. After the timeout if there are no messages,
766    /// returns [`None`].
767    /// 
768    /// # Examples
769    /// 
770    /// ```
771    /// use crosstalk::AsTopic;
772    /// 
773    /// #[derive(AsTopic)]
774    /// enum House {
775    ///     Bedroom,
776    ///     LivingRoom,
777    ///     Kitchen,
778    ///     Bathroom,
779    /// }
780    /// 
781    /// crosstalk::init! {
782    ///     House::Bedroom => String,
783    ///     House::LivingRoom => String,
784    ///     House::Kitchen => Vec<f32>,
785    ///     House::Bathroom => u8,
786    /// }
787    /// 
788    /// #[tokio::main]
789    /// async fn main() {
790    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
791    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).await.unwrap();
792    ///     publisher.write("hello".to_string());
793    ///     assert_eq!(subscriber.read_timeout(std::time::Duration::from_millis(100)).await, Some("hello".to_string()));
794    /// }
795    /// ```
796    pub async fn read_timeout(&mut self, timeout: std::time::Duration) -> Option<D> {
797        self.rcvr.read_timeout(timeout).await
798    }
799}
800/// [`Subscriber`] implementation of [`Clone`]
801impl<D: Clone, T: Clone> Clone for Subscriber<D, T> {
802    #[inline(always)]
803    /// Clones a [`Subscriber`]
804    /// 
805    /// # Examples
806    /// 
807    /// ```
808    /// use crosstalk::AsTopic;
809    /// 
810    /// #[derive(AsTopic)]
811    /// enum House {
812    ///     Bedroom,
813    ///     LivingRoom,
814    ///     Kitchen,
815    ///     Bathroom,
816    /// }
817    /// 
818    /// crosstalk::init! {
819    ///     House::Bedroom => String,
820    ///     House::LivingRoom => String,
821    ///     House::Kitchen => Vec<f32>,
822    ///     House::Bathroom => u8,
823    /// }
824    /// 
825    /// let mut node = crosstalk::BoundedNode::<House>::new(10);
826    /// let (publisher, mut subscriber) = node.pubsub_blocking(House::Bedroom).unwrap();
827    /// let mut subscriber_2 = subscriber.clone();
828    /// publisher.write("hello".to_string());
829    /// assert_eq!(subscriber.try_read().unwrap(), "hello");
830    /// assert_eq!(subscriber_2.try_read().unwrap(), "hello");
831    /// ```
832    fn clone(&self) -> Self {
833        Self {
834            topic: self.topic.clone(),
835            rcvr: Receiver::new(self.sndr.subscribe()),
836            sndr: self.sndr.clone(),
837        }
838    }
839}
840
841/// Receiver
842/// 
843/// Define a receiver for subscribing messages
844/// 
845/// Reads from [`TokioReceiver`]
846struct Receiver<D> {
847    buf: TokioReceiver<D>,
848}
849/// [`Receiver`] implementation
850impl<D: Clone> Receiver<D>{
851    #[inline(always)]
852    /// Constructs a new [`Receiver`]
853    pub fn new(
854        buf: TokioReceiver<D>,
855    ) -> Self {
856        Self { buf }
857    }
858
859    /// Reads from [`TokioReceiver`]
860    /// 
861    /// This struct/function is not meant to be used directly,
862    /// rather through the [`Subscriber`] struct with [`Subscriber::read`]
863    async fn read(&mut self) -> Option<D> {
864        loop {
865            match self.buf.recv().await {
866                Ok(res) => return Some(res),
867                Err(e) => match e {
868                    tokio::sync::broadcast::error::RecvError::Lagged(_) => { continue; }
869
870                    #[cfg(not(any(feature = "log", feature = "tracing")))]
871                    _ => return None,
872
873                    #[cfg(any(feature = "log", feature = "tracing"))]
874                    _ => {
875                        #[cfg(feature = "log")]
876                        log::error!("{}", e);
877                        #[cfg(feature = "tracing")]
878                        tracing::error!("{}", e);
879                        return None
880                    }
881                }
882            }
883        }
884    }    
885
886    /// Reads from [`TokioReceiver`]
887    /// 
888    /// This struct/function is not meant to be used directly,
889    /// rather through the [`Subscriber`] struct with [`Subscriber::try_read`]
890    fn try_read(&mut self) -> Option<D> {
891        loop {
892            match self.buf.try_recv() {
893                Ok(d) => return Some(d),
894                Err(e) => {
895                    match e {
896                        tokio::sync::broadcast::error::TryRecvError::Lagged(_) => { continue; },
897
898                        #[cfg(not(any(feature = "log", feature = "tracing")))]
899                        _ => return None,
900
901                        #[cfg(any(feature = "log", feature = "tracing"))]
902                        _ => {
903                            #[cfg(feature = "log")]
904                            log::error!("{}", e);
905                            #[cfg(feature = "tracing")]
906                            tracing::error!("{}", e);
907                            return None
908                        },
909                    }
910                },
911            }
912        } 
913    }
914
915    /// Reads from [`TokioReceiver`]
916    /// 
917    /// This struct/function is not meant to be used directly,
918    /// rather through the [`Subscriber`] struct with [`Subscriber::try_read_raw`]
919    fn try_read_raw(&mut self) -> Option<D> {
920        match self.buf.try_recv() {
921            Ok(d) => Some(d),
922
923            #[cfg(not(any(feature = "log", feature = "tracing")))]
924            Err(_) => None,
925            
926            #[cfg(any(feature = "log", feature = "tracing"))]
927            Err(e) => {
928                #[cfg(feature = "log")]
929                log::error!("{}", e);
930                #[cfg(feature = "tracing")]
931                tracing::error!("{}", e);
932                None
933            },
934        }
935    }
936    
937    /// Reads from [`TokioReceiver`]
938    /// 
939    /// This struct/function is not meant to be used directly,
940    /// rather through the [`Subscriber`] struct with [`Subscriber::read_blocking`]
941    fn read_blocking(&mut self) -> Option<D> {
942        loop {
943            match self.buf.blocking_recv() {
944                Ok(res) => return Some(res),
945                Err(e) => match e {
946                    tokio::sync::broadcast::error::RecvError::Lagged(_) => { continue; }
947                    
948                    #[cfg(not(any(feature = "log", feature = "tracing")))]
949                    _ => return None,
950
951                    #[cfg(any(feature = "log", feature = "tracing"))]
952                    _ => {
953                        #[cfg(feature = "log")]
954                        log::error!("{}", e);
955                        #[cfg(feature = "tracing")]
956                        tracing::error!("{}", e);
957                        return None
958                    },
959                }
960            }
961        }
962    }
963    
964    /// Reads from [`TokioReceiver`]
965    /// 
966    /// This struct/function is not meant to be used directly,
967    /// rather through the [`Subscriber`] struct with [`Subscriber::read_timeout`]
968    async fn read_timeout(&mut self, timeout: std::time::Duration) -> Option<D> {
969        match tokio::runtime::Handle::try_current() {
970            Ok(_) => {
971                match tokio::time::timeout(timeout, self.buf.recv()).await {
972                    Ok(res) => {
973                        match res {
974                            Ok(res) => Some(res),
975
976                            #[cfg(not(any(feature = "log", feature = "tracing")))]
977                            Err(_) => None,
978                            
979                            #[cfg(any(feature = "log", feature = "tracing"))]
980                            Err(e) => {
981                                #[cfg(feature = "log")]
982                                log::error!("{}", e);
983                                #[cfg(feature = "tracing")]
984                                tracing::error!("{}", e);
985                                None
986                            },
987                        }
988                    },
989
990                    #[cfg(not(any(feature = "log", feature = "tracing")))]
991                    Err(_) => None,
992                    
993                    #[cfg(any(feature = "log", feature = "tracing"))]
994                    Err(e) => {
995                        #[cfg(feature = "log")]
996                        log::error!("{}", e);
997                        #[cfg(feature = "tracing")]
998                        tracing::error!("{}", e);
999                        None
1000                    },
1001                }
1002            },
1003
1004            #[cfg(not(any(feature = "log", feature = "tracing")))]
1005            Err(_) => None,
1006            
1007            #[cfg(any(feature = "log", feature = "tracing"))]
1008            Err(e) => {
1009                #[cfg(feature = "log")]
1010                log::error!("{}", e);
1011                #[cfg(feature = "tracing")]
1012                tracing::error!("{}", e);
1013                None
1014            },
1015        }
1016    }
1017}
1018
1019// --------------------------------------------------
1020// for testing
1021// --------------------------------------------------
1022#[allow(unused_imports)]
1023use crosstalk_macros::init_test;
1024#[allow(unused_imports)]
1025use crosstalk_macros::AsTopicTest;
1026
#[cfg(test)]
mod tests {
    use super::*;

    /// Topics used by most tests; each variant is bound to a payload type below
    #[derive(AsTopicTest)]
    enum TestTopic {
        A,
        B,
        C,
    }
    super::init_test! {
        TestTopic::A => String,
        TestTopic::B => bool,
        TestTopic::C => i32,
    }

    /// Second topic set, bound to collection payloads
    #[derive(AsTopicTest)]
    enum AnotherTestTopic {
        Foo,
        Bar,
    }
    super::init_test! {
        AnotherTestTopic::Foo => Vec<String>,
        AnotherTestTopic::Bar => Vec<bool>,
    }

    /// One publisher, one subscriber: a written value can be read back
    #[test]
    fn test_single_pubsub_blocking() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();
        publisher.write("test".to_string());
        assert_eq!(subscriber.try_read().unwrap(), "test");
    }

    /// Two subscribers on the same topic each receive the published value
    #[test]
    fn test_multiple_subscribers_blocking() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut sub1) = node.pubsub_blocking(TestTopic::A).unwrap();
        let mut sub2 = node.subscriber_blocking::<String>(TestTopic::A).unwrap();

        publisher.write("hello".to_string());
        assert_eq!(sub1.try_read().unwrap(), "hello");
        assert_eq!(sub2.try_read().unwrap(), "hello");
    }

    /// Values published on one topic are not delivered on another
    #[test]
    fn test_cross_topic_isolation() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (pub_a, mut sub_a) = node.pubsub_blocking(TestTopic::A).unwrap();
        let (pub_b, mut sub_b) = node.pubsub_blocking(TestTopic::B).unwrap();

        pub_a.write("string".to_string());
        pub_b.write(true);

        assert_eq!(sub_a.try_read().unwrap(), "string");
        assert!(sub_b.try_read().unwrap());
        // each subscriber saw exactly one value
        assert!(sub_a.try_read().is_none());
        assert!(sub_b.try_read().is_none());
    }

    /// A publisher moved to another OS thread still reaches the subscriber
    #[test]
    fn test_multiple_threads_blocking() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();

        let handle = std::thread::spawn(move || {
            publisher.write("threaded".to_string());
        });

        handle.join().unwrap();
        assert_eq!(subscriber.try_read().unwrap(), "threaded");
    }

    /// Async creation and async read inside a single tokio runtime
    #[tokio::test]
    async fn test_async_pubsub_single_runtime() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();
        publisher.write("async".to_string());
        assert_eq!(subscriber.read().await.unwrap(), "async");
    }

    /// 100 values written into a node created with size 100 are read back in order
    #[test]
    fn test_high_volume_blocking() {
        let mut node = BoundedNode::<TestTopic>::new(100);
        let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::C).unwrap();

        for i in 0..100 {
            publisher.write(i);
        }

        for i in 0..100 {
            assert_eq!(subscriber.try_read().unwrap(), i);
        }
        assert!(subscriber.try_read().is_none());
    }

    /// A cloned subscriber receives the same value as the original
    #[test]
    fn test_cloned_subscribers() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut sub1) = node.pubsub_blocking(TestTopic::A).unwrap();
        let mut sub2 = sub1.clone();

        publisher.write("clone".to_string());
        assert_eq!(sub1.try_read().unwrap(), "clone");
        assert_eq!(sub2.try_read().unwrap(), "clone");
    }

    /// Three writes into a node of size 2: the oldest value is expected to be dropped
    #[test]
    fn test_buffer_overflow_handling() {
        let mut node = BoundedNode::<TestTopic>::new(2);
        let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();

        publisher.write("msg1".to_string());
        publisher.write("msg2".to_string());
        publisher.write("msg3".to_string());

        assert_eq!(subscriber.try_read().unwrap(), "msg2");
        assert_eq!(subscriber.try_read().unwrap(), "msg3");
        assert!(subscriber.try_read().is_none());
    }

    /// Requesting a payload type that differs from the topic's bound type is an error
    #[test]
    fn test_type_mismatch_errors() {
        let mut node = BoundedNode::<TestTopic>::new(10);

        // TestTopic::A is bound to String, so i32 must be rejected
        let publisher_res = node.publisher_blocking::<i32>(TestTopic::A);
        assert!(matches!(publisher_res, Err(Error::PublisherMismatch(_, _))));

        let subscriber_res = node.subscriber_blocking::<i32>(TestTopic::A);
        assert!(matches!(subscriber_res, Err(Error::SubscriberMismatch(_, _))));
    }

    /// Publishing from a spawned task on a multi-threaded runtime
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_multiple_async_runtimes() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();

        let handle = tokio::spawn(async move {
            publisher.write("async".to_string());
        });

        handle.await.unwrap();
        assert_eq!(subscriber.read().await.unwrap(), "async");
    }

    /// Blocking-created endpoints: write from inside a runtime, read from outside it
    #[test]
    fn test_mixed_async_blocking() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();

        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            publisher.write("mixed".to_string());
        });

        assert_eq!(subscriber.try_read().unwrap(), "mixed");
    }

    /// Collection payloads (`Vec<String>`, `Vec<bool>`) round-trip unchanged
    #[test]
    fn test_complex_data_types() {
        let mut node = BoundedNode::<AnotherTestTopic>::new(10);
        let (pub_foo, mut sub_foo) = node.pubsub_blocking(AnotherTestTopic::Foo).unwrap();
        let (pub_bar, mut sub_bar) = node.pubsub_blocking(AnotherTestTopic::Bar).unwrap();

        pub_foo.write(vec!["a".to_string(), "b".to_string()]);
        pub_bar.write(vec![true, false]);

        assert_eq!(sub_foo.try_read().unwrap(), vec!["a", "b"]);
        assert_eq!(sub_bar.try_read().unwrap(), vec![true, false]);
    }

    /// Two publishers on separate threads write 50 values each; all 100 arrive
    #[test]
    fn test_concurrent_publishers() {
        let mut node = BoundedNode::<TestTopic>::new(100);
        let (pub1, mut sub) = node.pubsub_blocking(TestTopic::C).unwrap();
        let pub2 = node.publisher_blocking::<i32>(TestTopic::C).unwrap();

        let handle1 = std::thread::spawn(move || {
            for i in 0..50 {
                pub1.write(i);
            }
        });

        let handle2 = std::thread::spawn(move || {
            for i in 50..100 {
                pub2.write(i);
            }
        });

        handle1.join().unwrap();
        handle2.join().unwrap();

        // drain everything that is currently readable
        let mut received = Vec::new();
        while let Some(msg) = sub.try_read() {
            received.push(msg);
        }
        assert_eq!(received.len(), 100);
    }

    /// A cloned subscriber also works with the async read path
    #[tokio::test]
    async fn test_async_subscriber_cloning() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut sub1) = node.pubsub(TestTopic::A).await.unwrap();
        let mut sub2 = sub1.clone();

        publisher.write("async_clone".to_string());
        assert_eq!(sub1.read().await.unwrap(), "async_clone");
        assert_eq!(sub2.read().await.unwrap(), "async_clone");
    }

    /// With the publisher dropped and nothing written, a non-blocking read yields `None`
    #[test]
    fn test_dropped_publisher_behavior() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut subscriber) = node.pubsub_blocking::<String>(TestTopic::A).unwrap();
        drop(publisher);
        assert!(subscriber.try_read().is_none());
    }

    /// Two cloned publishers write from separate tasks; the subscriber must then
    /// be able to read `BUFFER_SIZE` messages, each from one of the two tasks
    #[tokio::test]
    async fn test_multiple_async_publishers() {
        const LOOP_COUNT: usize = 10;
        const BUFFER_SIZE: usize = 10;
        let mut node = BoundedNode::<TestTopic>::new(BUFFER_SIZE);
        let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();
        let publisher_1 = publisher.clone();
        let publisher_2 = publisher.clone();

        let task1 = tokio::spawn(async move {
            for _ in 0..LOOP_COUNT {
                publisher_1.write("task1".to_string());
            }
        });

        let task2 = tokio::spawn(async move {
            for _ in 0..LOOP_COUNT {
                publisher_2.write("task2".to_string());
            }
        });

        // Surface a panic inside either publisher task instead of discarding it,
        // so a broken publisher fails the test right here.
        let (task1_res, task2_res) = tokio::join!(task1, task2);
        task1_res.unwrap();
        task2_res.unwrap();

        let mut task1_count = 0;
        let mut task2_count = 0;
        for _ in 0..BUFFER_SIZE {
            let msg = subscriber.read().await.unwrap();
            if msg == "task1" { task1_count += 1; }
            if msg == "task2" { task2_count += 1; }
        }
        assert_eq!(task1_count + task2_count, BUFFER_SIZE);
    }

    /// `read_blocking` waits for a value that is published after a delay
    #[test]
    fn test_blocking_read_with_delay() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();

        let handle = std::thread::spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(500));
            publisher.write("delayed".to_string());
        });

        assert_eq!(subscriber.read_blocking().unwrap(), "delayed");
        // The publisher thread has already written by now; joining only makes
        // sure a panic inside it is not silently lost.
        handle.join().unwrap();
    }

    /// `read_timeout` yields `None` when nothing is published in time and the
    /// value once one is available
    #[tokio::test]
    async fn test_read_timeout_behavior() {
        let mut node = BoundedNode::<TestTopic>::new(10);
        let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();

        let timeout = std::time::Duration::from_millis(100);
        assert!(subscriber.read_timeout(timeout).await.is_none());

        publisher.write("timeout_test".to_string());
        assert_eq!(subscriber.read_timeout(timeout).await.unwrap(), "timeout_test");
    }

    /// Publishers for two different topics write from two threads at once
    #[test]
    fn test_multiple_topics_concurrently() {
        let mut node = BoundedNode::<AnotherTestTopic>::new(10);
        let (pub_foo, mut sub_foo) = node.pubsub_blocking(AnotherTestTopic::Foo).unwrap();
        let (pub_bar, mut sub_bar) = node.pubsub_blocking(AnotherTestTopic::Bar).unwrap();

        let handle1 = std::thread::spawn(move || {
            pub_foo.write(vec!["thread".to_string()]);
        });

        let handle2 = std::thread::spawn(move || {
            pub_bar.write(vec![true]);
        });

        handle1.join().unwrap();
        handle2.join().unwrap();

        assert_eq!(sub_foo.try_read().unwrap(), vec!["thread"]);
        assert_eq!(sub_bar.try_read().unwrap(), vec![true]);
    }

    /// Constructing a node with size 0 is expected to panic
    #[test]
    #[should_panic]
    fn test_zero_capacity_node() {
        let _ = BoundedNode::<TestTopic>::new(0);
    }

    /// Ordered delivery when the node size (1000) far exceeds the message count.
    /// Despite the name, this still uses a `BoundedNode`.
    #[tokio::test]
    async fn test_async_unbounded_messaging() {
        let mut node = BoundedNode::<TestTopic>::new(1000);
        let (publisher, mut subscriber) = node.pubsub(TestTopic::A).await.unwrap();

        let messages = vec!["msg1", "msg2", "msg3", "msg4", "msg5"];
        for msg in &messages {
            publisher.write(msg.to_string());
        }

        for expected in messages {
            assert_eq!(subscriber.read().await.unwrap(), expected);
        }
    }

    /// Five writes into a node of size 2: only the two newest values are expected
    /// to be readable afterwards
    #[test]
    fn test_error_handling_lagged_messages() {
        let mut node = BoundedNode::<TestTopic>::new(2);
        let (publisher, mut subscriber) = node.pubsub_blocking(TestTopic::A).unwrap();

        for i in 0..5 {
            publisher.write(format!("msg{}", i));
        }

        let mut received = Vec::new();
        while let Some(msg) = subscriber.try_read() {
            received.push(msg);
        }
        assert_eq!(received, vec!["msg3", "msg4"]);
    }
}