cooplan_state_tracker/
state_tracker.rs

1use crate::error::{Error, ErrorKind};
2use crate::tracked_data::TrackedData;
3
4use tokio::net::UnixDatagram;
5use tokio::sync::mpsc::Receiver;
6
7/// Receives state updates from functioning parts of any program
8/// and proceeds to output them through an UnixDatagram socket.
9///
10/// The purpose which it full-fills is to allow microservices to communicate
11/// the current state of all their functionalities easily through a standardized way.
12pub struct StateTracker {
13    receiver: Receiver<TrackedData>,
14    output_sender: UnixDatagram,
15    output_receiver_path: String,
16}
17
18impl StateTracker {
19    /// Tries to create an instance of StateTracker.
20    ///
21    /// # Arguments
22    /// * `output_sender_path` - Path to the UnixDatagram socket that will send the outputs.
23    /// * `output_receiver_path` - Path to the UnixDatagram socket that will receive the outputs.
24    /// * `receiver` - Receiver of TrackedData objects.
25    pub fn try_new(
26        output_sender_path: &str,
27        output_receiver_path: &str,
28        receiver: Receiver<TrackedData>,
29    ) -> Result<Self, Error> {
30        let output_sender = match UnixDatagram::bind(output_sender_path) {
31            Ok(output) => output,
32            Err(error) => {
33                return Err(Error::new(
34                    ErrorKind::InternalFailure,
35                    format!("failed to bind to output path: {}", error),
36                ))
37            }
38        };
39
40        Ok(Self {
41            receiver,
42            output_sender,
43            output_receiver_path: output_receiver_path.to_string(),
44        })
45    }
46
47    pub async fn run(mut self) {
48        loop {
49            match self.receiver.recv().await {
50                Some(tracked_data) => {
51                    match serde_json::to_vec(&tracked_data) {
52                        Ok(serialized_data) => {
53                            match self
54                                .output_sender
55                                .send_to(serialized_data.as_slice(), &self.output_receiver_path)
56                                .await
57                            {
58                                Ok(_) => {
59                                    log::info!("sent data to output socket");
60                                }
61                                Err(error) => {
62                                    log::error!("failed to write to output socket: {}", error)
63                                }
64                            }
65                        }
66                        Err(error) => log::error!("failed to serialize tracked data: {}", error),
67                    };
68                }
69                None => (),
70            }
71        }
72    }
73}
74
75#[cfg(test)]
76use crate::state::State;
77use std::time::{Duration, SystemTime};
78use tokio::io::AsyncWriteExt;
79use tokio::time::timeout;
80
81#[tokio::test]
82async fn correct_output_retrieved() {
83    const SENDER_PATH: &str = "/tmp/cooplan_state_tracker_test_sender.sock";
84    const RECEIVER_PATH: &str = "/tmp/cooplan_state_tracker_test_receiver.sock";
85    const TEST_ID: &str = "test_id";
86
87    tokio::fs::remove_file(SENDER_PATH).await;
88    tokio::fs::remove_file(RECEIVER_PATH).await;
89
90    let (sender, receiver) = tokio::sync::mpsc::channel(1024);
91
92    let output_receiver = tokio::net::UnixDatagram::bind(RECEIVER_PATH).unwrap();
93
94    let state_tracker = StateTracker::try_new(SENDER_PATH, RECEIVER_PATH, receiver).unwrap();
95
96    tokio::spawn(state_tracker.run());
97
98    sender
99        .send(TrackedData::new(
100            TEST_ID.to_string(),
101            State::Idle,
102            SystemTime::now(),
103        ))
104        .await
105        .expect("failed to send data");
106
107    let mut buffer = [0; 1024];
108
109    let length = timeout(Duration::from_secs(3), output_receiver.recv(&mut buffer))
110        .await
111        .unwrap()
112        .unwrap();
113
114    let data = &buffer[..length];
115    let tracker_data = serde_json::from_slice::<TrackedData>(data).unwrap();
116
117    assert_eq!(tracker_data.id, TEST_ID);
118    assert_eq!(tracker_data.state, State::Idle);
119}