use std::cell::RefCell;
use crate::tracer::Tracer;
use crate::reactor::EventId;
// Per-module trace switch.
// NOTE(review): presumably consulted by the `modtrace!` macro used throughout
// this file to turn tracing of this module on or off — confirm against the
// macro definition, which is not part of this file.
const MODTRACE: bool = true;
/// Identifier of a oneshot channel; a thin wrapper around a raw `u32`.
#[derive(Copy, Clone, Eq, PartialEq)]
pub(crate) struct OneshotId(u32);

impl std::fmt::Debug for OneshotId {
    /// Renders the id as `oneshot:<raw number>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let OneshotId(raw) = self;
        write!(f, "oneshot:{}", raw)
    }
}
/// Registration data recorded for one peer (sender or receiver) of a oneshot
/// channel when `reg_sender()` / `reg_receiver()` is called.
#[derive(Debug, Clone)]
struct RegInfo {
/// Type-erased pointer supplied at registration. `exchange::<T>()` later
/// reinterprets it as `*mut Option<T>`.
data: *mut (),
/// Event id reported by `get_awake_event_id()` for this peer.
event_id: EventId,
}
impl RegInfo {
fn new(data: *mut (), event_id: EventId) -> Self {
RegInfo {
data,
event_id,
}
}
}
/// Lifecycle state of one side (sender or receiver) of a oneshot channel.
#[derive(Clone)]
enum PeerState {
/// Initial state assigned by `OneshotNode::new()`.
Created,
/// Set by `reg_sender()` / `reg_receiver()`; carries the registration data.
Registered(RegInfo),
/// Set by `exchange()` once it has processed this side.
Exchanged,
/// Set by `cancel_sender()` / `cancel_receiver()`.
Dropped,
}
impl std::fmt::Debug for PeerState {
    /// Writes only the variant name; the payload of `Registered` is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            PeerState::Created => "Created",
            PeerState::Registered(..) => "Registered",
            PeerState::Exchanged => "Exchanged",
            PeerState::Dropped => "Dropped",
        };
        f.write_str(label)
    }
}
/// Bookkeeping record for a single oneshot channel: its id plus the state of
/// both peers.
#[derive(Clone)]
struct OneshotNode {
/// Identifier handed out by `create()`.
id: OneshotId,
/// State of the sending side.
sender: PeerState,
/// State of the receiving side.
receiver: PeerState,
/// Set to `true` by `exchange()` when the value swap was performed with both
/// sides registered. `exchange()` returns it when the sender is processed
/// after the receiver was already dropped, and the `Debug` output uses it to
/// print `D*)` instead of `D)`.
recv_exchanged: bool,
}
impl OneshotNode {
fn new(id: u32) -> Self {
Self {
id: OneshotId(id),
sender: PeerState::Created,
receiver: PeerState::Created,
recv_exchanged: false,
}
}
fn can_be_dropped(&self) -> bool {
match (&self.receiver, &self.sender) {
(PeerState::Dropped, PeerState::Dropped) => true,
_ => false,
}
}
}
impl std::fmt::Debug for OneshotNode {
    /// Prints a compact `<sender>-><receiver>` picture of the node, e.g.
    /// `(C->C)`, `(R->R}` or `{R->D*)`.
    ///
    /// Each side is shown by the first letter of its state (`C`, `R`, `E`,
    /// `D`). A dropped receiver is printed as `D*` when the exchange was
    /// recorded (`recv_exchanged`) and the sender is still `Registered`.
    ///
    /// NOTE(review): the curly brace (instead of a parenthesis) appears to mark
    /// the registered side whose event id `get_event_id_for_node()` would
    /// report — confirm with the author.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let sender_part = match (&self.sender, &self.receiver) {
            (PeerState::Created, _) => "(C->",
            (PeerState::Registered(..), PeerState::Registered(..)) => "(R->",
            (PeerState::Registered(..), PeerState::Created) => "(R->",
            (PeerState::Registered(..), _) => "{R->",
            (PeerState::Exchanged, _) => "(E->",
            (PeerState::Dropped, _) => "(D->",
        };

        let receiver_part = match (&self.receiver, &self.sender) {
            (PeerState::Created, _) => "C)",
            (PeerState::Registered(..), PeerState::Created) => "R)",
            (PeerState::Registered(..), _) => "R}",
            (PeerState::Exchanged, _) => "E)",
            (PeerState::Dropped, PeerState::Registered(..)) if self.recv_exchanged => "D*)",
            (PeerState::Dropped, _) => "D)",
        };

        f.write_str(sender_part)?;
        f.write_str(receiver_part)
    }
}
/// Crate-internal handle to the oneshot channel bookkeeping. Its methods take
/// `&self` and forward to `InnerOneshotRt` through the `RefCell`.
pub(crate) struct OneshotRt {
/// The actual state; borrowed for the duration of each forwarded call.
inner: RefCell<InnerOneshotRt>,
}
impl OneshotRt {
    /// Builds a runtime whose inner state is initialised from `tracer`.
    pub(crate) fn new(tracer: &Tracer) -> Self {
        let inner = RefCell::new(InnerOneshotRt::new(tracer));
        Self { inner }
    }

