Skip to main content

datum/stream/
time.rs

1//! Time-aware operators: `throttle`, `delay`/`delay_with`, `initial_delay`,
2//! `*_within`, `*_timeout`, `keep_alive`, and `Source::tick`, plus `ThrottleMode`
3//! and `DelayOverflowStrategy`.
4//!
5//! Timed behavior is driven by the `Runtime`'s shared `TimerDriver`; the delay
6//! buffer holds `DELAY_BUFFER_CAPACITY` (16) pending elements.
7
8use super::*;
9use crate::stream::runtime::current_stream_cancelled;
10use std::collections::VecDeque;
11use std::mem;
12use std::panic::{AssertUnwindSafe, catch_unwind};
13use std::time::Instant;
14
/// Maximum number of pending elements the delay queue holds before the
/// configured `DelayOverflowStrategy` is applied.
const DELAY_BUFFER_CAPACITY: usize = 16;
/// Upper bound on a single condition-variable wait, so that waiting loops
/// periodically re-check their terminal, cancellation, and shutdown flags.
const WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
17
/// What a throttled stream does when an element arrives faster than the
/// token bucket allows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThrottleMode {
    /// Hold the element back until the token bucket's computed delay has elapsed.
    Shaping,
    /// Fail the stream with "Maximum throttle throughput exceeded." instead of waiting.
    Enforcing,
}
23
/// What the delay operator does with a new element when its buffer already
/// holds `DELAY_BUFFER_CAPACITY` pending elements.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DelayOverflowStrategy {
    /// Make the oldest buffered element due immediately, wait for room, then buffer the new one.
    EmitEarly,
    /// Discard the oldest buffered element and buffer the new one.
    DropHead,
    /// Discard the most recently buffered element and buffer the new one.
    DropTail,
    /// Discard every buffered element and buffer the new one.
    DropBuffer,
    /// Discard the new element and keep the buffer unchanged.
    DropNew,
    /// Block the producer until the consumer has made room.
    Backpressure,
    /// Discard the buffer and fail the stream with a buffer-overflow error.
    Fail,
}
34
/// How a stream ended. It is recorded once and reported by the consuming
/// iterator after any already-buffered output has been handed out.
#[derive(Clone)]
enum TerminalSignal {
    /// The stream finished normally; the consumer returns `None`.
    Complete,
    /// The stream failed; the consumer returns this error.
    Error(StreamError),
}
40
41struct DelayQueueShared<T> {
42    state: Mutex<DelayQueueState<T>>,
43    available: Condvar,
44    cancelled: Arc<AtomicBool>,
45}
46
47struct DelayQueueState<T> {
48    queue: VecDeque<(Instant, T)>,
49    terminal: Option<TerminalSignal>,
50}
51
52impl<T> DelayQueueShared<T> {
53    fn new() -> Arc<Self> {
54        Arc::new(Self {
55            state: Mutex::new(DelayQueueState {
56                queue: VecDeque::new(),
57                terminal: None,
58            }),
59            available: Condvar::new(),
60            cancelled: Arc::new(AtomicBool::new(false)),
61        })
62    }
63}
64
/// Consumer half of a delay queue: an iterator that yields buffered elements
/// once their deadlines have passed.
struct DelayQueueStream<T> {
    /// Queue, terminal signal, condvar, and cancellation flag shared with the producer.
    shared: Arc<DelayQueueShared<T>>,
    /// Completion handle of the spawned producer task; released when the stream is dropped.
    completion: Option<StreamCompletion<NotUsed>>,
}
69
70impl<T> Iterator for DelayQueueStream<T> {
71    type Item = StreamResult<T>;
72
73    fn next(&mut self) -> Option<Self::Item> {
74        let mut state = self
75            .shared
76            .state
77            .lock()
78            .unwrap_or_else(|poison| poison.into_inner());
79        loop {
80            if let Some((deadline, _)) = state.queue.front() {
81                let now = Instant::now();
82                if *deadline <= now {
83                    let (_, item) = state.queue.pop_front().expect("front element present");
84                    drop(state);
85                    self.shared.available.notify_all();
86                    return Some(Ok(item));
87                }
88                let wait = (*deadline - now).min(WAIT_POLL_INTERVAL);
89                let (next, _) = self
90                    .shared
91                    .available
92                    .wait_timeout(state, wait)
93                    .unwrap_or_else(|poison| poison.into_inner());
94                state = next;
95                continue;
96            }
97
98            if let Some(terminal) = state.terminal.clone() {
99                return match terminal {
100                    TerminalSignal::Complete => None,
101                    TerminalSignal::Error(error) => Some(Err(error)),
102                };
103            }
104            if self.shared.cancelled.load(Ordering::SeqCst) {
105                return Some(Err(StreamError::Cancelled));
106            }
107
108            let (next, _) = self
109                .shared
110                .available
111                .wait_timeout(state, WAIT_POLL_INTERVAL)
112                .unwrap_or_else(|poison| poison.into_inner());
113            state = next;
114        }
115    }
116}
117
118impl<T> Drop for DelayQueueStream<T> {
119    fn drop(&mut self) {
120        self.shared.cancelled.store(true, Ordering::SeqCst);
121        self.shared.available.notify_all();
122        let _ = self.completion.take();
123    }
124}
125
126struct SlotShared<T, Extra> {
127    state: Mutex<SlotState<T, Extra>>,
128    available: Condvar,
129    cancelled: Arc<AtomicBool>,
130}
131
132struct SlotState<T, Extra> {
133    slot: Option<T>,
134    terminal: Option<TerminalSignal>,
135    extra: Extra,
136}
137
138impl<T, Extra> SlotShared<T, Extra> {
139    fn new(extra: Extra) -> Arc<Self> {
140        Arc::new(Self {
141            state: Mutex::new(SlotState {
142                slot: None,
143                terminal: None,
144                extra,
145            }),
146            available: Condvar::new(),
147            cancelled: Arc::new(AtomicBool::new(false)),
148        })
149    }
150}
151
152struct SlotStream<T, Extra> {
153    shared: Arc<SlotShared<T, Extra>>,
154    completion: Option<StreamCompletion<NotUsed>>,
155}
156
157impl<T, Extra> Iterator for SlotStream<T, Extra> {
158    type Item = StreamResult<T>;
159
160    fn next(&mut self) -> Option<Self::Item> {
161        let mut state = self
162            .shared
163            .state
164            .lock()
165            .unwrap_or_else(|poison| poison.into_inner());
166        loop {
167            if let Some(item) = state.slot.take() {
168                drop(state);
169                self.shared.available.notify_all();
170                return Some(Ok(item));
171            }
172            if let Some(terminal) = state.terminal.clone() {
173                return match terminal {
174                    TerminalSignal::Complete => None,
175                    TerminalSignal::Error(error) => Some(Err(error)),
176                };
177            }
178            if self.shared.cancelled.load(Ordering::SeqCst) {
179                return Some(Err(StreamError::Cancelled));
180            }
181            let (next, _) = self
182                .shared
183                .available
184                .wait_timeout(state, WAIT_POLL_INTERVAL)
185                .unwrap_or_else(|poison| poison.into_inner());
186            state = next;
187        }
188    }
189}
190
191impl<T, Extra> Drop for SlotStream<T, Extra> {
192    fn drop(&mut self) {
193        self.shared.cancelled.store(true, Ordering::SeqCst);
194        self.shared.available.notify_all();
195        let _ = self.completion.take();
196    }
197}
198
199fn finish_delay_queue<T>(shared: &DelayQueueShared<T>, terminal: TerminalSignal) {
200    let mut state = shared
201        .state
202        .lock()
203        .unwrap_or_else(|poison| poison.into_inner());
204    if state.terminal.is_none() {
205        state.terminal = Some(terminal);
206    }
207    drop(state);
208    shared.available.notify_all();
209}
210
211fn finish_slot<T, Extra>(shared: &SlotShared<T, Extra>, terminal: TerminalSignal) {
212    let mut state = shared
213        .state
214        .lock()
215        .unwrap_or_else(|poison| poison.into_inner());
216    if state.terminal.is_none() {
217        state.terminal = Some(terminal);
218    }
219    drop(state);
220    shared.available.notify_all();
221}
222
223struct QueuePanicGuard<T> {
224    shared: Arc<DelayQueueShared<T>>,
225    armed: bool,
226}
227
228impl<T> QueuePanicGuard<T> {
229    fn new(shared: Arc<DelayQueueShared<T>>) -> Self {
230        Self {
231            shared,
232            armed: true,
233        }
234    }
235
236    fn disarm(&mut self) {
237        self.armed = false;
238    }
239}
240
241impl<T> Drop for QueuePanicGuard<T> {
242    fn drop(&mut self) {
243        if self.armed {
244            finish_delay_queue(
245                &self.shared,
246                TerminalSignal::Error(StreamError::AbruptTermination),
247            );
248        }
249    }
250}
251
252struct SlotPanicGuard<T, Extra> {
253    shared: Arc<SlotShared<T, Extra>>,
254    armed: bool,
255}
256
257impl<T, Extra> SlotPanicGuard<T, Extra> {
258    fn new(shared: Arc<SlotShared<T, Extra>>) -> Self {
259        Self {
260            shared,
261            armed: true,
262        }
263    }
264
265    fn disarm(&mut self) {
266        self.armed = false;
267    }
268}
269
270impl<T, Extra> Drop for SlotPanicGuard<T, Extra> {
271    fn drop(&mut self) {
272        if self.armed {
273            finish_slot(
274                &self.shared,
275                TerminalSignal::Error(StreamError::AbruptTermination),
276            );
277        }
278    }
279}
280
281fn cloned_materializer(materializer: &Materializer) -> Materializer {
282    materializer.with_name_prefix(Arc::from(materializer.name_prefix()))
283}
284
285fn wait_for_timer(materializer: &Materializer, delay: Duration) -> StreamResult<()> {
286    if delay.is_zero() {
287        return Ok(());
288    }
289    let gate = Arc::new((Mutex::new(false), Condvar::new()));
290    let gate_task = Arc::clone(&gate);
291    let _timer = materializer.schedule_once(delay, move || {
292        let (lock, condvar) = &*gate_task;
293        let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
294        *done = true;
295        drop(done);
296        condvar.notify_all();
297    });
298
299    let (lock, condvar) = &*gate;
300    let mut done = lock.lock().unwrap_or_else(|poison| poison.into_inner());
301    while !*done {
302        if materializer.is_shutdown() {
303            return Err(StreamError::AbruptTermination);
304        }
305        if current_stream_cancelled()
306            .as_ref()
307            .is_some_and(|cancelled| cancelled.load(Ordering::SeqCst))
308        {
309            return Err(StreamError::Cancelled);
310        }
311        let (next, _) = condvar
312            .wait_timeout(done, WAIT_POLL_INTERVAL)
313            .unwrap_or_else(|poison| poison.into_inner());
314        done = next;
315    }
316    Ok(())
317}
318
/// Continuous token bucket used by `throttle`: tokens accrue at a fixed rate
/// up to `capacity`, and each offered element spends `cost` tokens.
#[derive(Debug)]
struct TokenBucket {
    /// Tokens currently available; fractional because refill is continuous.
    available: f64,
    /// Instant up to which refill has been accounted for. It is pushed into
    /// the future when an offer has to wait.
    last: Instant,
    /// Maximum number of tokens the bucket can hold (the burst size).
    capacity: f64,
    /// Nanoseconds it takes for one token to accrue.
    nanos_between_tokens: f64,
}

