crosstalk/
lib.rs

1#![doc(html_root_url = "https://docs.rs/crosstalk/0.3")]
2#![doc = include_str!("../README.md")]
3// --------------------------------------------------
4// external
5// --------------------------------------------------
6use std::{
7    sync::{
8        Arc,
9        Mutex,
10    },
11    any::Any,
12    hash::Hash,
13    collections::HashMap,
14};
15
16// --------------------------------------------------
17// internal
18// --------------------------------------------------
19pub mod external;
20pub use crosstalk_macros::init;
21pub use crosstalk_macros::AsTopic;
22
23#[derive(Clone)]
24/// A [`BoundedNode`] is a node to spawn publishers and
25/// subscribers on, where the size of each buffer is
26/// fixed.
27/// 
28/// # Attributes
29/// 
30/// * `node` - the node to spawn publishers and subscribers on
31/// * `size` - the size of each buffer
32/// 
33/// # Type Parameters
34/// 
35/// * `T` - the topic enum name
36/// 
37/// # Examples
38/// 
39/// ```
40/// use crosstalk::AsTopic;
41/// 
42/// #[derive(AsTopic)]
43/// enum House {
44///     Bedroom,
45///     LivingRoom,
46///     Kitchen,
47///     Bathroom,
48/// }
49/// 
50/// crosstalk::init! {
51///     House::Bedroom => String,
52///     House::LivingRoom => String,
53///     House::Kitchen => Vec<f32>,
54///     House::Bathroom => u8,
55/// }
56/// 
57/// fn main() {
58///     let mut node = crosstalk::BoundedNode::<House>::new(10);
59///     let (pub0, mut sub0) = node.pubsub(House::Bedroom).unwrap();
60///     let (pub1, mut sub1) = node.pubsub(House::Bedroom).unwrap();
61///     
62///     pub0.write("Hello".to_string());
63///     pub0.write("World".to_string());
64///     pub1.write("Foo".to_string());
65///     pub1.write("Bar".to_string());
66///     
67///     assert_eq!(sub1.try_read().unwrap(), "Hello");
68///     assert_eq!(sub1.try_read().unwrap(), "World");
69///     assert_eq!(sub1.try_read().unwrap(), "Foo");
70///     assert_eq!(sub1.try_read().unwrap(), "Bar");
71///     
72///     assert_eq!(sub0.try_read().unwrap(), "Hello");
73///     assert_eq!(sub0.try_read().unwrap(), "World");
74///     assert_eq!(sub0.try_read().unwrap(), "Foo");
75///     assert_eq!(sub0.try_read().unwrap(), "Bar");
76/// }
77/// ```
pub struct BoundedNode<T> {
    /// Shared, mutex-guarded handle to the inner node that publishers and
    /// subscribers are created from
    pub node: Arc<Mutex<ImplementedBoundedNode<T>>>,
    /// The buffer size this node was created with (see [`BoundedNode::new`])
    pub size: usize,
}
/// [`BoundedNode`] implementation
///
/// This holds an [`Arc<Mutex<ImplementedBoundedNode<T>>>`], which
/// references the inner node that implements the [`CrosstalkPubSub`] trait.
86impl<T> BoundedNode<T> 
87where
88    T: CrosstalkTopic,
89    ImplementedBoundedNode<T>: CrosstalkPubSub<T>,
90{
91    #[inline]
92    /// Creates a new [`BoundedNode`]
93    /// 
94    /// # Arguments
95    /// 
96    /// * `size` - the size of each buffer
97    /// 
98    /// # Examples
99    /// 
100    /// ```
101    /// use crosstalk::AsTopic;
102    /// 
103    /// #[derive(AsTopic)]
104    /// enum House {
105    ///     Bedroom,
106    ///     LivingRoom,
107    ///     Kitchen,
108    ///     Bathroom,
109    /// }
110    /// 
111    /// crosstalk::init! {
112    ///     House::Bedroom => String,
113    ///     House::LivingRoom => String,
114    ///     House::Kitchen => Vec<f32>,
115    ///     House::Bathroom => u8,
116    /// }
117    /// 
118    /// fn main() {
119    ///     let node = crosstalk::BoundedNode::<House>::new(10);
120    ///     let moved_node = node.clone();
121    ///     std::thread::spawn(move || another_thread(moved_node));
122    ///     assert_eq!(node.size, 10);
123    /// }
124    /// 
125    /// fn another_thread(mut node: crosstalk::BoundedNode<House>) {
126    ///     assert_eq!(node.size, 10);
127    /// }
128    /// ```
129    pub fn new(size: usize) -> Self {
130        Self {
131            node: Arc::new(Mutex::new(ImplementedBoundedNode::<T>::new(size.clone()))),
132            size,
133        }
134    }
135
136    #[inline]
137    #[deprecated(since = "0.3.3", note = "This function will be replaced by `publisher_blocking` in crosstalk v1.0. `publisher` will become async. Both `publisher_*` functions will return a `Result` with return error type `crosstalk::Error`")]
138    /// Creates a new publisher for the given topic `T`
139    /// 
140    /// # Arguments
141    /// 
142    /// * `topic` - the topic to create a publisher for
143    /// 
144    /// # Returns
145    /// 
146    /// A publisher for the topic `T`
147    /// 
148    /// # Panics
149    /// 
150    /// Will panic if the datatype of the topic `T` initialized
151    /// using [`crosstalk_macros::init`] does not match the topic `T`.
152    /// 
153    /// Note that the ***linter will not catch this error until runtime***!
154    /// 
155    /// # Examples
156    /// 
157    /// ```
158    /// use crosstalk::AsTopic;
159    /// 
160    /// #[derive(AsTopic)]
161    /// enum House {
162    ///     Bedroom,
163    ///     LivingRoom,
164    ///     Kitchen,
165    ///     Bathroom,
166    /// }
167    /// 
168    /// crosstalk::init! {
169    ///     House::Bedroom => String,
170    ///     House::LivingRoom => String,
171    ///     House::Kitchen => Vec<f32>,
172    ///     House::Bathroom => u8,
173    /// }
174    /// 
175    /// fn main() {
176    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
177    ///     assert!(node.publisher::<String>(House::Bedroom).is_ok());
178    /// }
179    /// ```
180    pub fn publisher<D: 'static>(&mut self, topic: T) -> Result<Publisher<D, T>, Box<dyn std::error::Error>> {
181        let mut n = self.node.lock().unwrap();
182        n.publisher(topic)
183    }
184
185    #[inline]
186    #[deprecated(since = "0.3.3", note = "This function will be replaced by `subscriber_blocking` in crosstalk v1.0. `subscriber` will become async. Both `subscriber_*` functions will return a `Result` with return error type `crosstalk::Error`")]
187    /// Creates a new subscriber for the given topic `T`
188    /// 
189    /// # Arguments
190    /// 
191    /// * `topic` - the topic to create a subscriber for
192    /// 
193    /// # Returns
194    /// 
195    /// A subscriber for the topic `T`
196    /// 
197    /// # Panics
198    /// 
199    /// Will panic if the datatype of the topic `T` initialized
200    /// using [`crosstalk_macros::init`] does not match the topic `T`.
201    /// 
202    /// Note that the ***linter will not catch this error until runtime***!
203    /// 
204    /// # Examples
205    /// 
206    /// ```
207    /// use crosstalk::AsTopic;
208    /// 
209    /// #[derive(AsTopic)]
210    /// enum House {
211    ///     Bedroom,
212    ///     LivingRoom,
213    ///     Kitchen,
214    ///     Bathroom,
215    /// }
216    /// 
217    /// crosstalk::init! {
218    ///     House::Bedroom => String,
219    ///     House::LivingRoom => String,
220    ///     House::Kitchen => Vec<f32>,
221    ///     House::Bathroom => u8,
222    /// }
223    /// 
224    /// fn main() {
225    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
226    ///     assert!(node.subscriber::<String>(House::Bedroom).is_ok());
227    /// }
228    /// ```
229    pub fn subscriber<D: Clone + Send + 'static>(&mut self, topic: T) -> Result<Subscriber<D, T>, Box<dyn std::error::Error>> {
230        let mut n = self.node.lock().unwrap();
231        n.subscriber(topic)
232    }
233
234    #[inline]
235    #[deprecated(since = "0.3.3", note = "This function will be replaced by `pubsub_blocking` in crosstalk v1.0. `pubsub` will become async. Both `pubsub_*` functions will return a `Result` with return error type `crosstalk::Error`")]
236    /// Creates a new publisher and subscriber for the given topic `T`
237    /// 
238    /// # Arguments
239    /// 
240    /// * `topic` - the topic to create a publisher and subscriber for
241    /// 
242    /// # Returns
243    /// 
244    /// A publisher and subscriber for the topic `T`
245    /// 
246    /// # Examples
247    /// 
248    /// ```
249    /// use crosstalk::AsTopic;
250    /// 
251    /// #[derive(AsTopic)]
252    /// enum House {
253    ///     Bedroom,
254    ///     LivingRoom,
255    ///     Kitchen,
256    ///     Bathroom,
257    /// }
258    /// 
259    /// crosstalk::init! {
260    ///     House::Bedroom => String,
261    ///     House::LivingRoom => String,
262    ///     House::Kitchen => Vec<f32>,
263    ///     House::Bathroom => u8,
264    /// }
265    /// 
266    /// fn main() {
267    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
268    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
269    ///     publisher.write("hello".to_string());
270    ///     assert_eq!(subscriber.try_read().unwrap(), "hello");
271    /// }
272    /// ```
273    /// 
274    pub fn pubsub<D: Clone + Send + 'static>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), Box<dyn std::error::Error>> {
275        let mut n = self.node.lock().unwrap();
276        n.pubsub(topic)
277    }
278
279    #[inline]
280    #[deprecated(since = "0.3.3", note = "This function will be removed in crosstalk v1.0. Dropping the publisher will achieve the same result")]
281    /// Deletes a publisher
282    /// 
283    /// # Arguments
284    /// 
285    /// * `publisher` - the publisher to delete
286    /// 
287    /// # Examples
288    /// 
289    /// ```
290    /// use crosstalk::AsTopic;
291    ///
292    /// #[derive(AsTopic)]
293    /// enum House {
294    ///     Bedroom,
295    ///     LivingRoom,
296    ///     Kitchen,
297    ///     Bathroom,
298    /// }
299    ///
300    /// crosstalk::init! {
301    ///     House::Bedroom => String,
302    ///     House::LivingRoom => String,
303    ///     House::Kitchen => Vec<f32>,
304    ///     House::Bathroom => u8,
305    /// }
306    ///
307    /// fn main() {
308    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
309    ///     let publisher = node.publisher::<String>(House::Bedroom).unwrap();
310    ///     node.delete_publisher(publisher);
311    ///     assert!(true);
312    /// }
313    /// ```
314    pub fn delete_publisher<D: 'static>(&mut self, _publisher: Publisher<D, T>) {
315        let n = self.node.lock().unwrap();
316        n.delete_publisher(_publisher)
317    }
318
319    #[inline]
320    #[deprecated(since = "0.3.3", note = "This function will be removed in crosstalk v1.0. Dropping the subscriber will achieve the same result")]
321    /// Deletes a subscriber
322    /// 
323    /// # Arguments
324    /// 
325    /// * `subscriber` - the subscriber to delete
326    /// 
327    /// # Examples
328    /// 
329    /// ```
330    /// use crosstalk::AsTopic;
331    ///
332    /// #[derive(AsTopic)]
333    /// enum House {
334    ///     Bedroom,
335    ///     LivingRoom,
336    ///     Kitchen,
337    ///     Bathroom,
338    /// }
339    ///
340    /// crosstalk::init! {
341    ///     House::Bedroom => String,
342    ///     House::LivingRoom => String,
343    ///     House::Kitchen => Vec<f32>,
344    ///     House::Bathroom => u8,
345    /// }
346    ///
347    /// fn main() {
348    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
349    ///     let subscriber = node.subscriber::<String>(House::Bedroom).unwrap();
350    ///     node.delete_subscriber(subscriber);
351    ///     assert!(true);
352    /// }
353    /// ```
354    pub fn delete_subscriber<D: Clone + Send + 'static>(&mut self, subscriber: Subscriber<D, T>) {
355        let n = self.node.lock().unwrap();
356        n.delete_subscriber(subscriber)
357    }
358}
359
/// The inner node wrapped by [`BoundedNode`]
///
/// The struct itself is declared here; its [`CrosstalkPubSub`]
/// implementation is the part generated by the
/// [`crosstalk_macros::init!`] macro.
///
/// # Attributes
///
/// * `senders` - type-erased entries keyed by topic (presumably the
///   per-topic broadcast senders inserted by the macro-generated code —
///   TODO confirm against the macro)
/// * `size` - the size of each buffer
pub struct ImplementedBoundedNode<T> {
    pub senders: HashMap<T, Box<dyn Any + 'static>>,
    pub size: usize,
}
373
/// [`ImplementedBoundedNode`] implementation of [`Send`]
// SAFETY: NOTE(review): this impl is unconditional. Neither the key type `T`
// nor the values behind `Box<dyn Any>` in `senders` are constrained to be
// `Send` here, so soundness relies on what the macro-generated code stores in
// the map and on the topic type used — TODO confirm.
unsafe impl<T> Send for ImplementedBoundedNode<T> {}
/// [`ImplementedBoundedNode`] implementation of [`Sync`]
// SAFETY: NOTE(review): same caveat as the `Send` impl above — nothing here
// requires `T` or the boxed values to be `Sync`; TODO confirm the invariant.
unsafe impl<T> Sync for ImplementedBoundedNode<T> {}
378
379/// [`ImplementedBoundedNode`] implementation 
380impl<T> ImplementedBoundedNode<T>
381where
382    T: CrosstalkTopic,
383{
384    /// See [`BoundedNode::new`]
385    /// 
386    /// # Arguments
387    /// 
388    /// * `size` - the size of each buffer
389    pub fn new(size: usize) -> Self {
390        Self {
391            senders: HashMap::new(),
392            size: size,
393        }
394    }
395}
396
397#[derive(Clone)]
398/// A `crosstalk` [`Publisher`]
399/// 
400/// # Attributes
401/// 
402/// * `topic` - the topic of the publisher
403/// * `buf` - the buffer which broadcasts the data
404/// 
405/// # Type Parameters
406/// 
407/// * `T` - the topic of the publisher
408/// * `D` - the data type of the publisher
409/// 
410/// This is not meant to be used directly, please
411/// use the [`crosstalk_macros::init!`] macro instead
412/// and produce a [`Publisher`] with [`BoundedNode::publisher`]
413/// or [`BoundedNode::pubsub`]
pub struct Publisher<D, T> {
    /// The topic this publisher was created for
    pub topic: T,
    /// Sending half of the broadcast channel that [`Publisher::write`] sends on
    buf: tokio::sync::broadcast::Sender<D>,
}
418/// Implements [`Publisher`]
419impl<D, T> Publisher<D, T> {
420    #[inline]
421    /// See [`BoundedNode::publisher`]
422    pub fn new(buf: tokio::sync::broadcast::Sender<D>, topic: T) -> Self {
423        Self { buf, topic }
424    }
425
426    #[inline]
427    /// Publishes data to a topic, broadcasting it to all subscribers
428    /// 
429    /// # Arguments
430    /// 
431    /// * `sample` - the sample to publish
432    /// 
433    /// # Examples
434    /// 
435    /// ```
436    /// use crosstalk::AsTopic;
437    /// 
438    /// #[derive(AsTopic)]
439    /// enum House {
440    ///     Bedroom,
441    ///     LivingRoom,
442    ///     Kitchen,
443    ///     Bathroom,
444    /// }
445    /// 
446    /// crosstalk::init! {
447    ///     House::Bedroom => String,
448    ///     House::LivingRoom => String,
449    ///     House::Kitchen => Vec<f32>,
450    ///     House::Bathroom => u8,
451    /// }
452    /// 
453    /// fn main() {
454    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
455    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
456    ///     publisher.write("hello".to_string());
457    ///     std::thread::spawn(move || {
458    ///         assert_eq!(subscriber.try_read().unwrap(), "hello");
459    ///     });
460    /// }
461    /// ```
462    pub fn write(&self, sample: D) {
463        let _ = self.buf.send(sample);
464    }
465}
466
467/// A `crosstalk` [`Subscriber`]
468/// 
469/// # Attributes
470/// 
471/// * `topic` - the topic of the subscriber
472/// * `rcvr` - the receiver of the subscriber
473/// * `sndr` - the sender for the topic. This is used to spawn multiple receivers upon [`Subscriber::clone`]
474/// 
475/// # Type Parameters
476/// 
477/// * `T` - the topic of the subscriber
478/// * `D` - the data type of the subscriber
479/// 
480/// This is not meant to be used directly, please
481/// use the [`crosstalk_macros::init!`] macro instead
482/// and produce a [`Subscriber`] with [`BoundedNode::subscriber`]
483/// or [`BoundedNode::pubsub`]
pub struct Subscriber<D, T> {
    /// The topic this subscriber was created for
    pub topic: T,
    /// Wrapper around the broadcast receiver that all `read*` methods delegate to
    rcvr: Receiver<D>,
    /// Shared sender for the topic; used to subscribe a fresh receiver in
    /// [`Subscriber::new`] (when none is supplied) and in [`Subscriber::clone`]
    sndr: Arc<tokio::sync::broadcast::Sender<D>>,
}
489/// [`Subscriber`] implementation 
490impl<D: Clone, T: Clone> Subscriber<D, T> {
491    #[inline]
492    /// See [`BoundedNode::subscriber`]
493    pub fn new(
494        topic: T,
495        rcvr: Option<tokio::sync::broadcast::Receiver<D>>,
496        sndr: Arc<tokio::sync::broadcast::Sender<D>>,
497    ) -> Self {
498        Self {
499            topic: topic,
500            rcvr: Receiver::new(rcvr.unwrap_or(sndr.subscribe())),
501            sndr: sndr.clone(),
502        }
503    }
504
505    #[inline]
506    /// Clones a [`Subscriber`]
507    /// 
508    /// # Examples
509    /// 
510    /// ```
511    /// use crosstalk::AsTopic;
512    /// 
513    /// #[derive(AsTopic)]
514    /// enum House {
515    ///     Bedroom,
516    ///     LivingRoom,
517    ///     Kitchen,
518    ///     Bathroom,
519    /// }
520    /// 
521    /// crosstalk::init! {
522    ///     House::Bedroom => String,
523    ///     House::LivingRoom => String,
524    ///     House::Kitchen => Vec<f32>,
525    ///     House::Bathroom => u8,
526    /// }
527    /// 
528    /// fn main() {
529    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
530    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
531    ///     let mut subscriber_2 = subscriber.clone();
532    ///     publisher.write("hello".to_string());
533    ///     assert_eq!(subscriber.try_read().unwrap(), "hello");
534    ///     assert_eq!(subscriber_2.try_read().unwrap(), "hello");
535    /// }
536    /// ```
537    pub fn clone(&self) -> Self {
538        Self {
539            topic: self.topic.clone(),
540            rcvr: Receiver::new(self.sndr.subscribe()),
541            sndr: self.sndr.clone(),
542        }
543    }
544
545    /// Asynchronous blocking read from the [`tokio::sync::broadcast::Receiver`]
546    /// 
547    /// The sequential equivalent to this function is [`Subscriber::read_blocking`]
548    /// which can be used outside of an asynchronous context
549    /// 
550    /// # Examples
551    /// 
552    /// ```
553    /// use crosstalk::AsTopic;
554    /// 
555    /// #[derive(AsTopic)]
556    /// enum House {
557    ///     Bedroom,
558    ///     LivingRoom,
559    ///     Kitchen,
560    ///     Bathroom,
561    /// }
562    /// 
563    /// crosstalk::init! {
564    ///     House::Bedroom => String,
565    ///     House::LivingRoom => String,
566    ///     House::Kitchen => Vec<f32>,
567    ///     House::Bathroom => u8,
568    /// }
569    /// 
570    /// #[tokio::main]
571    /// async fn main() {
572    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
573    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
574    ///     publisher.write("hello".to_string());
575    ///     assert_eq!(subscriber.read().await, Some("hello".to_string()));
576    /// }
577    /// ```
578    pub async fn read(&mut self) -> Option<D> {
579        self.rcvr.read().await
580    }
581    
582    #[inline]
583    /// Non-blocking read from [`tokio::sync::broadcast::Receiver`]
584    /// Upon immediate failure, [`None`] will be returned
585    /// 
586    /// Difference between this function and [`Subscriber::try_read_raw`]
587    /// is that this function continuously loops upon [`tokio`] error of
588    /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`], looping
589    /// until a valid message is received OR the buffer is determined to be
590    /// empty
591    /// 
592    /// [`Subscriber::try_read_raw`] will return [`None`] upon
593    /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`], which
594    /// can cause some unexpected behavior
595    /// 
596    /// # Examples
597    /// 
598    /// ```
599    /// use crosstalk::AsTopic;
600    /// 
601    /// #[derive(AsTopic)]
602    /// enum House {
603    ///     Bedroom,
604    ///     LivingRoom,
605    ///     Kitchen,
606    ///     Bathroom,
607    /// }
608    /// 
609    /// crosstalk::init! {
610    ///     House::Bedroom => String,
611    ///     House::LivingRoom => String,
612    ///     House::Kitchen => Vec<f32>,
613    ///     House::Bathroom => u8,
614    /// }
615    /// 
616    /// fn main() {
617    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
618    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
619    ///     publisher.write("hello".to_string());
620    ///     assert_eq!(subscriber.try_read(), Some(String::from("hello")));
621    ///     assert_eq!(subscriber.try_read(), None);
622    /// }
623    /// ```
624    pub fn try_read(&mut self) -> Option<D> {
625        self.rcvr.try_read()
626    }
627
628    #[inline]
629    /// Non-blocking read from [`tokio::sync::broadcast::Receiver`], returning
630    /// [`None`] if there are no messages available or if 
631    /// [`tokio::sync::broadcast::error::TryRecvError::Lagged`] occurs.
632    /// 
633    /// This function can cause some unexpected behavior. It is recommended
634    /// to use [`Subscriber::try_read`] instead.
635    /// 
636    /// # Examples
637    /// 
638    /// ```
639    /// use crosstalk::AsTopic;
640    /// 
641    /// #[derive(AsTopic)]
642    /// enum House {
643    ///     Bedroom,
644    ///     LivingRoom,
645    ///     Kitchen,
646    ///     Bathroom,
647    /// }
648    /// 
649    /// crosstalk::init! {
650    ///     House::Bedroom => String,
651    ///     House::LivingRoom => String,
652    ///     House::Kitchen => Vec<f32>,
653    ///     House::Bathroom => u8,
654    /// }
655    /// 
656    /// fn main() {
657    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
658    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
659    ///     publisher.write("hello".to_string());
660    ///     assert_eq!(subscriber.try_read_raw(), Some(String::from("hello")));
661    ///     assert_eq!(subscriber.try_read_raw(), None);
662    /// }
663    /// ```
664    pub fn try_read_raw(&mut self) -> Option<D> {
665        self.rcvr.try_read_raw()
666    }
667    
668    #[inline]
669    /// Sequential blocking read from [`tokio::sync::broadcast::Receiver`]
670    /// 
671    /// The asynchronous equivalent to this function is [`Subscriber::read`], 
672    /// which must be used in an asynchronous context with `.await`
673    /// 
674    /// # Examples
675    /// 
676    /// ```
677    /// use crosstalk::AsTopic;
678    /// 
679    /// #[derive(AsTopic)]
680    /// enum House {
681    ///     Bedroom,
682    ///     LivingRoom,
683    ///     Kitchen,
684    ///     Bathroom,
685    /// }
686    /// 
687    /// crosstalk::init! {
688    ///     House::Bedroom => String,
689    ///     House::LivingRoom => String,
690    ///     House::Kitchen => Vec<f32>,
691    ///     House::Bathroom => u8,
692    /// }
693    /// 
694    /// fn main() {
695    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
696    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
697    ///     publisher.write("hello".to_string());
698    ///     assert_eq!(subscriber.read_blocking(), Some("hello".to_string()));
699    /// }
700    /// ```
701    pub fn read_blocking(&mut self) -> Option<D> {
702        self.rcvr.read_blocking()
703    }
704    
705    #[inline]
706    /// Asynchronous non-blocking read from [`tokio::sync::broadcast::Receiver`]
707    /// with a given timeout. After the timeout if there are no messages,
708    /// returns [`None`].
709    /// 
710    /// # Examples
711    /// 
712    /// ```
713    /// use crosstalk::AsTopic;
714    /// 
715    /// #[derive(AsTopic)]
716    /// enum House {
717    ///     Bedroom,
718    ///     LivingRoom,
719    ///     Kitchen,
720    ///     Bathroom,
721    /// }
722    /// 
723    /// crosstalk::init! {
724    ///     House::Bedroom => String,
725    ///     House::LivingRoom => String,
726    ///     House::Kitchen => Vec<f32>,
727    ///     House::Bathroom => u8,
728    /// }
729    /// 
730    /// #[tokio::main]
731    /// async fn main() {
732    ///     let mut node = crosstalk::BoundedNode::<House>::new(10);
733    ///     let (publisher, mut subscriber) = node.pubsub(House::Bedroom).unwrap();
734    ///     publisher.write("hello".to_string());
735    ///     assert_eq!(subscriber.read_timeout(std::time::Duration::from_millis(100)).await, Some("hello".to_string()));
736    /// }
737    /// ```
738    pub async fn read_timeout(&mut self, timeout: std::time::Duration) -> Option<D> {
739        self.rcvr.read_timeout(timeout).await
740    }
741}
742
743
744/// Receiver
745/// 
746/// Define a receiver for subscribing messages
747/// 
748/// Reads from [`tokio::sync::broadcast::Receiver`]
pub struct Receiver<D> {
    /// The underlying broadcast receiver every `read*` method pulls from
    buf: tokio::sync::broadcast::Receiver<D>,
}
752/// [`Receiver`] implementation
753impl<D> Receiver<D>
754where
755    D: Clone
756{
757    #[inline]
758    pub fn new(
759        buf: tokio::sync::broadcast::Receiver<D>,
760    ) -> Self {
761        Self { buf }
762    }
763
764    #[inline]
765    /// Reads from [`tokio::sync::broadcast::Receiver`]
766    /// 
767    /// This struct/function is not meant to be used directly,
768    /// rather through the [`Subscriber`] struct with [`Subscriber::read`]
769    pub async fn read(&mut self) -> Option<D> {
770        loop {
771            match self.buf.recv().await {
772                Ok(res) => return Some(res),
773                Err(e) => match e {
774                    tokio::sync::broadcast::error::RecvError::Lagged(_) => { continue; }
775
776                    #[cfg(not(any(feature = "log", feature = "tracing")))]
777                    _ => return None,
778
779                    #[cfg(any(feature = "log", feature = "tracing"))]
780                    _ => {
781                        #[cfg(feature = "log")]
782                        log::error!("{}", e);
783                        #[cfg(feature = "tracing")]
784                        tracing::error!("{}", e);
785                        return None
786                    }
787                }
788            }
789        }
790    }    
791
792    #[inline]
793    /// Reads from [`tokio::sync::broadcast::Receiver`]
794    /// 
795    /// This struct/function is not meant to be used directly,
796    /// rather through the [`Subscriber`] struct with [`Subscriber::try_read`]
797    pub fn try_read(&mut self) -> Option<D> {
798        loop {
799            match self.buf.try_recv() {
800                Ok(d) => return Some(d),
801                Err(e) => {
802                    match e {
803                        tokio::sync::broadcast::error::TryRecvError::Lagged(_) => { continue; },
804
805                        #[cfg(not(any(feature = "log", feature = "tracing")))]
806                        _ => return None,
807
808                        #[cfg(any(feature = "log", feature = "tracing"))]
809                        _ => {
810                            #[cfg(feature = "log")]
811                            log::error!("{}", e);
812                            #[cfg(feature = "tracing")]
813                            tracing::error!("{}", e);
814                            return None
815                        },
816                    }
817                },
818            }
819        } 
820    }
821    
822    #[inline]
823    /// Reads from [`tokio::sync::broadcast::Receiver`]
824    /// 
825    /// This struct/function is not meant to be used directly,
826    /// rather through the [`Subscriber`] struct with [`Subscriber::try_read_raw`]
827    pub fn try_read_raw(&mut self) -> Option<D> {
828        match self.buf.try_recv() {
829            Ok(d) => Some(d),
830
831            #[cfg(not(any(feature = "log", feature = "tracing")))]
832            Err(_) => None,
833            
834            #[cfg(any(feature = "log", feature = "tracing"))]
835            Err(e) => {
836                #[cfg(feature = "log")]
837                log::error!("{}", e);
838                #[cfg(feature = "tracing")]
839                tracing::error!("{}", e);
840                None
841            },
842        }
843    }
844    
845    #[inline]
846    /// Reads from [`tokio::sync::broadcast::Receiver`]
847    /// 
848    /// This struct/function is not meant to be used directly,
849    /// rather through the [`Subscriber`] struct with [`Subscriber::read_blocking`]
850    pub fn read_blocking(&mut self) -> Option<D> {
851        loop {
852            match self.buf.blocking_recv() {
853                Ok(res) => return Some(res),
854                Err(e) => match e {
855                    tokio::sync::broadcast::error::RecvError::Lagged(_) => { continue; }
856                    
857                    #[cfg(not(any(feature = "log", feature = "tracing")))]
858                    _ => return None,
859
860                    #[cfg(any(feature = "log", feature = "tracing"))]
861                    _ => {
862                        #[cfg(feature = "log")]
863                        log::error!("{}", e);
864                        #[cfg(feature = "tracing")]
865                        tracing::error!("{}", e);
866                        return None
867                    },
868                }
869            }
870        }
871    }
872    
873    #[inline]
874    /// Reads from [`tokio::sync::broadcast::Receiver`]
875    /// 
876    /// This struct/function is not meant to be used directly,
877    /// rather through the [`Subscriber`] struct with [`Subscriber::read_timeout`]
878    pub async fn read_timeout(&mut self, timeout: std::time::Duration) -> Option<D> {
879        match tokio::runtime::Handle::try_current() {
880            Ok(_) => {
881                match tokio::time::timeout(timeout, self.buf.recv()).await {
882                    Ok(res) => {
883                        match res {
884                            Ok(res) => Some(res),
885
886                            #[cfg(not(any(feature = "log", feature = "tracing")))]
887                            Err(_) => None,
888                            
889                            #[cfg(any(feature = "log", feature = "tracing"))]
890                            Err(e) => {
891                                #[cfg(feature = "log")]
892                                log::error!("{}", e);
893                                #[cfg(feature = "tracing")]
894                                tracing::error!("{}", e);
895                                None
896                            },
897                        }
898                    },
899
900                    #[cfg(not(any(feature = "log", feature = "tracing")))]
901                    Err(_) => None,
902                    
903                    #[cfg(any(feature = "log", feature = "tracing"))]
904                    Err(e) => {
905                        #[cfg(feature = "log")]
906                        log::error!("{}", e);
907                        #[cfg(feature = "tracing")]
908                        tracing::error!("{}", e);
909                        None
910                    },
911                }
912            },
913
914            #[cfg(not(any(feature = "log", feature = "tracing")))]
915            Err(_) => None,
916            
917            #[cfg(any(feature = "log", feature = "tracing"))]
918            Err(e) => {
919                #[cfg(feature = "log")]
920                log::error!("{}", e);
921                #[cfg(feature = "tracing")]
922                tracing::error!("{}", e);
923                None
924            },
925        }
926    }
927}
928
/// [`crosstalk`](crate) errors
///
/// Each variant carries two strings which [`std::fmt::Display`] renders as
/// the `(cast)` value followed by the `(expected)` value.
#[derive(Debug)]
pub enum Error {
    /// Publisher type mismatch: `(cast, expected)`
    PublisherMismatch(String, String),
    /// Subscriber type mismatch: `(cast, expected)`
    SubscriberMismatch(String, String),
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::PublisherMismatch(cast, expected) => f.write_fmt(format_args!(
                "Publisher type mismatch: {} (cast) != {} (expected)",
                cast, expected
            )),
            Self::SubscriberMismatch(cast, expected) => f.write_fmt(format_args!(
                "Subscriber type mismatch: {} (cast) != {} (expected)",
                cast, expected
            )),
        }
    }
}

