Skip to main content

graphrefly_operators/
flow.rs

1//! Flow operators (Slice C-3, D024) — count / predicate / terminal-aware
2//! gates that bound which `DATA` reaches the downstream output.
3//!
4//! Mirrors the TS shapes in
5//! `~/src/graphrefly-ts/packages/pure-ts/src/extra/operators/take.ts`,
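//! driven by Core dispatch ([`graphrefly_core::OperatorOp::Take`] /
//! [`Skip`](graphrefly_core::OperatorOp::Skip) /
//! [`TakeWhile`](graphrefly_core::OperatorOp::TakeWhile) /
//! [`Last`](graphrefly_core::OperatorOp::Last)) instead of derived-fn
//! factories.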
8//!
9//! - [`take`] — emits the first `count` DATA values then self-completes.
10//!   `count == 0` is allowed (D027): self-completes on first fire with
11//!   no `Data`.
12//! - [`skip`] — drops the first `count` DATA values; emits the rest.
13//! - [`take_while`] — emits while `predicate` holds; on first `false`,
14//!   emits any preceding passes then self-completes.
15//! - [`last`] / [`last_with_default`] — buffers the latest `DATA`;
16//!   emits `Data(latest)` (or `Data(default)` if no DATA arrived and a
17//!   default was registered) then `Complete` on upstream COMPLETE.
18//! - [`first`] — sugar for `take(source, 1)`.
19//! - [`find`] — sugar for `take(filter(source, predicate), 1)`.
20//! - [`element_at`] — sugar for `take(skip(source, index), 1)`.
21//!
22//! `take_until(source, notifier)` is intentionally NOT in this slice —
23//! it requires the producer / subscription-managed pattern (D020
24//! category B), out of scope for the Core-dispatch operator family.
25//!
26//! # Refcount discipline
27//!
28//! For [`last_with_default`], the `default` handle ownership transfers
29//! from the caller's binding-side intern into Core's
30//! [`LastState`](graphrefly_core::op_state::LastState) via a retain
31//! taken inside `register_operator`. The caller is expected to retain a
32//! share for themselves if they want to reference the default
33//! elsewhere.
34
35use std::sync::Arc;
36
37use graphrefly_core::{Core, HandleId, NodeId, OperatorOp, OperatorOpts, NO_HANDLE};
38
39use crate::binding::OperatorBinding;
40use crate::error::OperatorFactoryError;
41use crate::transform::{filter, OperatorRegistration};
42
43/// Registration output for flow operators that don't carry a user
44/// closure ([`take`], [`skip`], [`last`], [`last_with_default`]). Zero
45/// FFI on the fire path; only the `node` id matters.
46#[derive(Copy, Clone, Debug)]
47#[must_use = "the flow operator's NodeId is the value of registering it"]
48pub struct FlowRegistration {
49    pub node: NodeId,
50}
51
52impl FlowRegistration {
53    /// Convenience: extract just the node id.
54    #[must_use]
55    pub fn into_node(self) -> NodeId {
56        self.node
57    }
58}
59
60impl From<FlowRegistration> for NodeId {
61    fn from(r: FlowRegistration) -> Self {
62        r.node
63    }
64}
65
66// ---------------------------------------------------------------------
67// Count-based flow: take, skip
68// ---------------------------------------------------------------------
69
70/// `take(source, count)` — emits the first `count` DATA values then
71/// self-completes via `Core::complete`. When upstream completes before
72/// `count` is reached, the standard auto-cascade propagates COMPLETE.
73///
74/// `count == 0` is allowed (D027): the first fire emits zero items
75/// then immediately self-completes — subscribers see `[Start,
76/// Complete]` (no `Dirty` precedes the terminal because no `Data` is
77/// emitted on this wave; the canonical-spec one-DIRTY-per-wave rule
78/// (R1.3.1.a) governs DATA waves, not pure-terminal waves).
79pub fn take(core: &Core, source: NodeId, count: u32) -> FlowRegistration {
80    take_with(core, source, count, OperatorOpts::default())
81}
82
83/// [`take`] with explicit [`OperatorOpts`].
84pub fn take_with(core: &Core, source: NodeId, count: u32, opts: OperatorOpts) -> FlowRegistration {
85    let node = core
86        .register_operator(&[source], OperatorOp::Take { count }, opts)
87        .expect(
88            "invariant: caller has validated dep ids and seed before calling register_operator",
89        );
90    FlowRegistration { node }
91}
92
93/// `skip(source, count)` — drops the first `count` DATA values; once
94/// the threshold is crossed, subsequent DATAs pass through verbatim.
95/// On a wave where every input is still in the skip window, settles
96/// `[Dirty, Resolved]` (D018 pattern).
97pub fn skip(core: &Core, source: NodeId, count: u32) -> FlowRegistration {
98    skip_with(core, source, count, OperatorOpts::default())
99}
100
101/// [`skip`] with explicit [`OperatorOpts`].
102pub fn skip_with(core: &Core, source: NodeId, count: u32, opts: OperatorOpts) -> FlowRegistration {
103    let node = core
104        .register_operator(&[source], OperatorOp::Skip { count }, opts)
105        .expect(
106            "invariant: caller has validated dep ids and seed before calling register_operator",
107        );
108    FlowRegistration { node }
109}
110
111// ---------------------------------------------------------------------
112// Predicate-based flow: take_while
113// ---------------------------------------------------------------------
114
115/// `take_while(source, predicate)` — emits while `predicate(input)`
116/// holds; on the first `false`, emits any preceding passes from the
117/// same batch then self-completes. Reuses
118/// [`BindingBoundary::predicate_each`](graphrefly_core::BindingBoundary::predicate_each)
119/// (D029).
120pub fn take_while<F>(
121    core: &Core,
122    binding: &Arc<dyn OperatorBinding>,
123    source: NodeId,
124    predicate: F,
125) -> OperatorRegistration
126where
127    F: Fn(HandleId) -> bool + Send + Sync + 'static,
128{
129    take_while_with(core, binding, source, predicate, OperatorOpts::default())
130}
131
132/// [`take_while`] with explicit [`OperatorOpts`].
133pub fn take_while_with<F>(
134    core: &Core,
135    binding: &Arc<dyn OperatorBinding>,
136    source: NodeId,
137    predicate: F,
138    opts: OperatorOpts,
139) -> OperatorRegistration
140where
141    F: Fn(HandleId) -> bool + Send + Sync + 'static,
142{
143    let fn_id = binding.register_predicate(Box::new(predicate));
144    let node = core
145        .register_operator(&[source], OperatorOp::TakeWhile { fn_id }, opts)
146        .expect(
147            "invariant: caller has validated dep ids and seed before calling register_operator",
148        );
149    OperatorRegistration { node, fn_id }
150}
151
152// ---------------------------------------------------------------------
153// Terminal-aware flow: last, last_with_default
154// ---------------------------------------------------------------------
155
156/// `last(source)` — buffers the latest DATA; emits `Data(latest)` then
157/// `Complete` on upstream COMPLETE. On empty stream (no DATA arrived),
158/// emits only `Complete` — subscribers see `[Start, Complete]`. For a
159/// fallback value on empty streams, use [`last_with_default`].
160///
161/// Opts out of Lock 2.B auto-cascade so it can intercept upstream
162/// COMPLETE (same pattern as [`reduce`](crate::transform::reduce)).
163pub fn last(core: &Core, source: NodeId) -> FlowRegistration {
164    last_with(core, source, OperatorOpts::default())
165}
166
167/// [`last`] with explicit [`OperatorOpts`].
168pub fn last_with(core: &Core, source: NodeId, opts: OperatorOpts) -> FlowRegistration {
169    let node = core
170        .register_operator(&[source], OperatorOp::Last { default: NO_HANDLE }, opts)
171        .expect("invariant: caller has validated dep id before calling last()");
172    FlowRegistration { node }
173}
174
175/// `last_with_default(source, default)` — buffers the latest DATA;
176/// emits `Data(latest)` then `Complete` on upstream COMPLETE. On empty
177/// stream (no DATA arrived), emits `Data(default)` then `Complete`.
178///
179/// Core takes one retain on `default` for the
180/// [`LastState`](graphrefly_core::op_state::LastState)'s lifetime; the
181/// caller should retain a share for themselves if they want to
182/// reference the handle elsewhere.
183///
184/// # Errors
185///
186/// - [`OperatorFactoryError::ZeroDefault`] — `default == NO_HANDLE`. Use
187///   [`last`] for the no-default behavior instead.
188/// - [`OperatorFactoryError::Register`] — Core-layer registration error
189///   (unknown / terminal-non-resubscribable `source`).
190pub fn last_with_default(
191    core: &Core,
192    source: NodeId,
193    default: HandleId,
194) -> Result<FlowRegistration, OperatorFactoryError> {
195    last_with_default_with(core, source, default, OperatorOpts::default())
196}
197
198/// [`last_with_default`] with explicit [`OperatorOpts`].
199///
200/// # Errors
201///
202/// Same conditions as [`last_with_default`].
203pub fn last_with_default_with(
204    core: &Core,
205    source: NodeId,
206    default: HandleId,
207    opts: OperatorOpts,
208) -> Result<FlowRegistration, OperatorFactoryError> {
209    if default == NO_HANDLE {
210        return Err(OperatorFactoryError::ZeroDefault);
211    }
212    let node = core.register_operator(&[source], OperatorOp::Last { default }, opts)?;
213    Ok(FlowRegistration { node })
214}
215
216// ---------------------------------------------------------------------
217// Sugar: first, find, element_at (compositions of take / skip / filter)
218// ---------------------------------------------------------------------
219
220/// `first(source)` — emits the first DATA then `Complete`. Sugar for
221/// `take(source, 1)`.
222pub fn first(core: &Core, source: NodeId) -> FlowRegistration {
223    take(core, source, 1)
224}
225
226/// `find(source, predicate)` — emits the first DATA matching `predicate`
227/// then `Complete`. Sugar for `take(filter(source, predicate), 1)`.
228pub fn find<F>(
229    core: &Core,
230    binding: &Arc<dyn OperatorBinding>,
231    source: NodeId,
232    predicate: F,
233) -> FlowRegistration
234where
235    F: Fn(HandleId) -> bool + Send + Sync + 'static,
236{
237    let filtered = filter(core, binding, source, predicate);
238    take(core, filtered.node, 1)
239}
240
241/// `element_at(source, index)` — emits the `index`th DATA (zero-based)
242/// then `Complete`. Sugar for `take(skip(source, index), 1)`.
243pub fn element_at(core: &Core, source: NodeId, index: u32) -> FlowRegistration {
244    let skipped = skip(core, source, index);
245    take(core, skipped.node, 1)
246}