Skip to main content

Subscription

Struct Subscription 

Source
pub struct Subscription { /* private fields */ }
Expand description

Async-side subscription handle. Returned by crate::core::Subscribe::subscribe.

Implementations§

Source§

impl Subscription

Source

pub fn id(&self) -> u32

Group id most recently assigned by the server. Updated by Subscribe::resubscribe if the server hands out a fresh id.

Source

pub fn name(&self) -> &str

Group alias (the name passed to subscribe). Fixed for the life of the subscription.

Source

pub fn fdiv(&self) -> u32

Frequency divider the subscription was created with.

Source

pub fn paths(&self) -> Vec<String>

Parameter paths the subscription was created against, in order. Extracted from the current server layout; on resubscribe the server may hand back fresh offsets, but paths themselves don’t change.

Source

pub fn notify<F>(&self, cb: F)
where F: Fn(&Subscription) + Send + Sync + 'static,

Install a fire-and-forget callback invoked every time a new payload arrives. The callback runs on the subscribe receive thread — don’t block it.

subscription.notify(|s| {
    if let Some((_ts, value)) = s.read::<f64>() {
        println!("got {value}");
    }
});
Source

pub fn read<V>(&self) -> Option<(TimeSpec, V)>
where V: GetParameterTuple,

Decode the most recent payload into a tuple matching the subscription’s parameter shape. Returns None if no payload has arrived yet.

Source

pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>

Decode the most recent payload into a flat Vec<V> — every scalar element of every subscribed parameter, in order.

Source

pub async fn latest<V>(&self) -> Result<(TimeSpec, V)>
where V: GetParameterTuple,

Await the latest payload. Resolves immediately if any payload has already arrived; otherwise waits for the first one.

The underlying channel is lossy by design — if many payloads arrive between calls, you only see the most recent. For “give me every sample” semantics, see stream.

let (_ts, value): (_, f64) = subscription.latest().await?;
println!("latest = {value}");
Examples found in repository?
examples/subscribe_latest.rs (line 39)
19async fn main() -> Result<()> {
20    let mut args = env::args().skip(1);
21    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23    let user = args.next().unwrap_or_else(|| "root".into());
24    let pass = args.next().unwrap_or_default();
25    let path = args
26        .next()
27        .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29    let (req_url, sub_url) = parse_url(&url)?;
30    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31    let req = Request::connect_to(&req_url, opts.clone()).await?;
32    req.login(&user, &pass).await?;
33    req.request_parameter_tree().await?;
34    let sub = Subscribe::connect_to(&sub_url, opts).await?;
35    let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37    // Sample the latest value every second for 5 seconds.
38    for _ in 0..5 {
39        let (_ts, value): (_, f64) = subscription.latest().await?;
40        println!("{path} = {value}");
41        tokio::time::sleep(Duration::from_secs(1)).await;
42    }
43
44    sub.unsubscribe(&req, subscription).await?;
45    sub.disconnect().await?;
46    req.disconnect().await?;
47    Ok(())
48}
Source

pub fn stream<V>( &self, capacity: usize, ) -> impl Stream<Item = StreamResult<V>> + use<V>
where V: GetParameterTuple + Send + 'static,

Subscribe to every payload via a bounded ring buffer.

use futures::StreamExt;
use motorcortex_rust::core::Missed;

let mut stream = Box::pin(subscription.stream::<f64>(256));
while let Some(item) = stream.next().await {
    match item {
        Ok((_ts, v))       => println!("{v}"),
        Err(Missed(n))     => eprintln!("dropped {n} samples"),
    }
}

The capacity is the number of in-flight samples the buffer can hold; if a consumer falls behind by more than that, the next item is Err(Missed(n)) with n = missed samples, and the consumer can decide whether to catch up or bail. This is explicit back-pressure, as opposed to latest which is lossy by design.

The broadcast channel is created lazily on the first call — subscriptions with only notify / read / latest users pay nothing for it. Subsequent calls reuse the existing broadcast; the capacity argument is honoured only on the first call.

Examples found in repository?
examples/subscribe_stream.rs (line 38)
18async fn main() -> Result<()> {
19    let mut args = env::args().skip(1);
20    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22    let user = args.next().unwrap_or_else(|| "root".into());
23    let pass = args.next().unwrap_or_default();
24    let path = args
25        .next()
26        .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28    let (req_url, sub_url) = parse_url(&url)?;
29    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30    let req = Request::connect_to(&req_url, opts.clone()).await?;
31    req.login(&user, &pass).await?;
32    req.request_parameter_tree().await?;
33    let sub = Subscribe::connect_to(&sub_url, opts).await?;
34    let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36    // Read 20 samples from the ring. Ring size 256 means we tolerate
37    // a ~256-sample consumer stall before seeing Missed.
38    let mut stream = Box::pin(subscription.stream::<f64>(256));
39    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40    let mut seen = 0;
41    while seen < 20 {
42        match tokio::time::timeout_at(deadline, stream.next()).await {
43            Ok(Some(Ok((_ts, v)))) => {
44                println!("{path} = {v}");
45                seen += 1;
46            }
47            Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48            Ok(None) | Err(_) => break,
49        }
50    }
51
52    sub.unsubscribe(&req, subscription).await?;
53    sub.disconnect().await?;
54    req.disconnect().await?;
55    Ok(())
56}

Trait Implementations§

Source§

impl Clone for Subscription

Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.