graphrefly_operators/temporal.rs
1//! Temporal operators — time-dependent transforms on reactive streams.
2//!
3//! # Operators
4//!
5//! - [`sample`] — pure reactive; emits source's latest value when notifier
6//! fires. No timer needed.
7//! - [`debounce`] — emits after `ms` of quiet (no new upstream DATA).
8//! - [`throttle`] — rate-limits: at most one emission per `ms` window.
9//! Configurable leading / trailing edge.
10//! - [`delay`] — delays each upstream DATA by `ms`. Multiple in-flight.
11//! - [`audit`] — on first DATA, starts a `ms` timer; when it fires, emits
12//! the latest value. Timer does NOT restart on subsequent DATA within
13//! the window.
14//! - [`interval`] — source that emits a monotonic counter every `period_ms`.
15//! - [`timeout`] — errors if no DATA arrives within `ms` after subscribe or
16//! after the previous DATA.
17//! - [`buffer_time`] — collects upstream DATA into a buffer and flushes as
18//! a packed tuple every `ms` milliseconds.
19//! - [`window_time`] — rotates inner sub-nodes every `ms` milliseconds;
20//! upstream DATA is forwarded to the current window node.
21//!
22//! # Architecture
23//!
24//! Timer operators (debounce, throttle, delay, audit) spawn a **per-operator
25//! tokio task** that owns all pending state (handles, counters) exclusively.
26//! The sync sink callback sends [`TemporalCmd`] commands; the async task
27//! manages timers via `tokio::time` and calls `Core::emit` /
28//! `complete` / `error` when ready. This avoids the
29//! double-ownership problem that would arise from tracking pending handles
30//! in both the operator's state mutex and the generic timer substrate.
31//!
32//! `sample` is a pure-reactive producer-pattern node with two subscriptions
33//! (source + notifier) and no timer dependency.
34//!
35//! All temporal operators require a **tokio runtime context** at activation
36//! time (first subscriber triggers the build closure, which calls
37//! `tokio::spawn`).
38
39use std::cell::RefCell;
40use std::collections::VecDeque;
41use std::rc::Rc;
42use std::sync::Arc;
43use std::time::Duration;
44
45use parking_lot::Mutex;
46
47use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
48use smallvec::SmallVec;
49
50use crate::producer::{
51 MailboxEmitter, ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome,
52};
53
54// =========================================================================
55// Shared command type
56// =========================================================================
57
58/// Command sent from operator sink callbacks (sync) to the per-operator
59/// async task. Channel close = shutdown (producer deactivated).
60enum TemporalCmd {
61 /// New DATA handle from upstream. Already retained by the sink.
62 Value(HandleId),
63 /// Upstream COMPLETE. Flush pending if applicable, then complete.
64 Complete,
65 /// Upstream ERROR. Cancel all, propagate. Handle already retained.
66 Error(HandleId),
67}
68
69/// RAII wrapper for temporal operator tasks. On drop, closes the command
70/// channel first (triggering the task's cleanup path which releases pending
71/// handles), then aborts the task as a fallback if it hasn't exited.
72struct TemporalTaskGuard {
73 /// Dropping the sender closes the channel → task sees `None` on
74 /// `rx.recv()` and releases all pending handles before returning.
75 _tx: tokio::sync::mpsc::UnboundedSender<TemporalCmd>,
76 task: tokio::task::JoinHandle<()>,
77}
78
79impl Drop for TemporalTaskGuard {
80 fn drop(&mut self) {
81 // _tx drops first (field order), closing the channel.
82 // Abort as fallback in case the task is stuck on a non-channel await.
83 self.task.abort();
84 }
85}
86
87/// Simple abort-on-drop for tasks with no pending handle state (e.g. interval).
88struct AbortOnDrop(tokio::task::JoinHandle<()>);
89
90impl Drop for AbortOnDrop {
91 fn drop(&mut self) {
92 self.0.abort();
93 }
94}
95
96// =========================================================================
97// sample(source, notifier) — pure reactive, no timer
98// =========================================================================
99
100/// Emits the source's latest DATA each time `notifier` delivers DATA.
101///
102/// - Source COMPLETE clears stored value; subsequent notifier DATAs no-op.
103/// - Notifier COMPLETE terminates the sample node.
104/// - Either dep ERROR terminates immediately.
105#[must_use]
106#[allow(clippy::too_many_lines)]
107pub fn sample(
108 core: &Core,
109 binding: &Arc<dyn ProducerBinding>,
110 source: NodeId,
111 notifier: NodeId,
112) -> NodeId {
113 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
114 let core_s = ctx.core();
115 let binding_s = ctx.core().binding();
116 let em = ctx.emitter();
117 let pid = ctx.node_id();
118 let state: Rc<RefCell<SampleState>> = Rc::new(RefCell::new(SampleState::default()));
119
120 // --- source sink ---
121 let st = state.clone();
122 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
123 let core_src = em.clone();
124 let source_sink: Sink = Rc::new(move |msgs| {
125 enum Act {
126 Release(HandleId),
127 Error(HandleId),
128 }
129 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
130 {
131 let mut s = st.borrow_mut();
132 if s.terminated {
133 return;
134 }
135 for m in msgs {
136 match m.tier() {
137 3 => {
138 if let Some(h) = m.payload_handle() {
139 if let Some(old) = s.latest.replace(h) {
140 actions.push(Act::Release(old));
141 }
142 bb.retain_handle(h);
143 }
144 }
145 5 => {
146 if let Some(h) = m.payload_handle() {
147 s.terminated = true;
148 if let Some(old) = s.latest.take() {
149 actions.push(Act::Release(old));
150 }
151 bb.retain_handle(h);
152 actions.push(Act::Error(h));
153 } else {
154 s.source_completed = true;
155 if let Some(old) = s.latest.take() {
156 actions.push(Act::Release(old));
157 }
158 }
159 }
160 _ => {}
161 }
162 }
163 }
164 for a in actions {
165 match a {
166 Act::Release(h) => bb.release_handle(h),
167 Act::Error(h) => core_src.error(pid, h),
168 }
169 }
170 });
171
172 let src_outcome = ctx.subscribe_to(source, source_sink);
173 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
174 state.borrow_mut().source_completed = true;
175 }
176
177 // --- notifier sink ---
178 let st2 = state.clone();
179 let core_n = em.clone();
180 let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
181 let notifier_sink: Sink = Rc::new(move |msgs| {
182 let mut s = st2.borrow_mut();
183 if s.terminated {
184 return;
185 }
186 for m in msgs {
187 if s.terminated {
188 return;
189 }
190 match m.tier() {
191 3 if m.payload_handle().is_some()
192 && !s.source_completed
193 && s.latest.is_some() =>
194 {
195 let h = s.latest.unwrap();
196 bb2.retain_handle(h);
197 drop(s);
198 core_n.emit(pid, h);
199 // Re-acquire lock and continue processing remaining
200 // batch messages (e.g. a trailing Complete).
201 s = st2.borrow_mut();
202 }
203 5 => {
204 if let Some(h) = m.payload_handle() {
205 s.terminated = true;
206 if let Some(old) = s.latest.take() {
207 bb2.release_handle(old);
208 }
209 bb2.retain_handle(h);
210 drop(s);
211 core_n.error(pid, h);
212 return;
213 }
214 // Notifier COMPLETE → self-complete.
215 s.terminated = true;
216 if let Some(old) = s.latest.take() {
217 bb2.release_handle(old);
218 }
219 drop(s);
220 core_n.complete(pid);
221 return;
222 }
223 _ => {}
224 }
225 }
226 });
227
228 let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
229 if matches!(not_outcome, SubscribeOutcome::Dead { .. }) {
230 let mut s = state.borrow_mut();
231 if !s.terminated {
232 s.terminated = true;
233 if let Some(old) = s.latest.take() {
234 binding_s.release_handle(old);
235 }
236 drop(s);
237 core_s.complete(pid);
238 }
239 }
240 });
241
242 let fn_id = binding.register_producer_build(build);
243 core.register_producer(fn_id)
244 .expect("sample: register_producer failed")
245}
246
247#[derive(Default)]
248struct SampleState {
249 latest: Option<HandleId>,
250 source_completed: bool,
251 terminated: bool,
252}
253
254// =========================================================================
255// debounce(source, ms)
256// =========================================================================
257
258/// Emits after `delay` of quiet — each new upstream DATA resets the timer.
259///
260/// On upstream COMPLETE, flushes pending (if any) then completes.
261/// On upstream ERROR, cancels and propagates immediately.
262#[must_use]
263pub fn debounce(
264 core: &Core,
265 binding: &Arc<dyn ProducerBinding>,
266 source: NodeId,
267 ms: u64,
268) -> NodeId {
269 let delay = Duration::from_millis(ms);
270
271 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
272 let binding_s = ctx.core().binding();
273 let em = ctx.emitter();
274 let pid = ctx.node_id();
275 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
276
277 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
278 let tx_sink = tx.clone();
279 let tx_dead = tx.clone();
280 let task = tokio::spawn(debounce_task(rx, em.emitter(), pid, bb.clone(), delay));
281
282 // Store guard: drops tx (clean shutdown) then aborts task (fallback).
283 {
284 let st = ctx.storage();
285 let mut storage = st.lock();
286 let entry = storage.entry(pid).or_default();
287 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
288 }
289
290 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
291 let source_sink: Sink = Rc::new(move |msgs| {
292 for m in msgs {
293 match m.tier() {
294 3 => {
295 if let Some(h) = m.payload_handle() {
296 bb_sink.retain_handle(h);
297 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
298 bb_sink.release_handle(h);
299 }
300 }
301 }
302 5 => {
303 if let Some(h) = m.payload_handle() {
304 bb_sink.retain_handle(h);
305 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
306 bb_sink.release_handle(h);
307 }
308 } else {
309 let _ = tx_sink.send(TemporalCmd::Complete);
310 }
311 }
312 _ => {}
313 }
314 }
315 });
316
317 let outcome = ctx.subscribe_to(source, source_sink);
318 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
319 let _ = tx_dead.send(TemporalCmd::Complete);
320 }
321 });
322
323 let fn_id = binding.register_producer_build(build);
324 core.register_producer(fn_id)
325 .expect("debounce: register_producer failed")
326}
327
328// S2b/D230/D232-AMEND: takes a `ProducerEmitter` (was `WeakCore`).
329// `em.{emit,complete,error}` post to the `Core`-owned mailbox and
330// internally release the payload handle if the `Core` is gone (F2,
331// QA-hardened) — so every old `if let Some(c)=core.upgrade(){ c.X }
332// else { binding.release_handle(h) }` collapses to a direct `em.X`.
333// (D274 dropped the `_or_defer` suffix from these method names.)
334// `em.is_core_gone()` preserves the old teardown-promptness where the
335// `else` branch also `return`ed (not leak-load-bearing — the task also
336// exits on channel-close — only prompt shutdown).
337async fn debounce_task(
338 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
339 em: MailboxEmitter,
340 pid: NodeId,
341 binding: Arc<dyn BindingBoundary>,
342 delay: Duration,
343) {
344 let mut pending: Option<HandleId> = None;
345
346 loop {
347 if let Some(h) = pending {
348 tokio::select! {
349 biased;
350 cmd = rx.recv() => {
351 match cmd {
352 Some(TemporalCmd::Value(new_h)) => {
353 binding.release_handle(h);
354 pending = Some(new_h);
355 }
356 Some(TemporalCmd::Complete) => {
357 // Flush pending then complete (em releases
358 // `h` itself if the Core is gone).
359 em.emit(pid, h);
360 em.complete(pid);
361 return;
362 }
363 Some(TemporalCmd::Error(err_h)) => {
364 binding.release_handle(h);
365 em.error(pid, err_h);
366 return;
367 }
368 None => {
369 // Channel closed — deactivated.
370 binding.release_handle(h);
371 return;
372 }
373 }
374 }
375 () = tokio::time::sleep(delay) => {
376 // Timer fired — emit pending.
377 em.emit(pid, h);
378 if em.is_core_gone() {
379 return;
380 }
381 pending = None;
382 }
383 }
384 } else {
385 // No pending — wait for command.
386 match rx.recv().await {
387 Some(TemporalCmd::Value(h)) => {
388 pending = Some(h);
389 }
390 Some(TemporalCmd::Complete) => {
391 em.complete(pid);
392 return;
393 }
394 Some(TemporalCmd::Error(err_h)) => {
395 em.error(pid, err_h);
396 return;
397 }
398 None => return,
399 }
400 }
401 }
402}
403
404// =========================================================================
405// audit(source, ms)
406// =========================================================================
407
408/// On the first upstream DATA in each window, starts a `ms` timer. When
409/// the timer fires, emits the **latest** value received during the window.
410/// Subsequent DATA values within the window update the stored value but
411/// do NOT restart the timer.
412///
413/// Differs from [`debounce`]: debounce resets on each DATA (quiet-time);
414/// audit fires at a fixed interval after the first DATA in the window.
415#[must_use]
416pub fn audit(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
417 let delay = Duration::from_millis(ms);
418
419 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
420 let binding_s = ctx.core().binding();
421 let em = ctx.emitter();
422 let pid = ctx.node_id();
423 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
424
425 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
426 let tx_sink = tx.clone();
427 let tx_dead = tx.clone();
428 let task = tokio::spawn(audit_task(rx, em.emitter(), pid, bb.clone(), delay));
429
430 {
431 let st = ctx.storage();
432 let mut storage = st.lock();
433 let entry = storage.entry(pid).or_default();
434 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
435 }
436
437 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
438 let source_sink: Sink = Rc::new(move |msgs| {
439 for m in msgs {
440 match m.tier() {
441 3 => {
442 if let Some(h) = m.payload_handle() {
443 bb_sink.retain_handle(h);
444 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
445 bb_sink.release_handle(h);
446 }
447 }
448 }
449 5 => {
450 if let Some(h) = m.payload_handle() {
451 bb_sink.retain_handle(h);
452 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
453 bb_sink.release_handle(h);
454 }
455 } else {
456 let _ = tx_sink.send(TemporalCmd::Complete);
457 }
458 }
459 _ => {}
460 }
461 }
462 });
463
464 let outcome = ctx.subscribe_to(source, source_sink);
465 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
466 let _ = tx_dead.send(TemporalCmd::Complete);
467 }
468 });
469
470 let fn_id = binding.register_producer_build(build);
471 core.register_producer(fn_id)
472 .expect("audit: register_producer failed")
473}
474
475// S2b/D230/D232-AMEND: `core: WeakCore` → `em: ProducerEmitter` (same
476// collapse rationale as `debounce_task`).
477async fn audit_task(
478 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
479 em: MailboxEmitter,
480 pid: NodeId,
481 binding: Arc<dyn BindingBoundary>,
482 delay: Duration,
483) {
484 // Outer loop: wait for first DATA to start a window.
485 loop {
486 match rx.recv().await {
487 Some(TemporalCmd::Value(h)) => {
488 // First value in window — start timer, collect updates.
489 let mut latest = h;
490
491 tokio::select! {
492 biased;
493 // Drain commands until timer fires.
494 () = async {
495 loop {
496 tokio::select! {
497 biased;
498 cmd = rx.recv() => {
499 match cmd {
500 Some(TemporalCmd::Value(new_h)) => {
501 binding.release_handle(latest);
502 latest = new_h;
503 }
504 Some(TemporalCmd::Complete) => {
505 // Emit latest, then complete.
506 em.emit(pid, latest);
507 em.complete(pid);
508 return; // exits the async block
509 }
510 Some(TemporalCmd::Error(err_h)) => {
511 binding.release_handle(latest);
512 em.error(pid, err_h);
513 return;
514 }
515 None => {
516 binding.release_handle(latest);
517 return;
518 }
519 }
520 }
521 }
522 }
523 } => {
524 return; // Terminal command handled inside async block.
525 }
526 () = tokio::time::sleep(delay) => {
527 // Timer fired — emit latest, window closes.
528 em.emit(pid, latest);
529 if em.is_core_gone() {
530 return;
531 }
532 // Continue outer loop — wait for next window.
533 }
534 }
535 }
536 Some(TemporalCmd::Complete) => {
537 em.complete(pid);
538 return;
539 }
540 Some(TemporalCmd::Error(err_h)) => {
541 em.error(pid, err_h);
542 return;
543 }
544 None => return,
545 }
546 }
547}
548
549// =========================================================================
550// delay(source, ms)
551// =========================================================================
552
553/// Delays each upstream DATA by `ms` milliseconds. Multiple values can
554/// be in-flight simultaneously, each with its own timer.
555///
556/// - COMPLETE waits for all pending delays, then completes.
557/// - ERROR cancels all pending and propagates immediately.
558#[must_use]
559pub fn delay(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
560 let delay_dur = Duration::from_millis(ms);
561
562 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
563 let binding_s = ctx.core().binding();
564 let em = ctx.emitter();
565 let pid = ctx.node_id();
566 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
567
568 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
569 let tx_sink = tx.clone();
570 let tx_dead = tx.clone();
571 let task = tokio::spawn(delay_task(rx, em.emitter(), pid, bb.clone(), delay_dur));
572
573 {
574 let st = ctx.storage();
575 let mut storage = st.lock();
576 let entry = storage.entry(pid).or_default();
577 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
578 }
579
580 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
581 let source_sink: Sink = Rc::new(move |msgs| {
582 for m in msgs {
583 match m.tier() {
584 3 => {
585 if let Some(h) = m.payload_handle() {
586 bb_sink.retain_handle(h);
587 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
588 bb_sink.release_handle(h);
589 }
590 }
591 }
592 5 => {
593 if let Some(h) = m.payload_handle() {
594 bb_sink.retain_handle(h);
595 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
596 bb_sink.release_handle(h);
597 }
598 } else {
599 let _ = tx_sink.send(TemporalCmd::Complete);
600 }
601 }
602 _ => {}
603 }
604 }
605 });
606
607 let outcome = ctx.subscribe_to(source, source_sink);
608 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
609 let _ = tx_dead.send(TemporalCmd::Complete);
610 }
611 });
612
613 let fn_id = binding.register_producer_build(build);
614 core.register_producer(fn_id)
615 .expect("delay: register_producer failed")
616}
617
618// S2b/D230/D232-AMEND: `core: WeakCore` → `em: ProducerEmitter`.
619async fn delay_task(
620 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
621 em: MailboxEmitter,
622 pid: NodeId,
623 binding: Arc<dyn BindingBoundary>,
624 delay: Duration,
625) {
626 let mut queue: VecDeque<(tokio::time::Instant, HandleId)> = VecDeque::new();
627 let mut complete_pending = false;
628
629 loop {
630 let next_fire = queue.front().map(|(deadline, _)| *deadline);
631
632 tokio::select! {
633 biased;
634 cmd = rx.recv() => {
635 match cmd {
636 Some(TemporalCmd::Value(h)) => {
637 queue.push_back((tokio::time::Instant::now() + delay, h));
638 }
639 Some(TemporalCmd::Complete) => {
640 complete_pending = true;
641 if queue.is_empty() {
642 em.complete(pid);
643 return;
644 }
645 // Wait for pending timers to drain.
646 }
647 Some(TemporalCmd::Error(err_h)) => {
648 // Release all pending.
649 for (_, h) in queue.drain(..) {
650 binding.release_handle(h);
651 }
652 em.error(pid, err_h);
653 return;
654 }
655 None => {
656 for (_, h) in queue.drain(..) {
657 binding.release_handle(h);
658 }
659 return;
660 }
661 }
662 }
663 () = sleep_until_or_forever(next_fire) => {
664 // Fire all expired.
665 let now = tokio::time::Instant::now();
666 while let Some(&(deadline, _)) = queue.front() {
667 if deadline <= now {
668 let (_, h) = queue.pop_front().unwrap();
669 em.emit(pid, h);
670 if em.is_core_gone() {
671 for (_, h2) in queue.drain(..) {
672 binding.release_handle(h2);
673 }
674 return;
675 }
676 } else {
677 break;
678 }
679 }
680 if complete_pending && queue.is_empty() {
681 em.complete(pid);
682 return;
683 }
684 }
685 }
686 }
687}
688
689// =========================================================================
690// throttle(source, ms, opts)
691// =========================================================================
692
693/// Options for [`throttle`].
694#[derive(Debug, Clone, Copy)]
695pub struct ThrottleOpts {
696 /// Emit immediately at the leading edge of each window (default: true).
697 pub leading: bool,
698 /// Emit the latest value at the trailing edge of each window (default: false).
699 pub trailing: bool,
700}
701
702impl Default for ThrottleOpts {
703 fn default() -> Self {
704 Self {
705 leading: true,
706 trailing: false,
707 }
708 }
709}
710
711/// Rate-limits upstream emissions to at most one per `ms`-millisecond window.
712///
713/// With default options (`leading: true, trailing: false`), the first DATA
714/// in each window is emitted immediately; subsequent DATA within the window
715/// is dropped. With `trailing: true`, the latest DATA is emitted at window end.
716///
717/// On upstream COMPLETE, flushes trailing (if any) then completes.
718#[must_use]
719pub fn throttle(
720 core: &Core,
721 binding: &Arc<dyn ProducerBinding>,
722 source: NodeId,
723 ms: u64,
724 opts: ThrottleOpts,
725) -> NodeId {
726 let window = Duration::from_millis(ms);
727
728 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
729 let binding_s = ctx.core().binding();
730 let em = ctx.emitter();
731 let pid = ctx.node_id();
732 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
733
734 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
735 let tx_sink = tx.clone();
736 let tx_dead = tx.clone();
737 let task = tokio::spawn(throttle_task(
738 rx,
739 em.emitter(),
740 pid,
741 bb.clone(),
742 window,
743 opts,
744 ));
745
746 {
747 let st = ctx.storage();
748 let mut storage = st.lock();
749 let entry = storage.entry(pid).or_default();
750 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
751 }
752
753 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
754 let source_sink: Sink = Rc::new(move |msgs| {
755 for m in msgs {
756 match m.tier() {
757 3 => {
758 if let Some(h) = m.payload_handle() {
759 bb_sink.retain_handle(h);
760 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
761 bb_sink.release_handle(h);
762 }
763 }
764 }
765 5 => {
766 if let Some(h) = m.payload_handle() {
767 bb_sink.retain_handle(h);
768 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
769 bb_sink.release_handle(h);
770 }
771 } else {
772 let _ = tx_sink.send(TemporalCmd::Complete);
773 }
774 }
775 _ => {}
776 }
777 }
778 });
779
780 let outcome = ctx.subscribe_to(source, source_sink);
781 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
782 let _ = tx_dead.send(TemporalCmd::Complete);
783 }
784 });
785
786 let fn_id = binding.register_producer_build(build);
787 core.register_producer(fn_id)
788 .expect("throttle: register_producer failed")
789}
790
791async fn throttle_task(
792 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
793 // S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`.
794 em: MailboxEmitter,
795 pid: NodeId,
796 binding: Arc<dyn BindingBoundary>,
797 window: Duration,
798 opts: ThrottleOpts,
799) {
800 let mut trailing_pending: Option<HandleId> = None;
801 let mut window_deadline: Option<tokio::time::Instant> = None;
802
803 loop {
804 let fire_at = if opts.trailing { window_deadline } else { None };
805
806 tokio::select! {
807 biased;
808 cmd = rx.recv() => {
809 match cmd {
810 Some(TemporalCmd::Value(h)) => {
811 let in_window = window_deadline
812 .is_some_and(|d| tokio::time::Instant::now() < d);
813
814 if !in_window {
815 // Window open — start new window.
816 window_deadline = Some(tokio::time::Instant::now() + window);
817 if opts.leading {
818 em.emit(pid, h);
819 if em.is_core_gone() {
820 release_opt(&mut trailing_pending, &*binding);
821 return;
822 }
823 } else if opts.trailing {
824 if let Some(old) = trailing_pending.replace(h) {
825 binding.release_handle(old);
826 }
827 } else {
828 binding.release_handle(h);
829 }
830 } else if opts.trailing {
831 // Inside window — store for trailing.
832 if let Some(old) = trailing_pending.replace(h) {
833 binding.release_handle(old);
834 }
835 } else {
836 binding.release_handle(h);
837 }
838 }
839 Some(TemporalCmd::Complete) => {
840 if let Some(h) = trailing_pending.take() {
841 em.emit(pid, h);
842 }
843 em.complete(pid);
844 return;
845 }
846 Some(TemporalCmd::Error(err_h)) => {
847 release_opt(&mut trailing_pending, &*binding);
848 em.error(pid, err_h);
849 return;
850 }
851 None => {
852 release_opt(&mut trailing_pending, &*binding);
853 return;
854 }
855 }
856 }
857 () = sleep_until_or_forever(fire_at) => {
858 // Window expired — emit trailing if any, reopen window.
859 window_deadline = None;
860 if let Some(h) = trailing_pending.take() {
861 em.emit(pid, h);
862 if em.is_core_gone() {
863 return;
864 }
865 // Start new window for the trailing emission.
866 window_deadline = Some(tokio::time::Instant::now() + window);
867 }
868 }
869 }
870 }
871}
872
873// =========================================================================
874// interval(period_ms) — timer source
875// =========================================================================
876
877/// Source that emits a monotonically increasing counter (`1, 2, 3, …`)
878/// every `period_ms` milliseconds. Counter starts at 1 (not 0) because
879/// `HandleId(0)` is the `NO_HANDLE` sentinel. Resubscribable: counter
880/// resets on deactivation + reactivation.
881///
882/// The emitted values use raw `HandleId::new(counter)`. Bindings should
883/// register these as integer handles.
884#[must_use]
885pub fn interval(core: &Core, binding: &Arc<dyn ProducerBinding>, period_ms: u64) -> NodeId {
886 let period = Duration::from_millis(period_ms);
887
888 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
889 let binding_s = ctx.core().binding();
890 let em = ctx.emitter();
891 let pid = ctx.node_id();
892 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
893 let em_task = em.emitter();
894
895 let task = tokio::spawn(async move {
896 let mut ticker = tokio::time::interval(period);
897 ticker.tick().await; // First tick is immediate — skip it.
898 let mut counter: u64 = 1; // Start at 1: HandleId(0) = NO_HANDLE.
899 loop {
900 ticker.tick().await;
901 // S2b/D230: stop ticking once the Core is gone (was the
902 // `weak.upgrade() == None` break); check BEFORE retaining
903 // so a dead Core never leaks a fresh counter handle.
904 if em_task.is_core_gone() {
905 break;
906 }
907 let h = HandleId::new(counter);
908 bb.retain_handle(h);
909 em_task.emit(pid, h);
910 counter += 1;
911 }
912 });
913
914 {
915 let st = ctx.storage();
916 let mut storage = st.lock();
917 let entry = storage.entry(pid).or_default();
918 entry.op_state = Some(Box::new(AbortOnDrop(task)));
919 }
920 });
921
922 let fn_id = binding.register_producer_build(build);
923 core.register_producer(fn_id)
924 .expect("interval: register_producer failed")
925}
926
927// =========================================================================
928// Helpers
929// =========================================================================
930
931/// Sleep until the given instant, or sleep forever if `None`.
932async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
933 match deadline {
934 Some(d) => tokio::time::sleep_until(d).await,
935 None => std::future::pending::<()>().await,
936 }
937}
938
939/// Release an optional handle and set it to `None`.
940fn release_opt(opt: &mut Option<HandleId>, binding: &dyn BindingBoundary) {
941 if let Some(h) = opt.take() {
942 binding.release_handle(h);
943 }
944}
945
946// =========================================================================
947// timeout(source, ms, error_handle)
948// =========================================================================
949
950/// Errors if no upstream DATA arrives within `ms` milliseconds after
951/// subscribe or after the previous DATA. Each DATA resets the timer.
952///
953/// On timeout: retains `error_handle` and emits ERROR with it.
954/// On upstream COMPLETE: cancels timer, forwards complete.
955/// On upstream ERROR: cancels timer, forwards error.
956#[must_use]
957pub fn timeout(
958 core: &Core,
959 binding: &Arc<dyn ProducerBinding>,
960 source: NodeId,
961 ms: u64,
962 error_handle: HandleId,
963) -> NodeId {
964 let duration = Duration::from_millis(ms);
965
966 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
967 let binding_s = ctx.core().binding();
968 let em = ctx.emitter();
969 let pid = ctx.node_id();
970 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
971
972 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
973 let tx_sink = tx.clone();
974 let tx_dead = tx.clone();
975 let task = tokio::spawn(timeout_task(
976 rx,
977 em.emitter(),
978 pid,
979 bb.clone(),
980 duration,
981 error_handle,
982 ));
983
984 {
985 let st = ctx.storage();
986 let mut storage = st.lock();
987 let entry = storage.entry(pid).or_default();
988 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
989 }
990
991 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
992 let source_sink: Sink = Rc::new(move |msgs| {
993 for m in msgs {
994 match m.tier() {
995 3 => {
996 if let Some(h) = m.payload_handle() {
997 bb_sink.retain_handle(h);
998 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
999 bb_sink.release_handle(h);
1000 }
1001 }
1002 }
1003 5 => {
1004 if let Some(h) = m.payload_handle() {
1005 bb_sink.retain_handle(h);
1006 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
1007 bb_sink.release_handle(h);
1008 }
1009 } else {
1010 let _ = tx_sink.send(TemporalCmd::Complete);
1011 }
1012 }
1013 _ => {}
1014 }
1015 }
1016 });
1017
1018 let outcome = ctx.subscribe_to(source, source_sink);
1019 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1020 let _ = tx_dead.send(TemporalCmd::Complete);
1021 }
1022 });
1023
1024 let fn_id = binding.register_producer_build(build);
1025 core.register_producer(fn_id)
1026 .expect("timeout: register_producer failed")
1027}
1028
1029// S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`.
1030async fn timeout_task(
1031 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
1032 em: MailboxEmitter,
1033 pid: NodeId,
1034 binding: Arc<dyn BindingBoundary>,
1035 duration: Duration,
1036 error_handle: HandleId,
1037) {
1038 // The timer starts immediately on subscribe — if no DATA arrives
1039 // within `duration`, we fire the timeout error.
1040 loop {
1041 tokio::select! {
1042 biased;
1043 cmd = rx.recv() => {
1044 match cmd {
1045 Some(TemporalCmd::Value(h)) => {
1046 // DATA arrived — forward it and reset the timer
1047 // (the next loop iteration restarts the sleep).
1048 em.emit(pid, h);
1049 if em.is_core_gone() {
1050 return;
1051 }
1052 // Continue loop → resets the sleep timer.
1053 }
1054 Some(TemporalCmd::Complete) => {
1055 em.complete(pid);
1056 return;
1057 }
1058 Some(TemporalCmd::Error(err_h)) => {
1059 em.error(pid, err_h);
1060 return;
1061 }
1062 None => return,
1063 }
1064 }
1065 () = tokio::time::sleep(duration) => {
1066 // Timeout fired — emit error. Retain BEFORE the post
1067 // (skip if the Core is gone — nothing to error into).
1068 if !em.is_core_gone() {
1069 binding.retain_handle(error_handle);
1070 em.error(pid, error_handle);
1071 }
1072 return;
1073 }
1074 }
1075 }
1076}
1077
1078// =========================================================================
1079// buffer_time(source, ms, pack_fn_id)
1080// =========================================================================
1081
1082/// Command sent from the `buffer_time` sink to its async task.
1083enum BufferTimeCmd {
1084 /// New DATA handle from upstream. Already retained by the sink.
1085 Value(HandleId),
1086 /// Upstream COMPLETE.
1087 Complete,
1088 /// Upstream ERROR. Handle already retained.
1089 Error(HandleId),
1090}
1091
1092/// Collects upstream DATA handles into a buffer and flushes them as a
1093/// packed tuple every `ms` milliseconds.
1094///
1095/// - On interval tick: packs buffered handles via
1096/// `binding.pack_tuple(pack_fn_id, &buf)`, emits the result, releases
1097/// individual handles, clears buffer.
1098/// - On upstream COMPLETE: flushes remaining buffer, then completes.
1099/// - On upstream ERROR: releases buffered handles, forwards error.
1100#[must_use]
1101pub fn buffer_time(
1102 core: &Core,
1103 binding: &Arc<dyn ProducerBinding>,
1104 source: NodeId,
1105 ms: u64,
1106 pack_fn_id: FnId,
1107) -> NodeId {
1108 let period = Duration::from_millis(ms);
1109
1110 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1111 let binding_s = ctx.core().binding();
1112 let em = ctx.emitter();
1113 let pid = ctx.node_id();
1114 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1115
1116 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1117 let tx_sink = tx.clone();
1118 let tx_dead = tx.clone();
1119 let task = tokio::spawn(buffer_time_task(
1120 rx,
1121 em.emitter(),
1122 pid,
1123 bb.clone(),
1124 period,
1125 pack_fn_id,
1126 ));
1127
1128 {
1129 let st = ctx.storage();
1130 let mut storage = st.lock();
1131 let entry = storage.entry(pid).or_default();
1132 entry.op_state = Some(Box::new(BufferTimeTaskGuard { _tx: tx, task }));
1133 }
1134
1135 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1136 let source_sink: Sink = Rc::new(move |msgs| {
1137 for m in msgs {
1138 match m.tier() {
1139 3 => {
1140 if let Some(h) = m.payload_handle() {
1141 bb_sink.retain_handle(h);
1142 if tx_sink.send(BufferTimeCmd::Value(h)).is_err() {
1143 bb_sink.release_handle(h);
1144 }
1145 }
1146 }
1147 5 => {
1148 if let Some(h) = m.payload_handle() {
1149 bb_sink.retain_handle(h);
1150 if tx_sink.send(BufferTimeCmd::Error(h)).is_err() {
1151 bb_sink.release_handle(h);
1152 }
1153 } else {
1154 let _ = tx_sink.send(BufferTimeCmd::Complete);
1155 }
1156 }
1157 _ => {}
1158 }
1159 }
1160 });
1161
1162 let outcome = ctx.subscribe_to(source, source_sink);
1163 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1164 let _ = tx_dead.send(BufferTimeCmd::Complete);
1165 }
1166 });
1167
1168 let fn_id = binding.register_producer_build(build);
1169 core.register_producer(fn_id)
1170 .expect("buffer_time: register_producer failed")
1171}
1172
1173/// RAII guard for `buffer_time` task — same pattern as [`TemporalTaskGuard`]
1174/// but with `BufferTimeCmd` channel.
1175struct BufferTimeTaskGuard {
1176 _tx: tokio::sync::mpsc::UnboundedSender<BufferTimeCmd>,
1177 task: tokio::task::JoinHandle<()>,
1178}
1179
1180impl Drop for BufferTimeTaskGuard {
1181 fn drop(&mut self) {
1182 self.task.abort();
1183 }
1184}
1185
1186// S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`. Note: the
1187// emit-path now always `pack_tuple`s even on a gone Core (em releases
1188// the packed handle); the buffered component handles are drained-released
1189// exactly as before — behaviour-equivalent, just one extra teardown-only
1190// pack FFI.
1191async fn buffer_time_task(
1192 mut rx: tokio::sync::mpsc::UnboundedReceiver<BufferTimeCmd>,
1193 em: MailboxEmitter,
1194 pid: NodeId,
1195 binding: Arc<dyn BindingBoundary>,
1196 period: Duration,
1197 pack_fn_id: FnId,
1198) {
1199 let mut buf: Vec<HandleId> = Vec::new();
1200 let mut ticker = tokio::time::interval(period);
1201 ticker.tick().await; // First tick is immediate — skip it.
1202
1203 loop {
1204 tokio::select! {
1205 biased;
1206 cmd = rx.recv() => {
1207 match cmd {
1208 Some(BufferTimeCmd::Value(h)) => {
1209 buf.push(h);
1210 }
1211 Some(BufferTimeCmd::Complete) => {
1212 // Flush remaining buffer then complete.
1213 if !buf.is_empty() {
1214 let packed = binding.pack_tuple(pack_fn_id, &buf);
1215 em.emit(pid, packed);
1216 for h in buf.drain(..) {
1217 binding.release_handle(h);
1218 }
1219 }
1220 em.complete(pid);
1221 return;
1222 }
1223 Some(BufferTimeCmd::Error(err_h)) => {
1224 for h in buf.drain(..) {
1225 binding.release_handle(h);
1226 }
1227 em.error(pid, err_h);
1228 return;
1229 }
1230 None => {
1231 // Channel closed — deactivated. Release buffered.
1232 for h in buf.drain(..) {
1233 binding.release_handle(h);
1234 }
1235 return;
1236 }
1237 }
1238 }
1239 _ = ticker.tick() => {
1240 if !buf.is_empty() {
1241 let packed = binding.pack_tuple(pack_fn_id, &buf);
1242 em.emit(pid, packed);
1243 for h in buf.drain(..) {
1244 binding.release_handle(h);
1245 }
1246 if em.is_core_gone() {
1247 return;
1248 }
1249 }
1250 }
1251 }
1252 }
1253}
1254
1255// =========================================================================
1256// window_time(source, ms)
1257// =========================================================================
1258
1259/// Command sent from the `window_time` sink to its async task.
1260enum WindowTimeCmd {
1261 /// New DATA handle from upstream. Already retained by the sink.
1262 Value(HandleId),
1263 /// Upstream COMPLETE.
1264 Complete,
1265 /// Upstream ERROR. Handle already retained.
1266 Error(HandleId),
1267}
1268
1269/// Rotates sub-node windows every `ms` milliseconds.
1270///
1271/// Creates a new inner state node each interval tick. Upstream DATA is
1272/// forwarded to the current inner window node. On tick, the current
1273/// window is completed, a new inner node is created, and its identity
1274/// is emitted as a DATA handle (via `binding.intern_node`).
1275///
1276/// - On upstream COMPLETE: completes current window, then completes self.
1277/// - On upstream ERROR: errors current window, then errors self.
1278#[must_use]
1279pub fn window_time(
1280 core: &Core,
1281 binding: &Arc<dyn ProducerBinding>,
1282 source: NodeId,
1283 ms: u64,
1284) -> NodeId {
1285 let period = Duration::from_millis(ms);
1286
1287 // S2b/D231: register the reusable no-op inner-window build at FACTORY
1288 // scope (where `binding: &Arc<dyn ProducerBinding>` is in hand) — the
1289 // build closure no longer holds a `ProducerBinding`, only `ctx`.
1290 // `FnId` is `Copy`; capture it into the build/task.
1291 let noop_fn_id = binding.register_producer_build(Box::new(|_ctx: ProducerCtx<'_>| {
1292 // Inner window nodes are passive — all emissions are driven by
1293 // the parent window_time task via the mailbox.
1294 }));
1295
1296 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1297 let core_s = ctx.core();
1298 let binding_s = ctx.core().binding();
1299 let em = ctx.emitter();
1300 let pid = ctx.node_id();
1301 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1302
1303 // Create the first inner window node and emit its handle.
1304 let first_inner = core_s
1305 .register_producer(noop_fn_id)
1306 .expect("window_time inner: register_producer failed");
1307 let first_handle = bb.intern_node(first_inner);
1308 core_s.emit(pid, first_handle);
1309
1310 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1311 let tx_sink = tx.clone();
1312 let tx_dead = tx.clone();
1313 let task = tokio::spawn(window_time_task(
1314 rx,
1315 em.emitter(),
1316 pid,
1317 first_inner,
1318 bb.clone(),
1319 period,
1320 noop_fn_id,
1321 ));
1322
1323 {
1324 let st = ctx.storage();
1325 let mut storage = st.lock();
1326 let entry = storage.entry(pid).or_default();
1327 entry.op_state = Some(Box::new(WindowTimeTaskGuard { _tx: tx, task }));
1328 }
1329
1330 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1331 let source_sink: Sink = Rc::new(move |msgs| {
1332 for m in msgs {
1333 match m.tier() {
1334 3 => {
1335 if let Some(h) = m.payload_handle() {
1336 bb_sink.retain_handle(h);
1337 if tx_sink.send(WindowTimeCmd::Value(h)).is_err() {
1338 bb_sink.release_handle(h);
1339 }
1340 }
1341 }
1342 5 => {
1343 if let Some(h) = m.payload_handle() {
1344 bb_sink.retain_handle(h);
1345 if tx_sink.send(WindowTimeCmd::Error(h)).is_err() {
1346 bb_sink.release_handle(h);
1347 }
1348 } else {
1349 let _ = tx_sink.send(WindowTimeCmd::Complete);
1350 }
1351 }
1352 _ => {}
1353 }
1354 }
1355 });
1356
1357 let outcome = ctx.subscribe_to(source, source_sink);
1358 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1359 let _ = tx_dead.send(WindowTimeCmd::Complete);
1360 }
1361 });
1362
1363 let fn_id = binding.register_producer_build(build);
1364 core.register_producer(fn_id)
1365 .expect("window_time: register_producer failed")
1366}
1367
1368/// RAII guard for `window_time` task.
1369struct WindowTimeTaskGuard {
1370 _tx: tokio::sync::mpsc::UnboundedSender<WindowTimeCmd>,
1371 task: tokio::task::JoinHandle<()>,
1372}
1373
1374impl Drop for WindowTimeTaskGuard {
1375 fn drop(&mut self) {
1376 self.task.abort();
1377 }
1378}
1379
1380// S2b/D230/D234: `WeakCore` → `ProducerEmitter`; the window rotation
1381// does task-side topology mutation (`register_producer`) so it routes
1382// through `em.defer` (`CoreFull`). `current_inner` (the routing
1383// selector) becomes a shared `Rc<RefCell<NodeId>>` read INSIDE the
1384// owner-serialized defer closures, so the FIFO mailbox order
1385// (arrival order) keeps every forward routed to the window live at the
1386// time it arrived — the D234 invariant (same as `window`).
1387async fn window_time_task(
1388 mut rx: tokio::sync::mpsc::UnboundedReceiver<WindowTimeCmd>,
1389 em: MailboxEmitter,
1390 pid: NodeId,
1391 initial_inner: NodeId,
1392 binding: Arc<dyn BindingBoundary>,
1393 period: Duration,
1394 noop_fn_id: FnId,
1395) {
1396 // Cat-1/2 (D273): the tokio-driven `WindowTimeCmd` loop posts deferred
1397 // closures into `em.defer(move |c| ...)` which requires `Send + 'static`.
1398 // The captured `current_inner` MUST be `Send + Sync` ⇒ stays as
1399 // `Arc<Mutex<...>>` (compiler-enforced via the `Send + Sync` bound on
1400 // `defer`'s `F`).
1401 #[allow(clippy::arc_with_non_send_sync)]
1402 let current_inner = Arc::new(Mutex::new(initial_inner));
1403 let mut ticker = tokio::time::interval(period);
1404 ticker.tick().await; // First tick is immediate — skip it.
1405
1406 loop {
1407 tokio::select! {
1408 biased;
1409 cmd = rx.recv() => {
1410 match cmd {
1411 Some(WindowTimeCmd::Value(h)) => {
1412 // Forward DATA to whatever window is current when
1413 // this defer runs (FIFO-ordered after any rotation
1414 // posted earlier).
1415 let cur = current_inner.clone();
1416 let b = binding.clone();
1417 if !em.defer(move |c| {
1418 c.emit(*cur.lock(), h);
1419 }) {
1420 b.release_handle(h);
1421 }
1422 }
1423 Some(WindowTimeCmd::Complete) => {
1424 let cur = current_inner.clone();
1425 let _ = em.defer(move |c| {
1426 c.complete(*cur.lock());
1427 c.complete(pid);
1428 });
1429 return;
1430 }
1431 Some(WindowTimeCmd::Error(err_h)) => {
1432 // err_h retained once by the sink; the 2nd retain
1433 // (for the self-error) is done INSIDE the closure
1434 // so a Core-gone drop leaves exactly the sink's
1435 // one retain to release (mirrors the old `else`).
1436 let cur = current_inner.clone();
1437 let b = binding.clone();
1438 let b2 = binding.clone();
1439 if !em.defer(move |c| {
1440 b2.retain_handle(err_h);
1441 c.error(*cur.lock(), err_h);
1442 c.error(pid, err_h);
1443 }) {
1444 b.release_handle(err_h);
1445 }
1446 return;
1447 }
1448 None => {
1449 // Channel closed — deactivated.
1450 return;
1451 }
1452 }
1453 }
1454 _ = ticker.tick() => {
1455 // Window rotation: complete current, create new, emit
1456 // handle — ALL inside one owner-side closure so the
1457 // `register_producer` + `current_inner` swap is atomic
1458 // w.r.t. the FIFO drain (D234).
1459 let cur = current_inner.clone();
1460 let b = binding.clone();
1461 let _ = em.defer(move |c| {
1462 let old = *cur.lock();
1463 c.complete(old);
1464 match c.register_producer(noop_fn_id) {
1465 Ok(new_inner) => {
1466 *cur.lock() = new_inner;
1467 let h = b.intern_node(new_inner);
1468 c.emit(pid, h);
1469 }
1470 Err(_) => {
1471 // Registration failed — complete self.
1472 c.complete(pid);
1473 }
1474 }
1475 });
1476 if em.is_core_gone() {
1477 return;
1478 }
1479 }
1480 }
1481 }
1482}