#![allow(clippy::arc_with_non_send_sync)]
use std::sync::Arc;
use parking_lot::Mutex;
use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
use smallvec::SmallVec;
use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
/// Mutable state shared by the two sinks of a single `buffer` producer.
struct BufferState {
    /// Handles received from the source that have not been flushed yet.
    buf: Vec<HandleId>,
    /// Set once a terminal message has been handled; the sinks ignore everything afterwards.
    terminated: bool,
    /// Set when termination was a completion of the source rather than an error.
    /// NOTE(review): only ever written in the code visible here, never read.
    source_completed: bool,
}

impl BufferState {
    /// Creates the initial state: an empty buffer with both flags cleared.
    fn new() -> Self {
        let buf = Vec::new();
        BufferState {
            buf,
            terminated: false,
            source_completed: false,
        }
    }
}
/// Buffers values from `source` and emits them as one packed tuple each time `notifier` fires.
///
/// Registers a producer whose build closure subscribes to both nodes:
///
/// * `source`: a tier-3 message carrying a payload handle is retained and appended to the
///   pending buffer. A tier-5 message with a payload handle releases the pending handles and is
///   forwarded as an error; a tier-5 message without one flushes a non-empty buffer and then
///   completes the producer.
/// * `notifier`: a tier-3 message carrying a payload handle flushes a non-empty buffer. A tier-5
///   message with a payload handle releases the pending handles and is forwarded as an error; a
///   tier-5 message without a payload is ignored.
///
/// A flush packs the pending handles through the binding with `pack_fn_id`, releases the
/// individual handles and emits the packed handle on the producer node.
///
/// If subscribing to `source` yields [`SubscribeOutcome::Dead`], the producer is completed
/// immediately and `notifier` is never subscribed.
///
/// Returns the id of the registered producer node.
///
/// # Panics
///
/// Panics if `core.register_producer` returns an error.
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn buffer(
    core: &Core,
    binding: &Arc<dyn ProducerBinding>,
    source: NodeId,
    notifier: NodeId,
    pack_fn_id: FnId,
) -> NodeId {
    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
        let producer_core = ctx.core();
        let boundary = ctx.core().binding();
        let emitter = ctx.emitter();
        let pid = ctx.node_id();
        let shared: Arc<Mutex<BufferState>> = Arc::new(Mutex::new(BufferState::new()));

        // ---- source side -------------------------------------------------------------------
        let src_state = shared.clone();
        let src_bb: Arc<dyn BindingBoundary> = boundary.clone();
        let src_em = emitter.clone();
        let source_sink: Sink = Arc::new(move |msgs| {
            // Work decided while the state lock is held, carried out after it is released.
            enum SourceStep {
                Flush(Vec<HandleId>),
                Complete,
                Error(HandleId),
                Release(Vec<HandleId>),
            }

            let mut steps: SmallVec<[SourceStep; 2]> = SmallVec::new();
            {
                let mut guard = src_state.lock();
                if guard.terminated {
                    return;
                }
                for msg in msgs {
                    match msg.tier() {
                        3 => {
                            if let Some(value) = msg.payload_handle() {
                                src_bb.retain_handle(value);
                                guard.buf.push(value);
                            }
                        }
                        5 => {
                            match msg.payload_handle() {
                                Some(err) => {
                                    guard.terminated = true;
                                    src_bb.retain_handle(err);
                                    let pending = std::mem::take(&mut guard.buf);
                                    steps.push(SourceStep::Release(pending));
                                    steps.push(SourceStep::Error(err));
                                }
                                None => {
                                    guard.source_completed = true;
                                    guard.terminated = true;
                                    if !guard.buf.is_empty() {
                                        let pending = std::mem::take(&mut guard.buf);
                                        steps.push(SourceStep::Flush(pending));
                                    }
                                    steps.push(SourceStep::Complete);
                                }
                            }
                            // Both terminal cases set `terminated`, so the remainder of the
                            // batch is not processed.
                            break;
                        }
                        _ => {}
                    }
                }
            }

            for step in steps {
                match step {
                    SourceStep::Flush(batch) => {
                        let packed = src_bb.pack_tuple(pack_fn_id, &batch);
                        for &h in &batch {
                            src_bb.release_handle(h);
                        }
                        src_em.emit_or_defer(pid, packed);
                    }
                    SourceStep::Complete => src_em.complete_or_defer(pid),
                    SourceStep::Error(err) => src_em.error_or_defer(pid, err),
                    SourceStep::Release(batch) => {
                        for h in batch {
                            src_bb.release_handle(h);
                        }
                    }
                }
            }
        });

        let src_outcome = ctx.subscribe_to(source, source_sink);
        if let SubscribeOutcome::Dead { .. } = src_outcome {
            // Flip the flags under the lock, but complete only after it has been released.
            let newly_terminated = {
                let mut guard = shared.lock();
                if guard.terminated {
                    false
                } else {
                    guard.terminated = true;
                    guard.source_completed = true;
                    true
                }
            };
            if newly_terminated {
                producer_core.complete_or_defer(pid);
                return;
            }
        }

        // ---- notifier side -----------------------------------------------------------------
        let not_state = shared.clone();
        let not_em = emitter.clone();
        let not_bb: Arc<dyn BindingBoundary> = boundary.clone();
        let notifier_sink: Sink = Arc::new(move |msgs| {
            enum NotifierStep {
                Flush(Vec<HandleId>),
                Error(HandleId),
                Release(Vec<HandleId>),
            }

            let mut steps: SmallVec<[NotifierStep; 2]> = SmallVec::new();
            {
                let mut guard = not_state.lock();
                if guard.terminated {
                    return;
                }
                for msg in msgs {
                    match msg.tier() {
                        3 => {
                            // A notification only flushes when something is actually pending.
                            if msg.payload_handle().is_some() && !guard.buf.is_empty() {
                                let pending = std::mem::take(&mut guard.buf);
                                steps.push(NotifierStep::Flush(pending));
                            }
                        }
                        5 => {
                            if let Some(err) = msg.payload_handle() {
                                guard.terminated = true;
                                not_bb.retain_handle(err);
                                let pending = std::mem::take(&mut guard.buf);
                                steps.push(NotifierStep::Release(pending));
                                steps.push(NotifierStep::Error(err));
                                // Terminated: skip the rest of the batch.
                                break;
                            }
                        }
                        _ => {}
                    }
                }
            }

            for step in steps {
                match step {
                    NotifierStep::Flush(batch) => {
                        let packed = not_bb.pack_tuple(pack_fn_id, &batch);
                        for &h in &batch {
                            not_bb.release_handle(h);
                        }
                        not_em.emit_or_defer(pid, packed);
                    }
                    NotifierStep::Error(err) => not_em.error_or_defer(pid, err),
                    NotifierStep::Release(batch) => {
                        for h in batch {
                            not_bb.release_handle(h);
                        }
                    }
                }
            }
        });
        let _ = ctx.subscribe_to(notifier, notifier_sink);
    });

    let fn_id = binding.register_producer_build(build);
    core.register_producer(fn_id)
        .expect("buffer: register_producer failed")
}
/// Mutable state owned by the source sink of a single `buffer_count` producer.
struct BufferCountState {
    /// Handles collected so far for the chunk currently being filled.
    buf: Vec<HandleId>,
    /// Set once a terminal message has been handled; later messages are ignored.
    terminated: bool,
}

