1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
//
// Copyright (C) 2024 Automated Design Corp.. All Rights Reserved.
// Created Date: 2024-09-06 07:32:01
// -----
// Last Modified: 2024-09-23 19:10:10
// -----
//
//
use anyhow::anyhow;
use async_trait::async_trait;
use log::{debug, error, info};
use std::time::Duration;
use tokio::sync::mpsc;
/// Control messages delivered to the heartbeat task over its command channel.
#[derive(Debug)]
pub enum HeartbeatMessage {
    /// Ask the heartbeat loop to stop ticking and exit.
    Shutdown,
}
/// Callbacks implemented by the owner of a heartbeat so that the heartbeat task can
/// publish each count and perform cleanup when it stops.
#[async_trait]
pub trait HeartbeatHandler {
    /// Write the latest heartbeat count to the client/master.
    ///
    /// The heartbeat is driven by its own task, with the intent that long operations
    /// elsewhere do not interrupt it.
    ///
    /// This method is required; there is no default implementation.
    ///
    /// # Errors
    ///
    /// If an error is returned, the heartbeat loop logs it, stops sending heartbeats and
    /// proceeds to shutdown. If `Ok(())`, the heartbeat continues normally.
    async fn heartbeat(&mut self, _heartbeat_count: i64) -> Result<(), anyhow::Error>;

    /// Called only once as part of the shutdown process.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn on_shutdown(&mut self) -> Result<(), anyhow::Error> {
        Ok(())
    }
}
/// State owned by the heartbeat task: the user-supplied handler and the running counter.
pub struct HeartbeatActor {
/// User-supplied implementation that receives each heartbeat count and the
/// shutdown callback.
handler: Box<dyn HeartbeatHandler + Send>,
/// Counter value sent with the most recent heartbeat. Starts at 0 and is incremented
/// by one before every heartbeat is sent.
last_heartbeat: i64,
}
impl HeartbeatActor {
    /// Builds an actor around `handler` with the heartbeat counter starting at zero.
    pub fn new(handler: Box<dyn HeartbeatHandler + Send>) -> Self {
        Self {
            last_heartbeat: 0,
            handler,
        }
    }

    /// Advances the counter by one and forwards the new value to the handler.
    ///
    /// The handler's result is passed straight back to the caller.
    async fn tick(&mut self) -> Result<(), anyhow::Error> {
        self.last_heartbeat += 1;
        let count = self.last_heartbeat;
        self.handler.heartbeat(count).await
    }
}
async fn run_heartbeat(
mut actor: HeartbeatActor,
mut receiver: mpsc::Receiver<HeartbeatMessage>,
interval: Duration,
) {
let mut ticker = tokio::time::interval(interval);
loop {
tokio::select! {
_ = ticker.tick() => {
// tick the FSM
if let Err(err) = actor.tick().await {
log::error!("Error ticking heartbeat. Heartbeat will shut down. Error: {}", err);
break;
}
},
msg = receiver.recv() => {
if let Some(m) = msg {
match m {
HeartbeatMessage::Shutdown => {
break;
}
}
}
}
}
}
debug!("run_command_fsm done exited loop. Shutting down actor...");
if let Err(err) = actor.handler.on_shutdown() {
error!("Error shutting down command fsm: {}", err);
return;
}
info!("run_command_fsm shutdown completed successfully.");
}
/// Handle that starts the heartbeat task and is used to request its shutdown.
pub struct HeartbeatHandle {
/// Sending half of the channel that carries `HeartbeatMessage`s to the heartbeat task.
sender: mpsc::Sender<HeartbeatMessage>,
}
impl HeartbeatHandle {
/// Creates a new `CommandFsmActor` that ticks at the specified `interval`.
///
/// # Arguments
///
/// * `interval` - The time duration between each tick.
///
/// # Examples
///
/// ```ignore
/// use mechutil::CommandFsmHandle;
/// use std::time::Duration;
///
/// let handle = CommandFsmHandle::new(Duration::from_secs(1));
/// ```
pub fn new(handler: Box<dyn HeartbeatHandler + Send>, interval: Duration) -> Self {
let (sender, receiver) = mpsc::channel(8);
let actor = HeartbeatActor::new(handler);
tokio::spawn(run_heartbeat(actor, receiver, interval));
Self { sender: sender }
}
/// Shuts down the Actor.
///
/// # Examples
///
/// ```ignore
/// let _ = actor.shutdown().await;
/// ```
pub async fn shutdown(&self) -> Result<(), anyhow::Error> {
match self.sender.send(HeartbeatMessage::Shutdown).await {
Ok(_) => Ok(()),
Err(err) => return Err(anyhow!("Error shutting down Actor: {}", err)),
}
}
}