pub struct Subscription { /* private fields */ }Expand description
Async-side subscription handle. Returned by
crate::core::Subscribe::subscribe.
Implementations§
Source§impl Subscription
impl Subscription
Sourcepub fn id(&self) -> u32
pub fn id(&self) -> u32
Group id most recently assigned by the server. Updated by
Subscribe::resubscribe if the server hands out a fresh id.
Sourcepub fn name(&self) -> &str
pub fn name(&self) -> &str
Group alias (the name passed to subscribe). Fixed for the
life of the subscription.
Sourcepub fn paths(&self) -> Vec<String>
pub fn paths(&self) -> Vec<String>
Parameter paths the subscription was created against, in order. Extracted from the current server layout; on resubscribe the server may hand back fresh offsets, but paths themselves don’t change.
Sourcepub fn notify<F>(&self, cb: F)
pub fn notify<F>(&self, cb: F)
Install a fire-and-forget callback invoked every time a new payload arrives. The callback runs on the subscribe receive thread — don’t block it.
subscription.notify(|s| {
if let Some((_ts, value)) = s.read::<f64>() {
println!("got {value}");
}
});Sourcepub fn read<V>(&self) -> Option<(TimeSpec, V)>where
V: GetParameterTuple,
pub fn read<V>(&self) -> Option<(TimeSpec, V)>where
V: GetParameterTuple,
Decode the most recent payload into a tuple matching the
subscription’s parameter shape. Returns None if no payload
has arrived yet.
Sourcepub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>where
V: GetParameterValue + Default,
pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>where
V: GetParameterValue + Default,
Decode the most recent payload into a flat Vec<V> — every
scalar element of every subscribed parameter, in order.
Sourcepub async fn latest<V>(&self) -> Result<(TimeSpec, V)>where
V: GetParameterTuple,
pub async fn latest<V>(&self) -> Result<(TimeSpec, V)>where
V: GetParameterTuple,
Await the latest payload. Resolves immediately if any payload has already arrived; otherwise waits for the first one.
The underlying channel is lossy by design — if many payloads
arrive between calls, you only see the most recent. For “give
me every sample” semantics, see stream.
let (_ts, value): (_, f64) = subscription.latest().await?;
println!("latest = {value}");Examples found in repository?
19async fn main() -> Result<()> {
20 let mut args = env::args().skip(1);
21 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23 let user = args.next().unwrap_or_else(|| "root".into());
24 let pass = args.next().unwrap_or_default();
25 let path = args
26 .next()
27 .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29 let (req_url, sub_url) = parse_url(&url)?;
30 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31 let req = Request::connect_to(&req_url, opts.clone()).await?;
32 req.login(&user, &pass).await?;
33 req.request_parameter_tree().await?;
34 let sub = Subscribe::connect_to(&sub_url, opts).await?;
35 let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37 // Sample the latest value every second for 5 seconds.
38 for _ in 0..5 {
39 let (_ts, value): (_, f64) = subscription.latest().await?;
40 println!("{path} = {value}");
41 tokio::time::sleep(Duration::from_secs(1)).await;
42 }
43
44 sub.unsubscribe(&req, subscription).await?;
45 sub.disconnect().await?;
46 req.disconnect().await?;
47 Ok(())
48}Sourcepub fn stream<V>(
&self,
capacity: usize,
) -> impl Stream<Item = StreamResult<V>> + use<V>where
V: GetParameterTuple + Send + 'static,
pub fn stream<V>(
&self,
capacity: usize,
) -> impl Stream<Item = StreamResult<V>> + use<V>where
V: GetParameterTuple + Send + 'static,
Subscribe to every payload via a bounded ring buffer.
use futures::StreamExt;
use motorcortex_rust::core::Missed;
let mut stream = Box::pin(subscription.stream::<f64>(256));
while let Some(item) = stream.next().await {
match item {
Ok((_ts, v)) => println!("{v}"),
Err(Missed(n)) => eprintln!("dropped {n} samples"),
}
}The capacity is the number of in-flight samples the buffer
can hold; if a consumer falls behind by more than that, the
next item is Err(Missed(n)) with n = missed samples, and
the consumer can decide whether to catch up or bail. This is
explicit back-pressure, as opposed to latest which is
lossy by design.
The broadcast channel is created lazily on the first call —
subscriptions with only notify / read / latest users
pay nothing for it. Subsequent calls reuse the existing
broadcast; the capacity argument is honoured only on the
first call.
Examples found in repository?
18async fn main() -> Result<()> {
19 let mut args = env::args().skip(1);
20 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22 let user = args.next().unwrap_or_else(|| "root".into());
23 let pass = args.next().unwrap_or_default();
24 let path = args
25 .next()
26 .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28 let (req_url, sub_url) = parse_url(&url)?;
29 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30 let req = Request::connect_to(&req_url, opts.clone()).await?;
31 req.login(&user, &pass).await?;
32 req.request_parameter_tree().await?;
33 let sub = Subscribe::connect_to(&sub_url, opts).await?;
34 let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36 // Read 20 samples from the ring. Ring size 256 means we tolerate
37 // a ~256-sample consumer stall before seeing Missed.
38 let mut stream = Box::pin(subscription.stream::<f64>(256));
39 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40 let mut seen = 0;
41 while seen < 20 {
42 match tokio::time::timeout_at(deadline, stream.next()).await {
43 Ok(Some(Ok((_ts, v)))) => {
44 println!("{path} = {v}");
45 seen += 1;
46 }
47 Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48 Ok(None) | Err(_) => break,
49 }
50 }
51
52 sub.unsubscribe(&req, subscription).await?;
53 sub.disconnect().await?;
54 req.disconnect().await?;
55 Ok(())
56}