use crate::protocol::serde::stream::{BufferAttr, StreamFlags};
use crate::protocol::{serde::*, ProtocolError};
use crate::protocol::{ChannelMap, ChannelVolume, Props, SampleSpec};
use std::ffi::CString;
use super::CommandReply;
/// Parameters of a "create record stream" command, as (de)serialized by the
/// `TagStructRead` / `TagStructWrite` implementations for this type.
#[derive(Default, Debug, Clone, Eq, PartialEq)]
pub struct RecordStreamParams {
    /// Sample format, sample rate and channel count requested for the stream.
    pub sample_spec: SampleSpec,
    /// Channel map requested for the stream.
    pub channel_map: ChannelMap,
    /// Optional source index; serialized with the reader/writer's "index" encoding.
    /// NOTE(review): presumably selects the source to record from - confirm.
    pub source_index: Option<u32>,
    /// Optional source name, serialized as a (nullable) string.
    /// NOTE(review): presumably an alternative to `source_index` - confirm.
    pub source_name: Option<CString>,
    /// Buffer attributes. Only `max_length` and `fragment_size` are put on the
    /// wire for this command; the remaining fields are left at their defaults
    /// when reading.
    pub buffer_attr: BufferAttr,
    /// Stream flags. Several of them are only transmitted for newer protocol
    /// versions (see the version checks in the serialization code).
    pub flags: StreamFlags,
    /// Optional index, serialized right after the property list.
    /// NOTE(review): presumably the sink input to record from directly - confirm.
    #[allow(missing_docs)]
    pub direct_on_input_index: Option<u32>,
    /// Optional initial volume. Only transmitted for protocol version >= 22;
    /// when `None`, the writer sends a muted placeholder volume together with
    /// a "volume set" flag of `false`.
    pub cvolume: Option<ChannelVolume>,
    /// Property list attached to the stream.
    pub props: Props,
    /// Formats offered for the stream. Only transmitted for protocol
    /// version >= 22, preceded by a one-byte count.
    pub formats: Vec<FormatInfo>,
}
impl TagStructRead for RecordStreamParams {
fn read(ts: &mut TagStructReader<'_>, protocol_version: u16) -> Result<Self, ProtocolError> {
let sample_spec = ts.read()?;
let channel_map = ts.read()?;
let source_index = ts.read_index()?;
let source_name = ts.read_string()?;
let buffer_attr_max_length = ts.read_u32()?;
let mut flags = StreamFlags {
start_corked: ts.read_bool()?,
..Default::default()
};
let buffer_attr = BufferAttr {
max_length: buffer_attr_max_length,
fragment_size: ts.read_u32()?,
..Default::default()
};
flags.no_remap_channels = ts.read_bool()?;
flags.no_remix_channels = ts.read_bool()?;
flags.fix_format = ts.read_bool()?;
flags.fix_rate = ts.read_bool()?;
flags.fix_channels = ts.read_bool()?;
flags.no_move = ts.read_bool()?;
flags.variable_rate = ts.read_bool()?;
flags.peak_detect = ts.read_bool()?;
flags.adjust_latency = ts.read_bool()?;
let props = ts.read()?;
let direct_on_input_index = ts.read_index()?;
let mut params = Self {
sample_spec,
channel_map,
source_index,
source_name,
buffer_attr,
flags,
props,
direct_on_input_index,
..Default::default()
};
if protocol_version >= 14 {
flags.early_requests = ts.read_bool()?;
}
if protocol_version >= 15 {
flags.no_inhibit_auto_suspend = ts.read_bool()?;
flags.fail_on_suspend = ts.read_bool()?;
}
if protocol_version >= 22 {
for _ in 0..ts.read_u8()? {
params.formats.push(ts.read()?);
}
let volume = ts.read()?;
let start_muted = ts.read_bool()?;
if ts.read_bool()? {
params.cvolume = Some(volume);
}
if ts.read_bool()? {
flags.start_muted = Some(start_muted);
}
flags.relative_volume = ts.read_bool()?;
flags.passthrough = ts.read_bool()?;
}
Ok(params)
}
}
impl TagStructWrite for RecordStreamParams {
fn write(
&self,
ts: &mut TagStructWriter<'_>,
protocol_version: u16,
) -> Result<(), ProtocolError> {
ts.write(self.sample_spec)?;
ts.write(self.channel_map)?;
ts.write_index(self.source_index)?;
ts.write_string(self.source_name.as_ref())?;
ts.write_u32(self.buffer_attr.max_length)?;
ts.write_bool(self.flags.start_corked)?;
ts.write_u32(self.buffer_attr.fragment_size)?;
ts.write_bool(self.flags.no_remap_channels)?;
ts.write_bool(self.flags.no_remix_channels)?;
ts.write_bool(self.flags.fix_format)?;
ts.write_bool(self.flags.fix_rate)?;
ts.write_bool(self.flags.fix_channels)?;
ts.write_bool(self.flags.no_move)?;
ts.write_bool(self.flags.variable_rate)?;
ts.write_bool(self.flags.peak_detect)?;
ts.write_bool(self.flags.adjust_latency)?;
ts.write(&self.props)?;
ts.write_index(self.direct_on_input_index)?;
if protocol_version >= 14 {
ts.write_bool(self.flags.early_requests)?;
}
if protocol_version >= 15 {
ts.write_bool(self.flags.no_inhibit_auto_suspend)?;
ts.write_bool(self.flags.fail_on_suspend)?;
}
if protocol_version >= 22 {
ts.write_u8(self.formats.len() as u8)?;
for format in &self.formats {
ts.write(format)?;
}
ts.write(
self.cvolume
.unwrap_or_else(|| ChannelVolume::muted(self.sample_spec.channels)),
)?;
ts.write_bool(self.flags.start_muted.unwrap_or_default())?;
ts.write_bool(self.cvolume.is_some())?;
ts.write_bool(self.flags.start_muted.is_some())?;
ts.write_bool(self.flags.relative_volume)?;
ts.write_bool(self.flags.passthrough)?;
}
Ok(())
}
}
/// Reply to a "create record stream" command, as (de)serialized by the
/// `TagStructRead` / `TagStructWrite` implementations for this type.
#[derive(Default, Debug, Clone, Eq, PartialEq)]
pub struct CreateRecordStreamReply {
    /// Channel index; first value of the reply. Rejected as invalid on read
    /// when the wire value decodes to "no index".
    pub channel: u32,
    /// Stream index; second value of the reply. Rejected as invalid on read
    /// when the wire value decodes to "no index".
    pub stream_index: u32,
    /// Buffer attributes. Only `max_length` and `fragment_size` are carried
    /// by the reply; the other fields are left at their defaults on read.
    pub buffer_attr: BufferAttr,
    /// Sample spec reported in the reply.
    pub sample_spec: SampleSpec,
    /// Channel map reported in the reply.
    pub channel_map: ChannelMap,
    /// Latency value, read and written with the `usec` encoding.
    /// On the wire it comes after `suspended`, not in declaration order.
    pub stream_latency: u64,
    /// Device index reported in the reply.
    /// NOTE(review): for a record stream this presumably identifies the
    /// source despite the `sink_` name - confirm.
    pub sink_index: u32,
    /// Device name reported in the reply (nullable string).
    /// NOTE(review): presumably the source name, see `sink_index` - confirm.
    pub sink_name: Option<CString>,
    /// Boolean transmitted right after the device name.
    /// NOTE(review): presumably whether the device is suspended - confirm.
    pub suspended: bool,
    /// Format info. Only present on the wire for sufficiently new protocol
    /// versions; left at `FormatInfo::default()` when reading older ones.
    pub format: FormatInfo,
}
// Marker impl: registers this type as a command reply.
impl CommandReply for CreateRecordStreamReply {}
impl TagStructRead for CreateRecordStreamReply {
    /// Decodes a record-stream creation reply from a tagstruct.
    ///
    /// The struct literal below is evaluated top to bottom, so its field
    /// order *is* the wire order and must not be rearranged (note that
    /// `stream_latency` comes after `suspended` on the wire).
    ///
    /// The negotiated format is only part of a record-stream reply from
    /// protocol version 22 on (version 21 added it for playback streams
    /// only); for older versions `format` is left at its default.
    ///
    /// # Errors
    ///
    /// Returns `ProtocolError::Invalid` if the channel, stream or device
    /// index decodes to "no index", and propagates any reader error.
    fn read(ts: &mut TagStructReader<'_>, protocol_version: u16) -> Result<Self, ProtocolError> {
        Ok(Self {
            channel: ts
                .read_index()?
                .ok_or_else(|| ProtocolError::Invalid("invalid channel_index".into()))?,
            stream_index: ts
                .read_index()?
                .ok_or_else(|| ProtocolError::Invalid("invalid stream_index".into()))?,
            buffer_attr: BufferAttr {
                max_length: ts.read_u32()?,
                fragment_size: ts.read_u32()?,
                ..Default::default()
            },
            sample_spec: ts.read()?,
            channel_map: ts.read()?,
            sink_index: ts
                .read_index()?
                .ok_or_else(|| ProtocolError::Invalid("invalid sink_index".into()))?,
            sink_name: ts.read_string()?,
            suspended: ts.read_bool()?,
            stream_latency: ts.read_usec()?,
            format: if protocol_version >= 22 {
                ts.read()?
            } else {
                FormatInfo::default()
            },
        })
    }
}

