agentic_robotics_core/
subscriber.rs1use crate::error::{Error, Result};
4use crate::message::Message;
5use crate::serialization::deserialize_cdr;
6use crossbeam::channel::{self, Receiver, Sender};
7use std::sync::Arc;
8use tracing::debug;
9
10pub struct Subscriber<T: Message> {
12 topic: String,
13 receiver: Receiver<T>,
14 _sender: Arc<Sender<T>>, }
16
17impl<T: Message> Subscriber<T> {
18 pub fn new(topic: impl Into<String>) -> Self {
20 let topic = topic.into();
21 debug!("Creating subscriber for topic: {}", topic);
22
23 let (sender, receiver) = channel::unbounded();
24
25 Self {
26 topic,
27 receiver,
28 _sender: Arc::new(sender),
29 }
30 }
31
32 pub fn recv(&self) -> Result<T> {
34 self.receiver
35 .recv()
36 .map_err(|e| Error::Other(e.into()))
37 }
38
39 pub fn try_recv(&self) -> Result<Option<T>> {
41 match self.receiver.try_recv() {
42 Ok(msg) => Ok(Some(msg)),
43 Err(crossbeam::channel::TryRecvError::Empty) => Ok(None),
44 Err(e) => Err(Error::Other(e.into())),
45 }
46 }
47
48 pub async fn recv_async(&self) -> Result<T> {
50 let receiver = self.receiver.clone();
51 tokio::task::spawn_blocking(move || {
52 receiver.recv()
53 })
54 .await
55 .map_err(|e| Error::Other(e.into()))?
56 .map_err(|e| Error::Other(e.into()))
57 }
58
59 pub fn topic(&self) -> &str {
61 &self.topic
62 }
63}
64
65impl<T: Message> Clone for Subscriber<T> {
66 fn clone(&self) -> Self {
67 Self {
68 topic: self.topic.clone(),
69 receiver: self.receiver.clone(),
70 _sender: self._sender.clone(),
71 }
72 }
73}
74
75#[cfg(test)]
76mod tests {
77 use super::*;
78 use crate::message::RobotState;
79
80 #[test]
81 fn test_subscriber_creation() {
82 let subscriber = Subscriber::<RobotState>::new("robot/state");
83 assert_eq!(subscriber.topic(), "robot/state");
84 }
85
86 #[test]
87 fn test_subscriber_try_recv() {
88 let subscriber = Subscriber::<RobotState>::new("robot/state");
89 let result = subscriber.try_recv().unwrap();
90 assert!(result.is_none());
91 }
92}