#[cfg(feature = "enable_runtime")]
use std::any;
use std::ffi::CStr;
use std::rc::Rc;
use std::sync::Arc;
use std::{ptr, thread};
use crate::context::ObsContext;
use crate::crash_handler::main_crash_handler;
use crate::enums::{ObsLogLevel, ObsResetVideoStatus};
use crate::logger::{extern_log_callback, internal_log_global, LOGGER};
#[cfg(target_os = "linux")]
use crate::run_with_obs;
use crate::utils::initialization::{platform_specific_setup, PlatformSpecificGuard};
use crate::utils::{ObsError, ObsModules, ObsString};
use crate::{context::OBS_THREAD_ID, utils::StartupInfo};
#[cfg(feature = "enable_runtime")]
use crate::unsafe_send::Sendable;
use std::fmt::Debug;
#[cfg(feature = "enable_runtime")]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "enable_runtime")]
use std::sync::mpsc::{channel, Sender};
#[cfg(feature = "enable_runtime")]
use std::sync::Mutex;
#[cfg(feature = "enable_runtime")]
use std::thread::JoinHandle;
/// Message sent over the command channel to the dedicated OBS thread.
#[cfg(feature = "enable_runtime")]
enum ObsCommand {
// Run the boxed closure on the OBS thread. When the second field is `Some`,
// the closure's type-erased return value is sent back through that sender;
// when it is `None` the return value is discarded.
Execute(
Box<dyn FnOnce() -> Box<dyn any::Any + Send> + Send>,
Option<oneshot::Sender<Box<dyn any::Any + Send>>>,
),
// Makes the OBS thread leave its command loop, after which it shuts OBS down.
Terminate,
}
/// Cloneable handle used to run operations on the thread that owns OBS.
///
/// With the `enable_runtime` feature, operations are forwarded to a dedicated
/// OBS thread through a command channel. Without it, operations are only
/// accepted from the thread recorded in `thread_id`.
#[derive(Debug, Clone)]
pub struct ObsRuntime {
/// Sending half of the channel that carries [`ObsCommand`]s to the OBS thread.
#[cfg(feature = "enable_runtime")]
command_sender: Arc<Sender<ObsCommand>>,
/// Counter incremented when a command is queued and decremented by the OBS
/// thread after the command has been executed.
#[cfg(feature = "enable_runtime")]
queued_commands: Arc<AtomicUsize>,
/// Id of the thread OBS runs on; compared against the calling thread to
/// decide whether an operation can run inline.
thread_id: std::thread::ThreadId,
/// Shared guard; its `Drop` implementation performs the OBS shutdown once the
/// last clone of this handle goes away.
_guard: Arc<_ObsRuntimeGuard>,
/// Platform-specific guard returned by initialization, kept alive alongside
/// the runtime when no dedicated OBS thread is used.
#[cfg(not(feature = "enable_runtime"))]
_platform_specific: Option<Rc<PlatformSpecificGuard>>,
}
impl ObsRuntime {
/// Starts OBS and returns the runtime handle together with the loaded modules
/// and the startup info handed back by initialization.
///
/// # Errors
/// * [`ObsError::MutexFailure`] if the `OBS_THREAD_ID` mutex is poisoned.
/// * [`ObsError::ThreadFailure`] if `OBS_THREAD_ID` already holds a thread id.
/// * [`ObsError::Unexpected`] wrapping the debug representation of any error
///   returned by `init`.
#[allow(unused_mut)]
pub(crate) fn startup(
    mut options: StartupInfo,
) -> Result<(ObsRuntime, ObsModules, StartupInfo), ObsError> {
    {
        // The guard only lives for this scope so the lock is free again
        // before initialization starts.
        let current_owner = OBS_THREAD_ID.lock().map_err(|_e| ObsError::MutexFailure)?;
        if current_owner.is_some() {
            return Err(ObsError::ThreadFailure);
        }
    }

    log::trace!("Initializing OBS context");

    match ObsRuntime::init(options) {
        Ok(started) => Ok(started),
        Err(e) => Err(ObsError::Unexpected(format!(
            "Failed to initialize OBS runtime: {:?}",
            e
        ))),
    }
}
/// Initializes OBS directly on the calling thread (no dedicated OBS thread).
///
/// The returned runtime records the calling thread as the OBS thread, and a
/// clone of it is stored in the returned modules.
#[cfg(not(feature = "enable_runtime"))]
fn init(info: StartupInfo) -> Result<(ObsRuntime, ObsModules, StartupInfo), ObsError> {
    let (startup_info, mut obs_modules, platform_guard) =
        unsafe { Self::initialize_inner(info)? };

    let runtime = Self {
        thread_id: thread::current().id(),
        _guard: Arc::new(_ObsRuntimeGuard {}),
        _platform_specific: platform_guard,
    };

    obs_modules.runtime = Some(runtime.clone());
    Ok((runtime, obs_modules, startup_info))
}
/// Spawns the dedicated OBS thread, initializes OBS on it and waits for the
/// result.
///
/// The spawned thread runs `initialize_inner`, reports the outcome through a
/// oneshot channel, then processes [`ObsCommand`]s until it receives
/// `Terminate` or the channel is closed, and finally runs `shutdown_inner`.
///
/// # Errors
/// * [`ObsError::ThreadFailure`] if the thread cannot be spawned.
/// * [`ObsError::RuntimeChannelError`] if no initialization result arrives.
/// * Whatever error `initialize_inner` reported on the OBS thread.
#[cfg(feature = "enable_runtime")]
fn init(info: StartupInfo) -> Result<(ObsRuntime, ObsModules, StartupInfo), ObsError> {
    static RUNTIME_THREAD_NAME: &str = "libobs-wrapper-obs-runtime";

    let (command_sender, command_receiver) = channel();
    let (init_tx, init_rx) = oneshot::channel();
    let queued_commands = Arc::new(AtomicUsize::new(0));
    let pending_on_worker = Arc::clone(&queued_commands);

    let worker = move || {
        log::trace!("Starting OBS thread");

        // The platform guard stays bound for the rest of the closure so it is
        // only released after the shutdown below has run.
        let (startup_info, modules, _platform_specific_guard) =
            match unsafe { Self::initialize_inner(info) } {
                Ok(initialized) => initialized,
                Err(err) => {
                    log::error!("Failed to initialize OBS context: {:?}", err);
                    let _ = init_tx.send(Err(err));
                    return;
                }
            };

        log::trace!("OBS context initialized successfully");
        if let Err(err) = init_tx.send(Ok((Sendable(modules), startup_info))) {
            log::error!("Failed to send initialization signal: {:?}", err);
        }

        // Blocks for the next command; the iterator ends once every sender is gone.
        for command in command_receiver.iter() {
            match command {
                ObsCommand::Terminate => break,
                ObsCommand::Execute(func, result_sender) => {
                    let result = func();
                    if let Some(reply) = result_sender {
                        let _ = reply.send(result);
                    }
                    pending_on_worker.fetch_sub(1, Ordering::SeqCst);
                }
            }
        }

        if let Err(err) = unsafe { Self::shutdown_inner() } {
            log::error!("Failed to shut down OBS context: {:?}", err);
        }
    };

    let handle = std::thread::Builder::new()
        .name(RUNTIME_THREAD_NAME.to_string())
        .spawn(worker)
        .map_err(|_e| ObsError::ThreadFailure)?;

    log::trace!("Waiting for OBS thread to initialize");
    let init_outcome = init_rx.recv().map_err(|_| {
        ObsError::RuntimeChannelError("Failed to receive initialization result".to_string())
    })?;
    let (mut sendable_modules, startup_info) = init_outcome?;

    let thread_id = handle.thread().id();
    let command_sender = Arc::new(command_sender);
    let guard = _ObsRuntimeGuard {
        handle: Arc::new(Mutex::new(Some(handle))),
        command_sender: Arc::clone(&command_sender),
    };

    let runtime = Self {
        command_sender,
        thread_id,
        queued_commands,
        _guard: Arc::new(guard),
    };

    sendable_modules.0.runtime = Some(runtime.clone());
    Ok((runtime, sendable_modules.0, startup_info))
}
/// Queues `operation` for execution on the OBS thread without waiting for it
/// to finish.
///
/// When called from the OBS thread itself, the operation runs inline and the
/// queue counter is not touched. Otherwise the operation is wrapped into an
/// [`ObsCommand::Execute`] without a result sender and sent to the OBS thread.
/// A warning is logged when more than 50 commands were already queued.
///
/// # Errors
/// Returns [`ObsError::RuntimeChannelError`] if the command cannot be sent to
/// the OBS thread.
#[cfg(feature = "enable_runtime")]
pub fn run_with_obs_no_block<F>(&self, operation: F) -> Result<(), ObsError>
where
    F: FnOnce() + Send + 'static,
{
    if std::thread::current().id() == self.thread_id {
        operation();
        return Ok(());
    }

    let previously_queued = self.queued_commands.fetch_add(1, Ordering::SeqCst);
    if previously_queued > 50 {
        log::warn!("More than 50 queued commands. Try to batch them together.");
    }

    let wrapper = move || -> Box<dyn std::any::Any + Send> {
        operation();
        Box::new(())
    };

    self.command_sender
        .send(ObsCommand::Execute(Box::new(wrapper), None))
        .map_err(|_| {
            // The command never reached the queue, so the OBS thread will not
            // decrement the counter for it; undo the increment here to keep
            // the count (and the warning above) accurate.
            self.queued_commands.fetch_sub(1, Ordering::SeqCst);
            ObsError::RuntimeChannelError("Failed to send command to OBS thread".to_string())
        })
}
#[cfg(not(feature = "enable_runtime"))]
pub fn run_with_obs_no_block<F>(&self, operation: F) -> Result<(), ObsError>
where
F: FnOnce() + 'static,
{
self.run_with_obs_result(operation)
}
/// Runs `operation` on the calling thread and returns its result.
///
/// # Errors
/// Returns [`ObsError::RuntimeOutsideThread`] when the calling thread is not
/// the thread this runtime was created on; the operation is not run then.
#[cfg(not(feature = "enable_runtime"))]
pub fn run_with_obs_result<F, T>(&self, operation: F) -> Result<T, ObsError>
where
    F: FnOnce() -> T,
{
    if std::thread::current().id() == self.thread_id {
        Ok(operation())
    } else {
        Err(ObsError::RuntimeOutsideThread)
    }
}
/// Runs `operation` on the OBS thread and blocks until its result is available.
///
/// When called from the OBS thread itself, the operation runs inline.
/// Otherwise it is sent to the OBS thread as an [`ObsCommand::Execute`]
/// together with a oneshot sender, and the calling thread waits for the
/// type-erased result, which is then downcast back to `T`. A warning is logged
/// when more than 50 commands were already queued.
///
/// # Errors
/// Returns [`ObsError::RuntimeChannelError`] if the command cannot be sent,
/// if the OBS thread drops the response channel without answering, or if the
/// received value is not of type `T`.
#[cfg(feature = "enable_runtime")]
pub fn run_with_obs_result<F, T>(&self, operation: F) -> Result<T, ObsError>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    if std::thread::current().id() == self.thread_id {
        return Ok(operation());
    }

