use crate::mpsc::Mpsc;
use crate::preempt::PreemptState;
use crate::queue::{QueueKey, TaskId};
use futures::task::ArcWake;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
/// Shared per-task bookkeeping: identity, queue placement, lifecycle flags,
/// and the handles needed to schedule the task again.
#[derive(Debug)]
pub struct TaskHeader<K: QueueKey> {
    /// Identifier pushed onto `ingress_tx` when the task is enqueued.
    id: TaskId,
    /// Queue key, handed back to callers by `qid()`.
    qid: K,
    /// Queue index passed to `PreemptState::would_preempt` on enqueue.
    qidx: usize,
    /// `true` once an enqueue has claimed the task; reset through `set_queued`.
    queued: AtomicBool,
    /// Set by `set_done`; a done task is never enqueued again.
    done: AtomicBool,
    /// Set by `cancel`; a cancelled task is never enqueued again.
    cancelled: AtomicBool,
    /// Queue that receives this task's id on a successful enqueue.
    ingress_tx: Arc<Mpsc<TaskId>>,
    /// Optional preemption hook consulted after a successful enqueue.
    preempt_state: Option<Arc<PreemptState>>,
}
impl<K: QueueKey> TaskHeader<K> {
    /// Builds a header for task `id`, placed in queue `qid` at index `qidx`.
    ///
    /// All three lifecycle flags (`queued`, `done`, `cancelled`) start out
    /// `false`. `ingress_tx` receives the task id on every successful
    /// [`enqueue`](Self::enqueue); `preempt_state`, when present, is consulted
    /// right after such a push.
    pub fn new(
        id: TaskId,
        qid: K,
        qidx: usize,
        ingress_tx: Arc<Mpsc<TaskId>>,
        preempt_state: Option<Arc<PreemptState>>,
    ) -> Self {
        Self {
            id,
            qid,
            qidx,
            queued: AtomicBool::new(false),
            done: AtomicBool::new(false),
            cancelled: AtomicBool::new(false),
            ingress_tx,
            preempt_state,
        }
    }

    /// Returns the queue key this task was created with.
    pub fn qid(&self) -> K {
        self.qid
    }

    /// Schedules the task by pushing its id onto the ingress queue.
    ///
    /// Nothing happens when the task is done, cancelled, or already marked as
    /// queued. After a push, the preempt state (if any) is asked whether this
    /// task's queue index would preempt, and a preemption is requested if so.
    #[inline]
    pub fn enqueue(&self) {
        // A finished or cancelled task must not be handed to the queue again.
        let retired = self.is_done() || self.is_cancelled();
        if retired {
            return;
        }

        // `swap` yields the previous flag value, so only the caller that flips
        // it from `false` to `true` goes on to push the id.
        let already_queued = self.queued.swap(true, Ordering::AcqRel);
        if already_queued {
            return;
        }

        self.ingress_tx.push(self.id);

        if let Some(state) = self.preempt_state.as_ref() {
            if state.would_preempt(self.qidx) {
                state.request_preempt();
            }
        }
    }

    /// Marks the task as cancelled.
    ///
    /// Only the flag is written here; later [`enqueue`](Self::enqueue) calls
    /// observe it and return without pushing.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    /// Reports whether [`cancel`](Self::cancel) has been called.
    #[inline]
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Acquire)
    }

    /// Reports whether [`set_done`](Self::set_done) has been called.
    #[inline]
    pub fn is_done(&self) -> bool {
        self.done.load(Ordering::Acquire)
    }

    /// Marks the task as done, which makes later
    /// [`enqueue`](Self::enqueue) calls return without pushing.
    pub fn set_done(&self) {
        self.done.store(true, Ordering::Release);
    }

    /// Overwrites the `queued` flag.
    ///
    /// Storing `false` allows a subsequent [`enqueue`](Self::enqueue) to push
    /// the task id again.
    #[inline]
    pub fn set_queued(&self, queued: bool) {
        self.queued.store(queued, Ordering::Release);
    }
}
/// Waking a task through its `Arc` handle forwards to `TaskHeader::enqueue`.
impl<K: QueueKey> ArcWake for TaskHeader<K> {
    #[inline]
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Borrow the header out of the `Arc`; waking takes no extra reference.
        let header: &Self = arc_self;
        header.enqueue();
    }
}