state_tracker/
state_tracker_client.rs1use crate::error::{Error, ErrorKind};
2use crate::state::State;
3use crate::state_tracker::StateTracker;
4use crate::state_tracking_config::StateTrackingConfig;
5use crate::tracked_data;
6use crate::tracked_data::TrackedData;
7use tokio::time::Instant;
8
9#[derive(Clone)]
10pub struct StateTrackerClient {
11 id: String,
12 state_sender: tokio::sync::mpsc::Sender<TrackedData>,
13 latest_update: Instant,
14 update_interval_in_seconds: u64,
15}
16
17impl StateTrackerClient {
18 fn new(
19 id: String,
20 state_sender: tokio::sync::mpsc::Sender<TrackedData>,
21 update_interval_in_seconds: u64,
22 ) -> StateTrackerClient {
23 StateTrackerClient {
24 id,
25 state_sender,
26 latest_update: Instant::now(),
27 update_interval_in_seconds,
28 }
29 }
30
31 pub fn set_id(&mut self, id: String) {
32 self.id = id;
33 }
34
35 pub async fn send_state(&self, state: State) -> Result<(), Error> {
36 if !state.is_error()
38 && self.latest_update.elapsed().as_secs() < self.update_interval_in_seconds
39 {
40 return Ok(());
41 }
42
43 let tracked_data = tracked_data::generate_state_tracking_data(&self.id, state);
44
45 match self.state_sender.send(tracked_data).await {
46 Ok(_) => (),
47 Err(error) => {
48 return Err(Error::new(
49 ErrorKind::InternalFailure,
50 format!("failed to send state to state tracker: {}", error),
51 ))
52 }
53 }
54
55 Ok(())
56 }
57}
58
59
60pub async fn build(
61 state_tracking_config: StateTrackingConfig,
62 state_tracking_channel_boundary: usize,
63) -> StateTrackerClient {
64 let (state_sender, state_receiver) =
65 tokio::sync::mpsc::channel(state_tracking_channel_boundary);
66
67 let state_update_interval = state_tracking_config.state_sender_interval_in_seconds;
68
69 tokio::spawn(async move {
70 let state_tracker = match StateTracker::try_new(
71 state_tracking_config.state_output_sender_path.as_str(),
72 state_tracking_config.state_output_receiver_path.as_str(),
73 state_receiver,
74 ) {
75 Ok(state_tracker) => state_tracker,
76 Err(error) => {
77 panic!("failed to initialize state tracker: {}", error);
78 }
79 };
80
81 state_tracker.run().await;
82 });
83
84 StateTrackerClient::new("default".to_string(), state_sender, state_update_interval)
85}
86
87#[cfg(test)]
88#[tokio::test]
89pub async fn avoids_spamming_idle_and_active_states() {
90 const ID: &str = "ID";
91 const UPDATE_INTERVAL_IN_SECONDS: u64 = 5;
92
93 let (state_sender, mut state_receiver) = tokio::sync::mpsc::channel::<TrackedData>(5);
94
95 let state_tracker_client =
96 StateTrackerClient::new(ID.to_string(), state_sender, UPDATE_INTERVAL_IN_SECONDS);
97
98 state_tracker_client.send_state(State::Valid).await.unwrap();
99
100 match state_receiver.try_recv() {
101 Ok(_) => panic!("should not have received a state"),
102 Err(error) => assert_eq!(error, tokio::sync::mpsc::error::TryRecvError::Empty),
103 }
104}
105
106#[tokio::test]
107pub async fn error_state_is_instantly_set() {
108 const ID: &str = "ID";
109 const UPDATE_INTERVAL_IN_SECONDS: u64 = 5;
110 const ERROR_MESSAGE: &str = "TEST_ERROR";
111
112 let (state_sender, mut state_receiver) = tokio::sync::mpsc::channel::<TrackedData>(5);
113
114 let state_tracker_client =
115 StateTrackerClient::new(ID.to_string(), state_sender, UPDATE_INTERVAL_IN_SECONDS);
116
117 state_tracker_client
118 .send_state(State::Error(ERROR_MESSAGE.to_string()))
119 .await
120 .unwrap();
121
122 match state_receiver.try_recv() {
123 Ok(tracked_data) => match tracked_data.state {
124 State::Error(_) => {
125 assert_eq!(tracked_data.id, ID);
126 assert_eq!(tracked_data.state, State::Error(ERROR_MESSAGE.to_string()));
127 }
128 _ => panic!("should have received an error state"),
129 },
130 Err(error) => panic!("should have received a state"),
131 }
132}