use std::marker::PhantomData;
use crate::Message;
use crate::PROCESS;
use crate::ProcessItem;
use crate::ProcessMonitor;
use crate::Receivable;
use crate::SystemMessage;
use crate::deserialize_value;
/// Builder-style handle used to pull messages of type `T` out of the current process's mailbox.
///
/// The handle stores no message data itself; it only records how items that cannot be
/// interpreted as `T` should be treated while selecting or removing messages.
pub struct ProcessReceiver<T>
where
    T: Receivable,
{
    /// When `true`, items that cannot be interpreted as `T` are skipped and kept for later;
    /// when `false`, encountering such an item panics.
    ignore_type: bool,
    /// Zero-sized marker recording the message type `T`; no `T` value is stored.
    _message: PhantomData<T>,
}
impl<T> ProcessReceiver<T>
where
    T: Receivable,
{
    /// Creates a receiver that skips items which cannot be interpreted as `T`.
    pub(crate) fn new() -> ProcessReceiver<T> {
        Self {
            ignore_type: true,
            _message: PhantomData,
        }
    }

    /// Switches the receiver to strict mode: any item that cannot be interpreted as `T`
    /// causes a panic instead of being skipped.
    pub fn strict_type_checking(mut self) -> Self {
        self.ignore_type = false;
        self
    }

    /// Re-targets this receiver at message type `N`, carrying over the type-checking mode.
    pub fn for_message<N: Receivable>(self) -> ProcessReceiver<N> {
        ProcessReceiver::<N> {
            ignore_type: self.ignore_type,
            _message: PhantomData,
        }
    }

    /// Waits for, and returns, the first message accepted by `filter`.
    ///
    /// Items already held in the process's stored item list are scanned first, front to back;
    /// the first accepted one is removed from the list and returned. If none is accepted, items
    /// are awaited from the process's receiver one at a time until `filter` accepts one. Received
    /// items that the filter rejects (and, when type mismatches are ignored, received items that
    /// cannot be interpreted as `T`) are appended to the stored list so later calls can see them.
    ///
    /// `filter` may keep mutable state between invocations, matching what [`Self::remove`]
    /// accepts.
    ///
    /// # Panics
    ///
    /// Panics with `"Unsupported message type!"` if strict type checking is enabled and an item
    /// cannot be interpreted as `T`. Also panics if receiving from the process's receiver
    /// returns an error.
    pub async fn select<F>(self, mut filter: F) -> Message<T>
    where
        F: (FnMut(&Message<&T>) -> bool) + Send,
    {
        // Pass 1: items stored by earlier calls take priority over anything still queued.
        let stored = PROCESS.with(|process| {
            let mut items = process.items.borrow_mut();
            let found = items
                .iter_mut()
                .position(|item| match process_item::<T>(item) {
                    Ok(Some(message)) => filter(&message),
                    Ok(None) => false,
                    Err(_) => {
                        self.reject_type_mismatch();
                        false
                    }
                });

            found.map(|index| convert_item::<T>(items.remove(index)))
        });

        if let Some(message) = stored {
            return message;
        }

        // Pass 2: wait on the receiver until the filter accepts something.
        let receiver = PROCESS.with(|process| process.receiver.clone());

        loop {
            let mut item = receiver.recv_async().await.unwrap();

            match process_item::<T>(&mut item) {
                Ok(Some(message)) => {
                    if filter(&message) {
                        return convert_item::<T>(item);
                    }

                    // Rejected by the filter: keep it for a later call.
                    PROCESS.with(|process| process.items.borrow_mut().push(item));
                }
                // Nothing to deliver for this item; it is dropped.
                Ok(None) => {}
                Err(_) => {
                    self.reject_type_mismatch();
                    PROCESS.with(|process| process.items.borrow_mut().push(item));
                }
            }
        }
    }

    /// Discards every currently pending message accepted by `filter`, without waiting.
    ///
    /// The process's stored item list is filtered in place. Then every item yielded by draining
    /// the process's receiver is examined: accepted messages are dropped, while rejected ones
    /// (and, when type mismatches are ignored, items that cannot be interpreted as `T`) are
    /// appended to the stored list.
    ///
    /// # Panics
    ///
    /// Panics with `"Unsupported message type!"` if strict type checking is enabled and an item
    /// cannot be interpreted as `T`.
    pub fn remove<F: (FnMut(&Message<&T>) -> bool) + Send>(self, mut filter: F) {
        PROCESS.with(|process| {
            let mut items = process.items.borrow_mut();

            items.retain_mut(|item| match process_item::<T>(item) {
                Ok(Some(message)) => !filter(&message),
                Ok(None) => true,
                Err(_) => {
                    self.reject_type_mismatch();
                    true
                }
            });
        });

        let receiver = PROCESS.with(|process| process.receiver.clone());

        for mut item in receiver.drain() {
            // Decide first, then move `item`, so the borrow taken by `process_item` has ended.
            let keep = match process_item::<T>(&mut item) {
                Ok(Some(message)) => !filter(&message),
                Ok(None) => false,
                Err(_) => {
                    self.reject_type_mismatch();
                    true
                }
            };

            if keep {
                PROCESS.with(|process| process.items.borrow_mut().push(item));
            }
        }
    }

    /// Waits for, and returns, the next message, accepting whatever comes first.
    ///
    /// Equivalent to [`Self::select`] with a filter that always returns `true`, including its
    /// panic conditions.
    pub async fn receive(self) -> Message<T> {
        self.select(|_| true).await
    }

    /// Applies the type-mismatch policy in one place: panics when strict type checking is
    /// enabled, and otherwise returns so the caller can skip the item.
    fn reject_type_mismatch(&self) {
        if !self.ignore_type {
            panic!("Unsupported message type!")
        }
    }
}
/// Consumes an already-processed item and turns it into the owned message handed to the caller.
///
/// Only local user messages and system messages are expected here; every other variant is
/// treated as unreachable.
///
/// # Panics
///
/// Panics if a local user message does not hold a `T`, or if an unexpected variant is passed.
#[inline(always)]
fn convert_item<T: Receivable>(item: ProcessItem) -> Message<T> {
    match item {
        ProcessItem::SystemMessage(system) => Message::System(system),
        ProcessItem::UserLocalMessage(payload) => {
            let typed = payload.downcast::<T>().unwrap();
            Message::User(*typed)
        }
        ProcessItem::UserRemoteMessage(_)
        | ProcessItem::MonitorProcessDown(_, _, _)
        | ProcessItem::MonitorNodeDown(_, _)
        | ProcessItem::MonitorProcessUpdate(_, _)
        | ProcessItem::AliasDeactivated(_) => unreachable!(),
    }
}
/// Normalizes a mailbox item in place and returns a borrowed view of the message it carries.
///
/// * `Ok(Some(message))`: the item carries a user message of type `T` or a system message.
/// * `Ok(None)`: the item was bookkeeping only (a monitor update, an alias deactivation, or a
///   down notification whose monitor reference was not registered) and yields no message.
/// * `Err(())`: the item is a user message that could not be deserialized or downcast to `T`.
///
/// Remote user messages that deserialize are replaced by their local form, and monitor down
/// notifications that are delivered are replaced by the equivalent system message.
#[inline(always)]
fn process_item<T>(item: &mut ProcessItem) -> Result<Option<Message<&T>>, ()>
where
    T: Receivable,
{
    // Decode remote payloads in place; on failure the item is left untouched.
    if let ProcessItem::UserRemoteMessage(bytes) = item {
        let decoded: Result<T, _> = deserialize_value(bytes);

        match decoded {
            Ok(value) => *item = ProcessItem::UserLocalMessage(Box::new(value)),
            Err(_) => return Err(()),
        }
    }

    match item {
        // Replaced by the local form above, or rejected with an early return.
        ProcessItem::UserRemoteMessage(_) => unreachable!(),
        ProcessItem::UserLocalMessage(payload) => match payload.downcast_ref::<T>() {
            Some(typed) => Ok(Some(Message::User(typed))),
            None => Err(()),
        },
        ProcessItem::SystemMessage(system) => Ok(Some(Message::System(system.clone()))),
        ProcessItem::MonitorProcessDown(dest, reference, exit_reason) => {
            // Only deliver the notification if the monitor was still registered.
            let was_registered =
                PROCESS.with(|process| process.monitors.borrow_mut().remove(reference).is_some());
            if !was_registered {
                return Ok(None);
            }

            let notice = SystemMessage::ProcessDown(dest.clone(), *reference, exit_reason.clone());
            let delivered = Message::System(notice.clone());
            *item = ProcessItem::SystemMessage(notice);

            Ok(Some(delivered))
        }
        ProcessItem::MonitorNodeDown(node, reference) => {
            // Only deliver the notification if the monitor was still registered.
            let was_registered =
                PROCESS.with(|process| process.monitors.borrow_mut().remove(reference).is_some());
            if !was_registered {
                return Ok(None);
            }

            let notice = SystemMessage::NodeDown(node.clone(), *reference);
            let delivered = Message::System(notice.clone());
            *item = ProcessItem::SystemMessage(notice);

            Ok(Some(delivered))
        }
        ProcessItem::MonitorProcessUpdate(reference, pid) => {
            // Record the pid on the monitor entry, if that monitor is still registered.
            PROCESS.with(|process| {
                if let Some(entry) = process.monitors.borrow_mut().get_mut(reference) {
                    *entry = ProcessMonitor::ForProcess(Some(*pid));
                }
            });

            Ok(None)
        }
        ProcessItem::AliasDeactivated(id) => {
            PROCESS.with(|process| process.aliases.borrow_mut().remove(id));

            Ok(None)
        }
    }
}