Skip to main content

vmix_tcp/
commands.rs

1use crate::acts::ActivatorsData;
2use crate::commands::Status::Length;
3use std::{
4    collections::HashMap,
5    fmt::{self, Display},
6    io::Read,
7    net::TcpStream,
8    time::Duration,
9};
10
/// Numeric key identifying an input, used as the key type of the tally map in
/// [`TallyResponse`]. The original author's note gives the expected range as 0~1000.
pub type InputNumber = u16; // 0~1000
12
/// Status token of a response line (the second whitespace-separated token).
///
/// Built from the raw token with `Status::from(String)`:
/// `"OK"` and `"ER"` map to their own variants, any token that parses as a
/// `u64` becomes [`Status::Length`], and everything else is preserved verbatim
/// in [`Status::Detail`].
#[derive(Debug)]
pub enum Status {
    /// The literal token `OK`.
    OK,
    /// The literal token `ER`.
    ER,
    /// A numeric token: the length of the body that follows the header line.
    Length(u64),
    /// Any other token, kept exactly as received.
    Detail(String),
}

impl From<String> for Status {
    /// Classifies a raw status token.
    ///
    /// The conversion never fails: unrecognised tokens (including the empty
    /// string) end up in [`Status::Detail`].
    fn from(value: String) -> Self {
        match value.as_str() {
            "OK" => return Self::OK,
            "ER" => return Self::ER,
            _ => {}
        }

        match value.parse::<u64>() {
            Ok(length) => Self::Length(length),
            // Reuse the owned string instead of allocating a second copy.
            Err(_) => Self::Detail(value),
        }
    }
}
37
/// Parsed `TALLY` response.
#[derive(Debug)]
pub struct TallyResponse {
    /// Status token that followed the `TALLY` keyword.
    pub status: Status,
    /// Tally state per input. Keys start at 1: the n-th character of the
    /// tally string is stored under key `n`.
    pub body: HashMap<InputNumber, TallyData>,
}
/// Tally state of a single input, decoded from one character of a `TALLY`
/// response body.
#[derive(Debug)]
pub enum TallyData {
    /// Character `'0'`, and also the fallback for any unrecognised character.
    OFF,
    /// Character `'1'`.
    PROGRAM,
    /// Character `'2'`.
    PREVIEW,
}

impl From<char> for TallyData {
    /// Decodes one tally character; the conversion is total and never fails.
    fn from(value: char) -> Self {
        match value {
            '1' => Self::PROGRAM,
            '2' => Self::PREVIEW,
            // '0' and every character without a dedicated variant read as OFF.
            _ => Self::OFF,
        }
    }
}
59
/// Parsed `FUNCTION` response.
#[derive(Debug)]
pub struct FunctionResponse {
    /// Status token that followed the `FUNCTION` keyword.
    pub status: Status,
    /// Payload that followed the status on the response line, if any.
    pub body: Option<String>,
}
65
/// Parsed `XML` response.
#[derive(Debug)]
pub struct XMLResponse {
    /// Status token of the header line; the parser only produces this
    /// response when it is a `Status::Length`.
    pub status: Status,
    /// Raw XML text read after the header line, with trailing whitespace
    /// trimmed.
    pub body: String,
}
/// Parsed `XMLTEXT` response.
#[derive(Debug)]
pub struct XMLTextResponse {
    /// Status token that followed the `XMLTEXT` keyword.
    pub status: Status,
    /// Payload that followed the status on the response line, if any.
    pub body: Option<String>,
}
/// Parsed `SUBSCRIBE` response.
#[derive(Debug)]
pub struct SubscribeResponse {
    /// Status token that followed the `SUBSCRIBE` keyword.
    pub status: Status,
    /// Payload that followed the status on the response line, if any.
    pub body: Option<String>,
}
/// Parsed `UNSUBSCRIBE` response.
#[derive(Debug)]
pub struct UnsubscribeResponse {
    /// Status token that followed the `UNSUBSCRIBE` keyword.
    pub status: Status,
    /// Payload that followed the status on the response line, if any.
    pub body: Option<String>,
}
/// Parsed `VERSION` response.
#[derive(Debug)]
pub struct VersionResponse {
    /// Status token that followed the `VERSION` keyword.
    pub status: Status,
    /// Version text that followed the status, if any; kept as an unparsed string.
    pub version: Option<String>, // TODO: parse semver
}
91
/// Parsed `ACTS` response.
#[derive(Debug)]
pub struct ActivatorsResponse {
    /// Status token that followed the `ACTS` keyword.
    pub status: Status,
    /// Activator data converted from the tokens that followed the status.
    pub body: ActivatorsData,
}
97
/// A response received from the TCP stream, one variant per leading command
/// keyword of the response line.
#[derive(Debug)]
pub enum RecvCommand {
    /// Response line starting with `TALLY`.
    TALLY(TallyResponse),
    /// Response line starting with `FUNCTION`.
    FUNCTION(FunctionResponse),
    /// Response line starting with `ACTS`.
    ACTS(ActivatorsResponse),
    /// `XML` header line followed by a body of the announced length.
    XML(XMLResponse),
    /// Response line starting with `XMLTEXT`.
    XMLTEXT(XMLTextResponse),
    /// Response line starting with `SUBSCRIBE`.
    SUBSCRIBE(SubscribeResponse),
    /// Response line starting with `UNSUBSCRIBE`.
    UNSUBSCRIBE(UnsubscribeResponse),
    /// Response line starting with `QUIT`; carries no data.
    QUIT,
    /// Response line starting with `VERSION`.
    VERSION(VersionResponse),
}
110
/// A command to send over the TCP connection.
///
/// Converting a value into `Vec<u8>` yields the bytes to write; every variant
/// except [`SendCommand::RAW`] is terminated with `\r\n`.
pub enum SendCommand {
    /// Sends `TALLY`.
    TALLY,
    /// Sends `FUNCTION <name> <query>`; a missing query is sent as an empty string.
    FUNCTION(String, Option<String>),
    /// Sends `ACTS <command>` or, when the second field is set, `ACTS <command> <input>`.
    ACTS(String, Option<usize>),
    /// Sends `XML`.
    XML,
    /// Sends `XMLTEXT <path>`.
    XMLTEXT(String),
    /// Sends `SUBSCRIBE <topic>`.
    SUBSCRIBE(SUBSCRIBECommand),
    /// Sends `UNSUBSCRIBE <topic>`.
    UNSUBSCRIBE(SUBSCRIBECommand),
    /// Sends `QUIT`.
    QUIT,
    /// Sends `VERSION`.
    VERSION,