impl BufferCountState {
    /// Creates the initial state: an empty chunk, not terminated.
    fn new() -> Self {
        let buf = Vec::new();
        BufferCountState {
            buf,
            terminated: false,
        }
    }
}
/// Groups values from `source` into packed tuples of `count` elements.
///
/// Registers a producer whose build closure subscribes to `source`:
///
/// * a tier-3 message carrying a payload handle is retained and appended to the current chunk;
///   as soon as the chunk holds `count` handles it is flushed;
/// * a tier-5 message with a payload handle releases the partial chunk and is forwarded as an
///   error;
/// * a tier-5 message without a payload handle flushes a non-empty partial chunk and then
///   completes the producer.
///
/// A flush packs the chunk through the binding with `pack_fn_id`, releases the individual
/// handles and emits the packed handle on the producer node.
///
/// If subscribing to `source` yields [`SubscribeOutcome::Dead`], the producer is completed
/// immediately.
///
/// Returns the id of the registered producer node.
///
/// # Panics
///
/// Panics if `count` is zero or if `core.register_producer` returns an error.
#[must_use]
pub fn buffer_count(
    core: &Core,
    binding: &Arc<dyn ProducerBinding>,
    source: NodeId,
    count: usize,
    pack_fn_id: FnId,
) -> NodeId {
    assert!(count > 0, "buffer_count: count must be > 0");
    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
        let producer_core = ctx.core();
        let boundary = ctx.core().binding();
        let emitter = ctx.emitter();
        let pid = ctx.node_id();
        let shared: Arc<Mutex<BufferCountState>> =
            Arc::new(Mutex::new(BufferCountState::new()));

        let sink_state = shared.clone();
        let sink_bb: Arc<dyn BindingBoundary> = boundary.clone();
        let sink_em = emitter.clone();
        let source_sink: Sink = Arc::new(move |msgs| {
            // Work decided while the state lock is held, carried out after it is released.
            enum Step {
                Flush(Vec<HandleId>),
                Complete,
                Error(HandleId),
                Release(Vec<HandleId>),
            }

            let mut steps: SmallVec<[Step; 4]> = SmallVec::new();
            {
                let mut guard = sink_state.lock();
                if guard.terminated {
                    return;
                }
                for msg in msgs {
                    match msg.tier() {
                        3 => {
                            if let Some(value) = msg.payload_handle() {
                                sink_bb.retain_handle(value);
                                guard.buf.push(value);
                                if guard.buf.len() == count {
                                    let chunk = std::mem::take(&mut guard.buf);
                                    steps.push(Step::Flush(chunk));
                                }
                            }
                        }
                        5 => {
                            guard.terminated = true;
                            match msg.payload_handle() {
                                Some(err) => {
                                    sink_bb.retain_handle(err);
                                    let partial = std::mem::take(&mut guard.buf);
                                    steps.push(Step::Release(partial));
                                    steps.push(Step::Error(err));
                                }
                                None => {
                                    if !guard.buf.is_empty() {
                                        let partial = std::mem::take(&mut guard.buf);
                                        steps.push(Step::Flush(partial));
                                    }
                                    steps.push(Step::Complete);
                                }
                            }
                            // Terminated: the remainder of the batch is not processed.
                            break;
                        }
                        _ => {}
                    }
                }
            }

            for step in steps {
                match step {
                    Step::Flush(chunk) => {
                        let packed = sink_bb.pack_tuple(pack_fn_id, &chunk);
                        for &h in &chunk {
                            sink_bb.release_handle(h);
                        }
                        sink_em.emit_or_defer(pid, packed);
                    }
                    Step::Complete => sink_em.complete_or_defer(pid),
                    Step::Error(err) => sink_em.error_or_defer(pid, err),
                    Step::Release(chunk) => {
                        for h in chunk {
                            sink_bb.release_handle(h);
                        }
                    }
                }
            }
        });

        let outcome = ctx.subscribe_to(source, source_sink);
        if let SubscribeOutcome::Dead { .. } = outcome {
            // Flip the flag under the lock, but complete only after it has been released.
            let newly_terminated = {
                let mut guard = shared.lock();
                let first = !guard.terminated;
                if first {
                    guard.terminated = true;
                }
                first
            };
            if newly_terminated {
                producer_core.complete_or_defer(pid);
            }
        }
    });

    let fn_id = binding.register_producer_build(build);
    core.register_producer(fn_id)
        .expect("buffer_count: register_producer failed")
}
/// Mutable state shared by the two sinks of a single `window` producer.
struct WindowState {
    /// Node id of the currently open window; `None` before the first window is stored and after
    /// it has been taken on termination.
    inner_id: Option<NodeId>,
    /// Set once a terminal message has been handled; later messages are ignored.
    terminated: bool,
}

