#![allow(clippy::collapsible_if, clippy::collapsible_match)]
#![allow(clippy::too_many_lines)]
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Weak};
use graphrefly_core::{Core, HandleId, NodeId, Sink};
use smallvec::SmallVec;
use super::producer::{ProducerBinding, ProducerCtx};
struct ZipState {
queues: Vec<VecDeque<HandleId>>,
completed: Vec<bool>,
errored: bool,
terminated: bool,
}
impl ZipState {
fn new(n: usize) -> Self {
Self {
queues: (0..n).map(|_| VecDeque::new()).collect(),
completed: vec![false; n],
errored: false,
terminated: false,
}
}
}
#[must_use]
pub fn zip(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
sources: Vec<NodeId>,
pack_fn_id: graphrefly_core::FnId,
) -> NodeId {
let n = sources.len();
let core_weak = core.weak_handle();
let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
let build = Box::new(move |ctx: ProducerCtx<'_>| {
let producer_id = ctx.node_id();
let (Some(core_for_build), Some(binding_clone)) =
(core_weak.upgrade(), binding_weak.upgrade())
else {
return;
};
if n == 0 {
let tuple_h = binding_clone.pack_tuple(pack_fn_id, &[]);
core_for_build.emit(producer_id, tuple_h);
core_for_build.complete(producer_id);
return;
}
let state: Arc<Mutex<ZipState>> = Arc::new(Mutex::new(ZipState::new(n)));
for (idx, &source) in sources.iter().enumerate() {
let state_inner = state.clone();
let core_inner = core_for_build.clone();
let binding_inner = binding_clone.clone();
let sink: Sink = Arc::new(move |msgs| {
enum PostLockAction {
PackAndEmit(Vec<HandleId>),
Complete,
Error(HandleId),
}
let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
{
let mut s = state_inner.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
binding_inner.retain_handle(h);
s.queues[idx].push_back(h);
while s.queues.iter().all(|q| !q.is_empty()) {
let popped: Vec<HandleId> = s
.queues
.iter_mut()
.map(|q| q.pop_front().unwrap())
.collect();
post_actions.push(PostLockAction::PackAndEmit(popped));
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.errored && !s.terminated {
s.errored = true;
s.terminated = true;
binding_inner.retain_handle(h);
for q in &mut s.queues {
to_release.extend(q.drain(..));
}
post_actions.push(PostLockAction::Error(h));
}
} else {
s.completed[idx] = true;
if s.queues[idx].is_empty() && !s.terminated {
s.terminated = true;
for q in &mut s.queues {
to_release.extend(q.drain(..));
}
post_actions.push(PostLockAction::Complete);
}
}
}
_ => {} }
}
}
for h in to_release {
binding_inner.release_handle(h);
}
for action in post_actions {
match action {
PostLockAction::PackAndEmit(popped) => {
let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
for h in &popped {
binding_inner.release_handle(*h);
}
core_inner.emit(producer_id, tuple_h);
}
PostLockAction::Complete => core_inner.complete(producer_id),
PostLockAction::Error(h) => core_inner.error(producer_id, h),
}
}
});
ctx.subscribe_to(source, sink);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("invariant: register_producer has no deps; no error variants reachable")
}
struct ConcatState {
phase: u8,
pending: VecDeque<HandleId>,
second_completed: bool,
terminated: bool,
}
impl ConcatState {
fn new() -> Self {
Self {
phase: 0,
pending: VecDeque::new(),
second_completed: false,
terminated: false,
}
}
}
#[must_use]
pub fn concat(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
first: NodeId,
second: NodeId,
) -> NodeId {
let core_weak = core.weak_handle();
let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
let build = Box::new(move |ctx: ProducerCtx<'_>| {
let producer_id = ctx.node_id();
let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
else {
return;
};
let state: Arc<Mutex<ConcatState>> = Arc::new(Mutex::new(ConcatState::new()));
let state_for_second = state.clone();
let core_for_second = core_clone.clone();
let binding_for_second = binding_clone.clone();
let second_sink: Sink = Arc::new(move |msgs| {
enum Action {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
{
let mut s = state_for_second.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
if s.phase == 0 {
binding_for_second.retain_handle(h);
s.pending.push_back(h);
} else {
binding_for_second.retain_handle(h);
actions.push(Action::Emit(h));
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.terminated {
s.terminated = true;
binding_for_second.retain_handle(h);
to_release.extend(s.pending.drain(..));
actions.push(Action::Error(h));
}
} else {
if s.phase == 1 && !s.terminated {
s.terminated = true;
actions.push(Action::Complete);
} else if s.phase == 0 {
s.second_completed = true;
}
}
}
_ => {} }
}
}
for h in to_release {
binding_for_second.release_handle(h);
}
for action in actions {
match action {
Action::Emit(h) => core_for_second.emit(producer_id, h),
Action::Complete => core_for_second.complete(producer_id),
Action::Error(h) => core_for_second.error(producer_id, h),
}
}
});
ctx.subscribe_to(second, second_sink);
let state_for_first = state.clone();
let core_for_first = core_clone.clone();
let binding_for_first = binding_clone.clone();
let first_sink: Sink = Arc::new(move |msgs| {
enum Action {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
{
let mut s = state_for_first.lock().unwrap();
if s.terminated {
return;
}
if s.phase != 0 {
return; }
for m in msgs {
if s.terminated || s.phase != 0 {
break;
}
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
binding_for_first.retain_handle(h);
actions.push(Action::Emit(h));
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.terminated {
s.terminated = true;
binding_for_first.retain_handle(h);
to_release.extend(s.pending.drain(..));
actions.push(Action::Error(h));
}
} else {
s.phase = 1;
for h in s.pending.drain(..) {
actions.push(Action::Emit(h));
}
if s.second_completed && !s.terminated {
s.terminated = true;
actions.push(Action::Complete);
}
}
}
_ => {} }
}
}
for h in to_release {
binding_for_first.release_handle(h);
}
for action in actions {
match action {
Action::Emit(h) => core_for_first.emit(producer_id, h),
Action::Complete => core_for_first.complete(producer_id),
Action::Error(h) => core_for_first.error(producer_id, h),
}
}
});
ctx.subscribe_to(first, first_sink);
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("invariant: register_producer has no deps; no error variants reachable")
}
struct RaceState {
winner: Option<usize>,
completed: Vec<bool>,
terminated: bool,
}
impl RaceState {
fn new(n: usize) -> Self {
Self {
winner: None,
completed: vec![false; n],
terminated: false,
}
}
}
#[must_use]
pub fn race(core: &Core, binding: &Arc<dyn ProducerBinding>, sources: Vec<NodeId>) -> NodeId {
let n = sources.len();
let core_weak = core.weak_handle();
let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
let build = Box::new(move |ctx: ProducerCtx<'_>| {
let producer_id = ctx.node_id();
let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
else {
return;
};
if n == 0 {
core_clone.complete(producer_id);
return;
}
let state: Arc<Mutex<RaceState>> = Arc::new(Mutex::new(RaceState::new(n)));
for (idx, &source) in sources.iter().enumerate() {
let state_inner = state.clone();
let core_inner = core_clone.clone();
let binding_inner = binding_clone.clone();
let sink: Sink = Arc::new(move |msgs| {
enum Action {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
{
let mut s = state_inner.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
if s.winner.is_none() {
s.winner = Some(idx);
binding_inner.retain_handle(h);
actions.push(Action::Emit(h));
} else if s.winner == Some(idx) {
binding_inner.retain_handle(h);
actions.push(Action::Emit(h));
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
if (s.winner.is_none() || s.winner == Some(idx))
&& !s.terminated
{
s.terminated = true;
binding_inner.retain_handle(h);
actions.push(Action::Error(h));
}
} else {
s.completed[idx] = true;
if s.winner == Some(idx) && !s.terminated {
s.terminated = true;
actions.push(Action::Complete);
} else if s.winner.is_none()
&& s.completed.iter().all(|&c| c)
&& !s.terminated
{
s.terminated = true;
actions.push(Action::Complete);
}
}
}
_ => {} }
}
}
for action in actions {
match action {
Action::Emit(h) => core_inner.emit(producer_id, h),
Action::Complete => core_inner.complete(producer_id),
Action::Error(h) => core_inner.error(producer_id, h),
}
}
});
ctx.subscribe_to(source, sink);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("invariant: register_producer has no deps; no error variants reachable")
}
struct TakeUntilState {
terminated: bool,
}
impl TakeUntilState {
fn new() -> Self {
Self { terminated: false }
}
}
#[must_use]
pub fn take_until(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
notifier: NodeId,
) -> NodeId {
let core_weak = core.weak_handle();
let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
let build = Box::new(move |ctx: ProducerCtx<'_>| {
let producer_id = ctx.node_id();
let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
else {
return;
};
let state: Arc<Mutex<TakeUntilState>> = Arc::new(Mutex::new(TakeUntilState::new()));
let state_for_source = state.clone();
let core_for_source = core_clone.clone();
let binding_for_source = binding_clone.clone();
let source_sink: Sink = Arc::new(move |msgs| {
enum Action {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
{
let mut s = state_for_source.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
binding_for_source.retain_handle(h);
actions.push(Action::Emit(h));
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.terminated {
s.terminated = true;
binding_for_source.retain_handle(h);
actions.push(Action::Error(h));
}
} else {
if !s.terminated {
s.terminated = true;
actions.push(Action::Complete);
}
}
}
_ => {} }
}
}
for action in actions {
match action {
Action::Emit(h) => core_for_source.emit(producer_id, h),
Action::Complete => core_for_source.complete(producer_id),
Action::Error(h) => core_for_source.error(producer_id, h),
}
}
});
ctx.subscribe_to(source, source_sink);
let state_for_notifier = state.clone();
let core_for_notifier = core_clone.clone();
let binding_for_notifier = binding_clone.clone();
let notifier_sink: Sink = Arc::new(move |msgs| {
enum Action {
Complete,
Error(HandleId),
}
let mut action: Option<Action> = None;
{
let mut s = state_for_notifier.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if m.payload_handle().is_some() && !s.terminated {
s.terminated = true;
action = Some(Action::Complete);
break;
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.terminated {
s.terminated = true;
binding_for_notifier.retain_handle(h);
action = Some(Action::Error(h));
break;
}
}
}
_ => {} }
}
}
if let Some(a) = action {
match a {
Action::Complete => core_for_notifier.complete(producer_id),
Action::Error(h) => core_for_notifier.error(producer_id, h),
}
}
});
ctx.subscribe_to(notifier, notifier_sink);
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("invariant: register_producer has no deps; no error variants reachable")
}