// maolan_engine/workers/worker.rs

use crate::{
    message::{
        Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
        OfflineBounceWork, ProcessTask,
    },
    midi::io::MidiEvent,
};
#[cfg(unix)]
use nix::libc;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{Receiver, Sender};
14
15#[derive(Debug)]
16pub struct Worker {
17    id: usize,
18    rx: Receiver<Message>,
19    tx: Sender<Message>,
20    realtime_priority: i32,
21}
22
23impl Worker {
24    fn automation_lane_value_at(points: &[OfflineAutomationPoint], sample: usize) -> Option<f32> {
25        if points.is_empty() {
26            return None;
27        }
28        if sample <= points[0].sample {
29            return Some(points[0].value.clamp(0.0, 1.0));
30        }
31        if sample >= points[points.len().saturating_sub(1)].sample {
32            return Some(points[points.len().saturating_sub(1)].value.clamp(0.0, 1.0));
33        }
34        for segment in points.windows(2) {
35            let left = &segment[0];
36            let right = &segment[1];
37            if sample < left.sample || sample > right.sample {
38                continue;
39            }
40            let span = right.sample.saturating_sub(left.sample).max(1) as f32;
41            let t = (sample.saturating_sub(left.sample) as f32 / span).clamp(0.0, 1.0);
42            return Some((left.value + (right.value - left.value) * t).clamp(0.0, 1.0));
43        }
44        None
45    }
46
47    fn apply_freeze_automation_at_sample(
48        track: &mut crate::track::Track,
49        sample: usize,
50        lanes: &[OfflineAutomationLane],
51    ) {
52        for lane in lanes {
53            if matches!(
54                lane.target,
55                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance
56            ) {
57                continue;
58            }
59            let Some(value) = Self::automation_lane_value_at(&lane.points, sample) else {
60                continue;
61            };
62            match lane.target {
63                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance => {}
64                OfflineAutomationTarget::MidiCc { channel, cc } => {
65                    let cc_value = (value * 127.0).round() as u8;
66                    track.pending_automation_midi_events.push(MidiEvent::new(
67                        0,
68                        vec![0xB0 | channel.min(15), cc.min(127), cc_value],
69                    ));
70                }
71                #[cfg(all(unix, not(target_os = "macos")))]
72                OfflineAutomationTarget::Lv2Parameter {
73                    instance_id,
74                    index,
75                    min,
76                    max,
77                } => {
78                    let lo = min.min(max);
79                    let hi = max.max(min);
80                    let param_value = (lo + value * (hi - lo)).clamp(lo, hi);
81                    let _ = track.set_lv2_control_value(
82                        instance_id,
83                        index as usize,
84                        param_value as f64,
85                    );
86                }
87                OfflineAutomationTarget::Vst3Parameter {
88                    instance_id,
89                    param_id,
90                } => {
91                    let _ = track.set_vst3_parameter(instance_id, param_id, value.clamp(0.0, 1.0));
92                }
93                OfflineAutomationTarget::ClapParameter {
94                    instance_id,
95                    param_id,
96                    min,
97                    max,
98                } => {
99                    let lo = min.min(max);
100                    let hi = max.max(min);
101                    let param_value = (lo + value as f64 * (hi - lo)).clamp(lo, hi);
102                    let _ = track.set_clap_parameter_at(instance_id, param_id, param_value, 0);
103                }
104            }
105        }
106    }
107
108    fn prepare_track_for_freeze_render(track: &mut crate::track::Track) -> (f32, f32) {
109        let original_level = track.level();
110        let original_balance = track.balance;
111        track.set_level(0.0);
112        track.set_balance(0.0);
113        (original_level, original_balance)
114    }
115
116    fn restore_track_after_freeze_render(
117        track: &mut crate::track::Track,
118        original_level: f32,
119        original_balance: f32,
120    ) {
121        track.set_level(original_level);
122        track.set_balance(original_balance);
123    }
124
125    async fn process_offline_bounce(&self, job: OfflineBounceWork) {
126        let track_handle = job.state.lock().tracks.get(&job.track_name).cloned();
127        let Some(target_track) = track_handle else {
128            let _ = self
129                .tx
130                .send(Message::OfflineBounceFinished {
131                    result: Err(format!("Track not found: {}", job.track_name)),
132                })
133                .await;
134            return;
135        };
136        let (channels, block_size, sample_rate) = {
137            let t = target_track.lock();
138            let block_size = t
139                .audio
140                .outs
141                .first()
142                .map(|io| io.buffer.lock().len())
143                .or_else(|| t.audio.ins.first().map(|io| io.buffer.lock().len()))
144                .unwrap_or(0)
145                .max(1);
146            (
147                t.audio.outs.len().max(1),
148                block_size,
149                t.sample_rate.round().max(1.0) as i32,
150            )
151        };
152        let freeze_state = if job.apply_fader {
153            None
154        } else {
155            let t = target_track.lock();
156            Some(Self::prepare_track_for_freeze_render(t))
157        };
158
159        let all_tracks: Vec<_> = job.state.lock().tracks.values().cloned().collect();
160        let mut output_to_track: std::collections::HashMap<usize, String> =
161            std::collections::HashMap::new();
162        for handle in &all_tracks {
163            let t = handle.lock();
164            for out in &t.audio.outs {
165                output_to_track.insert(Arc::as_ptr(out) as usize, t.name.clone());
166            }
167        }
168        let mut relevant_names = HashSet::new();
169        let mut queue = vec![job.track_name.clone()];
170        while let Some(name) = queue.pop() {
171            if !relevant_names.insert(name.clone()) {
172                continue;
173            }
174            if let Some(handle) = all_tracks.iter().find(|h| h.lock().name == name) {
175                let t = handle.lock();
176                for input in &t.audio.ins {
177                    for conn in input.connections.lock().iter() {
178                        if let Some(source_name) =
179                            output_to_track.get(&(Arc::as_ptr(conn) as usize))
180                        {
181                            queue.push(source_name.clone());
182                        }
183                    }
184                }
185            }
186        }
187        let relevant_tracks: Vec<_> = all_tracks
188            .into_iter()
189            .filter(|h| relevant_names.contains(&h.lock().name))
190            .collect();
191
192        let mut output_samples =
193            Vec::<f32>::with_capacity(job.length_samples.saturating_mul(channels.max(1)));
194
195        let mut cursor = 0usize;
196        let mut last_reported_progress = 0.0_f32;
197        let mut total_process_time = Duration::ZERO;
198        let mut total_write_time = Duration::ZERO;
199        while cursor < job.length_samples {
200            if job.cancel.load(std::sync::atomic::Ordering::Relaxed) {
201                let _ = std::fs::remove_file(&job.output_path);
202                if let Some((original_level, original_balance)) = freeze_state {
203                    let t = target_track.lock();
204                    Self::restore_track_after_freeze_render(t, original_level, original_balance);
205                }
206                let _ = self
207                    .tx
208                    .send(Message::OfflineBounceFinished {
209                        result: Ok(Action::TrackOfflineBounceCanceled {
210                            track_name: job.track_name.clone(),
211                        }),
212                    })
213                    .await;
214                let _ = self.tx.send(Message::Ready(self.id)).await;
215                return;
216            }
217
218            let step = (job.length_samples - cursor).min(block_size);
219            for handle in &relevant_tracks {
220                let t = handle.lock();
221                t.audio.finished = false;
222                t.audio.processing = false;
223                t.set_transport_sample(job.start_sample.saturating_add(cursor));
224                t.set_loop_config(false, None);
225                t.set_transport_timing(job.tempo_bpm, job.tsig_num, job.tsig_denom);
226                t.set_clip_playback_enabled(true);
227                t.set_record_tap_enabled(false);
228            }
229
230            loop {
231                let mut all_finished = true;
232                let mut progressed = false;
233                for handle in &relevant_tracks {
234                    let t = handle.lock();
235                    if t.audio.finished {
236                        continue;
237                    }
238                    all_finished = false;
239                    if !t.audio.processing && t.audio.ready() {
240                        if t.name == job.track_name {
241                            Self::apply_freeze_automation_at_sample(
242                                t,
243                                job.start_sample.saturating_add(cursor),
244                                &job.automation_lanes,
245                            );
246                        }
247                        t.audio.processing = true;
248                        let p_start = Instant::now();
249                        t.process();
250                        total_process_time += p_start.elapsed();
251                        t.audio.processing = false;
252                        progressed = true;
253                    }
254                }
255                if all_finished {
256                    break;
257                }
258                if !progressed {
259                    for handle in &relevant_tracks {
260                        let t = handle.lock();
261                        if t.audio.finished {
262                            continue;
263                        }
264                        if t.name == job.track_name {
265                            Self::apply_freeze_automation_at_sample(
266                                t,
267                                job.start_sample.saturating_add(cursor),
268                                &job.automation_lanes,
269                            );
270                        }
271                        t.audio.processing = true;
272                        let p_start = Instant::now();
273                        t.process();
274                        total_process_time += p_start.elapsed();
275                        t.audio.processing = false;
276                    }
277                    break;
278                }
279            }
280
281            let write_start = Instant::now();
282            {
283                let t = target_track.lock();
284                let outs: Vec<_> = (0..channels)
285                    .map(|ch| t.audio.outs[ch].buffer.lock())
286                    .collect();
287                for i in 0..step {
288                    for out in outs.iter().take(channels) {
289                        let sample = out.get(i).copied().unwrap_or(0.0);
290                        output_samples.push(sample);
291                    }
292                }
293            }
294            total_write_time += write_start.elapsed();
295
296            cursor = cursor.saturating_add(step);
297            let progress = (cursor as f32 / job.length_samples as f32).clamp(0.0, 1.0);
298
299            if progress - last_reported_progress >= 0.01 || cursor >= job.length_samples {
300                last_reported_progress = progress;
301                let _ = self
302                    .tx
303                    .send(Message::OfflineBounceFinished {
304                        result: Ok(Action::TrackOfflineBounceProgress {
305                            track_name: job.track_name.clone(),
306                            progress,
307                            operation: Some("Rendering freeze".to_string()),
308                        }),
309                    })
310                    .await;
311            }
312        }
313
314        if let Err(e) = crate::audio_codec::write_wav_f32(
315            std::path::Path::new(&job.output_path),
316            &output_samples,
317            channels,
318            sample_rate as u32,
319        ) {
320            let _ = std::fs::remove_file(&job.output_path);
321            if let Some((original_level, original_balance)) = freeze_state {
322                let t = target_track.lock();
323                Self::restore_track_after_freeze_render(t, original_level, original_balance);
324            }
325            let _ = self
326                .tx
327                .send(Message::OfflineBounceFinished {
328                    result: Err(format!(
329                        "Failed to write offline bounce '{}': {e}",
330                        job.output_path
331                    )),
332                })
333                .await;
334            let _ = self.tx.send(Message::Ready(self.id)).await;
335            return;
336        }
337
338        if let Some((original_level, original_balance)) = freeze_state {
339            let t = target_track.lock();
340            Self::restore_track_after_freeze_render(t, original_level, original_balance);
341        }
342
343        let _ = self
344            .tx
345            .send(Message::OfflineBounceFinished {
346                result: Ok(Action::TrackOfflineBounce {
347                    track_name: job.track_name,
348                    output_path: job.output_path,
349                    start_sample: job.start_sample,
350                    length_samples: job.length_samples,
351                    automation_lanes: vec![],
352                    apply_fader: job.apply_fader,
353                }),
354            })
355            .await;
356        let _ = self.tx.send(Message::Ready(self.id)).await;
357    }
358
359    #[cfg(unix)]
360    fn try_enable_realtime(priority: i32) -> Result<(), String> {
361        let thread = unsafe { libc::pthread_self() };
362        let policy = libc::SCHED_FIFO;
363        let param = unsafe {
364            let mut p = std::mem::zeroed::<libc::sched_param>();
365            p.sched_priority = priority;
366            p
367        };
368        let rc = unsafe { libc::pthread_setschedparam(thread, policy, &param) };
369        if rc == 0 {
370            Ok(())
371        } else {
372            Err(format!("pthread_setschedparam failed with errno {}", rc))
373        }
374    }
375
376    #[cfg(not(unix))]
377    fn try_enable_realtime(_priority: i32) -> Result<(), String> {
378        Err("Realtime thread priority is not supported on this platform".to_string())
379    }
380
381    pub async fn new(
382        id: usize,
383        rx: Receiver<Message>,
384        tx: Sender<Message>,
385        realtime_priority: i32,
386    ) -> Worker {
387        let worker = Worker {
388            id,
389            rx,
390            tx,
391            realtime_priority,
392        };
393        worker.send(Message::Ready(id)).await;
394        worker
395    }
396
397    pub async fn send(&self, message: Message) {
398        self.tx
399            .send(message)
400            .await
401            .expect("Failed to send message from worker");
402    }
403
404    pub async fn work(&mut self) {
405        crate::enable_flush_denormals_to_zero();
406        if let Err(e) = Self::try_enable_realtime(self.realtime_priority) {
407            tracing::warn!(
408                "Worker {} realtime priority {} not enabled: {}",
409                self.id,
410                self.realtime_priority,
411                e
412            );
413        }
414        while let Some(message) = self.rx.recv().await {
415            match message {
416                Message::Request(Action::Quit) => {
417                    return;
418                }
419                Message::ProcessTask(task) => {
420                    tracing::debug!("worker {} received task {:?}", self.id, task);
421                    let (output_linear, process_epoch, parameter_updates) = match &task {
422                        ProcessTask::Track(t) => {
423                            let track = t.lock();
424                            let process_epoch = track.process_epoch;
425                            track.process();
426                            track.audio.processing = false;
427                            let updates = std::mem::take(track.echoed_parameter_updates.lock());
428                            (track.output_meter_linear(), process_epoch, updates)
429                        }
430                        ProcessTask::FolderInput(t) => {
431                            let track = t.lock();
432                            let process_epoch = track.process_epoch;
433                            track.process_folder_input();
434                            track.audio.processing = false;
435                            let updates = std::mem::take(track.echoed_parameter_updates.lock());
436                            (track.output_meter_linear(), process_epoch, updates)
437                        }
438                        ProcessTask::FolderOutput(t) => {
439                            let track = t.lock();
440                            let process_epoch = track.process_epoch;
441                            track.process_folder_output();
442                            track.audio.processing = false;
443                            let updates = std::mem::take(track.echoed_parameter_updates.lock());
444                            (track.output_meter_linear(), process_epoch, updates)
445                        }
446                        ProcessTask::Plugin { track, kind, index } => {
447                            let track = track.lock();
448                            let process_epoch = track.process_epoch;
449                            track.process_plugin(*kind, *index);
450                            track.audio.processing = false;
451                            let updates = std::mem::take(track.echoed_parameter_updates.lock());
452                            (track.output_meter_linear(), process_epoch, updates)
453                        }
454                    };
455                    tracing::debug!(
456                        "worker {} finished task {:?}, output_linear={:?}",
457                        self.id,
458                        task,
459                        output_linear
460                    );
461                    let _ = self
462                        .tx
463                        .send(Message::Finished {
464                            worker_id: self.id,
465                            task,
466                            output_linear,
467                            process_epoch,
468                            parameter_updates,
469                        })
470                        .await;
471                }
472                Message::ProcessOfflineBounce(job) => {
473                    self.process_offline_bounce(job).await;
474                }
475                _ => {}
476            }
477        }
478    }
479}
480
#[cfg(test)]
mod tests {
    use super::Worker;
    use crate::message::{
        Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
        OfflineBounceWork,
    };
    use crate::mutex::UnsafeMutex;
    use crate::state::State;
    use crate::track::Track;
    use std::path::PathBuf;
    use std::sync::{Arc, atomic::AtomicBool};
    use std::time::{SystemTime, UNIX_EPOCH};
    use tokio::sync::mpsc::{Receiver, channel};

