graphrefly_operators/temporal.rs
1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Temporal operators — time-dependent transforms on reactive streams.
11//!
12//! # Operators
13//!
14//! - [`sample`] — pure reactive; emits source's latest value when notifier
15//! fires. No timer needed.
16//! - [`debounce`] — emits after `ms` of quiet (no new upstream DATA).
17//! - [`throttle`] — rate-limits: at most one emission per `ms` window.
18//! Configurable leading / trailing edge.
19//! - [`delay`] — delays each upstream DATA by `ms`. Multiple in-flight.
20//! - [`audit`] — on first DATA, starts a `ms` timer; when it fires, emits
21//! the latest value. Timer does NOT restart on subsequent DATA within
22//! the window.
23//! - [`interval`] — source that emits a monotonic counter every `period_ms`.
24//! - [`timeout`] — errors if no DATA arrives within `ms` after subscribe or
25//! after the previous DATA.
26//! - [`buffer_time`] — collects upstream DATA into a buffer and flushes as
27//! a packed tuple every `ms` milliseconds.
28//! - [`window_time`] — rotates inner sub-nodes every `ms` milliseconds;
29//! upstream DATA is forwarded to the current window node.
30//!
31//! # Architecture
32//!
33//! Timer operators (debounce, throttle, delay, audit) spawn a **per-operator
34//! tokio task** that owns all pending state (handles, counters) exclusively.
35//! The sync sink callback sends [`TemporalCmd`] commands; the async task
36//! manages timers via `tokio::time` and calls `Core::emit_or_defer` /
37//! `complete_or_defer` / `error_or_defer` when ready. This avoids the
38//! double-ownership problem that would arise from tracking pending handles
39//! in both the operator's state mutex and the generic timer substrate.
40//!
41//! `sample` is a pure-reactive producer-pattern node with two subscriptions
42//! (source + notifier) and no timer dependency.
43//!
//! Every timer-backed operator (that is, everything above except the
//! pure-reactive `sample`) requires a **tokio runtime context** at
//! activation time: the first subscriber triggers the build closure, which
//! calls `tokio::spawn`.
47
48use std::collections::VecDeque;
49use std::sync::Arc;
50use std::time::Duration;
51
52use parking_lot::Mutex;
53
54use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
55use smallvec::SmallVec;
56
57use crate::producer::{
58 MailboxEmitter, ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome,
59};
60
61// =========================================================================
62// Shared command type
63// =========================================================================
64
65/// Command sent from operator sink callbacks (sync) to the per-operator
66/// async task. Channel close = shutdown (producer deactivated).
67enum TemporalCmd {
68 /// New DATA handle from upstream. Already retained by the sink.
69 Value(HandleId),
70 /// Upstream COMPLETE. Flush pending if applicable, then complete.
71 Complete,
72 /// Upstream ERROR. Cancel all, propagate. Handle already retained.
73 Error(HandleId),
74}
75
76/// RAII wrapper for temporal operator tasks. On drop, closes the command
77/// channel first (triggering the task's cleanup path which releases pending
78/// handles), then aborts the task as a fallback if it hasn't exited.
79struct TemporalTaskGuard {
80 /// Dropping the sender closes the channel → task sees `None` on
81 /// `rx.recv()` and releases all pending handles before returning.
82 _tx: tokio::sync::mpsc::UnboundedSender<TemporalCmd>,
83 task: tokio::task::JoinHandle<()>,
84}
85
86impl Drop for TemporalTaskGuard {
87 fn drop(&mut self) {
88 // _tx drops first (field order), closing the channel.
89 // Abort as fallback in case the task is stuck on a non-channel await.
90 self.task.abort();
91 }
92}
93
94/// Simple abort-on-drop for tasks with no pending handle state (e.g. interval).
95struct AbortOnDrop(tokio::task::JoinHandle<()>);
96
97impl Drop for AbortOnDrop {
98 fn drop(&mut self) {
99 self.0.abort();
100 }
101}
102
103// =========================================================================
104// sample(source, notifier) — pure reactive, no timer
105// =========================================================================
106
107/// Emits the source's latest DATA each time `notifier` delivers DATA.
108///
109/// - Source COMPLETE clears stored value; subsequent notifier DATAs no-op.
110/// - Notifier COMPLETE terminates the sample node.
111/// - Either dep ERROR terminates immediately.
112#[must_use]
113#[allow(clippy::too_many_lines)]
114pub fn sample(
115 core: &Core,
116 binding: &Arc<dyn ProducerBinding>,
117 source: NodeId,
118 notifier: NodeId,
119) -> NodeId {
120 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
121 let core_s = ctx.core();
122 let binding_s = ctx.core().binding();
123 let em = ctx.emitter();
124 let pid = ctx.node_id();
125 let state: Arc<Mutex<SampleState>> = Arc::new(Mutex::new(SampleState::default()));
126
127 // --- source sink ---
128 let st = state.clone();
129 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
130 let core_src = em.clone();
131 let source_sink: Sink = Arc::new(move |msgs| {
132 enum Act {
133 Release(HandleId),
134 Error(HandleId),
135 }
136 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
137 {
138 let mut s = st.lock();
139 if s.terminated {
140 return;
141 }
142 for m in msgs {
143 match m.tier() {
144 3 => {
145 if let Some(h) = m.payload_handle() {
146 if let Some(old) = s.latest.replace(h) {
147 actions.push(Act::Release(old));
148 }
149 bb.retain_handle(h);
150 }
151 }
152 5 => {
153 if let Some(h) = m.payload_handle() {
154 s.terminated = true;
155 if let Some(old) = s.latest.take() {
156 actions.push(Act::Release(old));
157 }
158 bb.retain_handle(h);
159 actions.push(Act::Error(h));
160 } else {
161 s.source_completed = true;
162 if let Some(old) = s.latest.take() {
163 actions.push(Act::Release(old));
164 }
165 }
166 }
167 _ => {}
168 }
169 }
170 }
171 for a in actions {
172 match a {
173 Act::Release(h) => bb.release_handle(h),
174 Act::Error(h) => core_src.error_or_defer(pid, h),
175 }
176 }
177 });
178
179 let src_outcome = ctx.subscribe_to(source, source_sink);
180 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
181 state.lock().source_completed = true;
182 }
183
184 // --- notifier sink ---
185 let st2 = state.clone();
186 let core_n = em.clone();
187 let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
188 let notifier_sink: Sink = Arc::new(move |msgs| {
189 let mut s = st2.lock();
190 if s.terminated {
191 return;
192 }
193 for m in msgs {
194 if s.terminated {
195 return;
196 }
197 match m.tier() {
198 3 if m.payload_handle().is_some()
199 && !s.source_completed
200 && s.latest.is_some() =>
201 {
202 let h = s.latest.unwrap();
203 bb2.retain_handle(h);
204 drop(s);
205 core_n.emit_or_defer(pid, h);
206 // Re-acquire lock and continue processing remaining
207 // batch messages (e.g. a trailing Complete).
208 s = st2.lock();
209 }
210 5 => {
211 if let Some(h) = m.payload_handle() {
212 s.terminated = true;
213 if let Some(old) = s.latest.take() {
214 bb2.release_handle(old);
215 }
216 bb2.retain_handle(h);
217 drop(s);
218 core_n.error_or_defer(pid, h);
219 return;
220 }
221 // Notifier COMPLETE → self-complete.
222 s.terminated = true;
223 if let Some(old) = s.latest.take() {
224 bb2.release_handle(old);
225 }
226 drop(s);
227 core_n.complete_or_defer(pid);
228 return;
229 }
230 _ => {}
231 }
232 }
233 });
234
235 let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
236 if matches!(not_outcome, SubscribeOutcome::Dead { .. }) {
237 let mut s = state.lock();
238 if !s.terminated {
239 s.terminated = true;
240 if let Some(old) = s.latest.take() {
241 binding_s.release_handle(old);
242 }
243 drop(s);
244 core_s.complete_or_defer(pid);
245 }
246 }
247 });
248
249 let fn_id = binding.register_producer_build(build);
250 core.register_producer(fn_id)
251 .expect("sample: register_producer failed")
252}
253
254#[derive(Default)]
255struct SampleState {
256 latest: Option<HandleId>,
257 source_completed: bool,
258 terminated: bool,
259}
260
261// =========================================================================
262// debounce(source, ms)
263// =========================================================================
264
265/// Emits after `delay` of quiet — each new upstream DATA resets the timer.
266///
267/// On upstream COMPLETE, flushes pending (if any) then completes.
268/// On upstream ERROR, cancels and propagates immediately.
269#[must_use]
270pub fn debounce(
271 core: &Core,
272 binding: &Arc<dyn ProducerBinding>,
273 source: NodeId,
274 ms: u64,
275) -> NodeId {
276 let delay = Duration::from_millis(ms);
277
278 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
279 let binding_s = ctx.core().binding();
280 let em = ctx.emitter();
281 let pid = ctx.node_id();
282 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
283
284 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
285 let tx_sink = tx.clone();
286 let tx_dead = tx.clone();
287 let task = tokio::spawn(debounce_task(rx, em.emitter(), pid, bb.clone(), delay));
288
289 // Store guard: drops tx (clean shutdown) then aborts task (fallback).
290 {
291 let st = ctx.storage();
292 let mut storage = st.lock();
293 let entry = storage.entry(pid).or_default();
294 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
295 }
296
297 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
298 let source_sink: Sink = Arc::new(move |msgs| {
299 for m in msgs {
300 match m.tier() {
301 3 => {
302 if let Some(h) = m.payload_handle() {
303 bb_sink.retain_handle(h);
304 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
305 bb_sink.release_handle(h);
306 }
307 }
308 }
309 5 => {
310 if let Some(h) = m.payload_handle() {
311 bb_sink.retain_handle(h);
312 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
313 bb_sink.release_handle(h);
314 }
315 } else {
316 let _ = tx_sink.send(TemporalCmd::Complete);
317 }
318 }
319 _ => {}
320 }
321 }
322 });
323
324 let outcome = ctx.subscribe_to(source, source_sink);
325 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
326 let _ = tx_dead.send(TemporalCmd::Complete);
327 }
328 });
329
330 let fn_id = binding.register_producer_build(build);
331 core.register_producer(fn_id)
332 .expect("debounce: register_producer failed")
333}
334
335// S2b/D230/D232-AMEND: takes a `ProducerEmitter` (was `WeakCore`).
336// `em.{emit,complete,error}_or_defer` post to the `Core`-owned mailbox
337// and internally release the payload handle if the `Core` is gone (F2,
338// QA-hardened) — so every old `if let Some(c)=core.upgrade(){ c.X }
339// else { binding.release_handle(h) }` collapses to a direct `em.X`.
340// `em.is_core_gone()` preserves the old teardown-promptness where the
341// `else` branch also `return`ed (not leak-load-bearing — the task also
342// exits on channel-close — only prompt shutdown).
343async fn debounce_task(
344 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
345 em: MailboxEmitter,
346 pid: NodeId,
347 binding: Arc<dyn BindingBoundary>,
348 delay: Duration,
349) {
350 let mut pending: Option<HandleId> = None;
351
352 loop {
353 if let Some(h) = pending {
354 tokio::select! {
355 biased;
356 cmd = rx.recv() => {
357 match cmd {
358 Some(TemporalCmd::Value(new_h)) => {
359 binding.release_handle(h);
360 pending = Some(new_h);
361 }
362 Some(TemporalCmd::Complete) => {
363 // Flush pending then complete (em releases
364 // `h` itself if the Core is gone).
365 em.emit_or_defer(pid, h);
366 em.complete_or_defer(pid);
367 return;
368 }
369 Some(TemporalCmd::Error(err_h)) => {
370 binding.release_handle(h);
371 em.error_or_defer(pid, err_h);
372 return;
373 }
374 None => {
375 // Channel closed — deactivated.
376 binding.release_handle(h);
377 return;
378 }
379 }
380 }
381 () = tokio::time::sleep(delay) => {
382 // Timer fired — emit pending.
383 em.emit_or_defer(pid, h);
384 if em.is_core_gone() {
385 return;
386 }
387 pending = None;
388 }
389 }
390 } else {
391 // No pending — wait for command.
392 match rx.recv().await {
393 Some(TemporalCmd::Value(h)) => {
394 pending = Some(h);
395 }
396 Some(TemporalCmd::Complete) => {
397 em.complete_or_defer(pid);
398 return;
399 }
400 Some(TemporalCmd::Error(err_h)) => {
401 em.error_or_defer(pid, err_h);
402 return;
403 }
404 None => return,
405 }
406 }
407 }
408}
409
410// =========================================================================
411// audit(source, ms)
412// =========================================================================
413
414/// On the first upstream DATA in each window, starts a `ms` timer. When
415/// the timer fires, emits the **latest** value received during the window.
416/// Subsequent DATA values within the window update the stored value but
417/// do NOT restart the timer.
418///
419/// Differs from [`debounce`]: debounce resets on each DATA (quiet-time);
420/// audit fires at a fixed interval after the first DATA in the window.
421#[must_use]
422pub fn audit(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
423 let delay = Duration::from_millis(ms);
424
425 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
426 let binding_s = ctx.core().binding();
427 let em = ctx.emitter();
428 let pid = ctx.node_id();
429 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
430
431 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
432 let tx_sink = tx.clone();
433 let tx_dead = tx.clone();
434 let task = tokio::spawn(audit_task(rx, em.emitter(), pid, bb.clone(), delay));
435
436 {
437 let st = ctx.storage();
438 let mut storage = st.lock();
439 let entry = storage.entry(pid).or_default();
440 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
441 }
442
443 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
444 let source_sink: Sink = Arc::new(move |msgs| {
445 for m in msgs {
446 match m.tier() {
447 3 => {
448 if let Some(h) = m.payload_handle() {
449 bb_sink.retain_handle(h);
450 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
451 bb_sink.release_handle(h);
452 }
453 }
454 }
455 5 => {
456 if let Some(h) = m.payload_handle() {
457 bb_sink.retain_handle(h);
458 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
459 bb_sink.release_handle(h);
460 }
461 } else {
462 let _ = tx_sink.send(TemporalCmd::Complete);
463 }
464 }
465 _ => {}
466 }
467 }
468 });
469
470 let outcome = ctx.subscribe_to(source, source_sink);
471 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
472 let _ = tx_dead.send(TemporalCmd::Complete);
473 }
474 });
475
476 let fn_id = binding.register_producer_build(build);
477 core.register_producer(fn_id)
478 .expect("audit: register_producer failed")
479}
480
481// S2b/D230/D232-AMEND: `core: WeakCore` → `em: ProducerEmitter` (same
482// collapse rationale as `debounce_task`).
483async fn audit_task(
484 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
485 em: MailboxEmitter,
486 pid: NodeId,
487 binding: Arc<dyn BindingBoundary>,
488 delay: Duration,
489) {
490 // Outer loop: wait for first DATA to start a window.
491 loop {
492 match rx.recv().await {
493 Some(TemporalCmd::Value(h)) => {
494 // First value in window — start timer, collect updates.
495 let mut latest = h;
496
497 tokio::select! {
498 biased;
499 // Drain commands until timer fires.
500 () = async {
501 loop {
502 tokio::select! {
503 biased;
504 cmd = rx.recv() => {
505 match cmd {
506 Some(TemporalCmd::Value(new_h)) => {
507 binding.release_handle(latest);
508 latest = new_h;
509 }
510 Some(TemporalCmd::Complete) => {
511 // Emit latest, then complete.
512 em.emit_or_defer(pid, latest);
513 em.complete_or_defer(pid);
514 return; // exits the async block
515 }
516 Some(TemporalCmd::Error(err_h)) => {
517 binding.release_handle(latest);
518 em.error_or_defer(pid, err_h);
519 return;
520 }
521 None => {
522 binding.release_handle(latest);
523 return;
524 }
525 }
526 }
527 }
528 }
529 } => {
530 return; // Terminal command handled inside async block.
531 }
532 () = tokio::time::sleep(delay) => {
533 // Timer fired — emit latest, window closes.
534 em.emit_or_defer(pid, latest);
535 if em.is_core_gone() {
536 return;
537 }
538 // Continue outer loop — wait for next window.
539 }
540 }
541 }
542 Some(TemporalCmd::Complete) => {
543 em.complete_or_defer(pid);
544 return;
545 }
546 Some(TemporalCmd::Error(err_h)) => {
547 em.error_or_defer(pid, err_h);
548 return;
549 }
550 None => return,
551 }
552 }
553}
554
555// =========================================================================
556// delay(source, ms)
557// =========================================================================
558
559/// Delays each upstream DATA by `ms` milliseconds. Multiple values can
560/// be in-flight simultaneously, each with its own timer.
561///
562/// - COMPLETE waits for all pending delays, then completes.
563/// - ERROR cancels all pending and propagates immediately.
564#[must_use]
565pub fn delay(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
566 let delay_dur = Duration::from_millis(ms);
567
568 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
569 let binding_s = ctx.core().binding();
570 let em = ctx.emitter();
571 let pid = ctx.node_id();
572 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
573
574 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
575 let tx_sink = tx.clone();
576 let tx_dead = tx.clone();
577 let task = tokio::spawn(delay_task(rx, em.emitter(), pid, bb.clone(), delay_dur));
578
579 {
580 let st = ctx.storage();
581 let mut storage = st.lock();
582 let entry = storage.entry(pid).or_default();
583 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
584 }
585
586 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
587 let source_sink: Sink = Arc::new(move |msgs| {
588 for m in msgs {
589 match m.tier() {
590 3 => {
591 if let Some(h) = m.payload_handle() {
592 bb_sink.retain_handle(h);
593 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
594 bb_sink.release_handle(h);
595 }
596 }
597 }
598 5 => {
599 if let Some(h) = m.payload_handle() {
600 bb_sink.retain_handle(h);
601 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
602 bb_sink.release_handle(h);
603 }
604 } else {
605 let _ = tx_sink.send(TemporalCmd::Complete);
606 }
607 }
608 _ => {}
609 }
610 }
611 });
612
613 let outcome = ctx.subscribe_to(source, source_sink);
614 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
615 let _ = tx_dead.send(TemporalCmd::Complete);
616 }
617 });
618
619 let fn_id = binding.register_producer_build(build);
620 core.register_producer(fn_id)
621 .expect("delay: register_producer failed")
622}
623
624// S2b/D230/D232-AMEND: `core: WeakCore` → `em: ProducerEmitter`.
625async fn delay_task(
626 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
627 em: MailboxEmitter,
628 pid: NodeId,
629 binding: Arc<dyn BindingBoundary>,
630 delay: Duration,
631) {
632 let mut queue: VecDeque<(tokio::time::Instant, HandleId)> = VecDeque::new();
633 let mut complete_pending = false;
634
635 loop {
636 let next_fire = queue.front().map(|(deadline, _)| *deadline);
637
638 tokio::select! {
639 biased;
640 cmd = rx.recv() => {
641 match cmd {
642 Some(TemporalCmd::Value(h)) => {
643 queue.push_back((tokio::time::Instant::now() + delay, h));
644 }
645 Some(TemporalCmd::Complete) => {
646 complete_pending = true;
647 if queue.is_empty() {
648 em.complete_or_defer(pid);
649 return;
650 }
651 // Wait for pending timers to drain.
652 }
653 Some(TemporalCmd::Error(err_h)) => {
654 // Release all pending.
655 for (_, h) in queue.drain(..) {
656 binding.release_handle(h);
657 }
658 em.error_or_defer(pid, err_h);
659 return;
660 }
661 None => {
662 for (_, h) in queue.drain(..) {
663 binding.release_handle(h);
664 }
665 return;
666 }
667 }
668 }
669 () = sleep_until_or_forever(next_fire) => {
670 // Fire all expired.
671 let now = tokio::time::Instant::now();
672 while let Some(&(deadline, _)) = queue.front() {
673 if deadline <= now {
674 let (_, h) = queue.pop_front().unwrap();
675 em.emit_or_defer(pid, h);
676 if em.is_core_gone() {
677 for (_, h2) in queue.drain(..) {
678 binding.release_handle(h2);
679 }
680 return;
681 }
682 } else {
683 break;
684 }
685 }
686 if complete_pending && queue.is_empty() {
687 em.complete_or_defer(pid);
688 return;
689 }
690 }
691 }
692 }
693}
694
695// =========================================================================
696// throttle(source, ms, opts)
697// =========================================================================
698
/// Options for [`throttle`].
///
/// Comparable with `==` so callers and tests can check a configuration
/// against an expected one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThrottleOpts {
    /// Emit immediately at the leading edge of each window (default: true).
    pub leading: bool,
    /// Emit the latest value at the trailing edge of each window (default: false).
    pub trailing: bool,
}

