Skip to main content

maolan_engine/workers/
worker.rs

1use crate::message::{
2    Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
3    OfflineBounceWork,
4};
5#[cfg(unix)]
6use nix::libc;
7use std::collections::HashSet;
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::mpsc::{Receiver, Sender};
11use tracing::{error, info};
12
13#[derive(Debug)]
14pub struct Worker {
15    id: usize,
16    rx: Receiver<Message>,
17    tx: Sender<Message>,
18    realtime_priority: i32,
19}
20
21impl Worker {
22    fn automation_lane_value_at(points: &[OfflineAutomationPoint], sample: usize) -> Option<f32> {
23        if points.is_empty() {
24            return None;
25        }
26        if sample <= points[0].sample {
27            return Some(points[0].value.clamp(0.0, 1.0));
28        }
29        if sample >= points[points.len().saturating_sub(1)].sample {
30            return Some(points[points.len().saturating_sub(1)].value.clamp(0.0, 1.0));
31        }
32        for segment in points.windows(2) {
33            let left = &segment[0];
34            let right = &segment[1];
35            if sample < left.sample || sample > right.sample {
36                continue;
37            }
38            let span = right.sample.saturating_sub(left.sample).max(1) as f32;
39            let t = (sample.saturating_sub(left.sample) as f32 / span).clamp(0.0, 1.0);
40            return Some((left.value + (right.value - left.value) * t).clamp(0.0, 1.0));
41        }
42        None
43    }
44
45    fn apply_freeze_automation_at_sample(
46        track: &mut crate::track::Track,
47        sample: usize,
48        lanes: &[OfflineAutomationLane],
49    ) {
50        for lane in lanes {
51            if matches!(
52                lane.target,
53                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance
54            ) {
55                continue;
56            }
57            let Some(value) = Self::automation_lane_value_at(&lane.points, sample) else {
58                continue;
59            };
60            match lane.target {
61                OfflineAutomationTarget::Volume | OfflineAutomationTarget::Balance => {}
62                OfflineAutomationTarget::Mute => {
63                    track.set_muted(value >= 0.5);
64                }
65                #[cfg(all(unix, not(target_os = "macos")))]
66                OfflineAutomationTarget::Lv2Parameter {
67                    instance_id,
68                    index,
69                    min,
70                    max,
71                } => {
72                    let lo = min.min(max);
73                    let hi = max.max(min);
74                    let param_value = (lo + value * (hi - lo)).clamp(lo, hi);
75                    let _ = track.set_lv2_control_value(
76                        instance_id,
77                        index as usize,
78                        param_value as f64,
79                    );
80                }
81                OfflineAutomationTarget::Vst3Parameter {
82                    instance_id,
83                    param_id,
84                } => {
85                    let _ = track.set_vst3_parameter(instance_id, param_id, value.clamp(0.0, 1.0));
86                }
87                OfflineAutomationTarget::ClapParameter {
88                    instance_id,
89                    param_id,
90                    min,
91                    max,
92                } => {
93                    let lo = min.min(max);
94                    let hi = max.max(min);
95                    let param_value = (lo + value as f64 * (hi - lo)).clamp(lo, hi);
96                    let _ = track.set_clap_parameter_at(instance_id, param_id, param_value, 0);
97                }
98            }
99        }
100    }
101
102    fn prepare_track_for_freeze_render(track: &mut crate::track::Track) -> (f32, f32) {
103        let original_level = track.level();
104        let original_balance = track.balance;
105        track.set_level(0.0);
106        track.set_balance(0.0);
107        (original_level, original_balance)
108    }
109
110    fn restore_track_after_freeze_render(
111        track: &mut crate::track::Track,
112        original_level: f32,
113        original_balance: f32,
114    ) {
115        track.set_level(original_level);
116        track.set_balance(original_balance);
117    }
118
119    async fn process_offline_bounce(&self, job: OfflineBounceWork) {
120        let track_handle = job.state.lock().tracks.get(&job.track_name).cloned();
121        let Some(target_track) = track_handle else {
122            let _ = self
123                .tx
124                .send(Message::OfflineBounceFinished {
125                    result: Err(format!("Track not found: {}", job.track_name)),
126                })
127                .await;
128            return;
129        };
130        let (channels, block_size, sample_rate) = {
131            let t = target_track.lock();
132            let block_size = t
133                .audio
134                .outs
135                .first()
136                .map(|io| io.buffer.lock().len())
137                .or_else(|| t.audio.ins.first().map(|io| io.buffer.lock().len()))
138                .unwrap_or(0)
139                .max(1);
140            (
141                t.audio.outs.len().max(1),
142                block_size,
143                t.sample_rate.round().max(1.0) as i32,
144            )
145        };
146        let freeze_state = if job.apply_fader {
147            None
148        } else {
149            let t = target_track.lock();
150            Some(Self::prepare_track_for_freeze_render(t))
151        };
152
153        let all_tracks: Vec<_> = job.state.lock().tracks.values().cloned().collect();
154        let mut output_to_track: std::collections::HashMap<usize, String> =
155            std::collections::HashMap::new();
156        for handle in &all_tracks {
157            let t = handle.lock();
158            for out in &t.audio.outs {
159                output_to_track.insert(Arc::as_ptr(out) as usize, t.name.clone());
160            }
161        }
162        let mut relevant_names = HashSet::new();
163        let mut queue = vec![job.track_name.clone()];
164        while let Some(name) = queue.pop() {
165            if !relevant_names.insert(name.clone()) {
166                continue;
167            }
168            if let Some(handle) = all_tracks.iter().find(|h| h.lock().name == name) {
169                let t = handle.lock();
170                for input in &t.audio.ins {
171                    for conn in input.connections.lock().iter() {
172                        if let Some(source_name) =
173                            output_to_track.get(&(Arc::as_ptr(conn) as usize))
174                        {
175                            queue.push(source_name.clone());
176                        }
177                    }
178                }
179            }
180        }
181        let relevant_tracks: Vec<_> = all_tracks
182            .into_iter()
183            .filter(|h| relevant_names.contains(&h.lock().name))
184            .collect();
185
186        let spec = hound::WavSpec {
187            channels: channels as u16,
188            sample_rate: sample_rate as u32,
189            bits_per_sample: 32,
190            sample_format: hound::SampleFormat::Float,
191        };
192        let mut writer = match hound::WavWriter::create(&job.output_path, spec) {
193            Ok(w) => w,
194            Err(e) => {
195                if let Some((original_level, original_balance)) = freeze_state {
196                    let t = target_track.lock();
197                    Self::restore_track_after_freeze_render(t, original_level, original_balance);
198                }
199                let _ = self
200                    .tx
201                    .send(Message::OfflineBounceFinished {
202                        result: Err(format!(
203                            "Failed to create offline bounce '{}': {e}",
204                            job.output_path
205                        )),
206                    })
207                    .await;
208                let _ = self.tx.send(Message::Ready(self.id)).await;
209                return;
210            }
211        };
212
213        let mut cursor = 0usize;
214        let mut last_reported_progress = 0.0_f32;
215        let mut total_process_time = Duration::ZERO;
216        let mut total_write_time = Duration::ZERO;
217        let mut block_count = 0usize;
218        let bounce_start = Instant::now();
219        while cursor < job.length_samples {
220            if job.cancel.load(std::sync::atomic::Ordering::Relaxed) {
221                let _ = writer.finalize();
222                let _ = std::fs::remove_file(&job.output_path);
223                if let Some((original_level, original_balance)) = freeze_state {
224                    let t = target_track.lock();
225                    Self::restore_track_after_freeze_render(t, original_level, original_balance);
226                }
227                let _ = self
228                    .tx
229                    .send(Message::OfflineBounceFinished {
230                        result: Ok(Action::TrackOfflineBounceCanceled {
231                            track_name: job.track_name.clone(),
232                        }),
233                    })
234                    .await;
235                let _ = self.tx.send(Message::Ready(self.id)).await;
236                return;
237            }
238
239            let step = (job.length_samples - cursor).min(block_size);
240            for handle in &relevant_tracks {
241                let t = handle.lock();
242                t.audio.finished = false;
243                t.audio.processing = false;
244                t.set_transport_sample(job.start_sample.saturating_add(cursor));
245                t.set_loop_config(false, None);
246                t.set_transport_timing(job.tempo_bpm, job.tsig_num, job.tsig_denom);
247                t.set_clip_playback_enabled(true);
248                t.set_record_tap_enabled(false);
249            }
250
251            let block_process_start = Instant::now();
252            loop {
253                let mut all_finished = true;
254                let mut progressed = false;
255                for handle in &relevant_tracks {
256                    let t = handle.lock();
257                    if t.audio.finished {
258                        continue;
259                    }
260                    all_finished = false;
261                    if !t.audio.processing && t.audio.ready() {
262                        if t.name == job.track_name {
263                            Self::apply_freeze_automation_at_sample(
264                                t,
265                                job.start_sample.saturating_add(cursor),
266                                &job.automation_lanes,
267                            );
268                        }
269                        t.audio.processing = true;
270                        let p_start = Instant::now();
271                        t.process();
272                        total_process_time += p_start.elapsed();
273                        t.audio.processing = false;
274                        progressed = true;
275                    }
276                }
277                if all_finished {
278                    break;
279                }
280                if !progressed {
281                    for handle in &relevant_tracks {
282                        let t = handle.lock();
283                        if t.audio.finished {
284                            continue;
285                        }
286                        if t.name == job.track_name {
287                            Self::apply_freeze_automation_at_sample(
288                                t,
289                                job.start_sample.saturating_add(cursor),
290                                &job.automation_lanes,
291                            );
292                        }
293                        t.audio.processing = true;
294                        let p_start = Instant::now();
295                        t.process();
296                        total_process_time += p_start.elapsed();
297                        t.audio.processing = false;
298                    }
299                    break;
300                }
301            }
302            let _block_process_elapsed = block_process_start.elapsed();
303
304            let write_start = Instant::now();
305            let write_result = {
306                let t = target_track.lock();
307                let outs: Vec<_> = (0..channels)
308                    .map(|ch| t.audio.outs[ch].buffer.lock())
309                    .collect();
310                (|| -> Result<(), hound::Error> {
311                    for i in 0..step {
312                        for out in outs.iter().take(channels) {
313                            let sample = out.get(i).copied().unwrap_or(0.0);
314                            writer.write_sample(sample)?;
315                        }
316                    }
317                    Ok(())
318                })()
319            };
320            total_write_time += write_start.elapsed();
321            if let Err(e) = write_result {
322                let _ = writer.finalize();
323                let _ = std::fs::remove_file(&job.output_path);
324                if let Some((original_level, original_balance)) = freeze_state {
325                    let t = target_track.lock();
326                    Self::restore_track_after_freeze_render(t, original_level, original_balance);
327                }
328                let _ = self
329                    .tx
330                    .send(Message::OfflineBounceFinished {
331                        result: Err(format!(
332                            "Failed to write offline bounce '{}': {e}",
333                            job.output_path
334                        )),
335                    })
336                    .await;
337                let _ = self.tx.send(Message::Ready(self.id)).await;
338                return;
339            }
340
341            cursor = cursor.saturating_add(step);
342            block_count += 1;
343            let progress = (cursor as f32 / job.length_samples as f32).clamp(0.0, 1.0);
344
345            if progress - last_reported_progress >= 0.01 || cursor >= job.length_samples {
346                last_reported_progress = progress;
347                let _ = self
348                    .tx
349                    .send(Message::OfflineBounceFinished {
350                        result: Ok(Action::TrackOfflineBounceProgress {
351                            track_name: job.track_name.clone(),
352                            progress,
353                            operation: Some("Rendering freeze".to_string()),
354                        }),
355                    })
356                    .await;
357            }
358        }
359        let bounce_elapsed = bounce_start.elapsed();
360        info!(
361            "Bounce '{}' — total: {:?}, blocks: {}, process: {:?}, write: {:?}",
362            job.track_name, bounce_elapsed, block_count, total_process_time, total_write_time
363        );
364
365        if let Err(e) = writer.finalize() {
366            let _ = std::fs::remove_file(&job.output_path);
367            if let Some((original_level, original_balance)) = freeze_state {
368                let t = target_track.lock();
369                Self::restore_track_after_freeze_render(t, original_level, original_balance);
370            }
371            let _ = self
372                .tx
373                .send(Message::OfflineBounceFinished {
374                    result: Err(format!(
375                        "Failed to finalize offline bounce '{}': {e}",
376                        job.output_path
377                    )),
378                })
379                .await;
380            let _ = self.tx.send(Message::Ready(self.id)).await;
381            return;
382        }
383
384        if let Some((original_level, original_balance)) = freeze_state {
385            let t = target_track.lock();
386            Self::restore_track_after_freeze_render(t, original_level, original_balance);
387        }
388
389        let _ = self
390            .tx
391            .send(Message::OfflineBounceFinished {
392                result: Ok(Action::TrackOfflineBounce {
393                    track_name: job.track_name,
394                    output_path: job.output_path,
395                    start_sample: job.start_sample,
396                    length_samples: job.length_samples,
397                    automation_lanes: vec![],
398                    apply_fader: job.apply_fader,
399                }),
400            })
401            .await;
402        let _ = self.tx.send(Message::Ready(self.id)).await;
403    }
404
405    #[cfg(unix)]
406    fn try_enable_realtime(priority: i32) -> Result<(), String> {
407        let thread = unsafe { libc::pthread_self() };
408        let policy = libc::SCHED_FIFO;
409        let param = unsafe {
410            let mut p = std::mem::zeroed::<libc::sched_param>();
411            p.sched_priority = priority;
412            p
413        };
414        let rc = unsafe { libc::pthread_setschedparam(thread, policy, &param) };
415        if rc == 0 {
416            Ok(())
417        } else {
418            Err(format!("pthread_setschedparam failed with errno {}", rc))
419        }
420    }
421
422    #[cfg(not(unix))]
423    fn try_enable_realtime(_priority: i32) -> Result<(), String> {
424        Err("Realtime thread priority is not supported on this platform".to_string())
425    }
426
427    pub async fn new(
428        id: usize,
429        rx: Receiver<Message>,
430        tx: Sender<Message>,
431        realtime_priority: i32,
432    ) -> Worker {
433        let worker = Worker {
434            id,
435            rx,
436            tx,
437            realtime_priority,
438        };
439        worker.send(Message::Ready(id)).await;
440        worker
441    }
442
443    pub async fn send(&self, message: Message) {
444        self.tx
445            .send(message)
446            .await
447            .expect("Failed to send message from worker");
448    }
449
450    pub async fn work(&mut self) {
451        if let Err(e) = Self::try_enable_realtime(self.realtime_priority) {
452            error!("Worker {} realtime priority not enabled: {}", self.id, e);
453        }
454        while let Some(message) = self.rx.recv().await {
455            match message {
456                Message::Request(Action::Quit) => {
457                    return;
458                }
459                Message::ProcessTrack(t) => {
460                    let (track_name, output_linear, process_epoch, parameter_updates) = {
461                        let track = t.lock();
462                        let process_epoch = track.process_epoch;
463                        let started = Instant::now();
464                        track.process();
465                        let elapsed = started.elapsed();
466                        if elapsed.as_millis() > 20 {
467                            tracing::warn!(
468                                "Slow track process '{}' took {:.3} ms",
469                                track.name,
470                                elapsed.as_secs_f64() * 1000.0
471                            );
472                        }
473                        track.audio.processing = false;
474                        let updates = std::mem::take(track.echoed_parameter_updates.lock());
475                        (
476                            track.name.clone(),
477                            track.output_meter_linear(),
478                            process_epoch,
479                            updates,
480                        )
481                    };
482                    match self
483                        .tx
484                        .send(Message::Finished {
485                            worker_id: self.id,
486                            track_name,
487                            output_linear,
488                            process_epoch,
489                            parameter_updates,
490                        })
491                        .await
492                    {
493                        Ok(_) => {}
494                        Err(e) => {
495                            error!("Error while sending Finished: {}", e);
496                        }
497                    }
498                }
499                Message::ProcessOfflineBounce(job) => {
500                    self.process_offline_bounce(job).await;
501                }
502                _ => {}
503            }
504        }
505    }
506}
507
508#[cfg(test)]
509mod tests {
510    use super::Worker;
511    use crate::message::{
512        Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
513        OfflineBounceWork,
514    };
515    use crate::mutex::UnsafeMutex;
516    use crate::state::State;
517    use crate::track::Track;
518    use std::path::PathBuf;
519    use std::sync::{Arc, atomic::AtomicBool};
520    use std::time::{SystemTime, UNIX_EPOCH};
521    use tokio::sync::mpsc::channel;
522
523    fn make_state_with_track(track: Track) -> Arc<UnsafeMutex<State>> {
524        let mut state = State::default();
525        state.tracks.insert(
526            track.name.clone(),
527            Arc::new(UnsafeMutex::new(Box::new(track))),
528        );
529        Arc::new(UnsafeMutex::new(state))
530    }
531
532    fn unique_temp_wav(name: &str) -> PathBuf {
533        let nanos = SystemTime::now()
534            .duration_since(UNIX_EPOCH)
535            .expect("clock")
536            .as_nanos();
537        std::env::temp_dir().join(format!("maolan_{name}_{nanos}.wav"))
538    }
539
540    #[test]
541    fn prepare_track_for_freeze_render_neutralizes_level_and_balance() {
542        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 64, 48_000.0);
543        track.set_level(-6.0);
544        track.set_balance(0.35);
545
546        let (level, balance) = Worker::prepare_track_for_freeze_render(&mut track);
547
548        assert_eq!(level, -6.0);
549        assert_eq!(balance, 0.35);
550        assert_eq!(track.level(), 0.0);
551        assert_eq!(track.balance, 0.0);
552
553        Worker::restore_track_after_freeze_render(&mut track, level, balance);
554        assert_eq!(track.level(), -6.0);
555        assert_eq!(track.balance, 0.35);
556    }
557
558    #[test]
559    fn freeze_automation_ignores_volume_and_balance_lanes() {
560        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 64, 48_000.0);
561        let lanes = vec![
562            OfflineAutomationLane {
563                target: OfflineAutomationTarget::Volume,
564                points: vec![OfflineAutomationPoint {
565                    sample: 0,
566                    value: 0.0,
567                }],
568            },
569            OfflineAutomationLane {
570                target: OfflineAutomationTarget::Balance,
571                points: vec![OfflineAutomationPoint {
572                    sample: 0,
573                    value: 1.0,
574                }],
575            },
576            OfflineAutomationLane {
577                target: OfflineAutomationTarget::Mute,
578                points: vec![OfflineAutomationPoint {
579                    sample: 0,
580                    value: 1.0,
581                }],
582            },
583        ];
584
585        Worker::apply_freeze_automation_at_sample(&mut track, 0, &lanes);
586
587        assert_eq!(track.level(), 0.0);
588        assert_eq!(track.balance, 0.0);
589        assert!(track.muted);
590    }
591
592    #[test]
593    fn automation_lane_value_at_interpolates_between_points() {
594        let value = Worker::automation_lane_value_at(
595            &[
596                OfflineAutomationPoint {
597                    sample: 10,
598                    value: 0.25,
599                },
600                OfflineAutomationPoint {
601                    sample: 20,
602                    value: 0.75,
603                },
604            ],
605            15,
606        )
607        .expect("value");
608
609        assert!((value - 0.5).abs() < 1.0e-6);
610    }
611
612    #[test]
613    fn freeze_automation_applies_interpolated_mute_lane() {
614        let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 64, 48_000.0);
615        let lanes = vec![OfflineAutomationLane {
616            target: OfflineAutomationTarget::Mute,
617            points: vec![
618                OfflineAutomationPoint {
619                    sample: 0,
620                    value: 0.0,
621                },
622                OfflineAutomationPoint {
623                    sample: 10,
624                    value: 1.0,
625                },
626            ],
627        }];
628
629        Worker::apply_freeze_automation_at_sample(&mut track, 5, &lanes);
630        assert!(track.muted);
631
632        track.set_muted(false);
633        Worker::apply_freeze_automation_at_sample(&mut track, 2, &lanes);
634        assert!(!track.muted);
635    }
636
637    #[tokio::test]
638    async fn process_offline_bounce_errors_when_track_is_missing() {
639        let (_rx_unused_tx, rx_unused) = channel(1);
640        let (tx, mut out_rx) = channel(8);
641        let worker = Worker {
642            id: 7,
643            rx: rx_unused,
644            tx,
645            realtime_priority: 0,
646        };
647        let job = OfflineBounceWork {
648            state: Arc::new(UnsafeMutex::new(State::default())),
649            track_name: "missing".to_string(),
650            output_path: unique_temp_wav("missing").to_string_lossy().to_string(),
651            start_sample: 0,
652            length_samples: 8,
653            tempo_bpm: 120.0,
654            tsig_num: 4,
655            tsig_denom: 4,
656            automation_lanes: vec![],
657            cancel: Arc::new(AtomicBool::new(false)),
658            apply_fader: false,
659        };
660
661        worker.process_offline_bounce(job).await;
662
663        match out_rx.recv().await.expect("message") {
664            Message::OfflineBounceFinished { result: Err(err) } => {
665                assert!(err.contains("Track not found: missing"));
666            }
667            other => panic!("unexpected message: {other:?}"),
668        }
669    }
670
671    #[tokio::test]
672    async fn process_offline_bounce_cancels_and_restores_track_state() {
673        let (_rx_unused_tx, rx_unused) = channel(1);
674        let (tx, mut out_rx) = channel(8);
675        let worker = Worker {
676            id: 5,
677            rx: rx_unused,
678            tx,
679            realtime_priority: 0,
680        };
681        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
682        track.set_level(-9.0);
683        track.set_balance(-0.3);
684        let state = make_state_with_track(track);
685        let job = OfflineBounceWork {
686            state: state.clone(),
687            track_name: "track".to_string(),
688            output_path: unique_temp_wav("cancel").to_string_lossy().to_string(),
689            start_sample: 0,
690            length_samples: 8,
691            tempo_bpm: 120.0,
692            tsig_num: 4,
693            tsig_denom: 4,
694            automation_lanes: vec![],
695            cancel: Arc::new(AtomicBool::new(true)),
696            apply_fader: false,
697        };
698
699        worker.process_offline_bounce(job).await;
700
701        match out_rx.recv().await.expect("message") {
702            Message::OfflineBounceFinished {
703                result: Ok(Action::TrackOfflineBounceCanceled { track_name }),
704            } => assert_eq!(track_name, "track"),
705            other => panic!("unexpected message: {other:?}"),
706        }
707        assert!(matches!(out_rx.recv().await, Some(Message::Ready(5))));
708        let track = state.lock().tracks.get("track").expect("track").lock();
709        assert_eq!(track.level(), -9.0);
710        assert_eq!(track.balance, -0.3);
711    }
712
713    #[tokio::test]
714    async fn process_offline_bounce_restores_track_state_on_write_failure() {
715        let (_rx_unused_tx, rx_unused) = channel(1);
716        let (tx, mut out_rx) = channel(8);
717        let worker = Worker {
718            id: 3,
719            rx: rx_unused,
720            tx,
721            realtime_priority: 0,
722        };
723        let mut track = Track::new("track".to_string(), 1, 2, 0, 0, 4, 48_000.0);
724        track.set_level(-4.0);
725        track.set_balance(0.25);
726        let state = make_state_with_track(track);
727        let output_path = std::env::temp_dir().to_string_lossy().to_string();
728        let job = OfflineBounceWork {
729            state: state.clone(),
730            track_name: "track".to_string(),
731            output_path,
732            start_sample: 0,
733            length_samples: 4,
734            tempo_bpm: 120.0,
735            tsig_num: 4,
736            tsig_denom: 4,
737            automation_lanes: vec![],
738            cancel: Arc::new(AtomicBool::new(false)),
739            apply_fader: false,
740        };
741
742        worker.process_offline_bounce(job).await;
743
744        let mut saw_error = false;
745        while let Some(message) = out_rx.recv().await {
746            match message {
747                Message::OfflineBounceFinished {
748                    result: Ok(Action::TrackOfflineBounceProgress { .. }),
749                } => {}
750                Message::OfflineBounceFinished { result: Err(err) } => {
751                    assert!(
752                        err.contains("Failed to create offline bounce")
753                            || err.contains("Failed to write offline bounce")
754                            || err.contains("Failed to finalize offline bounce")
755                    );
756                    saw_error = true;
757                }
758                Message::Ready(3) => break,
759                other => panic!("unexpected message: {other:?}"),
760            }
761        }
762        assert!(saw_error);
763        let track = state.lock().tracks.get("track").expect("track").lock();
764        assert_eq!(track.level(), -4.0);
765        assert_eq!(track.balance, 0.25);
766    }
767
768    #[tokio::test]
769    async fn process_offline_bounce_emits_progress_and_completion() {
770        let (_rx_unused_tx, rx_unused) = channel(1);
771        let (tx, mut out_rx) = channel(16);
772        let worker = Worker {
773            id: 2,
774            rx: rx_unused,
775            tx,
776            realtime_priority: 0,
777        };
778        let mut track = Track::new("track".to_string(), 1, 1, 0, 0, 4, 48_000.0);
779        track.set_level(-3.0);
780        track.set_balance(0.4);
781        let state = make_state_with_track(track);
782        let output = unique_temp_wav("success");
783        let job = OfflineBounceWork {
784            state: state.clone(),
785            track_name: "track".to_string(),
786            output_path: output.to_string_lossy().to_string(),
787            start_sample: 0,
788            length_samples: 8,
789            tempo_bpm: 120.0,
790            tsig_num: 4,
791            tsig_denom: 4,
792            automation_lanes: vec![],
793            cancel: Arc::new(AtomicBool::new(false)),
794            apply_fader: false,
795        };
796
797        worker.process_offline_bounce(job).await;
798
799        let mut saw_progress = false;
800        let mut saw_complete = false;
801        let mut saw_ready = false;
802        while let Some(message) = out_rx.recv().await {
803            match message {
804                Message::OfflineBounceFinished {
805                    result:
806                        Ok(Action::TrackOfflineBounceProgress {
807                            track_name,
808                            progress,
809                            ..
810                        }),
811                } => {
812                    assert_eq!(track_name, "track");
813                    assert!(progress > 0.0);
814                    saw_progress = true;
815                }
816                Message::OfflineBounceFinished {
817                    result:
818                        Ok(Action::TrackOfflineBounce {
819                            track_name,
820                            output_path,
821                            ..
822                        }),
823                } => {
824                    assert_eq!(track_name, "track");
825                    assert_eq!(output_path, output.to_string_lossy());
826                    saw_complete = true;
827                }
828                Message::Ready(2) => {
829                    saw_ready = true;
830                    break;
831                }
832                other => panic!("unexpected message: {other:?}"),
833            }
834        }
835
836        assert!(saw_progress);
837        assert!(saw_complete);
838        assert!(saw_ready);
839        assert!(output.exists());
840        std::fs::remove_file(&output).expect("remove temp wav");
841        let track = state.lock().tracks.get("track").expect("track").lock();
842        assert_eq!(track.level(), -3.0);
843        assert_eq!(track.balance, 0.4);
844        assert!(!track.muted);
845    }
846}