use std::ffi::{c_void, CStr};
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use crate::error::SCError;
use crate::stream::delegate_trait::SCStreamDelegateTrait;
use crate::utils::completion::UnitCompletion;
use crate::{
dispatch_queue::DispatchQueue,
ffi,
stream::{
configuration::SCStreamConfiguration, content_filter::SCContentFilter,
output_trait::SCStreamOutputTrait, output_type::SCStreamOutputType,
},
};
/// One registered output handler together with the bookkeeping needed to
/// find it again and to decide which samples it receives.
struct HandlerEntry {
/// Identifier returned to the caller when the handler is added and used by
/// `SCStream::remove_output_handler` to locate this entry.
id: usize,
/// Output type this handler was registered for; `sample_handler` only
/// invokes handlers whose type equals the type of the incoming sample.
of_type: SCStreamOutputType,
/// The user-supplied callback object.
handler: Box<dyn SCStreamOutputTrait>,
}
/// Per-stream state shared between `SCStream` values and the FFI callbacks.
///
/// A raw pointer to this struct is passed to `sc_stream_create` as the opaque
/// context and cast back inside `delegate_error_callback` and `sample_handler`.
struct StreamContext {
/// Registered output handlers; locked by the sample callback and by the
/// add/remove methods on `SCStream`.
handlers: Mutex<Vec<HandlerEntry>>,
/// Optional delegate notified by `delegate_error_callback`.
delegate: Mutex<Option<Box<dyn SCStreamDelegateTrait>>>,
/// Manual reference count: starts at 1, incremented by `retain`, decremented
/// by `release`, which frees the allocation when the last reference goes away.
ref_count: AtomicUsize,
}
impl StreamContext {
    /// Heap-allocates a context holding `delegate` (if any), no handlers, and
    /// a reference count of one, and leaks it as a raw pointer.
    ///
    /// The pointer must eventually be balanced by `release`.
    fn allocate(delegate: Option<Box<dyn SCStreamDelegateTrait>>) -> *mut Self {
        Box::into_raw(Box::new(Self {
            handlers: Mutex::new(Vec::new()),
            delegate: Mutex::new(delegate),
            ref_count: AtomicUsize::new(1),
        }))
    }

    /// Creates a context without a delegate.
    fn new() -> *mut Self {
        Self::allocate(None)
    }

    /// Creates a context whose delegate slot is pre-populated with `delegate`.
    fn new_with_delegate(delegate: Box<dyn SCStreamDelegateTrait>) -> *mut Self {
        Self::allocate(Some(delegate))
    }

