use crate::nut::activity::LifecycleChange;
use crate::nut::iac::publish::{BroadcastInfo, ResponseSlot};
use crate::nut::Nut;
use crate::DomainStoreData;
use crate::UncheckedActivityId;
pub(crate) mod fifo;
pub(crate) mod inchoate;
pub(crate) enum Deferred {
Broadcast(BroadcastInfo),
BroadcastAwaitingResponse(BroadcastInfo, ResponseSlot),
Subscription(NewSubscription),
OnDeleteSubscription(UncheckedActivityId, OnDelete),
LifecycleChange(LifecycleChange),
RemoveActivity(UncheckedActivityId),
DomainStore(DomainStoreData),
FlushInchoateActivities,
}
use core::sync::atomic::Ordering;
use super::{
iac::subscription::{NewSubscription, OnDelete},
IMPOSSIBLE_ERR_MSG,
};
impl Nut {
pub(crate) fn catch_up_deferred_to_quiescence(&self) {
if !self.executing.swap(true, Ordering::Relaxed) {
#[cfg(feature = "verbose-debug-log")]
debug_print!("Start Executing from quiescent moment");
self.unchecked_catch_up_deferred_to_quiescence();
self.executing.store(false, Ordering::Relaxed);
#[cfg(feature = "verbose-debug-log")]
debug_print!("Quiescence Reached");
}
}
fn unchecked_catch_up_deferred_to_quiescence(&self) {
while let Some(deferred) = self.deferred_events.pop() {
#[cfg(debug_assertions)]
let debug_message = format!("Executing: {:?}", deferred);
#[cfg(feature = "verbose-debug-log")]
#[cfg(debug_assertions)]
debug_print!("{}", debug_message);
#[cfg(feature = "verbose-debug-log")]
#[cfg(debug_assertions)]
if self.deferred_events.len() > 0 {
let events = self.deferred_events.events_debug_list();
debug_print!(
"{} more events in queue: {}",
self.deferred_events.len(),
events
);
}
#[cfg(not(debug_assertions))]
self.exec_deferred(deferred);
#[cfg(debug_assertions)]
if let Err(panic_info) =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
self.exec_deferred(deferred)
}))
{
log_print!("Panic ocurred while nuts was executing. {}", debug_message);
log_print!(
"Activity executing right now: {:?}",
self.active_activity_name.get()
);
std::panic::resume_unwind(panic_info);
}
}
}
fn exec_deferred(&self, deferred: Deferred) {
match deferred {
Deferred::Broadcast(b) => self.unchecked_broadcast(b),
Deferred::BroadcastAwaitingResponse(b, slot) => {
self.unchecked_broadcast(b);
Nut::with_response_tracker_mut(|rt| rt.done(&slot));
}
Deferred::Subscription(sub) => {
self.subscriptions.exec_new_subscription(sub);
}
Deferred::OnDeleteSubscription(id, sub) => {
self.activities
.try_borrow_mut()
.expect(IMPOSSIBLE_ERR_MSG)
.add_on_delete(id, sub);
}
Deferred::LifecycleChange(lc) => self.unchecked_lifecycle_change(&lc),
Deferred::RemoveActivity(id) => self.delete_activity(id),
Deferred::DomainStore(d) => self.exec_domain_store(d),
Deferred::FlushInchoateActivities => self
.inchoate_activities
.try_borrow_mut()
.expect(IMPOSSIBLE_ERR_MSG)
.flush(&mut *self.activities.try_borrow_mut().expect(IMPOSSIBLE_ERR_MSG)),
}
}
}
impl Into<Deferred> for BroadcastInfo {
fn into(self) -> Deferred {
Deferred::Broadcast(self)
}
}
impl Into<Deferred> for LifecycleChange {
fn into(self) -> Deferred {
Deferred::LifecycleChange(self)
}
}
#[cfg(debug_assertions)]
impl std::fmt::Debug for Deferred {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Broadcast(b) => write!(f, "Broadcasting {:?}", b),
Self::BroadcastAwaitingResponse(b, _rs) => write!(f, "Broadcasting {:?}", b),
Self::Subscription(sub) => write!(f, "{:?}", sub),
Self::OnDeleteSubscription(_id, _) => {
write!(f, "Adding new on delete listener {}", _id.index)
}
Self::LifecycleChange(lc) => write!(f, "{:?}", lc),
Self::RemoveActivity(_id) => write!(f, "Delete activity {}.", _id.index),
Self::DomainStore(ds) => write!(f, "{:?}", ds),
Self::FlushInchoateActivities => write!(f, "Adding new activities previously deferred"),
}
}
}