graphrefly_operators/flow.rs
1//! Flow operators (Slice C-3, D024) — count / predicate / terminal-aware
2//! gates that bound which `DATA` reaches the downstream output.
3//!
4//! Mirrors the TS shapes in
5//! `~/src/graphrefly-ts/packages/pure-ts/src/extra/operators/take.ts`,
6//! driven by Core dispatch ([`graphrefly_core::OperatorOp::Take`] /
7//! [`Skip`] / [`TakeWhile`] / [`Last`]) instead of derived-fn factories.
8//!
9//! - [`take`] — emits the first `count` DATA values then self-completes.
10//! `count == 0` is allowed (D027): self-completes on first fire with
11//! no `Data`.
12//! - [`skip`] — drops the first `count` DATA values; emits the rest.
13//! - [`take_while`] — emits while `predicate` holds; on first `false`,
14//! emits any preceding passes then self-completes.
15//! - [`last`] / [`last_with_default`] — buffers the latest `DATA`;
16//! emits `Data(latest)` (or `Data(default)` if no DATA arrived and a
17//! default was registered) then `Complete` on upstream COMPLETE.
18//! - [`first`] — sugar for `take(source, 1)`.
19//! - [`find`] — sugar for `take(filter(source, predicate), 1)`.
20//! - [`element_at`] — sugar for `take(skip(source, index), 1)`.
21//!
22//! `take_until(source, notifier)` is intentionally NOT in this slice —
23//! it requires the producer / subscription-managed pattern (D020
24//! category B), out of scope for the Core-dispatch operator family.
25//!
26//! # Refcount discipline
27//!
28//! For [`last_with_default`], the `default` handle ownership transfers
29//! from the caller's binding-side intern into Core's
30//! [`LastState`](graphrefly_core::op_state::LastState) via a retain
31//! taken inside `register_operator`. The caller is expected to retain a
32//! share for themselves if they want to reference the default
33//! elsewhere.
34
35use std::sync::Arc;
36
37use graphrefly_core::{Core, HandleId, NodeId, OperatorOp, OperatorOpts, NO_HANDLE};
38
39use crate::binding::OperatorBinding;
40use crate::error::OperatorFactoryError;
41use crate::transform::{filter, OperatorRegistration};
42
43/// Registration output for flow operators that don't carry a user
44/// closure ([`take`], [`skip`], [`last`], [`last_with_default`]). Zero
45/// FFI on the fire path; only the `node` id matters.
46#[derive(Copy, Clone, Debug)]
47#[must_use = "the flow operator's NodeId is the value of registering it"]
48pub struct FlowRegistration {
49 pub node: NodeId,
50}
51
52impl FlowRegistration {
53 /// Convenience: extract just the node id.
54 #[must_use]
55 pub fn into_node(self) -> NodeId {
56 self.node
57 }
58}
59
60impl From<FlowRegistration> for NodeId {
61 fn from(r: FlowRegistration) -> Self {
62 r.node
63 }
64}
65
66// ---------------------------------------------------------------------
67// Count-based flow: take, skip
68// ---------------------------------------------------------------------
69
70/// `take(source, count)` — emits the first `count` DATA values then
71/// self-completes via `Core::complete`. When upstream completes before
72/// `count` is reached, the standard auto-cascade propagates COMPLETE.
73///
74/// `count == 0` is allowed (D027): the first fire emits zero items
75/// then immediately self-completes — subscribers see `[Start,
76/// Complete]` (no `Dirty` precedes the terminal because no `Data` is
77/// emitted on this wave; the canonical-spec one-DIRTY-per-wave rule
78/// (R1.3.1.a) governs DATA waves, not pure-terminal waves).
79pub fn take(core: &Core, source: NodeId, count: u32) -> FlowRegistration {
80 take_with(core, source, count, OperatorOpts::default())
81}
82
83/// [`take`] with explicit [`OperatorOpts`].
84pub fn take_with(core: &Core, source: NodeId, count: u32, opts: OperatorOpts) -> FlowRegistration {
85 let node = core
86 .register_operator(&[source], OperatorOp::Take { count }, opts)
87 .expect(
88 "invariant: caller has validated dep ids and seed before calling register_operator",
89 );
90 FlowRegistration { node }
91}
92
93/// `skip(source, count)` — drops the first `count` DATA values; once
94/// the threshold is crossed, subsequent DATAs pass through verbatim.
95/// On a wave where every input is still in the skip window, settles
96/// `[Dirty, Resolved]` (D018 pattern).
97pub fn skip(core: &Core, source: NodeId, count: u32) -> FlowRegistration {
98 skip_with(core, source, count, OperatorOpts::default())
99}
100
101/// [`skip`] with explicit [`OperatorOpts`].
102pub fn skip_with(core: &Core, source: NodeId, count: u32, opts: OperatorOpts) -> FlowRegistration {
103 let node = core
104 .register_operator(&[source], OperatorOp::Skip { count }, opts)
105 .expect(
106 "invariant: caller has validated dep ids and seed before calling register_operator",
107 );
108 FlowRegistration { node }
109}
110
111// ---------------------------------------------------------------------
112// Predicate-based flow: take_while
113// ---------------------------------------------------------------------
114
115/// `take_while(source, predicate)` — emits while `predicate(input)`
116/// holds; on the first `false`, emits any preceding passes from the
117/// same batch then self-completes. Reuses
118/// [`BindingBoundary::predicate_each`](graphrefly_core::BindingBoundary::predicate_each)
119/// (D029).
120pub fn take_while<F>(
121 core: &Core,
122 binding: &Arc<dyn OperatorBinding>,
123 source: NodeId,
124 predicate: F,
125) -> OperatorRegistration
126where
127 F: Fn(HandleId) -> bool + Send + Sync + 'static,
128{
129 take_while_with(core, binding, source, predicate, OperatorOpts::default())
130}
131
132/// [`take_while`] with explicit [`OperatorOpts`].
133pub fn take_while_with<F>(
134 core: &Core,
135 binding: &Arc<dyn OperatorBinding>,
136 source: NodeId,
137 predicate: F,
138 opts: OperatorOpts,
139) -> OperatorRegistration
140where
141 F: Fn(HandleId) -> bool + Send + Sync + 'static,
142{
143 let fn_id = binding.register_predicate(Box::new(predicate));
144 let node = core
145 .register_operator(&[source], OperatorOp::TakeWhile { fn_id }, opts)
146 .expect(
147 "invariant: caller has validated dep ids and seed before calling register_operator",
148 );
149 OperatorRegistration { node, fn_id }
150}
151
152// ---------------------------------------------------------------------
153// Terminal-aware flow: last, last_with_default
154// ---------------------------------------------------------------------
155
156/// `last(source)` — buffers the latest DATA; emits `Data(latest)` then
157/// `Complete` on upstream COMPLETE. On empty stream (no DATA arrived),
158/// emits only `Complete` — subscribers see `[Start, Complete]`. For a
159/// fallback value on empty streams, use [`last_with_default`].
160///
161/// Opts out of Lock 2.B auto-cascade so it can intercept upstream
162/// COMPLETE (same pattern as [`reduce`](crate::transform::reduce)).
163pub fn last(core: &Core, source: NodeId) -> FlowRegistration {
164 last_with(core, source, OperatorOpts::default())
165}
166
167/// [`last`] with explicit [`OperatorOpts`].
168pub fn last_with(core: &Core, source: NodeId, opts: OperatorOpts) -> FlowRegistration {
169 let node = core
170 .register_operator(&[source], OperatorOp::Last { default: NO_HANDLE }, opts)
171 .expect("invariant: caller has validated dep id before calling last()");
172 FlowRegistration { node }
173}
174
175/// `last_with_default(source, default)` — buffers the latest DATA;
176/// emits `Data(latest)` then `Complete` on upstream COMPLETE. On empty
177/// stream (no DATA arrived), emits `Data(default)` then `Complete`.
178///
179/// Core takes one retain on `default` for the
180/// [`LastState`](graphrefly_core::op_state::LastState)'s lifetime; the
181/// caller should retain a share for themselves if they want to
182/// reference the handle elsewhere.
183///
184/// # Errors
185///
186/// - [`OperatorFactoryError::ZeroDefault`] — `default == NO_HANDLE`. Use
187/// [`last`] for the no-default behavior instead.
188/// - [`OperatorFactoryError::Register`] — Core-layer registration error
189/// (unknown / terminal-non-resubscribable `source`).
190pub fn last_with_default(
191 core: &Core,
192 source: NodeId,
193 default: HandleId,
194) -> Result<FlowRegistration, OperatorFactoryError> {
195 last_with_default_with(core, source, default, OperatorOpts::default())
196}
197
198/// [`last_with_default`] with explicit [`OperatorOpts`].
199///
200/// # Errors
201///
202/// Same conditions as [`last_with_default`].
203pub fn last_with_default_with(
204 core: &Core,
205 source: NodeId,
206 default: HandleId,
207 opts: OperatorOpts,
208) -> Result<FlowRegistration, OperatorFactoryError> {
209 if default == NO_HANDLE {
210 return Err(OperatorFactoryError::ZeroDefault);
211 }
212 let node = core.register_operator(&[source], OperatorOp::Last { default }, opts)?;
213 Ok(FlowRegistration { node })
214}
215
216// ---------------------------------------------------------------------
217// Sugar: first, find, element_at (compositions of take / skip / filter)
218// ---------------------------------------------------------------------
219
220/// `first(source)` — emits the first DATA then `Complete`. Sugar for
221/// `take(source, 1)`.
222pub fn first(core: &Core, source: NodeId) -> FlowRegistration {
223 take(core, source, 1)
224}
225
226/// `find(source, predicate)` — emits the first DATA matching `predicate`
227/// then `Complete`. Sugar for `take(filter(source, predicate), 1)`.
228pub fn find<F>(
229 core: &Core,
230 binding: &Arc<dyn OperatorBinding>,
231 source: NodeId,
232 predicate: F,
233) -> FlowRegistration
234where
235 F: Fn(HandleId) -> bool + Send + Sync + 'static,
236{
237 let filtered = filter(core, binding, source, predicate);
238 take(core, filtered.node, 1)
239}
240
241/// `element_at(source, index)` — emits the `index`th DATA (zero-based)
242/// then `Complete`. Sugar for `take(skip(source, index), 1)`.
243pub fn element_at(core: &Core, source: NodeId, index: u32) -> FlowRegistration {
244 let skipped = skip(core, source, index);
245 take(core, skipped.node, 1)
246}