use std::{
collections::HashSet,
error::Error,
io,
net::{AddrParseError, Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
time::Duration,
};
use rosc::{
address::{Matcher, OscAddress},
OscMessage, OscPacket, OscType,
};
use tokio::{
net::UdpSocket,
select,
sync::{
mpsc::{self, Receiver, Sender},
Mutex,
},
task::JoinHandle,
};
use tracing::{error, info, span, Level};
use crate::{config, player::Player, util};
/// Pause between two consecutive iterations of the periodic status broadcast loop.
const BROADCAST_SLEEP_DURATION: Duration = Duration::from_millis(500);
/// Status text broadcast when the player reports that it is not playing.
const STATUS_STOPPED: &str = "Stopped";
/// Status text broadcast when the player reports that it is playing.
const STATUS_PLAYING: &str = "Playing";
/// Action that the address of an incoming OSC message resolves to.
///
/// Values are produced by `classify_message` and dispatched by
/// `Driver::handle_message`.
#[derive(Debug, PartialEq)]
enum OscAction {
/// Calls `Player::play`.
Play,
/// Calls `Player::prev`.
Prev,
/// Calls `Player::next`.
Next,
/// Calls `Player::stop`.
Stop,
/// Switches the player to the playlist named `"all_songs"`.
AllSongs,
/// Switches the player to the playlist named by `Player::persisted_playlist_name`.
Playlist,
/// Calls `Player::stop_samples`.
StopSamples,
/// Calls `Player::section_ack`.
SectionAck,
/// Calls `Player::stop_section_loop`.
StopSectionLoop,
/// Calls `Player::loop_section` with the message's first string argument.
LoopSection,
/// The address matched none of the configured patterns; nothing is dispatched.
Unrecognized,
}
/// OSC controller driver: receives OSC messages over UDP to control a
/// [`Player`] and broadcasts the player's status back out.
pub struct Driver {
// Player that incoming OSC actions are applied to.
player: Arc<Player>,
// Local address the UDP socket is bound to.
addr: SocketAddr,
// Destinations every outgoing status packet is sent to; multicast entries
// are also joined on the socket when monitoring starts.
broadcast_addresses: Vec<SocketAddr>,
// Address patterns for incoming events and addresses for outgoing status.
osc_events: Arc<OscEvents>,
}
/// OSC addressing used by the driver.
///
/// The `Matcher` fields are patterns that incoming message addresses are
/// tested against (see `classify_message`); the `String` fields are the
/// addresses used for outgoing status messages (see `build_broadcast_packets`).
pub(super) struct OscEvents {
// --- Incoming event patterns ---
play: Matcher,
prev: Matcher,
next: Matcher,
stop: Matcher,
all_songs: Matcher,
playlist: Matcher,
stop_samples: Matcher,
section_ack: Matcher,
stop_section_loop: Matcher,
loop_section: Matcher,
// --- Outgoing status addresses ---
// Carries the "Playing"/"Stopped" status text.
status: String,
// Carries the numbered listing of the current playlist.
playlist_current: String,
// Carries the name of the current song.
playlist_current_song: String,
// Carries the "elapsed/total" duration text of the current song.
playlist_current_song_elapsed: String,
}
impl Driver {
    /// Creates a new OSC driver from the given controller configuration.
    ///
    /// The driver binds to the unspecified IPv4 address on the configured
    /// port once monitoring starts. Broadcast addresses are parsed first and
    /// the OSC address patterns are compiled afterwards.
    ///
    /// # Errors
    ///
    /// Returns an error if a configured broadcast address cannot be parsed as
    /// a socket address, or if any configured event pattern is rejected by
    /// `Matcher::new`.
    pub fn new(
        config: Box<config::OscController>,
        player: Arc<Player>,
    ) -> Result<Arc<Self>, Box<dyn Error>> {
        let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.port());

        let targets = config
            .broadcast_addresses()
            .iter()
            .map(|raw| raw.parse::<SocketAddr>())
            .collect::<Result<Vec<_>, AddrParseError>>()?;

        let osc_events = OscEvents {
            // Patterns matched against incoming message addresses.
            play: Matcher::new(config.play())?,
            prev: Matcher::new(config.prev())?,
            next: Matcher::new(config.next())?,
            stop: Matcher::new(config.stop())?,
            all_songs: Matcher::new(config.all_songs())?,
            playlist: Matcher::new(config.playlist())?,
            stop_samples: Matcher::new(config.stop_samples())?,
            section_ack: Matcher::new(config.section_ack())?,
            stop_section_loop: Matcher::new(config.stop_section_loop())?,
            loop_section: Matcher::new(config.loop_section())?,
            // Addresses used for outgoing status messages.
            status: config.status().to_string(),
            playlist_current: config.playlist_current().to_string(),
            playlist_current_song: config.playlist_current_song().to_string(),
            playlist_current_song_elapsed: config.playlist_current_song_elapsed().to_string(),
        };

        Ok(Arc::new(Self {
            player,
            addr: SocketAddr::V4(bind_addr),
            broadcast_addresses: targets,
            osc_events: Arc::new(osc_events),
        }))
    }
}
impl super::Driver for Driver {
fn monitor_events(&self) -> JoinHandle<Result<(), std::io::Error>> {
let addr = self.addr;
let broadcast_addresses = self.broadcast_addresses.clone();
let player = self.player.clone();
let osc_events = self.osc_events.clone();
tokio::spawn(async move {
let span = span!(Level::INFO, "OSC Driver");
let _enter = span.enter();
info!("OSC driver started.");
let socket = UdpSocket::bind(addr).await?;
socket.set_broadcast(true)?;
for broadcast_addr in broadcast_addresses.iter() {
let ip = broadcast_addr.ip();
if ip.is_multicast() {
match ip {
std::net::IpAddr::V4(ipv4_addr) => {
socket.join_multicast_v4(ipv4_addr, Ipv4Addr::UNSPECIFIED)?
}
std::net::IpAddr::V6(ipv6_addr) => {
socket.join_multicast_v6(&ipv6_addr, 0)?
}
}
}
}
let (rx_sender, mut rx_receiver) = mpsc::channel::<OscPacket>(10);
let (tx_sender, tx_receiver) = mpsc::channel::<OscPacket>(10);
let connected_clients: Arc<Mutex<HashSet<SocketAddr>>> =
Arc::new(Mutex::new(HashSet::new()));
tokio::spawn(Self::handle_udp_comms(
socket,
broadcast_addresses,
connected_clients.clone(),
rx_sender,
tx_receiver,
));
{
let player = player.clone();
let tx_sender = tx_sender.clone();
let osc_events = osc_events.clone();
info!("Starting broadcast loop");
tokio::spawn(async move {
loop {
if let Err(e) = Self::broadcast(&player, &osc_events, &tx_sender).await {
error!(err = e, "Error broadcasting player status");
}
tokio::time::sleep(BROADCAST_SLEEP_DURATION).await;
}
});
}
loop {
let packet = rx_receiver.recv().await;
let tx_sender = tx_sender.clone();
if let Some(packet) = packet {
if Self::handle_packet(&player, &osc_events, &packet)
.await
.map_err(|e| io::Error::other(e.to_string()))?
{
if let Err(e) = Self::broadcast(&player, &osc_events, &tx_sender).await {
error!(err = e, "Error broadcasting player status");
};
}
}
}
})
}
}
impl Driver {
/// Owns the UDP socket: decodes incoming datagrams and sends outgoing packets.
///
/// * Every datagram received records its sender in `connected_clients`; if it
///   decodes as OSC, the packet is forwarded on `rx_sender`.
/// * Every packet received on `tx_receiver` is encoded once and sent to each
///   address in `broadcast_addresses` and then to each known client.
///
/// All failures are logged and the loop keeps running. Once every sender of
/// `tx_receiver` has been dropped, the outgoing branch is disabled for each
/// `select!` round, so the task keeps serving incoming datagrams without
/// busy-looping on a closed channel.
pub(super) async fn handle_udp_comms(
    socket: UdpSocket,
    broadcast_addresses: Vec<SocketAddr>,
    connected_clients: Arc<Mutex<HashSet<SocketAddr>>>,
    rx_sender: Sender<OscPacket>,
    mut tx_receiver: Receiver<OscPacket>,
) {
    let mut buf = [0u8; rosc::decoder::MTU];
    loop {
        select! {
            result = socket.recv_from(&mut buf) => {
                match result {
                    Ok((size, sender_addr)) => {
                        // The guard is a temporary, so the lock is released
                        // as soon as the insert completes.
                        connected_clients.lock().await.insert(sender_addr);
                        match rosc::decoder::decode_udp(&buf[..size]) {
                            Ok((_, packet)) => {
                                if let Err(e) = rx_sender.send(packet).await {
                                    error!(err = e.to_string(), "Error sending packet on channel.");
                                }
                            }
                            Err(e) => error!(err = e.to_string(), "Error decoding OSC message"),
                        }
                    }
                    Err(e) => error!(err = e.to_string(), "Error receiving UDP."),
                }
            }
            // A `None` (closed channel) does not match the pattern, which
            // disables this branch for the current round instead of spinning.
            Some(packet) = tx_receiver.recv() => {
                match rosc::encoder::encode(&packet) {
                    Ok(encoded) => {
                        for addr in broadcast_addresses.iter() {
                            if let Err(e) = socket.send_to(&encoded, addr).await {
                                error!(err = e.to_string(), "Error sending UDP data.");
                            }
                        }
                        // Snapshot the client set so the lock is not held
                        // while awaiting network sends.
                        let clients: Vec<SocketAddr> =
                            connected_clients.lock().await.iter().copied().collect();
                        for addr in clients.iter() {
                            if let Err(e) = socket.send_to(&encoded, addr).await {
                                error!(err = e.to_string(), "Error sending UDP data to client.");
                            }
                        }
                    }
                    Err(e) => error!(err = e.to_string(), "Error encoding OSC message"),
                }
            }
        }
    }
}
/// Queues the current player status for transmission.
///
/// Builds the status packets (current song, play status, elapsed/total time
/// and playlist listing) and sends each one on `tx_sender`. When the current
/// playlist has no current song, nothing is sent and `Ok(())` is returned.
///
/// # Errors
///
/// Returns an error if the elapsed time cannot be obtained from the player
/// or if a packet cannot be sent on the channel.
pub(super) async fn broadcast(
    player: &Arc<Player>,
    osc_events: &Arc<OscEvents>,
    tx_sender: &Sender<OscPacket>,
) -> Result<(), Box<dyn Error>> {
    let playlist = player.get_playlist();
    let Some(song) = playlist.current() else {
        return Ok(());
    };

    let is_playing = player.is_playing().await;
    let elapsed = player.elapsed().await?;

    let status_string = if !is_playing {
        STATUS_STOPPED
    } else {
        STATUS_PLAYING
    };

    // A missing elapsed value is rendered as a zero duration.
    let elapsed_text = util::duration_minutes_seconds(elapsed.unwrap_or_default());
    let total_text = util::duration_minutes_seconds(song.duration());
    let duration_string = format!("{}/{}", elapsed_text, total_text);

    let playlist_songs: Vec<String> = playlist.songs().to_vec();

    for packet in build_broadcast_packets(
        osc_events,
        song.name(),
        status_string,
        &duration_string,
        &playlist_songs,
    ) {
        tx_sender.send(packet).await?;
    }
    Ok(())
}
/// Handles an OSC packet, recursing into bundles.
///
/// Returns `Ok(true)` if at least one message in the packet was a recognized
/// event. Every packet of a bundle is processed, in order, even after an
/// earlier one has already been recognized.
///
/// # Errors
///
/// Returns the first error produced while handling a contained message;
/// packets after the failing one are not processed.
pub(super) async fn handle_packet(
    player: &Arc<Player>,
    osc_events: &Arc<OscEvents>,
    packet: &OscPacket,
) -> Result<bool, Box<dyn Error>> {
    match packet {
        OscPacket::Message(osc_message) => {
            Box::pin(Self::handle_message(player, osc_events, osc_message)).await
        }
        OscPacket::Bundle(osc_bundle) => {
            let mut recognized_event = false;
            for packet in &osc_bundle.content {
                // Handle the packet first and only then fold the result in:
                // combining with a short-circuiting `||` would skip every
                // packet after the first recognized one.
                let handled = Box::pin(Self::handle_packet(player, osc_events, packet)).await?;
                recognized_event |= handled;
            }
            Ok(recognized_event)
        }
    }
}
pub(super) async fn handle_message(
player: &Arc<Player>,
osc_events: &Arc<OscEvents>,
msg: &OscMessage,
) -> Result<bool, Box<dyn Error>> {
let action = classify_message(osc_events, &msg.addr)?;
match action {
OscAction::Play => {
if let Err(e) = player.play().await {
error!(err = e.as_ref(), "Failed to play song: {}", e);
}
}
OscAction::Prev => {
player.prev().await;
}
OscAction::Next => {
player.next().await;
}
OscAction::Stop => {
player.stop().await;
}
OscAction::AllSongs => {
if let Err(e) = player.switch_to_playlist("all_songs").await {
error!("Failed to switch to all_songs: {}", e);
}
}
OscAction::Playlist => {
let name = player.persisted_playlist_name();
if let Err(e) = player.switch_to_playlist(&name).await {
error!("Failed to switch to playlist {}: {}", name, e);
}
}
OscAction::StopSamples => player.stop_samples(),
OscAction::SectionAck => {
if let Err(e) = player.section_ack().await {
error!("Failed to ack section: {}", e);
}
}
OscAction::StopSectionLoop => {
player.stop_section_loop();
}
OscAction::LoopSection => {
let section_name = msg
.args
.first()
.and_then(|arg| match arg {
OscType::String(s) => Some(s.as_str()),
_ => None,
})
.unwrap_or("");
if section_name.is_empty() {
error!("loop_section OSC message missing section name argument");
} else if let Err(e) = player.loop_section(section_name).await {
error!("Failed to loop section '{}': {}", section_name, e);
}
}
OscAction::Unrecognized => return Ok(false),
}
Ok(true)
}
}
/// Maps an OSC address to the action it triggers.
///
/// The configured patterns are tried in a fixed priority order and the first
/// one that matches wins; an address that matches none of them yields
/// `OscAction::Unrecognized`.
///
/// # Errors
///
/// Returns an error if `addr` is rejected by `OscAddress::new`.
fn classify_message(osc_events: &OscEvents, addr: &str) -> Result<OscAction, Box<dyn Error>> {
    let address = OscAddress::new(addr.to_string())?;

    // Ordered by priority: earlier entries shadow later ones.
    let routes = [
        (&osc_events.play, OscAction::Play),
        (&osc_events.prev, OscAction::Prev),
        (&osc_events.next, OscAction::Next),
        (&osc_events.stop, OscAction::Stop),
        (&osc_events.all_songs, OscAction::AllSongs),
        (&osc_events.playlist, OscAction::Playlist),
        (&osc_events.stop_samples, OscAction::StopSamples),
        (&osc_events.section_ack, OscAction::SectionAck),
        (&osc_events.stop_section_loop, OscAction::StopSectionLoop),
        (&osc_events.loop_section, OscAction::LoopSection),
    ];

    for (matcher, action) in routes {
        if matcher.match_address(&address) {
            return Ok(action);
        }
    }
    Ok(OscAction::Unrecognized)
}
/// Renders song names as a numbered list, one `"<n>. <name>"` entry per line.
///
/// Numbering starts at 1, entries are separated by `'\n'` without a trailing
/// newline, and an empty slice yields an empty string.
fn format_playlist_content(songs: &[String]) -> String {
    let mut listing = String::new();
    for (index, title) in songs.iter().enumerate() {
        if index > 0 {
            listing.push('\n');
        }
        listing.push_str(&(index + 1).to_string());
        listing.push_str(". ");
        listing.push_str(title);
    }
    listing
}
/// Builds the status packets sent on every broadcast.
///
/// The returned vector always contains four single-string messages, in this
/// order: current song name, play status, elapsed/total duration text, and
/// the numbered listing of the playlist's songs.
fn build_broadcast_packets(
    osc_events: &OscEvents,
    song_name: &str,
    status: &str,
    duration_string: &str,
    playlist_songs: &[String],
) -> Vec<OscPacket> {
    // Wraps one text value into an OSC message for the given address.
    let text_message = |addr: &str, text: String| {
        OscPacket::Message(OscMessage {
            addr: addr.to_string(),
            args: vec![OscType::String(text)],
        })
    };

    vec![
        text_message(&osc_events.playlist_current_song, song_name.to_string()),
        text_message(&osc_events.status, status.to_string()),
        text_message(
            &osc_events.playlist_current_song_elapsed,
            duration_string.to_string(),
        ),
        text_message(
            &osc_events.playlist_current,
            format_playlist_content(playlist_songs),
        ),
    ]
}
#[cfg(test)]
mod test {
use std::{
collections::HashMap, error::Error, net::SocketAddr, path::Path, sync::Arc, time::Duration,
};
use rosc::{OscMessage, OscPacket, OscType};
use tokio::{net::UdpSocket, sync::mpsc, time::timeout};
use crate::{
config,
controller::osc::{Driver, STATUS_PLAYING, STATUS_STOPPED},
playlist,
playlist::Playlist,
songs,
testutil::{eventually, eventually_async},
};
use super::Player;
/// End-to-end check of `Driver::handle_packet` and `Driver::broadcast`
/// against a player backed by the mock audio and MIDI devices.
///
/// Drives next/prev, switches between the "all_songs" and "playlist"
/// playlists, starts and stops playback, and verifies the packets produced by
/// `broadcast` while playing and after stopping.
#[tokio::test(flavor = "multi_thread")]
async fn test_osc() -> Result<(), Box<dyn Error>> {
// Build a player with two playlists from the test assets.
let songs = songs::get_all_songs(Path::new("assets/songs"))?;
let pl = Playlist::new(
"playlist",
&config::Playlist::deserialize(Path::new("assets/playlist.yaml"))?,
songs.clone(),
)?;
let mut playlists = HashMap::new();
playlists.insert(
"all_songs".to_string(),
playlist::from_songs(songs.clone())?,
);
playlists.insert("playlist".to_string(), pl);
let player = Player::new(
playlists,
"playlist".to_string(),
&config::Player::new(
vec![],
Some(config::Audio::new("mock-device")),
Some(config::Midi::new("mock-midi-device", None)),
None,
HashMap::new(),
"assets/songs",
),
None,
)?;
player.await_hardware_ready().await;
let binding = player
.audio_device()
.expect("audio device should be present");
let device = binding.to_mock()?;
let driver = Driver::new(Box::new(config::OscController::new()), player.clone())?;
// Reuse the configured patterns as concrete addresses for the test messages.
let next = driver.osc_events.next.pattern.clone();
let prev = driver.osc_events.prev.pattern.clone();
let play = driver.osc_events.play.pattern.clone();
let stop = driver.osc_events.stop.pattern.clone();
let all_songs = driver.osc_events.all_songs.pattern.clone();
let playlist = driver.osc_events.playlist.pattern.clone();
// Sends an argument-less message to `address` straight through `handle_packet`.
let osc_packet = |address| async {
let packet = osc_event(address);
Driver::handle_packet(&player, &driver.osc_events, &packet).await
};
println!("Playlist -> Song 1");
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 1");
osc_packet(next.clone()).await?;
println!("Playlist -> Song 3");
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 3");
osc_packet(prev.clone()).await?;
println!("Playlist -> Song 1");
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 1");
println!("Switch to AllSongs");
osc_packet(all_songs.clone()).await?;
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 1");
osc_packet(next.clone()).await?;
println!("AllSongs -> Song 10");
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 10");
osc_packet(next.clone()).await?;
println!("AllSongs -> Song 2");
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 2");
osc_packet(next.clone()).await?;
println!("AllSongs -> Song 3");
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 3");
osc_packet(playlist.clone()).await?;
println!("Switch to Playlist");
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 1");
osc_packet(next.clone()).await?;
println!("Playlist -> Song 3");
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 3");
// NOTE(review): presumably the player advances to the next song once
// "Song 3" finishes on the mock device — confirm against `Player`.
osc_packet(play.clone()).await?;
eventually(
|| player.get_playlist().current().unwrap().name() == "Song 5",
format!(
"Song never moved to next, on song {}",
player.get_playlist().current().unwrap().name()
)
.as_str(),
);
osc_packet(play.clone()).await?;
println!("Play Song 5.");
eventually(|| device.is_playing(), "Song never started playing");
// While playing, the second broadcast packet must report the playing status.
let (tx_sender, mut tx_receiver) = mpsc::channel::<OscPacket>(10);
Driver::broadcast(&player, &driver.osc_events, &tx_sender).await?;
let mut buf: Vec<OscPacket> = Vec::new();
tx_receiver.recv_many(&mut buf, 10).await;
assert_eq!(
buf[1],
OscPacket::Message(OscMessage {
addr: driver.osc_events.status.clone(),
args: vec![OscType::String(STATUS_PLAYING.to_string())],
})
);
osc_packet(stop.clone()).await?;
eventually(|| !device.is_playing(), "Song never stopped playing");
eventually_async(
|| async { !player.is_playing().await },
"Player state never updated to stopped",
)
.await;
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 5");
// After stopping, verify all four broadcast packets in order.
Driver::broadcast(&player, &driver.osc_events, &tx_sender).await?;
let mut buf: Vec<OscPacket> = Vec::new();
tx_receiver.recv_many(&mut buf, 10).await;
assert_eq!(buf.len(), 4);
assert_eq!(
buf[0],
OscPacket::Message(OscMessage {
addr: driver.osc_events.playlist_current_song.clone(),
args: vec![OscType::String("Song 5".to_string())],
})
);
assert_eq!(
buf[1],
OscPacket::Message(OscMessage {
addr: driver.osc_events.status.clone(),
args: vec![OscType::String(STATUS_STOPPED.to_string())],
})
);
assert_eq!(
buf[2],
OscPacket::Message(OscMessage {
addr: driver.osc_events.playlist_current_song_elapsed.clone(),
args: vec![OscType::String("0:00/0:20".to_string())],
})
);
assert_eq!(
buf[3],
OscPacket::Message(OscMessage {
addr: driver.osc_events.playlist_current.clone(),
args: vec![OscType::String(
"1. Song 1\n2. Song 3\n3. Song 5\n4. Song 7\n5. Song 9".to_string()
)],
})
);
Ok(())
}
/// Builds an argument-less OSC message packet addressed to `addr`.
fn osc_event(addr: String) -> OscPacket {
    let message = OscMessage {
        addr,
        args: Vec::new(),
    };
    OscPacket::Message(message)
}
/// Verifies that `Driver::handle_udp_comms` records the address of a client
/// that sent a datagram and later delivers outgoing packets to that client.
#[tokio::test(flavor = "multi_thread")]
async fn test_osc_client_tracking() -> Result<(), Box<dyn Error>> {
let songs = songs::get_all_songs(Path::new("assets/songs"))?;
let pl = Playlist::new(
"playlist",
&config::Playlist::deserialize(Path::new("assets/playlist.yaml"))?,
songs.clone(),
)?;
let mut playlists = HashMap::new();
playlists.insert(
"all_songs".to_string(),
playlist::from_songs(songs.clone())?,
);
playlists.insert("playlist".to_string(), pl);
let player = Player::new(
playlists,
"playlist".to_string(),
&config::Player::new(
vec![],
Some(config::Audio::new("mock-device")),
Some(config::Midi::new("mock-midi-device", None)),
None,
HashMap::new(),
"assets/songs",
),
None,
)?;
player.await_hardware_ready().await;
// The player is not used by `handle_udp_comms`; it is only kept alive here.
let _player = player;
// Port 0 asks the OS for free ephemeral ports on the loopback interface.
let server_socket = UdpSocket::bind("127.0.0.1:0").await?;
let server_addr = server_socket.local_addr()?;
let client_socket = UdpSocket::bind("127.0.0.1:0").await?;
let client_addr = client_socket.local_addr()?;
// `_rx_receiver` is kept so that forwarding decoded packets does not fail.
let (rx_sender, _rx_receiver) = mpsc::channel::<OscPacket>(10);
let (tx_sender, tx_receiver) = mpsc::channel::<OscPacket>(10);
let connected_clients: Arc<tokio::sync::Mutex<std::collections::HashSet<SocketAddr>>> =
Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new()));
let handler_clients = connected_clients.clone();
let handler_socket = server_socket;
// No broadcast addresses: outgoing packets can only reach tracked clients.
let handler_task = tokio::spawn(async move {
Driver::handle_udp_comms(
handler_socket,
vec![], handler_clients,
rx_sender,
tx_receiver,
)
.await;
});
let test_message = OscPacket::Message(OscMessage {
addr: "/test".to_string(),
args: vec![],
});
let encoded = rosc::encoder::encode(&test_message)?;
client_socket.send_to(&encoded, server_addr).await?;
// Give the handler task time to receive the datagram and record the sender.
tokio::time::sleep(Duration::from_millis(100)).await;
let clients = connected_clients.lock().await;
assert!(
clients.contains(&client_addr),
"Client address should be in connected clients list"
);
// Release the lock before queueing a packet; the handler locks the set to send.
drop(clients);
let broadcast_packet = OscPacket::Message(OscMessage {
addr: "/status".to_string(),
args: vec![OscType::String("test".to_string())],
});
tx_sender.send(broadcast_packet).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let mut recv_buf = [0u8; 1024];
match timeout(
Duration::from_secs(1),
client_socket.recv_from(&mut recv_buf),
)
.await
{
Ok(Ok((size, _))) => {
match rosc::decoder::decode_udp(&recv_buf[..size]) {
Ok((_, packet)) => {
if let OscPacket::Message(msg) = packet {
assert_eq!(msg.addr, "/status");
assert_eq!(msg.args.len(), 1);
if let OscType::String(s) = &msg.args[0] {
assert_eq!(s, "test");
} else {
panic!("Expected string argument");
}
} else {
panic!("Expected OSC message");
}
}
Err(e) => panic!("Failed to decode received packet: {}", e),
}
}
Ok(Err(e)) => panic!("Failed to receive broadcast: {}", e),
Err(_) => panic!("Timeout waiting for broadcast message"),
}
// `handle_udp_comms` never returns on its own, so stop it explicitly.
handler_task.abort();
Ok(())
}
/// Verifies that `Driver::handle_udp_comms` tracks two distinct clients and
/// delivers an outgoing packet to both of them.
#[tokio::test(flavor = "multi_thread")]
async fn test_osc_multiple_clients() -> Result<(), Box<dyn Error>> {
let songs = songs::get_all_songs(Path::new("assets/songs"))?;
let pl = Playlist::new(
"playlist",
&config::Playlist::deserialize(Path::new("assets/playlist.yaml"))?,
songs.clone(),
)?;
let mut playlists = HashMap::new();
playlists.insert(
"all_songs".to_string(),
playlist::from_songs(songs.clone())?,
);
playlists.insert("playlist".to_string(), pl);
let player = Player::new(
playlists,
"playlist".to_string(),
&config::Player::new(
vec![],
Some(config::Audio::new("mock-device")),
Some(config::Midi::new("mock-midi-device", None)),
None,
HashMap::new(),
"assets/songs",
),
None,
)?;
player.await_hardware_ready().await;
// The player is not used by `handle_udp_comms`; it is only kept alive here.
let _player = player;
// Port 0 asks the OS for free ephemeral ports on the loopback interface.
let server_socket = UdpSocket::bind("127.0.0.1:0").await?;
let server_addr = server_socket.local_addr()?;
let client1_socket = UdpSocket::bind("127.0.0.1:0").await?;
let client1_addr = client1_socket.local_addr()?;
let client2_socket = UdpSocket::bind("127.0.0.1:0").await?;
let client2_addr = client2_socket.local_addr()?;
// `_rx_receiver` is kept so that forwarding decoded packets does not fail.
let (rx_sender, _rx_receiver) = mpsc::channel::<OscPacket>(10);
let (tx_sender, tx_receiver) = mpsc::channel::<OscPacket>(10);
let connected_clients: Arc<tokio::sync::Mutex<std::collections::HashSet<SocketAddr>>> =
Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new()));
let handler_clients = connected_clients.clone();
let handler_socket = server_socket;
// No broadcast addresses: outgoing packets can only reach tracked clients.
let handler_task = tokio::spawn(async move {
Driver::handle_udp_comms(
handler_socket,
vec![],
handler_clients,
rx_sender,
tx_receiver,
)
.await;
});
let test_message = OscPacket::Message(OscMessage {
addr: "/test".to_string(),
args: vec![],
});
let encoded = rosc::encoder::encode(&test_message)?;
client1_socket.send_to(&encoded, server_addr).await?;
client2_socket.send_to(&encoded, server_addr).await?;
// Give the handler task time to receive both datagrams.
tokio::time::sleep(Duration::from_millis(100)).await;
let clients = connected_clients.lock().await;
assert!(
clients.contains(&client1_addr),
"Client 1 should be in connected clients list"
);
assert!(
clients.contains(&client2_addr),
"Client 2 should be in connected clients list"
);
assert_eq!(clients.len(), 2, "Should have exactly 2 clients");
// Release the lock before queueing a packet; the handler locks the set to send.
drop(clients);
let broadcast_packet = OscPacket::Message(OscMessage {
addr: "/broadcast".to_string(),
args: vec![OscType::String("test".to_string())],
});
tx_sender.send(broadcast_packet).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let mut recv_buf1 = [0u8; 1024];
let mut recv_buf2 = [0u8; 1024];
// NOTE(review): a decoded bundle would skip the address assertion below
// and still pass — consider failing explicitly on non-message packets.
match timeout(
Duration::from_secs(1),
client1_socket.recv_from(&mut recv_buf1),
)
.await
{
Ok(Ok((size, _))) => {
let decoded = rosc::decoder::decode_udp(&recv_buf1[..size])?;
if let OscPacket::Message(msg) = decoded.1 {
assert_eq!(msg.addr, "/broadcast");
}
}
_ => panic!("Client 1 should receive broadcast"),
}
match timeout(
Duration::from_secs(1),
client2_socket.recv_from(&mut recv_buf2),
)
.await
{
Ok(Ok((size, _))) => {
let decoded = rosc::decoder::decode_udp(&recv_buf2[..size])?;
if let OscPacket::Message(msg) = decoded.1 {
assert_eq!(msg.addr, "/broadcast");
}
}
_ => panic!("Client 2 should receive broadcast"),
}
// `handle_udp_comms` never returns on its own, so stop it explicitly.
handler_task.abort();
Ok(())
}
use super::{
build_broadcast_packets, classify_message, format_playlist_content, OscAction, OscEvents,
};
use rosc::address::Matcher;
/// Builds an `OscEvents` from a freshly constructed `config::OscController`.
///
/// Panics if any of that configuration's event patterns is rejected by
/// `Matcher::new`.
fn make_default_osc_events() -> OscEvents {
    let defaults = config::OscController::new();
    let matcher = |pattern: &str| Matcher::new(pattern).unwrap();
    OscEvents {
        play: matcher(defaults.play()),
        prev: matcher(defaults.prev()),
        next: matcher(defaults.next()),
        stop: matcher(defaults.stop()),
        all_songs: matcher(defaults.all_songs()),
        playlist: matcher(defaults.playlist()),
        stop_samples: matcher(defaults.stop_samples()),
        section_ack: matcher(defaults.section_ack()),
        stop_section_loop: matcher(defaults.stop_section_loop()),
        loop_section: matcher(defaults.loop_section()),
        status: defaults.status().to_string(),
        playlist_current: defaults.playlist_current().to_string(),
        playlist_current_song: defaults.playlist_current_song().to_string(),
        playlist_current_song_elapsed: defaults.playlist_current_song_elapsed().to_string(),
    }
}
/// Unit tests for `classify_message` using the default OSC event patterns.
mod classify_message_tests {
    use super::{classify_message, make_default_osc_events, OscAction};

