use car_messaging::channel_supervisor::{ChannelSupervisor, SharedLiveness, SpawnFn};
use car_messaging::messaging_config::ChannelId;
use car_server_types::host::HostState;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// Builds a [`ChannelSupervisor`] wired to a spawn function that only counts its invocations.
///
/// The spawn function ignores all of its arguments, bumps the shared counter by one and
/// reports success. The supervisor is returned together with that counter so a test can
/// check how many times the spawn function was called.
fn counting_supervisor() -> (Arc<ChannelSupervisor>, Arc<AtomicUsize>) {
    let host = Arc::new(HostState::new());
    // The initial receiver is not used here; it is dropped when this function returns.
    let (cancel_tx, _initial_rx) = tokio::sync::watch::channel(false);
    let spawn_count = Arc::new(AtomicUsize::new(0));
    let spawn: SpawnFn = {
        // The closure owns its own handle to the counter; the caller keeps the other one.
        let tally = Arc::clone(&spawn_count);
        Box::new(
            move |_channel: ChannelId,
                  _host: &Arc<HostState>,
                  _liveness: &SharedLiveness,
                  _cancel: tokio::sync::watch::Receiver<bool>|
                  -> Result<(), String> {
                tally.fetch_add(1, Ordering::SeqCst);
                Ok(())
            },
        )
    };
    (
        Arc::new(ChannelSupervisor::new(host, cancel_tx, spawn)),
        spawn_count,
    )
}
/// A channel that has not been spawned yet is reported as not spawned; after one
/// `ensure_spawned` call it is reported as spawned and the spawn function ran exactly once.
#[test]
fn runtime_enable_spawns_watcher_once() {
    let (supervisor, spawn_count) = counting_supervisor();

    // Nothing has been spawned on a fresh supervisor.
    assert!(!supervisor.is_spawned(ChannelId::IMessage));

    supervisor.ensure_spawned(ChannelId::IMessage).unwrap();

    assert!(supervisor.is_spawned(ChannelId::IMessage));
    let observed = spawn_count.load(Ordering::SeqCst);
    assert_eq!(observed, 1, "exactly one spawn");
}
/// Calling `ensure_spawned` three times for the same channel invokes the spawn
/// function only once.
#[test]
fn re_enable_is_idempotent_no_duplicate_watcher() {
    let (supervisor, spawn_count) = counting_supervisor();

    // Three consecutive enables of the same channel; each one must succeed.
    for _ in 0..3 {
        supervisor.ensure_spawned(ChannelId::IMessage).unwrap();
    }

    let observed = spawn_count.load(Ordering::SeqCst);
    assert_eq!(
        observed, 1,
        "an already-spawned channel is not re-spawned (idempotent)"
    );
}
/// A channel recorded via `mark_spawned` is reported as spawned, and a subsequent
/// `ensure_spawned` for it succeeds without ever invoking the spawn function.
#[test]
fn boot_spawned_channel_is_not_double_spawned_by_a_later_enable() {
    let (supervisor, spawn_count) = counting_supervisor();

    // Record the channel as spawned without going through the spawn function.
    supervisor.mark_spawned(ChannelId::IMessage);
    assert!(supervisor.is_spawned(ChannelId::IMessage));

    supervisor.ensure_spawned(ChannelId::IMessage).unwrap();

    let observed = spawn_count.load(Ordering::SeqCst);
    assert_eq!(
        observed, 0,
        "a channel already spawned at boot is never re-spawned by a later set"
    );
}
/// Spawning one channel does not mark another as spawned; each of the two channels
/// triggers its own single call to the spawn function.
#[test]
fn channels_are_independent() {
    let (supervisor, spawn_count) = counting_supervisor();

    // First channel: spawned, while the second one is still untouched.
    supervisor.ensure_spawned(ChannelId::IMessage).unwrap();
    assert!(supervisor.is_spawned(ChannelId::IMessage));
    assert!(!supervisor.is_spawned(ChannelId::Slack));

    // Second channel gets its own spawn.
    supervisor.ensure_spawned(ChannelId::Slack).unwrap();
    assert!(supervisor.is_spawned(ChannelId::Slack));

    let observed = spawn_count.load(Ordering::SeqCst);
    assert_eq!(
        observed, 2,
        "each channel spawns its own watcher exactly once"
    );
}
/// When the spawn function fails, `ensure_spawned` returns an error and the channel is
/// not reported as spawned; a second `ensure_spawned` calls the spawn function again and,
/// once it succeeds, the channel is reported as spawned.
#[test]
fn spawn_error_leaves_channel_not_live_so_a_later_enable_retries() {
    let host = Arc::new(HostState::new());
    let (cancel_tx, _unused_rx) = tokio::sync::watch::channel(false);
    let attempts = Arc::new(AtomicUsize::new(0));
    let spawn: SpawnFn = {
        let attempt_log = Arc::clone(&attempts);
        Box::new(
            move |_channel: ChannelId,
                  _host: &Arc<HostState>,
                  _liveness: &SharedLiveness,
                  _cancel: tokio::sync::watch::Receiver<bool>|
                  -> Result<(), String> {
                // `fetch_add` yields the previous value, so 0 identifies the very first call:
                // that one fails, every later call succeeds.
                match attempt_log.fetch_add(1, Ordering::SeqCst) {
                    0 => Err("not provisioned yet".to_string()),
                    _ => Ok(()),
                }
            },
        )
    };
    let supervisor = ChannelSupervisor::new(host, cancel_tx, spawn);

    // First enable: the spawn function reports an error.
    let first_outcome = supervisor.ensure_spawned(ChannelId::Slack);
    assert!(first_outcome.is_err(), "the first spawn failed");
    assert!(
        !supervisor.is_spawned(ChannelId::Slack),
        "a failed spawn does not mark the channel live"
    );

    // Second enable: the spawn function is called again and now succeeds.
    supervisor.ensure_spawned(ChannelId::Slack).unwrap();
    assert!(supervisor.is_spawned(ChannelId::Slack));

    let total_attempts = attempts.load(Ordering::SeqCst);
    assert_eq!(total_attempts, 2, "the retry re-attempted");
}