Skip to main content

graphrefly_operators/
stratify.rs

//! `stratify_branch` — substrate operator for classifier-based routing.
//!
//! Substrate counterpart of TS `extra/composition/stratify.ts` (D199,
//! Unit 5 Q9.2 of `SESSION-rust-port-layer-boundary.md`). The TS
//! `stratify(name, source, rules, opts) -> Graph` factory composes N
//! instances of this branch operator (one per rule) inside a single
//! Graph; the Graph wrapper itself stays binding-side (presentation
//! under D193). The classifier-routing logic — two-input subscribe,
//! reactive-rules cache, two-dep DIRTY gating, per-fire dispatch —
//! lives in the Rust substrate.
//!
//! # Semantics (mirrors TS `_addBranch` in stratify.ts)
//!
//! - Subscribes to BOTH `source` and `rules`: rules first, then
//!   source, so push-on-subscribe of rules' cached DATA arrives
//!   before any source DATA the branch processes (R2.2.7).
//! - **Two-dep DIRTY gating** (TS parity). On either dep's DIRTY, the
//!   branch marks that dep as pending and defers classification. Once
//!   BOTH deps have settled in the same wave (no pending DIRTY on
//!   either), the buffered source value is classified under the cached
//!   rules. This eliminates the stale-rules race when both deps update
//!   inside a single `core.batch()`.
//! - `rules` DATA: replaces the cached rules handle, retaining the new
//!   handle and releasing the previously cached one. Rules DATA never
//!   re-classifies source items that were already processed ("future
//!   items only" — rules updates affect FUTURE source items classified
//!   under them); it can only release a source value that was buffered
//!   waiting for rules to settle in the same wave.
//! - `source` DATA: buffered (with retain) until both deps settle.
//!   On resolve, invokes
//!   `binding.invoke_stratify_classifier_fn(classifier_fn_id,
//!   rules_handle, value_handle)`. If `true`, the buffered handle's
//!   retain transfers to the emit queue; if `false`, the handle is
//!   released. If no rules handle has arrived (sentinel state), the
//!   buffered source DATA is dropped, matching TS "rule not found →
//!   false".
//! - `source` COMPLETE / ERROR / TEARDOWN: forwarded unchanged. F1
//!   fix (QA 2026-05-14) — TEARDOWN forwarding restored to match TS
//!   `if (depIndex === 0) actions.down([msg])` for tiers 5 and 6.
//! - `rules` COMPLETE / ERROR / TEARDOWN / INVALIDATE: silently
//!   absorbed. The branch keeps its last-seen rules cache and
//!   continues; rules' terminals don't propagate downstream of the
//!   branch (TS parity). F5 fix (QA 2026-05-14) — rules INVALIDATE
//!   intentionally does NOT clear `latest_rules`; this matches TS, where
//!   rules' INVALIDATE message is silently absorbed and `latestRules`
//!   keeps its previous value.
//! - On producer deactivation, the `Drop` impl on `StratifyState`
//!   releases both the cached rules handle and any buffered source
//!   value via a `Weak<dyn BindingBoundary>` (leaf-op safe). If the
//!   binding has already been dropped, the upgrade fails and nothing is
//!   released.
//! - **N2 fix (QA 2026-05-14)** — under Phase H+ STRICT scheduling,
//!   `subscribe_to(rules, ...)` may return `Deferred`, in which case
//!   the post-subscribe sync push doesn't populate `latest_rules`
//!   before the source's first DATA arrives. The build closure
//!   defensively reads `core.cache_of(rules)` and pre-seeds the cache
//!   when the sync push didn't. This matches the TS factory-time
//!   `latestRules = rulesNode.cache` seeding.

56#![allow(clippy::too_many_lines)]
57
58use std::sync::{Arc, Weak};
59
60use parking_lot::Mutex;
61use smallvec::SmallVec;
62
63use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink, NO_HANDLE};
64
65use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};

// =====================================================================
// stratify_branch(source, rules, classifier_fn_id)
// =====================================================================

