graphrefly_operators/transform.rs
1//! Transform operators (R5.7) — element-wise mappings and folds.
2//!
3//! Mirrors the TS shapes in
4//! `~/src/graphrefly-ts/packages/pure-ts/src/extra/operators/transform.ts`,
5//! but driven by Core dispatch ([`graphrefly_core::OperatorOp`]) instead
6//! of derived-fn factories. Per Slice C-1 / D009.
7//!
8//! Each factory takes `&Core`, an `&Arc<dyn OperatorBinding>` (for
9//! registering the user closure), the `source` node id, the user-supplied
10//! callback, and any operator-specific arguments (seeds for folders).
11//! The factory:
12//!
13//! 1. Registers the user closure on the binding (returns a [`FnId`]).
14//! 2. Calls [`Core::register_operator`] with the appropriate
15//! [`OperatorOp`] variant.
//! 3. Returns an [`OperatorRegistration`] carrying the new node id and
//!    the registered closure's [`FnId`].
18//!
19//! # Refcount discipline
20//!
21//! For [`scan`] and [`reduce`], the seed handle ownership transfers from
22//! the caller's binding-side intern into Core's
23//! [`ScanState`](graphrefly_core::op_state::ScanState) /
24//! [`ReduceState`](graphrefly_core::op_state::ReduceState) (post-Slice
25//! C-3 / D026 — generic `op_scratch` slot replaces the typed
26//! `operator_state` field). Core takes one retain via
27//! [`BindingBoundary::retain_handle`] inside `register_operator`; the
28//! caller is expected to retain a share for themselves if they want to
29//! reference the seed elsewhere.
30
31use std::sync::Arc;
32
33use graphrefly_core::{Core, FnId, HandleId, NodeId, OperatorOp, OperatorOpts};
34
35use crate::binding::OperatorBinding;
36
37/// Output of an operator factory call. Carries the new operator node id
38/// plus operator-specific context (e.g., the registered closure's `FnId`
39/// for traceability / debugging).
40#[derive(Copy, Clone, Debug)]
41#[must_use = "the operator's NodeId is the value of registering it"]
42pub struct OperatorRegistration {
43 pub node: NodeId,
44 pub fn_id: FnId,
45}
46
47impl OperatorRegistration {
48 /// Convenience: extract just the node id (the typical caller need).
49 #[must_use]
50 pub fn into_node(self) -> NodeId {
51 self.node
52 }
53}
54
55impl From<OperatorRegistration> for NodeId {
56 fn from(r: OperatorRegistration) -> Self {
57 r.node
58 }
59}
60
61// ---------------------------------------------------------------------
62// Stateless: map, filter
63// ---------------------------------------------------------------------
64
65/// `map(source, project)` — element-wise transform.
66///
67/// Maps each settled value from `source` through `project`. Each input
68/// in a wave's batch produces one output (R5.7 batch-mapping).
69///
70/// # Example
71///
72/// ```no_run
73/// # use graphrefly_core::{Core, HandleId, NodeId};
74/// # use graphrefly_operators::{map, OperatorBinding};
75/// # use std::sync::Arc;
76/// # fn example(core: &Core, binding: &Arc<dyn OperatorBinding>, source: NodeId) {
77/// let mapped = map(core, binding, source, |h: HandleId| h);
78/// # }
79/// ```
80pub fn map<F>(
81 core: &Core,
82 binding: &Arc<dyn OperatorBinding>,
83 source: NodeId,
84 project: F,
85) -> OperatorRegistration
86where
87 F: Fn(HandleId) -> HandleId + Send + Sync + 'static,
88{
89 map_with(core, binding, source, project, OperatorOpts::default())
90}
91
92/// [`map`] with explicit [`OperatorOpts`].
93pub fn map_with<F>(
94 core: &Core,
95 binding: &Arc<dyn OperatorBinding>,
96 source: NodeId,
97 project: F,
98 opts: OperatorOpts,
99) -> OperatorRegistration
100where
101 F: Fn(HandleId) -> HandleId + Send + Sync + 'static,
102{
103 let fn_id = binding.register_projector(Box::new(project));
104 let node = core
105 .register_operator(&[source], OperatorOp::Map { fn_id }, opts)
106 .expect(
107 "invariant: caller has validated dep ids and seed before calling register_operator",
108 );
109 OperatorRegistration { node, fn_id }
110}
111
112/// `filter(source, predicate)` — silent-drop selection (D012/D018).
113///
114/// Forwards values where `predicate` returns `true`. Mixed-batch waves
115/// emit `[Dirty, Data(v_pass), ...]` per passing item with no settle
116/// noise for dropped items. Full-reject waves emit `[Dirty, Resolved]`
117/// to settle (D018).
118pub fn filter<F>(
119 core: &Core,
120 binding: &Arc<dyn OperatorBinding>,
121 source: NodeId,
122 predicate: F,
123) -> OperatorRegistration
124where
125 F: Fn(HandleId) -> bool + Send + Sync + 'static,
126{
127 filter_with(core, binding, source, predicate, OperatorOpts::default())
128}
129
130/// [`filter`] with explicit [`OperatorOpts`].
131pub fn filter_with<F>(
132 core: &Core,
133 binding: &Arc<dyn OperatorBinding>,
134 source: NodeId,
135 predicate: F,
136 opts: OperatorOpts,
137) -> OperatorRegistration
138where
139 F: Fn(HandleId) -> bool + Send + Sync + 'static,
140{
141 let fn_id = binding.register_predicate(Box::new(predicate));
142 let node = core
143 .register_operator(&[source], OperatorOp::Filter { fn_id }, opts)
144 .expect(
145 "invariant: caller has validated dep ids and seed before calling register_operator",
146 );
147 OperatorRegistration { node, fn_id }
148}
149
150// ---------------------------------------------------------------------
151// Stateful folders: scan, reduce
152// ---------------------------------------------------------------------
153
154/// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
155///
156/// Required `seed`: there is no seedless mode where the first value
157/// becomes the accumulator (matches TS legacy). The seed handle must be
158/// pre-registered by the caller with the binding (so it has a real
159/// [`HandleId`]). Core takes one retain on the seed for the
160/// [`ScanState`](graphrefly_core::op_state::ScanState) slot's lifetime.
161pub fn scan<F>(
162 core: &Core,
163 binding: &Arc<dyn OperatorBinding>,
164 source: NodeId,
165 fold: F,
166 seed: HandleId,
167) -> OperatorRegistration
168where
169 F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
170{
171 scan_with(core, binding, source, fold, seed, OperatorOpts::default())
172}
173
174/// [`scan`] with explicit [`OperatorOpts`].
175pub fn scan_with<F>(
176 core: &Core,
177 binding: &Arc<dyn OperatorBinding>,
178 source: NodeId,
179 fold: F,
180 seed: HandleId,
181 opts: OperatorOpts,
182) -> OperatorRegistration
183where
184 F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
185{
186 let fn_id = binding.register_folder(Box::new(fold));
187 let node = core
188 .register_operator(&[source], OperatorOp::Scan { fn_id, seed }, opts)
189 .expect(
190 "invariant: caller has validated dep ids and seed before calling register_operator",
191 );
192 OperatorRegistration { node, fn_id }
193}
194
195/// `reduce(source, fold, seed)` — left-fold emitting once on upstream
196/// COMPLETE.
197///
198/// Accumulates silently while `source` emits DATA; on `source` COMPLETE,
199/// emits `[Dirty, Data(acc), Complete]` (where `acc` is the seed if no
200/// DATA arrived). On `source` ERROR, propagates the error verbatim
201/// without emitting the partial accumulator.
202pub fn reduce<F>(
203 core: &Core,
204 binding: &Arc<dyn OperatorBinding>,
205 source: NodeId,
206 fold: F,
207 seed: HandleId,
208) -> OperatorRegistration
209where
210 F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
211{
212 reduce_with(core, binding, source, fold, seed, OperatorOpts::default())
213}
214
215/// [`reduce`] with explicit [`OperatorOpts`].
216pub fn reduce_with<F>(
217 core: &Core,
218 binding: &Arc<dyn OperatorBinding>,
219 source: NodeId,
220 fold: F,
221 seed: HandleId,
222 opts: OperatorOpts,
223) -> OperatorRegistration
224where
225 F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
226{
227 let fn_id = binding.register_folder(Box::new(fold));
228 let node = core
229 .register_operator(&[source], OperatorOp::Reduce { fn_id, seed }, opts)
230 .expect(
231 "invariant: caller has validated dep ids and seed before calling register_operator",
232 );
233 OperatorRegistration { node, fn_id }
234}
235
236// ---------------------------------------------------------------------
237// Stateful pair-aware: distinctUntilChanged, pairwise
238// ---------------------------------------------------------------------
239
240/// `distinctUntilChanged(source, equals)` — suppresses adjacent duplicates.
241///
242/// Each input is compared against the previous emitted value via
243/// `equals`. If equal (suppressed), no output. If not equal, emit
244/// verbatim and update prev. First-ever input is always emitted.
245///
246/// # Default identity
247///
248/// For Identity-equals (the common case), pass a closure that returns
249/// `a == b`. For deeper comparison (`Object.is`-style or struct-equal),
250/// the binding should deref both handles and call the user's
251/// `Fn(T, T) -> bool` underneath.
252pub fn distinct_until_changed<F>(
253 core: &Core,
254 binding: &Arc<dyn OperatorBinding>,
255 source: NodeId,
256 equals: F,
257) -> OperatorRegistration
258where
259 F: Fn(HandleId, HandleId) -> bool + Send + Sync + 'static,
260{
261 distinct_until_changed_with(core, binding, source, equals, OperatorOpts::default())
262}
263
264/// [`distinct_until_changed`] with explicit [`OperatorOpts`].
265pub fn distinct_until_changed_with<F>(
266 core: &Core,
267 binding: &Arc<dyn OperatorBinding>,
268 source: NodeId,
269 equals: F,
270 opts: OperatorOpts,
271) -> OperatorRegistration
272where
273 F: Fn(HandleId, HandleId) -> bool + Send + Sync + 'static,
274{
275 let equals_fn_id = binding.register_equals(Box::new(equals));
276 let node = core
277 .register_operator(
278 &[source],
279 OperatorOp::DistinctUntilChanged { equals_fn_id },
280 opts,
281 )
282 .expect(
283 "invariant: caller has validated dep ids and seed before calling register_operator",
284 );
285 OperatorRegistration {
286 node,
287 fn_id: equals_fn_id,
288 }
289}
290
291/// `pairwise(source)` — emits `(prev, current)` pairs starting after the
292/// second value. First value is swallowed (used as `prev`).
293///
294/// The `pack` closure is provided by the binding-side wrapping helper —
295/// it takes two handles and returns a tuple-handle. For tests that
296/// don't care about the pair representation, `pack = |_, b| b` is a
297/// valid degenerate implementation that emits the current value only;
298/// production bindings install a real pair-packer.
299pub fn pairwise<F>(
300 core: &Core,
301 binding: &Arc<dyn OperatorBinding>,
302 source: NodeId,
303 pack: F,
304) -> OperatorRegistration
305where
306 F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
307{
308 pairwise_with(core, binding, source, pack, OperatorOpts::default())
309}
310
311/// [`pairwise`] with explicit [`OperatorOpts`].
312pub fn pairwise_with<F>(
313 core: &Core,
314 binding: &Arc<dyn OperatorBinding>,
315 source: NodeId,
316 pack: F,
317 opts: OperatorOpts,
318) -> OperatorRegistration
319where
320 F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
321{
322 let fn_id = binding.register_pairwise_packer(Box::new(pack));
323 let node = core
324 .register_operator(&[source], OperatorOp::Pairwise { fn_id }, opts)
325 .expect(
326 "invariant: caller has validated dep ids and seed before calling register_operator",
327 );
328 OperatorRegistration { node, fn_id }
329}