#![allow(clippy::arc_with_non_send_sync)]
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;
use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
use smallvec::SmallVec;
use crate::producer::{
MailboxEmitter, ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome,
};
/// Command sent from a source sink to a temporal operator's background task.
enum TemporalCmd {
/// A value handle from the source; the sending sink retains it before sending.
Value(HandleId),
/// The source terminated without a payload handle (treated as completion).
Complete,
/// The source terminated with a payload handle (treated as an error).
Error(HandleId),
}
/// Guard stored as a node's operator state: owns one sender of the command
/// channel and the handle of the spawned task, and aborts that task on drop.
struct TemporalTaskGuard {
    /// Never read; held only so this sender stays open while the guard lives.
    _tx: tokio::sync::mpsc::UnboundedSender<TemporalCmd>,
    /// Handle of the background task to abort when the guard is dropped.
    task: tokio::task::JoinHandle<()>,
}

impl Drop for TemporalTaskGuard {
    /// Aborts the background task.
    fn drop(&mut self) {
        let Self { task, .. } = self;
        task.abort();
    }
}
/// Wrapper around a task handle that aborts the task when dropped.
struct AbortOnDrop(tokio::task::JoinHandle<()>);

impl Drop for AbortOnDrop {
    /// Aborts the wrapped task.
    fn drop(&mut self) {
        let Self(handle) = self;
        handle.abort();
    }
}
/// Registers a producer node that re-emits the most recent `source` value each
/// time `notifier` delivers a tier-3 message carrying a payload handle.
///
/// Message handling, as implemented below:
/// - `source` tier 3 with a payload: the handle is retained and stored as the
///   latest value; the previously stored handle is released.
/// - `source` tier 5 with a payload: the stored value is released and the
///   payload is forwarded as an error; the operator is marked terminated.
/// - `source` tier 5 without a payload: the stored value is released and the
///   source is marked completed; nothing is emitted downstream at that point.
/// - `notifier` tier 5: the operator errors (payload present) or completes
///   (no payload) and is marked terminated.
///
/// # Panics
///
/// Panics if `core.register_producer` returns an error.
#[must_use]
#[allow(clippy::too_many_lines)]
pub fn sample(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
notifier: NodeId,
) -> NodeId {
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
// State shared by the source sink, the notifier sink and the dead-notifier path below.
let state: Arc<Mutex<SampleState>> = Arc::new(Mutex::new(SampleState::default()));
let st = state.clone();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let core_src = em.clone();
let source_sink: Sink = Arc::new(move |msgs| {
// Work collected under the lock and executed after the lock is released.
enum Act {
Release(HandleId),
Error(HandleId),
}
let mut actions: SmallVec<[Act; 2]> = SmallVec::new();
{
let mut s = st.lock();
if s.terminated {
return;
}
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
// Store the new handle; schedule release of the one it replaces.
if let Some(old) = s.latest.replace(h) {
actions.push(Act::Release(old));
}
bb.retain_handle(h);
}
}
5 => {
if let Some(h) = m.payload_handle() {
// Terminal message with a payload: forwarded as an error.
s.terminated = true;
if let Some(old) = s.latest.take() {
actions.push(Act::Release(old));
}
bb.retain_handle(h);
actions.push(Act::Error(h));
} else {
// Terminal message without a payload: only recorded; the
// notifier sink stops sampling once this flag is set.
s.source_completed = true;
if let Some(old) = s.latest.take() {
actions.push(Act::Release(old));
}
}
}
_ => {}
}
}
}
// The state lock is no longer held here.
for a in actions {
match a {
Act::Release(h) => bb.release_handle(h),
Act::Error(h) => core_src.error_or_defer(pid, h),
}
}
});
let src_outcome = ctx.subscribe_to(source, source_sink);
// A `Dead` subscribe outcome is handled like a source that has already completed.
if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
state.lock().source_completed = true;
}
let st2 = state.clone();
let core_n = em.clone();
let bb2: Arc<dyn BindingBoundary> = binding_s.clone();
let notifier_sink: Sink = Arc::new(move |msgs| {
let mut s = st2.lock();
if s.terminated {
return;
}
for m in msgs {
// Re-checked every iteration because the lock is released and
// re-acquired around the emit below.
if s.terminated {
return;
}
match m.tier() {
// Sample only when the notifier message has a payload, the source
// has not completed, and a value is stored.
3 if m.payload_handle().is_some()
&& !s.source_completed
&& s.latest.is_some() =>
{
// The guard above checked `is_some()`, so this unwrap cannot fail.
let h = s.latest.unwrap();
// Extra retain for the emitted copy; the stored handle stays in `latest`.
bb2.retain_handle(h);
// Release the lock before emitting, then take it again for the next message.
drop(s);
core_n.emit_or_defer(pid, h);
s = st2.lock();
}
5 => {
if let Some(h) = m.payload_handle() {
// Notifier terminated with a payload: forward it as an error.
s.terminated = true;
if let Some(old) = s.latest.take() {
bb2.release_handle(old);
}
bb2.retain_handle(h);
drop(s);
core_n.error_or_defer(pid, h);
return;
}
// Notifier terminated without a payload: complete this node.
s.terminated = true;
if let Some(old) = s.latest.take() {
bb2.release_handle(old);
}
drop(s);
core_n.complete_or_defer(pid);
return;
}
_ => {}
}
}
});
let not_outcome = ctx.subscribe_to(notifier, notifier_sink);
// A `Dead` notifier subscription completes this node immediately,
// unless it has already terminated.
if matches!(not_outcome, SubscribeOutcome::Dead { .. }) {
let mut s = state.lock();
if !s.terminated {
s.terminated = true;
if let Some(old) = s.latest.take() {
binding_s.release_handle(old);
}
drop(s);
core_s.complete_or_defer(pid);
}
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("sample: register_producer failed")
}
/// Mutable state shared by the sinks that `sample` installs.
#[derive(Default)]
struct SampleState {
/// Most recent value handle received from the source, if one is stored.
latest: Option<HandleId>,
/// Set when the source terminated without a payload or its subscription was dead.
source_completed: bool,
/// Set once the operator has issued (or is about to issue) its own terminal signal.
terminated: bool,
}
#[must_use]
pub fn debounce(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
ms: u64,
) -> NodeId {
let delay = Duration::from_millis(ms);
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let tx_sink = tx.clone();
let tx_dead = tx.clone();
let task = tokio::spawn(debounce_task(rx, em.emitter(), pid, bb.clone(), delay));
{
let st = ctx.storage();
let mut storage = st.lock();
let entry = storage.entry(pid).or_default();
entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
}
let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
let source_sink: Sink = Arc::new(move |msgs| {
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Value(h)).is_err() {
bb_sink.release_handle(h);
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Error(h)).is_err() {
bb_sink.release_handle(h);
}
} else {
let _ = tx_sink.send(TemporalCmd::Complete);
}
}
_ => {}
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
let _ = tx_dead.send(TemporalCmd::Complete);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("debounce: register_producer failed")
}
/// Background task behind `debounce`.
///
/// Holds at most one pending value. Each newer value releases the held one and
/// restarts the wait; when `delay` elapses with nothing newer, the held value
/// is emitted. A completion flushes the held value before completing; an error
/// or a closed channel releases it instead.
async fn debounce_task(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
    em: MailboxEmitter,
    pid: NodeId,
    binding: Arc<dyn BindingBoundary>,
    delay: Duration,
) {
    let mut held: Option<HandleId> = None;
    loop {
        // Nothing held: wait for the next command without a timer.
        let Some(current) = held else {
            match rx.recv().await {
                Some(TemporalCmd::Value(h)) => held = Some(h),
                Some(TemporalCmd::Complete) => {
                    em.complete_or_defer(pid);
                    return;
                }
                Some(TemporalCmd::Error(err_h)) => {
                    em.error_or_defer(pid, err_h);
                    return;
                }
                None => return,
            }
            continue;
        };

        // A value is held: race the next command against a fresh timer.
        tokio::select! {
            biased;
            cmd = rx.recv() => {
                match cmd {
                    Some(TemporalCmd::Value(newer)) => {
                        binding.release_handle(current);
                        held = Some(newer);
                    }
                    Some(TemporalCmd::Complete) => {
                        em.emit_or_defer(pid, current);
                        em.complete_or_defer(pid);
                        return;
                    }
                    Some(TemporalCmd::Error(err_h)) => {
                        binding.release_handle(current);
                        em.error_or_defer(pid, err_h);
                        return;
                    }
                    None => {
                        binding.release_handle(current);
                        return;
                    }
                }
            }
            () = tokio::time::sleep(delay) => {
                em.emit_or_defer(pid, current);
                if em.is_core_gone() {
                    return;
                }
                held = None;
            }
        }
    }
}
#[must_use]
pub fn audit(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
let delay = Duration::from_millis(ms);
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let tx_sink = tx.clone();
let tx_dead = tx.clone();
let task = tokio::spawn(audit_task(rx, em.emitter(), pid, bb.clone(), delay));
{
let st = ctx.storage();
let mut storage = st.lock();
let entry = storage.entry(pid).or_default();
entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
}
let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
let source_sink: Sink = Arc::new(move |msgs| {
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Value(h)).is_err() {
bb_sink.release_handle(h);
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Error(h)).is_err() {
bb_sink.release_handle(h);
}
} else {
let _ = tx_sink.send(TemporalCmd::Complete);
}
}
_ => {}
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
let _ = tx_dead.send(TemporalCmd::Complete);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("audit: register_producer failed")
}
/// Background task behind `audit`.
///
/// The first value received while idle opens a window of length `delay`.
/// Values received during the window replace the held one (the replaced handle
/// is released). When the window elapses the held value is emitted and the
/// task goes back to waiting for the next value.
async fn audit_task(
mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
em: MailboxEmitter,
pid: NodeId,
binding: Arc<dyn BindingBoundary>,
delay: Duration,
) {
loop {
match rx.recv().await {
Some(TemporalCmd::Value(h)) => {
// `latest` is the value that will be emitted when the window closes.
let mut latest = h;
tokio::select! {
biased;
// Branch 1: drain commands for the duration of the window. This async
// block only finishes on a terminal command or a closed channel;
// `return` inside it leaves the block, not `audit_task`.
() = async {
loop {
tokio::select! {
biased;
cmd = rx.recv() => {
match cmd {
Some(TemporalCmd::Value(new_h)) => {
binding.release_handle(latest);
latest = new_h;
}
Some(TemporalCmd::Complete) => {
// Flush the held value, then complete.
em.emit_or_defer(pid, latest);
em.complete_or_defer(pid);
return; }
Some(TemporalCmd::Error(err_h)) => {
binding.release_handle(latest);
em.error_or_defer(pid, err_h);
return;
}
None => {
binding.release_handle(latest);
return;
}
}
}
}
}
} => {
// The drain block ended, so a terminal signal was handled: stop the task.
return; }
// Branch 2: the window elapsed; emit whatever value is held now.
() = tokio::time::sleep(delay) => {
em.emit_or_defer(pid, latest);
if em.is_core_gone() {
return;
}
}
}
}
// Terminal commands received while no window is open.
Some(TemporalCmd::Complete) => {
em.complete_or_defer(pid);
return;
}
Some(TemporalCmd::Error(err_h)) => {
em.error_or_defer(pid, err_h);
return;
}
None => return,
}
}
}
#[must_use]
pub fn delay(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, ms: u64) -> NodeId {
let delay_dur = Duration::from_millis(ms);
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let tx_sink = tx.clone();
let tx_dead = tx.clone();
let task = tokio::spawn(delay_task(rx, em.emitter(), pid, bb.clone(), delay_dur));
{
let st = ctx.storage();
let mut storage = st.lock();
let entry = storage.entry(pid).or_default();
entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
}
let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
let source_sink: Sink = Arc::new(move |msgs| {
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Value(h)).is_err() {
bb_sink.release_handle(h);
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Error(h)).is_err() {
bb_sink.release_handle(h);
}
} else {
let _ = tx_sink.send(TemporalCmd::Complete);
}
}
_ => {}
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
let _ = tx_dead.send(TemporalCmd::Complete);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("delay: register_producer failed")
}
/// Background task behind `delay`.
///
/// Every value is queued with a deadline of "now + `delay`" and emitted once
/// that deadline has passed. A completion is held back until the queue has
/// drained; an error or a closed channel releases everything still queued.
async fn delay_task(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
    em: MailboxEmitter,
    pid: NodeId,
    binding: Arc<dyn BindingBoundary>,
    delay: Duration,
) {
    let mut due: VecDeque<(tokio::time::Instant, HandleId)> = VecDeque::new();
    let mut finish_when_drained = false;
    loop {
        // Deadline of the oldest queued value, if any.
        let wake_at = due.front().map(|entry| entry.0);
        tokio::select! {
            biased;
            cmd = rx.recv() => {
                match cmd {
                    Some(TemporalCmd::Value(h)) => {
                        let deadline = tokio::time::Instant::now() + delay;
                        due.push_back((deadline, h));
                    }
                    Some(TemporalCmd::Complete) => {
                        finish_when_drained = true;
                        if due.is_empty() {
                            em.complete_or_defer(pid);
                            return;
                        }
                    }
                    Some(TemporalCmd::Error(err_h)) => {
                        due.drain(..).for_each(|(_, h)| binding.release_handle(h));
                        em.error_or_defer(pid, err_h);
                        return;
                    }
                    None => {
                        due.drain(..).for_each(|(_, h)| binding.release_handle(h));
                        return;
                    }
                }
            }
            () = sleep_until_or_forever(wake_at) => {
                let now = tokio::time::Instant::now();
                // Emit every queued value whose deadline has passed.
                while due.front().is_some_and(|(deadline, _)| *deadline <= now) {
                    let Some((_, h)) = due.pop_front() else { break };
                    em.emit_or_defer(pid, h);
                    if em.is_core_gone() {
                        due.drain(..).for_each(|(_, rest)| binding.release_handle(rest));
                        return;
                    }
                }
                if finish_when_drained && due.is_empty() {
                    em.complete_or_defer(pid);
                    return;
                }
            }
        }
    }
}
/// Options for `throttle`.
#[derive(Debug, Clone, Copy)]
pub struct ThrottleOpts {
    /// Emit the value that opens a throttle window.
    pub leading: bool,
    /// Emit the last value seen during a window when that window ends.
    pub trailing: bool,
}

