Skip to main content

graphrefly_operators/
buffer.rs

1//! Buffer operators — collect and batch reactive values.
2//!
3//! # Operators
4//!
5//! - [`buffer`] — notifier-triggered flush of accumulated DATA handles.
6//! - [`buffer_count`] — fixed-count flush.
7//! - [`window`] — notifier-triggered sub-node splitting.
8//! - [`window_count`] — count-based sub-node splitting.
9
10use std::cell::RefCell;
11use std::rc::Rc;
12use std::sync::Arc;
13
14use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
15use smallvec::SmallVec;
16
17use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
18
19// =========================================================================
20// buffer(source, notifier) — notifier-triggered flush
21// =========================================================================
22
23/// Per-buffer-node shared state.
24struct BufferState {
25    buf: Vec<HandleId>,
26    terminated: bool,
27    source_completed: bool,
28}
29
30impl BufferState {
31    fn new() -> Self {
32        Self {
33            buf: Vec::new(),
34            terminated: false,
35            source_completed: false,
36        }
37    }
38}
39
40/// Buffers source DATA handles; flushes as a packed array when notifier
41/// emits DATA.
42///
43/// - Source DATA: retain + push to buffer.
44/// - Notifier DATA: pack buffer via `pack_tuple`, emit, clear, release
45///   component handles.
46/// - Source COMPLETE: flush remaining buffer (if non-empty), then complete.
47/// - Either ERROR: terminate immediately, release buffered handles.
48/// - Notifier COMPLETE: does NOT auto-complete (keep buffering; source
49///   COMPLETE triggers final flush).
50#[must_use]
51#[allow(clippy::too_many_lines)]
52pub fn buffer(
53    core: &Core,
54    binding: &Arc<dyn ProducerBinding>,
55    source: NodeId,
56    notifier: NodeId,
57    pack_fn_id: FnId,
58) -> NodeId {
59    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
60        let core_s = ctx.core();
61        let binding_s = ctx.core().binding();
62        let em = ctx.emitter();
63        let pid = ctx.node_id();
64        let state: Rc<RefCell<BufferState>> = Rc::new(RefCell::new(BufferState::new()));
65
66        // --- source sink ---
67        let st = state.clone();
68        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
69        let core_src = em.clone();
70        let source_sink: Sink = Rc::new(move |msgs| {
71            enum Act {
72                Flush(Vec<HandleId>),
73                Complete,
74                Error(HandleId),
75                Release(Vec<HandleId>),
76            }
77            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
78            {
79                let mut s = st.borrow_mut();
80                if s.terminated {
81                    return;
82                }
83                for m in msgs {
84                    if s.terminated {
85                        break;
86                    }
87                    match m.tier() {
88                        3 => {
89                            if let Some(h) = m.payload_handle() {
90                                bb.retain_handle(h);
91                                s.buf.push(h);
92                            }
93                        }
94                        5 => {
95                            if let Some(h) = m.payload_handle() {
96                                // ERROR — terminate, release buffered
97                                s.terminated = true;
98                                bb.retain_handle(h);
99                                let to_release: Vec<HandleId> = s.buf.drain(..).collect();
100                                actions.push(Act::Release(to_release));
101                                actions.push(Act::Error(h));
102                            } else {
103                                // COMPLETE — flush remainder, then complete
104                                s.source_completed = true;
105                                s.terminated = true;
106                                if !s.buf.is_empty() {
107                                    let flushed: Vec<HandleId> = s.buf.drain(..).collect();
108                                    actions.push(Act::Flush(flushed));
109                                }
110                                actions.push(Act::Complete);
111                            }
112                        }
113                        _ => {}
114                    }
115                }
116            }
117            for a in actions {
118                match a {
119                    Act::Flush(handles) => {
120                        let packed = bb.pack_tuple(pack_fn_id, &handles);
121                        for h in &handles {
122                            bb.release_handle(*h);
123                        }
124                        core_src.emit(pid, packed);
125                    }
126                    Act::Complete => core_src.complete(pid),
127                    Act::Error(h) => core_src.error(pid, h),
128                    Act::Release(handles) => {
129                        for h in handles {
130                            bb.release_handle(h);
131                        }
132                    }
133                }
134            }
135        });
136
137        let src_outcome = ctx.subscribe_to(source, source_sink);
138        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
139            let mut s = state.borrow_mut();
140            if !s.terminated {
141                s.terminated = true;
142                s.source_completed = true;
143                // No buffered data yet at activation time.
144                drop(s);
145                core_s.complete(pid);
146                return;
147            }
148        }
149
150        // --- notifier sink ---
151        let st2 = state.clone();
152        let core_n = em.clone();
153        let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
154        let notifier_sink: Sink = Rc::new(move |msgs| {
155            enum Act {
156                Flush(Vec<HandleId>),
157                Error(HandleId),
158                Release(Vec<HandleId>),
159            }
160            let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
161            {
162                let mut s = st2.borrow_mut();
163                if s.terminated {
164                    return;
165                }
166                for m in msgs {
167                    if s.terminated {
168                        break;
169                    }
170                    match m.tier() {
171                        3 if m.payload_handle().is_some() && !s.buf.is_empty() => {
172                            let flushed: Vec<HandleId> = s.buf.drain(..).collect();
173                            actions.push(Act::Flush(flushed));
174                        }
175                        5 => {
176                            if let Some(h) = m.payload_handle() {
177                                // Notifier ERROR — terminate, release buffered
178                                s.terminated = true;
179                                bb2.retain_handle(h);
180                                let to_release: Vec<HandleId> = s.buf.drain(..).collect();
181                                actions.push(Act::Release(to_release));
182                                actions.push(Act::Error(h));
183                            }
184                            // Notifier COMPLETE: do NOT auto-complete.
185                        }
186                        _ => {}
187                    }
188                }
189            }
190            for a in actions {
191                match a {
192                    Act::Flush(handles) => {
193                        let packed = bb2.pack_tuple(pack_fn_id, &handles);
194                        for h in &handles {
195                            bb2.release_handle(*h);
196                        }
197                        core_n.emit(pid, packed);
198                    }
199                    Act::Error(h) => core_n.error(pid, h),
200                    Act::Release(handles) => {
201                        for h in handles {
202                            bb2.release_handle(h);
203                        }
204                    }
205                }
206            }
207        });
208
209        let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
210        // Dead notifier: ignore — source COMPLETE will handle final flush.
211        let _ = not_outcome;
212    });
213
214    let fn_id = binding.register_producer_build(build);
215    core.register_producer(fn_id)
216        .expect("buffer: register_producer failed")
217}
218
219// =========================================================================
220// buffer_count(source, count) — fixed-size buffer
221// =========================================================================
222
223/// Per-buffer_count-node shared state.
224struct BufferCountState {
225    buf: Vec<HandleId>,
226    terminated: bool,
227}
228
229impl BufferCountState {
230    fn new() -> Self {
231        Self {
232            buf: Vec::new(),
233            terminated: false,
234        }
235    }
236}
237
238/// Fixed-size buffer. Accumulate DATA handles; emit packed array every
239/// `count` items.
240///
241/// - Source DATA: retain + push. When `buf.len() == count`, pack + emit + clear.
242/// - Source COMPLETE: flush remainder (may be < count), then complete.
243/// - Source ERROR: terminate, release buffered.
244///
245/// # Panics
246///
247/// Panics if `count` is 0.
248#[must_use]
249pub fn buffer_count(
250    core: &Core,
251    binding: &Arc<dyn ProducerBinding>,
252    source: NodeId,
253    count: usize,
254    pack_fn_id: FnId,
255) -> NodeId {
256    assert!(count > 0, "buffer_count: count must be > 0");
257
258    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
259        let core_s = ctx.core();
260        let binding_s = ctx.core().binding();
261        let em = ctx.emitter();
262        let pid = ctx.node_id();
263        let state: Rc<RefCell<BufferCountState>> = Rc::new(RefCell::new(BufferCountState::new()));
264
265        let st = state.clone();
266        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
267        let core_src = em.clone();
268        let source_sink: Sink = Rc::new(move |msgs| {
269            enum Act {
270                Flush(Vec<HandleId>),
271                Complete,
272                Error(HandleId),
273                Release(Vec<HandleId>),
274            }
275            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
276            {
277                let mut s = st.borrow_mut();
278                if s.terminated {
279                    return;
280                }
281                for m in msgs {
282                    if s.terminated {
283                        break;
284                    }
285                    match m.tier() {
286                        3 => {
287                            if let Some(h) = m.payload_handle() {
288                                bb.retain_handle(h);
289                                s.buf.push(h);
290                                if s.buf.len() == count {
291                                    let flushed: Vec<HandleId> = s.buf.drain(..).collect();
292                                    actions.push(Act::Flush(flushed));
293                                }
294                            }
295                        }
296                        5 => {
297                            if let Some(h) = m.payload_handle() {
298                                // ERROR
299                                s.terminated = true;
300                                bb.retain_handle(h);
301                                let to_release: Vec<HandleId> = s.buf.drain(..).collect();
302                                actions.push(Act::Release(to_release));
303                                actions.push(Act::Error(h));
304                            } else {
305                                // COMPLETE — flush remainder
306                                s.terminated = true;
307                                if !s.buf.is_empty() {
308                                    let flushed: Vec<HandleId> = s.buf.drain(..).collect();
309                                    actions.push(Act::Flush(flushed));
310                                }
311                                actions.push(Act::Complete);
312                            }
313                        }
314                        _ => {}
315                    }
316                }
317            }
318            for a in actions {
319                match a {
320                    Act::Flush(handles) => {
321                        let packed = bb.pack_tuple(pack_fn_id, &handles);
322                        for h in &handles {
323                            bb.release_handle(*h);
324                        }
325                        core_src.emit(pid, packed);
326                    }
327                    Act::Complete => core_src.complete(pid),
328                    Act::Error(h) => core_src.error(pid, h),
329                    Act::Release(handles) => {
330                        for h in handles {
331                            bb.release_handle(h);
332                        }
333                    }
334                }
335            }
336        });
337
338        let outcome = ctx.subscribe_to(source, source_sink);
339        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
340            let mut s = state.borrow_mut();
341            if !s.terminated {
342                s.terminated = true;
343                drop(s);
344                core_s.complete(pid);
345            }
346        }
347    });
348
349    let fn_id = binding.register_producer_build(build);
350    core.register_producer(fn_id)
351        .expect("buffer_count: register_producer failed")
352}
353
354// =========================================================================
355// window(source, notifier) — notifier-triggered sub-node splitting
356// =========================================================================
357
358/// Per-window-node shared state.
359struct WindowState {
360    /// The current inner window's NodeId.
361    inner_id: Option<NodeId>,
362    terminated: bool,
363}
364
365impl WindowState {
366    fn new() -> Self {
367        Self {
368            inner_id: None,
369            terminated: false,
370        }
371    }
372}
373
374/// Splits source into sub-nodes triggered by notifier. Each "window" is
375/// a fresh state node created via `Core::register_state()`. The operator
376/// emits the inner node's `NodeId` as a handle via
377/// `binding.intern_node(inner_id)`.
378///
379/// - On activation: create first inner window node, emit it.
380/// - Source DATA: forward to current inner window via
381///   `core.emit(inner_id, handle)`.
382/// - Notifier DATA: complete current window, create new window, emit it.
383/// - Source COMPLETE: complete current window, then complete self.
384/// - Either ERROR: error current window + error self.
385/// - Notifier COMPLETE: do NOT auto-complete.
386#[must_use]
387#[allow(clippy::too_many_lines)]
388pub fn window(
389    core: &Core,
390    binding: &Arc<dyn ProducerBinding>,
391    source: NodeId,
392    notifier: NodeId,
393) -> NodeId {
394    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
395        let core_s = ctx.core();
396        let binding_s = ctx.core().binding();
397        let em = ctx.emitter();
398        let pid = ctx.node_id();
399        let state: Rc<RefCell<WindowState>> = Rc::new(RefCell::new(WindowState::new()));
400        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
401
402        // Create first inner window node and emit it.
403        let first_inner = create_window_node(core_s, &*bb);
404        {
405            let mut s = state.borrow_mut();
406            s.inner_id = Some(first_inner.0);
407        }
408        core_s.emit(pid, first_inner.1);
409
410        // --- source sink ---
411        // D234: the inner-window selector `s.inner_id` MUST be read
412        // INSIDE the owner-serialized defer closures (never at sink-fire
413        // time) so a source-DATA firing between a notifier-DATA and its
414        // window-roll defer still routes to the correct window. Mailbox
415        // FIFO = arrival order ⇒ a roll posted before a forward is
416        // applied first, so the in-closure `inner_id` read sees the new
417        // window. `terminated` stays a fire-time monotonic gate; retains
418        // happen at fire time (Core owns the share for the emit) and are
419        // released if `em.defer` reports the Core gone (F2 contract).
420        let st = state.clone();
421        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
422        let em_src = em.clone();
423        let source_sink: Sink = Rc::new(move |msgs| {
424            for m in msgs {
425                // QA P1: per-message terminal gate (restores the retired
426                // `if s.terminated { break }`) — a batched
427                // `[COMPLETE, DATA]` must NOT forward DATA after the
428                // COMPLETE (R2.6.4 / no-data-after-terminal).
429                if st.borrow().terminated {
430                    break;
431                }
432                match m.tier() {
433                    3 => {
434                        if let Some(h) = m.payload_handle() {
435                            bb_src.retain_handle(h);
436                            let st_c = st.clone();
437                            let bb_c = bb_src.clone();
438                            if !em_src.defer(move |c| {
439                                let inner = st_c.borrow_mut().inner_id;
440                                match inner {
441                                    Some(i) => c.emit(i, h),
442                                    None => bb_c.release_handle(h),
443                                }
444                            }) {
445                                bb_src.release_handle(h);
446                            }
447                        }
448                    }
449                    5 => {
450                        // QA P1: atomic terminal-winner — only the first
451                        // terminal (across THIS sink and the notifier
452                        // sink) defers the terminal cascade. Without
453                        // this, source-COMPLETE and notifier-ERROR each
454                        // pass their fire-time check before either defer
455                        // runs → double terminal on `pid`.
456                        let was = std::mem::replace(&mut st.borrow_mut().terminated, true);
457                        if was {
458                            break;
459                        }
460                        if let Some(h) = m.payload_handle() {
461                            // ERROR — error current window + error self.
462                            bb_src.retain_handle(h);
463                            let st_c = st.clone();
464                            let bb_c = bb_src.clone();
465                            if !em_src.defer(move |c| {
466                                if let Some(i) = st_c.borrow_mut().inner_id.take() {
467                                    bb_c.retain_handle(h);
468                                    c.error(i, h);
469                                }
470                                c.error(pid, h);
471                            }) {
472                                bb_src.release_handle(h);
473                            }
474                        } else {
475                            // COMPLETE — complete current window, then self.
476                            let st_c = st.clone();
477                            let _ = em_src.defer(move |c| {
478                                if let Some(i) = st_c.borrow_mut().inner_id.take() {
479                                    c.complete(i);
480                                }
481                                c.complete(pid);
482                            });
483                        }
484                        break;
485                    }
486                    _ => {}
487                }
488            }
489        });
490
491        let src_outcome = ctx.subscribe_to(source, source_sink);
492        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
493            let mut s = state.borrow_mut();
494            if !s.terminated {
495                s.terminated = true;
496                let inner = s.inner_id.take();
497                drop(s);
498                if let Some(inner_id) = inner {
499                    core_s.complete(inner_id);
500                }
501                core_s.complete(pid);
502                return;
503            }
504        }
505
506        // --- notifier sink ---
507        // D234: the window roll (complete old → create new → swap
508        // `inner_id` → emit new) is ONE owner-side defer closure, so it
509        // is atomic w.r.t. the FIFO drain and `create_window_node` (a
510        // `CoreFull::register_state`) runs in-wave. A roll posted before
511        // a later source-forward defer is applied first ⇒ that forward's
512        // in-closure `inner_id` read sees the new window. The `new_id`
513        // is consumed entirely inside the closure (drives captured
514        // state); a Core-gone `defer` drops it unrun with nothing
515        // created/retained yet (no leak — that is why the create lives
516        // in the closure, not before it).
517        let st2 = state.clone();
518        let em_not = em.clone();
519        let bb_not: Arc<dyn BindingBoundary> = binding_s.clone();
520        let notifier_sink: Sink = Rc::new(move |msgs| {
521            for m in msgs {
522                // QA P1: per-message terminal gate.
523                if st2.borrow().terminated {
524                    break;
525                }
526                match m.tier() {
527                    3 if m.payload_handle().is_some() => {
528                        let st_c = st2.clone();
529                        let bb_c = bb_not.clone();
530                        let _ = em_not.defer(move |c| {
531                            let (new_id, new_handle) = create_window_node(c, &*bb_c);
532                            let old_inner = st_c.borrow_mut().inner_id.replace(new_id);
533                            // Degenerate `None` (unreachable when not
534                            // terminated): there is no prior window to
535                            // complete; just emit the new one.
536                            if let Some(old) = old_inner {
537                                c.complete(old);
538                            }
539                            c.emit(pid, new_handle);
540                        });
541                    }
542                    5 => {
543                        if let Some(h) = m.payload_handle() {
544                            // Notifier ERROR — error current window + self.
545                            // QA P1: atomic terminal-winner vs the source
546                            // sink's COMPLETE/ERROR (no double-terminal).
547                            let was = std::mem::replace(&mut st2.borrow_mut().terminated, true);
548                            if was {
549                                break;
550                            }
551                            bb_not.retain_handle(h);
552                            let st_c = st2.clone();
553                            let bb_c = bb_not.clone();
554                            if !em_not.defer(move |c| {
555                                if let Some(inner) = st_c.borrow_mut().inner_id.take() {
556                                    bb_c.retain_handle(h);
557                                    c.error(inner, h);
558                                }
559                                c.error(pid, h);
560                            }) {
561                                bb_not.release_handle(h);
562                            }
563                            break;
564                        }
565                        // Notifier COMPLETE: do NOT auto-complete.
566                    }
567                    _ => {}
568                }
569            }
570        });
571
572        let _ = ctx.subscribe_to(notifier, notifier_sink);
573    });
574
575    let fn_id = binding.register_producer_build(build);
576    core.register_producer(fn_id)
577        .expect("window: register_producer failed")
578}
579
580/// Create a new inner window state node and return `(NodeId, HandleId)`.
581/// The `HandleId` represents the inner node as a value (via `intern_node`).
582fn create_window_node(
583    core: &dyn graphrefly_core::CoreFull,
584    binding: &dyn BindingBoundary,
585) -> (NodeId, HandleId) {
586    let inner_id = core
587        .register_state(graphrefly_core::NO_HANDLE, false)
588        .expect("window: register_state for inner node failed");
589    let handle = binding.intern_node(inner_id);
590    (inner_id, handle)
591}
592
593// =========================================================================
594// window_count(source, count) — count-based sub-node splitting
595// =========================================================================
596
597/// Per-window_count-node shared state.
598struct WindowCountState {
599    /// The current inner window's NodeId.
600    inner_id: Option<NodeId>,
601    /// Items forwarded to the current window.
602    counter: usize,
603    terminated: bool,
604}
605
606impl WindowCountState {
607    fn new() -> Self {
608        Self {
609            inner_id: None,
610            counter: 0,
611            terminated: false,
612        }
613    }
614}
615
616/// Like [`window`] but closes window every `count` DATA items.
617///
618/// - On activation: create first window, emit it.
619/// - Source DATA: forward to current window + increment counter. When
620///   counter hits `count`, complete current window, create new one, emit
621///   it, reset counter.
622/// - Source COMPLETE: complete current window (even if < count items),
623///   complete self.
624/// - Source ERROR: error current window, error self.
625///
626/// # Panics
627///
628/// Panics if `count` is 0.
629#[must_use]
630#[allow(clippy::too_many_lines)]
631pub fn window_count(
632    core: &Core,
633    binding: &Arc<dyn ProducerBinding>,
634    source: NodeId,
635    count: usize,
636) -> NodeId {
637    assert!(count > 0, "window_count: count must be > 0");
638
639    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
640        let core_s = ctx.core();
641        let binding_s = ctx.core().binding();
642        let em = ctx.emitter();
643        let pid = ctx.node_id();
644        let state: Rc<RefCell<WindowCountState>> = Rc::new(RefCell::new(WindowCountState::new()));
645        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
646
647        // Create first inner window and emit it.
648        let first_inner = create_window_node(core_s, &*bb);
649        {
650            let mut s = state.borrow_mut();
651            s.inner_id = Some(first_inner.0);
652            s.counter = 0;
653        }
654        core_s.emit(pid, first_inner.1);
655
656        // --- source sink ---
657        // D234: forward + count + window-roll are ONE owner-side defer
658        // per source-DATA. Defer closures run serially in FIFO drain
659        // order, so the counter increment, the threshold check, the
660        // `create_window_node` (a `CoreFull::register_state`), the
661        // `inner_id` swap, the old-window complete and the new-window
662        // emit are all consistent — equivalent to the old
663        // synchronous-under-lock path, just relocated owner-side. The
664        // operator `Mutex` is dropped before every `CoreFull` call
665        // (no operator-lock held across Core re-entry); it is safe to
666        // re-lock because no other defer runs concurrently (serial drain).
667        let st = state.clone();
668        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
669        let em_src = em.clone();
670        let source_sink: Sink = Rc::new(move |msgs| {
671            for m in msgs {
672                // QA P1: per-message terminal gate.
673                if st.borrow().terminated {
674                    break;
675                }
676                match m.tier() {
677                    3 => {
678                        if let Some(h) = m.payload_handle() {
679                            bb_src.retain_handle(h);
680                            let st_c = st.clone();
681                            let bb_c = bb_src.clone();
682                            if !em_src.defer(move |c| {
683                                let (inner, roll) = {
684                                    let mut s = st_c.borrow_mut();
685                                    match s.inner_id {
686                                        Some(inner) => {
687                                            s.counter += 1;
688                                            let roll = s.counter == count;
689                                            (Some(inner), roll)
690                                        }
691                                        None => (None, false),
692                                    }
693                                };
694                                let Some(inner) = inner else {
695                                    bb_c.release_handle(h);
696                                    return;
697                                };
698                                c.emit(inner, h);
699                                if roll {
700                                    let (new_id, new_handle) = create_window_node(c, &*bb_c);
701                                    {
702                                        let mut s = st_c.borrow_mut();
703                                        s.inner_id = Some(new_id);
704                                        s.counter = 0;
705                                    }
706                                    c.complete(inner);
707                                    c.emit(pid, new_handle);
708                                }
709                            }) {
710                                bb_src.release_handle(h);
711                            }
712                        }
713                    }
714                    5 => {
715                        // QA P1: atomic terminal-winner.
716                        let was = std::mem::replace(&mut st.borrow_mut().terminated, true);
717                        if was {
718                            break;
719                        }
720                        if let Some(h) = m.payload_handle() {
721                            // ERROR — error current window + self.
722                            bb_src.retain_handle(h);
723                            let st_c = st.clone();
724                            let bb_c = bb_src.clone();
725                            if !em_src.defer(move |c| {
726                                if let Some(inner) = st_c.borrow_mut().inner_id.take() {
727                                    bb_c.retain_handle(h);
728                                    c.error(inner, h);
729                                }
730                                c.error(pid, h);
731                            }) {
732                                bb_src.release_handle(h);
733                            }
734                        } else {
735                            // COMPLETE.
736                            let st_c = st.clone();
737                            let _ = em_src.defer(move |c| {
738                                if let Some(inner) = st_c.borrow_mut().inner_id.take() {
739                                    c.complete(inner);
740                                }
741                                c.complete(pid);
742                            });
743                        }
744                        break;
745                    }
746                    _ => {}
747                }
748            }
749        });
750
751        let outcome = ctx.subscribe_to(source, source_sink);
752        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
753            let mut s = state.borrow_mut();
754            if !s.terminated {
755                s.terminated = true;
756                let inner = s.inner_id.take();
757                drop(s);
758                if let Some(inner_id) = inner {
759                    core_s.complete(inner_id);
760                }
761                core_s.complete(pid);
762            }
763        }
764    });
765
766    let fn_id = binding.register_producer_build(build);
767    core.register_producer(fn_id)
768        .expect("window_count: register_producer failed")
769}