impl std::error::Error for Error {}
944
/// Marker trait for types that can be used as a `crosstalk` topic
///
/// It has no methods; it only bundles the bounds a topic must satisfy:
/// copyable ([`Copy`] + [`Clone`]), comparable ([`Eq`] + [`PartialEq`])
/// and hashable ([`Hash`]).
pub trait CrosstalkTopic: Copy + Clone + Eq + PartialEq + Hash {}
/// The publisher/subscriber factory interface of a node
///
/// This trait is implemented for the [`ImplementedBoundedNode`] struct
/// by the [`crosstalk_macros::init!`] macro.
///
/// This is not meant to be used directly, and is automatically
/// implemented when calling [`crosstalk_macros::init!`]
///
/// # Type Parameters
///
/// * `T` - the topic type
pub trait CrosstalkPubSub<T> {
    /// Creates a [`Publisher`] of data type `D` for `topic`, or an error
    fn publisher<D: 'static>(&mut self, topic: T) -> Result<Publisher<D, T>, Box<dyn std::error::Error>>;
    /// Creates a [`Subscriber`] of data type `D` for `topic`, or an error
    fn subscriber<D: Clone + Send + 'static>(&mut self, topic: T) -> Result<Subscriber<D, T>, Box<dyn std::error::Error>>;
    /// Creates a [`Publisher`] / [`Subscriber`] pair of data type `D` for `topic`, or an error
    fn pubsub<D: Clone + Send + 'static>(&mut self, topic: T) -> Result<(Publisher<D, T>, Subscriber<D, T>), Box<dyn std::error::Error>>;
    /// Consumes a [`Publisher`]; deprecated in favour of simply dropping it
    #[deprecated(since = "0.3.3", note = "This function will be removed in crosstalk v1.0. Dropping the publisher will achieve the same result")]
    fn delete_publisher<D: 'static>(&self, publisher: Publisher<D, T>);
    /// Consumes a [`Subscriber`]; deprecated in favour of simply dropping it
    #[deprecated(since = "0.3.3", note = "This function will be removed in crosstalk v1.0. Dropping the subscriber will achieve the same result")]
    fn delete_subscriber<D: Clone + Send + 'static>(&self, subscriber: Subscriber<D, T>);
}
964
#[inline]
/// Downcasts a type-erased [`Box`] into a value of type `T`
///
/// # Arguments
///
/// * `buf` - the [`Box`] to downcast
///
/// # Errors
///
/// If the boxed value is not a `T`, the original box is handed back
/// unchanged in the `Err` variant.
///
/// # Examples
///
/// ```
/// use crosstalk::downcast;
///
/// let buf = Box::new(5) as Box<dyn std::any::Any + 'static>;
/// assert_eq!(downcast::<i32>(buf).unwrap(), 5);
///
/// let buf = Box::new("five") as Box<dyn std::any::Any + 'static>;
/// assert!(downcast::<i32>(buf).is_err());
/// ```
pub fn downcast<T>(buf: Box<dyn Any + 'static>) -> Result<T, Box<dyn Any>>
where
    T: 'static,
{
    // `?` forwards the untouched box on a type mismatch; on success the
    // `Box<T>` is unboxed by the dereference.
    Ok(*buf.downcast::<T>()?)
}