1#![allow(clippy::too_many_lines, clippy::items_after_statements)]
13
14use std::sync::{Arc, Weak};
15
16use parking_lot::Mutex;
17
18use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
19use smallvec::SmallVec;
20
21use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
22
23#[must_use]
31pub fn tap(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, fn_id: FnId) -> NodeId {
32 let core_weak = core.weak_handle();
33 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
34
35 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
36 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
37 return;
38 };
39 let pid = ctx.node_id();
40 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
41 let core_sink = core_s.clone();
42
43 let source_sink: Sink = Arc::new(move |msgs| {
44 enum Act {
45 EmitAndTap(HandleId),
46 Complete,
47 Error(HandleId),
48 }
49 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
50 for m in msgs {
51 match m.tier() {
52 3 => {
53 if let Some(h) = m.payload_handle() {
54 bb.retain_handle(h);
55 actions.push(Act::EmitAndTap(h));
56 }
57 }
58 5 => {
59 if let Some(h) = m.payload_handle() {
60 bb.retain_handle(h);
61 actions.push(Act::Error(h));
62 } else {
63 actions.push(Act::Complete);
64 }
65 }
66 _ => {}
67 }
68 }
69 for a in actions {
70 match a {
71 Act::EmitAndTap(h) => {
72 bb.invoke_tap_fn(fn_id, h);
73 core_sink.emit_or_defer(pid, h);
74 }
75 Act::Complete => core_sink.complete_or_defer(pid),
76 Act::Error(h) => core_sink.error_or_defer(pid, h),
77 }
78 }
79 });
80
81 let outcome = ctx.subscribe_to(source, source_sink);
82 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
83 core_s.complete_or_defer(pid);
84 }
85 });
86
87 let fn_id_reg = binding.register_producer_build(build);
88 core.register_producer(fn_id_reg)
89 .expect("tap: register_producer failed")
90}
91
92#[must_use]
105pub fn tap_observer(
106 core: &Core,
107 binding: &Arc<dyn ProducerBinding>,
108 source: NodeId,
109 data_fn_id: Option<FnId>,
110 error_fn_id: Option<FnId>,
111 complete_fn_id: Option<FnId>,
112) -> NodeId {
113 let core_weak = core.weak_handle();
114 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
115
116 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
117 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
118 return;
119 };
120 let pid = ctx.node_id();
121 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
122 let core_sink = core_s.clone();
123
124 let source_sink: Sink = Arc::new(move |msgs| {
125 enum Act {
126 Emit(HandleId),
127 Complete,
128 Error(HandleId),
129 }
130 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
131 for m in msgs {
132 match m.tier() {
133 3 => {
134 if let Some(h) = m.payload_handle() {
135 bb.retain_handle(h);
136 actions.push(Act::Emit(h));
137 }
138 }
139 5 => {
140 if let Some(h) = m.payload_handle() {
141 bb.retain_handle(h);
142 actions.push(Act::Error(h));
143 } else {
144 actions.push(Act::Complete);
145 }
146 }
147 _ => {}
148 }
149 }
150 for a in actions {
151 match a {
152 Act::Emit(h) => {
153 if let Some(fid) = data_fn_id {
154 bb.invoke_tap_fn(fid, h);
155 }
156 core_sink.emit_or_defer(pid, h);
157 }
158 Act::Complete => {
159 if let Some(fid) = complete_fn_id {
160 bb.invoke_tap_complete_fn(fid);
161 }
162 core_sink.complete_or_defer(pid);
163 }
164 Act::Error(h) => {
165 if let Some(fid) = error_fn_id {
166 bb.invoke_tap_error_fn(fid, h);
167 }
168 core_sink.error_or_defer(pid, h);
169 }
170 }
171 }
172 });
173
174 let outcome = ctx.subscribe_to(source, source_sink);
175 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
176 if let Some(fid) = complete_fn_id {
177 binding_s.invoke_tap_complete_fn(fid);
178 }
179 core_s.complete_or_defer(pid);
180 }
181 });
182
183 let fn_id = binding.register_producer_build(build);
184 core.register_producer(fn_id)
185 .expect("tap_observer: register_producer failed")
186}
187
188#[must_use]
196pub fn on_first_data(
197 core: &Core,
198 binding: &Arc<dyn ProducerBinding>,
199 source: NodeId,
200 fn_id: FnId,
201) -> NodeId {
202 let core_weak = core.weak_handle();
203 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
204
205 struct OnFirstState {
206 fired: bool,
207 }
208
209 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
210 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
211 return;
212 };
213 let pid = ctx.node_id();
214 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
215 let core_sink = core_s.clone();
216 let state: Arc<Mutex<OnFirstState>> = Arc::new(Mutex::new(OnFirstState { fired: false }));
217
218 let source_sink: Sink = Arc::new(move |msgs| {
219 enum Act {
220 EmitWithTap(HandleId),
221 Emit(HandleId),
222 Complete,
223 Error(HandleId),
224 }
225 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
226 {
227 let mut s = state.lock();
228 for m in msgs {
229 match m.tier() {
230 3 => {
231 if let Some(h) = m.payload_handle() {
232 bb.retain_handle(h);
233 if s.fired {
234 actions.push(Act::Emit(h));
235 } else {
236 s.fired = true;
237 actions.push(Act::EmitWithTap(h));
238 }
239 }
240 }
241 5 => {
242 if let Some(h) = m.payload_handle() {
243 bb.retain_handle(h);
244 actions.push(Act::Error(h));
245 } else {
246 actions.push(Act::Complete);
247 }
248 }
249 _ => {}
250 }
251 }
252 }
253 for a in actions {
254 match a {
255 Act::EmitWithTap(h) => {
256 bb.invoke_tap_fn(fn_id, h);
257 core_sink.emit_or_defer(pid, h);
258 }
259 Act::Emit(h) => core_sink.emit_or_defer(pid, h),
260 Act::Complete => core_sink.complete_or_defer(pid),
261 Act::Error(h) => core_sink.error_or_defer(pid, h),
262 }
263 }
264 });
265
266 let outcome = ctx.subscribe_to(source, source_sink);
267 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
268 core_s.complete_or_defer(pid);
269 }
270 });
271
272 let fn_id_reg = binding.register_producer_build(build);
273 core.register_producer(fn_id_reg)
274 .expect("on_first_data: register_producer failed")
275}
276
277#[must_use]
289pub fn rescue(
290 core: &Core,
291 binding: &Arc<dyn ProducerBinding>,
292 source: NodeId,
293 fn_id: FnId,
294) -> NodeId {
295 let core_weak = core.weak_handle();
296 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
297
298 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
299 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
300 return;
301 };
302 let pid = ctx.node_id();
303 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
304 let core_sink = core_s.clone();
305
306 let source_sink: Sink = Arc::new(move |msgs| {
307 enum Act {
308 Emit(HandleId),
309 Complete,
310 TryRescue(HandleId),
311 }
312 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
313 for m in msgs {
314 match m.tier() {
315 3 => {
316 if let Some(h) = m.payload_handle() {
317 bb.retain_handle(h);
318 actions.push(Act::Emit(h));
319 }
320 }
321 5 => {
322 if let Some(h) = m.payload_handle() {
323 bb.retain_handle(h);
324 actions.push(Act::TryRescue(h));
325 } else {
326 actions.push(Act::Complete);
327 }
328 }
329 _ => {}
330 }
331 }
332 for a in actions {
333 match a {
334 Act::Emit(h) => core_sink.emit_or_defer(pid, h),
335 Act::Complete => core_sink.complete_or_defer(pid),
336 Act::TryRescue(err_h) => {
337 match bb.invoke_rescue_fn(fn_id, err_h) {
338 Ok(recovered_h) => {
339 bb.release_handle(err_h);
342 core_sink.emit_or_defer(pid, recovered_h);
343 }
344 Err(()) => {
345 core_sink.error_or_defer(pid, err_h);
347 }
348 }
349 }
350 }
351 }
352 });
353
354 let outcome = ctx.subscribe_to(source, source_sink);
355 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
356 core_s.complete_or_defer(pid);
357 }
358 });
359
360 let fn_id_reg = binding.register_producer_build(build);
361 core.register_producer(fn_id_reg)
362 .expect("rescue: register_producer failed")
363}
364
365#[must_use]
382pub fn valve(
383 core: &Core,
384 binding: &Arc<dyn ProducerBinding>,
385 source: NodeId,
386 control: NodeId,
387 gate_fn_id: FnId,
388 cancel: Option<tokio_util::sync::CancellationToken>,
389) -> NodeId {
390 let core_weak = core.weak_handle();
391 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
392
393 struct ValveState {
394 open: bool,
395 terminated: bool,
396 }
397
398 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
399 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
400 return;
401 };
402 let pid = ctx.node_id();
403 let state: Arc<Mutex<ValveState>> = Arc::new(Mutex::new(ValveState {
404 open: false,
405 terminated: false,
406 }));
407
408 let st_ctrl = state.clone();
410 let bb_ctrl: Arc<dyn BindingBoundary> = binding_s.clone();
411 let core_ctrl = core_s.clone();
412 let cancel_ctrl = cancel.clone();
413 let control_sink: Sink = Arc::new(move |msgs| {
414 let mut should_cancel = false;
415 let mut error_action: Option<HandleId> = None;
416 {
417 let mut s = st_ctrl.lock();
418 if s.terminated {
419 return;
420 }
421 for m in msgs {
422 match m.tier() {
423 3 => {
424 if let Some(h) = m.payload_handle() {
425 let results = bb_ctrl.predicate_each(gate_fn_id, &[h]);
426 let new_open = results.first().copied().unwrap_or(false);
427 let was_open = s.open;
428 s.open = new_open;
429 if was_open && !new_open && cancel_ctrl.is_some() {
431 should_cancel = true;
432 }
433 }
434 }
435 5 => {
436 if let Some(h) = m.payload_handle() {
437 if !s.terminated {
439 s.terminated = true;
440 bb_ctrl.retain_handle(h);
441 error_action = Some(h);
442 }
443 }
444 }
446 _ => {}
447 }
448 }
449 }
450 if should_cancel {
451 if let Some(ref ct) = cancel_ctrl {
452 ct.cancel();
453 }
454 }
455 if let Some(h) = error_action {
456 core_ctrl.error_or_defer(pid, h);
457 }
458 });
459
460 let ctrl_outcome = ctx.subscribe_to(control, control_sink);
461 if matches!(ctrl_outcome, SubscribeOutcome::Dead { .. }) {
462 }
465
466 let st_src = state.clone();
468 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
469 let core_src = core_s.clone();
470 let source_sink: Sink = Arc::new(move |msgs| {
471 enum Act {
472 Emit(HandleId),
473 Complete,
474 Error(HandleId),
475 }
476 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
477 {
478 let s = st_src.lock();
479 if s.terminated {
480 return;
481 }
482 for m in msgs {
483 match m.tier() {
484 3 => {
485 if let Some(h) = m.payload_handle() {
486 if s.open {
487 bb_src.retain_handle(h);
488 actions.push(Act::Emit(h));
489 }
490 }
492 }
493 5 => {
494 if let Some(h) = m.payload_handle() {
495 bb_src.retain_handle(h);
496 actions.push(Act::Error(h));
497 } else {
498 actions.push(Act::Complete);
499 }
500 }
501 _ => {}
502 }
503 }
504 }
505 for a in actions {
506 match a {
507 Act::Emit(h) => core_src.emit_or_defer(pid, h),
508 Act::Complete => core_src.complete_or_defer(pid),
509 Act::Error(h) => core_src.error_or_defer(pid, h),
510 }
511 }
512 });
513
514 let src_outcome = ctx.subscribe_to(source, source_sink);
515 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
516 let mut s = state.lock();
517 if !s.terminated {
518 s.terminated = true;
519 drop(s);
520 core_s.complete_or_defer(pid);
521 }
522 }
523 });
524
525 let fn_id = binding.register_producer_build(build);
526 core.register_producer(fn_id)
527 .expect("valve: register_producer failed")
528}
529
530#[must_use]
543pub fn settle(
544 core: &Core,
545 binding: &Arc<dyn ProducerBinding>,
546 source: NodeId,
547 quiet_waves: u32,
548 max_waves: Option<u32>,
549) -> NodeId {
550 let core_weak = core.weak_handle();
551 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
552
553 struct SettleState {
554 wave_count: u32,
555 quiet_count: u32,
556 completed: bool,
557 }
558
559 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
560 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
561 return;
562 };
563 let pid = ctx.node_id();
564 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
565 let core_sink = core_s.clone();
566 let state: Arc<Mutex<SettleState>> = Arc::new(Mutex::new(SettleState {
567 wave_count: 0,
568 quiet_count: 0,
569 completed: false,
570 }));
571
572 let source_sink: Sink = Arc::new(move |msgs| {
573 enum Act {
574 Emit(HandleId),
575 Complete,
576 Error(HandleId),
577 SelfComplete,
578 }
579 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
580 {
581 let mut s = state.lock();
582 if s.completed {
583 return;
584 }
585 for m in msgs {
586 if s.completed {
587 break;
588 }
589 match m.tier() {
590 3 => {
591 if let Some(h) = m.payload_handle() {
592 s.quiet_count = 0;
594 s.wave_count += 1;
595 bb.retain_handle(h);
596 actions.push(Act::Emit(h));
597 if let Some(max) = max_waves {
599 if s.wave_count >= max {
600 s.completed = true;
601 actions.push(Act::SelfComplete);
602 }
603 }
604 } else {
605 s.quiet_count += 1;
607 if s.quiet_count >= quiet_waves {
608 s.completed = true;
609 actions.push(Act::SelfComplete);
610 }
611 }
612 }
613 5 => {
614 if let Some(h) = m.payload_handle() {
615 s.completed = true;
616 bb.retain_handle(h);
617 actions.push(Act::Error(h));
618 } else {
619 s.completed = true;
620 actions.push(Act::Complete);
621 }
622 }
623 _ => {}
624 }
625 }
626 }
627 for a in actions {
628 match a {
629 Act::Emit(h) => core_sink.emit_or_defer(pid, h),
630 Act::Complete | Act::SelfComplete => core_sink.complete_or_defer(pid),
631 Act::Error(h) => core_sink.error_or_defer(pid, h),
632 }
633 }
634 });
635
636 let outcome = ctx.subscribe_to(source, source_sink);
637 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
638 core_s.complete_or_defer(pid);
639 }
640 });
641
642 let fn_id = binding.register_producer_build(build);
643 core.register_producer(fn_id)
644 .expect("settle: register_producer failed")
645}
646
647#[must_use]
659pub fn repeat(
660 core: &Core,
661 binding: &Arc<dyn ProducerBinding>,
662 source: NodeId,
663 count: u32,
664) -> NodeId {
665 let core_weak = core.weak_handle();
666 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
667
668 struct RepeatState {
669 remaining: u32,
670 terminated: bool,
671 }
672
673 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
674 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
675 return;
676 };
677 let pid = ctx.node_id();
678 let state: Arc<Mutex<RepeatState>> = Arc::new(Mutex::new(RepeatState {
679 remaining: count,
680 terminated: false,
681 }));
682
683 let storage = binding_s.producer_storage().clone();
689
690 let sink_slot: Arc<Mutex<Option<Sink>>> = Arc::new(Mutex::new(None));
693 let sink_slot_inner = sink_slot.clone();
694 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
695 let core_sink = core_s.clone();
696 let storage_inner = storage.clone();
697
698 let source_sink: Sink = Arc::new(move |msgs| {
699 enum Act {
700 Emit(HandleId),
701 Error(HandleId),
702 Resubscribe,
703 Complete,
704 }
705 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
706 {
707 let mut s = state.lock();
708 if s.terminated {
709 return;
710 }
711 for m in msgs {
712 if s.terminated {
713 break;
714 }
715 match m.tier() {
716 3 => {
717 if let Some(h) = m.payload_handle() {
718 bb.retain_handle(h);
719 actions.push(Act::Emit(h));
720 }
721 }
722 5 => {
723 if let Some(h) = m.payload_handle() {
724 s.terminated = true;
726 bb.retain_handle(h);
727 actions.push(Act::Error(h));
728 } else {
729 if s.remaining > 0 {
731 s.remaining -= 1;
732 actions.push(Act::Resubscribe);
733 } else {
734 s.terminated = true;
735 actions.push(Act::Complete);
736 }
737 }
738 }
739 _ => {}
740 }
741 }
742 }
743 for a in actions {
744 match a {
745 Act::Emit(h) => core_sink.emit_or_defer(pid, h),
746 Act::Error(h) => core_sink.error_or_defer(pid, h),
747 Act::Complete => core_sink.complete_or_defer(pid),
748 Act::Resubscribe => {
749 let maybe_sink = sink_slot_inner.lock().clone();
751 if let Some(new_sink) = maybe_sink {
752 if let Ok(sub) = core_sink.try_subscribe(source, new_sink) {
753 storage_inner.lock().entry(pid).or_default().subs.push(sub);
754 } else {
755 let mut s = state.lock();
758 if !s.terminated {
759 s.terminated = true;
760 drop(s);
761 core_sink.complete_or_defer(pid);
762 }
763 }
764 }
765 }
766 }
767 }
768 });
769
770 *sink_slot.lock() = Some(source_sink.clone());
772
773 let outcome = ctx.subscribe_to(source, source_sink);
774 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
775 core_s.complete_or_defer(pid);
778 }
779 });
780
781 let fn_id = binding.register_producer_build(build);
782 core.register_producer(fn_id)
783 .expect("repeat: register_producer failed")
784}