use core::{future::poll_fn, task::Poll};
use ax_errno::{AxError, AxResult};
use axpoll::{IoEvents, Pollable};
/// Drives the non-blocking operation `f` to completion as an async function.
///
/// `f` is called on every poll. A result other than `AxError::WouldBlock`
/// (success or any other error) completes the future with that result.
///
/// # Arguments
///
/// * `pollable` - object on which the task's waker is registered while waiting.
/// * `events` - the I/O events passed to `pollable.register`.
/// * `non_blocking` - when `true`, a `WouldBlock` from `f` is returned to the
///   caller immediately instead of waiting.
/// * `f` - the operation to attempt; it must report "not ready yet" as
///   `AxError::WouldBlock`. It may be invoked twice within a single poll.
///
/// # Errors
///
/// Returns `AxError::WouldBlock` if `non_blocking` is set and `f` would block,
/// and any other error produced by `f`. An error produced by
/// `super::interruptible` is propagated as well (presumably when the waiting
/// task is interrupted; confirm against that helper).
pub async fn poll_io<P: Pollable, F: FnMut() -> AxResult<T>, T>(
    pollable: &P,
    events: IoEvents,
    non_blocking: bool,
    mut f: F,
) -> AxResult<T> {
    super::interruptible(poll_fn(move |cx| match f() {
        Ok(value) => Poll::Ready(Ok(value)),
        Err(AxError::WouldBlock) => {
            // Decide this before touching `pollable`: a non-blocking caller
            // never waits, so registering its waker would only leave a stale
            // entry behind and cause a spurious wakeup later.
            if non_blocking {
                return Poll::Ready(Err(AxError::WouldBlock));
            }
            pollable.register(cx, events);
            // Retry once after registering so that readiness arriving between
            // the first attempt and the registration is not missed.
            match f() {
                Ok(value) => Poll::Ready(Ok(value)),
                Err(AxError::WouldBlock) => Poll::Pending,
                Err(e) => Poll::Ready(Err(e)),
            }
        }
        Err(e) => Poll::Ready(Err(e)),
    }))
    .await?
}
/// Registers `waker` to be woken after interrupt line `irq` fires.
///
/// The interrupt hook itself only sets atomic flags and notifies a wait
/// queue. A dedicated task (`irq_waker_drain`), spawned on the first call,
/// later looks up the `PollSet` of every flagged IRQ and wakes it.
///
/// IRQ numbers at or above `MAX_TRACKED_IRQ` are not supported: a warning is
/// logged and nothing is registered.
///
/// # Panics
///
/// Panics if `ax_hal::irq::register_irq_hook` reports that the hook could not
/// be installed.
#[cfg(feature = "irq")]
pub fn register_irq_waker(irq: usize, waker: &core::task::Waker) {
    use alloc::{collections::BTreeMap, sync::Arc};
    use core::sync::atomic::{AtomicBool, Ordering};

    use ax_kspin::SpinNoIrq;
    use axpoll::PollSet;

    use crate::WaitQueue;

    /// Exclusive upper bound on the IRQ numbers that have a pending flag.
    const MAX_TRACKED_IRQ: usize = 256;

    /// One "fired since last drain" flag per tracked IRQ number.
    static IRQ_PENDING: [AtomicBool; MAX_TRACKED_IRQ] =
        [const { AtomicBool::new(false) }; MAX_TRACKED_IRQ];
    /// Set whenever any flag in `IRQ_PENDING` has been raised.
    static ANY_PENDING: AtomicBool = AtomicBool::new(false);
    /// Wait queue the drain task sleeps on.
    static DRAIN_WQ: WaitQueue = WaitQueue::new();
    /// One-shot guard: the drain task has been spawned.
    static DRAIN_SPAWNED: AtomicBool = AtomicBool::new(false);
    /// One-shot guard: hook installation has been attempted.
    static HOOK_INSTALLED: AtomicBool = AtomicBool::new(false);
    /// Wakers registered so far, grouped by IRQ number.
    static POLL_IRQ: SpinNoIrq<BTreeMap<usize, Arc<PollSet>>> = SpinNoIrq::new(BTreeMap::new());

    /// Hook handed to `ax_hal`: flags `irq` as pending and notifies the drain
    /// task. It never takes the `POLL_IRQ` lock.
    fn irq_hook(irq: usize) {
        // Untracked IRQ numbers have no flag slot and are ignored.
        let Some(flag) = IRQ_PENDING.get(irq) else {
            return;
        };
        // Raise the per-IRQ flag before the summary flag so the drain task
        // finds it once it observes `ANY_PENDING`.
        flag.store(true, Ordering::Release);
        ANY_PENDING.store(true, Ordering::Release);
        DRAIN_WQ.notify_one(false);
    }

    /// One pass of the drain task: waits until some IRQ is flagged, then
    /// wakes the `PollSet` of every flagged IRQ that has one.
    fn drain_once() {
        DRAIN_WQ.wait_until(|| ANY_PENDING.swap(false, Ordering::AcqRel));

        // Collect the sets under the lock, but wake them after releasing it.
        let registry = POLL_IRQ.lock();
        let ready: alloc::vec::Vec<Arc<PollSet>> = IRQ_PENDING
            .iter()
            .enumerate()
            .filter(|(_, flag)| flag.swap(false, Ordering::AcqRel))
            .filter_map(|(line, _)| registry.get(&line).cloned())
            .collect();
        drop(registry);

        for set in ready {
            set.wake();
        }
    }

    /// Spawns the drain task the first time it is called; later calls are
    /// no-ops.
    fn ensure_drain_spawned() {
        let first_caller = DRAIN_SPAWNED
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if first_caller {
            crate::spawn_raw(
                || {
                    loop {
                        drain_once();
                    }
                },
                alloc::string::String::from("irq_waker_drain"),
                0x4000,
            );
        }
    }

    if irq >= MAX_TRACKED_IRQ {
        warn!(
            "register_irq_waker: IRQ {irq} exceeds MAX_TRACKED_IRQ={MAX_TRACKED_IRQ}; ignoring \
             registration to avoid silently dropping wakeups"
        );
        return;
    }

    // The drain task is spawned before the hook is installed.
    ensure_drain_spawned();

    let must_install_hook = HOOK_INSTALLED
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_ok();
    if must_install_hook {
        let installed = ax_hal::irq::register_irq_hook(irq_hook);
        assert!(
            installed,
            "axtask IRQ-waker bridge could not install its post-IRQ hook: axhal's single hook \
             slot is already claimed by another subsystem. Wakers registered here would never \
             fire."
        );
    }

    // Register the waker first and enable the line only afterwards; the lock
    // is released at the end of this scope.
    {
        let mut registry = POLL_IRQ.lock();
        let set = registry
            .entry(irq)
            .or_insert_with(|| Arc::new(PollSet::new()));
        set.register(waker);
    }
    ax_hal::irq::set_enable(irq, true);
}