motorcortex_rust/core/request.rs
1//! The async-first `Request` handle.
2//!
3//! Cheap to clone (carries a `mpsc::Sender`); every method is an
4//! `async fn` that resolves once the driver thread serves the
5//! command. See [`crate::core::driver`] for the loop on the other
6//! side of the channel.
7
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10use std::thread;
11
12use tokio::sync::{mpsc, oneshot, watch};
13
14use crate::client::{ParameterTree, Parameters};
15use crate::connection::{ConnectionOptions, PipeEvent};
16use crate::core::driver::run_request_driver;
17use crate::core::state::ConnectionState;
18use crate::core::util::await_reply;
19use crate::error::{MotorcortexError, Result};
20use crate::msg::{
21 CreateGroupMsg, GetParameterListMsg, GetParameterMsg, GroupStatusMsg, ParameterListMsg,
22 SetParameterListMsg, SetParameterMsg, StatusCode,
23};
24use crate::parameter_value::{
25 GetParameterTuple, GetParameterValue, SetParameterTuple, SetParameterValue,
26 decode_parameter_value, encode_parameter_value,
27};
28
29/// Commands the `Request` handle sends to its driver thread.
30///
31/// Each variant carries an `oneshot::Sender` for the driver to hand
32/// back the result — the user `.await`s the matching `Receiver`.
33pub(crate) enum Cmd {
34 Connect {
35 url: String,
36 opts: ConnectionOptions,
37 reply: oneshot::Sender<Result<()>>,
38 },
39 Disconnect {
40 reply: oneshot::Sender<Result<()>>,
41 },
42 Login {
43 user: String,
44 pass: String,
45 reply: oneshot::Sender<Result<StatusCode>>,
46 },
47 Logout {
48 reply: oneshot::Sender<Result<StatusCode>>,
49 },
50 RequestParameterTree {
51 reply: oneshot::Sender<Result<StatusCode>>,
52 },
53 GetParameter {
54 path: String,
55 reply: oneshot::Sender<Result<Vec<u8>>>,
56 },
57 SetParameter {
58 path: String,
59 value: Vec<u8>,
60 reply: oneshot::Sender<Result<StatusCode>>,
61 },
62 GetParameters {
63 msg: GetParameterListMsg,
64 reply: oneshot::Sender<Result<ParameterListMsg>>,
65 },
66 SetParameters {
67 msg: SetParameterListMsg,
68 reply: oneshot::Sender<Result<StatusCode>>,
69 },
70 CreateGroup {
71 msg: CreateGroupMsg,
72 reply: oneshot::Sender<Result<GroupStatusMsg>>,
73 },
74 RemoveGroup {
75 alias: String,
76 reply: oneshot::Sender<Result<StatusCode>>,
77 },
78 GetParameterTreeHash {
79 reply: oneshot::Sender<Result<u32>>,
80 },
81 /// Fetch a fresh session token from the server. On success,
82 /// updates the driver's shared `last_token` cache (which powers
83 /// the automatic restore on reconnect) *and* returns the token
84 /// to the caller.
85 GetSessionToken {
86 reply: oneshot::Sender<Result<String>>,
87 },
88 /// Restore a previously-issued session without re-login. The
89 /// caller supplies the token explicitly; the driver uses this
90 /// path internally on reconnect too.
91 RestoreSession {
92 token: String,
93 reply: oneshot::Sender<Result<StatusCode>>,
94 },
95 /// Background token-refresh tick. Does a GetSessionToken RPC,
96 /// stores the result in the driver's `last_token` cache, and
97 /// discards the reply. Driven by the refresh helper thread.
98 RefreshTokenTick,
99 /// Forwarded from the NNG pipe-notify callback via an
100 /// `Arc<dyn Fn(PipeEvent)>` installed on the connection. Drains
101 /// through the same command queue as user RPCs so state
102 /// transitions don't race with requests in flight.
103 Pipe(PipeEvent),
104}
105
106/// Async handle for request/reply RPCs.
107///
108/// Cloning a `Request` gives a second handle that multiplexes onto the
109/// same driver thread — commands serialise through the driver, so the
110/// NNG Req/Rep ordering invariant is enforced in the type system
111/// without any user-visible `Mutex`.
112pub struct Request {
113 tx: mpsc::UnboundedSender<Cmd>,
114 state: watch::Receiver<ConnectionState>,
115 tree: Arc<RwLock<ParameterTree>>,
116 /// Latest session token from the most recent `GetSessionToken`
117 /// RPC (either user-invoked or via the refresh helper). Shared
118 /// with the driver so both the user side (via
119 /// [`session_token`](Self::session_token)) and the reconnect
120 /// handler see the same value.
121 last_token: Arc<RwLock<Option<String>>>,
122 /// Counter the driver bumps each time the background refresh
123 /// helper actually fires a `GetSessionToken` RPC (i.e. the
124 /// connection was live at tick time). Lets callers — and the
125 /// integration tests — observe that the refresh is paused while
126 /// the pipe is down.
127 refresh_count: Arc<AtomicU64>,
128}
129
130impl Request {
131 /// Create a new handle and spawn its driver thread.
132 ///
133 /// The handle starts in [`ConnectionState::Disconnected`]; call
134 /// [`connect`](Self::connect) to open the socket.
135 ///
136 /// ```
137 /// use motorcortex_rust::core::{ConnectionState, Request};
138 /// let req = Request::new();
139 /// assert_eq!(*req.state().borrow(), ConnectionState::Disconnected);
140 /// ```
141 pub fn new() -> Self {
142 let (tx, rx) = mpsc::unbounded_channel();
143 let (state_tx, state_rx) = watch::channel(ConnectionState::Disconnected);
144 let tree = Arc::new(RwLock::new(ParameterTree::new()));
145 let last_token: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
146 let refresh_count = Arc::new(AtomicU64::new(0));
147 let tree_for_driver = Arc::clone(&tree);
148 let token_for_driver = Arc::clone(&last_token);
149 let count_for_driver = Arc::clone(&refresh_count);
150 let tx_for_driver = tx.clone();
151 thread::Builder::new()
152 .name("mcx-request-driver".into())
153 .spawn(move || {
154 run_request_driver(
155 tx_for_driver,
156 rx,
157 state_tx,
158 tree_for_driver,
159 token_for_driver,
160 count_for_driver,
161 )
162 })
163 .expect("spawning the driver thread must succeed on any OS we target");
164 Self {
165 tx,
166 state: state_rx,
167 tree,
168 last_token,
169 refresh_count,
170 }
171 }
172
173 /// Subscribe to connection-state transitions.
174 ///
175 /// Returns a `watch::Receiver`; consumers can `state.changed().await`
176 /// or `*state.borrow()` for the current value.
177 pub fn state(&self) -> watch::Receiver<ConnectionState> {
178 self.state.clone()
179 }
180
181 /// Open the socket and dial `url`.
182 ///
183 /// ```no_run
184 /// # async fn demo() -> motorcortex_rust::Result<()> {
185 /// use motorcortex_rust::{ConnectionOptions, core::Request};
186 /// let req = Request::new();
187 /// let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);
188 /// req.connect("wss://127.0.0.1:5568", opts).await?;
189 /// # Ok(()) }
190 /// ```
191 pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()> {
192 let (reply_tx, reply_rx) = oneshot::channel();
193 self.send_cmd(Cmd::Connect {
194 url: url.to_string(),
195 opts,
196 reply: reply_tx,
197 })?;
198 await_reply(reply_rx).await?
199 }
200
201 /// Close the socket. Subsequent RPCs will error with
202 /// [`MotorcortexError::Connection`] until `connect` is called again.
203 pub async fn disconnect(&self) -> Result<()> {
204 let (reply_tx, reply_rx) = oneshot::channel();
205 self.send_cmd(Cmd::Disconnect { reply: reply_tx })?;
206 await_reply(reply_rx).await?
207 }
208
209 /// Authenticate with the server.
210 ///
211 /// Returns the server's [`StatusCode`] — `Ok` on success,
212 /// `WrongPassword` / other variants on rejection. Transport-level
213 /// failures surface as [`MotorcortexError`].
214 pub async fn login(&self, user: &str, pass: &str) -> Result<StatusCode> {
215 let (reply_tx, reply_rx) = oneshot::channel();
216 self.send_cmd(Cmd::Login {
217 user: user.to_string(),
218 pass: pass.to_string(),
219 reply: reply_tx,
220 })?;
221 await_reply(reply_rx).await?
222 }
223
224 /// Drop the current session on the server.
225 pub async fn logout(&self) -> Result<StatusCode> {
226 let (reply_tx, reply_rx) = oneshot::channel();
227 self.send_cmd(Cmd::Logout { reply: reply_tx })?;
228 await_reply(reply_rx).await?
229 }
230
231 /// Fetch the parameter tree from the server and store it in the
232 /// shared cache accessible via [`parameter_tree`](Self::parameter_tree).
233 ///
234 /// Returns the server's [`StatusCode`]. The cache is only updated
235 /// on `StatusCode::Ok`; a non-OK reply leaves the previous cache
236 /// intact.
237 pub async fn request_parameter_tree(&self) -> Result<StatusCode> {
238 let (reply_tx, reply_rx) = oneshot::channel();
239 self.send_cmd(Cmd::RequestParameterTree { reply: reply_tx })?;
240 await_reply(reply_rx).await?
241 }
242
243 /// Shared read handle to the local parameter-tree cache. Every
244 /// cloned [`Request`] sees the same tree; reads are `RwLock::read`
245 /// (cheap, no channel round-trip). Populated by
246 /// [`request_parameter_tree`](Self::request_parameter_tree).
247 pub fn parameter_tree(&self) -> Arc<RwLock<ParameterTree>> {
248 Arc::clone(&self.tree)
249 }
250
251 /// Read a single parameter. The caller-specified `V` is the Rust
252 /// type the server value should be converted to (see
253 /// [`GetParameterValue`]).
254 ///
255 /// Returns [`MotorcortexError::ParameterNotFound`] if `path` is
256 /// unknown locally — call [`request_parameter_tree`](Self::request_parameter_tree)
257 /// first.
258 ///
259 /// ```no_run
260 /// # async fn demo(req: motorcortex_rust::core::Request) -> motorcortex_rust::Result<()> {
261 /// // Same path, three different Rust types — the server value is
262 /// // converted per call (lossy casts allowed).
263 /// let as_double: f64 = req.get_parameter("root/Control/dummyDouble").await?;
264 /// let as_int: i64 = req.get_parameter("root/Control/dummyDouble").await?;
265 /// let as_text: String = req.get_parameter("root/Control/dummyDouble").await?;
266 /// # Ok(()) }
267 /// ```
268 pub async fn get_parameter<V>(&self, path: &str) -> Result<V>
269 where
270 V: GetParameterValue + Default,
271 {
272 let data_type = self.data_type_of(path)?;
273
274 let (reply_tx, reply_rx) = oneshot::channel();
275 self.send_cmd(Cmd::GetParameter {
276 path: path.to_string(),
277 reply: reply_tx,
278 })?;
279 let value_bytes = await_reply(reply_rx).await??;
280 Ok(decode_parameter_value::<V>(data_type, &value_bytes))
281 }
282
283 /// Write a single parameter. Returns the server's [`StatusCode`].
284 ///
285 /// ```no_run
286 /// # async fn demo(req: motorcortex_rust::core::Request) -> motorcortex_rust::Result<()> {
287 /// // Scalar.
288 /// req.set_parameter("root/Control/dummyDouble", 2.345_f64).await?;
289 /// // Fixed-size array.
290 /// req.set_parameter("root/Control/dummyDoubleVec", [1.0, 2.0, 3.0]).await?;
291 /// // Dynamic Vec.
292 /// req.set_parameter("root/Control/dummyDoubleVec", vec![1.0, 2.0]).await?;
293 /// # Ok(()) }
294 /// ```
295 pub async fn set_parameter<V>(&self, path: &str, value: V) -> Result<StatusCode>
296 where
297 V: SetParameterValue,
298 {
299 let data_type = self.data_type_of(path)?;
300 let value_bytes = encode_parameter_value(data_type, &value);
301
302 let (reply_tx, reply_rx) = oneshot::channel();
303 self.send_cmd(Cmd::SetParameter {
304 path: path.to_string(),
305 value: value_bytes,
306 reply: reply_tx,
307 })?;
308 await_reply(reply_rx).await?
309 }
310
311 /// Read a batch of parameters in one RPC. The generic `T` is a
312 /// tuple type like `(bool, f64, i32)` whose arity matches
313 /// `paths.len()`. Each element is decoded into its position using
314 /// the dtype recorded in the local tree cache.
315 ///
316 /// ```no_run
317 /// # async fn demo(req: motorcortex_rust::core::Request) -> motorcortex_rust::Result<()> {
318 /// let (b, d, i): (bool, f64, i32) = req.get_parameters(&[
319 /// "root/Control/dummyBool",
320 /// "root/Control/dummyDouble",
321 /// "root/Control/dummyInt32",
322 /// ]).await?;
323 /// # Ok(()) }
324 /// ```
325 pub async fn get_parameters<T>(&self, paths: &[&str]) -> Result<T>
326 where
327 T: GetParameterTuple,
328 {
329 // Resolve every dtype up-front so the first missing path is
330 // reported before we even build the request message.
331 let dtypes: Vec<u32> = paths
332 .iter()
333 .map(|p| self.data_type_of(p))
334 .collect::<Result<_>>()?;
335
336 let msg = GetParameterListMsg {
337 header: None,
338 params: paths
339 .iter()
340 .map(|p| GetParameterMsg {
341 header: None,
342 path: p.to_string(),
343 })
344 .collect(),
345 };
346
347 let (reply_tx, reply_rx) = oneshot::channel();
348 self.send_cmd(Cmd::GetParameters {
349 msg,
350 reply: reply_tx,
351 })?;
352 let reply = await_reply(reply_rx).await??;
353
354 // Use the dtypes the client looked up locally, not whatever
355 // the server echoed back. Matches legacy semantics (dtype is
356 // authoritative client-side — it's what the caller used when
357 // picking `T`'s element types).
358 let iter = reply
359 .params
360 .iter()
361 .zip(dtypes.iter())
362 .map(|(param, dt)| (dt, param.value.as_slice()));
363 T::get_parameters(iter).map_err(MotorcortexError::Decode)
364 }
365
366 /// Write a batch of parameters in one RPC. `values` is a tuple
367 /// (or single-element tuple) whose arity matches `paths.len()`.
368 /// Each element is encoded against the dtype recorded in the
369 /// local tree cache.
370 pub async fn set_parameters<T>(&self, paths: &[&str], values: T) -> Result<StatusCode>
371 where
372 T: SetParameterTuple,
373 {
374 let mut params = Vec::with_capacity(paths.len());
375 for (i, path) in paths.iter().enumerate() {
376 let data_type = self.data_type_of(path)?;
377 let value = values
378 .get_tuple_element(i, data_type)
379 .map_err(MotorcortexError::Encode)?;
380 params.push(SetParameterMsg {
381 header: None,
382 offset: None,
383 path: path.to_string(),
384 value,
385 });
386 }
387
388 let msg = SetParameterListMsg {
389 header: None,
390 params,
391 };
392
393 let (reply_tx, reply_rx) = oneshot::channel();
394 self.send_cmd(Cmd::SetParameters {
395 msg,
396 reply: reply_tx,
397 })?;
398 await_reply(reply_rx).await?
399 }
400
401 /// Create a server-side subscription group. Returns the
402 /// [`GroupStatusMsg`] the subscribe-side code uses as the
403 /// group descriptor.
404 ///
405 /// `paths` accepts anything implementing [`Parameters`] — a
406 /// string literal, a `Vec<String>`, an array of `&str`, etc.
407 pub async fn create_group<I>(
408 &self,
409 paths: I,
410 alias: &str,
411 frequency_divider: u32,
412 ) -> Result<GroupStatusMsg>
413 where
414 I: Parameters,
415 {
416 let msg = CreateGroupMsg {
417 header: None,
418 frq_divider: frequency_divider,
419 alias: alias.to_string(),
420 paths: paths.into_vec(),
421 };
422 let (reply_tx, reply_rx) = oneshot::channel();
423 self.send_cmd(Cmd::CreateGroup {
424 msg,
425 reply: reply_tx,
426 })?;
427 await_reply(reply_rx).await?
428 }
429
430 /// Remove a previously-created subscription group by alias.
431 /// Returns the server's [`StatusCode`] unchanged — `Ok` on
432 /// success, `Failed` (or similar) if the group wasn't there.
433 /// Transport / decode failures surface as
434 /// [`MotorcortexError`].
435 ///
436 /// The crate-wide rule: RPCs that return `Result<StatusCode>`
437 /// never promote a non-OK server reply into `Err`. Branch on
438 /// the returned code if you care which way the request went.
439 pub async fn remove_group(&self, alias: &str) -> Result<StatusCode> {
440 let (reply_tx, reply_rx) = oneshot::channel();
441 self.send_cmd(Cmd::RemoveGroup {
442 alias: alias.to_string(),
443 reply: reply_tx,
444 })?;
445 await_reply(reply_rx).await?
446 }
447
448 /// Fetch a fresh session token from the server.
449 ///
450 /// On success the token is cached in the driver and returned to
451 /// the caller. Callers that want to persist a session across
452 /// process restarts can stash the returned string and hand it
453 /// back to a fresh `Request` via [`restore_session`](Self::restore_session).
454 ///
455 /// While a connection is live, the driver also refreshes this
456 /// token periodically in the background (see
457 /// [`ConnectionOptions::token_refresh_interval`]) so the cache
458 /// stays warm for the automatic reconnect path.
459 pub async fn get_session_token(&self) -> Result<String> {
460 let (reply_tx, reply_rx) = oneshot::channel();
461 self.send_cmd(Cmd::GetSessionToken { reply: reply_tx })?;
462 await_reply(reply_rx).await?
463 }
464
465 /// Restore a previously-issued session by supplying the token.
466 /// Returns the server's [`StatusCode`]:
467 ///
468 /// - `Ok` / `ReadOnlyMode` — the session was accepted, subsequent
469 /// RPCs run under that identity.
470 /// - `PermissionDenied` / `Failed` — the token is stale or the
471 /// server has lost its state.
472 ///
473 /// The driver calls this internally on reconnect using the token
474 /// stashed by the refresh loop; callers generally don't need to
475 /// invoke it explicitly unless they're recovering from a process
476 /// restart.
477 pub async fn restore_session(&self, token: &str) -> Result<StatusCode> {
478 let (reply_tx, reply_rx) = oneshot::channel();
479 self.send_cmd(Cmd::RestoreSession {
480 token: token.to_string(),
481 reply: reply_tx,
482 })?;
483 await_reply(reply_rx).await?
484 }
485
486 /// Snapshot of the most recently cached session token, if any.
487 ///
488 /// Populated by [`get_session_token`](Self::get_session_token)
489 /// and by the periodic refresh loop. Returns `None` before the
490 /// first successful fetch.
491 pub fn session_token(&self) -> Option<String> {
492 self.last_token.read().ok().and_then(|g| g.clone())
493 }
494
495 /// Count of `GetSessionToken` RPCs the background refresh helper
496 /// has fired since this handle was created.
497 ///
498 /// Bumped by the driver only when the refresh tick runs against
499 /// a live pipe — ticks that fire while the state is
500 /// `ConnectionLost` / `SessionExpired` / `Disconnected` are
501 /// skipped and don't count. Useful for observing that the
502 /// refresh loop is paused while the transport is down, or as a
503 /// lightweight liveness metric.
504 pub fn session_refresh_count(&self) -> u64 {
505 self.refresh_count.load(Ordering::Relaxed)
506 }
507
508 /// Fetch the server's parameter-tree hash — useful for cheap
509 /// change detection. A non-zero return on a populated server
510 /// signals the caller can skip a full [`request_parameter_tree`]
511 /// if the hash matches what they cached previously.
512 ///
513 /// [`request_parameter_tree`]: Self::request_parameter_tree
514 pub async fn get_parameter_tree_hash(&self) -> Result<u32> {
515 let (reply_tx, reply_rx) = oneshot::channel();
516 self.send_cmd(Cmd::GetParameterTreeHash { reply: reply_tx })?;
517 await_reply(reply_rx).await?
518 }
519
520 /// Convenience: `Request::new()` + [`connect`](Self::connect) in
521 /// one call. Useful for the "I just want a connected client"
522 /// entry path.
523 ///
524 /// ```no_run
525 /// # async fn demo() -> motorcortex_rust::Result<()> {
526 /// use motorcortex_rust::{ConnectionOptions, core::Request};
527 /// let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);
528 /// let req = Request::connect_to("wss://127.0.0.1:5568", opts).await?;
529 /// req.request_parameter_tree().await?;
530 /// # Ok(()) }
531 /// ```
532 pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self> {
533 let req = Self::new();
534 req.connect(url, opts).await?;
535 Ok(req)
536 }
537
538 /// Shared helper: local tree lookup with a clear
539 /// `ParameterNotFound` error when the path isn't cached.
540 fn data_type_of(&self, path: &str) -> Result<u32> {
541 self.tree
542 .read()
543 .map_err(|_| MotorcortexError::Decode("parameter tree lock poisoned".into()))?
544 .get_parameter_data_type(path)
545 .ok_or_else(|| MotorcortexError::ParameterNotFound(path.to_string()))
546 }
547
548 fn send_cmd(&self, cmd: Cmd) -> Result<()> {
549 self.tx
550 .send(cmd)
551 .map_err(|_| MotorcortexError::Connection("driver thread is gone".into()))
552 }
553}
554
555impl Default for Request {
556 fn default() -> Self {
557 Self::new()
558 }
559}
560
561impl Clone for Request {
562 fn clone(&self) -> Self {
563 Self {
564 tx: self.tx.clone(),
565 state: self.state.clone(),
566 tree: Arc::clone(&self.tree),
567 last_token: Arc::clone(&self.last_token),
568 refresh_count: Arc::clone(&self.refresh_count),
569 }
570 }
571}
572
573#[cfg(test)]
574mod tests {
575 use super::*;
576
577 #[test]
578 fn new_starts_disconnected() {
579 let req = Request::new();
580 assert_eq!(*req.state().borrow(), ConnectionState::Disconnected);
581 }
582
583 #[test]
584 fn clone_shares_the_same_state_watch() {
585 let a = Request::new();
586 let b = a.clone();
587 // Both handles should observe the same initial value, because
588 // the watch channel is shared (clone clones a Receiver).
589 assert_eq!(*a.state().borrow(), *b.state().borrow());
590 }
591
592 #[test]
593 fn default_is_equivalent_to_new() {
594 let req = Request::default();
595 assert_eq!(*req.state().borrow(), ConnectionState::Disconnected);
596 }
597
598 #[test]
599 fn dropping_handle_does_not_panic() {
600 // When the last handle drops, the mpsc channel closes and the
601 // driver exits cleanly via blocking_recv → None. We can't
602 // directly observe the thread join without tracking the
603 // JoinHandle, but we can at least confirm dropping is safe.
604 let req = Request::new();
605 drop(req);
606 }
607
608 #[tokio::test]
609 async fn disconnect_without_connect_is_ok() {
610 // ConnectionManager::disconnect is idempotent — both sock and
611 // tls_cfg are None, so nothing happens at the NNG layer.
612 let req = Request::new();
613 req.disconnect().await.expect("no-op disconnect must succeed");
614 }
615
616 #[tokio::test]
617 async fn login_without_connect_errors_with_connection_variant() {
618 let req = Request::new();
619 let err = req
620 .login("u", "p")
621 .await
622 .expect_err("no socket → Connection error");
623 assert!(matches!(err, MotorcortexError::Connection(_)));
624 }
625
626 #[tokio::test]
627 async fn logout_without_connect_errors_with_connection_variant() {
628 let req = Request::new();
629 let err = req
630 .logout()
631 .await
632 .expect_err("no socket → Connection error");
633 assert!(matches!(err, MotorcortexError::Connection(_)));
634 }
635
636 #[tokio::test]
637 async fn get_session_token_without_connect_errors() {
638 let req = Request::new();
639 let err = req
640 .get_session_token()
641 .await
642 .expect_err("no socket → Connection error");
643 assert!(matches!(err, MotorcortexError::Connection(_)));
644 }
645
646 #[tokio::test]
647 async fn restore_session_without_connect_errors() {
648 let req = Request::new();
649 let err = req
650 .restore_session("dummy-token")
651 .await
652 .expect_err("no socket → Connection error");
653 assert!(matches!(err, MotorcortexError::Connection(_)));
654 }
655
656 #[test]
657 fn session_token_is_none_on_fresh_handle() {
658 let req = Request::new();
659 assert!(req.session_token().is_none());
660 }
661
662 #[tokio::test]
663 async fn get_parameter_tree_hash_without_connect_errors() {
664 let req = Request::new();
665 let err = req
666 .get_parameter_tree_hash()
667 .await
668 .expect_err("no socket → Connection error");
669 assert!(matches!(err, MotorcortexError::Connection(_)));
670 }
671
672 #[tokio::test]
673 async fn create_group_without_connect_errors() {
674 let req = Request::new();
675 let err = req
676 .create_group("root/x", "g", 1)
677 .await
678 .expect_err("no socket → Connection error");
679 assert!(matches!(err, MotorcortexError::Connection(_)));
680 }
681
682 #[tokio::test]
683 async fn remove_group_without_connect_errors() {
684 let req = Request::new();
685 let err = req
686 .remove_group("g")
687 .await
688 .expect_err("no socket → Connection error");
689 assert!(matches!(err, MotorcortexError::Connection(_)));
690 }
691
692 #[tokio::test]
693 async fn set_parameters_on_empty_tree_returns_parameter_not_found() {
694 let req = Request::new();
695 let err = req
696 .set_parameters(&["root/missing"], (1.0f64,))
697 .await
698 .expect_err("empty tree → ParameterNotFound");
699 assert!(matches!(err, MotorcortexError::ParameterNotFound(ref p) if p == "root/missing"));
700 }
701
702 #[tokio::test]
703 async fn get_parameters_on_empty_tree_returns_parameter_not_found() {
704 let req = Request::new();
705 let err = req
706 .get_parameters::<(f64,)>(&["root/missing"])
707 .await
708 .expect_err("empty tree → ParameterNotFound");
709 assert!(matches!(err, MotorcortexError::ParameterNotFound(ref p) if p == "root/missing"));
710 }
711
712 #[tokio::test]
713 async fn set_parameter_on_empty_tree_returns_parameter_not_found() {
714 let req = Request::new();
715 let err = req
716 .set_parameter("root/missing", 1.0f64)
717 .await
718 .expect_err("empty tree → ParameterNotFound");
719 assert!(matches!(err, MotorcortexError::ParameterNotFound(ref p) if p == "root/missing"));
720 }
721
722 #[tokio::test]
723 async fn get_parameter_on_empty_tree_returns_parameter_not_found() {
724 let req = Request::new();
725 let err = req
726 .get_parameter::<f64>("root/missing")
727 .await
728 .expect_err("empty tree → ParameterNotFound");
729 assert!(matches!(err, MotorcortexError::ParameterNotFound(ref p) if p == "root/missing"));
730 }
731
732 #[tokio::test]
733 async fn request_parameter_tree_without_connect_errors() {
734 let req = Request::new();
735 let err = req
736 .request_parameter_tree()
737 .await
738 .expect_err("no socket → Connection error");
739 assert!(matches!(err, MotorcortexError::Connection(_)));
740 }
741
742 #[test]
743 fn parameter_tree_is_empty_on_fresh_handle() {
744 let req = Request::new();
745 let tree = req.parameter_tree();
746 let guard = tree.read().unwrap();
747 assert!(guard.get_parameter_info("anything").is_none());
748 }
749
750 #[test]
751 fn parameter_tree_is_shared_across_clones() {
752 // Both handles hold the same Arc<RwLock<ParameterTree>>.
753 let a = Request::new();
754 let b = a.clone();
755 assert!(Arc::ptr_eq(&a.parameter_tree(), &b.parameter_tree()));
756 }
757
758 #[tokio::test]
759 async fn state_observer_sees_disconnect() {
760 // After a disconnect the watch should still read Disconnected.
761 // `state.changed().await` fires only on a *transition*, and our
762 // initial value is already Disconnected — but calling
763 // disconnect() flushes a fresh publish through the driver, so
764 // the value is still correct.
765 let req = Request::new();
766 let mut state = req.state();
767 req.disconnect().await.unwrap();
768 assert_eq!(*state.borrow_and_update(), ConnectionState::Disconnected);
769 }
770}