1use std::collections::VecDeque;
40use std::sync::{Arc, Weak};
41use std::time::Duration;
42
43use parking_lot::Mutex;
44
45use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink, WeakCore};
46use smallvec::SmallVec;
47
48use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
49
50enum TemporalCmd {
57 Value(HandleId),
59 Complete,
61 Error(HandleId),
63}
64
65struct TemporalTaskGuard {
69 _tx: tokio::sync::mpsc::UnboundedSender<TemporalCmd>,
72 task: tokio::task::JoinHandle<()>,
73}
74
75impl Drop for TemporalTaskGuard {
76 fn drop(&mut self) {
77 self.task.abort();
80 }
81}
82
83struct AbortOnDrop(tokio::task::JoinHandle<()>);
85
86impl Drop for AbortOnDrop {
87 fn drop(&mut self) {
88 self.0.abort();
89 }
90}
91
92#[must_use]
102#[allow(clippy::too_many_lines)]
103pub fn sample(
104 core: &Core,
105 binding: &Arc<dyn ProducerBinding>,
106 source: NodeId,
107 notifier: NodeId,
108) -> NodeId {
109 let core_weak = core.weak_handle();
110 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
111
112 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
113 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
114 return;
115 };
116 let pid = ctx.node_id();
117 let state: Arc<Mutex<SampleState>> = Arc::new(Mutex::new(SampleState::default()));
118
119 let st = state.clone();
121 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
122 let core_src = core_s.clone();
123 let source_sink: Sink = Arc::new(move |msgs| {
124 enum Act {
125 Release(HandleId),
126 Error(HandleId),
127 }
128 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
129 {
130 let mut s = st.lock();
131 if s.terminated {
132 return;
133 }
134 for m in msgs {
135 match m.tier() {
136 3 => {
137 if let Some(h) = m.payload_handle() {
138 if let Some(old) = s.latest.replace(h) {
139 actions.push(Act::Release(old));
140 }
141 bb.retain_handle(h);
142 }
143 }
144 5 => {
145 if let Some(h) = m.payload_handle() {
146 s.terminated = true;
147 if let Some(old) = s.latest.take() {
148 actions.push(Act::Release(old));
149 }
150 bb.retain_handle(h);
151 actions.push(Act::Error(h));
152 } else {
153 s.source_completed = true;
154 if let Some(old) = s.latest.take() {
155 actions.push(Act::Release(old));
156 }
157 }
158 }
159 _ => {}
160 }
161 }
162 }
163 for a in actions {
164 match a {
165 Act::Release(h) => bb.release_handle(h),
166 Act::Error(h) => core_src.error_or_defer(pid, h),
167 }
168 }
169 });
170
171 let src_outcome = ctx.subscribe_to(source, source_sink);
172 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
173 state.lock().source_completed = true;
174 }
175
176 let st2 = state.clone();
178 let core_n = core_s.clone();
179 let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
180 let notifier_sink: Sink = Arc::new(move |msgs| {
181 let mut s = st2.lock();
182 if s.terminated {
183 return;
184 }
185 for m in msgs {
186 if s.terminated {
187 return;
188 }
189 match m.tier() {
190 3 if m.payload_handle().is_some()
191 && !s.source_completed
192 && s.latest.is_some() =>
193 {
194 let h = s.latest.unwrap();
195 bb2.retain_handle(h);
196 drop(s);
197 core_n.emit_or_defer(pid, h);
198 s = st2.lock();
201 }
202 5 => {
203 if let Some(h) = m.payload_handle() {
204 s.terminated = true;
205 if let Some(old) = s.latest.take() {
206 bb2.release_handle(old);
207 }
208 bb2.retain_handle(h);
209 drop(s);
210 core_n.error_or_defer(pid, h);
211 return;
212 }
213 s.terminated = true;
215 if let Some(old) = s.latest.take() {
216 bb2.release_handle(old);
217 }
218 drop(s);
219 core_n.complete_or_defer(pid);
220 return;
221 }
222 _ => {}
223 }
224 }
225 });
226
227 let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
228 if matches!(not_outcome, SubscribeOutcome::Dead { .. }) {
229 let mut s = state.lock();
230 if !s.terminated {
231 s.terminated = true;
232 if let Some(old) = s.latest.take() {
233 binding_s.release_handle(old);
234 }
235 drop(s);
236 core_s.complete_or_defer(pid);
237 }
238 }
239 });
240
241 let fn_id = binding.register_producer_build(build);
242 core.register_producer(fn_id)
243 .expect("sample: register_producer failed")
244}
245
246#[derive(Default)]
247struct SampleState {
248 latest: Option<HandleId>,
249 source_completed: bool,
250 terminated: bool,
251}
252
253#[must_use]
262pub fn debounce(
263 core: &Core,
264 binding: &Arc<dyn ProducerBinding>,
265 source: NodeId,
266 ms: u64,
267) -> NodeId {
268 let core_weak = core.weak_handle();
269 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
270 let delay = Duration::from_millis(ms);
271
272 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
273 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
274 return;
275 };
276 let pid = ctx.node_id();
277 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
278
279 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
280 let tx_sink = tx.clone();
281 let tx_dead = tx.clone();
282 let task = tokio::spawn(debounce_task(
283 rx,
284 core_s.weak_handle(),
285 pid,
286 bb.clone(),
287 delay,
288 ));
289
290 {
292 let mut storage = binding_s.producer_storage().lock();
293 let entry = storage.entry(pid).or_default();
294 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
295 }
296
297 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
298 let source_sink: Sink = Arc::new(move |msgs| {
299 for m in msgs {
300 match m.tier() {
301 3 => {
302 if let Some(h) = m.payload_handle() {
303 bb_sink.retain_handle(h);
304 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
305 bb_sink.release_handle(h);
306 }
307 }
308 }
309 5 => {
310 if let Some(h) = m.payload_handle() {
311 bb_sink.retain_handle(h);
312 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
313 bb_sink.release_handle(h);
314 }
315 } else {
316 let _ = tx_sink.send(TemporalCmd::Complete);
317 }
318 }
319 _ => {}
320 }
321 }
322 });
323
324 let outcome = ctx.subscribe_to(source, source_sink);
325 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
326 let _ = tx_dead.send(TemporalCmd::Complete);
327 }
328 });
329
330 let fn_id = binding.register_producer_build(build);
331 core.register_producer(fn_id)
332 .expect("debounce: register_producer failed")
333}
334
335async fn debounce_task(
336 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
337 core: WeakCore,
338 pid: NodeId,
339 binding: Arc<dyn BindingBoundary>,
340 delay: Duration,
341) {
342 let mut pending: Option<HandleId> = None;
343
344 loop {
345 if let Some(h) = pending {
346 tokio::select! {
347 biased;
348 cmd = rx.recv() => {
349 match cmd {
350 Some(TemporalCmd::Value(new_h)) => {
351 binding.release_handle(h);
352 pending = Some(new_h);
353 }
354 Some(TemporalCmd::Complete) => {
355 if let Some(c) = core.upgrade() {
357 c.emit_or_defer(pid, h);
358 c.complete_or_defer(pid);
359 } else {
360 binding.release_handle(h);
361 }
362 return;
363 }
364 Some(TemporalCmd::Error(err_h)) => {
365 binding.release_handle(h);
366 if let Some(c) = core.upgrade() {
367 c.error_or_defer(pid, err_h);
368 } else {
369 binding.release_handle(err_h);
370 }
371 return;
372 }
373 None => {
374 binding.release_handle(h);
376 return;
377 }
378 }
379 }
380 () = tokio::time::sleep(delay) => {
381 if let Some(c) = core.upgrade() {
383 c.emit_or_defer(pid, h);
384 } else {
385 binding.release_handle(h);
386 return;
387 }
388 pending = None;
389 }
390 }
391 } else {
392 match rx.recv().await {
394 Some(TemporalCmd::Value(h)) => {
395 pending = Some(h);
396 }
397 Some(TemporalCmd::Complete) => {
398 if let Some(c) = core.upgrade() {
399 c.complete_or_defer(pid);
400 }
401 return;
402 }
403 Some(TemporalCmd::Error(err_h)) => {
404 if let Some(c) = core.upgrade() {
405 c.error_or_defer(pid, err_h);
406 } else {
407 binding.release_handle(err_h);
408 }
409 return;
410 }
411 None => return,
412 }
413 }
414 }
415}
416
417#[must_use]
429pub fn audit(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
430 let core_weak = core.weak_handle();
431 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
432 let delay = Duration::from_millis(ms);
433
434 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
435 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
436 return;
437 };
438 let pid = ctx.node_id();
439 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
440
441 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
442 let tx_sink = tx.clone();
443 let tx_dead = tx.clone();
444 let task = tokio::spawn(audit_task(rx, core_s.weak_handle(), pid, bb.clone(), delay));
445
446 {
447 let mut storage = binding_s.producer_storage().lock();
448 let entry = storage.entry(pid).or_default();
449 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
450 }
451
452 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
453 let source_sink: Sink = Arc::new(move |msgs| {
454 for m in msgs {
455 match m.tier() {
456 3 => {
457 if let Some(h) = m.payload_handle() {
458 bb_sink.retain_handle(h);
459 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
460 bb_sink.release_handle(h);
461 }
462 }
463 }
464 5 => {
465 if let Some(h) = m.payload_handle() {
466 bb_sink.retain_handle(h);
467 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
468 bb_sink.release_handle(h);
469 }
470 } else {
471 let _ = tx_sink.send(TemporalCmd::Complete);
472 }
473 }
474 _ => {}
475 }
476 }
477 });
478
479 let outcome = ctx.subscribe_to(source, source_sink);
480 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
481 let _ = tx_dead.send(TemporalCmd::Complete);
482 }
483 });
484
485 let fn_id = binding.register_producer_build(build);
486 core.register_producer(fn_id)
487 .expect("audit: register_producer failed")
488}
489
490async fn audit_task(
491 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
492 core: WeakCore,
493 pid: NodeId,
494 binding: Arc<dyn BindingBoundary>,
495 delay: Duration,
496) {
497 loop {
499 match rx.recv().await {
500 Some(TemporalCmd::Value(h)) => {
501 let mut latest = h;
503
504 tokio::select! {
505 biased;
506 () = async {
508 loop {
509 tokio::select! {
510 biased;
511 cmd = rx.recv() => {
512 match cmd {
513 Some(TemporalCmd::Value(new_h)) => {
514 binding.release_handle(latest);
515 latest = new_h;
516 }
517 Some(TemporalCmd::Complete) => {
518 if let Some(c) = core.upgrade() {
520 c.emit_or_defer(pid, latest);
521 c.complete_or_defer(pid);
522 } else {
523 binding.release_handle(latest);
524 }
525 return; }
527 Some(TemporalCmd::Error(err_h)) => {
528 binding.release_handle(latest);
529 if let Some(c) = core.upgrade() {
530 c.error_or_defer(pid, err_h);
531 } else {
532 binding.release_handle(err_h);
533 }
534 return;
535 }
536 None => {
537 binding.release_handle(latest);
538 return;
539 }
540 }
541 }
542 }
543 }
544 } => {
545 return; }
547 () = tokio::time::sleep(delay) => {
548 if let Some(c) = core.upgrade() {
550 c.emit_or_defer(pid, latest);
551 } else {
552 binding.release_handle(latest);
553 return;
554 }
555 }
557 }
558 }
559 Some(TemporalCmd::Complete) => {
560 if let Some(c) = core.upgrade() {
561 c.complete_or_defer(pid);
562 }
563 return;
564 }
565 Some(TemporalCmd::Error(err_h)) => {
566 if let Some(c) = core.upgrade() {
567 c.error_or_defer(pid, err_h);
568 } else {
569 binding.release_handle(err_h);
570 }
571 return;
572 }
573 None => return,
574 }
575 }
576}
577
578#[must_use]
588pub fn delay(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
589 let core_weak = core.weak_handle();
590 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
591 let delay_dur = Duration::from_millis(ms);
592
593 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
594 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
595 return;
596 };
597 let pid = ctx.node_id();
598 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
599
600 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
601 let tx_sink = tx.clone();
602 let tx_dead = tx.clone();
603 let task = tokio::spawn(delay_task(
604 rx,
605 core_s.weak_handle(),
606 pid,
607 bb.clone(),
608 delay_dur,
609 ));
610
611 {
612 let mut storage = binding_s.producer_storage().lock();
613 let entry = storage.entry(pid).or_default();
614 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
615 }
616
617 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
618 let source_sink: Sink = Arc::new(move |msgs| {
619 for m in msgs {
620 match m.tier() {
621 3 => {
622 if let Some(h) = m.payload_handle() {
623 bb_sink.retain_handle(h);
624 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
625 bb_sink.release_handle(h);
626 }
627 }
628 }
629 5 => {
630 if let Some(h) = m.payload_handle() {
631 bb_sink.retain_handle(h);
632 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
633 bb_sink.release_handle(h);
634 }
635 } else {
636 let _ = tx_sink.send(TemporalCmd::Complete);
637 }
638 }
639 _ => {}
640 }
641 }
642 });
643
644 let outcome = ctx.subscribe_to(source, source_sink);
645 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
646 let _ = tx_dead.send(TemporalCmd::Complete);
647 }
648 });
649
650 let fn_id = binding.register_producer_build(build);
651 core.register_producer(fn_id)
652 .expect("delay: register_producer failed")
653}
654
655async fn delay_task(
656 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
657 core: WeakCore,
658 pid: NodeId,
659 binding: Arc<dyn BindingBoundary>,
660 delay: Duration,
661) {
662 let mut queue: VecDeque<(tokio::time::Instant, HandleId)> = VecDeque::new();
663 let mut complete_pending = false;
664
665 loop {
666 let next_fire = queue.front().map(|(deadline, _)| *deadline);
667
668 tokio::select! {
669 biased;
670 cmd = rx.recv() => {
671 match cmd {
672 Some(TemporalCmd::Value(h)) => {
673 queue.push_back((tokio::time::Instant::now() + delay, h));
674 }
675 Some(TemporalCmd::Complete) => {
676 complete_pending = true;
677 if queue.is_empty() {
678 if let Some(c) = core.upgrade() {
679 c.complete_or_defer(pid);
680 }
681 return;
682 }
683 }
685 Some(TemporalCmd::Error(err_h)) => {
686 for (_, h) in queue.drain(..) {
688 binding.release_handle(h);
689 }
690 if let Some(c) = core.upgrade() {
691 c.error_or_defer(pid, err_h);
692 } else {
693 binding.release_handle(err_h);
694 }
695 return;
696 }
697 None => {
698 for (_, h) in queue.drain(..) {
699 binding.release_handle(h);
700 }
701 return;
702 }
703 }
704 }
705 () = sleep_until_or_forever(next_fire) => {
706 let now = tokio::time::Instant::now();
708 while let Some(&(deadline, _)) = queue.front() {
709 if deadline <= now {
710 let (_, h) = queue.pop_front().unwrap();
711 if let Some(c) = core.upgrade() {
712 c.emit_or_defer(pid, h);
713 } else {
714 binding.release_handle(h);
715 for (_, h2) in queue.drain(..) {
716 binding.release_handle(h2);
717 }
718 return;
719 }
720 } else {
721 break;
722 }
723 }
724 if complete_pending && queue.is_empty() {
725 if let Some(c) = core.upgrade() {
726 c.complete_or_defer(pid);
727 }
728 return;
729 }
730 }
731 }
732 }
733}
734
735#[derive(Debug, Clone, Copy)]
741pub struct ThrottleOpts {
742 pub leading: bool,
744 pub trailing: bool,
746}
747
748impl Default for ThrottleOpts {
749 fn default() -> Self {
750 Self {
751 leading: true,
752 trailing: false,
753 }
754 }
755}
756
757#[must_use]
765pub fn throttle(
766 core: &Core,
767 binding: &Arc<dyn ProducerBinding>,
768 source: NodeId,
769 ms: u64,
770 opts: ThrottleOpts,
771) -> NodeId {
772 let core_weak = core.weak_handle();
773 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
774 let window = Duration::from_millis(ms);
775
776 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
777 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
778 return;
779 };
780 let pid = ctx.node_id();
781 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
782
783 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
784 let tx_sink = tx.clone();
785 let tx_dead = tx.clone();
786 let task = tokio::spawn(throttle_task(
787 rx,
788 core_s.weak_handle(),
789 pid,
790 bb.clone(),
791 window,
792 opts,
793 ));
794
795 {
796 let mut storage = binding_s.producer_storage().lock();
797 let entry = storage.entry(pid).or_default();
798 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
799 }
800
801 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
802 let source_sink: Sink = Arc::new(move |msgs| {
803 for m in msgs {
804 match m.tier() {
805 3 => {
806 if let Some(h) = m.payload_handle() {
807 bb_sink.retain_handle(h);
808 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
809 bb_sink.release_handle(h);
810 }
811 }
812 }
813 5 => {
814 if let Some(h) = m.payload_handle() {
815 bb_sink.retain_handle(h);
816 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
817 bb_sink.release_handle(h);
818 }
819 } else {
820 let _ = tx_sink.send(TemporalCmd::Complete);
821 }
822 }
823 _ => {}
824 }
825 }
826 });
827
828 let outcome = ctx.subscribe_to(source, source_sink);
829 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
830 let _ = tx_dead.send(TemporalCmd::Complete);
831 }
832 });
833
834 let fn_id = binding.register_producer_build(build);
835 core.register_producer(fn_id)
836 .expect("throttle: register_producer failed")
837}
838
839async fn throttle_task(
840 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
841 core: WeakCore,
842 pid: NodeId,
843 binding: Arc<dyn BindingBoundary>,
844 window: Duration,
845 opts: ThrottleOpts,
846) {
847 let mut trailing_pending: Option<HandleId> = None;
848 let mut window_deadline: Option<tokio::time::Instant> = None;
849
850 loop {
851 let fire_at = if opts.trailing { window_deadline } else { None };
852
853 tokio::select! {
854 biased;
855 cmd = rx.recv() => {
856 match cmd {
857 Some(TemporalCmd::Value(h)) => {
858 let in_window = window_deadline
859 .is_some_and(|d| tokio::time::Instant::now() < d);
860
861 if !in_window {
862 window_deadline = Some(tokio::time::Instant::now() + window);
864 if opts.leading {
865 if let Some(c) = core.upgrade() {
866 c.emit_or_defer(pid, h);
867 } else {
868 binding.release_handle(h);
869 release_opt(&mut trailing_pending, &*binding);
870 return;
871 }
872 } else if opts.trailing {
873 if let Some(old) = trailing_pending.replace(h) {
874 binding.release_handle(old);
875 }
876 } else {
877 binding.release_handle(h);
878 }
879 } else if opts.trailing {
880 if let Some(old) = trailing_pending.replace(h) {
882 binding.release_handle(old);
883 }
884 } else {
885 binding.release_handle(h);
886 }
887 }
888 Some(TemporalCmd::Complete) => {
889 if let Some(h) = trailing_pending.take() {
890 if let Some(c) = core.upgrade() {
891 c.emit_or_defer(pid, h);
892 c.complete_or_defer(pid);
893 } else {
894 binding.release_handle(h);
895 }
896 } else if let Some(c) = core.upgrade() {
897 c.complete_or_defer(pid);
898 }
899 return;
900 }
901 Some(TemporalCmd::Error(err_h)) => {
902 release_opt(&mut trailing_pending, &*binding);
903 if let Some(c) = core.upgrade() {
904 c.error_or_defer(pid, err_h);
905 } else {
906 binding.release_handle(err_h);
907 }
908 return;
909 }
910 None => {
911 release_opt(&mut trailing_pending, &*binding);
912 return;
913 }
914 }
915 }
916 () = sleep_until_or_forever(fire_at) => {
917 window_deadline = None;
919 if let Some(h) = trailing_pending.take() {
920 if let Some(c) = core.upgrade() {
921 c.emit_or_defer(pid, h);
922 window_deadline = Some(tokio::time::Instant::now() + window);
924 } else {
925 binding.release_handle(h);
926 return;
927 }
928 }
929 }
930 }
931 }
932}
933
934#[must_use]
946pub fn interval(core: &Core, binding: &Arc<dyn ProducerBinding>, period_ms: u64) -> NodeId {
947 let core_weak = core.weak_handle();
948 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
949 let period = Duration::from_millis(period_ms);
950
951 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
952 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
953 return;
954 };
955 let pid = ctx.node_id();
956 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
957 let weak = core_s.weak_handle();
958
959 let task = tokio::spawn(async move {
960 let mut ticker = tokio::time::interval(period);
961 ticker.tick().await; let mut counter: u64 = 1; loop {
964 ticker.tick().await;
965 if let Some(c) = weak.upgrade() {
966 let h = HandleId::new(counter);
967 bb.retain_handle(h);
968 c.emit_or_defer(pid, h);
969 counter += 1;
970 } else {
971 break;
972 }
973 }
974 });
975
976 {
977 let mut storage = binding_s.producer_storage().lock();
978 let entry = storage.entry(pid).or_default();
979 entry.op_state = Some(Box::new(AbortOnDrop(task)));
980 }
981 });
982
983 let fn_id = binding.register_producer_build(build);
984 core.register_producer(fn_id)
985 .expect("interval: register_producer failed")
986}
987
988async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
994 match deadline {
995 Some(d) => tokio::time::sleep_until(d).await,
996 None => std::future::pending::<()>().await,
997 }
998}
999
1000fn release_opt(opt: &mut Option<HandleId>, binding: &dyn BindingBoundary) {
1002 if let Some(h) = opt.take() {
1003 binding.release_handle(h);
1004 }
1005}
1006
1007#[must_use]
1018pub fn timeout(
1019 core: &Core,
1020 binding: &Arc<dyn ProducerBinding>,
1021 source: NodeId,
1022 ms: u64,
1023 error_handle: HandleId,
1024) -> NodeId {
1025 let core_weak = core.weak_handle();
1026 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
1027 let duration = Duration::from_millis(ms);
1028
1029 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1030 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
1031 return;
1032 };
1033 let pid = ctx.node_id();
1034 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1035
1036 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1037 let tx_sink = tx.clone();
1038 let tx_dead = tx.clone();
1039 let task = tokio::spawn(timeout_task(
1040 rx,
1041 core_s.weak_handle(),
1042 pid,
1043 bb.clone(),
1044 duration,
1045 error_handle,
1046 ));
1047
1048 {
1049 let mut storage = binding_s.producer_storage().lock();
1050 let entry = storage.entry(pid).or_default();
1051 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
1052 }
1053
1054 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1055 let source_sink: Sink = Arc::new(move |msgs| {
1056 for m in msgs {
1057 match m.tier() {
1058 3 => {
1059 if let Some(h) = m.payload_handle() {
1060 bb_sink.retain_handle(h);
1061 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
1062 bb_sink.release_handle(h);
1063 }
1064 }
1065 }
1066 5 => {
1067 if let Some(h) = m.payload_handle() {
1068 bb_sink.retain_handle(h);
1069 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
1070 bb_sink.release_handle(h);
1071 }
1072 } else {
1073 let _ = tx_sink.send(TemporalCmd::Complete);
1074 }
1075 }
1076 _ => {}
1077 }
1078 }
1079 });
1080
1081 let outcome = ctx.subscribe_to(source, source_sink);
1082 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1083 let _ = tx_dead.send(TemporalCmd::Complete);
1084 }
1085 });
1086
1087 let fn_id = binding.register_producer_build(build);
1088 core.register_producer(fn_id)
1089 .expect("timeout: register_producer failed")
1090}
1091
1092async fn timeout_task(
1093 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
1094 core: WeakCore,
1095 pid: NodeId,
1096 binding: Arc<dyn BindingBoundary>,
1097 duration: Duration,
1098 error_handle: HandleId,
1099) {
1100 loop {
1103 tokio::select! {
1104 biased;
1105 cmd = rx.recv() => {
1106 match cmd {
1107 Some(TemporalCmd::Value(h)) => {
1108 if let Some(c) = core.upgrade() {
1111 c.emit_or_defer(pid, h);
1112 } else {
1113 binding.release_handle(h);
1114 return;
1115 }
1116 }
1118 Some(TemporalCmd::Complete) => {
1119 if let Some(c) = core.upgrade() {
1120 c.complete_or_defer(pid);
1121 }
1122 return;
1123 }
1124 Some(TemporalCmd::Error(err_h)) => {
1125 if let Some(c) = core.upgrade() {
1126 c.error_or_defer(pid, err_h);
1127 } else {
1128 binding.release_handle(err_h);
1129 }
1130 return;
1131 }
1132 None => return,
1133 }
1134 }
1135 () = tokio::time::sleep(duration) => {
1136 if let Some(c) = core.upgrade() {
1138 binding.retain_handle(error_handle);
1139 c.error_or_defer(pid, error_handle);
1140 }
1141 return;
1142 }
1143 }
1144 }
1145}
1146
1147enum BufferTimeCmd {
1153 Value(HandleId),
1155 Complete,
1157 Error(HandleId),
1159}
1160
1161#[must_use]
1170pub fn buffer_time(
1171 core: &Core,
1172 binding: &Arc<dyn ProducerBinding>,
1173 source: NodeId,
1174 ms: u64,
1175 pack_fn_id: FnId,
1176) -> NodeId {
1177 let core_weak = core.weak_handle();
1178 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
1179 let period = Duration::from_millis(ms);
1180
1181 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1182 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
1183 return;
1184 };
1185 let pid = ctx.node_id();
1186 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1187
1188 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1189 let tx_sink = tx.clone();
1190 let tx_dead = tx.clone();
1191 let task = tokio::spawn(buffer_time_task(
1192 rx,
1193 core_s.weak_handle(),
1194 pid,
1195 bb.clone(),
1196 period,
1197 pack_fn_id,
1198 ));
1199
1200 {
1201 let mut storage = binding_s.producer_storage().lock();
1202 let entry = storage.entry(pid).or_default();
1203 entry.op_state = Some(Box::new(BufferTimeTaskGuard { _tx: tx, task }));
1204 }
1205
1206 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1207 let source_sink: Sink = Arc::new(move |msgs| {
1208 for m in msgs {
1209 match m.tier() {
1210 3 => {
1211 if let Some(h) = m.payload_handle() {
1212 bb_sink.retain_handle(h);
1213 if tx_sink.send(BufferTimeCmd::Value(h)).is_err() {
1214 bb_sink.release_handle(h);
1215 }
1216 }
1217 }
1218 5 => {
1219 if let Some(h) = m.payload_handle() {
1220 bb_sink.retain_handle(h);
1221 if tx_sink.send(BufferTimeCmd::Error(h)).is_err() {
1222 bb_sink.release_handle(h);
1223 }
1224 } else {
1225 let _ = tx_sink.send(BufferTimeCmd::Complete);
1226 }
1227 }
1228 _ => {}
1229 }
1230 }
1231 });
1232
1233 let outcome = ctx.subscribe_to(source, source_sink);
1234 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1235 let _ = tx_dead.send(BufferTimeCmd::Complete);
1236 }
1237 });
1238
1239 let fn_id = binding.register_producer_build(build);
1240 core.register_producer(fn_id)
1241 .expect("buffer_time: register_producer failed")
1242}
1243
1244struct BufferTimeTaskGuard {
1247 _tx: tokio::sync::mpsc::UnboundedSender<BufferTimeCmd>,
1248 task: tokio::task::JoinHandle<()>,
1249}
1250
1251impl Drop for BufferTimeTaskGuard {
1252 fn drop(&mut self) {
1253 self.task.abort();
1254 }
1255}
1256
1257async fn buffer_time_task(
1258 mut rx: tokio::sync::mpsc::UnboundedReceiver<BufferTimeCmd>,
1259 core: WeakCore,
1260 pid: NodeId,
1261 binding: Arc<dyn BindingBoundary>,
1262 period: Duration,
1263 pack_fn_id: FnId,
1264) {
1265 let mut buf: Vec<HandleId> = Vec::new();
1266 let mut ticker = tokio::time::interval(period);
1267 ticker.tick().await; loop {
1270 tokio::select! {
1271 biased;
1272 cmd = rx.recv() => {
1273 match cmd {
1274 Some(BufferTimeCmd::Value(h)) => {
1275 buf.push(h);
1276 }
1277 Some(BufferTimeCmd::Complete) => {
1278 if !buf.is_empty() {
1280 if let Some(c) = core.upgrade() {
1281 let packed = binding.pack_tuple(pack_fn_id, &buf);
1282 c.emit_or_defer(pid, packed);
1283 }
1284 for h in buf.drain(..) {
1285 binding.release_handle(h);
1286 }
1287 }
1288 if let Some(c) = core.upgrade() {
1289 c.complete_or_defer(pid);
1290 }
1291 return;
1292 }
1293 Some(BufferTimeCmd::Error(err_h)) => {
1294 for h in buf.drain(..) {
1295 binding.release_handle(h);
1296 }
1297 if let Some(c) = core.upgrade() {
1298 c.error_or_defer(pid, err_h);
1299 } else {
1300 binding.release_handle(err_h);
1301 }
1302 return;
1303 }
1304 None => {
1305 for h in buf.drain(..) {
1307 binding.release_handle(h);
1308 }
1309 return;
1310 }
1311 }
1312 }
1313 _ = ticker.tick() => {
1314 if !buf.is_empty() {
1315 if let Some(c) = core.upgrade() {
1316 let packed = binding.pack_tuple(pack_fn_id, &buf);
1317 c.emit_or_defer(pid, packed);
1318 for h in buf.drain(..) {
1319 binding.release_handle(h);
1320 }
1321 } else {
1322 for h in buf.drain(..) {
1323 binding.release_handle(h);
1324 }
1325 return;
1326 }
1327 }
1328 }
1329 }
1330 }
1331}
1332
1333enum WindowTimeCmd {
1339 Value(HandleId),
1341 Complete,
1343 Error(HandleId),
1345}
1346
1347#[must_use]
1357pub fn window_time(
1358 core: &Core,
1359 binding: &Arc<dyn ProducerBinding>,
1360 source: NodeId,
1361 ms: u64,
1362) -> NodeId {
1363 let core_weak = core.weak_handle();
1364 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
1365 let period = Duration::from_millis(ms);
1366
1367 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1368 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
1369 return;
1370 };
1371 let pid = ctx.node_id();
1372 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1373
1374 let noop_fn_id = binding_s.register_producer_build(Box::new(|_ctx: ProducerCtx<'_>| {
1378 }));
1381
1382 let first_inner = core_s
1384 .register_producer(noop_fn_id)
1385 .expect("window_time inner: register_producer failed");
1386 let first_handle = bb.intern_node(first_inner);
1387 core_s.emit_or_defer(pid, first_handle);
1388
1389 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1390 let tx_sink = tx.clone();
1391 let tx_dead = tx.clone();
1392 let task = tokio::spawn(window_time_task(
1393 rx,
1394 core_s.weak_handle(),
1395 pid,
1396 first_inner,
1397 bb.clone(),
1398 period,
1399 noop_fn_id,
1400 ));
1401
1402 {
1403 let mut storage = binding_s.producer_storage().lock();
1404 let entry = storage.entry(pid).or_default();
1405 entry.op_state = Some(Box::new(WindowTimeTaskGuard { _tx: tx, task }));
1406 }
1407
1408 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1409 let source_sink: Sink = Arc::new(move |msgs| {
1410 for m in msgs {
1411 match m.tier() {
1412 3 => {
1413 if let Some(h) = m.payload_handle() {
1414 bb_sink.retain_handle(h);
1415 if tx_sink.send(WindowTimeCmd::Value(h)).is_err() {
1416 bb_sink.release_handle(h);
1417 }
1418 }
1419 }
1420 5 => {
1421 if let Some(h) = m.payload_handle() {
1422 bb_sink.retain_handle(h);
1423 if tx_sink.send(WindowTimeCmd::Error(h)).is_err() {
1424 bb_sink.release_handle(h);
1425 }
1426 } else {
1427 let _ = tx_sink.send(WindowTimeCmd::Complete);
1428 }
1429 }
1430 _ => {}
1431 }
1432 }
1433 });
1434
1435 let outcome = ctx.subscribe_to(source, source_sink);
1436 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1437 let _ = tx_dead.send(WindowTimeCmd::Complete);
1438 }
1439 });
1440
1441 let fn_id = binding.register_producer_build(build);
1442 core.register_producer(fn_id)
1443 .expect("window_time: register_producer failed")
1444}
1445
1446struct WindowTimeTaskGuard {
1448 _tx: tokio::sync::mpsc::UnboundedSender<WindowTimeCmd>,
1449 task: tokio::task::JoinHandle<()>,
1450}
1451
1452impl Drop for WindowTimeTaskGuard {
1453 fn drop(&mut self) {
1454 self.task.abort();
1455 }
1456}
1457
1458async fn window_time_task(
1459 mut rx: tokio::sync::mpsc::UnboundedReceiver<WindowTimeCmd>,
1460 core: WeakCore,
1461 pid: NodeId,
1462 initial_inner: NodeId,
1463 binding: Arc<dyn BindingBoundary>,
1464 period: Duration,
1465 noop_fn_id: FnId,
1466) {
1467 let mut current_inner = initial_inner;
1468 let mut ticker = tokio::time::interval(period);
1469 ticker.tick().await; loop {
1472 tokio::select! {
1473 biased;
1474 cmd = rx.recv() => {
1475 match cmd {
1476 Some(WindowTimeCmd::Value(h)) => {
1477 if let Some(c) = core.upgrade() {
1479 c.emit_or_defer(current_inner, h);
1480 } else {
1481 binding.release_handle(h);
1482 return;
1483 }
1484 }
1485 Some(WindowTimeCmd::Complete) => {
1486 if let Some(c) = core.upgrade() {
1488 c.complete_or_defer(current_inner);
1489 c.complete_or_defer(pid);
1490 }
1491 return;
1492 }
1493 Some(WindowTimeCmd::Error(err_h)) => {
1494 if let Some(c) = core.upgrade() {
1496 binding.retain_handle(err_h);
1497 c.error_or_defer(current_inner, err_h);
1498 c.error_or_defer(pid, err_h);
1501 } else {
1502 binding.release_handle(err_h);
1503 }
1504 return;
1505 }
1506 None => {
1507 return;
1509 }
1510 }
1511 }
1512 _ = ticker.tick() => {
1513 if let Some(c) = core.upgrade() {
1515 c.complete_or_defer(current_inner);
1516
1517 if let Ok(new_inner) = c.register_producer(noop_fn_id) {
1521 current_inner = new_inner;
1522 let h = binding.intern_node(new_inner);
1523 c.emit_or_defer(pid, h);
1524 } else {
1525 c.complete_or_defer(pid);
1527 return;
1528 }
1529 } else {
1530 return;
1531 }
1532 }
1533 }
1534 }
1535}