// state_tracker/state_tracker_client.rs

1use crate::error::{Error, ErrorKind};
2use crate::state::State;
3use crate::state_tracker::StateTracker;
4use crate::state_tracking_config::StateTrackingConfig;
5use crate::tracked_data;
6use crate::tracked_data::TrackedData;
7use tokio::time::Instant;
8
9#[derive(Clone)]
10pub struct StateTrackerClient {
11    id: String,
12    state_sender: tokio::sync::mpsc::Sender<TrackedData>,
13    latest_update: Instant,
14    update_interval_in_seconds: u64,
15}
16
17impl StateTrackerClient {
18    fn new(
19        id: String,
20        state_sender: tokio::sync::mpsc::Sender<TrackedData>,
21        update_interval_in_seconds: u64,
22    ) -> StateTrackerClient {
23        StateTrackerClient {
24            id,
25            state_sender,
26            latest_update: Instant::now(),
27            update_interval_in_seconds,
28        }
29    }
30
31    pub fn set_id(&mut self, id: String) {
32        self.id = id;
33    }
34
35    pub async fn send_state(&self, state: State) -> Result<(), Error> {
36        // Avoid spamming Idle & Valid states.
37        if !state.is_error()
38            && self.latest_update.elapsed().as_secs() < self.update_interval_in_seconds
39        {
40            return Ok(());
41        }
42
43        let tracked_data = tracked_data::generate_state_tracking_data(&self.id, state);
44
45        match self.state_sender.send(tracked_data).await {
46            Ok(_) => (),
47            Err(error) => {
48                return Err(Error::new(
49                    ErrorKind::InternalFailure,
50                    format!("failed to send state to state tracker: {}", error),
51                ))
52            }
53        }
54
55        Ok(())
56    }
57}
58
59
60pub async fn build(
61    state_tracking_config: StateTrackingConfig,
62    state_tracking_channel_boundary: usize,
63) -> StateTrackerClient {
64    let (state_sender, state_receiver) =
65        tokio::sync::mpsc::channel(state_tracking_channel_boundary);
66
67    let state_update_interval = state_tracking_config.state_sender_interval_in_seconds;
68
69    tokio::spawn(async move {
70        let state_tracker = match StateTracker::try_new(
71            state_tracking_config.state_output_sender_path.as_str(),
72            state_tracking_config.state_output_receiver_path.as_str(),
73            state_receiver,
74        ) {
75            Ok(state_tracker) => state_tracker,
76            Err(error) => {
77                panic!("failed to initialize state tracker: {}", error);
78            }
79        };
80
81        state_tracker.run().await;
82    });
83
84    StateTrackerClient::new("default".to_string(), state_sender, state_update_interval)
85}
86
/// A non-error state sent right after the client is constructed must be dropped, because
/// the update interval has not elapsed yet: the channel has to stay empty.
#[cfg(test)]
#[tokio::test]
pub async fn avoids_spamming_idle_and_active_states() {
    const CLIENT_ID: &str = "ID";
    const INTERVAL_SECONDS: u64 = 5;

    let (sender, mut receiver) = tokio::sync::mpsc::channel::<TrackedData>(5);
    let client = StateTrackerClient::new(CLIENT_ID.to_string(), sender, INTERVAL_SECONDS);

    client.send_state(State::Valid).await.unwrap();

    // Nothing may have been queued; the only acceptable outcome is an "empty" error.
    let receive_error = match receiver.try_recv() {
        Err(receive_error) => receive_error,
        Ok(_) => panic!("should not have received a state"),
    };
    assert_eq!(
        receive_error,
        tokio::sync::mpsc::error::TryRecvError::Empty
    );
}
105
/// An error state must bypass the throttle: even when sent right after the client is
/// constructed it has to show up on the channel, carrying the client's id and the
/// original error message.
#[cfg(test)]
#[tokio::test]
pub async fn error_state_is_instantly_set() {
    const ID: &str = "ID";
    const UPDATE_INTERVAL_IN_SECONDS: u64 = 5;
    const ERROR_MESSAGE: &str = "TEST_ERROR";

    let (state_sender, mut state_receiver) = tokio::sync::mpsc::channel::<TrackedData>(5);

    let state_tracker_client =
        StateTrackerClient::new(ID.to_string(), state_sender, UPDATE_INTERVAL_IN_SECONDS);

    state_tracker_client
        .send_state(State::Error(ERROR_MESSAGE.to_string()))
        .await
        .unwrap();

    match state_receiver.try_recv() {
        Ok(tracked_data) => {
            // Comparing against the full expected value also rejects any non-error state.
            assert_eq!(tracked_data.state, State::Error(ERROR_MESSAGE.to_string()));
            assert_eq!(tracked_data.id, ID);
        }
        // Surface the receive error so a failure shows whether the channel was empty or closed.
        Err(error) => panic!("should have received a state: {:?}", error),
    }
}
132}