Skip to main content

motorcortex_rust/core/
request.rs

//! The async-first `Request` handle.
//!
//! Cheap to clone (carries a `mpsc::UnboundedSender`); every RPC
//! method is an `async fn` that resolves once the driver thread
//! serves the command. See [`crate::core::driver`] for the loop on
//! the other side of the channel.
7
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::{Arc, RwLock};
10use std::thread;
11
12use tokio::sync::{mpsc, oneshot, watch};
13
14use crate::client::{ParameterTree, Parameters};
15use crate::connection::{ConnectionOptions, PipeEvent};
16use crate::core::driver::run_request_driver;
17use crate::core::state::ConnectionState;
18use crate::core::util::await_reply;
19use crate::error::{MotorcortexError, Result};
20use crate::msg::{
21    CreateGroupMsg, GetParameterListMsg, GetParameterMsg, GroupStatusMsg, ParameterListMsg,
22    SetParameterListMsg, SetParameterMsg, StatusCode,
23};
24use crate::parameter_value::{
25    GetParameterTuple, GetParameterValue, SetParameterTuple, SetParameterValue,
26    decode_parameter_value, encode_parameter_value,
27};
28
/// Commands the `Request` handle sends to its driver thread.
///
/// Every RPC-style variant carries an `oneshot::Sender` for the
/// driver to hand back the result — the user `.await`s the matching
/// `Receiver`. [`Cmd::RefreshTokenTick`] and [`Cmd::Pipe`] are the
/// exceptions: they carry no reply channel.
pub(crate) enum Cmd {
    /// Sent by `Request::connect`: open the socket and dial `url`
    /// using `opts`.
    Connect {
        url: String,
        opts: ConnectionOptions,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `Request::disconnect`: close the socket.
    Disconnect {
        reply: oneshot::Sender<Result<()>>,
    },
    /// Sent by `Request::login`: authenticate with the given
    /// credentials; the reply is the server's status code.
    Login {
        user: String,
        pass: String,
        reply: oneshot::Sender<Result<StatusCode>>,
    },
    /// Sent by `Request::logout`: drop the current session.
    Logout {
        reply: oneshot::Sender<Result<StatusCode>>,
    },
    /// Sent by `Request::request_parameter_tree`: fetch the
    /// parameter tree into the shared cache.
    RequestParameterTree {
        reply: oneshot::Sender<Result<StatusCode>>,
    },
    /// Sent by `Request::get_parameter`: read one parameter; the
    /// reply is the raw value bytes, decoded on the caller side.
    GetParameter {
        path: String,
        reply: oneshot::Sender<Result<Vec<u8>>>,
    },
    /// Sent by `Request::set_parameter`: write one parameter from
    /// already-encoded value bytes.
    SetParameter {
        path: String,
        value: Vec<u8>,
        reply: oneshot::Sender<Result<StatusCode>>,
    },
    /// Sent by `Request::get_parameters`: batch read in one RPC.
    GetParameters {
        msg: GetParameterListMsg,
        reply: oneshot::Sender<Result<ParameterListMsg>>,
    },
    /// Sent by `Request::set_parameters`: batch write in one RPC.
    SetParameters {
        msg: SetParameterListMsg,
        reply: oneshot::Sender<Result<StatusCode>>,
    },
    /// Sent by `Request::create_group`: create a server-side
    /// subscription group.
    CreateGroup {
        msg: CreateGroupMsg,
        reply: oneshot::Sender<Result<GroupStatusMsg>>,
    },
    /// Sent by `Request::remove_group`: remove a subscription
    /// group by alias.
    RemoveGroup {
        alias: String,
        reply: oneshot::Sender<Result<StatusCode>>,
    },
    /// Sent by `Request::get_parameter_tree_hash`: fetch the
    /// server's parameter-tree hash.
    GetParameterTreeHash {
        reply: oneshot::Sender<Result<u32>>,
    },
    /// Fetch a fresh session token from the server. On success,
    /// updates the driver's shared `last_token` cache (which powers
    /// the automatic restore on reconnect) *and* returns the token
    /// to the caller.
    GetSessionToken {
        reply: oneshot::Sender<Result<String>>,
    },
    /// Restore a previously-issued session without re-login. The
    /// caller supplies the token explicitly; the driver uses this
    /// path internally on reconnect too.
    RestoreSession {
        token: String,
        reply: oneshot::Sender<Result<StatusCode>>,
    },
    /// Background token-refresh tick. Does a GetSessionToken RPC,
    /// stores the result in the driver's `last_token` cache, and
    /// discards the reply. Driven by the refresh helper thread.
    RefreshTokenTick,
    /// Forwarded from the NNG pipe-notify callback via an
    /// `Arc<dyn Fn(PipeEvent)>` installed on the connection. Drains
    /// through the same command queue as user RPCs so state
    /// transitions don't race with requests in flight.
    Pipe(PipeEvent),
}
105
/// Async handle for request/reply RPCs.
///
/// Cloning a `Request` gives a second handle that multiplexes onto the
/// same driver thread — commands serialise through the driver, so the
/// NNG Req/Rep ordering invariant is enforced in the type system
/// without any user-visible `Mutex`.
pub struct Request {
    /// Sending half of the unbounded command queue; the receiving
    /// half is handed to the driver thread in [`Request::new`].
    tx: mpsc::UnboundedSender<Cmd>,
    /// Receiving half of the connection-state watch channel; the
    /// sending half is handed to the driver thread.
    state: watch::Receiver<ConnectionState>,
    /// Local parameter-tree cache, shared with the driver thread and
    /// with every clone of this handle.
    tree: Arc<RwLock<ParameterTree>>,
    /// Latest session token from the most recent `GetSessionToken`
    /// RPC (either user-invoked or via the refresh helper). Shared
    /// with the driver so both the user side (via
    /// [`session_token`](Self::session_token)) and the reconnect
    /// handler see the same value.
    last_token: Arc<RwLock<Option<String>>>,
    /// Counter the driver bumps each time the background refresh
    /// helper actually fires a `GetSessionToken` RPC (i.e. the
    /// connection was live at tick time). Lets callers — and the
    /// integration tests — observe that the refresh is paused while
    /// the pipe is down.
    refresh_count: Arc<AtomicU64>,
}
129
130impl Request {
131    /// Create a new handle and spawn its driver thread.
132    ///
133    /// The handle starts in [`ConnectionState::Disconnected`]; call
134    /// [`connect`](Self::connect) to open the socket.
135    ///
136    /// ```
137    /// use motorcortex_rust::core::{ConnectionState, Request};
138    /// let req = Request::new();
139    /// assert_eq!(*req.state().borrow(), ConnectionState::Disconnected);
140    /// ```
141    pub fn new() -> Self {
142        let (tx, rx) = mpsc::unbounded_channel();
143        let (state_tx, state_rx) = watch::channel(ConnectionState::Disconnected);
144        let tree = Arc::new(RwLock::new(ParameterTree::new()));
145        let last_token: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
146        let refresh_count = Arc::new(AtomicU64::new(0));
147        let tree_for_driver = Arc::clone(&tree);
148        let token_for_driver = Arc::clone(&last_token);
149        let count_for_driver = Arc::clone(&refresh_count);
150        let tx_for_driver = tx.clone();
151        thread::Builder::new()
152            .name("mcx-request-driver".into())
153            .spawn(move || {
154                run_request_driver(
155                    tx_for_driver,
156                    rx,
157                    state_tx,
158                    tree_for_driver,
159                    token_for_driver,
160                    count_for_driver,
161                )
162            })
163            .expect("spawning the driver thread must succeed on any OS we target");
164        Self {
165            tx,
166            state: state_rx,
167            tree,
168            last_token,
169            refresh_count,
170        }
171    }
172
173    /// Subscribe to connection-state transitions.
174    ///
175    /// Returns a `watch::Receiver`; consumers can `state.changed().await`
176    /// or `*state.borrow()` for the current value.
177    pub fn state(&self) -> watch::Receiver<ConnectionState> {
178        self.state.clone()
179    }
180
181    /// Open the socket and dial `url`.
182    ///
183    /// ```no_run
184    /// # async fn demo() -> motorcortex_rust::Result<()> {
185    /// use motorcortex_rust::{ConnectionOptions, core::Request};
186    /// let req = Request::new();
187    /// let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);
188    /// req.connect("wss://127.0.0.1:5568", opts).await?;
189    /// # Ok(()) }
190    /// ```
191    pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()> {
192        let (reply_tx, reply_rx) = oneshot::channel();
193        self.send_cmd(Cmd::Connect {
194            url: url.to_string(),
195            opts,
196            reply: reply_tx,
197        })?;
198        await_reply(reply_rx).await?
199    }
200
201    /// Close the socket. Subsequent RPCs will error with
202    /// [`MotorcortexError::Connection`] until `connect` is called again.
203    pub async fn disconnect(&self) -> Result<()> {
204        let (reply_tx, reply_rx) = oneshot::channel();
205        self.send_cmd(Cmd::Disconnect { reply: reply_tx })?;
206        await_reply(reply_rx).await?
207    }
208
209    /// Authenticate with the server.
210    ///
211    /// Returns the server's [`StatusCode`] — `Ok` on success,
212    /// `WrongPassword` / other variants on rejection. Transport-level
213    /// failures surface as [`MotorcortexError`].
214    pub async fn login(&self, user: &str, pass: &str) -> Result<StatusCode> {
215        let (reply_tx, reply_rx) = oneshot::channel();
216        self.send_cmd(Cmd::Login {
217            user: user.to_string(),
218            pass: pass.to_string(),
219            reply: reply_tx,
220        })?;
221        await_reply(reply_rx).await?
222    }
223
224    /// Drop the current session on the server.
225    pub async fn logout(&self) -> Result<StatusCode> {
226        let (reply_tx, reply_rx) = oneshot::channel();
227        self.send_cmd(Cmd::Logout { reply: reply_tx })?;
228        await_reply(reply_rx).await?
229    }
230
231    /// Fetch the parameter tree from the server and store it in the
232    /// shared cache accessible via [`parameter_tree`](Self::parameter_tree).
233    ///
234    /// Returns the server's [`StatusCode`]. The cache is only updated
235    /// on `StatusCode::Ok`; a non-OK reply leaves the previous cache
236    /// intact.
237    pub async fn request_parameter_tree(&self) -> Result<StatusCode> {
238        let (reply_tx, reply_rx) = oneshot::channel();
239        self.send_cmd(Cmd::RequestParameterTree { reply: reply_tx })?;
240        await_reply(reply_rx).await?
241    }
242
243    /// Shared read handle to the local parameter-tree cache. Every
244    /// cloned [`Request`] sees the same tree; reads are `RwLock::read`
245    /// (cheap, no channel round-trip). Populated by
246    /// [`request_parameter_tree`](Self::request_parameter_tree).
247    pub fn parameter_tree(&self) -> Arc<RwLock<ParameterTree>> {
248        Arc::clone(&self.tree)
249    }
250
251    /// Read a single parameter. The caller-specified `V` is the Rust
252    /// type the server value should be converted to (see
253    /// [`GetParameterValue`]).
254    ///
255    /// Returns [`MotorcortexError::ParameterNotFound`] if `path` is
256    /// unknown locally — call [`request_parameter_tree`](Self::request_parameter_tree)
257    /// first.
258    ///
259    /// ```no_run
260    /// # async fn demo(req: motorcortex_rust::core::Request) -> motorcortex_rust::Result<()> {
261    /// // Same path, three different Rust types — the server value is
262    /// // converted per call (lossy casts allowed).
263    /// let as_double: f64    = req.get_parameter("root/Control/dummyDouble").await?;
264    /// let as_int:    i64    = req.get_parameter("root/Control/dummyDouble").await?;
265    /// let as_text:   String = req.get_parameter("root/Control/dummyDouble").await?;
266    /// # Ok(()) }
267    /// ```
268    pub async fn get_parameter<V>(&self, path: &str) -> Result<V>
269    where
270        V: GetParameterValue + Default,
271    {
272        let data_type = self.data_type_of(path)?;
273
274        let (reply_tx, reply_rx) = oneshot::channel();
275        self.send_cmd(Cmd::GetParameter {
276            path: path.to_string(),
277            reply: reply_tx,
278        })?;
279        let value_bytes = await_reply(reply_rx).await??;
280        Ok(decode_parameter_value::<V>(data_type, &value_bytes))
281    }
282
283    /// Write a single parameter. Returns the server's [`StatusCode`].
284    ///
285    /// ```no_run
286    /// # async fn demo(req: motorcortex_rust::core::Request) -> motorcortex_rust::Result<()> {
287    /// // Scalar.
288    /// req.set_parameter("root/Control/dummyDouble", 2.345_f64).await?;
289    /// // Fixed-size array.
290    /// req.set_parameter("root/Control/dummyDoubleVec", [1.0, 2.0, 3.0]).await?;
291    /// // Dynamic Vec.
292    /// req.set_parameter("root/Control/dummyDoubleVec", vec![1.0, 2.0]).await?;
293    /// # Ok(()) }
294    /// ```
295    pub async fn set_parameter<V>(&self, path: &str, value: V) -> Result<StatusCode>
296    where
297        V: SetParameterValue,
298    {
299        let data_type = self.data_type_of(path)?;
300        let value_bytes = encode_parameter_value(data_type, &value);
301
302        let (reply_tx, reply_rx) = oneshot::channel();
303        self.send_cmd(Cmd::SetParameter {
304            path: path.to_string(),
305            value: value_bytes,
306            reply: reply_tx,
307        })?;
308        await_reply(reply_rx).await?
309    }
310
311    /// Read a batch of parameters in one RPC. The generic `T` is a
312    /// tuple type like `(bool, f64, i32)` whose arity matches
313    /// `paths.len()`. Each element is decoded into its position using
314    /// the dtype recorded in the local tree cache.
315    ///
316    /// ```no_run
317    /// # async fn demo(req: motorcortex_rust::core::Request) -> motorcortex_rust::Result<()> {
318    /// let (b, d, i): (bool, f64, i32) = req.get_parameters(&[
319    ///     "root/Control/dummyBool",
320    ///     "root/Control/dummyDouble",
321    ///     "root/Control/dummyInt32",
322    /// ]).await?;
323    /// # Ok(()) }
324    /// ```
325    pub async fn get_parameters<T>(&self, paths: &[&str]) -> Result<T>
326    where
327        T: GetParameterTuple,
328    {
329        // Resolve every dtype up-front so the first missing path is
330        // reported before we even build the request message.
331        let dtypes: Vec<u32> = paths
332            .iter()
333            .map(|p| self.data_type_of(p))
334            .collect::<Result<_>>()?;
335
336        let msg = GetParameterListMsg {
337            header: None,
338            params: paths
339                .iter()
340                .map(|p| GetParameterMsg {
341                    header: None,
342                    path: p.to_string(),
343                })
344                .collect(),
345        };
346
347        let (reply_tx, reply_rx) = oneshot::channel();
348        self.send_cmd(Cmd::GetParameters {
349            msg,
350            reply: reply_tx,
351        })?;
352        let reply = await_reply(reply_rx).await??;
353
354        // Use the dtypes the client looked up locally, not whatever
355        // the server echoed back. Matches legacy semantics (dtype is
356        // authoritative client-side — it's what the caller used when
357        // picking `T`'s element types).
358        let iter = reply
359            .params
360            .iter()
361            .zip(dtypes.iter())
362            .map(|(param, dt)| (dt, param.value.as_slice()));
363        T::get_parameters(iter).map_err(MotorcortexError::Decode)
364    }
365
366    /// Write a batch of parameters in one RPC. `values` is a tuple
367    /// (or single-element tuple) whose arity matches `paths.len()`.
368    /// Each element is encoded against the dtype recorded in the
369    /// local tree cache.
370    pub async fn set_parameters<T>(&self, paths: &[&str], values: T) -> Result<StatusCode>
371    where
372        T: SetParameterTuple,
373    {
374        let mut params = Vec::with_capacity(paths.len());
375        for (i, path) in paths.iter().enumerate() {
376            let data_type = self.data_type_of(path)?;
377            let value = values
378                .get_tuple_element(i, data_type)
379                .map_err(MotorcortexError::Encode)?;
380            params.push(SetParameterMsg {
381                header: None,
382                offset: None,
383                path: path.to_string(),
384                value,
385            });
386        }
387
388        let msg = SetParameterListMsg {
389            header: None,
390            params,
391        };
392
393        let (reply_tx, reply_rx) = oneshot::channel();
394        self.send_cmd(Cmd::SetParameters {
395            msg,
396            reply: reply_tx,
397        })?;
398        await_reply(reply_rx).await?
399    }
400
401    /// Create a server-side subscription group. Returns the
402    /// [`GroupStatusMsg`] the subscribe-side code uses as the
403    /// group descriptor.
404    ///
405    /// `paths` accepts anything implementing [`Parameters`] — a
406    /// string literal, a `Vec<String>`, an array of `&str`, etc.
407    pub async fn create_group<I>(
408        &self,
409        paths: I,
410        alias: &str,
411        frequency_divider: u32,
412    ) -> Result<GroupStatusMsg>
413    where
414        I: Parameters,
415    {
416        let msg = CreateGroupMsg {
417            header: None,
418            frq_divider: frequency_divider,
419            alias: alias.to_string(),
420            paths: paths.into_vec(),
421        };
422        let (reply_tx, reply_rx) = oneshot::channel();
423        self.send_cmd(Cmd::CreateGroup {
424            msg,
425            reply: reply_tx,
426        })?;
427        await_reply(reply_rx).await?
428    }
429
430    /// Remove a previously-created subscription group by alias.
431    /// Returns the server's [`StatusCode`] unchanged — `Ok` on
432    /// success, `Failed` (or similar) if the group wasn't there.
433    /// Transport / decode failures surface as
434    /// [`MotorcortexError`].
435    ///
436    /// The crate-wide rule: RPCs that return `Result<StatusCode>`
437    /// never promote a non-OK server reply into `Err`. Branch on
438    /// the returned code if you care which way the request went.
439    pub async fn remove_group(&self, alias: &str) -> Result<StatusCode> {
440        let (reply_tx, reply_rx) = oneshot::channel();
441        self.send_cmd(Cmd::RemoveGroup {
442            alias: alias.to_string(),
443            reply: reply_tx,
444        })?;
445        await_reply(reply_rx).await?
446    }
447
448    /// Fetch a fresh session token from the server.
449    ///
450    /// On success the token is cached in the driver and returned to
451    /// the caller. Callers that want to persist a session across
452    /// process restarts can stash the returned string and hand it
453    /// back to a fresh `Request` via [`restore_session`](Self::restore_session).
454    ///
455    /// While a connection is live, the driver also refreshes this
456    /// token periodically in the background (see
457    /// [`ConnectionOptions::token_refresh_interval`]) so the cache
458    /// stays warm for the automatic reconnect path.
459    pub async fn get_session_token(&self) -> Result<String> {
460        let (reply_tx, reply_rx) = oneshot::channel();
461        self.send_cmd(Cmd::GetSessionToken { reply: reply_tx })?;
462        await_reply(reply_rx).await?
463    }
464
465    /// Restore a previously-issued session by supplying the token.
466    /// Returns the server's [`StatusCode`]:
467    ///
468    /// - `Ok` / `ReadOnlyMode` — the session was accepted, subsequent
469    ///   RPCs run under that identity.
470    /// - `PermissionDenied` / `Failed` — the token is stale or the
471    ///   server has lost its state.
472    ///
473    /// The driver calls this internally on reconnect using the token
474    /// stashed by the refresh loop; callers generally don't need to
475    /// invoke it explicitly unless they're recovering from a process
476    /// restart.
477    pub async fn restore_session(&self, token: &str) -> Result<StatusCode> {
478        let (reply_tx, reply_rx) = oneshot::channel();
479        self.send_cmd(Cmd::RestoreSession {
480            token: token.to_string(),
481            reply: reply_tx,
482        })?;
483        await_reply(reply_rx).await?
484    }
485
486    /// Snapshot of the most recently cached session token, if any.
487    ///
488    /// Populated by [`get_session_token`](Self::get_session_token)
489    /// and by the periodic refresh loop. Returns `None` before the
490    /// first successful fetch.
491    pub fn session_token(&self) -> Option<String> {
492        self.last_token.read().ok().and_then(|g| g.clone())
493    }
494
495    /// Count of `GetSessionToken` RPCs the background refresh helper
496    /// has fired since this handle was created.
497    ///
498    /// Bumped by the driver only when the refresh tick runs against
499    /// a live pipe — ticks that fire while the state is
500    /// `ConnectionLost` / `SessionExpired` / `Disconnected` are
501    /// skipped and don't count. Useful for observing that the
502    /// refresh loop is paused while the transport is down, or as a
503    /// lightweight liveness metric.
504    pub fn session_refresh_count(&self) -> u64 {
505        self.refresh_count.load(Ordering::Relaxed)
506    }
507
508    /// Fetch the server's parameter-tree hash — useful for cheap
509    /// change detection. A non-zero return on a populated server
510    /// signals the caller can skip a full [`request_parameter_tree`]
511    /// if the hash matches what they cached previously.
512    ///
513    /// [`request_parameter_tree`]: Self::request_parameter_tree
514    pub async fn get_parameter_tree_hash(&self) -> Result<u32> {
515        let (reply_tx, reply_rx) = oneshot::channel();
516        self.send_cmd(Cmd::GetParameterTreeHash { reply: reply_tx })?;
517        await_reply(reply_rx).await?
518    }
519
520    /// Convenience: `Request::new()` + [`connect`](Self::connect) in
521    /// one call. Useful for the "I just want a connected client"
522    /// entry path.
523    ///
524    /// ```no_run
525    /// # async fn demo() -> motorcortex_rust::Result<()> {
526    /// use motorcortex_rust::{ConnectionOptions, core::Request};
527    /// let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);
528    /// let req = Request::connect_to("wss://127.0.0.1:5568", opts).await?;
529    /// req.request_parameter_tree().await?;
530    /// # Ok(()) }
531    /// ```
532    pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self> {
533        let req = Self::new();
534        req.connect(url, opts).await?;
535        Ok(req)
536    }
537
538    /// Shared helper: local tree lookup with a clear
539    /// `ParameterNotFound` error when the path isn't cached.
540    fn data_type_of(&self, path: &str) -> Result<u32> {
541        self.tree
542            .read()
543            .map_err(|_| MotorcortexError::Decode("parameter tree lock poisoned".into()))?
544            .get_parameter_data_type(path)
545            .ok_or_else(|| MotorcortexError::ParameterNotFound(path.to_string()))
546    }
547
548    fn send_cmd(&self, cmd: Cmd) -> Result<()> {
549        self.tx
550            .send(cmd)
551            .map_err(|_| MotorcortexError::Connection("driver thread is gone".into()))
552    }
553}
554
555impl Default for Request {
556    fn default() -> Self {
557        Self::new()
558    }
559}
560
561impl Clone for Request {
562    fn clone(&self) -> Self {
563        Self {
564            tx: self.tx.clone(),
565            state: self.state.clone(),
566            tree: Arc::clone(&self.tree),
567            last_token: Arc::clone(&self.last_token),
568            refresh_count: Arc::clone(&self.refresh_count),
569        }
570    }
571}
572
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that an RPC issued on a never-connected handle failed
    /// with the `Connection` variant.
    #[track_caller]
    fn assert_connection_error<T: std::fmt::Debug>(outcome: Result<T>) {
        let err = outcome.expect_err("no socket → Connection error");
        assert!(matches!(err, MotorcortexError::Connection(_)));
    }

