Skip to main content

graphrefly_operators/
stratify.rs

1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! `stratify_branch` — substrate operator for classifier-routing.
11//!
12//! Substrate counterpart of TS `extra/composition/stratify.ts` (D199,
13//! Unit 5 Q9.2 of `SESSION-rust-port-layer-boundary.md`). The TS
14//! `stratify(name, source, rules, opts) -> Graph` factory composes N
15//! instances of this branch operator (one per rule) inside a single
16//! Graph; the Graph wrapper itself stays binding-side (presentation
17//! under D193). The classifier-routing logic — two-input subscribe,
18//! reactive-rules cache, two-dep DIRTY gating, per-fire dispatch — is
19//! the Rust substrate.
20//!
21//! # Semantics (mirrors TS `_addBranch` in stratify.ts)
22//!
23//! - Subscribes to BOTH `source` and `rules`. Rules first, then
24//!   source — so push-on-subscribe of rules' cached DATA arrives
25//!   before any source DATA the branch processes (R2.2.7).
26//! - **Two-dep DIRTY gating** (TS parity). On either dep's DIRTY, the
27//!   handler buffers and waits. When BOTH deps have settled in the
28//!   same wave (no pending DIRTY on either), the cached source value
29//!   is classified under the cached rules. This eliminates the
30//!   stale-rules race when both deps update inside a single
31//!   `core.batch()`.
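//! - `rules` DATA: replaces the cached rules handle. Retains the new
//!   handle; releases the previously cached one. Emits nothing by
//!   itself ("future items only" — a rules update never re-classifies
//!   a source item that has already been resolved). The one exception
//!   is the two-dep gate above: if a source value from the same wave
//!   is buffered waiting for rules to settle, this DATA completes the
//!   gate and that value is classified under the NEW rules.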
36//! - `source` DATA: buffered (with retain) until both deps settle.
37//!   On resolve, invokes
38//!   `binding.invoke_stratify_classifier_fn(classifier_fn_id,
39//!   rules_handle, value_handle)`. If `true`, the buffered handle's
40//!   retain transfers to the emit queue. If `false`, the handle is
41//!   released. If no rules handle has arrived (sentinel state), the
42//!   buffered source DATA is dropped — matches TS "rule not found →
43//!   false".
44//! - `source` COMPLETE / ERROR / TEARDOWN: forwarded unchanged. F1
45//!   fix (QA 2026-05-14) — TEARDOWN forwarding restored to match TS
46//!   `if (depIndex === 0) actions.down([msg])` for tier 5 + 6.
47//! - `rules` COMPLETE / ERROR / TEARDOWN / INVALIDATE: silently
48//!   absorbed. The branch keeps its last-seen rules cache and
49//!   continues; rules' terminals don't propagate downstream of the
50//!   branch (TS parity). F5 fix (QA 2026-05-14) — rules INVALIDATE
51//!   intentionally does NOT clear `latest_rules`; matches TS, where
52//!   rules' INVALIDATE message is silently absorbed and `latestRules`
53//!   keeps its previous value.
54//! - On producer deactivation: the Drop impl on `StratifyState`
55//!   releases both the cached rules handle and any buffered source
56//!   value via `Weak<dyn BindingBoundary>` (leaf-op safe).
57//! - **N2 fix (QA 2026-05-14)** — under Phase H+ STRICT scheduling,
58//!   `subscribe_to(rules, ...)` may return `Deferred`, in which case
59//!   the post-subscribe sync push doesn't populate `latest_rules`
60//!   before the source's first DATA arrives. The build closure
61//!   defensively reads `core.cache_of(rules)` and pre-seeds the cache
62//!   when the sync push didn't. Matches the TS factory-time
63//!   `latestRules = rulesNode.cache` seeding.
64
65#![allow(clippy::too_many_lines)]
66
67use std::sync::{Arc, Weak};
68
69use parking_lot::Mutex;
70use smallvec::SmallVec;
71
72use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink, NO_HANDLE};
73
74use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
75
76// =====================================================================
77// stratify_branch(source, rules, classifier_fn_id)
78// =====================================================================
79
80/// Internal state shared between source-sink, rules-sink, and the
81/// `Drop` impl. Tracks the cached rules handle, the buffered source
82/// value awaiting classification, and the two-dep DIRTY gating
83/// counters.
84///
85/// `clippy::struct_excessive_bools` is allowed because the four
86/// gating bools (`source_dirty`, `rules_dirty`, `source_phase2`,
87/// `terminated`) each represent an independent lifecycle / per-wave
88/// signal. A state-machine refactor would be more code without
89/// changing semantics; keep the flat shape per TS parity.
90#[allow(clippy::struct_excessive_bools)]
91struct StratifyState {
92    /// Latest rules handle, retain held while present. Replaced (with
93    /// release-of-old / retain-of-new) on each rules DATA. None means
94    /// either no rules emission has arrived yet OR rules emitted
95    /// RESOLVED-only without prior DATA.
96    latest_rules: Option<HandleId>,
97    /// Buffered source DATA value awaiting classification, retain held
98    /// while present. Set when source emits DATA and gating buffers
99    /// it; cleared on resolve (transferring retain to emit) or on
100    /// drop (releasing).
101    source_value: Option<HandleId>,
102    /// True after source emits DIRTY this wave; cleared on source
103    /// DATA/RESOLVED. Gating predicate: resolve only fires when both
104    /// `source_dirty` and `rules_dirty` are false.
105    source_dirty: bool,
106    /// True after rules emits DIRTY this wave; cleared on rules
107    /// DATA/RESOLVED. Symmetric to `source_dirty`.
108    rules_dirty: bool,
109    /// True after source DATA/RESOLVED arrives but resolve hasn't
110    /// fired yet (waiting for rules to settle). Reset by resolve.
111    source_phase2: bool,
112    /// Set on first source terminal (COMPLETE / ERROR / TEARDOWN).
113    /// Subsequent sink callbacks short-circuit so duplicate terminals
114    /// never leak retains.
115    terminated: bool,
116    /// Weak ref to the binding for the `Drop` impl. Held as `Weak`
117    /// rather than `Arc` so the state's drop doesn't keep the binding
118    /// alive past graph teardown.
119    binding_weak: Weak<dyn BindingBoundary>,
120}
121
122impl Drop for StratifyState {
123    fn drop(&mut self) {
124        let bb = self.binding_weak.upgrade();
125        let Some(bb) = bb else {
126            return;
127        };
128        if let Some(h) = self.latest_rules.take() {
129            bb.release_handle(h);
130        }
131        if let Some(h) = self.source_value.take() {
132            bb.release_handle(h);
133        }
134    }
135}
136
137/// Try to resolve a buffered source value under the cached rules.
138/// Returns:
139/// - `Some(handle)` if the classifier matched — caller emits DATA;
140///   the returned handle's retain has transferred to the caller.
141/// - `None` if there was nothing to resolve OR the classifier dropped
142///   the value (release already issued via the returned `released`
143///   handle in the second tuple slot).
144///
145/// Caller must hold `state` lock-released for `bb.release_handle` /
146/// `core.emit_or_defer` calls per leaf-op contract.
147#[allow(clippy::option_option)]
148fn try_resolve(
149    s: &mut StratifyState,
150    bb: &Arc<dyn BindingBoundary>,
151    classifier_fn_id: FnId,
152) -> ResolveOutcome {
153    if s.source_dirty || s.rules_dirty || !s.source_phase2 {
154        return ResolveOutcome::NotReady;
155    }
156    s.source_phase2 = false;
157    let Some(value_h) = s.source_value.take() else {
158        // Source emitted RESOLVED-only — nothing to classify.
159        return ResolveOutcome::ResolvedNoValue;
160    };
161    let Some(rules_h) = s.latest_rules else {
162        // No rules cache — drop.
163        return ResolveOutcome::Drop(value_h);
164    };
165    // Call the classifier under the state lock — consistent with
166    // valve's `predicate_each` call site. Classifier may re-enter
167    // binding for handle deref, but MUST NOT re-enter Core.
168    if bb.invoke_stratify_classifier_fn(classifier_fn_id, rules_h, value_h) {
169        // Match — transfer retain to emit.
170        ResolveOutcome::Emit(value_h)
171    } else {
172        // Miss — release.
173        ResolveOutcome::Drop(value_h)
174    }
175}
176
177enum ResolveOutcome {
178    /// Both deps settled; classifier matched. Emit the handle (caller
179    /// owns retain).
180    Emit(HandleId),
181    /// Both deps settled; classifier missed OR no rules cache. Drop
182    /// the handle (caller releases retain).
183    Drop(HandleId),
184    /// Both deps settled; source emitted RESOLVED-only. Nothing to
185    /// emit; nothing to release.
186    ResolvedNoValue,
187    /// Either dep still dirty OR no source DATA buffered yet. No
188    /// action.
189    NotReady,
190}
191
192/// Single classifier-routing branch. Each rule in a TS `stratify(...)
193/// -> Graph` becomes one instance of this operator.
194///
195/// `classifier_fn_id` is a binding-registered closure of shape
196/// `(rules_handle, value_handle) -> bool`. The binding-side closure
197/// dereferences `rules_handle` to the latest rules array, looks up the
198/// branch's rule by name (captured in the closure), and runs the
199/// rule's `classify(value)` predicate. Returning `false` for "rule not
200/// found" or "classifier threw" matches TS semantics.
201///
202/// # Returns
203///
204/// The produced node's `NodeId`. The node emits DATA only for values
205/// whose classifier returned `true` after BOTH source and rules have
206/// settled in the same wave.
207#[must_use]
208pub fn stratify_branch(
209    core: &Core,
210    binding: &Arc<dyn ProducerBinding>,
211    source: NodeId,
212    rules: NodeId,
213    classifier_fn_id: FnId,
214) -> NodeId {
215    let binding_weak_for_state: Weak<dyn BindingBoundary> =
216        Arc::downgrade(binding) as Weak<dyn BindingBoundary>;
217
218    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
219        // S2b/D231: build-side `&Core` via ctx; sinks use `em`.
220        let core_s = ctx.core();
221        let binding_s = ctx.core().binding();
222        let em = ctx.emitter();
223        let pid = ctx.node_id();
224        let state: Arc<Mutex<StratifyState>> = Arc::new(Mutex::new(StratifyState {
225            latest_rules: None,
226            source_value: None,
227            source_dirty: false,
228            rules_dirty: false,
229            source_phase2: false,
230            terminated: false,
231            binding_weak: binding_weak_for_state.clone(),
232        }));
233
234        // --- rules sink ---
235        let st_rules = state.clone();
236        let bb_rules: Arc<dyn BindingBoundary> = binding_s.clone();
237        let core_rules = em.clone();
238        let rules_sink: Sink = Arc::new(move |msgs| {
239            enum Act {
240                ReleaseOldRules(HandleId),
241                Emit(HandleId),
242                Drop(HandleId),
243            }
244            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
245            {
246                let mut s = st_rules.lock();
247                if s.terminated {
248                    return;
249                }
250                for m in msgs {
251                    if s.terminated {
252                        break;
253                    }
254                    match m.tier() {
255                        1 => {
256                            // Rules DIRTY — gating signal.
257                            s.rules_dirty = true;
258                        }
259                        3 => {
260                            if let Some(h) = m.payload_handle() {
261                                // Rules DATA — replace cached rules.
262                                bb_rules.retain_handle(h);
263                                if let Some(old) = s.latest_rules.replace(h) {
264                                    actions.push(Act::ReleaseOldRules(old));
265                                }
266                                s.rules_dirty = false;
267                            } else {
268                                // Rules RESOLVED-only: cache unchanged.
269                                s.rules_dirty = false;
270                            }
271                            // Try resolve any buffered source value.
272                            match try_resolve(&mut s, &bb_rules, classifier_fn_id) {
273                                ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
274                                ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
275                                ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
276                            }
277                        }
278                        // Tier 4 (INVALIDATE), Tier 5 (COMPLETE/ERROR),
279                        // Tier 6 (TEARDOWN) — silently absorbed (F5 doc).
280                        // Branch keeps cached rules and continues; rules
281                        // terminals do not propagate downstream.
282                        _ => {}
283                    }
284                }
285            }
286            for a in actions {
287                match a {
288                    Act::ReleaseOldRules(h) | Act::Drop(h) => bb_rules.release_handle(h),
289                    Act::Emit(h) => core_rules.emit_or_defer(pid, h),
290                }
291            }
292        });
293
294        // Subscribe to rules FIRST so push-on-subscribe of rules'
295        // cached DATA arrives before any source DATA the branch
296        // processes (R2.2.7).
297        let rules_outcome = ctx.subscribe_to(rules, rules_sink);
298        let _ = rules_outcome;
299
300        // N2 — defensive pre-seed for the Deferred / Dead path where
301        // the post-subscribe sync push didn't populate latest_rules
302        // before source DATA could arrive. Matches TS factory-time
303        // `latestRules = rulesNode.cache` seeding.
304        let pre_seed = core_s.cache_of(rules);
305        if pre_seed != NO_HANDLE {
306            let already_set = state.lock().latest_rules.is_some();
307            if !already_set {
308                // Lock-released retain per leaf-op contract, then
309                // re-check (covers the race where the rules sink
310                // fires between drop and re-lock, though under the
311                // build closure's partition lock this race is not
312                // expected today).
313                binding_s.retain_handle(pre_seed);
314                let mut s = state.lock();
315                if s.latest_rules.is_none() {
316                    s.latest_rules = Some(pre_seed);
317                } else {
318                    // Sink fired between drop+re-lock. Release our
319                    // pre-seed retain.
320                    drop(s);
321                    binding_s.release_handle(pre_seed);
322                }
323            }
324        }
325
326        // --- source sink ---
327        let st_src = state.clone();
328        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
329        let core_src = em.clone();
330        let source_sink: Sink = Arc::new(move |msgs| {
331            enum Act {
332                Emit(HandleId),
333                Drop(HandleId),
334                Complete,
335                Error(HandleId),
336                Teardown,
337            }
338            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
339            {
340                let mut s = st_src.lock();
341                for m in msgs {
342                    match m.tier() {
343                        // Source DIRTY — gating signal. Skip if
344                        // already terminated (post-terminal DIRTY
345                        // would be spec-illegal but harmless to
346                        // ignore). Match-guard form preferred over
347                        // collapsed `1 if !s.terminated => ...` for
348                        // readability alongside the other tiers'
349                        // explicit `if s.terminated { continue; }`
350                        // pattern.
351                        #[allow(clippy::collapsible_match)]
352                        1 => {
353                            if !s.terminated {
354                                s.source_dirty = true;
355                            }
356                        }
357                        3 => {
358                            if s.terminated {
359                                continue;
360                            }
361                            if let Some(h) = m.payload_handle() {
362                                // Source DATA — buffer with retain.
363                                // If a previous source value was
364                                // buffered (multi-emit batch), release
365                                // it (use-latest semantic).
366                                bb_src.retain_handle(h);
367                                if let Some(prev) = s.source_value.replace(h) {
368                                    actions.push(Act::Drop(prev));
369                                }
370                                s.source_dirty = false;
371                                s.source_phase2 = true;
372                            } else {
373                                // Source RESOLVED-only — release any
374                                // buffered value (this wave settled
375                                // without DATA).
376                                if let Some(prev) = s.source_value.take() {
377                                    actions.push(Act::Drop(prev));
378                                }
379                                s.source_dirty = false;
380                                s.source_phase2 = true;
381                            }
382                            // Try resolve under both-deps-settled
383                            // gating.
384                            match try_resolve(&mut s, &bb_src, classifier_fn_id) {
385                                ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
386                                ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
387                                ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
388                            }
389                        }
390                        5 => {
391                            if s.terminated {
392                                continue;
393                            }
394                            // Source COMPLETE / ERROR — terminate +
395                            // forward. Release any buffered source
396                            // value first (we're terminating before
397                            // resolving it).
398                            if let Some(prev) = s.source_value.take() {
399                                actions.push(Act::Drop(prev));
400                            }
401                            if let Some(h) = m.payload_handle() {
402                                s.terminated = true;
403                                bb_src.retain_handle(h);
404                                actions.push(Act::Error(h));
405                            } else {
406                                s.terminated = true;
407                                actions.push(Act::Complete);
408                            }
409                        }
410                        6 => {
411                            // F1 — Source TEARDOWN forward (TS
412                            // parity). ALWAYS forwards, even after
413                            // COMPLETE in the same wave: per spec
414                            // R2.6.4 the framework auto-emits
415                            // COMPLETE before TEARDOWN on a non-
416                            // terminated state node, so suppressing
417                            // TEARDOWN under a `terminated` check
418                            // would silently swallow the lifecycle
419                            // signal that downstream subscribers
420                            // expect. Buffered source value (if any)
421                            // is released defensively in case the
422                            // tier 5 arm above didn't run (would be
423                            // spec-illegal, but defensive).
424                            if let Some(prev) = s.source_value.take() {
425                                actions.push(Act::Drop(prev));
426                            }
427                            s.terminated = true;
428                            actions.push(Act::Teardown);
429                        }
430                        _ => {}
431                    }
432                }
433            }
434            for a in actions {
435                match a {
436                    Act::Emit(h) => core_src.emit_or_defer(pid, h),
437                    Act::Drop(h) => bb_src.release_handle(h),
438                    Act::Complete => core_src.complete_or_defer(pid),
439                    Act::Error(h) => core_src.error_or_defer(pid, h),
440                    Act::Teardown => {
441                        // D234: sink-side terminal forward via em.defer.
442                        let _ = core_src.defer(move |c| c.teardown(pid));
443                    }
444                }
445            }
446        });
447
448        let src_outcome = ctx.subscribe_to(source, source_sink);
449        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
450            let mut s = state.lock();
451            if !s.terminated {
452                s.terminated = true;
453                drop(s);
454                core_s.complete_or_defer(pid);
455            }
456        }
457    });
458
459    let fn_id = binding.register_producer_build(build);
460    core.register_producer(fn_id)
461        .expect("stratify_branch: register_producer failed")
462}