impl TagStructWrite for CreateRecordStreamReply {
    /// Encodes a record-stream creation reply into a tagstruct.
    ///
    /// Values are written in the same order the `TagStructRead`
    /// implementation consumes them. The negotiated format is only appended
    /// for protocol version 22 and newer, matching the reader.
    ///
    /// # Errors
    ///
    /// Propagates any `ProtocolError` produced by the underlying writer.
    fn write(
        &self,
        w: &mut TagStructWriter<'_>,
        protocol_version: u16,
    ) -> Result<(), ProtocolError> {
        w.write_u32(self.channel)?;
        w.write_u32(self.stream_index)?;
        w.write_u32(self.buffer_attr.max_length)?;
        w.write_u32(self.buffer_attr.fragment_size)?;
        w.write(self.sample_spec)?;
        w.write(self.channel_map)?;
        w.write_u32(self.sink_index)?;
        w.write_string(self.sink_name.as_ref())?;
        w.write_bool(self.suspended)?;
        w.write_usec(self.stream_latency)?;

        if protocol_version >= 22 {
            w.write(&self.format)?;
        }

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use crate::protocol::test_util::test_serde;

    use super::*;

    /// Round-trips a minimal set of record-stream parameters through the
    /// serializer and deserializer.
    #[test]
    fn params_serde() -> anyhow::Result<()> {
        let params = RecordStreamParams {
            sample_spec: SampleSpec {
                format: SampleFormat::S16Le,
                sample_rate: 44100,
                channels: 2,
            },
            channel_map: ChannelMap::stereo(),
            ..Default::default()
        };

        test_serde(&params)
    }

    /// Round-trips a record-stream creation reply with distinct index values.
    #[test]
    fn reply_serde() -> anyhow::Result<()> {
        let reply = CreateRecordStreamReply {
            channel: 0,
            stream_index: 1,
            sink_index: 2,
            ..Default::default()
        };

        test_serde(&reply)
    }
}
#[cfg(test)]
#[cfg(feature = "_integration-tests")]
mod integration_tests {
    use super::*;
    use crate::integration_test_util::*;
    use crate::protocol::*;

