use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex as StdMutex, RwLock};
use std::time::{Duration, Instant};
use can_transport::{CanBus, CanFilter};
use tokio::sync::broadcast;
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;
use crate::canopen::{nmt, sdo};
use crate::error::{Error, Result};
use crate::types::MotorIdentity;
use super::events::Cia402Event;
use super::motor_entry::MotorEntry;
use super::types::{MotorLifecycle, ReinitReason};
/// Base CAN identifier used for the heartbeat subscription filter.
/// In CANopen, a node's heartbeat (NMT error control) frame is sent on
/// COB-ID `0x700 + node_id`.
const HB_COB_BASE: u16 = 0x700;
/// Mask paired with [`HB_COB_BASE`] when building the subscription filter.
/// NOTE(review): assuming `CanFilter::standard` matches on
/// `(id & mask) == (base & mask)`, this admits IDs `0x700..=0x77F` — confirm
/// against the `can_transport` documentation.
const HB_COB_MASK: u16 = 0x780;
/// Heartbeat-driven discovery loop.
///
/// Subscribes on `bus` with the `HB_COB_BASE` / `HB_COB_MASK` filter and hands
/// every frame that `nmt::parse_heartbeat` accepts to `handle_inbound_hb`.
/// Heartbeats whose node id equals `our_hb_node_id` are ignored.
///
/// The function returns when `cancel` fires, or immediately (after logging an
/// error) when the subscription cannot be established.
///
/// NOTE(review): receive errors are logged and the loop keeps going. If the
/// receiver can report a terminal error (e.g. a closed channel) this would
/// spin; confirm against the receiver type returned by `CanBus::subscribe`.
pub(crate) async fn run_discovery(
    bus: Arc<dyn CanBus>,
    our_hb_node_id: u8,
    motors: Arc<RwLock<HashMap<u8, Arc<MotorEntry>>>>,
    inflight_ops: Arc<StdMutex<HashSet<u8>>>,
    events_tx: broadcast::Sender<Cia402Event>,
    sdo_timeout: Duration,
    cancel: CancellationToken,
) {
    let mut heartbeats = match bus
        .subscribe(CanFilter::standard(HB_COB_BASE, HB_COB_MASK))
        .await
    {
        Err(e) => {
            log::error!("Discovery: subscribe failed: {e}");
            return;
        }
        Ok(receiver) => receiver,
    };

    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                log::debug!("Discovery cancelled");
                return;
            }
            received = heartbeats.recv() => {
                let frame = match received {
                    Err(e) => {
                        log::warn!("Discovery rx error (continuing): {e}");
                        continue;
                    }
                    Ok(frame) => frame,
                };

                // Frames that are not heartbeats, and our own heartbeat, are dropped.
                match nmt::parse_heartbeat(&frame) {
                    Some((nid, state)) if nid != our_hb_node_id => handle_inbound_hb(
                        nid,
                        state,
                        &motors,
                        &inflight_ops,
                        &events_tx,
                        &bus,
                        sdo_timeout,
                        &cancel,
                    ),
                    _ => {}
                }
            }
        }
    }
}
/// Applies one received heartbeat from node `nid` to the shared motor table.
///
/// Steps, in order:
/// 1. Look up the node's `MotorEntry`, creating and registering it if absent
///    (emits `Cia402Event::NodeAppeared` when created here).
/// 2. Under the entry lock: record the heartbeat time and NMT `state`, mark
///    the node online, and move an `Initialized` motor that reports anything
///    other than `Operational` to `NeedsReinit { LeftOperational }`.
/// 3. Emit `NodeOnline` (if the node was previously offline) and
///    `NeedsReinit` (if step 2 changed the lifecycle), then publish the
///    freshly built live state.
/// 4. If the lifecycle is still `Unknown`, start an identification task via
///    `spawn_identify_if_idle`.
///
/// Send errors on `events_tx` are intentionally ignored.
///
/// # Panics
/// Panics if the `motors` lock or the entry's inner mutex is poisoned.
// The argument list mirrors the state owned by `run_discovery`; bundling it
// into a struct would only move the noise to the call site.
#[allow(clippy::too_many_arguments)]
fn handle_inbound_hb(
    nid: u8,
    state: nmt::NmtState,
    motors: &Arc<RwLock<HashMap<u8, Arc<MotorEntry>>>>,
    inflight_ops: &Arc<StdMutex<HashSet<u8>>>,
    events_tx: &broadcast::Sender<Cia402Event>,
    bus: &Arc<dyn CanBus>,
    sdo_timeout: Duration,
    cancel: &CancellationToken,
) {
    let now = Instant::now();

    // Fast path: an already-known node only needs the shared read lock, so a
    // steady stream of heartbeats does not serialise against other readers.
    // The read guard is a temporary of this statement and is released before
    // the write lock below is requested (taking both at once would deadlock).
    let known = motors.read().unwrap().get(&nid).cloned();
    let (entry, is_new) = match known {
        Some(existing) => (existing, false),
        None => {
            // Slow path: first heartbeat from this node. The entry API
            // re-checks under the write lock, so a concurrent insert between
            // the two lock acquisitions is handled and reported as "not new".
            let mut map = motors.write().unwrap();
            let mut inserted = false;
            let slot = map.entry(nid).or_insert_with(|| {
                inserted = true;
                Arc::new(MotorEntry::new(nid))
            });
            (slot.clone(), inserted)
        }
    };
    if is_new {
        let _ = events_tx.send(Cia402Event::NodeAppeared { nid });
    }

    // All state mutation happens under the entry lock; events are sent only
    // after it has been released.
    let (needs_online_event, needs_reinit_event, needs_identify, live_state) = {
        let mut inner = entry.inner.lock().unwrap();
        inner.last_heartbeat = Some(now);
        inner.nmt_state = Some(state);

        let came_online = !inner.online;
        inner.online = true;

        let left_operational = matches!(inner.lifecycle, MotorLifecycle::Initialized)
            && state != nmt::NmtState::Operational;
        if left_operational {
            inner.lifecycle = MotorLifecycle::NeedsReinit {
                reason: ReinitReason::LeftOperational,
            };
        }

        // Evaluated after the possible lifecycle change above.
        let unidentified = matches!(inner.lifecycle, MotorLifecycle::Unknown);
        let snapshot = inner.build_live_state(now);
        (came_online, left_operational, unidentified, snapshot)
    };

    if needs_online_event {
        let _ = events_tx.send(Cia402Event::NodeOnline { nid });
    }
    if needs_reinit_event {
        let _ = events_tx.send(Cia402Event::NeedsReinit {
            nid,
            reason: ReinitReason::LeftOperational,
        });
    }
    entry.publish(live_state);

    if needs_identify {
        spawn_identify_if_idle(
            nid,
            entry,
            inflight_ops.clone(),
            events_tx.clone(),
            bus.clone(),
            sdo_timeout,
            cancel.clone(),
        );
    }
}
/// Starts a background identification of node `nid` unless one is already
/// registered in `inflight`.
///
/// The node id is inserted into `inflight` before the task is spawned; if it
/// was already present the function returns without doing anything. The
/// spawned task races `identify_once` against `cancel`:
///
/// * on success it stores the identity on `entry`, promotes an `Unknown`
///   lifecycle to `Identified`, and emits `Cia402Event::Identified`;
/// * on failure (including cancellation) it logs a warning and emits
///   `Cia402Event::IdentifyFailed`.
///
/// The `inflight` claim is released by a drop guard, so it is returned on
/// every exit path of the task — normal completion, a panic inside the task,
/// or the task being dropped before it finishes. Without that, a single
/// panic would leave `nid` in the set and block every later identification
/// attempt for the node.
///
/// # Panics
/// Panics if `inflight` is poisoned when the claim is taken.
fn spawn_identify_if_idle(
    nid: u8,
    entry: Arc<MotorEntry>,
    inflight: Arc<StdMutex<HashSet<u8>>>,
    events_tx: broadcast::Sender<Cia402Event>,
    bus: Arc<dyn CanBus>,
    sdo_timeout: Duration,
    cancel: CancellationToken,
) {
    /// Removes `nid` from the in-flight set when dropped.
    struct InflightClaim {
        set: Arc<StdMutex<HashSet<u8>>>,
        nid: u8,
    }

    impl Drop for InflightClaim {
        fn drop(&mut self) {
            // This may run while unwinding from a panic; recover the data
            // from a poisoned lock instead of panicking a second time.
            self.set
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .remove(&self.nid);
        }
    }

    {
        let mut claimed = inflight.lock().unwrap();
        if !claimed.insert(nid) {
            // Another operation already owns this node.
            return;
        }
    }
    // Created before `tokio::spawn` so the claim is also released if spawning
    // itself fails.
    let claim = InflightClaim { set: inflight, nid };

    tokio::spawn(async move {
        // Declared first so it is dropped last: the claim is released only
        // after the outcome event has been sent, as before.
        let _claim = claim;

        let res = tokio::select! {
            _ = cancel.cancelled() => Err(Error::Internal("cancelled".into())),
            r = identify_once(bus.as_ref(), nid, sdo_timeout) => r,
        };
        match res {
            Ok(identity) => {
                {
                    let mut inner = entry.inner.lock().unwrap();
                    inner.identity = Some(identity.clone());
                    // Only promote from `Unknown`; any other lifecycle set in
                    // the meantime is left untouched.
                    if matches!(inner.lifecycle, MotorLifecycle::Unknown) {
                        inner.lifecycle = MotorLifecycle::Identified;
                    }
                }
                let _ = events_tx.send(Cia402Event::Identified { nid, identity });
            }
            Err(e) => {
                log::warn!("identify nid 0x{nid:02X} failed: {e}");
                let _ = events_tx.send(Cia402Event::IdentifyFailed {
                    nid,
                    reason: e.to_string(),
                });
            }
        }
    });
}
/// Reads the identity of node `nid` over SDO.
///
/// Uploads sub-indices 1 through 4 of object `0x1018` as `u32` values
/// (stored as vendor id, product code, revision number and serial number),
/// one after another, each with `sdo_timeout`. Any failure among those four
/// aborts the whole identification.
///
/// Object `0x1008` sub-index 0 is then read as a string for the product
/// name; a failure there is only logged at debug level and yields
/// `product_name: None`.
///
/// # Errors
/// Returns the error of the first failing `0x1018` upload.
pub(crate) async fn identify_once(
    bus: &dyn CanBus,
    nid: u8,
    sdo_timeout: Duration,
) -> Result<MotorIdentity> {
    let per_request = Some(sdo_timeout);

    let vendor_id = sdo::upload_u32(bus, nid, 0x1018, 1, per_request).await?;
    let product_code = sdo::upload_u32(bus, nid, 0x1018, 2, per_request).await?;
    let revision_number = sdo::upload_u32(bus, nid, 0x1018, 3, per_request).await?;
    let serial_number = sdo::upload_u32(bus, nid, 0x1018, 4, per_request).await?;

    // The product name is best-effort: a failed read is not an error.
    let name_lookup = sdo::upload_string(bus, nid, 0x1008, 0, per_request).await;
    if let Err(e) = &name_lookup {
        log::debug!("nid 0x{nid:02X}: 0x1008 not readable ({e}); proceeding without product name");
    }
    let product_name = name_lookup.ok();

    Ok(MotorIdentity {
        node_id: nid,
        vendor_id,
        product_code,
        revision_number,
        serial_number,
        product_name,
    })
}
/// Periodically marks motors offline when they have not been seen recently.
///
/// Two staleness thresholds are used: `initialized_stale_threshold` for
/// motors whose lifecycle is `Initialized`, and 2.5 × `motor_heartbeat_period`
/// for all others. The check runs at half the smaller threshold, clamped to
/// the range 20 ms – 200 ms.
///
/// On every tick each currently-online motor is examined; one whose
/// `last_seen()` is missing or older than its threshold is flagged offline, a
/// `Cia402Event::NodeOffline` is emitted and its new live state is published.
/// Motors already offline are skipped. This task never flips a motor back to
/// online.
///
/// Runs until `cancel` fires.
///
/// # Panics
/// Panics if the `motors` lock or an entry's inner mutex is poisoned.
pub(crate) async fn run_liveness_monitor(
    motors: Arc<RwLock<HashMap<u8, Arc<MotorEntry>>>>,
    events_tx: broadcast::Sender<Cia402Event>,
    motor_heartbeat_period: Duration,
    initialized_stale_threshold: Duration,
    cancel: CancellationToken,
) {
    let init_threshold = initialized_stale_threshold;
    let uninit_threshold = (motor_heartbeat_period * 5) / 2;
    let tick_period = init_threshold
        .min(uninit_threshold)
        .div_f32(2.0)
        .clamp(Duration::from_millis(20), Duration::from_millis(200));
    log::debug!(
        "liveness monitor: uninit_threshold={uninit_threshold:?}, \
         init_threshold={init_threshold:?}, tick={tick_period:?}"
    );

    let mut ticker = tokio::time::interval(tick_period);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    // The first tick of an interval completes immediately; consume it so the
    // first real check happens one full period from now.
    ticker.tick().await;

    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                log::debug!("Liveness monitor cancelled");
                return;
            }
            _ = ticker.tick() => {
                let now = Instant::now();

                // Snapshot the entries so the map lock is not held while the
                // per-motor locks are taken.
                let snapshot: Vec<Arc<MotorEntry>> =
                    motors.read().unwrap().values().cloned().collect();

                for entry in snapshot {
                    let mut inner = entry.inner.lock().unwrap();
                    if !inner.online {
                        continue;
                    }

                    let threshold = match inner.lifecycle {
                        MotorLifecycle::Initialized => init_threshold,
                        _ => uninit_threshold,
                    };
                    let still_alive = matches!(
                        inner.last_seen(),
                        Some(seen) if now.saturating_duration_since(seen) <= threshold
                    );
                    inner.online = still_alive;
                    if still_alive {
                        continue;
                    }

                    // Online -> offline transition: build the state under the
                    // lock, then notify after releasing it.
                    let offline_state = inner.build_live_state(now);
                    drop(inner);

                    let _ = events_tx.send(Cia402Event::NodeOffline {
                        nid: entry.node_id,
                    });
                    entry.publish(offline_state);
                }
            }
        }
    }
}