motorcortex_rust/client/
subscribe.rs1use crate::client::subscription::ReadOnlySubscription;
2use crate::client::{receive_message, Parameters, Subscription};
3use crate::connection::{ConnectionManager, ConnectionOptions};
4use crate::{Request, StatusCode};
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, RwLock};
8use std::thread;
9
10pub struct Subscribe {
11 connection_data: ConnectionManager,
12 receive_thread: Option<thread::JoinHandle<()>>,
13 stop_signal: Arc<AtomicBool>,
14 active_subscriptions: Arc<RwLock<HashMap<u32, Arc<RwLock<Subscription>>>>>,
15}
16
17impl Subscribe {
18 const ID_BYTE_SIZE: usize = 3;
19 pub fn new() -> Self {
22 let sock = unsafe {
23 let mut sock: nng_c_sys::nng_socket = std::mem::zeroed();
24 nng_c_sys::nng_sub0_open(&mut sock);
25 sock
26 };
27 Self {
28 connection_data: ConnectionManager::new(sock),
29 receive_thread: None,
30 stop_signal: Arc::new(AtomicBool::new(false)),
31 active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
32 }
33 }
34
35 pub fn connect(
45 &mut self,
46 url: &str,
47 connection_options: ConnectionOptions,
48 ) -> Result<(), String> {
49 let res = self.connection_data.connect(url, connection_options);
50 let stop_signal_clone = Arc::clone(&self.stop_signal);
51 let active_subscriptions_clone = Arc::clone(&self.active_subscriptions);
52
53 let sock = self.connection_data.sock.unwrap();
54 self.receive_thread = Some(thread::spawn(move || {
55 const HEADER_LEN: usize = 4usize;
56 loop {
57 if stop_signal_clone.load(Ordering::Relaxed) {
58 println!("Receive thread stopping...");
59 break;
60 }
61
62 let buffer = match receive_message(&sock) {
63 Ok(buffer) => buffer,
64 Err(_) => continue,
65 };
66
67 if buffer.len() > HEADER_LEN {
68 let id =
69 (buffer[0] as u32) | ((buffer[1] as u32) << 8) | ((buffer[2] as u32) << 16);
70 let subscriptions = active_subscriptions_clone.write().unwrap();
71 if let Some(sub) = subscriptions.get(&id) {
72 let mut sub = sub.write().unwrap();
73 sub.update(buffer);
74 }
75 }
76 }
77 }));
78
79 res
80 }
81
82 pub fn disconnect(&mut self) -> Result<(), String> {
88 self.stop_signal.store(true, Ordering::Relaxed);
89 let res = self.connection_data.disconnect();
90 self.receive_thread.take().unwrap().join().unwrap();
91 res
92 }
93
94 pub fn unsubscribe(&mut self, request: &Request, id: u32) -> Result<(), String> {
95 let sub = self.active_subscriptions.write().unwrap().remove(&id);
97 if sub.is_some() {
98 let bytes = id.to_le_bytes();
100 let _rv = unsafe {
101 nng_c_sys::nng_setopt(
102 self.connection_data.sock.unwrap(),
103 nng_c_sys::NNG_OPT_SUB_UNSUBSCRIBE.as_ptr() as *const core::ffi::c_char,
104 bytes.as_ptr() as *const std::ffi::c_void,
105 Subscribe::ID_BYTE_SIZE,
106 )
107 };
108
109 let sub = sub.unwrap();
111 let name = sub.read().unwrap().name();
112 request.remove_group(&name).expect("TODO: panic message");
113 }
114 Ok(())
115 }
116
117 pub fn subscribe<I>(
118 &mut self,
119 request: &Request,
120 paths: I,
121 group_name: &str,
122 frequency_divider: u32,
123 ) -> Result<ReadOnlySubscription, String>
124 where
125 I: Parameters,
126 {
127 let sub = request.create_group(paths, group_name, frequency_divider)?;
128
129 if sub.status != StatusCode::Ok as i32 {
130 return Err(format!(
131 "Failed to create group. Error code: {}",
132 sub.status
133 ));
134 }
135
136 let id = sub.id;
137 let bytes = id.to_le_bytes();
138 let rv = unsafe {
139 nng_c_sys::nng_setopt(
140 self.connection_data.sock.unwrap(),
141 nng_c_sys::NNG_OPT_SUB_SUBSCRIBE.as_ptr() as *const core::ffi::c_char,
142 bytes.as_ptr() as *const std::ffi::c_void,
143 Subscribe::ID_BYTE_SIZE,
144 )
145 };
146 if rv != 0 {
147 return Err(format!(
148 "Failed to subscribe to the specified paths. Error code: {}",
149 rv
150 ));
151 }
152
153 let subscription = Arc::new(RwLock::new(Subscription::new(sub)));
155 let mut subscriptions = self.active_subscriptions.write().unwrap();
156 subscriptions.insert(id, subscription.clone());
157
158 Ok(ReadOnlySubscription::new(subscription))
160 }
161}