use jack::{
AudioIn, AudioOut, Client, ClientOptions, Control, NotificationHandler, Port, ProcessHandler,
ProcessScope,
};
use oxisound_core::{InputStream, OutputStream, OxiSoundError, StreamConfig, StreamStats};
/// Boxed, sendable closure that is handed a mutable `f32` sample buffer.
///
/// The callback-driven output handler in this file invokes it once per
/// process cycle with the output port's buffer.
type AudioCallback = Box<dyn FnMut(&mut [f32]) + Send>;
use ringbuf::{
HeapCons, HeapProd, HeapRb,
traits::{Consumer, Producer, Split},
};
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use super::{JackTransportPosition, JackTransportState};
use crate::metrics::JackMetrics;
/// JACK notification handler that forwards server events into [`JackMetrics`].
pub(crate) struct JackNotifier {
/// Metrics sink updated by the notification callbacks
/// (presumably shares state with the clones held by the stream — confirm in `JackMetrics`).
metrics: JackMetrics,
}
impl NotificationHandler for JackNotifier {
    /// Stores the sample rate announced by the server in the metrics and
    /// asks JACK to keep the client running.
    fn sample_rate(&mut self, _client: &Client, rate: jack::Frames) -> Control {
        self.metrics.record_sample_rate(rate);
        Control::Continue
    }