    /// Allocates a new oneshot node and returns its id.
    ///
    /// # Panics
    /// Panics if the inner state is already borrowed.
    pub(crate) fn create(&self) -> OneshotId {
        let mut rt = self.inner.borrow_mut();
        rt.create()
    }

    /// Records the sender of `oneshot_id` as registered with the given event
    /// id and type-erased data pointer.
    ///
    /// # Panics
    /// Panics if the inner state is already borrowed or the id is unknown.
    pub(crate) fn reg_sender(
        &self,
        oneshot_id: OneshotId,
        event_id: EventId,
        data: *mut (),
    ) {
        let mut rt = self.inner.borrow_mut();
        rt.reg_sender(oneshot_id, event_id, data);
    }

    /// Records the receiver of `oneshot_id` as registered with the given event
    /// id and type-erased data pointer.
    ///
    /// # Panics
    /// Panics if the inner state is already borrowed or the id is unknown.
    pub(crate) fn reg_receiver(
        &self,
        oneshot_id: OneshotId,
        event_id: EventId,
        data: *mut (),
    ) {
        let mut rt = self.inner.borrow_mut();
        rt.reg_receiver(oneshot_id, event_id, data);
    }

    /// Returns the event id reported by the first node that has one, or
    /// `None` when no node does.
    ///
    /// # Panics
    /// Panics if the inner state is currently mutably borrowed.
    pub(crate) fn get_awake_event_id(&self) -> Option<EventId> {
        let rt = self.inner.borrow();
        rt.get_awake_event_id()
    }

    /// Forwards to the inner `exchange::<T>()` and returns its result.
    ///
    /// # Safety
    /// When both peers are registered, the data pointers passed to
    /// `reg_sender()` / `reg_receiver()` for this oneshot are reinterpreted as
    /// `*mut Option<T>` and their contents are swapped. The caller must ensure
    /// both pointers refer to live, valid `Option<T>` values of this same `T`.
    ///
    /// # Panics
    /// Panics if the inner state is already borrowed, the id is unknown, or
    /// the node is in a state the inner `exchange()` does not accept.
    pub(crate) unsafe fn exchange<T>(&self, oneshot_id: OneshotId) -> bool {
        let mut rt = self.inner.borrow_mut();
        rt.exchange::<T>(oneshot_id)
    }

    /// Marks the sender of `oneshot_id` as dropped.
    ///
    /// # Panics
    /// Panics if the inner state is already borrowed or the id is unknown.
    pub(crate) fn cancel_sender(&self, oneshot_id: OneshotId) {
        let mut rt = self.inner.borrow_mut();
        rt.cancel_sender(oneshot_id);
    }

