#[cfg(test)]
use rill_core::queues::MpscQueue;
use rill_core::queues::{SetParameter, SignalOrigin};
use rill_core::traits::{NodeId, ParamValue, ParameterId, PortId};
use rill_core_actor::ActorRef;
#[cfg(test)]
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use crate::strategy::{ConflictStrategy, ControlStrategy, UiCommand};
/// Handle to a combiner task started by `spawn_combiner`.
///
/// Holds the sending halves of the channels the task reads from, plus the
/// cancellation flag used by `stop`.
pub struct PortCombinerHandle {
/// Sender for automaton values (`f64`) consumed by the combiner task.
/// The channel is bounded; `spawn_combiner` creates it with capacity 16.
pub automaton_tx: mpsc::Sender<f64>,
/// Unbounded sender for UI commands consumed by the combiner task.
pub ui_tx: mpsc::UnboundedSender<UiCommand>,
/// Cancellation flag; created as `false`, set to `true` by `stop`.
cancel_tx: watch::Sender<bool>,
/// Join handle of the spawned task. It is stored but never awaited in this
/// file.
_handle: tokio::task::JoinHandle<()>,
}
impl PortCombinerHandle {
    /// Requests shutdown of the combiner task by publishing `true` on the
    /// cancellation channel.
    ///
    /// The result of the send is deliberately discarded, so calling this when
    /// nobody is listening any more is a silent no-op.
    pub fn stop(&self) {
        self.cancel_tx.send(true).ok();
    }

