Skip to main content

Subscribe

Struct Subscribe 

Source
pub struct Subscribe { /* private fields */ }
Expand description

Async handle for the pub/sub channel.

Cloning gives you another handle backed by the same driver, so every clone observes the same connection state.

Implementations§

Source§

impl Subscribe

Source

pub fn new() -> Self

Create a new handle and spawn its driver thread.

Source

pub fn state(&self) -> Receiver<ConnectionState>

Source

pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()>

Source

pub async fn disconnect(&self) -> Result<()>

Examples found in repository?
examples/subscribe_latest.rs (line 45)
19async fn main() -> Result<()> {
20    let mut args = env::args().skip(1);
21    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23    let user = args.next().unwrap_or_else(|| "root".into());
24    let pass = args.next().unwrap_or_default();
25    let path = args
26        .next()
27        .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29    let (req_url, sub_url) = parse_url(&url)?;
30    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31    let req = Request::connect_to(&req_url, opts.clone()).await?;
32    req.login(&user, &pass).await?;
33    req.request_parameter_tree().await?;
34    let sub = Subscribe::connect_to(&sub_url, opts).await?;
35    let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37    // Sample the latest value every second for 5 seconds.
38    for _ in 0..5 {
39        let (_ts, value): (_, f64) = subscription.latest().await?;
40        println!("{path} = {value}");
41        tokio::time::sleep(Duration::from_secs(1)).await;
42    }
43
44    sub.unsubscribe(&req, subscription).await?;
45    sub.disconnect().await?;
46    req.disconnect().await?;
47    Ok(())
48}
More examples
Hide additional examples
examples/subscribe_stream.rs (line 53)
18async fn main() -> Result<()> {
19    let mut args = env::args().skip(1);
20    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22    let user = args.next().unwrap_or_else(|| "root".into());
23    let pass = args.next().unwrap_or_default();
24    let path = args
25        .next()
26        .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28    let (req_url, sub_url) = parse_url(&url)?;
29    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30    let req = Request::connect_to(&req_url, opts.clone()).await?;
31    req.login(&user, &pass).await?;
32    req.request_parameter_tree().await?;
33    let sub = Subscribe::connect_to(&sub_url, opts).await?;
34    let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36    // Read 20 samples from the ring. Ring size 256 means we tolerate
37    // a ~256-sample consumer stall before seeing Missed.
38    let mut stream = Box::pin(subscription.stream::<f64>(256));
39    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40    let mut seen = 0;
41    while seen < 20 {
42        match tokio::time::timeout_at(deadline, stream.next()).await {
43            Ok(Some(Ok((_ts, v)))) => {
44                println!("{path} = {v}");
45                seen += 1;
46            }
47            Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48            Ok(None) | Err(_) => break,
49        }
50    }
51
52    sub.unsubscribe(&req, subscription).await?;
53    sub.disconnect().await?;
54    req.disconnect().await?;
55    Ok(())
56}
Source

pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self>

Convenience: Subscribe::new() + connect in one call.

