// mechutil 0.8.1
//
// Utility structures and functions for mechatronics applications.
// Documentation
//
// Copyright (C) 2024 Automated Design Corp.. All Rights Reserved.
// Created Date: 2024-09-06 07:32:01
// -----
// Last Modified: 2024-09-23 19:10:10
// -----
//
//

use anyhow::anyhow;
use async_trait::async_trait;
use log::{debug, error, info};
use std::time::Duration;
use tokio::sync::mpsc;

/// Control messages sent over the channel to the background heartbeat task.
///
/// `Debug` is derived so messages can be logged and used in assertions.
#[derive(Debug)]
pub enum HeartbeatMessage {
    /// Ask the heartbeat task to leave its loop and run the handler's shutdown hook.
    Shutdown,
}

/// A trait implemented by types that deliver a heartbeat to a client/master.
///
/// A boxed handler is owned by the background heartbeat task, which calls `heartbeat` on a
/// fixed interval and `on_shutdown` once when the task leaves its loop.
#[async_trait]
pub trait HeartbeatHandler {
    /// Write the latest heartbeat count to the client/master. This runs on its own timer so
    /// that long operations elsewhere don't interrupt the heartbeat.
    ///
    /// `_heartbeat_count` starts at 1 and is incremented by one before every call.
    ///
    /// This method is required; there is no default implementation.
    ///
    /// # Errors
    ///
    /// Returning an error makes the heartbeat task log it, stop ticking and proceed to
    /// shutdown. Returning `Ok(())` lets the heartbeat continue normally.
    async fn heartbeat(&mut self, _heartbeat_count: i64) -> Result<(), anyhow::Error>;

    /// Called only once as part of the shutdown process, after the heartbeat loop has ended.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// An error returned here is logged by the heartbeat task; nothing else is done with it.
    fn on_shutdown(&mut self) -> Result<(), anyhow::Error> {
        Ok(())
    }
}

/// State owned by the heartbeat task: the handler that delivers each heartbeat and the
/// running counter that is passed to it.
pub struct HeartbeatActor {
    /// Caller-supplied handler that receives every heartbeat and the shutdown notification.
    handler: Box<dyn HeartbeatHandler + Send>,
    /// Last counter value sent with the last heartbeat. Starts at 0 and is incremented by
    /// one before each heartbeat is sent.
    last_heartbeat: i64,
}

impl HeartbeatActor {
    pub fn new(handler: Box<dyn HeartbeatHandler + Send>) -> Self {
        Self {
            handler: handler,
            last_heartbeat: 0,
        }
    }

    /// Tick writing the heartbeat on regular intervals.
    async fn tick(&mut self) -> Result<(), anyhow::Error> {
        self.last_heartbeat += 1;
        return self.handler.heartbeat(self.last_heartbeat).await;
    }
}

async fn run_heartbeat(
    mut actor: HeartbeatActor,
    mut receiver: mpsc::Receiver<HeartbeatMessage>,
    interval: Duration,
) {
    let mut ticker = tokio::time::interval(interval);

    loop {
        tokio::select! {
            _ = ticker.tick() => {
                // tick the FSM
                if let Err(err) = actor.tick().await {
                    log::error!("Error ticking heartbeat. Heartbeat will shut down. Error: {}", err);
                    break;
                }
            },
            msg = receiver.recv() => {
                if let Some(m) = msg {
                    match m {
                        HeartbeatMessage::Shutdown => {
                            break;
                        }
                    }
                }

            }
        }
    }

    debug!("run_command_fsm done exited loop. Shutting down actor...");

    if let Err(err) = actor.handler.on_shutdown() {
        error!("Error shutting down command fsm: {}", err);
        return;
    }

    info!("run_command_fsm shutdown completed successfully.");
}

/// Handle that starts the background heartbeat task and is used to ask it to shut down.
pub struct HeartbeatHandle {
    /// Sending half of the control channel whose receiving half is given to the heartbeat task.
    sender: mpsc::Sender<HeartbeatMessage>,
}

impl HeartbeatHandle {
    /// Creates a new `CommandFsmActor` that ticks at the specified `interval`.
    ///
    /// # Arguments
    ///
    /// * `interval` - The time duration between each tick.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use mechutil::CommandFsmHandle;
    /// use std::time::Duration;
    ///
    /// let handle = CommandFsmHandle::new(Duration::from_secs(1));
    /// ```    
    pub fn new(handler: Box<dyn HeartbeatHandler + Send>, interval: Duration) -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let actor = HeartbeatActor::new(handler);

        tokio::spawn(run_heartbeat(actor, receiver, interval));

        Self { sender: sender }
    }

    /// Shuts down the Actor.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// let _ = actor.shutdown().await;
    /// ```    
    pub async fn shutdown(&self) -> Result<(), anyhow::Error> {
        match self.sender.send(HeartbeatMessage::Shutdown).await {
            Ok(_) => Ok(()),
            Err(err) => return Err(anyhow!("Error shutting down Actor: {}", err)),
        }
    }
}