Skip to main content

graphrefly_operators/
buffer.rs

1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Buffer operators — collect and batch reactive values.
11//!
12//! # Operators
13//!
14//! - [`buffer`] — notifier-triggered flush of accumulated DATA handles.
15//! - [`buffer_count`] — fixed-count flush.
16//! - [`window`] — notifier-triggered sub-node splitting.
17//! - [`window_count`] — count-based sub-node splitting.
18
19use std::sync::Arc;
20
21use parking_lot::Mutex;
22
23use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
24use smallvec::SmallVec;
25
26use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
27
28// =========================================================================
29// buffer(source, notifier) — notifier-triggered flush
30// =========================================================================
31
32/// Per-buffer-node shared state.
33struct BufferState {
34    buf: Vec<HandleId>,
35    terminated: bool,
36    source_completed: bool,
37}
38
39impl BufferState {
40    fn new() -> Self {
41        Self {
42            buf: Vec::new(),
43            terminated: false,
44            source_completed: false,
45        }
46    }
47}
48
49/// Buffers source DATA handles; flushes as a packed array when notifier
50/// emits DATA.
51///
52/// - Source DATA: retain + push to buffer.
53/// - Notifier DATA: pack buffer via `pack_tuple`, emit, clear, release
54///   component handles.
55/// - Source COMPLETE: flush remaining buffer (if non-empty), then complete.
56/// - Either ERROR: terminate immediately, release buffered handles.
57/// - Notifier COMPLETE: does NOT auto-complete (keep buffering; source
58///   COMPLETE triggers final flush).
59#[must_use]
60#[allow(clippy::too_many_lines)]
61pub fn buffer(
62    core: &Core,
63    binding: &Arc<dyn ProducerBinding>,
64    source: NodeId,
65    notifier: NodeId,
66    pack_fn_id: FnId,
67) -> NodeId {
68    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
69        let core_s = ctx.core();
70        let binding_s = ctx.core().binding();
71        let em = ctx.emitter();
72        let pid = ctx.node_id();
73        let state: Arc<Mutex<BufferState>> = Arc::new(Mutex::new(BufferState::new()));
74
75        // --- source sink ---
76        let st = state.clone();
77        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
78        let core_src = em.clone();
79        let source_sink: Sink = Arc::new(move |msgs| {
80            enum Act {
81                Flush(Vec<HandleId>),
82                Complete,
83                Error(HandleId),
84                Release(Vec<HandleId>),
85            }
86            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
87            {
88                let mut s = st.lock();
89                if s.terminated {
90                    return;
91                }
92                for m in msgs {
93                    if s.terminated {
94                        break;
95                    }
96                    match m.tier() {
97                        3 => {
98                            if let Some(h) = m.payload_handle() {
99                                bb.retain_handle(h);
100                                s.buf.push(h);
101                            }
102                        }
103                        5 => {
104                            if let Some(h) = m.payload_handle() {
105                                // ERROR — terminate, release buffered
106                                s.terminated = true;
107                                bb.retain_handle(h);
108                                let to_release: Vec<HandleId> = s.buf.drain(..).collect();
109                                actions.push(Act::Release(to_release));
110                                actions.push(Act::Error(h));
111                            } else {
112                                // COMPLETE — flush remainder, then complete
113                                s.source_completed = true;
114                                s.terminated = true;
115                                if !s.buf.is_empty() {
116                                    let flushed: Vec<HandleId> = s.buf.drain(..).collect();
117                                    actions.push(Act::Flush(flushed));
118                                }
119                                actions.push(Act::Complete);
120                            }
121                        }
122                        _ => {}
123                    }
124                }
125            }
126            for a in actions {
127                match a {
128                    Act::Flush(handles) => {
129                        let packed = bb.pack_tuple(pack_fn_id, &handles);
130                        for h in &handles {
131                            bb.release_handle(*h);
132                        }
133                        core_src.emit_or_defer(pid, packed);
134                    }
135                    Act::Complete => core_src.complete_or_defer(pid),
136                    Act::Error(h) => core_src.error_or_defer(pid, h),
137                    Act::Release(handles) => {
138                        for h in handles {
139                            bb.release_handle(h);
140                        }
141                    }
142                }
143            }
144        });
145
146        let src_outcome = ctx.subscribe_to(source, source_sink);
147        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
148            let mut s = state.lock();
149            if !s.terminated {
150                s.terminated = true;
151                s.source_completed = true;
152                // No buffered data yet at activation time.
153                drop(s);
154                core_s.complete_or_defer(pid);
155                return;
156            }
157        }
158
159        // --- notifier sink ---
160        let st2 = state.clone();
161        let core_n = em.clone();
162        let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
163        let notifier_sink: Sink = Arc::new(move |msgs| {
164            enum Act {
165                Flush(Vec<HandleId>),
166                Error(HandleId),
167                Release(Vec<HandleId>),
168            }
169            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
170            {
171                let mut s = st2.lock();
172                if s.terminated {
173                    return;
174                }
175                for m in msgs {
176                    if s.terminated {
177                        break;
178                    }
179                    match m.tier() {
180                        3 if m.payload_handle().is_some() && !s.buf.is_empty() => {
181                            let flushed: Vec<HandleId> = s.buf.drain(..).collect();
182                            actions.push(Act::Flush(flushed));
183                        }
184                        5 => {
185                            if let Some(h) = m.payload_handle() {
186                                // Notifier ERROR — terminate, release buffered
187                                s.terminated = true;
188                                bb2.retain_handle(h);
189                                let to_release: Vec<HandleId> = s.buf.drain(..).collect();
190                                actions.push(Act::Release(to_release));
191                                actions.push(Act::Error(h));
192                            }
193                            // Notifier COMPLETE: do NOT auto-complete.
194                        }
195                        _ => {}
196                    }
197                }
198            }
199            for a in actions {
200                match a {
201                    Act::Flush(handles) => {
202                        let packed = bb2.pack_tuple(pack_fn_id, &handles);
203                        for h in &handles {
204                            bb2.release_handle(*h);
205                        }
206                        core_n.emit_or_defer(pid, packed);
207                    }
208                    Act::Error(h) => core_n.error_or_defer(pid, h),
209                    Act::Release(handles) => {
210                        for h in handles {
211                            bb2.release_handle(h);
212                        }
213                    }
214                }
215            }
216        });
217
218        let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
219        // Dead notifier: ignore — source COMPLETE will handle final flush.
220        let _ = not_outcome;
221    });
222
223    let fn_id = binding.register_producer_build(build);
224    core.register_producer(fn_id)
225        .expect("buffer: register_producer failed")
226}
227
228// =========================================================================
229// buffer_count(source, count) — fixed-size buffer
230// =========================================================================
231
232/// Per-buffer_count-node shared state.
233struct BufferCountState {
234    buf: Vec<HandleId>,
235    terminated: bool,
236}
237
238impl BufferCountState {
239    fn new() -> Self {
240        Self {
241            buf: Vec::new(),
242            terminated: false,
243        }
244    }
245}
246
247/// Fixed-size buffer. Accumulate DATA handles; emit packed array every
248/// `count` items.
249///
250/// - Source DATA: retain + push. When `buf.len() == count`, pack + emit + clear.
251/// - Source COMPLETE: flush remainder (may be < count), then complete.
252/// - Source ERROR: terminate, release buffered.
253///
254/// # Panics
255///
256/// Panics if `count` is 0.
257#[must_use]
258pub fn buffer_count(
259    core: &Core,
260    binding: &Arc<dyn ProducerBinding>,
261    source: NodeId,
262    count: usize,
263    pack_fn_id: FnId,
264) -> NodeId {
265    assert!(count > 0, "buffer_count: count must be > 0");
266
267    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
268        let core_s = ctx.core();
269        let binding_s = ctx.core().binding();
270        let em = ctx.emitter();
271        let pid = ctx.node_id();
272        let state: Arc<Mutex<BufferCountState>> = Arc::new(Mutex::new(BufferCountState::new()));
273
274        let st = state.clone();
275        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
276        let core_src = em.clone();
277        let source_sink: Sink = Arc::new(move |msgs| {
278            enum Act {
279                Flush(Vec<HandleId>),
280                Complete,
281                Error(HandleId),
282                Release(Vec<HandleId>),
283            }
284            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
285            {
286                let mut s = st.lock();
287                if s.terminated {
288                    return;
289                }
290                for m in msgs {
291                    if s.terminated {
292                        break;
293                    }
294                    match m.tier() {
295                        3 => {
296                            if let Some(h) = m.payload_handle() {
297                                bb.retain_handle(h);
298                                s.buf.push(h);
299                                if s.buf.len() == count {
300                                    let flushed: Vec<HandleId> = s.buf.drain(..).collect();
301                                    actions.push(Act::Flush(flushed));
302                                }
303                            }
304                        }
305                        5 => {
306                            if let Some(h) = m.payload_handle() {
307                                // ERROR
308                                s.terminated = true;
309                                bb.retain_handle(h);
310                                let to_release: Vec<HandleId> = s.buf.drain(..).collect();
311                                actions.push(Act::Release(to_release));
312                                actions.push(Act::Error(h));
313                            } else {
314                                // COMPLETE — flush remainder
315                                s.terminated = true;
316                                if !s.buf.is_empty() {
317                                    let flushed: Vec<HandleId> = s.buf.drain(..).collect();
318                                    actions.push(Act::Flush(flushed));
319                                }
320                                actions.push(Act::Complete);
321                            }
322                        }
323                        _ => {}
324                    }
325                }
326            }
327            for a in actions {
328                match a {
329                    Act::Flush(handles) => {
330                        let packed = bb.pack_tuple(pack_fn_id, &handles);
331                        for h in &handles {
332                            bb.release_handle(*h);
333                        }
334                        core_src.emit_or_defer(pid, packed);
335                    }
336                    Act::Complete => core_src.complete_or_defer(pid),
337                    Act::Error(h) => core_src.error_or_defer(pid, h),
338                    Act::Release(handles) => {
339                        for h in handles {
340                            bb.release_handle(h);
341                        }
342                    }
343                }
344            }
345        });
346
347        let outcome = ctx.subscribe_to(source, source_sink);
348        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
349            let mut s = state.lock();
350            if !s.terminated {
351                s.terminated = true;
352                drop(s);
353                core_s.complete_or_defer(pid);
354            }
355        }
356    });
357
358    let fn_id = binding.register_producer_build(build);
359    core.register_producer(fn_id)
360        .expect("buffer_count: register_producer failed")
361}
362
363// =========================================================================
364// window(source, notifier) — notifier-triggered sub-node splitting
365// =========================================================================
366
367/// Per-window-node shared state.
368struct WindowState {
369    /// The current inner window's NodeId.
370    inner_id: Option<NodeId>,
371    terminated: bool,
372}
373
374impl WindowState {
375    fn new() -> Self {
376        Self {
377            inner_id: None,
378            terminated: false,
379        }
380    }
381}
382
383/// Splits source into sub-nodes triggered by notifier. Each "window" is
384/// a fresh state node created via `Core::register_state()`. The operator
385/// emits the inner node's `NodeId` as a handle via
386/// `binding.intern_node(inner_id)`.
387///
388/// - On activation: create first inner window node, emit it.
389/// - Source DATA: forward to current inner window via
390///   `core.emit_or_defer(inner_id, handle)`.
391/// - Notifier DATA: complete current window, create new window, emit it.
392/// - Source COMPLETE: complete current window, then complete self.
393/// - Either ERROR: error current window + error self.
394/// - Notifier COMPLETE: do NOT auto-complete.
395#[must_use]
396#[allow(clippy::too_many_lines)]
397pub fn window(
398    core: &Core,
399    binding: &Arc<dyn ProducerBinding>,
400    source: NodeId,
401    notifier: NodeId,
402) -> NodeId {
403    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
404        let core_s = ctx.core();
405        let binding_s = ctx.core().binding();
406        let em = ctx.emitter();
407        let pid = ctx.node_id();
408        let state: Arc<Mutex<WindowState>> = Arc::new(Mutex::new(WindowState::new()));
409        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
410
411        // Create first inner window node and emit it.
412        let first_inner = create_window_node(core_s, &*bb);
413        {
414            let mut s = state.lock();
415            s.inner_id = Some(first_inner.0);
416        }
417        core_s.emit_or_defer(pid, first_inner.1);
418
419        // --- source sink ---
420        // D234: the inner-window selector `s.inner_id` MUST be read
421        // INSIDE the owner-serialized defer closures (never at sink-fire
422        // time) so a source-DATA firing between a notifier-DATA and its
423        // window-roll defer still routes to the correct window. Mailbox
424        // FIFO = arrival order ⇒ a roll posted before a forward is
425        // applied first, so the in-closure `inner_id` read sees the new
426        // window. `terminated` stays a fire-time monotonic gate; retains
427        // happen at fire time (Core owns the share for the emit) and are
428        // released if `em.defer` reports the Core gone (F2 contract).
429        let st = state.clone();
430        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
431        let em_src = em.clone();
432        let source_sink: Sink = Arc::new(move |msgs| {
433            for m in msgs {
434                // QA P1: per-message terminal gate (restores the retired
435                // `if s.terminated { break }`) — a batched
436                // `[COMPLETE, DATA]` must NOT forward DATA after the
437                // COMPLETE (R2.6.4 / no-data-after-terminal).
438                if st.lock().terminated {
439                    break;
440                }
441                match m.tier() {
442                    3 => {
443                        if let Some(h) = m.payload_handle() {
444                            bb_src.retain_handle(h);
445                            let st_c = st.clone();
446                            let bb_c = bb_src.clone();
447                            if !em_src.defer(move |c| {
448                                let inner = st_c.lock().inner_id;
449                                match inner {
450                                    Some(i) => c.emit(i, h),
451                                    None => bb_c.release_handle(h),
452                                }
453                            }) {
454                                bb_src.release_handle(h);
455                            }
456                        }
457                    }
458                    5 => {
459                        // QA P1: atomic terminal-winner — only the first
460                        // terminal (across THIS sink and the notifier
461                        // sink) defers the terminal cascade. Without
462                        // this, source-COMPLETE and notifier-ERROR each
463                        // pass their fire-time check before either defer
464                        // runs → double terminal on `pid`.
465                        let was = std::mem::replace(&mut st.lock().terminated, true);
466                        if was {
467                            break;
468                        }
469                        if let Some(h) = m.payload_handle() {
470                            // ERROR — error current window + error self.
471                            bb_src.retain_handle(h);
472                            let st_c = st.clone();
473                            let bb_c = bb_src.clone();
474                            if !em_src.defer(move |c| {
475                                if let Some(i) = st_c.lock().inner_id.take() {
476                                    bb_c.retain_handle(h);
477                                    c.error(i, h);
478                                }
479                                c.error(pid, h);
480                            }) {
481                                bb_src.release_handle(h);
482                            }
483                        } else {
484                            // COMPLETE — complete current window, then self.
485                            let st_c = st.clone();
486                            let _ = em_src.defer(move |c| {
487                                if let Some(i) = st_c.lock().inner_id.take() {
488                                    c.complete(i);
489                                }
490                                c.complete(pid);
491                            });
492                        }
493                        break;
494                    }
495                    _ => {}
496                }
497            }
498        });
499
500        let src_outcome = ctx.subscribe_to(source, source_sink);
501        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
502            let mut s = state.lock();
503            if !s.terminated {
504                s.terminated = true;
505                let inner = s.inner_id.take();
506                drop(s);
507                if let Some(inner_id) = inner {
508                    core_s.complete_or_defer(inner_id);
509                }
510                core_s.complete_or_defer(pid);
511                return;
512            }
513        }
514
515        // --- notifier sink ---
516        // D234: the window roll (complete old → create new → swap
517        // `inner_id` → emit new) is ONE owner-side defer closure, so it
518        // is atomic w.r.t. the FIFO drain and `create_window_node` (a
519        // `CoreFull::register_state`) runs in-wave. A roll posted before
520        // a later source-forward defer is applied first ⇒ that forward's
521        // in-closure `inner_id` read sees the new window. The `new_id`
522        // is consumed entirely inside the closure (drives captured
523        // state); a Core-gone `defer` drops it unrun with nothing
524        // created/retained yet (no leak — that is why the create lives
525        // in the closure, not before it).
526        let st2 = state.clone();
527        let em_not = em.clone();
528        let bb_not: Arc<dyn BindingBoundary> = binding_s.clone();
529        let notifier_sink: Sink = Arc::new(move |msgs| {
530            for m in msgs {
531                // QA P1: per-message terminal gate.
532                if st2.lock().terminated {
533                    break;
534                }
535                match m.tier() {
536                    3 if m.payload_handle().is_some() => {
537                        let st_c = st2.clone();
538                        let bb_c = bb_not.clone();
539                        let _ = em_not.defer(move |c| {
540                            let (new_id, new_handle) = create_window_node(c, &*bb_c);
541                            let old_inner = st_c.lock().inner_id.replace(new_id);
542                            // Degenerate `None` (unreachable when not
543                            // terminated): there is no prior window to
544                            // complete; just emit the new one.
545                            if let Some(old) = old_inner {
546                                c.complete(old);
547                            }
548                            c.emit(pid, new_handle);
549                        });
550                    }
551                    5 => {
552                        if let Some(h) = m.payload_handle() {
553                            // Notifier ERROR — error current window + self.
554                            // QA P1: atomic terminal-winner vs the source
555                            // sink's COMPLETE/ERROR (no double-terminal).
556                            let was = std::mem::replace(&mut st2.lock().terminated, true);
557                            if was {
558                                break;
559                            }
560                            bb_not.retain_handle(h);
561                            let st_c = st2.clone();
562                            let bb_c = bb_not.clone();
563                            if !em_not.defer(move |c| {
564                                if let Some(inner) = st_c.lock().inner_id.take() {
565                                    bb_c.retain_handle(h);
566                                    c.error(inner, h);
567                                }
568                                c.error(pid, h);
569                            }) {
570                                bb_not.release_handle(h);
571                            }
572                            break;
573                        }
574                        // Notifier COMPLETE: do NOT auto-complete.
575                    }
576                    _ => {}
577                }
578            }
579        });
580
581        let _ = ctx.subscribe_to(notifier, notifier_sink);
582    });
583
584    let fn_id = binding.register_producer_build(build);
585    core.register_producer(fn_id)
586        .expect("window: register_producer failed")
587}
588
589/// Create a new inner window state node and return `(NodeId, HandleId)`.
590/// The `HandleId` represents the inner node as a value (via `intern_node`).
591fn create_window_node(
592    core: &dyn graphrefly_core::CoreFull,
593    binding: &dyn BindingBoundary,
594) -> (NodeId, HandleId) {
595    let inner_id = core
596        .register_state(graphrefly_core::NO_HANDLE, false)
597        .expect("window: register_state for inner node failed");
598    let handle = binding.intern_node(inner_id);
599    (inner_id, handle)
600}
601
602// =========================================================================
603// window_count(source, count) — count-based sub-node splitting
604// =========================================================================
605
606/// Per-window_count-node shared state.
607struct WindowCountState {
608    /// The current inner window's NodeId.
609    inner_id: Option<NodeId>,
610    /// Items forwarded to the current window.
611    counter: usize,
612    terminated: bool,
613}
614
615impl WindowCountState {
616    fn new() -> Self {
617        Self {
618            inner_id: None,
619            counter: 0,
620            terminated: false,
621        }
622    }
623}
624
625/// Like [`window`] but closes window every `count` DATA items.
626///
627/// - On activation: create first window, emit it.
628/// - Source DATA: forward to current window + increment counter. When
629///   counter hits `count`, complete current window, create new one, emit
630///   it, reset counter.
631/// - Source COMPLETE: complete current window (even if < count items),
632///   complete self.
633/// - Source ERROR: error current window, error self.
634///
635/// # Panics
636///
637/// Panics if `count` is 0.
638#[must_use]
639#[allow(clippy::too_many_lines)]
640pub fn window_count(
641    core: &Core,
642    binding: &Arc<dyn ProducerBinding>,
643    source: NodeId,
644    count: usize,
645) -> NodeId {
646    assert!(count > 0, "window_count: count must be > 0");
647
648    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
649        let core_s = ctx.core();
650        let binding_s = ctx.core().binding();
651        let em = ctx.emitter();
652        let pid = ctx.node_id();
653        let state: Arc<Mutex<WindowCountState>> = Arc::new(Mutex::new(WindowCountState::new()));
654        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
655
656        // Create first inner window and emit it.
657        let first_inner = create_window_node(core_s, &*bb);
658        {
659            let mut s = state.lock();
660            s.inner_id = Some(first_inner.0);
661            s.counter = 0;
662        }
663        core_s.emit_or_defer(pid, first_inner.1);
664
665        // --- source sink ---
666        // D234: forward + count + window-roll are ONE owner-side defer
667        // per source-DATA. Defer closures run serially in FIFO drain
668        // order, so the counter increment, the threshold check, the
669        // `create_window_node` (a `CoreFull::register_state`), the
670        // `inner_id` swap, the old-window complete and the new-window
671        // emit are all consistent — equivalent to the old
672        // synchronous-under-lock path, just relocated owner-side. The
673        // operator `Mutex` is dropped before every `CoreFull` call
674        // (no operator-lock held across Core re-entry); it is safe to
675        // re-lock because no other defer runs concurrently (serial drain).
676        let st = state.clone();
677        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
678        let em_src = em.clone();
679        let source_sink: Sink = Arc::new(move |msgs| {
680            for m in msgs {
681                // QA P1: per-message terminal gate.
682                if st.lock().terminated {
683                    break;
684                }
685                match m.tier() {
686                    3 => {
687                        if let Some(h) = m.payload_handle() {
688                            bb_src.retain_handle(h);
689                            let st_c = st.clone();
690                            let bb_c = bb_src.clone();
691                            if !em_src.defer(move |c| {
692                                let (inner, roll) = {
693                                    let mut s = st_c.lock();
694                                    match s.inner_id {
695                                        Some(inner) => {
696                                            s.counter += 1;
697                                            let roll = s.counter == count;
698                                            (Some(inner), roll)
699                                        }
700                                        None => (None, false),
701                                    }
702                                };
703                                let Some(inner) = inner else {
704                                    bb_c.release_handle(h);
705                                    return;
706                                };
707                                c.emit(inner, h);
708                                if roll {
709                                    let (new_id, new_handle) = create_window_node(c, &*bb_c);
710                                    {
711                                        let mut s = st_c.lock();
712                                        s.inner_id = Some(new_id);
713                                        s.counter = 0;
714                                    }
715                                    c.complete(inner);
716                                    c.emit(pid, new_handle);
717                                }
718                            }) {
719                                bb_src.release_handle(h);
720                            }
721                        }
722                    }
723                    5 => {
724                        // QA P1: atomic terminal-winner.
725                        let was = std::mem::replace(&mut st.lock().terminated, true);
726                        if was {
727                            break;
728                        }
729                        if let Some(h) = m.payload_handle() {
730                            // ERROR — error current window + self.
731                            bb_src.retain_handle(h);
732                            let st_c = st.clone();
733                            let bb_c = bb_src.clone();
734                            if !em_src.defer(move |c| {
735                                if let Some(inner) = st_c.lock().inner_id.take() {
736                                    bb_c.retain_handle(h);
737                                    c.error(inner, h);
738                                }
739                                c.error(pid, h);
740                            }) {
741                                bb_src.release_handle(h);
742                            }
743                        } else {
744                            // COMPLETE.
745                            let st_c = st.clone();
746                            let _ = em_src.defer(move |c| {
747                                if let Some(inner) = st_c.lock().inner_id.take() {
748                                    c.complete(inner);
749                                }
750                                c.complete(pid);
751                            });
752                        }
753                        break;
754                    }
755                    _ => {}
756                }
757            }
758        });
759
760        let outcome = ctx.subscribe_to(source, source_sink);
761        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
762            let mut s = state.lock();
763            if !s.terminated {
764                s.terminated = true;
765                let inner = s.inner_id.take();
766                drop(s);
767                if let Some(inner_id) = inner {
768                    core_s.complete_or_defer(inner_id);
769                }
770                core_s.complete_or_defer(pid);
771            }
772        }
773    });
774
775    let fn_id = binding.register_producer_build(build);
776    core.register_producer(fn_id)
777        .expect("window_count: register_producer failed")
778}