use car_server_types::channel::{ChannelId, SharedHost};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
/// Outcome of the most recent send attempt recorded for a channel.
///
/// The `Default` value has every field set to `None`, meaning no send has
/// been recorded yet.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ChannelLiveness {
    /// Time of the most recently recorded send attempt, in milliseconds.
    /// Recorders in this file pass milliseconds since the Unix epoch.
    pub last_send_at_ms: Option<i64>,
    /// `Some(true)` if the most recently recorded send succeeded,
    /// `Some(false)` if it failed.
    pub last_send_ok: Option<bool>,
    /// Failure reason of the most recently recorded send; reset to `None`
    /// whenever a success is recorded.
    pub last_error: Option<String>,
}

impl ChannelLiveness {
    /// Replaces the record with a successful send at `at_ms`, dropping any
    /// previously stored error.
    fn record_success(&mut self, at_ms: i64) {
        // Every field is overwritten, so assign the whole record at once.
        *self = Self {
            last_send_at_ms: Some(at_ms),
            last_send_ok: Some(true),
            last_error: None,
        };
    }

    /// Replaces the record with a failed send at `at_ms`, storing `reason`
    /// as the last error.
    fn record_failure(&mut self, at_ms: i64, reason: impl Into<String>) {
        *self = Self {
            last_send_at_ms: Some(at_ms),
            last_send_ok: Some(false),
            last_error: Some(reason.into()),
        };
    }
}
/// Callback that [`ChannelSupervisor::ensure_spawned`] invokes to start a channel.
///
/// Arguments, in order: the channel being started, the supervisor's host
/// handle, the supervisor's shared liveness table, and a watch receiver
/// subscribed to the supervisor's cancel sender (see
/// [`ChannelSupervisor::cancel_all`], which sends `true` on it).
///
/// Returning `Err` makes `ensure_spawned` unmark the channel and hand the
/// error string back to its caller.
// NOTE(review): presumably the callback spawns a task that watches the
// receiver for cancellation; the callback bodies are not visible here.
pub type SpawnFn = Box<
dyn Fn(ChannelId, &SharedHost, &SharedLiveness, tokio::sync::watch::Receiver<bool>)
-> Result<(), String>
+ Send
+ Sync,
>;
/// Per-channel [`ChannelLiveness`] table, wrapped in `Arc<Mutex<..>>` so the
/// same table can be handed to spawn callbacks and updated through
/// [`liveness_record_success`] / [`liveness_record_failure`].
pub type SharedLiveness = Arc<Mutex<HashMap<ChannelId, ChannelLiveness>>>;
/// Tracks which channels have been started and records per-channel send
/// liveness.
pub struct ChannelSupervisor {
// Host handle passed by reference to the spawn callback.
host: SharedHost,
// Channels currently marked as spawned.
live: Mutex<HashSet<ChannelId>>,
// Watch sender: `cancel_all` sends `true` on it, and every spawn callback
// invocation receives a freshly subscribed receiver.
cancel_tx: tokio::sync::watch::Sender<bool>,
// Callback used to start a channel. `None` only when built via
// `new_for_test`, in which case `ensure_spawned` just marks the channel.
spawn: Option<SpawnFn>,
// Shared table of the last send outcome per channel.
liveness: SharedLiveness,
}
impl ChannelSupervisor {
pub fn new(
host: SharedHost,
cancel_tx: tokio::sync::watch::Sender<bool>,
spawn: SpawnFn,
) -> Self {
Self {
host,
live: Mutex::new(HashSet::new()),
cancel_tx,
spawn: Some(spawn),
liveness: Arc::new(Mutex::new(HashMap::new())),
}
}
#[cfg(test)]
pub fn new_for_test(host: SharedHost) -> Self {
let (cancel_tx, _rx) = tokio::sync::watch::channel(false);
Self {
host,
live: Mutex::new(HashSet::new()),
cancel_tx,
spawn: None,
liveness: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn liveness(&self) -> SharedLiveness {
self.liveness.clone()
}
pub fn cancel_all(&self) {
let _ = self.cancel_tx.send(true);
}
pub fn mark_spawned(&self, channel: ChannelId) {
self.live.lock().unwrap().insert(channel);
}
pub fn is_spawned(&self, channel: ChannelId) -> bool {
self.live.lock().unwrap().contains(&channel)
}
pub fn ensure_spawned(&self, channel: ChannelId) -> Result<(), String> {
{
let mut live = self.live.lock().unwrap();
if live.contains(&channel) {
return Ok(());
}
live.insert(channel);
}
let result = match &self.spawn {
Some(spawn) => spawn(
channel,
&self.host,
&self.liveness,
self.cancel_tx.subscribe(),
),
None => Ok(()),
};
if result.is_err() {
self.live.lock().unwrap().remove(&channel);
}
result
}
pub fn record_send_success(&self, channel: ChannelId) {
let now = now_ms();
self.liveness
.lock()
.unwrap()
.entry(channel)
.or_default()
.record_success(now);
}
pub fn record_send_failure(&self, channel: ChannelId, reason: impl Into<String>) {
let now = now_ms();
self.liveness
.lock()
.unwrap()
.entry(channel)
.or_default()
.record_failure(now, reason);
}
pub fn liveness_snapshot(&self, channel: ChannelId) -> ChannelLiveness {
self.liveness
.lock()
.unwrap()
.get(&channel)
.cloned()
.unwrap_or_default()
}
}
/// Records a successful send on `channel` in a shared liveness table.
///
/// Performs the same update as `ChannelSupervisor::record_send_success`, for
/// callers that hold only the [`SharedLiveness`] handle. The entry is
/// created if missing and stamped with the current time.
///
/// # Panics
///
/// Panics if the liveness mutex is poisoned.
pub fn liveness_record_success(liveness: &SharedLiveness, channel: ChannelId) {
    let stamp = now_ms();
    let mut table = liveness.lock().unwrap();
    table.entry(channel).or_default().record_success(stamp);
}
/// Records a failed send on `channel`, with `reason`, in a shared liveness
/// table.
///
/// Performs the same update as `ChannelSupervisor::record_send_failure`, for
/// callers that hold only the [`SharedLiveness`] handle. The entry is
/// created if missing and stamped with the current time.
///
/// # Panics
///
/// Panics if the liveness mutex is poisoned.
pub fn liveness_record_failure(
    liveness: &SharedLiveness,
    channel: ChannelId,
    reason: impl Into<String>,
) {
    let stamp = now_ms();
    let mut table = liveness.lock().unwrap();
    table.entry(channel).or_default().record_failure(stamp, reason);
}
/// Current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. If the
/// millisecond count does not fit in an `i64`, the result saturates at
/// `i64::MAX` instead of wrapping through a lossy narrowing cast.
fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            // `as_millis` yields a `u128`; clamp it first so the narrowing
            // `as` below can never truncate.
            elapsed.as_millis().min(i64::MAX as u128) as i64
        })
}
#[cfg(test)]
mod tests {
    use super::*;
    use car_server_types::host::HostState;

