Skip to main content

maolan_engine/workers/
worker.rs

1use crate::message::{
2    Action, Message, OfflineAutomationLane, OfflineAutomationPoint, OfflineAutomationTarget,
3    OfflineBounceWork,
4};
5#[cfg(unix)]
6use nix::libc;
7use tokio::sync::mpsc::{Receiver, Sender};
8use tracing::error;
9use wavers::write as write_wav;
10
11#[derive(Debug)]
12pub struct Worker {
13    id: usize,
14    rx: Receiver<Message>,
15    tx: Sender<Message>,
16}
17
18impl Worker {
19    fn automation_lane_value_at(points: &[OfflineAutomationPoint], sample: usize) -> Option<f32> {
20        if points.is_empty() {
21            return None;
22        }
23        if sample <= points[0].sample {
24            return Some(points[0].value.clamp(0.0, 1.0));
25        }
26        if sample >= points[points.len().saturating_sub(1)].sample {
27            return Some(points[points.len().saturating_sub(1)].value.clamp(0.0, 1.0));
28        }
29        for segment in points.windows(2) {
30            let left = &segment[0];
31            let right = &segment[1];
32            if sample < left.sample || sample > right.sample {
33                continue;
34            }
35            let span = right.sample.saturating_sub(left.sample).max(1) as f32;
36            let t = (sample.saturating_sub(left.sample) as f32 / span).clamp(0.0, 1.0);
37            return Some((left.value + (right.value - left.value) * t).clamp(0.0, 1.0));
38        }
39        None
40    }
41
42    fn apply_offline_automation_at_sample(
43        track: &mut crate::track::Track,
44        sample: usize,
45        lanes: &[OfflineAutomationLane],
46    ) {
47        for lane in lanes {
48            let Some(value) = Self::automation_lane_value_at(&lane.points, sample) else {
49                continue;
50            };
51            match lane.target {
52                OfflineAutomationTarget::Volume => {
53                    track.set_level((-90.0 + value * 110.0).clamp(-90.0, 20.0));
54                }
55                OfflineAutomationTarget::Balance => {
56                    track.set_balance((value * 2.0 - 1.0).clamp(-1.0, 1.0));
57                }
58                OfflineAutomationTarget::Mute => {
59                    track.set_muted(value >= 0.5);
60                }
61                #[cfg(all(unix, not(target_os = "macos")))]
62                OfflineAutomationTarget::Lv2Parameter {
63                    instance_id,
64                    index,
65                    min,
66                    max,
67                } => {
68                    let lo = min.min(max);
69                    let hi = max.max(min);
70                    let param_value = (lo + value * (hi - lo)).clamp(lo, hi);
71                    let _ = track.set_lv2_control_value(instance_id, index, param_value);
72                }
73                OfflineAutomationTarget::Vst3Parameter {
74                    instance_id,
75                    param_id,
76                } => {
77                    let _ = track.set_vst3_parameter(instance_id, param_id, value.clamp(0.0, 1.0));
78                }
79                OfflineAutomationTarget::ClapParameter {
80                    instance_id,
81                    param_id,
82                    min,
83                    max,
84                } => {
85                    let lo = min.min(max);
86                    let hi = max.max(min);
87                    let param_value = (lo + value as f64 * (hi - lo)).clamp(lo, hi);
88                    let _ = track.set_clap_parameter_at(instance_id, param_id, param_value, 0);
89                }
90            }
91        }
92    }
93
94    async fn process_offline_bounce(&self, job: OfflineBounceWork) {
95        let track_handle = job.state.lock().tracks.get(&job.track_name).cloned();
96        let Some(target_track) = track_handle else {
97            let _ = self
98                .tx
99                .send(Message::OfflineBounceFinished {
100                    result: Err(format!("Track not found: {}", job.track_name)),
101                })
102                .await;
103            return;
104        };
105        let (channels, block_size, sample_rate) = {
106            let t = target_track.lock();
107            let block_size = t
108                .audio
109                .outs
110                .first()
111                .map(|io| io.buffer.lock().len())
112                .or_else(|| t.audio.ins.first().map(|io| io.buffer.lock().len()))
113                .unwrap_or(0)
114                .max(1);
115            (
116                t.audio.outs.len().max(1),
117                block_size,
118                t.sample_rate.round().max(1.0) as i32,
119            )
120        };
121
122        let mut rendered = vec![0.0_f32; job.length_samples.saturating_mul(channels)];
123        let mut cursor = 0usize;
124        while cursor < job.length_samples {
125            if job.cancel.load(std::sync::atomic::Ordering::Relaxed) {
126                let _ = self
127                    .tx
128                    .send(Message::OfflineBounceFinished {
129                        result: Ok(Action::TrackOfflineBounceCanceled {
130                            track_name: job.track_name.clone(),
131                        }),
132                    })
133                    .await;
134                let _ = self.tx.send(Message::Ready(self.id)).await;
135                return;
136            }
137
138            let step = (job.length_samples - cursor).min(block_size);
139            let tracks: Vec<_> = job.state.lock().tracks.values().cloned().collect();
140            for handle in &tracks {
141                let t = handle.lock();
142                t.audio.finished = false;
143                t.audio.processing = false;
144                t.set_transport_sample(job.start_sample.saturating_add(cursor));
145                t.set_loop_config(false, None);
146                t.set_transport_timing(job.tempo_bpm, job.tsig_num, job.tsig_denom);
147                t.set_clip_playback_enabled(true);
148                t.set_record_tap_enabled(false);
149            }
150
151            let mut remaining = tracks.len();
152            while remaining > 0 {
153                let mut progressed = false;
154                for handle in &tracks {
155                    let t = handle.lock();
156                    if t.audio.finished || t.audio.processing {
157                        continue;
158                    }
159                    if t.audio.ready() {
160                        if t.name == job.track_name {
161                            Self::apply_offline_automation_at_sample(
162                                t,
163                                job.start_sample.saturating_add(cursor),
164                                &job.automation_lanes,
165                            );
166                        }
167                        t.audio.processing = true;
168                        t.process();
169                        t.audio.processing = false;
170                        progressed = true;
171                        remaining = remaining.saturating_sub(1);
172                    }
173                }
174                if !progressed {
175                    for handle in &tracks {
176                        let t = handle.lock();
177                        if t.audio.finished {
178                            continue;
179                        }
180                        if t.name == job.track_name {
181                            Self::apply_offline_automation_at_sample(
182                                t,
183                                job.start_sample.saturating_add(cursor),
184                                &job.automation_lanes,
185                            );
186                        }
187                        t.audio.processing = true;
188                        t.process();
189                        t.audio.processing = false;
190                        remaining = remaining.saturating_sub(1);
191                    }
192                }
193            }
194
195            {
196                let t = target_track.lock();
197                for ch in 0..channels {
198                    let out = t.audio.outs[ch].buffer.lock();
199                    let copy_len = step.min(out.len());
200                    for i in 0..copy_len {
201                        let dst = (cursor + i) * channels + ch;
202                        rendered[dst] = out[i];
203                    }
204                }
205            }
206
207            cursor = cursor.saturating_add(step);
208            let _ = self
209                .tx
210                .send(Message::OfflineBounceFinished {
211                    result: Ok(Action::TrackOfflineBounceProgress {
212                        track_name: job.track_name.clone(),
213                        progress: (cursor as f32 / job.length_samples as f32).clamp(0.0, 1.0),
214                        operation: Some("Rendering freeze".to_string()),
215                    }),
216                })
217                .await;
218        }
219
220        if let Err(e) =
221            write_wav::<f32, _>(&job.output_path, &rendered, sample_rate, channels as u16)
222        {
223            let _ = self
224                .tx
225                .send(Message::OfflineBounceFinished {
226                    result: Err(format!(
227                        "Failed to write offline bounce '{}': {e}",
228                        job.output_path
229                    )),
230                })
231                .await;
232            let _ = self.tx.send(Message::Ready(self.id)).await;
233            return;
234        }
235
236        let _ = self
237            .tx
238            .send(Message::OfflineBounceFinished {
239                result: Ok(Action::TrackOfflineBounce {
240                    track_name: job.track_name,
241                    output_path: job.output_path,
242                    start_sample: job.start_sample,
243                    length_samples: job.length_samples,
244                    automation_lanes: vec![],
245                }),
246            })
247            .await;
248        let _ = self.tx.send(Message::Ready(self.id)).await;
249    }
250
251    #[cfg(unix)]
252    fn try_enable_realtime() -> Result<(), String> {
253        let thread = unsafe { libc::pthread_self() };
254        let policy = libc::SCHED_FIFO;
255        let param = unsafe {
256            let mut p = std::mem::zeroed::<libc::sched_param>();
257            p.sched_priority = 10;
258            p
259        };
260        let rc = unsafe { libc::pthread_setschedparam(thread, policy, &param) };
261        if rc == 0 {
262            Ok(())
263        } else {
264            Err(format!("pthread_setschedparam failed with errno {}", rc))
265        }
266    }
267
268    #[cfg(not(unix))]
269    fn try_enable_realtime() -> Result<(), String> {
270        Err("Realtime thread priority is not supported on this platform".to_string())
271    }
272
273    pub async fn new(id: usize, rx: Receiver<Message>, tx: Sender<Message>) -> Worker {
274        let worker = Worker { id, rx, tx };
275        worker.send(Message::Ready(id)).await;
276        worker
277    }
278
279    pub async fn send(&self, message: Message) {
280        self.tx
281            .send(message)
282            .await
283            .expect("Failed to send message from worker");
284    }
285
286    pub async fn work(&mut self) {
287        if let Err(e) = Self::try_enable_realtime() {
288            error!("Worker {} realtime priority not enabled: {}", self.id, e);
289        }
290        while let Some(message) = self.rx.recv().await {
291            match message {
292                Message::Request(Action::Quit) => {
293                    return;
294                }
295                Message::ProcessTrack(t) => {
296                    let (track_name, output_linear, process_epoch) = {
297                        let track = t.lock();
298                        let process_epoch = track.process_epoch;
299                        track.process();
300                        track.audio.processing = false;
301                        (
302                            track.name.clone(),
303                            track.output_meter_linear(),
304                            process_epoch,
305                        )
306                    };
307                    match self
308                        .tx
309                        .send(Message::Finished {
310                            worker_id: self.id,
311                            track_name,
312                            output_linear,
313                            process_epoch,
314                        })
315                        .await
316                    {
317                        Ok(_) => {}
318                        Err(e) => {
319                            error!("Error while sending Finished: {}", e);
320                        }
321                    }
322                }
323                Message::ProcessOfflineBounce(job) => {
324                    self.process_offline_bounce(job).await;
325                }
326                _ => {}
327            }
328        }
329    }
330}