Skip to main content

graphrefly_operators/
temporal.rs

1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Temporal operators — time-dependent transforms on reactive streams.
11//!
12//! # Operators
13//!
14//! - [`sample`] — pure reactive; emits source's latest value when notifier
15//!   fires. No timer needed.
16//! - [`debounce`] — emits after `ms` of quiet (no new upstream DATA).
17//! - [`throttle`] — rate-limits: at most one emission per `ms` window.
18//!   Configurable leading / trailing edge.
19//! - [`delay`] — delays each upstream DATA by `ms`. Multiple in-flight.
20//! - [`audit`] — on first DATA, starts a `ms` timer; when it fires, emits
21//!   the latest value. Timer does NOT restart on subsequent DATA within
22//!   the window.
23//! - [`interval`] — source that emits a monotonic counter every `period_ms`.
24//! - [`timeout`] — errors if no DATA arrives within `ms` after subscribe or
25//!   after the previous DATA.
26//! - [`buffer_time`] — collects upstream DATA into a buffer and flushes as
27//!   a packed tuple every `ms` milliseconds.
28//! - [`window_time`] — rotates inner sub-nodes every `ms` milliseconds;
29//!   upstream DATA is forwarded to the current window node.
30//!
31//! # Architecture
32//!
33//! Timer operators (debounce, throttle, delay, audit) spawn a **per-operator
34//! tokio task** that owns all pending state (handles, counters) exclusively.
35//! The sync sink callback sends [`TemporalCmd`] commands; the async task
36//! manages timers via `tokio::time` and calls `Core::emit_or_defer` /
37//! `complete_or_defer` / `error_or_defer` when ready. This avoids the
38//! double-ownership problem that would arise from tracking pending handles
39//! in both the operator's state mutex and the generic timer substrate.
40//!
41//! `sample` is a pure-reactive producer-pattern node with two subscriptions
42//! (source + notifier) and no timer dependency.
43//!
44//! All temporal operators require a **tokio runtime context** at activation
45//! time (first subscriber triggers the build closure, which calls
46//! `tokio::spawn`).
47
48use std::collections::VecDeque;
49use std::sync::Arc;
50use std::time::Duration;
51
52use parking_lot::Mutex;
53
54use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
55use smallvec::SmallVec;
56
57use crate::producer::{
58    MailboxEmitter, ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome,
59};
60
61// =========================================================================
62// Shared command type
63// =========================================================================
64
65/// Command sent from operator sink callbacks (sync) to the per-operator
66/// async task. Channel close = shutdown (producer deactivated).
67enum TemporalCmd {
68    /// New DATA handle from upstream. Already retained by the sink.
69    Value(HandleId),
70    /// Upstream COMPLETE. Flush pending if applicable, then complete.
71    Complete,
72    /// Upstream ERROR. Cancel all, propagate. Handle already retained.
73    Error(HandleId),
74}
75
76/// RAII wrapper for temporal operator tasks. On drop, closes the command
77/// channel first (triggering the task's cleanup path which releases pending
78/// handles), then aborts the task as a fallback if it hasn't exited.
79struct TemporalTaskGuard {
80    /// Dropping the sender closes the channel → task sees `None` on
81    /// `rx.recv()` and releases all pending handles before returning.
82    _tx: tokio::sync::mpsc::UnboundedSender<TemporalCmd>,
83    task: tokio::task::JoinHandle<()>,
84}
85
86impl Drop for TemporalTaskGuard {
87    fn drop(&mut self) {
88        // _tx drops first (field order), closing the channel.
89        // Abort as fallback in case the task is stuck on a non-channel await.
90        self.task.abort();
91    }
92}
93
94/// Simple abort-on-drop for tasks with no pending handle state (e.g. interval).
95struct AbortOnDrop(tokio::task::JoinHandle<()>);
96
97impl Drop for AbortOnDrop {
98    fn drop(&mut self) {
99        self.0.abort();
100    }
101}
102
103// =========================================================================
104// sample(source, notifier) — pure reactive, no timer
105// =========================================================================
106
107/// Emits the source's latest DATA each time `notifier` delivers DATA.
108///
109/// - Source COMPLETE clears stored value; subsequent notifier DATAs no-op.
110/// - Notifier COMPLETE terminates the sample node.
111/// - Either dep ERROR terminates immediately.
112#[must_use]
113#[allow(clippy::too_many_lines)]
114pub fn sample(
115    core: &Core,
116    binding: &Arc<dyn ProducerBinding>,
117    source: NodeId,
118    notifier: NodeId,
119) -> NodeId {
120    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
121        let core_s = ctx.core();
122        let binding_s = ctx.core().binding();
123        let em = ctx.emitter();
124        let pid = ctx.node_id();
125        let state: Arc<Mutex<SampleState>> = Arc::new(Mutex::new(SampleState::default()));
126
127        // --- source sink ---
128        let st = state.clone();
129        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
130        let core_src = em.clone();
131        let source_sink: Sink = Arc::new(move |msgs| {
132            enum Act {
133                Release(HandleId),
134                Error(HandleId),
135            }
136            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
137            {
138                let mut s = st.lock();
139                if s.terminated {
140                    return;
141                }
142                for m in msgs {
143                    match m.tier() {
144                        3 => {
145                            if let Some(h) = m.payload_handle() {
146                                if let Some(old) = s.latest.replace(h) {
147                                    actions.push(Act::Release(old));
148                                }
149                                bb.retain_handle(h);
150                            }
151                        }
152                        5 => {
153                            if let Some(h) = m.payload_handle() {
154                                s.terminated = true;
155                                if let Some(old) = s.latest.take() {
156                                    actions.push(Act::Release(old));
157                                }
158                                bb.retain_handle(h);
159                                actions.push(Act::Error(h));
160                            } else {
161                                s.source_completed = true;
162                                if let Some(old) = s.latest.take() {
163                                    actions.push(Act::Release(old));
164                                }
165                            }
166                        }
167                        _ => {}
168                    }
169                }
170            }
171            for a in actions {
172                match a {
173                    Act::Release(h) => bb.release_handle(h),
174                    Act::Error(h) => core_src.error_or_defer(pid, h),
175                }
176            }
177        });
178
179        let src_outcome = ctx.subscribe_to(source, source_sink);
180        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
181            state.lock().source_completed = true;
182        }
183
184        // --- notifier sink ---
185        let st2 = state.clone();
186        let core_n = em.clone();
187        let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
188        let notifier_sink: Sink = Arc::new(move |msgs| {
189            let mut s = st2.lock();
190            if s.terminated {
191                return;
192            }
193            for m in msgs {
194                if s.terminated {
195                    return;
196                }
197                match m.tier() {
198                    3 if m.payload_handle().is_some()
199                        && !s.source_completed
200                        && s.latest.is_some() =>
201                    {
202                        let h = s.latest.unwrap();
203                        bb2.retain_handle(h);
204                        drop(s);
205                        core_n.emit_or_defer(pid, h);
206                        // Re-acquire lock and continue processing remaining
207                        // batch messages (e.g. a trailing Complete).
208                        s = st2.lock();
209                    }
210                    5 => {
211                        if let Some(h) = m.payload_handle() {
212                            s.terminated = true;
213                            if let Some(old) = s.latest.take() {
214                                bb2.release_handle(old);
215                            }
216                            bb2.retain_handle(h);
217                            drop(s);
218                            core_n.error_or_defer(pid, h);
219                            return;
220                        }
221                        // Notifier COMPLETE → self-complete.
222                        s.terminated = true;
223                        if let Some(old) = s.latest.take() {
224                            bb2.release_handle(old);
225                        }
226                        drop(s);
227                        core_n.complete_or_defer(pid);
228                        return;
229                    }
230                    _ => {}
231                }
232            }
233        });
234
235        let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
236        if matches!(not_outcome, SubscribeOutcome::Dead { .. }) {
237            let mut s = state.lock();
238            if !s.terminated {
239                s.terminated = true;
240                if let Some(old) = s.latest.take() {
241                    binding_s.release_handle(old);
242                }
243                drop(s);
244                core_s.complete_or_defer(pid);
245            }
246        }
247    });
248
249    let fn_id = binding.register_producer_build(build);
250    core.register_producer(fn_id)
251        .expect("sample: register_producer failed")
252}
253
254#[derive(Default)]
255struct SampleState {
256    latest: Option<HandleId>,
257    source_completed: bool,
258    terminated: bool,
259}
260
261// =========================================================================
262// debounce(source, ms)
263// =========================================================================
264
265/// Emits after `delay` of quiet — each new upstream DATA resets the timer.
266///
267/// On upstream COMPLETE, flushes pending (if any) then completes.
268/// On upstream ERROR, cancels and propagates immediately.
269#[must_use]
270pub fn debounce(
271    core: &Core,
272    binding: &Arc<dyn ProducerBinding>,
273    source: NodeId,
274    ms: u64,
275) -> NodeId {
276    let delay = Duration::from_millis(ms);
277
278    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
279        let binding_s = ctx.core().binding();
280        let em = ctx.emitter();
281        let pid = ctx.node_id();
282        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
283
284        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
285        let tx_sink = tx.clone();
286        let tx_dead = tx.clone();
287        let task = tokio::spawn(debounce_task(rx, em.emitter(), pid, bb.clone(), delay));
288
289        // Store guard: drops tx (clean shutdown) then aborts task (fallback).
290        {
291            let st = ctx.storage();
292            let mut storage = st.lock();
293            let entry = storage.entry(pid).or_default();
294            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
295        }
296
297        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
298        let source_sink: Sink = Arc::new(move |msgs| {
299            for m in msgs {
300                match m.tier() {
301                    3 => {
302                        if let Some(h) = m.payload_handle() {
303                            bb_sink.retain_handle(h);
304                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
305                                bb_sink.release_handle(h);
306                            }
307                        }
308                    }
309                    5 => {
310                        if let Some(h) = m.payload_handle() {
311                            bb_sink.retain_handle(h);
312                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
313                                bb_sink.release_handle(h);
314                            }
315                        } else {
316                            let _ = tx_sink.send(TemporalCmd::Complete);
317                        }
318                    }
319                    _ => {}
320                }
321            }
322        });
323
324        let outcome = ctx.subscribe_to(source, source_sink);
325        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
326            let _ = tx_dead.send(TemporalCmd::Complete);
327        }
328    });
329
330    let fn_id = binding.register_producer_build(build);
331    core.register_producer(fn_id)
332        .expect("debounce: register_producer failed")
333}
334
335// S2b/D230/D232-AMEND: takes a `ProducerEmitter` (was `WeakCore`).
336// `em.{emit,complete,error}_or_defer` post to the `Core`-owned mailbox
337// and internally release the payload handle if the `Core` is gone (F2,
338// QA-hardened) — so every old `if let Some(c)=core.upgrade(){ c.X }
339// else { binding.release_handle(h) }` collapses to a direct `em.X`.
340// `em.is_core_gone()` preserves the old teardown-promptness where the
341// `else` branch also `return`ed (not leak-load-bearing — the task also
342// exits on channel-close — only prompt shutdown).
343async fn debounce_task(
344    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
345    em: MailboxEmitter,
346    pid: NodeId,
347    binding: Arc<dyn BindingBoundary>,
348    delay: Duration,
349) {
350    let mut pending: Option<HandleId> = None;
351
352    loop {
353        if let Some(h) = pending {
354            tokio::select! {
355                biased;
356                cmd = rx.recv() => {
357                    match cmd {
358                        Some(TemporalCmd::Value(new_h)) => {
359                            binding.release_handle(h);
360                            pending = Some(new_h);
361                        }
362                        Some(TemporalCmd::Complete) => {
363                            // Flush pending then complete (em releases
364                            // `h` itself if the Core is gone).
365                            em.emit_or_defer(pid, h);
366                            em.complete_or_defer(pid);
367                            return;
368                        }
369                        Some(TemporalCmd::Error(err_h)) => {
370                            binding.release_handle(h);
371                            em.error_or_defer(pid, err_h);
372                            return;
373                        }
374                        None => {
375                            // Channel closed — deactivated.
376                            binding.release_handle(h);
377                            return;
378                        }
379                    }
380                }
381                () = tokio::time::sleep(delay) => {
382                    // Timer fired — emit pending.
383                    em.emit_or_defer(pid, h);
384                    if em.is_core_gone() {
385                        return;
386                    }
387                    pending = None;
388                }
389            }
390        } else {
391            // No pending — wait for command.
392            match rx.recv().await {
393                Some(TemporalCmd::Value(h)) => {
394                    pending = Some(h);
395                }
396                Some(TemporalCmd::Complete) => {
397                    em.complete_or_defer(pid);
398                    return;
399                }
400                Some(TemporalCmd::Error(err_h)) => {
401                    em.error_or_defer(pid, err_h);
402                    return;
403                }
404                None => return,
405            }
406        }
407    }
408}
409
410// =========================================================================
411// audit(source, ms)
412// =========================================================================
413
414/// On the first upstream DATA in each window, starts a `ms` timer. When
415/// the timer fires, emits the **latest** value received during the window.
416/// Subsequent DATA values within the window update the stored value but
417/// do NOT restart the timer.
418///
419/// Differs from [`debounce`]: debounce resets on each DATA (quiet-time);
420/// audit fires at a fixed interval after the first DATA in the window.
421#[must_use]
422pub fn audit(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
423    let delay = Duration::from_millis(ms);
424
425    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
426        let binding_s = ctx.core().binding();
427        let em = ctx.emitter();
428        let pid = ctx.node_id();
429        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
430
431        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
432        let tx_sink = tx.clone();
433        let tx_dead = tx.clone();
434        let task = tokio::spawn(audit_task(rx, em.emitter(), pid, bb.clone(), delay));
435
436        {
437            let st = ctx.storage();
438            let mut storage = st.lock();
439            let entry = storage.entry(pid).or_default();
440            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
441        }
442
443        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
444        let source_sink: Sink = Arc::new(move |msgs| {
445            for m in msgs {
446                match m.tier() {
447                    3 => {
448                        if let Some(h) = m.payload_handle() {
449                            bb_sink.retain_handle(h);
450                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
451                                bb_sink.release_handle(h);
452                            }
453                        }
454                    }
455                    5 => {
456                        if let Some(h) = m.payload_handle() {
457                            bb_sink.retain_handle(h);
458                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
459                                bb_sink.release_handle(h);
460                            }
461                        } else {
462                            let _ = tx_sink.send(TemporalCmd::Complete);
463                        }
464                    }
465                    _ => {}
466                }
467            }
468        });
469
470        let outcome = ctx.subscribe_to(source, source_sink);
471        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
472            let _ = tx_dead.send(TemporalCmd::Complete);
473        }
474    });
475
476    let fn_id = binding.register_producer_build(build);
477    core.register_producer(fn_id)
478        .expect("audit: register_producer failed")
479}
480
481// S2b/D230/D232-AMEND: `core: WeakCore` → `em: ProducerEmitter` (same
482// collapse rationale as `debounce_task`).
483async fn audit_task(
484    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
485    em: MailboxEmitter,
486    pid: NodeId,
487    binding: Arc<dyn BindingBoundary>,
488    delay: Duration,
489) {
490    // Outer loop: wait for first DATA to start a window.
491    loop {
492        match rx.recv().await {
493            Some(TemporalCmd::Value(h)) => {
494                // First value in window — start timer, collect updates.
495                let mut latest = h;
496
497                tokio::select! {
498                    biased;
499                    // Drain commands until timer fires.
500                    () = async {
501                        loop {
502                            tokio::select! {
503                                biased;
504                                cmd = rx.recv() => {
505                                    match cmd {
506                                        Some(TemporalCmd::Value(new_h)) => {
507                                            binding.release_handle(latest);
508                                            latest = new_h;
509                                        }
510                                        Some(TemporalCmd::Complete) => {
511                                            // Emit latest, then complete.
512                                            em.emit_or_defer(pid, latest);
513                                            em.complete_or_defer(pid);
514                                            return; // exits the async block
515                                        }
516                                        Some(TemporalCmd::Error(err_h)) => {
517                                            binding.release_handle(latest);
518                                            em.error_or_defer(pid, err_h);
519                                            return;
520                                        }
521                                        None => {
522                                            binding.release_handle(latest);
523                                            return;
524                                        }
525                                    }
526                                }
527                            }
528                        }
529                    } => {
530                        return; // Terminal command handled inside async block.
531                    }
532                    () = tokio::time::sleep(delay) => {
533                        // Timer fired — emit latest, window closes.
534                        em.emit_or_defer(pid, latest);
535                        if em.is_core_gone() {
536                            return;
537                        }
538                        // Continue outer loop — wait for next window.
539                    }
540                }
541            }
542            Some(TemporalCmd::Complete) => {
543                em.complete_or_defer(pid);
544                return;
545            }
546            Some(TemporalCmd::Error(err_h)) => {
547                em.error_or_defer(pid, err_h);
548                return;
549            }
550            None => return,
551        }
552    }
553}
554
555// =========================================================================
556// delay(source, ms)
557// =========================================================================
558
559/// Delays each upstream DATA by `ms` milliseconds. Multiple values can
560/// be in-flight simultaneously, each with its own timer.
561///
562/// - COMPLETE waits for all pending delays, then completes.
563/// - ERROR cancels all pending and propagates immediately.
564#[must_use]
565pub fn delay(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
566    let delay_dur = Duration::from_millis(ms);
567
568    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
569        let binding_s = ctx.core().binding();
570        let em = ctx.emitter();
571        let pid = ctx.node_id();
572        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
573
574        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
575        let tx_sink = tx.clone();
576        let tx_dead = tx.clone();
577        let task = tokio::spawn(delay_task(rx, em.emitter(), pid, bb.clone(), delay_dur));
578
579        {
580            let st = ctx.storage();
581            let mut storage = st.lock();
582            let entry = storage.entry(pid).or_default();
583            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
584        }
585
586        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
587        let source_sink: Sink = Arc::new(move |msgs| {
588            for m in msgs {
589                match m.tier() {
590                    3 => {
591                        if let Some(h) = m.payload_handle() {
592                            bb_sink.retain_handle(h);
593                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
594                                bb_sink.release_handle(h);
595                            }
596                        }
597                    }
598                    5 => {
599                        if let Some(h) = m.payload_handle() {
600                            bb_sink.retain_handle(h);
601                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
602                                bb_sink.release_handle(h);
603                            }
604                        } else {
605                            let _ = tx_sink.send(TemporalCmd::Complete);
606                        }
607                    }
608                    _ => {}
609                }
610            }
611        });
612
613        let outcome = ctx.subscribe_to(source, source_sink);
614        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
615            let _ = tx_dead.send(TemporalCmd::Complete);
616        }
617    });
618
619    let fn_id = binding.register_producer_build(build);
620    core.register_producer(fn_id)
621        .expect("delay: register_producer failed")
622}
623
624// S2b/D230/D232-AMEND: `core: WeakCore` → `em: ProducerEmitter`.
625async fn delay_task(
626    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
627    em: MailboxEmitter,
628    pid: NodeId,
629    binding: Arc<dyn BindingBoundary>,
630    delay: Duration,
631) {
632    let mut queue: VecDeque<(tokio::time::Instant, HandleId)> = VecDeque::new();
633    let mut complete_pending = false;
634
635    loop {
636        let next_fire = queue.front().map(|(deadline, _)| *deadline);
637
638        tokio::select! {
639            biased;
640            cmd = rx.recv() => {
641                match cmd {
642                    Some(TemporalCmd::Value(h)) => {
643                        queue.push_back((tokio::time::Instant::now() + delay, h));
644                    }
645                    Some(TemporalCmd::Complete) => {
646                        complete_pending = true;
647                        if queue.is_empty() {
648                            em.complete_or_defer(pid);
649                            return;
650                        }
651                        // Wait for pending timers to drain.
652                    }
653                    Some(TemporalCmd::Error(err_h)) => {
654                        // Release all pending.
655                        for (_, h) in queue.drain(..) {
656                            binding.release_handle(h);
657                        }
658                        em.error_or_defer(pid, err_h);
659                        return;
660                    }
661                    None => {
662                        for (_, h) in queue.drain(..) {
663                            binding.release_handle(h);
664                        }
665                        return;
666                    }
667                }
668            }
669            () = sleep_until_or_forever(next_fire) => {
670                // Fire all expired.
671                let now = tokio::time::Instant::now();
672                while let Some(&(deadline, _)) = queue.front() {
673                    if deadline <= now {
674                        let (_, h) = queue.pop_front().unwrap();
675                        em.emit_or_defer(pid, h);
676                        if em.is_core_gone() {
677                            for (_, h2) in queue.drain(..) {
678                                binding.release_handle(h2);
679                            }
680                            return;
681                        }
682                    } else {
683                        break;
684                    }
685                }
686                if complete_pending && queue.is_empty() {
687                    em.complete_or_defer(pid);
688                    return;
689                }
690            }
691        }
692    }
693}
694
695// =========================================================================
696// throttle(source, ms, opts)
697// =========================================================================
698
699/// Options for [`throttle`].
700#[derive(Debug, Clone, Copy)]
701pub struct ThrottleOpts {
702    /// Emit immediately at the leading edge of each window (default: true).
703    pub leading: bool,
704    /// Emit the latest value at the trailing edge of each window (default: false).
705    pub trailing: bool,
706}
707
708impl Default for ThrottleOpts {
709    fn default() -> Self {
710        Self {
711            leading: true,
712            trailing: false,
713        }
714    }
715}
716
717/// Rate-limits upstream emissions to at most one per `ms`-millisecond window.
718///
719/// With default options (`leading: true, trailing: false`), the first DATA
720/// in each window is emitted immediately; subsequent DATA within the window
721/// is dropped. With `trailing: true`, the latest DATA is emitted at window end.
722///
723/// On upstream COMPLETE, flushes trailing (if any) then completes.
724#[must_use]
725pub fn throttle(
726    core: &Core,
727    binding: &Arc<dyn ProducerBinding>,
728    source: NodeId,
729    ms: u64,
730    opts: ThrottleOpts,
731) -> NodeId {
732    let window = Duration::from_millis(ms);
733
734    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
735        let binding_s = ctx.core().binding();
736        let em = ctx.emitter();
737        let pid = ctx.node_id();
738        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
739
740        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
741        let tx_sink = tx.clone();
742        let tx_dead = tx.clone();
743        let task = tokio::spawn(throttle_task(
744            rx,
745            em.emitter(),
746            pid,
747            bb.clone(),
748            window,
749            opts,
750        ));
751
752        {
753            let st = ctx.storage();
754            let mut storage = st.lock();
755            let entry = storage.entry(pid).or_default();
756            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
757        }
758
759        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
760        let source_sink: Sink = Arc::new(move |msgs| {
761            for m in msgs {
762                match m.tier() {
763                    3 => {
764                        if let Some(h) = m.payload_handle() {
765                            bb_sink.retain_handle(h);
766                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
767                                bb_sink.release_handle(h);
768                            }
769                        }
770                    }
771                    5 => {
772                        if let Some(h) = m.payload_handle() {
773                            bb_sink.retain_handle(h);
774                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
775                                bb_sink.release_handle(h);
776                            }
777                        } else {
778                            let _ = tx_sink.send(TemporalCmd::Complete);
779                        }
780                    }
781                    _ => {}
782                }
783            }
784        });
785
786        let outcome = ctx.subscribe_to(source, source_sink);
787        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
788            let _ = tx_dead.send(TemporalCmd::Complete);
789        }
790    });
791
792    let fn_id = binding.register_producer_build(build);
793    core.register_producer(fn_id)
794        .expect("throttle: register_producer failed")
795}
796
797async fn throttle_task(
798    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
799    // S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`.
800    em: MailboxEmitter,
801    pid: NodeId,
802    binding: Arc<dyn BindingBoundary>,
803    window: Duration,
804    opts: ThrottleOpts,
805) {
806    let mut trailing_pending: Option<HandleId> = None;
807    let mut window_deadline: Option<tokio::time::Instant> = None;
808
809    loop {
810        let fire_at = if opts.trailing { window_deadline } else { None };
811
812        tokio::select! {
813            biased;
814            cmd = rx.recv() => {
815                match cmd {
816                    Some(TemporalCmd::Value(h)) => {
817                        let in_window = window_deadline
818                            .is_some_and(|d| tokio::time::Instant::now() < d);
819
820                        if !in_window {
821                            // Window open — start new window.
822                            window_deadline = Some(tokio::time::Instant::now() + window);
823                            if opts.leading {
824                                em.emit_or_defer(pid, h);
825                                if em.is_core_gone() {
826                                    release_opt(&mut trailing_pending, &*binding);
827                                    return;
828                                }
829                            } else if opts.trailing {
830                                if let Some(old) = trailing_pending.replace(h) {
831                                    binding.release_handle(old);
832                                }
833                            } else {
834                                binding.release_handle(h);
835                            }
836                        } else if opts.trailing {
837                            // Inside window — store for trailing.
838                            if let Some(old) = trailing_pending.replace(h) {
839                                binding.release_handle(old);
840                            }
841                        } else {
842                            binding.release_handle(h);
843                        }
844                    }
845                    Some(TemporalCmd::Complete) => {
846                        if let Some(h) = trailing_pending.take() {
847                            em.emit_or_defer(pid, h);
848                        }
849                        em.complete_or_defer(pid);
850                        return;
851                    }
852                    Some(TemporalCmd::Error(err_h)) => {
853                        release_opt(&mut trailing_pending, &*binding);
854                        em.error_or_defer(pid, err_h);
855                        return;
856                    }
857                    None => {
858                        release_opt(&mut trailing_pending, &*binding);
859                        return;
860                    }
861                }
862            }
863            () = sleep_until_or_forever(fire_at) => {
864                // Window expired — emit trailing if any, reopen window.
865                window_deadline = None;
866                if let Some(h) = trailing_pending.take() {
867                    em.emit_or_defer(pid, h);
868                    if em.is_core_gone() {
869                        return;
870                    }
871                    // Start new window for the trailing emission.
872                    window_deadline = Some(tokio::time::Instant::now() + window);
873                }
874            }
875        }
876    }
877}
878
879// =========================================================================
880// interval(period_ms) — timer source
881// =========================================================================
882
883/// Source that emits a monotonically increasing counter (`1, 2, 3, …`)
884/// every `period_ms` milliseconds. Counter starts at 1 (not 0) because
885/// `HandleId(0)` is the `NO_HANDLE` sentinel. Resubscribable: counter
886/// resets on deactivation + reactivation.
887///
888/// The emitted values use raw `HandleId::new(counter)`. Bindings should
889/// register these as integer handles.
890#[must_use]
891pub fn interval(core: &Core, binding: &Arc<dyn ProducerBinding>, period_ms: u64) -> NodeId {
892    let period = Duration::from_millis(period_ms);
893
894    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
895        let binding_s = ctx.core().binding();
896        let em = ctx.emitter();
897        let pid = ctx.node_id();
898        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
899        let em_task = em.emitter();
900
901        let task = tokio::spawn(async move {
902            let mut ticker = tokio::time::interval(period);
903            ticker.tick().await; // First tick is immediate — skip it.
904            let mut counter: u64 = 1; // Start at 1: HandleId(0) = NO_HANDLE.
905            loop {
906                ticker.tick().await;
907                // S2b/D230: stop ticking once the Core is gone (was the
908                // `weak.upgrade() == None` break); check BEFORE retaining
909                // so a dead Core never leaks a fresh counter handle.
910                if em_task.is_core_gone() {
911                    break;
912                }
913                let h = HandleId::new(counter);
914                bb.retain_handle(h);
915                em_task.emit_or_defer(pid, h);
916                counter += 1;
917            }
918        });
919
920        {
921            let st = ctx.storage();
922            let mut storage = st.lock();
923            let entry = storage.entry(pid).or_default();
924            entry.op_state = Some(Box::new(AbortOnDrop(task)));
925        }
926    });
927
928    let fn_id = binding.register_producer_build(build);
929    core.register_producer(fn_id)
930        .expect("interval: register_producer failed")
931}
932
933// =========================================================================
934// Helpers
935// =========================================================================
936
937/// Sleep until the given instant, or sleep forever if `None`.
938async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
939    match deadline {
940        Some(d) => tokio::time::sleep_until(d).await,
941        None => std::future::pending::<()>().await,
942    }
943}
944
945/// Release an optional handle and set it to `None`.
946fn release_opt(opt: &mut Option<HandleId>, binding: &dyn BindingBoundary) {
947    if let Some(h) = opt.take() {
948        binding.release_handle(h);
949    }
950}
951
952// =========================================================================
953// timeout(source, ms, error_handle)
954// =========================================================================
955
956/// Errors if no upstream DATA arrives within `ms` milliseconds after
957/// subscribe or after the previous DATA. Each DATA resets the timer.
958///
959/// On timeout: retains `error_handle` and emits ERROR with it.
960/// On upstream COMPLETE: cancels timer, forwards complete.
961/// On upstream ERROR: cancels timer, forwards error.
962#[must_use]
963pub fn timeout(
964    core: &Core,
965    binding: &Arc<dyn ProducerBinding>,
966    source: NodeId,
967    ms: u64,
968    error_handle: HandleId,
969) -> NodeId {
970    let duration = Duration::from_millis(ms);
971
972    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
973        let binding_s = ctx.core().binding();
974        let em = ctx.emitter();
975        let pid = ctx.node_id();
976        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
977
978        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
979        let tx_sink = tx.clone();
980        let tx_dead = tx.clone();
981        let task = tokio::spawn(timeout_task(
982            rx,
983            em.emitter(),
984            pid,
985            bb.clone(),
986            duration,
987            error_handle,
988        ));
989
990        {
991            let st = ctx.storage();
992            let mut storage = st.lock();
993            let entry = storage.entry(pid).or_default();
994            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
995        }
996
997        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
998        let source_sink: Sink = Arc::new(move |msgs| {
999            for m in msgs {
1000                match m.tier() {
1001                    3 => {
1002                        if let Some(h) = m.payload_handle() {
1003                            bb_sink.retain_handle(h);
1004                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
1005                                bb_sink.release_handle(h);
1006                            }
1007                        }
1008                    }
1009                    5 => {
1010                        if let Some(h) = m.payload_handle() {
1011                            bb_sink.retain_handle(h);
1012                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
1013                                bb_sink.release_handle(h);
1014                            }
1015                        } else {
1016                            let _ = tx_sink.send(TemporalCmd::Complete);
1017                        }
1018                    }
1019                    _ => {}
1020                }
1021            }
1022        });
1023
1024        let outcome = ctx.subscribe_to(source, source_sink);
1025        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1026            let _ = tx_dead.send(TemporalCmd::Complete);
1027        }
1028    });
1029
1030    let fn_id = binding.register_producer_build(build);
1031    core.register_producer(fn_id)
1032        .expect("timeout: register_producer failed")
1033}
1034
1035// S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`.
1036async fn timeout_task(
1037    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
1038    em: MailboxEmitter,
1039    pid: NodeId,
1040    binding: Arc<dyn BindingBoundary>,
1041    duration: Duration,
1042    error_handle: HandleId,
1043) {
1044    // The timer starts immediately on subscribe — if no DATA arrives
1045    // within `duration`, we fire the timeout error.
1046    loop {
1047        tokio::select! {
1048            biased;
1049            cmd = rx.recv() => {
1050                match cmd {
1051                    Some(TemporalCmd::Value(h)) => {
1052                        // DATA arrived — forward it and reset the timer
1053                        // (the next loop iteration restarts the sleep).
1054                        em.emit_or_defer(pid, h);
1055                        if em.is_core_gone() {
1056                            return;
1057                        }
1058                        // Continue loop → resets the sleep timer.
1059                    }
1060                    Some(TemporalCmd::Complete) => {
1061                        em.complete_or_defer(pid);
1062                        return;
1063                    }
1064                    Some(TemporalCmd::Error(err_h)) => {
1065                        em.error_or_defer(pid, err_h);
1066                        return;
1067                    }
1068                    None => return,
1069                }
1070            }
1071            () = tokio::time::sleep(duration) => {
1072                // Timeout fired — emit error. Retain BEFORE the post
1073                // (skip if the Core is gone — nothing to error into).
1074                if !em.is_core_gone() {
1075                    binding.retain_handle(error_handle);
1076                    em.error_or_defer(pid, error_handle);
1077                }
1078                return;
1079            }
1080        }
1081    }
1082}
1083
1084// =========================================================================
1085// buffer_time(source, ms, pack_fn_id)
1086// =========================================================================
1087
1088/// Command sent from the `buffer_time` sink to its async task.
1089enum BufferTimeCmd {
1090    /// New DATA handle from upstream. Already retained by the sink.
1091    Value(HandleId),
1092    /// Upstream COMPLETE.
1093    Complete,
1094    /// Upstream ERROR. Handle already retained.
1095    Error(HandleId),
1096}
1097
1098/// Collects upstream DATA handles into a buffer and flushes them as a
1099/// packed tuple every `ms` milliseconds.
1100///
1101/// - On interval tick: packs buffered handles via
1102///   `binding.pack_tuple(pack_fn_id, &buf)`, emits the result, releases
1103///   individual handles, clears buffer.
1104/// - On upstream COMPLETE: flushes remaining buffer, then completes.
1105/// - On upstream ERROR: releases buffered handles, forwards error.
1106#[must_use]
1107pub fn buffer_time(
1108    core: &Core,
1109    binding: &Arc<dyn ProducerBinding>,
1110    source: NodeId,
1111    ms: u64,
1112    pack_fn_id: FnId,
1113) -> NodeId {
1114    let period = Duration::from_millis(ms);
1115
1116    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1117        let binding_s = ctx.core().binding();
1118        let em = ctx.emitter();
1119        let pid = ctx.node_id();
1120        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1121
1122        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1123        let tx_sink = tx.clone();
1124        let tx_dead = tx.clone();
1125        let task = tokio::spawn(buffer_time_task(
1126            rx,
1127            em.emitter(),
1128            pid,
1129            bb.clone(),
1130            period,
1131            pack_fn_id,
1132        ));
1133
1134        {
1135            let st = ctx.storage();
1136            let mut storage = st.lock();
1137            let entry = storage.entry(pid).or_default();
1138            entry.op_state = Some(Box::new(BufferTimeTaskGuard { _tx: tx, task }));
1139        }
1140
1141        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1142        let source_sink: Sink = Arc::new(move |msgs| {
1143            for m in msgs {
1144                match m.tier() {
1145                    3 => {
1146                        if let Some(h) = m.payload_handle() {
1147                            bb_sink.retain_handle(h);
1148                            if tx_sink.send(BufferTimeCmd::Value(h)).is_err() {
1149                                bb_sink.release_handle(h);
1150                            }
1151                        }
1152                    }
1153                    5 => {
1154                        if let Some(h) = m.payload_handle() {
1155                            bb_sink.retain_handle(h);
1156                            if tx_sink.send(BufferTimeCmd::Error(h)).is_err() {
1157                                bb_sink.release_handle(h);
1158                            }
1159                        } else {
1160                            let _ = tx_sink.send(BufferTimeCmd::Complete);
1161                        }
1162                    }
1163                    _ => {}
1164                }
1165            }
1166        });
1167
1168        let outcome = ctx.subscribe_to(source, source_sink);
1169        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1170            let _ = tx_dead.send(BufferTimeCmd::Complete);
1171        }
1172    });
1173
1174    let fn_id = binding.register_producer_build(build);
1175    core.register_producer(fn_id)
1176        .expect("buffer_time: register_producer failed")
1177}
1178
1179/// RAII guard for `buffer_time` task — same pattern as [`TemporalTaskGuard`]
1180/// but with `BufferTimeCmd` channel.
1181struct BufferTimeTaskGuard {
1182    _tx: tokio::sync::mpsc::UnboundedSender<BufferTimeCmd>,
1183    task: tokio::task::JoinHandle<()>,
1184}
1185
1186impl Drop for BufferTimeTaskGuard {
1187    fn drop(&mut self) {
1188        self.task.abort();
1189    }
1190}
1191
1192// S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`. Note: the
1193// emit-path now always `pack_tuple`s even on a gone Core (em releases
1194// the packed handle); the buffered component handles are drained-released
1195// exactly as before — behaviour-equivalent, just one extra teardown-only
1196// pack FFI.
1197async fn buffer_time_task(
1198    mut rx: tokio::sync::mpsc::UnboundedReceiver<BufferTimeCmd>,
1199    em: MailboxEmitter,
1200    pid: NodeId,
1201    binding: Arc<dyn BindingBoundary>,
1202    period: Duration,
1203    pack_fn_id: FnId,
1204) {
1205    let mut buf: Vec<HandleId> = Vec::new();
1206    let mut ticker = tokio::time::interval(period);
1207    ticker.tick().await; // First tick is immediate — skip it.
1208
1209    loop {
1210        tokio::select! {
1211            biased;
1212            cmd = rx.recv() => {
1213                match cmd {
1214                    Some(BufferTimeCmd::Value(h)) => {
1215                        buf.push(h);
1216                    }
1217                    Some(BufferTimeCmd::Complete) => {
1218                        // Flush remaining buffer then complete.
1219                        if !buf.is_empty() {
1220                            let packed = binding.pack_tuple(pack_fn_id, &buf);
1221                            em.emit_or_defer(pid, packed);
1222                            for h in buf.drain(..) {
1223                                binding.release_handle(h);
1224                            }
1225                        }
1226                        em.complete_or_defer(pid);
1227                        return;
1228                    }
1229                    Some(BufferTimeCmd::Error(err_h)) => {
1230                        for h in buf.drain(..) {
1231                            binding.release_handle(h);
1232                        }
1233                        em.error_or_defer(pid, err_h);
1234                        return;
1235                    }
1236                    None => {
1237                        // Channel closed — deactivated. Release buffered.
1238                        for h in buf.drain(..) {
1239                            binding.release_handle(h);
1240                        }
1241                        return;
1242                    }
1243                }
1244            }
1245            _ = ticker.tick() => {
1246                if !buf.is_empty() {
1247                    let packed = binding.pack_tuple(pack_fn_id, &buf);
1248                    em.emit_or_defer(pid, packed);
1249                    for h in buf.drain(..) {
1250                        binding.release_handle(h);
1251                    }
1252                    if em.is_core_gone() {
1253                        return;
1254                    }
1255                }
1256            }
1257        }
1258    }
1259}
1260
1261// =========================================================================
1262// window_time(source, ms)
1263// =========================================================================
1264
1265/// Command sent from the `window_time` sink to its async task.
1266enum WindowTimeCmd {
1267    /// New DATA handle from upstream. Already retained by the sink.
1268    Value(HandleId),
1269    /// Upstream COMPLETE.
1270    Complete,
1271    /// Upstream ERROR. Handle already retained.
1272    Error(HandleId),
1273}
1274
1275/// Rotates sub-node windows every `ms` milliseconds.
1276///
1277/// Creates a new inner state node each interval tick. Upstream DATA is
1278/// forwarded to the current inner window node. On tick, the current
1279/// window is completed, a new inner node is created, and its identity
1280/// is emitted as a DATA handle (via `binding.intern_node`).
1281///
1282/// - On upstream COMPLETE: completes current window, then completes self.
1283/// - On upstream ERROR: errors current window, then errors self.
1284#[must_use]
1285pub fn window_time(
1286    core: &Core,
1287    binding: &Arc<dyn ProducerBinding>,
1288    source: NodeId,
1289    ms: u64,
1290) -> NodeId {
1291    let period = Duration::from_millis(ms);
1292
1293    // S2b/D231: register the reusable no-op inner-window build at FACTORY
1294    // scope (where `binding: &Arc<dyn ProducerBinding>` is in hand) — the
1295    // build closure no longer holds a `ProducerBinding`, only `ctx`.
1296    // `FnId` is `Copy`; capture it into the build/task.
1297    let noop_fn_id = binding.register_producer_build(Box::new(|_ctx: ProducerCtx<'_>| {
1298        // Inner window nodes are passive — all emissions are driven by
1299        // the parent window_time task via the mailbox.
1300    }));
1301
1302    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1303        let core_s = ctx.core();
1304        let binding_s = ctx.core().binding();
1305        let em = ctx.emitter();
1306        let pid = ctx.node_id();
1307        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1308
1309        // Create the first inner window node and emit its handle.
1310        let first_inner = core_s
1311            .register_producer(noop_fn_id)
1312            .expect("window_time inner: register_producer failed");
1313        let first_handle = bb.intern_node(first_inner);
1314        core_s.emit_or_defer(pid, first_handle);
1315
1316        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1317        let tx_sink = tx.clone();
1318        let tx_dead = tx.clone();
1319        let task = tokio::spawn(window_time_task(
1320            rx,
1321            em.emitter(),
1322            pid,
1323            first_inner,
1324            bb.clone(),
1325            period,
1326            noop_fn_id,
1327        ));
1328
1329        {
1330            let st = ctx.storage();
1331            let mut storage = st.lock();
1332            let entry = storage.entry(pid).or_default();
1333            entry.op_state = Some(Box::new(WindowTimeTaskGuard { _tx: tx, task }));
1334        }
1335
1336        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1337        let source_sink: Sink = Arc::new(move |msgs| {
1338            for m in msgs {
1339                match m.tier() {
1340                    3 => {
1341                        if let Some(h) = m.payload_handle() {
1342                            bb_sink.retain_handle(h);
1343                            if tx_sink.send(WindowTimeCmd::Value(h)).is_err() {
1344                                bb_sink.release_handle(h);
1345                            }
1346                        }
1347                    }
1348                    5 => {
1349                        if let Some(h) = m.payload_handle() {
1350                            bb_sink.retain_handle(h);
1351                            if tx_sink.send(WindowTimeCmd::Error(h)).is_err() {
1352                                bb_sink.release_handle(h);
1353                            }
1354                        } else {
1355                            let _ = tx_sink.send(WindowTimeCmd::Complete);
1356                        }
1357                    }
1358                    _ => {}
1359                }
1360            }
1361        });
1362
1363        let outcome = ctx.subscribe_to(source, source_sink);
1364        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1365            let _ = tx_dead.send(WindowTimeCmd::Complete);
1366        }
1367    });
1368
1369    let fn_id = binding.register_producer_build(build);
1370    core.register_producer(fn_id)
1371        .expect("window_time: register_producer failed")
1372}
1373
1374/// RAII guard for `window_time` task.
1375struct WindowTimeTaskGuard {
1376    _tx: tokio::sync::mpsc::UnboundedSender<WindowTimeCmd>,
1377    task: tokio::task::JoinHandle<()>,
1378}
1379
1380impl Drop for WindowTimeTaskGuard {
1381    fn drop(&mut self) {
1382        self.task.abort();
1383    }
1384}
1385
1386// S2b/D230/D234: `WeakCore` → `ProducerEmitter`; the window rotation
1387// does task-side topology mutation (`register_producer`) so it routes
1388// through `em.defer` (`CoreFull`). `current_inner` (the routing
1389// selector) becomes a shared `Arc<Mutex<NodeId>>` read INSIDE the
1390// owner-serialized defer closures, so the FIFO mailbox order
1391// (arrival order) keeps every forward routed to the window live at the
1392// time it arrived — the D234 invariant (same as `window`).
1393async fn window_time_task(
1394    mut rx: tokio::sync::mpsc::UnboundedReceiver<WindowTimeCmd>,
1395    em: MailboxEmitter,
1396    pid: NodeId,
1397    initial_inner: NodeId,
1398    binding: Arc<dyn BindingBoundary>,
1399    period: Duration,
1400    noop_fn_id: FnId,
1401) {
1402    let current_inner = Arc::new(Mutex::new(initial_inner));
1403    let mut ticker = tokio::time::interval(period);
1404    ticker.tick().await; // First tick is immediate — skip it.
1405
1406    loop {
1407        tokio::select! {
1408            biased;
1409            cmd = rx.recv() => {
1410                match cmd {
1411                    Some(WindowTimeCmd::Value(h)) => {
1412                        // Forward DATA to whatever window is current when
1413                        // this defer runs (FIFO-ordered after any rotation
1414                        // posted earlier).
1415                        let cur = current_inner.clone();
1416                        let b = binding.clone();
1417                        if !em.defer(move |c| {
1418                            c.emit(*cur.lock(), h);
1419                        }) {
1420                            b.release_handle(h);
1421                        }
1422                    }
1423                    Some(WindowTimeCmd::Complete) => {
1424                        let cur = current_inner.clone();
1425                        let _ = em.defer(move |c| {
1426                            c.complete(*cur.lock());
1427                            c.complete(pid);
1428                        });
1429                        return;
1430                    }
1431                    Some(WindowTimeCmd::Error(err_h)) => {
1432                        // err_h retained once by the sink; the 2nd retain
1433                        // (for the self-error) is done INSIDE the closure
1434                        // so a Core-gone drop leaves exactly the sink's
1435                        // one retain to release (mirrors the old `else`).
1436                        let cur = current_inner.clone();
1437                        let b = binding.clone();
1438                        let b2 = binding.clone();
1439                        if !em.defer(move |c| {
1440                            b2.retain_handle(err_h);
1441                            c.error(*cur.lock(), err_h);
1442                            c.error(pid, err_h);
1443                        }) {
1444                            b.release_handle(err_h);
1445                        }
1446                        return;
1447                    }
1448                    None => {
1449                        // Channel closed — deactivated.
1450                        return;
1451                    }
1452                }
1453            }
1454            _ = ticker.tick() => {
1455                // Window rotation: complete current, create new, emit
1456                // handle — ALL inside one owner-side closure so the
1457                // `register_producer` + `current_inner` swap is atomic
1458                // w.r.t. the FIFO drain (D234).
1459                let cur = current_inner.clone();
1460                let b = binding.clone();
1461                let _ = em.defer(move |c| {
1462                    let old = *cur.lock();
1463                    c.complete(old);
1464                    match c.register_producer(noop_fn_id) {
1465                        Ok(new_inner) => {
1466                            *cur.lock() = new_inner;
1467                            let h = b.intern_node(new_inner);
1468                            c.emit(pid, h);
1469                        }
1470                        Err(_) => {
1471                            // Registration failed — complete self.
1472                            c.complete(pid);
1473                        }
1474                    }
1475                });
1476                if em.is_core_gone() {
1477                    return;
1478                }
1479            }
1480        }
1481    }
1482}