Skip to main content

maolan_engine/workers/
worker.rs

1use crate::message::{
2    Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
3    OfflineBounceWork,
4};
5#[cfg(unix)]
6use nix::libc;
7use std::collections::HashSet;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::mpsc::{Receiver, Sender};
11use tracing::{error, info};
12
13#[derive(Debug)]
14pub struct Worker {
15    id: usize,
16    rx: Receiver<Message>,
17    tx: Sender<Message>,
18    realtime_priority: i32,
19}
20
21impl Worker {
22    fn automation_lane_value_at(points: &[OfflineAutomationPoint], sample: usize) -> Option<f32> {
23        if points.is_empty() {
24            return None;
25        }
26        if sample <= points[0].sample {
27            return Some(points[0].value.clamp(0.0, 1.0));
28        }
29        if sample >= points[points.len().saturating_sub(1)].sample {
30            return Some(points[points.len().saturating_sub(1)].value.clamp(0.0, 1.0));
31        }
32        for segment in points.windows(2) {
33            let left = &segment[0];
34            let right = &segment[1];
35            if sample < left.sample || sample > right.sample {
36                continue;
37            }
38            let span = right.sample.saturating_sub(left.sample).max(1) as f32;
39            let t = (sample.saturating_sub(left.sample) as f32 / span).clamp(0.0, 1.0);
40            return Some((left.value + (right.value - left.value) * t).clamp(0.0, 1.0));
41        }
42        None
43    }
44
45    fn apply_freeze_automation_at_sample(
46        track: &mut crate::track::Track,
47        sample: usize,
48        lanes: &[OfflineAutomationLane],
49    ) {
50        for lane in lanes {
51            if matches!(
52                lane.target,
53                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance
54            ) {
55                continue;
56            }
57            let Some(value) = Self::automation_lane_value_at(&lane.points, sample) else {
58                continue;
59            };
60            match lane.target {
61                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance => {}
62                OfflineAutomationTarget::Mute => {
63                    track.set_muted(value >= 0.5);
64                }
65                #[cfg(all(unix, not(target_os = "macos")))]
66                OfflineAutomationTarget::Lv2Parameter {
67                    instance_id,
68                    index,
69                    min,
70                    max,
71                } => {
72                    let lo = min.min(max);
73                    let hi = max.max(min);
74                    let param_value = (lo + value * (hi - lo)).clamp(lo, hi);
75                    let _ = track.set_lv2_control_value(
76                        instance_id,
77                        index as usize,
78                        param_value as f64,
79                    );
80                }
81                OfflineAutomationTarget::Vst3Parameter {
82                    instance_id,
83                    param_id,
84                } => {
85                    let _ = track.set_vst3_parameter(instance_id, param_id, value.clamp(0.0, 1.0));
86                }
87                OfflineAutomationTarget::ClapParameter {
88                    instance_id,
89                    param_id,
90                    min,
91                    max,
92                } => {
93                    let lo = min.min(max);
94                    let hi = max.max(min);
95                    let param_value = (lo + value as f64 * (hi - lo)).clamp(lo, hi);
96                    let _ = track.set_clap_parameter_at(instance_id, param_id, param_value, 0);
97                }
98            }
99        }
100    }
101
102    fn prepare_track_for_freeze_render(track: &mut crate::track::Track) -> (f32, f32) {
103        let original_level = track.level();
104        let original_balance = track.balance;
105        track.set_level(0.0);
106        track.set_balance(0.0);
107        (original_level, original_balance)
108    }
109
110    fn restore_track_after_freeze_render(
111        track: &mut crate::track::Track,
112        original_level: f32,
113        original_balance: f32,
114    ) {
115        track.set_level(original_level);
116        track.set_balance(original_balance);
117    }
118
119    async fn process_offline_bounce(&self, job: OfflineBounceWork) {
120        let track_handle = job.state.lock().tracks.get(&job.track_name).cloned();
121        let Some(target_track) = track_handle else {
122            let _ = self
123                .tx
124                .send(Message::OfflineBounceFinished {
125                    result: Err(format!("Track not found: {}", job.track_name)),
126                })
127                .await;
128            return;
129        };
130        let (channels, block_size, sample_rate) = {
131            let t = target_track.lock();
132            let block_size = t
133                .audio
134                .outs
135                .first()
136                .map(|io| io.buffer.lock().len())
137                .or_else(|| t.audio.ins.first().map(|io| io.buffer.lock().len()))
138                .unwrap_or(0)
139                .max(1);
140            (
141                t.audio.outs.len().max(1),
142                block_size,
143                t.sample_rate.round().max(1.0) as i32,
144            )
145        };
146        let freeze_state = if job.apply_fader {
147            None
148        } else {
149            let t = target_track.lock();
150            Some(Self::prepare_track_for_freeze_render(t))
151        };
152
153        let all_tracks: Vec<_> = job.state.lock().tracks.values().cloned().collect();
154        let mut output_to_track: std::collections::HashMap<usize, String> =
155            std::collections::HashMap::new();
156        for handle in &all_tracks {
157            let t = handle.lock();
158            for out in &t.audio.outs {
159                output_to_track.insert(Arc::as_ptr(out) as usize, t.name.clone());
160            }
161        }
162        let mut relevant_names = HashSet::new();
163        let mut queue = vec![job.track_name.clone()];
164        while let Some(name) = queue.pop() {
165            if !relevant_names.insert(name.clone()) {
166                continue;
167            }
168            if let Some(handle) = all_tracks.iter().find(|h| h.lock().name == name) {
169                let t = handle.lock();
170                for input in &t.audio.ins {
171                    for conn in input.connections.lock().iter() {
172                        if let Some(source_name) =
173                            output_to_track.get(&(Arc::as_ptr(conn) as usize))
174                        {
175                            queue.push(source_name.clone());
176                        }
177                    }
178                }
179            }
180        }
181        let relevant_tracks: Vec<_> = all_tracks
182            .into_iter()
183            .filter(|h| relevant_names.contains(&h.lock().name))
184            .collect();
185
186        let mut output_samples =
187            Vec::<f32>::with_capacity(job.length_samples.saturating_mul(channels.max(1)));
188
189        let mut cursor = 0usize;
190        let mut last_reported_progress = 0.0_f32;
191        let mut total_process_time = Duration::ZERO;
192        let mut total_write_time = Duration::ZERO;
193        let mut block_count = 0usize;
194        let bounce_start = Instant::now();
195        while cursor < job.length_samples {
196            if job.cancel.load(std::sync::atomic::Ordering::Relaxed) {
197                let _ = std::fs::remove_file(&job.output_path);
198                if let Some((original_level, original_balance)) = freeze_state {
199                    let t = target_track.lock();
200                    Self::restore_track_after_freeze_render(t, original_level, original_balance);
201                }
202                let _ = self
203                    .tx
204                    .send(Message::OfflineBounceFinished {
205                        result: Ok(Action::TrackOfflineBounceCanceled {
206                            track_name: job.track_name.clone(),
207                        }),
208                    })
209                    .await;
210                let _ = self.tx.send(Message::Ready(self.id)).await;
211                return;
212            }
213
214            let step = (job.length_samples - cursor).min(block_size);
215            for handle in &relevant_tracks {
216                let t = handle.lock();
217                t.audio.finished = false;
218                t.audio.processing = false;
219                t.set_transport_sample(job.start_sample.saturating_add(cursor));
220                t.set_loop_config(false, None);
221                t.set_transport_timing(job.tempo_bpm, job.tsig_num, job.tsig_denom);
222                t.set_clip_playback_enabled(true);
223                t.set_record_tap_enabled(false);
224            }
225
226            let block_process_start = Instant::now();
227            loop {
228                let mut all_finished = true;
229                let mut progressed = false;
230                for handle in &relevant_tracks {
231                    let t = handle.lock();
232                    if t.audio.finished {
233                        continue;
234                    }
235                    all_finished = false;
236                    if !t.audio.processing && t.audio.ready() {
237                        if t.name == job.track_name {
238                            Self::apply_freeze_automation_at_sample(
239                                t,
240                                job.start_sample.saturating_add(cursor),
241                                &job.automation_lanes,
242                            );
243                        }
244                        t.audio.processing = true;
245                        let p_start = Instant::now();
246                        t.process();
247                        total_process_time += p_start.elapsed();
248                        t.audio.processing = false;
249                        progressed = true;
250                    }
251                }
252                if all_finished {
253                    break;
254                }
255                if !progressed {
256                    for handle in &relevant_tracks {
257                        let t = handle.lock();
258                        if t.audio.finished {
259                            continue;
260                        }
261                        if t.name == job.track_name {
262                            Self::apply_freeze_automation_at_sample(
263                                t,
264                                job.start_sample.saturating_add(cursor),
265                                &job.automation_lanes,
266                            );
267                        }
268                        t.audio.processing = true;
269                        let p_start = Instant::now();
270                        t.process();
271                        total_process_time += p_start.elapsed();
272                        t.audio.processing = false;
273                    }
274                    break;
275                }
276            }
277            let _block_process_elapsed = block_process_start.elapsed();
278
279            let write_start = Instant::now();
280            {
281                let t = target_track.lock();
282                let outs: Vec<_> = (0..channels)
283                    .map(|ch| t.audio.outs[ch].buffer.lock())
284                    .collect();
285                for i in 0..step {
286                    for out in outs.iter().take(channels) {
287                        let sample = out.get(i).copied().unwrap_or(0.0);
288                        output_samples.push(sample);
289                    }
290                }
291            }
292            total_write_time += write_start.elapsed();
293
294            cursor = cursor.saturating_add(step);
295            block_count += 1;
296            let progress = (cursor as f32 / job.length_samples as f32).clamp(0.0, 1.0);
297
298            if progress - last_reported_progress >= 0.01 || cursor >= job.length_samples {
299                last_reported_progress = progress;
300                let _ = self
301                    .tx
302                    .send(Message::OfflineBounceFinished {
303                        result: Ok(Action::TrackOfflineBounceProgress {
304                            track_name: job.track_name.clone(),
305                            progress,
306                            operation: Some("Rendering freeze".to_string()),
307                        }),
308                    })
309                    .await;
310            }
311        }
312        let bounce_elapsed = bounce_start.elapsed();
313        info!(
314            "Bounce '{}' — total: {:?}, blocks: {}, process: {:?}, write: {:?}",
315            job.track_name, bounce_elapsed, block_count, total_process_time, total_write_time
316        );
317
318        if let Err(e) = crate::audio_codec::write_wav_f32(
319            std::path::Path::new(&job.output_path),
320            &output_samples,
321            channels,
322            sample_rate as u32,
323        ) {
324            let _ = std::fs::remove_file(&job.output_path);
325            if let Some((original_level, original_balance)) = freeze_state {
326                let t = target_track.lock();
327                Self::restore_track_after_freeze_render(t, original_level, original_balance);
328            }
329            let _ = self
330                .tx
331                .send(Message::OfflineBounceFinished {
332                    result: Err(format!(
333                        "Failed to write offline bounce '{}': {e}",
334                        job.output_path
335                    )),
336                })
337                .await;
338            let _ = self.tx.send(Message::Ready(self.id)).await;
339            return;
340        }
341
342        if let Some((original_level, original_balance)) = freeze_state {
343            let t = target_track.lock();
344            Self::restore_track_after_freeze_render(t, original_level, original_balance);
345        }
346
347        let _ = self
348            .tx
349            .send(Message::OfflineBounceFinished {
350                result: Ok(Action::TrackOfflineBounce {
351                    track_name: job.track_name,
352                    output_path: job.output_path,
353                    start_sample: job.start_sample,
354                    length_samples: job.length_samples,
355                    automation_lanes: vec![],
356                    apply_fader: job.apply_fader,
357                }),
358            })
359            .await;
360        let _ = self.tx.send(Message::Ready(self.id)).await;
361    }
362
363    #[cfg(unix)]
364    fn try_enable_realtime(priority: i32) -> Result<(), String> {
365        let thread = unsafe { libc::pthread_self() };
366        let policy = libc::SCHED_FIFO;
367        let param = unsafe {
368            let mut p = std::mem::zeroed::<libc::sched_param>();
369            p.sched_priority = priority;
370            p
371        };
372        let rc = unsafe { libc::pthread_setschedparam(thread, policy, &param) };
373        if rc == 0 {
374            Ok(())
375        } else {
376            Err(format!("pthread_setschedparam failed with errno {}", rc))
377        }
378    }
379
380    #[cfg(not(unix))]
381    fn try_enable_realtime(_priority: i32) -> Result<(), String> {
382        Err("Realtime thread priority is not supported on this platform".to_string())
383    }
384
385    pub async fn new(
386        id: usize,
387        rx: Receiver<Message>,
388        tx: Sender<Message>,
389        realtime_priority: i32,
390    ) -> Worker {
391        let worker = Worker {
392            id,
393            rx,
394            tx,
395            realtime_priority,
396        };
397        worker.send(Message::Ready(id)).await;
398        worker
399    }
400
401    pub async fn send(&self, message: Message) {
402        self.tx
403            .send(message)
404            .await
405            .expect("Failed to send message from worker");
406    }
407
408    pub async fn work(&mut self) {
409        if let Err(e) = Self::try_enable_realtime(self.realtime_priority) {
410            error!("Worker {} realtime priority not enabled: {}", self.id, e);
411        }
412        while let Some(message) = self.rx.recv().await {
413            match message {
414                Message::Request(Action::Quit) => {
415                    return;
416                }
417                Message::ProcessTrack(t) => {
418                    let (track_name, output_linear, process_epoch, parameter_updates) = {
419                        let track = t.lock();
420                        let process_epoch = track.process_epoch;
421                        let started = Instant::now();
422                        track.process();
423                        let elapsed = started.elapsed();
424                        if elapsed.as_millis() > 20 {
425                            tracing::warn!(
426                                "Slow track process '{}' took {:.3} ms",
427                                track.name,
428                                elapsed.as_secs_f64() * 1000.0
429                            );
430                        }
431                        track.audio.processing = false;
432                        let updates = std::mem::take(track.echoed_parameter_updates.lock());
433                        (
434                            track.name.clone(),
435                            track.output_meter_linear(),
436                            process_epoch,
437                            updates,
438                        )
439                    };
440                    match self
441                        .tx
442                        .send(Message::Finished {
443                            worker_id: self.id,
444                            track_name,
445                            output_linear,
446                            process_epoch,
447                            parameter_updates,
448                        })
449                        .await
450                    {
451                        Ok(_) => {}
452                        Err(e) => {
453                            error!("Error while sending Finished: {}", e);
454                        }
455                    }
456                }
457                Message::ProcessOfflineBounce(job) => {
458                    self.process_offline_bounce(job).await;
459                }
460                _ => {}
461            }
462        }
463    }
464}
465
466#[cfg(test)]
467mod tests {
468    use super::Worker;
469    use crate::message::{
470        Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
471        OfflineBounceWork,
472    };
473    use crate::mutex::UnsafeMutex;
474    use crate::state::State;
475    use crate::track::Track;
476    use std::path::PathBuf;
477    use std::sync::{Arc, atomic::AtomicBool};
478    use std::time::{SystemTime, UNIX_EPOCH};
479    use tokio::sync::mpsc::channel;
480
481    fn make_state_with_track(track: Track) -> Arc<UnsafeMutex<State>> {
482        let mut state = State::default();
483        state.tracks.insert(
484            track.name.clone(),
485            Arc::new(UnsafeMutex::new(Box::new(track))),
486        );
487        Arc::new(UnsafeMutex::new(state))
488    }
489
490    fn unique_temp_wav(name: &str) -> PathBuf {
491        let nanos = SystemTime::now()
492            .duration_since(UNIX_EPOCH)
493            .expect("clock")
494            .as_nanos();
495        std::env::temp_dir().join(format!("maolan_{name}_{nanos}.wav"))
496    }
497
498    #[test]
499    fn prepare_track_for_freeze_render_neutralizes_level_and_balance() {
500        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 64, 48_000.0);
501        track.set_level(-6.0);
502        track.set_balance(0.35);
503
504        let (level, balance) = Worker::prepare_track_for_freeze_render(&mut track);
505
506        assert_eq!(level, -6.0);
507        assert_eq!(balance, 0.35);
508        assert_eq!(track.level(), 0.0);
509        assert_eq!(track.balance, 0.0);
510
511        Worker::restore_track_after_freeze_render(&mut track, level, balance);
512        assert_eq!(track.level(), -6.0);
513        assert_eq!(track.balance, 0.35);
514    }
515
516    #[test]
517    fn freeze_automation_ignores_volume_and_balance_lanes() {
518        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 64, 48_000.0);
519        let lanes = vec![
520            OfflineAutomationLane {
521                target: OfflineAutomationTarget::Volume,
522                points: vec![OfflineAutomationPoint {
523                    sample: 0,
524                    value: 0.0,
525                }],
526            },
527            OfflineAutomationLane {
528                target: OfflineAutomationTarget::Balance,
529                points: vec![OfflineAutomationPoint {
530                    sample: 0,
531                    value: 1.0,
532                }],
533            },
534            OfflineAutomationLane {
535                target: OfflineAutomationTarget::Mute,
536                points: vec![OfflineAutomationPoint {
537                    sample: 0,
538                    value: 1.0,
539                }],
540            },
541        ];
542
543        Worker::apply_freeze_automation_at_sample(&mut track, 0, &lanes);
544
545        assert_eq!(track.level(), 0.0);
546        assert_eq!(track.balance, 0.0);
547        assert!(track.muted);
548    }
549
550    #[test]
551    fn automation_lane_value_at_interpolates_between_points() {
552        let value = Worker::automation_lane_value_at(
553            &[
554                OfflineAutomationPoint {
555                    sample: 10,
556                    value: 0.25,
557                },
558                OfflineAutomationPoint {
559                    sample: 20,
560                    value: 0.75,
561                },
562            ],
563            15,
564        )
565        .expect("value");
566
567        assert!((value - 0.5).abs() < 1.0e-6);
568    }
569
570    #[test]
571    fn freeze_automation_applies_interpolated_mute_lane() {
572        let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 64, 48_000.0);
573        let lanes = vec![OfflineAutomationLane {
574            target: OfflineAutomationTarget::Mute,
575            points: vec![
576                OfflineAutomationPoint {
577                    sample: 0,
578                    value: 0.0,
579                },
580                OfflineAutomationPoint {
581                    sample: 10,
582                    value: 1.0,
583                },
584            ],
585        }];
586
587        Worker::apply_freeze_automation_at_sample(&mut track, 5, &lanes);
588        assert!(track.muted);
589
590        track.set_muted(false);
591        Worker::apply_freeze_automation_at_sample(&mut track, 2, &lanes);
592        assert!(!track.muted);
593    }
594
595    #[tokio::test]
596    async fn process_offline_bounce_errors_when_track_is_missing() {
597        let (_rx_unused_tx, rx_unused) = channel(1);
598        let (tx, mut out_rx) = channel(8);
599        let worker = Worker {
600            id: 7,
601            rx: rx_unused,
602            tx,
603            realtime_priority: 0,
604        };
605        let job = OfflineBounceWork {
606            state: Arc::new(UnsafeMutex::new(State::default())),
607            track_name: "missing".to_string(),
608            output_path: unique_temp_wav("missing").to_string_lossy().to_string(),
609            start_sample: 0,
610            length_samples: 8,
611            tempo_bpm: 120.0,
612            tsig_num: 4,
613            tsig_denom: 4,
614            automation_lanes: vec![],
615            cancel: Arc::new(AtomicBool::new(false)),
616            apply_fader: false,
617        };
618
619        worker.process_offline_bounce(job).await;
620
621        match out_rx.recv().await.expect("message") {
622            Message::OfflineBounceFinished { result: Err(err) } => {
623                assert!(err.contains("Track not found: missing"));
624            }
625            other => panic!("unexpected message: {other:?}"),
626        }
627    }
628
629    #[tokio::test]
630    async fn process_offline_bounce_cancels_and_restores_track_state() {
631        let (_rx_unused_tx, rx_unused) = channel(1);
632        let (tx, mut out_rx) = channel(8);
633        let worker = Worker {
634            id: 5,
635            rx: rx_unused,
636            tx,
637            realtime_priority: 0,
638        };
639        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
640        track.set_level(-9.0);
641        track.set_balance(-0.3);
642        let state = make_state_with_track(track);
643        let job = OfflineBounceWork {
644            state: state.clone(),
645            track_name: "track".to_string(),
646            output_path: unique_temp_wav("cancel").to_string_lossy().to_string(),
647            start_sample: 0,
648            length_samples: 8,
649            tempo_bpm: 120.0,
650            tsig_num: 4,
651            tsig_denom: 4,
652            automation_lanes: vec![],
653            cancel: Arc::new(AtomicBool::new(true)),
654            apply_fader: false,
655        };
656
657        worker.process_offline_bounce(job).await;
658
659        match out_rx.recv().await.expect("message") {
660            Message::OfflineBounceFinished {
661                result: Ok(Action::TrackOfflineBounceCanceled { track_name }),
662            } => assert_eq!(track_name, "track"),
663            other => panic!("unexpected message: {other:?}"),
664        }
665        assert!(matches!(out_rx.recv().await, Some(Message::Ready(5))));
666        let track = state.lock().tracks.get("track").expect("track").lock();
667        assert_eq!(track.level(), -9.0);
668        assert_eq!(track.balance, -0.3);
669    }
670
671    #[tokio::test]
672    async fn process_offline_bounce_restores_track_state_on_write_failure() {
673        let (_rx_unused_tx, rx_unused) = channel(1);
674        let (tx, mut out_rx) = channel(8);
675        let worker = Worker {
676            id: 3,
677            rx: rx_unused,
678            tx,
679            realtime_priority: 0,
680        };
681        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
682        track.set_level(-4.0);
683        track.set_balance(0.25);
684        let state = make_state_with_track(track);
685        let output_path = std::env::temp_dir().to_string_lossy().to_string();
686        let job = OfflineBounceWork {
687            state: state.clone(),
688            track_name: "track".to_string(),
689            output_path,
690            start_sample: 0,
691            length_samples: 4,
692            tempo_bpm: 120.0,
693            tsig_num: 4,
694            tsig_denom: 4,
695            automation_lanes: vec![],
696            cancel: Arc::new(AtomicBool::new(false)),
697            apply_fader: false,
698        };
699
700        worker.process_offline_bounce(job).await;
701
702        let mut saw_error = false;
703        while let Some(message) = out_rx.recv().await {
704            match message {
705                Message::OfflineBounceFinished {
706                    result: Ok(Action::TrackOfflineBounceProgress { .. }),
707                } => {}
708                Message::OfflineBounceFinished { result: Err(err) } => {
709                    assert!(
710                        err.contains("Failed to create offline bounce")
711                            || err.contains("Failed to write offline bounce")
712                            || err.contains("Failed to finalize offline bounce")
713                    );
714                    saw_error = true;
715                }
716                Message::Ready(3) => break,
717                other => panic!("unexpected message: {other:?}"),
718            }
719        }
720        assert!(saw_error);
721        let track = state.lock().tracks.get("track").expect("track").lock();
722        assert_eq!(track.level(), -4.0);
723        assert_eq!(track.balance, 0.25);
724    }
725
726    #[tokio::test]
727    async fn process_offline_bounce_emits_progress_and_completion() {
728        let (_rx_unused_tx, rx_unused) = channel(1);
729        let (tx, mut out_rx) = channel(16);
730        let worker = Worker {
731            id: 2,
732            rx: rx_unused,
733            tx,
734            realtime_priority: 0,
735        };
736        let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 4, 48_000.0);
737        track.set_level(-3.0);
738        track.set_balance(0.4);
739        let state = make_state_with_track(track);
740        let output = unique_temp_wav("success");
741        let job = OfflineBounceWork {
742            state: state.clone(),
743            track_name: "track".to_string(),
744            output_path: output.to_string_lossy().to_string(),
745            start_sample: 0,
746            length_samples: 8,
747            tempo_bpm: 120.0,
748            tsig_num: 4,
749            tsig_denom: 4,
750            automation_lanes: vec![],
751            cancel: Arc::new(AtomicBool::new(false)),
752            apply_fader: false,
753        };
754
755        worker.process_offline_bounce(job).await;
756
757        let mut saw_progress = false;
758        let mut saw_complete = false;
759        let mut saw_ready = false;
760        while let Some(message) = out_rx.recv().await {
761            match message {
762                Message::OfflineBounceFinished {
763                    result:
764                        Ok(Action::TrackOfflineBounceProgress {
765                            track_name,
766                            progress,
767                            ..
768                        }),
769                } => {
770                    assert_eq!(track_name, "track");
771                    assert!(progress > 0.0);
772                    saw_progress = true;
773                }
774                Message::OfflineBounceFinished {
775                    result:
776                        Ok(Action::TrackOfflineBounce {
777                            track_name,
778                            output_path,
779                            ..
780                        }),
781                } => {
782                    assert_eq!(track_name, "track");
783                    assert_eq!(output_path, output.to_string_lossy());
784                    saw_complete = true;
785                }
786                Message::Ready(2) => {
787                    saw_ready = true;
788                    break;
789                }
790                other => panic!("unexpected message: {other:?}"),
791            }
792        }
793
794        assert!(saw_progress);
795        assert!(saw_complete);
796        assert!(saw_ready);
797        assert!(output.exists());
798        std::fs::remove_file(&output).expect("remove temp wav");
799        let track = state.lock().tracks.get("track").expect("track").lock();
800        assert_eq!(track.level(), -3.0);
801        assert_eq!(track.balance, 0.4);
802        assert!(!track.muted);
803    }
804}