use super::*;
use crate::stream::runtime::current_stream_cancelled;
use std::collections::VecDeque;
use std::mem;
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::time::Instant;
const DELAY_BUFFER_CAPACITY: usize = 16;
const WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThrottleMode {
Shaping,
Enforcing,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DelayOverflowStrategy {
EmitEarly,
DropHead,
DropTail,
DropBuffer,
DropNew,
Backpressure,
Fail,
}
#[derive(Clone)]
enum TerminalSignal {
Complete,
Error(StreamError),
}
struct DelayQueueShared<T> {
state: Mutex<DelayQueueState<T>>,
available: Condvar,
cancelled: Arc<AtomicBool>,
}
struct DelayQueueState<T> {
queue: VecDeque<(Instant, T)>,
terminal: Option<TerminalSignal>,
}
impl<T> DelayQueueShared<T> {
fn new() -> Arc<Self> {
Arc::new(Self {
state: Mutex::new(DelayQueueState {
queue: VecDeque::new(),
terminal: None,
}),
available: Condvar::new(),
cancelled: Arc::new(AtomicBool::new(false)),
})
}
}
struct DelayQueueStream<T> {
shared: Arc<DelayQueueShared<T>>,
completion: Option<StreamCompletion<NotUsed>>,
}
impl<T> Iterator for DelayQueueStream<T> {
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
loop {
if let Some((deadline, _)) = state.queue.front() {
let now = Instant::now();
if *deadline <= now {
let (_, item) = state.queue.pop_front().expect("front element present");
drop(state);
self.shared.available.notify_all();
return Some(Ok(item));
}
let wait = (*deadline - now).min(WAIT_POLL_INTERVAL);
let (next, _) = self
.shared
.available
.wait_timeout(state, wait)
.unwrap_or_else(|poison| poison.into_inner());
state = next;
continue;
}
if let Some(terminal) = state.terminal.clone() {
return match terminal {
TerminalSignal::Complete => None,
TerminalSignal::Error(error) => Some(Err(error)),
};
}
if self.shared.cancelled.load(Ordering::SeqCst) {
return Some(Err(StreamError::Cancelled));
}
let (next, _) = self
.shared
.available
.wait_timeout(state, WAIT_POLL_INTERVAL)
.unwrap_or_else(|poison| poison.into_inner());
state = next;
}
}
}
impl<T> Drop for DelayQueueStream<T> {
fn drop(&mut self) {
self.shared.cancelled.store(true, Ordering::SeqCst);
self.shared.available.notify_all();
let _ = self.completion.take();
}
}
struct SlotShared<T, Extra> {
state: Mutex<SlotState<T, Extra>>,
available: Condvar,
cancelled: Arc<AtomicBool>,
}
struct SlotState<T, Extra> {
slot: Option<T>,
terminal: Option<TerminalSignal>,
extra: Extra,
}
impl<T, Extra> SlotShared<T, Extra> {
fn new(extra: Extra) -> Arc<Self> {
Arc::new(Self {
state: Mutex::new(SlotState {
slot: None,
terminal: None,
extra,
}),
available: Condvar::new(),
cancelled: Arc::new(AtomicBool::new(false)),
})
}
}
struct SlotStream<T, Extra> {
shared: Arc<SlotShared<T, Extra>>,
completion: Option<StreamCompletion<NotUsed>>,
}
impl<T, Extra> Iterator for SlotStream<T, Extra> {
type Item = StreamResult<T>;
fn next(&mut self) -> Option<Self::Item> {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
loop {
if let Some(item) = state.slot.take() {
drop(state);
self.shared.available.notify_all();
return Some(Ok(item));
}
if let Some(terminal) = state.terminal.clone() {
return match terminal {
TerminalSignal::Complete => None,
TerminalSignal::Error(error) => Some(Err(error)),
};
}
if self.shared.cancelled.load(Ordering::SeqCst) {
return Some(Err(StreamError::Cancelled));
}
let (next, _) = self
.shared
.available
.wait_timeout(state, WAIT_POLL_INTERVAL)
.unwrap_or_else(|poison| poison.into_inner());
state = next;
}
}
}
impl<T, Extra> Drop for SlotStream<T, Extra> {
fn drop(&mut self) {
self.shared.cancelled.store(true, Ordering::SeqCst);
self.shared.available.notify_all();
let _ = self.completion.take();
}
}
fn finish_delay_queue<T>(shared: &DelayQueueShared<T>, terminal: TerminalSignal) {
let mut state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
state.terminal = Some(terminal);
}
drop(state);
shared.available.notify_all();
}
fn finish_slot<T, Extra>(shared: &SlotShared<T, Extra>, terminal: TerminalSignal) {
let mut state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
state.terminal = Some(terminal);
}
drop(state);
shared.available.notify_all();
}
struct QueuePanicGuard<T> {
shared: Arc<DelayQueueShared<T>>,
armed: bool,
}
impl<T> QueuePanicGuard<T> {
fn new(shared: Arc<DelayQueueShared<T>>) -> Self {
Self {
shared,
armed: true,
}
}
fn disarm(&mut self) {
self.armed = false;
}
}
impl<T> Drop for QueuePanicGuard<T> {
fn drop(&mut self) {
if self.armed {
finish_delay_queue(
&self.shared,
TerminalSignal::Error(StreamError::AbruptTermination),
);
}
}
}
struct SlotPanicGuard<T, Extra> {
shared: Arc<SlotShared<T, Extra>>,
armed: bool,
}
impl<T, Extra> SlotPanicGuard<T, Extra> {
fn new(shared: Arc<SlotShared<T, Extra>>) -> Self {
Self {
shared,
armed: true,
}
}
fn disarm(&mut self) {
self.armed = false;
}
}
impl<T, Extra> Drop for SlotPanicGuard<T, Extra> {
fn drop(&mut self) {
if self.armed {
finish_slot(
&self.shared,
TerminalSignal::Error(StreamError::AbruptTermination),
);
}
}
}
fn cloned_materializer(materializer: &Materializer) -> Materializer {
materializer.with_name_prefix(Arc::from(materializer.name_prefix()))
}
fn wait_for_timer(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
if delay.is_zero() {
return Ok(());
}
let gate = Arc::new((Mutex::new(false), Condvar::new()));
let gate_task = Arc::clone(&gate);
let _timer = materializer.schedule_once(delay, move || {
let (lock, condvar) = &*gate_task;
let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
*done = true;
drop(done);
condvar.notify_all();
});
let (lock, condvar) = &*gate;
let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
while !*done {
if materializer.is_shutdown() {
return Err(StreamError::AbruptTermination);
}
if current_stream_cancelled()
.as_ref()
.is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
{
return Err(StreamError::Cancelled);
}
let (next, _) = condvar
.wait_timeout(done, WAIT_POLL_INTERVAL)
.unwrap_or_else(|poison| poison.into_inner());
done = next;
}
Ok(())
}
#[derive(Debug)]
struct TokenBucket {
available: f64,
last: Instant,
capacity: f64,
nanos_between_tokens: f64,
}
impl TokenBucket {
fn new(cost: u64, per: Duration, maximum_burst: i32) -> Self {
assert!(cost > 0, "throttle cost must be greater than zero");
assert!(
per > Duration::ZERO,
"throttle period must be greater than zero"
);
assert!(
per.as_nanos() >= u128::from(cost),
"Rates larger than 1 unit / nanosecond are not supported"
);
assert!(maximum_burst >= -1, "maximum_burst must be -1 or greater");
let nanos_between_tokens = per.as_nanos() as f64 / cost as f64;
let capacity = if maximum_burst == -1 {
let automatic = ((100_000_000_f64 / nanos_between_tokens).max(1.0)).floor();
automatic.max(1.0)
} else {
maximum_burst as f64
};
Self {
available: capacity,
last: Instant::now(),
capacity,
nanos_between_tokens,
}
}
fn offer(&mut self, cost: u64) -> Duration {
let now = Instant::now();
if now > self.last {
let elapsed = now.duration_since(self.last).as_nanos() as f64;
let replenished = elapsed / self.nanos_between_tokens;
self.available = (self.available + replenished).min(self.capacity);
self.last = now;
}
let cost = cost as f64;
if self.available >= cost {
self.available -= cost;
Duration::ZERO
} else {
let needed = cost - self.available;
self.available = 0.0;
let delay_nanos = (needed * self.nanos_between_tokens).ceil() as u64;
self.last = now + Duration::from_nanos(delay_nanos);
Duration::from_nanos(delay_nanos)
}
}
}
fn schedule_notify<T>(
materializer: &Materializer,
shared: Arc<DelayQueueShared<T>>,
delay: Duration,
) where
T: Send + 'static,
{
if delay.is_zero() {
shared.available.notify_all();
return;
}
let _timer = materializer.schedule_once(delay, move || {
shared.available.notify_all();
});
}
fn delay_stage<Out, Supplier, Strategy>(
input: BoxStream<Out>,
delay_strategy_supplier: Arc<Supplier>,
overflow_strategy: DelayOverflowStrategy,
materializer: &Materializer,
) -> StreamResult<BoxStream<Out>>
where
Out: Send + 'static,
Supplier: Fn() -> Strategy + Send + Sync + 'static,
Strategy: FnMut(&Out) -> Duration + Send + 'static,
{
let shared = DelayQueueShared::new();
let producer_shared = Arc::clone(&shared);
let cancelled = Arc::clone(&shared.cancelled);
let state = Arc::clone(&materializer.inner.state);
let materializer = cloned_materializer(materializer);
let task_materializer = materializer.clone();
let completion = materializer.spawn_stream(move |_| {
let mut panic_guard = QueuePanicGuard::new(Arc::clone(&producer_shared));
let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
let mut delay_strategy = delay_strategy_supplier();
loop {
if cancelled.load(Ordering::SeqCst) {
panic_guard.disarm();
return Ok(NotUsed);
}
match input.next() {
Some(Ok(item)) => {
let delay = match catch_unwind(AssertUnwindSafe(|| delay_strategy(&item))) {
Ok(delay) => delay,
Err(_) => {
panic_guard.disarm();
finish_delay_queue(
&producer_shared,
TerminalSignal::Error(StreamError::AbruptTermination),
);
return Ok(NotUsed);
}
};
let deadline = Instant::now() + delay;
let mut state = producer_shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
match overflow_strategy {
DelayOverflowStrategy::Backpressure => {
while state.queue.len() == DELAY_BUFFER_CAPACITY
&& !cancelled.load(Ordering::SeqCst)
{
state = producer_shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if cancelled.load(Ordering::SeqCst) {
panic_guard.disarm();
return Ok(NotUsed);
}
let was_empty = state.queue.is_empty();
state.queue.push_back((deadline, item));
drop(state);
if was_empty {
schedule_notify(
&task_materializer,
Arc::clone(&producer_shared),
delay,
);
}
producer_shared.available.notify_all();
}
DelayOverflowStrategy::DropHead => {
if state.queue.len() == DELAY_BUFFER_CAPACITY {
let _ = state.queue.pop_front();
}
let was_empty = state.queue.is_empty();
state.queue.push_back((deadline, item));
drop(state);
if was_empty {
schedule_notify(
&task_materializer,
Arc::clone(&producer_shared),
delay,
);
}
producer_shared.available.notify_all();
}
DelayOverflowStrategy::DropTail => {
if state.queue.len() == DELAY_BUFFER_CAPACITY {
let _ = state.queue.pop_back();
}
let was_empty = state.queue.is_empty();
state.queue.push_back((deadline, item));
drop(state);
if was_empty {
schedule_notify(
&task_materializer,
Arc::clone(&producer_shared),
delay,
);
}
producer_shared.available.notify_all();
}
DelayOverflowStrategy::DropBuffer => {
if state.queue.len() == DELAY_BUFFER_CAPACITY {
state.queue.clear();
}
let was_empty = state.queue.is_empty();
state.queue.push_back((deadline, item));
drop(state);
if was_empty {
schedule_notify(
&task_materializer,
Arc::clone(&producer_shared),
delay,
);
}
producer_shared.available.notify_all();
}
DelayOverflowStrategy::DropNew => {
if state.queue.len() < DELAY_BUFFER_CAPACITY {
let was_empty = state.queue.is_empty();
state.queue.push_back((deadline, item));
drop(state);
if was_empty {
schedule_notify(
&task_materializer,
Arc::clone(&producer_shared),
delay,
);
}
producer_shared.available.notify_all();
}
}
DelayOverflowStrategy::Fail => {
if state.queue.len() == DELAY_BUFFER_CAPACITY {
state.queue.clear();
drop(state);
panic_guard.disarm();
finish_delay_queue(
&producer_shared,
TerminalSignal::Error(StreamError::Failed(format!(
"Buffer overflow for delay operator (max capacity was: {DELAY_BUFFER_CAPACITY})!"
))),
);
return Ok(NotUsed);
}
let was_empty = state.queue.is_empty();
state.queue.push_back((deadline, item));
drop(state);
if was_empty {
schedule_notify(
&task_materializer,
Arc::clone(&producer_shared),
delay,
);
}
producer_shared.available.notify_all();
}
DelayOverflowStrategy::EmitEarly => {
if state.queue.len() == DELAY_BUFFER_CAPACITY {
if let Some((early_deadline, _)) = state.queue.front_mut() {
*early_deadline = Instant::now();
}
drop(state);
producer_shared.available.notify_all();
state = producer_shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
while state.queue.len() == DELAY_BUFFER_CAPACITY
&& !cancelled.load(Ordering::SeqCst)
{
state = producer_shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if cancelled.load(Ordering::SeqCst) {
panic_guard.disarm();
return Ok(NotUsed);
}
}
let was_empty = state.queue.is_empty();
state.queue.push_back((deadline, item));
drop(state);
if was_empty {
schedule_notify(
&task_materializer,
Arc::clone(&producer_shared),
delay,
);
}
producer_shared.available.notify_all();
}
}
}
Some(Err(error)) => {
panic_guard.disarm();
finish_delay_queue(&producer_shared, TerminalSignal::Error(error));
return Ok(NotUsed);
}
None => {
panic_guard.disarm();
finish_delay_queue(&producer_shared, TerminalSignal::Complete);
return Ok(NotUsed);
}
}
}
});
Ok(Box::new(DelayQueueStream {
shared,
completion: Some(completion),
}))
}
struct ThrottleStream<Out, CostFn> {
input: BoxStream<Out>,
materializer: Materializer,
token_bucket: TokenBucket,
cost_fn: Arc<CostFn>,
mode: ThrottleMode,
terminal: Option<TerminalSignal>,
}
impl<Out, CostFn> Iterator for ThrottleStream<Out, CostFn>
where
Out: Send + 'static,
CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
{
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(terminal) = self.terminal.clone() {
return match terminal {
TerminalSignal::Complete => None,
TerminalSignal::Error(error) => Some(Err(error)),
};
}
match self.input.next() {
Some(Ok(item)) => {
let cost = match catch_unwind(AssertUnwindSafe(|| (self.cost_fn)(&item))) {
Ok(cost) => cost,
Err(_) => {
self.terminal = Some(TerminalSignal::Error(StreamError::AbruptTermination));
return Some(Err(StreamError::AbruptTermination));
}
};
let delay = self.token_bucket.offer(cost);
if delay.is_zero() {
Some(Ok(item))
} else if self.mode == ThrottleMode::Enforcing {
let error =
StreamError::Failed("Maximum throttle throughput exceeded.".to_owned());
self.terminal = Some(TerminalSignal::Error(error.clone()));
Some(Err(error))
} else {
match wait_for_timer(&self.materializer, delay) {
Ok(()) => Some(Ok(item)),
Err(error) => {
self.terminal = Some(TerminalSignal::Error(error.clone()));
Some(Err(error))
}
}
}
}
Some(Err(error)) => {
self.terminal = Some(TerminalSignal::Error(error.clone()));
Some(Err(error))
}
None => {
self.terminal = Some(TerminalSignal::Complete);
None
}
}
}
}
struct GroupedShared<T> {
state: Mutex<GroupedState<T>>,
available: Condvar,
cancelled: Arc<AtomicBool>,
}
struct GroupedState<T> {
ready: VecDeque<Vec<T>>,
current: Vec<T>,
current_weight: u64,
generation: u64,
terminal: Option<TerminalSignal>,
}
impl<T> GroupedShared<T> {
fn new() -> Arc<Self> {
Arc::new(Self {
state: Mutex::new(GroupedState {
ready: VecDeque::new(),
current: Vec::new(),
current_weight: 0,
generation: 0,
terminal: None,
}),
available: Condvar::new(),
cancelled: Arc::new(AtomicBool::new(false)),
})
}
}
struct GroupedStream<T> {
shared: Arc<GroupedShared<T>>,
completion: Option<StreamCompletion<NotUsed>>,
}
impl<T> Iterator for GroupedStream<T> {
type Item = StreamResult<Vec<T>>;
fn next(&mut self) -> Option<Self::Item> {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
loop {
if let Some(group) = state.ready.pop_front() {
drop(state);
self.shared.available.notify_all();
return Some(Ok(group));
}
if let Some(terminal) = state.terminal.clone() {
return match terminal {
TerminalSignal::Complete => None,
TerminalSignal::Error(error) => Some(Err(error)),
};
}
if self.shared.cancelled.load(Ordering::SeqCst) {
return Some(Err(StreamError::Cancelled));
}
let (next, _) = self
.shared
.available
.wait_timeout(state, WAIT_POLL_INTERVAL)
.unwrap_or_else(|poison| poison.into_inner());
state = next;
}
}
}
impl<T> Drop for GroupedStream<T> {
fn drop(&mut self) {
self.shared.cancelled.store(true, Ordering::SeqCst);
self.shared.available.notify_all();
let _ = self.completion.take();
}
}
fn finish_grouped<T>(shared: &GroupedShared<T>, terminal: TerminalSignal) {
let mut state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_none() {
if !state.current.is_empty() {
let current = mem::take(&mut state.current);
state.ready.push_back(current);
state.current_weight = 0;
state.generation = state.generation.wrapping_add(1);
}
state.terminal = Some(terminal);
}
drop(state);
shared.available.notify_all();
}
fn arm_grouped_timer<T: Send + 'static>(
materializer: &Materializer,
shared: Arc<GroupedShared<T>>,
interval: Duration,
generation: u64,
) {
let _timer = materializer.schedule_once(interval, move || {
let mut state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_some() || state.generation != generation || state.current.is_empty() {
return;
}
let current = mem::take(&mut state.current);
state.ready.push_back(current);
state.current_weight = 0;
state.generation = state.generation.wrapping_add(1);
drop(state);
shared.available.notify_all();
});
}
fn grouped_weighted_within_stage<Out, CostFn>(
input: BoxStream<Out>,
max_weight: u64,
max_number: usize,
interval: Duration,
cost_fn: Arc<CostFn>,
materializer: &Materializer,
) -> StreamResult<BoxStream<Vec<Out>>>
where
Out: Send + 'static,
CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
{
assert!(
max_weight > 0,
"grouped_weighted_within max_weight must be greater than zero"
);
assert!(
max_number > 0,
"grouped_weighted_within max_number must be greater than zero"
);
assert!(
interval > Duration::ZERO,
"grouped_weighted_within interval must be greater than zero"
);
let shared = GroupedShared::new();
let producer_shared = Arc::clone(&shared);
let cancelled = Arc::clone(&shared.cancelled);
let state = Arc::clone(&materializer.inner.state);
let materializer = cloned_materializer(materializer);
let task_materializer = materializer.clone();
let completion = materializer.spawn_stream(move |_| {
let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
loop {
if cancelled.load(Ordering::SeqCst) {
return Ok(NotUsed);
}
match input.next() {
Some(Ok(item)) => {
let weight = match catch_unwind(AssertUnwindSafe(|| (cost_fn)(&item))) {
Ok(weight) => weight,
Err(_) => {
finish_grouped(
&producer_shared,
TerminalSignal::Error(StreamError::AbruptTermination),
);
return Ok(NotUsed);
}
};
let mut state = producer_shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.current.is_empty() {
state.current.push(item);
state.current_weight = weight;
state.generation = state.generation.wrapping_add(1);
let generation = state.generation;
drop(state);
arm_grouped_timer(
&task_materializer,
Arc::clone(&producer_shared),
interval,
generation,
);
producer_shared.available.notify_all();
continue;
}
let fits = state.current_weight.saturating_add(weight) <= max_weight
&& state.current.len() < max_number;
if fits {
state.current.push(item);
state.current_weight = state.current_weight.saturating_add(weight);
if state.current_weight >= max_weight || state.current.len() >= max_number {
let current = mem::take(&mut state.current);
state.ready.push_back(current);
state.current_weight = 0;
state.generation = state.generation.wrapping_add(1);
}
drop(state);
producer_shared.available.notify_all();
continue;
}
let current = mem::take(&mut state.current);
state.ready.push_back(current);
state.current_weight = 0;
state.generation = state.generation.wrapping_add(1);
let heavy_alone = weight > max_weight;
if heavy_alone {
state.ready.push_back(vec![item]);
} else {
state.current.push(item);
state.current_weight = weight;
state.generation = state.generation.wrapping_add(1);
let generation = state.generation;
drop(state);
arm_grouped_timer(
&task_materializer,
Arc::clone(&producer_shared),
interval,
generation,
);
producer_shared.available.notify_all();
continue;
}
drop(state);
producer_shared.available.notify_all();
}
Some(Err(error)) => {
finish_grouped(&producer_shared, TerminalSignal::Error(error));
return Ok(NotUsed);
}
None => {
finish_grouped(&producer_shared, TerminalSignal::Complete);
return Ok(NotUsed);
}
}
}
});
Ok(Box::new(GroupedStream {
shared,
completion: Some(completion),
}))
}
struct ForwardExtra {
generation: u64,
}
fn forward_slot_stage<Out, Setup, OnItem>(
input: BoxStream<Out>,
materializer: &Materializer,
setup: Setup,
on_item: OnItem,
) -> StreamResult<BoxStream<Out>>
where
Out: Send + 'static,
Setup:
FnOnce(Arc<SlotShared<Out, ForwardExtra>>, Arc<AtomicBool>, &Materializer) + Send + 'static,
OnItem: Fn(&Arc<SlotShared<Out, ForwardExtra>>, &Materializer, &Out) -> StreamResult<()>
+ Send
+ Sync
+ 'static,
{
let shared = SlotShared::new(ForwardExtra { generation: 0 });
let producer_shared = Arc::clone(&shared);
let cancelled = Arc::clone(&shared.cancelled);
let state = Arc::clone(&materializer.inner.state);
let materializer = cloned_materializer(materializer);
let task_materializer = materializer.clone();
setup(
Arc::clone(&producer_shared),
Arc::clone(&cancelled),
&materializer,
);
let on_item = Arc::new(on_item);
let completion = materializer.spawn_stream(move |_| {
let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
loop {
if cancelled.load(Ordering::SeqCst) {
panic_guard.disarm();
return Ok(NotUsed);
}
match input.next() {
Some(Ok(item)) => {
on_item(&producer_shared, &task_materializer, &item)?;
let mut state = producer_shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
while state.slot.is_some()
&& state.terminal.is_none()
&& !cancelled.load(Ordering::SeqCst)
{
state = producer_shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
panic_guard.disarm();
return Ok(NotUsed);
}
state.slot = Some(item);
state.extra.generation = state.extra.generation.wrapping_add(1);
drop(state);
producer_shared.available.notify_all();
}
Some(Err(error)) => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Error(error));
return Ok(NotUsed);
}
None => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Complete);
return Ok(NotUsed);
}
}
}
});
Ok(Box::new(SlotStream {
shared,
completion: Some(completion),
}))
}
fn take_within_stage<Out: Send + 'static>(
input: BoxStream<Out>,
timeout: Duration,
materializer: &Materializer,
) -> StreamResult<BoxStream<Out>> {
assert!(
timeout > Duration::ZERO,
"take_within timeout must be greater than zero"
);
forward_slot_stage(
input,
materializer,
move |shared, cancelled, materializer| {
let _timer = materializer.schedule_once(timeout, move || {
finish_slot(&shared, TerminalSignal::Complete);
cancelled.store(true, Ordering::SeqCst);
});
},
|_, _, _| Ok(()),
)
}
fn initial_delay_stage<Out: Send + 'static>(
input: BoxStream<Out>,
delay: Duration,
materializer: &Materializer,
) -> StreamResult<BoxStream<Out>> {
assert!(delay >= Duration::ZERO);
let materializer = cloned_materializer(materializer);
Ok(Box::new(InitialDelayStream {
input,
materializer,
opened: delay.is_zero(),
delay,
terminal: None,
}))
}
struct InitialDelayStream<Out> {
input: BoxStream<Out>,
materializer: Materializer,
opened: bool,
delay: Duration,
terminal: Option<TerminalSignal>,
}
impl<Out: Send + 'static> Iterator for InitialDelayStream<Out> {
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(terminal) = self.terminal.clone() {
return match terminal {
TerminalSignal::Complete => None,
TerminalSignal::Error(error) => Some(Err(error)),
};
}
if !self.opened {
if let Err(error) = wait_for_timer(&self.materializer, self.delay) {
self.terminal = Some(TerminalSignal::Error(error.clone()));
return Some(Err(error));
}
self.opened = true;
}
match self.input.next() {
Some(Ok(item)) => Some(Ok(item)),
Some(Err(error)) => {
self.terminal = Some(TerminalSignal::Error(error.clone()));
Some(Err(error))
}
None => {
self.terminal = Some(TerminalSignal::Complete);
None
}
}
}
}
fn arm_generation_failure<Out: Send + 'static>(
materializer: &Materializer,
shared: Arc<SlotShared<Out, ForwardExtra>>,
timeout: Duration,
message: &'static str,
generation: u64,
require_empty_slot: bool,
) {
let _timer = materializer.schedule_once(timeout, move || {
let should_fail = {
let state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
state.terminal.is_none()
&& state.extra.generation == generation
&& (!require_empty_slot || state.slot.is_none())
};
if should_fail {
finish_slot(
&shared,
TerminalSignal::Error(StreamError::Failed(message.to_owned())),
);
shared.cancelled.store(true, Ordering::SeqCst);
}
});
}
fn initial_timeout_stage<Out: Send + 'static>(
input: BoxStream<Out>,
timeout: Duration,
materializer: &Materializer,
) -> StreamResult<BoxStream<Out>> {
assert!(
timeout > Duration::ZERO,
"initial_timeout timeout must be greater than zero"
);
let shared = SlotShared::new(ForwardExtra { generation: 0 });
let producer_shared = Arc::clone(&shared);
let cancelled = Arc::clone(&shared.cancelled);
let state = Arc::clone(&materializer.inner.state);
let materializer = cloned_materializer(materializer);
arm_generation_failure(
&materializer,
Arc::clone(&producer_shared),
timeout,
"The first element has not yet passed through before the initial timeout elapsed.",
0,
true,
);
let completion = materializer.spawn_stream(move |_| {
let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
loop {
if cancelled.load(Ordering::SeqCst) {
panic_guard.disarm();
return Ok(NotUsed);
}
match input.next() {
Some(Ok(item)) => {
let mut state = producer_shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
state.extra.generation = state.extra.generation.wrapping_add(1);
while state.slot.is_some()
&& state.terminal.is_none()
&& !cancelled.load(Ordering::SeqCst)
{
state = producer_shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
panic_guard.disarm();
return Ok(NotUsed);
}
state.slot = Some(item);
drop(state);
producer_shared.available.notify_all();
}
Some(Err(error)) => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Error(error));
return Ok(NotUsed);
}
None => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Complete);
return Ok(NotUsed);
}
}
}
});
Ok(Box::new(SlotStream {
shared,
completion: Some(completion),
}))
}
fn completion_timeout_stage<Out: Send + 'static>(
input: BoxStream<Out>,
timeout: Duration,
materializer: &Materializer,
) -> StreamResult<BoxStream<Out>> {
assert!(
timeout > Duration::ZERO,
"completion_timeout timeout must be greater than zero"
);
forward_slot_stage(
input,
materializer,
move |shared, cancelled, materializer| {
let _timer = materializer.schedule_once(timeout, move || {
finish_slot(
&shared,
TerminalSignal::Error(StreamError::Failed(
"The stream has not been completed before the completion timeout elapsed."
.to_owned(),
)),
);
cancelled.store(true, Ordering::SeqCst);
});
},
|_, _, _| Ok(()),
)
}
fn idle_timeout_stage<Out: Send + 'static>(
input: BoxStream<Out>,
timeout: Duration,
materializer: &Materializer,
) -> StreamResult<BoxStream<Out>> {
assert!(
timeout > Duration::ZERO,
"idle_timeout timeout must be greater than zero"
);
let shared = SlotShared::new(ForwardExtra { generation: 0 });
let producer_shared = Arc::clone(&shared);
let cancelled = Arc::clone(&shared.cancelled);
let state = Arc::clone(&materializer.inner.state);
let materializer = cloned_materializer(materializer);
let task_materializer = materializer.clone();
arm_generation_failure(
&materializer,
Arc::clone(&producer_shared),
timeout,
"No elements passed before the idle timeout elapsed.",
0,
false,
);
let completion = materializer.spawn_stream(move |_| {
let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
loop {
if cancelled.load(Ordering::SeqCst) {
panic_guard.disarm();
return Ok(NotUsed);
}
match input.next() {
Some(Ok(item)) => {
let mut state = producer_shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
while state.slot.is_some()
&& state.terminal.is_none()
&& !cancelled.load(Ordering::SeqCst)
{
state = producer_shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
panic_guard.disarm();
return Ok(NotUsed);
}
state.slot = Some(item);
state.extra.generation = state.extra.generation.wrapping_add(1);
let generation = state.extra.generation;
drop(state);
arm_generation_failure(
&task_materializer,
Arc::clone(&producer_shared),
timeout,
"No elements passed before the idle timeout elapsed.",
generation,
false,
);
producer_shared.available.notify_all();
}
Some(Err(error)) => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Error(error));
return Ok(NotUsed);
}
None => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Complete);
return Ok(NotUsed);
}
}
}
});
Ok(Box::new(SlotStream {
shared,
completion: Some(completion),
}))
}
fn backpressure_timeout_stage<Out: Send + 'static>(
input: BoxStream<Out>,
timeout: Duration,
materializer: &Materializer,
) -> StreamResult<BoxStream<Out>> {
assert!(
timeout > Duration::ZERO,
"backpressure_timeout timeout must be greater than zero"
);
let shared = SlotShared::new(ForwardExtra { generation: 0 });
let producer_shared = Arc::clone(&shared);
let cancelled = Arc::clone(&shared.cancelled);
let state = Arc::clone(&materializer.inner.state);
let materializer = cloned_materializer(materializer);
let task_materializer = materializer.clone();
let completion = materializer.spawn_stream(move |_| {
let schedule_timeout = |generation: u64, shared: Arc<SlotShared<Out, ForwardExtra>>| {
let _timer = task_materializer.schedule_once(timeout, move || {
let mut state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_some() || state.extra.generation != generation {
return;
}
state.slot = None;
state.terminal = Some(TerminalSignal::Error(StreamError::Failed(
"No downstream demand signalled before the backpressure timeout elapsed."
.to_owned(),
)));
drop(state);
shared.cancelled.store(true, Ordering::SeqCst);
shared.available.notify_all();
});
};
let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
loop {
if cancelled.load(Ordering::SeqCst) {
panic_guard.disarm();
return Ok(NotUsed);
}
match input.next() {
Some(Ok(item)) => {
let mut state = producer_shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
while state.slot.is_some()
&& state.terminal.is_none()
&& !cancelled.load(Ordering::SeqCst)
{
state = producer_shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
panic_guard.disarm();
return Ok(NotUsed);
}
state.slot = Some(item);
state.extra.generation = state.extra.generation.wrapping_add(1);
let generation = state.extra.generation;
drop(state);
schedule_timeout(generation, Arc::clone(&producer_shared));
producer_shared.available.notify_all();
}
Some(Err(error)) => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Error(error));
return Ok(NotUsed);
}
None => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Complete);
return Ok(NotUsed);
}
}
}
});
struct BackpressureStream<Out> {
shared: Arc<SlotShared<Out, ForwardExtra>>,
completion: Option<StreamCompletion<NotUsed>>,
}
impl<Out> Iterator for BackpressureStream<Out> {
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
let mut state = self
.shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
loop {
if let Some(item) = state.slot.take() {
state.extra.generation = state.extra.generation.wrapping_add(1);
drop(state);
self.shared.available.notify_all();
return Some(Ok(item));
}
if let Some(terminal) = state.terminal.clone() {
return match terminal {
TerminalSignal::Complete => None,
TerminalSignal::Error(error) => Some(Err(error)),
};
}
if self.shared.cancelled.load(Ordering::SeqCst) {
return Some(Err(StreamError::Cancelled));
}
let (next, _) = self
.shared
.available
.wait_timeout(state, WAIT_POLL_INTERVAL)
.unwrap_or_else(|poison| poison.into_inner());
state = next;
}
}
}
impl<Out> Drop for BackpressureStream<Out> {
fn drop(&mut self) {
self.shared.cancelled.store(true, Ordering::SeqCst);
self.shared.available.notify_all();
let _ = self.completion.take();
}
}
Ok(Box::new(BackpressureStream {
shared,
completion: Some(completion),
}))
}
struct KeepAliveExtra {
generation: u64,
}
fn arm_keep_alive_timer<Out, Inject>(
materializer: &Materializer,
shared: Arc<SlotShared<Out, KeepAliveExtra>>,
timeout: Duration,
generation: u64,
inject: Arc<Inject>,
) where
Out: Send + 'static,
Inject: Fn() -> Out + Send + Sync + 'static,
{
let materializer = cloned_materializer(materializer);
let task_materializer = materializer.clone();
let _timer = materializer.schedule_once(timeout, move || {
let slot_occupied = {
let state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_some() || state.extra.generation != generation {
return;
}
state.slot.is_some()
};
if slot_occupied {
arm_keep_alive_timer(
&task_materializer,
Arc::clone(&shared),
timeout,
generation,
Arc::clone(&inject),
);
return;
}
let injected = match catch_unwind(AssertUnwindSafe(|| inject())) {
Ok(item) => item,
Err(_) => {
finish_slot(
&shared,
TerminalSignal::Error(StreamError::AbruptTermination),
);
shared.cancelled.store(true, Ordering::SeqCst);
return;
}
};
let mut state = shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
if state.terminal.is_some() || state.extra.generation != generation || state.slot.is_some()
{
return;
}
state.slot = Some(injected);
state.extra.generation = state.extra.generation.wrapping_add(1);
let next_generation = state.extra.generation;
drop(state);
shared.available.notify_all();
arm_keep_alive_timer(
&task_materializer,
Arc::clone(&shared),
timeout,
next_generation,
Arc::clone(&inject),
);
});
}
fn keep_alive_stage<Out, Inject>(
input: BoxStream<Out>,
timeout: Duration,
inject: Arc<Inject>,
materializer: &Materializer,
) -> StreamResult<BoxStream<Out>>
where
Out: Send + 'static,
Inject: Fn() -> Out + Send + Sync + 'static,
{
assert!(
timeout > Duration::ZERO,
"keep_alive timeout must be greater than zero"
);
let shared = SlotShared::new(KeepAliveExtra { generation: 0 });
let producer_shared = Arc::clone(&shared);
let cancelled = Arc::clone(&shared.cancelled);
let state = Arc::clone(&materializer.inner.state);
let materializer = cloned_materializer(materializer);
let task_materializer = materializer.clone();
arm_keep_alive_timer(
&materializer,
Arc::clone(&producer_shared),
timeout,
0,
Arc::clone(&inject),
);
let completion = materializer.spawn_stream(move |_| {
let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
loop {
if cancelled.load(Ordering::SeqCst) {
panic_guard.disarm();
return Ok(NotUsed);
}
match input.next() {
Some(Ok(item)) => {
let mut state = producer_shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
while state.slot.is_some()
&& state.terminal.is_none()
&& !cancelled.load(Ordering::SeqCst)
{
state = producer_shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
panic_guard.disarm();
return Ok(NotUsed);
}
state.slot = Some(item);
state.extra.generation = state.extra.generation.wrapping_add(1);
let generation = state.extra.generation;
drop(state);
producer_shared.available.notify_all();
arm_keep_alive_timer(
&task_materializer,
Arc::clone(&producer_shared),
timeout,
generation,
Arc::clone(&inject),
);
}
Some(Err(error)) => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Error(error));
return Ok(NotUsed);
}
None => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Complete);
return Ok(NotUsed);
}
}
}
});
Ok(Box::new(SlotStream {
shared,
completion: Some(completion),
}))
}
fn drop_within_stage<Out: Send + 'static>(
input: BoxStream<Out>,
timeout: Duration,
materializer: &Materializer,
) -> StreamResult<BoxStream<Out>> {
assert!(
timeout > Duration::ZERO,
"drop_within timeout must be greater than zero"
);
struct DropWithinExtra;
let shared = SlotShared::new(DropWithinExtra);
let producer_shared = Arc::clone(&shared);
let cancelled = Arc::clone(&shared.cancelled);
let state = Arc::clone(&materializer.inner.state);
let materializer = cloned_materializer(materializer);
let deadline = Instant::now() + timeout;
let completion = materializer.spawn_stream(move |_| {
let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
loop {
if cancelled.load(Ordering::SeqCst) {
panic_guard.disarm();
return Ok(NotUsed);
}
match input.next() {
Some(Ok(item)) => {
if Instant::now() < deadline {
continue;
}
let mut state = producer_shared
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
while state.slot.is_some()
&& state.terminal.is_none()
&& !cancelled.load(Ordering::SeqCst)
{
state = producer_shared
.available
.wait(state)
.unwrap_or_else(|poison| poison.into_inner());
}
if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
panic_guard.disarm();
return Ok(NotUsed);
}
state.slot = Some(item);
drop(state);
producer_shared.available.notify_all();
}
Some(Err(error)) => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Error(error));
return Ok(NotUsed);
}
None => {
panic_guard.disarm();
finish_slot(&producer_shared, TerminalSignal::Complete);
return Ok(NotUsed);
}
}
}
});
Ok(Box::new(SlotStream {
shared,
completion: Some(completion),
}))
}
impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
pub fn throttle(
self,
elements: u64,
per: Duration,
maximum_burst: i32,
mode: ThrottleMode,
) -> Flow<In, Out, Mat> {
self.throttle_with_cost(elements, per, maximum_burst, |_| 1, mode)
}
pub fn throttle_with_cost<CostFn>(
self,
cost: u64,
per: Duration,
maximum_burst: i32,
cost_fn: CostFn,
mode: ThrottleMode,
) -> Flow<In, Out, Mat>
where
CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
{
let cost_fn = Arc::new(cost_fn);
self.via(Flow::from_runtime_transform(move |input, materializer| {
let materializer = cloned_materializer(materializer);
Ok(Box::new(ThrottleStream {
input,
materializer,
token_bucket: TokenBucket::new(cost, per, maximum_burst),
cost_fn: Arc::clone(&cost_fn),
mode,
terminal: None,
}))
}))
}
pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Flow<In, Out, Mat> {
self.delay_with(move || move |_: &Out| delay, strategy)
}
pub fn delay_with<Supplier, Strategy>(
self,
delay_strategy_supplier: Supplier,
overflow_strategy: DelayOverflowStrategy,
) -> Flow<In, Out, Mat>
where
Supplier: Fn() -> Strategy + Send + Sync + 'static,
Strategy: FnMut(&Out) -> Duration + Send + 'static,
{
let delay_strategy_supplier = Arc::new(delay_strategy_supplier);
self.via(Flow::from_runtime_transform(move |input, materializer| {
delay_stage(
input,
Arc::clone(&delay_strategy_supplier),
overflow_strategy,
materializer,
)
}))
}
pub fn initial_delay(self, delay: Duration) -> Flow<In, Out, Mat> {
self.via(Flow::from_runtime_transform(move |input, materializer| {
initial_delay_stage(input, delay, materializer)
}))
}
pub fn grouped_within(self, max_number: usize, interval: Duration) -> Flow<In, Vec<Out>, Mat> {
let unit_cost = Arc::new(|_: &Out| 1_u64);
self.via(Flow::from_runtime_transform(move |input, materializer| {
grouped_weighted_within_stage(
input,
max_number as u64,
max_number,
interval,
Arc::clone(&unit_cost),
materializer,
)
}))
}
pub fn grouped_weighted_within<CostFn>(
self,
max_weight: u64,
interval: Duration,
cost_fn: CostFn,
) -> Flow<In, Vec<Out>, Mat>
where
CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
{
let cost_fn = Arc::new(cost_fn);
self.via(Flow::from_runtime_transform(move |input, materializer| {
grouped_weighted_within_stage(
input,
max_weight,
usize::MAX,
interval,
Arc::clone(&cost_fn),
materializer,
)
}))
}
pub fn drop_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
self.via(Flow::from_runtime_transform(move |input, materializer| {
drop_within_stage(input, timeout, materializer)
}))
}
pub fn take_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
self.via(Flow::from_runtime_transform(move |input, materializer| {
take_within_stage(input, timeout, materializer)
}))
}
pub fn idle_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
self.via(Flow::from_runtime_transform(move |input, materializer| {
idle_timeout_stage(input, timeout, materializer)
}))
}
pub fn backpressure_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
self.via(Flow::from_runtime_transform(move |input, materializer| {
backpressure_timeout_stage(input, timeout, materializer)
}))
}
pub fn completion_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
self.via(Flow::from_runtime_transform(move |input, materializer| {
completion_timeout_stage(input, timeout, materializer)
}))
}
pub fn initial_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
self.via(Flow::from_runtime_transform(move |input, materializer| {
initial_timeout_stage(input, timeout, materializer)
}))
}
pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Flow<In, Out, Mat>
where
Inject: Fn() -> Out + Send + Sync + 'static,
{
let inject = Arc::new(inject);
self.via(Flow::from_runtime_transform(move |input, materializer| {
keep_alive_stage(input, timeout, Arc::clone(&inject), materializer)
}))
}
}
impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
pub fn throttle(
self,
elements: u64,
per: Duration,
maximum_burst: i32,
mode: ThrottleMode,
) -> Self {
self.via(Flow::identity().throttle(elements, per, maximum_burst, mode))
}
pub fn throttle_with_cost<CostFn>(
self,
cost: u64,
per: Duration,
maximum_burst: i32,
cost_fn: CostFn,
mode: ThrottleMode,
) -> Self
where
CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
{
self.via(Flow::identity().throttle_with_cost(cost, per, maximum_burst, cost_fn, mode))
}
pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Self {
self.via(Flow::identity().delay(delay, strategy))
}
pub fn delay_with<Supplier, Strategy>(
self,
delay_strategy_supplier: Supplier,
overflow_strategy: DelayOverflowStrategy,
) -> Self
where
Supplier: Fn() -> Strategy + Send + Sync + 'static,
Strategy: FnMut(&Out) -> Duration + Send + 'static,
{
self.via(Flow::identity().delay_with(delay_strategy_supplier, overflow_strategy))
}
pub fn initial_delay(self, delay: Duration) -> Self {
self.via(Flow::identity().initial_delay(delay))
}
pub fn grouped_within(self, max_number: usize, interval: Duration) -> Source<Vec<Out>, Mat> {
self.via(Flow::identity().grouped_within(max_number, interval))
}
pub fn grouped_weighted_within<CostFn>(
self,
max_weight: u64,
interval: Duration,
cost_fn: CostFn,
) -> Source<Vec<Out>, Mat>
where
CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
{
self.via(Flow::identity().grouped_weighted_within(max_weight, interval, cost_fn))
}
pub fn drop_within(self, timeout: Duration) -> Self {
self.via(Flow::identity().drop_within(timeout))
}
pub fn take_within(self, timeout: Duration) -> Self {
self.via(Flow::identity().take_within(timeout))
}
pub fn idle_timeout(self, timeout: Duration) -> Self {
self.via(Flow::identity().idle_timeout(timeout))
}
pub fn backpressure_timeout(self, timeout: Duration) -> Self {
self.via(Flow::identity().backpressure_timeout(timeout))
}
pub fn completion_timeout(self, timeout: Duration) -> Self {
self.via(Flow::identity().completion_timeout(timeout))
}
pub fn initial_timeout(self, timeout: Duration) -> Self {
self.via(Flow::identity().initial_timeout(timeout))
}
pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Self
where
Inject: Fn() -> Out + Send + Sync + 'static,
{
self.via(Flow::identity().keep_alive(timeout, inject))
}
}
impl<Out: Clone + Send + Sync + 'static> Source<Out, Cancellable> {
pub fn tick(initial_delay: Duration, interval: Duration, element: Out) -> Self {
assert!(
interval > Duration::ZERO,
"tick interval must be greater than zero"
);
Source::from_materialized_factory(move |materializer| {
struct TickState {
pending: bool,
closed: bool,
}
let shared = Arc::new((
Mutex::new(TickState {
pending: false,
closed: false,
}),
Condvar::new(),
));
let keep_alive: Arc<dyn Send + Sync> =
Arc::clone(&materializer.inner) as Arc<dyn Send + Sync>;
let cancellable = Cancellable::new_with_keep_alive(Some(keep_alive));
let cancelled = Arc::clone(&cancellable.cancelled);
let shutdown = Arc::clone(&materializer.inner.state.shutdown);
let shared_task = Arc::clone(&shared);
let timer =
materializer.schedule_with_fixed_delay(initial_delay, interval, move || {
if cancelled.load(Ordering::SeqCst) || shutdown.load(Ordering::SeqCst) {
return;
}
let (lock, condvar) = &*shared_task;
let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
if !state.closed {
state.pending = true;
}
drop(state);
condvar.notify_all();
});
struct TickStream<Out> {
shared: Arc<(Mutex<TickState>, Condvar)>,
timer: Cancellable,
external: Cancellable,
element: Out,
shutdown: Arc<AtomicBool>,
}
impl<Out: Clone> Iterator for TickStream<Out> {
type Item = StreamResult<Out>;
fn next(&mut self) -> Option<Self::Item> {
let (lock, condvar) = &*self.shared;
let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
loop {
if self.external.is_cancelled() || self.shutdown.load(Ordering::SeqCst) {
state.closed = true;
self.timer.cancel();
return None;
}
if state.pending {
state.pending = false;
return Some(Ok(self.element.clone()));
}
let (next, _) = condvar
.wait_timeout(state, WAIT_POLL_INTERVAL)
.unwrap_or_else(|poison| poison.into_inner());
state = next;
}
}
}
impl<Out> Drop for TickStream<Out> {
fn drop(&mut self) {
let (lock, condvar) = &*self.shared;
let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
state.closed = true;
drop(state);
self.timer.cancel();
condvar.notify_all();
}
}
Ok((
Box::new(TickStream {
shared,
timer,
external: cancellable.clone(),
element: element.clone(),
shutdown: Arc::clone(&materializer.inner.state.shutdown),
}) as BoxStream<Out>,
cancellable,
))
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testkit::{TestSink, TestSource};
use std::sync::mpsc;
use std::thread;
const LOAD_TIMEOUT: Duration = Duration::from_millis(100);
const LOAD_GAP: Duration = Duration::from_millis(250);
fn materialize_stream<T: Send + 'static, Mat: Send + 'static>(
source: Source<T, Mat>,
) -> (BoxStream<T>, Materializer, Mat) {
let materializer = Materializer::new();
let (stream, mat) = Arc::clone(&source.factory)
.create(&materializer)
.expect("stream materializes");
(stream, materializer, mat)
}
#[test]
fn throttle_shaping_spaces_elements() {
let (tx, rx) = mpsc::channel();
Source::from_iter(1..=3)
.map(move |_| {
tx.send(Instant::now()).unwrap();
})
.throttle(1, Duration::from_millis(40), 0, ThrottleMode::Shaping)
.run_with(Sink::ignore())
.unwrap()
.wait()
.unwrap();
let first = rx.recv_timeout(Duration::from_millis(250)).unwrap();
let second = rx.recv_timeout(Duration::from_millis(250)).unwrap();
let third = rx.recv_timeout(Duration::from_millis(250)).unwrap();
assert!(second.duration_since(first) >= Duration::from_millis(20));
assert!(third.duration_since(second) >= Duration::from_millis(20));
}
#[test]
fn throttle_enforcing_fails_when_rate_exceeded() {
let result = Source::from_iter(1..=3)
.throttle(1, Duration::from_millis(80), 1, ThrottleMode::Enforcing)
.run_collect();
assert_eq!(
result,
Err(StreamError::Failed(
"Maximum throttle throughput exceeded.".to_owned()
))
);
}
#[test]
fn delay_delays_first_element_without_reordering() {
let start = Instant::now();
let items = Source::from_iter([1, 2])
.delay(
Duration::from_millis(40),
DelayOverflowStrategy::Backpressure,
)
.run_collect()
.unwrap();
assert_eq!(items, vec![1, 2]);
assert!(start.elapsed() >= Duration::from_millis(20));
}
#[test]
fn delay_emit_early_preserves_elements_when_buffer_not_full() {
let start = Instant::now();
let items = Source::from_iter([1, 2])
.delay(Duration::from_millis(40), DelayOverflowStrategy::EmitEarly)
.run_collect()
.unwrap();
assert_eq!(items, vec![1, 2]);
assert!(start.elapsed() >= Duration::from_millis(20));
}
#[test]
fn delay_emit_early_flushes_oldest_when_buffer_full() {
let delay = Duration::from_secs(2);
let (mut stream, materializer, _mat) = materialize_stream(
Source::from_iter(1..=17).delay(delay, DelayOverflowStrategy::EmitEarly),
);
let start = Instant::now();
assert_eq!(stream.next(), Some(Ok(1)));
assert!(
start.elapsed() < Duration::from_millis(750),
"EmitEarly should flush well before the configured delay under load"
);
let mut items = vec![1];
for item in stream {
items.push(item.unwrap());
}
assert_eq!(items, (1..=17).collect::<Vec<_>>());
materializer.shutdown();
}
#[test]
fn initial_delay_holds_back_first_pull() {
let start = Instant::now();
let result = Source::single(42)
.initial_delay(Duration::from_millis(40))
.run_with(Sink::head())
.unwrap();
assert_eq!(result.wait().unwrap(), 42);
assert!(start.elapsed() >= Duration::from_millis(20));
}
#[test]
fn grouped_within_flushes_on_timer() {
let (publisher, subscriber) = TestSource::probe::<i32>()
.grouped_within(8, Duration::from_millis(40))
.to_mat(TestSink::probe(), Keep::both)
.run()
.unwrap();
publisher.expect_request();
publisher.send_next(1);
publisher.expect_request();
publisher.send_next(2);
subscriber.request(1);
assert_eq!(subscriber.expect_next(), vec![1, 2]);
}
#[test]
fn grouped_weighted_within_flushes_on_weight() {
let result = Source::from_iter(["aa", "bbb", "c"])
.grouped_weighted_within(4, Duration::from_secs(1), |s| s.len() as u64)
.run_collect()
.unwrap();
assert_eq!(result, vec![vec!["aa"], vec!["bbb", "c"]]);
}
#[test]
fn drop_within_drops_early_elements() {
let (publisher, subscriber) = TestSource::probe::<i32>()
.drop_within(Duration::from_millis(40))
.to_mat(TestSink::probe(), Keep::both)
.run()
.unwrap();
publisher.expect_request();
publisher.send_next(1);
thread::sleep(Duration::from_millis(120));
publisher.expect_request();
publisher.send_next(2);
publisher.send_complete();
subscriber.request(2);
assert_eq!(subscriber.expect_next(), 2);
subscriber.expect_complete();
}
#[test]
fn take_within_completes_after_timeout() {
let (publisher, subscriber) = TestSource::probe::<i32>()
.take_within(Duration::from_millis(40))
.to_mat(TestSink::probe(), Keep::both)
.run()
.unwrap();
publisher.expect_request();
publisher.send_next(1);
subscriber.request(2);
assert_eq!(subscriber.expect_next(), 1);
subscriber.expect_complete();
}
#[test]
fn idle_timeout_fails_on_gap() {
let (publisher, subscriber) = TestSource::probe::<i32>()
.idle_timeout(Duration::from_millis(40))
.to_mat(TestSink::probe(), Keep::both)
.run()
.unwrap();
publisher.expect_request();
publisher.send_next(1);
subscriber.request(2);
assert_eq!(subscriber.expect_next(), 1);
assert!(matches!(subscriber.expect_error(), StreamError::Failed(_)));
}
#[test]
fn backpressure_timeout_fails_without_demand() {
let (publisher, subscriber) = TestSource::probe::<i32>()
.backpressure_timeout(LOAD_TIMEOUT)
.to_mat(TestSink::probe(), Keep::both)
.run()
.unwrap();
subscriber.request(1);
publisher.expect_request();
publisher.send_next(1);
assert_eq!(subscriber.expect_next(), 1);
publisher.expect_request();
publisher.send_next(2);
thread::sleep(LOAD_GAP);
subscriber.request(1);
assert!(matches!(subscriber.expect_error(), StreamError::Failed(_)));
}
#[test]
fn completion_timeout_fails_unfinished_stream() {
let result = Source::<i32>::never()
.completion_timeout(Duration::from_millis(40))
.run_collect();
assert!(matches!(result, Err(StreamError::Failed(_))));
}
#[test]
fn initial_timeout_fails_before_first_element() {
let (mut stream, materializer, _mat) =
materialize_stream(Source::<i32>::never().initial_timeout(Duration::from_millis(40)));
assert!(matches!(stream.next(), Some(Err(StreamError::Failed(_)))));
materializer.shutdown();
}
#[test]
fn keep_alive_injects_on_idle_gap() {
let (publisher, subscriber) = TestSource::probe::<i32>()
.keep_alive(LOAD_TIMEOUT, || 0)
.to_mat(TestSink::probe(), Keep::both)
.run()
.unwrap();
subscriber.request(3);
publisher.expect_request();
publisher.send_next(1);
assert_eq!(subscriber.expect_next(), 1);
assert_eq!(subscriber.expect_next(), 0);
}
#[test]
fn keep_alive_rearms_after_slow_consumer_drains_slot() {
let (publisher, subscriber) = TestSource::probe::<i32>()
.keep_alive(LOAD_TIMEOUT, || 0)
.to_mat(TestSink::probe(), Keep::both)
.run()
.unwrap();
subscriber.request(1);
publisher.expect_request();
publisher.send_next(1);
assert_eq!(subscriber.expect_next(), 1);
publisher.expect_request();
publisher.send_next(2);
thread::sleep(LOAD_GAP);
subscriber.request(1);
assert_eq!(subscriber.expect_next(), 2);
thread::sleep(LOAD_GAP);
subscriber.request(1);
assert_eq!(subscriber.expect_next(), 0);
}
#[test]
#[should_panic(expected = "maximum_burst must be -1 or greater")]
fn throttle_rejects_invalid_negative_maximum_burst() {
let _ = Source::single(1)
.throttle(1, Duration::from_millis(40), -2, ThrottleMode::Shaping)
.run_collect();
}
#[test]
fn tick_drops_missed_ticks_and_cancels() {
let (mut stream, materializer, _cancellable) = materialize_stream(Source::tick(
Duration::from_millis(20),
Duration::from_millis(20),
7_i32,
));
thread::sleep(Duration::from_millis(100));
assert_eq!(stream.next(), Some(Ok(7)));
assert_eq!(stream.next(), Some(Ok(7)));
materializer.shutdown();
assert_eq!(stream.next(), None);
}
}