#![allow(clippy::arc_with_non_send_sync)]
#![allow(clippy::collapsible_if, clippy::collapsible_match)]
#![allow(clippy::too_many_lines)]
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use graphrefly_core::{Core, HandleId, NodeId, Sink};
use smallvec::SmallVec;
use super::error::OperatorFactoryError;
use super::producer::{ProducerBinding, ProducerCtx};
struct ZipState {
queues: Vec<VecDeque<HandleId>>,
completed: Vec<bool>,
errored: bool,
terminated: bool,
}
impl ZipState {
fn new(n: usize) -> Self {
Self {
queues: (0..n).map(|_| VecDeque::new()).collect(),
completed: vec![false; n],
errored: false,
terminated: false,
}
}
}
pub fn zip(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
sources: Vec<NodeId>,
pack_fn_id: graphrefly_core::FnId,
) -> Result<NodeId, OperatorFactoryError> {
if sources.is_empty() {
return Err(OperatorFactoryError::EmptySources);
}
let n = sources.len();
let build = Box::new(move |ctx: ProducerCtx<'_>| {
let producer_id = ctx.node_id();
let binding_clone = ctx.core().binding();
let em = ctx.emitter();
let state: Arc<Mutex<ZipState>> = Arc::new(Mutex::new(ZipState::new(n)));
for (idx, &source) in sources.iter().enumerate() {
let state_inner = state.clone();
let core_inner = em.clone();
let binding_inner = binding_clone.clone();
let sink: Sink = Arc::new(move |msgs| {
enum PostLockAction {
PackAndEmit(Vec<HandleId>),
Complete,
Error(HandleId),
}
let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
{
let mut s = state_inner.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
binding_inner.retain_handle(h);
s.queues[idx].push_back(h);
while s.queues.iter().all(|q| !q.is_empty()) {
let popped: Vec<HandleId> = s
.queues
.iter_mut()
.map(|q| q.pop_front().unwrap())
.collect();
post_actions.push(PostLockAction::PackAndEmit(popped));
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.errored && !s.terminated {
s.errored = true;
s.terminated = true;
binding_inner.retain_handle(h);
for q in &mut s.queues {
to_release.extend(q.drain(..));
}
post_actions.push(PostLockAction::Error(h));
}
} else {
s.completed[idx] = true;
if s.queues[idx].is_empty() && !s.terminated {
s.terminated = true;
for q in &mut s.queues {
to_release.extend(q.drain(..));
}
post_actions.push(PostLockAction::Complete);
}
}
}
_ => {} }
}
}
for h in to_release {
binding_inner.release_handle(h);
}
for action in post_actions {
match action {
PostLockAction::PackAndEmit(popped) => {
let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
for h in &popped {
binding_inner.release_handle(*h);
}
core_inner.emit_or_defer(producer_id, tuple_h);
}
PostLockAction::Complete => core_inner.complete_or_defer(producer_id),
PostLockAction::Error(h) => core_inner.error_or_defer(producer_id, h),
}
}
});
let outcome = ctx.subscribe_to(source, sink);
if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
let core_dead = em.clone();
let binding_dead = binding_clone.clone();
let mut should_complete = false;
let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
{
let mut s = state.lock().unwrap();
if !s.terminated {
s.completed[idx] = true;
if s.queues[idx].is_empty() {
s.terminated = true;
for q in &mut s.queues {
to_release.extend(q.drain(..));
}
should_complete = true;
}
}
}
for h in to_release {
binding_dead.release_handle(h);
}
if should_complete {
core_dead.complete_or_defer(producer_id);
}
}
}
});
let fn_id = binding.register_producer_build(build);
Ok(core
.register_producer(fn_id)
.expect("invariant: register_producer has no deps; no error variants reachable"))
}
struct ConcatState {
phase: u8,
pending: VecDeque<HandleId>,
second_completed: bool,
terminated: bool,
}
impl ConcatState {
fn new() -> Self {
Self {
phase: 0,
pending: VecDeque::new(),
second_completed: false,
terminated: false,
}
}
}
#[must_use]
pub fn concat(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
first: NodeId,
second: NodeId,
) -> NodeId {
let build = Box::new(move |ctx: ProducerCtx<'_>| {
let producer_id = ctx.node_id();
let binding_clone = ctx.core().binding();
let em = ctx.emitter();
let state: Arc<Mutex<ConcatState>> = Arc::new(Mutex::new(ConcatState::new()));
let state_for_second = state.clone();
let core_for_second = em.clone();
let binding_for_second = binding_clone.clone();
let second_sink: Sink = Arc::new(move |msgs| {
enum Action {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
{
let mut s = state_for_second.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
if s.phase == 0 {
binding_for_second.retain_handle(h);
s.pending.push_back(h);
} else {
binding_for_second.retain_handle(h);
actions.push(Action::Emit(h));
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.terminated {
s.terminated = true;
binding_for_second.retain_handle(h);
to_release.extend(s.pending.drain(..));
actions.push(Action::Error(h));
}
} else {
if s.phase == 1 && !s.terminated {
s.terminated = true;
actions.push(Action::Complete);
} else if s.phase == 0 {
s.second_completed = true;
}
}
}
_ => {} }
}
}
for h in to_release {
binding_for_second.release_handle(h);
}
for action in actions {
match action {
Action::Emit(h) => core_for_second.emit_or_defer(producer_id, h),
Action::Complete => core_for_second.complete_or_defer(producer_id),
Action::Error(h) => core_for_second.error_or_defer(producer_id, h),
}
}
});
let second_outcome = ctx.subscribe_to(second, second_sink);
if matches!(
second_outcome,
crate::producer::SubscribeOutcome::Dead { .. }
) {
let mut s = state.lock().unwrap();
s.second_completed = true;
}
let state_for_first = state.clone();
let core_for_first = em.clone();
let binding_for_first = binding_clone.clone();
let first_sink: Sink = Arc::new(move |msgs| {
enum Action {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
{
let mut s = state_for_first.lock().unwrap();
if s.terminated {
return;
}
if s.phase != 0 {
return; }
for m in msgs {
if s.terminated || s.phase != 0 {
break;
}
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
binding_for_first.retain_handle(h);
actions.push(Action::Emit(h));
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.terminated {
s.terminated = true;
binding_for_first.retain_handle(h);
to_release.extend(s.pending.drain(..));
actions.push(Action::Error(h));
}
} else {
s.phase = 1;
for h in s.pending.drain(..) {
actions.push(Action::Emit(h));
}
if s.second_completed && !s.terminated {
s.terminated = true;
actions.push(Action::Complete);
}
}
}
_ => {} }
}
}
for h in to_release {
binding_for_first.release_handle(h);
}
for action in actions {
match action {
Action::Emit(h) => core_for_first.emit_or_defer(producer_id, h),
Action::Complete => core_for_first.complete_or_defer(producer_id),
Action::Error(h) => core_for_first.error_or_defer(producer_id, h),
}
}
});
let first_outcome = ctx.subscribe_to(first, first_sink);
if matches!(
first_outcome,
crate::producer::SubscribeOutcome::Dead { .. }
) {
let core_first_dead = em.clone();
let mut should_complete = false;
let mut pending_to_emit: Vec<HandleId> = Vec::new();
{
let mut s = state.lock().unwrap();
if !s.terminated && s.phase == 0 {
s.phase = 1;
pending_to_emit.extend(s.pending.drain(..));
if s.second_completed && !s.terminated {
s.terminated = true;
should_complete = true;
}
}
}
for h in pending_to_emit {
core_first_dead.emit_or_defer(producer_id, h);
}
if should_complete {
core_first_dead.complete_or_defer(producer_id);
}
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("invariant: register_producer has no deps; no error variants reachable")
}
struct RaceState {
winner: Option<usize>,
completed: Vec<bool>,
terminated: bool,
}
impl RaceState {
fn new(n: usize) -> Self {
Self {
winner: None,
completed: vec![false; n],
terminated: false,
}
}
}
pub fn race(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
sources: Vec<NodeId>,
) -> Result<NodeId, OperatorFactoryError> {
if sources.is_empty() {
return Err(OperatorFactoryError::EmptySources);
}
let n = sources.len();
let build = Box::new(move |ctx: ProducerCtx<'_>| {
let producer_id = ctx.node_id();
let binding_clone = ctx.core().binding();
let em = ctx.emitter();
let state: Arc<Mutex<RaceState>> = Arc::new(Mutex::new(RaceState::new(n)));
for (idx, &source) in sources.iter().enumerate() {
let state_inner = state.clone();
let core_inner = em.clone();
let binding_inner = binding_clone.clone();
let sink: Sink = Arc::new(move |msgs| {
enum Action {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
{
let mut s = state_inner.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
if s.winner.is_none() {
s.winner = Some(idx);
binding_inner.retain_handle(h);
actions.push(Action::Emit(h));
} else if s.winner == Some(idx) {
binding_inner.retain_handle(h);
actions.push(Action::Emit(h));
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
if (s.winner.is_none() || s.winner == Some(idx))
&& !s.terminated
{
s.terminated = true;
binding_inner.retain_handle(h);
actions.push(Action::Error(h));
}
} else {
s.completed[idx] = true;
if s.winner == Some(idx) && !s.terminated {
s.terminated = true;
actions.push(Action::Complete);
} else if s.winner.is_none()
&& s.completed.iter().all(|&c| c)
&& !s.terminated
{
s.terminated = true;
actions.push(Action::Complete);
}
}
}
_ => {} }
}
}
for action in actions {
match action {
Action::Emit(h) => core_inner.emit_or_defer(producer_id, h),
Action::Complete => core_inner.complete_or_defer(producer_id),
Action::Error(h) => core_inner.error_or_defer(producer_id, h),
}
}
});
let outcome = ctx.subscribe_to(source, sink);
if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
let core_dead = em.clone();
let mut should_complete = false;
{
let mut s = state.lock().unwrap();
if !s.terminated && s.winner.is_none() {
s.completed[idx] = true;
if s.completed.iter().all(|&c| c) {
s.terminated = true;
should_complete = true;
}
}
}
if should_complete {
core_dead.complete_or_defer(producer_id);
}
}
}
});
let fn_id = binding.register_producer_build(build);
Ok(core
.register_producer(fn_id)
.expect("invariant: register_producer has no deps; no error variants reachable"))
}
struct TakeUntilState {
terminated: bool,
}
impl TakeUntilState {
fn new() -> Self {
Self { terminated: false }
}
}
#[must_use]
pub fn take_until(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
notifier: NodeId,
) -> NodeId {
let build = Box::new(move |ctx: ProducerCtx<'_>| {
let producer_id = ctx.node_id();
let binding_clone = ctx.core().binding();
let em = ctx.emitter();
let state: Arc<Mutex<TakeUntilState>> = Arc::new(Mutex::new(TakeUntilState::new()));
let state_for_source = state.clone();
let core_for_source = em.clone();
let binding_for_source = binding_clone.clone();
let source_sink: Sink = Arc::new(move |msgs| {
enum Action {
Emit(HandleId),
Complete,
Error(HandleId),
}
let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
{
let mut s = state_for_source.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
binding_for_source.retain_handle(h);
actions.push(Action::Emit(h));
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.terminated {
s.terminated = true;
binding_for_source.retain_handle(h);
actions.push(Action::Error(h));
}
} else {
if !s.terminated {
s.terminated = true;
actions.push(Action::Complete);
}
}
}
_ => {} }
}
}
for action in actions {
match action {
Action::Emit(h) => core_for_source.emit_or_defer(producer_id, h),
Action::Complete => core_for_source.complete_or_defer(producer_id),
Action::Error(h) => core_for_source.error_or_defer(producer_id, h),
}
}
});
let source_outcome = ctx.subscribe_to(source, source_sink);
if matches!(
source_outcome,
crate::producer::SubscribeOutcome::Dead { .. }
) {
let core_dead = em.clone();
let mut should_complete = false;
{
let mut s = state.lock().unwrap();
if !s.terminated {
s.terminated = true;
should_complete = true;
}
}
if should_complete {
core_dead.complete_or_defer(producer_id);
}
}
let state_for_notifier = state.clone();
let core_for_notifier = em.clone();
let binding_for_notifier = binding_clone.clone();
let notifier_sink: Sink = Arc::new(move |msgs| {
enum Action {
Complete,
Error(HandleId),
}
let mut action: Option<Action> = None;
{
let mut s = state_for_notifier.lock().unwrap();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if m.payload_handle().is_some() && !s.terminated {
s.terminated = true;
action = Some(Action::Complete);
break;
}
}
5 => {
if let Some(h) = m.payload_handle() {
if !s.terminated {
s.terminated = true;
binding_for_notifier.retain_handle(h);
action = Some(Action::Error(h));
break;
}
}
}
_ => {} }
}
}
if let Some(a) = action {
match a {
Action::Complete => core_for_notifier.complete_or_defer(producer_id),
Action::Error(h) => core_for_notifier.error_or_defer(producer_id, h),
}
}
});
let _ = ctx.subscribe_to(notifier, notifier_sink);
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("invariant: register_producer has no deps; no error variants reachable")
}