    /// Sends the given string exactly as-is; no line terminator is appended.
    RAW(String),
}
124
125// SendCommandとRecvCommandはマルチスレッド環境で安全に使用できる
126unsafe impl Send for SendCommand {}
127unsafe impl Sync for SendCommand {}
128
129unsafe impl Send for RecvCommand {}
130unsafe impl Sync for RecvCommand {}
131
132// 関連する構造体とenumにもSend + Syncを実装
133unsafe impl Send for TallyData {}
134unsafe impl Sync for TallyData {}
135
136unsafe impl Send for TallyResponse {}
137unsafe impl Sync for TallyResponse {}
138
139unsafe impl Send for FunctionResponse {}
140unsafe impl Sync for FunctionResponse {}
141
142unsafe impl Send for XMLResponse {}
143unsafe impl Sync for XMLResponse {}
144
145unsafe impl Send for XMLTextResponse {}
146unsafe impl Sync for XMLTextResponse {}
147
148unsafe impl Send for SubscribeResponse {}
149unsafe impl Sync for SubscribeResponse {}
150
151unsafe impl Send for UnsubscribeResponse {}
152unsafe impl Sync for UnsubscribeResponse {}
153
154unsafe impl Send for VersionResponse {}
155unsafe impl Sync for VersionResponse {}
156
157unsafe impl Send for ActivatorsResponse {}
158unsafe impl Sync for ActivatorsResponse {}
159
160unsafe impl Send for SUBSCRIBECommand {}
161unsafe impl Sync for SUBSCRIBECommand {}
162
163unsafe impl Send for Status {}
164unsafe impl Sync for Status {}
165
166impl From<SendCommand> for Vec<u8> {
167    fn from(command: SendCommand) -> Self {
168        match command {
169            SendCommand::TALLY => "TALLY\r\n".as_bytes().to_vec(),
170            SendCommand::FUNCTION(func, query) => {
171                format!("FUNCTION {} {}\r\n", func, query.unwrap_or("".to_string())).into_bytes()
172            }
173            SendCommand::ACTS(command, input) => {
174                if let Some(input_num) = input {
175                    format!("ACTS {} {}\r\n", command, input_num).into_bytes()
176                } else {
177                    format!("ACTS {}\r\n", command).into_bytes()
178                }
179            }
180            SendCommand::XML => "XML\r\n".as_bytes().to_vec(),
181            SendCommand::XMLTEXT(path) => format!("XMLTEXT {}\r\n", path).into_bytes(),
182            SendCommand::SUBSCRIBE(command) => format!("SUBSCRIBE {}\r\n", command).into_bytes(),
183            SendCommand::UNSUBSCRIBE(command) => {
184                format!("UNSUBSCRIBE {}\r\n", command).into_bytes()
185            }
186            SendCommand::QUIT => "QUIT\r\n".as_bytes().to_vec(),
187            SendCommand::VERSION => "VERSION\r\n".as_bytes().to_vec(),
188            SendCommand::RAW(raw) => raw.into_bytes(),
189        }
190    }
191}
192
/// Topic argument of the `SUBSCRIBE` / `UNSUBSCRIBE` commands.
pub enum SUBSCRIBECommand {
    /// Rendered as `TALLY`.
    TALLY,
    /// Rendered as `ACTS`.
    ACTS,
}

