use crate::buffer::Buffer;
use crate::{error::Error, Core, Loop, MainLoop, Properties, PropertiesRef};
use bitflags::bitflags;
use spa::result::SpaResult;
use std::fmt::Debug;
use std::{
ffi::{self, CStr, CString},
mem, os,
pin::Pin,
ptr,
};
/// State of a [`Stream`], as decoded from a raw `pw_stream_state` value by
/// `StreamState::from_raw`.
#[derive(Debug)]
pub enum StreamState {
/// The raw state was none of the four states below. Holds the error message
/// that accompanied it, or an empty string when the error pointer was null.
Error(String),
/// Decoded from `PW_STREAM_STATE_UNCONNECTED`.
Unconnected,
/// Decoded from `PW_STREAM_STATE_CONNECTING`.
Connecting,
/// Decoded from `PW_STREAM_STATE_PAUSED`.
Paused,
/// Decoded from `PW_STREAM_STATE_STREAMING`.
Streaming,
}
impl StreamState {
    /// Decodes a raw `pw_stream_state` and its accompanying error string.
    ///
    /// The four known non-error states map to their unit variants and never
    /// read `error`. Every other value becomes [`StreamState::Error`], whose
    /// message is the lossily decoded `error` string, or empty when `error`
    /// is null.
    pub(crate) fn from_raw(state: pw_sys::pw_stream_state, error: *const os::raw::c_char) -> Self {
        let message = match state {
            pw_sys::pw_stream_state_PW_STREAM_STATE_UNCONNECTED => return Self::Unconnected,
            pw_sys::pw_stream_state_PW_STREAM_STATE_CONNECTING => return Self::Connecting,
            pw_sys::pw_stream_state_PW_STREAM_STATE_PAUSED => return Self::Paused,
            pw_sys::pw_stream_state_PW_STREAM_STATE_STREAMING => return Self::Streaming,
            _ if error.is_null() => String::new(),
            // SAFETY: `error` is non-null on this arm; it is assumed to point at
            // a valid NUL-terminated C string supplied by the caller.
            _ => unsafe { CStr::from_ptr(error) }
                .to_string_lossy()
                .into_owned(),
        };
        Self::Error(message)
    }
}
/// Wrapper around a non-null raw `pw_stream` pointer.
///
/// `D` is the type of the user data that is handed to the `param_changed`
/// and `process` callbacks.
pub struct Stream<D> {
// Raw stream handle; never null.
ptr: ptr::NonNull<pw_sys::pw_stream>,
// Values that are stored alongside the raw pointer; see `KeepAlive`.
_alive: KeepAlive<D>,
}
// Values a `Stream` holds on to for as long as it exists. Which variant is
// used depends on how the `Stream` value was constructed.
enum KeepAlive<D> {
// Built by `Stream::new`: stores a clone of the `Core` the stream was
// created on.
Normal {
_core: Core,
},
// Built by `SimpleLocalBuilder::create`: stores the event table and the
// callback data whose addresses were passed to `pw_stream_new_simple`.
Simple {
_events: Pin<Box<pw_sys::pw_stream_events>>,
_data: Box<ListenerLocalCallbacks<D>>,
},
// Built inside the `process` trampoline for the short-lived `Stream` that is
// lent to the user callback; stores nothing.
Temp,
}
impl<D> Stream<D> {
pub fn new(core: &Core, name: &str, properties: Properties) -> Result<Self, Error> {
let name = CString::new(name).expect("Invalid byte in stream name");
let stream =
unsafe { pw_sys::pw_stream_new(core.as_ptr(), name.as_ptr(), properties.into_raw()) };
let stream = ptr::NonNull::new(stream).ok_or(Error::CreationFailed)?;
Ok(Stream {
ptr: stream,
_alive: KeepAlive::Normal {
_core: core.clone(),
},
})
}
pub fn with_user_data<'a>(
main_loop: &'a MainLoop,
name: &str,
properties: Properties,
user_data: D,
) -> SimpleLocalBuilder<'a, D> {
let name = CString::new(name).expect("Invalid byte in stream name");
SimpleLocalBuilder::<D> {
main_loop,
name,
properties,
callbacks: ListenerLocalCallbacks::with_user_data(user_data),
}
}
#[must_use = "Fluent builder API"]
pub fn add_local_listener_with_user_data(
&mut self,
user_data: D,
) -> ListenerLocalBuilder<'_, D> {
ListenerLocalBuilder {
stream: self,
callbacks: ListenerLocalCallbacks::with_user_data(user_data),
}
}
pub fn connect(
&self,
direction: spa::Direction,
id: Option<u32>,
flags: StreamFlags,
params: &mut [*const spa_sys::spa_pod],
) -> Result<(), Error> {
let r = unsafe {
pw_sys::pw_stream_connect(
self.as_ptr(),
direction.as_raw(),
id.unwrap_or(crate::constants::ID_ANY),
flags.bits(),
params.as_mut_ptr(),
params.len() as u32,
)
};
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub fn update_params(&self, params: &mut [*const spa_sys::spa_pod]) -> Result<(), Error> {
let r = unsafe {
pw_sys::pw_stream_update_params(self.as_ptr(), params.as_mut_ptr(), params.len() as u32)
};
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub fn set_active(&self, active: bool) -> Result<(), Error> {
let r = unsafe { pw_sys::pw_stream_set_active(self.as_ptr(), active) };
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub unsafe fn dequeue_raw_buffer(&self) -> *mut pw_sys::pw_buffer {
pw_sys::pw_stream_dequeue_buffer(self.as_ptr())
}
pub fn dequeue_buffer(&self) -> Option<Buffer<D>> {
unsafe { Buffer::from_raw(self.dequeue_raw_buffer(), self) }
}
pub unsafe fn queue_raw_buffer(&self, buffer: *mut pw_sys::pw_buffer) {
pw_sys::pw_stream_queue_buffer(self.as_ptr(), buffer);
}
fn as_ptr(&self) -> *mut pw_sys::pw_stream {
self.ptr.as_ptr()
}
pub fn disconnect(&self) -> Result<(), Error> {
let r = unsafe { pw_sys::pw_stream_disconnect(self.as_ptr()) };
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub fn set_error(&mut self, res: i32, error: &str) {
let error = CString::new(error).expect("failed to convert error to CString");
unsafe {
pw_sys::pw_stream_set_error(self.as_ptr(), res, error.as_c_str().as_ptr());
}
}
pub fn flush(&self, drain: bool) -> Result<(), Error> {
let r = unsafe { pw_sys::pw_stream_flush(self.as_ptr(), drain) };
SpaResult::from_c(r).into_sync_result()?;
Ok(())
}
pub fn name(&self) -> String {
let name = unsafe {
let name = pw_sys::pw_stream_get_name(self.as_ptr());
CStr::from_ptr(name)
};
name.to_string_lossy().to_string()
}
pub fn state(&self) -> StreamState {
let mut error: *const std::os::raw::c_char = ptr::null();
let state =
unsafe { pw_sys::pw_stream_get_state(self.as_ptr(), (&mut error) as *mut *const _) };
StreamState::from_raw(state, error)
}
pub fn properties(&self) -> PropertiesRef<'_> {
unsafe {
let props = pw_sys::pw_stream_get_properties(self.as_ptr());
let props = ptr::NonNull::new(props as *mut _).expect("stream properties is NULL");
PropertiesRef::from_ptr(props)
}
}
pub fn node_id(&self) -> u32 {
unsafe { pw_sys::pw_stream_get_node_id(self.as_ptr()) }
}
}
impl<D: Default> Stream<D> {
#[must_use]
pub fn simple<'a>(
main_loop: &'a MainLoop,
name: &str,
properties: Properties,
) -> SimpleLocalBuilder<'a, D> {
Self::with_user_data(main_loop, name, properties, Default::default())
}
#[must_use = "Fluent builder API"]
pub fn add_local_listener(&mut self) -> ListenerLocalBuilder<'_, D> {
ListenerLocalBuilder {
stream: self,
callbacks: ListenerLocalCallbacks::with_user_data(Default::default()),
}
}
}
impl<D> std::fmt::Debug for Stream<D> {
    /// Formats the stream as a struct with its name, state, node id and
    /// properties, each queried from the stream at formatting time.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Stream");
        out.field("name", &self.name());
        out.field("state", &self.state());
        out.field("node-id", &self.node_id());
        out.field("properties", &self.properties());
        out.finish()
    }
}
// Signature of the `param_changed` callback: (id, user data, raw pod pointer).
type ParamChangedCB<D> = dyn Fn(u32, &mut D, *const spa_sys::spa_pod);
// Signature of the `process` callback: (stream, user data).
type ProcessCB<D> = dyn Fn(&Stream<D>, &mut D);
/// Callback table plus user data for a stream listener.
///
/// `into_raw` boxes this value and builds a `pw_stream_events` table that only
/// installs a C trampoline for the callbacks that are `Some`.
pub struct ListenerLocalCallbacks<D> {
/// Called with the old and the new state.
pub state_changed: Option<Box<dyn Fn(StreamState, StreamState)>>,
/// Called with an id and a raw `pw_stream_control` pointer.
pub control_info: Option<Box<dyn Fn(u32, *const pw_sys::pw_stream_control)>>,
/// Called with an id, a raw area pointer and a size.
#[allow(clippy::type_complexity)]
pub io_changed: Option<Box<dyn Fn(u32, *mut os::raw::c_void, u32)>>,
/// Called with an id, the user data and a raw pod pointer.
pub param_changed: Option<Box<ParamChangedCB<D>>>,
/// Called with the raw buffer pointer of the `add_buffer` event.
pub add_buffer: Option<Box<dyn Fn(*mut pw_sys::pw_buffer)>>,
/// Called with the raw buffer pointer of the `remove_buffer` event.
pub remove_buffer: Option<Box<dyn Fn(*mut pw_sys::pw_buffer)>>,
/// Called with a temporary `Stream` built from `stream`, and the user data.
pub process: Option<Box<ProcessCB<D>>>,
/// Called without arguments for the `drained` event.
pub drained: Option<Box<dyn Fn()>>,
/// User data passed mutably to `param_changed` and `process`.
pub user_data: D,
// Raw stream the `process` trampoline wraps; the trampoline panics when this
// is `None`.
stream: Option<ptr::NonNull<pw_sys::pw_stream>>,
}
impl<D> ListenerLocalCallbacks<D> {
    /// Builds a table with no callbacks and no stream, carrying `user_data`.
    fn with_user_data(user_data: D) -> Self {
        ListenerLocalCallbacks {
            state_changed: None,
            control_info: None,
            io_changed: None,
            param_changed: None,
            add_buffer: None,
            remove_buffer: None,
            process: None,
            drained: None,
            user_data,
            stream: None,
        }
    }

