1use crate::acts::ActivatorsData;
2use crate::commands::Status::Length;
3use std::{
4 collections::HashMap,
5 fmt::{self, Display},
6 io::Read,
7 net::TcpStream,
8 time::Duration,
9};
10
/// Numeric identifier of an input.
///
/// The tally parser in this file numbers inputs starting at 1.
pub type InputNumber = u16;

/// Status token of a response line (the second whitespace-separated token).
#[derive(Debug)]
pub enum Status {
    /// The token was the literal `OK`.
    OK,
    /// The token was the literal `ER`.
    ER,
    /// The token parsed as an unsigned integer; the XML handler in this file
    /// interprets it as the byte length of the payload that follows.
    Length(u64),
    /// Any other token, kept verbatim.
    Detail(String),
}
20
21impl From<String> for Status {
22 fn from(value: String) -> Self {
23 let value = value.as_str();
24 match value {
25 "OK" => Self::OK,
26 "ER" => Self::ER,
27 _ => {
28 if let Ok(length) = value.parse::<u64>() {
29 Length(length)
30 } else {
31 Self::Detail(value.to_string())
32 }
33 }
34 }
35 }
36}
37
38#[derive(Debug)]
39pub struct TallyResponse {
40 pub status: Status,
41 pub body: HashMap<InputNumber, TallyData>,
42}
/// Tally state of a single input.
///
/// The `From<char>` conversion in this file decodes `'1'` as `PROGRAM`,
/// `'2'` as `PREVIEW` and every other character as `OFF`.
#[derive(Debug)]
pub enum TallyData {
    /// Input is neither in program nor in preview.
    OFF,
    /// Input is in program.
    PROGRAM,
    /// Input is in preview.
    PREVIEW,
}
49impl From<char> for TallyData {
50 fn from(value: char) -> Self {
51 match value {
52 '0' => TallyData::OFF,
53 '1' => TallyData::PROGRAM,
54 '2' => TallyData::PREVIEW,
55 _ => TallyData::OFF, }
57 }
58}
59
60#[derive(Debug)]
61pub struct FunctionResponse {
62 pub status: Status,
63 pub body: Option<String>,
64}
65
66#[derive(Debug)]
67pub struct XMLResponse {
68 pub status: Status,
69 pub body: String, }
71#[derive(Debug)]
72pub struct XMLTextResponse {
73 pub status: Status,
74 pub body: Option<String>,
75}
76#[derive(Debug)]
77pub struct SubscribeResponse {
78 pub status: Status,
79 pub body: Option<String>,
80}
81#[derive(Debug)]
82pub struct UnsubscribeResponse {
83 pub status: Status,
84 pub body: Option<String>,
85}
86#[derive(Debug)]
87pub struct VersionResponse {
88 pub status: Status,
89 pub version: Option<String>, }
91
92#[derive(Debug)]
93pub struct ActivatorsResponse {
94 pub status: Status,
95 pub body: ActivatorsData,
96}
97
98#[derive(Debug)]
99pub enum RecvCommand {
100 TALLY(TallyResponse),
101 FUNCTION(FunctionResponse),
102 ACTS(ActivatorsResponse),
103 XML(XMLResponse),
104 XMLTEXT(XMLTextResponse),
105 SUBSCRIBE(SubscribeResponse),
106 UNSUBSCRIBE(UnsubscribeResponse),
107 QUIT,
108 VERSION(VersionResponse),
109}
110
111pub enum SendCommand {
112 TALLY,
113 FUNCTION(String, Option<String>),
114 ACTS(String, usize),
115 XML,
116 XMLTEXT(String),
117 SUBSCRIBE(SUBSCRIBECommand),
118 UNSUBSCRIBE(SUBSCRIBECommand),
119 QUIT,
120 VERSION,
121
122 RAW(String),
123}
124
125unsafe impl Send for SendCommand {}
127unsafe impl Sync for SendCommand {}
128
129unsafe impl Send for RecvCommand {}
130unsafe impl Sync for RecvCommand {}
131
132unsafe impl Send for TallyData {}
134unsafe impl Sync for TallyData {}
135
136unsafe impl Send for TallyResponse {}
137unsafe impl Sync for TallyResponse {}
138
139unsafe impl Send for FunctionResponse {}
140unsafe impl Sync for FunctionResponse {}
141
142unsafe impl Send for XMLResponse {}
143unsafe impl Sync for XMLResponse {}
144
145unsafe impl Send for XMLTextResponse {}
146unsafe impl Sync for XMLTextResponse {}
147
148unsafe impl Send for SubscribeResponse {}
149unsafe impl Sync for SubscribeResponse {}
150
151unsafe impl Send for UnsubscribeResponse {}
152unsafe impl Sync for UnsubscribeResponse {}
153
154unsafe impl Send for VersionResponse {}
155unsafe impl Sync for VersionResponse {}
156
157unsafe impl Send for ActivatorsResponse {}
158unsafe impl Sync for ActivatorsResponse {}
159
160unsafe impl Send for SUBSCRIBECommand {}
161unsafe impl Sync for SUBSCRIBECommand {}
162
163unsafe impl Send for Status {}
164unsafe impl Sync for Status {}
165
166impl From<SendCommand> for Vec<u8> {
167 fn from(command: SendCommand) -> Self {
168 match command {
169 SendCommand::TALLY => "TALLY\r\n".as_bytes().to_vec(),
170 SendCommand::FUNCTION(func, query) => {
171 format!("FUNCTION {} {}\r\n", func, query.unwrap_or("".to_string())).into_bytes()
172 }
173 SendCommand::ACTS(command, input) => {
174 format!("ACTS {} {}\r\n", command, input).into_bytes()
175 }
176 SendCommand::XML => "XML\r\n".as_bytes().to_vec(),
177 SendCommand::XMLTEXT(path) => format!("XMLTEXT {}\r\n", path).into_bytes(),
178 SendCommand::SUBSCRIBE(command) => format!("SUBSCRIBE {}\r\n", command).into_bytes(),
179 SendCommand::UNSUBSCRIBE(command) => {
180 format!("UNSUBSCRIBE {}\r\n", command).into_bytes()
181 }
182 SendCommand::QUIT => "QUIT\r\n".as_bytes().to_vec(),
183 SendCommand::VERSION => "VERSION\r\n".as_bytes().to_vec(),
184 SendCommand::RAW(raw) => raw.into_bytes(),
185 }
186 }
187}
188
/// Topic argument of the `SUBSCRIBE` and `UNSUBSCRIBE` commands.
pub enum SUBSCRIBECommand {
    /// Rendered as `TALLY`.
    TALLY,
    /// Rendered as `ACTS`.
    ACTS,
}
193
194impl Display for SUBSCRIBECommand {
195 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196 match self {
197 Self::TALLY => write!(f, "TALLY"),
198 Self::ACTS => write!(f, "ACTS"),
199 }
200 }
201}
202
203impl TryFrom<&mut TcpStream> for RecvCommand {
204 type Error = anyhow::Error;
205
206 fn try_from(stream: &mut TcpStream) -> Result<Self, Self::Error> {
207 let mut value = String::new();
209 let mut buffer = [0u8; 1];
210
211 loop {
213 let bytes_read = stream.read(&mut buffer)?;
214 if bytes_read == 0 {
215 return Err(anyhow::anyhow!(std::io::Error::new(
216 std::io::ErrorKind::ConnectionAborted,
217 "connection aborted"
218 )));
219 }
220
221 let ch = buffer[0] as char;
222 value.push(ch);
223
224 if ch == '\n' {
225 break;
226 }
227 }
228
229 let value = value.lines().collect::<String>();
231
232 let commands: Vec<String> = value.split_whitespace().map(|s| s.to_string()).collect();
233
234 let command = commands
236 .first()
237 .ok_or_else(|| anyhow::anyhow!("Empty command"))?;
238 let status: Status = commands.get(1).unwrap().to_owned().into();
239 let body: Option<String> = commands.get(2).cloned();
240 match command.as_str() {
241 "TALLY" => {
243 let mut tally_map = HashMap::new();
244 let chars: Vec<char> = body.unwrap().chars().collect::<Vec<char>>();
246 for (i, char) in chars.iter().enumerate() {
247 let tally: TallyData = (*char).into();
248 let mut index = i as InputNumber;
249 index += 1;
250 tally_map.insert(index, tally);
251 }
252 Ok(Self::TALLY(TallyResponse {
253 status,
254 body: tally_map,
255 }))
256 }
257 "FUNCTION" => Ok(Self::FUNCTION(FunctionResponse { status, body })),
260 "ACTS" => {
262 let len = commands.len();
264 let raw = &commands.clone()[2..len];
265 let body = ActivatorsData::try_from(raw)?;
266 Ok(Self::ACTS(ActivatorsResponse { status, body }))
267 }
268 "XML" => {
273 if let Length(len) = &status {
274 let mut xml_buffer = vec![0u8; *len as usize];
276 let mut bytes_read = 0;
277 let start_time = std::time::Instant::now();
278 let read_timeout = Duration::from_secs(5); while bytes_read < xml_buffer.len() {
281 match stream.read(&mut xml_buffer[bytes_read..]) {
282 Ok(0) => {
283 return Err(anyhow::anyhow!(std::io::Error::new(
285 std::io::ErrorKind::ConnectionAborted,
286 "connection aborted"
287 )));
288 }
289 Ok(n) => {
290 bytes_read += n;
291 }
292 Err(e) => match e.kind() {
293 std::io::ErrorKind::WouldBlock => {
294 if start_time.elapsed() > read_timeout {
296 return Err(anyhow::anyhow!("XML read timeout"));
297 }
298 std::thread::sleep(Duration::from_millis(1));
299 continue;
300 }
301 std::io::ErrorKind::ConnectionAborted
302 | std::io::ErrorKind::ConnectionReset
303 | std::io::ErrorKind::UnexpectedEof => {
304 return Err(anyhow::anyhow!(e));
305 }
306 _ => return Err(anyhow::anyhow!(e)),
307 },
308 }
309 }
310
311 let xml = String::from_utf8(xml_buffer)?.trim_end().to_string();
312 return Ok(Self::XML(XMLResponse { status, body: xml }));
313 }
314 Err(anyhow::anyhow!("Failed to read XML"))
315 }
316 "XMLTEXT" => Ok(Self::XMLTEXT(XMLTextResponse { status, body })),
317 "SUBSCRIBE" => Ok(Self::SUBSCRIBE(SubscribeResponse { status, body })),
318 "UNSUBSCRIBE" => Ok(Self::UNSUBSCRIBE(UnsubscribeResponse { status, body })),
319 "QUIT" => Ok(Self::QUIT), "VERSION" => Ok(Self::VERSION(VersionResponse {
321 status,
322 version: body,
323 })),
324 _ => Err(anyhow::anyhow!("No matching command found: {:?}", command)),
325 }
326 }
327}