    /// Wraps `track` in a fresh engine state, keyed by the track's name.
    fn make_state_with_track(track: Track) -> Arc<UnsafeMutex<State>> {
        let mut state = State::default();
        state.tracks.insert(
            track.name.clone(),
            Arc::new(UnsafeMutex::new(Box::new(track))),
        );
        Arc::new(UnsafeMutex::new(state))
    }

    /// Returns a temp-dir WAV path made unique with the current time in nanoseconds.
    fn unique_temp_wav(name: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock")
            .as_nanos();
        std::env::temp_dir().join(format!("maolan_{name}_{nanos}.wav"))
    }

    /// Builds a worker whose incoming channel is never used, together with the receiving end
    /// of its outgoing channel (buffered to `capacity` messages).
    fn make_worker(id: usize, capacity: usize) -> (Worker, Receiver<Message>) {
        let (_unused_tx, unused_rx) = channel(1);
        let (tx, out_rx) = channel(capacity);
        let worker = Worker {
            id,
            rx: unused_rx,
            tx,
            realtime_priority: 0,
        };
        (worker, out_rx)
    }

    /// Builds a bounce job starting at sample 0 with 120 BPM, 4/4, no automation and the fader
    /// bypassed; only the parts that differ between tests are parameters.
    fn make_job(
        state: Arc<UnsafeMutex<State>>,
        track_name: &str,
        output_path: String,
        length_samples: usize,
        cancelled: bool,
    ) -> OfflineBounceWork {
        OfflineBounceWork {
            state,
            track_name: track_name.to_string(),
            output_path,
            start_sample: 0,
            length_samples,
            tempo_bpm: 120.0,
            tsig_num: 4,
            tsig_denom: 4,
            automation_lanes: vec![],
            cancel: Arc::new(AtomicBool::new(cancelled)),
            apply_fader: false,
        }
    }

