pub struct ConnectionOptions {
pub certificate: String,
pub conn_timeout_ms: u32,
pub io_timeout_ms: u32,
pub reconnect: bool,
pub reconnect_min: Duration,
pub reconnect_max: Duration,
pub token_refresh_interval: Duration,
pub max_reconnect_attempts: Option<u32>,
}Expand description
Options used to configure the connection settings.
Fields§
§certificate: StringPath to the TLS certificate used for secure communication.
conn_timeout_ms: u32Connection timeout in milliseconds.
io_timeout_ms: u32I/O timeout in milliseconds.
reconnect: boolWhether NNG should auto-redial the transport on drop. When
true (default), NNG_OPT_RECONNMINT / NNG_OPT_RECONNMAXT
are set on the dialer and the driver’s session-restore logic
kicks in on the next pipe ADD event. When false, dropped
transports stay Disconnected and the user must call
connect again explicitly.
reconnect_min: DurationMin backoff for NNG’s dialer redial (NNG_OPT_RECONNMINT).
Ignored when reconnect == false.
reconnect_max: DurationMax backoff ceiling for NNG’s dialer redial
(NNG_OPT_RECONNMAXT). Ignored when reconnect == false.
token_refresh_interval: DurationHow often to refresh the session token on a live connection.
Each refresh is a GetSessionToken RPC; the fresh token is
stashed in the driver for restore_session to use after a
drop. Default 30 s matches motorcortex-python. Set to
Duration::ZERO to disable.
max_reconnect_attempts: Option<u32>Safety net for the automatic reconnect path. When Some(n),
the driver counts consecutive
ADD_POST → RestoreSession(token) → non-Ok cycles and, on
the n-th failure, disables NNG’s dialer redial and
publishes ConnectionState::Disconnected. A successful
restore (or a reconnect that skipped restore because no
token was cached) resets the counter. None (default) means
the driver keeps trying indefinitely, matching NNG’s
built-in behaviour.
Implementations§
Source§impl ConnectionOptions
impl ConnectionOptions
Sourcepub fn new(
certificate: String,
conn_timeout_ms: u32,
io_timeout_ms: u32,
) -> Self
pub fn new( certificate: String, conn_timeout_ms: u32, io_timeout_ms: u32, ) -> Self
Creates a new ConnectionOptions instance with reconnect /
token-refresh defaults. Use the with_* setters to override
the reconnect knobs.
§Arguments:
certificate- Path to the TLS certificate.conn_timeout_ms- Timeout value for connection establishment, in milliseconds.io_timeout_ms- Timeout value for I/O operations, in milliseconds.
Examples found in repository?
15fn main() -> Result<()> {
16 let mut args = env::args().skip(1);
17 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
18 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
19 let user = args.next().unwrap_or_else(|| "root".into());
20 let pass = args.next().unwrap_or_default();
21 let path = args
22 .next()
23 .unwrap_or_else(|| "root/Control/dummyDouble".into());
24
25 let (req_url, _sub_url) = parse_url(&url)?;
26 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
27 let req = Request::connect_to(&req_url, opts)?;
28 req.login(&user, &pass)?;
29 req.request_parameter_tree()?;
30
31 let value: f64 = req.get_parameter(&path)?;
32 println!("{path} = {value}");
33
34 req.disconnect()?;
35 Ok(())
36}More examples
15async fn main() -> Result<()> {
16 let mut args = env::args().skip(1);
17 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
18 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
19 let user = args.next().unwrap_or_else(|| "root".into());
20 let pass = args.next().unwrap_or_default();
21 let path = args
22 .next()
23 .unwrap_or_else(|| "root/Control/dummyDouble".into());
24
25 let (req_url, _sub_url) = parse_url(&url)?;
26 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
27 let req = Request::connect_to(&req_url, opts).await?;
28 req.login(&user, &pass).await?;
29 req.request_parameter_tree().await?;
30
31 let value: f64 = req.get_parameter(&path).await?;
32 println!("{path} = {value}");
33
34 req.disconnect().await?;
35 Ok(())
36}19async fn main() -> Result<()> {
20 let mut args = env::args().skip(1);
21 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
22 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
23 let user = args.next().unwrap_or_else(|| "root".into());
24 let pass = args.next().unwrap_or_default();
25 let path = args
26 .next()
27 .unwrap_or_else(|| "root/Control/dummyDouble".into());
28
29 let (req_url, sub_url) = parse_url(&url)?;
30 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
31 let req = Request::connect_to(&req_url, opts.clone()).await?;
32 req.login(&user, &pass).await?;
33 req.request_parameter_tree().await?;
34 let sub = Subscribe::connect_to(&sub_url, opts).await?;
35 let subscription = sub.subscribe(&req, [&path[..]], "example-latest", 100).await?;
36
37 // Sample the latest value every second for 5 seconds.
38 for _ in 0..5 {
39 let (_ts, value): (_, f64) = subscription.latest().await?;
40 println!("{path} = {value}");
41 tokio::time::sleep(Duration::from_secs(1)).await;
42 }
43
44 sub.unsubscribe(&req, subscription).await?;
45 sub.disconnect().await?;
46 req.disconnect().await?;
47 Ok(())
48}18async fn main() -> Result<()> {
19 let mut args = env::args().skip(1);
20 let url = args.next().unwrap_or_else(|| "wss://127.0.0.1:5568:5567".into());
21 let cert = args.next().unwrap_or_else(|| "tests/mcx.cert.crt".into());
22 let user = args.next().unwrap_or_else(|| "root".into());
23 let pass = args.next().unwrap_or_default();
24 let path = args
25 .next()
26 .unwrap_or_else(|| "root/Control/dummyDouble".into());
27
28 let (req_url, sub_url) = parse_url(&url)?;
29 let opts = ConnectionOptions::new(cert, 5_000, 5_000);
30 let req = Request::connect_to(&req_url, opts.clone()).await?;
31 req.login(&user, &pass).await?;
32 req.request_parameter_tree().await?;
33 let sub = Subscribe::connect_to(&sub_url, opts).await?;
34 let subscription = sub.subscribe(&req, [&path[..]], "example-stream", 10).await?;
35
36 // Read 20 samples from the ring. Ring size 256 means we tolerate
37 // a ~256-sample consumer stall before seeing Missed.
38 let mut stream = Box::pin(subscription.stream::<f64>(256));
39 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
40 let mut seen = 0;
41 while seen < 20 {
42 match tokio::time::timeout_at(deadline, stream.next()).await {
43 Ok(Some(Ok((_ts, v)))) => {
44 println!("{path} = {v}");
45 seen += 1;
46 }
47 Ok(Some(Err(Missed(n)))) => eprintln!("lagged, dropped {n} samples"),
48 Ok(None) | Err(_) => break,
49 }
50 }
51
52 sub.unsubscribe(&req, subscription).await?;
53 sub.disconnect().await?;
54 req.disconnect().await?;
55 Ok(())
56}Sourcepub fn with_reconnect(self, enabled: bool) -> Self
pub fn with_reconnect(self, enabled: bool) -> Self
Disable NNG auto-redial + driver session-restore logic.
Sourcepub fn with_reconnect_backoff(self, min: Duration, max: Duration) -> Self
pub fn with_reconnect_backoff(self, min: Duration, max: Duration) -> Self
Override NNG’s dialer backoff window.
Sourcepub fn with_token_refresh_interval(self, interval: Duration) -> Self
pub fn with_token_refresh_interval(self, interval: Duration) -> Self
Override the session-token refresh cadence. Duration::ZERO
disables the background refresh entirely.
Sourcepub fn with_max_reconnect_attempts(self, attempts: Option<u32>) -> Self
pub fn with_max_reconnect_attempts(self, attempts: Option<u32>) -> Self
Cap the number of consecutive RestoreSession failures the
driver tolerates before it disables NNG’s dialer and
publishes ConnectionState::Disconnected. Pass None to
keep the default “retry forever” behaviour.
Trait Implementations§
Source§impl Clone for ConnectionOptions
impl Clone for ConnectionOptions
Source§fn clone(&self) -> ConnectionOptions
fn clone(&self) -> ConnectionOptions
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more