Skip to main content

maolan_engine/plugins/
lv2_proc.rs

1//! Out-of-process LV2 processor using `maolan-plugin-host` IPC.
2
3use crate::audio::io::AudioIO;
4use crate::midi::io::MidiEvent;
5use crate::mutex::UnsafeMutex;
6use crate::plugins::ipc;
7use maolan_plugin_protocol::events::EventPair;
8use maolan_plugin_protocol::protocol::*;
9use maolan_plugin_protocol::ringbuf::RingBuffer;
10use maolan_plugin_protocol::shm::ShmMapping;
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::process::Child;
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::{Arc, atomic::AtomicU32};
16use std::time::{Duration, Instant};
17
18/// Shared state for an out-of-process LV2 plugin instance.
19pub struct Lv2Processor {
20    uri: String,
21    name: String,
22    audio_inputs: Vec<Arc<AudioIO>>,
23    audio_outputs: Vec<Arc<AudioIO>>,
24    main_audio_inputs: usize,
25    main_audio_outputs: usize,
26    param_values: UnsafeMutex<HashMap<u32, f64>>,
27    bypassed: Arc<AtomicBool>,
28    // IPC state
29    child: UnsafeMutex<Option<Child>>,
30    mapping: Option<ShmMapping>,
31    events: Option<EventPair>,
32    shm_name: String,
33    // Crash recovery
34    crash_count: AtomicU32,
35    last_process_time: UnsafeMutex<Instant>,
36}
37
38pub type SharedLv2Processor = Arc<UnsafeMutex<Lv2Processor>>;
39
40impl Lv2Processor {
41    pub fn new(
42        sample_rate: f64,
43        buffer_size: usize,
44        plugin_uri: &str,
45        input_count: usize,
46        output_count: usize,
47        host_binary: PathBuf,
48    ) -> Result<Self, String> {
49        let audio_inputs = (0..input_count.max(1))
50            .map(|_| Arc::new(AudioIO::new(buffer_size)))
51            .collect::<Vec<_>>();
52        let audio_outputs = (0..output_count.max(1))
53            .map(|_| Arc::new(AudioIO::new(buffer_size)))
54            .collect::<Vec<_>>();
55
56        let instance_id = format!("lv2-{}", std::process::id());
57        let sample_rate_str = sample_rate.to_string();
58        let buffer_size_str = buffer_size.to_string();
59        let num_inputs_str = input_count.max(1).to_string();
60        let num_outputs_str = output_count.max(1).to_string();
61        let (mut child, mapping, events, shm_name) = ipc::spawn_host(ipc::HostSpawnArgs {
62            host_binary: &host_binary,
63            format: "lv2",
64            plugin_spec: plugin_uri,
65            instance_id: &instance_id,
66            extra_args: &[
67                &sample_rate_str,
68                &buffer_size_str,
69                &num_inputs_str,
70                &num_outputs_str,
71            ],
72        })?;
73
74        let header = unsafe { header_ref(mapping.as_ptr()) };
75        if !ipc::wait_for_ready(header, Duration::from_secs(10)) {
76            let _ = child.kill();
77            return Err("LV2 host did not signal ready".to_string());
78        }
79
80        let name = unsafe {
81            let mut name = None;
82            for _ in 0..50 {
83                name = maolan_plugin_protocol::protocol::read_plugin_name_from_scratch(
84                    mapping.as_ptr(),
85                );
86                if name.is_some() {
87                    break;
88                }
89                std::thread::sleep(std::time::Duration::from_millis(10));
90            }
91            name.unwrap_or_else(|| {
92                plugin_uri
93                    .rsplit_once('/')
94                    .map(|(_, name)| name)
95                    .unwrap_or(plugin_uri)
96                    .to_string()
97            })
98        };
99
100        Ok(Self {
101            uri: plugin_uri.to_string(),
102            name,
103            audio_inputs,
104            audio_outputs,
105            main_audio_inputs: input_count.max(1),
106            main_audio_outputs: output_count.max(1),
107            param_values: UnsafeMutex::new(HashMap::new()),
108            bypassed: Arc::new(AtomicBool::new(false)),
109            child: UnsafeMutex::new(Some(child)),
110            mapping: Some(mapping),
111            events: Some(events),
112            shm_name,
113            crash_count: AtomicU32::new(0),
114            last_process_time: UnsafeMutex::new(Instant::now()),
115        })
116    }
117
118    pub fn setup_audio_ports(&self) {
119        for port in &self.audio_inputs {
120            port.setup();
121        }
122        for port in &self.audio_outputs {
123            port.setup();
124        }
125    }
126
127    pub fn audio_inputs(&self) -> &[Arc<AudioIO>] {
128        &self.audio_inputs
129    }
130
131    pub fn audio_outputs(&self) -> &[Arc<AudioIO>] {
132        &self.audio_outputs
133    }
134
135    pub fn main_audio_input_count(&self) -> usize {
136        self.main_audio_inputs
137    }
138
139    pub fn main_audio_output_count(&self) -> usize {
140        self.main_audio_outputs
141    }
142
143    pub fn midi_input_count(&self) -> usize {
144        0
145    }
146
147    pub fn midi_output_count(&self) -> usize {
148        0
149    }
150
151    pub fn set_bypassed(&self, bypassed: bool) {
152        self.bypassed.store(bypassed, Ordering::Relaxed);
153    }
154
155    pub fn is_bypassed(&self) -> bool {
156        self.bypassed.load(Ordering::Relaxed)
157    }
158
159    pub fn parameter_values(&self) -> HashMap<u32, f64> {
160        self.param_values.lock().clone()
161    }
162
163    pub fn set_parameter(&self, param_id: u32, value: f64) -> Result<(), String> {
164        self.set_parameter_at(param_id, value, 0)
165    }
166
167    pub fn set_parameter_at(&self, param_id: u32, value: f64, _frame: u32) -> Result<(), String> {
168        self.param_values.lock().insert(param_id, value);
169        if let Some(ref mapping) = self.mapping {
170            let ring = unsafe {
171                let buf = param_ring_ptr(mapping.as_ptr());
172                let (w, r) = param_indices(mapping.as_ptr());
173                RingBuffer::new(buf, w, r, RING_CAPACITY)
174            };
175            let ev = ParameterEvent {
176                param_index: param_id,
177                value: value as f32,
178                sample_offset: 0,
179                event_kind: maolan_plugin_protocol::PARAM_EVENT_VALUE,
180            };
181            if !ring.push(ev) {
182                tracing::warn!("LV2 param ring full, dropping parameter event");
183            }
184        }
185        Ok(())
186    }
187
188    pub fn begin_parameter_edit(&self, _param_id: u32) -> Result<(), String> {
189        Ok(())
190    }
191
192    pub fn end_parameter_edit(&self, _param_id: u32) -> Result<(), String> {
193        Ok(())
194    }
195
196    pub fn is_parameter_edit_active(&self, _param_id: u32) -> bool {
197        false
198    }
199
200    pub fn snapshot_state(&self) -> Result<Vec<u8>, String> {
201        let (mapping, events) = match (&self.mapping, &self.events) {
202            (Some(m), Some(e)) => (m, e),
203            _ => return Err("LV2 processor not initialized".to_string()),
204        };
205        let ptr = mapping.as_ptr();
206        let header = unsafe { header_mut(ptr) };
207
208        header.request_type.store(1, Ordering::Release);
209        header.request_status.store(0, Ordering::Release);
210        if let Err(e) = events.signal_host() {
211            header.request_type.store(0, Ordering::Release);
212            return Err(format!("Failed to signal host for state save: {}", e));
213        }
214
215        if let Err(e) = events.wait_host(Duration::from_secs(5)) {
216            header.request_type.store(0, Ordering::Release);
217            return Err(format!("Host did not respond to state save: {}", e));
218        }
219
220        let status = header.request_status.load(Ordering::Acquire);
221        let size = header.scratch_size.load(Ordering::Acquire) as usize;
222        if status != 1 {
223            header.request_type.store(0, Ordering::Release);
224            return Err("State save failed in host".to_string());
225        }
226
227        let scratch = unsafe { scratch_ptr(ptr) };
228        let mut bytes = vec![0u8; size];
229        unsafe {
230            std::ptr::copy_nonoverlapping(scratch, bytes.as_mut_ptr(), size);
231        }
232        header.request_type.store(0, Ordering::Release);
233        Ok(bytes)
234    }
235
236    pub fn restore_state(&self, state: &[u8]) -> Result<(), String> {
237        let (mapping, events) = match (&self.mapping, &self.events) {
238            (Some(m), Some(e)) => (m, e),
239            _ => return Err("LV2 processor not initialized".to_string()),
240        };
241        let ptr = mapping.as_ptr();
242        let header = unsafe { header_mut(ptr) };
243
244        let scratch = unsafe { scratch_ptr(ptr) };
245        let size = state.len().min(SCRATCH_SIZE);
246        unsafe {
247            std::ptr::copy_nonoverlapping(state.as_ptr(), scratch, size);
248        }
249        header.scratch_size.store(size as u32, Ordering::Release);
250
251        header.request_type.store(2, Ordering::Release);
252        header.request_status.store(0, Ordering::Release);
253        if let Err(e) = events.signal_host() {
254            header.request_type.store(0, Ordering::Release);
255            return Err(format!("Failed to signal host for state restore: {}", e));
256        }
257
258        if let Err(e) = events.wait_host(Duration::from_secs(5)) {
259            header.request_type.store(0, Ordering::Release);
260            return Err(format!("Host did not respond to state restore: {}", e));
261        }
262
263        let status = header.request_status.load(Ordering::Acquire);
264        header.request_type.store(0, Ordering::Release);
265        if status != 1 {
266            return Err("State restore failed in host".to_string());
267        }
268        Ok(())
269    }
270
271    pub fn process_with_audio_io(&self, frames: usize) {
272        let _ = self.process_with_midi(frames, &[]);
273    }
274
275    pub fn process_with_midi(&self, frames: usize, _midi_in: &[MidiEvent]) -> Vec<MidiEvent> {
276        if self.bypassed.load(Ordering::Relaxed) {
277            ipc::bypass_copy_inputs_to_outputs(&self.audio_inputs, &self.audio_outputs);
278            return Vec::new();
279        }
280
281        // Check if host process has crashed.
282        {
283            let child = self.child.lock();
284            if let Some(ref mut c) = child.as_mut() {
285                match c.try_wait() {
286                    Ok(Some(status)) if !status.success() => {
287                        tracing::error!(
288                            "LV2 plugin host crashed for '{}' ({})",
289                            self.name,
290                            self.uri
291                        );
292                        self.crash_count.fetch_add(1, Ordering::Relaxed);
293                        ipc::bypass_copy_inputs_to_outputs(&self.audio_inputs, &self.audio_outputs);
294                        return Vec::new();
295                    }
296                    Ok(Some(status)) => {
297                        eprintln!("[LV2 debug] host exited with success: {:?}", status);
298                    }
299                    Ok(None) => {
300                        eprintln!("[LV2 debug] host still alive");
301                    }
302                    Err(e) => {
303                        eprintln!("[LV2 debug] try_wait error: {}", e);
304                    }
305                }
306            }
307        }
308
309        let started = Instant::now();
310
311        let (mapping, events) = match (&self.mapping, &self.events) {
312            (Some(m), Some(e)) => (m, e),
313            _ => {
314                ipc::bypass_copy_inputs_to_outputs(&self.audio_inputs, &self.audio_outputs);
315                return Vec::new();
316            }
317        };
318
319        let ptr = mapping.as_ptr();
320        let num_in = self.audio_inputs.len();
321        let num_out = self.audio_outputs.len();
322        unsafe {
323            ipc::configure_shm_header(ptr, frames, num_in, num_out);
324            // Write default transport state (can be overridden by track later).
325            let t = transport_mut(ptr);
326            t.playhead_sample = 0;
327            t.tempo = 120.0;
328            t.numerator = 4;
329            t.denominator = 4;
330            t.flags = 1; // playing
331
332            // Copy input AudioIO buffers to shared memory (bus 0).
333            ipc::copy_inputs_to_shm(&self.audio_inputs, ptr, frames);
334        }
335
336        // Signal host to process.
337        if let Err(e) = events.signal_host() {
338            tracing::error!("Failed to signal LV2 host: {e}");
339            ipc::bypass_copy_inputs_to_outputs(&self.audio_inputs, &self.audio_outputs);
340            return Vec::new();
341        }
342        eprintln!("[LV2 debug] signal_host succeeded");
343
344        // Wait for host to complete (with timeout).
345        let timeout = Duration::from_millis(100);
346        match events.wait_host(timeout) {
347            Ok(()) => {
348                eprintln!("[LV2 debug] wait_host succeeded");
349            }
350            Err(e) => {
351                eprintln!(
352                    "[LV2 debug] host did not respond for '{}' ({}): {}",
353                    self.name, self.uri, e
354                );
355                ipc::bypass_copy_inputs_to_outputs(&self.audio_inputs, &self.audio_outputs);
356                return Vec::new();
357            }
358        }
359
360        // Copy output shared memory (bus 1) back to AudioIO buffers.
361        unsafe {
362            ipc::copy_outputs_from_shm(&self.audio_outputs, ptr, frames);
363        }
364
365        let elapsed = started.elapsed();
366        if elapsed > Duration::from_millis(20) {
367            tracing::warn!(
368                "Slow LV2 process '{}' ({}) took {:.3} ms for {} frames",
369                self.name,
370                self.uri,
371                elapsed.as_secs_f64() * 1000.0,
372                frames
373            );
374        }
375
376        *self.last_process_time.lock() = Instant::now();
377        Vec::new()
378    }
379
380    pub fn uri(&self) -> &str {
381        &self.uri
382    }
383
384    pub fn name(&self) -> &str {
385        &self.name
386    }
387
388    pub fn begin_parameter_edit_at(&self, _param_id: u32, _frame: u32) -> Result<(), String> {
389        Ok(())
390    }
391
392    pub fn end_parameter_edit_at(&self, _param_id: u32, _frame: u32) -> Result<(), String> {
393        Ok(())
394    }
395
396    pub fn run_host_callbacks_main_thread(&self) {}
397
398    pub fn reconfigure_ports_if_needed(&self) -> Result<bool, String> {
399        Ok(false)
400    }
401
402    pub fn gui_set_parent_x11(&self, window: usize) -> Result<(), String> {
403        if let Some(ref mapping) = self.mapping {
404            let header = unsafe { header_mut(mapping.as_ptr()) };
405            header.set_parent_window(window);
406            return Ok(());
407        }
408        Err("No active host to set parent window".to_string())
409    }
410
411    pub fn gui_show(&self) -> Result<(), String> {
412        if let Some(ref mapping) = self.mapping
413            && let Some(ref events) = self.events
414        {
415            let header = unsafe { header_mut(mapping.as_ptr()) };
416            header.request_type.store(3, Ordering::Release);
417            let _ = events.signal_host();
418            return Ok(());
419        }
420        Err("No active host to show GUI".to_string())
421    }
422
423    pub fn gui_hide(&self) {
424        if let Some(ref mapping) = self.mapping
425            && let Some(ref events) = self.events
426        {
427            let header = unsafe { header_mut(mapping.as_ptr()) };
428            header.request_type.store(4, Ordering::Release);
429            let _ = events.signal_host();
430        }
431    }
432
433    pub fn drain_echoed_parameters(&self) -> Vec<ParameterEvent> {
434        let mut result = Vec::new();
435        if let Some(ref mapping) = self.mapping {
436            let ring = unsafe {
437                let buf = echo_ring_ptr(mapping.as_ptr());
438                let (w, r) = echo_indices(mapping.as_ptr());
439                RingBuffer::new(buf, w, r, RING_CAPACITY)
440            };
441            while let Some(ev) = ring.pop() {
442                result.push(ev);
443            }
444        }
445        result
446    }
447
448    pub fn drain_midi_outputs(&self) -> Vec<crate::midi::io::MidiEvent> {
449        let mut result = Vec::new();
450        if let Some(ref mapping) = self.mapping {
451            let ring = unsafe {
452                let buf = midi_out_ring_ptr(mapping.as_ptr());
453                let (w, r) = midi_out_indices(mapping.as_ptr());
454                RingBuffer::new(buf, w, r, RING_CAPACITY)
455            };
456            while let Some(ev) = ring.pop() {
457                result.push(crate::midi::io::MidiEvent {
458                    frame: ev.sample_offset,
459                    data: ev.data.to_vec(),
460                });
461            }
462        }
463        result
464    }
465}
466
467impl Drop for Lv2Processor {
468    fn drop(&mut self) {
469        ipc::drop_host(&self.mapping, &self.events, &self.child, &self.shm_name);
470    }
471}
472
473crate::impl_ipc_processor_wrapper!(Lv2Processor);
474
475impl UnsafeMutex<Lv2Processor> {
476    pub fn process_with_midi(&self, frames: usize, midi_events: &[MidiEvent]) -> Vec<MidiEvent> {
477        self.lock().process_with_midi(frames, midi_events)
478    }
479
480    pub fn snapshot_state(&self) -> Result<Vec<u8>, String> {
481        self.lock().snapshot_state()
482    }
483
484    pub fn restore_state(&self, state: &[u8]) -> Result<(), String> {
485        self.lock().restore_state(state)
486    }
487
488    pub fn drain_echoed_parameters(&self) -> Vec<ParameterEvent> {
489        self.lock().drain_echoed_parameters()
490    }
491
492    pub fn drain_midi_outputs(&self) -> Vec<crate::midi::io::MidiEvent> {
493        self.lock().drain_midi_outputs()
494    }
495}
496
497#[cfg(test)]
498mod tests {
499    use super::*;
500
501    fn find_host_binary() -> PathBuf {
502        ipc::find_plugin_host_binary().expect("maolan-plugin-host binary should be built for tests")
503    }
504
505    #[test]
506    fn find_host_binary_locates_binary() {
507        let host_bin = find_host_binary();
508        assert!(
509            host_bin.exists(),
510            "plugin-host binary should exist at {}",
511            host_bin.display()
512        );
513    }
514
515    #[test]
516    fn lv2_processor_crash_bypass() {
517        let host_bin = find_host_binary();
518
519        let processor = Lv2Processor::new(48000.0, 256, "__crash__", 1, 1, host_bin)
520            .expect("should create LV2 processor for crash test");
521
522        processor.setup_audio_ports();
523
524        // Fill input buffer.
525        {
526            let buf = processor.audio_inputs()[0].buffer.lock();
527            buf.fill(1.0);
528            *processor.audio_inputs()[0].finished.lock() = true;
529        }
530
531        // First process should trigger the crash; subsequent calls should bypass.
532        processor.process_with_audio_io(256);
533
534        // After crash, output should be a copy of input (bypass).
535        let out_buf = processor.audio_outputs()[0].buffer.lock();
536        let first_few: Vec<f32> = out_buf.iter().take(10).copied().collect();
537        assert!(
538            out_buf.iter().all(|&s| s == 1.0),
539            "after crash, output should be bypass copy of input, got: {:?}",
540            first_few
541        );
542    }
543}