//! `vmix_http/client.rs`: HTTP client for the vMix API.

1use crate::traits::VmixApiClient;
2use anyhow::Result;
3use async_trait::async_trait;
4use shiguredo_http11::{Request, ResponseDecoder};
5use std::{collections::HashMap, net::SocketAddr, time::Duration};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::net::TcpStream;
8use tokio::time::timeout;
9use urlencoding::encode;
10use vmix_core::Vmix;
11use vmix_tcp::{InputNumber, TallyData};
12
/// Client for the vMix HTTP API.
///
/// The client stores only the target host, the target port and a timeout, so
/// cloning it yields an independent copy.
#[derive(Debug, Clone)]
pub struct HttpVmixClient {
    /// Host name or textual IP address of the vMix instance.
    host: String,
    /// TCP port of the vMix HTTP API.
    port: u16,
    /// Timeout supplied at construction time for requests made by this client.
    request_timeout: Duration,
}
19
20impl HttpVmixClient {
21    pub fn new(addr: SocketAddr, request_timeout: Duration) -> Self {
22        Self {
23            host: addr.ip().to_string(),
24            port: addr.port(),
25            request_timeout,
26        }
27    }
28
29    pub fn new_with_host_port(host: &str, port: u16, request_timeout: Duration) -> Self {
30        Self {
31            host: host.to_string(),
32            port,
33            request_timeout,
34        }
35    }
36
37    async fn send_request(
38        &self,
39        path: &str,
40        query_params: &HashMap<String, String>,
41    ) -> Result<Vec<u8>> {
42        // Build query string
43        let mut query_parts = Vec::new();
44        for (key, value) in query_params {
45            query_parts.push(format!("{}={}", encode(key), encode(value)));
46        }
47        let query_string = if query_parts.is_empty() {
48            String::new()
49        } else {
50            format!("?{}", query_parts.join("&"))
51        };
52
53        let uri = format!("{}{}", path, query_string);
54
55        // Create HTTP request
56        let request = Request::new("GET", &uri)
57            .header("Host", &format!("{}:{}", self.host, self.port))
58            .header("Connection", "close");
59
60        // Encode request
61        let request_bytes = request.encode();
62
63        // Connect to server
64        let addr = format!("{}:{}", self.host, self.port);
65        let mut stream = timeout(self.request_timeout, TcpStream::connect(&addr)).await??;
66
67        // Send request
68        stream.write_all(&request_bytes).await?;
69        stream.flush().await?;
70
71        // Read response
72        let mut decoder = ResponseDecoder::new();
73        let mut temp_buffer = [0u8; 8192];
74        let mut response_opt = None;
75
76        // Read until we get a complete response
77        loop {
78            let read_timeout = timeout(self.request_timeout, stream.read(&mut temp_buffer)).await?;
79            match read_timeout {
80                Ok(0) => break, // EOF
81                Ok(n) => {
82                    decoder.feed(&temp_buffer[..n])?;
83
84                    // Try to decode response
85                    if let Some(response) = decoder.decode()? {
86                        response_opt = Some(response);
87                        break;
88                    }
89                }
90                Err(e) => return Err(anyhow::anyhow!("Failed to read response: {}", e)),
91            }
92        }
93
94        let response = response_opt.ok_or_else(|| anyhow::anyhow!("Failed to decode response"))?;
95
96        // Check status code
97        if !(200..300).contains(&response.status_code) {
98            return Err(anyhow::anyhow!(
99                "HTTP request failed with status: {}",
100                response.status_code
101            ));
102        }
103
104        // Read body
105        // The decoder has already consumed the headers, so we need to read the body separately
106        // Check if there's a body based on Content-Length or Transfer-Encoding
107        let mut body = Vec::new();
108
109        // Try to get Content-Length from headers
110        let content_length = response
111            .headers
112            .iter()
113            .find(|(name, _)| name.eq_ignore_ascii_case("content-length"))
114            .and_then(|(_, value)| value.parse::<usize>().ok());
115
116        if let Some(len) = content_length {
117            // Read exactly len bytes
118            let mut remaining = len;
119            while remaining > 0 {
120                let to_read = remaining.min(temp_buffer.len());
121                let read_timeout = timeout(
122                    self.request_timeout,
123                    stream.read(&mut temp_buffer[..to_read]),
124                )
125                .await?;
126                match read_timeout {
127                    Ok(0) => break, // EOF
128                    Ok(n) => {
129                        body.extend_from_slice(&temp_buffer[..n]);
130                        remaining -= n;
131                    }
132                    Err(e) => return Err(anyhow::anyhow!("Failed to read body: {}", e)),
133                }
134            }
135        } else {
136            // No Content-Length, read until EOF (Connection: close)
137            loop {
138                let read_timeout =
139                    timeout(self.request_timeout, stream.read(&mut temp_buffer)).await?;
140                match read_timeout {
141                    Ok(0) => break, // EOF
142                    Ok(n) => {
143                        body.extend_from_slice(&temp_buffer[..n]);
144                    }
145                    Err(e) => return Err(anyhow::anyhow!("Failed to read body: {}", e)),
146                }
147            }
148        }
149
150        Ok(body)
151    }
152
153    pub async fn execute_function(
154        &self,
155        function: &str,
156        params: &HashMap<String, String>,
157    ) -> Result<()> {
158        let mut query_params = HashMap::new();
159        query_params.insert("Function".to_string(), function.to_string());
160        query_params.extend(params.clone());
161
162        self.send_request("/api", &query_params).await?;
163        Ok(())
164    }
165
166    pub async fn get_xml_state(&self) -> Result<Vmix> {
167        let body = self.send_request("/api", &HashMap::new()).await?;
168        let xml_text = String::from_utf8(body)?;
169        let vmix_data: Vmix = vmix_core::from_str(&xml_text)?;
170        Ok(vmix_data)
171    }
172
173    pub async fn get_tally_data(&self) -> Result<HashMap<InputNumber, TallyData>> {
174        // HTTP API doesn't have direct TALLY command, so we need to derive it from XML state
175        // This simulates the TCP TALLY response by analyzing the XML state
176        let vmix_state = self.get_xml_state().await?;
177        let mut tally_map = HashMap::new();
178
179        // Parse active and preview inputs to determine tally states
180        let active_input: InputNumber = vmix_state.active.parse().unwrap_or(0);
181        let preview_input: InputNumber = vmix_state.preview.parse().unwrap_or(0);
182
183        // Populate tally data for all inputs (up to 1000 as per vMix spec)
184        for input in &vmix_state.inputs.input {
185            let input_number: InputNumber = input.number.parse().unwrap_or(0);
186
187            let tally_state = if input_number == active_input {
188                TallyData::PROGRAM
189            } else if input_number == preview_input {
190                TallyData::PREVIEW
191            } else {
192                TallyData::OFF
193            };
194
195            tally_map.insert(input_number, tally_state);
196        }
197
198        Ok(tally_map)
199    }
200
201    pub async fn is_connected(&self) -> bool {
202        self.send_request("/api", &HashMap::new()).await.is_ok()
203    }
204
205    pub async fn get_active_input(&self) -> Result<InputNumber> {
206        let vmix_data = self.get_xml_state().await?;
207        Ok(vmix_data.active.parse().unwrap_or(0))
208    }
209
210    pub async fn get_preview_input(&self) -> Result<InputNumber> {
211        let vmix_data = self.get_xml_state().await?;
212        Ok(vmix_data.preview.parse().unwrap_or(0))
213    }
214
215    pub fn get_base_url(&self) -> String {
216        format!("http://{}:{}/api", self.host, self.port)
217    }
218}
219
220// Helper function for common vMix functions
221impl HttpVmixClient {
222    pub async fn cut(&self) -> Result<()> {
223        self.execute_function("Cut", &HashMap::new()).await
224    }
225
226    pub async fn fade(&self, duration_ms: Option<u32>) -> Result<()> {
227        let mut params = HashMap::new();
228        if let Some(duration) = duration_ms {
229            params.insert("Duration".to_string(), duration.to_string());
230        }
231        self.execute_function("Fade", &params).await
232    }
233
234    pub async fn preview_input(&self, input: InputNumber) -> Result<()> {
235        let mut params = HashMap::new();
236        params.insert("Input".to_string(), input.to_string());
237        self.execute_function("PreviewInput", &params).await
238    }
239
240    pub async fn active_input(&self, input: InputNumber) -> Result<()> {
241        let mut params = HashMap::new();
242        params.insert("Input".to_string(), input.to_string());
243        self.execute_function("ActiveInput", &params).await
244    }
245
246    pub async fn set_text(
247        &self,
248        input: InputNumber,
249        selected_name: &str,
250        value: &str,
251    ) -> Result<()> {
252        let mut params = HashMap::new();
253        params.insert("Input".to_string(), input.to_string());
254        params.insert("SelectedName".to_string(), selected_name.to_string());
255        params.insert("Value".to_string(), value.to_string());
256        self.execute_function("SetText", &params).await
257    }
258
259    pub async fn start_recording(&self) -> Result<()> {
260        self.execute_function("StartRecording", &HashMap::new())
261            .await
262    }
263
264    pub async fn stop_recording(&self) -> Result<()> {
265        self.execute_function("StopRecording", &HashMap::new())
266            .await
267    }
268
269    pub async fn start_streaming(&self) -> Result<()> {
270        self.execute_function("StartStreaming", &HashMap::new())
271            .await
272    }
273
274    pub async fn stop_streaming(&self) -> Result<()> {
275        self.execute_function("StopStreaming", &HashMap::new())
276            .await
277    }
278}
279
280// HttpVmixClientはマルチスレッド環境で安全に使用できる
281unsafe impl Send for HttpVmixClient {}
282unsafe impl Sync for HttpVmixClient {}
283
284#[async_trait]
285impl VmixApiClient for HttpVmixClient {
286    async fn execute_function(
287        &self,
288        function: &str,
289        params: &HashMap<String, String>,
290    ) -> Result<()> {
291        self.execute_function(function, params).await
292    }
293
294    async fn get_xml_state(&self) -> Result<Vmix> {
295        self.get_xml_state().await
296    }
297
298    async fn get_tally_data(&self) -> Result<HashMap<InputNumber, TallyData>> {
299        self.get_tally_data().await
300    }
301
302    async fn is_connected(&self) -> bool {
303        self.is_connected().await
304    }
305
306    async fn get_active_input(&self) -> Result<InputNumber> {
307        self.get_active_input().await
308    }
309
310    async fn get_preview_input(&self) -> Result<InputNumber> {
311        self.get_preview_input().await
312    }
313}