Skip to main content

graphrefly_operators/
temporal.rs

1//! Temporal operators — time-dependent transforms on reactive streams.
2//!
3//! # Operators
4//!
5//! - [`sample`] — pure reactive; emits source's latest value when notifier
6//!   fires. No timer needed.
7//! - [`debounce`] — emits after `ms` of quiet (no new upstream DATA).
8//! - [`throttle`] — rate-limits: at most one emission per `ms` window.
9//!   Configurable leading / trailing edge.
10//! - [`delay`] — delays each upstream DATA by `ms`. Multiple in-flight.
11//! - [`audit`] — on first DATA, starts a `ms` timer; when it fires, emits
12//!   the latest value. Timer does NOT restart on subsequent DATA within
13//!   the window.
14//! - [`interval`] — source that emits a monotonic counter every `period_ms`.
15//! - [`timeout`] — errors if no DATA arrives within `ms` after subscribe or
16//!   after the previous DATA.
17//! - [`buffer_time`] — collects upstream DATA into a buffer and flushes as
18//!   a packed tuple every `ms` milliseconds.
19//! - [`window_time`] — rotates inner sub-nodes every `ms` milliseconds;
20//!   upstream DATA is forwarded to the current window node.
21//!
22//! # Architecture
23//!
24//! Timer operators (debounce, throttle, delay, audit) spawn a **per-operator
25//! tokio task** that owns all pending state (handles, counters) exclusively.
26//! The sync sink callback sends [`TemporalCmd`] commands; the async task
27//! manages timers via `tokio::time` and calls `Core::emit` /
28//! `complete` / `error` when ready. This avoids the
29//! double-ownership problem that would arise from tracking pending handles
30//! in both the operator's state mutex and the generic timer substrate.
31//!
32//! `sample` is a pure-reactive producer-pattern node with two subscriptions
33//! (source + notifier) and no timer dependency.
34//!
35//! All temporal operators require a **tokio runtime context** at activation
36//! time (first subscriber triggers the build closure, which calls
37//! `tokio::spawn`).
38
39use std::cell::RefCell;
40use std::collections::VecDeque;
41use std::rc::Rc;
42use std::sync::Arc;
43use std::time::Duration;
44
45use parking_lot::Mutex;
46
47use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
48use smallvec::SmallVec;
49
50use crate::producer::{
51    MailboxEmitter, ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome,
52};
53
54// =========================================================================
55// Shared command type
56// =========================================================================
57
58/// Command sent from operator sink callbacks (sync) to the per-operator
59/// async task. Channel close = shutdown (producer deactivated).
60enum TemporalCmd {
61    /// New DATA handle from upstream. Already retained by the sink.
62    Value(HandleId),
63    /// Upstream COMPLETE. Flush pending if applicable, then complete.
64    Complete,
65    /// Upstream ERROR. Cancel all, propagate. Handle already retained.
66    Error(HandleId),
67}
68
69/// RAII wrapper for temporal operator tasks. On drop, closes the command
70/// channel first (triggering the task's cleanup path which releases pending
71/// handles), then aborts the task as a fallback if it hasn't exited.
72struct TemporalTaskGuard {
73    /// Dropping the sender closes the channel → task sees `None` on
74    /// `rx.recv()` and releases all pending handles before returning.
75    _tx: tokio::sync::mpsc::UnboundedSender<TemporalCmd>,
76    task: tokio::task::JoinHandle<()>,
77}
78
79impl Drop for TemporalTaskGuard {
80    fn drop(&mut self) {
81        // _tx drops first (field order), closing the channel.
82        // Abort as fallback in case the task is stuck on a non-channel await.
83        self.task.abort();
84    }
85}
86
87/// Simple abort-on-drop for tasks with no pending handle state (e.g. interval).
88struct AbortOnDrop(tokio::task::JoinHandle<()>);
89
90impl Drop for AbortOnDrop {
91    fn drop(&mut self) {
92        self.0.abort();
93    }
94}
95
96// =========================================================================
97// sample(source, notifier) — pure reactive, no timer
98// =========================================================================
99
100/// Emits the source's latest DATA each time `notifier` delivers DATA.
101///
102/// - Source COMPLETE clears stored value; subsequent notifier DATAs no-op.
103/// - Notifier COMPLETE terminates the sample node.
104/// - Either dep ERROR terminates immediately.
105#[must_use]
106#[allow(clippy::too_many_lines)]
107pub fn sample(
108    core: &Core,
109    binding: &Arc<dyn ProducerBinding>,
110    source: NodeId,
111    notifier: NodeId,
112) -> NodeId {
113    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
114        let core_s = ctx.core();
115        let binding_s = ctx.core().binding();
116        let em = ctx.emitter();
117        let pid = ctx.node_id();
118        let state: Rc<RefCell<SampleState>> = Rc::new(RefCell::new(SampleState::default()));
119
120        // --- source sink ---
121        let st = state.clone();
122        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
123        let core_src = em.clone();
124        let source_sink: Sink = Rc::new(move |msgs| {
125            enum Act {
126                Release(HandleId),
127                Error(HandleId),
128            }
129            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
130            {
131                let mut s = st.borrow_mut();
132                if s.terminated {
133                    return;
134                }
135                for m in msgs {
136                    match m.tier() {
137                        3 => {
138                            if let Some(h) = m.payload_handle() {
139                                if let Some(old) = s.latest.replace(h) {
140                                    actions.push(Act::Release(old));
141                                }
142                                bb.retain_handle(h);
143                            }
144                        }
145                        5 => {
146                            if let Some(h) = m.payload_handle() {
147                                s.terminated = true;
148                                if let Some(old) = s.latest.take() {
149                                    actions.push(Act::Release(old));
150                                }
151                                bb.retain_handle(h);
152                                actions.push(Act::Error(h));
153                            } else {
154                                s.source_completed = true;
155                                if let Some(old) = s.latest.take() {
156                                    actions.push(Act::Release(old));
157                                }
158                            }
159                        }
160                        _ => {}
161                    }
162                }
163            }
164            for a in actions {
165                match a {
166                    Act::Release(h) => bb.release_handle(h),
167                    Act::Error(h) => core_src.error(pid, h),
168                }
169            }
170        });
171
172        let src_outcome = ctx.subscribe_to(source, source_sink);
173        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
174            state.borrow_mut().source_completed = true;
175        }
176
177        // --- notifier sink ---
178        let st2 = state.clone();
179        let core_n = em.clone();
180        let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
181        let notifier_sink: Sink = Rc::new(move |msgs| {
182            let mut s = st2.borrow_mut();
183            if s.terminated {
184                return;
185            }
186            for m in msgs {
187                if s.terminated {
188                    return;
189                }
190                match m.tier() {
191                    3 if m.payload_handle().is_some()
192                        && !s.source_completed
193                        && s.latest.is_some() =>
194                    {
195                        let h = s.latest.unwrap();
196                        bb2.retain_handle(h);
197                        drop(s);
198                        core_n.emit(pid, h);
199                        // Re-acquire lock and continue processing remaining
200                        // batch messages (e.g. a trailing Complete).
201                        s = st2.borrow_mut();
202                    }
203                    5 => {
204                        if let Some(h) = m.payload_handle() {
205                            s.terminated = true;
206                            if let Some(old) = s.latest.take() {
207                                bb2.release_handle(old);
208                            }
209                            bb2.retain_handle(h);
210                            drop(s);
211                            core_n.error(pid, h);
212                            return;
213                        }
214                        // Notifier COMPLETE → self-complete.
215                        s.terminated = true;
216                        if let Some(old) = s.latest.take() {
217                            bb2.release_handle(old);
218                        }
219                        drop(s);
220                        core_n.complete(pid);
221                        return;
222                    }
223                    _ => {}
224                }
225            }
226        });
227
228        let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
229        if matches!(not_outcome, SubscribeOutcome::Dead { .. }) {
230            let mut s = state.borrow_mut();
231            if !s.terminated {
232                s.terminated = true;
233                if let Some(old) = s.latest.take() {
234                    binding_s.release_handle(old);
235                }
236                drop(s);
237                core_s.complete(pid);
238            }
239        }
240    });
241
242    let fn_id = binding.register_producer_build(build);
243    core.register_producer(fn_id)
244        .expect("sample: register_producer failed")
245}
246
247#[derive(Default)]
248struct SampleState {
249    latest: Option<HandleId>,
250    source_completed: bool,
251    terminated: bool,
252}
253
254// =========================================================================
255// debounce(source, ms)
256// =========================================================================
257
258/// Emits after `delay` of quiet — each new upstream DATA resets the timer.
259///
260/// On upstream COMPLETE, flushes pending (if any) then completes.
261/// On upstream ERROR, cancels and propagates immediately.
262#[must_use]
263pub fn debounce(
264    core: &Core,
265    binding: &Arc<dyn ProducerBinding>,
266    source: NodeId,
267    ms: u64,
268) -> NodeId {
269    let delay = Duration::from_millis(ms);
270
271    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
272        let binding_s = ctx.core().binding();
273        let em = ctx.emitter();
274        let pid = ctx.node_id();
275        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
276
277        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
278        let tx_sink = tx.clone();
279        let tx_dead = tx.clone();
280        let task = tokio::spawn(debounce_task(rx, em.emitter(), pid, bb.clone(), delay));
281
282        // Store guard: drops tx (clean shutdown) then aborts task (fallback).
283        {
284            let st = ctx.storage();
285            let mut storage = st.lock();
286            let entry = storage.entry(pid).or_default();
287            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
288        }
289
290        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
291        let source_sink: Sink = Rc::new(move |msgs| {
292            for m in msgs {
293                match m.tier() {
294                    3 => {
295                        if let Some(h) = m.payload_handle() {
296                            bb_sink.retain_handle(h);
297                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
298                                bb_sink.release_handle(h);
299                            }
300                        }
301                    }
302                    5 => {
303                        if let Some(h) = m.payload_handle() {
304                            bb_sink.retain_handle(h);
305                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
306                                bb_sink.release_handle(h);
307                            }
308                        } else {
309                            let _ = tx_sink.send(TemporalCmd::Complete);
310                        }
311                    }
312                    _ => {}
313                }
314            }
315        });
316
317        let outcome = ctx.subscribe_to(source, source_sink);
318        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
319            let _ = tx_dead.send(TemporalCmd::Complete);
320        }
321    });
322
323    let fn_id = binding.register_producer_build(build);
324    core.register_producer(fn_id)
325        .expect("debounce: register_producer failed")
326}
327
328// S2b/D230/D232-AMEND: takes a `ProducerEmitter` (was `WeakCore`).
329// `em.{emit,complete,error}` post to the `Core`-owned mailbox and
330// internally release the payload handle if the `Core` is gone (F2,
331// QA-hardened) — so every old `if let Some(c)=core.upgrade(){ c.X }
332// else { binding.release_handle(h) }` collapses to a direct `em.X`.
333// (D274 dropped the `_or_defer` suffix from these method names.)
334// `em.is_core_gone()` preserves the old teardown-promptness where the
335// `else` branch also `return`ed (not leak-load-bearing — the task also
336// exits on channel-close — only prompt shutdown).
337async fn debounce_task(
338    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
339    em: MailboxEmitter,
340    pid: NodeId,
341    binding: Arc<dyn BindingBoundary>,
342    delay: Duration,
343) {
344    let mut pending: Option<HandleId> = None;
345
346    loop {
347        if let Some(h) = pending {
348            tokio::select! {
349                biased;
350                cmd = rx.recv() => {
351                    match cmd {
352                        Some(TemporalCmd::Value(new_h)) => {
353                            binding.release_handle(h);
354                            pending = Some(new_h);
355                        }
356                        Some(TemporalCmd::Complete) => {
357                            // Flush pending then complete (em releases
358                            // `h` itself if the Core is gone).
359                            em.emit(pid, h);
360                            em.complete(pid);
361                            return;
362                        }
363                        Some(TemporalCmd::Error(err_h)) => {
364                            binding.release_handle(h);
365                            em.error(pid, err_h);
366                            return;
367                        }
368                        None => {
369                            // Channel closed — deactivated.
370                            binding.release_handle(h);
371                            return;
372                        }
373                    }
374                }
375                () = tokio::time::sleep(delay) => {
376                    // Timer fired — emit pending.
377                    em.emit(pid, h);
378                    if em.is_core_gone() {
379                        return;
380                    }
381                    pending = None;
382                }
383            }
384        } else {
385            // No pending — wait for command.
386            match rx.recv().await {
387                Some(TemporalCmd::Value(h)) => {
388                    pending = Some(h);
389                }
390                Some(TemporalCmd::Complete) => {
391                    em.complete(pid);
392                    return;
393                }
394                Some(TemporalCmd::Error(err_h)) => {
395                    em.error(pid, err_h);
396                    return;
397                }
398                None => return,
399            }
400        }
401    }
402}
403
404// =========================================================================
405// audit(source, ms)
406// =========================================================================
407
408/// On the first upstream DATA in each window, starts a `ms` timer. When
409/// the timer fires, emits the **latest** value received during the window.
410/// Subsequent DATA values within the window update the stored value but
411/// do NOT restart the timer.
412///
413/// Differs from [`debounce`]: debounce resets on each DATA (quiet-time);
414/// audit fires at a fixed interval after the first DATA in the window.
415#[must_use]
416pub fn audit(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
417    let delay = Duration::from_millis(ms);
418
419    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
420        let binding_s = ctx.core().binding();
421        let em = ctx.emitter();
422        let pid = ctx.node_id();
423        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
424
425        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
426        let tx_sink = tx.clone();
427        let tx_dead = tx.clone();
428        let task = tokio::spawn(audit_task(rx, em.emitter(), pid, bb.clone(), delay));
429
430        {
431            let st = ctx.storage();
432            let mut storage = st.lock();
433            let entry = storage.entry(pid).or_default();
434            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
435        }
436
437        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
438        let source_sink: Sink = Rc::new(move |msgs| {
439            for m in msgs {
440                match m.tier() {
441                    3 => {
442                        if let Some(h) = m.payload_handle() {
443                            bb_sink.retain_handle(h);
444                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
445                                bb_sink.release_handle(h);
446                            }
447                        }
448                    }
449                    5 => {
450                        if let Some(h) = m.payload_handle() {
451                            bb_sink.retain_handle(h);
452                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
453                                bb_sink.release_handle(h);
454                            }
455                        } else {
456                            let _ = tx_sink.send(TemporalCmd::Complete);
457                        }
458                    }
459                    _ => {}
460                }
461            }
462        });
463
464        let outcome = ctx.subscribe_to(source, source_sink);
465        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
466            let _ = tx_dead.send(TemporalCmd::Complete);
467        }
468    });
469
470    let fn_id = binding.register_producer_build(build);
471    core.register_producer(fn_id)
472        .expect("audit: register_producer failed")
473}
474
475// S2b/D230/D232-AMEND: `core: WeakCore` → `em: ProducerEmitter` (same
476// collapse rationale as `debounce_task`).
477async fn audit_task(
478    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
479    em: MailboxEmitter,
480    pid: NodeId,
481    binding: Arc<dyn BindingBoundary>,
482    delay: Duration,
483) {
484    // Outer loop: wait for first DATA to start a window.
485    loop {
486        match rx.recv().await {
487            Some(TemporalCmd::Value(h)) => {
488                // First value in window — start timer, collect updates.
489                let mut latest = h;
490
491                tokio::select! {
492                    biased;
493                    // Drain commands until timer fires.
494                    () = async {
495                        loop {
496                            tokio::select! {
497                                biased;
498                                cmd = rx.recv() => {
499                                    match cmd {
500                                        Some(TemporalCmd::Value(new_h)) => {
501                                            binding.release_handle(latest);
502                                            latest = new_h;
503                                        }
504                                        Some(TemporalCmd::Complete) => {
505                                            // Emit latest, then complete.
506                                            em.emit(pid, latest);
507                                            em.complete(pid);
508                                            return; // exits the async block
509                                        }
510                                        Some(TemporalCmd::Error(err_h)) => {
511                                            binding.release_handle(latest);
512                                            em.error(pid, err_h);
513                                            return;
514                                        }
515                                        None => {
516                                            binding.release_handle(latest);
517                                            return;
518                                        }
519                                    }
520                                }
521                            }
522                        }
523                    } => {
524                        return; // Terminal command handled inside async block.
525                    }
526                    () = tokio::time::sleep(delay) => {
527                        // Timer fired — emit latest, window closes.
528                        em.emit(pid, latest);
529                        if em.is_core_gone() {
530                            return;
531                        }
532                        // Continue outer loop — wait for next window.
533                    }
534                }
535            }
536            Some(TemporalCmd::Complete) => {
537                em.complete(pid);
538                return;
539            }
540            Some(TemporalCmd::Error(err_h)) => {
541                em.error(pid, err_h);
542                return;
543            }
544            None => return,
545        }
546    }
547}
548
549// =========================================================================
550// delay(source, ms)
551// =========================================================================
552
553/// Delays each upstream DATA by `ms` milliseconds. Multiple values can
554/// be in-flight simultaneously, each with its own timer.
555///
556/// - COMPLETE waits for all pending delays, then completes.
557/// - ERROR cancels all pending and propagates immediately.
558#[must_use]
559pub fn delay(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
560    let delay_dur = Duration::from_millis(ms);
561
562    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
563        let binding_s = ctx.core().binding();
564        let em = ctx.emitter();
565        let pid = ctx.node_id();
566        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
567
568        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
569        let tx_sink = tx.clone();
570        let tx_dead = tx.clone();
571        let task = tokio::spawn(delay_task(rx, em.emitter(), pid, bb.clone(), delay_dur));
572
573        {
574            let st = ctx.storage();
575            let mut storage = st.lock();
576            let entry = storage.entry(pid).or_default();
577            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
578        }
579
580        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
581        let source_sink: Sink = Rc::new(move |msgs| {
582            for m in msgs {
583                match m.tier() {
584                    3 => {
585                        if let Some(h) = m.payload_handle() {
586                            bb_sink.retain_handle(h);
587                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
588                                bb_sink.release_handle(h);
589                            }
590                        }
591                    }
592                    5 => {
593                        if let Some(h) = m.payload_handle() {
594                            bb_sink.retain_handle(h);
595                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
596                                bb_sink.release_handle(h);
597                            }
598                        } else {
599                            let _ = tx_sink.send(TemporalCmd::Complete);
600                        }
601                    }
602                    _ => {}
603                }
604            }
605        });
606
607        let outcome = ctx.subscribe_to(source, source_sink);
608        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
609            let _ = tx_dead.send(TemporalCmd::Complete);
610        }
611    });
612
613    let fn_id = binding.register_producer_build(build);
614    core.register_producer(fn_id)
615        .expect("delay: register_producer failed")
616}
617
618// S2b/D230/D232-AMEND: `core: WeakCore` → `em: ProducerEmitter`.
619async fn delay_task(
620    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
621    em: MailboxEmitter,
622    pid: NodeId,
623    binding: Arc<dyn BindingBoundary>,
624    delay: Duration,
625) {
626    let mut queue: VecDeque<(tokio::time::Instant, HandleId)> = VecDeque::new();
627    let mut complete_pending = false;
628
629    loop {
630        let next_fire = queue.front().map(|(deadline, _)| *deadline);
631
632        tokio::select! {
633            biased;
634            cmd = rx.recv() => {
635                match cmd {
636                    Some(TemporalCmd::Value(h)) => {
637                        queue.push_back((tokio::time::Instant::now() + delay, h));
638                    }
639                    Some(TemporalCmd::Complete) => {
640                        complete_pending = true;
641                        if queue.is_empty() {
642                            em.complete(pid);
643                            return;
644                        }
645                        // Wait for pending timers to drain.
646                    }
647                    Some(TemporalCmd::Error(err_h)) => {
648                        // Release all pending.
649                        for (_, h) in queue.drain(..) {
650                            binding.release_handle(h);
651                        }
652                        em.error(pid, err_h);
653                        return;
654                    }
655                    None => {
656                        for (_, h) in queue.drain(..) {
657                            binding.release_handle(h);
658                        }
659                        return;
660                    }
661                }
662            }
663            () = sleep_until_or_forever(next_fire) => {
664                // Fire all expired.
665                let now = tokio::time::Instant::now();
666                while let Some(&(deadline, _)) = queue.front() {
667                    if deadline <= now {
668                        let (_, h) = queue.pop_front().unwrap();
669                        em.emit(pid, h);
670                        if em.is_core_gone() {
671                            for (_, h2) in queue.drain(..) {
672                                binding.release_handle(h2);
673                            }
674                            return;
675                        }
676                    } else {
677                        break;
678                    }
679                }
680                if complete_pending && queue.is_empty() {
681                    em.complete(pid);
682                    return;
683                }
684            }
685        }
686    }
687}
688
689// =========================================================================
690// throttle(source, ms, opts)
691// =========================================================================
692
693/// Options for [`throttle`].
694#[derive(Debug, Clone, Copy)]
695pub struct ThrottleOpts {
696    /// Emit immediately at the leading edge of each window (default: true).
697    pub leading: bool,
698    /// Emit the latest value at the trailing edge of each window (default: false).
699    pub trailing: bool,
700}
701
702impl Default for ThrottleOpts {
703    fn default() -> Self {
704        Self {
705            leading: true,
706            trailing: false,
707        }
708    }
709}
710
711/// Rate-limits upstream emissions to at most one per `ms`-millisecond window.
712///
713/// With default options (`leading: true, trailing: false`), the first DATA
714/// in each window is emitted immediately; subsequent DATA within the window
715/// is dropped. With `trailing: true`, the latest DATA is emitted at window end.
716///
717/// On upstream COMPLETE, flushes trailing (if any) then completes.
718#[must_use]
719pub fn throttle(
720    core: &Core,
721    binding: &Arc<dyn ProducerBinding>,
722    source: NodeId,
723    ms: u64,
724    opts: ThrottleOpts,
725) -> NodeId {
726    let window = Duration::from_millis(ms);
727
728    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
729        let binding_s = ctx.core().binding();
730        let em = ctx.emitter();
731        let pid = ctx.node_id();
732        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
733
734        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
735        let tx_sink = tx.clone();
736        let tx_dead = tx.clone();
737        let task = tokio::spawn(throttle_task(
738            rx,
739            em.emitter(),
740            pid,
741            bb.clone(),
742            window,
743            opts,
744        ));
745
746        {
747            let st = ctx.storage();
748            let mut storage = st.lock();
749            let entry = storage.entry(pid).or_default();
750            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
751        }
752
753        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
754        let source_sink: Sink = Rc::new(move |msgs| {
755            for m in msgs {
756                match m.tier() {
757                    3 => {
758                        if let Some(h) = m.payload_handle() {
759                            bb_sink.retain_handle(h);
760                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
761                                bb_sink.release_handle(h);
762                            }
763                        }
764                    }
765                    5 => {
766                        if let Some(h) = m.payload_handle() {
767                            bb_sink.retain_handle(h);
768                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
769                                bb_sink.release_handle(h);
770                            }
771                        } else {
772                            let _ = tx_sink.send(TemporalCmd::Complete);
773                        }
774                    }
775                    _ => {}
776                }
777            }
778        });
779
780        let outcome = ctx.subscribe_to(source, source_sink);
781        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
782            let _ = tx_dead.send(TemporalCmd::Complete);
783        }
784    });
785
786    let fn_id = binding.register_producer_build(build);
787    core.register_producer(fn_id)
788        .expect("throttle: register_producer failed")
789}
790
791async fn throttle_task(
792    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
793    // S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`.
794    em: MailboxEmitter,
795    pid: NodeId,
796    binding: Arc<dyn BindingBoundary>,
797    window: Duration,
798    opts: ThrottleOpts,
799) {
800    let mut trailing_pending: Option<HandleId> = None;
801    let mut window_deadline: Option<tokio::time::Instant> = None;
802
803    loop {
804        let fire_at = if opts.trailing { window_deadline } else { None };
805
806        tokio::select! {
807            biased;
808            cmd = rx.recv() => {
809                match cmd {
810                    Some(TemporalCmd::Value(h)) => {
811                        let in_window = window_deadline
812                            .is_some_and(|d| tokio::time::Instant::now() < d);
813
814                        if !in_window {
815                            // Window open — start new window.
816                            window_deadline = Some(tokio::time::Instant::now() + window);
817                            if opts.leading {
818                                em.emit(pid, h);
819                                if em.is_core_gone() {
820                                    release_opt(&mut trailing_pending, &*binding);
821                                    return;
822                                }
823                            } else if opts.trailing {
824                                if let Some(old) = trailing_pending.replace(h) {
825                                    binding.release_handle(old);
826                                }
827                            } else {
828                                binding.release_handle(h);
829                            }
830                        } else if opts.trailing {
831                            // Inside window — store for trailing.
832                            if let Some(old) = trailing_pending.replace(h) {
833                                binding.release_handle(old);
834                            }
835                        } else {
836                            binding.release_handle(h);
837                        }
838                    }
839                    Some(TemporalCmd::Complete) => {
840                        if let Some(h) = trailing_pending.take() {
841                            em.emit(pid, h);
842                        }
843                        em.complete(pid);
844                        return;
845                    }
846                    Some(TemporalCmd::Error(err_h)) => {
847                        release_opt(&mut trailing_pending, &*binding);
848                        em.error(pid, err_h);
849                        return;
850                    }
851                    None => {
852                        release_opt(&mut trailing_pending, &*binding);
853                        return;
854                    }
855                }
856            }
857            () = sleep_until_or_forever(fire_at) => {
858                // Window expired — emit trailing if any, reopen window.
859                window_deadline = None;
860                if let Some(h) = trailing_pending.take() {
861                    em.emit(pid, h);
862                    if em.is_core_gone() {
863                        return;
864                    }
865                    // Start new window for the trailing emission.
866                    window_deadline = Some(tokio::time::Instant::now() + window);
867                }
868            }
869        }
870    }
871}
872
873// =========================================================================
874// interval(period_ms) — timer source
875// =========================================================================
876
877/// Source that emits a monotonically increasing counter (`1, 2, 3, …`)
878/// every `period_ms` milliseconds. Counter starts at 1 (not 0) because
879/// `HandleId(0)` is the `NO_HANDLE` sentinel. Resubscribable: counter
880/// resets on deactivation + reactivation.
881///
882/// The emitted values use raw `HandleId::new(counter)`. Bindings should
883/// register these as integer handles.
884#[must_use]
885pub fn interval(core: &Core, binding: &Arc<dyn ProducerBinding>, period_ms: u64) -> NodeId {
886    let period = Duration::from_millis(period_ms);
887
888    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
889        let binding_s = ctx.core().binding();
890        let em = ctx.emitter();
891        let pid = ctx.node_id();
892        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
893        let em_task = em.emitter();
894
895        let task = tokio::spawn(async move {
896            let mut ticker = tokio::time::interval(period);
897            ticker.tick().await; // First tick is immediate — skip it.
898            let mut counter: u64 = 1; // Start at 1: HandleId(0) = NO_HANDLE.
899            loop {
900                ticker.tick().await;
901                // S2b/D230: stop ticking once the Core is gone (was the
902                // `weak.upgrade() == None` break); check BEFORE retaining
903                // so a dead Core never leaks a fresh counter handle.
904                if em_task.is_core_gone() {
905                    break;
906                }
907                let h = HandleId::new(counter);
908                bb.retain_handle(h);
909                em_task.emit(pid, h);
910                counter += 1;
911            }
912        });
913
914        {
915            let st = ctx.storage();
916            let mut storage = st.lock();
917            let entry = storage.entry(pid).or_default();
918            entry.op_state = Some(Box::new(AbortOnDrop(task)));
919        }
920    });
921
922    let fn_id = binding.register_producer_build(build);
923    core.register_producer(fn_id)
924        .expect("interval: register_producer failed")
925}
926
927// =========================================================================
928// Helpers
929// =========================================================================
930
931/// Sleep until the given instant, or sleep forever if `None`.
932async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
933    match deadline {
934        Some(d) => tokio::time::sleep_until(d).await,
935        None => std::future::pending::<()>().await,
936    }
937}
938
939/// Release an optional handle and set it to `None`.
940fn release_opt(opt: &mut Option<HandleId>, binding: &dyn BindingBoundary) {
941    if let Some(h) = opt.take() {
942        binding.release_handle(h);
943    }
944}
945
946// =========================================================================
947// timeout(source, ms, error_handle)
948// =========================================================================
949
950/// Errors if no upstream DATA arrives within `ms` milliseconds after
951/// subscribe or after the previous DATA. Each DATA resets the timer.
952///
953/// On timeout: retains `error_handle` and emits ERROR with it.
954/// On upstream COMPLETE: cancels timer, forwards complete.
955/// On upstream ERROR: cancels timer, forwards error.
956#[must_use]
957pub fn timeout(
958    core: &Core,
959    binding: &Arc<dyn ProducerBinding>,
960    source: NodeId,
961    ms: u64,
962    error_handle: HandleId,
963) -> NodeId {
964    let duration = Duration::from_millis(ms);
965
966    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
967        let binding_s = ctx.core().binding();
968        let em = ctx.emitter();
969        let pid = ctx.node_id();
970        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
971
972        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
973        let tx_sink = tx.clone();
974        let tx_dead = tx.clone();
975        let task = tokio::spawn(timeout_task(
976            rx,
977            em.emitter(),
978            pid,
979            bb.clone(),
980            duration,
981            error_handle,
982        ));
983
984        {
985            let st = ctx.storage();
986            let mut storage = st.lock();
987            let entry = storage.entry(pid).or_default();
988            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
989        }
990
991        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
992        let source_sink: Sink = Rc::new(move |msgs| {
993            for m in msgs {
994                match m.tier() {
995                    3 => {
996                        if let Some(h) = m.payload_handle() {
997                            bb_sink.retain_handle(h);
998                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
999                                bb_sink.release_handle(h);
1000                            }
1001                        }
1002                    }
1003                    5 => {
1004                        if let Some(h) = m.payload_handle() {
1005                            bb_sink.retain_handle(h);
1006                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
1007                                bb_sink.release_handle(h);
1008                            }
1009                        } else {
1010                            let _ = tx_sink.send(TemporalCmd::Complete);
1011                        }
1012                    }
1013                    _ => {}
1014                }
1015            }
1016        });
1017
1018        let outcome = ctx.subscribe_to(source, source_sink);
1019        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1020            let _ = tx_dead.send(TemporalCmd::Complete);
1021        }
1022    });
1023
1024    let fn_id = binding.register_producer_build(build);
1025    core.register_producer(fn_id)
1026        .expect("timeout: register_producer failed")
1027}
1028
1029// S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`.
1030async fn timeout_task(
1031    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
1032    em: MailboxEmitter,
1033    pid: NodeId,
1034    binding: Arc<dyn BindingBoundary>,
1035    duration: Duration,
1036    error_handle: HandleId,
1037) {
1038    // The timer starts immediately on subscribe — if no DATA arrives
1039    // within `duration`, we fire the timeout error.
1040    loop {
1041        tokio::select! {
1042            biased;
1043            cmd = rx.recv() => {
1044                match cmd {
1045                    Some(TemporalCmd::Value(h)) => {
1046                        // DATA arrived — forward it and reset the timer
1047                        // (the next loop iteration restarts the sleep).
1048                        em.emit(pid, h);
1049                        if em.is_core_gone() {
1050                            return;
1051                        }
1052                        // Continue loop → resets the sleep timer.
1053                    }
1054                    Some(TemporalCmd::Complete) => {
1055                        em.complete(pid);
1056                        return;
1057                    }
1058                    Some(TemporalCmd::Error(err_h)) => {
1059                        em.error(pid, err_h);
1060                        return;
1061                    }
1062                    None => return,
1063                }
1064            }
1065            () = tokio::time::sleep(duration) => {
1066                // Timeout fired — emit error. Retain BEFORE the post
1067                // (skip if the Core is gone — nothing to error into).
1068                if !em.is_core_gone() {
1069                    binding.retain_handle(error_handle);
1070                    em.error(pid, error_handle);
1071                }
1072                return;
1073            }
1074        }
1075    }
1076}
1077
1078// =========================================================================
1079// buffer_time(source, ms, pack_fn_id)
1080// =========================================================================
1081
1082/// Command sent from the `buffer_time` sink to its async task.
1083enum BufferTimeCmd {
1084    /// New DATA handle from upstream. Already retained by the sink.
1085    Value(HandleId),
1086    /// Upstream COMPLETE.
1087    Complete,
1088    /// Upstream ERROR. Handle already retained.
1089    Error(HandleId),
1090}
1091
1092/// Collects upstream DATA handles into a buffer and flushes them as a
1093/// packed tuple every `ms` milliseconds.
1094///
1095/// - On interval tick: packs buffered handles via
1096///   `binding.pack_tuple(pack_fn_id, &buf)`, emits the result, releases
1097///   individual handles, clears buffer.
1098/// - On upstream COMPLETE: flushes remaining buffer, then completes.
1099/// - On upstream ERROR: releases buffered handles, forwards error.
1100#[must_use]
1101pub fn buffer_time(
1102    core: &Core,
1103    binding: &Arc<dyn ProducerBinding>,
1104    source: NodeId,
1105    ms: u64,
1106    pack_fn_id: FnId,
1107) -> NodeId {
1108    let period = Duration::from_millis(ms);
1109
1110    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1111        let binding_s = ctx.core().binding();
1112        let em = ctx.emitter();
1113        let pid = ctx.node_id();
1114        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1115
1116        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1117        let tx_sink = tx.clone();
1118        let tx_dead = tx.clone();
1119        let task = tokio::spawn(buffer_time_task(
1120            rx,
1121            em.emitter(),
1122            pid,
1123            bb.clone(),
1124            period,
1125            pack_fn_id,
1126        ));
1127
1128        {
1129            let st = ctx.storage();
1130            let mut storage = st.lock();
1131            let entry = storage.entry(pid).or_default();
1132            entry.op_state = Some(Box::new(BufferTimeTaskGuard { _tx: tx, task }));
1133        }
1134
1135        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1136        let source_sink: Sink = Rc::new(move |msgs| {
1137            for m in msgs {
1138                match m.tier() {
1139                    3 => {
1140                        if let Some(h) = m.payload_handle() {
1141                            bb_sink.retain_handle(h);
1142                            if tx_sink.send(BufferTimeCmd::Value(h)).is_err() {
1143                                bb_sink.release_handle(h);
1144                            }
1145                        }
1146                    }
1147                    5 => {
1148                        if let Some(h) = m.payload_handle() {
1149                            bb_sink.retain_handle(h);
1150                            if tx_sink.send(BufferTimeCmd::Error(h)).is_err() {
1151                                bb_sink.release_handle(h);
1152                            }
1153                        } else {
1154                            let _ = tx_sink.send(BufferTimeCmd::Complete);
1155                        }
1156                    }
1157                    _ => {}
1158                }
1159            }
1160        });
1161
1162        let outcome = ctx.subscribe_to(source, source_sink);
1163        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1164            let _ = tx_dead.send(BufferTimeCmd::Complete);
1165        }
1166    });
1167
1168    let fn_id = binding.register_producer_build(build);
1169    core.register_producer(fn_id)
1170        .expect("buffer_time: register_producer failed")
1171}
1172
1173/// RAII guard for `buffer_time` task — same pattern as [`TemporalTaskGuard`]
1174/// but with `BufferTimeCmd` channel.
1175struct BufferTimeTaskGuard {
1176    _tx: tokio::sync::mpsc::UnboundedSender<BufferTimeCmd>,
1177    task: tokio::task::JoinHandle<()>,
1178}
1179
1180impl Drop for BufferTimeTaskGuard {
1181    fn drop(&mut self) {
1182        self.task.abort();
1183    }
1184}
1185
1186// S2b/D230/D232-AMEND: `WeakCore` → `ProducerEmitter`. Note: the
1187// emit-path now always `pack_tuple`s even on a gone Core (em releases
1188// the packed handle); the buffered component handles are drained-released
1189// exactly as before — behaviour-equivalent, just one extra teardown-only
1190// pack FFI.
1191async fn buffer_time_task(
1192    mut rx: tokio::sync::mpsc::UnboundedReceiver<BufferTimeCmd>,
1193    em: MailboxEmitter,
1194    pid: NodeId,
1195    binding: Arc<dyn BindingBoundary>,
1196    period: Duration,
1197    pack_fn_id: FnId,
1198) {
1199    let mut buf: Vec<HandleId> = Vec::new();
1200    let mut ticker = tokio::time::interval(period);
1201    ticker.tick().await; // First tick is immediate — skip it.
1202
1203    loop {
1204        tokio::select! {
1205            biased;
1206            cmd = rx.recv() => {
1207                match cmd {
1208                    Some(BufferTimeCmd::Value(h)) => {
1209                        buf.push(h);
1210                    }
1211                    Some(BufferTimeCmd::Complete) => {
1212                        // Flush remaining buffer then complete.
1213                        if !buf.is_empty() {
1214                            let packed = binding.pack_tuple(pack_fn_id, &buf);
1215                            em.emit(pid, packed);
1216                            for h in buf.drain(..) {
1217                                binding.release_handle(h);
1218                            }
1219                        }
1220                        em.complete(pid);
1221                        return;
1222                    }
1223                    Some(BufferTimeCmd::Error(err_h)) => {
1224                        for h in buf.drain(..) {
1225                            binding.release_handle(h);
1226                        }
1227                        em.error(pid, err_h);
1228                        return;
1229                    }
1230                    None => {
1231                        // Channel closed — deactivated. Release buffered.
1232                        for h in buf.drain(..) {
1233                            binding.release_handle(h);
1234                        }
1235                        return;
1236                    }
1237                }
1238            }
1239            _ = ticker.tick() => {
1240                if !buf.is_empty() {
1241                    let packed = binding.pack_tuple(pack_fn_id, &buf);
1242                    em.emit(pid, packed);
1243                    for h in buf.drain(..) {
1244                        binding.release_handle(h);
1245                    }
1246                    if em.is_core_gone() {
1247                        return;
1248                    }
1249                }
1250            }
1251        }
1252    }
1253}
1254
1255// =========================================================================
1256// window_time(source, ms)
1257// =========================================================================
1258
1259/// Command sent from the `window_time` sink to its async task.
1260enum WindowTimeCmd {
1261    /// New DATA handle from upstream. Already retained by the sink.
1262    Value(HandleId),
1263    /// Upstream COMPLETE.
1264    Complete,
1265    /// Upstream ERROR. Handle already retained.
1266    Error(HandleId),
1267}
1268
1269/// Rotates sub-node windows every `ms` milliseconds.
1270///
1271/// Creates a new inner state node each interval tick. Upstream DATA is
1272/// forwarded to the current inner window node. On tick, the current
1273/// window is completed, a new inner node is created, and its identity
1274/// is emitted as a DATA handle (via `binding.intern_node`).
1275///
1276/// - On upstream COMPLETE: completes current window, then completes self.
1277/// - On upstream ERROR: errors current window, then errors self.
1278#[must_use]
1279pub fn window_time(
1280    core: &Core,
1281    binding: &Arc<dyn ProducerBinding>,
1282    source: NodeId,
1283    ms: u64,
1284) -> NodeId {
1285    let period = Duration::from_millis(ms);
1286
1287    // S2b/D231: register the reusable no-op inner-window build at FACTORY
1288    // scope (where `binding: &Arc<dyn ProducerBinding>` is in hand) — the
1289    // build closure no longer holds a `ProducerBinding`, only `ctx`.
1290    // `FnId` is `Copy`; capture it into the build/task.
1291    let noop_fn_id = binding.register_producer_build(Box::new(|_ctx: ProducerCtx<'_>| {
1292        // Inner window nodes are passive — all emissions are driven by
1293        // the parent window_time task via the mailbox.
1294    }));
1295
1296    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1297        let core_s = ctx.core();
1298        let binding_s = ctx.core().binding();
1299        let em = ctx.emitter();
1300        let pid = ctx.node_id();
1301        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1302
1303        // Create the first inner window node and emit its handle.
1304        let first_inner = core_s
1305            .register_producer(noop_fn_id)
1306            .expect("window_time inner: register_producer failed");
1307        let first_handle = bb.intern_node(first_inner);
1308        core_s.emit(pid, first_handle);
1309
1310        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1311        let tx_sink = tx.clone();
1312        let tx_dead = tx.clone();
1313        let task = tokio::spawn(window_time_task(
1314            rx,
1315            em.emitter(),
1316            pid,
1317            first_inner,
1318            bb.clone(),
1319            period,
1320            noop_fn_id,
1321        ));
1322
1323        {
1324            let st = ctx.storage();
1325            let mut storage = st.lock();
1326            let entry = storage.entry(pid).or_default();
1327            entry.op_state = Some(Box::new(WindowTimeTaskGuard { _tx: tx, task }));
1328        }
1329
1330        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1331        let source_sink: Sink = Rc::new(move |msgs| {
1332            for m in msgs {
1333                match m.tier() {
1334                    3 => {
1335                        if let Some(h) = m.payload_handle() {
1336                            bb_sink.retain_handle(h);
1337                            if tx_sink.send(WindowTimeCmd::Value(h)).is_err() {
1338                                bb_sink.release_handle(h);
1339                            }
1340                        }
1341                    }
1342                    5 => {
1343                        if let Some(h) = m.payload_handle() {
1344                            bb_sink.retain_handle(h);
1345                            if tx_sink.send(WindowTimeCmd::Error(h)).is_err() {
1346                                bb_sink.release_handle(h);
1347                            }
1348                        } else {
1349                            let _ = tx_sink.send(WindowTimeCmd::Complete);
1350                        }
1351                    }
1352                    _ => {}
1353                }
1354            }
1355        });
1356
1357        let outcome = ctx.subscribe_to(source, source_sink);
1358        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1359            let _ = tx_dead.send(WindowTimeCmd::Complete);
1360        }
1361    });
1362
1363    let fn_id = binding.register_producer_build(build);
1364    core.register_producer(fn_id)
1365        .expect("window_time: register_producer failed")
1366}
1367
1368/// RAII guard for `window_time` task.
1369struct WindowTimeTaskGuard {
1370    _tx: tokio::sync::mpsc::UnboundedSender<WindowTimeCmd>,
1371    task: tokio::task::JoinHandle<()>,
1372}
1373
1374impl Drop for WindowTimeTaskGuard {
1375    fn drop(&mut self) {
1376        self.task.abort();
1377    }
1378}
1379
1380// S2b/D230/D234: `WeakCore` → `ProducerEmitter`; the window rotation
1381// does task-side topology mutation (`register_producer`) so it routes
1382// through `em.defer` (`CoreFull`). `current_inner` (the routing
1383// selector) becomes a shared `Rc<RefCell<NodeId>>` read INSIDE the
1384// owner-serialized defer closures, so the FIFO mailbox order
1385// (arrival order) keeps every forward routed to the window live at the
1386// time it arrived — the D234 invariant (same as `window`).
1387async fn window_time_task(
1388    mut rx: tokio::sync::mpsc::UnboundedReceiver<WindowTimeCmd>,
1389    em: MailboxEmitter,
1390    pid: NodeId,
1391    initial_inner: NodeId,
1392    binding: Arc<dyn BindingBoundary>,
1393    period: Duration,
1394    noop_fn_id: FnId,
1395) {
1396    // Cat-1/2 (D273): the tokio-driven `WindowTimeCmd` loop posts deferred
1397    // closures into `em.defer(move |c| ...)` which requires `Send + 'static`.
1398    // The captured `current_inner` MUST be `Send + Sync` ⇒ stays as
1399    // `Arc<Mutex<...>>` (compiler-enforced via the `Send + Sync` bound on
1400    // `defer`'s `F`).
1401    #[allow(clippy::arc_with_non_send_sync)]
1402    let current_inner = Arc::new(Mutex::new(initial_inner));
1403    let mut ticker = tokio::time::interval(period);
1404    ticker.tick().await; // First tick is immediate — skip it.
1405
1406    loop {
1407        tokio::select! {
1408            biased;
1409            cmd = rx.recv() => {
1410                match cmd {
1411                    Some(WindowTimeCmd::Value(h)) => {
1412                        // Forward DATA to whatever window is current when
1413                        // this defer runs (FIFO-ordered after any rotation
1414                        // posted earlier).
1415                        let cur = current_inner.clone();
1416                        let b = binding.clone();
1417                        if !em.defer(move |c| {
1418                            c.emit(*cur.lock(), h);
1419                        }) {
1420                            b.release_handle(h);
1421                        }
1422                    }
1423                    Some(WindowTimeCmd::Complete) => {
1424                        let cur = current_inner.clone();
1425                        let _ = em.defer(move |c| {
1426                            c.complete(*cur.lock());
1427                            c.complete(pid);
1428                        });
1429                        return;
1430                    }
1431                    Some(WindowTimeCmd::Error(err_h)) => {
1432                        // err_h retained once by the sink; the 2nd retain
1433                        // (for the self-error) is done INSIDE the closure
1434                        // so a Core-gone drop leaves exactly the sink's
1435                        // one retain to release (mirrors the old `else`).
1436                        let cur = current_inner.clone();
1437                        let b = binding.clone();
1438                        let b2 = binding.clone();
1439                        if !em.defer(move |c| {
1440                            b2.retain_handle(err_h);
1441                            c.error(*cur.lock(), err_h);
1442                            c.error(pid, err_h);
1443                        }) {
1444                            b.release_handle(err_h);
1445                        }
1446                        return;
1447                    }
1448                    None => {
1449                        // Channel closed — deactivated.
1450                        return;
1451                    }
1452                }
1453            }
1454            _ = ticker.tick() => {
1455                // Window rotation: complete current, create new, emit
1456                // handle — ALL inside one owner-side closure so the
1457                // `register_producer` + `current_inner` swap is atomic
1458                // w.r.t. the FIFO drain (D234).
1459                let cur = current_inner.clone();
1460                let b = binding.clone();
1461                let _ = em.defer(move |c| {
1462                    let old = *cur.lock();
1463                    c.complete(old);
1464                    match c.register_producer(noop_fn_id) {
1465                        Ok(new_inner) => {
1466                            *cur.lock() = new_inner;
1467                            let h = b.intern_node(new_inner);
1468                            c.emit(pid, h);
1469                        }
1470                        Err(_) => {
1471                            // Registration failed — complete self.
1472                            c.complete(pid);
1473                        }
1474                    }
1475                });
1476                if em.is_core_gone() {
1477                    return;
1478                }
1479            }
1480        }
1481    }
1482}