#![deny(unsafe_op_in_unsafe_fn)]
#![deny(clippy::undocumented_unsafe_blocks)]
#![allow(clippy::expl_impl_clone_on_copy)]
use std::{
cell::Cell,
fmt, ptr,
sync::{Arc, atomic::Ordering},
};
use crate::{
Context, JsNativeError, JsResult, JsValue,
builtins::{
array_buffer::{SharedArrayBuffer, utils::SliceRef},
promise::ResolvingFunctions,
typed_array::Element,
},
job::{NativeAsyncJob, TimeoutJob},
js_string,
sys::time::{Duration, Instant},
};
use std::sync::{Condvar, Mutex, MutexGuard};
use boa_string::JsString;
use intrusive_collections::{LinkedList, LinkedListLink, UnsafeRef, intrusive_adapter};
use small_btree::{Entry, SmallBTreeMap};
use futures_channel::oneshot;
/// Outcome of a wait operation on a shared buffer address.
#[derive(Debug, Clone, Copy)]
pub(super) enum AtomicsWaitResult {
/// The value read from the buffer differed from the expected value, so no wait happened.
NotEqual,
/// The wait ended because its timeout elapsed (or, for async waits, the timeout was zero).
TimedOut,
/// The waiter was notified. `wait_async` also returns this value after it has successfully
/// registered an asynchronous waiter.
Ok,
}
impl AtomicsWaitResult {
    /// Converts this result into the string handed back to script code:
    /// `"ok"`, `"timed-out"` or `"not-equal"`.
    pub(super) fn to_js_string(self) -> JsString {
        match self {
            Self::Ok => js_string!("ok"),
            Self::TimedOut => js_string!("timed-out"),
            Self::NotEqual => js_string!("not-equal"),
        }
    }
}
/// Extra state carried by a waiter that was registered asynchronously.
#[derive(Debug)]
struct AsyncWaiterData {
// Never read in this file. NOTE(review): presumably held so the buffer (whose address keys
// the waiter) stays alive while the waiter is registered — confirm.
_buffer: SharedArrayBuffer,
// Channel end used to deliver the final wait result to the job awaiting the receiver.
sender: oneshot::Sender<AtomicsWaitResult>,
}
/// A single entry of a waiter list.
///
/// Nodes are linked intrusively into the list stored for their `addr`; the lists hold raw
/// pointers to the nodes, not owned values.
struct FutexWaiter {
// Intrusive list hook; `is_linked()` tells whether the node is currently in a waiter list.
link: LinkedListLink,
// Signalled when the node is notified; synchronous waits block on it.
cond_var: Condvar,
// Address the waiter is waiting on; also the key of its list in `FutexWaiters`.
addr: usize,
// `Some` only for waiters built with `new_async`; taken exactly once when the waiter is
// notified or removed.
async_data: Cell<Option<AsyncWaiterData>>,
}
impl fmt::Debug for FutexWaiter {
    /// Formats the waiter, deliberately leaving out `async_data` (reported as a
    /// non-exhaustive struct).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure so that adding a field forces this impl to be revisited.
        let Self {
            link,
            cond_var,
            addr,
            async_data: _,
        } = self;

        let mut out = f.debug_struct("FutexWaiter");
        out.field("link", link);
        out.field("cond_var", cond_var);
        out.field("addr", addr);
        out.finish_non_exhaustive()
    }
}
// Adapter that lets `FutexWaiter` nodes be stored in an intrusive `LinkedList` through their
// `link` field. The list stores `UnsafeRef`s, i.e. raw pointers whose validity is the
// responsibility of whoever inserts the node.
intrusive_adapter!(FutexWaiterAdapter = UnsafeRef<FutexWaiter>: FutexWaiter { link: LinkedListLink });
impl FutexWaiter {
    /// Creates an unlinked waiter for a blocking wait on `addr` (no async data).
    fn new_sync(addr: usize) -> Self {
        Self::unlinked(addr, None)
    }

