motorcortex_rust/client/
subscribe.rs

1// use crate::client::subscription::ReadOnlySubscription;
2use crate::client::subscription::ReadOnlySubscription;
3use crate::client::{receive_message, Parameters, Subscription};
4use crate::connection::{ConnectionManager, ConnectionOptions};
5use crate::{Request, StatusCode};
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, RwLock};
9use std::thread;
10
11pub struct Subscribe<'a> {
12    request: &'a Request, // Reference to a Request
13    connection_data: ConnectionManager,
14    receive_thread: Option<thread::JoinHandle<()>>,
15    stop_signal: Arc<AtomicBool>,
16    active_subscriptions: Arc<RwLock<HashMap<u32, Arc<RwLock<Subscription>>>>>,
17}
18
19impl<'a> Subscribe<'a> {
20    const ID_BYTE_SIZE: usize = 3;
21    /// Deleted default constructor equivalent in Rust
22    /// This prevents instantiation without arguments, as Rust does not support `= delete`.
23    pub fn new(request: &'a Request) -> Self {
24        let sock = unsafe {
25            let mut sock: nng_c_sys::nng_socket = std::mem::zeroed();
26            nng_c_sys::nng_sub0_open(&mut sock);
27            sock
28        };
29        Self {
30            request,
31            connection_data: ConnectionManager::new(sock),
32            receive_thread: None,
33            stop_signal: Arc::new(AtomicBool::new(false)),
34            active_subscriptions: Arc::new(RwLock::new(HashMap::new())),
35        }
36    }
37
38    /// Establishes a connection to a specified server URL using the given configuration options.
39    ///
40    /// # Arguments:
41    /// * `url` - The server's address (e.g., "tcp://127.0.0.1:5555").
42    /// * `connection_options` - The connection settings, including TLS certificate and timeouts.
43    ///
44    /// # Returns:
45    /// * `Ok(())` if the connection is successful.
46    /// * `Err(String)` with the error message if the operation fails.
47    pub fn connect(
48        &mut self,
49        url: &str,
50        connection_options: ConnectionOptions,
51    ) -> Result<(), String> {
52        let res = self.connection_data.connect(url, connection_options);
53        let stop_signal_clone = Arc::clone(&self.stop_signal);
54        let active_subscriptions_clone = Arc::clone(&self.active_subscriptions);
55
56        let sock = self.connection_data.sock.unwrap();
57        self.receive_thread = Some(thread::spawn(move || {
58            const HEADER_LEN: usize = 4usize;
59            loop {
60                if stop_signal_clone.load(Ordering::Relaxed) {
61                    println!("Receive thread stopping...");
62                    break;
63                }
64
65                let buffer = match receive_message(&sock) {
66                    Ok(buffer) => buffer,
67                    Err(_) => continue,
68                };
69
70                if buffer.len() > HEADER_LEN {
71                    let id =
72                        (buffer[0] as u32) | ((buffer[1] as u32) << 8) | ((buffer[2] as u32) << 16);
73                    let subscriptions = active_subscriptions_clone.write().unwrap();
74                    if let Some(sub) = subscriptions.get(&id) {
75                        let mut sub = sub.write().unwrap();
76                        sub.update(buffer);
77                    }
78                }
79            }
80        }));
81
82        res
83    }
84
85    /// Disconnects the current connection and frees the associated resources.
86    ///
87    /// # Returns:
88    /// * `Ok(())` if the disconnection is successful.
89    /// * `Err(String)` with the error message if the operation fails.
90    pub fn disconnect(&mut self) -> Result<(), String> {
91        self.stop_signal.store(true, Ordering::Relaxed);
92        let res = self.connection_data.disconnect();
93        self.receive_thread.take().unwrap().join().unwrap();
94        res
95    }
96
97    pub fn subscribe<I>(
98        &mut self,
99        paths: I,
100        group_name: &str,
101        frequency_divider: u32,
102    ) -> Result<ReadOnlySubscription, String>
103    where
104        I: Parameters,
105    {
106        let sub = self
107            .request
108            .create_group(paths, group_name, frequency_divider)?;
109
110        if sub.status != StatusCode::Ok as i32 {
111            return Err(format!(
112                "Failed to create group. Error code: {}",
113                sub.status
114            ));
115        }
116
117        let id = sub.id;
118        let bytes = id.to_le_bytes();
119        let rv = unsafe {
120            nng_c_sys::nng_setopt(
121                self.connection_data.sock.unwrap(),
122                nng_c_sys::NNG_OPT_SUB_SUBSCRIBE.as_ptr() as *const i8,
123                bytes.as_ptr() as *const std::ffi::c_void,
124                Subscribe::ID_BYTE_SIZE,
125            )
126        };
127        if rv != 0 {
128            return Err(format!(
129                "Failed to subscribe to the specified paths. Error code: {}",
130                rv
131            ));
132        }
133
134        // Create the subscription and store it in the HashMap
135        let subscription = Arc::new(RwLock::new(Subscription::new(sub)));
136        let mut subscriptions = self.active_subscriptions.write().unwrap();
137        subscriptions.insert(id, subscription.clone());
138
139        // Retrieve a read lock from the subscription stored in the map
140        Ok(ReadOnlySubscription::new(subscription))
141    }
142}