Skip to main content

motorcortex_rust/core/
subscribe.rs

1//! The async-first `Subscribe` handle.
2//!
3//! Same actor pattern as [`crate::core::Request`]: a thin
4//! `mpsc::UnboundedSender` into a dedicated driver thread. The driver
5//! owns the NNG SUB socket; commands (`SubCmd`) serialise through it.
6//!
7//! Subscriptions + receive-loop plumbing land in the next commits; this
8//! module scaffolds connect / disconnect first so the rest can build on
9//! the same shape.
10
11use std::collections::HashMap;
12use std::sync::{Arc, RwLock};
13use std::thread;
14
15use tokio::sync::{mpsc, oneshot, watch};
16
17use crate::client::Parameters;
18use crate::connection::{ConnectionOptions, PipeEvent};
19use crate::core::driver::run_subscribe_driver;
20use crate::core::request::Request;
21use crate::core::state::ConnectionState;
22use crate::core::subscription::Subscription;
23use crate::core::util::await_reply;
24use crate::error::{MotorcortexError, Result};
25use crate::msg::{GroupStatusMsg, StatusCode};
26
27pub(crate) enum SubCmd {
28    Connect {
29        url: String,
30        opts: ConnectionOptions,
31        reply: oneshot::Sender<Result<()>>,
32    },
33    Disconnect {
34        reply: oneshot::Sender<Result<()>>,
35    },
36    Subscribe {
37        group_msg: GroupStatusMsg,
38        fdiv: u32,
39        reply: oneshot::Sender<Result<Subscription>>,
40    },
41    Unsubscribe {
42        id: u32,
43        reply: oneshot::Sender<Result<()>>,
44    },
45    /// Re-register every pre-existing subscription with NNG using
46    /// freshly-created server groups. Carries `(old_id, new_group_msg)`
47    /// pairs — the driver unsubscribes the old NNG filter, subscribes
48    /// the new id, and [rebinds](crate::core::Subscription::rebind)
49    /// each `Subscription` in place so outstanding consumer handles
50    /// stay valid.
51    ApplyResubscribe {
52        results: Vec<(u32, GroupStatusMsg)>,
53        reply: oneshot::Sender<Result<()>>,
54    },
55    /// Forwarded from the NNG pipe-notify callback.
56    Pipe(PipeEvent),
57}
58
59/// Async handle for the pub/sub channel.
60///
61/// Cloning gives you another handle backed by the same driver, so
62/// every clone observes the same connection state.
63pub struct Subscribe {
64    tx: mpsc::UnboundedSender<SubCmd>,
65    state: watch::Receiver<ConnectionState>,
66    /// Shared with the driver: keyed by current server id, values are
67    /// live [`Subscription`] handles. Used by
68    /// [`resubscribe`](Self::resubscribe) to snapshot the active
69    /// groups without a driver round-trip.
70    subscriptions: Arc<RwLock<HashMap<u32, Subscription>>>,
71}
72
73impl Subscribe {
74    /// Create a new handle and spawn its driver thread.
75    pub fn new() -> Self {
76        let (tx, rx) = mpsc::unbounded_channel();
77        let (state_tx, state_rx) = watch::channel(ConnectionState::Disconnected);
78        let subscriptions: Arc<RwLock<HashMap<u32, Subscription>>> =
79            Arc::new(RwLock::new(HashMap::new()));
80        let subs_for_driver = Arc::clone(&subscriptions);
81        let tx_for_driver = tx.clone();
82        thread::Builder::new()
83            .name("mcx-subscribe-driver".into())
84            .spawn(move || {
85                run_subscribe_driver(tx_for_driver, rx, state_tx, subs_for_driver)
86            })
87            .expect("spawning the subscribe driver must succeed on any OS we target");
88        Self {
89            tx,
90            state: state_rx,
91            subscriptions,
92        }
93    }
94
95    pub fn state(&self) -> watch::Receiver<ConnectionState> {
96        self.state.clone()
97    }
98
99    pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()> {
100        let (reply_tx, reply_rx) = oneshot::channel();
101        self.send_cmd(SubCmd::Connect {
102            url: url.to_string(),
103            opts,
104            reply: reply_tx,
105        })?;
106        await_reply(reply_rx).await?
107    }
108
109    pub async fn disconnect(&self) -> Result<()> {
110        let (reply_tx, reply_rx) = oneshot::channel();
111        self.send_cmd(SubCmd::Disconnect { reply: reply_tx })?;
112        await_reply(reply_rx).await?
113    }
114
115    /// Convenience: `Subscribe::new()` + [`connect`](Self::connect) in
116    /// one call.
117    pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self> {
118        let sub = Self::new();
119        sub.connect(url, opts).await?;
120        Ok(sub)
121    }
122
123    /// Create a subscription group on the server + wire it up
124    /// locally so the receive thread starts pushing payloads into
125    /// the returned [`Subscription`] handle.
126    ///
127    /// The `req` handle is used only to issue the `CreateGroupMsg`
128    /// RPC; it's not held after this call returns.
129    ///
130    /// ```no_run
131    /// # async fn demo(
132    /// #     req: motorcortex_rust::core::Request,
133    /// #     sub: motorcortex_rust::core::Subscribe,
134    /// # ) -> motorcortex_rust::Result<()> {
135    /// let subscription = sub.subscribe(
136    ///     &req,
137    ///     ["root/Control/dummyDouble"],
138    ///     "my-group",
139    ///     10, // fdiv: every 10th cycle
140    /// ).await?;
141    /// // Now use subscription.notify / latest / stream.
142    /// # Ok(()) }
143    /// ```
144    pub async fn subscribe<I>(
145        &self,
146        req: &Request,
147        paths: I,
148        alias: &str,
149        fdiv: u32,
150    ) -> Result<Subscription>
151    where
152        I: Parameters,
153    {
154        let group_msg = req.create_group(paths, alias, fdiv).await?;
155        if group_msg.status != StatusCode::Ok as i32 {
156            return Err(MotorcortexError::Subscription(format!(
157                "Failed to create group '{alias}' on server, status: {}",
158                group_msg.status
159            )));
160        }
161
162        let (reply_tx, reply_rx) = oneshot::channel();
163        self.send_cmd(SubCmd::Subscribe {
164            group_msg,
165            fdiv,
166            reply: reply_tx,
167        })?;
168        await_reply(reply_rx).await?
169    }
170
171    /// Re-register every active subscription after a server-side
172    /// session restore.
173    ///
174    /// Iterates the currently-live subscriptions; for each one, calls
175    /// `req.create_group(paths, alias, fdiv)` to ask the server for a
176    /// fresh group descriptor, then hands the `(old_id, new_group)`
177    /// pairs to the driver which rebinds each [`Subscription`] in
178    /// place. Outstanding `Subscription` clones remain valid and
179    /// resume receiving payloads under the new id.
180    ///
181    /// Typical use: after a [`ConnectionState::ConnectionLost`] →
182    /// `Connected` transition where the server lost its group state
183    /// (e.g. a process restart). When the session is merely restored
184    /// via the token flow and groups persisted server-side, the
185    /// `create_group` round-trip still succeeds and the rebind is a
186    /// no-op on the data path.
187    ///
188    /// ```no_run
189    /// # async fn demo(
190    /// #     req: motorcortex_rust::core::Request,
191    /// #     sub: motorcortex_rust::core::Subscribe,
192    /// # ) -> motorcortex_rust::Result<()> {
193    /// use motorcortex_rust::ConnectionState;
194    /// let mut state = sub.state();
195    /// while state.changed().await.is_ok() {
196    ///     if *state.borrow() == ConnectionState::Connected {
197    ///         sub.resubscribe(&req).await?;
198    ///     }
199    /// }
200    /// # Ok(()) }
201    /// ```
202    pub async fn resubscribe(&self, req: &Request) -> Result<()> {
203        // Snapshot the active subscriptions. Cloning the handles is
204        // cheap (Arc bump) and lets us drop the read lock immediately
205        // so concurrent subscribe/unsubscribe commands aren't blocked
206        // by the RPC round-trips that follow.
207        let snapshot: Vec<Subscription> = self
208            .subscriptions
209            .read()
210            .map_err(|_| MotorcortexError::Subscription("subscriptions lock poisoned".into()))?
211            .values()
212            .cloned()
213            .collect();
214
215        let mut results = Vec::with_capacity(snapshot.len());
216        for sub in snapshot {
217            let paths = sub.paths();
218            let alias = sub.name().to_string();
219            let fdiv = sub.fdiv();
220            let new_group = req.create_group(paths, &alias, fdiv).await?;
221            if new_group.status != StatusCode::Ok as i32 {
222                return Err(MotorcortexError::Subscription(format!(
223                    "resubscribe: create_group('{alias}') failed with status {}",
224                    new_group.status
225                )));
226            }
227            results.push((sub.id(), new_group));
228        }
229
230        let (reply_tx, reply_rx) = oneshot::channel();
231        self.send_cmd(SubCmd::ApplyResubscribe {
232            results,
233            reply: reply_tx,
234        })?;
235        await_reply(reply_rx).await?
236    }
237
238    /// Drop a subscription locally + remove the group on the server.
239    /// Safe to call with a stale `Subscription` — if the id isn't
240    /// in the active table the local side is a no-op, and the
241    /// server-side `remove_group` returns its own StatusCode which
242    /// we silently ignore here (a stale group is a cleanup success
243    /// from the caller's perspective). Transport / decode failures
244    /// still propagate as `Err`.
245    pub async fn unsubscribe(&self, req: &Request, sub: Subscription) -> Result<()> {
246        let id = sub.id();
247        let alias = sub.name().to_string();
248        drop(sub);
249
250        let (reply_tx, reply_rx) = oneshot::channel();
251        self.send_cmd(SubCmd::Unsubscribe {
252            id,
253            reply: reply_tx,
254        })?;
255        await_reply(reply_rx).await??;
256
257        req.remove_group(&alias).await?;
258        Ok(())
259    }
260
261    fn send_cmd(&self, cmd: SubCmd) -> Result<()> {
262        self.tx
263            .send(cmd)
264            .map_err(|_| MotorcortexError::Connection("subscribe driver is gone".into()))
265    }
266}
267
268impl Default for Subscribe {
269    fn default() -> Self {
270        Self::new()
271    }
272}
273
274impl Clone for Subscribe {
275    fn clone(&self) -> Self {
276        Self {
277            tx: self.tx.clone(),
278            state: self.state.clone(),
279            subscriptions: Arc::clone(&self.subscriptions),
280        }
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use super::*;
287
288    #[test]
289    fn new_starts_disconnected() {
290        let sub = Subscribe::new();
291        assert_eq!(*sub.state().borrow(), ConnectionState::Disconnected);
292    }
293
294    #[test]
295    fn clone_shares_the_same_state_watch() {
296        let a = Subscribe::new();
297        let b = a.clone();
298        assert_eq!(*a.state().borrow(), *b.state().borrow());
299    }
300
301    #[test]
302    fn default_is_equivalent_to_new() {
303        let sub = Subscribe::default();
304        assert_eq!(*sub.state().borrow(), ConnectionState::Disconnected);
305    }
306
307    #[test]
308    fn dropping_handle_does_not_panic() {
309        let sub = Subscribe::new();
310        drop(sub);
311    }
312
313    #[tokio::test]
314    async fn disconnect_without_connect_is_ok() {
315        let sub = Subscribe::new();
316        sub.disconnect().await.expect("no-op disconnect must succeed");
317    }
318}