    let (tx, rx) = oneshot::channel();

    // The command channel carries type-erased results, so box the value here
    // and downcast it again after receiving it.
    let wrapper = move || -> Box<dyn std::any::Any + Send> { Box::new(operation()) };

    let previously_queued = self.queued_commands.fetch_add(1, Ordering::SeqCst);
    if previously_queued > 50 {
        log::warn!("More than 50 queued commands. Try to batch them together.");
    }

    self.command_sender
        .send(ObsCommand::Execute(Box::new(wrapper), Some(tx)))
        .map_err(|_| {
            // The command never reached the queue, so the OBS thread will not
            // decrement the counter for it; undo the increment here to keep
            // the count (and the warning above) accurate.
            self.queued_commands.fetch_sub(1, Ordering::SeqCst);
            ObsError::RuntimeChannelError("Failed to send command to OBS thread".to_string())
        })?;

    let result = rx.recv().map_err(|_| {
        ObsError::RuntimeChannelError("OBS thread dropped the response channel".to_string())
    })?;

    result.downcast::<T>().map(|boxed| *boxed).map_err(|_| {
        ObsError::RuntimeChannelError(
            "Failed to downcast result to the expected type".to_string(),
        )
    })
}
/// Runs the libobs startup sequence on the calling thread.
///
/// In order: records the calling thread in `OBS_THREAD_ID`, installs the crash
/// handler, performs the platform-specific setup, installs the log handler and
/// the logger taken from `info`, calls `obs_startup`, logs the OBS version,
/// registers the module paths, resets audio and video, sets the video levels
/// and loads the modules.
///
/// Returns the (partially consumed) startup info, the loaded modules and the
/// guard produced by the platform-specific setup.
///
/// # Errors
/// * [`ObsError::MutexFailure`] if `OBS_THREAD_ID` or `LOGGER` is poisoned.
/// * [`ObsError::ThreadFailure`] if `OBS_THREAD_ID` is already set.
/// * Any error returned by `platform_specific_setup`.
/// * [`ObsError::Failure`] if `obs_startup` returns `false`.
/// * [`ObsError::ResetVideoFailure`] if `obs_reset_video` does not succeed.
///
/// # Panics
/// Panics if `info.logger` is `None`.
///
/// # Safety
/// NOTE(review): presumably must be called on the thread that will own OBS for
/// its whole lifetime, and at most once until `shutdown_inner` ran — confirm.
#[allow(unknown_lints)]
#[allow(ensure_obs_call_in_runtime)]
unsafe fn initialize_inner(
mut info: StartupInfo,
) -> Result<(StartupInfo, ObsModules, Option<Rc<PlatformSpecificGuard>>), ObsError> {
// NOTE(review): this guard is never dropped explicitly, so the OBS_THREAD_ID
// lock stays held until this function returns.
let mut mutex_value = OBS_THREAD_ID.lock().map_err(|_e| ObsError::MutexFailure)?;
if (*mutex_value).is_some() {
return Err(ObsError::ThreadFailure);
}
// Claim OBS for the current thread.
// NOTE(review): none of the early returns below clears this value again, so
// after a failed initialization OBS_THREAD_ID remains set.
*mutex_value = Some(thread::current().id());
#[cfg(windows)]
unsafe {
libobs::obs_init_win32_crash_handler();
}
unsafe {
libobs::base_set_crash_handler(Some(main_crash_handler), std::ptr::null_mut());
}
let native = unsafe {
platform_specific_setup(info.nix_display.clone())?
};
unsafe {
libobs::base_set_log_handler(Some(extern_log_callback), std::ptr::null_mut());
}
// Move the caller-provided logger into the global LOGGER slot; the lock is
// released right afterwards.
let mut log_callback = LOGGER.lock().map_err(|_e| ObsError::MutexFailure)?;
*log_callback = info.logger.take().expect("Logger can never be null");
drop(log_callback);
let locale_str = ObsString::new("en-US");
let startup_status = unsafe {
libobs::obs_startup(locale_str.as_ptr().0, ptr::null(), ptr::null_mut())
};
// The version is queried and logged before `startup_status` is checked, so it
// is reported even when startup failed.
let version = unsafe { libobs::obs_get_version_string() };
let version_cstr = unsafe { CStr::from_ptr(version) };
let version_str = version_cstr.to_string_lossy().into_owned();
internal_log_global(ObsLogLevel::Info, format!("OBS {}", version_str));
// A version mismatch only produces warnings; initialization continues.
if !ObsContext::check_version_compatibility() {
internal_log_global(
ObsLogLevel::Warning,
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!".to_string(),
);
internal_log_global(
ObsLogLevel::Warning,
format!(
"OBS major version mismatch: installed version is {}, but expected major version {}. Expect crashes or bugs!!",
version_str,
libobs::LIBOBS_API_MAJOR_VER
),
);
internal_log_global(
ObsLogLevel::Warning,
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!".to_string(),
);
}
internal_log_global(
ObsLogLevel::Info,
"---------------------------------".to_string(),
);
if !startup_status {
return Err(ObsError::Failure);
}
let mut obs_modules = unsafe {
ObsModules::add_paths(&info.startup_paths)
};
// NOTE(review): the return value of `obs_reset_audio2` is not inspected.
unsafe {
libobs::obs_reset_audio2(info.obs_audio_info.as_ptr().0);
}
let reset_video_status = num_traits::FromPrimitive::from_i32(unsafe {
libobs::obs_reset_video(info.obs_video_info.as_ptr())
});
// A status code that does not map to a known variant is treated as a failure.
let reset_video_status = match reset_video_status {
Some(x) => x,
None => ObsResetVideoStatus::Failure,
};
if reset_video_status != ObsResetVideoStatus::Success {
return Err(ObsError::ResetVideoFailure(reset_video_status));
}
let sdr_info = info.obs_video_info.get_sdr_info();
unsafe {
libobs::obs_set_video_levels(sdr_info.sdr_white_level, sdr_info.hdr_nominal_peak_level);
}
unsafe {
obs_modules.load_modules();
}
internal_log_global(
ObsLogLevel::Info,
"==== Startup complete ===============================================".to_string(),
);
Ok((info, obs_modules, native))
}
#[allow(unknown_lints)]
#[allow(ensure_obs_call_in_runtime)]
unsafe fn shutdown_inner() -> Result<(), ObsError> {
for i in 0..libobs::MAX_CHANNELS {
unsafe { libobs::obs_set_output_source(i, ptr::null_mut()) };
}
unsafe {
libobs::obs_shutdown()
}
let r = LOGGER.lock();
match r {
Ok(mut logger) => {
logger.log(ObsLogLevel::Info, "OBS context shutdown.".to_string());
let allocs = unsafe {
libobs::bnum_allocs()
};
let mut notice = "";
let level = if allocs > 1 {
ObsLogLevel::Error
} else {
notice = " (this is an issue in the OBS source code that cannot be fixed)";
ObsLogLevel::Info
};
logger.log(
level,
format!("Number of memory leaks: {}{}", allocs, notice),
);
#[cfg(any(feature = "__test_environment", test))]
{
assert_eq!(allocs, 1, "Memory leaks detected: {}", allocs);
}
}
Err(_) => {
println!("OBS context shutdown. (but couldn't lock logger)");
}
}
unsafe {
libobs::base_set_crash_handler(None, std::ptr::null_mut());
libobs::base_set_log_handler(None, std::ptr::null_mut());
}
let mut mutex_value = OBS_THREAD_ID.lock().map_err(|_e| ObsError::MutexFailure)?;
*mutex_value = None;
Ok(())
}
/// Queries libobs for the nix platform in use and maps it to a `PlatformType`.
///
/// X11 (EGL) and Wayland are mapped to their respective variants; every other
/// value is reported as `PlatformType::Invalid`.
#[cfg(target_os = "linux")]
pub fn get_platform(&self) -> Result<crate::utils::initialization::PlatformType, ObsError> {
    use crate::utils::initialization::PlatformType;

    run_with_obs!(self, || {
        match unsafe { libobs::obs_get_nix_platform() } {
            libobs::obs_nix_platform_type_OBS_NIX_PLATFORM_X11_EGL => PlatformType::X11,
            libobs::obs_nix_platform_type_OBS_NIX_PLATFORM_WAYLAND => PlatformType::Wayland,
            _ => PlatformType::Invalid,
        }
    })
}
}
/// Guard shared by all clones of `ObsRuntime`; dropping it shuts OBS down.
///
/// With the `enable_runtime` feature the drop sends a terminate command to the
/// OBS thread; without it the drop runs the OBS shutdown directly.
#[derive(Debug)]
pub struct _ObsRuntimeGuard {
/// Join handle of the OBS thread, taken out and joined when the guard is
/// dropped. Unused (hence the `dead_code` allowance) when blocking drops are
/// disabled outside of test builds.
#[cfg(feature = "enable_runtime")]
#[cfg_attr(
all(
feature = "no_blocking_drops",
not(feature = "__test_environment"),
not(test)
),
allow(dead_code)
)]
handle: Arc<Mutex<Option<JoinHandle<()>>>>,
/// Sender used to deliver the terminate command to the OBS thread on drop.
#[cfg(feature = "enable_runtime")]
command_sender: Arc<Sender<ObsCommand>>,
}
#[cfg(feature = "enable_runtime")]
impl Drop for _ObsRuntimeGuard {
    /// Asks the OBS thread to terminate and, unless blocking drops are
    /// disabled (outside of test builds), waits for it to finish.
    ///
    /// # Panics
    /// Unless the current thread is already panicking, panics if the terminate
    /// command cannot be sent, if the join handle was already taken, or if the
    /// OBS thread itself panicked.
    fn drop(&mut self) {
        log::trace!("Dropping ObsRuntime and shutting down OBS thread");

        // Always try to send the command, but never panic on top of an
        // unwinding panic.
        let r = self.command_sender.send(ObsCommand::Terminate);
        if thread::panicking() {
            return;
        }
        r.expect("Failed to send termination command to OBS thread");

        #[cfg(any(
            not(feature = "no_blocking_drops"),
            test,
            feature = "__test_environment"
        ))]
        {
            let mut slot = match self.handle.lock() {
                Ok(slot) => slot,
                Err(_) => {
                    log::error!("Failed to lock OBS thread handle for shutdown");
                    return;
                }
            };
            let handle = slot.take().expect("Handle can not be empty");

            // The last runtime handle can be released on the OBS thread itself
            // (e.g. by a queued closure that owns a clone). A thread joining
            // itself panics or deadlocks, so detach instead; the terminate
            // command sent above still ends the command loop.
            if handle.thread().id() == thread::current().id() {
                log::warn!("ObsRuntime dropped on the OBS thread; not joining it");
                return;
            }

            handle.join().expect("Failed to join OBS thread");
        }
    }
}
#[cfg(not(feature = "enable_runtime"))]
impl Drop for _ObsRuntimeGuard {
fn drop(&mut self) {
log::trace!("Dropping ObsRuntime and shutting down OBS thread");
let r = unsafe { ObsRuntime::shutdown_inner() };
if thread::panicking() {
return;
}
r.unwrap();
}
}