    /// Classifies `addr` against the default events, panicking on an error.
    fn classify(addr: &str) -> OscAction {
        let events = make_default_osc_events();
        classify_message(&events, addr).unwrap()
    }

    #[test]
    fn recognizes_play() {
        assert_eq!(classify("/mtrack/play"), OscAction::Play);
    }

    #[test]
    fn recognizes_prev() {
        assert_eq!(classify("/mtrack/prev"), OscAction::Prev);
    }

    #[test]
    fn recognizes_next() {
        assert_eq!(classify("/mtrack/next"), OscAction::Next);
    }

    #[test]
    fn recognizes_stop() {
        assert_eq!(classify("/mtrack/stop"), OscAction::Stop);
    }

    #[test]
    fn recognizes_all_songs() {
        assert_eq!(classify("/mtrack/all_songs"), OscAction::AllSongs);
    }

    #[test]
    fn recognizes_playlist() {
        assert_eq!(classify("/mtrack/playlist"), OscAction::Playlist);
    }

    #[test]
    fn recognizes_stop_samples() {
        assert_eq!(classify("/mtrack/samples/stop"), OscAction::StopSamples);
    }

    #[test]
    fn unrecognized_address() {
        assert_eq!(classify("/unknown/address"), OscAction::Unrecognized);
    }

