use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use tokio::sync::{mpsc, oneshot, watch};
use crate::client::Parameters;
use crate::connection::{ConnectionOptions, PipeEvent};
use crate::core::driver::run_subscribe_driver;
use crate::core::request::Request;
use crate::core::state::ConnectionState;
use crate::core::subscription::Subscription;
use crate::core::util::await_reply;
use crate::error::{MotorcortexError, Result};
use crate::msg::{GroupStatusMsg, StatusCode};
/// Commands sent over the unbounded channel from `Subscribe` handles to the
/// subscribe driver.
///
/// Every request variant carries a `reply` oneshot sender; the `Subscribe`
/// method that sent the command awaits the matching receiver for the outcome.
pub(crate) enum SubCmd {
/// Request sent by `Subscribe::connect`.
Connect {
/// Address to connect to, copied from the caller's `&str`.
url: String,
/// Connection options, passed through unchanged from the caller.
opts: ConnectionOptions,
/// Channel on which the outcome of the connect request is reported.
reply: oneshot::Sender<Result<()>>,
},
/// Request sent by `Subscribe::disconnect`.
Disconnect {
/// Channel on which the outcome of the disconnect request is reported.
reply: oneshot::Sender<Result<()>>,
},
/// Request sent by `Subscribe::subscribe` after the server reported
/// `StatusCode::Ok` for the group.
Subscribe {
/// Group description returned by `Request::create_group`.
group_msg: GroupStatusMsg,
/// The `fdiv` value the caller passed to `Subscribe::subscribe`
/// (presumably a frequency divider — confirm against the protocol docs).
fdiv: u32,
/// Channel on which the resulting `Subscription` (or an error) is reported.
reply: oneshot::Sender<Result<Subscription>>,
},
/// Request sent by `Subscribe::unsubscribe`.
Unsubscribe {
/// Id of the subscription to remove, taken from `Subscription::id`.
id: u32,
/// Channel on which the outcome of the unsubscribe request is reported.
reply: oneshot::Sender<Result<()>>,
},
/// Request sent by `Subscribe::resubscribe` once every group has been
/// re-created on the server.
ApplyResubscribe {
/// Pairs of (subscription id, freshly created group message).
results: Vec<(u32, GroupStatusMsg)>,
/// Channel on which the outcome of applying the new groups is reported.
reply: oneshot::Sender<Result<()>>,
},
/// Wraps a `PipeEvent`. Not constructed in this file.
// NOTE(review): presumably fed back to the driver through the sender clone
// it receives in `Subscribe::new` — confirm in `core::driver`.
Pipe(PipeEvent),
}
/// Handle used to talk to the subscribe driver thread.
///
/// The handle itself only holds channel endpoints and a shared map; the
/// driver is started by `Subscribe::new` on a dedicated thread and is driven
/// by `SubCmd` messages sent through `tx`.
pub struct Subscribe {
/// Sending half of the unbounded command channel read by the driver.
tx: mpsc::UnboundedSender<SubCmd>,
/// Receiver of the connection-state watch channel; the sending half is
/// handed to the driver, and the initial value is `ConnectionState::Disconnected`.
state: watch::Receiver<ConnectionState>,
/// Subscription map shared with the driver thread. This file only reads it
/// (to snapshot subscriptions in `resubscribe`).
// NOTE(review): keys look like subscription ids (`Subscription::id` is a
// `u32`) — confirm where the driver inserts entries.
subscriptions: Arc<RwLock<HashMap<u32, Subscription>>>,
}
impl Subscribe {
    /// Creates a handle in the `Disconnected` state and starts its driver.
    ///
    /// A dedicated OS thread named `mcx-subscribe-driver` runs
    /// `run_subscribe_driver`, which receives the command receiver, its own
    /// clone of the command sender, the state publisher and a shared reference
    /// to the subscription map. The join handle is discarded, so the thread is
    /// never joined by this handle.
    ///
    /// # Panics
    ///
    /// Panics if the driver thread cannot be spawned.
    pub fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        let (state_tx, state) = watch::channel(ConnectionState::Disconnected);
        let subscriptions = Arc::new(RwLock::new(HashMap::<u32, Subscription>::new()));