    #[test]
    fn prepare_track_for_freeze_render_neutralizes_level_and_balance() {
        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 64, 48_000.0);
        track.set_level(-6.0);
        track.set_balance(0.35);

        let (level, balance) = Worker::prepare_track_for_freeze_render(&mut track);

        assert_eq!(level, -6.0);
        assert_eq!(balance, 0.35);
        assert_eq!(track.level(), 0.0);
        assert_eq!(track.balance, 0.0);

        Worker::restore_track_after_freeze_render(&mut track, level, balance);
        assert_eq!(track.level(), -6.0);
        assert_eq!(track.balance, 0.35);
    }

    #[test]
    fn freeze_automation_ignores_volume_and_balance_lanes() {
        let mut track = Track::new("track".to_string(), 1, 2, 0, 1, 64, 48_000.0);
        let lanes = vec![
            OfflineAutomationLane {
                target: OfflineAutomationTarget::Volume,
                points: vec![OfflineAutomationPoint {
                    sample: 0,
                    value: 0.0,
                }],
            },
            OfflineAutomationLane {
                target: OfflineAutomationTarget::Balance,
                points: vec![OfflineAutomationPoint {
                    sample: 0,
                    value: 1.0,
                }],
            },
            OfflineAutomationLane {
                target: OfflineAutomationTarget::MidiCc { channel: 0, cc: 7 },
                points: vec![OfflineAutomationPoint {
                    sample: 0,
                    value: 1.0,
                }],
            },
        ];

        Worker::apply_freeze_automation_at_sample(&mut track, 0, &lanes);

        assert_eq!(track.level(), 0.0);
        assert_eq!(track.balance, 0.0);
        assert_eq!(track.pending_automation_midi_events.len(), 1);
        assert_eq!(
            track.pending_automation_midi_events[0].data,
            vec![0xB0, 7, 127]
        );
    }

    #[test]
    fn automation_lane_value_at_interpolates_between_points() {
        let value = Worker::automation_lane_value_at(
            &[
                OfflineAutomationPoint {
                    sample: 10,
                    value: 0.25,
                },
                OfflineAutomationPoint {
                    sample: 20,
                    value: 0.75,
                },
            ],
            15,
        )
        .expect("value");

        assert!((value - 0.5).abs() < 1.0e-6);
    }

    #[test]
    fn freeze_automation_applies_interpolated_midi_cc_lane() {
        let mut track = Track::new("track".to_string(), 1, 1, 0, 1, 64, 48_000.0);
        let lanes = vec![OfflineAutomationLane {
            target: OfflineAutomationTarget::MidiCc { channel: 0, cc: 7 },
            points: vec![
                OfflineAutomationPoint {
                    sample: 0,
                    value: 0.0,
                },
                OfflineAutomationPoint {
                    sample: 10,
                    value: 1.0,
                },
            ],
        }];

        Worker::apply_freeze_automation_at_sample(&mut track, 5, &lanes);
        assert_eq!(track.pending_automation_midi_events.len(), 1);
        assert_eq!(track.pending_automation_midi_events[0].data[2], 64);

        track.pending_automation_midi_events.clear();
        Worker::apply_freeze_automation_at_sample(&mut track, 2, &lanes);
        assert_eq!(track.pending_automation_midi_events.len(), 1);
        assert_eq!(track.pending_automation_midi_events[0].data[2], 25);
    }

    #[tokio::test]
    async fn process_offline_bounce_errors_when_track_is_missing() {
        let (worker, mut out_rx) = make_worker(7, 8);
        let job = make_job(
            Arc::new(UnsafeMutex::new(State::default())),
            "missing",
            unique_temp_wav("missing").to_string_lossy().to_string(),
            8,
            false,
        );

        worker.process_offline_bounce(job).await;

        match out_rx.recv().await.expect("message") {
            Message::OfflineBounceFinished { result: Err(err) } => {
                assert!(err.contains("Track not found: missing"));
            }
            other => panic!("unexpected message: {other:?}"),
        }
    }

    #[tokio::test]
    async fn process_offline_bounce_cancels_and_restores_track_state() {
        let (worker, mut out_rx) = make_worker(5, 8);
        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
        track.set_level(-9.0);
        track.set_balance(-0.3);
        let state = make_state_with_track(track);
        let job = make_job(
            state.clone(),
            "track",
            unique_temp_wav("cancel").to_string_lossy().to_string(),
            8,
            true,
        );

        worker.process_offline_bounce(job).await;

        match out_rx.recv().await.expect("message") {
            Message::OfflineBounceFinished {
                result: Ok(Action::TrackOfflineBounceCanceled { track_name }),
            } => assert_eq!(track_name, "track"),
            other => panic!("unexpected message: {other:?}"),
        }
        assert!(matches!(out_rx.recv().await, Some(Message::Ready(5))));
        let track = state.lock().tracks.get("track").expect("track").lock();
        assert_eq!(track.level(), -9.0);
        assert_eq!(track.balance, -0.3);
    }

    #[tokio::test]
    async fn process_offline_bounce_restores_track_state_on_write_failure() {
        let (worker, mut out_rx) = make_worker(3, 8);
        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
        track.set_level(-4.0);
        track.set_balance(0.25);
        let state = make_state_with_track(track);
        // The temp directory itself is used as the output path so that writing the file fails.
        let output_path = std::env::temp_dir().to_string_lossy().to_string();
        let job = make_job(state.clone(), "track", output_path, 4, false);

        worker.process_offline_bounce(job).await;

        let mut saw_error = false;
        while let Some(message) = out_rx.recv().await {
            match message {
                Message::OfflineBounceFinished {
                    result: Ok(Action::TrackOfflineBounceProgress { .. }),
                } => {}
                Message::OfflineBounceFinished { result: Err(err) } => {
                    assert!(
                        err.contains("Failed to create offline bounce")
                            || err.contains("Failed to write offline bounce")
                            || err.contains("Failed to finalize offline bounce")
                    );
                    saw_error = true;
                }
                Message::Ready(3) => break,
                other => panic!("unexpected message: {other:?}"),
            }
        }
        assert!(saw_error);
        let track = state.lock().tracks.get("track").expect("track").lock();
        assert_eq!(track.level(), -4.0);
        assert_eq!(track.balance, 0.25);
    }

    #[tokio::test]
    async fn process_offline_bounce_emits_progress_and_completion() {
        let (worker, mut out_rx) = make_worker(2, 16);
        let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 4, 48_000.0);
        track.set_level(-3.0);
        track.set_balance(0.4);
        let state = make_state_with_track(track);
        let output = unique_temp_wav("success");
        let job = make_job(
            state.clone(),
            "track",
            output.to_string_lossy().to_string(),
            8,
            false,
        );

        worker.process_offline_bounce(job).await;

        let mut saw_progress = false;
        let mut saw_complete = false;
        let mut saw_ready = false;
        while let Some(message) = out_rx.recv().await {
            match message {
                Message::OfflineBounceFinished {
                    result:
                        Ok(Action::TrackOfflineBounceProgress {
                            track_name,
                            progress,
                            ..
                        }),
                } => {
                    assert_eq!(track_name, "track");
                    assert!(progress > 0.0);
                    saw_progress = true;
                }
                Message::OfflineBounceFinished {
                    result:
                        Ok(Action::TrackOfflineBounce {
                            track_name,
                            output_path,
                            ..
                        }),
                } => {
                    assert_eq!(track_name, "track");
                    assert_eq!(output_path, output.to_string_lossy());
                    saw_complete = true;
                }
                Message::Ready(2) => {
                    saw_ready = true;
                    break;
                }
                other => panic!("unexpected message: {other:?}"),
            }
        }

        assert!(saw_progress);
        assert!(saw_complete);
        assert!(saw_ready);
        assert!(output.exists());
        std::fs::remove_file(&output).expect("remove temp wav");
        let track = state.lock().tracks.get("track").expect("track").lock();
        assert_eq!(track.level(), -3.0);
        assert_eq!(track.balance, 0.4);
        assert!(!track.muted);
    }
}