    /// Checks that a lookup against the empty tree failed with
    /// `ParameterNotFound` naming `root/missing`.
    #[track_caller]
    fn assert_missing_path<T: std::fmt::Debug>(outcome: Result<T>) {
        let err = outcome.expect_err("empty tree → ParameterNotFound");
        assert!(matches!(err, MotorcortexError::ParameterNotFound(ref p) if p == "root/missing"));
    }

    #[test]
    fn new_starts_disconnected() {
        let handle = Request::new();
        assert_eq!(*handle.state().borrow(), ConnectionState::Disconnected);
    }

    #[test]
    fn clone_shares_the_same_state_watch() {
        let original = Request::new();
        let copy = original.clone();
        // Cloning clones the watch Receiver, so both handles read the
        // same initial value from the shared channel.
        assert_eq!(*original.state().borrow(), *copy.state().borrow());
    }

    #[test]
    fn default_is_equivalent_to_new() {
        let handle = Request::default();
        assert_eq!(*handle.state().borrow(), ConnectionState::Disconnected);
    }

    #[test]
    fn dropping_handle_does_not_panic() {
        // Dropping the last handle closes the mpsc channel, and the
        // driver exits cleanly via blocking_recv → None. The thread
        // join is not observable without the JoinHandle, so this only
        // confirms that the drop itself is safe.
        let handle = Request::new();
        drop(handle);
    }

