#![allow(clippy::arc_with_non_send_sync)]
#![allow(clippy::too_many_lines)]
use std::sync::{Arc, Weak};
use parking_lot::Mutex;
use smallvec::SmallVec;
use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink, NO_HANDLE};
use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
/// Mutable bookkeeping shared by the rules sink and the source sink of one
/// stratify branch.
///
/// The two `Option<HandleId>` fields hold handles that were retained through
/// the binding when they were stored; whoever takes a handle out is
/// responsible for releasing it or handing it on.
#[allow(clippy::struct_excessive_bools)]
struct StratifyState {
    /// Handle of the most recently recorded rules value, if any.
    latest_rules: Option<HandleId>,
    /// Handle of the source value waiting to be classified, if any.
    source_value: Option<HandleId>,
    /// Set by a tier-1 message from the source, cleared by its tier-3 message.
    source_dirty: bool,
    /// Set by a tier-1 message from the rules node, cleared by its tier-3 message.
    rules_dirty: bool,
    /// Set when the source has delivered a tier-3 message that has not yet
    /// been consumed by `try_resolve`.
    source_phase2: bool,
    /// Set once the source delivered a tier-5 or tier-6 message, or its
    /// subscription turned out to be dead.
    terminated: bool,
    /// Weak reference to the binding, used only in `drop` to release any
    /// handles still held.
    binding_weak: Weak<dyn BindingBoundary>,
}

