agentic_robotics_core/
subscriber.rs

1//! Subscriber implementation
2
3use crate::error::{Error, Result};
4use crate::message::Message;
5use crate::serialization::deserialize_cdr;
6use crossbeam::channel::{self, Receiver, Sender};
7use std::sync::Arc;
8use tracing::debug;
9
10/// Subscriber for receiving messages
11pub struct Subscriber<T: Message> {
12    topic: String,
13    receiver: Receiver<T>,
14    _sender: Arc<Sender<T>>, // Keep sender alive
15}
16
17impl<T: Message> Subscriber<T> {
18    /// Create a new subscriber
19    pub fn new(topic: impl Into<String>) -> Self {
20        let topic = topic.into();
21        debug!("Creating subscriber for topic: {}", topic);
22
23        let (sender, receiver) = channel::unbounded();
24
25        Self {
26            topic,
27            receiver,
28            _sender: Arc::new(sender),
29        }
30    }
31
32    /// Receive a message (blocking)
33    pub fn recv(&self) -> Result<T> {
34        self.receiver
35            .recv()
36            .map_err(|e| Error::Other(e.into()))
37    }
38
39    /// Try to receive a message (non-blocking)
40    pub fn try_recv(&self) -> Result<Option<T>> {
41        match self.receiver.try_recv() {
42            Ok(msg) => Ok(Some(msg)),
43            Err(crossbeam::channel::TryRecvError::Empty) => Ok(None),
44            Err(e) => Err(Error::Other(e.into())),
45        }
46    }
47
48    /// Receive a message asynchronously
49    pub async fn recv_async(&self) -> Result<T> {
50        let receiver = self.receiver.clone();
51        tokio::task::spawn_blocking(move || {
52            receiver.recv()
53        })
54        .await
55        .map_err(|e| Error::Other(e.into()))?
56        .map_err(|e| Error::Other(e.into()))
57    }
58
59    /// Get topic name
60    pub fn topic(&self) -> &str {
61        &self.topic
62    }
63}
64
65impl<T: Message> Clone for Subscriber<T> {
66    fn clone(&self) -> Self {
67        Self {
68            topic: self.topic.clone(),
69            receiver: self.receiver.clone(),
70            _sender: self._sender.clone(),
71        }
72    }
73}
74
75#[cfg(test)]
76mod tests {
77    use super::*;
78    use crate::message::RobotState;
79
80    #[test]
81    fn test_subscriber_creation() {
82        let subscriber = Subscriber::<RobotState>::new("robot/state");
83        assert_eq!(subscriber.topic(), "robot/state");
84    }
85
86    #[test]
87    fn test_subscriber_try_recv() {
88        let subscriber = Subscriber::<RobotState>::new("robot/state");
89        let result = subscriber.try_recv().unwrap();
90        assert!(result.is_none());
91    }
92}