subscribe_stream/
subscribe_stream.rs1use std::env;
11use std::time::Duration;
12
13use futures::StreamExt;
14use motorcortex_rust::core::{Missed, Request, Subscribe};
15use motorcortex_rust::{ConnectionOptions, Result, parse_url};
16
17#[tokio::main]
18async fn main() -> Result<()> {
19 let mut args = env::args().skip(1);
20 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22 let user = args.next().unwrap_or_else(|| "root".into());
23 let pass = args.next().unwrap_or_default();
24 let path = args
25 .next()
26 .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28 let (req_url, sub_url) = parse_url(&url)?;
29 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30 let req = Request::connect_to(&req_url, opts.clone()).await?;
31 req.login(&user, &pass).await?;
32 req.request_parameter_tree().await?;
33 let sub = Subscribe::connect_to(&sub_url, opts).await?;
34 let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36 let mut stream = Box::pin(subscription.stream::<f64>(256));
39 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40 let mut seen = 0;
41 while seen < 20 {
42 match tokio::time::timeout_at(deadline, stream.next()).await {
43 Ok(Some(Ok((_ts, v)))) => {
44 println!("{path} = {v}");
45 seen += 1;
46 }
47 Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48 Ok(None) | Err(_) => break,
49 }
50 }
51
52 sub.unsubscribe(&req, subscription).await?;
53 sub.disconnect().await?;
54 req.disconnect().await?;
55 Ok(())
56}