cooplan_state_tracker/
state_tracker.rs1use crate::error::{Error, ErrorKind};
2use crate::tracked_data::TrackedData;
3
4use tokio::net::UnixDatagram;
5use tokio::sync::mpsc::Receiver;
6
7pub struct StateTracker {
13 receiver: Receiver<TrackedData>,
14 output_sender: UnixDatagram,
15 output_receiver_path: String,
16}
17
18impl StateTracker {
19 pub fn try_new(
26 output_sender_path: &str,
27 output_receiver_path: &str,
28 receiver: Receiver<TrackedData>,
29 ) -> Result<Self, Error> {
30 let output_sender = match UnixDatagram::bind(output_sender_path) {
31 Ok(output) => output,
32 Err(error) => {
33 return Err(Error::new(
34 ErrorKind::InternalFailure,
35 format!("failed to bind to output path: {}", error),
36 ))
37 }
38 };
39
40 Ok(Self {
41 receiver,
42 output_sender,
43 output_receiver_path: output_receiver_path.to_string(),
44 })
45 }
46
47 pub async fn run(mut self) {
48 loop {
49 match self.receiver.recv().await {
50 Some(tracked_data) => {
51 match serde_json::to_vec(&tracked_data) {
52 Ok(serialized_data) => {
53 match self
54 .output_sender
55 .send_to(serialized_data.as_slice(), &self.output_receiver_path)
56 .await
57 {
58 Ok(_) => {
59 log::info!("sent data to output socket");
60 }
61 Err(error) => {
62 log::error!("failed to write to output socket: {}", error)
63 }
64 }
65 }
66 Err(error) => log::error!("failed to serialize tracked data: {}", error),
67 };
68 }
69 None => (),
70 }
71 }
72 }
73}
74
75#[cfg(test)]
76use crate::state::State;
77use std::time::{Duration, SystemTime};
78use tokio::io::AsyncWriteExt;
79use tokio::time::timeout;
80
81#[tokio::test]
82async fn correct_output_retrieved() {
83 const SENDER_PATH: &str = "/tmp/cooplan_state_tracker_test_sender.sock";
84 const RECEIVER_PATH: &str = "/tmp/cooplan_state_tracker_test_receiver.sock";
85 const TEST_ID: &str = "test_id";
86
87 tokio::fs::remove_file(SENDER_PATH).await;
88 tokio::fs::remove_file(RECEIVER_PATH).await;
89
90 let (sender, receiver) = tokio::sync::mpsc::channel(1024);
91
92 let output_receiver = tokio::net::UnixDatagram::bind(RECEIVER_PATH).unwrap();
93
94 let state_tracker = StateTracker::try_new(SENDER_PATH, RECEIVER_PATH, receiver).unwrap();
95
96 tokio::spawn(state_tracker.run());
97
98 sender
99 .send(TrackedData::new(
100 TEST_ID.to_string(),
101 State::Idle,
102 SystemTime::now(),
103 ))
104 .await
105 .expect("failed to send data");
106
107 let mut buffer = [0; 1024];
108
109 let length = timeout(Duration::from_secs(3), output_receiver.recv(&mut buffer))
110 .await
111 .unwrap()
112 .unwrap();
113
114 let data = &buffer[..length];
115 let tracker_data = serde_json::from_slice::<TrackedData>(data).unwrap();
116
117 assert_eq!(tracker_data.id, TEST_ID);
118 assert_eq!(tracker_data.state, State::Idle);
119}