1use std::{
2 collections::VecDeque,
3 io,
4 sync::{
5 atomic::{AtomicU64, Ordering},
6 Arc, Mutex,
7 },
8 thread::{self},
9};
10
11use cpal::{
12 traits::{DeviceTrait, HostTrait, StreamTrait},
13 BuildStreamError, DefaultStreamConfigError, Device, PauseStreamError, PlayStreamError,
14 SizedSample, Stream, SupportedStreamConfig,
15};
16use crossbeam_channel::{bounded, unbounded};
17use thiserror::Error;
18
19use xsynth_core::{
20 buffered_renderer::{BufferedRenderer, BufferedRendererStatsReader},
21 channel::{ChannelConfigEvent, ChannelEvent, VoiceChannel},
22 channel_group::SynthFormat,
23 effects::VolumeLimiter,
24 helpers::{prepapre_cache_vec, sum_simd},
25 AudioPipe, AudioStreamParams, FunctionAudioPipe,
26};
27
28use crate::{
29 util::ReadWriteAtomicU64, RealtimeEventSender, SynthEvent, ThreadCount, XSynthRealtimeConfig,
30};
31
32#[derive(Debug, Error)]
33pub enum RealtimeSynthError {
34 #[error("failed to find output device")]
35 NoOutputDevice,
36
37 #[error("failed to get default output config: {0}")]
38 DefaultOutputConfig(#[from] DefaultStreamConfigError),
39
40 #[error("failed to build thread pool: {0}")]
41 ThreadPoolBuild(#[from] rayon::ThreadPoolBuildError),
42
43 #[error("failed to spawn realtime channel thread: {0}")]
44 ChannelThreadSpawn(#[source] io::Error),
45
46 #[error("failed to spawn realtime stream thread: {0}")]
47 StreamThreadSpawn(#[source] io::Error),
48
49 #[error("realtime stream thread terminated during startup")]
50 StreamThreadInit,
51
52 #[error("failed to create realtime event sender: {0}")]
53 EventSenderInit(#[source] io::Error),
54
55 #[error("failed to spawn buffered renderer thread: {0}")]
56 BufferedRendererThreadSpawn(#[source] io::Error),
57
58 #[error("failed to create audio stream: {0}")]
59 BuildStream(#[from] BuildStreamError),
60
61 #[error("failed to start audio stream: {0}")]
62 PlayStream(#[from] PlayStreamError),
63
64 #[error("unsupported sample format: {0:?}")]
65 UnsupportedSampleFormat(cpal::SampleFormat),
66}
67
68#[derive(Debug, Clone)]
70struct RealtimeSynthStats {
71 voice_count: Arc<AtomicU64>,
72}
73
74impl RealtimeSynthStats {
75 pub fn new() -> RealtimeSynthStats {
76 RealtimeSynthStats {
77 voice_count: Arc::new(AtomicU64::new(0)),
78 }
79 }
80}
81
82pub struct RealtimeSynthStatsReader {
84 buffered_stats: BufferedRendererStatsReader,
85 stats: RealtimeSynthStats,
86}
87
88impl RealtimeSynthStatsReader {
89 pub(self) fn new(
90 stats: RealtimeSynthStats,
91 buffered_stats: BufferedRendererStatsReader,
92 ) -> RealtimeSynthStatsReader {
93 RealtimeSynthStatsReader {
94 stats,
95 buffered_stats,
96 }
97 }
98
99 pub fn voice_count(&self) -> u64 {
101 self.stats.voice_count.load(Ordering::Relaxed)
102 }
103
104 pub fn buffer(&self) -> &BufferedRendererStatsReader {
108 &self.buffered_stats
109 }
110}
111
112struct RealtimeSynthThreadSharedData {
113 buffered_renderer: Arc<Mutex<BufferedRenderer>>,
114
115 stream_control: crossbeam_channel::Sender<StreamCommand>,
116
117 event_senders: RealtimeEventSender,
118}
119
120struct PreparedRealtimeChannels {
121 channel_stats: Vec<xsynth_core::channel::VoiceChannelStatsReader>,
122 senders: Vec<crossbeam_channel::Sender<ChannelEvent>>,
123 command_senders: Vec<crossbeam_channel::Sender<Vec<f32>>>,
124 join_handles: Vec<thread::JoinHandle<()>>,
125 output_receiver: crossbeam_channel::Receiver<Vec<f32>>,
126}
127
128pub struct RealtimeSynth {
130 data: Option<RealtimeSynthThreadSharedData>,
131 stream_owner: Option<thread::JoinHandle<()>>,
132 join_handles: Vec<thread::JoinHandle<()>>,
133
134 stats: RealtimeSynthStats,
135
136 stream_params: AudioStreamParams,
137}
138
139enum StreamCommand {
140 Pause(crossbeam_channel::Sender<Result<(), PauseStreamError>>),
141 Resume(crossbeam_channel::Sender<Result<(), PlayStreamError>>),
142 Shutdown,
143}
144
145impl RealtimeSynth {
146 pub fn open_with_all_defaults() -> Result<Self, RealtimeSynthError> {
149 let host = cpal::default_host();
150
151 let device = host
152 .default_output_device()
153 .ok_or(RealtimeSynthError::NoOutputDevice)?;
154 if let Ok(name) = device.name() {
155 println!("Output device: {name}");
156 }
157
158 let stream_config = device.default_output_config()?;
159
160 RealtimeSynth::open(Default::default(), &device, stream_config)
161 }
162
163 pub fn open_with_default_output(
168 config: XSynthRealtimeConfig,
169 ) -> Result<Self, RealtimeSynthError> {
170 let host = cpal::default_host();
171
172 let device = host
173 .default_output_device()
174 .ok_or(RealtimeSynthError::NoOutputDevice)?;
175 if let Ok(name) = device.name() {
176 println!("Output device: {name}");
177 }
178
179 let stream_config = device.default_output_config()?;
180
181 RealtimeSynth::open(config, &device, stream_config)
182 }
183
184 pub fn open(
190 config: XSynthRealtimeConfig,
191 device: &Device,
192 stream_config: SupportedStreamConfig,
193 ) -> Result<Self, RealtimeSynthError> {
194 let sample_rate = stream_config.sample_rate().0;
195 let stream_params = AudioStreamParams::new(sample_rate, stream_config.channels().into());
196 let channel_pool = build_channel_pool(config.multithreading)?;
197 let channel_count = channel_count(config.format);
198
199 let PreparedRealtimeChannels {
200 channel_stats,
201 senders,
202 command_senders,
203 join_handles,
204 output_receiver,
205 } = prepare_channels(
206 channel_count,
207 config.channel_init_options,
208 stream_params,
209 channel_pool,
210 config.format,
211 )?;
212
213 let stats = RealtimeSynthStats::new();
214 let render = build_render_pipe(
215 stream_params,
216 channel_count,
217 command_senders,
218 output_receiver,
219 channel_stats,
220 &stats,
221 );
222 let buffered = Arc::new(Mutex::new(
223 BufferedRenderer::new(
224 render,
225 stream_params,
226 calculate_render_size(sample_rate, config.render_window_ms),
227 )
228 .map_err(RealtimeSynthError::BufferedRendererThreadSpawn)?,
229 ));
230 let (stream_control, stream_owner) =
231 spawn_stream_thread(device.clone(), stream_config, buffered.clone())?;
232
233 let max_nps = Arc::new(ReadWriteAtomicU64::new(10000));
234
235 Ok(Self {
236 data: Some(RealtimeSynthThreadSharedData {
237 buffered_renderer: buffered,
238
239 event_senders: RealtimeEventSender::new(senders, max_nps, config.ignore_range)
240 .map_err(RealtimeSynthError::EventSenderInit)?,
241 stream_control,
242 }),
243 stream_owner: Some(stream_owner),
244 join_handles,
245
246 stats,
247 stream_params,
248 })
249 }
250
251 pub fn send_event(&mut self, event: SynthEvent) {
255 let data = self.data.as_mut().unwrap();
256 data.event_senders.send_event(event);
257 }
258
259 pub fn send_event_u32(&mut self, event: u32) {
261 let data = self.data.as_mut().unwrap();
262 data.event_senders.send_event_u32(event);
263 }
264
265 pub fn get_sender_ref(&self) -> &RealtimeEventSender {
271 let data = self.data.as_ref().unwrap();
272 &data.event_senders
273 }
274
275 pub fn get_sender_mut(&mut self) -> &mut RealtimeEventSender {
282 let data = self.data.as_mut().unwrap();
283 &mut data.event_senders
284 }
285
286 pub fn get_stats(&self) -> RealtimeSynthStatsReader {
291 let data = self.data.as_ref().unwrap();
292 let buffered_stats = data.buffered_renderer.lock().unwrap().get_buffer_stats();
293
294 RealtimeSynthStatsReader::new(self.stats.clone(), buffered_stats)
295 }
296
297 pub fn stream_params(&self) -> AudioStreamParams {
299 self.stream_params
300 }
301
302 pub fn pause(&mut self) -> Result<(), PauseStreamError> {
304 let data = self.data.as_ref().unwrap();
305 let (sender, receiver) = bounded(1);
306 if data
307 .stream_control
308 .send(StreamCommand::Pause(sender))
309 .is_err()
310 {
311 return Err(PauseStreamError::DeviceNotAvailable);
312 }
313 receiver
314 .recv()
315 .unwrap_or(Err(PauseStreamError::DeviceNotAvailable))
316 }
317
318 pub fn resume(&mut self) -> Result<(), PlayStreamError> {
320 let data = self.data.as_ref().unwrap();
321 let (sender, receiver) = bounded(1);
322 if data
323 .stream_control
324 .send(StreamCommand::Resume(sender))
325 .is_err()
326 {
327 return Err(PlayStreamError::DeviceNotAvailable);
328 }
329 receiver
330 .recv()
331 .unwrap_or(Err(PlayStreamError::DeviceNotAvailable))
332 }
333
334 pub fn set_buffer(&self, render_window_ms: f64) {
336 let data = self.data.as_ref().unwrap();
337 let sample_rate = self.stream_params.sample_rate;
338 let size = calculate_render_size(sample_rate, render_window_ms);
339 data.buffered_renderer.lock().unwrap().set_render_size(size);
340 }
341}
342
343fn build_channel_pool(
344 thread_count: ThreadCount,
345) -> Result<Option<Arc<rayon::ThreadPool>>, RealtimeSynthError> {
346 Ok(match thread_count {
347 ThreadCount::None => None,
348 ThreadCount::Auto => Some(Arc::new(rayon::ThreadPoolBuilder::new().build()?)),
349 ThreadCount::Manual(threads) => Some(Arc::new(
350 rayon::ThreadPoolBuilder::new()
351 .num_threads(threads)
352 .build()?,
353 )),
354 })
355}
356
357fn channel_count(format: SynthFormat) -> u32 {
358 match format {
359 SynthFormat::Midi => 16,
360 SynthFormat::Custom { channels } => channels,
361 }
362}
363
364fn prepare_channels(
365 channel_count: u32,
366 init_options: xsynth_core::channel::ChannelInitOptions,
367 stream_params: AudioStreamParams,
368 channel_pool: Option<Arc<rayon::ThreadPool>>,
369 format: SynthFormat,
370) -> Result<PreparedRealtimeChannels, RealtimeSynthError> {
371 let (output_sender, output_receiver) = bounded::<Vec<f32>>(channel_count as usize);
372
373 let mut channel_stats = Vec::new();
374 let mut senders = Vec::new();
375 let mut command_senders = Vec::new();
376 let mut join_handles = Vec::new();
377
378 for _ in 0..channel_count {
379 let channel = VoiceChannel::new(init_options, stream_params, channel_pool.clone());
380 channel_stats.push(channel.get_channel_stats());
381
382 let (event_sender, event_receiver) = unbounded();
383 senders.push(event_sender);
384
385 let (command_sender, command_receiver) = bounded::<Vec<f32>>(1);
386 command_senders.push(command_sender);
387
388 let output_sender = output_sender.clone();
389 let join_handle =
390 spawn_channel_thread(channel, event_receiver, command_receiver, output_sender)?;
391 join_handles.push(join_handle);
392 }
393
394 if format == SynthFormat::Midi {
395 let _ = senders[9].send(ChannelEvent::Config(ChannelConfigEvent::SetPercussionMode(
396 true,
397 )));
398 }
399
400 Ok(PreparedRealtimeChannels {
401 channel_stats,
402 senders,
403 command_senders,
404 join_handles,
405 output_receiver,
406 })
407}
408
409fn spawn_channel_thread(
410 mut channel: VoiceChannel,
411 event_receiver: crossbeam_channel::Receiver<ChannelEvent>,
412 command_receiver: crossbeam_channel::Receiver<Vec<f32>>,
413 output_sender: crossbeam_channel::Sender<Vec<f32>>,
414) -> Result<thread::JoinHandle<()>, RealtimeSynthError> {
415 thread::Builder::new()
416 .name("xsynth_channel_handler".to_string())
417 .spawn(move || loop {
418 channel.push_events_iter(event_receiver.try_iter());
419 let mut vec = match command_receiver.recv() {
420 Ok(vec) => vec,
421 Err(_) => break,
422 };
423 channel.push_events_iter(event_receiver.try_iter());
424 channel.read_samples(&mut vec);
425 if output_sender.send(vec).is_err() {
426 break;
427 }
428 })
429 .map_err(RealtimeSynthError::ChannelThreadSpawn)
430}
431
432fn build_render_pipe(
433 stream_params: AudioStreamParams,
434 channel_count: u32,
435 command_senders: Vec<crossbeam_channel::Sender<Vec<f32>>>,
436 output_receiver: crossbeam_channel::Receiver<Vec<f32>>,
437 channel_stats: Vec<xsynth_core::channel::VoiceChannelStatsReader>,
438 stats: &RealtimeSynthStats,
439) -> FunctionAudioPipe<impl FnMut(&mut [f32]) + Send> {
440 let mut vec_cache: VecDeque<Vec<f32>> = VecDeque::new();
441 for _ in 0..channel_count {
442 vec_cache.push_front(Vec::new());
443 }
444
445 let total_voice_count = stats.voice_count.clone();
446
447 FunctionAudioPipe::new(stream_params, move |out| {
448 for sender in &command_senders {
449 let mut buf = vec_cache.pop_front().unwrap();
450 prepapre_cache_vec(&mut buf, out.len(), 0.0);
451 sender.send(buf).unwrap();
452 }
453
454 for _ in 0..channel_count {
455 let buf = output_receiver.recv().unwrap();
456 sum_simd(&buf, out);
457 vec_cache.push_front(buf);
458 }
459
460 let total_voices = channel_stats.iter().map(|c| c.voice_count()).sum();
461 total_voice_count.store(total_voices, Ordering::SeqCst);
462 })
463}
464
465fn build_output_stream(
466 device: &Device,
467 stream_config: SupportedStreamConfig,
468 buffered: Arc<Mutex<BufferedRenderer>>,
469) -> Result<Stream, RealtimeSynthError> {
470 match stream_config.sample_format() {
471 cpal::SampleFormat::F32 => build_output_stream_for::<f32>(device, stream_config, buffered),
472 cpal::SampleFormat::I16 => build_output_stream_for::<i16>(device, stream_config, buffered),
473 cpal::SampleFormat::U16 => build_output_stream_for::<u16>(device, stream_config, buffered),
474 _ => Err(RealtimeSynthError::UnsupportedSampleFormat(
475 stream_config.sample_format(),
476 )),
477 }
478}
479
480fn build_output_stream_for<T: SizedSample + ConvertSample>(
481 device: &Device,
482 stream_config: SupportedStreamConfig,
483 buffered: Arc<Mutex<BufferedRenderer>>,
484) -> Result<Stream, RealtimeSynthError> {
485 let err_fn = |err| eprintln!("an error occurred on stream: {err}");
486 let mut output_vec = Vec::new();
487 let mut limiter = VolumeLimiter::new(stream_config.channels());
488
489 Ok(device.build_output_stream(
490 &stream_config.into(),
491 move |data: &mut [T], _: &cpal::OutputCallbackInfo| {
492 output_vec.resize(data.len(), 0.0);
493 buffered.lock().unwrap().read(&mut output_vec);
494 for (i, s) in limiter.limit_iter(output_vec.drain(0..)).enumerate() {
495 data[i] = ConvertSample::from_f32(s);
496 }
497 },
498 err_fn,
499 None,
500 )?)
501}
502
503fn spawn_stream_thread(
504 device: Device,
505 stream_config: SupportedStreamConfig,
506 buffered: Arc<Mutex<BufferedRenderer>>,
507) -> Result<
508 (
509 crossbeam_channel::Sender<StreamCommand>,
510 thread::JoinHandle<()>,
511 ),
512 RealtimeSynthError,
513> {
514 let (command_sender, command_receiver) = unbounded();
515 let (ready_sender, ready_receiver) = bounded(1);
516 let join_handle = thread::Builder::new()
517 .name("xsynth_stream_owner".to_string())
518 .spawn(move || {
519 let stream = match build_output_stream(&device, stream_config, buffered) {
520 Ok(stream) => stream,
521 Err(err) => {
522 let _ = ready_sender.send(Err(err));
523 return;
524 }
525 };
526 if let Err(err) = stream.play() {
527 let _ = ready_sender.send(Err(err.into()));
528 return;
529 }
530 if ready_sender.send(Ok(())).is_err() {
531 return;
532 }
533
534 while let Ok(command) = command_receiver.recv() {
535 match command {
536 StreamCommand::Pause(reply) => {
537 let _ = reply.send(stream.pause());
538 }
539 StreamCommand::Resume(reply) => {
540 let _ = reply.send(stream.play());
541 }
542 StreamCommand::Shutdown => break,
543 }
544 }
545 })
546 .map_err(RealtimeSynthError::StreamThreadSpawn)?;
547
548 match ready_receiver.recv() {
549 Ok(Ok(())) => Ok((command_sender, join_handle)),
550 Ok(Err(err)) => {
551 let _ = join_handle.join();
552 Err(err)
553 }
554 Err(_) => {
555 let _ = join_handle.join();
556 Err(RealtimeSynthError::StreamThreadInit)
557 }
558 }
559}
560
561impl Drop for RealtimeSynth {
562 fn drop(&mut self) {
563 let data = self.data.take().unwrap();
564 let _ = data.stream_control.send(StreamCommand::Shutdown);
565 drop(data);
566 if let Some(handle) = self.stream_owner.take() {
567 if handle.join().is_err() {
568 eprintln!("xsynth-realtime: stream owner thread panicked during shutdown");
569 }
570 }
571 for handle in self.join_handles.drain(..) {
572 if handle.join().is_err() {
573 eprintln!("xsynth-realtime: channel handler thread panicked during shutdown");
574 }
575 }
576 }
577}
578
579trait ConvertSample: SizedSample {
580 fn from_f32(s: f32) -> Self;
581}
582
583impl ConvertSample for f32 {
584 fn from_f32(s: f32) -> Self {
585 s
586 }
587}
588
589impl ConvertSample for i16 {
590 fn from_f32(s: f32) -> Self {
591 (s * i16::MAX as f32) as i16
592 }
593}
594
595impl ConvertSample for u16 {
596 fn from_f32(s: f32) -> Self {
597 ((s * u16::MAX as f32) as i32 + i16::MIN as i32) as u16
598 }
599}
600
601fn calculate_render_size(sample_rate: u32, buffer_ms: f64) -> usize {
602 (sample_rate as f64 * buffer_ms / 1000.0) as usize
603}
604
605#[cfg(test)]
606mod tests {
607 use super::RealtimeSynth;
608
609 #[test]
610 fn realtime_synth_is_send_sync() {
611 fn assert_send_sync<T: Send + Sync>() {}
612 assert_send_sync::<RealtimeSynth>();
613 }
614}