    /// Returns a new receiver subscribed to the cancellation flag.
    pub fn cancel_rx(&self) -> watch::Receiver<bool> {
        watch::Sender::subscribe(&self.cancel_tx)
    }
}
/// Spawns a combiner task on the current tokio runtime and returns its handle.
///
/// Three channels are created and their receiving halves are moved into
/// `combiner_loop` together with the remaining arguments:
///
/// * a bounded `f64` channel for automaton values,
/// * an unbounded channel for [`UiCommand`]s,
/// * a `watch` channel (initially `false`) used as the cancellation flag.
///
/// # Arguments
///
/// * `target` - node id and parameter name the emitted messages address.
/// * `range` - `(min, max)` of the parameter.
/// * `control` - how automaton values are mapped onto the range.
/// * `conflict` - how UI commands interact with automaton values.
/// * `output_queue` - destination for the produced `SetParameter` messages.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a tokio runtime.
pub fn spawn_combiner(
    target: (NodeId, String),
    range: (f64, f64),
    control: ControlStrategy,
    conflict: ConflictStrategy,
    output_queue: ActorRef<SetParameter>,
) -> PortCombinerHandle {
    /// Capacity of the bounded automaton channel.
    const AUTOMATON_QUEUE_DEPTH: usize = 16;

    let (cancel_tx, cancel_rx) = watch::channel(false);
    let (ui_tx, ui_rx) = mpsc::unbounded_channel::<UiCommand>();
    let (automaton_tx, automaton_rx) = mpsc::channel::<f64>(AUTOMATON_QUEUE_DEPTH);

    let task = combiner_loop(
        automaton_rx,
        ui_rx,
        cancel_rx,
        target,
        range,
        control,
        conflict,
        output_queue,
    );

    PortCombinerHandle {
        automaton_tx,
        ui_tx,
        cancel_tx,
        _handle: tokio::spawn(task),
    }
}
async fn combiner_loop(
mut automaton_rx: mpsc::Receiver<f64>,
mut ui_rx: mpsc::UnboundedReceiver<UiCommand>,
mut cancel_rx: watch::Receiver<bool>,
target: (NodeId, String),
range: (f64, f64),
control: ControlStrategy,
conflict: ConflictStrategy,
output_queue: ActorRef<SetParameter>,
) {
let (node_id, param_name) = target;
let (min, max) = range;
let mut base = center(min, max);
let mut frozen = false;
let mut latest_mod = 0.0;
loop {
tokio::select! {
_ = cancel_rx.changed() => {
if *cancel_rx.borrow() {
break;
}
}
Some(mod_val) = automaton_rx.recv() => {
latest_mod = mod_val;
if frozen {
continue;
}
let value = combine(mod_val, base, control, min, max);
let pid = ParameterId::new(¶m_name).unwrap();
output_queue.send(SetParameter::new(
PortId::param(node_id, 0), pid, ParamValue::Float(value as f32), SignalOrigin::Manual,
));
}
Some(cmd) = ui_rx.recv() => {
match (cmd, conflict) {
(UiCommand::SetValue(v), ConflictStrategy::TouchOverride) => {
base = v;
frozen = true;
let pid = ParameterId::new(¶m_name).unwrap();
output_queue.send(SetParameter::new(
PortId::param(node_id, 0), pid, ParamValue::Float(v as f32), SignalOrigin::Manual,
));
}
(UiCommand::SetValue(v), ConflictStrategy::BasePlusModulation) => {
base = v;
let value = combine(latest_mod, v, control, min, max);
let pid = ParameterId::new(¶m_name).unwrap();
output_queue.send(SetParameter::new(
PortId::param(node_id, 0), pid, ParamValue::Float(value as f32), SignalOrigin::Manual,
));
}
(UiCommand::SetValue(v), ConflictStrategy::LastWriteWins) => {
let pid = ParameterId::new(¶m_name).unwrap();
output_queue.send(SetParameter::new(
PortId::param(node_id, 0), pid, ParamValue::Float(v as f32), SignalOrigin::Manual,
));
}
(UiCommand::Release, ConflictStrategy::TouchOverride) => {
frozen = false;
let value = combine(latest_mod, base, control, min, max);
let pid = ParameterId::new(¶m_name).unwrap();
output_queue.send(SetParameter::new(
PortId::param(node_id, 0), pid, ParamValue::Float(value as f32), SignalOrigin::Manual,
));
}
(UiCommand::Release, _) => {
}
}
}
else => break,
}
}
}
/// Maps an automaton value onto the parameter range according to `control`.
///
/// * `ControlStrategy::Absolute` - `mod_val` is a position along the range:
///   `min + mod_val * (max - min)`. `base` is ignored.
/// * `ControlStrategy::Modulation { depth }` - `mod_val` offsets `base` by
///   `mod_val * depth * (max - min)`.
///
/// In both modes the result is limited to the interval spanned by `min` and
/// `max`. The bounds are ordered before clamping, so an inverted range
/// (`min > max`) does not panic. A NaN input propagates to the result.
fn combine(mod_val: f64, base: f64, control: ControlStrategy, min: f64, max: f64) -> f64 {
    let span = max - min;
    let raw = match control {
        ControlStrategy::Absolute => min + mod_val * span,
        ControlStrategy::Modulation { depth } => base + mod_val * depth * span,
    };

    let lo = min.min(max);
    let hi = min.max(max);
    // `f64::clamp` panics unless `lo <= hi`; that can only fail here when both
    // bounds are NaN, in which case `raw` is NaN already.
    if lo <= hi {
        raw.clamp(lo, hi)
    } else {
        raw
    }
}
/// Returns the midpoint of `min` and `max` (their arithmetic mean).
fn center(min: f64, max: f64) -> f64 {
    let sum = min + max;
    // Scaling by 0.5 is exact in binary floating point, same as dividing by 2.
    sum * 0.5
}
// Unit tests for `combine` and end-to-end tests for the spawned combiner task.
//
// The async tests create an `MpscQueue` mailbox, hand an `ActorRef` built from
// it to `spawn_combiner`, and inspect what arrives by popping from the
// mailbox. The 10 ms sleeps after each send presumably give the spawned task
// time to process the message before the mailbox is inspected.
#[cfg(test)]
mod tests {
use super::*;
use crate::strategy::ControlStrategy;
// Absolute mode: the automaton value is a position along [min, max]; the
// `base` argument (0.0 here) does not influence the result.
#[test]
fn test_combine_absolute() {
let result = combine(0.5, 0.0, ControlStrategy::Absolute, 0.0, 1.0);
assert!((result - 0.5).abs() < 1e-9);
// 0.0 maps to the lower bound ...
let result = combine(0.0, 0.0, ControlStrategy::Absolute, 100.0, 1000.0);
assert!((result - 100.0).abs() < 1e-9);
// ... and 1.0 to the upper bound.
let result = combine(1.0, 0.0, ControlStrategy::Absolute, 100.0, 1000.0);
assert!((result - 1000.0).abs() < 1e-9);
}
// Modulation mode: result is base + mod * depth * (max - min), clamped to the range.
#[test]
fn test_combine_modulation() {
let strategy = ControlStrategy::Modulation { depth: 1.0 };
// No modulation leaves the base untouched.
let result = combine(0.0, 500.0, strategy, 0.0, 1000.0);
assert!((result - 500.0).abs() < 1e-9);
// 500 + 1000 = 1500 is clamped to the upper bound.
let result = combine(1.0, 500.0, strategy, 0.0, 1000.0);
assert!((result - 1000.0).abs() < 1e-9);
// 500 - 1000 = -500 is clamped to the lower bound.
let result = combine(-1.0, 500.0, strategy, 0.0, 1000.0);
assert!((result - 0.0).abs() < 1e-9);
// Depth 0.0 disables modulation entirely.
let shallow = ControlStrategy::Modulation { depth: 0.0 };
let result = combine(1.0, 300.0, shallow, 0.0, 1000.0);
assert!((result - 300.0).abs() < 1e-9);
}
// TouchOverride: a UI `SetValue` takes over and later automaton values are
// not forwarded.
#[tokio::test]
async fn test_combiner_absolute_touch_override() {
let mailbox = Arc::new(MpscQueue::with_capacity(64));
let actor_ref = ActorRef::new(&mailbox);
let handle = spawn_combiner(
(NodeId(1), "cutoff".into()),
(100.0, 1000.0),
ControlStrategy::Absolute,
ConflictStrategy::TouchOverride,
actor_ref,
);
// Automaton value 0.5 in absolute mode: 100 + 0.5 * 900 = 550.
handle.automaton_tx.send(0.5).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert!(!mailbox.is_empty());
let cmd = mailbox.pop().unwrap();
assert!((cmd.value.as_f32().unwrap() - 550.0).abs() < 1.0);
// The UI value is forwarded as-is and freezes automaton output.
handle.ui_tx.send(UiCommand::SetValue(800.0)).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let cmd = mailbox.pop().unwrap();
assert!((cmd.value.as_f32().unwrap() - 800.0).abs() < 1.0);
// While frozen, an automaton value must not produce a message.
handle.automaton_tx.send(0.1).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert!(mailbox.is_empty());
}
// BasePlusModulation: the UI sets the base and the automaton modulates around it.
#[tokio::test]
async fn test_combiner_modulation_base_plus() {
let mailbox = Arc::new(MpscQueue::with_capacity(64));
let actor_ref = ActorRef::new(&mailbox);
let handle = spawn_combiner(
(NodeId(1), "cutoff".into()),
(100.0, 1000.0),
ControlStrategy::Modulation { depth: 0.5 },
ConflictStrategy::BasePlusModulation,
actor_ref,
);
// No automaton value yet (latest modulation is 0.0), so the base itself is emitted.
handle.ui_tx.send(UiCommand::SetValue(500.0)).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let cmd = mailbox.pop().unwrap();
assert!((cmd.value.as_f32().unwrap() - 500.0).abs() < 1.0);
// 500 + 0.5 * 0.5 * 900 = 725.
handle.automaton_tx.send(0.5).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let cmd = mailbox.pop().unwrap();
assert!((cmd.value.as_f32().unwrap() - 725.0).abs() < 1.0);
}
// LastWriteWins: every input is forwarded; the most recent one determines the value.
#[tokio::test]
async fn test_combiner_last_write_wins() {
let mailbox = Arc::new(MpscQueue::with_capacity(64));
let actor_ref = ActorRef::new(&mailbox);
let handle = spawn_combiner(
(NodeId(1), "gain".into()),
(0.0, 1.0),
ControlStrategy::Absolute,
ConflictStrategy::LastWriteWins,
actor_ref,
);
// The UI value is passed through unchanged.
handle.ui_tx.send(UiCommand::SetValue(0.8)).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let cmd1 = mailbox.pop().unwrap();
assert!((cmd1.value.as_f32().unwrap() - 0.8).abs() < 1e-6);
// A later automaton value is forwarded too: 0.0 + 0.3 * 1.0 = 0.3.
handle.automaton_tx.send(0.3).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let cmd2 = mailbox.pop().unwrap();
assert!((cmd2.value.as_f32().unwrap() - 0.3).abs() < 1e-6);
}
// TouchOverride: `Release` lifts the freeze so automaton values flow again.
#[tokio::test]
async fn test_combiner_release_unfreezes() {
let mailbox = Arc::new(MpscQueue::with_capacity(64));
let actor_ref = ActorRef::new(&mailbox);
let handle = spawn_combiner(
(NodeId(1), "cutoff".into()),
(100.0, 1000.0),
ControlStrategy::Absolute,
ConflictStrategy::TouchOverride,
actor_ref,
);
// Freeze via a UI value; the resulting message is discarded unchecked.
handle.ui_tx.send(UiCommand::SetValue(800.0)).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
mailbox.pop();
// Release emits a message as well; it is discarded unchecked too.
handle.ui_tx.send(UiCommand::Release).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
mailbox.pop();
// After the release: 100 + 0.2 * 900 = 280.
handle.automaton_tx.send(0.2).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let cmd = mailbox.pop().unwrap();
assert!((cmd.value.as_f32().unwrap() - 280.0).abs() < 1.0);
}
}