    /// Registers one xrun in the metrics and asks JACK to keep the client
    /// running.
    fn xrun(&mut self, _client: &Client) -> Control {
        self.metrics.record_xrun();
        Control::Continue
    }
}
/// Returns the ring-buffer capacity, in samples, used for a stream running at
/// `sample_rate`: twice the sample rate, saturating at `usize::MAX` instead of
/// overflowing.
fn ring_capacity(sample_rate: u32) -> usize {
    let per_second = sample_rate as usize;
    per_second.saturating_add(per_second)
}
/// A JACK client that has been opened but not yet activated.
///
/// The `open_*` methods consume the device and return an active stream.
pub struct JackDevice {
/// Underlying JACK client.
client: Client,
/// Client name used to build fully qualified `client:port` names.
name: String,
}
impl JackDevice {
pub fn new(client_name: &str) -> Result<Self, OxiSoundError> {
let (client, _status) = Client::new(client_name, ClientOptions::NO_START_SERVER)
.map_err(|e| OxiSoundError::Device(format!("JACK client open failed: {e}")))?;
Ok(Self {
name: client_name.to_owned(),
client,
})
}
pub fn sample_rate(&self) -> u32 {
self.client.sample_rate()
}
pub fn buffer_size(&self) -> u32 {
self.client.buffer_size()
}
pub fn open_output(self, _config: StreamConfig) -> Result<JackOutputStream, OxiSoundError> {
let sample_rate = self.sample_rate();
let capacity = ring_capacity(sample_rate);
let rb = HeapRb::<f32>::new(capacity);
let (producer, consumer) = rb.split();
let underruns = Arc::new(AtomicU64::new(0));
let frames_processed = Arc::new(AtomicU64::new(0));
let out_port = self
.client
.register_port("out", AudioOut::default())
.map_err(|e| OxiSoundError::Device(format!("JACK port register failed: {e}")))?;
let metrics = JackMetrics::new();
let notifier = JackNotifier {
metrics: metrics.clone(),
};
let handler = JackOutputHandler {
port: out_port,
consumer,
underruns: Arc::clone(&underruns),
frames_processed: Arc::clone(&frames_processed),
metrics: metrics.clone(),
};
let active = self
.client
.activate_async(notifier, handler)
.map_err(|e| OxiSoundError::Device(format!("JACK activate_async failed: {e}")))?;
Ok(JackOutputStream {
producer,
frames_processed,
underruns,
metrics,
_active: active,
})
}
pub fn open_input(self, _config: StreamConfig) -> Result<JackInputStream, OxiSoundError> {
let sample_rate = self.sample_rate();
let capacity = ring_capacity(sample_rate);
let rb = HeapRb::<f32>::new(capacity);
let (producer, consumer) = rb.split();
let overruns = Arc::new(AtomicU64::new(0));
let frames_processed = Arc::new(AtomicU64::new(0));
let in_port = self
.client
.register_port("in", AudioIn::default())
.map_err(|e| OxiSoundError::Device(format!("JACK port register failed: {e}")))?;
let metrics = JackMetrics::new();
let notifier = JackNotifier {
metrics: metrics.clone(),
};
let handler = JackInputHandler {
port: in_port,
producer,
overruns: Arc::clone(&overruns),
frames_processed: Arc::clone(&frames_processed),
metrics: metrics.clone(),
};
let active = self
.client
.activate_async(notifier, handler)
.map_err(|e| OxiSoundError::Device(format!("JACK activate_async failed: {e}")))?;
Ok(JackInputStream {
consumer,
frames_processed,
overruns,
metrics,
_active: active,
})
}
pub fn open_output_callback<F>(
self,
_config: StreamConfig,
callback: F,
) -> Result<JackCallbackOutputStream, OxiSoundError>
where
F: FnMut(&mut [f32]) + Send + 'static,
{
let frames_processed = Arc::new(AtomicU64::new(0));
let out_port = self
.client
.register_port("out", AudioOut::default())
.map_err(|e| OxiSoundError::Device(format!("JACK port register failed: {e}")))?;
let metrics = JackMetrics::new();
let notifier = JackNotifier {
metrics: metrics.clone(),
};
let callback_boxed: AudioCallback = Box::new(callback);
let handler = JackCallbackOutputHandler {
port: out_port,
callback: callback_boxed,
frames_processed: Arc::clone(&frames_processed),
metrics: metrics.clone(),
};
let active = self
.client
.activate_async(notifier, handler)
.map_err(|e| OxiSoundError::Device(format!("JACK activate_async failed: {e}")))?;
Ok(JackCallbackOutputStream {
frames_processed,
metrics,
_active: active,
})
}
pub fn connect_ports(&self, src: &str, dst: &str) -> Result<(), OxiSoundError> {
match self.client.connect_ports_by_name(src, dst) {
Ok(()) => Ok(()),
Err(jack::Error::PortAlreadyConnected(_, _)) => Ok(()),
Err(e) => Err(OxiSoundError::Device(format!(
"JACK port connect {src} → {dst} failed: {e}"
))),
}
}
pub fn auto_connect_output(&self, port_name: &str) -> Result<(), OxiSoundError> {
let full_src = format!("{}:{}", self.name, port_name);
let system_ports =
self.client
.ports(Some("system:playback_"), None, jack::PortFlags::IS_INPUT);
for sys_port in system_ports.iter().take(2) {
match self.client.connect_ports_by_name(&full_src, sys_port) {
Ok(()) | Err(jack::Error::PortAlreadyConnected(_, _)) => {}
Err(e) => {
log::warn!("[oxisound-jack] auto-connect {full_src} → {sys_port}: {e}");
}
}
}
Ok(())
}
pub fn transport_state(&self) -> JackTransportState {
match self.client.transport().query_state() {
Ok(jack::TransportState::Rolling) => JackTransportState::Rolling,
Ok(jack::TransportState::Starting) => JackTransportState::Starting,
_ => JackTransportState::Stopped,
}
}
pub fn transport_position(&self) -> JackTransportPosition {
match self.client.transport().query() {
Ok(state_pos) => {
let frame = state_pos.pos.frame() as u64;
let bpm = state_pos.pos.bbt().map(|b| b.bpm);
JackTransportPosition { frame, bpm }
}
Err(_) => JackTransportPosition {
frame: 0,
bpm: None,
},
}
}
pub fn set_freewheel(&self, _enabled: bool) -> Result<(), OxiSoundError> {
Err(OxiSoundError::Unsupported(
"JACK freewheel is not implemented in the jack 0.13.5 safe API (upstream TODO). \
Track: https://github.com/RustAudio/rust-jack"
.into(),
))
}
}
/// Handle to an active JACK playback stream fed through a ring buffer.
pub struct JackOutputStream {
/// Producer half of the sample ring buffer; `write` pushes into it.
producer: HeapProd<f32>,
/// Number of frames handled by the process callback, shared with the handler.
frames_processed: Arc<AtomicU64>,
/// Number of samples the callback had to replace with silence, shared with the handler.
underruns: Arc<AtomicU64>,
/// Metrics updated by the notification and process handlers.
pub(crate) metrics: JackMetrics,
/// Activated client; also used to reach the underlying `Client` for port operations.
_active: jack::AsyncClient<JackNotifier, JackOutputHandler>,
}
/// Process handler that drains the ring buffer into the JACK output port.
struct JackOutputHandler {
/// Output port whose buffer is filled every process cycle.
port: Port<AudioOut>,
/// Consumer half of the sample ring buffer.
consumer: HeapCons<f32>,
/// Counter of samples that were missing from the ring buffer.
underruns: Arc<AtomicU64>,
/// Counter of frames written to the port.
frames_processed: Arc<AtomicU64>,
/// Metrics sink for buffer-size and latency updates.
pub(crate) metrics: JackMetrics,
}
impl ProcessHandler for JackOutputHandler {
    /// Records the new buffer size announced by the server.
    fn buffer_size(&mut self, _: &Client, size: jack::Frames) -> Control {
        self.metrics.record_buffer_size(size);
        Control::Continue
    }