Examples found in repository?
examples/subscribe_latest.rs (line 34)
19async fn main() -> Result<()> {
20    let mut args = env::args().skip(1);
21    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23    let user = args.next().unwrap_or_else(|| "root".into());
24    let pass = args.next().unwrap_or_default();
25    let path = args
26        .next()
27        .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29    let (req_url, sub_url) = parse_url(&url)?;
30    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31    let req = Request::connect_to(&req_url, opts.clone()).await?;
32    req.login(&user, &pass).await?;
33    req.request_parameter_tree().await?;
34    let sub = Subscribe::connect_to(&sub_url, opts).await?;
35    let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37    // Sample the latest value every second for 5 seconds.
38    for _ in 0..5 {
39        let (_ts, value): (_, f64) = subscription.latest().await?;
40        println!("{path} = {value}");
41        tokio::time::sleep(Duration::from_secs(1)).await;
42    }
43
44    sub.unsubscribe(&req, subscription).await?;
45    sub.disconnect().await?;
46    req.disconnect().await?;
47    Ok(())
48}
More examples
Hide additional examples
examples/subscribe_stream.rs (line 33)
18async fn main() -> Result<()> {
19    let mut args = env::args().skip(1);
20    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22    let user = args.next().unwrap_or_else(|| "root".into());
23    let pass = args.next().unwrap_or_default();
24    let path = args
25        .next()
26        .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28    let (req_url, sub_url) = parse_url(&url)?;
29    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30    let req = Request::connect_to(&req_url, opts.clone()).await?;
31    req.login(&user, &pass).await?;
32    req.request_parameter_tree().await?;
33    let sub = Subscribe::connect_to(&sub_url, opts).await?;
34    let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36    // Read 20 samples from the ring. Ring size 256 means we tolerate
37    // a ~256-sample consumer stall before seeing Missed.
38    let mut stream = Box::pin(subscription.stream::<f64>(256));
39    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40    let mut seen = 0;
41    while seen < 20 {
42        match tokio::time::timeout_at(deadline, stream.next()).await {
43            Ok(Some(Ok((_ts, v)))) => {
44                println!("{path} = {v}");
45                seen += 1;
46            }
47            Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48            Ok(None) | Err(_) => break,
49        }
50    }
51
52    sub.unsubscribe(&req, subscription).await?;
53    sub.disconnect().await?;
54    req.disconnect().await?;
55    Ok(())
56}
Source

pub async fn subscribe<I>( &self, req: &Request, paths: I, alias: &str, fdiv: u32, ) -> Result<Subscription>
where I: Parameters,

Create a subscription group on the server + wire it up locally so the receive thread starts pushing payloads into the returned Subscription handle.

The req handle is used only to issue the CreateGroupMsg RPC; it’s not held after this call returns.

let subscription = sub.subscribe(
    &req,
    ["root/Control/dummyDouble"],
    "my-group",
    10, // fdiv: every 10th cycle
).await?;
// Now use subscription.notify / latest / stream.
Examples found in repository?
examples/subscribe_latest.rs (line 35)
19async fn main() -> Result<()> {
20    let mut args = env::args().skip(1);
21    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23    let user = args.next().unwrap_or_else(|| "root".into());
24    let pass = args.next().unwrap_or_default();
25    let path = args
26        .next()
27        .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29    let (req_url, sub_url) = parse_url(&url)?;
30    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31    let req = Request::connect_to(&req_url, opts.clone()).await?;
32    req.login(&user, &pass).await?;
33    req.request_parameter_tree().await?;
34    let sub = Subscribe::connect_to(&sub_url, opts).await?;
35    let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37    // Sample the latest value every second for 5 seconds.
38    for _ in 0..5 {
39        let (_ts, value): (_, f64) = subscription.latest().await?;
40        println!("{path} = {value}");
41        tokio::time::sleep(Duration::from_secs(1)).await;
42    }
43
44    sub.unsubscribe(&req, subscription).await?;
45    sub.disconnect().await?;
46    req.disconnect().await?;
47    Ok(())
48}
More examples
Hide additional examples
examples/subscribe_stream.rs (line 34)
18async fn main() -> Result<()> {
19    let mut args = env::args().skip(1);
20    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22    let user = args.next().unwrap_or_else(|| "root".into());
23    let pass = args.next().unwrap_or_default();
24    let path = args
25        .next()
26        .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28    let (req_url, sub_url) = parse_url(&url)?;
29    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30    let req = Request::connect_to(&req_url, opts.clone()).await?;
31    req.login(&user, &pass).await?;
32    req.request_parameter_tree().await?;
33    let sub = Subscribe::connect_to(&sub_url, opts).await?;
34    let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36    // Read 20 samples from the ring. Ring size 256 means we tolerate
37    // a ~256-sample consumer stall before seeing Missed.
38    let mut stream = Box::pin(subscription.stream::<f64>(256));
39    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40    let mut seen = 0;
41    while seen < 20 {
42        match tokio::time::timeout_at(deadline, stream.next()).await {
43            Ok(Some(Ok((_ts, v)))) => {
44                println!("{path} = {v}");
45                seen += 1;
46            }
47            Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48            Ok(None) | Err(_) => break,
49        }
50    }
51
52    sub.unsubscribe(&req, subscription).await?;
53    sub.disconnect().await?;
54    req.disconnect().await?;
55    Ok(())
56}
Source

