use alloc::{
collections::{btree_map::BTreeMap, vec_deque::VecDeque},
sync::{Arc, Weak},
vec::Vec,
};
use core::{
future::poll_fn,
ops::Deref,
sync::atomic::AtomicBool,
task::{Poll, Waker},
time::Duration,
};
use ax_errno::AxResult;
use ax_kspin::SpinNoIrq;
use ax_memory_addr::VirtAddr;
use ax_sync::Mutex;
use ax_task::{
current,
future::{self, block_on, interruptible},
};
use hashbrown::HashMap;
use crate::{
mm::{AddrSpace, Backend, SharedPages},
task::AsThread,
};
#[derive(Default)]
pub struct WaitQueue {
queue: SpinNoIrq<VecDeque<(Waker, u32)>>,
}
impl WaitQueue {
pub fn new() -> Self {
Self::default()
}
pub fn wait_if(
&self,
bitset: u32,
timeout: Option<Duration>,
condition: impl FnOnce() -> bool,
) -> AxResult<bool> {
let mut condition = Some(condition);
block_on(interruptible(future::timeout(
timeout,
poll_fn(|cx| {
if let Some(cond) = condition.take() {
let mut queue = self.queue.lock();
if !cond() {
Poll::Ready(Ok(false))
} else {
queue.push_back((cx.waker().clone(), bitset));
Poll::Pending
}
} else {
Poll::Ready(Ok(true))
}
}),
)))??
}
pub fn wake(&self, count: usize, mask: u32) -> usize {
let mut woke = 0;
self.queue.lock().retain(|(waker, bitset)| {
if woke >= count || (bitset & mask) == 0 {
true
} else {
waker.wake_by_ref();
woke += 1;
false
}
});
woke
}
pub fn is_empty(&self) -> bool {
self.queue.lock().is_empty()
}
pub fn requeue(&self, mut count: usize, target: &WaitQueue) -> usize {
let tasks: Vec<_> = {
let mut wq = self.queue.lock();
count = count.min(wq.len());
wq.drain(..count).collect()
};
if !tasks.is_empty() {
let mut wq = target.queue.lock();
wq.extend(tasks);
}
count
}
}
pub enum FutexKey {
Private {
address: usize,
},
Shared {
offset: usize,
region: Result<Weak<SharedPages>, Weak<()>>,
},
}
impl FutexKey {
pub fn new(aspace: &AddrSpace, address: usize) -> Self {
if let Some(area) = aspace.find_area(VirtAddr::from_usize(address)) {
match area.backend() {
Backend::Shared(backend) => {
return Self::Shared {
offset: address - area.start().as_usize(),
region: Ok(Arc::downgrade(backend.pages())),
};
}
Backend::File(file) => {
return Self::Shared {
offset: address - area.start().as_usize(),
region: Err(file.futex_handle()),
};
}
_ => {}
}
}
Self::Private { address }
}
pub fn new_current(address: usize) -> Self {
Self::new(¤t().as_thread().proc_data.aspace.lock(), address)
}
fn as_usize(&self) -> usize {
match self {
FutexKey::Private { address } => *address,
FutexKey::Shared { offset, .. } => *offset,
}
}
}
pub struct FutexEntry {
pub wq: WaitQueue,
pub owner_dead: AtomicBool,
}
impl FutexEntry {
fn new() -> Self {
Self {
wq: WaitQueue::new(),
owner_dead: AtomicBool::new(false),
}
}
}
pub struct FutexTable(Mutex<HashMap<usize, Arc<FutexEntry>>>);
impl FutexTable {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
Self(Mutex::new(HashMap::new()))
}
pub fn is_empty(&self) -> bool {
self.0.lock().is_empty()
}
pub fn get(&self, key: &FutexKey) -> Option<FutexGuard<'_>> {
let key = key.as_usize();
let entry = self.0.lock().get(&key).cloned()?;
Some(FutexGuard {
table: self,
key,
inner: entry,
})
}
pub fn get_or_insert(&self, key: &FutexKey) -> FutexGuard<'_> {
let key = key.as_usize();
let mut table = self.0.lock();
let entry = table
.entry(key)
.or_insert_with(|| Arc::new(FutexEntry::new()));
FutexGuard {
table: self,
key,
inner: entry.clone(),
}
}
}
#[doc(hidden)]
pub struct FutexGuard<'a> {
table: &'a FutexTable,
key: usize,
inner: Arc<FutexEntry>,
}
impl Deref for FutexGuard<'_> {
type Target = Arc<FutexEntry>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl Drop for FutexGuard<'_> {
fn drop(&mut self) {
if Arc::strong_count(&self.inner) <= 2 && self.inner.wq.is_empty() {
self.table.0.lock().remove(&self.key);
}
}
}
struct FutexTables {
map: BTreeMap<usize, Arc<FutexTable>>,
operations: usize,
}
impl FutexTables {
const fn new() -> Self {
Self {
map: BTreeMap::new(),
operations: 0,
}
}
fn get_or_insert(&mut self, key: usize) -> Arc<FutexTable> {
self.operations += 1;
if self.operations == 100 {
self.operations = 0;
self.map
.retain(|_, table| Arc::strong_count(table) > 1 || !table.is_empty());
}
self.map
.entry(key)
.or_insert_with(|| Arc::new(FutexTable::new()))
.clone()
}
}
static SHARED_FUTEX_TABLES: Mutex<FutexTables> = Mutex::new(FutexTables::new());
pub fn futex_table_for(key: &FutexKey) -> Arc<FutexTable> {
match key {
FutexKey::Private { .. } => current().as_thread().proc_data.futex_table.clone(),
FutexKey::Shared { region, .. } => {
let ptr = match region {
Ok(pages) => Weak::as_ptr(pages) as usize,
Err(key) => Weak::as_ptr(key) as usize,
};
SHARED_FUTEX_TABLES.lock().get_or_insert(ptr)
}
}
}