    /// Fills the output port from the ring buffer.
    ///
    /// Samples the ring buffer cannot provide are replaced with silence and
    /// counted, one per missing sample, in `underruns`.
    fn process(&mut self, _client: &Client, ps: &ProcessScope) -> Control {
        let out = self.port.as_mut_slice(ps);
        let total = out.len();

        // One bulk copy instead of a pop per sample keeps the work done in
        // the realtime callback to a minimum.
        let filled = self.consumer.pop_slice(out);
        if filled < total {
            out[filled..].fill(0.0);
            // A single atomic update covers the whole silent tail.
            self.underruns
                .fetch_add((total - filled) as u64, Ordering::Relaxed);
        }

        self.frames_processed
            .fetch_add(total as u64, Ordering::Relaxed);
        let latency = self.port.get_latency_range(jack::LatencyType::Playback).1;
        self.metrics.record_latency(latency);
        Control::Continue
    }
}
impl OutputStream for JackOutputStream {
    /// Queues `samples` for playback.
    ///
    /// Writing is best effort: when the ring buffer is full, the samples that
    /// do not fit are dropped and the call still returns `Ok(())`. The queued
    /// part is always a contiguous prefix of `samples`, so a full buffer
    /// cannot leave gaps in the middle of the written data.
    fn write(&mut self, samples: &[f32]) -> Result<(), OxiSoundError> {
        // Bulk push; the return value is the number of samples accepted.
        let _queued = self.producer.push_slice(samples);
        Ok(())
    }