impl WindowState {
    /// Creates the initial state: no open window, not terminated.
    fn new() -> Self {
        let inner_id = None;
        WindowState {
            inner_id,
            terminated: false,
        }
    }
}
/// Splits `source` into consecutive window nodes, opening a new one each time `notifier` fires.
///
/// The producer emits handles of inner nodes (see `create_window_node`). The first window is
/// created and emitted when the producer is built. Afterwards:
///
/// * `source`: a tier-3 message carrying a payload handle is retained and forwarded, through a
///   deferred closure, into the currently open window. A tier-5 message terminates the
///   operator: with a payload handle the open window (if any) and the producer are errored,
///   without one both are completed.
/// * `notifier`: a tier-3 message carrying a payload handle creates a new window, completes the
///   previous one and emits the new window's handle on the producer. A tier-5 message with a
///   payload handle errors the open window (if any) and the producer; a tier-5 message without a
///   payload is ignored.
///
/// If subscribing to `source` yields [`SubscribeOutcome::Dead`], the open window and the
/// producer are completed immediately and `notifier` is never subscribed.
///
/// Returns the id of the registered producer node.
///
/// # Panics
///
/// Panics if `core.register_producer` returns an error, or if registering an inner window node
/// fails inside `create_window_node`.
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn window(
    core: &Core,
    binding: &Arc<dyn ProducerBinding>,
    source: NodeId,
    notifier: NodeId,
) -> NodeId {
    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
        let core_s = ctx.core();
        let binding_s = ctx.core().binding();
        let em = ctx.emitter();
        let pid = ctx.node_id();
        let state: Arc<Mutex<WindowState>> = Arc::new(Mutex::new(WindowState::new()));
        let bb: Arc<dyn BindingBoundary> = binding_s.clone();

        // Open the first window before subscribing so early source values have a target.
        let first_inner = create_window_node(core_s, &*bb);
        {
            let mut s = state.lock();
            s.inner_id = Some(first_inner.0);
        }
        core_s.emit_or_defer(pid, first_inner.1);

        let st = state.clone();
        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
        let em_src = em.clone();
        let source_sink: Sink = Arc::new(move |msgs| {
            for m in msgs {
                if st.lock().terminated {
                    break;
                }
                match m.tier() {
                    3 => {
                        if let Some(h) = m.payload_handle() {
                            // Retain for the forwarded emission; every path that does not emit
                            // releases the handle again.
                            bb_src.retain_handle(h);
                            let st_c = st.clone();
                            let bb_c = bb_src.clone();
                            let scheduled = em_src.defer(move |c| {
                                // The lock guard is a statement temporary: it is gone before
                                // the core is called.
                                let inner = st_c.lock().inner_id;
                                match inner {
                                    Some(i) => c.emit(i, h),
                                    None => bb_c.release_handle(h),
                                }
                            });
                            // `defer` returning false is treated as "closure will not run".
                            if !scheduled {
                                bb_src.release_handle(h);
                            }
                        }
                    }
                    5 => {
                        // Only the first terminal message (from either sink) proceeds.
                        let was = std::mem::replace(&mut st.lock().terminated, true);
                        if was {
                            break;
                        }
                        if let Some(h) = m.payload_handle() {
                            bb_src.retain_handle(h);
                            let st_c = st.clone();
                            let bb_c = bb_src.clone();
                            let scheduled = em_src.defer(move |c| {
                                // Take the window id in its own statement so the state lock is
                                // released before calling into the core. Writing
                                // `if let Some(i) = st_c.lock()...` would keep the guard alive
                                // for the whole body and deadlock if erroring the window
                                // re-entered a sink on this thread.
                                let open = st_c.lock().inner_id.take();
                                if let Some(i) = open {
                                    // Extra retain: the handle is passed to two `error` calls.
                                    bb_c.retain_handle(h);
                                    c.error(i, h);
                                }
                                c.error(pid, h);
                            });
                            if !scheduled {
                                bb_src.release_handle(h);
                            }
                        } else {
                            let st_c = st.clone();
                            let _ = em_src.defer(move |c| {
                                // Same as above: drop the guard before calling into the core.
                                let open = st_c.lock().inner_id.take();
                                if let Some(i) = open {
                                    c.complete(i);
                                }
                                c.complete(pid);
                            });
                        }
                        break;
                    }
                    _ => {}
                }
            }
        });
        let src_outcome = ctx.subscribe_to(source, source_sink);
        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
            let mut s = state.lock();
            if !s.terminated {
                s.terminated = true;
                let inner = s.inner_id.take();
                // Release the lock before completing anything.
                drop(s);
                if let Some(inner_id) = inner {
                    core_s.complete_or_defer(inner_id);
                }
                core_s.complete_or_defer(pid);
                return;
            }
        }

        let st2 = state.clone();
        let em_not = em.clone();
        let bb_not: Arc<dyn BindingBoundary> = binding_s.clone();
        let notifier_sink: Sink = Arc::new(move |msgs| {
            for m in msgs {
                if st2.lock().terminated {
                    break;
                }
                match m.tier() {
                    3 if m.payload_handle().is_some() => {
                        let st_c = st2.clone();
                        let bb_c = bb_not.clone();
                        let _ = em_not.defer(move |c| {
                            // Roll over: install the new window, then close the previous one
                            // and announce the new one on the producer.
                            let (new_id, new_handle) = create_window_node(c, &*bb_c);
                            let old_inner = st_c.lock().inner_id.replace(new_id);
                            if let Some(old) = old_inner {
                                c.complete(old);
                            }
                            c.emit(pid, new_handle);
                        });
                    }
                    5 => {
                        // A notifier tier-5 message without a payload is ignored.
                        if let Some(h) = m.payload_handle() {
                            let was = std::mem::replace(&mut st2.lock().terminated, true);
                            if was {
                                break;
                            }
                            bb_not.retain_handle(h);
                            let st_c = st2.clone();
                            let bb_c = bb_not.clone();
                            let scheduled = em_not.defer(move |c| {
                                // Drop the state lock before calling into the core.
                                let open = st_c.lock().inner_id.take();
                                if let Some(inner) = open {
                                    bb_c.retain_handle(h);
                                    c.error(inner, h);
                                }
                                c.error(pid, h);
                            });
                            if !scheduled {
                                bb_not.release_handle(h);
                            }
                            break;
                        }
                    }
                    _ => {}
                }
            }
        });
        let _ = ctx.subscribe_to(notifier, notifier_sink);
    });
    let fn_id = binding.register_producer_build(build);
    core.register_producer(fn_id)
        .expect("window: register_producer failed")
}
/// Registers a fresh inner window node and interns it as a handle.
///
/// The node is registered through `core.register_state` with `graphrefly_core::NO_HANDLE` and
/// `false` as arguments, then passed to `binding.intern_node`.
///
/// Returns the new node's id together with the handle produced by the binding.
///
/// # Panics
///
/// Panics if `register_state` returns an error.
fn create_window_node(
    core: &dyn graphrefly_core::CoreFull,
    binding: &dyn BindingBoundary,
) -> (NodeId, HandleId) {
    let registered = core.register_state(graphrefly_core::NO_HANDLE, false);
    let node = registered.expect("window: register_state for inner node failed");
    (node, binding.intern_node(node))
}
/// Mutable state owned by the source sink of a single `window_count` producer.
struct WindowCountState {
    /// Node id of the currently open window; `None` before the first window is stored and after
    /// it has been taken on termination.
    inner_id: Option<NodeId>,
    /// Number of values forwarded into the currently open window.
    counter: usize,
    /// Set once a terminal message has been handled; later messages are ignored.
    terminated: bool,
}