    /// Requests a corked, muted stereo playback stream on the default sink
    /// and expects the server to answer with a decodable reply.
    #[test]
    fn create_playback_stream() -> anyhow::Result<()> {
        let (mut sock, protocol_version) = connect_and_init()?;

        let stream_flags = StreamFlags {
            start_corked: true,
            start_muted: Some(true),
            ..Default::default()
        };
        let playback_params = PlaybackStreamParams {
            sample_spec: SampleSpec {
                format: SampleFormat::S16Le,
                sample_rate: 44100,
                channels: 2,
            },
            channel_map: ChannelMap::stereo(),
            cvolume: Some(ChannelVolume::norm(2)),
            flags: stream_flags,
            sink_index: None,
            sink_name: Some(CString::new("@DEFAULT_SINK@")?),
            ..Default::default()
        };
        let command = Command::CreatePlaybackStream(playback_params);

        write_command_message(sock.get_mut(), 0, &command, protocol_version)?;
        let _ = read_reply_message::<CreatePlaybackStreamReply>(&mut sock, protocol_version)?;

        Ok(())
    }

    /// Creates two three-channel record streams on the same connection, one
    /// with an explicit volume and one without, and expects a decodable
    /// reply for each.
    #[test]
    fn create_record_stream_channel_count_invariants() -> anyhow::Result<()> {
        const CHANNEL_COUNT: u8 = 3;

        let (mut sock, protocol_version) = connect_and_init()?;

        // (sequence number, initial volume) for each request, in send order.
        let requests = [(0, Some(ChannelVolume::norm(CHANNEL_COUNT))), (1, None)];

        for (seq, cvolume) in requests {
            let record_params = RecordStreamParams {
                sample_spec: SampleSpec {
                    format: SampleFormat::S16Le,
                    channels: CHANNEL_COUNT,
                    ..Default::default()
                },
                channel_map: ChannelMap::new([ChannelPosition::Mono; CHANNEL_COUNT as usize]),
                cvolume,
                ..Default::default()
            };
            let command = Command::CreateRecordStream(record_params);

            write_command_message(sock.get_mut(), seq, &command, protocol_version)?;
            let _ = read_reply_message::<CreateRecordStreamReply>(&mut sock, protocol_version)?;
        }

        Ok(())
    }
}