    #[tokio::test]
    async fn disconnect_without_connect_is_ok() {
        // ConnectionManager::disconnect is idempotent — with both sock
        // and tls_cfg at None nothing happens at the NNG layer.
        let handle = Request::new();
        handle
            .disconnect()
            .await
            .expect("no-op disconnect must succeed");
    }

    #[tokio::test]
    async fn login_without_connect_errors_with_connection_variant() {
        let handle = Request::new();
        assert_connection_error(handle.login("u", "p").await);
    }

    #[tokio::test]
    async fn logout_without_connect_errors_with_connection_variant() {
        let handle = Request::new();
        assert_connection_error(handle.logout().await);
    }

    #[tokio::test]
    async fn get_session_token_without_connect_errors() {
        let handle = Request::new();
        assert_connection_error(handle.get_session_token().await);
    }

    #[tokio::test]
    async fn restore_session_without_connect_errors() {
        let handle = Request::new();
        assert_connection_error(handle.restore_session("dummy-token").await);
    }

    #[test]
    fn session_token_is_none_on_fresh_handle() {
        let handle = Request::new();
        assert!(handle.session_token().is_none());
    }

    #[tokio::test]
    async fn get_parameter_tree_hash_without_connect_errors() {
        let handle = Request::new();
        assert_connection_error(handle.get_parameter_tree_hash().await);
    }