    /// Returns a snapshot of the stream counters.
    ///
    /// `overruns` is always zero for an output stream.
    fn stats(&self) -> StreamStats {
        StreamStats {
            frames_processed: self.frames_processed.load(Ordering::Relaxed),
            underruns: self.underruns.load(Ordering::Relaxed),
            overruns: 0,
            latency_frames: self.metrics.snapshot().latency_frames,
            // Report the load JACK measures instead of a constant zero;
            // `into()` keeps this valid for an `f32` or `f64` field.
            cpu_load_percent: self._active.as_client().cpu_load().into(),
        }
    }
}
impl JackOutputStream {
pub fn current_sample_rate(&self) -> u32 {
self.metrics.snapshot().sample_rate
}
pub fn xrun_count(&self) -> u64 {
self.metrics.snapshot().xrun_count
}
pub fn current_buffer_size(&self) -> u32 {
self.metrics.snapshot().buffer_size
}
pub fn connect_ports(&self, src: &str, dst: &str) -> Result<(), OxiSoundError> {
let client = self._active.as_client();
match client.connect_ports_by_name(src, dst) {
Ok(()) => Ok(()),
Err(jack::Error::PortAlreadyConnected(_, _)) => Ok(()),
Err(e) => Err(OxiSoundError::Device(format!(
"JACK port connect {src} → {dst} failed: {e}"
))),
}
}
pub fn auto_connect(&self) -> Result<(), OxiSoundError> {
let client = self._active.as_client();
let client_name = client.name();
let full_src = format!("{client_name}:out");
let system_ports = client.ports(Some("system:playback_"), None, jack::PortFlags::IS_INPUT);
for sys_port in system_ports.iter().take(2) {
match client.connect_ports_by_name(&full_src, sys_port) {
Ok(()) | Err(jack::Error::PortAlreadyConnected(_, _)) => {}
Err(e) => {
log::warn!("[oxisound-jack] auto-connect {full_src} → {sys_port}: {e}");
}
}
}
Ok(())
}
pub fn cpu_load(&self) -> f32 {
self._active.as_client().cpu_load()
}
pub fn list_ports(&self) -> Vec<String> {
self._active
.as_client()
.ports(None, None, jack::PortFlags::empty())
}
pub fn list_input_ports(&self, pattern: Option<&str>) -> Vec<String> {
self._active
.as_client()
.ports(pattern, None, jack::PortFlags::IS_INPUT)
}
pub fn list_output_ports(&self, pattern: Option<&str>) -> Vec<String> {
self._active
.as_client()
.ports(pattern, None, jack::PortFlags::IS_OUTPUT)
}
}
/// Handle to an active JACK capture stream read through a ring buffer.
pub struct JackInputStream {
/// Consumer half of the sample ring buffer; `read` pops from it.
consumer: HeapCons<f32>,
/// Number of frames handled by the process callback, shared with the handler.
frames_processed: Arc<AtomicU64>,
/// Number of captured samples that did not fit into the ring buffer, shared with the handler.
overruns: Arc<AtomicU64>,
/// Metrics updated by the notification and process handlers.
pub(crate) metrics: JackMetrics,
/// Activated client; also used to reach the underlying `Client` for port operations.
_active: jack::AsyncClient<JackNotifier, JackInputHandler>,
}
/// Process handler that copies the JACK input port into the ring buffer.
struct JackInputHandler {
/// Input port whose buffer is read every process cycle.
port: Port<AudioIn>,
/// Producer half of the sample ring buffer.
producer: HeapProd<f32>,
/// Counter of samples that could not be pushed into the ring buffer.
overruns: Arc<AtomicU64>,
/// Counter of frames read from the port.
frames_processed: Arc<AtomicU64>,
/// Metrics sink for buffer-size and latency updates.
pub(crate) metrics: JackMetrics,
}
impl ProcessHandler for JackInputHandler {
    /// Records the new buffer size announced by the server.
    fn buffer_size(&mut self, _: &Client, size: jack::Frames) -> Control {
        self.metrics.record_buffer_size(size);
        Control::Continue
    }

    /// Copies the input port buffer into the ring buffer.
    ///
    /// Samples that do not fit are dropped and counted, one per sample, in
    /// `overruns`.
    fn process(&mut self, _client: &Client, ps: &ProcessScope) -> Control {
        let inp = self.port.as_slice(ps);
        let total = inp.len();

        // One bulk copy instead of a push per sample keeps the work done in
        // the realtime callback to a minimum.
        let accepted = self.producer.push_slice(inp);
        if accepted < total {
            // A single atomic update covers the whole dropped tail.
            self.overruns
                .fetch_add((total - accepted) as u64, Ordering::Relaxed);
        }

        self.frames_processed
            .fetch_add(total as u64, Ordering::Relaxed);
        let latency = self.port.get_latency_range(jack::LatencyType::Capture).1;
        self.metrics.record_latency(latency);
        Control::Continue
    }
}
impl InputStream for JackInputStream {
    /// Copies captured samples into `samples` and returns how many were
    /// written.
    ///
    /// The call never blocks: it returns fewer samples than requested
    /// (possibly zero) when the ring buffer holds less than `samples.len()`.
    fn read(&mut self, samples: &mut [f32]) -> Result<usize, OxiSoundError> {
        // Bulk pop; fills a prefix of `samples` and reports its length.
        Ok(self.consumer.pop_slice(samples))
    }

