1#![allow(dead_code)]
2use {
3 std::collections::HashSet,
4 std::sync::atomic::{AtomicU32, Ordering},
5 std::sync::{Arc, Mutex},
6 std::ffi::CStr,
7 std::os::raw::{
8 c_void,
9 c_int,
10 },
11 self::super::{
12 alsa_audio::AlsaAudioAccess,
13 pulse_sys::*,
14 },
15 crate::{
16 makepad_live_id::*,
17 thread::Signal,
18 audio::*,
19 }
20};
21
22struct PulseAudioDesc {
23 name: String,
24 desc: AudioDeviceDesc,
25}
26struct AlsaAudioDeviceRef {
32 device_id: AudioDeviceId,
33 _is_terminated: bool,
34}
35
36enum ContextState {
37 Connecting,
38 Ready,
39 Failed
40}
41
42struct PulseInputStream {
43 device_id: AudioDeviceId,
44 stream: *mut pa_stream,
45}
46
47struct PulseInputStruct {
48 device_id: AudioDeviceId,
49 input_fn: Arc<Mutex<Option<AudioInputFn> > >,
50 audio_buffer: AudioBuffer,
51 ready_state: AtomicU32,
52}
53
54impl PulseInputStream {
55 unsafe fn new(device_id: AudioDeviceId, name: &str, index: usize, pulse: &PulseAudioAccess) -> PulseInputStream {
56 pa_threaded_mainloop_lock(pulse.main_loop);
57 let sample_spec = pa_sample_spec {
58 format: PA_SAMPLE_FLOAT32LE,
59 rate: 48000,
60 channels: 2
61 };
62
63 let stream = pa_stream_new(pulse.context, "makepad input stream\0".as_ptr(), &sample_spec, std::ptr::null());
64 if stream == std::ptr::null_mut() {
65 panic!("pa_stream_new failed");
66 }
67 let input_ptr = Box::into_raw(Box::new(PulseInputStruct {
68 device_id,
69 ready_state: AtomicU32::new(0),
70 input_fn: pulse.audio_input_cb[index].clone(),
71 audio_buffer: AudioBuffer::default()
72 }));
73 pa_stream_set_state_callback(stream, Some(Self::recording_stream_state_callback), input_ptr as *mut _);
74 pa_stream_set_read_callback(stream, Some(Self::recording_stream_read_callback), input_ptr as *mut _);
75
76 let buffer_attr = pa_buffer_attr {
77 maxlength: std::u32::MAX,
78 tlength: (8 * pulse.buffer_frames) as u32,
79 prebuf: 0,
80 minreq: std::u32::MAX,
81 fragsize: std::u32::MAX,
82 };
83 let flags = PA_STREAM_ADJUST_LATENCY;
84
85 pa_stream_connect_record(
86 stream,
87 format!("{}\0", name).as_ptr(),
88 &buffer_attr,
89 flags,
90 );
91
92 pa_threaded_mainloop_unlock(pulse.main_loop);
93
94 loop {
95 let ready_state = (*input_ptr).ready_state.load(Ordering::Relaxed);
96 if ready_state == 1 {
97 break;
98 }
99 if ready_state == 2 {
100 panic!("STREAM CANNOT BE STARTED");
101 }
102 pa_threaded_mainloop_wait(pulse.main_loop);
103 }
104
105 Self {
106 device_id,
107 stream
108 }
109 }
110
111 pub unsafe fn terminate(self, pulse: &PulseAudioAccess) {
112 pa_threaded_mainloop_lock(pulse.main_loop);
113 pa_stream_set_write_callback(self.stream, None, std::ptr::null_mut());
114 pa_stream_set_state_callback(self.stream, None, std::ptr::null_mut());
115 pa_stream_disconnect(self.stream);
116 pa_stream_unref(self.stream);
117 pa_threaded_mainloop_unlock(pulse.main_loop);
118 }
119
120 unsafe extern "C" fn recording_stream_read_callback (
121 stream: *mut pa_stream,
122 _nbytes: usize,
123 input_ptr: *mut c_void
124 ) {
125 let input = &mut*(input_ptr as *mut PulseInputStruct);
126 let mut read_ptr: *mut f32 = std::ptr::null_mut();
127 let mut byte_count = 0;
128 if pa_stream_peek(stream, &mut read_ptr as *mut _ as *mut _, &mut byte_count) != 0{
129 println!("pa_stream_peek failed");
130 return
131 }
132 if byte_count == 0{
133 return
134 }
135 let mut input_fn = (*input).input_fn.lock().unwrap();
136 if let Some(input_fn) = &mut *input_fn {
137 let interleaved = std::slice::from_raw_parts(read_ptr, byte_count / 4);
138 input.audio_buffer.copy_from_interleaved(2, interleaved);
139 input_fn(AudioInfo {
140 device_id: input.device_id,
141 time: None
142 }, &input.audio_buffer);
143 }
144 pa_stream_drop(stream);
145 }
146
147 unsafe extern "C" fn recording_stream_state_callback (
148 stream: *mut pa_stream,
149 output_ptr: *mut c_void
150 ) {
151 let input_ptr = output_ptr as *mut PulseOutputStruct;
152 let state = pa_stream_get_state(stream);
153 match state {
154 PA_STREAM_UNCONNECTED => (),
155 PA_STREAM_CREATING => (),
156 PA_STREAM_READY => {
157 (*input_ptr).ready_state.store(1, Ordering::Relaxed)
158 },
159 PA_STREAM_FAILED => {
160 (*input_ptr).ready_state.store(2, Ordering::Relaxed)
161 },
162 PA_STREAM_TERMINATED => {
163 let _ = Box::from_raw(output_ptr);
164 },
165 _ => panic!()
166 }
167 }
168}
169
170struct PulseOutputStream {
171 device_id: AudioDeviceId,
172 stream: *mut pa_stream,
173}
174
175struct PulseOutputStruct {
176 device_id: AudioDeviceId,
177 output_fn: Arc<Mutex<Option<AudioOutputFn> > >,
178 write_byte_count: usize,
179 clear_on_read: bool,
180 ready_state: AtomicU32,
181 audio_buffer: AudioBuffer,
182}
183
184impl PulseOutputStream {
185 unsafe fn new(device_id: AudioDeviceId, name: &str, index: usize, pulse: &PulseAudioAccess) -> Option<PulseOutputStream> {
186
187 pa_threaded_mainloop_lock(pulse.main_loop);
188 let sample_spec = pa_sample_spec {
189 format: PA_SAMPLE_FLOAT32LE,
190 rate: 48000,
191 channels: 2
192 };
193
194 let stream = pa_stream_new(pulse.context, "makepad output stream\0".as_ptr(), &sample_spec, std::ptr::null());
195 if stream == std::ptr::null_mut() {
196 panic!("pa_stream_new failed");
197 }
198
199 let output_ptr = Box::into_raw(Box::new(PulseOutputStruct {
200 device_id,
201 clear_on_read: true,
202 output_fn: pulse.audio_output_cb[index].clone(),
203 write_byte_count: 0,
204 ready_state: AtomicU32::new(0),
205 audio_buffer: AudioBuffer::default()
206 }));
207 pa_stream_set_state_callback(stream, Some(Self::playback_stream_state_callback), output_ptr as *mut _);
208
209 let buffer_attr = pa_buffer_attr {
210 maxlength: std::u32::MAX,
211 tlength: (8 * pulse.buffer_frames) as u32,
212 prebuf: 0,
213 minreq: std::u32::MAX,
214 fragsize: std::u32::MAX,
215 };
216 let flags = PA_STREAM_ADJUST_LATENCY | PA_STREAM_START_CORKED | PA_STREAM_START_UNMUTED;
217
218 pa_stream_connect_playback(
219 stream,
220 format!("{}\0", name).as_ptr(),
221 &buffer_attr,
222 flags,
223 std::ptr::null(),
224 std::ptr::null_mut()
225 );
226
227 pa_threaded_mainloop_unlock(pulse.main_loop);
228
229 loop {
230 let ready_state = (*output_ptr).ready_state.load(Ordering::Relaxed);
231 if ready_state == 1 {
232 break;
233 }
234 if ready_state == 2 {
235 Self::terminate_stream(stream, pulse);
237 return None
238 }
239 pa_threaded_mainloop_wait(pulse.main_loop);
240 }
241
242 (*output_ptr).write_byte_count = pa_stream_writable_size(stream);
243
244 pa_stream_set_write_callback(stream, Some(Self::playback_stream_write_callback), output_ptr as *mut _);
245
246 let op = pa_stream_cork(stream, 0, None, std::ptr::null_mut());
247 if op == std::ptr::null_mut() {
248 panic!("pa_stream_cork failed");
249 }
250 pa_operation_unref(op);
251
252 Some(Self {
253 device_id,
254 stream
255 })
256 }
257
258 pub fn terminate(&self, pulse: &PulseAudioAccess) {
259 unsafe{Self::terminate_stream(self.stream, pulse)};
260 }
261
262 pub unsafe fn terminate_stream(stream:*mut pa_stream, pulse: &PulseAudioAccess) {
263 pa_threaded_mainloop_lock(pulse.main_loop);
264 pa_stream_set_write_callback(stream, None, std::ptr::null_mut());
265 pa_stream_set_state_callback(stream, None, std::ptr::null_mut());
266 pa_stream_disconnect(stream);
267 pa_stream_unref(stream);
268 pa_threaded_mainloop_unlock(pulse.main_loop);
269 }
270
271 unsafe extern "C" fn playback_stream_write_callback (
272 stream: *mut pa_stream,
273 _nbytes: usize,
274 output_ptr: *mut c_void
275 ) {
276 let output = &mut*(output_ptr as *mut PulseOutputStruct);
277 let mut write_ptr = std::ptr::null_mut();
278 let mut write_byte_count = output.write_byte_count;
279 if pa_stream_begin_write(stream, &mut write_ptr, &mut write_byte_count) != 0 {
280 panic!("pa_stream_begin_write");
281 }
282 if write_byte_count == output.write_byte_count {
283 let mut output_fn = (*output).output_fn.lock().unwrap();
284 output.audio_buffer.resize(output.write_byte_count / 8, 2);
285 if let Some(output_fn) = &mut *output_fn {
286 output_fn(AudioInfo {
287 device_id: output.device_id,
288 time: None
289 }, &mut output.audio_buffer);
290 let interleaved = std::slice::from_raw_parts_mut(write_ptr as *mut f32, output.write_byte_count / 4);
292 output.audio_buffer.copy_to_interleaved(interleaved);
293 }
294 }
295 else {
296 println!("Pulse audio buffer size unexpected");
297 }
298 let flags = if output.clear_on_read {
299 output.clear_on_read = false;
300 PA_SEEK_RELATIVE_ON_READ
301 }
302 else {
303 PA_SEEK_RELATIVE
304 };
305
306 if pa_stream_write(stream, write_ptr, write_byte_count, None, 0, flags) != 0 {
307 panic!("pa_stream_write");
308 }
309 }
310
311 unsafe extern "C" fn playback_stream_state_callback (
312 stream: *mut pa_stream,
313 output_ptr: *mut c_void
314 ) {
315 let output_ptr = output_ptr as *mut PulseOutputStruct;
316 let state = pa_stream_get_state(stream);
317 match state {
318 PA_STREAM_UNCONNECTED => (),
319 PA_STREAM_CREATING => (),
320 PA_STREAM_READY => {
321 (*output_ptr).ready_state.store(1, Ordering::Relaxed)
322 },
323 PA_STREAM_FAILED => {
324 (*output_ptr).ready_state.store(2, Ordering::Relaxed)
325 },
326 PA_STREAM_TERMINATED => {
327 let _ = Box::from_raw(output_ptr);
328 },
329 _ => panic!()
330 }
331 }
332}
333
334pub struct PulseAudioAccess {
335 pub audio_input_cb: [Arc<Mutex<Option<AudioInputFn> > >; MAX_AUDIO_DEVICE_INDEX],
336 pub audio_output_cb: [Arc<Mutex<Option<AudioOutputFn> > >; MAX_AUDIO_DEVICE_INDEX],
337
338 buffer_frames: usize,
339
340 audio_outputs: Vec<PulseOutputStream>,
341 audio_inputs: Vec<PulseInputStream>,
342 device_query: Option<PulseDeviceQuery>,
343
344 device_descs: Vec<PulseAudioDesc>,
345 change_signal: Signal,
346 context_state: ContextState,
347 main_loop: *mut pa_threaded_mainloop,
348 main_loop_api: *mut pa_mainloop_api,
349 context: *mut pa_context,
350 self_ptr: *const Mutex<PulseAudioAccess>,
351
352 failed_devices: HashSet<AudioDeviceId>,
353}
354
355struct PulseDeviceDesc {
356 name: String,
357 description: String,
358 _index: u32,
359}
360
361#[derive(Default)]
362struct PulseDeviceQuery {
363 main_loop: Option<*mut pa_threaded_mainloop>,
364 sink_list: Vec<PulseDeviceDesc>,
365 source_list: Vec<PulseDeviceDesc>,
366 default_sink: Option<String>,
367 default_source: Option<String>,
368}
369
370impl PulseAudioAccess {
371 pub fn new(change_signal: Signal, alsa_audio: &AlsaAudioAccess) -> Arc<Mutex<Self >> {
372 unsafe {
373 let main_loop = pa_threaded_mainloop_new();
374 let main_loop_api = pa_threaded_mainloop_get_api(main_loop);
375 let prop_list = pa_proplist_new();
376 let context = pa_context_new_with_proplist(main_loop_api, "makepad\0".as_ptr(), prop_list);
377
378 let pulse = Arc::new(Mutex::new(
379 PulseAudioAccess {
380 buffer_frames: 256,
381 audio_inputs: Vec::new(),
382 audio_outputs: Vec::new(),
383 change_signal,
384 device_query: None,
385 context: context.clone(),
386 main_loop,
387 main_loop_api,
388 context_state: ContextState::Connecting,
389 audio_input_cb: alsa_audio.audio_input_cb.clone(),
390 audio_output_cb: alsa_audio.audio_output_cb.clone(),
391 device_descs: Default::default(),
392 self_ptr: std::ptr::null(),
393 failed_devices: Default::default(),
394 }
395 ));
396 let self_ptr = Arc::into_raw(pulse.clone());
397 (*self_ptr).lock().unwrap().self_ptr = self_ptr;
398
399 pa_context_set_state_callback(context, Some(Self::context_state_callback), self_ptr as *mut _);
400 if pa_context_connect(context, std::ptr::null(), 0, std::ptr::null()) != 0 {
401 panic!("Pulse audio pa_context_connect failed");
402 };
403 if pa_threaded_mainloop_start(main_loop) != 0 {
404 panic!("Pulse audio pa_threaded_mainloop_start failed");
405 }
406
407 pa_threaded_mainloop_lock(main_loop);
408 loop {
409 if let ContextState::Connecting = pulse.lock().unwrap().context_state {}
410 else {
411 break;
412 }
413 pa_threaded_mainloop_wait(main_loop);
414 }
415
416 pa_threaded_mainloop_unlock(main_loop);
417 pulse
418 }
419 }
420 unsafe extern "C" fn subscribe_callback (
429 _c: *mut pa_context,
430 _event_bits: pa_subscription_event_type_t,
431 _index: u32,
432 pulse_ptr: *mut c_void
433 ) {
434 let pulse: &Mutex<PulseAudioAccess> = &*(pulse_ptr as *const _);
435 let pulse = pulse.lock().unwrap();
436 pulse.change_signal.set();
437 pa_threaded_mainloop_signal(pulse.main_loop, 0);
438 }
439
440 unsafe extern "C" fn context_state_callback (
441 c: *mut pa_context,
442 pulse_ptr: *mut c_void
443 ) {
444 let pulse: &Mutex<PulseAudioAccess> = &*(pulse_ptr as *mut _);
445 let state = pa_context_get_state(c);
446
447 match state {
448 PA_CONTEXT_READY => {
449 pulse.lock().unwrap().context_state = ContextState::Ready;
450 let main_loop = pulse.lock().unwrap().main_loop;
451 pa_threaded_mainloop_signal(main_loop, 0);
452 }
453 PA_CONTEXT_FAILED | PA_CONTEXT_TERMINATED => {
454 pulse.lock().unwrap().context_state = ContextState::Failed;
455 let main_loop = pulse.lock().unwrap().main_loop;
456 pa_threaded_mainloop_signal(main_loop, 0);
457 },
458 _ => (),
459 }
460 }
461
462 unsafe extern "C" fn sink_info_callback(
463 _ctx: *mut pa_context,
464 info: *const pa_sink_info,
465 eol: c_int,
466 query_ptr: *mut c_void,
467 ) {
468 let query: &mut PulseDeviceQuery = &mut *(query_ptr as *mut _);
469 if eol>0 {
470 pa_threaded_mainloop_signal(query.main_loop.unwrap(), 0);
471 return
472 }
473 query.sink_list.push(PulseDeviceDesc {
474 name: CStr::from_ptr((*info).name).to_str().unwrap().to_string(),
475 description: CStr::from_ptr((*info).description).to_str().unwrap().to_string(),
476 _index: (*info).index
477 })
478 }
479
480 unsafe extern "C" fn source_info_callback(
481 _ctx: *mut pa_context,
482 info: *const pa_source_info,
483 eol: c_int,
484 query_ptr: *mut c_void,
485 ) {
486 let query: &mut PulseDeviceQuery = &mut *(query_ptr as *mut _);
487 if eol>0 {
488 pa_threaded_mainloop_signal(query.main_loop.unwrap(), 0);
489 return
490 }
491 query.source_list.push(PulseDeviceDesc {
492 name: CStr::from_ptr((*info).name).to_str().unwrap().to_string(),
493 description: CStr::from_ptr((*info).description).to_str().unwrap().to_string(),
494 _index: (*info).index
495 })
496 }
497
498 unsafe extern "C" fn server_info_callback(
499 _ctx: *mut pa_context,
500 info: *const pa_server_info,
501 query_ptr: *mut c_void,
502 ) {
503 let query: &mut PulseDeviceQuery = &mut *(query_ptr as *mut _);
504 query.default_sink = Some(CStr::from_ptr((*info).default_sink_name).to_str().unwrap().to_string());
505 query.default_source = Some(CStr::from_ptr((*info).default_source_name).to_str().unwrap().to_string());
506 }
507
508 pub fn get_updated_descs(&mut self) -> Vec<AudioDeviceDesc> {
509 unsafe {
511 let mut query = PulseDeviceQuery::default();
512 query.main_loop = Some(self.main_loop);
513 let sink_op = pa_context_get_sink_info_list(self.context, Some(Self::sink_info_callback), &mut query as *mut _ as *mut _);
514 let source_op = pa_context_get_source_info_list(self.context, Some(Self::source_info_callback), &mut query as *mut _ as *mut _);
515 let server_op = pa_context_get_server_info(self.context, Some(Self::server_info_callback), &mut query as *mut _ as *mut _);
516 while pa_operation_get_state(sink_op) == PA_OPERATION_RUNNING ||
517 pa_operation_get_state(source_op) == PA_OPERATION_RUNNING ||
518 pa_operation_get_state(server_op) == PA_OPERATION_RUNNING {
519 pa_threaded_mainloop_wait(self.main_loop);
520 }
521 pa_operation_unref(sink_op);
522 pa_operation_unref(source_op);
523 pa_operation_unref(server_op);
524 let mut out = Vec::new();
526 let mut device_descs = Vec::new();
527 for source in query.source_list {
528 let device_id = LiveId::from_str(&source.name).into();
529 out.push(AudioDeviceDesc {
530 has_failed: self.failed_devices.contains(&device_id),
531 device_id,
532 device_type: AudioDeviceType::Input,
533 is_default: Some(&source.name) == query.default_source.as_ref(),
534 channel_count: 2,
535 name: format!("[Pulse Audio] {}", source.description)
536 });
537 device_descs.push(PulseAudioDesc {
538 name: source.name.clone(),
539 desc: out.last().cloned().unwrap()
540 });
541 }
542 for sink in query.sink_list {
543 let device_id = LiveId::from_str(&sink.name).into();
544 out.push(AudioDeviceDesc {
545 has_failed: self.failed_devices.contains(&device_id),
546 device_id,
547 device_type: AudioDeviceType::Output,
548 is_default: Some(&sink.name) == query.default_sink.as_ref(),
549 channel_count: 2,
550 name: format!("[Pulse Audio] {}", sink.description)
551 });
552 device_descs.push(PulseAudioDesc {
553 name: sink.name.clone(),
554 desc: out.last().cloned().unwrap()
555 });
556 }
557 self.device_descs = device_descs;
558 out
559 }
560
561 }
562
563 pub fn use_audio_inputs(&mut self, devices: &[AudioDeviceId]) {
564 let new = {
565 let mut i = 0;
566 while i < self.audio_inputs.len() {
567 if !devices.contains(&self.audio_inputs[i].device_id) {
568 let item = self.audio_inputs.remove(i);
569 unsafe {item.terminate(self)};
570 }
571 else {
572 i += 1;
573 }
574 }
575 let mut new = Vec::new();
577 for (index, device_id) in devices.iter().enumerate() {
578 if self.audio_outputs.iter().find( | v | v.device_id == *device_id).is_none() {
579 if let Some(v) = self.device_descs.iter().find( | v | v.desc.device_id == *device_id) {
580 new.push((index, *device_id, &v.name))
581 }
582 }
583 }
584 new
585
586 };
587 for (index, device_id, name) in new {
588 let new_input = unsafe {PulseInputStream::new(device_id, name, index, self)};
589 self.audio_inputs.push(new_input);
590 }
591 }
592
593 pub fn use_audio_outputs(&mut self, devices: &[AudioDeviceId]) {
594 let new = {
595 let mut i = 0;
596 while i < self.audio_outputs.len() {
597 if !devices.contains(&self.audio_outputs[i].device_id) {
598 let item = self.audio_outputs.remove(i);
599 item.terminate(self);
600 }
601 else {
602 i += 1;
603 }
604 }
605 let mut new = Vec::new();
607 for (index, device_id) in devices.iter().enumerate() {
608 if self.audio_outputs.iter().find( | v | v.device_id == *device_id).is_none() {
609 if let Some(v) = self.device_descs.iter().find( | v | v.desc.device_id == *device_id) {
610 new.push((index, *device_id, &v.name))
611 }
612 }
613 }
614 new
615
616 };
617 for (index, device_id, name) in new {
618 if let Some(new_output) = unsafe {PulseOutputStream::new(device_id, name, index, self)}{
619 self.audio_outputs.push(new_output);
620 }
621 else{
622 self.failed_devices.insert(device_id);
623 self.change_signal.set();
624 }
625 }
626 }
627}