    /// Boxes `self` and builds the matching pinned `pw_stream_events` table.
    ///
    /// A trampoline is installed only for callbacks that are set; all other
    /// entries of the table stay zeroed. Each trampoline casts its `data`
    /// argument back to `ListenerLocalCallbacks<D>`, so the returned box is
    /// what must be registered as the listener's data pointer.
    pub(crate) fn into_raw(
        self,
    ) -> (
        Pin<Box<pw_sys::pw_stream_events>>,
        Box<ListenerLocalCallbacks<D>>,
    ) {
        // Forwards a state change after decoding both raw states.
        unsafe extern "C" fn on_state_changed<D>(
            data: *mut os::raw::c_void,
            old: pw_sys::pw_stream_state,
            new: pw_sys::pw_stream_state,
            error: *const os::raw::c_char,
        ) {
            let cb = (data as *const ListenerLocalCallbacks<D>)
                .as_ref()
                .and_then(|callbacks| callbacks.state_changed.as_ref());
            if let Some(cb) = cb {
                let old = StreamState::from_raw(old, error);
                let new = StreamState::from_raw(new, error);
                cb(old, new)
            }
        }

        // Forwards the control id and raw control pointer unchanged.
        unsafe extern "C" fn on_control_info<D>(
            data: *mut os::raw::c_void,
            id: u32,
            control: *const pw_sys::pw_stream_control,
        ) {
            let cb = (data as *const ListenerLocalCallbacks<D>)
                .as_ref()
                .and_then(|callbacks| callbacks.control_info.as_ref());
            if let Some(cb) = cb {
                cb(id, control);
            }
        }

        // Forwards the io id, area pointer and size unchanged.
        unsafe extern "C" fn on_io_changed<D>(
            data: *mut os::raw::c_void,
            id: u32,
            area: *mut os::raw::c_void,
            size: u32,
        ) {
            let cb = (data as *const ListenerLocalCallbacks<D>)
                .as_ref()
                .and_then(|callbacks| callbacks.io_changed.as_ref());
            if let Some(cb) = cb {
                cb(id, area, size);
            }
        }

        // Forwards the param id and pod together with mutable user data.
        unsafe extern "C" fn on_param_changed<D>(
            data: *mut os::raw::c_void,
            id: u32,
            param: *const spa_sys::spa_pod,
        ) {
            let callbacks = match (data as *mut ListenerLocalCallbacks<D>).as_mut() {
                Some(callbacks) => callbacks,
                None => return,
            };
            if let Some(cb) = callbacks.param_changed.as_ref() {
                cb(id, &mut callbacks.user_data, param);
            }
        }

        // Forwards the raw buffer pointer of an add-buffer event.
        unsafe extern "C" fn on_add_buffer<D>(
            data: *mut ::std::os::raw::c_void,
            buffer: *mut pw_sys::pw_buffer,
        ) {
            let cb = (data as *const ListenerLocalCallbacks<D>)
                .as_ref()
                .and_then(|callbacks| callbacks.add_buffer.as_ref());
            if let Some(cb) = cb {
                cb(buffer);
            }
        }

        // Forwards the raw buffer pointer of a remove-buffer event.
        unsafe extern "C" fn on_remove_buffer<D>(
            data: *mut ::std::os::raw::c_void,
            buffer: *mut pw_sys::pw_buffer,
        ) {
            let cb = (data as *const ListenerLocalCallbacks<D>)
                .as_ref()
                .and_then(|callbacks| callbacks.remove_buffer.as_ref());
            if let Some(cb) = cb {
                cb(buffer);
            }
        }

        // Lends the user callback a temporary `Stream` wrapping the stored raw
        // pointer, plus mutable user data. Panics when no pointer was stored.
        unsafe extern "C" fn on_process<D>(data: *mut ::std::os::raw::c_void) {
            let callbacks = match (data as *mut ListenerLocalCallbacks<D>).as_mut() {
                Some(callbacks) => callbacks,
                None => return,
            };
            if let Some(cb) = callbacks.process.as_ref() {
                let ptr = callbacks.stream.expect("stream cannot be null");
                let stream = Stream {
                    ptr,
                    _alive: KeepAlive::Temp,
                };
                cb(&stream, &mut callbacks.user_data);
            }
        }

        // Forwards the drained event.
        unsafe extern "C" fn on_drained<D>(data: *mut ::std::os::raw::c_void) {
            let cb = (data as *const ListenerLocalCallbacks<D>)
                .as_ref()
                .and_then(|callbacks| callbacks.drained.as_ref());
            if let Some(cb) = cb {
                cb();
            }
        }

        let boxed = Box::new(self);

        // Start from an all-zero table so that every event without a callback
        // stays unset.
        let mut table: pw_sys::pw_stream_events = unsafe { mem::zeroed() };
        table.version = pw_sys::PW_VERSION_STREAM_EVENTS;
        if boxed.state_changed.is_some() {
            table.state_changed = Some(on_state_changed::<D>);
        }
        if boxed.control_info.is_some() {
            table.control_info = Some(on_control_info::<D>);
        }
        if boxed.io_changed.is_some() {
            table.io_changed = Some(on_io_changed::<D>);
        }
        if boxed.param_changed.is_some() {
            table.param_changed = Some(on_param_changed::<D>);
        }
        if boxed.add_buffer.is_some() {
            table.add_buffer = Some(on_add_buffer::<D>);
        }
        if boxed.remove_buffer.is_some() {
            table.remove_buffer = Some(on_remove_buffer::<D>);
        }
        if boxed.process.is_some() {
            table.process = Some(on_process::<D>);
        }
        if boxed.drained.is_some() {
            table.drained = Some(on_drained::<D>);
        }

        (Box::pin(table), boxed)
    }
}
pub trait ListenerBuilderT<D>: Sized {
fn callbacks(&mut self) -> &mut ListenerLocalCallbacks<D>;
fn state_changed<F>(mut self, callback: F) -> Self
where
F: Fn(StreamState, StreamState) + 'static,
{
self.callbacks().state_changed = Some(Box::new(callback));
self
}
fn control_info<F>(mut self, callback: F) -> Self
where
F: Fn(u32, *const pw_sys::pw_stream_control) + 'static,
{
self.callbacks().control_info = Some(Box::new(callback));
self
}
fn io_changed<F>(mut self, callback: F) -> Self
where
F: Fn(u32, *mut os::raw::c_void, u32) + 'static,
{
self.callbacks().io_changed = Some(Box::new(callback));
self
}
fn param_changed<F>(mut self, callback: F) -> Self
where
F: Fn(u32, &mut D, *const spa_sys::spa_pod) + 'static,
{
self.callbacks().param_changed = Some(Box::new(callback));
self
}
fn add_buffer<F>(mut self, callback: F) -> Self
where
F: Fn(*mut pw_sys::pw_buffer) + 'static,
{
self.callbacks().add_buffer = Some(Box::new(callback));
self
}
fn remove_buffer<F>(mut self, callback: F) -> Self
where
F: Fn(*mut pw_sys::pw_buffer) + 'static,
{
self.callbacks().remove_buffer = Some(Box::new(callback));
self
}
fn process<F>(mut self, callback: F) -> Self
where
F: Fn(&Stream<D>, &mut D) + 'static,
{
self.callbacks().process = Some(Box::new(callback));
self
}
fn drained<F>(mut self, callback: F) -> Self
where
F: Fn() + 'static,
{
self.callbacks().drained = Some(Box::new(callback));
self
}
}
/// Builder that collects callbacks for a listener on an existing [`Stream`].
///
/// Obtained from `Stream::add_local_listener` or
/// `Stream::add_local_listener_with_user_data`; finished with `register`.
pub struct ListenerLocalBuilder<'a, D> {
// Stream the listener will be added to.
stream: &'a mut Stream<D>,
// Callback table under construction.
callbacks: ListenerLocalCallbacks<D>,
}
// No `D: Default` bound here: builders created through
// `Stream::add_local_listener_with_user_data` exist precisely for user data
// types that need not implement `Default`, and they must still be able to use
// the fluent callback setters of `ListenerBuilderT`.
impl<'a, D> ListenerBuilderT<D> for ListenerLocalBuilder<'a, D> {
    /// Mutable access to the callback table this builder is filling in.
    fn callbacks(&mut self) -> &mut ListenerLocalCallbacks<D> {
        &mut self.callbacks
    }
}
impl<'a, D> ListenerLocalBuilder<'a, D> {
pub fn register(self) -> Result<StreamListener<D>, Error> {
let (events, data) = self.callbacks.into_raw();
let (listener, data) = unsafe {
let listener: Box<spa_sys::spa_hook> = Box::new(mem::zeroed());
let raw_listener = Box::into_raw(listener);
let raw_data = Box::into_raw(data);
pw_sys::pw_stream_add_listener(
self.stream.as_ptr(),
raw_listener,
events.as_ref().get_ref(),
raw_data as *mut _,
);
(Box::from_raw(raw_listener), Box::from_raw(raw_data))
};
Ok(StreamListener {
listener,
_events: events,
_data: data,
})
}
}
/// Builder that creates a [`Stream`] together with its callbacks.
///
/// Obtained from `Stream::simple` or `Stream::with_user_data`; finished with
/// `create`.
pub struct SimpleLocalBuilder<'a, D> {
// Main loop whose pointer is passed to `pw_stream_new_simple`.
main_loop: &'a MainLoop,
// Stream name, already converted to a C string.
name: CString,
// Properties handed over when the stream is created.
properties: Properties,
// Callback table under construction.
callbacks: ListenerLocalCallbacks<D>,
}
impl<'a, D> ListenerBuilderT<D> for SimpleLocalBuilder<'a, D> {
fn callbacks(&mut self) -> &mut ListenerLocalCallbacks<D> {
&mut self.callbacks
}
}
impl<'a, D> SimpleLocalBuilder<'a, D> {
pub fn create(self) -> Result<Stream<D>, Error> {
let (events, data) = self.callbacks.into_raw();
let data = Box::into_raw(data);
let (stream, mut data) = unsafe {
let stream = pw_sys::pw_stream_new_simple(
self.main_loop.as_ptr(),
self.name.as_ptr(),
self.properties.into_raw(),
events.as_ref().get_ref(),
data as *mut _,
);
(stream, Box::from_raw(data))
};
let stream = ptr::NonNull::new(stream).ok_or(Error::CreationFailed)?;
data.stream = Some(stream);
Ok(Stream {
ptr: stream,
_alive: KeepAlive::Simple {
_events: events,
_data: data,
},
})
}
}
/// Handle for a listener registered through `ListenerLocalBuilder::register`.
///
/// Dropping it passes the hook to `spa::hook::remove`.
pub struct StreamListener<D> {
// Hook whose address was passed to `pw_stream_add_listener`.
listener: Box<spa_sys::spa_hook>,
// Event table whose address was passed to `pw_stream_add_listener`.
_events: Pin<Box<pw_sys::pw_stream_events>>,
// Callback data whose address was passed to `pw_stream_add_listener`.
_data: Box<ListenerLocalCallbacks<D>>,
}
impl<D> StreamListener<D> {
/// Consumes the listener. The body is intentionally empty: `self` is dropped
/// on return, and the `Drop` impl does the actual removal.
pub fn unregister(self) {
}
}
impl<D> std::ops::Drop for StreamListener<D> {
/// Removes the hook via `spa::hook::remove` when the listener goes away.
fn drop(&mut self) {
// NOTE(review): `*self.listener` passes the hook by value out of a `&mut`
// borrow, which presumably relies on `spa_hook` being `Copy` — confirm that
// removing through a copy is what `spa::hook::remove` expects.
spa::hook::remove(*self.listener);
}
}
bitflags! {
/// Flags accepted by `Stream::connect`.
///
/// Each constant re-exports the `PW_STREAM_FLAG_*` value of the same name
/// from `pw_sys`; see the PipeWire `pw_stream_flags` documentation for what
/// each flag does.
pub struct StreamFlags: pw_sys::pw_stream_flags {
/// `PW_STREAM_FLAG_AUTOCONNECT`.
const AUTOCONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_AUTOCONNECT;
/// `PW_STREAM_FLAG_INACTIVE`.
const INACTIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_INACTIVE;
/// `PW_STREAM_FLAG_MAP_BUFFERS`.
const MAP_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_MAP_BUFFERS;
/// `PW_STREAM_FLAG_DRIVER`.
const DRIVER = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DRIVER;
/// `PW_STREAM_FLAG_RT_PROCESS`.
const RT_PROCESS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_RT_PROCESS;
/// `PW_STREAM_FLAG_NO_CONVERT`.
const NO_CONVERT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_NO_CONVERT;
/// `PW_STREAM_FLAG_EXCLUSIVE`.
const EXCLUSIVE = pw_sys::pw_stream_flags_PW_STREAM_FLAG_EXCLUSIVE;
/// `PW_STREAM_FLAG_DONT_RECONNECT`.
const DONT_RECONNECT = pw_sys::pw_stream_flags_PW_STREAM_FLAG_DONT_RECONNECT;
/// `PW_STREAM_FLAG_ALLOC_BUFFERS`.
const ALLOC_BUFFERS = pw_sys::pw_stream_flags_PW_STREAM_FLAG_ALLOC_BUFFERS;
}
}