Skip to main content

maolan_engine/workers/
worker.rs

1use crate::message::{
2    Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
3    OfflineBounceWork,
4};
5#[cfg(unix)]
6use nix::libc;
7use std::collections::HashSet;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::mpsc::{Receiver, Sender};
11use tracing::{error, info};
12
13#[derive(Debug)]
14pub struct Worker {
15    id: usize,
16    rx: Receiver<Message>,
17    tx: Sender<Message>,
18    realtime_priority: i32,
19}
20
21impl Worker {
22    fn automation_lane_value_at(points: &[OfflineAutomationPoint], sample: usize) -> Option<f32> {
23        if points.is_empty() {
24            return None;
25        }
26        if sample <= points[0].sample {
27            return Some(points[0].value.clamp(0.0, 1.0));
28        }
29        if sample >= points[points.len().saturating_sub(1)].sample {
30            return Some(points[points.len().saturating_sub(1)].value.clamp(0.0, 1.0));
31        }
32        for segment in points.windows(2) {
33            let left = &segment[0];
34            let right = &segment[1];
35            if sample < left.sample || sample > right.sample {
36                continue;
37            }
38            let span = right.sample.saturating_sub(left.sample).max(1) as f32;
39            let t = (sample.saturating_sub(left.sample) as f32 / span).clamp(0.0, 1.0);
40            return Some((left.value + (right.value - left.value) * t).clamp(0.0, 1.0));
41        }
42        None
43    }
44
45    fn apply_freeze_automation_at_sample(
46        track: &mut crate::track::Track,
47        sample: usize,
48        lanes: &[OfflineAutomationLane],
49    ) {
50        for lane in lanes {
51            if matches!(
52                lane.target,
53                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance
54            ) {
55                continue;
56            }
57            let Some(value) = Self::automation_lane_value_at(&lane.points, sample) else {
58                continue;
59            };
60            match lane.target {
61                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance => {}
62                OfflineAutomationTarget::Mute => {
63                    track.set_muted(value >= 0.5);
64                }
65                #[cfg(all(unix, not(target_os = "macos")))]
66                OfflineAutomationTarget::Lv2Parameter {
67                    instance_id,
68                    index,
69                    min,
70                    max,
71                } => {
72                    let lo = min.min(max);
73                    let hi = max.max(min);
74                    let param_value = (lo + value * (hi - lo)).clamp(lo, hi);
75                    let _ = track.set_lv2_control_value(
76                        instance_id,
77                        index as usize,
78                        param_value as f64,
79                    );
80                }
81                OfflineAutomationTarget::Vst3Parameter {
82                    instance_id,
83                    param_id,
84                } => {
85                    let _ = track.set_vst3_parameter(instance_id, param_id, value.clamp(0.0, 1.0));
86                }
87                OfflineAutomationTarget::ClapParameter {
88                    instance_id,
89                    param_id,
90                    min,
91                    max,
92                } => {
93                    let lo = min.min(max);
94                    let hi = max.max(min);
95                    let param_value = (lo + value as f64 * (hi - lo)).clamp(lo, hi);
96                    let _ = track.set_clap_parameter_at(instance_id, param_id, param_value, 0);
97                }
98            }
99        }
100    }
101
102    fn prepare_track_for_freeze_render(track: &mut crate::track::Track) -> (f32, f32) {
103        let original_level = track.level();
104        let original_balance = track.balance;
105        track.set_level(0.0);
106        track.set_balance(0.0);
107        (original_level, original_balance)
108    }
109
110    fn restore_track_after_freeze_render(
111        track: &mut crate::track::Track,
112        original_level: f32,
113        original_balance: f32,
114    ) {
115        track.set_level(original_level);
116        track.set_balance(original_balance);
117    }
118
119    async fn process_offline_bounce(&self, job: OfflineBounceWork) {
120        let track_handle = job.state.lock().tracks.get(&job.track_name).cloned();
121        let Some(target_track) = track_handle else {
122            let _ = self
123                .tx
124                .send(Message::OfflineBounceFinished {
125                    result: Err(format!("Track not found: {}", job.track_name)),
126                })
127                .await;
128            return;
129        };
130        let (channels, block_size, sample_rate) = {
131            let t = target_track.lock();
132            let block_size = t
133                .audio
134                .outs
135                .first()
136                .map(|io| io.buffer.lock().len())
137                .or_else(|| t.audio.ins.first().map(|io| io.buffer.lock().len()))
138                .unwrap_or(0)
139                .max(1);
140            (
141                t.audio.outs.len().max(1),
142                block_size,
143                t.sample_rate.round().max(1.0) as i32,
144            )
145        };
146        let freeze_state = if job.apply_fader {
147            None
148        } else {
149            let t = target_track.lock();
150            Some(Self::prepare_track_for_freeze_render(t))
151        };
152
153        let all_tracks: Vec<_> = job.state.lock().tracks.values().cloned().collect();
154        let mut output_to_track: std::collections::HashMap<usize, String> =
155            std::collections::HashMap::new();
156        for handle in &all_tracks {
157            let t = handle.lock();
158            for out in &t.audio.outs {
159                output_to_track.insert(Arc::as_ptr(out) as usize, t.name.clone());
160            }
161        }
162        let mut relevant_names = HashSet::new();
163        let mut queue = vec![job.track_name.clone()];
164        while let Some(name) = queue.pop() {
165            if !relevant_names.insert(name.clone()) {
166                continue;
167            }
168            if let Some(handle) = all_tracks.iter().find(|h| h.lock().name == name) {
169                let t = handle.lock();
170                for input in &t.audio.ins {
171                    for conn in input.connections.lock().iter() {
172                        if let Some(source_name) =
173                            output_to_track.get(&(Arc::as_ptr(conn) as usize))
174                        {
175                            queue.push(source_name.clone());
176                        }
177                    }
178                }
179            }
180        }
181        let relevant_tracks: Vec<_> = all_tracks
182            .into_iter()
183            .filter(|h| relevant_names.contains(&h.lock().name))
184            .collect();
185
186        let mut output_samples =
187            Vec::<f32>::with_capacity(job.length_samples.saturating_mul(channels.max(1)));
188
189        let mut cursor = 0usize;
190        let mut last_reported_progress = 0.0_f32;
191        let mut total_process_time = Duration::ZERO;
192        let mut total_write_time = Duration::ZERO;
193        let mut block_count = 0usize;
194        let bounce_start = Instant::now();
195        while cursor < job.length_samples {
196            if job.cancel.load(std::sync::atomic::Ordering::Relaxed) {
197                let _ = std::fs::remove_file(&job.output_path);
198                if let Some((original_level, original_balance)) = freeze_state {
199                    let t = target_track.lock();
200                    Self::restore_track_after_freeze_render(t, original_level, original_balance);
201                }
202                let _ = self
203                    .tx
204                    .send(Message::OfflineBounceFinished {
205                        result: Ok(Action::TrackOfflineBounceCanceled {
206                            track_name: job.track_name.clone(),
207                        }),
208                    })
209                    .await;
210                let _ = self.tx.send(Message::Ready(self.id)).await;
211                return;
212            }
213
214            let step = (job.length_samples - cursor).min(block_size);
215            for handle in &relevant_tracks {
216                let t = handle.lock();
217                t.audio.finished = false;
218                t.audio.processing = false;
219                t.set_transport_sample(job.start_sample.saturating_add(cursor));
220                t.set_loop_config(false, None);
221                t.set_transport_timing(job.tempo_bpm, job.tsig_num, job.tsig_denom);
222                t.set_clip_playback_enabled(true);
223                t.set_record_tap_enabled(false);
224            }
225
226            let block_process_start = Instant::now();
227            loop {
228                let mut all_finished = true;
229                let mut progressed = false;
230                for handle in &relevant_tracks {
231                    let t = handle.lock();
232                    if t.audio.finished {
233                        continue;
234                    }
235                    all_finished = false;
236                    if !t.audio.processing && t.audio.ready() {
237                        if t.name == job.track_name {
238                            Self::apply_freeze_automation_at_sample(
239                                t,
240                                job.start_sample.saturating_add(cursor),
241                                &job.automation_lanes,
242                            );
243                        }
244                        t.audio.processing = true;
245                        let p_start = Instant::now();
246                        t.process();
247                        total_process_time += p_start.elapsed();
248                        t.audio.processing = false;
249                        progressed = true;
250                    }
251                }
252                if all_finished {
253                    break;
254                }
255                if !progressed {
256                    for handle in &relevant_tracks {
257                        let t = handle.lock();
258                        if t.audio.finished {
259                            continue;
260                        }
261                        if t.name == job.track_name {
262                            Self::apply_freeze_automation_at_sample(
263                                t,
264                                job.start_sample.saturating_add(cursor),
265                                &job.automation_lanes,
266                            );
267                        }
268                        t.audio.processing = true;
269                        let p_start = Instant::now();
270                        t.process();
271                        total_process_time += p_start.elapsed();
272                        t.audio.processing = false;
273                    }
274                    break;
275                }
276            }
277            let _block_process_elapsed = block_process_start.elapsed();
278
279            let write_start = Instant::now();
280            {
281                let t = target_track.lock();
282                let outs: Vec<_> = (0..channels)
283                    .map(|ch| t.audio.outs[ch].buffer.lock())
284                    .collect();
285                for i in 0..step {
286                    for out in outs.iter().take(channels) {
287                        let sample = out.get(i).copied().unwrap_or(0.0);
288                        output_samples.push(sample);
289                    }
290                }
291            }
292            total_write_time += write_start.elapsed();
293
294            cursor = cursor.saturating_add(step);
295            block_count += 1;
296            let progress = (cursor as f32 / job.length_samples as f32).clamp(0.0, 1.0);
297
298            if progress - last_reported_progress >= 0.01 || cursor >= job.length_samples {
299                last_reported_progress = progress;
300                let _ = self
301                    .tx
302                    .send(Message::OfflineBounceFinished {
303                        result: Ok(Action::TrackOfflineBounceProgress {
304                            track_name: job.track_name.clone(),
305                            progress,
306                            operation: Some("Rendering freeze".to_string()),
307                        }),
308                    })
309                    .await;
310            }
311        }
312        let bounce_elapsed = bounce_start.elapsed();
313        info!(
314            "Bounce '{}' — total: {:?}, blocks: {}, process: {:?}, write: {:?}",
315            job.track_name, bounce_elapsed, block_count, total_process_time, total_write_time
316        );
317
318        if let Err(e) = crate::audio_codec::write_wav_f32(
319            std::path::Path::new(&job.output_path),
320            &output_samples,
321            channels,
322            sample_rate as u32,
323        ) {
324            let _ = std::fs::remove_file(&job.output_path);
325            if let Some((original_level, original_balance)) = freeze_state {
326                let t = target_track.lock();
327                Self::restore_track_after_freeze_render(t, original_level, original_balance);
328            }
329            let _ = self
330                .tx
331                .send(Message::OfflineBounceFinished {
332                    result: Err(format!(
333                        "Failed to write offline bounce '{}': {e}",
334                        job.output_path
335                    )),
336                })
337                .await;
338            let _ = self.tx.send(Message::Ready(self.id)).await;
339            return;
340        }
341
342        if let Some((original_level, original_balance)) = freeze_state {
343            let t = target_track.lock();
344            Self::restore_track_after_freeze_render(t, original_level, original_balance);
345        }
346
347        let _ = self
348            .tx
349            .send(Message::OfflineBounceFinished {
350                result: Ok(Action::TrackOfflineBounce {
351                    track_name: job.track_name,
352                    output_path: job.output_path,
353                    start_sample: job.start_sample,
354                    length_samples: job.length_samples,
355                    automation_lanes: vec![],
356                    apply_fader: job.apply_fader,
357                }),
358            })
359            .await;
360        let _ = self.tx.send(Message::Ready(self.id)).await;
361    }
362
363    #[cfg(unix)]
364    fn try_enable_realtime(priority: i32) -> Result<(), String> {
365        let thread = unsafe { libc::pthread_self() };
366        let policy = libc::SCHED_FIFO;
367        let param = unsafe {
368            let mut p = std::mem::zeroed::<libc::sched_param>();
369            p.sched_priority = priority;
370            p
371        };
372        let rc = unsafe { libc::pthread_setschedparam(thread, policy, &param) };
373        if rc == 0 {
374            Ok(())
375        } else {
376            Err(format!("pthread_setschedparam failed with errno {}", rc))
377        }
378    }
379
380    #[cfg(not(unix))]
381    fn try_enable_realtime(_priority: i32) -> Result<(), String> {
382        Err("Realtime thread priority is not supported on this platform".to_string())
383    }
384
385    pub async fn new(
386        id: usize,
387        rx: Receiver<Message>,
388        tx: Sender<Message>,
389        realtime_priority: i32,
390    ) -> Worker {
391        let worker = Worker {
392            id,
393            rx,
394            tx,
395            realtime_priority,
396        };
397        worker.send(Message::Ready(id)).await;
398        worker
399    }
400
401    pub async fn send(&self, message: Message) {
402        self.tx
403            .send(message)
404            .await
405            .expect("Failed to send message from worker");
406    }
407
408    pub async fn work(&mut self) {
409        crate::enable_flush_denormals_to_zero();
410        if let Err(e) = Self::try_enable_realtime(self.realtime_priority) {
411            error!("Worker {} realtime priority not enabled: {}", self.id, e);
412        }
413        while let Some(message) = self.rx.recv().await {
414            match message {
415                Message::Request(Action::Quit) => {
416                    return;
417                }
418                Message::ProcessTrack(t) => {
419                    let (track_name, output_linear, process_epoch, parameter_updates) = {
420                        let track = t.lock();
421                        let process_epoch = track.process_epoch;
422                        let started = Instant::now();
423                        track.process();
424                        let elapsed = started.elapsed();
425                        if elapsed.as_millis() > 20 {
426                            tracing::warn!(
427                                "Slow track process '{}' took {:.3} ms",
428                                track.name,
429                                elapsed.as_secs_f64() * 1000.0
430                            );
431                        }
432                        track.audio.processing = false;
433                        let updates = std::mem::take(track.echoed_parameter_updates.lock());
434                        (
435                            track.name.clone(),
436                            track.output_meter_linear(),
437                            process_epoch,
438                            updates,
439                        )
440                    };
441                    match self
442                        .tx
443                        .send(Message::Finished {
444                            worker_id: self.id,
445                            track_name,
446                            output_linear,
447                            process_epoch,
448                            parameter_updates,
449                        })
450                        .await
451                    {
452                        Ok(_) => {}
453                        Err(e) => {
454                            error!("Error while sending Finished: {}", e);
455                        }
456                    }
457                }
458                Message::ProcessOfflineBounce(job) => {
459                    self.process_offline_bounce(job).await;
460                }
461                _ => {}
462            }
463        }
464    }
465}
466
467#[cfg(test)]
468mod tests {
469    use super::Worker;
470    use crate::message::{
471        Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
472        OfflineBounceWork,
473    };
474    use crate::mutex::UnsafeMutex;
475    use crate::state::State;
476    use crate::track::Track;
477    use std::path::PathBuf;
478    use std::sync::{Arc, atomic::AtomicBool};
479    use std::time::{SystemTime, UNIX_EPOCH};
480    use tokio::sync::mpsc::channel;
481
482    fn make_state_with_track(track: Track) -> Arc<UnsafeMutex<State>> {
483        let mut state = State::default();
484        state.tracks.insert(
485            track.name.clone(),
486            Arc::new(UnsafeMutex::new(Box::new(track))),
487        );
488        Arc::new(UnsafeMutex::new(state))
489    }
490
491    fn unique_temp_wav(name: &str) -> PathBuf {
492        let nanos = SystemTime::now()
493            .duration_since(UNIX_EPOCH)
494            .expect("clock")
495            .as_nanos();
496        std::env::temp_dir().join(format!("maolan_{name}_{nanos}.wav"))
497    }
498
499    #[test]
500    fn prepare_track_for_freeze_render_neutralizes_level_and_balance() {
501        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 64, 48_000.0);
502        track.set_level(-6.0);
503        track.set_balance(0.35);
504
505        let (level, balance) = Worker::prepare_track_for_freeze_render(&mut track);
506
507        assert_eq!(level, -6.0);
508        assert_eq!(balance, 0.35);
509        assert_eq!(track.level(), 0.0);
510        assert_eq!(track.balance, 0.0);
511
512        Worker::restore_track_after_freeze_render(&mut track, level, balance);
513        assert_eq!(track.level(), -6.0);
514        assert_eq!(track.balance, 0.35);
515    }
516
517    #[test]
518    fn freeze_automation_ignores_volume_and_balance_lanes() {
519        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 64, 48_000.0);
520        let lanes = vec![
521            OfflineAutomationLane {
522                target: OfflineAutomationTarget::Volume,
523                points: vec![OfflineAutomationPoint {
524                    sample: 0,
525                    value: 0.0,
526                }],
527            },
528            OfflineAutomationLane {
529                target: OfflineAutomationTarget::Balance,
530                points: vec![OfflineAutomationPoint {
531                    sample: 0,
532                    value: 1.0,
533                }],
534            },
535            OfflineAutomationLane {
536                target: OfflineAutomationTarget::Mute,
537                points: vec![OfflineAutomationPoint {
538                    sample: 0,
539                    value: 1.0,
540                }],
541            },
542        ];
543
544        Worker::apply_freeze_automation_at_sample(&mut track, 0, &lanes);
545
546        assert_eq!(track.level(), 0.0);
547        assert_eq!(track.balance, 0.0);
548        assert!(track.muted);
549    }
550
551    #[test]
552    fn automation_lane_value_at_interpolates_between_points() {
553        let value = Worker::automation_lane_value_at(
554            &[
555                OfflineAutomationPoint {
556                    sample: 10,
557                    value: 0.25,
558                },
559                OfflineAutomationPoint {
560                    sample: 20,
561                    value: 0.75,
562                },
563            ],
564            15,
565        )
566        .expect("value");
567
568        assert!((value - 0.5).abs() < 1.0e-6);
569    }
570
571    #[test]
572    fn freeze_automation_applies_interpolated_mute_lane() {
573        let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 64, 48_000.0);
574        let lanes = vec![OfflineAutomationLane {
575            target: OfflineAutomationTarget::Mute,
576            points: vec![
577                OfflineAutomationPoint {
578                    sample: 0,
579                    value: 0.0,
580                },
581                OfflineAutomationPoint {
582                    sample: 10,
583                    value: 1.0,
584                },
585            ],
586        }];
587
588        Worker::apply_freeze_automation_at_sample(&mut track, 5, &lanes);
589        assert!(track.muted);
590
591        track.set_muted(false);
592        Worker::apply_freeze_automation_at_sample(&mut track, 2, &lanes);
593        assert!(!track.muted);
594    }
595
596    #[tokio::test]
597    async fn process_offline_bounce_errors_when_track_is_missing() {
598        let (_rx_unused_tx, rx_unused) = channel(1);
599        let (tx, mut out_rx) = channel(8);
600        let worker = Worker {
601            id: 7,
602            rx: rx_unused,
603            tx,
604            realtime_priority: 0,
605        };
606        let job = OfflineBounceWork {
607            state: Arc::new(UnsafeMutex::new(State::default())),
608            track_name: "missing".to_string(),
609            output_path: unique_temp_wav("missing").to_string_lossy().to_string(),
610            start_sample: 0,
611            length_samples: 8,
612            tempo_bpm: 120.0,
613            tsig_num: 4,
614            tsig_denom: 4,
615            automation_lanes: vec![],
616            cancel: Arc::new(AtomicBool::new(false)),
617            apply_fader: false,
618        };
619
620        worker.process_offline_bounce(job).await;
621
622        match out_rx.recv().await.expect("message") {
623            Message::OfflineBounceFinished { result: Err(err) } => {
624                assert!(err.contains("Track not found: missing"));
625            }
626            other => panic!("unexpected message: {other:?}"),
627        }
628    }
629
630    #[tokio::test]
631    async fn process_offline_bounce_cancels_and_restores_track_state() {
632        let (_rx_unused_tx, rx_unused) = channel(1);
633        let (tx, mut out_rx) = channel(8);
634        let worker = Worker {
635            id: 5,
636            rx: rx_unused,
637            tx,
638            realtime_priority: 0,
639        };
640        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
641        track.set_level(-9.0);
642        track.set_balance(-0.3);
643        let state = make_state_with_track(track);
644        let job = OfflineBounceWork {
645            state: state.clone(),
646            track_name: "track".to_string(),
647            output_path: unique_temp_wav("cancel").to_string_lossy().to_string(),
648            start_sample: 0,
649            length_samples: 8,
650            tempo_bpm: 120.0,
651            tsig_num: 4,
652            tsig_denom: 4,
653            automation_lanes: vec![],
654            cancel: Arc::new(AtomicBool::new(true)),
655            apply_fader: false,
656        };
657
658        worker.process_offline_bounce(job).await;
659
660        match out_rx.recv().await.expect("message") {
661            Message::OfflineBounceFinished {
662                result: Ok(Action::TrackOfflineBounceCanceled { track_name }),
663            } => assert_eq!(track_name, "track"),
664            other => panic!("unexpected message: {other:?}"),
665        }
666        assert!(matches!(out_rx.recv().await, Some(Message::Ready(5))));
667        let track = state.lock().tracks.get("track").expect("track").lock();
668        assert_eq!(track.level(), -9.0);
669        assert_eq!(track.balance, -0.3);
670    }
671
672    #[tokio::test]
673    async fn process_offline_bounce_restores_track_state_on_write_failure() {
674        let (_rx_unused_tx, rx_unused) = channel(1);
675        let (tx, mut out_rx) = channel(8);
676        let worker = Worker {
677            id: 3,
678            rx: rx_unused,
679            tx,
680            realtime_priority: 0,
681        };
682        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
683        track.set_level(-4.0);
684        track.set_balance(0.25);
685        let state = make_state_with_track(track);
686        let output_path = std::env::temp_dir().to_string_lossy().to_string();
687        let job = OfflineBounceWork {
688            state: state.clone(),
689            track_name: "track".to_string(),
690            output_path,
691            start_sample: 0,
692            length_samples: 4,
693            tempo_bpm: 120.0,
694            tsig_num: 4,
695            tsig_denom: 4,
696            automation_lanes: vec![],
697            cancel: Arc::new(AtomicBool::new(false)),
698            apply_fader: false,
699        };
700
701        worker.process_offline_bounce(job).await;
702
703        let mut saw_error = false;
704        while let Some(message) = out_rx.recv().await {
705            match message {
706                Message::OfflineBounceFinished {
707                    result: Ok(Action::TrackOfflineBounceProgress { .. }),
708                } => {}
709                Message::OfflineBounceFinished { result: Err(err) } => {
710                    assert!(
711                        err.contains("Failed to create offline bounce")
712                            || err.contains("Failed to write offline bounce")
713                            || err.contains("Failed to finalize offline bounce")
714                    );
715                    saw_error = true;
716                }
717                Message::Ready(3) => break,
718                other => panic!("unexpected message: {other:?}"),
719            }
720        }
721        assert!(saw_error);
722        let track = state.lock().tracks.get("track").expect("track").lock();
723        assert_eq!(track.level(), -4.0);
724        assert_eq!(track.balance, 0.25);
725    }
726
727    #[tokio::test]
728    async fn process_offline_bounce_emits_progress_and_completion() {
729        let (_rx_unused_tx, rx_unused) = channel(1);
730        let (tx, mut out_rx) = channel(16);
731        let worker = Worker {
732            id: 2,
733            rx: rx_unused,
734            tx,
735            realtime_priority: 0,
736        };
737        let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 4, 48_000.0);
738        track.set_level(-3.0);
739        track.set_balance(0.4);
740        let state = make_state_with_track(track);
741        let output = unique_temp_wav("success");
742        let job = OfflineBounceWork {
743            state: state.clone(),
744            track_name: "track".to_string(),
745            output_path: output.to_string_lossy().to_string(),
746            start_sample: 0,
747            length_samples: 8,
748            tempo_bpm: 120.0,
749            tsig_num: 4,
750            tsig_denom: 4,
751            automation_lanes: vec![],
752            cancel: Arc::new(AtomicBool::new(false)),
753            apply_fader: false,
754        };
755
756        worker.process_offline_bounce(job).await;
757
758        let mut saw_progress = false;
759        let mut saw_complete = false;
760        let mut saw_ready = false;
761        while let Some(message) = out_rx.recv().await {
762            match message {
763                Message::OfflineBounceFinished {
764                    result:
765                        Ok(Action::TrackOfflineBounceProgress {
766                            track_name,
767                            progress,
768                            ..
769                        }),
770                } => {
771                    assert_eq!(track_name, "track");
772                    assert!(progress > 0.0);
773                    saw_progress = true;
774                }
775                Message::OfflineBounceFinished {
776                    result:
777                        Ok(Action::TrackOfflineBounce {
778                            track_name,
779                            output_path,
780                            ..
781                        }),
782                } => {
783                    assert_eq!(track_name, "track");
784                    assert_eq!(output_path, output.to_string_lossy());
785                    saw_complete = true;
786                }
787                Message::Ready(2) => {
788                    saw_ready = true;
789                    break;
790                }
791                other => panic!("unexpected message: {other:?}"),
792            }
793        }
794
795        assert!(saw_progress);
796        assert!(saw_complete);
797        assert!(saw_ready);
798        assert!(output.exists());
799        std::fs::remove_file(&output).expect("remove temp wav");
800        let track = state.lock().tracks.get("track").expect("track").lock();
801        assert_eq!(track.level(), -3.0);
802        assert_eq!(track.balance, 0.4);
803        assert!(!track.muted);
804    }
805}