Skip to main content

motorcortex_rust/client/
request.rs

1use crate::client::{receive_message, Parameters};
2use crate::connection::{ConnectionManager, ConnectionOptions};
3use crate::error::{MotorcortexError, Result};
4use crate::msg::{
5    get_hash, get_hash_size, CreateGroupMsg, GetParameterListMsg,
6    GetParameterMsg, GetParameterTreeHashMsg, GetParameterTreeMsg, GroupStatusMsg, Hash, LoginMsg, LogoutMsg,
7    ParameterListMsg, ParameterMsg, ParameterTreeMsg, RemoveGroupMsg, SetParameterListMsg, SetParameterMsg,
8    StatusCode, StatusMsg,
9};
10use crate::parameter_value::{
11    decode_parameter_value, encode_parameter_value, GetParameterTuple, GetParameterValue,
12    SetParameterTuple, SetParameterValue,
13};
14use crate::ParameterTree;
15
16use prost::Message;
17
18/// Represents a request client that manages socket-based connections
19/// and interacts with the server.
20///
21/// # Thread Safety
22///
23/// `Request` is `Send` but not `Sync`. This means you can move a `Request`
24/// to another thread, but you cannot share it between threads by reference.
25/// The Req/Rep protocol requires strict send→receive ordering, which cannot
26/// be guaranteed with concurrent callers.
27///
28/// Each thread that needs concurrent server access should create its own
29/// `Request` instance, or protect shared access with a `Mutex<Request>`.
30pub struct Request {
31    connection_data: ConnectionManager,
32    /// Client's local representation of the parameter hierarchy.
33    parameter_tree: ParameterTree,
34}
35
36impl Request {
37    /// Creates a new `Request` instance.
38    ///
39    /// No socket is opened until `connect()` is called.
40    pub fn new() -> Self {
41        Self {
42            connection_data: ConnectionManager::new(),
43            parameter_tree: ParameterTree::new(),
44        }
45    }
46
47    /// Establishes a connection to a specified server URL using the given configuration options.
48    ///
49    /// Opens a REQ socket and connects to the server.
50    ///
51    /// # Arguments:
52    /// * `url` - The server's address (e.g., "wss://127.0.0.1:5568").
53    /// * `connection_options` - The connection settings, including TLS certificate and timeouts.
54    ///
55    /// # Errors:
56    /// Returns `MotorcortexError::Connection` if the socket cannot be opened or the connection fails.
57    pub fn connect(&mut self, url: &str, connection_options: ConnectionOptions) -> Result<()> {
58        self.connection_data
59            .connect(url, connection_options, nng_c_sys::nng_req0_open)
60    }
61
62    /// Disconnects the current connection and frees the associated resources.
63    ///
64    /// # Errors:
65    /// Returns `MotorcortexError::Connection` if the disconnection fails.
66    pub fn disconnect(&mut self) -> Result<()> {
67        self.connection_data.disconnect()
68    }
69
70    /// Sends a login message to the server with the specified username and password.
71    ///
72    /// # Arguments:
73    /// * `username` - The username to authenticate with.
74    /// * `password` - The password for authentication.
75    ///
76    /// # Returns:
77    /// * `Ok(StatusCode)` - The status code representing the login response.
78    ///
79    /// # Errors:
80    /// Returns `MotorcortexError::Encode`, `MotorcortexError::Io`, or `MotorcortexError::Decode`.
81    pub fn login(&self, username: String, password: String) -> Result<StatusCode> {
82        let login_msg = LoginMsg {
83            header: None,
84            login: username,
85            password,
86        };
87
88        let buffer = Self::encode_with_hash(&login_msg)?;
89        self.send_message(&buffer)?;
90
91        let buf = self.receive()?;
92        let msg = Self::decode_status_msg(&buf)?;
93
94        Ok(StatusCode::try_from(msg.status).unwrap())
95    }
96
97    /// Sends a logout message to the server.
98    ///
99    /// # Returns:
100    /// * `Ok(StatusCode)` - The status code representing the logout response.
101    ///
102    /// # Errors:
103    /// Returns `MotorcortexError::Encode`, `MotorcortexError::Io`, or `MotorcortexError::Decode`.
104    pub fn logout(&self) -> Result<StatusCode> {
105        let logout_msg = LogoutMsg { header: None };
106
107        let buffer = Self::encode_with_hash(&logout_msg)?;
108        self.send_message(&buffer)?;
109
110        let buf = self.receive()?;
111        let msg = Self::decode_status_msg(&buf)?;
112
113        Ok(StatusCode::try_from(msg.status).unwrap())
114    }
115
116    /// Requests and updates the client's parameter tree from the server.
117    ///
118    /// # Returns:
119    /// * `Ok(StatusCode)` - The status code indicating the result of the request.
120    ///
121    /// # Errors:
122    /// Returns `MotorcortexError::Encode`, `MotorcortexError::Io`, or `MotorcortexError::Decode`.
123    pub fn request_parameter_tree(&mut self) -> Result<StatusCode> {
124        let (status_code, parameter_tree) = self.get_parameter_tree()?;
125        self.parameter_tree = parameter_tree;
126        Ok(status_code)
127    }
128
129    /// Updates a specific parameter on the server with the provided value.
130    ///
131    /// # Arguments:
132    /// * `path` - The hierarchical path to the parameter being updated.
133    /// * `value` - The new value for the parameter. Must implement `SetParameterValue`.
134    ///
135    /// # Returns:
136    /// * `Ok(StatusCode)` - The status code after setting the parameter.
137    ///
138    /// # Errors:
139    /// Returns `MotorcortexError::ParameterNotFound` if the path doesn't exist in the tree.
140    /// Returns `MotorcortexError::Encode`, `MotorcortexError::Io`, or `MotorcortexError::Decode`
141    /// for communication failures.
142    pub fn set_parameter<V>(&self, path: &str, value: V) -> Result<StatusCode>
143    where
144        V: SetParameterValue + Default + PartialEq,
145    {
146        let data_type = self
147            .parameter_tree
148            .get_parameter_data_type(path)
149            .ok_or_else(|| MotorcortexError::ParameterNotFound(path.to_string()))?;
150
151        let msg = SetParameterMsg {
152            header: None,
153            offset: None,
154            path: path.to_string(),
155            value: encode_parameter_value(data_type, &value),
156        };
157
158        let buffer = Self::encode_with_hash(&msg)?;
159        self.send_message(&buffer)?;
160
161        let buf = self.receive()?;
162        let msg = Self::decode_status_msg(&buf)?;
163
164        Ok(StatusCode::try_from(msg.status).unwrap())
165    }
166
167    /// Sets multiple parameters on the server in a single request.
168    ///
169    /// # Arguments:
170    /// * `paths` - A vector of hierarchical paths to the parameters.
171    /// * `values` - A tuple of values matching the paths.
172    ///
173    /// # Errors:
174    /// Returns `MotorcortexError::ParameterNotFound` if any path doesn't exist in the tree.
175    pub fn set_parameters<T>(&self, paths: Vec<&str>, values: T) -> Result<StatusCode>
176    where
177        T: SetParameterTuple,
178    {
179        let mut msg = SetParameterListMsg {
180            header: None,
181            params: Vec::new(),
182        };
183
184        for (i, path) in paths.iter().enumerate() {
185            let data_type = self
186                .parameter_tree
187                .get_parameter_data_type(path)
188                .ok_or_else(|| MotorcortexError::ParameterNotFound(path.to_string()))?;
189
190            msg.params.push(SetParameterMsg {
191                header: None,
192                offset: None,
193                path: path.to_string(),
194                value: values
195                    .get_tuple_element(i, data_type)
196                    .map_err(|e| MotorcortexError::Encode(e))?,
197            });
198        }
199
200        let buffer = Self::encode_with_hash(&msg)?;
201        self.send_message(&buffer)?;
202
203        let buf = self.receive()?;
204        let msg = Self::decode_status_msg(&buf)?;
205
206        Ok(StatusCode::try_from(msg.status).unwrap())
207    }
208
209    /// Retrieves the value of a parameter from the server for the given path.
210    ///
211    /// # Arguments:
212    /// * `path` - The hierarchical path of the parameter to retrieve.
213    ///
214    /// # Type Parameters:
215    /// * `V` - The expected value type. The server value is automatically converted.
216    ///
217    /// # Errors:
218    /// Returns `MotorcortexError::ParameterNotFound` if the path doesn't exist in the tree.
219    /// Returns `MotorcortexError::Encode`, `MotorcortexError::Io`, or `MotorcortexError::Decode`
220    /// for communication failures.
221    pub fn get_parameter<V>(&self, path: &str) -> Result<V>
222    where
223        V: GetParameterValue + Default,
224    {
225        let data_type = self
226            .parameter_tree
227            .get_parameter_data_type(path)
228            .ok_or_else(|| MotorcortexError::ParameterNotFound(path.to_string()))?;
229
230        let msg = GetParameterMsg {
231            header: None,
232            path: path.to_string(),
233        };
234
235        let buffer = Self::encode_with_hash(&msg)?;
236        self.send_message(&buffer)?;
237
238        let buf = self.receive()?;
239        let msg = Self::decode_parameter_msg(&buf)?;
240
241        Ok(decode_parameter_value(data_type, &msg.value))
242    }
243
244    /// Retrieves multiple parameters from the server in a single request.
245    ///
246    /// # Arguments:
247    /// * `paths` - A vector of hierarchical paths.
248    ///
249    /// # Type Parameters:
250    /// * `T` - A tuple type matching the expected parameter types.
251    ///
252    /// # Errors:
253    /// Returns `MotorcortexError::Decode` if the response cannot be decoded.
254    pub fn get_parameters<T>(&self, paths: Vec<&str>) -> Result<T>
255    where
256        T: GetParameterTuple,
257    {
258        let mut msg = GetParameterListMsg {
259            header: None,
260            params: Vec::new(),
261        };
262
263        for path in paths {
264            msg.params.push(GetParameterMsg {
265                header: None,
266                path: path.to_string(),
267            })
268        }
269
270        let buffer = Self::encode_with_hash(&msg)?;
271        self.send_message(&buffer)?;
272
273        let buf = self.receive()?;
274        let msg = Self::decode_message::<ParameterListMsg>(&buf)?;
275
276        let combined_iterator = msg.params.iter().map(|parameter| {
277            (
278                &parameter.info.as_ref().unwrap().data_type,
279                parameter.value.as_slice(),
280            )
281        });
282
283        T::get_parameters(combined_iterator).map_err(|e| MotorcortexError::Decode(e))
284    }
285
286    /// Retrieves the server's parameter tree.
287    ///
288    /// # Returns:
289    /// * `Ok((StatusCode, ParameterTree))` - The status code and the retrieved parameter tree.
290    ///
291    /// # Errors:
292    /// Returns `MotorcortexError::Decode` if the tree message is invalid.
293    pub fn get_parameter_tree(&self) -> Result<(StatusCode, ParameterTree)> {
294        let get_parameter_tree = GetParameterTreeMsg { header: None };
295        let buffer = Self::encode_with_hash(&get_parameter_tree)?;
296        self.send_message(&buffer)?;
297
298        let buf = self.receive()?;
299        let msg = Self::decode_parameter_tree_msg(&buf)?;
300
301        match ParameterTree::from_message(msg) {
302            Some(parameter_tree) => Ok((StatusCode::Ok, parameter_tree)),
303            None => Err(MotorcortexError::Decode(
304                "Failed to create ParameterTree: invalid status code".to_string(),
305            )),
306        }
307    }
308
309    /// Retrieves the hash of the server's parameter tree for change detection.
310    pub fn get_parameter_tree_hash(&self) -> Result<u32> {
311        let get_parameter_tree_hash = GetParameterTreeHashMsg { header: None };
312        let buffer = Self::encode_with_hash(&get_parameter_tree_hash)?;
313        self.send_message(&buffer)?;
314
315        let buf = self.receive()?;
316        let msg = Self::decode_message::<ParameterTreeMsg>(&buf)?;
317
318        Ok(msg.hash)
319    }
320
321    /// Creates a subscription group on the server.
322    ///
323    /// # Arguments:
324    /// * `parameters` - The parameter paths to include in the group.
325    /// * `group_name` - An alias for the group.
326    /// * `frequency_divider` - Update rate divider relative to the server's base frequency.
327    pub fn create_group<I>(
328        &self,
329        parameters: I,
330        group_name: &str,
331        frequency_divider: u32,
332    ) -> Result<GroupStatusMsg>
333    where
334        I: Parameters,
335    {
336        let create_group_msg = CreateGroupMsg {
337            header: None,
338            frq_divider: frequency_divider,
339            alias: group_name.to_string(),
340            paths: parameters.into_vec(),
341        };
342        let buffer = Self::encode_with_hash(&create_group_msg)?;
343        self.send_message(&buffer)?;
344
345        let buf = self.receive()?;
346        let msg = Self::decode_message::<GroupStatusMsg>(&buf)?;
347
348        Ok(msg)
349    }
350
351    /// Removes a subscription group from the server.
352    ///
353    /// # Arguments:
354    /// * `group_name` - The alias of the group to remove.
355    pub fn remove_group(&self, group_name: &str) -> Result<StatusCode> {
356        let remove_group_msg = RemoveGroupMsg {
357            header: None,
358            alias: group_name.to_string(),
359        };
360        let buffer = Self::encode_with_hash(&remove_group_msg)?;
361        self.send_message(&buffer)?;
362
363        let buf = self.receive()?;
364        let msg = Self::decode_status_msg(&buf)?;
365
366        if msg.status == StatusCode::Ok as i32 {
367            Ok(StatusCode::Ok)
368        } else {
369            Err(MotorcortexError::Status(
370                StatusCode::try_from(msg.status).unwrap(),
371            ))
372        }
373    }
374
375    /// Encodes a message with its associated hash into a byte buffer for transport.
376    fn encode_with_hash<M: Message + Hash>(message: &M) -> Result<Vec<u8>> {
377        let mut buffer: Vec<u8> = Vec::new();
378        buffer.extend(get_hash::<M>().to_le_bytes());
379        message
380            .encode(&mut buffer)
381            .map_err(|e| MotorcortexError::Encode(e.to_string()))?;
382        Ok(buffer)
383    }
384
385    /// Decodes a byte slice into a specific Protobuf message type.
386    pub fn decode_message<T: Message + Default + Hash>(reply_slice: &[u8]) -> Result<T> {
387        let hash_size = get_hash_size();
388
389        if hash_size > reply_slice.len() {
390            return Err(MotorcortexError::Decode(
391                "Invalid message length, hash missing".to_string(),
392            ));
393        }
394
395        let provided_hash = u32::from_le_bytes(
396            reply_slice[..hash_size]
397                .try_into()
398                .map_err(|_| MotorcortexError::Decode("Failed to extract hash".to_string()))?,
399        );
400
401        if provided_hash != get_hash::<T>() {
402            return Err(MotorcortexError::Decode("Invalid message hash".to_string()));
403        }
404
405        let decode_slice = &reply_slice[hash_size..];
406        T::decode(decode_slice).map_err(MotorcortexError::from)
407    }
408
409    fn decode_parameter_tree_msg(reply_slice: &[u8]) -> Result<ParameterTreeMsg> {
410        Self::decode_message::<ParameterTreeMsg>(reply_slice)
411    }
412
413    fn decode_status_msg(reply_slice: &[u8]) -> Result<StatusMsg> {
414        Self::decode_message::<StatusMsg>(reply_slice)
415    }
416
417    fn decode_parameter_msg(reply_slice: &[u8]) -> Result<ParameterMsg> {
418        Self::decode_message::<ParameterMsg>(reply_slice)
419    }
420
421    /// Receives a message from the server using the active socket.
422    fn receive(&self) -> Result<Vec<u8>> {
423        receive_message(self.connection_data.sock.as_ref().ok_or_else(|| {
424            MotorcortexError::Connection("Socket is not available. Connect first.".to_string())
425        })?)
426    }
427
428    /// Sends the provided data buffer to the server using the NNG socket.
429    fn send_message(&self, buffer: &[u8]) -> Result<()> {
430        unsafe {
431            let data_ptr = buffer.as_ptr() as *mut std::ffi::c_void;
432            let data_len = buffer.len();
433
434            let sock = self.connection_data.sock.ok_or_else(|| {
435                MotorcortexError::Connection("Socket is not available. Connect first.".to_string())
436            })?;
437            let rv = nng_c_sys::nng_send(sock, data_ptr, data_len, 0);
438
439            if rv != 0 {
440                return Err(MotorcortexError::Io(format!(
441                    "nng_send failed with code: {}",
442                    rv
443                )));
444            }
445        }
446
447        Ok(())
448    }
449}