    /// Adds one reference to the context.
    ///
    /// # Safety
    ///
    /// `ptr` must be non-null and point to a live context produced by one of
    /// the constructors above (it is dereferenced unconditionally).
    unsafe fn retain(ptr: *mut Self) {
        let ctx = unsafe { &*ptr };
        // Taking an extra reference needs no synchronisation with other
        // memory accesses; only the final decrement does.
        ctx.ref_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Drops one reference and frees the context when it was the last one.
    /// A null `ptr` is ignored.
    ///
    /// # Safety
    ///
    /// A non-null `ptr` must point to a live context, and the caller must not
    /// use it again after giving up its reference.
    unsafe fn release(ptr: *mut Self) {
        if ptr.is_null() {
            return;
        }
        let previous = unsafe { &*ptr }.ref_count.fetch_sub(1, Ordering::Release);
        if previous != 1 {
            return;
        }
        // Pair with the `Release` decrements of the other owners so their
        // writes are visible before the allocation is torn down.
        std::sync::atomic::fence(Ordering::Acquire);
        drop(unsafe { Box::from_raw(ptr) });
    }
}
/// Process-wide source of handler identifiers. It starts at 1 and is bumped
/// with `fetch_add` each time a handler registration is attempted, so ids are
/// unique across all streams.
static NEXT_HANDLER_ID: AtomicUsize = AtomicUsize::new(1);
extern "C" fn delegate_error_callback(context: *mut c_void, error_code: i32, msg: *const i8) {
if context.is_null() {
return;
}
let ctx = unsafe { &*(context.cast::<StreamContext>()) };
let message = if msg.is_null() {
"Unknown error".to_string()
} else {
unsafe { CStr::from_ptr(msg) }
.to_str()
.unwrap_or("Unknown error")
.to_string()
};
let error = if error_code != 0 {
crate::error::SCStreamErrorCode::from_raw(error_code).map_or_else(
|| SCError::StreamError(format!("{message} (code: {error_code})")),
|code| SCError::SCStreamError {
code,
message: Some(message.clone()),
},
)
} else {
SCError::StreamError(message.clone())
};
if let Ok(delegate_guard) = ctx.delegate.lock() {
if let Some(ref delegate) = *delegate_guard {
delegate.did_stop_with_error(error);
delegate.stream_did_stop(Some(message));
return;
}
}
eprintln!("SCStream error: {error}");
}
/// FFI callback invoked for every sample buffer produced by the stream.
///
/// * `context` - opaque pointer to the stream's `StreamContext`.
/// * `sample_buffer` - the sample; this function is responsible for the
///   reference it receives and releases it on every path that does not hand
///   it to a handler.
/// * `output_type` - raw output type: `0` screen, `1` audio, `2` microphone.
///
/// Every handler registered for the sample's output type is invoked, in
/// registration order, while the handler list lock is held.
extern "C" fn sample_handler(context: *mut c_void, sample_buffer: *const c_void, output_type: i32) {
    // Gives back the reference we were handed when nobody will consume it.
    let release_buffer = || {
        unsafe { crate::cm::ffi::cm_sample_buffer_release(sample_buffer.cast_mut()) };
    };

    if context.is_null() {
        release_buffer();
        return;
    }
    let ctx = unsafe { &*(context.cast::<StreamContext>()) };

    let output_type_enum = match output_type {
        0 => SCStreamOutputType::Screen,
        1 => SCStreamOutputType::Audio,
        2 => SCStreamOutputType::Microphone,
        _ => {
            eprintln!("Unknown output type: {output_type}");
            release_buffer();
            return;
        }
    };

    // Never panic here: unwinding out of an `extern "C"` function aborts the
    // process. A poisoned lock only means another thread panicked while
    // holding it; the handler list itself is still usable.
    let handlers = ctx
        .handlers
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    // Count first instead of collecting, so the per-sample hot path does not
    // allocate.
    let count = handlers
        .iter()
        .filter(|entry| entry.of_type == output_type_enum)
        .count();
    if count == 0 {
        drop(handlers);
        release_buffer();
        return;
    }

    let matching = handlers
        .iter()
        .filter(|entry| entry.of_type == output_type_enum);
    for (idx, entry) in matching.enumerate() {
        // Each wrapper handed to a handler consumes one reference (assumes
        // `CMSampleBuffer` releases on drop — confirm). We start with one, so
        // take an extra one for every handler except the last.
        if idx + 1 < count {
            unsafe { crate::cm::ffi::cm_sample_buffer_retain(sample_buffer.cast_mut()) };
        }
        let buffer = unsafe { crate::cm::CMSampleBuffer::from_ptr(sample_buffer.cast_mut()) };
        entry
            .handler
            .did_output_sample_buffer(buffer, output_type_enum);
    }
}
/// Handle to a native capture stream plus the Rust-side context that holds
/// its output handlers and delegate.
pub struct SCStream {
/// Pointer returned by `sc_stream_create`; released in `Drop` when non-null.
ptr: *const c_void,
/// Reference-counted `StreamContext` shared with clones of this stream and
/// with the FFI callbacks.
context: *mut StreamContext,
}
// NOTE(review): these impls assert that the raw stream pointer and the shared
// `StreamContext` may be used from any thread. Nothing in this file proves
// that; it presumably relies on the native stream object being thread-safe and
// on the boxed handler/delegate trait objects being `Send` — confirm against
// the trait bounds of `SCStreamOutputTrait` and `SCStreamDelegateTrait`.
unsafe impl Send for SCStream {}
unsafe impl Sync for SCStream {}
impl SCStream {
pub fn new(filter: &SCContentFilter, configuration: &SCStreamConfiguration) -> Self {
let context = StreamContext::new();
let context_ptr = context.cast::<c_void>();
let ptr = unsafe {
ffi::sc_stream_create(
filter.as_ptr(),
configuration.as_ptr(),
context_ptr,
delegate_error_callback,
sample_handler,
)
};
Self { ptr, context }
}
pub fn new_with_delegate(
filter: &SCContentFilter,
configuration: &SCStreamConfiguration,
delegate: impl SCStreamDelegateTrait + 'static,
) -> Self {
let context = StreamContext::new_with_delegate(Box::new(delegate));
let context_ptr = context.cast::<c_void>();
let ptr = unsafe {
ffi::sc_stream_create(
filter.as_ptr(),
configuration.as_ptr(),
context_ptr,
delegate_error_callback,
sample_handler,
)
};
Self { ptr, context }
}
pub fn add_output_handler(
&mut self,
handler: impl SCStreamOutputTrait + 'static,
of_type: SCStreamOutputType,
) -> Option<usize> {
self.add_output_handler_with_queue(handler, of_type, None)
}
pub fn add_output_handler_with_queue(
&mut self,
handler: impl SCStreamOutputTrait + 'static,
of_type: SCStreamOutputType,
queue: Option<&DispatchQueue>,
) -> Option<usize> {
let handler_id = NEXT_HANDLER_ID.fetch_add(1, Ordering::Relaxed);
let output_type_int = match of_type {
SCStreamOutputType::Screen => 0,
SCStreamOutputType::Audio => 1,
SCStreamOutputType::Microphone => 2,
};
let ok = if let Some(q) = queue {
unsafe {
ffi::sc_stream_add_stream_output_with_queue(self.ptr, output_type_int, q.as_ptr())
}
} else {
unsafe { ffi::sc_stream_add_stream_output(self.ptr, output_type_int) }
};
if ok {
unsafe { &*self.context }
.handlers
.lock()
.unwrap()
.push(HandlerEntry {
id: handler_id,
of_type,
handler: Box::new(handler),
});
Some(handler_id)
} else {
None
}
}
pub fn remove_output_handler(&mut self, id: usize, of_type: SCStreamOutputType) -> bool {
let mut handlers = unsafe { &*self.context }.handlers.lock().unwrap();
let Some(pos) = handlers.iter().position(|e| e.id == id) else {
return false;
};
handlers.remove(pos);
let has_type = handlers.iter().any(|e| e.of_type == of_type);
drop(handlers);
if !has_type {
let output_type_int = match of_type {
SCStreamOutputType::Screen => 0,
SCStreamOutputType::Audio => 1,
SCStreamOutputType::Microphone => 2,
};
unsafe { ffi::sc_stream_remove_stream_output(self.ptr, output_type_int) };
}
true
}
pub fn start_capture(&self) -> Result<(), SCError> {
let (completion, context) = UnitCompletion::new();
unsafe { ffi::sc_stream_start_capture(self.ptr, context, UnitCompletion::callback) };
completion.wait().map_err(SCError::CaptureStartFailed)
}
pub fn stop_capture(&self) -> Result<(), SCError> {
let (completion, context) = UnitCompletion::new();
unsafe { ffi::sc_stream_stop_capture(self.ptr, context, UnitCompletion::callback) };
completion.wait().map_err(SCError::CaptureStopFailed)
}
pub fn update_configuration(
&self,
configuration: &SCStreamConfiguration,
) -> Result<(), SCError> {
let (completion, context) = UnitCompletion::new();
unsafe {
ffi::sc_stream_update_configuration(
self.ptr,
configuration.as_ptr(),
context,
UnitCompletion::callback,
);
}
completion.wait().map_err(SCError::StreamError)
}
pub fn update_content_filter(&self, filter: &SCContentFilter) -> Result<(), SCError> {
let (completion, context) = UnitCompletion::new();
unsafe {
ffi::sc_stream_update_content_filter(
self.ptr,
filter.as_ptr(),
context,
UnitCompletion::callback,
);
}
completion.wait().map_err(SCError::StreamError)
}
#[cfg(feature = "macos_13_0")]
pub fn synchronization_clock(&self) -> Option<crate::cm::CMClock> {
let ptr = unsafe { ffi::sc_stream_get_synchronization_clock(self.ptr) };
if ptr.is_null() {
None
} else {
Some(crate::cm::CMClock::from_ptr(ptr))
}
}
#[cfg(feature = "macos_15_0")]
pub fn add_recording_output(
&self,
recording_output: &crate::recording_output::SCRecordingOutput,
) -> Result<(), SCError> {
let (completion, context) = UnitCompletion::new();
unsafe {
ffi::sc_stream_add_recording_output(
self.ptr,
recording_output.as_ptr(),
UnitCompletion::callback,
context,
);
}
completion.wait().map_err(SCError::StreamError)
}
#[cfg(feature = "macos_15_0")]
pub fn remove_recording_output(
&self,
recording_output: &crate::recording_output::SCRecordingOutput,
) -> Result<(), SCError> {
let (completion, context) = UnitCompletion::new();
unsafe {
ffi::sc_stream_remove_recording_output(
self.ptr,
recording_output.as_ptr(),
UnitCompletion::callback,
context,
);
}
completion.wait().map_err(SCError::StreamError)
}
#[allow(dead_code)]
pub(crate) fn as_ptr(&self) -> *const c_void {
self.ptr
}
}
impl Drop for SCStream {
    /// Gives up this value's reference to the native stream (when there is
    /// one) and then its reference to the shared context.
    fn drop(&mut self) {
        let stream = self.ptr;
        if !stream.is_null() {
            unsafe { ffi::sc_stream_release(stream) };
        }
        // `release` tolerates a null context and frees it on the last reference.
        let context = self.context;
        unsafe { StreamContext::release(context) };
    }
}
impl Clone for SCStream {
    /// Produces another handle to the same native stream and the same
    /// context, taking one extra reference on each.
    fn clone(&self) -> Self {
        let context = self.context;
        unsafe { StreamContext::retain(context) };
        let ptr = unsafe { crate::ffi::sc_stream_retain(self.ptr) };
        Self { ptr, context }
    }
}
impl fmt::Debug for SCStream {
    /// Shows the native stream pointer only; the context is elided and the
    /// output is marked as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("SCStream");
        builder.field("ptr", &self.ptr);
        builder.finish_non_exhaustive()
    }
}
impl fmt::Display for SCStream {
    /// Writes the fixed type name; no per-instance data is shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SCStream")
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    /// Builds a handler entry whose callback bumps `counter` once per call.
    fn counting_entry(
        id: usize,
        of_type: SCStreamOutputType,
        counter: &Arc<AtomicUsize>,
    ) -> HandlerEntry {
        let counter = Arc::clone(counter);
        HandlerEntry {
            id,
            of_type,
            handler: Box::new(
                move |buf: crate::cm::CMSampleBuffer, _ty: SCStreamOutputType| {
                    counter.fetch_add(1, Ordering::Relaxed);
                    // The test buffers wrap a null pointer, so skip their destructor.
                    std::mem::forget(buf);
                },
            ),
        }
    }