    /// Creates an unlinked, reference-counted waiter for an asynchronous wait on `addr`,
    /// carrying `data` until the waiter is notified or removed.
    #[allow(
        clippy::arc_with_non_send_sync,
        reason = "across threads we only access the fields that are `Sync`"
    )]
    fn new_async(data: AsyncWaiterData, addr: usize) -> Arc<FutexWaiter> {
        Arc::new(Self::unlinked(addr, Some(data)))
    }

    /// Shared constructor: builds a waiter that is not yet part of any list.
    fn unlinked(addr: usize, async_data: Option<AsyncWaiterData>) -> Self {
        Self {
            link: LinkedListLink::new(),
            cond_var: Condvar::new(),
            addr,
            async_data: Cell::new(async_data),
        }
    }
}
/// Table of all waiter lists, keyed by the address being waited on.
///
/// An entry is removed as soon as its list becomes empty.
struct FutexWaiters {
// Address -> intrusive list of the waiters currently registered on that address.
waiters: SmallBTreeMap<usize, LinkedList<FutexWaiterAdapter>, 16>,
}
// SAFETY: The only `FutexWaiters` value constructed in this file lives inside the static mutex
// in `FutexWaiters::get`, so the raw node pointers stored in the lists are only ever followed
// by the thread that currently holds that lock.
// NOTE(review): async nodes carry `AsyncWaiterData`, which is taken and dropped by whichever
// thread notifies them — confirm its contents are fine to use from another thread.
unsafe impl Send for FutexWaiters {}
impl FutexWaiters {
fn get() -> JsResult<MutexGuard<'static, Self>> {
static CRITICAL_SECTION: Mutex<FutexWaiters> = Mutex::new(FutexWaiters {
waiters: SmallBTreeMap::new(),
});
CRITICAL_SECTION.lock().map_err(|_| {
JsNativeError::typ()
.with_message("failed to synchronize with the agent cluster")
.into()
})
}
unsafe fn add_waiter(&mut self, node: &FutexWaiter) {
let node = unsafe { UnsafeRef::from_raw(ptr::from_ref(node)) };
self.waiters
.entry(node.addr)
.or_insert_with(|| LinkedList::new(FutexWaiterAdapter::new()))
.push_back(node);
}
unsafe fn add_async_waiter(&mut self, node: Arc<FutexWaiter>) {
let node = unsafe { UnsafeRef::from_raw(Arc::into_raw(node)) };
self.waiters
.entry(node.addr)
.or_insert_with(|| LinkedList::new(FutexWaiterAdapter::new()))
.push_back(node);
}
fn notify_many(&mut self, addr: usize, max_count: u64) -> u64 {
let Entry::Occupied(mut wl) = self.waiters.entry(addr) else {
return 0;
};
for i in 0..max_count {
let Some(elem) = wl.get_mut().pop_front() else {
wl.remove();
return i;
};
elem.cond_var.notify_one();
if let Some(async_data) = elem.async_data.take() {
unsafe { Arc::from_raw(UnsafeRef::into_raw(elem)) };
let _ = async_data.sender.send(AtomicsWaitResult::Ok);
}
}
if wl.get().is_empty() {
wl.remove();
}
max_count
}
#[track_caller]
pub(super) unsafe fn remove_waiter(&mut self, node: &FutexWaiter) {
debug_assert!(node.link.is_linked());
let Entry::Occupied(mut wl) = self.waiters.entry(node.addr) else {
panic!("node was not a valid `FutexWaiter`");
};
let node = unsafe {
let Some(node) = wl
.get_mut()
.cursor_mut_from_ptr(ptr::from_ref(node))
.remove()
else {
panic!("node was not a valid `FutexWaiter`")
};
node
};
if let Some(async_data) = node.async_data.take() {
unsafe {
Arc::from_raw(UnsafeRef::into_raw(node));
}
let _ = async_data.sender.send(AtomicsWaitResult::TimedOut);
}
if wl.get().is_empty() {
wl.remove();
}
}
}
/// Blocks the current thread on the address of `buffer[offset..]` until it is notified or
/// `timeout` elapses (`None` waits without a time limit).
///
/// Returns `NotEqual` without blocking when the value read at `offset` differs from `check`,
/// `Ok` when the waiter was unlinked from its list while waiting, and `TimedOut` when the
/// timeout ran out first.
///
/// # Errors
///
/// Returns a type error if the global waiter lock is (or becomes) poisoned.
///
/// # Safety
///
/// The function performs an `E::read` on the bytes starting at `offset`; the caller must
/// uphold the requirements of that call.
/// NOTE(review): presumably `offset` must be in bounds and suitably aligned for `E` — confirm
/// against `Element::read`.
pub(super) unsafe fn wait<E: Element + PartialEq>(
buffer: &SharedArrayBuffer,
buf_len: usize,
offset: usize,
check: E,
timeout: Option<Duration>,
) -> JsResult<AtomicsWaitResult> {
let buffer = &buffer.bytes_with_len(buf_len)[offset..];
// The lock is taken before the value is read and held until the waiter is registered, so a
// `notify` (which takes the same lock) cannot run between the comparison and the insertion.
let mut waiters = FutexWaiters::get()?;
// SAFETY: upheld by the caller of this function.
let value = unsafe { E::read(SliceRef::AtomicSlice(buffer)).load(Ordering::SeqCst) };
if check != value {
return Ok(AtomicsWaitResult::NotEqual);
}
// Remember when the wait started so the remaining time can be recomputed after each wake-up.
let timeout_time = timeout.map(|to| (Instant::now(), to));
let waiter = FutexWaiter::new_sync(buffer.as_ptr().addr());
// SAFETY: `waiter` lives in this stack frame and is not moved. On every `Ok` return path it
// has been unlinked (by a notifier or by `remove_waiter` below). The `?` paths only trigger
// when the lock is poisoned, after which `FutexWaiters::get` always fails, so the stale
// pointer can no longer be reached.
unsafe {
waiters.add_waiter(&waiter);
}
let result = loop {
// A notifier unlinks the node before signalling it, so "unlinked" means "notified".
// Checking in a loop also covers wake-ups that were not caused by a notification.
if !waiter.link.is_linked() {
break AtomicsWaitResult::Ok;
}
if let Some((start, timeout)) = timeout_time {
// `checked_sub` yields `None` once the elapsed time exceeds the timeout.
let Some(remaining) = timeout.checked_sub(start.elapsed()) else {
break AtomicsWaitResult::TimedOut;
};
// Waiting releases the lock and hands it back on wake-up.
waiters = waiter
.cond_var
.wait_timeout(waiters, remaining)
.map_err(|_| {
JsNativeError::typ()
.with_message("failed to synchronize with the agent cluster")
})?
.0;
} else {
waiters = waiter.cond_var.wait(waiters).map_err(|_| {
JsNativeError::typ().with_message("failed to synchronize with the agent cluster")
})?;
}
};
// After a timeout the node is still in the list and must be removed before this frame dies.
if waiter.link.is_linked() {
// SAFETY: the node is linked (checked just above) and was inserted under its own `addr`
// by `add_waiter` above, while the same lock has been held or re-acquired since.
unsafe {
waiters.remove_waiter(&waiter);
}
}
drop(waiters);
Ok(result)
}
/// Registers an asynchronous waiter on the address of `buffer[offset..]`.
///
/// Returns synchronously with:
/// - `NotEqual` if the value read at `offset` differs from `check`;
/// - `TimedOut` if `timeout` is zero;
/// - `Ok` once the waiter has been registered. In that case a job is enqueued on `context`
///   that calls `functions.resolve` with the final result string (`"ok"` when notified,
///   `"timed-out"` when the timeout job removed the waiter first).
///
/// # Errors
///
/// Returns a type error if the global waiter lock is poisoned.
///
/// # Safety
///
/// The function performs an `E::read` on the bytes starting at `offset`; the caller must
/// uphold the requirements of that call.
/// NOTE(review): presumably `offset` must be in bounds and suitably aligned for `E` — confirm
/// against `Element::read`.
pub(super) unsafe fn wait_async<E: Element + PartialEq>(
    buffer: &SharedArrayBuffer,
    buf_len: usize,
    offset: usize,
    check: E,
    timeout: Option<Duration>,
    functions: ResolvingFunctions,
    context: &mut Context,
) -> JsResult<AtomicsWaitResult> {
    let buf = &buffer.bytes_with_len(buf_len)[offset..];

    // The lock is held from the value check until the end of the function, so a `notify`
    // cannot run between the comparison and the registration of the waiter.
    let mut waiters = FutexWaiters::get()?;

    // SAFETY: upheld by the caller of this function.
    let value = unsafe { E::read(SliceRef::AtomicSlice(buf)).load(Ordering::SeqCst) };
    if check != value {
        return Ok(AtomicsWaitResult::NotEqual);
    }

    // A zero timeout can never be notified in time; report it without registering anything.
    if timeout.as_ref().is_some_and(|t| t.is_zero()) {
        return Ok(AtomicsWaitResult::TimedOut);
    }

    let (sender, receiver) = oneshot::channel();

    let waiter = FutexWaiter::new_async(
        AsyncWaiterData {
            _buffer: buffer.clone(),
            sender,
        },
        buf.as_ptr().addr(),
    );

    // The timeout job only keeps a weak handle, so it cannot keep an already completed
    // waiter alive.
    let weak_waiter = Arc::downgrade(&waiter);

    // SAFETY: the node was just created, so it is not linked anywhere, and it carries async
    // data, so the strong reference handed to the list is released when the node is notified
    // or removed.
    unsafe {
        waiters.add_async_waiter(waiter);
    }

    let timeout_cancel = if let Some(timeout) = timeout {
        let job = TimeoutJob::with_realm(
            move |_| {
                let mut waiters = FutexWaiters::get()?;

                // If the upgrade fails the waiter was already completed and released.
                if let Some(waiter) = weak_waiter.upgrade()
                    && waiter.link.is_linked()
                {
                    // SAFETY: the node is linked (checked just above, under the lock) and was
                    // inserted under its own `addr` by `add_async_waiter`.
                    unsafe {
                        waiters.remove_waiter(&waiter);
                    }
                }

                Ok(JsValue::undefined())
            },
            context.realm().clone(),
            timeout,
        );

        let tc = job.cancelled_flag();
        context.enqueue_job(job.into());
        Some(tc)
    } else {
        None
    };

    context.enqueue_job(
        NativeAsyncJob::new(async move |context| {
            // An `Err` here means the sender was dropped without sending a result; there is
            // nothing to resolve the promise with in that case.
            if let Ok(result) = receiver.await {
                functions
                    .resolve
                    .call(
                        &JsValue::undefined(),
                        &[result.to_js_string().into()],
                        &mut context.borrow_mut(),
                    )
                    .expect("default resolving functions cannot error");
            }

            // The wait is over either way; flag the pending timeout job as cancelled.
            if let Some(flag) = timeout_cancel {
                flag.set();
            }

            Ok(JsValue::undefined())
        })
        .into(),
    );

    Ok(AtomicsWaitResult::Ok)
}
/// Wakes at most `count` waiters registered on the address of byte `offset` of `buffer` and
/// returns how many were woken.
///
/// # Errors
///
/// Returns a type error if the global waiter lock is poisoned.
pub(super) fn notify(buffer: &SharedArrayBuffer, offset: usize, count: u64) -> JsResult<u64> {
    let target = buffer.as_ptr().addr() + offset;

    // The guard is a temporary of this statement, so the lock is released before returning.
    let woken = FutexWaiters::get()?.notify_many(target, count);

    Ok(woken)
}