vmix_tcp/
commands.rs

1use crate::acts::ActivatorsData;
2use crate::commands::Status::Length;
3use std::{
4    collections::HashMap,
5    fmt::{self, Display},
6    io::Read,
7    net::TcpStream,
8    time::Duration,
9};
10
11pub type InputNumber = u16; // 0~1000
12
13#[derive(Debug)]
14pub enum Status {
15    OK,             // "OK"
16    ER,             // "ER"
17    Length(u64),    // Length of body
18    Detail(String), // detail data
19}
20
21impl From<String> for Status {
22    fn from(value: String) -> Self {
23        let value = value.as_str();
24        match value {
25            "OK" => Self::OK,
26            "ER" => Self::ER,
27            _ => {
28                if let Ok(length) = value.parse::<u64>() {
29                    Length(length)
30                } else {
31                    Self::Detail(value.to_string())
32                }
33            }
34        }
35    }
36}
37
38#[derive(Debug)]
39pub struct TallyResponse {
40    pub status: Status,
41    pub body: HashMap<InputNumber, TallyData>,
42}
43#[derive(Debug)]
44pub enum TallyData {
45    OFF,
46    PROGRAM,
47    PREVIEW,
48}
49impl From<char> for TallyData {
50    fn from(value: char) -> Self {
51        match value {
52            '0' => TallyData::OFF,
53            '1' => TallyData::PROGRAM,
54            '2' => TallyData::PREVIEW,
55            _ => TallyData::OFF, // NO MATCHING PATTERN
56        }
57    }
58}
59
60#[derive(Debug)]
61pub struct FunctionResponse {
62    pub status: Status,
63    pub body: Option<String>,
64}
65
66#[derive(Debug)]
67pub struct XMLResponse {
68    pub status: Status,
69    pub body: String, // Change to String for raw XML content
70}
71#[derive(Debug)]
72pub struct XMLTextResponse {
73    pub status: Status,
74    pub body: Option<String>,
75}
76#[derive(Debug)]
77pub struct SubscribeResponse {
78    pub status: Status,
79    pub body: Option<String>,
80}
81#[derive(Debug)]
82pub struct UnsubscribeResponse {
83    pub status: Status,
84    pub body: Option<String>,
85}
86#[derive(Debug)]
87pub struct VersionResponse {
88    pub status: Status,
89    pub version: Option<String>, // TODO: parse semver
90}
91
92#[derive(Debug)]
93pub struct ActivatorsResponse {
94    pub status: Status,
95    pub body: ActivatorsData,
96}
97
98#[derive(Debug)]
99pub enum RecvCommand {
100    TALLY(TallyResponse),
101    FUNCTION(FunctionResponse),
102    ACTS(ActivatorsResponse),
103    XML(XMLResponse),
104    XMLTEXT(XMLTextResponse),
105    SUBSCRIBE(SubscribeResponse),
106    UNSUBSCRIBE(UnsubscribeResponse),
107    QUIT,
108    VERSION(VersionResponse),
109}
110
111pub enum SendCommand {
112    TALLY,
113    FUNCTION(String, Option<String>),
114    ACTS(String, usize),
115    XML,
116    XMLTEXT(String),
117    SUBSCRIBE(SUBSCRIBECommand),
118    UNSUBSCRIBE(SUBSCRIBECommand),
119    QUIT,
120    VERSION,
121
122    RAW(String),
123}
124
125// SendCommandとRecvCommandはマルチスレッド環境で安全に使用できる
126unsafe impl Send for SendCommand {}
127unsafe impl Sync for SendCommand {}
128
129unsafe impl Send for RecvCommand {}
130unsafe impl Sync for RecvCommand {}
131
132// 関連する構造体とenumにもSend + Syncを実装
133unsafe impl Send for TallyData {}
134unsafe impl Sync for TallyData {}
135
136unsafe impl Send for TallyResponse {}
137unsafe impl Sync for TallyResponse {}
138
139unsafe impl Send for FunctionResponse {}
140unsafe impl Sync for FunctionResponse {}
141
142unsafe impl Send for XMLResponse {}
143unsafe impl Sync for XMLResponse {}
144
145unsafe impl Send for XMLTextResponse {}
146unsafe impl Sync for XMLTextResponse {}
147
148unsafe impl Send for SubscribeResponse {}
149unsafe impl Sync for SubscribeResponse {}
150
151unsafe impl Send for UnsubscribeResponse {}
152unsafe impl Sync for UnsubscribeResponse {}
153
154unsafe impl Send for VersionResponse {}
155unsafe impl Sync for VersionResponse {}
156
157unsafe impl Send for ActivatorsResponse {}
158unsafe impl Sync for ActivatorsResponse {}
159
160unsafe impl Send for SUBSCRIBECommand {}
161unsafe impl Sync for SUBSCRIBECommand {}
162
163unsafe impl Send for Status {}
164unsafe impl Sync for Status {}
165
166impl From<SendCommand> for Vec<u8> {
167    fn from(command: SendCommand) -> Self {
168        match command {
169            SendCommand::TALLY => "TALLY\r\n".as_bytes().to_vec(),
170            SendCommand::FUNCTION(func, query) => {
171                format!("FUNCTION {} {}\r\n", func, query.unwrap_or("".to_string())).into_bytes()
172            }
173            SendCommand::ACTS(command, input) => {
174                format!("ACTS {} {}\r\n", command, input).into_bytes()
175            }
176            SendCommand::XML => "XML\r\n".as_bytes().to_vec(),
177            SendCommand::XMLTEXT(path) => format!("XMLTEXT {}\r\n", path).into_bytes(),
178            SendCommand::SUBSCRIBE(command) => format!("SUBSCRIBE {}\r\n", command).into_bytes(),
179            SendCommand::UNSUBSCRIBE(command) => {
180                format!("UNSUBSCRIBE {}\r\n", command).into_bytes()
181            }
182            SendCommand::QUIT => "QUIT\r\n".as_bytes().to_vec(),
183            SendCommand::VERSION => "VERSION\r\n".as_bytes().to_vec(),
184            SendCommand::RAW(raw) => raw.into_bytes(),
185        }
186    }
187}
188
189pub enum SUBSCRIBECommand {
190    TALLY,
191    ACTS,
192}
193
194impl Display for SUBSCRIBECommand {
195    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
196        match self {
197            Self::TALLY => write!(f, "TALLY"),
198            Self::ACTS => write!(f, "ACTS"),
199        }
200    }
201}
202
203impl TryFrom<&mut TcpStream> for RecvCommand {
204    type Error = anyhow::Error;
205
206    fn try_from(stream: &mut TcpStream) -> Result<Self, Self::Error> {
207        // Read directly from TcpStream to avoid BufReader buffering issues
208        let mut value = String::new();
209        let mut buffer = [0u8; 1];
210
211        // Read byte by byte until we hit \n
212        loop {
213            let bytes_read = stream.read(&mut buffer)?;
214            if bytes_read == 0 {
215                return Err(anyhow::anyhow!(std::io::Error::new(
216                    std::io::ErrorKind::ConnectionAborted,
217                    "connection aborted"
218                )));
219            }
220
221            let ch = buffer[0] as char;
222            value.push(ch);
223
224            if ch == '\n' {
225                break;
226            }
227        }
228
229        // remove \r\n
230        let value = value.lines().collect::<String>();
231
232        let commands: Vec<String> = value.split_whitespace().map(|s| s.to_string()).collect();
233
234        // first element
235        let command = commands
236            .first()
237            .ok_or_else(|| anyhow::anyhow!("Empty command"))?;
238        let status: Status = commands.get(1).unwrap().to_owned().into();
239        let body: Option<String> = commands.get(2).cloned();
240        match command.as_str() {
241            // Example Response: TALLY OK 0121...\r\n
242            "TALLY" => {
243                let mut tally_map = HashMap::new();
244                // check if status is ok
245                let chars: Vec<char> = body.unwrap().chars().collect::<Vec<char>>();
246                for (i, char) in chars.iter().enumerate() {
247                    let tally: TallyData = (*char).into();
248                    let mut index = i as InputNumber;
249                    index += 1;
250                    tally_map.insert(index, tally);
251                }
252                Ok(Self::TALLY(TallyResponse {
253                    status,
254                    body: tally_map,
255                }))
256            }
257            // Example Response: FUNCTION OK PreviewInput\r\n
258            // Example Response: FUNCTION ER Error message\r\n
259            "FUNCTION" => Ok(Self::FUNCTION(FunctionResponse { status, body })),
260            // Example Response: ACTS OK Input 1 1\r\n
261            "ACTS" => {
262                // 2以降のベクターを使用する
263                let len = commands.len();
264                let raw = &commands.clone()[2..len];
265                let body = ActivatorsData::try_from(raw)?;
266                Ok(Self::ACTS(ActivatorsResponse { status, body }))
267            }
268            /*
269            Example Response: XML 37\r\n
270            <vmix><version>x.x.x.x</version></vmix>
271            */
272            "XML" => {
273                if let Length(len) = &status {
274                    // Read exact number of bytes with timeout handling
275                    let mut xml_buffer = vec![0u8; *len as usize];
276                    let mut bytes_read = 0;
277                    let start_time = std::time::Instant::now();
278                    let read_timeout = Duration::from_secs(5); // 5 second timeout for XML reads
279
280                    while bytes_read < xml_buffer.len() {
281                        match stream.read(&mut xml_buffer[bytes_read..]) {
282                            Ok(0) => {
283                                // EOF reached before reading all expected bytes
284                                return Err(anyhow::anyhow!(std::io::Error::new(
285                                    std::io::ErrorKind::ConnectionAborted,
286                                    "connection aborted"
287                                )));
288                            }
289                            Ok(n) => {
290                                bytes_read += n;
291                            }
292                            Err(e) => match e.kind() {
293                                std::io::ErrorKind::WouldBlock => {
294                                    // Non-blocking read would block, check timeout
295                                    if start_time.elapsed() > read_timeout {
296                                        return Err(anyhow::anyhow!("XML read timeout"));
297                                    }
298                                    std::thread::sleep(Duration::from_millis(1));
299                                    continue;
300                                }
301                                std::io::ErrorKind::ConnectionAborted
302                                | std::io::ErrorKind::ConnectionReset
303                                | std::io::ErrorKind::UnexpectedEof => {
304                                    return Err(anyhow::anyhow!(e));
305                                }
306                                _ => return Err(anyhow::anyhow!(e)),
307                            },
308                        }
309                    }
310
311                    let xml = String::from_utf8(xml_buffer)?.trim_end().to_string();
312                    return Ok(Self::XML(XMLResponse { status, body: xml }));
313                }
314                Err(anyhow::anyhow!("Failed to read XML"))
315            }
316            "XMLTEXT" => Ok(Self::XMLTEXT(XMLTextResponse { status, body })),
317            "SUBSCRIBE" => Ok(Self::SUBSCRIBE(SubscribeResponse { status, body })),
318            "UNSUBSCRIBE" => Ok(Self::UNSUBSCRIBE(UnsubscribeResponse { status, body })),
319            "QUIT" => Ok(Self::QUIT), // No body
320            "VERSION" => Ok(Self::VERSION(VersionResponse {
321                status,
322                version: body,
323            })),
324            _ => Err(anyhow::anyhow!("No matching command found: {:?}", command)),
325        }
326    }
327}