Skip to main content

graphrefly_operators/
transform.rs

1//! Transform operators (R5.7) — element-wise mappings and folds.
2//!
3//! Mirrors the TS shapes in
4//! `~/src/graphrefly-ts/packages/legacy-pure-ts/src/extra/operators/transform.ts`,
5//! but driven by Core dispatch ([`graphrefly_core::OperatorOp`]) instead
6//! of derived-fn factories. Per Slice C-1 / D009.
7//!
8//! Each factory takes `&Core`, an `&Arc<dyn OperatorBinding>` (for
9//! registering the user closure), the `source` node id, the user-supplied
10//! callback, and any operator-specific arguments (seeds for folders).
11//! The factory:
12//!
13//! 1. Registers the user closure on the binding (returns a [`FnId`]).
14//! 2. Calls [`Core::register_operator`] with the appropriate
15//!    [`OperatorOp`] variant.
//! 3. Returns an [`OperatorRegistration`] carrying the new node id and
//!    the [`FnId`] of the registered closure.
18//!
19//! # Refcount discipline
20//!
21//! For [`scan`] and [`reduce`], the seed handle ownership transfers from
22//! the caller's binding-side intern into Core's
23//! [`ScanState`](graphrefly_core::op_state::ScanState) /
24//! [`ReduceState`](graphrefly_core::op_state::ReduceState) (post-Slice
25//! C-3 / D026 — generic `op_scratch` slot replaces the typed
26//! `operator_state` field). Core takes one retain via
27//! [`BindingBoundary::retain_handle`] inside `register_operator`; the
28//! caller is expected to retain a share for themselves if they want to
29//! reference the seed elsewhere.
30
31use std::sync::Arc;
32
33use graphrefly_core::{Core, FnId, HandleId, NodeId, OperatorOp, OperatorOpts};
34
35use crate::binding::OperatorBinding;
36
37/// Output of an operator factory call. Carries the new operator node id
38/// plus operator-specific context (e.g., the registered closure's `FnId`
39/// for traceability / debugging).
40#[derive(Copy, Clone, Debug)]
41#[must_use = "the operator's NodeId is the value of registering it"]
42pub struct OperatorRegistration {
43    pub node: NodeId,
44    pub fn_id: FnId,
45}
46
47impl OperatorRegistration {
48    /// Convenience: extract just the node id (the typical caller need).
49    #[must_use]
50    pub fn into_node(self) -> NodeId {
51        self.node
52    }
53}
54
55impl From<OperatorRegistration> for NodeId {
56    fn from(r: OperatorRegistration) -> Self {
57        r.node
58    }
59}
60
61// ---------------------------------------------------------------------
62// Stateless: map, filter
63// ---------------------------------------------------------------------
64
65/// `map(source, project)` — element-wise transform.
66///
67/// Maps each settled value from `source` through `project`. Each input
68/// in a wave's batch produces one output (R5.7 batch-mapping).
69///
70/// # Example
71///
72/// ```ignore
73/// use graphrefly_operators::map;
74/// let mapped = map(&core, &binding, source, |h: HandleId| { /* deref + transform */ });
75/// ```
76pub fn map<F>(
77    core: &Core,
78    binding: &Arc<dyn OperatorBinding>,
79    source: NodeId,
80    project: F,
81) -> OperatorRegistration
82where
83    F: Fn(HandleId) -> HandleId + Send + Sync + 'static,
84{
85    map_with(core, binding, source, project, OperatorOpts::default())
86}
87
88/// [`map`] with explicit [`OperatorOpts`].
89pub fn map_with<F>(
90    core: &Core,
91    binding: &Arc<dyn OperatorBinding>,
92    source: NodeId,
93    project: F,
94    opts: OperatorOpts,
95) -> OperatorRegistration
96where
97    F: Fn(HandleId) -> HandleId + Send + Sync + 'static,
98{
99    let fn_id = binding.register_projector(Box::new(project));
100    let node = core
101        .register_operator(&[source], OperatorOp::Map { fn_id }, opts)
102        .expect(
103            "invariant: caller has validated dep ids and seed before calling register_operator",
104        );
105    OperatorRegistration { node, fn_id }
106}
107
108/// `filter(source, predicate)` — silent-drop selection (D012/D018).
109///
110/// Forwards values where `predicate` returns `true`. Mixed-batch waves
111/// emit `[Dirty, Data(v_pass), ...]` per passing item with no settle
112/// noise for dropped items. Full-reject waves emit `[Dirty, Resolved]`
113/// to settle (D018).
114pub fn filter<F>(
115    core: &Core,
116    binding: &Arc<dyn OperatorBinding>,
117    source: NodeId,
118    predicate: F,
119) -> OperatorRegistration
120where
121    F: Fn(HandleId) -> bool + Send + Sync + 'static,
122{
123    filter_with(core, binding, source, predicate, OperatorOpts::default())
124}
125
126/// [`filter`] with explicit [`OperatorOpts`].
127pub fn filter_with<F>(
128    core: &Core,
129    binding: &Arc<dyn OperatorBinding>,
130    source: NodeId,
131    predicate: F,
132    opts: OperatorOpts,
133) -> OperatorRegistration
134where
135    F: Fn(HandleId) -> bool + Send + Sync + 'static,
136{
137    let fn_id = binding.register_predicate(Box::new(predicate));
138    let node = core
139        .register_operator(&[source], OperatorOp::Filter { fn_id }, opts)
140        .expect(
141            "invariant: caller has validated dep ids and seed before calling register_operator",
142        );
143    OperatorRegistration { node, fn_id }
144}
145
146// ---------------------------------------------------------------------
147// Stateful folders: scan, reduce
148// ---------------------------------------------------------------------
149
150/// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
151///
152/// Required `seed`: there is no seedless mode where the first value
153/// becomes the accumulator (matches TS legacy). The seed handle must be
154/// pre-registered by the caller with the binding (so it has a real
155/// [`HandleId`]). Core takes one retain on the seed for the
156/// [`ScanState`](graphrefly_core::op_state::ScanState) slot's lifetime.
157pub fn scan<F>(
158    core: &Core,
159    binding: &Arc<dyn OperatorBinding>,
160    source: NodeId,
161    fold: F,
162    seed: HandleId,
163) -> OperatorRegistration
164where
165    F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
166{
167    scan_with(core, binding, source, fold, seed, OperatorOpts::default())
168}
169
170/// [`scan`] with explicit [`OperatorOpts`].
171pub fn scan_with<F>(
172    core: &Core,
173    binding: &Arc<dyn OperatorBinding>,
174    source: NodeId,
175    fold: F,
176    seed: HandleId,
177    opts: OperatorOpts,
178) -> OperatorRegistration
179where
180    F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
181{
182    let fn_id = binding.register_folder(Box::new(fold));
183    let node = core
184        .register_operator(&[source], OperatorOp::Scan { fn_id, seed }, opts)
185        .expect(
186            "invariant: caller has validated dep ids and seed before calling register_operator",
187        );
188    OperatorRegistration { node, fn_id }
189}
190
191/// `reduce(source, fold, seed)` — left-fold emitting once on upstream
192/// COMPLETE.
193///
194/// Accumulates silently while `source` emits DATA; on `source` COMPLETE,
195/// emits `[Dirty, Data(acc), Complete]` (where `acc` is the seed if no
196/// DATA arrived). On `source` ERROR, propagates the error verbatim
197/// without emitting the partial accumulator.
198pub fn reduce<F>(
199    core: &Core,
200    binding: &Arc<dyn OperatorBinding>,
201    source: NodeId,
202    fold: F,
203    seed: HandleId,
204) -> OperatorRegistration
205where
206    F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
207{
208    reduce_with(core, binding, source, fold, seed, OperatorOpts::default())
209}
210
211/// [`reduce`] with explicit [`OperatorOpts`].
212pub fn reduce_with<F>(
213    core: &Core,
214    binding: &Arc<dyn OperatorBinding>,
215    source: NodeId,
216    fold: F,
217    seed: HandleId,
218    opts: OperatorOpts,
219) -> OperatorRegistration
220where
221    F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
222{
223    let fn_id = binding.register_folder(Box::new(fold));
224    let node = core
225        .register_operator(&[source], OperatorOp::Reduce { fn_id, seed }, opts)
226        .expect(
227            "invariant: caller has validated dep ids and seed before calling register_operator",
228        );
229    OperatorRegistration { node, fn_id }
230}
231
232// ---------------------------------------------------------------------
233// Stateful pair-aware: distinctUntilChanged, pairwise
234// ---------------------------------------------------------------------
235
236/// `distinctUntilChanged(source, equals)` — suppresses adjacent duplicates.
237///
238/// Each input is compared against the previous emitted value via
239/// `equals`. If equal (suppressed), no output. If not equal, emit
240/// verbatim and update prev. First-ever input is always emitted.
241///
242/// # Default identity
243///
244/// For Identity-equals (the common case), pass a closure that returns
245/// `a == b`. For deeper comparison (`Object.is`-style or struct-equal),
246/// the binding should deref both handles and call the user's
247/// `Fn(T, T) -> bool` underneath.
248pub fn distinct_until_changed<F>(
249    core: &Core,
250    binding: &Arc<dyn OperatorBinding>,
251    source: NodeId,
252    equals: F,
253) -> OperatorRegistration
254where
255    F: Fn(HandleId, HandleId) -> bool + Send + Sync + 'static,
256{
257    distinct_until_changed_with(core, binding, source, equals, OperatorOpts::default())
258}
259
260/// [`distinct_until_changed`] with explicit [`OperatorOpts`].
261pub fn distinct_until_changed_with<F>(
262    core: &Core,
263    binding: &Arc<dyn OperatorBinding>,
264    source: NodeId,
265    equals: F,
266    opts: OperatorOpts,
267) -> OperatorRegistration
268where
269    F: Fn(HandleId, HandleId) -> bool + Send + Sync + 'static,
270{
271    let equals_fn_id = binding.register_equals(Box::new(equals));
272    let node = core
273        .register_operator(
274            &[source],
275            OperatorOp::DistinctUntilChanged { equals_fn_id },
276            opts,
277        )
278        .expect(
279            "invariant: caller has validated dep ids and seed before calling register_operator",
280        );
281    OperatorRegistration {
282        node,
283        fn_id: equals_fn_id,
284    }
285}
286
287/// `pairwise(source)` — emits `(prev, current)` pairs starting after the
288/// second value. First value is swallowed (used as `prev`).
289///
290/// The `pack` closure is provided by the binding-side wrapping helper —
291/// it takes two handles and returns a tuple-handle. For tests that
292/// don't care about the pair representation, `pack = |_, b| b` is a
293/// valid degenerate implementation that emits the current value only;
294/// production bindings install a real pair-packer.
295pub fn pairwise<F>(
296    core: &Core,
297    binding: &Arc<dyn OperatorBinding>,
298    source: NodeId,
299    pack: F,
300) -> OperatorRegistration
301where
302    F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
303{
304    pairwise_with(core, binding, source, pack, OperatorOpts::default())
305}
306
307/// [`pairwise`] with explicit [`OperatorOpts`].
308pub fn pairwise_with<F>(
309    core: &Core,
310    binding: &Arc<dyn OperatorBinding>,
311    source: NodeId,
312    pack: F,
313    opts: OperatorOpts,
314) -> OperatorRegistration
315where
316    F: Fn(HandleId, HandleId) -> HandleId + Send + Sync + 'static,
317{
318    let fn_id = binding.register_pairwise_packer(Box::new(pack));
319    let node = core
320        .register_operator(&[source], OperatorOp::Pairwise { fn_id }, opts)
321        .expect(
322            "invariant: caller has validated dep ids and seed before calling register_operator",
323        );
324    OperatorRegistration { node, fn_id }
325}