    /// Builds a fresh shared host for a test supervisor.
    fn host() -> SharedHost {
        let state = HostState::new();
        Arc::new(state)
    }

    /// Calling `ensure_spawned` twice leaves the channel marked and does not
    /// affect other channels.
    #[test]
    fn ensure_spawned_is_idempotent() {
        let supervisor = ChannelSupervisor::new_for_test(host());
        assert!(!supervisor.is_spawned(ChannelId::IMessage));

        // The first pass marks the channel; the second must be a no-op.
        for _ in 0..2 {
            assert_eq!(supervisor.ensure_spawned(ChannelId::IMessage), Ok(()));
            assert!(supervisor.is_spawned(ChannelId::IMessage));
        }

        assert!(!supervisor.is_spawned(ChannelId::Slack));
    }

    /// `mark_spawned` is visible through `is_spawned` for that channel only.
    #[test]
    fn mark_spawned_reflects_in_is_spawned() {
        let supervisor = ChannelSupervisor::new_for_test(host());
        supervisor.mark_spawned(ChannelId::IMessage);

        assert!(supervisor.is_spawned(ChannelId::IMessage));
        assert!(!supervisor.is_spawned(ChannelId::Slack));
    }

    /// Walks a channel through untouched -> success -> failure -> success and
    /// checks the snapshot after every step.
    #[test]
    fn liveness_records_success_and_failure() {
        let supervisor = ChannelSupervisor::new_for_test(host());

        let untouched = supervisor.liveness_snapshot(ChannelId::IMessage);
        assert_eq!(untouched, ChannelLiveness::default());
        assert!(untouched.last_send_at_ms.is_none());

        supervisor.record_send_success(ChannelId::IMessage);
        let after_first_success = supervisor.liveness_snapshot(ChannelId::IMessage);
        assert_eq!(after_first_success.last_send_ok, Some(true));
        assert!(after_first_success.last_send_at_ms.is_some());
        assert!(after_first_success.last_error.is_none());

        supervisor.record_send_failure(ChannelId::IMessage, "recipient not found");
        let after_failure = supervisor.liveness_snapshot(ChannelId::IMessage);
        assert_eq!(after_failure.last_send_ok, Some(false));
        assert_eq!(
            after_failure.last_error.as_deref(),
            Some("recipient not found")
        );

        // A later success must clear the stored error.
        supervisor.record_send_success(ChannelId::IMessage);
        let after_recovery = supervisor.liveness_snapshot(ChannelId::IMessage);
        assert_eq!(after_recovery.last_send_ok, Some(true));
        assert!(after_recovery.last_error.is_none());
    }
}