        // The driver gets its own clones; the originals stay with the handle.
        let driver_subs = Arc::clone(&subscriptions);
        let driver_tx = tx.clone();
        let driver = move || run_subscribe_driver(driver_tx, rx, state_tx, driver_subs);

        thread::Builder::new()
            .name(String::from("mcx-subscribe-driver"))
            .spawn(driver)
            .expect("spawning the subscribe driver must succeed on any OS we target");

        Self {
            tx,
            state,
            subscriptions,
        }
    }

    /// Returns a fresh receiver for the connection-state watch channel.
    pub fn state(&self) -> watch::Receiver<ConnectionState> {
        watch::Receiver::clone(&self.state)
    }

    /// Sends a `SubCmd::Connect` for `url` with `opts` to the driver and
    /// waits for its reply.
    ///
    /// # Errors
    ///
    /// Returns `MotorcortexError::Connection` if the command channel is
    /// closed, any error produced by `await_reply`, or the error the driver
    /// replied with.
    pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()> {
        let (reply, response) = oneshot::channel();
        let cmd = SubCmd::Connect {
            url: url.to_owned(),
            opts,
            reply,
        };
        self.send_cmd(cmd)?;
        // Outer `?` covers a failed wait; the inner result is the driver's verdict.
        await_reply(response).await?
    }

    /// Sends a `SubCmd::Disconnect` to the driver and waits for its reply.
    ///
    /// # Errors
    ///
    /// Same error sources as [`Subscribe::connect`].
    pub async fn disconnect(&self) -> Result<()> {
        let (reply, response) = oneshot::channel();
        let cmd = SubCmd::Disconnect { reply };
        self.send_cmd(cmd)?;
        await_reply(response).await?
    }

    /// Convenience constructor: builds a new handle and connects it to `url`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Subscribe::connect`] returns; on failure the freshly
    /// created handle is dropped.
    pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self> {
        let handle = Self::new();
        match handle.connect(url, opts).await {
            Ok(()) => Ok(handle),
            Err(err) => Err(err),
        }
    }

    /// Creates a group on the server and registers it with the driver.
    ///
    /// The group is requested through `req.create_group(paths, alias, fdiv)`.
    /// Only when the server answers with `StatusCode::Ok` is a
    /// `SubCmd::Subscribe` sent to the driver; the `Subscription` the driver
    /// replies with is returned.
    ///
    /// # Errors
    ///
    /// * Any error from `create_group`.
    /// * `MotorcortexError::Subscription` when the returned status is not `Ok`.
    /// * `MotorcortexError::Connection` if the command channel is closed.
    /// * Any error from `await_reply` or from the driver's reply.
    pub async fn subscribe<I>(
        &self,
        req: &Request,
        paths: I,
        alias: &str,
        fdiv: u32,
    ) -> Result<Subscription>
    where
        I: Parameters,
    {
        let created = req.create_group(paths, alias, fdiv).await?;
        if created.status != StatusCode::Ok as i32 {
            let reason = format!(
                "Failed to create group '{alias}' on server, status: {}",
                created.status
            );
            return Err(MotorcortexError::Subscription(reason));
        }

        let (reply, response) = oneshot::channel();
        let cmd = SubCmd::Subscribe {
            group_msg: created,
            fdiv,
            reply,
        };
        self.send_cmd(cmd)?;
        await_reply(response).await?
    }

    /// Re-creates the server-side group of every known subscription and hands
    /// the new group messages to the driver.
    ///
    /// The subscription map is snapshotted under a read lock that is released
    /// before the first await. Groups are then re-created one at a time; the
    /// first failure aborts the whole operation and no `ApplyResubscribe`
    /// command is sent.
    ///
    /// # Errors
    ///
    /// * `MotorcortexError::Subscription` if the map lock is poisoned or a
    ///   group comes back with a status other than `StatusCode::Ok`.
    /// * Any error from `create_group`.
    /// * `MotorcortexError::Connection` if the command channel is closed.
    /// * Any error from `await_reply` or from the driver's reply.
    pub async fn resubscribe(&self, req: &Request) -> Result<()> {
        // Keep the std lock guard inside this block so it never lives across an await.
        let snapshot: Vec<Subscription> = {
            let guard = self.subscriptions.read().map_err(|_| {
                MotorcortexError::Subscription("subscriptions lock poisoned".into())
            })?;
            guard.values().cloned().collect()
        };

        let mut results = Vec::with_capacity(snapshot.len());
        for existing in snapshot {
            let paths = existing.paths();
            let alias = existing.name().to_string();
            let fdiv = existing.fdiv();

            let recreated = req.create_group(paths, &alias, fdiv).await?;
            if recreated.status != StatusCode::Ok as i32 {
                let reason = format!(
                    "resubscribe: create_group('{alias}') failed with status {}",
                    recreated.status
                );
                return Err(MotorcortexError::Subscription(reason));
            }
            results.push((existing.id(), recreated));
        }

        let (reply, response) = oneshot::channel();
        let cmd = SubCmd::ApplyResubscribe { results, reply };
        self.send_cmd(cmd)?;
        await_reply(response).await?
    }

    /// Removes a subscription from the driver, then removes its group on the
    /// server.
    ///
    /// `sub` is consumed and dropped before the driver is contacted. The
    /// server-side `remove_group` call only happens after the driver has
    /// replied successfully; if the driver step fails the function returns
    /// early and the server group is left untouched.
    ///
    /// # Errors
    ///
    /// * `MotorcortexError::Connection` if the command channel is closed.
    /// * Any error from `await_reply` or from the driver's reply.
    /// * Any error from `remove_group`.
    pub async fn unsubscribe(&self, req: &Request, sub: Subscription) -> Result<()> {
        let id = sub.id();
        let alias = sub.name().to_string();
        // Release the caller's handle before asking the driver to unsubscribe.
        drop(sub);

        let (reply, response) = oneshot::channel();
        let cmd = SubCmd::Unsubscribe { id, reply };
        self.send_cmd(cmd)?;

        let driver_outcome = await_reply(response).await?;
        driver_outcome?;

        req.remove_group(&alias).await?;
        Ok(())
    }

    /// Pushes `cmd` onto the driver's command channel.
    ///
    /// # Errors
    ///
    /// Returns `MotorcortexError::Connection` when the send fails, i.e. the
    /// receiving half of the channel no longer exists.
    fn send_cmd(&self, cmd: SubCmd) -> Result<()> {
        match self.tx.send(cmd) {
            Ok(()) => Ok(()),
            Err(_) => Err(MotorcortexError::Connection(
                "subscribe driver is gone".into(),
            )),
        }
    }
}
impl Default for Subscribe {
fn default() -> Self {
Self::new()
}
}
/// Cloning yields another handle to the same driver: the command sender and
/// state receiver are cloned and the subscription map is shared through the
/// same `Arc`. No new thread is spawned.
impl Clone for Subscribe {
    fn clone(&self) -> Self {
        // Destructure so that adding a field forces this impl to be revisited.
        let Self {
            tx,
            state,
            subscriptions,
        } = self;

        Self {
            tx: tx.clone(),
            state: state.clone(),
            subscriptions: Arc::clone(subscriptions),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed handle reports `Disconnected`.
    #[test]
    fn new_starts_disconnected() {
        let handle = Subscribe::new();
        let state = handle.state();
        assert_eq!(*state.borrow(), ConnectionState::Disconnected);
    }

    /// A handle and its clone observe the same current state value.
    #[test]
    fn clone_shares_the_same_state_watch() {
        let original = Subscribe::new();
        let copy = original.clone();
        let (state_a, state_b) = (original.state(), copy.state());
        assert_eq!(*state_a.borrow(), *state_b.borrow());
    }

    /// `Default::default` produces a handle in the same initial state as `new`.
    #[test]
    fn default_is_equivalent_to_new() {
        let handle = Subscribe::default();
        let state = handle.state();
        assert_eq!(*state.borrow(), ConnectionState::Disconnected);
    }

    /// Dropping the only handle right after construction must not panic.
    #[test]
    fn dropping_handle_does_not_panic() {
        drop(Subscribe::new());
    }

    /// Disconnecting a handle that never connected is expected to succeed.
    #[tokio::test]
    async fn disconnect_without_connect_is_ok() {
        let handle = Subscribe::new();
        let outcome = handle.disconnect().await;
        outcome.expect("no-op disconnect must succeed");
    }
}