graphrefly_operators/combine.rs
1//! Multi-dep combinator operators (Slice C-2, D020).
2//!
3//! Mirrors the TS shapes in
4//! `~/src/graphrefly-ts/packages/pure-ts/src/extra/operators/combine.ts`,
5//! but driven by Core dispatch ([`graphrefly_core::OperatorOp`]) instead
6//! of derived-fn factories.
7//!
8//! - [`combine`] — N-dep combineLatest (any dep fire → emit packed tuple)
9//! - [`with_latest_from`] — 2-dep, fire-on-primary-only (Phase 10.5)
10//! - [`merge`] — N-dep, forward all DATA handles verbatim (zero FFI)
11
12use std::sync::Arc;
13
14use graphrefly_core::{Core, FnId, HandleId, NodeId, OperatorOp, OperatorOpts};
15
16use crate::binding::OperatorBinding;
17use crate::error::OperatorFactoryError;
18use crate::transform::OperatorRegistration;
19
20/// Boxed packer closure type — converts N HandleIds to a single tuple HandleId.
21pub type PackerFn = Box<dyn Fn(&[HandleId]) -> HandleId + Send + Sync>;
22
23/// `combine(...sources)` — combineLatest semantics.
24///
25/// Emits a packed tuple of the latest handle per dep whenever any dep
26/// fires. First-run gate (`partial: false` default) holds until all deps
27/// deliver real DATA (R2.5.3).
28///
29/// # Arguments
30///
31/// - `core` — the Core dispatcher.
32/// - `binding` — implements [`OperatorBinding`] for closure registration.
33/// - `sources` — N dep node ids (order determines tuple position).
34/// - `packer` — closure that takes N `HandleId`s and returns a single
35/// tuple `HandleId`. The binding wraps `Fn(&[T]) -> Tuple` into this.
36///
37/// # Errors
38///
39/// - [`OperatorFactoryError::EmptySources`] — `sources` is empty.
40/// - [`OperatorFactoryError::Register`] — Core-layer registration error
41/// (unknown / terminal-non-resubscribable dep).
42pub fn combine(
43 core: &Core,
44 binding: &Arc<dyn OperatorBinding>,
45 sources: &[NodeId],
46 packer: PackerFn,
47) -> Result<OperatorRegistration, OperatorFactoryError> {
48 if sources.is_empty() {
49 return Err(OperatorFactoryError::EmptySources);
50 }
51 let pack_fn = binding.register_packer(packer);
52 let opts = OperatorOpts::default(); // partial: false — gate all deps
53 let node = core.register_operator(sources, OperatorOp::Combine { pack_fn }, opts)?;
54 Ok(OperatorRegistration {
55 node,
56 fn_id: pack_fn,
57 })
58}
59
60/// `with_latest_from(primary, secondary)` — fire-on-primary-only.
61///
62/// Emits a packed pair `[primary, secondary]` only when `primary` (dep[0])
63/// has DATA in the wave. If only `secondary` (dep[1]) fires, settles with
64/// RESOLVED. Matches Phase 10.5 semantics (D021).
65///
66/// First-run gate (`partial: false`) holds until both deps deliver real
67/// DATA (R2.5.3). Post-warmup, fires on primary alone; if secondary has
68/// been invalidated (prev_data == NO_HANDLE), settles with RESOLVED
69/// instead of emitting a stale pair.
70///
71/// # Arguments
72///
73/// - `core` — the Core dispatcher.
74/// - `binding` — implements [`OperatorBinding`] for closure registration.
75/// - `primary` — the triggering dep (dep[0]).
76/// - `secondary` — the sampled dep (dep[1]).
77/// - `packer` — closure that packs 2 `HandleId`s into a pair handle.
78pub fn with_latest_from(
79 core: &Core,
80 binding: &Arc<dyn OperatorBinding>,
81 primary: NodeId,
82 secondary: NodeId,
83 packer: PackerFn,
84) -> OperatorRegistration {
85 let pack_fn = binding.register_packer(packer);
86 let opts = OperatorOpts::default(); // partial: false — gate both deps
87 let node = core
88 .register_operator(
89 &[primary, secondary],
90 OperatorOp::WithLatestFrom { pack_fn },
91 opts,
92 )
93 .expect(
94 "invariant: caller has validated dep ids and seed before calling register_operator",
95 );
96 OperatorRegistration {
97 node,
98 fn_id: pack_fn,
99 }
100}
101
102/// Registration output for [`merge`] — no closure involved (zero FFI).
103#[derive(Copy, Clone, Debug)]
104#[must_use = "the merge operator's NodeId is the value of registering it"]
105pub struct MergeRegistration {
106 pub node: NodeId,
107}
108
109impl MergeRegistration {
110 /// Convenience: extract just the node id.
111 #[must_use]
112 pub fn into_node(self) -> NodeId {
113 self.node
114 }
115}
116
117/// `merge(...sources)` — forward all DATA handles verbatim (zero FFI).
118///
119/// Each dep's DATA is forwarded individually without transformation.
120/// COMPLETE cascades when all deps complete (R1.3.4.b). No binding
121/// call on the fire path — maximally efficient.
122///
123/// # Arguments
124///
125/// - `core` — the Core dispatcher.
126/// - `sources` — N dep node ids.
127///
128/// # Errors
129///
130/// - [`OperatorFactoryError::EmptySources`] — `sources` is empty.
131/// - [`OperatorFactoryError::Register`] — Core-layer registration error
132/// (unknown / terminal-non-resubscribable dep).
133pub fn merge(core: &Core, sources: &[NodeId]) -> Result<MergeRegistration, OperatorFactoryError> {
134 if sources.is_empty() {
135 return Err(OperatorFactoryError::EmptySources);
136 }
137 let opts = OperatorOpts {
138 partial: true, // merge fires as soon as ANY dep delivers
139 ..OperatorOpts::default()
140 };
141 let node = core.register_operator(sources, OperatorOp::Merge, opts)?;
142 Ok(MergeRegistration { node })
143}
144
145/// Convenience: `merge` with an explicit [`FnId`] return for API
146/// consistency. Since merge is zero-FFI, the `FnId` is a sentinel.
147/// Prefer [`merge`] unless you need the [`OperatorRegistration`] shape.
148///
149/// # Errors
150///
151/// Same conditions as [`merge`].
152pub fn merge_as_op(
153 core: &Core,
154 sources: &[NodeId],
155) -> Result<OperatorRegistration, OperatorFactoryError> {
156 let reg = merge(core, sources)?;
157 Ok(OperatorRegistration {
158 node: reg.node,
159 fn_id: FnId::new(0), // sentinel — no closure registered
160 })
161}