Skip to main content

graphrefly_operators/
source.rs

1//! Cold synchronous sources — finite-sequence and terminal-constant
2//! producer factories.
3//!
4//! These are the handle-protocol analogues of TS `extra/sources/iter.ts`:
5//!
6//! - [`from_iter`] — emit each `HandleId` as DATA in order, then COMPLETE.
7//! - [`of`] — convenience wrapper: `from_iter` over a `Vec`.
8//! - [`empty`] — COMPLETE immediately with no DATA.
9//! - [`never`] — silent until teardown (no DATA, no terminal).
10//! - [`throw_error`] — emit ERROR immediately.
11//!
12//! All factories register a **producer node** with no deps. The build
13//! closure fires once on first activation (first subscriber) and emits
14//! synchronously. Deactivation (last subscriber drops) auto-cleans via
15//! the standard `producer_deactivate` path.
16//!
17//! # Handle protocol
18//!
19//! Values live on the binding side; Core sees only opaque `HandleId`.
20//! `from_iter` / `of` take pre-interned `HandleId`s — the caller
21//! (binding layer) must `retain_handle` each handle before passing it
22//! in (the source retains once more per emission and releases the
23//! caller's share on activation). `throw_error` takes a pre-interned
24//! error handle.
25//!
26//! `empty` and `never` need no handles — they are pure lifecycle
27//! sources.
28
29use std::sync::Arc;
30
31use graphrefly_core::{Core, HandleId, NodeId};
32
33use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx};
34
35/// Emit each handle as DATA in order, then COMPLETE. If the iterator
36/// is empty, behaves like [`empty`] (COMPLETE only).
37///
38/// The caller must pre-intern each handle on the binding side and pass
39/// ownership to this factory. Handles are retained once per emission
40/// and the caller's original shares are released during the build
41/// closure (so the caller should NOT release them after calling this).
42#[must_use]
43pub fn from_iter(
44    core: &Core,
45    binding: &Arc<dyn ProducerBinding>,
46    handles: Vec<HandleId>,
47) -> NodeId {
48    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
49        // S2b/D231: build-side only (no spawned sinks) — use the
50        // ctx-borrowed `&Core` directly; no `WeakCore`/`Clone`.
51        let core_s = ctx.core();
52        let binding_s = ctx.core().binding();
53        let pid = ctx.node_id();
54
55        for &h in &handles {
56            // Retain for the emission — Core takes ownership of this share.
57            binding_s.retain_handle(h);
58            core_s.emit_or_defer(pid, h);
59        }
60        // Release the caller's original shares — ownership was
61        // transferred to Core via the retain+emit above.
62        for &h in &handles {
63            binding_s.release_handle(h);
64        }
65        core_s.complete_or_defer(pid);
66    });
67
68    let fn_id = binding.register_producer_build(build);
69    core.register_producer(fn_id)
70        .expect("invariant: register_producer has no deps; no error variants reachable")
71}
72
73/// Emit each handle as DATA in order, then COMPLETE. Convenience
74/// wrapper over [`from_iter`].
75#[must_use]
76pub fn of(core: &Core, binding: &Arc<dyn ProducerBinding>, handles: Vec<HandleId>) -> NodeId {
77    from_iter(core, binding, handles)
78}
79
80/// Complete immediately with no DATA (cold EMPTY analogue).
81#[must_use]
82pub fn empty(core: &Core, binding: &Arc<dyn ProducerBinding>) -> NodeId {
83    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
84        ctx.core().complete_or_defer(ctx.node_id());
85    });
86
87    let fn_id = binding.register_producer_build(build);
88    core.register_producer(fn_id)
89        .expect("invariant: register_producer has no deps; no error variants reachable")
90}
91
92/// Never emit and never complete until teardown (cold NEVER analogue).
93/// The build closure is a no-op — the producer stays active but silent
94/// until the last subscriber drops.
95#[must_use]
96pub fn never(core: &Core, binding: &Arc<dyn ProducerBinding>) -> NodeId {
97    let build: ProducerBuildFn = Box::new(|_ctx: ProducerCtx<'_>| {
98        // Intentionally empty — no emissions, no terminal.
99    });
100
101    let fn_id = binding.register_producer_build(build);
102    core.register_producer(fn_id)
103        .expect("invariant: register_producer has no deps; no error variants reachable")
104}
105
106/// Emit ERROR immediately with the given error handle (cold error source).
107///
108/// The caller must pre-intern the error handle on the binding side.
109/// The handle is retained once for the emission; the caller's original
110/// share is consumed (do NOT release after calling this).
111#[must_use]
112pub fn throw_error(
113    core: &Core,
114    binding: &Arc<dyn ProducerBinding>,
115    error_handle: HandleId,
116) -> NodeId {
117    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
118        let core_s = ctx.core();
119        let binding_s = ctx.core().binding();
120        // Retain for the emission — Core takes ownership.
121        binding_s.retain_handle(error_handle);
122        core_s.error_or_defer(ctx.node_id(), error_handle);
123        // Release the caller's original share.
124        binding_s.release_handle(error_handle);
125    });
126
127    let fn_id = binding.register_producer_build(build);
128    core.register_producer(fn_id)
129        .expect("invariant: register_producer has no deps; no error variants reachable")
130}