// motorcortex_rust/core/subscribe.rs
use std::collections::HashMap;
12use std::sync::{Arc, RwLock};
13use std::thread;
14
15use tokio::sync::{mpsc, oneshot, watch};
16
17use crate::client::Parameters;
18use crate::connection::{ConnectionOptions, PipeEvent};
19use crate::core::driver::run_subscribe_driver;
20use crate::core::request::Request;
21use crate::core::state::ConnectionState;
22use crate::core::subscription::Subscription;
23use crate::core::util::await_reply;
24use crate::error::{MotorcortexError, Result};
25use crate::msg::{GroupStatusMsg, StatusCode};
26
27pub(crate) enum SubCmd {
28 Connect {
29 url: String,
30 opts: ConnectionOptions,
31 reply: oneshot::Sender<Result<()>>,
32 },
33 Disconnect {
34 reply: oneshot::Sender<Result<()>>,
35 },
36 Subscribe {
37 group_msg: GroupStatusMsg,
38 fdiv: u32,
39 reply: oneshot::Sender<Result<Subscription>>,
40 },
41 Unsubscribe {
42 id: u32,
43 reply: oneshot::Sender<Result<()>>,
44 },
45 ApplyResubscribe {
52 results: Vec<(u32, GroupStatusMsg)>,
53 reply: oneshot::Sender<Result<()>>,
54 },
55 Pipe(PipeEvent),
57}
58
59pub struct Subscribe {
64 tx: mpsc::UnboundedSender<SubCmd>,
65 state: watch::Receiver<ConnectionState>,
66 subscriptions: Arc<RwLock<HashMap<u32, Subscription>>>,
71}
72
73impl Subscribe {
74 pub fn new() -> Self {
76 let (tx, rx) = mpsc::unbounded_channel();
77 let (state_tx, state_rx) = watch::channel(ConnectionState::Disconnected);
78 let subscriptions: Arc<RwLock<HashMap<u32, Subscription>>> =
79 Arc::new(RwLock::new(HashMap::new()));
80 let subs_for_driver = Arc::clone(&subscriptions);
81 let tx_for_driver = tx.clone();
82 thread::Builder::new()
83 .name("mcx-subscribe-driver".into())
84 .spawn(move || {
85 run_subscribe_driver(tx_for_driver, rx, state_tx, subs_for_driver)
86 })
87 .expect("spawning the subscribe driver must succeed on any OS we target");
88 Self {
89 tx,
90 state: state_rx,
91 subscriptions,
92 }
93 }
94
95 pub fn state(&self) -> watch::Receiver<ConnectionState> {
96 self.state.clone()
97 }
98
99 pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()> {
100 let (reply_tx, reply_rx) = oneshot::channel();
101 self.send_cmd(SubCmd::Connect {
102 url: url.to_string(),
103 opts,
104 reply: reply_tx,
105 })?;
106 await_reply(reply_rx).await?
107 }
108
109 pub async fn disconnect(&self) -> Result<()> {
110 let (reply_tx, reply_rx) = oneshot::channel();
111 self.send_cmd(SubCmd::Disconnect { reply: reply_tx })?;
112 await_reply(reply_rx).await?
113 }
114
115 pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self> {
118 let sub = Self::new();
119 sub.connect(url, opts).await?;
120 Ok(sub)
121 }
122
123 pub async fn subscribe<I>(
145 &self,
146 req: &Request,
147 paths: I,
148 alias: &str,
149 fdiv: u32,
150 ) -> Result<Subscription>
151 where
152 I: Parameters,
153 {
154 let group_msg = req.create_group(paths, alias, fdiv).await?;
155 if group_msg.status != StatusCode::Ok as i32 {
156 return Err(MotorcortexError::Subscription(format!(
157 "Failed to create group '{alias}' on server, status: {}",
158 group_msg.status
159 )));
160 }
161
162 let (reply_tx, reply_rx) = oneshot::channel();
163 self.send_cmd(SubCmd::Subscribe {
164 group_msg,
165 fdiv,
166 reply: reply_tx,
167 })?;
168 await_reply(reply_rx).await?
169 }
170
171 pub async fn resubscribe(&self, req: &Request) -> Result<()> {
203 let snapshot: Vec<Subscription> = self
208 .subscriptions
209 .read()
210 .map_err(|_| MotorcortexError::Subscription("subscriptions lock poisoned".into()))?
211 .values()
212 .cloned()
213 .collect();
214
215 let mut results = Vec::with_capacity(snapshot.len());
216 for sub in snapshot {
217 let paths = sub.paths();
218 let alias = sub.name().to_string();
219 let fdiv = sub.fdiv();
220 let new_group = req.create_group(paths, &alias, fdiv).await?;
221 if new_group.status != StatusCode::Ok as i32 {
222 return Err(MotorcortexError::Subscription(format!(
223 "resubscribe: create_group('{alias}') failed with status {}",
224 new_group.status
225 )));
226 }
227 results.push((sub.id(), new_group));
228 }
229
230 let (reply_tx, reply_rx) = oneshot::channel();
231 self.send_cmd(SubCmd::ApplyResubscribe {
232 results,
233 reply: reply_tx,
234 })?;
235 await_reply(reply_rx).await?
236 }
237
238 pub async fn unsubscribe(&self, req: &Request, sub: Subscription) -> Result<()> {
246 let id = sub.id();
247 let alias = sub.name().to_string();
248 drop(sub);
249
250 let (reply_tx, reply_rx) = oneshot::channel();
251 self.send_cmd(SubCmd::Unsubscribe {
252 id,
253 reply: reply_tx,
254 })?;
255 await_reply(reply_rx).await??;
256
257 req.remove_group(&alias).await?;
258 Ok(())
259 }
260
261 fn send_cmd(&self, cmd: SubCmd) -> Result<()> {
262 self.tx
263 .send(cmd)
264 .map_err(|_| MotorcortexError::Connection("subscribe driver is gone".into()))
265 }
266}
267
268impl Default for Subscribe {
269 fn default() -> Self {
270 Self::new()
271 }
272}
273
274impl Clone for Subscribe {
275 fn clone(&self) -> Self {
276 Self {
277 tx: self.tx.clone(),
278 state: self.state.clone(),
279 subscriptions: Arc::clone(&self.subscriptions),
280 }
281 }
282}
283
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built handle reports `Disconnected`.
    #[test]
    fn new_starts_disconnected() {
        let handle = Subscribe::new();
        let rx = handle.state();
        assert_eq!(*rx.borrow(), ConnectionState::Disconnected);
    }

    /// A handle and its clone report the same state.
    #[test]
    fn clone_shares_the_same_state_watch() {
        let original = Subscribe::new();
        let copy = original.clone();
        let (rx_original, rx_copy) = (original.state(), copy.state());
        assert_eq!(*rx_original.borrow(), *rx_copy.borrow());
    }

    /// `Default` yields a handle in the same initial state as `new`.
    #[test]
    fn default_is_equivalent_to_new() {
        let handle = Subscribe::default();
        let rx = handle.state();
        assert_eq!(*rx.borrow(), ConnectionState::Disconnected);
    }

    /// Dropping a handle right after creating it must not panic.
    #[test]
    fn dropping_handle_does_not_panic() {
        drop(Subscribe::new());
    }

    /// Disconnecting a handle that never connected succeeds.
    #[tokio::test]
    async fn disconnect_without_connect_is_ok() {
        let handle = Subscribe::new();
        let outcome = handle.disconnect().await;
        outcome.expect("no-op disconnect must succeed");
    }
}