Skip to main content

graphrefly_operators/
buffer.rs

//! Buffer operators — collect and batch reactive values.
//!
//! # Operators
//!
//! - [`buffer`] — notifier-triggered flush of accumulated DATA handles.
//! - [`buffer_count`] — fixed-count flush.
//! - [`window`] — notifier-triggered sub-node splitting.
//! - [`window_count`] — count-based sub-node splitting.
9
10use std::sync::{Arc, Weak};
11
12use parking_lot::Mutex;
13
14use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
15use smallvec::SmallVec;
16
17use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
18
// =========================================================================
// buffer(source, notifier) — notifier-triggered flush
// =========================================================================
22
23/// Per-buffer-node shared state.
24struct BufferState {
25    buf: Vec<HandleId>,
26    terminated: bool,
27    source_completed: bool,
28}
29
30impl BufferState {
31    fn new() -> Self {
32        Self {
33            buf: Vec::new(),
34            terminated: false,
35            source_completed: false,
36        }
37    }
38}
39
40/// Buffers source DATA handles; flushes as a packed array when notifier
41/// emits DATA.
42///
43/// - Source DATA: retain + push to buffer.
44/// - Notifier DATA: pack buffer via `pack_tuple`, emit, clear, release
45///   component handles.
46/// - Source COMPLETE: flush remaining buffer (if non-empty), then complete.
47/// - Either ERROR: terminate immediately, release buffered handles.
48/// - Notifier COMPLETE: does NOT auto-complete (keep buffering; source
49///   COMPLETE triggers final flush).
50#[must_use]
51#[allow(clippy::too_many_lines)]
52pub fn buffer(
53    core: &Core,
54    binding: &Arc<dyn ProducerBinding>,
55    source: NodeId,
56    notifier: NodeId,
57    pack_fn_id: FnId,
58) -> NodeId {
59    let core_weak = core.weak_handle();
60    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
61
62    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
63        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
64            return;
65        };
66        let pid = ctx.node_id();
67        let state: Arc<Mutex<BufferState>> = Arc::new(Mutex::new(BufferState::new()));
68
69        // --- source sink ---
70        let st = state.clone();
71        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
72        let core_src = core_s.clone();
73        let source_sink: Sink = Arc::new(move |msgs| {
74            enum Act {
75                Flush(Vec<HandleId>),
76                Complete,
77                Error(HandleId),
78                Release(Vec<HandleId>),
79            }
80            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
81            {
82                let mut s = st.lock();
83                if s.terminated {
84                    return;
85                }
86                for m in msgs {
87                    if s.terminated {
88                        break;
89                    }
90                    match m.tier() {
91                        3 => {
92                            if let Some(h) = m.payload_handle() {
93                                bb.retain_handle(h);
94                                s.buf.push(h);
95                            }
96                        }
97                        5 => {
98                            if let Some(h) = m.payload_handle() {
99                                // ERROR — terminate, release buffered
100                                s.terminated = true;
101                                bb.retain_handle(h);
102                                let to_release: Vec<HandleId> = s.buf.drain(..).collect();
103                                actions.push(Act::Release(to_release));
104                                actions.push(Act::Error(h));
105                            } else {
106                                // COMPLETE — flush remainder, then complete
107                                s.source_completed = true;
108                                s.terminated = true;
109                                if !s.buf.is_empty() {
110                                    let flushed: Vec<HandleId> = s.buf.drain(..).collect();
111                                    actions.push(Act::Flush(flushed));
112                                }
113                                actions.push(Act::Complete);
114                            }
115                        }
116                        _ => {}
117                    }
118                }
119            }
120            for a in actions {
121                match a {
122                    Act::Flush(handles) => {
123                        let packed = bb.pack_tuple(pack_fn_id, &handles);
124                        for h in &handles {
125                            bb.release_handle(*h);
126                        }
127                        core_src.emit_or_defer(pid, packed);
128                    }
129                    Act::Complete => core_src.complete_or_defer(pid),
130                    Act::Error(h) => core_src.error_or_defer(pid, h),
131                    Act::Release(handles) => {
132                        for h in handles {
133                            bb.release_handle(h);
134                        }
135                    }
136                }
137            }
138        });
139
140        let src_outcome = ctx.subscribe_to(source, source_sink);
141        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
142            let mut s = state.lock();
143            if !s.terminated {
144                s.terminated = true;
145                s.source_completed = true;
146                // No buffered data yet at activation time.
147                drop(s);
148                core_s.complete_or_defer(pid);
149                return;
150            }
151        }
152
153        // --- notifier sink ---
154        let st2 = state.clone();
155        let core_n = core_s.clone();
156        let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
157        let notifier_sink: Sink = Arc::new(move |msgs| {
158            enum Act {
159                Flush(Vec<HandleId>),
160                Error(HandleId),
161                Release(Vec<HandleId>),
162            }
163            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
164            {
165                let mut s = st2.lock();
166                if s.terminated {
167                    return;
168                }
169                for m in msgs {
170                    if s.terminated {
171                        break;
172                    }
173                    match m.tier() {
174                        3 if m.payload_handle().is_some() && !s.buf.is_empty() => {
175                            let flushed: Vec<HandleId> = s.buf.drain(..).collect();
176                            actions.push(Act::Flush(flushed));
177                        }
178                        5 => {
179                            if let Some(h) = m.payload_handle() {
180                                // Notifier ERROR — terminate, release buffered
181                                s.terminated = true;
182                                bb2.retain_handle(h);
183                                let to_release: Vec<HandleId> = s.buf.drain(..).collect();
184                                actions.push(Act::Release(to_release));
185                                actions.push(Act::Error(h));
186                            }
187                            // Notifier COMPLETE: do NOT auto-complete.
188                        }
189                        _ => {}
190                    }
191                }
192            }
193            for a in actions {
194                match a {
195                    Act::Flush(handles) => {
196                        let packed = bb2.pack_tuple(pack_fn_id, &handles);
197                        for h in &handles {
198                            bb2.release_handle(*h);
199                        }
200                        core_n.emit_or_defer(pid, packed);
201                    }
202                    Act::Error(h) => core_n.error_or_defer(pid, h),
203                    Act::Release(handles) => {
204                        for h in handles {
205                            bb2.release_handle(h);
206                        }
207                    }
208                }
209            }
210        });
211
212        let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
213        // Dead notifier: ignore — source COMPLETE will handle final flush.
214        let _ = not_outcome;
215    });
216
217    let fn_id = binding.register_producer_build(build);
218    core.register_producer(fn_id)
219        .expect("buffer: register_producer failed")
220}
221
// =========================================================================
// buffer_count(source, count) — fixed-size buffer
// =========================================================================
225
226/// Per-buffer_count-node shared state.
227struct BufferCountState {
228    buf: Vec<HandleId>,
229    terminated: bool,
230}
231
232impl BufferCountState {
233    fn new() -> Self {
234        Self {
235            buf: Vec::new(),
236            terminated: false,
237        }
238    }
239}
240
241/// Fixed-size buffer. Accumulate DATA handles; emit packed array every
242/// `count` items.
243///
244/// - Source DATA: retain + push. When `buf.len() == count`, pack + emit + clear.
245/// - Source COMPLETE: flush remainder (may be < count), then complete.
246/// - Source ERROR: terminate, release buffered.
247///
248/// # Panics
249///
250/// Panics if `count` is 0.
251#[must_use]
252pub fn buffer_count(
253    core: &Core,
254    binding: &Arc<dyn ProducerBinding>,
255    source: NodeId,
256    count: usize,
257    pack_fn_id: FnId,
258) -> NodeId {
259    assert!(count > 0, "buffer_count: count must be > 0");
260
261    let core_weak = core.weak_handle();
262    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
263
264    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
265        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
266            return;
267        };
268        let pid = ctx.node_id();
269        let state: Arc<Mutex<BufferCountState>> = Arc::new(Mutex::new(BufferCountState::new()));
270
271        let st = state.clone();
272        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
273        let core_src = core_s.clone();
274        let source_sink: Sink = Arc::new(move |msgs| {
275            enum Act {
276                Flush(Vec<HandleId>),
277                Complete,
278                Error(HandleId),
279                Release(Vec<HandleId>),
280            }
281            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
282            {
283                let mut s = st.lock();
284                if s.terminated {
285                    return;
286                }
287                for m in msgs {
288                    if s.terminated {
289                        break;
290                    }
291                    match m.tier() {
292                        3 => {
293                            if let Some(h) = m.payload_handle() {
294                                bb.retain_handle(h);
295                                s.buf.push(h);
296                                if s.buf.len() == count {
297                                    let flushed: Vec<HandleId> = s.buf.drain(..).collect();
298                                    actions.push(Act::Flush(flushed));
299                                }
300                            }
301                        }
302                        5 => {
303                            if let Some(h) = m.payload_handle() {
304                                // ERROR
305                                s.terminated = true;
306                                bb.retain_handle(h);
307                                let to_release: Vec<HandleId> = s.buf.drain(..).collect();
308                                actions.push(Act::Release(to_release));
309                                actions.push(Act::Error(h));
310                            } else {
311                                // COMPLETE — flush remainder
312                                s.terminated = true;
313                                if !s.buf.is_empty() {
314                                    let flushed: Vec<HandleId> = s.buf.drain(..).collect();
315                                    actions.push(Act::Flush(flushed));
316                                }
317                                actions.push(Act::Complete);
318                            }
319                        }
320                        _ => {}
321                    }
322                }
323            }
324            for a in actions {
325                match a {
326                    Act::Flush(handles) => {
327                        let packed = bb.pack_tuple(pack_fn_id, &handles);
328                        for h in &handles {
329                            bb.release_handle(*h);
330                        }
331                        core_src.emit_or_defer(pid, packed);
332                    }
333                    Act::Complete => core_src.complete_or_defer(pid),
334                    Act::Error(h) => core_src.error_or_defer(pid, h),
335                    Act::Release(handles) => {
336                        for h in handles {
337                            bb.release_handle(h);
338                        }
339                    }
340                }
341            }
342        });
343
344        let outcome = ctx.subscribe_to(source, source_sink);
345        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
346            let mut s = state.lock();
347            if !s.terminated {
348                s.terminated = true;
349                drop(s);
350                core_s.complete_or_defer(pid);
351            }
352        }
353    });
354
355    let fn_id = binding.register_producer_build(build);
356    core.register_producer(fn_id)
357        .expect("buffer_count: register_producer failed")
358}
359
// =========================================================================
// window(source, notifier) — notifier-triggered sub-node splitting
// =========================================================================
363
364/// Per-window-node shared state.
365struct WindowState {
366    /// The current inner window's NodeId.
367    inner_id: Option<NodeId>,
368    terminated: bool,
369}
370
371impl WindowState {
372    fn new() -> Self {
373        Self {
374            inner_id: None,
375            terminated: false,
376        }
377    }
378}
379
380/// Splits source into sub-nodes triggered by notifier. Each "window" is
381/// a fresh state node created via `Core::register_state()`. The operator
382/// emits the inner node's `NodeId` as a handle via
383/// `binding.intern_node(inner_id)`.
384///
385/// - On activation: create first inner window node, emit it.
386/// - Source DATA: forward to current inner window via
387///   `core.emit_or_defer(inner_id, handle)`.
388/// - Notifier DATA: complete current window, create new window, emit it.
389/// - Source COMPLETE: complete current window, then complete self.
390/// - Either ERROR: error current window + error self.
391/// - Notifier COMPLETE: do NOT auto-complete.
392#[must_use]
393#[allow(clippy::too_many_lines)]
394pub fn window(
395    core: &Core,
396    binding: &Arc<dyn ProducerBinding>,
397    source: NodeId,
398    notifier: NodeId,
399) -> NodeId {
400    let core_weak = core.weak_handle();
401    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
402
403    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
404        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
405            return;
406        };
407        let pid = ctx.node_id();
408        let state: Arc<Mutex<WindowState>> = Arc::new(Mutex::new(WindowState::new()));
409        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
410
411        // Create first inner window node and emit it.
412        let first_inner = create_window_node(&core_s, &*bb);
413        {
414            let mut s = state.lock();
415            s.inner_id = Some(first_inner.0);
416        }
417        core_s.emit_or_defer(pid, first_inner.1);
418
419        // --- source sink ---
420        let st = state.clone();
421        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
422        let core_src = core_s.clone();
423        let source_sink: Sink = Arc::new(move |msgs| {
424            enum Act {
425                Forward(NodeId, HandleId),
426                CompleteInner(NodeId),
427                CompleteSelf,
428                ErrorInner(NodeId, HandleId),
429                ErrorSelf(HandleId),
430            }
431            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
432            {
433                let mut s = st.lock();
434                if s.terminated {
435                    return;
436                }
437                for m in msgs {
438                    if s.terminated {
439                        break;
440                    }
441                    match m.tier() {
442                        3 => {
443                            if let Some(h) = m.payload_handle() {
444                                bb_src.retain_handle(h);
445                                if let Some(inner) = s.inner_id {
446                                    actions.push(Act::Forward(inner, h));
447                                } else {
448                                    bb_src.release_handle(h);
449                                }
450                            }
451                        }
452                        5 => {
453                            if let Some(h) = m.payload_handle() {
454                                // ERROR — error current window + error self
455                                s.terminated = true;
456                                bb_src.retain_handle(h);
457                                if let Some(inner) = s.inner_id.take() {
458                                    // Retain again for inner error
459                                    bb_src.retain_handle(h);
460                                    actions.push(Act::ErrorInner(inner, h));
461                                }
462                                actions.push(Act::ErrorSelf(h));
463                            } else {
464                                // COMPLETE — complete current window, then self
465                                s.terminated = true;
466                                if let Some(inner) = s.inner_id.take() {
467                                    actions.push(Act::CompleteInner(inner));
468                                }
469                                actions.push(Act::CompleteSelf);
470                            }
471                        }
472                        _ => {}
473                    }
474                }
475            }
476            for a in actions {
477                match a {
478                    Act::Forward(inner, h) => core_src.emit_or_defer(inner, h),
479                    Act::CompleteInner(inner) => core_src.complete_or_defer(inner),
480                    Act::CompleteSelf => core_src.complete_or_defer(pid),
481                    Act::ErrorInner(inner, h) => core_src.error_or_defer(inner, h),
482                    Act::ErrorSelf(h) => core_src.error_or_defer(pid, h),
483                }
484            }
485        });
486
487        let src_outcome = ctx.subscribe_to(source, source_sink);
488        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
489            let mut s = state.lock();
490            if !s.terminated {
491                s.terminated = true;
492                let inner = s.inner_id.take();
493                drop(s);
494                if let Some(inner_id) = inner {
495                    core_s.complete_or_defer(inner_id);
496                }
497                core_s.complete_or_defer(pid);
498                return;
499            }
500        }
501
502        // --- notifier sink ---
503        let st2 = state.clone();
504        let core_n = core_s.clone();
505        let bb_not: Arc<dyn BindingBoundary> = binding_s.clone();
506        let notifier_sink: Sink = Arc::new(move |msgs| {
507            enum Act {
508                CompleteOldEmitNew(NodeId, HandleId),
509                ErrorInner(NodeId, HandleId),
510                ErrorSelf(HandleId),
511            }
512            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
513            {
514                let mut s = st2.lock();
515                if s.terminated {
516                    return;
517                }
518                for m in msgs {
519                    if s.terminated {
520                        break;
521                    }
522                    match m.tier() {
523                        3 if m.payload_handle().is_some() => {
524                            // Complete current window, create new one, emit it.
525                            let old_inner = s.inner_id.take();
526                            let (new_id, new_handle) = create_window_node(&core_n, &*bb_not);
527                            s.inner_id = Some(new_id);
528                            if let Some(old) = old_inner {
529                                actions.push(Act::CompleteOldEmitNew(old, new_handle));
530                            } else {
531                                // Degenerate: inner_id was None (unreachable if
532                                // not terminated). Emit the new window directly.
533                                actions.push(Act::CompleteOldEmitNew(new_id, new_handle));
534                            }
535                        }
536                        5 => {
537                            if let Some(h) = m.payload_handle() {
538                                // Notifier ERROR — error current window + error self
539                                s.terminated = true;
540                                bb_not.retain_handle(h);
541                                if let Some(inner) = s.inner_id.take() {
542                                    bb_not.retain_handle(h);
543                                    actions.push(Act::ErrorInner(inner, h));
544                                }
545                                actions.push(Act::ErrorSelf(h));
546                            }
547                            // Notifier COMPLETE: do NOT auto-complete.
548                        }
549                        _ => {}
550                    }
551                }
552            }
553            for a in actions {
554                match a {
555                    Act::CompleteOldEmitNew(old, new_handle) => {
556                        core_n.complete_or_defer(old);
557                        core_n.emit_or_defer(pid, new_handle);
558                    }
559                    Act::ErrorInner(inner, h) => core_n.error_or_defer(inner, h),
560                    Act::ErrorSelf(h) => core_n.error_or_defer(pid, h),
561                }
562            }
563        });
564
565        let _ = ctx.subscribe_to(notifier, notifier_sink);
566    });
567
568    let fn_id = binding.register_producer_build(build);
569    core.register_producer(fn_id)
570        .expect("window: register_producer failed")
571}
572
573/// Create a new inner window state node and return `(NodeId, HandleId)`.
574/// The `HandleId` represents the inner node as a value (via `intern_node`).
575fn create_window_node(core: &Core, binding: &dyn BindingBoundary) -> (NodeId, HandleId) {
576    let inner_id = core
577        .register_state(graphrefly_core::NO_HANDLE, false)
578        .expect("window: register_state for inner node failed");
579    let handle = binding.intern_node(inner_id);
580    (inner_id, handle)
581}
582
// =========================================================================
// window_count(source, count) — count-based sub-node splitting
// =========================================================================
586
587/// Per-window_count-node shared state.
588struct WindowCountState {
589    /// The current inner window's NodeId.
590    inner_id: Option<NodeId>,
591    /// Items forwarded to the current window.
592    counter: usize,
593    terminated: bool,
594}
595
596impl WindowCountState {
597    fn new() -> Self {
598        Self {
599            inner_id: None,
600            counter: 0,
601            terminated: false,
602        }
603    }
604}
605
606/// Like [`window`] but closes window every `count` DATA items.
607///
608/// - On activation: create first window, emit it.
609/// - Source DATA: forward to current window + increment counter. When
610///   counter hits `count`, complete current window, create new one, emit
611///   it, reset counter.
612/// - Source COMPLETE: complete current window (even if < count items),
613///   complete self.
614/// - Source ERROR: error current window, error self.
615///
616/// # Panics
617///
618/// Panics if `count` is 0.
619#[must_use]
620#[allow(clippy::too_many_lines)]
621pub fn window_count(
622    core: &Core,
623    binding: &Arc<dyn ProducerBinding>,
624    source: NodeId,
625    count: usize,
626) -> NodeId {
627    assert!(count > 0, "window_count: count must be > 0");
628
629    let core_weak = core.weak_handle();
630    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
631
632    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
633        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
634            return;
635        };
636        let pid = ctx.node_id();
637        let state: Arc<Mutex<WindowCountState>> = Arc::new(Mutex::new(WindowCountState::new()));
638        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
639
640        // Create first inner window and emit it.
641        let first_inner = create_window_node(&core_s, &*bb);
642        {
643            let mut s = state.lock();
644            s.inner_id = Some(first_inner.0);
645            s.counter = 0;
646        }
647        core_s.emit_or_defer(pid, first_inner.1);
648
649        // --- source sink ---
650        let st = state.clone();
651        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
652        let core_src = core_s.clone();
653        let source_sink: Sink = Arc::new(move |msgs| {
654            enum Act {
655                Forward(NodeId, HandleId),
656                CompleteWindowEmitNew(NodeId, HandleId),
657                CompleteInner(NodeId),
658                CompleteSelf,
659                ErrorInner(NodeId, HandleId),
660                ErrorSelf(HandleId),
661            }
662            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
663            {
664                let mut s = st.lock();
665                if s.terminated {
666                    return;
667                }
668                for m in msgs {
669                    if s.terminated {
670                        break;
671                    }
672                    match m.tier() {
673                        3 => {
674                            if let Some(h) = m.payload_handle() {
675                                bb_src.retain_handle(h);
676                                if let Some(inner) = s.inner_id {
677                                    actions.push(Act::Forward(inner, h));
678                                    s.counter += 1;
679                                    if s.counter == count {
680                                        // Window full — complete it, open new one.
681                                        let (new_id, new_handle) =
682                                            create_window_node(&core_src, &*bb_src);
683                                        actions.push(Act::CompleteWindowEmitNew(inner, new_handle));
684                                        s.inner_id = Some(new_id);
685                                        s.counter = 0;
686                                    }
687                                } else {
688                                    bb_src.release_handle(h);
689                                }
690                            }
691                        }
692                        5 => {
693                            if let Some(h) = m.payload_handle() {
694                                // ERROR — error current window + self
695                                s.terminated = true;
696                                bb_src.retain_handle(h);
697                                if let Some(inner) = s.inner_id.take() {
698                                    bb_src.retain_handle(h);
699                                    actions.push(Act::ErrorInner(inner, h));
700                                }
701                                actions.push(Act::ErrorSelf(h));
702                            } else {
703                                // COMPLETE
704                                s.terminated = true;
705                                if let Some(inner) = s.inner_id.take() {
706                                    actions.push(Act::CompleteInner(inner));
707                                }
708                                actions.push(Act::CompleteSelf);
709                            }
710                        }
711                        _ => {}
712                    }
713                }
714            }
715            for a in actions {
716                match a {
717                    Act::Forward(inner, h) => core_src.emit_or_defer(inner, h),
718                    Act::CompleteWindowEmitNew(old_inner, new_handle) => {
719                        core_src.complete_or_defer(old_inner);
720                        core_src.emit_or_defer(pid, new_handle);
721                    }
722                    Act::CompleteInner(inner) => core_src.complete_or_defer(inner),
723                    Act::CompleteSelf => core_src.complete_or_defer(pid),
724                    Act::ErrorInner(inner, h) => core_src.error_or_defer(inner, h),
725                    Act::ErrorSelf(h) => core_src.error_or_defer(pid, h),
726                }
727            }
728        });
729
730        let outcome = ctx.subscribe_to(source, source_sink);
731        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
732            let mut s = state.lock();
733            if !s.terminated {
734                s.terminated = true;
735                let inner = s.inner_id.take();
736                drop(s);
737                if let Some(inner_id) = inner {
738                    core_s.complete_or_defer(inner_id);
739                }
740                core_s.complete_or_defer(pid);
741            }
742        }
743    });
744
745    let fn_id = binding.register_producer_build(build);
746    core.register_producer(fn_id)
747        .expect("window_count: register_producer failed")
748}