Skip to main content

motorcortex_rust/client/
subscribe.rs

1use crate::client::subscription::ReadOnlySubscription;
2use crate::client::{receive_message, Parameters, Subscription};
3use crate::connection::{ConnectionManager, ConnectionOptions};
4use crate::error::{MotorcortexError, Result};
5use crate::{Request, StatusCode};
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, RwLock};
9use std::thread;
10
11/// Real-time streaming client that receives parameter updates from the server.
12///
13/// # Thread Safety
14///
15/// `Subscribe` is `Send` but not `Sync`. You can move it to another thread,
16/// but you cannot share it between threads by reference. The internal receive
17/// thread is managed automatically — no external synchronization is needed
18/// for subscription reads via `ReadOnlySubscription`, which is independently
19/// thread-safe.
20pub struct Subscribe {
21    connection_data: ConnectionManager,
22    receive_thread: Option<thread::JoinHandle<()>>,
23    stop_signal: Arc<AtomicBool>,
24    active_subscriptions: Arc<RwLock<HashMap<u32, Arc<RwLock<Subscription>>>>>,
25}
26
27impl Subscribe {
28    const ID_BYTE_SIZE: usize = 3;
29
30    /// Creates a new `Subscribe` instance.
31    ///
32    /// No socket is opened until `connect()` is called.
33    pub fn new() -> Self {
34        Self {
35            connection_data: ConnectionManager::new(),
36            receive_thread: None,
37            stop_signal: Arc::new(AtomicBool::new(false)),
38            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
39        }
40    }
41
42    /// Establishes a connection to a specified server URL using the given configuration options.
43    ///
44    /// Opens a SUB socket, connects to the server, and spawns a background
45    /// receive thread for processing incoming subscription updates.
46    ///
47    /// # Arguments:
48    /// * `url` - The server's address (e.g., "wss://127.0.0.1:5569").
49    /// * `connection_options` - The connection settings, including TLS certificate and timeouts.
50    ///
51    /// # Errors:
52    /// Returns `MotorcortexError::Connection` if the socket cannot be opened or the connection fails.
53    pub fn connect(&mut self, url: &str, connection_options: ConnectionOptions) -> Result<()> {
54        let res = self
55            .connection_data
56            .connect(url, connection_options, nng_c_sys::nng_sub0_open);
57        let stop_signal_clone = Arc::clone(&self.stop_signal);
58        let active_subscriptions_clone = Arc::clone(&self.active_subscriptions);
59
60        let sock = self.connection_data.sock.unwrap();
61        self.receive_thread = Some(thread::spawn(move || {
62            const HEADER_LEN: usize = 4usize;
63            loop {
64                if stop_signal_clone.load(Ordering::Relaxed) {
65                    println!("Receive thread stopping...");
66                    break;
67                }
68
69                let buffer = match receive_message(&sock) {
70                    Ok(buffer) => buffer,
71                    Err(_) => continue,
72                };
73
74                if buffer.len() > HEADER_LEN {
75                    let id =
76                        (buffer[0] as u32) | ((buffer[1] as u32) << 8) | ((buffer[2] as u32) << 16);
77                    let subscriptions = active_subscriptions_clone.write().unwrap();
78                    if let Some(sub) = subscriptions.get(&id) {
79                        let mut sub = sub.write().unwrap();
80                        sub.update(buffer);
81                    }
82                }
83            }
84        }));
85
86        res
87    }
88
89    /// Disconnects the current connection and frees the associated resources.
90    ///
91    /// # Errors:
92    /// Returns `MotorcortexError::Connection` if the disconnection fails.
93    pub fn disconnect(&mut self) -> Result<()> {
94        self.stop_signal.store(true, Ordering::Relaxed);
95        let res = self.connection_data.disconnect();
96        self.receive_thread.take().unwrap().join().unwrap();
97        res
98    }
99
100    /// Removes a subscription and its associated server-side group.
101    ///
102    /// # Arguments:
103    /// * `request` - The Request client used to remove the group on the server.
104    /// * `id` - The subscription ID to remove.
105    ///
106    /// # Errors:
107    /// Returns `MotorcortexError::Subscription` if the NNG unsubscribe fails.
108    pub fn unsubscribe(&mut self, request: &Request, id: u32) -> Result<()> {
109        let sub = self.active_subscriptions.write().unwrap().remove(&id);
110        if let Some(sub) = sub {
111            // Remove subscription hash from nng
112            let bytes = id.to_le_bytes();
113            let rv = unsafe {
114                nng_c_sys::nng_setopt(
115                    self.connection_data.sock.unwrap(),
116                    nng_c_sys::NNG_OPT_SUB_UNSUBSCRIBE.as_ptr() as *const core::ffi::c_char,
117                    bytes.as_ptr() as *const std::ffi::c_void,
118                    Subscribe::ID_BYTE_SIZE,
119                )
120            };
121            if rv != 0 {
122                return Err(MotorcortexError::Subscription(format!(
123                    "Failed to unsubscribe from NNG. Error code: {}",
124                    rv
125                )));
126            }
127
128            // Send remove group message
129            let name = sub.read().unwrap().name();
130            request.remove_group(&name)?;
131        }
132        Ok(())
133    }
134
135    /// Creates a new subscription for the specified parameters.
136    ///
137    /// # Arguments:
138    /// * `request` - The Request client used to create the group on the server.
139    /// * `paths` - The parameter paths to subscribe to.
140    /// * `group_name` - An alias for the subscription group.
141    /// * `frequency_divider` - Update rate divider relative to the server's base frequency.
142    ///
143    /// # Errors:
144    /// Returns `MotorcortexError::Subscription` if the group creation or NNG subscribe fails.
145    pub fn subscribe<I>(
146        &mut self,
147        request: &Request,
148        paths: I,
149        group_name: &str,
150        frequency_divider: u32,
151    ) -> Result<ReadOnlySubscription>
152    where
153        I: Parameters,
154    {
155        let sub = request.create_group(paths, group_name, frequency_divider)?;
156
157        if sub.status != StatusCode::Ok as i32 {
158            return Err(MotorcortexError::Subscription(format!(
159                "Failed to create group, status: {}",
160                sub.status
161            )));
162        }
163
164        let id = sub.id;
165        let bytes = id.to_le_bytes();
166        let rv = unsafe {
167            nng_c_sys::nng_setopt(
168                self.connection_data.sock.unwrap(),
169                nng_c_sys::NNG_OPT_SUB_SUBSCRIBE.as_ptr() as *const core::ffi::c_char,
170                bytes.as_ptr() as *const std::ffi::c_void,
171                Subscribe::ID_BYTE_SIZE,
172            )
173        };
174        if rv != 0 {
175            return Err(MotorcortexError::Subscription(format!(
176                "Failed to subscribe via NNG. Error code: {}",
177                rv
178            )));
179        }
180
181        // Create the subscription and store it in the HashMap
182        let subscription = Arc::new(RwLock::new(Subscription::new(sub)));
183        let mut subscriptions = self.active_subscriptions.write().unwrap();
184        subscriptions.insert(id, subscription.clone());
185
186        Ok(ReadOnlySubscription::new(subscription))
187    }
188}
189
190impl Drop for Subscribe {
191    fn drop(&mut self) {
192        self.stop_signal.store(true, Ordering::Relaxed);
193        if let Some(handle) = self.receive_thread.take() {
194            let _ = handle.join();
195        }
196        // ConnectionManager::drop() handles socket and TLS cleanup
197    }
198}