use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use tokio::sync::{mpsc::UnboundedReceiver, watch};
use crate::client::{ParameterTree, receive_message};
use crate::connection::{ConnectionManager, PipeEvent};
use crate::core::proto::{decode_message, encode_with_hash};
use crate::core::request::Cmd;
use crate::core::state::ConnectionState;
use crate::core::subscribe::SubCmd;
use crate::core::subscription::Subscription;
use crate::error::{MotorcortexError, Result};
use crate::msg::{
CreateGroupMsg, GetParameterListMsg, GetParameterMsg, GetParameterTreeHashMsg,
GetParameterTreeMsg, GetSessionTokenMsg, GroupStatusMsg, LoginMsg, LogoutMsg, ParameterListMsg,
ParameterMsg, ParameterTreeHashMsg, ParameterTreeMsg, RemoveGroupMsg, RestoreSessionMsg,
SessionTokenMsg, SetParameterListMsg, SetParameterMsg, StatusCode, StatusMsg,
};
pub(crate) fn run_request_driver(
self_tx: tokio::sync::mpsc::UnboundedSender<Cmd>,
mut rx: UnboundedReceiver<Cmd>,
state_tx: watch::Sender<ConnectionState>,
tree: Arc<RwLock<ParameterTree>>,
last_token: Arc<RwLock<Option<String>>>,
refresh_count: Arc<AtomicU64>,
) {
let mut conn = ConnectionManager::new();
let mut user_wants_connected = false;
let mut pending_reconnect = false;
let mut reconnect_enabled = true;
let mut max_reconnect_attempts: Option<u32> = None;
let mut consecutive_restore_failures: u32 = 0;
let on_pipe = pipe_handler_for_request(self_tx.clone());
let refresh_stop = Arc::new(AtomicBool::new(false));
let mut refresh_thread: Option<thread::JoinHandle<()>> = None;
while let Some(cmd) = rx.blocking_recv() {
match cmd {
Cmd::Connect { url, opts, reply } => {
let refresh_interval = opts.token_refresh_interval;
let opt_reconnect = opts.reconnect;
let opt_max_attempts = opts.max_reconnect_attempts;
let result = conn.connect(
&url,
opts,
nng_c_sys::nng_req0_open,
Arc::clone(&on_pipe),
);
if result.is_ok() {
user_wants_connected = true;
pending_reconnect = false;
reconnect_enabled = opt_reconnect;
max_reconnect_attempts = opt_max_attempts;
consecutive_restore_failures = 0;
if let Ok(mut g) = last_token.write() {
*g = None;
}
let _ = state_tx.send(ConnectionState::Connected);
if refresh_thread.is_none() && !refresh_interval.is_zero() {
refresh_stop.store(false, Ordering::Relaxed);
let tx_clone = self_tx.clone();
let stop_clone = Arc::clone(&refresh_stop);
refresh_thread = Some(
thread::Builder::new()
.name("mcx-token-refresh".into())
.spawn(move || {
run_token_refresh(tx_clone, stop_clone, refresh_interval);
})
.expect("spawning the token-refresh thread must succeed"),
);
}
} else {
let _ = state_tx.send(ConnectionState::Disconnected);
}
let _ = reply.send(result);
}
Cmd::Disconnect { reply } => {
user_wants_connected = false;
pending_reconnect = false;
refresh_stop.store(true, Ordering::Relaxed);
let result = conn.disconnect();
if let Some(handle) = refresh_thread.take() {
let _ = handle.join();
}
if let Ok(mut g) = last_token.write() {
*g = None;
}
let _ = state_tx.send(ConnectionState::Disconnected);
let _ = reply.send(result);
}
Cmd::Login { user, pass, reply } => {
let result = do_login(&conn, user, pass);
let _ = reply.send(result);
}
Cmd::Logout { reply } => {
let result = do_logout(&conn);
let _ = reply.send(result);
}
Cmd::RequestParameterTree { reply } => {
let result = do_request_parameter_tree(&conn, &tree);
let _ = reply.send(result);
}
Cmd::GetParameter { path, reply } => {
let result = do_get_parameter(&conn, path);
let _ = reply.send(result);
}
Cmd::SetParameter { path, value, reply } => {
let result = do_set_parameter(&conn, path, value);
let _ = reply.send(result);
}
Cmd::GetParameters { msg, reply } => {
let result = do_get_parameters(&conn, msg);
let _ = reply.send(result);
}
Cmd::SetParameters { msg, reply } => {
let result = do_set_parameters(&conn, msg);
let _ = reply.send(result);
}
Cmd::CreateGroup { msg, reply } => {
let result = do_create_group(&conn, msg);
let _ = reply.send(result);
}
Cmd::RemoveGroup { alias, reply } => {
let result = do_remove_group(&conn, alias);
let _ = reply.send(result);
}
Cmd::GetParameterTreeHash { reply } => {
let result = do_get_parameter_tree_hash(&conn);
let _ = reply.send(result);
}
Cmd::GetSessionToken { reply } => {
let result = do_get_session_token(&conn, &last_token);
let _ = reply.send(result);
}
Cmd::RestoreSession { token, reply } => {
let result = do_restore_session(&conn, token);
let _ = reply.send(result);
}
Cmd::RefreshTokenTick => {
if *state_tx.borrow() == ConnectionState::Connected {
let _ = do_get_session_token(&conn, &last_token);
refresh_count.fetch_add(1, Ordering::Relaxed);
}
}
Cmd::Pipe(event) => {
apply_pipe_event_request(
event,
&mut pending_reconnect,
user_wants_connected,
&mut reconnect_enabled,
max_reconnect_attempts,
&mut consecutive_restore_failures,
&state_tx,
&conn,
&last_token,
);
}
}
}
refresh_stop.store(true, Ordering::Relaxed);
if let Some(handle) = refresh_thread.take() {
let _ = handle.join();
}
}
fn pipe_handler_for_request(
tx: tokio::sync::mpsc::UnboundedSender<Cmd>,
) -> crate::connection::PipeEventHandler {
Arc::new(move |event: PipeEvent| {
let _ = tx.send(Cmd::Pipe(event));
})
}
fn pipe_handler_for_subscribe(
tx: tokio::sync::mpsc::UnboundedSender<SubCmd>,
) -> crate::connection::PipeEventHandler {
Arc::new(move |event: PipeEvent| {
let _ = tx.send(SubCmd::Pipe(event));
})
}
fn apply_pipe_event(
event: PipeEvent,
user_wants_connected: bool,
state_tx: &watch::Sender<ConnectionState>,
) {
if !user_wants_connected {
return;
}
let new_state = match event {
PipeEvent::Added => ConnectionState::Connected,
PipeEvent::Removed => ConnectionState::ConnectionLost,
};
let _ = state_tx.send(new_state);
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RestoreOutcome {
NotAttempted,
Accepted,
Rejected,
}
pub(crate) fn decide_state_after_pipe_event(
event: PipeEvent,
pending_reconnect: bool,
user_wants_connected: bool,
reconnect_enabled: bool,
max_reconnect_attempts: Option<u32>,
consecutive_restore_failures: u32,
restore_outcome: RestoreOutcome,
) -> Option<PipeEventDecision> {
if !user_wants_connected {
return None;
}
match event {
PipeEvent::Removed => {
if reconnect_enabled {
Some(PipeEventDecision {
new_state: ConnectionState::ConnectionLost,
new_pending_reconnect: true,
new_reconnect_enabled: reconnect_enabled,
new_consecutive_failures: consecutive_restore_failures,
disable_dialer: false,
})
} else {
Some(PipeEventDecision {
new_state: ConnectionState::Disconnected,
new_pending_reconnect: false,
new_reconnect_enabled: reconnect_enabled,
new_consecutive_failures: consecutive_restore_failures,
disable_dialer: false,
})
}
}
PipeEvent::Added if pending_reconnect => match restore_outcome {
RestoreOutcome::Accepted | RestoreOutcome::NotAttempted => {
Some(PipeEventDecision {
new_state: ConnectionState::Connected,
new_pending_reconnect: false,
new_reconnect_enabled: reconnect_enabled,
new_consecutive_failures: 0,
disable_dialer: false,
})
}
RestoreOutcome::Rejected => {
let new_failures = consecutive_restore_failures.saturating_add(1);
let cap_hit = matches!(max_reconnect_attempts, Some(cap) if new_failures >= cap);
if cap_hit {
Some(PipeEventDecision {
new_state: ConnectionState::Disconnected,
new_pending_reconnect: false,
new_reconnect_enabled: false,
new_consecutive_failures: new_failures,
disable_dialer: true,
})
} else {
Some(PipeEventDecision {
new_state: ConnectionState::SessionExpired,
new_pending_reconnect: false,
new_reconnect_enabled: reconnect_enabled,
new_consecutive_failures: new_failures,
disable_dialer: false,
})
}
}
},
PipeEvent::Added => Some(PipeEventDecision {
new_state: ConnectionState::Connected,
new_pending_reconnect: false,
new_reconnect_enabled: reconnect_enabled,
new_consecutive_failures: consecutive_restore_failures,
disable_dialer: false,
}),
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct PipeEventDecision {
pub new_state: ConnectionState,
pub new_pending_reconnect: bool,
pub new_reconnect_enabled: bool,
pub new_consecutive_failures: u32,
pub disable_dialer: bool,
}
#[allow(clippy::too_many_arguments)]
fn apply_pipe_event_request(
event: PipeEvent,
pending_reconnect: &mut bool,
user_wants_connected: bool,
reconnect_enabled: &mut bool,
max_reconnect_attempts: Option<u32>,
consecutive_restore_failures: &mut u32,
state_tx: &watch::Sender<ConnectionState>,
conn: &ConnectionManager,
last_token: &Arc<RwLock<Option<String>>>,
) {
let restore_outcome = if matches!(event, PipeEvent::Added) && *pending_reconnect {
match last_token.read().ok().and_then(|g| g.clone()) {
Some(tok) => match do_restore_session(conn, tok) {
Ok(StatusCode::Ok) | Ok(StatusCode::ReadOnlyMode) => RestoreOutcome::Accepted,
_ => RestoreOutcome::Rejected,
},
None => RestoreOutcome::NotAttempted,
}
} else {
RestoreOutcome::NotAttempted
};
let decision = match decide_state_after_pipe_event(
event,
*pending_reconnect,
user_wants_connected,
*reconnect_enabled,
max_reconnect_attempts,
*consecutive_restore_failures,
restore_outcome,
) {
Some(d) => d,
None => return,
};
*pending_reconnect = decision.new_pending_reconnect;
*reconnect_enabled = decision.new_reconnect_enabled;
*consecutive_restore_failures = decision.new_consecutive_failures;
if decision.disable_dialer {
conn.disable_dialer_reconnect();
}
let _ = state_tx.send(decision.new_state);
}
fn run_token_refresh(
tx: tokio::sync::mpsc::UnboundedSender<Cmd>,
stop: Arc<AtomicBool>,
interval: Duration,
) {
let tick = Duration::from_millis(200);
let mut elapsed = Duration::ZERO;
loop {
if stop.load(Ordering::Relaxed) {
break;
}
thread::sleep(tick);
elapsed += tick;
if elapsed >= interval {
elapsed = Duration::ZERO;
if tx.send(Cmd::RefreshTokenTick).is_err() {
break;
}
}
}
}
const ID_BYTE_SIZE: usize = 3;
pub(crate) fn run_subscribe_driver(
self_tx: tokio::sync::mpsc::UnboundedSender<SubCmd>,
mut rx: UnboundedReceiver<SubCmd>,
state_tx: watch::Sender<ConnectionState>,
subscriptions: Arc<RwLock<HashMap<u32, Subscription>>>,
) {
let mut conn = ConnectionManager::new();
let stop = Arc::new(AtomicBool::new(false));
let mut receive_thread: Option<thread::JoinHandle<()>> = None;
let mut user_wants_connected = false;
let on_pipe = pipe_handler_for_subscribe(self_tx);
while let Some(cmd) = rx.blocking_recv() {
match cmd {
SubCmd::Connect { url, opts, reply } => {
let result = conn.connect(
&url,
opts,
nng_c_sys::nng_sub0_open,
Arc::clone(&on_pipe),
);
if result.is_ok() {
user_wants_connected = true;
stop.store(false, Ordering::Relaxed);
let sock = conn.sock.expect("connect succeeded → sock is Some");
let subs = Arc::clone(&subscriptions);
let stop_signal = Arc::clone(&stop);
receive_thread = Some(
thread::Builder::new()
.name("mcx-subscribe-receive".into())
.spawn(move || run_subscribe_receive(sock, subs, stop_signal))
.expect("spawning the subscribe receive thread must succeed"),
);
let _ = state_tx.send(ConnectionState::Connected);
} else {
let _ = state_tx.send(ConnectionState::Disconnected);
}
let _ = reply.send(result);
}
SubCmd::Disconnect { reply } => {
user_wants_connected = false;
stop.store(true, Ordering::Relaxed);
let result = conn.disconnect();
if let Some(handle) = receive_thread.take() {
let _ = handle.join();
}
subscriptions.write().unwrap().clear();
let _ = state_tx.send(ConnectionState::Disconnected);
let _ = reply.send(result);
}
SubCmd::Pipe(event) => {
apply_pipe_event(event, user_wants_connected, &state_tx);
}
SubCmd::Subscribe { group_msg, fdiv, reply } => {
let result = do_subscribe(&conn, &subscriptions, group_msg, fdiv);
let _ = reply.send(result);
}
SubCmd::Unsubscribe { id, reply } => {
let result = do_unsubscribe(&conn, &subscriptions, id);
let _ = reply.send(result);
}
SubCmd::ApplyResubscribe { results, reply } => {
let result = do_apply_resubscribe(&conn, &subscriptions, results);
let _ = reply.send(result);
}
}
}
stop.store(true, Ordering::Relaxed);
let _ = conn.disconnect();
if let Some(handle) = receive_thread.take() {
let _ = handle.join();
}
}
fn run_subscribe_receive(
sock: nng_c_sys::nng_socket,
subscriptions: Arc<RwLock<HashMap<u32, Subscription>>>,
stop: Arc<AtomicBool>,
) {
const HEADER_LEN: usize = 4;
loop {
if stop.load(Ordering::Relaxed) {
break;
}
match receive_message(&sock) {
Ok(buffer) => {
if buffer.len() > HEADER_LEN {
let id = (buffer[0] as u32)
| ((buffer[1] as u32) << 8)
| ((buffer[2] as u32) << 16);
let sub_opt = subscriptions.read().unwrap().get(&id).cloned();
if let Some(sub) = sub_opt {
sub.update(buffer);
}
}
}
Err(_) => {
if stop.load(Ordering::Relaxed) {
break;
}
thread::sleep(Duration::from_millis(50));
}
}
}
}
fn do_subscribe(
conn: &ConnectionManager,
subscriptions: &Arc<RwLock<HashMap<u32, Subscription>>>,
group_msg: crate::msg::GroupStatusMsg,
fdiv: u32,
) -> Result<Subscription> {
let sock = conn.sock.as_ref().ok_or_else(|| {
MotorcortexError::Connection("Socket is not available. Connect first.".into())
})?;
let id = group_msg.id;
nng_sub(*sock, id)?;
let sub = Subscription::new(group_msg, fdiv);
let returned = sub.clone();
subscriptions.write().unwrap().insert(id, sub);
Ok(returned)
}
fn do_apply_resubscribe(
conn: &ConnectionManager,
subscriptions: &Arc<RwLock<HashMap<u32, Subscription>>>,
results: Vec<(u32, crate::msg::GroupStatusMsg)>,
) -> Result<()> {
let sock = conn.sock.as_ref().ok_or_else(|| {
MotorcortexError::Connection("Socket is not available. Connect first.".into())
})?;
let mut guard = subscriptions.write().unwrap();
for (old_id, new_group) in results {
let sub = match guard.remove(&old_id) {
Some(s) => s,
None => continue,
};
let _ = nng_unsub(*sock, old_id);
let new_id = new_group.id;
nng_sub(*sock, new_id)?;
sub.rebind(new_group);
guard.insert(new_id, sub);
}
Ok(())
}
fn nng_sub(sock: nng_c_sys::nng_socket, id: u32) -> Result<()> {
let bytes = id.to_le_bytes();
let rv = unsafe {
nng_c_sys::nng_setopt(
sock,
nng_c_sys::NNG_OPT_SUB_SUBSCRIBE.as_ptr() as *const core::ffi::c_char,
bytes.as_ptr() as *const std::ffi::c_void,
ID_BYTE_SIZE,
)
};
if rv != 0 {
return Err(MotorcortexError::Subscription(format!(
"Failed to subscribe via NNG. Error code: {rv}"
)));
}
Ok(())
}
fn nng_unsub(sock: nng_c_sys::nng_socket, id: u32) -> Result<()> {
let bytes = id.to_le_bytes();
let rv = unsafe {
nng_c_sys::nng_setopt(
sock,
nng_c_sys::NNG_OPT_SUB_UNSUBSCRIBE.as_ptr() as *const core::ffi::c_char,
bytes.as_ptr() as *const std::ffi::c_void,
ID_BYTE_SIZE,
)
};
if rv != 0 {
return Err(MotorcortexError::Subscription(format!(
"Failed to unsubscribe via NNG. Error code: {rv}"
)));
}
Ok(())
}
fn do_unsubscribe(
conn: &ConnectionManager,
subscriptions: &Arc<RwLock<HashMap<u32, Subscription>>>,
id: u32,
) -> Result<()> {
let removed = subscriptions.write().unwrap().remove(&id);
if removed.is_none() {
return Ok(());
}
let sock = conn.sock.as_ref().ok_or_else(|| {
MotorcortexError::Connection("Socket is not available.".into())
})?;
nng_unsub(*sock, id)
}
fn rpc_round_trip<M, T>(conn: &ConnectionManager, msg: &M) -> Result<T>
where
M: prost::Message + crate::msg::Hash,
T: prost::Message + Default + crate::msg::Hash,
{
let sock = conn.sock.as_ref().ok_or_else(|| {
MotorcortexError::Connection("Socket is not available. Connect first.".into())
})?;
let buffer = encode_with_hash(msg)?;
let data_ptr = buffer.as_ptr() as *mut std::ffi::c_void;
let rv = unsafe { nng_c_sys::nng_send(*sock, data_ptr, buffer.len(), 0) };
if rv != 0 {
return Err(MotorcortexError::Io(format!(
"nng_send failed with code: {rv}"
)));
}
let reply = receive_message(sock)?;
decode_message::<T>(&reply)
}
fn do_login(conn: &ConnectionManager, user: String, pass: String) -> Result<StatusCode> {
let msg = LoginMsg {
header: None,
login: user,
password: pass,
};
let status: StatusMsg = rpc_round_trip(conn, &msg)?;
Ok(StatusCode::try_from(status.status)
.unwrap_or(StatusCode::Failed))
}
fn do_logout(conn: &ConnectionManager) -> Result<StatusCode> {
let msg = LogoutMsg { header: None };
let status: StatusMsg = rpc_round_trip(conn, &msg)?;
Ok(StatusCode::try_from(status.status)
.unwrap_or(StatusCode::Failed))
}
fn do_get_parameter(conn: &ConnectionManager, path: String) -> Result<Vec<u8>> {
let msg = GetParameterMsg { header: None, path };
let reply: ParameterMsg = rpc_round_trip(conn, &msg)?;
Ok(reply.value)
}
fn do_set_parameter(
conn: &ConnectionManager,
path: String,
value: Vec<u8>,
) -> Result<StatusCode> {
let msg = SetParameterMsg {
header: None,
offset: None,
path,
value,
};
let status: StatusMsg = rpc_round_trip(conn, &msg)?;
Ok(StatusCode::try_from(status.status).unwrap_or(StatusCode::Failed))
}
fn do_get_parameters(conn: &ConnectionManager, msg: GetParameterListMsg) -> Result<ParameterListMsg> {
rpc_round_trip(conn, &msg)
}
fn do_set_parameters(conn: &ConnectionManager, msg: SetParameterListMsg) -> Result<StatusCode> {
let status: StatusMsg = rpc_round_trip(conn, &msg)?;
Ok(StatusCode::try_from(status.status).unwrap_or(StatusCode::Failed))
}
fn do_create_group(conn: &ConnectionManager, msg: CreateGroupMsg) -> Result<GroupStatusMsg> {
rpc_round_trip(conn, &msg)
}
fn do_get_session_token(
conn: &ConnectionManager,
last_token: &Arc<RwLock<Option<String>>>,
) -> Result<String> {
let msg = GetSessionTokenMsg { header: None };
let reply: SessionTokenMsg = rpc_round_trip(conn, &msg)?;
let status = StatusCode::try_from(reply.status).unwrap_or(StatusCode::Failed);
if status != StatusCode::Ok {
return Err(MotorcortexError::Status(status));
}
if let Ok(mut g) = last_token.write() {
*g = Some(reply.token.clone());
}
Ok(reply.token)
}
fn do_restore_session(conn: &ConnectionManager, token: String) -> Result<StatusCode> {
let msg = RestoreSessionMsg {
header: None,
token,
};
let status: StatusMsg = rpc_round_trip(conn, &msg)?;
Ok(StatusCode::try_from(status.status).unwrap_or(StatusCode::Failed))
}
fn do_get_parameter_tree_hash(conn: &ConnectionManager) -> Result<u32> {
let msg = GetParameterTreeHashMsg { header: None };
let reply: ParameterTreeHashMsg = rpc_round_trip(conn, &msg)?;
Ok(reply.hash)
}
fn do_remove_group(conn: &ConnectionManager, alias: String) -> Result<StatusCode> {
let msg = RemoveGroupMsg { header: None, alias };
let status: StatusMsg = rpc_round_trip(conn, &msg)?;
Ok(StatusCode::try_from(status.status).unwrap_or(StatusCode::Failed))
}
fn do_request_parameter_tree(
conn: &ConnectionManager,
tree: &Arc<RwLock<ParameterTree>>,
) -> Result<StatusCode> {
let msg = GetParameterTreeMsg { header: None };
let reply: ParameterTreeMsg = rpc_round_trip(conn, &msg)?;
let status = StatusCode::try_from(reply.status).unwrap_or(StatusCode::Failed);
if status == StatusCode::Ok {
if let Some(new_tree) = ParameterTree::from_message(reply) {
let mut guard = tree
.write()
.map_err(|_| MotorcortexError::Decode("parameter tree lock poisoned".into()))?;
*guard = new_tree;
}
}
Ok(status)
}
#[cfg(test)]
mod state_machine_tests {
use super::*;
fn decide(
event: PipeEvent,
pending: bool,
outcome: RestoreOutcome,
failures: u32,
cap: Option<u32>,
reconnect_enabled: bool,
) -> PipeEventDecision {
decide_state_after_pipe_event(
event,
pending,
true, reconnect_enabled,
cap,
failures,
outcome,
)
.expect("user_wants_connected = true always returns Some")
}
#[test]
fn user_disconnect_swallows_events() {
for event in [PipeEvent::Added, PipeEvent::Removed] {
let d = decide_state_after_pipe_event(
event,
false,
false, true,
None,
0,
RestoreOutcome::NotAttempted,
);
assert!(d.is_none(), "events while disconnected must be ignored");
}
}
#[test]
fn pipe_removed_with_reconnect_enabled_goes_to_connection_lost() {
let d = decide(
PipeEvent::Removed,
false,
RestoreOutcome::NotAttempted,
0,
None,
true,
);
assert_eq!(d.new_state, ConnectionState::ConnectionLost);
assert!(d.new_pending_reconnect);
assert!(d.new_reconnect_enabled);
assert!(!d.disable_dialer);
}
#[test]
fn pipe_removed_with_reconnect_disabled_goes_to_disconnected() {
let d = decide(
PipeEvent::Removed,
false,
RestoreOutcome::NotAttempted,
0,
None,
false,
);
assert_eq!(d.new_state, ConnectionState::Disconnected);
assert!(!d.new_pending_reconnect);
}
#[test]
fn pipe_added_without_pending_reconnect_is_plain_connected() {
let d = decide(
PipeEvent::Added,
false,
RestoreOutcome::NotAttempted,
0,
None,
true,
);
assert_eq!(d.new_state, ConnectionState::Connected);
}
#[test]
fn pipe_added_with_no_token_treats_reconnect_as_success() {
let d = decide(
PipeEvent::Added,
true, RestoreOutcome::NotAttempted,
3, Some(5),
true,
);
assert_eq!(d.new_state, ConnectionState::Connected);
assert_eq!(d.new_consecutive_failures, 0, "counter resets on success");
}
#[test]
fn pipe_added_with_accepted_restore_resets_counter() {
let d = decide(
PipeEvent::Added,
true,
RestoreOutcome::Accepted,
2,
Some(3),
true,
);
assert_eq!(d.new_state, ConnectionState::Connected);
assert_eq!(d.new_consecutive_failures, 0);
}
#[test]
fn pipe_added_with_rejected_restore_emits_session_expired_under_cap() {
let d = decide(
PipeEvent::Added,
true,
RestoreOutcome::Rejected,
0,
Some(3),
true,
);
assert_eq!(d.new_state, ConnectionState::SessionExpired);
assert_eq!(d.new_consecutive_failures, 1);
assert!(!d.disable_dialer);
}
#[test]
fn pipe_added_with_rejected_restore_trips_cap() {
let d = decide(
PipeEvent::Added,
true,
RestoreOutcome::Rejected,
1,
Some(2),
true,
);
assert_eq!(d.new_state, ConnectionState::Disconnected);
assert_eq!(d.new_consecutive_failures, 2);
assert!(d.disable_dialer, "cap-exceeded must request dialer disable");
assert!(!d.new_reconnect_enabled, "reconnect flag must flip off");
}
#[test]
fn pipe_added_with_rejected_restore_and_no_cap_keeps_retrying() {
let d = decide(
PipeEvent::Added,
true,
RestoreOutcome::Rejected,
1_000_000,
None,
true,
);
assert_eq!(d.new_state, ConnectionState::SessionExpired);
assert_eq!(d.new_consecutive_failures, 1_000_001);
assert!(!d.disable_dialer);
assert!(d.new_reconnect_enabled);
}
#[test]
fn cap_exactly_1_trips_on_first_rejection() {
let d = decide(
PipeEvent::Added,
true,
RestoreOutcome::Rejected,
0,
Some(1),
true,
);
assert_eq!(d.new_state, ConnectionState::Disconnected);
assert!(d.disable_dialer);
}
#[test]
fn consecutive_restore_failures_saturates_rather_than_overflowing() {
let d = decide(
PipeEvent::Added,
true,
RestoreOutcome::Rejected,
u32::MAX,
None,
true,
);
assert_eq!(d.new_consecutive_failures, u32::MAX);
}
}