use core::{future::poll_fn, task::Poll};
use ax_errno::{AxError, AxResult};
use axpoll::{IoEvents, Pollable};
/// Drives the I/O operation `f` until it yields something other than
/// [`AxError::WouldBlock`].
///
/// Each poll first calls `f`:
///
/// * `Ok(value)` or any error other than `WouldBlock` completes the future
///   with that result.
/// * `WouldBlock` with `non_blocking == true` completes the future with
///   `Err(AxError::WouldBlock)` without registering for a wakeup.
/// * `WouldBlock` with `non_blocking == false` registers the current task on
///   `pollable` for `events`, calls `f` once more, and only returns
///   `Poll::Pending` if that second attempt also reports `WouldBlock`.
///
/// The polling future is wrapped in `super::interruptible`; an error produced
/// by that wrapper is propagated to the caller through `?`.
///
/// # Parameters
///
/// * `pollable` - object the task is registered on while waiting.
/// * `events` - events passed to `pollable.register`.
/// * `non_blocking` - when `true`, never wait; surface `WouldBlock` directly.
/// * `f` - the operation to attempt; it may be called several times.
///
/// # Errors
///
/// Returns whatever error `f` reports (including `WouldBlock` in
/// non-blocking mode), or the error reported by `super::interruptible`.
pub async fn poll_io<P: Pollable, F: FnMut() -> AxResult<T>, T>(
    pollable: &P,
    events: IoEvents,
    non_blocking: bool,
    mut f: F,
) -> AxResult<T> {
    super::interruptible(poll_fn(move |cx| match f() {
        Ok(value) => Poll::Ready(Ok(value)),
        Err(AxError::WouldBlock) => {
            // A non-blocking caller is answered immediately and never waits,
            // so decide this before registering: a waker registered here
            // would be left behind with nobody waiting on it.
            if non_blocking {
                return Poll::Ready(Err(AxError::WouldBlock));
            }
            pollable.register(cx, events);
            // Retry after registering so that readiness arriving between the
            // first attempt and the registration is not missed.
            match f() {
                Ok(value) => Poll::Ready(Ok(value)),
                Err(AxError::WouldBlock) => Poll::Pending,
                Err(e) => Poll::Ready(Err(e)),
            }
        }
        Err(e) => Poll::Ready(Err(e)),
    }))
    .await?
}
/// Registers `waker` to be woken after interrupt line `irq` has fired.
///
/// The interrupt handler installed here does no waking itself: it only marks
/// the line as pending and notifies a dedicated drain task. That task, which
/// is spawned on the first call, collects the poll sets of all pending lines
/// and wakes them after releasing the registry lock.
///
/// Lines numbered `MAX_TRACKED_IRQ` (256) or above are not tracked; such a
/// call logs a warning and returns without registering anything.
///
/// On every successful call the waker is added to the line's poll set and the
/// line is enabled via `ax_hal::irq::set_enable`.
///
/// # Panics
///
/// Panics if installing the shared IRQ action fails the first time a given
/// line is registered.
#[cfg(feature = "irq")]
pub fn register_irq_waker(irq: usize, waker: &core::task::Waker) {
    use alloc::{collections::BTreeMap, string::String, sync::Arc, vec::Vec};
    use core::{
        ptr::NonNull,
        sync::atomic::{AtomicBool, Ordering},
    };

    use ax_kspin::SpinNoIrq;
    use axpoll::PollSet;

    use crate::WaitQueue;

    /// Number of interrupt lines that have a pending/installed flag.
    const MAX_TRACKED_IRQ: usize = 256;

    /// Per-line flag set by the handler and cleared by the drain task.
    static IRQ_PENDING: [AtomicBool; MAX_TRACKED_IRQ] =
        [const { AtomicBool::new(false) }; MAX_TRACKED_IRQ];
    /// Per-line flag recording that the shared handler was requested.
    static IRQ_ACTION_INSTALLED: [AtomicBool; MAX_TRACKED_IRQ] =
        [const { AtomicBool::new(false) }; MAX_TRACKED_IRQ];
    /// Summary flag: at least one line was marked pending since the last drain.
    static ANY_PENDING: AtomicBool = AtomicBool::new(false);
    /// Queue the drain task waits on between passes.
    static DRAIN_WQ: WaitQueue = WaitQueue::new();
    /// Set once the drain task has been spawned.
    static DRAIN_SPAWNED: AtomicBool = AtomicBool::new(false);
    /// Poll sets holding the registered wakers, keyed by interrupt line.
    static POLL_IRQ: SpinNoIrq<BTreeMap<usize, Arc<PollSet>>> = SpinNoIrq::new(BTreeMap::new());

    /// Shared interrupt action: flags the line as pending and notifies the
    /// drain task. Lines outside the tracked range are acknowledged untouched.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably only meant to be invoked by the IRQ subsystem
    /// as a registered action — confirm against `request_shared_irq`.
    unsafe fn irq_waker_handler(
        ctx: ax_hal::irq::IrqContext,
        _data: NonNull<()>,
    ) -> ax_hal::irq::IrqReturn {
        if let Some(flag) = IRQ_PENDING.get(ctx.irq.0) {
            flag.store(true, Ordering::Release);
            ANY_PENDING.store(true, Ordering::Release);
            DRAIN_WQ.notify_one(false);
        }
        ax_hal::irq::IrqReturn::Handled
    }

    /// One drain pass: clears every pending flag, gathers the poll sets of the
    /// lines that were pending while holding the registry lock, then wakes
    /// them once the lock has been released.
    fn wake_pending_sets() {
        let mut ready_sets: Vec<Arc<PollSet>> = Vec::new();
        {
            let registry = POLL_IRQ.lock();
            ready_sets.extend(
                IRQ_PENDING
                    .iter()
                    .enumerate()
                    .filter(|(_, flag)| flag.swap(false, Ordering::AcqRel))
                    .filter_map(|(line, _)| registry.get(&line).cloned()),
            );
        }
        for poll_set in ready_sets {
            poll_set.wake();
        }
    }

    /// Spawns the drain task unless an earlier call already did so.
    fn ensure_drain_spawned() {
        let spawn_needed = DRAIN_SPAWNED
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if !spawn_needed {
            return;
        }
        crate::spawn_raw(
            || loop {
                // Sleep until the handler reports at least one pending line.
                DRAIN_WQ.wait_until(|| ANY_PENDING.swap(false, Ordering::AcqRel));
                wake_pending_sets();
            },
            String::from("irq_waker_drain"),
            0x4000,
        );
    }

    if irq >= MAX_TRACKED_IRQ {
        warn!(
            "register_irq_waker: IRQ {irq} exceeds MAX_TRACKED_IRQ={MAX_TRACKED_IRQ}; ignoring \
             registration to avoid silently dropping wakeups"
        );
        return;
    }

    ensure_drain_spawned();

    // Only the first registration for a line requests the shared action.
    let install_needed = IRQ_ACTION_INSTALLED[irq]
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_ok();
    if install_needed {
        ax_hal::irq::request_shared_irq(irq, irq_waker_handler, NonNull::dangling())
            .expect("axtask IRQ-waker bridge could not install shared IRQ action");
    }

    {
        // Scope the guard so the registry lock is released before enabling
        // the line.
        let mut registry = POLL_IRQ.lock();
        let poll_set = registry
            .entry(irq)
            .or_insert_with(|| Arc::new(PollSet::new()));
        poll_set.register(waker);
    }

    ax_hal::irq::set_enable(irq, true);
}