pub struct Request { /* private fields */ }Expand description
Async handle for request/reply RPCs.
Cloning a Request gives a second handle that multiplexes onto the
same driver thread — commands serialise through the driver, so the
NNG Req/Rep ordering invariant is enforced in the type system
without any user-visible Mutex.
Implementations§
Source§impl Request
impl Request
Sourcepub fn new() -> Self
pub fn new() -> Self
Create a new handle and spawn its driver thread.
The handle starts in ConnectionState::Disconnected; call
connect to open the socket.
use motorcortex_rust::core::{ConnectionState, Request};
let req = Request::new();
assert_eq!(*req.state().borrow(), ConnectionState::Disconnected);Sourcepub fn state(&self) -> Receiver<ConnectionState>
pub fn state(&self) -> Receiver<ConnectionState>
Subscribe to connection-state transitions.
Returns a watch::Receiver; consumers can state.changed().await
or *state.borrow() for the current value.
Sourcepub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()>
pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()>
Open the socket and dial url.
use motorcortex_rust::{ConnectionOptions, core::Request};
let req = Request::new();
let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);
req.connect("wss://127.0.0.1:5568", opts).await?;Sourcepub async fn disconnect(&self) -> Result<()>
pub async fn disconnect(&self) -> Result<()>
Close the socket. Subsequent RPCs will error with
MotorcortexError::Connection until connect is called again.
Examples found in repository?
15async fn main() -> Result<()> {
16 let mut args = env::args().skip(1);
17 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
18 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
19 let user = args.next().unwrap_or_else(|| "root".into());
20 let pass = args.next().unwrap_or_default();
21 let path = args
22 .next()
23 .unwrap_or_else(|| "root/Control/dummyDouble".into());
24
25 let (req_url, _sub_url) = parse_url(&url)?;
26 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
27 let req = Request::connect_to(&req_url, opts).await?;
28 req.login(&user, &pass).await?;
29 req.request_parameter_tree().await?;
30
31 let value: f64 = req.get_parameter(&path).await?;
32 println!("{path} = {value}");
33
34 req.disconnect().await?;
35 Ok(())
36}More examples
19async fn main() -> Result<()> {
20 let mut args = env::args().skip(1);
21 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23 let user = args.next().unwrap_or_else(|| "root".into());
24 let pass = args.next().unwrap_or_default();
25 let path = args
26 .next()
27 .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29 let (req_url, sub_url) = parse_url(&url)?;
30 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31 let req = Request::connect_to(&req_url, opts.clone()).await?;
32 req.login(&user, &pass).await?;
33 req.request_parameter_tree().await?;
34 let sub = Subscribe::connect_to(&sub_url, opts).await?;
35 let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37 // Sample the latest value every second for 5 seconds.
38 for _ in 0..5 {
39 let (_ts, value): (_, f64) = subscription.latest().await?;
40 println!("{path} = {value}");
41 tokio::time::sleep(Duration::from_secs(1)).await;
42 }
43
44 sub.unsubscribe(&req, subscription).await?;
45 sub.disconnect().await?;
46 req.disconnect().await?;
47 Ok(())
48}18async fn main() -> Result<()> {
19 let mut args = env::args().skip(1);
20 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22 let user = args.next().unwrap_or_else(|| "root".into());
23 let pass = args.next().unwrap_or_default();
24 let path = args
25 .next()
26 .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28 let (req_url, sub_url) = parse_url(&url)?;
29 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30 let req = Request::connect_to(&req_url, opts.clone()).await?;
31 req.login(&user, &pass).await?;
32 req.request_parameter_tree().await?;
33 let sub = Subscribe::connect_to(&sub_url, opts).await?;
34 let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36 // Read 20 samples from the ring. Ring size 256 means we tolerate
37 // a ~256-sample consumer stall before seeing Missed.
38 let mut stream = Box::pin(subscription.stream::<f64>(256));
39 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40 let mut seen = 0;
41 while seen < 20 {
42 match tokio::time::timeout_at(deadline, stream.next()).await {
43 Ok(Some(Ok((_ts, v)))) => {
44 println!("{path} = {v}");
45 seen += 1;
46 }
47 Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48 Ok(None) | Err(_) => break,
49 }
50 }
51
52 sub.unsubscribe(&req, subscription).await?;
53 sub.disconnect().await?;
54 req.disconnect().await?;
55 Ok(())
56}Sourcepub async fn login(&self, user: &str, pass: &str) -> Result<StatusCode>
pub async fn login(&self, user: &str, pass: &str) -> Result<StatusCode>
Authenticate with the server.
Returns the server’s StatusCode — Ok on success,
WrongPassword / other variants on rejection. Transport-level
failures surface as MotorcortexError.
Examples found in repository?
15async fn main() -> Result<()> {
16 let mut args = env::args().skip(1);
17 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
18 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
19 let user = args.next().unwrap_or_else(|| "root".into());
20 let pass = args.next().unwrap_or_default();
21 let path = args
22 .next()
23 .unwrap_or_else(|| "root/Control/dummyDouble".into());
24
25 let (req_url, _sub_url) = parse_url(&url)?;
26 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
27 let req = Request::connect_to(&req_url, opts).await?;
28 req.login(&user, &pass).await?;
29 req.request_parameter_tree().await?;
30
31 let value: f64 = req.get_parameter(&path).await?;
32 println!("{path} = {value}");
33
34 req.disconnect().await?;
35 Ok(())
36}More examples
19async fn main() -> Result<()> {
20 let mut args = env::args().skip(1);
21 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23 let user = args.next().unwrap_or_else(|| "root".into());
24 let pass = args.next().unwrap_or_default();
25 let path = args
26 .next()
27 .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29 let (req_url, sub_url) = parse_url(&url)?;
30 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31 let req = Request::connect_to(&req_url, opts.clone()).await?;
32 req.login(&user, &pass).await?;
33 req.request_parameter_tree().await?;
34 let sub = Subscribe::connect_to(&sub_url, opts).await?;
35 let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37 // Sample the latest value every second for 5 seconds.
38 for _ in 0..5 {
39 let (_ts, value): (_, f64) = subscription.latest().await?;
40 println!("{path} = {value}");
41 tokio::time::sleep(Duration::from_secs(1)).await;
42 }
43
44 sub.unsubscribe(&req, subscription).await?;
45 sub.disconnect().await?;
46 req.disconnect().await?;
47 Ok(())
48}18async fn main() -> Result<()> {
19 let mut args = env::args().skip(1);
20 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22 let user = args.next().unwrap_or_else(|| "root".into());
23 let pass = args.next().unwrap_or_default();
24 let path = args
25 .next()
26 .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28 let (req_url, sub_url) = parse_url(&url)?;
29 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30 let req = Request::connect_to(&req_url, opts.clone()).await?;
31 req.login(&user, &pass).await?;
32 req.request_parameter_tree().await?;
33 let sub = Subscribe::connect_to(&sub_url, opts).await?;
34 let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36 // Read 20 samples from the ring. Ring size 256 means we tolerate
37 // a ~256-sample consumer stall before seeing Missed.
38 let mut stream = Box::pin(subscription.stream::<f64>(256));
39 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40 let mut seen = 0;
41 while seen < 20 {
42 match tokio::time::timeout_at(deadline, stream.next()).await {
43 Ok(Some(Ok((_ts, v)))) => {
44 println!("{path} = {v}");
45 seen += 1;
46 }
47 Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48 Ok(None) | Err(_) => break,
49 }
50 }
51
52 sub.unsubscribe(&req, subscription).await?;
53 sub.disconnect().await?;
54 req.disconnect().await?;
55 Ok(())
56}Sourcepub async fn logout(&self) -> Result<StatusCode>
pub async fn logout(&self) -> Result<StatusCode>
Drop the current session on the server.
Sourcepub async fn request_parameter_tree(&self) -> Result<StatusCode>
pub async fn request_parameter_tree(&self) -> Result<StatusCode>
Fetch the parameter tree from the server and store it in the
shared cache accessible via parameter_tree.
Returns the server’s StatusCode. The cache is only updated
on StatusCode::Ok; a non-OK reply leaves the previous cache
intact.
Examples found in repository?
15async fn main() -> Result<()> {
16 let mut args = env::args().skip(1);
17 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
18 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
19 let user = args.next().unwrap_or_else(|| "root".into());
20 let pass = args.next().unwrap_or_default();
21 let path = args
22 .next()
23 .unwrap_or_else(|| "root/Control/dummyDouble".into());
24
25 let (req_url, _sub_url) = parse_url(&url)?;
26 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
27 let req = Request::connect_to(&req_url, opts).await?;
28 req.login(&user, &pass).await?;
29 req.request_parameter_tree().await?;
30
31 let value: f64 = req.get_parameter(&path).await?;
32 println!("{path} = {value}");
33
34 req.disconnect().await?;
35 Ok(())
36}More examples
19async fn main() -> Result<()> {
20 let mut args = env::args().skip(1);
21 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23 let user = args.next().unwrap_or_else(|| "root".into());
24 let pass = args.next().unwrap_or_default();
25 let path = args
26 .next()
27 .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29 let (req_url, sub_url) = parse_url(&url)?;
30 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31 let req = Request::connect_to(&req_url, opts.clone()).await?;
32 req.login(&user, &pass).await?;
33 req.request_parameter_tree().await?;
34 let sub = Subscribe::connect_to(&sub_url, opts).await?;
35 let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37 // Sample the latest value every second for 5 seconds.
38 for _ in 0..5 {
39 let (_ts, value): (_, f64) = subscription.latest().await?;
40 println!("{path} = {value}");
41 tokio::time::sleep(Duration::from_secs(1)).await;
42 }
43
44 sub.unsubscribe(&req, subscription).await?;
45 sub.disconnect().await?;
46 req.disconnect().await?;
47 Ok(())
48}18async fn main() -> Result<()> {
19 let mut args = env::args().skip(1);
20 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22 let user = args.next().unwrap_or_else(|| "root".into());
23 let pass = args.next().unwrap_or_default();
24 let path = args
25 .next()
26 .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28 let (req_url, sub_url) = parse_url(&url)?;
29 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30 let req = Request::connect_to(&req_url, opts.clone()).await?;
31 req.login(&user, &pass).await?;
32 req.request_parameter_tree().await?;
33 let sub = Subscribe::connect_to(&sub_url, opts).await?;
34 let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36 // Read 20 samples from the ring. Ring size 256 means we tolerate
37 // a ~256-sample consumer stall before seeing Missed.
38 let mut stream = Box::pin(subscription.stream::<f64>(256));
39 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40 let mut seen = 0;
41 while seen < 20 {
42 match tokio::time::timeout_at(deadline, stream.next()).await {
43 Ok(Some(Ok((_ts, v)))) => {
44 println!("{path} = {v}");
45 seen += 1;
46 }
47 Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48 Ok(None) | Err(_) => break,
49 }
50 }
51
52 sub.unsubscribe(&req, subscription).await?;
53 sub.disconnect().await?;
54 req.disconnect().await?;
55 Ok(())
56}Sourcepub fn parameter_tree(&self) -> Arc<RwLock<ParameterTree>>
pub fn parameter_tree(&self) -> Arc<RwLock<ParameterTree>>
Shared read handle to the local parameter-tree cache. Every
cloned Request sees the same tree; reads are RwLock::read
(cheap, no channel round-trip). Populated by
request_parameter_tree.
Sourcepub async fn get_parameter<V>(&self, path: &str) -> Result<V>where
V: GetParameterValue + Default,
pub async fn get_parameter<V>(&self, path: &str) -> Result<V>where
V: GetParameterValue + Default,
Read a single parameter. The caller-specified V is the Rust
type the server value should be converted to (see
GetParameterValue).
Returns MotorcortexError::ParameterNotFound if path is
unknown locally — call request_parameter_tree
first.
// Same path, three different Rust types — the server value is
// converted per call (lossy casts allowed).
let as_double: f64 = req.get_parameter("root/Control/dummyDouble").await?;
let as_int: i64 = req.get_parameter("root/Control/dummyDouble").await?;
let as_text: String = req.get_parameter("root/Control/dummyDouble").await?;Examples found in repository?
15async fn main() -> Result<()> {
16 let mut args = env::args().skip(1);
17 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
18 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
19 let user = args.next().unwrap_or_else(|| "root".into());
20 let pass = args.next().unwrap_or_default();
21 let path = args
22 .next()
23 .unwrap_or_else(|| "root/Control/dummyDouble".into());
24
25 let (req_url, _sub_url) = parse_url(&url)?;
26 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
27 let req = Request::connect_to(&req_url, opts).await?;
28 req.login(&user, &pass).await?;
29 req.request_parameter_tree().await?;
30
31 let value: f64 = req.get_parameter(&path).await?;
32 println!("{path} = {value}");
33
34 req.disconnect().await?;
35 Ok(())
36}Sourcepub async fn set_parameter<V>(&self, path: &str, value: V) -> Result<StatusCode>where
V: SetParameterValue,
pub async fn set_parameter<V>(&self, path: &str, value: V) -> Result<StatusCode>where
V: SetParameterValue,
Write a single parameter. Returns the server’s StatusCode.
// Scalar.
req.set_parameter("root/Control/dummyDouble", 2.345_f64).await?;
// Fixed-size array.
req.set_parameter("root/Control/dummyDoubleVec", [1.0, 2.0, 3.0]).await?;
// Dynamic Vec.
req.set_parameter("root/Control/dummyDoubleVec", vec![1.0, 2.0]).await?;Sourcepub async fn get_parameters<T>(&self, paths: &[&str]) -> Result<T>where
T: GetParameterTuple,
pub async fn get_parameters<T>(&self, paths: &[&str]) -> Result<T>where
T: GetParameterTuple,
Read a batch of parameters in one RPC. The generic T is a
tuple type like (bool, f64, i32) whose arity matches
paths.len(). Each element is decoded into its position using
the dtype recorded in the local tree cache.
let (b, d, i): (bool, f64, i32) = req.get_parameters(&[
"root/Control/dummyBool",
"root/Control/dummyDouble",
"root/Control/dummyInt32",
]).await?;Sourcepub async fn set_parameters<T>(
&self,
paths: &[&str],
values: T,
) -> Result<StatusCode>where
T: SetParameterTuple,
pub async fn set_parameters<T>(
&self,
paths: &[&str],
values: T,
) -> Result<StatusCode>where
T: SetParameterTuple,
Write a batch of parameters in one RPC. values is a tuple
(or single-element tuple) whose arity matches paths.len().
Each element is encoded against the dtype recorded in the
local tree cache.
Sourcepub async fn create_group<I>(
&self,
paths: I,
alias: &str,
frequency_divider: u32,
) -> Result<GroupStatusMsg>where
I: Parameters,
pub async fn create_group<I>(
&self,
paths: I,
alias: &str,
frequency_divider: u32,
) -> Result<GroupStatusMsg>where
I: Parameters,
Create a server-side subscription group. Returns the
GroupStatusMsg the subscribe-side code uses as the
group descriptor.
paths accepts anything implementing Parameters — a
string literal, a Vec<String>, an array of &str, etc.
Sourcepub async fn remove_group(&self, alias: &str) -> Result<StatusCode>
pub async fn remove_group(&self, alias: &str) -> Result<StatusCode>
Remove a previously-created subscription group by alias.
Returns the server’s StatusCode unchanged — Ok on
success, Failed (or similar) if the group wasn’t there.
Transport / decode failures surface as
MotorcortexError.
The crate-wide rule: RPCs that return Result<StatusCode>
never promote a non-OK server reply into Err. Branch on
the returned code if you care which way the request went.
Sourcepub async fn get_session_token(&self) -> Result<String>
pub async fn get_session_token(&self) -> Result<String>
Fetch a fresh session token from the server.
On success the token is cached in the driver and returned to
the caller. Callers that want to persist a session across
process restarts can stash the returned string and hand it
back to a fresh Request via restore_session.
While a connection is live, the driver also refreshes this
token periodically in the background (see
ConnectionOptions::token_refresh_interval) so the cache
stays warm for the automatic reconnect path.
Sourcepub async fn restore_session(&self, token: &str) -> Result<StatusCode>
pub async fn restore_session(&self, token: &str) -> Result<StatusCode>
Restore a previously-issued session by supplying the token.
Returns the server’s StatusCode:
Ok/ReadOnlyMode— the session was accepted, subsequent RPCs run under that identity.PermissionDenied/Failed— the token is stale or the server has lost its state.
The driver calls this internally on reconnect using the token stashed by the refresh loop; callers generally don’t need to invoke it explicitly unless they’re recovering from a process restart.
Sourcepub fn session_token(&self) -> Option<String>
pub fn session_token(&self) -> Option<String>
Snapshot of the most recently cached session token, if any.
Populated by get_session_token
and by the periodic refresh loop. Returns None before the
first successful fetch.
Sourcepub fn session_refresh_count(&self) -> u64
pub fn session_refresh_count(&self) -> u64
Count of GetSessionToken RPCs the background refresh helper
has fired since this handle was created.
Bumped by the driver only when the refresh tick runs against
a live pipe — ticks that fire while the state is
ConnectionLost / SessionExpired / Disconnected are
skipped and don’t count. Useful for observing that the
refresh loop is paused while the transport is down, or as a
lightweight liveness metric.
Sourcepub async fn get_parameter_tree_hash(&self) -> Result<u32>
pub async fn get_parameter_tree_hash(&self) -> Result<u32>
Fetch the server’s parameter-tree hash — useful for cheap
change detection. A non-zero return on a populated server
signals the caller can skip a full request_parameter_tree
if the hash matches what they cached previously.
Sourcepub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self>
pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self>
Convenience: Request::new() + connect in
one call. Useful for the “I just want a connected client”
entry path.
use motorcortex_rust::{ConnectionOptions, core::Request};
let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);
let req = Request::connect_to("wss://127.0.0.1:5568", opts).await?;
req.request_parameter_tree().await?;Examples found in repository?
15async fn main() -> Result<()> {
16 let mut args = env::args().skip(1);
17 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
18 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
19 let user = args.next().unwrap_or_else(|| "root".into());
20 let pass = args.next().unwrap_or_default();
21 let path = args
22 .next()
23 .unwrap_or_else(|| "root/Control/dummyDouble".into());
24
25 let (req_url, _sub_url) = parse_url(&url)?;
26 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
27 let req = Request::connect_to(&req_url, opts).await?;
28 req.login(&user, &pass).await?;
29 req.request_parameter_tree().await?;
30
31 let value: f64 = req.get_parameter(&path).await?;
32 println!("{path} = {value}");
33
34 req.disconnect().await?;
35 Ok(())
36}More examples
19async fn main() -> Result<()> {
20 let mut args = env::args().skip(1);
21 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23 let user = args.next().unwrap_or_else(|| "root".into());
24 let pass = args.next().unwrap_or_default();
25 let path = args
26 .next()
27 .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29 let (req_url, sub_url) = parse_url(&url)?;
30 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31 let req = Request::connect_to(&req_url, opts.clone()).await?;
32 req.login(&user, &pass).await?;
33 req.request_parameter_tree().await?;
34 let sub = Subscribe::connect_to(&sub_url, opts).await?;
35 let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37 // Sample the latest value every second for 5 seconds.
38 for _ in 0..5 {
39 let (_ts, value): (_, f64) = subscription.latest().await?;
40 println!("{path} = {value}");
41 tokio::time::sleep(Duration::from_secs(1)).await;
42 }
43
44 sub.unsubscribe(&req, subscription).await?;
45 sub.disconnect().await?;
46 req.disconnect().await?;
47 Ok(())
48}18async fn main() -> Result<()> {
19 let mut args = env::args().skip(1);
20 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22 let user = args.next().unwrap_or_else(|| "root".into());
23 let pass = args.next().unwrap_or_default();
24 let path = args
25 .next()
26 .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28 let (req_url, sub_url) = parse_url(&url)?;
29 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30 let req = Request::connect_to(&req_url, opts.clone()).await?;
31 req.login(&user, &pass).await?;
32 req.request_parameter_tree().await?;
33 let sub = Subscribe::connect_to(&sub_url, opts).await?;
34 let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36 // Read 20 samples from the ring. Ring size 256 means we tolerate
37 // a ~256-sample consumer stall before seeing Missed.
38 let mut stream = Box::pin(subscription.stream::<f64>(256));
39 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40 let mut seen = 0;
41 while seen < 20 {
42 match tokio::time::timeout_at(deadline, stream.next()).await {
43 Ok(Some(Ok((_ts, v)))) => {
44 println!("{path} = {v}");
45 seen += 1;
46 }
47 Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48 Ok(None) | Err(_) => break,
49 }
50 }
51
52 sub.unsubscribe(&req, subscription).await?;
53 sub.disconnect().await?;
54 req.disconnect().await?;
55 Ok(())
56}