71/// Internal state shared between source-sink, rules-sink, and the
72/// `Drop` impl. Tracks the cached rules handle, the buffered source
73/// value awaiting classification, and the two-dep DIRTY gating
74/// counters.
75///
76/// `clippy::struct_excessive_bools` is allowed because the four
77/// gating bools (`source_dirty`, `rules_dirty`, `source_phase2`,
78/// `terminated`) each represent an independent lifecycle / per-wave
79/// signal. A state-machine refactor would be more code without
80/// changing semantics; keep the flat shape per TS parity.
81#[allow(clippy::struct_excessive_bools)]
82struct StratifyState {
83    /// Latest rules handle, retain held while present. Replaced (with
84    /// release-of-old / retain-of-new) on each rules DATA. None means
85    /// either no rules emission has arrived yet OR rules emitted
86    /// RESOLVED-only without prior DATA.
87    latest_rules: Option<HandleId>,
88    /// Buffered source DATA value awaiting classification, retain held
89    /// while present. Set when source emits DATA and gating buffers
90    /// it; cleared on resolve (transferring retain to emit) or on
91    /// drop (releasing).
92    source_value: Option<HandleId>,
93    /// True after source emits DIRTY this wave; cleared on source
94    /// DATA/RESOLVED. Gating predicate: resolve only fires when both
95    /// `source_dirty` and `rules_dirty` are false.
96    source_dirty: bool,
97    /// True after rules emits DIRTY this wave; cleared on rules
98    /// DATA/RESOLVED. Symmetric to `source_dirty`.
99    rules_dirty: bool,
100    /// True after source DATA/RESOLVED arrives but resolve hasn't
101    /// fired yet (waiting for rules to settle). Reset by resolve.
102    source_phase2: bool,
103    /// Set on first source terminal (COMPLETE / ERROR / TEARDOWN).
104    /// Subsequent sink callbacks short-circuit so duplicate terminals
105    /// never leak retains.
106    terminated: bool,
107    /// Weak ref to the binding for the `Drop` impl. Held as `Weak`
108    /// rather than `Arc` so the state's drop doesn't keep the binding
109    /// alive past graph teardown.
110    binding_weak: Weak<dyn BindingBoundary>,
111}
112
113impl Drop for StratifyState {
114    fn drop(&mut self) {
115        let bb = self.binding_weak.upgrade();
116        let Some(bb) = bb else {
117            return;
118        };
119        if let Some(h) = self.latest_rules.take() {
120            bb.release_handle(h);
121        }
122        if let Some(h) = self.source_value.take() {
123            bb.release_handle(h);
124        }
125    }
126}
127
128/// Try to resolve a buffered source value under the cached rules.
129/// Returns:
130/// - `Some(handle)` if the classifier matched — caller emits DATA;
131///   the returned handle's retain has transferred to the caller.
132/// - `None` if there was nothing to resolve OR the classifier dropped
133///   the value (release already issued via the returned `released`
134///   handle in the second tuple slot).
135///
136/// Caller must hold `state` lock-released for `bb.release_handle` /
137/// `core.emit_or_defer` calls per leaf-op contract.
138#[allow(clippy::option_option)]
139fn try_resolve(
140    s: &mut StratifyState,
141    bb: &Arc<dyn BindingBoundary>,
142    classifier_fn_id: FnId,
143) -> ResolveOutcome {
144    if s.source_dirty || s.rules_dirty || !s.source_phase2 {
145        return ResolveOutcome::NotReady;
146    }
147    s.source_phase2 = false;
148    let Some(value_h) = s.source_value.take() else {
149        // Source emitted RESOLVED-only — nothing to classify.
150        return ResolveOutcome::ResolvedNoValue;
151    };
152    let Some(rules_h) = s.latest_rules else {
153        // No rules cache — drop.
154        return ResolveOutcome::Drop(value_h);
155    };
156    // Call the classifier under the state lock — consistent with
157    // valve's `predicate_each` call site. Classifier may re-enter
158    // binding for handle deref, but MUST NOT re-enter Core.
159    if bb.invoke_stratify_classifier_fn(classifier_fn_id, rules_h, value_h) {
160        // Match — transfer retain to emit.
161        ResolveOutcome::Emit(value_h)
162    } else {
163        // Miss — release.
164        ResolveOutcome::Drop(value_h)
165    }
166}
167
168enum ResolveOutcome {
169    /// Both deps settled; classifier matched. Emit the handle (caller
170    /// owns retain).
171    Emit(HandleId),
172    /// Both deps settled; classifier missed OR no rules cache. Drop
173    /// the handle (caller releases retain).
174    Drop(HandleId),
175    /// Both deps settled; source emitted RESOLVED-only. Nothing to
176    /// emit; nothing to release.
177    ResolvedNoValue,
178    /// Either dep still dirty OR no source DATA buffered yet. No
179    /// action.
180    NotReady,
181}
182
183/// Single classifier-routing branch. Each rule in a TS `stratify(...)
184/// -> Graph` becomes one instance of this operator.
185///
186/// `classifier_fn_id` is a binding-registered closure of shape
187/// `(rules_handle, value_handle) -> bool`. The binding-side closure
188/// dereferences `rules_handle` to the latest rules array, looks up the
189/// branch's rule by name (captured in the closure), and runs the
190/// rule's `classify(value)` predicate. Returning `false` for "rule not
191/// found" or "classifier threw" matches TS semantics.
192///
193/// # Returns
194///
195/// The produced node's `NodeId`. The node emits DATA only for values
196/// whose classifier returned `true` after BOTH source and rules have
197/// settled in the same wave.
198#[must_use]
199pub fn stratify_branch(
200    core: &Core,
201    binding: &Arc<dyn ProducerBinding>,
202    source: NodeId,
203    rules: NodeId,
204    classifier_fn_id: FnId,
205) -> NodeId {
206    let core_weak = core.weak_handle();
207    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
208    let binding_weak_for_state: Weak<dyn BindingBoundary> =
209        Arc::downgrade(binding) as Weak<dyn BindingBoundary>;
210
211    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
212        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
213            return;
214        };
215        let pid = ctx.node_id();
216        let state: Arc<Mutex<StratifyState>> = Arc::new(Mutex::new(StratifyState {
217            latest_rules: None,
218            source_value: None,
219            source_dirty: false,
220            rules_dirty: false,
221            source_phase2: false,
222            terminated: false,
223            binding_weak: binding_weak_for_state.clone(),
224        }));
225
226        // --- rules sink ---
227        let st_rules = state.clone();
228        let bb_rules: Arc<dyn BindingBoundary> = binding_s.clone();
229        let core_rules = core_s.clone();
230        let rules_sink: Sink = Arc::new(move |msgs| {
231            enum Act {
232                ReleaseOldRules(HandleId),
233                Emit(HandleId),
234                Drop(HandleId),
235            }
236            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
237            {
238                let mut s = st_rules.lock();
239                if s.terminated {
240                    return;
241                }
242                for m in msgs {
243                    if s.terminated {
244                        break;
245                    }
246                    match m.tier() {
247                        1 => {
248                            // Rules DIRTY — gating signal.
249                            s.rules_dirty = true;
250                        }
251                        3 => {
252                            if let Some(h) = m.payload_handle() {
253                                // Rules DATA — replace cached rules.
254                                bb_rules.retain_handle(h);
255                                if let Some(old) = s.latest_rules.replace(h) {
256                                    actions.push(Act::ReleaseOldRules(old));
257                                }
258                                s.rules_dirty = false;
259                            } else {
260                                // Rules RESOLVED-only: cache unchanged.
261                                s.rules_dirty = false;
262                            }
263                            // Try resolve any buffered source value.
264                            match try_resolve(&mut s, &bb_rules, classifier_fn_id) {
265                                ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
266                                ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
267                                ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
268                            }
269                        }
270                        // Tier 4 (INVALIDATE), Tier 5 (COMPLETE/ERROR),
271                        // Tier 6 (TEARDOWN) — silently absorbed (F5 doc).
272                        // Branch keeps cached rules and continues; rules
273                        // terminals do not propagate downstream.
274                        _ => {}
275                    }
276                }
277            }
278            for a in actions {
279                match a {
280                    Act::ReleaseOldRules(h) | Act::Drop(h) => bb_rules.release_handle(h),
281                    Act::Emit(h) => core_rules.emit_or_defer(pid, h),
282                }
283            }
284        });
285
286        // Subscribe to rules FIRST so push-on-subscribe of rules'
287        // cached DATA arrives before any source DATA the branch
288        // processes (R2.2.7).
289        let rules_outcome = ctx.subscribe_to(rules, rules_sink);
290        let _ = rules_outcome;
291
292        // N2 — defensive pre-seed for the Deferred / Dead path where
293        // the post-subscribe sync push didn't populate latest_rules
294        // before source DATA could arrive. Matches TS factory-time
295        // `latestRules = rulesNode.cache` seeding.
296        let pre_seed = core_s.cache_of(rules);
297        if pre_seed != NO_HANDLE {
298            let already_set = state.lock().latest_rules.is_some();
299            if !already_set {
300                // Lock-released retain per leaf-op contract, then
301                // re-check (covers the race where the rules sink
302                // fires between drop and re-lock, though under the
303                // build closure's partition lock this race is not
304                // expected today).
305                binding_s.retain_handle(pre_seed);
306                let mut s = state.lock();
307                if s.latest_rules.is_none() {
308                    s.latest_rules = Some(pre_seed);
309                } else {
310                    // Sink fired between drop+re-lock. Release our
311                    // pre-seed retain.
312                    drop(s);
313                    binding_s.release_handle(pre_seed);
314                }
315            }
316        }
317
318        // --- source sink ---
319        let st_src = state.clone();
320        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
321        let core_src = core_s.clone();
322        let source_sink: Sink = Arc::new(move |msgs| {
323            enum Act {
324                Emit(HandleId),
325                Drop(HandleId),
326                Complete,
327                Error(HandleId),
328                Teardown,
329            }
330            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
331            {
332                let mut s = st_src.lock();
333                for m in msgs {
334                    match m.tier() {
335                        // Source DIRTY — gating signal. Skip if
336                        // already terminated (post-terminal DIRTY
337                        // would be spec-illegal but harmless to
338                        // ignore). Match-guard form preferred over
339                        // collapsed `1 if !s.terminated => ...` for
340                        // readability alongside the other tiers'
341                        // explicit `if s.terminated { continue; }`
342                        // pattern.
343                        #[allow(clippy::collapsible_match)]
344                        1 => {
345                            if !s.terminated {
346                                s.source_dirty = true;
347                            }
348                        }
349                        3 => {
350                            if s.terminated {
351                                continue;
352                            }
353                            if let Some(h) = m.payload_handle() {
354                                // Source DATA — buffer with retain.
355                                // If a previous source value was
356                                // buffered (multi-emit batch), release
357                                // it (use-latest semantic).
358                                bb_src.retain_handle(h);
359                                if let Some(prev) = s.source_value.replace(h) {
360                                    actions.push(Act::Drop(prev));
361                                }
362                                s.source_dirty = false;
363                                s.source_phase2 = true;
364                            } else {
365                                // Source RESOLVED-only — release any
366                                // buffered value (this wave settled
367                                // without DATA).
368                                if let Some(prev) = s.source_value.take() {
369                                    actions.push(Act::Drop(prev));
370                                }
371                                s.source_dirty = false;
372                                s.source_phase2 = true;
373                            }
374                            // Try resolve under both-deps-settled
375                            // gating.
376                            match try_resolve(&mut s, &bb_src, classifier_fn_id) {
377                                ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
378                                ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
379                                ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
380                            }
381                        }
382                        5 => {
383                            if s.terminated {
384                                continue;
385                            }
386                            // Source COMPLETE / ERROR — terminate +
387                            // forward. Release any buffered source
388                            // value first (we're terminating before
389                            // resolving it).
390                            if let Some(prev) = s.source_value.take() {
391                                actions.push(Act::Drop(prev));
392                            }
393                            if let Some(h) = m.payload_handle() {
394                                s.terminated = true;
395                                bb_src.retain_handle(h);
396                                actions.push(Act::Error(h));
397                            } else {
398                                s.terminated = true;
399                                actions.push(Act::Complete);
400                            }
401                        }
402                        6 => {
403                            // F1 — Source TEARDOWN forward (TS
404                            // parity). ALWAYS forwards, even after
405                            // COMPLETE in the same wave: per spec
406                            // R2.6.4 the framework auto-emits
407                            // COMPLETE before TEARDOWN on a non-
408                            // terminated state node, so suppressing
409                            // TEARDOWN under a `terminated` check
410                            // would silently swallow the lifecycle
411                            // signal that downstream subscribers
412                            // expect. Buffered source value (if any)
413                            // is released defensively in case the
414                            // tier 5 arm above didn't run (would be
415                            // spec-illegal, but defensive).
416                            if let Some(prev) = s.source_value.take() {
417                                actions.push(Act::Drop(prev));
418                            }
419                            s.terminated = true;
420                            actions.push(Act::Teardown);
421                        }
422                        _ => {}
423                    }
424                }
425            }
426            for a in actions {
427                match a {
428                    Act::Emit(h) => core_src.emit_or_defer(pid, h),
429                    Act::Drop(h) => bb_src.release_handle(h),
430                    Act::Complete => core_src.complete_or_defer(pid),
431                    Act::Error(h) => core_src.error_or_defer(pid, h),
432                    Act::Teardown => core_src.teardown_or_defer(pid),
433                }
434            }
435        });
436
437        let src_outcome = ctx.subscribe_to(source, source_sink);
438        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
439            let mut s = state.lock();
440            if !s.terminated {
441                s.terminated = true;
442                drop(s);
443                core_s.complete_or_defer(pid);
444            }
445        }
446    });
447
448    let fn_id = binding.register_producer_build(build);
449    core.register_producer(fn_id)
450        .expect("stratify_branch: register_producer failed")
451}