    /// Returns a snapshot of the stream counters.
    ///
    /// `underruns` is always zero for an input stream.
    fn stats(&self) -> StreamStats {
        StreamStats {
            frames_processed: self.frames_processed.load(Ordering::Relaxed),
            underruns: 0,
            overruns: self.overruns.load(Ordering::Relaxed),
            latency_frames: self.metrics.snapshot().latency_frames,
            // Report the load JACK measures instead of a constant zero;
            // `into()` keeps this valid for an `f32` or `f64` field.
            cpu_load_percent: self._active.as_client().cpu_load().into(),
        }
    }
}
impl JackInputStream {
pub fn current_sample_rate(&self) -> u32 {
self.metrics.snapshot().sample_rate
}
pub fn xrun_count(&self) -> u64 {
self.metrics.snapshot().xrun_count
}
pub fn current_buffer_size(&self) -> u32 {
self.metrics.snapshot().buffer_size
}
pub fn connect_ports(&self, src: &str, dst: &str) -> Result<(), OxiSoundError> {
let client = self._active.as_client();
match client.connect_ports_by_name(src, dst) {
Ok(()) => Ok(()),
Err(jack::Error::PortAlreadyConnected(_, _)) => Ok(()),
Err(e) => Err(OxiSoundError::Device(format!(
"JACK port connect {src} → {dst} failed: {e}"
))),
}
}
pub fn auto_connect(&self) -> Result<(), OxiSoundError> {
let client = self._active.as_client();
let client_name = client.name();
let full_dst = format!("{client_name}:in");
let system_ports = client.ports(Some("system:capture_"), None, jack::PortFlags::IS_OUTPUT);
for sys_port in system_ports.iter().take(2) {
match client.connect_ports_by_name(sys_port, &full_dst) {
Ok(()) | Err(jack::Error::PortAlreadyConnected(_, _)) => {}
Err(e) => {
log::warn!("[oxisound-jack] auto-connect {sys_port} → {full_dst}: {e}");
}
}
}
Ok(())
}
pub fn cpu_load(&self) -> f32 {
self._active.as_client().cpu_load()
}
pub fn list_ports(&self) -> Vec<String> {
self._active
.as_client()
.ports(None, None, jack::PortFlags::empty())
}
pub fn list_input_ports(&self, pattern: Option<&str>) -> Vec<String> {
self._active
.as_client()
.ports(pattern, None, jack::PortFlags::IS_INPUT)
}
pub fn list_output_ports(&self, pattern: Option<&str>) -> Vec<String> {
self._active
.as_client()
.ports(pattern, None, jack::PortFlags::IS_OUTPUT)
}
}
/// Handle to an active JACK playback stream driven by a user callback.
pub struct JackCallbackOutputStream {
/// Number of frames handled by the process callback, shared with the handler.
frames_processed: Arc<AtomicU64>,
/// Metrics updated by the notification and process handlers.
pub(crate) metrics: JackMetrics,
/// Activated client; also used to reach the underlying `Client` for port operations.
_active: jack::AsyncClient<JackNotifier, JackCallbackOutputHandler>,
}
/// Process handler that lets a user callback fill the JACK output port.
struct JackCallbackOutputHandler {
/// Output port whose buffer is handed to the callback every process cycle.
port: Port<AudioOut>,
/// User callback invoked with the port buffer.
callback: AudioCallback,
/// Counter of frames passed to the callback.
frames_processed: Arc<AtomicU64>,
/// Metrics sink for buffer-size and latency updates.
pub(crate) metrics: JackMetrics,
}
impl ProcessHandler for JackCallbackOutputHandler {
    /// Stores the buffer size announced by the server in the metrics.
    fn buffer_size(&mut self, _client: &Client, frames: jack::Frames) -> Control {
        self.metrics.record_buffer_size(frames);
        Control::Continue
    }