pub async fn resubscribe(&self, req: &Request) -> Result<()>

Re-register every active subscription after a server-side session restore.

Iterates the currently-live subscriptions; for each one, calls req.create_group(paths, alias, fdiv) to ask the server for a fresh group descriptor, then hands the (old_id, new_group) pairs to the driver which rebinds each Subscription in place. Outstanding Subscription clones remain valid and resume receiving payloads under the new id.

Typical use: after a ConnectionState::ConnectionLostConnected transition where the server lost its group state (e.g. a process restart). When the session is merely restored via the token flow and groups persisted server-side, the create_group round-trip still succeeds and the rebind is a no-op on the data path.

use motorcortex_rust::ConnectionState;
let mut state = sub.state();
while state.changed().await.is_ok() {
    if *state.borrow() == ConnectionState::Connected {
        sub.resubscribe(&req).await?;
    }
}
Source

pub async fn unsubscribe(&self, req: &Request, sub: Subscription) -> Result<()>

Drop a subscription locally + remove the group on the server. Safe to call with a stale Subscription — if the id isn’t in the active table the local side is a no-op, and the server-side remove_group returns its own StatusCode which we silently ignore here (a stale group is a cleanup success from the caller’s perspective). Transport / decode failures still propagate as Err.

Examples found in repository?
examples/subscribe_latest.rs (line 44)
19async fn main() -> Result<()> {
20    let mut args = env::args().skip(1);
21    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23    let user = args.next().unwrap_or_else(|| "root".into());
24    let pass = args.next().unwrap_or_default();
25    let path = args
26        .next()
27        .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29    let (req_url, sub_url) = parse_url(&url)?;
30    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31    let req = Request::connect_to(&req_url, opts.clone()).await?;
32    req.login(&user, &pass).await?;
33    req.request_parameter_tree().await?;
34    let sub = Subscribe::connect_to(&sub_url, opts).await?;
35    let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37    // Sample the latest value every second for 5 seconds.
38    for _ in 0..5 {
39        let (_ts, value): (_, f64) = subscription.latest().await?;
40        println!("{path} = {value}");
41        tokio::time::sleep(Duration::from_secs(1)).await;
42    }
43
44    sub.unsubscribe(&req, subscription).await?;
45    sub.disconnect().await?;
46    req.disconnect().await?;
47    Ok(())
48}
More examples
Hide additional examples
examples/subscribe_stream.rs (line 52)
18async fn main() -> Result<()> {
19    let mut args = env::args().skip(1);
20    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22    let user = args.next().unwrap_or_else(|| "root".into());
23    let pass = args.next().unwrap_or_default();
24    let path = args
25        .next()
26        .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28    let (req_url, sub_url) = parse_url(&url)?;
29    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30    let req = Request::connect_to(&req_url, opts.clone()).await?;
31    req.login(&user, &pass).await?;
32    req.request_parameter_tree().await?;
33    let sub = Subscribe::connect_to(&sub_url, opts).await?;
34    let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36    // Read 20 samples from the ring. Ring size 256 means we tolerate
37    // a ~256-sample consumer stall before seeing Missed.
38    let mut stream = Box::pin(subscription.stream::<f64>(256));
39    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40    let mut seen = 0;
41    while seen < 20 {
42        match tokio::time::timeout_at(deadline, stream.next()).await {
43            Ok(Some(Ok((_ts, v)))) => {
44                println!("{path} = {v}");
45                seen += 1;
46            }
47            Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48            Ok(None) | Err(_) => break,
49        }
50    }
51
52    sub.unsubscribe(&req, subscription).await?;
53    sub.disconnect().await?;
54    req.disconnect().await?;
55    Ok(())
56}

Trait Implementations§

Source§

impl Clone for Subscribe

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Default for Subscribe

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.