impl Default for ThrottleOpts {
    /// Leading edge enabled, trailing edge disabled.
    fn default() -> Self {
        let (leading, trailing) = (true, false);
        Self { leading, trailing }
    }
}
#[must_use]
pub fn throttle(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
ms: u64,
opts: ThrottleOpts,
) -> NodeId {
let window = Duration::from_millis(ms);
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let tx_sink = tx.clone();
let tx_dead = tx.clone();
let task = tokio::spawn(throttle_task(
rx,
em.emitter(),
pid,
bb.clone(),
window,
opts,
));
{
let st = ctx.storage();
let mut storage = st.lock();
let entry = storage.entry(pid).or_default();
entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
}
let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
let source_sink: Sink = Arc::new(move |msgs| {
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Value(h)).is_err() {
bb_sink.release_handle(h);
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Error(h)).is_err() {
bb_sink.release_handle(h);
}
} else {
let _ = tx_sink.send(TemporalCmd::Complete);
}
}
_ => {}
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
let _ = tx_dead.send(TemporalCmd::Complete);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("throttle: register_producer failed")
}
/// Background task behind `throttle`.
///
/// A value that arrives outside a window opens a new one. With
/// `opts.leading` that value is emitted immediately; otherwise, and for every
/// value inside a window, it is stashed when `opts.trailing` is set (replacing
/// and releasing the previous stash) or released when it is not. When a window
/// ends and a value is stashed, it is emitted and a new window starts.
async fn throttle_task(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
    em: MailboxEmitter,
    pid: NodeId,
    binding: Arc<dyn BindingBoundary>,
    window: Duration,
    opts: ThrottleOpts,
) {
    let mut stashed: Option<HandleId> = None;
    let mut window_end: Option<tokio::time::Instant> = None;
    loop {
        // The timer only matters for trailing emission; leading-only mode
        // checks the window lazily when a value arrives.
        let wake_at = window_end.filter(|_| opts.trailing);
        tokio::select! {
            biased;
            cmd = rx.recv() => {
                match cmd {
                    Some(TemporalCmd::Value(h)) => {
                        let window_open = window_end
                            .is_some_and(|end| tokio::time::Instant::now() < end);
                        if !window_open {
                            window_end = Some(tokio::time::Instant::now() + window);
                        }
                        if !window_open && opts.leading {
                            em.emit_or_defer(pid, h);
                            if em.is_core_gone() {
                                release_opt(&mut stashed, &*binding);
                                return;
                            }
                        } else if opts.trailing {
                            if let Some(prev) = stashed.replace(h) {
                                binding.release_handle(prev);
                            }
                        } else {
                            binding.release_handle(h);
                        }
                    }
                    Some(TemporalCmd::Complete) => {
                        // Flush a stashed trailing value before completing.
                        if let Some(h) = stashed.take() {
                            em.emit_or_defer(pid, h);
                        }
                        em.complete_or_defer(pid);
                        return;
                    }
                    Some(TemporalCmd::Error(err_h)) => {
                        release_opt(&mut stashed, &*binding);
                        em.error_or_defer(pid, err_h);
                        return;
                    }
                    None => {
                        release_opt(&mut stashed, &*binding);
                        return;
                    }
                }
            }
            () = sleep_until_or_forever(wake_at) => {
                window_end = None;
                if let Some(h) = stashed.take() {
                    em.emit_or_defer(pid, h);
                    if em.is_core_gone() {
                        return;
                    }
                    // The trailing emission opens the next window.
                    window_end = Some(tokio::time::Instant::now() + window);
                }
            }
        }
    }
}
/// Registers a producer node that emits an increasing counter handle every
/// `period_ms` milliseconds.
///
/// The build closure spawns a task that ticks a `tokio::time::interval` and,
/// on each tick, retains and emits `HandleId::new(counter)` with `counter`
/// starting at 1. The task stops once the emitter reports the core is gone,
/// and is aborted when the stored [`AbortOnDrop`] guard is dropped.
///
/// A `period_ms` of 0 is treated as 1: `tokio::time::interval` panics on a
/// zero period, which would otherwise kill the spawned task without any signal
/// reaching subscribers.
///
/// # Panics
///
/// Panics if `core.register_producer` returns an error.
#[must_use]
pub fn interval(core: &Core, binding: &Arc<dyn ProducerBinding>, period_ms: u64) -> NodeId {
    // Clamp to a non-zero period (see the doc comment above).
    let period = Duration::from_millis(period_ms.max(1));
    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
        let binding_s = ctx.core().binding();
        let em = ctx.emitter();
        let pid = ctx.node_id();
        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
        let em_task = em.emitter();
        let task = tokio::spawn(async move {
            let mut ticker = tokio::time::interval(period);
            // The first tick of a tokio interval completes immediately; consume
            // it so the first emission happens one full period after start.
            ticker.tick().await;
            let mut counter: u64 = 1;
            loop {
                ticker.tick().await;
                if em_task.is_core_gone() {
                    break;
                }
                let h = HandleId::new(counter);
                bb.retain_handle(h);
                em_task.emit_or_defer(pid, h);
                counter += 1;
            }
        });
        {
            // Store the guard so dropping the node's operator state aborts the task.
            let st = ctx.storage();
            let mut storage = st.lock();
            let entry = storage.entry(pid).or_default();
            entry.op_state = Some(Box::new(AbortOnDrop(task)));
        }
    });
    let fn_id = binding.register_producer_build(build);
    core.register_producer(fn_id)
        .expect("interval: register_producer failed")
}
/// Sleeps until `deadline`, or never resolves when `deadline` is `None`.
///
/// Lets a `select!` branch be switched off by passing `None`.
async fn sleep_until_or_forever(deadline: Option<tokio::time::Instant>) {
    if let Some(when) = deadline {
        tokio::time::sleep_until(when).await;
    } else {
        std::future::pending::<()>().await;
    }
}
/// Takes the handle out of `opt`, if any, and releases it through `binding`.
/// `opt` is `None` afterwards.
fn release_opt(opt: &mut Option<HandleId>, binding: &dyn BindingBoundary) {
    let Some(handle) = opt.take() else {
        return;
    };
    binding.release_handle(handle);
}
#[must_use]
pub fn timeout(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
ms: u64,
error_handle: HandleId,
) -> NodeId {
let duration = Duration::from_millis(ms);
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let tx_sink = tx.clone();
let tx_dead = tx.clone();
let task = tokio::spawn(timeout_task(
rx,
em.emitter(),
pid,
bb.clone(),
duration,
error_handle,
));
{
let st = ctx.storage();
let mut storage = st.lock();
let entry = storage.entry(pid).or_default();
entry.op_state = Some(Box::new(TemporalTaskGuard { _tx: tx, task }));
}
let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
let source_sink: Sink = Arc::new(move |msgs| {
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Value(h)).is_err() {
bb_sink.release_handle(h);
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(TemporalCmd::Error(h)).is_err() {
bb_sink.release_handle(h);
}
} else {
let _ = tx_sink.send(TemporalCmd::Complete);
}
}
_ => {}
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
let _ = tx_dead.send(TemporalCmd::Complete);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("timeout: register_producer failed")
}
/// Background task behind `timeout`.
///
/// Forwards values, completion and errors unchanged. The timer is re-created
/// on every loop iteration, so each forwarded value restarts it; if it fires
/// first, `error_handle` is retained and emitted as an error (unless the core
/// is already gone) and the task ends.
async fn timeout_task(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<TemporalCmd>,
    em: MailboxEmitter,
    pid: NodeId,
    binding: Arc<dyn BindingBoundary>,
    duration: Duration,
    error_handle: HandleId,
) {
    loop {
        tokio::select! {
            biased;
            received = rx.recv() => {
                // A closed channel ends the task silently.
                let Some(cmd) = received else {
                    return;
                };
                match cmd {
                    TemporalCmd::Value(h) => {
                        em.emit_or_defer(pid, h);
                        if em.is_core_gone() {
                            return;
                        }
                    }
                    TemporalCmd::Complete => {
                        em.complete_or_defer(pid);
                        return;
                    }
                    TemporalCmd::Error(err_h) => {
                        em.error_or_defer(pid, err_h);
                        return;
                    }
                }
            }
            () = tokio::time::sleep(duration) => {
                if em.is_core_gone() {
                    return;
                }
                binding.retain_handle(error_handle);
                em.error_or_defer(pid, error_handle);
                return;
            }
        }
    }
}
/// Command sent from the `buffer_time` source sink to its background task.
enum BufferTimeCmd {
/// A value handle from the source; the sending sink retains it before sending.
Value(HandleId),
/// The source terminated without a payload handle (treated as completion).
Complete,
/// The source terminated with a payload handle (treated as an error).
Error(HandleId),
}
#[must_use]
pub fn buffer_time(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
ms: u64,
pack_fn_id: FnId,
) -> NodeId {
let period = Duration::from_millis(ms);
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let tx_sink = tx.clone();
let tx_dead = tx.clone();
let task = tokio::spawn(buffer_time_task(
rx,
em.emitter(),
pid,
bb.clone(),
period,
pack_fn_id,
));
{
let st = ctx.storage();
let mut storage = st.lock();
let entry = storage.entry(pid).or_default();
entry.op_state = Some(Box::new(BufferTimeTaskGuard { _tx: tx, task }));
}
let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
let source_sink: Sink = Arc::new(move |msgs| {
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(BufferTimeCmd::Value(h)).is_err() {
bb_sink.release_handle(h);
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(BufferTimeCmd::Error(h)).is_err() {
bb_sink.release_handle(h);
}
} else {
let _ = tx_sink.send(BufferTimeCmd::Complete);
}
}
_ => {}
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
let _ = tx_dead.send(BufferTimeCmd::Complete);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("buffer_time: register_producer failed")
}
/// Guard stored as the `buffer_time` node's operator state: owns one sender of
/// the command channel and the task handle, and aborts the task on drop.
struct BufferTimeTaskGuard {
    /// Never read; held only so this sender stays open while the guard lives.
    _tx: tokio::sync::mpsc::UnboundedSender<BufferTimeCmd>,
    /// Handle of the background task to abort when the guard is dropped.
    task: tokio::task::JoinHandle<()>,
}

impl Drop for BufferTimeTaskGuard {
    /// Aborts the background task.
    fn drop(&mut self) {
        let Self { task, .. } = self;
        task.abort();
    }
}
/// Background task behind `buffer_time`.
///
/// Accumulates value handles and, on every tick of a `period` interval, packs
/// the non-empty buffer with `binding.pack_tuple(pack_fn_id, ..)`, emits the
/// packed handle and releases the buffered handles. A completion flushes a
/// non-empty buffer the same way before completing; an error or a closed
/// channel releases the buffered handles without emitting them.
///
/// A zero `period` is treated as 1 ms: `tokio::time::interval` panics on a
/// zero period, which would otherwise kill this task without any signal
/// reaching subscribers.
async fn buffer_time_task(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<BufferTimeCmd>,
    em: MailboxEmitter,
    pid: NodeId,
    binding: Arc<dyn BindingBoundary>,
    period: Duration,
    pack_fn_id: FnId,
) {
    let mut buf: Vec<HandleId> = Vec::new();
    // Clamp to a non-zero period (see the doc comment above).
    let mut ticker = tokio::time::interval(period.max(Duration::from_millis(1)));
    // The first tick of a tokio interval completes immediately; consume it so
    // the first flush happens one full period after start.
    ticker.tick().await;
    loop {
        tokio::select! {
            biased;
            cmd = rx.recv() => {
                match cmd {
                    Some(BufferTimeCmd::Value(h)) => {
                        buf.push(h);
                    }
                    Some(BufferTimeCmd::Complete) => {
                        // Flush whatever is buffered, then complete.
                        if !buf.is_empty() {
                            let packed = binding.pack_tuple(pack_fn_id, &buf);
                            em.emit_or_defer(pid, packed);
                            for h in buf.drain(..) {
                                binding.release_handle(h);
                            }
                        }
                        em.complete_or_defer(pid);
                        return;
                    }
                    Some(BufferTimeCmd::Error(err_h)) => {
                        for h in buf.drain(..) {
                            binding.release_handle(h);
                        }
                        em.error_or_defer(pid, err_h);
                        return;
                    }
                    None => {
                        for h in buf.drain(..) {
                            binding.release_handle(h);
                        }
                        return;
                    }
                }
            }
            _ = ticker.tick() => {
                // Empty periods emit nothing.
                if !buf.is_empty() {
                    let packed = binding.pack_tuple(pack_fn_id, &buf);
                    em.emit_or_defer(pid, packed);
                    for h in buf.drain(..) {
                        binding.release_handle(h);
                    }
                    if em.is_core_gone() {
                        return;
                    }
                }
            }
        }
    }
}
/// Command sent from the `window_time` source sink to its background task.
enum WindowTimeCmd {
/// A value handle from the source; the sending sink retains it before sending.
Value(HandleId),
/// The source terminated without a payload handle (treated as completion).
Complete,
/// The source terminated with a payload handle (treated as an error).
Error(HandleId),
}
#[must_use]
pub fn window_time(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
source: NodeId,
ms: u64,
) -> NodeId {
let period = Duration::from_millis(ms);
let noop_fn_id = binding.register_producer_build(Box::new(|_ctx: ProducerCtx<'_>| {
}));
let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
let core_s = ctx.core();
let binding_s = ctx.core().binding();
let em = ctx.emitter();
let pid = ctx.node_id();
let bb: Arc<dyn BindingBoundary> = binding_s.clone();
let first_inner = core_s
.register_producer(noop_fn_id)
.expect("window_time inner: register_producer failed");
let first_handle = bb.intern_node(first_inner);
core_s.emit_or_defer(pid, first_handle);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let tx_sink = tx.clone();
let tx_dead = tx.clone();
let task = tokio::spawn(window_time_task(
rx,
em.emitter(),
pid,
first_inner,
bb.clone(),
period,
noop_fn_id,
));
{
let st = ctx.storage();
let mut storage = st.lock();
let entry = storage.entry(pid).or_default();
entry.op_state = Some(Box::new(WindowTimeTaskGuard { _tx: tx, task }));
}
let bb_sink: Arc<dyn BindingBoundary> = binding_s.clone();
let source_sink: Sink = Arc::new(move |msgs| {
for m in msgs {
match m.tier() {
3 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(WindowTimeCmd::Value(h)).is_err() {
bb_sink.release_handle(h);
}
}
}
5 => {
if let Some(h) = m.payload_handle() {
bb_sink.retain_handle(h);
if tx_sink.send(WindowTimeCmd::Error(h)).is_err() {
bb_sink.release_handle(h);
}
} else {
let _ = tx_sink.send(WindowTimeCmd::Complete);
}
}
_ => {}
}
}
});
let outcome = ctx.subscribe_to(source, source_sink);
if matches!(outcome, SubscribeOutcome::Dead { .. }) {
let _ = tx_dead.send(WindowTimeCmd::Complete);
}
});
let fn_id = binding.register_producer_build(build);
core.register_producer(fn_id)
.expect("window_time: register_producer failed")
}
/// Guard stored as the `window_time` node's operator state: owns one sender of
/// the command channel and the task handle, and aborts the task on drop.
struct WindowTimeTaskGuard {
    /// Never read; held only so this sender stays open while the guard lives.
    _tx: tokio::sync::mpsc::UnboundedSender<WindowTimeCmd>,
    /// Handle of the background task to abort when the guard is dropped.
    task: tokio::task::JoinHandle<()>,
}

