use std::sync::{Arc, Mutex};
use std::time::Instant;
use arc_swap::ArcSwap;
use crate::canopen::nmt::NmtState;
use crate::types::{MotorIdentity, MotorMode};
use super::subscribe::Subscriber;
use super::types::{Connection, LiveState, Logic, Measurements, MotorLifecycle};
use super::velocity::VelocityEstimator;
/// Per-node motor record: mutex-guarded mutable state, the most recently
/// published [`LiveState`] snapshot, and the subscribers fed by `publish`.
#[derive(Debug)]
pub(crate) struct MotorEntry {
/// Node identifier passed to `MotorEntry::new` (presumably the CANopen node ID — confirm).
pub node_id: u8,
/// Mutable per-motor state; lock it to read or update the [`MotorEntryInner`] fields.
pub inner: Mutex<MotorEntryInner>,
/// Latest published state. Starts as `LiveState::empty(..)` and is replaced on every `publish`.
pub snapshot: ArcSwap<LiveState>,
/// Subscribers that are handed each published state; `publish` removes any
/// subscriber whose `push` returns `false`.
pub subscribers: Mutex<Vec<Subscriber>>,
}
/// Mutable state of a single motor, stored behind `MotorEntry::inner`.
///
/// `MotorEntry::new` initialises every `Option` field to `None`, `lifecycle`
/// to `MotorLifecycle::Unknown`, `online` to `false`, and the remaining
/// fields to their `Default` values.
#[derive(Debug)]
pub(crate) struct MotorEntryInner {
/// Motor identity; `None` until something outside this file sets it.
pub identity: Option<MotorIdentity>,
/// Lifecycle stage; starts as `MotorLifecycle::Unknown`.
pub lifecycle: MotorLifecycle,
/// Online flag; copied into `Connection::online` by `build_live_state`.
pub online: bool,
/// Instant recorded for the most recent heartbeat, if any. Used by
/// `last_seen` and copied into the connection part of the live state.
pub last_heartbeat: Option<Instant>,
/// Instant recorded for the most recent TPDO, if any. Used by `last_seen`
/// and copied into the connection part of the live state.
pub last_tpdo: Option<Instant>,
/// Last known NMT state, if any; copied into `Connection::nmt_state`.
pub nmt_state: Option<NmtState>,
/// Logic state, if any; cloned into `LiveState::logic` by `build_live_state`.
pub logic: Option<Logic>,
/// Not read in this file; presumably the requested operating mode — confirm.
pub target_mode: Option<MotorMode>,
/// Not read in this file; presumably peak torque in newton-metres, per the name — confirm.
pub peak_torque_nm: Option<f32>,
/// Not read in this file; presumably a scale factor for MIT-mode kp/kd gains — confirm.
pub mit_kp_kd_factor: Option<f32>,
/// Current measurements; cloned into `LiveState::measurements` by `build_live_state`.
pub measurements: Measurements,
/// Velocity estimator; default-constructed by `MotorEntry::new` and not
/// otherwise touched in this file.
pub vel_filter: VelocityEstimator,
}
impl MotorEntry {
    /// Creates the entry for `node_id` with nothing known about the motor yet.
    ///
    /// Every optional field of the inner state starts as `None`, the
    /// lifecycle is `MotorLifecycle::Unknown`, the motor is offline, and the
    /// snapshot is `LiveState::empty` built from the construction instant.
    /// The subscriber list starts empty.
    pub fn new(node_id: u8) -> Self {
        // Captured first so the initial snapshot is built from the creation instant.
        let created_at = Instant::now();
        let inner = MotorEntryInner {
            identity: None,
            lifecycle: MotorLifecycle::Unknown,
            online: false,
            last_heartbeat: None,
            last_tpdo: None,
            nmt_state: None,
            logic: None,
            target_mode: None,
            peak_torque_nm: None,
            mit_kp_kd_factor: None,
            measurements: Measurements::default(),
            vel_filter: VelocityEstimator::default(),
        };
        Self {
            node_id,
            inner: Mutex::new(inner),
            snapshot: ArcSwap::from(Arc::new(LiveState::empty(created_at))),
            subscribers: Mutex::new(Vec::new()),
        }
    }

    /// Publishes `state` as the new snapshot and forwards it to every subscriber.
    ///
    /// The state is moved into a single `Arc` that is shared between the
    /// snapshot and the subscriber fan-out, so no deep copy of the state is
    /// made. The snapshot is replaced before any subscriber is notified.
    /// Subscribers whose `push` returns `false` are removed from the list.
    ///
    /// # Panics
    ///
    /// Panics if the `subscribers` mutex is poisoned.
    pub fn publish(&self, state: LiveState) {
        let shared = Arc::new(state);
        // Only the reference count is bumped here; the snapshot and the
        // subscribers observe the same allocation.
        self.snapshot.store(Arc::clone(&shared));

        let latest: &LiveState = &shared;
        // `retain_mut` on an empty list is a no-op, so no emptiness check is needed.
        self.subscribers
            .lock()
            .unwrap()
            .retain_mut(|sub| sub.push(latest));
    }

