cooplan_state_tracker/
state_tracker_client.rs1use crate::error::{Error, ErrorKind};
2use crate::state::State;
3use crate::state_tracker::StateTracker;
4use crate::state_tracking_config::StateTrackingConfig;
5use crate::tracked_data;
6use crate::tracked_data::TrackedData;
7use tokio::time::Instant;
8
9#[derive(Clone)]
10pub struct StateTrackerClient {
11 id: String,
12 state_sender: tokio::sync::mpsc::Sender<TrackedData>,
13 latest_update: Instant,
14 update_interval_in_seconds: u64,
15}
16
17impl StateTrackerClient {
18 fn new(
19 id: String,
20 state_sender: tokio::sync::mpsc::Sender<TrackedData>,
21 update_interval_in_seconds: u64,
22 ) -> StateTrackerClient {
23 StateTrackerClient {
24 id,
25 state_sender,
26 latest_update: Instant::now(),
27 update_interval_in_seconds,
28 }
29 }
30
31 pub fn set_id(&mut self, id: String) {
32 self.id = id;
33 }
34
35 pub async fn send_state(&self, state: State) -> Result<(), Error> {
36 if !state.is_error()
38 && self.latest_update.elapsed().as_secs() < self.update_interval_in_seconds
39 {
40 return Ok(());
41 }
42
43 let tracked_data = tracked_data::generate_state_tracking_data(&self.id, state);
44
45 match self.state_sender.send(tracked_data).await {
46 Ok(_) => (),
47 Err(error) => {
48 return Err(Error::new(
49 ErrorKind::InternalFailure,
50 format!("failed to send state to state tracker: {}", error),
51 ))
52 }
53 }
54
55 Ok(())
56 }
57}
58
59#[cfg(test)]
60#[tokio::test]
61pub async fn avoids_spamming_idle_and_active_states() {
62 const ID: &str = "ID";
63 const UPDATE_INTERVAL_IN_SECONDS: u64 = 5;
64
65 let (state_sender, mut state_receiver) = tokio::sync::mpsc::channel::<TrackedData>(5);
66
67 let state_tracker_client =
68 StateTrackerClient::new(ID.to_string(), state_sender, UPDATE_INTERVAL_IN_SECONDS);
69
70 state_tracker_client.send_state(State::Valid).await.unwrap();
71
72 match state_receiver.try_recv() {
73 Ok(_) => panic!("should not have received a state"),
74 Err(error) => assert_eq!(error, tokio::sync::mpsc::error::TryRecvError::Empty),
75 }
76}
77
78#[tokio::test]
79pub async fn error_state_is_instantly_set() {
80 const ID: &str = "ID";
81 const UPDATE_INTERVAL_IN_SECONDS: u64 = 5;
82 const ERROR_MESSAGE: &str = "TEST_ERROR";
83
84 let (state_sender, mut state_receiver) = tokio::sync::mpsc::channel::<TrackedData>(5);
85
86 let state_tracker_client =
87 StateTrackerClient::new(ID.to_string(), state_sender, UPDATE_INTERVAL_IN_SECONDS);
88
89 state_tracker_client
90 .send_state(State::Error(ERROR_MESSAGE.to_string()))
91 .await
92 .unwrap();
93
94 match state_receiver.try_recv() {
95 Ok(tracked_data) => match tracked_data.state {
96 State::Error(_) => {
97 assert_eq!(tracked_data.id, ID);
98 assert_eq!(tracked_data.state, State::Error(ERROR_MESSAGE.to_string()));
99 }
100 _ => panic!("should have received an error state"),
101 },
102 Err(error) => panic!("should have received a state"),
103 }
104}
105
106pub async fn build(
107 state_tracking_config: StateTrackingConfig,
108 state_tracking_channel_boundary: usize,
109) -> StateTrackerClient {
110 let (state_sender, state_receiver) =
111 tokio::sync::mpsc::channel(state_tracking_channel_boundary);
112
113 let state_update_interval = state_tracking_config.state_sender_interval_in_seconds;
114
115 tokio::spawn(async move {
116 let state_tracker = match StateTracker::try_new(
117 state_tracking_config.state_output_sender_path.as_str(),
118 state_tracking_config.state_output_receiver_path.as_str(),
119 state_receiver,
120 ) {
121 Ok(state_tracker) => state_tracker,
122 Err(error) => {
123 panic!("failed to initialize state tracker: {}", error);
124 }
125 };
126
127 state_tracker.run().await;
128 });
129
130 StateTrackerClient::new("default".to_string(), state_sender, state_update_interval)
131}