impl WindowCountState {
    /// Creates the initial state: no open window, zero values counted, not terminated.
    fn new() -> Self {
        let inner_id = None;
        WindowCountState {
            inner_id,
            counter: 0,
            terminated: false,
        }
    }
}
/// Splits `source` into consecutive window nodes of `count` values each.
///
/// The producer emits handles of inner nodes (see `create_window_node`). The first window is
/// created and emitted when the producer is built. Afterwards, for messages from `source`:
///
/// * a tier-3 message carrying a payload handle is retained and forwarded, through a deferred
///   closure, into the currently open window. Once `count` values have been forwarded into a
///   window, a new window is created, the full one is completed and the new window's handle is
///   emitted on the producer;
/// * a tier-5 message terminates the operator: with a payload handle the open window (if any)
///   and the producer are errored, without one both are completed.
///
/// If subscribing to `source` yields [`SubscribeOutcome::Dead`], the open window and the
/// producer are completed immediately.
///
/// Returns the id of the registered producer node.
///
/// # Panics
///
/// Panics if `count` is zero, if `core.register_producer` returns an error, or if registering
/// an inner window node fails inside `create_window_node`.
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn window_count(
    core: &Core,
    binding: &Arc<dyn ProducerBinding>,
    source: NodeId,
    count: usize,
) -> NodeId {
    assert!(count > 0, "window_count: count must be > 0");
    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
        let core_s = ctx.core();
        let binding_s = ctx.core().binding();
        let em = ctx.emitter();
        let pid = ctx.node_id();
        let state: Arc<Mutex<WindowCountState>> = Arc::new(Mutex::new(WindowCountState::new()));
        let bb: Arc<dyn BindingBoundary> = binding_s.clone();

        // Open the first window before subscribing so early source values have a target.
        let first_inner = create_window_node(core_s, &*bb);
        {
            let mut s = state.lock();
            s.inner_id = Some(first_inner.0);
            s.counter = 0;
        }
        core_s.emit_or_defer(pid, first_inner.1);

        let st = state.clone();
        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
        let em_src = em.clone();
        let source_sink: Sink = Arc::new(move |msgs| {
            for m in msgs {
                if st.lock().terminated {
                    break;
                }
                match m.tier() {
                    3 => {
                        if let Some(h) = m.payload_handle() {
                            // Retain for the forwarded emission; every path that does not emit
                            // releases the handle again.
                            bb_src.retain_handle(h);
                            let st_c = st.clone();
                            let bb_c = bb_src.clone();
                            let scheduled = em_src.defer(move |c| {
                                // Decide under the lock, act after it has been released.
                                let (inner, roll) = {
                                    let mut s = st_c.lock();
                                    match s.inner_id {
                                        Some(inner) => {
                                            s.counter += 1;
                                            let roll = s.counter == count;
                                            (Some(inner), roll)
                                        }
                                        None => (None, false),
                                    }
                                };
                                let Some(inner) = inner else {
                                    // No open window any more: drop the value.
                                    bb_c.release_handle(h);
                                    return;
                                };
                                c.emit(inner, h);
                                if roll {
                                    // The window is full: install its successor, then close
                                    // the full one and announce the new one on the producer.
                                    let (new_id, new_handle) = create_window_node(c, &*bb_c);
                                    {
                                        let mut s = st_c.lock();
                                        s.inner_id = Some(new_id);
                                        s.counter = 0;
                                    }
                                    c.complete(inner);
                                    c.emit(pid, new_handle);
                                }
                            });
                            // `defer` returning false is treated as "closure will not run".
                            if !scheduled {
                                bb_src.release_handle(h);
                            }
                        }
                    }
                    5 => {
                        // Only the first terminal message proceeds.
                        let was = std::mem::replace(&mut st.lock().terminated, true);
                        if was {
                            break;
                        }
                        if let Some(h) = m.payload_handle() {
                            bb_src.retain_handle(h);
                            let st_c = st.clone();
                            let bb_c = bb_src.clone();
                            let scheduled = em_src.defer(move |c| {
                                // Take the window id in its own statement so the state lock is
                                // released before calling into the core. Writing
                                // `if let Some(inner) = st_c.lock()...` would keep the guard
                                // alive for the whole body and deadlock if erroring the window
                                // re-entered the sink on this thread.
                                let open = st_c.lock().inner_id.take();
                                if let Some(inner) = open {
                                    // Extra retain: the handle is passed to two `error` calls.
                                    bb_c.retain_handle(h);
                                    c.error(inner, h);
                                }
                                c.error(pid, h);
                            });
                            if !scheduled {
                                bb_src.release_handle(h);
                            }
                        } else {
                            let st_c = st.clone();
                            let _ = em_src.defer(move |c| {
                                // Same as above: drop the guard before calling into the core.
                                let open = st_c.lock().inner_id.take();
                                if let Some(inner) = open {
                                    c.complete(inner);
                                }
                                c.complete(pid);
                            });
                        }
                        break;
                    }
                    _ => {}
                }
            }
        });
        let outcome = ctx.subscribe_to(source, source_sink);
        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
            let mut s = state.lock();
            if !s.terminated {
                s.terminated = true;
                let inner = s.inner_id.take();
                // Release the lock before completing anything.
                drop(s);
                if let Some(inner_id) = inner {
                    core_s.complete_or_defer(inner_id);
                }
                core_s.complete_or_defer(pid);
            }
        }
    });
    let fn_id = binding.register_producer_build(build);
    core.register_producer(fn_id)
        .expect("window_count: register_producer failed")
}