makepad_platform/os/linux/
pulse_audio.rs

1#![allow(dead_code)]
2use {
3    std::collections::HashSet,
4    std::sync::atomic::{AtomicU32, Ordering},
5    std::sync::{Arc, Mutex},
6    std::ffi::CStr,
7    std::os::raw::{
8        c_void,
9        c_int,
10    },
11    self::super::{
12        alsa_audio::AlsaAudioAccess,
13        pulse_sys::*,
14    },
15    crate::{
16        makepad_live_id::*,
17        thread::Signal,
18        audio::*,
19    }
20};
21
22struct PulseAudioDesc {
23    name: String,
24    desc: AudioDeviceDesc,
25}
26/*
27struct PulseAudioDevice {
28}
29*/
30
31struct AlsaAudioDeviceRef {
32    device_id: AudioDeviceId,
33    _is_terminated: bool,
34}
35
/// Connection state of the PulseAudio context, as tracked by
/// `PulseAudioAccess::context_state_callback`.
enum ContextState {
    /// Initial state; `PulseAudioAccess::new` waits while this is set.
    Connecting,
    /// Stored when the context reports `PA_CONTEXT_READY`.
    Ready,
    /// Stored when the context reports `PA_CONTEXT_FAILED` or
    /// `PA_CONTEXT_TERMINATED`.
    Failed,
}
41
42struct PulseInputStream {
43    device_id: AudioDeviceId,
44    stream: *mut pa_stream,
45}
46
47struct PulseInputStruct {
48    device_id: AudioDeviceId,
49    input_fn: Arc<Mutex<Option<AudioInputFn> > >,
50    audio_buffer: AudioBuffer,
51    ready_state: AtomicU32,
52}
53
54impl PulseInputStream {
55    unsafe fn new(device_id: AudioDeviceId, name: &str, index: usize, pulse: &PulseAudioAccess) -> PulseInputStream {
56        pa_threaded_mainloop_lock(pulse.main_loop);
57        let sample_spec = pa_sample_spec {
58            format: PA_SAMPLE_FLOAT32LE,
59            rate: 48000,
60            channels: 2
61        };
62        
63        let stream = pa_stream_new(pulse.context, "makepad input stream\0".as_ptr(), &sample_spec, std::ptr::null());
64        if stream == std::ptr::null_mut() {
65            panic!("pa_stream_new failed");
66        }
67        let input_ptr = Box::into_raw(Box::new(PulseInputStruct {
68            device_id,
69            ready_state: AtomicU32::new(0),
70            input_fn: pulse.audio_input_cb[index].clone(),
71            audio_buffer: AudioBuffer::default()
72        }));
73        pa_stream_set_state_callback(stream, Some(Self::recording_stream_state_callback), input_ptr as *mut _);
74        pa_stream_set_read_callback(stream, Some(Self::recording_stream_read_callback), input_ptr as *mut _);
75        
76        let buffer_attr = pa_buffer_attr {
77            maxlength: std::u32::MAX,
78            tlength: (8 * pulse.buffer_frames) as u32,
79            prebuf: 0,
80            minreq: std::u32::MAX,
81            fragsize: std::u32::MAX,
82        };
83        let flags = PA_STREAM_ADJUST_LATENCY;
84        
85        pa_stream_connect_record(
86            stream,
87            format!("{}\0", name).as_ptr(),
88            &buffer_attr,
89            flags,
90        );
91        
92        pa_threaded_mainloop_unlock(pulse.main_loop);
93        
94        loop {
95            let ready_state = (*input_ptr).ready_state.load(Ordering::Relaxed);
96            if ready_state == 1 {
97                break;
98            }
99            if ready_state == 2 {
100                panic!("STREAM CANNOT BE STARTED");
101            }
102            pa_threaded_mainloop_wait(pulse.main_loop);
103        }
104        
105        Self {
106            device_id,
107            stream
108        }
109    }
110    
111    pub unsafe fn terminate(self, pulse: &PulseAudioAccess) {
112        pa_threaded_mainloop_lock(pulse.main_loop);
113        pa_stream_set_write_callback(self.stream, None, std::ptr::null_mut());
114        pa_stream_set_state_callback(self.stream, None, std::ptr::null_mut());
115        pa_stream_disconnect(self.stream);
116        pa_stream_unref(self.stream);
117        pa_threaded_mainloop_unlock(pulse.main_loop);
118    }
119    
120    unsafe extern "C" fn recording_stream_read_callback (
121        stream: *mut pa_stream,
122        _nbytes: usize,
123        input_ptr: *mut c_void
124    ) {
125        let input = &mut*(input_ptr as *mut PulseInputStruct);
126        let mut read_ptr: *mut f32 = std::ptr::null_mut();
127        let mut byte_count = 0;
128        if pa_stream_peek(stream, &mut read_ptr as *mut _ as *mut _, &mut byte_count) != 0{
129            println!("pa_stream_peek failed");
130            return
131        }
132        if byte_count == 0{
133            return
134        }
135        let mut input_fn = (*input).input_fn.lock().unwrap();
136        if let Some(input_fn) = &mut *input_fn {
137            let interleaved = std::slice::from_raw_parts(read_ptr, byte_count / 4);
138            input.audio_buffer.copy_from_interleaved(2, interleaved);
139            input_fn(AudioInfo {
140                device_id: input.device_id,
141                time: None
142            }, &input.audio_buffer);
143        }        
144        pa_stream_drop(stream);
145    }
146    
147    unsafe extern "C" fn recording_stream_state_callback (
148        stream: *mut pa_stream,
149        output_ptr: *mut c_void
150    ) {
151        let input_ptr = output_ptr as *mut PulseOutputStruct;
152        let state = pa_stream_get_state(stream);
153        match state {
154            PA_STREAM_UNCONNECTED => (),
155            PA_STREAM_CREATING => (),
156            PA_STREAM_READY => {
157                (*input_ptr).ready_state.store(1, Ordering::Relaxed)
158            },
159            PA_STREAM_FAILED => {
160                (*input_ptr).ready_state.store(2, Ordering::Relaxed)
161            },
162            PA_STREAM_TERMINATED => {
163                let _ = Box::from_raw(output_ptr);
164            },
165            _ => panic!()
166        }
167    }
168}
169
170struct PulseOutputStream {
171    device_id: AudioDeviceId,
172    stream: *mut pa_stream,
173}
174
175struct PulseOutputStruct {
176    device_id: AudioDeviceId,
177    output_fn: Arc<Mutex<Option<AudioOutputFn> > >,
178    write_byte_count: usize,
179    clear_on_read: bool,
180    ready_state: AtomicU32,
181    audio_buffer: AudioBuffer,
182}
183
184impl PulseOutputStream {
185    unsafe fn new(device_id: AudioDeviceId, name: &str, index: usize, pulse: &PulseAudioAccess) -> Option<PulseOutputStream> {
186        
187        pa_threaded_mainloop_lock(pulse.main_loop);
188        let sample_spec = pa_sample_spec {
189            format: PA_SAMPLE_FLOAT32LE,
190            rate: 48000,
191            channels: 2
192        };
193        
194        let stream = pa_stream_new(pulse.context, "makepad output stream\0".as_ptr(), &sample_spec, std::ptr::null());
195        if stream == std::ptr::null_mut() {
196            panic!("pa_stream_new failed");
197        }
198        
199        let output_ptr = Box::into_raw(Box::new(PulseOutputStruct {
200            device_id,
201            clear_on_read: true,
202            output_fn: pulse.audio_output_cb[index].clone(),
203            write_byte_count: 0,
204            ready_state: AtomicU32::new(0),
205            audio_buffer: AudioBuffer::default()
206        }));
207        pa_stream_set_state_callback(stream, Some(Self::playback_stream_state_callback), output_ptr as *mut _);
208        
209        let buffer_attr = pa_buffer_attr {
210            maxlength: std::u32::MAX,
211            tlength: (8 * pulse.buffer_frames) as u32,
212            prebuf: 0,
213            minreq: std::u32::MAX,
214            fragsize: std::u32::MAX,
215        };
216        let flags = PA_STREAM_ADJUST_LATENCY | PA_STREAM_START_CORKED | PA_STREAM_START_UNMUTED;
217        
218        pa_stream_connect_playback(
219            stream,
220            format!("{}\0", name).as_ptr(),
221            &buffer_attr,
222            flags,
223            std::ptr::null(),
224            std::ptr::null_mut()
225        );
226        
227        pa_threaded_mainloop_unlock(pulse.main_loop);
228        
229        loop {
230            let ready_state = (*output_ptr).ready_state.load(Ordering::Relaxed);
231            if ready_state == 1 {
232                break;
233            }
234            if ready_state == 2 {
235                // ok here we return None
236                Self::terminate_stream(stream, pulse);
237                return None
238            }
239            pa_threaded_mainloop_wait(pulse.main_loop);
240        }
241        
242        (*output_ptr).write_byte_count = pa_stream_writable_size(stream);
243        
244        pa_stream_set_write_callback(stream, Some(Self::playback_stream_write_callback), output_ptr as *mut _);
245        
246        let op = pa_stream_cork(stream, 0, None, std::ptr::null_mut());
247        if op == std::ptr::null_mut() {
248            panic!("pa_stream_cork failed");
249        }
250        pa_operation_unref(op);
251        
252        Some(Self {
253            device_id,
254            stream
255        })
256    }
257
258    pub fn terminate(&self, pulse: &PulseAudioAccess) {
259        unsafe{Self::terminate_stream(self.stream, pulse)};
260    }
261    
262    pub unsafe fn terminate_stream(stream:*mut pa_stream, pulse: &PulseAudioAccess) {
263        pa_threaded_mainloop_lock(pulse.main_loop);
264        pa_stream_set_write_callback(stream, None, std::ptr::null_mut());
265        pa_stream_set_state_callback(stream, None, std::ptr::null_mut());
266        pa_stream_disconnect(stream);
267        pa_stream_unref(stream);
268        pa_threaded_mainloop_unlock(pulse.main_loop);
269    }
270    
271    unsafe extern "C" fn playback_stream_write_callback (
272        stream: *mut pa_stream,
273        _nbytes: usize,
274        output_ptr: *mut c_void
275    ) {
276        let output = &mut*(output_ptr as *mut PulseOutputStruct);
277        let mut write_ptr = std::ptr::null_mut();
278        let mut write_byte_count = output.write_byte_count;
279        if pa_stream_begin_write(stream, &mut write_ptr, &mut write_byte_count) != 0 {
280            panic!("pa_stream_begin_write");
281        }
282        if write_byte_count == output.write_byte_count {
283            let mut output_fn = (*output).output_fn.lock().unwrap();
284            output.audio_buffer.resize(output.write_byte_count / 8, 2);
285            if let Some(output_fn) = &mut *output_fn {
286                output_fn(AudioInfo {
287                    device_id: output.device_id,
288                    time: None
289                }, &mut output.audio_buffer);
290                // lets copy it to interleaved format
291                let interleaved = std::slice::from_raw_parts_mut(write_ptr as *mut f32, output.write_byte_count / 4);
292                output.audio_buffer.copy_to_interleaved(interleaved);
293            }
294        }
295        else {
296            println!("Pulse audio buffer size unexpected");
297        }
298        let flags = if output.clear_on_read {
299            output.clear_on_read = false;
300            PA_SEEK_RELATIVE_ON_READ
301        }
302        else {
303            PA_SEEK_RELATIVE
304        };
305        
306        if pa_stream_write(stream, write_ptr, write_byte_count, None, 0, flags) != 0 {
307            panic!("pa_stream_write");
308        }
309    }
310    
311    unsafe extern "C" fn playback_stream_state_callback (
312        stream: *mut pa_stream,
313        output_ptr: *mut c_void
314    ) {
315        let output_ptr = output_ptr as *mut PulseOutputStruct;
316        let state = pa_stream_get_state(stream);
317        match state {
318            PA_STREAM_UNCONNECTED => (),
319            PA_STREAM_CREATING => (),
320            PA_STREAM_READY => {
321                (*output_ptr).ready_state.store(1, Ordering::Relaxed)
322            },
323            PA_STREAM_FAILED => {
324                (*output_ptr).ready_state.store(2, Ordering::Relaxed)
325            },
326            PA_STREAM_TERMINATED => {
327                let _ = Box::from_raw(output_ptr);
328            },
329            _ => panic!()
330        }
331    }
332}
333
334pub struct PulseAudioAccess {
335    pub audio_input_cb: [Arc<Mutex<Option<AudioInputFn> > >; MAX_AUDIO_DEVICE_INDEX],
336    pub audio_output_cb: [Arc<Mutex<Option<AudioOutputFn> > >; MAX_AUDIO_DEVICE_INDEX],
337    
338    buffer_frames: usize,
339    
340    audio_outputs: Vec<PulseOutputStream>,
341    audio_inputs: Vec<PulseInputStream>,
342    device_query: Option<PulseDeviceQuery>,
343    
344    device_descs: Vec<PulseAudioDesc>,
345    change_signal: Signal,
346    context_state: ContextState,
347    main_loop: *mut pa_threaded_mainloop,
348    main_loop_api: *mut pa_mainloop_api,
349    context: *mut pa_context,
350    self_ptr: *const Mutex<PulseAudioAccess>,
351    
352    failed_devices: HashSet<AudioDeviceId>,
353}
354
/// One sink or source as reported by the PulseAudio introspection callbacks.
struct PulseDeviceDesc {
    /// PulseAudio device name.
    name: String,
    /// Human-readable description of the device.
    description: String,
    /// PulseAudio device index (currently unused, hence the underscore).
    _index: u32,
}
360
361#[derive(Default)]
362struct PulseDeviceQuery {
363    main_loop: Option<*mut pa_threaded_mainloop>,
364    sink_list: Vec<PulseDeviceDesc>,
365    source_list: Vec<PulseDeviceDesc>,
366    default_sink: Option<String>,
367    default_source: Option<String>,
368}
369
370impl PulseAudioAccess {
371    pub fn new(change_signal: Signal, alsa_audio: &AlsaAudioAccess) -> Arc<Mutex<Self >> {
372        unsafe {
373            let main_loop = pa_threaded_mainloop_new();
374            let main_loop_api = pa_threaded_mainloop_get_api(main_loop);
375            let prop_list = pa_proplist_new();
376            let context = pa_context_new_with_proplist(main_loop_api, "makepad\0".as_ptr(), prop_list);
377            
378            let pulse = Arc::new(Mutex::new(
379                PulseAudioAccess {
380                    buffer_frames: 256,
381                    audio_inputs: Vec::new(),
382                    audio_outputs: Vec::new(),
383                    change_signal,
384                    device_query: None,
385                    context: context.clone(),
386                    main_loop,
387                    main_loop_api,
388                    context_state: ContextState::Connecting,
389                    audio_input_cb: alsa_audio.audio_input_cb.clone(),
390                    audio_output_cb: alsa_audio.audio_output_cb.clone(),
391                    device_descs: Default::default(),
392                    self_ptr: std::ptr::null(),
393                    failed_devices: Default::default(),
394                }
395            ));
396            let self_ptr = Arc::into_raw(pulse.clone());
397            (*self_ptr).lock().unwrap().self_ptr = self_ptr;
398            
399            pa_context_set_state_callback(context, Some(Self::context_state_callback), self_ptr as *mut _);
400            if pa_context_connect(context, std::ptr::null(), 0, std::ptr::null()) != 0 {
401                panic!("Pulse audio pa_context_connect failed");
402            };
403            if pa_threaded_mainloop_start(main_loop) != 0 {
404                panic!("Pulse audio pa_threaded_mainloop_start failed");
405            }
406            
407            pa_threaded_mainloop_lock(main_loop);
408            loop {
409                if let ContextState::Connecting = pulse.lock().unwrap().context_state {}
410                else {
411                    break;
412                }
413                pa_threaded_mainloop_wait(main_loop);
414            }
415            
416            pa_threaded_mainloop_unlock(main_loop);
417            pulse
418        }
419    }
420    /*
421    pub fn destroy(&self){
422        pa_threaded_mainloop_stop(self.main_loop);
423        pa_context_unref(self.context);
424        pa_context_disconnect(self.context);
425        pa_threaded_mainloop_free(self.main_loop);
426    }*/
427    
428    unsafe extern "C" fn subscribe_callback (
429        _c: *mut pa_context,
430        _event_bits: pa_subscription_event_type_t,
431        _index: u32,
432        pulse_ptr: *mut c_void
433    ) {
434        let pulse: &Mutex<PulseAudioAccess> = &*(pulse_ptr as *const _);
435        let pulse = pulse.lock().unwrap();
436        pulse.change_signal.set();
437        pa_threaded_mainloop_signal(pulse.main_loop, 0);
438    }
439    
440    unsafe extern "C" fn context_state_callback (
441        c: *mut pa_context,
442        pulse_ptr: *mut c_void
443    ) {
444        let pulse: &Mutex<PulseAudioAccess> = &*(pulse_ptr as *mut _);
445        let state = pa_context_get_state(c);
446        
447        match state {
448            PA_CONTEXT_READY => {
449                pulse.lock().unwrap().context_state = ContextState::Ready;
450                let main_loop = pulse.lock().unwrap().main_loop;
451                pa_threaded_mainloop_signal(main_loop, 0);
452            }
453            PA_CONTEXT_FAILED | PA_CONTEXT_TERMINATED => {
454                pulse.lock().unwrap().context_state = ContextState::Failed;
455                let main_loop = pulse.lock().unwrap().main_loop;
456                pa_threaded_mainloop_signal(main_loop, 0);
457            },
458            _ => (),
459        }
460    }
461    
462    unsafe extern "C" fn sink_info_callback(
463        _ctx: *mut pa_context,
464        info: *const pa_sink_info,
465        eol: c_int,
466        query_ptr: *mut c_void,
467    ) {
468        let query: &mut PulseDeviceQuery = &mut *(query_ptr as *mut _);
469        if eol>0 {
470            pa_threaded_mainloop_signal(query.main_loop.unwrap(), 0);
471            return
472        }
473        query.sink_list.push(PulseDeviceDesc {
474            name: CStr::from_ptr((*info).name).to_str().unwrap().to_string(),
475            description: CStr::from_ptr((*info).description).to_str().unwrap().to_string(),
476            _index: (*info).index
477        })
478    }
479    
480    unsafe extern "C" fn source_info_callback(
481        _ctx: *mut pa_context,
482        info: *const pa_source_info,
483        eol: c_int,
484        query_ptr: *mut c_void,
485    ) {
486        let query: &mut PulseDeviceQuery = &mut *(query_ptr as *mut _);
487        if eol>0 {
488            pa_threaded_mainloop_signal(query.main_loop.unwrap(), 0);
489            return
490        }
491        query.source_list.push(PulseDeviceDesc {
492            name: CStr::from_ptr((*info).name).to_str().unwrap().to_string(),
493            description: CStr::from_ptr((*info).description).to_str().unwrap().to_string(),
494            _index: (*info).index
495        })
496    }
497    
498    unsafe extern "C" fn server_info_callback(
499        _ctx: *mut pa_context,
500        info: *const pa_server_info,
501        query_ptr: *mut c_void,
502    ) {
503        let query: &mut PulseDeviceQuery = &mut *(query_ptr as *mut _);
504        query.default_sink = Some(CStr::from_ptr((*info).default_sink_name).to_str().unwrap().to_string());
505        query.default_source = Some(CStr::from_ptr((*info).default_source_name).to_str().unwrap().to_string());
506    }
507    
508    pub fn get_updated_descs(&mut self) -> Vec<AudioDeviceDesc> {
509        // ok lets enumerate pulse audio
510        unsafe {
511            let mut query = PulseDeviceQuery::default();
512            query.main_loop = Some(self.main_loop);
513            let sink_op = pa_context_get_sink_info_list(self.context, Some(Self::sink_info_callback), &mut query as *mut _ as *mut _);
514            let source_op = pa_context_get_source_info_list(self.context, Some(Self::source_info_callback), &mut query as *mut _ as *mut _);
515            let server_op = pa_context_get_server_info(self.context, Some(Self::server_info_callback), &mut query as *mut _ as *mut _);
516            while pa_operation_get_state(sink_op) == PA_OPERATION_RUNNING ||
517            pa_operation_get_state(source_op) == PA_OPERATION_RUNNING ||
518            pa_operation_get_state(server_op) == PA_OPERATION_RUNNING {
519                pa_threaded_mainloop_wait(self.main_loop);
520            }
521            pa_operation_unref(sink_op);
522            pa_operation_unref(source_op);
523            pa_operation_unref(server_op);
524            // lets add some input/output devices
525            let mut out = Vec::new();
526            let mut device_descs = Vec::new();
527            for source in query.source_list {
528                let device_id = LiveId::from_str(&source.name).into();
529                out.push(AudioDeviceDesc {
530                    has_failed: self.failed_devices.contains(&device_id),
531                    device_id,
532                    device_type: AudioDeviceType::Input,
533                    is_default: Some(&source.name) == query.default_source.as_ref(),
534                    channel_count: 2,
535                    name: format!("[Pulse Audio] {}", source.description)
536                });
537                device_descs.push(PulseAudioDesc {
538                    name: source.name.clone(),
539                    desc: out.last().cloned().unwrap()
540                });
541            }
542            for sink in query.sink_list {
543                let device_id = LiveId::from_str(&sink.name).into();
544                out.push(AudioDeviceDesc {
545                    has_failed: self.failed_devices.contains(&device_id),
546                    device_id,
547                    device_type: AudioDeviceType::Output,
548                    is_default: Some(&sink.name) == query.default_sink.as_ref(),
549                    channel_count: 2,
550                    name: format!("[Pulse Audio] {}", sink.description)
551                });
552                device_descs.push(PulseAudioDesc {
553                    name: sink.name.clone(),
554                    desc: out.last().cloned().unwrap()
555                });
556            }
557            self.device_descs = device_descs;
558            out
559        }
560        
561    }
562    
563    pub fn use_audio_inputs(&mut self, devices: &[AudioDeviceId]) {
564        let new = {
565            let mut i = 0;
566            while i < self.audio_inputs.len() {
567                if !devices.contains(&self.audio_inputs[i].device_id) {
568                    let item = self.audio_inputs.remove(i);
569                    unsafe {item.terminate(self)};
570                }
571                else {
572                    i += 1;
573                }
574            }
575            // create the new ones
576            let mut new = Vec::new();
577            for (index, device_id) in devices.iter().enumerate() {
578                if self.audio_outputs.iter().find( | v | v.device_id == *device_id).is_none() {
579                    if let Some(v) = self.device_descs.iter().find( | v | v.desc.device_id == *device_id) {
580                        new.push((index, *device_id, &v.name))
581                    }
582                }
583            }
584            new
585            
586        };
587        for (index, device_id, name) in new {
588            let new_input = unsafe {PulseInputStream::new(device_id, name, index, self)};
589            self.audio_inputs.push(new_input);
590        }
591    }
592    
593    pub fn use_audio_outputs(&mut self, devices: &[AudioDeviceId]) {
594        let new = {
595            let mut i = 0;
596            while i < self.audio_outputs.len() {
597                if !devices.contains(&self.audio_outputs[i].device_id) {
598                    let item = self.audio_outputs.remove(i);
599                    item.terminate(self);
600                }
601                else {
602                    i += 1;
603                }
604            }
605            // create the new ones
606            let mut new = Vec::new();
607            for (index, device_id) in devices.iter().enumerate() {
608                if self.audio_outputs.iter().find( | v | v.device_id == *device_id).is_none() {
609                    if let Some(v) = self.device_descs.iter().find( | v | v.desc.device_id == *device_id) {
610                        new.push((index, *device_id, &v.name))
611                    }
612                }
613            }
614            new
615            
616        };
617        for (index, device_id, name) in new {
618            if let Some(new_output) = unsafe {PulseOutputStream::new(device_id, name, index, self)}{
619                self.audio_outputs.push(new_output);
620            }
621            else{
622                self.failed_devices.insert(device_id);
623                self.change_signal.set();
624            }
625        }
626    }
627}