    /// Hands the output port buffer to the user callback, then updates the
    /// frame counter and the recorded playback latency.
    fn process(&mut self, _client: &Client, scope: &ProcessScope) -> Control {
        let buffer = self.port.as_mut_slice(scope);
        // Capture the length first: the buffer borrow ends with the callback.
        let frame_count = buffer.len() as u64;
        (self.callback)(buffer);

        self.frames_processed
            .fetch_add(frame_count, Ordering::Relaxed);

        let (_min, max_latency) = self.port.get_latency_range(jack::LatencyType::Playback);
        self.metrics.record_latency(max_latency);
        Control::Continue
    }
}
impl JackCallbackOutputStream {
    /// Returns a snapshot of the stream counters.
    ///
    /// `underruns` and `overruns` are always zero: the callback writes
    /// straight into the port buffer, so this stream has no ring buffer to
    /// run dry or overflow.
    pub fn stats(&self) -> StreamStats {
        StreamStats {
            frames_processed: self.frames_processed.load(Ordering::Relaxed),
            underruns: 0,
            overruns: 0,
            latency_frames: self.metrics.snapshot().latency_frames,
            // Report the load JACK measures instead of a constant zero;
            // `into()` keeps this valid for an `f32` or `f64` field.
            cpu_load_percent: self._active.as_client().cpu_load().into(),
        }
    }

    /// Sample rate from the latest metrics snapshot.
    pub fn current_sample_rate(&self) -> u32 {
        self.metrics.snapshot().sample_rate
    }

    /// Number of xruns from the latest metrics snapshot.
    pub fn xrun_count(&self) -> u64 {
        self.metrics.snapshot().xrun_count
    }

    /// Buffer size from the latest metrics snapshot.
    pub fn current_buffer_size(&self) -> u32 {
        self.metrics.snapshot().buffer_size
    }

    /// Connects the port named `src` to the port named `dst`.
    ///
    /// An already existing connection is treated as success.
    ///
    /// # Errors
    /// Returns [`OxiSoundError::Device`] for any other connection failure.
    pub fn connect_ports(&self, src: &str, dst: &str) -> Result<(), OxiSoundError> {
        let client = self._active.as_client();
        match client.connect_ports_by_name(src, dst) {
            Ok(()) => Ok(()),
            Err(jack::Error::PortAlreadyConnected(_, _)) => Ok(()),
            Err(e) => Err(OxiSoundError::Device(format!(
                "JACK port connect {src} → {dst} failed: {e}"
            ))),
        }
    }

    /// Best-effort connection of this client's `out` port to the first two
    /// input ports whose name matches `system:playback_`.
    ///
    /// Connection failures are logged as warnings and never returned, so this
    /// always yields `Ok(())`.
    pub fn auto_connect(&self) -> Result<(), OxiSoundError> {
        let client = self._active.as_client();
        let client_name = client.name();
        let full_src = format!("{client_name}:out");
        let system_ports = client.ports(Some("system:playback_"), None, jack::PortFlags::IS_INPUT);
        for sys_port in system_ports.iter().take(2) {
            match client.connect_ports_by_name(&full_src, sys_port) {
                Ok(()) | Err(jack::Error::PortAlreadyConnected(_, _)) => {}
                Err(e) => {
                    log::warn!("[oxisound-jack] auto-connect {full_src} → {sys_port}: {e}");
                }
            }
        }
        Ok(())
    }

    /// CPU load as reported by the JACK client.
    pub fn cpu_load(&self) -> f32 {
        self._active.as_client().cpu_load()
    }

    /// Names of all ports known to the client, without any filter.
    pub fn list_ports(&self) -> Vec<String> {
        self._active
            .as_client()
            .ports(None, None, jack::PortFlags::empty())
    }

    /// Names of input ports, optionally restricted by a name `pattern`.
    pub fn list_input_ports(&self, pattern: Option<&str>) -> Vec<String> {
        self._active
            .as_client()
            .ports(pattern, None, jack::PortFlags::IS_INPUT)
    }

    /// Names of output ports, optionally restricted by a name `pattern`.
    pub fn list_output_ports(&self, pattern: Option<&str>) -> Vec<String> {
        self._active
            .as_client()
            .ports(pattern, None, jack::PortFlags::IS_OUTPUT)
    }
}