    /// Appends `entry` to the handler list of `ctx`.
    fn register(ctx: *mut StreamContext, entry: HandlerEntry) {
        unsafe { &*ctx }.handlers.lock().unwrap().push(entry);
    }

    /// Invokes every handler of `ctx` registered for `of_type`, `rounds`
    /// times, each time with a fresh null-backed buffer.
    fn deliver(ctx: *mut StreamContext, of_type: SCStreamOutputType, rounds: usize) {
        for _ in 0..rounds {
            let handlers = unsafe { &*ctx }.handlers.lock().unwrap();
            for entry in handlers.iter().filter(|e| e.of_type == of_type) {
                let buf = unsafe { crate::cm::CMSampleBuffer::from_ptr(std::ptr::null_mut()) };
                entry.handler.did_output_sample_buffer(buf, of_type);
            }
        }
    }

    /// Handlers stored in one context must never see samples dispatched
    /// through another context.
    #[test]
    fn test_per_stream_callback_isolation() {
        let count_a = Arc::new(AtomicUsize::new(0));
        let count_b = Arc::new(AtomicUsize::new(0));
        let ctx_a = StreamContext::new();
        let ctx_b = StreamContext::new();

        register(
            ctx_a,
            counting_entry(1, SCStreamOutputType::Audio, &count_a),
        );
        register(
            ctx_b,
            counting_entry(2, SCStreamOutputType::Audio, &count_b),
        );

        deliver(ctx_a, SCStreamOutputType::Audio, 5);
        deliver(ctx_b, SCStreamOutputType::Audio, 3);

        assert_eq!(
            count_a.load(Ordering::Relaxed),
            5,
            "handler A received callbacks meant for B (cross-stream leak)"
        );
        assert_eq!(
            count_b.load(Ordering::Relaxed),
            3,
            "handler B received callbacks meant for A (cross-stream leak)"
        );

        unsafe {
            StreamContext::release(ctx_a);
            StreamContext::release(ctx_b);
        }
    }

