motorcortex_rust/client/
subscribe.rs1use crate::client::subscription::ReadOnlySubscription;
2use crate::client::{receive_message, Parameters, Subscription};
3use crate::connection::{ConnectionManager, ConnectionOptions};
4use crate::error::{MotorcortexError, Result};
5use crate::{Request, StatusCode};
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, RwLock};
9use std::thread;
10
11pub struct Subscribe {
21 connection_data: ConnectionManager,
22 receive_thread: Option<thread::JoinHandle<()>>,
23 stop_signal: Arc<AtomicBool>,
24 active_subscriptions: Arc<RwLock<HashMap<u32, Arc<RwLock<Subscription>>>>>,
25}
26
27impl Subscribe {
28 const ID_BYTE_SIZE: usize = 3;
29
30 pub fn new() -> Self {
34 Self {
35 connection_data: ConnectionManager::new(),
36 receive_thread: None,
37 stop_signal: Arc::new(AtomicBool::new(false)),
38 active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
39 }
40 }
41
42 pub fn connect(&mut self, url: &str, connection_options: ConnectionOptions) -> Result<()> {
54 let res = self
55 .connection_data
56 .connect(url, connection_options, nng_c_sys::nng_sub0_open);
57 let stop_signal_clone = Arc::clone(&self.stop_signal);
58 let active_subscriptions_clone = Arc::clone(&self.active_subscriptions);
59
60 let sock = self.connection_data.sock.unwrap();
61 self.receive_thread = Some(thread::spawn(move || {
62 const HEADER_LEN: usize = 4usize;
63 loop {
64 if stop_signal_clone.load(Ordering::Relaxed) {
65 println!("Receive thread stopping...");
66 break;
67 }
68
69 let buffer = match receive_message(&sock) {
70 Ok(buffer) => buffer,
71 Err(_) => continue,
72 };
73
74 if buffer.len() > HEADER_LEN {
75 let id =
76 (buffer[0] as u32) | ((buffer[1] as u32) << 8) | ((buffer[2] as u32) << 16);
77 let subscriptions = active_subscriptions_clone.write().unwrap();
78 if let Some(sub) = subscriptions.get(&id) {
79 let mut sub = sub.write().unwrap();
80 sub.update(buffer);
81 }
82 }
83 }
84 }));
85
86 res
87 }
88
89 pub fn disconnect(&mut self) -> Result<()> {
94 self.stop_signal.store(true, Ordering::Relaxed);
95 let res = self.connection_data.disconnect();
96 self.receive_thread.take().unwrap().join().unwrap();
97 res
98 }
99
100 pub fn unsubscribe(&mut self, request: &Request, id: u32) -> Result<()> {
109 let sub = self.active_subscriptions.write().unwrap().remove(&id);
110 if let Some(sub) = sub {
111 let bytes = id.to_le_bytes();
113 let rv = unsafe {
114 nng_c_sys::nng_setopt(
115 self.connection_data.sock.unwrap(),
116 nng_c_sys::NNG_OPT_SUB_UNSUBSCRIBE.as_ptr() as *const core::ffi::c_char,
117 bytes.as_ptr() as *const std::ffi::c_void,
118 Subscribe::ID_BYTE_SIZE,
119 )
120 };
121 if rv != 0 {
122 return Err(MotorcortexError::Subscription(format!(
123 "Failed to unsubscribe from NNG. Error code: {}",
124 rv
125 )));
126 }
127
128 let name = sub.read().unwrap().name();
130 request.remove_group(&name)?;
131 }
132 Ok(())
133 }
134
135 pub fn subscribe<I>(
146 &mut self,
147 request: &Request,
148 paths: I,
149 group_name: &str,
150 frequency_divider: u32,
151 ) -> Result<ReadOnlySubscription>
152 where
153 I: Parameters,
154 {
155 let sub = request.create_group(paths, group_name, frequency_divider)?;
156
157 if sub.status != StatusCode::Ok as i32 {
158 return Err(MotorcortexError::Subscription(format!(
159 "Failed to create group, status: {}",
160 sub.status
161 )));
162 }
163
164 let id = sub.id;
165 let bytes = id.to_le_bytes();
166 let rv = unsafe {
167 nng_c_sys::nng_setopt(
168 self.connection_data.sock.unwrap(),
169 nng_c_sys::NNG_OPT_SUB_SUBSCRIBE.as_ptr() as *const core::ffi::c_char,
170 bytes.as_ptr() as *const std::ffi::c_void,
171 Subscribe::ID_BYTE_SIZE,
172 )
173 };
174 if rv != 0 {
175 return Err(MotorcortexError::Subscription(format!(
176 "Failed to subscribe via NNG. Error code: {}",
177 rv
178 )));
179 }
180
181 let subscription = Arc::new(RwLock::new(Subscription::new(sub)));
183 let mut subscriptions = self.active_subscriptions.write().unwrap();
184 subscriptions.insert(id, subscription.clone());
185
186 Ok(ReadOnlySubscription::new(subscription))
187 }
188}
189
190impl Drop for Subscribe {
191 fn drop(&mut self) {
192 self.stop_signal.store(true, Ordering::Relaxed);
193 if let Some(handle) = self.receive_thread.take() {
194 let _ = handle.join();
195 }
196 }
198}