impl Display for SUBSCRIBECommand {
    /// Writes the topic keyword exactly as it appears on the wire.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::TALLY => "TALLY",
            Self::ACTS => "ACTS",
        };
        f.write_str(keyword)
    }
}
206
207impl TryFrom<&mut TcpStream> for RecvCommand {
208    type Error = anyhow::Error;
209
210    fn try_from(stream: &mut TcpStream) -> Result<Self, Self::Error> {
211        // Read directly from TcpStream to avoid BufReader buffering issues
212        let mut value = String::new();
213        let mut buffer = [0u8; 1];
214
215        // Read byte by byte until we hit \n
216        loop {
217            let bytes_read = stream.read(&mut buffer)?;
218            if bytes_read == 0 {
219                return Err(anyhow::anyhow!(std::io::Error::new(
220                    std::io::ErrorKind::ConnectionAborted,
221                    "connection aborted"
222                )));
223            }
224
225            let ch = buffer[0] as char;
226            value.push(ch);
227
228            if ch == '\n' {
229                break;
230            }
231        }
232
233        // remove \r\n
234        let value = value.lines().collect::<String>();
235
236        let commands: Vec<String> = value.split_whitespace().map(|s| s.to_string()).collect();
237
238        // first element
239        let command = commands
240            .first()
241            .ok_or_else(|| anyhow::anyhow!("Empty command"))?;
242        let status: Status = commands.get(1).unwrap().to_owned().into();
243        let body: Option<String> = commands.get(2).cloned();
244        match command.as_str() {
245            // Example Response: TALLY OK 0121...\r\n
246            "TALLY" => {
247                let mut tally_map = HashMap::new();
248                // check if status is ok
249                let chars: Vec<char> = body.unwrap().chars().collect::<Vec<char>>();
250                for (i, char) in chars.iter().enumerate() {
251                    let tally: TallyData = (*char).into();
252                    let mut index = i as InputNumber;
253                    index += 1;
254                    tally_map.insert(index, tally);
255                }
256                Ok(Self::TALLY(TallyResponse {
257                    status,
258                    body: tally_map,
259                }))
260            }
261            // Example Response: FUNCTION OK PreviewInput\r\n
262            // Example Response: FUNCTION ER Error message\r\n
263            "FUNCTION" => Ok(Self::FUNCTION(FunctionResponse { status, body })),
264            // Example Response: ACTS OK Input 1 1\r\n
265            "ACTS" => {
266                // 2以降のベクターを使用する
267                let len = commands.len();
268                let raw = &commands.clone()[2..len];
269                let body = ActivatorsData::try_from(raw)?;
270                Ok(Self::ACTS(ActivatorsResponse { status, body }))
271            }
272            /*
273            Example Response: XML 37\r\n
274            <vmix><version>x.x.x.x</version></vmix>
275            */
276            "XML" => {
277                if let Length(len) = &status {
278                    // Read exact number of bytes with timeout handling
279                    let mut xml_buffer = vec![0u8; *len as usize];
280                    let mut bytes_read = 0;
281                    let start_time = std::time::Instant::now();
282                    let read_timeout = Duration::from_secs(5); // 5 second timeout for XML reads
283
284                    while bytes_read < xml_buffer.len() {
285                        match stream.read(&mut xml_buffer[bytes_read..]) {
286                            Ok(0) => {
287                                // EOF reached before reading all expected bytes
288                                return Err(anyhow::anyhow!(std::io::Error::new(
289                                    std::io::ErrorKind::ConnectionAborted,
290                                    "connection aborted"
291                                )));
292                            }
293                            Ok(n) => {
294                                bytes_read += n;
295                            }
296                            Err(e) => match e.kind() {
297                                std::io::ErrorKind::WouldBlock => {
298                                    // Non-blocking read would block, check timeout
299                                    if start_time.elapsed() > read_timeout {
300                                        return Err(anyhow::anyhow!("XML read timeout"));
301                                    }
302                                    std::thread::sleep(Duration::from_millis(1));
303                                    continue;
304                                }
305                                std::io::ErrorKind::ConnectionAborted
306                                | std::io::ErrorKind::ConnectionReset
307                                | std::io::ErrorKind::UnexpectedEof => {
308                                    return Err(anyhow::anyhow!(e));
309                                }
310                                _ => return Err(anyhow::anyhow!(e)),
311                            },
312                        }
313                    }
314
315                    let xml = String::from_utf8(xml_buffer)?.trim_end().to_string();
316                    return Ok(Self::XML(XMLResponse { status, body: xml }));
317                }
318                Err(anyhow::anyhow!("Failed to read XML"))
319            }
320            "XMLTEXT" => Ok(Self::XMLTEXT(XMLTextResponse { status, body })),
321            "SUBSCRIBE" => Ok(Self::SUBSCRIBE(SubscribeResponse { status, body })),
322            "UNSUBSCRIBE" => Ok(Self::UNSUBSCRIBE(UnsubscribeResponse { status, body })),
323            "QUIT" => Ok(Self::QUIT), // No body
324            "VERSION" => Ok(Self::VERSION(VersionResponse {
325                status,
326                version: body,
327            })),
328            _ => Err(anyhow::anyhow!("No matching command found: {:?}", command)),
329        }
330    }
331}