    #[test]
    fn invalid_osc_address_is_error() {
        let events = make_default_osc_events();
        let outcome = classify_message(&events, "no_leading_slash");
        assert!(outcome.is_err());
    }
}
/// Unit tests for `format_playlist_content`.
mod format_playlist_content_tests {
    use super::format_playlist_content;

    /// Converts string literals into the owned song names the formatter takes.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    #[test]
    fn empty_playlist() {
        let rendered = format_playlist_content(&[]);
        assert_eq!(rendered, "");
    }

    #[test]
    fn single_song() {
        let rendered = format_playlist_content(&owned(&["Song A"]));
        assert_eq!(rendered, "1. Song A");
    }

    #[test]
    fn multiple_songs() {
        let rendered = format_playlist_content(&owned(&["Song A", "Song B", "Song C"]));
        assert_eq!(rendered, "1. Song A\n2. Song B\n3. Song C");
    }

    #[test]
    fn preserves_song_names() {
        let rendered = format_playlist_content(&owned(&[" Spaces ", "Special: Chars!"]));
        assert_eq!(rendered, "1.  Spaces \n2. Special: Chars!");
    }
}
mod handle_packet_tests {
use super::*;
use std::{collections::HashMap, path::Path, sync::Arc};
async fn make_player() -> Arc<Player> {
let songs = songs::get_all_songs(Path::new("assets/songs")).unwrap();
let pl = Playlist::new(
"playlist",
&config::Playlist::deserialize(Path::new("assets/playlist.yaml")).unwrap(),
songs.clone(),
)
.unwrap();
let mut playlists = HashMap::new();
playlists.insert(
"all_songs".to_string(),
playlist::from_songs(songs.clone()).unwrap(),
);
playlists.insert("playlist".to_string(), pl);
let player = Player::new(
playlists,
"playlist".to_string(),
&config::Player::new(
vec![],
Some(config::Audio::new("mock-device")),
Some(config::Midi::new("mock-midi-device", None)),
None,
HashMap::new(),
"assets/songs",
),
None,
)
.unwrap();
player.await_hardware_ready().await;
player
}
#[tokio::test(flavor = "multi_thread")]
async fn handle_packet_bundle() {
let player = make_player().await;
let events = Arc::new(make_default_osc_events());
let bundle = OscPacket::Bundle(rosc::OscBundle {
timetag: rosc::OscTime {
seconds: 0,
fractional: 0,
},
content: vec![OscPacket::Message(OscMessage {
addr: "/mtrack/next".to_string(),
args: vec![],
})],
});
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 1");
let result = Driver::handle_packet(&player, &events, &bundle).await;
assert!(
result.unwrap(),
"bundle with recognized messages should return true"
);
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 3");
}
#[tokio::test(flavor = "multi_thread")]
async fn handle_packet_bundle_unrecognized() {
let player = make_player().await;
let events = Arc::new(make_default_osc_events());
let bundle = OscPacket::Bundle(rosc::OscBundle {
timetag: rosc::OscTime {
seconds: 0,
fractional: 0,
},
content: vec![OscPacket::Message(OscMessage {
addr: "/unknown".to_string(),
args: vec![],
})],
});
let result = Driver::handle_packet(&player, &events, &bundle).await;
assert!(
!result.unwrap(),
"bundle with only unrecognized messages should return false"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn handle_message_stop_samples() {
let player = make_player().await;
let events = Arc::new(make_default_osc_events());
let packet = OscPacket::Message(OscMessage {
addr: "/mtrack/samples/stop".to_string(),
args: vec![],
});
let result = Driver::handle_packet(&player, &events, &packet).await;
assert!(result.unwrap(), "stop_samples should return true");
}
#[tokio::test(flavor = "multi_thread")]
async fn handle_message_unrecognized_returns_false() {
let player = make_player().await;
let events = Arc::new(make_default_osc_events());
let packet = OscPacket::Message(OscMessage {
addr: "/something/unknown".to_string(),
args: vec![],
});
let result = Driver::handle_packet(&player, &events, &packet).await;
assert!(!result.unwrap(), "unrecognized address should return false");
}
}
/// Runs the full driver via `monitor_events` and controls the player with
/// real UDP datagrams: next, play and stop.
#[tokio::test(flavor = "multi_thread")]
async fn test_osc_monitor_events() -> Result<(), Box<dyn Error>> {
let songs = songs::get_all_songs(Path::new("assets/songs"))?;
let pl = Playlist::new(
"playlist",
&config::Playlist::deserialize(Path::new("assets/playlist.yaml"))?,
songs.clone(),
)?;
let mut playlists = HashMap::new();
playlists.insert(
"all_songs".to_string(),
playlist::from_songs(songs.clone())?,
);
playlists.insert("playlist".to_string(), pl);
let player = Player::new(
playlists,
"playlist".to_string(),
&config::Player::new(
vec![],
Some(config::Audio::new("mock-device")),
Some(config::Midi::new("mock-midi-device", None)),
None,
HashMap::new(),
"assets/songs",
),
None,
)?;
player.await_hardware_ready().await;
let binding = player
.audio_device()
.expect("audio device should be present");
let device = binding.to_mock()?;
// Find a free port by binding to port 0, then release it for the driver.
// NOTE(review): another process could take the port between the drop and
// the driver's bind.
let temp_socket = std::net::UdpSocket::bind("127.0.0.1:0")?;
let port = temp_socket.local_addr()?.port();
drop(temp_socket);
let driver = Arc::new(Driver {
player: player.clone(),
addr: SocketAddr::V4(std::net::SocketAddrV4::new(
std::net::Ipv4Addr::UNSPECIFIED,
port,
)),
broadcast_addresses: vec![],
osc_events: Arc::new(make_default_osc_events()),
});
let handle = super::super::Driver::monitor_events(&*driver);
// Give the spawned driver task time to bind its socket.
tokio::time::sleep(Duration::from_millis(200)).await;
// Sends an argument-less OSC message to the driver from a fresh client socket.
let send_osc = |addr: &str| {
let addr = addr.to_string();
async move {
let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let packet = OscPacket::Message(OscMessage { addr, args: vec![] });
let encoded = rosc::encoder::encode(&packet).unwrap();
client
.send_to(&encoded, format!("127.0.0.1:{}", port))
.await
.unwrap();
}
};
assert_eq!(player.get_playlist().current().unwrap().name(), "Song 1");
send_osc("/mtrack/next").await;
eventually(
|| player.get_playlist().current().unwrap().name() == "Song 3",
"Player never advanced to Song 3 via monitor_events",
);
send_osc("/mtrack/play").await;
eventually(
|| device.is_playing(),
"Song never started playing via monitor_events",
);
send_osc("/mtrack/stop").await;
eventually(
|| !device.is_playing(),
"Song never stopped via monitor_events",
);
// Stop the driver task spawned by `monitor_events`.
handle.abort();
Ok(())
}
/// Runs the driver via `monitor_events` with a multicast broadcast address
/// configured and checks that it still reacts to a unicast "next" message.
#[tokio::test(flavor = "multi_thread")]
async fn test_osc_monitor_events_with_multicast() -> Result<(), Box<dyn Error>> {
let songs = songs::get_all_songs(Path::new("assets/songs"))?;
let pl = Playlist::new(
"playlist",
&config::Playlist::deserialize(Path::new("assets/playlist.yaml"))?,
songs.clone(),
)?;
let mut playlists = HashMap::new();
playlists.insert(
"all_songs".to_string(),
playlist::from_songs(songs.clone())?,
);
playlists.insert("playlist".to_string(), pl);
let player = Player::new(
playlists,
"playlist".to_string(),
&config::Player::new(
vec![],
Some(config::Audio::new("mock-device")),
Some(config::Midi::new("mock-midi-device", None)),
None,
HashMap::new(),
"assets/songs",
),
None,
)?;
player.await_hardware_ready().await;
// Find a free port by binding to port 0, then release it for the driver.
let temp_socket = std::net::UdpSocket::bind("127.0.0.1:0")?;
let port = temp_socket.local_addr()?.port();
drop(temp_socket);
// A multicast destination makes the driver join that group at startup.
// NOTE(review): `port + 1` overflows `u16` if the OS hands out port 65535.
let multicast_addr: SocketAddr = format!("224.0.0.1:{}", port + 1).parse()?;
let driver = Arc::new(Driver {
player: player.clone(),
addr: SocketAddr::V4(std::net::SocketAddrV4::new(
std::net::Ipv4Addr::UNSPECIFIED,
port,
)),
broadcast_addresses: vec![multicast_addr],
osc_events: Arc::new(make_default_osc_events()),
});
let handle = super::super::Driver::monitor_events(&*driver);
// Give the spawned driver task time to bind its socket.
tokio::time::sleep(Duration::from_millis(200)).await;
let client = UdpSocket::bind("127.0.0.1:0").await?;
let packet = OscPacket::Message(OscMessage {
addr: "/mtrack/next".to_string(),
args: vec![],
});
let encoded = rosc::encoder::encode(&packet)?;
client
.send_to(&encoded, format!("127.0.0.1:{}", port))
.await?;
eventually(
|| player.get_playlist().current().unwrap().name() == "Song 3",
"Player never advanced via monitor_events with multicast",
);
// Stop the driver task spawned by `monitor_events`.
handle.abort();
Ok(())
}
/// Unit tests for `build_broadcast_packets` using the default OSC events.
mod build_broadcast_packets_tests {
    use super::{build_broadcast_packets, make_default_osc_events};
    use crate::controller::osc::{STATUS_PLAYING, STATUS_STOPPED};
    use rosc::{OscMessage, OscPacket, OscType};

    /// Builds the single-string message expected at `addr`.
    fn string_message(addr: &str, text: &str) -> OscPacket {
        OscPacket::Message(OscMessage {
            addr: addr.to_string(),
            args: vec![OscType::String(text.to_string())],
        })
    }

    #[test]
    fn returns_four_packets() {
        let events = make_default_osc_events();
        let songs = vec!["Song 1".to_string(), "Song 2".to_string()];
        let packets =
            build_broadcast_packets(&events, "Song 1", STATUS_STOPPED, "0:00/3:30", &songs);
        assert_eq!(packets.len(), 4);
    }

    #[test]
    fn first_packet_is_current_song() {
        let events = make_default_osc_events();
        let packets =
            build_broadcast_packets(&events, "My Song", STATUS_PLAYING, "1:00/3:00", &[]);
        let expected = string_message(&events.playlist_current_song, "My Song");
        assert_eq!(packets[0], expected);
    }

    #[test]
    fn second_packet_is_status() {
        let events = make_default_osc_events();
        let packets =
            build_broadcast_packets(&events, "Song", STATUS_PLAYING, "0:00/0:00", &[]);
        let expected = string_message(&events.status, STATUS_PLAYING);
        assert_eq!(packets[1], expected);
    }

    #[test]
    fn third_packet_is_elapsed() {
        let events = make_default_osc_events();
        let packets =
            build_broadcast_packets(&events, "Song", STATUS_STOPPED, "1:23/4:56", &[]);
        let expected = string_message(&events.playlist_current_song_elapsed, "1:23/4:56");
        assert_eq!(packets[2], expected);
    }

    #[test]
    fn fourth_packet_is_playlist() {
        let events = make_default_osc_events();
        let songs = vec!["A".to_string(), "B".to_string()];
        let packets =
            build_broadcast_packets(&events, "A", STATUS_STOPPED, "0:00/1:00", &songs);
        let expected = string_message(&events.playlist_current, "1. A\n2. B");
        assert_eq!(packets[3], expected);
    }

    #[test]
    fn stopped_status() {
        let events = make_default_osc_events();
        let packets =
            build_broadcast_packets(&events, "Song", STATUS_STOPPED, "0:00/0:00", &[]);
        let OscPacket::Message(msg) = &packets[1] else {
            panic!("expected message");
        };
        assert_eq!(msg.args[0], OscType::String(STATUS_STOPPED.to_string()));
    }
}
}