1#![allow(clippy::needless_doctest_main)]
2#![allow(clippy::inconsistent_digit_grouping)]
49
50use log::debug;
51use serde::{Deserialize, Serialize};
52use std::fmt::Display;
53use std::net::SocketAddr;
54use std::str::FromStr;
55use std::sync::Arc;
56use std::time::Duration;
57use tokio::io::{AsyncReadExt, AsyncWriteExt};
58use tokio::net::{tcp::OwnedWriteHalf, TcpStream};
59use tokio::sync::{mpsc, mpsc::Receiver, Mutex};
60use tokio::time;
61
62#[derive(thiserror::Error, Debug)]
64pub enum Error {
65 #[error("network error: {0}")]
66 NetworkError(#[from] std::io::Error),
67 #[error("invalid network address: {0}")]
68 AddrParseError(#[from] std::net::AddrParseError),
69 #[error("Yamaha Remote Control Protocol error: {0}")]
70 RCPError(String),
71 #[error("could not parse console response: {0}")]
72 RCPParseError(#[from] Box<dyn std::error::Error>),
73 #[error("{0}")]
74 LabelColorParseError(String),
75 #[error("{0}")]
76 SceneListParseError(String),
77}
78
79#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
81pub enum LabelColor {
82 Purple,
83 Pink,
84 Red,
85 Orange,
86 Yellow,
87 Blue,
88 SkyBlue,
89 Green,
90}
91
92impl Display for LabelColor {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(
95 f,
96 "{}",
97 match self {
98 Self::Purple => "Purple",
99 Self::Pink => "Pink",
100 Self::Red => "Red",
101 Self::Orange => "Orange",
102 Self::Yellow => "Yellow",
103 Self::Blue => "Blue",
104 Self::SkyBlue => "SkyBlue",
105 Self::Green => "Green",
106 }
107 )
108 }
109}
110
111impl FromStr for LabelColor {
112 type Err = Error;
113
114 fn from_str(s: &str) -> Result<Self, Self::Err> {
115 match s.to_lowercase().as_str() {
116 "purple" => Ok(Self::Purple),
117 "pink" => Ok(Self::Pink),
118 "red" => Ok(Self::Red),
119 "orange" => Ok(Self::Orange),
120 "yellow" => Ok(Self::Yellow),
121 "blue" => Ok(Self::Blue),
122 "skyblue" => Ok(Self::SkyBlue),
123 "green" => Ok(Self::Green),
124 _ => Err(Error::LabelColorParseError(format!(
125 "unknown LabelColor descriptor: {s}"
126 ))),
127 }
128 }
129}
130
131#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
133pub enum SceneList {
134 A,
135 B,
136}
137
138impl Display for SceneList {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 write!(
141 f,
142 "{}",
143 match self {
144 Self::A => "scene_a",
145 Self::B => "scene_b",
146 }
147 )
148 }
149}
150
151impl FromStr for SceneList {
152 type Err = Error;
153
154 fn from_str(s: &str) -> Result<Self, Self::Err> {
155 match s.to_lowercase().as_str() {
156 "a" => Ok(Self::A),
157 "b" => Ok(Self::B),
158 _ => Err(Error::SceneListParseError(format!(
159 "unknown SceneList descriptor: {s}"
160 ))),
161 }
162 }
163}
164
165#[derive(Clone, Debug)]
167pub struct TFMixer {
168 max_fader_val: i32,
169 min_fader_val: i32,
170 neg_inf_val: i32,
171 socket_addr: SocketAddr,
172 connections: Arc<Mutex<Vec<Connection>>>,
173 num_connections: Arc<Mutex<u8>>,
174 connection_limit: u8,
175}
176
177#[derive(Debug)]
180struct Connection {
181 writer: OwnedWriteHalf,
182 recv_channel: Receiver<String>,
183}
184
185impl TFMixer {
186 pub async fn new(addr: &str) -> Result<Self, Error> {
202 let socket_addr: SocketAddr = addr.parse()?;
203
204 let mixer = TFMixer {
205 max_fader_val: 10_00,
206 min_fader_val: -138_00,
207 neg_inf_val: -327_68,
208 socket_addr,
209 connections: Arc::new(Mutex::new(vec![])),
210 num_connections: Arc::new(Mutex::new(8)),
211 connection_limit: 1,
212 };
213
214 let initial_connection = mixer.new_connection().await?;
215 {
216 let mut connections = mixer.connections.lock().await;
217 let mut num_conns = mixer.num_connections.lock().await;
218 connections.push(initial_connection);
219 *num_conns += 1;
220 }
221
222 Ok(mixer)
223 }
224
225 pub async fn set_connection_limit(&mut self, limit: u8) {
227 self.connection_limit = limit;
228
229 let mut conns = self.connections.lock().await;
231 let curr_num_conns = conns.len();
232 if curr_num_conns > self.connection_limit.into() {
233 conns.drain(0..(curr_num_conns - usize::from(self.connection_limit)));
234 }
235 }
236
237 async fn new_connection(&self) -> Result<Connection, Error> {
241 let (tx, rx) = mpsc::channel::<String>(16);
242
243 let std_tcp_sock =
244 std::net::TcpStream::connect_timeout(&self.socket_addr, time::Duration::from_secs(3))?;
245 std_tcp_sock.set_nonblocking(true)?;
246
247 let stream = TcpStream::from_std(std_tcp_sock)?;
248 let (mut reader, writer) = stream.into_split();
249
250 tokio::spawn(async move {
251 let buffer_size = 512;
252
253 loop {
254 let mut line = Vec::new();
255 let mut buffer = vec![0; buffer_size];
256 match reader.read(&mut buffer).await {
257 Ok(_) => {
258 for ele in buffer {
259 match ele {
260 0xA => {
261 let result = std::str::from_utf8(&line).unwrap();
262
263 if result.starts_with("ERROR") || result.starts_with("OK") {
264 tx.send(result.to_owned()).await.unwrap();
265 }
266
267 line.clear();
268 }
269 _ => line.push(ele),
270 }
271 }
272 }
273 Err(e) => return Err::<(), Box<std::io::Error>>(Box::new(e)),
274 }
275 }
276 });
277
278 Ok(Connection {
279 writer,
280 recv_channel: rx,
281 })
282 }
283
284 async fn send_command(&self, mut cmd: String) -> Result<String, Error> {
290 cmd.push('\n');
291
292 debug!("Sending command: {cmd}");
293
294 let mut conn: Connection;
296 {
297 let mut conns = self.connections.lock().await;
298 conn = match conns.pop() {
299 Some(c) => c,
300 None => {
301 let mut num_conns = self.num_connections.lock().await;
302 if *num_conns < self.connection_limit {
303 *num_conns += 1;
304 self.new_connection().await?
305 } else {
306 drop(num_conns);
307 let existing_conn: Connection;
308 loop {
309 drop(conns);
310 tokio::time::sleep(Duration::from_millis(10)).await;
311 conns = self.connections.lock().await;
312 if let Some(c) = conns.pop() {
313 existing_conn = c;
314 break;
315 }
316 }
317
318 existing_conn
319 }
320 }
321 };
322 }
323
324 conn.writer.write_all(cmd.as_bytes()).await?;
325
326 let result = match conn.recv_channel.recv().await {
327 Some(v) => {
328 if v.starts_with("ERROR") {
329 Err(Error::RCPError(v))
330 } else if v.starts_with("OK") {
331 Ok(v)
332 } else {
333 Err(Error::RCPError(format!(
334 "received message did not start with ERROR or OK: {v}"
335 )))
336 }
337 }
338 None => Err(Error::RCPError("closed channel from reader task".into())),
339 };
340
341 {
343 let mut conns = self.connections.lock().await;
344 conns.push(conn);
345 }
346
347 result
348 }
349
350 async fn request_bool(&self, cmd: String) -> Result<bool, Error> {
352 let response = self.send_command(cmd).await?;
353
354 match response.split(' ').last() {
355 Some(v) => Ok(v != "0"),
356 None => Err(Error::RCPError("Could not get last item in list".into())),
357 }
358 }
359
360 async fn request_int(&self, cmd: String) -> Result<i32, Error> {
362 let response = self.send_command(cmd).await?;
363
364 match response.split(' ').last() {
365 Some(v) => Ok(v
366 .parse::<i32>()
367 .map_err(|e| Error::RCPParseError(Box::new(e)))?),
368 None => Err(Error::RCPError("Couldn't find the last item".into())),
369 }
370 }
371
372 async fn request_string(&self, cmd: String) -> Result<String, Error> {
374 let response = self.send_command(cmd).await?;
375
376 let mut resp_vec = Vec::new();
377 let mut looking = false;
378 for fragment in response.split(' ') {
379 if !looking && fragment.starts_with('\"') && fragment.ends_with('\"') {
380 resp_vec.push(fragment[1..fragment.len() - 1].to_owned());
381 break;
382 }
383
384 if fragment.starts_with('\"') && !looking {
385 looking = true;
386 resp_vec.push(fragment[1..fragment.len()].to_owned());
387 continue;
388 }
389
390 if fragment.ends_with('\"') && looking {
391 resp_vec.push(fragment[0..fragment.len() - 1].to_owned());
392 break;
393 }
394
395 if looking {
396 resp_vec.push(fragment.to_owned());
397 }
398 }
399 let label = resp_vec.join(" ");
400
401 Ok(label)
402 }
403
404 pub async fn fader_level(&self, channel: u16) -> Result<i32, Error> {
405 self.request_int(format!("get MIXER:Current/InCh/Fader/Level {channel} 0"))
406 .await
407 }
408
409 pub async fn set_fader_level(&self, channel: u16, value: i32) -> Result<(), Error> {
410 self.send_command(format!(
411 "set MIXER:Current/InCh/Fader/Level {channel} 0 {value}"
412 ))
413 .await?;
414
415 Ok(())
418 }
419
420 pub async fn muted(&self, channel: u16) -> Result<bool, Error> {
421 self.request_bool(format!("get MIXER:Current/InCh/Fader/On {channel} 0"))
422 .await
423 }
424
425 pub async fn set_muted(&self, channel: u16, muted: bool) -> Result<(), Error> {
426 self.send_command(format!(
427 "set MIXER:Current/InCh/Fader/On {channel} 0 {}",
428 if muted { 0 } else { 1 }
429 ))
430 .await?;
431
432 Ok(())
433 }
434
435 pub async fn color(&self, channel: u16) -> Result<LabelColor, Error> {
436 let response = self
437 .send_command(format!("get MIXER:Current/InCh/Label/Color {channel} 0"))
438 .await?;
439
440 match response.split(' ').last() {
441 Some(v) => Ok(v.replace('\"', "").parse()?),
442 None => Err(Error::RCPError("could not get last item in list".into())),
443 }
444 }
445
446 pub async fn set_color(&self, channel: u16, color: LabelColor) -> Result<(), Error> {
447 self.send_command(format!(
448 "set MIXER:Current/InCh/Label/Color {channel} 0 \"{}\"",
449 color
450 ))
451 .await?;
452
453 Ok(())
454 }
455
456 pub async fn label(&self, channel: u16) -> Result<String, Error> {
457 self.request_string(format!("get MIXER:Current/InCh/Label/Name {channel} 0"))
458 .await
459 }
460
461 pub async fn set_label(&self, channel: u16, label: &str) -> Result<(), Error> {
462 self.send_command(format!(
463 "set MIXER:Current/InCh/Label/Name {channel} 0 \"{label}\""
464 ))
465 .await?;
466
467 Ok(())
468 }
469
470 pub async fn recall_scene(&self, scene_list: SceneList, scene_number: u8) -> Result<(), Error> {
471 self.send_command(format!("ssrecall_ex {scene_list} {scene_number}"))
472 .await?;
473 Ok(())
474 }
475
476 pub async fn fade(
477 &self,
478 channel: u16,
479 mut initial_value: i32,
480 mut final_value: i32,
481 duration_ms: u64,
482 ) -> Result<(), Error> {
483 initial_value = initial_value.clamp(self.min_fader_val, self.max_fader_val);
484 final_value = final_value.clamp(self.min_fader_val, self.max_fader_val);
485
486 let num_steps: u64 = duration_ms / 50;
487 let step_delta: i32 = (final_value - initial_value) / (num_steps as i32);
488
489 let mut interval = time::interval(time::Duration::from_millis(50));
490 let mut current_value = initial_value;
491
492 for _i in 0..num_steps {
493 interval.tick().await;
494
495 self.set_fader_level(channel, current_value).await?;
496 debug!("Set channel {channel} to {current_value}");
497
498 current_value += step_delta;
499 }
500
501 final_value = if final_value == self.min_fader_val {
502 self.neg_inf_val
503 } else {
504 final_value
505 };
506
507 self.set_fader_level(channel, final_value).await?;
508 debug!("Set channel {channel} to {final_value}");
509
510 Ok(())
511 }
512}