Skip to main content

maolan_engine/workers/
worker.rs

1use crate::message::{
2    Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
3    OfflineBounceWork,
4};
5#[cfg(unix)]
6use nix::libc;
7use std::collections::HashSet;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::mpsc::{Receiver, Sender};
11use tracing::{error, info};
12
13#[derive(Debug)]
14pub struct Worker {
15    id: usize,
16    rx: Receiver<Message>,
17    tx: Sender<Message>,
18}
19
20impl Worker {
21    fn automation_lane_value_at(points: &[OfflineAutomationPoint], sample: usize) -> Option<f32> {
22        if points.is_empty() {
23            return None;
24        }
25        if sample <= points[0].sample {
26            return Some(points[0].value.clamp(0.0, 1.0));
27        }
28        if sample >= points[points.len().saturating_sub(1)].sample {
29            return Some(points[points.len().saturating_sub(1)].value.clamp(0.0, 1.0));
30        }
31        for segment in points.windows(2) {
32            let left = &segment[0];
33            let right = &segment[1];
34            if sample < left.sample || sample > right.sample {
35                continue;
36            }
37            let span = right.sample.saturating_sub(left.sample).max(1) as f32;
38            let t = (sample.saturating_sub(left.sample) as f32 / span).clamp(0.0, 1.0);
39            return Some((left.value + (right.value - left.value) * t).clamp(0.0, 1.0));
40        }
41        None
42    }
43
44    fn apply_freeze_automation_at_sample(
45        track: &mut crate::track::Track,
46        sample: usize,
47        lanes: &[OfflineAutomationLane],
48    ) {
49        for lane in lanes {
50            if matches!(
51                lane.target,
52                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance
53            ) {
54                continue;
55            }
56            let Some(value) = Self::automation_lane_value_at(&lane.points, sample) else {
57                continue;
58            };
59            match lane.target {
60                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance => {}
61                OfflineAutomationTarget::Mute => {
62                    track.set_muted(value >= 0.5);
63                }
64                #[cfg(all(unix, not(target_os = "macos")))]
65                OfflineAutomationTarget::Lv2Parameter {
66                    instance_id,
67                    index,
68                    min,
69                    max,
70                } => {
71                    let lo = min.min(max);
72                    let hi = max.max(min);
73                    let param_value = (lo + value * (hi - lo)).clamp(lo, hi);
74                    let _ = track.set_lv2_control_value(instance_id, index, param_value);
75                }
76                OfflineAutomationTarget::Vst3Parameter {
77                    instance_id,
78                    param_id,
79                } => {
80                    let _ = track.set_vst3_parameter(instance_id, param_id, value.clamp(0.0, 1.0));
81                }
82                OfflineAutomationTarget::ClapParameter {
83                    instance_id,
84                    param_id,
85                    min,
86                    max,
87                } => {
88                    let lo = min.min(max);
89                    let hi = max.max(min);
90                    let param_value = (lo + value as f64 * (hi - lo)).clamp(lo, hi);
91                    let _ = track.set_clap_parameter_at(instance_id, param_id, param_value, 0);
92                }
93            }
94        }
95    }
96
97    fn prepare_track_for_freeze_render(track: &mut crate::track::Track) -> (f32, f32) {
98        let original_level = track.level();
99        let original_balance = track.balance;
100        track.set_level(0.0);
101        track.set_balance(0.0);
102        (original_level, original_balance)
103    }
104
105    fn restore_track_after_freeze_render(
106        track: &mut crate::track::Track,
107        original_level: f32,
108        original_balance: f32,
109    ) {
110        track.set_level(original_level);
111        track.set_balance(original_balance);
112    }
113
114    async fn process_offline_bounce(&self, job: OfflineBounceWork) {
115        let track_handle = job.state.lock().tracks.get(&job.track_name).cloned();
116        let Some(target_track) = track_handle else {
117            let _ = self
118                .tx
119                .send(Message::OfflineBounceFinished {
120                    result: Err(format!("Track not found: {}", job.track_name)),
121                })
122                .await;
123            return;
124        };
125        let (channels, block_size, sample_rate) = {
126            let t = target_track.lock();
127            let block_size = t
128                .audio
129                .outs
130                .first()
131                .map(|io| io.buffer.lock().len())
132                .or_else(|| t.audio.ins.first().map(|io| io.buffer.lock().len()))
133                .unwrap_or(0)
134                .max(1);
135            (
136                t.audio.outs.len().max(1),
137                block_size,
138                t.sample_rate.round().max(1.0) as i32,
139            )
140        };
141        let freeze_state = if job.apply_fader {
142            None
143        } else {
144            let t = target_track.lock();
145            Some(Self::prepare_track_for_freeze_render(t))
146        };
147
148        // Collect all tracks once and compute the dependency closure for the
149        // target track so that unrelated tracks are skipped during bounce.
150        let all_tracks: Vec<_> = job.state.lock().tracks.values().cloned().collect();
151        let mut output_to_track: std::collections::HashMap<usize, String> =
152            std::collections::HashMap::new();
153        for handle in &all_tracks {
154            let t = handle.lock();
155            for out in &t.audio.outs {
156                output_to_track.insert(Arc::as_ptr(out) as usize, t.name.clone());
157            }
158        }
159        let mut relevant_names = HashSet::new();
160        let mut queue = vec![job.track_name.clone()];
161        while let Some(name) = queue.pop() {
162            if !relevant_names.insert(name.clone()) {
163                continue;
164            }
165            if let Some(handle) = all_tracks.iter().find(|h| h.lock().name == name) {
166                let t = handle.lock();
167                for input in &t.audio.ins {
168                    for conn in input.connections.lock().iter() {
169                        if let Some(source_name) =
170                            output_to_track.get(&(Arc::as_ptr(conn) as usize))
171                        {
172                            queue.push(source_name.clone());
173                        }
174                    }
175                }
176            }
177        }
178        let relevant_tracks: Vec<_> = all_tracks
179            .into_iter()
180            .filter(|h| relevant_names.contains(&h.lock().name))
181            .collect();
182
183        let spec = hound::WavSpec {
184            channels: channels as u16,
185            sample_rate: sample_rate as u32,
186            bits_per_sample: 32,
187            sample_format: hound::SampleFormat::Float,
188        };
189        let mut writer = match hound::WavWriter::create(&job.output_path, spec) {
190            Ok(w) => w,
191            Err(e) => {
192                if let Some((original_level, original_balance)) = freeze_state {
193                    let t = target_track.lock();
194                    Self::restore_track_after_freeze_render(t, original_level, original_balance);
195                }
196                let _ = self
197                    .tx
198                    .send(Message::OfflineBounceFinished {
199                        result: Err(format!(
200                            "Failed to create offline bounce '{}': {e}",
201                            job.output_path
202                        )),
203                    })
204                    .await;
205                let _ = self.tx.send(Message::Ready(self.id)).await;
206                return;
207            }
208        };
209
210        let mut cursor = 0usize;
211        let mut last_reported_progress = 0.0_f32;
212        let mut total_process_time = Duration::ZERO;
213        let mut total_write_time = Duration::ZERO;
214        let mut block_count = 0usize;
215        let bounce_start = Instant::now();
216        while cursor < job.length_samples {
217            if job.cancel.load(std::sync::atomic::Ordering::Relaxed) {
218                let _ = writer.finalize();
219                let _ = std::fs::remove_file(&job.output_path);
220                if let Some((original_level, original_balance)) = freeze_state {
221                    let t = target_track.lock();
222                    Self::restore_track_after_freeze_render(t, original_level, original_balance);
223                }
224                let _ = self
225                    .tx
226                    .send(Message::OfflineBounceFinished {
227                        result: Ok(Action::TrackOfflineBounceCanceled {
228                            track_name: job.track_name.clone(),
229                        }),
230                    })
231                    .await;
232                let _ = self.tx.send(Message::Ready(self.id)).await;
233                return;
234            }
235
236            let step = (job.length_samples - cursor).min(block_size);
237            for handle in &relevant_tracks {
238                let t = handle.lock();
239                t.audio.finished = false;
240                t.audio.processing = false;
241                t.set_transport_sample(job.start_sample.saturating_add(cursor));
242                t.set_loop_config(false, None);
243                t.set_transport_timing(job.tempo_bpm, job.tsig_num, job.tsig_denom);
244                t.set_clip_playback_enabled(true);
245                t.set_record_tap_enabled(false);
246            }
247
248            let block_process_start = Instant::now();
249            loop {
250                let mut all_finished = true;
251                let mut progressed = false;
252                for handle in &relevant_tracks {
253                    let t = handle.lock();
254                    if t.audio.finished {
255                        continue;
256                    }
257                    all_finished = false;
258                    if !t.audio.processing && t.audio.ready() {
259                        if t.name == job.track_name {
260                            Self::apply_freeze_automation_at_sample(
261                                t,
262                                job.start_sample.saturating_add(cursor),
263                                &job.automation_lanes,
264                            );
265                        }
266                        t.audio.processing = true;
267                        let p_start = Instant::now();
268                        t.process();
269                        total_process_time += p_start.elapsed();
270                        t.audio.processing = false;
271                        progressed = true;
272                    }
273                }
274                if all_finished {
275                    break;
276                }
277                if !progressed {
278                    // No track was ready — force-process all remaining non-finished tracks.
279                    for handle in &relevant_tracks {
280                        let t = handle.lock();
281                        if t.audio.finished {
282                            continue;
283                        }
284                        if t.name == job.track_name {
285                            Self::apply_freeze_automation_at_sample(
286                                t,
287                                job.start_sample.saturating_add(cursor),
288                                &job.automation_lanes,
289                            );
290                        }
291                        t.audio.processing = true;
292                        let p_start = Instant::now();
293                        t.process();
294                        total_process_time += p_start.elapsed();
295                        t.audio.processing = false;
296                    }
297                    break;
298                }
299            }
300            let _block_process_elapsed = block_process_start.elapsed();
301
302            let write_start = Instant::now();
303            let write_result = {
304                let t = target_track.lock();
305                let outs: Vec<_> = (0..channels)
306                    .map(|ch| t.audio.outs[ch].buffer.lock())
307                    .collect();
308                (|| -> Result<(), hound::Error> {
309                    for i in 0..step {
310                        for out in outs.iter().take(channels) {
311                            let sample = out.get(i).copied().unwrap_or(0.0);
312                            writer.write_sample(sample)?;
313                        }
314                    }
315                    Ok(())
316                })()
317            };
318            total_write_time += write_start.elapsed();
319            if let Err(e) = write_result {
320                let _ = writer.finalize();
321                let _ = std::fs::remove_file(&job.output_path);
322                if let Some((original_level, original_balance)) = freeze_state {
323                    let t = target_track.lock();
324                    Self::restore_track_after_freeze_render(t, original_level, original_balance);
325                }
326                let _ = self
327                    .tx
328                    .send(Message::OfflineBounceFinished {
329                        result: Err(format!(
330                            "Failed to write offline bounce '{}': {e}",
331                            job.output_path
332                        )),
333                    })
334                    .await;
335                let _ = self.tx.send(Message::Ready(self.id)).await;
336                return;
337            }
338
339            cursor = cursor.saturating_add(step);
340            block_count += 1;
341            let progress = (cursor as f32 / job.length_samples as f32).clamp(0.0, 1.0);
342            // Throttle progress reports to ~1 % steps to avoid async channel overhead.
343            if progress - last_reported_progress >= 0.01 || cursor >= job.length_samples {
344                last_reported_progress = progress;
345                let _ = self
346                    .tx
347                    .send(Message::OfflineBounceFinished {
348                        result: Ok(Action::TrackOfflineBounceProgress {
349                            track_name: job.track_name.clone(),
350                            progress,
351                            operation: Some("Rendering freeze".to_string()),
352                        }),
353                    })
354                    .await;
355            }
356        }
357        let bounce_elapsed = bounce_start.elapsed();
358        info!(
359            "Bounce '{}' — total: {:?}, blocks: {}, process: {:?}, write: {:?}",
360            job.track_name, bounce_elapsed, block_count, total_process_time, total_write_time
361        );
362
363        if let Err(e) = writer.finalize() {
364            let _ = std::fs::remove_file(&job.output_path);
365            if let Some((original_level, original_balance)) = freeze_state {
366                let t = target_track.lock();
367                Self::restore_track_after_freeze_render(t, original_level, original_balance);
368            }
369            let _ = self
370                .tx
371                .send(Message::OfflineBounceFinished {
372                    result: Err(format!(
373                        "Failed to finalize offline bounce '{}': {e}",
374                        job.output_path
375                    )),
376                })
377                .await;
378            let _ = self.tx.send(Message::Ready(self.id)).await;
379            return;
380        }
381
382        if let Some((original_level, original_balance)) = freeze_state {
383            let t = target_track.lock();
384            Self::restore_track_after_freeze_render(t, original_level, original_balance);
385        }
386
387        let _ = self
388            .tx
389            .send(Message::OfflineBounceFinished {
390                result: Ok(Action::TrackOfflineBounce {
391                    track_name: job.track_name,
392                    output_path: job.output_path,
393                    start_sample: job.start_sample,
394                    length_samples: job.length_samples,
395                    automation_lanes: vec![],
396                    apply_fader: job.apply_fader,
397                }),
398            })
399            .await;
400        let _ = self.tx.send(Message::Ready(self.id)).await;
401    }
402
403    #[cfg(unix)]
404    fn try_enable_realtime() -> Result<(), String> {
405        let thread = unsafe { libc::pthread_self() };
406        let policy = libc::SCHED_FIFO;
407        let param = unsafe {
408            let mut p = std::mem::zeroed::<libc::sched_param>();
409            p.sched_priority = 10;
410            p
411        };
412        let rc = unsafe { libc::pthread_setschedparam(thread, policy, &param) };
413        if rc == 0 {
414            Ok(())
415        } else {
416            Err(format!("pthread_setschedparam failed with errno {}", rc))
417        }
418    }
419
420    #[cfg(not(unix))]
421    fn try_enable_realtime() -> Result<(), String> {
422        Err("Realtime thread priority is not supported on this platform".to_string())
423    }
424
425    pub async fn new(id: usize, rx: Receiver<Message>, tx: Sender<Message>) -> Worker {
426        let worker = Worker { id, rx, tx };
427        worker.send(Message::Ready(id)).await;
428        worker
429    }
430
431    pub async fn send(&self, message: Message) {
432        self.tx
433            .send(message)
434            .await
435            .expect("Failed to send message from worker");
436    }
437
438    pub async fn work(&mut self) {
439        if let Err(e) = Self::try_enable_realtime() {
440            error!("Worker {} realtime priority not enabled: {}", self.id, e);
441        }
442        while let Some(message) = self.rx.recv().await {
443            match message {
444                Message::Request(Action::Quit) => {
445                    return;
446                }
447                Message::ProcessTrack(t) => {
448                    let (track_name, output_linear, process_epoch) = {
449                        let track = t.lock();
450                        let process_epoch = track.process_epoch;
451                        let started = Instant::now();
452                        track.process();
453                        let elapsed = started.elapsed();
454                        if elapsed.as_millis() > 20 {
455                            tracing::warn!(
456                                "Slow track process '{}' took {:.3} ms",
457                                track.name,
458                                elapsed.as_secs_f64() * 1000.0
459                            );
460                        }
461                        track.audio.processing = false;
462                        (
463                            track.name.clone(),
464                            track.output_meter_linear(),
465                            process_epoch,
466                        )
467                    };
468                    match self
469                        .tx
470                        .send(Message::Finished {
471                            worker_id: self.id,
472                            track_name,
473                            output_linear,
474                            process_epoch,
475                        })
476                        .await
477                    {
478                        Ok(_) => {}
479                        Err(e) => {
480                            error!("Error while sending Finished: {}", e);
481                        }
482                    }
483                }
484                Message::ProcessOfflineBounce(job) => {
485                    self.process_offline_bounce(job).await;
486                }
487                _ => {}
488            }
489        }
490    }
491}
492
493#[cfg(test)]
494mod tests {
495    use super::Worker;
496    use crate::message::{
497        Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
498        OfflineBounceWork,
499    };
500    use crate::mutex::UnsafeMutex;
501    use crate::state::State;
502    use crate::track::Track;
503    use std::path::PathBuf;
504    use std::sync::{Arc, atomic::AtomicBool};
505    use std::time::{SystemTime, UNIX_EPOCH};
506    use tokio::sync::mpsc::channel;
507
508    fn make_state_with_track(track: Track) -> Arc<UnsafeMutex<State>> {
509        let mut state = State::default();
510        state.tracks.insert(
511            track.name.clone(),
512            Arc::new(UnsafeMutex::new(Box::new(track))),
513        );
514        Arc::new(UnsafeMutex::new(state))
515    }
516
517    fn unique_temp_wav(name: &str) -> PathBuf {
518        let nanos = SystemTime::now()
519            .duration_since(UNIX_EPOCH)
520            .expect("clock")
521            .as_nanos();
522        std::env::temp_dir().join(format!("maolan_{name}_{nanos}.wav"))
523    }
524
525    #[test]
526    fn prepare_track_for_freeze_render_neutralizes_level_and_balance() {
527        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 64, 48_000.0);
528        track.set_level(-6.0);
529        track.set_balance(0.35);
530
531        let (level, balance) = Worker::prepare_track_for_freeze_render(&mut track);
532
533        assert_eq!(level, -6.0);
534        assert_eq!(balance, 0.35);
535        assert_eq!(track.level(), 0.0);
536        assert_eq!(track.balance, 0.0);
537
538        Worker::restore_track_after_freeze_render(&mut track, level, balance);
539        assert_eq!(track.level(), -6.0);
540        assert_eq!(track.balance, 0.35);
541    }
542
543    #[test]
544    fn freeze_automation_ignores_volume_and_balance_lanes() {
545        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 64, 48_000.0);
546        let lanes = vec![
547            OfflineAutomationLane {
548                target: OfflineAutomationTarget::Volume,
549                points: vec![OfflineAutomationPoint {
550                    sample: 0,
551                    value: 0.0,
552                }],
553            },
554            OfflineAutomationLane {
555                target: OfflineAutomationTarget::Balance,
556                points: vec![OfflineAutomationPoint {
557                    sample: 0,
558                    value: 1.0,
559                }],
560            },
561            OfflineAutomationLane {
562                target: OfflineAutomationTarget::Mute,
563                points: vec![OfflineAutomationPoint {
564                    sample: 0,
565                    value: 1.0,
566                }],
567            },
568        ];
569
570        Worker::apply_freeze_automation_at_sample(&mut track, 0, &lanes);
571
572        assert_eq!(track.level(), 0.0);
573        assert_eq!(track.balance, 0.0);
574        assert!(track.muted);
575    }
576
577    #[test]
578    fn automation_lane_value_at_interpolates_between_points() {
579        let value = Worker::automation_lane_value_at(
580            &[
581                OfflineAutomationPoint {
582                    sample: 10,
583                    value: 0.25,
584                },
585                OfflineAutomationPoint {
586                    sample: 20,
587                    value: 0.75,
588                },
589            ],
590            15,
591        )
592        .expect("value");
593
594        assert!((value - 0.5).abs() < 1.0e-6);
595    }
596
597    #[test]
598    fn freeze_automation_applies_interpolated_mute_lane() {
599        let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 64, 48_000.0);
600        let lanes = vec![OfflineAutomationLane {
601            target: OfflineAutomationTarget::Mute,
602            points: vec![
603                OfflineAutomationPoint {
604                    sample: 0,
605                    value: 0.0,
606                },
607                OfflineAutomationPoint {
608                    sample: 10,
609                    value: 1.0,
610                },
611            ],
612        }];
613
614        Worker::apply_freeze_automation_at_sample(&mut track, 5, &lanes);
615        assert!(track.muted);
616
617        track.set_muted(false);
618        Worker::apply_freeze_automation_at_sample(&mut track, 2, &lanes);
619        assert!(!track.muted);
620    }
621
622    #[tokio::test]
623    async fn process_offline_bounce_errors_when_track_is_missing() {
624        let (_rx_unused_tx, rx_unused) = channel(1);
625        let (tx, mut out_rx) = channel(8);
626        let worker = Worker {
627            id: 7,
628            rx: rx_unused,
629            tx,
630        };
631        let job = OfflineBounceWork {
632            state: Arc::new(UnsafeMutex::new(State::default())),
633            track_name: "missing".to_string(),
634            output_path: unique_temp_wav("missing").to_string_lossy().to_string(),
635            start_sample: 0,
636            length_samples: 8,
637            tempo_bpm: 120.0,
638            tsig_num: 4,
639            tsig_denom: 4,
640            automation_lanes: vec![],
641            cancel: Arc::new(AtomicBool::new(false)),
642            apply_fader: false,
643        };
644
645        worker.process_offline_bounce(job).await;
646
647        match out_rx.recv().await.expect("message") {
648            Message::OfflineBounceFinished { result: Err(err) } => {
649                assert!(err.contains("Track not found: missing"));
650            }
651            other => panic!("unexpected message: {other:?}"),
652        }
653    }
654
655    #[tokio::test]
656    async fn process_offline_bounce_cancels_and_restores_track_state() {
657        let (_rx_unused_tx, rx_unused) = channel(1);
658        let (tx, mut out_rx) = channel(8);
659        let worker = Worker {
660            id: 5,
661            rx: rx_unused,
662            tx,
663        };
664        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
665        track.set_level(-9.0);
666        track.set_balance(-0.3);
667        let state = make_state_with_track(track);
668        let job = OfflineBounceWork {
669            state: state.clone(),
670            track_name: "track".to_string(),
671            output_path: unique_temp_wav("cancel").to_string_lossy().to_string(),
672            start_sample: 0,
673            length_samples: 8,
674            tempo_bpm: 120.0,
675            tsig_num: 4,
676            tsig_denom: 4,
677            automation_lanes: vec![],
678            cancel: Arc::new(AtomicBool::new(true)),
679            apply_fader: false,
680        };
681
682        worker.process_offline_bounce(job).await;
683
684        match out_rx.recv().await.expect("message") {
685            Message::OfflineBounceFinished {
686                result: Ok(Action::TrackOfflineBounceCanceled { track_name }),
687            } => assert_eq!(track_name, "track"),
688            other => panic!("unexpected message: {other:?}"),
689        }
690        assert!(matches!(out_rx.recv().await, Some(Message::Ready(5))));
691        let track = state.lock().tracks.get("track").expect("track").lock();
692        assert_eq!(track.level(), -9.0);
693        assert_eq!(track.balance, -0.3);
694    }
695
696    #[tokio::test]
697    async fn process_offline_bounce_restores_track_state_on_write_failure() {
698        let (_rx_unused_tx, rx_unused) = channel(1);
699        let (tx, mut out_rx) = channel(8);
700        let worker = Worker {
701            id: 3,
702            rx: rx_unused,
703            tx,
704        };
705        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
706        track.set_level(-4.0);
707        track.set_balance(0.25);
708        let state = make_state_with_track(track);
709        let output_path = std::env::temp_dir().to_string_lossy().to_string();
710        let job = OfflineBounceWork {
711            state: state.clone(),
712            track_name: "track".to_string(),
713            output_path,
714            start_sample: 0,
715            length_samples: 4,
716            tempo_bpm: 120.0,
717            tsig_num: 4,
718            tsig_denom: 4,
719            automation_lanes: vec![],
720            cancel: Arc::new(AtomicBool::new(false)),
721            apply_fader: false,
722        };
723
724        worker.process_offline_bounce(job).await;
725
726        let mut saw_error = false;
727        while let Some(message) = out_rx.recv().await {
728            match message {
729                Message::OfflineBounceFinished {
730                    result: Ok(Action::TrackOfflineBounceProgress { .. }),
731                } => {}
732                Message::OfflineBounceFinished { result: Err(err) } => {
733                    assert!(
734                        err.contains("Failed to create offline bounce")
735                            || err.contains("Failed to write offline bounce")
736                            || err.contains("Failed to finalize offline bounce")
737                    );
738                    saw_error = true;
739                }
740                Message::Ready(3) => break,
741                other => panic!("unexpected message: {other:?}"),
742            }
743        }
744        assert!(saw_error);
745        let track = state.lock().tracks.get("track").expect("track").lock();
746        assert_eq!(track.level(), -4.0);
747        assert_eq!(track.balance, 0.25);
748    }
749
750    #[tokio::test]
751    async fn process_offline_bounce_emits_progress_and_completion() {
752        let (_rx_unused_tx, rx_unused) = channel(1);
753        let (tx, mut out_rx) = channel(16);
754        let worker = Worker {
755            id: 2,
756            rx: rx_unused,
757            tx,
758        };
759        let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 4, 48_000.0);
760        track.set_level(-3.0);
761        track.set_balance(0.4);
762        let state = make_state_with_track(track);
763        let output = unique_temp_wav("success");
764        let job = OfflineBounceWork {
765            state: state.clone(),
766            track_name: "track".to_string(),
767            output_path: output.to_string_lossy().to_string(),
768            start_sample: 0,
769            length_samples: 8,
770            tempo_bpm: 120.0,
771            tsig_num: 4,
772            tsig_denom: 4,
773            automation_lanes: vec![],
774            cancel: Arc::new(AtomicBool::new(false)),
775            apply_fader: false,
776        };
777
778        worker.process_offline_bounce(job).await;
779
780        let mut saw_progress = false;
781        let mut saw_complete = false;
782        let mut saw_ready = false;
783        while let Some(message) = out_rx.recv().await {
784            match message {
785                Message::OfflineBounceFinished {
786                    result:
787                        Ok(Action::TrackOfflineBounceProgress {
788                            track_name,
789                            progress,
790                            ..
791                        }),
792                } => {
793                    assert_eq!(track_name, "track");
794                    assert!(progress > 0.0);
795                    saw_progress = true;
796                }
797                Message::OfflineBounceFinished {
798                    result:
799                        Ok(Action::TrackOfflineBounce {
800                            track_name,
801                            output_path,
802                            ..
803                        }),
804                } => {
805                    assert_eq!(track_name, "track");
806                    assert_eq!(output_path, output.to_string_lossy());
807                    saw_complete = true;
808                }
809                Message::Ready(2) => {
810                    saw_ready = true;
811                    break;
812                }
813                other => panic!("unexpected message: {other:?}"),
814            }
815        }
816
817        assert!(saw_progress);
818        assert!(saw_complete);
819        assert!(saw_ready);
820        assert!(output.exists());
821        std::fs::remove_file(&output).expect("remove temp wav");
822        let track = state.lock().tracks.get("track").expect("track").lock();
823        assert_eq!(track.level(), -3.0);
824        assert_eq!(track.balance, 0.4);
825        assert!(!track.muted);
826    }
827}