pub(crate) mod activity;
pub(crate) mod exec;
pub(crate) mod iac;
use crate::nut::exec::Deferred;
use crate::nut::iac::subscription::OnDelete;
use crate::*;
use crate::{debug::DebugTypeName, nut::exec::inchoate::InchoateActivityContainer};
use core::any::Any;
use core::sync::atomic::AtomicBool;
use exec::fifo::ThreadLocalFifo;
use iac::managed_state::*;
use std::cell::RefCell;
use self::iac::{
publish::{BroadcastInfo, ResponseTracker},
subscription::Subscriptions,
};
thread_local!(static NUT: Nut = Nut::new());
pub(crate) const IMPOSSIBLE_ERR_MSG: &str =
"Bug in nuts. It should be impossible to trigger this panic through any combinations of library calls.";
#[derive(Default)]
struct Nut {
activities: RefCell<ActivityContainer>,
managed_state: RefCell<ManagedState>,
subscriptions: Subscriptions,
deferred_events: ThreadLocalFifo<Deferred>,
response_tracker: RefCell<ResponseTracker>,
executing: AtomicBool,
inchoate_activities: RefCell<InchoateActivityContainer>,
#[cfg(debug_assertions)]
active_activity_name: std::cell::Cell<Option<DebugTypeName>>,
}
pub(crate) type Handler = Box<dyn Fn(&mut ActivityContainer, &mut ManagedState)>;
impl Nut {
fn new() -> Self {
Default::default()
}
fn quiescent(&self) -> bool {
!self.executing.load(std::sync::atomic::Ordering::Relaxed)
}
fn add_on_delete(&self, id: UncheckedActivityId, subscription: OnDelete) {
if self.quiescent() {
self.activities
.try_borrow_mut()
.expect(IMPOSSIBLE_ERR_MSG)
.add_on_delete(id, subscription);
} else {
self.deferred_events
.push(Deferred::OnDeleteSubscription(id, subscription))
}
}
pub(crate) fn with_response_tracker_mut<T>(f: impl FnOnce(&mut ResponseTracker) -> T) -> T {
NUT.with(|nut| {
let mut response_tracker = nut
.response_tracker
.try_borrow_mut()
.expect(IMPOSSIBLE_ERR_MSG);
f(&mut *response_tracker)
})
}
}
pub(crate) fn new_activity<A>(
activity: A,
domain_index: DomainId,
status: LifecycleStatus,
) -> ActivityId<A>
where
A: Activity,
{
NUT.with(|nut| {
if !nut.executing.load(std::sync::atomic::Ordering::Relaxed) {
nut.managed_state
.try_borrow_mut()
.expect(IMPOSSIBLE_ERR_MSG)
.prepare(domain_index);
nut.inchoate_activities
.try_borrow_mut()
.expect(IMPOSSIBLE_ERR_MSG)
.inc_offset();
nut.activities
.try_borrow_mut()
.expect(IMPOSSIBLE_ERR_MSG)
.add(activity, domain_index, status)
} else {
nut.deferred_events.push(Deferred::FlushInchoateActivities);
let a = nut
.inchoate_activities
.try_borrow_mut()
.expect(IMPOSSIBLE_ERR_MSG)
.add(activity, domain_index, status);
#[cfg(feature = "verbose-debug-log")]
#[cfg(debug_assertions)]
debug_print!(
"Added activity as inchoate. ID = {} ({} complete activities, {} inchoate activities exist)",
a.id.index,
nut.inchoate_activities.try_borrow().expect(IMPOSSIBLE_ERR_MSG).offset(),
nut.inchoate_activities.try_borrow().expect(IMPOSSIBLE_ERR_MSG).len() - 1
);
a
}
})
}
pub(crate) fn publish_custom<MSG: Any>(a: MSG) {
NUT.with(|nut| nut.broadcast(BroadcastInfo::global(a, Topic::public_message::<MSG>())))
}
pub(crate) fn send_custom<RECV: Any, MSG: Any>(a: MSG) {
NUT.with(|nut| {
nut.broadcast(BroadcastInfo::local_by_type::<RECV, MSG>(
a,
Topic::private_message::<MSG>(),
))
})
}
pub(crate) fn send_custom_by_id<MSG: Any>(msg: MSG, id: UncheckedActivityId) {
NUT.with(|nut| {
nut.broadcast(BroadcastInfo::local(
msg,
id,
Topic::private_message::<MSG>(),
))
})
}
pub(crate) async fn publish_custom_and_await<A: Any>(a: A) {
NUT.with(move |nut| nut.publish_and_await(a)).await;
}
pub(crate) fn register<A, F, MSG>(id: ActivityId<A>, f: F, filter: SubscriptionFilter)
where
A: Activity,
F: Fn(&mut A, &MSG) + 'static,
MSG: Any,
{
NUT.with(|nut| {
let closure = ManagedState::pack_closure::<_, _, MSG>(f, id, filter);
let topic = Topic::public_message::<MSG>();
nut.push_closure(topic, id, closure);
});
}
pub(crate) fn register_mut<A, F, MSG>(id: ActivityId<A>, f: F, filter: SubscriptionFilter)
where
A: Activity,
F: Fn(&mut A, &mut MSG) + 'static,
MSG: Any,
{
NUT.with(|nut| {
let closure = ManagedState::pack_closure_mut::<_, _, MSG>(f, id, filter);
let topic = Topic::public_message::<MSG>();
nut.push_closure(topic, id, closure);
});
}
pub(crate) fn register_owned<A, F, MSG>(id: ActivityId<A>, f: F, filter: SubscriptionFilter)
where
A: Activity,
F: Fn(&mut A, MSG) + 'static,
MSG: Any,
{
NUT.with(|nut| {
let closure = ManagedState::pack_closure_owned::<_, _, MSG>(f, id, filter);
let topic = Topic::private_message::<MSG>();
nut.push_closure(topic, id, closure);
});
}
pub(crate) fn register_no_payload<A, F>(
id: ActivityId<A>,
f: F,
topic: Topic,
filter: SubscriptionFilter,
) where
A: Activity,
F: Fn(&mut A) + 'static,
{
NUT.with(|nut| {
let closure = ManagedState::pack_closure_no_payload(f, id, filter);
nut.push_closure(topic, id, closure);
});
}
pub(crate) fn register_domained<A, F, MSG>(id: ActivityId<A>, f: F, filter: SubscriptionFilter)
where
A: Activity,
F: Fn(&mut A, &mut DomainState, &MSG) + 'static,
MSG: Any,
{
NUT.with(|nut| {
let closure = ManagedState::pack_domained_closure(f, id, filter);
let topic = Topic::public_message::<MSG>();
nut.push_closure(topic, id, closure);
});
}
pub(crate) fn register_domained_mut<A, F, MSG>(id: ActivityId<A>, f: F, filter: SubscriptionFilter)
where
A: Activity,
F: Fn(&mut A, &mut DomainState, &mut MSG) + 'static,
MSG: Any,
{
NUT.with(|nut| {
let closure = ManagedState::pack_domained_closure_mut(f, id, filter);
let topic = Topic::public_message::<MSG>();
nut.push_closure(topic, id, closure);
});
}
pub(crate) fn register_domained_owned<A, F, MSG>(
id: ActivityId<A>,
f: F,
filter: SubscriptionFilter,
) where
A: Activity,
F: Fn(&mut A, &mut DomainState, MSG) + 'static,
MSG: Any,
{
NUT.with(|nut| {
let closure = ManagedState::pack_domained_closure_owned(f, id, filter);
let topic = Topic::private_message::<MSG>();
nut.push_closure(topic, id, closure);
});
}
pub(crate) fn register_domained_no_payload<A, F>(
id: ActivityId<A>,
f: F,
topic: Topic,
filter: SubscriptionFilter,
) where
A: Activity,
F: Fn(&mut A, &mut DomainState) + 'static,
{
NUT.with(|nut| {
let closure = ManagedState::pack_closure_domained_no_payload(f, id, filter);
nut.push_closure(topic, id, closure);
});
}
pub(crate) fn register_on_delete<A, F>(id: ActivityId<A>, f: F)
where
A: Activity,
F: FnOnce(A) + 'static,
{
NUT.with(|nut| {
let closure = Box::new(|a: Box<dyn Any>| {
let activity = a.downcast().expect(IMPOSSIBLE_ERR_MSG);
f(*activity);
});
nut.add_on_delete(id.into(), OnDelete::Simple(closure));
})
}
pub(crate) fn register_domained_on_delete<A, F>(id: ActivityId<A>, f: F)
where
A: Activity,
F: FnOnce(A, &mut DomainState) + 'static,
{
NUT.with(|nut| {
let closure = Box::new(move |a: Box<dyn Any>, managed_state: &mut ManagedState| {
let activity = a.downcast().expect(IMPOSSIBLE_ERR_MSG);
let domain = managed_state
.get_mut(id.domain_index)
.expect("missing domain");
f(*activity, domain);
});
let subscription = OnDelete::WithDomain(closure);
nut.add_on_delete(id.into(), subscription);
})
}
pub(crate) fn set_status(id: UncheckedActivityId, status: LifecycleStatus) {
NUT.with(|nut| nut.set_status(id, status));
}
pub(crate) fn write_domain<D, T>(domain: &D, data: T)
where
D: DomainEnumeration,
T: core::any::Any,
{
NUT.with(|nut| {
let id = DomainId::new(domain);
if let Ok(mut managed_state) = nut.managed_state.try_borrow_mut() {
managed_state.prepare(id);
let storage = managed_state.get_mut(id).expect("No domain");
storage.store(data);
} else {
let event = Deferred::DomainStore(DomainStoreData::new(id, data));
nut.deferred_events.push(event);
}
})
}
#[cfg(debug_assertions)]
pub(crate) fn nuts_panic_info() -> Option<String> {
NUT.try_with(|nut| {
let mut info = String::new();
info.push_str("NUTS panic hook: Panicked while ");
if let Some(activity) = nut.active_activity_name.get() {
info.push_str(activity.0);
} else {
info.push_str("no");
}
info.push_str(" activity was active.\n");
info
})
.ok()
}