#![allow(clippy::arc_with_non_send_sync)]
#![allow(clippy::too_many_lines, clippy::items_after_statements)]
use std::sync::Arc;
use parking_lot::Mutex;
use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
use smallvec::SmallVec;
use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
#[must_use]
pub fn tap(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, fn_id: FnId) -> NodeId {
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let core_sink = em.clone();
let source_sink: Sink = Arc::new(move |msgs| {
enum Act {
EmitAndTap(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb.retain_handle(h);
actions.push(Act::EmitAndTap(h));
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb.retain_handle(h);
actions.push(Act::Error(h));
} else {
actions.push(Act::Complete);
}
}
_ => {}
}
}
for a in actions {
match a {
Act::EmitAndTap(h) => {
bb.invoke_tap_fn(fn_id, h);
core_sink.emit_or_defer(pid, h);
}
Act::Complete => core_sink.complete_or_defer(pid),
Act::Error(h) => core_sink.error_or_defer(pid, h),
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
core_s.complete_or_defer(pid);
}
});
let fn_id_reg = binding.register_producer_build(build);
core.register_producer(fn_id_reg)
.expect("tap: register_producer failed")
}
#[must_use]
pub fn tap_observer(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
data_fn_id: Option<FnId>,
error_fn_id: Option<FnId>,
complete_fn_id: Option<FnId>,
) -> NodeId {
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let core_sink = em.clone();
let source_sink: Sink = Arc::new(move |msgs| {
enum Act {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb.retain_handle(h);
actions.push(Act::Emit(h));
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb.retain_handle(h);
actions.push(Act::Error(h));
} else {
actions.push(Act::Complete);
}
}
_ => {}
}
}
for a in actions {
match a {
Act::Emit(h) => {
if let Some(fid) = data_fn_id {
bb.invoke_tap_fn(fid, h);
}
core_sink.emit_or_defer(pid, h);
}
Act::Complete => {
if let Some(fid) = complete_fn_id {
bb.invoke_tap_complete_fn(fid);
}
core_sink.complete_or_defer(pid);
}
Act::Error(h) => {
if let Some(fid) = error_fn_id {
bb.invoke_tap_error_fn(fid, h);
}
core_sink.error_or_defer(pid, h);
}
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
if let Some(fid) = complete_fn_id {
binding_s.invoke_tap_complete_fn(fid);
}
core_s.complete_or_defer(pid);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("tap_observer: register_producer failed")
}
#[must_use]
pub fn on_first_data(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
fn_id: FnId,
) -> NodeId {
struct OnFirstState {
fired: bool,
}
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let core_sink = em.clone();
let state: Arc<Mutex<OnFirstState>> = Arc::new(Mutex::new(OnFirstState { fired: false }));
let source_sink: Sink = Arc::new(move |msgs| {
enum Act {
EmitWithTap(HandleId),
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
{
let mut s = state.lock();
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb.retain_handle(h);
if s.fired {
actions.push(Act::Emit(h));
} else {
s.fired = true;
actions.push(Act::EmitWithTap(h));
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb.retain_handle(h);
actions.push(Act::Error(h));
} else {
actions.push(Act::Complete);
}
}
_ => {}
}
}
}
for a in actions {
match a {
Act::EmitWithTap(h) => {
bb.invoke_tap_fn(fn_id, h);
core_sink.emit_or_defer(pid, h);
}
Act::Emit(h) => core_sink.emit_or_defer(pid, h),
Act::Complete => core_sink.complete_or_defer(pid),
Act::Error(h) => core_sink.error_or_defer(pid, h),
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
core_s.complete_or_defer(pid);
}
});
let fn_id_reg = binding.register_producer_build(build);
core.register_producer(fn_id_reg)
.expect("on_first_data: register_producer failed")
}
#[must_use]
pub fn rescue(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
fn_id: FnId,
) -> NodeId {
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let core_sink = em.clone();
let source_sink: Sink = Arc::new(move |msgs| {
enum Act {
Emit(HandleId),
Complete,
TryRescue(HandleId),
}
let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb.retain_handle(h);
actions.push(Act::Emit(h));
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb.retain_handle(h);
actions.push(Act::TryRescue(h));
} else {
actions.push(Act::Complete);
}
}
_ => {}
}
}
for a in actions {
match a {
Act::Emit(h) => core_sink.emit_or_defer(pid, h),
Act::Complete => core_sink.complete_or_defer(pid),
Act::TryRescue(err_h) => {
match bb.invoke_rescue_fn(fn_id, err_h) {
Ok(recovered_h) => {
bb.release_handle(err_h);
core_sink.emit_or_defer(pid, recovered_h);
}
Err(()) => {
core_sink.error_or_defer(pid, err_h);
}
}
}
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
core_s.complete_or_defer(pid);
}
});
let fn_id_reg = binding.register_producer_build(build);
core.register_producer(fn_id_reg)
.expect("rescue: register_producer failed")
}
#[must_use]
pub fn valve(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
control: NodeId,
gate_fn_id: FnId,
cancel: Option<tokio_util::sync::CancellationToken>,
) -> NodeId {
struct ValveState {
open: bool,
terminated: bool,
}
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let state: Arc<Mutex<ValveState>> = Arc::new(Mutex::new(ValveState {
open: false,
terminated: false,
}));
let st_ctrl = state.clone();
let bb_ctrl: Arc<dyn BindingBoundary> = binding_s.clone();
let core_ctrl = em.clone();
let cancel_ctrl = cancel.clone();
let control_sink: Sink = Arc::new(move |msgs| {
let mut should_cancel = false;
let mut error_action: Option<HandleId> = None;
{
let mut s = st_ctrl.lock();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
let results = bb_ctrl.predicate_each(gate_fn_id, &[h]);
let new_open = results.first().copied().unwrap_or(false);
let was_open = s.open;
s.open = new_open;
if was_open && !new_open && cancel_ctrl.is_some() {
should_cancel = true;
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.terminated {
s.terminated = true;
bb_ctrl.retain_handle(h);
error_action = Some(h);
}
}
}
_ => {}
}
}
}
if should_cancel {
if let Some(ref ct) = cancel_ctrl {
ct.cancel();
}
}
if let Some(h) = error_action {
core_ctrl.error_or_defer(pid, h);
}
});
let ctrl_outcome = ctx.subscribe_to(control, control_sink);
if matches!(ctrl_outcome, SubscribeOutcome::Dead { .. }) {
}
let st_src = state.clone();
let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
let core_src = em.clone();
let source_sink: Sink = Arc::new(move |msgs| {
enum Act {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
{
let s = st_src.lock();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
if s.open {
bb_src.retain_handle(h);
actions.push(Act::Emit(h));
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb_src.retain_handle(h);
actions.push(Act::Error(h));
} else {
actions.push(Act::Complete);
}
}
_ => {}
}
}
}
for a in actions {
match a {
Act::Emit(h) => core_src.emit_or_defer(pid, h),
Act::Complete => core_src.complete_or_defer(pid),
Act::Error(h) => core_src.error_or_defer(pid, h),
}
}
});
let src_outcome = ctx.subscribe_to(source, source_sink);
if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
let mut s = state.lock();
if !s.terminated {
s.terminated = true;
drop(s);
core_s.complete_or_defer(pid);
}
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("valve: register_producer failed")
}
#[must_use]
pub fn settle(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
quiet_waves: u32,
max_waves: Option<u32>,
) -> NodeId {
struct SettleState {
wave_count: u32,
quiet_count: u32,
completed: bool,
}
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let core_sink = em.clone();
let state: Arc<Mutex<SettleState>> = Arc::new(Mutex::new(SettleState {
wave_count: 0,
quiet_count: 0,
completed: false,
}));
let source_sink: Sink = Arc::new(move |msgs| {
enum Act {
Emit(HandleId),
Complete,
Error(HandleId),
SelfComplete,
}
let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
{
let mut s = state.lock();
if s.completed {
return;
}
for m in msgs {
if s.completed {
break;
}
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
s.quiet_count = 0;
s.wave_count += 1;
bb.retain_handle(h);
actions.push(Act::Emit(h));
if let Some(max) = max_waves {
if s.wave_count >= max {
s.completed = true;
actions.push(Act::SelfComplete);
}
}
} else {
s.quiet_count += 1;
if s.quiet_count >= quiet_waves {
s.completed = true;
actions.push(Act::SelfComplete);
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
s.completed = true;
bb.retain_handle(h);
actions.push(Act::Error(h));
} else {
s.completed = true;
actions.push(Act::Complete);
}
}
_ => {}
}
}
}
for a in actions {
match a {
Act::Emit(h) => core_sink.emit_or_defer(pid, h),
Act::Complete | Act::SelfComplete => core_sink.complete_or_defer(pid),
Act::Error(h) => core_sink.error_or_defer(pid, h),
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
core_s.complete_or_defer(pid);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("settle: register_producer failed")
}
#[must_use]
pub fn repeat(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
count: u32,
) -> NodeId {
struct RepeatState {
remaining: u32,
terminated: bool,
}
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let state: Arc<Mutex<RepeatState>> = Arc::new(Mutex::new(RepeatState {
remaining: count,
terminated: false,
}));
let storage = ctx.storage();
let sink_slot: Arc<Mutex<Option<Sink>>> = Arc::new(Mutex::new(None));
let sink_slot_inner = sink_slot.clone();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let core_sink = em.clone();
let storage_inner = storage.clone();
let source_sink: Sink = Arc::new(move |msgs| {
enum Act {
Emit(HandleId),
Error(HandleId),
Resubscribe,
Complete,
}
let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
{
let mut s = state.lock();
if s.terminated {
return;
}
for m in msgs {
if s.terminated {
break;
}
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb.retain_handle(h);
actions.push(Act::Emit(h));
}
}
5 => {
if let Some(h) = m.payload_handle() {
s.terminated = true;
bb.retain_handle(h);
actions.push(Act::Error(h));
} else {
if s.remaining > 0 {
s.remaining -= 1;
actions.push(Act::Resubscribe);
} else {
s.terminated = true;
actions.push(Act::Complete);
}
}
}
_ => {}
}
}
}
for a in actions {
match a {
Act::Emit(h) => core_sink.emit_or_defer(pid, h),
Act::Error(h) => core_sink.error_or_defer(pid, h),
Act::Complete => core_sink.complete_or_defer(pid),
Act::Resubscribe => {
let maybe_sink = sink_slot_inner.lock().clone();
if let Some(new_sink) = maybe_sink {
let storage_d = storage_inner.clone();
let state_d = state.clone();
let _ = core_sink.defer(move |c| {
if let Ok(sub) = c.try_subscribe(source, new_sink) {
storage_d
.lock()
.entry(pid)
.or_default()
.subs
.push((source, sub));
} else {
let mut s = state_d.lock();
if !s.terminated {
s.terminated = true;
drop(s);
c.complete(pid);
}
}
});
}
}
}
}
});
*sink_slot.lock() = Some(source_sink.clone());
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
core_s.complete_or_defer(pid);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("repeat: register_producer failed")
}