1use crate::message::{
2 Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
3 OfflineBounceWork,
4};
5#[cfg(unix)]
6use nix::libc;
7use tokio::sync::mpsc::{Receiver, Sender};
8use tracing::error;
9use wavers::write as write_wav;
10
/// A processing worker driven entirely by messages.
///
/// The worker drains `rx` in [`Worker::work`], processes tracks or offline bounce jobs, and
/// reports readiness, results and progress back on `tx`.
#[derive(Debug)]
pub struct Worker {
    /// Identifier included in the `Ready` and `Finished` messages this worker sends.
    id: usize,
    /// Incoming channel; `work` runs until it yields `None` or a quit request arrives.
    rx: Receiver<Message>,
    /// Outgoing channel used for `Ready`, `Finished` and offline-bounce messages.
    tx: Sender<Message>,
}
17
18impl Worker {
19 fn automation_lane_value_at(points: &[OfflineAutomationPoint], sample: usize) -> Option<f32> {
20 if points.is_empty() {
21 return None;
22 }
23 if sample <= points[0].sample {
24 return Some(points[0].value.clamp(0.0, 1.0));
25 }
26 if sample >= points[points.len().saturating_sub(1)].sample {
27 return Some(points[points.len().saturating_sub(1)].value.clamp(0.0, 1.0));
28 }
29 for segment in points.windows(2) {
30 let left = &segment[0];
31 let right = &segment[1];
32 if sample < left.sample || sample > right.sample {
33 continue;
34 }
35 let span = right.sample.saturating_sub(left.sample).max(1) as f32;
36 let t = (sample.saturating_sub(left.sample) as f32 / span).clamp(0.0, 1.0);
37 return Some((left.value + (right.value - left.value) * t).clamp(0.0, 1.0));
38 }
39 None
40 }
41
42 fn apply_offline_automation_at_sample(
43 track: &mut crate::track::Track,
44 sample: usize,
45 lanes: &[OfflineAutomationLane],
46 ) {
47 for lane in lanes {
48 let Some(value) = Self::automation_lane_value_at(&lane.points, sample) else {
49 continue;
50 };
51 match lane.target {
52 OfflineAutomationTarget::Volume => {
53 track.set_level((-90.0 + value * 110.0).clamp(-90.0, 20.0));
54 }
55 OfflineAutomationTarget::Balance => {
56 track.set_balance((value * 2.0 - 1.0).clamp(-1.0, 1.0));
57 }
58 OfflineAutomationTarget::Mute => {
59 track.set_muted(value >= 0.5);
60 }
61 #[cfg(all(unix, not(target_os = "macos")))]
62 OfflineAutomationTarget::Lv2Parameter {
63 instance_id,
64 index,
65 min,
66 max,
67 } => {
68 let lo = min.min(max);
69 let hi = max.max(min);
70 let param_value = (lo + value * (hi - lo)).clamp(lo, hi);
71 let _ = track.set_lv2_control_value(instance_id, index, param_value);
72 }
73 OfflineAutomationTarget::Vst3Parameter {
74 instance_id,
75 param_id,
76 } => {
77 let _ = track.set_vst3_parameter(instance_id, param_id, value.clamp(0.0, 1.0));
78 }
79 OfflineAutomationTarget::ClapParameter {
80 instance_id,
81 param_id,
82 min,
83 max,
84 } => {
85 let lo = min.min(max);
86 let hi = max.max(min);
87 let param_value = (lo + value as f64 * (hi - lo)).clamp(lo, hi);
88 let _ = track.set_clap_parameter_at(instance_id, param_id, param_value, 0);
89 }
90 }
91 }
92 }
93
94 async fn process_offline_bounce(&self, job: OfflineBounceWork) {
95 let track_handle = job.state.lock().tracks.get(&job.track_name).cloned();
96 let Some(target_track) = track_handle else {
97 let _ = self
98 .tx
99 .send(Message::OfflineBounceFinished {
100 result: Err(format!("Track not found: {}", job.track_name)),
101 })
102 .await;
103 return;
104 };
105 let (channels, block_size, sample_rate) = {
106 let t = target_track.lock();
107 let block_size = t
108 .audio
109 .outs
110 .first()
111 .map(|io| io.buffer.lock().len())
112 .or_else(|| t.audio.ins.first().map(|io| io.buffer.lock().len()))
113 .unwrap_or(0)
114 .max(1);
115 (
116 t.audio.outs.len().max(1),
117 block_size,
118 t.sample_rate.round().max(1.0) as i32,
119 )
120 };
121
122 let mut rendered = vec![0.0_f32; job.length_samples.saturating_mul(channels)];
123 let mut cursor = 0usize;
124 while cursor < job.length_samples {
125 if job.cancel.load(std::sync::atomic::Ordering::Relaxed) {
126 let _ = self
127 .tx
128 .send(Message::OfflineBounceFinished {
129 result: Ok(Action::TrackOfflineBounceCanceled {
130 track_name: job.track_name.clone(),
131 }),
132 })
133 .await;
134 let _ = self.tx.send(Message::Ready(self.id)).await;
135 return;
136 }
137
138 let step = (job.length_samples - cursor).min(block_size);
139 let tracks: Vec<_> = job.state.lock().tracks.values().cloned().collect();
140 for handle in &tracks {
141 let t = handle.lock();
142 t.audio.finished = false;
143 t.audio.processing = false;
144 t.set_transport_sample(job.start_sample.saturating_add(cursor));
145 t.set_loop_config(false, None);
146 t.set_transport_timing(job.tempo_bpm, job.tsig_num, job.tsig_denom);
147 t.set_clip_playback_enabled(true);
148 t.set_record_tap_enabled(false);
149 }
150
151 let mut remaining = tracks.len();
152 while remaining > 0 {
153 let mut progressed = false;
154 for handle in &tracks {
155 let t = handle.lock();
156 if t.audio.finished || t.audio.processing {
157 continue;
158 }
159 if t.audio.ready() {
160 if t.name == job.track_name {
161 Self::apply_offline_automation_at_sample(
162 t,
163 job.start_sample.saturating_add(cursor),
164 &job.automation_lanes,
165 );
166 }
167 t.audio.processing = true;
168 t.process();
169 t.audio.processing = false;
170 progressed = true;
171 remaining = remaining.saturating_sub(1);
172 }
173 }
174 if !progressed {
175 for handle in &tracks {
176 let t = handle.lock();
177 if t.audio.finished {
178 continue;
179 }
180 if t.name == job.track_name {
181 Self::apply_offline_automation_at_sample(
182 t,
183 job.start_sample.saturating_add(cursor),
184 &job.automation_lanes,
185 );
186 }
187 t.audio.processing = true;
188 t.process();
189 t.audio.processing = false;
190 remaining = remaining.saturating_sub(1);
191 }
192 }
193 }
194
195 {
196 let t = target_track.lock();
197 for ch in 0..channels {
198 let out = t.audio.outs[ch].buffer.lock();
199 let copy_len = step.min(out.len());
200 for i in 0..copy_len {
201 let dst = (cursor + i) * channels + ch;
202 rendered[dst] = out[i];
203 }
204 }
205 }
206
207 cursor = cursor.saturating_add(step);
208 let _ = self
209 .tx
210 .send(Message::OfflineBounceFinished {
211 result: Ok(Action::TrackOfflineBounceProgress {
212 track_name: job.track_name.clone(),
213 progress: (cursor as f32 / job.length_samples as f32).clamp(0.0, 1.0),
214 operation: Some("Rendering freeze".to_string()),
215 }),
216 })
217 .await;
218 }
219
220 if let Err(e) =
221 write_wav::<f32, _>(&job.output_path, &rendered, sample_rate, channels as u16)
222 {
223 let _ = self
224 .tx
225 .send(Message::OfflineBounceFinished {
226 result: Err(format!(
227 "Failed to write offline bounce '{}': {e}",
228 job.output_path
229 )),
230 })
231 .await;
232 let _ = self.tx.send(Message::Ready(self.id)).await;
233 return;
234 }
235
236 let _ = self
237 .tx
238 .send(Message::OfflineBounceFinished {
239 result: Ok(Action::TrackOfflineBounce {
240 track_name: job.track_name,
241 output_path: job.output_path,
242 start_sample: job.start_sample,
243 length_samples: job.length_samples,
244 automation_lanes: vec![],
245 }),
246 })
247 .await;
248 let _ = self.tx.send(Message::Ready(self.id)).await;
249 }
250
251 #[cfg(unix)]
252 fn try_enable_realtime() -> Result<(), String> {
253 let thread = unsafe { libc::pthread_self() };
254 let policy = libc::SCHED_FIFO;
255 let param = unsafe {
256 let mut p = std::mem::zeroed::<libc::sched_param>();
257 p.sched_priority = 10;
258 p
259 };
260 let rc = unsafe { libc::pthread_setschedparam(thread, policy, ¶m) };
261 if rc == 0 {
262 Ok(())
263 } else {
264 Err(format!("pthread_setschedparam failed with errno {}", rc))
265 }
266 }
267
268 #[cfg(not(unix))]
269 fn try_enable_realtime() -> Result<(), String> {
270 Err("Realtime thread priority is not supported on this platform".to_string())
271 }
272
273 pub async fn new(id: usize, rx: Receiver<Message>, tx: Sender<Message>) -> Worker {
274 let worker = Worker { id, rx, tx };
275 worker.send(Message::Ready(id)).await;
276 worker
277 }
278
279 pub async fn send(&self, message: Message) {
280 self.tx
281 .send(message)
282 .await
283 .expect("Failed to send message from worker");
284 }
285
286 pub async fn work(&mut self) {
287 if let Err(e) = Self::try_enable_realtime() {
288 error!("Worker {} realtime priority not enabled: {}", self.id, e);
289 }
290 while let Some(message) = self.rx.recv().await {
291 match message {
292 Message::Request(Action::Quit) => {
293 return;
294 }
295 Message::ProcessTrack(t) => {
296 let (track_name, output_linear, process_epoch) = {
297 let track = t.lock();
298 let process_epoch = track.process_epoch;
299 track.process();
300 track.audio.processing = false;
301 (
302 track.name.clone(),
303 track.output_meter_linear(),
304 process_epoch,
305 )
306 };
307 match self
308 .tx
309 .send(Message::Finished {
310 worker_id: self.id,
311 track_name,
312 output_linear,
313 process_epoch,
314 })
315 .await
316 {
317 Ok(_) => {}
318 Err(e) => {
319 error!("Error while sending Finished: {}", e);
320 }
321 }
322 }
323 Message::ProcessOfflineBounce(job) => {
324 self.process_offline_bounce(job).await;
325 }
326 _ => {}
327 }
328 }
329 }
330}