graphrefly_operators/buffer.rs
1//! Buffer operators — collect and batch reactive values.
2//!
3//! # Operators
4//!
5//! - [`buffer`] — notifier-triggered flush of accumulated DATA handles.
6//! - [`buffer_count`] — fixed-count flush.
7//! - [`window`] — notifier-triggered sub-node splitting.
8//! - [`window_count`] — count-based sub-node splitting.
9
10use std::cell::RefCell;
11use std::rc::Rc;
12use std::sync::Arc;
13
14use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
15use smallvec::SmallVec;
16
17use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
18
19// =========================================================================
20// buffer(source, notifier) — notifier-triggered flush
21// =========================================================================
22
23/// Per-buffer-node shared state.
24struct BufferState {
25 buf: Vec<HandleId>,
26 terminated: bool,
27 source_completed: bool,
28}
29
30impl BufferState {
31 fn new() -> Self {
32 Self {
33 buf: Vec::new(),
34 terminated: false,
35 source_completed: false,
36 }
37 }
38}
39
40/// Buffers source DATA handles; flushes as a packed array when notifier
41/// emits DATA.
42///
43/// - Source DATA: retain + push to buffer.
44/// - Notifier DATA: pack buffer via `pack_tuple`, emit, clear, release
45/// component handles.
46/// - Source COMPLETE: flush remaining buffer (if non-empty), then complete.
47/// - Either ERROR: terminate immediately, release buffered handles.
48/// - Notifier COMPLETE: does NOT auto-complete (keep buffering; source
49/// COMPLETE triggers final flush).
50#[must_use]
51#[allow(clippy::too_many_lines)]
52pub fn buffer(
53 core: &Core,
54 binding: &Arc<dyn ProducerBinding>,
55 source: NodeId,
56 notifier: NodeId,
57 pack_fn_id: FnId,
58) -> NodeId {
59 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
60 let core_s = ctx.core();
61 let binding_s = ctx.core().binding();
62 let em = ctx.emitter();
63 let pid = ctx.node_id();
64 let state: Rc<RefCell<BufferState>> = Rc::new(RefCell::new(BufferState::new()));
65
66 // --- source sink ---
67 let st = state.clone();
68 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
69 let core_src = em.clone();
70 let source_sink: Sink = Rc::new(move |msgs| {
71 enum Act {
72 Flush(Vec<HandleId>),
73 Complete,
74 Error(HandleId),
75 Release(Vec<HandleId>),
76 }
77 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
78 {
79 let mut s = st.borrow_mut();
80 if s.terminated {
81 return;
82 }
83 for m in msgs {
84 if s.terminated {
85 break;
86 }
87 match m.tier() {
88 3 => {
89 if let Some(h) = m.payload_handle() {
90 bb.retain_handle(h);
91 s.buf.push(h);
92 }
93 }
94 5 => {
95 if let Some(h) = m.payload_handle() {
96 // ERROR — terminate, release buffered
97 s.terminated = true;
98 bb.retain_handle(h);
99 let to_release: Vec<HandleId> = s.buf.drain(..).collect();
100 actions.push(Act::Release(to_release));
101 actions.push(Act::Error(h));
102 } else {
103 // COMPLETE — flush remainder, then complete
104 s.source_completed = true;
105 s.terminated = true;
106 if !s.buf.is_empty() {
107 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
108 actions.push(Act::Flush(flushed));
109 }
110 actions.push(Act::Complete);
111 }
112 }
113 _ => {}
114 }
115 }
116 }
117 for a in actions {
118 match a {
119 Act::Flush(handles) => {
120 let packed = bb.pack_tuple(pack_fn_id, &handles);
121 for h in &handles {
122 bb.release_handle(*h);
123 }
124 core_src.emit(pid, packed);
125 }
126 Act::Complete => core_src.complete(pid),
127 Act::Error(h) => core_src.error(pid, h),
128 Act::Release(handles) => {
129 for h in handles {
130 bb.release_handle(h);
131 }
132 }
133 }
134 }
135 });
136
137 let src_outcome = ctx.subscribe_to(source, source_sink);
138 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
139 let mut s = state.borrow_mut();
140 if !s.terminated {
141 s.terminated = true;
142 s.source_completed = true;
143 // No buffered data yet at activation time.
144 drop(s);
145 core_s.complete(pid);
146 return;
147 }
148 }
149
150 // --- notifier sink ---
151 let st2 = state.clone();
152 let core_n = em.clone();
153 let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
154 let notifier_sink: Sink = Rc::new(move |msgs| {
155 enum Act {
156 Flush(Vec<HandleId>),
157 Error(HandleId),
158 Release(Vec<HandleId>),
159 }
160 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
161 {
162 let mut s = st2.borrow_mut();
163 if s.terminated {
164 return;
165 }
166 for m in msgs {
167 if s.terminated {
168 break;
169 }
170 match m.tier() {
171 3 if m.payload_handle().is_some() && !s.buf.is_empty() => {
172 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
173 actions.push(Act::Flush(flushed));
174 }
175 5 => {
176 if let Some(h) = m.payload_handle() {
177 // Notifier ERROR — terminate, release buffered
178 s.terminated = true;
179 bb2.retain_handle(h);
180 let to_release: Vec<HandleId> = s.buf.drain(..).collect();
181 actions.push(Act::Release(to_release));
182 actions.push(Act::Error(h));
183 }
184 // Notifier COMPLETE: do NOT auto-complete.
185 }
186 _ => {}
187 }
188 }
189 }
190 for a in actions {
191 match a {
192 Act::Flush(handles) => {
193 let packed = bb2.pack_tuple(pack_fn_id, &handles);
194 for h in &handles {
195 bb2.release_handle(*h);
196 }
197 core_n.emit(pid, packed);
198 }
199 Act::Error(h) => core_n.error(pid, h),
200 Act::Release(handles) => {
201 for h in handles {
202 bb2.release_handle(h);
203 }
204 }
205 }
206 }
207 });
208
209 let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
210 // Dead notifier: ignore — source COMPLETE will handle final flush.
211 let _ = not_outcome;
212 });
213
214 let fn_id = binding.register_producer_build(build);
215 core.register_producer(fn_id)
216 .expect("buffer: register_producer failed")
217}
218
219// =========================================================================
220// buffer_count(source, count) — fixed-size buffer
221// =========================================================================
222
223/// Per-buffer_count-node shared state.
224struct BufferCountState {
225 buf: Vec<HandleId>,
226 terminated: bool,
227}
228
229impl BufferCountState {
230 fn new() -> Self {
231 Self {
232 buf: Vec::new(),
233 terminated: false,
234 }
235 }
236}
237
238/// Fixed-size buffer. Accumulate DATA handles; emit packed array every
239/// `count` items.
240///
241/// - Source DATA: retain + push. When `buf.len() == count`, pack + emit + clear.
242/// - Source COMPLETE: flush remainder (may be < count), then complete.
243/// - Source ERROR: terminate, release buffered.
244///
245/// # Panics
246///
247/// Panics if `count` is 0.
248#[must_use]
249pub fn buffer_count(
250 core: &Core,
251 binding: &Arc<dyn ProducerBinding>,
252 source: NodeId,
253 count: usize,
254 pack_fn_id: FnId,
255) -> NodeId {
256 assert!(count > 0, "buffer_count: count must be > 0");
257
258 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
259 let core_s = ctx.core();
260 let binding_s = ctx.core().binding();
261 let em = ctx.emitter();
262 let pid = ctx.node_id();
263 let state: Rc<RefCell<BufferCountState>> = Rc::new(RefCell::new(BufferCountState::new()));
264
265 let st = state.clone();
266 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
267 let core_src = em.clone();
268 let source_sink: Sink = Rc::new(move |msgs| {
269 enum Act {
270 Flush(Vec<HandleId>),
271 Complete,
272 Error(HandleId),
273 Release(Vec<HandleId>),
274 }
275 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
276 {
277 let mut s = st.borrow_mut();
278 if s.terminated {
279 return;
280 }
281 for m in msgs {
282 if s.terminated {
283 break;
284 }
285 match m.tier() {
286 3 => {
287 if let Some(h) = m.payload_handle() {
288 bb.retain_handle(h);
289 s.buf.push(h);
290 if s.buf.len() == count {
291 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
292 actions.push(Act::Flush(flushed));
293 }
294 }
295 }
296 5 => {
297 if let Some(h) = m.payload_handle() {
298 // ERROR
299 s.terminated = true;
300 bb.retain_handle(h);
301 let to_release: Vec<HandleId> = s.buf.drain(..).collect();
302 actions.push(Act::Release(to_release));
303 actions.push(Act::Error(h));
304 } else {
305 // COMPLETE — flush remainder
306 s.terminated = true;
307 if !s.buf.is_empty() {
308 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
309 actions.push(Act::Flush(flushed));
310 }
311 actions.push(Act::Complete);
312 }
313 }
314 _ => {}
315 }
316 }
317 }
318 for a in actions {
319 match a {
320 Act::Flush(handles) => {
321 let packed = bb.pack_tuple(pack_fn_id, &handles);
322 for h in &handles {
323 bb.release_handle(*h);
324 }
325 core_src.emit(pid, packed);
326 }
327 Act::Complete => core_src.complete(pid),
328 Act::Error(h) => core_src.error(pid, h),
329 Act::Release(handles) => {
330 for h in handles {
331 bb.release_handle(h);
332 }
333 }
334 }
335 }
336 });
337
338 let outcome = ctx.subscribe_to(source, source_sink);
339 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
340 let mut s = state.borrow_mut();
341 if !s.terminated {
342 s.terminated = true;
343 drop(s);
344 core_s.complete(pid);
345 }
346 }
347 });
348
349 let fn_id = binding.register_producer_build(build);
350 core.register_producer(fn_id)
351 .expect("buffer_count: register_producer failed")
352}
353
354// =========================================================================
355// window(source, notifier) — notifier-triggered sub-node splitting
356// =========================================================================
357
358/// Per-window-node shared state.
359struct WindowState {
360 /// The current inner window's NodeId.
361 inner_id: Option<NodeId>,
362 terminated: bool,
363}
364
365impl WindowState {
366 fn new() -> Self {
367 Self {
368 inner_id: None,
369 terminated: false,
370 }
371 }
372}
373
374/// Splits source into sub-nodes triggered by notifier. Each "window" is
375/// a fresh state node created via `Core::register_state()`. The operator
376/// emits the inner node's `NodeId` as a handle via
377/// `binding.intern_node(inner_id)`.
378///
379/// - On activation: create first inner window node, emit it.
380/// - Source DATA: forward to current inner window via
381/// `core.emit(inner_id, handle)`.
382/// - Notifier DATA: complete current window, create new window, emit it.
383/// - Source COMPLETE: complete current window, then complete self.
384/// - Either ERROR: error current window + error self.
385/// - Notifier COMPLETE: do NOT auto-complete.
386#[must_use]
387#[allow(clippy::too_many_lines)]
388pub fn window(
389 core: &Core,
390 binding: &Arc<dyn ProducerBinding>,
391 source: NodeId,
392 notifier: NodeId,
393) -> NodeId {
394 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
395 let core_s = ctx.core();
396 let binding_s = ctx.core().binding();
397 let em = ctx.emitter();
398 let pid = ctx.node_id();
399 let state: Rc<RefCell<WindowState>> = Rc::new(RefCell::new(WindowState::new()));
400 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
401
402 // Create first inner window node and emit it.
403 let first_inner = create_window_node(core_s, &*bb);
404 {
405 let mut s = state.borrow_mut();
406 s.inner_id = Some(first_inner.0);
407 }
408 core_s.emit(pid, first_inner.1);
409
410 // --- source sink ---
411 // D234: the inner-window selector `s.inner_id` MUST be read
412 // INSIDE the owner-serialized defer closures (never at sink-fire
413 // time) so a source-DATA firing between a notifier-DATA and its
414 // window-roll defer still routes to the correct window. Mailbox
415 // FIFO = arrival order ⇒ a roll posted before a forward is
416 // applied first, so the in-closure `inner_id` read sees the new
417 // window. `terminated` stays a fire-time monotonic gate; retains
418 // happen at fire time (Core owns the share for the emit) and are
419 // released if `em.defer` reports the Core gone (F2 contract).
420 let st = state.clone();
421 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
422 let em_src = em.clone();
423 let source_sink: Sink = Rc::new(move |msgs| {
424 for m in msgs {
425 // QA P1: per-message terminal gate (restores the retired
426 // `if s.terminated { break }`) — a batched
427 // `[COMPLETE, DATA]` must NOT forward DATA after the
428 // COMPLETE (R2.6.4 / no-data-after-terminal).
429 if st.borrow().terminated {
430 break;
431 }
432 match m.tier() {
433 3 => {
434 if let Some(h) = m.payload_handle() {
435 bb_src.retain_handle(h);
436 let st_c = st.clone();
437 let bb_c = bb_src.clone();
438 if !em_src.defer(move |c| {
439 let inner = st_c.borrow_mut().inner_id;
440 match inner {
441 Some(i) => c.emit(i, h),
442 None => bb_c.release_handle(h),
443 }
444 }) {
445 bb_src.release_handle(h);
446 }
447 }
448 }
449 5 => {
450 // QA P1: atomic terminal-winner — only the first
451 // terminal (across THIS sink and the notifier
452 // sink) defers the terminal cascade. Without
453 // this, source-COMPLETE and notifier-ERROR each
454 // pass their fire-time check before either defer
455 // runs → double terminal on `pid`.
456 let was = std::mem::replace(&mut st.borrow_mut().terminated, true);
457 if was {
458 break;
459 }
460 if let Some(h) = m.payload_handle() {
461 // ERROR — error current window + error self.
462 bb_src.retain_handle(h);
463 let st_c = st.clone();
464 let bb_c = bb_src.clone();
465 if !em_src.defer(move |c| {
466 if let Some(i) = st_c.borrow_mut().inner_id.take() {
467 bb_c.retain_handle(h);
468 c.error(i, h);
469 }
470 c.error(pid, h);
471 }) {
472 bb_src.release_handle(h);
473 }
474 } else {
475 // COMPLETE — complete current window, then self.
476 let st_c = st.clone();
477 let _ = em_src.defer(move |c| {
478 if let Some(i) = st_c.borrow_mut().inner_id.take() {
479 c.complete(i);
480 }
481 c.complete(pid);
482 });
483 }
484 break;
485 }
486 _ => {}
487 }
488 }
489 });
490
491 let src_outcome = ctx.subscribe_to(source, source_sink);
492 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
493 let mut s = state.borrow_mut();
494 if !s.terminated {
495 s.terminated = true;
496 let inner = s.inner_id.take();
497 drop(s);
498 if let Some(inner_id) = inner {
499 core_s.complete(inner_id);
500 }
501 core_s.complete(pid);
502 return;
503 }
504 }
505
506 // --- notifier sink ---
507 // D234: the window roll (complete old → create new → swap
508 // `inner_id` → emit new) is ONE owner-side defer closure, so it
509 // is atomic w.r.t. the FIFO drain and `create_window_node` (a
510 // `CoreFull::register_state`) runs in-wave. A roll posted before
511 // a later source-forward defer is applied first ⇒ that forward's
512 // in-closure `inner_id` read sees the new window. The `new_id`
513 // is consumed entirely inside the closure (drives captured
514 // state); a Core-gone `defer` drops it unrun with nothing
515 // created/retained yet (no leak — that is why the create lives
516 // in the closure, not before it).
517 let st2 = state.clone();
518 let em_not = em.clone();
519 let bb_not: Arc<dyn BindingBoundary> = binding_s.clone();
520 let notifier_sink: Sink = Rc::new(move |msgs| {
521 for m in msgs {
522 // QA P1: per-message terminal gate.
523 if st2.borrow().terminated {
524 break;
525 }
526 match m.tier() {
527 3 if m.payload_handle().is_some() => {
528 let st_c = st2.clone();
529 let bb_c = bb_not.clone();
530 let _ = em_not.defer(move |c| {
531 let (new_id, new_handle) = create_window_node(c, &*bb_c);
532 let old_inner = st_c.borrow_mut().inner_id.replace(new_id);
533 // Degenerate `None` (unreachable when not
534 // terminated): there is no prior window to
535 // complete; just emit the new one.
536 if let Some(old) = old_inner {
537 c.complete(old);
538 }
539 c.emit(pid, new_handle);
540 });
541 }
542 5 => {
543 if let Some(h) = m.payload_handle() {
544 // Notifier ERROR — error current window + self.
545 // QA P1: atomic terminal-winner vs the source
546 // sink's COMPLETE/ERROR (no double-terminal).
547 let was = std::mem::replace(&mut st2.borrow_mut().terminated, true);
548 if was {
549 break;
550 }
551 bb_not.retain_handle(h);
552 let st_c = st2.clone();
553 let bb_c = bb_not.clone();
554 if !em_not.defer(move |c| {
555 if let Some(inner) = st_c.borrow_mut().inner_id.take() {
556 bb_c.retain_handle(h);
557 c.error(inner, h);
558 }
559 c.error(pid, h);
560 }) {
561 bb_not.release_handle(h);
562 }
563 break;
564 }
565 // Notifier COMPLETE: do NOT auto-complete.
566 }
567 _ => {}
568 }
569 }
570 });
571
572 let _ = ctx.subscribe_to(notifier, notifier_sink);
573 });
574
575 let fn_id = binding.register_producer_build(build);
576 core.register_producer(fn_id)
577 .expect("window: register_producer failed")
578}
579
580/// Create a new inner window state node and return `(NodeId, HandleId)`.
581/// The `HandleId` represents the inner node as a value (via `intern_node`).
582fn create_window_node(
583 core: &dyn graphrefly_core::CoreFull,
584 binding: &dyn BindingBoundary,
585) -> (NodeId, HandleId) {
586 let inner_id = core
587 .register_state(graphrefly_core::NO_HANDLE, false)
588 .expect("window: register_state for inner node failed");
589 let handle = binding.intern_node(inner_id);
590 (inner_id, handle)
591}
592
593// =========================================================================
594// window_count(source, count) — count-based sub-node splitting
595// =========================================================================
596
597/// Per-window_count-node shared state.
598struct WindowCountState {
599 /// The current inner window's NodeId.
600 inner_id: Option<NodeId>,
601 /// Items forwarded to the current window.
602 counter: usize,
603 terminated: bool,
604}
605
606impl WindowCountState {
607 fn new() -> Self {
608 Self {
609 inner_id: None,
610 counter: 0,
611 terminated: false,
612 }
613 }
614}
615
616/// Like [`window`] but closes window every `count` DATA items.
617///
618/// - On activation: create first window, emit it.
619/// - Source DATA: forward to current window + increment counter. When
620/// counter hits `count`, complete current window, create new one, emit
621/// it, reset counter.
622/// - Source COMPLETE: complete current window (even if < count items),
623/// complete self.
624/// - Source ERROR: error current window, error self.
625///
626/// # Panics
627///
628/// Panics if `count` is 0.
629#[must_use]
630#[allow(clippy::too_many_lines)]
631pub fn window_count(
632 core: &Core,
633 binding: &Arc<dyn ProducerBinding>,
634 source: NodeId,
635 count: usize,
636) -> NodeId {
637 assert!(count > 0, "window_count: count must be > 0");
638
639 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
640 let core_s = ctx.core();
641 let binding_s = ctx.core().binding();
642 let em = ctx.emitter();
643 let pid = ctx.node_id();
644 let state: Rc<RefCell<WindowCountState>> = Rc::new(RefCell::new(WindowCountState::new()));
645 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
646
647 // Create first inner window and emit it.
648 let first_inner = create_window_node(core_s, &*bb);
649 {
650 let mut s = state.borrow_mut();
651 s.inner_id = Some(first_inner.0);
652 s.counter = 0;
653 }
654 core_s.emit(pid, first_inner.1);
655
656 // --- source sink ---
657 // D234: forward + count + window-roll are ONE owner-side defer
658 // per source-DATA. Defer closures run serially in FIFO drain
659 // order, so the counter increment, the threshold check, the
660 // `create_window_node` (a `CoreFull::register_state`), the
661 // `inner_id` swap, the old-window complete and the new-window
662 // emit are all consistent — equivalent to the old
663 // synchronous-under-lock path, just relocated owner-side. The
664 // operator `Mutex` is dropped before every `CoreFull` call
665 // (no operator-lock held across Core re-entry); it is safe to
666 // re-lock because no other defer runs concurrently (serial drain).
667 let st = state.clone();
668 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
669 let em_src = em.clone();
670 let source_sink: Sink = Rc::new(move |msgs| {
671 for m in msgs {
672 // QA P1: per-message terminal gate.
673 if st.borrow().terminated {
674 break;
675 }
676 match m.tier() {
677 3 => {
678 if let Some(h) = m.payload_handle() {
679 bb_src.retain_handle(h);
680 let st_c = st.clone();
681 let bb_c = bb_src.clone();
682 if !em_src.defer(move |c| {
683 let (inner, roll) = {
684 let mut s = st_c.borrow_mut();
685 match s.inner_id {
686 Some(inner) => {
687 s.counter += 1;
688 let roll = s.counter == count;
689 (Some(inner), roll)
690 }
691 None => (None, false),
692 }
693 };
694 let Some(inner) = inner else {
695 bb_c.release_handle(h);
696 return;
697 };
698 c.emit(inner, h);
699 if roll {
700 let (new_id, new_handle) = create_window_node(c, &*bb_c);
701 {
702 let mut s = st_c.borrow_mut();
703 s.inner_id = Some(new_id);
704 s.counter = 0;
705 }
706 c.complete(inner);
707 c.emit(pid, new_handle);
708 }
709 }) {
710 bb_src.release_handle(h);
711 }
712 }
713 }
714 5 => {
715 // QA P1: atomic terminal-winner.
716 let was = std::mem::replace(&mut st.borrow_mut().terminated, true);
717 if was {
718 break;
719 }
720 if let Some(h) = m.payload_handle() {
721 // ERROR — error current window + self.
722 bb_src.retain_handle(h);
723 let st_c = st.clone();
724 let bb_c = bb_src.clone();
725 if !em_src.defer(move |c| {
726 if let Some(inner) = st_c.borrow_mut().inner_id.take() {
727 bb_c.retain_handle(h);
728 c.error(inner, h);
729 }
730 c.error(pid, h);
731 }) {
732 bb_src.release_handle(h);
733 }
734 } else {
735 // COMPLETE.
736 let st_c = st.clone();
737 let _ = em_src.defer(move |c| {
738 if let Some(inner) = st_c.borrow_mut().inner_id.take() {
739 c.complete(inner);
740 }
741 c.complete(pid);
742 });
743 }
744 break;
745 }
746 _ => {}
747 }
748 }
749 });
750
751 let outcome = ctx.subscribe_to(source, source_sink);
752 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
753 let mut s = state.borrow_mut();
754 if !s.terminated {
755 s.terminated = true;
756 let inner = s.inner_id.take();
757 drop(s);
758 if let Some(inner_id) = inner {
759 core_s.complete(inner_id);
760 }
761 core_s.complete(pid);
762 }
763 }
764 });
765
766 let fn_id = binding.register_producer_build(build);
767 core.register_producer(fn_id)
768 .expect("window_count: register_producer failed")
769}