impl TokenBucket {
    /// Creates a full bucket that refills `cost` tokens every `per`.
    ///
    /// `maximum_burst == -1` selects an automatic capacity: the number of
    /// tokens that accrue in 100 ms, but at least one. Any other value is
    /// used as the capacity directly.
    ///
    /// # Panics
    ///
    /// Panics if `cost` is zero, `per` is zero, the rate exceeds one token per
    /// nanosecond, or `maximum_burst` is below `-1`.
    fn new(cost: u64, per: Duration, maximum_burst: i32) -> Self {
        assert!(cost > 0, "throttle cost must be greater than zero");
        assert!(
            per > Duration::ZERO,
            "throttle period must be greater than zero"
        );
        assert!(
            per.as_nanos() >= u128::from(cost),
            "Rates larger than 1 unit / nanosecond are not supported"
        );
        assert!(maximum_burst >= -1, "maximum_burst must be -1 or greater");

        let nanos_between_tokens = per.as_nanos() as f64 / cost as f64;
        let capacity = match maximum_burst {
            -1 => (100_000_000_f64 / nanos_between_tokens)
                .max(1.0)
                .floor()
                .max(1.0),
            burst => f64::from(burst),
        };
        Self {
            available: capacity,
            last: Instant::now(),
            capacity,
            nanos_between_tokens,
        }
    }

