use crate::app::logging::log_pipewire_operation;
use log::{debug, warn};
use pipewire::{
context::ContextBox, main_loop::MainLoopBox, metadata::Metadata, properties::PropertiesBox,
registry::GlobalObject, types::ObjectType,
};
use std::cell::Cell;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::OnceLock;
use std::time::Duration;
/// Value of the `metadata.name` property used to recognise the PipeWire
/// settings metadata object among registry globals.
const SETTINGS_METADATA_NAME: &str = "settings";
/// Metadata key written (or cleared) by `set_sample_rate_inner`.
const CLOCK_FORCE_RATE_KEY: &str = "clock.force-rate";
/// Metadata key whose value is parsed into the list of supported rates.
const CLOCK_ALLOWED_RATES_KEY: &str = "clock.allowed-rates";
/// Upper bound on how long we pump the main loop while waiting for the
/// settings metadata global, and again for the allowed-rates property.
const DISCOVERY_TIMEOUT: Duration = Duration::from_millis(50);
/// Upper bound on how long we pump the main loop waiting for the core `done`
/// event after requesting a sync.
const SYNC_TIMEOUT: Duration = Duration::from_millis(50);
/// Duration handed to each `iterate` call on the main loop.
// NOTE(review): presumably the maximum time one iteration may block — confirm
// against the pipewire-rs `iterate` documentation.
const LOOP_ITERATION_STEP: Duration = Duration::from_millis(10);
/// Process-wide cache of supported sample rates; filled once by
/// `initialize_supported_rates` and read by `get_supported_rates`.
static SUPPORTED_RATES_CACHE: OnceLock<Vec<u32>> = OnceLock::new();
/// Forces the PipeWire clock to `rate` Hz, or returns it to automatic
/// selection when `rate` is `0`, and records the outcome in the application
/// log.
///
/// The operation is logged as `"reset_sample_rate"` for a zero rate and
/// `"set_sample_rate"` otherwise. On success the log detail is either
/// `"automatic"` or `"<rate> Hz"`; on failure it is the error message.
///
/// # Errors
///
/// Returns whatever error string `set_sample_rate_inner` produced, unchanged.
pub fn set_sample_rate(rate: u32) -> Result<(), String> {
    let operation = match rate {
        0 => "reset_sample_rate",
        _ => "set_sample_rate",
    };

    let outcome = set_sample_rate_inner(rate);

    if let Err(message) = &outcome {
        log_pipewire_operation(operation, false, Some(message));
    } else {
        let details = match rate {
            0 => "automatic".to_string(),
            hz => format!("{} Hz", hz),
        };
        log_pipewire_operation(operation, true, Some(&details));
    }

    outcome
}
/// Connects to PipeWire, locates the `settings` metadata object and writes
/// the `clock.force-rate` key on it.
///
/// A `rate` of `0` sets the key with no value (logged as "reset ... to
/// automatic"); any other value is written as its decimal string.
///
/// # Errors
///
/// Returns a descriptive `String` when the main loop, context, core
/// connection or registry cannot be created, when the settings metadata
/// global is not seen within `DISCOVERY_TIMEOUT`, when binding the metadata
/// fails, or when the sync request cannot be issued. A sync that merely times
/// out is only logged as a warning and still yields `Ok(())`.
fn set_sample_rate_inner(rate: u32) -> Result<(), String> {
pipewire::init();
// Each call builds its own main loop, context, core connection and registry.
let mainloop =
MainLoopBox::new(None).map_err(|e| format!("Failed to create PipeWire MainLoop: {e}"))?;
let context = ContextBox::new(mainloop.loop_(), None)
.map_err(|e| format!("Failed to create PipeWire Context: {e}"))?;
let core = context
.connect(None)
.map_err(|e| format!("Failed to connect to PipeWire Core: {e}"))?;
let registry = core
.get_registry()
.map_err(|e| format!("Failed to get PipeWire registry: {e}"))?;
// Shared slot the registry listener fills once it sees the settings metadata.
let found_global: Rc<RefCell<Option<GlobalObject<PropertiesBox>>>> =
Rc::new(RefCell::new(None));
let found_global_clone = found_global.clone();
// The listener handle is kept in a named binding so it lives until the end
// of the function.
let _registry_listener = registry
.add_listener_local()
.global(move |global| {
// Only a Metadata global whose `metadata.name` is "settings" is of interest.
if global.type_ == ObjectType::Metadata
&& let Some(props) = global.props.as_ref()
&& props.get("metadata.name") == Some(SETTINGS_METADATA_NAME)
{
*found_global_clone.borrow_mut() = Some(global.to_owned());
}
})
.register();
// Pump the loop until the global shows up or the discovery timeout elapses.
let start = std::time::Instant::now();
while found_global.borrow().is_none() && start.elapsed() < DISCOVERY_TIMEOUT {
mainloop.loop_().iterate(LOOP_ITERATION_STEP);
}
let global_ref = found_global.borrow();
let global = global_ref
.as_ref()
.ok_or_else(|| "Timeout waiting for PipeWire settings metadata".to_string())?;
let metadata: Metadata = registry
.bind(global)
.map_err(|e| format!("Failed to bind metadata: {e}"))?;
// NOTE(review): the first argument (0) is the metadata subject id —
// presumably the global subject; confirm against the PipeWire metadata docs.
if rate == 0 {
// No value is supplied for the key in the reset case.
metadata.set_property(0, CLOCK_FORCE_RATE_KEY, None, None);
debug!("Reset PipeWire sample rate to automatic");
} else {
let rate_str = rate.to_string();
metadata.set_property(0, CLOCK_FORCE_RATE_KEY, None, Some(&rate_str));
debug!("Set PipeWire sample rate to {rate} Hz");
}
// Flag flipped by the core `done` event so we know the sync completed.
let done = Rc::new(Cell::new(false));
let done_clone = done.clone();
let _core_listener = core
.add_listener_local()
.done(move |_id, _seq| {
// Any `done` event is accepted; id and sequence number are not checked.
done_clone.set(true);
})
.register();
core.sync(0).map_err(|e| format!("Failed to sync: {e}"))?;
// Release the RefCell borrow before pumping the loop again: the registry
// listener calls `borrow_mut()` on the same cell and would panic if it ran
// while this shared borrow was still alive.
drop(global_ref);
let sync_start = std::time::Instant::now();
while !done.get() && sync_start.elapsed() < SYNC_TIMEOUT {
mainloop.loop_().iterate(LOOP_ITERATION_STEP);
}
// Best effort: a missing `done` is reported but not treated as a failure.
if !done.get() {
warn!("PipeWire sync timed out, property may not be applied");
}
Ok(())
}
/// Returns the PipeWire clock to automatic rate selection.
///
/// Equivalent to calling [`set_sample_rate`] with a rate of `0`.
///
/// # Errors
///
/// Propagates the error string returned by [`set_sample_rate`].
#[cfg(target_os = "linux")]
pub fn reset_sample_rate() -> Result<(), String> {
    // Zero is the value `set_sample_rate` treats as "reset to automatic".
    const AUTOMATIC_RATE: u32 = 0;
    set_sample_rate(AUTOMATIC_RATE)
}
/// Returns the supported sample rates, probing PipeWire on the first call
/// and serving the cached list afterwards.
///
/// # Errors
///
/// Propagates any error from `get_supported_rates_inner`; nothing is cached
/// in that case.
pub fn initialize_supported_rates() -> Result<Vec<u32>, String> {
    match SUPPORTED_RATES_CACHE.get() {
        Some(cached) => Ok(cached.clone()),
        None => {
            let detected = get_supported_rates_inner()?;
            // `set` only fails if the cache was filled in the meantime; that
            // outcome is deliberately ignored and our own result is returned.
            let _ = SUPPORTED_RATES_CACHE.set(detected.clone());
            Ok(detected)
        }
    }
}
/// Returns a copy of the cached supported rates, or `None` if
/// `initialize_supported_rates` has not populated the cache yet.
pub fn get_supported_rates() -> Option<Vec<u32>> {
    let cached = SUPPORTED_RATES_CACHE.get()?;
    Some(cached.clone())
}
fn get_supported_rates_inner() -> Result<Vec<u32>, String> {
if let Some(rates) = get_allowed_rates_from_pw_metadata()
&& !rates.is_empty()
{
debug!("Got allowed-rates from pw-metadata: {:?}", rates);
return Ok(rates);
}
if let Some(rates) = get_allowed_rates_from_api()
&& !rates.is_empty()
{
debug!("Got allowed-rates from PipeWire API: {:?}", rates);
return Ok(rates);
}
debug!("No allowed-rates found in PipeWire, using common rates");
Ok(vec![
44100, 48000,
])
}
fn get_allowed_rates_from_pw_metadata() -> Option<Vec<u32>> {
use std::process::Command;
let output = Command::new("pw-metadata")
.args(["-n", "settings"])
.output()
.ok()?;
if !output.status.success() {
return None;
}
let stdout = String::from_utf8_lossy(&output.stdout);
for line in stdout.lines() {
if line.contains("clock.allowed-rates") && line.contains("value:") {
if let Some(start) = line.find("value:'") {
let value_start = start + 7; if let Some(end) = line[value_start..].find("' type:") {
let value = &line[value_start..value_start + end];
let rates = parse_allowed_rates(value);
if !rates.is_empty() {
return Some(rates);
}
}
}
}
}
None
}
/// Reads the allowed clock rates through the PipeWire API.
///
/// Opens a fresh connection, waits up to `DISCOVERY_TIMEOUT` for the registry
/// to announce the metadata object whose `metadata.name` is `"settings"`,
/// binds it, and then waits up to `DISCOVERY_TIMEOUT` again for a property
/// event carrying the `clock.allowed-rates` key.
///
/// Returns `None` if any connection step fails, the metadata object is not
/// found or cannot be bound, or no non-empty rate list arrives in time.
fn get_allowed_rates_from_api() -> Option<Vec<u32>> {
    pipewire::init();
    let mainloop = MainLoopBox::new(None).ok()?;
    let context = ContextBox::new(mainloop.loop_(), None).ok()?;
    let core = context.connect(None).ok()?;
    let registry = core.get_registry().ok()?;

    // Slot filled by the registry listener once the settings metadata appears.
    let found_global: Rc<RefCell<Option<GlobalObject<PropertiesBox>>>> =
        Rc::new(RefCell::new(None));
    let found_global_clone = found_global.clone();
    let _registry_listener = registry
        .add_listener_local()
        .global(move |global| {
            if global.type_ == ObjectType::Metadata
                && let Some(props) = global.props.as_ref()
                && props.get("metadata.name") == Some(SETTINGS_METADATA_NAME)
            {
                *found_global_clone.borrow_mut() = Some(global.to_owned());
            }
        })
        .register();

    let start = std::time::Instant::now();
    while found_global.borrow().is_none() && start.elapsed() < DISCOVERY_TIMEOUT {
        mainloop.loop_().iterate(LOOP_ITERATION_STEP);
    }

    // Bind inside a narrow scope so the shared `RefCell` borrow is released
    // before the loop is pumped again. The registry listener above calls
    // `borrow_mut()` on the same cell; if it fired while a borrow was still
    // held it would panic inside the callback.
    let metadata = {
        let global_ref = found_global.borrow();
        let global = global_ref.as_ref()?;
        let bound: Result<Metadata, _> = registry.bind(global);
        bound.ok()?
    };

    // Rates reported by the property listener; empty until one arrives.
    let allowed_rates: Rc<RefCell<Vec<u32>>> = Rc::new(RefCell::new(Vec::new()));
    let allowed_rates_listener = allowed_rates.clone();
    let _metadata_listener = metadata
        .add_listener_local()
        .property(move |_subject, key, _type, value| {
            if let Some(key) = key
                && key == CLOCK_ALLOWED_RATES_KEY
                && let Some(value) = value
            {
                let rates = parse_allowed_rates(value);
                if !rates.is_empty() {
                    debug!("Found PipeWire allowed-rates: {:?}", rates);
                    *allowed_rates_listener.borrow_mut() = rates;
                }
            }
            // NOTE(review): the callback's integer return is always 0 —
            // presumably "success"; confirm against the pipewire-rs docs.
            0
        })
        .register();

    let prop_start = std::time::Instant::now();
    while allowed_rates.borrow().is_empty() && prop_start.elapsed() < DISCOVERY_TIMEOUT {
        mainloop.loop_().iterate(LOOP_ITERATION_STEP);
    }

    let rates = allowed_rates.borrow().clone();
    if rates.is_empty() { None } else { Some(rates) }
}
/// Parses a PipeWire `clock.allowed-rates` value such as `"[ 44100 48000 ]"`
/// or `"[44100, 48000]"` into a sorted, de-duplicated list of rates.
///
/// Surrounding whitespace and any leading `[` / trailing `]` are stripped.
/// The remainder is split on commas and on *any* whitespace (spaces, tabs,
/// newlines), so values separated by something other than a plain space are
/// still recognised. Tokens that are not valid `u32` numbers, or that fall
/// outside 8000..=384000 Hz, are ignored.
///
/// Returns an empty vector when nothing usable is found.
fn parse_allowed_rates(value: &str) -> Vec<u32> {
    let inner = value
        .trim()
        .trim_start_matches('[')
        .trim_end_matches(']');

    let mut rates: Vec<u32> = inner
        .split(|c: char| c == ',' || c.is_whitespace())
        // Empty tokens (from adjacent separators) fail to parse and drop out here.
        .filter_map(|token| token.parse::<u32>().ok())
        .filter(|rate| (8000..=384000).contains(rate))
        .collect();

    // Sorting first lets `dedup` remove every duplicate in one pass.
    rates.sort_unstable();
    rates.dedup();
    rates
}
/// Async wrapper around [`set_sample_rate`] that runs the blocking PipeWire
/// work via `tokio::task::spawn_blocking`.
///
/// # Errors
///
/// Returns the error from [`set_sample_rate`], or `"Task join error: ..."`
/// if the blocking task could not be joined.
pub async fn set_sample_rate_async(rate: u32) -> Result<(), String> {
    let handle = tokio::task::spawn_blocking(move || set_sample_rate(rate));
    match handle.await {
        Ok(outcome) => outcome,
        Err(join_err) => Err(format!("Task join error: {join_err}")),
    }
}
/// Async wrapper around [`reset_sample_rate`] that runs the blocking PipeWire
/// work via `tokio::task::spawn_blocking`.
///
/// # Errors
///
/// Returns the error from [`reset_sample_rate`], or `"Task join error: ..."`
/// if the blocking task could not be joined.
pub async fn reset_sample_rate_async() -> Result<(), String> {
    let handle = tokio::task::spawn_blocking(reset_sample_rate);
    match handle.await {
        Ok(outcome) => outcome,
        Err(join_err) => Err(format!("Task join error: {join_err}")),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Sets 48 kHz and then resets to automatic; both calls must succeed.
    // Ignored by default — presumably because it needs a running PipeWire
    // daemon; run with `cargo test -- --ignored`.
    #[test]
    #[ignore]
    fn test_set_and_reset_rate() {
        let set_outcome = set_sample_rate(48000);
        assert!(
            set_outcome.is_ok(),
            "Failed to set sample rate: {:?}",
            set_outcome
        );

        let reset_outcome = reset_sample_rate();
        assert!(
            reset_outcome.is_ok(),
            "Failed to reset sample rate: {:?}",
            reset_outcome
        );
    }

    /// Applies each common rate in turn, then resets (reset result ignored).
    // Ignored by default — presumably because it needs a running PipeWire
    // daemon; run with `cargo test -- --ignored`.
    #[test]
    #[ignore]
    fn test_common_sample_rates() {
        for rate in [44100, 48000] {
            let outcome = set_sample_rate(rate);
            assert!(
                outcome.is_ok(),
                "Failed to set sample rate {}: {:?}",
                rate,
                outcome
            );
        }

        let _ = reset_sample_rate();
    }
}