    /// Marks the receiver of `oneshot_id` as dropped.
    ///
    /// # Panics
    /// Panics if the inner state is already borrowed or the id is unknown.
    pub(crate) fn cancel_receiver(&self, oneshot_id: OneshotId) {
        let mut rt = self.inner.borrow_mut();
        rt.cancel_receiver(oneshot_id);
    }
}
/// State behind `OneshotRt`: the list of live oneshot nodes and the id counter.
struct InnerOneshotRt {
/// Nodes currently tracked; a node is removed once both of its peers are
/// `Dropped`.
nodes: Vec<OneshotNode>,
/// Most recently issued raw id; `create()` increments it (wrapping) before use.
last_id: u32,
/// Clone of the tracer passed to `new()`, handed to the `modtrace!` calls.
tracer: Tracer,
}
impl InnerOneshotRt {
fn new(tracer: &Tracer) -> Self {
InnerOneshotRt {
nodes: Vec::new(),
last_id: 0,
tracer: tracer.clone(),
}
}
fn find_index(&self, oneshot_id: OneshotId) -> usize {
self.nodes
.iter()
.position(|node| node.id == oneshot_id)
.unwrap()
}
fn set_sender(&mut self, oneshot_id: OneshotId, sender: PeerState, log_context: &str) {
let idx = self.find_index(oneshot_id);
let old = self.nodes[idx].clone();
self.nodes[idx] = OneshotNode {
id: old.id,
sender: sender,
receiver: old.receiver.clone(),
recv_exchanged: old.recv_exchanged,
};
modtrace!(
self.tracer,
"oneshot_rt: {:?} state {:?} -> {:?} ({})",
oneshot_id,
old,
self.nodes[idx],
log_context
);
if self.nodes[idx].can_be_dropped() {
modtrace!(
self.tracer,
"oneshot_rt: remove {:?} from idx {}",
oneshot_id,
idx
);
self.nodes.remove(idx);
}
}
fn set_receiver(&mut self, oneshot_id: OneshotId, receiver: PeerState, log_context: &str) {
let idx = self.find_index(oneshot_id);
let old = self.nodes[idx].clone();
self.nodes[idx] = OneshotNode {
id: old.id,
sender: old.sender.clone(),
receiver: receiver,
recv_exchanged: old.recv_exchanged,
};
modtrace!(
self.tracer,
"oneshot_rt: {:?} state {:?} -> {:?} ({})",
oneshot_id,
old,
self.nodes[idx],
log_context
);
if self.nodes[idx].can_be_dropped() {
self.nodes.remove(idx);
modtrace!(
self.tracer,
"oneshot_rt: remove {:?} from idx {}",
oneshot_id,
idx
);
}
}
fn set_receiver_ext(
&mut self,
oneshot_id: OneshotId,
receiver: PeerState,
recv_exchanged: bool,
log_context: &str,
) {
let idx = self.find_index(oneshot_id);
let old = self.nodes[idx].clone();
self.nodes[idx] = OneshotNode {
id: old.id,
sender: old.sender.clone(),
receiver: receiver,
recv_exchanged: recv_exchanged,
};
modtrace!(
self.tracer,
"oneshot_rt: {:?} state {:?} -> {:?} ({})",
oneshot_id,
old,
self.nodes[idx],
log_context
);
}
fn create(&mut self) -> OneshotId {
self.last_id = self.last_id.wrapping_add(1);
self.nodes.push(OneshotNode::new(self.last_id));
OneshotId(self.last_id)
}
fn reg_sender(
&mut self,
oneshot_id: OneshotId,
event_id: EventId,
data: *mut (),
) {
let reg_info = RegInfo::new(data, event_id);
self.set_sender(
oneshot_id,
PeerState::Registered(reg_info),
"by reg_sender()",
);
}
fn reg_receiver(
&mut self,
oneshot_id: OneshotId,
event_id: EventId,
data: *mut (),
) {
let reg_info = RegInfo::new(data, event_id);
self.set_receiver(
oneshot_id,
PeerState::Registered(reg_info),
"by reg_receiver()",
);
}
fn get_awake_event_id(&self) -> Option<EventId> {
self.nodes
.iter()
.find_map(|node| Self::get_event_id_for_node(&node))
}
fn get_event_id_for_node(node: &OneshotNode) -> Option<EventId> {
if matches!(node.sender, PeerState::Created) {
return None;
}
if matches!(node.receiver, PeerState::Created) {
return None;
}
match &node.receiver {
PeerState::Registered(ref rx_reg_info) => {
return Some(rx_reg_info.event_id);
}
_ => (),
}
match &node.sender {
PeerState::Registered(ref tx_reg_info) => {
return Some(tx_reg_info.event_id);
}
_ => (),
}
debug_assert!(
match (&node.sender, &node.receiver) {
(PeerState::Created, PeerState::Exchanged)
| (PeerState::Exchanged, PeerState::Created)
| (PeerState::Exchanged, PeerState::Registered(..)) => false,
_ => true,
},
concat!(
"aiur: oneshot::get_awake_event_id() invoked in unexpected state. ",
"Sender: {:?}, receiver: {:?}"
),
node.sender,
node.receiver,
);
return None;
}
unsafe fn exhange_impl<T>(tx_data: *mut (), rx_data: *mut (), tracer: &Tracer) {
let tx_data = std::mem::transmute::<*mut (), *mut Option<T>>(tx_data);
let rx_data = std::mem::transmute::<*mut (), *mut Option<T>>(rx_data);
std::mem::swap(&mut *tx_data, &mut *rx_data);
modtrace!(tracer, "oneshot_rt: exchange<T> mem::swap() just happened");
}
pub(crate) unsafe fn exchange<T>(&mut self, oneshot_id: OneshotId) -> bool {
let node = self.nodes[self.find_index(oneshot_id)].clone();
match (&node.sender, &node.receiver) {
(PeerState::Registered(..), PeerState::Exchanged) => {
self.set_sender(oneshot_id, PeerState::Exchanged, "by exchange()");
return true;
}
(PeerState::Registered(..), PeerState::Dropped) => {
self.set_sender(oneshot_id, PeerState::Exchanged, "by exchange()");
return node.recv_exchanged;
}
(PeerState::Dropped, PeerState::Registered(..)) => {
self.set_receiver(oneshot_id, PeerState::Exchanged, "by exchange()");
return false;
}
(PeerState::Registered(ref tx), PeerState::Registered(ref rx)) => {
Self::exhange_impl::<T>(tx.data, rx.data, &self.tracer);
self.set_receiver_ext(oneshot_id, PeerState::Exchanged, true, "by exchange()");
return true;
}
_ =>
{
panic!(
concat!(
"aiur: oneshot::exhange() invoked in unexpected state. ",
"Sender: {:?}, receiver: {:?}"
),
node.sender, node.receiver
)
}
}
}
pub(crate) fn cancel_sender(&mut self, oneshot_id: OneshotId) {
self.set_sender(oneshot_id, PeerState::Dropped, "by cancel_sender()");
}
pub(crate) fn cancel_receiver(&mut self, oneshot_id: OneshotId) {
self.set_receiver(oneshot_id, PeerState::Dropped, "by cancel_receiver()");
}
}