impl Drop for WindowTimeTaskGuard {
    /// Aborts the background task.
    fn drop(&mut self) {
        let Self { task, .. } = self;
        task.abort();
    }
}
/// Background task behind `window_time`.
///
/// Tracks the id of the current inner window node in a shared mutex. All work
/// that touches the core is queued through `em.defer`:
/// - a value is emitted on the current inner node (and released here if the
///   closure could not be queued);
/// - a completion completes the current inner node and then the outer node;
/// - an error is delivered to both the current inner node and the outer node,
///   with one extra retain because the handle is handed out twice;
/// - each interval tick completes the current inner node, registers a new one
///   and emits its interned handle on the outer node (or completes the outer
///   node if registration fails).
///
/// A zero `period` is treated as 1 ms: `tokio::time::interval` panics on a
/// zero period, which would otherwise kill this task without any signal
/// reaching subscribers.
async fn window_time_task(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<WindowTimeCmd>,
    em: MailboxEmitter,
    pid: NodeId,
    initial_inner: NodeId,
    binding: Arc<dyn BindingBoundary>,
    period: Duration,
    noop_fn_id: FnId,
) {
    let current_inner = Arc::new(Mutex::new(initial_inner));
    // Clamp to a non-zero period (see the doc comment above).
    let mut ticker = tokio::time::interval(period.max(Duration::from_millis(1)));
    // The first tick of a tokio interval completes immediately; consume it so
    // the first window lasts one full period.
    ticker.tick().await;
    loop {
        tokio::select! {
            biased;
            cmd = rx.recv() => {
                match cmd {
                    Some(WindowTimeCmd::Value(h)) => {
                        let cur = current_inner.clone();
                        let queued = em.defer(move |c| {
                            // Copy the id out first so the mutex is not held
                            // while calling into the core.
                            let target = *cur.lock();
                            c.emit(target, h);
                        });
                        if !queued {
                            binding.release_handle(h);
                        }
                    }
                    Some(WindowTimeCmd::Complete) => {
                        let cur = current_inner.clone();
                        let _ = em.defer(move |c| {
                            let target = *cur.lock();
                            c.complete(target);
                            c.complete(pid);
                        });
                        return;
                    }
                    Some(WindowTimeCmd::Error(err_h)) => {
                        let cur = current_inner.clone();
                        let b = binding.clone();
                        let queued = em.defer(move |c| {
                            // The handle goes to two nodes, so take a second retain.
                            b.retain_handle(err_h);
                            let target = *cur.lock();
                            c.error(target, err_h);
                            c.error(pid, err_h);
                        });
                        if !queued {
                            binding.release_handle(err_h);
                        }
                        return;
                    }
                    None => {
                        return;
                    }
                }
            }
            _ = ticker.tick() => {
                let cur = current_inner.clone();
                let b = binding.clone();
                let _ = em.defer(move |c| {
                    // Close the current window, then open and announce the next.
                    let old = *cur.lock();
                    c.complete(old);
                    match c.register_producer(noop_fn_id) {
                        Ok(new_inner) => {
                            *cur.lock() = new_inner;
                            let h = b.intern_node(new_inner);
                            c.emit(pid, h);
                        }
                        Err(_) => {
                            c.complete(pid);
                        }
                    }
                });
                if em.is_core_gone() {
                    return;
                }
            }
        }
    }
}