1use crate::message::{
2 Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
3 OfflineBounceWork,
4};
5#[cfg(unix)]
6use nix::libc;
7use std::time::Instant;
8use tokio::sync::mpsc::{Receiver, Sender};
9use tracing::error;
10
11#[derive(Debug)]
12pub struct Worker {
13 id: usize,
14 rx: Receiver<Message>,
15 tx: Sender<Message>,
16}
17
18impl Worker {
19 fn automation_lane_value_at(points: &[OfflineAutomationPoint], sample: usize) -> Option<f32> {
20 if points.is_empty() {
21 return None;
22 }
23 if sample <= points[0].sample {
24 return Some(points[0].value.clamp(0.0, 1.0));
25 }
26 if sample >= points[points.len().saturating_sub(1)].sample {
27 return Some(points[points.len().saturating_sub(1)].value.clamp(0.0, 1.0));
28 }
29 for segment in points.windows(2) {
30 let left = &segment[0];
31 let right = &segment[1];
32 if sample < left.sample || sample > right.sample {
33 continue;
34 }
35 let span = right.sample.saturating_sub(left.sample).max(1) as f32;
36 let t = (sample.saturating_sub(left.sample) as f32 / span).clamp(0.0, 1.0);
37 return Some((left.value + (right.value - left.value) * t).clamp(0.0, 1.0));
38 }
39 None
40 }
41
42 fn apply_freeze_automation_at_sample(
43 track: &mut crate::track::Track,
44 sample: usize,
45 lanes: &[OfflineAutomationLane],
46 ) {
47 for lane in lanes {
48 if matches!(
49 lane.target,
50 OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance
51 ) {
52 continue;
53 }
54 let Some(value) = Self::automation_lane_value_at(&lane.points, sample) else {
55 continue;
56 };
57 match lane.target {
58 OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance => {}
59 OfflineAutomationTarget::Mute => {
60 track.set_muted(value >= 0.5);
61 }
62 #[cfg(all(unix, not(target_os = "macos")))]
63 OfflineAutomationTarget::Lv2Parameter {
64 instance_id,
65 index,
66 min,
67 max,
68 } => {
69 let lo = min.min(max);
70 let hi = max.max(min);
71 let param_value = (lo + value * (hi - lo)).clamp(lo, hi);
72 let _ = track.set_lv2_control_value(instance_id, index, param_value);
73 }
74 OfflineAutomationTarget::Vst3Parameter {
75 instance_id,
76 param_id,
77 } => {
78 let _ = track.set_vst3_parameter(instance_id, param_id, value.clamp(0.0, 1.0));
79 }
80 OfflineAutomationTarget::ClapParameter {
81 instance_id,
82 param_id,
83 min,
84 max,
85 } => {
86 let lo = min.min(max);
87 let hi = max.max(min);
88 let param_value = (lo + value as f64 * (hi - lo)).clamp(lo, hi);
89 let _ = track.set_clap_parameter_at(instance_id, param_id, param_value, 0);
90 }
91 }
92 }
93 }
94
95 fn prepare_track_for_freeze_render(track: &mut crate::track::Track) -> (f32, f32) {
96 let original_level = track.level();
97 let original_balance = track.balance;
98 track.set_level(0.0);
99 track.set_balance(0.0);
100 (original_level, original_balance)
101 }
102
103 fn restore_track_after_freeze_render(
104 track: &mut crate::track::Track,
105 original_level: f32,
106 original_balance: f32,
107 ) {
108 track.set_level(original_level);
109 track.set_balance(original_balance);
110 }
111
112 async fn process_offline_bounce(&self, job: OfflineBounceWork) {
113 let track_handle = job.state.lock().tracks.get(&job.track_name).cloned();
114 let Some(target_track) = track_handle else {
115 let _ = self
116 .tx
117 .send(Message::OfflineBounceFinished {
118 result: Err(format!("Track not found: {}", job.track_name)),
119 })
120 .await;
121 return;
122 };
123 let (channels, block_size, sample_rate) = {
124 let t = target_track.lock();
125 let block_size = t
126 .audio
127 .outs
128 .first()
129 .map(|io| io.buffer.lock().len())
130 .or_else(|| t.audio.ins.first().map(|io| io.buffer.lock().len()))
131 .unwrap_or(0)
132 .max(1);
133 (
134 t.audio.outs.len().max(1),
135 block_size,
136 t.sample_rate.round().max(1.0) as i32,
137 )
138 };
139 let freeze_state = if job.apply_fader {
140 None
141 } else {
142 let t = target_track.lock();
143 Some(Self::prepare_track_for_freeze_render(t))
144 };
145
146 let spec = hound::WavSpec {
147 channels: channels as u16,
148 sample_rate: sample_rate as u32,
149 bits_per_sample: 32,
150 sample_format: hound::SampleFormat::Float,
151 };
152 let mut writer = match hound::WavWriter::create(&job.output_path, spec) {
153 Ok(w) => w,
154 Err(e) => {
155 if let Some((original_level, original_balance)) = freeze_state {
156 let t = target_track.lock();
157 Self::restore_track_after_freeze_render(t, original_level, original_balance);
158 }
159 let _ = self
160 .tx
161 .send(Message::OfflineBounceFinished {
162 result: Err(format!(
163 "Failed to create offline bounce '{}': {e}",
164 job.output_path
165 )),
166 })
167 .await;
168 let _ = self.tx.send(Message::Ready(self.id)).await;
169 return;
170 }
171 };
172
173 let mut cursor = 0usize;
174 while cursor < job.length_samples {
175 if job.cancel.load(std::sync::atomic::Ordering::Relaxed) {
176 let _ = writer.finalize();
177 let _ = std::fs::remove_file(&job.output_path);
178 if let Some((original_level, original_balance)) = freeze_state {
179 let t = target_track.lock();
180 Self::restore_track_after_freeze_render(t, original_level, original_balance);
181 }
182 let _ = self
183 .tx
184 .send(Message::OfflineBounceFinished {
185 result: Ok(Action::TrackOfflineBounceCanceled {
186 track_name: job.track_name.clone(),
187 }),
188 })
189 .await;
190 let _ = self.tx.send(Message::Ready(self.id)).await;
191 return;
192 }
193
194 let step = (job.length_samples - cursor).min(block_size);
195 let tracks: Vec<_> = job.state.lock().tracks.values().cloned().collect();
196 for handle in &tracks {
197 let t = handle.lock();
198 t.audio.finished = false;
199 t.audio.processing = false;
200 t.set_transport_sample(job.start_sample.saturating_add(cursor));
201 t.set_loop_config(false, None);
202 t.set_transport_timing(job.tempo_bpm, job.tsig_num, job.tsig_denom);
203 t.set_clip_playback_enabled(true);
204 t.set_record_tap_enabled(false);
205 }
206
207 loop {
208 let mut all_finished = true;
209 let mut progressed = false;
210 for handle in &tracks {
211 let t = handle.lock();
212 if t.audio.finished {
213 continue;
214 }
215 all_finished = false;
216 if !t.audio.processing && t.audio.ready() {
217 if t.name == job.track_name {
218 Self::apply_freeze_automation_at_sample(
219 t,
220 job.start_sample.saturating_add(cursor),
221 &job.automation_lanes,
222 );
223 }
224 t.audio.processing = true;
225 t.process();
226 t.audio.processing = false;
227 progressed = true;
228 }
229 }
230 if all_finished {
231 break;
232 }
233 if !progressed {
234 for handle in &tracks {
236 let t = handle.lock();
237 if t.audio.finished {
238 continue;
239 }
240 if t.name == job.track_name {
241 Self::apply_freeze_automation_at_sample(
242 t,
243 job.start_sample.saturating_add(cursor),
244 &job.automation_lanes,
245 );
246 }
247 t.audio.processing = true;
248 t.process();
249 t.audio.processing = false;
250 }
251 break;
252 }
253 }
254
255 {
256 let t = target_track.lock();
257 for i in 0..step {
258 for ch in 0..channels {
259 let out = t.audio.outs[ch].buffer.lock();
260 let sample = if i < out.len() { out[i] } else { 0.0 };
261 if let Err(e) = writer.write_sample(sample) {
262 let _ = writer.finalize();
263 let _ = std::fs::remove_file(&job.output_path);
264 if let Some((original_level, original_balance)) = freeze_state {
265 let t = target_track.lock();
266 Self::restore_track_after_freeze_render(
267 t,
268 original_level,
269 original_balance,
270 );
271 }
272 let _ = self
273 .tx
274 .send(Message::OfflineBounceFinished {
275 result: Err(format!(
276 "Failed to write offline bounce '{}': {e}",
277 job.output_path
278 )),
279 })
280 .await;
281 let _ = self.tx.send(Message::Ready(self.id)).await;
282 return;
283 }
284 }
285 }
286 }
287
288 cursor = cursor.saturating_add(step);
289 let _ = self
290 .tx
291 .send(Message::OfflineBounceFinished {
292 result: Ok(Action::TrackOfflineBounceProgress {
293 track_name: job.track_name.clone(),
294 progress: (cursor as f32 / job.length_samples as f32).clamp(0.0, 1.0),
295 operation: Some("Rendering freeze".to_string()),
296 }),
297 })
298 .await;
299 }
300
301 if let Err(e) = writer.finalize() {
302 let _ = std::fs::remove_file(&job.output_path);
303 if let Some((original_level, original_balance)) = freeze_state {
304 let t = target_track.lock();
305 Self::restore_track_after_freeze_render(t, original_level, original_balance);
306 }
307 let _ = self
308 .tx
309 .send(Message::OfflineBounceFinished {
310 result: Err(format!(
311 "Failed to finalize offline bounce '{}': {e}",
312 job.output_path
313 )),
314 })
315 .await;
316 let _ = self.tx.send(Message::Ready(self.id)).await;
317 return;
318 }
319
320 if let Some((original_level, original_balance)) = freeze_state {
321 let t = target_track.lock();
322 Self::restore_track_after_freeze_render(t, original_level, original_balance);
323 }
324
325 let _ = self
326 .tx
327 .send(Message::OfflineBounceFinished {
328 result: Ok(Action::TrackOfflineBounce {
329 track_name: job.track_name,
330 output_path: job.output_path,
331 start_sample: job.start_sample,
332 length_samples: job.length_samples,
333 automation_lanes: vec![],
334 apply_fader: job.apply_fader,
335 }),
336 })
337 .await;
338 let _ = self.tx.send(Message::Ready(self.id)).await;
339 }
340
341 #[cfg(unix)]
342 fn try_enable_realtime() -> Result<(), String> {
343 let thread = unsafe { libc::pthread_self() };
344 let policy = libc::SCHED_FIFO;
345 let param = unsafe {
346 let mut p = std::mem::zeroed::<libc::sched_param>();
347 p.sched_priority = 10;
348 p
349 };
350 let rc = unsafe { libc::pthread_setschedparam(thread, policy, ¶m) };
351 if rc == 0 {
352 Ok(())
353 } else {
354 Err(format!("pthread_setschedparam failed with errno {}", rc))
355 }
356 }
357
/// Fallback for non-Unix targets: always reports that realtime priority is unavailable.
#[cfg(not(unix))]
fn try_enable_realtime() -> Result<(), String> {
    Err(String::from(
        "Realtime thread priority is not supported on this platform",
    ))
}
362
363 pub async fn new(id: usize, rx: Receiver<Message>, tx: Sender<Message>) -> Worker {
364 let worker = Worker { id, rx, tx };
365 worker.send(Message::Ready(id)).await;
366 worker
367 }
368
369 pub async fn send(&self, message: Message) {
370 self.tx
371 .send(message)
372 .await
373 .expect("Failed to send message from worker");
374 }
375
376 pub async fn work(&mut self) {
377 if let Err(e) = Self::try_enable_realtime() {
378 error!("Worker {} realtime priority not enabled: {}", self.id, e);
379 }
380 while let Some(message) = self.rx.recv().await {
381 match message {
382 Message::Request(Action::Quit) => {
383 return;
384 }
385 Message::ProcessTrack(t) => {
386 let (track_name, output_linear, process_epoch) = {
387 let track = t.lock();
388 let process_epoch = track.process_epoch;
389 let started = Instant::now();
390 track.process();
391 let elapsed = started.elapsed();
392 if elapsed.as_millis() > 20 {
393 tracing::warn!(
394 "Slow track process '{}' took {:.3} ms",
395 track.name,
396 elapsed.as_secs_f64() * 1000.0
397 );
398 }
399 track.audio.processing = false;
400 (
401 track.name.clone(),
402 track.output_meter_linear(),
403 process_epoch,
404 )
405 };
406 match self
407 .tx
408 .send(Message::Finished {
409 worker_id: self.id,
410 track_name,
411 output_linear,
412 process_epoch,
413 })
414 .await
415 {
416 Ok(_) => {}
417 Err(e) => {
418 error!("Error while sending Finished: {}", e);
419 }
420 }
421 }
422 Message::ProcessOfflineBounce(job) => {
423 self.process_offline_bounce(job).await;
424 }
425 _ => {}
426 }
427 }
428 }
429}
430
431#[cfg(test)]
432mod tests {
433 use super::Worker;
434 use crate::message::{
435 Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
436 OfflineBounceWork,
437 };
438 use crate::mutex::UnsafeMutex;
439 use crate::state::State;
440 use crate::track::Track;
441 use std::path::PathBuf;
442 use std::sync::{Arc, atomic::AtomicBool};
443 use std::time::{SystemTime, UNIX_EPOCH};
444 use tokio::sync::mpsc::channel;
445
446 fn make_state_with_track(track: Track) -> Arc<UnsafeMutex<State>> {
447 let mut state = State::default();
448 state.tracks.insert(
449 track.name.clone(),
450 Arc::new(UnsafeMutex::new(Box::new(track))),
451 );
452 Arc::new(UnsafeMutex::new(state))
453 }
454
// Returns `<temp dir>/maolan_<name>_<nanoseconds since the Unix epoch>.wav`.
fn unique_temp_wav(name: &str) -> PathBuf {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock");
    let file_name = format!("maolan_{name}_{}.wav", since_epoch.as_nanos());
    std::env::temp_dir().join(file_name)
}
462
// Preparing a track zeroes level and balance; restoring brings both back.
#[test]
fn prepare_track_for_freeze_render_neutralizes_level_and_balance() {
    let mut track = Track::new(String::from("track"), 1, 2, 0, 0, 64, 48_000.0);
    track.set_level(-6.0);
    track.set_balance(0.35);

    let saved = Worker::prepare_track_for_freeze_render(&mut track);

    assert_eq!(saved.0, -6.0);
    assert_eq!(saved.1, 0.35);
    assert_eq!(track.level(), 0.0);
    assert_eq!(track.balance, 0.0);

    let (level, balance) = saved;
    Worker::restore_track_after_freeze_render(&mut track, level, balance);
    assert_eq!(track.level(), -6.0);
    assert_eq!(track.balance, 0.35);
}
480
// Volume and balance lanes must leave the fader alone while the mute lane still applies.
#[test]
fn freeze_automation_ignores_volume_and_balance_lanes() {
    // Every lane in this test has a single point at sample 0.
    let single_point_lane = |target: OfflineAutomationTarget, value: f32| OfflineAutomationLane {
        target,
        points: vec![OfflineAutomationPoint { sample: 0, value }],
    };
    let lanes = vec![
        single_point_lane(OfflineAutomationTarget::Volume, 0.0),
        single_point_lane(OfflineAutomationTarget::Balance, 1.0),
        single_point_lane(OfflineAutomationTarget::Mute, 1.0),
    ];
    let mut track = Track::new(String::from("track"), 1, 2, 0, 0, 64, 48_000.0);

    Worker::apply_freeze_automation_at_sample(&mut track, 0, &lanes);

    assert_eq!(track.level(), 0.0);
    assert_eq!(track.balance, 0.0);
    assert!(track.muted);
}
514
// Halfway between (10, 0.25) and (20, 0.75) the lane value must be 0.5.
#[test]
fn automation_lane_value_at_interpolates_between_points() {
    let points = [
        OfflineAutomationPoint {
            sample: 10,
            value: 0.25,
        },
        OfflineAutomationPoint {
            sample: 20,
            value: 0.75,
        },
    ];

    let midpoint = Worker::automation_lane_value_at(&points, 15).expect("value");

    assert!((midpoint - 0.5).abs() < 1.0e-6);
}
534
// A mute lane ramping 0 -> 1 over ten samples mutes at the midpoint but not earlier.
#[test]
fn freeze_automation_applies_interpolated_mute_lane() {
    let ramp = vec![
        OfflineAutomationPoint {
            sample: 0,
            value: 0.0,
        },
        OfflineAutomationPoint {
            sample: 10,
            value: 1.0,
        },
    ];
    let lanes = vec![OfflineAutomationLane {
        target: OfflineAutomationTarget::Mute,
        points: ramp,
    }];
    let mut track = Track::new(String::from("track"), 1, 1, 0, 0, 64, 48_000.0);

    // Sample 5 interpolates to 0.5, which is at the mute threshold.
    Worker::apply_freeze_automation_at_sample(&mut track, 5, &lanes);
    assert!(track.muted);

    // Sample 2 interpolates to 0.2, below the threshold.
    track.set_muted(false);
    Worker::apply_freeze_automation_at_sample(&mut track, 2, &lanes);
    assert!(!track.muted);
}
559
560 #[tokio::test]
561 async fn process_offline_bounce_errors_when_track_is_missing() {
562 let (_rx_unused_tx, rx_unused) = channel(1);
563 let (tx, mut out_rx) = channel(8);
564 let worker = Worker {
565 id: 7,
566 rx: rx_unused,
567 tx,
568 };
569 let job = OfflineBounceWork {
570 state: Arc::new(UnsafeMutex::new(State::default())),
571 track_name: "missing".to_string(),
572 output_path: unique_temp_wav("missing").to_string_lossy().to_string(),
573 start_sample: 0,
574 length_samples: 8,
575 tempo_bpm: 120.0,
576 tsig_num: 4,
577 tsig_denom: 4,
578 automation_lanes: vec![],
579 cancel: Arc::new(AtomicBool::new(false)),
580 apply_fader: false,
581 };
582
583 worker.process_offline_bounce(job).await;
584
585 match out_rx.recv().await.expect("message") {
586 Message::OfflineBounceFinished { result: Err(err) } => {
587 assert!(err.contains("Track not found: missing"));
588 }
589 other => panic!("unexpected message: {other:?}"),
590 }
591 }
592
593 #[tokio::test]
594 async fn process_offline_bounce_cancels_and_restores_track_state() {
595 let (_rx_unused_tx, rx_unused) = channel(1);
596 let (tx, mut out_rx) = channel(8);
597 let worker = Worker {
598 id: 5,
599 rx: rx_unused,
600 tx,
601 };
602 let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
603 track.set_level(-9.0);
604 track.set_balance(-0.3);
605 let state = make_state_with_track(track);
606 let job = OfflineBounceWork {
607 state: state.clone(),
608 track_name: "track".to_string(),
609 output_path: unique_temp_wav("cancel").to_string_lossy().to_string(),
610 start_sample: 0,
611 length_samples: 8,
612 tempo_bpm: 120.0,
613 tsig_num: 4,
614 tsig_denom: 4,
615 automation_lanes: vec![],
616 cancel: Arc::new(AtomicBool::new(true)),
617 apply_fader: false,
618 };
619
620 worker.process_offline_bounce(job).await;
621
622 match out_rx.recv().await.expect("message") {
623 Message::OfflineBounceFinished {
624 result: Ok(Action::TrackOfflineBounceCanceled { track_name }),
625 } => assert_eq!(track_name, "track"),
626 other => panic!("unexpected message: {other:?}"),
627 }
628 assert!(matches!(out_rx.recv().await, Some(Message::Ready(5))));
629 let track = state.lock().tracks.get("track").expect("track").lock();
630 assert_eq!(track.level(), -9.0);
631 assert_eq!(track.balance, -0.3);
632 }
633
634 #[tokio::test]
635 async fn process_offline_bounce_restores_track_state_on_write_failure() {
636 let (_rx_unused_tx, rx_unused) = channel(1);
637 let (tx, mut out_rx) = channel(8);
638 let worker = Worker {
639 id: 3,
640 rx: rx_unused,
641 tx,
642 };
643 let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
644 track.set_level(-4.0);
645 track.set_balance(0.25);
646 let state = make_state_with_track(track);
647 let output_path = std::env::temp_dir().to_string_lossy().to_string();
648 let job = OfflineBounceWork {
649 state: state.clone(),
650 track_name: "track".to_string(),
651 output_path,
652 start_sample: 0,
653 length_samples: 4,
654 tempo_bpm: 120.0,
655 tsig_num: 4,
656 tsig_denom: 4,
657 automation_lanes: vec![],
658 cancel: Arc::new(AtomicBool::new(false)),
659 apply_fader: false,
660 };
661
662 worker.process_offline_bounce(job).await;
663
664 let mut saw_error = false;
665 while let Some(message) = out_rx.recv().await {
666 match message {
667 Message::OfflineBounceFinished {
668 result: Ok(Action::TrackOfflineBounceProgress { .. }),
669 } => {}
670 Message::OfflineBounceFinished { result: Err(err) } => {
671 assert!(
672 err.contains("Failed to create offline bounce")
673 || err.contains("Failed to write offline bounce")
674 || err.contains("Failed to finalize offline bounce")
675 );
676 saw_error = true;
677 }
678 Message::Ready(3) => break,
679 other => panic!("unexpected message: {other:?}"),
680 }
681 }
682 assert!(saw_error);
683 let track = state.lock().tracks.get("track").expect("track").lock();
684 assert_eq!(track.level(), -4.0);
685 assert_eq!(track.balance, 0.25);
686 }
687
688 #[tokio::test]
689 async fn process_offline_bounce_emits_progress_and_completion() {
690 let (_rx_unused_tx, rx_unused) = channel(1);
691 let (tx, mut out_rx) = channel(16);
692 let worker = Worker {
693 id: 2,
694 rx: rx_unused,
695 tx,
696 };
697 let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 4, 48_000.0);
698 track.set_level(-3.0);
699 track.set_balance(0.4);
700 let state = make_state_with_track(track);
701 let output = unique_temp_wav("success");
702 let job = OfflineBounceWork {
703 state: state.clone(),
704 track_name: "track".to_string(),
705 output_path: output.to_string_lossy().to_string(),
706 start_sample: 0,
707 length_samples: 8,
708 tempo_bpm: 120.0,
709 tsig_num: 4,
710 tsig_denom: 4,
711 automation_lanes: vec![],
712 cancel: Arc::new(AtomicBool::new(false)),
713 apply_fader: false,
714 };
715
716 worker.process_offline_bounce(job).await;
717
718 let mut saw_progress = false;
719 let mut saw_complete = false;
720 let mut saw_ready = false;
721 while let Some(message) = out_rx.recv().await {
722 match message {
723 Message::OfflineBounceFinished {
724 result:
725 Ok(Action::TrackOfflineBounceProgress {
726 track_name,
727 progress,
728 ..
729 }),
730 } => {
731 assert_eq!(track_name, "track");
732 assert!(progress > 0.0);
733 saw_progress = true;
734 }
735 Message::OfflineBounceFinished {
736 result:
737 Ok(Action::TrackOfflineBounce {
738 track_name,
739 output_path,
740 ..
741 }),
742 } => {
743 assert_eq!(track_name, "track");
744 assert_eq!(output_path, output.to_string_lossy());
745 saw_complete = true;
746 }
747 Message::Ready(2) => {
748 saw_ready = true;
749 break;
750 }
751 other => panic!("unexpected message: {other:?}"),
752 }
753 }
754
755 assert!(saw_progress);
756 assert!(saw_complete);
757 assert!(saw_ready);
758 assert!(output.exists());
759 std::fs::remove_file(&output).expect("remove temp wav");
760 let track = state.lock().tracks.get("track").expect("track").lock();
761 assert_eq!(track.level(), -3.0);
762 assert_eq!(track.balance, 0.4);
763 assert!(!track.muted);
764 }
765}