graphrefly_operators/stratify.rs
// D248: post-S2c, the substrate is a `!Send + !Sync` single-owner Core, and
// the Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
// (dropping `+ Send + Sync`). `Rc` would suffice and is the architecturally
// correct type for inherently single-owner sinks; the Arc→Rc cleanup is a
// separate slice tracked in porting-deferred.md. Until then, `Arc` is
// over-conservative but correct: the `Arc`-wrapped sinks in this file reflect
// the deliberate D248 relaxation, not a missing `Send + Sync` bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! `stratify_branch` — substrate operator for classifier-routing.
11//!
12//! Substrate counterpart of TS `extra/composition/stratify.ts` (D199,
13//! Unit 5 Q9.2 of `SESSION-rust-port-layer-boundary.md`). The TS
14//! `stratify(name, source, rules, opts) -> Graph` factory composes N
15//! instances of this branch operator (one per rule) inside a single
16//! Graph; the Graph wrapper itself stays binding-side (presentation
17//! under D193). The classifier-routing logic — two-input subscribe,
18//! reactive-rules cache, two-dep DIRTY gating, per-fire dispatch — is
19//! the Rust substrate.
20//!
21//! # Semantics (mirrors TS `_addBranch` in stratify.ts)
22//!
23//! - Subscribes to BOTH `source` and `rules`. Rules first, then
24//! source — so push-on-subscribe of rules' cached DATA arrives
25//! before any source DATA the branch processes (R2.2.7).
26//! - **Two-dep DIRTY gating** (TS parity). On either dep's DIRTY, the
27//! handler buffers and waits. When BOTH deps have settled in the
28//! same wave (no pending DIRTY on either), the cached source value
29//! is classified under the cached rules. This eliminates the
30//! stale-rules race when both deps update inside a single
31//! `core.batch()`.
32//! - `rules` DATA: replaces the cached rules handle. Retains the new
33//! handle; releases the previously cached one. No downstream emit
34//! ("future items only" — rules updates affect FUTURE source items
35//! classified under them, not the current one).
//! - `source` DATA: buffered (with retain) until both deps settle. If
//!   another source DATA (or a RESOLVED) arrives before resolution, the
//!   previously buffered value is released — only the latest value is
//!   ever classified. On resolve, invokes
//!   `binding.invoke_stratify_classifier_fn(classifier_fn_id,
//!   rules_handle, value_handle)`. If it returns `true`, the buffered
//!   handle's retain transfers to the emit queue; if `false`, the handle
//!   is released. If no rules handle has arrived (sentinel state), the
//!   buffered source DATA is dropped — matching TS "rule not found →
//!   false".
44//! - `source` COMPLETE / ERROR / TEARDOWN: forwarded unchanged. F1
45//! fix (QA 2026-05-14) — TEARDOWN forwarding restored to match TS
46//! `if (depIndex === 0) actions.down([msg])` for tier 5 + 6.
47//! - `rules` COMPLETE / ERROR / TEARDOWN / INVALIDATE: silently
48//! absorbed. The branch keeps its last-seen rules cache and
49//! continues; rules' terminals don't propagate downstream of the
50//! branch (TS parity). F5 fix (QA 2026-05-14) — rules INVALIDATE
51//! intentionally does NOT clear `latest_rules`; matches TS, where
52//! rules' INVALIDATE message is silently absorbed and `latestRules`
53//! keeps its previous value.
54//! - On producer deactivation: the Drop impl on `StratifyState`
55//! releases both the cached rules handle and any buffered source
56//! value via `Weak<dyn BindingBoundary>` (leaf-op safe).
57//! - **N2 fix (QA 2026-05-14)** — under Phase H+ STRICT scheduling,
58//! `subscribe_to(rules, ...)` may return `Deferred`, in which case
59//! the post-subscribe sync push doesn't populate `latest_rules`
60//! before the source's first DATA arrives. The build closure
61//! defensively reads `core.cache_of(rules)` and pre-seeds the cache
62//! when the sync push didn't. Matches the TS factory-time
63//! `latestRules = rulesNode.cache` seeding.
64
65#![allow(clippy::too_many_lines)]
66
67use std::sync::{Arc, Weak};
68
69use parking_lot::Mutex;
70use smallvec::SmallVec;
71
72use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink, NO_HANDLE};
73
74use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
75
76// =====================================================================
77// stratify_branch(source, rules, classifier_fn_id)
78// =====================================================================
79
80/// Internal state shared between source-sink, rules-sink, and the
81/// `Drop` impl. Tracks the cached rules handle, the buffered source
82/// value awaiting classification, and the two-dep DIRTY gating
83/// counters.
84///
85/// `clippy::struct_excessive_bools` is allowed because the four
86/// gating bools (`source_dirty`, `rules_dirty`, `source_phase2`,
87/// `terminated`) each represent an independent lifecycle / per-wave
88/// signal. A state-machine refactor would be more code without
89/// changing semantics; keep the flat shape per TS parity.
90#[allow(clippy::struct_excessive_bools)]
91struct StratifyState {
92 /// Latest rules handle, retain held while present. Replaced (with
93 /// release-of-old / retain-of-new) on each rules DATA. None means
94 /// either no rules emission has arrived yet OR rules emitted
95 /// RESOLVED-only without prior DATA.
96 latest_rules: Option<HandleId>,
97 /// Buffered source DATA value awaiting classification, retain held
98 /// while present. Set when source emits DATA and gating buffers
99 /// it; cleared on resolve (transferring retain to emit) or on
100 /// drop (releasing).
101 source_value: Option<HandleId>,
102 /// True after source emits DIRTY this wave; cleared on source
103 /// DATA/RESOLVED. Gating predicate: resolve only fires when both
104 /// `source_dirty` and `rules_dirty` are false.
105 source_dirty: bool,
106 /// True after rules emits DIRTY this wave; cleared on rules
107 /// DATA/RESOLVED. Symmetric to `source_dirty`.
108 rules_dirty: bool,
109 /// True after source DATA/RESOLVED arrives but resolve hasn't
110 /// fired yet (waiting for rules to settle). Reset by resolve.
111 source_phase2: bool,
112 /// Set on first source terminal (COMPLETE / ERROR / TEARDOWN).
113 /// Subsequent sink callbacks short-circuit so duplicate terminals
114 /// never leak retains.
115 terminated: bool,
116 /// Weak ref to the binding for the `Drop` impl. Held as `Weak`
117 /// rather than `Arc` so the state's drop doesn't keep the binding
118 /// alive past graph teardown.
119 binding_weak: Weak<dyn BindingBoundary>,
120}
121
122impl Drop for StratifyState {
123 fn drop(&mut self) {
124 let bb = self.binding_weak.upgrade();
125 let Some(bb) = bb else {
126 return;
127 };
128 if let Some(h) = self.latest_rules.take() {
129 bb.release_handle(h);
130 }
131 if let Some(h) = self.source_value.take() {
132 bb.release_handle(h);
133 }
134 }
135}
136
137/// Try to resolve a buffered source value under the cached rules.
138/// Returns:
139/// - `Some(handle)` if the classifier matched — caller emits DATA;
140/// the returned handle's retain has transferred to the caller.
141/// - `None` if there was nothing to resolve OR the classifier dropped
142/// the value (release already issued via the returned `released`
143/// handle in the second tuple slot).
144///
145/// Caller must hold `state` lock-released for `bb.release_handle` /
146/// `core.emit_or_defer` calls per leaf-op contract.
147#[allow(clippy::option_option)]
148fn try_resolve(
149 s: &mut StratifyState,
150 bb: &Arc<dyn BindingBoundary>,
151 classifier_fn_id: FnId,
152) -> ResolveOutcome {
153 if s.source_dirty || s.rules_dirty || !s.source_phase2 {
154 return ResolveOutcome::NotReady;
155 }
156 s.source_phase2 = false;
157 let Some(value_h) = s.source_value.take() else {
158 // Source emitted RESOLVED-only — nothing to classify.
159 return ResolveOutcome::ResolvedNoValue;
160 };
161 let Some(rules_h) = s.latest_rules else {
162 // No rules cache — drop.
163 return ResolveOutcome::Drop(value_h);
164 };
165 // Call the classifier under the state lock — consistent with
166 // valve's `predicate_each` call site. Classifier may re-enter
167 // binding for handle deref, but MUST NOT re-enter Core.
168 if bb.invoke_stratify_classifier_fn(classifier_fn_id, rules_h, value_h) {
169 // Match — transfer retain to emit.
170 ResolveOutcome::Emit(value_h)
171 } else {
172 // Miss — release.
173 ResolveOutcome::Drop(value_h)
174 }
175}
176
177enum ResolveOutcome {
178 /// Both deps settled; classifier matched. Emit the handle (caller
179 /// owns retain).
180 Emit(HandleId),
181 /// Both deps settled; classifier missed OR no rules cache. Drop
182 /// the handle (caller releases retain).
183 Drop(HandleId),
184 /// Both deps settled; source emitted RESOLVED-only. Nothing to
185 /// emit; nothing to release.
186 ResolvedNoValue,
187 /// Either dep still dirty OR no source DATA buffered yet. No
188 /// action.
189 NotReady,
190}
191
192/// Single classifier-routing branch. Each rule in a TS `stratify(...)
193/// -> Graph` becomes one instance of this operator.
194///
195/// `classifier_fn_id` is a binding-registered closure of shape
196/// `(rules_handle, value_handle) -> bool`. The binding-side closure
197/// dereferences `rules_handle` to the latest rules array, looks up the
198/// branch's rule by name (captured in the closure), and runs the
199/// rule's `classify(value)` predicate. Returning `false` for "rule not
200/// found" or "classifier threw" matches TS semantics.
201///
202/// # Returns
203///
204/// The produced node's `NodeId`. The node emits DATA only for values
205/// whose classifier returned `true` after BOTH source and rules have
206/// settled in the same wave.
207#[must_use]
208pub fn stratify_branch(
209 core: &Core,
210 binding: &Arc<dyn ProducerBinding>,
211 source: NodeId,
212 rules: NodeId,
213 classifier_fn_id: FnId,
214) -> NodeId {
215 let binding_weak_for_state: Weak<dyn BindingBoundary> =
216 Arc::downgrade(binding) as Weak<dyn BindingBoundary>;
217
218 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
219 // S2b/D231: build-side `&Core` via ctx; sinks use `em`.
220 let core_s = ctx.core();
221 let binding_s = ctx.core().binding();
222 let em = ctx.emitter();
223 let pid = ctx.node_id();
224 let state: Arc<Mutex<StratifyState>> = Arc::new(Mutex::new(StratifyState {
225 latest_rules: None,
226 source_value: None,
227 source_dirty: false,
228 rules_dirty: false,
229 source_phase2: false,
230 terminated: false,
231 binding_weak: binding_weak_for_state.clone(),
232 }));
233
234 // --- rules sink ---
235 let st_rules = state.clone();
236 let bb_rules: Arc<dyn BindingBoundary> = binding_s.clone();
237 let core_rules = em.clone();
238 let rules_sink: Sink = Arc::new(move |msgs| {
239 enum Act {
240 ReleaseOldRules(HandleId),
241 Emit(HandleId),
242 Drop(HandleId),
243 }
244 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
245 {
246 let mut s = st_rules.lock();
247 if s.terminated {
248 return;
249 }
250 for m in msgs {
251 if s.terminated {
252 break;
253 }
254 match m.tier() {
255 1 => {
256 // Rules DIRTY — gating signal.
257 s.rules_dirty = true;
258 }
259 3 => {
260 if let Some(h) = m.payload_handle() {
261 // Rules DATA — replace cached rules.
262 bb_rules.retain_handle(h);
263 if let Some(old) = s.latest_rules.replace(h) {
264 actions.push(Act::ReleaseOldRules(old));
265 }
266 s.rules_dirty = false;
267 } else {
268 // Rules RESOLVED-only: cache unchanged.
269 s.rules_dirty = false;
270 }
271 // Try resolve any buffered source value.
272 match try_resolve(&mut s, &bb_rules, classifier_fn_id) {
273 ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
274 ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
275 ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
276 }
277 }
278 // Tier 4 (INVALIDATE), Tier 5 (COMPLETE/ERROR),
279 // Tier 6 (TEARDOWN) — silently absorbed (F5 doc).
280 // Branch keeps cached rules and continues; rules
281 // terminals do not propagate downstream.
282 _ => {}
283 }
284 }
285 }
286 for a in actions {
287 match a {
288 Act::ReleaseOldRules(h) | Act::Drop(h) => bb_rules.release_handle(h),
289 Act::Emit(h) => core_rules.emit_or_defer(pid, h),
290 }
291 }
292 });
293
294 // Subscribe to rules FIRST so push-on-subscribe of rules'
295 // cached DATA arrives before any source DATA the branch
296 // processes (R2.2.7).
297 let rules_outcome = ctx.subscribe_to(rules, rules_sink);
298 let _ = rules_outcome;
299
300 // N2 — defensive pre-seed for the Deferred / Dead path where
301 // the post-subscribe sync push didn't populate latest_rules
302 // before source DATA could arrive. Matches TS factory-time
303 // `latestRules = rulesNode.cache` seeding.
304 let pre_seed = core_s.cache_of(rules);
305 if pre_seed != NO_HANDLE {
306 let already_set = state.lock().latest_rules.is_some();
307 if !already_set {
308 // Lock-released retain per leaf-op contract, then
309 // re-check (covers the race where the rules sink
310 // fires between drop and re-lock, though under the
311 // build closure's partition lock this race is not
312 // expected today).
313 binding_s.retain_handle(pre_seed);
314 let mut s = state.lock();
315 if s.latest_rules.is_none() {
316 s.latest_rules = Some(pre_seed);
317 } else {
318 // Sink fired between drop+re-lock. Release our
319 // pre-seed retain.
320 drop(s);
321 binding_s.release_handle(pre_seed);
322 }
323 }
324 }
325
326 // --- source sink ---
327 let st_src = state.clone();
328 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
329 let core_src = em.clone();
330 let source_sink: Sink = Arc::new(move |msgs| {
331 enum Act {
332 Emit(HandleId),
333 Drop(HandleId),
334 Complete,
335 Error(HandleId),
336 Teardown,
337 }
338 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
339 {
340 let mut s = st_src.lock();
341 for m in msgs {
342 match m.tier() {
343 // Source DIRTY — gating signal. Skip if
344 // already terminated (post-terminal DIRTY
345 // would be spec-illegal but harmless to
346 // ignore). Match-guard form preferred over
347 // collapsed `1 if !s.terminated => ...` for
348 // readability alongside the other tiers'
349 // explicit `if s.terminated { continue; }`
350 // pattern.
351 #[allow(clippy::collapsible_match)]
352 1 => {
353 if !s.terminated {
354 s.source_dirty = true;
355 }
356 }
357 3 => {
358 if s.terminated {
359 continue;
360 }
361 if let Some(h) = m.payload_handle() {
362 // Source DATA — buffer with retain.
363 // If a previous source value was
364 // buffered (multi-emit batch), release
365 // it (use-latest semantic).
366 bb_src.retain_handle(h);
367 if let Some(prev) = s.source_value.replace(h) {
368 actions.push(Act::Drop(prev));
369 }
370 s.source_dirty = false;
371 s.source_phase2 = true;
372 } else {
373 // Source RESOLVED-only — release any
374 // buffered value (this wave settled
375 // without DATA).
376 if let Some(prev) = s.source_value.take() {
377 actions.push(Act::Drop(prev));
378 }
379 s.source_dirty = false;
380 s.source_phase2 = true;
381 }
382 // Try resolve under both-deps-settled
383 // gating.
384 match try_resolve(&mut s, &bb_src, classifier_fn_id) {
385 ResolveOutcome::Emit(h) => actions.push(Act::Emit(h)),
386 ResolveOutcome::Drop(h) => actions.push(Act::Drop(h)),
387 ResolveOutcome::ResolvedNoValue | ResolveOutcome::NotReady => {}
388 }
389 }
390 5 => {
391 if s.terminated {
392 continue;
393 }
394 // Source COMPLETE / ERROR — terminate +
395 // forward. Release any buffered source
396 // value first (we're terminating before
397 // resolving it).
398 if let Some(prev) = s.source_value.take() {
399 actions.push(Act::Drop(prev));
400 }
401 if let Some(h) = m.payload_handle() {
402 s.terminated = true;
403 bb_src.retain_handle(h);
404 actions.push(Act::Error(h));
405 } else {
406 s.terminated = true;
407 actions.push(Act::Complete);
408 }
409 }
410 6 => {
411 // F1 — Source TEARDOWN forward (TS
412 // parity). ALWAYS forwards, even after
413 // COMPLETE in the same wave: per spec
414 // R2.6.4 the framework auto-emits
415 // COMPLETE before TEARDOWN on a non-
416 // terminated state node, so suppressing
417 // TEARDOWN under a `terminated` check
418 // would silently swallow the lifecycle
419 // signal that downstream subscribers
420 // expect. Buffered source value (if any)
421 // is released defensively in case the
422 // tier 5 arm above didn't run (would be
423 // spec-illegal, but defensive).
424 if let Some(prev) = s.source_value.take() {
425 actions.push(Act::Drop(prev));
426 }
427 s.terminated = true;
428 actions.push(Act::Teardown);
429 }
430 _ => {}
431 }
432 }
433 }
434 for a in actions {
435 match a {
436 Act::Emit(h) => core_src.emit_or_defer(pid, h),
437 Act::Drop(h) => bb_src.release_handle(h),
438 Act::Complete => core_src.complete_or_defer(pid),
439 Act::Error(h) => core_src.error_or_defer(pid, h),
440 Act::Teardown => {
441 // D234: sink-side terminal forward via em.defer.
442 let _ = core_src.defer(move |c| c.teardown(pid));
443 }
444 }
445 }
446 });
447
448 let src_outcome = ctx.subscribe_to(source, source_sink);
449 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
450 let mut s = state.lock();
451 if !s.terminated {
452 s.terminated = true;
453 drop(s);
454 core_s.complete_or_defer(pid);
455 }
456 }
457 });
458
459 let fn_id = binding.register_producer_build(build);
460 core.register_producer(fn_id)
461 .expect("stratify_branch: register_producer failed")
462}