Skip to main content

xsynth_realtime/
realtime_synth.rs

1use std::{
2    collections::VecDeque,
3    io,
4    sync::{
5        atomic::{AtomicU64, Ordering},
6        Arc, Mutex,
7    },
8    thread::{self},
9};
10
11use cpal::{
12    traits::{DeviceTrait, HostTrait, StreamTrait},
13    BuildStreamError, DefaultStreamConfigError, Device, PauseStreamError, PlayStreamError,
14    SizedSample, Stream, SupportedStreamConfig,
15};
16use crossbeam_channel::{bounded, unbounded};
17use thiserror::Error;
18
19use xsynth_core::{
20    buffered_renderer::{BufferedRenderer, BufferedRendererStatsReader},
21    channel::{ChannelConfigEvent, ChannelEvent, VoiceChannel},
22    channel_group::SynthFormat,
23    effects::VolumeLimiter,
24    helpers::{prepapre_cache_vec, sum_simd},
25    AudioPipe, AudioStreamParams, FunctionAudioPipe,
26};
27
28use crate::{
29    util::ReadWriteAtomicU64, RealtimeEventSender, SynthEvent, ThreadCount, XSynthRealtimeConfig,
30};
31
32#[derive(Debug, Error)]
33pub enum RealtimeSynthError {
34    #[error("failed to find output device")]
35    NoOutputDevice,
36
37    #[error("failed to get default output config: {0}")]
38    DefaultOutputConfig(#[from] DefaultStreamConfigError),
39
40    #[error("failed to build thread pool: {0}")]
41    ThreadPoolBuild(#[from] rayon::ThreadPoolBuildError),
42
43    #[error("failed to spawn realtime channel thread: {0}")]
44    ChannelThreadSpawn(#[source] io::Error),
45
46    #[error("failed to spawn realtime stream thread: {0}")]
47    StreamThreadSpawn(#[source] io::Error),
48
49    #[error("realtime stream thread terminated during startup")]
50    StreamThreadInit,
51
52    #[error("failed to create realtime event sender: {0}")]
53    EventSenderInit(#[source] io::Error),
54
55    #[error("failed to spawn buffered renderer thread: {0}")]
56    BufferedRendererThreadSpawn(#[source] io::Error),
57
58    #[error("failed to create audio stream: {0}")]
59    BuildStream(#[from] BuildStreamError),
60
61    #[error("failed to start audio stream: {0}")]
62    PlayStream(#[from] PlayStreamError),
63
64    #[error("unsupported sample format: {0:?}")]
65    UnsupportedSampleFormat(cpal::SampleFormat),
66}
67
68/// Holds the statistics for an instance of RealtimeSynth.
69#[derive(Debug, Clone)]
70struct RealtimeSynthStats {
71    voice_count: Arc<AtomicU64>,
72}
73
74impl RealtimeSynthStats {
75    pub fn new() -> RealtimeSynthStats {
76        RealtimeSynthStats {
77            voice_count: Arc::new(AtomicU64::new(0)),
78        }
79    }
80}
81
82/// Reads the statistics of an instance of RealtimeSynth in a usable way.
83pub struct RealtimeSynthStatsReader {
84    buffered_stats: BufferedRendererStatsReader,
85    stats: RealtimeSynthStats,
86}
87
88impl RealtimeSynthStatsReader {
89    pub(self) fn new(
90        stats: RealtimeSynthStats,
91        buffered_stats: BufferedRendererStatsReader,
92    ) -> RealtimeSynthStatsReader {
93        RealtimeSynthStatsReader {
94            stats,
95            buffered_stats,
96        }
97    }
98
99    /// Returns the active voice count of all the MIDI channels.
100    pub fn voice_count(&self) -> u64 {
101        self.stats.voice_count.load(Ordering::Relaxed)
102    }
103
104    /// Returns the statistics of the buffered renderer used.
105    ///
106    /// See the BufferedRendererStatsReader documentation for more information.
107    pub fn buffer(&self) -> &BufferedRendererStatsReader {
108        &self.buffered_stats
109    }
110}
111
112struct RealtimeSynthThreadSharedData {
113    buffered_renderer: Arc<Mutex<BufferedRenderer>>,
114
115    stream_control: crossbeam_channel::Sender<StreamCommand>,
116
117    event_senders: RealtimeEventSender,
118}
119
120struct PreparedRealtimeChannels {
121    channel_stats: Vec<xsynth_core::channel::VoiceChannelStatsReader>,
122    senders: Vec<crossbeam_channel::Sender<ChannelEvent>>,
123    command_senders: Vec<crossbeam_channel::Sender<Vec<f32>>>,
124    join_handles: Vec<thread::JoinHandle<()>>,
125    output_receiver: crossbeam_channel::Receiver<Vec<f32>>,
126}
127
128/// A realtime MIDI synthesizer using an audio device for output.
129pub struct RealtimeSynth {
130    data: Option<RealtimeSynthThreadSharedData>,
131    stream_owner: Option<thread::JoinHandle<()>>,
132    join_handles: Vec<thread::JoinHandle<()>>,
133
134    stats: RealtimeSynthStats,
135
136    stream_params: AudioStreamParams,
137}
138
139enum StreamCommand {
140    Pause(crossbeam_channel::Sender<Result<(), PauseStreamError>>),
141    Resume(crossbeam_channel::Sender<Result<(), PlayStreamError>>),
142    Shutdown,
143}
144
145impl RealtimeSynth {
146    /// Initializes a new realtime synthesizer using the default config and
147    /// the default audio output.
148    pub fn open_with_all_defaults() -> Result<Self, RealtimeSynthError> {
149        let host = cpal::default_host();
150
151        let device = host
152            .default_output_device()
153            .ok_or(RealtimeSynthError::NoOutputDevice)?;
154        if let Ok(name) = device.name() {
155            println!("Output device: {name}");
156        }
157
158        let stream_config = device.default_output_config()?;
159
160        RealtimeSynth::open(Default::default(), &device, stream_config)
161    }
162
163    /// Initializes as new realtime synthesizer using a given config and
164    /// the default audio output.
165    ///
166    /// See the `XSynthRealtimeConfig` documentation for the available options.
167    pub fn open_with_default_output(
168        config: XSynthRealtimeConfig,
169    ) -> Result<Self, RealtimeSynthError> {
170        let host = cpal::default_host();
171
172        let device = host
173            .default_output_device()
174            .ok_or(RealtimeSynthError::NoOutputDevice)?;
175        if let Ok(name) = device.name() {
176            println!("Output device: {name}");
177        }
178
179        let stream_config = device.default_output_config()?;
180
181        RealtimeSynth::open(config, &device, stream_config)
182    }
183
184    /// Initializes a new realtime synthesizer using a given config and a
185    /// specified audio output device.
186    ///
187    /// See the `XSynthRealtimeConfig` documentation for the available options.
188    /// See the `cpal` crate documentation for the `device` and `stream_config` parameters.
189    pub fn open(
190        config: XSynthRealtimeConfig,
191        device: &Device,
192        stream_config: SupportedStreamConfig,
193    ) -> Result<Self, RealtimeSynthError> {
194        let sample_rate = stream_config.sample_rate().0;
195        let stream_params = AudioStreamParams::new(sample_rate, stream_config.channels().into());
196        let channel_pool = build_channel_pool(config.multithreading)?;
197        let channel_count = channel_count(config.format);
198
199        let PreparedRealtimeChannels {
200            channel_stats,
201            senders,
202            command_senders,
203            join_handles,
204            output_receiver,
205        } = prepare_channels(
206            channel_count,
207            config.channel_init_options,
208            stream_params,
209            channel_pool,
210            config.format,
211        )?;
212
213        let stats = RealtimeSynthStats::new();
214        let render = build_render_pipe(
215            stream_params,
216            channel_count,
217            command_senders,
218            output_receiver,
219            channel_stats,
220            &stats,
221        );
222        let buffered = Arc::new(Mutex::new(
223            BufferedRenderer::new(
224                render,
225                stream_params,
226                calculate_render_size(sample_rate, config.render_window_ms),
227            )
228            .map_err(RealtimeSynthError::BufferedRendererThreadSpawn)?,
229        ));
230        let (stream_control, stream_owner) =
231            spawn_stream_thread(device.clone(), stream_config, buffered.clone())?;
232
233        let max_nps = Arc::new(ReadWriteAtomicU64::new(10000));
234
235        Ok(Self {
236            data: Some(RealtimeSynthThreadSharedData {
237                buffered_renderer: buffered,
238
239                event_senders: RealtimeEventSender::new(senders, max_nps, config.ignore_range)
240                    .map_err(RealtimeSynthError::EventSenderInit)?,
241                stream_control,
242            }),
243            stream_owner: Some(stream_owner),
244            join_handles,
245
246            stats,
247            stream_params,
248        })
249    }
250
251    /// Sends a SynthEvent to the realtime synthesizer.
252    ///
253    /// See the `SynthEvent` documentation for more information.
254    pub fn send_event(&mut self, event: SynthEvent) {
255        let data = self.data.as_mut().unwrap();
256        data.event_senders.send_event(event);
257    }
258
259    /// Sends a u32 event to the realtime synthesizer.
260    pub fn send_event_u32(&mut self, event: u32) {
261        let data = self.data.as_mut().unwrap();
262        data.event_senders.send_event_u32(event);
263    }
264
265    /// Returns a reference to the event sender of the realtime synthesizer.
266    /// This can be used to clone the sender so it can be passed in threads.
267    ///
268    /// See the `RealtimeEventSender` documentation for more information
269    /// on how to use.
270    pub fn get_sender_ref(&self) -> &RealtimeEventSender {
271        let data = self.data.as_ref().unwrap();
272        &data.event_senders
273    }
274
275    /// Returns a mutable reference the event sender of the realtime synthesizer.
276    /// This can be used to modify its parameters (eg. ignore range).
277    /// Please note that each clone will store its own distinct parameters.
278    ///
279    /// See the `RealtimeEventSender` documentation for more information
280    /// on how to use.
281    pub fn get_sender_mut(&mut self) -> &mut RealtimeEventSender {
282        let data = self.data.as_mut().unwrap();
283        &mut data.event_senders
284    }
285
286    /// Returns the statistics reader of the realtime synthesizer.
287    ///
288    /// See the `RealtimeSynthStatsReader` documentation for more information
289    /// on how to use.
290    pub fn get_stats(&self) -> RealtimeSynthStatsReader {
291        let data = self.data.as_ref().unwrap();
292        let buffered_stats = data.buffered_renderer.lock().unwrap().get_buffer_stats();
293
294        RealtimeSynthStatsReader::new(self.stats.clone(), buffered_stats)
295    }
296
297    /// Returns the stream parameters of the audio output device.
298    pub fn stream_params(&self) -> AudioStreamParams {
299        self.stream_params
300    }
301
302    /// Pauses the playback of the audio output device.
303    pub fn pause(&mut self) -> Result<(), PauseStreamError> {
304        let data = self.data.as_ref().unwrap();
305        let (sender, receiver) = bounded(1);
306        if data
307            .stream_control
308            .send(StreamCommand::Pause(sender))
309            .is_err()
310        {
311            return Err(PauseStreamError::DeviceNotAvailable);
312        }
313        receiver
314            .recv()
315            .unwrap_or(Err(PauseStreamError::DeviceNotAvailable))
316    }
317
318    /// Resumes the playback of the audio output device.
319    pub fn resume(&mut self) -> Result<(), PlayStreamError> {
320        let data = self.data.as_ref().unwrap();
321        let (sender, receiver) = bounded(1);
322        if data
323            .stream_control
324            .send(StreamCommand::Resume(sender))
325            .is_err()
326        {
327            return Err(PlayStreamError::DeviceNotAvailable);
328        }
329        receiver
330            .recv()
331            .unwrap_or(Err(PlayStreamError::DeviceNotAvailable))
332    }
333
334    /// Changes the length of the buffer reader.
335    pub fn set_buffer(&self, render_window_ms: f64) {
336        let data = self.data.as_ref().unwrap();
337        let sample_rate = self.stream_params.sample_rate;
338        let size = calculate_render_size(sample_rate, render_window_ms);
339        data.buffered_renderer.lock().unwrap().set_render_size(size);
340    }
341}
342
343fn build_channel_pool(
344    thread_count: ThreadCount,
345) -> Result<Option<Arc<rayon::ThreadPool>>, RealtimeSynthError> {
346    Ok(match thread_count {
347        ThreadCount::None => None,
348        ThreadCount::Auto => Some(Arc::new(rayon::ThreadPoolBuilder::new().build()?)),
349        ThreadCount::Manual(threads) => Some(Arc::new(
350            rayon::ThreadPoolBuilder::new()
351                .num_threads(threads)
352                .build()?,
353        )),
354    })
355}
356
357fn channel_count(format: SynthFormat) -> u32 {
358    match format {
359        SynthFormat::Midi => 16,
360        SynthFormat::Custom { channels } => channels,
361    }
362}
363
364fn prepare_channels(
365    channel_count: u32,
366    init_options: xsynth_core::channel::ChannelInitOptions,
367    stream_params: AudioStreamParams,
368    channel_pool: Option<Arc<rayon::ThreadPool>>,
369    format: SynthFormat,
370) -> Result<PreparedRealtimeChannels, RealtimeSynthError> {
371    let (output_sender, output_receiver) = bounded::<Vec<f32>>(channel_count as usize);
372
373    let mut channel_stats = Vec::new();
374    let mut senders = Vec::new();
375    let mut command_senders = Vec::new();
376    let mut join_handles = Vec::new();
377
378    for _ in 0..channel_count {
379        let channel = VoiceChannel::new(init_options, stream_params, channel_pool.clone());
380        channel_stats.push(channel.get_channel_stats());
381
382        let (event_sender, event_receiver) = unbounded();
383        senders.push(event_sender);
384
385        let (command_sender, command_receiver) = bounded::<Vec<f32>>(1);
386        command_senders.push(command_sender);
387
388        let output_sender = output_sender.clone();
389        let join_handle =
390            spawn_channel_thread(channel, event_receiver, command_receiver, output_sender)?;
391        join_handles.push(join_handle);
392    }
393
394    if format == SynthFormat::Midi {
395        let _ = senders[9].send(ChannelEvent::Config(ChannelConfigEvent::SetPercussionMode(
396            true,
397        )));
398    }
399
400    Ok(PreparedRealtimeChannels {
401        channel_stats,
402        senders,
403        command_senders,
404        join_handles,
405        output_receiver,
406    })
407}
408
409fn spawn_channel_thread(
410    mut channel: VoiceChannel,
411    event_receiver: crossbeam_channel::Receiver<ChannelEvent>,
412    command_receiver: crossbeam_channel::Receiver<Vec<f32>>,
413    output_sender: crossbeam_channel::Sender<Vec<f32>>,
414) -> Result<thread::JoinHandle<()>, RealtimeSynthError> {
415    thread::Builder::new()
416        .name("xsynth_channel_handler".to_string())
417        .spawn(move || loop {
418            channel.push_events_iter(event_receiver.try_iter());
419            let mut vec = match command_receiver.recv() {
420                Ok(vec) => vec,
421                Err(_) => break,
422            };
423            channel.push_events_iter(event_receiver.try_iter());
424            channel.read_samples(&mut vec);
425            if output_sender.send(vec).is_err() {
426                break;
427            }
428        })
429        .map_err(RealtimeSynthError::ChannelThreadSpawn)
430}
431
432fn build_render_pipe(
433    stream_params: AudioStreamParams,
434    channel_count: u32,
435    command_senders: Vec<crossbeam_channel::Sender<Vec<f32>>>,
436    output_receiver: crossbeam_channel::Receiver<Vec<f32>>,
437    channel_stats: Vec<xsynth_core::channel::VoiceChannelStatsReader>,
438    stats: &RealtimeSynthStats,
439) -> FunctionAudioPipe<impl FnMut(&mut [f32]) + Send> {
440    let mut vec_cache: VecDeque<Vec<f32>> = VecDeque::new();
441    for _ in 0..channel_count {
442        vec_cache.push_front(Vec::new());
443    }
444
445    let total_voice_count = stats.voice_count.clone();
446
447    FunctionAudioPipe::new(stream_params, move |out| {
448        for sender in &command_senders {
449            let mut buf = vec_cache.pop_front().unwrap();
450            prepapre_cache_vec(&mut buf, out.len(), 0.0);
451            sender.send(buf).unwrap();
452        }
453
454        for _ in 0..channel_count {
455            let buf = output_receiver.recv().unwrap();
456            sum_simd(&buf, out);
457            vec_cache.push_front(buf);
458        }
459
460        let total_voices = channel_stats.iter().map(|c| c.voice_count()).sum();
461        total_voice_count.store(total_voices, Ordering::SeqCst);
462    })
463}
464
465fn build_output_stream(
466    device: &Device,
467    stream_config: SupportedStreamConfig,
468    buffered: Arc<Mutex<BufferedRenderer>>,
469) -> Result<Stream, RealtimeSynthError> {
470    match stream_config.sample_format() {
471        cpal::SampleFormat::F32 => build_output_stream_for::<f32>(device, stream_config, buffered),
472        cpal::SampleFormat::I16 => build_output_stream_for::<i16>(device, stream_config, buffered),
473        cpal::SampleFormat::U16 => build_output_stream_for::<u16>(device, stream_config, buffered),
474        _ => Err(RealtimeSynthError::UnsupportedSampleFormat(
475            stream_config.sample_format(),
476        )),
477    }
478}
479
480fn build_output_stream_for<T: SizedSample + ConvertSample>(
481    device: &Device,
482    stream_config: SupportedStreamConfig,
483    buffered: Arc<Mutex<BufferedRenderer>>,
484) -> Result<Stream, RealtimeSynthError> {
485    let err_fn = |err| eprintln!("an error occurred on stream: {err}");
486    let mut output_vec = Vec::new();
487    let mut limiter = VolumeLimiter::new(stream_config.channels());
488
489    Ok(device.build_output_stream(
490        &stream_config.into(),
491        move |data: &mut [T], _: &cpal::OutputCallbackInfo| {
492            output_vec.resize(data.len(), 0.0);
493            buffered.lock().unwrap().read(&mut output_vec);
494            for (i, s) in limiter.limit_iter(output_vec.drain(0..)).enumerate() {
495                data[i] = ConvertSample::from_f32(s);
496            }
497        },
498        err_fn,
499        None,
500    )?)
501}
502
503fn spawn_stream_thread(
504    device: Device,
505    stream_config: SupportedStreamConfig,
506    buffered: Arc<Mutex<BufferedRenderer>>,
507) -> Result<
508    (
509        crossbeam_channel::Sender<StreamCommand>,
510        thread::JoinHandle<()>,
511    ),
512    RealtimeSynthError,
513> {
514    let (command_sender, command_receiver) = unbounded();
515    let (ready_sender, ready_receiver) = bounded(1);
516    let join_handle = thread::Builder::new()
517        .name("xsynth_stream_owner".to_string())
518        .spawn(move || {
519            let stream = match build_output_stream(&device, stream_config, buffered) {
520                Ok(stream) => stream,
521                Err(err) => {
522                    let _ = ready_sender.send(Err(err));
523                    return;
524                }
525            };
526            if let Err(err) = stream.play() {
527                let _ = ready_sender.send(Err(err.into()));
528                return;
529            }
530            if ready_sender.send(Ok(())).is_err() {
531                return;
532            }
533
534            while let Ok(command) = command_receiver.recv() {
535                match command {
536                    StreamCommand::Pause(reply) => {
537                        let _ = reply.send(stream.pause());
538                    }
539                    StreamCommand::Resume(reply) => {
540                        let _ = reply.send(stream.play());
541                    }
542                    StreamCommand::Shutdown => break,
543                }
544            }
545        })
546        .map_err(RealtimeSynthError::StreamThreadSpawn)?;
547
548    match ready_receiver.recv() {
549        Ok(Ok(())) => Ok((command_sender, join_handle)),
550        Ok(Err(err)) => {
551            let _ = join_handle.join();
552            Err(err)
553        }
554        Err(_) => {
555            let _ = join_handle.join();
556            Err(RealtimeSynthError::StreamThreadInit)
557        }
558    }
559}
560
561impl Drop for RealtimeSynth {
562    fn drop(&mut self) {
563        let data = self.data.take().unwrap();
564        let _ = data.stream_control.send(StreamCommand::Shutdown);
565        drop(data);
566        if let Some(handle) = self.stream_owner.take() {
567            if handle.join().is_err() {
568                eprintln!("xsynth-realtime: stream owner thread panicked during shutdown");
569            }
570        }
571        for handle in self.join_handles.drain(..) {
572            if handle.join().is_err() {
573                eprintln!("xsynth-realtime: channel handler thread panicked during shutdown");
574            }
575        }
576    }
577}
578
579trait ConvertSample: SizedSample {
580    fn from_f32(s: f32) -> Self;
581}
582
583impl ConvertSample for f32 {
584    fn from_f32(s: f32) -> Self {
585        s
586    }
587}
588
589impl ConvertSample for i16 {
590    fn from_f32(s: f32) -> Self {
591        (s * i16::MAX as f32) as i16
592    }
593}
594
595impl ConvertSample for u16 {
596    fn from_f32(s: f32) -> Self {
597        ((s * u16::MAX as f32) as i32 + i16::MIN as i32) as u16
598    }
599}
600
601fn calculate_render_size(sample_rate: u32, buffer_ms: f64) -> usize {
602    (sample_rate as f64 * buffer_ms / 1000.0) as usize
603}
604
605#[cfg(test)]
606mod tests {
607    use super::RealtimeSynth;
608
609    #[test]
610    fn realtime_synth_is_send_sync() {
611        fn assert_send_sync<T: Send + Sync>() {}
612        assert_send_sync::<RealtimeSynth>();
613    }
614}