1use std::sync::{Arc, Weak};
11
12use parking_lot::Mutex;
13
14use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
15use smallvec::SmallVec;
16
17use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
18
19struct BufferState {
25 buf: Vec<HandleId>,
26 terminated: bool,
27 source_completed: bool,
28}
29
30impl BufferState {
31 fn new() -> Self {
32 Self {
33 buf: Vec::new(),
34 terminated: false,
35 source_completed: false,
36 }
37 }
38}
39
40#[must_use]
51#[allow(clippy::too_many_lines)]
52pub fn buffer(
53 core: &Core,
54 binding: &Arc<dyn ProducerBinding>,
55 source: NodeId,
56 notifier: NodeId,
57 pack_fn_id: FnId,
58) -> NodeId {
59 let core_weak = core.weak_handle();
60 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
61
62 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
63 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
64 return;
65 };
66 let pid = ctx.node_id();
67 let state: Arc<Mutex<BufferState>> = Arc::new(Mutex::new(BufferState::new()));
68
69 let st = state.clone();
71 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
72 let core_src = core_s.clone();
73 let source_sink: Sink = Arc::new(move |msgs| {
74 enum Act {
75 Flush(Vec<HandleId>),
76 Complete,
77 Error(HandleId),
78 Release(Vec<HandleId>),
79 }
80 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
81 {
82 let mut s = st.lock();
83 if s.terminated {
84 return;
85 }
86 for m in msgs {
87 if s.terminated {
88 break;
89 }
90 match m.tier() {
91 3 => {
92 if let Some(h) = m.payload_handle() {
93 bb.retain_handle(h);
94 s.buf.push(h);
95 }
96 }
97 5 => {
98 if let Some(h) = m.payload_handle() {
99 s.terminated = true;
101 bb.retain_handle(h);
102 let to_release: Vec<HandleId> = s.buf.drain(..).collect();
103 actions.push(Act::Release(to_release));
104 actions.push(Act::Error(h));
105 } else {
106 s.source_completed = true;
108 s.terminated = true;
109 if !s.buf.is_empty() {
110 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
111 actions.push(Act::Flush(flushed));
112 }
113 actions.push(Act::Complete);
114 }
115 }
116 _ => {}
117 }
118 }
119 }
120 for a in actions {
121 match a {
122 Act::Flush(handles) => {
123 let packed = bb.pack_tuple(pack_fn_id, &handles);
124 for h in &handles {
125 bb.release_handle(*h);
126 }
127 core_src.emit_or_defer(pid, packed);
128 }
129 Act::Complete => core_src.complete_or_defer(pid),
130 Act::Error(h) => core_src.error_or_defer(pid, h),
131 Act::Release(handles) => {
132 for h in handles {
133 bb.release_handle(h);
134 }
135 }
136 }
137 }
138 });
139
140 let src_outcome = ctx.subscribe_to(source, source_sink);
141 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
142 let mut s = state.lock();
143 if !s.terminated {
144 s.terminated = true;
145 s.source_completed = true;
146 drop(s);
148 core_s.complete_or_defer(pid);
149 return;
150 }
151 }
152
153 let st2 = state.clone();
155 let core_n = core_s.clone();
156 let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
157 let notifier_sink: Sink = Arc::new(move |msgs| {
158 enum Act {
159 Flush(Vec<HandleId>),
160 Error(HandleId),
161 Release(Vec<HandleId>),
162 }
163 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
164 {
165 let mut s = st2.lock();
166 if s.terminated {
167 return;
168 }
169 for m in msgs {
170 if s.terminated {
171 break;
172 }
173 match m.tier() {
174 3 if m.payload_handle().is_some() && !s.buf.is_empty() => {
175 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
176 actions.push(Act::Flush(flushed));
177 }
178 5 => {
179 if let Some(h) = m.payload_handle() {
180 s.terminated = true;
182 bb2.retain_handle(h);
183 let to_release: Vec<HandleId> = s.buf.drain(..).collect();
184 actions.push(Act::Release(to_release));
185 actions.push(Act::Error(h));
186 }
187 }
189 _ => {}
190 }
191 }
192 }
193 for a in actions {
194 match a {
195 Act::Flush(handles) => {
196 let packed = bb2.pack_tuple(pack_fn_id, &handles);
197 for h in &handles {
198 bb2.release_handle(*h);
199 }
200 core_n.emit_or_defer(pid, packed);
201 }
202 Act::Error(h) => core_n.error_or_defer(pid, h),
203 Act::Release(handles) => {
204 for h in handles {
205 bb2.release_handle(h);
206 }
207 }
208 }
209 }
210 });
211
212 let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
213 let _ = not_outcome;
215 });
216
217 let fn_id = binding.register_producer_build(build);
218 core.register_producer(fn_id)
219 .expect("buffer: register_producer failed")
220}
221
222struct BufferCountState {
228 buf: Vec<HandleId>,
229 terminated: bool,
230}
231
232impl BufferCountState {
233 fn new() -> Self {
234 Self {
235 buf: Vec::new(),
236 terminated: false,
237 }
238 }
239}
240
241#[must_use]
252pub fn buffer_count(
253 core: &Core,
254 binding: &Arc<dyn ProducerBinding>,
255 source: NodeId,
256 count: usize,
257 pack_fn_id: FnId,
258) -> NodeId {
259 assert!(count > 0, "buffer_count: count must be > 0");
260
261 let core_weak = core.weak_handle();
262 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
263
264 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
265 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
266 return;
267 };
268 let pid = ctx.node_id();
269 let state: Arc<Mutex<BufferCountState>> = Arc::new(Mutex::new(BufferCountState::new()));
270
271 let st = state.clone();
272 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
273 let core_src = core_s.clone();
274 let source_sink: Sink = Arc::new(move |msgs| {
275 enum Act {
276 Flush(Vec<HandleId>),
277 Complete,
278 Error(HandleId),
279 Release(Vec<HandleId>),
280 }
281 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
282 {
283 let mut s = st.lock();
284 if s.terminated {
285 return;
286 }
287 for m in msgs {
288 if s.terminated {
289 break;
290 }
291 match m.tier() {
292 3 => {
293 if let Some(h) = m.payload_handle() {
294 bb.retain_handle(h);
295 s.buf.push(h);
296 if s.buf.len() == count {
297 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
298 actions.push(Act::Flush(flushed));
299 }
300 }
301 }
302 5 => {
303 if let Some(h) = m.payload_handle() {
304 s.terminated = true;
306 bb.retain_handle(h);
307 let to_release: Vec<HandleId> = s.buf.drain(..).collect();
308 actions.push(Act::Release(to_release));
309 actions.push(Act::Error(h));
310 } else {
311 s.terminated = true;
313 if !s.buf.is_empty() {
314 let flushed: Vec<HandleId> = s.buf.drain(..).collect();
315 actions.push(Act::Flush(flushed));
316 }
317 actions.push(Act::Complete);
318 }
319 }
320 _ => {}
321 }
322 }
323 }
324 for a in actions {
325 match a {
326 Act::Flush(handles) => {
327 let packed = bb.pack_tuple(pack_fn_id, &handles);
328 for h in &handles {
329 bb.release_handle(*h);
330 }
331 core_src.emit_or_defer(pid, packed);
332 }
333 Act::Complete => core_src.complete_or_defer(pid),
334 Act::Error(h) => core_src.error_or_defer(pid, h),
335 Act::Release(handles) => {
336 for h in handles {
337 bb.release_handle(h);
338 }
339 }
340 }
341 }
342 });
343
344 let outcome = ctx.subscribe_to(source, source_sink);
345 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
346 let mut s = state.lock();
347 if !s.terminated {
348 s.terminated = true;
349 drop(s);
350 core_s.complete_or_defer(pid);
351 }
352 }
353 });
354
355 let fn_id = binding.register_producer_build(build);
356 core.register_producer(fn_id)
357 .expect("buffer_count: register_producer failed")
358}
359
360struct WindowState {
366 inner_id: Option<NodeId>,
368 terminated: bool,
369}
370
371impl WindowState {
372 fn new() -> Self {
373 Self {
374 inner_id: None,
375 terminated: false,
376 }
377 }
378}
379
380#[must_use]
393#[allow(clippy::too_many_lines)]
394pub fn window(
395 core: &Core,
396 binding: &Arc<dyn ProducerBinding>,
397 source: NodeId,
398 notifier: NodeId,
399) -> NodeId {
400 let core_weak = core.weak_handle();
401 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
402
403 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
404 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
405 return;
406 };
407 let pid = ctx.node_id();
408 let state: Arc<Mutex<WindowState>> = Arc::new(Mutex::new(WindowState::new()));
409 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
410
411 let first_inner = create_window_node(&core_s, &*bb);
413 {
414 let mut s = state.lock();
415 s.inner_id = Some(first_inner.0);
416 }
417 core_s.emit_or_defer(pid, first_inner.1);
418
419 let st = state.clone();
421 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
422 let core_src = core_s.clone();
423 let source_sink: Sink = Arc::new(move |msgs| {
424 enum Act {
425 Forward(NodeId, HandleId),
426 CompleteInner(NodeId),
427 CompleteSelf,
428 ErrorInner(NodeId, HandleId),
429 ErrorSelf(HandleId),
430 }
431 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
432 {
433 let mut s = st.lock();
434 if s.terminated {
435 return;
436 }
437 for m in msgs {
438 if s.terminated {
439 break;
440 }
441 match m.tier() {
442 3 => {
443 if let Some(h) = m.payload_handle() {
444 bb_src.retain_handle(h);
445 if let Some(inner) = s.inner_id {
446 actions.push(Act::Forward(inner, h));
447 } else {
448 bb_src.release_handle(h);
449 }
450 }
451 }
452 5 => {
453 if let Some(h) = m.payload_handle() {
454 s.terminated = true;
456 bb_src.retain_handle(h);
457 if let Some(inner) = s.inner_id.take() {
458 bb_src.retain_handle(h);
460 actions.push(Act::ErrorInner(inner, h));
461 }
462 actions.push(Act::ErrorSelf(h));
463 } else {
464 s.terminated = true;
466 if let Some(inner) = s.inner_id.take() {
467 actions.push(Act::CompleteInner(inner));
468 }
469 actions.push(Act::CompleteSelf);
470 }
471 }
472 _ => {}
473 }
474 }
475 }
476 for a in actions {
477 match a {
478 Act::Forward(inner, h) => core_src.emit_or_defer(inner, h),
479 Act::CompleteInner(inner) => core_src.complete_or_defer(inner),
480 Act::CompleteSelf => core_src.complete_or_defer(pid),
481 Act::ErrorInner(inner, h) => core_src.error_or_defer(inner, h),
482 Act::ErrorSelf(h) => core_src.error_or_defer(pid, h),
483 }
484 }
485 });
486
487 let src_outcome = ctx.subscribe_to(source, source_sink);
488 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
489 let mut s = state.lock();
490 if !s.terminated {
491 s.terminated = true;
492 let inner = s.inner_id.take();
493 drop(s);
494 if let Some(inner_id) = inner {
495 core_s.complete_or_defer(inner_id);
496 }
497 core_s.complete_or_defer(pid);
498 return;
499 }
500 }
501
502 let st2 = state.clone();
504 let core_n = core_s.clone();
505 let bb_not: Arc<dyn BindingBoundary> = binding_s.clone();
506 let notifier_sink: Sink = Arc::new(move |msgs| {
507 enum Act {
508 CompleteOldEmitNew(NodeId, HandleId),
509 ErrorInner(NodeId, HandleId),
510 ErrorSelf(HandleId),
511 }
512 let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
513 {
514 let mut s = st2.lock();
515 if s.terminated {
516 return;
517 }
518 for m in msgs {
519 if s.terminated {
520 break;
521 }
522 match m.tier() {
523 3 if m.payload_handle().is_some() => {
524 let old_inner = s.inner_id.take();
526 let (new_id, new_handle) = create_window_node(&core_n, &*bb_not);
527 s.inner_id = Some(new_id);
528 if let Some(old) = old_inner {
529 actions.push(Act::CompleteOldEmitNew(old, new_handle));
530 } else {
531 actions.push(Act::CompleteOldEmitNew(new_id, new_handle));
534 }
535 }
536 5 => {
537 if let Some(h) = m.payload_handle() {
538 s.terminated = true;
540 bb_not.retain_handle(h);
541 if let Some(inner) = s.inner_id.take() {
542 bb_not.retain_handle(h);
543 actions.push(Act::ErrorInner(inner, h));
544 }
545 actions.push(Act::ErrorSelf(h));
546 }
547 }
549 _ => {}
550 }
551 }
552 }
553 for a in actions {
554 match a {
555 Act::CompleteOldEmitNew(old, new_handle) => {
556 core_n.complete_or_defer(old);
557 core_n.emit_or_defer(pid, new_handle);
558 }
559 Act::ErrorInner(inner, h) => core_n.error_or_defer(inner, h),
560 Act::ErrorSelf(h) => core_n.error_or_defer(pid, h),
561 }
562 }
563 });
564
565 let _ = ctx.subscribe_to(notifier, notifier_sink);
566 });
567
568 let fn_id = binding.register_producer_build(build);
569 core.register_producer(fn_id)
570 .expect("window: register_producer failed")
571}
572
573fn create_window_node(core: &Core, binding: &dyn BindingBoundary) -> (NodeId, HandleId) {
576 let inner_id = core
577 .register_state(graphrefly_core::NO_HANDLE, false)
578 .expect("window: register_state for inner node failed");
579 let handle = binding.intern_node(inner_id);
580 (inner_id, handle)
581}
582
583struct WindowCountState {
589 inner_id: Option<NodeId>,
591 counter: usize,
593 terminated: bool,
594}
595
596impl WindowCountState {
597 fn new() -> Self {
598 Self {
599 inner_id: None,
600 counter: 0,
601 terminated: false,
602 }
603 }
604}
605
606#[must_use]
620#[allow(clippy::too_many_lines)]
621pub fn window_count(
622 core: &Core,
623 binding: &Arc<dyn ProducerBinding>,
624 source: NodeId,
625 count: usize,
626) -> NodeId {
627 assert!(count > 0, "window_count: count must be > 0");
628
629 let core_weak = core.weak_handle();
630 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
631
632 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
633 let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
634 return;
635 };
636 let pid = ctx.node_id();
637 let state: Arc<Mutex<WindowCountState>> = Arc::new(Mutex::new(WindowCountState::new()));
638 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
639
640 let first_inner = create_window_node(&core_s, &*bb);
642 {
643 let mut s = state.lock();
644 s.inner_id = Some(first_inner.0);
645 s.counter = 0;
646 }
647 core_s.emit_or_defer(pid, first_inner.1);
648
649 let st = state.clone();
651 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
652 let core_src = core_s.clone();
653 let source_sink: Sink = Arc::new(move |msgs| {
654 enum Act {
655 Forward(NodeId, HandleId),
656 CompleteWindowEmitNew(NodeId, HandleId),
657 CompleteInner(NodeId),
658 CompleteSelf,
659 ErrorInner(NodeId, HandleId),
660 ErrorSelf(HandleId),
661 }
662 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
663 {
664 let mut s = st.lock();
665 if s.terminated {
666 return;
667 }
668 for m in msgs {
669 if s.terminated {
670 break;
671 }
672 match m.tier() {
673 3 => {
674 if let Some(h) = m.payload_handle() {
675 bb_src.retain_handle(h);
676 if let Some(inner) = s.inner_id {
677 actions.push(Act::Forward(inner, h));
678 s.counter += 1;
679 if s.counter == count {
680 let (new_id, new_handle) =
682 create_window_node(&core_src, &*bb_src);
683 actions.push(Act::CompleteWindowEmitNew(inner, new_handle));
684 s.inner_id = Some(new_id);
685 s.counter = 0;
686 }
687 } else {
688 bb_src.release_handle(h);
689 }
690 }
691 }
692 5 => {
693 if let Some(h) = m.payload_handle() {
694 s.terminated = true;
696 bb_src.retain_handle(h);
697 if let Some(inner) = s.inner_id.take() {
698 bb_src.retain_handle(h);
699 actions.push(Act::ErrorInner(inner, h));
700 }
701 actions.push(Act::ErrorSelf(h));
702 } else {
703 s.terminated = true;
705 if let Some(inner) = s.inner_id.take() {
706 actions.push(Act::CompleteInner(inner));
707 }
708 actions.push(Act::CompleteSelf);
709 }
710 }
711 _ => {}
712 }
713 }
714 }
715 for a in actions {
716 match a {
717 Act::Forward(inner, h) => core_src.emit_or_defer(inner, h),
718 Act::CompleteWindowEmitNew(old_inner, new_handle) => {
719 core_src.complete_or_defer(old_inner);
720 core_src.emit_or_defer(pid, new_handle);
721 }
722 Act::CompleteInner(inner) => core_src.complete_or_defer(inner),
723 Act::CompleteSelf => core_src.complete_or_defer(pid),
724 Act::ErrorInner(inner, h) => core_src.error_or_defer(inner, h),
725 Act::ErrorSelf(h) => core_src.error_or_defer(pid, h),
726 }
727 }
728 });
729
730 let outcome = ctx.subscribe_to(source, source_sink);
731 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
732 let mut s = state.lock();
733 if !s.terminated {
734 s.terminated = true;
735 let inner = s.inner_id.take();
736 drop(s);
737 if let Some(inner_id) = inner {
738 core_s.complete_or_defer(inner_id);
739 }
740 core_s.complete_or_defer(pid);
741 }
742 }
743 });
744
745 let fn_id = binding.register_producer_build(build);
746 core.register_producer(fn_id)
747 .expect("window_count: register_producer failed")
748}