Skip to main content

maolan_engine/plugins/
lv2_proc.rs

1//! Out-of-process LV2 processor using `maolan-engine-plugin-host` IPC.
2
3use crate::audio::io::AudioIO;
4use crate::midi::io::MidiEvent;
5use crate::mutex::UnsafeMutex;
6use maolan_plugin_protocol::events::EventPair;
7use maolan_plugin_protocol::protocol::*;
8use maolan_plugin_protocol::ringbuf::RingBuffer;
9use maolan_plugin_protocol::shm::ShmMapping;
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12use std::process::{Child, Command, Stdio};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::{Arc, atomic::AtomicU32};
15use std::time::{Duration, Instant};
16
17/// Shared state for an out-of-process LV2 plugin instance.
18pub struct Lv2Processor {
19    uri: String,
20    name: String,
21    audio_inputs: Vec<Arc<AudioIO>>,
22    audio_outputs: Vec<Arc<AudioIO>>,
23    main_audio_inputs: usize,
24    main_audio_outputs: usize,
25    param_values: UnsafeMutex<HashMap<u32, f64>>,
26    bypassed: Arc<AtomicBool>,
27    // IPC state
28    child: UnsafeMutex<Option<Child>>,
29    mapping: Option<ShmMapping>,
30    events: Option<EventPair>,
31    shm_name: String,
32    // Crash recovery
33    crash_count: AtomicU32,
34    last_process_time: UnsafeMutex<Instant>,
35}
36
37pub type SharedLv2Processor = Arc<UnsafeMutex<Lv2Processor>>;
38
39impl Lv2Processor {
40    pub fn new(
41        sample_rate: f64,
42        buffer_size: usize,
43        plugin_uri: &str,
44        input_count: usize,
45        output_count: usize,
46        host_binary: PathBuf,
47    ) -> Result<Self, String> {
48        let audio_inputs = (0..input_count.max(1))
49            .map(|_| Arc::new(AudioIO::new(buffer_size)))
50            .collect::<Vec<_>>();
51        let audio_outputs = (0..output_count.max(1))
52            .map(|_| Arc::new(AudioIO::new(buffer_size)))
53            .collect::<Vec<_>>();
54
55        let instance_id = format!("lv2-{}", std::process::id());
56        let (mut child, mapping, events, shm_name) = spawn_host(
57            &host_binary,
58            plugin_uri,
59            &instance_id,
60            sample_rate,
61            buffer_size,
62            input_count.max(1),
63            output_count.max(1),
64        )?;
65
66        let header = unsafe { header_ref(mapping.as_ptr()) };
67        if !wait_for_ready(header, Duration::from_secs(10)) {
68            let _ = child.kill();
69            return Err("LV2 host did not signal ready".to_string());
70        }
71
72        let name = plugin_uri
73            .rsplit_once('/')
74            .map(|(_, name)| name)
75            .unwrap_or(plugin_uri)
76            .to_string();
77
78        Ok(Self {
79            uri: plugin_uri.to_string(),
80            name,
81            audio_inputs,
82            audio_outputs,
83            main_audio_inputs: input_count.max(1),
84            main_audio_outputs: output_count.max(1),
85            param_values: UnsafeMutex::new(HashMap::new()),
86            bypassed: Arc::new(AtomicBool::new(false)),
87            child: UnsafeMutex::new(Some(child)),
88            mapping: Some(mapping),
89            events: Some(events),
90            shm_name,
91            crash_count: AtomicU32::new(0),
92            last_process_time: UnsafeMutex::new(Instant::now()),
93        })
94    }
95
96    pub fn setup_audio_ports(&self) {
97        for port in &self.audio_inputs {
98            port.setup();
99        }
100        for port in &self.audio_outputs {
101            port.setup();
102        }
103    }
104
105    pub fn audio_inputs(&self) -> &[Arc<AudioIO>] {
106        &self.audio_inputs
107    }
108
109    pub fn audio_outputs(&self) -> &[Arc<AudioIO>] {
110        &self.audio_outputs
111    }
112
113    pub fn main_audio_input_count(&self) -> usize {
114        self.main_audio_inputs
115    }
116
117    pub fn main_audio_output_count(&self) -> usize {
118        self.main_audio_outputs
119    }
120
121    pub fn midi_input_count(&self) -> usize {
122        0
123    }
124
125    pub fn midi_output_count(&self) -> usize {
126        0
127    }
128
129    pub fn set_bypassed(&self, bypassed: bool) {
130        self.bypassed.store(bypassed, Ordering::Relaxed);
131    }
132
133    pub fn is_bypassed(&self) -> bool {
134        self.bypassed.load(Ordering::Relaxed)
135    }
136
137    pub fn parameter_values(&self) -> HashMap<u32, f64> {
138        self.param_values.lock().clone()
139    }
140
141    pub fn set_parameter(&self, param_id: u32, value: f64) -> Result<(), String> {
142        self.set_parameter_at(param_id, value, 0)
143    }
144
145    pub fn set_parameter_at(&self, param_id: u32, value: f64, _frame: u32) -> Result<(), String> {
146        self.param_values.lock().insert(param_id, value);
147        if let Some(ref mapping) = self.mapping {
148            let ring = unsafe {
149                let buf = param_ring_ptr(mapping.as_ptr());
150                let (w, r) = param_indices(mapping.as_ptr());
151                RingBuffer::new(buf, w, r, RING_CAPACITY)
152            };
153            let ev = ParameterEvent {
154                param_index: param_id,
155                value: value as f32,
156                sample_offset: 0,
157                event_kind: maolan_plugin_protocol::PARAM_EVENT_VALUE,
158            };
159            if !ring.push(ev) {
160                tracing::warn!("LV2 param ring full, dropping parameter event");
161            }
162        }
163        Ok(())
164    }
165
166    pub fn begin_parameter_edit(&self, _param_id: u32) -> Result<(), String> {
167        Ok(())
168    }
169
170    pub fn end_parameter_edit(&self, _param_id: u32) -> Result<(), String> {
171        Ok(())
172    }
173
174    pub fn is_parameter_edit_active(&self, _param_id: u32) -> bool {
175        false
176    }
177
178    pub fn snapshot_state(&self) -> Result<Vec<u8>, String> {
179        let (mapping, events) = match (&self.mapping, &self.events) {
180            (Some(m), Some(e)) => (m, e),
181            _ => return Err("LV2 processor not initialized".to_string()),
182        };
183        let ptr = mapping.as_ptr();
184        let header = unsafe { header_mut(ptr) };
185
186        header.request_type.store(1, Ordering::Release);
187        header.request_status.store(0, Ordering::Release);
188        if let Err(e) = events.signal_host() {
189            header.request_type.store(0, Ordering::Release);
190            return Err(format!("Failed to signal host for state save: {}", e));
191        }
192
193        if let Err(e) = events.wait_host(Duration::from_secs(5)) {
194            header.request_type.store(0, Ordering::Release);
195            return Err(format!("Host did not respond to state save: {}", e));
196        }
197
198        let status = header.request_status.load(Ordering::Acquire);
199        let size = header.scratch_size.load(Ordering::Acquire) as usize;
200        if status != 1 {
201            header.request_type.store(0, Ordering::Release);
202            return Err("State save failed in host".to_string());
203        }
204
205        let scratch = unsafe { scratch_ptr(ptr) };
206        let mut bytes = vec![0u8; size];
207        unsafe {
208            std::ptr::copy_nonoverlapping(scratch, bytes.as_mut_ptr(), size);
209        }
210        header.request_type.store(0, Ordering::Release);
211        Ok(bytes)
212    }
213
214    pub fn restore_state(&self, state: &[u8]) -> Result<(), String> {
215        let (mapping, events) = match (&self.mapping, &self.events) {
216            (Some(m), Some(e)) => (m, e),
217            _ => return Err("LV2 processor not initialized".to_string()),
218        };
219        let ptr = mapping.as_ptr();
220        let header = unsafe { header_mut(ptr) };
221
222        let scratch = unsafe { scratch_ptr(ptr) };
223        let size = state.len().min(SCRATCH_SIZE);
224        unsafe {
225            std::ptr::copy_nonoverlapping(state.as_ptr(), scratch, size);
226        }
227        header.scratch_size.store(size as u32, Ordering::Release);
228
229        header.request_type.store(2, Ordering::Release);
230        header.request_status.store(0, Ordering::Release);
231        if let Err(e) = events.signal_host() {
232            header.request_type.store(0, Ordering::Release);
233            return Err(format!("Failed to signal host for state restore: {}", e));
234        }
235
236        if let Err(e) = events.wait_host(Duration::from_secs(5)) {
237            header.request_type.store(0, Ordering::Release);
238            return Err(format!("Host did not respond to state restore: {}", e));
239        }
240
241        let status = header.request_status.load(Ordering::Acquire);
242        header.request_type.store(0, Ordering::Release);
243        if status != 1 {
244            return Err("State restore failed in host".to_string());
245        }
246        Ok(())
247    }
248
249    pub fn process_with_audio_io(&self, frames: usize) {
250        let _ = self.process_with_midi(frames, &[]);
251    }
252
253    pub fn process_with_midi(&self, frames: usize, _midi_in: &[MidiEvent]) -> Vec<MidiEvent> {
254        if self.bypassed.load(Ordering::Relaxed) {
255            self.bypass_copy_inputs_to_outputs();
256            return Vec::new();
257        }
258
259        // Check if host process has crashed.
260        {
261            let child = self.child.lock();
262            if let Some(ref mut c) = child.as_mut() {
263                match c.try_wait() {
264                    Ok(Some(status)) if !status.success() => {
265                        tracing::error!(
266                            "LV2 plugin host crashed for '{}' ({})",
267                            self.name,
268                            self.uri
269                        );
270                        self.crash_count.fetch_add(1, Ordering::Relaxed);
271                        self.bypass_copy_inputs_to_outputs();
272                        return Vec::new();
273                    }
274                    Ok(Some(status)) => {
275                        eprintln!("[LV2 debug] host exited with success: {:?}", status);
276                    }
277                    Ok(None) => {
278                        eprintln!("[LV2 debug] host still alive");
279                    }
280                    Err(e) => {
281                        eprintln!("[LV2 debug] try_wait error: {}", e);
282                    }
283                }
284            }
285        }
286
287        let started = Instant::now();
288
289        let (mapping, events) = match (&self.mapping, &self.events) {
290            (Some(m), Some(e)) => (m, e),
291            _ => {
292                self.bypass_copy_inputs_to_outputs();
293                return Vec::new();
294            }
295        };
296
297        let ptr = mapping.as_ptr();
298        let num_in = self.audio_inputs.len();
299        let num_out = self.audio_outputs.len();
300        unsafe {
301            let h = header_mut(ptr);
302            h.block_size.store(frames as u32, Ordering::Release);
303            h.num_input_channels.store(num_in as u32, Ordering::Release);
304            h.num_output_channels
305                .store(num_out as u32, Ordering::Release);
306            // Write default transport state (can be overridden by track later).
307            let t = transport_mut(ptr);
308            t.playhead_sample = 0;
309            t.tempo = 120.0;
310            t.numerator = 4;
311            t.denominator = 4;
312            t.flags = 1; // playing
313        }
314
315        // Copy input AudioIO buffers to shared memory (bus 0).
316        for (ch, input) in self.audio_inputs.iter().enumerate() {
317            let src = input.buffer.lock();
318            let dst = unsafe { audio_channel_ptr(ptr, ch, 0) };
319            let len = frames.min(src.len());
320            unsafe {
321                std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len);
322            }
323        }
324
325        // Signal host to process.
326        if let Err(e) = events.signal_host() {
327            tracing::error!("Failed to signal LV2 host: {e}");
328            self.bypass_copy_inputs_to_outputs();
329            return Vec::new();
330        }
331        eprintln!("[LV2 debug] signal_host succeeded");
332
333        // Wait for host to complete (with timeout).
334        let timeout = Duration::from_millis(100);
335        match events.wait_host(timeout) {
336            Ok(()) => {
337                eprintln!("[LV2 debug] wait_host succeeded");
338            }
339            Err(e) => {
340                eprintln!(
341                    "[LV2 debug] host did not respond for '{}' ({}): {}",
342                    self.name, self.uri, e
343                );
344                self.bypass_copy_inputs_to_outputs();
345                return Vec::new();
346            }
347        }
348
349        // Copy output shared memory (bus 1) back to AudioIO buffers.
350        for (ch, output) in self.audio_outputs.iter().enumerate() {
351            let dst = output.buffer.lock();
352            let src = unsafe { audio_channel_ptr(ptr, ch, 1) };
353            let len = frames.min(dst.len());
354            unsafe {
355                std::ptr::copy_nonoverlapping(src, dst.as_mut_ptr(), len);
356            }
357            *output.finished.lock() = true;
358        }
359
360        let elapsed = started.elapsed();
361        if elapsed > Duration::from_millis(20) {
362            tracing::warn!(
363                "Slow LV2 process '{}' ({}) took {:.3} ms for {} frames",
364                self.name,
365                self.uri,
366                elapsed.as_secs_f64() * 1000.0,
367                frames
368            );
369        }
370
371        *self.last_process_time.lock() = Instant::now();
372        Vec::new()
373    }
374
375    fn bypass_copy_inputs_to_outputs(&self) {
376        for (input, output) in self.audio_inputs.iter().zip(self.audio_outputs.iter()) {
377            let src = input.buffer.lock();
378            let dst = output.buffer.lock();
379            dst.fill(0.0);
380            for (d, s) in dst.iter_mut().zip(src.iter()) {
381                *d = *s;
382            }
383            *output.finished.lock() = true;
384        }
385        for output in self.audio_outputs.iter().skip(self.audio_inputs.len()) {
386            output.buffer.lock().fill(0.0);
387            *output.finished.lock() = true;
388        }
389    }
390
391    pub fn uri(&self) -> &str {
392        &self.uri
393    }
394
395    pub fn name(&self) -> &str {
396        &self.name
397    }
398
399    pub fn begin_parameter_edit_at(&self, _param_id: u32, _frame: u32) -> Result<(), String> {
400        Ok(())
401    }
402
403    pub fn end_parameter_edit_at(&self, _param_id: u32, _frame: u32) -> Result<(), String> {
404        Ok(())
405    }
406
407    pub fn run_host_callbacks_main_thread(&self) {}
408
409    pub fn reconfigure_ports_if_needed(&self) -> Result<bool, String> {
410        Ok(false)
411    }
412
413    pub fn gui_show(&self) -> Result<(), String> {
414        if let Some(ref mapping) = self.mapping
415            && let Some(ref events) = self.events
416        {
417            let header = unsafe { header_mut(mapping.as_ptr()) };
418            header.request_type.store(3, Ordering::Release);
419            let _ = events.signal_host();
420            return Ok(());
421        }
422        Err("No active host to show GUI".to_string())
423    }
424
425    pub fn gui_hide(&self) {
426        if let Some(ref mapping) = self.mapping
427            && let Some(ref events) = self.events
428        {
429            let header = unsafe { header_mut(mapping.as_ptr()) };
430            header.request_type.store(4, Ordering::Release);
431            let _ = events.signal_host();
432        }
433    }
434
435    pub fn drain_echoed_parameters(&self) -> Vec<ParameterEvent> {
436        let mut result = Vec::new();
437        if let Some(ref mapping) = self.mapping {
438            let ring = unsafe {
439                let buf = echo_ring_ptr(mapping.as_ptr());
440                let (w, r) = echo_indices(mapping.as_ptr());
441                RingBuffer::new(buf, w, r, RING_CAPACITY)
442            };
443            while let Some(ev) = ring.pop() {
444                result.push(ev);
445            }
446        }
447        result
448    }
449}
450
451impl Drop for Lv2Processor {
452    fn drop(&mut self) {
453        if let Some(ref mapping) = self.mapping
454            && let Some(ref events) = self.events
455        {
456            let header = unsafe { header_mut(mapping.as_ptr()) };
457            header.shutdown_request.store(1, Ordering::Release);
458            let _ = events.signal_host();
459        }
460        let mut child_opt = self.child.lock().take();
461        if let Some(mut child) = child_opt.take() {
462            let start = Instant::now();
463            while start.elapsed() < Duration::from_secs(2) {
464                if child.try_wait().map(|s| s.is_some()).unwrap_or(true) {
465                    break;
466                }
467                std::thread::sleep(Duration::from_millis(10));
468            }
469            if child.try_wait().map(|s| s.is_none()).unwrap_or(false) {
470                let _ = child.kill();
471            }
472        }
473        let _ = ShmMapping::unlink(&self.shm_name);
474    }
475}
476
477impl UnsafeMutex<Lv2Processor> {
478    pub fn setup_audio_ports(&self) {
479        self.lock().setup_audio_ports();
480    }
481
482    pub fn process_with_midi(&self, frames: usize, midi_events: &[MidiEvent]) -> Vec<MidiEvent> {
483        self.lock().process_with_midi(frames, midi_events)
484    }
485
486    pub fn set_bypassed(&self, bypassed: bool) {
487        self.lock().set_bypassed(bypassed);
488    }
489
490    pub fn is_bypassed(&self) -> bool {
491        self.lock().is_bypassed()
492    }
493
494    pub fn set_parameter(&self, param_id: u32, value: f64) -> Result<(), String> {
495        self.lock().set_parameter(param_id, value)
496    }
497
498    pub fn set_parameter_at(&self, param_id: u32, value: f64, frame: u32) -> Result<(), String> {
499        self.lock().set_parameter_at(param_id, value, frame)
500    }
501
502    pub fn begin_parameter_edit_at(&self, param_id: u32, frame: u32) -> Result<(), String> {
503        self.lock().begin_parameter_edit_at(param_id, frame)
504    }
505
506    pub fn end_parameter_edit_at(&self, param_id: u32, frame: u32) -> Result<(), String> {
507        self.lock().end_parameter_edit_at(param_id, frame)
508    }
509
510    pub fn snapshot_state(&self) -> Result<Vec<u8>, String> {
511        self.lock().snapshot_state()
512    }
513
514    pub fn restore_state(&self, state: &[u8]) -> Result<(), String> {
515        self.lock().restore_state(state)
516    }
517
518    pub fn audio_inputs(&self) -> &[Arc<AudioIO>] {
519        self.lock().audio_inputs()
520    }
521
522    pub fn audio_outputs(&self) -> &[Arc<AudioIO>] {
523        self.lock().audio_outputs()
524    }
525
526    pub fn main_audio_input_count(&self) -> usize {
527        self.lock().main_audio_input_count()
528    }
529
530    pub fn main_audio_output_count(&self) -> usize {
531        self.lock().main_audio_output_count()
532    }
533
534    pub fn midi_input_count(&self) -> usize {
535        self.lock().midi_input_count()
536    }
537
538    pub fn midi_output_count(&self) -> usize {
539        self.lock().midi_output_count()
540    }
541
542    pub fn uri(&self) -> String {
543        self.lock().uri().to_string()
544    }
545
546    pub fn name(&self) -> String {
547        self.lock().name().to_string()
548    }
549
550    pub fn run_host_callbacks_main_thread(&self) {
551        self.lock().run_host_callbacks_main_thread();
552    }
553
554    pub fn reconfigure_ports_if_needed(&self) -> Result<bool, String> {
555        self.lock().reconfigure_ports_if_needed()
556    }
557}
558
/// Locates the `maolan-engine-plugin-host` executable.
///
/// Directories are searched in this order and the first regular file with the
/// expected name wins:
///
/// 1. the directory containing the current executable,
/// 2. `$CARGO_MANIFEST_DIR/target/{debug,release}`,
/// 3. `$CARGO_MANIFEST_DIR/../daw/target/{debug,release}`,
/// 4. every entry of `PATH`.
///
/// The file name carries the platform's executable suffix and `PATH` is split
/// with the platform's separator. Directories that merely share the binary's
/// name are skipped. Returns `None` when nothing matches.
pub fn find_host_binary() -> Option<PathBuf> {
    const PROFILES: [&str; 2] = ["debug", "release"];
    let binary_name = format!(
        "maolan-engine-plugin-host{}",
        std::env::consts::EXE_SUFFIX
    );

    let mut search_dirs: Vec<PathBuf> = Vec::new();

    if let Some(exe_dir) = std::env::current_exe()
        .ok()
        .and_then(|exe| exe.parent().map(PathBuf::from))
    {
        search_dirs.push(exe_dir);
    }

    if let Some(manifest) = std::env::var_os("CARGO_MANIFEST_DIR") {
        let engine_root = PathBuf::from(manifest);
        for profile in PROFILES.iter() {
            search_dirs.push(engine_root.join("target").join(profile));
        }
        let daw_root = engine_root
            .parent()
            .unwrap_or(Path::new(""))
            .join("daw");
        for profile in PROFILES.iter() {
            search_dirs.push(daw_root.join("target").join(profile));
        }
    }

    if let Some(path_var) = std::env::var_os("PATH") {
        search_dirs.extend(std::env::split_paths(&path_var));
    }

    search_dirs
        .into_iter()
        .map(|dir| dir.join(&binary_name))
        .find(|candidate| candidate.is_file())
}
611
612fn spawn_host(
613    host_binary: &PathBuf,
614    plugin_uri: &str,
615    instance_id: &str,
616    sample_rate: f64,
617    buffer_size: usize,
618    num_inputs: usize,
619    num_outputs: usize,
620) -> Result<(Child, ShmMapping, EventPair, String), String> {
621    let pid = std::process::id();
622    let shm_name = format!("/maolan-{pid}-{instance_id}");
623
624    let mapping = ShmMapping::create(&shm_name, SHM_SIZE)?;
625    unsafe {
626        init_shm_layout(mapping.as_ptr(), mapping.size());
627    }
628
629    let mut events = EventPair::new().map_err(|e| format!("failed to create pipes: {e}"))?;
630
631    let mut cmd = Command::new(host_binary);
632    cmd.arg("lv2")
633        .arg(plugin_uri)
634        .arg(&shm_name)
635        .arg(instance_id)
636        .arg(events.host_read_fd().to_string())
637        .arg(events.host_write_fd().to_string())
638        .arg(sample_rate.to_string())
639        .arg(buffer_size.to_string())
640        .arg(num_inputs.to_string())
641        .arg(num_outputs.to_string())
642        .stdin(Stdio::null())
643        .stdout(Stdio::null())
644        .stderr(Stdio::inherit());
645
646    let child = cmd
647        .spawn()
648        .map_err(|e| format!("failed to spawn LV2 host: {e}"))?;
649
650    events.close_daw_unused();
651
652    Ok((child, mapping, events, shm_name))
653}
654
655fn wait_for_ready(header: &ShmHeader, timeout: Duration) -> bool {
656    let start = Instant::now();
657    while start.elapsed() < timeout {
658        if header.ready.load(Ordering::Acquire) != 0 {
659            return true;
660        }
661        std::thread::sleep(Duration::from_millis(10));
662    }
663    false
664}
665
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolves the plugin-host binary or fails the test with a clear message.
    fn find_host_binary() -> PathBuf {
        match super::find_host_binary() {
            Some(path) => path,
            None => panic!("maolan-engine-plugin-host binary should be built for tests"),
        }
    }

    /// The lookup must return a path that actually exists on disk.
    #[test]
    fn find_host_binary_locates_binary() {
        let located = find_host_binary();
        assert!(
            located.exists(),
            "plugin-host binary should exist at {}",
            located.display()
        );
    }

    /// A host that dies while processing must leave the input on the output.
    #[test]
    fn lv2_processor_crash_bypass() {
        let located = find_host_binary();

        let processor = Lv2Processor::new(48000.0, 256, "__crash__", 1, 1, located)
            .expect("should create LV2 processor for crash test");

        processor.setup_audio_ports();

        // Fill input buffer.
        {
            let input = &processor.audio_inputs()[0];
            let samples = input.buffer.lock();
            samples.fill(1.0);
            *input.finished.lock() = true;
        }

        // First process should trigger the crash; subsequent calls should bypass.
        processor.process_with_audio_io(256);

        // After crash, output should be a copy of input (bypass).
        let out_buf = processor.audio_outputs()[0].buffer.lock();
        let head: Vec<f32> = out_buf.iter().take(10).copied().collect();
        let all_ones = out_buf.iter().all(|&sample| sample == 1.0);
        assert!(
            all_ones,
            "after crash, output should be bypass copy of input, got: {:?}",
            head
        );
    }
}
713}