Skip to main content

graphrefly_operators/
temporal.rs

1//! Temporal operators — time-dependent transforms on reactive streams.
2//!
3//! # Operators
4//!
5//! - [`sample`] — pure reactive; emits source's latest value when notifier
6//!   fires. No timer needed.
7//! - [`debounce`] — emits after `ms` of quiet (no new upstream DATA).
8//! - [`throttle`] — rate-limits: at most one emission per `ms` window.
9//!   Configurable leading / trailing edge.
10//! - [`delay`] — delays each upstream DATA by `ms`. Multiple in-flight.
11//! - [`audit`] — on first DATA, starts a `ms` timer; when it fires, emits
12//!   the latest value. Timer does NOT restart on subsequent DATA within
13//!   the window.
14//! - [`interval`] — source that emits a monotonic counter every `period_ms`.
15//! - [`timeout`] — errors if no DATA arrives within `ms` after subscribe or
16//!   after the previous DATA.
17//! - [`buffer_time`] — collects upstream DATA into a buffer and flushes as
18//!   a packed tuple every `ms` milliseconds.
19//! - [`window_time`] — rotates inner sub-nodes every `ms` milliseconds;
20//!   upstream DATA is forwarded to the current window node.
21//!
22//! # Architecture
23//!
24//! Timer operators (debounce, throttle, delay, audit) spawn a **per-operator
25//! tokio task** that owns all pending state (handles, counters) exclusively.
26//! The sync sink callback sends [`TemporalCmd`] commands; the async task
27//! manages timers via `tokio::time` and calls `Core::emit_or_defer` /
28//! `complete_or_defer` / `error_or_defer` when ready. This avoids the
29//! double-ownership problem that would arise from tracking pending handles
30//! in both the operator's state mutex and the generic timer substrate.
31//!
32//! `sample` is a pure-reactive producer-pattern node with two subscriptions
33//! (source + notifier) and no timer dependency.
34//!
35//! All temporal operators require a **tokio runtime context** at activation
36//! time (first subscriber triggers the build closure, which calls
37//! `tokio::spawn`).
38
39use std::collections::VecDeque;
40use std::sync::{Arc, Weak};
41use std::time::Duration;
42
43use parking_lot::Mutex;
44
45use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink, WeakCore};
46use smallvec::SmallVec;
47
48use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
49
50// =========================================================================
51// Shared command type
52// =========================================================================
53
54/// Command sent from operator sink callbacks (sync) to the per-operator
55/// async task. Channel close = shutdown (producer deactivated).
56enum TemporalCmd {
57    /// New DATA handle from upstream. Already retained by the sink.
58    Value(HandleId),
59    /// Upstream COMPLETE. Flush pending if applicable, then complete.
60    Complete,
61    /// Upstream ERROR. Cancel all, propagate. Handle already retained.
62    Error(HandleId),
63}
64
65/// RAII wrapper for temporal operator tasks. On drop, closes the command
66/// channel first (triggering the task's cleanup path which releases pending
67/// handles), then aborts the task as a fallback if it hasn't exited.
68struct TemporalTaskGuard {
69    /// Dropping the sender closes the channel → task sees `None` on
70    /// `rx.recv()` and releases all pending handles before returning.
71    _tx: tokio::sync::mpsc::UnboundedSender<TemporalCmd>,
72    task: tokio::task::JoinHandle<()>,
73}
74
75impl Drop for TemporalTaskGuard {
76    fn drop(&mut self) {
77        // _tx drops first (field order), closing the channel.
78        // Abort as fallback in case the task is stuck on a non-channel await.
79        self.task.abort();
80    }
81}
82
83/// Simple abort-on-drop for tasks with no pending handle state (e.g. interval).
84struct AbortOnDrop(tokio::task::JoinHandle<()>);
85
86impl Drop for AbortOnDrop {
87    fn drop(&mut self) {
88        self.0.abort();
89    }
90}
91
92// =========================================================================
93// sample(source, notifier) — pure reactive, no timer
94// =========================================================================
95
96/// Emits the source's latest DATA each time `notifier` delivers DATA.
97///
98/// - Source COMPLETE clears stored value; subsequent notifier DATAs no-op.
99/// - Notifier COMPLETE terminates the sample node.
100/// - Either dep ERROR terminates immediately.
101#[must_use]
102#[allow(clippy::too_many_lines)]
103pub fn sample(
104    core: &Core,
105    binding: &Arc<dyn ProducerBinding>,
106    source: NodeId,
107    notifier: NodeId,
108) -> NodeId {
109    let core_weak = core.weak_handle();
110    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
111
112    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
113        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
114            return;
115        };
116        let pid = ctx.node_id();
117        let state: Arc<Mutex<SampleState>> = Arc::new(Mutex::new(SampleState::default()));
118
119        // --- source sink ---
120        let st = state.clone();
121        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
122        let core_src = core_s.clone();
123        let source_sink: Sink = Arc::new(move |msgs| {
124            enum Act {
125                Release(HandleId),
126                Error(HandleId),
127            }
128            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
129            {
130                let mut s = st.lock();
131                if s.terminated {
132                    return;
133                }
134                for m in msgs {
135                    match m.tier() {
136                        3 => {
137                            if let Some(h) = m.payload_handle() {
138                                if let Some(old) = s.latest.replace(h) {
139                                    actions.push(Act::Release(old));
140                                }
141                                bb.retain_handle(h);
142                            }
143                        }
144                        5 => {
145                            if let Some(h) = m.payload_handle() {
146                                s.terminated = true;
147                                if let Some(old) = s.latest.take() {
148                                    actions.push(Act::Release(old));
149                                }
150                                bb.retain_handle(h);
151                                actions.push(Act::Error(h));
152                            } else {
153                                s.source_completed = true;
154                                if let Some(old) = s.latest.take() {
155                                    actions.push(Act::Release(old));
156                                }
157                            }
158                        }
159                        _ => {}
160                    }
161                }
162            }
163            for a in actions {
164                match a {
165                    Act::Release(h) => bb.release_handle(h),
166                    Act::Error(h) => core_src.error_or_defer(pid, h),
167                }
168            }
169        });
170
171        let src_outcome = ctx.subscribe_to(source, source_sink);
172        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
173            state.lock().source_completed = true;
174        }
175
176        // --- notifier sink ---
177        let st2 = state.clone();
178        let core_n = core_s.clone();
179        let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
180        let notifier_sink: Sink = Arc::new(move |msgs| {
181            let mut s = st2.lock();
182            if s.terminated {
183                return;
184            }
185            for m in msgs {
186                if s.terminated {
187                    return;
188                }
189                match m.tier() {
190                    3 if m.payload_handle().is_some()
191                        && !s.source_completed
192                        && s.latest.is_some() =>
193                    {
194                        let h = s.latest.unwrap();
195                        bb2.retain_handle(h);
196                        drop(s);
197                        core_n.emit_or_defer(pid, h);
198                        // Re-acquire lock and continue processing remaining
199                        // batch messages (e.g. a trailing Complete).
200                        s = st2.lock();
201                    }
202                    5 => {
203                        if let Some(h) = m.payload_handle() {
204                            s.terminated = true;
205                            if let Some(old) = s.latest.take() {
206                                bb2.release_handle(old);
207                            }
208                            bb2.retain_handle(h);
209                            drop(s);
210                            core_n.error_or_defer(pid, h);
211                            return;
212                        }
213                        // Notifier COMPLETE → self-complete.
214                        s.terminated = true;
215                        if let Some(old) = s.latest.take() {
216                            bb2.release_handle(old);
217                        }
218                        drop(s);
219                        core_n.complete_or_defer(pid);
220                        return;
221                    }
222                    _ => {}
223                }
224            }
225        });
226
227        let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
228        if matches!(not_outcome, SubscribeOutcome::Dead { .. }) {
229            let mut s = state.lock();
230            if !s.terminated {
231                s.terminated = true;
232                if let Some(old) = s.latest.take() {
233                    binding_s.release_handle(old);
234                }
235                drop(s);
236                core_s.complete_or_defer(pid);
237            }
238        }
239    });
240
241    let fn_id = binding.register_producer_build(build);
242    core.register_producer(fn_id)
243        .expect("sample: register_producer failed")
244}
245
246#[derive(Default)]
247struct SampleState {
248    latest: Option<HandleId>,
249    source_completed: bool,
250    terminated: bool,
251}
252
253// =========================================================================
254// debounce(source, ms)
255// =========================================================================
256
257/// Emits after `delay` of quiet — each new upstream DATA resets the timer.
258///
259/// On upstream COMPLETE, flushes pending (if any) then completes.
260/// On upstream ERROR, cancels and propagates immediately.
261#[must_use]
262pub fn debounce(
263    core: &Core,
264    binding: &Arc<dyn ProducerBinding>,
265    source: NodeId,
266    ms: u64,
267) -> NodeId {
268    let core_weak = core.weak_handle();
269    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
270    let delay = Duration::from_millis(ms);
271
272    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
273        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
274            return;
275        };
276        let pid = ctx.node_id();
277        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
278
279        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
280        let tx_sink = tx.clone();
281        let tx_dead = tx.clone();
282        let task = tokio::spawn(debounce_task(
283            rx,
284            core_s.weak_handle(),
285            pid,
286            bb.clone(),
287            delay,
288        ));
289
290        // Store guard: drops tx (clean shutdown) then aborts task (fallback).
291        {
292            let mut storage = binding_s.producer_storage().lock();
293            let entry = storage.entry(pid).or_default();
294            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
295        }
296
297        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
298        let source_sink: Sink = Arc::new(move |msgs| {
299            for m in msgs {
300                match m.tier() {
301                    3 => {
302                        if let Some(h) = m.payload_handle() {
303                            bb_sink.retain_handle(h);
304                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
305                                bb_sink.release_handle(h);
306                            }
307                        }
308                    }
309                    5 => {
310                        if let Some(h) = m.payload_handle() {
311                            bb_sink.retain_handle(h);
312                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
313                                bb_sink.release_handle(h);
314                            }
315                        } else {
316                            let _ = tx_sink.send(TemporalCmd::Complete);
317                        }
318                    }
319                    _ => {}
320                }
321            }
322        });
323
324        let outcome = ctx.subscribe_to(source, source_sink);
325        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
326            let _ = tx_dead.send(TemporalCmd::Complete);
327        }
328    });
329
330    let fn_id = binding.register_producer_build(build);
331    core.register_producer(fn_id)
332        .expect("debounce: register_producer failed")
333}
334
335async fn debounce_task(
336    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
337    core: WeakCore,
338    pid: NodeId,
339    binding: Arc<dyn BindingBoundary>,
340    delay: Duration,
341) {
342    let mut pending: Option<HandleId> = None;
343
344    loop {
345        if let Some(h) = pending {
346            tokio::select! {
347                biased;
348                cmd = rx.recv() => {
349                    match cmd {
350                        Some(TemporalCmd::Value(new_h)) => {
351                            binding.release_handle(h);
352                            pending = Some(new_h);
353                        }
354                        Some(TemporalCmd::Complete) => {
355                            // Flush pending then complete.
356                            if let Some(c) = core.upgrade() {
357                                c.emit_or_defer(pid, h);
358                                c.complete_or_defer(pid);
359                            } else {
360                                binding.release_handle(h);
361                            }
362                            return;
363                        }
364                        Some(TemporalCmd::Error(err_h)) => {
365                            binding.release_handle(h);
366                            if let Some(c) = core.upgrade() {
367                                c.error_or_defer(pid, err_h);
368                            } else {
369                                binding.release_handle(err_h);
370                            }
371                            return;
372                        }
373                        None => {
374                            // Channel closed — deactivated.
375                            binding.release_handle(h);
376                            return;
377                        }
378                    }
379                }
380                () = tokio::time::sleep(delay) => {
381                    // Timer fired — emit pending.
382                    if let Some(c) = core.upgrade() {
383                        c.emit_or_defer(pid, h);
384                    } else {
385                        binding.release_handle(h);
386                        return;
387                    }
388                    pending = None;
389                }
390            }
391        } else {
392            // No pending — wait for command.
393            match rx.recv().await {
394                Some(TemporalCmd::Value(h)) => {
395                    pending = Some(h);
396                }
397                Some(TemporalCmd::Complete) => {
398                    if let Some(c) = core.upgrade() {
399                        c.complete_or_defer(pid);
400                    }
401                    return;
402                }
403                Some(TemporalCmd::Error(err_h)) => {
404                    if let Some(c) = core.upgrade() {
405                        c.error_or_defer(pid, err_h);
406                    } else {
407                        binding.release_handle(err_h);
408                    }
409                    return;
410                }
411                None => return,
412            }
413        }
414    }
415}
416
417// =========================================================================
418// audit(source, ms)
419// =========================================================================
420
421/// On the first upstream DATA in each window, starts a `ms` timer. When
422/// the timer fires, emits the **latest** value received during the window.
423/// Subsequent DATA values within the window update the stored value but
424/// do NOT restart the timer.
425///
426/// Differs from [`debounce`]: debounce resets on each DATA (quiet-time);
427/// audit fires at a fixed interval after the first DATA in the window.
428#[must_use]
429pub fn audit(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
430    let core_weak = core.weak_handle();
431    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
432    let delay = Duration::from_millis(ms);
433
434    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
435        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
436            return;
437        };
438        let pid = ctx.node_id();
439        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
440
441        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
442        let tx_sink = tx.clone();
443        let tx_dead = tx.clone();
444        let task = tokio::spawn(audit_task(rx, core_s.weak_handle(), pid, bb.clone(), delay));
445
446        {
447            let mut storage = binding_s.producer_storage().lock();
448            let entry = storage.entry(pid).or_default();
449            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
450        }
451
452        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
453        let source_sink: Sink = Arc::new(move |msgs| {
454            for m in msgs {
455                match m.tier() {
456                    3 => {
457                        if let Some(h) = m.payload_handle() {
458                            bb_sink.retain_handle(h);
459                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
460                                bb_sink.release_handle(h);
461                            }
462                        }
463                    }
464                    5 => {
465                        if let Some(h) = m.payload_handle() {
466                            bb_sink.retain_handle(h);
467                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
468                                bb_sink.release_handle(h);
469                            }
470                        } else {
471                            let _ = tx_sink.send(TemporalCmd::Complete);
472                        }
473                    }
474                    _ => {}
475                }
476            }
477        });
478
479        let outcome = ctx.subscribe_to(source, source_sink);
480        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
481            let _ = tx_dead.send(TemporalCmd::Complete);
482        }
483    });
484
485    let fn_id = binding.register_producer_build(build);
486    core.register_producer(fn_id)
487        .expect("audit: register_producer failed")
488}
489
490async fn audit_task(
491    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
492    core: WeakCore,
493    pid: NodeId,
494    binding: Arc<dyn BindingBoundary>,
495    delay: Duration,
496) {
497    // Outer loop: wait for first DATA to start a window.
498    loop {
499        match rx.recv().await {
500            Some(TemporalCmd::Value(h)) => {
501                // First value in window — start timer, collect updates.
502                let mut latest = h;
503
504                tokio::select! {
505                    biased;
506                    // Drain commands until timer fires.
507                    () = async {
508                        loop {
509                            tokio::select! {
510                                biased;
511                                cmd = rx.recv() => {
512                                    match cmd {
513                                        Some(TemporalCmd::Value(new_h)) => {
514                                            binding.release_handle(latest);
515                                            latest = new_h;
516                                        }
517                                        Some(TemporalCmd::Complete) => {
518                                            // Emit latest, then complete.
519                                            if let Some(c) = core.upgrade() {
520                                                c.emit_or_defer(pid, latest);
521                                                c.complete_or_defer(pid);
522                                            } else {
523                                                binding.release_handle(latest);
524                                            }
525                                            return; // exits the async block
526                                        }
527                                        Some(TemporalCmd::Error(err_h)) => {
528                                            binding.release_handle(latest);
529                                            if let Some(c) = core.upgrade() {
530                                                c.error_or_defer(pid, err_h);
531                                            } else {
532                                                binding.release_handle(err_h);
533                                            }
534                                            return;
535                                        }
536                                        None => {
537                                            binding.release_handle(latest);
538                                            return;
539                                        }
540                                    }
541                                }
542                            }
543                        }
544                    } => {
545                        return; // Terminal command handled inside async block.
546                    }
547                    () = tokio::time::sleep(delay) => {
548                        // Timer fired — emit latest, window closes.
549                        if let Some(c) = core.upgrade() {
550                            c.emit_or_defer(pid, latest);
551                        } else {
552                            binding.release_handle(latest);
553                            return;
554                        }
555                        // Continue outer loop — wait for next window.
556                    }
557                }
558            }
559            Some(TemporalCmd::Complete) => {
560                if let Some(c) = core.upgrade() {
561                    c.complete_or_defer(pid);
562                }
563                return;
564            }
565            Some(TemporalCmd::Error(err_h)) => {
566                if let Some(c) = core.upgrade() {
567                    c.error_or_defer(pid, err_h);
568                } else {
569                    binding.release_handle(err_h);
570                }
571                return;
572            }
573            None => return,
574        }
575    }
576}
577
578// =========================================================================
579// delay(source, ms)
580// =========================================================================
581
582/// Delays each upstream DATA by `ms` milliseconds. Multiple values can
583/// be in-flight simultaneously, each with its own timer.
584///
585/// - COMPLETE waits for all pending delays, then completes.
586/// - ERROR cancels all pending and propagates immediately.
587#[must_use]
588pub fn delay(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
589    let core_weak = core.weak_handle();
590    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
591    let delay_dur = Duration::from_millis(ms);
592
593    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
594        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
595            return;
596        };
597        let pid = ctx.node_id();
598        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
599
600        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
601        let tx_sink = tx.clone();
602        let tx_dead = tx.clone();
603        let task = tokio::spawn(delay_task(
604            rx,
605            core_s.weak_handle(),
606            pid,
607            bb.clone(),
608            delay_dur,
609        ));
610
611        {
612            let mut storage = binding_s.producer_storage().lock();
613            let entry = storage.entry(pid).or_default();
614            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
615        }
616
617        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
618        let source_sink: Sink = Arc::new(move |msgs| {
619            for m in msgs {
620                match m.tier() {
621                    3 => {
622                        if let Some(h) = m.payload_handle() {
623                            bb_sink.retain_handle(h);
624                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
625                                bb_sink.release_handle(h);
626                            }
627                        }
628                    }
629                    5 => {
630                        if let Some(h) = m.payload_handle() {
631                            bb_sink.retain_handle(h);
632                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
633                                bb_sink.release_handle(h);
634                            }
635                        } else {
636                            let _ = tx_sink.send(TemporalCmd::Complete);
637                        }
638                    }
639                    _ => {}
640                }
641            }
642        });
643
644        let outcome = ctx.subscribe_to(source, source_sink);
645        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
646            let _ = tx_dead.send(TemporalCmd::Complete);
647        }
648    });
649
650    let fn_id = binding.register_producer_build(build);
651    core.register_producer(fn_id)
652        .expect("delay: register_producer failed")
653}
654
655async fn delay_task(
656    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
657    core: WeakCore,
658    pid: NodeId,
659    binding: Arc<dyn BindingBoundary>,
660    delay: Duration,
661) {
662    let mut queue: VecDeque<(tokio::time::Instant, HandleId)> = VecDeque::new();
663    let mut complete_pending = false;
664
665    loop {
666        let next_fire = queue.front().map(|(deadline, _)| *deadline);
667
668        tokio::select! {
669            biased;
670            cmd = rx.recv() => {
671                match cmd {
672                    Some(TemporalCmd::Value(h)) => {
673                        queue.push_back((tokio::time::Instant::now() + delay, h));
674                    }
675                    Some(TemporalCmd::Complete) => {
676                        complete_pending = true;
677                        if queue.is_empty() {
678                            if let Some(c) = core.upgrade() {
679                                c.complete_or_defer(pid);
680                            }
681                            return;
682                        }
683                        // Wait for pending timers to drain.
684                    }
685                    Some(TemporalCmd::Error(err_h)) => {
686                        // Release all pending.
687                        for (_, h) in queue.drain(..) {
688                            binding.release_handle(h);
689                        }
690                        if let Some(c) = core.upgrade() {
691                            c.error_or_defer(pid, err_h);
692                        } else {
693                            binding.release_handle(err_h);
694                        }
695                        return;
696                    }
697                    None => {
698                        for (_, h) in queue.drain(..) {
699                            binding.release_handle(h);
700                        }
701                        return;
702                    }
703                }
704            }
705            () = sleep_until_or_forever(next_fire) => {
706                // Fire all expired.
707                let now = tokio::time::Instant::now();
708                while let Some(&(deadline, _)) = queue.front() {
709                    if deadline <= now {
710                        let (_, h) = queue.pop_front().unwrap();
711                        if let Some(c) = core.upgrade() {
712                            c.emit_or_defer(pid, h);
713                        } else {
714                            binding.release_handle(h);
715                            for (_, h2) in queue.drain(..) {
716                                binding.release_handle(h2);
717                            }
718                            return;
719                        }
720                    } else {
721                        break;
722                    }
723                }
724                if complete_pending && queue.is_empty() {
725                    if let Some(c) = core.upgrade() {
726                        c.complete_or_defer(pid);
727                    }
728                    return;
729                }
730            }
731        }
732    }
733}
734
735// =========================================================================
736// throttle(source, ms, opts)
737// =========================================================================
738
739/// Options for [`throttle`].
740#[derive(Debug, Clone, Copy)]
741pub struct ThrottleOpts {
742    /// Emit immediately at the leading edge of each window (default: true).
743    pub leading: bool,
744    /// Emit the latest value at the trailing edge of each window (default: false).
745    pub trailing: bool,
746}
747
748impl Default for ThrottleOpts {
749    fn default() -> Self {
750        Self {
751            leading: true,
752            trailing: false,
753        }
754    }
755}
756
757/// Rate-limits upstream emissions to at most one per `ms`-millisecond window.
758///
759/// With default options (`leading: true, trailing: false`), the first DATA
760/// in each window is emitted immediately; subsequent DATA within the window
761/// is dropped. With `trailing: true`, the latest DATA is emitted at window end.
762///
763/// On upstream COMPLETE, flushes trailing (if any) then completes.
764#[must_use]
765pub fn throttle(
766    core: &Core,
767    binding: &Arc<dyn ProducerBinding>,
768    source: NodeId,
769    ms: u64,
770    opts: ThrottleOpts,
771) -> NodeId {
772    let core_weak = core.weak_handle();
773    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
774    let window = Duration::from_millis(ms);
775
776    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
777        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
778            return;
779        };
780        let pid = ctx.node_id();
781        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
782
783        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
784        let tx_sink = tx.clone();
785        let tx_dead = tx.clone();
786        let task = tokio::spawn(throttle_task(
787            rx,
788            core_s.weak_handle(),
789            pid,
790            bb.clone(),
791            window,
792            opts,
793        ));
794
795        {
796            let mut storage = binding_s.producer_storage().lock();
797            let entry = storage.entry(pid).or_default();
798            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
799        }
800
801        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
802        let source_sink: Sink = Arc::new(move |msgs| {
803            for m in msgs {
804                match m.tier() {
805                    3 => {
806                        if let Some(h) = m.payload_handle() {
807                            bb_sink.retain_handle(h);
808                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
809                                bb_sink.release_handle(h);
810                            }
811                        }
812                    }
813                    5 => {
814                        if let Some(h) = m.payload_handle() {
815                            bb_sink.retain_handle(h);
816                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
817                                bb_sink.release_handle(h);
818                            }
819                        } else {
820                            let _ = tx_sink.send(TemporalCmd::Complete);
821                        }
822                    }
823                    _ => {}
824                }
825            }
826        });
827
828        let outcome = ctx.subscribe_to(source, source_sink);
829        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
830            let _ = tx_dead.send(TemporalCmd::Complete);
831        }
832    });
833
834    let fn_id = binding.register_producer_build(build);
835    core.register_producer(fn_id)
836        .expect("throttle: register_producer failed")
837}
838
839async fn throttle_task(
840    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
841    core: WeakCore,
842    pid: NodeId,
843    binding: Arc<dyn BindingBoundary>,
844    window: Duration,
845    opts: ThrottleOpts,
846) {
847    let mut trailing_pending: Option<HandleId> = None;
848    let mut window_deadline: Option<tokio::time::Instant> = None;
849
850    loop {
851        let fire_at = if opts.trailing { window_deadline } else { None };
852
853        tokio::select! {
854            biased;
855            cmd = rx.recv() => {
856                match cmd {
857                    Some(TemporalCmd::Value(h)) => {
858                        let in_window = window_deadline
859                            .is_some_and(|d| tokio::time::Instant::now() < d);
860
861                        if !in_window {
862                            // Window open — start new window.
863                            window_deadline = Some(tokio::time::Instant::now() + window);
864                            if opts.leading {
865                                if let Some(c) = core.upgrade() {
866                                    c.emit_or_defer(pid, h);
867                                } else {
868                                    binding.release_handle(h);
869                                    release_opt(&mut trailing_pending, &*binding);
870                                    return;
871                                }
872                            } else if opts.trailing {
873                                if let Some(old) = trailing_pending.replace(h) {
874                                    binding.release_handle(old);
875                                }
876                            } else {
877                                binding.release_handle(h);
878                            }
879                        } else if opts.trailing {
880                            // Inside window — store for trailing.
881                            if let Some(old) = trailing_pending.replace(h) {
882                                binding.release_handle(old);
883                            }
884                        } else {
885                            binding.release_handle(h);
886                        }
887                    }
888                    Some(TemporalCmd::Complete) => {
889                        if let Some(h) = trailing_pending.take() {
890                            if let Some(c) = core.upgrade() {
891                                c.emit_or_defer(pid, h);
892                                c.complete_or_defer(pid);
893                            } else {
894                                binding.release_handle(h);
895                            }
896                        } else if let Some(c) = core.upgrade() {
897                            c.complete_or_defer(pid);
898                        }
899                        return;
900                    }
901                    Some(TemporalCmd::Error(err_h)) => {
902                        release_opt(&mut trailing_pending, &*binding);
903                        if let Some(c) = core.upgrade() {
904                            c.error_or_defer(pid, err_h);
905                        } else {
906                            binding.release_handle(err_h);
907                        }
908                        return;
909                    }
910                    None => {
911                        release_opt(&mut trailing_pending, &*binding);
912                        return;
913                    }
914                }
915            }
916            () = sleep_until_or_forever(fire_at) => {
917                // Window expired — emit trailing if any, reopen window.
918                window_deadline = None;
919                if let Some(h) = trailing_pending.take() {
920                    if let Some(c) = core.upgrade() {
921                        c.emit_or_defer(pid, h);
922                        // Start new window for the trailing emission.
923                        window_deadline = Some(tokio::time::Instant::now() + window);
924                    } else {
925                        binding.release_handle(h);
926                        return;
927                    }
928                }
929            }
930        }
931    }
932}
933
934// =========================================================================
935// interval(period_ms) — timer source
936// =========================================================================
937
938/// Source that emits a monotonically increasing counter (`1, 2, 3, …`)
939/// every `period_ms` milliseconds. Counter starts at 1 (not 0) because
940/// `HandleId(0)` is the `NO_HANDLE` sentinel. Resubscribable: counter
941/// resets on deactivation + reactivation.
942///
943/// The emitted values use raw `HandleId::new(counter)`. Bindings should
944/// register these as integer handles.
945#[must_use]
946pub fn interval(core: &Core, binding: &Arc<dyn ProducerBinding>, period_ms: u64) -> NodeId {
947    let core_weak = core.weak_handle();
948    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
949    let period = Duration::from_millis(period_ms);
950
951    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
952        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
953            return;
954        };
955        let pid = ctx.node_id();
956        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
957        let weak = core_s.weak_handle();
958
959        let task = tokio::spawn(async move {
960            let mut ticker = tokio::time::interval(period);
961            ticker.tick().await; // First tick is immediate — skip it.
962            let mut counter: u64 = 1; // Start at 1: HandleId(0) = NO_HANDLE.
963            loop {
964                ticker.tick().await;
965                if let Some(c) = weak.upgrade() {
966                    let h = HandleId::new(counter);
967                    bb.retain_handle(h);
968                    c.emit_or_defer(pid, h);
969                    counter += 1;
970                } else {
971                    break;
972                }
973            }
974        });
975
976        {
977            let mut storage = binding_s.producer_storage().lock();
978            let entry = storage.entry(pid).or_default();
979            entry.op_state = Some(Box::new(AbortOnDrop(task)));
980        }
981    });
982
983    let fn_id = binding.register_producer_build(build);
984    core.register_producer(fn_id)
985        .expect("interval: register_producer failed")
986}
987
988// =========================================================================
989// Helpers
990// =========================================================================
991
992/// Sleep until the given instant, or sleep forever if `None`.
993async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
994    match deadline {
995        Some(d) => tokio::time::sleep_until(d).await,
996        None => std::future::pending::<()>().await,
997    }
998}
999
1000/// Release an optional handle and set it to `None`.
1001fn release_opt(opt: &mut Option<HandleId>, binding: &dyn BindingBoundary) {
1002    if let Some(h) = opt.take() {
1003        binding.release_handle(h);
1004    }
1005}
1006
1007// =========================================================================
1008// timeout(source, ms, error_handle)
1009// =========================================================================
1010
1011/// Errors if no upstream DATA arrives within `ms` milliseconds after
1012/// subscribe or after the previous DATA. Each DATA resets the timer.
1013///
1014/// On timeout: retains `error_handle` and emits ERROR with it.
1015/// On upstream COMPLETE: cancels timer, forwards complete.
1016/// On upstream ERROR: cancels timer, forwards error.
1017#[must_use]
1018pub fn timeout(
1019    core: &Core,
1020    binding: &Arc<dyn ProducerBinding>,
1021    source: NodeId,
1022    ms: u64,
1023    error_handle: HandleId,
1024) -> NodeId {
1025    let core_weak = core.weak_handle();
1026    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
1027    let duration = Duration::from_millis(ms);
1028
1029    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1030        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
1031            return;
1032        };
1033        let pid = ctx.node_id();
1034        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1035
1036        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1037        let tx_sink = tx.clone();
1038        let tx_dead = tx.clone();
1039        let task = tokio::spawn(timeout_task(
1040            rx,
1041            core_s.weak_handle(),
1042            pid,
1043            bb.clone(),
1044            duration,
1045            error_handle,
1046        ));
1047
1048        {
1049            let mut storage = binding_s.producer_storage().lock();
1050            let entry = storage.entry(pid).or_default();
1051            entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
1052        }
1053
1054        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1055        let source_sink: Sink = Arc::new(move |msgs| {
1056            for m in msgs {
1057                match m.tier() {
1058                    3 => {
1059                        if let Some(h) = m.payload_handle() {
1060                            bb_sink.retain_handle(h);
1061                            if tx_sink.send(TemporalCmd::Value(h)).is_err() {
1062                                bb_sink.release_handle(h);
1063                            }
1064                        }
1065                    }
1066                    5 => {
1067                        if let Some(h) = m.payload_handle() {
1068                            bb_sink.retain_handle(h);
1069                            if tx_sink.send(TemporalCmd::Error(h)).is_err() {
1070                                bb_sink.release_handle(h);
1071                            }
1072                        } else {
1073                            let _ = tx_sink.send(TemporalCmd::Complete);
1074                        }
1075                    }
1076                    _ => {}
1077                }
1078            }
1079        });
1080
1081        let outcome = ctx.subscribe_to(source, source_sink);
1082        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1083            let _ = tx_dead.send(TemporalCmd::Complete);
1084        }
1085    });
1086
1087    let fn_id = binding.register_producer_build(build);
1088    core.register_producer(fn_id)
1089        .expect("timeout: register_producer failed")
1090}
1091
1092async fn timeout_task(
1093    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
1094    core: WeakCore,
1095    pid: NodeId,
1096    binding: Arc<dyn BindingBoundary>,
1097    duration: Duration,
1098    error_handle: HandleId,
1099) {
1100    // The timer starts immediately on subscribe — if no DATA arrives
1101    // within `duration`, we fire the timeout error.
1102    loop {
1103        tokio::select! {
1104            biased;
1105            cmd = rx.recv() => {
1106                match cmd {
1107                    Some(TemporalCmd::Value(h)) => {
1108                        // DATA arrived — forward it and reset the timer
1109                        // (the next loop iteration restarts the sleep).
1110                        if let Some(c) = core.upgrade() {
1111                            c.emit_or_defer(pid, h);
1112                        } else {
1113                            binding.release_handle(h);
1114                            return;
1115                        }
1116                        // Continue loop → resets the sleep timer.
1117                    }
1118                    Some(TemporalCmd::Complete) => {
1119                        if let Some(c) = core.upgrade() {
1120                            c.complete_or_defer(pid);
1121                        }
1122                        return;
1123                    }
1124                    Some(TemporalCmd::Error(err_h)) => {
1125                        if let Some(c) = core.upgrade() {
1126                            c.error_or_defer(pid, err_h);
1127                        } else {
1128                            binding.release_handle(err_h);
1129                        }
1130                        return;
1131                    }
1132                    None => return,
1133                }
1134            }
1135            () = tokio::time::sleep(duration) => {
1136                // Timeout fired — emit error.
1137                if let Some(c) = core.upgrade() {
1138                    binding.retain_handle(error_handle);
1139                    c.error_or_defer(pid, error_handle);
1140                }
1141                return;
1142            }
1143        }
1144    }
1145}
1146
1147// =========================================================================
1148// buffer_time(source, ms, pack_fn_id)
1149// =========================================================================
1150
1151/// Command sent from the `buffer_time` sink to its async task.
1152enum BufferTimeCmd {
1153    /// New DATA handle from upstream. Already retained by the sink.
1154    Value(HandleId),
1155    /// Upstream COMPLETE.
1156    Complete,
1157    /// Upstream ERROR. Handle already retained.
1158    Error(HandleId),
1159}
1160
1161/// Collects upstream DATA handles into a buffer and flushes them as a
1162/// packed tuple every `ms` milliseconds.
1163///
1164/// - On interval tick: packs buffered handles via
1165///   `binding.pack_tuple(pack_fn_id, &buf)`, emits the result, releases
1166///   individual handles, clears buffer.
1167/// - On upstream COMPLETE: flushes remaining buffer, then completes.
1168/// - On upstream ERROR: releases buffered handles, forwards error.
1169#[must_use]
1170pub fn buffer_time(
1171    core: &Core,
1172    binding: &Arc<dyn ProducerBinding>,
1173    source: NodeId,
1174    ms: u64,
1175    pack_fn_id: FnId,
1176) -> NodeId {
1177    let core_weak = core.weak_handle();
1178    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
1179    let period = Duration::from_millis(ms);
1180
1181    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1182        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
1183            return;
1184        };
1185        let pid = ctx.node_id();
1186        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1187
1188        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1189        let tx_sink = tx.clone();
1190        let tx_dead = tx.clone();
1191        let task = tokio::spawn(buffer_time_task(
1192            rx,
1193            core_s.weak_handle(),
1194            pid,
1195            bb.clone(),
1196            period,
1197            pack_fn_id,
1198        ));
1199
1200        {
1201            let mut storage = binding_s.producer_storage().lock();
1202            let entry = storage.entry(pid).or_default();
1203            entry.op_state = Some(Box::new(BufferTimeTaskGuard { _tx: tx, task }));
1204        }
1205
1206        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1207        let source_sink: Sink = Arc::new(move |msgs| {
1208            for m in msgs {
1209                match m.tier() {
1210                    3 => {
1211                        if let Some(h) = m.payload_handle() {
1212                            bb_sink.retain_handle(h);
1213                            if tx_sink.send(BufferTimeCmd::Value(h)).is_err() {
1214                                bb_sink.release_handle(h);
1215                            }
1216                        }
1217                    }
1218                    5 => {
1219                        if let Some(h) = m.payload_handle() {
1220                            bb_sink.retain_handle(h);
1221                            if tx_sink.send(BufferTimeCmd::Error(h)).is_err() {
1222                                bb_sink.release_handle(h);
1223                            }
1224                        } else {
1225                            let _ = tx_sink.send(BufferTimeCmd::Complete);
1226                        }
1227                    }
1228                    _ => {}
1229                }
1230            }
1231        });
1232
1233        let outcome = ctx.subscribe_to(source, source_sink);
1234        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1235            let _ = tx_dead.send(BufferTimeCmd::Complete);
1236        }
1237    });
1238
1239    let fn_id = binding.register_producer_build(build);
1240    core.register_producer(fn_id)
1241        .expect("buffer_time: register_producer failed")
1242}
1243
1244/// RAII guard for `buffer_time` task — same pattern as [`TemporalTaskGuard`]
1245/// but with `BufferTimeCmd` channel.
1246struct BufferTimeTaskGuard {
1247    _tx: tokio::sync::mpsc::UnboundedSender<BufferTimeCmd>,
1248    task: tokio::task::JoinHandle<()>,
1249}
1250
1251impl Drop for BufferTimeTaskGuard {
1252    fn drop(&mut self) {
1253        self.task.abort();
1254    }
1255}
1256
1257async fn buffer_time_task(
1258    mut rx: tokio::sync::mpsc::UnboundedReceiver<BufferTimeCmd>,
1259    core: WeakCore,
1260    pid: NodeId,
1261    binding: Arc<dyn BindingBoundary>,
1262    period: Duration,
1263    pack_fn_id: FnId,
1264) {
1265    let mut buf: Vec<HandleId> = Vec::new();
1266    let mut ticker = tokio::time::interval(period);
1267    ticker.tick().await; // First tick is immediate — skip it.
1268
1269    loop {
1270        tokio::select! {
1271            biased;
1272            cmd = rx.recv() => {
1273                match cmd {
1274                    Some(BufferTimeCmd::Value(h)) => {
1275                        buf.push(h);
1276                    }
1277                    Some(BufferTimeCmd::Complete) => {
1278                        // Flush remaining buffer then complete.
1279                        if !buf.is_empty() {
1280                            if let Some(c) = core.upgrade() {
1281                                let packed = binding.pack_tuple(pack_fn_id, &buf);
1282                                c.emit_or_defer(pid, packed);
1283                            }
1284                            for h in buf.drain(..) {
1285                                binding.release_handle(h);
1286                            }
1287                        }
1288                        if let Some(c) = core.upgrade() {
1289                            c.complete_or_defer(pid);
1290                        }
1291                        return;
1292                    }
1293                    Some(BufferTimeCmd::Error(err_h)) => {
1294                        for h in buf.drain(..) {
1295                            binding.release_handle(h);
1296                        }
1297                        if let Some(c) = core.upgrade() {
1298                            c.error_or_defer(pid, err_h);
1299                        } else {
1300                            binding.release_handle(err_h);
1301                        }
1302                        return;
1303                    }
1304                    None => {
1305                        // Channel closed — deactivated. Release buffered.
1306                        for h in buf.drain(..) {
1307                            binding.release_handle(h);
1308                        }
1309                        return;
1310                    }
1311                }
1312            }
1313            _ = ticker.tick() => {
1314                if !buf.is_empty() {
1315                    if let Some(c) = core.upgrade() {
1316                        let packed = binding.pack_tuple(pack_fn_id, &buf);
1317                        c.emit_or_defer(pid, packed);
1318                        for h in buf.drain(..) {
1319                            binding.release_handle(h);
1320                        }
1321                    } else {
1322                        for h in buf.drain(..) {
1323                            binding.release_handle(h);
1324                        }
1325                        return;
1326                    }
1327                }
1328            }
1329        }
1330    }
1331}
1332
1333// =========================================================================
1334// window_time(source, ms)
1335// =========================================================================
1336
1337/// Command sent from the `window_time` sink to its async task.
1338enum WindowTimeCmd {
1339    /// New DATA handle from upstream. Already retained by the sink.
1340    Value(HandleId),
1341    /// Upstream COMPLETE.
1342    Complete,
1343    /// Upstream ERROR. Handle already retained.
1344    Error(HandleId),
1345}
1346
1347/// Rotates sub-node windows every `ms` milliseconds.
1348///
1349/// Creates a new inner state node each interval tick. Upstream DATA is
1350/// forwarded to the current inner window node. On tick, the current
1351/// window is completed, a new inner node is created, and its identity
1352/// is emitted as a DATA handle (via `binding.intern_node`).
1353///
1354/// - On upstream COMPLETE: completes current window, then completes self.
1355/// - On upstream ERROR: errors current window, then errors self.
1356#[must_use]
1357pub fn window_time(
1358    core: &Core,
1359    binding: &Arc<dyn ProducerBinding>,
1360    source: NodeId,
1361    ms: u64,
1362) -> NodeId {
1363    let core_weak = core.weak_handle();
1364    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
1365    let period = Duration::from_millis(ms);
1366
1367    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
1368        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
1369            return;
1370        };
1371        let pid = ctx.node_id();
1372        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
1373
1374        // Pre-register a reusable no-op FnId for creating inner window
1375        // nodes. The async task reuses this FnId for every window rotation
1376        // so it only needs Core (via WeakCore), not ProducerBinding.
1377        let noop_fn_id = binding_s.register_producer_build(Box::new(|_ctx: ProducerCtx<'_>| {
1378            // Inner window nodes are passive — all emissions are driven by
1379            // the parent window_time task via emit_or_defer / complete_or_defer.
1380        }));
1381
1382        // Create the first inner window node and emit its handle.
1383        let first_inner = core_s
1384            .register_producer(noop_fn_id)
1385            .expect("window_time inner: register_producer failed");
1386        let first_handle = bb.intern_node(first_inner);
1387        core_s.emit_or_defer(pid, first_handle);
1388
1389        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1390        let tx_sink = tx.clone();
1391        let tx_dead = tx.clone();
1392        let task = tokio::spawn(window_time_task(
1393            rx,
1394            core_s.weak_handle(),
1395            pid,
1396            first_inner,
1397            bb.clone(),
1398            period,
1399            noop_fn_id,
1400        ));
1401
1402        {
1403            let mut storage = binding_s.producer_storage().lock();
1404            let entry = storage.entry(pid).or_default();
1405            entry.op_state = Some(Box::new(WindowTimeTaskGuard { _tx: tx, task }));
1406        }
1407
1408        let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
1409        let source_sink: Sink = Arc::new(move |msgs| {
1410            for m in msgs {
1411                match m.tier() {
1412                    3 => {
1413                        if let Some(h) = m.payload_handle() {
1414                            bb_sink.retain_handle(h);
1415                            if tx_sink.send(WindowTimeCmd::Value(h)).is_err() {
1416                                bb_sink.release_handle(h);
1417                            }
1418                        }
1419                    }
1420                    5 => {
1421                        if let Some(h) = m.payload_handle() {
1422                            bb_sink.retain_handle(h);
1423                            if tx_sink.send(WindowTimeCmd::Error(h)).is_err() {
1424                                bb_sink.release_handle(h);
1425                            }
1426                        } else {
1427                            let _ = tx_sink.send(WindowTimeCmd::Complete);
1428                        }
1429                    }
1430                    _ => {}
1431                }
1432            }
1433        });
1434
1435        let outcome = ctx.subscribe_to(source, source_sink);
1436        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
1437            let _ = tx_dead.send(WindowTimeCmd::Complete);
1438        }
1439    });
1440
1441    let fn_id = binding.register_producer_build(build);
1442    core.register_producer(fn_id)
1443        .expect("window_time: register_producer failed")
1444}
1445
1446/// RAII guard for `window_time` task.
1447struct WindowTimeTaskGuard {
1448    _tx: tokio::sync::mpsc::UnboundedSender<WindowTimeCmd>,
1449    task: tokio::task::JoinHandle<()>,
1450}
1451
1452impl Drop for WindowTimeTaskGuard {
1453    fn drop(&mut self) {
1454        self.task.abort();
1455    }
1456}
1457
1458async fn window_time_task(
1459    mut rx: tokio::sync::mpsc::UnboundedReceiver<WindowTimeCmd>,
1460    core: WeakCore,
1461    pid: NodeId,
1462    initial_inner: NodeId,
1463    binding: Arc<dyn BindingBoundary>,
1464    period: Duration,
1465    noop_fn_id: FnId,
1466) {
1467    let mut current_inner = initial_inner;
1468    let mut ticker = tokio::time::interval(period);
1469    ticker.tick().await; // First tick is immediate — skip it.
1470
1471    loop {
1472        tokio::select! {
1473            biased;
1474            cmd = rx.recv() => {
1475                match cmd {
1476                    Some(WindowTimeCmd::Value(h)) => {
1477                        // Forward DATA to current inner window.
1478                        if let Some(c) = core.upgrade() {
1479                            c.emit_or_defer(current_inner, h);
1480                        } else {
1481                            binding.release_handle(h);
1482                            return;
1483                        }
1484                    }
1485                    Some(WindowTimeCmd::Complete) => {
1486                        // Complete current inner window, then complete self.
1487                        if let Some(c) = core.upgrade() {
1488                            c.complete_or_defer(current_inner);
1489                            c.complete_or_defer(pid);
1490                        }
1491                        return;
1492                    }
1493                    Some(WindowTimeCmd::Error(err_h)) => {
1494                        // Error current inner window, then error self.
1495                        if let Some(c) = core.upgrade() {
1496                            binding.retain_handle(err_h);
1497                            c.error_or_defer(current_inner, err_h);
1498                            // err_h already retained once by sink + once above;
1499                            // the inner node consumes one retain, self consumes the other.
1500                            c.error_or_defer(pid, err_h);
1501                        } else {
1502                            binding.release_handle(err_h);
1503                        }
1504                        return;
1505                    }
1506                    None => {
1507                        // Channel closed — deactivated.
1508                        return;
1509                    }
1510                }
1511            }
1512            _ = ticker.tick() => {
1513                // Window rotation: complete current, create new, emit handle.
1514                if let Some(c) = core.upgrade() {
1515                    c.complete_or_defer(current_inner);
1516
1517                    // Create a new inner window node using the pre-registered
1518                    // noop FnId. Each inner node is a passive producer whose
1519                    // emissions are driven externally by this task.
1520                    if let Ok(new_inner) = c.register_producer(noop_fn_id) {
1521                        current_inner = new_inner;
1522                        let h = binding.intern_node(new_inner);
1523                        c.emit_or_defer(pid, h);
1524                    } else {
1525                        // Registration failed — complete self and exit.
1526                        c.complete_or_defer(pid);
1527                        return;
1528                    }
1529                } else {
1530                    return;
1531                }
1532            }
1533        }
1534    }
1535}