Skip to main content

motorcortex_rust/client/
subscribe.rs

1use crate::client::subscription::ReadOnlySubscription;
2use crate::client::{receive_message, Parameters, Subscription};
3use crate::connection::{ConnectionManager, ConnectionOptions};
4use crate::{Request, StatusCode};
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::{Arc, RwLock};
8use std::thread;
9
10pub struct Subscribe {
11    connection_data: ConnectionManager,
12    receive_thread: Option<thread::JoinHandle<()>>,
13    stop_signal: Arc<AtomicBool>,
14    active_subscriptions: Arc<RwLock<HashMap<u32, Arc<RwLock<Subscription>>>>>,
15}
16
17impl Subscribe {
18    const ID_BYTE_SIZE: usize = 3;
19    /// Deleted default constructor equivalent in Rust
20    /// This prevents instantiation without arguments, as Rust does not support `= delete`.
21    pub fn new() -> Self {
22        let sock = unsafe {
23            let mut sock: nng_c_sys::nng_socket = std::mem::zeroed();
24            nng_c_sys::nng_sub0_open(&mut sock);
25            sock
26        };
27        Self {
28            connection_data: ConnectionManager::new(sock),
29            receive_thread: None,
30            stop_signal: Arc::new(AtomicBool::new(false)),
31            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
32        }
33    }
34
35    /// Establishes a connection to a specified server URL using the given configuration options.
36    ///
37    /// # Arguments:
38    /// * `url` - The server's address (e.g., "tcp://127.0.0.1:5555").
39    /// * `connection_options` - The connection settings, including TLS certificate and timeouts.
40    ///
41    /// # Returns:
42    /// * `Ok(())` if the connection is successful.
43    /// * `Err(String)` with the error message if the operation fails.
44    pub fn connect(
45        &mut self,
46        url: &str,
47        connection_options: ConnectionOptions,
48    ) -> Result<(), String> {
49        let res = self.connection_data.connect(url, connection_options);
50        let stop_signal_clone = Arc::clone(&self.stop_signal);
51        let active_subscriptions_clone = Arc::clone(&self.active_subscriptions);
52
53        let sock = self.connection_data.sock.unwrap();
54        self.receive_thread = Some(thread::spawn(move || {
55            const HEADER_LEN: usize = 4usize;
56            loop {
57                if stop_signal_clone.load(Ordering::Relaxed) {
58                    println!("Receive thread stopping...");
59                    break;
60                }
61
62                let buffer = match receive_message(&sock) {
63                    Ok(buffer) => buffer,
64                    Err(_) => continue,
65                };
66
67                if buffer.len() > HEADER_LEN {
68                    let id =
69                        (buffer[0] as u32) | ((buffer[1] as u32) << 8) | ((buffer[2] as u32) << 16);
70                    let subscriptions = active_subscriptions_clone.write().unwrap();
71                    if let Some(sub) = subscriptions.get(&id) {
72                        let mut sub = sub.write().unwrap();
73                        sub.update(buffer);
74                    }
75                }
76            }
77        }));
78
79        res
80    }
81
82    /// Disconnects the current connection and frees the associated resources.
83    ///
84    /// # Returns:
85    /// * `Ok(())` if the disconnection is successful.
86    /// * `Err(String)` with the error message if the operation fails.
87    pub fn disconnect(&mut self) -> Result<(), String> {
88        self.stop_signal.store(true, Ordering::Relaxed);
89        let res = self.connection_data.disconnect();
90        self.receive_thread.take().unwrap().join().unwrap();
91        res
92    }
93
94    pub fn unsubscribe(&mut self, request: &Request, id: u32) -> Result<(), String> {
95        // remove from active subscriptions
96        let sub = self.active_subscriptions.write().unwrap().remove(&id);
97        if sub.is_some() {
98            // remove subscription hash from nng
99            let bytes = id.to_le_bytes();
100            let _rv = unsafe {
101                nng_c_sys::nng_setopt(
102                    self.connection_data.sock.unwrap(),
103                    nng_c_sys::NNG_OPT_SUB_UNSUBSCRIBE.as_ptr() as *const core::ffi::c_char,
104                    bytes.as_ptr() as *const std::ffi::c_void,
105                    Subscribe::ID_BYTE_SIZE,
106                )
107            };
108
109            // send remove group message
110            let sub = sub.unwrap();
111            let name = sub.read().unwrap().name();
112            request.remove_group(&name).expect("TODO: panic message");
113        }
114        Ok(())
115    }
116
117    pub fn subscribe<I>(
118        &mut self,
119        request: &Request,
120        paths: I,
121        group_name: &str,
122        frequency_divider: u32,
123    ) -> Result<ReadOnlySubscription, String>
124    where
125        I: Parameters,
126    {
127        let sub = request.create_group(paths, group_name, frequency_divider)?;
128
129        if sub.status != StatusCode::Ok as i32 {
130            return Err(format!(
131                "Failed to create group. Error code: {}",
132                sub.status
133            ));
134        }
135
136        let id = sub.id;
137        let bytes = id.to_le_bytes();
138        let rv = unsafe {
139            nng_c_sys::nng_setopt(
140                self.connection_data.sock.unwrap(),
141                nng_c_sys::NNG_OPT_SUB_SUBSCRIBE.as_ptr() as *const core::ffi::c_char,
142                bytes.as_ptr() as *const std::ffi::c_void,
143                Subscribe::ID_BYTE_SIZE,
144            )
145        };
146        if rv != 0 {
147            return Err(format!(
148                "Failed to subscribe to the specified paths. Error code: {}",
149                rv
150            ));
151        }
152
153        // Create the subscription and store it in the HashMap
154        let subscription = Arc::new(RwLock::new(Subscription::new(sub)));
155        let mut subscriptions = self.active_subscriptions.write().unwrap();
156        subscriptions.insert(id, subscription.clone());
157
158        // Retrieve a read lock from the subscription stored in the map
159        Ok(ReadOnlySubscription::new(subscription))
160    }
161}