motorcortex_rust/client/
subscribe.rs1use crate::client::subscription::ReadOnlySubscription;
3use crate::client::{receive_message, Parameters, Subscription};
4use crate::connection::{ConnectionManager, ConnectionOptions};
5use crate::{Request, StatusCode};
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, RwLock};
9use std::thread;
10
/// Client-side subscriber: owns a socket-backed connection, a background
/// receive thread, and the registry of subscriptions that thread dispatches
/// incoming messages to.
pub struct Subscribe<'a> {
    /// Request client used to create subscription groups (`create_group`).
    request: &'a Request,
    /// Connection state built around the socket opened in `new`.
    connection_data: ConnectionManager,
    /// Handle of the background receive thread; `None` until `connect` spawns
    /// it and again after `disconnect` has taken it for joining.
    receive_thread: Option<thread::JoinHandle<()>>,
    /// Flag polled by the receive thread; `disconnect` sets it to request exit.
    stop_signal: Arc<AtomicBool>,
    /// Registry of live subscriptions keyed by group id, shared with the
    /// receive thread, which looks up the target of each incoming message here.
    active_subscriptions: Arc<RwLock<HashMap<u32, Arc<RwLock<Subscription>>>>>,
}
18
19impl<'a> Subscribe<'a> {
20 const ID_BYTE_SIZE: usize = 3;
21 pub fn new(request: &'a Request) -> Self {
24 let sock = unsafe {
25 let mut sock: nng_c_sys::nng_socket = std::mem::zeroed();
26 nng_c_sys::nng_sub0_open(&mut sock);
27 sock
28 };
29 Self {
30 request,
31 connection_data: ConnectionManager::new(sock),
32 receive_thread: None,
33 stop_signal: Arc::new(AtomicBool::new(false)),
34 active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
35 }
36 }
37
38 pub fn connect(
48 &mut self,
49 url: &str,
50 connection_options: ConnectionOptions,
51 ) -> Result<(), String> {
52 let res = self.connection_data.connect(url, connection_options);
53 let stop_signal_clone = Arc::clone(&self.stop_signal);
54 let active_subscriptions_clone = Arc::clone(&self.active_subscriptions);
55
56 let sock = self.connection_data.sock.unwrap();
57 self.receive_thread = Some(thread::spawn(move || {
58 const HEADER_LEN: usize = 4usize;
59 loop {
60 if stop_signal_clone.load(Ordering::Relaxed) {
61 println!("Receive thread stopping...");
62 break;
63 }
64
65 let buffer = match receive_message(&sock) {
66 Ok(buffer) => buffer,
67 Err(_) => continue,
68 };
69
70 if buffer.len() > HEADER_LEN {
71 let id =
72 (buffer[0] as u32) | ((buffer[1] as u32) << 8) | ((buffer[2] as u32) << 16);
73 let subscriptions = active_subscriptions_clone.write().unwrap();
74 if let Some(sub) = subscriptions.get(&id) {
75 let mut sub = sub.write().unwrap();
76 sub.update(buffer);
77 }
78 }
79 }
80 }));
81
82 res
83 }
84
85 pub fn disconnect(&mut self) -> Result<(), String> {
91 self.stop_signal.store(true, Ordering::Relaxed);
92 let res = self.connection_data.disconnect();
93 self.receive_thread.take().unwrap().join().unwrap();
94 res
95 }
96
97 pub fn subscribe<I>(
98 &mut self,
99 paths: I,
100 group_name: &str,
101 frequency_divider: u32,
102 ) -> Result<ReadOnlySubscription, String>
103 where
104 I: Parameters,
105 {
106 let sub = self
107 .request
108 .create_group(paths, group_name, frequency_divider)?;
109
110 if sub.status != StatusCode::Ok as i32 {
111 return Err(format!(
112 "Failed to create group. Error code: {}",
113 sub.status
114 ));
115 }
116
117 let id = sub.id;
118 let bytes = id.to_le_bytes();
119 let rv = unsafe {
120 nng_c_sys::nng_setopt(
121 self.connection_data.sock.unwrap(),
122 nng_c_sys::NNG_OPT_SUB_SUBSCRIBE.as_ptr() as *const i8,
123 bytes.as_ptr() as *const std::ffi::c_void,
124 Subscribe::ID_BYTE_SIZE,
125 )
126 };
127 if rv != 0 {
128 return Err(format!(
129 "Failed to subscribe to the specified paths. Error code: {}",
130 rv
131 ));
132 }
133
134 let subscription = Arc::new(RwLock::new(Subscription::new(sub)));
136 let mut subscriptions = self.active_subscriptions.write().unwrap();
137 subscriptions.insert(id, subscription.clone());
138
139 Ok(ReadOnlySubscription::new(subscription))
141 }
142}