use std::sync::{Arc, Mutex};
use std::time::Duration;
use asyn_rs::interfaces::motor::{AsynMotor, MotorStatus};
use asyn_rs::user::AsynUser;
use tokio::sync::{mpsc, oneshot};
use crate::device_state::*;
use crate::flags::MotorCommand;
#[derive(Debug)]
pub(crate) enum AxisCommand {
Execute {
actions: DeviceActions,
reply: oneshot::Sender<()>,
},
GetStatus {
reply: oneshot::Sender<Option<MotorStatus>>,
},
StartPolling,
StopPolling,
ScheduleDelay { duration: Duration },
Shutdown,
}
#[derive(Clone)]
pub struct AxisHandle {
tx: mpsc::Sender<AxisCommand>,
io_intr_rx_take: Arc<Mutex<Option<mpsc::Receiver<()>>>>,
}
impl AxisHandle {
pub async fn execute(&self, actions: DeviceActions) {
let (reply_tx, reply_rx) = oneshot::channel();
let _ = self
.tx
.send(AxisCommand::Execute {
actions,
reply: reply_tx,
})
.await;
let _ = reply_rx.await;
}
pub async fn get_status(&self) -> Option<MotorStatus> {
let (reply_tx, reply_rx) = oneshot::channel();
let _ = self
.tx
.send(AxisCommand::GetStatus { reply: reply_tx })
.await;
reply_rx.await.ok().flatten()
}
pub async fn start_polling(&self) {
let _ = self.tx.send(AxisCommand::StartPolling).await;
}
pub async fn stop_polling(&self) {
let _ = self.tx.send(AxisCommand::StopPolling).await;
}
pub async fn schedule_delay(&self, _id: u64, duration: Duration) {
let _ = self.tx.send(AxisCommand::ScheduleDelay { duration }).await;
}
pub fn take_io_intr_receiver(&self) -> Option<mpsc::Receiver<()>> {
self.io_intr_rx_take.lock().ok()?.take()
}
pub async fn shutdown(&self) {
let _ = self.tx.send(AxisCommand::Shutdown).await;
}
}
pub struct AxisRuntime {
motor: Box<dyn AsynMotor>,
cmd_rx: mpsc::Receiver<AxisCommand>,
io_intr_tx: mpsc::Sender<()>,
moving_poll_interval: Duration,
idle_poll_interval: Duration,
forced_fast_polls_config: u32,
forced_fast_polls_remaining: u32,
latest_status: Option<MotorStatus>,
active_polling: bool,
status_seq: u64,
auto_power: bool,
power_on_delay: Duration,
power_off_delay: Duration,
was_moving: bool,
power_off_time: Option<tokio::time::Instant>,
}
#[derive(Debug, Clone)]
pub struct AutoPowerConfig {
pub enabled: bool,
pub on_delay: Duration,
pub off_delay: Duration,
}
impl Default for AutoPowerConfig {
fn default() -> Self {
Self {
enabled: false,
on_delay: Duration::ZERO,
off_delay: Duration::ZERO,
}
}
}
pub fn create_axis_runtime(
motor: Box<dyn AsynMotor>,
moving_poll_interval: Duration,
idle_poll_interval: Duration,
forced_fast_polls: u32,
) -> (AxisRuntime, AxisHandle) {
create_axis_runtime_with_auto_power(
motor,
moving_poll_interval,
idle_poll_interval,
forced_fast_polls,
AutoPowerConfig::default(),
)
}
pub fn create_axis_runtime_with_auto_power(
motor: Box<dyn AsynMotor>,
moving_poll_interval: Duration,
idle_poll_interval: Duration,
forced_fast_polls: u32,
auto_power: AutoPowerConfig,
) -> (AxisRuntime, AxisHandle) {
let (cmd_tx, cmd_rx) = mpsc::channel(64);
let (io_intr_tx, io_intr_rx) = mpsc::channel(16);
let runtime = AxisRuntime {
motor,
cmd_rx,
io_intr_tx,
moving_poll_interval,
idle_poll_interval,
forced_fast_polls_config: forced_fast_polls,
forced_fast_polls_remaining: 0,
latest_status: None,
active_polling: false,
status_seq: 0,
auto_power: auto_power.enabled,
power_on_delay: auto_power.on_delay,
power_off_delay: auto_power.off_delay,
was_moving: false,
power_off_time: None,
};
let handle = AxisHandle {
tx: cmd_tx,
io_intr_rx_take: Arc::new(Mutex::new(Some(io_intr_rx))),
};
(runtime, handle)
}
impl AxisRuntime {
pub async fn run(mut self) {
self.poll_motor().await;
loop {
if self.active_polling {
let interval = self.effective_poll_interval();
tokio::select! {
cmd = self.cmd_rx.recv() => {
match cmd {
Some(cmd) => {
if self.handle_command(cmd).await {
return;
}
}
None => return,
}
}
_ = tokio::time::sleep(interval) => {
self.poll_motor().await;
}
}
} else {
match self.cmd_rx.recv().await {
Some(cmd) => {
if self.handle_command(cmd).await {
return;
}
}
None => return,
}
}
}
}
async fn handle_command(&mut self, cmd: AxisCommand) -> bool {
match cmd {
AxisCommand::Execute { actions, reply } => {
let has_move = actions.commands.iter().any(|c| {
matches!(
c,
MotorCommand::MoveAbsolute { .. }
| MotorCommand::MoveRelative { .. }
| MotorCommand::MoveVelocity { .. }
| MotorCommand::Home { .. }
)
});
self.execute_actions(&actions);
self.apply_poll_directive(&actions);
if has_move {
self.forced_fast_polls_remaining = self.forced_fast_polls_config;
}
if let Some(ref delay) = actions.schedule_delay {
let dur = delay.duration;
let tx = self.io_intr_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(dur).await;
let _ = tx.send(()).await;
});
}
let _ = reply.send(());
false
}
AxisCommand::GetStatus { reply } => {
let _ = reply.send(self.latest_status.clone());
false
}
AxisCommand::StartPolling => {
self.active_polling = true;
self.forced_fast_polls_remaining = self.forced_fast_polls_config;
false
}
AxisCommand::StopPolling => {
self.active_polling = false;
false
}
AxisCommand::ScheduleDelay { duration } => {
self.active_polling = false;
let tx = self.io_intr_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(duration).await;
let _ = tx.send(()).await;
});
false
}
AxisCommand::Shutdown => true,
}
}
fn effective_poll_interval(&mut self) -> Duration {
if self.forced_fast_polls_remaining > 0 {
self.forced_fast_polls_remaining -= 1;
self.moving_poll_interval
} else if self.latest_status.as_ref().is_some_and(|s| s.moving) {
self.moving_poll_interval
} else {
self.idle_poll_interval
}
}
fn execute_actions(&mut self, actions: &DeviceActions) {
let user = AsynUser::new(0);
if self.auto_power {
let has_move = actions.commands.iter().any(|c| {
matches!(
c,
MotorCommand::MoveAbsolute { .. }
| MotorCommand::MoveRelative { .. }
| MotorCommand::MoveVelocity { .. }
| MotorCommand::Home { .. }
)
});
if has_move {
let _ = self.motor.set_closed_loop(&user, true);
if !self.power_on_delay.is_zero() {
std::thread::sleep(self.power_on_delay);
}
}
}
for cmd in &actions.commands {
let result = match cmd {
MotorCommand::MoveAbsolute {
position,
velocity,
acceleration,
} => self
.motor
.move_absolute(&user, *position, *velocity, *acceleration),
MotorCommand::MoveRelative {
distance,
velocity,
acceleration,
} => self
.motor
.move_relative(&user, *distance, *velocity, *acceleration),
MotorCommand::MoveVelocity {
direction,
velocity,
acceleration,
} => {
let signed_vel = if *direction { *velocity } else { -*velocity };
self.motor.move_velocity(&user, signed_vel, *acceleration)
}
MotorCommand::Home {
forward,
velocity,
acceleration: _,
} => self.motor.home(&user, *velocity, *forward),
MotorCommand::Stop { acceleration } => self.motor.stop(&user, *acceleration),
MotorCommand::SetPosition { position } => self.motor.set_position(&user, *position),
MotorCommand::SetClosedLoop { enable } => {
self.motor.set_closed_loop(&user, *enable)
}
MotorCommand::DeferMoves { defer } => self.motor.set_deferred_moves(&user, *defer),
MotorCommand::ProfileInitialize { max_points } => {
self.motor.initialize_profile(&user, *max_points)
}
MotorCommand::ProfileBuild => self.motor.build_profile(&user),
MotorCommand::ProfileExecute => self.motor.execute_profile(&user),
MotorCommand::ProfileAbort => self.motor.abort_profile(&user),
MotorCommand::ProfileReadback => self.motor.readback_profile(&user).map(|_| ()),
MotorCommand::Poll => Ok(()),
};
if let Err(e) = result {
tracing::error!("motor command error: {e}");
}
}
}
fn apply_poll_directive(&mut self, actions: &DeviceActions) {
match actions.poll {
PollDirective::Start => self.active_polling = true,
PollDirective::Stop => self.active_polling = false,
PollDirective::None => {}
}
}
async fn poll_motor(&mut self) {
let user = AsynUser::new(0);
match self.motor.poll(&user) {
Ok(status) => {
if self.auto_power {
if status.moving {
self.was_moving = true;
self.power_off_time = None;
} else if self.was_moving {
self.was_moving = false;
if self.power_off_delay.is_zero() {
let _ = self.motor.set_closed_loop(&user, false);
} else {
self.power_off_time =
Some(tokio::time::Instant::now() + self.power_off_delay);
}
}
if let Some(off_time) = self.power_off_time {
if tokio::time::Instant::now() >= off_time && !status.moving {
let _ = self.motor.set_closed_loop(&user, false);
self.power_off_time = None;
}
}
}
self.status_seq += 1;
self.latest_status = Some(status);
let _ = self.io_intr_tx.send(()).await;
}
Err(e) => {
tracing::error!("motor poll error: {e}");
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use asyn_rs::error::AsynResult;
struct SimMotor {
position: f64,
target: f64,
moving: bool,
}
impl SimMotor {
fn new() -> Self {
Self {
position: 0.0,
target: 0.0,
moving: false,
}
}
}
impl AsynMotor for SimMotor {
fn move_absolute(
&mut self,
_user: &AsynUser,
pos: f64,
_vel: f64,
_acc: f64,
) -> AsynResult<()> {
self.target = pos;
self.moving = true;
Ok(())
}
fn home(&mut self, _user: &AsynUser, _vel: f64, _forward: bool) -> AsynResult<()> {
self.target = 0.0;
self.moving = true;
Ok(())
}
fn stop(&mut self, _user: &AsynUser, _acc: f64) -> AsynResult<()> {
self.target = self.position;
self.moving = false;
Ok(())
}
fn set_position(&mut self, _user: &AsynUser, pos: f64) -> AsynResult<()> {
self.position = pos;
self.target = pos;
Ok(())
}
fn poll(&mut self, _user: &AsynUser) -> AsynResult<MotorStatus> {
if self.moving {
self.position = self.target;
self.moving = false;
}
Ok(MotorStatus {
position: self.position,
encoder_position: self.position,
done: !self.moving,
moving: self.moving,
..MotorStatus::default()
})
}
}
#[tokio::test]
async fn axis_runtime_basic() {
let (runtime, handle) = create_axis_runtime(
Box::new(SimMotor::new()),
Duration::from_millis(50),
Duration::from_millis(50),
0,
);
let runtime_handle = tokio::spawn(runtime.run());
tokio::time::sleep(Duration::from_millis(10)).await;
let status = handle.get_status().await.unwrap();
assert!(status.done);
assert!((status.position - 0.0).abs() < 1e-10);
let actions = DeviceActions {
commands: vec![MotorCommand::MoveAbsolute {
position: 10.0,
velocity: 1.0,
acceleration: 1.0,
}],
poll: PollDirective::Start,
..Default::default()
};
handle.execute(actions).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let status = handle.get_status().await.unwrap();
assert!((status.position - 10.0).abs() < 1e-10);
assert!(status.done);
handle.shutdown().await;
let _ = runtime_handle.await;
}
#[tokio::test]
async fn axis_runtime_polling_start_stop() {
let (runtime, handle) = create_axis_runtime(
Box::new(SimMotor::new()),
Duration::from_millis(20),
Duration::from_millis(20),
0,
);
let runtime_handle = tokio::spawn(runtime.run());
handle.start_polling().await;
tokio::time::sleep(Duration::from_millis(50)).await;
handle.stop_polling().await;
handle.shutdown().await;
let _ = runtime_handle.await;
}
#[tokio::test]
async fn axis_runtime_set_position() {
let (runtime, handle) = create_axis_runtime(
Box::new(SimMotor::new()),
Duration::from_millis(50),
Duration::from_millis(50),
0,
);
let runtime_handle = tokio::spawn(runtime.run());
let actions = DeviceActions {
commands: vec![MotorCommand::SetPosition { position: 5.0 }],
poll: PollDirective::None,
..Default::default()
};
handle.execute(actions).await;
handle.start_polling().await;
tokio::time::sleep(Duration::from_millis(100)).await;
let status = handle.get_status().await.unwrap();
assert!((status.position - 5.0).abs() < 1e-10);
handle.shutdown().await;
let _ = runtime_handle.await;
}
#[tokio::test]
async fn axis_runtime_io_intr() {
let (runtime, handle) = create_axis_runtime(
Box::new(SimMotor::new()),
Duration::from_millis(50),
Duration::from_millis(50),
0,
);
let mut io_intr_rx = handle.take_io_intr_receiver().unwrap();
let runtime_handle = tokio::spawn(runtime.run());
let result = tokio::time::timeout(Duration::from_millis(100), io_intr_rx.recv()).await;
assert!(result.is_ok());
handle.shutdown().await;
let _ = runtime_handle.await;
}
#[tokio::test]
async fn axis_handle_clone() {
let (runtime, handle) = create_axis_runtime(
Box::new(SimMotor::new()),
Duration::from_millis(50),
Duration::from_millis(50),
0,
);
let handle2 = handle.clone();
let runtime_handle = tokio::spawn(runtime.run());
tokio::time::sleep(Duration::from_millis(10)).await;
let status = handle2.get_status().await.unwrap();
assert!((status.position - 0.0).abs() < 1e-10);
handle.shutdown().await;
let _ = runtime_handle.await;
}
}