    #[tokio::test]
    async fn create_group_without_connect_errors() {
        let handle = Request::new();
        assert_connection_error(handle.create_group("root/x", "g", 1).await);
    }

    #[tokio::test]
    async fn remove_group_without_connect_errors() {
        let handle = Request::new();
        assert_connection_error(handle.remove_group("g").await);
    }

    #[tokio::test]
    async fn set_parameters_on_empty_tree_returns_parameter_not_found() {
        let handle = Request::new();
        assert_missing_path(handle.set_parameters(&["root/missing"], (1.0f64,)).await);
    }

    #[tokio::test]
    async fn get_parameters_on_empty_tree_returns_parameter_not_found() {
        let handle = Request::new();
        assert_missing_path(handle.get_parameters::<(f64,)>(&["root/missing"]).await);
    }

    #[tokio::test]
    async fn set_parameter_on_empty_tree_returns_parameter_not_found() {
        let handle = Request::new();
        assert_missing_path(handle.set_parameter("root/missing", 1.0f64).await);
    }

    #[tokio::test]
    async fn get_parameter_on_empty_tree_returns_parameter_not_found() {
        let handle = Request::new();
        assert_missing_path(handle.get_parameter::<f64>("root/missing").await);
    }

    #[tokio::test]
    async fn request_parameter_tree_without_connect_errors() {
        let handle = Request::new();
        assert_connection_error(handle.request_parameter_tree().await);
    }

    #[test]
    fn parameter_tree_is_empty_on_fresh_handle() {
        let handle = Request::new();
        let cache = handle.parameter_tree();
        let guard = cache.read().unwrap();
        assert!(guard.get_parameter_info("anything").is_none());
    }

    #[test]
    fn parameter_tree_is_shared_across_clones() {
        // Both handles hold the same Arc<RwLock<ParameterTree>>.
        let original = Request::new();
        let copy = original.clone();
        assert!(Arc::ptr_eq(
            &original.parameter_tree(),
            &copy.parameter_tree()
        ));
    }

    #[tokio::test]
    async fn state_observer_sees_disconnect() {
        // The initial value is already Disconnected, and
        // `state.changed().await` fires only on a *transition*, so
        // this does not wait for a change. Calling disconnect() still
        // round-trips through the driver, after which the watch must
        // read Disconnected.
        let handle = Request::new();
        let mut observer = handle.state();
        handle.disconnect().await.unwrap();
        assert_eq!(
            *observer.borrow_and_update(),
            ConnectionState::Disconnected
        );
    }
}