graphrefly_operators/buffer.rs
1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Buffer operators — collect and batch reactive values.
11//!
12//! # Operators
13//!
14//! - [`buffer`] — notifier-triggered flush of accumulated DATA handles.
15//! - [`buffer_count`] — fixed-count flush.
16//! - [`window`] — notifier-triggered sub-node splitting.
17//! - [`window_count`] — count-based sub-node splitting.
18
19use std::sync::Arc;
20
21use parking_lot::Mutex;
22
23use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
24use smallvec::SmallVec;
25
26use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
27
28// =========================================================================
29// buffer(source, notifier) — notifier-triggered flush
30// =========================================================================
31
32/// Per-buffer-node shared state.
33struct BufferState {
34 buf: Vec<HandleId>,
35 terminated: bool,
36 source_completed: bool,
37}
38
39impl BufferState {
40 fn new() -> Self {
41 Self {
42 buf: Vec::new(),
43 terminated: false,
44 source_completed: false,
45 }
46 }
47}
48
49/// Buffers source DATA handles; flushes as a packed array when notifier
50/// emits DATA.
51///
52/// - Source DATA: retain + push to buffer.
53/// - Notifier DATA: pack buffer via `pack_tuple`, emit, clear, release
54/// component handles.
55/// - Source COMPLETE: flush remaining buffer (if non-empty), then complete.
56/// - Either ERROR: terminate immediately, release buffered handles.
57/// - Notifier COMPLETE: does NOT auto-complete (keep buffering; source
58/// COMPLETE triggers final flush).
59#[must_use]
60#[allow(clippy::too_many_lines)]
61pub fn buffer(
62 core: &Core,
63 binding: &Arc<dyn ProducerBinding>,
64 source: NodeId,
65 notifier: NodeId,
66 pack_fn_id: FnId,
67) -> NodeId {
68 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
69 let core_s = ctx.core();
70 let binding_s = ctx.core().binding();
71 let em = ctx.emitter();
72 let pid = ctx.node_id();
73 let state: Arc<Mutex<BufferState>> = Arc::new(Mutex::new(BufferState::new()));
74
75 // --- source sink ---
76 let st = state.clone();
77 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
78 let core_src = em.clone();
79 let source_sink: Sink = Arc::new(move |msgs| {
80 enum Act {
81 Flush(Vec<HandleId>),
82 Complete,
83 Error(HandleId),
84 Release(Vec<HandleId>),
85 }
86 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
87 {
88 let mut s = st.lock();
89 if s.terminated {
90 return;
91 }
92 for m in msgs {
93 if s.terminated {
94 break;
95 }
96 match m.tier() {
97 3 => {
98 if let Some(h) = m.payload_handle() {
99 bb.retain_handle(h);
100 s.buf.push(h);
101 }
102 }
103 5 => {
104 if let Some(h) = m.payload_handle() {
105 // ERROR — terminate, release buffered
106 s.terminated = true;
107 bb.retain_handle(h);
108 let to_release: Vec<HandleId> = s.buf.drain(..).collect();
109 actions.push(Act::Release(to_release));
110 actions.push(Act::Error(h));
111 } else {
112 // COMPLETE — flush remainder, then complete
113 s.source_completed = true;
114 s.terminated = true;
115 if !s.buf.is_empty() {
116 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
117 actions.push(Act::Flush(flushed));
118 }
119 actions.push(Act::Complete);
120 }
121 }
122 _ => {}
123 }
124 }
125 }
126 for a in actions {
127 match a {
128 Act::Flush(handles) => {
129 let packed = bb.pack_tuple(pack_fn_id, &handles);
130 for h in &handles {
131 bb.release_handle(*h);
132 }
133 core_src.emit_or_defer(pid, packed);
134 }
135 Act::Complete => core_src.complete_or_defer(pid),
136 Act::Error(h) => core_src.error_or_defer(pid, h),
137 Act::Release(handles) => {
138 for h in handles {
139 bb.release_handle(h);
140 }
141 }
142 }
143 }
144 });
145
146 let src_outcome = ctx.subscribe_to(source, source_sink);
147 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
148 let mut s = state.lock();
149 if !s.terminated {
150 s.terminated = true;
151 s.source_completed = true;
152 // No buffered data yet at activation time.
153 drop(s);
154 core_s.complete_or_defer(pid);
155 return;
156 }
157 }
158
159 // --- notifier sink ---
160 let st2 = state.clone();
161 let core_n = em.clone();
162 let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
163 let notifier_sink: Sink = Arc::new(move |msgs| {
164 enum Act {
165 Flush(Vec<HandleId>),
166 Error(HandleId),
167 Release(Vec<HandleId>),
168 }
169 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
170 {
171 let mut s = st2.lock();
172 if s.terminated {
173 return;
174 }
175 for m in msgs {
176 if s.terminated {
177 break;
178 }
179 match m.tier() {
180 3 if m.payload_handle().is_some() && !s.buf.is_empty() => {
181 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
182 actions.push(Act::Flush(flushed));
183 }
184 5 => {
185 if let Some(h) = m.payload_handle() {
186 // Notifier ERROR — terminate, release buffered
187 s.terminated = true;
188 bb2.retain_handle(h);
189 let to_release: Vec<HandleId> = s.buf.drain(..).collect();
190 actions.push(Act::Release(to_release));
191 actions.push(Act::Error(h));
192 }
193 // Notifier COMPLETE: do NOT auto-complete.
194 }
195 _ => {}
196 }
197 }
198 }
199 for a in actions {
200 match a {
201 Act::Flush(handles) => {
202 let packed = bb2.pack_tuple(pack_fn_id, &handles);
203 for h in &handles {
204 bb2.release_handle(*h);
205 }
206 core_n.emit_or_defer(pid, packed);
207 }
208 Act::Error(h) => core_n.error_or_defer(pid, h),
209 Act::Release(handles) => {
210 for h in handles {
211 bb2.release_handle(h);
212 }
213 }
214 }
215 }
216 });
217
218 let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
219 // Dead notifier: ignore — source COMPLETE will handle final flush.
220 let _ = not_outcome;
221 });
222
223 let fn_id = binding.register_producer_build(build);
224 core.register_producer(fn_id)
225 .expect("buffer: register_producer failed")
226}
227
228// =========================================================================
229// buffer_count(source, count) — fixed-size buffer
230// =========================================================================
231
232/// Per-buffer_count-node shared state.
233struct BufferCountState {
234 buf: Vec<HandleId>,
235 terminated: bool,
236}
237
238impl BufferCountState {
239 fn new() -> Self {
240 Self {
241 buf: Vec::new(),
242 terminated: false,
243 }
244 }
245}
246
247/// Fixed-size buffer. Accumulate DATA handles; emit packed array every
248/// `count` items.
249///
250/// - Source DATA: retain + push. When `buf.len() == count`, pack + emit + clear.
251/// - Source COMPLETE: flush remainder (may be < count), then complete.
252/// - Source ERROR: terminate, release buffered.
253///
254/// # Panics
255///
256/// Panics if `count` is 0.
257#[must_use]
258pub fn buffer_count(
259 core: &Core,
260 binding: &Arc<dyn ProducerBinding>,
261 source: NodeId,
262 count: usize,
263 pack_fn_id: FnId,
264) -> NodeId {
265 assert!(count > 0, "buffer_count: count must be > 0");
266
267 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
268 let core_s = ctx.core();
269 let binding_s = ctx.core().binding();
270 let em = ctx.emitter();
271 let pid = ctx.node_id();
272 let state: Arc<Mutex<BufferCountState>> = Arc::new(Mutex::new(BufferCountState::new()));
273
274 let st = state.clone();
275 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
276 let core_src = em.clone();
277 let source_sink: Sink = Arc::new(move |msgs| {
278 enum Act {
279 Flush(Vec<HandleId>),
280 Complete,
281 Error(HandleId),
282 Release(Vec<HandleId>),
283 }
284 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
285 {
286 let mut s = st.lock();
287 if s.terminated {
288 return;
289 }
290 for m in msgs {
291 if s.terminated {
292 break;
293 }
294 match m.tier() {
295 3 => {
296 if let Some(h) = m.payload_handle() {
297 bb.retain_handle(h);
298 s.buf.push(h);
299 if s.buf.len() == count {
300 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
301 actions.push(Act::Flush(flushed));
302 }
303 }
304 }
305 5 => {
306 if let Some(h) = m.payload_handle() {
307 // ERROR
308 s.terminated = true;
309 bb.retain_handle(h);
310 let to_release: Vec<HandleId> = s.buf.drain(..).collect();
311 actions.push(Act::Release(to_release));
312 actions.push(Act::Error(h));
313 } else {
314 // COMPLETE — flush remainder
315 s.terminated = true;
316 if !s.buf.is_empty() {
317 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
318 actions.push(Act::Flush(flushed));
319 }
320 actions.push(Act::Complete);
321 }
322 }
323 _ => {}
324 }
325 }
326 }
327 for a in actions {
328 match a {
329 Act::Flush(handles) => {
330 let packed = bb.pack_tuple(pack_fn_id, &handles);
331 for h in &handles {
332 bb.release_handle(*h);
333 }
334 core_src.emit_or_defer(pid, packed);
335 }
336 Act::Complete => core_src.complete_or_defer(pid),
337 Act::Error(h) => core_src.error_or_defer(pid, h),
338 Act::Release(handles) => {
339 for h in handles {
340 bb.release_handle(h);
341 }
342 }
343 }
344 }
345 });
346
347 let outcome = ctx.subscribe_to(source, source_sink);
348 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
349 let mut s = state.lock();
350 if !s.terminated {
351 s.terminated = true;
352 drop(s);
353 core_s.complete_or_defer(pid);
354 }
355 }
356 });
357
358 let fn_id = binding.register_producer_build(build);
359 core.register_producer(fn_id)
360 .expect("buffer_count: register_producer failed")
361}
362
363// =========================================================================
364// window(source, notifier) — notifier-triggered sub-node splitting
365// =========================================================================
366
367/// Per-window-node shared state.
368struct WindowState {
369 /// The current inner window's NodeId.
370 inner_id: Option<NodeId>,
371 terminated: bool,
372}
373
374impl WindowState {
375 fn new() -> Self {
376 Self {
377 inner_id: None,
378 terminated: false,
379 }
380 }
381}
382
383/// Splits source into sub-nodes triggered by notifier. Each "window" is
384/// a fresh state node created via `Core::register_state()`. The operator
385/// emits the inner node's `NodeId` as a handle via
386/// `binding.intern_node(inner_id)`.
387///
388/// - On activation: create first inner window node, emit it.
389/// - Source DATA: forward to current inner window via
390/// `core.emit_or_defer(inner_id, handle)`.
391/// - Notifier DATA: complete current window, create new window, emit it.
392/// - Source COMPLETE: complete current window, then complete self.
393/// - Either ERROR: error current window + error self.
394/// - Notifier COMPLETE: do NOT auto-complete.
395#[must_use]
396#[allow(clippy::too_many_lines)]
397pub fn window(
398 core: &Core,
399 binding: &Arc<dyn ProducerBinding>,
400 source: NodeId,
401 notifier: NodeId,
402) -> NodeId {
403 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
404 let core_s = ctx.core();
405 let binding_s = ctx.core().binding();
406 let em = ctx.emitter();
407 let pid = ctx.node_id();
408 let state: Arc<Mutex<WindowState>> = Arc::new(Mutex::new(WindowState::new()));
409 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
410
411 // Create first inner window node and emit it.
412 let first_inner = create_window_node(core_s, &*bb);
413 {
414 let mut s = state.lock();
415 s.inner_id = Some(first_inner.0);
416 }
417 core_s.emit_or_defer(pid, first_inner.1);
418
419 // --- source sink ---
420 // D234: the inner-window selector `s.inner_id` MUST be read
421 // INSIDE the owner-serialized defer closures (never at sink-fire
422 // time) so a source-DATA firing between a notifier-DATA and its
423 // window-roll defer still routes to the correct window. Mailbox
424 // FIFO = arrival order ⇒ a roll posted before a forward is
425 // applied first, so the in-closure `inner_id` read sees the new
426 // window. `terminated` stays a fire-time monotonic gate; retains
427 // happen at fire time (Core owns the share for the emit) and are
428 // released if `em.defer` reports the Core gone (F2 contract).
429 let st = state.clone();
430 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
431 let em_src = em.clone();
432 let source_sink: Sink = Arc::new(move |msgs| {
433 for m in msgs {
434 // QA P1: per-message terminal gate (restores the retired
435 // `if s.terminated { break }`) — a batched
436 // `[COMPLETE, DATA]` must NOT forward DATA after the
437 // COMPLETE (R2.6.4 / no-data-after-terminal).
438 if st.lock().terminated {
439 break;
440 }
441 match m.tier() {
442 3 => {
443 if let Some(h) = m.payload_handle() {
444 bb_src.retain_handle(h);
445 let st_c = st.clone();
446 let bb_c = bb_src.clone();
447 if !em_src.defer(move |c| {
448 let inner = st_c.lock().inner_id;
449 match inner {
450 Some(i) => c.emit(i, h),
451 None => bb_c.release_handle(h),
452 }
453 }) {
454 bb_src.release_handle(h);
455 }
456 }
457 }
458 5 => {
459 // QA P1: atomic terminal-winner — only the first
460 // terminal (across THIS sink and the notifier
461 // sink) defers the terminal cascade. Without
462 // this, source-COMPLETE and notifier-ERROR each
463 // pass their fire-time check before either defer
464 // runs → double terminal on `pid`.
465 let was = std::mem::replace(&mut st.lock().terminated, true);
466 if was {
467 break;
468 }
469 if let Some(h) = m.payload_handle() {
470 // ERROR — error current window + error self.
471 bb_src.retain_handle(h);
472 let st_c = st.clone();
473 let bb_c = bb_src.clone();
474 if !em_src.defer(move |c| {
475 if let Some(i) = st_c.lock().inner_id.take() {
476 bb_c.retain_handle(h);
477 c.error(i, h);
478 }
479 c.error(pid, h);
480 }) {
481 bb_src.release_handle(h);
482 }
483 } else {
484 // COMPLETE — complete current window, then self.
485 let st_c = st.clone();
486 let _ = em_src.defer(move |c| {
487 if let Some(i) = st_c.lock().inner_id.take() {
488 c.complete(i);
489 }
490 c.complete(pid);
491 });
492 }
493 break;
494 }
495 _ => {}
496 }
497 }
498 });
499
500 let src_outcome = ctx.subscribe_to(source, source_sink);
501 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
502 let mut s = state.lock();
503 if !s.terminated {
504 s.terminated = true;
505 let inner = s.inner_id.take();
506 drop(s);
507 if let Some(inner_id) = inner {
508 core_s.complete_or_defer(inner_id);
509 }
510 core_s.complete_or_defer(pid);
511 return;
512 }
513 }
514
515 // --- notifier sink ---
516 // D234: the window roll (complete old → create new → swap
517 // `inner_id` → emit new) is ONE owner-side defer closure, so it
518 // is atomic w.r.t. the FIFO drain and `create_window_node` (a
519 // `CoreFull::register_state`) runs in-wave. A roll posted before
520 // a later source-forward defer is applied first ⇒ that forward's
521 // in-closure `inner_id` read sees the new window. The `new_id`
522 // is consumed entirely inside the closure (drives captured
523 // state); a Core-gone `defer` drops it unrun with nothing
524 // created/retained yet (no leak — that is why the create lives
525 // in the closure, not before it).
526 let st2 = state.clone();
527 let em_not = em.clone();
528 let bb_not: Arc<dyn BindingBoundary> = binding_s.clone();
529 let notifier_sink: Sink = Arc::new(move |msgs| {
530 for m in msgs {
531 // QA P1: per-message terminal gate.
532 if st2.lock().terminated {
533 break;
534 }
535 match m.tier() {
536 3 if m.payload_handle().is_some() => {
537 let st_c = st2.clone();
538 let bb_c = bb_not.clone();
539 let _ = em_not.defer(move |c| {
540 let (new_id, new_handle) = create_window_node(c, &*bb_c);
541 let old_inner = st_c.lock().inner_id.replace(new_id);
542 // Degenerate `None` (unreachable when not
543 // terminated): there is no prior window to
544 // complete; just emit the new one.
545 if let Some(old) = old_inner {
546 c.complete(old);
547 }
548 c.emit(pid, new_handle);
549 });
550 }
551 5 => {
552 if let Some(h) = m.payload_handle() {
553 // Notifier ERROR — error current window + self.
554 // QA P1: atomic terminal-winner vs the source
555 // sink's COMPLETE/ERROR (no double-terminal).
556 let was = std::mem::replace(&mut st2.lock().terminated, true);
557 if was {
558 break;
559 }
560 bb_not.retain_handle(h);
561 let st_c = st2.clone();
562 let bb_c = bb_not.clone();
563 if !em_not.defer(move |c| {
564 if let Some(inner) = st_c.lock().inner_id.take() {
565 bb_c.retain_handle(h);
566 c.error(inner, h);
567 }
568 c.error(pid, h);
569 }) {
570 bb_not.release_handle(h);
571 }
572 break;
573 }
574 // Notifier COMPLETE: do NOT auto-complete.
575 }
576 _ => {}
577 }
578 }
579 });
580
581 let _ = ctx.subscribe_to(notifier, notifier_sink);
582 });
583
584 let fn_id = binding.register_producer_build(build);
585 core.register_producer(fn_id)
586 .expect("window: register_producer failed")
587}
588
589/// Create a new inner window state node and return `(NodeId, HandleId)`.
590/// The `HandleId` represents the inner node as a value (via `intern_node`).
591fn create_window_node(
592 core: &dyn graphrefly_core::CoreFull,
593 binding: &dyn BindingBoundary,
594) -> (NodeId, HandleId) {
595 let inner_id = core
596 .register_state(graphrefly_core::NO_HANDLE, false)
597 .expect("window: register_state for inner node failed");
598 let handle = binding.intern_node(inner_id);
599 (inner_id, handle)
600}
601
602// =========================================================================
603// window_count(source, count) — count-based sub-node splitting
604// =========================================================================
605
606/// Per-window_count-node shared state.
607struct WindowCountState {
608 /// The current inner window's NodeId.
609 inner_id: Option<NodeId>,
610 /// Items forwarded to the current window.
611 counter: usize,
612 terminated: bool,
613}
614
615impl WindowCountState {
616 fn new() -> Self {
617 Self {
618 inner_id: None,
619 counter: 0,
620 terminated: false,
621 }
622 }
623}
624
625/// Like [`window`] but closes window every `count` DATA items.
626///
627/// - On activation: create first window, emit it.
628/// - Source DATA: forward to current window + increment counter. When
629/// counter hits `count`, complete current window, create new one, emit
630/// it, reset counter.
631/// - Source COMPLETE: complete current window (even if < count items),
632/// complete self.
633/// - Source ERROR: error current window, error self.
634///
635/// # Panics
636///
637/// Panics if `count` is 0.
638#[must_use]
639#[allow(clippy::too_many_lines)]
640pub fn window_count(
641 core: &Core,
642 binding: &Arc<dyn ProducerBinding>,
643 source: NodeId,
644 count: usize,
645) -> NodeId {
646 assert!(count > 0, "window_count: count must be > 0");
647
648 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
649 let core_s = ctx.core();
650 let binding_s = ctx.core().binding();
651 let em = ctx.emitter();
652 let pid = ctx.node_id();
653 let state: Arc<Mutex<WindowCountState>> = Arc::new(Mutex::new(WindowCountState::new()));
654 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
655
656 // Create first inner window and emit it.
657 let first_inner = create_window_node(core_s, &*bb);
658 {
659 let mut s = state.lock();
660 s.inner_id = Some(first_inner.0);
661 s.counter = 0;
662 }
663 core_s.emit_or_defer(pid, first_inner.1);
664
665 // --- source sink ---
666 // D234: forward + count + window-roll are ONE owner-side defer
667 // per source-DATA. Defer closures run serially in FIFO drain
668 // order, so the counter increment, the threshold check, the
669 // `create_window_node` (a `CoreFull::register_state`), the
670 // `inner_id` swap, the old-window complete and the new-window
671 // emit are all consistent — equivalent to the old
672 // synchronous-under-lock path, just relocated owner-side. The
673 // operator `Mutex` is dropped before every `CoreFull` call
674 // (no operator-lock held across Core re-entry); it is safe to
675 // re-lock because no other defer runs concurrently (serial drain).
676 let st = state.clone();
677 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
678 let em_src = em.clone();
679 let source_sink: Sink = Arc::new(move |msgs| {
680 for m in msgs {
681 // QA P1: per-message terminal gate.
682 if st.lock().terminated {
683 break;
684 }
685 match m.tier() {
686 3 => {
687 if let Some(h) = m.payload_handle() {
688 bb_src.retain_handle(h);
689 let st_c = st.clone();
690 let bb_c = bb_src.clone();
691 if !em_src.defer(move |c| {
692 let (inner, roll) = {
693 let mut s = st_c.lock();
694 match s.inner_id {
695 Some(inner) => {
696 s.counter += 1;
697 let roll = s.counter == count;
698 (Some(inner), roll)
699 }
700 None => (None, false),
701 }
702 };
703 let Some(inner) = inner else {
704 bb_c.release_handle(h);
705 return;
706 };
707 c.emit(inner, h);
708 if roll {
709 let (new_id, new_handle) = create_window_node(c, &*bb_c);
710 {
711 let mut s = st_c.lock();
712 s.inner_id = Some(new_id);
713 s.counter = 0;
714 }
715 c.complete(inner);
716 c.emit(pid, new_handle);
717 }
718 }) {
719 bb_src.release_handle(h);
720 }
721 }
722 }
723 5 => {
724 // QA P1: atomic terminal-winner.
725 let was = std::mem::replace(&mut st.lock().terminated, true);
726 if was {
727 break;
728 }
729 if let Some(h) = m.payload_handle() {
730 // ERROR — error current window + self.
731 bb_src.retain_handle(h);
732 let st_c = st.clone();
733 let bb_c = bb_src.clone();
734 if !em_src.defer(move |c| {
735 if let Some(inner) = st_c.lock().inner_id.take() {
736 bb_c.retain_handle(h);
737 c.error(inner, h);
738 }
739 c.error(pid, h);
740 }) {
741 bb_src.release_handle(h);
742 }
743 } else {
744 // COMPLETE.
745 let st_c = st.clone();
746 let _ = em_src.defer(move |c| {
747 if let Some(inner) = st_c.lock().inner_id.take() {
748 c.complete(inner);
749 }
750 c.complete(pid);
751 });
752 }
753 break;
754 }
755 _ => {}
756 }
757 }
758 });
759
760 let outcome = ctx.subscribe_to(source, source_sink);
761 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
762 let mut s = state.lock();
763 if !s.terminated {
764 s.terminated = true;
765 let inner = s.inner_id.take();
766 drop(s);
767 if let Some(inner_id) = inner {
768 core_s.complete_or_defer(inner_id);
769 }
770 core_s.complete_or_defer(pid);
771 }
772 }
773 });
774
775 let fn_id = binding.register_producer_build(build);
776 core.register_producer(fn_id)
777 .expect("window_count: register_producer failed")
778}