    /// Registers `sub` so that it receives every subsequently published state.
    ///
    /// # Panics
    ///
    /// Panics if the `subscribers` mutex is poisoned.
    pub fn add_subscriber(&self, sub: Subscriber) {
        let mut subscribers = self.subscribers.lock().unwrap();
        subscribers.push(sub);
    }
}
impl MotorEntryInner {
    /// Returns the later of `last_heartbeat` and `last_tpdo`.
    ///
    /// If only one of them is set, that one is returned; if neither is set,
    /// the result is `None`.
    pub fn last_seen(&self) -> Option<Instant> {
        // Each `Option` contributes zero or one instant; `max` keeps the latest
        // and yields `None` when there is nothing to compare.
        self.last_heartbeat
            .into_iter()
            .chain(self.last_tpdo)
            .max()
    }

    /// Assembles a `LiveState` from the current fields, using `now` as its timestamp.
    ///
    /// Connection data is copied; `logic` and `measurements` are cloned.
    pub fn build_live_state(&self, now: Instant) -> LiveState {
        let connection = Connection {
            last_heartbeat: self.last_heartbeat,
            last_tpdo: self.last_tpdo,
            online: self.online,
            nmt_state: self.nmt_state,
        };
        let logic = self.logic.clone();
        let measurements = self.measurements.clone();

        LiveState {
            connection,
            logic,
            measurements,
            timestamp: now,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// A freshly created entry has an unknown lifecycle, is offline, and has
    /// no identity or traffic timestamps.
    #[test]
    fn new_starts_unknown_offline() {
        let entry = MotorEntry::new(0x10);
        let inner = entry.inner.lock().unwrap();

        assert_eq!(inner.lifecycle, MotorLifecycle::Unknown);
        assert!(!inner.online);
        assert!(inner.identity.is_none());
        assert!(inner.last_heartbeat.is_none());
        assert!(inner.last_tpdo.is_none());
    }

    /// The initial snapshot is offline, has no logic state, and carries
    /// default measurements.
    #[test]
    fn new_snapshot_is_empty() {
        let entry = MotorEntry::new(0x10);
        let snapshot = entry.snapshot.load_full();

        assert!(!snapshot.connection.online);
        assert!(snapshot.logic.is_none());
        assert_eq!(snapshot.measurements, Measurements::default());
    }

    /// `last_seen` reports the later of the heartbeat and TPDO instants and
    /// falls back to whichever one is set.
    #[test]
    fn last_seen_picks_max() {
        let base = Instant::now();
        let later = base + Duration::from_millis(10);
        let earlier = base - Duration::from_millis(20);

        let entry = MotorEntry::new(0x10);
        let mut inner = entry.inner.into_inner().unwrap();

        // Nothing recorded yet.
        assert_eq!(inner.last_seen(), None);

        // Heartbeat only.
        inner.last_heartbeat = Some(base);
        assert_eq!(inner.last_seen(), Some(base));

        // TPDO newer than the heartbeat.
        inner.last_tpdo = Some(later);
        assert_eq!(inner.last_seen(), Some(later));

        // Moving the heartbeat further back must not change the answer.
        inner.last_heartbeat = Some(earlier);
        assert_eq!(inner.last_seen(), Some(later));
    }

    /// `publish` replaces the snapshot and prunes subscribers whose stream
    /// has been dropped.
    #[test]
    fn publish_updates_snapshot_and_drops_closed_subscribers() {
        use crate::cia402::subscribe::{StreamOptions, Subscriber};

        let entry = MotorEntry::new(0x10);
        let subscriber_count = || entry.subscribers.lock().unwrap().len();

        let (subscriber, stream) = Subscriber::new(&StreamOptions::default());
        entry.add_subscriber(subscriber);
        assert_eq!(subscriber_count(), 1);

        // First publish: the motor comes online and the snapshot must reflect it.
        let first_at = Instant::now();
        let online_state = {
            let mut inner = entry.inner.lock().unwrap();
            inner.online = true;
            inner.last_heartbeat = Some(first_at);
            inner.build_live_state(first_at)
        };
        entry.publish(online_state);
        assert!(entry.snapshot.load_full().connection.online);

        // Second publish after the stream is gone: the subscriber is removed.
        drop(stream);
        let second_at = Instant::now();
        let follow_up = entry.inner.lock().unwrap().build_live_state(second_at);
        entry.publish(follow_up);
        assert_eq!(subscriber_count(), 0);
    }
}