Skip to main content

graphrefly_operators/
combine.rs

1//! Multi-dep combinator operators (Slice C-2, D020).
2//!
3//! Mirrors the TS shapes in
4//! `~/src/graphrefly-ts/packages/pure-ts/src/extra/operators/combine.ts`,
5//! but driven by Core dispatch ([`graphrefly_core::OperatorOp`]) instead
6//! of derived-fn factories.
7//!
8//! - [`combine`] — N-dep combineLatest (any dep fire → emit packed tuple)
9//! - [`with_latest_from`] — 2-dep, fire-on-primary-only (Phase 10.5)
10//! - [`merge`] — N-dep, forward all DATA handles verbatim (zero FFI)
11
12use std::sync::Arc;
13
14use graphrefly_core::{Core, FnId, HandleId, NodeId, OperatorOp, OperatorOpts};
15
16use crate::binding::OperatorBinding;
17use crate::error::OperatorFactoryError;
18use crate::transform::OperatorRegistration;
19
20/// Boxed packer closure type — converts N HandleIds to a single tuple HandleId.
21pub type PackerFn = Box<dyn Fn(&[HandleId]) -> HandleId + Send + Sync>;
22
23/// `combine(...sources)` — combineLatest semantics.
24///
25/// Emits a packed tuple of the latest handle per dep whenever any dep
26/// fires. First-run gate (`partial: false` default) holds until all deps
27/// deliver real DATA (R2.5.3).
28///
29/// # Arguments
30///
31/// - `core` — the Core dispatcher.
32/// - `binding` — implements [`OperatorBinding`] for closure registration.
33/// - `sources` — N dep node ids (order determines tuple position).
34/// - `packer` — closure that takes N `HandleId`s and returns a single
35///   tuple `HandleId`. The binding wraps `Fn(&[T]) -> Tuple` into this.
36///
37/// # Errors
38///
39/// - [`OperatorFactoryError::EmptySources`] — `sources` is empty.
40/// - [`OperatorFactoryError::Register`] — Core-layer registration error
41///   (unknown / terminal-non-resubscribable dep).
42pub fn combine(
43    core: &Core,
44    binding: &Arc<dyn OperatorBinding>,
45    sources: &[NodeId],
46    packer: PackerFn,
47) -> Result<OperatorRegistration, OperatorFactoryError> {
48    if sources.is_empty() {
49        return Err(OperatorFactoryError::EmptySources);
50    }
51    let pack_fn = binding.register_packer(packer);
52    let opts = OperatorOpts::default(); // partial: false — gate all deps
53    let node = core.register_operator(sources, OperatorOp::Combine { pack_fn }, opts)?;
54    Ok(OperatorRegistration {
55        node,
56        fn_id: pack_fn,
57    })
58}
59
60/// `with_latest_from(primary, secondary)` — fire-on-primary-only.
61///
62/// Emits a packed pair `[primary, secondary]` only when `primary` (dep[0])
63/// has DATA in the wave. If only `secondary` (dep[1]) fires, settles with
64/// RESOLVED. Matches Phase 10.5 semantics (D021).
65///
66/// First-run gate (`partial: false`) holds until both deps deliver real
67/// DATA (R2.5.3). Post-warmup, fires on primary alone; if secondary has
68/// been invalidated (prev_data == NO_HANDLE), settles with RESOLVED
69/// instead of emitting a stale pair.
70///
71/// # Arguments
72///
73/// - `core` — the Core dispatcher.
74/// - `binding` — implements [`OperatorBinding`] for closure registration.
75/// - `primary` — the triggering dep (dep[0]).
76/// - `secondary` — the sampled dep (dep[1]).
77/// - `packer` — closure that packs 2 `HandleId`s into a pair handle.
78pub fn with_latest_from(
79    core: &Core,
80    binding: &Arc<dyn OperatorBinding>,
81    primary: NodeId,
82    secondary: NodeId,
83    packer: PackerFn,
84) -> OperatorRegistration {
85    let pack_fn = binding.register_packer(packer);
86    let opts = OperatorOpts::default(); // partial: false — gate both deps
87    let node = core
88        .register_operator(
89            &[primary, secondary],
90            OperatorOp::WithLatestFrom { pack_fn },
91            opts,
92        )
93        .expect(
94            "invariant: caller has validated dep ids and seed before calling register_operator",
95        );
96    OperatorRegistration {
97        node,
98        fn_id: pack_fn,
99    }
100}
101
102/// Registration output for [`merge`] — no closure involved (zero FFI).
103#[derive(Copy, Clone, Debug)]
104#[must_use = "the merge operator's NodeId is the value of registering it"]
105pub struct MergeRegistration {
106    pub node: NodeId,
107}
108
109impl MergeRegistration {
110    /// Convenience: extract just the node id.
111    #[must_use]
112    pub fn into_node(self) -> NodeId {
113        self.node
114    }
115}
116
117/// `merge(...sources)` — forward all DATA handles verbatim (zero FFI).
118///
119/// Each dep's DATA is forwarded individually without transformation.
120/// COMPLETE cascades when all deps complete (R1.3.4.b). No binding
121/// call on the fire path — maximally efficient.
122///
123/// # Arguments
124///
125/// - `core` — the Core dispatcher.
126/// - `sources` — N dep node ids.
127///
128/// # Errors
129///
130/// - [`OperatorFactoryError::EmptySources`] — `sources` is empty.
131/// - [`OperatorFactoryError::Register`] — Core-layer registration error
132///   (unknown / terminal-non-resubscribable dep).
133pub fn merge(core: &Core, sources: &[NodeId]) -> Result<MergeRegistration, OperatorFactoryError> {
134    if sources.is_empty() {
135        return Err(OperatorFactoryError::EmptySources);
136    }
137    let opts = OperatorOpts {
138        partial: true, // merge fires as soon as ANY dep delivers
139        ..OperatorOpts::default()
140    };
141    let node = core.register_operator(sources, OperatorOp::Merge, opts)?;
142    Ok(MergeRegistration { node })
143}
144
145/// Convenience: `merge` with an explicit [`FnId`] return for API
146/// consistency. Since merge is zero-FFI, the `FnId` is a sentinel.
147/// Prefer [`merge`] unless you need the [`OperatorRegistration`] shape.
148///
149/// # Errors
150///
151/// Same conditions as [`merge`].
152pub fn merge_as_op(
153    core: &Core,
154    sources: &[NodeId],
155) -> Result<OperatorRegistration, OperatorFactoryError> {
156    let reg = merge(core, sources)?;
157    Ok(OperatorRegistration {
158        node: reg.node,
159        fn_id: FnId::new(0), // sentinel — no closure registered
160    })
161}