Skip to main content

maolan_engine/workers/
hw_worker.rs

1use crate::{
2    hw::config,
3    hw::traits::{HwMidiHub, HwWorkerDriver},
4    message::{HwMidiEvent, Message},
5    mutex::UnsafeMutex,
6};
7#[cfg(unix)]
8use nix::libc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{Arc, Condvar, Mutex};
11use std::thread::JoinHandle;
12use std::time::{Duration, Instant};
13use tokio::sync::mpsc::{Receiver, Sender};
14use tracing::error;
15
16pub trait Backend: Send + Sync + 'static {
17    type Driver: HwWorkerDriver + Send + 'static;
18    type MidiHub: HwMidiHub + Send + 'static;
19
20    const LABEL: &'static str;
21    const WORKER_THREAD_NAME: &'static str;
22    const ASSIST_THREAD_NAME: &'static str;
23    const ASSIST_AUTONOMOUS_ENV: &'static str;
24    const ASSIST_AUTONOMOUS_DEFAULT: bool = false;
25    const CYCLE_ON_WORKER_WHEN_ASSIST_AUTONOMOUS: bool = false;
26    const ASSIST_STEP_REQUIRES_REQUEST_CYCLE: bool = false;
27}
28
29#[derive(Debug)]
30pub struct HwWorker<B: Backend> {
31    driver: Arc<UnsafeMutex<B::Driver>>,
32    midi_hub: Arc<UnsafeMutex<B::MidiHub>>,
33    rx: Receiver<Message>,
34    tx: Sender<Message>,
35    cycle_frames: u32,
36    pending_midi_out_events: Vec<HwMidiEvent>,
37    pending_midi_out_sorted: bool,
38    midi_stop: Arc<AtomicBool>,
39    assist_state: Arc<(Mutex<AssistState>, Condvar)>,
40}
41
42impl<B: Backend> Drop for HwWorker<B> {
43    fn drop(&mut self) {
44        self.driver.lock().request_stop();
45        self.midi_stop.store(true, Ordering::Release);
46        {
47            let midi_hub = self.midi_hub.lock();
48            midi_hub.wake_input_waiter();
49            midi_hub.close_input_waiter();
50        }
51        {
52            let (lock, cvar) = &*self.assist_state;
53            if let Ok(mut st) = lock.lock() {
54                st.shutdown = true;
55                cvar.notify_one();
56            }
57        }
58    }
59}
60
61#[derive(Debug, Default)]
62struct AssistState {
63    shutdown: bool,
64    request_seq: u64,
65    done_seq: u64,
66    init_complete: bool,
67    last_error: Option<String>,
68}
69
70#[cfg(unix)]
71const RT_POLICY: i32 = libc::SCHED_FIFO;
72const RT_PRIORITY_WORKER: i32 = 18;
73const RT_PRIORITY_ASSIST: i32 = 12;
74const PROFILE_INTERVAL: Duration = Duration::from_secs(1);
75
76#[derive(Debug)]
77struct AssistProfiler {
78    report_at: Instant,
79    cycle_count: u64,
80    cycle_err_count: u64,
81    cycle_time_ns: u128,
82    step_count: u64,
83    step_work_count: u64,
84    step_err_count: u64,
85    step_time_ns: u128,
86    wait_count: u64,
87    wait_time_ns: u128,
88}
89
90impl AssistProfiler {
91    fn new() -> Self {
92        Self {
93            report_at: Instant::now() + PROFILE_INTERVAL,
94            cycle_count: 0,
95            cycle_err_count: 0,
96            cycle_time_ns: 0,
97            step_count: 0,
98            step_work_count: 0,
99            step_err_count: 0,
100            step_time_ns: 0,
101            wait_count: 0,
102            wait_time_ns: 0,
103        }
104    }
105
106    fn maybe_report(&mut self, cycle_samples: usize, sample_rate: i32, label: &str) {
107        let now = Instant::now();
108        if now < self.report_at {
109            return;
110        }
111        let cycle_avg_us = if self.cycle_count > 0 {
112            (self.cycle_time_ns / self.cycle_count as u128) as f64 / 1_000.0
113        } else {
114            0.0
115        };
116        let step_avg_us = if self.step_count > 0 {
117            (self.step_time_ns / self.step_count as u128) as f64 / 1_000.0
118        } else {
119            0.0
120        };
121        let wait_avg_us = if self.wait_count > 0 {
122            (self.wait_time_ns / self.wait_count as u128) as f64 / 1_000.0
123        } else {
124            0.0
125        };
126        let expected_cycles_per_sec = if cycle_samples > 0 && sample_rate > 0 {
127            sample_rate as f64 / cycle_samples as f64
128        } else {
129            0.0
130        };
131        error!(
132            "{} profile: expected_cps={:.1} cycles={} cycle_err={} cycle_avg_us={:.1} steps={} steps_work={} step_err={} step_avg_us={:.1} waits={} wait_avg_us={:.1}",
133            label,
134            expected_cycles_per_sec,
135            self.cycle_count,
136            self.cycle_err_count,
137            cycle_avg_us,
138            self.step_count,
139            self.step_work_count,
140            self.step_err_count,
141            step_avg_us,
142            self.wait_count,
143            wait_avg_us
144        );
145        self.report_at = now + PROFILE_INTERVAL;
146        self.cycle_count = 0;
147        self.cycle_err_count = 0;
148        self.cycle_time_ns = 0;
149        self.step_count = 0;
150        self.step_work_count = 0;
151        self.step_err_count = 0;
152        self.step_time_ns = 0;
153        self.wait_count = 0;
154        self.wait_time_ns = 0;
155    }
156}
157
158impl<B: Backend> HwWorker<B> {
159    fn profile_enabled() -> bool {
160        config::env_flag(config::HW_PROFILE_ENV)
161    }
162
163    fn assist_autonomous_enabled() -> bool {
164        B::ASSIST_AUTONOMOUS_DEFAULT || config::env_flag(B::ASSIST_AUTONOMOUS_ENV)
165    }
166
167    fn configure_rt_thread(name: &str, priority: i32) -> Result<(), String> {
168        #[cfg(unix)]
169        {
170            let thread = unsafe { libc::pthread_self() };
171            #[cfg(any(target_os = "linux", target_os = "freebsd", target_os = "openbsd"))]
172            let c_name = std::ffi::CString::new(name).map_err(|e| e.to_string())?;
173            #[cfg(target_os = "linux")]
174            unsafe {
175                let _ = libc::pthread_setname_np(thread, c_name.as_ptr());
176            }
177            #[cfg(any(target_os = "freebsd", target_os = "openbsd"))]
178            unsafe {
179                libc::pthread_set_name_np(thread, c_name.as_ptr());
180            }
181
182            let param = unsafe {
183                let mut p = std::mem::zeroed::<libc::sched_param>();
184                p.sched_priority = priority;
185                p
186            };
187            let rc = unsafe { libc::pthread_setschedparam(thread, RT_POLICY, &param) };
188            if rc != 0 {
189                return Err(format!(
190                    "pthread_setschedparam({}, prio {}) failed with errno {}",
191                    name, priority, rc
192                ));
193            }
194
195            let mut actual_policy = 0_i32;
196            let mut actual_param = unsafe { std::mem::zeroed::<libc::sched_param>() };
197            let rc = unsafe {
198                libc::pthread_getschedparam(thread, &mut actual_policy, &mut actual_param)
199            };
200            if rc != 0 {
201                return Err(format!(
202                    "pthread_getschedparam({}) failed with errno {}",
203                    name, rc
204                ));
205            }
206            if actual_policy != RT_POLICY || actual_param.sched_priority != priority {
207                return Err(format!(
208                    "realtime verification failed for {}: policy {}, prio {}",
209                    name, actual_policy, actual_param.sched_priority
210                ));
211            }
212            Ok(())
213        }
214        #[cfg(not(unix))]
215        {
216            let _ = name;
217            let _ = priority;
218            Err("Realtime thread priority is not supported on this platform".to_string())
219        }
220    }
221
222    fn lock_memory_pages() -> Result<(), String> {
223        #[cfg(unix)]
224        {
225            let rc = unsafe { libc::mlockall(libc::MCL_CURRENT | libc::MCL_FUTURE) };
226            if rc == 0 {
227                Ok(())
228            } else {
229                Err(format!(
230                    "mlockall(MCL_CURRENT|MCL_FUTURE) failed: {}",
231                    std::io::Error::last_os_error()
232                ))
233            }
234        }
235        #[cfg(not(unix))]
236        {
237            Err("mlockall is not supported on this platform".to_string())
238        }
239    }
240
241    pub fn new(
242        driver: Arc<UnsafeMutex<B::Driver>>,
243        midi_hub: Arc<UnsafeMutex<B::MidiHub>>,
244        rx: Receiver<Message>,
245        tx: Sender<Message>,
246    ) -> Self {
247        let cycle_frames = {
248            let d = driver.lock();
249            d.cycle_samples() as u32
250        };
251        Self {
252            driver,
253            midi_hub,
254            rx,
255            tx,
256            cycle_frames,
257            pending_midi_out_events: vec![],
258            pending_midi_out_sorted: true,
259            midi_stop: Arc::new(AtomicBool::new(false)),
260            assist_state: Arc::new((Mutex::new(AssistState::default()), Condvar::new())),
261        }
262    }
263
264    pub async fn work(mut self) {
265        crate::enable_flush_denormals_to_zero();
266        if let Err(e) = Self::lock_memory_pages() {
267            error!("{} worker memory lock not enabled: {}", B::LABEL, e);
268        }
269        if let Err(e) = Self::configure_rt_thread(B::WORKER_THREAD_NAME, RT_PRIORITY_WORKER) {
270            error!("{} worker realtime priority not enabled: {}", B::LABEL, e);
271        }
272        #[cfg(target_os = "macos")]
273        unsafe {
274            libc::pthread_set_qos_class_self_np(libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE, 0);
275        }
276
277        #[cfg(unix)]
278        {
279            let has_fds = self.driver.lock().capture_fd().is_some()
280                && self.driver.lock().playback_fd().is_some();
281            if has_fds {
282                self.work_async().await;
283                return;
284            }
285        }
286
287        self.work_legacy().await;
288    }
289
290    #[cfg(unix)]
291    async fn work_async(&mut self) {
292        let midi_handle = Self::start_midi_input_thread(
293            self.midi_hub.clone(),
294            self.tx.clone(),
295            self.cycle_frames,
296            self.midi_stop.clone(),
297        );
298        let mut cycle_running = false;
299        let (cycle_tx, mut cycle_rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);
300        loop {
301            tokio::select! {
302                msg = self.rx.recv() => {
303                    let msg = match msg {
304                        Some(m) => m,
305                        None => {
306                            self.driver.lock().request_stop();
307                            if cycle_running {
308                                let _ = cycle_rx.recv().await;
309                            }
310                            self.shutdown_channel_closed(midi_handle);
311                            return;
312                        }
313                    };
314                    match msg {
315                        Message::Request(crate::message::Action::Quit) => {
316                            self.driver.lock().request_stop();
317                            if cycle_running {
318                                // Wait for the in-flight cycle to finish so the
319                                // spawn_blocking task releases the driver before
320                                // exit(0) closes the fds.
321                                let _ = cycle_rx.recv().await;
322                            }
323                            self.shutdown_quit(midi_handle);
324                            return;
325                        }
326                        Message::TracksFinished => {
327                            self.flush_pending_midi_out();
328                            if !cycle_running {
329                                cycle_running = true;
330                                let driver = self.driver.clone();
331                                let tx = cycle_tx.clone();
332                                tokio::task::spawn_blocking(move || {
333                                    // The tokio blocking thread that executes the audio cycle
334                                    // does not inherit the async worker thread's realtime
335                                    // priority. Configure it here so OSS/ALSA cycles run with
336                                    // the same SCHED_FIFO priority as the legacy assist thread.
337                                    if let Err(e) = Self::configure_rt_thread(
338                                        B::WORKER_THREAD_NAME,
339                                        RT_PRIORITY_WORKER,
340                                    ) {
341                                        static WARNED: std::sync::atomic::AtomicBool =
342                                            std::sync::atomic::AtomicBool::new(false);
343                                        if !WARNED
344                                            .swap(true, std::sync::atomic::Ordering::Relaxed)
345                                        {
346                                            tracing::warn!(
347                                                "{} cycle thread realtime priority not enabled: {}",
348                                                B::LABEL,
349                                                e
350                                            );
351                                        }
352                                    }
353                                    let result = driver.lock().run_cycle_for_worker();
354                                    let _ = tx.blocking_send(result);
355                                });
356                            }
357                        }
358                        Message::HWMidiOutEvents(mut events) => {
359                            self.pending_midi_out_events.append(&mut events);
360                            self.pending_midi_out_sorted = false;
361                        }
362                        Message::ClearHWMidiOutEvents => {
363                            self.pending_midi_out_events.clear();
364                            self.pending_midi_out_sorted = true;
365                        }
366                        _ => {}
367                    }
368                }
369                result = cycle_rx.recv(), if cycle_running => {
370                    cycle_running = false;
371                    if let Some(Err(e)) = result {
372                        error!("{} cycle error: {}", B::LABEL, e);
373                        let _ = self.tx.send(Message::Response(Err(format!(
374                            "{} cycle error: {}", B::LABEL, e
375                        )))).await;
376                    }
377                    if let Err(e) = self.tx.send(Message::HWFinished).await {
378                        error!("{} worker failed to send HWFinished: {}", B::LABEL, e);
379                    }
380                }
381            }
382        }
383    }
384
385    async fn work_legacy(&mut self) {
386        let assist_handle =
387            Self::start_assist_thread(self.driver.clone(), self.assist_state.clone());
388        let midi_handle = Self::start_midi_input_thread(
389            self.midi_hub.clone(),
390            self.tx.clone(),
391            self.cycle_frames,
392            self.midi_stop.clone(),
393        );
394        loop {
395            let msg = match self.rx.recv().await {
396                Some(msg) => msg,
397                None => {
398                    self.driver.lock().request_stop();
399                    self.shutdown_midi(midi_handle);
400                    Self::stop_assist_thread(&self.assist_state, assist_handle);
401                    self.driver.lock().request_stop();
402                    return;
403                }
404            };
405            match msg {
406                Message::Request(crate::message::Action::Quit) => {
407                    self.driver.lock().request_stop();
408                    self.flush_pending_midi_out();
409                    self.shutdown_midi(midi_handle);
410                    Self::stop_assist_thread(&self.assist_state, assist_handle);
411                    self.driver.lock().request_stop();
412                    return;
413                }
414                Message::TracksFinished => {
415                    self.flush_pending_midi_out();
416                    if let Err(e) = Self::run_assist_cycle(&self.driver, &self.assist_state) {
417                        error!("{} assist cycle error: {}", B::LABEL, e);
418                        let _ = self
419                            .tx
420                            .send(Message::Response(Err(format!(
421                                "{} assist cycle error: {}",
422                                B::LABEL,
423                                e
424                            ))))
425                            .await;
426                    }
427                    if let Err(e) = self.tx.send(Message::HWFinished).await {
428                        error!(
429                            "{} worker failed to send HWFinished to engine: {}",
430                            B::LABEL,
431                            e
432                        );
433                    }
434                }
435                Message::HWMidiOutEvents(mut events) => {
436                    self.pending_midi_out_events.append(&mut events);
437                    self.pending_midi_out_sorted = false;
438                }
439                Message::ClearHWMidiOutEvents => {
440                    self.pending_midi_out_events.clear();
441                    self.pending_midi_out_sorted = true;
442                }
443                _ => {}
444            }
445        }
446    }
447
448    fn flush_pending_midi_out(&mut self) {
449        if self.pending_midi_out_events.is_empty() {
450            return;
451        }
452        if !self.pending_midi_out_sorted {
453            self.pending_midi_out_events.sort_by(|a, b| {
454                a.event
455                    .frame
456                    .cmp(&b.event.frame)
457                    .then_with(|| a.device.cmp(&b.device))
458            });
459            self.pending_midi_out_sorted = true;
460        }
461        let midi_hub = self.midi_hub.lock();
462        midi_hub.write_events(&self.pending_midi_out_events);
463        self.pending_midi_out_events.clear();
464    }
465
466    fn shutdown_midi(&mut self, midi_handle: JoinHandle<()>) {
467        self.midi_stop.store(true, Ordering::Release);
468        {
469            let midi_hub = self.midi_hub.lock();
470            midi_hub.wake_input_waiter();
471        }
472        let _ = midi_handle.join();
473        {
474            let midi_hub = self.midi_hub.lock();
475            midi_hub.close_input_waiter();
476        }
477    }
478
479    #[cfg(unix)]
480    fn shutdown_quit(&mut self, midi_handle: JoinHandle<()>) {
481        self.driver.lock().request_stop();
482        self.flush_pending_midi_out();
483        self.shutdown_midi(midi_handle);
484        self.driver.lock().request_stop();
485    }
486
487    #[cfg(unix)]
488    fn shutdown_channel_closed(&mut self, midi_handle: JoinHandle<()>) {
489        self.driver.lock().request_stop();
490        self.shutdown_midi(midi_handle);
491        self.driver.lock().request_stop();
492    }
493
494    fn start_midi_input_thread(
495        midi_hub: Arc<UnsafeMutex<B::MidiHub>>,
496        tx: Sender<Message>,
497        cycle_frames: u32,
498        stop: Arc<AtomicBool>,
499    ) -> JoinHandle<()> {
500        std::thread::spawn(move || {
501            crate::enable_flush_denormals_to_zero();
502            let mut midi_in_events = Vec::with_capacity(64);
503            while !stop.load(Ordering::Acquire) {
504                let ready_fds = {
505                    let hub = midi_hub.lock();
506                    hub.wait_ready_blocking()
507                };
508                if stop.load(Ordering::Acquire) {
509                    break;
510                }
511                {
512                    let hub = midi_hub.lock();
513                    hub.read_events_for_fds(
514                        ready_fds.as_deref().unwrap_or(&[]),
515                        &mut midi_in_events,
516                    );
517                }
518                if midi_in_events.is_empty() {
519                    continue;
520                }
521                spread_hw_event_frames(&mut midi_in_events, cycle_frames);
522                let cap = midi_in_events.capacity();
523                let out = std::mem::replace(&mut midi_in_events, Vec::with_capacity(cap.max(64)));
524                if tx.blocking_send(Message::HWMidiEvents(out)).is_err() {
525                    break;
526                }
527            }
528        })
529    }
530
531    fn start_assist_thread(
532        driver: Arc<UnsafeMutex<B::Driver>>,
533        assist_state: Arc<(Mutex<AssistState>, Condvar)>,
534    ) -> JoinHandle<()> {
535        let profile = Self::profile_enabled();
536        let autonomous = Self::assist_autonomous_enabled();
537        std::thread::spawn(move || {
538            crate::enable_flush_denormals_to_zero();
539            if let Err(e) = Self::configure_rt_thread(B::ASSIST_THREAD_NAME, RT_PRIORITY_ASSIST) {
540                error!("{} assist realtime priority not enabled: {}", B::LABEL, e);
541            }
542            #[cfg(target_os = "macos")]
543            unsafe {
544                libc::pthread_set_qos_class_self_np(libc::qos_class_t::QOS_CLASS_USER_INITIATED, 0);
545            }
546            let mut profiler = if profile {
547                let (cycle_samples, sample_rate) = {
548                    let d = driver.lock();
549                    (d.cycle_samples(), d.sample_rate())
550                };
551                error!(
552                    "{} profile enabled: cycle_samples={} sample_rate={} expected_cps={:.1}",
553                    B::LABEL,
554                    cycle_samples,
555                    sample_rate,
556                    if cycle_samples > 0 {
557                        sample_rate as f64 / cycle_samples as f64
558                    } else {
559                        0.0
560                    }
561                );
562                Some(AssistProfiler::new())
563            } else {
564                None
565            };
566            let (lock, cvar) = &*assist_state;
567            loop {
568                let (shutdown, has_request, target, init_complete) = {
569                    let st = lock.lock().expect("assist mutex poisoned");
570                    (
571                        st.shutdown,
572                        st.request_seq > st.done_seq,
573                        st.request_seq,
574                        st.init_complete,
575                    )
576                };
577                if shutdown {
578                    break;
579                }
580                if has_request {
581                    let started = Instant::now();
582                    let run_error = {
583                        let d = driver.lock();
584                        d.run_cycle_for_worker().err().map(|e| e.to_string())
585                    };
586                    if let Some(p) = profiler.as_mut() {
587                        p.cycle_count += 1;
588                        if run_error.is_some() {
589                            p.cycle_err_count += 1;
590                        }
591                        p.cycle_time_ns += started.elapsed().as_nanos();
592                        let (cycle_samples, sample_rate) = {
593                            let d = driver.lock();
594                            (d.cycle_samples(), d.sample_rate())
595                        };
596                        p.maybe_report(cycle_samples, sample_rate, B::LABEL);
597                    }
598                    let mut st = lock.lock().expect("assist mutex poisoned");
599                    st.done_seq = st.done_seq.max(target);
600                    if run_error.is_none() {
601                        st.init_complete = true;
602                    }
603                    st.last_error = run_error;
604                    cvar.notify_all();
605                    continue;
606                }
607
608                if B::ASSIST_STEP_REQUIRES_REQUEST_CYCLE && !init_complete {
609                    let st = lock.lock().expect("assist mutex poisoned");
610                    if st.shutdown {
611                        break;
612                    }
613                    let wait_started = Instant::now();
614                    let _guard = cvar.wait(st).expect("assist condvar failed");
615                    if let Some(p) = profiler.as_mut() {
616                        p.wait_count += 1;
617                        p.wait_time_ns += wait_started.elapsed().as_nanos();
618                    }
619                    continue;
620                }
621
622                if !autonomous {
623                    let st = lock.lock().expect("assist mutex poisoned");
624                    if st.shutdown {
625                        break;
626                    }
627                    let wait_started = Instant::now();
628                    let _guard = cvar.wait(st).expect("assist condvar failed");
629                    if let Some(p) = profiler.as_mut() {
630                        p.wait_count += 1;
631                        p.wait_time_ns += wait_started.elapsed().as_nanos();
632                    }
633                    continue;
634                }
635
636                let started = Instant::now();
637                let did_work = {
638                    let d = driver.lock();
639                    match d.run_assist_step_for_worker() {
640                        Ok(v) => v,
641                        Err(e) => {
642                            if let Some(p) = profiler.as_mut() {
643                                p.step_err_count += 1;
644                            }
645                            let mut st = lock.lock().expect("assist mutex poisoned");
646                            st.last_error = Some(e.to_string());
647                            cvar.notify_all();
648                            false
649                        }
650                    }
651                };
652                if let Some(p) = profiler.as_mut() {
653                    p.step_count += 1;
654                    if did_work {
655                        p.step_work_count += 1;
656                    }
657                    p.step_time_ns += started.elapsed().as_nanos();
658                    let (cycle_samples, sample_rate) = {
659                        let d = driver.lock();
660                        (d.cycle_samples(), d.sample_rate())
661                    };
662                    p.maybe_report(cycle_samples, sample_rate, B::LABEL);
663                }
664                if !did_work {
665                    let st = lock.lock().expect("assist mutex poisoned");
666                    if st.shutdown {
667                        break;
668                    }
669                    let wait_started = Instant::now();
670                    let _guard = if autonomous {
671                        cvar.wait_timeout(st, Duration::from_micros(100))
672                            .expect("assist condvar failed")
673                            .0
674                    } else {
675                        cvar.wait(st).expect("assist condvar failed")
676                    };
677                    if let Some(p) = profiler.as_mut() {
678                        p.wait_count += 1;
679                        p.wait_time_ns += wait_started.elapsed().as_nanos();
680                    }
681                }
682            }
683        })
684    }
685
686    fn run_assist_cycle(
687        driver: &Arc<UnsafeMutex<B::Driver>>,
688        assist_state: &Arc<(Mutex<AssistState>, Condvar)>,
689    ) -> Result<(), String> {
690        let autonomous =
691            Self::assist_autonomous_enabled() && B::CYCLE_ON_WORKER_WHEN_ASSIST_AUTONOMOUS;
692        if autonomous {
693            let (lock, cvar) = &**assist_state;
694            {
695                let mut st = lock
696                    .lock()
697                    .map_err(|_| "assist mutex poisoned".to_string())?;
698                st.init_complete = true;
699                cvar.notify_one();
700            }
701            let result = driver.lock().run_cycle_for_worker();
702            {
703                let mut st = lock
704                    .lock()
705                    .map_err(|_| "assist mutex poisoned".to_string())?;
706                st.last_error = result.as_ref().err().map(|e| e.to_string());
707                cvar.notify_one();
708            }
709            return result;
710        }
711
712        let (lock, cvar) = &**assist_state;
713        let mut st = lock
714            .lock()
715            .map_err(|_| "assist mutex poisoned".to_string())?;
716        st.request_seq = st.request_seq.saturating_add(1);
717        let target = st.request_seq;
718        cvar.notify_one();
719        while st.done_seq < target && !st.shutdown {
720            st = cvar
721                .wait(st)
722                .map_err(|_| "assist condvar wait failed".to_string())?;
723        }
724        if let Some(err) = st.last_error.take() {
725            return Err(err);
726        }
727        Ok(())
728    }
729
730    fn stop_assist_thread(
731        assist_state: &Arc<(Mutex<AssistState>, Condvar)>,
732        assist_handle: JoinHandle<()>,
733    ) {
734        let (lock, cvar) = &**assist_state;
735        if let Ok(mut st) = lock.lock() {
736            st.shutdown = true;
737            cvar.notify_all();
738        }
739        let _ = assist_handle.join();
740    }
741}
742
743fn spread_hw_event_frames(events: &mut [HwMidiEvent], frames: u32) {
744    if events.len() <= 1 || frames <= 1 {
745        return;
746    }
747    let n = events.len() as u32;
748    for (idx, event) in events.iter_mut().enumerate() {
749        let pos = idx as u32;
750        event.event.frame = ((pos as u64 * (frames - 1) as u64) / n as u64) as u32;
751    }
752}