yamaha_rcp/
lib.rs

1#![allow(clippy::needless_doctest_main)]
2/*!
3# Yamaha Remote Control Protocol (Rust)
4
5Remote control of [Yamaha mixing consoles](https://usa.yamaha.com/products/proaudio/mixers/index.html) using IP networking.
6
7## Disclaimer
8
9 > This library's API is nowhere near the "final" API.
10 > Development will follow Semantic Versioning, but expect many changes.
11
12 > Also, `yamaha-rcp` is mainly tested against the [Yamaha TF Series](https://usa.yamaha.com/products/proaudio/mixers/tf/index.html)
13 > of consoles, specifically the TF1.
14 > Fully tested compatibility of the
15 > [Rivage PM](https://usa.yamaha.com/products/proaudio/mixers/rivage_pm/index.html),
16 > [DM7](https://usa.yamaha.com/products/proaudio/mixers/dm7/index.html),
17 > [DM3](https://usa.yamaha.com/products/proaudio/mixers/dm3/index.html),
18 > [CL](https://usa.yamaha.com/products/proaudio/mixers/cl_series/index.html),
19 > and [QL](https://usa.yamaha.com/products/proaudio/mixers/ql_series/index.html)
20 > lines is the final goal of this library,
21 > but I do not have access to any of these consoles to be able to test against.
22 > If you do happen to have access and are willing to help out development, please [get in touch](https://github.com/BrenekH/yamaha-rcp-rs/discussions).
23
24## Example
25
26```rust
27use yamaha_rcp::{TFMixer, Error};
28
29#[tokio::main]
30async fn main() -> Result<(), Error> {
31    let mixer = TFMixer::new("192.168.0.128:49280").await?;
32
33    // Set channel 1 to -10.00 dB
34    mixer.set_fader_level(0, -10_00).await?;
35
36    Ok(())
37}
38```
39
40## Extra Documentation
41
42The following is a personal collection of documentation on Yamaha's mixer control protocol since
43they don't provide any decent version of their own: [github.com/BrenekH/yamaha-rcp-docs](https://github.com/BrenekH/yamaha-rcp-docs#readme)
44*/
45
46// We use an underscore to make our decibel values more readable, which
47// Clippy by default does not agree with.
48#![allow(clippy::inconsistent_digit_grouping)]
49
50use log::debug;
51use serde::{Deserialize, Serialize};
52use std::fmt::Display;
53use std::net::SocketAddr;
54use std::str::FromStr;
55use std::sync::Arc;
56use std::time::Duration;
57use tokio::io::{AsyncReadExt, AsyncWriteExt};
58use tokio::net::{tcp::OwnedWriteHalf, TcpStream};
59use tokio::sync::{mpsc, mpsc::Receiver, Mutex};
60use tokio::time;
61
62/// Enumeration of errors that originate from `yamaha-rcp-rs`
63#[derive(thiserror::Error, Debug)]
64pub enum Error {
65    #[error("network error: {0}")]
66    NetworkError(#[from] std::io::Error),
67    #[error("invalid network address: {0}")]
68    AddrParseError(#[from] std::net::AddrParseError),
69    #[error("Yamaha Remote Control Protocol error: {0}")]
70    RCPError(String),
71    #[error("could not parse console response: {0}")]
72    RCPParseError(#[from] Box<dyn std::error::Error>),
73    #[error("{0}")]
74    LabelColorParseError(String),
75    #[error("{0}")]
76    SceneListParseError(String),
77}
78
79/// All possible colors that the TF1 console can use for a channel
80#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
81pub enum LabelColor {
82    Purple,
83    Pink,
84    Red,
85    Orange,
86    Yellow,
87    Blue,
88    SkyBlue,
89    Green,
90}
91
92impl Display for LabelColor {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        write!(
95            f,
96            "{}",
97            match self {
98                Self::Purple => "Purple",
99                Self::Pink => "Pink",
100                Self::Red => "Red",
101                Self::Orange => "Orange",
102                Self::Yellow => "Yellow",
103                Self::Blue => "Blue",
104                Self::SkyBlue => "SkyBlue",
105                Self::Green => "Green",
106            }
107        )
108    }
109}
110
111impl FromStr for LabelColor {
112    type Err = Error;
113
114    fn from_str(s: &str) -> Result<Self, Self::Err> {
115        match s.to_lowercase().as_str() {
116            "purple" => Ok(Self::Purple),
117            "pink" => Ok(Self::Pink),
118            "red" => Ok(Self::Red),
119            "orange" => Ok(Self::Orange),
120            "yellow" => Ok(Self::Yellow),
121            "blue" => Ok(Self::Blue),
122            "skyblue" => Ok(Self::SkyBlue),
123            "green" => Ok(Self::Green),
124            _ => Err(Error::LabelColorParseError(format!(
125                "unknown LabelColor descriptor: {s}"
126            ))),
127        }
128    }
129}
130
131/// Possible scene lists that scenes may be stored in
132#[derive(Clone, Copy, Debug, Deserialize, Serialize)]
133pub enum SceneList {
134    A,
135    B,
136}
137
138impl Display for SceneList {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        write!(
141            f,
142            "{}",
143            match self {
144                Self::A => "scene_a",
145                Self::B => "scene_b",
146            }
147        )
148    }
149}
150
151impl FromStr for SceneList {
152    type Err = Error;
153
154    fn from_str(s: &str) -> Result<Self, Self::Err> {
155        match s.to_lowercase().as_str() {
156            "a" => Ok(Self::A),
157            "b" => Ok(Self::B),
158            _ => Err(Error::SceneListParseError(format!(
159                "unknown SceneList descriptor: {s}"
160            ))),
161        }
162    }
163}
164
165/// Main entry point to access a TF series mixer
166#[derive(Clone, Debug)]
167pub struct TFMixer {
168    max_fader_val: i32,
169    min_fader_val: i32,
170    neg_inf_val: i32,
171    socket_addr: SocketAddr,
172    connections: Arc<Mutex<Vec<Connection>>>,
173    num_connections: Arc<Mutex<u8>>,
174    connection_limit: u8,
175}
176
177/// Represents a connection that can be acquired by a thread
178/// to send commands to the console
179#[derive(Debug)]
180struct Connection {
181    writer: OwnedWriteHalf,
182    recv_channel: Receiver<String>,
183}
184
185impl TFMixer {
186    /// Create a new [TFMixer]
187    ///
188    /// While this method takes connection info (pattern of `<ip address>:<port>`),
189    /// it does not make any connections.
190    /// They are created on demand up to the connection limit.
191    ///
192    /// ```rust
193    /// use yamaha_rcp::TFMixer;
194    ///
195    /// # tokio_test::block_on(async {
196    /// // Creates new mixer using the default IP and port of the TF series consoles
197    /// TFMixer::new("192.168.0.128:49280").await?;
198    /// # Ok::<(), yamaha_rcp::Error>(())
199    /// # });
200    /// ```
201    pub async fn new(addr: &str) -> Result<Self, Error> {
202        let socket_addr: SocketAddr = addr.parse()?;
203
204        let mixer = TFMixer {
205            max_fader_val: 10_00,
206            min_fader_val: -138_00,
207            neg_inf_val: -327_68,
208            socket_addr,
209            connections: Arc::new(Mutex::new(vec![])),
210            num_connections: Arc::new(Mutex::new(8)),
211            connection_limit: 1,
212        };
213
214        let initial_connection = mixer.new_connection().await?;
215        {
216            let mut connections = mixer.connections.lock().await;
217            let mut num_conns = mixer.num_connections.lock().await;
218            connections.push(initial_connection);
219            *num_conns += 1;
220        }
221
222        Ok(mixer)
223    }
224
225    /// Sets the number of allowed connections
226    pub async fn set_connection_limit(&mut self, limit: u8) {
227        self.connection_limit = limit;
228
229        // Remove excess connections if any exist
230        let mut conns = self.connections.lock().await;
231        let curr_num_conns = conns.len();
232        if curr_num_conns > self.connection_limit.into() {
233            conns.drain(0..(curr_num_conns - usize::from(self.connection_limit)));
234        }
235    }
236
237    /// Creates a new connection using the saved IP address and port.
238    ///
239    /// If the connection is not made within 3 seconds, a timeout error is produced.
240    async fn new_connection(&self) -> Result<Connection, Error> {
241        let (tx, rx) = mpsc::channel::<String>(16);
242
243        let std_tcp_sock =
244            std::net::TcpStream::connect_timeout(&self.socket_addr, time::Duration::from_secs(3))?;
245        std_tcp_sock.set_nonblocking(true)?;
246
247        let stream = TcpStream::from_std(std_tcp_sock)?;
248        let (mut reader, writer) = stream.into_split();
249
250        tokio::spawn(async move {
251            let buffer_size = 512;
252
253            loop {
254                let mut line = Vec::new();
255                let mut buffer = vec![0; buffer_size];
256                match reader.read(&mut buffer).await {
257                    Ok(_) => {
258                        for ele in buffer {
259                            match ele {
260                                0xA => {
261                                    let result = std::str::from_utf8(&line).unwrap();
262
263                                    if result.starts_with("ERROR") || result.starts_with("OK") {
264                                        tx.send(result.to_owned()).await.unwrap();
265                                    }
266
267                                    line.clear();
268                                }
269                                _ => line.push(ele),
270                            }
271                        }
272                    }
273                    Err(e) => return Err::<(), Box<std::io::Error>>(Box::new(e)),
274                }
275            }
276        });
277
278        Ok(Connection {
279            writer,
280            recv_channel: rx,
281        })
282    }
283
284    /// Send a command string to the console.
285    ///
286    /// A non-error response from the console will be returned as an `Ok(String)` value,
287    /// but if an error in transit occurs or if the console returns an `ERROR` value,
288    /// that error will be returned as an `Err(Error)`.
289    async fn send_command(&self, mut cmd: String) -> Result<String, Error> {
290        cmd.push('\n');
291
292        debug!("Sending command: {cmd}");
293
294        // Extract a connection from the connection pool while observing the connection limit
295        let mut conn: Connection;
296        {
297            let mut conns = self.connections.lock().await;
298            conn = match conns.pop() {
299                Some(c) => c,
300                None => {
301                    let mut num_conns = self.num_connections.lock().await;
302                    if *num_conns < self.connection_limit {
303                        *num_conns += 1;
304                        self.new_connection().await?
305                    } else {
306                        drop(num_conns);
307                        let existing_conn: Connection;
308                        loop {
309                            drop(conns);
310                            tokio::time::sleep(Duration::from_millis(10)).await;
311                            conns = self.connections.lock().await;
312                            if let Some(c) = conns.pop() {
313                                existing_conn = c;
314                                break;
315                            }
316                        }
317
318                        existing_conn
319                    }
320                }
321            };
322        }
323
324        conn.writer.write_all(cmd.as_bytes()).await?;
325
326        let result = match conn.recv_channel.recv().await {
327            Some(v) => {
328                if v.starts_with("ERROR") {
329                    Err(Error::RCPError(v))
330                } else if v.starts_with("OK") {
331                    Ok(v)
332                } else {
333                    Err(Error::RCPError(format!(
334                        "received message did not start with ERROR or OK: {v}"
335                    )))
336                }
337            }
338            None => Err(Error::RCPError("closed channel from reader task".into())),
339        };
340
341        // Add the connection we used back into the pool
342        {
343            let mut conns = self.connections.lock().await;
344            conns.push(conn);
345        }
346
347        result
348    }
349
350    /// Generic method to request a boolean from the console
351    async fn request_bool(&self, cmd: String) -> Result<bool, Error> {
352        let response = self.send_command(cmd).await?;
353
354        match response.split(' ').last() {
355            Some(v) => Ok(v != "0"),
356            None => Err(Error::RCPError("Could not get last item in list".into())),
357        }
358    }
359
360    /// Generic method to request an integer from the console
361    async fn request_int(&self, cmd: String) -> Result<i32, Error> {
362        let response = self.send_command(cmd).await?;
363
364        match response.split(' ').last() {
365            Some(v) => Ok(v
366                .parse::<i32>()
367                .map_err(|e| Error::RCPParseError(Box::new(e)))?),
368            None => Err(Error::RCPError("Couldn't find the last item".into())),
369        }
370    }
371
372    /// Generic method to request a string from the console
373    async fn request_string(&self, cmd: String) -> Result<String, Error> {
374        let response = self.send_command(cmd).await?;
375
376        let mut resp_vec = Vec::new();
377        let mut looking = false;
378        for fragment in response.split(' ') {
379            if !looking && fragment.starts_with('\"') && fragment.ends_with('\"') {
380                resp_vec.push(fragment[1..fragment.len() - 1].to_owned());
381                break;
382            }
383
384            if fragment.starts_with('\"') && !looking {
385                looking = true;
386                resp_vec.push(fragment[1..fragment.len()].to_owned());
387                continue;
388            }
389
390            if fragment.ends_with('\"') && looking {
391                resp_vec.push(fragment[0..fragment.len() - 1].to_owned());
392                break;
393            }
394
395            if looking {
396                resp_vec.push(fragment.to_owned());
397            }
398        }
399        let label = resp_vec.join(" ");
400
401        Ok(label)
402    }
403
404    pub async fn fader_level(&self, channel: u16) -> Result<i32, Error> {
405        self.request_int(format!("get MIXER:Current/InCh/Fader/Level {channel} 0"))
406            .await
407    }
408
409    pub async fn set_fader_level(&self, channel: u16, value: i32) -> Result<(), Error> {
410        self.send_command(format!(
411            "set MIXER:Current/InCh/Fader/Level {channel} 0 {value}"
412        ))
413        .await?;
414
415        // Technically, this RCP call returns the actually set value, which we could capture and
416        // return to the consumer.
417        Ok(())
418    }
419
420    pub async fn muted(&self, channel: u16) -> Result<bool, Error> {
421        self.request_bool(format!("get MIXER:Current/InCh/Fader/On {channel} 0"))
422            .await
423    }
424
425    pub async fn set_muted(&self, channel: u16, muted: bool) -> Result<(), Error> {
426        self.send_command(format!(
427            "set MIXER:Current/InCh/Fader/On {channel} 0 {}",
428            if muted { 0 } else { 1 }
429        ))
430        .await?;
431
432        Ok(())
433    }
434
435    pub async fn color(&self, channel: u16) -> Result<LabelColor, Error> {
436        let response = self
437            .send_command(format!("get MIXER:Current/InCh/Label/Color {channel} 0"))
438            .await?;
439
440        match response.split(' ').last() {
441            Some(v) => Ok(v.replace('\"', "").parse()?),
442            None => Err(Error::RCPError("could not get last item in list".into())),
443        }
444    }
445
446    pub async fn set_color(&self, channel: u16, color: LabelColor) -> Result<(), Error> {
447        self.send_command(format!(
448            "set MIXER:Current/InCh/Label/Color {channel} 0 \"{}\"",
449            color
450        ))
451        .await?;
452
453        Ok(())
454    }
455
456    pub async fn label(&self, channel: u16) -> Result<String, Error> {
457        self.request_string(format!("get MIXER:Current/InCh/Label/Name {channel} 0"))
458            .await
459    }
460
461    pub async fn set_label(&self, channel: u16, label: &str) -> Result<(), Error> {
462        self.send_command(format!(
463            "set MIXER:Current/InCh/Label/Name {channel} 0 \"{label}\""
464        ))
465        .await?;
466
467        Ok(())
468    }
469
470    pub async fn recall_scene(&self, scene_list: SceneList, scene_number: u8) -> Result<(), Error> {
471        self.send_command(format!("ssrecall_ex {scene_list} {scene_number}"))
472            .await?;
473        Ok(())
474    }
475
476    pub async fn fade(
477        &self,
478        channel: u16,
479        mut initial_value: i32,
480        mut final_value: i32,
481        duration_ms: u64,
482    ) -> Result<(), Error> {
483        initial_value = initial_value.clamp(self.min_fader_val, self.max_fader_val);
484        final_value = final_value.clamp(self.min_fader_val, self.max_fader_val);
485
486        let num_steps: u64 = duration_ms / 50;
487        let step_delta: i32 = (final_value - initial_value) / (num_steps as i32);
488
489        let mut interval = time::interval(time::Duration::from_millis(50));
490        let mut current_value = initial_value;
491
492        for _i in 0..num_steps {
493            interval.tick().await;
494
495            self.set_fader_level(channel, current_value).await?;
496            debug!("Set channel {channel} to {current_value}");
497
498            current_value += step_delta;
499        }
500
501        final_value = if final_value == self.min_fader_val {
502            self.neg_inf_val
503        } else {
504            final_value
505        };
506
507        self.set_fader_level(channel, final_value).await?;
508        debug!("Set channel {channel} to {final_value}");
509
510        Ok(())
511    }
512}