    /// Spends `cost` tokens and returns how long the caller must wait before
    /// the element may pass (`Duration::ZERO` if enough tokens were available).
    ///
    /// When tokens are short the bucket is emptied and `last` is moved to the
    /// instant at which the shortfall will have accrued.
    fn offer(&mut self, cost: u64) -> Duration {
        let now = Instant::now();
        if now > self.last {
            // Credit the tokens that accrued since the last accounting point.
            let elapsed_nanos = now.duration_since(self.last).as_nanos() as f64;
            let refilled = self.available + elapsed_nanos / self.nanos_between_tokens;
            self.available = refilled.min(self.capacity);
            self.last = now;
        }

        let requested = cost as f64;
        if requested <= self.available {
            self.available -= requested;
            return Duration::ZERO;
        }

        let shortfall = requested - self.available;
        self.available = 0.0;
        let wait = Duration::from_nanos((shortfall * self.nanos_between_tokens).ceil() as u64);
        self.last = now + wait;
        wait
    }
}
376
377fn schedule_notify<T>(
378    materializer: &Materializer,
379    shared: Arc<DelayQueueShared<T>>,
380    delay: Duration,
381) where
382    T: Send + 'static,
383{
384    if delay.is_zero() {
385        shared.available.notify_all();
386        return;
387    }
388    let _timer = materializer.schedule_once(delay, move || {
389        shared.available.notify_all();
390    });
391}
392
393fn delay_stage<Out, Supplier, Strategy>(
394    input: BoxStream<Out>,
395    delay_strategy_supplier: Arc<Supplier>,
396    overflow_strategy: DelayOverflowStrategy,
397    materializer: &Materializer,
398) -> StreamResult<BoxStream<Out>>
399where
400    Out: Send + 'static,
401    Supplier: Fn() -> Strategy + Send + Sync + 'static,
402    Strategy: FnMut(&Out) -> Duration + Send + 'static,
403{
404    let shared = DelayQueueShared::new();
405    let producer_shared = Arc::clone(&shared);
406    let cancelled = Arc::clone(&shared.cancelled);
407    let state = Arc::clone(&materializer.inner.state);
408    let materializer = cloned_materializer(materializer);
409    let task_materializer = materializer.clone();
410    let completion = materializer.spawn_stream(move |_| {
411        let mut panic_guard = QueuePanicGuard::new(Arc::clone(&producer_shared));
412        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
413        let mut delay_strategy = delay_strategy_supplier();
414
415        loop {
416            if cancelled.load(Ordering::SeqCst) {
417                panic_guard.disarm();
418                return Ok(NotUsed);
419            }
420
421            match input.next() {
422                Some(Ok(item)) => {
423                    let delay = match catch_unwind(AssertUnwindSafe(|| delay_strategy(&item))) {
424                        Ok(delay) => delay,
425                        Err(_) => {
426                            panic_guard.disarm();
427                            finish_delay_queue(
428                                &producer_shared,
429                                TerminalSignal::Error(StreamError::AbruptTermination),
430                            );
431                            return Ok(NotUsed);
432                        }
433                    };
434
435                    let deadline = Instant::now() + delay;
436                    let mut state = producer_shared
437                        .state
438                        .lock()
439                        .unwrap_or_else(|poison| poison.into_inner());
440
441                    match overflow_strategy {
442                        DelayOverflowStrategy::Backpressure => {
443                            while state.queue.len() == DELAY_BUFFER_CAPACITY
444                                && !cancelled.load(Ordering::SeqCst)
445                            {
446                                state = producer_shared
447                                    .available
448                                    .wait(state)
449                                    .unwrap_or_else(|poison| poison.into_inner());
450                            }
451                            if cancelled.load(Ordering::SeqCst) {
452                                panic_guard.disarm();
453                                return Ok(NotUsed);
454                            }
455                            let was_empty = state.queue.is_empty();
456                            state.queue.push_back((deadline, item));
457                            drop(state);
458                            if was_empty {
459                                schedule_notify(
460                                    &task_materializer,
461                                    Arc::clone(&producer_shared),
462                                    delay,
463                                );
464                            }
465                            producer_shared.available.notify_all();
466                        }
467                        DelayOverflowStrategy::DropHead => {
468                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
469                                let _ = state.queue.pop_front();
470                            }
471                            let was_empty = state.queue.is_empty();
472                            state.queue.push_back((deadline, item));
473                            drop(state);
474                            if was_empty {
475                                schedule_notify(
476                                    &task_materializer,
477                                    Arc::clone(&producer_shared),
478                                    delay,
479                                );
480                            }
481                            producer_shared.available.notify_all();
482                        }
483                        DelayOverflowStrategy::DropTail => {
484                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
485                                let _ = state.queue.pop_back();
486                            }
487                            let was_empty = state.queue.is_empty();
488                            state.queue.push_back((deadline, item));
489                            drop(state);
490                            if was_empty {
491                                schedule_notify(
492                                    &task_materializer,
493                                    Arc::clone(&producer_shared),
494                                    delay,
495                                );
496                            }
497                            producer_shared.available.notify_all();
498                        }
499                        DelayOverflowStrategy::DropBuffer => {
500                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
501                                state.queue.clear();
502                            }
503                            let was_empty = state.queue.is_empty();
504                            state.queue.push_back((deadline, item));
505                            drop(state);
506                            if was_empty {
507                                schedule_notify(
508                                    &task_materializer,
509                                    Arc::clone(&producer_shared),
510                                    delay,
511                                );
512                            }
513                            producer_shared.available.notify_all();
514                        }
515                        DelayOverflowStrategy::DropNew => {
516                            if state.queue.len() < DELAY_BUFFER_CAPACITY {
517                                let was_empty = state.queue.is_empty();
518                                state.queue.push_back((deadline, item));
519                                drop(state);
520                                if was_empty {
521                                    schedule_notify(
522                                        &task_materializer,
523                                        Arc::clone(&producer_shared),
524                                        delay,
525                                    );
526                                }
527                                producer_shared.available.notify_all();
528                            }
529                        }
530                        DelayOverflowStrategy::Fail => {
531                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
532                                state.queue.clear();
533                                drop(state);
534                                panic_guard.disarm();
535                                finish_delay_queue(
536                                    &producer_shared,
537                                    TerminalSignal::Error(StreamError::Failed(format!(
538                                        "Buffer overflow for delay operator (max capacity was: {DELAY_BUFFER_CAPACITY})!"
539                                    ))),
540                                );
541                                return Ok(NotUsed);
542                            }
543                            let was_empty = state.queue.is_empty();
544                            state.queue.push_back((deadline, item));
545                            drop(state);
546                            if was_empty {
547                                schedule_notify(
548                                    &task_materializer,
549                                    Arc::clone(&producer_shared),
550                                    delay,
551                                );
552                            }
553                            producer_shared.available.notify_all();
554                        }
555                        DelayOverflowStrategy::EmitEarly => {
556                            if state.queue.len() == DELAY_BUFFER_CAPACITY {
557                                if let Some((early_deadline, _)) = state.queue.front_mut() {
558                                    *early_deadline = Instant::now();
559                                }
560                                drop(state);
561                                producer_shared.available.notify_all();
562                                state = producer_shared
563                                    .state
564                                    .lock()
565                                    .unwrap_or_else(|poison| poison.into_inner());
566                                while state.queue.len() == DELAY_BUFFER_CAPACITY
567                                    && !cancelled.load(Ordering::SeqCst)
568                                {
569                                    state = producer_shared
570                                        .available
571                                        .wait(state)
572                                        .unwrap_or_else(|poison| poison.into_inner());
573                                }
574                                if cancelled.load(Ordering::SeqCst) {
575                                    panic_guard.disarm();
576                                    return Ok(NotUsed);
577                                }
578                            }
579                            let was_empty = state.queue.is_empty();
580                            state.queue.push_back((deadline, item));
581                            drop(state);
582                            if was_empty {
583                                schedule_notify(
584                                    &task_materializer,
585                                    Arc::clone(&producer_shared),
586                                    delay,
587                                );
588                            }
589                            producer_shared.available.notify_all();
590                        }
591                    }
592                }
593                Some(Err(error)) => {
594                    panic_guard.disarm();
595                    finish_delay_queue(&producer_shared, TerminalSignal::Error(error));
596                    return Ok(NotUsed);
597                }
598                None => {
599                    panic_guard.disarm();
600                    finish_delay_queue(&producer_shared, TerminalSignal::Complete);
601                    return Ok(NotUsed);
602                }
603            }
604        }
605    });
606
607    Ok(Box::new(DelayQueueStream {
608        shared,
609        completion: Some(completion),
610    }))
611}
612
613struct ThrottleStream<Out, CostFn> {
614    input: BoxStream<Out>,
615    materializer: Materializer,
616    token_bucket: TokenBucket,
617    cost_fn: Arc<CostFn>,
618    mode: ThrottleMode,
619    terminal: Option<TerminalSignal>,
620}
621
622impl<Out, CostFn> Iterator for ThrottleStream<Out, CostFn>
623where
624    Out: Send + 'static,
625    CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
626{
627    type Item = StreamResult<Out>;
628
629    fn next(&mut self) -> Option<Self::Item> {
630        if let Some(terminal) = self.terminal.clone() {
631            return match terminal {
632                TerminalSignal::Complete => None,
633                TerminalSignal::Error(error) => Some(Err(error)),
634            };
635        }
636
637        match self.input.next() {
638            Some(Ok(item)) => {
639                let cost = match catch_unwind(AssertUnwindSafe(|| (self.cost_fn)(&item))) {
640                    Ok(cost) => cost,
641                    Err(_) => {
642                        self.terminal = Some(TerminalSignal::Error(StreamError::AbruptTermination));
643                        return Some(Err(StreamError::AbruptTermination));
644                    }
645                };
646                let delay = self.token_bucket.offer(cost);
647                if delay.is_zero() {
648                    Some(Ok(item))
649                } else if self.mode == ThrottleMode::Enforcing {
650                    let error =
651                        StreamError::Failed("Maximum throttle throughput exceeded.".to_owned());
652                    self.terminal = Some(TerminalSignal::Error(error.clone()));
653                    Some(Err(error))
654                } else {
655                    match wait_for_timer(&self.materializer, delay) {
656                        Ok(()) => Some(Ok(item)),
657                        Err(error) => {
658                            self.terminal = Some(TerminalSignal::Error(error.clone()));
659                            Some(Err(error))
660                        }
661                    }
662                }
663            }
664            Some(Err(error)) => {
665                self.terminal = Some(TerminalSignal::Error(error.clone()));
666                Some(Err(error))
667            }
668            None => {
669                self.terminal = Some(TerminalSignal::Complete);
670                None
671            }
672        }
673    }
674}
675
676struct GroupedShared<T> {
677    state: Mutex<GroupedState<T>>,
678    available: Condvar,
679    cancelled: Arc<AtomicBool>,
680}
681
682struct GroupedState<T> {
683    ready: VecDeque<Vec<T>>,
684    current: Vec<T>,
685    current_weight: u64,
686    generation: u64,
687    terminal: Option<TerminalSignal>,
688}
689
690impl<T> GroupedShared<T> {
691    fn new() -> Arc<Self> {
692        Arc::new(Self {
693            state: Mutex::new(GroupedState {
694                ready: VecDeque::new(),
695                current: Vec::new(),
696                current_weight: 0,
697                generation: 0,
698                terminal: None,
699            }),
700            available: Condvar::new(),
701            cancelled: Arc::new(AtomicBool::new(false)),
702        })
703    }
704}
705
706struct GroupedStream<T> {
707    shared: Arc<GroupedShared<T>>,
708    completion: Option<StreamCompletion<NotUsed>>,
709}
710
711impl<T> Iterator for GroupedStream<T> {
712    type Item = StreamResult<Vec<T>>;
713
714    fn next(&mut self) -> Option<Self::Item> {
715        let mut state = self
716            .shared
717            .state
718            .lock()
719            .unwrap_or_else(|poison| poison.into_inner());
720        loop {
721            if let Some(group) = state.ready.pop_front() {
722                drop(state);
723                self.shared.available.notify_all();
724                return Some(Ok(group));
725            }
726            if let Some(terminal) = state.terminal.clone() {
727                return match terminal {
728                    TerminalSignal::Complete => None,
729                    TerminalSignal::Error(error) => Some(Err(error)),
730                };
731            }
732            if self.shared.cancelled.load(Ordering::SeqCst) {
733                return Some(Err(StreamError::Cancelled));
734            }
735            let (next, _) = self
736                .shared
737                .available
738                .wait_timeout(state, WAIT_POLL_INTERVAL)
739                .unwrap_or_else(|poison| poison.into_inner());
740            state = next;
741        }
742    }
743}
744
745impl<T> Drop for GroupedStream<T> {
746    fn drop(&mut self) {
747        self.shared.cancelled.store(true, Ordering::SeqCst);
748        self.shared.available.notify_all();
749        let _ = self.completion.take();
750    }
751}
752
753fn finish_grouped<T>(shared: &GroupedShared<T>, terminal: TerminalSignal) {
754    let mut state = shared
755        .state
756        .lock()
757        .unwrap_or_else(|poison| poison.into_inner());
758    if state.terminal.is_none() {
759        if !state.current.is_empty() {
760            let current = mem::take(&mut state.current);
761            state.ready.push_back(current);
762            state.current_weight = 0;
763            state.generation = state.generation.wrapping_add(1);
764        }
765        state.terminal = Some(terminal);
766    }
767    drop(state);
768    shared.available.notify_all();
769}
770
771fn arm_grouped_timer<T: Send + 'static>(
772    materializer: &Materializer,
773    shared: Arc<GroupedShared<T>>,
774    interval: Duration,
775    generation: u64,
776) {
777    let _timer = materializer.schedule_once(interval, move || {
778        let mut state = shared
779            .state
780            .lock()
781            .unwrap_or_else(|poison| poison.into_inner());
782        if state.terminal.is_some() || state.generation != generation || state.current.is_empty() {
783            return;
784        }
785        let current = mem::take(&mut state.current);
786        state.ready.push_back(current);
787        state.current_weight = 0;
788        state.generation = state.generation.wrapping_add(1);
789        drop(state);
790        shared.available.notify_all();
791    });
792}
793
794fn grouped_weighted_within_stage<Out, CostFn>(
795    input: BoxStream<Out>,
796    max_weight: u64,
797    max_number: usize,
798    interval: Duration,
799    cost_fn: Arc<CostFn>,
800    materializer: &Materializer,
801) -> StreamResult<BoxStream<Vec<Out>>>
802where
803    Out: Send + 'static,
804    CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
805{
806    assert!(
807        max_weight > 0,
808        "grouped_weighted_within max_weight must be greater than zero"
809    );
810    assert!(
811        max_number > 0,
812        "grouped_weighted_within max_number must be greater than zero"
813    );
814    assert!(
815        interval > Duration::ZERO,
816        "grouped_weighted_within interval must be greater than zero"
817    );
818
819    let shared = GroupedShared::new();
820    let producer_shared = Arc::clone(&shared);
821    let cancelled = Arc::clone(&shared.cancelled);
822    let state = Arc::clone(&materializer.inner.state);
823    let materializer = cloned_materializer(materializer);
824    let task_materializer = materializer.clone();
825    let completion = materializer.spawn_stream(move |_| {
826        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
827        loop {
828            if cancelled.load(Ordering::SeqCst) {
829                return Ok(NotUsed);
830            }
831
832            match input.next() {
833                Some(Ok(item)) => {
834                    let weight = match catch_unwind(AssertUnwindSafe(|| (cost_fn)(&item))) {
835                        Ok(weight) => weight,
836                        Err(_) => {
837                            finish_grouped(
838                                &producer_shared,
839                                TerminalSignal::Error(StreamError::AbruptTermination),
840                            );
841                            return Ok(NotUsed);
842                        }
843                    };
844
845                    let mut state = producer_shared
846                        .state
847                        .lock()
848                        .unwrap_or_else(|poison| poison.into_inner());
849                    if state.current.is_empty() {
850                        state.current.push(item);
851                        state.current_weight = weight;
852                        state.generation = state.generation.wrapping_add(1);
853                        let generation = state.generation;
854                        drop(state);
855                        arm_grouped_timer(
856                            &task_materializer,
857                            Arc::clone(&producer_shared),
858                            interval,
859                            generation,
860                        );
861                        producer_shared.available.notify_all();
862                        continue;
863                    }
864
865                    let fits = state.current_weight.saturating_add(weight) <= max_weight
866                        && state.current.len() < max_number;
867                    if fits {
868                        state.current.push(item);
869                        state.current_weight = state.current_weight.saturating_add(weight);
870                        if state.current_weight >= max_weight || state.current.len() >= max_number {
871                            let current = mem::take(&mut state.current);
872                            state.ready.push_back(current);
873                            state.current_weight = 0;
874                            state.generation = state.generation.wrapping_add(1);
875                        }
876                        drop(state);
877                        producer_shared.available.notify_all();
878                        continue;
879                    }
880
881                    let current = mem::take(&mut state.current);
882                    state.ready.push_back(current);
883                    state.current_weight = 0;
884                    state.generation = state.generation.wrapping_add(1);
885                    let heavy_alone = weight > max_weight;
886                    if heavy_alone {
887                        state.ready.push_back(vec![item]);
888                    } else {
889                        state.current.push(item);
890                        state.current_weight = weight;
891                        state.generation = state.generation.wrapping_add(1);
892                        let generation = state.generation;
893                        drop(state);
894                        arm_grouped_timer(
895                            &task_materializer,
896                            Arc::clone(&producer_shared),
897                            interval,
898                            generation,
899                        );
900                        producer_shared.available.notify_all();
901                        continue;
902                    }
903                    drop(state);
904                    producer_shared.available.notify_all();
905                }
906                Some(Err(error)) => {
907                    finish_grouped(&producer_shared, TerminalSignal::Error(error));
908                    return Ok(NotUsed);
909                }
910                None => {
911                    finish_grouped(&producer_shared, TerminalSignal::Complete);
912                    return Ok(NotUsed);
913                }
914            }
915        }
916    });
917
918    Ok(Box::new(GroupedStream {
919        shared,
920        completion: Some(completion),
921    }))
922}
923
/// Extra state kept inside a `SlotShared` by the forwarding and timeout stages.
struct ForwardExtra {
    /// Wrapping counter bumped on slot activity. Generation-aware timers
    /// (for example `arm_generation_failure`) remember the value they were
    /// armed for and compare against it when they fire.
    generation: u64,
}
927
928fn forward_slot_stage<Out, Setup, OnItem>(
929    input: BoxStream<Out>,
930    materializer: &Materializer,
931    setup: Setup,
932    on_item: OnItem,
933) -> StreamResult<BoxStream<Out>>
934where
935    Out: Send + 'static,
936    Setup:
937        FnOnce(Arc<SlotShared<Out, ForwardExtra>>, Arc<AtomicBool>, &Materializer) + Send + 'static,
938    OnItem: Fn(&Arc<SlotShared<Out, ForwardExtra>>, &Materializer, &Out) -> StreamResult<()>
939        + Send
940        + Sync
941        + 'static,
942{
943    let shared = SlotShared::new(ForwardExtra { generation: 0 });
944    let producer_shared = Arc::clone(&shared);
945    let cancelled = Arc::clone(&shared.cancelled);
946    let state = Arc::clone(&materializer.inner.state);
947    let materializer = cloned_materializer(materializer);
948    let task_materializer = materializer.clone();
949    setup(
950        Arc::clone(&producer_shared),
951        Arc::clone(&cancelled),
952        &materializer,
953    );
954    let on_item = Arc::new(on_item);
955    let completion = materializer.spawn_stream(move |_| {
956        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
957        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
958        loop {
959            if cancelled.load(Ordering::SeqCst) {
960                panic_guard.disarm();
961                return Ok(NotUsed);
962            }
963            match input.next() {
964                Some(Ok(item)) => {
965                    on_item(&producer_shared, &task_materializer, &item)?;
966                    let mut state = producer_shared
967                        .state
968                        .lock()
969                        .unwrap_or_else(|poison| poison.into_inner());
970                    while state.slot.is_some()
971                        && state.terminal.is_none()
972                        && !cancelled.load(Ordering::SeqCst)
973                    {
974                        state = producer_shared
975                            .available
976                            .wait(state)
977                            .unwrap_or_else(|poison| poison.into_inner());
978                    }
979                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
980                        panic_guard.disarm();
981                        return Ok(NotUsed);
982                    }
983                    state.slot = Some(item);
984                    state.extra.generation = state.extra.generation.wrapping_add(1);
985                    drop(state);
986                    producer_shared.available.notify_all();
987                }
988                Some(Err(error)) => {
989                    panic_guard.disarm();
990                    finish_slot(&producer_shared, TerminalSignal::Error(error));
991                    return Ok(NotUsed);
992                }
993                None => {
994                    panic_guard.disarm();
995                    finish_slot(&producer_shared, TerminalSignal::Complete);
996                    return Ok(NotUsed);
997                }
998            }
999        }
1000    });
1001
1002    Ok(Box::new(SlotStream {
1003        shared,
1004        completion: Some(completion),
1005    }))
1006}
1007
1008fn take_within_stage<Out: Send + 'static>(
1009    input: BoxStream<Out>,
1010    timeout: Duration,
1011    materializer: &Materializer,
1012) -> StreamResult<BoxStream<Out>> {
1013    assert!(
1014        timeout > Duration::ZERO,
1015        "take_within timeout must be greater than zero"
1016    );
1017    forward_slot_stage(
1018        input,
1019        materializer,
1020        move |shared, cancelled, materializer| {
1021            let _timer = materializer.schedule_once(timeout, move || {
1022                finish_slot(&shared, TerminalSignal::Complete);
1023                cancelled.store(true, Ordering::SeqCst);
1024            });
1025        },
1026        |_, _, _| Ok(()),
1027    )
1028}
1029
1030fn initial_delay_stage<Out: Send + 'static>(
1031    input: BoxStream<Out>,
1032    delay: Duration,
1033    materializer: &Materializer,
1034) -> StreamResult<BoxStream<Out>> {
1035    assert!(delay >= Duration::ZERO);
1036    let materializer = cloned_materializer(materializer);
1037    Ok(Box::new(InitialDelayStream {
1038        input,
1039        materializer,
1040        opened: delay.is_zero(),
1041        delay,
1042        terminal: None,
1043    }))
1044}
1045
1046struct InitialDelayStream<Out> {
1047    input: BoxStream<Out>,
1048    materializer: Materializer,
1049    opened: bool,
1050    delay: Duration,
1051    terminal: Option<TerminalSignal>,
1052}
1053
1054impl<Out: Send + 'static> Iterator for InitialDelayStream<Out> {
1055    type Item = StreamResult<Out>;
1056
1057    fn next(&mut self) -> Option<Self::Item> {
1058        if let Some(terminal) = self.terminal.clone() {
1059            return match terminal {
1060                TerminalSignal::Complete => None,
1061                TerminalSignal::Error(error) => Some(Err(error)),
1062            };
1063        }
1064        if !self.opened {
1065            if let Err(error) = wait_for_timer(&self.materializer, self.delay) {
1066                self.terminal = Some(TerminalSignal::Error(error.clone()));
1067                return Some(Err(error));
1068            }
1069            self.opened = true;
1070        }
1071        match self.input.next() {
1072            Some(Ok(item)) => Some(Ok(item)),
1073            Some(Err(error)) => {
1074                self.terminal = Some(TerminalSignal::Error(error.clone()));
1075                Some(Err(error))
1076            }
1077            None => {
1078                self.terminal = Some(TerminalSignal::Complete);
1079                None
1080            }
1081        }
1082    }
1083}
1084
1085fn arm_generation_failure<Out: Send + 'static>(
1086    materializer: &Materializer,
1087    shared: Arc<SlotShared<Out, ForwardExtra>>,
1088    timeout: Duration,
1089    message: &'static str,
1090    generation: u64,
1091    require_empty_slot: bool,
1092) {
1093    let _timer = materializer.schedule_once(timeout, move || {
1094        let should_fail = {
1095            let state = shared
1096                .state
1097                .lock()
1098                .unwrap_or_else(|poison| poison.into_inner());
1099            state.terminal.is_none()
1100                && state.extra.generation == generation
1101                && (!require_empty_slot || state.slot.is_none())
1102        };
1103        if should_fail {
1104            finish_slot(
1105                &shared,
1106                TerminalSignal::Error(StreamError::Failed(message.to_owned())),
1107            );
1108            shared.cancelled.store(true, Ordering::SeqCst);
1109        }
1110    });
1111}
1112
1113fn initial_timeout_stage<Out: Send + 'static>(
1114    input: BoxStream<Out>,
1115    timeout: Duration,
1116    materializer: &Materializer,
1117) -> StreamResult<BoxStream<Out>> {
1118    assert!(
1119        timeout > Duration::ZERO,
1120        "initial_timeout timeout must be greater than zero"
1121    );
1122    let shared = SlotShared::new(ForwardExtra { generation: 0 });
1123    let producer_shared = Arc::clone(&shared);
1124    let cancelled = Arc::clone(&shared.cancelled);
1125    let state = Arc::clone(&materializer.inner.state);
1126    let materializer = cloned_materializer(materializer);
1127    arm_generation_failure(
1128        &materializer,
1129        Arc::clone(&producer_shared),
1130        timeout,
1131        "The first element has not yet passed through before the initial timeout elapsed.",
1132        0,
1133        true,
1134    );
1135    let completion = materializer.spawn_stream(move |_| {
1136        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1137        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1138        loop {
1139            if cancelled.load(Ordering::SeqCst) {
1140                panic_guard.disarm();
1141                return Ok(NotUsed);
1142            }
1143            match input.next() {
1144                Some(Ok(item)) => {
1145                    let mut state = producer_shared
1146                        .state
1147                        .lock()
1148                        .unwrap_or_else(|poison| poison.into_inner());
1149                    state.extra.generation = state.extra.generation.wrapping_add(1);
1150                    while state.slot.is_some()
1151                        && state.terminal.is_none()
1152                        && !cancelled.load(Ordering::SeqCst)
1153                    {
1154                        state = producer_shared
1155                            .available
1156                            .wait(state)
1157                            .unwrap_or_else(|poison| poison.into_inner());
1158                    }
1159                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1160                        panic_guard.disarm();
1161                        return Ok(NotUsed);
1162                    }
1163                    state.slot = Some(item);
1164                    drop(state);
1165                    producer_shared.available.notify_all();
1166                }
1167                Some(Err(error)) => {
1168                    panic_guard.disarm();
1169                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1170                    return Ok(NotUsed);
1171                }
1172                None => {
1173                    panic_guard.disarm();
1174                    finish_slot(&producer_shared, TerminalSignal::Complete);
1175                    return Ok(NotUsed);
1176                }
1177            }
1178        }
1179    });
1180
1181    Ok(Box::new(SlotStream {
1182        shared,
1183        completion: Some(completion),
1184    }))
1185}
1186
1187fn completion_timeout_stage<Out: Send + 'static>(
1188    input: BoxStream<Out>,
1189    timeout: Duration,
1190    materializer: &Materializer,
1191) -> StreamResult<BoxStream<Out>> {
1192    assert!(
1193        timeout > Duration::ZERO,
1194        "completion_timeout timeout must be greater than zero"
1195    );
1196    forward_slot_stage(
1197        input,
1198        materializer,
1199        move |shared, cancelled, materializer| {
1200            let _timer = materializer.schedule_once(timeout, move || {
1201                finish_slot(
1202                    &shared,
1203                    TerminalSignal::Error(StreamError::Failed(
1204                        "The stream has not been completed before the completion timeout elapsed."
1205                            .to_owned(),
1206                    )),
1207                );
1208                cancelled.store(true, Ordering::SeqCst);
1209            });
1210        },
1211        |_, _, _| Ok(()),
1212    )
1213}
1214
1215fn idle_timeout_stage<Out: Send + 'static>(
1216    input: BoxStream<Out>,
1217    timeout: Duration,
1218    materializer: &Materializer,
1219) -> StreamResult<BoxStream<Out>> {
1220    assert!(
1221        timeout > Duration::ZERO,
1222        "idle_timeout timeout must be greater than zero"
1223    );
1224    let shared = SlotShared::new(ForwardExtra { generation: 0 });
1225    let producer_shared = Arc::clone(&shared);
1226    let cancelled = Arc::clone(&shared.cancelled);
1227    let state = Arc::clone(&materializer.inner.state);
1228    let materializer = cloned_materializer(materializer);
1229    let task_materializer = materializer.clone();
1230    arm_generation_failure(
1231        &materializer,
1232        Arc::clone(&producer_shared),
1233        timeout,
1234        "No elements passed before the idle timeout elapsed.",
1235        0,
1236        false,
1237    );
1238    let completion = materializer.spawn_stream(move |_| {
1239        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1240        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1241        loop {
1242            if cancelled.load(Ordering::SeqCst) {
1243                panic_guard.disarm();
1244                return Ok(NotUsed);
1245            }
1246            match input.next() {
1247                Some(Ok(item)) => {
1248                    let mut state = producer_shared
1249                        .state
1250                        .lock()
1251                        .unwrap_or_else(|poison| poison.into_inner());
1252                    while state.slot.is_some()
1253                        && state.terminal.is_none()
1254                        && !cancelled.load(Ordering::SeqCst)
1255                    {
1256                        state = producer_shared
1257                            .available
1258                            .wait(state)
1259                            .unwrap_or_else(|poison| poison.into_inner());
1260                    }
1261                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1262                        panic_guard.disarm();
1263                        return Ok(NotUsed);
1264                    }
1265                    state.slot = Some(item);
1266                    state.extra.generation = state.extra.generation.wrapping_add(1);
1267                    let generation = state.extra.generation;
1268                    drop(state);
1269                    arm_generation_failure(
1270                        &task_materializer,
1271                        Arc::clone(&producer_shared),
1272                        timeout,
1273                        "No elements passed before the idle timeout elapsed.",
1274                        generation,
1275                        false,
1276                    );
1277                    producer_shared.available.notify_all();
1278                }
1279                Some(Err(error)) => {
1280                    panic_guard.disarm();
1281                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1282                    return Ok(NotUsed);
1283                }
1284                None => {
1285                    panic_guard.disarm();
1286                    finish_slot(&producer_shared, TerminalSignal::Complete);
1287                    return Ok(NotUsed);
1288                }
1289            }
1290        }
1291    });
1292
1293    Ok(Box::new(SlotStream {
1294        shared,
1295        completion: Some(completion),
1296    }))
1297}
1298
1299fn backpressure_timeout_stage<Out: Send + 'static>(
1300    input: BoxStream<Out>,
1301    timeout: Duration,
1302    materializer: &Materializer,
1303) -> StreamResult<BoxStream<Out>> {
1304    assert!(
1305        timeout > Duration::ZERO,
1306        "backpressure_timeout timeout must be greater than zero"
1307    );
1308    let shared = SlotShared::new(ForwardExtra { generation: 0 });
1309    let producer_shared = Arc::clone(&shared);
1310    let cancelled = Arc::clone(&shared.cancelled);
1311    let state = Arc::clone(&materializer.inner.state);
1312    let materializer = cloned_materializer(materializer);
1313    let task_materializer = materializer.clone();
1314    let completion = materializer.spawn_stream(move |_| {
1315        let schedule_timeout = |generation: u64, shared: Arc<SlotShared<Out, ForwardExtra>>| {
1316            let _timer = task_materializer.schedule_once(timeout, move || {
1317                let mut state = shared
1318                    .state
1319                    .lock()
1320                    .unwrap_or_else(|poison| poison.into_inner());
1321                if state.terminal.is_some() || state.extra.generation != generation {
1322                    return;
1323                }
1324                state.slot = None;
1325                state.terminal = Some(TerminalSignal::Error(StreamError::Failed(
1326                    "No downstream demand signalled before the backpressure timeout elapsed."
1327                        .to_owned(),
1328                )));
1329                drop(state);
1330                shared.cancelled.store(true, Ordering::SeqCst);
1331                shared.available.notify_all();
1332            });
1333        };
1334        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1335        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1336        loop {
1337            if cancelled.load(Ordering::SeqCst) {
1338                panic_guard.disarm();
1339                return Ok(NotUsed);
1340            }
1341            match input.next() {
1342                Some(Ok(item)) => {
1343                    let mut state = producer_shared
1344                        .state
1345                        .lock()
1346                        .unwrap_or_else(|poison| poison.into_inner());
1347                    while state.slot.is_some()
1348                        && state.terminal.is_none()
1349                        && !cancelled.load(Ordering::SeqCst)
1350                    {
1351                        state = producer_shared
1352                            .available
1353                            .wait(state)
1354                            .unwrap_or_else(|poison| poison.into_inner());
1355                    }
1356                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1357                        panic_guard.disarm();
1358                        return Ok(NotUsed);
1359                    }
1360                    state.slot = Some(item);
1361                    state.extra.generation = state.extra.generation.wrapping_add(1);
1362                    let generation = state.extra.generation;
1363                    drop(state);
1364                    schedule_timeout(generation, Arc::clone(&producer_shared));
1365                    producer_shared.available.notify_all();
1366                }
1367                Some(Err(error)) => {
1368                    panic_guard.disarm();
1369                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1370                    return Ok(NotUsed);
1371                }
1372                None => {
1373                    panic_guard.disarm();
1374                    finish_slot(&producer_shared, TerminalSignal::Complete);
1375                    return Ok(NotUsed);
1376                }
1377            }
1378        }
1379    });
1380
1381    struct BackpressureStream<Out> {
1382        shared: Arc<SlotShared<Out, ForwardExtra>>,
1383        completion: Option<StreamCompletion<NotUsed>>,
1384    }
1385
1386    impl<Out> Iterator for BackpressureStream<Out> {
1387        type Item = StreamResult<Out>;
1388
1389        fn next(&mut self) -> Option<Self::Item> {
1390            let mut state = self
1391                .shared
1392                .state
1393                .lock()
1394                .unwrap_or_else(|poison| poison.into_inner());
1395            loop {
1396                if let Some(item) = state.slot.take() {
1397                    state.extra.generation = state.extra.generation.wrapping_add(1);
1398                    drop(state);
1399                    self.shared.available.notify_all();
1400                    return Some(Ok(item));
1401                }
1402                if let Some(terminal) = state.terminal.clone() {
1403                    return match terminal {
1404                        TerminalSignal::Complete => None,
1405                        TerminalSignal::Error(error) => Some(Err(error)),
1406                    };
1407                }
1408                if self.shared.cancelled.load(Ordering::SeqCst) {
1409                    return Some(Err(StreamError::Cancelled));
1410                }
1411                let (next, _) = self
1412                    .shared
1413                    .available
1414                    .wait_timeout(state, WAIT_POLL_INTERVAL)
1415                    .unwrap_or_else(|poison| poison.into_inner());
1416                state = next;
1417            }
1418        }
1419    }
1420
1421    impl<Out> Drop for BackpressureStream<Out> {
1422        fn drop(&mut self) {
1423            self.shared.cancelled.store(true, Ordering::SeqCst);
1424            self.shared.available.notify_all();
1425            let _ = self.completion.take();
1426        }
1427    }
1428
1429    Ok(Box::new(BackpressureStream {
1430        shared,
1431        completion: Some(completion),
1432    }))
1433}
1434
/// Extra state kept inside the `SlotShared` used by the keep-alive stage.
struct KeepAliveExtra {
    /// Wrapping counter bumped whenever an element (real or injected) is
    /// stored in the slot; keep-alive timers armed for an older value do
    /// nothing when they fire.
    generation: u64,
}
1438
1439fn arm_keep_alive_timer<Out, Inject>(
1440    materializer: &Materializer,
1441    shared: Arc<SlotShared<Out, KeepAliveExtra>>,
1442    timeout: Duration,
1443    generation: u64,
1444    inject: Arc<Inject>,
1445) where
1446    Out: Send + 'static,
1447    Inject: Fn() -> Out + Send + Sync + 'static,
1448{
1449    let materializer = cloned_materializer(materializer);
1450    let task_materializer = materializer.clone();
1451    let _timer = materializer.schedule_once(timeout, move || {
1452        let slot_occupied = {
1453            let state = shared
1454                .state
1455                .lock()
1456                .unwrap_or_else(|poison| poison.into_inner());
1457            if state.terminal.is_some() || state.extra.generation != generation {
1458                return;
1459            }
1460            state.slot.is_some()
1461        };
1462        if slot_occupied {
1463            arm_keep_alive_timer(
1464                &task_materializer,
1465                Arc::clone(&shared),
1466                timeout,
1467                generation,
1468                Arc::clone(&inject),
1469            );
1470            return;
1471        }
1472
1473        let injected = match catch_unwind(AssertUnwindSafe(|| inject())) {
1474            Ok(item) => item,
1475            Err(_) => {
1476                finish_slot(
1477                    &shared,
1478                    TerminalSignal::Error(StreamError::AbruptTermination),
1479                );
1480                shared.cancelled.store(true, Ordering::SeqCst);
1481                return;
1482            }
1483        };
1484
1485        let mut state = shared
1486            .state
1487            .lock()
1488            .unwrap_or_else(|poison| poison.into_inner());
1489        if state.terminal.is_some() || state.extra.generation != generation || state.slot.is_some()
1490        {
1491            return;
1492        }
1493        state.slot = Some(injected);
1494        state.extra.generation = state.extra.generation.wrapping_add(1);
1495        let next_generation = state.extra.generation;
1496        drop(state);
1497        shared.available.notify_all();
1498        arm_keep_alive_timer(
1499            &task_materializer,
1500            Arc::clone(&shared),
1501            timeout,
1502            next_generation,
1503            Arc::clone(&inject),
1504        );
1505    });
1506}
1507
1508fn keep_alive_stage<Out, Inject>(
1509    input: BoxStream<Out>,
1510    timeout: Duration,
1511    inject: Arc<Inject>,
1512    materializer: &Materializer,
1513) -> StreamResult<BoxStream<Out>>
1514where
1515    Out: Send + 'static,
1516    Inject: Fn() -> Out + Send + Sync + 'static,
1517{
1518    assert!(
1519        timeout > Duration::ZERO,
1520        "keep_alive timeout must be greater than zero"
1521    );
1522    let shared = SlotShared::new(KeepAliveExtra { generation: 0 });
1523    let producer_shared = Arc::clone(&shared);
1524    let cancelled = Arc::clone(&shared.cancelled);
1525    let state = Arc::clone(&materializer.inner.state);
1526    let materializer = cloned_materializer(materializer);
1527    let task_materializer = materializer.clone();
1528    arm_keep_alive_timer(
1529        &materializer,
1530        Arc::clone(&producer_shared),
1531        timeout,
1532        0,
1533        Arc::clone(&inject),
1534    );
1535    let completion = materializer.spawn_stream(move |_| {
1536        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1537        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1538        loop {
1539            if cancelled.load(Ordering::SeqCst) {
1540                panic_guard.disarm();
1541                return Ok(NotUsed);
1542            }
1543            match input.next() {
1544                Some(Ok(item)) => {
1545                    let mut state = producer_shared
1546                        .state
1547                        .lock()
1548                        .unwrap_or_else(|poison| poison.into_inner());
1549                    while state.slot.is_some()
1550                        && state.terminal.is_none()
1551                        && !cancelled.load(Ordering::SeqCst)
1552                    {
1553                        state = producer_shared
1554                            .available
1555                            .wait(state)
1556                            .unwrap_or_else(|poison| poison.into_inner());
1557                    }
1558                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1559                        panic_guard.disarm();
1560                        return Ok(NotUsed);
1561                    }
1562                    state.slot = Some(item);
1563                    state.extra.generation = state.extra.generation.wrapping_add(1);
1564                    let generation = state.extra.generation;
1565                    drop(state);
1566                    producer_shared.available.notify_all();
1567                    arm_keep_alive_timer(
1568                        &task_materializer,
1569                        Arc::clone(&producer_shared),
1570                        timeout,
1571                        generation,
1572                        Arc::clone(&inject),
1573                    );
1574                }
1575                Some(Err(error)) => {
1576                    panic_guard.disarm();
1577                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1578                    return Ok(NotUsed);
1579                }
1580                None => {
1581                    panic_guard.disarm();
1582                    finish_slot(&producer_shared, TerminalSignal::Complete);
1583                    return Ok(NotUsed);
1584                }
1585            }
1586        }
1587    });
1588
1589    Ok(Box::new(SlotStream {
1590        shared,
1591        completion: Some(completion),
1592    }))
1593}
1594
1595fn drop_within_stage<Out: Send + 'static>(
1596    input: BoxStream<Out>,
1597    timeout: Duration,
1598    materializer: &Materializer,
1599) -> StreamResult<BoxStream<Out>> {
1600    assert!(
1601        timeout > Duration::ZERO,
1602        "drop_within timeout must be greater than zero"
1603    );
1604    struct DropWithinExtra;
1605    let shared = SlotShared::new(DropWithinExtra);
1606    let producer_shared = Arc::clone(&shared);
1607    let cancelled = Arc::clone(&shared.cancelled);
1608    let state = Arc::clone(&materializer.inner.state);
1609    let materializer = cloned_materializer(materializer);
1610    let deadline = Instant::now() + timeout;
1611    let completion = materializer.spawn_stream(move |_| {
1612        let mut panic_guard = SlotPanicGuard::new(Arc::clone(&producer_shared));
1613        let mut input = runtime_checked_stream(input, state, Some(Arc::clone(&cancelled)));
1614        loop {
1615            if cancelled.load(Ordering::SeqCst) {
1616                panic_guard.disarm();
1617                return Ok(NotUsed);
1618            }
1619            match input.next() {
1620                Some(Ok(item)) => {
1621                    if Instant::now() < deadline {
1622                        continue;
1623                    }
1624                    let mut state = producer_shared
1625                        .state
1626                        .lock()
1627                        .unwrap_or_else(|poison| poison.into_inner());
1628                    while state.slot.is_some()
1629                        && state.terminal.is_none()
1630                        && !cancelled.load(Ordering::SeqCst)
1631                    {
1632                        state = producer_shared
1633                            .available
1634                            .wait(state)
1635                            .unwrap_or_else(|poison| poison.into_inner());
1636                    }
1637                    if cancelled.load(Ordering::SeqCst) || state.terminal.is_some() {
1638                        panic_guard.disarm();
1639                        return Ok(NotUsed);
1640                    }
1641                    state.slot = Some(item);
1642                    drop(state);
1643                    producer_shared.available.notify_all();
1644                }
1645                Some(Err(error)) => {
1646                    panic_guard.disarm();
1647                    finish_slot(&producer_shared, TerminalSignal::Error(error));
1648                    return Ok(NotUsed);
1649                }
1650                None => {
1651                    panic_guard.disarm();
1652                    finish_slot(&producer_shared, TerminalSignal::Complete);
1653                    return Ok(NotUsed);
1654                }
1655            }
1656        }
1657    });
1658
1659    Ok(Box::new(SlotStream {
1660        shared,
1661        completion: Some(completion),
1662    }))
1663}
1664
1665impl<In: Send + 'static, Out: Send + 'static, Mat: Send + 'static> Flow<In, Out, Mat> {
1666    pub fn throttle(
1667        self,
1668        elements: u64,
1669        per: Duration,
1670        maximum_burst: i32,
1671        mode: ThrottleMode,
1672    ) -> Flow<In, Out, Mat> {
1673        self.throttle_with_cost(elements, per, maximum_burst, |_| 1, mode)
1674    }
1675
1676    pub fn throttle_with_cost<CostFn>(
1677        self,
1678        cost: u64,
1679        per: Duration,
1680        maximum_burst: i32,
1681        cost_fn: CostFn,
1682        mode: ThrottleMode,
1683    ) -> Flow<In, Out, Mat>
1684    where
1685        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1686    {
1687        let cost_fn = Arc::new(cost_fn);
1688        self.via(Flow::from_runtime_transform(move |input, materializer| {
1689            let materializer = cloned_materializer(materializer);
1690            Ok(Box::new(ThrottleStream {
1691                input,
1692                materializer,
1693                token_bucket: TokenBucket::new(cost, per, maximum_burst),
1694                cost_fn: Arc::clone(&cost_fn),
1695                mode,
1696                terminal: None,
1697            }))
1698        }))
1699    }
1700
1701    pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Flow<In, Out, Mat> {
1702        self.delay_with(move || move |_: &Out| delay, strategy)
1703    }
1704
1705    pub fn delay_with<Supplier, Strategy>(
1706        self,
1707        delay_strategy_supplier: Supplier,
1708        overflow_strategy: DelayOverflowStrategy,
1709    ) -> Flow<In, Out, Mat>
1710    where
1711        Supplier: Fn() -> Strategy + Send + Sync + 'static,
1712        Strategy: FnMut(&Out) -> Duration + Send + 'static,
1713    {
1714        let delay_strategy_supplier = Arc::new(delay_strategy_supplier);
1715        self.via(Flow::from_runtime_transform(move |input, materializer| {
1716            delay_stage(
1717                input,
1718                Arc::clone(&delay_strategy_supplier),
1719                overflow_strategy,
1720                materializer,
1721            )
1722        }))
1723    }
1724
1725    pub fn initial_delay(self, delay: Duration) -> Flow<In, Out, Mat> {
1726        self.via(Flow::from_runtime_transform(move |input, materializer| {
1727            initial_delay_stage(input, delay, materializer)
1728        }))
1729    }
1730
1731    pub fn grouped_within(self, max_number: usize, interval: Duration) -> Flow<In, Vec<Out>, Mat> {
1732        let unit_cost = Arc::new(|_: &Out| 1_u64);
1733        self.via(Flow::from_runtime_transform(move |input, materializer| {
1734            grouped_weighted_within_stage(
1735                input,
1736                max_number as u64,
1737                max_number,
1738                interval,
1739                Arc::clone(&unit_cost),
1740                materializer,
1741            )
1742        }))
1743    }
1744
1745    pub fn grouped_weighted_within<CostFn>(
1746        self,
1747        max_weight: u64,
1748        interval: Duration,
1749        cost_fn: CostFn,
1750    ) -> Flow<In, Vec<Out>, Mat>
1751    where
1752        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1753    {
1754        let cost_fn = Arc::new(cost_fn);
1755        self.via(Flow::from_runtime_transform(move |input, materializer| {
1756            grouped_weighted_within_stage(
1757                input,
1758                max_weight,
1759                usize::MAX,
1760                interval,
1761                Arc::clone(&cost_fn),
1762                materializer,
1763            )
1764        }))
1765    }
1766
1767    pub fn drop_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
1768        self.via(Flow::from_runtime_transform(move |input, materializer| {
1769            drop_within_stage(input, timeout, materializer)
1770        }))
1771    }
1772
1773    pub fn take_within(self, timeout: Duration) -> Flow<In, Out, Mat> {
1774        self.via(Flow::from_runtime_transform(move |input, materializer| {
1775            take_within_stage(input, timeout, materializer)
1776        }))
1777    }
1778
1779    pub fn idle_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1780        self.via(Flow::from_runtime_transform(move |input, materializer| {
1781            idle_timeout_stage(input, timeout, materializer)
1782        }))
1783    }
1784
1785    pub fn backpressure_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1786        self.via(Flow::from_runtime_transform(move |input, materializer| {
1787            backpressure_timeout_stage(input, timeout, materializer)
1788        }))
1789    }
1790
1791    pub fn completion_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1792        self.via(Flow::from_runtime_transform(move |input, materializer| {
1793            completion_timeout_stage(input, timeout, materializer)
1794        }))
1795    }
1796
1797    pub fn initial_timeout(self, timeout: Duration) -> Flow<In, Out, Mat> {
1798        self.via(Flow::from_runtime_transform(move |input, materializer| {
1799            initial_timeout_stage(input, timeout, materializer)
1800        }))
1801    }
1802
1803    pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Flow<In, Out, Mat>
1804    where
1805        Inject: Fn() -> Out + Send + Sync + 'static,
1806    {
1807        let inject = Arc::new(inject);
1808        self.via(Flow::from_runtime_transform(move |input, materializer| {
1809            keep_alive_stage(input, timeout, Arc::clone(&inject), materializer)
1810        }))
1811    }
1812}
1813
1814impl<Out: Send + 'static, Mat: Send + 'static> Source<Out, Mat> {
1815    pub fn throttle(
1816        self,
1817        elements: u64,
1818        per: Duration,
1819        maximum_burst: i32,
1820        mode: ThrottleMode,
1821    ) -> Self {
1822        self.via(Flow::identity().throttle(elements, per, maximum_burst, mode))
1823    }
1824
1825    pub fn throttle_with_cost<CostFn>(
1826        self,
1827        cost: u64,
1828        per: Duration,
1829        maximum_burst: i32,
1830        cost_fn: CostFn,
1831        mode: ThrottleMode,
1832    ) -> Self
1833    where
1834        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1835    {
1836        self.via(Flow::identity().throttle_with_cost(cost, per, maximum_burst, cost_fn, mode))
1837    }
1838
1839    pub fn delay(self, delay: Duration, strategy: DelayOverflowStrategy) -> Self {
1840        self.via(Flow::identity().delay(delay, strategy))
1841    }
1842
1843    pub fn delay_with<Supplier, Strategy>(
1844        self,
1845        delay_strategy_supplier: Supplier,
1846        overflow_strategy: DelayOverflowStrategy,
1847    ) -> Self
1848    where
1849        Supplier: Fn() -> Strategy + Send + Sync + 'static,
1850        Strategy: FnMut(&Out) -> Duration + Send + 'static,
1851    {
1852        self.via(Flow::identity().delay_with(delay_strategy_supplier, overflow_strategy))
1853    }
1854
1855    pub fn initial_delay(self, delay: Duration) -> Self {
1856        self.via(Flow::identity().initial_delay(delay))
1857    }
1858
1859    pub fn grouped_within(self, max_number: usize, interval: Duration) -> Source<Vec<Out>, Mat> {
1860        self.via(Flow::identity().grouped_within(max_number, interval))
1861    }
1862
1863    pub fn grouped_weighted_within<CostFn>(
1864        self,
1865        max_weight: u64,
1866        interval: Duration,
1867        cost_fn: CostFn,
1868    ) -> Source<Vec<Out>, Mat>
1869    where
1870        CostFn: Fn(&Out) -> u64 + Send + Sync + 'static,
1871    {
1872        self.via(Flow::identity().grouped_weighted_within(max_weight, interval, cost_fn))
1873    }
1874
1875    pub fn drop_within(self, timeout: Duration) -> Self {
1876        self.via(Flow::identity().drop_within(timeout))
1877    }
1878
1879    pub fn take_within(self, timeout: Duration) -> Self {
1880        self.via(Flow::identity().take_within(timeout))
1881    }
1882
1883    pub fn idle_timeout(self, timeout: Duration) -> Self {
1884        self.via(Flow::identity().idle_timeout(timeout))
1885    }
1886
1887    pub fn backpressure_timeout(self, timeout: Duration) -> Self {
1888        self.via(Flow::identity().backpressure_timeout(timeout))
1889    }
1890
1891    pub fn completion_timeout(self, timeout: Duration) -> Self {
1892        self.via(Flow::identity().completion_timeout(timeout))
1893    }
1894
1895    pub fn initial_timeout(self, timeout: Duration) -> Self {
1896        self.via(Flow::identity().initial_timeout(timeout))
1897    }
1898
1899    pub fn keep_alive<Inject>(self, timeout: Duration, inject: Inject) -> Self
1900    where
1901        Inject: Fn() -> Out + Send + Sync + 'static,
1902    {
1903        self.via(Flow::identity().keep_alive(timeout, inject))
1904    }
1905}
1906
1907impl<Out: Clone + Send + Sync + 'static> Source<Out, Cancellable> {
1908    pub fn tick(initial_delay: Duration, interval: Duration, element: Out) -> Self {
1909        assert!(
1910            interval > Duration::ZERO,
1911            "tick interval must be greater than zero"
1912        );
1913        Source::from_materialized_factory(move |materializer| {
1914            struct TickState {
1915                pending: bool,
1916                closed: bool,
1917            }
1918
1919            let shared = Arc::new((
1920                Mutex::new(TickState {
1921                    pending: false,
1922                    closed: false,
1923                }),
1924                Condvar::new(),
1925            ));
1926            let keep_alive: Arc<dyn Send + Sync> =
1927                Arc::clone(&materializer.inner) as Arc<dyn Send + Sync>;
1928            let cancellable = Cancellable::new_with_keep_alive(Some(keep_alive));
1929            let cancelled = Arc::clone(&cancellable.cancelled);
1930            let shutdown = Arc::clone(&materializer.inner.state.shutdown);
1931            let shared_task = Arc::clone(&shared);
1932            let timer =
1933                materializer.schedule_with_fixed_delay(initial_delay, interval, move || {
1934                    if cancelled.load(Ordering::SeqCst) || shutdown.load(Ordering::SeqCst) {
1935                        return;
1936                    }
1937                    let (lock, condvar) = &*shared_task;
1938                    let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
1939                    if !state.closed {
1940                        state.pending = true;
1941                    }
1942                    drop(state);
1943                    condvar.notify_all();
1944                });
1945
1946            struct TickStream<Out> {
1947                shared: Arc<(Mutex<TickState>, Condvar)>,
1948                timer: Cancellable,
1949                external: Cancellable,
1950                element: Out,
1951                shutdown: Arc<AtomicBool>,
1952            }
1953
1954            impl<Out: Clone> Iterator for TickStream<Out> {
1955                type Item = StreamResult<Out>;
1956
1957                fn next(&mut self) -> Option<Self::Item> {
1958                    let (lock, condvar) = &*self.shared;
1959                    let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
1960                    loop {
1961                        if self.external.is_cancelled() || self.shutdown.load(Ordering::SeqCst) {
1962                            state.closed = true;
1963                            self.timer.cancel();
1964                            return None;
1965                        }
1966                        if state.pending {
1967                            state.pending = false;
1968                            return Some(Ok(self.element.clone()));
1969                        }
1970                        let (next, _) = condvar
1971                            .wait_timeout(state, WAIT_POLL_INTERVAL)
1972                            .unwrap_or_else(|poison| poison.into_inner());
1973                        state = next;
1974                    }
1975                }
1976            }
1977
1978            impl<Out> Drop for TickStream<Out> {
1979                fn drop(&mut self) {
1980                    let (lock, condvar) = &*self.shared;
1981                    let mut state = lock.lock().unwrap_or_else(|poison| poison.into_inner());
1982                    state.closed = true;
1983                    drop(state);
1984                    self.timer.cancel();
1985                    condvar.notify_all();
1986                }
1987            }
1988
1989            Ok((
1990                Box::new(TickStream {
1991                    shared,
1992                    timer,
1993                    external: cancellable.clone(),
1994                    element: element.clone(),
1995                    shutdown: Arc::clone(&materializer.inner.state.shutdown),
1996                }) as BoxStream<Out>,
1997                cancellable,
1998            ))
1999        })
2000    }
2001}
2002
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testkit::{TestSink, TestSource};
    use std::sync::mpsc;
    use std::thread;

    // Timeout used by the tests that deliberately pause for `LOAD_GAP`.
    const LOAD_TIMEOUT: Duration = Duration::from_millis(100);
    // Pause long enough to exceed `LOAD_TIMEOUT` with margin.
    const LOAD_GAP: Duration = Duration::from_millis(250);
    // Interval / delay / timeout handed to most stages under test.
    const STAGE_INTERVAL: Duration = Duration::from_millis(40);
    // Lower bound asserted on observed waits (half of `STAGE_INTERVAL`,
    // presumably to leave room for timer jitter).
    const MIN_OBSERVED_WAIT: Duration = Duration::from_millis(20);

    /// Materializes `source` on a fresh `Materializer` and hands back the raw
    /// stream, the materializer (so the caller can shut it down) and the
    /// materialized value.
    fn materialize_stream<T: Send + 'static, Mat: Send + 'static>(
        source: Source<T, Mat>,
    ) -> (BoxStream<T>, Materializer, Mat) {
        let materializer = Materializer::new();
        let created = Arc::clone(&source.factory).create(&materializer);
        let (stream, mat) = created.expect("stream materializes");
        (stream, materializer, mat)
    }

    /// Shaping mode must space out the moments at which elements are pulled.
    #[test]
    fn throttle_shaping_spaces_elements() {
        let (stamp_tx, stamp_rx) = mpsc::channel();
        Source::from_iter(1..=3)
            .map(move |_| {
                stamp_tx.send(Instant::now()).unwrap();
            })
            .throttle(1, STAGE_INTERVAL, 0, ThrottleMode::Shaping)
            .run_with(Sink::ignore())
            .unwrap()
            .wait()
            .unwrap();

        let recv_limit = Duration::from_millis(250);
        let stamps: Vec<Instant> = (0..3)
            .map(|_| stamp_rx.recv_timeout(recv_limit).unwrap())
            .collect();
        for pair in stamps.windows(2) {
            assert!(pair[1].duration_since(pair[0]) >= MIN_OBSERVED_WAIT);
        }
    }

    /// Enforcing mode must fail the stream instead of slowing it down.
    #[test]
    fn throttle_enforcing_fails_when_rate_exceeded() {
        let outcome = Source::from_iter(1..=3)
            .throttle(1, Duration::from_millis(80), 1, ThrottleMode::Enforcing)
            .run_collect();
        let expected = Err(StreamError::Failed(
            "Maximum throttle throughput exceeded.".to_owned(),
        ));
        assert_eq!(outcome, expected);
    }

    /// A fixed delay postpones output but keeps element order.
    #[test]
    fn delay_delays_first_element_without_reordering() {
        let started = Instant::now();
        let collected = Source::from_iter([1, 2])
            .delay(STAGE_INTERVAL, DelayOverflowStrategy::Backpressure)
            .run_collect()
            .unwrap();
        assert_eq!(collected, vec![1, 2]);
        assert!(started.elapsed() >= MIN_OBSERVED_WAIT);
    }

    /// With room in the buffer, `EmitEarly` behaves like a plain delay.
    #[test]
    fn delay_emit_early_preserves_elements_when_buffer_not_full() {
        let started = Instant::now();
        let collected = Source::from_iter([1, 2])
            .delay(STAGE_INTERVAL, DelayOverflowStrategy::EmitEarly)
            .run_collect()
            .unwrap();
        assert_eq!(collected, vec![1, 2]);
        assert!(started.elapsed() >= MIN_OBSERVED_WAIT);
    }

    /// Seventeen elements overflow the delay buffer, so the oldest one must be
    /// released long before the two-second delay expires.
    #[test]
    fn delay_emit_early_flushes_oldest_when_buffer_full() {
        let configured_delay = Duration::from_secs(2);
        let (mut stream, materializer, _mat) = materialize_stream(
            Source::from_iter(1..=17).delay(configured_delay, DelayOverflowStrategy::EmitEarly),
        );

        let started = Instant::now();
        assert_eq!(stream.next(), Some(Ok(1)));
        assert!(
            started.elapsed() < Duration::from_millis(750),
            "EmitEarly should flush well before the configured delay under load"
        );

        let mut seen = vec![1];
        seen.extend(stream.map(|item| item.unwrap()));
        assert_eq!(seen, (1..=17).collect::<Vec<_>>());
        materializer.shutdown();
    }

    /// The first element is not delivered before the initial delay has passed.
    #[test]
    fn initial_delay_holds_back_first_pull() {
        let started = Instant::now();
        let head = Source::single(42)
            .initial_delay(STAGE_INTERVAL)
            .run_with(Sink::head())
            .unwrap();
        assert_eq!(head.wait().unwrap(), 42);
        assert!(started.elapsed() >= MIN_OBSERVED_WAIT);
    }

    /// A partially filled group is emitted once the interval timer fires.
    #[test]
    fn grouped_within_flushes_on_timer() {
        let (upstream, downstream) = TestSource::probe::<i32>()
            .grouped_within(8, STAGE_INTERVAL)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        upstream.expect_request();
        upstream.send_next(1);
        upstream.expect_request();
        upstream.send_next(2);

        downstream.request(1);
        assert_eq!(downstream.expect_next(), vec![1, 2]);
    }

    /// A group is closed as soon as adding the next element would exceed the
    /// weight limit.
    #[test]
    fn grouped_weighted_within_flushes_on_weight() {
        let groups = Source::from_iter(["aa", "bbb", "c"])
            .grouped_weighted_within(4, Duration::from_secs(1), |s| s.len() as u64)
            .run_collect()
            .unwrap();
        assert_eq!(groups, vec![vec!["aa"], vec!["bbb", "c"]]);
    }

    /// Elements sent inside the drop window vanish; later ones pass through.
    #[test]
    fn drop_within_drops_early_elements() {
        let (upstream, downstream) = TestSource::probe::<i32>()
            .drop_within(STAGE_INTERVAL)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        upstream.expect_request();
        upstream.send_next(1);
        thread::sleep(Duration::from_millis(120));
        upstream.expect_request();
        upstream.send_next(2);
        upstream.send_complete();

        downstream.request(2);
        assert_eq!(downstream.expect_next(), 2);
        downstream.expect_complete();
    }

    /// The stream completes by itself once the take window is over.
    #[test]
    fn take_within_completes_after_timeout() {
        let (upstream, downstream) = TestSource::probe::<i32>()
            .take_within(STAGE_INTERVAL)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        upstream.expect_request();
        upstream.send_next(1);
        downstream.request(2);
        assert_eq!(downstream.expect_next(), 1);
        downstream.expect_complete();
    }

    /// Silence after the first element trips the idle timeout.
    #[test]
    fn idle_timeout_fails_on_gap() {
        let (upstream, downstream) = TestSource::probe::<i32>()
            .idle_timeout(STAGE_INTERVAL)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        upstream.expect_request();
        upstream.send_next(1);
        downstream.request(2);
        assert_eq!(downstream.expect_next(), 1);
        assert!(matches!(downstream.expect_error(), StreamError::Failed(_)));
    }

    /// An element left waiting for demand longer than the timeout fails the
    /// stream.
    #[test]
    fn backpressure_timeout_fails_without_demand() {
        let (upstream, downstream) = TestSource::probe::<i32>()
            .backpressure_timeout(LOAD_TIMEOUT)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        downstream.request(1);
        upstream.expect_request();
        upstream.send_next(1);
        assert_eq!(downstream.expect_next(), 1);
        upstream.expect_request();
        upstream.send_next(2);
        thread::sleep(LOAD_GAP);
        downstream.request(1);
        assert!(matches!(downstream.expect_error(), StreamError::Failed(_)));
    }

    /// A stream that never finishes is failed by the completion timeout.
    #[test]
    fn completion_timeout_fails_unfinished_stream() {
        let outcome = Source::<i32>::never()
            .completion_timeout(STAGE_INTERVAL)
            .run_collect();
        assert!(matches!(outcome, Err(StreamError::Failed(_))));
    }

    /// A stream that never produces a first element is failed by the initial
    /// timeout.
    #[test]
    fn initial_timeout_fails_before_first_element() {
        let (mut stream, materializer, _mat) =
            materialize_stream(Source::<i32>::never().initial_timeout(STAGE_INTERVAL));
        assert!(matches!(stream.next(), Some(Err(StreamError::Failed(_)))));
        materializer.shutdown();
    }

    /// With outstanding demand and a quiet upstream, the injected element
    /// arrives after the real one.
    #[test]
    fn keep_alive_injects_on_idle_gap() {
        let (upstream, downstream) = TestSource::probe::<i32>()
            .keep_alive(LOAD_TIMEOUT, || 0)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        downstream.request(3);
        upstream.expect_request();
        upstream.send_next(1);
        assert_eq!(downstream.expect_next(), 1);
        assert_eq!(downstream.expect_next(), 0);
    }

    /// A real element parked while the consumer is slow must still be
    /// delivered first; only afterwards may a keep-alive element appear.
    #[test]
    fn keep_alive_rearms_after_slow_consumer_drains_slot() {
        let (upstream, downstream) = TestSource::probe::<i32>()
            .keep_alive(LOAD_TIMEOUT, || 0)
            .to_mat(TestSink::probe(), Keep::both)
            .run()
            .unwrap();

        downstream.request(1);
        upstream.expect_request();
        upstream.send_next(1);
        assert_eq!(downstream.expect_next(), 1);

        upstream.expect_request();
        upstream.send_next(2);
        thread::sleep(LOAD_GAP);

        downstream.request(1);
        assert_eq!(downstream.expect_next(), 2);

        thread::sleep(LOAD_GAP);
        downstream.request(1);
        assert_eq!(downstream.expect_next(), 0);
    }

    /// A `maximum_burst` below -1 is rejected with a panic.
    #[test]
    #[should_panic(expected = "maximum_burst must be -1 or greater")]
    fn throttle_rejects_invalid_negative_maximum_burst() {
        let _ = Source::single(1)
            .throttle(1, STAGE_INTERVAL, -2, ThrottleMode::Shaping)
            .run_collect();
    }

    /// Ticks missed while nobody pulls are not replayed, and shutting the
    /// materializer down ends the stream.
    #[test]
    fn tick_drops_missed_ticks_and_cancels() {
        let period = Duration::from_millis(20);
        let (mut stream, materializer, _cancellable) =
            materialize_stream(Source::tick(period, period, 7_i32));

        thread::sleep(Duration::from_millis(100));
        assert_eq!(stream.next(), Some(Ok(7)));
        assert_eq!(stream.next(), Some(Ok(7)));
        materializer.shutdown();
        assert_eq!(stream.next(), None);
    }
}