use core::{future::poll_fn, task::Poll};
use ax_errno::{AxError, AxResult};
use axpoll::{IoEvents, Pollable};
use crate::current;
pub async fn poll_io<P: Pollable, F: FnMut() -> AxResult<T>, T>(
pollable: &P,
events: IoEvents,
non_blocking: bool,
mut f: F,
) -> AxResult<T> {
let curr = current();
poll_fn(move |cx| {
match f() {
Ok(value) => return Poll::Ready(Ok(value)),
Err(AxError::WouldBlock) => {}
Err(e) => return Poll::Ready(Err(e)),
}
pollable.register(cx, events);
match f() {
Ok(value) => Poll::Ready(Ok(value)),
Err(AxError::WouldBlock) if non_blocking => Poll::Ready(Err(AxError::WouldBlock)),
Err(AxError::WouldBlock) => {
if curr.poll_interrupt(cx).is_ready() {
Poll::Ready(Err(AxError::Interrupted))
} else {
Poll::Pending
}
}
Err(e) => Poll::Ready(Err(e)),
}
})
.await
}
#[cfg(feature = "irq")]
pub fn register_irq_waker(irq: usize, waker: &core::task::Waker) {
use alloc::{collections::BTreeMap, sync::Arc};
use core::{
ptr::NonNull,
sync::atomic::{AtomicBool, Ordering},
};
use ax_kspin::SpinNoIrq;
use axpoll::PollSet;
use crate::IrqNotify;
static IRQ_NOTIFY: IrqNotify = IrqNotify::new();
static DRAIN_SPAWNED: AtomicBool = AtomicBool::new(false);
static IRQ_STATE: SpinNoIrq<BTreeMap<usize, IrqPollState>> = SpinNoIrq::new(BTreeMap::new());
struct IrqPollState {
pending: bool,
installed: bool,
poll: Arc<PollSet>,
}
unsafe fn irq_waker_handler(
ctx: ax_hal::irq::IrqContext,
_data: NonNull<()>,
) -> ax_hal::irq::IrqReturn {
if let Some(state) = IRQ_STATE.lock().get_mut(&ctx.irq.0) {
state.pending = true;
IRQ_NOTIFY.notify_irq();
}
ax_hal::irq::IrqReturn::Handled
}
fn ensure_drain_spawned() {
if DRAIN_SPAWNED
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return;
}
crate::spawn_raw(
|| {
loop {
IRQ_NOTIFY.wait();
let mut to_wake: alloc::vec::Vec<Arc<PollSet>> = alloc::vec::Vec::new();
{
let mut map = IRQ_STATE.lock();
for state in map.values_mut() {
if state.pending {
state.pending = false;
to_wake.push(state.poll.clone());
}
}
}
for set in to_wake {
unsafe { set.wake(axpoll::IoEvents::all()) };
}
}
},
alloc::string::String::from("irq_waker_drain"),
0x4000,
);
}
ensure_drain_spawned();
let (poll, should_install) = {
let mut map = IRQ_STATE.lock();
let state = map.entry(irq).or_insert_with(|| IrqPollState {
pending: false,
installed: false,
poll: Arc::new(PollSet::new()),
});
if state.installed {
(state.poll.clone(), false)
} else {
state.installed = true;
(state.poll.clone(), true)
}
};
unsafe { poll.register(waker, axpoll::IoEvents::all()) };
if should_install {
ax_hal::irq::request_shared_irq(irq, irq_waker_handler, NonNull::dangling())
.expect("axtask IRQ-waker bridge could not install shared IRQ action");
}
ax_hal::irq::set_enable(irq, true);
}