graphrefly_operators/stratify.rs
1//! `stratify_branch` — substrate operator for classifier-routing.
2//!
3//! Substrate counterpart of TS `extra/composition/stratify.ts` (D199,
4//! Unit 5 Q9.2 of `SESSION-rust-port-layer-boundary.md`). The TS
5//! `stratify(name, source, rules, opts) -> Graph` factory composes N
6//! instances of this branch operator (one per rule) inside a single
7//! Graph; the Graph wrapper itself stays binding-side (presentation
8//! under D193). The classifier-routing logic — two-input subscribe,
9//! reactive-rules cache, two-dep DIRTY gating, per-fire dispatch — is
10//! the Rust substrate.
11//!
12//! # Semantics (mirrors TS `_addBranch` in stratify.ts)
13//!
14//! - Subscribes to BOTH `source` and `rules`. Rules first, then
15//! source — so push-on-subscribe of rules' cached DATA arrives
16//! before any source DATA the branch processes (R2.2.7).
17//! - **Two-dep DIRTY gating** (TS parity). On either dep's DIRTY, the
18//! handler buffers and waits. When BOTH deps have settled in the
19//! same wave (no pending DIRTY on either), the cached source value
20//! is classified under the cached rules. This eliminates the
21//! stale-rules race when both deps update inside a single
22//! `core.batch()`.
23//! - `rules` DATA: replaces the cached rules handle. Retains the new
24//! handle; releases the previously cached one. No downstream emit
25//! ("future items only" — rules updates affect FUTURE source items
26//! classified under them, not the current one).
27//! - `source` DATA: buffered (with retain) until both deps settle.
28//! On resolve, invokes
29//! `binding.invoke_stratify_classifier_fn(classifier_fn_id,
30//! rules_handle, value_handle)`. If `true`, the buffered handle's
31//! retain transfers to the emit queue. If `false`, the handle is
32//! released. If no rules handle has arrived (sentinel state), the
33//! buffered source DATA is dropped — matches TS "rule not found →
34//! false".
35//! - `source` COMPLETE / ERROR / TEARDOWN: forwarded unchanged. F1
36//! fix (QA 2026-05-14) — TEARDOWN forwarding restored to match TS
37//! `if (depIndex === 0) actions.down([msg])` for tier 5 + 6.
38//! - `rules` COMPLETE / ERROR / TEARDOWN / INVALIDATE: silently
39//! absorbed. The branch keeps its last-seen rules cache and
40//! continues; rules' terminals don't propagate downstream of the
41//! branch (TS parity). F5 fix (QA 2026-05-14) — rules INVALIDATE
42//! intentionally does NOT clear `latest_rules`; matches TS, where
43//! rules' INVALIDATE message is silently absorbed and `latestRules`
44//! keeps its previous value.
45//! - On producer deactivation: the Drop impl on `StratifyState`
46//! releases both the cached rules handle and any buffered source
47//! value via `Weak<dyn BindingBoundary>` (leaf-op safe).
48//! - **N2 fix (QA 2026-05-14)** — under Phase H+ STRICT scheduling,
49//! `subscribe_to(rules, ...)` may return `Deferred`, in which case
50//! the post-subscribe sync push doesn't populate `latest_rules`
51//! before the source's first DATA arrives. The build closure
52//! defensively reads `core.cache_of(rules)` and pre-seeds the cache
53//! when the sync push didn't. Matches the TS factory-time
54//! `latestRules = rulesNode.cache` seeding.
55
56#![allow(clippy::too_many_lines)]
57
58use std::sync::{Arc, Weak};
59
60use parking_lot::Mutex;
61use smallvec::SmallVec;
62
63use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink, NO_HANDLE};
64
65use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
66
67// =====================================================================
68// stratify_branch(source, rules, classifier_fn_id)
69// =====================================================================
70
71/// Internal state shared between source-sink, rules-sink, and the
72/// `Drop` impl. Tracks the cached rules handle, the buffered source
73/// value awaiting classification, and the two-dep DIRTY gating
74/// counters.
75///
76/// `clippy::struct_excessive_bools` is allowed because the four
77/// gating bools (`source_dirty`, `rules_dirty`, `source_phase2`,
78/// `terminated`) each represent an independent lifecycle / per-wave
79/// signal. A state-machine refactor would be more code without
80/// changing semantics; keep the flat shape per TS parity.
81#[allow(clippy::struct_excessive_bools)]
82struct StratifyState {
83 /// Latest rules handle, retain held while present. Replaced (with
84 /// release-of-old / retain-of-new) on each rules DATA. None means
85 /// either no rules emission has arrived yet OR rules emitted
86 /// RESOLVED-only without prior DATA.
87 latest_rules: Option<HandleId>,
88 /// Buffered source DATA value awaiting classification, retain held
89 /// while present. Set when source emits DATA and gating buffers
90 /// it; cleared on resolve (transferring retain to emit) or on
91 /// drop (releasing).
92 source_value: Option<HandleId>,
93 /// True after source emits DIRTY this wave; cleared on source
94 /// DATA/RESOLVED. Gating predicate: resolve only fires when both
95 /// `source_dirty` and `rules_dirty` are false.
96 source_dirty: bool,
97 /// True after rules emits DIRTY this wave; cleared on rules
98 /// DATA/RESOLVED. Symmetric to `source_dirty`.
99 rules_dirty: bool,
100 /// True after source DATA/RESOLVED arrives but resolve hasn't
101 /// fired yet (waiting for rules to settle). Reset by resolve.
102 source_phase2: bool,
103 /// Set on first source terminal (COMPLETE / ERROR / TEARDOWN).
104 /// Subsequent sink callbacks short-circuit so duplicate terminals
105 /// never leak retains.
106 terminated: bool,
107 /// Weak ref to the binding for the `Drop` impl. Held as `Weak`
108 /// rather than `Arc` so the state's drop doesn't keep the binding
109 /// alive past graph teardown.
110 binding_weak: Weak<dyn BindingBoundary>,
111}
112
113impl Drop for StratifyState {
114 fn drop(&mut self) {
115 let bb = self.binding_weak.upgrade();
116 let Some(bb) = bb else {
117 return;
118 };
119 if let Some(h) = self.latest_rules.take() {
120 bb.release_handle(h);
121 }
122 if let Some(h) = self.source_value.take() {
123 bb.release_handle(h);
124 }
125 }
126}
127
128/// Try to resolve a buffered source value under the cached rules.
129/// Returns:
130/// - `Some(handle)` if the classifier matched — caller emits DATA;
131/// the returned handle's retain has transferred to the caller.
132/// - `None` if there was nothing to resolve OR the classifier dropped
133/// the value (release already issued via the returned `released`
134/// handle in the second tuple slot).
135///
136/// Caller must hold `state` lock-released for `bb.release_handle` /
137/// `core.emit_or_defer` calls per leaf-op contract.
138#[allow(clippy::option_option)]
139fn try_resolve(
140 s: &mut StratifyState,
141 bb: &Arc<dyn BindingBoundary>,
142 classifier_fn_id: FnId,
143) -> ResolveOutcome {
144 if s.source_dirty || s.rules_dirty || !s.source_phase2 {
145 return ResolveOutcome::NotReady;
146 }
147 s.source_phase2 = false;
148 let Some(value_h) = s.source_value.take() else {
149 // Source emitted RESOLVED-only — nothing to classify.
150 return ResolveOutcome::ResolvedNoValue;
151 };
152 let Some(rules_h) = s.latest_rules else {
153 // No rules cache — drop.
154 return ResolveOutcome::Drop(value_h);
155 };
156 // Call the classifier under the state lock — consistent with
157 // valve's `predicate_each` call site. Classifier may re-enter
158 // binding for handle deref, but MUST NOT re-enter Core.
159 if bb.invoke_stratify_classifier_fn(classifier_fn_id, rules_h, value_h) {
160 // Match — transfer retain to emit.
161 ResolveOutcome::Emit(value_h)
162 } else {
163 // Miss — release.
164 ResolveOutcome::Drop(value_h)
165 }
166}
167
168enum ResolveOutcome {
169 /// Both deps settled; classifier matched. Emit the handle (caller
170 /// owns retain).
171 Emit(HandleId),
172 /// Both deps settled; classifier missed OR no rules cache. Drop
173 /// the handle (caller releases retain).
174 Drop(HandleId),
175 /// Both deps settled; source emitted RESOLVED-only. Nothing to
176 /// emit; nothing to release.
177 ResolvedNoValue,
178 /// Either dep still dirty OR no source DATA buffered yet. No
179 /// action.
180 NotReady,
181}
182
/// Single classifier-routing branch. Each rule in a TS `stratify(...)
/// -> Graph` becomes one instance of this operator.
///
/// `classifier_fn_id` is a binding-registered closure of shape
/// `(rules_handle, value_handle) -> bool`. The binding-side closure
/// dereferences `rules_handle` to the latest rules array, looks up the
/// branch's rule by name (captured in the closure), and runs the
/// rule's `classify(value)` predicate. Returning `false` for "rule not
/// found" or "classifier threw" matches TS semantics.
///
/// # Parameters
///
/// - `core` — the Core the producer node is registered with. Only a
///   weak handle is captured by the build closure.
/// - `binding` — producer binding used to register the build closure
///   and, via weak references, to retain/release handles and invoke
///   the classifier.
/// - `source` — node whose DATA values are classified.
/// - `rules` — node whose DATA replaces the cached rules handle.
/// - `classifier_fn_id` — id of the classifier closure described above.
///
/// # Returns
///
/// The produced node's `NodeId`. The node emits DATA only for values
/// whose classifier returned `true` after BOTH source and rules have
/// settled in the same wave.
///
/// # Panics
///
/// Panics if `core.register_producer` fails (see the `expect` at the
/// end of the function).
#[must_use]
pub fn stratify_branch(
    core: &Core,
    binding: &Arc<dyn ProducerBinding>,
    source: NodeId,
    rules: NodeId,
    classifier_fn_id: FnId,
) -> NodeId {
    // Capture only weak references so the registered build closure
    // does not keep Core or the binding alive on its own.
    let core_weak = core.weak_handle();
    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
    let binding_weak_for_state: Weak<dyn BindingBoundary> =
        Arc::downgrade(binding) as Weak<dyn BindingBoundary>;

    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
        // If either Core or the binding is already gone there is
        // nothing to wire up.
        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
            return;
        };
        let pid = ctx.node_id();
        // Fresh state per build; shared by both sinks below.
        let state: Arc<Mutex<StratifyState>> = Arc::new(Mutex::new(StratifyState {
            latest_rules: None,
            source_value: None,
            source_dirty: false,
            rules_dirty: false,
            source_phase2: false,
            terminated: false,
            binding_weak: binding_weak_for_state.clone(),
        }));

        // --- rules sink ---
        let st_rules = state.clone();
        let bb_rules: Arc<dyn BindingBoundary> = binding_s.clone();
        let core_rules = core_s.clone();
        let rules_sink: Sink = Arc::new(move |msgs| {
            // Side effects are collected while the state lock is held
            // and executed only after it has been released.
            enum Act {
                ReleaseOldRules(HandleId),
                Emit(HandleId),
                Drop(HandleId),
            }
            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
            {
                let mut s = st_rules.lock();
                // Once the source has terminated, rules traffic is
                // ignored entirely (no retain is taken).
                if s.terminated {
                    return;
                }
                for m in msgs {
                    if s.terminated {
                        break;
                    }
                    match m.tier() {
                        1 => {
                            // Rules DIRTY — gating signal.
                            s.rules_dirty = true;
                        }
                        3 => {
                            if let Some(h) = m.payload_handle() {
                                // Rules DATA — replace cached rules.
                                // Retain the new handle before storing
                                // it; the old one is released after
                                // the lock is dropped.
                                bb_rules.retain_handle(h);
                                if let Some(old) = s.latest_rules.replace(h) {
                                    actions.push(Act::ReleaseOldRules(old));
                                }
                                s.rules_dirty = false;
                            } else {
                                // Rules RESOLVED-only: cache unchanged.
                                s.rules_dirty = false;
                            }
                            // Try resolve any buffered source value.
                            match try_resolve(&mut s, &bb_rules, classifier_fn_id) {
                                ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
                                ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
                                ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
                            }
                        }
                        // Tier 4 (INVALIDATE), Tier 5 (COMPLETE/ERROR),
                        // Tier 6 (TEARDOWN) — silently absorbed (F5 doc).
                        // Branch keeps cached rules and continues; rules
                        // terminals do not propagate downstream.
                        _ => {}
                    }
                }
            }
            // State lock released — now perform the queued releases
            // and emits, in the order they were recorded.
            for a in actions {
                match a {
                    Act::ReleaseOldRules(h) | Act::Drop(h) => bb_rules.release_handle(h),
                    Act::Emit(h) => core_rules.emit_or_defer(pid, h),
                }
            }
        });

        // Subscribe to rules FIRST so push-on-subscribe of rules'
        // cached DATA arrives before any source DATA the branch
        // processes (R2.2.7).
        let rules_outcome = ctx.subscribe_to(rules, rules_sink);
        // The outcome is not inspected: the N2 pre-seed below covers
        // the case where no rules DATA was pushed synchronously.
        let _ = rules_outcome;

        // N2 — defensive pre-seed for the Deferred / Dead path where
        // the post-subscribe sync push didn't populate latest_rules
        // before source DATA could arrive. Matches TS factory-time
        // `latestRules = rulesNode.cache` seeding.
        let pre_seed = core_s.cache_of(rules);
        if pre_seed != NO_HANDLE {
            let already_set = state.lock().latest_rules.is_some();
            if !already_set {
                // Lock-released retain per leaf-op contract, then
                // re-check (covers the race where the rules sink
                // fires between drop and re-lock, though under the
                // build closure's partition lock this race is not
                // expected today).
                binding_s.retain_handle(pre_seed);
                let mut s = state.lock();
                if s.latest_rules.is_none() {
                    s.latest_rules = Some(pre_seed);
                } else {
                    // Sink fired between drop+re-lock. Release our
                    // pre-seed retain.
                    drop(s);
                    binding_s.release_handle(pre_seed);
                }
            }
        }

        // --- source sink ---
        let st_src = state.clone();
        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
        let core_src = core_s.clone();
        let source_sink: Sink = Arc::new(move |msgs| {
            // Same pattern as the rules sink: record actions under the
            // state lock, execute them after the lock is released.
            enum Act {
                Emit(HandleId),
                Drop(HandleId),
                Complete,
                Error(HandleId),
                Teardown,
            }
            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
            {
                let mut s = st_src.lock();
                for m in msgs {
                    match m.tier() {
                        // Source DIRTY — gating signal. Skip if
                        // already terminated (post-terminal DIRTY
                        // would be spec-illegal but harmless to
                        // ignore). The nested `if` is kept instead of
                        // the collapsed match-guard form
                        // `1 if !s.terminated => ...` so this arm
                        // reads like the other tiers' explicit
                        // `if s.terminated { continue; }` checks;
                        // hence the `collapsible_match` allow.
                        #[allow(clippy::collapsible_match)]
                        1 => {
                            if !s.terminated {
                                s.source_dirty = true;
                            }
                        }
                        3 => {
                            if s.terminated {
                                continue;
                            }
                            if let Some(h) = m.payload_handle() {
                                // Source DATA — buffer with retain.
                                // If a previous source value was
                                // buffered (multi-emit batch), release
                                // it (use-latest semantic).
                                bb_src.retain_handle(h);
                                if let Some(prev) = s.source_value.replace(h) {
                                    actions.push(Act::Drop(prev));
                                }
                                s.source_dirty = false;
                                s.source_phase2 = true;
                            } else {
                                // Source RESOLVED-only — release any
                                // buffered value (this wave settled
                                // without DATA).
                                if let Some(prev) = s.source_value.take() {
                                    actions.push(Act::Drop(prev));
                                }
                                s.source_dirty = false;
                                s.source_phase2 = true;
                            }
                            // Try resolve under both-deps-settled
                            // gating.
                            match try_resolve(&mut s, &bb_src, classifier_fn_id) {
                                ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
                                ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
                                ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
                            }
                        }
                        5 => {
                            if s.terminated {
                                continue;
                            }
                            // Source COMPLETE / ERROR — terminate +
                            // forward. Release any buffered source
                            // value first (we're terminating before
                            // resolving it).
                            if let Some(prev) = s.source_value.take() {
                                actions.push(Act::Drop(prev));
                            }
                            if let Some(h) = m.payload_handle() {
                                // A payload handle marks ERROR: retain
                                // it so it can be forwarded below.
                                s.terminated = true;
                                bb_src.retain_handle(h);
                                actions.push(Act::Error(h));
                            } else {
                                // No payload — COMPLETE.
                                s.terminated = true;
                                actions.push(Act::Complete);
                            }
                        }
                        6 => {
                            // F1 — Source TEARDOWN forward (TS
                            // parity). ALWAYS forwards, even after
                            // COMPLETE in the same wave: per spec
                            // R2.6.4 the framework auto-emits
                            // COMPLETE before TEARDOWN on a non-
                            // terminated state node, so suppressing
                            // TEARDOWN under a `terminated` check
                            // would silently swallow the lifecycle
                            // signal that downstream subscribers
                            // expect. Buffered source value (if any)
                            // is released defensively in case the
                            // tier 5 arm above didn't run (would be
                            // spec-illegal, but defensive).
                            if let Some(prev) = s.source_value.take() {
                                actions.push(Act::Drop(prev));
                            }
                            s.terminated = true;
                            actions.push(Act::Teardown);
                        }
                        // Any other tier is ignored by the source sink.
                        _ => {}
                    }
                }
            }
            // State lock released — replay the queued actions in the
            // order they were recorded.
            for a in actions {
                match a {
                    Act::Emit(h) => core_src.emit_or_defer(pid, h),
                    Act::Drop(h) => bb_src.release_handle(h),
                    Act::Complete => core_src.complete_or_defer(pid),
                    Act::Error(h) => core_src.error_or_defer(pid, h),
                    Act::Teardown => core_src.teardown_or_defer(pid),
                }
            }
        });

        // Source is subscribed second (see the rules-first note above).
        let src_outcome = ctx.subscribe_to(source, source_sink);
        // A Dead source subscription completes the produced node right
        // away, unless a terminal was already recorded. The state lock
        // is dropped before calling into Core.
        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
            let mut s = state.lock();
            if !s.terminated {
                s.terminated = true;
                drop(s);
                core_s.complete_or_defer(pid);
            }
        }
    });

    // Register the build closure with the binding, then register the
    // producer node with Core under the returned fn id.
    let fn_id = binding.register_producer_build(build);
    core.register_producer(fn_id)
        .expect("stratify_branch: register_producer failed")
}
451}