impl Default for ThrottleOpts {
    /// Leading-edge only: `leading: true`, `trailing: false`.
    fn default() -> Self {
        Self {
            leading: true,
            trailing: false,
        }
    }
}
716
717/// Rate-limits upstream emissions to at most one per `ms`-millisecond window.
718///
719/// With default options (`leading: true, trailing: false`), the first DATA
720/// in each window is emitted immediately; subsequent DATA within the window
721/// is dropped. With `trailing: true`, the latest DATA is emitted at window end.
722///
723/// On upstream COMPLETE, flushes trailing (if any) then completes.
724#[must_use]
725pub fn throttle(
726 core: &Core,
727 binding: &Arc<dyn ProducerBinding>,
728 source: NodeId,
729 ms: u64,
730 opts: ThrottleOpts,
731) -> NodeId {
732 let window = Duration::from_millis(ms);
733
734 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
735 let binding_s = ctx.core().binding();
736 let em = ctx.emitter();
737 let pid = ctx.node_id();
738 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
739
740 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
741 let tx_sink = tx.clone();
742 let tx_dead = tx.clone();
743 let task = tokio::spawn(throttle_task(
744 rx,
745 em.emitter(),
746 pid,
747 bb.clone(),
748 window,
749 opts,
750 ));
751
752 {
753 let st = ctx.storage();
754 let mut storage = st.lock();
755 let entry = storage.entry(pid).or_default();
756 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
757 }
758
759 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
760 let source_sink: Sink = Arc::new(move |msgs| {
761 for m in msgs {
762 match m.tier() {
763 3 => {
764 if let Some(h) = m.payload_handle() {
765 bb_sink.retain_handle(h);
766 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
767 bb_sink.release_handle(h);
768 }
769 }
770 }
771 5 => {
772 if let Some(h) = m.payload_handle() {
773 bb_sink.retain_handle(h);
774 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
775 bb_sink.release_handle(h);
776 }
777 } else {
778 let _ = tx_sink.send(TemporalCmd::Complete);
779 }
780 }
781 _ => {}
782 }
783 }
784 });
785
786 let outcome = ctx.subscribe_to(source, source_sink);
787 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
788 let _ = tx_dead.send(TemporalCmd::Complete);
789 }
790 });
791
792 let fn_id = binding.register_producer_build(build);
793 core.register_producer(fn_id)
794 .expect("throttle: register_producer failed")
795}
796
797async fn throttle_task(
798 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
799 // S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`.
800 em: MailboxEmitter,
801 pid: NodeId,
802 binding: Arc<dyn BindingBoundary>,
803 window: Duration,
804 opts: ThrottleOpts,
805) {
806 let mut trailing_pending: Option<HandleId> = None;
807 let mut window_deadline: Option<tokio::time::Instant> = None;
808
809 loop {
810 let fire_at = if opts.trailing { window_deadline } else { None };
811
812 tokio::select! {
813 biased;
814 cmd = rx.recv() => {
815 match cmd {
816 Some(TemporalCmd::Value(h)) => {
817 let in_window = window_deadline
818 .is_some_and(|d| tokio::time::Instant::now() < d);
819
820 if !in_window {
821 // Window open — start new window.
822 window_deadline = Some(tokio::time::Instant::now() + window);
823 if opts.leading {
824 em.emit_or_defer(pid, h);
825 if em.is_core_gone() {
826 release_opt(&mut trailing_pending, &*binding);
827 return;
828 }
829 } else if opts.trailing {
830 if let Some(old) = trailing_pending.replace(h) {
831 binding.release_handle(old);
832 }
833 } else {
834 binding.release_handle(h);
835 }
836 } else if opts.trailing {
837 // Inside window — store for trailing.
838 if let Some(old) = trailing_pending.replace(h) {
839 binding.release_handle(old);
840 }
841 } else {
842 binding.release_handle(h);
843 }
844 }
845 Some(TemporalCmd::Complete) => {
846 if let Some(h) = trailing_pending.take() {
847 em.emit_or_defer(pid, h);
848 }
849 em.complete_or_defer(pid);
850 return;
851 }
852 Some(TemporalCmd::Error(err_h)) => {
853 release_opt(&mut trailing_pending, &*binding);
854 em.error_or_defer(pid, err_h);
855 return;
856 }
857 None => {
858 release_opt(&mut trailing_pending, &*binding);
859 return;
860 }
861 }
862 }
863 () = sleep_until_or_forever(fire_at) => {
864 // Window expired — emit trailing if any, reopen window.
865 window_deadline = None;
866 if let Some(h) = trailing_pending.take() {
867 em.emit_or_defer(pid, h);
868 if em.is_core_gone() {
869 return;
870 }
871 // Start new window for the trailing emission.
872 window_deadline = Some(tokio::time::Instant::now() + window);
873 }
874 }
875 }
876 }
877}
878
879// =========================================================================
880// interval(period_ms) — timer source
881// =========================================================================
882
883/// Source that emits a monotonically increasing counter (`1, 2, 3, …`)
884/// every `period_ms` milliseconds. Counter starts at 1 (not 0) because
885/// `HandleId(0)` is the `NO_HANDLE` sentinel. Resubscribable: counter
886/// resets on deactivation + reactivation.
887///
888/// The emitted values use raw `HandleId::new(counter)`. Bindings should
889/// register these as integer handles.
890#[must_use]
891pub fn interval(core: &Core, binding: &Arc<dyn ProducerBinding>, period_ms: u64) -> NodeId {
892 let period = Duration::from_millis(period_ms);
893
894 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
895 let binding_s = ctx.core().binding();
896 let em = ctx.emitter();
897 let pid = ctx.node_id();
898 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
899 let em_task = em.emitter();
900
901 let task = tokio::spawn(async move {
902 let mut ticker = tokio::time::interval(period);
903 ticker.tick().await; // First tick is immediate — skip it.
904 let mut counter: u64 = 1; // Start at 1: HandleId(0) = NO_HANDLE.
905 loop {
906 ticker.tick().await;
907 // S2b/D230: stop ticking once the Core is gone (was the
908 // `weak.upgrade() == None` break); check BEFORE retaining
909 // so a dead Core never leaks a fresh counter handle.
910 if em_task.is_core_gone() {
911 break;
912 }
913 let h = HandleId::new(counter);
914 bb.retain_handle(h);
915 em_task.emit_or_defer(pid, h);
916 counter += 1;
917 }
918 });
919
920 {
921 let st = ctx.storage();
922 let mut storage = st.lock();
923 let entry = storage.entry(pid).or_default();
924 entry.op_state = Some(Box::new(AbortOnDrop(task)));
925 }
926 });
927
928 let fn_id = binding.register_producer_build(build);
929 core.register_producer(fn_id)
930 .expect("interval: register_producer failed")
931}
932
933// =========================================================================
934// Helpers
935// =========================================================================
936
937/// Sleep until the given instant, or sleep forever if `None`.
938async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
939 match deadline {
940 Some(d) => tokio::time::sleep_until(d).await,
941 None => std::future::pending::<()>().await,
942 }
943}
944
945/// Release an optional handle and set it to `None`.
946fn release_opt(opt: &mut Option<HandleId>, binding: &dyn BindingBoundary) {
947 if let Some(h) = opt.take() {
948 binding.release_handle(h);
949 }
950}
951
952// =========================================================================
953// timeout(source, ms, error_handle)
954// =========================================================================
955
956/// Errors if no upstream DATA arrives within `ms` milliseconds after
957/// subscribe or after the previous DATA. Each DATA resets the timer.
958///
959/// On timeout: retains `error_handle` and emits ERROR with it.
960/// On upstream COMPLETE: cancels timer, forwards complete.
961/// On upstream ERROR: cancels timer, forwards error.
962#[must_use]
963pub fn timeout(
964 core: &Core,
965 binding: &Arc<dyn ProducerBinding>,
966 source: NodeId,
967 ms: u64,
968 error_handle: HandleId,
969) -> NodeId {
970 let duration = Duration::from_millis(ms);
971
972 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
973 let binding_s = ctx.core().binding();
974 let em = ctx.emitter();
975 let pid = ctx.node_id();
976 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
977
978 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
979 let tx_sink = tx.clone();
980 let tx_dead = tx.clone();
981 let task = tokio::spawn(timeout_task(
982 rx,
983 em.emitter(),
984 pid,
985 bb.clone(),
986 duration,
987 error_handle,
988 ));
989
990 {
991 let st = ctx.storage();
992 let mut storage = st.lock();
993 let entry = storage.entry(pid).or_default();
994 entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
995 }
996
997 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
998 let source_sink: Sink = Arc::new(move |msgs| {
999 for m in msgs {
1000 match m.tier() {
1001 3 => {
1002 if let Some(h) = m.payload_handle() {
1003 bb_sink.retain_handle(h);
1004 if tx_sink.send(TemporalCmd::Value(h)).is_err() {
1005 bb_sink.release_handle(h);
1006 }
1007 }
1008 }
1009 5 => {
1010 if let Some(h) = m.payload_handle() {
1011 bb_sink.retain_handle(h);
1012 if tx_sink.send(TemporalCmd::Error(h)).is_err() {
1013 bb_sink.release_handle(h);
1014 }
1015 } else {
1016 let _ = tx_sink.send(TemporalCmd::Complete);
1017 }
1018 }
1019 _ => {}
1020 }
1021 }
1022 });
1023
1024 let outcome = ctx.subscribe_to(source, source_sink);
1025 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1026 let _ = tx_dead.send(TemporalCmd::Complete);
1027 }
1028 });
1029
1030 let fn_id = binding.register_producer_build(build);
1031 core.register_producer(fn_id)
1032 .expect("timeout: register_producer failed")
1033}
1034
1035// S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`.
1036async fn timeout_task(
1037 mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
1038 em: MailboxEmitter,
1039 pid: NodeId,
1040 binding: Arc<dyn BindingBoundary>,
1041 duration: Duration,
1042 error_handle: HandleId,
1043) {
1044 // The timer starts immediately on subscribe — if no DATA arrives
1045 // within `duration`, we fire the timeout error.
1046 loop {
1047 tokio::select! {
1048 biased;
1049 cmd = rx.recv() => {
1050 match cmd {
1051 Some(TemporalCmd::Value(h)) => {
1052 // DATA arrived — forward it and reset the timer
1053 // (the next loop iteration restarts the sleep).
1054 em.emit_or_defer(pid, h);
1055 if em.is_core_gone() {
1056 return;
1057 }
1058 // Continue loop → resets the sleep timer.
1059 }
1060 Some(TemporalCmd::Complete) => {
1061 em.complete_or_defer(pid);
1062 return;
1063 }
1064 Some(TemporalCmd::Error(err_h)) => {
1065 em.error_or_defer(pid, err_h);
1066 return;
1067 }
1068 None => return,
1069 }
1070 }
1071 () = tokio::time::sleep(duration) => {
1072 // Timeout fired — emit error. Retain BEFORE the post
1073 // (skip if the Core is gone — nothing to error into).
1074 if !em.is_core_gone() {
1075 binding.retain_handle(error_handle);
1076 em.error_or_defer(pid, error_handle);
1077 }
1078 return;
1079 }
1080 }
1081 }
1082}
1083
1084// =========================================================================
1085// buffer_time(source, ms, pack_fn_id)
1086// =========================================================================
1087
/// Command sent from the `buffer_time` sink to its async task.
///
/// Built by the upstream sink closure inside `buffer_time` and consumed by
/// `buffer_time_task` over an unbounded tokio mpsc channel.
enum BufferTimeCmd {
    /// New DATA handle from upstream (tier-3 message carrying a payload
    /// handle). Already retained by the sink before it is sent.
    Value(HandleId),
    /// Upstream COMPLETE (tier-5 message without a payload handle). Also
    /// sent by `buffer_time` when subscribing reports a dead source.
    Complete,
    /// Upstream ERROR (tier-5 message carrying a payload handle). Handle
    /// already retained by the sink before it is sent.
    Error(HandleId),
}
1097
1098/// Collects upstream DATA handles into a buffer and flushes them as a
1099/// packed tuple every `ms` milliseconds.
1100///
1101/// - On interval tick: packs buffered handles via
1102/// `binding.pack_tuple(pack_fn_id, &buf)`, emits the result, releases
1103/// individual handles, clears buffer.
1104/// - On upstream COMPLETE: flushes remaining buffer, then completes.
1105/// - On upstream ERROR: releases buffered handles, forwards error.
1106#[must_use]
1107pub fn buffer_time(
1108 core: &Core,
1109 binding: &Arc<dyn ProducerBinding>,
1110 source: NodeId,
1111 ms: u64,
1112 pack_fn_id: FnId,
1113) -> NodeId {
1114 let period = Duration::from_millis(ms);
1115
1116 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1117 let binding_s = ctx.core().binding();
1118 let em = ctx.emitter();
1119 let pid = ctx.node_id();
1120 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1121
1122 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1123 let tx_sink = tx.clone();
1124 let tx_dead = tx.clone();
1125 let task = tokio::spawn(buffer_time_task(
1126 rx,
1127 em.emitter(),
1128 pid,
1129 bb.clone(),
1130 period,
1131 pack_fn_id,
1132 ));
1133
1134 {
1135 let st = ctx.storage();
1136 let mut storage = st.lock();
1137 let entry = storage.entry(pid).or_default();
1138 entry.op_state = Some(Box::new(BufferTimeTaskGuard { _tx: tx, task }));
1139 }
1140
1141 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1142 let source_sink: Sink = Arc::new(move |msgs| {
1143 for m in msgs {
1144 match m.tier() {
1145 3 => {
1146 if let Some(h) = m.payload_handle() {
1147 bb_sink.retain_handle(h);
1148 if tx_sink.send(BufferTimeCmd::Value(h)).is_err() {
1149 bb_sink.release_handle(h);
1150 }
1151 }
1152 }
1153 5 => {
1154 if let Some(h) = m.payload_handle() {
1155 bb_sink.retain_handle(h);
1156 if tx_sink.send(BufferTimeCmd::Error(h)).is_err() {
1157 bb_sink.release_handle(h);
1158 }
1159 } else {
1160 let _ = tx_sink.send(BufferTimeCmd::Complete);
1161 }
1162 }
1163 _ => {}
1164 }
1165 }
1166 });
1167
1168 let outcome = ctx.subscribe_to(source, source_sink);
1169 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1170 let _ = tx_dead.send(BufferTimeCmd::Complete);
1171 }
1172 });
1173
1174 let fn_id = binding.register_producer_build(build);
1175 core.register_producer(fn_id)
1176 .expect("buffer_time: register_producer failed")
1177}
1178
/// RAII guard for `buffer_time` task — same pattern as [`TemporalTaskGuard`]
/// but with `BufferTimeCmd` channel.
///
/// `buffer_time` stores one of these in the producer's `op_state` slot;
/// dropping it aborts the spawned task (see the `Drop` impl).
struct BufferTimeTaskGuard {
    /// Sender clone held for the guard's lifetime; never read.
    _tx: tokio::sync::mpsc::UnboundedSender<BufferTimeCmd>,
    /// Handle of the spawned `buffer_time_task`, aborted on drop.
    task: tokio::task::JoinHandle<()>,
}
1185
impl Drop for BufferTimeTaskGuard {
    /// Requests cancellation of the spawned task. `drop` runs before the
    /// struct's fields are dropped, so `_tx` is still alive at this point.
    fn drop(&mut self) {
        self.task.abort();
    }
}
1191
1192// S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`. Note: the
1193// emit-path now always `pack_tuple`s even on a gone Core (em releases
1194// the packed handle); the buffered component handles are drained-released
1195// exactly as before — behaviour-equivalent, just one extra teardown-only
1196// pack FFI.
1197async fn buffer_time_task(
1198 mut rx: tokio::sync::mpsc::UnboundedReceiver<BufferTimeCmd>,
1199 em: MailboxEmitter,
1200 pid: NodeId,
1201 binding: Arc<dyn BindingBoundary>,
1202 period: Duration,
1203 pack_fn_id: FnId,
1204) {
1205 let mut buf: Vec<HandleId> = Vec::new();
1206 let mut ticker = tokio::time::interval(period);
1207 ticker.tick().await; // First tick is immediate — skip it.
1208
1209 loop {
1210 tokio::select! {
1211 biased;
1212 cmd = rx.recv() => {
1213 match cmd {
1214 Some(BufferTimeCmd::Value(h)) => {
1215 buf.push(h);
1216 }
1217 Some(BufferTimeCmd::Complete) => {
1218 // Flush remaining buffer then complete.
1219 if !buf.is_empty() {
1220 let packed = binding.pack_tuple(pack_fn_id, &buf);
1221 em.emit_or_defer(pid, packed);
1222 for h in buf.drain(..) {
1223 binding.release_handle(h);
1224 }
1225 }
1226 em.complete_or_defer(pid);
1227 return;
1228 }
1229 Some(BufferTimeCmd::Error(err_h)) => {
1230 for h in buf.drain(..) {
1231 binding.release_handle(h);
1232 }
1233 em.error_or_defer(pid, err_h);
1234 return;
1235 }
1236 None => {
1237 // Channel closed — deactivated. Release buffered.
1238 for h in buf.drain(..) {
1239 binding.release_handle(h);
1240 }
1241 return;
1242 }
1243 }
1244 }
1245 _ = ticker.tick() => {
1246 if !buf.is_empty() {
1247 let packed = binding.pack_tuple(pack_fn_id, &buf);
1248 em.emit_or_defer(pid, packed);
1249 for h in buf.drain(..) {
1250 binding.release_handle(h);
1251 }
1252 if em.is_core_gone() {
1253 return;
1254 }
1255 }
1256 }
1257 }
1258 }
1259}
1260
1261// =========================================================================
1262// window_time(source, ms)
1263// =========================================================================
1264
/// Command sent from the `window_time` sink to its async task.
///
/// Built by the upstream sink closure inside `window_time` and consumed by
/// `window_time_task` over an unbounded tokio mpsc channel.
enum WindowTimeCmd {
    /// New DATA handle from upstream (tier-3 message carrying a payload
    /// handle). Already retained by the sink before it is sent.
    Value(HandleId),
    /// Upstream COMPLETE (tier-5 message without a payload handle). Also
    /// sent by `window_time` when subscribing reports a dead source.
    Complete,
    /// Upstream ERROR (tier-5 message carrying a payload handle). Handle
    /// already retained by the sink before it is sent.
    Error(HandleId),
}
1274
1275/// Rotates sub-node windows every `ms` milliseconds.
1276///
1277/// Creates a new inner state node each interval tick. Upstream DATA is
1278/// forwarded to the current inner window node. On tick, the current
1279/// window is completed, a new inner node is created, and its identity
1280/// is emitted as a DATA handle (via `binding.intern_node`).
1281///
1282/// - On upstream COMPLETE: completes current window, then completes self.
1283/// - On upstream ERROR: errors current window, then errors self.
1284#[must_use]
1285pub fn window_time(
1286 core: &Core,
1287 binding: &Arc<dyn ProducerBinding>,
1288 source: NodeId,
1289 ms: u64,
1290) -> NodeId {
1291 let period = Duration::from_millis(ms);
1292
1293 // S2b/D231: register the reusable no-op inner-window build at FACTORY
1294 // scope (where `binding: &Arc<dyn ProducerBinding>` is in hand) — the
1295 // build closure no longer holds a `ProducerBinding`, only `ctx`.
1296 // `FnId` is `Copy`; capture it into the build/task.
1297 let noop_fn_id = binding.register_producer_build(Box::new(|_ctx: ProducerCtx<'_>| {
1298 // Inner window nodes are passive — all emissions are driven by
1299 // the parent window_time task via the mailbox.
1300 }));
1301
1302 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1303 let core_s = ctx.core();
1304 let binding_s = ctx.core().binding();
1305 let em = ctx.emitter();
1306 let pid = ctx.node_id();
1307 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1308
1309 // Create the first inner window node and emit its handle.
1310 let first_inner = core_s
1311 .register_producer(noop_fn_id)
1312 .expect("window_time inner: register_producer failed");
1313 let first_handle = bb.intern_node(first_inner);
1314 core_s.emit_or_defer(pid, first_handle);
1315
1316 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1317 let tx_sink = tx.clone();
1318 let tx_dead = tx.clone();
1319 let task = tokio::spawn(window_time_task(
1320 rx,
1321 em.emitter(),
1322 pid,
1323 first_inner,
1324 bb.clone(),
1325 period,
1326 noop_fn_id,
1327 ));
1328
1329 {
1330 let st = ctx.storage();
1331 let mut storage = st.lock();
1332 let entry = storage.entry(pid).or_default();
1333 entry.op_state = Some(Box::new(WindowTimeTaskGuard { _tx: tx, task }));
1334 }
1335
1336 let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1337 let source_sink: Sink = Arc::new(move |msgs| {
1338 for m in msgs {
1339 match m.tier() {
1340 3 => {
1341 if let Some(h) = m.payload_handle() {
1342 bb_sink.retain_handle(h);
1343 if tx_sink.send(WindowTimeCmd::Value(h)).is_err() {
1344 bb_sink.release_handle(h);
1345 }
1346 }
1347 }
1348 5 => {
1349 if let Some(h) = m.payload_handle() {
1350 bb_sink.retain_handle(h);
1351 if tx_sink.send(WindowTimeCmd::Error(h)).is_err() {
1352 bb_sink.release_handle(h);
1353 }
1354 } else {
1355 let _ = tx_sink.send(WindowTimeCmd::Complete);
1356 }
1357 }
1358 _ => {}
1359 }
1360 }
1361 });
1362
1363 let outcome = ctx.subscribe_to(source, source_sink);
1364 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1365 let _ = tx_dead.send(WindowTimeCmd::Complete);
1366 }
1367 });
1368
1369 let fn_id = binding.register_producer_build(build);
1370 core.register_producer(fn_id)
1371 .expect("window_time: register_producer failed")
1372}
1373
/// RAII guard for `window_time` task.
///
/// `window_time` stores one of these in the producer's `op_state` slot;
/// dropping it aborts the spawned task (see the `Drop` impl).
struct WindowTimeTaskGuard {
    /// Sender clone held for the guard's lifetime; never read.
    _tx: tokio::sync::mpsc::UnboundedSender<WindowTimeCmd>,
    /// Handle of the spawned `window_time_task`, aborted on drop.
    task: tokio::task::JoinHandle<()>,
}
1379
impl Drop for WindowTimeTaskGuard {
    /// Requests cancellation of the spawned task. `drop` runs before the
    /// struct's fields are dropped, so `_tx` is still alive at this point.
    fn drop(&mut self) {
        self.task.abort();
    }
}
1385
1386// S2b/D230/D234: `WeakCore` → `ProducerEmitter`; the window rotation
1387// does task-side topology mutation (`register_producer`) so it routes
1388// through `em.defer` (`CoreFull`). `current_inner` (the routing
1389// selector) becomes a shared `Arc<Mutex<NodeId>>` read INSIDE the
1390// owner-serialized defer closures, so the FIFO mailbox order
1391// (arrival order) keeps every forward routed to the window live at the
1392// time it arrived — the D234 invariant (same as `window`).
1393async fn window_time_task(
1394 mut rx: tokio::sync::mpsc::UnboundedReceiver<WindowTimeCmd>,
1395 em: MailboxEmitter,
1396 pid: NodeId,
1397 initial_inner: NodeId,
1398 binding: Arc<dyn BindingBoundary>,
1399 period: Duration,
1400 noop_fn_id: FnId,
1401) {
1402 let current_inner = Arc::new(Mutex::new(initial_inner));
1403 let mut ticker = tokio::time::interval(period);
1404 ticker.tick().await; // First tick is immediate — skip it.
1405
1406 loop {
1407 tokio::select! {
1408 biased;
1409 cmd = rx.recv() => {
1410 match cmd {
1411 Some(WindowTimeCmd::Value(h)) => {
1412 // Forward DATA to whatever window is current when
1413 // this defer runs (FIFO-ordered after any rotation
1414 // posted earlier).
1415 let cur = current_inner.clone();
1416 let b = binding.clone();
1417 if !em.defer(move |c| {
1418 c.emit(*cur.lock(), h);
1419 }) {
1420 b.release_handle(h);
1421 }
1422 }
1423 Some(WindowTimeCmd::Complete) => {
1424 let cur = current_inner.clone();
1425 let _ = em.defer(move |c| {
1426 c.complete(*cur.lock());
1427 c.complete(pid);
1428 });
1429 return;
1430 }
1431 Some(WindowTimeCmd::Error(err_h)) => {
1432 // err_h retained once by the sink; the 2nd retain
1433 // (for the self-error) is done INSIDE the closure
1434 // so a Core-gone drop leaves exactly the sink's
1435 // one retain to release (mirrors the old `else`).
1436 let cur = current_inner.clone();
1437 let b = binding.clone();
1438 let b2 = binding.clone();
1439 if !em.defer(move |c| {
1440 b2.retain_handle(err_h);
1441 c.error(*cur.lock(), err_h);
1442 c.error(pid, err_h);
1443 }) {
1444 b.release_handle(err_h);
1445 }
1446 return;
1447 }
1448 None => {
1449 // Channel closed — deactivated.
1450 return;
1451 }
1452 }
1453 }
1454 _ = ticker.tick() => {
1455 // Window rotation: complete current, create new, emit
1456 // handle — ALL inside one owner-side closure so the
1457 // `register_producer` + `current_inner` swap is atomic
1458 // w.r.t. the FIFO drain (D234).
1459 let cur = current_inner.clone();
1460 let b = binding.clone();
1461 let _ = em.defer(move |c| {
1462 let old = *cur.lock();
1463 c.complete(old);
1464 match c.register_producer(noop_fn_id) {
1465 Ok(new_inner) => {
1466 *cur.lock() = new_inner;
1467 let h = b.intern_node(new_inner);
1468 c.emit(pid, h);
1469 }
1470 Err(_) => {
1471 // Registration failed — complete self.
1472 c.complete(pid);
1473 }
1474 }
1475 });
1476 if em.is_core_gone() {
1477 return;
1478 }
1479 }
1480 }
1481 }
1482}