use std::{
ffi::CString,
fs::File,
io::{BufReader, Read},
os::unix::net::UnixStream,
path::Path,
time,
};
use anyhow::{bail, Context};
use pulseaudio::protocol;
fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() != 2 {
println!("Usage: {} <file>", args[0]);
return Ok(());
}
let (mut sock, protocol_version) = connect_and_init().context("failed to initialize client")?;
let mut file = File::open(Path::new(&args[1]))?;
let mut wav_reader = hound::WavReader::new(&mut file)?;
let spec = wav_reader.spec();
let format = match (spec.bits_per_sample, spec.sample_format) {
(16, hound::SampleFormat::Int) => protocol::SampleFormat::S16Le,
_ => bail!(
"unsupported sample format: {}bit {:?}",
spec.bits_per_sample,
spec.sample_format,
),
};
let channel_map = match spec.channels {
1 => protocol::ChannelMap::mono(),
2 => protocol::ChannelMap::stereo(),
_ => bail!("unsupported channel count: {}", spec.channels),
};
let file_duration =
time::Duration::from_secs(wav_reader.duration() as u64 / spec.sample_rate as u64);
let file_bytes =
wav_reader.duration() as u64 * (spec.channels * spec.bits_per_sample / 8) as u64;
let pb = indicatif::ProgressBar::new(file_bytes)
.with_style(indicatif::ProgressStyle::with_template(&format!(
"[{{elapsed_precise}} / {}] {{bar}} {{msg}}",
indicatif::FormattedDuration(file_duration)
))?)
.with_finish(indicatif::ProgressFinish::AndLeave);
protocol::write_command_message(
sock.get_mut(),
99,
&protocol::Command::CreatePlaybackStream(protocol::PlaybackStreamParams {
sample_spec: protocol::SampleSpec {
format,
channels: spec.channels as u8,
sample_rate: spec.sample_rate,
},
channel_map,
cvolume: Some(protocol::ChannelVolume::norm(2)),
sink_name: Some(protocol::DEFAULT_SINK.to_owned()),
..Default::default()
}),
protocol_version,
)
.context("failed to send create_playback_stream")?;
let (seq, stream_info) = protocol::read_reply_message::<protocol::CreatePlaybackStreamReply>(
&mut sock,
protocol_version,
)
.context("create_playback_stream failed")?;
assert_eq!(seq, 99);
let mut buf = vec![0u8; stream_info.buffer_attr.minimum_request_length as usize];
let size = read_chunk(
&mut wav_reader,
&mut buf,
stream_info.requested_bytes as u64,
)?;
protocol::write_memblock(sock.get_mut(), stream_info.channel, &buf[..size], 0)
.context("write_memblock failed")?;
const TIMING_INFO: u32 = 200;
const DRAIN_COMPLETED: u32 = 201;
let mut draining = false;
loop {
let (seq, msg) = protocol::read_command_message(&mut sock, protocol_version)
.context("reading from socket")?;
match msg {
protocol::Command::Started(_) => pb.reset_elapsed(),
protocol::Command::Request(protocol::Request { channel, length }) => {
if channel != stream_info.channel {
bail!("unexpected channel: {}", channel);
}
if !draining {
let size = read_chunk(&mut wav_reader, &mut buf, length as u64)?;
if size > 0 {
protocol::write_memblock(
sock.get_mut(),
stream_info.channel,
&buf[..size],
0,
)
.context("write_memblock failed")?;
} else {
protocol::write_command_message(
sock.get_mut(),
DRAIN_COMPLETED,
&protocol::Command::DrainPlaybackStream(stream_info.channel),
protocol_version,
)?;
draining = true;
}
}
protocol::write_command_message(
sock.get_mut(),
TIMING_INFO,
&protocol::Command::GetPlaybackLatency(protocol::LatencyParams {
channel,
now: time::SystemTime::now(),
}),
protocol_version,
)?;
}
protocol::Command::Reply if seq == TIMING_INFO => {
let mut ts = protocol::TagStructReader::new(&mut sock, protocol_version);
let timing_info = ts.read::<protocol::PlaybackLatency>()?;
let latency =
time::Duration::from_micros(timing_info.sink_usec + timing_info.source_usec);
pb.set_message(format!("{}ms latency", latency.as_millis()));
pb.set_position(timing_info.read_offset as u64)
}
protocol::Command::Reply if seq == DRAIN_COMPLETED => break,
protocol::Command::Underflow(_) => bail!("buffer underrun!"),
protocol::Command::Overflow(_) => bail!("buffer overrun!"),
protocol::Command::Error(e) => bail!("server error: {:?}", e),
_ => eprintln!("ignoring message: {:#?}", msg),
}
}
Ok(())
}
fn read_chunk<T: Read>(
wav_reader: &mut hound::WavReader<T>,
buf: &mut Vec<u8>,
target_length: u64,
) -> anyhow::Result<usize> {
use byteorder::WriteBytesExt;
if target_length > buf.len() as u64 {
buf.resize(target_length as usize, 0);
}
let mut cursor = std::io::Cursor::new(buf);
for sample in wav_reader.samples::<i16>() {
cursor.write_i16::<byteorder::LittleEndian>(sample?)?;
if cursor.position() >= target_length {
break;
}
}
Ok(cursor.position() as usize)
}
fn connect_and_init() -> anyhow::Result<(BufReader<UnixStream>, u16)> {
let socket_path = pulseaudio::socket_path_from_env().context("PulseAudio not available")?;
let mut sock = std::io::BufReader::new(UnixStream::connect(socket_path)?);
let cookie = pulseaudio::cookie_path_from_env()
.and_then(|path| std::fs::read(path).ok())
.unwrap_or_default();
let auth = protocol::AuthParams {
version: protocol::MAX_VERSION,
supports_shm: false,
supports_memfd: false,
cookie,
};
protocol::write_command_message(
sock.get_mut(),
0,
&protocol::Command::Auth(auth),
protocol::MAX_VERSION,
)?;
let (_, auth_reply) =
protocol::read_reply_message::<protocol::AuthReply>(&mut sock, protocol::MAX_VERSION)?;
let protocol_version = std::cmp::min(protocol::MAX_VERSION, auth_reply.version);
let mut props = protocol::Props::new();
props.set(
protocol::Prop::ApplicationName,
CString::new("pulseaudio-rs-playback").unwrap(),
);
protocol::write_command_message(
sock.get_mut(),
1,
&protocol::Command::SetClientName(props),
protocol_version,
)?;
let _ =
protocol::read_reply_message::<protocol::SetClientNameReply>(&mut sock, protocol_version)?;
Ok((sock, protocol_version))
}