Skip to main content

graphrefly_operators/
source.rs

1//! Cold synchronous sources — finite-sequence and terminal-constant
2//! producer factories.
3//!
4//! These are the handle-protocol analogues of TS `extra/sources/iter.ts`:
5//!
6//! - [`from_iter`] — emit each `HandleId` as DATA in order, then COMPLETE.
7//! - [`of`] — convenience wrapper: `from_iter` over a `Vec`.
8//! - [`empty`] — COMPLETE immediately with no DATA.
9//! - [`never`] — silent until teardown (no DATA, no terminal).
10//! - [`throw_error`] — emit ERROR immediately.
11//!
12//! All factories register a **producer node** with no deps. The build
13//! closure fires once on first activation (first subscriber) and emits
14//! synchronously. Deactivation (last subscriber drops) auto-cleans via
15//! the standard `producer_deactivate` path.
16//!
17//! # Handle protocol
18//!
19//! Values live on the binding side; Core sees only opaque `HandleId`.
20//! `from_iter` / `of` take pre-interned `HandleId`s — the caller
21//! (binding layer) must `retain_handle` each handle before passing it
22//! in (the source retains once more per emission and releases the
23//! caller's share on activation). `throw_error` takes a pre-interned
24//! error handle.
25//!
26//! `empty` and `never` need no handles — they are pure lifecycle
27//! sources.
28
29use std::sync::{Arc, Weak};
30
31use graphrefly_core::{Core, HandleId, NodeId};
32
33use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx};
34
35/// Emit each handle as DATA in order, then COMPLETE. If the iterator
36/// is empty, behaves like [`empty`] (COMPLETE only).
37///
38/// The caller must pre-intern each handle on the binding side and pass
39/// ownership to this factory. Handles are retained once per emission
40/// and the caller's original shares are released during the build
41/// closure (so the caller should NOT release them after calling this).
42#[must_use]
43pub fn from_iter(
44    core: &Core,
45    binding: &Arc<dyn ProducerBinding>,
46    handles: Vec<HandleId>,
47) -> NodeId {
48    let core_weak = core.weak_handle();
49    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
50
51    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
52        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
53            return;
54        };
55        let pid = ctx.node_id();
56
57        for &h in &handles {
58            // Retain for the emission — Core takes ownership of this share.
59            binding_s.retain_handle(h);
60            core_s.emit_or_defer(pid, h);
61        }
62        // Release the caller's original shares — ownership was
63        // transferred to Core via the retain+emit above.
64        for &h in &handles {
65            binding_s.release_handle(h);
66        }
67        core_s.complete_or_defer(pid);
68    });
69
70    let fn_id = binding.register_producer_build(build);
71    core.register_producer(fn_id)
72        .expect("invariant: register_producer has no deps; no error variants reachable")
73}
74
75/// Emit each handle as DATA in order, then COMPLETE. Convenience
76/// wrapper over [`from_iter`].
77#[must_use]
78pub fn of(core: &Core, binding: &Arc<dyn ProducerBinding>, handles: Vec<HandleId>) -> NodeId {
79    from_iter(core, binding, handles)
80}
81
82/// Complete immediately with no DATA (cold EMPTY analogue).
83#[must_use]
84pub fn empty(core: &Core, binding: &Arc<dyn ProducerBinding>) -> NodeId {
85    let core_weak = core.weak_handle();
86
87    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
88        let Some(core_s) = core_weak.upgrade() else {
89            return;
90        };
91        core_s.complete_or_defer(ctx.node_id());
92    });
93
94    let fn_id = binding.register_producer_build(build);
95    core.register_producer(fn_id)
96        .expect("invariant: register_producer has no deps; no error variants reachable")
97}
98
99/// Never emit and never complete until teardown (cold NEVER analogue).
100/// The build closure is a no-op — the producer stays active but silent
101/// until the last subscriber drops.
102#[must_use]
103pub fn never(core: &Core, binding: &Arc<dyn ProducerBinding>) -> NodeId {
104    let build: ProducerBuildFn = Box::new(|_ctx: ProducerCtx<'_>| {
105        // Intentionally empty — no emissions, no terminal.
106    });
107
108    let fn_id = binding.register_producer_build(build);
109    core.register_producer(fn_id)
110        .expect("invariant: register_producer has no deps; no error variants reachable")
111}
112
113/// Emit ERROR immediately with the given error handle (cold error source).
114///
115/// The caller must pre-intern the error handle on the binding side.
116/// The handle is retained once for the emission; the caller's original
117/// share is consumed (do NOT release after calling this).
118#[must_use]
119pub fn throw_error(
120    core: &Core,
121    binding: &Arc<dyn ProducerBinding>,
122    error_handle: HandleId,
123) -> NodeId {
124    let core_weak = core.weak_handle();
125    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
126
127    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
128        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
129            return;
130        };
131        // Retain for the emission — Core takes ownership.
132        binding_s.retain_handle(error_handle);
133        core_s.error_or_defer(ctx.node_id(), error_handle);
134        // Release the caller's original share.
135        binding_s.release_handle(error_handle);
136    });
137
138    let fn_id = binding.register_producer_build(build);
139    core.register_producer(fn_id)
140        .expect("invariant: register_producer has no deps; no error variants reachable")
141}