Skip to main content

subscribe_stream/
subscribe_stream.rs

1//! Subscribe and consume *every* sample via
2//! [`Subscription::stream`] — lossless within the ring capacity,
3//! explicit `Err(Missed(n))` when a consumer falls behind.
4//!
5//! ```bash
6//! cargo run --example subscribe_stream -- \
7//!     wss://127.0.0.1:5568:5567 path/to/mcx.cert.crt root mypassword root/Control/dummyDouble
8//! ```
9
10use std::env;
11use std::time::Duration;
12
13use futures::StreamExt;
14use motorcortex_rust::core::{Missed, Request, Subscribe};
15use motorcortex_rust::{ConnectionOptions, Result, parse_url};
16
17#[tokio::main]
18async fn main() -> Result<()> {
19    let mut args = env::args().skip(1);
20    let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21    let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22    let user = args.next().unwrap_or_else(|| "root".into());
23    let pass = args.next().unwrap_or_default();
24    let path = args
25        .next()
26        .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28    let (req_url, sub_url) = parse_url(&url)?;
29    let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30    let req = Request::connect_to(&req_url, opts.clone()).await?;
31    req.login(&user, &pass).await?;
32    req.request_parameter_tree().await?;
33    let sub = Subscribe::connect_to(&sub_url, opts).await?;
34    let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36    // Read 20 samples from the ring. Ring size 256 means we tolerate
37    // a ~256-sample consumer stall before seeing Missed.
38    let mut stream = Box::pin(subscription.stream::<f64>(256));
39    let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40    let mut seen = 0;
41    while seen < 20 {
42        match tokio::time::timeout_at(deadline, stream.next()).await {
43            Ok(Some(Ok((_ts, v)))) => {
44                println!("{path} = {v}");
45                seen += 1;
46            }
47            Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48            Ok(None) | Err(_) => break,
49        }
50    }
51
52    sub.unsubscribe(&req, subscription).await?;
53    sub.disconnect().await?;
54    req.disconnect().await?;
55    Ok(())
56}