    /// Within one context, a handler is only invoked for its own output type.
    #[test]
    fn test_handler_output_type_filtering() {
        let screen_count = Arc::new(AtomicUsize::new(0));
        let audio_count = Arc::new(AtomicUsize::new(0));
        let ctx = StreamContext::new();

        register(
            ctx,
            counting_entry(1, SCStreamOutputType::Screen, &screen_count),
        );
        register(
            ctx,
            counting_entry(2, SCStreamOutputType::Audio, &audio_count),
        );

        deliver(ctx, SCStreamOutputType::Screen, 4);
        deliver(ctx, SCStreamOutputType::Audio, 2);

        assert_eq!(screen_count.load(Ordering::Relaxed), 4);
        assert_eq!(audio_count.load(Ordering::Relaxed), 2);

        unsafe { StreamContext::release(ctx) };
    }

    /// `retain` and `release` move the reference count up and down by one.
    #[test]
    fn test_stream_context_ref_counting() {
        let ctx = StreamContext::new();
        let current = || unsafe { &*ctx }.ref_count.load(Ordering::Relaxed);

        assert_eq!(current(), 1);
        unsafe { StreamContext::retain(ctx) };
        assert_eq!(current(), 2);
        unsafe { StreamContext::release(ctx) };
        assert_eq!(current(), 1);
        // Final release frees the context; it must not be touched afterwards.
        unsafe { StreamContext::release(ctx) };
    }
}