graphrefly_operators/source.rs
1//! Cold synchronous sources — finite-sequence and terminal-constant
2//! producer factories.
3//!
4//! These are the handle-protocol analogues of TS `extra/sources/iter.ts`:
5//!
6//! - [`from_iter`] — emit each `HandleId` as DATA in order, then COMPLETE.
7//! - [`of`] — convenience wrapper: `from_iter` over a `Vec`.
8//! - [`empty`] — COMPLETE immediately with no DATA.
9//! - [`never`] — silent until teardown (no DATA, no terminal).
10//! - [`throw_error`] — emit ERROR immediately.
11//!
12//! All factories register a **producer node** with no deps. The build
13//! closure fires once on first activation (first subscriber) and emits
14//! synchronously. Deactivation (last subscriber drops) auto-cleans via
15//! the standard `producer_deactivate` path.
16//!
17//! # Handle protocol
18//!
19//! Values live on the binding side; Core sees only opaque `HandleId`.
20//! `from_iter` / `of` take pre-interned `HandleId`s — the caller
21//! (binding layer) must `retain_handle` each handle before passing it
22//! in (the source retains once more per emission and releases the
23//! caller's share on activation). `throw_error` takes a pre-interned
24//! error handle.
25//!
26//! `empty` and `never` need no handles — they are pure lifecycle
27//! sources.
28
29use std::sync::{Arc, Weak};
30
31use graphrefly_core::{Core, HandleId, NodeId};
32
33use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx};
34
35/// Emit each handle as DATA in order, then COMPLETE. If the iterator
36/// is empty, behaves like [`empty`] (COMPLETE only).
37///
38/// The caller must pre-intern each handle on the binding side and pass
39/// ownership to this factory. Handles are retained once per emission
40/// and the caller's original shares are released during the build
41/// closure (so the caller should NOT release them after calling this).
42#[must_use]
43pub fn from_iter(
44 core: &Core,
45 binding: &Arc<dyn ProducerBinding>,
46 handles: Vec<HandleId>,
47) -> NodeId {
48 let core_weak = core.weak_handle();
49 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
50
51 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
52 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
53 return;
54 };
55 let pid = ctx.node_id();
56
57 for &h in &handles {
58 // Retain for the emission — Core takes ownership of this share.
59 binding_s.retain_handle(h);
60 core_s.emit_or_defer(pid, h);
61 }
62 // Release the caller's original shares — ownership was
63 // transferred to Core via the retain+emit above.
64 for &h in &handles {
65 binding_s.release_handle(h);
66 }
67 core_s.complete_or_defer(pid);
68 });
69
70 let fn_id = binding.register_producer_build(build);
71 core.register_producer(fn_id)
72 .expect("invariant: register_producer has no deps; no error variants reachable")
73}
74
75/// Emit each handle as DATA in order, then COMPLETE. Convenience
76/// wrapper over [`from_iter`].
77#[must_use]
78pub fn of(core: &Core, binding: &Arc<dyn ProducerBinding>, handles: Vec<HandleId>) -> NodeId {
79 from_iter(core, binding, handles)
80}
81
82/// Complete immediately with no DATA (cold EMPTY analogue).
83#[must_use]
84pub fn empty(core: &Core, binding: &Arc<dyn ProducerBinding>) -> NodeId {
85 let core_weak = core.weak_handle();
86
87 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
88 let Some(core_s) = core_weak.upgrade() else {
89 return;
90 };
91 core_s.complete_or_defer(ctx.node_id());
92 });
93
94 let fn_id = binding.register_producer_build(build);
95 core.register_producer(fn_id)
96 .expect("invariant: register_producer has no deps; no error variants reachable")
97}
98
99/// Never emit and never complete until teardown (cold NEVER analogue).
100/// The build closure is a no-op — the producer stays active but silent
101/// until the last subscriber drops.
102#[must_use]
103pub fn never(core: &Core, binding: &Arc<dyn ProducerBinding>) -> NodeId {
104 let build: ProducerBuildFn = Box::new(|_ctx: ProducerCtx<'_>| {
105 // Intentionally empty — no emissions, no terminal.
106 });
107
108 let fn_id = binding.register_producer_build(build);
109 core.register_producer(fn_id)
110 .expect("invariant: register_producer has no deps; no error variants reachable")
111}
112
113/// Emit ERROR immediately with the given error handle (cold error source).
114///
115/// The caller must pre-intern the error handle on the binding side.
116/// The handle is retained once for the emission; the caller's original
117/// share is consumed (do NOT release after calling this).
118#[must_use]
119pub fn throw_error(
120 core: &Core,
121 binding: &Arc<dyn ProducerBinding>,
122 error_handle: HandleId,
123) -> NodeId {
124 let core_weak = core.weak_handle();
125 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
126
127 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
128 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
129 return;
130 };
131 // Retain for the emission — Core takes ownership.
132 binding_s.retain_handle(error_handle);
133 core_s.error_or_defer(ctx.node_id(), error_handle);
134 // Release the caller's original share.
135 binding_s.release_handle(error_handle);
136 });
137
138 let fn_id = binding.register_producer_build(build);
139 core.register_producer(fn_id)
140 .expect("invariant: register_producer has no deps; no error variants reachable")
141}