motorcortex-rust 0.5.0

Motorcortex Rust: a Rust client for the Motorcortex Core real-time control system (async + blocking).
Documentation
//! The async-first `Subscribe` handle.
//!
//! Same actor pattern as [`crate::core::Request`]: a thin
//! `mpsc::UnboundedSender` into a dedicated driver thread. The driver
//! owns the NNG SUB socket; commands (`SubCmd`) serialise through it.
//!
//! Subscriptions + receive-loop plumbing land in the next commits; this
//! module scaffolds connect / disconnect first so the rest can build on
//! the same shape.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

use tokio::sync::{mpsc, oneshot, watch};

use crate::client::Parameters;
use crate::connection::{ConnectionOptions, PipeEvent};
use crate::core::driver::run_subscribe_driver;
use crate::core::request::Request;
use crate::core::state::ConnectionState;
use crate::core::subscription::Subscription;
use crate::core::util::await_reply;
use crate::error::{MotorcortexError, Result};
use crate::msg::{GroupStatusMsg, StatusCode};

pub(crate) enum SubCmd {
    Connect {
        url: String,
        opts: ConnectionOptions,
        reply: oneshot::Sender<Result<()>>,
    },
    Disconnect {
        reply: oneshot::Sender<Result<()>>,
    },
    Subscribe {
        group_msg: GroupStatusMsg,
        fdiv: u32,
        reply: oneshot::Sender<Result<Subscription>>,
    },
    Unsubscribe {
        id: u32,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Re-register every pre-existing subscription with NNG using
    /// freshly-created server groups. Carries `(old_id, new_group_msg)`
    /// pairs — the driver unsubscribes the old NNG filter, subscribes
    /// the new id, and [rebinds](crate::core::Subscription::rebind)
    /// each `Subscription` in place so outstanding consumer handles
    /// stay valid.
    ApplyResubscribe {
        results: Vec<(u32, GroupStatusMsg)>,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Forwarded from the NNG pipe-notify callback.
    Pipe(PipeEvent),
}

/// Async handle for the pub/sub channel.
///
/// Cloning gives you another handle backed by the same driver, so
/// every clone observes the same connection state.
pub struct Subscribe {
    tx: mpsc::UnboundedSender<SubCmd>,
    state: watch::Receiver<ConnectionState>,
    /// Shared with the driver: keyed by current server id, values are
    /// live [`Subscription`] handles. Used by
    /// [`resubscribe`](Self::resubscribe) to snapshot the active
    /// groups without a driver round-trip.
    subscriptions: Arc<RwLock<HashMap<u32, Subscription>>>,
}

impl Subscribe {
    /// Create a new handle and spawn its driver thread.
    pub fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        let (state_tx, state_rx) = watch::channel(ConnectionState::Disconnected);
        let subscriptions: Arc<RwLock<HashMap<u32, Subscription>>> =
            Arc::new(RwLock::new(HashMap::new()));
        let subs_for_driver = Arc::clone(&subscriptions);
        let tx_for_driver = tx.clone();
        thread::Builder::new()
            .name("mcx-subscribe-driver".into())
            .spawn(move || {
                run_subscribe_driver(tx_for_driver, rx, state_tx, subs_for_driver)
            })
            .expect("spawning the subscribe driver must succeed on any OS we target");
        Self {
            tx,
            state: state_rx,
            subscriptions,
        }
    }

    pub fn state(&self) -> watch::Receiver<ConnectionState> {
        self.state.clone()
    }

    pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.send_cmd(SubCmd::Connect {
            url: url.to_string(),
            opts,
            reply: reply_tx,
        })?;
        await_reply(reply_rx).await?
    }