impl Drop for StratifyState {
    /// Releases whatever handles are still stored, provided the binding is
    /// still alive. If the binding is already gone nothing is released.
    fn drop(&mut self) {
        let Some(boundary) = self.binding_weak.upgrade() else {
            return;
        };
        // Rules handle first, then the pending source value.
        let still_held = self
            .latest_rules
            .take()
            .into_iter()
            .chain(self.source_value.take());
        for handle in still_held {
            boundary.release_handle(handle);
        }
    }
}
/// Decides what to do with the pending source value once both inputs have
/// settled.
///
/// Returns `NotReady` (without touching the state) while either input is
/// dirty or the source has not delivered a tier-3 message since the last
/// resolution. Otherwise it consumes the `source_phase2` flag and the pending
/// source value, and:
///
/// * returns `ResolvedNoValue` when there is no pending value,
/// * returns `Drop(value)` when no rules have been recorded yet,
/// * otherwise asks the binding's classifier and returns `Emit(value)` on
///   `true`, `Drop(value)` on `false`.
///
/// The handle carried by `Emit`/`Drop` has been taken out of the state; the
/// caller decides how to dispose of it.
// NOTE(review): no `Option<Option<_>>` is visible in this signature, so the
// `option_option` allow below looks stale — confirm before removing.
#[allow(clippy::option_option)]
fn try_resolve(
    s: &mut StratifyState,
    bb: &Arc<dyn BindingBoundary>,
    classifier_fn_id: FnId,
) -> ResolveOutcome {
    let settled = !s.source_dirty && !s.rules_dirty && s.source_phase2;
    if !settled {
        return ResolveOutcome::NotReady;
    }

    // Consume the "source delivered" marker so the same delivery is not
    // resolved twice.
    s.source_phase2 = false;

    match (s.source_value.take(), s.latest_rules) {
        (None, _) => ResolveOutcome::ResolvedNoValue,
        (Some(pending), None) => ResolveOutcome::Drop(pending),
        (Some(pending), Some(active_rules)) => {
            let accepted =
                bb.invoke_stratify_classifier_fn(classifier_fn_id, active_rules, pending);
            if accepted {
                ResolveOutcome::Emit(pending)
            } else {
                ResolveOutcome::Drop(pending)
            }
        }
    }
}
/// Result of a `try_resolve` attempt, listed in the order the checks are made.
enum ResolveOutcome {
    /// An input is still dirty, or the source has not delivered since the
    /// last resolution; nothing was consumed.
    NotReady,
    /// Both inputs settled but there was no pending source value.
    ResolvedNoValue,
    /// The pending value was rejected (or no rules exist); the caller should
    /// release this handle.
    Drop(HandleId),
    /// The classifier accepted the pending value; the caller should emit
    /// this handle.
    Emit(HandleId),
}
/// Registers a producer node that re-emits values from `source` only when the
/// classifier identified by `classifier_fn_id` accepts them against the most
/// recently recorded value of `rules`.
///
/// The producer subscribes to both `rules` and `source`. Each sink processes a
/// batch of messages while holding the shared state lock, queues the resulting
/// actions, and performs them (handle releases, emissions, completion, error,
/// teardown) after the lock has been released.
///
/// Returns the `NodeId` yielded by `core.register_producer`.
///
/// # Panics
///
/// Panics with "stratify_branch: register_producer failed" if
/// `core.register_producer` does not succeed.
#[must_use]
pub fn stratify_branch(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
rules: NodeId,
classifier_fn_id: FnId,
) -> NodeId {
// Weak view of the binding for the state's `Drop` impl, so the state can
// release leftover handles without holding a strong reference.
let binding_weak_for_state: Weak<dyn BindingBoundary> =
Arc::downgrade(binding) as Weak<dyn BindingBoundary>;
// Build closure handed to the binding; it receives a `ProducerCtx` and wires
// up fresh state plus both subscriptions each time it runs.
// NOTE(review): presumably invoked when the producer is activated — confirm.
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
// State shared by the two sinks below.
let state: Arc<Mutex<StratifyState>> = Arc::new(Mutex::new(StratifyState {
latest_rules: None,
source_value: None,
source_dirty: false,
rules_dirty: false,
source_phase2: false,
terminated: false,
binding_weak: binding_weak_for_state.clone(),
}));
// ---- rules sink ----
let st_rules = state.clone();
let bb_rules: Arc<dyn BindingBoundary> = binding_s.clone();
// Despite the name, this is a clone of the emitter, not of the core.
let core_rules = em.clone();
let rules_sink: Sink = Arc::new(move |msgs| {
// Work queued under the lock and carried out after it is released.
enum Act {
ReleaseOldRules(HandleId),
Emit(HandleId),
Drop(HandleId),
}
let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
{
let mut s = st_rules.lock();
// Once terminated, rules messages are ignored entirely.
if s.terminated {
return;
}
for m in msgs {
if s.terminated {
break;
}
match m.tier() {
// Tier 1: mark the rules input as dirty.
1 => {
s.rules_dirty = true;
}
// Tier 3: the rules input settled, with or without a new payload.
3 => {
if let Some(h) = m.payload_handle() {
// Retain the new rules handle before storing it; the handle it
// replaces is queued for release.
bb_rules.retain_handle(h);
if let Some(old) = s.latest_rules.replace(h) {
actions.push(Act::ReleaseOldRules(old));
}
s.rules_dirty = false;
} else {
// No payload: only the dirty flag is cleared.
s.rules_dirty = false;
}
// A pending source value may now be classifiable.
match try_resolve(&mut s, &bb_rules, classifier_fn_id) {
ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
}
}
// All other tiers from the rules node are ignored.
_ => {}
}
}
}
// Lock released: perform the queued releases and emissions in order.
for a in actions {
match a {
Act::ReleaseOldRules(h) | Act::Drop(h) => bb_rules.release_handle(h),
Act::Emit(h) => core_rules.emit_or_defer(pid, h),
}
}
});
// The outcome of subscribing to `rules` is deliberately discarded.
let rules_outcome = ctx.subscribe_to(rules, rules_sink);
let _ = rules_outcome;
// Pre-seed: if the core has a cached handle for `rules` and the sink has not
// recorded one yet, retain the cached handle and store it.
let pre_seed = core_s.cache_of(rules);
if pre_seed != NO_HANDLE {
let already_set = state.lock().latest_rules.is_some();
if !already_set {
binding_s.retain_handle(pre_seed);
// Re-check under a fresh lock; if rules were recorded in the meantime,
// undo the retain instead of overwriting them.
let mut s = state.lock();
if s.latest_rules.is_none() {
s.latest_rules = Some(pre_seed);
} else {
drop(s);
binding_s.release_handle(pre_seed);
}
}
}
// ---- source sink ----
let st_src = state.clone();
let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
// As above, this is a clone of the emitter.
let core_src = em.clone();
let source_sink: Sink = Arc::new(move |msgs| {
// Work queued under the lock and carried out after it is released.
enum Act {
Emit(HandleId),
Drop(HandleId),
Complete,
Error(HandleId),
Teardown,
}
let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
{
let mut s = st_src.lock();
for m in msgs {
match m.tier() {
// Tier 1: mark the source as dirty unless already terminated.
#[allow(clippy::collapsible_match)]
1 => {
if !s.terminated {
s.source_dirty = true;
}
}
// Tier 3: the source settled, with or without a new payload.
3 => {
if s.terminated {
continue;
}
if let Some(h) = m.payload_handle() {
// Retain and store the new value; a value that was still pending
// is superseded and queued for release.
bb_src.retain_handle(h);
if let Some(prev) = s.source_value.replace(h) {
actions.push(Act::Drop(prev));
}
s.source_dirty = false;
s.source_phase2 = true;
} else {
// No payload: discard any pending value but still record that the
// source delivered.
if let Some(prev) = s.source_value.take() {
actions.push(Act::Drop(prev));
}
s.source_dirty = false;
s.source_phase2 = true;
}
match try_resolve(&mut s, &bb_src, classifier_fn_id) {
ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
}
}
// Tier 5: handled at most once. A payload is forwarded as an error,
// no payload as completion; any pending value is released first.
5 => {
if s.terminated {
continue;
}
if let Some(prev) = s.source_value.take() {
actions.push(Act::Drop(prev));
}
if let Some(h) = m.payload_handle() {
s.terminated = true;
// Retained here; the handle is then passed to `error_or_defer`.
bb_src.retain_handle(h);
actions.push(Act::Error(h));
} else {
s.terminated = true;
actions.push(Act::Complete);
}
}
// Tier 6: release any pending value, mark terminated, queue a teardown.
// NOTE(review): unlike tier 5 this arm is not guarded by `terminated`,
// so a teardown is queued even after termination — confirm intended.
6 => {
if let Some(prev) = s.source_value.take() {
actions.push(Act::Drop(prev));
}
s.terminated = true;
actions.push(Act::Teardown);
}
// All other tiers from the source are ignored.
_ => {}
}
}
}
// Lock released: perform the queued actions in order.
for a in actions {
match a {
Act::Emit(h) => core_src.emit_or_defer(pid, h),
Act::Drop(h) => bb_src.release_handle(h),
Act::Complete => core_src.complete_or_defer(pid),
Act::Error(h) => core_src.error_or_defer(pid, h),
Act::Teardown => {
// The result of `defer` is intentionally ignored.
let _ = core_src.defer(move |c| c.teardown(pid));
}
}
}
});
let src_outcome = ctx.subscribe_to(source, source_sink);
// A dead source subscription completes this producer, unless the sink has
// already marked the state terminated.
if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
let mut s = state.lock();
if !s.terminated {
s.terminated = true;
// Release the lock before signalling completion.
drop(s);
core_s.complete_or_defer(pid);
}
}
});
// Register the build closure with the binding, then the producer with the core.
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("stratify_branch: register_producer failed")
}