use crate::buffer::Buffer;
use crate::{error::Error, properties::Properties};
use bitflags::bitflags;
use spa::utils::result::SpaResult;
use std::{
ffi::{self, CStr, CString},
fmt::Debug,
mem, os,
pin::Pin,
ptr,
};
mod box_;
pub use box_::*;
mod rc;
pub use rc::*;
#[derive(Debug, PartialEq)]
pub enum StreamState {
Error(String),
Unconnected,
Connecting,
Paused,
Streaming,
}
impl StreamState {
pub(crate) fn from_raw(state: pw_sys::pw_stream_state, error: *const os::raw::c_char) -> Self {
match state {
pw_sys::pw_stream_state_PW_STREAM_STATE_UNCONNECTED => StreamState::Unconnected,
pw_sys::pw_stream_state_PW_STREAM_STATE_CONNECTING => StreamState::Connecting,
pw_sys::pw_stream_state_PW_STREAM_STATE_PAUSED => StreamState::Paused,
pw_sys::pw_stream_state_PW_STREAM_STATE_STREAMING => StreamState::Streaming,
_ => {
let error = if error.is_null() {
"".to_string()
} else {
unsafe { ffi::CStr::from_ptr(error).to_string_lossy().to_string() }
};
StreamState::Error(error)
}
}
}
}
#[repr(transparent)]
pub struct Time(pw_sys::pw_time);
impl Clone for Time {
fn clone(&self) -> Self {
Self(pw_sys::pw_time { ..self.0 })
}
}
impl Time {
pub fn as_raw(&self) -> &pw_sys::pw_time {
&self.0
}
pub fn now(&self) -> i64 {
self.0.now
}
pub fn rate(&self) -> spa::utils::Fraction {
self.0.rate
}
pub fn ticks(&self) -> u64 {
self.0.ticks
}
pub fn delay(&self) -> i64 {
self.0.delay
}
pub fn queued(&self) -> u64 {
self.0.queued
}
#[cfg(feature = "v0_3_50")]
pub fn buffered(&self) -> u64 {
self.0.buffered
}
#[cfg(feature = "v0_3_50")]
pub fn queued_buffers(&self) -> u32 {
self.0.queued_buffers
}
#[cfg(feature = "v0_3_50")]
pub fn avail_buffers(&self) -> u32 {
self.0.avail_buffers
}
}
impl Debug for Time {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = f.debug_struct("Time");
s.field("now", &self.now())
.field("rate", &self.rate())
.field("ticks", &self.ticks())
.field("delay", &self.delay())
.field("queued", &self.queued());
#[cfg(feature = "v0_3_50")]
s.field("buffered", &self.buffered())
.field("queued_buffers", &self.queued_buffers())
.field("avail_buffers", &self.avail_buffers());
s.finish()
}
}
#[repr(transparent)]
pub struct Stream(pw_sys::pw_stream);
impl Stream {
pub fn as_raw(&self) -> &pw_sys::pw_stream {
&self.0
}
pub fn as_raw_ptr(&self) -> *mut pw_sys::pw_stream {
ptr::addr_of!(self.0).cast_mut()
}
#[must_use = "Use the builder to register event callbacks"]
pub fn add_local_listener_with_user_data<D>(
&self,
user_data: D,
) -> ListenerLocalBuilder<'_, D> {
let mut callbacks = ListenerLocalCallbacks::with_user_data(user_data);
callbacks.stream =
Some(ptr::NonNull::new(self.as_raw_ptr()).expect("Pointer should be nonnull"));
ListenerLocalBuilder {
stream: self,
callbacks,
}
}
#[must_use = "Use the builder to register event callbacks"]
pub fn add_local_listener<D: Default>(&self) -> ListenerLocalBuilder<'_, D> {
self.add_local_listener_with_user_data(Default::default())
}
pub fn connect(
&self,
direction: spa::utils::Direction,
id: Option<u32>,
flags: StreamFlags,
params: &mut [&spa::pod::Pod],
) -> Result<(), Error> {
let r = unsafe {
pw_sys::pw_stream_connect(
self.as_raw_ptr(),
direction.as_raw(),
id.unwrap_or(crate::constants::ID_ANY),
flags.bits(),
params.as_mut_ptr().cast(),
params.len() as u32,
)
};
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub fn update_params(&self, params: &mut [&spa::pod::Pod]) -> Result<(), Error> {
let r = unsafe {
pw_sys::pw_stream_update_params(
self.as_raw_ptr(),
params.as_mut_ptr().cast(),
params.len() as u32,
)
};
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub fn set_active(&self, active: bool) -> Result<(), Error> {
let r = unsafe { pw_sys::pw_stream_set_active(self.as_raw_ptr(), active) };
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub unsafe fn dequeue_raw_buffer(&self) -> *mut pw_sys::pw_buffer {
pw_sys::pw_stream_dequeue_buffer(self.as_raw_ptr())
}
pub fn dequeue_buffer(&self) -> Option<Buffer<'_>> {
unsafe { Buffer::from_raw(self.dequeue_raw_buffer(), self) }
}
pub unsafe fn queue_raw_buffer(&self, buffer: *mut pw_sys::pw_buffer) {
pw_sys::pw_stream_queue_buffer(self.as_raw_ptr(), buffer);
}
pub fn disconnect(&self) -> Result<(), Error> {
let r = unsafe { pw_sys::pw_stream_disconnect(self.as_raw_ptr()) };
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub fn set_error(&mut self, res: i32, error: &str) {
let error = CString::new(error).expect("failed to convert error to CString");
let error_cstr = error.as_c_str();
Stream::set_error_cstr(self, res, error_cstr)
}
pub fn set_error_cstr(&mut self, res: i32, error: &CStr) {
unsafe {
pw_sys::pw_stream_set_error(self.as_raw_ptr(), res, error.as_ptr());
}
}
pub fn flush(&self, drain: bool) -> Result<(), Error> {
let r = unsafe { pw_sys::pw_stream_flush(self.as_raw_ptr(), drain) };
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub fn set_control(&self, id: u32, values: &[f32]) -> Result<(), Error> {
let r = unsafe {
pw_sys::pw_stream_set_control(
self.as_raw_ptr(),
id,
values.len() as u32,
values.as_ptr() as *mut f32,
)
};
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub fn name(&self) -> String {
let name = unsafe {
let name = pw_sys::pw_stream_get_name(self.as_raw_ptr());
CStr::from_ptr(name)
};
name.to_string_lossy().to_string()
}
pub fn state(&self) -> StreamState {
let mut error: *const std::os::raw::c_char = ptr::null();
let state = unsafe {
pw_sys::pw_stream_get_state(self.as_raw_ptr(), (&mut error) as *mut *const _)
};
StreamState::from_raw(state, error)
}
pub fn properties(&self) -> &Properties {
unsafe {
let props = pw_sys::pw_stream_get_properties(self.as_raw_ptr());
let props = ptr::NonNull::new(props.cast_mut()).expect("stream properties is NULL");
props.cast().as_ref()
}
}
pub fn node_id(&self) -> u32 {
unsafe { pw_sys::pw_stream_get_node_id(self.as_raw_ptr()) }
}
#[cfg(feature = "v0_3_34")]
pub fn is_driving(&self) -> bool {
unsafe { pw_sys::pw_stream_is_driving(self.as_raw_ptr()) }
}
#[cfg(feature = "v0_3_34")]
pub fn trigger_process(&self) -> Result<(), Error> {
let r = unsafe { pw_sys::pw_stream_trigger_process(self.as_raw_ptr()) };
SpaResult::from_c(r).into_result()?;
Ok(())
}
pub fn time(&self) -> Result<Time, Error> {
unsafe {
let mut time = mem::MaybeUninit::<pw_sys::pw_time>::zeroed();
#[cfg(feature = "v0_3_50")]
let r = pw_sys::pw_stream_get_time_n(
self.as_raw_ptr(),
time.as_mut_ptr(),
mem::size_of::<pw_sys::pw_time>(),
);
#[cfg(not(feature = "v0_3_50"))]
let r = pw_sys::pw_stream_get_time(self.as_raw_ptr(), time.as_mut_ptr());
SpaResult::from_c(r).into_result()?;
Ok(Time(time.assume_init()))
}
}
}
type ParamChangedCB<D> = dyn FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>);
type ProcessCB<D> = dyn FnMut(&Stream, &mut D);
#[allow(clippy::type_complexity)]
pub struct ListenerLocalCallbacks<D> {
pub state_changed: Option<Box<dyn FnMut(&Stream, &mut D, StreamState, StreamState)>>,
pub control_info:
Option<Box<dyn FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control)>>,
pub io_changed: Option<Box<dyn FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32)>>,
pub param_changed: Option<Box<ParamChangedCB<D>>>,
pub add_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
pub remove_buffer: Option<Box<dyn FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer)>>,
pub process: Option<Box<ProcessCB<D>>>,
pub drained: Option<Box<dyn FnMut(&Stream, &mut D)>>,
#[cfg(feature = "v0_3_39")]
pub command: Option<Box<dyn FnMut(&Stream, &mut D, *const spa_sys::spa_command)>>,
#[cfg(feature = "v0_3_40")]
pub trigger_done: Option<Box<dyn FnMut(&Stream, &mut D)>>,
pub user_data: D,
stream: Option<ptr::NonNull<pw_sys::pw_stream>>,
}
unsafe fn unwrap_stream_ptr<'a>(stream: Option<ptr::NonNull<pw_sys::pw_stream>>) -> &'a Stream {
stream
.map(|ptr| ptr.cast::<Stream>().as_ref())
.expect("stream cannot be null")
}
impl<D> ListenerLocalCallbacks<D> {
fn with_user_data(user_data: D) -> Self {
ListenerLocalCallbacks {
process: Default::default(),
stream: Default::default(),
drained: Default::default(),
add_buffer: Default::default(),
control_info: Default::default(),
io_changed: Default::default(),
param_changed: Default::default(),
remove_buffer: Default::default(),
state_changed: Default::default(),
#[cfg(feature = "v0_3_39")]
command: Default::default(),
#[cfg(feature = "v0_3_40")]
trigger_done: Default::default(),
user_data,
}
}
pub(crate) fn into_raw(
self,
) -> (
Pin<Box<pw_sys::pw_stream_events>>,
Box<ListenerLocalCallbacks<D>>,
) {
let callbacks = Box::new(self);
unsafe extern "C" fn on_state_changed<D>(
data: *mut os::raw::c_void,
old: pw_sys::pw_stream_state,
new: pw_sys::pw_stream_state,
error: *const os::raw::c_char,
) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.state_changed {
let stream = unwrap_stream_ptr(state.stream);
let old = StreamState::from_raw(old, error);
let new = StreamState::from_raw(new, error);
cb(stream, &mut state.user_data, old, new)
};
}
}
unsafe extern "C" fn on_control_info<D>(
data: *mut os::raw::c_void,
id: u32,
control: *const pw_sys::pw_stream_control,
) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.control_info {
let stream = unwrap_stream_ptr(state.stream);
cb(stream, &mut state.user_data, id, control);
}
}
}
unsafe extern "C" fn on_io_changed<D>(
data: *mut os::raw::c_void,
id: u32,
area: *mut os::raw::c_void,
size: u32,
) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.io_changed {
let stream = unwrap_stream_ptr(state.stream);
cb(stream, &mut state.user_data, id, area, size);
}
}
}
unsafe extern "C" fn on_param_changed<D>(
data: *mut os::raw::c_void,
id: u32,
param: *const spa_sys::spa_pod,
) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.param_changed {
let stream = unwrap_stream_ptr(state.stream);
let param = if !param.is_null() {
Some(spa::pod::Pod::from_raw(param))
} else {
None
};
cb(stream, &mut state.user_data, id, param);
}
}
}
unsafe extern "C" fn on_add_buffer<D>(
data: *mut ::std::os::raw::c_void,
buffer: *mut pw_sys::pw_buffer,
) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.add_buffer {
let stream = unwrap_stream_ptr(state.stream);
cb(stream, &mut state.user_data, buffer);
}
}
}
unsafe extern "C" fn on_remove_buffer<D>(
data: *mut ::std::os::raw::c_void,
buffer: *mut pw_sys::pw_buffer,
) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.remove_buffer {
let stream = unwrap_stream_ptr(state.stream);
cb(stream, &mut state.user_data, buffer);
}
}
}
unsafe extern "C" fn on_process<D>(data: *mut ::std::os::raw::c_void) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.process {
let stream = unwrap_stream_ptr(state.stream);
cb(stream, &mut state.user_data);
}
}
}
unsafe extern "C" fn on_drained<D>(data: *mut ::std::os::raw::c_void) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.drained {
let stream = unwrap_stream_ptr(state.stream);
cb(stream, &mut state.user_data);
}
}
}
#[cfg(feature = "v0_3_39")]
unsafe extern "C" fn on_command<D>(
data: *mut ::std::os::raw::c_void,
command: *const spa_sys::spa_command,
) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.command {
let stream = unwrap_stream_ptr(state.stream);
cb(stream, &mut state.user_data, command);
}
}
}
#[cfg(feature = "v0_3_40")]
unsafe extern "C" fn on_trigger_done<D>(data: *mut ::std::os::raw::c_void) {
if let Some(state) = (data as *mut ListenerLocalCallbacks<D>).as_mut() {
if let Some(cb) = &mut state.trigger_done {
let stream = unwrap_stream_ptr(state.stream);
cb(stream, &mut state.user_data);
}
}
}
let events = unsafe {
let mut events: Pin<Box<pw_sys::pw_stream_events>> = Box::pin(mem::zeroed());
events.version = pw_sys::PW_VERSION_STREAM_EVENTS;
if callbacks.state_changed.is_some() {
events.state_changed = Some(on_state_changed::<D>);
}
if callbacks.control_info.is_some() {
events.control_info = Some(on_control_info::<D>);
}
if callbacks.io_changed.is_some() {
events.io_changed = Some(on_io_changed::<D>);
}
if callbacks.param_changed.is_some() {
events.param_changed = Some(on_param_changed::<D>);
}
if callbacks.add_buffer.is_some() {
events.add_buffer = Some(on_add_buffer::<D>);
}
if callbacks.remove_buffer.is_some() {
events.remove_buffer = Some(on_remove_buffer::<D>);
}
if callbacks.process.is_some() {
events.process = Some(on_process::<D>);
}
if callbacks.drained.is_some() {
events.drained = Some(on_drained::<D>);
}
#[cfg(feature = "v0_3_39")]
if callbacks.command.is_some() {
events.command = Some(on_command::<D>);
}
#[cfg(feature = "v0_3_40")]
if callbacks.trigger_done.is_some() {
events.trigger_done = Some(on_trigger_done::<D>);
}
events
};
(events, callbacks)
}
}
pub struct ListenerLocalBuilder<'a, D> {
stream: &'a Stream,
callbacks: ListenerLocalCallbacks<D>,
}
impl<'a, D> ListenerLocalBuilder<'a, D> {
#[must_use = "Call `.register()` to start receiving events"]
pub fn state_changed<F>(mut self, callback: F) -> Self
where
F: FnMut(&Stream, &mut D, StreamState, StreamState) + 'static,
{
self.callbacks.state_changed = Some(Box::new(callback));
self
}
#[must_use = "Call `.register()` to start receiving events"]
pub fn control_info<F>(mut self, callback: F) -> Self
where
F: FnMut(&Stream, &mut D, u32, *const pw_sys::pw_stream_control) + 'static,
{
self.callbacks.control_info = Some(Box::new(callback));
self
}
#[must_use = "Call `.register()` to start receiving events"]
pub fn io_changed<F>(mut self, callback: F) -> Self
where
F: FnMut(&Stream, &mut D, u32, *mut os::raw::c_void, u32) + 'static,
{
self.callbacks.io_changed = Some(Box::new(callback));
self
}
#[must_use = "Call `.register()` to start receiving events"]
pub fn param_changed<F>(mut self, callback: F) -> Self
where
F: FnMut(&Stream, &mut D, u32, Option<&spa::pod::Pod>) + 'static,
{
self.callbacks.param_changed = Some(Box::new(callback));
self
}
#[must_use = "Call `.register()` to start receiving events"]
pub fn add_buffer<F>(mut self, callback: F) -> Self
where
F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
{
self.callbacks.add_buffer = Some(Box::new(callback));
self
}
#[must_use = "Call `.register()` to start receiving events"]
pub fn remove_buffer<F>(mut self, callback: F) -> Self
where
F: FnMut(&Stream, &mut D, *mut pw_sys::pw_buffer) + 'static,
{
self.callbacks.remove_buffer = Some(Box::new(callback));
self
}
#[must_use = "Call `.register()` to start receiving events"]
pub fn process<F>(mut self, callback: F) -> Self
where
F: FnMut(&Stream, &mut D) + 'static,
{
self.callbacks.process = Some(Box::new(callback));
self
}
#[must_use = "Call `.register()` to start receiving events"]
pub fn drained<F>(mut self, callback: F) -> Self
where
F: FnMut(&Stream, &mut D) + 'static,
{
self.callbacks.drained = Some(Box::new(callback));
self
}
pub fn register(self) -> Result<StreamListener<D>, Error> {
let (events, data) = self.callbacks.into_raw();
let (listener, data) = unsafe {
let listener: Box<spa_sys::spa_hook> = Box::new(mem::zeroed());
let raw_listener = Box::into_raw(listener);
let raw_data = Box::into_raw(data);
pw_sys::pw_stream_add_listener(
self.stream.as_raw_ptr(),
raw_listener,
events.as_ref().get_ref(),
raw_data as *mut _,
);
(Box::from_raw(raw_listener), Box::from_raw(raw_data))
};
Ok(StreamListener {
listener,
_events: events,
_data: data,
})
}
}
#[must_use = "Listeners unregister themselves when dropped. Keep the listener alive in order to receive events."]
pub struct StreamListener<D> {
listener: Box<spa_sys::spa_hook>,
_events: Pin<Box<pw_sys::pw_stream_events>>,
_data: Box<ListenerLocalCallbacks<D>>,
}
impl<D> StreamListener<D> {
pub fn unregister(self) {
}
}
impl<D> std::ops::Drop for StreamListener<D> {
fn drop(&mut self) {
spa::utils::hook::remove(*self.listener);
}
}
bitflags! {
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct StreamFlags: pw_sys::pw_stream_flags {
const AUTOCONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_AUTOCONNECT;
const INACTIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_INACTIVE;
const MAP_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_MAP_BUFFERS;
const DRIVER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DRIVER;
const RT_PROCESS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_RT_PROCESS;
const NO_CONVERT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_NO_CONVERT;
const EXCLUSIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_EXCLUSIVE;
const DONT_RECONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DONT_RECONNECT;
const ALLOC_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_ALLOC_BUFFERS;
#[cfg(feature = "v0_3_41")]
const TRIGGER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_TRIGGER;
}
}