    pub async fn disconnect(&self) -> Result<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.send_cmd(SubCmd::Disconnect { reply: reply_tx })?;
        await_reply(reply_rx).await?
    }

    /// Convenience: `Subscribe::new()` + [`connect`](Self::connect) in
    /// one call.
    pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self> {
        let sub = Self::new();
        sub.connect(url, opts).await?;
        Ok(sub)
    }

    /// Create a subscription group on the server + wire it up
    /// locally so the receive thread starts pushing payloads into
    /// the returned [`Subscription`] handle.
    ///
    /// The `req` handle is used only to issue the `CreateGroupMsg`
    /// RPC; it's not held after this call returns.
    ///
    /// ```no_run
    /// # async fn demo(
    /// #     req: motorcortex_rust::core::Request,
    /// #     sub: motorcortex_rust::core::Subscribe,
    /// # ) -> motorcortex_rust::Result<()> {
    /// let subscription = sub.subscribe(
    ///     &req,
    ///     ["root/Control/dummyDouble"],
    ///     "my-group",
    ///     10, // fdiv: every 10th cycle
    /// ).await?;
    /// // Now use subscription.notify / latest / stream.
    /// # Ok(()) }
    /// ```
    pub async fn subscribe<I>(
        &self,
        req: &Request,
        paths: I,
        alias: &str,
        fdiv: u32,
    ) -> Result<Subscription>
    where
        I: Parameters,
    {
        let group_msg = req.create_group(paths, alias, fdiv).await?;
        if group_msg.status != StatusCode::Ok as i32 {
            return Err(MotorcortexError::Subscription(format!(
                "Failed to create group '{alias}' on server, status: {}",
                group_msg.status
            )));
        }

        let (reply_tx, reply_rx) = oneshot::channel();
        self.send_cmd(SubCmd::Subscribe {
            group_msg,
            fdiv,
            reply: reply_tx,
        })?;
        await_reply(reply_rx).await?
    }

    /// Re-register every active subscription after a server-side
    /// session restore.
    ///
    /// Iterates the currently-live subscriptions; for each one, calls
    /// `req.create_group(paths, alias, fdiv)` to ask the server for a
    /// fresh group descriptor, then hands the `(old_id, new_group)`
    /// pairs to the driver which rebinds each [`Subscription`] in
    /// place. Outstanding `Subscription` clones remain valid and
    /// resume receiving payloads under the new id.
    ///
    /// Typical use: after a [`ConnectionState::ConnectionLost`] →
    /// `Connected` transition where the server lost its group state
    /// (e.g. a process restart). When the session is merely restored
    /// via the token flow and groups persisted server-side, the
    /// `create_group` round-trip still succeeds and the rebind is a
    /// no-op on the data path.
    ///
    /// ```no_run
    /// # async fn demo(
    /// #     req: motorcortex_rust::core::Request,
    /// #     sub: motorcortex_rust::core::Subscribe,
    /// # ) -> motorcortex_rust::Result<()> {
    /// use motorcortex_rust::ConnectionState;
    /// let mut state = sub.state();
    /// while state.changed().await.is_ok() {
    ///     if *state.borrow() == ConnectionState::Connected {
    ///         sub.resubscribe(&req).await?;
    ///     }
    /// }
    /// # Ok(()) }
    /// ```
    pub async fn resubscribe(&self, req: &Request) -> Result<()> {
        // Snapshot the active subscriptions. Cloning the handles is
        // cheap (Arc bump) and lets us drop the read lock immediately
        // so concurrent subscribe/unsubscribe commands aren't blocked
        // by the RPC round-trips that follow.
        let snapshot: Vec<Subscription> = self
            .subscriptions
            .read()
            .map_err(|_| MotorcortexError::Subscription("subscriptions lock poisoned".into()))?
            .values()
            .cloned()
            .collect();

        let mut results = Vec::with_capacity(snapshot.len());
        for sub in snapshot {
            let paths = sub.paths();
            let alias = sub.name().to_string();
            let fdiv = sub.fdiv();
            let new_group = req.create_group(paths, &alias, fdiv).await?;
            if new_group.status != StatusCode::Ok as i32 {
                return Err(MotorcortexError::Subscription(format!(
                    "resubscribe: create_group('{alias}') failed with status {}",
                    new_group.status
                )));
            }
            results.push((sub.id(), new_group));
        }

        let (reply_tx, reply_rx) = oneshot::channel();
        self.send_cmd(SubCmd::ApplyResubscribe {
            results,
            reply: reply_tx,
        })?;
        await_reply(reply_rx).await?
    }

    /// Drop a subscription locally + remove the group on the server.
    /// Safe to call with a stale `Subscription` — if the id isn't
    /// in the active table the local side is a no-op, and the
    /// server-side `remove_group` returns its own StatusCode which
    /// we silently ignore here (a stale group is a cleanup success
    /// from the caller's perspective). Transport / decode failures
    /// still propagate as `Err`.
    pub async fn unsubscribe(&self, req: &Request, sub: Subscription) -> Result<()> {
        let id = sub.id();
        let alias = sub.name().to_string();
        drop(sub);

        let (reply_tx, reply_rx) = oneshot::channel();
        self.send_cmd(SubCmd::Unsubscribe {
            id,
            reply: reply_tx,
        })?;
        await_reply(reply_rx).await??;

        req.remove_group(&alias).await?;
        Ok(())
    }

    fn send_cmd(&self, cmd: SubCmd) -> Result<()> {
        self.tx
            .send(cmd)
            .map_err(|_| MotorcortexError::Connection("subscribe driver is gone".into()))
    }
}

impl Default for Subscribe {
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for Subscribe {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
            state: self.state.clone(),
            subscriptions: Arc::clone(&self.subscriptions),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_starts_disconnected() {
        let sub = Subscribe::new();
        assert_eq!(*sub.state().borrow(), ConnectionState::Disconnected);
    }

    #[test]
    fn clone_shares_the_same_state_watch() {
        let a = Subscribe::new();
        let b = a.clone();
        assert_eq!(*a.state().borrow(), *b.state().borrow());
    }

    #[test]
    fn default_is_equivalent_to_new() {
        let sub = Subscribe::default();
        assert_eq!(*sub.state().borrow(), ConnectionState::Disconnected);
    }

    #[test]
    fn dropping_handle_does_not_panic() {
        let sub = Subscribe::new();
        drop(sub);
    }

    #[tokio::test]
    async fn disconnect_without_connect_is_ok() {
        let sub = Subscribe::new();
        sub.disconnect().await.expect("no-op disconnect must succeed");
    }
}